This commit is contained in:
liujinhui-job 2025-05-13 15:47:23 +00:00 committed by GitHub
commit eb20a56447
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 437 additions and 4 deletions

View File

@ -25,6 +25,10 @@ endif
# Use for some scenarios similar to Nginx.
#FF_KERNEL_EVENT=1
# If FF_USE_THREAD_STRUCT_HANDLE is enabled, every ff_so_context gets its own struct thread handle in fstack,
# so that fork is supported in the same way as in the Linux kernel.
#FF_USE_THREAD_STRUCT_HANDLE=1
PKGCONF ?= pkg-config
ifndef DEBUG
@ -49,6 +53,10 @@ ifdef FF_PRELOAD_POLLING_MODE
CFLAGS+= -DFF_PRELOAD_POLLING_MODE
endif
ifdef FF_USE_THREAD_STRUCT_HANDLE
CFLAGS+= -DFF_USE_THREAD_STRUCT_HANDLE
endif
CFLAGS += -fPIC -Wall -Werror $(shell $(PKGCONF) --cflags libdpdk)
INCLUDES= -I. -I${FF_PATH}/lib

View File

@ -10,6 +10,8 @@
int ff_adapter_init();
//int __attribute__((constructor)) ff_adapter_init(int argc, char * const argv[]);
int ff_adapter_child_process_init(void);
void alarm_event_sem();
/*-

View File

@ -2137,6 +2137,11 @@ ff_hook_fork(void)
#ifdef FF_MULTI_SC
current_worker_id++;
ERR_LOG("parent process, current_worker_id++:%d\n", current_worker_id);
#endif
#ifdef FF_USE_THREAD_STRUCT_HANDLE
sc->forking = 1;
/* Loop until child fork done. */
while (sc->forking);
#endif
}
else if (pid == 0) {
@ -2144,6 +2149,27 @@ ff_hook_fork(void)
sc, sc->refcount, ff_so_zone);
#ifdef FF_MULTI_SC
ERR_LOG("chilid process, current_worker_id:%d\n", current_worker_id);
#endif
#ifdef FF_USE_THREAD_STRUCT_HANDLE
struct ff_so_context *parent_sc = sc;
/* Child process attach new sc */
ff_adapter_child_process_init();
/*
* The fork system call duplicates the file
* descriptors that were open in the parent process
*/
DEFINE_REQ_ARGS(fork);
args->parent_thread_handle = parent_sc->ff_thread_handle;
/* Output value */
args->child_thread_handle = NULL;
SYSCALL(FF_SO_FORK, args);
if (ret == 0) {
sc->ff_thread_handle = args->child_thread_handle;
}
parent_sc->forking = 0;
#endif
}
@ -2442,6 +2468,16 @@ thread_destructor(void *sc)
}
}
/*
 * Send an FF_SO_EXIT_APPLICATION request for the given socket-ops context.
 *
 * sc: context stored into the request arguments (args->sc).
 *
 * Returns the value of `ret` after the SYSCALL() macro has run.
 *
 * NOTE(review): `args` and `ret` are not declared in this function, so they
 * presumably come from DEFINE_REQ_ARGS()/SYSCALL() - confirm against the
 * macro definitions.
 */
