]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
Revamp of I/O Completion ports for Windows
authorDanny Mayer <mayer@ntp.org>
Sun, 24 Jul 2005 19:50:24 +0000 (15:50 -0400)
committerDanny Mayer <mayer@ntp.org>
Sun, 24 Jul 2005 19:50:24 +0000 (15:50 -0400)
bk: 42e3f100iYHOeVbnGFCdl1WPvpLpEA

include/recvbuff.h
libntp/recvbuff.c
ports/winnt/include/ntp_iocompletionport.h
ports/winnt/include/transmitbuff.h
ports/winnt/libntp/transmitbuff.c
ports/winnt/ntpd/nt_clockstuff.c
ports/winnt/ntpd/ntp_iocompletionport.c

index 4e5ac7b575f075d0e90402339527ef659b4409b3..544ef5cfb0d783d5545de8ba5b41cb29162777cb 100644 (file)
@@ -56,9 +56,7 @@ struct recvbuf {
 #define        recv_srcclock   X_from_where.X_recv_srcclock
 #define recv_peer      X_from_where.X_recv_peer
 #if defined HAVE_IO_COMPLETION_PORT
-        IoCompletionInfo       iocompletioninfo;
        WSABUF          wsabuff;
-       DWORD           AddressLength;
 #else
        struct sockaddr_storage srcadr; /* where packet came from */
 #endif
index 1a9848fd0546be61c4cf63a37143c77de9b4f17f..2e2fc794888f9191b467672ccbe6e88a9ae95750 100644 (file)
@@ -24,12 +24,18 @@ static      struct recvbuf *volatile fulllist;  /* lifo buffers with data */
 static struct recvbuf *volatile beginlist; /* fifo buffers with data */
        
 #if defined(HAVE_IO_COMPLETION_PORT)
-static CRITICAL_SECTION RecvCritSection;
-# define RECV_BLOCK_IO()       EnterCriticalSection(&RecvCritSection)
-# define RECV_UNBLOCK_IO()     LeaveCriticalSection(&RecvCritSection)
+
+/*
+ * For Windows we need to set up a lock to manipulate the
+ * recv buffers to prevent corruption. We keep it lock for as
+ * short a time as possible
+ */
+static CRITICAL_SECTION RecvLock;
+# define LOCK()                EnterCriticalSection(&RecvLock)
+# define UNLOCK()      LeaveCriticalSection(&RecvLock)
 #else
-# define RECV_BLOCK_IO()       
-# define RECV_UNBLOCK_IO()     
+# define LOCK()        
+# define UNLOCK()      
 #endif
 
 u_long
@@ -62,7 +68,6 @@ initialise_buffer(struct recvbuf *buff)
        memset((char *) buff, 0, sizeof(struct recvbuf));
 
 #if defined HAVE_IO_COMPLETION_PORT
-       buff->iocompletioninfo.overlapped.hEvent = CreateEvent(NULL, FALSE,FALSE, NULL);
        buff->wsabuff.len = RX_BUFF_SIZE;
        buff->wsabuff.buf = (char *) buff->recv_buffer;
 #endif
@@ -114,7 +119,7 @@ init_recvbuff(int nbufs)
        full_recvbufs = lowater_adds = 0;
 
 #if defined(HAVE_IO_COMPLETION_PORT)
-       InitializeCriticalSection(&RecvCritSection);
+       InitializeCriticalSection(&RecvLock);
 #endif
 
 }
@@ -131,7 +136,7 @@ getrecvbufs(void)
 {
        struct recvbuf *rb = NULL; /* nothing has arrived */;
 
-       RECV_BLOCK_IO();
+       LOCK();
        if (full_recvbufs == 0)
        {
 #ifdef DEBUG
@@ -166,7 +171,7 @@ getrecvbufs(void)
                        }
                }
        }
