/** use the buffer to setup writing the query */
static void
-outnet_tcp_take_query_setup(int s, struct pending_tcp* pend, uint8_t* pkt,
- size_t pkt_len)
+outnet_tcp_take_query_setup(int s, struct pending_tcp* pend,
+ struct waiting_tcp* w)
{
- pend->id = LDNS_ID_WIRE(pkt);
- sldns_buffer_clear(pend->c->buffer);
- sldns_buffer_write(pend->c->buffer, pkt, pkt_len);
- sldns_buffer_flip(pend->c->buffer);
+ struct timeval tv;
+ pend->id = LDNS_ID_WIRE(w->pkt);
+ pend->c->tcp_write_pkt = w->pkt;
+ pend->c->tcp_write_pkt_len = w->pkt_len;
pend->c->tcp_write_and_read = 1;
pend->c->tcp_write_byte_count = 0;
comm_point_start_listening(pend->c, s, -1);
+	/* set timer on the waiting_tcp entry, this is the write timeout
+	 * for the written packet. The timer on pend->c is the timer
+	 * for when there is no written packet and we have read timeouts */
+#ifndef S_SPLINT_S
+ tv.tv_sec = w->timeout/1000;
+ tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+ /* if the waiting_tcp was previously waiting for a buffer in the
+ * outside_network.tcpwaitlist, then the timer is reset now that
+ * we start writing it */
+ comm_timer_set(w->timer, &tv);
}
/** use next free buffer to service a tcp query */
pend->c->repinfo.addrlen = w->addrlen;
memcpy(&pend->c->repinfo.addr, &w->addr, w->addrlen);
pend->reuse.pending = pend;
- outnet_tcp_take_query_setup(s, pend, w->pkt, w->pkt_len);
+ outnet_tcp_take_query_setup(s, pend, w);
return 1;
}
decommission_pending_tcp(struct outside_network* outnet,
struct pending_tcp* pend)
{
- verbose(5, "decommision_pending_tcp");
+ verbose(5, "decommission_pending_tcp");
if(pend->c->ssl) {
#ifdef HAVE_SSL
SSL_shutdown(pend->c->ssl);
return 1;
}
+/** perform failure callbacks for waiting queries in reuse write list.
+ * Walks pend->reuse.write_wait_first..last and invokes each waiting
+ * query's callback with a NULL comm_point and the given error code.
+ * @param pend: the pending_tcp whose write-wait list is failed.
+ * @param err: NETEVENT_* error code passed to every callback. */
+static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
+{
+	struct waiting_tcp* w;
+	w = pend->reuse.write_wait_first;
+	while(w) {
+		/* copy cb and cb_arg to locals before invoking */
+		comm_point_callback_type* cb = w->cb;
+		void* cb_arg = w->cb_arg;
+		fptr_ok(fptr_whitelist_pending_tcp(cb));
+		(void)(*cb)(NULL, cb_arg, err, NULL);
+		/* NOTE(review): w->write_wait_next is read after the
+		 * callback has run; confirm callbacks cannot free or
+		 * unlink w here, else this is a use-after-free */
+		w = w->write_wait_next;
+	}
+}
+
+/** perform failure callbacks for waiting queries in reuse read rbtree.
+ * Iterates tree_by_id in-order and invokes each waiting query's
+ * callback with a NULL comm_point and the given error code.
+ * @param pend: the pending_tcp whose id-tree entries are failed.
+ * @param err: NETEVENT_* error code passed to every callback. */
+static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
+{
+	rbnode_type* node;
+	/* empty tree: nothing to fail */
+	if(pend->reuse.tree_by_id.root == NULL ||
+		pend->reuse.tree_by_id.root == RBTREE_NULL)
+		return;
+	node = rbtree_first(&pend->reuse.tree_by_id);
+	while(node && node != RBTREE_NULL) {
+		struct waiting_tcp* w = (struct waiting_tcp*)node->key;
+		/* copy cb and cb_arg to locals before invoking */
+		comm_point_callback_type* cb = w->cb;
+		void* cb_arg = w->cb_arg;
+		fptr_ok(fptr_whitelist_pending_tcp(cb));
+		(void)(*cb)(NULL, cb_arg, err, NULL);
+		/* NOTE(review): rbtree_next(node) is called after the
+		 * callback has run; confirm callbacks cannot delete the
+		 * node from the tree here, else iteration is unsafe */
+		node = rbtree_next(node);
+	}
+}
+
+/** perform failure callbacks for current written query in reuse struct.
+ * If pend->query is set, invokes its callback with a NULL comm_point
+ * and the given error code; does nothing when there is no current query.
+ * @param pend: the pending_tcp whose current query is failed.
+ * @param err: NETEVENT_* error code passed to the callback. */
+static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
+{
+	struct waiting_tcp* w = pend->query;
+	if(w) {
+		comm_point_callback_type* cb = w->cb;
+		void* cb_arg = w->cb_arg;
+		fptr_ok(fptr_whitelist_pending_tcp(cb));
+		(void)(*cb)(NULL, cb_arg, err, NULL);
+	}
+}
+
+/** delete element from tree by id.
+ * Removes w from reuse->tree_by_id and clears w->id_node.key, which
+ * marks the entry as no longer present in the tree.
+ * @param reuse: reuse_tcp holding the tree.
+ * @param w: entry to remove; must currently be in the tree
+ *	(asserted via its non-NULL id_node.key). */
+static void
+reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+	log_assert(w->id_node.key != NULL);
+	rbtree_delete(&reuse->tree_by_id, w);
+	w->id_node.key = NULL;
+}
+
+/** pop the first element from the writewait list.
+ * Unlinks write_wait_first from the doubly linked write-wait list,
+ * updating first/last pointers and neighbor links, and clears the
+ * entry's write_wait_queued flag.
+ * @param pend: pending_tcp holding the write-wait list.
+ * @return the unlinked entry, or NULL if the list is empty. */
+static struct waiting_tcp* reuse_write_wait_pop(struct pending_tcp* pend)
+{
+	struct waiting_tcp* w = pend->reuse.write_wait_first;
+	if(!w)
+		return NULL;
+	/* unlink from predecessor, or move the list head forward;
+	 * since w is the first element, write_wait_prev is expected
+	 * NULL, but both cases are handled */
+	if(w->write_wait_prev)
+		w->write_wait_prev->write_wait_next = w->write_wait_next;
+	else	pend->reuse.write_wait_first = w->write_wait_next;
+	/* unlink from successor, or move the list tail backward */
+	if(w->write_wait_next)
+		w->write_wait_next->write_wait_prev = w->write_wait_prev;
+	else	pend->reuse.write_wait_last = w->write_wait_prev;
+	w->write_wait_queued = 0;
+	return w;
+}
+
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
struct pending_tcp* pend = (struct pending_tcp*)arg;
struct outside_network* outnet = pend->reuse.outnet;
verbose(VERB_ALGO, "outnettcp cb");
- if(error != NETEVENT_NOERROR) {
+ if(error == NETEVENT_TIMEOUT) {
+ if(pend->c->tcp_write_and_read)
+ verbose(VERB_QUERY, "outnettcp got tcp timeout "
+ "for read, ignored because write underway");
+ else verbose(VERB_QUERY, "outnettcp got tcp timeout %s",
+ (pend->reuse.tree_by_id.count?"for reading pkt":
+ "for keepalive for reuse"));
+ /* if we are writing, ignore readtimer, wait for write timer
+ * or write is done */
+ if(pend->c->tcp_write_and_read)
+ return 0;
+ /* must be timeout for reading or keepalive reuse,
+ * close it. */
+ reuse_tcp_remove_tree_list(outnet, &pend->reuse);
+ } else if(error == NETEVENT_PKT_WRITTEN) {
+ /* the packet we want to write has been written. */
+ log_assert(c == pend->c);
+ log_assert(pend->query->pkt == pend->c->tcp_write_pkt);
+ log_assert(pend->query->pkt_len == pend->c->tcp_write_pkt_len);
+ pend->c->tcp_write_pkt = NULL;
+ pend->c->tcp_write_pkt_len = 0;
+	/* the pend->query entry is already in tree_by_id */
+ pend->query = NULL;
+ /* setup to write next packet or setup read timeout */
+ if(pend->reuse.write_wait_first) {
+ pend->query = reuse_write_wait_pop(pend);
+ outnet_tcp_take_query_setup(pend->c->fd, pend,
+ pend->query);
+ } else {
+ reuse_tcp_setup_readtimeout(pend);
+ }
+ } else if(error != NETEVENT_NOERROR) {
verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
/* pass error below and exit */
} else {
}
}
if(pend->query) {
+ reuse_tree_by_id_delete(&pend->reuse, pend->query);
fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
(void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
waiting_tcp_delete(pend->query);
return 0;
}
verbose(5, "outnet_tcp_cb reuse after cb: decommission it");
- /* no queries on it, no space to keep it. Close it */
+	/* no queries on it, no space to keep it, or a timeout or
+	 * close due to error occurred. Close it */
+ reuse_cb_readwait_for_failure(pend, (error==NETEVENT_TIMEOUT?
+ NETEVENT_TIMEOUT:NETEVENT_CLOSED));
+ reuse_cb_writewait_for_failure(pend, (error==NETEVENT_TIMEOUT?
+ NETEVENT_TIMEOUT:NETEVENT_CLOSED));
decommission_pending_tcp(outnet, pend);
use_free_buffer(outnet);
return 0;
return pend;
}
-/** perform failure callbacks for waiting queries in reuse write list */
-static void reuse_cb_writewait_for_failure(struct pending_tcp* pend, int err)
-{
- struct waiting_tcp* w;
- w = pend->reuse.write_wait_first;
- while(w) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
- w = w->write_wait_next;
- }
-}
-
-/** perform failure callbacks for waiting queries in reuse read rbtree */
-static void reuse_cb_readwait_for_failure(struct pending_tcp* pend, int err)
-{
- rbnode_type* node;
- if(pend->reuse.tree_by_id.root == NULL ||
- pend->reuse.tree_by_id.root == RBTREE_NULL)
- return;
- node = rbtree_first(&pend->reuse.tree_by_id);
- while(node && node != RBTREE_NULL) {
- struct waiting_tcp* w = (struct waiting_tcp*)node->key;
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
- node = rbtree_next(node);
- }
-}
-
-/** perform failure callbacks for current written query in reuse struct */
-static void reuse_cb_curquery_for_failure(struct pending_tcp* pend, int err)
-{
- struct waiting_tcp* w = pend->query;
- if(w) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
- }
-}
-
void
outnet_tcptimer(void* arg)
{
return NULL;
}
+/** insert element in tree by id.
+ * Sets w->id_node.key to w (marking it as in-tree) and inserts the
+ * node into reuse->tree_by_id.
+ * @param reuse: reuse_tcp holding the tree.
+ * @param w: entry to insert; must not already be in the tree
+ *	(asserted via its NULL id_node.key). */
+static void
+reuse_tree_by_id_insert(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+	log_assert(w->id_node.key == NULL);
+	w->id_node.key = w;
+	rbtree_insert(&reuse->tree_by_id, &w->id_node);
+}
+
/** find element in tree by id */
static struct waiting_tcp*
reuse_tcp_by_id_find(struct reuse_tcp* reuse, uint16_t id)
struct pending_tcp* pend = sq->outnet->tcp_free;
struct reuse_tcp* reuse = NULL;
struct waiting_tcp* w;
- struct timeval tv;
uint16_t id;
verbose(5, "pending_tcp_query");
w->cb_arg = callback_arg;
w->ssl_upstream = sq->ssl_upstream;
w->tls_auth_name = sq->tls_auth_name;
-#ifndef S_SPLINT_S
- tv.tv_sec = timeout/1000;
- tv.tv_usec = (timeout%1000)*1000;
-#endif
- comm_timer_set(w->timer, &tv);
+ w->timeout = timeout;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
/* if cannot write now, store query and put it
* in the waiting list for this stream TODO */
/* and insert in tree_by_id */
+ reuse_tree_by_id_insert(&pend->reuse, w);
/* and also delete it from waitlst if query gone,
* eg. sq is deleted TODO */
/* and also servfail all waiting queries if
* stream closes TODO */
/* reuse existing fd, write query and continue */
w->next_waiting = (void*)pend;
- pend->query = w;
- outnet_tcp_take_query_setup(pend->c->fd, pend,
- w->pkt, w->pkt_len);
+ if(pend->query == NULL) {
+ /* write straight away */
+ pend->query = w;
+ outnet_tcp_take_query_setup(pend->c->fd, pend,
+ w);
+ }
} else {
verbose(5, "pending_tcp_query: new fd, connect");
/* create new fd and connect to addr, setup to
comm_tcp, sq->zone, sq->zonelen, packet);
#endif
} else {
+ struct timeval tv;
/* queue up */
verbose(5, "pending_tcp_query: queue to wait");
w->next_waiting = NULL;
else sq->outnet->tcp_wait_first = w;
sq->outnet->tcp_wait_last = w;
w->on_tcp_waiting_list = 1;
+#ifndef S_SPLINT_S
+ tv.tv_sec = timeout/1000;
+ tv.tv_usec = (timeout%1000)*1000;
+#endif
+ comm_timer_set(w->timer, &tv);
}
return w;
}