// // Created by xajhu on 2021/7/15 0015. // #include #include #include #include #include "zlog_module.h" #include "msg_queue.h" #include "config.h" #include "user_errno.h" #ifdef USED_LWIP #include "misc.h" #endif static int g_mqExit = FALSE; static void *g_pContext = NULL; static void *g_pResponse = NULL; static void process_msg(zmq_msg_t *pMsg) { const char *pResp; zmq_msg_t msg; const char *pRecMsg = strdup((const char *)zmq_msg_data(pMsg)); LOG_MOD(info, ZLOG_MOD_MQ, "receive(%zu): %s\n", zmq_msg_size(pMsg), pRecMsg); zmq_msg_close(pMsg); pResp = on_msg_cmd(pRecMsg); if (pResp != NULL && strlen(pResp) > 0) { zmq_msg_init_size(&msg, strlen(pResp)); memcpy(zmq_msg_data(&msg), pResp, strlen(pResp)); zmq_msg_send(&msg, g_pResponse, 0); free((void *)pResp); } else { zmq_msg_init_size(&msg, 5); memcpy(zmq_msg_data(&msg), "error", 5); zmq_msg_send(&msg, g_pResponse, 0); } zmq_msg_close(&msg); free((void *)pRecMsg); } void mqServerCb(void *UNUSED(pArg)) { while (TRUE) { zmq_msg_t msg; zmq_msg_init(&msg); if (zmq_msg_recv(&msg, g_pResponse, 0) != -1) { process_msg(&msg); zmq_msg_close(&msg); #if 0 zmq_msg_close(&msg); zmq_msg_init_size(&msg, 5); memcpy(zmq_msg_data(&msg), "hello", 5); zmq_msg_send(&msg, g_pResponse, 0); #endif } if (g_mqExit) { break; } uv_sleep(10); } zmq_close(g_pResponse); zmq_ctx_destroy(g_pContext); } int mq_init(void) { static uv_thread_t uvThread; char buf[1024]; g_pContext = zmq_ctx_new(); if (g_pContext == NULL) { return -ERR_MQ_CREATE_MQ; } g_pResponse = zmq_socket(g_pContext, ZMQ_REP); if (g_pResponse == NULL) { zmq_ctx_destroy(g_pContext); return -ERR_MQ_CREATE_REP; } memset(buf, 0, 1024); sprintf(buf, "tcp://*:%d", cfg_get_zero_mq_port()); LOG_MOD(info, ZLOG_MOD_MQ, "Start message queue server: tcp://*:%d\n", cfg_get_zero_mq_port()); if (zmq_bind(g_pResponse, buf) != 0) { zmq_close(g_pResponse); zmq_ctx_destroy(g_pContext); return -ERR_MQ_BIND_SOCKET; } uv_thread_create(&uvThread, mqServerCb, NULL); return ERR_SUCCESS; } void *get_mq_context() { return g_pContext; } void mq_uninit(void) { g_mqExit = TRUE; }