Add FF_MULTI_SC macro definition to support some special application scenarios, let the child process worker inherit the specified sc.

Like Nginx with reuseport.
  Parent process socket/bind/listen multi sockets, and use them in different child process worker.

  All child process workers must be forked by the same process, scilicet:
    support master fork child1, [child1 fork child2], chilid2 fork worker1/worker2/worker3...
    But not support master fork worker1, worker fork worker2, worker2 fork worker3...

2. modify some log.
This commit is contained in:
fengbojiang 2023-04-27 17:24:32 +08:00
parent f98fd1a615
commit 3240dd0dad
4 changed files with 98 additions and 19 deletions

View File

@ -8,7 +8,7 @@ ifneq ($(shell pkg-config --exists libdpdk && echo 0),0)
$(error "No installation of DPDK found, maybe you should export environment variable `PKG_CONFIG_PATH`")
endif
DEBUG=-O0 -gdwarf-2 -g3
#DEBUG=-O0 -gdwarf-2 -g3
# Per thread separate initialization dpdk lib and attach sc when needed,
# such as listen same port in different threads, and socket can use in own thread.
@ -37,6 +37,10 @@ ifdef FF_KERNEL_EVENT
CFLAGS+= -DFF_KERNEL_EVENT
endif
ifdef FF_MULTI_SC
CFLAGS+= -DFF_MULTI_SC
endif
CFLAGS += -fPIC -Wall -Werror $(shell $(PKGCONF) --cflags libdpdk)
INCLUDES= -I. -I${FF_PATH}/lib

View File

