return sockaddr_cmp(&q1->addr, q1->addrlen, &q2->addr, q2->addrlen);
}
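+/** compare function of reuse_tcp rbtree: orders by in-use state, then by
+ * destination address and port, then by TLS or plain TCP */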
+int
+reuse_cmp(const void* key1, const void* key2)
+{
+ struct reuse_tcp* r1 = (struct reuse_tcp*)key1;
+ struct reuse_tcp* r2 = (struct reuse_tcp*)key2;
+ int r;
+ /* make sure the entries are in use (have a waiting_tcp entry) */
+ if(!r1->pending->query && !r2->pending->query)
+ return 0;
+ if(r1->pending->query && !r2->pending->query)
+ return 1;
+ if(!r1->pending->query && r2->pending->query)
+ return -1;
+
+ /* compare address and port */
+ r = sockaddr_cmp(&r1->pending->query->addr, r1->pending->query->addrlen,
+ &r2->pending->query->addr, r2->pending->query->addrlen);
+ if(r != 0)
+ return r;
+
+ /* compare if SSL-enabled */
+ if(r1->pending->c->ssl && !r2->pending->c->ssl)
+ return 1;
+ if(!r1->pending->c->ssl && r2->pending->c->ssl)
+ return -1;
+ return 0;
+}
+
/** delete waiting_tcp entry. Does not unlink from waiting list.
* @param w: to delete.
*/
return 1;
}
+/** use the buffer to set up writing of the query on the pending tcp stream */
+static void
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
+ size_t pkt_len)
+{
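+ /* remember the 16 bit query ID from the wire packet to match the reply */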
+ pend->id = LDNS_ID_WIRE(pkt);
+ sldns_buffer_clear(pend->c->buffer);
+ sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
+ sldns_buffer_flip(pend->c->buffer);
+ pend->c->tcp_is_reading = 0;
+ pend->c->tcp_byte_count = 0;
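+ /* give fd s to the comm point and wait for writability to send the
+ * query; -1 means no timeout is set here */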
+ comm_point_start_listening(pend->c, s, -1);
+}
+
/** use next free buffer to service a tcp query */
static int
outnet_tcp_take_into_use(struct waiting_tcp* w, uint8_t* pkt, size_t pkt_len)
}
w->pkt = NULL;
w->next_waiting = (void*)pend;
- pend->id = LDNS_ID_WIRE(pkt);
w->outnet->num_tcp_outgoing++;
w->outnet->tcp_free = pend->next_free;
pend->next_free = NULL;
pend->query = w;
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
- sldns_buffer_clear(pend->c->buffer);
- sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
- sldns_buffer_flip(pend->c->buffer);
- pend->c->tcp_is_reading = 0;
- pend->c->tcp_byte_count = 0;
- comm_point_start_listening(pend->c, s, -1);
+ outnet_tcp_take_query_setup(s, pend, pkt, pkt_len);
return 1;
}
}
}
+/** remove reused element from tree and lru list */
+static void
+reuse_tcp_remove_tree_list(struct outside_network* outnet,
+ struct reuse_tcp* reuse)
+{
+ if(reuse->node.key) {
+ /* delete it from reuse tree */
+ (void)rbtree_delete(&outnet->tcp_reuse, &reuse->node);
+ reuse->node.key = NULL;
+ }
+ /* delete from reuse list */
+ if(reuse->pending) {
+ if(reuse->prev) {
+ /* assert that members of the lru list are waiting
+ * and thus have a pending pointer to the struct */
+ log_assert(reuse->prev->pending);
+ reuse->prev->next = reuse->next;
+ } else {
+ log_assert(!reuse->next || reuse->next->pending);
+ outnet->tcp_reuse_first =
+ (reuse->next?reuse->next->pending:NULL);
+ }
+ if(reuse->next) {
+ /* assert that members of the lru list are waiting
+ * and thus have a pending pointer to the struct */
+ log_assert(reuse->next->pending);
+ reuse->next->prev = reuse->prev;
+ } else {
+ log_assert(!reuse->prev || reuse->prev->pending);
+ outnet->tcp_reuse_last =
+ (reuse->prev?reuse->prev->pending:NULL);
+ }
+ reuse->pending = NULL;
+ }
+}
+
/** decommission a tcp buffer, closes commpoint and frees waiting_tcp entry */
static void
decommission_pending_tcp(struct outside_network* outnet,
comm_point_close(pend->c);
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
+ if(pend->reuse.pending) {
+ /* needs unlink from the reuse tree to get deleted */
+ reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+ }
waiting_tcp_delete(pend->query);
pend->query = NULL;
- use_free_buffer(outnet);
+}
+
+/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
+static int
+reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
+{
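+ /* the node key points at the reuse entry itself; reuse_cmp follows
+ * ->pending to compare the address, port and TLS of the stream */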
+ pend_tcp->reuse.node.key = &pend_tcp->reuse;
+ pend_tcp->reuse.pending = pend_tcp;
+ if(!rbtree_insert(&outnet->tcp_reuse, &pend_tcp->reuse.node)) {
+ /* this is a duplicate connection, close this one */
+ pend_tcp->reuse.node.key = NULL;
+ pend_tcp->reuse.pending = NULL;
+ return 0;
+ }
+ /* insert into LRU, first is newest */
+ pend_tcp->reuse.prev = NULL;
+ if(outnet->tcp_reuse_first) {
+ pend_tcp->reuse.next = &outnet->tcp_reuse_first->reuse;
+ outnet->tcp_reuse_first->reuse.prev = &pend_tcp->reuse;
+ } else {
+ pend_tcp->reuse.next = NULL;
+ outnet->tcp_reuse_last = pend_tcp;
+ }
+ outnet->tcp_reuse_first = pend_tcp;
+ return 1;
}
int
error = NETEVENT_CLOSED;
}
}
+ if(error == NETEVENT_NOERROR) {
+ /* add to reuse tree so it can be reused, if not a failure.
+ * This is possible if the state machine wants to make a tcp
+ * query again to the same destination. */
+ (void)reuse_tcp_insert(outnet, pend);
+ }
fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+ /* if reused, it should not be decommissioned, TODO */
+ /* or if another query wants to write, write that and keep reading
+ * for replies to the other outstanding queries on the stream. TODO */
+ /* TODO also write multiple queries over the stream, even if no
+ * replies have returned yet */
decommission_pending_tcp(outnet, pend);
+ use_free_buffer(outnet);
return 0;
}
outside_network_delete(outnet);
return NULL;
}
+ rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+ outnet->tcp_reuse_max = num_tcp;
/* allocate commpoints */
for(k=0; k<num_ports; k++) {
if(outnet->tcp_conns[i]) {
comm_point_delete(outnet->tcp_conns[i]->c);
waiting_tcp_delete(outnet->tcp_conns[i]->query);
+ /* TODO: loop over the tcp write wait list and
+ * call waiting_tcp_delete on them */
free(outnet->tcp_conns[i]);
}
free(outnet->tcp_conns);
p = np;
}
}
+ /* the entries were allocated in the struct pending_tcp deleted above */
+ rbtree_init(&outnet->tcp_reuse, reuse_cmp);
+ outnet->tcp_reuse_first = NULL;
+ outnet->tcp_reuse_last = NULL;
if(outnet->udp_wait_first) {
struct pending* p = outnet->udp_wait_first, *np;
while(p) {
{
struct waiting_tcp* w = (struct waiting_tcp*)arg;
struct outside_network* outnet = w->outnet;
- comm_point_callback_type* cb;
- void* cb_arg;
+ int do_callback = 1;
if(w->pkt) {
/* it is on the waiting list */
waiting_list_remove(outnet, w);
} else {
/* it was in use */
struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
+ /* see if it needs unlink from reuse tree */
+ if(pend->reuse.pending) {
+ reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+ do_callback = 0;
+ }
+ /* do failure callbacks for all the queries on the
+ * wait-for-write list and in the id-tree TODO */
if(pend->c->ssl) {
#ifdef HAVE_SSL
SSL_shutdown(pend->c->ssl);
pend->next_free = outnet->tcp_free;
outnet->tcp_free = pend;
}
- cb = w->cb;
- cb_arg = w->cb_arg;
- waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+ if(do_callback) {
+ comm_point_callback_type* cb = w->cb;
+ void* cb_arg = w->cb_arg;
+ waiting_tcp_delete(w);
+ fptr_ok(fptr_whitelist_pending_tcp(cb));
+ (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
+ } else {
+ waiting_tcp_delete(w);
+ }
use_free_buffer(outnet);
}
+/** close the oldest reuse_tcp connection to make a fd and struct pending_tcp
+ * available for a new stream connection */
+static void
+reuse_tcp_close_oldest(struct outside_network* outnet)
+{
+ struct pending_tcp* pend;
+ if(!outnet->tcp_reuse_last) return;
+ pend = outnet->tcp_reuse_last;
+
+ /* snip off of LRU */
+ log_assert(pend->reuse.next == NULL);
+ if(pend->reuse.prev) {
+ log_assert(pend->reuse.prev->pending);
+ outnet->tcp_reuse_last = pend->reuse.prev->pending;
+ pend->reuse.prev->next = NULL;
+ } else {
+ outnet->tcp_reuse_last = NULL;
+ outnet->tcp_reuse_first = NULL;
+ }
+
+ /* TODO should only close unused connections in the tree, not ones
+ * that are in use; for that we also need a tree to find in-use
+ * streams that can take multiple queries */
+ /* free up */
+ decommission_pending_tcp(outnet, pend);
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
+{
+ struct waiting_tcp key_w;
+ struct pending_tcp key_p;
+ struct comm_point c;
+ memset(&key_w, 0, sizeof(key_w));
+ memset(&key_p, 0, sizeof(key_p));
+ memset(&c, 0, sizeof(c));
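+ /* build a lookup key on the stack: a fake pending_tcp with a
+ * waiting_tcp and comm_point, enough for reuse_cmp to compare
+ * the address, port and TLS of the target */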
+ key_p.query = &key_w;
+ key_p.c = &c;
+ key_p.reuse.pending = &key_p;
+ key_p.reuse.node.key = &key_p.reuse;
+ if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
+ key_p.c->ssl = (void*)1;
+ if(sq->addrlen > sizeof(key_w.addr))
+ return NULL;
+ memmove(&key_w.addr, &sq->addr, sq->addrlen);
+ key_w.addrlen = sq->addrlen;
+
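+ /* the rbnode is the first member of struct reuse_tcp, so the found
+ * node can be cast to the reuse entry */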
+ return (struct reuse_tcp*)rbtree_search(&outnet->tcp_reuse,
+ &key_p.reuse.node);
+}
+
struct waiting_tcp*
pending_tcp_query(struct serviced_query* sq, sldns_buffer* packet,
int timeout, comm_point_callback_type* callback, void* callback_arg)
{
struct pending_tcp* pend = sq->outnet->tcp_free;
+ struct reuse_tcp* reuse = NULL;
struct waiting_tcp* w;
struct timeval tv;
uint16_t id;
+
+ /* find out if a reused stream to the target exists */
+ /* if so, take it into use */
+ reuse = reuse_tcp_find(sq->outnet, sq);
+ if(reuse) {
+ log_assert(reuse->pending);
+ pend = reuse->pending;
+ }
+
+ /* if !pend but we have reuse streams, close a reuse stream
+ * to be able to open a new one to this target; there is no use
+ * keeping a file descriptor around for reuse while another query
+ * needs that buffer and file descriptor now. */
+ if(!pend) {
+ reuse_tcp_close_oldest(sq->outnet);
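+ /* if a stream was closed, its pending_tcp is now on the free list */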
+ pend = sq->outnet->tcp_free;
+ }
+
/* if no buffer is free allocate space to store query */
+ /* TODO: if reuse cannot write right now, store query even though
+ * pend is nonNULL */
w = (struct waiting_tcp*)malloc(sizeof(struct waiting_tcp)
+ (pend?0:sldns_buffer_limit(packet)));
if(!w) {
comm_timer_set(w->timer, &tv);
if(pend) {
/* we have a buffer available right now */
- if(!outnet_tcp_take_into_use(w, sldns_buffer_begin(packet),
- sldns_buffer_limit(packet))) {
- waiting_tcp_delete(w);
- return NULL;
+ if(reuse) {
+ /* if cannot write now, store query and put it
+ * in the waiting list for this stream TODO */
+ /* and also delete it from the wait list if the query is gone,
+ * e.g. sq is deleted TODO */
+ /* and also servfail all waiting queries if
+ * stream closes TODO */
+ /* reuse existing fd, write query and continue */
+ outnet_tcp_take_query_setup(pend->c->fd, pend,
+ sldns_buffer_begin(packet),
+ sldns_buffer_limit(packet));
+ } else {
+ /* create new fd and connect to addr, setup to
+ * write query */
+ if(!outnet_tcp_take_into_use(w,
+ sldns_buffer_begin(packet),
+ sldns_buffer_limit(packet))) {
+ waiting_tcp_delete(w);
+ return NULL;
+ }
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&
}
}
+/** reuse tcp stream, remove serviced query from stream,
+ * return true if the stream is kept, false if it is to be closed */
+static int
+reuse_tcp_remove_serviced_keep(struct waiting_tcp* w,
+ struct serviced_query* sq)
+{
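+ /* w->pkt is NULL here, so next_waiting points at the pending_tcp */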
+ struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
+ /* see if it can be entered in the reuse tree;
+ * for that the FD has to be open (not -1) */
+ if(pend_tcp->c->fd == -1) {
+ return 0;
+ }
+ /* if in tree and used by other queries */
+ if(pend_tcp->reuse.node.key) {
+ /* note less use of stream */
+ /* remove id value used by this svcd. */
+ /* do not reset the keepalive timer, for that
+ * we'd need traffic, and this is where the servicedq is
+ * removed due to state machine internal reasons,
+ * eg. iterator no longer interested in this query */
+ return 1;
+ }
+ /* if still open and want to keep it open */
+ if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
+ sq->outnet->tcp_reuse_max) {
+ /* note less use of stream */
+ /* remove id value used by this svcd. */
+ /* set a keepalive timer on it */
+ if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
+ return 0;
+ }
+ return 1;
+ }
+ return 0;
+}
+
/** cleanup serviced query entry */
static void
serviced_delete(struct serviced_query* sq)
} else {
struct waiting_tcp* p = (struct waiting_tcp*)
sq->pending;
+ /* TODO: if on the stream-write-waiting list then
+ * remove it from that list and call waiting_tcp_delete */
if(p->pkt == NULL) {
- decommission_pending_tcp(sq->outnet,
- (struct pending_tcp*)p->next_waiting);
+ if(!reuse_tcp_remove_serviced_keep(p, sq)) {
+ decommission_pending_tcp(sq->outnet,
+ (struct pending_tcp*)p->next_waiting);
+ use_free_buffer(sq->outnet);
+ }
} else {
waiting_list_remove(sq->outnet, p);
waiting_tcp_delete(p);
struct pending_tcp;
struct waiting_tcp;
struct waiting_udp;
+struct reuse_tcp;
struct infra_cache;
struct port_comm;
struct port_if;
size_t num_tcp;
/** number of tcp communication points in use. */
size_t num_tcp_outgoing;
+ /**
+ * Tree of still-open tcp connections that are waiting for reuse.
+ * A connection can be closed and reopened to get a new tcp connection,
+ * or reused for another query to the same destination, with a timeout
+ * to close it. Entries are of type struct reuse_tcp.
+ * The entries are both active and empty connections.
+ */
+ rbtree_type tcp_reuse;
+ /** max number of tcp_reuse entries we want to keep open */
+ size_t tcp_reuse_max;
+ /** first and last (oldest) in the lru list of reuse connections.
+ * The oldest can be closed to get a new free pending_tcp if needed.
+ * The list contains empty connections that wait for a timeout or
+ * a new query that can use the existing connection. */
+ struct pending_tcp* tcp_reuse_first, *tcp_reuse_last;
/** list of tcp comm points that are free for use */
struct pending_tcp* tcp_free;
/** list of tcp queries waiting for a buffer */
struct comm_point* cp;
};
+/**
+ * Reuse TCP connection, still open can be used again.
+ */
+struct reuse_tcp {
+ /** rbtree node with links in tcp_reuse tree. key is NULL when not
+ * in tree. Both active and empty connections are in the tree. */
+ rbnode_type node;
+ /** lru chain, so that the oldest can be removed to get a new
+ * connection when all are in (re)use. oldest is last in list.
+ * The lru only contains empty connections waiting for reuse,
+ * the ones with active queries are not on the list because they
+ * do not need to be closed to make space for others. They already
+ * service a query, so closing one for another query does not help
+ * service a larger number of queries.
+ * TODO
+ */
+ struct reuse_tcp* next, *prev;
+ /** the connection to reuse; the fd is open (not -1).
+ * The addr and port determine where the connection is going,
+ * and are the key to the rbtree. The SSL ptr determines if it is
+ * a TLS connection or a plain TCP connection there, and TLS
+ * or not is also part of the key to the rbtree.
+ * There is a timeout and read event on the fd, to close it.
+ */
+ struct pending_tcp* pending;
+ /** rbtree with other queries waiting on the connection, by ID number,
+ * of type struct waiting_tcp. It is used to look up the structure
+ * for the callback when an answer is received, and also to see if ID
+ * numbers are unused and can be used for a new query. TODO */
+ rbtree_type tree_by_id;
+ /** list of queries waiting to be written on the channel,
+ * if NULL no queries are waiting to be written and the pending->query
+ * is the query currently serviced. The first is the next in line.
+ * Once written, a query moves to the tree_by_id. TODO */
+ struct waiting_tcp* write_wait_first, *write_wait_last;
+};
+
/**
* A query that has an answer pending for it.
*/
struct comm_point* c;
/** the query being serviced, NULL if the pending_tcp is unused. */
struct waiting_tcp* query;
+ /** the pre-allocated reuse tcp structure. if ->pending is nonNULL
+ * it is in use and the connection is waiting for reuse.
+ * It is here for memory pre-allocation, and used to make this
+ * pending_tcp wait for reuse. */
+ struct reuse_tcp reuse;
};
/**
* if pkt==0, this points to the pending_tcp structure.
*/
struct waiting_tcp* next_waiting;
+ /** next and prev in query waiting list for stream connection */
+ struct waiting_tcp* write_wait_prev, *write_wait_next;
+ /** true if the waiting_tcp structure is on the write_wait queue */
+ int write_wait_queued;
+ /** entry in reuse.tree_by_id; if key is NULL it is not in the tree,
+ * otherwise this struct is the key, sorted by the ID from pending_tcp->id. */
+ rbnode_type id_node;
/** timeout event; timer keeps running whether the query is
* waiting for a buffer or the tcp reply is pending */
struct comm_timer* timer;
/** compare function of serviced query rbtree */
int serviced_cmp(const void* key1, const void* key2);
+/** compare function of reuse_tcp rbtree */
+int reuse_cmp(const void* key1, const void* key2);
+
#endif /* OUTSIDE_NETWORK_H */