-       RECV_UNBLOCK_IO();
+       UNLOCK();
 
        /*
         * Return the chain
@@ -182,13 +187,13 @@ freerecvbuf(
        struct recvbuf *rb
        )
 {
-       RECV_BLOCK_IO();
+       LOCK();
        BLOCKIO();
        rb->next = (struct recvbuf *) freelist;
        freelist = rb;
        free_recvbufs++;
        UNBLOCKIO();
-       RECV_UNBLOCK_IO();
+       UNLOCK();
 }
 
        
@@ -197,7 +202,7 @@ add_full_recv_buffer(
        struct recvbuf *rb
        )
 {
-       RECV_BLOCK_IO();
+       LOCK();
        if (full_recvbufs == 0)
        {
                beginlist = rb;
@@ -211,14 +216,14 @@ add_full_recv_buffer(
        fulllist = rb;
        full_recvbufs++;
 
-       RECV_UNBLOCK_IO();
+       UNLOCK();
 }
 
 struct recvbuf *
 get_free_recv_buffer(void)
 {
        struct recvbuf * buffer = NULL;
-       RECV_BLOCK_IO();
+       LOCK();
        if (free_recvbufs <= RECV_LOWAT)
                {
                        if (total_recvbufs >= RECV_TOOMANY) {
@@ -239,7 +244,7 @@ get_free_recv_buffer(void)
                --free_recvbufs;
        }
 
-       RECV_UNBLOCK_IO();
+       UNLOCK();
        return buffer;
 }
 
@@ -247,13 +252,13 @@ struct recvbuf *
 get_full_recv_buffer(void)
 {
        struct recvbuf * buffer = NULL;
-       RECV_BLOCK_IO();
+       LOCK();
        if (full_recvbufs > 0) {
                --full_recvbufs;
                buffer = beginlist;
                beginlist = buffer->next;
                buffer->next = NULL;
        }
-       RECV_UNBLOCK_IO();
+       UNLOCK();
        return buffer;
 }
index f04eb9ebca2f032a21c06d9b5555fbebad6933e5..d02c0f20d65fa55982e6a169e32a9014e49b0cb0 100644 (file)
@@ -8,10 +8,22 @@
 struct IoCompletionInfo;
 struct refclockio;
 
+/*
+ * Request types
+ */
+enum {
+       SOCK_RECV,
+       SOCK_SEND,
+       CLOCK_READ,
+       CLOCK_WRITE
+};
+
 typedef int IoCompletionInfoFunction(DWORD Key, struct IoCompletionInfo *, DWORD Bytes);
 
 typedef struct IoCompletionInfo {
        OVERLAPPED                      overlapped;
+       int                             request_type;
+       LPVOID                          buff;
        IoCompletionInfoFunction *      iofunction;
 } IoCompletionInfo;
 
@@ -26,12 +38,13 @@ extern      DWORD   io_completion_port_sendto (struct interface *, struct pkt *, int, s
 
 extern HANDLE get_io_event (void);
 
-static int OnSocketRecv(DWORD, struct IoCompletionInfo *, DWORD);
-
 struct recvbuf *GetReceivedBuffers(void);
 
 static int QueueIORead( struct refclockio * );
-static int OnIoReadComplete(DWORD, struct IoCompletionInfo *, DWORD);
+
+static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD);
+static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD);
+static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD);
 # endif
 
 #endif
index 3094efe1cc1b449e99e34e9349a9881dbf185afb..33487c26e7c43292f41c0877a9cdaa6fe64ac16e 100644 (file)
@@ -6,8 +6,6 @@
 # include "ntp_iocompletionport.h"
 #endif
 
-#define TRANSMIT_BUF_LENGTH 1024
-
 /*
  * Format of a transmitbuf.  These are used by the asynchronous receive
  * routine to store incoming packets and related information.
 typedef struct transmitbuf {
        struct transmitbuf *next;               /* next buffer in chain */
 
-#if defined HAVE_IO_COMPLETION_PORT
-        IoCompletionInfo       iocompletioninfo;
-       WSABUF                  wsabuf;
-#endif
+       WSABUF  wsabuf;
+       time_t  ts;             /* Time stamp for the request */
 
        /*
         * union {
@@ -37,7 +33,7 @@ extern        void    init_transmitbuff       P((void));
 
 /* freetransmitbuf - make a single transmitbuf available for reuse
  */
