MOD aaa-12 Fix webserver's first config send failing after configm restarts

SOL  Fix webserver's first config send failing after configm restarts
Modified by: zhangliang
Reviewed by: zhangliang
zhanglianghy 2019-08-29 15:52:17 +08:00
parent e644d1e74e
commit 1d59ad71b6
5 changed files with 77 additions and 63 deletions


@@ -40,6 +40,7 @@ struct _rpc_worker_thread {
 struct _rpc_client_thread {
     pthread_t thread_send_id;
     pthread_t thread_receive_id;
+    pthread_mutex_t mutex;
     struct ev_loop *loop;

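The new field is the core of the fix's concurrency work: each client thread now carries one mutex that both its send path and its libev receive callback take before touching the shared connection state. A minimal standalone sketch of the pattern (generic names, not the project's header):

#include <pthread.h>

/* One lock per client thread, shared by the thread that submits
 * requests and the thread that consumes responses. */
typedef struct client_thread {
    pthread_t send_id;         /* analogous to thread_send_id */
    pthread_t receive_id;      /* analogous to thread_receive_id */
    pthread_mutex_t mutex;     /* guards connections and the request pool */
    int last_conn;             /* round-robin cursor, touched only under mutex */
} client_thread;
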

@@ -1,5 +1,6 @@
 #include <stdlib.h>
 #include <stdio.h>
+#include "rpc_thread.h"
 #include "rpc.h"
 #include "configmapi.h"
@@ -95,6 +96,13 @@ ret_code web_config_exec_sync(uint config_type, uint64 config_id,
     if(code == RET_SENDERR || code == RET_TIMEOUT)
     {
         config_client_destroy();
+        client = config_client_get();
+        if(client)
+        {
+            code = rpc_client_call(client, "ConfigManger#0", "cm_config_process",
+                                   config_msg, msg_len, (pointer)output, output_len);
+            ASSERT_RET_NO(code);
+        }
     }
     config_destroy_msg(config_msg, msg_len);

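This hunk is the user-visible fix: when the first call fails with RET_SENDERR or RET_TIMEOUT (the cached socket died with configm), the client is destroyed, recreated, and the call is retried exactly once. A standalone sketch of the same reconnect-and-retry shape, with hypothetical stand-ins for config_client_get()/config_client_destroy()/rpc_client_call():

typedef enum { RET_OK, RET_SENDERR, RET_TIMEOUT } ret_code;

typedef struct client client;           /* opaque, like rpc_client */
extern client  *client_get(void);       /* stand-in for config_client_get() */
extern void     client_destroy(void);   /* stand-in for config_client_destroy() */
extern ret_code client_call(client *c, const void *msg, int len);

static ret_code call_with_reconnect(const void *msg, int len) {
    client *c = client_get();
    ret_code code = c ? client_call(c, msg, len) : RET_SENDERR;
    if (code == RET_SENDERR || code == RET_TIMEOUT) {
        client_destroy();               /* drop the connection that died with the server */
        c = client_get();               /* reconnect to the restarted daemon */
        if (c) {
            code = client_call(c, msg, len);   /* retry exactly once */
        }
    }
    return code;
}
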

@@ -211,52 +211,59 @@ void rpc_client_call_async_thread(rpc_client* client, char* service_name,
 ret_code rpc_client_call_async(rpc_client* client, char* service_name,
                                char* method_name, pointer input, int input_len,
                                rpc_callback callback, pointer data) {
     rpc_client_thread *th = client->threads;
     rpc_request *req = NULL;
     rpc_conn *conn = NULL;
     ret_code ret = RET_OK;
     int idx;
+    pthread_mutex_lock(&th->mutex);
     idx = (th->last_conn + 1) % (th->req_conn_count);
     conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
-    /*if(!conn){
-        rpc_req_conns_new(client, th);
-        conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
-    }*/
+    if(!conn)
+    {
+        pthread_mutex_unlock(&th->mutex);
+        return RET_OK;
+    }
+    if(conn->sfd == -1)
+    {
+        pthread_mutex_unlock(&th->mutex);
+        return RET_SENDERR;
+    }
     req = rpc_request_new();
     if(req == NULL){
+        pthread_mutex_unlock(&th->mutex);
         return RET_OK;
     }
     req->callback = callback;
     req->method_name = method_name;
     req->service_name = service_name;
     req->input = input;
     req->input_len = input_len;
     req->data = data;
     //push pool
     req->seq = rpc_sessionpool_insert(th->req_pool, req);
     th->last_conn = idx;
     rpc_log_dbg("rpc_client_call_async:send_message\n");
     //write data
     rpc_request_format(req, conn);
-    if(rpc_client_send(conn) == FALSE)
+    if(conn->sfd == -1 || rpc_client_send(conn) == FALSE)
     {
         ret = RET_SENDERR;
     }
+    pthread_mutex_unlock(&th->mutex);
     return ret;
 }

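The rewrite above closes a race: connection selection and the actual send used to run unlocked, so the receive thread could close a socket between rpc_array_index() and rpc_client_send(). Now the whole path runs under th->mutex and bails out early when the slot is empty or its descriptor is already -1. A compilable sketch of that locked round-robin send (illustrative names, not the project's API):

#include <pthread.h>
#include <unistd.h>

typedef struct {
    pthread_mutex_t mutex;
    int last_conn;          /* round-robin cursor */
    int conn_count;
    int conn_fds[8];        /* -1 marks a closed slot, like conn->sfd == -1 */
} sender;

/* Returns bytes written, or -1 if no usable connection. */
static ssize_t send_on_next_conn(sender *s, const void *buf, size_t len) {
    ssize_t n = -1;
    pthread_mutex_lock(&s->mutex);
    int idx = (s->last_conn + 1) % s->conn_count;
    if (s->conn_fds[idx] != -1) {
        s->last_conn = idx;
        /* holding the lock means the receiver cannot close the fd mid-write */
        n = write(s->conn_fds[idx], buf, len);
    }
    pthread_mutex_unlock(&s->mutex);
    return n;
}
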

@@ -93,7 +93,7 @@ static boolean rsp_read_from_conn(rpc_conn *c) {
     if (n == 0) {
         rpc_log_error("connected closed!\n");
-        //client_conn_close(c);
+        rpc_conn_close(c);
         rpc_sleep(1000);
         return FALSE;
     } else if (n == -1) {
@@ -101,7 +101,7 @@ static boolean rsp_read_from_conn(rpc_conn *c) {
             continue;
         }
         rpc_log_error("read error!");
-        //client_conn_close(c);
+        rpc_conn_close(c);
         rpc_sleep(1000);
         break;
     } else {
@@ -273,11 +273,15 @@ static void cb_req_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
 }
 static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
     rpc_conn *c = watcher->data;
+    rpc_client_thread *th = (rpc_client_thread *) c->thread;
+    pthread_mutex_lock(&th->mutex);
     //read response
     if (rsp_read_from_conn(c)) {
         for (;;) {
             if (c->rbytes <= 0) {
+                pthread_mutex_unlock(&th->mutex);
                 return;
             }
             boolean has_copyhead = FALSE;
@@ -288,13 +292,16 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
                 result = rpc_response_parse(c, &rsp);
                 if (result == RPC_Parse_Error) {
                     fprintf(stderr, "response parse error!\n");
+                    pthread_mutex_unlock(&th->mutex);
                     return;
                 } else if (result == RPC_Parse_NeedData) {
                     c->unprocess_data = rpc_response_copy_head(rsp);
                     rpc_response_free(rsp);
+                    pthread_mutex_unlock(&th->mutex);
                     return;
                 }
             } else {
+                pthread_mutex_unlock(&th->mutex);
                 return;
             }
         } else {
@@ -308,10 +315,10 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
                 c->unprocess_data = NULL;
                 has_copyhead = TRUE;
             } else {
+                pthread_mutex_unlock(&th->mutex);
                 return;
             }
         }
-    rpc_client_thread *th = (rpc_client_thread *) c->thread;
     //pop send queue
     rpc_request *req = NULL;
@@ -320,6 +327,7 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
     req = (rpc_request*) rpc_sessionpool_get(th->req_pool, rsp->seq);
     if ((req == NULL) || (req->seq != rsp->seq)) {
         fprintf(stderr, "seq not equal!\n");
+        pthread_mutex_unlock(&th->mutex);
         return;
     }
     //TODO
@@ -338,39 +346,29 @@ static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
     } else {
         fprintf(stderr, "read response error!\n");
     }
+    pthread_mutex_unlock(&th->mutex);
 }
 void rpc_conn_close(rpc_conn *c) {
-    assert(c!=NULL);
-    ev_io_stop(c->loop, &c->watcher);
-    close(c->sfd);
-    c->rcurr = 0;
-    c->iovused = 0;
-    c->thread = NULL;
-    c->loop = NULL;
-    rpc_conn_add_freelist(c);
-}
-/*void client_conn_close(rpc_conn *c) {
-    rpc_client_thread *th;
-    assert(c!=NULL);
-    if(th = (rpc_client_thread *)c->thread){
-        rpc_array_del(th->req_conns, c);
-    }
-    ev_io_stop(c->loop, &c->watcher);
-    close(c->sfd);
-    c->sfd = -1;
-    c->rcurr = 0;
-    c->iovused = 0;
-    c->thread = NULL;
-    c->loop = NULL;
-    rpc_conn_add_freelist(c);
-}*/
+    if(c == NULL){
+        return;
+    }
+    if(c->loop){
+        ev_io_stop(c->loop, &c->watcher);
+    }
+    if(c->sfd != -1){
+        close(c->sfd);
+        c->sfd = -1;
+    }
+    c->rcurr = 0;
+    c->iovused = 0;
+    c->thread = NULL;
+    c->loop = NULL;
+    rpc_conn_add_freelist(c);
+}
 static rpc_conn *rpc_conn_new_inner(int fd, struct ev_loop* l, void(*cb)(
     struct ev_loop *l, struct ev_io *watcher, int revents)) {

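rpc_conn_close() above swaps the old assert(c != NULL) for a guard and marks closed sockets with sfd = -1, so either thread can call it (even twice) without a double close(). The sentinel pattern in isolation (generic sketch, not the project's code):

#include <unistd.h>

typedef struct {
    int sfd;                /* socket descriptor, -1 once closed */
} conn;

static void conn_close(conn *c) {
    if (c == NULL)
        return;             /* tolerate NULL instead of asserting */
    if (c->sfd != -1) {
        close(c->sfd);
        c->sfd = -1;        /* a second close becomes a no-op */
    }
}
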

@@ -344,29 +344,28 @@ rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th)
 boolean rpc_client_thread_destroy(rpc_client_thread *th) {
     rpc_conn *c = NULL;
     //void *nRes;
     if(th == NULL){
         return FALSE;
     }
     //pthread_cancel(th->thread_receive_id);
     //pthread_join(th->thread_receive_id, &nRes);
+    pthread_mutex_lock(&th->mutex);
     c = (rpc_conn *)rpc_array_get(th->req_conns);
     while(c){
         rpc_conn_close(c);
         c = (rpc_conn *)rpc_array_get(th->req_conns);
     }
+    rpc_array_destroy(th->req_conns);
+    pthread_mutex_unlock(&th->mutex);
+    pthread_mutex_destroy(&th->mutex);
     ev_loop_destroy(th->loop);
-    rpc_array_destroy(th->req_conns);
     rpc_async_queue_free(th->req_pending);
     rpc_sessionpool_free(th->req_pool);
     rpc_sessionpool_free(th->req_timer);
     th->req_conn_count = 0;
     th->req_conns = NULL;
     th->req_pending = NULL;
     th->req_pool = NULL;
     th->req_timer = NULL;
@@ -383,6 +382,7 @@ boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) {
     th->req_conn_count = CLIENT_CONN_NUM;
     th->req_conns = rpc_array_new();
     th->last_conn = -1;
+    pthread_mutex_init(&th->mutex, NULL);
     int i;
     for (i = 0; i < th->req_conn_count; i++) {
         if(rpc_req_conns_new(client, th) == NULL){
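
The init/destroy hunks complete the mutex lifecycle: pthread_mutex_init() at thread setup, lock-drain-unlock at teardown, and pthread_mutex_destroy() only after the final unlock. The same ordering in a self-contained sketch (assumed names):

#include <pthread.h>

typedef struct {
    pthread_mutex_t mutex;
    int open_conns;         /* stand-in for the shared connection state */
} worker;

static void worker_init(worker *w) {
    pthread_mutex_init(&w->mutex, NULL);
    w->open_conns = 0;
}

static void worker_destroy(worker *w) {
    pthread_mutex_lock(&w->mutex);
    w->open_conns = 0;                 /* drain shared state under the lock */
    pthread_mutex_unlock(&w->mutex);
    pthread_mutex_destroy(&w->mutex);  /* only after no thread can hold it */
}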