vcpe/srcs/libs/mq/mq_data.c

106 lines
2.5 KiB
C
Raw Normal View History

//
// Created by xajhuang on 2022/6/6.
//
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
#include <uv.h>
#include <zlog.h>
#include "msg_queue.h"
#include "config.h"
#include "user_errno.h"
#include "misc.h"
#include "task_manager.h"
// ZeroMQ socket of the data channel; NULL until mq_data_init() creates it.
static void *g_pDataCh = NULL;
// Handler registered through mq_data_init(); it is called with the text of each
// received message and the socket it arrived on. NULL means no handler is set.
static DATACHNNELCB g_pDataChCb = NULL;
/**
 * Handle one message received on the data channel.
 *
 * The payload is copied into a NUL-terminated buffer, logged, and passed to the
 * registered callback (g_pDataChCb). When the callback returns a non-empty
 * string, that string (including its terminating NUL) is sent back on the same
 * socket and then freed.
 *
 * The caller keeps ownership of pMsg and is responsible for closing it.
 *
 * @param pDataCh ZeroMQ socket the message arrived on; also used for the reply.
 * @param pMsg    Received message; it is only read here.
 */
static void process_data_msg(void *pDataCh, zmq_msg_t *pMsg) {
    const size_t recvLen = zmq_msg_size(pMsg);
    // A ZeroMQ payload is a byte blob with an explicit size; it is not
    // guaranteed to carry a terminating NUL, so add one ourselves.
    char *pRecMsg = malloc(recvLen + 1);

    if (pRecMsg == NULL) {
        dzlog_warn("Drop message(%zu): out of memory\n", recvLen);
        return;
    }

    if (recvLen > 0) {
        memcpy(pRecMsg, zmq_msg_data(pMsg), recvLen);
    }
    pRecMsg[recvLen] = '\0';

    dzlog_info("receive(%zu): %s\n", recvLen, pRecMsg);

    if (g_pDataChCb == NULL) {
        dzlog_warn("Unhandled message: %s\n", pRecMsg);
        free(pRecMsg);
        return;
    }

    const char *pResp = g_pDataChCb(pRecMsg, pDataCh);

    if (pResp != NULL && pResp[0] != '\0') {
        // Transmit the terminating NUL as well, so the whole frame is initialised.
        const size_t respLen = strlen(pResp) + 1;
        zmq_msg_t    reply;

        if (zmq_msg_init_size(&reply, respLen) != 0) {
            dzlog_warn("Create response message failed: %s\n", pResp);
        } else {
            memcpy(zmq_msg_data(&reply), pResp, respLen);

            if (zmq_msg_send(&reply, pDataCh, 0) == -1) {
                dzlog_warn("Send response failed: %s\n", pResp);
                // A failed send leaves the message with us; release it.
                zmq_msg_close(&reply);
            }
            // A successful send hands the message over to ZeroMQ, so there is
            // nothing left to close.
        }

        free((void *)pResp);
    }
    // NOTE(review): an empty (non-NULL) response is not freed, matching the
    // previous behaviour. Confirm with the callback's contract whether that
    // case hands over heap memory too.

    free(pRecMsg);
}
/**
 * Send a NUL-terminated string on the data channel socket (g_pDataCh).
 *
 * The terminating NUL is transmitted as part of the payload. A NULL pMsg is
 * ignored. Failures are logged but not reported to the caller.
 *
 * @param pMsg String to send; may be NULL.
 * @return Always ERR_SUCCESS.
 */
int mq_data_send_msg(const char *pMsg) {
    zmq_msg_t msg;
    size_t    frameLen;

    if (pMsg == NULL) {
        return ERR_SUCCESS;
    }

    dzlog_debug("Send PPPoE Session: %s\n", pMsg);

    // The frame carries the string plus its terminating NUL; copy all of it so
    // no uninitialised byte goes on the wire.
    frameLen = strlen(pMsg) + 1;

    if (zmq_msg_init_size(&msg, frameLen) != 0) {
        dzlog_debug("Create message failed, nothing sent: %s\n", pMsg);
        return ERR_SUCCESS;
    }

    memcpy(zmq_msg_data(&msg), pMsg, frameLen);

    if (zmq_msg_send(&msg, g_pDataCh, 0) == -1) {
        dzlog_debug("Send message failed: %s\n", pMsg);
    }

    // Releases the message after a failed send; after a successful send it is
    // already empty and closing it is harmless.
    zmq_msg_close(&msg);

    return ERR_SUCCESS;
}
/**
 * Receive loop of the data channel; started as a thread by mq_data_init().
 *
 * Repeatedly tries to receive one message from the socket and passes every
 * successfully received message to process_data_msg(). The loop never exits.
 *
 * @param pDataCh ZeroMQ socket to receive from.
 */
_Noreturn static void mqDataChannelCb(void *pDataCh) {
    while (TRUE) {
        zmq_msg_t msg;

        zmq_msg_init(&msg);

        if (zmq_msg_recv(&msg, pDataCh, 0) != -1) {
            process_data_msg(pDataCh, &msg);
        }

        // Pair every zmq_msg_init() with a zmq_msg_close(), including the case
        // where the receive failed.
        zmq_msg_close(&msg);

        // Pause between attempts so a failing receive does not spin the CPU.
        uv_sleep(10);
    }
}
/**
 * Create the data channel: a ZMQ_REQ socket connected to the local port given
 * by cfg_get_zero_mq_data_channel(), plus a thread running mqDataChannelCb().
 *
 * On any failure every resource created so far is released and g_pDataCh is
 * reset to NULL, so later calls never touch a closed socket.
 *
 * @param dataCb Handler stored in g_pDataChCb for received messages.
 * @return ERR_SUCCESS on success, otherwise a negative ERR_MQ_* code.
 */
int mq_data_init(DATACHNNELCB dataCb) {
    static uv_thread_t uvThread;
    char               endpoint[1024];
    int                rc;
    void              *pContext = zmq_ctx_new();

    if (pContext == NULL) {
        return -ERR_MQ_CREATE_MQ;
    }

    g_pDataChCb = dataCb;
    g_pDataCh   = zmq_socket(pContext, ZMQ_REQ);

    if (g_pDataCh == NULL) {
        // Only the context exists at this point; destroy it, not the NULL socket.
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_CREATE_REP;
    }

    snprintf(endpoint, sizeof(endpoint), "tcp://127.0.0.1:%d", cfg_get_zero_mq_data_channel());
    dzlog_info("Start message queue connect: tcp://127.0.0.1:%d\n", cfg_get_zero_mq_data_channel());

    if (zmq_connect(g_pDataCh, endpoint) != 0) {
        rc = -ERR_MQ_BIND_SOCKET;
        goto fail;
    }

    if (uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh) != 0) {
        // No dedicated "thread create" code is visible in this file, so the
        // generic creation failure code is reused.
        rc = -ERR_MQ_CREATE_MQ;
        goto fail;
    }

    // The context is deliberately kept alive: the socket and the receive
    // thread keep using it after this function returns.
    return ERR_SUCCESS;

fail:
    // Close the socket before destroying its context, and clear the global so
    // it does not dangle.
    zmq_close(g_pDataCh);
    g_pDataCh = NULL;
    zmq_ctx_destroy(pContext);
    return rc;
}