418 lines
11 KiB
C
418 lines
11 KiB
C
//
|
|
// Created by xajhuang on 2022/6/6.
|
|
//
|
|
#include <stdlib.h>
|
|
#include <zmq.h>
|
|
#include <uv.h>
|
|
#include <string.h>
|
|
#include <zlog.h>
|
|
|
|
#include "msg_queue.h"
|
|
#include "config.h"
|
|
#include "misc.h"
|
|
#include "s2j/s2j.h"
|
|
#include "pppoe_info.h"
|
|
#include "user_errno.h"
|
|
#include "user_info.h"
|
|
#include "vxlan_pkg.h"
|
|
|
|
#define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe")
|
|
#define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe")
|
|
|
|
static void *g_pDataCh = NULL;
|
|
|
|
typedef struct {
|
|
const char *entity;
|
|
const char *message;
|
|
const char *params;
|
|
} MQ_DATA_MSG, *PMQ_DATA_MSG;
|
|
|
|
typedef struct {
|
|
char vsvxlanIP[MAX_IP_V4_STR];
|
|
char vsvxlanMac[MAX_MAC_ADDR_STR];
|
|
} ADD_USER_VXLAN, *PADD_USER_VXLAN;
|
|
|
|
typedef struct {
|
|
unsigned int vni;
|
|
unsigned int userId;
|
|
unsigned int c_tag_in;
|
|
unsigned int s_tag_in;
|
|
char clientMac[MAX_MAC_ADDR_STR];
|
|
char pppoeUser[MAX_PATH];
|
|
char pppoePass[MAX_PATH];
|
|
} ADD_USER_USER, *PADD_USER_USER;
|
|
|
|
typedef struct {
|
|
ADD_USER_VXLAN vxlan;
|
|
PADD_USER_USER pUser;
|
|
unsigned int userCount;
|
|
} ADD_INFO, *PADD_INFO;
|
|
|
|
typedef struct {
|
|
PADD_INFO pInfo;
|
|
unsigned int infoCount;
|
|
} MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER;
|
|
|
|
typedef struct {
|
|
unsigned int *pUsers;
|
|
unsigned int nCount;
|
|
} MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER;
|
|
|
|
static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) {
|
|
*pMsg = (PMQ_DATA_MSG)malloc(sizeof(MQ_DATA_MSG));
|
|
memset(*pMsg, 0, sizeof(MQ_DATA_MSG));
|
|
return *pMsg;
|
|
}
|
|
|
|
static void free_remove_user(PMQ_DATA_REMOVE_USER p) {
|
|
if (p) {
|
|
if (p->pUsers) {
|
|
free(p->pUsers);
|
|
}
|
|
|
|
free(p);
|
|
}
|
|
}
|
|
|
|
static void free_add_user(PMQ_DATA_ADD_USER p) {
|
|
int i;
|
|
|
|
for (i = 0; i < p->infoCount; i++) {
|
|
if (p[i].pInfo->pUser) {
|
|
free(p[i].pInfo->pUser);
|
|
}
|
|
}
|
|
|
|
free(p->pInfo);
|
|
free(p);
|
|
}
|
|
|
|
static void free_mq_data_msg(PMQ_DATA_MSG pMsg) {
|
|
if (pMsg->entity) {
|
|
free((void *)pMsg->entity);
|
|
}
|
|
if (pMsg->message) {
|
|
free((void *)pMsg->message);
|
|
}
|
|
if (pMsg->params) {
|
|
free((void *)pMsg->params);
|
|
}
|
|
|
|
free(pMsg);
|
|
}
|
|
|
|
static void *j2s_add_user(cJSON *pJson) {
|
|
s2j_create_struct_obj(pUser, ADD_USER_USER);
|
|
s2j_struct_get_basic_element(pUser, pJson, int, vni);
|
|
s2j_struct_get_basic_element(pUser, pJson, int, userId);
|
|
s2j_struct_get_basic_element(pUser, pJson, int, c_tag_in);
|
|
s2j_struct_get_basic_element(pUser, pJson, int, s_tag_in);
|
|
s2j_struct_get_basic_element(pUser, pJson, string, clientMac);
|
|
s2j_struct_get_basic_element(pUser, pJson, string, pppoeUser);
|
|
s2j_struct_get_basic_element(pUser, pJson, string, pppoePass);
|
|
return pUser;
|
|
}
|
|
|
|
PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) {
|
|
cJSON *pJsonRoot = cJSON_Parse(pStrJson);
|
|
if (pJsonRoot) {
|
|
int i;
|
|
cJSON *pUser = cJSON_GetObjectItem(pJsonRoot, "users");
|
|
|
|
if (pUser) {
|
|
int nItems = cJSON_GetArraySize(pUser);
|
|
|
|
if (nItems <= 0) {
|
|
cJSON_Delete(pJsonRoot);
|
|
return NULL;
|
|
}
|
|
|
|
*pRmUser = (PMQ_DATA_REMOVE_USER)malloc(sizeof(MQ_DATA_REMOVE_USER));
|
|
|
|
if (*pRmUser == NULL) {
|
|
cJSON_Delete(pJsonRoot);
|
|
return NULL;
|
|
}
|
|
|
|
memset(*pRmUser, 0, sizeof(MQ_DATA_REMOVE_USER));
|
|
(*pRmUser)->nCount = nItems;
|
|
(*pRmUser)->pUsers = (unsigned int *)malloc(sizeof(unsigned int) * nItems);
|
|
|
|
if ((*pRmUser)->pUsers == NULL) {
|
|
cJSON_Delete(pJsonRoot);
|
|
free(*pRmUser);
|
|
return NULL;
|
|
}
|
|
|
|
memset((*pRmUser)->pUsers, 0, sizeof(unsigned int) * nItems);
|
|
|
|
for (i = 0; i < nItems; i++) {
|
|
cJSON *pcurInfo = cJSON_GetArrayItem(pUser, i);
|
|
|
|
if (pcurInfo) {
|
|
(*pRmUser)->pUsers[i] = pcurInfo->valueint;
|
|
}
|
|
}
|
|
|
|
cJSON_Delete(pJsonRoot);
|
|
return *pRmUser;
|
|
} else {
|
|
cJSON_Delete(pJsonRoot);
|
|
return NULL;
|
|
}
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) {
|
|
cJSON *pJsonRoot = cJSON_Parse(pStrJson);
|
|
|
|
if (pJsonRoot) {
|
|
int i;
|
|
|
|
cJSON *pvxlanMac;
|
|
cJSON *pvxlanIp;
|
|
cJSON *puser;
|
|
|
|
int nItems = cJSON_GetArraySize(pJsonRoot);
|
|
|
|
if (nItems <= 0) {
|
|
cJSON_Delete(pJsonRoot);
|
|
return NULL;
|
|
}
|
|
|
|
*pAddUser = (PMQ_DATA_ADD_USER)malloc(sizeof(MQ_DATA_ADD_USER));
|
|
|
|
if (*pAddUser == NULL) {
|
|
cJSON_Delete(pJsonRoot);
|
|
return NULL;
|
|
}
|
|
|
|
memset(*pAddUser, 0, sizeof(MQ_DATA_ADD_USER));
|
|
|
|
(*pAddUser)->infoCount = nItems;
|
|
(*pAddUser)->pInfo = (PADD_INFO)malloc(sizeof(ADD_INFO) * nItems);
|
|
|
|
if ((*pAddUser)->pInfo == NULL) {
|
|
cJSON_Delete(pJsonRoot);
|
|
free(*pAddUser);
|
|
return NULL;
|
|
}
|
|
|
|
memset((*pAddUser)->pInfo, 0, sizeof(ADD_INFO) * nItems);
|
|
|
|
for (i = 0; i < nItems; i++) {
|
|
PADD_INFO pCurrent = &((*pAddUser)->pInfo[i]);
|
|
cJSON *pcurInfo = cJSON_GetArrayItem(pJsonRoot, i);
|
|
|
|
pvxlanMac = cJSON_GetObjectItem(pcurInfo, "vsvxlanMac");
|
|
pvxlanIp = cJSON_GetObjectItem(pcurInfo, "vsvxlanIP");
|
|
puser = cJSON_GetObjectItem(pcurInfo, "users");
|
|
|
|
if (pvxlanMac) {
|
|
strncpy(pCurrent->vxlan.vsvxlanMac, SAFETY_STR_STRING(pvxlanMac->valuestring), MAX_MAC_ADDR_STR);
|
|
}
|
|
|
|
if (pvxlanIp) {
|
|
strncpy(pCurrent->vxlan.vsvxlanIP, SAFETY_STR_STRING(pvxlanIp->valuestring), MAX_IP_V4_STR);
|
|
}
|
|
|
|
if (puser) {
|
|
int nUser = cJSON_GetArraySize(puser);
|
|
|
|
if (nUser > 0) {
|
|
pCurrent->userCount = nUser;
|
|
pCurrent->pUser = (PADD_USER_USER)malloc(sizeof(ADD_USER_USER) * nUser);
|
|
|
|
if (pCurrent->pUser) {
|
|
int j;
|
|
memset(pCurrent->pUser, 0, sizeof(ADD_USER_USER) * nUser);
|
|
for (j = 0; j < nUser; j++) {
|
|
PADD_USER_USER p = &pCurrent->pUser[j];
|
|
cJSON *pJsonU = cJSON_GetArrayItem(puser, j);
|
|
PADD_USER_USER pTmp = j2s_add_user(pJsonU);
|
|
|
|
if (pTmp) {
|
|
memcpy(p, pTmp, sizeof(ADD_USER_USER));
|
|
free(pTmp);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
cJSON_Delete(pJsonRoot);
|
|
|
|
return *pAddUser;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
PMQ_DATA_MSG decode_data_msg(const char *pStrJson) {
|
|
cJSON *pJsonRoot = cJSON_Parse(pStrJson);
|
|
|
|
if (pJsonRoot) {
|
|
PMQ_DATA_MSG pMsg = NULL;
|
|
|
|
create_mq_data_msg(&pMsg);
|
|
|
|
if (pMsg) {
|
|
cJSON *pJsonEntity = cJSON_GetObjectItem(pJsonRoot, "entity");
|
|
cJSON *pJsonMessage = cJSON_GetObjectItem(pJsonRoot, "message");
|
|
cJSON *pJsonParam = cJSON_GetObjectItem(pJsonRoot, "params");
|
|
|
|
if (pJsonEntity) {
|
|
pMsg->entity = strdup(SAFETY_STR_STRING(pJsonEntity->valuestring));
|
|
}
|
|
|
|
if (pJsonMessage) {
|
|
pMsg->message = strdup(SAFETY_STR_STRING(pJsonMessage->valuestring));
|
|
}
|
|
|
|
if (pJsonParam) {
|
|
pMsg->params = strdup(SAFETY_STR_STRING(pJsonParam->valuestring));
|
|
}
|
|
}
|
|
|
|
cJSON_Delete(pJsonRoot);
|
|
|
|
return pMsg;
|
|
}
|
|
|
|
return NULL;
|
|
}
|
|
|
|
static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) {
|
|
PMQ_DATA_MSG pMqMsg;
|
|
unsigned int size = zmq_msg_size(pMsg) + 1;
|
|
char *pdata = zmq_msg_data(pMsg);
|
|
unsigned char *pBuf = (unsigned char *)malloc(size);
|
|
|
|
if (pBuf == NULL) {
|
|
return;
|
|
}
|
|
|
|
memset(pBuf, 0, size);
|
|
memcpy(pBuf, zmq_msg_data(pMsg), size - 1);
|
|
dzlog_info("receive(%zu): %s\n", strlen(pdata), pBuf);
|
|
|
|
zmq_msg_close(pMsg);
|
|
|
|
pMqMsg = decode_data_msg((const char *)pBuf);
|
|
|
|
if (pMqMsg) {
|
|
if (strcmp(AGENT_CMD_ADDUSER, pMqMsg->message) == 0) {
|
|
PMQ_DATA_ADD_USER p = NULL;
|
|
dzlog_debug("Process: %s\n", pMqMsg->params);
|
|
decode_add_user_msg(pMqMsg->params, &p);
|
|
if (p) {
|
|
#if LWIP_ON
|
|
int m, n;
|
|
for (m = 0; m < p->infoCount; m++) {
|
|
PADD_INFO pInfo = &(p->pInfo[m]);
|
|
vxlan_peer_add(pInfo->vxlan.vsvxlanIP, pInfo->vxlan.vsvxlanMac);
|
|
for (n = 0; n < pInfo->userCount; n++) {
|
|
PADD_USER_USER pUser = &(pInfo->pUser[n]);
|
|
USER_PARAMS userInfo;
|
|
|
|
memset(&userInfo, 0, sizeof(USER_PARAMS));
|
|
|
|
userInfo.pppoe_user = pUser->pppoeUser;
|
|
userInfo.pppoe_passwd = pUser->pppoePass;
|
|
userInfo.vni = pUser->vni;
|
|
userInfo.userid = pUser->userId;
|
|
userInfo.q1 = pUser->c_tag_in;
|
|
userInfo.q2 = pUser->s_tag_in;
|
|
str_to_mac(pUser->clientMac, userInfo.mac_addr);
|
|
user_info_add(userInfo.userid, &userInfo);
|
|
}
|
|
}
|
|
#endif
|
|
free_add_user(p);
|
|
}
|
|
} else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) {
|
|
PMQ_DATA_REMOVE_USER p = NULL;
|
|
dzlog_debug("Process: %s\n", pMqMsg->params);
|
|
|
|
decode_remove_user_msg(pMqMsg->params, &p);
|
|
|
|
if (p) {
|
|
#if LWIP_ON
|
|
int m;
|
|
for (m = 0; m < p->nCount; m++) {
|
|
user_info_remove(p->pUsers[m]);
|
|
}
|
|
#endif
|
|
free_remove_user(p);
|
|
}
|
|
}
|
|
free_mq_data_msg(pMqMsg);
|
|
}
|
|
|
|
free(pBuf);
|
|
}
|
|
|
|
int mq_data_send_msg(const char *pMsg) {
|
|
zmq_msg_t msg;
|
|
|
|
if (pMsg) {
|
|
dzlog_debug("Send PPPoE Session: %s\n", pMsg);
|
|
zmq_msg_init_size(&msg, strlen(pMsg) + 1);
|
|
memset(zmq_msg_data(&msg), 0, strlen(pMsg) + 1);
|
|
memcpy(zmq_msg_data(&msg), pMsg, strlen(pMsg));
|
|
zmq_msg_send(&msg, g_pDataCh, 0);
|
|
zmq_msg_close(&msg);
|
|
}
|
|
|
|
return ERR_SUCCESS;
|
|
}
|
|
|
|
_Noreturn static void mqDataChannelCb(void *pDataCh) {
|
|
while (TRUE) {
|
|
zmq_msg_t msg;
|
|
zmq_msg_init(&msg);
|
|
|
|
if (zmq_msg_recv(&msg, pDataCh, 0) != -1) {
|
|
process_data_msg(pDataCh, &msg);
|
|
zmq_msg_close(&msg);
|
|
}
|
|
|
|
uv_sleep(10);
|
|
}
|
|
}
|
|
|
|
int mq_data_init() {
|
|
static uv_thread_t uvThread;
|
|
void *pContext = zmq_ctx_new();
|
|
|
|
char buf[1024];
|
|
|
|
if (pContext == NULL) {
|
|
return -ERR_MQ_CREATE_MQ;
|
|
}
|
|
|
|
g_pDataCh = zmq_socket(pContext, ZMQ_PAIR);
|
|
|
|
if (g_pDataCh == NULL) {
|
|
zmq_ctx_destroy(g_pDataCh);
|
|
return -ERR_MQ_CREATE_REP;
|
|
}
|
|
|
|
memset(buf, 0, 1024);
|
|
|
|
sprintf(buf, "%s", cfg_get_zero_mq_data_path());
|
|
dzlog_info("Start message queue connect: %s\n", buf);
|
|
|
|
if (zmq_connect(g_pDataCh, buf) != 0) {
|
|
zmq_close(g_pDataCh);
|
|
zmq_ctx_destroy(g_pDataCh);
|
|
return -ERR_MQ_BIND_SOCKET;
|
|
}
|
|
|
|
uv_thread_create(&uvThread, mqDataChannelCb, g_pDataCh);
|
|
|
|
return ERR_SUCCESS;
|
|
} |