@ -162,6 +162,30 @@ static __thread int sh_iov_static_fill_idx_share = 0;
static __FF_THREAD int inited = 0;
static __FF_THREAD struct ff_so_context *sc;
/*
* For parent process socket/bind/listen multi sockets
* and use them in different child process,
* like Nginx with reuseport.
*/
#ifdef FF_MULTI_SC
typedef struct ff_multi_sc {
int worker_id;
//int fd;
struct ff_so_context *sc;
} ff_multi_sc_type;
static ff_multi_sc_type scs[SOCKET_OPS_CONTEXT_MAX_NUM];
/*
* For child worker process,
* All workers must be forked by the same process, scilicet
* support master fork child1, [child1 fork child2], chilid2 fork worker1/worker2/worker3...
* But not support master fork worker1, worker fork worker2, worker2 fork worker3...
*/
#define CURRENT_WORKER_ID_DEFAULT 0
static int current_worker_id = CURRENT_WORKER_ID_DEFAULT;
#endif
static pthread_key_t key;
#ifdef FF_KERNEL_EVENT
@ -182,7 +206,6 @@ static uint64_t initial_lcore_id = INITIAL_LCORE_ID_DEFAULT;
#define WORKER_ID_DEFAULT 0
#define FF_PROC_ID_STR "FF_PROC_ID"
static int worker_id = WORKER_ID_DEFAULT;
static __thread int thread_id = 0;
rte_spinlock_t worker_id_lock;
/* The num of F-Stack process instance, default 1 */
@ -241,7 +264,7 @@ fstack_territory(int domain, int type, int protocol)
int
ff_hook_socket(int domain, int type, int protocol)
{
DEBUG_LOG("ff_hook_socket, domain:%d, type:%d, protocol:%d\n", domain, type, protocol);
ERR_LOG("ff_hook_socket, domain:%d, type:%d, protocol:%d\n", domain, type, protocol);
if (unlikely(fstack_territory(domain, type, protocol) == 0)) {
return ff_linux_socket(domain, type, protocol);
}
@ -251,9 +274,19 @@ ff_hook_socket(int domain, int type, int protocol)
return ff_linux_socket(domain, type, protocol);
}
if (unlikely(inited == 0 && ff_adapter_init() < 0)) {
return ff_linux_socket(domain, type, protocol);
if (unlikely(inited == 0)) {
if (ff_adapter_init() < 0) {
return ff_linux_socket(domain, type, protocol);
}
}
#ifdef FF_MULTI_SC
else {
if (ff_adapter_init() < 0) {
ERR_LOG("FF_MUTLI_SC ff_adapter_init failed\n");
return -1;
}
}
#endif
type &= ~SOCK_FSTACK;
@ -269,7 +302,7 @@ ff_hook_socket(int domain, int type, int protocol)
ret = convert_fstack_fd(ret);
}
DEBUG_LOG("ff_hook_socket return fd:%d\n", ret);
ERR_LOG("ff_hook_socket return fd:%d\n", ret);
RETURN();
}
@ -278,6 +311,8 @@ int
ff_hook_bind(int fd, const struct sockaddr *addr,
socklen_t addrlen)
{
ERR_LOG("ff_hook_bind, fd:%d, addr:%p, addrlen:%d\n", fd, addr, addrlen);
if (addr == NULL) {
errno = EINVAL;
return -1;
@ -307,6 +342,8 @@ ff_hook_bind(int fd, const struct sockaddr *addr,
int
ff_hook_listen(int fd, int backlog)
{
ERR_LOG("ff_hook_listen, fd:%d, backlog:%d\n", fd, backlog);
CHECK_FD_OWNERSHIP(listen, (fd, backlog));
DEFINE_REQ_ARGS(listen);
@ -1557,7 +1594,7 @@ int
ff_hook_epoll_create(int fdsize)
{
DEBUG_LOG("ff_hook_epoll_create, fdsize:%d\n", fdsize);
ERR_LOG("ff_hook_epoll_create, fdsize:%d\n", fdsize);
if (inited == 0 || ((fdsize & SOCK_KERNEL) && !(fdsize & SOCK_FSTACK))/* || (fdsize >= 1 && fdsize <= 16)*/) {
fdsize &= ~SOCK_KERNEL;
return ff_linux_epoll_create(fdsize);
@ -1580,7 +1617,7 @@ ff_hook_epoll_create(int fdsize)
ret = convert_fstack_fd(ret);
}
DEBUG_LOG("ff_hook_epoll_create return fd:%d\n", ret);
ERR_LOG("ff_hook_epoll_create return fd:%d\n", ret);
RETURN();
}
@ -1842,7 +1879,12 @@ ff_hook_fork(void)
{
pid_t pid;
DEBUG_LOG("ff_hook_fork\n");
ERR_LOG("ff_hook_fork\n");
#ifdef FF_MULTI_SC
/* Let the child process inherit the specified sc */
sc = scs[current_worker_id].sc;
#endif
if (sc) {
rte_spinlock_lock(&sc->lock);
}
@ -1853,6 +1895,19 @@ ff_hook_fork(void)
/* Parent process set refcount. */
if (pid > 0) {
sc->refcount++;
ERR_LOG("parent process, chilid pid:%d, sc:%p, sc->refcount:%d\n",
pid, sc, sc->refcount);
#ifdef FF_MULTI_SC
current_worker_id++;
ERR_LOG("parent process, current_worker_id++:%d\n", current_worker_id);
#endif
}
else if (pid == 0) {
ERR_LOG("chilid process, sc:%p, sc->refcount:%d\n",
sc, sc->refcount);
#ifdef FF_MULTI_SC
ERR_LOG("chilid process, current_worker_id:%d\n", current_worker_id);
#endif
}
/* Parent process unlock sc, fork success of failed. */
@ -2156,9 +2211,21 @@ ff_adapter_exit()
pthread_key_delete(key);
#ifndef FF_THREAD_SOCKET
ERR_LOG("pthread self tid:%lu, detach sc:%p\n", pthread_self(), sc);
ff_detach_so_context(sc);
sc = NULL;
#ifdef FF_MULTI_SC
if (current_worker_id == worker_id) {
int i;
for (i = 0; i < worker_id; i ++) {
ERR_LOG("pthread self tid:%lu, detach sc:%p\n", pthread_self(), scs[i].sc);
ff_detach_so_context(scs[i].sc);
}
} else
#endif
{
ERR_LOG("pthread self tid:%lu, detach sc:%p\n", pthread_self(), sc);
ff_detach_so_context(sc);
sc = NULL;
}
#endif
}
@ -2169,9 +2236,13 @@ ff_adapter_init()
{
int ret;
ERR_LOG("inited:%d, proc_inited:%d\n", inited, proc_inited);
#ifndef FF_MULTI_SC
if (inited) {
return 0;
}
#endif
if (proc_inited == 0) {
/* May conflict */
@ -2240,7 +2311,7 @@ ff_adapter_init()
}
/*
* Get environment variable FF_PROC_ID to set nb_procs.
* Get environment variable FF_PROC_ID to set worker_id.
*/
char *ff_worker_id = getenv(FF_PROC_ID_STR);
if (ff_worker_id != NULL) {
@ -2298,7 +2369,10 @@ ff_adapter_init()
pthread_setspecific(key, sc);
thread_id = worker_id;
#ifdef FF_MULTI_SC
scs[worker_id].worker_id = worker_id;
scs[worker_id].sc = sc;
#endif
worker_id++;
inited = 1;

View File

@ -76,7 +76,7 @@ linux_syscall_init()
int
ff_linux_socket(int domain, int type, int protocol)
{
DEBUG_LOG("ff_linux_socket, domain:%d, type:%d, protocol:%d\n", domain, type, protocol);
ERR_LOG("ff_linux_socket, domain:%d, type:%d, protocol:%d\n", domain, type, protocol);
SYSCALL(socket, (domain, type, protocol));
}
@ -226,14 +226,14 @@ int ff_linux_fcntl(int s, int cmd, unsigned long data)
int ff_linux_epoll_create(int size)
{
DEBUG_LOG("ff_linux_epoll_create, fdsize:%d\n", size);
ERR_LOG("ff_linux_epoll_create, fdsize:%d\n", size);
SYSCALL(epoll_create, (size));
}
int ff_linux_epoll_ctl(int epfd, int op, int fd,
struct epoll_event *event)
{
DEBUG_LOG("ff_linux_epoll_ctl, epfd:%d, op:%d, fd:%d\n", epfd, op, fd);
ERR_LOG("ff_linux_epoll_ctl, epfd:%d, op:%d, fd:%d\n", epfd, op, fd);
SYSCALL(epoll_ctl, (epfd, op, fd, event));
}

View File

@ -1,6 +1,7 @@
#ifndef _FF_SOCKET_OPS_H_
#define _FF_SOCKET_OPS_H_
#include <unistd.h>
#include <semaphore.h>
#include <rte_atomic.h>
@ -19,8 +20,8 @@
#endif
#define ERR_LOG(fmt, ...) do { \
printf("file:%s, line:%u, fun:%s, thread self tid:%ld, "fmt, \
__FILE__, __LINE__, __func__, pthread_self(), ##__VA_ARGS__); \
printf("file:%s, line:%u, fun:%s, pid:%d, "fmt, \
__FILE__, __LINE__, __func__, getpid(), ##__VA_ARGS__); \
} while (0)
#ifdef NDEBUG