]> git.ipfire.org Git - thirdparty/bind9.git/commitdiff
Refactor udp_recv_cb()
authorOndřej Surý <ondrej@sury.org>
Mon, 26 Oct 2020 16:31:55 +0000 (17:31 +0100)
committerOndřej Surý <ondrej@isc.org>
Fri, 30 Oct 2020 10:11:54 +0000 (11:11 +0100)
- more logical code flow.
- propagate errors back to the caller.
- add a 'reading' flag and call the callback from failed_read_cb()
  only when it the socket was actively reading.

lib/isc/netmgr/netmgr-int.h
lib/isc/netmgr/udp.c

index 56271e733d3afae55a33c79047e1b7e6497ddbe6..28b62853d4fd88292a1741f671299d4f86460fc6 100644 (file)
@@ -471,6 +471,7 @@ struct isc_nmsocket {
        atomic_bool connecting;
        atomic_bool connected;
        atomic_bool connect_error;
+       atomic_bool reading;
        isc_refcount_t references;
 
        /*%
index 716303f2d6204e01b2975ce5fea7015830e5286f..5d9faf305a5875d89a2bb7ac99f575e6c49f08be 100644 (file)
@@ -47,6 +47,9 @@ udp_close_cb(uv_handle_t *uvhandle);
 static void
 udp_close_direct(isc_nmsocket_t *sock);
 
+static void
+failed_read_cb(isc_nmsocket_t *sock, isc_result_t result);
+
 isc_result_t
 isc_nm_listenudp(isc_nm_t *mgr, isc_nmiface_t *iface, isc_nm_recv_cb_t cb,
                 void *cbarg, size_t extrahandlesize, isc_nmsocket_t **sockp) {
@@ -205,6 +208,7 @@ isc__nm_async_udplisten(isc__networker_t *worker, isc__netievent_t *ev0) {
                            &(int){ ISC_SEND_BUFFER_SIZE });
 #endif
        uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_recv_cb);
+       atomic_store(&sock->reading, true);
 }
 
 static void
@@ -221,6 +225,7 @@ stop_udp_child(isc_nmsocket_t *sock) {
        REQUIRE(sock->tid == isc_nm_tid());
 
        uv_udp_recv_stop(&sock->uv_handle.udp);
+       atomic_store(&sock->reading, false);
        uv_close((uv_handle_t *)&sock->uv_handle.udp, udp_stop_cb);
 
        isc__nm_incstats(sock->mgr, sock->statsindex[STATID_CLOSE]);
@@ -337,24 +342,22 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
        isc_result_t result;
        isc_nmhandle_t *nmhandle = NULL;
        isc_sockaddr_t sockaddr;
-       isc_nmsocket_t *sock = NULL;
+       isc_nmsocket_t *sock = uv_handle_get_data((uv_handle_t *)handle);
        isc_region_t region;
        uint32_t maxudp;
-       bool free_buf = true;
+       bool free_buf;
        isc_nm_recv_cb_t cb;
        void *cbarg;
-       bool connected;
 
-       /*
-        * Even though destruction of the socket can only happen from the
-        * network thread that we're in, we still attach to the socket here
-        * to ensure it won't be destroyed by the recv callback.
-        */
-       isc__nmsocket_attach(uv_handle_get_data((uv_handle_t *)handle), &sock);
+       REQUIRE(VALID_NMSOCK(sock));
+       REQUIRE(sock->tid == isc_nm_tid());
 
-#ifdef UV_UDP_MMSG_CHUNK
+#ifdef UV_UDP_MMSG_FREE
+       free_buf = ((flags & UV_UDP_MMSG_FREE) == UV_UDP_MMSG_FREE);
+#elif UV_UDP_MMSG_CHUNK
        free_buf = ((flags & UV_UDP_MMSG_CHUNK) == 0);
 #else
+       free_buf = true;
        UNUSED(flags);
 #endif
 
@@ -362,57 +365,57 @@ udp_recv_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
         * Three possible reasons to return now without processing:
         * - If addr == NULL, in which case it's the end of stream;
         *   we can free the buffer and bail.
+        */
+       if (addr == NULL) {
+               goto done;
+       }
+       /*
         * - If we're simulating a firewall blocking UDP packets
         *   bigger than 'maxudp' bytes for testing purposes.
-        * - If the socket is no longer active.
         */
        maxudp = atomic_load(&sock->mgr->maxudp);
-       if ((addr == NULL) || (maxudp != 0 && (uint32_t)nrecv > maxudp) ||
-           (!isc__nmsocket_active(sock)))
-       {
-               if (free_buf) {
-                       isc__nm_free_uvbuf(sock, buf);
-               }
-               isc__nmsocket_detach(&sock);
-               return;
+       if ((maxudp != 0 && (uint32_t)nrecv > maxudp)) {
+               goto done;
        }
-
-       result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
-       RUNTIME_CHECK(result == ISC_R_SUCCESS);
-       connected = atomic_load(&sock->connected);
-
-       if (!connected) {
-               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
-       } else {
-               nmhandle = sock->statichandle;
+       /*
+        * - If the socket is no longer active.
+        */
+       if (!isc__nmsocket_active(sock)) {
+               goto done;
        }
+
        region.base = (unsigned char *)buf->base;
        region.length = nrecv;
 
-       INSIST(sock->tid == isc_nm_tid());
-       INSIST(sock->recv_cb != NULL);
-
        cb = sock->recv_cb;
        cbarg = sock->recv_cbarg;
 
-       cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
+       if (atomic_load(&sock->client)) {
+               if (nrecv < 0) {
+                       failed_read_cb(sock, isc__nm_uverr2result(nrecv));
+                       return;
+               }
+
+               cb(sock->statichandle, ISC_R_SUCCESS, &region, cbarg);
+       } else {
+               result = isc_sockaddr_fromsockaddr(&sockaddr, addr);
+               RUNTIME_CHECK(result == ISC_R_SUCCESS);
 
-       if (free_buf) {
-               isc__nm_free_uvbuf(sock, buf);
-       }
+               nmhandle = isc__nmhandle_get(sock, &sockaddr, NULL);
 
-       /*
-        * The sock is now attached to the handle, we can detach our ref.
-        */
-       isc__nmsocket_detach(&sock);
+               cb(nmhandle, ISC_R_SUCCESS, &region, cbarg);
 
-       /*
-        * If the recv callback wants to hold on to the handle,
-        * it needs to attach to it.
-        */
-       if (!connected) {
+               /*
+                * If the recv callback wants to hold on to the handle,
+                * it needs to attach to it.
+                */
                isc_nmhandle_detach(&nmhandle);
        }
+
+done:
+       if (free_buf) {
+               isc__nm_free_uvbuf(sock, buf);
+       }
 }
 
 /*
@@ -821,6 +824,7 @@ udp_read_cb(uv_udp_t *handle, ssize_t nrecv, const uv_buf_t *buf,
        }
        udp_recv_cb(handle, nrecv, buf, addr, flags);
        uv_udp_recv_stop(&sock->uv_handle.udp);
+       atomic_store(&sock->reading, false);
 }
 
 static void
@@ -831,19 +835,22 @@ failed_read_cb(isc_nmsocket_t *sock, isc_result_t result) {
        REQUIRE(VALID_NMSOCK(sock));
        REQUIRE(sock->statichandle != NULL);
 
-       uv_udp_recv_stop(&sock->uv_handle.udp);
-
        if (sock->timer_initialized) {
                uv_timer_stop(&sock->timer);
                sock->timer_running = false;
        }
 
-       cb = sock->recv_cb;
-       cbarg = sock->recv_cbarg;
-       isc__nmsocket_clearcb(sock);
+       uv_udp_recv_stop(&sock->uv_handle.udp);
 
-       if (cb != NULL) {
-               cb(sock->statichandle, result, NULL, cbarg);
+       if (atomic_compare_exchange_strong(&sock->reading, &(bool){ true },
+                                          false)) {
+               cb = sock->recv_cb;
+               cbarg = sock->recv_cbarg;
+               isc__nmsocket_clearcb(sock);
+
+               if (cb != NULL) {
+                       cb(sock->statichandle, result, NULL, cbarg);
+               }
        }
 }
 
@@ -882,6 +889,7 @@ isc__nm_async_udpread(isc__networker_t *worker, isc__netievent_t *ev0) {
        }
 
        uv_udp_recv_start(&sock->uv_handle.udp, udp_alloc_cb, udp_read_cb);
+       atomic_store(&sock->reading, true);
 }
 
 isc_result_t