static inline int
ff_application_exit(struct ff_so_context *sc)
{
DEFINE_REQ_ARGS(exit_application);
/* Tell the handler which context is exiting. */
args->sc = sc;
SYSCALL(FF_SO_EXIT_APPLICATION, args);
return ret;
}
void __attribute__((destructor))
ff_adapter_exit()
{
@ -2455,13 +2491,19 @@ ff_adapter_exit()
for (i = 0; i < worker_id; i++) {
ERR_LOG("pthread self tid:%lu, detach sc:%p\n", pthread_self(), scs[i].sc);
ff_so_zone = ff_so_zones[i];
ff_detach_so_context(scs[i].sc);
#ifdef FF_USE_THREAD_STRUCT_HANDLE
ff_application_exit(scs[i].sc);
#endif
ff_detach_so_context(scs[i].sc);
}
} else
#endif
{
ERR_LOG("pthread self tid:%lu, detach sc:%p\n", pthread_self(), sc);
ff_detach_so_context(sc);
#ifdef FF_USE_THREAD_STRUCT_HANDLE
ff_application_exit(sc);
#endif
ff_detach_so_context(sc);
sc = NULL;
}
#endif
@ -2618,11 +2660,40 @@ ff_adapter_init()
rte_spinlock_unlock(&worker_id_lock);
#ifdef FF_USE_THREAD_STRUCT_HANDLE
/*
* Ask fstack to allocate sc->ff_thread_handle.
* This is done for every application.
*/
{
DEFINE_REQ_ARGS(register_application);
args->sc = sc;
SYSCALL(FF_SO_REGISTER_APPLICATION, args);
if (ret < 0) {
return -1;
}
}
#endif
ERR_LOG("ff_adapter_init success, sc:%p, status:%d, ops:%d\n", sc, sc->status, sc->ops);
return 0;
}
/*
 * Attach a socket-ops context for a child process.
 *
 * The result of ff_attach_so_context(0) is stored in `sc`.
 *
 * Returns 0 when a context was attached, -1 otherwise.
 */
