]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
[Bug 3666] avoid unlimited receive buffer allocation
authorJuergen Perlinger <perlinger@ntp.org>
Sat, 9 May 2020 06:20:44 +0000 (08:20 +0200)
committerJuergen Perlinger <perlinger@ntp.org>
Sat, 9 May 2020 06:20:44 +0000 (08:20 +0200)
bk: 5eb64bbcMQk6vLa3ERqwzpImTteOlw

ChangeLog
include/recvbuff.h
lib/isc/win32/include/isc/int.h
libntp/recvbuff.c
libntp/timexsup.c
ntpd/ntp_io.c
ntpd/refclock_parse.c
ntpdate/ntpdate.c
ports/winnt/libntp/messages.h
ports/winnt/ntpd/ntp_iocompletionport.c
tests/libntp/recvbuff.c

index 073fdb5711b00163367dad4ca9371b61ecb69842..1008f370a0f9f2943b870ca7106b9cfaa705a8af 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,7 @@
+---
+* [Bug 3666] avoid unlimited receive buffer allocation <perlinger@ntp.org>
+  - limit number of receive buffers, with an iron reserve for refclocks
+
 ---
 (4.2.8p15) 2020/04/xx Released by Harlan Stenn <stenn@ntp.or>
 
index 42597153eea2a5cdce57321a9cf764a22b5c90ab..2a4c84c5afc3b0696a616ef75caae1c4b362609e 100644 (file)
 /*
  * recvbuf memory management
  */
-#define RECV_INIT      10      /* 10 buffers initially */
+#define RECV_INIT      64      /* 64 buffers initially */
 #define RECV_LOWAT     3       /* when we're down to three buffers get more */
-#define RECV_INC       5       /* get 5 more at a time */
-#define RECV_TOOMANY   40      /* this is way too many buffers */
+#define RECV_INC       32      /* [power of 2] get 32 more at a time */
+#define RECV_BATCH     128     /* [power of 2] max increment in one sweep */
+#define RECV_TOOMANY   4096    /* this should suffice, really. TODO: tos option? */
+
+/* If we have clocks, keep an iron reserve of receive buffers for
+ * clocks only.
+ */
+#if defined(REFCLOCK)
+# if !defined(RECV_CLOCK) || RECV_CLOCK == 0
+#  undef RECV_CLOCK
+#  define RECV_CLOCK   16
+# endif
+#else
+# if defined(RECV_CLOCK)
+#  undef RECV_CLOCK
+# endif
+# define RECV_CLOCK 0
+#endif
 
 #if defined HAVE_IO_COMPLETION_PORT
 # include "ntp_iocompletionport.h"
@@ -90,10 +106,10 @@ extern     void    freerecvbuf(struct recvbuf *);
  *  you put it back with freerecvbuf() or 
  */
 
-/* signal safe - no malloc */
-extern struct recvbuf *get_free_recv_buffer(void);
-/* signal unsafe - may malloc, never returs NULL */
-extern struct recvbuf *get_free_recv_buffer_alloc(void);
+/* signal safe - no malloc, returns NULL when no bufs */
+extern struct recvbuf *get_free_recv_buffer(int /*BOOL*/ urgent);
+/* signal unsafe - may malloc, returns NULL when no bufs */
+extern struct recvbuf *get_free_recv_buffer_alloc(int /*BOOL*/ urgent);
 
 /*   Add a buffer to the full list
  */
index 2ee8bf96a3fb8622763f9b74357721890487bf5d..edcf59dd3ad7c254ad2be817b3352855301892b5 100644 (file)
@@ -20,7 +20,9 @@
 #ifndef ISC_INT_H
 #define ISC_INT_H 1
 
-#define _INTEGRAL_MAX_BITS 64
+#ifndef _INTEGRAL_MAX_BITS
+# define _INTEGRAL_MAX_BITS 64
+#endif
 #include <limits.h>
 
 typedef __int8                         isc_int8_t;
index 573fdb2f920963697384b0df4e1a263b3efaa6f8..5855ec2147ac07e02a48dd185f455a26911f915d 100644 (file)
 #include "recvbuff.h"
 #include "iosignal.h"
 
