/*
 * rpc_thread.c
 *
 *  Created on: 2011-3-21
 *      Author: yanghu
 *
 * Dispatcher, worker and client threads of the RPC library: socket setup,
 * hand-off of accepted connections to worker event loops, and the client
 * side sender/receiver threads.
 */

#include "rpc_thread.h"
#include "rpc_server.h"
#include "rpc_client.h"
#include "rpc_conn.h"
#include "rpc_request.h"
#include "rpc_util.h"

/*
 * NOTE(review): the header names of the system includes were missing from the
 * copy of this file that was reviewed. The list below covers every system
 * facility this file uses; confirm it against the original include list.
 */
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/eventfd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <ev.h>

//TODO
//get thread from free thread list

/*
 * libev callback of a worker loop: fired when the worker's notify_fd becomes
 * readable. Reads the eventfd counter and, for each counted notification,
 * pops one client fd from the worker queue and wraps it in an rpc_conn
 * attached to the worker loop.
 */
static void cb_notify_conn(struct ev_loop *l, struct ev_io *watcher, int revents) {
    rpc_worker_thread *worker = watcher->data;
    uint64_t n = 0;
    ssize_t got;

    (void) l;
    (void) revents;

    /* Without this check a failed read would leave n undefined and the
     * loop below would pop an arbitrary number of queue entries. */
    got = read(worker->notify_fd, &n, sizeof(n));
    if (got != (ssize_t) sizeof(n)) {
        rpc_log_error("read notify fd error\r\n");
        return;
    }

    /* notify_fd is a plain (non-semaphore) eventfd, so one read returns the
     * sum of all pending writes. A value above 1 is still reported, and the
     * loop below consumes one queue entry per notification. */
    if (n != 1) {
        rpc_log_error("read notify data error, n = %llu\r\n",
                (unsigned long long) n);
    }

    while (n > 0) {
        //new conn;
        int cfd = POINTER_TO_INT(rpc_queue_pop(worker->queue));
        rpc_log_dbg("socket %d read notify data cfd %d\r\n",
                worker->notify_fd, cfd);

        rpc_conn *c = rpc_conn_new(cfd, worker->loop);
        if (c != NULL) {
            c->thread = worker;
        } else {
            /* NOTE(review): assumes rpc_conn_new reports failure with NULL;
             * ownership of cfd in that case is not visible here. */
            rpc_log_error("create conn for cfd %d error\r\n", cfd);
        }
        n--;
    }
}

/*
 * libev callback of the dispatcher loop: fired when the listening socket is
 * readable. Accepts one connection, makes it non-blocking, picks the next
 * worker round-robin, queues the fd for it and wakes it through its eventfd.
 */
static void cb_dispatch_conn(struct ev_loop *l, struct ev_io *watcher, int revents) {
    rpc_dispatch_thread *th = watcher->data;
    rpc_worker_thread *worker;
    uint64_t n = 1;
    int cfd;
    int idx;

    (void) l;
    (void) revents;

    cfd = accept(th->sock_fd, NULL, NULL);
    if (cfd == -1) {
        //check errno
        rpc_log_error("accept conn error");
        return;
    }
    rpc_log_dbg("socket %d accept %d\r\n", th->sock_fd, cfd);
    rpc_set_non_block(cfd);

    /* Guard the modulo below: with no workers it would divide by zero. */
    if (th->server->worker_len <= 0) {
        rpc_log_error("no worker thread for accepted conn");
        close(cfd);
        return;
    }

    //rrd thread
    idx = (th->last_thread + 1) % (th->server->worker_len);
    th->last_thread = idx;
    worker = th->server->th_workers + idx;

    //notify
    rpc_queue_push(worker->queue, INT_TO_POINTER(cfd));
    if (write(worker->notify_fd, &n, sizeof(n)) != (ssize_t) sizeof(n)) {
        rpc_log_error("notify worker error");
    }
}

/*
 * Prepare a worker: create its event loop, its eventfd used for wake-ups,
 * its fd queue, and register cb_notify_conn on the eventfd.
 */
