secgateway/Platform/user/redismq/redis_subscriber.c

255 lines
6.3 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <json-c/json.h>
#include "redisMq.h"
#include "log.h"
/* Global state */
/* Subscriber state; this file uses its eventBase, eventSem, eventThread and redisContext members. */
struct RedisSubscriber gRedisSubscriber;
/* Channel name -> handler address (stored as an int64); created lazily by redisRegisterChannelFunc(). */
struct json_object* gJsonObj = NULL;
/* Thread pool that runs the channel handlers; created in redisSubInit(). */
threadpool gThpool;
/* Forward declarations */
void redisSubConnectCallback(redisAsyncContext * redis_context, int status);
void redisSubDisconnectCallback(redisAsyncContext * redis_context, int status);
/* NOTE(review): no function named redisSubCmdCallback is defined in the visible part of
   this file; the command callback defined below is redisSubCommandCallback. Confirm
   whether this prototype is stale. */
void redisSubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata);
/*
 * Register the message handler for a channel.
 *
 * channel: channel name, used as the key in the global registry gJsonObj
 * func:    handler to associate with that channel
 *
 * The registry is created on first use. The handler address is stored as a
 * 64-bit integer and converted back to a pointer by get_channel_func().
 * Nothing is registered when channel is NULL or the registry cannot be
 * allocated.
 */
void redisRegisterChannelFunc(char *channel, pChanneCallBack func)
{
    if (channel == NULL)
    {
        SYSLOG_ERR("%s,%d,register func failed: channel is NULL\n", __FUNCTION__, __LINE__);
        return;
    }

    if (gJsonObj == NULL)
    {
        gJsonObj = json_object_new_object();
        if (gJsonObj == NULL)
        {
            SYSLOG_ERR("%s,%d,register func failed: create registry failed\n", __FUNCTION__, __LINE__);
            return;
        }
    }

    /* intptr_t is the integer type intended for pointer <-> integer round-trips. */
    json_object_object_add(gJsonObj, channel, json_object_new_int64((int64_t)(intptr_t)func));
    SYSLOG_INFO("%s,%d,%s register func %p\n",__FUNCTION__,__LINE__,channel,(void *)(intptr_t)func);
}
/*
 * Look up the handler registered for a channel.
 *
 * Reads the 64-bit integer stored under `channel` in gJsonObj and returns it
 * converted to a pointer.
 * NOTE(review): presumably yields NULL for an unregistered channel; that
 * relies on json-c returning 0 for a missing entry -- confirm.
 */
void *get_channel_func(char *channel)
{
    struct json_object *entry = json_object_object_get(gJsonObj, (const char*)channel);
    int64_t handler_addr = json_object_get_int64(entry);

    return (void*) handler_addr;
}
/*
 * Initialise the subscriber: logging, the libevent base, the start-up
 * semaphore and the handler thread pool.
 *
 * threadNum: number of worker threads requested; values above 32 are capped.
 *
 * Returns true on success. On failure returns false and releases whatever
 * this call had already created, so a failed init leaves nothing behind.
 */
bool redisSubInit(unsigned int threadNum)
{
    enum { REDIS_SUB_MAX_THREADS = 32 };

    SYSLOG_INIT("redis_mq_subscriber");

    gRedisSubscriber.eventBase = event_base_new();
    if (gRedisSubscriber.eventBase == NULL)
    {
        SYSLOG_ERR("%s,%d,Create redis event failed.\n", __FUNCTION__, __LINE__);
        return false;
    }

    memset(&gRedisSubscriber.eventSem, 0, sizeof(gRedisSubscriber.eventSem));
    if (sem_init(&gRedisSubscriber.eventSem, 0, 0) != 0)
    {
        SYSLOG_ERR("%s,%d,Init sem failed.\n", __FUNCTION__, __LINE__);
        /* Do not leak the event base created above. */
        event_base_free(gRedisSubscriber.eventBase);
        gRedisSubscriber.eventBase = NULL;
        return false;
    }

    /* Create the thread pool. */
    if (threadNum > REDIS_SUB_MAX_THREADS)
    {
        threadNum = REDIS_SUB_MAX_THREADS;
    }
    gThpool = thpool_init(threadNum);
    if (gThpool == NULL)
    {
        /* Without a pool recvChanMsg() could not dispatch any message. */
        SYSLOG_ERR("%s,%d,Init thread pool failed.\n", __FUNCTION__, __LINE__);
        sem_destroy(&gRedisSubscriber.eventSem);
        event_base_free(gRedisSubscriber.eventBase);
        gRedisSubscriber.eventBase = NULL;
        return false;
    }

    SYSLOG_INFO("%s,%d,redisSubInit OK\n",__FUNCTION__,__LINE__);
    return true;
}
/*
 * Tear down what redisSubInit() created in gRedisSubscriber: free the event
 * base (if one exists) and destroy the start-up semaphore.
 * NOTE(review): gThpool and gJsonObj are not released here.
 */
