From 1d59ad71b65848610e3f2d91b3ecd0de9931e2ca Mon Sep 17 00:00:00 2001
From: zhanglianghy <zhanglianghy@cmhi.chinamobile.com>
Date: Thu, 29 Aug 2019 15:52:17 +0800
Subject: [PATCH] =?UTF-8?q?MOD=20aaa-12=20=E8=A7=A3=E5=86=B3configm?=
 =?UTF-8?q?=E9=87=8D=E5=90=AF=E4=B9=8B=E5=90=8E=EF=BC=8Cwebserver=E7=AC=AC?=
 =?UTF-8?q?=E4=B8=80=E6=AC=A1=E5=8F=91=E9=80=81=E9=85=8D=E7=BD=AE=E5=A4=B1?=
 =?UTF-8?q?=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98=20SOL=20=20=E8=A7=A3?=
 =?UTF-8?q?=E5=86=B3configm=E9=87=8D=E5=90=AF=E4=B9=8B=E5=90=8E=EF=BC=8Cwe?=
 =?UTF-8?q?bserver=E7=AC=AC=E4=B8=80=E6=AC=A1=E5=8F=91=E9=80=81=E9=85=8D?=
 =?UTF-8?q?=E7=BD=AE=E5=A4=B1=E8=B4=A5=E7=9A=84=E9=97=AE=E9=A2=98=20?=
 =?UTF-8?q?=E4=BF=AE=E6=94=B9=E4=BA=BA=EF=BC=9Azhangliang=20=E6=A3=80?=
 =?UTF-8?q?=E8=A7=86=E4=BA=BA=EF=BC=9Azhangliang?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 Platform/common/rpc/rpc_thread.h              |  1 +
 .../user/configm/config-api/configclient.c    |  8 +++
 Platform/user/rpc/rpc_client.c                | 57 ++++++++++--------
 Platform/user/rpc/rpc_conn.c                  | 60 +++++++++----------
 Platform/user/rpc/rpc_thread.c                | 14 ++---
 5 files changed, 77 insertions(+), 63 deletions(-)

