From: Wouter Wijngaards Date: Fri, 11 Jan 2019 14:12:27 +0000 (+0000) Subject: - Initial commit for out-of-order processing for TCP and TLS. X-Git-Tag: release-1.9.0rc1~52 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=dd19026e910c6003a9436c77488cc62929e4a9a8;p=thirdparty%2Funbound.git - Initial commit for out-of-order processing for TCP and TLS. git-svn-id: file:///svn/unbound/trunk@5032 be551aaa-1e26-0410-a405-d3ace91eadb9 --- diff --git a/Makefile.in b/Makefile.in index af5b10f65..0a257bda5 100644 --- a/Makefile.in +++ b/Makefile.in @@ -744,7 +744,10 @@ listen_dnsport.lo listen_dnsport.o: $(srcdir)/services/listen_dnsport.c config.h $(srcdir)/services/listen_dnsport.h $(srcdir)/util/netevent.h $(srcdir)/dnscrypt/dnscrypt.h \ $(srcdir)/services/outside_network.h $(srcdir)/util/rbtree.h \ $(srcdir)/util/log.h $(srcdir)/util/config_file.h $(srcdir)/util/net_help.h \ - $(srcdir)/sldns/sbuffer.h + $(srcdir)/sldns/sbuffer.h $(srcdir)/services/mesh.h $(srcdir)/util/data/msgparse.h \ + $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h \ + $(srcdir)/util/module.h $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h \ + $(srcdir)/services/modstack.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h localzone.lo localzone.o: $(srcdir)/services/localzone.c config.h $(srcdir)/services/localzone.h \ $(srcdir)/util/rbtree.h $(srcdir)/util/locks.h $(srcdir)/util/log.h $(srcdir)/util/storage/dnstree.h \ $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/data/msgreply.h \ @@ -762,7 +765,8 @@ mesh.lo mesh.o: $(srcdir)/services/mesh.c config.h $(srcdir)/services/mesh.h $(s $(srcdir)/util/data/msgencode.h $(srcdir)/util/timehist.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h \ $(srcdir)/util/alloc.h $(srcdir)/util/config_file.h $(srcdir)/util/edns.h $(srcdir)/sldns/sbuffer.h \ $(srcdir)/sldns/wire2str.h $(srcdir)/services/localzone.h $(srcdir)/util/storage/dnstree.h \ - $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h + $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h \ + $(srcdir)/services/listen_dnsport.h modstack.lo modstack.o: $(srcdir)/services/modstack.c config.h $(srcdir)/services/modstack.h \ $(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/log.h \ $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \ @@ -870,7 +874,7 @@ netevent.lo netevent.o: $(srcdir)/util/netevent.c config.h $(srcdir)/util/neteve $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \ $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h $(srcdir)/util/tube.h $(srcdir)/services/mesh.h \ $(srcdir)/services/modstack.h $(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/str2wire.h $(srcdir)/dnstap/dnstap.h \ - \ + $(srcdir)/services/listen_dnsport.h \ net_help.lo net_help.o: $(srcdir)/util/net_help.c config.h $(srcdir)/util/net_help.h $(srcdir)/util/log.h \ $(srcdir)/util/data/dname.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/module.h \ diff --git a/doc/Changelog b/doc/Changelog index f95c10f6b..45e187e4c 100644 --- a/doc/Changelog +++ b/doc/Changelog @@ -1,3 +1,6 @@ +11 January 2018: Wouter + - Initial commit for out-of-order processing for TCP and TLS. + 9 January 2018: Wouter - Log query name for looping module errors. diff --git a/services/listen_dnsport.c b/services/listen_dnsport.c index d4a0d6a8c..311cf2dda 100644 --- a/services/listen_dnsport.c +++ b/services/listen_dnsport.c @@ -53,6 +53,8 @@ #include "util/config_file.h" #include "util/net_help.h" #include "sldns/sbuffer.h" +#include "services/mesh.h" +#include "util/fptr_wlist.h" #ifdef HAVE_NETDB_H #include @@ -1276,11 +1278,13 @@ listen_create(struct comm_base* base, struct listen_port* ports, ports->ftype == listen_type_tcp_dnscrypt) cp = comm_point_create_tcp(base, ports->fd, tcp_accept_count, tcp_idle_timeout, - tcp_conn_limit, bufsize, cb, cb_arg); + tcp_conn_limit, bufsize, front->udp_buff, + cb, cb_arg); else if(ports->ftype == listen_type_ssl) { cp = comm_point_create_tcp(base, ports->fd, tcp_accept_count, tcp_idle_timeout, - tcp_conn_limit, bufsize, cb, cb_arg); + tcp_conn_limit, bufsize, front->udp_buff, + cb, cb_arg); cp->ssl = sslctx; } else if(ports->ftype == listen_type_udpancil || ports->ftype == listen_type_udpancil_dnscrypt) @@ -1508,3 +1512,311 @@ void listen_start_accept(struct listen_dnsport* listen) } } +struct tcp_req_info* +tcp_req_info_create(struct sldns_buffer* spoolbuf) +{ + struct tcp_req_info* req = (struct tcp_req_info*)malloc(sizeof(*req)); + if(!req) { + log_err("malloc failure for new stream outoforder processing structure"); + return NULL; + } + memset(req, 0, sizeof(*req)); + req->spool_buffer = spoolbuf; + return req; +} + +void +tcp_req_info_delete(struct tcp_req_info* req) +{ + if(!req) return; + tcp_req_info_clear(req); + /* cp is pointer back to commpoint that owns this struct and + * called delete on us */ + /* spool_buffer is shared udp buffer, not deleted here */ + free(req); +} + +void tcp_req_info_clear(struct tcp_req_info* req) +{ + struct tcp_req_open_item* open, *nopen; + struct tcp_req_done_item* item, *nitem; + if(!req) return; + + /* free outstanding request mesh reply entries */ + open = req->open_req_list; + while(open) { + nopen = open->next; + mesh_state_remove_reply(open->mesh, open->mesh_state, req->cp); + free(open); + open = nopen; + } + req->open_req_list = NULL; + req->num_open_req = 0; + + /* free pending writable result packets */ + item = req->done_req_list; + while(item) { + nitem = item->next; + free(item->buf); + free(item); + item = nitem; + } + req->done_req_list = NULL; + req->num_done_req = 0; + req->read_is_closed = 0; +} + +void +tcp_req_info_remove_mesh_state(struct tcp_req_info* req, struct mesh_state* m) +{ + struct tcp_req_open_item* open, *prev = NULL; + if(!req || !m) return; + open = req->open_req_list; + while(open) { + if(open->mesh_state == m) { + struct tcp_req_open_item* next; + if(prev) prev->next = open->next; + else req->open_req_list = open->next; + /* caller has to manage the mesh state reply entry */ + next = open->next; + free(open); + req->num_open_req --; + + /* prev = prev; */ + open = next; + continue; + } + prev = open; + open = open->next; + } +} + +/** number of simultaneous requests a client can have */ +#define TCP_MAX_REQ_SIMULTANEOUS 10 + +/** setup listening for read or write */ +static void +tcp_req_info_setup_listen(struct tcp_req_info* req) +{ + int wr = 0; + int rd = 0; + + if(req->cp->tcp_byte_count != 0) { + /* cannot change, halfway through */ + return; + } + + if(!req->cp->tcp_is_reading) + wr = 1; + if(req->num_open_req + req->num_done_req < TCP_MAX_REQ_SIMULTANEOUS && + !req->read_is_closed) + rd = 1; + + if(wr) { + req->cp->tcp_is_reading = 0; + comm_point_start_listening(req->cp, -1, + req->cp->tcp_timeout_msec); + } else if(rd) { + req->cp->tcp_is_reading = 1; + comm_point_start_listening(req->cp, -1, + req->cp->tcp_timeout_msec); + } else { + comm_point_start_listening(req->cp, -1, + req->cp->tcp_timeout_msec); + comm_point_listen_for_rw(req->cp, 0, 0); + } +} + +/** remove first item from list of pending results */ +static struct tcp_req_done_item* +tcp_req_info_pop_done(struct tcp_req_info* req) +{ + struct tcp_req_done_item* item; + log_assert(req->num_done_req > 0 && req->done_req_list); + item = req->done_req_list; + req->done_req_list = req->done_req_list->next; + req->num_done_req --; + return item; +} + +/** Send given buffer and setup to write */ +static void +tcp_req_info_start_write_buf(struct tcp_req_info* req, uint8_t* buf, + size_t len) +{ + sldns_buffer_clear(req->cp->buffer); + sldns_buffer_write(req->cp->buffer, buf, len); + sldns_buffer_flip(req->cp->buffer); + + req->cp->tcp_is_reading = 0; /* we are now writing */ +} + +/** pick up the next result and start writing it to the channel */ +static void +tcp_req_pickup_next_result(struct tcp_req_info* req) +{ + if(req->num_done_req > 0) { + /* unlist the done item from the list of pending results */ + struct tcp_req_done_item* item = tcp_req_info_pop_done(req); + tcp_req_info_start_write_buf(req, item->buf, item->len); + free(item->buf); + free(item); + } +} + +/** the read channel has closed */ +int +tcp_req_info_handle_read_close(struct tcp_req_info* req) +{ + verbose(VERB_ALGO, "tcp channel read side closed %d", req->cp->fd); + if(req->num_done_req != 0) { + tcp_req_pickup_next_result(req); + tcp_req_info_setup_listen(req); + return 1; + } + if(req->num_open_req == 0 && req->num_done_req == 0) + return 0; + req->read_is_closed = 1; + tcp_req_info_setup_listen(req); + return 1; +} + +void +tcp_req_info_handle_writedone(struct tcp_req_info* req) +{ + /* back to reading state, we finished this write event */ + sldns_buffer_clear(req->cp->buffer); + req->cp->tcp_is_reading = 1; + /* see if another result needs writing */ + tcp_req_pickup_next_result(req); + + /* see if there is more to write, if not stop_listening for writing */ + /* see if new requests are allowed, if so, start_listening + * for reading */ + tcp_req_info_setup_listen(req); +} + +void +tcp_req_info_handle_readdone(struct tcp_req_info* req) +{ + struct comm_point* c = req->cp; + + /* we want to read up several requests, unless there are + * pending answers */ + + req->is_drop = 0; + req->is_reply = 0; + req->in_worker_handle = 1; + /* handle the current request, */ + fptr_ok(fptr_whitelist_comm_point(c->callback)); + if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) { + req->in_worker_handle = 0; + /* there is an answer, put it up. It is already in the + * c->buffer, just send it. */ + /* since we were just reading a query, the channel is + * clear to write to */ + send_it: + c->tcp_is_reading = 0; + comm_point_start_listening(c, -1, c->tcp_timeout_msec); + return; + } + req->in_worker_handle = 0; + /* it should be waiting in the mesh for recursion. + * If mesh failed(formerr) and called commpoint_drop_reply. Then the + * mesh state has been cleared. */ + if(req->is_drop) { + return; + } + /* If mesh failed(mallocfail) and called commpoint_send_reply with + * something like servfail then we pick up that reply below. */ + if(req->is_reply) { + goto send_it; + } + + sldns_buffer_clear(req->cp->buffer); + /* if pending answers, pick up an answer and start sending it */ + tcp_req_pickup_next_result(req); + + /* if answers pending, start sending answers */ + /* read more requests if we can have more requests */ + tcp_req_info_setup_listen(req); +} + +int +tcp_req_info_add_meshstate(struct tcp_req_info* req, + struct mesh_area* mesh, struct mesh_state* m) +{ + struct tcp_req_open_item* item; + log_assert(req && mesh && m); + item = (struct tcp_req_open_item*)malloc(sizeof(*item)); + if(!item) return 0; + item->next = req->open_req_list; + item->mesh = mesh; + item->mesh_state = m; + req->open_req_list = item; + req->num_open_req++; + return 1; +} + +/** Add a result to the result list. At the end. */ +static int +tcp_req_info_add_result(struct tcp_req_info* req, uint8_t* buf, size_t len) +{ + struct tcp_req_done_item* last = NULL; + struct tcp_req_done_item* item; + /* find last element */ + last = req->done_req_list; + while(last && last->next) + last = last->next; + + /* create new element */ + item = (struct tcp_req_done_item*)malloc(sizeof(*item)); + if(!item) { + return 0; + } + item->next = NULL; + item->len = len; + item->buf = memdup(buf, len); + if(!item->buf) { + free(item); + return 0; + } + + /* link in */ + if(last) last->next = item; + else req->done_req_list = item; + req->num_done_req++; + return 1; +} + +void +tcp_req_info_send_reply(struct tcp_req_info* req) +{ + if(req->in_worker_handle) { + /* It is in the right buffer to answer straight away */ + req->is_reply = 1; + return; + } + /* now that the query has been handled, that mesh_reply entry + * should be removed, from the tcp_req_info list */ + /* TODO: find it, need mstate ptr */ + /* see if we can send it straight away (we are not doing + * anything else). If so, copy to buffer and start */ + if(req->cp->tcp_is_reading && req->cp->tcp_byte_count == 0) { + /* buffer is free, and was ready to read new query into, + * but we are now going to use it to send this answer */ + tcp_req_info_start_write_buf(req, + sldns_buffer_begin(req->spool_buffer), + sldns_buffer_limit(req->spool_buffer)); + /* switch to listen to write events */ + comm_point_stop_listening(req->cp); + comm_point_start_listening(req->cp, -1, + req->cp->tcp_timeout_msec); + return; + } + /* queue up the answer behind the others already pending */ + if(!tcp_req_info_add_result(req, sldns_buffer_begin(req->spool_buffer), + sldns_buffer_limit(req->spool_buffer))) { + log_err("malloc failure adding reply to stream result list"); + } +} diff --git a/services/listen_dnsport.h b/services/listen_dnsport.h index 46b432d4b..653413bc6 100644 --- a/services/listen_dnsport.h +++ b/services/listen_dnsport.h @@ -237,4 +237,127 @@ int create_tcp_accept_sock(struct addrinfo *addr, int v6only, int* noproto, */ int create_local_accept_sock(const char* path, int* noproto, int use_systemd); +/** + * TCP request info. List of requests outstanding on the channel, that + * are asked for but not yet answered back. + */ +struct tcp_req_info { + /** the TCP comm point for this. Its buffer is used for read/write */ + struct comm_point* cp; + /** the buffer to use to spool reply from mesh into, + * it can then be copied to the result list and written. + * it is a pointer to the shared udp buffer. */ + struct sldns_buffer* spool_buffer; + /** are we in worker_handle function call (for recursion callback)*/ + int in_worker_handle; + /** is the comm point dropped (by worker handle). + * That means we have to disconnect the channel. */ + int is_drop; + /** is the comm point set to send_reply (by mesh new client in worker + * handle), if so answer is available in c.buffer */ + int is_reply; + /** read channel has closed, just write pending results */ + int read_is_closed; + /** number of outstanding requests */ + int num_open_req; + /** list of outstanding requests */ + struct tcp_req_open_item* open_req_list; + /** number of pending writeable results */ + int num_done_req; + /** list of pending writable result packets, malloced one at a time */ + struct tcp_req_done_item* done_req_list; +}; + +/** + * List of open items in TCP channel + */ +struct tcp_req_open_item { + /** next in list */ + struct tcp_req_open_item* next; + /** the mesh area of the mesh_state */ + struct mesh_area* mesh; + /** the mesh state */ + struct mesh_state* mesh_state; +}; + +/** + * List of done items in TCP channel + */ +struct tcp_req_done_item { + /** next in list */ + struct tcp_req_done_item* next; + /** the buffer with packet contents */ + uint8_t* buf; + /** length of the buffer */ + size_t len; +}; + +/** + * Create tcp request info structure that keeps track of open + * requests on the TCP channel that are resolved at the same time, + * and the pending results that have to get written back to that client. + * @param spoolbuf: shared buffer + * @return new structure or NULL on alloc failure. + */ +struct tcp_req_info* tcp_req_info_create(struct sldns_buffer* spoolbuf); + +/** + * Delete tcp request structure. Called by owning commpoint. + * Removes mesh entry references and stored results from the lists. + * @param req: the tcp request info + */ +void tcp_req_info_delete(struct tcp_req_info* req); + +/** + * Clear tcp request structure. Removes list entries, sets it up ready + * for the next connection. + * @param req: tcp request info structure. + */ +void tcp_req_info_clear(struct tcp_req_info* req); + +/** + * Remove mesh state entry from list in tcp_req_info. + * caller has to manage the mesh state reply entry in the mesh state. + * @param req: the tcp req info that has the entry removed from the list. + * @param m: the state removed from the list. + */ +void tcp_req_info_remove_mesh_state(struct tcp_req_info* req, + struct mesh_state* m); + +/** + * Handle write done of the last result packet + */ +void tcp_req_info_handle_writedone(struct tcp_req_info* req); + +/** + * Handle read done of a new request from the client + */ +void tcp_req_info_handle_readdone(struct tcp_req_info* req); + +/** + * Add mesh state to the tcp req list of open requests. + * So the comm_reply can be removed off the mesh reply list when + * the tcp channel has to be closed (for other reasons then that that + * request was done, eg. channel closed by client or some format error. + * @param req: tcp req info structure. It keeps track of the simultaneous + * requests and results on a tcp (or TLS) channel. + * @param mesh: mesh area for the state. + * @param m: mesh state to add. + * @return 0 on failure (malloc failure). + */ +int tcp_req_info_add_meshstate(struct tcp_req_info* req, + struct mesh_area* mesh, struct mesh_state* m); + +/** + * Send reply on tcp simultaneous answer channel. May queue it up. + * @param req: request info structure. + */ +void tcp_req_info_send_reply(struct tcp_req_info* req); + +/** the read channel has closed + * @param req: request. remaining queries are looked up and answered. + * @return zero if nothing to do, just close the tcp. + */ +int tcp_req_info_handle_read_close(struct tcp_req_info* req); + #endif /* LISTEN_DNSPORT_H */ diff --git a/services/mesh.c b/services/mesh.c index 3d4403f24..01771af35 100644 --- a/services/mesh.c +++ b/services/mesh.c @@ -61,6 +61,7 @@ #include "services/localzone.h" #include "util/data/dname.h" #include "respip/respip.h" +#include "services/listen_dnsport.h" /** subtract timers and the values do not overflow or become negative */ static void @@ -429,6 +430,7 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo, /* add reply to s */ if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) { log_err("mesh_new_client: out of memory; SERVFAIL"); + servfail_mem: if(!inplace_cb_reply_servfail_call(mesh->env, qinfo, &s->s, NULL, LDNS_RCODE_SERVFAIL, edns, rep, mesh->env->scratch)) edns->opt_list = NULL; @@ -439,6 +441,12 @@ void mesh_new_client(struct mesh_area* mesh, struct query_info* qinfo, mesh_state_delete(&s->s); return; } + if(rep->c->tcp_req_info) { + if(!tcp_req_info_add_meshstate(rep->c->tcp_req_info, mesh, s)) { + log_err("mesh_new_client: out of memory add tcpreqinfo"); + goto servfail_mem; + } + } /* update statistics */ if(was_detached) { log_assert(mesh->num_detached_states > 0); @@ -1031,11 +1039,14 @@ mesh_do_callback(struct mesh_state* m, int rcode, struct reply_info* rep, * @param rcode: if not 0, error code. * @param rep: reply to send (or NULL if rcode is set). * @param r: reply entry + * @param r_buffer: buffer to use for reply entry. * @param prev: previous reply, already has its answer encoded in buffer. + * @param prev_buffer: buffer for previous reply. */ static void mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, - struct mesh_reply* r, struct mesh_reply* prev) + struct mesh_reply* r, struct sldns_buffer* r_buffer, + struct mesh_reply* prev, struct sldns_buffer* prev_buffer) { struct timeval end_time; struct timeval duration; @@ -1063,7 +1074,7 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, * and still reuse the previous answer if they are the same, but that * would be complicated and error prone for the relatively minor case. * So we err on the side of safety. */ - if(prev && prev->qflags == r->qflags && + if(prev && prev_buffer && prev->qflags == r->qflags && !prev->local_alias && !r->local_alias && prev->edns.edns_present == r->edns.edns_present && prev->edns.bits == r->edns.bits && @@ -1071,13 +1082,11 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, edns_opt_list_compare(prev->edns.opt_list, r->edns.opt_list) == 0) { /* if the previous reply is identical to this one, fix ID */ - if(prev->query_reply.c->buffer != r->query_reply.c->buffer) - sldns_buffer_copy(r->query_reply.c->buffer, - prev->query_reply.c->buffer); - sldns_buffer_write_at(r->query_reply.c->buffer, 0, - &r->qid, sizeof(uint16_t)); - sldns_buffer_write_at(r->query_reply.c->buffer, 12, - r->qname, m->s.qinfo.qname_len); + if(prev_buffer != r_buffer) + sldns_buffer_copy(r_buffer, prev_buffer); + sldns_buffer_write_at(r_buffer, 0, &r->qid, sizeof(uint16_t)); + sldns_buffer_write_at(r_buffer, 12, r->qname, + m->s.qinfo.qname_len); comm_point_send_reply(&r->query_reply); } else if(rcode) { m->s.qinfo.qname = r->qname; @@ -1091,8 +1100,8 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, &r->edns, NULL, m->s.region)) r->edns.opt_list = NULL; } - error_encode(r->query_reply.c->buffer, rcode, &m->s.qinfo, - r->qid, r->qflags, &r->edns); + error_encode(r_buffer, rcode, &m->s.qinfo, r->qid, + r->qflags, &r->edns); comm_point_send_reply(&r->query_reply); } else { size_t udp_size = r->edns.udp_size; @@ -1108,16 +1117,15 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, m->s.env->cfg, r->query_reply.c, m->s.region) || !reply_info_answer_encode(&m->s.qinfo, rep, r->qid, - r->qflags, r->query_reply.c->buffer, 0, 1, - m->s.env->scratch, udp_size, &r->edns, - (int)(r->edns.bits & EDNS_DO), secure)) + r->qflags, r_buffer, 0, 1, m->s.env->scratch, + udp_size, &r->edns, (int)(r->edns.bits & EDNS_DO), + secure)) { if(!inplace_cb_reply_servfail_call(m->s.env, &m->s.qinfo, &m->s, rep, LDNS_RCODE_SERVFAIL, &r->edns, NULL, m->s.region)) r->edns.opt_list = NULL; - error_encode(r->query_reply.c->buffer, - LDNS_RCODE_SERVFAIL, &m->s.qinfo, r->qid, - r->qflags, &r->edns); + error_encode(r_buffer, LDNS_RCODE_SERVFAIL, + &m->s.qinfo, r->qid, r->qflags, &r->edns); } r->edns = edns_bak; comm_point_send_reply(&r->query_reply); @@ -1132,19 +1140,17 @@ mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep, timeval_add(&m->s.env->mesh->replies_sum_wait, &duration); timehist_insert(m->s.env->mesh->histogram, &duration); if(m->s.env->cfg->stat_extended) { - uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(r-> - query_reply.c->buffer, 2)); + uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at( + r_buffer, 2)); if(secure) m->s.env->mesh->ans_secure++; m->s.env->mesh->ans_rcode[ rc ] ++; - if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r-> - query_reply.c->buffer)) == 0) + if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r_buffer)) == 0) m->s.env->mesh->ans_nodata++; } /* Log reply sent */ if(m->s.env->cfg->log_replies) { log_reply_info(0, &m->s.qinfo, &r->query_reply.addr, - r->query_reply.addrlen, duration, 0, - r->query_reply.c->buffer); + r->query_reply.addrlen, duration, 0, r_buffer); } } @@ -1152,6 +1158,7 @@ void mesh_query_done(struct mesh_state* mstate) { struct mesh_reply* r; struct mesh_reply* prev = NULL; + struct sldns_buffer* prev_buffer = NULL; struct mesh_cb* c; struct reply_info* rep = (mstate->s.return_msg? mstate->s.return_msg->rep:NULL); @@ -1180,9 +1187,15 @@ void mesh_query_done(struct mesh_state* mstate) if(mstate->s.is_drop) comm_point_drop_reply(&r->query_reply); else { + struct sldns_buffer* r_buffer = r->query_reply.c->buffer; + if(r->query_reply.c->tcp_req_info) + r_buffer = r->query_reply.c->tcp_req_info->spool_buffer; mesh_send_reply(mstate, mstate->s.return_rcode, rep, - r, prev); + r, r_buffer, prev, prev_buffer); + if(r->query_reply.c->tcp_req_info) + tcp_req_info_remove_mesh_state(r->query_reply.c->tcp_req_info, mstate); prev = r; + prev_buffer = r_buffer; } } mstate->replies_sent = 1; @@ -1613,3 +1626,36 @@ void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp, m->prev->next = m->next; else *fp = m->next; } + +void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m, + struct comm_point* cp) +{ + struct mesh_reply* n, *prev = NULL; + n = m->reply_list; + if(!n) return; /* nothing to remove, also no accounting needed */ + while(n) { + if(n->query_reply.c == cp) { + /* unlink it */ + if(prev) prev->next = n->next; + else m->reply_list = n->next; + /* delete it, but allocated in m region */ + mesh->num_reply_addrs--; + + /* prev = prev; */ + n = n->next; + continue; + } + prev = n; + n = n->next; + } + /* it was not detached (because it had a reply list), could be now */ + if(!m->reply_list && !m->cb_list + && m->super_set.count == 0) { + mesh->num_detached_states++; + } + /* if not replies any more in mstate, it is no longer a reply_state */ + if(!m->reply_list && !m->cb_list) { + log_assert(mesh->num_reply_states > 0); + mesh->num_reply_states--; + } +} diff --git a/services/mesh.h b/services/mesh.h index b4ce03e7e..e11c06bf4 100644 --- a/services/mesh.h +++ b/services/mesh.h @@ -633,4 +633,14 @@ void mesh_list_insert(struct mesh_state* m, struct mesh_state** fp, void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp, struct mesh_state** lp); +/** + * Remove mesh reply entry from the reply entry list. Searches for + * the repinfo pointer. + * @param mesh: to update the counters. + * @param m: the mesh state. + * @param cp: the commpoint to remove from the list. + */ +void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m, + struct comm_point* cp); + #endif /* SERVICES_MESH_H */ diff --git a/testcode/fake_event.c b/testcode/fake_event.c index 777ed7355..5407999d5 100644 --- a/testcode/fake_event.c +++ b/testcode/fake_event.c @@ -1802,4 +1802,18 @@ int outnet_tcp_connect(int ATTR_UNUSED(s), struct sockaddr_storage* ATTR_UNUSED( return 0; } +int tcp_req_info_add_meshstate(struct tcp_req_info* ATTR_UNUSED(req), + struct mesh_area* ATTR_UNUSED(mesh), struct mesh_state* ATTR_UNUSED(m)) +{ + log_assert(0); + return 0; +} + +void +tcp_req_info_remove_mesh_state(struct tcp_req_info* ATTR_UNUSED(req), + struct mesh_state* ATTR_UNUSED(m)) +{ + log_assert(0); +} + /*********** End of Dummy routines ***********/ diff --git a/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.post b/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.post index e6dda048d..897f8cf70 100644 --- a/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.post +++ b/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.post @@ -8,3 +8,4 @@ . ../common.sh kill_pid $FWD_PID kill_pid $UNBOUND_PID +cat unbound.log diff --git a/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.test b/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.test index 67354d014..de4250c3e 100644 --- a/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.test +++ b/testdata/fwd_compress_c00c.tdir/fwd_compress_c00c.test @@ -6,9 +6,9 @@ # check what sort of netcat we have if nc -h 2>&1 | grep "q secs"; then - ncopt="-q 3 -w 2" + ncopt="-q 3 -i 2" else - ncopt="-w 2" + ncopt="-i 2" fi PRE="../.." diff --git a/util/netevent.c b/util/netevent.c index 373538041..58c65220a 100644 --- a/util/netevent.c +++ b/util/netevent.c @@ -50,6 +50,7 @@ #include "sldns/str2wire.h" #include "dnstap/dnstap.h" #include "dnscrypt/dnscrypt.h" +#include "services/listen_dnsport.h" #ifdef HAVE_OPENSSL_SSL_H #include #endif @@ -150,7 +151,8 @@ struct internal_signal { /** create a tcp handler with a parent */ static struct comm_point* comm_point_create_tcp_handler( struct comm_base *base, struct comm_point* parent, size_t bufsize, - comm_point_callback_type* callback, void* callback_arg); + struct sldns_buffer* spoolbuf, comm_point_callback_type* callback, + void* callback_arg); /* -------- End of local definitions -------- */ @@ -988,7 +990,11 @@ tcp_callback_writer(struct comm_point* c) c->tcp_byte_count = 0; /* switch from listening(write) to listening(read) */ comm_point_stop_listening(c); - comm_point_start_listening(c, -1, -1); + if(c->tcp_req_info) { + tcp_req_info_handle_writedone(c->tcp_req_info); + } else { + comm_point_start_listening(c, -1, -1); + } } /** do the callback when reading is done */ @@ -1002,9 +1008,13 @@ tcp_callback_reader(struct comm_point* c) c->tcp_byte_count = 0; if(c->type == comm_tcp) comm_point_stop_listening(c); - fptr_ok(fptr_whitelist_comm_point(c->callback)); - if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) { - comm_point_start_listening(c, -1, c->tcp_timeout_msec); + if(c->tcp_req_info) { + tcp_req_info_handle_readdone(c->tcp_req_info); + } else { + fptr_ok(fptr_whitelist_comm_point(c->callback)); + if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) { + comm_point_start_listening(c, -1, c->tcp_timeout_msec); + } } } @@ -1163,6 +1173,8 @@ ssl_handle_read(struct comm_point* c) c->tcp_byte_count))) <= 0) { int want = SSL_get_error(c->ssl, r); if(want == SSL_ERROR_ZERO_RETURN) { + if(c->tcp_req_info) + return tcp_req_info_handle_read_close(c->tcp_req_info); return 0; /* shutdown, closed */ } else if(want == SSL_ERROR_WANT_READ) { ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ); @@ -1205,6 +1217,8 @@ ssl_handle_read(struct comm_point* c) if(r <= 0) { int want = SSL_get_error(c->ssl, r); if(want == SSL_ERROR_ZERO_RETURN) { + if(c->tcp_req_info) + return tcp_req_info_handle_read_close(c->tcp_req_info); return 0; /* shutdown, closed */ } else if(want == SSL_ERROR_WANT_READ) { ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ); @@ -1365,9 +1379,11 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok) /* read length bytes */ r = recv(fd,(void*)sldns_buffer_at(c->buffer,c->tcp_byte_count), sizeof(uint16_t)-c->tcp_byte_count, 0); - if(r == 0) + if(r == 0) { + if(c->tcp_req_info) + return tcp_req_info_handle_read_close(c->tcp_req_info); return 0; - else if(r == -1) { + } else if(r == -1) { #ifndef USE_WINSOCK if(errno == EINTR || errno == EAGAIN) return 1; @@ -1416,6 +1432,8 @@ comm_point_tcp_handle_read(int fd, struct comm_point* c, int short_ok) r = recv(fd, (void*)sldns_buffer_current(c->buffer), sldns_buffer_remaining(c->buffer), 0); if(r == 0) { + if(c->tcp_req_info) + return tcp_req_info_handle_read_close(c->tcp_req_info); return 0; } else if(r == -1) { #ifndef USE_WINSOCK @@ -2523,7 +2541,8 @@ comm_point_create_udp_ancil(struct comm_base *base, int fd, static struct comm_point* comm_point_create_tcp_handler(struct comm_base *base, struct comm_point* parent, size_t bufsize, - comm_point_callback_type* callback, void* callback_arg) + struct sldns_buffer* spoolbuf, comm_point_callback_type* callback, + void* callback_arg) { struct comm_point* c = (struct comm_point*)calloc(1, sizeof(struct comm_point)); @@ -2579,6 +2598,20 @@ comm_point_create_tcp_handler(struct comm_base *base, c->repinfo.c = c; c->callback = callback; c->cb_arg = callback_arg; + if(spoolbuf) { + c->tcp_req_info = tcp_req_info_create(spoolbuf); + if(!c->tcp_req_info) { + log_err("could not create tcp commpoint"); + sldns_buffer_free(c->buffer); + free(c->timeout); + free(c->ev); + free(c); + return NULL; + } + c->tcp_req_info->cp = c; + c->tcp_do_close = 1; + c->tcp_do_toggle_rw = 0; + } /* add to parent free list */ c->tcp_free = parent->tcp_free; parent->tcp_free = c; @@ -2590,6 +2623,9 @@ comm_point_create_tcp_handler(struct comm_base *base, { log_err("could not basetset tcphdl event"); parent->tcp_free = c->tcp_free; + tcp_req_info_delete(c->tcp_req_info); + sldns_buffer_free(c->buffer); + free(c->timeout); free(c->ev); free(c); return NULL; @@ -2600,7 +2636,8 @@ comm_point_create_tcp_handler(struct comm_base *base, struct comm_point* comm_point_create_tcp(struct comm_base *base, int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit, size_t bufsize, - comm_point_callback_type* callback, void* callback_arg) + struct sldns_buffer* spoolbuf, comm_point_callback_type* callback, + void* callback_arg) { struct comm_point* c = (struct comm_point*)calloc(1, sizeof(struct comm_point)); @@ -2667,7 +2704,7 @@ comm_point_create_tcp(struct comm_base *base, int fd, int num, /* now prealloc the tcp handlers */ for(i=0; itcp_handlers[i] = comm_point_create_tcp_handler(base, - c, bufsize, callback, callback_arg); + c, bufsize, spoolbuf, callback, callback_arg); if(!c->tcp_handlers[i]) { comm_point_delete(c); return NULL; @@ -2949,6 +2986,8 @@ comm_point_close(struct comm_point* c) } } tcl_close_connection(c->tcl_addr); + if(c->tcp_req_info) + tcp_req_info_clear(c->tcp_req_info); /* close fd after removing from event lists, or epoll.. is messed up */ if(c->fd != -1 && !c->do_not_close) { if(c->type == comm_tcp || c->type == comm_http) { @@ -2992,6 +3031,9 @@ comm_point_delete(struct comm_point* c) sldns_buffer_free(c->dnscrypt_buffer); } #endif + if(c->tcp_req_info) { + tcp_req_info_delete(c->tcp_req_info); + } } ub_event_free(c->ev->ev); free(c->ev); @@ -3032,8 +3074,12 @@ comm_point_send_reply(struct comm_reply *repinfo) dt_msg_send_client_response(repinfo->c->tcp_parent->dtenv, &repinfo->addr, repinfo->c->type, repinfo->c->buffer); #endif - comm_point_start_listening(repinfo->c, -1, - repinfo->c->tcp_timeout_msec); + if(repinfo->c->tcp_req_info) { + tcp_req_info_send_reply(repinfo->c->tcp_req_info); + } else { + comm_point_start_listening(repinfo->c, -1, + repinfo->c->tcp_timeout_msec); + } } } @@ -3046,6 +3092,8 @@ comm_point_drop_reply(struct comm_reply* repinfo) log_assert(repinfo->c->type != comm_tcp_accept); if(repinfo->c->type == comm_udp) return; + if(repinfo->c->tcp_req_info) + repinfo->c->tcp_req_info->is_drop = 1; reclaim_tcp_handler(repinfo->c); } diff --git a/util/netevent.h b/util/netevent.h index f6b6af688..d80c72b33 100644 --- a/util/netevent.h +++ b/util/netevent.h @@ -268,6 +268,9 @@ struct comm_point { /** the entry for the connection. */ struct tcl_addr* tcl_addr; + /** the structure to keep track of open requests on this channel */ + struct tcp_req_info* tcp_req_info; + #ifdef USE_MSG_FASTOPEN /** used to track if the sendto() call should be done when using TFO. */ int tcp_do_fastopen; @@ -455,6 +458,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base, * @param idle_timeout: TCP idle timeout in ms. * @param tcp_conn_limit: TCP connection limit info. * @param bufsize: size of buffer to create for handlers. + * @param spoolbuf: shared spool buffer for tcp_req_info structures. + * or NULL to not create those structures in the tcp handlers. * @param callback: callback function pointer for TCP handlers. * @param callback_arg: will be passed to your callback function. * @return: returns the TCP listener commpoint. You can find the @@ -464,7 +469,8 @@ struct comm_point* comm_point_create_udp_ancil(struct comm_base* base, */ struct comm_point* comm_point_create_tcp(struct comm_base* base, int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit, - size_t bufsize, comm_point_callback_type* callback, void* callback_arg); + size_t bufsize, struct sldns_buffer* spoolbuf, + comm_point_callback_type* callback, void* callback_arg); /** * Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.