#ifdef PLATFORM_POSIX #include #endif// PLATFORM_POSIX #include #include #include #include #include #include #include "uv.h" #include "hw_string.h" #include "khash.h" #include "http_server.h" #include "connection_consumer.h" #include "connection_dispatcher.h" #include "http_response_cache.h" #include "server_stats.h" #include "configuration/configuration.h" #include "http_connection.h" #include "http_request.h" #include "misc.h" #define UVERR(err, msg) dzlog_error("%s: %s\n", msg, uv_strerror(err)) //fprintf(stderr, "%s: %s\n", msg, uv_strerror(err)) #if 0 #define CHECK(r, msg) \ if (r) { \ uv_err_t err = uv_last_error(uv_loop); \ UVERR(err, msg); \ exit(1); \ } #endif KHASH_MAP_INIT_STR(string_hashmap, hw_route_entry *) configuration *config; static uv_tcp_t server; static http_parser_settings parser_settings; static struct sockaddr_in listen_address; uv_loop_t *uv_loop; void *routes; hw_string *http_v1_0; hw_string *http_v1_1; hw_string *server_name; int listener_count; uv_async_t *listener_async_handles; uv_loop_t *listener_event_loops; uv_barrier_t *listeners_created_barrier; int hw_init_with_config(configuration *c) { #ifdef DEBUG char route[] = "/stats"; hw_http_add_route(route, get_server_stats, NULL); #endif /* DEBUG */ /* Copy the configuration */ config = malloc(sizeof(configuration)); config->http_listen_address = dupstr(c->http_listen_address); config->http_listen_port = c->http_listen_port; config->thread_count = c->thread_count; config->tcp_nodelay = c->tcp_nodelay; config->listen_backlog = c->listen_backlog ? c->listen_backlog : SOMAXCONN; config->parser = dupstr(c->parser); config->balancer = dupstr(c->balancer); config->max_request_size = c->max_request_size; http_v1_0 = create_string("HTTP/1.0 "); http_v1_1 = create_string("HTTP/1.1 "); server_name = create_string("Server: Haywire/master"); if (strcmp(config->parser, "http_parser") == 0) { http_stream_on_read = &http_stream_on_read_http_parser; } http_server_write_response = &http_server_write_response_single; return 0; } int hw_init_from_config(char *configuration_filename) { configuration *config = load_configuration(configuration_filename); if (config == NULL) { return 1; } return hw_init_with_config(config); } void print_configuration() { #if 0 dzlog_debug("Address: %s\n\tPort: %d\n\tThreads: %d\n\tBalancer: %s\n\t" "Parser: %s\n\tTCP No Delay: %s\n\tListen backlog: %d\n\tMaximum request size: %d\n", config->http_listen_address, config->http_listen_port, config->thread_count, config->balancer, config->parser, config->tcp_nodelay? "on": "off", config->listen_backlog, config->max_request_size); #endif } http_connection *create_http_connection() { http_connection *connection = calloc(1, sizeof(http_connection)); connection->buffer = http_request_buffer_init(config->max_request_size); INCREMENT_STAT(stat_connections_created_total); return connection; } void free_http_connection(http_connection *connection) { if (connection->request) { free_http_request(connection->request); } http_request_buffer_destroy(connection->buffer); free(connection); INCREMENT_STAT(stat_connections_destroyed_total); } void set_route(void *hashmap, char *name, hw_route_entry *route_entry) { int ret; khiter_t k; khash_t(string_hashmap) *h = hashmap; k = kh_put(string_hashmap, h, dupstr(name), &ret); kh_value(h, k) = route_entry; } void hw_http_add_route(char *route, http_request_callback callback, void *user_data) { hw_route_entry *route_entry = malloc(sizeof(hw_route_entry)); route_entry->callback = callback; route_entry->user_data = user_data; if (routes == NULL) { routes = kh_init(string_hashmap); } set_route(routes, route, route_entry); dzlog_debug("Added route path: [%s]\n", route);// TODO: Replace with logging instead. } void free_http_server() { /* TODO: Shut down accepting incoming requests */ khash_t(string_hashmap) *h = routes; const char *k; const char *v; kh_foreach(h, k, v, { free((char *)k); free((char *)v); }); kh_destroy(string_hashmap, routes); uv_close((uv_handle_t*)&server, NULL); uninit_http_request_cache(); dzlog_debug("HTTP Server Close http://%s:%d\n", config->http_listen_address, config->http_listen_port); } int hw_http_open() { int threads = (int)config->thread_count; static uv_async_t service_handle; if (routes == NULL) { routes = kh_init(string_hashmap); } parser_settings.on_header_field = http_request_on_header_field; parser_settings.on_header_value = http_request_on_header_value; parser_settings.on_headers_complete = http_request_on_headers_complete; parser_settings.on_body = http_request_on_body; parser_settings.on_message_begin = http_request_on_message_begin; parser_settings.on_message_complete = http_request_on_message_complete; parser_settings.on_url = http_request_on_url; #ifdef UNIX signal(SIGPIPE, SIG_IGN); #endif// UNIX listener_count = threads; /* TODO: Use the return values from uv_tcp_init() and uv_tcp_bind() */ uv_loop = uv_default_loop(); listener_async_handles = calloc(listener_count, sizeof(uv_async_t)); listener_event_loops = calloc(listener_count, sizeof(uv_loop_t)); listeners_created_barrier = malloc(sizeof(uv_barrier_t)); uv_barrier_init(listeners_created_barrier, listener_count + 1); //service_handle = malloc(sizeof(uv_async_t)); uv_async_init(uv_loop, &service_handle, NULL); if (listener_count == 0) { /* If running single threaded there is no need to use the IPC pipe to distribute requests between threads so let's avoid the IPC overhead */ int rc; rc = uv_tcp_init_ex(uv_loop, &server, AF_INET); if (rc != 0) { dzlog_warn("TWO %d\n", rc); } if (strcmp(config->balancer, "reuseport") == 0) { uv_os_fd_t fd; int on = 1; rc = uv_fileno(&server, &fd); if (rc != 0) { dzlog_warn("ONE %d\n", rc); } rc = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on)); if (rc != 0) { dzlog_warn("THREE %d\n", errno); } } initialize_http_request_cache(); http_request_cache_configure_listener(uv_loop, NULL); uv_ip4_addr(config->http_listen_address, (int)config->http_listen_port, &listen_address); uv_tcp_bind(&server, (const struct sockaddr *)&listen_address, 0); if (config->tcp_nodelay) { uv_tcp_nodelay(&server, 1); } uv_listen((uv_stream_t *)&server, (int)config->listen_backlog, http_stream_on_connect); print_configuration(); dzlog_debug("HTTP Server Listening at http://%s:%d\n", config->http_listen_address, config->http_listen_port); //uv_run(uv_loop, UV_RUN_DEFAULT); } else if (listener_count > 0 && strcmp(config->balancer, "ipc") == 0) { int i; /* If we are running multithreaded spin up the dispatcher that uses an IPC pipe to send socket connection requests to listening threads */ struct server_ctx *servers; servers = calloc(threads, sizeof(servers[0])); for (i = 0; i < threads; i++) { //int rc; struct server_ctx *ctx = servers + i; ctx->index = i; ctx->listen_backlog = config->listen_backlog; uv_sem_init(&ctx->semaphore, 0); uv_thread_create(&ctx->thread_id, connection_consumer_start, ctx); } uv_barrier_wait(listeners_created_barrier); initialize_http_request_cache(); start_connection_dispatching(UV_TCP, threads, servers, config->http_listen_address, (int)config->http_listen_port, config->tcp_nodelay, (int)config->listen_backlog); } else if (listener_count > 0 && strcmp(config->balancer, "reuseport") == 0) { struct server_ctx *servers; servers = calloc(threads, sizeof(servers[0])); for (int i = 0; i < threads; i++) { struct server_ctx *ctx = servers + i; ctx->index = i; uv_thread_create(&ctx->thread_id, reuseport_thread_start, ctx); } print_configuration(); dzlog_debug("Listening...\n"); //uv_run(uv_loop, UV_RUN_DEFAULT); } return 0; } void reuseport_thread_start(void *arg) { int rc; struct server_ctx *ctx; uv_loop_t *loop; ctx = arg; loop = uv_loop_new(); listener_event_loops[ctx->index] = *loop; initialize_http_request_cache(); http_request_cache_configure_listener(loop, &listener_async_handles[ctx->index]); struct sockaddr_in addr; uv_tcp_t server; uv_tcp_init_ex(loop, &server, AF_INET); uv_ip4_addr(config->http_listen_address, (int)config->http_listen_port, &addr); uv_os_fd_t fd; int on = 1; uv_fileno(&server, &fd); rc = setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *)&on, sizeof(on)); if (rc != 0) { dzlog_warn("%d\n", errno); } uv_tcp_bind(&server, (const struct sockaddr *)&addr, 0); uv_listen((uv_stream_t *)&server, 128, http_stream_on_connect); uv_run(loop, UV_RUN_DEFAULT); uv_loop_delete(loop); } void http_stream_on_connect(uv_stream_t *stream, int UNUSED(status)) { http_connection *connection = create_http_connection(); uv_tcp_init(stream->loop, &connection->stream); http_parser_init(&connection->parser, HTTP_REQUEST); connection->parser.data = connection; connection->stream.data = connection; /* TODO: Use the return values from uv_accept() and uv_read_start() */ uv_accept(stream, (uv_stream_t *)&connection->stream); connection->state = OPEN; uv_read_start((uv_stream_t *)&connection->stream, http_stream_on_alloc, http_stream_on_read); } void http_stream_on_alloc(uv_handle_t *client, size_t suggested_size, uv_buf_t *buf) { http_connection *connection = (http_connection *)client->data; bool success = http_request_buffer_alloc(connection->buffer, suggested_size); hw_request_buffer_chunk chunk; chunk.size = 0; chunk.buffer = NULL; if (success) { http_request_buffer_chunk(connection->buffer, &chunk); } else { /* TODO out of memory event - we should hook up an application callback to this */ } *buf = uv_buf_init(chunk.buffer, chunk.size); } void http_stream_on_close(uv_handle_t *handle) { uv_handle_t *stream = handle; http_connection *connection = stream->data; if (connection->state != CLOSED) { connection->state = CLOSED; http_connection *connection = (http_connection *)handle->data; free_http_connection(connection); } } void http_stream_close_connection(http_connection *connection) { if (connection->state == OPEN) { connection->state = CLOSING; uv_close(&connection->stream, http_stream_on_close); } } void handle_request_error(http_connection *connection) { uv_handle_t *stream = &connection->stream; if (connection->state == OPEN) { uv_read_stop(stream); } connection->keep_alive = false; if (connection->request) { if (connection->state == OPEN) { /* Send the error message back. */ http_request_on_message_complete(&connection->parser); } } else { http_stream_close_connection(connection); } } void handle_bad_request(http_connection *connection) { if (connection->request) { connection->request->state = BAD_REQUEST; } handle_request_error(connection); } void handle_buffer_exceeded_error(http_connection *connection) { if (connection->request) { connection->request->state = SIZE_EXCEEDED; } handle_request_error(connection); } void handle_internal_error(http_connection *connection) { if (connection->request) { connection->request->state = INTERNAL_ERROR; } handle_request_error(connection); } void http_stream_on_shutdown(uv_shutdown_t *req, int UNUSED(status)) { http_connection *connection = req->data; // uv_handle_t *stream = &connection->stream; if (connection->state == OPEN) { http_stream_close_connection(connection); } free(req); } void http_stream_on_read_http_parser(uv_stream_t *tcp, ssize_t nread, const uv_buf_t *buf) { http_connection *connection = (http_connection *)tcp->data; if (nread > 0) { /* Need to tell the buffer that we care about the next nread bytes */ http_request_buffer_consume(connection->buffer, nread); http_parser_execute(&connection->parser, &parser_settings, (const char *)buf->base, nread); if (connection->parser.http_errno) { handle_bad_request(connection); } else { /* We finished processing this chunk of data, therefore we can't get rid of any chunks that were read before * the current one we're reading. * * We can't get rid of the one we're currently processing as it may contain a partial request that will * only be complete with the next chunk coming into a subsequent call of this function. */ http_request_buffer_sweep(connection->buffer); } } else if (nread == 0) { /* no-op - there's no data to be read, but there might be later */ } else if (nread == UV_ENOBUFS) { handle_buffer_exceeded_error(connection); } else if (nread == UV_EOF) { uv_shutdown_t *req = malloc(sizeof(uv_shutdown_t)); req->data = connection; uv_shutdown(req, &connection->stream, http_stream_on_shutdown); } else if (nread == UV_ECONNRESET || nread == UV_ECONNABORTED) { /* Let's close the connection as the other peer just disappeared */ http_stream_close_connection(connection); } else { /* We didn't see this coming, but an unexpected UV error code was passed in, so we'll * respond with a blanket 500 error if we can */ handle_internal_error(connection); } } void http_server_cleanup_write(char *response_string, hw_write_context *write_context, uv_write_t *write_req) { free(response_string); free(write_context); free(write_req); } int http_server_write_response_single(hw_write_context *write_context, hw_string *response) { http_connection *connection = write_context->connection; if (connection->state == OPEN) { uv_write_t *write_req = (uv_write_t *)malloc(sizeof(*write_req) + sizeof(uv_buf_t)); uv_buf_t *resbuf = (uv_buf_t *)(write_req + 1); resbuf->base = response->value; resbuf->len = response->length; write_req->data = write_context; uv_stream_t *stream = (uv_stream_t *)&write_context->connection->stream; if (uv_is_writable(stream)) { /* Ensuring that the response can still be written. */ uv_write(write_req, stream, resbuf, 1, http_server_after_write); /* TODO: Use the return values from uv_write() */ } else { /* The connection was closed, so we can write the response back, but we still need to free up things */ http_server_cleanup_write(resbuf->base, write_context, write_req); } } return 0; } void http_server_after_write(uv_write_t *req, int UNUSED(status)) { hw_write_context *write_context = (hw_write_context *)req->data; uv_buf_t *resbuf = (uv_buf_t *)(req + 1); //uv_handle_t *stream = (uv_handle_t *)req->handle; http_connection *connection = write_context->connection; if (!connection->keep_alive && connection->state == OPEN) { http_stream_close_connection(connection); } if (write_context->callback) { write_context->callback(write_context->user_data); } http_server_cleanup_write(resbuf->base, write_context, req); }