+#if (RECV_INC & (RECV_INC-1))
+# error RECV_INC not a power of 2!
+#endif
+#if (RECV_BATCH & (RECV_BATCH - 1))
+#error RECV_BATCH not a power of 2!
+#endif
+#if (RECV_BATCH < RECV_INC)
+#error RECV_BATCH must be >= RECV_INC!
+#endif
 
 /*
  * Memory allocation
@@ -21,6 +30,8 @@ static u_long volatile total_recvbufs;        /* total recvbufs currently in use */
 static u_long volatile lowater_adds;   /* number of times we have added memory */
 static u_long volatile buffer_shortfall;/* number of missed free receive buffers
                                           between replenishments */
+static u_long limit_recvbufs;          /* maximum total of receive buffers */
+static u_long emerg_recvbufs;          /* emergency/urgent buffers to keep */
 
 static DECL_FIFO_ANCHOR(recvbuf_t) full_recv_fifo;
 static recvbuf_t *                free_recv_list;
@@ -33,11 +44,16 @@ static recvbuf_t *             free_recv_list;
  * short a time as possible
  */
 static CRITICAL_SECTION RecvLock;
-# define LOCK()                EnterCriticalSection(&RecvLock)
-# define UNLOCK()      LeaveCriticalSection(&RecvLock)
+static CRITICAL_SECTION FreeLock;
+# define LOCK_R()      EnterCriticalSection(&RecvLock)
+# define UNLOCK_R()    LeaveCriticalSection(&RecvLock)
+# define LOCK_F()      EnterCriticalSection(&FreeLock)
+# define UNLOCK_F()    LeaveCriticalSection(&FreeLock)
 #else
-# define LOCK()                do {} while (FALSE)
-# define UNLOCK()      do {} while (FALSE)
+# define LOCK_R()      do {} while (FALSE)
+# define UNLOCK_R()    do {} while (FALSE)
+# define LOCK_F()      do {} while (FALSE)
+# define UNLOCK_F()    do {} while (FALSE)
 #endif
 
 #ifdef DEBUG
@@ -76,33 +92,52 @@ initialise_buffer(recvbuf_t *buff)
 }
 
 static void
-create_buffers(int nbufs)
+create_buffers(
+       size_t          nbufs)
 {
+#   ifndef DEBUG
+       static const u_int chunk = RECV_INC;
+#   else
+       /* Allocate each buffer individually so they can be free()d
+        * during ntpd shutdown on DEBUG builds to keep them out of heap
+        * leak reports.
+        */
+       static const u_int chunk = 1;
+#   endif
+
        register recvbuf_t *bufp;
-       int i, abuf;
+       u_int i;
+       size_t abuf;
 
+       if (limit_recvbufs <= total_recvbufs)
+               return;
+       
        abuf = nbufs + buffer_shortfall;
        buffer_shortfall = 0;
 
-#ifndef DEBUG
-       bufp = eallocarray(abuf, sizeof(*bufp));
-#endif
-
-       for (i = 0; i < abuf; i++) {
-#ifdef DEBUG
-               /*
-                * Allocate each buffer individually so they can be
-                * free()d during ntpd shutdown on DEBUG builds to
-                * keep them out of heap leak reports.
-                */
-               bufp = emalloc_zero(sizeof(*bufp));
-#endif
-               LINK_SLIST(free_recv_list, bufp, link);
-               bufp++;
-               free_recvbufs++;
-               total_recvbufs++;
+       if (abuf < nbufs || abuf > RECV_BATCH)
+               abuf = RECV_BATCH;      /* clamp on overflow */
+       else
+               abuf += (~abuf + 1) & (RECV_INC - 1);   /* round up */
+       
+       if (abuf > (limit_recvbufs - total_recvbufs))
+               abuf = limit_recvbufs - total_recvbufs;
+       abuf += (~abuf + 1) & (chunk - 1);              /* round up */
+       
+       while (abuf) {
+               bufp = calloc(chunk, sizeof(*bufp));
+               if (!bufp) {
+                       limit_recvbufs = total_recvbufs;
+                       break;
+               }
+               for (i = chunk; i; --i,++bufp) {
+                       LINK_SLIST(free_recv_list, bufp, link);
+               }
+               free_recvbufs += chunk;
+               total_recvbufs += chunk;
+               abuf -= chunk;
        }
-       lowater_adds++;
+       ++lowater_adds;
 }
 
 void
@@ -115,15 +150,19 @@ init_recvbuff(int nbufs)
        free_recvbufs = total_recvbufs = 0;
        full_recvbufs = lowater_adds = 0;
 
+       limit_recvbufs = RECV_TOOMANY;
+       emerg_recvbufs = RECV_CLOCK;
+
        create_buffers(nbufs);
 
-#if defined(SYS_WINNT)
+#   if defined(SYS_WINNT)
        InitializeCriticalSection(&RecvLock);
-#endif
+       InitializeCriticalSection(&FreeLock);
+#   endif
 
-#ifdef DEBUG
+#   ifdef DEBUG
        atexit(&uninit_recvbuff);
-#endif
+#   endif
 }
 
 
