//
// Created by xajhuang on 2022/6/10.
//
#include <uv.h>
#include <zlog.h>
#include <zmq.h>
#include <limits.h>
#include <string.h>
#include <unistd.h>
#include <stdlib.h>
#include "task_manager.h"
#include "user_errno.h"
#include "config.h"
#include "misc.h"
#include "init.h"

// ZeroMQ context; assigned from zmq_ctx_new() in data_mq_init().
static void       *g_pContext  = NULL;
// ZMQ_PAIR socket bound in data_mq_init(); used for both the receive and the send path.
static void       *g_pResponse = NULL;
// Pending outgoing message: strdup()'d in main(), freed and reset to NULL by
// mqsendServerCb() once it has been sent successfully. NULL means nothing to send.
static const char *g_pSendMsg  = NULL;

/**
 * Send a NUL-terminated string over the data-channel socket (g_pResponse).
 *
 * The payload is the string including its terminating NUL byte, and the send
 * is non-blocking (ZMQ_DONTWAIT), so it fails instead of waiting when the
 * message cannot be queued right now.
 *
 * @param pMsg string to send; NULL means "nothing to do"
 * @return ERR_SUCCESS on success or when pMsg is NULL,
 *         -ERR_MQ_SEND_MSG if the message could not be allocated or sent
 */
static int mq_data_send_msg(const char *pMsg) {
    zmq_msg_t msg;
    size_t    size;
    int       ret = ERR_SUCCESS;

    if (pMsg == NULL) {
        return ERR_SUCCESS;
    }

    size = strlen(pMsg) + 1;
    printf("Send: %s\n", pMsg);

    // Without this check a failed allocation would leave msg uninitialised
    // and the calls below would operate on garbage.
    if (zmq_msg_init_size(&msg, size) != 0) {
        perror("zmq_msg_init_size");
        return -ERR_MQ_SEND_MSG;
    }

    // Copies the string together with its terminator, so no pre-zeroing is needed.
    memcpy(zmq_msg_data(&msg), pMsg, size);

    if (zmq_msg_send(&msg, g_pResponse, ZMQ_DONTWAIT) == -1) {
        perror("zmq_msg_send");
        ret = -ERR_MQ_SEND_MSG;
    }

    // Releases the payload when the send failed; harmless after a successful send.
    zmq_msg_close(&msg);

    return ret;
}

/**
 * Receiver thread body: polls the data-channel socket (g_pResponse) forever
 * and prints every message that arrives. No reply is sent.
 *
 * The receive is non-blocking (ZMQ_DONTWAIT); the loop sleeps 10 ms between
 * polls. The thread argument is unused. Never returns.
 */
_Noreturn static void mqServerCb(void *UNUSED(pArg)) {
    while (TRUE) {
        zmq_msg_t msg;
        zmq_msg_init(&msg);

        if (zmq_msg_recv(&msg, g_pResponse, ZMQ_DONTWAIT) != -1) {
            size_t len = zmq_msg_size(&msg);
            // The payload is not guaranteed to be NUL-terminated, so bound the
            // print by the frame size; the precision argument must be an int.
            int    shown = (len > (size_t)INT_MAX) ? INT_MAX : (int)len;

            printf("Data channel receive(%zu): %.*s\n", len, shown, (const char *)zmq_msg_data(&msg));
        }

        // Close on both paths so an initialised message is never abandoned.
        zmq_msg_close(&msg);

        uv_sleep(10);
    }
}

/**
 * Sender thread body: every 3000 ms, checks whether a message is pending in
 * g_pSendMsg and, if so, tries to send it with mq_data_send_msg().
 *
 * After a successful send the string is freed and g_pSendMsg is reset to
 * NULL; after a failed send it is left in place and retried on the next pass.
 * The thread argument is unused. Never returns.
 */
