]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
Bug #527 - Fix buffer problems when under load
authorDanny Mayer <mayer@ntp.org>
Tue, 20 Dec 2005 14:25:05 +0000 (09:25 -0500)
committerDanny Mayer <mayer@ntp.org>
Tue, 20 Dec 2005 14:25:05 +0000 (09:25 -0500)
bk: 43a814415Rtvik3XmnX3fXE9sorHRA

ports/winnt/include/ntp_iocompletionport.h
ports/winnt/ntpd/ntp_iocompletionport.c

index fa91e0735f5c66585b6269ce23af8ae4546e1e58..549bed041ef68bab966af7ae340c6dd14ae67e0c 100644 (file)
@@ -32,7 +32,7 @@ extern        void    uninit_io_completion_port (void);
 
 extern int     io_completion_port_add_clock_io (struct refclockio * /*rio */);
 
-extern void    io_completion_port_add_socket (SOCKET fd, struct interface *);
+extern int     io_completion_port_add_socket (SOCKET fd, struct interface *);
 
 extern DWORD   io_completion_port_sendto (struct interface *, struct pkt *, int, struct sockaddr_storage*);
 
@@ -40,11 +40,6 @@ extern       HANDLE get_io_event (void);
 
 int GetReceivedBuffers(void);
 
-static int QueueIORead( struct refclockio * );
-
-static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD);
-static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD);
-static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD);
 # endif
 
 #endif
index bc556393570ba6d5dab9aeb9eb448d326163c00e..d3d8f832e4a27b0d3a46e6f1f66971a0519a9e73 100644 (file)
 #include "ntp_iocompletionport.h"
 #include "transmitbuff.h"
 
+/*
+ * local function definitions
+ */
+static int QueueIORead( struct refclockio *, recvbuf_t *buff, IoCompletionInfo *lpo);
+static int QueueSocketRecv(SOCKET, recvbuf_t *buff, IoCompletionInfo *lpo);
+
+static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD);
+static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD);
+static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD);
 
 #define BUFCHECK_SECS  10
 static void    TransmitCheckThread(void *NotUsed);
@@ -91,6 +100,12 @@ iocompletionthread(void *NotUsed)
                case SOCK_SEND:
                case CLOCK_WRITE:
                        OnWriteComplete(Key, lpo, BytesTransferred);
+                       /* Clear the allocated memory */
+                       if (lpo != NULL)
+                       {
+                               HeapFree(hHeapHandle, 0, lpo);
+                               lpo = NULL;
+                       }
                        break;
                default:
 #if DEBUG
@@ -101,12 +116,6 @@ iocompletionthread(void *NotUsed)
 #endif
                        break;
                }
-               /* Clear the allocated memory */
-               if (lpo != NULL)
-               {
-                       HeapFree(hHeapHandle, 0, lpo);
-                       lpo = NULL;
-               }
        }
 }
 
@@ -172,84 +181,88 @@ uninit_io_completion_port(
 }
 
 
-static int QueueIORead( struct refclockio *rio ) {
-
-       struct recvbuf *buff;
-       IoCompletionInfo *lpo;
-
-       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
-                                            HEAP_ZERO_MEMORY,
-                                            sizeof(IoCompletionInfo));
-       if (lpo == NULL)
-               return 0;
+static int QueueIORead( struct refclockio *rio, recvbuf_t *buff, IoCompletionInfo *lpo) {
 
-       buff = get_free_recv_buffer();
+       memset(lpo, 0, sizeof(IoCompletionInfo));
+       memset(buff, 0, sizeof(recvbuf_t));
 
-       if (buff == NULL)
-       {
-               HeapFree(hHeapHandle, 0, lpo);
-               return 0;
-       }
-       
        lpo->request_type = CLOCK_READ;
        lpo->buff = buff;
 
        buff->fd = rio->fd;
        if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, (LPOVERLAPPED) lpo)) {
-                       DWORD Result = GetLastError();
-                       switch (Result) {                               
-                                       case NO_ERROR :
-                                       case ERROR_HANDLE_EOF :
-                                       case ERROR_IO_PENDING :
-                               break ;
-
-                               default:
-                                       msyslog(LOG_ERR, "Can't read from Refclock: %m");
-                                       freerecvbuf(buff);
-                                       /* Clear the heap */
-                                       if (lpo != NULL)
-                                               HeapFree(hHeapHandle, 0, lpo);
-                                       return 0;
-                       }
+               DWORD Result = GetLastError();
+               switch (Result) {                               
+               case NO_ERROR :
+               case ERROR_HANDLE_EOF :
+               case ERROR_IO_PENDING :
+                       break ;
+
+               /*
+                * Something bad happened
+                */
+               default:
+                       msyslog(LOG_ERR, "Can't read from Refclock: %m");
+                       freerecvbuf(buff);
+                       return 0;
+               }
        }
        return 1;
 }
 
 
 