-extern void    free_transmit_buffer P((struct transmitbuf *));
+extern void    free_transmit_buffer    P((transmitbuf *));
 
 /*  Get a free buffer (typically used so an async
  *  read can directly place data into the buffer
@@ -45,13 +41,7 @@ extern       void    free_transmit_buffer P((struct transmitbuf *));
  *  The buffer is removed from the free list. Make sure
  *  you put it back with freetransmitbuf() or 
  */
-extern struct transmitbuf *get_free_transmit_buffer P((void));
-
-
-
-
-
-
+extern transmitbuf *get_free_transmit_buffer P((void));
 
 #endif /* defined __transmitbuff_h */
 
index d098ee94157795ddb1b81ca820a951573d48c01a..ec3093c6c12e19de5f4662c2b1a808aaea566406 100644 (file)
@@ -8,6 +8,7 @@
 #include "ntp_stdlib.h"
 #include "ntp_syslog.h"
 
+#include <isc/list.h>
 #include "transmitbuff.h"
 
 /*
 #define TRANSMIT_INC   5       /* get 5 more at a time */
 #define TRANSMIT_TOOMANY       40      /* this is way too many buffers */
 
+/*
+ * Maximum time in seconds to allow transmit request to complete
+ * After that we are free to delete it if we need the buffer
+ */
+
+#define        MAX_TRANSMIT_SEND_TIME  60.0    
+
 /*
  * Memory allocation
  */
 static volatile u_long full_transmitbufs = 0;          /* number of transmitbufs on fulllist */
 static volatile u_long free_transmitbufs = 0;          /* number of transmitbufs on freelist */
 
-static struct transmitbuf *volatile freelist = NULL;  /* free buffers */
-static struct transmitbuf *volatile fulllist = NULL;  /* lifo buffers with data */
-static struct transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */
+typedef struct transmitb transmitb_t;
+
+struct transmitb {
+       transmitbuf                     *tb;
+       ISC_LINK(transmitb_t)           link;
+};
+
+ISC_LIST(transmitb_t)  fulllist;               /* Currently used transmit buffers */
+
+static transmitbuf *volatile freelist = NULL;  /* free buffers */
+static transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */
 
 static u_long total_transmitbufs = 0;          /* total transmitbufs currently in use */
 static u_long lowater_additions = 0;   /* number of times we have added memory */
 
-static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
+static transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
 
 
-#if defined(HAVE_SIGNALED_IO)
-# define TRANSMIT_BLOCK_IO()   BLOCKIO()
-# define TRANSMIT_UNBLOCK_IO() UNBLOCKIO()
-#elif defined(HAVE_IO_COMPLETION_PORT)
-static CRITICAL_SECTION TransmitCritSection;
-# define TRANSMIT_BLOCK_IO()   EnterCriticalSection(&TransmitCritSection)
-# define TRANSMIT_UNBLOCK_IO() LeaveCriticalSection(&TransmitCritSection)
-#else
-# define TRANSMIT_BLOCK_IO()
-# define TRANSMIT_UNBLOCK_IO()
-#endif
+static CRITICAL_SECTION TransmitLock;
+# define LOCK(lock)    EnterCriticalSection(lock)
+# define UNLOCK(lock)  LeaveCriticalSection(lock)
 
+static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
+static int eventid = 0;
 
 static void 
-initialise_buffer(struct transmitbuf *buff)
+initialise_buffer(transmitbuf *buff)
 {
-       memset((char *) buff, 0, sizeof(struct transmitbuf));
+       memset((char *) buff, 0, sizeof(transmitbuf));
 
-#if defined HAVE_IO_COMPLETION_PORT
-       buff->iocompletioninfo.overlapped.hEvent = CreateEvent(NULL, FALSE,FALSE, NULL);
        buff->wsabuf.len = 0;
        buff->wsabuf.buf = (char *) &buff->pkt;
-#endif
 }
 
 
@@ -67,36 +74,97 @@ init_transmitbuff(void)
        /*
         * Init buffer free list and stat counters
         */
-       freelist = 0;
+       freelist = NULL;
        for (i = 0; i < TRANSMIT_INIT; i++)
        {
                initialise_buffer(&initial_bufs[i]);
-               initial_bufs[i].next = (struct transmitbuf *) freelist;
+               initial_bufs[i].next = (transmitbuf *) freelist;
                freelist = &initial_bufs[i];
        }
 
