return 1;
}
+/** log reuse item addr and ptr with message */
+static void
+log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
+{
+ uint16_t port;
+ char addrbuf[128];
+ if(verbosity < v) return;
+ addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
+ port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
+ verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
+ (unsigned long long)reuse, reuse->pending->c->fd);
+}
+
+/** pop the first element from the writewait list */
+static struct waiting_tcp* reuse_write_wait_pop(struct reuse_tcp* reuse)
+{
+ struct waiting_tcp* w = reuse->write_wait_first;
+ if(!w)
+ return NULL;
+ log_assert(w->write_wait_queued);
+ if(w->write_wait_prev)
+ w->write_wait_prev->write_wait_next = w->write_wait_next;
+ else reuse->write_wait_first = w->write_wait_next;
+ if(w->write_wait_next)
+ w->write_wait_next->write_wait_prev = w->write_wait_prev;
+ else reuse->write_wait_last = w->write_wait_prev;
+ w->write_wait_queued = 0;
+ return w;
+}
+
+/** push the element after the last on the writewait list */
+static void reuse_write_wait_push_back(struct reuse_tcp* reuse,
+ struct waiting_tcp* w)
+{
+ if(!w) return;
+ log_assert(!w->write_wait_queued);
+ if(reuse->write_wait_last) {
+ reuse->write_wait_last->write_wait_next = w;
+ w->write_wait_prev = reuse->write_wait_last;
+ } else {
+ reuse->write_wait_first = w;
+ }
+ reuse->write_wait_last = w;
+ w->write_wait_queued = 1;
+}
+
+/** insert element in tree by id */
+static void
+reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+ log_assert(w->id_node.key == NULL);
+ w->id_node.key = w;
+ rbtree_insert(&reuse->tree_by_id, &w->id_node);
+}
+
+/** find reuse tcp stream to destination for query, or NULL if none */
+static struct reuse_tcp*
+reuse_tcp_find(struct outside_network* outnet, struct sockaddr_storage* addr,
+ socklen_t addrlen, int use_ssl)
+{
+ struct waiting_tcp key_w;
+ struct pending_tcp key_p;
+ struct comm_point c;
+ rbnode_type* result = NULL, *prev;
+ verbose(5, "reuse_tcp_find");
+ memset(&key_w, 0, sizeof(key_w));
+ memset(&key_p, 0, sizeof(key_p));
+ memset(&c, 0, sizeof(c));
+ key_p.query = &key_w;
+ key_p.c = &c;
+ key_p.reuse.pending = &key_p;
+ key_p.reuse.node.key = &key_p.reuse;
+ if(use_ssl) /* something nonNULL for comparisons in tree */
+ key_p.c->ssl = (void*)1;
+ if(addrlen > sizeof(key_p.reuse.addr))
+ return NULL;
+ memmove(&key_p.reuse.addr, addr, addrlen);
+ key_p.reuse.addrlen = addrlen;
+
+ verbose(5, "reuse_tcp_find: num reuse streams %u",
+ (unsigned)outnet->tcp_reuse.count);
+ if(outnet->tcp_reuse.root == NULL ||
+ outnet->tcp_reuse.root == RBTREE_NULL)
+ return NULL;
+ if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
+ &result)) {
+ /* exact match */
+ /* but the key is on stack, and ptr is compared, impossible */
+ log_assert(&key_p.reuse != (struct reuse_tcp*)result);
+ log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
+ }
+ /* not found, return null */
+ if(!result || result == RBTREE_NULL)
+ return NULL;
+ verbose(5, "reuse_tcp_find check inexact match");
+ /* inexact match, find one of possibly several connections to the
+ * same destination address, with the correct port, ssl, and
+ * also less than max number of open queries, or else, fail to open
+ * a new one */
+ /* rewind to start of sequence of same address,port,ssl */
+ prev = rbtree_previous(result);
+ while(prev && prev != RBTREE_NULL &&
+ reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
+ result = prev;
+ prev = rbtree_previous(result);
+ }
+
+ /* loop to find first one that has correct characteristics */
+ while(result && result != RBTREE_NULL &&
+ reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
+ if(((struct reuse_tcp*)result)->tree_by_id.count <
+ MAX_REUSE_TCP_QUERIES) {
+ /* same address, port, ssl-yes-or-no, and has
+ * space for another query */
+ return (struct reuse_tcp*)result;
+ }
+ result = rbtree_next(result);
+ }
+ return NULL;
+}
+
/** use the buffer to setup writing the query */
static void
outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
pend->reuse.pending = pend;
+ reuse_tree_by_id_insert(&pend->reuse, w);
outnet_tcp_take_query_setup(s, pend, w);
return 1;
}
struct waiting_tcp* w;
while(outnet->tcp_free && outnet->tcp_wait_first
&& !outnet->want_to_quit) {
+ struct reuse_tcp* reuse = NULL;
w = outnet->tcp_wait_first;
outnet->tcp_wait_first = w->next_waiting;
if(outnet->tcp_wait_last == w)
outnet->tcp_wait_last = NULL;
w->on_tcp_waiting_list = 0;
- if(!outnet_tcp_take_into_use(w)) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
+ reuse = reuse_tcp_find(outnet, &w->addr, w->addrlen,
+ w->ssl_upstream);
+ if(reuse) {
+ log_reuse_tcp(5, "use free buffer for waiting tcp: "
+ "found reuse", reuse);
+ if(reuse->pending->query) {
+ /* on the write wait list */
+ comm_timer_disable(w->timer);
+ w->next_waiting = (void*)reuse->pending;
+ reuse_tree_by_id_insert(reuse, w);
+ reuse_write_wait_push_back(reuse, w);
+ }
+ } else {
+ if(!outnet_tcp_take_into_use(w)) {
+ comm_point_callback_type* cb = w->cb;
+ void* cb_arg = w->cb_arg;
+ waiting_tcp_delete(w);
+ fptr_ok(fptr_whitelist_pending_tcp(cb));
+ (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
+ }
}
}
}
reuse_del_writewait(pend);
}
-/** log reuse item addr and ptr with message */
-static void
-log_reuse_tcp(enum verbosity_value v, const char* msg, struct reuse_tcp* reuse)
-{
- uint16_t port;
- char addrbuf[128];
- if(verbosity < v) return;
- addr_to_str(&reuse->addr, reuse->addrlen, addrbuf, sizeof(addrbuf));
- port = ntohs(((struct sockaddr_in*)&reuse->addr)->sin_port);
- verbose(v, "%s %s#%u 0x%llx fd %d", msg, addrbuf, (unsigned)port,
- (unsigned long long)reuse, reuse->pending->c->fd);
-}
-
/** insert into reuse tcp tree and LRU, false on failure (duplicate) */
static int
reuse_tcp_insert(struct outside_network* outnet, struct pending_tcp* pend_tcp)
w->id_node.key = NULL;
}
-/** pop the first element from the writewait list */
-static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
-{
- struct waiting_tcp* w = pend->reuse.write_wait_first;
- if(!w)
- return NULL;
- if(w->write_wait_prev)
- w->write_wait_prev->write_wait_next = w->write_wait_next;
- else pend->reuse.write_wait_first = w->write_wait_next;
- if(w->write_wait_next)
- w->write_wait_next->write_wait_prev = w->write_wait_prev;
- else pend->reuse.write_wait_last = w->write_wait_prev;
- w->write_wait_queued = 0;
- return w;
-}
-
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
pend->query = NULL;
/* setup to write next packet or setup read timeout */
if(pend->reuse.write_wait_first) {
- pend->query = reuse_write_wait_pop(pend);
+ pend->query = reuse_write_wait_pop(&pend->reuse);
outnet_tcp_take_query_setup(pend->c->fd, pend,
pend->query);
} else {
decommission_pending_tcp(outnet, pend);
}
-/** find reuse tcp stream to destination for query, or NULL if none */
-static struct reuse_tcp*
-reuse_tcp_find(struct outside_network* outnet, struct serviced_query* sq)
-{
- struct waiting_tcp key_w;
- struct pending_tcp key_p;
- struct comm_point c;
- rbnode_type* result = NULL, *prev;
- verbose(5, "reuse_tcp_find");
- memset(&key_w, 0, sizeof(key_w));
- memset(&key_p, 0, sizeof(key_p));
- memset(&c, 0, sizeof(c));
- key_p.query = &key_w;
- key_p.c = &c;
- key_p.reuse.pending = &key_p;
- key_p.reuse.node.key = &key_p.reuse;
- if(sq->ssl_upstream) /* something nonNULL for comparisons in tree */
- key_p.c->ssl = (void*)1;
- if(sq->addrlen > sizeof(key_p.reuse.addr))
- return NULL;
- memmove(&key_p.reuse.addr, &sq->addr, sq->addrlen);
- key_p.reuse.addrlen = sq->addrlen;
-
- verbose(5, "reuse_tcp_find: num reuse streams %u",
- (unsigned)outnet->tcp_reuse.count);
- if(outnet->tcp_reuse.root == NULL ||
- outnet->tcp_reuse.root == RBTREE_NULL)
- return NULL;
- if(rbtree_find_less_equal(&outnet->tcp_reuse, &key_p.reuse.node,
- &result)) {
- /* exact match */
- /* but the key is on stack, and ptr is compared, impossible */
- log_assert(&key_p.reuse != (struct reuse_tcp*)result);
- log_assert(&key_p != ((struct reuse_tcp*)result)->pending);
- }
- /* not found, return null */
- if(!result || result == RBTREE_NULL)
- return NULL;
- verbose(5, "reuse_tcp_find check inexact match");
- /* inexact match, find one of possibly several connections to the
- * same destination address, with the correct port, ssl, and
- * also less than max number of open queries, or else, fail to open
- * a new one */
- /* rewind to start of sequence of same address,port,ssl */
- prev = rbtree_previous(result);
- while(prev && prev != RBTREE_NULL &&
- reuse_cmp_addrportssl(prev->key, &key_p.reuse) == 0) {
- result = prev;
- prev = rbtree_previous(result);
- }
-
- /* loop to find first one that has correct characteristics */
- while(result && result != RBTREE_NULL &&
- reuse_cmp_addrportssl(result->key, &key_p.reuse) == 0) {
- if(((struct reuse_tcp*)result)->tree_by_id.count <
- MAX_REUSE_TCP_QUERIES) {
- /* same address, port, ssl-yes-or-no, and has
- * space for another query */
- return (struct reuse_tcp*)result;
- }
- result = rbtree_next(result);
- }
- return NULL;
-}
-
-/** insert element in tree by id */
-static void
-reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
-{
- log_assert(w->id_node.key == NULL);
- w->id_node.key = w;
- rbtree_insert(&reuse->tree_by_id, &w->id_node);
-}
-
/** find element in tree by id */
static struct waiting_tcp*
reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
verbose(5, "pending_tcp_query");
/* find out if a reused stream to the target exists */
/* if so, take it into use */
- reuse = reuse_tcp_find(sq->outnet, sq);
+ reuse = reuse_tcp_find(sq->outnet, &sq->addr, sq->addrlen,
+ sq->ssl_upstream);
if(reuse) {
log_reuse_tcp(5, "pending_tcp_query: found reuse", reuse);
log_assert(reuse->pending);
w->ssl_upstream = sq->ssl_upstream;
w->tls_auth_name = sq->tls_auth_name;
w->timeout = timeout;
+ w->id_node.key = NULL;
+ w->write_wait_prev = NULL;
+ w->write_wait_next = NULL;
+ w->write_wait_queued = 0;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
verbose(5, "pending_tcp_query: reuse, store");
- /* if cannot write now, store query and put it
- * in the waiting list for this stream TODO */
- /* and insert in tree_by_id */
- reuse_tree_by_id_insert(&pend->reuse, w);
- /* and also delete it from waitlst if query gone,
- * eg. sq is deleted TODO */
- /* and also servfail all waiting queries if
- * stream closes TODO */
/* reuse existing fd, write query and continue */
+ /* store query in tree by id */
w->next_waiting = (void*)pend;
+ reuse_tree_by_id_insert(&pend->reuse, w);
+ /* can we write right now? */
if(pend->query == NULL) {
/* write straight away */
pend->query = w;
outnet_tcp_take_query_setup(pend->c->fd, pend,
w);
+ } else {
+ /* put it in the waiting list for
+ * this stream */
+ reuse_write_wait_push_back(&pend->reuse, w);
}
} else {
verbose(5, "pending_tcp_query: new fd, connect");
return NULL;
}
}
-#ifdef USE_DNSTAP
- if(sq->outnet->dtenv &&
- (sq->outnet->dtenv->log_resolver_query_messages ||
- sq->outnet->dtenv->log_forwarder_query_messages))
- dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
- comm_tcp, sq->zone, sq->zonelen, packet);
-#endif
} else {
struct timeval tv;
/* queue up */
#endif
comm_timer_set(w->timer, &tv);
}
+#ifdef USE_DNSTAP
+ if(sq->outnet->dtenv &&
+ (sq->outnet->dtenv->log_resolver_query_messages ||
+ sq->outnet->dtenv->log_forwarder_query_messages))
+ dt_msg_send_outside_query(sq->outnet->dtenv, &sq->addr,
+ comm_tcp, sq->zone, sq->zonelen, packet);
+#endif
return w;
}