REM:
1. 消息队列修改为双向IPC类型
2. pcap驱动支持vxLan数据包
This commit is contained in:
huangxin 2022-06-13 11:09:36 +08:00
parent 25adf97758
commit cfa665e9b6
12 changed files with 151 additions and 119 deletions

View File

@ -45,23 +45,24 @@ application:
zero_mq:
{
svr_port = 6278; # ZeroMQ 服务器端口
agent_port = 6279; # Agetn 通信端口
agent_addr = "ipc:///tmp/msg_fifo0"; # 消息通道路径
};
# vxlan 相关
vxlan:
vxlan_wan:
{
vxlan_enable = true; # 是否启动vxLan隧道封装
vxlan_nic = "ens36"; # vxlan 物理网卡名称
vxlan_peer_ip = "192.168.20.112"; # vxlan 对端IP
vxlan_peer_mac = "00:0C:29:49:CB:27"; # vxlan 对端 MAC 地址
enable = true; # 是否启动vxLan隧道封装
nic = "ens36"; # vxlan 物理网卡名称
peer_ip = "192.168.20.112"; # vxlan 对端IP
peer_mac = "00:0C:29:49:CB:27"; # vxlan 对端 MAC 地址
pkg_filter = "udp port 4789 and ether src not 00:0c:29:07:cb:55"; # 包过滤器
};
# vcpe 本地服务网络接口配置
local_eth:
{
local_ip = "192.168.100.1";
local_netmast = "255.255.0.0";
local_gw = "192.168.100.1";
ip = "192.168.100.1";
netmast = "255.255.0.0";
gw = "192.168.100.1";
}
}

View File