-       fulllist = 0;
+       ISC_LIST_INIT(fulllist);
        free_transmitbufs = total_transmitbufs = TRANSMIT_INIT;
        full_transmitbufs = lowater_additions = 0;
 
-#if defined(HAVE_IO_COMPLETION_PORT)
-       InitializeCriticalSection(&TransmitCritSection);
-#endif
+       InitializeCriticalSection(&TransmitLock);
+}
+
+static void
+add_buffer_to_freelist(transmitbuf *tb)
+{
+       tb->next = freelist;
+       freelist = tb;
+       free_transmitbufs++;
+}
+
+static void
+delete_buffer_from_full_list(transmitbuf *tb) {
+
+       transmitb_t *next;
+       transmitb_t *lbuf = ISC_LIST_HEAD(fulllist);
+
+       while(lbuf != NULL) {
+               next = ISC_LIST_NEXT(lbuf, link);
+               if(lbuf->tb == tb) {
+                       ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t);
+                       free(lbuf);
+                       break;
+               }
+               else
+                       lbuf = next;
+       }
 }
 
+/*
+ * routine to free up any buffer that has not been freed up
+ * after MAX_TRANSMIT_SEND_TIME seconds. Note that we are not being
+ * too careful here about the correct value of time since we just need
+ * and approximate measure of how much time has elapsed since the
+ * packet was sent and this routine is only called if we run out
+ * of tranmit buffers.
+ */
+static int
+free_unsent_buffers()
+{
+       int tot_freed = 0;
+       double elapsed_time;
+       time_t ct;
+       transmitbuf *buf;
+       transmitb_t *next;
+       transmitb_t *lbuf = ISC_LIST_HEAD(fulllist);
+
+       time(&ct);      /* Current Time */
+
+       LOCK(&TransmitLock);
+       while(lbuf != NULL) {
+               next = ISC_LIST_NEXT(lbuf, link);
+               elapsed_time = difftime(ct, lbuf->tb->ts);
+               if (elapsed_time > MAX_TRANSMIT_SEND_TIME) {
+                       ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t);
+                       free(lbuf);
+                       add_buffer_to_freelist(lbuf->tb);
+                       tot_freed++;
+               }
+               else
+                       lbuf = next;
+       }
+       UNLOCK(&TransmitLock);
+       return (tot_freed);
+}
 
 static void
 create_buffers(void)
 {
-       register struct transmitbuf *buf;
+       transmitbuf *buf;
        int i;
+       if (free_unsent_buffers() > 0)
+               return;
 
-       buf = (struct transmitbuf *)
-           emalloc(TRANSMIT_INC*sizeof(struct transmitbuf));
+       buf = (transmitbuf *) emalloc(TRANSMIT_INC*sizeof(transmitbuf));
        for (i = 0; i < TRANSMIT_INC; i++)
        {
                initialise_buffer(buf);
-               buf->next = (struct transmitbuf *) freelist;
+               buf->next = (transmitbuf *) freelist;
                freelist = buf;
                buf++;
        }
@@ -108,31 +176,33 @@ create_buffers(void)
 
 
 extern void
-free_transmit_buffer(
-       struct transmitbuf *rb
-       )
+free_transmit_buffer(transmitbuf *rb)
 {
-       TRANSMIT_BLOCK_IO();
-       rb->next = freelist;
-       freelist = rb;
-       free_transmitbufs++;
-       TRANSMIT_UNBLOCK_IO();
+       LOCK(&TransmitLock);
+       add_buffer_to_freelist(rb);
+       delete_buffer_from_full_list(rb);
+       UNLOCK(&TransmitLock);
 }
 
 
-extern struct transmitbuf *
+extern transmitbuf *
 get_free_transmit_buffer(void)
 {
-       struct transmitbuf * buffer = NULL;
-       TRANSMIT_BLOCK_IO();
+       transmitb_t *lbuf = (transmitb_t *)malloc(sizeof(transmitb_t));
+
+       transmitbuf * buffer = NULL;
+       LOCK(&TransmitLock);
        if (free_transmitbufs <= 0) {
                create_buffers();
        }
        buffer = freelist;
        freelist = buffer->next;
        buffer->next = NULL;
+       time(&buffer->ts);      /* Time we gave out the transmit buffer */
+       lbuf->tb = buffer;
+       ISC_LIST_APPEND(fulllist, lbuf, link);
        --free_transmitbufs;
-       TRANSMIT_UNBLOCK_IO();
+       UNLOCK(&TransmitLock);
        return buffer;
 }
 
index 9678c3127ea1052c988bd71814364fadf6b0210a..b6a03cb73d25e48779e0b5beedd8bc12f181d677 100644 (file)
@@ -430,7 +430,7 @@ static void StartClockThread(void)
 
        /* init sync objects */
        InitializeCriticalSection(&TimerCritialSection);
-       TimerThreadExitRequest = CreateEvent(NULL, FALSE, FALSE, NULL);
+       TimerThreadExitRequest = CreateEvent(NULL, FALSE, FALSE, "TimerThreadExitRequest");
 
        ClockThreadHandle = CreateThread(NULL, 0, ClockThread, NULL, 0, &tid);
        if (ClockThreadHandle != NULL) {
index 2bfe406e1e68c41e1c0569133590cbe77b17e385..9cd14fb7e144322e5064e3a596bd6ee1d839ce9b 100644 (file)
 #include "transmitbuff.h"
 
 
+#define BUFCHECK_SECS  10
+static void    TransmitCheckThread(void *NotUsed);
+static BOOL    bExit;
+static HANDLE  TimerHandle;
+static HANDLE hHeapHandle = NULL;
+
 static HANDLE hIoCompletionPort = NULL;
 
 static HANDLE WaitableIoEventHandle = NULL;
-HANDLE WaitHandles[3] = { NULL, NULL, NULL };
+
+#define MAXHANDLES 3
+HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL };
 
 HANDLE
 get_io_event()
@@ -29,38 +37,24 @@ get_io_event()
        return( WaitableIoEventHandle );
 }
 