@@ -146,6 +185,10 @@ uninit_recvbuff(void)
                        break;
                free(rbunlinked);
        }
+#   if defined(SYS_WINNT)
+       DeleteCriticalSection(&FreeLock);
+       DeleteCriticalSection(&RecvLock);
+#   endif
 }
 #endif /* DEBUG */
 
@@ -157,13 +200,14 @@ void
 freerecvbuf(recvbuf_t *rb)
 {
        if (rb) {
-               LOCK();
-               rb->used--;
-               if (rb->used != 0)
+               if (--rb->used != 0) {
                        msyslog(LOG_ERR, "******** freerecvbuff non-zero usage: %d *******", rb->used);
+                       rb->used = 0;
+               }
+               LOCK_F();
                LINK_SLIST(free_recv_list, rb, link);
-               free_recvbufs++;
-               UNLOCK();
+               ++free_recvbufs;
+               UNLOCK_F();
        }
 }
 
@@ -175,28 +219,34 @@ add_full_recv_buffer(recvbuf_t *rb)
                msyslog(LOG_ERR, "add_full_recv_buffer received NULL buffer");
                return;
        }
-       LOCK();
+       LOCK_R();
        LINK_FIFO(full_recv_fifo, rb, link);
-       full_recvbufs++;
-       UNLOCK();
+       ++full_recvbufs;
+       UNLOCK_R();
 }
 
 
 recvbuf_t *
-get_free_recv_buffer(void)
+get_free_recv_buffer(
+    int /*BOOL*/ urgent
+    )
 {
-       recvbuf_t *buffer;
+       recvbuf_t *buffer = NULL;
 
-       LOCK();
-       UNLINK_HEAD_SLIST(buffer, free_recv_list, link);
+       LOCK_F();
+       if (free_recvbufs > (urgent ? emerg_recvbufs : 0)) {
+               UNLINK_HEAD_SLIST(buffer, free_recv_list, link);
+       }
+       
        if (buffer != NULL) {
-               free_recvbufs--;
+               if (free_recvbufs)
+                       --free_recvbufs;
                initialise_buffer(buffer);
-               buffer->used++;
+               ++buffer->used;
        } else {
-               buffer_shortfall++;
+               ++buffer_shortfall;
        }
-       UNLOCK();
+       UNLOCK_F();
 
        return buffer;
 }
@@ -204,17 +254,15 @@ get_free_recv_buffer(void)
 
 #ifdef HAVE_IO_COMPLETION_PORT
 recvbuf_t *
-get_free_recv_buffer_alloc(void)
+get_free_recv_buffer_alloc(
+    int /*BOOL*/ urgent
+    )
 {
-       recvbuf_t *buffer;
-       
-       buffer = get_free_recv_buffer();
-       if (NULL == buffer) {
+       LOCK_F();       
+       if (free_recvbufs <= emerg_recvbufs || buffer_shortfall > 0)
                create_buffers(RECV_INC);
-               buffer = get_free_recv_buffer();
-       }
-       ENSURE(buffer != NULL);
-       return (buffer);
+       UNLOCK_F();
+       return get_free_recv_buffer(urgent);
 }
 #endif
 
