secgateway/Platform/user/rpc/rpc_client.c

263 lines
6.6 KiB
C
Executable File

/*
* rpc_client.c
*
* Created on: 2011-3-21
* Author: yanghu
*/
#include "rpc_client.h"
#include "rpc_request.h"
#include "rpc_response.h"
#include "rpc_thread.h"
#include "rpc_module.h"
#include "rpc_conn.h"
#include "rpc_util.h"
#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <memory.h>
#include <unistd.h>
#include <time.h>
#include <sys/eventfd.h>
#include <sys/time.h>
// server: one thread, multiple connections; a connection would block a thread
// client: one thread, one connection
// three threads, three connections, send queue / pending queue
// open question: what if the connection is disconnected?
/**
 * Allocate a client together with its array of worker-thread slots.
 *
 * The returned client has no host/port yet, CLIENT_THREAD_NUM thread slots,
 * last_thread set to -1 and init_count set to 0. Its mutex and condition
 * variable are initialised with default attributes.
 *
 * @return the new client, or NULL when either allocation fails. Release it
 *         with rpc_client_destroy().
 */
rpc_client* rpc_client_new() {
    rpc_client *client = rpc_new0(rpc_client,1);
    if (client == NULL) {
        return NULL;
    }

    /* Allocate the thread slots first so a failure can be unwound before
     * any pthread object has been initialised. */
    client->threads = rpc_new0(rpc_client_thread,CLIENT_THREAD_NUM);
    if (client->threads == NULL) {
        rpc_free(client);
        return NULL;
    }

    client->host = NULL;
    client->port = 0;
    client->thread_count = CLIENT_THREAD_NUM;
    client->last_thread = -1;
    client->init_count = 0;
    pthread_mutex_init(&client->mutex, NULL);
    pthread_cond_init(&client->cond, NULL);
    return client;
}
/**
 * Tear down a client: destroy every thread slot, free the slot array,
 * destroy the mutex and condition variable, wipe the structure and free it.
 *
 * @param client client to release; NULL is accepted and ignored.
 */
void rpc_client_destroy(rpc_client *client) {
    int slot;

    if (!client) {
        return;
    }

    slot = 0;
    while (slot < client->thread_count) {
        rpc_client_thread_destroy(client->threads + slot);
        ++slot;
    }
    rpc_free(client->threads);

    pthread_mutex_destroy(&client->mutex);
    pthread_cond_destroy(&client->cond);

    /* Scrub the structure before handing the memory back. */
    memset(client, 0, sizeof(*client));
    rpc_free(client);
}
/**
 * Record the target address, initialise and start every worker thread,
 * then block until init_count has reached thread_count.
 *
 * @param client client created by rpc_client_new(); must not be NULL
 *               (checked with assert).
 * @param host   target host; the pointer is stored as-is, not copied, so it
 *               must stay valid for as long as the client uses it.
 * @param port   target port.
 * @return TRUE once all threads have reported in; FALSE as soon as any
 *         thread fails to initialise or start (nothing is rolled back here).
 */
boolean rpc_client_connect(rpc_client* client, char* host, int port) {
    int idx;

    assert(client!=NULL);
    client->host = host;
    client->port = port;

    /* Phase 1: initialise every slot before any thread is started. */
    idx = 0;
    while (idx < client->thread_count) {
        if (!rpc_client_thread_init(client, client->threads + idx)) {
            return FALSE;
        }
        ++idx;
    }

    /* Phase 2: start the threads. */
    idx = 0;
    while (idx < client->thread_count) {
        if (!rpc_client_thread_start(client->threads + idx)) {
            return FALSE;
        }
        ++idx;
    }

    /* Phase 3: wait on the condition variable until init_count catches up.
     * init_count is presumably bumped by the worker threads -- the code that
     * does so is not in this function. */
    pthread_mutex_lock(&client->mutex);
    for (;;) {
        if (client->init_count >= client->thread_count) {
            break;
        }
        pthread_cond_wait(&client->cond, &client->mutex);
    }
    pthread_mutex_unlock(&client->mutex);

    return TRUE;
}
/**
 * Create a client and connect it to the module registered under dst_name.
 *
 * The module table comes from MODULE_REG_ARRAY; the first entry whose
 * module_name equals dst_name supplies the host and port.
 *
 * @param dst_name name of the destination module; NULL yields NULL.
 * @return a connected client, or NULL when dst_name is NULL, the module is
 *         not registered, allocation fails, or the connection fails.
 *
 * NOTE(review): rpc_module_array is a local copy and rpc_client_connect()
 * stores the host pointer without copying. If rpc_module.host is an embedded
 * char array rather than a pointer to static storage, client->host dangles
 * once this function returns -- confirm against the rpc_module definition.
 */