int
ff_adapter_child_process_init(void)
{
    sc = ff_attach_so_context(0);
    if (sc != NULL) {
        ERR_LOG("ff_adapter_child_process_init success, sc:%p, status:%d, ops:%d\n", sc, sc->status, sc->ops);
        return 0;
    }

    ERR_LOG("ff_attach_so_context failed\n");
    return -1;
}
void
alarm_event_sem()
{

View File

@ -95,6 +95,7 @@ ff_create_so_memzone()
sc->status = FF_SC_IDLE;
sc->idx = i;
sc->refcount = 0;
sc->ff_thread_handle = NULL;
//so_zone_tmp->inuse[i] = 0;
if (sem_init(&sc->wait_sem, 1, 0) == -1) {

View File

@ -370,8 +370,41 @@ ff_sys_kevent(struct ff_kevent_args *args)
/*
 * Handle an FF_SO_FORK request.
 *
 * Linux has already performed the real fork; all that is left to do here is
 * create a new thread context for the child that duplicates the file
 * descriptors the parent process has already opened.
 *
 * args->parent_thread_handle: thread handle of the forking parent; may be
 *                             NULL, in which case nothing is created.
 * args->child_thread_handle:  output; the handle created for the child, or
 *                             NULL when none was created.
 *
 * Always returns 0; callers must inspect args->child_thread_handle to learn
 * whether a child context actually exists.
 */
static pid_t
ff_sys_fork(struct ff_fork_args *args)
{
    void *parent = args->parent_thread_handle;

    /* Give the output field a defined value on every path. */
    args->child_thread_handle = NULL;

    if (parent != NULL) {
        args->child_thread_handle = ff_adapt_user_thread_add(parent);
    }

    return 0;
}
static int
ff_sys_register_thread(struct ff_register_application_args *args)
{
/* New user application, use default thread0 */
args->sc->ff_thread_handle = ff_adapt_user_thread_add(NULL);
if (args->sc->ff_thread_handle == NULL) {
return -1;
}
return 0;
}
static int
ff_sys_exit_thread(struct ff_exit_application_args *args)
{
/* Exit user application */
if (args->sc->ff_thread_handle) {
ff_adapt_user_thread_exit(args->sc->ff_thread_handle);
}
return 0;
}
static int
@ -439,6 +472,10 @@ ff_so_handler(int ops, void *args)
return ff_sys_kevent((struct ff_kevent_args *)args);
case FF_SO_FORK:
return ff_sys_fork((struct ff_fork_args *)args);
case FF_SO_REGISTER_APPLICATION:
return ff_sys_register_thread((struct ff_register_application_args *)args);
case FF_SO_EXIT_APPLICATION:
return ff_sys_exit_thread((struct ff_exit_application_args *)args);
default:
break;
}
@ -451,6 +488,11 @@ ff_so_handler(int ops, void *args)
static inline void
ff_handle_socket_ops(struct ff_so_context *sc)
{
#ifdef FF_USE_THREAD_STRUCT_HANDLE
void *old_thread;
void *ff_thread_handle = sc->ff_thread_handle;
#endif
if (!rte_spinlock_trylock(&sc->lock)) {
return;
}
@ -463,7 +505,17 @@ ff_handle_socket_ops(struct ff_so_context *sc)
DEBUG_LOG("ff_handle_socket_ops sc:%p, status:%d, ops:%d\n", sc, sc->status, sc->ops);
errno = 0;
#ifdef FF_USE_THREAD_STRUCT_HANDLE
if (ff_thread_handle) {
old_thread = ff_switch_curthread(ff_thread_handle);
}
#endif
sc->result = ff_so_handler(sc->ops, sc->args);
#ifdef FF_USE_THREAD_STRUCT_HANDLE
if (ff_thread_handle) {
ff_restore_curthread(old_thread);
}
#endif
sc->error = errno;
DEBUG_LOG("ff_handle_socket_ops error:%d, ops:%d, result:%d\n", errno, sc->ops, sc->result);

View File

@ -64,6 +64,8 @@ enum FF_SOCKET_OPS {
FF_SO_KQUEUE,
FF_SO_KEVENT,
FF_SO_FORK, // 29
FF_SO_REGISTER_APPLICATION,
FF_SO_EXIT_APPLICATION,
};
enum FF_SO_CONTEXT_STATUS {
@ -110,6 +112,8 @@ struct ff_so_context {
/* CACHE LINE 1 */
/* listen fd, refcount.. */
int refcount;
void *ff_thread_handle;
volatile int forking;
} __attribute__((aligned(RTE_CACHE_LINE_SIZE)));
extern __FF_THREAD struct ff_socket_ops_zone *ff_so_zone;

View File

@ -191,7 +191,16 @@ struct ff_kevent_args {
};
/* Arguments of the FF_SO_FORK request. */
struct ff_fork_args {
/* Input: thread handle of the forking (parent) process. */
void *parent_thread_handle;
/* Output: thread handle created for the child process, NULL if none. */
void *child_thread_handle;
};
/* Arguments of the FF_SO_REGISTER_APPLICATION request. */
struct ff_register_application_args {
/* Context whose ff_thread_handle is filled in by the handler. */
struct ff_so_context *sc;
};
/* Arguments of the FF_SO_EXIT_APPLICATION request. */
struct ff_exit_application_args {
/* Context whose ff_thread_handle is released by the handler. */
struct ff_so_context *sc;
};
#endif

View File

@ -2516,6 +2516,82 @@ retry:
free(fdtol, M_FILEDESC_TO_LEADER);
}
/*
 * Release the contents of a filedesc structure; variant used by the
 * LD_PRELOAD adapter (see fdescfree_adapt_use()).
 *
 * td:        thread passed on to the per-file close/drop routines.
 * fdp:       descriptor table to tear down; its fd_refcnt must already be 0.
 * needclose: when true each open file goes through closefp_impl(),
 *            otherwise only fdrop() is called on it.
 */
static void
fdescfree_fds_adapt_use(struct thread *td, struct filedesc *fdp, bool needclose)
{
struct filedesc0 *fdp0;
struct freetable *ft, *tft;
struct filedescent *fde;
struct file *fp;
int i, lastfile;
/* The caller must have dropped the last reference before calling. */
KASSERT(refcount_load(&fdp->fd_refcnt) == 0,
("%s: fd table %p carries references", __func__, fdp));
/*
 * Serialize with threads iterating over the table, if any.
 */
if (refcount_load(&fdp->fd_holdcnt) > 1) {
FILEDESC_XLOCK(fdp);
FILEDESC_XUNLOCK(fdp);
}
/* Walk every slot up to the last one in use and release its file. */
lastfile = fdlastfile_single(fdp);
for (i = 0; i <= lastfile; i++) {
fde = &fdp->fd_ofiles[i];
fp = fde->fde_file;
if (fp != NULL) {
fdefree_last(fde);
if (needclose)
(void) closefp_impl(fdp, i, fp, td, true);
else
fdrop(fp, td);
}
}
/* fd_map / fd_files are freed only when larger than the NDFILE-sized default. */
if (NDSLOTS(fdp->fd_nfiles) > NDSLOTS(NDFILE))
free(fdp->fd_map, M_FILEDESC);
if (fdp->fd_nfiles > NDFILE)
free(fdp->fd_files, M_FILEDESC);
/* Free every table still queued on the fd_free list. */
fdp0 = (struct filedesc0 *)fdp;
SLIST_FOREACH_SAFE(ft, &fdp0->fd_free, ft_next, tft)
free(ft->ft_table, M_FILEDESC);
fddrop(fdp);
}
/*
 * Drop the reference that td's process holds on its file descriptor table
 * and, if refcount_release() reports it was the last one, close all files
 * and free the table.
 *
 * td: thread whose process (td->td_proc) owns the table in p_fd; p_fd must
 *     not be NULL.
 */
void
fdescfree_adapt_use(struct thread *td)
{
struct proc *p;
struct filedesc *fdp;
p = td->td_proc;
fdp = p->p_fd;
MPASS(fdp != NULL);
#ifdef RACCT
if (RACCT_ENABLED())
racct_set_unlocked(p, RACCT_NOFILE, 0);
#endif
if (p->p_fdtol != NULL)
fdclearlocks(td);
/* Not the last reference: return without touching the table or p->p_fd. */
if (refcount_release(&fdp->fd_refcnt) == 0)
return;
/* Last reference: release every file with needclose = 1, then the table. */
fdescfree_fds_adapt_use(td, fdp, 1);
/* Only now is the process detached from the (already released) table. */
PROC_LOCK(p);
p->p_fd = NULL;
PROC_UNLOCK(p);
}
/*
* Release a filedesc structure.
*/

View File

@ -255,6 +255,7 @@ int fdcopy_remapped(struct filedesc *fdp, const int *fds, size_t nfds,
void fdinstall_remapped(struct thread *td, struct filedesc *fdp);
void fdunshare(struct thread *td);
void fdescfree(struct thread *td);
void fdescfree_adapt_use(struct thread *td);
void fdescfree_remapped(struct filedesc *fdp);
int fdlastfile(struct filedesc *fdp);
int fdlastfile_single(struct filedesc *fdp);

View File

@ -356,6 +356,18 @@ int ff_zc_mbuf_write(struct ff_zc_mbuf *m, const char *data, int len);
*/
int ff_zc_mbuf_read(struct ff_zc_mbuf *m, const char *data, int len);
/*
* Create a user thread context for LD_PRELOAD mode.
* It is saved in ff_so_context.
*/
void *ff_adapt_user_thread_add(void *parent);
void ff_adapt_user_thread_exit(void *td);
void *ff_switch_curthread(void *new_curthread);
void ff_restore_curthread(void *old_curthread);
/* ZERO COPY API end */
#ifdef __cplusplus

View File

@ -65,3 +65,7 @@ ff_pthread_join
pcurthread
ff_dpdk_raw_packet_send
ff_swi_net_excute
ff_adapt_user_thread_add
ff_adapt_user_thread_exit
ff_switch_curthread
ff_restore_curthread

View File

@ -68,6 +68,8 @@ struct prisonlist allprison;
MALLOC_DEFINE(M_FADVISE, "fadvise", "posix_fadvise(2) information");
int async_io_version;
extern unsigned int rand_r(unsigned int *seed);
/*
 * Helpers that build and tear down the struct proc attached to an
 * LD_PRELOAD user thread. ff_adapt_user_proc_exit() is defined as returning
 * void, so it must be declared that way here as well.
 */
extern int ff_adapt_user_proc_add(struct thread *parent_td, struct thread *td);
extern void ff_adapt_user_proc_exit(struct thread *td);
unsigned int seed = 0;
#define M_ZERO 0x0100 /* bzero the allocation */
@ -78,6 +80,65 @@ int vttoif_tab[10] = {
};
void ff_init_thread0(void);
//Only used by LD_PRELOAD mode.
void *ff_adapt_user_thread_add(void *parent);
void ff_adapt_user_thread_exit(void *td);
void *ff_switch_curthread(void *new_curthread);
void ff_restore_curthread(void *old_curthread);
void *
ff_adapt_user_thread_add(void *parent)
{
struct thread *parent_td = (struct thread *)parent;
/* new application */
if (parent_td == NULL) {
parent_td = &thread0;
}
struct thread *td = malloc(sizeof(struct proc), M_TEMP, M_ZERO);
if (td == NULL) {
goto fail;
}
if (ff_adapt_user_proc_add(parent_td, td) < 0) {
free(td, M_TEMP);
goto fail;
}
return (void*)td;
fail:
return NULL;
}
/*
 * Destroy a thread context created by ff_adapt_user_thread_add().
 *
 * td: handle to release; NULL is accepted and ignored, since
 *     ff_adapt_user_thread_add() returns NULL on failure.
 */
void
ff_adapt_user_thread_exit(void *td)
{
    if (td == NULL) {
        return;
    }

    /* Tear down the attached proc first, then free the thread itself. */
    ff_adapt_user_proc_exit((struct thread *)td);
    free(td, M_TEMP);
}
inline void *
ff_switch_curthread(void *new_curthread)
{
void *old_curthread = pcurthread;
if (new_curthread != NULL) {
pcurthread = new_curthread;
}
return old_curthread;
}
/*
 * Put old_curthread back as the current thread (pcurthread).
 *
 * A NULL argument leaves pcurthread unchanged.
 */
inline void
ff_restore_curthread(void *old_curthread)
{
    if (old_curthread == NULL) {
        return;
    }

    pcurthread = old_curthread;
}
void
resettodr(void)

View File

@ -746,6 +746,14 @@ lim_alloc()
return (limp);
}
/*
 * Drop one reference on a plimit structure and free it once
 * refcount_release() reports that the last reference is gone.
 */
void
lim_free(struct plimit *limp)
{
	if (!refcount_release(&limp->pl_refcnt))
		return;

	free((void *)limp, M_PLIMIT);
}
struct plimit *
lim_hold(struct plimit *limp)
{

View File

@ -89,6 +89,8 @@ __FBSDID("$FreeBSD$");
#include <ddb/db_sym.h>
void mi_startup(void); /* Should be elsewhere */
int ff_adapt_user_proc_add(struct thread *parent_td, struct thread *td);
void ff_adapt_user_proc_exit(struct thread *td);
/* Components of the first process -- never freed. */
struct proc proc0;
@ -536,3 +538,125 @@ proc0_post(void *dummy __unused)
#endif
}
SYSINIT(p0post, SI_SUB_INTRINSIC_POST, SI_ORDER_FIRST, proc0_post, NULL);
/*
 * Allocate a struct proc for a user thread and initialise both the proc and
 * the caller-supplied thread.
 *
 * parent_td: thread whose process's file descriptor table is copied into
 *            the new proc.
 * td:        thread to initialise; on success td->td_proc points at the new
 *            proc.
 *
 * Returns 0 on success, -1 when the proc cannot be allocated.
 */
int
ff_adapt_user_proc_add(struct thread *parent_td, struct thread *td)
{
struct proc *p;
vm_paddr_t pageablemem;
int i;
GIANT_REQUIRED;
p = malloc(sizeof(struct proc), M_TEMP, M_ZERO);
if (p == NULL) {
return -1;
}
/*
 * Initialize the magic number and the basic process state.
 */
p->p_magic = P_MAGIC;
p->p_sysent = &null_sysvec;
p->p_flag = P_SYSTEM | P_INMEM;
p->p_state = PRS_NORMAL;
p->p_klist = knlist_alloc(&p->p_mtx);
STAILQ_INIT(&p->p_ktr);
p->p_nice = NZERO;
/*
 * NOTE(review): every thread set up here receives the same td_tid
 * (PID_MAX + 1) - confirm nothing relies on unique thread ids.
 */
td->td_tid = PID_MAX + 1;
td->td_state = TDS_RUNNING;
td->td_pri_class = PRI_TIMESHARE;
td->td_user_pri = PUSER;
td->td_base_user_pri = PUSER;
td->td_priority = PVM;
td->td_base_pri = PUSER;
td->td_oncpu = 0;
td->td_flags = TDF_INMEM|TDP_KTHREAD;
/* Link the thread to its new proc; the proc is its own leader. */
td->td_proc = p;
p->p_peers = 0;
p->p_leader = p;
strncpy(p->p_comm, "kernel", sizeof (p->p_comm));
strncpy(td->td_name, "swapper", sizeof (td->td_name));
callout_init(&p->p_itcallout, CALLOUT_MPSAFE);
callout_init_mtx(&p->p_limco, &p->p_mtx, 0);
callout_init(&td->td_slpcallout, CALLOUT_MPSAFE);
/* Create credentials: uid 0, one group, attached to prison0. */
p->p_ucred = crget();
p->p_ucred->cr_ngroups = 1; /* group 0 */
p->p_ucred->cr_uidinfo = uifind(0);
p->p_ucred->cr_ruidinfo = uifind(0);
p->p_ucred->cr_prison = &prison0;
#ifdef AUDIT
audit_cred_kproc0(p->p_ucred);
#endif
#ifdef MAC
mac_cred_create_swapper(p->p_ucred);
#endif
/* The thread takes its own reference on the proc's credentials. */
td->td_ucred = crhold(p->p_ucred);
/*
 * Create the file descriptor table.
 * Copy from parent thread.
 */
p->p_fd = fdcopy(parent_td->td_proc->p_fd);
p->p_fdtol = NULL;
/* Create the limits structures: everything unlimited unless set below. */
p->p_limit = lim_alloc();
for (i = 0; i < RLIM_NLIMITS; i++)
p->p_limit->pl_rlimit[i].rlim_cur =
p->p_limit->pl_rlimit[i].rlim_max = RLIM_INFINITY;
p->p_limit->pl_rlimit[RLIMIT_NOFILE].rlim_cur =
p->p_limit->pl_rlimit[RLIMIT_NOFILE].rlim_max = maxfiles;
p->p_limit->pl_rlimit[RLIMIT_NPROC].rlim_cur =
p->p_limit->pl_rlimit[RLIMIT_NPROC].rlim_max = maxproc;
p->p_limit->pl_rlimit[RLIMIT_DATA].rlim_cur = dfldsiz;
p->p_limit->pl_rlimit[RLIMIT_DATA].rlim_max = maxdsiz;
p->p_limit->pl_rlimit[RLIMIT_STACK].rlim_cur = dflssiz;
p->p_limit->pl_rlimit[RLIMIT_STACK].rlim_max = maxssiz;
/* Cast to avoid overflow on i386/PAE. */
pageablemem = ptoa((vm_paddr_t)vm_free_count());
p->p_limit->pl_rlimit[RLIMIT_RSS].rlim_cur =
p->p_limit->pl_rlimit[RLIMIT_RSS].rlim_max = pageablemem;
p->p_limit->pl_rlimit[RLIMIT_MEMLOCK].rlim_cur = pageablemem / 3;
p->p_limit->pl_rlimit[RLIMIT_MEMLOCK].rlim_max = pageablemem;
p->p_cpulimit = RLIM_INFINITY;
/*
 * Call the init and ctor for the new thread and proc. We wait
 * to do this until all other structures are fairly sane.
 */
EVENTHANDLER_INVOKE(process_init, p);
EVENTHANDLER_INVOKE(thread_init, td);
EVENTHANDLER_INVOKE(process_ctor, p);
EVENTHANDLER_INVOKE(thread_ctor, td);
return 0;
}
/*
 * Tear down the struct proc attached to a user thread: detach its knlist,
 * release its file descriptor table and resource limits, then free the proc.
 *
 * td: thread whose td_proc is destroyed. The thread itself is not freed
 *     here, and td->td_proc is left pointing at the freed proc.
 *
 * NOTE(review): the credentials that ff_adapt_user_proc_add() obtains with
 * crget()/crhold() are not released in this function - confirm whether that
 * is intentional or a leak.
 */
void
ff_adapt_user_proc_exit(struct thread *td)
{
struct proc *p = td->td_proc;
if (p->p_klist) {
knlist_detach(p->p_klist);
}
/* Close all */
fdescfree_adapt_use(td);
if (p->p_limit) {
lim_free(p->p_limit);
}
free(p, M_TEMP);
}