diff --git a/Platform/common/rpc/rpc_array.h b/Platform/common/rpc/rpc_array.h index c32a1cffb..6b2292a9e 100755 --- a/Platform/common/rpc/rpc_array.h +++ b/Platform/common/rpc/rpc_array.h @@ -22,4 +22,8 @@ pointer rpc_array_index(rpc_array* array, int index); boolean rpc_array_add(rpc_array* array, pointer data); +boolean rpc_array_del(rpc_array* array, pointer data); + +boolean rpc_array_destroy(rpc_array* array) ; + #endif /* RPC_ARRAY_H_ */ diff --git a/Platform/common/rpc/rpc_client.h b/Platform/common/rpc/rpc_client.h index 5ce7bdc2d..50ad39de7 100755 --- a/Platform/common/rpc/rpc_client.h +++ b/Platform/common/rpc/rpc_client.h @@ -24,6 +24,7 @@ struct _rpc_client { }; rpc_client* rpc_client_new(); +void rpc_client_destroy(rpc_client *client); boolean rpc_client_connect(rpc_client* client, char* host, int port); rpc_client* rpc_client_connect_ex(char *dst_name); diff --git a/Platform/common/rpc/rpc_common.h b/Platform/common/rpc/rpc_common.h index 807e2782f..08302bf24 100755 --- a/Platform/common/rpc/rpc_common.h +++ b/Platform/common/rpc/rpc_common.h @@ -16,7 +16,7 @@ #define BUF_MAX_SIZE 163840 #define BUFFER_SIZE (BUF_MAX_SIZE + 40960) #define RPC_VERSION "RPC/1.0" -#define TIMEOUT 120 +#define TIMEOUT 15 #define TIMEWARN 200 typedef enum { @@ -38,7 +38,8 @@ typedef enum { RET_IPINVALID = 13, RET_NAMEINVAL = 14, RET_EXIST = 15, - RET_FULL = 16 + RET_FULL = 16, + RET_SENDERR = 17 } ret_code; #define RET_ERROR_DISC \ @@ -57,9 +58,10 @@ typedef enum { { RET_NOTSUPPORT, "NotSupport"},\ { RET_INPUTERR, "InputError"},\ { RET_IPINVALID, "IpInvalid"},\ - { RET_NAMEINVAL, "NameInvalid"},\ - { RET_EXIST, "AlreadyExist"},\ - { RET_FULL, "Full"}\ + { RET_NAMEINVAL, "NameInvalid"},\ + { RET_EXIST, "AlreadyExist"},\ + { RET_FULL, "Full"},\ + { RET_SENDERR, "SendErr"}\ } #define RET_BUFF_SIZE 256; diff --git a/Platform/common/rpc/rpc_conn.h b/Platform/common/rpc/rpc_conn.h index 4e7323977..b3e18a1d1 100755 --- a/Platform/common/rpc/rpc_conn.h +++ b/Platform/common/rpc/rpc_conn.h @@ -36,7 +36,7 @@ rpc_conn *rpc_conn_client_new(int fd, struct ev_loop *l); void rpc_conn_addiov(rpc_conn *c, pointer data, size_t data_len); -void rpc_send_message(rpc_conn *conn); +boolean rpc_client_send(rpc_conn *conn); void rpc_conn_close(rpc_conn *c); diff --git a/Platform/common/rpc/rpc_sessionpool.h b/Platform/common/rpc/rpc_sessionpool.h index 219a1458d..babc05e58 100755 --- a/Platform/common/rpc/rpc_sessionpool.h +++ b/Platform/common/rpc/rpc_sessionpool.h @@ -22,6 +22,8 @@ pointer rpc_sessionpool_get(rpc_sessionpool *pool, int index); boolean rpc_sessionpool_remove(rpc_sessionpool *pool, int index); +boolean rpc_sessionpool_free(rpc_sessionpool *pool); + END_DECLS #endif /* RPC_SESSIONPOOL_H_ */ diff --git a/Platform/common/rpc/rpc_thread.h b/Platform/common/rpc/rpc_thread.h index 8455ad8ae..3fdb4b369 100755 --- a/Platform/common/rpc/rpc_thread.h +++ b/Platform/common/rpc/rpc_thread.h @@ -66,4 +66,8 @@ boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th); boolean rpc_client_thread_start(rpc_client_thread *th); +rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th); + +boolean rpc_client_thread_destroy(rpc_client_thread *th); + #endif /* RPC_THREAD_H_ */ diff --git a/Platform/user/configm/config-api/configclient.c b/Platform/user/configm/config-api/configclient.c index b54103ee1..ec35575e1 100644 --- a/Platform/user/configm/config-api/configclient.c +++ b/Platform/user/configm/config-api/configclient.c @@ -21,6 +21,18 @@ rpc_client *config_client_get() return config_client; } +void config_client_destroy() +{ + if(config_client) + { + rpc_client_destroy(config_client); + config_client = NULL; + } + + return; +} + + ret_code config_construct_msg(uint config_type, uint64 config_id, char* config_data, int config_len, config_msg_t **config_msg, int *msg_len) @@ -48,7 +60,7 @@ ret_code config_construct_msg(uint config_type, uint64 config_id, return RET_OK; } - + ret_code config_destroy_msg(config_msg_t *config_msg, int msg_len) { if(config_msg) @@ -58,7 +70,6 @@ ret_code config_destroy_msg(config_msg_t *config_msg, int msg_len) } } - ret_code web_config_exec_sync(uint config_type, uint64 config_id, char* config_data, int config_len, char**output, int *output_len) @@ -80,6 +91,11 @@ ret_code web_config_exec_sync(uint config_type, uint64 config_id, code = rpc_client_call(client, "ConfigManger#0", "cm_config_process", config_msg, msg_len, (pointer)output, output_len); ASSERT_RET_NO(code); + + if(code == RET_SENDERR || code == RET_TIMEOUT) + { + config_client_destroy(); + } config_destroy_msg(config_msg, msg_len); @@ -107,6 +123,10 @@ ret_code web_config_exec_async(uint config_type, uint64 config_id, ret = rpc_client_call_async(client, "ConfigManger#0", "cm_config_process", config_msg, msg_len, callback, data); + if(ret == RET_SENDERR || ret == RET_TIMEOUT) + { + config_client_destroy(); + } config_destroy_msg(config_msg, msg_len); diff --git a/Platform/user/configm/config-server/netconfig/bridge/brconfig.c b/Platform/user/configm/config-server/netconfig/bridge/brconfig.c index a344c31ad..391aea344 100644 --- a/Platform/user/configm/config-server/netconfig/bridge/brconfig.c +++ b/Platform/user/configm/config-server/netconfig/bridge/brconfig.c @@ -21,8 +21,13 @@ br_event_head_t br_event_tbl = {.head.first = NULL, .lock = 0, .init = FALSE}; int br_vlan_bridge(char *br_name) { - if(br_name[0] == 'b' && br_name[1] == 'r' - && br_name[2] == 'v' && br_name[3] == 'l') + if(strlen(br_name) < 5) + { + return FALSE; + } + + if(br_name[0] == 'b' && br_name[1] == 'r' && br_name[2] == '-' + && br_name[3] == 'v' && br_name[4] == 'l') { return TRUE; } diff --git a/Platform/user/configm/config-test/configtest.c b/Platform/user/configm/config-test/configtest.c index 341874f6e..095f1484a 100644 --- a/Platform/user/configm/config-test/configtest.c +++ b/Platform/user/configm/config-test/configtest.c @@ -22,29 +22,34 @@ int main(int argc, char **argv) config_id = strtol(argv[1], &stop, 16); printf("config id:0x%lx\n",config_id); - - f = fopen(argv[1],"r+"); - if(f == NULL) - { - printf("OPEN CONFIG test file FALID\n"); - return EXIT_FAILURE; - } - memset(config_linebuf, 0, sizeof(config_linebuf)); - while(fgets(config_linebuf, 512, f) != NULL) + //while(1) { - - printf("configure: %s\n", config_linebuf); - - code = web_config_exec_sync(CM_CONFIG_GET, config_id, - config_linebuf, strlen(config_linebuf) + 1, &output, &output_len); - - printf("call config type retturn:%s,result:%s\n", rpc_code_format(code), output); + f = fopen(argv[1],"r+"); + if(f == NULL) + { + printf("OPEN CONFIG test file FALID\n"); + return EXIT_FAILURE; + } memset(config_linebuf, 0, sizeof(config_linebuf)); + while(fgets(config_linebuf, 512, f) != NULL) + { + + printf("configure: %s\n", config_linebuf); + + code = web_config_exec_sync(CM_CONFIG_SET, config_id, + config_linebuf, strlen(config_linebuf) + 1, &output, &output_len); + + printf("call config type return:%s,result:%s\n", rpc_code_format(code), output); + + memset(config_linebuf, 0, sizeof(config_linebuf)); + memset(output, 0, output_len); + } + + fclose(f); + rpc_sleep(1000); } - - fclose(f); return EXIT_SUCCESS; } diff --git a/Platform/user/rpc/rpc_array.c b/Platform/user/rpc/rpc_array.c index 06e8a5e9c..e524bdf27 100755 --- a/Platform/user/rpc/rpc_array.c +++ b/Platform/user/rpc/rpc_array.c @@ -55,6 +55,49 @@ pointer rpc_array_index(rpc_array* array, int index) { return data; } +boolean rpc_array_del(rpc_array* array, pointer data) { + int i = 0; + int j = 0; + assert(array); + assert(data); + pthread_mutex_lock(&array->lock); + + for(i = 0; i < array->cur; i++) + if (array->data[i] == data) { + if(i < array->cur - 1){ + for(j = i; j < array->cur - 1; j++){ + array->data[j] = array->data[j + 1]; + } + array->cur--; + } + else{ + array->data[i] = NULL; + array->cur--; + } + break; + } + pthread_mutex_unlock(&array->lock); + return TRUE; +} + +boolean rpc_array_destroy(rpc_array* array) { + + if(!array){ + return FALSE; + } + + pthread_mutex_lock(&array->lock); + rpc_free(array->data); + array->cur = 0; + array->size = 0; + pthread_mutex_unlock(&array->lock); + + pthread_mutex_destroy(&array->lock); + + return TRUE; +} + + boolean rpc_array_add(rpc_array* array, pointer data) { pthread_mutex_lock(&array->lock); if (array->cur < array->size) { diff --git a/Platform/user/rpc/rpc_async_queue.c b/Platform/user/rpc/rpc_async_queue.c index c673e7cd9..e4d50efc0 100755 --- a/Platform/user/rpc/rpc_async_queue.c +++ b/Platform/user/rpc/rpc_async_queue.c @@ -18,6 +18,24 @@ struct _rpc_async_queue { //thread_wait }; +static pointer rpc_asyn_queue_pop_no_lock(rpc_async_queue *queue) { + rpc_queue_item *item = NULL; + item = queue->head; + + if (item != NULL) { + queue->head = item->next; + if (item->next == NULL) { + queue->tail = NULL; + } + } + if (item) { + rpc_queue_item_free(item); + return item->data; + } else { + return NULL; + } +} + rpc_async_queue* rpc_async_queue_new() { rpc_async_queue *queue = rpc_new(rpc_async_queue,1); pthread_mutex_init(&queue->mutex, NULL); @@ -28,12 +46,17 @@ rpc_async_queue* rpc_async_queue_new() { } void rpc_async_queue_free(rpc_async_queue *queue) { + if(!queue){ + return; + } pthread_mutex_lock(&queue->mutex); pointer data; - while ((data = rpc_async_queue_try_pop(queue)) != NULL) { + while ((data = rpc_asyn_queue_pop_no_lock(queue)) != NULL) { rpc_free(data); } pthread_mutex_unlock(&queue->mutex); + pthread_mutex_destroy(&queue->mutex); + pthread_cond_destroy(&queue->cond); rpc_free(queue); } diff --git a/Platform/user/rpc/rpc_client.c b/Platform/user/rpc/rpc_client.c index 285371524..3ccc45574 100755 --- a/Platform/user/rpc/rpc_client.c +++ b/Platform/user/rpc/rpc_client.c @@ -40,7 +40,7 @@ rpc_client* rpc_client_new() { client->host = NULL; client->port = 0; client->thread_count = CLIENT_THREAD_NUM; - client->threads = rpc_new(rpc_client_thread,CLIENT_THREAD_NUM); + client->threads = rpc_new0(rpc_client_thread,CLIENT_THREAD_NUM); client->last_thread = -1; client->init_count = 0; pthread_mutex_init(&client->mutex, NULL); @@ -48,17 +48,23 @@ rpc_client* rpc_client_new() { return client; } -void rpc_client_destroy(rpc_client *client ) { - +void rpc_client_destroy(rpc_client *client) { + int i = 0; + if(client == NULL){ return; } - rpc_free(client->threads); + for (i = 0; i < client->thread_count; ++i) { + rpc_client_thread_destroy(&client->threads[i]); + } + + rpc_free(client->threads); pthread_mutex_destroy(&client->mutex); pthread_cond_destroy(&client->cond); memset(client, 0, sizeof(rpc_client)); + rpc_free(client); return; } @@ -140,13 +146,19 @@ ret_code rpc_client_call(rpc_client* client, char* service_name, int n_fd = eventfd(0, 0); rpc_response rsp; long time_diff; + ret_code ret = RET_OK; assert(client!=NULL); rsp.data = (INT_TO_POINTER(n_fd)); time_diff = rpc_time_msec(); - rpc_client_call_async(client, service_name, method_name, input, input_len, + ret = rpc_client_call_async(client, service_name, method_name, input, input_len, cb_call_ex, &rsp); + if(ret != RET_OK){ + close(n_fd); + return ret; + } + //TODO //ev read timeout? fd_set r_set; @@ -202,12 +214,26 @@ ret_code rpc_client_call_async(rpc_client* client, char* service_name, rpc_callback callback, pointer data) { rpc_client_thread *th = client->threads; - rpc_request *req = rpc_request_new(); + rpc_request *req = NULL; rpc_conn *conn = NULL; + ret_code ret = RET_OK; int idx; + idx = (th->last_conn + 1) % (th->req_conn_count); + conn = (rpc_conn*) rpc_array_index(th->req_conns, idx); + /*if(!conn){ + rpc_req_conns_new(client, th); + conn = (rpc_conn*) rpc_array_index(th->req_conns, idx); + }*/ + + if(!conn) + { + return RET_OK; + } + + req = rpc_request_new(); if(req == NULL){ - return RET_ERR; + return RET_OK; } req->callback = callback; @@ -220,15 +246,17 @@ ret_code rpc_client_call_async(rpc_client* client, char* service_name, //push pool req->seq = rpc_sessionpool_insert(th->req_pool, req); - idx = (th->last_conn + 1) % (th->req_conn_count); - conn = (rpc_conn*) rpc_array_index(th->req_conns, idx); + th->last_conn = idx; - rpc_log_dbg("rpc_client_call_async:send_message\n"); //write data rpc_request_format(req, conn); - rpc_send_message(conn); - return RET_OK; + if(rpc_client_send(conn) == FALSE) + { + ret = RET_SENDERR; + } + + return ret; } diff --git a/Platform/user/rpc/rpc_conn.c b/Platform/user/rpc/rpc_conn.c index 03b116c0e..6797dfe6c 100755 --- a/Platform/user/rpc/rpc_conn.c +++ b/Platform/user/rpc/rpc_conn.c @@ -66,6 +66,57 @@ void rpc_conn_add_freelist(rpc_conn *c) { pthread_mutex_unlock(&conn_lock); } +static boolean rsp_read_from_conn(rpc_conn *c) { + if (c->rcurr != c->rbuf) { + if (c->rbytes != 0) + memcpy(c->rbuf, c->rcurr, c->rbytes); + c->rcurr = c->rbuf; + } + int num_allocs = 0; + for (;;) { + if (c->rbytes >= c->rsize) { + if (num_allocs == 4) { + return TRUE; + } + ++num_allocs; + char *new_rbuf = rpc_realloc(c->rbuf, c->rsize * 2); + if (!new_rbuf) { + break; + } + c->rcurr = c->rbuf = new_rbuf; + c->rsize *= 2; + } + int avail = c->rsize - c->rbytes; + int n = read(c->sfd, c->rbuf + c->rbytes, avail); + + rpc_log_dbg("cb_req_read ret %d, rbytes %d, avail %d\r\n",n,c->rbytes,avail); + + if (n == 0) { + rpc_log_error("connected closed!\n"); + //client_conn_close(c); + rpc_sleep(1000); + return FALSE; + } else if (n == -1) { + if (errno == EINTR || errno == EWOULDBLOCK) { + continue; + } + rpc_log_error("read error!"); + //client_conn_close(c); + rpc_sleep(1000); + break; + } else { + c->rbytes += n; + if (n == avail) { + continue; + } else { + break; + } + } + } + return TRUE; +} + + static boolean read_from_conn(rpc_conn *c) { if (c->rcurr != c->rbuf) { if (c->rbytes != 0) @@ -224,7 +275,7 @@ static void cb_req_read(struct ev_loop *l, struct ev_io *watcher, int revents) { static void cb_rsp_read(struct ev_loop *l, struct ev_io *watcher, int revents) { rpc_conn *c = watcher->data; //read response - if (read_from_conn(c)) { + if (rsp_read_from_conn(c)) { for (;;) { if (c->rbytes <= 0) { return; @@ -300,6 +351,27 @@ void rpc_conn_close(rpc_conn *c) { rpc_conn_add_freelist(c); } +/*void client_conn_close(rpc_conn *c) { + rpc_client_thread *th; + assert(c!=NULL); + + if(th = (rpc_client_thread *)c->thread){ + rpc_array_del(th->req_conns, c); + } + + ev_io_stop(c->loop, &c->watcher); + close(c->sfd); + + c->sfd = -1; + c->rcurr = 0; + c->iovused = 0; + c->thread = NULL; + c->loop = NULL; + + rpc_conn_add_freelist(c); +}*/ + + static rpc_conn *rpc_conn_new_inner(int fd, struct ev_loop* l, void(*cb)( struct ev_loop *l, struct ev_io *watcher, int revents)) { rpc_conn *conn; @@ -354,6 +426,47 @@ void rpc_conn_addiov(rpc_conn *c, pointer data, size_t data_len) { c->iovused++; } +boolean rpc_client_send(rpc_conn *conn) { + struct msghdr m; + boolean ret = TRUE; + memset(&m, 0, sizeof(m)); + m.msg_iov = conn->iov; + m.msg_iovlen = conn->iovused; + ssize_t res; + for (;;) { + res = sendmsg(conn->sfd, &m, 0); + if (res > 0) { + while (m.msg_iovlen > 0 && res >= m.msg_iov->iov_len) { + res -= m.msg_iov->iov_len; + m.msg_iovlen--; + m.msg_iov++; + } + + if (res > 0) { + m.msg_iov->iov_base = (caddr_t) m.msg_iov->iov_base + res; + m.msg_iov->iov_len -= res; + } else { + break; + } + } else if (res == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + continue; + } else { + perror("failed to write"); + ret = FALSE; + break; + } + } else { + ret = FALSE; + break; + } + } + //TODO free resource + conn->iovused = 0; + return ret; +} + + void rpc_send_message(rpc_conn *conn) { struct msghdr m; memset(&m, 0, sizeof(m)); diff --git a/Platform/user/rpc/rpc_sessionpool.c b/Platform/user/rpc/rpc_sessionpool.c index 85aa84a65..d7b1b462b 100755 --- a/Platform/user/rpc/rpc_sessionpool.c +++ b/Platform/user/rpc/rpc_sessionpool.c @@ -66,3 +66,16 @@ boolean rpc_sessionpool_remove(rpc_sessionpool *pool, int index) { pthread_mutex_unlock(&pool->mutex); return TRUE; } + +boolean rpc_sessionpool_free(rpc_sessionpool *pool) { + if(pool){ + pthread_mutex_lock(&pool->mutex); + rpc_free(pool->data); + pool->size = 0; + pthread_mutex_unlock(&pool->mutex); + pthread_mutex_destroy(&pool->mutex); + rpc_free(pool); + } + return TRUE; +} + diff --git a/Platform/user/rpc/rpc_thread.c b/Platform/user/rpc/rpc_thread.c index 8cf437f84..58ba5cca9 100755 --- a/Platform/user/rpc/rpc_thread.c +++ b/Platform/user/rpc/rpc_thread.c @@ -215,7 +215,8 @@ boolean rpc_unix_socketc_get(char *host, int port, int*pcfd) len = offsetof(struct sockaddr_un, sun_path) + strlen(cliun.sun_path); if (connect(sockfd, (struct sockaddr *)&cliun, len) < 0) { - rpc_log_error("connect error"); + rpc_log_error("connect error"); + close(sockfd); return FALSE; } @@ -325,29 +326,73 @@ boolean rpc_dispatch_start(rpc_dispatch_thread *th) { return TRUE; } +rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th) +{ + rpc_conn *c = NULL; + int cfd = -1; + if(rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE) + { + rpc_log_error("connect error"); + return NULL; + } + c = rpc_conn_client_new(cfd, th->loop); + c->thread = th; + rpc_array_add(th->req_conns, c); + + return c; +} + +boolean rpc_client_thread_destroy(rpc_client_thread *th) { + rpc_conn *c = NULL; + //void *nRes; + + if(th == NULL){ + return FALSE; + } + + //pthread_cancel(th->thread_receive_id); + //pthread_join(th->thread_receive_id, &nRes); + + c = (rpc_conn *)rpc_array_get(th->req_conns); + while(c){ + rpc_conn_close(c); + c = (rpc_conn *)rpc_array_get(th->req_conns); + } + + ev_loop_destroy(th->loop); + rpc_array_destroy(th->req_conns); + rpc_async_queue_free(th->req_pending); + rpc_sessionpool_free(th->req_pool); + rpc_sessionpool_free(th->req_timer); + + th->req_conn_count = 0; + th->req_conns = NULL; + th->req_pending = NULL; + th->req_pool = NULL; + th->req_timer = NULL; + th->client = NULL; + th->last_conn = -1; + + return TRUE; +} + + boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) { assert(th!=NULL); th->loop = ev_loop_new(0); th->req_conn_count = CLIENT_CONN_NUM; th->req_conns = rpc_array_new(); th->last_conn = -1; - int i, cfd; - rpc_conn *c = NULL; + int i; for (i = 0; i < th->req_conn_count; i++) { - if(rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE) - { - rpc_log_error("connect error"); - return FALSE; + if(rpc_req_conns_new(client, th) == NULL){ + return FALSE; } - c = rpc_conn_client_new(cfd, th->loop); - c->thread = th; - rpc_array_add(th->req_conns, c); } th->req_pending = rpc_async_queue_new(); th->req_pool = rpc_sessionpool_new(); th->req_timer = rpc_sessionpool_new(); th->client = client; - c->thread = th; return TRUE; } @@ -365,13 +410,20 @@ static void* thread_write_hander(void* data) { idx = (th->last_conn + 1) % (th->req_conn_count); conn = (rpc_conn*) rpc_array_index(th->req_conns, idx); - th->last_conn = idx; - rpc_log_dbg("thread_write_hander:send_message\n"); - - //write data - rpc_request_format(req, conn); - rpc_send_message(conn); + /*if(!conn){ + rpc_req_conns_new(th->client, th); + conn = (rpc_conn*) rpc_array_index(th->req_conns, idx); + }*/ + if(conn){ + th->last_conn = idx; + + rpc_log_dbg("thread_write_hander:send_message\n"); + + //write data + rpc_request_format(req, conn); + rpc_client_send(conn); + } } return NULL; } @@ -396,6 +448,6 @@ boolean rpc_client_thread_start(rpc_client_thread *th) { pthread_t pid; pthread_create(&pid, NULL, thread_client_handler, th); //write - pthread_create(&pid, NULL, thread_write_hander, th); + //pthread_create(&pid, NULL, thread_write_hander, th); return TRUE; }