/*
 * rpc_client.c
 *
 *  Created on: 2011-3-21
 *      Author: yanghu
 */

#include "rpc_client.h"
#include "rpc_request.h"
#include "rpc_response.h"
#include "rpc_thread.h"
#include "rpc_module.h"
#include "rpc_conn.h"
#include "rpc_util.h"

#include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <stdio.h>
#include <memory.h>
#include <unistd.h>
#include <time.h>
#include <sys/eventfd.h>
#include <sys/time.h>

//server: one thread multiply connection. a connection would block a thread
//client: one thread one connection

//three thread,three conn,send queue/pending queue
//if connection disconnected?

rpc_client* rpc_client_new() {
	rpc_client *client = rpc_new0(rpc_client,1);
    if(client == NULL)
    {
        return NULL;
    }
	client->host = NULL;
	client->port = 0;
	client->thread_count = CLIENT_THREAD_NUM;
	client->threads = rpc_new(rpc_client_thread,CLIENT_THREAD_NUM);
	client->last_thread = -1;
	client->init_count = 0;
	pthread_mutex_init(&client->mutex, NULL);
	pthread_cond_init(&client->cond, NULL);
	return client;
}

void rpc_client_destroy(rpc_client *client ) {

    if(client == NULL){
        return;
    }
    
    rpc_free(client->threads);    
	pthread_mutex_destroy(&client->mutex);
	pthread_cond_destroy(&client->cond);

    memset(client, 0, sizeof(rpc_client));
	return;
}


boolean rpc_client_connect(rpc_client* client, char* host, int port) {
	assert(client!=NULL);
	client->host = host;
	client->port = port;
	int i;
	for (i = 0; i < client->thread_count; ++i) {
		if (!rpc_client_thread_init(client, &client->threads[i]))
			return FALSE;
	}
	for (i = 0; i < client->thread_count; ++i) {
		if (!rpc_client_thread_start(&client->threads[i])) {
			return FALSE;
		}
	}
	pthread_mutex_lock(&client->mutex);
	while (client->init_count < client->thread_count) {
		pthread_cond_wait(&client->cond, &client->mutex);
	}
	pthread_mutex_unlock(&client->mutex);
	return TRUE;
}

rpc_client* rpc_client_connect_ex(char *dst_name){
    rpc_client* client = rpc_client_new();
    rpc_module rpc_module_array[] = MODULE_REG_ARRAY;
    rpc_module *dst_module = NULL;
    int module_max_num = sizeof(rpc_module_array) / sizeof(rpc_module);
    int module_index = 0;

    for(module_index = 0; module_index < module_max_num; module_index++){
        if(strcmp(rpc_module_array[module_index].module_name, dst_name) == 0){
            dst_module = &rpc_module_array[module_index];
            if( client && rpc_client_connect(client, dst_module->host, 
                                      dst_module->module_port) ){
                return client;
            }

            break;
        }
    }

    if(client){
        rpc_client_destroy(client);
    }

    return NULL;
}

static void cb_call(rpc_conn *conn, ret_code code, pointer output,
		int output_len, void* data) {
	rpc_response *rsp = data;
	rsp->code = code;
	rsp->output = output;
	rsp->output_len = output_len;

	uint64_t n = 1;
	write(POINTER_TO_INT(rsp->data), &n, sizeof(n));
}

static void cb_call_ex(ret_code code, pointer output,
                       int output_len, void* data) {
    rpc_response *rsp = data;
    rsp->code = code;
    rsp->output = output;
    rsp->output_len = output_len;

    uint64_t n = 1;
    write(POINTER_TO_INT(rsp->data), &n, sizeof(n));
}


ret_code rpc_client_call(rpc_client* client, char* service_name,
		char* method_name, pointer input, int input_len, pointer* output,
		int* output_len) {
	int n_fd = eventfd(0, 0);
	rpc_response rsp;
    long time_diff;
    
    assert(client!=NULL);
	rsp.data = (INT_TO_POINTER(n_fd));
    time_diff = rpc_time_msec();
	rpc_client_call_async(client, service_name, method_name, input, input_len,
			              cb_call_ex, &rsp);

	//TODO
	//ev read timeout?
	fd_set r_set;
	FD_ZERO(&r_set);
	FD_SET(n_fd,&r_set);
	struct timeval tm;
	tm.tv_sec = TIMEOUT;
	tm.tv_usec = 0;
	int rt = select(n_fd + 1, &r_set, NULL, NULL, &tm);

	close(n_fd);
    
    time_diff = rpc_time_msec()-time_diff;
    if(time_diff > TIMEWARN)
    {
        rpc_log_warn("call %s@%s cost %ld\n",service_name, method_name, time_diff);
    }
    
	if (rt == -1) {
		return RET_UNKNOWN;
	} else if (rt == 0) {
		return RET_TIMEOUT;
	} else {
		if (output != NULL)
			*output = rsp.output;
		if (output_len != NULL)
			*output_len = rsp.output_len;
		return rsp.code;
	}
}

void rpc_client_call_async_thread(rpc_client* client, char* service_name,
        char* method_name, pointer input, int input_len,
        rpc_callback callback, pointer data) {
    rpc_request *req = rpc_request_new();
    req->callback = callback;
    req->method_name = method_name;
    req->service_name = service_name;
    req->input = input;
    req->input_len = input_len;
    req->data = data;

    int idx;
    idx = (client->last_thread + 1) % (client->thread_count);
    client->last_thread = idx;
    rpc_client_thread *th = client->threads + idx;
    rpc_async_queue_push(th->req_pending, req);
}


ret_code rpc_client_call_async(rpc_client* client, char* service_name,
                                  char* method_name, pointer input, int input_len,
		                          rpc_callback callback, pointer data) {

    rpc_client_thread *th = client->threads;		                      
	rpc_request *req = rpc_request_new();    
	rpc_conn *conn = NULL;
    int idx;

    if(req == NULL){
        return RET_ERR;
    }
        
	req->callback = callback;
	req->method_name = method_name;
	req->service_name = service_name;
	req->input = input;
	req->input_len = input_len;
	req->data = data;

    //push pool
	req->seq = rpc_sessionpool_insert(th->req_pool, req);

	idx = (th->last_conn + 1) % (th->req_conn_count);
	conn = (rpc_conn*) rpc_array_index(th->req_conns, idx);
	th->last_conn = idx;

    rpc_log_dbg("rpc_client_call_async:send_message\n");

	//write data
	rpc_request_format(req, conn);
	rpc_send_message(conn);
    
    return RET_OK;
}