REM:
1. 增加删除用户接口
2. 增加用户删除返回消息
This commit is contained in:
huangxin 2022-06-15 18:08:19 +08:00
parent 8b71542787
commit b3e7ea5f16
6 changed files with 191 additions and 29 deletions

View File

@ -65,6 +65,7 @@ typedef enum {
ERR_MQ_CREATE_REP = 2301, ERR_MQ_CREATE_REP = 2301,
ERR_MQ_BIND_SOCKET = 2302, ERR_MQ_BIND_SOCKET = 2302,
ERR_MQ_CONN_SERVER = 2304, ERR_MQ_CONN_SERVER = 2304,
ERR_MQ_SEND_MSG = 2305,
// JSON 序列化相关错误 // JSON 序列化相关错误
ERR_JSON_CREAT_OBJ = 2400, ERR_JSON_CREAT_OBJ = 2400,
@ -80,7 +81,6 @@ typedef enum {
ERR_MISC_GET_GATEWAY = 2602, ERR_MISC_GET_GATEWAY = 2602,
ERR_MISC_GET_MACADDR = 2603, ERR_MISC_GET_MACADDR = 2603,
} USER_ERRNO; } USER_ERRNO;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -17,6 +17,7 @@
#include "vxlan_pkg.h" #include "vxlan_pkg.h"
#define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe") #define AGENT_CMD_ADDUSER ("add-ywg-pppoe-vcpe")
#define AGENT_CMD_REMOVUSER ("remove-ywg-pppoe-vcpe")
static void *g_pDataCh = NULL; static void *g_pDataCh = NULL;
@ -52,12 +53,27 @@ typedef struct {
unsigned int infoCount; unsigned int infoCount;
} MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER; } MQ_DATA_ADD_USER, *PMQ_DATA_ADD_USER;
typedef struct {
unsigned int *pUsers;
unsigned int nCount;
} MQ_DATA_REMOVE_USER, *PMQ_DATA_REMOVE_USER;
static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) { static PMQ_DATA_MSG create_mq_data_msg(PMQ_DATA_MSG *pMsg) {
*pMsg = (PMQ_DATA_MSG)malloc(sizeof(MQ_DATA_MSG)); *pMsg = (PMQ_DATA_MSG)malloc(sizeof(MQ_DATA_MSG));
memset(*pMsg, 0, sizeof(MQ_DATA_MSG)); memset(*pMsg, 0, sizeof(MQ_DATA_MSG));
return *pMsg; return *pMsg;
} }
static void free_remove_user(PMQ_DATA_REMOVE_USER p) {
if (p) {
if (p->pUsers) {
free(p->pUsers);
}
free(p);
}
}
static void free_add_user(PMQ_DATA_ADD_USER p) { static void free_add_user(PMQ_DATA_ADD_USER p) {
int i, j; int i, j;
@ -99,6 +115,58 @@ static void *j2s_add_user(cJSON *pJson) {
return pUser; return pUser;
} }
PMQ_DATA_REMOVE_USER decode_remove_user_msg(const char *pStrJson, PMQ_DATA_REMOVE_USER *pRmUser) {
cJSON *pJsonRoot = cJSON_Parse(pStrJson);
if (pJsonRoot) {
int i;
cJSON *pUser = cJSON_GetObjectItem(pJsonRoot, "users");
if (pUser) {
int nItems = cJSON_GetArraySize(pUser);
if (nItems <= 0) {
cJSON_Delete(pJsonRoot);
return NULL;
}
*pRmUser = (PMQ_DATA_REMOVE_USER)malloc(sizeof(MQ_DATA_REMOVE_USER));
if (*pRmUser == NULL) {
cJSON_Delete(pJsonRoot);
return NULL;
}
memset(*pRmUser, 0, sizeof(MQ_DATA_REMOVE_USER));
(*pRmUser)->nCount = nItems;
(*pRmUser)->pUsers = (unsigned int *)malloc(sizeof(unsigned int) * nItems);
if ((*pRmUser)->pUsers == NULL) {
cJSON_Delete(pJsonRoot);
free(*pRmUser);
return NULL;
}
memset((*pRmUser)->pUsers, 0, sizeof(unsigned int) * nItems);
for (i = 0; i < nItems; i++) {
cJSON *pcurInfo = cJSON_GetArrayItem(pUser, i);
if (pcurInfo) {
(*pRmUser)->pUsers[i] = pcurInfo->valueint;
}
}
cJSON_Delete(pJsonRoot);
return *pRmUser;
} else {
cJSON_Delete(pJsonRoot);
return NULL;
}
}
return NULL;
}
PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) { PMQ_DATA_ADD_USER decode_add_user_msg(const char *pStrJson, PMQ_DATA_ADD_USER *pAddUser) {
cJSON *pJsonRoot = cJSON_Parse(pStrJson); cJSON *pJsonRoot = cJSON_Parse(pStrJson);
@ -265,6 +333,19 @@ static void process_data_msg(void *UNUSED(pDataCh), zmq_msg_t *pMsg) {
} }
free_add_user(p); free_add_user(p);
} }
} else if (strcmp(AGENT_CMD_REMOVUSER, pMqMsg->message) == 0) {
PMQ_DATA_REMOVE_USER p = NULL;
dzlog_debug("Process: %s\n", pMqMsg->params);
decode_remove_user_msg(pMqMsg->params, &p);
if (p) {
int m;
for (m = 0; m < p->nCount; m++) {
user_info_remove(p->pUsers[m]);
}
free_remove_user(p);
}
} }
free_mq_data_msg(pMqMsg); free_mq_data_msg(pMqMsg);
} }

