vcpe/srcs/libs/mq/mq_data.c

421 lines
12 KiB
C

//
// Created by xajhuang on 2022/6/6.
//
#include <stdlib.h>
#include <zmq.h>
#include <uv.h>
#include <string.h>
#include "msg_queue.h"
#include "config.h"
#include "s2j/s2j.h"
#include "user_errno.h"
#include "zlog_module.h"
#ifdef LWIP_ON
#include "misc.h"
#include "pppoe_info.h"
#include "user_info.h"
#include "vxlan_pkg.h"
#endif
/* Values of the decoded "message" field that process_data_msg() compares
 * against to select the add-user / remove-user handling. */
#define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe")
#define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe")
/* ZeroMQ socket of the data channel: assigned in mq_data_init(), used as the
 * send target in mq_data_send_msg() and handed to the receive thread. */
static void *g_pDataCh = NULL;
/* Envelope of one data-channel message. decode_data_msg() fills each field
 * with a strdup() copy of the JSON keys "entity", "message" and "params";
 * free_mq_data_msg() releases them. A field stays NULL when its key is absent. */
typedef struct {
const char *entity;
const char *message;
const char *params;
} MQ_DATA_MSG, *PMQ_DATA_MSG;
/* VXLAN peer of an add-user group; copied from the JSON keys "vsvxlanIP" and
 * "vsvxlanMac" by decode_add_user_msg(). */
typedef struct {
char vsvxlanIP[MAX_IP_V4_STR];
char vsvxlanMac[MAX_MAC_ADDR_STR];
} ADD_USER_VXLAN, *PADD_USER_VXLAN;
/* One user entry of an add-user request. j2s_add_user() fills every field
 * from the JSON key of the same name. */
typedef struct {
unsigned int vni;
unsigned int userId;
unsigned int c_tag_in;
unsigned int s_tag_in;
char clientMac[MAX_MAC_ADDR_STR];
char pppoeUser[MAX_PATH];
char pppoePass[MAX_PATH];
} ADD_USER_USER, *PADD_USER_USER;
/* One add-user group: a VXLAN peer plus a heap array of userCount users
 * (pUser is allocated in decode_add_user_msg()). */
typedef struct {
ADD_USER_VXLAN vxlan;
PADD_USER_USER pUser;
unsigned int userCount;
} ADD_INFO, *PADD_INFO;
/* Decoded add-user request: heap array pInfo holding infoCount groups.
 * Built by decode_add_user_msg(), released by free_add_user(). */
typedef struct {
PADD_INFO pInfo;
unsigned int infoCount;
} MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER;
/* Decoded remove-user request: heap array pUsers holding nCount user ids.
 * Built by decode_remove_user_msg(), released by free_remove_user(). */
typedef struct {
unsigned int *pUsers;
unsigned int nCount;
} MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER;
/**
 * Allocate a zero-initialised MQ_DATA_MSG.
 *
 * @param pMsg  Out parameter receiving the new object; set to NULL when the
 *              allocation fails. May itself be NULL, in which case only the
 *              return value carries the result.
 * @return The newly allocated message, or NULL on allocation failure.
 *         The caller owns it and releases it with free_mq_data_msg().
 */
static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) {
    /* calloc zero-fills the object and, unlike the previous malloc+memset
     * pair, never writes through a NULL pointer when memory is exhausted. */
    PMQ_DATA_MSG pNew = (PMQ_DATA_MSG)calloc(1, sizeof(MQ_DATA_MSG));

    if (pMsg != NULL) {
        *pMsg = pNew;
    }

    return pNew;
}
/**
 * Release a decoded remove-user request together with its id array.
 *
 * @param p  Request to free; NULL is accepted and ignored.
 */
static void free_remove_user(PMQ_DATA_REMOVE_USER p) {
    if (p == NULL) {
        return;
    }

    /* free(NULL) is a no-op, so the id array needs no separate guard. */
    free(p->pUsers);
    free(p);
}
/**
 * Release a decoded add-user request: every per-group user array, the group
 * array itself and finally the container.
 *
 * @param p  Request to free; NULL is accepted and ignored.
 */
