From b3e7ea5f1694bb4af9ec09916f263795f7200c17 Mon Sep 17 00:00:00 2001 From: huangxin Date: Wed, 15 Jun 2022 18:08:19 +0800 Subject: [PATCH] =?UTF-8?q?OCT=20REM:=201.=20=E5=A2=9E=E5=8A=A0=E5=88=A0?= =?UTF-8?q?=E9=99=A4=E7=94=A8=E6=88=B7=E6=8E=A5=E5=8F=A3=202.=20=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E7=94=A8=E6=88=B7=E5=88=A0=E9=99=A4=E8=BF=94=E5=9B=9E?= =?UTF-8?q?=E6=B6=88=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- srcs/libs/include/user_errno.h | 10 +-- srcs/libs/mq/mq_data.c | 83 ++++++++++++++++++++++++- srcs/lwip/src/arch_linux/netif/pcapif.c | 2 +- srcs/pppoe/vcpe_pppoe.c | 80 +++++++++++++++++++++++- srcs/user/user_info.c | 28 +++++---- srcs/vcpe_agent.c | 17 +++-- 6 files changed, 191 insertions(+), 29 deletions(-) diff --git a/srcs/libs/include/user_errno.h b/srcs/libs/include/user_errno.h index 29d17d1..0133bfc 100644 --- a/srcs/libs/include/user_errno.h +++ b/srcs/libs/include/user_errno.h @@ -65,6 +65,7 @@ typedef enum { ERR_MQ_CREATE_REP = 2301, ERR_MQ_BIND_SOCKET = 2302, ERR_MQ_CONN_SERVER = 2304, + ERR_MQ_SEND_MSG = 2305, // JSON 序列化相关错误 ERR_JSON_CREAT_OBJ = 2400, @@ -75,11 +76,10 @@ typedef enum { ERR_CREATE_PPP_SESSION = 2502, // MISC 相关 - ERR_MISC_GET_IPADDR = 2600, - ERR_MISC_GET_NETMASK = 2601, - ERR_MISC_GET_GATEWAY = 2602, - ERR_MISC_GET_MACADDR = 2603, - + ERR_MISC_GET_IPADDR = 2600, + ERR_MISC_GET_NETMASK = 2601, + ERR_MISC_GET_GATEWAY = 2602, + ERR_MISC_GET_MACADDR = 2603, } USER_ERRNO; diff --git a/srcs/libs/mq/mq_data.c b/srcs/libs/mq/mq_data.c index 43cf7f1..c34d47c 100644 --- a/srcs/libs/mq/mq_data.c +++ b/srcs/libs/mq/mq_data.c @@ -16,7 +16,8 @@ #include "user_info.h" #include "vxlan_pkg.h" -#define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe") +#define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe") +#define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe") static void *g_pDataCh = NULL; @@ -52,12 +53,27 @@ typedef struct { unsigned int infoCount; } MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER; +typedef struct { + unsigned int *pUsers; + unsigned int nCount; +} MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER; + static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) { *pMsg = (PMQ_DATA_MSG)malloc(sizeof(MQ_DATA_MSG)); memset(*pMsg, 0, sizeof(MQ_DATA_MSG)); return *pMsg; } +static void free_remove_user(PMQ_DATA_REMOVE_USER p) { + if (p) { + if (p->pUsers) { + free(p->pUsers); + } + + free(p); + } +} + static void free_add_user(PMQ_DATA_ADD_USER p) { int i, j; @@ -99,6 +115,58 @@ static void *j2s_add_user(cJSON *pJson) { return pUser; } +PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) { + cJSON *pJsonRoot = cJSON_Parse(pStrJson); + if (pJsonRoot) { + int i; + cJSON *pUser = cJSON_GetObjectItem(pJsonRoot, "users"); + + if (pUser) { + int nItems = cJSON_GetArraySize(pUser); + + if (nItems <= 0) { + cJSON_Delete(pJsonRoot); + return NULL; + } + + *pRmUser = (PMQ_DATA_REMOVE_USER)malloc(sizeof(MQ_DATA_REMOVE_USER)); + + if (*pRmUser == NULL) { + cJSON_Delete(pJsonRoot); + return NULL; + } + + memset(*pRmUser, 0, sizeof(MQ_DATA_REMOVE_USER)); + (*pRmUser)->nCount = nItems; + (*pRmUser)->pUsers = (unsigned int *)malloc(sizeof(unsigned int) * nItems); + + if ((*pRmUser)->pUsers == NULL) { + cJSON_Delete(pJsonRoot); + free(*pRmUser); + return NULL; + } + + memset((*pRmUser)->pUsers, 0, sizeof(unsigned int) * nItems); + + for (i = 0; i < nItems; i++) { + cJSON *pcurInfo = cJSON_GetArrayItem(pUser, i); + + if (pcurInfo) { + (*pRmUser)->pUsers[i] = pcurInfo->valueint; + } + } + + cJSON_Delete(pJsonRoot); + return *pRmUser; + } else { + cJSON_Delete(pJsonRoot); + return NULL; + } + } + + return NULL; +} + PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) { cJSON *pJsonRoot = cJSON_Parse(pStrJson); @@ -265,6 +333,19 @@ static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) { } free_add_user(p); } + } else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) { + PMQ_DATA_REMOVE_USER p = NULL; + dzlog_debug("Process: %s\n", pMqMsg->params); + + decode_remove_user_msg(pMqMsg->params, &p); + + if (p) { + int m; + for (m = 0; m < p->nCount; m++) { + user_info_remove(p->pUsers[m]); + } + free_remove_user(p); + } } free_mq_data_msg(pMqMsg); } diff --git a/srcs/lwip/src/arch_linux/netif/pcapif.c b/srcs/lwip/src/arch_linux/netif/pcapif.c index 23425f5..10814c3 100644 --- a/srcs/lwip/src/arch_linux/netif/pcapif.c +++ b/srcs/lwip/src/arch_linux/netif/pcapif.c @@ -327,7 +327,7 @@ static err_t netif_input_data(struct pbuf *p, struct netif *inp) { if (pContext && pContext->session.pppif) { vxlan_pkg_decode(p, &ebuf, &tag); - if (ebuf == NULL) { + if (ebuf != NULL) { if (strlen(pContext->session.data.svrBaseMac) == 0) { if (strlen(pContext->session.data.svrBaseMac) == 0) { sprintf(pContext->session.data.svrBaseMac, diff --git a/srcs/pppoe/vcpe_pppoe.c b/srcs/pppoe/vcpe_pppoe.c index 78a4d75..e94876a 100644 --- a/srcs/pppoe/vcpe_pppoe.c +++ b/srcs/pppoe/vcpe_pppoe.c @@ -20,6 +20,8 @@ typedef struct PPPOE_CACHE { PPPPOE_SESSION_DATA pSessionData; + int nextStatus; + PUSER_INFO_CONTEXT pUser; struct PPPOE_CACHE *next, *prev; } PPPOE_CACHE, *PPPPOE_CACHE; @@ -45,16 +47,18 @@ struct PPPOE_ERR_INFO_ { static struct netif *g_rawSocketIf = NULL; static PPPPOE_CACHE g_pPPPCache = NULL; +static PPPPOE_CACHE g_pPPPDelete = NULL; static uv_rwlock_t g_cacheLock; static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) { + PPPPOE_CACHE pCache; struct netif *pppif = ppp_netif(pcb); struct pppoe_softc *sc = (struct pppoe_softc *)pcb->link_ctx_cb; PUSER_INFO_CONTEXT pUser = (PUSER_INFO_CONTEXT)ctx; switch (errCode) { case PPPERR_NONE: { /* No error. */ - PPPPOE_CACHE pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); + pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); dzlog_info("<%p> PPPoE user(%05d:%s) connect server succeeded[%08X], Session: %04X\n", pcb, pUser->userid, @@ -88,6 +92,7 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) { if (pCache) { pCache->pSessionData = &pUser->session.data; + pCache->pUser = pUser; uv_rwlock_wrlock(&g_cacheLock); LL_APPEND(g_pPPPCache, pCache); uv_rwlock_wrunlock(&g_cacheLock); @@ -103,11 +108,37 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) { g_pppoeErr[errCode].errmsg, g_pppoeErr[errCode].errid); if (pUser->session.status != STATUS_TASK_DELETE) { - pUser->session.status = STATUS_TASK_ERROR; + if (pUser->session.status == STATUS_TASK_CONNECTED) { + pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); + + if (pCache) { + pCache->pSessionData = &pUser->session.data; + pCache->pUser = pUser; + pCache->nextStatus = STATUS_TASK_ERROR; + uv_rwlock_wrlock(&g_cacheLock); + LL_APPEND(g_pPPPDelete, pCache); + uv_rwlock_wrunlock(&g_cacheLock); + } else { + pUser->session.status = STATUS_TASK_ERROR; + } + } else { + pUser->session.status = STATUS_TASK_ERROR; + } } break; case PPPERR_USER: dzlog_info("User(%05d:%s) disconnect\n", pUser->userid, pUser->user_info.pppoe_user); + + pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); + if (pCache) { + pCache->pSessionData = &pUser->session.data; + pCache->pUser = pUser; + pCache->nextStatus = pUser->session.status; + uv_rwlock_wrlock(&g_cacheLock); + LL_APPEND(g_pPPPDelete, pCache); + uv_rwlock_wrunlock(&g_cacheLock); + } + break; case PPPERR_OPEN: case PPPERR_DEVICE: @@ -122,7 +153,21 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) { g_pppoeErr[errCode].errmsg, g_pppoeErr[errCode].errid); if (pUser->session.status != STATUS_TASK_DELETE) { - pUser->session.status = STATUS_TASK_DISCONNECTED; + if (pUser->session.status == STATUS_TASK_CONNECTED) { + pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); + if (pCache) { + pCache->pSessionData = &pUser->session.data; + pCache->pUser = pUser; + pCache->nextStatus = STATUS_TASK_ERROR; + uv_rwlock_wrlock(&g_cacheLock); + LL_APPEND(g_pPPPDelete, pCache); + uv_rwlock_wrunlock(&g_cacheLock); + } else { + pUser->session.status = STATUS_TASK_DISCONNECTED; + } + } else { + pUser->session.status = STATUS_TASK_DISCONNECTED; + } } break; default: { @@ -187,6 +232,7 @@ _Noreturn void sessionCalcCb(void *UNUSED(pArg)) { case STATUS_TASK_DELETE: if (pUser->session.retry.timeout == 0) { dzlog_debug("User(%05d:%s) PPPoE deleted\n", pUser->userid, pUser->user_info.pppoe_user); + pppapi_close(pUser->session.ppp, 0); pUser->session.retry.timeout = time(NULL); pppapi_free(pUser->session.ppp); } @@ -249,6 +295,34 @@ _Noreturn void cacheCalcCb(void *UNUSED(pArg)) { free((void *)pJsonString); } + uv_rwlock_rdlock(&g_cacheLock); + LL_COUNT(g_pPPPDelete, pCache, count); + uv_rwlock_rdunlock(&g_cacheLock); + + if (count > 0) { + const char *pJsonString; + cJSON *pRoot = cJSON_CreateObject(); + cJSON *pSession = cJSON_CreateArray(); + cJSON_AddStringToObject(pRoot, "message", "remove-ywg-pppoe"); + + uv_rwlock_wrlock(&g_cacheLock); + LL_FOREACH_SAFE(g_pPPPDelete, pCache, pTmp) { + cJSON *pItem = cJSON_CreateObject(); + cJSON_AddNumberToObject(pItem, "sessionId", pCache->pSessionData->sessionId); + cJSON_AddStringToObject(pItem, "clientMac", pCache->pSessionData->clientMac); + cJSON_AddItemToArray(pSession, pItem); + pCache->pUser->session.status = pCache->nextStatus; + LL_DELETE(g_pPPPDelete, pCache); + free(pCache); + } + uv_rwlock_wrunlock(&g_cacheLock); + cJSON_AddItemToObject(pRoot, "params", pSession); + pJsonString = cJSON_Print(pRoot); + mq_data_send_msg(pJsonString); + cJSON_Delete(pRoot); + free((void *)pJsonString); + } + uv_sleep(1000); } while (TRUE); } diff --git a/srcs/user/user_info.c b/srcs/user/user_info.c index 21ee7ea..d6c93e7 100644 --- a/srcs/user/user_info.c +++ b/srcs/user/user_info.c @@ -69,18 +69,19 @@ int user_info_add(unsigned int userid, PUSER_PARAMS pInfo) { HASH_ADD(hh_mac, g_pUserByMacList, mac_addr, 6, pList); uv_rwlock_wrunlock(&g_userLock); - dzlog_debug("Add user: id = %u, vni = %u, q1 = %u, q2 = %u, ppp_user = %s, mac = %02X:%02X:%02X:%02X:%02X:%02X\n", - userid, - pInfo->vni, - pInfo->q1, - pInfo->q2, - pInfo->pppoe_user, - pList->mac_addr[0], - pList->mac_addr[1], - pList->mac_addr[2], - pList->mac_addr[3], - pList->mac_addr[4], - pList->mac_addr[5]); + dzlog_debug( + "Add user: id = %u, vni = %u, q1 = %u, q2 = %u, ppp_user = %s, mac = %02X:%02X:%02X:%02X:%02X:%02X\n", + userid, + pInfo->vni, + pInfo->q1, + pInfo->q2, + pInfo->pppoe_user, + pList->mac_addr[0], + pList->mac_addr[1], + pList->mac_addr[2], + pList->mac_addr[3], + pList->mac_addr[4], + pList->mac_addr[5]); } return ERR_SUCCESS; @@ -123,7 +124,8 @@ void user_info_remove(unsigned int userid) { uv_rwlock_rdunlock(&g_userLock); if (pInfo) { - pInfo->user_status = STATUS_USER_DELETE; + pInfo->session.retry.timeout = 0; + pInfo->session.status = STATUS_TASK_DELETE; } } diff --git a/srcs/vcpe_agent.c b/srcs/vcpe_agent.c index 4851770..bf30abd 100644 --- a/srcs/vcpe_agent.c +++ b/srcs/vcpe_agent.c @@ -6,6 +6,7 @@ #include #include #include +#include #include "task_manager.h" #include "user_errno.h" #include "config.h" @@ -17,6 +18,7 @@ static void *g_pResponse = NULL; static const char *g_pSendMsg = NULL; static int mq_data_send_msg(const char *pMsg) { + int ret = ERR_SUCCESS; zmq_msg_t msg; if (pMsg) { @@ -27,11 +29,12 @@ static int mq_data_send_msg(const char *pMsg) { memcpy(zmq_msg_data(&msg), pMsg, size); if (zmq_msg_send(&msg, g_pResponse, ZMQ_DONTWAIT) == -1) { perror("zmq_msg_send"); + ret = -ERR_MQ_SEND_MSG; } zmq_msg_close(&msg); } - return ERR_SUCCESS; + return ret; } _Noreturn static void mqServerCb(void *UNUSED(pArg)) { @@ -42,7 +45,6 @@ _Noreturn static void mqServerCb(void *UNUSED(pArg)) { if (zmq_msg_recv(&msg, g_pResponse, ZMQ_DONTWAIT) != -1) { printf("Data channel receive(%zu): %s\n", zmq_msg_size(&msg), (const char *)zmq_msg_data(&msg)); zmq_msg_close(&msg); - mq_data_send_msg("xajhuang"); #if 0 zmq_msg_close(&msg); @@ -60,19 +62,22 @@ _Noreturn static void mqServerCb(void *UNUSED(pArg)) { _Noreturn static void mqsendServerCb(void *UNUSED(pArg)) { while (TRUE) { - if(g_pSendMsg) { + if (g_pSendMsg) { printf("+++++\n"); - mq_data_send_msg(g_pSendMsg); + if (mq_data_send_msg(g_pSendMsg) == ERR_SUCCESS) { + free((void *)g_pSendMsg); + g_pSendMsg = NULL; + } printf("-----\n"); } - uv_sleep(10000); + uv_sleep(3000); } } static int data_mq_init(void) { static uv_thread_t uvThread; - const char *mq_name = "ipc:///tmp/msg_fifo1"; + const char *mq_name = "ipc:///tmp/msg_fifo0"; char buf[1024];