From 19a35fb83922bd89d678e241a531d9625a25ea05 Mon Sep 17 00:00:00 2001 From: "W.C.A. Wijngaards" Date: Mon, 13 Jul 2020 15:16:59 +0200 Subject: [PATCH] stream reuse, write and read again if more data can go over the channel, this amortizes the event loop mechanism for busy channels, for performance. --- services/outside_network.c | 13 ++++++++++ util/netevent.c | 52 ++++++++++++++++++++++++++++++++++++++ util/netevent.h | 10 ++++++++ 3 files changed, 75 insertions(+) diff --git a/services/outside_network.c b/services/outside_network.c index d7db120fe..b1e420da5 100644 --- a/services/outside_network.c +++ b/services/outside_network.c @@ -911,12 +911,19 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, /* setup to write next packet or setup read timeout */ if(pend->reuse.write_wait_first) { verbose(VERB_ALGO, "outnet tcp setup next pkt"); + /* we can write it straight away perhaps, set flag + * because this callback called after a tcp write + * succeeded and likely more buffer space is available + * and we can write some more. */ + pend->c->tcp_more_write_again = 1; pend->query = reuse_write_wait_pop(&pend->reuse); outnet_tcp_take_query_setup(pend->c->fd, pend, pend->query); } else { verbose(VERB_ALGO, "outnet tcp writes done, wait"); pend->c->tcp_write_and_read = 0; + pend->c->tcp_more_read_again = 0; + pend->c->tcp_more_write_again = 0; pend->c->tcp_is_reading = 1; reuse_tcp_setup_timeout(pend); } @@ -964,6 +971,12 @@ outnet_tcp_cb(struct comm_point* c, void* arg, int error, verbose(5, "outnet_tcp_cb reuse after cb: keep it"); /* it is in the reuse_tcp tree, with other queries, or * on the empty list. do not decommission it */ + /* if there are more outstanding queries, we could try to + * read again, to see if it is on the input, + * because this callback called after a successful read + * and there could be more bytes to read on the input */ + if(pend->reuse.tree_by_id.count != 0) + pend->c->tcp_more_read_again = 1; reuse_tcp_setup_read_and_timeout(pend); return 0; } diff --git a/util/netevent.c b/util/netevent.c index 4098dfb03..889f9a1f3 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -985,6 +985,8 @@ reclaim_tcp_handler(struct comm_point* c) comm_point_start_listening(c->tcp_parent, -1, -1); } } + c->tcp_more_read_again = 0; + c->tcp_more_write_again = 0; } /** do the callback when writing is done */ @@ -1852,6 +1854,52 @@ tcp_req_info_read_again(int fd, struct comm_point* c) } } +/** read again to drain buffers when there could be more to read */ +static void +tcp_more_read_again(int fd, struct comm_point* c) +{ + /* if the packet is done, but another one could be waiting on + * the connection, the callback signals this, and we try again */ + /* this continues until the read routines get EAGAIN or so, + * and thus does not call the callback, and the bool is 0 */ + while(c->tcp_more_read_again) { + c->tcp_more_read_again = 0; + if(!comm_point_tcp_handle_read(fd, c, 0)) { + reclaim_tcp_handler(c); + if(!c->tcp_do_close) { + fptr_ok(fptr_whitelist_comm_point( + c->callback)); + (void)(*c->callback)(c, c->cb_arg, + NETEVENT_CLOSED, NULL); + } + return; + } + } +} + +/** write again to fill up when there could be more to write */ +static void +tcp_more_write_again(int fd, struct comm_point* c) +{ + /* if the packet is done, but another is waiting to be written, + * the callback signals it and we try again. */ + /* this continues until the write routines get EAGAIN or so, + * and thus does not call the callback, and the bool is 0 */ + while(c->tcp_more_write_again) { + c->tcp_more_write_again = 0; + if(!comm_point_tcp_handle_write(fd, c)) { + reclaim_tcp_handler(c); + if(!c->tcp_do_close) { + fptr_ok(fptr_whitelist_comm_point( + c->callback)); + (void)(*c->callback)(c, c->cb_arg, + NETEVENT_CLOSED, NULL); + } + return; + } + } +} + void comm_point_tcp_handle_callback(int fd, short event, void* arg) { @@ -1903,6 +1951,8 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg) } if(has_tcpq && c->tcp_req_info && c->tcp_req_info->read_again) tcp_req_info_read_again(fd, c); + if(c->tcp_more_read_again) + tcp_more_read_again(fd, c); return; } if(event&UB_EV_WRITE) { @@ -1918,6 +1968,8 @@ comm_point_tcp_handle_callback(int fd, short event, void* arg) } if(has_tcpq && c->tcp_req_info && c->tcp_req_info->read_again) tcp_req_info_read_again(fd, c); + if(c->tcp_more_write_again) + tcp_more_write_again(fd, c); return; } log_err("Ignored event %d for tcphdl.", event); diff --git a/util/netevent.h b/util/netevent.h index 300592e5b..ad24541f6 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -268,6 +268,16 @@ struct comm_point { /** length of tcp_write_pkt in bytes */ size_t tcp_write_pkt_len; + /** if set try to read another packet again (over connection with + * multiple packets), once set, tries once, then zero again, + * so set it in the packet complete section. */ + int tcp_more_read_again; + + /** if set try to write another packet (over connection with + * multiple packets), once set, tries once, then zero again, + * so set it in the packet complete section. */ + int tcp_more_write_again; + /** if set, read/write completes: read/write state of tcp is toggled. buffer reset/bytecount reset. -- 2.47.3