-
-static int 
-OnExitRequest(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
-{
-  (void) Info;
-  (void) Bytes;
-  (void) Key;
-  return 0; /* auto-fail to request an exit */
-}
-
-static IoCompletionInfo ExitRequest;
 /*  This function will add an entry to the I/O completion port
  *  that will signal the I/O thread to exit (gracefully)
  */
 static void
 signal_io_completion_port_exit()
 {
-       ExitRequest.iofunction = OnExitRequest;
-       if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, &ExitRequest.overlapped)) {
+       if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
                msyslog(LOG_ERR, "Can't request service thread to exit: %m");
                exit(1);
        }
 }
 
-
-static void __cdecl 
+static void
 iocompletionthread(void *NotUsed)
 {
        DWORD BytesTransferred = 0;
        DWORD Key = 0;
-       IoCompletionInfo * Overlapped = NULL;
-       (void) NotUsed;
+       IoCompletionInfo * lpo = NULL;
 
        /*      Set the thread priority high enough so I/O will
         *      preempt normal recv packet processing, but not
@@ -70,17 +64,43 @@ iocompletionthread(void *NotUsed)
                msyslog(LOG_ERR, "Can't set thread priority: %m");
        }
 
-       for (;;) {
-               while (GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) Overlapped, INFINITE)) {
-                       if (Overlapped != NULL && 
-                               Overlapped->iofunction != NULL) {
-                               if (!Overlapped->iofunction(Key, Overlapped, BytesTransferred)) {
+       while (TRUE) {
+               GetQueuedCompletionStatus(hIoCompletionPort, 
+                                         &BytesTransferred, 
+                                         &Key, 
+                                         & (LPOVERLAPPED) lpo, 
+                                         INFINITE);
+               if (lpo == NULL)
+               {
 #ifdef DEBUG
                        msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m");    
 #endif
-                               return ; /* fail */
-                               }
+                       break; /* fail */
+               }
+
+               /*
+                * Invoke the appropriate function based on
+                * the value of the request_type
+                */
+               switch(lpo->request_type)
+               {
+               case CLOCK_READ:
+                       OnIoReadComplete(Key, lpo, BytesTransferred);
+                       break;
+               case SOCK_RECV:
+                       OnSocketRecv(Key, lpo, BytesTransferred);
+                       break;
+               case SOCK_SEND:
+               case CLOCK_WRITE:
+                       OnWriteComplete(Key, lpo, BytesTransferred);
+                       break;
+               default:
+#if DEBUG
+                       if (debug > 3) {
+                               printf("Unknown request type %d found in completion port\n",
+                                       lpo->request_type);
                        }
+#endif
                }
        }
 }
