//
// Created by xajhuang on 2022/6/6.
//
#include <stdlib.h>
#include <zmq.h>
#include <uv.h>
#include <string.h>
#include <zlog.h>

#include "msg_queue.h"
#include "config.h"
#include "s2j/s2j.h"
#include "user_errno.h"

#ifdef LWIP_ON
#include "misc.h"
#include "pppoe_info.h"
#include "user_info.h"
#include "vxlan_pkg.h"
#endif

// Command names compared against the "message" field of an incoming data
// message in process_data_msg() to select the add-user / remove-user handler.
#define AGENT_CMD_ADDUSER   ("add-ywg-pppoe-vcpe")
#define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe")

// ZeroMQ socket of the data channel. Created (ZMQ_PAIR) and connected in
// mq_data_init(); used by mq_data_send_msg() for sending and handed to the
// receive thread as its argument. NULL until mq_data_init() has created it.
static void *g_pDataCh = NULL;

// Envelope of a data-channel message as decoded by decode_data_msg().
// Each member is a strdup()'ed copy of the JSON string member of the same
// name (or NULL when that member is absent); despite the const qualifier the
// strings are owned by this struct and released by free_mq_data_msg().
typedef struct {
    const char *entity;
    const char *message;  // command name, compared against AGENT_CMD_*
    const char *params;   // JSON text, decoded again by the command-specific decoder
} MQ_DATA_MSG, *PMQ_DATA_MSG;

// VXLAN peer description of one add-user entry. Both members are copied from
// the JSON string members "vsvxlanIP" / "vsvxlanMac" in decode_add_user_msg()
// and passed as text to vxlan_peer_add() in process_data_msg().
typedef struct {
    char vsvxlanIP[MAX_IP_V4_STR];
    char vsvxlanMac[MAX_MAC_ADDR_STR];
} ADD_USER_VXLAN, *PADD_USER_VXLAN;

// One user record of an add-user request, filled from JSON by j2s_add_user().
// process_data_msg() maps these members onto USER_PARAMS when LWIP_ON is set.
typedef struct {
    unsigned int vni;                          // copied to USER_PARAMS.vni
    unsigned int userId;                       // copied to USER_PARAMS.userid; also the key given to user_info_add()
    unsigned int c_tag_in;                     // copied to USER_PARAMS.q1 (presumably the inner/customer VLAN tag — confirm)
    unsigned int s_tag_in;                     // copied to USER_PARAMS.q2 (presumably the outer/service VLAN tag — confirm)
    char         clientMac[MAX_MAC_ADDR_STR];  // MAC address text, converted with str_to_mac()
    char         pppoeUser[MAX_PATH];          // PPPoE account name
    char         pppoePass[MAX_PATH];          // PPPoE account password
} ADD_USER_USER, *PADD_USER_USER;

// One element of the add-user request array: a VXLAN peer plus the users
// attached to it.
typedef struct {
    ADD_USER_VXLAN vxlan;
    PADD_USER_USER pUser;      // heap array of user records, released in free_add_user()
    unsigned int   userCount;  // number of entries in pUser
} ADD_INFO, *PADD_INFO;

// Decoded add-user request produced by decode_add_user_msg() and released
// with free_add_user().
typedef struct {
    PADD_INFO    pInfo;      // heap array, one entry per element of the JSON root array
    unsigned int infoCount;  // number of entries in pInfo
} MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER;

// Decoded remove-user request produced by decode_remove_user_msg() and
// released with free_remove_user().
typedef struct {
    unsigned int *pUsers;  // heap array of user ids taken from the JSON "users" array
    unsigned int  nCount;  // number of entries in pUsers
} MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER;

/**
 * Allocate a zero-initialised MQ_DATA_MSG.
 *
 * @param pMsg  Optional out-parameter; when non-NULL it receives the same
 *              pointer that is returned (NULL on allocation failure).
 * @return The new message, or NULL when memory could not be allocated.
 *         The caller releases it with free_mq_data_msg().
 */
static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) {
    // calloc zero-fills and, unlike malloc followed by memset, never touches
    // the block when the allocation failed.
    PMQ_DATA_MSG pNew = calloc(1, sizeof(MQ_DATA_MSG));

    if (pMsg != NULL) {
        *pMsg = pNew;
    }

    return pNew;
}

/**
 * Release a decoded remove-user request together with its id array.
 *
 * @param p  Request to free; NULL is accepted and ignored.
 */
