#include #include #include #include "redisMq.h" #include "log.h" /*全局变量*/ struct RedisSubscriber gRedisSubscriber; struct json_object* gJsonObj = NULL; threadpool gThpool; /*函数声明*/ void redisSubConnectCallback(redisAsyncContext * redis_context, int status); void redisSubDisconnectCallback(redisAsyncContext * redis_context, int status); void redisSubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata); /* channel:通道名字 func:消息处理函数 */ void redisRegisterChannelFunc(char *channel, pChanneCallBack func) { if (gJsonObj == NULL) { gJsonObj = json_object_new_object(); } json_object_object_add(gJsonObj, channel, json_object_new_int64((uint64_t)func)); SYSLOG_INFO("%s,%d,%s register func %p\n",__FUNCTION__,__LINE__,channel,func); } /* 根据通道获取对应处理函数 */ void *get_channel_func(char *channel) { struct json_object *result_object = NULL; int64_t funcptr = 0; result_object = json_object_object_get(gJsonObj, (const char*)channel); funcptr = json_object_get_int64(result_object); return (void*) funcptr; } bool redisSubInit(unsigned int threadNum) { SYSLOG_INIT("redis_mq_subscriber"); gRedisSubscriber.eventBase = event_base_new(); if(gRedisSubscriber.eventBase == NULL) { SYSLOG_ERR("%s,%d,Create redis event failed.\n", __FUNCTION__, __LINE__); return false; } memset(&gRedisSubscriber.eventSem, 0, sizeof(gRedisSubscriber.eventSem)); int ret = sem_init(&gRedisSubscriber.eventSem, 0, 0); if (ret != 0) { SYSLOG_ERR("%s,%d,Init sem failed.\n", __FUNCTION__, __LINE__); return false; } /* 初始化线程池*/ if(threadNum > 32) threadNum = 32; gThpool = thpool_init(threadNum); SYSLOG_INFO("%s,%d,redisSubInit OK\n",__FUNCTION__,__LINE__); return true; } void redisSubUninit() { if(gRedisSubscriber.eventBase) { event_base_free(gRedisSubscriber.eventBase); gRedisSubscriber.eventBase = NULL; } sem_destroy(&gRedisSubscriber.eventSem); return ; } void redisSubDisconnect() { if (gRedisSubscriber.redisContext) { redisAsyncDisconnect(gRedisSubscriber.redisContext); gRedisSubscriber.redisContext = NULL; } } void redisSubConnectCallback(redisAsyncContext *redis_context,int status) { if (status != REDIS_OK) { } else { } } void redisSubDisconnectCallback(redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr); } } void recvChanMsg(const char *channel_name, char *message, int len) { pChanneCallBack pFunc = NULL; struct RecvMsg_t *pRecvMsg; char *pMsg = NULL; if (NULL != channel_name) { pFunc = (pChanneCallBack)get_channel_func((char*)channel_name); } if (pFunc) { pRecvMsg = malloc(sizeof(struct RecvMsg_t)); if (pRecvMsg) { pRecvMsg->msg = malloc(len); if(pRecvMsg->msg) { memcpy(pRecvMsg->msg, message, len); pRecvMsg->len = len; SYSLOG_DEBUG("%s,%d,len:%d,msg:%s\n", __FUNCTION__, __LINE__, len, message); thpool_add_work(gThpool,(void*)pFunc,(void*)pRecvMsg); } } } } void freeMsg(struct RecvMsg_t *pMsg) { if(pMsg) { if (pMsg->msg) { free(pMsg->msg); free(pMsg); } } } void redisSubCommandCallback(redisAsyncContext *redis_context, void *reply, void *privdata) { if (NULL == reply) { return ; } redisReply *redis_reply = (redisReply *)(reply); // 订阅接收到的消息是一个带三元素的数组 if (redis_reply->type == REDIS_REPLY_ARRAY && redis_reply->elements == 3) { #if 0 printf(": Recieve message:%s:%d:%s:%d:%s:%d\n", redis_reply->element[0]->str, redis_reply->element[0]->len, redis_reply->element[1]->str, redis_reply->element[1]->len, redis_reply->element[2]->str, redis_reply->element[2]->len); #endif recvChanMsg(redis_reply->element[1]->str,redis_reply->element[2]->str, redis_reply->element[2]->len); } } void redisSubEventThreadFunc() { sem_wait(&gRedisSubscriber.eventSem); // 开启事件分发,event_base_dispatch会阻塞 event_base_dispatch(gRedisSubscriber.eventBase); return; } bool redisSubConnect() { /* 异步连接到redis服务器上,使用默认端口 */ gRedisSubscriber.redisContext = redisAsyncConnect("127.0.0.1", REDIS_DEFAULT_PORT); if (NULL == gRedisSubscriber.redisContext) { SYSLOG_ERR("%s,%d,Connect redis failed.%s\n", __FUNCTION__, __LINE__,gRedisSubscriber.redisContext->errstr); return false; } if (gRedisSubscriber.redisContext->err) { SYSLOG_ERR("%s,%d,Connect redis error: %d, %s.\n", __FUNCTION__, __LINE__,gRedisSubscriber.redisContext->err, gRedisSubscriber.redisContext->errstr); return false; } /* 将事件绑定到redis context上,使设置给redis的回调跟事件关联 */ redisLibeventAttach(gRedisSubscriber.redisContext, gRedisSubscriber.eventBase); /* 创建事件处理线程 */ int ret = pthread_create(&gRedisSubscriber.eventThread, 0, (void*)redisSubEventThreadFunc, NULL); if (ret != 0) { SYSLOG_ERR("%s,%d,create event thread failed.\n", __FUNCTION__, __LINE__); redisSubDisconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(gRedisSubscriber.redisContext, (void*)&redisSubConnectCallback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(gRedisSubscriber.redisContext, (void*)&redisSubDisconnectCallback); // 启动事件线程 sem_post(&gRedisSubscriber.eventSem); SYSLOG_INFO("%s,%d,redisSubConnect OK!\n",__FUNCTION__,__LINE__); return true; } bool redisSubscriber(char *channel_name) { int ret = redisAsyncCommand(gRedisSubscriber.redisContext, &redisSubCommandCallback, NULL, "SUBSCRIBE %s", channel_name); if (REDIS_ERR == ret) { SYSLOG_ERR("%s,%d,redisSubscriber %s failed,ret=%d\n", __FUNCTION__, __LINE__, channel_name, ret); return false; } return true; }