456 lines
11 KiB
C
Executable File
456 lines
11 KiB
C
Executable File
/*
|
|
* rpc_thread.c
|
|
*
|
|
* Created on: 2011-3-21
|
|
* Author: yanghu
|
|
*/
|
|
#include "rpc_thread.h"
|
|
#include "rpc_server.h"
|
|
#include "rpc_client.h"
|
|
#include "rpc_conn.h"
|
|
#include "rpc_request.h"
|
|
#include "rpc_util.h"
|
|
|
|
#include <assert.h>
|
|
#include <memory.h>
|
|
#include <error.h>
|
|
#include <errno.h>
|
|
#include <unistd.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/types.h>
|
|
#include <sys/uio.h>
|
|
#include <arpa/inet.h>
|
|
#include <netinet/in.h>
|
|
#include <sys/eventfd.h>
|
|
#include <sys/un.h>
|
|
#include <stddef.h>
|
|
|
|
//TODO
|
|
//get thread from free thread list
|
|
|
|
static void cb_notify_conn(struct ev_loop *l, struct ev_io *watcher,
|
|
int revents) {
|
|
rpc_worker_thread *worker = watcher->data;
|
|
uint64_t n;
|
|
read(worker->notify_fd, &n, sizeof(n));
|
|
if (n != 1) {
|
|
rpc_log_error("read notify data error, n = %ld\r\n", n);
|
|
}
|
|
|
|
while(n > 0)
|
|
{
|
|
//new conn;
|
|
int cfd = POINTER_TO_INT(rpc_queue_pop(worker->queue));
|
|
|
|
rpc_log_dbg("socket %d read notify data cfd %d\r\n", worker->notify_fd, cfd);
|
|
|
|
rpc_conn *c = rpc_conn_new(cfd, worker->loop);
|
|
c->thread = worker;
|
|
n--;
|
|
}
|
|
|
|
}
|
|
|
|
static void cb_dispatch_conn(struct ev_loop *l, struct ev_io *watcher,
|
|
int revents) {
|
|
rpc_dispatch_thread *th = watcher->data;
|
|
int cfd;
|
|
if ((cfd = accept(th->sock_fd, NULL, NULL)) == -1) {
|
|
//check errno
|
|
rpc_log_error("accept conn error\n");
|
|
return;
|
|
}
|
|
rpc_log_dbg("socket %d accept %d\r\n",th->sock_fd, cfd);
|
|
rpc_set_non_block(cfd);
|
|
//rrd thread
|
|
int idx;
|
|
idx = (th->last_thread + 1) % (th->server->worker_len);
|
|
th->last_thread = idx;
|
|
rpc_worker_thread *worker = th->server->th_workers + idx;
|
|
//notify
|
|
rpc_queue_push(worker->queue, INT_TO_POINTER(cfd));
|
|
uint64_t n = 1;
|
|
write(worker->notify_fd, &n, sizeof(n));
|
|
}
|
|
|
|
void rpc_worker_init(rpc_worker_thread *th) {
|
|
th->loop = ev_loop_new(0);
|
|
th->notify_fd = eventfd(0, 0);
|
|
th->queue = rpc_queue_new();
|
|
th->watcher.data = th;
|
|
ev_io_init(&th->watcher,cb_notify_conn,th->notify_fd,EV_READ);
|
|
ev_io_start(th->loop, &th->watcher);
|
|
}
|
|
|
|
static void* thread_worker_handler(void* data) {
|
|
rpc_worker_thread *th = data;
|
|
pthread_mutex_lock(&(th->server->mutex));
|
|
th->server->init_count++;
|
|
pthread_cond_signal(&(th->server->cond));
|
|
pthread_mutex_unlock(&(th->server->mutex));
|
|
|
|
th->thread_id = pthread_self();
|
|
ev_run(th->loop, 0);
|
|
return NULL;
|
|
}
|
|
|
|
static void* thread_dispatch_handler(void* data) {
|
|
rpc_dispatch_thread *th = data;
|
|
th->thread_id = pthread_self();
|
|
ev_run(th->loop, 0);
|
|
return NULL;
|
|
}
|
|
|
|
void rpc_worker_start(rpc_worker_thread *th) {
|
|
pthread_t pid;
|
|
pthread_create(&pid, NULL, thread_worker_handler, th);
|
|
}
|
|
|
|
void rpc_get_unix_socket_dir(char* socket_dir)
|
|
{
|
|
static char *dir_path = "/tmp/configm";
|
|
|
|
if(access(dir_path, R_OK) == 0)
|
|
{
|
|
sprintf(socket_dir, "%s/", dir_path);
|
|
return;
|
|
}
|
|
|
|
if(mkdir(dir_path, 0777) != 0)
|
|
{
|
|
rpc_log_error("create recovery file error\n");
|
|
strcpy(socket_dir, "");
|
|
}
|
|
else
|
|
{
|
|
sprintf(socket_dir, "%s/", dir_path);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
|
|
boolean rpc_unix_sockets_get(int port, int *psfd)
|
|
{
|
|
struct sockaddr_un serun;
|
|
int listenfd, size;
|
|
char socket_path[32];
|
|
char socket_dir[32];
|
|
int flag = 1;
|
|
int buff_len = BUFFER_SIZE * 2;
|
|
|
|
memset(socket_dir, 0, 32);
|
|
memset(socket_path, 0, 32);
|
|
|
|
rpc_get_unix_socket_dir(socket_dir);
|
|
snprintf(socket_path, 32, "%s%d.socket", socket_dir, port);
|
|
|
|
if ((listenfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
|
|
{
|
|
rpc_log_error("socket error\n");
|
|
return FALSE;
|
|
}
|
|
|
|
//keepalive
|
|
setsockopt(listenfd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
|
|
//reuse
|
|
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
|
|
setsockopt(listenfd, SOL_SOCKET, SO_RCVBUF, &buff_len, sizeof(buff_len));
|
|
setsockopt(listenfd, SOL_SOCKET, SO_SNDBUF, &buff_len, sizeof(buff_len));
|
|
|
|
|
|
memset(&serun, 0, sizeof(serun));
|
|
serun.sun_family = AF_UNIX;
|
|
strcpy(serun.sun_path, socket_path);
|
|
|
|
size = offsetof(struct sockaddr_un, sun_path) + strlen(serun.sun_path);
|
|
unlink(socket_path);
|
|
|
|
if (bind(listenfd, (struct sockaddr *)&serun, size) < 0)
|
|
{
|
|
rpc_log_error("bind error\n");
|
|
close(listenfd);
|
|
return FALSE;
|
|
}
|
|
|
|
if (listen(listenfd, BACKLOG) < 0)
|
|
{
|
|
rpc_log_error("listen error\n");
|
|
close(listenfd);
|
|
return FALSE;
|
|
}
|
|
|
|
*psfd = listenfd;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
boolean rpc_unix_socketc_get(char *host, int port, int*pcfd)
|
|
{
|
|
struct sockaddr_un cliun;
|
|
socklen_t cliun_len;
|
|
char socket_path[32];
|
|
char socket_dir[32];
|
|
int buff_len = BUFFER_SIZE * 2;
|
|
int sockfd;
|
|
int len;
|
|
|
|
memset(socket_dir, 0, 32);
|
|
memset(socket_path, 0, 32);
|
|
|
|
rpc_get_unix_socket_dir(socket_dir);
|
|
snprintf(socket_path, 32, "%s%d.socket", socket_dir, port);
|
|
|
|
if ((sockfd = socket(AF_UNIX, SOCK_STREAM, 0)) < 0)
|
|
{
|
|
rpc_log_error("client socket error\n");
|
|
return FALSE;
|
|
}
|
|
|
|
setsockopt(sockfd, SOL_SOCKET, SO_RCVBUF, &buff_len, sizeof(buff_len));
|
|
setsockopt(sockfd, SOL_SOCKET, SO_SNDBUF, &buff_len, sizeof(buff_len));
|
|
|
|
memset(&cliun, 0, sizeof(cliun));
|
|
cliun.sun_family = AF_UNIX;
|
|
strcpy(cliun.sun_path, socket_path);
|
|
|
|
len = offsetof(struct sockaddr_un, sun_path) + strlen(cliun.sun_path);
|
|
if (connect(sockfd, (struct sockaddr *)&cliun, len) < 0)
|
|
{
|
|
rpc_log_error("connect error\n");
|
|
close(sockfd);
|
|
return FALSE;
|
|
}
|
|
|
|
*pcfd = sockfd;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
boolean rpc_tcp_sockets_get(int port, int *psfd)
|
|
{
|
|
struct sockaddr_in addr;
|
|
int sfd;
|
|
int flag = 1;
|
|
int buff_len = BUFFER_SIZE * 2;
|
|
|
|
memset(&addr, 0, sizeof(addr));
|
|
addr.sin_addr.s_addr = INADDR_ANY;
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(port);
|
|
|
|
sfd = socket(AF_INET, SOCK_STREAM, 0);
|
|
|
|
//keepalive
|
|
setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag));
|
|
//reuse
|
|
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));
|
|
|
|
setsockopt(sfd, SOL_SOCKET, SO_RCVBUF, &buff_len, sizeof(buff_len));
|
|
setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, &buff_len, sizeof(buff_len));
|
|
|
|
if (bind(sfd, (struct sockaddr*) &addr, sizeof(addr)) == -1)
|
|
{
|
|
rpc_log_error("bind error\n");
|
|
return FALSE;
|
|
}
|
|
|
|
if (listen(sfd, BACKLOG) == -1)
|
|
{
|
|
rpc_log_error("accept error\n");
|
|
return FALSE;
|
|
}
|
|
|
|
rpc_set_non_block(sfd);
|
|
*psfd = sfd;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
int rpc_tcp_socketc_get(char *host, int port, int*pcfd)
|
|
{
|
|
int cfd;
|
|
struct sockaddr_in addr;
|
|
int buff_len = BUFFER_SIZE * 2;
|
|
|
|
memset(&addr, 0, sizeof(addr));
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(port);
|
|
inet_aton(host, &addr.sin_addr);
|
|
|
|
cfd = socket(AF_INET, SOCK_STREAM, 0);
|
|
|
|
setsockopt(cfd, SOL_SOCKET, SO_RCVBUF, &buff_len, sizeof(buff_len));
|
|
setsockopt(cfd, SOL_SOCKET, SO_SNDBUF, &buff_len, sizeof(buff_len));
|
|
|
|
if (connect(cfd, (struct sockaddr*) &addr, sizeof(addr)) == -1)
|
|
{
|
|
rpc_log_error("connect error\n");
|
|
return FALSE;
|
|
}
|
|
rpc_set_non_block(cfd);
|
|
|
|
*pcfd = cfd;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
boolean rpc_dispatch_init(rpc_dispatch_thread *th) {
|
|
assert(th!=NULL);
|
|
int sfd;
|
|
|
|
if(rpc_unix_sockets_get(th->server->port, &sfd) != TRUE)
|
|
{
|
|
rpc_log_error("unix sockets get error\n");
|
|
return FALSE;
|
|
}
|
|
|
|
th->sock_fd = sfd;
|
|
|
|
th->loop = ev_loop_new(0);
|
|
th->last_thread = -1;
|
|
th->watcher.data = th;
|
|
ev_io_init(&th->watcher,cb_dispatch_conn,sfd,EV_READ);
|
|
ev_io_start(th->loop, &th->watcher);
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
boolean rpc_dispatch_start(rpc_dispatch_thread *th) {
|
|
pthread_t pid;
|
|
int ret = pthread_create(&pid, NULL, thread_dispatch_handler, th);
|
|
if (ret == -1) {
|
|
rpc_log_error("create dispatch thread error\n");
|
|
return FALSE;
|
|
}
|
|
return TRUE;
|
|
}
|
|
|
|
rpc_conn *rpc_req_conns_new(rpc_client *client, rpc_client_thread *th)
|
|
{
|
|
rpc_conn *c = NULL;
|
|
int cfd = -1;
|
|
if(rpc_unix_socketc_get(client->host, client->port, &cfd) != TRUE)
|
|
{
|
|
rpc_log_error("connect error\n");
|
|
return NULL;
|
|
}
|
|
c = rpc_conn_client_new(cfd, th->loop);
|
|
c->thread = th;
|
|
rpc_array_add(th->req_conns, c);
|
|
|
|
return c;
|
|
}
|
|
|
|
boolean rpc_client_thread_destroy(rpc_client_thread *th) {
|
|
rpc_conn *c = NULL;
|
|
|
|
if(th == NULL){
|
|
return FALSE;
|
|
}
|
|
|
|
pthread_mutex_lock(&th->mutex);
|
|
c = (rpc_conn *)rpc_array_get(th->req_conns);
|
|
while(c){
|
|
rpc_conn_close(c);
|
|
c = (rpc_conn *)rpc_array_get(th->req_conns);
|
|
}
|
|
rpc_array_destroy(th->req_conns);
|
|
pthread_mutex_unlock(&th->mutex);
|
|
pthread_mutex_destroy(&th->mutex);
|
|
|
|
ev_loop_destroy(th->loop);
|
|
rpc_async_queue_free(th->req_pending);
|
|
rpc_sessionpool_free(th->req_pool);
|
|
rpc_sessionpool_free(th->req_timer);
|
|
|
|
th->req_conn_count = 0;
|
|
th->req_conns = NULL;
|
|
th->req_pending = NULL;
|
|
th->req_pool = NULL;
|
|
th->req_timer = NULL;
|
|
th->client = NULL;
|
|
th->last_conn = -1;
|
|
|
|
return TRUE;
|
|
}
|
|
|
|
|
|
boolean rpc_client_thread_init(rpc_client *client, rpc_client_thread *th) {
|
|
assert(th!=NULL);
|
|
th->loop = ev_loop_new(0);
|
|
th->req_conn_count = CLIENT_CONN_NUM;
|
|
th->req_conns = rpc_array_new();
|
|
th->last_conn = -1;
|
|
pthread_mutex_init(&th->mutex, NULL);
|
|
int i;
|
|
for (i = 0; i < th->req_conn_count; i++) {
|
|
if(rpc_req_conns_new(client, th) == NULL){
|
|
return FALSE;
|
|
}
|
|
}
|
|
th->req_pending = rpc_async_queue_new();
|
|
th->req_pool = rpc_sessionpool_new();
|
|
th->req_timer = rpc_sessionpool_new();
|
|
th->client = client;
|
|
return TRUE;
|
|
}
|
|
|
|
static void* thread_write_hander(void* data) {
|
|
rpc_client_thread *th = data;
|
|
//send pending request
|
|
rpc_request *req = NULL;
|
|
rpc_conn *conn = NULL;
|
|
|
|
int idx;
|
|
for (;;) {
|
|
req = (rpc_request*) rpc_async_queue_pop(th->req_pending);
|
|
//push pool
|
|
req->seq = rpc_sessionpool_insert(th->req_pool, req);
|
|
|
|
idx = (th->last_conn + 1) % (th->req_conn_count);
|
|
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
|
|
|
|
/*if(!conn){
|
|
rpc_req_conns_new(th->client, th);
|
|
conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
|
|
}*/
|
|
if(conn){
|
|
th->last_conn = idx;
|
|
|
|
rpc_log_dbg("thread_write_hander:send_message\n");
|
|
|
|
//write data
|
|
rpc_request_format(req, conn);
|
|
//rpc_client_send(conn);
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
static void* thread_client_handler(void* data) {
|
|
rpc_client_thread *th = data;
|
|
|
|
//pthread_mutex_t mutex = th->client->mutex;
|
|
|
|
pthread_mutex_lock(&(th->client->mutex));
|
|
th->client->init_count++;
|
|
pthread_cond_signal(&(th->client->cond));
|
|
pthread_mutex_unlock(&(th->client->mutex));
|
|
th->thread_receive_id = pthread_self();
|
|
ev_run(th->loop, 0);
|
|
return NULL;
|
|
}
|
|
|
|
boolean rpc_client_thread_start(rpc_client_thread *th) {
|
|
assert(th!=NULL);
|
|
//read
|
|
pthread_t pid;
|
|
pthread_create(&pid, NULL, thread_client_handler, th);
|
|
//write
|
|
//pthread_create(&pid, NULL, thread_write_hander, th);
|
|
return TRUE;
|
|
}
|