void rpc_worker_init(rpc_worker_thread *th) {
    th->loop = ev_loop_new(0);
    th->notify_fd = eventfd(0, 0);
    th->queue = rpc_queue_new();

    if (th->notify_fd < 0) {
        /* A negative fd must not be handed to libev. */
        rpc_log_error("create notify eventfd error");
        return;
    }

    th->watcher.data = th;
    ev_io_init(&th->watcher, cb_notify_conn, th->notify_fd, EV_READ);
    ev_io_start(th->loop, &th->watcher);
}

/*
 * Worker thread entry: bumps server->init_count under the server mutex and
 * signals the condition variable, records its thread id, then runs the
 * worker event loop.
 */
static void* thread_worker_handler(void* data) {
    rpc_worker_thread *th = data;

    pthread_mutex_lock(&(th->server->mutex));
    th->server->init_count++;
    pthread_cond_signal(&(th->server->cond));
    pthread_mutex_unlock(&(th->server->mutex));

    th->thread_id = pthread_self();
    ev_run(th->loop, 0);
    return NULL;
}

/*
 * Dispatcher thread entry: records its thread id and runs the dispatcher
 * event loop.
 */
static void* thread_dispatch_handler(void* data) {
    rpc_dispatch_thread *th = data;

    th->thread_id = pthread_self();
    ev_run(th->loop, 0);
    return NULL;
}

/*
 * Spawn the thread that runs the given worker. The function returns nothing,
 * so a failure to create the thread is only logged.
 */
void rpc_worker_start(rpc_worker_thread *th) {
    pthread_t pid;

    /* pthread_create returns 0 on success and an error number otherwise. */
    if (pthread_create(&pid, NULL, thread_worker_handler, th) != 0) {
        rpc_log_error("create worker thread error");
    }
}

//#define CONFIGM_UNIX_SOCKET "configm.socket"

/*
 * Create a listening AF_UNIX stream socket bound to "<port>.socket" in the
 * current directory (any existing file of that name is unlinked first).
 *
 * port: number used to build the socket file name.
 * psfd: receives the listening fd on success; untouched on failure.
 * Returns TRUE on success, FALSE on failure (the socket is closed again).
 */
boolean rpc_unix_sockets_get(int port, int *psfd) {
    struct sockaddr_un serun;
    char socket_path[32];
    //char *socket_path = CONFIGM_UNIX_SOCKET;
    socklen_t size;
    int listenfd;
    int flag = 1;

    memset(socket_path, 0, sizeof(socket_path));
    snprintf(socket_path, sizeof(socket_path), "%d.socket", port);

    if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
        rpc_log_error("socket error");
        return FALSE;
    }

    //keepalive
    setsockopt(listenfd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
    //reuse
    setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));

    memset(&serun, 0, sizeof(serun));
    serun.sun_family = AF_UNIX;
    /* Bounded copy: never writes past sun_path. */
    snprintf(serun.sun_path, sizeof(serun.sun_path), "%s", socket_path);
    size = (socklen_t) (offsetof(struct sockaddr_un, sun_path)
            + strlen(serun.sun_path));

    unlink(socket_path);
    if (bind(listenfd, (struct sockaddr *) &serun, size) < 0) {
        rpc_log_error("bind error");
        close(listenfd);
        return FALSE;
    }

    if (listen(listenfd, BACKLOG) < 0) {
        rpc_log_error("listen error");
        close(listenfd);
        return FALSE;
    }

    *psfd = listenfd;
    return TRUE;
}

/*
 * Connect an AF_UNIX stream socket to "<port>.socket" in the current
 * directory.
 *
 * host: not used by this transport.
 * port: number used to build the socket file name.
 * pcfd: receives the connected fd on success; untouched on failure.
 * Returns TRUE on success, FALSE on failure (the socket is closed again).
 */