@@ -224,30 +272,26 @@ get_full_recv_buffer(void)
 {
        recvbuf_t *     rbuf;
 
-       LOCK();
-       
        /*
-        * make sure there are free buffers when we
-        * wander off to do lengthy packet processing with
-        * any buffer we grab from the full list.
+        * make sure there are free buffers when we wander off to do
+        * lengthy packet processing with any buffer we grab from the
+        * full list.
         * 
-        * fixes malloc() interrupted by SIGIO risk
-        * (Bug 889)
+        * fixes malloc() interrupted by SIGIO risk (Bug 889)
         */
-       if (NULL == free_recv_list || buffer_shortfall > 0) {
-               /*
-                * try to get us some more buffers
-                */
+       LOCK_F();       
+       if (free_recvbufs <= emerg_recvbufs || buffer_shortfall > 0)
                create_buffers(RECV_INC);
-       }
+       UNLOCK_F();
 
        /*
         * try to grab a full buffer
         */
+       LOCK_R();
        UNLINK_FIFO(rbuf, full_recv_fifo, link);
-       if (rbuf != NULL)
-               full_recvbufs--;
-       UNLOCK();
+       if (rbuf != NULL && full_recvbufs)
+               --full_recvbufs;
+       UNLOCK_R();
 
        return rbuf;
 }
@@ -265,12 +309,18 @@ purge_recv_buffers_for_fd(
        recvbuf_t *rbufp;
        recvbuf_t *next;
        recvbuf_t *punlinked;
+       recvbuf_t *freelist = NULL;
 
-       LOCK();
+       /* We want to hold only one lock at a time. So we do a scan on
+        * the full buffer queue, collecting items as we go, and when
+        * done we spool the the collected items to 'freerecvbuf()'.
+        */
+       LOCK_R();
 
        for (rbufp = HEAD_FIFO(full_recv_fifo);
             rbufp != NULL;
-            rbufp = next) {
+            rbufp = next)
+       {
                next = rbufp->link;
 #          ifdef HAVE_IO_COMPLETION_PORT
                if (rbufp->dstadr == NULL && rbufp->fd == fd)
@@ -281,12 +331,20 @@ purge_recv_buffers_for_fd(
                        UNLINK_MID_FIFO(punlinked, full_recv_fifo,
                                        rbufp, link, recvbuf_t);
                        INSIST(punlinked == rbufp);
-                       full_recvbufs--;
-                       freerecvbuf(rbufp);
+                       if (full_recvbufs)
+                               --full_recvbufs;
+                       rbufp->link = freelist;
+                       freelist = rbufp;
                }
        }
 
-       UNLOCK();
+       UNLOCK_R();
+       
+       while (freelist) {
+               next = freelist->link;
+               freerecvbuf(freelist);
+               freelist = next;
+       }
 }
 
 
index 498961f3b3c76ab3da7ba06923d997934d27a5fb..979a7c4aea8eaf1ff2871ef24f1e5ce23743a89f 100644 (file)
@@ -27,13 +27,13 @@ clamp_rounded(
        dval = floor(dval + 0.5);
 
        /* clamp / saturate */
-       if (dval >= LONG_MAX)
+       if (dval >= (double)LONG_MAX)
                return LONG_MAX;
-       if (dval <= LONG_MIN)
+       if (dval <= (double)LONG_MIN)
                return LONG_MIN;
        return (long)dval;
-       
 }