static void free_add_user(PMQ_DATA_ADD_USER p) {
    unsigned int i;

    if (p == NULL) {
        return;
    }

    if (p->pInfo != NULL) {
        /* The groups live in p->pInfo[0 .. infoCount-1]. Indexing the
         * container itself (p[i]) walks past the single allocated object
         * for i >= 1 and leaves the real user arrays unfreed. */
        for (i = 0; i < p->infoCount; i++) {
            free(p->pInfo[i].pUser);
        }

        free(p->pInfo);
    }

    free(p);
}
/**
 * Release a message envelope and the three strings it owns.
 *
 * @param pMsg  Envelope to free. Must not be NULL (it is dereferenced
 *              unconditionally).
 */
static void free_mq_data_msg(PMQ_DATA_MSG pMsg) {
    /* The fields are declared const char * but hold heap copies, so the
     * qualifier is cast away for free(); free(NULL) is a no-op. */
    free((void *)pMsg->params);
    free((void *)pMsg->message);
    free((void *)pMsg->entity);
    free(pMsg);
}
/**
 * Convert one JSON user object into an ADD_USER_USER via the s2j macros.
 *
 * @param pJson  JSON object carrying the keys vni, userId, c_tag_in,
 *               s_tag_in, clientMac, pppoeUser and pppoePass.
 * @return The object named pUser created by s2j_create_struct_obj. The
 *         caller in decode_add_user_msg() checks it for NULL and releases
 *         it with free(), so it is presumably heap-allocated - confirm
 *         against the s2j macro definition.
 */
static void *j2s_add_user(cJSON *pJson) {
s2j_create_struct_obj(pUser, ADD_USER_USER);
/* The struct fields are unsigned int but are read with the s2j "int" getter. */
s2j_struct_get_basic_element(pUser, pJson, int, vni);
s2j_struct_get_basic_element(pUser, pJson, int, userId);
s2j_struct_get_basic_element(pUser, pJson, int, c_tag_in);
s2j_struct_get_basic_element(pUser, pJson, int, s_tag_in);
/* NOTE(review): the destinations are fixed-size char arrays; verify that the
 * s2j string getter bounds the copy to the array size and tolerates a
 * missing key or a NULL pUser. */
s2j_struct_get_basic_element(pUser, pJson, string, clientMac);
s2j_struct_get_basic_element(pUser, pJson, string, pppoeUser);
s2j_struct_get_basic_element(pUser, pJson, string, pppoePass);
return pUser;
}
/**
 * Decode a remove-user request of the form {"users": [id, id, ...]}.
 *
 * @param pStrJson  NUL-terminated JSON text; NULL is treated as a decode
 *                  failure.
 * @param pRmUser   Optional out parameter. It is reset to NULL on entry and
 *                  receives the decoded request on success, so it never
 *                  refers to freed memory after a failure.
 * @return The decoded request (release with free_remove_user()), or NULL
 *         when the text does not parse, has no "users" member, the member
 *         is empty, or memory is exhausted.
 */
PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) {
    PMQ_DATA_REMOVE_USER pResult = NULL;
    cJSON               *pJsonRoot;
    cJSON               *pUsers;
    int                  nItems;
    int                  i;

    if (pRmUser != NULL) {
        *pRmUser = NULL;
    }

    if (pStrJson == NULL) {
        return NULL;
    }

    pJsonRoot = cJSON_Parse(pStrJson);
    if (pJsonRoot == NULL) {
        return NULL;
    }

    pUsers = cJSON_GetObjectItem(pJsonRoot, "users");
    nItems = (pUsers != NULL) ? cJSON_GetArraySize(pUsers) : 0;
    if (nItems <= 0) {
        goto out;
    }

    pResult = (PMQ_DATA_REMOVE_USER)calloc(1, sizeof(MQ_DATA_REMOVE_USER));
    if (pResult == NULL) {
        goto out;
    }

    pResult->pUsers = (unsigned int *)calloc((size_t)nItems, sizeof(unsigned int));
    if (pResult->pUsers == NULL) {
        /* Drop the half-built object; the out parameter is still NULL, so
         * the caller cannot pick up a dangling pointer. */
        free(pResult);
        pResult = NULL;
        goto out;
    }

    pResult->nCount = (unsigned int)nItems;

    for (i = 0; i < nItems; i++) {
        const cJSON *pItem = cJSON_GetArrayItem(pUsers, i);

        if (pItem != NULL) {
            pResult->pUsers[i] = (unsigned int)pItem->valueint;
        }
    }

    if (pRmUser != NULL) {
        *pRmUser = pResult;
    }

out:
    cJSON_Delete(pJsonRoot);
    return pResult;
}
/**
 * Decode an add-user request: a JSON array whose elements each carry
 * "vsvxlanMac", "vsvxlanIP" and a "users" array of user objects.
 *
 * @param pStrJson  NUL-terminated JSON text; NULL is treated as a decode
 *                  failure.
 * @param pAddUser  Optional out parameter. It is reset to NULL on entry and
 *                  receives the decoded request on success, so it never
 *                  refers to freed memory after a failure.
 * @return The decoded request (release with free_add_user()), or NULL when
 *         the text does not parse, is empty, or the top-level allocations
 *         fail. A group whose user array cannot be allocated is kept with
 *         userCount == 0 rather than failing the whole request.
 */
PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) {
    PMQ_DATA_ADD_USER pResult;
    cJSON            *pJsonRoot;
    int               nItems;
    int               i;

    if (pAddUser != NULL) {
        *pAddUser = NULL;
    }

    if (pStrJson == NULL) {
        return NULL;
    }

    pJsonRoot = cJSON_Parse(pStrJson);
    if (pJsonRoot == NULL) {
        return NULL;
    }

    nItems = cJSON_GetArraySize(pJsonRoot);
    if (nItems <= 0) {
        cJSON_Delete(pJsonRoot);
        return NULL;
    }

    pResult = (PMQ_DATA_ADD_USER)calloc(1, sizeof(MQ_DATA_ADD_USER));
    if (pResult != NULL) {
        pResult->pInfo = (PADD_INFO)calloc((size_t)nItems, sizeof(ADD_INFO));
    }

    if (pResult == NULL || pResult->pInfo == NULL) {
        free(pResult);
        cJSON_Delete(pJsonRoot);
        return NULL;
    }

    pResult->infoCount = (unsigned int)nItems;

    for (i = 0; i < nItems; i++) {
        PADD_INFO pCurrent  = &pResult->pInfo[i];
        cJSON    *pcurInfo  = cJSON_GetArrayItem(pJsonRoot, i);
        cJSON    *pvxlanMac = cJSON_GetObjectItem(pcurInfo, "vsvxlanMac");
        cJSON    *pvxlanIp  = cJSON_GetObjectItem(pcurInfo, "vsvxlanIP");
        cJSON    *puser     = cJSON_GetObjectItem(pcurInfo, "users");
        int       nUser;
        int       j;

        /* The destination arrays are zero-filled by calloc; copying at most
         * size - 1 bytes therefore always leaves them NUL-terminated. */
        if (pvxlanMac != NULL) {
            strncpy(pCurrent->vxlan.vsvxlanMac, SAFETY_STR_STRING(pvxlanMac->valuestring), MAX_MAC_ADDR_STR - 1);
        }

        if (pvxlanIp != NULL) {
            strncpy(pCurrent->vxlan.vsvxlanIP, SAFETY_STR_STRING(pvxlanIp->valuestring), MAX_IP_V4_STR - 1);
        }

        nUser = (puser != NULL) ? cJSON_GetArraySize(puser) : 0;
        if (nUser <= 0) {
            continue;
        }

        pCurrent->pUser = (PADD_USER_USER)calloc((size_t)nUser, sizeof(ADD_USER_USER));
        if (pCurrent->pUser == NULL) {
            /* userCount stays 0 so consumers never index a NULL array. */
            continue;
        }

        pCurrent->userCount = (unsigned int)nUser;

        for (j = 0; j < nUser; j++) {
            PADD_USER_USER pTmp = (PADD_USER_USER)j2s_add_user(cJSON_GetArrayItem(puser, j));

            if (pTmp != NULL) {
                memcpy(&pCurrent->pUser[j], pTmp, sizeof(ADD_USER_USER));
                free(pTmp);
            }
        }
    }

    cJSON_Delete(pJsonRoot);

    if (pAddUser != NULL) {
        *pAddUser = pResult;
    }

    return pResult;
}
/**
 * Parse the outer message envelope {"entity", "message", "params"}.
 *
 * @param pStrJson  JSON text of the envelope.
 * @return A newly created MQ_DATA_MSG whose fields are strdup() copies of
 *         the members that are present (absent members stay NULL), or NULL
 *         when the text does not parse or the envelope cannot be created.
 *         Release the result with free_mq_data_msg().
 */