boolean rpc_unix_socketc_get(char *host, int port, int*pcfd) {
    struct sockaddr_un cliun;
    char socket_path[32];
    //char *socket_path = CONFIGM_UNIX_SOCKET;
    socklen_t len;
    int sockfd;

    (void) host;

    memset(socket_path, 0, sizeof(socket_path));
    snprintf(socket_path, sizeof(socket_path), "%d.socket", port);

    if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0) {
        rpc_log_error("client socket error");
        return FALSE;
    }

    memset(&cliun, 0, sizeof(cliun));
    cliun.sun_family = AF_UNIX;
    snprintf(cliun.sun_path, sizeof(cliun.sun_path), "%s", socket_path);
    len = (socklen_t) (offsetof(struct sockaddr_un, sun_path)
            + strlen(cliun.sun_path));

    if (connect(sockfd, (struct sockaddr *) &cliun, len) < 0) {
        rpc_log_error("connect error");
        close(sockfd);
        return FALSE;
    }

    *pcfd = sockfd;
    return TRUE;
}

/*
 * Create a non-blocking TCP listening socket on INADDR_ANY:port.
 *
 * psfd: receives the listening fd on success; untouched on failure.
 * Returns TRUE on success, FALSE on failure (the socket is closed again).
 */
boolean rpc_tcp_sockets_get(int port, int *psfd) {
    struct sockaddr_in addr;
    int sfd;
    int flag = 1;

    memset(&addr, 0, sizeof(addr));
    addr.sin_addr.s_addr = INADDR_ANY;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);

    sfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sfd < 0) {
        rpc_log_error("socket error");
        return FALSE;
    }

    //keepalive
    setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
    //reuse
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));

    if (bind(sfd, (struct sockaddr*) &addr, sizeof(addr)) == -1) {
        rpc_log_error("bind error");
        close(sfd);
        return FALSE;
    }

    if (listen(sfd, BACKLOG) == -1) {
        rpc_log_error("listen error");
        close(sfd);
        return FALSE;
    }

    rpc_set_non_block(sfd);
    *psfd = sfd;
    return TRUE;
}

/*
 * Connect a TCP socket to host:port and switch it to non-blocking mode.
 *
 * host: dotted-quad IPv4 address (parsed with inet_aton).
 * pcfd: receives the connected fd on success; untouched on failure.
 * Returns TRUE on success, FALSE on failure (the socket is closed again).
 */
int rpc_tcp_socketc_get(char *host, int port, int*pcfd) {
    struct sockaddr_in addr;
    int cfd;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    /* inet_aton returns 0 when the string is not a valid address. */
    if (host == NULL || inet_aton(host, &addr.sin_addr) == 0) {
        rpc_log_error("invalid host address");
        return FALSE;
    }

    cfd = socket(AF_INET, SOCK_STREAM, 0);
    if (cfd < 0) {
        rpc_log_error("client socket error");
        return FALSE;
    }

    if (connect(cfd, (struct sockaddr*) &addr, sizeof(addr)) == -1) {
        rpc_log_error("connect error");
        close(cfd);
        return FALSE;
    }

    rpc_set_non_block(cfd);
    *pcfd = cfd;
    return TRUE;
}

/*
 * Prepare the dispatcher: open the unix listening socket for the server port,
 * create the dispatcher event loop and register cb_dispatch_conn on the
 * listening socket.
 *
 * Returns TRUE on success, FALSE if the listening socket cannot be created.
 */
boolean rpc_dispatch_init(rpc_dispatch_thread *th) {
    assert(th!=NULL);
    int sfd;

    if (rpc_unix_sockets_get(th->server->port, &sfd) != TRUE) {
        rpc_log_error("unix sockets get error");
        return FALSE;
    }

    th->sock_fd = sfd;
    th->loop = ev_loop_new(0);
    th->last_thread = -1;    /* first dispatch selects worker index 0 */
    th->watcher.data = th;
    ev_io_init(&th->watcher, cb_dispatch_conn, sfd, EV_READ);
    ev_io_start(th->loop, &th->watcher);
    return TRUE;
}

/*
 * Spawn the dispatcher thread.
 *
 * Returns TRUE when the thread was created, FALSE otherwise.
 */