_Noreturn static void mqsendServerCb(void *UNUSED(pArg)) {
    for (;;) {
        if (g_pSendMsg != NULL) {
            int rc;

            printf("+++++\n");
            rc = mq_data_send_msg(g_pSendMsg);

            if (rc == ERR_SUCCESS) {
                // Sent: release the buffer and mark the slot as empty.
                free((void *)g_pSendMsg);
                g_pSendMsg = NULL;
            }
            printf("-----\n");
        }

        uv_sleep(3000);
    }
}

/**
 * Create the ZeroMQ context and the ZMQ_PAIR data-channel socket, bind the
 * socket to its IPC endpoint and start the receiver and sender threads.
 *
 * On failure before any thread is running, everything created so far is torn
 * down and the globals are reset to NULL.
 *
 * NOTE(review): the receiver thread and the sender thread both use
 * g_pResponse; ZeroMQ documents sockets as not thread-safe — confirm whether
 * this sharing is intended.
 *
 * @return ERR_SUCCESS on success,
 *         -ERR_MQ_CREATE_MQ if the context or a worker thread could not be created,
 *         -ERR_MQ_CREATE_REP if the socket could not be created,
 *         -ERR_MQ_CONN_SERVER if the bind failed
 */
static int data_mq_init(void) {
    // One handle per thread; sharing a single handle would overwrite the first.
    static uv_thread_t recvThread;
    static uv_thread_t sendThread;
    const char        *mq_name = "ipc:///tmp/msg_fifo0";
    int                rc;

    g_pContext = zmq_ctx_new();

    if (g_pContext == NULL) {
        return -ERR_MQ_CREATE_MQ;
    }

    g_pResponse = zmq_socket(g_pContext, ZMQ_PAIR);

    if (g_pResponse == NULL) {
        zmq_ctx_destroy(g_pContext);
        g_pContext = NULL;
        return -ERR_MQ_CREATE_REP;
    }

    printf("Start message data channel server: %s\n", mq_name);

    if (zmq_bind(g_pResponse, mq_name) != 0) {
        perror("zmq_bind");
        zmq_close(g_pResponse);
        zmq_ctx_destroy(g_pContext);
        g_pResponse = NULL;
        g_pContext  = NULL;
        return -ERR_MQ_CONN_SERVER;
    }

    rc = uv_thread_create(&recvThread, mqServerCb, NULL);
    if (rc != 0) {
        printf("uv_thread_create: %s\n", uv_strerror(rc));
        zmq_close(g_pResponse);
        zmq_ctx_destroy(g_pContext);
        g_pResponse = NULL;
        g_pContext  = NULL;
        // No thread-specific error code is visible in this file; reuse the
        // generic "could not create" code.
        return -ERR_MQ_CREATE_MQ;
    }

    rc = uv_thread_create(&sendThread, mqsendServerCb, NULL);
    if (rc != 0) {
        printf("uv_thread_create: %s\n", uv_strerror(rc));
        // The receiver thread is already polling the socket, so it cannot be
        // closed safely here; report the failure and leave the teardown to
        // process exit.
        return -ERR_MQ_CREATE_MQ;
    }

    return ERR_SUCCESS;
}

/**
 * Program entry point: starts the message data channel and then runs the
 * task manager.
 *
 * If a non-empty first command-line argument is given, a copy of it is stored
 * in g_pSendMsg for the sender thread to transmit.
 *
 * @return 0 after task_manager_run() returns, -1 if the argument could not be
 *         copied or the data channel could not be initialised
 */
int main(int argc, char **argv) {
    // libuv may take ownership of the original argv memory; use the vector it returns.
    argv = uv_setup_args(argc, argv);

    // Publish the pending message before data_mq_init() starts the sender
    // thread, so that thread never observes g_pSendMsg being written concurrently.
    if (argc > 1 && argv[1][0] != '\0') {
        g_pSendMsg = strdup(argv[1]);

        if (g_pSendMsg == NULL) {
            perror("strdup");
            return -1;
        }
    }

    if (data_mq_init() != ERR_SUCCESS) {
        printf("MQ init error\n");
        // Safe to free here: a failed init means the sender thread was not started.
        free((void *)g_pSendMsg);
        g_pSendMsg = NULL;
        return -1;
    }

    task_manager_run();

    return 0;
}