PMQ_DATA_MSG decode_data_msg(const char *pStrJson) {
    PMQ_DATA_MSG pDecoded = NULL;
    cJSON       *pItem;
    cJSON       *pRoot = cJSON_Parse(pStrJson);

    if (pRoot == NULL) {
        return NULL;
    }

    if (create_mq_data_msg(&pDecoded) != NULL) {
        pItem = cJSON_GetObjectItem(pRoot, "entity");
        if (pItem != NULL) {
            pDecoded->entity = strdup(SAFETY_STR_STRING(pItem->valuestring));
        }

        pItem = cJSON_GetObjectItem(pRoot, "message");
        if (pItem != NULL) {
            pDecoded->message = strdup(SAFETY_STR_STRING(pItem->valuestring));
        }

        pItem = cJSON_GetObjectItem(pRoot, "params");
        if (pItem != NULL) {
            pDecoded->params = strdup(SAFETY_STR_STRING(pItem->valuestring));
        }
    }

    cJSON_Delete(pRoot);
    return pDecoded;
}
/**
 * Handle one message received on the data channel.
 *
 * The payload is copied into a NUL-terminated buffer, decoded as a message
 * envelope and dispatched on its "message" field: AGENT_CMD_ADDUSER
 * registers VXLAN peers and users, AGENT_CMD_REMOVUSER removes users (the
 * actual registration/removal is compiled in only with LWIP_ON).
 *
 * @param pDataCh  Unused.
 * @param pMsg     Received message. It is only read here; the caller that
 *                 initialised it remains responsible for zmq_msg_close().
 */
static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) {
    PMQ_DATA_MSG pMqMsg;
    const char  *pParams;
    size_t       len  = zmq_msg_size(pMsg);
    char        *pBuf = (char *)malloc(len + 1);

    if (pBuf == NULL) {
        return;
    }

    if (len > 0) {
        memcpy(pBuf, zmq_msg_data(pMsg), len);
    }
    pBuf[len] = '\0';

    /* A ZeroMQ frame is not NUL-terminated, so the length is taken from the
     * terminated copy rather than by running strlen() over the raw frame. */
    LOG_MSG(info, ZLOG_MOD_MQ, "receive(%zu): %s\n", strlen(pBuf), pBuf);

    pMqMsg = decode_data_msg(pBuf);
    if (pMqMsg == NULL) {
        free(pBuf);
        return;
    }

    /* Either field is NULL when its key is missing from the envelope. */
    pParams = (pMqMsg->params != NULL) ? pMqMsg->params : "";

    if (pMqMsg->message == NULL) {
        /* No command to dispatch on. */
    } else if (strcmp(AGENT_CMD_ADDUSER, pMqMsg->message) == 0) {
        PMQ_DATA_ADD_USER p = NULL;

        LOG_MSG(debug, ZLOG_MOD_MQ, "Process: %s\n", pParams);
        decode_add_user_msg(pParams, &p);

        if (p) {
#ifdef LWIP_ON
            unsigned int m, n;

            for (m = 0; m < p->infoCount; m++) {
                PADD_INFO pInfo = &(p->pInfo[m]);

                vxlan_peer_add(pInfo->vxlan.vsvxlanIP, pInfo->vxlan.vsvxlanMac);

                for (n = 0; n < pInfo->userCount; n++) {
                    PADD_USER_USER pUser = &(pInfo->pUser[n]);
                    USER_PARAMS    userInfo;

                    memset(&userInfo, 0, sizeof(USER_PARAMS));
                    /* NOTE(review): these two pointers borrow storage that
                     * free_add_user() releases below - confirm that
                     * user_info_add() copies the strings. */
                    userInfo.pppoe_user   = pUser->pppoeUser;
                    userInfo.pppoe_passwd = pUser->pppoePass;
                    userInfo.vni          = pUser->vni;
                    userInfo.userid       = pUser->userId;
                    userInfo.q1           = pUser->c_tag_in;
                    userInfo.q2           = pUser->s_tag_in;
                    str_to_mac(pUser->clientMac, userInfo.mac_addr);
                    user_info_add(userInfo.userid, &userInfo);
                }
            }
#endif
            free_add_user(p);
        }
    } else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) {
        PMQ_DATA_REMOVE_USER p = NULL;

        LOG_MSG(debug, ZLOG_MOD_MQ, "Process: %s\n", pParams);
        decode_remove_user_msg(pParams, &p);

        if (p) {
#ifdef LWIP_ON
            unsigned int m;

            for (m = 0; m < p->nCount; m++) {
                user_info_remove(p->pUsers[m]);
            }
#endif
            free_remove_user(p);
        }
    }

    free_mq_data_msg(pMqMsg);
    free(pBuf);
}
/**
 * Send a text message, including its terminating NUL, on the data channel.
 *
 * @param pMsg  NUL-terminated text to send; NULL is ignored.
 * @return Always ERR_SUCCESS, as before: sending is best-effort and
 *         failures are only logged.
 */
