1. Add kevent demo, need link libff_syscall.so, not LD_PRELOAD.

2. Fix some issues.
This commit is contained in:
fengbojiang 2023-04-07 20:39:31 +08:00
parent e8ed03b521
commit f27d862f85
8 changed files with 534 additions and 18 deletions

View File

@ -846,6 +846,8 @@ ff_hook_recvmsg(int fd, struct msghdr *msg, int flags)
ssize_t
ff_hook_read(int fd, void *buf, size_t len)
{
DEBUG_LOG("ff_hook_read, fd:%d, len:%lu\n", fd, len);
if (buf == NULL || len == 0) {
errno = EINVAL;
return -1;
@ -1000,6 +1002,8 @@ ff_hook_send(int fd, const void *buf, size_t len, int flags)
ssize_t
ff_hook_write(int fd, const void *buf, size_t len)
{
DEBUG_LOG("ff_hook_write, fd:%d, len:%lu\n", fd, len);
if (buf == NULL || len == 0) {
errno = EINVAL;
return -1;
@ -1062,6 +1066,8 @@ ff_hook_writev(int fd, const struct iovec *iov, int iovcnt)
int
ff_hook_close(int fd)
{
DEBUG_LOG("ff_hook_close, fd:%d\n", fd);
CHECK_FD_OWNERSHIP(close, (fd));
DEFINE_REQ_ARGS(close);
@ -1288,6 +1294,8 @@ kqueue()
{
int ret = -1;
DEBUG_LOG("run kqueue\n");
if (unlikely(inited == 0)) {
errno = ENOSYS;
return -1;
@ -1299,6 +1307,8 @@ kqueue()
ret = convert_fstack_fd(ret);
}
DEBUG_LOG("get fd:%d\n", ret);
return ret;
}
@ -1307,6 +1317,11 @@ kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents,
const struct timespec *timeout)
{
int i;
struct kevent *kev;
DEBUG_LOG("kq:%d, nchanges:%d, nevents:%d\n", kq, nchanges, nevents);
if (unlikely(inited == 0 && ff_adapter_init() < 0)) {
errno = ENOSYS;
return -1;
@ -1326,8 +1341,6 @@ kevent(int kq, const struct kevent *changelist, int nchanges,
rte_memcpy(sh_changelist, changelist, sizeof(struct kevent) * nchanges);
struct kevent *kev;
int i = 0;
for(i = 0; i < nchanges; i++) {
kev = (struct kevent *)&sh_changelist[i];
switch (kev->filter) {
@ -1402,6 +1415,11 @@ kevent(int kq, const struct kevent *changelist, int nchanges,
if (eventlist && nevents) {
rte_memcpy(eventlist, sh_eventlist,
sizeof(struct kevent) * ret);
for (i = 0; i < nevents; i++) {
kev = &eventlist[i];
kev->ident = convert_fstack_fd(kev->ident);
}
}
}

View File

@ -5,4 +5,8 @@
#define FF_SYSCALL_DECL(ret, fn, args) extern ret ff_hook_##fn args
#include <ff_declare_syscalls.h>
extern int kqueue(void);
extern int kevent(int kq, const struct kevent *changelist, int nchanges,
struct kevent *eventlist, int nevents, const struct timespec *timeout);
#endif

View File

@ -207,6 +207,7 @@ ff_sys_recvmsg(struct ff_recvmsg_args *args)
static ssize_t
ff_sys_read(struct ff_read_args *args)
{
DEBUG_LOG("ff_sys_read, fd:%d, len:%lu\n", args->fd, args->len);
return ff_read(args->fd, args->buf, args->len);
}
@ -238,6 +239,7 @@ ff_sys_sendmsg(struct ff_sendmsg_args *args)
static ssize_t
ff_sys_write(struct ff_write_args *args)
{
DEBUG_LOG("ff_sys_write, fd:%d, len:%lu\n", args->fd, args->len);
return ff_write(args->fd, args->buf, args->len);
}
@ -250,6 +252,7 @@ ff_sys_writev(struct ff_writev_args *args)
static int
ff_sys_close(struct ff_close_args *args)
{
DEBUG_LOG("ff_sys_close, fd:%d\n", args->fd);
sockaddr_unbind(args->fd);
return ff_close(args->fd);
}
@ -389,6 +392,9 @@ ff_handle_kevent(struct ff_so_context *sc)
{
struct ff_kevent_args *ka = sc->args;
DEBUG_LOG("ff_handle_kevent sc:%p, status:%d, ops:%d, kq:%d, nchanges:%d, nevents:%d\n",
sc, sc->status, sc->ops, ka->kq, ka->nchanges, ka->nevents);
errno = 0;
sc->result = ff_sys_kevent(ka);
sc->error = errno;
@ -397,9 +403,9 @@ ff_handle_kevent(struct ff_so_context *sc)
ka->nchanges = 0;
}
if (sc->result == 0 && ka->nevents != 0) {
/*if (sc->result == 0 && ka->nevents != 0) {
return;
}
}*/
sc->status = FF_SC_REP;
sem_post(&sc->wait_sem);

View File

@ -20,8 +20,10 @@ TARGET="helloworld"
all:
cc ${CFLAGS} -DINET6 -o ${TARGET} main.c ${LIBS}
cc ${CFLAGS} -o ${TARGET}_epoll main_epoll.c ${LIBS}
cc ${CFLAGS} -I ../adapter -o ${TARGET}_stack_epoll main_stack_epoll.c ${LIBS}
cc ${CFLAGS} -I ../adapter -o ${TARGET}_stack_epoll_thread_socket main_stack_epoll_thread_socket.c ${LIBS}
cc ${CFLAGS} -I ${FF_PATH}/adapter -L ${FF_PATH}/adapter -lff_syscall -o ${TARGET}_stack main_stack.c ${LIBS}
cc ${CFLAGS} -I ${FF_PATH}/adapter -L ${FF_PATH}/adapter -lff_syscall -o ${TARGET}_stack_thread_socket main_stack_thread_socket.c ${LIBS}
cc ${CFLAGS} -I ${FF_PATH}/adapter -o ${TARGET}_stack_epoll main_stack_epoll.c ${LIBS}
cc ${CFLAGS} -I ${FF_PATH}/adapter -o ${TARGET}_stack_epoll_thread_socket main_stack_epoll_thread_socket.c ${LIBS}
.PHONY: clean
clean:

210
example/main_stack.c Normal file
View File

@ -0,0 +1,210 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
//#include "ff_config.h"
//#include "ff_api.h"
#include "ff_event.h"
#include "ff_adapter.h"
#include "ff_hook_syscall.h"
pthread_t hworker;
#define MAX_EVENTS 512
/* kevent set */
struct kevent kevSet;
/* events */
struct kevent events[MAX_EVENTS];
/* kq */
int kq;
int sockfd;
static int exit_flag = 0;
char html[] =
"HTTP/1.1 200 OK\r\n"
"Server: F-Stack\r\n"
"Date: Sat, 25 Feb 2017 09:26:33 GMT\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 438\r\n"
"Last-Modified: Tue, 21 Feb 2017 09:44:03 GMT\r\n"
"Connection: keep-alive\r\n"
"Accept-Ranges: bytes\r\n"
"\r\n"
"<!DOCTYPE html>\r\n"
"<html>\r\n"
"<head>\r\n"
"<title>Welcome to F-Stack!</title>\r\n"
"<style>\r\n"
" body { \r\n"
" width: 35em;\r\n"
" margin: 0 auto; \r\n"
" font-family: Tahoma, Verdana, Arial, sans-serif;\r\n"
" }\r\n"
"</style>\r\n"
"</head>\r\n"
"<body>\r\n"
"<h1>Welcome to F-Stack!</h1>\r\n"
"\r\n"
"<p>For online documentation and support please refer to\r\n"
"<a href=\"http://F-Stack.org/\">F-Stack.org</a>.<br/>\r\n"
"\r\n"
"<p><em>Thank you for using F-Stack.</em></p>\r\n"
"</body>\r\n"
"</html>";
void sig_term(int sig)
{
printf("we caught signal %d, to exit helloworld\n", sig);
exit_flag = 1;
return;
}
void *loop(void *arg)
{
/* Wait for events to happen */
while (!exit_flag) {
int nevents = kevent(kq, NULL, 0, events, MAX_EVENTS, NULL);
int i;
if (nevents <= 0) {
if (nevents) {
printf("ff_kevent failed:%d, %s\n", errno,
strerror(errno));
return NULL;
}
//usleep(100);
sleep(1);
}
printf("get nevents:%d\n", nevents);
for (i = 0; i < nevents; ++i) {
struct kevent event = events[i];
int clientfd = (int)event.ident;
/* Handle disconnect */
if (event.flags & EV_EOF) {
/* Simply close socket */
close(clientfd);
} else if (clientfd == sockfd) {
int available = (int)event.data;
do {
int nclientfd = accept(clientfd, NULL, NULL);
if (nclientfd < 0) {
printf("ff_accept failed:%d, %s\n", errno,
strerror(errno));
break;
}
/* Add to event list */
EV_SET(&kevSet, nclientfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if(kevent(kq, &kevSet, 1, NULL, 0, NULL) < 0) {
printf("ff_kevent error:%d, %s\n", errno,
strerror(errno));
close(nclientfd);
break;
}
available--;
} while (available);
} else if (event.filter == EVFILT_READ) {
char buf[256];
ssize_t readlen = read(clientfd, buf, sizeof(buf));
ssize_t writelen = write(clientfd, html, sizeof(html) - 1);
if (writelen < 0){
printf("ff_write failed:%d, %s\n", errno,
strerror(errno));
close(clientfd);
}
} else {
printf("unknown event: %8.8X\n", event.flags);
}
}
}
return NULL;
}
int main(int argc, char * argv[])
{
signal(SIGINT, sig_term);
signal(SIGTERM, sig_term);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
printf("sockfd:%d\n", sockfd);
if (sockfd < 0) {
printf("ff_socket failed, sockfd:%d, errno:%d, %s\n", sockfd, errno, strerror(errno));
return -1;;
}
/* Set non blocking */
int on = 1;
ioctl(sockfd, FIONBIO, &on);
struct sockaddr_in my_addr;
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(80);
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(sockfd, (const struct sockaddr *)&my_addr, sizeof(my_addr));
if (ret < 0) {
printf("ff_bind failed, sockfd:%d, errno:%d, %s\n", sockfd, errno, strerror(errno));
close(sockfd);
return -1;
}
ret = listen(sockfd, MAX_EVENTS);
if (ret < 0) {
printf("ff_listen failed, sockfd:%d, errno:%d, %s\n", sockfd, errno, strerror(errno));
close(sockfd);
return -1;
}
kq = kqueue();
printf("kq:%d\n", kq);
if (kq < 0) {
printf("ff_kqueue failed, errno:%d, %s\n", errno, strerror(errno));
close(sockfd);
return -1;
}
EV_SET(&kevSet, sockfd, EVFILT_READ, EV_ADD, 0, MAX_EVENTS, NULL);
/* Update kqueue */
ret = kevent(kq, &kevSet, 1, NULL, 0, NULL);
if (ret < 0) {
printf("kevent failed\n");
close(kq);
close(sockfd);
return -1;
}
if(pthread_create(&hworker, NULL, loop, NULL) < 0) {
printf("create loop thread failed., errno:%d/%s\n",
errno, strerror(errno));
close(kq);
close(sockfd);
return -1;
}
pthread_join(hworker, NULL);
close(kq);
close(sockfd);
return 0;
}

View File

@ -69,16 +69,18 @@ void *loop(void *arg)
/* Wait for events to happen */
while (!exit_flag) {
int nevents = epoll_wait(epfd, events, MAX_EVENTS, 100);
int i;
if (nevents <= 0) {
if (nevents) {
printf("hello world epoll wait ret %d, errno:%d, %s\n",
nevents, errno, strerror(errno));
break;
}
//usleep(100);
sleep(1);
}
printf("get nevents:%d\n", nevents);
int i;
for (i = 0; i < nevents; ++i) {
/* Handle new connect */
@ -95,6 +97,7 @@ void *loop(void *arg)
if (epoll_ctl(epfd, EPOLL_CTL_ADD, nclientfd, &ev) != 0) {
printf("ff_epoll_ctl failed:%d, %s\n",
errno, strerror(errno));
close(nclientfd);
break;
}
}
@ -118,6 +121,8 @@ void *loop(void *arg)
}
}
}
return NULL;
}
int main(int argc, char * argv[])
@ -131,7 +136,7 @@ int main(int argc, char * argv[])
printf("sockfd:%d\n", sockfd);
if (sockfd < 0) {
printf("ff_socket failed\n");
exit(1);
return -1;
}
int on = 1;
@ -146,29 +151,41 @@ int main(int argc, char * argv[])
int ret = bind(sockfd, (const struct sockaddr *)&my_addr, sizeof(my_addr));
if (ret < 0) {
printf("ff_bind failed\n");
exit(1);
close(sockfd);
return -1;
}
ret = listen(sockfd, MAX_EVENTS);
if (ret < 0) {
printf("ff_listen failed\n");
exit(1);
close(sockfd);
return -1;
}
epfd = epoll_create(0);
if (epfd <= 0) {
printf("ff_epoll_create failed, errno:%d, %s\n",
errno, strerror(errno));
exit(1);
close(sockfd);
return -1;
}
ev.data.fd = sockfd;
ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
if (ret < 0) {
printf("ff_listen failed\n");
close(epfd);
close(sockfd);
return -1;
}
for (i = 0; i < worker_num; i++) {
if(pthread_create(&hworker[i], NULL, loop, (void *)&i) < 0) {
printf("create loop thread failed., errno:%d/%s\n",
errno, strerror(errno));
close(epfd);
close(sockfd);
return -1;
}
}
@ -177,5 +194,8 @@ int main(int argc, char * argv[])
pthread_join(hworker[i], NULL);
}
close(epfd);
close(sockfd);
return 0;
}

View File

@ -76,7 +76,7 @@ void *loop(void *arg)
if (sockfd < 0) {
printf("thread %d, ff_socket failed\n", thread_id);
pthread_spin_unlock(&worker_lock);
exit(1);
return NULL;
}
/* socket will init adapter,so unlock after socket */
@ -94,38 +94,49 @@ void *loop(void *arg)
int ret = bind(sockfd, (const struct sockaddr *)&my_addr, sizeof(my_addr));
if (ret < 0) {
printf("thread %d, ff_bind failed\n", thread_id);
exit(1);
close(sockfd);
return NULL;
}
ret = listen(sockfd, MAX_EVENTS);
if (ret < 0) {
printf("thread %d, ff_listen failed\n", thread_id);
exit(1);
close(sockfd);
return NULL;
}
epfd = epoll_create(0);
if (epfd <= 0) {
printf("thread %d, ff_epoll_create failed, errno:%d, %s\n",
thread_id, errno, strerror(errno));
exit(1);
close(sockfd);
return NULL;
}
ev.data.fd = sockfd;
ev.events = EPOLLIN;
epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, sockfd, &ev);
if (ret < 0) {
printf("ff_listen failed\n");
close(epfd);
close(sockfd);
return NULL;
}
/* Wait for events to happen */
while (!exit_flag) {
int nevents = epoll_wait(epfd, events, MAX_EVENTS, 100);
int i;
if (nevents <= 0) {
if (nevents) {
printf("thread %d, hello world epoll wait ret %d, errno:%d, %s\n",
thread_id, nevents, errno, strerror(errno));
break;
}
//usleep(100);
sleep(1);
}
printf("thread %d, get nevents:%d\n", thread_id, nevents);
int i;
for (i = 0; i < nevents; ++i) {
/* Handle new connect */
@ -142,6 +153,7 @@ void *loop(void *arg)
if (epoll_ctl(epfd, EPOLL_CTL_ADD, nclientfd, &ev) != 0) {
printf("thread %d, ff_epoll_ctl failed:%d, %s\n",
thread_id, errno, strerror(errno));
close(nclientfd);
break;
}
}
@ -165,6 +177,11 @@ void *loop(void *arg)
}
}
}
close(epfd);
close(sockfd);
return NULL;
}
int main(int argc, char * argv[])

View File

@ -0,0 +1,239 @@
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
#include <sys/ioctl.h>
#include <unistd.h>
#include <pthread.h>
#include <signal.h>
//#include "ff_config.h"
//#include "ff_api.h"
#include "ff_event.h"
#include "ff_adapter.h"
#include "ff_hook_syscall.h"
#define MAX_WORKERS 128
pthread_t hworker[MAX_WORKERS];
pthread_spinlock_t worker_lock;
#define MAX_EVENTS 512
static int exit_flag = 0;
char html[] =
"HTTP/1.1 200 OK\r\n"
"Server: F-Stack\r\n"
"Date: Sat, 25 Feb 2017 09:26:33 GMT\r\n"
"Content-Type: text/html\r\n"
"Content-Length: 438\r\n"
"Last-Modified: Tue, 21 Feb 2017 09:44:03 GMT\r\n"
"Connection: keep-alive\r\n"
"Accept-Ranges: bytes\r\n"
"\r\n"
"<!DOCTYPE html>\r\n"
"<html>\r\n"
"<head>\r\n"
"<title>Welcome to F-Stack!</title>\r\n"
"<style>\r\n"
" body { \r\n"
" width: 35em;\r\n"
" margin: 0 auto; \r\n"
" font-family: Tahoma, Verdana, Arial, sans-serif;\r\n"
" }\r\n"
"</style>\r\n"
"</head>\r\n"
"<body>\r\n"
"<h1>Welcome to F-Stack!</h1>\r\n"
"\r\n"
"<p>For online documentation and support please refer to\r\n"
"<a href=\"http://F-Stack.org/\">F-Stack.org</a>.<br/>\r\n"
"\r\n"
"<p><em>Thank you for using F-Stack.</em></p>\r\n"
"</body>\r\n"
"</html>";
void sig_term(int sig)
{
printf("we caught signal %d, to exit helloworld\n", sig);
exit_flag = 1;
return;
}
void *loop(void *arg)
{
/* kevent set */
struct kevent kevSet;
/* events */
struct kevent events[MAX_EVENTS];
/* kq */
int kq;
int sockfd;
int thread_id;
thread_id = *(int *)arg;
printf("start thread %d\n", thread_id);
sockfd = socket(AF_INET, SOCK_STREAM, 0);
printf("thread %d, sockfd:%d\n", thread_id, sockfd);
if (sockfd < 0) {
printf("thread %d, ff_socket failed\n", thread_id);
pthread_spin_unlock(&worker_lock);
return NULL;
}
/* socket will init adapter,so unlock after socket */
pthread_spin_unlock(&worker_lock);
int on = 1;
ioctl(sockfd, FIONBIO, &on);
struct sockaddr_in my_addr;
bzero(&my_addr, sizeof(my_addr));
my_addr.sin_family = AF_INET;
my_addr.sin_port = htons(80);
my_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(sockfd, (const struct sockaddr *)&my_addr, sizeof(my_addr));
if (ret < 0) {
printf("thread %d, ff_bind failed\n", thread_id);
close(sockfd);
return NULL;
}
ret = listen(sockfd, MAX_EVENTS);
if (ret < 0) {
printf("thread %d, ff_listen failed\n", thread_id);
close(sockfd);
return NULL;
}
kq = kqueue();
printf("thread %d, kq:%d\n", thread_id, kq);
if (kq < 0) {
printf("thread %d, ff_kqueue failed, errno:%d, %s\n", thread_id, errno, strerror(errno));
close(sockfd);
return NULL;
}
EV_SET(&kevSet, sockfd, EVFILT_READ, EV_ADD, 0, MAX_EVENTS, NULL);
/* Update kqueue */
ret = kevent(kq, &kevSet, 1, NULL, 0, NULL);
if (ret < 0) {
printf("thread %d, kevent failed\n", thread_id);
close(kq);
close(sockfd);
return NULL;
}
/* Wait for events to happen */
while (!exit_flag) {
int nevents = kevent(kq, NULL, 0, events, MAX_EVENTS, NULL);
int i;
if (nevents <= 0) {
if (nevents) {
printf("thread %d, ff_kevent failed:%d, %s\n", thread_id, errno,
strerror(errno));
return NULL;
}
//usleep(100);
sleep(1);
}
printf("thread %d, get nevents:%d\n", thread_id, nevents);
for (i = 0; i < nevents; ++i) {
struct kevent event = events[i];
int clientfd = (int)event.ident;
/* Handle disconnect */
if (event.flags & EV_EOF) {
/* Simply close socket */
close(clientfd);
} else if (clientfd == sockfd) {
int available = (int)event.data;
do {
int nclientfd = accept(clientfd, NULL, NULL);
if (nclientfd < 0) {
printf("thread %d, ff_accept failed:%d, %s\n", thread_id, errno,
strerror(errno));
break;
}
/* Add to event list */
EV_SET(&kevSet, nclientfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
if(kevent(kq, &kevSet, 1, NULL, 0, NULL) < 0) {
printf("thread %d, ff_kevent error:%d, %s\n", thread_id, errno,
strerror(errno));
close(nclientfd);
break;
}
available--;
} while (available);
} else if (event.filter == EVFILT_READ) {
char buf[256];
ssize_t readlen = read(clientfd, buf, sizeof(buf));
ssize_t writelen = write(clientfd, html, sizeof(html) - 1);
if (writelen < 0){
printf("thread %d, ff_write failed:%d, %s\n", thread_id, errno,
strerror(errno));
close(clientfd);
}
} else {
printf("thread %d, unknown event: %8.8X\n", thread_id, event.flags);
}
}
}
close(kq);
close(sockfd);
return NULL;
}
int main(int argc, char * argv[])
{
int i, worker_num;
signal(SIGINT, sig_term);
signal(SIGTERM, sig_term);
if (argc == 1) {
worker_num = 1;
} else {
worker_num = atoi(argv[1]);
}
printf("to init %d workers.\n", worker_num);
pthread_spin_init(&worker_lock, PTHREAD_PROCESS_PRIVATE);
pthread_spin_lock(&worker_lock);
for (i = 0; i < worker_num; i++) {
if(pthread_create(&hworker[i], NULL, loop, (void *)&i) < 0) {
printf("create loop thread failed., errno:%d/%s\n",
errno, strerror(errno));
pthread_spin_unlock(&worker_lock);
pthread_spin_destroy(&worker_lock);
return -1;
}
pthread_spin_lock(&worker_lock);
}
for (i = 0; i < worker_num; i++) {
pthread_join(hworker[i], NULL);
}
pthread_spin_destroy(&worker_lock);
return 0;
}