//
// Created by xajhu on 2021/7/15 0015.
//
#include <stdlib.h>
#include <zmq.h>
#include <uv.h>
#include <string.h>
#include <zlog.h>

#include "msg_queue.h"
#include "config.h"
#include "user_errno.h"
#include "misc.h"

static int   g_mqExit    = FALSE;
static void *g_pContext  = NULL;
static void *g_pResponse = NULL;

static void process_msg(zmq_msg_t *pMsg) {
    const char *pResp;
    zmq_msg_t   msg;
    const char *pRecMsg = strdup((const char *)zmq_msg_data(pMsg));

    dzlog_info("receive(%zu): %s\n", zmq_msg_size(pMsg), pRecMsg);
    zmq_msg_close(pMsg);

    pResp = on_msg_cmd(pRecMsg);

    if (pResp != NULL && strlen(pResp) > 0) {
        zmq_msg_init_size(&msg, strlen(pResp));
        memcpy(zmq_msg_data(&msg), pResp, strlen(pResp));
        zmq_msg_send(&msg, g_pResponse, 0);
        free((void *)pResp);
    } else {
        zmq_msg_init_size(&msg, 5);
        memcpy(zmq_msg_data(&msg), "error", 5);
        zmq_msg_send(&msg, g_pResponse, 0);
    }

    zmq_msg_close(&msg);
    free((void *)pRecMsg);
}

void mqServerCb(void *UNUSED(pArg)) {
    while (TRUE) {
        zmq_msg_t msg;
        zmq_msg_init(&msg);

        if (zmq_msg_recv(&msg, g_pResponse, 0) != -1) {
            process_msg(&msg);
            zmq_msg_close(&msg);
#if 0
            zmq_msg_close(&msg);

            zmq_msg_init_size(&msg, 5);

            memcpy(zmq_msg_data(&msg), "hello", 5);

            zmq_msg_send(&msg, g_pResponse, 0);
#endif
        }

        if (g_mqExit) {
            break;
        }

        uv_sleep(10);
    }

    zmq_close(g_pResponse);
    zmq_ctx_destroy(g_pContext);
}

int mq_init(void) {
    static uv_thread_t uvThread;

    char buf[1024];

    g_pContext = zmq_ctx_new();

    if (g_pContext == NULL) {
        return -ERR_MQ_CREATE_MQ;
    }

    g_pResponse = zmq_socket(g_pContext, ZMQ_REP);

    if (g_pResponse == NULL) {
        zmq_ctx_destroy(g_pContext);
        return -ERR_MQ_CREATE_REP;
    }

    memset(buf, 0, 1024);

    sprintf(buf, "tcp://*:%d", cfg_get_zero_mq_port());
    dzlog_info("Start message queue server: tcp://*:%d\n", cfg_get_zero_mq_port());

    if (zmq_bind(g_pResponse, buf) != 0) {
        zmq_close(g_pResponse);
        zmq_ctx_destroy(g_pContext);
        return -ERR_MQ_BIND_SOCKET;
    }

    uv_thread_create(&uvThread, mqServerCb, NULL);

    return ERR_SUCCESS;
}

void* get_mq_context() {
    return g_pContext;
}

void mq_uninit(void) {
    g_mqExit = TRUE;
}