MOD aaa-12 解决rpc server重启后,rpc client无法连接的问题以及brconfig对vlan桥检查错误的问题

SOL   解决rpc server重启后,rpc client无法连接的问题以及brconfig对vlan桥检查错误的问题
修改人:zhangliang
检视人:zhangliang
This commit is contained in:
zhanglianghy 2019-08-20 14:38:48 +08:00
parent f2f1fc5ede
commit a72f0cefd6
15 changed files with 375 additions and 60 deletions

View File

@ -22,4 +22,8 @@ pointer rpc_array_index(rpc_array* array, int index);
boolean rpc_array_add(rpc_array* array, pointer data);
boolean rpc_array_del(rpc_array* array, pointer data);
boolean rpc_array_destroy(rpc_array* array) ;
#endif /* RPC_ARRAY_H_ */

View File

@ -24,6 +24,7 @@ struct _rpc_client {
};
rpc_client* rpc_client_new();
void rpc_client_destroy(rpc_client *client);
boolean rpc_client_connect(rpc_client* client, char* host, int port);
rpc_client* rpc_client_connect_ex(char *dst_name);

View File

@ -16,7 +16,7 @@
#define BUF_MAX_SIZE 163840
#define BUFFER_SIZE (BUF_MAX_SIZE + 40960)
#define RPC_VERSION "RPC/1.0"
#define TIMEOUT 120
#define TIMEOUT 15
#define TIMEWARN 200
typedef enum {
@ -38,7 +38,8 @@ typedef enum {
RET_IPINVALID = 13,
RET_NAMEINVAL = 14,
RET_EXIST = 15,
RET_FULL = 16
RET_FULL = 16,
RET_SENDERR = 17
} ret_code;
#define RET_ERROR_DISC \
@ -59,7 +60,8 @@ typedef enum {
{ RET_IPINVALID, "IpInvalid"},\
{ RET_NAMEINVAL, "NameInvalid"},\
{ RET_EXIST, "AlreadyExist"},\
{ RET_FULL, "Full"}\
{ RET_FULL, "Full"},\
{ RET_SENDERR, "SendErr"}\
}
#define RET_BUFF_SIZE 256;

View File

@ -36,7 +36,7 @@ rpc_conn *rpc_conn_client_new(int fd, struct ev_loop *l);
void rpc_conn_addiov(rpc_conn *c, pointer data, size_t data_len);
void rpc_send_message(rpc_conn *conn);
boolean rpc_client_send(rpc_conn *conn);
void rpc_conn_close(rpc_conn *c);

View File

@ -22,6 +22,8 @@ pointer rpc_sessionpool_get(rpc_sessionpool *pool, int index);
boolean rpc_sessionpool_remove(rpc_sessionpool *pool, int index);
boolean rpc_sessionpool_free(rpc_sessionpool *pool);
END_DECLS
#endif /* RPC_SESSIONPOOL_H_ */

View File

@ -66,4 +66,8 @@ boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th);
boolean rpc_client_thread_start(rpc_client_thread *th);
rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th);
boolean rpc_client_thread_destroy(rpc_client_thread *th);
#endif /* RPC_THREAD_H_ */

View File