+/* Return 1 on Successful Read */
 /* Return 1 on Successful Read */
 static int 
 OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
 {
-       struct recvbuf *buff = NULL;
+       recvbuf_t *buff;
+       recvbuf_t *newbuff;
        struct refclockio * rio = (struct refclockio *) i;
 
        /*  Convert the overlapped pointer back to a recvbuf pointer.
        */
-       buff = (struct recvbuf *) lpo->buff;
-       if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */
-               get_systime(&buff->recv_time);
-               buff->recv_length = (int) Bytes;
-               buff->receiver = rio->clock_recv;
-               buff->dstadr = NULL;
-               buff->recv_srcclock = rio->srcclock;
-               add_full_recv_buffer(buff);
-               if( !SetEvent( WaitableIoEventHandle ) ) {
+       buff = (recvbuf_t *) lpo->buff;
+       /*
+        * Get a new recv buffer for the next packet
+        */
+       newbuff = get_free_recv_buffer();
+       if (newbuff == NULL) {
+               /*
+                * recv buffers not available so we drop the packet
+                * and reuse the buffer.
+                */
+               newbuff = buff;
+       }
+       else 
+       {
+               if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */
+                       get_systime(&buff->recv_time);
+                       buff->recv_length = (int) Bytes;
+                       buff->receiver = rio->clock_recv;
+                       buff->dstadr = NULL;
+                       buff->recv_srcclock = rio->srcclock;
+                       add_full_recv_buffer(buff);
+               }
+               else
+               {
+                       freerecvbuf(buff);
+               }
+       }
+       if( !SetEvent( WaitableIoEventHandle ) ) {
 #ifdef DEBUG
-                       if (debug > 3) {
-                               printf( "Error %d setting IoEventHandle\n", GetLastError() );
-                       }
-#endif
+               if (debug > 3) {
+                       printf( "Error %d setting IoEventHandle\n", GetLastError() );
                }
-               buff = NULL;
+#endif
        }
-       else 
-               freerecvbuf(buff);
 
-       QueueIORead( rio );
+       QueueIORead( rio, newbuff, lpo );
        return 1;
 }
 
+
 /*  Add a reference clock data structures I/O handles to
  *  the I/O completion port. Return 1 if any error.
  */  
@@ -258,33 +271,47 @@ io_completion_port_add_clock_io(
        struct refclockio *rio
        )
 {
+       IoCompletionInfo *lpo;
+       recvbuf_t *buff;
+
        if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) {
                msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
                return 1;
        }
-       QueueIORead( rio );
-       return 0;
+
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
+       if (lpo == NULL)
+       {
+               msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
+               return 1;
+       }
+
+       buff = get_free_recv_buffer();
+
+       if (buff == NULL)
+       {
+               msyslog(LOG_ERR, "Can't allocate memory for clock socket: %m");
+               HeapFree(hHeapHandle, 0, lpo);
+               return 1;
+       }
+       QueueIORead( rio, buff, lpo );
+       return 1;
 }
 
 /* Queue a receiver on a socket. Returns 0 if no buffer can be queued */
 