static void free_remove_user(PMQ_DATA_REMOVE_USER p) {
    if (p == NULL) {
        return;
    }

    // free(NULL) is a no-op, so the id array needs no separate check.
    free(p->pUsers);
    free(p);
}

/**
 * Release a decoded add-user request: every per-entry user array, the entry
 * array itself and finally the request.
 *
 * @param p  Request to free; NULL is accepted and ignored.
 */
static void free_add_user(PMQ_DATA_ADD_USER p) {
    unsigned int i;

    if (p == NULL) {
        return;
    }

    if (p->pInfo != NULL) {
        // The entries live in p->pInfo[0..infoCount-1]; p itself is a single
        // object, so it must not be indexed.
        for (i = 0; i < p->infoCount; i++) {
            free(p->pInfo[i].pUser);
        }

        free(p->pInfo);
    }

    free(p);
}

/**
 * Release a message envelope and the three strings it owns.
 *
 * @param pMsg  Envelope to free; must not be NULL.
 */
static void free_mq_data_msg(PMQ_DATA_MSG pMsg) {
    // The members are declared const char * but hold heap copies owned by the
    // envelope, hence the casts. free(NULL) is a no-op for unset members.
    void *pEntity  = (void *)pMsg->entity;
    void *pMessage = (void *)pMsg->message;
    void *pParams  = (void *)pMsg->params;

    free(pEntity);
    free(pMessage);
    free(pParams);
    free(pMsg);
}

/**
 * Convert one JSON user object into an ADD_USER_USER record using the s2j
 * helper macros.
 *
 * @param pJson  JSON object describing a single user.
 * @return The record created by s2j_create_struct_obj(); the caller
 *         (decode_add_user_msg) copies it and releases it with free().
 *         Presumably NULL when the macro's allocation fails — confirm against
 *         the s2j implementation in use.
 */
static void *j2s_add_user(cJSON *pJson) {
    // Declares pUser and creates the record it points to.
    s2j_create_struct_obj(pUser, ADD_USER_USER);
    // Numeric members; presumably looked up in pJson by the member name.
    // NOTE(review): the members are unsigned int but are read with the s2j
    // "int" kind — confirm negative JSON values cannot occur.
    s2j_struct_get_basic_element(pUser, pJson, int, vni);
    s2j_struct_get_basic_element(pUser, pJson, int, userId);
    s2j_struct_get_basic_element(pUser, pJson, int, c_tag_in);
    s2j_struct_get_basic_element(pUser, pJson, int, s_tag_in);
    // String members are fixed-size char arrays.
    // NOTE(review): verify that the s2j "string" kind bounds the copy to the
    // array size; an unbounded copy would overflow on oversized input.
    s2j_struct_get_basic_element(pUser, pJson, string, clientMac);
    s2j_struct_get_basic_element(pUser, pJson, string, pppoeUser);
    s2j_struct_get_basic_element(pUser, pJson, string, pppoePass);
    return pUser;
}

/**
 * Decode the "params" JSON of a remove-user command.
 *
 * Expected input: a JSON object with a non-empty "users" array whose
 * elements are numeric user ids.
 *
 * @param pStrJson  JSON text; NULL is treated as undecodable.
 * @param pRmUser   Optional out-parameter. It always receives the same value
 *                  that is returned, so it is NULL on every failure path and
 *                  never refers to freed memory.
 * @return A newly allocated request (release with free_remove_user()), or
 *         NULL when the text cannot be parsed, "users" is missing or empty,
 *         or memory is exhausted.
 */
PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) {
    PMQ_DATA_REMOVE_USER pResult = NULL;
    cJSON               *pJsonRoot;
    cJSON               *pUsers;
    int                  nItems;
    int                  i;

    if (pRmUser != NULL) {
        *pRmUser = NULL;
    }

    if (pStrJson == NULL) {
        return NULL;
    }

    pJsonRoot = cJSON_Parse(pStrJson);

    if (pJsonRoot == NULL) {
        return NULL;
    }

    pUsers = cJSON_GetObjectItem(pJsonRoot, "users");
    nItems = (pUsers != NULL) ? cJSON_GetArraySize(pUsers) : 0;

    if (nItems > 0) {
        pResult = calloc(1, sizeof(MQ_DATA_REMOVE_USER));
    }

    if (pResult != NULL) {
        pResult->pUsers = calloc((size_t)nItems, sizeof(unsigned int));

        if (pResult->pUsers == NULL) {
            // Drop the half-built request so neither the return value nor
            // the out-parameter can refer to it.
            free(pResult);
            pResult = NULL;
        } else {
            pResult->nCount = (unsigned int)nItems;

            for (i = 0; i < nItems; i++) {
                cJSON *pItem = cJSON_GetArrayItem(pUsers, i);

                if (pItem != NULL) {
                    pResult->pUsers[i] = (unsigned int)pItem->valueint;
                }
            }
        }
    }

    cJSON_Delete(pJsonRoot);

    if (pRmUser != NULL) {
        *pRmUser = pResult;
    }

    return pResult;
}