void redisSubUninit()
{
    if (gRedisSubscriber.eventBase != NULL)
    {
        event_base_free(gRedisSubscriber.eventBase);
        gRedisSubscriber.eventBase = NULL;
    }

    sem_destroy(&gRedisSubscriber.eventSem);
}
/*
 * Disconnect the async redis context, if there is one, and clear the global
 * pointer so it is not used again.
 */
void redisSubDisconnect()
{
    if (!gRedisSubscriber.redisContext)
    {
        return;
    }

    redisAsyncDisconnect(gRedisSubscriber.redisContext);
    gRedisSubscriber.redisContext = NULL;
}
/*
 * Connect callback registered by redisSubConnect().
 *
 * redis_context: the async context the connection attempt belongs to
 * status:        REDIS_OK when the connection was established
 *
 * A failed connection is logged instead of being silently ignored; a
 * successful one needs no action here.
 */
void redisSubConnectCallback(redisAsyncContext *redis_context,int status)
{
    if (status != REDIS_OK)
    {
        SYSLOG_ERR("%s,%d,Connect redis failed: %s.\n", __FUNCTION__, __LINE__,
                   redis_context ? redis_context->errstr : "unknown");
    }
}
/*
 * Disconnect callback registered by redisSubConnect().
 * Logs the context's error string when the disconnect status is not REDIS_OK;
 * a clean disconnect is not logged.
 */
void redisSubDisconnectCallback(redisAsyncContext *redis_context, int status)
{
    if (status == REDIS_OK)
    {
        return;
    }

    SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
}
/*
 * Hand a received channel message to the handler registered for the channel.
 *
 * channel_name: channel the message arrived on
 * message:      payload bytes
 * len:          number of payload bytes
 *
 * The payload is copied into a heap-allocated RecvMsg_t and queued on gThpool
 * together with the handler. The copy holds len bytes followed by a NUL
 * terminator (not counted in len), so handlers may treat it as a C string.
 * Messages with no registered handler, a NULL channel/payload or a negative
 * length are dropped.
 * NOTE(review): the handler presumably owns the RecvMsg_t and releases it
 * with freeMsg() -- confirm against the handlers.
 */
void recvChanMsg(const char *channel_name, char *message, int len)
{
    pChanneCallBack pFunc = NULL;
    struct RecvMsg_t *pRecvMsg = NULL;

    if (channel_name == NULL || message == NULL || len < 0)
    {
        return;
    }

    pFunc = (pChanneCallBack)get_channel_func((char*)channel_name);
    if (pFunc == NULL)
    {
        return;
    }

    pRecvMsg = malloc(sizeof(*pRecvMsg));
    if (pRecvMsg == NULL)
    {
        SYSLOG_ERR("%s,%d,alloc msg failed.\n", __FUNCTION__, __LINE__);
        return;
    }

    pRecvMsg->msg = malloc((size_t)len + 1);
    if (pRecvMsg->msg == NULL)
    {
        SYSLOG_ERR("%s,%d,alloc msg buffer failed,len:%d\n", __FUNCTION__, __LINE__, len);
        free(pRecvMsg);
        return;
    }

    memcpy(pRecvMsg->msg, message, (size_t)len);
    ((char *)pRecvMsg->msg)[len] = '\0';
    pRecvMsg->len = len;

    /* Log before queueing: once queued, the handler may free the message at any time. */
    SYSLOG_DEBUG("%s,%d,len:%d,msg:%s\n", __FUNCTION__, __LINE__, len, (char *)pRecvMsg->msg);

    /* NOTE(review): assumes thpool_add_work() returns 0 on success -- confirm. */
    if (thpool_add_work(gThpool,(void*)pFunc,(void*)pRecvMsg) != 0)
    {
        SYSLOG_ERR("%s,%d,add work to thread pool failed.\n", __FUNCTION__, __LINE__);
        free(pRecvMsg->msg);
        free(pRecvMsg);
    }
}
/*
 * Release a message allocated by recvChanMsg(): the payload buffer and the
 * RecvMsg_t itself. A NULL pMsg is ignored. The struct is freed even when its
 * payload pointer is NULL (free(NULL) is a no-op).
 */
void freeMsg(struct RecvMsg_t *pMsg)
{
    if (pMsg == NULL)
    {
        return;
    }

    free(pMsg->msg);
    free(pMsg);
}
/*
 * Reply callback for the SUBSCRIBE command issued by redisSubscriber().
 *
 * redis_context: async context the reply arrived on (unused)
 * reply:         redisReply for this event, may be NULL
 * privdata:      user data given to redisAsyncCommand (unused)
 *
 * Replies on a subscribed connection are three-element arrays. Only those
 * whose first element is the string "message" carry a published payload:
 * element 1 is the channel and element 2 the payload. Subscribe/unsubscribe
 * acknowledgements have the same shape but no string payload, so they are
 * skipped rather than forwarded to the channel handler.
 */