View File

@ -327,7 +327,7 @@ static err_t netif_input_data(struct pbuf *p, struct netif *inp) {
if (pContext && pContext->session.pppif) { if (pContext && pContext->session.pppif) {
vxlan_pkg_decode(p, &ebuf, &tag); vxlan_pkg_decode(p, &ebuf, &tag);
if (ebuf == NULL) { if (ebuf != NULL) {
if (strlen(pContext->session.data.svrBaseMac) == 0) { if (strlen(pContext->session.data.svrBaseMac) == 0) {
if (strlen(pContext->session.data.svrBaseMac) == 0) { if (strlen(pContext->session.data.svrBaseMac) == 0) {
sprintf(pContext->session.data.svrBaseMac, sprintf(pContext->session.data.svrBaseMac,

View File

@ -20,6 +20,8 @@
typedef struct PPPOE_CACHE { typedef struct PPPOE_CACHE {
PPPPOE_SESSION_DATA pSessionData; PPPPOE_SESSION_DATA pSessionData;
int nextStatus;
PUSER_INFO_CONTEXT pUser;
struct PPPOE_CACHE *next, *prev; struct PPPOE_CACHE *next, *prev;
} PPPOE_CACHE, *PPPPOE_CACHE; } PPPOE_CACHE, *PPPPOE_CACHE;
@ -45,16 +47,18 @@ struct PPPOE_ERR_INFO_ {
static struct netif *g_rawSocketIf = NULL; static struct netif *g_rawSocketIf = NULL;
static PPPPOE_CACHE g_pPPPCache = NULL; static PPPPOE_CACHE g_pPPPCache = NULL;
static PPPPOE_CACHE g_pPPPDelete = NULL;
static uv_rwlock_t g_cacheLock; static uv_rwlock_t g_cacheLock;
static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) { static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) {
PPPPOE_CACHE pCache;
struct netif *pppif = ppp_netif(pcb); struct netif *pppif = ppp_netif(pcb);
struct pppoe_softc *sc = (struct pppoe_softc *)pcb->link_ctx_cb; struct pppoe_softc *sc = (struct pppoe_softc *)pcb->link_ctx_cb;
PUSER_INFO_CONTEXT pUser = (PUSER_INFO_CONTEXT)ctx; PUSER_INFO_CONTEXT pUser = (PUSER_INFO_CONTEXT)ctx;
switch (errCode) { switch (errCode) {
case PPPERR_NONE: { /* No error. */ case PPPERR_NONE: { /* No error. */
PPPPOE_CACHE pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE)); pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE));
dzlog_info("<%p> PPPoE user(%05d:%s) connect server succeeded[%08X], Session: %04X\n", dzlog_info("<%p> PPPoE user(%05d:%s) connect server succeeded[%08X], Session: %04X\n",
pcb, pcb,
pUser->userid, pUser->userid,
@ -88,6 +92,7 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) {
if (pCache) { if (pCache) {
pCache->pSessionData = &pUser->session.data; pCache->pSessionData = &pUser->session.data;
pCache->pUser = pUser;
uv_rwlock_wrlock(&g_cacheLock); uv_rwlock_wrlock(&g_cacheLock);
LL_APPEND(g_pPPPCache, pCache); LL_APPEND(g_pPPPCache, pCache);
uv_rwlock_wrunlock(&g_cacheLock); uv_rwlock_wrunlock(&g_cacheLock);
@ -103,11 +108,37 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) {
g_pppoeErr[errCode].errmsg, g_pppoeErr[errCode].errmsg,
g_pppoeErr[errCode].errid); g_pppoeErr[errCode].errid);
if (pUser->session.status != STATUS_TASK_DELETE) { if (pUser->session.status != STATUS_TASK_DELETE) {
if (pUser->session.status == STATUS_TASK_CONNECTED) {
pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE));
if (pCache) {
pCache->pSessionData = &pUser->session.data;
pCache->pUser = pUser;
pCache->nextStatus = STATUS_TASK_ERROR;
uv_rwlock_wrlock(&g_cacheLock);
LL_APPEND(g_pPPPDelete, pCache);
uv_rwlock_wrunlock(&g_cacheLock);
} else {
pUser->session.status = STATUS_TASK_ERROR; pUser->session.status = STATUS_TASK_ERROR;
} }
} else {
pUser->session.status = STATUS_TASK_ERROR;
}
}
break; break;
case PPPERR_USER: case PPPERR_USER:
dzlog_info("User(%05d:%s) disconnect\n", pUser->userid, pUser->user_info.pppoe_user); dzlog_info("User(%05d:%s) disconnect\n", pUser->userid, pUser->user_info.pppoe_user);
pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE));
if (pCache) {
pCache->pSessionData = &pUser->session.data;
pCache->pUser = pUser;
pCache->nextStatus = pUser->session.status;
uv_rwlock_wrlock(&g_cacheLock);
LL_APPEND(g_pPPPDelete, pCache);
uv_rwlock_wrunlock(&g_cacheLock);
}
break; break;
case PPPERR_OPEN: case PPPERR_OPEN:
case PPPERR_DEVICE: case PPPERR_DEVICE:
@ -122,8 +153,22 @@ static void pppLinkStatusCallback(ppp_pcb *pcb, int errCode, void *ctx) {
g_pppoeErr[errCode].errmsg, g_pppoeErr[errCode].errmsg,
g_pppoeErr[errCode].errid); g_pppoeErr[errCode].errid);
if (pUser->session.status != STATUS_TASK_DELETE) { if (pUser->session.status != STATUS_TASK_DELETE) {
if (pUser->session.status == STATUS_TASK_CONNECTED) {
pCache = (PPPPOE_CACHE)malloc(sizeof(PPPOE_CACHE));
if (pCache) {
pCache->pSessionData = &pUser->session.data;
pCache->pUser = pUser;
pCache->nextStatus = STATUS_TASK_ERROR;
uv_rwlock_wrlock(&g_cacheLock);
LL_APPEND(g_pPPPDelete, pCache);
uv_rwlock_wrunlock(&g_cacheLock);
} else {
pUser->session.status = STATUS_TASK_DISCONNECTED; pUser->session.status = STATUS_TASK_DISCONNECTED;
} }
} else {
pUser->session.status = STATUS_TASK_DISCONNECTED;
}
}
break; break;
default: { default: {
printf("<%p> pppLinkStatusCallback: unknown errCode %d\n", pcb, errCode); printf("<%p> pppLinkStatusCallback: unknown errCode %d\n", pcb, errCode);
@ -187,6 +232,7 @@ _Noreturn void sessionCalcCb(void *UNUSED(pArg)) {
case STATUS_TASK_DELETE: case STATUS_TASK_DELETE:
if (pUser->session.retry.timeout == 0) { if (pUser->session.retry.timeout == 0) {
dzlog_debug("User(%05d:%s) PPPoE deleted\n", pUser->userid, pUser->user_info.pppoe_user); dzlog_debug("User(%05d:%s) PPPoE deleted\n", pUser->userid, pUser->user_info.pppoe_user);
pppapi_close(pUser->session.ppp, 0);
pUser->session.retry.timeout = time(NULL); pUser->session.retry.timeout = time(NULL);
pppapi_free(pUser->session.ppp); pppapi_free(pUser->session.ppp);
} }
@ -249,6 +295,34 @@ _Noreturn void cacheCalcCb(void *UNUSED(pArg)) {
free((void *)pJsonString); free((void *)pJsonString);
} }
uv_rwlock_rdlock(&g_cacheLock);
LL_COUNT(g_pPPPDelete, pCache, count);
uv_rwlock_rdunlock(&g_cacheLock);
if (count > 0) {
const char *pJsonString;
cJSON *pRoot = cJSON_CreateObject();
cJSON *pSession = cJSON_CreateArray();
cJSON_AddStringToObject(pRoot, "message", "remove-ywg-pppoe");
uv_rwlock_wrlock(&g_cacheLock);
LL_FOREACH_SAFE(g_pPPPDelete, pCache, pTmp) {
cJSON *pItem = cJSON_CreateObject();
cJSON_AddNumberToObject(pItem, "sessionId", pCache->pSessionData->sessionId);
cJSON_AddStringToObject(pItem, "clientMac", pCache->pSessionData->clientMac);
cJSON_AddItemToArray(pSession, pItem);
pCache->pUser->session.status = pCache->nextStatus;
LL_DELETE(g_pPPPDelete, pCache);
free(pCache);
}
uv_rwlock_wrunlock(&g_cacheLock);
cJSON_AddItemToObject(pRoot, "params", pSession);
pJsonString = cJSON_Print(pRoot);
mq_data_send_msg(pJsonString);
cJSON_Delete(pRoot);
free((void *)pJsonString);
}
uv_sleep(1000); uv_sleep(1000);
} while (TRUE); } while (TRUE);
} }

View File

@ -69,7 +69,8 @@ int user_info_add(unsigned int userid, PUSER_PARAMS pInfo) {
HASH_ADD(hh_mac, g_pUserByMacList, mac_addr, 6, pList); HASH_ADD(hh_mac, g_pUserByMacList, mac_addr, 6, pList);
uv_rwlock_wrunlock(&g_userLock); uv_rwlock_wrunlock(&g_userLock);
dzlog_debug("Add user: id = %u, vni = %u, q1 = %u, q2 = %u, ppp_user = %s, mac = %02X:%02X:%02X:%02X:%02X:%02X\n", dzlog_debug(
"Add user: id = %u, vni = %u, q1 = %u, q2 = %u, ppp_user = %s, mac = %02X:%02X:%02X:%02X:%02X:%02X\n",
userid, userid,
pInfo->vni, pInfo->vni,
pInfo->q1, pInfo->q1,
@ -123,7 +124,8 @@ void user_info_remove(unsigned int userid) {
uv_rwlock_rdunlock(&g_userLock); uv_rwlock_rdunlock(&g_userLock);
if (pInfo) { if (pInfo) {
pInfo->user_status = STATUS_USER_DELETE; pInfo->session.retry.timeout = 0;
pInfo->session.status = STATUS_TASK_DELETE;
} }
} }

View File

@ -6,6 +6,7 @@
#include <zmq.h> #include <zmq.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include <stdlib.h>
#include "task_manager.h" #include "task_manager.h"
#include "user_errno.h" #include "user_errno.h"
#include "config.h" #include "config.h"
@ -17,6 +18,7 @@ static void *g_pResponse = NULL;
static const char *g_pSendMsg = NULL; static const char *g_pSendMsg = NULL;
static int mq_data_send_msg(const char *pMsg) { static int mq_data_send_msg(const char *pMsg) {
int ret = ERR_SUCCESS;
zmq_msg_t msg; zmq_msg_t msg;
if (pMsg) { if (pMsg) {
@ -27,11 +29,12 @@ static int mq_data_send_msg(const char *pMsg) {
memcpy(zmq_msg_data(&msg), pMsg, size); memcpy(zmq_msg_data(&msg), pMsg, size);
if (zmq_msg_send(&msg, g_pResponse, ZMQ_DONTWAIT) == -1) { if (zmq_msg_send(&msg, g_pResponse, ZMQ_DONTWAIT) == -1) {
perror("zmq_msg_send"); perror("zmq_msg_send");
ret = -ERR_MQ_SEND_MSG;
} }
zmq_msg_close(&msg); zmq_msg_close(&msg);
} }
return ERR_SUCCESS; return ret;
} }
_Noreturn static void mqServerCb(void *UNUSED(pArg)) { _Noreturn static void mqServerCb(void *UNUSED(pArg)) {
@ -42,7 +45,6 @@ _Noreturn static void mqServerCb(void *UNUSED(pArg)) {
if (zmq_msg_recv(&msg, g_pResponse, ZMQ_DONTWAIT) != -1) { if (zmq_msg_recv(&msg, g_pResponse, ZMQ_DONTWAIT) != -1) {
printf("Data channel receive(%zu): %s\n", zmq_msg_size(&msg), (const char *)zmq_msg_data(&msg)); printf("Data channel receive(%zu): %s\n", zmq_msg_size(&msg), (const char *)zmq_msg_data(&msg));
zmq_msg_close(&msg); zmq_msg_close(&msg);
mq_data_send_msg("xajhuang");
#if 0 #if 0
zmq_msg_close(&msg); zmq_msg_close(&msg);
@ -62,17 +64,20 @@ _Noreturn static void mqsendServerCb(void *UNUSED(pArg)) {
while (TRUE) { while (TRUE) {
if (g_pSendMsg) { if (g_pSendMsg) {
printf("+++++\n"); printf("+++++\n");
mq_data_send_msg(g_pSendMsg); if (mq_data_send_msg(g_pSendMsg) == ERR_SUCCESS) {
free((void *)g_pSendMsg);
g_pSendMsg = NULL;
}
printf("-----\n"); printf("-----\n");
} }
uv_sleep(10000); uv_sleep(3000);
} }
} }
static int data_mq_init(void) { static int data_mq_init(void) {
static uv_thread_t uvThread; static uv_thread_t uvThread;
const char *mq_name = "ipc:///tmp/msg_fifo1"; const char *mq_name = "ipc:///tmp/msg_fifo0";
char buf[1024]; char buf[1024];