@@ -93,9 +113,21 @@ init_io_completion_port(
        )
 {
 
+       /*
+        * Create a handle to the Heap
+        */
+       hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0);
+
+       /* Set the exit flag */
+       bExit = FALSE;
+       /*
+        * Initialize the timer watch section
+        */
+//     _beginthread(TransmitCheckThread, 0, NULL);
+
        /* Create the event used to signal an IO event
         */
-       WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
+       WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableIoEventHandle");
        if (WaitableIoEventHandle == NULL) {
                msyslog(LOG_ERR, "Can't create I/O event handle: %m");
                exit(1);
@@ -112,7 +144,7 @@ init_io_completion_port(
        /*
         * Initialize the Wait Handles
         */
-       WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, NULL); /* exit request */
+       WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, "WaitHandles0"); /* exit request */
        WaitHandles[1] = get_timer_handle();
        WaitHandles[2] = get_io_event();
 
@@ -128,6 +160,12 @@ uninit_io_completion_port(
        void
        )
 {
+       /*
+        * Tell the timer handle to exit
+        */
+       bExit = TRUE;
+       SetEvent(TimerHandle);
+
        if (hIoCompletionPort != NULL) {
                /*  Get each of the service threads to exit
                */
@@ -139,15 +177,21 @@ uninit_io_completion_port(
 static int QueueIORead( struct refclockio *rio ) {
 
        struct recvbuf *buff;
+       IoCompletionInfo *lpo;
 
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
        buff = get_free_recv_buffer();
        
        if (buff == NULL)
                return 0;
        
+       lpo->request_type = CLOCK_READ;
+       lpo->buff = buff;
+
        buff->fd = rio->fd;
-       buff->iocompletioninfo.iofunction = OnIoReadComplete;
-       if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
+       if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, (LPOVERLAPPED) lpo)) {
                        DWORD Result = GetLastError();
                        switch (Result) {                               
                                        case NO_ERROR :
@@ -158,6 +202,9 @@ static int QueueIORead( struct refclockio *rio ) {
                                default:
                                        msyslog(LOG_ERR, "Can't read from Refclock: %m");
                                        freerecvbuf(buff);
+                                       /* Clear the heap */
+                                       if (lpo != NULL)
+                                               HeapFree(hHeapHandle, 0, lpo);
                                        return 0;
                        }
        }
@@ -168,15 +215,14 @@ static int QueueIORead( struct refclockio *rio ) {
 
 /* Return 1 on Successful Read */
 static int 
-OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
+OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
 {
        struct recvbuf *buff = NULL;
        struct refclockio * rio = (struct refclockio *) i;
 
        /*  Convert the overlapped pointer back to a recvbuf pointer.
        */
-       buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
-
+       buff = (struct recvbuf *) lpo->buff;
        if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */
                get_systime(&buff->recv_time);
                buff->recv_length = (int) Bytes;
@@ -196,6 +242,10 @@ OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
        else 
                freerecvbuf(buff);
 
+       /* Clear the heap */
+       if (lpo != NULL)
+               HeapFree(hHeapHandle, 0, lpo);
+
        QueueIORead( rio );
        return 1;
 }
@@ -221,19 +271,27 @@ io_completion_port_add_clock_io(
 static unsigned long QueueSocketRecv(SOCKET s) {
        
        struct recvbuf *buff;
+       IoCompletionInfo *lpo;
+       int AddrLen;
+
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
        buff = get_free_recv_buffer();
 
+       lpo->request_type = SOCK_RECV;
+       lpo->buff = buff;
+
        if (buff != NULL) {
                DWORD BytesReceived = 0;
                DWORD Flags = 0;
-               buff->iocompletioninfo.iofunction = OnSocketRecv;
                buff->fd = s;
-               buff->AddressLength = sizeof(struct sockaddr_in);
+               AddrLen = sizeof(struct sockaddr_in);
 
                if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1, 
                                                &BytesReceived, &Flags, 
-                                               (struct sockaddr *) &buff->recv_srcadr, (LPINT) &buff->AddressLength
-                                               &buff->iocompletioninfo.overlapped, NULL)) {
+                                               (struct sockaddr *) &buff->recv_srcadr, (LPINT) &AddrLen
+                                               (LPOVERLAPPED) lpo, NULL)) {
                        DWORD Result = WSAGetLastError();
                        switch (Result) {
                                case NO_ERROR :
@@ -245,11 +303,17 @@ static unsigned long QueueSocketRecv(SOCKET s) {
                                case WSAENOTSOCK :
                                        netsyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
                                        freerecvbuf(buff);
+                                       /* Clear the heap */
+                                       if (lpo != NULL)
+                                               HeapFree(hHeapHandle, 0, lpo);
                                        return 0;
                                        break;
 
                                case WSAEFAULT :
                                        netsyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
+                                       /* Clear the heap */
+                                       if (lpo != NULL)
+                                               HeapFree(hHeapHandle, 0, lpo);
                                        freerecvbuf(buff);
                                        return 0;
                                break;
@@ -267,7 +331,7 @@ static unsigned long QueueSocketRecv(SOCKET s) {
 
 /* Returns 0 if any Error */
 static int 
-OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
+OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
 {
        struct recvbuf *buff = NULL;
        struct interface * inter = (struct interface *) i;
@@ -275,7 +339,7 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
        /*  Convert the overlapped pointer back to a recvbuf pointer.
        */
        
-       buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
+       buff = (struct recvbuf *) lpo->buff;
        get_systime(&buff->recv_time);  
        
        if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) {  
@@ -283,12 +347,8 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
                buff->receiver = receive; 
                buff->dstadr = inter;
 #ifdef DEBUG
-               if (debug > 3) {
-               char  strbuffer[256];
-               DWORD strlength = sizeof(strbuffer);
-               if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength)) 
-                       printf("Received %d bytes from %s\n", Bytes, strbuffer);
-       }
+               if (debug > 3)
+                       printf("Received %d bytes from %s\n", stoa(&buff->recv_srcadr));
 #endif
                add_full_recv_buffer(buff);
                if( !SetEvent( WaitableIoEventHandle ) ) {
@@ -302,6 +362,10 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
        else {
                freerecvbuf(buff);
        }
+       /* Clear the heap */
+       if (lpo != NULL)
+               HeapFree(hHeapHandle, 0, lpo);
+
        QueueSocketRecv(inter->fd);
        return 1;
 }
@@ -327,29 +391,35 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter)
 
 
 static int 
-OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+OnSendToComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes)
 {
        transmitbuf *buff = NULL;
        (void) Bytes;
        (void) Key;
 
-       buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo));
+       buff = (struct transmitbuf *) lpo->buff;
 
        free_transmit_buffer(buff);
+       /* Clear the heap */
+       if (lpo != NULL)
+               HeapFree(hHeapHandle, 0, lpo);
        return 1;
 }
 
 
 static int 
-OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+OnWriteComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes)
 {
        transmitbuf *buff = NULL;
        (void) Bytes;
        (void) Key;
 
-       buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo));
+       buff = (struct transmitbuf *) lpo->buff;
 
        free_transmit_buffer(buff);
+       /* Clear the heap */
+       if (lpo != NULL)
+               HeapFree(hHeapHandle, 0, lpo);
        return 1;
 }
 