void redisSubCommandCallback(redisAsyncContext *redis_context,
                             void *reply, void *privdata)
{
    redisReply *redis_reply = (redisReply *)reply;
    redisReply *kind = NULL;
    redisReply *channel = NULL;
    redisReply *payload = NULL;

    (void)redis_context;
    (void)privdata;

    if (redis_reply == NULL)
    {
        return;
    }
    if (redis_reply->type != REDIS_REPLY_ARRAY ||
        redis_reply->elements != 3 ||
        redis_reply->element == NULL)
    {
        return;
    }

    kind = redis_reply->element[0];
    channel = redis_reply->element[1];
    payload = redis_reply->element[2];
    if (kind == NULL || channel == NULL || payload == NULL)
    {
        return;
    }

    /* Forward published messages only, not subscription acknowledgements. */
    if (kind->str == NULL || strcmp(kind->str, "message") != 0)
    {
        return;
    }
    if (channel->str == NULL || payload->str == NULL)
    {
        return;
    }

    recvChanMsg(channel->str, payload->str, (int)payload->len);
}
/*
 * Body of the event thread created by redisSubConnect().
 * Waits on the start-up semaphore, then runs the libevent dispatch loop on
 * the subscriber's event base.
 */
void redisSubEventThreadFunc()
{
    /* Hold off until redisSubConnect() posts the semaphore. */
    (void)sem_wait(&gRedisSubscriber.eventSem);

    /* Start event dispatching; event_base_dispatch blocks. */
    (void)event_base_dispatch(gRedisSubscriber.eventBase);
}
/*
 * Open the asynchronous connection to the local redis server and start the
 * event thread that services it.
 *
 * Steps: connect to 127.0.0.1 on the default port, attach the context to the
 * libevent base, create the event thread (which waits on eventSem), register
 * the connect/disconnect callbacks, then post eventSem to start the thread.
 *
 * Returns true when the connection was initiated and the thread started;
 * false otherwise. On failure gRedisSubscriber.redisContext is left NULL.
 */
bool redisSubConnect()
{
    int ret = 0;

    /* Connect asynchronously to the redis server on the default port. */
    gRedisSubscriber.redisContext = redisAsyncConnect("127.0.0.1", REDIS_DEFAULT_PORT);
    if (NULL == gRedisSubscriber.redisContext)
    {
        /* No context was returned, so there is no errstr to read here. */
        SYSLOG_ERR("%s,%d,Connect redis failed.\n", __FUNCTION__, __LINE__);
        return false;
    }
    if (gRedisSubscriber.redisContext->err)
    {
        SYSLOG_ERR("%s,%d,Connect redis error: %d, %s.\n", __FUNCTION__, __LINE__,gRedisSubscriber.redisContext->err, gRedisSubscriber.redisContext->errstr);
        /* Release the failed context so the global does not keep pointing at it. */
        redisSubDisconnect();
        return false;
    }

    /* Attach the redis context to the event base so its callbacks are driven by events. */
    redisLibeventAttach(gRedisSubscriber.redisContext, gRedisSubscriber.eventBase);

    /* Create the event thread; it stays blocked on eventSem until the post below. */
    ret = pthread_create(&gRedisSubscriber.eventThread, 0, (void*)redisSubEventThreadFunc, NULL);
    if (ret != 0)
    {
        SYSLOG_ERR("%s,%d,create event thread failed.\n", __FUNCTION__, __LINE__);
        redisSubDisconnect();
        return false;
    }

    /* Connect callback: invoked once the server has finished handling the
       connection request, reporting the connection status. */
    redisAsyncSetConnectCallback(gRedisSubscriber.redisContext,
                                 (void*)&redisSubConnectCallback);
    /* Disconnect callback: invoked when the connection is closed; a caller
       could use it to implement reconnection. */
    redisAsyncSetDisconnectCallback(gRedisSubscriber.redisContext,
                                    (void*)&redisSubDisconnectCallback);

    /* Let the event thread start dispatching. */
    sem_post(&gRedisSubscriber.eventSem);
    SYSLOG_INFO("%s,%d,redisSubConnect OK!\n",__FUNCTION__,__LINE__);
    return true;
}
/*
 * Subscribe to a redis channel on the subscriber's async connection.
 *
 * channel_name: channel to subscribe to
 *
 * Replies for the subscription are delivered to redisSubCommandCallback().
 * Returns true when the SUBSCRIBE command was queued; false when there is no
 * connection, channel_name is NULL, or hiredis rejected the command.
 */
bool redisSubscriber(char *channel_name)
{
    int ret = 0;

    if (channel_name == NULL)
    {
        SYSLOG_ERR("%s,%d,redisSubscriber failed: channel is NULL\n", __FUNCTION__, __LINE__);
        return false;
    }
    if (gRedisSubscriber.redisContext == NULL)
    {
        /* The context is NULL before redisSubConnect() and after redisSubDisconnect(). */
        SYSLOG_ERR("%s,%d,redisSubscriber %s failed: not connected\n", __FUNCTION__, __LINE__, channel_name);
        return false;
    }

    ret = redisAsyncCommand(gRedisSubscriber.redisContext,
                            &redisSubCommandCallback, NULL, "SUBSCRIBE %s",
                            channel_name);
    if (REDIS_ERR == ret)
    {
        SYSLOG_ERR("%s,%d,redisSubscriber %s failed,ret=%d\n", __FUNCTION__, __LINE__, channel_name, ret);
        return false;
    }
    return true;
}