+
 double
 dbl_from_var_long(
        long    lval,
@@ -80,4 +80,3 @@ usec_long_from_dbl(
 {
        return clamp_rounded(dval * 1e+6);
 }
-
index 4ad1c7fbc3e5f463df61368d8e75583bfff6aa1f..c0dcea598121e72a6fa3a7a11a65a37d22b85c62 100644 (file)
@@ -3293,15 +3293,20 @@ read_refclock_packet(
        int                     consumed;
        struct recvbuf *        rb;
 
-       rb = get_free_recv_buffer();
+       rb = get_free_recv_buffer(TRUE);
 
        if (NULL == rb) {
                /*
-                * No buffer space available - just drop the packet
+                * No buffer space available - just drop the 'packet'.
+                * Since this is a non-blocking character stream we read
+                * all data that we can.
+                *
+                * ...hmmmm... what about "tcflush(fd,TCIFLUSH)" here?!?
                 */
-               char buf[RX_BUFF_SIZE];
-
-               buflen = read(fd, buf, sizeof buf);
+               char buf[128];
+               do
+                       buflen = read(fd, buf, sizeof(buf));
+               while (buflen > 0);
                packets_dropped++;
                return (buflen);
        }
@@ -3487,15 +3492,18 @@ read_network_packet(
 #endif
 
        /*
-        * Get a buffer and read the frame.  If we
-        * haven't got a buffer, or this is received
-        * on a disallowed socket, just dump the
+        * Get a buffer and read the frame.  If we haven't got a buffer,
+        * or this is received on a disallowed socket, just dump the
         * packet.
         */
 
-       rb = get_free_recv_buffer();
-       if (NULL == rb || itf->ignore_packets) {
-               char buf[RX_BUFF_SIZE];
+       rb = itf->ignore_packets ? NULL : get_free_recv_buffer(FALSE);
+       if (NULL == rb) {
+               /* A partial read on a UDP socket truncates the data and
+                * removes the message from the queue. So there's no
+                * need to have a full buffer here on the stack.
+                */ 
+               char buf[16];
                sockaddr_u from;
 
                if (rb != NULL)
@@ -4740,12 +4748,14 @@ process_routing_msgs(struct asyncio_reader *reader)
 #ifdef HAVE_RTNETLINK
        for (nh = UA_PTR(struct nlmsghdr, buffer);
             NLMSG_OK(nh, cnt);
-            nh = NLMSG_NEXT(nh, cnt)) {
+            nh = NLMSG_NEXT(nh, cnt))
+       {
                msg_type = nh->nlmsg_type;
 #else
        for (p = buffer;
             (p + sizeof(struct rt_msghdr)) <= (buffer + cnt);
-            p += rtm.rtm_msglen) {
+            p += rtm.rtm_msglen)
+       {
                memcpy(&rtm, p, sizeof(rtm));
                if (rtm.rtm_version != RTM_VERSION) {
                        msyslog(LOG_ERR,
index b4a65b80377fb6d2f72e60f19022226c32c9f823..043bc8673c7e2cd617fa9e16c64cdedfce4180a4 100644 (file)
@@ -2366,7 +2366,7 @@ local_input(
                        }
                        if (count)
                        {       /* simulate receive */
-                               buf = get_free_recv_buffer();
+                               buf = get_free_recv_buffer(TRUE);
                                if (buf != NULL) {
                                        memmove((caddr_t)buf->recv_buffer,
                                                (caddr_t)&parse->parseio.parse_dtime,
index ce8c4a1ddc473d04e84ee99a7b2e5c1f17f6fba9..fd48604c912b0e8ad394be1be726f04429fae919 100644 (file)
@@ -1984,7 +1984,7 @@ input_handler(void)
                        continue;
                }
 
-               rb = get_free_recv_buffer();
+               rb = get_free_recv_buffer(TRUE);
 
                fromlen = sizeof(rb->recv_srcadr);
                rb->recv_length = recvfrom(fdc, (char *)&rb->recv_pkt,
index 475371169a2a4ebe84b583b957442b783ddd1827..16197e34fdee31a2a8d1d39418d45bff6bd7eaf7 100644 (file)
@@ -160,9 +160,9 @@ FacilityNames=(System=0x0:FACILITY_SYSTEM
 //
 // Define the severity codes
 //
-#define STATUS_SEVERITY_WARNING          0x2
 #define STATUS_SEVERITY_SUCCESS          0x0
 #define STATUS_SEVERITY_INFORMATIONAL    0x1
+#define STATUS_SEVERITY_WARNING          0x2
 #define STATUS_SEVERITY_ERROR            0x3
 
 
index c5db62a8dd5f47892a142773325355eeed4c13c6..9cf095225bc24009a65e50318ed60c0b9514d87d 100644 (file)
@@ -708,7 +708,7 @@ QueueSerialWait(
        recvbuf_t *     buff
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "QueueSerialWait: cannot wait for COM event";
 
        BOOL    rc;
@@ -720,7 +720,7 @@ QueueSerialWait(
        buff->fd = lpo->iopad->riofd;
        /* keep receive position for continuation of partial lines! */
        rc  = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
-       return rc || IoResultCheck(GetLastError(), lpo, msg);
+       return rc || IoResultCheck(GetLastError(), lpo, msgh);
 }
 
 /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
@@ -731,7 +731,7 @@ OnSerialWaitComplete(
        IoCtx_t *       lpo
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "OnSerialWaitComplete: wait for COM event failed";
 
        DevCtx_t *      dev;
@@ -740,7 +740,7 @@ OnSerialWaitComplete(
        u_long          covc;
 
        /* Make sure this RIO is not closed. */
-       if (NULL == getRioFromIoCtx(lpo, key, msg))
+       if (NULL == getRioFromIoCtx(lpo, key, msgh))
                return;
 
        /* start next IO and leave if we hit an error */
@@ -847,7 +847,7 @@ QueueSerialReadCommon(
        recvbuf_t *     buff
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "QueueSerialRead: cannot schedule device read";
 
        BOOL    rc;
@@ -866,7 +866,7 @@ QueueSerialReadCommon(
                (char*)buff->recv_buffer + buff->recv_length,
                sizeof(buff->recv_buffer) - buff->recv_length,
                NULL, &lpo->ol);
-       return rc || IoResultCheck(GetLastError(), lpo, msg);
+       return rc || IoResultCheck(GetLastError(), lpo, msgh);
 }
 
 /*
@@ -904,11 +904,11 @@ OnSerialReadComplete(
        IoCtx_t *       lpo
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "OnSerialReadComplete: read from device failed";
 
        /* Make sure this RIO is not closed. */
-       if (NULL == getRioFromIoCtx(lpo, key, msg))
+       if (NULL == getRioFromIoCtx(lpo, key, msgh))
                return;
 
        /* start next IO and leave if we hit an error */
@@ -1035,7 +1035,12 @@ OnSerialReadWorker(
 
 st_new_obuf:
        /* Get new receive buffer to store the line. */
-       obuf = get_free_recv_buffer_alloc();
+       obuf = get_free_recv_buffer_alloc(TRUE);
+       if (!obuf) {
+               ++packets_dropped; /* maybe atomic? */
+               buff->recv_length = 0;
+               goto st_read_fresh;
+       }
        obuf->fd        = buff->fd;
        obuf->receiver  = buff->receiver;
        obuf->dstadr    = NULL;
@@ -1154,11 +1159,11 @@ OnRawSerialReadComplete(
        IoCtx_t *       lpo
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "OnRawSerialReadComplete: read from device failed";
 
        recvbuf_t *     buff = lpo->recv_buf;
-       RIO_t *         rio  = getRioFromIoCtx(lpo, key, msg);
+       RIO_t *         rio  = getRioFromIoCtx(lpo, key, msgh);
        /* Make sure this RIO is not closed. */
        if (rio == NULL)
                return;
@@ -1167,10 +1172,16 @@ OnRawSerialReadComplete(
        if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
                buff->recv_length = (int)lpo->byteCount;
                set_serial_recv_time(buff, lpo);
-               iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
-               buff = get_free_recv_buffer_alloc();
+               lpo->recv_buf = get_free_recv_buffer_alloc(TRUE);
+               if (lpo->recv_buf) {
+                       iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
+               } else {
+                       ++packets_dropped; /* maybe atomic? */
+                       buff->recv_length = 0;
+                       lpo->recv_buf = buff;
+               }
        }
-       IoCtxStartChecked(lpo, QueueSerialWait, buff);
+       IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
 }
 
 
@@ -1217,7 +1228,7 @@ async_write(
        unsigned int    count
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "async_write: cannot schedule device write";
        static const char * const dmsg =
                "overlapped IO data buffer";
@@ -1242,7 +1253,7 @@ async_write(
 
        rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count,
                       NULL, &lpo->ol);
-       if (rc || IoResultCheck(GetLastError(), lpo, msg))
+       if (rc || IoResultCheck(GetLastError(), lpo, msgh))
                return count;   /* normal/success return */
 
        errno = EBADF;
@@ -1264,10 +1275,10 @@ OnSerialWriteComplete(
         * error processing, and it returns with a valid RIO, just
         * drop the complete context.
         */
-       static const char * const msg =
+       static const char * const msgh =
                "OnSerialWriteComplete: serial output failed";
 
-       if (NULL != getRioFromIoCtx(lpo, key, msg))
+       if (NULL != getRioFromIoCtx(lpo, key, msgh))
                IoCtxRelease(lpo);
 }
 
@@ -1367,6 +1378,7 @@ io_completion_port_add_clock_io(
        IoCtx_t *       lpo;
        HANDLE          h;
        IoHndPad_T *    iopad = NULL;
+       recvbuf_t *     rbuf;
 
        /* preset to clear state for error cleanup:*/
        rio->ioreg_ctx  = NULL;
@@ -1395,8 +1407,7 @@ io_completion_port_add_clock_io(
        }
 
        if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
-               msyslog(LOG_ERR, "%: Failed to allocate IO context",
-                       msgh);
+               msyslog(LOG_ERR, "%: no IO context: %m", msgh);
                goto fail;
        }
 
@@ -1407,7 +1418,11 @@ io_completion_port_add_clock_io(
        }
        lpo->io.hnd = h;
        memset(&lpo->aux, 0, sizeof(lpo->aux));
-       return QueueSerialWait(lpo, get_free_recv_buffer_alloc());
+       if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) {
+               msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
+               goto fail;
+       }
+       return QueueSerialWait(lpo, rbuf);
 
 fail:
        rio->ioreg_ctx  = iohpDetach(rio->ioreg_ctx);
@@ -1470,7 +1485,7 @@ QueueSocketRecv(
        recvbuf_t *     buff
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "QueueSocketRecv: cannot schedule socket receive";
 
        WSABUF  wsabuf;
@@ -1492,7 +1507,7 @@ QueueSocketRecv(
        rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags,
                         &buff->recv_srcadr.sa, &buff->recv_srcadr_len, 
                         &lpo->ol, NULL);
-       return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg);
+       return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh);
 }
 
 /* ----------------------------------------------------------------- */
@@ -1502,7 +1517,7 @@ OnSocketRecv(
        IoCtx_t *       lpo
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "OnSocketRecv: receive from socket failed";
 
        recvbuf_t *     buff    = NULL;
@@ -1511,7 +1526,7 @@ OnSocketRecv(
        int             rc;
 
        /* order is important -- check first, then get endpoint! */
-       rc = socketErrorCheck(lpo, msg);
+       rc = socketErrorCheck(lpo, msgh);
        ep = getEndptFromIoCtx(lpo, key);
 
        /* Make sure this endpoint is not closed. */
@@ -1528,10 +1543,15 @@ OnSocketRecv(
        if (rc == PKT_OK && lpo->byteCount > 0) {
                /* keep input buffer, create new one for IO */
                buff              = lpo->recv_buf;
-               lpo->recv_buf     = get_free_recv_buffer_alloc();
-
-               buff->recv_time   = lpo->aux.RecvTime;
-               buff->recv_length = (int)lpo->byteCount;
+               lpo->recv_buf     = get_free_recv_buffer_alloc(FALSE);
+               if (lpo->recv_buf) {
+                       buff->recv_time   = lpo->aux.RecvTime;
+                       buff->recv_length = (int)lpo->byteCount;
+               } else {
+                       lpo->recv_buf = buff;
+                       buff = NULL;
+                       ++packets_dropped; /* maybe atomic? */
+               }
 
        } /* Note: else we use the current buffer again */
 
@@ -1547,7 +1567,7 @@ OnSocketRecv(
         * then feed it to the input queue. And we can be sure we have
         * a packet here, so we can update the stats.
         */
-       if (buff != NULL) {
+       if (buff) {
                INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr));
                DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
                        (MODE_BROADCAST == get_packet_mode(buff))
@@ -1578,14 +1598,14 @@ OnSocketSend(
        )
 {
        /* this is somewhat easier: */
-       static const char * const msg =
+       static const char * const msgh =
                "OnSocketSend: send to socket failed";
 
        endpt *         ep      = NULL;
        int             rc;
 
        /* order is important -- check first, then get endpoint! */
-       rc = socketErrorCheck(lpo, msg);
+       rc = socketErrorCheck(lpo, msgh);
        ep = getEndptFromIoCtx(lpo, key);
 
        /* Make sure this endpoint is not closed. */
@@ -1686,13 +1706,14 @@ io_completion_port_add_socket(
        /* Assume the endpoint is already registered. Set the socket
         * handle into the proper slot, and then start up the IO engine.
         */
-       static const char * const msg =
+       static const char * const msgh =
                "Can't add socket to i/o completion port";
 
        IoCtx_t *       lpo;
        size_t          n;
        ULONG_PTR       key;
        IoHndPad_T *    iopad = NULL;
+       recvbuf_t *     rbuf;
 
        key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
 
@@ -1709,16 +1730,20 @@ io_completion_port_add_socket(
        if (NULL == CreateIoCompletionPort((HANDLE)sfd,
                hndIOCPLPort, key, 0))
        {
-               msyslog(LOG_ERR, "%s: %m", msg);
+               msyslog(LOG_ERR, "%s: %m", msgh);
                goto fail;
        }
        for (n = s_SockRecvSched; n > 0; --n) {
                if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) {
-                       msyslog(LOG_ERR, "%s: no read buffer: %m", msg);
+                       msyslog(LOG_ERR, "%s: no IO context: %m", msgh);
                        goto fail;
                }
                lpo->io.sfd = sfd;
-               if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc()))
+               if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) {
+                       msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh);
+                       goto fail;
+               }
+               if (!QueueSocketRecv(lpo, rbuf))
                        goto fail;
        }
        return TRUE;
@@ -1765,7 +1790,7 @@ io_completion_port_sendto(
        sockaddr_u *    dest
        )
 {
-       static const char * const msg =
+       static const char * const msgh =
                "sendto: cannot schedule socket send";
        static const char * const dmsg =
                "overlapped IO data buffer";
@@ -1798,7 +1823,7 @@ io_completion_port_sendto(
        rc  = WSASendTo(sfd, &wsabuf, 1, NULL, 0,
                        &dest->sa, SOCKLEN(dest),
                        &lpo->ol, NULL);
-       if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg))
+       if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh))
                return (int)len;        /* normal/success return */
 
        errno = EBADF;
index 6c089bb4c33f821b9a9ae5b4ccca8c1f4a0aa38e..90986e0aba957b40770f6d3c13b38866ceed40e9 100644 (file)
@@ -29,7 +29,7 @@ test_Initialization(void) {
 void
 test_GetAndFree(void) {
        u_long initial = free_recvbuffs();
-       recvbuf_t* buf = get_free_recv_buffer();
+       recvbuf_t* buf = get_free_recv_buffer(TRUE);
 
        TEST_ASSERT_EQUAL_UINT(initial-1, free_recvbuffs());
        freerecvbuf(buf);
@@ -40,7 +40,7 @@ test_GetAndFree(void) {
 void
 test_GetAndFill(void) {
        // int initial = free_recvbuffs();
-       recvbuf_t* buf = get_free_recv_buffer();
+       recvbuf_t* buf = get_free_recv_buffer(TRUE);
 
        add_full_recv_buffer(buf);
        TEST_ASSERT_EQUAL_UINT(1, full_recvbuffs());