@@ -364,6 +434,12 @@ io_completion_port_sendto(
        transmitbuf *buff = NULL;
        DWORD Result = ERROR_SUCCESS;
        int errval;
+       int AddrLen;
+       IoCompletionInfo *lpo;
+
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
 
        if (len <= sizeof(buff->pkt)) {
                buff = get_free_transmit_buffer();
@@ -375,8 +451,11 @@ io_completion_port_sendto(
                        buff->wsabuf.buf = buff->pkt;
                        buff->wsabuf.len = len;
 
-                       buff->iocompletioninfo.iofunction = OnSendToComplete;
-                       Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL);
+                       AddrLen = sizeof(struct sockaddr_in);
+                       lpo->request_type = SOCK_SEND;
+                       lpo->buff = buff;
+
+                       Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, AddrLen, (LPOVERLAPPED) lpo, NULL);
 
                        if(Result == SOCKET_ERROR)
                        {
@@ -392,16 +471,13 @@ io_completion_port_sendto(
 
                                default :
                                        netsyslog(LOG_ERR, "WSASendTo - error sending message: %m");
+                                       free_transmit_buffer(buff);
                                        break;
                                }
                        }
 #ifdef DEBUG
-                       if (debug > 2) {
-                               char  strbuffer[256];
-                               DWORD strlength = sizeof(strbuffer);
-                               if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength)) 
-                                       printf("WSASendTo - %d bytes to %s : %d\n", len, strbuffer, Result);
-                       }
+                       if (debug > 3)
+                               printf("WSASendTo - %d bytes to %s : %d\n", len, stoa(dest), Result);
 #endif
                        return (Result);
                }