/**
 * Decode the "params" JSON of an add-user command.
 *
 * Expected input: a non-empty JSON array. Each element may carry the string
 * members "vsvxlanMac" and "vsvxlanIP" and a "users" array whose elements
 * are converted with j2s_add_user().
 *
 * @param pStrJson  JSON text; NULL is treated as undecodable.
 * @param pAddUser  Optional out-parameter. It always receives the same value
 *                  that is returned, so it is NULL on every failure path and
 *                  never refers to freed memory.
 * @return A newly allocated request (release with free_add_user()), or NULL
 *         when the text cannot be parsed, the array is empty, or the top
 *         level allocations fail. An entry whose user array cannot be
 *         allocated is kept with userCount == 0.
 */
PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) {
    PMQ_DATA_ADD_USER pResult = NULL;
    cJSON            *pJsonRoot;
    int               nItems;
    int               i;

    if (pAddUser != NULL) {
        *pAddUser = NULL;
    }

    if (pStrJson == NULL) {
        return NULL;
    }

    pJsonRoot = cJSON_Parse(pStrJson);

    if (pJsonRoot == NULL) {
        return NULL;
    }

    nItems = cJSON_GetArraySize(pJsonRoot);

    if (nItems <= 0) {
        goto done;
    }

    pResult = calloc(1, sizeof(MQ_DATA_ADD_USER));

    if (pResult == NULL) {
        goto done;
    }

    pResult->pInfo = calloc((size_t)nItems, sizeof(ADD_INFO));

    if (pResult->pInfo == NULL) {
        // Drop the half-built request so neither the return value nor the
        // out-parameter can refer to it.
        free(pResult);
        pResult = NULL;
        goto done;
    }

    pResult->infoCount = (unsigned int)nItems;

    for (i = 0; i < nItems; i++) {
        PADD_INFO pCurrent  = &pResult->pInfo[i];
        cJSON    *pcurInfo  = cJSON_GetArrayItem(pJsonRoot, i);
        cJSON    *pvxlanMac = cJSON_GetObjectItem(pcurInfo, "vsvxlanMac");
        cJSON    *pvxlanIp  = cJSON_GetObjectItem(pcurInfo, "vsvxlanIP");
        cJSON    *puser     = cJSON_GetObjectItem(pcurInfo, "users");
        int       nUser;
        int       j;

        // The destination arrays are zero-filled by calloc; copying at most
        // size - 1 bytes therefore always leaves a terminating NUL, which
        // strncpy alone does not guarantee for over-long input.
        if (pvxlanMac != NULL) {
            strncpy(pCurrent->vxlan.vsvxlanMac,
                    SAFETY_STR_STRING(pvxlanMac->valuestring),
                    sizeof(pCurrent->vxlan.vsvxlanMac) - 1);
        }

        if (pvxlanIp != NULL) {
            strncpy(pCurrent->vxlan.vsvxlanIP,
                    SAFETY_STR_STRING(pvxlanIp->valuestring),
                    sizeof(pCurrent->vxlan.vsvxlanIP) - 1);
        }

        nUser = (puser != NULL) ? cJSON_GetArraySize(puser) : 0;

        if (nUser <= 0) {
            continue;
        }

        pCurrent->pUser = calloc((size_t)nUser, sizeof(ADD_USER_USER));

        if (pCurrent->pUser == NULL) {
            // userCount stays 0 so consumers never index a NULL array.
            continue;
        }

        pCurrent->userCount = (unsigned int)nUser;

        for (j = 0; j < nUser; j++) {
            PADD_USER_USER pTmp = j2s_add_user(cJSON_GetArrayItem(puser, j));

            if (pTmp != NULL) {
                memcpy(&pCurrent->pUser[j], pTmp, sizeof(ADD_USER_USER));
                free(pTmp);
            }
        }
    }

done:
    cJSON_Delete(pJsonRoot);

    if (pAddUser != NULL) {
        *pAddUser = pResult;
    }

    return pResult;
}