int mq_data_send_msg(const char *pMsg) {
    zmq_msg_t msg;
    size_t    size;

    if (pMsg == NULL) {
        return ERR_SUCCESS;
    }

    LOG_MSG(debug, ZLOG_MOD_MQ, "Send PPPoE Session: %s\n", pMsg);

    size = strlen(pMsg) + 1;

    /* Nothing may be written into the message unless it was initialised. */
    if (zmq_msg_init_size(&msg, size) != 0) {
        LOG_MSG(debug, ZLOG_MOD_MQ, "Create message failed: %s\n", zmq_strerror(zmq_errno()));
        return ERR_SUCCESS;
    }

    /* Copies the text together with its terminating NUL. */
    memcpy(zmq_msg_data(&msg), pMsg, size);

    if (zmq_msg_send(&msg, g_pDataCh, 0) == -1) {
        LOG_MSG(debug, ZLOG_MOD_MQ, "Send message failed: %s\n", zmq_strerror(zmq_errno()));
    }

    zmq_msg_close(&msg);

    return ERR_SUCCESS;
}
/**
 * Receive loop of the data channel thread; never returns.
 *
 * Each iteration initialises a message, performs a zmq_msg_recv() on the
 * socket, hands a successfully received message to process_data_msg() and
 * then sleeps 10 ms.
 *
 * @param pDataCh  ZeroMQ socket to receive from.
 */
_Noreturn static void mqDataChannelCb(void *pDataCh) {
    while (TRUE) {
        zmq_msg_t msg;

        zmq_msg_init(&msg);

        if (zmq_msg_recv(&msg, pDataCh, 0) != -1) {
            process_data_msg(pDataCh, &msg);
        }

        /* Every initialised message is closed, including after a failed
         * receive, before the next iteration initialises it again. */
        zmq_msg_close(&msg);

        uv_sleep(10);
    }
}
/**
 * Create the data channel: a ZMQ_PAIR socket connected to the endpoint
 * returned by cfg_get_zero_mq_data_path(), plus a thread running the
 * receive loop mqDataChannelCb().
 *
 * @return ERR_SUCCESS on success;
 *         -ERR_MQ_CREATE_MQ when the ZeroMQ context or the receive thread
 *         cannot be created;
 *         -ERR_MQ_CREATE_REP when the socket cannot be created;
 *         -ERR_MQ_BIND_SOCKET when no endpoint is configured or the
 *         connect fails.
 *         On every failure the socket and context created so far are
 *         released and g_pDataCh is left NULL.
 */
int mq_data_init(void) {
    static uv_thread_t uvThread;
    const char        *pPath;
    void              *pContext = zmq_ctx_new();

    if (pContext == NULL) {
        return -ERR_MQ_CREATE_MQ;
    }

    g_pDataCh = zmq_socket(pContext, ZMQ_PAIR);

    if (g_pDataCh == NULL) {
        /* The context must be destroyed here, not the (NULL) socket. */
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_CREATE_REP;
    }

    /* The endpoint is used directly instead of being copied into a
     * fixed-size buffer, so an over-long path cannot overflow anything. */
    pPath = cfg_get_zero_mq_data_path();

    if (pPath != NULL) {
        LOG_MSG(info, ZLOG_MOD_MQ, "Start message queue connect: %s\n", pPath);
    }

    if (pPath == NULL || zmq_connect(g_pDataCh, pPath) != 0) {
        zmq_close(g_pDataCh);
        g_pDataCh = NULL;
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_BIND_SOCKET;
    }

    /* Without the receive thread the channel is unusable, so a failure to
     * start it is reported instead of being silently ignored. */
    if (uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh) != 0) {
        zmq_close(g_pDataCh);
        g_pDataCh = NULL;
        zmq_ctx_destroy(pContext);
        return -ERR_MQ_CREATE_MQ;
    }

    return ERR_SUCCESS;
}