From: Juergen Perlinger Date: Sat, 9 May 2020 06:20:44 +0000 (+0200) Subject: [Bug 3666] avoid unlimited receive buffer allocation X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=fe76ed75216daf980bd07a192ae55827efd2d8ce;p=thirdparty%2Fntp.git [Bug 3666] avoid unlimited receive buffer allocation bk: 5eb64bbcMQk6vLa3ERqwzpImTteOlw --- diff --git a/ChangeLog b/ChangeLog index 073fdb571..1008f370a 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +--- +* [Bug 3666] avoid unlimited receive buffer allocation + - limit number of receive buffers, with an iron reserve for refclocks + --- (4.2.8p15) 2020/04/xx Released by Harlan Stenn diff --git a/include/recvbuff.h b/include/recvbuff.h index 42597153e..2a4c84c5a 100644 --- a/include/recvbuff.h +++ b/include/recvbuff.h @@ -10,10 +10,26 @@ /* * recvbuf memory management */ -#define RECV_INIT 10 /* 10 buffers initially */ +#define RECV_INIT 64 /* 64 buffers initially */ #define RECV_LOWAT 3 /* when we're down to three buffers get more */ -#define RECV_INC 5 /* get 5 more at a time */ -#define RECV_TOOMANY 40 /* this is way too many buffers */ +#define RECV_INC 32 /* [power of 2] get 32 more at a time */ +#define RECV_BATCH 128 /* [power of 2] max increment in one sweep */ +#define RECV_TOOMANY 4096 /* this should suffice, really. TODO: tos option? */ + +/* If we have clocks, keep an iron reserve of receive buffers for + * clocks only. + */ +#if defined(REFCLOCK) +# if !defined(RECV_CLOCK) || RECV_CLOCK == 0 +# undef RECV_CLOCK +# define RECV_CLOCK 16 +# endif +#else +# if defined(RECV_CLOCK) +# undef RECV_CLOCK +# endif +# define RECV_CLOCK 0 +#endif #if defined HAVE_IO_COMPLETION_PORT # include "ntp_iocompletionport.h" @@ -90,10 +106,10 @@ extern void freerecvbuf(struct recvbuf *); * you put it back with freerecvbuf() or */ -/* signal safe - no malloc */ -extern struct recvbuf *get_free_recv_buffer(void); -/* signal unsafe - may malloc, never returs NULL */ -extern struct recvbuf *get_free_recv_buffer_alloc(void); +/* signal safe - no malloc, returns NULL when no bufs */ +extern struct recvbuf *get_free_recv_buffer(int /*BOOL*/ urgent); +/* signal unsafe - may malloc, returns NULL when no bufs */ +extern struct recvbuf *get_free_recv_buffer_alloc(int /*BOOL*/ urgent); /* Add a buffer to the full list */ diff --git a/lib/isc/win32/include/isc/int.h b/lib/isc/win32/include/isc/int.h index 2ee8bf96a..edcf59dd3 100644 --- a/lib/isc/win32/include/isc/int.h +++ b/lib/isc/win32/include/isc/int.h @@ -20,7 +20,9 @@ #ifndef ISC_INT_H #define ISC_INT_H 1 -#define _INTEGRAL_MAX_BITS 64 +#ifndef _INTEGRAL_MAX_BITS +# define _INTEGRAL_MAX_BITS 64 +#endif #include typedef __int8 isc_int8_t; diff --git a/libntp/recvbuff.c b/libntp/recvbuff.c index 573fdb2f9..5855ec214 100644 --- a/libntp/recvbuff.c +++ b/libntp/recvbuff.c @@ -11,6 +11,15 @@ #include "recvbuff.h" #include "iosignal.h" +#if (RECV_INC & (RECV_INC-1)) +# error RECV_INC not a power of 2! +#endif +#if (RECV_BATCH & (RECV_BATCH - 1)) +#error RECV_BATCH not a power of 2! +#endif +#if (RECV_BATCH < RECV_INC) +#error RECV_BATCH must be >= RECV_INC! +#endif /* * Memory allocation @@ -21,6 +30,8 @@ static u_long volatile total_recvbufs; /* total recvbufs currently in use */ static u_long volatile lowater_adds; /* number of times we have added memory */ static u_long volatile buffer_shortfall;/* number of missed free receive buffers between replenishments */ +static u_long limit_recvbufs; /* maximum total of receive buffers */ +static u_long emerg_recvbufs; /* emergency/urgent buffers to keep */ static DECL_FIFO_ANCHOR(recvbuf_t) full_recv_fifo; static recvbuf_t * free_recv_list; @@ -33,11 +44,16 @@ static recvbuf_t * free_recv_list; * short a time as possible */ static CRITICAL_SECTION RecvLock; -# define LOCK() EnterCriticalSection(&RecvLock) -# define UNLOCK() LeaveCriticalSection(&RecvLock) +static CRITICAL_SECTION FreeLock; +# define LOCK_R() EnterCriticalSection(&RecvLock) +# define UNLOCK_R() LeaveCriticalSection(&RecvLock) +# define LOCK_F() EnterCriticalSection(&FreeLock) +# define UNLOCK_F() LeaveCriticalSection(&FreeLock) #else -# define LOCK() do {} while (FALSE) -# define UNLOCK() do {} while (FALSE) +# define LOCK_R() do {} while (FALSE) +# define UNLOCK_R() do {} while (FALSE) +# define LOCK_F() do {} while (FALSE) +# define UNLOCK_F() do {} while (FALSE) #endif #ifdef DEBUG @@ -76,33 +92,52 @@ initialise_buffer(recvbuf_t *buff) } static void -create_buffers(int nbufs) +create_buffers( + size_t nbufs) { +# ifndef DEBUG + static const u_int chunk = RECV_INC; +# else + /* Allocate each buffer individually so they can be free()d + * during ntpd shutdown on DEBUG builds to keep them out of heap + * leak reports. + */ + static const u_int chunk = 1; +# endif + register recvbuf_t *bufp; - int i, abuf; + u_int i; + size_t abuf; + if (limit_recvbufs <= total_recvbufs) + return; + abuf = nbufs + buffer_shortfall; buffer_shortfall = 0; -#ifndef DEBUG - bufp = eallocarray(abuf, sizeof(*bufp)); -#endif - - for (i = 0; i < abuf; i++) { -#ifdef DEBUG - /* - * Allocate each buffer individually so they can be - * free()d during ntpd shutdown on DEBUG builds to - * keep them out of heap leak reports. - */ - bufp = emalloc_zero(sizeof(*bufp)); -#endif - LINK_SLIST(free_recv_list, bufp, link); - bufp++; - free_recvbufs++; - total_recvbufs++; + if (abuf < nbufs || abuf > RECV_BATCH) + abuf = RECV_BATCH; /* clamp on overflow */ + else + abuf += (~abuf + 1) & (RECV_INC - 1); /* round up */ + + if (abuf > (limit_recvbufs - total_recvbufs)) + abuf = limit_recvbufs - total_recvbufs; + abuf += (~abuf + 1) & (chunk - 1); /* round up */ + + while (abuf) { + bufp = calloc(chunk, sizeof(*bufp)); + if (!bufp) { + limit_recvbufs = total_recvbufs; + break; + } + for (i = chunk; i; --i,++bufp) { + LINK_SLIST(free_recv_list, bufp, link); + } + free_recvbufs += chunk; + total_recvbufs += chunk; + abuf -= chunk; } - lowater_adds++; + ++lowater_adds; } void @@ -115,15 +150,19 @@ init_recvbuff(int nbufs) free_recvbufs = total_recvbufs = 0; full_recvbufs = lowater_adds = 0; + limit_recvbufs = RECV_TOOMANY; + emerg_recvbufs = RECV_CLOCK; + create_buffers(nbufs); -#if defined(SYS_WINNT) +# if defined(SYS_WINNT) InitializeCriticalSection(&RecvLock); -#endif + InitializeCriticalSection(&FreeLock); +# endif -#ifdef DEBUG +# ifdef DEBUG atexit(&uninit_recvbuff); -#endif +# endif } @@ -146,6 +185,10 @@ uninit_recvbuff(void) break; free(rbunlinked); } +# if defined(SYS_WINNT) + DeleteCriticalSection(&FreeLock); + DeleteCriticalSection(&RecvLock); +# endif } #endif /* DEBUG */ @@ -157,13 +200,14 @@ void freerecvbuf(recvbuf_t *rb) { if (rb) { - LOCK(); - rb->used--; - if (rb->used != 0) + if (--rb->used != 0) { msyslog(LOG_ERR, "******** freerecvbuff non-zero usage: %d *******", rb->used); + rb->used = 0; + } + LOCK_F(); LINK_SLIST(free_recv_list, rb, link); - free_recvbufs++; - UNLOCK(); + ++free_recvbufs; + UNLOCK_F(); } } @@ -175,28 +219,34 @@ add_full_recv_buffer(recvbuf_t *rb) msyslog(LOG_ERR, "add_full_recv_buffer received NULL buffer"); return; } - LOCK(); + LOCK_R(); LINK_FIFO(full_recv_fifo, rb, link); - full_recvbufs++; - UNLOCK(); + ++full_recvbufs; + UNLOCK_R(); } recvbuf_t * -get_free_recv_buffer(void) +get_free_recv_buffer( + int /*BOOL*/ urgent + ) { - recvbuf_t *buffer; + recvbuf_t *buffer = NULL; - LOCK(); - UNLINK_HEAD_SLIST(buffer, free_recv_list, link); + LOCK_F(); + if (free_recvbufs > (urgent ? emerg_recvbufs : 0)) { + UNLINK_HEAD_SLIST(buffer, free_recv_list, link); + } + if (buffer != NULL) { - free_recvbufs--; + if (free_recvbufs) + --free_recvbufs; initialise_buffer(buffer); - buffer->used++; + ++buffer->used; } else { - buffer_shortfall++; + ++buffer_shortfall; } - UNLOCK(); + UNLOCK_F(); return buffer; } @@ -204,17 +254,15 @@ get_free_recv_buffer(void) #ifdef HAVE_IO_COMPLETION_PORT recvbuf_t * -get_free_recv_buffer_alloc(void) +get_free_recv_buffer_alloc( + int /*BOOL*/ urgent + ) { - recvbuf_t *buffer; - - buffer = get_free_recv_buffer(); - if (NULL == buffer) { + LOCK_F(); + if (free_recvbufs <= emerg_recvbufs || buffer_shortfall > 0) create_buffers(RECV_INC); - buffer = get_free_recv_buffer(); - } - ENSURE(buffer != NULL); - return (buffer); + UNLOCK_F(); + return get_free_recv_buffer(urgent); } #endif @@ -224,30 +272,26 @@ get_full_recv_buffer(void) { recvbuf_t * rbuf; - LOCK(); - /* - * make sure there are free buffers when we - * wander off to do lengthy packet processing with - * any buffer we grab from the full list. + * make sure there are free buffers when we wander off to do + * lengthy packet processing with any buffer we grab from the + * full list. * - * fixes malloc() interrupted by SIGIO risk - * (Bug 889) + * fixes malloc() interrupted by SIGIO risk (Bug 889) */ - if (NULL == free_recv_list || buffer_shortfall > 0) { - /* - * try to get us some more buffers - */ + LOCK_F(); + if (free_recvbufs <= emerg_recvbufs || buffer_shortfall > 0) create_buffers(RECV_INC); - } + UNLOCK_F(); /* * try to grab a full buffer */ + LOCK_R(); UNLINK_FIFO(rbuf, full_recv_fifo, link); - if (rbuf != NULL) - full_recvbufs--; - UNLOCK(); + if (rbuf != NULL && full_recvbufs) + --full_recvbufs; + UNLOCK_R(); return rbuf; } @@ -265,12 +309,18 @@ purge_recv_buffers_for_fd( recvbuf_t *rbufp; recvbuf_t *next; recvbuf_t *punlinked; + recvbuf_t *freelist = NULL; - LOCK(); + /* We want to hold only one lock at a time. So we do a scan on + * the full buffer queue, collecting items as we go, and when + * done we spool the the collected items to 'freerecvbuf()'. + */ + LOCK_R(); for (rbufp = HEAD_FIFO(full_recv_fifo); rbufp != NULL; - rbufp = next) { + rbufp = next) + { next = rbufp->link; # ifdef HAVE_IO_COMPLETION_PORT if (rbufp->dstadr == NULL && rbufp->fd == fd) @@ -281,12 +331,20 @@ purge_recv_buffers_for_fd( UNLINK_MID_FIFO(punlinked, full_recv_fifo, rbufp, link, recvbuf_t); INSIST(punlinked == rbufp); - full_recvbufs--; - freerecvbuf(rbufp); + if (full_recvbufs) + --full_recvbufs; + rbufp->link = freelist; + freelist = rbufp; } } - UNLOCK(); + UNLOCK_R(); + + while (freelist) { + next = freelist->link; + freerecvbuf(freelist); + freelist = next; + } } diff --git a/libntp/timexsup.c b/libntp/timexsup.c index 498961f3b..979a7c4ae 100644 --- a/libntp/timexsup.c +++ b/libntp/timexsup.c @@ -27,13 +27,13 @@ clamp_rounded( dval = floor(dval + 0.5); /* clamp / saturate */ - if (dval >= LONG_MAX) + if (dval >= (double)LONG_MAX) return LONG_MAX; - if (dval <= LONG_MIN) + if (dval <= (double)LONG_MIN) return LONG_MIN; return (long)dval; - } + double dbl_from_var_long( long lval, @@ -80,4 +80,3 @@ usec_long_from_dbl( { return clamp_rounded(dval * 1e+6); } - diff --git a/ntpd/ntp_io.c b/ntpd/ntp_io.c index 4ad1c7fbc..c0dcea598 100644 --- a/ntpd/ntp_io.c +++ b/ntpd/ntp_io.c @@ -3293,15 +3293,20 @@ read_refclock_packet( int consumed; struct recvbuf * rb; - rb = get_free_recv_buffer(); + rb = get_free_recv_buffer(TRUE); if (NULL == rb) { /* - * No buffer space available - just drop the packet + * No buffer space available - just drop the 'packet'. + * Since this is a non-blocking character stream we read + * all data that we can. + * + * ...hmmmm... what about "tcflush(fd,TCIFLUSH)" here?!? */ - char buf[RX_BUFF_SIZE]; - - buflen = read(fd, buf, sizeof buf); + char buf[128]; + do + buflen = read(fd, buf, sizeof(buf)); + while (buflen > 0); packets_dropped++; return (buflen); } @@ -3487,15 +3492,18 @@ read_network_packet( #endif /* - * Get a buffer and read the frame. If we - * haven't got a buffer, or this is received - * on a disallowed socket, just dump the + * Get a buffer and read the frame. If we haven't got a buffer, + * or this is received on a disallowed socket, just dump the * packet. */ - rb = get_free_recv_buffer(); - if (NULL == rb || itf->ignore_packets) { - char buf[RX_BUFF_SIZE]; + rb = itf->ignore_packets ? NULL : get_free_recv_buffer(FALSE); + if (NULL == rb) { + /* A partial read on a UDP socket truncates the data and + * removes the message from the queue. So there's no + * need to have a full buffer here on the stack. + */ + char buf[16]; sockaddr_u from; if (rb != NULL) @@ -4740,12 +4748,14 @@ process_routing_msgs(struct asyncio_reader *reader) #ifdef HAVE_RTNETLINK for (nh = UA_PTR(struct nlmsghdr, buffer); NLMSG_OK(nh, cnt); - nh = NLMSG_NEXT(nh, cnt)) { + nh = NLMSG_NEXT(nh, cnt)) + { msg_type = nh->nlmsg_type; #else for (p = buffer; (p + sizeof(struct rt_msghdr)) <= (buffer + cnt); - p += rtm.rtm_msglen) { + p += rtm.rtm_msglen) + { memcpy(&rtm, p, sizeof(rtm)); if (rtm.rtm_version != RTM_VERSION) { msyslog(LOG_ERR, diff --git a/ntpd/refclock_parse.c b/ntpd/refclock_parse.c index b4a65b803..043bc8673 100644 --- a/ntpd/refclock_parse.c +++ b/ntpd/refclock_parse.c @@ -2366,7 +2366,7 @@ local_input( } if (count) { /* simulate receive */ - buf = get_free_recv_buffer(); + buf = get_free_recv_buffer(TRUE); if (buf != NULL) { memmove((caddr_t)buf->recv_buffer, (caddr_t)&parse->parseio.parse_dtime, diff --git a/ntpdate/ntpdate.c b/ntpdate/ntpdate.c index ce8c4a1dd..fd48604c9 100644 --- a/ntpdate/ntpdate.c +++ b/ntpdate/ntpdate.c @@ -1984,7 +1984,7 @@ input_handler(void) continue; } - rb = get_free_recv_buffer(); + rb = get_free_recv_buffer(TRUE); fromlen = sizeof(rb->recv_srcadr); rb->recv_length = recvfrom(fdc, (char *)&rb->recv_pkt, diff --git a/ports/winnt/libntp/messages.h b/ports/winnt/libntp/messages.h index 475371169..16197e34f 100644 --- a/ports/winnt/libntp/messages.h +++ b/ports/winnt/libntp/messages.h @@ -160,9 +160,9 @@ FacilityNames=(System=0x0:FACILITY_SYSTEM // // Define the severity codes // -#define STATUS_SEVERITY_WARNING 0x2 #define STATUS_SEVERITY_SUCCESS 0x0 #define STATUS_SEVERITY_INFORMATIONAL 0x1 +#define STATUS_SEVERITY_WARNING 0x2 #define STATUS_SEVERITY_ERROR 0x3 diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c index c5db62a8d..9cf095225 100644 --- a/ports/winnt/ntpd/ntp_iocompletionport.c +++ b/ports/winnt/ntpd/ntp_iocompletionport.c @@ -708,7 +708,7 @@ QueueSerialWait( recvbuf_t * buff ) { - static const char * const msg = + static const char * const msgh = "QueueSerialWait: cannot wait for COM event"; BOOL rc; @@ -720,7 +720,7 @@ QueueSerialWait( buff->fd = lpo->iopad->riofd; /* keep receive position for continuation of partial lines! */ rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol); - return rc || IoResultCheck(GetLastError(), lpo, msg); + return rc || IoResultCheck(GetLastError(), lpo, msgh); } /* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ @@ -731,7 +731,7 @@ OnSerialWaitComplete( IoCtx_t * lpo ) { - static const char * const msg = + static const char * const msgh = "OnSerialWaitComplete: wait for COM event failed"; DevCtx_t * dev; @@ -740,7 +740,7 @@ OnSerialWaitComplete( u_long covc; /* Make sure this RIO is not closed. */ - if (NULL == getRioFromIoCtx(lpo, key, msg)) + if (NULL == getRioFromIoCtx(lpo, key, msgh)) return; /* start next IO and leave if we hit an error */ @@ -847,7 +847,7 @@ QueueSerialReadCommon( recvbuf_t * buff ) { - static const char * const msg = + static const char * const msgh = "QueueSerialRead: cannot schedule device read"; BOOL rc; @@ -866,7 +866,7 @@ QueueSerialReadCommon( (char*)buff->recv_buffer + buff->recv_length, sizeof(buff->recv_buffer) - buff->recv_length, NULL, &lpo->ol); - return rc || IoResultCheck(GetLastError(), lpo, msg); + return rc || IoResultCheck(GetLastError(), lpo, msgh); } /* @@ -904,11 +904,11 @@ OnSerialReadComplete( IoCtx_t * lpo ) { - static const char * const msg = + static const char * const msgh = "OnSerialReadComplete: read from device failed"; /* Make sure this RIO is not closed. */ - if (NULL == getRioFromIoCtx(lpo, key, msg)) + if (NULL == getRioFromIoCtx(lpo, key, msgh)) return; /* start next IO and leave if we hit an error */ @@ -1035,7 +1035,12 @@ OnSerialReadWorker( st_new_obuf: /* Get new receive buffer to store the line. */ - obuf = get_free_recv_buffer_alloc(); + obuf = get_free_recv_buffer_alloc(TRUE); + if (!obuf) { + ++packets_dropped; /* maybe atomic? */ + buff->recv_length = 0; + goto st_read_fresh; + } obuf->fd = buff->fd; obuf->receiver = buff->receiver; obuf->dstadr = NULL; @@ -1154,11 +1159,11 @@ OnRawSerialReadComplete( IoCtx_t * lpo ) { - static const char * const msg = + static const char * const msgh = "OnRawSerialReadComplete: read from device failed"; recvbuf_t * buff = lpo->recv_buf; - RIO_t * rio = getRioFromIoCtx(lpo, key, msg); + RIO_t * rio = getRioFromIoCtx(lpo, key, msgh); /* Make sure this RIO is not closed. */ if (rio == NULL) return; @@ -1167,10 +1172,16 @@ OnRawSerialReadComplete( if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) { buff->recv_length = (int)lpo->byteCount; set_serial_recv_time(buff, lpo); - iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); - buff = get_free_recv_buffer_alloc(); + lpo->recv_buf = get_free_recv_buffer_alloc(TRUE); + if (lpo->recv_buf) { + iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); + } else { + ++packets_dropped; /* maybe atomic? */ + buff->recv_length = 0; + lpo->recv_buf = buff; + } } - IoCtxStartChecked(lpo, QueueSerialWait, buff); + IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); } @@ -1217,7 +1228,7 @@ async_write( unsigned int count ) { - static const char * const msg = + static const char * const msgh = "async_write: cannot schedule device write"; static const char * const dmsg = "overlapped IO data buffer"; @@ -1242,7 +1253,7 @@ async_write( rc = WriteFile(lpo->io.hnd, lpo->trans_buf, count, NULL, &lpo->ol); - if (rc || IoResultCheck(GetLastError(), lpo, msg)) + if (rc || IoResultCheck(GetLastError(), lpo, msgh)) return count; /* normal/success return */ errno = EBADF; @@ -1264,10 +1275,10 @@ OnSerialWriteComplete( * error processing, and it returns with a valid RIO, just * drop the complete context. */ - static const char * const msg = + static const char * const msgh = "OnSerialWriteComplete: serial output failed"; - if (NULL != getRioFromIoCtx(lpo, key, msg)) + if (NULL != getRioFromIoCtx(lpo, key, msgh)) IoCtxRelease(lpo); } @@ -1367,6 +1378,7 @@ io_completion_port_add_clock_io( IoCtx_t * lpo; HANDLE h; IoHndPad_T * iopad = NULL; + recvbuf_t * rbuf; /* preset to clear state for error cleanup:*/ rio->ioreg_ctx = NULL; @@ -1395,8 +1407,7 @@ io_completion_port_add_clock_io( } if (NULL == (lpo = IoCtxAlloc(iopad, rio->device_ctx))) { - msyslog(LOG_ERR, "%: Failed to allocate IO context", - msgh); + msyslog(LOG_ERR, "%: no IO context: %m", msgh); goto fail; } @@ -1407,7 +1418,11 @@ io_completion_port_add_clock_io( } lpo->io.hnd = h; memset(&lpo->aux, 0, sizeof(lpo->aux)); - return QueueSerialWait(lpo, get_free_recv_buffer_alloc()); + if (NULL == (rbuf = get_free_recv_buffer_alloc(TRUE))) { + msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh); + goto fail; + } + return QueueSerialWait(lpo, rbuf); fail: rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx); @@ -1470,7 +1485,7 @@ QueueSocketRecv( recvbuf_t * buff ) { - static const char * const msg = + static const char * const msgh = "QueueSocketRecv: cannot schedule socket receive"; WSABUF wsabuf; @@ -1492,7 +1507,7 @@ QueueSocketRecv( rc = WSARecvFrom(lpo->io.sfd, &wsabuf, 1, NULL, &lpo->ioFlags, &buff->recv_srcadr.sa, &buff->recv_srcadr_len, &lpo->ol, NULL); - return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg); + return !rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh); } /* ----------------------------------------------------------------- */ @@ -1502,7 +1517,7 @@ OnSocketRecv( IoCtx_t * lpo ) { - static const char * const msg = + static const char * const msgh = "OnSocketRecv: receive from socket failed"; recvbuf_t * buff = NULL; @@ -1511,7 +1526,7 @@ OnSocketRecv( int rc; /* order is important -- check first, then get endpoint! */ - rc = socketErrorCheck(lpo, msg); + rc = socketErrorCheck(lpo, msgh); ep = getEndptFromIoCtx(lpo, key); /* Make sure this endpoint is not closed. */ @@ -1528,10 +1543,15 @@ OnSocketRecv( if (rc == PKT_OK && lpo->byteCount > 0) { /* keep input buffer, create new one for IO */ buff = lpo->recv_buf; - lpo->recv_buf = get_free_recv_buffer_alloc(); - - buff->recv_time = lpo->aux.RecvTime; - buff->recv_length = (int)lpo->byteCount; + lpo->recv_buf = get_free_recv_buffer_alloc(FALSE); + if (lpo->recv_buf) { + buff->recv_time = lpo->aux.RecvTime; + buff->recv_length = (int)lpo->byteCount; + } else { + lpo->recv_buf = buff; + buff = NULL; + ++packets_dropped; /* maybe atomic? */ + } } /* Note: else we use the current buffer again */ @@ -1547,7 +1567,7 @@ OnSocketRecv( * then feed it to the input queue. And we can be sure we have * a packet here, so we can update the stats. */ - if (buff != NULL) { + if (buff) { INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr)); DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", (MODE_BROADCAST == get_packet_mode(buff)) @@ -1578,14 +1598,14 @@ OnSocketSend( ) { /* this is somewhat easier: */ - static const char * const msg = + static const char * const msgh = "OnSocketSend: send to socket failed"; endpt * ep = NULL; int rc; /* order is important -- check first, then get endpoint! */ - rc = socketErrorCheck(lpo, msg); + rc = socketErrorCheck(lpo, msgh); ep = getEndptFromIoCtx(lpo, key); /* Make sure this endpoint is not closed. */ @@ -1686,13 +1706,14 @@ io_completion_port_add_socket( /* Assume the endpoint is already registered. Set the socket * handle into the proper slot, and then start up the IO engine. */ - static const char * const msg = + static const char * const msgh = "Can't add socket to i/o completion port"; IoCtx_t * lpo; size_t n; ULONG_PTR key; IoHndPad_T * iopad = NULL; + recvbuf_t * rbuf; key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast; @@ -1709,16 +1730,20 @@ io_completion_port_add_socket( if (NULL == CreateIoCompletionPort((HANDLE)sfd, hndIOCPLPort, key, 0)) { - msyslog(LOG_ERR, "%s: %m", msg); + msyslog(LOG_ERR, "%s: %m", msgh); goto fail; } for (n = s_SockRecvSched; n > 0; --n) { if (NULL == (lpo = IoCtxAlloc(ep->ioreg_ctx, NULL))) { - msyslog(LOG_ERR, "%s: no read buffer: %m", msg); + msyslog(LOG_ERR, "%s: no IO context: %m", msgh); goto fail; } lpo->io.sfd = sfd; - if (!QueueSocketRecv(lpo, get_free_recv_buffer_alloc())) + if (NULL == (rbuf = get_free_recv_buffer_alloc(FALSE))) { + msyslog(LOG_ERR, "%s: no receive buffer: %m", msgh); + goto fail; + } + if (!QueueSocketRecv(lpo, rbuf)) goto fail; } return TRUE; @@ -1765,7 +1790,7 @@ io_completion_port_sendto( sockaddr_u * dest ) { - static const char * const msg = + static const char * const msgh = "sendto: cannot schedule socket send"; static const char * const dmsg = "overlapped IO data buffer"; @@ -1798,7 +1823,7 @@ io_completion_port_sendto( rc = WSASendTo(sfd, &wsabuf, 1, NULL, 0, &dest->sa, SOCKLEN(dest), &lpo->ol, NULL); - if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msg)) + if (!rc || IoResultCheck((DWORD)WSAGetLastError(), lpo, msgh)) return (int)len; /* normal/success return */ errno = EBADF; diff --git a/tests/libntp/recvbuff.c b/tests/libntp/recvbuff.c index 6c089bb4c..90986e0ab 100644 --- a/tests/libntp/recvbuff.c +++ b/tests/libntp/recvbuff.c @@ -29,7 +29,7 @@ test_Initialization(void) { void test_GetAndFree(void) { u_long initial = free_recvbuffs(); - recvbuf_t* buf = get_free_recv_buffer(); + recvbuf_t* buf = get_free_recv_buffer(TRUE); TEST_ASSERT_EQUAL_UINT(initial-1, free_recvbuffs()); freerecvbuf(buf); @@ -40,7 +40,7 @@ test_GetAndFree(void) { void test_GetAndFill(void) { // int initial = free_recvbuffs(); - recvbuf_t* buf = get_free_recv_buffer(); + recvbuf_t* buf = get_free_recv_buffer(TRUE); add_full_recv_buffer(buf); TEST_ASSERT_EQUAL_UINT(1, full_recvbuffs());