#include <stddef.h>
#include <assert.h>
#include "redisMq.h"
#include "log.h"


#define  REDIS_DEFAULT_PORT (6379)


struct RedisPublisher  gRedisPublisher;

void redisPubConnectCallback(const redisAsyncContext *redis_context,int status);
void redisPubDisconnectCallback(redisAsyncContext *redis_context, int status);
void redisPubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata);
void *redisEventThreadFunc();


bool redisPubInit()
{
	 SYSLOG_INIT("redis_mq_publish");
	 gRedisPublisher.eventBase = event_base_new();   
    if (NULL == gRedisPublisher.eventBase)
    {
	    SYSLOG_ERR("%s,%d,Create redis event failed.\n",__FUNCTION__,__LINE__);
        return false;
    }
 
    memset(&gRedisPublisher.eventSem, 0, sizeof(gRedisPublisher.eventSem));
    int ret = sem_init(&gRedisPublisher.eventSem, 0, 0);
    if (ret != 0)
    {
       SYSLOG_ERR("%s,%d,Init sem failed.\n", __FUNCTION__, __LINE__);
        return false;
    }
    SYSLOG_INFO("%s,%d,redisPubInit OK\n",__FUNCTION__,__LINE__);
    return true;
}

void redisPubUninit()
{
	if(gRedisPublisher.eventBase)
	{
		event_base_free(gRedisPublisher.eventBase);
	    gRedisPublisher.eventBase = NULL;
	}
    
    sem_destroy(&gRedisPublisher.eventSem);   
}
void redisPubConnectCallback(const redisAsyncContext *redis_context,
    int status)
{
    if (status != REDIS_OK)
    {
	    SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
    }
    else
    {
	    SYSLOG_INFO("%s,%d,Redis connected!.\n", __FUNCTION__, __LINE__);
    }
}
 
void redisPubDisconnectCallback(redisAsyncContext *redis_context, int status)
{
    if (status != REDIS_OK)
    {
		// 这里异常退出,可以尝试重连
	    SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
    }
}
 
// 消息接收回调函数
void redisPubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata)
{
   // printf("command callback.\n");
	// 这里不执行任何操作
}
 

void *redisEventThreadFunc()
{
    sem_wait(&gRedisPublisher.eventSem);
	event_base_dispatch(gRedisPublisher.eventBase);
}


bool redisPubConnect()
{
   
    gRedisPublisher.redisContext = redisAsyncConnect("127.0.0.1", REDIS_DEFAULT_PORT);  
    if (NULL == gRedisPublisher.redisContext)
    {
	    SYSLOG_ERR("%s,%d,Connect redis failed,%s.\n", __FUNCTION__, __LINE__,gRedisPublisher.redisContext->errstr);
        return false;
    }
 
    if (gRedisPublisher.redisContext->err)
    {
    
	    SYSLOG_ERR("%s,%d,Connect redis error: %d, %s.\n", __FUNCTION__, __LINE__,gRedisPublisher.redisContext->err, gRedisPublisher.redisContext->errstr);
        return false;
    }
 
  
    redisLibeventAttach(gRedisPublisher.redisContext, gRedisPublisher.eventBase);    
    int ret = pthread_create(&gRedisPublisher.eventThread, 0, &redisEventThreadFunc, NULL);
    if (ret != 0)
    {
	    SYSLOG_ERR("%s,%d,create event thread failed.\n", __FUNCTION__, __LINE__);
        redisPubDisconnect();
        return false;
    }
 
	// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态
    redisAsyncSetConnectCallback(gRedisPublisher.redisContext, 
        (void*)&redisPubDisconnectCallback);
 
	// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连
    redisAsyncSetDisconnectCallback(gRedisPublisher.redisContext,
        (void*)&redisPubDisconnectCallback);
 
	// 启动事件线程
    sem_post(&gRedisPublisher.eventSem);
    SYSLOG_INFO("%s,%d,redisPubConnect OK!\n",__FUNCTION__,__LINE__);
    return true;
}
 
void redisPubDisconnect()
{
    if (gRedisPublisher.redisContext)
    {
        redisAsyncDisconnect(gRedisPublisher.redisContext);
        //redisAsyncFree(gRedisPublisher.redisContext);
        gRedisPublisher.redisContext = NULL;
    }

}
 
bool redisPublish(char *channel_name,
    char *message)
{
    int ret = redisAsyncCommand(gRedisPublisher.redisContext, &redisPubCmdCallback, NULL, "PUBLISH %s %s", channel_name, message);
	if (REDIS_ERR == ret)
	{
		printf("Publish command failed: %d\n", ret);
		SYSLOG_ERR("%s,%d,Publish command failed:%d.\n", __FUNCTION__, __LINE__, ret);

		        return false ;
	}
	else
	{
		SYSLOG_DEBUG("%s,%d,redisPublish:%s:%s.\n", __FUNCTION__, __LINE__, channel_name, message);
	}
 
    return true;
}