@@ -431,22 +507,47 @@ io_completion_port_write(
        char *pkt,      
        int len)
 {
+       DWORD errval;
        transmitbuf *buff = NULL;
        DWORD lpNumberOfBytesWritten;
        DWORD Result = ERROR_INSUFFICIENT_BUFFER;
+       IoCompletionInfo *lpo;
+
+       lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+                                            HEAP_ZERO_MEMORY,
+                                            sizeof(IoCompletionInfo));
 
        if (len <= sizeof(buff->pkt)) {
                buff = get_free_transmit_buffer();
                if (buff != NULL) {
 
+                       lpo->request_type = CLOCK_WRITE;
+                       lpo->buff = buff;
                        memcpy(&buff->pkt, pkt, len);
-                       buff->iocompletioninfo.iofunction = OnWriteComplete;
 
-                       Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped);
+                       Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, (LPOVERLAPPED) lpo);
+
+                       if(Result == SOCKET_ERROR)
+                       {
+                               errval = WSAGetLastError();
+                               switch (errval) {
 
+                               case NO_ERROR :
+                               case WSA_IO_INCOMPLETE :
+                               case WSA_WAIT_IO_COMPLETION :
+                               case WSA_IO_PENDING :
+                                       Result = ERROR_SUCCESS;
+                                       break ;
+
+                               default :
+                                       netsyslog(LOG_ERR, "WriteFile - error sending message: %m");
+                                       free_transmit_buffer(buff);
+                                       break;
+                               }
+                       }
 #ifdef DEBUG
                        if (debug > 2) {
-                               printf("SendTo - %d bytes %d\n", len, Result);
+                               printf("WriteFile - %d bytes %d\n", len, Result);
                        }
 #endif
                        if (Result) return len;
@@ -467,10 +568,14 @@ io_completion_port_write(
        return Result;
 }
 
+/*
+ * GetReceivedBuffers
+ * Note that this is in effect the main loop for processing requests
+ * both send and receive. This should be reimplemented
+ */
 struct recvbuf *GetReceivedBuffers()
 {
-
-       DWORD Index = WaitForMultipleObjectsEx(sizeof(WaitHandles)/sizeof(WaitHandles[0]), WaitHandles, FALSE, 1000, TRUE);
+       DWORD Index = WaitForMultipleObjectsEx(MAXHANDLES, WaitHandles, FALSE, INFINITE, TRUE);
        switch (Index) {
        case WAIT_OBJECT_0 + 0 : /* exit request */
                exit(0);
@@ -501,8 +606,24 @@ struct recvbuf *GetReceivedBuffers()
                break;          
                                
        } /* switch */
+
        return (getrecvbufs()); /* get received buffers */
 }
+static void
+TransmitCheckThread(void *NotUsed)
+{
+
+       int SleepSecs = BUFCHECK_SECS;
+       while (TRUE) 
+       {
+               Sleep(SleepSecs*1000);
+
+               /* If we are done we exit */
+               if (bExit)
+                       return;
+                               
+       } /* while */
+}
 
 #else
   static int NonEmptyCompilationUnit;