diff --git a/Platform/common/rpc/rpc_thread.h b/Platform/common/rpc/rpc_thread.h
index 3fdb4b369..64d205dbf 100755
--- a/Platform/common/rpc/rpc_thread.h
+++ b/Platform/common/rpc/rpc_thread.h
@@ -40,6 +40,7 @@ struct _rpc_worker_thread {
 struct _rpc_client_thread {
 	pthread_t thread_send_id;
 	pthread_t thread_receive_id;
+    pthread_mutex_t mutex;
 
 	struct ev_loop *loop;
 
diff --git a/Platform/user/configm/config-api/configclient.c b/Platform/user/configm/config-api/configclient.c
index ec35575e1..6b6e910e0 100644
--- a/Platform/user/configm/config-api/configclient.c
+++ b/Platform/user/configm/config-api/configclient.c
@@ -1,5 +1,6 @@
 #include <stdlib.h>
 #include <stdio.h>
+#include "rpc_thread.h"
 #include "rpc.h"
 #include "configmapi.h"
 
@@ -95,6 +96,13 @@ ret_code web_config_exec_sync(uint config_type, uint64 config_id,
     if(code == RET_SENDERR || code == RET_TIMEOUT)
     {
         config_client_destroy();
+        client = config_client_get();
+        if(client)
+        {
+            code = rpc_client_call(client, "ConfigManger#0", "cm_config_process", 
+                                     config_msg, msg_len, (pointer)output, output_len);
+            ASSERT_RET_NO(code);
+        }
     }
 
     config_destroy_msg(config_msg, msg_len);
diff --git a/Platform/user/rpc/rpc_client.c b/Platform/user/rpc/rpc_client.c
index 3ccc45574..4eb652b21 100755
--- a/Platform/user/rpc/rpc_client.c
+++ b/Platform/user/rpc/rpc_client.c
@@ -211,52 +211,59 @@ void rpc_client_call_async_thread(rpc_client* client, char* service_name,
 
 ret_code rpc_client_call_async(rpc_client* client, char* service_name,
                                   char* method_name, pointer input, int input_len,
-		                          rpc_callback callback, pointer data) {
+                                  rpc_callback callback, pointer data) {
 
-    rpc_client_thread *th = client->threads;		                      
-	rpc_request *req = NULL;    
-	rpc_conn *conn = NULL;
+    rpc_client_thread *th = client->threads;
+    rpc_request *req = NULL;    
+    rpc_conn *conn = NULL;
     ret_code ret = RET_OK;
     int idx;
 
-	idx = (th->last_conn + 1) % (th->req_conn_count);
-	conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
-    /*if(!conn){
-        rpc_req_conns_new(client, th);
-        conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
-    }*/
+    pthread_mutex_lock(&th->mutex);
+    idx = (th->last_conn + 1) % (th->req_conn_count);
+    conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
 
     if(!conn)
     {
+       pthread_mutex_unlock(&th->mutex);
        return RET_OK;
     }
+    
+    if(conn->sfd == -1)
+    {
+        pthread_mutex_unlock(&th->mutex);
+        return RET_SENDERR;
+    }
 
     req = rpc_request_new();
     if(req == NULL){
+        pthread_mutex_unlock(&th->mutex);
         return RET_OK;
     }
-        
-	req->callback = callback;
-	req->method_name = method_name;
-	req->service_name = service_name;
-	req->input = input;
-	req->input_len = input_len;
-	req->data = data;
+
+    req->callback = callback;
+    req->method_name = method_name;
+    req->service_name = service_name;
+    req->input = input;
+    req->input_len = input_len;
+    req->data = data;
 
     //push pool
-	req->seq = rpc_sessionpool_insert(th->req_pool, req);
+    req->seq = rpc_sessionpool_insert(th->req_pool, req);
 
-    
-	th->last_conn = idx;
+
+    th->last_conn = idx;
     rpc_log_dbg("rpc_client_call_async:send_message\n");
 
-	//write data
-	rpc_request_format(req, conn);
-    
-	if(rpc_client_send(conn) == FALSE)
-	{
+    //write data
+    rpc_request_format(req, conn);
+
+    if(conn->sfd == -1 || rpc_client_send(conn) == FALSE)
+    {
         ret = RET_SENDERR;
     }
+    
+    pthread_mutex_unlock(&th->mutex);
 
     return ret;
 }
diff --git a/Platform/user/rpc/rpc_conn.c b/Platform/user/rpc/rpc_conn.c
index 6797dfe6c..2bca9a407 100755
--- a/Platform/user/rpc/rpc_conn.c
+++ b/Platform/user/rpc/rpc_conn.c
@@ -93,7 +93,7 @@ static boolean rsp_read_from_conn(rpc_conn *c) {
         
 		if (n == 0) {
 			rpc_log_error("connected closed!\n");
-			//client_conn_close(c);
+            rpc_conn_close(c);
 			rpc_sleep(1000);
 			return FALSE;
 		} else if (n == -1) {
@@ -101,7 +101,7 @@ static boolean rsp_read_from_conn(rpc_conn *c) {
 				continue;
 			}
 			rpc_log_error("read error!");
-			//client_conn_close(c);
+            rpc_conn_close(c);
 			rpc_sleep(1000);
 			break;
 		} else {
@@ -273,11 +273,15 @@ static void cb_req_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 }
 
 static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
-	rpc_conn *c = watcher->data;
+	rpc_conn *c = watcher->data;    
+    rpc_client_thread *th = (rpc_client_thread *) c->thread;
+
+    pthread_mutex_lock(&th->mutex);
 	//read response
 	if (rsp_read_from_conn(c)) {
 		for (;;) {
 			if (c->rbytes <= 0) {
+                pthread_mutex_unlock(&th->mutex);
 				return;
 			}
 			boolean has_copyhead = FALSE;
@@ -288,13 +292,16 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 					result = rpc_response_parse(c, &rsp);
 					if (result == RPC_Parse_Error) {
 						fprintf(stderr, "response parse error!\n");
+                        pthread_mutex_unlock(&th->mutex);
 						return;
 					} else if (result == RPC_Parse_NeedData) {
 						c->unprocess_data = rpc_response_copy_head(rsp);
 						rpc_response_free(rsp);
+                        pthread_mutex_unlock(&th->mutex);
 						return;
 					}
 				} else {
+				    pthread_mutex_unlock(&th->mutex);
 					return;
 				}
 			} else {
@@ -308,10 +315,10 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 					c->unprocess_data = NULL;
 					has_copyhead = TRUE;
 				} else {
+                    pthread_mutex_unlock(&th->mutex);
 					return;
 				}
 			}
-			rpc_client_thread *th = (rpc_client_thread *) c->thread;
 			//pop send queue
 
 			rpc_request *req = NULL;
@@ -320,6 +327,7 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 			req = (rpc_request*) rpc_sessionpool_get(th->req_pool, rsp->seq);
 			if ((req == NULL) || (req->seq != rsp->seq)) {
 				fprintf(stderr, "seq not equal!\n");
+                pthread_mutex_unlock(&th->mutex);
 				return;
 			}
 			//TODO
@@ -338,39 +346,29 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 	} else {
 		fprintf(stderr, "read response error!\n");
 	}
+    
+    pthread_mutex_unlock(&th->mutex);
 }
 
 void rpc_conn_close(rpc_conn *c) {
-	assert(c!=NULL);
-	ev_io_stop(c->loop, &c->watcher);
-	close(c->sfd);
-	c->rcurr = 0;
-	c->iovused = 0;
-	c->thread = NULL;
-	c->loop = NULL;
-	rpc_conn_add_freelist(c);
-}
-
-/*void client_conn_close(rpc_conn *c) {
-    rpc_client_thread *th;
-	assert(c!=NULL);
-
-    if(th = (rpc_client_thread *)c->thread){
-        rpc_array_del(th->req_conns, c);
+	if(c == NULL){
+        return;
     }
-    
-	ev_io_stop(c->loop, &c->watcher);
-	close(c->sfd);
-    
-    c->sfd = -1;
-	c->rcurr = 0;
-	c->iovused = 0;
-	c->thread = NULL;
-	c->loop = NULL;
 
-	rpc_conn_add_freelist(c);
-}*/
+    if(c->loop){
+        ev_io_stop(c->loop, &c->watcher);
+    }
+    if(c->sfd != -1){
+        close(c->sfd);
+        c->sfd = -1;
+    }
 
+    c->rcurr = 0;
+    c->iovused = 0;
+    c->thread = NULL;
+    c->loop = NULL;    
+    rpc_conn_add_freelist(c);
+}
 
 static rpc_conn *rpc_conn_new_inner(int fd, struct ev_loop* l, void(*cb)(
 		struct ev_loop *l, struct ev_io *watcher, int revents)) {
diff --git a/Platform/user/rpc/rpc_thread.c b/Platform/user/rpc/rpc_thread.c
index 58ba5cca9..d681ae91e 100755
--- a/Platform/user/rpc/rpc_thread.c
+++ b/Platform/user/rpc/rpc_thread.c
@@ -344,29 +344,28 @@ rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th)
 
 boolean rpc_client_thread_destroy(rpc_client_thread *th) {
     rpc_conn *c = NULL;
-    //void  *nRes;
 
     if(th == NULL){
         return FALSE;
     }
     
-    //pthread_cancel(th->thread_receive_id);
-    //pthread_join(th->thread_receive_id, &nRes);
-
+    pthread_mutex_lock(&th->mutex);
     c = (rpc_conn *)rpc_array_get(th->req_conns);
     while(c){
         rpc_conn_close(c);
         c = (rpc_conn *)rpc_array_get(th->req_conns);
-    }
+    }    
+    rpc_array_destroy(th->req_conns);
+    pthread_mutex_unlock(&th->mutex);    
+    pthread_mutex_destroy(&th->mutex);
     
     ev_loop_destroy(th->loop);
-    rpc_array_destroy(th->req_conns);
     rpc_async_queue_free(th->req_pending);
     rpc_sessionpool_free(th->req_pool);
     rpc_sessionpool_free(th->req_timer);
     
     th->req_conn_count = 0;
-    th->req_conns      = NULL;    
+    th->req_conns      = NULL;
     th->req_pending    = NULL;
     th->req_pool       = NULL;
     th->req_timer      = NULL;
@@ -383,6 +382,7 @@ boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) {
 	th->req_conn_count = CLIENT_CONN_NUM;
 	th->req_conns = rpc_array_new();
 	th->last_conn = -1;
+    pthread_mutex_init(&th->mutex, NULL);
 	int i;
 	for (i = 0; i < th->req_conn_count; i++) {
         if(rpc_req_conns_new(client, th) == NULL){