/**
 * Decode the outer envelope of a data-channel message.
 *
 * The "entity", "message" and "params" members of the JSON object are copied
 * with strdup(); a member that is absent leaves the corresponding field NULL.
 *
 * @param pStrJson  JSON text of the envelope.
 * @return A newly allocated envelope (release with free_mq_data_msg()), or
 *         NULL when the text cannot be parsed or the envelope cannot be
 *         allocated.
 */
PMQ_DATA_MSG decode_data_msg(const char *pStrJson) {
    PMQ_DATA_MSG pDecoded = NULL;
    cJSON       *pField;
    cJSON       *pRoot = cJSON_Parse(pStrJson);

    if (pRoot == NULL) {
        return NULL;
    }

    if (create_mq_data_msg(&pDecoded) != NULL) {
        pField = cJSON_GetObjectItem(pRoot, "entity");
        if (pField != NULL) {
            pDecoded->entity = strdup(SAFETY_STR_STRING(pField->valuestring));
        }

        pField = cJSON_GetObjectItem(pRoot, "message");
        if (pField != NULL) {
            pDecoded->message = strdup(SAFETY_STR_STRING(pField->valuestring));
        }

        pField = cJSON_GetObjectItem(pRoot, "params");
        if (pField != NULL) {
            pDecoded->params = strdup(SAFETY_STR_STRING(pField->valuestring));
        }
    }

    cJSON_Delete(pRoot);

    return pDecoded;
}

/**
 * Handle one message received on the data channel.
 *
 * The payload is copied into a NUL-terminated buffer, decoded as an envelope
 * and dispatched on its "message" field: AGENT_CMD_ADDUSER registers VXLAN
 * peers and users, AGENT_CMD_REMOVUSER removes users (both only when built
 * with LWIP_ON). Envelopes without a "message" or "params" field, and
 * unknown commands, are ignored.
 *
 * @param pDataCh  Unused.
 * @param pMsg     Received message. It is only read here; closing it remains
 *                 the caller's responsibility.
 */
static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) {
    PMQ_DATA_MSG pMqMsg;
    size_t       len  = zmq_msg_size(pMsg);
    char        *pBuf = (char *)malloc(len + 1);

    if (pBuf == NULL) {
        return;
    }

    // The payload is not guaranteed to be NUL-terminated, so every string
    // operation works on the terminated copy, never on the raw message data.
    memcpy(pBuf, zmq_msg_data(pMsg), len);
    pBuf[len] = '\0';
    dzlog_info("receive(%zu): %s\n", strlen(pBuf), pBuf);

    pMqMsg = decode_data_msg(pBuf);
    // The decoder keeps its own copies of the strings it needs.
    free(pBuf);

    if (pMqMsg == NULL) {
        return;
    }

    if (pMqMsg->message == NULL || pMqMsg->params == NULL) {
        // Incomplete envelope: there is nothing to dispatch on.
        free_mq_data_msg(pMqMsg);
        return;
    }

    if (strcmp(AGENT_CMD_ADDUSER, pMqMsg->message) == 0) {
        PMQ_DATA_ADD_USER pOut = NULL;
        PMQ_DATA_ADD_USER p;

        dzlog_debug("Process: %s\n", pMqMsg->params);
        // Rely on the return value, which is NULL on every failure path.
        p = decode_add_user_msg(pMqMsg->params, &pOut);

        if (p) {
#ifdef LWIP_ON
            unsigned int m, n;

            for (m = 0; m < p->infoCount; m++) {
                PADD_INFO pInfo = &(p->pInfo[m]);

                vxlan_peer_add(pInfo->vxlan.vsvxlanIP, pInfo->vxlan.vsvxlanMac);

                if (pInfo->pUser == NULL) {
                    continue;
                }

                for (n = 0; n < pInfo->userCount; n++) {
                    PADD_USER_USER pUser = &(pInfo->pUser[n]);
                    USER_PARAMS    userInfo;

                    memset(&userInfo, 0, sizeof(USER_PARAMS));

                    userInfo.pppoe_user   = pUser->pppoeUser;
                    userInfo.pppoe_passwd = pUser->pppoePass;
                    userInfo.vni          = pUser->vni;
                    userInfo.userid       = pUser->userId;
                    userInfo.q1           = pUser->c_tag_in;
                    userInfo.q2           = pUser->s_tag_in;
                    str_to_mac(pUser->clientMac, userInfo.mac_addr);
                    user_info_add(userInfo.userid, &userInfo);
                }
            }
#endif
            free_add_user(p);
        }
    } else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) {
        PMQ_DATA_REMOVE_USER pOut = NULL;
        PMQ_DATA_REMOVE_USER p;

        dzlog_debug("Process: %s\n", pMqMsg->params);
        // Rely on the return value, which is NULL on every failure path.
        p = decode_remove_user_msg(pMqMsg->params, &pOut);

        if (p) {
#ifdef LWIP_ON
            unsigned int m;

            for (m = 0; m < p->nCount; m++) {
                user_info_remove(p->pUsers[m]);
            }
#endif
            free_remove_user(p);
        }
    }

    free_mq_data_msg(pMqMsg);
}