-static unsigned long QueueSocketRecv(SOCKET s) {
+static int QueueSocketRecv(SOCKET s, recvbuf_t *buff, IoCompletionInfo *lpo) {
        
-       struct recvbuf *buff;
-       IoCompletionInfo *lpo;
        int AddrLen;
 
-       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
-                                            HEAP_ZERO_MEMORY,
-                                            sizeof(IoCompletionInfo));
        if (lpo == NULL)
        {
                msyslog(LOG_ERR, "Out of heap memory for recvbufs: %m");
                return 0;
        }
 
-       buff = get_free_recv_buffer();
-
        if (buff == NULL)
        {
                msyslog(LOG_ERR, "Out of memory for recvbufs: %m");
@@ -347,6 +374,7 @@ static int
 OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
 {
        struct recvbuf *buff = NULL;
+       struct recvbuf *newbuff = NULL;
        struct interface * inter = (struct interface *) i;
        
        /*  Convert the overlapped pointer back to a recvbuf pointer.
@@ -355,48 +383,87 @@ OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
        buff = (struct recvbuf *) lpo->buff;
        get_systime(&buff->recv_time);  
        
-       if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) {  
-               buff->recv_length = (int) Bytes;
-               buff->receiver = receive; 
-               buff->dstadr = inter;
-#ifdef DEBUG
-               if (debug > 3)
-                       printf("Received %d bytes from %s\n", Bytes, stoa(&buff->recv_srcadr));
-#endif
-               add_full_recv_buffer(buff);
-               if( !SetEvent( WaitableIoEventHandle ) ) {
+       /*
+        * Get a new recv buffer for the next packet
+        */
+       newbuff = get_free_recv_buffer();
+       if (newbuff == NULL) {
+               /*
+                * recv buffers not available so we drop the packet
+                * and reuse the buffer.
+                */
+               newbuff = buff;
+       }
+       else 
+       {
+               if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) {  
+                       buff->recv_length = (int) Bytes;
+                       buff->receiver = receive; 
+                       buff->dstadr = inter;
 #ifdef DEBUG
-                       if (debug > 3) {
-                               printf( "Error %d setting IoEventHandle\n", GetLastError() );
-                       }
+                       if (debug > 3)
+                               printf("Received %d bytes from %s\n", Bytes, stoa(&buff->recv_srcadr));
 #endif
+                       add_full_recv_buffer(buff);
+               }
+               else {
+                       freerecvbuf(buff);
                }
        }
-       else {
-               freerecvbuf(buff);
+       if( !SetEvent( WaitableIoEventHandle ) ) {
+#ifdef DEBUG
+               if (debug > 3) {
+                       printf( "Error %d setting IoEventHandle\n", GetLastError() );
+               }
+#endif
        }
 
-       QueueSocketRecv(inter->fd);
+       QueueSocketRecv(inter->fd, newbuff, lpo);
        return 1;
 }
 
 
+
 /*  Add a socket handle to the I/O completion port, and send an I/O
  *  read request to the kernel.
  *
  *  Note: As per the winsock documentation, we use WSARecvFrom. Using
  *        ReadFile() is less efficient.
  */
-extern void
+extern int
 io_completion_port_add_socket(SOCKET fd, struct interface *inter)
 {
+       IoCompletionInfo *lpo;
+       recvbuf_t *buff;
+
        if (fd != INVALID_SOCKET) {
                if (NULL == CreateIoCompletionPort((HANDLE) fd, hIoCompletionPort,
                                                   (DWORD) inter, 0)) {
                        msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
+                       return 1;
                }
-               else QueueSocketRecv(fd);
        }
+
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
+       if (lpo == NULL)
+       {
+               msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
+               return 1;
+       }
+
+       buff = get_free_recv_buffer();
+
+       if (buff == NULL)
+       {
+               msyslog(LOG_ERR, "Can't allocate memory for network socket: %m");
+               HeapFree(hHeapHandle, 0, lpo);
+               return 1;
+       }
+
+       QueueSocketRecv(fd, buff, lpo);
+       return 0;
 }
 
 static int