secgateway/Platform/user/redismq/redis_publisher.c

159 lines
4.3 KiB
C

#include <stddef.h>
#include <assert.h>
#include "redisMq.h"
#include "log.h"
#define REDIS_DEFAULT_PORT (6379)
struct RedisPublisher gRedisPublisher;
void redisPubConnectCallback(const redisAsyncContext *redis_context,int status);
void redisPubDisconnectCallback(redisAsyncContext *redis_context, int status);
void redisPubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata);
void *redisEventThreadFunc();
bool redisPubInit()
{
SYSLOG_INIT("redis_mq_publish");
gRedisPublisher.eventBase = event_base_new();
if (NULL == gRedisPublisher.eventBase)
{
SYSLOG_ERR("%s,%d,Create redis event failed.\n",__FUNCTION__,__LINE__);
return false;
}
memset(&gRedisPublisher.eventSem, 0, sizeof(gRedisPublisher.eventSem));
int ret = sem_init(&gRedisPublisher.eventSem, 0, 0);
if (ret != 0)
{
SYSLOG_ERR("%s,%d,Init sem failed.\n", __FUNCTION__, __LINE__);
return false;
}
SYSLOG_INFO("%s,%d,redisPubInit OK\n",__FUNCTION__,__LINE__);
return true;
}
void redisPubUninit()
{
if(gRedisPublisher.eventBase)
{
event_base_free(gRedisPublisher.eventBase);
gRedisPublisher.eventBase = NULL;
}
sem_destroy(&gRedisPublisher.eventSem);
}
void redisPubConnectCallback(const redisAsyncContext *redis_context,
int status)
{
if (status != REDIS_OK)
{
SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
}
else
{
SYSLOG_INFO("%s,%d,Redis connected!.\n", __FUNCTION__, __LINE__);
}
}
void redisPubDisconnectCallback(redisAsyncContext *redis_context, int status)
{
if (status != REDIS_OK)
{
// 这里异常退出,可以尝试重连
SYSLOG_ERR("%s,%d,Error: %s!.\n", __FUNCTION__, __LINE__,redis_context->errstr);
}
}
// 消息接收回调函数
void redisPubCmdCallback(redisAsyncContext *redis_context,void *reply, void *privdata)
{
// printf("command callback.\n");
// 这里不执行任何操作
}
void *redisEventThreadFunc()
{
sem_wait(&gRedisPublisher.eventSem);
event_base_dispatch(gRedisPublisher.eventBase);
}
bool redisPubConnect()
{
gRedisPublisher.redisContext = redisAsyncConnect("127.0.0.1", REDIS_DEFAULT_PORT);
if (NULL == gRedisPublisher.redisContext)
{
SYSLOG_ERR("%s,%d,Connect redis failed,%s.\n", __FUNCTION__, __LINE__,gRedisPublisher.redisContext->errstr);
return false;
}
if (gRedisPublisher.redisContext->err)
{
SYSLOG_ERR("%s,%d,Connect redis error: %d, %s.\n", __FUNCTION__, __LINE__,gRedisPublisher.redisContext->err, gRedisPublisher.redisContext->errstr);
return false;
}
redisLibeventAttach(gRedisPublisher.redisContext, gRedisPublisher.eventBase);
int ret = pthread_create(&gRedisPublisher.eventThread, 0, &redisEventThreadFunc, NULL);
if (ret != 0)
{
SYSLOG_ERR("%s,%d,create event thread failed.\n", __FUNCTION__, __LINE__);
redisPubDisconnect();
return false;
}
// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态
redisAsyncSetConnectCallback(gRedisPublisher.redisContext,
(void*)&redisPubDisconnectCallback);
// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连
redisAsyncSetDisconnectCallback(gRedisPublisher.redisContext,
(void*)&redisPubDisconnectCallback);
// 启动事件线程
sem_post(&gRedisPublisher.eventSem);
SYSLOG_INFO("%s,%d,redisPubConnect OK!\n",__FUNCTION__,__LINE__);
return true;
}
void redisPubDisconnect()
{
if (gRedisPublisher.redisContext)
{
redisAsyncDisconnect(gRedisPublisher.redisContext);
//redisAsyncFree(gRedisPublisher.redisContext);
gRedisPublisher.redisContext = NULL;
}
}
bool redisPublish(char *channel_name,
char *message)
{
int ret = redisAsyncCommand(gRedisPublisher.redisContext, &redisPubCmdCallback, NULL, "PUBLISH %s %s", channel_name, message);
if (REDIS_ERR == ret)
{
printf("Publish command failed: %d\n", ret);
SYSLOG_ERR("%s,%d,Publish command failed:%d.\n", __FUNCTION__, __LINE__, ret);
return false ;
}
else
{
SYSLOG_DEBUG("%s,%d,redisPublish:%s:%s.\n", __FUNCTION__, __LINE__, channel_name, message);
}
return true;
}