/**
 * Send a text message, including its terminating NUL, on the data channel.
 *
 * @param pMsg  NUL-terminated text to send; NULL is ignored.
 * @return Always ERR_SUCCESS, as before. Allocation and send failures are
 *         logged rather than reported.
 *         NOTE(review): consider returning a dedicated error code once one
 *         exists in user_errno.h.
 */
int mq_data_send_msg(const char *pMsg) {
    zmq_msg_t msg;
    size_t    len;

    if (pMsg == NULL) {
        return ERR_SUCCESS;
    }

    len = strlen(pMsg) + 1;
    dzlog_debug("Send PPPoE Session: %s\n", pMsg);

    // Writing into the message is only valid after a successful init.
    if (zmq_msg_init_size(&msg, len) != 0) {
        dzlog_error("Allocate message of %zu bytes failed: %s\n", len, zmq_strerror(zmq_errno()));
        return ERR_SUCCESS;
    }

    // len covers the terminating NUL, so a single copy is sufficient.
    memcpy(zmq_msg_data(&msg), pMsg, len);

    if (zmq_msg_send(&msg, g_pDataCh, 0) == -1) {
        dzlog_error("Send message failed: %s\n", zmq_strerror(zmq_errno()));
    }

    // Required after a failed send; after a successful send the message is
    // already empty and closing it is harmless.
    zmq_msg_close(&msg);

    return ERR_SUCCESS;
}

/**
 * Receive loop of the data channel; entry point of the thread started by
 * mq_data_init(). Never returns.
 *
 * @param pDataCh  ZeroMQ socket to receive from.
 */
_Noreturn static void mqDataChannelCb(void *pDataCh) {
    while (TRUE) {
        zmq_msg_t msg;
        zmq_msg_init(&msg);

        if (zmq_msg_recv(&msg, pDataCh, 0) != -1) {
            process_data_msg(pDataCh, &msg);
        }

        // Release the message whether or not the receive succeeded, so each
        // zmq_msg_init() is always followed by a close.
        zmq_msg_close(&msg);

        uv_sleep(10);
    }
}

/**
 * Create the ZeroMQ data channel (ZMQ_PAIR), connect it to the endpoint
 * returned by cfg_get_zero_mq_data_path() and start the receive thread.
 *
 * On any failure everything created so far is released and g_pDataCh is
 * reset to NULL.
 *
 * @return ERR_SUCCESS on success;
 *         -ERR_MQ_CREATE_MQ   when the context or the receive thread cannot
 *                             be created;
 *         -ERR_MQ_CREATE_REP  when the socket cannot be created;
 *         -ERR_MQ_BIND_SOCKET when the endpoint is missing or the connect
 *                             fails.
 */
int mq_data_init() {
    static uv_thread_t uvThread;
    const char        *pPath;
    void              *pContext = zmq_ctx_new();

    if (pContext == NULL) {
        return -ERR_MQ_CREATE_MQ;
    }

    g_pDataCh = zmq_socket(pContext, ZMQ_PAIR);

    if (g_pDataCh == NULL) {
        // Destroy the context, not the (NULL) socket pointer.
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_CREATE_REP;
    }

    // Use the configured endpoint directly; copying it into a fixed-size
    // stack buffer could overflow on a long path.
    pPath = cfg_get_zero_mq_data_path();

    if (pPath != NULL) {
        dzlog_info("Start message queue connect: %s\n", pPath);
    }

    if (pPath == NULL || zmq_connect(g_pDataCh, pPath) != 0) {
        zmq_close(g_pDataCh);
        g_pDataCh = NULL;
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_BIND_SOCKET;
    }

    if (uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh) != 0) {
        // NOTE(review): no dedicated error code for thread creation is
        // visible here; the generic create error is reported instead.
        zmq_close(g_pDataCh);
        g_pDataCh = NULL;
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_CREATE_MQ;
    }

    return ERR_SUCCESS;
}