netievent_udplisten,
netievent_udpstop,
- netievent_udpread,
netievent_tcplisten,
netievent_tcpstop,
isc__nm_async_udpstop(isc__networker_t *worker, isc__netievent_t *ev0);
void
isc__nm_async_udpcancel(isc__networker_t *worker, isc__netievent_t *ev0);
-void
-isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0);
/*%<
* Callback handlers for asynchronous UDP events (listen, stoplisten, send).
*/
NETIEVENT_SOCKET_HANDLE_TYPE(tlscancel);
NETIEVENT_SOCKET_TYPE(udplisten);
NETIEVENT_SOCKET_TYPE(udpstop);
-NETIEVENT_SOCKET_TYPE(udpread);
NETIEVENT_SOCKET_TYPE(tcpdnsclose);
NETIEVENT_SOCKET_TYPE(tcpdnsread);
NETIEVENT_SOCKET_HANDLE_DECL(tlscancel);
NETIEVENT_SOCKET_DECL(udplisten);
NETIEVENT_SOCKET_DECL(udpstop);
-NETIEVENT_SOCKET_DECL(udpread);
NETIEVENT_SOCKET_DECL(tcpdnsclose);
NETIEVENT_SOCKET_DECL(tcpdnsread);
NETIEVENT_CASE(udplisten);
NETIEVENT_CASE(udpstop);
NETIEVENT_CASE(udpcancel);
- NETIEVENT_CASE(udpread);
NETIEVENT_CASE(tcpaccept);
NETIEVENT_CASE(tcpconnect);
NETIEVENT_SOCKET_DEF(udplisten);
NETIEVENT_SOCKET_DEF(udpstop);
NETIEVENT_SOCKET_HANDLE_DEF(udpcancel);
-NETIEVENT_SOCKET_DEF(udpread);
NETIEVENT_SOCKET_DEF(tcpdnsclose);
NETIEVENT_SOCKET_DEF(tcpdnsread);
#endif /* if defined(HAVE_LINUX_NETLINK_H) && defined(HAVE_LINUX_RTNETLINK_H) \
*/
-static void
-udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
- const struct sockaddr *addr, unsigned flags);
-
static void
udp_send_cb(uv_udp_send_t *req, int status);
isc__nm_set_network_buffers(mgr, &sock->uv_handle.handle);
r = uv_udp_recv_start(&sock->uv_handle.udp, isc__nm_alloc_cb,
- udp_recv_cb);
+ isc__nm_udp_read_cb);
if (r != 0) {
isc__nm_incstats(sock, STATID_BINDFAIL);
goto done;
* reused for a series of packets, so we need to allocate a new one.
* This new one can be reused to send the response then.
*/
-static void
-udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
- const struct sockaddr *addr, unsigned flags) {
+void
+isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
+ const struct sockaddr *addr, unsigned flags) {
isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
isc__nm_uvreq_t *req = NULL;
uint32_t maxudp;
goto free;
}
- /*
- * - If addr == NULL, in which case it's the end of stream;
- * we can free the buffer and bail.
- */
- if (addr == NULL) {
- isc__nm_failed_read_cb(sock, ISC_R_EOF, false);
- goto free;
- }
-
/*
* - If the network manager is shutting down
*/
goto free;
}
+ /*
+ * End of the current (iteration) datagram stream, just free the buffer.
+ * The callback with nrecv == 0 and addr == NULL is called for both
+ * normal UDP sockets and recvmmsg sockets at the end of every event
+ * loop iteration.
+ */
+ if (nrecv == 0 && addr == NULL) {
+ INSIST(flags == 0);
+ goto free;
+ }
+
+ /*
+ * We could receive an empty datagram in which case:
+ * nrecv == 0 and addr != NULL
+ */
+ INSIST(addr != NULL);
+
if (!sock->route_sock) {
result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
RUNTIME_CHECK(result == ISC_R_SUCCESS);
sock->recv_read = false;
+ /*
+ * The client isc_nm_read() expects just a single message, so we need to
+ * stop reading now. The reading could be restarted in the read
+ * callback with another isc_nm_read() call.
+ */
+ if (atomic_load(&sock->client)) {
+ isc__nmsocket_timer_stop(sock);
+ isc__nm_stop_reading(sock);
+ }
+
REQUIRE(!sock->processing);
sock->processing = true;
isc__nm_readcb(sock, req, ISC_R_SUCCESS);
isc__nmsocket_detach(&sock);
}
-void
-isc__nm_udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
- const struct sockaddr *addr, unsigned flags) {
- isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
- REQUIRE(VALID_NMSOCK(sock));
- REQUIRE(atomic_load(&sock->client));
- REQUIRE(sock->parent == NULL);
-
- /*
- * This function can only be reached when calling isc_nm_read() on
- * a UDP client socket. There's no point calling isc_nm_read() on
- * a UDP listener socket; those are always reading.
- *
- * The reason why we stop the timer and the reading after calling the
- * callback is because there's a time window where a second UDP packet
- * might be received between isc__nm_stop_reading() call and
- * isc_nm_read() call from the callback and such UDP datagram would be
- * lost like tears in the rain.
- */
- udp_recv_cb(handle, nrecv, buf, addr, flags);
-
- isc__nmsocket_timer_stop(sock);
- isc__nm_stop_reading(sock);
-}
-
void
isc__nm_udp_failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
REQUIRE(VALID_NMSOCK(sock));
}
}
-/*
- * Asynchronous 'udpread' call handler: start or resume reading on a
- * socket; pause reading and call the 'recv' callback after each
- * datagram.
- */
void
-isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
- isc__netievent_udpread_t *ievent = (isc__netievent_udpread_t *)ev0;
- isc_nmsocket_t *sock = ievent->sock;
+isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
+ isc_nmsocket_t *sock = NULL;
isc_result_t result;
- UNUSED(worker);
+ REQUIRE(VALID_NMHANDLE(handle));
+
+ sock = handle->sock;
REQUIRE(VALID_NMSOCK(sock));
+ REQUIRE(sock->type == isc_nm_udpsocket);
+ REQUIRE(sock->statichandle == handle);
+ REQUIRE(!sock->recv_read);
REQUIRE(sock->tid == isc_tid());
- if (isc__nm_closing(worker)) {
+ /*
+ * We need to initialize the callback before checking for shutdown
+ * conditions, so the callback is always called even on error condition.
+ */
+ sock->recv_cb = cb;
+ sock->recv_cbarg = cbarg;
+ sock->recv_read = true;
+
+ if (isc__nm_closing(sock->worker)) {
result = ISC_R_SHUTTINGDOWN;
goto fail;
}
goto fail;
}
- isc__nmsocket_timer_start(sock);
+ isc__nmsocket_timer_restart(sock);
return;
fail:
isc__nm_failed_read_cb(sock, result, false);
}
-void
-isc__nm_udp_read(isc_nmhandle_t *handle, isc_nm_recv_cb_t cb, void *cbarg) {
- isc_nmsocket_t *sock = NULL;
-
- REQUIRE(VALID_NMHANDLE(handle));
-
- sock = handle->sock;
-
- REQUIRE(VALID_NMSOCK(sock));
- REQUIRE(sock->type == isc_nm_udpsocket);
- REQUIRE(sock->statichandle == handle);
- REQUIRE(!sock->recv_read);
-
- sock->recv_cb = cb;
- sock->recv_cbarg = cbarg;
- sock->recv_read = true;
-
- if (!atomic_load(&sock->reading) && sock->tid == isc_tid()) {
- isc__netievent_udpread_t ievent = { .sock = sock };
- isc__nm_async_udpread(sock->worker,
- (isc__netievent_t *)&ievent);
- } else {
- isc__netievent_udpread_t *ievent =
- isc__nm_get_netievent_udpread(sock->worker, sock);
- isc__nm_enqueue_ievent(sock->worker,
- (isc__netievent_t *)ievent);
- }
-}
-
static void
udp_close_cb(uv_handle_t *handle) {
isc_nmsocket_t *sock = uv_handle_get_data(handle);
isc_nonce_buf(&send_magic, sizeof(send_magic));
+ connect_readcb = connect_read_cb;
+
return (0);
}
isc_refcount_increment0(&active_creads);
isc_nmhandle_attach(handle, &readhandle);
- isc_nm_read(handle, connect_read_cb, cbarg);
+ isc_nm_read(handle, connect_readcb, cbarg);
isc_refcount_increment0(&active_csends);
isc_nmhandle_attach(handle, &sendhandle);
}
}
+static void
+double_read_send_cb(isc_nmhandle_t *handle, isc_result_t eresult, void *cbarg) {
+ UNUSED(cbarg);
+ UNUSED(eresult);
+
+ assert_non_null(handle);
+
+ F();
+
+ isc_refcount_decrement(&active_ssends);
+
+ switch (eresult) {
+ case ISC_R_SUCCESS:
+ if (have_expected_ssends(atomic_fetch_add(&ssends, 1) + 1)) {
+ do_ssends_shutdown(loopmgr);
+ } else {
+ isc_nmhandle_t *sendhandle = NULL;
+ isc_nmhandle_attach(handle, &sendhandle);
+ isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
+ isc_refcount_increment0(&active_ssends);
+ isc_nm_send(sendhandle, &send_msg, double_read_send_cb,
+ cbarg);
+ break;
+ }
+ break;
+ case ISC_R_CANCELED:
+ break;
+ default:
+ fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
+ isc_result_totext(eresult), cbarg);
+ assert_int_equal(eresult, ISC_R_SUCCESS);
+ }
+
+ isc_nmhandle_detach(&handle);
+}
+
+static void
+double_read_listen_cb(isc_nmhandle_t *handle, isc_result_t eresult,
+ isc_region_t *region, void *cbarg) {
+ uint64_t magic = 0;
+
+ assert_non_null(handle);
+
+ F();
+
+ switch (eresult) {
+ case ISC_R_EOF:
+ case ISC_R_SHUTTINGDOWN:
+ case ISC_R_CANCELED:
+ break;
+ case ISC_R_SUCCESS:
+ memmove(&magic, region->base, sizeof(magic));
+ assert_true(magic == send_magic);
+
+ assert_true(region->length >= sizeof(magic));
+
+ memmove(&magic, region->base, sizeof(magic));
+ assert_true(magic == send_magic);
+
+ isc_nmhandle_t *sendhandle = NULL;
+ isc_nmhandle_attach(handle, &sendhandle);
+ isc_nmhandle_setwritetimeout(sendhandle, T_IDLE);
+ isc_refcount_increment0(&active_ssends);
+ isc_nm_send(sendhandle, &send_msg, double_read_send_cb, cbarg);
+ return;
+ default:
+ fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
+ isc_result_totext(eresult), cbarg);
+ assert_int_equal(eresult, ISC_R_SUCCESS);
+ }
+
+ isc_refcount_decrement(&active_sreads);
+
+ isc_nmhandle_detach(&handle);
+}
+
+static void
+double_read_cb(isc_nmhandle_t *handle, isc_result_t eresult,
+ isc_region_t *region, void *cbarg) {
+ uint64_t magic = 0;
+ bool detach = false;
+
+ UNUSED(cbarg);
+
+ assert_non_null(handle);
+
+ F();
+
+ switch (eresult) {
+ case ISC_R_SUCCESS:
+ assert_true(region->length >= sizeof(magic));
+
+ memmove(&magic, region->base, sizeof(magic));
+
+ assert_true(magic == send_magic);
+
+ if (have_expected_creads(atomic_fetch_add(&creads, 1) + 1)) {
+ do_creads_shutdown(loopmgr);
+ detach = true;
+ }
+
+ if (magic == send_magic && allow_send_back) {
+ connect_send(handle);
+ return;
+ }
+
+ break;
+ case ISC_R_TIMEDOUT:
+ case ISC_R_EOF:
+ case ISC_R_SHUTTINGDOWN:
+ case ISC_R_CANCELED:
+ case ISC_R_CONNECTIONRESET:
+ detach = true;
+ break;
+ default:
+ fprintf(stderr, "%s(%p, %s, %p)\n", __func__, handle,
+ isc_result_totext(eresult), cbarg);
+ assert_int_equal(eresult, ISC_R_SUCCESS);
+ }
+
+ if (detach) {
+ isc_refcount_decrement(&active_creads);
+ isc_nmhandle_detach(&handle);
+ } else {
+ isc_nm_read(handle, connect_readcb, cbarg);
+ }
+}
+
+ISC_SETUP_TEST_IMPL(udp_double_read) {
+ setup_test(state);
+
+ expected_cconnects = 1;
+ cconnects_shutdown = false;
+
+ expected_csends = 1;
+ csends_shutdown = false;
+
+ expected_sreads = 1;
+ sreads_shutdown = false;
+
+ expected_ssends = 2;
+ ssends_shutdown = false;
+
+ expected_creads = 2;
+ creads_shutdown = true;
+
+ connect_readcb = double_read_cb;
+
+ return (0);
+}
+
+ISC_TEARDOWN_TEST_IMPL(udp_double_read) {
+ atomic_assert_int_eq(creads, expected_creads);
+
+ teardown_test(state);
+
+ return (0);
+}
+
+ISC_LOOP_TEST_IMPL(udp_double_read) {
+ start_listening(ISC_NM_LISTEN_ALL, double_read_listen_cb);
+
+ udp__connect(NULL);
+}
+
ISC_TEST_LIST_START
/* Mock tests are unreliable on OpenBSD */
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_shutdown_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_cancel_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_timeout_recovery)
+ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_double_read)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_one)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_two)
ISC_TEST_ENTRY_SETUP_TEARDOWN(udp_recv_send)