+++ /dev/null
-#include <assert.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <string.h>
-#include <stdlib.h>
-#include <stdbool.h>
-#include <uv.h>
-#include "array.h"
-
-struct buf {
- char buf[16 * 1024];
- size_t size;
-};
-
-enum peer_state {
- STATE_NOT_CONNECTED,
- STATE_LISTENING,
- STATE_CONNECTED,
- STATE_CONNECT_IN_PROGRESS,
- STATE_CLOSING_IN_PROGRESS
-};
-
-struct proxy_ctx {
- uv_loop_t *loop;
- uv_tcp_t server;
- uv_tcp_t client;
- uv_tcp_t upstream;
- struct sockaddr_storage server_addr;
- struct sockaddr_storage upstream_addr;
-
- int server_state;
- int client_state;
- int upstream_state;
-
- array_t(struct buf *) buffer_pool;
- array_t(struct buf *) upstream_pending;
-};
-
-static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf);
-static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
-
-static struct buf *borrow_io_buffer(struct proxy_ctx *proxy)
-{
- struct buf *buf = NULL;
- if (proxy->buffer_pool.len > 0) {
- buf = array_tail(proxy->buffer_pool);
- array_pop(proxy->buffer_pool);
- } else {
- buf = calloc(1, sizeof (struct buf));
- }
- return buf;
-}
-
-static void release_io_buffer(struct proxy_ctx *proxy, struct buf *buf)
-{
- if (!buf) {
- return;
- }
-
- if (proxy->buffer_pool.len < 1000) {
- buf->size = 0;
- array_push(proxy->buffer_pool, buf);
- } else {
- free(buf);
- }
-}
-
-static void push_to_upstream_pending(struct proxy_ctx *proxy, const char *buf, size_t size)
-{
- while (size > 0) {
- struct buf *b = borrow_io_buffer(proxy);
- b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf);
- memcpy(b->buf, buf, b->size);
- array_push(proxy->upstream_pending, b);
- size -= b->size;
- }
-}
-
-static struct buf *get_first_upstream_pending(struct proxy_ctx *proxy)
-{
- struct buf *buf = NULL;
- if (proxy->upstream_pending.len > 0) {
- buf = proxy->upstream_pending.at[0];
- }
- return buf;
-}
-
-static void remove_first_upstream_pending(struct proxy_ctx *proxy)
-{
- for (int i = 1; i < proxy->upstream_pending.len; ++i) {
- proxy->upstream_pending.at[i - 1] = proxy->upstream_pending.at[i];
- }
- if (proxy->upstream_pending.len > 0) {
- proxy->upstream_pending.len -= 1;
- }
-}
-
-static void clear_upstream_pending(struct proxy_ctx *proxy)
-{
- for (int i = 1; i < proxy->upstream_pending.len; ++i) {
- struct buf *b = proxy->upstream_pending.at[i];
- release_io_buffer(proxy, b);
- }
- proxy->upstream_pending.len = 0;
-}
-
-static void clear_buffer_pool(struct proxy_ctx *proxy)
-{
- for (int i = 1; i < proxy->buffer_pool.len; ++i) {
- struct buf *b = proxy->buffer_pool.at[i];
- free(b);
- }
- proxy->buffer_pool.len = 0;
-}
-
-static void alloc_uv_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
-{
- buf->base = (char*)malloc(suggested_size);
- buf->len = suggested_size;
-}
-
-static void on_client_close(uv_handle_t *handle)
-{
- struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data;
- proxy->client_state = STATE_NOT_CONNECTED;
-}
-
-static void on_upstream_close(uv_handle_t *handle)
-{
- struct proxy_ctx *proxy = (struct proxy_ctx *)handle->loop->data;
- proxy->upstream_state = STATE_NOT_CONNECTED;
-}
-
-static void write_to_client_cb(uv_write_t *req, int status)
-{
- struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
- free(req);
- if (status) {
- fprintf(stderr, "error writing to client: %s\n", uv_strerror(status));
- clear_upstream_pending(proxy);
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
- }
-}
-
-static void write_to_upstream_cb(uv_write_t *req, int status)
-{
- struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
- free(req);
- if (status) {
- fprintf(stderr, "error writing to upstream: %s\n", uv_strerror(status));
- clear_upstream_pending(proxy);
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
- return;
- }
- if (proxy->upstream_pending.len > 0) {
- struct buf *buf = get_first_upstream_pending(proxy);
- remove_first_upstream_pending(proxy);
- release_io_buffer(proxy, buf);
- if (proxy->upstream_state == STATE_CONNECTED &&
- proxy->upstream_pending.len > 0) {
- buf = get_first_upstream_pending(proxy);
- /* TODO avoid allocation */
- uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
- uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
- uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
- }
- }
-}
-
-static void on_client_connection(uv_stream_t *server, int status)
-{
- if (status < 0) {
- fprintf(stderr, "incoming connection error: %s\n", uv_strerror(status));
- return;
- }
-
- fprintf(stdout, "incoming connection\n");
-
- struct proxy_ctx *proxy = (struct proxy_ctx *)server->loop->data;
- if (proxy->client_state != STATE_NOT_CONNECTED) {
- fprintf(stderr, "client already connected, ignoring\n");
- return;
- }
-
- uv_tcp_init(proxy->loop, &proxy->client);
- proxy->client_state = STATE_CONNECTED;
- if (uv_accept(server, (uv_stream_t*)&proxy->client) == 0) {
- uv_read_start((uv_stream_t*)&proxy->client, alloc_uv_buffer, read_from_client_cb);
- } else {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
- }
-}
-
-static void on_connect_to_upstream(uv_connect_t *req, int status)
-{
- struct proxy_ctx *proxy = (struct proxy_ctx *)req->handle->loop->data;
- free(req);
- if (status < 0) {
- fprintf(stderr, "error connecting to upstream: %s\n", uv_strerror(status));
- clear_upstream_pending(proxy);
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
- return;
- }
-
- proxy->upstream_state = STATE_CONNECTED;
- uv_read_start((uv_stream_t*)&proxy->upstream, alloc_uv_buffer, read_from_upstream_cb);
- if (proxy->upstream_pending.len > 0) {
- struct buf *buf = get_first_upstream_pending(proxy);
- /* TODO avoid allocation */
- uv_write_t *wreq = (uv_write_t *) malloc(sizeof(uv_write_t));
- uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
- uv_write(wreq, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
- }
-}
-
-static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf)
-{
- if (nread == 0) {
- return;
- }
- struct proxy_ctx *proxy = (struct proxy_ctx *)client->loop->data;
- if (nread < 0) {
- if (nread != UV_EOF) {
- fprintf(stderr, "error reading from client: %s\n", uv_err_name(nread));
- }
- if (proxy->client_state == STATE_CONNECTED) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*) client, on_client_close);
- }
- return;
- }
- if (proxy->upstream_state == STATE_CONNECTED) {
- if (proxy->upstream_pending.len > 0) {
- push_to_upstream_pending(proxy, buf->base, nread);
- } else {
- /* TODO avoid allocation */
- uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
- uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
- uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
- }
- } else if (proxy->upstream_state == STATE_NOT_CONNECTED) {
- /* TODO avoid allocation */
- uv_tcp_init(proxy->loop, &proxy->upstream);
- uv_connect_t *conn = (uv_connect_t *) malloc(sizeof(uv_connect_t));
- proxy->upstream_state = STATE_CONNECT_IN_PROGRESS;
- uv_tcp_connect(conn, &proxy->upstream, (struct sockaddr *)&proxy->upstream_addr,
- on_connect_to_upstream);
- push_to_upstream_pending(proxy, buf->base, nread);
- } else if (proxy->upstream_state == STATE_CONNECT_IN_PROGRESS) {
- push_to_upstream_pending(proxy, buf->base, nread);
- }
-}
-
-static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf)
-{
- if (nread == 0) {
- return;
- }
- struct proxy_ctx *proxy = (struct proxy_ctx *)upstream->loop->data;
- if (nread < 0) {
- if (nread != UV_EOF) {
- fprintf(stderr, "error reading from upstream: %s\n", uv_err_name(nread));
- }
- clear_upstream_pending(proxy);
- if (proxy->upstream_state == STATE_CONNECTED) {
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
- }
- return;
- }
- if (proxy->client_state == STATE_CONNECTED) {
- /* TODO Avoid allocation */
- uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
- uv_buf_t wrbuf = uv_buf_init(buf->base, nread);
- uv_write(req, (uv_stream_t *)&proxy->client, &wrbuf, 1, write_to_client_cb);
- }
-}
-
-struct proxy_ctx *proxy_allocate()
-{
- return malloc(sizeof(struct proxy_ctx));
-}
-
-int proxy_init(struct proxy_ctx *proxy,
- const char *server_addr, int server_port,
- const char *upstream_addr, int upstream_port)
-{
- proxy->loop = uv_default_loop();
- uv_tcp_init(proxy->loop, &proxy->server);
- int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server_addr);
- if (res != 0) {
- return res;
- }
- res = uv_ip4_addr(upstream_addr, upstream_port, (struct sockaddr_in *)&proxy->upstream_addr);
- if (res != 0) {
- return res;
- }
- array_init(proxy->buffer_pool);
- array_init(proxy->upstream_pending);
- proxy->server_state = STATE_NOT_CONNECTED;
- proxy->client_state = STATE_NOT_CONNECTED;
- proxy->upstream_state = STATE_NOT_CONNECTED;
-
- proxy->loop->data = proxy;
- return 0;
-}
-
-void proxy_free(struct proxy_ctx *proxy)
-{
- if (!proxy) {
- return;
- }
- clear_upstream_pending(proxy);
- clear_buffer_pool(proxy);
- /* TODO correctly close all the uv_tcp_t */
- free(proxy);
-}
-
-int proxy_start_listen(struct proxy_ctx *proxy)
-{
- uv_tcp_bind(&proxy->server, (const struct sockaddr*)&proxy->server_addr, 0);
- int ret = uv_listen((uv_stream_t*)&proxy->server, 128, on_client_connection);
- if (ret == 0) {
- proxy->server_state = STATE_LISTENING;
- }
- return ret;
-}
-
-int proxy_run(struct proxy_ctx *proxy)
-{
- return uv_run(proxy->loop, UV_RUN_DEFAULT);
-}
#define MAX_CLIENT_PENDING_SIZE 4096
struct buf {
- char buf[16 * 1024];
size_t size;
+ char buf[];
};
enum peer_state {
struct tls_ctx {
gnutls_session_t session;
- int handshake_state;
- gnutls_certificate_credentials_t credentials;
- gnutls_priority_t priority_cache;
+ enum handshake_state handshake_state;
/* for reading from the network */
const uint8_t *buf;
ssize_t nread;
uint8_t recv_buf[4096];
};
+struct peer {
+ uv_tcp_t handle;
+ enum peer_state state;
+ struct sockaddr_storage addr;
+ array_t(struct buf *) pending_buf;
+ uint64_t connection_timestamp;
+ struct tls_ctx *tls;
+ struct peer *peer;
+ int active_requests;
+};
+
struct tls_proxy_ctx {
const struct args *a;
-
uv_loop_t *loop;
- uv_tcp_t server;
- uv_tcp_t client;
- uv_tcp_t upstream;
- struct sockaddr_storage server_addr;
+ gnutls_certificate_credentials_t tls_credentials;
+ gnutls_priority_t tls_priority_cache;
+ struct {
+ uv_tcp_t handle;
+ struct sockaddr_storage addr;
+ } server;
struct sockaddr_storage upstream_addr;
- struct sockaddr_storage client_addr;
-
- int server_state;
- int client_state;
- int upstream_state;
-
- uint64_t client_connection_timestamp;
-
- array_t(struct buf *) buffer_pool;
- array_t(struct buf *) upstream_pending;
- array_t(struct buf *) client_pending;
-
- char io_buf[0xFFFF];
- struct tls_ctx tls;
+ array_t(struct peer *) client_list;
+ char uv_wire_buf[65535 * 2];
+ int conn_sequence;
};
static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf);
static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf);
static ssize_t proxy_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len);
static ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len);
-static int tls_process_from_upstream(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread);
-static int tls_process_from_client(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread);
-static int write_to_upstream_pending(struct tls_proxy_ctx *proxy);
-static int write_to_client_pending(struct tls_proxy_ctx *proxy);
-
+static int tls_process_from_upstream(struct peer *upstream, const uint8_t *buf, ssize_t nread);
+static int tls_process_from_client(struct peer *client, const uint8_t *buf, ssize_t nread);
+static int write_to_upstream_pending(struct peer *peer);
+static int write_to_client_pending(struct peer *peer);
+static void on_client_close(uv_handle_t *handle);
+static void on_upstream_close(uv_handle_t *handle);
static int gnutls_references = 0;
+static struct tls_proxy_ctx *get_proxy(struct peer *peer)
+{
+ return (struct tls_proxy_ctx *)peer->handle.loop->data;
+}
+
const void *ip_addr(const struct sockaddr *addr)
{
if (!addr) {
return ret;
}
-static inline char *ip_straddr(const struct sockaddr *addr)
+static inline char *ip_straddr(const struct sockaddr_storage *saddr_storage)
{
- assert(addr != NULL);
+ assert(saddr_storage != NULL);
+ const struct sockaddr *addr = (const struct sockaddr *)saddr_storage;
/* We are the sinle-threaded application */
static char str[INET6_ADDRSTRLEN + 6];
size_t len = sizeof(str);
return ret != 0 || len == 0 ? NULL : str;
}
-static struct buf *borrow_io_buffer(struct tls_proxy_ctx *proxy)
+static struct buf *alloc_io_buffer(size_t size)
{
- struct buf *buf = NULL;
- if (proxy->buffer_pool.len > 0) {
- buf = array_tail(proxy->buffer_pool);
- array_pop(proxy->buffer_pool);
- } else {
- buf = calloc(1, sizeof (struct buf));
- }
+ struct buf *buf = calloc(1, sizeof (struct buf) + size);
+ buf->size = size;
return buf;
}
-static void release_io_buffer(struct tls_proxy_ctx *proxy, struct buf *buf)
+static void free_io_buffer(struct buf *buf)
{
if (!buf) {
return;
}
-
- if (proxy->buffer_pool.len < 1000) {
- buf->size = 0;
- array_push(proxy->buffer_pool, buf);
- } else {
- free(buf);
- }
+ free(buf);
}
-static struct buf *get_first_upstream_pending(struct tls_proxy_ctx *proxy)
+static struct buf *get_first_pending_buf(struct peer *peer)
{
struct buf *buf = NULL;
- if (proxy->upstream_pending.len > 0) {
- buf = proxy->upstream_pending.at[0];
+ if (peer->pending_buf.len > 0) {
+ buf = peer->pending_buf.at[0];
}
return buf;
}
-static struct buf *get_first_client_pending(struct tls_proxy_ctx *proxy)
+static struct buf *remove_first_pending_buf(struct peer *peer)
{
- struct buf *buf = NULL;
- if (proxy->client_pending.len > 0) {
- buf = proxy->client_pending.at[0];
- }
- return buf;
-}
-
-static void remove_first_upstream_pending(struct tls_proxy_ctx *proxy)
-{
- for (int i = 1; i < proxy->upstream_pending.len; ++i) {
- proxy->upstream_pending.at[i - 1] = proxy->upstream_pending.at[i];
- }
- if (proxy->upstream_pending.len > 0) {
- proxy->upstream_pending.len -= 1;
- }
-}
-
-static void remove_first_client_pending(struct tls_proxy_ctx *proxy)
-{
- for (int i = 1; i < proxy->client_pending.len; ++i) {
- proxy->client_pending.at[i - 1] = proxy->client_pending.at[i];
+ if (peer->pending_buf.len == 0) {
+ return NULL;
}
- if (proxy->client_pending.len > 0) {
- proxy->client_pending.len -= 1;
+ struct buf * buf = peer->pending_buf.at[0];
+ for (int i = 1; i < peer->pending_buf.len; ++i) {
+ peer->pending_buf.at[i - 1] = peer->pending_buf.at[i];
}
+ peer->pending_buf.len -= 1;
+ return buf;
}
-static void clear_upstream_pending(struct tls_proxy_ctx *proxy)
-{
- for (int i = 0; i < proxy->upstream_pending.len; ++i) {
- struct buf *b = proxy->upstream_pending.at[i];
- release_io_buffer(proxy, b);
- }
- proxy->upstream_pending.len = 0;
-}
-
-static void clear_client_pending(struct tls_proxy_ctx *proxy)
-{
- for (int i = 0; i < proxy->client_pending.len; ++i) {
- struct buf *b = proxy->client_pending.at[i];
- release_io_buffer(proxy, b);
- }
- proxy->client_pending.len = 0;
-}
-
-static void clear_buffer_pool(struct tls_proxy_ctx *proxy)
+static void clear_pending_bufs(struct peer *peer)
{
- for (int i = 0; i < proxy->buffer_pool.len; ++i) {
- struct buf *b = proxy->buffer_pool.at[i];
- free(b);
+ for (int i = 0; i < peer->pending_buf.len; ++i) {
+ struct buf *b = peer->pending_buf.at[i];
+ free_io_buffer(b);
}
- proxy->buffer_pool.len = 0;
+ peer->pending_buf.len = 0;
}
static void alloc_uv_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf)
{
struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data;
- buf->base = proxy->io_buf;
- buf->len = sizeof(proxy->io_buf);
+ buf->base = proxy->uv_wire_buf;
+ buf->len = sizeof(proxy->uv_wire_buf);
}
static void on_client_close(uv_handle_t *handle)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data;
- gnutls_deinit(proxy->tls.session);
- proxy->tls.handshake_state = TLS_HS_NOT_STARTED;
- proxy->client_state = STATE_NOT_CONNECTED;
-}
-
-static void on_dummmy_client_close(uv_handle_t *handle)
-{
- free(handle);
+ struct peer *client = (struct peer *)handle->data;
+ struct peer *upstream = client->peer;
+ fprintf(stdout, "[client] connection with '%s' closed\n", ip_straddr(&client->addr));
+ assert(client->tls);
+ gnutls_deinit(client->tls->session);
+ client->tls->handshake_state = TLS_HS_NOT_STARTED;
+ client->state = STATE_NOT_CONNECTED;
+ if (upstream->state != STATE_NOT_CONNECTED) {
+ if (upstream->state == STATE_CONNECTED) {
+ fprintf(stdout, "[client] closing connection with upstream for '%s'\n",
+ ip_straddr(&client->addr));
+ upstream->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&upstream->handle, on_upstream_close);
+ }
+ return;
+ }
+ struct tls_proxy_ctx *proxy = get_proxy(client);
+ for (size_t i = 0; i < proxy->client_list.len; ++i) {
+ struct peer *client_i = proxy->client_list.at[i];
+ if (client_i == client) {
+ fprintf(stdout, "[client] connection structures deallocated for '%s'\n",
+ ip_straddr(&client->addr));
+ array_del(proxy->client_list, i);
+ free(client->tls);
+ free(client);
+ break;
+ }
+ }
}
static void on_upstream_close(uv_handle_t *handle)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)handle->loop->data;
- proxy->upstream_state = STATE_NOT_CONNECTED;
+ struct peer *upstream = (struct peer *)handle->data;
+ struct peer *client = upstream->peer;
+ assert(upstream->tls == NULL);
+ upstream->state = STATE_NOT_CONNECTED;
+ fprintf(stdout, "[upstream] connection with upstream closed for client '%s'\n", ip_straddr(&client->addr));
+ if (client->state != STATE_NOT_CONNECTED) {
+ if (client->state == STATE_CONNECTED) {
+ fprintf(stdout, "[upstream] closing connection to client '%s'\n",
+ ip_straddr(&client->addr));
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&client->handle, on_client_close);
+ }
+ return;
+ }
+ struct tls_proxy_ctx *proxy = get_proxy(upstream);
+ for (size_t i = 0; i < proxy->client_list.len; ++i) {
+ struct peer *client_i = proxy->client_list.at[i];
+ if (client_i == client) {
+ fprintf(stdout, "[upstream] connection structures deallocated for '%s'\n",
+ ip_straddr(&client->addr));
+ array_del(proxy->client_list, i);
+ free(upstream);
+ free(client->tls);
+ free(client);
+ break;
+ }
+ }
}
static void write_to_client_cb(uv_write_t *req, int status)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data;
+ struct peer *client = (struct peer *)req->handle->data;
free(req);
+ client->active_requests -= 1;
if (status) {
- fprintf(stderr, "error writing to client: %s\n", uv_strerror(status));
- clear_client_pending(proxy);
- if (proxy->client_state == STATE_CONNECTED) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
+ fprintf(stdout, "[client] error writing to client '%s': %s\n",
+ ip_straddr(&client->addr), uv_strerror(status));
+ clear_pending_bufs(client);
+ if (client->state == STATE_CONNECTED) {
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&client->handle, on_client_close);
return;
}
}
- fprintf(stdout, "successfully wrote to client, pending len is %zd\n",
- proxy->client_pending.len);
- if (proxy->client_state == STATE_CONNECTED &&
- proxy->tls.handshake_state == TLS_HS_DONE) {
- uint64_t elapsed = uv_now(proxy->loop) - proxy->client_connection_timestamp;
+ fprintf(stdout, "[client] successfully wrote to client '%s', pending len is %zd, active requests %i\n",
+ ip_straddr(&client->addr), client->pending_buf.len, client->active_requests);
+ if (client->state == STATE_CONNECTED &&
+ client->tls->handshake_state == TLS_HS_DONE) {
+ struct tls_proxy_ctx *proxy = get_proxy(client);
+ uint64_t elapsed = uv_now(proxy->loop) - client->connection_timestamp;
if (!proxy->a->close_connection || elapsed < proxy->a->close_timeout) {
- write_to_client_pending(proxy);
+ write_to_client_pending(client);
} else {
- clear_client_pending(proxy);
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
- fprintf(stdout, "closing connection to client\n");
+ clear_pending_bufs(client);
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&client->handle, on_client_close);
+ fprintf(stdout, "[client] closing connection to client '%s'\n", ip_straddr(&client->addr));
}
}
}
static void write_to_upstream_cb(uv_write_t *req, int status)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data;
+ struct peer *upstream = (struct peer *)req->handle->data;
+ void *data = req->data;
+ free(req);
if (status) {
- free(req);
- fprintf(stderr, "error writing to upstream: %s\n", uv_strerror(status));
- clear_upstream_pending(proxy);
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
+ fprintf(stdout, "[upstream] error writing to upstream: %s\n", uv_strerror(status));
+ clear_pending_bufs(upstream);
+ upstream->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&upstream->handle, on_upstream_close);
return;
}
- if (req->data != NULL) {
- assert(proxy->upstream_pending.len > 0);
- struct buf *buf = get_first_upstream_pending(proxy);
- assert(req->data == (void *)buf->buf);
- fprintf(stdout, "successfully wrote %zi bytes to upstream, pending len is %zd\n",
- buf->size, proxy->upstream_pending.len);
- remove_first_upstream_pending(proxy);
- release_io_buffer(proxy, buf);
+ if (data != NULL) {
+ assert(upstream->pending_buf.len > 0);
+ struct buf *buf = get_first_pending_buf(upstream);
+ assert(data == (void *)buf->buf);
+ fprintf(stdout, "[upstream] successfully wrote %zi bytes to upstream, pending len is %zd\n",
+ buf->size, upstream->pending_buf.len);
+ remove_first_pending_buf(upstream);
+ free_io_buffer(buf);
} else {
- fprintf(stdout, "successfully wrote bytes to upstream, pending len is %zd\n",
- proxy->upstream_pending.len);
+ fprintf(stdout, "[upstream] successfully wrote to upstream, pending len is %zd\n",
+ upstream->pending_buf.len);
}
- if (proxy->client_state != STATE_CONNECTED) {
- clear_upstream_pending(proxy);
- } else if (proxy->upstream_state == STATE_CONNECTED &&
- proxy->upstream_pending.len > 0) {
- write_to_upstream_pending(proxy);
+ if (upstream->peer == NULL || upstream->peer->state != STATE_CONNECTED) {
+ clear_pending_bufs(upstream);
+ } else if (upstream->state == STATE_CONNECTED && upstream->pending_buf.len > 0) {
+ write_to_upstream_pending(upstream);
}
- free(req);
}
-static void on_client_connection(uv_stream_t *server, int status)
+static void accept_connection_from_client(uv_stream_t *server)
{
- if (status < 0) {
- fprintf(stderr, "incoming connection error: %s\n", uv_strerror(status));
- return;
- }
-
- int err = 0;
- int ret = 0;
struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)server->loop->data;
- if (proxy->client_state != STATE_NOT_CONNECTED) {
- fprintf(stderr, "incoming connection");
- uv_tcp_t *dummy_client = malloc(sizeof(uv_tcp_t));
- uv_tcp_init(proxy->loop, dummy_client);
- err = uv_accept(server, (uv_stream_t*)dummy_client);
- if (err == 0) {
- struct sockaddr dummy_addr;
- int dummy_addr_len = sizeof(dummy_addr);
- ret = uv_tcp_getpeername(dummy_client,
- &dummy_addr,
- &dummy_addr_len);
- if (ret == 0) {
- fprintf(stderr, " from %s", ip_straddr(&dummy_addr));
- }
- uv_close((uv_handle_t *)dummy_client, on_dummmy_client_close);
- } else {
- on_dummmy_client_close((uv_handle_t *)dummy_client);
- }
- fprintf(stderr, " - client already connected, rejecting\n");
- return;
- }
+ struct peer *client = calloc(1, sizeof(struct peer));
+ uv_tcp_init(proxy->loop, &client->handle);
+ uv_tcp_nodelay((uv_tcp_t *)&client->handle, 1);
- uv_tcp_init(proxy->loop, &proxy->client);
- uv_tcp_nodelay((uv_tcp_t *)&proxy->client, 1);
- proxy->client_state = STATE_CONNECTED;
- err = uv_accept(server, (uv_stream_t*)&proxy->client);
+ int err = uv_accept(server, (uv_stream_t*)&client->handle);
if (err != 0) {
- fprintf(stderr, "incoming connection - uv_accept() failed: (%d) %s\n",
- err, uv_strerror(err));
+ fprintf(stdout, "[client] incoming connection - uv_accept() failed: (%d) %s\n",
+ err, uv_strerror(err));
+ proxy->conn_sequence = 0;
return;
}
- struct sockaddr *addr = (struct sockaddr *)&(proxy->client_addr);
- int addr_len = sizeof(proxy->client_addr);
- ret = uv_tcp_getpeername(&proxy->client, addr, &addr_len);
+ client->state = STATE_CONNECTED;
+ array_init(client->pending_buf);
+ client->handle.data = client;
+
+ struct peer *upstream = calloc(1, sizeof(struct peer));
+ uv_tcp_init(proxy->loop, &upstream->handle);
+ uv_tcp_nodelay((uv_tcp_t *)&upstream->handle, 1);
+
+ client->peer = upstream;
+
+ array_init(upstream->pending_buf);
+ upstream->state = STATE_NOT_CONNECTED;
+ upstream->peer = client;
+ upstream->handle.data = upstream;
+
+ struct sockaddr *addr = (struct sockaddr *)&(client->addr);
+ int addr_len = sizeof(client->addr);
+ int ret = uv_tcp_getpeername(&client->handle, addr, &addr_len);
if (ret || addr->sa_family == AF_UNSPEC) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
- fprintf(stderr, "incoming connection - uv_tcp_getpeername() failed: (%d) %s\n",
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&client->handle, on_client_close);
+ fprintf(stdout, "[client] incoming connection - uv_tcp_getpeername() failed: (%d) %s\n",
err, uv_strerror(err));
+ proxy->conn_sequence = 0;
return;
}
+ memcpy(&upstream->addr, &proxy->upstream_addr, sizeof(struct sockaddr_storage));
- fprintf(stdout, "incoming connection from %s\n", ip_straddr(addr));
-
- uv_read_start((uv_stream_t*)&proxy->client, alloc_uv_buffer, read_from_client_cb);
+ struct tls_ctx *tls = calloc(1, sizeof(struct tls_ctx));
+ tls->handshake_state = TLS_HS_NOT_STARTED;
+ client->tls = tls;
const char *errpos = NULL;
- struct tls_ctx *tls = &proxy->tls;
- assert (tls->handshake_state == TLS_HS_NOT_STARTED);
err = gnutls_init(&tls->session, GNUTLS_SERVER | GNUTLS_NONBLOCK);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_init() failed: (%d) %s\n",
+ fprintf(stdout, "[client] gnutls_init() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
}
- err = gnutls_priority_set(tls->session, tls->priority_cache);
+ err = gnutls_priority_set(tls->session, proxy->tls_priority_cache);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_priority_set() failed: (%d) %s\n",
+ fprintf(stdout, "[client] gnutls_priority_set() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
}
- err = gnutls_credentials_set(tls->session, GNUTLS_CRD_CERTIFICATE, tls->credentials);
+ err = gnutls_credentials_set(tls->session, GNUTLS_CRD_CERTIFICATE, proxy->tls_credentials);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_credentials_set() failed: (%d) %s\n",
+ fprintf(stdout, "[client] gnutls_credentials_set() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
}
gnutls_certificate_server_set_request(tls->session, GNUTLS_CERT_IGNORE);
gnutls_handshake_set_timeout(tls->session, GNUTLS_DEFAULT_HANDSHAKE_TIMEOUT);
-
gnutls_transport_set_pull_function(tls->session, proxy_gnutls_pull);
gnutls_transport_set_push_function(tls->session, proxy_gnutls_push);
- gnutls_transport_set_ptr(tls->session, proxy);
+ gnutls_transport_set_ptr(tls->session, client);
tls->handshake_state = TLS_HS_IN_PROGRESS;
- proxy->client_connection_timestamp = uv_now(proxy->loop);
+ client->connection_timestamp = uv_now(proxy->loop);
+ proxy->conn_sequence += 1;
+ array_push(proxy->client_list, client);
+
+ fprintf(stdout, "[client] incoming connection from '%s'\n", ip_straddr(&client->addr));
+ uv_read_start((uv_stream_t*)&client->handle, alloc_uv_buffer, read_from_client_cb);
+}
+
+static void dynamic_handle_close_cb(uv_handle_t *handle)
+{
+ free(handle);
+}
+
+static void delayed_accept_timer_cb(uv_timer_t *timer)
+{
+ uv_stream_t *server = (uv_stream_t *)timer->data;
+ fprintf(stdout, "[client] delayed connection processing\n");
+ accept_connection_from_client(server);
+ uv_close((uv_handle_t *)timer, dynamic_handle_close_cb);
+}
+
+static void on_client_connection(uv_stream_t *server, int status)
+{
+ if (status < 0) {
+ fprintf(stdout, "[client] incoming connection error: %s\n", uv_strerror(status));
+ return;
+ }
+ struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)server->loop->data;
+ proxy->conn_sequence += 1;
+ if (proxy->a->max_conn_sequence > 0 &&
+ proxy->conn_sequence > proxy->a->max_conn_sequence) {
+ fprintf(stdout, "[client] incoming connection, delaying\n");
+ uv_timer_t *timer = (uv_timer_t*)malloc(sizeof *timer);
+ uv_timer_init(uv_default_loop(), timer);
+ timer->data = server;
+ uv_timer_start(timer, delayed_accept_timer_cb, 10000, 0);
+ proxy->conn_sequence = 0;
+ } else {
+ accept_connection_from_client(server);
+ }
}
static void on_connect_to_upstream(uv_connect_t *req, int status)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)req->handle->loop->data;
+ struct peer *upstream = (struct peer *)req->handle->data;
+ struct tls_proxy_ctx *proxy = get_proxy(upstream);
free(req);
if (status < 0) {
- fprintf(stderr, "error connecting to upstream (%s): %s\n",
- ip_straddr((struct sockaddr *)&proxy->upstream_addr),
+ fprintf(stdout, "[upstream] error connecting to upstream (%s): %s\n",
+ ip_straddr(&upstream->addr),
uv_strerror(status));
- clear_upstream_pending(proxy);
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
+ clear_pending_bufs(upstream);
+ upstream->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&upstream->handle, on_upstream_close);
return;
}
- fprintf(stdout, "connected to %s\n", ip_straddr((struct sockaddr *)&proxy->upstream_addr));
+ fprintf(stdout, "[upstream] connected to %s\n", ip_straddr(&upstream->addr));
- proxy->upstream_state = STATE_CONNECTED;
- uv_read_start((uv_stream_t*)&proxy->upstream, alloc_uv_buffer, read_from_upstream_cb);
- if (proxy->upstream_pending.len > 0) {
- write_to_upstream_pending(proxy);
+ upstream->state = STATE_CONNECTED;
+ uv_read_start((uv_stream_t*)&upstream->handle, alloc_uv_buffer, read_from_upstream_cb);
+ if (upstream->pending_buf.len > 0) {
+ write_to_upstream_pending(upstream);
}
}
-static void read_from_client_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf)
+static void read_from_client_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
- fprintf(stdout, "reading %zd bytes from client\n", nread);
if (nread == 0) {
+ fprintf(stdout, "[client] reading %zd bytes\n", nread);
return;
}
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)client->loop->data;
+ struct peer *client = (struct peer *)handle->data;
if (nread < 0) {
if (nread != UV_EOF) {
- fprintf(stderr, "error reading from client: %s\n", uv_err_name(nread));
+ fprintf(stdout, "[client] error reading from '%s': %s\n",
+ ip_straddr(&client->addr),
+ uv_err_name(nread));
} else {
- fprintf(stdout, "client has closed the connection\n");
+ fprintf(stdout, "[client] closing connection with '%s'\n",
+ ip_straddr(&client->addr));
}
- if (proxy->client_state == STATE_CONNECTED) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*) client, on_client_close);
+ if (client->state == STATE_CONNECTED) {
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)handle, on_client_close);
}
return;
}
- int res = tls_process_from_client(proxy, buf->base, nread);
+ struct tls_proxy_ctx *proxy = get_proxy(client);
+ if (proxy->a->accept_only) {
+ fprintf(stdout, "[client] ignoring %zd bytes from '%s'\n", nread, ip_straddr(&client->addr));
+ return;
+ }
+ fprintf(stdout, "[client] reading %zd bytes from '%s'\n", nread, ip_straddr(&client->addr));
+
+ int res = tls_process_from_client(client, buf->base, nread);
if (res < 0) {
- if (proxy->client_state == STATE_CONNECTED) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*) client, on_client_close);
+ if (client->state == STATE_CONNECTED) {
+ client->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&client->handle, on_client_close);
}
}
}
-static void read_from_upstream_cb(uv_stream_t *upstream, ssize_t nread, const uv_buf_t *buf)
+static void read_from_upstream_cb(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
{
- fprintf(stdout, "reading %zd bytes from upstream\n", nread);
+ fprintf(stdout, "[upstream] reading %zd bytes\n", nread);
if (nread == 0) {
return;
}
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)upstream->loop->data;
+ struct peer *upstream = (struct peer *)handle->data;
if (nread < 0) {
if (nread != UV_EOF) {
- fprintf(stderr, "error reading from upstream: %s\n", uv_err_name(nread));
+ fprintf(stdout, "[upstream] error reading from upstream: %s\n", uv_err_name(nread));
} else {
- fprintf(stdout, "upstream has closed the connection\n");
+ fprintf(stdout, "[upstream] closing connection\n");
}
- clear_upstream_pending(proxy);
- if (proxy->upstream_state == STATE_CONNECTED) {
- proxy->upstream_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->upstream, on_upstream_close);
+ clear_pending_bufs(upstream);
+ if (upstream->state == STATE_CONNECTED) {
+ upstream->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&upstream->handle, on_upstream_close);
}
return;
}
- int res = tls_process_from_upstream(proxy, buf->base, nread);
+ int res = tls_process_from_upstream(upstream, buf->base, nread);
if (res < 0) {
- fprintf(stderr, "error sending tls data to client\n");
- if (proxy->client_state == STATE_CONNECTED) {
- proxy->client_state = STATE_CLOSING_IN_PROGRESS;
- uv_close((uv_handle_t*)&proxy->client, on_client_close);
+ fprintf(stdout, "[upstream] error processing tls data to client\n");
+ if (upstream->peer->state == STATE_CONNECTED) {
+ upstream->peer->state = STATE_CLOSING_IN_PROGRESS;
+ uv_close((uv_handle_t*)&upstream->peer->handle, on_client_close);
}
}
}
-static void push_to_upstream_pending(struct tls_proxy_ctx *proxy, const char *buf, size_t size)
+static void push_to_upstream_pending(struct peer *upstream, const char *buf, size_t size)
{
- while (size > 0) {
- struct buf *b = borrow_io_buffer(proxy);
- b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf);
- memcpy(b->buf, buf, b->size);
- array_push(proxy->upstream_pending, b);
- size -= b->size;
- buf += b->size;
- }
+ struct buf *b = alloc_io_buffer(size);
+ memcpy(b->buf, buf, b->size);
+ array_push(upstream->pending_buf, b);
}
-static void push_to_client_pending(struct tls_proxy_ctx *proxy, const char *buf, size_t size)
+static void push_to_client_pending(struct peer *client, const char *buf, size_t size)
{
- if (proxy->a->rehandshake &&
- proxy->client_pending.len > MAX_CLIENT_PENDING_SIZE) {
- return;
- }
+ struct tls_proxy_ctx *proxy = get_proxy(client);
while (size > 0) {
- struct buf *b = borrow_io_buffer(proxy);
- b->size = size <= sizeof(b->buf) ? size : sizeof(b->buf);
- if (proxy->a->rehandshake && b->size > CLIENT_ANSWER_CHUNK_SIZE) {
- b->size = CLIENT_ANSWER_CHUNK_SIZE;
+ int temp_size = size;
+ if (proxy->a->rehandshake && temp_size > CLIENT_ANSWER_CHUNK_SIZE) {
+ temp_size = CLIENT_ANSWER_CHUNK_SIZE;
}
+ struct buf *b = alloc_io_buffer(temp_size);
memcpy(b->buf, buf, b->size);
- array_push(proxy->client_pending, b);
- size -= b->size;
- buf += b->size;
+ array_push(client->pending_buf, b);
+ size -= temp_size;
+ buf += temp_size;
}
}
-static int write_to_upstream_pending(struct tls_proxy_ctx *proxy)
+static int write_to_upstream_pending(struct peer *upstream)
{
- struct buf *buf = get_first_upstream_pending(proxy);
- /* TODO avoid allocation */
+ struct tls_proxy_ctx *proxy = get_proxy(upstream);
+ struct buf *buf = get_first_pending_buf(upstream);
uv_write_t *req = (uv_write_t *) malloc(sizeof(uv_write_t));
uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
req->data = buf->buf;
- fprintf(stdout, "writing %zd bytes to upstream\n", buf->size);
- return uv_write(req, (uv_stream_t *)&proxy->upstream, &wrbuf, 1, write_to_upstream_cb);
+ fprintf(stdout, "[upstream] writing %zd bytes\n", buf->size);
+ return uv_write(req, (uv_stream_t *)&upstream->handle, &wrbuf, 1, write_to_upstream_cb);
}
static ssize_t proxy_gnutls_pull(gnutls_transport_ptr_t h, void *buf, size_t len)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)h;
- struct tls_ctx *t = &proxy->tls;
+ struct peer *peer = (struct peer *)h;
+ struct tls_ctx *t = peer->tls;
- fprintf(stdout, "\t gnutls: pulling %zd bytes from client\n", len);
+ fprintf(stdout, "[gnutls_pull] pulling %zd bytes\n", len);
if (t->nread <= t->consumed) {
errno = EAGAIN;
- fprintf(stdout, "\t gnutls: return EAGAIN\n");
+ fprintf(stdout, "[gnutls_pull] return EAGAIN\n");
return -1;
}
ssize_t proxy_gnutls_push(gnutls_transport_ptr_t h, const void *buf, size_t len)
{
- struct tls_proxy_ctx *proxy = (struct tls_proxy_ctx *)h;
- struct tls_ctx *t = &proxy->tls;
- fprintf(stdout, "\t gnutls: writing %zd bytes to client\n", len);
+ struct peer *client = (struct peer *)h;
+ struct tls_ctx *t = client->tls;
+ fprintf(stdout, "[gnutls_push] writing %zd bytes\n", len);
ssize_t ret = -1;
const size_t req_size_aligned = ((sizeof(uv_write_t) / 16) + 1) * 16;
{ data, len }
};
memcpy(data, buf, len);
- req->data = data;
- int res = uv_write(req, (uv_stream_t *)&proxy->client, uv_buf, 1, write_to_client_cb);
+ int res = uv_write(req, (uv_stream_t *)&client->handle, uv_buf, 1, write_to_client_cb);
if (res == 0) {
ret = len;
+ client->active_requests += 1;
} else {
free(common_buf);
errno = EIO;
return ret;
}
-static int write_to_client_pending(struct tls_proxy_ctx *proxy)
+static int write_to_client_pending(struct peer *client)
{
- if (proxy->client_pending.len == 0) {
+ if (client->pending_buf.len == 0) {
return 0;
}
- struct buf *buf = get_first_client_pending(proxy);
+ struct tls_proxy_ctx *proxy = get_proxy(client);
+ struct buf *buf = get_first_pending_buf(client);
uv_buf_t wrbuf = uv_buf_init(buf->buf, buf->size);
- fprintf(stdout, "writing %zd bytes to client\n", buf->size);
+ fprintf(stdout, "[client] writing %zd bytes\n", buf->size);
- gnutls_session_t tls_session = proxy->tls.session;
- assert(proxy->tls.handshake_state != TLS_HS_IN_PROGRESS);
+ gnutls_session_t tls_session = client->tls->session;
+ assert(client->tls->handshake_state != TLS_HS_IN_PROGRESS);
char *data = buf->buf;
size_t len = buf->size;
count = gnutls_record_send(tls_session, data, len);
if (count < 0) {
if (gnutls_error_is_fatal(count)) {
- fprintf(stderr, "gnutls_record_send failed: %s (%zd)\n",
+ fprintf(stdout, "[client] gnutls_record_send failed: %s (%zd)\n",
gnutls_strerror_name(count), count);
return -1;
}
if (++retries > TLS_MAX_SEND_RETRIES) {
- fprintf(stderr, "gnutls_record_send: too many sequential non-fatal errors (%zd), last error is: %s (%zd)\n",
+ fprintf(stdout, "[client] gnutls_record_send: too many sequential non-fatal errors (%zd), last error is: %s (%zd)\n",
retries, gnutls_strerror_name(count), count);
return -1;
}
if (++retries < TLS_MAX_SEND_RETRIES) {
continue;
}
- fprintf(stderr, "gnutls_record_send: too many retries (%zd)\n",
+ fprintf(stdout, "[client] gnutls_record_send: too many retries (%zd)\n",
retries);
- fprintf(stderr, "tls_push_to_client didn't send all data(%zd of %zd)\n",
+ fprintf(stdout, "[client] tls_push_to_client didn't send all data(%zd of %zd)\n",
len, submitted);
return -1;
}
} while (len > 0);
- remove_first_client_pending(proxy);
- release_io_buffer(proxy, buf);
+ remove_first_pending_buf(client);
+ free_io_buffer(buf);
- fprintf(stdout, "submitted %zd bytes to client\n", submitted);
+ fprintf(stdout, "[client] submitted %zd bytes\n", submitted);
if (proxy->a->rehandshake) {
assert (gnutls_safe_renegotiation_status(tls_session) != 0);
assert (gnutls_rehandshake(tls_session) == GNUTLS_E_SUCCESS);
/* Prevent write-to-client callback from sending next pending chunk.
* At the same time tls_process_from_client() must not call gnutls_handshake()
* as there can be application data in this direction. */
- proxy->tls.handshake_state = TLS_HS_EXPECTED;
- fprintf(stdout, "rehandshake started\n");
+ client->tls->handshake_state = TLS_HS_EXPECTED;
+ fprintf(stdout, "[client] rehandshake started\n");
}
return submitted;
}
-static int tls_process_from_upstream(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t len)
+static int tls_process_from_upstream(struct peer *upstream, const uint8_t *buf, ssize_t len)
{
- gnutls_session_t tls_session = proxy->tls.session;
+ struct peer *client = upstream->peer;
+ gnutls_session_t tls_session = client->tls->session;
- fprintf(stdout, "pushing %zd bytes to client\n", len);
+ fprintf(stdout, "[upstream] pushing %zd bytes to client\n", len);
ssize_t submitted = 0;
- if (proxy->client_state != STATE_CONNECTED) {
+ if (client->state != STATE_CONNECTED) {
return submitted;
}
- bool list_was_empty = (proxy->client_pending.len == 0);
- push_to_client_pending(proxy, buf, len);
+ bool list_was_empty = (client->pending_buf.len == 0);
+ push_to_client_pending(client, buf, len);
submitted = len;
- if (proxy->tls.handshake_state == TLS_HS_DONE) {
- if (list_was_empty && proxy->client_pending.len > 0) {
- int ret = write_to_client_pending(proxy);
+ if (client->tls->handshake_state == TLS_HS_DONE) {
+ if (list_was_empty && client->pending_buf.len > 0) {
+ int ret = write_to_client_pending(client);
if (ret < 0) {
submitted = -1;
}
return submitted;
}
-int tls_process_handshake(struct tls_proxy_ctx *proxy)
+int tls_process_handshake(struct peer *peer)
{
- struct tls_ctx *tls = &proxy->tls;
+ struct tls_ctx *tls = peer->tls;
int ret = 1;
while (tls->handshake_state == TLS_HS_IN_PROGRESS) {
- fprintf(stdout, "TLS handshake in progress...\n");
+ fprintf(stdout, "[tls] TLS handshake in progress...\n");
int err = gnutls_handshake(tls->session);
if (err == GNUTLS_E_SUCCESS) {
tls->handshake_state = TLS_HS_DONE;
- fprintf(stdout, "TLS handshake has completed\n");
+ fprintf(stdout, "[tls] TLS handshake has completed\n");
ret = 1;
- if (proxy->client_pending.len != 0) {
- write_to_client_pending(proxy);
+ if (peer->pending_buf.len != 0) {
+ write_to_client_pending(peer);
}
} else if (gnutls_error_is_fatal(err)) {
- fprintf(stderr, "gnutls_handshake failed: %s (%d)\n",
+ fprintf(stdout, "[tls] gnutls_handshake failed: %s (%d)\n",
gnutls_strerror_name(err), err);
ret = -1;
break;
} else {
- fprintf(stderr, "gnutls_handshake nonfatal error: %s (%d)\n",
+ fprintf(stdout, "[tls] gnutls_handshake nonfatal error: %s (%d)\n",
gnutls_strerror_name(err), err);
ret = 0;
break;
return ret;
}
-int tls_process_from_client(struct tls_proxy_ctx *proxy, const uint8_t *buf, ssize_t nread)
+int tls_process_from_client(struct peer *client, const uint8_t *buf, ssize_t nread)
{
- struct tls_ctx *tls = &proxy->tls;
+ struct tls_ctx *tls = client->tls;
tls->buf = buf;
tls->nread = nread >= 0 ? nread : 0;
tls->consumed = 0;
- fprintf(stdout, "tls_process: reading %zd bytes from client\n", nread);
+ fprintf(stdout, "[client] tls_process: reading %zd bytes from client\n", nread);
- int ret = tls_process_handshake(proxy);
+ int ret = tls_process_handshake(client);
if (ret <= 0) {
return ret;
}
int submitted = 0;
while (true) {
- ssize_t count = 0;
- count = gnutls_record_recv(tls->session, tls->recv_buf, sizeof(tls->recv_buf));
+ ssize_t count = gnutls_record_recv(tls->session, tls->recv_buf, sizeof(tls->recv_buf));
if (count == GNUTLS_E_AGAIN) {
break; /* No data available */
} else if (count == GNUTLS_E_INTERRUPTED) {
continue; /* Try reading again */
} else if (count == GNUTLS_E_REHANDSHAKE) {
tls->handshake_state = TLS_HS_IN_PROGRESS;
- ret = tls_process_handshake(proxy);
- if (ret <= 0) {
+ ret = tls_process_handshake(client);
+ if (ret < 0) {
return ret;
}
continue;
} else if (count < 0) {
- fprintf(stderr, "gnutls_record_recv failed: %s (%zd)\n",
+ fprintf(stdout, "[client] gnutls_record_recv failed: %s (%zd)\n",
gnutls_strerror_name(count), count);
return -1;
} else if (count == 0) {
break;
}
- if (proxy->upstream_state == STATE_CONNECTED) {
- bool upstream_pending_is_empty = (proxy->upstream_pending.len == 0);
- push_to_upstream_pending(proxy, tls->recv_buf, count);
+ struct peer *upstream = client->peer;
+ if (upstream->state == STATE_CONNECTED) {
+ bool upstream_pending_is_empty = (upstream->pending_buf.len == 0);
+ push_to_upstream_pending(upstream, tls->recv_buf, count);
if (upstream_pending_is_empty) {
- write_to_upstream_pending(proxy);
+ write_to_upstream_pending(upstream);
}
- } else if (proxy->upstream_state == STATE_NOT_CONNECTED) {
- /* TODO avoid allocation */
- uv_tcp_init(proxy->loop, &proxy->upstream);
+ } else if (upstream->state == STATE_NOT_CONNECTED) {
uv_connect_t *conn = (uv_connect_t *) malloc(sizeof(uv_connect_t));
- proxy->upstream_state = STATE_CONNECT_IN_PROGRESS;
- fprintf(stdout, "connecting to %s\n",
- ip_straddr((struct sockaddr *)&proxy->upstream_addr));
- uv_tcp_connect(conn, &proxy->upstream, (struct sockaddr *)&proxy->upstream_addr,
+ upstream->state = STATE_CONNECT_IN_PROGRESS;
+ fprintf(stdout, "[client] connecting to upstream '%s'\n", ip_straddr(&upstream->addr));
+ uv_tcp_connect(conn, &upstream->handle, (struct sockaddr *)&upstream->addr,
on_connect_to_upstream);
- push_to_upstream_pending(proxy, tls->recv_buf, count);
- } else if (proxy->upstream_state == STATE_CONNECT_IN_PROGRESS) {
- push_to_upstream_pending(proxy, tls->recv_buf, count);
+ push_to_upstream_pending(upstream, tls->recv_buf, count);
+ } else if (upstream->state == STATE_CONNECT_IN_PROGRESS) {
+ push_to_upstream_pending(upstream, tls->recv_buf, count);
}
submitted += count;
}
const char *key_file = a->key_file;
proxy->a = a;
proxy->loop = uv_default_loop();
- uv_tcp_init(proxy->loop, &proxy->server);
- int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server_addr);
+ uv_tcp_init(proxy->loop, &proxy->server.handle);
+ int res = uv_ip4_addr(server_addr, server_port, (struct sockaddr_in *)&proxy->server.addr);
if (res != 0) {
- res = uv_ip6_addr(server_addr, server_port, (struct sockaddr_in6 *)&proxy->server_addr);
+ res = uv_ip6_addr(server_addr, server_port, (struct sockaddr_in6 *)&proxy->server.addr);
if (res != 0) {
- fprintf(stderr, "tls_proxy_init: can't parse local address '%s'\n", server_addr);
+ fprintf(stdout, "[proxy] tls_proxy_init: can't parse local address '%s'\n", server_addr);
return -1;
}
}
if (res != 0) {
res = uv_ip6_addr(upstream_addr, upstream_port, (struct sockaddr_in6 *)&proxy->upstream_addr);
if (res != 0) {
- fprintf(stderr, "tls_proxy_init: can't parse upstream address '%s'\n", upstream_addr);
+ fprintf(stdout, "[proxy] tls_proxy_init: can't parse upstream address '%s'\n", upstream_addr);
return -1;
}
}
- array_init(proxy->buffer_pool);
- array_init(proxy->upstream_pending);
- array_init(proxy->client_pending);
- proxy->server_state = STATE_NOT_CONNECTED;
- proxy->client_state = STATE_NOT_CONNECTED;
- proxy->upstream_state = STATE_NOT_CONNECTED;
+ array_init(proxy->client_list);
+ proxy->conn_sequence = 0;
proxy->loop->data = proxy;
if (gnutls_references == 0) {
err = gnutls_global_init();
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_global_init() failed: (%d) %s\n",
+ fprintf(stdout, "[proxy] gnutls_global_init() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
return -1;
}
}
gnutls_references += 1;
- err = gnutls_certificate_allocate_credentials(&proxy->tls.credentials);
+ err = gnutls_certificate_allocate_credentials(&proxy->tls_credentials);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_certificate_allocate_credentials() failed: (%d) %s\n",
+ fprintf(stdout, "[proxy] gnutls_certificate_allocate_credentials() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
return -1;
}
- err = gnutls_certificate_set_x509_system_trust(proxy->tls.credentials);
+ err = gnutls_certificate_set_x509_system_trust(proxy->tls_credentials);
if (err <= 0) {
- fprintf(stderr, "gnutls_certificate_set_x509_system_trust() failed: (%d) %s\n",
+ fprintf(stdout, "[proxy] gnutls_certificate_set_x509_system_trust() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
return -1;
}
if (cert_file && key_file) {
- err = gnutls_certificate_set_x509_key_file(proxy->tls.credentials,
+ err = gnutls_certificate_set_x509_key_file(proxy->tls_credentials,
cert_file, key_file, GNUTLS_X509_FMT_PEM);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_certificate_set_x509_key_file() failed: (%d) %s\n",
+ fprintf(stdout, "[proxy] gnutls_certificate_set_x509_key_file() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
return -1;
}
}
- err = gnutls_priority_init(&proxy->tls.priority_cache, NULL, NULL);
+ err = gnutls_priority_init(&proxy->tls_priority_cache, NULL, NULL);
if (err != GNUTLS_E_SUCCESS) {
- fprintf(stderr, "gnutls_priority_init() failed: (%d) %s\n",
+ fprintf(stdout, "[proxy] gnutls_priority_init() failed: (%d) %s\n",
err, gnutls_strerror_name(err));
return -1;
}
-
- proxy->tls.handshake_state = TLS_HS_NOT_STARTED;
return 0;
}
if (!proxy) {
return;
}
- clear_upstream_pending(proxy);
- clear_client_pending(proxy);
- clear_buffer_pool(proxy);
- gnutls_certificate_free_credentials(proxy->tls.credentials);
- gnutls_priority_deinit(proxy->tls.priority_cache);
- /* TODO correctly close all the uv_tcp_t */
+ while (proxy->client_list.len > 0) {
+ size_t last_index = proxy->client_list.len - 1;
+ struct peer *client = proxy->client_list.at[last_index];
+ clear_pending_bufs(client);
+ clear_pending_bufs(client->peer);
+ /* TODO correctly close all the uv_tcp_t */
+ free(client->peer);
+ free(client);
+ array_del(proxy->client_list, last_index);
+ }
+ gnutls_certificate_free_credentials(proxy->tls_credentials);
+ gnutls_priority_deinit(proxy->tls_priority_cache);
free(proxy);
gnutls_references -= 1;
int tls_proxy_start_listen(struct tls_proxy_ctx *proxy)
{
- uv_tcp_bind(&proxy->server, (const struct sockaddr*)&proxy->server_addr, 0);
- int ret = uv_listen((uv_stream_t*)&proxy->server, 128, on_client_connection);
- if (ret == 0) {
- proxy->server_state = STATE_LISTENING;
- }
+ uv_tcp_bind(&proxy->server.handle, (const struct sockaddr*)&proxy->server.addr, 0);
+ int ret = uv_listen((uv_stream_t*)&proxy->server.handle, 128, on_client_connection);
return ret;
}