$(srcdir)/services/listen_dnsport.h $(srcdir)/util/netevent.h $(srcdir)/dnscrypt/dnscrypt.h \
$(srcdir)/services/outside_network.h $(srcdir)/util/rbtree.h \
$(srcdir)/util/log.h $(srcdir)/util/config_file.h $(srcdir)/util/net_help.h \
- $(srcdir)/sldns/sbuffer.h
+ $(srcdir)/sldns/sbuffer.h $(srcdir)/services/mesh.h $(srcdir)/util/data/msgparse.h \
+ $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h \
+ $(srcdir)/util/module.h $(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h \
+ $(srcdir)/services/modstack.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h
localzone.lo localzone.o: $(srcdir)/services/localzone.c config.h $(srcdir)/services/localzone.h \
$(srcdir)/util/rbtree.h $(srcdir)/util/locks.h $(srcdir)/util/log.h $(srcdir)/util/storage/dnstree.h \
$(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/data/msgreply.h \
$(srcdir)/util/data/msgencode.h $(srcdir)/util/timehist.h $(srcdir)/util/fptr_wlist.h $(srcdir)/util/tube.h \
$(srcdir)/util/alloc.h $(srcdir)/util/config_file.h $(srcdir)/util/edns.h $(srcdir)/sldns/sbuffer.h \
$(srcdir)/sldns/wire2str.h $(srcdir)/services/localzone.h $(srcdir)/util/storage/dnstree.h \
- $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h
+ $(srcdir)/services/view.h $(srcdir)/util/data/dname.h $(srcdir)/respip/respip.h \
+ $(srcdir)/services/listen_dnsport.h
modstack.lo modstack.o: $(srcdir)/services/modstack.c config.h $(srcdir)/services/modstack.h \
$(srcdir)/util/module.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/log.h \
$(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
$(srcdir)/util/data/msgreply.h $(srcdir)/util/data/packed_rrset.h $(srcdir)/util/data/msgparse.h \
$(srcdir)/sldns/pkthdr.h $(srcdir)/sldns/rrdef.h $(srcdir)/util/tube.h $(srcdir)/services/mesh.h \
$(srcdir)/services/modstack.h $(srcdir)/sldns/sbuffer.h $(srcdir)/sldns/str2wire.h $(srcdir)/dnstap/dnstap.h \
- \
+ $(srcdir)/services/listen_dnsport.h \
net_help.lo net_help.o: $(srcdir)/util/net_help.c config.h $(srcdir)/util/net_help.h $(srcdir)/util/log.h \
$(srcdir)/util/data/dname.h $(srcdir)/util/storage/lruhash.h $(srcdir)/util/locks.h $(srcdir)/util/module.h \
+11 January 2018: Wouter
+ - Initial commit for out-of-order processing for TCP and TLS.
+
9 January 2018: Wouter
- Log query name for looping module errors.
#include "util/config_file.h"
#include "util/net_help.h"
#include "sldns/sbuffer.h"
+#include "services/mesh.h"
+#include "util/fptr_wlist.h"
#ifdef HAVE_NETDB_H
#include <netdb.h>
ports->ftype == listen_type_tcp_dnscrypt)
cp = comm_point_create_tcp(base, ports->fd,
tcp_accept_count, tcp_idle_timeout,
- tcp_conn_limit, bufsize, cb, cb_arg);
+ tcp_conn_limit, bufsize, front->udp_buff,
+ cb, cb_arg);
else if(ports->ftype == listen_type_ssl) {
cp = comm_point_create_tcp(base, ports->fd,
tcp_accept_count, tcp_idle_timeout,
- tcp_conn_limit, bufsize, cb, cb_arg);
+ tcp_conn_limit, bufsize, front->udp_buff,
+ cb, cb_arg);
cp->ssl = sslctx;
} else if(ports->ftype == listen_type_udpancil ||
ports->ftype == listen_type_udpancil_dnscrypt)
}
}
+struct tcp_req_info*
+tcp_req_info_create(struct sldns_buffer* spoolbuf)
+{
+ struct tcp_req_info* req = (struct tcp_req_info*)malloc(sizeof(*req));
+ if(!req) {
+ log_err("malloc failure for new stream outoforder processing structure");
+ return NULL;
+ }
+ memset(req, 0, sizeof(*req));
+ req->spool_buffer = spoolbuf;
+ return req;
+}
+
+void
+tcp_req_info_delete(struct tcp_req_info* req)
+{
+ if(!req) return;
+ tcp_req_info_clear(req);
+ /* cp is pointer back to commpoint that owns this struct and
+ * called delete on us */
+ /* spool_buffer is shared udp buffer, not deleted here */
+ free(req);
+}
+
+void tcp_req_info_clear(struct tcp_req_info* req)
+{
+ struct tcp_req_open_item* open, *nopen;
+ struct tcp_req_done_item* item, *nitem;
+ if(!req) return;
+
+ /* free outstanding request mesh reply entries */
+ open = req->open_req_list;
+ while(open) {
+ nopen = open->next;
+ mesh_state_remove_reply(open->mesh, open->mesh_state, req->cp);
+ free(open);
+ open = nopen;
+ }
+ req->open_req_list = NULL;
+ req->num_open_req = 0;
+
+ /* free pending writable result packets */
+ item = req->done_req_list;
+ while(item) {
+ nitem = item->next;
+ free(item->buf);
+ free(item);
+ item = nitem;
+ }
+ req->done_req_list = NULL;
+ req->num_done_req = 0;
+ req->read_is_closed = 0;
+}
+
+void
+tcp_req_info_remove_mesh_state(struct tcp_req_info* req, struct mesh_state* m)
+{
+ struct tcp_req_open_item* open, *prev = NULL;
+ if(!req || !m) return;
+ open = req->open_req_list;
+ while(open) {
+ if(open->mesh_state == m) {
+ struct tcp_req_open_item* next;
+ if(prev) prev->next = open->next;
+ else req->open_req_list = open->next;
+ /* caller has to manage the mesh state reply entry */
+ next = open->next;
+ free(open);
+ req->num_open_req --;
+
+ /* prev = prev; */
+ open = next;
+ continue;
+ }
+ prev = open;
+ open = open->next;
+ }
+}
+
+/** number of simultaneous requests a client can have */
+#define TCP_MAX_REQ_SIMULTANEOUS 10
+
+/** setup listening for read or write */
+static void
+tcp_req_info_setup_listen(struct tcp_req_info* req)
+{
+ int wr = 0;
+ int rd = 0;
+
+ if(req->cp->tcp_byte_count != 0) {
+ /* cannot change, halfway through */
+ return;
+ }
+
+ if(!req->cp->tcp_is_reading)
+ wr = 1;
+ if(req->num_open_req + req->num_done_req < TCP_MAX_REQ_SIMULTANEOUS &&
+ !req->read_is_closed)
+ rd = 1;
+
+ if(wr) {
+ req->cp->tcp_is_reading = 0;
+ comm_point_start_listening(req->cp, -1,
+ req->cp->tcp_timeout_msec);
+ } else if(rd) {
+ req->cp->tcp_is_reading = 1;
+ comm_point_start_listening(req->cp, -1,
+ req->cp->tcp_timeout_msec);
+ } else {
+ comm_point_start_listening(req->cp, -1,
+ req->cp->tcp_timeout_msec);
+ comm_point_listen_for_rw(req->cp, 0, 0);
+ }
+}
+
+/** remove first item from list of pending results */
+static struct tcp_req_done_item*
+tcp_req_info_pop_done(struct tcp_req_info* req)
+{
+ struct tcp_req_done_item* item;
+ log_assert(req->num_done_req > 0 && req->done_req_list);
+ item = req->done_req_list;
+ req->done_req_list = req->done_req_list->next;
+ req->num_done_req --;
+ return item;
+}
+
+/** Send given buffer and setup to write */
+static void
+tcp_req_info_start_write_buf(struct tcp_req_info* req, uint8_t* buf,
+ size_t len)
+{
+ sldns_buffer_clear(req->cp->buffer);
+ sldns_buffer_write(req->cp->buffer, buf, len);
+ sldns_buffer_flip(req->cp->buffer);
+
+ req->cp->tcp_is_reading = 0; /* we are now writing */
+}
+
+/** pick up the next result and start writing it to the channel */
+static void
+tcp_req_pickup_next_result(struct tcp_req_info* req)
+{
+ if(req->num_done_req > 0) {
+ /* unlist the done item from the list of pending results */
+ struct tcp_req_done_item* item = tcp_req_info_pop_done(req);
+ tcp_req_info_start_write_buf(req, item->buf, item->len);
+ free(item->buf);
+ free(item);
+ }
+}
+
+/** the read channel has closed */
+int
+tcp_req_info_handle_read_close(struct tcp_req_info* req)
+{
+ verbose(VERB_ALGO, "tcp channel read side closed %d", req->cp->fd);
+ if(req->num_done_req != 0) {
+ tcp_req_pickup_next_result(req);
+ tcp_req_info_setup_listen(req);
+ return 1;
+ }
+ if(req->num_open_req == 0 && req->num_done_req == 0)
+ return 0;
+ req->read_is_closed = 1;
+ tcp_req_info_setup_listen(req);
+ return 1;
+}
+
+void
+tcp_req_info_handle_writedone(struct tcp_req_info* req)
+{
+ /* back to reading state, we finished this write event */
+ sldns_buffer_clear(req->cp->buffer);
+ req->cp->tcp_is_reading = 1;
+ /* see if another result needs writing */
+ tcp_req_pickup_next_result(req);
+
+ /* see if there is more to write, if not stop_listening for writing */
+ /* see if new requests are allowed, if so, start_listening
+ * for reading */
+ tcp_req_info_setup_listen(req);
+}
+
+void
+tcp_req_info_handle_readdone(struct tcp_req_info* req)
+{
+ struct comm_point* c = req->cp;
+
+ /* we want to read up several requests, unless there are
+ * pending answers */
+
+ req->is_drop = 0;
+ req->is_reply = 0;
+ req->in_worker_handle = 1;
+ /* handle the current request, */
+ fptr_ok(fptr_whitelist_comm_point(c->callback));
+ if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
+ req->in_worker_handle = 0;
+ /* there is an answer, put it up. It is already in the
+ * c->buffer, just send it. */
+ /* since we were just reading a query, the channel is
+ * clear to write to */
+ send_it:
+ c->tcp_is_reading = 0;
+ comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+ return;
+ }
+ req->in_worker_handle = 0;
+ /* it should be waiting in the mesh for recursion.
+ * If mesh failed(formerr) and called commpoint_drop_reply. Then the
+ * mesh state has been cleared. */
+ if(req->is_drop) {
+ return;
+ }
+ /* If mesh failed(mallocfail) and called commpoint_send_reply with
+ * something like servfail then we pick up that reply below. */
+ if(req->is_reply) {
+ goto send_it;
+ }
+
+ sldns_buffer_clear(req->cp->buffer);
+ /* if pending answers, pick up an answer and start sending it */
+ tcp_req_pickup_next_result(req);
+
+ /* if answers pending, start sending answers */
+ /* read more requests if we can have more requests */
+ tcp_req_info_setup_listen(req);
+}
+
+int
+tcp_req_info_add_meshstate(struct tcp_req_info* req,
+ struct mesh_area* mesh, struct mesh_state* m)
+{
+ struct tcp_req_open_item* item;
+ log_assert(req && mesh && m);
+ item = (struct tcp_req_open_item*)malloc(sizeof(*item));
+ if(!item) return 0;
+ item->next = req->open_req_list;
+ item->mesh = mesh;
+ item->mesh_state = m;
+ req->open_req_list = item;
+ req->num_open_req++;
+ return 1;
+}
+
+/** Add a result to the result list. At the end. */
+static int
+tcp_req_info_add_result(struct tcp_req_info* req, uint8_t* buf, size_t len)
+{
+ struct tcp_req_done_item* last = NULL;
+ struct tcp_req_done_item* item;
+ /* find last element */
+ last = req->done_req_list;
+ while(last && last->next)
+ last = last->next;
+
+ /* create new element */
+ item = (struct tcp_req_done_item*)malloc(sizeof(*item));
+ if(!item) {
+ return 0;
+ }
+ item->next = NULL;
+ item->len = len;
+ item->buf = memdup(buf, len);
+ if(!item->buf) {
+ free(item);
+ return 0;
+ }
+
+ /* link in */
+ if(last) last->next = item;
+ else req->done_req_list = item;
+ req->num_done_req++;
+ return 1;
+}
+
+void
+tcp_req_info_send_reply(struct tcp_req_info* req)
+{
+ if(req->in_worker_handle) {
+ /* It is in the right buffer to answer straight away */
+ req->is_reply = 1;
+ return;
+ }
+ /* now that the query has been handled, that mesh_reply entry
+ * should be removed, from the tcp_req_info list */
+ /* TODO: find it, need mstate ptr */
+ /* see if we can send it straight away (we are not doing
+ * anything else). If so, copy to buffer and start */
+ if(req->cp->tcp_is_reading && req->cp->tcp_byte_count == 0) {
+ /* buffer is free, and was ready to read new query into,
+ * but we are now going to use it to send this answer */
+ tcp_req_info_start_write_buf(req,
+ sldns_buffer_begin(req->spool_buffer),
+ sldns_buffer_limit(req->spool_buffer));
+ /* switch to listen to write events */
+ comm_point_stop_listening(req->cp);
+ comm_point_start_listening(req->cp, -1,
+ req->cp->tcp_timeout_msec);
+ return;
+ }
+ /* queue up the answer behind the others already pending */
+ if(!tcp_req_info_add_result(req, sldns_buffer_begin(req->spool_buffer),
+ sldns_buffer_limit(req->spool_buffer))) {
+ log_err("malloc failure adding reply to stream result list");
+ }
+}
*/
int create_local_accept_sock(const char* path, int* noproto, int use_systemd);
+/**
+ * TCP request info. List of requests outstanding on the channel, that
+ * are asked for but not yet answered back.
+ */
+struct tcp_req_info {
+ /** the TCP comm point for this. Its buffer is used for read/write */
+ struct comm_point* cp;
+ /** the buffer to use to spool reply from mesh into,
+ * it can then be copied to the result list and written.
+ * it is a pointer to the shared udp buffer. */
+ struct sldns_buffer* spool_buffer;
+ /** are we in worker_handle function call (for recursion callback)*/
+ int in_worker_handle;
+ /** is the comm point dropped (by worker handle).
+ * That means we have to disconnect the channel. */
+ int is_drop;
+ /** is the comm point set to send_reply (by mesh new client in worker
+ * handle), if so answer is available in c.buffer */
+ int is_reply;
+ /** read channel has closed, just write pending results */
+ int read_is_closed;
+ /** number of outstanding requests */
+ int num_open_req;
+ /** list of outstanding requests */
+ struct tcp_req_open_item* open_req_list;
+ /** number of pending writeable results */
+ int num_done_req;
+ /** list of pending writable result packets, malloced one at a time */
+ struct tcp_req_done_item* done_req_list;
+};
+
+/**
+ * List of open items in TCP channel
+ */
+struct tcp_req_open_item {
+ /** next in list */
+ struct tcp_req_open_item* next;
+ /** the mesh area of the mesh_state */
+ struct mesh_area* mesh;
+ /** the mesh state */
+ struct mesh_state* mesh_state;
+};
+
+/**
+ * List of done items in TCP channel
+ */
+struct tcp_req_done_item {
+ /** next in list */
+ struct tcp_req_done_item* next;
+ /** the buffer with packet contents */
+ uint8_t* buf;
+ /** length of the buffer */
+ size_t len;
+};
+
+/**
+ * Create tcp request info structure that keeps track of open
+ * requests on the TCP channel that are resolved at the same time,
+ * and the pending results that have to get written back to that client.
+ * @param spoolbuf: shared buffer
+ * @return new structure or NULL on alloc failure.
+ */
+struct tcp_req_info* tcp_req_info_create(struct sldns_buffer* spoolbuf);
+
+/**
+ * Delete tcp request structure. Called by owning commpoint.
+ * Removes mesh entry references and stored results from the lists.
+ * @param req: the tcp request info
+ */
+void tcp_req_info_delete(struct tcp_req_info* req);
+
+/**
+ * Clear tcp request structure. Removes list entries, sets it up ready
+ * for the next connection.
+ * @param req: tcp request info structure.
+ */
+void tcp_req_info_clear(struct tcp_req_info* req);
+
+/**
+ * Remove mesh state entry from list in tcp_req_info.
+ * caller has to manage the mesh state reply entry in the mesh state.
+ * @param req: the tcp req info that has the entry removed from the list.
+ * @param m: the state removed from the list.
+ */
+void tcp_req_info_remove_mesh_state(struct tcp_req_info* req,
+ struct mesh_state* m);
+
+/**
+ * Handle write done of the last result packet
+ */
+void tcp_req_info_handle_writedone(struct tcp_req_info* req);
+
+/**
+ * Handle read done of a new request from the client
+ */
+void tcp_req_info_handle_readdone(struct tcp_req_info* req);
+
+/**
+ * Add mesh state to the tcp req list of open requests.
+ * So the comm_reply can be removed off the mesh reply list when
+ * the tcp channel has to be closed (for other reasons then that that
+ * request was done, eg. channel closed by client or some format error.
+ * @param req: tcp req info structure. It keeps track of the simultaneous
+ * requests and results on a tcp (or TLS) channel.
+ * @param mesh: mesh area for the state.
+ * @param m: mesh state to add.
+ * @return 0 on failure (malloc failure).
+ */
+int tcp_req_info_add_meshstate(struct tcp_req_info* req,
+ struct mesh_area* mesh, struct mesh_state* m);
+
+/**
+ * Send reply on tcp simultaneous answer channel. May queue it up.
+ * @param req: request info structure.
+ */
+void tcp_req_info_send_reply(struct tcp_req_info* req);
+
+/** the read channel has closed
+ * @param req: request. remaining queries are looked up and answered.
+ * @return zero if nothing to do, just close the tcp.
+ */
+int tcp_req_info_handle_read_close(struct tcp_req_info* req);
+
#endif /* LISTEN_DNSPORT_H */
#include "services/localzone.h"
#include "util/data/dname.h"
#include "respip/respip.h"
+#include "services/listen_dnsport.h"
/** subtract timers and the values do not overflow or become negative */
static void
/* add reply to s */
if(!mesh_state_add_reply(s, edns, rep, qid, qflags, qinfo)) {
log_err("mesh_new_client: out of memory; SERVFAIL");
+ servfail_mem:
if(!inplace_cb_reply_servfail_call(mesh->env, qinfo, &s->s,
NULL, LDNS_RCODE_SERVFAIL, edns, rep, mesh->env->scratch))
edns->opt_list = NULL;
mesh_state_delete(&s->s);
return;
}
+ if(rep->c->tcp_req_info) {
+ if(!tcp_req_info_add_meshstate(rep->c->tcp_req_info, mesh, s)) {
+ log_err("mesh_new_client: out of memory add tcpreqinfo");
+ goto servfail_mem;
+ }
+ }
/* update statistics */
if(was_detached) {
log_assert(mesh->num_detached_states > 0);
* @param rcode: if not 0, error code.
* @param rep: reply to send (or NULL if rcode is set).
* @param r: reply entry
+ * @param r_buffer: buffer to use for reply entry.
* @param prev: previous reply, already has its answer encoded in buffer.
+ * @param prev_buffer: buffer for previous reply.
*/
static void
mesh_send_reply(struct mesh_state* m, int rcode, struct reply_info* rep,
- struct mesh_reply* r, struct mesh_reply* prev)
+ struct mesh_reply* r, struct sldns_buffer* r_buffer,
+ struct mesh_reply* prev, struct sldns_buffer* prev_buffer)
{
struct timeval end_time;
struct timeval duration;
* and still reuse the previous answer if they are the same, but that
* would be complicated and error prone for the relatively minor case.
* So we err on the side of safety. */
- if(prev && prev->qflags == r->qflags &&
+ if(prev && prev_buffer && prev->qflags == r->qflags &&
!prev->local_alias && !r->local_alias &&
prev->edns.edns_present == r->edns.edns_present &&
prev->edns.bits == r->edns.bits &&
edns_opt_list_compare(prev->edns.opt_list, r->edns.opt_list)
== 0) {
/* if the previous reply is identical to this one, fix ID */
- if(prev->query_reply.c->buffer != r->query_reply.c->buffer)
- sldns_buffer_copy(r->query_reply.c->buffer,
- prev->query_reply.c->buffer);
- sldns_buffer_write_at(r->query_reply.c->buffer, 0,
- &r->qid, sizeof(uint16_t));
- sldns_buffer_write_at(r->query_reply.c->buffer, 12,
- r->qname, m->s.qinfo.qname_len);
+ if(prev_buffer != r_buffer)
+ sldns_buffer_copy(r_buffer, prev_buffer);
+ sldns_buffer_write_at(r_buffer, 0, &r->qid, sizeof(uint16_t));
+ sldns_buffer_write_at(r_buffer, 12, r->qname,
+ m->s.qinfo.qname_len);
comm_point_send_reply(&r->query_reply);
} else if(rcode) {
m->s.qinfo.qname = r->qname;
&r->edns, NULL, m->s.region))
r->edns.opt_list = NULL;
}
- error_encode(r->query_reply.c->buffer, rcode, &m->s.qinfo,
- r->qid, r->qflags, &r->edns);
+ error_encode(r_buffer, rcode, &m->s.qinfo, r->qid,
+ r->qflags, &r->edns);
comm_point_send_reply(&r->query_reply);
} else {
size_t udp_size = r->edns.udp_size;
m->s.env->cfg, r->query_reply.c,
m->s.region) ||
!reply_info_answer_encode(&m->s.qinfo, rep, r->qid,
- r->qflags, r->query_reply.c->buffer, 0, 1,
- m->s.env->scratch, udp_size, &r->edns,
- (int)(r->edns.bits & EDNS_DO), secure))
+ r->qflags, r_buffer, 0, 1, m->s.env->scratch,
+ udp_size, &r->edns, (int)(r->edns.bits & EDNS_DO),
+ secure))
{
if(!inplace_cb_reply_servfail_call(m->s.env, &m->s.qinfo, &m->s,
rep, LDNS_RCODE_SERVFAIL, &r->edns, NULL, m->s.region))
r->edns.opt_list = NULL;
- error_encode(r->query_reply.c->buffer,
- LDNS_RCODE_SERVFAIL, &m->s.qinfo, r->qid,
- r->qflags, &r->edns);
+ error_encode(r_buffer, LDNS_RCODE_SERVFAIL,
+ &m->s.qinfo, r->qid, r->qflags, &r->edns);
}
r->edns = edns_bak;
comm_point_send_reply(&r->query_reply);
timeval_add(&m->s.env->mesh->replies_sum_wait, &duration);
timehist_insert(m->s.env->mesh->histogram, &duration);
if(m->s.env->cfg->stat_extended) {
- uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(r->
- query_reply.c->buffer, 2));
+ uint16_t rc = FLAGS_GET_RCODE(sldns_buffer_read_u16_at(
+ r_buffer, 2));
if(secure) m->s.env->mesh->ans_secure++;
m->s.env->mesh->ans_rcode[ rc ] ++;
- if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r->
- query_reply.c->buffer)) == 0)
+ if(rc == 0 && LDNS_ANCOUNT(sldns_buffer_begin(r_buffer)) == 0)
m->s.env->mesh->ans_nodata++;
}
/* Log reply sent */
if(m->s.env->cfg->log_replies) {
log_reply_info(0, &m->s.qinfo, &r->query_reply.addr,
- r->query_reply.addrlen, duration, 0,
- r->query_reply.c->buffer);
+ r->query_reply.addrlen, duration, 0, r_buffer);
}
}
{
struct mesh_reply* r;
struct mesh_reply* prev = NULL;
+ struct sldns_buffer* prev_buffer = NULL;
struct mesh_cb* c;
struct reply_info* rep = (mstate->s.return_msg?
mstate->s.return_msg->rep:NULL);
if(mstate->s.is_drop)
comm_point_drop_reply(&r->query_reply);
else {
+ struct sldns_buffer* r_buffer = r->query_reply.c->buffer;
+ if(r->query_reply.c->tcp_req_info)
+ r_buffer = r->query_reply.c->tcp_req_info->spool_buffer;
mesh_send_reply(mstate, mstate->s.return_rcode, rep,
- r, prev);
+ r, r_buffer, prev, prev_buffer);
+ if(r->query_reply.c->tcp_req_info)
+ tcp_req_info_remove_mesh_state(r->query_reply.c->tcp_req_info, mstate);
prev = r;
+ prev_buffer = r_buffer;
}
}
mstate->replies_sent = 1;
m->prev->next = m->next;
else *fp = m->next;
}
+
+void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
+ struct comm_point* cp)
+{
+ struct mesh_reply* n, *prev = NULL;
+ n = m->reply_list;
+ if(!n) return; /* nothing to remove, also no accounting needed */
+ while(n) {
+ if(n->query_reply.c == cp) {
+ /* unlink it */
+ if(prev) prev->next = n->next;
+ else m->reply_list = n->next;
+ /* delete it, but allocated in m region */
+ mesh->num_reply_addrs--;
+
+ /* prev = prev; */
+ n = n->next;
+ continue;
+ }
+ prev = n;
+ n = n->next;
+ }
+ /* it was not detached (because it had a reply list), could be now */
+ if(!m->reply_list && !m->cb_list
+ && m->super_set.count == 0) {
+ mesh->num_detached_states++;
+ }
+ /* if not replies any more in mstate, it is no longer a reply_state */
+ if(!m->reply_list && !m->cb_list) {
+ log_assert(mesh->num_reply_states > 0);
+ mesh->num_reply_states--;
+ }
+}
void mesh_list_remove(struct mesh_state* m, struct mesh_state** fp,
struct mesh_state** lp);
+/**
+ * Remove mesh reply entry from the reply entry list. Searches for
+ * the repinfo pointer.
+ * @param mesh: to update the counters.
+ * @param m: the mesh state.
+ * @param cp: the commpoint to remove from the list.
+ */
+void mesh_state_remove_reply(struct mesh_area* mesh, struct mesh_state* m,
+ struct comm_point* cp);
+
#endif /* SERVICES_MESH_H */
return 0;
}
+int tcp_req_info_add_meshstate(struct tcp_req_info* ATTR_UNUSED(req),
+ struct mesh_area* ATTR_UNUSED(mesh), struct mesh_state* ATTR_UNUSED(m))
+{
+ log_assert(0);
+ return 0;
+}
+
+void
+tcp_req_info_remove_mesh_state(struct tcp_req_info* ATTR_UNUSED(req),
+ struct mesh_state* ATTR_UNUSED(m))
+{
+ log_assert(0);
+}
+
/*********** End of Dummy routines ***********/
. ../common.sh
kill_pid $FWD_PID
kill_pid $UNBOUND_PID
+cat unbound.log
# check what sort of netcat we have
if nc -h 2>&1 | grep "q secs"; then
- ncopt="-q 3 -w 2"
+ ncopt="-q 3 -i 2"
else
- ncopt="-w 2"
+ ncopt="-i 2"
fi
PRE="../.."
#include "sldns/str2wire.h"
#include "dnstap/dnstap.h"
#include "dnscrypt/dnscrypt.h"
+#include "services/listen_dnsport.h"
#ifdef HAVE_OPENSSL_SSL_H
#include <openssl/ssl.h>
#endif
/** create a tcp handler with a parent */
static struct comm_point* comm_point_create_tcp_handler(
struct comm_base *base, struct comm_point* parent, size_t bufsize,
- comm_point_callback_type* callback, void* callback_arg);
+ struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+ void* callback_arg);
/* -------- End of local definitions -------- */
c->tcp_byte_count = 0;
/* switch from listening(write) to listening(read) */
comm_point_stop_listening(c);
- comm_point_start_listening(c, -1, -1);
+ if(c->tcp_req_info) {
+ tcp_req_info_handle_writedone(c->tcp_req_info);
+ } else {
+ comm_point_start_listening(c, -1, -1);
+ }
}
/** do the callback when reading is done */
c->tcp_byte_count = 0;
if(c->type == comm_tcp)
comm_point_stop_listening(c);
- fptr_ok(fptr_whitelist_comm_point(c->callback));
- if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
- comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+ if(c->tcp_req_info) {
+ tcp_req_info_handle_readdone(c->tcp_req_info);
+ } else {
+ fptr_ok(fptr_whitelist_comm_point(c->callback));
+ if( (*c->callback)(c, c->cb_arg, NETEVENT_NOERROR, &c->repinfo) ) {
+ comm_point_start_listening(c, -1, c->tcp_timeout_msec);
+ }
}
}
c->tcp_byte_count))) <= 0) {
int want = SSL_get_error(c->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) {
+ if(c->tcp_req_info)
+ return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; /* shutdown, closed */
} else if(want == SSL_ERROR_WANT_READ) {
ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
if(r <= 0) {
int want = SSL_get_error(c->ssl, r);
if(want == SSL_ERROR_ZERO_RETURN) {
+ if(c->tcp_req_info)
+ return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0; /* shutdown, closed */
} else if(want == SSL_ERROR_WANT_READ) {
ub_winsock_tcp_wouldblock(c->ev->ev, UB_EV_READ);
/* read length bytes */
r = recv(fd,(void*)sldns_buffer_at(c->buffer,c->tcp_byte_count),
sizeof(uint16_t)-c->tcp_byte_count, 0);
- if(r == 0)
+ if(r == 0) {
+ if(c->tcp_req_info)
+ return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0;
- else if(r == -1) {
+ } else if(r == -1) {
#ifndef USE_WINSOCK
if(errno == EINTR || errno == EAGAIN)
return 1;
r = recv(fd, (void*)sldns_buffer_current(c->buffer),
sldns_buffer_remaining(c->buffer), 0);
if(r == 0) {
+ if(c->tcp_req_info)
+ return tcp_req_info_handle_read_close(c->tcp_req_info);
return 0;
} else if(r == -1) {
#ifndef USE_WINSOCK
static struct comm_point*
comm_point_create_tcp_handler(struct comm_base *base,
struct comm_point* parent, size_t bufsize,
- comm_point_callback_type* callback, void* callback_arg)
+ struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+ void* callback_arg)
{
struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point));
c->repinfo.c = c;
c->callback = callback;
c->cb_arg = callback_arg;
+ if(spoolbuf) {
+ c->tcp_req_info = tcp_req_info_create(spoolbuf);
+ if(!c->tcp_req_info) {
+ log_err("could not create tcp commpoint");
+ sldns_buffer_free(c->buffer);
+ free(c->timeout);
+ free(c->ev);
+ free(c);
+ return NULL;
+ }
+ c->tcp_req_info->cp = c;
+ c->tcp_do_close = 1;
+ c->tcp_do_toggle_rw = 0;
+ }
/* add to parent free list */
c->tcp_free = parent->tcp_free;
parent->tcp_free = c;
{
log_err("could not basetset tcphdl event");
parent->tcp_free = c->tcp_free;
+ tcp_req_info_delete(c->tcp_req_info);
+ sldns_buffer_free(c->buffer);
+ free(c->timeout);
free(c->ev);
free(c);
return NULL;
struct comm_point*
comm_point_create_tcp(struct comm_base *base, int fd, int num,
int idle_timeout, struct tcl_list* tcp_conn_limit, size_t bufsize,
- comm_point_callback_type* callback, void* callback_arg)
+ struct sldns_buffer* spoolbuf, comm_point_callback_type* callback,
+ void* callback_arg)
{
struct comm_point* c = (struct comm_point*)calloc(1,
sizeof(struct comm_point));
/* now prealloc the tcp handlers */
for(i=0; i<num; i++) {
c->tcp_handlers[i] = comm_point_create_tcp_handler(base,
- c, bufsize, callback, callback_arg);
+ c, bufsize, spoolbuf, callback, callback_arg);
if(!c->tcp_handlers[i]) {
comm_point_delete(c);
return NULL;
}
}
tcl_close_connection(c->tcl_addr);
+ if(c->tcp_req_info)
+ tcp_req_info_clear(c->tcp_req_info);
/* close fd after removing from event lists, or epoll.. is messed up */
if(c->fd != -1 && !c->do_not_close) {
if(c->type == comm_tcp || c->type == comm_http) {
sldns_buffer_free(c->dnscrypt_buffer);
}
#endif
+ if(c->tcp_req_info) {
+ tcp_req_info_delete(c->tcp_req_info);
+ }
}
ub_event_free(c->ev->ev);
free(c->ev);
dt_msg_send_client_response(repinfo->c->tcp_parent->dtenv,
&repinfo->addr, repinfo->c->type, repinfo->c->buffer);
#endif
- comm_point_start_listening(repinfo->c, -1,
- repinfo->c->tcp_timeout_msec);
+ if(repinfo->c->tcp_req_info) {
+ tcp_req_info_send_reply(repinfo->c->tcp_req_info);
+ } else {
+ comm_point_start_listening(repinfo->c, -1,
+ repinfo->c->tcp_timeout_msec);
+ }
}
}
log_assert(repinfo->c->type != comm_tcp_accept);
if(repinfo->c->type == comm_udp)
return;
+ if(repinfo->c->tcp_req_info)
+ repinfo->c->tcp_req_info->is_drop = 1;
reclaim_tcp_handler(repinfo->c);
}
/** the entry for the connection. */
struct tcl_addr* tcl_addr;
+ /** the structure to keep track of open requests on this channel */
+ struct tcp_req_info* tcp_req_info;
+
#ifdef USE_MSG_FASTOPEN
/** used to track if the sendto() call should be done when using TFO. */
int tcp_do_fastopen;
* @param idle_timeout: TCP idle timeout in ms.
* @param tcp_conn_limit: TCP connection limit info.
* @param bufsize: size of buffer to create for handlers.
+ * @param spoolbuf: shared spool buffer for tcp_req_info structures.
+ * or NULL to not create those structures in the tcp handlers.
* @param callback: callback function pointer for TCP handlers.
* @param callback_arg: will be passed to your callback function.
* @return: returns the TCP listener commpoint. You can find the
*/
struct comm_point* comm_point_create_tcp(struct comm_base* base,
int fd, int num, int idle_timeout, struct tcl_list* tcp_conn_limit,
- size_t bufsize, comm_point_callback_type* callback, void* callback_arg);
+ size_t bufsize, struct sldns_buffer* spoolbuf,
+ comm_point_callback_type* callback, void* callback_arg);
/**
* Create an outgoing TCP commpoint. No file descriptor is opened, left at -1.