// // Created by xajhuang on 2022/6/6. // #include #include #include #include #include #include "msg_queue.h" #include "config.h" #include "user_errno.h" #include "misc.h" #include "task_manager.h" static void *g_pDataCh = NULL; static DATACHNNELCB g_pDataChCb = NULL; static void process_data_msg(void *pDataCh, zmq_msg_t *pMsg) { const char *pResp; zmq_msg_t msg; const char *pRecMsg = strdup((const char *)zmq_msg_data(pMsg)); dzlog_info("receive(%zu): %s\n", zmq_msg_size(pMsg), pRecMsg); zmq_msg_close(pMsg); if (g_pDataChCb) { pResp = g_pDataChCb(pRecMsg, pDataCh); if (pResp != NULL && strlen(pResp) > 0) { zmq_msg_init_size(&msg, strlen(pResp) + 1); memcpy(zmq_msg_data(&msg), pResp, strlen(pResp)); zmq_msg_send(&msg, pDataCh, 0); free((void *)pResp); } zmq_msg_close(&msg); } else { dzlog_warn("Unhandled message: %s\n", pRecMsg); } free((void *)pRecMsg); } int mq_data_send_msg(const char *pMsg) { zmq_msg_t msg; if (pMsg) { dzlog_debug("Send PPPoE Session: %s\n", pMsg); zmq_msg_init_size(&msg, strlen(pMsg) + 1); memcpy(zmq_msg_data(&msg), pMsg, strlen(pMsg)); zmq_msg_send(&msg, g_pDataCh, 0); zmq_msg_close(&msg); } return ERR_SUCCESS; } _Noreturn static void mqDataChannelCb(void *pDataCh) { while (TRUE) { zmq_msg_t msg; zmq_msg_init(&msg); if (zmq_msg_recv(&msg, pDataCh, 0) != -1) { process_data_msg(pDataCh, &msg); zmq_msg_close(&msg); } uv_sleep(10); } } int mq_data_init(DATACHNNELCB dataCb) { static uv_thread_t uvThread; void *pContext = zmq_ctx_new(); char buf[1024]; if (pContext == NULL) { return -ERR_MQ_CREATE_MQ; } g_pDataChCb = dataCb; g_pDataCh = zmq_socket(pContext, ZMQ_PAIR); if (g_pDataCh == NULL) { zmq_ctx_destroy(g_pDataCh); return -ERR_MQ_CREATE_REP; } memset(buf, 0, 1024); sprintf(buf, "%s", cfg_get_zero_mq_data_path()); dzlog_info("Start message queue connect: %s\n", cfg_get_zero_mq_data_path()); if (zmq_connect(g_pDataCh, buf) != 0) { zmq_close(g_pDataCh); zmq_ctx_destroy(g_pDataCh); return -ERR_MQ_BIND_SOCKET; } uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh); return ERR_SUCCESS; }