From: Grigorii Demidov Date: Tue, 15 Jan 2019 13:13:23 +0000 (+0100) Subject: pytest/proxy: support for multiple clients, some new functionality; tcproxy removed X-Git-Tag: v4.0.0~47^2~6 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=708ae1dcd9eed0e8a2ab5a002b275d6c9bcdf1b6;p=thirdparty%2Fknot-resolver.git pytest/proxy: support for multiple clients, some new functionality; tcproxy removed --- diff --git a/tests/pytests/proxy/Makefile b/tests/pytests/proxy/Makefile index 170b89e2d..4f93c7053 100644 --- a/tests/pytests/proxy/Makefile +++ b/tests/pytests/proxy/Makefile @@ -1,8 +1,7 @@ CC=gcc -CFLAGS_TLS=-DDEBUG -ggdb3 -O0 -lgnutls -luv -CFLAGS_TCP=-DDEBUG -ggdb3 -O0 -luv +CFLAGS_TLS=-DDEBUG -ggdb3 -O0 -lgnutls -luv -lasan -fsanitize=address -fno-omit-frame-pointer -all: tcproxy tlsproxy +all: tlsproxy tlsproxy: tls-proxy.o tlsproxy.o $(CC) tls-proxy.o tlsproxy.o -o tlsproxy $(CFLAGS_TLS) @@ -13,16 +12,7 @@ tls-proxy.o: tls-proxy.c tls-proxy.h array.h tlsproxy.o: tlsproxy.c tls-proxy.h $(CC) -c -o $@ $< $(CFLAGS_TLS) -tcproxy: tcp-proxy.o tcproxy.o - $(CC) tcp-proxy.o tcproxy.o -o tcproxy $(CFLAGS_TCP) - -tcp-proxy.o: tcp-proxy.c tcp-proxy.h array.h - $(CC) -c -o $@ $< $(CFLAGS_TCP) - -tcproxy.o: tcproxy.c tcp-proxy.h - $(CC) -c -o $@ $< $(CFLAGS_TCP) - clean: - rm -f tcp-proxy.o tcproxy.o tcproxy tls-proxy.o tlsproxy.o tlsproxy + rm -f tls-proxy.o tlsproxy.o tlsproxy .PHONY: all clean diff --git a/tests/pytests/proxy/tcp-proxy.c b/tests/pytests/proxy/tcp-proxy.c deleted file mode 100644 index ba7198bcf..000000000 --- a/tests/pytests/proxy/tcp-proxy.c +++ /dev/null @@ -1,336 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include "array.h" - -struct buf { - char buf[16 * 1024]; - size_t size; -}; - -enum peer_state { - STATE_NOT_CONNECTED, - STATE_LISTENING, - STATE_CONNECTED, - STATE_CONNECT_IN_PROGRESS, - STATE_CLOSING_IN_PROGRESS -}; - -struct proxy_ctx { - uv_loop_t *loop; - uv_tcp_t server; - uv_tcp_t client; - uv_tcp_t upstream; - struct sockaddr_storage server_addr; - struct sockaddr_storage upstream_addr; - - int server_state; - int client_state; - int upstream_state; - - array_t(struct buf *) buffer_pool; - array_t(struct buf *) upstream_pending; -}; - -static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf); -static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); - -static struct buf *borrow_io_buffer(struct proxy_ctx *proxy) -{ - struct buf *buf = NULL; - if (proxy->buffer_pool.len > 0) { - buf = array_tail(proxy->buffer_pool); - array_pop(proxy->buffer_pool); - } else { - buf = calloc(1, sizeof (struct buf)); - } - return buf; -} - -static void release_io_buffer(struct proxy_ctx *proxy, struct buf *buf) -{ - if (!buf) { - return; - } - - if (proxy->buffer_pool.len < 1000) { - buf->size = 0; - array_push(proxy->buffer_pool, buf); - } else { - free(buf); - } -} - -static void push_to_upstream_pending(struct proxy_ctx *proxy, const char *buf, size_t size) -{ - while (size > 0) { - struct buf *b = borrow_io_buffer(proxy); - b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf); - memcpy(b->buf, buf, b->size); - array_push(proxy->upstream_pending, b); - size -= b->size; - } -} - -static struct buf *get_first_upstream_pending(struct proxy_ctx *proxy) -{ - struct buf *buf = NULL; - if (proxy->upstream_pending.len > 0) { - buf = proxy->upstream_pending.at[0]; - } - return buf; -} - -static void remove_first_upstream_pending(struct proxy_ctx *proxy) -{ - for (int i = 1; i < proxy->upstream_pending.len; ++i) { - proxy->upstream_pending.at[i - 1] = proxy->upstream_pending.at[i]; - } - if (proxy->upstream_pending.len > 0) { - proxy->upstream_pending.len -= 1; - } -} - -static void clear_upstream_pending(struct proxy_ctx *proxy) -{ - for (int i = 1; i < proxy->upstream_pending.len; ++i) { - struct buf *b = proxy->upstream_pending.at[i]; - release_io_buffer(proxy, b); - } - proxy->upstream_pending.len = 0; -} - -static void clear_buffer_pool(struct proxy_ctx *proxy) -{ - for (int i = 1; i < proxy->buffer_pool.len; ++i) { - struct buf *b = proxy->buffer_pool.at[i]; - free(b); - } - proxy->buffer_pool.len = 0; -} - -static void alloc_uv_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) -{ - buf->base = (char*)malloc(suggested_size); - buf->len = suggested_size; -} - -static void on_client_close(uv_handle_t *handle) -{ - struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data; - proxy->client_state = STATE_NOT_CONNECTED; -} - -static void on_upstream_close(uv_handle_t *handle) -{ - struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data; - proxy->upstream_state = STATE_NOT_CONNECTED; -} - -static void write_to_client_cb(uv_write_t *req, int status) -{ - struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data; - free(req); - if (status) { - fprintf(stderr, "error writing to client: %s\n", uv_strerror(status)); - clear_upstream_pending(proxy); - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); - } -} - -static void write_to_upstream_cb(uv_write_t *req, int status) -{ - struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data; - free(req); - if (status) { - fprintf(stderr, "error writing to upstream: %s\n", uv_strerror(status)); - clear_upstream_pending(proxy); - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); - return; - } - if (proxy->upstream_pending.len > 0) { - struct buf *buf = get_first_upstream_pending(proxy); - remove_first_upstream_pending(proxy); - release_io_buffer(proxy, buf); - if (proxy->upstream_state == STATE_CONNECTED && - proxy->upstream_pending.len > 0) { - buf = get_first_upstream_pending(proxy); - /* TODO avoid allocation */ - uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); - uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size); - uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb); - } - } -} - -static void on_client_connection(uv_stream_t *server, int status) -{ - if (status < 0) { - fprintf(stderr, "incoming connection error: %s\n", uv_strerror(status)); - return; - } - - fprintf(stdout, "incoming connection\n"); - - struct proxy_ctx *proxy = (struct proxy_ctx *)server->loop->data; - if (proxy->client_state != STATE_NOT_CONNECTED) { - fprintf(stderr, "client already connected, ignoring\n"); - return; - } - - uv_tcp_init(proxy->loop, &proxy->client); - proxy->client_state = STATE_CONNECTED; - if (uv_accept(server, (uv_stream_t*)&proxy->client) == 0) { - uv_read_start((uv_stream_t*)&proxy->client, alloc_uv_buffer, read_from_client_cb); - } else { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); - } -} - -static void on_connect_to_upstream(uv_connect_t *req, int status) -{ - struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data; - free(req); - if (status < 0) { - fprintf(stderr, "error connecting to upstream: %s\n", uv_strerror(status)); - clear_upstream_pending(proxy); - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); - return; - } - - proxy->upstream_state = STATE_CONNECTED; - uv_read_start((uv_stream_t*)&proxy->upstream, alloc_uv_buffer, read_from_upstream_cb); - if (proxy->upstream_pending.len > 0) { - struct buf *buf = get_first_upstream_pending(proxy); - /* TODO avoid allocation */ - uv_write_t *wreq = (uv_write_t *) malloc(sizeof(uv_write_t)); - uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size); - uv_write(wreq, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb); - } -} - -static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) -{ - if (nread == 0) { - return; - } - struct proxy_ctx *proxy = (struct proxy_ctx *)client->loop->data; - if (nread < 0) { - if (nread != UV_EOF) { - fprintf(stderr, "error reading from client: %s\n", uv_err_name(nread)); - } - if (proxy->client_state == STATE_CONNECTED) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*) client, on_client_close); - } - return; - } - if (proxy->upstream_state == STATE_CONNECTED) { - if (proxy->upstream_pending.len > 0) { - push_to_upstream_pending(proxy, buf->base, nread); - } else { - /* TODO avoid allocation */ - uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); - uv_buf_t wrbuf = uv_buf_init(buf->base, nread); - uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb); - } - } else if (proxy->upstream_state == STATE_NOT_CONNECTED) { - /* TODO avoid allocation */ - uv_tcp_init(proxy->loop, &proxy->upstream); - uv_connect_t *conn = (uv_connect_t *) malloc(sizeof(uv_connect_t)); - proxy->upstream_state = STATE_CONNECT_IN_PROGRESS; - uv_tcp_connect(conn, &proxy->upstream, (struct sockaddr *)&proxy->upstream_addr, - on_connect_to_upstream); - push_to_upstream_pending(proxy, buf->base, nread); - } else if (proxy->upstream_state == STATE_CONNECT_IN_PROGRESS) { - push_to_upstream_pending(proxy, buf->base, nread); - } -} - -static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf) -{ - if (nread == 0) { - return; - } - struct proxy_ctx *proxy = (struct proxy_ctx *)upstream->loop->data; - if (nread < 0) { - if (nread != UV_EOF) { - fprintf(stderr, "error reading from upstream: %s\n", uv_err_name(nread)); - } - clear_upstream_pending(proxy); - if (proxy->upstream_state == STATE_CONNECTED) { - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); - } - return; - } - if (proxy->client_state == STATE_CONNECTED) { - /* TODO Avoid allocation */ - uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); - uv_buf_t wrbuf = uv_buf_init(buf->base, nread); - uv_write(req, (uv_stream_t *)&proxy->client, &wrbuf, 1, write_to_client_cb); - } -} - -struct proxy_ctx *proxy_allocate() -{ - return malloc(sizeof(struct proxy_ctx)); -} - -int proxy_init(struct proxy_ctx *proxy, - const char *server_addr, int server_port, - const char *upstream_addr, int upstream_port) -{ - proxy->loop = uv_default_loop(); - uv_tcp_init(proxy->loop, &proxy->server); - int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server_addr); - if (res != 0) { - return res; - } - res = uv_ip4_addr(upstream_addr, upstream_port, (struct sockaddr_in *)&proxy->upstream_addr); - if (res != 0) { - return res; - } - array_init(proxy->buffer_pool); - array_init(proxy->upstream_pending); - proxy->server_state = STATE_NOT_CONNECTED; - proxy->client_state = STATE_NOT_CONNECTED; - proxy->upstream_state = STATE_NOT_CONNECTED; - - proxy->loop->data = proxy; - return 0; -} - -void proxy_free(struct proxy_ctx *proxy) -{ - if (!proxy) { - return; - } - clear_upstream_pending(proxy); - clear_buffer_pool(proxy); - /* TODO correctly close all the uv_tcp_t */ - free(proxy); -} - -int proxy_start_listen(struct proxy_ctx *proxy) -{ - uv_tcp_bind(&proxy->server, (const struct sockaddr*)&proxy->server_addr, 0); - int ret = uv_listen((uv_stream_t*)&proxy->server, 128, on_client_connection); - if (ret == 0) { - proxy->server_state = STATE_LISTENING; - } - return ret; -} - -int proxy_run(struct proxy_ctx *proxy) -{ - return uv_run(proxy->loop, UV_RUN_DEFAULT); -} diff --git a/tests/pytests/proxy/tcp-proxy.h b/tests/pytests/proxy/tcp-proxy.h deleted file mode 100644 index 668a65fd4..000000000 --- a/tests/pytests/proxy/tcp-proxy.h +++ /dev/null @@ -1,12 +0,0 @@ -#pragma once - -struct proxy_ctx; - -struct proxy_ctx *proxy_allocate(); -void proxy_free(struct proxy_ctx *proxy); -int proxy_init(struct proxy_ctx *proxy, - const char *server_addr, int server_port, - const char *upstream_addr, int upstream_port); -int proxy_start_listen(struct proxy_ctx *proxy); -int proxy_run(struct proxy_ctx *proxy); - diff --git a/tests/pytests/proxy/tcproxy.c b/tests/pytests/proxy/tcproxy.c deleted file mode 100644 index 87a6b4c16..000000000 --- a/tests/pytests/proxy/tcproxy.c +++ /dev/null @@ -1,25 +0,0 @@ -#include -#include "tcp-proxy.h" - -int main() -{ - struct proxy_ctx *proxy = proxy_allocate(); - if (!proxy) { - fprintf(stderr, "can't allocate proxy structure\n"); - return 1; - } - int res = proxy_init(proxy, "127.0.0.1", 54000, "127.0.0.1", 53001); - if (res) { - fprintf(stderr, "can't initialize proxy by given addresses\n"); - return res; - } - res = proxy_start_listen(proxy); - if (res) { - fprintf(stderr, "error starting listen, error code: %i\n", res); - return res; - } - res = proxy_run(proxy); - proxy_free(proxy); - return res; -} - diff --git a/tests/pytests/proxy/tls-proxy.c b/tests/pytests/proxy/tls-proxy.c index b7f28d172..b26493cee 100644 --- a/tests/pytests/proxy/tls-proxy.c +++ b/tests/pytests/proxy/tls-proxy.c @@ -15,8 +15,8 @@ #define MAX_CLIENT_PENDING_SIZE 4096 struct buf { - char buf[16 * 1024]; size_t size; + char buf[]; }; enum peer_state { @@ -38,9 +38,7 @@ enum handshake_state { struct tls_ctx { gnutls_session_t session; - int handshake_state; - gnutls_certificate_credentials_t credentials; - gnutls_priority_t priority_cache; + enum handshake_state handshake_state; /* for reading from the network */ const uint8_t *buf; ssize_t nread; @@ -48,43 +46,50 @@ struct tls_ctx { uint8_t recv_buf[4096]; }; +struct peer { + uv_tcp_t handle; + enum peer_state state; + struct sockaddr_storage addr; + array_t(struct buf *) pending_buf; + uint64_t connection_timestamp; + struct tls_ctx *tls; + struct peer *peer; + int active_requests; +}; + struct tls_proxy_ctx { const struct args *a; - uv_loop_t *loop; - uv_tcp_t server; - uv_tcp_t client; - uv_tcp_t upstream; - struct sockaddr_storage server_addr; + gnutls_certificate_credentials_t tls_credentials; + gnutls_priority_t tls_priority_cache; + struct { + uv_tcp_t handle; + struct sockaddr_storage addr; + } server; struct sockaddr_storage upstream_addr; - struct sockaddr_storage client_addr; - - int server_state; - int client_state; - int upstream_state; - - uint64_t client_connection_timestamp; - - array_t(struct buf *) buffer_pool; - array_t(struct buf *) upstream_pending; - array_t(struct buf *) client_pending; - - char io_buf[0xFFFF]; - struct tls_ctx tls; + array_t(struct peer *) client_list; + char uv_wire_buf[65535 * 2]; + int conn_sequence; }; static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf); static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); static ssize_t proxy_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len); static ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len); -static int tls_process_from_upstream(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread); -static int tls_process_from_client(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread); -static int write_to_upstream_pending(struct tls_proxy_ctx *proxy); -static int write_to_client_pending(struct tls_proxy_ctx *proxy); - +static int tls_process_from_upstream(struct peer *upstream, const uint8_t *buf, ssize_t nread); +static int tls_process_from_client(struct peer *client, const uint8_t *buf, ssize_t nread); +static int write_to_upstream_pending(struct peer *peer); +static int write_to_client_pending(struct peer *peer); +static void on_client_close(uv_handle_t *handle); +static void on_upstream_close(uv_handle_t *handle); static int gnutls_references = 0; +static struct tls_proxy_ctx *get_proxy(struct peer *peer) +{ + return (struct tls_proxy_ctx *)peer->handle.loop->data; +} + const void *ip_addr(const struct sockaddr *addr) { if (!addr) { @@ -134,9 +139,10 @@ static int ip_addr_str(const struct sockaddr *addr, char *buf, size_t *buflen) return ret; } -static inline char *ip_straddr(const struct sockaddr *addr) +static inline char *ip_straddr(const struct sockaddr_storage *saddr_storage) { - assert(addr != NULL); + assert(saddr_storage != NULL); + const struct sockaddr *addr = (const struct sockaddr *)saddr_storage; /* We are the sinle-threaded application */ static char str[INET6_ADDRSTRLEN + 6]; size_t len = sizeof(str); @@ -144,404 +150,434 @@ static inline char *ip_straddr(const struct sockaddr *addr) return ret != 0 || len == 0 ? NULL : str; } -static struct buf *borrow_io_buffer(struct tls_proxy_ctx *proxy) +static struct buf *alloc_io_buffer(size_t size) { - struct buf *buf = NULL; - if (proxy->buffer_pool.len > 0) { - buf = array_tail(proxy->buffer_pool); - array_pop(proxy->buffer_pool); - } else { - buf = calloc(1, sizeof (struct buf)); - } + struct buf *buf = calloc(1, sizeof (struct buf) + size); + buf->size = size; return buf; } -static void release_io_buffer(struct tls_proxy_ctx *proxy, struct buf *buf) +static void free_io_buffer(struct buf *buf) { if (!buf) { return; } - - if (proxy->buffer_pool.len < 1000) { - buf->size = 0; - array_push(proxy->buffer_pool, buf); - } else { - free(buf); - } + free(buf); } -static struct buf *get_first_upstream_pending(struct tls_proxy_ctx *proxy) +static struct buf *get_first_pending_buf(struct peer *peer) { struct buf *buf = NULL; - if (proxy->upstream_pending.len > 0) { - buf = proxy->upstream_pending.at[0]; + if (peer->pending_buf.len > 0) { + buf = peer->pending_buf.at[0]; } return buf; } -static struct buf *get_first_client_pending(struct tls_proxy_ctx *proxy) +static struct buf *remove_first_pending_buf(struct peer *peer) { - struct buf *buf = NULL; - if (proxy->client_pending.len > 0) { - buf = proxy->client_pending.at[0]; - } - return buf; -} - -static void remove_first_upstream_pending(struct tls_proxy_ctx *proxy) -{ - for (int i = 1; i < proxy->upstream_pending.len; ++i) { - proxy->upstream_pending.at[i - 1] = proxy->upstream_pending.at[i]; - } - if (proxy->upstream_pending.len > 0) { - proxy->upstream_pending.len -= 1; - } -} - -static void remove_first_client_pending(struct tls_proxy_ctx *proxy) -{ - for (int i = 1; i < proxy->client_pending.len; ++i) { - proxy->client_pending.at[i - 1] = proxy->client_pending.at[i]; + if (peer->pending_buf.len == 0) { + return NULL; } - if (proxy->client_pending.len > 0) { - proxy->client_pending.len -= 1; + struct buf * buf = peer->pending_buf.at[0]; + for (int i = 1; i < peer->pending_buf.len; ++i) { + peer->pending_buf.at[i - 1] = peer->pending_buf.at[i]; } + peer->pending_buf.len -= 1; + return buf; } -static void clear_upstream_pending(struct tls_proxy_ctx *proxy) -{ - for (int i = 0; i < proxy->upstream_pending.len; ++i) { - struct buf *b = proxy->upstream_pending.at[i]; - release_io_buffer(proxy, b); - } - proxy->upstream_pending.len = 0; -} - -static void clear_client_pending(struct tls_proxy_ctx *proxy) -{ - for (int i = 0; i < proxy->client_pending.len; ++i) { - struct buf *b = proxy->client_pending.at[i]; - release_io_buffer(proxy, b); - } - proxy->client_pending.len = 0; -} - -static void clear_buffer_pool(struct tls_proxy_ctx *proxy) +static void clear_pending_bufs(struct peer *peer) { - for (int i = 0; i < proxy->buffer_pool.len; ++i) { - struct buf *b = proxy->buffer_pool.at[i]; - free(b); + for (int i = 0; i < peer->pending_buf.len; ++i) { + struct buf *b = peer->pending_buf.at[i]; + free_io_buffer(b); } - proxy->buffer_pool.len = 0; + peer->pending_buf.len = 0; } static void alloc_uv_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) { struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data; - buf->base = proxy->io_buf; - buf->len = sizeof(proxy->io_buf); + buf->base = proxy->uv_wire_buf; + buf->len = sizeof(proxy->uv_wire_buf); } static void on_client_close(uv_handle_t *handle) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data; - gnutls_deinit(proxy->tls.session); - proxy->tls.handshake_state = TLS_HS_NOT_STARTED; - proxy->client_state = STATE_NOT_CONNECTED; -} - -static void on_dummmy_client_close(uv_handle_t *handle) -{ - free(handle); + struct peer *client = (struct peer *)handle->data; + struct peer *upstream = client->peer; + fprintf(stdout, "[client] connection with '%s' closed\n", ip_straddr(&client->addr)); + assert(client->tls); + gnutls_deinit(client->tls->session); + client->tls->handshake_state = TLS_HS_NOT_STARTED; + client->state = STATE_NOT_CONNECTED; + if (upstream->state != STATE_NOT_CONNECTED) { + if (upstream->state == STATE_CONNECTED) { + fprintf(stdout, "[client] closing connection with upstream for '%s'\n", + ip_straddr(&client->addr)); + upstream->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&upstream->handle, on_upstream_close); + } + return; + } + struct tls_proxy_ctx *proxy = get_proxy(client); + for (size_t i = 0; i < proxy->client_list.len; ++i) { + struct peer *client_i = proxy->client_list.at[i]; + if (client_i == client) { + fprintf(stdout, "[client] connection structures deallocated for '%s'\n", + ip_straddr(&client->addr)); + array_del(proxy->client_list, i); + free(client->tls); + free(client); + break; + } + } } static void on_upstream_close(uv_handle_t *handle) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data; - proxy->upstream_state = STATE_NOT_CONNECTED; + struct peer *upstream = (struct peer *)handle->data; + struct peer *client = upstream->peer; + assert(upstream->tls == NULL); + upstream->state = STATE_NOT_CONNECTED; + fprintf(stdout, "[upstream] connection with upstream closed for client '%s'\n", ip_straddr(&client->addr)); + if (client->state != STATE_NOT_CONNECTED) { + if (client->state == STATE_CONNECTED) { + fprintf(stdout, "[upstream] closing connection to client '%s'\n", + ip_straddr(&client->addr)); + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&client->handle, on_client_close); + } + return; + } + struct tls_proxy_ctx *proxy = get_proxy(upstream); + for (size_t i = 0; i < proxy->client_list.len; ++i) { + struct peer *client_i = proxy->client_list.at[i]; + if (client_i == client) { + fprintf(stdout, "[upstream] connection structures deallocated for '%s'\n", + ip_straddr(&client->addr)); + array_del(proxy->client_list, i); + free(upstream); + free(client->tls); + free(client); + break; + } + } } static void write_to_client_cb(uv_write_t *req, int status) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data; + struct peer *client = (struct peer *)req->handle->data; free(req); + client->active_requests -= 1; if (status) { - fprintf(stderr, "error writing to client: %s\n", uv_strerror(status)); - clear_client_pending(proxy); - if (proxy->client_state == STATE_CONNECTED) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); + fprintf(stdout, "[client] error writing to client '%s': %s\n", + ip_straddr(&client->addr), uv_strerror(status)); + clear_pending_bufs(client); + if (client->state == STATE_CONNECTED) { + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&client->handle, on_client_close); return; } } - fprintf(stdout, "successfully wrote to client, pending len is %zd\n", - proxy->client_pending.len); - if (proxy->client_state == STATE_CONNECTED && - proxy->tls.handshake_state == TLS_HS_DONE) { - uint64_t elapsed = uv_now(proxy->loop) - proxy->client_connection_timestamp; + fprintf(stdout, "[client] successfully wrote to client '%s', pending len is %zd, active requests %i\n", + ip_straddr(&client->addr), client->pending_buf.len, client->active_requests); + if (client->state == STATE_CONNECTED && + client->tls->handshake_state == TLS_HS_DONE) { + struct tls_proxy_ctx *proxy = get_proxy(client); + uint64_t elapsed = uv_now(proxy->loop) - client->connection_timestamp; if (!proxy->a->close_connection || elapsed < proxy->a->close_timeout) { - write_to_client_pending(proxy); + write_to_client_pending(client); } else { - clear_client_pending(proxy); - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); - fprintf(stdout, "closing connection to client\n"); + clear_pending_bufs(client); + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&client->handle, on_client_close); + fprintf(stdout, "[client] closing connection to client '%s'\n", ip_straddr(&client->addr)); } } } static void write_to_upstream_cb(uv_write_t *req, int status) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data; + struct peer *upstream = (struct peer *)req->handle->data; + void *data = req->data; + free(req); if (status) { - free(req); - fprintf(stderr, "error writing to upstream: %s\n", uv_strerror(status)); - clear_upstream_pending(proxy); - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); + fprintf(stdout, "[upstream] error writing to upstream: %s\n", uv_strerror(status)); + clear_pending_bufs(upstream); + upstream->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&upstream->handle, on_upstream_close); return; } - if (req->data != NULL) { - assert(proxy->upstream_pending.len > 0); - struct buf *buf = get_first_upstream_pending(proxy); - assert(req->data == (void *)buf->buf); - fprintf(stdout, "successfully wrote %zi bytes to upstream, pending len is %zd\n", - buf->size, proxy->upstream_pending.len); - remove_first_upstream_pending(proxy); - release_io_buffer(proxy, buf); + if (data != NULL) { + assert(upstream->pending_buf.len > 0); + struct buf *buf = get_first_pending_buf(upstream); + assert(data == (void *)buf->buf); + fprintf(stdout, "[upstream] successfully wrote %zi bytes to upstream, pending len is %zd\n", + buf->size, upstream->pending_buf.len); + remove_first_pending_buf(upstream); + free_io_buffer(buf); } else { - fprintf(stdout, "successfully wrote bytes to upstream, pending len is %zd\n", - proxy->upstream_pending.len); + fprintf(stdout, "[upstream] successfully wrote to upstream, pending len is %zd\n", + upstream->pending_buf.len); } - if (proxy->client_state != STATE_CONNECTED) { - clear_upstream_pending(proxy); - } else if (proxy->upstream_state == STATE_CONNECTED && - proxy->upstream_pending.len > 0) { - write_to_upstream_pending(proxy); + if (upstream->peer == NULL || upstream->peer->state != STATE_CONNECTED) { + clear_pending_bufs(upstream); + } else if (upstream->state == STATE_CONNECTED && upstream->pending_buf.len > 0) { + write_to_upstream_pending(upstream); } - free(req); } -static void on_client_connection(uv_stream_t *server, int status) +static void accept_connection_from_client(uv_stream_t *server) { - if (status < 0) { - fprintf(stderr, "incoming connection error: %s\n", uv_strerror(status)); - return; - } - - int err = 0; - int ret = 0; struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)server->loop->data; - if (proxy->client_state != STATE_NOT_CONNECTED) { - fprintf(stderr, "incoming connection"); - uv_tcp_t *dummy_client = malloc(sizeof(uv_tcp_t)); - uv_tcp_init(proxy->loop, dummy_client); - err = uv_accept(server, (uv_stream_t*)dummy_client); - if (err == 0) { - struct sockaddr dummy_addr; - int dummy_addr_len = sizeof(dummy_addr); - ret = uv_tcp_getpeername(dummy_client, - &dummy_addr, - &dummy_addr_len); - if (ret == 0) { - fprintf(stderr, " from %s", ip_straddr(&dummy_addr)); - } - uv_close((uv_handle_t *)dummy_client, on_dummmy_client_close); - } else { - on_dummmy_client_close((uv_handle_t *)dummy_client); - } - fprintf(stderr, " - client already connected, rejecting\n"); - return; - } + struct peer *client = calloc(1, sizeof(struct peer)); + uv_tcp_init(proxy->loop, &client->handle); + uv_tcp_nodelay((uv_tcp_t *)&client->handle, 1); - uv_tcp_init(proxy->loop, &proxy->client); - uv_tcp_nodelay((uv_tcp_t *)&proxy->client, 1); - proxy->client_state = STATE_CONNECTED; - err = uv_accept(server, (uv_stream_t*)&proxy->client); + int err = uv_accept(server, (uv_stream_t*)&client->handle); if (err != 0) { - fprintf(stderr, "incoming connection - uv_accept() failed: (%d) %s\n", - err, uv_strerror(err)); + fprintf(stdout, "[client] incoming connection - uv_accept() failed: (%d) %s\n", + err, uv_strerror(err)); + proxy->conn_sequence = 0; return; } - struct sockaddr *addr = (struct sockaddr *)&(proxy->client_addr); - int addr_len = sizeof(proxy->client_addr); - ret = uv_tcp_getpeername(&proxy->client, addr, &addr_len); + client->state = STATE_CONNECTED; + array_init(client->pending_buf); + client->handle.data = client; + + struct peer *upstream = calloc(1, sizeof(struct peer)); + uv_tcp_init(proxy->loop, &upstream->handle); + uv_tcp_nodelay((uv_tcp_t *)&upstream->handle, 1); + + client->peer = upstream; + + array_init(upstream->pending_buf); + upstream->state = STATE_NOT_CONNECTED; + upstream->peer = client; + upstream->handle.data = upstream; + + struct sockaddr *addr = (struct sockaddr *)&(client->addr); + int addr_len = sizeof(client->addr); + int ret = uv_tcp_getpeername(&client->handle, addr, &addr_len); if (ret || addr->sa_family == AF_UNSPEC) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); - fprintf(stderr, "incoming connection - uv_tcp_getpeername() failed: (%d) %s\n", + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&client->handle, on_client_close); + fprintf(stdout, "[client] incoming connection - uv_tcp_getpeername() failed: (%d) %s\n", err, uv_strerror(err)); + proxy->conn_sequence = 0; return; } + memcpy(&upstream->addr, &proxy->upstream_addr, sizeof(struct sockaddr_storage)); - fprintf(stdout, "incoming connection from %s\n", ip_straddr(addr)); - - uv_read_start((uv_stream_t*)&proxy->client, alloc_uv_buffer, read_from_client_cb); + struct tls_ctx *tls = calloc(1, sizeof(struct tls_ctx)); + tls->handshake_state = TLS_HS_NOT_STARTED; + client->tls = tls; const char *errpos = NULL; - struct tls_ctx *tls = &proxy->tls; - assert (tls->handshake_state == TLS_HS_NOT_STARTED); err = gnutls_init(&tls->session, GNUTLS_SERVER | GNUTLS_NONBLOCK); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_init() failed: (%d) %s\n", + fprintf(stdout, "[client] gnutls_init() failed: (%d) %s\n", err, gnutls_strerror_name(err)); } - err = gnutls_priority_set(tls->session, tls->priority_cache); + err = gnutls_priority_set(tls->session, proxy->tls_priority_cache); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_priority_set() failed: (%d) %s\n", + fprintf(stdout, "[client] gnutls_priority_set() failed: (%d) %s\n", err, gnutls_strerror_name(err)); } - err = gnutls_credentials_set(tls->session, GNUTLS_CRD_CERTIFICATE, tls->credentials); + err = gnutls_credentials_set(tls->session, GNUTLS_CRD_CERTIFICATE, proxy->tls_credentials); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_credentials_set() failed: (%d) %s\n", + fprintf(stdout, "[client] gnutls_credentials_set() failed: (%d) %s\n", err, gnutls_strerror_name(err)); } gnutls_certificate_server_set_request(tls->session, GNUTLS_CERT_IGNORE); gnutls_handshake_set_timeout(tls->session, GNUTLS_DEFAULT_HANDSHAKE_TIMEOUT); - gnutls_transport_set_pull_function(tls->session, proxy_gnutls_pull); gnutls_transport_set_push_function(tls->session, proxy_gnutls_push); - gnutls_transport_set_ptr(tls->session, proxy); + gnutls_transport_set_ptr(tls->session, client); tls->handshake_state = TLS_HS_IN_PROGRESS; - proxy->client_connection_timestamp = uv_now(proxy->loop); + client->connection_timestamp = uv_now(proxy->loop); + proxy->conn_sequence += 1; + array_push(proxy->client_list, client); + + fprintf(stdout, "[client] incoming connection from '%s'\n", ip_straddr(&client->addr)); + uv_read_start((uv_stream_t*)&client->handle, alloc_uv_buffer, read_from_client_cb); +} + +static void dynamic_handle_close_cb(uv_handle_t *handle) +{ + free(handle); +} + +static void delayed_accept_timer_cb(uv_timer_t *timer) +{ + uv_stream_t *server = (uv_stream_t *)timer->data; + fprintf(stdout, "[client] delayed connection processing\n"); + accept_connection_from_client(server); + uv_close((uv_handle_t *)timer, dynamic_handle_close_cb); +} + +static void on_client_connection(uv_stream_t *server, int status) +{ + if (status < 0) { + fprintf(stdout, "[client] incoming connection error: %s\n", uv_strerror(status)); + return; + } + struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)server->loop->data; + proxy->conn_sequence += 1; + if (proxy->a->max_conn_sequence > 0 && + proxy->conn_sequence > proxy->a->max_conn_sequence) { + fprintf(stdout, "[client] incoming connection, delaying\n"); + uv_timer_t *timer = (uv_timer_t*)malloc(sizeof *timer); + uv_timer_init(uv_default_loop(), timer); + timer->data = server; + uv_timer_start(timer, delayed_accept_timer_cb, 10000, 0); + proxy->conn_sequence = 0; + } else { + accept_connection_from_client(server); + } } static void on_connect_to_upstream(uv_connect_t *req, int status) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data; + struct peer *upstream = (struct peer *)req->handle->data; + struct tls_proxy_ctx *proxy = get_proxy(upstream); free(req); if (status < 0) { - fprintf(stderr, "error connecting to upstream (%s): %s\n", - ip_straddr((struct sockaddr *)&proxy->upstream_addr), + fprintf(stdout, "[upstream] error connecting to upstream (%s): %s\n", + ip_straddr(&upstream->addr), uv_strerror(status)); - clear_upstream_pending(proxy); - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); + clear_pending_bufs(upstream); + upstream->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&upstream->handle, on_upstream_close); return; } - fprintf(stdout, "connected to %s\n", ip_straddr((struct sockaddr *)&proxy->upstream_addr)); + fprintf(stdout, "[upstream] connected to %s\n", ip_straddr(&upstream->addr)); - proxy->upstream_state = STATE_CONNECTED; - uv_read_start((uv_stream_t*)&proxy->upstream, alloc_uv_buffer, read_from_upstream_cb); - if (proxy->upstream_pending.len > 0) { - write_to_upstream_pending(proxy); + upstream->state = STATE_CONNECTED; + uv_read_start((uv_stream_t*)&upstream->handle, alloc_uv_buffer, read_from_upstream_cb); + if (upstream->pending_buf.len > 0) { + write_to_upstream_pending(upstream); } } -static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) +static void read_from_client_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { - fprintf(stdout, "reading %zd bytes from client\n", nread); if (nread == 0) { + fprintf(stdout, "[client] reading %zd bytes\n", nread); return; } - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)client->loop->data; + struct peer *client = (struct peer *)handle->data; if (nread < 0) { if (nread != UV_EOF) { - fprintf(stderr, "error reading from client: %s\n", uv_err_name(nread)); + fprintf(stdout, "[client] error reading from '%s': %s\n", + ip_straddr(&client->addr), + uv_err_name(nread)); } else { - fprintf(stdout, "client has closed the connection\n"); + fprintf(stdout, "[client] closing connection with '%s'\n", + ip_straddr(&client->addr)); } - if (proxy->client_state == STATE_CONNECTED) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*) client, on_client_close); + if (client->state == STATE_CONNECTED) { + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)handle, on_client_close); } return; } - int res = tls_process_from_client(proxy, buf->base, nread); + struct tls_proxy_ctx *proxy = get_proxy(client); + if (proxy->a->accept_only) { + fprintf(stdout, "[client] ignoring %zd bytes from '%s'\n", nread, ip_straddr(&client->addr)); + return; + } + fprintf(stdout, "[client] reading %zd bytes from '%s'\n", nread, ip_straddr(&client->addr)); + + int res = tls_process_from_client(client, buf->base, nread); if (res < 0) { - if (proxy->client_state == STATE_CONNECTED) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*) client, on_client_close); + if (client->state == STATE_CONNECTED) { + client->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&client->handle, on_client_close); } } } -static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf) +static void read_from_upstream_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) { - fprintf(stdout, "reading %zd bytes from upstream\n", nread); + fprintf(stdout, "[upstream] reading %zd bytes\n", nread); if (nread == 0) { return; } - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)upstream->loop->data; + struct peer *upstream = (struct peer *)handle->data; if (nread < 0) { if (nread != UV_EOF) { - fprintf(stderr, "error reading from upstream: %s\n", uv_err_name(nread)); + fprintf(stdout, "[upstream] error reading from upstream: %s\n", uv_err_name(nread)); } else { - fprintf(stdout, "upstream has closed the connection\n"); + fprintf(stdout, "[upstream] closing connection\n"); } - clear_upstream_pending(proxy); - if (proxy->upstream_state == STATE_CONNECTED) { - proxy->upstream_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close); + clear_pending_bufs(upstream); + if (upstream->state == STATE_CONNECTED) { + upstream->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&upstream->handle, on_upstream_close); } return; } - int res = tls_process_from_upstream(proxy, buf->base, nread); + int res = tls_process_from_upstream(upstream, buf->base, nread); if (res < 0) { - fprintf(stderr, "error sending tls data to client\n"); - if (proxy->client_state == STATE_CONNECTED) { - proxy->client_state = STATE_CLOSING_IN_PROGRESS; - uv_close((uv_handle_t*)&proxy->client, on_client_close); + fprintf(stdout, "[upstream] error processing tls data to client\n"); + if (upstream->peer->state == STATE_CONNECTED) { + upstream->peer->state = STATE_CLOSING_IN_PROGRESS; + uv_close((uv_handle_t*)&upstream->peer->handle, on_client_close); } } } -static void push_to_upstream_pending(struct tls_proxy_ctx *proxy, const char *buf, size_t size) +static void push_to_upstream_pending(struct peer *upstream, const char *buf, size_t size) { - while (size > 0) { - struct buf *b = borrow_io_buffer(proxy); - b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf); - memcpy(b->buf, buf, b->size); - array_push(proxy->upstream_pending, b); - size -= b->size; - buf += b->size; - } + struct buf *b = alloc_io_buffer(size); + memcpy(b->buf, buf, b->size); + array_push(upstream->pending_buf, b); } -static void push_to_client_pending(struct tls_proxy_ctx *proxy, const char *buf, size_t size) +static void push_to_client_pending(struct peer *client, const char *buf, size_t size) { - if (proxy->a->rehandshake && - proxy->client_pending.len > MAX_CLIENT_PENDING_SIZE) { - return; - } + struct tls_proxy_ctx *proxy = get_proxy(client); while (size > 0) { - struct buf *b = borrow_io_buffer(proxy); - b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf); - if (proxy->a->rehandshake && b->size > CLIENT_ANSWER_CHUNK_SIZE) { - b->size = CLIENT_ANSWER_CHUNK_SIZE; + int temp_size = size; + if (proxy->a->rehandshake && temp_size > CLIENT_ANSWER_CHUNK_SIZE) { + temp_size = CLIENT_ANSWER_CHUNK_SIZE; } + struct buf *b = alloc_io_buffer(temp_size); memcpy(b->buf, buf, b->size); - array_push(proxy->client_pending, b); - size -= b->size; - buf += b->size; + array_push(client->pending_buf, b); + size -= temp_size; + buf += temp_size; } } -static int write_to_upstream_pending(struct tls_proxy_ctx *proxy) +static int write_to_upstream_pending(struct peer *upstream) { - struct buf *buf = get_first_upstream_pending(proxy); - /* TODO avoid allocation */ + struct tls_proxy_ctx *proxy = get_proxy(upstream); + struct buf *buf = get_first_pending_buf(upstream); uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t)); uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size); req->data = buf->buf; - fprintf(stdout, "writing %zd bytes to upstream\n", buf->size); - return uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb); + fprintf(stdout, "[upstream] writing %zd bytes\n", buf->size); + return uv_write(req, (uv_stream_t *)&upstream->handle, &wrbuf, 1, write_to_upstream_cb); } static ssize_t proxy_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)h; - struct tls_ctx *t = &proxy->tls; + struct peer *peer = (struct peer *)h; + struct tls_ctx *t = peer->tls; - fprintf(stdout, "\t gnutls: pulling %zd bytes from client\n", len); + fprintf(stdout, "[gnutls_pull] pulling %zd bytes\n", len); if (t->nread <= t->consumed) { errno = EAGAIN; - fprintf(stdout, "\t gnutls: return EAGAIN\n"); + fprintf(stdout, "[gnutls_pull] return EAGAIN\n"); return -1; } @@ -554,9 +590,9 @@ static ssize_t proxy_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len) { - struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)h; - struct tls_ctx *t = &proxy->tls; - fprintf(stdout, "\t gnutls: writing %zd bytes to client\n", len); + struct peer *client = (struct peer *)h; + struct tls_ctx *t = client->tls; + fprintf(stdout, "[gnutls_push] writing %zd bytes\n", len); ssize_t ret = -1; const size_t req_size_aligned = ((sizeof(uv_write_t) / 16) + 1) * 16; @@ -567,10 +603,10 @@ ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len) { data, len } }; memcpy(data, buf, len); - req->data = data; - int res = uv_write(req, (uv_stream_t *)&proxy->client, uv_buf, 1, write_to_client_cb); + int res = uv_write(req, (uv_stream_t *)&client->handle, uv_buf, 1, write_to_client_cb); if (res == 0) { ret = len; + client->active_requests += 1; } else { free(common_buf); errno = EIO; @@ -578,18 +614,19 @@ ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len) return ret; } -static int write_to_client_pending(struct tls_proxy_ctx *proxy) +static int write_to_client_pending(struct peer *client) { - if (proxy->client_pending.len == 0) { + if (client->pending_buf.len == 0) { return 0; } - struct buf *buf = get_first_client_pending(proxy); + struct tls_proxy_ctx *proxy = get_proxy(client); + struct buf *buf = get_first_pending_buf(client); uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size); - fprintf(stdout, "writing %zd bytes to client\n", buf->size); + fprintf(stdout, "[client] writing %zd bytes\n", buf->size); - gnutls_session_t tls_session = proxy->tls.session; - assert(proxy->tls.handshake_state != TLS_HS_IN_PROGRESS); + gnutls_session_t tls_session = client->tls->session; + assert(client->tls->handshake_state != TLS_HS_IN_PROGRESS); char *data = buf->buf; size_t len = buf->size; @@ -601,12 +638,12 @@ static int write_to_client_pending(struct tls_proxy_ctx *proxy) count = gnutls_record_send(tls_session, data, len); if (count < 0) { if (gnutls_error_is_fatal(count)) { - fprintf(stderr, "gnutls_record_send failed: %s (%zd)\n", + fprintf(stdout, "[client] gnutls_record_send failed: %s (%zd)\n", gnutls_strerror_name(count), count); return -1; } if (++retries > TLS_MAX_SEND_RETRIES) { - fprintf(stderr, "gnutls_record_send: too many sequential non-fatal errors (%zd), last error is: %s (%zd)\n", + fprintf(stdout, "[client] gnutls_record_send: too many sequential non-fatal errors (%zd), last error is: %s (%zd)\n", retries, gnutls_strerror_name(count), count); return -1; } @@ -618,47 +655,48 @@ static int write_to_client_pending(struct tls_proxy_ctx *proxy) if (++retries < TLS_MAX_SEND_RETRIES) { continue; } - fprintf(stderr, "gnutls_record_send: too many retries (%zd)\n", + fprintf(stdout, "[client] gnutls_record_send: too many retries (%zd)\n", retries); - fprintf(stderr, "tls_push_to_client didn't send all data(%zd of %zd)\n", + fprintf(stdout, "[client] tls_push_to_client didn't send all data(%zd of %zd)\n", len, submitted); return -1; } } while (len > 0); - remove_first_client_pending(proxy); - release_io_buffer(proxy, buf); + remove_first_pending_buf(client); + free_io_buffer(buf); - fprintf(stdout, "submitted %zd bytes to client\n", submitted); + fprintf(stdout, "[client] submitted %zd bytes\n", submitted); if (proxy->a->rehandshake) { assert (gnutls_safe_renegotiation_status(tls_session) != 0); assert (gnutls_rehandshake(tls_session) == GNUTLS_E_SUCCESS); /* Prevent write-to-client callback from sending next pending chunk. * At the same time tls_process_from_client() must not call gnutls_handshake() * as there can be application data in this direction. */ - proxy->tls.handshake_state = TLS_HS_EXPECTED; - fprintf(stdout, "rehandshake started\n"); + client->tls->handshake_state = TLS_HS_EXPECTED; + fprintf(stdout, "[client] rehandshake started\n"); } return submitted; } -static int tls_process_from_upstream(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t len) +static int tls_process_from_upstream(struct peer *upstream, const uint8_t *buf, ssize_t len) { - gnutls_session_t tls_session = proxy->tls.session; + struct peer *client = upstream->peer; + gnutls_session_t tls_session = client->tls->session; - fprintf(stdout, "pushing %zd bytes to client\n", len); + fprintf(stdout, "[upstream] pushing %zd bytes to client\n", len); ssize_t submitted = 0; - if (proxy->client_state != STATE_CONNECTED) { + if (client->state != STATE_CONNECTED) { return submitted; } - bool list_was_empty = (proxy->client_pending.len == 0); - push_to_client_pending(proxy, buf, len); + bool list_was_empty = (client->pending_buf.len == 0); + push_to_client_pending(client, buf, len); submitted = len; - if (proxy->tls.handshake_state == TLS_HS_DONE) { - if (list_was_empty && proxy->client_pending.len > 0) { - int ret = write_to_client_pending(proxy); + if (client->tls->handshake_state == TLS_HS_DONE) { + if (list_was_empty && client->pending_buf.len > 0) { + int ret = write_to_client_pending(client); if (ret < 0) { submitted = -1; } @@ -668,27 +706,27 @@ static int tls_process_from_upstream(struct tls_proxy_ctx *proxy, const uint8_t return submitted; } -int tls_process_handshake(struct tls_proxy_ctx *proxy) +int tls_process_handshake(struct peer *peer) { - struct tls_ctx *tls = &proxy->tls; + struct tls_ctx *tls = peer->tls; int ret = 1; while (tls->handshake_state == TLS_HS_IN_PROGRESS) { - fprintf(stdout, "TLS handshake in progress...\n"); + fprintf(stdout, "[tls] TLS handshake in progress...\n"); int err = gnutls_handshake(tls->session); if (err == GNUTLS_E_SUCCESS) { tls->handshake_state = TLS_HS_DONE; - fprintf(stdout, "TLS handshake has completed\n"); + fprintf(stdout, "[tls] TLS handshake has completed\n"); ret = 1; - if (proxy->client_pending.len != 0) { - write_to_client_pending(proxy); + if (peer->pending_buf.len != 0) { + write_to_client_pending(peer); } } else if (gnutls_error_is_fatal(err)) { - fprintf(stderr, "gnutls_handshake failed: %s (%d)\n", + fprintf(stdout, "[tls] gnutls_handshake failed: %s (%d)\n", gnutls_strerror_name(err), err); ret = -1; break; } else { - fprintf(stderr, "gnutls_handshake nonfatal error: %s (%d)\n", + fprintf(stdout, "[tls] gnutls_handshake nonfatal error: %s (%d)\n", gnutls_strerror_name(err), err); ret = 0; break; @@ -697,61 +735,58 @@ int tls_process_handshake(struct tls_proxy_ctx *proxy) return ret; } -int tls_process_from_client(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread) +int tls_process_from_client(struct peer *client, const uint8_t *buf, ssize_t nread) { - struct tls_ctx *tls = &proxy->tls; + struct tls_ctx *tls = client->tls; tls->buf = buf; tls->nread = nread >= 0 ? nread : 0; tls->consumed = 0; - fprintf(stdout, "tls_process: reading %zd bytes from client\n", nread); + fprintf(stdout, "[client] tls_process: reading %zd bytes from client\n", nread); - int ret = tls_process_handshake(proxy); + int ret = tls_process_handshake(client); if (ret <= 0) { return ret; } int submitted = 0; while (true) { - ssize_t count = 0; - count = gnutls_record_recv(tls->session, tls->recv_buf, sizeof(tls->recv_buf)); + ssize_t count = gnutls_record_recv(tls->session, tls->recv_buf, sizeof(tls->recv_buf)); if (count == GNUTLS_E_AGAIN) { break; /* No data available */ } else if (count == GNUTLS_E_INTERRUPTED) { continue; /* Try reading again */ } else if (count == GNUTLS_E_REHANDSHAKE) { tls->handshake_state = TLS_HS_IN_PROGRESS; - ret = tls_process_handshake(proxy); - if (ret <= 0) { + ret = tls_process_handshake(client); + if (ret < 0) { return ret; } continue; } else if (count < 0) { - fprintf(stderr, "gnutls_record_recv failed: %s (%zd)\n", + fprintf(stdout, "[client] gnutls_record_recv failed: %s (%zd)\n", gnutls_strerror_name(count), count); return -1; } else if (count == 0) { break; } - if (proxy->upstream_state == STATE_CONNECTED) { - bool upstream_pending_is_empty = (proxy->upstream_pending.len == 0); - push_to_upstream_pending(proxy, tls->recv_buf, count); + struct peer *upstream = client->peer; + if (upstream->state == STATE_CONNECTED) { + bool upstream_pending_is_empty = (upstream->pending_buf.len == 0); + push_to_upstream_pending(upstream, tls->recv_buf, count); if (upstream_pending_is_empty) { - write_to_upstream_pending(proxy); + write_to_upstream_pending(upstream); } - } else if (proxy->upstream_state == STATE_NOT_CONNECTED) { - /* TODO avoid allocation */ - uv_tcp_init(proxy->loop, &proxy->upstream); + } else if (upstream->state == STATE_NOT_CONNECTED) { uv_connect_t *conn = (uv_connect_t *) malloc(sizeof(uv_connect_t)); - proxy->upstream_state = STATE_CONNECT_IN_PROGRESS; - fprintf(stdout, "connecting to %s\n", - ip_straddr((struct sockaddr *)&proxy->upstream_addr)); - uv_tcp_connect(conn, &proxy->upstream, (struct sockaddr *)&proxy->upstream_addr, + upstream->state = STATE_CONNECT_IN_PROGRESS; + fprintf(stdout, "[client] connecting to upstream '%s'\n", ip_straddr(&upstream->addr)); + uv_tcp_connect(conn, &upstream->handle, (struct sockaddr *)&upstream->addr, on_connect_to_upstream); - push_to_upstream_pending(proxy, tls->recv_buf, count); - } else if (proxy->upstream_state == STATE_CONNECT_IN_PROGRESS) { - push_to_upstream_pending(proxy, tls->recv_buf, count); + push_to_upstream_pending(upstream, tls->recv_buf, count); + } else if (upstream->state == STATE_CONNECT_IN_PROGRESS) { + push_to_upstream_pending(upstream, tls->recv_buf, count); } submitted += count; } @@ -773,12 +808,12 @@ int tls_proxy_init(struct tls_proxy_ctx *proxy, const struct args *a) const char *key_file = a->key_file; proxy->a = a; proxy->loop = uv_default_loop(); - uv_tcp_init(proxy->loop, &proxy->server); - int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server_addr); + uv_tcp_init(proxy->loop, &proxy->server.handle); + int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server.addr); if (res != 0) { - res = uv_ip6_addr(server_addr, server_port, (struct sockaddr_in6 *)&proxy->server_addr); + res = uv_ip6_addr(server_addr, server_port, (struct sockaddr_in6 *)&proxy->server.addr); if (res != 0) { - fprintf(stderr, "tls_proxy_init: can't parse local address '%s'\n", server_addr); + fprintf(stdout, "[proxy] tls_proxy_init: can't parse local address '%s'\n", server_addr); return -1; } } @@ -786,16 +821,12 @@ int tls_proxy_init(struct tls_proxy_ctx *proxy, const struct args *a) if (res != 0) { res = uv_ip6_addr(upstream_addr, upstream_port, (struct sockaddr_in6 *)&proxy->upstream_addr); if (res != 0) { - fprintf(stderr, "tls_proxy_init: can't parse upstream address '%s'\n", upstream_addr); + fprintf(stdout, "[proxy] tls_proxy_init: can't parse upstream address '%s'\n", upstream_addr); return -1; } } - array_init(proxy->buffer_pool); - array_init(proxy->upstream_pending); - array_init(proxy->client_pending); - proxy->server_state = STATE_NOT_CONNECTED; - proxy->client_state = STATE_NOT_CONNECTED; - proxy->upstream_state = STATE_NOT_CONNECTED; + array_init(proxy->client_list); + proxy->conn_sequence = 0; proxy->loop->data = proxy; @@ -803,46 +834,44 @@ int tls_proxy_init(struct tls_proxy_ctx *proxy, const struct args *a) if (gnutls_references == 0) { err = gnutls_global_init(); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_global_init() failed: (%d) %s\n", + fprintf(stdout, "[proxy] gnutls_global_init() failed: (%d) %s\n", err, gnutls_strerror_name(err)); return -1; } } gnutls_references += 1; - err = gnutls_certificate_allocate_credentials(&proxy->tls.credentials); + err = gnutls_certificate_allocate_credentials(&proxy->tls_credentials); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_certificate_allocate_credentials() failed: (%d) %s\n", + fprintf(stdout, "[proxy] gnutls_certificate_allocate_credentials() failed: (%d) %s\n", err, gnutls_strerror_name(err)); return -1; } - err = gnutls_certificate_set_x509_system_trust(proxy->tls.credentials); + err = gnutls_certificate_set_x509_system_trust(proxy->tls_credentials); if (err <= 0) { - fprintf(stderr, "gnutls_certificate_set_x509_system_trust() failed: (%d) %s\n", + fprintf(stdout, "[proxy] gnutls_certificate_set_x509_system_trust() failed: (%d) %s\n", err, gnutls_strerror_name(err)); return -1; } if (cert_file && key_file) { - err = gnutls_certificate_set_x509_key_file(proxy->tls.credentials, + err = gnutls_certificate_set_x509_key_file(proxy->tls_credentials, cert_file, key_file, GNUTLS_X509_FMT_PEM); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_certificate_set_x509_key_file() failed: (%d) %s\n", + fprintf(stdout, "[proxy] gnutls_certificate_set_x509_key_file() failed: (%d) %s\n", err, gnutls_strerror_name(err)); return -1; } } - err = gnutls_priority_init(&proxy->tls.priority_cache, NULL, NULL); + err = gnutls_priority_init(&proxy->tls_priority_cache, NULL, NULL); if (err != GNUTLS_E_SUCCESS) { - fprintf(stderr, "gnutls_priority_init() failed: (%d) %s\n", + fprintf(stdout, "[proxy] gnutls_priority_init() failed: (%d) %s\n", err, gnutls_strerror_name(err)); return -1; } - - proxy->tls.handshake_state = TLS_HS_NOT_STARTED; return 0; } @@ -851,12 +880,18 @@ void tls_proxy_free(struct tls_proxy_ctx *proxy) if (!proxy) { return; } - clear_upstream_pending(proxy); - clear_client_pending(proxy); - clear_buffer_pool(proxy); - gnutls_certificate_free_credentials(proxy->tls.credentials); - gnutls_priority_deinit(proxy->tls.priority_cache); - /* TODO correctly close all the uv_tcp_t */ + while (proxy->client_list.len > 0) { + size_t last_index = proxy->client_list.len - 1; + struct peer *client = proxy->client_list.at[last_index]; + clear_pending_bufs(client); + clear_pending_bufs(client->peer); + /* TODO correctly close all the uv_tcp_t */ + free(client->peer); + free(client); + array_del(proxy->client_list, last_index); + } + gnutls_certificate_free_credentials(proxy->tls_credentials); + gnutls_priority_deinit(proxy->tls_priority_cache); free(proxy); gnutls_references -= 1; @@ -867,11 +902,8 @@ void tls_proxy_free(struct tls_proxy_ctx *proxy) int tls_proxy_start_listen(struct tls_proxy_ctx *proxy) { - uv_tcp_bind(&proxy->server, (const struct sockaddr*)&proxy->server_addr, 0); - int ret = uv_listen((uv_stream_t*)&proxy->server, 128, on_client_connection); - if (ret == 0) { - proxy->server_state = STATE_LISTENING; - } + uv_tcp_bind(&proxy->server.handle, (const struct sockaddr*)&proxy->server.addr, 0); + int ret = uv_listen((uv_stream_t*)&proxy->server.handle, 128, on_client_connection); return ret; } diff --git a/tests/pytests/proxy/tls-proxy.h b/tests/pytests/proxy/tls-proxy.h index f16250c5a..4ec13d34f 100644 --- a/tests/pytests/proxy/tls-proxy.h +++ b/tests/pytests/proxy/tls-proxy.h @@ -12,8 +12,10 @@ struct args { bool rehandshake; bool close_connection; + bool accept_only; uint64_t close_timeout; + uint32_t max_conn_sequence; const char *cert_file; const char *key_file; diff --git a/tests/pytests/proxy/tlsproxy.c b/tests/pytests/proxy/tlsproxy.c index 41a91cff9..36f213b50 100644 --- a/tests/pytests/proxy/tlsproxy.c +++ b/tests/pytests/proxy/tlsproxy.c @@ -22,12 +22,19 @@ void help(char *argv[], struct args *a) " -d, --uport=[port] Upstream port (default: %u).\n" " -t, --cert=[path] Path to certificate file (default: %s).\n" " -k, --key=[path] Path to key file (default: %s).\n" - " -c, --close=[N] Close connection to client after every N ms (default: no).\n" - " -r, --rehandshake Do TLS rehandshake after every 8 bytes sent to client (default: no).\n", + " -c, --close=[N] Close connection to client after\n" + " every N ms (default: %li).\n" + " -f, --fail=[N] Delay every Nth incoming connection by 10 sec,\n" + " 0 disables delaying (default: 0).\n" + " -r, --rehandshake Do TLS rehandshake after every 8 bytes\n" + " sent to the client (default: no).\n" + " -a, --acceptonly Accept incoming connections, but don't\n" + " connect to upstream (default: no).\n" + , a->local_addr, a->local_port, a->upstream, a->upstream_port, - a->cert_file, a->key_file - ); + a->cert_file, a->key_file, + a->close_timeout); } void init_args(struct args *a) @@ -39,7 +46,10 @@ void init_args(struct args *a) a->cert_file = default_cert_path; a->key_file = default_key_path; a->rehandshake = false; + a->accept_only = false; a->close_connection = false; + a->close_timeout = 1000; + a->max_conn_sequence = 0; /* disabled */ } int main(int argc, char **argv) @@ -54,12 +64,14 @@ int main(int argc, char **argv) {"cert", required_argument, 0, 't'}, {"key", required_argument, 0, 'k'}, {"close", required_argument, 0, 'c'}, + {"fail", required_argument, 0, 'f'}, {"rehandshake", no_argument, 0, 'r'}, + {"acceptonly", no_argument, 0, 'a'}, {0, 0, 0, 0} }; struct args args; init_args(&args); - while ((c = getopt_long(argc, argv, "l:p:u:d:t:k:c:r", opts, &li)) != -1) { + while ((c = getopt_long(argc, argv, "l:p:u:d:t:k:c:f:ra", opts, &li)) != -1) { switch (c) { case 'l': @@ -102,9 +114,22 @@ int main(int argc, char **argv) args.close_connection = true; args.close_timeout = li_value; break; + case 'f': + li_value = strtol(optarg, NULL, 10); + if (li_value <= 0 || li_value > UINT32_MAX) { + printf("error: '-f' requires a positive" + " number less or equal to %i, not '%s'\n", + UINT32_MAX, optarg); + return -1; + } + args.max_conn_sequence = (uint32_t)li_value; + break; case 'r': args.rehandshake = true; break; + case 'a': + args.accept_only = true; + break; default: init_args(&args); help(argv, &args); @@ -130,14 +155,19 @@ int main(int argc, char **argv) fprintf(stderr, "error starting listen, error code: %i\n", res); return res; } - fprintf(stdout, "Listen on %s#%u\n" - "Upstream is expected on %s#%u\n" - "Rehandshake %s\n" - "Close %s\n", + fprintf(stdout, "Listen on %s#%u\n" + "Upstream is expected on %s#%u\n" + "Rehandshake %s\n" + "Close %s\n" + "Refuse incoming connections every %ith%s\n" + "Only accept, don't forward %s\n", args.local_addr, args.local_port, args.upstream, args.upstream_port, args.rehandshake ? "yes" : "no", - args.close_connection ? "yes" : "no"); + args.close_connection ? "yes" : "no", + args.max_conn_sequence, args.max_conn_sequence ? "" : " (disabled)", + args.accept_only ? "yes" : "no" + ); res = tls_proxy_run(proxy); tls_proxy_free(proxy); return res;