boolean rpc_dispatch_start(rpc_dispatch_thread *th) {
    pthread_t pid;
    int ret = pthread_create(&pid, NULL, thread_dispatch_handler, th);

    /* pthread_create reports failure with a positive error number, never
     * with -1, so any non-zero result is a failure. */
    if (ret != 0) {
        rpc_log_error("create dispatch thread error");
        return FALSE;
    }
    return TRUE;
}

/*
 * Prepare a client thread: create its event loop, open CLIENT_CONN_NUM unix
 * socket connections to the server and wrap each in a client rpc_conn, then
 * create the pending-request queue and the two session pools.
 *
 * Returns TRUE on success, FALSE if any connection cannot be established.
 * NOTE(review): connections created before a failure are not released here.
 */
boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) {
    assert(th!=NULL);
    int i, cfd;
    rpc_conn *c = NULL;

    th->loop = ev_loop_new(0);
    th->req_conn_count = CLIENT_CONN_NUM;
    th->req_conns = rpc_array_new();
    th->last_conn = -1;    /* first send selects connection index 0 */

    for (i = 0; i < th->req_conn_count; i++) {
        if (rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE) {
            rpc_log_error("connect error");
            return FALSE;
        }
        c = rpc_conn_client_new(cfd, th->loop);
        if (c == NULL) {
            /* NOTE(review): assumes rpc_conn_client_new reports failure
             * with NULL; ownership of cfd in that case is not visible. */
            rpc_log_error("create client conn error");
            return FALSE;
        }
        c->thread = th;
        rpc_array_add(th->req_conns, c);
    }

    th->req_pending = rpc_async_queue_new();
    th->req_pool = rpc_sessionpool_new();
    th->req_timer = rpc_sessionpool_new();
    th->client = client;
    /* Every connection already got its thread pointer inside the loop; no
     * further access through c is made here because c is NULL when no
     * connection was created. */
    return TRUE;
}

/*
 * Client sender thread entry: forever pops a pending request, registers it in
 * the request pool to obtain its sequence number, picks the next connection
 * round-robin, formats the request onto it and sends it.
 */
static void* thread_write_hander(void* data) {
    rpc_client_thread *th = data;
    //send pending request
    rpc_request *req = NULL;
    rpc_conn *conn = NULL;
    int idx;

    for (;;) {
        req = (rpc_request*) rpc_async_queue_pop(th->req_pending);
        //push pool
        req->seq = rpc_sessionpool_insert(th->req_pool, req);

        idx = (th->last_conn + 1) % (th->req_conn_count);
        conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
        th->last_conn = idx;

        rpc_log_dbg("thread_write_hander:send_message\n");
        //write data
        rpc_request_format(req, conn);
        rpc_send_message(conn);
    }
    return NULL;
}

/*
 * Client receiver thread entry: bumps client->init_count under the client
 * mutex and signals the condition variable, records its thread id, then runs
 * the client event loop.
 */
static void* thread_client_handler(void* data) {
    rpc_client_thread *th = data;

    //pthread_mutex_t mutex = th->client->mutex;
    pthread_mutex_lock(&(th->client->mutex));
    th->client->init_count++;
    pthread_cond_signal(&(th->client->cond));
    pthread_mutex_unlock(&(th->client->mutex));

    th->thread_receive_id = pthread_self();
    ev_run(th->loop, 0);
    return NULL;
}

/*
 * Spawn the receiver (event loop) thread and the sender thread of a client.
 *
 * Returns TRUE when both threads were created, FALSE otherwise.
 */
boolean rpc_client_thread_start(rpc_client_thread *th) {
    assert(th!=NULL);
    pthread_t pid;

    //read
    if (pthread_create(&pid, NULL, thread_client_handler, th) != 0) {
        rpc_log_error("create client read thread error");
        return FALSE;
    }

    //write
    if (pthread_create(&pid, NULL, thread_write_hander, th) != 0) {
        rpc_log_error("create client write thread error");
        return FALSE;
    }
    return TRUE;
}