// // Created by xajhuang on 2022/6/6. // #include #include #include #include #include #include "msg_queue.h" #include "config.h" #include "s2j/s2j.h" #include "user_errno.h" #ifdef LWIP_ON #include "misc.h" #include "pppoe_info.h" #include "user_info.h" #include "vxlan_pkg.h" #endif #define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe") #define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe") static void *g_pDataCh = NULL; typedef struct { const char *entity; const char *message; const char *params; } MQ_DATA_MSG, *PMQ_DATA_MSG; typedef struct { char vsvxlanIP[MAX_IP_V4_STR]; char vsvxlanMac[MAX_MAC_ADDR_STR]; } ADD_USER_VXLAN, *PADD_USER_VXLAN; typedef struct { unsigned int vni; unsigned int userId; unsigned int c_tag_in; unsigned int s_tag_in; char clientMac[MAX_MAC_ADDR_STR]; char pppoeUser[MAX_PATH]; char pppoePass[MAX_PATH]; } ADD_USER_USER, *PADD_USER_USER; typedef struct { ADD_USER_VXLAN vxlan; PADD_USER_USER pUser; unsigned int userCount; } ADD_INFO, *PADD_INFO; typedef struct { PADD_INFO pInfo; unsigned int infoCount; } MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER; typedef struct { unsigned int *pUsers; unsigned int nCount; } MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER; static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) { *pMsg = (PMQ_DATA_MSG)malloc(sizeof(MQ_DATA_MSG)); memset(*pMsg, 0, sizeof(MQ_DATA_MSG)); return *pMsg; } static void free_remove_user(PMQ_DATA_REMOVE_USER p) { if (p) { if (p->pUsers) { free(p->pUsers); } free(p); } } static void free_add_user(PMQ_DATA_ADD_USER p) { int i; for (i = 0; i < p->infoCount; i++) { if (p[i].pInfo->pUser) { free(p[i].pInfo->pUser); } } free(p->pInfo); free(p); } static void free_mq_data_msg(PMQ_DATA_MSG pMsg) { if (pMsg->entity) { free((void *)pMsg->entity); } if (pMsg->message) { free((void *)pMsg->message); } if (pMsg->params) { free((void *)pMsg->params); } free(pMsg); } static void *j2s_add_user(cJSON *pJson) { s2j_create_struct_obj(pUser, ADD_USER_USER); s2j_struct_get_basic_element(pUser, pJson, int, vni); s2j_struct_get_basic_element(pUser, pJson, int, userId); s2j_struct_get_basic_element(pUser, pJson, int, c_tag_in); s2j_struct_get_basic_element(pUser, pJson, int, s_tag_in); s2j_struct_get_basic_element(pUser, pJson, string, clientMac); s2j_struct_get_basic_element(pUser, pJson, string, pppoeUser); s2j_struct_get_basic_element(pUser, pJson, string, pppoePass); return pUser; } PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) { cJSON *pJsonRoot = cJSON_Parse(pStrJson); if (pJsonRoot) { int i; cJSON *pUser = cJSON_GetObjectItem(pJsonRoot, "users"); if (pUser) { int nItems = cJSON_GetArraySize(pUser); if (nItems <= 0) { cJSON_Delete(pJsonRoot); return NULL; } *pRmUser = (PMQ_DATA_REMOVE_USER)malloc(sizeof(MQ_DATA_REMOVE_USER)); if (*pRmUser == NULL) { cJSON_Delete(pJsonRoot); return NULL; } memset(*pRmUser, 0, sizeof(MQ_DATA_REMOVE_USER)); (*pRmUser)->nCount = nItems; (*pRmUser)->pUsers = (unsigned int *)malloc(sizeof(unsigned int) * nItems); if ((*pRmUser)->pUsers == NULL) { cJSON_Delete(pJsonRoot); free(*pRmUser); return NULL; } memset((*pRmUser)->pUsers, 0, sizeof(unsigned int) * nItems); for (i = 0; i < nItems; i++) { cJSON *pcurInfo = cJSON_GetArrayItem(pUser, i); if (pcurInfo) { (*pRmUser)->pUsers[i] = pcurInfo->valueint; } } cJSON_Delete(pJsonRoot); return *pRmUser; } else { cJSON_Delete(pJsonRoot); return NULL; } } return NULL; } PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) { cJSON *pJsonRoot = cJSON_Parse(pStrJson); if (pJsonRoot) { int i; cJSON *pvxlanMac; cJSON *pvxlanIp; cJSON *puser; int nItems = cJSON_GetArraySize(pJsonRoot); if (nItems <= 0) { cJSON_Delete(pJsonRoot); return NULL; } *pAddUser = (PMQ_DATA_ADD_USER)malloc(sizeof(MQ_DATA_ADD_USER)); if (*pAddUser == NULL) { cJSON_Delete(pJsonRoot); return NULL; } memset(*pAddUser, 0, sizeof(MQ_DATA_ADD_USER)); (*pAddUser)->infoCount = nItems; (*pAddUser)->pInfo = (PADD_INFO)malloc(sizeof(ADD_INFO) * nItems); if ((*pAddUser)->pInfo == NULL) { cJSON_Delete(pJsonRoot); free(*pAddUser); return NULL; } memset((*pAddUser)->pInfo, 0, sizeof(ADD_INFO) * nItems); for (i = 0; i < nItems; i++) { PADD_INFO pCurrent = &((*pAddUser)->pInfo[i]); cJSON *pcurInfo = cJSON_GetArrayItem(pJsonRoot, i); pvxlanMac = cJSON_GetObjectItem(pcurInfo, "vsvxlanMac"); pvxlanIp = cJSON_GetObjectItem(pcurInfo, "vsvxlanIP"); puser = cJSON_GetObjectItem(pcurInfo, "users"); if (pvxlanMac) { strncpy(pCurrent->vxlan.vsvxlanMac, SAFETY_STR_STRING(pvxlanMac->valuestring), MAX_MAC_ADDR_STR); } if (pvxlanIp) { strncpy(pCurrent->vxlan.vsvxlanIP, SAFETY_STR_STRING(pvxlanIp->valuestring), MAX_IP_V4_STR); } if (puser) { int nUser = cJSON_GetArraySize(puser); if (nUser > 0) { pCurrent->userCount = nUser; pCurrent->pUser = (PADD_USER_USER)malloc(sizeof(ADD_USER_USER) * nUser); if (pCurrent->pUser) { int j; memset(pCurrent->pUser, 0, sizeof(ADD_USER_USER) * nUser); for (j = 0; j < nUser; j++) { PADD_USER_USER p = &pCurrent->pUser[j]; cJSON *pJsonU = cJSON_GetArrayItem(puser, j); PADD_USER_USER pTmp = j2s_add_user(pJsonU); if (pTmp) { memcpy(p, pTmp, sizeof(ADD_USER_USER)); free(pTmp); } } } } } } cJSON_Delete(pJsonRoot); return *pAddUser; } return NULL; } PMQ_DATA_MSG decode_data_msg(const char *pStrJson) { cJSON *pJsonRoot = cJSON_Parse(pStrJson); if (pJsonRoot) { PMQ_DATA_MSG pMsg = NULL; create_mq_data_msg(&pMsg); if (pMsg) { cJSON *pJsonEntity = cJSON_GetObjectItem(pJsonRoot, "entity"); cJSON *pJsonMessage = cJSON_GetObjectItem(pJsonRoot, "message"); cJSON *pJsonParam = cJSON_GetObjectItem(pJsonRoot, "params"); if (pJsonEntity) { pMsg->entity = strdup(SAFETY_STR_STRING(pJsonEntity->valuestring)); } if (pJsonMessage) { pMsg->message = strdup(SAFETY_STR_STRING(pJsonMessage->valuestring)); } if (pJsonParam) { pMsg->params = strdup(SAFETY_STR_STRING(pJsonParam->valuestring)); } } cJSON_Delete(pJsonRoot); return pMsg; } return NULL; } static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) { PMQ_DATA_MSG pMqMsg; unsigned int size = zmq_msg_size(pMsg) + 1; char *pdata = zmq_msg_data(pMsg); unsigned char *pBuf = (unsigned char *)malloc(size); if (pBuf == NULL) { return; } memset(pBuf, 0, size); memcpy(pBuf, zmq_msg_data(pMsg), size - 1); dzlog_info("receive(%zu): %s\n", strlen(pdata), pBuf); zmq_msg_close(pMsg); pMqMsg = decode_data_msg((const char *)pBuf); if (pMqMsg) { if (strcmp(AGENT_CMD_ADDUSER, pMqMsg->message) == 0) { PMQ_DATA_ADD_USER p = NULL; dzlog_debug("Process: %s\n", pMqMsg->params); decode_add_user_msg(pMqMsg->params, &p); if (p) { #ifdef LWIP_ON int m, n; for (m = 0; m < p->infoCount; m++) { PADD_INFO pInfo = &(p->pInfo[m]); vxlan_peer_add(pInfo->vxlan.vsvxlanIP, pInfo->vxlan.vsvxlanMac); for (n = 0; n < pInfo->userCount; n++) { PADD_USER_USER pUser = &(pInfo->pUser[n]); USER_PARAMS userInfo; memset(&userInfo, 0, sizeof(USER_PARAMS)); userInfo.pppoe_user = pUser->pppoeUser; userInfo.pppoe_passwd = pUser->pppoePass; userInfo.vni = pUser->vni; userInfo.userid = pUser->userId; userInfo.q1 = pUser->c_tag_in; userInfo.q2 = pUser->s_tag_in; str_to_mac(pUser->clientMac, userInfo.mac_addr); user_info_add(userInfo.userid, &userInfo); } } #endif free_add_user(p); } } else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) { PMQ_DATA_REMOVE_USER p = NULL; dzlog_debug("Process: %s\n", pMqMsg->params); decode_remove_user_msg(pMqMsg->params, &p); if (p) { #ifdef LWIP_ON int m; for (m = 0; m < p->nCount; m++) { user_info_remove(p->pUsers[m]); } #endif free_remove_user(p); } } free_mq_data_msg(pMqMsg); } free(pBuf); } int mq_data_send_msg(const char *pMsg) { zmq_msg_t msg; if (pMsg) { dzlog_debug("Send PPPoE Session: %s\n", pMsg); zmq_msg_init_size(&msg, strlen(pMsg) + 1); memset(zmq_msg_data(&msg), 0, strlen(pMsg) + 1); memcpy(zmq_msg_data(&msg), pMsg, strlen(pMsg)); zmq_msg_send(&msg, g_pDataCh, 0); zmq_msg_close(&msg); } return ERR_SUCCESS; } _Noreturn static void mqDataChannelCb(void *pDataCh) { while (TRUE) { zmq_msg_t msg; zmq_msg_init(&msg); if (zmq_msg_recv(&msg, pDataCh, 0) != -1) { process_data_msg(pDataCh, &msg); zmq_msg_close(&msg); } uv_sleep(10); } } int mq_data_init() { static uv_thread_t uvThread; void *pContext = zmq_ctx_new(); char buf[1024]; if (pContext == NULL) { return -ERR_MQ_CREATE_MQ; } g_pDataCh = zmq_socket(pContext, ZMQ_PAIR); if (g_pDataCh == NULL) { zmq_ctx_destroy(g_pDataCh); return -ERR_MQ_CREATE_REP; } memset(buf, 0, 1024); sprintf(buf, "%s", cfg_get_zero_mq_data_path()); dzlog_info("Start message queue connect: %s\n", buf); if (zmq_connect(g_pDataCh, buf) != 0) { zmq_close(g_pDataCh); zmq_ctx_destroy(g_pDataCh); return -ERR_MQ_BIND_SOCKET; } uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh); return ERR_SUCCESS; }