@ -484,7 +484,7 @@ do {
/* 消息队列相配置 */ \
/* ZeroMq配置 */ \
ADD_CFG_ITEM(CFG_MQ_SVR_PORT, "application.zero_mq.svr_port", VALUE_TYPE_INTEGRAL, "6278", "ZeroMQ server port"); \
ADD_CFG_ITEM(CFG_MQ_DATA_CH, "application.zero_mq.agent_port", VALUE_TYPE_INTEGRAL, "6279", "ZeroMQ Agent server port"); \
ADD_CFG_ITEM(CFG_MQ_DATA_PATH, "application.zero_mq.agent_addr", VALUE_TYPE_STRING, "ipc:///tmp/msg_fifo0", "ZeroMQ Agent data path"); \
/* vxLan 隧道配置 */ \
ADD_CFG_ITEM(CFG_VXLAN_NIC_NAME, "application.vxlan_wan.nic", VALUE_TYPE_STRING, "", "Network card name to send data"); \
ADD_CFG_ITEM(CFG_VXLAN_SUPPORT, "application.vxlan_wan.enable", VALUE_TYPE_BOOL, "1", "Is support vxLan tune"); \

View File

@ -111,6 +111,6 @@ int cfg_get_zero_mq_port() {
return (unsigned short)cfg_get_integral_value(CFG_MQ_SVR_PORT);
}
int cfg_get_zero_mq_data_channel() {
return (unsigned short)cfg_get_integral_value(CFG_MQ_DATA_CH);
const char *cfg_get_zero_mq_data_path() {
return cfg_get_string_value(CFG_MQ_DATA_PATH);
}

View File

@ -41,7 +41,7 @@ typedef enum {
CFG_DB_MYSQL_PASSWD = 19,
CFG_DB_MYSQL_DB_NAME = 20,
CFG_MQ_SVR_PORT = 21,
CFG_MQ_DATA_CH = 22,
CFG_MQ_DATA_PATH = 22,
CFG_VXLAN_NIC_NAME = 23,
CFG_VXLAN_SUPPORT = 24,
CFG_VXLAN_PEER_IP = 25,
@ -74,7 +74,7 @@ const char *cfg_get_mysql_user();
const char *cfg_get_mysql_passwd();
const char *cfg_get_mysql_database();
int cfg_get_zero_mq_port();
int cfg_get_zero_mq_data_channel();
const char *cfg_get_zero_mq_data_path();
const char *cfg_get_string_value(CONFIG_ITEM_ID id);
long double cfg_get_float_value(CONFIG_ITEM_ID id);
@ -90,6 +90,7 @@ const char *config_get_vxlan_nic_name();
const char *config_get_vxlan_peer_mac();
const char *config_get_vxlan_peer_ip();
const char *config_get_vxlan_pkg_filter();
#ifdef __cplusplus
}
#endif

View File

@ -82,7 +82,7 @@ int mq_data_init(DATACHNNELCB dataCb) {
g_pDataChCb = dataCb;
g_pDataCh = zmq_socket(pContext, ZMQ_REQ);
g_pDataCh = zmq_socket(pContext, ZMQ_PAIR);
if (g_pDataCh == NULL) {
zmq_ctx_destroy(g_pDataCh);
@ -91,8 +91,8 @@ int mq_data_init(DATACHNNELCB dataCb) {
memset(buf, 0, 1024);
sprintf(buf, "tcp://127.0.0.1:%d", cfg_get_zero_mq_data_channel());
dzlog_info("Start message queue connect: tcp://127.0.0.1:%d\n", cfg_get_zero_mq_data_channel());
sprintf(buf, "%s", cfg_get_zero_mq_data_path());
dzlog_info("Start message queue connect: %s\n", cfg_get_zero_mq_data_path());
if (zmq_connect(g_pDataCh, buf) != 0) {
zmq_close(g_pDataCh);

View File

@ -15,7 +15,6 @@ set(lwipcontribportunix_SRCS
)
set(lwipcontribportunixnetifs_SRCS
${LWIP_DIR}/src/arch_linux/netif/rawif.c
${LWIP_DIR}/src/arch_linux/netif/sio.c
${LWIP_DIR}/src/arch_linux/netif/pppoe_if.c
${LWIP_DIR}/src/arch_linux/netif/pcapif.c

View File

@ -39,7 +39,7 @@ extern "C" {
err_t pcapif_init(struct netif *netif);
void pcapif_poll(struct netif *netif);
err_t pcapif_output(struct netif *netif, struct pbuf *p);
struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter);
struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter, int vxlan_support);
#ifdef __cplusplus
}
#endif

View File

@ -47,10 +47,18 @@
#include "netif/etharp.h"
#include "netif/pcapif.h"
#include "vxlan_pkg.h"
#include "lwip/vxlan.h"
#include "user_info.h"
#include "lwip/tcpip.h"
#include "netif/pppoeif.h"
#include "misc.h"
#if defined(LWIP_UNIX_LINUX)
#include <linux/if.h>
#include <zmq.h>
#include <uv/unix.h>
#include <uv.h>
/* Define those to better describe your network interface. */
#define IFNAME0 'p'
@ -68,9 +76,12 @@ struct pcapif {
pcap_t *pcap;
const char *eth_name;
unsigned char mac_addr[6];
const char *pkg_filter;
void *msg_input;
void *msg_output;
int vxlan_support;
};
#if !NO_SYS
@ -107,6 +118,7 @@ static void pcapif_msg_thread(void *arg) {
}
}
zmq_msg_close(&msg);
usleep(10);
}
}
@ -131,31 +143,8 @@ static void pcap_pkg_cb(unsigned char *args, const struct pcap_pkthdr *pkthdr, c
}
/*-----------------------------------------------------------------------------------*/
static void low_level_init(struct netif *netif) {
char buf[2048];
bpf_u_int32 netaddr = 0, mask = 0;
pcap_t *pPcap = NULL;
struct bpf_program filter;
char errBuf[PCAP_ERRBUF_SIZE];
struct pcapif *pcapif = (struct pcapif *)netif->state;
memset(errBuf, 0, PCAP_ERRBUF_SIZE);
memset(buf, 0, 2048);
pPcap = pcap_open_live(pcapif->eth_name, MAX_BYTES_PACKAGE, 1, -1, errBuf);
pcap_lookupnet(pcapif->eth_name, &netaddr, &mask, errBuf);
snprintf(buf, 2048, "(%s) and (inbound)", pcapif->pkg_filter);
dzlog_debug("Pcap netif used filter: \"%s\"\n", buf);
if (pcap_compile(pPcap, &filter, buf, 1, mask) == -1) {
dzlog_error("Set package fileter[%s] error: %s\n", buf, pcap_geterr(pPcap));
}
if (pcap_setfilter(pPcap, &filter) == -1) {
dzlog_error("Set package fileter[%s] error: %s\n", buf, pcap_geterr(pPcap));
}
pcapif->pcap = pPcap;
/* Obtain MAC address from network interface. */
netif->hwaddr[0] = pcapif->mac_addr[0];
@ -188,9 +177,10 @@ static void low_level_init(struct netif *netif) {
/*-----------------------------------------------------------------------------------*/
static err_t low_level_output(struct netif *netif, struct pbuf *p) {
char buf[1518]; /* max packet size including VLAN excluding CRC */
ssize_t written;
unsigned char buf[1518]; /* max packet size including VLAN excluding CRC */
ssize_t written = 0;
struct pcapif *pcapif = (struct pcapif *)netif->state;
PUSER_INFO_CONTEXT pUser = (PUSER_INFO_CONTEXT)p->extra;
if (p->tot_len > sizeof(buf)) {
MIB2_STATS_NETIF_INC(netif, ifoutdiscards);
@ -201,22 +191,20 @@ static err_t low_level_output(struct netif *netif, struct pbuf *p) {
/* initiate transfer(); */
pbuf_copy_partial(p, buf, p->tot_len, 0);
/* signal that packet should be sent(); */
written = pcap_inject(pcapif->pcap, buf, p->tot_len);
if (pcapif->vxlan_support) {
unsigned int outSize = 0;
unsigned char *pBuf = vxlan_pkg_encode(buf, p->tot_len, &pUser->vxlan, &outSize);
#if 0
dzlog_info("\n Send: \n");
hdzlog_info(buf, p->tot_len);
dzlog_info("\n");
#endif
if (pBuf && outSize != 0) {
written = pcap_inject(pcapif->pcap, pBuf, outSize);
}
if (written < p->tot_len) {
MIB2_STATS_NETIF_INC(netif, ifoutdiscards);
perror("pcapif: write");
return ERR_IF;
free(pBuf);
return written == (outSize) ? ERR_OK : ERR_IF;
} else {
MIB2_STATS_NETIF_ADD(netif, ifoutoctets, (u32_t)written);
return ERR_OK;
written = pcap_inject(pcapif->pcap, buf, p->tot_len);
return (written == p->tot_len) ? ERR_OK : ERR_IF;
}
}
@ -322,9 +310,37 @@ static void link_callback(struct netif *state_netif) {
static err_t netif_input_data(struct pbuf *p, struct netif *inp) {
err_t err = ERR_OK;
struct netif *netif;
LWIP_UNUSED_ARG(inp);
struct pcapif *pcapif = (struct pcapif *)inp->state;
const unsigned char *pBuf = (const unsigned char *)p->payload;
if (pcapif->vxlan_support) {
const struct vxlan_package *pkg = (const struct vxlan_package *)pBuf;
VXLAN_TAG tag;
struct pbuf *ebuf = NULL;
PUSER_INFO_CONTEXT pContext;
tag.vni = VXLAN_VIN_ID(lwip_htonl(pkg->vxlan_head.vni_reserved));
tag.q1 = lwip_ntohs(pkg->qinq_head.out_priority_cfi_and_id);
tag.q2 = lwip_ntohs(pkg->qinq_head.in_priority_cfi_and_id);
HASH_FIND(hh_vxlan, get_all_user_by_tag(), &tag, sizeof(VXLAN_TAG), pContext);
if (pContext && pContext->session.pppif) {
vxlan_pkg_decode(p, &ebuf, &tag);
if (ebuf == NULL) {
return ERR_IF;
}
if ((err = pContext->session.nicif->input(ebuf, pContext->session.nicif)) != ERR_OK) {
LWIP_DEBUGF(NETIF_DEBUG, ("pppoeif_input: netif input error\n"));
}
pbuf_free(p);
return err;
}
} else {
struct netif *netif;
NETIF_FOREACH(netif) {
if (netif->hwaddr_len == 6 && memcmp(p->payload, netif->hwaddr, netif->hwaddr_len) == 0) {
@ -336,15 +352,23 @@ static err_t netif_input_data(struct pbuf *p, struct netif *inp) {
return err;
}
}
return err;
}
struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter) {
return tcpip_input(p, inp);
}
struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter, int vxlan_support) {
void *msg_context = zmq_ctx_new();
pcap_t *pPcap = NULL;
struct netif *netif;
struct ifreq ifr;
ip4_addr_t ipaddr, netmask, gw;
struct bpf_program filter;
unsigned char mac[6];
ip4_addr_t ipaddr_t, netmask_t, gw_t;
char errBuf[PCAP_ERRBUF_SIZE];
char buf[2048];
unsigned int ipaddr = 0, gw = 0, netmask = 0;
struct pcapif *pcapif = (struct pcapif *)mem_malloc(sizeof(struct pcapif));
if (pcapif == NULL) {
@ -354,46 +378,48 @@ struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter) {
memset(pcapif, 0, sizeof(struct pcapif));
if (vxlan_support) {
if (vxlan_link_init(eth_name) != ERR_OK) {
dzlog_error("bind_pcapsocket_if: Get local nic info failed\n");
free(pcapif);
return NULL;
}
pcapif->vxlan_support = vxlan_support;
}
if (eth_name && strlen(eth_name) > 0) {
pcapif->eth_name = eth_name;
}
pcapif->pkg_filter = pkg_filter;
#if 0
if (vxlan_link_init(eth_name) != ERR_OK) {
dzlog_error("bind_pcapsocket_if: Get local nic info failed\n");
return NULL;
pcapif->pkg_filter = strdup(pkg_filter);
memset(errBuf, 0, PCAP_ERRBUF_SIZE);
memset(buf, 0, 2048);
pPcap = pcap_open_live(pcapif->eth_name, MAX_BYTES_PACKAGE, 1, -1, errBuf);
if (get_nic_info(eth_name, &ipaddr, &netmask, &gw, mac) != ERR_OK) {
dzlog_error("Get NIC %s information error\n", eth_name);
}
#endif
ip4_addr_set_zero(&gw);
ip4_addr_set_zero(&ipaddr);
ip4_addr_set_zero(&netmask);
IP4_ADDR(&netmask, 255, 255, 255, 0);
IP4_ADDR(&ipaddr, 192, 168, 100, 32);
IP4_ADDR(&gw, 192, 168, 100, 250);
#if 0
IP4_ADDR(&gw,
(DEFAULT_GW_IPADDR & 0xFF000000),
(DEFAULT_GW_IPADDR & 0xFF0000),
(DEFAULT_GW_IPADDR & 0xFF00),
(DEFAULT_GW_IPADDR & 0xFF));
dzlog_debug("Pcap netif used filter: \"%s\"\n", pcapif->pkg_filter);
if (pcap_compile(pPcap, &filter, buf, 1, netmask) == -1) {
dzlog_error("Set package fileter[%s] error: %s\n", buf, pcap_geterr(pPcap));
}
if (pcap_setfilter(pPcap, &filter) == -1) {
dzlog_error("Set package fileter[%s] error: %s\n", buf, pcap_geterr(pPcap));
}
IP4_ADDR(&ipaddr,
(g_localIpAddrBegin & 0xFF000000),
(g_localIpAddrBegin & 0xFF0000),
(g_localIpAddrBegin & 0xFF00),
(g_localIpAddrBegin & 0xFF));
pcapif->pcap = pPcap;
ipaddr_t.addr = ipaddr;
netmask_t.addr = netmask;
gw_t.addr = gw;
memcpy(pcapif->mac_addr, mac, 6);
pcapif->vxlan_support = cfg_get_support_vxlan();
pcapif->pvxLan = g_pvxLanBuf;
#endif
pcapif->mac_addr[0] = 0x00;
pcapif->mac_addr[1] = 0x0C;
pcapif->mac_addr[2] = 0x01;
pcapif->mac_addr[3] = 0x02;
pcapif->mac_addr[4] = 0x00;
pcapif->mac_addr[5] = 0x0a;
memset(&ifr, 0, sizeof(ifr));
strcpy(ifr.ifr_name, pcapif->eth_name);
@ -413,7 +439,7 @@ struct netif *bind_pcap_if(const char *eth_name, const char *pkg_filter) {
return NULL;
}
netif_add(netif, &ipaddr, &netmask, &gw, pcapif, pcapif_init, netif_input_data);
netif_add(netif, &ipaddr_t, &netmask_t, &gw_t, pcapif, pcapif_init, netif_input_data);
#if LWIP_NETIF_STATUS_CALLBACK
netif_set_status_callback(netif, status_callback);

View File

@ -95,6 +95,7 @@ struct sockaddr_ll {
unsigned char sll_addr[8];
};
typedef struct {
unsigned int vni;
unsigned char *output_head;
@ -218,6 +219,7 @@ static void low_level_init(struct netif *netif) {
*
*/
/*-----------------------------------------------------------------------------------*/
#if 0
static unsigned short udp_checksum(unsigned char *pNetPacket, unsigned int uLen) {
unsigned int iLen = uLen - sizeof(struct eth_hdr) - sizeof(struct ip_hdr) + 12;
unsigned short sum = 0;
@ -245,9 +247,10 @@ static unsigned short udp_checksum(unsigned char *pNetPacket, unsigned int uLen)
}
return sum;
}
#endif
static err_t low_level_output(struct netif *netif, struct pbuf *p) {
unsigned char sndBuf[1518];
// unsigned char sndBuf[1518];
unsigned char buf[1518]; /* max packet size including VLAN excluding CRC */
ssize_t written = 0;
struct sockaddr_ll dstAddr;
@ -536,6 +539,7 @@ static void link_callback(struct netif *state_netif) {
}
#endif /* LWIP_NETIF_LINK_CALLBACK */
#if 0
static struct pbuf *vxlan_unpackag(const unsigned char *pBuf, int size) {
struct pbuf *q;
unsigned char buf[1500];
@ -559,6 +563,7 @@ static struct pbuf *vxlan_unpackag(const unsigned char *pBuf, int size) {
}
return q;
}
#endif
static err_t netif_input_data(struct pbuf *p, struct netif *inp) {
err_t err;

View File

@ -260,7 +260,7 @@ int pppoe_session_init() {
static uv_thread_t uvThread, cacheThread;
uv_rwlock_init(&g_cacheLock);
g_rawSocketIf = bind_pcap_if(config_get_vxlan_nic_name(), config_get_vxlan_pkg_filter());
g_rawSocketIf = bind_pcap_if(config_get_vxlan_nic_name(), config_get_vxlan_pkg_filter(), cfg_get_support_vxlan());
if (g_rawSocketIf) {
dzlog_info("Create Raw Socket netif: <%p>\n", (void *)g_rawSocketIf);

View File

@ -19,10 +19,10 @@ static USER_PARAMS g_userInfo[] = {
void user_info_init() {
uv_rwlock_init(&g_userLock);
//user_info_add(0, &g_userInfo[0]);
//user_info_add(1, &g_userInfo[1]);
user_info_add(0, &g_userInfo[0]);
user_info_add(1, &g_userInfo[1]);
//user_info_add(2, &g_userInfo[2]);
user_info_add(3, &g_userInfo[3]);
//user_info_add(3, &g_userInfo[3]);
}
void user_info_change_status(PUSER_INFO_CONTEXT pInfo, USER_STATUS status) {

View File

@ -48,7 +48,7 @@ static int data_mq_init(void) {
return -ERR_MQ_CREATE_MQ;
}
g_pResponse = zmq_socket(g_pContext, ZMQ_REP);
g_pResponse = zmq_socket(g_pContext, ZMQ_PAIR);
if (g_pResponse == NULL) {
zmq_ctx_destroy(g_pContext);
@ -57,8 +57,8 @@ static int data_mq_init(void) {
memset(buf, 0, 1024);
sprintf(buf, "tcp://*:6279");
printf("Start message data channel server: tcp://*:6279\n");
sprintf(buf, "ipc:///tmp/msg_fifo0");
printf("Start message data channel server: ipc:///tmp/msg_fifo0\n");
if (zmq_bind(g_pResponse, buf) != 0) {