@ -21,6 +21,18 @@ rpc_client *config_client_get()
return config_client;
}
void config_client_destroy()
{
if(config_client)
{
rpc_client_destroy(config_client);
config_client = NULL;
}
return;
}
ret_code config_construct_msg(uint config_type, uint64 config_id,
char* config_data, int config_len,
config_msg_t **config_msg, int *msg_len)
@ -58,7 +70,6 @@ ret_code config_destroy_msg(config_msg_t *config_msg, int msg_len)
}
}
ret_code web_config_exec_sync(uint config_type, uint64 config_id,
char* config_data, int config_len,
char**output, int *output_len)
@ -81,6 +92,11 @@ ret_code web_config_exec_sync(uint config_type, uint64 config_id,
config_msg, msg_len, (pointer)output, output_len);
ASSERT_RET_NO(code);
if(code == RET_SENDERR || code == RET_TIMEOUT)
{
config_client_destroy();
}
config_destroy_msg(config_msg, msg_len);
return code;
@ -107,6 +123,10 @@ ret_code web_config_exec_async(uint config_type, uint64 config_id,
ret = rpc_client_call_async(client, "ConfigManger#0", "cm_config_process",
config_msg, msg_len, callback, data);
if(ret == RET_SENDERR || ret == RET_TIMEOUT)
{
config_client_destroy();
}
config_destroy_msg(config_msg, msg_len);

View File

@ -21,8 +21,13 @@ br_event_head_t br_event_tbl = {.head.first = NULL, .lock = 0, .init = FALSE};
int br_vlan_bridge(char *br_name)
{
if(br_name[0] == 'b' && br_name[1] == 'r'
&& br_name[2] == 'v' && br_name[3] == 'l')
if(strlen(br_name) < 5)
{
return FALSE;
}
if(br_name[0] == 'b' && br_name[1] == 'r' && br_name[2] == '-'
&& br_name[3] == 'v' && br_name[4] == 'l')
{
return TRUE;
}

View File

@ -23,6 +23,8 @@ int main(int argc, char **argv)
config_id = strtol(argv[1], &stop, 16);
printf("config id:0x%lx\n",config_id);
//while(1)
{
f = fopen(argv[1],"r+");
if(f == NULL)
{
@ -36,15 +38,18 @@ int main(int argc, char **argv)
printf("configure: %s\n", config_linebuf);
code = web_config_exec_sync(CM_CONFIG_GET, config_id,
code = web_config_exec_sync(CM_CONFIG_SET, config_id,
config_linebuf, strlen(config_linebuf) + 1, &output, &output_len);
printf("call config type retturn:%s,result:%s\n", rpc_code_format(code), output);
printf("call config type return:%s,result:%s\n", rpc_code_format(code), output);
memset(config_linebuf, 0, sizeof(config_linebuf));
memset(output, 0, output_len);
}
fclose(f);
rpc_sleep(1000);
}
return EXIT_SUCCESS;
}

View File

@ -55,6 +55,49 @@ pointer rpc_array_index(rpc_array* array, int index) {
return data;
}
boolean rpc_array_del(rpc_array* array, pointer data) {
int i = 0;
int j = 0;
assert(array);
assert(data);
pthread_mutex_lock(&array->lock);
for(i = 0; i < array->cur; i++)
if (array->data[i] == data) {
if(i < array->cur - 1){
for(j = i; j < array->cur - 1; j++){
array->data[j] = array->data[j + 1];
}
array->cur--;
}
else{
array->data[i] = NULL;
array->cur--;
}
break;
}
pthread_mutex_unlock(&array->lock);
return TRUE;
}
boolean rpc_array_destroy(rpc_array* array) {
if(!array){
return FALSE;
}
pthread_mutex_lock(&array->lock);
rpc_free(array->data);
array->cur = 0;
array->size = 0;
pthread_mutex_unlock(&array->lock);
pthread_mutex_destroy(&array->lock);
return TRUE;
}
boolean rpc_array_add(rpc_array* array, pointer data) {
pthread_mutex_lock(&array->lock);
if (array->cur < array->size) {

View File

@ -18,6 +18,24 @@ struct _rpc_async_queue {
//thread_wait
};
static pointer rpc_asyn_queue_pop_no_lock(rpc_async_queue *queue) {
rpc_queue_item *item = NULL;
item = queue->head;
if (item != NULL) {
queue->head = item->next;
if (item->next == NULL) {
queue->tail = NULL;
}
}
if (item) {
rpc_queue_item_free(item);
return item->data;
} else {
return NULL;
}
}
rpc_async_queue* rpc_async_queue_new() {
rpc_async_queue *queue = rpc_new(rpc_async_queue,1);
pthread_mutex_init(&queue->mutex, NULL);
@ -28,12 +46,17 @@ rpc_async_queue* rpc_async_queue_new() {
}
void rpc_async_queue_free(rpc_async_queue *queue) {
if(!queue){
return;
}
pthread_mutex_lock(&queue->mutex);
pointer data;
while ((data = rpc_async_queue_try_pop(queue)) != NULL) {
while ((data = rpc_asyn_queue_pop_no_lock(queue)) != NULL) {
rpc_free(data);
}
pthread_mutex_unlock(&queue->mutex);
pthread_mutex_destroy(&queue->mutex);
pthread_cond_destroy(&queue->cond);
rpc_free(queue);
}

View File

@ -40,7 +40,7 @@ rpc_client* rpc_client_new() {
client->host = NULL;
client->port = 0;
client->thread_count = CLIENT_THREAD_NUM;
client->threads = rpc_new(rpc_client_thread,CLIENT_THREAD_NUM);
client->threads = rpc_new0(rpc_client_thread,CLIENT_THREAD_NUM);
client->last_thread = -1;
client->init_count = 0;
pthread_mutex_init(&client->mutex, NULL);
@ -49,16 +49,22 @@ rpc_client* rpc_client_new() {
}
void rpc_client_destroy(rpc_client *client) {
int i = 0;
if(client == NULL){
return;
}
for (i = 0; i < client->thread_count; ++i) {
rpc_client_thread_destroy(&client->threads[i]);
}
rpc_free(client->threads);
pthread_mutex_destroy(&client->mutex);
pthread_cond_destroy(&client->cond);
memset(client, 0, sizeof(rpc_client));
rpc_free(client);
return;
}
@ -140,13 +146,19 @@ ret_code rpc_client_call(rpc_client* client, char* service_name,
int n_fd = eventfd(0, 0);
rpc_response rsp;
long time_diff;
ret_code ret = RET_OK;
assert(client!=NULL);
rsp.data = (INT_TO_POINTER(n_fd));
time_diff = rpc_time_msec();
rpc_client_call_async(client, service_name, method_name, input, input_len,
ret = rpc_client_call_async(client, service_name, method_name, input, input_len,
cb_call_ex, &rsp);
if(ret != RET_OK){
close(n_fd);
return ret;
}
//TODO
//ev read timeout?
fd_set r_set;
@ -202,12 +214,26 @@ ret_code rpc_client_call_async(rpc_client* client, char* service_name,
rpc_callback callback, pointer data) {
rpc_client_thread *th = client->threads;
rpc_request *req = rpc_request_new();
rpc_request *req = NULL;
rpc_conn *conn = NULL;
ret_code ret = RET_OK;
int idx;
idx = (th->last_conn + 1) % (th->req_conn_count);
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
/*if(!conn){
rpc_req_conns_new(client, th);
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
}*/
if(!conn)
{
return RET_OK;
}
req = rpc_request_new();
if(req == NULL){
return RET_ERR;
return RET_OK;
}
req->callback = callback;
@ -220,15 +246,17 @@ ret_code rpc_client_call_async(rpc_client* client, char* service_name,
//push pool
req->seq = rpc_sessionpool_insert(th->req_pool, req);
idx = (th->last_conn + 1) % (th->req_conn_count);
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
th->last_conn = idx;
th->last_conn = idx;
rpc_log_dbg("rpc_client_call_async:send_message\n");
//write data
rpc_request_format(req, conn);
rpc_send_message(conn);
return RET_OK;
if(rpc_client_send(conn) == FALSE)
{
ret = RET_SENDERR;
}
return ret;
}

View File

@ -66,6 +66,57 @@ void rpc_conn_add_freelist(rpc_conn *c) {
pthread_mutex_unlock(&conn_lock);
}
static boolean rsp_read_from_conn(rpc_conn *c) {
if (c->rcurr != c->rbuf) {
if (c->rbytes != 0)
memcpy(c->rbuf, c->rcurr, c->rbytes);
c->rcurr = c->rbuf;
}
int num_allocs = 0;
for (;;) {
if (c->rbytes >= c->rsize) {
if (num_allocs == 4) {
return TRUE;
}
++num_allocs;
char *new_rbuf = rpc_realloc(c->rbuf, c->rsize * 2);
if (!new_rbuf) {
break;
}
c->rcurr = c->rbuf = new_rbuf;
c->rsize *= 2;
}
int avail = c->rsize - c->rbytes;
int n = read(c->sfd, c->rbuf + c->rbytes, avail);
rpc_log_dbg("cb_req_read ret %d, rbytes %d, avail %d\r\n",n,c->rbytes,avail);
if (n == 0) {
rpc_log_error("connected closed!\n");
//client_conn_close(c);
rpc_sleep(1000);
return FALSE;
} else if (n == -1) {
if (errno == EINTR || errno == EWOULDBLOCK) {
continue;
}
rpc_log_error("read error!");
//client_conn_close(c);
rpc_sleep(1000);
break;
} else {
c->rbytes += n;
if (n == avail) {
continue;
} else {
break;
}
}
}
return TRUE;
}
static boolean read_from_conn(rpc_conn *c) {
if (c->rcurr != c->rbuf) {
if (c->rbytes != 0)
@ -224,7 +275,7 @@ static void cb_req_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) {
rpc_conn *c = watcher->data;
//read response
if (read_from_conn(c)) {
if (rsp_read_from_conn(c)) {
for (;;) {
if (c->rbytes <= 0) {
return;
@ -300,6 +351,27 @@ void rpc_conn_close(rpc_conn *c) {
rpc_conn_add_freelist(c);
}
/*void client_conn_close(rpc_conn *c) {
rpc_client_thread *th;
assert(c!=NULL);
if(th = (rpc_client_thread *)c->thread){
rpc_array_del(th->req_conns, c);
}
ev_io_stop(c->loop, &c->watcher);
close(c->sfd);
c->sfd = -1;
c->rcurr = 0;
c->iovused = 0;
c->thread = NULL;
c->loop = NULL;
rpc_conn_add_freelist(c);
}*/
static rpc_conn *rpc_conn_new_inner(int fd, struct ev_loop* l, void(*cb)(
struct ev_loop *l, struct ev_io *watcher, int revents)) {
rpc_conn *conn;
@ -354,6 +426,47 @@ void rpc_conn_addiov(rpc_conn *c, pointer data, size_t data_len) {
c->iovused++;
}
boolean rpc_client_send(rpc_conn *conn) {
struct msghdr m;
boolean ret = TRUE;
memset(&m, 0, sizeof(m));
m.msg_iov = conn->iov;
m.msg_iovlen = conn->iovused;
ssize_t res;
for (;;) {
res = sendmsg(conn->sfd, &m, 0);
if (res > 0) {
while (m.msg_iovlen > 0 && res >= m.msg_iov->iov_len) {
res -= m.msg_iov->iov_len;
m.msg_iovlen--;
m.msg_iov++;
}
if (res > 0) {
m.msg_iov->iov_base = (caddr_t) m.msg_iov->iov_base + res;
m.msg_iov->iov_len -= res;
} else {
break;
}
} else if (res == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
} else {
perror("failed to write");
ret = FALSE;
break;
}
} else {
ret = FALSE;
break;
}
}
//TODO free resource
conn->iovused = 0;
return ret;
}
void rpc_send_message(rpc_conn *conn) {
struct msghdr m;
memset(&m, 0, sizeof(m));

View File

@ -66,3 +66,16 @@ boolean rpc_sessionpool_remove(rpc_sessionpool *pool, int index) {
pthread_mutex_unlock(&pool->mutex);
return TRUE;
}
boolean rpc_sessionpool_free(rpc_sessionpool *pool) {
if(pool){
pthread_mutex_lock(&pool->mutex);
rpc_free(pool->data);
pool->size = 0;
pthread_mutex_unlock(&pool->mutex);
pthread_mutex_destroy(&pool->mutex);
rpc_free(pool);
}
return TRUE;
}

View File

@ -216,6 +216,7 @@ boolean rpc_unix_socketc_get(char *host, int port, int*pcfd)
if (connect(sockfd, (struct sockaddr *)&cliun, len) < 0)
{
rpc_log_error("connect error");
close(sockfd);
return FALSE;
}
@ -325,29 +326,73 @@ boolean rpc_dispatch_start(rpc_dispatch_thread *th) {
return TRUE;
}
rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th)
{
rpc_conn *c = NULL;
int cfd = -1;
if(rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE)
{
rpc_log_error("connect error");
return NULL;
}
c = rpc_conn_client_new(cfd, th->loop);
c->thread = th;
rpc_array_add(th->req_conns, c);
return c;
}
boolean rpc_client_thread_destroy(rpc_client_thread *th) {
rpc_conn *c = NULL;
//void *nRes;
if(th == NULL){
return FALSE;
}
//pthread_cancel(th->thread_receive_id);
//pthread_join(th->thread_receive_id, &nRes);
c = (rpc_conn *)rpc_array_get(th->req_conns);
while(c){
rpc_conn_close(c);
c = (rpc_conn *)rpc_array_get(th->req_conns);
}
ev_loop_destroy(th->loop);
rpc_array_destroy(th->req_conns);
rpc_async_queue_free(th->req_pending);
rpc_sessionpool_free(th->req_pool);
rpc_sessionpool_free(th->req_timer);
th->req_conn_count = 0;
th->req_conns = NULL;
th->req_pending = NULL;
th->req_pool = NULL;
th->req_timer = NULL;
th->client = NULL;
th->last_conn = -1;
return TRUE;
}
boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) {
assert(th!=NULL);
th->loop = ev_loop_new(0);
th->req_conn_count = CLIENT_CONN_NUM;
th->req_conns = rpc_array_new();
th->last_conn = -1;
int i, cfd;
rpc_conn *c = NULL;
int i;
for (i = 0; i < th->req_conn_count; i++) {
if(rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE)
{
rpc_log_error("connect error");
if(rpc_req_conns_new(client, th) == NULL){
return FALSE;
}
c = rpc_conn_client_new(cfd, th->loop);
c->thread = th;
rpc_array_add(th->req_conns, c);
}
th->req_pending = rpc_async_queue_new();
th->req_pool = rpc_sessionpool_new();
th->req_timer = rpc_sessionpool_new();
th->client = client;
c->thread = th;
return TRUE;
}
@ -365,13 +410,20 @@ static void* thread_write_hander(void* data) {
idx = (th->last_conn + 1) % (th->req_conn_count);
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
/*if(!conn){
rpc_req_conns_new(th->client, th);
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
}*/
if(conn){
th->last_conn = idx;
rpc_log_dbg("thread_write_hander:send_message\n");
//write data
rpc_request_format(req, conn);
rpc_send_message(conn);
rpc_client_send(conn);
}
}
return NULL;
}
@ -396,6 +448,6 @@ boolean rpc_client_thread_start(rpc_client_thread *th) {
pthread_t pid;
pthread_create(&pid, NULL, thread_client_handler, th);
//write
pthread_create(&pid, NULL, thread_write_hander, th);
//pthread_create(&pid, NULL, thread_write_hander, th);
return TRUE;
}