if [ $ret != 0 ]; then echo_i "failed"; fi
status=`expr $status + $ret`
-# flush resolver so queries will be from others again
-$RNDCCMD 10.53.0.4 flush
-sleep 1
-
echo_i "check pipelined TCP queries using mdig"
ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
$MDIG $MDIGOPTS +noall +answer +vc -f input -b 10.53.0.4 @10.53.0.4 > raw.mdig
awk '{ print $1 " " $5 }' < raw.mdig > output.mdig
sort < output.mdig > output-sorted.mdig
echo_i "check keep-response-order"
ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
$PIPEQUERIES -p ${PORT} ++ < inputb > rawb || ret=1
awk '{ print $1 " " $5 }' < rawb > outputb
$DIFF refb outputb || ret=1
echo_i "check keep-response-order using mdig"
ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
$MDIG $MDIGOPTS +noall +answer +vc -f inputb -b 10.53.0.7 @10.53.0.4 > rawb.mdig
awk '{ print $1 " " $5 }' < rawb.mdig > outputb.mdig
$DIFF refb outputb.mdig || ret=1
echo_i "check mdig -4 -6"
ret=0
+$RNDCCMD 10.53.0.4 flush
+sleep 1
$MDIG $MDIGOPTS -4 -6 -f input @10.53.0.4 > output46.mdig 2>&1 && ret=1
grep "only one of -4 and -6 allowed" output46.mdig > /dev/null || ret=1
if [ $ret != 0 ]; then echo_i "failed"; fi
* as its argument.
*
* When handles are allocated for the socket, 'extrasize' additional bytes
- * will be allocated along with the handle for an associated object
- * (typically ns_client).
+ * can be allocated along with the handle for an associated object, which
+ * can then be freed automatically when the handle is destroyed.
*/
void
void
isc_nm_resume(isc_nm_t *mgr);
/*%<
- * Resume paused processing. It will return immediately
- * after signalling workers to resume.
+ * Resume paused processing. It will return immediately after signalling
+ * workers to resume.
*/
isc_result_t
isc_nm_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+/*
+ * Begin (or continue) reading on the socket associated with 'handle', and
+ * update its recv callback to 'cb', which will be called as soon as there
+ * is data to process.
+ */
isc_result_t
isc_nm_pauseread(isc_nmhandle_t *handle);
typedef enum isc__netievent_type {
netievent_udpsend,
- netievent_udprecv,
netievent_udpstop,
netievent_tcpconnect,
netievent_tcpsend,
- netievent_tcprecv,
netievent_tcpstartread,
netievent_tcppauseread,
netievent_tcpchildaccept,
netievent_tcpstop,
netievent_tcpclose,
- netievent_tcpdnsclose,
netievent_tcpdnssend,
+ netievent_tcpdnsclose,
netievent_closecb,
netievent_shutdown,
netievent_tcplisten,
} isc__netievent_type;
-/*
- * We have to split it because we can read and write on a socket
- * simultaneously.
- */
typedef union {
isc_nm_recv_cb_t recv;
+ isc_nm_cb_t connect;
isc_nm_accept_cb_t accept;
} isc__nm_readcb_t;
-typedef union {
- isc_nm_cb_t send;
- isc_nm_cb_t connect;
-} isc__nm_writecb_t;
-
typedef union {
isc_nm_recv_cb_t recv;
isc_nm_accept_cb_t accept;
typedef isc__netievent__socket_t isc__netievent_udpstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpstop_t;
typedef isc__netievent__socket_t isc__netievent_tcpclose_t;
-typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
typedef isc__netievent__socket_t isc__netievent_startread_t;
typedef isc__netievent__socket_t isc__netievent_pauseread_t;
typedef isc__netievent__socket_t isc__netievent_closecb_t;
+typedef isc__netievent__socket_t isc__netievent_tcpdnsclose_t;
typedef struct isc__netievent__socket_req {
isc__netievent_type type;
isc_nm_tcpsocket,
isc_nm_tcplistener,
isc_nm_tcpdnslistener,
- isc_nm_tcpdnssocket
+ isc_nm_tcpdnssocket,
} isc_nmsocket_type;
/*%
isc_nmsocket_t *children;
int nchildren;
isc_nmiface_t *iface;
- isc_nmhandle_t *tcphandle;
+ isc_nmhandle_t *statichandle;
isc_nmhandle_t *outerhandle;
/*% Extra data allocated at the end of each isc_nmhandle_t */
isc_refcount_t references;
/*%
- * TCPDNS socket has been set not to pipeliine.
+ * Established an outgoing connection, as client not server.
+ */
+ atomic_bool client;
+
+ /*%
+ * TCPDNS socket has been set not to pipeline.
*/
atomic_bool sequential;
isc_result_t
isc__nm_tcp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg);
+/*
+ * Back-end implementation of isc_nm_read() for TCP handles.
+ */
void
isc__nm_tcp_close(isc_nmsocket_t *sock);
*/
void
-isc__nm_tcp_cancelread(isc_nmsocket_t *sock);
+isc__nm_tcp_cancelread(isc_nmhandle_t *handle);
/*%<
- * Stop reading on a connected socket.
+ * Stop reading on a connected TCP handle.
*/
void
uv_stop(&worker->loop);
isc_mempool_put(worker->mgr->evpool, ievent);
return;
+
case netievent_udplisten:
isc__nm_async_udplisten(worker, ievent);
break;
case netievent_udpsend:
isc__nm_async_udpsend(worker, ievent);
break;
+
case netievent_tcpconnect:
isc__nm_async_tcpconnect(worker, ievent);
break;
case netievent_tcpclose:
isc__nm_async_tcpclose(worker, ievent);
break;
+
case netievent_tcpdnsclose:
isc__nm_async_tcpdnsclose(worker, ievent);
break;
+
case netievent_closecb:
isc__nm_async_closecb(worker, ievent);
break;
isc__nm_decstats(sock->mgr, sock->statsindex[STATID_ACTIVE]);
}
- sock->tcphandle = NULL;
+ sock->statichandle = NULL;
if (sock->outerhandle != NULL) {
isc_nmhandle_unref(sock->outerhandle);
}
}
- if (active_handles == 0 || sock->tcphandle != NULL) {
+ if (active_handles == 0 || sock->statichandle != NULL) {
destroy = true;
}
if (handle == NULL) {
handle = alloc_handle(sock);
} else {
- isc_refcount_increment0(&handle->references);
+ isc_refcount_init(&handle->references, 1);
INSIST(VALID_NMHANDLE(handle));
}
handle->ah_pos = pos;
UNLOCK(&sock->lock);
- if (sock->type == isc_nm_tcpsocket) {
- INSIST(sock->tcphandle == NULL);
- sock->tcphandle = handle;
+ if (sock->type == isc_nm_tcpsocket ||
+ (sock->type == isc_nm_udpsocket && atomic_load(&sock->client)))
+ {
+ INSIST(sock->statichandle == NULL);
+ sock->statichandle = handle;
}
return (handle);
}
}
+ if (handle == sock->statichandle) {
+ sock->statichandle = NULL;
+ }
+
isc__nmsocket_detach(&sock);
}
switch (handle->sock->type) {
case isc_nm_tcpsocket:
- isc__nm_tcp_cancelread(handle->sock);
+ isc__nm_tcp_cancelread(handle);
break;
default:
INSIST(0);
static void
tcp_connect_cb(uv_connect_t *uvreq, int status) {
+ isc_result_t result;
isc__nm_uvreq_t *req = (isc__nm_uvreq_t *)uvreq->data;
isc_nmsocket_t *sock = NULL;
+ struct sockaddr_storage ss;
+ isc_nmhandle_t *handle = NULL;
sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
REQUIRE(VALID_UVREQ(req));
- if (status == 0) {
- isc_result_t result;
- struct sockaddr_storage ss;
- isc_nmhandle_t *handle = NULL;
+ if (status != 0) {
+ req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
+ isc__nm_uvreq_put(&req, sock);
+ return;
+ }
- sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
- isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
- uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
- &(int){ sizeof(ss) });
- result = isc_sockaddr_fromsockaddr(&sock->peer,
- (struct sockaddr *)&ss);
- RUNTIME_CHECK(result == ISC_R_SUCCESS);
+ sock = uv_handle_get_data((uv_handle_t *)uvreq->handle);
+ isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CONNECT]);
+ uv_tcp_getpeername(&sock->uv_handle.tcp, (struct sockaddr *)&ss,
+ &(int){ sizeof(ss) });
+ result = isc_sockaddr_fromsockaddr(&sock->peer, (struct sockaddr *)&ss);
+ RUNTIME_CHECK(result == ISC_R_SUCCESS);
- handle = isc__nmhandle_get(sock, NULL, NULL);
- req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
+ handle = isc__nmhandle_get(sock, NULL, NULL);
+ req->cb.connect(handle, ISC_R_SUCCESS, req->cbarg);
- isc__nm_uvreq_put(&req, sock);
+ isc__nm_uvreq_put(&req, sock);
- /*
- * The sock is now attached to the handle.
- */
- isc__nmsocket_detach(&sock);
+ atomic_init(&sock->client, true);
- /*
- * If the connect callback wants to hold on to the handle,
- * it needs to attach to it.
- */
- isc_nmhandle_unref(handle);
- } else {
- /*
- * TODO:
- * Handle the connect error properly and free the socket.
- */
- req->cb.connect(NULL, isc__nm_uverr2result(status), req->cbarg);
- isc__nm_uvreq_put(&req, sock);
- }
+ /*
+ * The sock is now attached to the handle.
+ */
+ isc__nmsocket_detach(&sock);
+
+ /*
+ * If the connect callback wants to hold on to the handle,
+ * it needs to attach to it.
+ */
+ isc_nmhandle_unref(handle);
}
isc_result_t
isc_result_t result = ISC_R_SUCCESS;
REQUIRE(VALID_NM(mgr));
+ REQUIRE(local != NULL);
+ REQUIRE(peer != NULL);
nsock = isc_mem_get(mgr->mctx, sizeof(*nsock));
isc__nmsocket_init(nsock, mgr, isc_nm_tcpsocket, local);
req->cb.connect = cb;
req->cbarg = cbarg;
req->peer = peer->addr;
+ req->local = local->addr;
ievent = isc__nm_get_ievent(mgr, netievent_tcpconnect);
ievent->sock = nsock;
isc_quota_detach(&sock->quota);
}
if (sock->rcb.recv != NULL) {
- sock->rcb.recv(sock->tcphandle, ISC_R_TIMEDOUT, NULL,
+ sock->rcb.recv(sock->statichandle, ISC_R_TIMEDOUT, NULL,
sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
.length = nread };
if (sock->rcb.recv != NULL) {
- sock->rcb.recv(sock->tcphandle, ISC_R_SUCCESS, ®ion,
- sock->rcbarg);
+ sock->rcb.recv(sock->statichandle, ISC_R_SUCCESS,
+ ®ion, sock->rcbarg);
}
sock->read_timeout = (atomic_load(&sock->keepalive)
*/
if (sock->rcb.recv != NULL) {
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_RECVFAIL]);
- sock->rcb.recv(sock->tcphandle, ISC_R_EOF, NULL, sock->rcbarg);
+ sock->rcb.recv(sock->statichandle, ISC_R_EOF, NULL,
+ sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
isc__nm_tcp_shutdown(isc_nmsocket_t *sock) {
REQUIRE(VALID_NMSOCK(sock));
- if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
+ if (sock->type == isc_nm_tcpsocket && sock->statichandle != NULL &&
sock->rcb.recv != NULL)
{
- sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
+ sock->rcb.recv(sock->statichandle, ISC_R_CANCELED, NULL,
sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
}
void
-isc__nm_tcp_cancelread(isc_nmsocket_t *sock) {
- REQUIRE(VALID_NMSOCK(sock));
+isc__nm_tcp_cancelread(isc_nmhandle_t *handle) {
+ isc_nmsocket_t *sock = NULL;
- if (sock->type == isc_nm_tcpsocket && sock->tcphandle != NULL &&
- sock->rcb.recv != NULL)
- {
- sock->rcb.recv(sock->tcphandle, ISC_R_CANCELED, NULL,
- sock->rcbarg);
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ sock = handle->sock;
+
+ REQUIRE(sock->type == isc_nm_tcpsocket);
+
+ if (atomic_load(&sock->client) && sock->rcb.recv != NULL) {
+ sock->rcb.recv(handle, ISC_R_EOF, NULL, sock->rcbarg);
isc__nmsocket_clearcb(sock);
}
}
static void
timer_close_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = (isc_nmsocket_t *)uv_handle_get_data(handle);
- INSIST(VALID_NMSOCK(sock));
+
+ REQUIRE(VALID_NMSOCK(sock));
+
atomic_store(&sock->closed, true);
tcpdns_close_direct(sock);
}
REQUIRE(VALID_NMSOCK(sock));
REQUIRE(sock->tid == isc_nm_tid());
- /* Close the TCP connection, it's closing should fire 'our' closing */
+
+ /* Close the TCP connection; its closure should fire ours. */
isc_nmhandle_unref(sock->outerhandle);
sock->outerhandle = NULL;
}
*/
len = dnslen(dnssock->buf);
if (len <= dnssock->buf_len - 2) {
- isc_nmhandle_t *dnshandle = isc__nmhandle_get(dnssock, NULL,
- NULL);
+ isc_nmhandle_t *dnshandle;
+ if (atomic_load(&dnssock->client) &&
+ dnssock->statichandle != NULL) {
+ dnshandle = dnssock->statichandle;
+ isc_nmhandle_ref(dnshandle);
+ } else {
+ dnshandle = isc__nmhandle_get(dnssock, NULL, NULL);
+ }
+
isc_nmsocket_t *listener = dnssock->listener;
if (listener != NULL && listener->rcb.recv != NULL) {
&(isc_region_t){ .base = dnssock->buf + 2,
.length = len },
listener->rcbarg);
+ } else if (dnssock->rcb.recv != NULL) {
+ isc_nm_recv_cb_t cb = dnssock->rcb.recv;
+ void *cbarg = dnssock->rcbarg;
+
+ /*
+ * We need to clear the read callback *before*
+ * calling it, because it might make another
+ * call to isc_nm_read() and set up a new callback.
+ */
+ isc__nmsocket_clearcb(dnssock);
+ cb(dnshandle, ISC_R_SUCCESS,
+ &(isc_region_t){ .base = dnssock->buf + 2,
+ .length = len },
+ cbarg);
}
len += 2;
REQUIRE(VALID_NMSOCK(dnssock));
REQUIRE(VALID_NMHANDLE(handle));
- REQUIRE(dnssock->tid == isc_nm_tid());
if (region == NULL || eresult != ISC_R_SUCCESS) {
/* Connection closed */
- isc_nmhandle_unref(handle);
+ if (eresult != ISC_R_CANCELED) {
+ isc_nmhandle_unref(handle);
+ }
dnssock->result = eresult;
if (dnssock->self != NULL) {
isc__nmsocket_detach(&dnssock->self);
uv_timer_stop(&dnssock->timer);
}
- if (atomic_load(&dnssock->sequential)) {
+ if (atomic_load(&dnssock->sequential) ||
+ dnssock->rcb.recv == NULL) {
/*
- * We're in sequential mode and we processed
- * one packet, so we're done until the next read
- * completes.
+ * Two reasons we might want to pause here:
+ * - If we're in sequential mode and we've received
+ * a whole packet, so we're done until it's been
+ * processed;
+ * - If we no longer have a read callback.
*/
isc_nm_pauseread(dnssock->outerhandle);
done = true;
void *cbarg, isc_nm_accept_cb_t accept_cb,
void *accept_cbarg, size_t extrahandlesize, int backlog,
isc_quota_t *quota, isc_nmsocket_t **sockp) {
- /* A 'wrapper' socket object with outer set to true TCP socket */
isc_nmsocket_t *dnslistensock = isc_mem_get(mgr->mctx,
sizeof(*dnslistensock));
isc_result_t result;
dnslistensock->accept_cbarg = accept_cbarg;
dnslistensock->extrahandlesize = extrahandlesize;
- /* We set dnslistensock->outer to a true listening socket */
+ /*
+ * dnslistensock will be a DNS 'wrapper' around a connected
+ * stream. We set dnslistensock->outer to a socket listening
+ * for a TCP connection.
+ */
result = isc_nm_listentcp(mgr, iface, dnslisten_acceptcb, dnslistensock,
extrahandlesize, backlog, quota,
&dnslistensock->outer);
r.base = (unsigned char *)req->uvbuf.base;
r.length = req->uvbuf.len;
- result = isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
- req);
+ result = isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb, req);
}
if (result != ISC_R_SUCCESS) {
r.base = (unsigned char *)uvreq->uvbuf.base;
r.length = uvreq->uvbuf.len;
- return (isc__nm_tcp_send(sock->outerhandle, &r, tcpdnssend_cb,
- uvreq));
+ return (isc_nm_send(sock->outerhandle, &r, tcpdnssend_cb,
+ uvreq));
} else {
isc__netievent_tcpdnssend_t *ievent = NULL;
csock->rcb.recv = cb;
csock->rcbarg = cbarg;
csock->fd = socket(family, SOCK_DGRAM, 0);
- INSIST(csock->fd >= 0);
+ RUNTIME_CHECK(csock->fd >= 0);
/*
* This is SO_REUSE**** hell:
}
static void
-udp_close_cb(uv_handle_t *handle) {
+udp_stop_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
atomic_store(&sock->closed, true);
REQUIRE(sock->tid == isc_nm_tid());
uv_udp_recv_stop(&sock->uv_handle.udp);
- uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_close_cb);
+ uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb);
isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
- nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+ if (!atomic_load(&sock->connected)) {
+ nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
+ } else {
+ nmhandle = sock->statichandle;
+ }
region.base = (unsigned char *)buf->base;
region.length = nrecv;
isc_result_t
isc__nm_udp_send(isc_nmhandle_t *handle, isc_region_t *region, isc_nm_cb_t cb,
void *cbarg) {
- isc_nmsocket_t *psock = NULL, *rsock = NULL;
isc_nmsocket_t *sock = handle->sock;
+ isc_nmsocket_t *psock = NULL, *rsock = sock;
isc_sockaddr_t *peer = &handle->peer;
isc__netievent_udpsend_t *ievent = NULL;
isc__nm_uvreq_t *uvreq = NULL;
- int ntid;
uint32_t maxudp = atomic_load(&sock->mgr->maxudp);
+ int ntid;
/*
* We're simulating a firewall blocking UDP packets bigger than
return (ISC_R_SUCCESS);
}
- if (sock->type == isc_nm_udpsocket) {
+ if (sock->type == isc_nm_udpsocket && !atomic_load(&sock->client)) {
INSIST(sock->parent != NULL);
psock = sock->parent;
} else if (sock->type == isc_nm_udplistener) {
psock = sock;
- } else {
+ } else if (!atomic_load(&sock->client)) {
INSIST(0);
ISC_UNREACHABLE();
}
*/
if (isc__nm_in_netthread()) {
ntid = isc_nm_tid();
- } else if (sock->type == isc_nm_udpsocket) {
+ } else if (sock->type == isc_nm_udpsocket &&
+ !atomic_load(&sock->client)) {
ntid = sock->tid;
} else {
ntid = (int)isc_random_uniform(sock->nchildren);
}
- rsock = &psock->children[ntid];
+ if (psock != NULL) {
+ rsock = &psock->children[ntid];
+ }
uvreq = isc__nm_uvreq_get(sock->mgr, sock);
uvreq->uvbuf.base = (char *)region->base;
static isc_result_t
udp_send_direct(isc_nmsocket_t *sock, isc__nm_uvreq_t *req,
isc_sockaddr_t *peer) {
+ const struct sockaddr *sa = NULL;
int rv;
REQUIRE(sock->tid == isc_nm_tid());
if (!isc__nmsocket_active(sock)) {
return (ISC_R_CANCELED);
}
+
isc_nmhandle_ref(req->handle);
+ sa = atomic_load(&sock->connected) ? NULL : &peer->type.sa;
rv = uv_udp_send(&req->uv_req.udp_send, &sock->uv_handle.udp,
- &req->uvbuf, 1, &peer->type.sa, udp_send_cb);
+ &req->uvbuf, 1, sa, udp_send_cb);
if (rv < 0) {
isc__nm_incstats(req->sock->mgr,
req->sock->statsindex[STATID_SENDFAIL]);