diff --git a/lib/ff_dpdk_if.c b/lib/ff_dpdk_if.c index 710e15894..f1feb1cf8 100644 --- a/lib/ff_dpdk_if.c +++ b/lib/ff_dpdk_if.c @@ -162,7 +162,7 @@ static struct lcore_conf lcore_conf; static struct rte_mempool *pktmbuf_pool[NB_SOCKETS]; -static struct rte_ring **arp_ring[RTE_MAX_LCORE]; +static struct rte_ring **arp_ring[RTE_MAX_ETHPORTS]; static uint16_t rss_reta_size[RTE_MAX_ETHPORTS]; @@ -420,43 +420,40 @@ create_ring(const char *name, unsigned count, int socket_id, unsigned flags) static int init_arp_ring(void) { - int i, j, ret; + int j; char name_buf[RTE_RING_NAMESIZE]; - int nb_procs = ff_global_cfg.dpdk.nb_procs; - int proc_id = ff_global_cfg.dpdk.proc_id; - - /* Allocate arp ring ptr according to eth dev count. */ - int nb_dev_ports = rte_eth_dev_count(); - for(i = 0; i < nb_procs; ++i) { - snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_%d_%d", - proc_id, i); - - arp_ring[i] = rte_zmalloc(name_buf, - sizeof(struct rte_ring *) * nb_dev_ports, - RTE_CACHE_LINE_SIZE); - if (arp_ring[i] == NULL) { - rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) " - "failed\n", name_buf); - } - } + int queueid; unsigned socketid = lcore_conf.socket_id; /* Create ring according to ports actually being used. */ int nb_ports = ff_global_cfg.dpdk.nb_ports; for (j = 0; j < nb_ports; j++) { - uint16_t port_id = ff_global_cfg.dpdk.portid_list[j]; + uint16_t portid = ff_global_cfg.dpdk.portid_list[j]; + struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid]; + int nb_queues = pconf->nb_lcores; + if (arp_ring[portid] == NULL) { + snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid); - for(i = 0; i < nb_procs; ++i) { - snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_%d_%d", i, port_id); - arp_ring[i][port_id] = create_ring(name_buf, ARP_RING_SIZE, + arp_ring[portid] = rte_zmalloc(name_buf, + sizeof(struct rte_ring *) * nb_queues, + RTE_CACHE_LINE_SIZE); + if (arp_ring[portid] == NULL) { + rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) " + "failed\n", name_buf); + } + } + + for(queueid = 0; queueid < nb_queues; ++queueid) { + snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_p%d_q%d", portid, queueid); + arp_ring[portid][queueid] = create_ring(name_buf, ARP_RING_SIZE, socketid, RING_F_SC_DEQ); - if (arp_ring[i][port_id] == NULL) + if (arp_ring[portid][queueid] == NULL) rte_panic("create ring:%s failed!\n", name_buf); printf("create ring:%s success, %u ring entries are now free!\n", - name_buf, rte_ring_free_count(arp_ring[i][port_id])); + name_buf, rte_ring_free_count(arp_ring[portid][queueid])); } } @@ -931,7 +928,7 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs, mbuf_pool = pktmbuf_pool[socket_id]; mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool); if(mbuf_clone) { - int ret = rte_ring_enqueue(arp_ring[i][port_id], mbuf_clone); + int ret = rte_ring_enqueue(arp_ring[port_id][i], mbuf_clone); if (ret < 0) rte_pktmbuf_free(mbuf_clone); } @@ -962,7 +959,7 @@ process_arp_ring(uint8_t port_id, uint16_t queue_id, { /* read packet from ring buf and to process */ uint16_t nb_rb; - nb_rb = rte_ring_dequeue_burst(arp_ring[queue_id][port_id], + nb_rb = rte_ring_dequeue_burst(arp_ring[port_id][queue_id], (void **)pkts_burst, MAX_PKT_BURST); if(nb_rb > 0) {