rpc_client* rpc_client_connect_ex(char *dst_name){
    rpc_module rpc_module_array[] = MODULE_REG_ARRAY;
    int module_max_num = sizeof(rpc_module_array) / sizeof(rpc_module_array[0]);
    rpc_module *dst_module = NULL;
    rpc_client *client = NULL;
    int module_index;

    /* strcmp() on a NULL pointer is undefined behaviour. */
    if (dst_name == NULL) {
        return NULL;
    }

    for (module_index = 0; module_index < module_max_num; module_index++) {
        if (strcmp(rpc_module_array[module_index].module_name, dst_name) == 0) {
            dst_module = &rpc_module_array[module_index];
            break;
        }
    }
    if (dst_module == NULL) {
        return NULL;
    }

    /* Only allocate once we know there is something to connect to. */
    client = rpc_client_new();
    if (client == NULL) {
        return NULL;
    }

    if (rpc_client_connect(client, dst_module->host, dst_module->module_port)) {
        return client;
    }

    rpc_client_destroy(client);
    return NULL;
}
/**
 * Completion callback variant that also receives the connection.
 *
 * Copies the result into the rpc_response passed as `data`, then writes the
 * value 1 to the descriptor stored in that response's `data` field.
 *
 * @param conn       connection the reply arrived on (not used here).
 * @param code       result code of the call.
 * @param output     reply payload.
 * @param output_len length of the reply payload.
 * @param data       the rpc_response to fill in.
 */
static void cb_call(rpc_conn *conn, ret_code code, pointer output,
        int output_len, void* data) {
    rpc_response *response = data;
    uint64_t wake_value = 1;

    response->output_len = output_len;
    response->output = output;
    response->code = code;

    /* Publish the result first, then signal the descriptor. */
    write(POINTER_TO_INT(response->data), &wake_value, sizeof(wake_value));
}
/**
 * Completion callback used by rpc_client_call().
 *
 * Copies the result into the rpc_response passed as `data`, then writes the
 * value 1 to the descriptor stored in that response's `data` field, which is
 * the descriptor rpc_client_call() waits on.
 *
 * @param code       result code of the call.
 * @param output     reply payload.
 * @param output_len length of the reply payload.
 * @param data       the rpc_response to fill in.
 */
static void cb_call_ex(ret_code code, pointer output,
        int output_len, void* data) {
    rpc_response *response = data;
    uint64_t wake_value = 1;

    response->output_len = output_len;
    response->output = output;
    response->code = code;

    /* Publish the result first, then signal the descriptor. */
    write(POINTER_TO_INT(response->data), &wake_value, sizeof(wake_value));
}
/**
 * Synchronous call: issue the request through rpc_client_call_async() and
 * block until cb_call_ex signals completion or TIMEOUT seconds elapse.
 *
 * An eventfd is used as the wake-up channel: its descriptor is stashed in
 * rsp.data, cb_call_ex writes to it, and select() waits for it to become
 * readable.
 *
 * @param client       connected client; must not be NULL (assert).
 * @param service_name service to call.
 * @param method_name  method to call.
 * @param input        request payload.
 * @param input_len    length of the request payload.
 * @param output       if non-NULL, receives the reply payload pointer.
 * @param output_len   if non-NULL, receives the reply payload length.
 * @return the code delivered by the callback on success; the error from
 *         rpc_client_call_async() if the request could not be issued;
 *         RET_TIMEOUT when no reply arrived in time; RET_UNKNOWN when the
 *         eventfd could not be created or used, or select() failed.
 *
 * NOTE(review): on RET_TIMEOUT / select() failure this function returns
 * while the request may still be pending. A late cb_call_ex would then write
 * through the stale &rsp (stack memory) and to a descriptor that is already
 * closed and possibly reused. Confirm whether timed-out requests are
 * cancelled elsewhere; nothing in this function does it.
 */
