// // Created by xajhuang on 2022/6/10. // #include #include #include #include #include #include "task_manager.h" #include "user_errno.h" #include "config.h" #include "misc.h" #include "init.h" static void *g_pContext = NULL; static void *g_pResponse = NULL; static const char *g_pSendMsg = NULL; static int mq_data_send_msg(const char *pMsg) { int ret = ERR_SUCCESS; zmq_msg_t msg; if (pMsg) { unsigned int size = strlen(pMsg) + 1; printf("Send: %s\n", pMsg); zmq_msg_init_size(&msg, size); memset(zmq_msg_data(&msg), 0, size); memcpy(zmq_msg_data(&msg), pMsg, size); if (zmq_msg_send(&msg, g_pResponse, ZMQ_DONTWAIT) == -1) { perror("zmq_msg_send"); ret = -ERR_MQ_SEND_MSG; } zmq_msg_close(&msg); } return ret; } _Noreturn static void mqServerCb(void *UNUSED(pArg)) { while (TRUE) { zmq_msg_t msg; zmq_msg_init(&msg); if (zmq_msg_recv(&msg, g_pResponse, ZMQ_DONTWAIT) != -1) { printf("Data channel receive(%zu): %s\n", zmq_msg_size(&msg), (const char *)zmq_msg_data(&msg)); zmq_msg_close(&msg); #if 0 zmq_msg_close(&msg); zmq_msg_init_size(&msg, 5); memcpy(zmq_msg_data(&msg), "hello", 5); zmq_msg_send(&msg, g_pResponse, 0); #endif } uv_sleep(10); } } _Noreturn static void mqsendServerCb(void *UNUSED(pArg)) { while (TRUE) { if (g_pSendMsg) { printf("+++++\n"); if (mq_data_send_msg(g_pSendMsg) == ERR_SUCCESS) { free((void *)g_pSendMsg); g_pSendMsg = NULL; } printf("-----\n"); } uv_sleep(3000); } } static int data_mq_init(void) { static uv_thread_t uvThread; const char *mq_name = "ipc:///tmp/msg_fifo0"; char buf[1024]; g_pContext = zmq_ctx_new(); if (g_pContext == NULL) { return -ERR_MQ_CREATE_MQ; } g_pResponse = zmq_socket(g_pContext, ZMQ_PAIR); if (g_pResponse == NULL) { zmq_ctx_destroy(g_pContext); return -ERR_MQ_CREATE_REP; } //memset(buf, 0, 1024); //sprintf(buf, "ipc:///tmp/msg_fifo1"); printf("Start message data channel server: %s\n", mq_name); if (zmq_bind(g_pResponse, mq_name) != 0) { perror("zmq_bind"); zmq_close(g_pResponse); zmq_ctx_destroy(g_pContext); return -ERR_MQ_CONN_SERVER; } uv_thread_create(&uvThread, mqServerCb, NULL); uv_thread_create(&uvThread, mqsendServerCb, NULL); return ERR_SUCCESS; } int main(int argc, char **argv) { uv_setup_args(argc, argv); if (data_mq_init() != ERR_SUCCESS) { printf("MQ init error\n"); return -1; } if (argc > 1) { if (strlen(argv[1]) > 0) { g_pSendMsg = strdup(argv[1]); } } task_manager_run(); return 0; }