if(!w)
return NULL;
log_assert(w->write_wait_queued);
+ log_assert(!w->write_wait_prev);
+ reuse->write_wait_first = w->write_wait_next;
+ if(w->write_wait_next)
+ w->write_wait_next->write_wait_prev = NULL;
+ else reuse->write_wait_last = NULL;
+ w->write_wait_queued = 0;
+ return w;
+}
+
+/** remove the element from the writewait list */
+static void reuse_write_wait_remove(struct reuse_tcp* reuse,
+ struct waiting_tcp* w)
+{
+ if(!w)
+ return;
+ if(!w->write_wait_queued)
+ return;
if(w->write_wait_prev)
w->write_wait_prev->write_wait_next = w->write_wait_next;
else reuse->write_wait_first = w->write_wait_next;
w->write_wait_next->write_wait_prev = w->write_wait_prev;
else reuse->write_wait_last = w->write_wait_prev;
w->write_wait_queued = 0;
- return w;
}
/** push the element after the last on the writewait list */
return 1;
}
+/** call callback on waiting_tcp, if not NULL */
+static void
+waiting_tcp_callback(struct waiting_tcp* w, struct comm_point* c, int error,
+ struct comm_reply* reply_info)
+{
+ if(w->cb) {
+ fptr_ok(fptr_whitelist_pending_tcp(w->cb));
+ (void)(*w->cb)(c, w->cb_arg, error, reply_info);
+ }
+}
+
/** see if buffers can be used to service TCP queries */
static void
use_free_buffer(struct outside_network* outnet)
}
} else {
if(!outnet_tcp_take_into_use(w)) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
+ waiting_tcp_callback(w, NULL, NETEVENT_CLOSED,
+ NULL);
waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_CLOSED, NULL);
}
}
}
struct waiting_tcp* w;
w = pend->reuse.write_wait_first;
while(w) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
+ waiting_tcp_callback(w, NULL, err, NULL);
w = w->write_wait_next;
}
}
node = rbtree_first(&pend->reuse.tree_by_id);
while(node && node != RBTREE_NULL) {
struct waiting_tcp* w = (struct waiting_tcp*)node->key;
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
+ waiting_tcp_callback(w, NULL, err, NULL);
node = rbtree_next(node);
}
}
{
struct waiting_tcp* w = pend->query;
if(w) {
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, err, NULL);
+ waiting_tcp_callback(w, NULL, err, NULL);
}
}
/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
static void
-reuse_tcp_setup_readtimeout(struct pending_tcp* pend_tcp)
+reuse_tcp_setup_timeout(struct pending_tcp* pend_tcp)
+{
+ log_reuse_tcp(5, "reuse_tcp_setup_timeout", &pend_tcp->reuse);
+ comm_point_start_listening(pend_tcp->c, -1, REUSE_TIMEOUT);
+}
+
+/** set timeout on tcp fd and setup read event to catch incoming dns msgs */
+static void
+reuse_tcp_setup_read_and_timeout(struct pending_tcp* pend_tcp)
{
log_reuse_tcp(5, "reuse_tcp_setup_readtimeout", &pend_tcp->reuse);
sldns_buffer_clear(pend_tcp->c->buffer);
outnet_tcp_take_query_setup(pend->c->fd, pend,
pend->query);
} else {
- reuse_tcp_setup_readtimeout(pend);
+ reuse_tcp_setup_timeout(pend);
}
return 0;
} else if(error != NETEVENT_NOERROR) {
* query again to the same destination. */
if(outnet->tcp_reuse.count < outnet->tcp_reuse_max) {
(void)reuse_tcp_insert(outnet, pend);
- reuse_tcp_setup_readtimeout(pend);
}
}
if(pend->query) {
reuse_tree_by_id_delete(&pend->reuse, pend->query);
- fptr_ok(fptr_whitelist_pending_tcp(pend->query->cb));
- (void)(*pend->query->cb)(c, pend->query->cb_arg, error, reply_info);
+ waiting_tcp_callback(pend->query, c, error, reply_info);
waiting_tcp_delete(pend->query);
pend->query = NULL;
}
verbose(5, "outnet_tcp_cb reuse after cb: keep it");
/* it is in the reuse_tcp tree, with other queries, or
* on the empty list. do not decommission it */
+ reuse_tcp_setup_read_and_timeout(pend);
return 0;
}
verbose(5, "outnet_tcp_cb reuse after cb: decommission it");
verbose(5, "outnet_tcptimer");
if(w->on_tcp_waiting_list) {
/* it is on the waiting list */
- comm_point_callback_type* cb = w->cb;
- void* cb_arg = w->cb_arg;
waiting_list_remove(outnet, w);
+ waiting_tcp_callback(w, NULL, NETEVENT_TIMEOUT, NULL);
waiting_tcp_delete(w);
- fptr_ok(fptr_whitelist_pending_tcp(cb));
- (void)(*cb)(NULL, cb_arg, NETEVENT_TIMEOUT, NULL);
} else {
/* it was in use */
struct pending_tcp* pend=(struct pending_tcp*)w->next_waiting;
{
struct pending_tcp* pend_tcp = (struct pending_tcp*)w->next_waiting;
verbose(5, "reuse_tcp_remove_serviced_keep");
+ /* remove the callback. let query continue to write to not cancel
+ * the stream itself. also keep it as an entry in the tree_by_id,
+ * in case the answer returns (that we no longer want), but we cannot
+ * pick the same ID number meanwhile */
+ pend_tcp->query->cb = NULL;
/* see if can be entered in reuse tree
* for that the FD has to be non-1 */
if(pend_tcp->c->fd == -1) {
/* if in tree and used by other queries */
if(pend_tcp->reuse.node.key) {
verbose(5, "reuse_tcp_remove_serviced_keep: in use by other queries");
- /* note less use of stream */
- /* remove id value used by this svcd. */
/* do not reset the keepalive timer, for that
- * we'd need traffic, and this is where the servicedq is
+ * we'd need traffic, and this is where the serviced is
* removed due to state machine internal reasons,
* eg. iterator no longer interested in this query */
return 1;
if(pend_tcp->c->fd != -1 && sq->outnet->tcp_reuse.count <
sq->outnet->tcp_reuse_max) {
verbose(5, "reuse_tcp_remove_serviced_keep: keep open");
- /* note less use of stream */
- /* remove id value used by this svcd. */
/* set a keepalive timer on it */
if(!reuse_tcp_insert(sq->outnet, pend_tcp)) {
return 0;
}
- reuse_tcp_setup_readtimeout(pend_tcp);
+ reuse_tcp_setup_timeout(pend_tcp);
return 1;
}
return 0;
* mesh */
outnet_send_wait_udp(sq->outnet);
} else {
- struct waiting_tcp* p = (struct waiting_tcp*)
+ struct waiting_tcp* w = (struct waiting_tcp*)
sq->pending;
verbose(5, "serviced_delete: TCP");
- /* TODO: if on stream-write-waiting list then
+ /* if on stream-write-waiting list then
* remove from waiting list and waiting_tcp_delete */
- if(!p->on_tcp_waiting_list) {
+ if(w->write_wait_queued) {
+ struct pending_tcp* pend =
+ (struct pending_tcp*)w->next_waiting;
+ verbose(5, "serviced_delete: writewait");
+ reuse_tree_by_id_delete(&pend->reuse, w);
+ reuse_write_wait_remove(&pend->reuse, w);
+ waiting_tcp_delete(w);
+ } else if(!w->on_tcp_waiting_list) {
+ struct pending_tcp* pend =
+ (struct pending_tcp*)w->next_waiting;
verbose(5, "serviced_delete: tcpreusekeep");
- if(!reuse_tcp_remove_serviced_keep(p, sq)) {
+ if(!reuse_tcp_remove_serviced_keep(w, sq)) {
reuse_cb_curquery_for_failure(
- (struct pending_tcp*)p->
- next_waiting, NETEVENT_CLOSED);
+ pend, NETEVENT_CLOSED);
reuse_cb_readwait_for_failure(
- (struct pending_tcp*)p->
- next_waiting, NETEVENT_CLOSED);
+ pend, NETEVENT_CLOSED);
reuse_cb_writewait_for_failure(
- (struct pending_tcp*)p->
- next_waiting, NETEVENT_CLOSED);
+ pend, NETEVENT_CLOSED);
decommission_pending_tcp(sq->outnet,
- (struct pending_tcp*)p->next_waiting);
+ pend);
use_free_buffer(sq->outnet);
}
+ sq->pending = NULL;
} else {
verbose(5, "serviced_delete: tcpwait");
- waiting_list_remove(sq->outnet, p);
- waiting_tcp_delete(p);
+ waiting_list_remove(sq->outnet, w);
+ waiting_tcp_delete(w);
}
}
}