secgateway/Platform/user/redismq/redis_subscriber.c

255 lines
6.3 KiB
C
Raw Normal View History

2019-06-19 09:10:58 +00:00
#include <stddef.h>
#include <assert.h>
#include <json-c/json.h>
#include "redisMq.h"
#include "log.h"
/*全局变量*/
struct RedisSubscriber gRedisSubscriber;
struct json_object* gJsonObj = NULL;
threadpool gThpool;
/*函数声明*/
void redisSubConnectCallback(redisAsyncContext * redis_context, int status);
void redisSubDisconnectCallback(redisAsyncContext * redis_context, int status);
void redisSubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata);
/*
channel:
func
*/
void redisRegisterChannelFunc(char *channel, pChanneCallBack func)
{
if (gJsonObj == NULL)
{
gJsonObj = json_object_new_object();
}
json_object_object_add(gJsonObj, channel, json_object_new_int64((uint64_t)func));
SYSLOG_INFO("%s,%d,%s register func %p\n",__FUNCTION__,__LINE__,channel,func);
}
/*
*/
void *get_channel_func(char *channel)
{
struct json_object *result_object = NULL;
int64_t funcptr = 0;
result_object = json_object_object_get(gJsonObj, (const char*)channel);
funcptr = json_object_get_int64(result_object);
return (void*) funcptr;
}
bool redisSubInit(unsigned int threadNum)
{
SYSLOG_INIT("redis_mq_subscriber");
gRedisSubscriber.eventBase = event_base_new();
if(gRedisSubscriber.eventBase == NULL)
{
SYSLOG_ERR("%s,%d,Create redis event failed.\n", __FUNCTION__, __LINE__);
return false;
}
memset(&gRedisSubscriber.eventSem, 0, sizeof(gRedisSubscriber.eventSem));
int ret = sem_init(&gRedisSubscriber.eventSem, 0, 0);
if (ret != 0)
{
SYSLOG_ERR("%s,%d,Init sem failed.\n", __FUNCTION__, __LINE__);
return false;
}
/* 初始化线程池*/
if(threadNum > 32) threadNum = 32;
gThpool = thpool_init(threadNum);
SYSLOG_INFO("%s,%d,redisSubInit OK\n",__FUNCTION__,__LINE__);
return true;
}
void redisSubUninit()
{
if(gRedisSubscriber.eventBase)
{
event_base_free(gRedisSubscriber.eventBase);
gRedisSubscriber.eventBase = NULL;
}
sem_destroy(&gRedisSubscriber.eventSem);
return ;
}
void redisSubDisconnect()
{
if (gRedisSubscriber.redisContext)
{
redisAsyncDisconnect(gRedisSubscriber.redisContext);
gRedisSubscriber.redisContext = NULL;
}
}
void redisSubConnectCallback(redisAsyncContext *redis_context,int status)
{
if (status != REDIS_OK)
{
}
else
{
}
}
void redisSubDisconnectCallback(redisAsyncContext *redis_context, int status)
{
if (status != REDIS_OK)
{
SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
}
}
void recvChanMsg(const char *channel_name, char *message, int len)
{
pChanneCallBack pFunc = NULL;
struct RecvMsg_t *pRecvMsg;
char *pMsg = NULL;
if (NULL != channel_name)
{
pFunc = (pChanneCallBack)get_channel_func((char*)channel_name);
}
if (pFunc)
{
pRecvMsg = malloc(sizeof(struct RecvMsg_t));
if (pRecvMsg)
{
pRecvMsg->msg = malloc(len);
if(pRecvMsg->msg)
{
memcpy(pRecvMsg->msg, message, len);
pRecvMsg->len = len;
SYSLOG_DEBUG("%s,%d,len:%d,msg:%s\n", __FUNCTION__, __LINE__, len, message);
thpool_add_work(gThpool,(void*)pFunc,(void*)pRecvMsg);
}
}
}
}
void freeMsg(struct RecvMsg_t *pMsg)
{
if(pMsg)
{
if (pMsg->msg)
{
free(pMsg->msg);
free(pMsg);
}
}
}
void redisSubCommandCallback(redisAsyncContext *redis_context,
void *reply, void *privdata)
{
if (NULL == reply)
{
return ;
}
redisReply *redis_reply = (redisReply *)(reply);
// 订阅接收到的消息是一个带三元素的数组
if (redis_reply->type == REDIS_REPLY_ARRAY &&
redis_reply->elements == 3)
{
#if 0
printf(": Recieve message:%s:%d:%s:%d:%s:%d\n",
redis_reply->element[0]->str, redis_reply->element[0]->len,
redis_reply->element[1]->str, redis_reply->element[1]->len,
redis_reply->element[2]->str, redis_reply->element[2]->len);
#endif
recvChanMsg(redis_reply->element[1]->str,redis_reply->element[2]->str, redis_reply->element[2]->len);
}
}
void redisSubEventThreadFunc()
{
sem_wait(&gRedisSubscriber.eventSem);
// 开启事件分发event_base_dispatch会阻塞
event_base_dispatch(gRedisSubscriber.eventBase);
return;
}
bool redisSubConnect()
{
/* 异步连接到redis服务器上使用默认端口 */
gRedisSubscriber.redisContext = redisAsyncConnect("127.0.0.1", REDIS_DEFAULT_PORT);
if (NULL == gRedisSubscriber.redisContext)
{
SYSLOG_ERR("%s,%d,Connect redis failed.%s\n", __FUNCTION__, __LINE__,gRedisSubscriber.redisContext->errstr);
return false;
}
if (gRedisSubscriber.redisContext->err)
{
SYSLOG_ERR("%s,%d,Connect redis error: %d, %s.\n", __FUNCTION__, __LINE__,gRedisSubscriber.redisContext->err, gRedisSubscriber.redisContext->errstr);
return false;
}
/* 将事件绑定到redis context上使设置给redis的回调跟事件关联 */
redisLibeventAttach(gRedisSubscriber.redisContext, gRedisSubscriber.eventBase);
/* 创建事件处理线程 */
int ret = pthread_create(&gRedisSubscriber.eventThread, 0, (void*)redisSubEventThreadFunc, NULL);
if (ret != 0)
{
SYSLOG_ERR("%s,%d,create event thread failed.\n", __FUNCTION__, __LINE__);
redisSubDisconnect();
return false;
}
// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态
redisAsyncSetConnectCallback(gRedisSubscriber.redisContext,
(void*)&redisSubConnectCallback);
// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连
redisAsyncSetDisconnectCallback(gRedisSubscriber.redisContext,
(void*)&redisSubDisconnectCallback);
// 启动事件线程
sem_post(&gRedisSubscriber.eventSem);
SYSLOG_INFO("%s,%d,redisSubConnect OK!\n",__FUNCTION__,__LINE__);
return true;
}
bool redisSubscriber(char *channel_name)
{
int ret = redisAsyncCommand(gRedisSubscriber.redisContext,
&redisSubCommandCallback, NULL, "SUBSCRIBE %s",
channel_name);
if (REDIS_ERR == ret)
{
SYSLOG_ERR("%s,%d,redisSubscriber %s failed,ret=%d\n", __FUNCTION__, __LINE__, channel_name, ret);
return false;
}
return true;
}