}
}
+/** add waiting_tcp element to the outnet tcp waiting list */
+static void
+outnet_add_tcp_waiting(struct outside_network* outnet, struct waiting_tcp* w)
+{
+ struct timeval tv;
+ w->next_waiting = NULL;
+ if(outnet->tcp_wait_last)
+ outnet->tcp_wait_last->next_waiting = w;
+ else outnet->tcp_wait_first = w;
+ outnet->tcp_wait_last = w;
+ w->on_tcp_waiting_list = 1;
+#ifndef S_SPLINT_S
+ tv.tv_sec = w->timeout/1000;
+ tv.tv_usec = (w->timeout%1000)*1000;
+#endif
+ comm_timer_set(w->timer, &tv);
+}
+
+/** delete element from tree by id */
+static void
+reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
+{
+ log_assert(w->id_node.key != NULL);
+ rbtree_delete(&reuse->tree_by_id, w);
+ w->id_node.key = NULL;
+}
+
+/** more writewait list to go for another connection. */
+static void
+reuse_move_writewait_away(struct outside_network* outnet,
+ struct pending_tcp* pend)
+{
+ /* the writewait list has not been written yet, so if the
+ * stream was closed, they have not actually been failed, only
+ * the queries written. Other queries can get written to another
+ * stream. For upstreams that do not support multiple queries
+ * and answers, the stream can get closed, and then the queries
+ * can get written on a new socket */
+ struct waiting_tcp* w;
+ if(pend->query && pend->query->error_count == 0 &&
+ pend->c->tcp_write_pkt == pend->query->pkt &&
+ pend->c->tcp_write_pkt_len == pend->query->pkt_len) {
+ /* since the current query is not written, it can also
+ * move to a free buffer */
+ verbose(5, "reuse_move_writewait_away current %d done",
+ (int)pend->c->tcp_write_byte_count);
+ pend->c->tcp_write_pkt = NULL;
+ pend->c->tcp_write_pkt_len = 0;
+ pend->c->tcp_write_and_read = 0;
+ pend->c->tcp_more_read_again = 0;
+ pend->c->tcp_more_write_again = 0;
+ pend->c->tcp_is_reading = 1;
+ w = pend->query;
+ pend->query = NULL;
+ /* increase error count, so that if the next socket fails too
+ * the server selection is run again with this query failed
+ * and it can select a different server (if possible), or
+ * fail the query */
+ w->error_count ++;
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ outnet_add_tcp_waiting(outnet, w);
+ }
+ while((w = reuse_write_wait_pop(&pend->reuse)) != NULL) {
+ verbose(5, "reuse_move_writewait_away item");
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ outnet_add_tcp_waiting(outnet, w);
+ }
+}
+
/** remove reused element from tree and lru list */
static void
reuse_tcp_remove_tree_list(struct outside_network* outnet,
reuse_del_readwait(&store);
}
-/** delete element from tree by id */
-static void
-reuse_tree_by_id_delete(struct reuse_tcp* reuse, struct waiting_tcp* w)
-{
- log_assert(w->id_node.key != NULL);
- rbtree_delete(&reuse->tree_by_id, w);
- w->id_node.key = NULL;
-}
-
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
return 0;
} else if(error != NETEVENT_NOERROR) {
verbose(VERB_QUERY, "outnettcp got tcp error %d", error);
+ reuse_move_writewait_away(outnet, pend);
/* pass error below and exit */
} else {
/* check ID */
w->write_wait_prev = NULL;
w->write_wait_next = NULL;
w->write_wait_queued = 0;
+ w->error_count = 0;
if(pend) {
/* we have a buffer available right now */
if(reuse) {
/* queue up */
/* waiting for a buffer on the outside network buffer wait
* list */
- struct timeval tv;
verbose(5, "pending_tcp_query: queue to wait");
- w->next_waiting = NULL;
- if(sq->outnet->tcp_wait_last)
- sq->outnet->tcp_wait_last->next_waiting = w;
- else sq->outnet->tcp_wait_first = w;
- sq->outnet->tcp_wait_last = w;
- w->on_tcp_waiting_list = 1;
-#ifndef S_SPLINT_S
- tv.tv_sec = timeout/1000;
- tv.tv_usec = (timeout%1000)*1000;
-#endif
- comm_timer_set(w->timer, &tv);
+ outnet_add_tcp_waiting(sq->outnet, w);
}
#ifdef USE_DNSTAP
if(sq->outnet->dtenv &&