mirror of https://github.com/F-Stack/f-stack.git
549 lines
13 KiB
C
549 lines
13 KiB
C
/*-
|
|
* Copyright (c) 2014 Chelsio Communications, Inc.
|
|
* All rights reserved.
|
|
* Written by: Navdeep Parhar <np@FreeBSD.org>
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions
|
|
* are met:
|
|
* 1. Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* 2. Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
|
|
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
|
|
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
|
|
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
|
|
* OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
* HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
|
|
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
|
|
* SUCH DAMAGE.
|
|
*/
|
|
|
|
#include <sys/cdefs.h>
|
|
__FBSDID("$FreeBSD$");
|
|
|
|
#include <sys/types.h>
|
|
#include <sys/param.h>
|
|
#include <sys/systm.h>
|
|
#include <sys/counter.h>
|
|
#include <sys/lock.h>
|
|
#include <sys/mutex.h>
|
|
#include <sys/malloc.h>
|
|
#include <machine/cpu.h>
|
|
#include <net/mp_ring.h>
|
|
|
|
union ring_state {
|
|
struct {
|
|
uint16_t pidx_head;
|
|
uint16_t pidx_tail;
|
|
uint16_t cidx;
|
|
uint16_t flags;
|
|
};
|
|
uint64_t state;
|
|
};
|
|
|
|
enum {
|
|
IDLE = 0, /* consumer ran to completion, nothing more to do. */
|
|
BUSY, /* consumer is running already, or will be shortly. */
|
|
STALLED, /* consumer stopped due to lack of resources. */
|
|
ABDICATED, /* consumer stopped even though there was work to be
|
|
done because it wants another thread to take over. */
|
|
};
|
|
|
|
static inline uint16_t
|
|
space_available(struct ifmp_ring *r, union ring_state s)
|
|
{
|
|
uint16_t x = r->size - 1;
|
|
|
|
if (s.cidx == s.pidx_head)
|
|
return (x);
|
|
else if (s.cidx > s.pidx_head)
|
|
return (s.cidx - s.pidx_head - 1);
|
|
else
|
|
return (x - s.pidx_head + s.cidx);
|
|
}
|
|
|
|
static inline uint16_t
|
|
increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
|
|
{
|
|
int x = r->size - idx;
|
|
|
|
MPASS(x > 0);
|
|
return (x > n ? idx + n : n - x);
|
|
}
|
|
|
|
/* Consumer is about to update the ring's state to s */
|
|
static inline uint16_t
|
|
state_to_flags(union ring_state s, int abdicate)
|
|
{
|
|
|
|
if (s.cidx == s.pidx_tail)
|
|
return (IDLE);
|
|
else if (abdicate && s.pidx_tail != s.pidx_head)
|
|
return (ABDICATED);
|
|
|
|
return (BUSY);
|
|
}
|
|
|
|
#ifdef MP_RING_NO_64BIT_ATOMICS
|
|
static void
|
|
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
|
|
{
|
|
union ring_state ns;
|
|
int n, pending, total;
|
|
uint16_t cidx = os.cidx;
|
|
uint16_t pidx = os.pidx_tail;
|
|
|
|
MPASS(os.flags == BUSY);
|
|
MPASS(cidx != pidx);
|
|
|
|
if (prev == IDLE)
|
|
counter_u64_add(r->starts, 1);
|
|
pending = 0;
|
|
total = 0;
|
|
|
|
while (cidx != pidx) {
|
|
/* Items from cidx to pidx are available for consumption. */
|
|
n = r->drain(r, cidx, pidx);
|
|
if (n == 0) {
|
|
os.state = ns.state = r->state;
|
|
ns.cidx = cidx;
|
|
ns.flags = STALLED;
|
|
r->state = ns.state;
|
|
if (prev != STALLED)
|
|
counter_u64_add(r->stalls, 1);
|
|
else if (total > 0) {
|
|
counter_u64_add(r->restarts, 1);
|
|
counter_u64_add(r->stalls, 1);
|
|
}
|
|
break;
|
|
}
|
|
cidx = increment_idx(r, cidx, n);
|
|
pending += n;
|
|
total += n;
|
|
|
|
/*
|
|
* We update the cidx only if we've caught up with the pidx, the
|
|
* real cidx is getting too far ahead of the one visible to
|
|
* everyone else, or we have exceeded our budget.
|
|
*/
|
|
if (cidx != pidx && pending < 64 && total < budget)
|
|
continue;
|
|
|
|
os.state = ns.state = r->state;
|
|
ns.cidx = cidx;
|
|
ns.flags = state_to_flags(ns, total >= budget);
|
|
r->state = ns.state;
|
|
|
|
if (ns.flags == ABDICATED)
|
|
counter_u64_add(r->abdications, 1);
|
|
if (ns.flags != BUSY) {
|
|
/* Wrong loop exit if we're going to stall. */
|
|
MPASS(ns.flags != STALLED);
|
|
if (prev == STALLED) {
|
|
MPASS(total > 0);
|
|
counter_u64_add(r->restarts, 1);
|
|
}
|
|
break;
|
|
}
|
|
|
|
/*
|
|
* The acquire style atomic above guarantees visibility of items
|
|
* associated with any pidx change that we notice here.
|
|
*/
|
|
pidx = ns.pidx_tail;
|
|
pending = 0;
|
|
}
|
|
}
|
|
#else
|
|
/*
|
|
* Caller passes in a state, with a guarantee that there is work to do and that
|
|
* all items up to the pidx_tail in the state are visible.
|
|
*/
|
|
static void
|
|
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
|
|
{
|
|
union ring_state ns;
|
|
int n, pending, total;
|
|
uint16_t cidx = os.cidx;
|
|
uint16_t pidx = os.pidx_tail;
|
|
|
|
MPASS(os.flags == BUSY);
|
|
MPASS(cidx != pidx);
|
|
|
|
if (prev == IDLE)
|
|
counter_u64_add(r->starts, 1);
|
|
pending = 0;
|
|
total = 0;
|
|
|
|
while (cidx != pidx) {
|
|
/* Items from cidx to pidx are available for consumption. */
|
|
n = r->drain(r, cidx, pidx);
|
|
if (n == 0) {
|
|
critical_enter();
|
|
os.state = r->state;
|
|
do {
|
|
ns.state = os.state;
|
|
ns.cidx = cidx;
|
|
ns.flags = STALLED;
|
|
} while (atomic_fcmpset_64(&r->state, &os.state,
|
|
ns.state) == 0);
|
|
critical_exit();
|
|
if (prev != STALLED)
|
|
counter_u64_add(r->stalls, 1);
|
|
else if (total > 0) {
|
|
counter_u64_add(r->restarts, 1);
|
|
counter_u64_add(r->stalls, 1);
|
|
}
|
|
break;
|
|
}
|
|
cidx = increment_idx(r, cidx, n);
|
|
pending += n;
|
|
total += n;
|
|
|
|
/*
|
|
* We update the cidx only if we've caught up with the pidx, the
|
|
* real cidx is getting too far ahead of the one visible to
|
|
* everyone else, or we have exceeded our budget.
|
|
*/
|
|
if (cidx != pidx && pending < 64 && total < budget)
|
|
continue;
|
|
critical_enter();
|
|
os.state = r->state;
|
|
do {
|
|
ns.state = os.state;
|
|
ns.cidx = cidx;
|
|
ns.flags = state_to_flags(ns, total >= budget);
|
|
} while (atomic_fcmpset_acq_64(&r->state, &os.state,
|
|
ns.state) == 0);
|
|
critical_exit();
|
|
|
|
if (ns.flags == ABDICATED)
|
|
counter_u64_add(r->abdications, 1);
|
|
if (ns.flags != BUSY) {
|
|
/* Wrong loop exit if we're going to stall. */
|
|
MPASS(ns.flags != STALLED);
|
|
if (prev == STALLED) {
|
|
MPASS(total > 0);
|
|
counter_u64_add(r->restarts, 1);
|
|
}
|
|
break;
|
|
}
|
|
|
|
/*
|
|
* The acquire style atomic above guarantees visibility of items
|
|
* associated with any pidx change that we notice here.
|
|
*/
|
|
pidx = ns.pidx_tail;
|
|
pending = 0;
|
|
}
|
|
}
|
|
#endif
|
|
|
|
int
|
|
ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
|
|
mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
|
|
{
|
|
struct ifmp_ring *r;
|
|
|
|
/* All idx are 16b so size can be 65536 at most */
|
|
if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
|
|
can_drain == NULL)
|
|
return (EINVAL);
|
|
*pr = NULL;
|
|
flags &= M_NOWAIT | M_WAITOK;
|
|
MPASS(flags != 0);
|
|
|
|
r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
|
|
if (r == NULL)
|
|
return (ENOMEM);
|
|
r->size = size;
|
|
r->cookie = cookie;
|
|
r->mt = mt;
|
|
r->drain = drain;
|
|
r->can_drain = can_drain;
|
|
r->enqueues = counter_u64_alloc(flags);
|
|
r->drops = counter_u64_alloc(flags);
|
|
r->starts = counter_u64_alloc(flags);
|
|
r->stalls = counter_u64_alloc(flags);
|
|
r->restarts = counter_u64_alloc(flags);
|
|
r->abdications = counter_u64_alloc(flags);
|
|
if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
|
|
r->stalls == NULL || r->restarts == NULL ||
|
|
r->abdications == NULL) {
|
|
ifmp_ring_free(r);
|
|
return (ENOMEM);
|
|
}
|
|
|
|
*pr = r;
|
|
#ifdef MP_RING_NO_64BIT_ATOMICS
|
|
mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
|
|
#endif
|
|
return (0);
|
|
}
|
|
|
|
void
|
|
ifmp_ring_free(struct ifmp_ring *r)
|
|
{
|
|
|
|
if (r == NULL)
|
|
return;
|
|
|
|
if (r->enqueues != NULL)
|
|
counter_u64_free(r->enqueues);
|
|
if (r->drops != NULL)
|
|
counter_u64_free(r->drops);
|
|
if (r->starts != NULL)
|
|
counter_u64_free(r->starts);
|
|
if (r->stalls != NULL)
|
|
counter_u64_free(r->stalls);
|
|
if (r->restarts != NULL)
|
|
counter_u64_free(r->restarts);
|
|
if (r->abdications != NULL)
|
|
counter_u64_free(r->abdications);
|
|
|
|
free(r, r->mt);
|
|
}
|
|
|
|
/*
|
|
* Enqueue n items and maybe drain the ring for some time.
|
|
*
|
|
* Returns an errno.
|
|
*/
|
|
#ifdef MP_RING_NO_64BIT_ATOMICS
|
|
int
|
|
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
|
|
{
|
|
union ring_state os, ns;
|
|
uint16_t pidx_start, pidx_stop;
|
|
int i;
|
|
|
|
MPASS(items != NULL);
|
|
MPASS(n > 0);
|
|
|
|
mtx_lock(&r->lock);
|
|
/*
|
|
* Reserve room for the new items. Our reservation, if successful, is
|
|
* from 'pidx_start' to 'pidx_stop'.
|
|
*/
|
|
os.state = r->state;
|
|
if (n >= space_available(r, os)) {
|
|
counter_u64_add(r->drops, n);
|
|
MPASS(os.flags != IDLE);
|
|
mtx_unlock(&r->lock);
|
|
if (os.flags == STALLED)
|
|
ifmp_ring_check_drainage(r, 0);
|
|
return (ENOBUFS);
|
|
}
|
|
ns.state = os.state;
|
|
ns.pidx_head = increment_idx(r, os.pidx_head, n);
|
|
r->state = ns.state;
|
|
pidx_start = os.pidx_head;
|
|
pidx_stop = ns.pidx_head;
|
|
|
|
/*
|
|
* Wait for other producers who got in ahead of us to enqueue their
|
|
* items, one producer at a time. It is our turn when the ring's
|
|
* pidx_tail reaches the beginning of our reservation (pidx_start).
|
|
*/
|
|
while (ns.pidx_tail != pidx_start) {
|
|
cpu_spinwait();
|
|
ns.state = r->state;
|
|
}
|
|
|
|
/* Now it is our turn to fill up the area we reserved earlier. */
|
|
i = pidx_start;
|
|
do {
|
|
r->items[i] = *items++;
|
|
if (__predict_false(++i == r->size))
|
|
i = 0;
|
|
} while (i != pidx_stop);
|
|
|
|
/*
|
|
* Update the ring's pidx_tail. The release style atomic guarantees
|
|
* that the items are visible to any thread that sees the updated pidx.
|
|
*/
|
|
os.state = ns.state = r->state;
|
|
ns.pidx_tail = pidx_stop;
|
|
if (abdicate) {
|
|
if (os.flags == IDLE)
|
|
ns.flags = ABDICATED;
|
|
} else
|
|
ns.flags = BUSY;
|
|
r->state = ns.state;
|
|
counter_u64_add(r->enqueues, n);
|
|
|
|
if (!abdicate) {
|
|
/*
|
|
* Turn into a consumer if some other thread isn't active as a consumer
|
|
* already.
|
|
*/
|
|
if (os.flags != BUSY)
|
|
drain_ring_locked(r, ns, os.flags, budget);
|
|
}
|
|
|
|
mtx_unlock(&r->lock);
|
|
return (0);
|
|
}
|
|
#else
|
|
int
|
|
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
|
|
{
|
|
union ring_state os, ns;
|
|
uint16_t pidx_start, pidx_stop;
|
|
int i;
|
|
|
|
MPASS(items != NULL);
|
|
MPASS(n > 0);
|
|
|
|
/*
|
|
* Reserve room for the new items. Our reservation, if successful, is
|
|
* from 'pidx_start' to 'pidx_stop'.
|
|
*/
|
|
os.state = r->state;
|
|
for (;;) {
|
|
if (n >= space_available(r, os)) {
|
|
counter_u64_add(r->drops, n);
|
|
MPASS(os.flags != IDLE);
|
|
if (os.flags == STALLED)
|
|
ifmp_ring_check_drainage(r, 0);
|
|
return (ENOBUFS);
|
|
}
|
|
ns.state = os.state;
|
|
ns.pidx_head = increment_idx(r, os.pidx_head, n);
|
|
critical_enter();
|
|
if (atomic_fcmpset_64(&r->state, &os.state, ns.state))
|
|
break;
|
|
critical_exit();
|
|
cpu_spinwait();
|
|
}
|
|
pidx_start = os.pidx_head;
|
|
pidx_stop = ns.pidx_head;
|
|
|
|
/*
|
|
* Wait for other producers who got in ahead of us to enqueue their
|
|
* items, one producer at a time. It is our turn when the ring's
|
|
* pidx_tail reaches the beginning of our reservation (pidx_start).
|
|
*/
|
|
while (ns.pidx_tail != pidx_start) {
|
|
cpu_spinwait();
|
|
ns.state = r->state;
|
|
}
|
|
|
|
/* Now it is our turn to fill up the area we reserved earlier. */
|
|
i = pidx_start;
|
|
do {
|
|
r->items[i] = *items++;
|
|
if (__predict_false(++i == r->size))
|
|
i = 0;
|
|
} while (i != pidx_stop);
|
|
|
|
/*
|
|
* Update the ring's pidx_tail. The release style atomic guarantees
|
|
* that the items are visible to any thread that sees the updated pidx.
|
|
*/
|
|
os.state = r->state;
|
|
do {
|
|
ns.state = os.state;
|
|
ns.pidx_tail = pidx_stop;
|
|
if (abdicate) {
|
|
if (os.flags == IDLE)
|
|
ns.flags = ABDICATED;
|
|
} else
|
|
ns.flags = BUSY;
|
|
} while (atomic_fcmpset_rel_64(&r->state, &os.state, ns.state) == 0);
|
|
critical_exit();
|
|
counter_u64_add(r->enqueues, n);
|
|
|
|
if (!abdicate) {
|
|
/*
|
|
* Turn into a consumer if some other thread isn't active as a consumer
|
|
* already.
|
|
*/
|
|
if (os.flags != BUSY)
|
|
drain_ring_lockless(r, ns, os.flags, budget);
|
|
}
|
|
|
|
return (0);
|
|
}
|
|
#endif
|
|
|
|
void
|
|
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
|
|
{
|
|
union ring_state os, ns;
|
|
|
|
os.state = r->state;
|
|
if ((os.flags != STALLED && os.flags != ABDICATED) || // Only continue in STALLED and ABDICATED
|
|
os.pidx_head != os.pidx_tail || // Require work to be available
|
|
(os.flags != ABDICATED && r->can_drain(r) == 0)) // Can either drain, or everyone left
|
|
return;
|
|
|
|
MPASS(os.cidx != os.pidx_tail); /* implied by STALLED */
|
|
ns.state = os.state;
|
|
ns.flags = BUSY;
|
|
|
|
#ifdef MP_RING_NO_64BIT_ATOMICS
|
|
mtx_lock(&r->lock);
|
|
if (r->state != os.state) {
|
|
mtx_unlock(&r->lock);
|
|
return;
|
|
}
|
|
r->state = ns.state;
|
|
drain_ring_locked(r, ns, os.flags, budget);
|
|
mtx_unlock(&r->lock);
|
|
#else
|
|
/*
|
|
* The acquire style atomic guarantees visibility of items associated
|
|
* with the pidx that we read here.
|
|
*/
|
|
if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
|
|
return;
|
|
|
|
drain_ring_lockless(r, ns, os.flags, budget);
|
|
#endif
|
|
}
|
|
|
|
void
|
|
ifmp_ring_reset_stats(struct ifmp_ring *r)
|
|
{
|
|
|
|
counter_u64_zero(r->enqueues);
|
|
counter_u64_zero(r->drops);
|
|
counter_u64_zero(r->starts);
|
|
counter_u64_zero(r->stalls);
|
|
counter_u64_zero(r->restarts);
|
|
counter_u64_zero(r->abdications);
|
|
}
|
|
|
|
int
|
|
ifmp_ring_is_idle(struct ifmp_ring *r)
|
|
{
|
|
union ring_state s;
|
|
|
|
s.state = r->state;
|
|
if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
|
|
s.flags == IDLE)
|
|
return (1);
|
|
|
|
return (0);
|
|
}
|
|
|
|
int
|
|
ifmp_ring_is_stalled(struct ifmp_ring *r)
|
|
{
|
|
union ring_state s;
|
|
|
|
s.state = r->state;
|
|
if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
|
|
return (1);
|
|
|
|
return (0);
|
|
}
|