ret_code rpc_client_call(rpc_client* client, char* service_name,
        char* method_name, pointer input, int input_len, pointer* output,
        int* output_len) {
    rpc_response rsp;
    struct timeval tm;
    fd_set r_set;
    long time_diff;
    ret_code ret = RET_OK;
    int n_fd;
    int rt;

    assert(client!=NULL);

    n_fd = eventfd(0, 0);
    if (n_fd < 0) {
        /* Passing -1 to FD_SET()/select() below would be undefined. */
        return RET_UNKNOWN;
    }
    if (n_fd >= FD_SETSIZE) {
        /* fd_set cannot represent this descriptor; FD_SET would overflow. */
        close(n_fd);
        return RET_UNKNOWN;
    }

    memset(&rsp, 0, sizeof(rsp));
    rsp.data = (INT_TO_POINTER(n_fd));

    time_diff = rpc_time_msec();
    ret = rpc_client_call_async(client, service_name, method_name, input, input_len,
            cb_call_ex, &rsp);
    if (ret != RET_OK) {
        close(n_fd);
        return ret;
    }

    //TODO
    //ev read timeout?
    FD_ZERO(&r_set);
    FD_SET(n_fd,&r_set);
    tm.tv_sec = TIMEOUT;
    tm.tv_usec = 0;
    rt = select(n_fd + 1, &r_set, NULL, NULL, &tm);
    close(n_fd);

    time_diff = rpc_time_msec()-time_diff;
    if (time_diff > TIMEWARN) {
        rpc_log_warn("call %s@%s cost %ld\n",service_name, method_name, time_diff);
    }

    if (rt == -1) {
        return RET_UNKNOWN;
    }
    if (rt == 0) {
        return RET_TIMEOUT;
    }

    /* The descriptor became readable: the callback has filled in rsp. */
    if (output != NULL) {
        *output = rsp.output;
    }
    if (output_len != NULL) {
        *output_len = rsp.output_len;
    }
    return rsp.code;
}
/**
 * Queue a request on one of the client's worker threads, picking the slot
 * after last_thread (wrapping at thread_count) and pushing the request onto
 * that thread's req_pending queue.
 *
 * The string and payload pointers are stored in the request as-is, not
 * copied.
 *
 * @param client       client owning the worker threads.
 * @param service_name service to call.
 * @param method_name  method to call.
 * @param input        request payload.
 * @param input_len    length of the request payload.
 * @param callback     completion callback stored in the request.
 * @param data         opaque pointer stored in the request for the callback.
 *
 * The function returns void, so a request that cannot be queued (NULL
 * client, no thread slots, or request allocation failure) is dropped
 * silently and the callback is never invoked.
 *
 * NOTE(review): last_thread is read and written here without taking
 * client->mutex; confirm that callers serialise access.
 */
void rpc_client_call_async_thread(rpc_client* client, char* service_name,
        char* method_name, pointer input, int input_len,
        rpc_callback callback, pointer data) {
    rpc_client_thread *th;
    rpc_request *req;
    int idx;

    /* Guard the modulo and the array access below. */
    if (client == NULL || client->threads == NULL || client->thread_count <= 0) {
        return;
    }

    req = rpc_request_new();
    if (req == NULL) {
        /* Same allocation check rpc_client_call_async() performs. */
        return;
    }
    req->callback = callback;
    req->method_name = method_name;
    req->service_name = service_name;
    req->input = input;
    req->input_len = input_len;
    req->data = data;

    idx = (client->last_thread + 1) % (client->thread_count);
    client->last_thread = idx;
    th = client->threads + idx;
    rpc_async_queue_push(th->req_pending, req);
}
/**
 * Issue a request on the first worker thread's next connection and return
 * without waiting for the reply.
 *
 * The connection after last_conn (wrapping at req_conn_count) is selected,
 * the request is registered in the thread's req_pool (which yields its
 * sequence number), formatted onto the connection and sent.
 *
 * The string and payload pointers are stored in the request as-is, not
 * copied.
 *
 * @param client       client to send through.
 * @param service_name service to call.
 * @param method_name  method to call.
 * @param input        request payload.
 * @param input_len    length of the request payload.
 * @param callback     completion callback stored in the request.
 * @param data         opaque pointer stored in the request for the callback.
 * @return RET_OK when the request was handed to rpc_client_send()
 *         successfully; RET_SENDERR when it could not be sent, including
 *         when no connection is available or the request cannot be
 *         allocated. Those two cases used to report RET_OK even though the
 *         callback could never fire, which left rpc_client_call() waiting
 *         for its full timeout.
 *
 * NOTE(review): when rpc_client_send() fails the request stays registered
 * in req_pool; confirm whether it is reclaimed elsewhere.
 */
ret_code rpc_client_call_async(rpc_client* client, char* service_name,
        char* method_name, pointer input, int input_len,
        rpc_callback callback, pointer data) {
    rpc_client_thread *th;
    rpc_request *req;
    rpc_conn *conn;
    int idx;

    if (client == NULL || client->threads == NULL) {
        return RET_SENDERR;
    }
    th = client->threads;

    /* Avoid a modulo by zero when no request connection exists. */
    if (th->req_conn_count <= 0) {
        return RET_SENDERR;
    }

    idx = (th->last_conn + 1) % (th->req_conn_count);
    conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
    if (conn == NULL) {
        /* No reconnect is attempted here; report the failure to the caller. */
        return RET_SENDERR;
    }

    req = rpc_request_new();
    if (req == NULL) {
        return RET_SENDERR;
    }
    req->callback = callback;
    req->method_name = method_name;
    req->service_name = service_name;
    req->input = input;
    req->input_len = input_len;
    req->data = data;

    //push pool
    req->seq = rpc_sessionpool_insert(th->req_pool, req);
    th->last_conn = idx;
    rpc_log_dbg("rpc_client_call_async:send_message\n");

    //write data
    rpc_request_format(req, conn);
    if (rpc_client_send(conn) == FALSE) {
        return RET_SENDERR;
    }
    return RET_OK;
}