]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
[Bug 2140] Rework of Windows I/O completion port handling to avoid garbling serial...
authorJuergen Perlinger <perlinger@ntp.org>
Sat, 18 Feb 2012 01:41:22 +0000 (02:41 +0100)
committerJuergen Perlinger <perlinger@ntp.org>
Sat, 18 Feb 2012 01:41:22 +0000 (02:41 +0100)
bk: 4f3f01c2forj9sGdi9t78wQdVjBjZQ

ChangeLog
ports/winnt/ntpd/ntp_iocompletionport.c

index e3056d2267671850bf44f82e8d43c05db6e9cb68..35c405193ca8a4729d901200123afa0222dae296 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,5 @@
+* [Bug 2140] Rework of Windows I/O completion port handling to avoid
+  garbling serial input in UNIX line discipline emulation.
 (4.2.7p257) 2012/02/17 Released by Harlan Stenn <stenn@ntp.org>
 * [Bug 2135] defer calls to 'io_input' to main thread under Windows.
 (4.2.7p256) 2012/02/08 Released by Harlan Stenn <stenn@ntp.org>
index ae59486fec4a5957b2f73e4350efd1d13583eb9c..2b08ce973fb2c12c5bd5062efdebbe626115a572 100644 (file)
@@ -1,3 +1,51 @@
+/*
+-----------------------------------------------------------------------
+This is the IO completion port handling for async/overlapped IO on
+Windows >= Win2000.
+
+Some notes on the implementation:
+
++ Only one thread is used to serve the IO completion port, for several
+  reasons:
+
+  * First, there seems to be (have been?) trouble that locked up NTPD
+    when more than one thread was used for IOCPL.
+
+  * Second, for the sake of the time stamp interpolation the threads
+    must run on the same CPU as the time interpolation thread. This
+    makes using more than one thread useless, as they would compete for
+    the same core and create contention.
+
++ Some IO operations need a possibly lengthy postprocessing. Emulating
+  the UN*X line discipline is currently the only but prominent example.
+  To avoid the processing in the time-critical IOCPL thread, longer
+  processing is offloaded the worker thread pool.
+
++ A fact that seems not as well-known as it should be is that all
+  ressources passed to an overlapped IO operation must be considered
+  owned by the OS until the result has been fetched/dequeued. This
+  includes all overlapped structures and buffers involved, so cleaning
+  up on shutdown must be carefully constructed. (This includes closing
+  all the IO handles and waiting for the results to be dequeued.
+  'CancleIo()' cannot be used since it's broken beyond repair.)
+
+  If this is not possible, then all ressources should be dropped into
+  oblivion -- otherwise "bad things (tm)" are bound to happen.
+
+  Using a private heap that is silently dropped but not deleted is a
+  good way to avoid cluttering memory stats with IO context related
+  objects. Leak tracing becomes more interesting, though.
+
+
+The current implementation is based on the work of Danny Mayer who wrote
+the original implementation and Dave Hart who improved on the serial I/O
+routines.  This version still provides the 'user space PPS' emulation
+feature.
+
+Juergen Perlinger (perlinger@ntp.org) Feb 2012
+
+-----------------------------------------------------------------------
+*/
 #ifdef HAVE_CONFIG_H
 # include <config.h>
 #endif
 #define CONTAINEROF(p, type, member) \
        ((type *)((char *)(p) - offsetof(type, member)))
 
-/*
- * Request types
- */
-enum {
-       SOCK_RECV,
-       SOCK_SEND,
-       SERIAL_WAIT,
-       SERIAL_READ,
-       RAW_SERIAL_READ,
-       SERIAL_WRITE
-};
-
 #ifdef _MSC_VER
 # pragma warning(push)
 # pragma warning(disable: 201)         /* nonstd extension nameless union */
 #endif
 
-typedef struct olplus_tag {
-       OVERLAPPED              ol;
-       int                     request_type;
+/*
+ * ---------------------------------------------------------------------
+ * I/O context structure
+ *
+ * This is an extended overlapped structure. Some fields are only used
+ * for serial I/O, others are used for all operations. The serial I/O is
+ * more interesting since the same context object is used for waiting,
+ * actual I/O and possibly offload processing in a worker thread until
+ * a complete operation cycle is done.
+ *
+ * In this case the I/O context is used to gather all the bits that are
+ * finally needed for the processing of the buffer.
+ * ---------------------------------------------------------------------
+ */
+struct IoCtx;
+typedef struct IoCtx IoCtx_t;
+
+typedef void (*IoCompleteFunc)(ULONG_PTR, IoCtx_t *);
+
+struct IoCtx {
+       OVERLAPPED              ol;             /* 'kernel' part of the context */
        union {
-               recvbuf_t *     recv_buf;
-               void *          trans_buf;
+               recvbuf_t *     recv_buf;       /* incoming -> buffer structure */
+               void *          trans_buf;      /* outgoing -> char array       */
        };
-#ifdef DEBUG
-       struct olplus_tag *     link;
-#endif
-} olplus;
+       IoCompleteFunc          onIoDone;       /* HL callback to execute       */
+       struct refclockio *     rio;            /* RIO backlink (for offload)   */
+       l_fp                    DCDSTime;       /* timestamp of DCD set         */
+       l_fp                    FlagTime;       /* timestamp of flag/event char */
+       l_fp                    RecvTime;       /* timestamp of callback        */
+       DWORD                   errCode;        /* error code of last I/O       */
+       DWORD                   byteCount;      /* byte count     "             */
+       DWORD                   com_events;     /* buffer for COM events        */
+       unsigned int            flRawMem : 1;   /* buffer is raw memory -> free */
+       unsigned int            flTsDCDS : 1;   /* DCDSTime valid?              */
+       unsigned int            flTsFlag : 1;   /* FlagTime valid?              */
+};
 
 #ifdef _MSC_VER
 # pragma warning(pop)
@@ -57,236 +119,215 @@ typedef struct olplus_tag {
 /*
  * local function definitions
  */
-       void    ntpd_addremove_semaphore(HANDLE, int);
-static int     QueueSerialWait(struct refclockio *, recvbuf_t *buff,
-                               olplus *lpo, BOOL clear_timestamp);
-static int     QueueRawSerialRead(struct refclockio *, recvbuf_t *buff,
-                                  olplus *lpo);
-static int     OnSocketRecv(ULONG_PTR, olplus *, DWORD, int);
-static int     OnSerialWaitComplete(struct refclockio *, olplus *,
-                                    DWORD, int);
-static int     OnSerialReadComplete(struct refclockio *, olplus *,
-                                    DWORD, int);
-static int     OnRawSerialReadComplete(struct refclockio *, olplus *,
-                                       DWORD, int);
-static int     OnWriteComplete(ULONG_PTR, olplus *, DWORD, int);
+static void ntpd_addremove_semaphore(HANDLE, int);
+
+/* Initiate/Request async IO operations */
+static BOOL QueueSerialWait   (struct refclockio *, recvbuf_t *, IoCtx_t *);
+static BOOL QueueSerialRead   (struct refclockio *, recvbuf_t *, IoCtx_t *);
+static BOOL QueueRawSerialRead(struct refclockio *, recvbuf_t *, IoCtx_t *);
+static  BOOL QueueSocketRecv   (SOCKET             , recvbuf_t *, IoCtx_t *);
+
+
+/* High-level IO callback functions */
+static void OnSocketRecv           (ULONG_PTR, IoCtx_t *);
+static void OnSerialWaitComplete   (ULONG_PTR, IoCtx_t *);
+static void OnSerialReadComplete   (ULONG_PTR, IoCtx_t *);
+static void OnRawSerialReadComplete(ULONG_PTR, IoCtx_t *);
+static void OnSerialWriteComplete  (ULONG_PTR, IoCtx_t *);
+
+/* worker pool offload functions */
+static DWORD WINAPI OnSerialReadWorker(void * ctx);
 
 
 /* keep a list to traverse to free memory on debug builds */
 #ifdef DEBUG
 static void free_io_completion_port_mem(void);
-olplus *               compl_info_list;
-CRITICAL_SECTION       compl_info_lock;
-#define LOCK_COMPL()   EnterCriticalSection(&compl_info_lock)
-#define UNLOCK_COMPL() LeaveCriticalSection(&compl_info_lock)
 #endif
 
-/* #define USE_HEAP */
-
-#ifdef USE_HEAP
-static HANDLE hHeapHandle = NULL;
-#endif
 
        HANDLE WaitableExitEventHandle;
        HANDLE WaitableIoEventHandle;
 static HANDLE hIoCompletionPort;
 
-#ifdef NTPNEEDNAMEDHANDLE
-#define WAITABLEIOEVENTHANDLE "WaitableIoEventHandle"
-#else
-#define WAITABLEIOEVENTHANDLE NULL
-#endif
-
 DWORD  ActiveWaitHandles;
 HANDLE WaitHandles[16];
 
-olplus *
-GetHeapAlloc(char *fromfunc)
-{
-       olplus *lpo;
-
-#ifdef USE_HEAP
-       lpo = HeapAlloc(hHeapHandle,
-                       HEAP_ZERO_MEMORY,
-                       sizeof(olplus));
-#else
-       lpo = emalloc_zero(sizeof(*lpo));
-#endif
-       DPRINTF(3, ("Allocation %d memory for %s, ptr %x\n", sizeof(olplus), fromfunc, lpo));
+/*
+ * -------------------------------------------------------------------
+ * We make a pool of our own for IO context objects -- the are owned by
+ * the system until a completion result is pulled from the queue, and
+ * they seriously go into the way of memory tracking until we can safely
+ * cancel an IO request.
+ * -------------------------------------------------------------------
+ */
+static HANDLE hHeapHandle;
 
-#ifdef DEBUG
-       LOCK_COMPL();
-       LINK_SLIST(compl_info_list, lpo, link);
-       UNLOCK_COMPL();
-#endif
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Create a new heap for IO context objects
+ */
+static void
+IoCtxPoolInit(size_t initObjs)
+{
+       hHeapHandle = HeapCreate(0, initObjs*sizeof(IoCtx_t), 0);
+       if (hHeapHandle == NULL)
+       {
+               msyslog(LOG_ERR, "Can't initialize Heap: %m");
+               exit(1);
+       }
 
-       return (lpo);
 }
 
-void
-FreeHeap(olplus *lpo, char *fromfunc)
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ *
+ * Delete the IO context heap
+ *
+ * Since we do not know what callbacks are pending, we just drop the
+ * pool into oblivion. New allocs and frees will fail from this moment,
+ * but we simply don't care. At least the normal heap dump stats will
+ * show no leaks from IO context blocks. On the downside, we have to
+ * track them ourselves if something goes wrong.
+ */
+static void
+IoCtxPoolDone()
 {
-#ifdef DEBUG
-       olplus *unlinked;
-
-       DPRINTF(3, ("Freeing memory for %s, ptr %x\n", fromfunc, lpo));
+       hHeapHandle = NULL;
+}
 
-       LOCK_COMPL();
-       UNLINK_SLIST(unlinked, compl_info_list, lpo, link,
-           olplus);
-       UNLOCK_COMPL();
-#endif
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Alloc & Free
+ *
+ * When the heap handle is NULL, these both will fail; Alloc with a NULL
+ * return and Free silently.
+ */
+static IoCtx_t *
+IoCtxAlloc(void)
+{
+       IoCtx_t *lpo;
 
-#ifdef USE_HEAP
-       HeapFree(hHeapHandle, 0, lpo);
-#else
-       free(lpo);
-#endif
+       if (hHeapHandle)
+               lpo = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(IoCtx_t));
+       else
+               lpo = NULL;
+       DPRINTF(3, ("Allocate IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo));
+       return (lpo);
 }
 
-/*  This function will add an entry to the I/O completion port
- *  that will signal the I/O thread to exit (gracefully)
- */
 static void
-signal_io_completion_port_exit()
+IoCtxFree(IoCtx_t *lpo)
 {
-       if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
-               msyslog(LOG_ERR, "Can't request service thread to exit: %m");
-               exit(1);
-       }
+       DPRINTF(3, ("Free IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo));
+       if (lpo && hHeapHandle)
+               HeapFree(hHeapHandle, 0, lpo);
 }
 
+
+/*
+ * -------------------------------------------------------------------
+ * The IO completion thread and support functions
+ *
+ * There is only one completion thread, because it is locked to the same
+ * core as the time interpolation. Having more than one causes core
+ * contention and is not useful.
+ * -------------------------------------------------------------------
+ */
+static HANDLE hIoCompletionThread;
+
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * The IO completion worker thread
+ *
+ * Note that this thread does not enter an alertable wait state and that
+ * the only waiting point is the IO completion port. If stopping this
+ * thread with a special queued result packet does not work,
+ * 'TerminateThread()' is the only remaining weapon in the arsenal. A
+ * dangerous weapon -- it's like SIGKILL.
+ */
 static unsigned WINAPI
 iocompletionthread(void *NotUsed)
 {
-       BOOL                    bSuccess;
-       int                     err;
-       DWORD                   octets;
-       ULONG_PTR               key;
-       struct refclockio *     rio;
-       OVERLAPPED *            pol;
-       olplus *                lpo;
+       DWORD           err;
+       DWORD           octets;
+       ULONG_PTR       key;
+       OVERLAPPED *    pol;
+       IoCtx_t *       lpo;
 
 
        UNUSED_ARG(NotUsed);
 
        /*
-        *      socket and refclock receive call gettimeofday()
-        *      so the I/O thread needs to be on the same 
-        *      processor as the main and timing threads
-        *      to ensure consistent QueryPerformanceCounter()
-        *      results.
+        * Socket and refclock receive call gettimeofday() so the I/O
+        * thread needs to be on the same processor as the main and
+        * timing threads to ensure consistent QueryPerformanceCounter()
+        * results.
+        *
+        * This gets seriously into the way of efficient thread pooling
+        * on multicore systems.
         */
        lock_thread_to_processor(GetCurrentThread());
 
-       /*      Set the thread priority high enough so I/O will
-        *      preempt normal recv packet processing, but not
-        *      higher than the timer sync thread.
+       /*
+        * Set the thread priority high enough so I/O will preempt
+        * normal recv packet processing, but not higher than the timer
+        * sync thread.
         */
        if (!SetThreadPriority(GetCurrentThread(),
                               THREAD_PRIORITY_ABOVE_NORMAL))
                msyslog(LOG_ERR, "Can't set thread priority: %m");
 
-       while (TRUE) {
-               bSuccess = GetQueuedCompletionStatus(
+       for(;;) {
+               if (GetQueuedCompletionStatus(
                                        hIoCompletionPort, 
                                        &octets, 
                                        &key, 
                                        &pol, 
-                                       INFINITE);
+                                       INFINITE))
+                       err = ERROR_SUCCESS;
+               else
+                       err = GetLastError();
                if (NULL == pol) {
                        DPRINTF(2, ("Overlapped IO Thread Exiting\n"));
                        break; /* fail */
                }
+               lpo = CONTAINEROF(pol, IoCtx_t, ol);
+               get_systime(&lpo->RecvTime);
+               lpo->byteCount = octets;
+               lpo->errCode   = err;
                handler_calls++;
-               lpo = CONTAINEROF(pol, olplus, ol);
-               rio = (struct refclockio *)key;
-               
-               /*
-                * Deal with errors
-                */
-               if (bSuccess)
-                       err = 0;
-               else
-                       err = GetLastError();
-
-               /*
-                * Invoke the appropriate function based on
-                * the value of the request_type
-                */
-               switch (lpo->request_type) {
-
-               case SOCK_RECV:
-                       OnSocketRecv(key, lpo, octets, err);
-                       break;
-
-               case SOCK_SEND:
-                       NTP_INSIST(0);
-                       break;
-
-               case SERIAL_WAIT:
-                       OnSerialWaitComplete(rio, lpo, octets, err);
-                       break;
-
-               case SERIAL_READ:
-                       OnSerialReadComplete(rio, lpo, octets, err);
-                       break;
-
-               case RAW_SERIAL_READ:
-                       OnRawSerialReadComplete(rio, lpo, octets, err);
-                       break;
-
-               case SERIAL_WRITE:
-                       OnWriteComplete(key, lpo, octets, err);
-                       break;
-
-               default:
-                       DPRINTF(1, ("Unknown request type %d found in completion port\n",
-                                   lpo->request_type));
-                       break;
-               }
+               (*lpo->onIoDone)(key, lpo);
        }
 
        return 0;
 }
 
-/*  Create/initialise the I/O creation port
+/*
+ * -------------------------------------------------------------------
+ * Create/initialise the I/O creation port
  */
 void
 init_io_completion_port(
        void
        )
 {
-       unsigned tid;
-       HANDLE thread;
 
 #ifdef DEBUG
-       InitializeCriticalSection(&compl_info_lock);
        atexit(&free_io_completion_port_mem);
 #endif
 
-#ifdef USE_HEAP
-       /*
-        * Create a handle to the Heap
-        */
-       hHeapHandle = HeapCreate(0, 20*sizeof(olplus), 0);
-       if (hHeapHandle == NULL)
-       {
-               msyslog(LOG_ERR, "Can't initialize Heap: %m");
-               exit(1);
-       }
-#endif
+       /* Create the context pool first. */
+       IoCtxPoolInit(20);
 
-       /* Create the event used to signal an IO event
-        */
-       WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, WAITABLEIOEVENTHANDLE);
+       /* Create the event used to signal an IO event */
+       WaitableIoEventHandle = CreateEvent(
+               NULL, FALSE, FALSE, NULL);
        if (WaitableIoEventHandle == NULL) {
                msyslog(LOG_ERR,
                "Can't create I/O event handle: %m - another process may be running - EXITING");
                exit(1);
        }
-       /* Create the event used to signal an exit event
-        */
+       /* Create the event used to signal an exit event */
        WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
        if (WaitableExitEventHandle == NULL) {
                msyslog(LOG_ERR,
@@ -294,18 +335,16 @@ init_io_completion_port(
                exit(1);
        }
 
-       /* Create the IO completion port
-        */
-       hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+       /* Create the IO completion port */
+       hIoCompletionPort = CreateIoCompletionPort(
+               INVALID_HANDLE_VALUE, NULL, 0, 0);
        if (hIoCompletionPort == NULL) {
                msyslog(LOG_ERR, "Can't create I/O completion port: %m");
                exit(1);
        }
 
 
-       /*
-        * Initialize the Wait Handles
-        */
+       /* Initialize the Wait Handles table */
        WaitHandles[0] = WaitableIoEventHandle;
        WaitHandles[1] = WaitableExitEventHandle; /* exit request */
        WaitHandles[2] = WaitableTimerHandle;
@@ -320,52 +359,102 @@ init_io_completion_port(
        addremove_io_semaphore = &ntpd_addremove_semaphore;
 
        /*
-        * Have one thread servicing I/O - there were 4, but this would 
-        * somehow cause NTP to stop replying to ntpq requests; TODO
+        * Have one thread servicing I/O. See rationale in front matter.
         */
-       thread = (HANDLE)_beginthreadex(
+       hIoCompletionThread = (HANDLE)_beginthreadex(
                NULL, 
                0, 
                iocompletionthread, 
                NULL, 
-               CREATE_SUSPENDED, 
-               &tid);
-       ResumeThread(thread);
-       CloseHandle(thread);
+               0, 
+               NULL);
 }
 
-
+/*
+ * -------------------------------------------------------------------
+ * completion port teardown
+ */
 void
-ntpd_addremove_semaphore(
-       HANDLE  sem,
-       int     remove
+uninit_io_completion_port(
+       void
        )
 {
-       const size_t    hnd_sz = sizeof(WaitHandles[0]);
-       u_int           hi;
+       /* do noting if completion port already gone. */
+       if (NULL == hIoCompletionPort)
+               return;
 
-       if (!remove) {
-               INSIST((ActiveWaitHandles + 1) < COUNTOF(WaitHandles));
-               WaitHandles[ActiveWaitHandles] = sem;
-               ActiveWaitHandles++;
+       /*
+        * Service thread seems running. Terminate him with grace
+        * first and force later...
+        */
 
-               return;
+       /* post exit request for thread. */
+       PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0);
+       IoCtxPoolDone(); /* stop using the memory pool */
+
+       if (WAIT_TIMEOUT == WaitForSingleObject(hIoCompletionThread, 5000)) {
+               /* Everything is lost. Kill off with TerminateThread. */
+               msyslog(LOG_ERR, "IO completion thread refuses to terminate");
+               TerminateThread(hIoCompletionThread, ~0UL);
        }
 
-       /* removing semaphore */
+       /* now reap all handles... */
+       CloseHandle(hIoCompletionThread);
+       hIoCompletionThread = NULL;
+       CloseHandle(hIoCompletionPort);
+       hIoCompletionPort = NULL;
+}
+
+
+/*
+ * -------------------------------------------------------------------
+ * external worker thread support (wait handle stuff)
+ *
+ * !Attention!
+ *
+ *  - This function must only be called from the main thread. Changing
+ *    a set of wait handles while someone is waiting on it creates
+ *    undefined behaviour. Also there's no provision for mutual
+ *    exclusion when accessing global values. 
+ *
+ *  - It's not possible to register a handle that is already in the table.
+ */
+static void
+ntpd_addremove_semaphore(
+       HANDLE  sem,
+       int     remove
+       )
+{
+       DWORD   hi;
+
+       /* search for a matching entry first. */
        for (hi = 3; hi < ActiveWaitHandles; hi++)
                if (sem == WaitHandles[hi])
                        break;
 
-       if (hi == ActiveWaitHandles)
-               return;
-
-       ActiveWaitHandles--;
-       if (hi < ActiveWaitHandles)
-               memmove(&WaitHandles[hi],
-                       &WaitHandles[hi + 1],
-                       (ActiveWaitHandles - hi) * hnd_sz);
-       WaitHandles[ActiveWaitHandles] = NULL;
+       if (remove) {
+               /*
+                * If found, eventually swap with last entry to keep
+                * the table dense.
+                */
+               if (hi < ActiveWaitHandles) {
+                       ActiveWaitHandles--;
+                       if (hi < ActiveWaitHandles)
+                               WaitHandles[hi] =
+                                   WaitHandles[ActiveWaitHandles];
+                       WaitHandles[ActiveWaitHandles] = NULL;
+               }
+       } else {
+               /*
+                * Make sure the entry is not found and there is enough
+                * room, then append to the table array.
+                */
+               if (hi >= ActiveWaitHandles) {
+                       NTP_INSIST(ActiveWaitHandles < COUNTOF(WaitHandles));
+                       WaitHandles[ActiveWaitHandles] = sem;
+                       ActiveWaitHandles++;
+               }
+       }
 }
 
 
@@ -375,330 +464,500 @@ free_io_completion_port_mem(
        void
        )
 {
-       olplus *        pci;
-
-#if defined(_MSC_VER) && defined (_DEBUG)
-       _CrtCheckMemory();
-#endif
-       LOCK_COMPL();
-       while ((pci = compl_info_list) != NULL) {
-
-#if 0  /* sockaddr with received-from address in recvbuf */
-       /* is sometimes modified by system after we free it  */
-       /* triggering heap corruption warning -- find a */
-       /* better way to free it after I/O is surely done */
-               /* this handles both xmit and recv buffs */
-               if (pci->recv_buf != NULL) {
-                       DPRINTF(1, ("freeing xmit/recv buff %p\n", pci->recv_buf));
-                       free(pci->recv_buf);
-               }
-#endif
-
-               FreeHeap(pci, "free_io_completion_port_mem");
-               /* FreeHeap() removed this item from compl_info_list */
-       }
-       UNLOCK_COMPL();
-
-#if defined(_MSC_VER) && defined (_DEBUG)
-       _CrtCheckMemory();
-#endif
+       /*
+        * At the moment, do absolutely nothing. Returning memory here
+        * requires NO PENDING OVERLAPPED OPERATIONS AT ALL at this
+        * point in time, and as long we cannot be reasonable sure about
+        * that the simple advice is:
+        *
+        * HANDS OFF!
+        */
 }
 #endif /* DEBUG */
 
 
-void
-uninit_io_completion_port(
-       void
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Prelude -- common error checking code
+ * -------------------------------------------------------------------
+ */
+extern char * NTstrerror(int err, BOOL *bfreebuf);
+
+static BOOL
+IoResultCheck(
+       DWORD           err,
+       IoCtx_t *       ctx,
+       const char *    msg
        )
 {
-       if (hIoCompletionPort != NULL) {
-               /*  Get each of the service threads to exit
-               */
-               signal_io_completion_port_exit();
+       char * msgbuf;
+       BOOL   dynbuf;
+
+       /* If the clock is not / no longer active, assume
+        * 'ERROR_OPERATION_ABORTED' and do the necessary cleanup.
+        */
+       if (ctx->rio && !ctx->rio->active)
+               err = ERROR_OPERATION_ABORTED;
+       
+       switch (err)
+       {
+               /* The first ones are no real errors. */
+       case ERROR_SUCCESS:     /* all is good */
+       case ERROR_IO_PENDING:  /* callback pending */
+               return TRUE;
+
+               /* the next ones go silently -- only cleanup is done */
+       case ERROR_INVALID_PARAMETER:   /* handle already closed */
+       case ERROR_OPERATION_ABORTED:   /* handle closed while wait */
+               break;
+
+
+       default:
+               /* We have to resort to the low level error formatting
+                * functions here, since the error code can be an
+                * overlapped result. Relying the value to be the same
+                * as the 'GetLastError()' result at this point of
+                * execution is shaky at best, and using SetLastError()
+                * to force it seems too nasty.
+                */
+               msgbuf = NTstrerror(err, &dynbuf);
+               msyslog(LOG_ERR, "%s: err=%u, '%s'", msg, err, msgbuf);
+               if (dynbuf)
+                       LocalFree(msgbuf);
+               break;
+       }
+
+       /* If we end here, we have to mop up the buffer and context */
+       if (ctx->flRawMem) {
+               if (ctx->trans_buf)
+                       free(ctx->trans_buf);
+       } else {
+               if (ctx->recv_buf)
+                       freerecvbuf(ctx->recv_buf);
        }
+       IoCtxFree(ctx);
+       return FALSE;
 }
 
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 1 -- COMM event handling
+ * -------------------------------------------------------------------
+ */
 
-static int
+static BOOL
 QueueSerialWait(
        struct refclockio *     rio,
        recvbuf_t *             buff,
-       olplus *                lpo,
-       BOOL                    clear_timestamp
+       IoCtx_t *               lpo
        )
 {
-       DWORD err;
+       BOOL   rc;
 
-       lpo->request_type = SERIAL_WAIT;
+       lpo->onIoDone = OnSerialWaitComplete;
        lpo->recv_buf = buff;
-
-       if (clear_timestamp)
-               ZERO(buff->recv_time);
-
-       buff->fd = _get_osfhandle(rio->fd);
-       if (!WaitCommEvent((HANDLE)buff->fd,
-                          (DWORD *)&buff->recv_buffer, &lpo->ol)) {
-               err = GetLastError();
-               if (ERROR_IO_PENDING != err) {
-                       /*
-                        * ERROR_INVALID_PARAMETER occurs after rio->fd
-                        * is closed.
-                        */
-                       if (ERROR_INVALID_PARAMETER != err)
-                               msyslog(LOG_ERR, "Can't wait on Refclock: %m");
-                       freerecvbuf(buff);
-                       return 0;
-               }
-       }
-       return 1;
+       lpo->flRawMem = 0;
+       lpo->rio      = rio;
+       buff->fd      = rio->fd;
+
+       rc = WaitCommEvent((HANDLE)_get_osfhandle(rio->fd),
+                          &lpo->com_events, &lpo->ol);
+       if (!rc)
+               return IoResultCheck(GetLastError(), lpo,
+                                    "Can't wait on Refclock");
+       return TRUE;
 }
 
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
 
-static int 
+static void 
 OnSerialWaitComplete(
-       struct refclockio *     rio,
-       olplus *                lpo,
-       DWORD                   Bytes,
-       int errstatus
+       ULONG_PTR       key,       
+       IoCtx_t *       lpo
        )
 {
-       recvbuf_t *buff;
-       struct peer *pp;
-       l_fp arrival_time;
-       DWORD comm_mask;
-       DWORD modem_status;
-       static const l_fp zero_time = { 0 };
-       BOOL rc;
-
-       get_systime(&arrival_time);
+       struct refclockio *     rio;
+       recvbuf_t *             buff;
+       DWORD                   modem_status;
 
-       if (!rio->active)
-               return FALSE;
+       /* check and bail out if operation failed */
+       if (!IoResultCheck(lpo->errCode, lpo,
+                          "WaitCommEvent failed"))
+               return;
 
-       /*
-        * Get the recvbuf pointer from the overlapped buffer.
-        */
+       /* get & validate context and buffer. */
+       rio  = lpo->rio;
        buff = lpo->recv_buf;
-       comm_mask = (*(DWORD *)&buff->recv_buffer);
+       NTP_INSIST((ULONG_PTR)rio == key);
+
 #ifdef DEBUG
-               if (errstatus || comm_mask &
-                   ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) {
-                       msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x errstatus %d",
-                               comm_mask, errstatus);
-                       exit(-1);
-               }
+       if (lpo->com_events & ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) {
+               msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x",
+                       lpo->com_events);
+               exit(-1);
+       }
 #endif
-               if (EV_RLSD & comm_mask) { 
-                       modem_status = 0;
-                       GetCommModemStatus((HANDLE)buff->fd, &modem_status);
-                       if (modem_status & MS_RLSD_ON) {
-                               /*
-                                * Use the timestamp from this PPS CD not
-                                * the later end of line.
-                                */
-                               buff->recv_time = arrival_time;
-                       }
-
-                       if (EV_RLSD == comm_mask) {
-                               /*
-                                * if we didn't see an end of line yet
-                                * issue another wait for it.
-                                */
-                               QueueSerialWait(rio, buff, lpo, FALSE);
-                               return TRUE;
-                       }
-               }
-
-               if (EV_RXCHAR & comm_mask) {            /* raw discipline */
-                       QueueRawSerialRead(rio, buff, lpo);
-                       return TRUE;
-               }
-
-               /*
-                * We've detected the end of line of serial input.
-                * Use this timestamp unless we already have a CD PPS
-                * timestamp in buff->recv_time.
-                */
-               if (memcmp(&buff->recv_time, &zero_time, sizeof buff->recv_time)) {
-                       /*
-                        * We will first see a user PPS timestamp here on either
-                        * the first or second line of text.  Log a one-time
-                        * message while processing the second line.
-                        */
-                       if (1 == rio->recvcount) {
-                               pp = (struct peer *)rio->srcclock;
-                               msyslog(LOG_NOTICE, "Using user-mode PPS timestamp for %s",
-                                       refnumtoa(&pp->srcadr));
-                       }
-               } else {
-                       buff->recv_time = arrival_time;
+       /* Take note of changes on DCD; 'user space PPS' */
+       if (EV_RLSD & lpo->com_events) { 
+               modem_status = 0;
+               GetCommModemStatus((HANDLE)buff->fd, &modem_status);
+               if (modem_status & MS_RLSD_ON) {
+                       lpo->DCDSTime = lpo->RecvTime;
+                       lpo->flTsDCDS = 1;
                }
+       }
 
-               /*
-                * Now that we have a complete line waiting, read it.
-                * There is still a race here, but we're likely to win.
-                */
+       /* If IO ready, read data. Go back waiting else. */
+       if (EV_RXFLAG & lpo->com_events) {              /* line discipline */
+               lpo->FlagTime = lpo->RecvTime;
+               lpo->flTsFlag = 1;
+               QueueSerialRead(rio, buff, lpo);
+       } else if (EV_RXCHAR & lpo->com_events) {       /* raw discipline */
+               QueueRawSerialRead(rio, buff, lpo);
+       } else {                                        /* idle... */
+               QueueSerialWait(rio, buff, lpo);
+       }
+}
 
-               lpo->request_type = SERIAL_READ;
 
-               rc = ReadFile(
-                       (HANDLE)buff->fd,
-                       buff->recv_buffer,
-                       sizeof(buff->recv_buffer),
-                       NULL,
-                       &lpo->ol);
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 2 -- line discipline emulation
+ *
+ * Ideally this should *not* be done in the IO completion thread.
+ * We use a worker pool thread to offload the low-level processing.
+ * -------------------------------------------------------------------
+ */
 
-               if (!rc && ERROR_IO_PENDING != GetLastError()) {
-                       msyslog(LOG_ERR, "Can't read from Refclock: %m");
-                       freerecvbuf(buff);
-                       return 0;
-               }
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Start & Queue a serial read for line discipline emulation.
+ */
+static BOOL
+QueueSerialRead(
+       struct refclockio *     rio,
+       recvbuf_t *             buff,
+       IoCtx_t *               lpo
+       )
+{
+       BOOL   rc;
 
-       return 1;
+       lpo->onIoDone = OnSerialReadComplete;
+       lpo->recv_buf = buff;
+       lpo->flRawMem = 0;
+       lpo->rio      = rio;
+       buff->fd      = rio->fd;
+
+       rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
+                     (char*)buff->recv_buffer  + buff->recv_length,
+                     sizeof(buff->recv_buffer) - buff->recv_length,
+                     NULL, &lpo->ol);
+       if (!rc)
+               return IoResultCheck(GetLastError(), lpo,
+                                    "Can't read from Refclock");
+       return TRUE;
 }
 
-
-/* Return 1 on Successful Read */
-static int 
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * IO completion thread callback. Takes a time stamp and offloads the
+ * real work to the worker pool ASAP.
+ */
+static void
 OnSerialReadComplete(
-       struct refclockio *     rio,
-       olplus *                lpo,
-       DWORD                   Bytes,
-       int                     errstatus
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
        )
 {
+       struct refclockio *     rio;
        recvbuf_t *             buff;
-       l_fp                    cr_time;
-       int                     consumed;
 
-       if (!rio->active)
-               return FALSE;
+       /* check and bail out if operation failed */
+       if (!IoResultCheck(lpo->errCode, lpo,
+                          "Read from Refclock failed"))
+               return;
 
-       /*
-        * Get the recvbuf pointer from the overlapped buffer.
-        */
+       /* get & validate context and buffer. */
+       rio  = lpo->rio;
        buff = lpo->recv_buf;
+       NTP_INSIST((ULONG_PTR)rio == key);
+
+       /* Offload to worker pool */
+       if (!QueueUserWorkItem(OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT)) {
+               msyslog(LOG_ERR,
+                       "Can't offload to worker thread, will skip data: %m");
+               ZERO(*lpo);
+               buff->recv_length = 0;
+               QueueSerialWait(rio, buff, lpo);
+       }
+}
+
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Worker pool offload function -- avoid lengthy operations in the IO
+ * completion thread (affects timing...)
+ *
+ * This function does the real work of emulating the UN*X line
+ * discipline. Since this involves allocation of additional buffers and
+ * string parsing/copying, it is offloaded to the worker thread pool so
+ * the IO completion thread can resume faster.
+ */
+static DWORD WINAPI
+OnSerialReadWorker(void * ctx)
+{
+       IoCtx_t *               lpo;
+       recvbuf_t *             buff, *obuf;
+       struct refclockio *     rio;
+       char                    *sptr, *send, *dptr;
+       BOOL                    eol;
+       char                    ch;
 
+       /* Get context back */
+       lpo  = (IoCtx_t*)ctx;
+       buff = lpo->recv_buf;
+       rio  = lpo->rio;
        /*
-        * ignore 0 bytes read due to timeout's and closure on fd
+        * ignore 0 bytes read due to closure on fd.
+        * Eat the first line of input as it's possibly partial.
         */
-       if (!errstatus && Bytes) {
-               buff->recv_length = (int) Bytes;
-               buff->receiver = process_refclock_packet;
-               buff->dstadr = NULL;
-               buff->recv_peer = rio->srcclock;
+       if (lpo->byteCount && rio->recvcount++) {
+               /* account for additional input */
+               buff->recv_length += (int)lpo->byteCount;
+
                /*
-                * Eat the first line of input as it's possibly
-                * partial and if a PPS is present, it may not 
-                * have fired since the port was opened.
+                * Now mimic the Unix line discipline. 
                 */
-               if (rio->recvcount++ > 0) {
-                       cr_time = buff->recv_time;
-                       buff->fd = rio->fd;             /* was handle */
-                       add_full_recv_buffer(buff);
+               sptr = (char *)buff->recv_buffer;
+               send = sptr + buff->recv_length;
+               obuf = NULL;
+               dptr = NULL;
+               while (sptr != send)
+               {
+                       /* get new buffer to store line */
+                       obuf = get_free_recv_buffer_alloc();
+                       obuf->recv_time   = lpo->RecvTime;
+                       obuf->recv_length = 0;
+                       obuf->fd          = rio->fd;
+                       obuf->receiver    = process_refclock_packet;
+                       obuf->dstadr      = NULL;
+                       obuf->recv_peer   = rio->srcclock;
+
+                       /*
+                        * Time stamp assignment is interesting.  If we
+                        * have a DCD stamp, we use it, otherwise we use
+                        * the FLAG char event time, and if that is also
+                        * not / no longer available we use the arrival
+                        * time.
+                        */
+                       if (lpo->flTsDCDS)
+                               obuf->recv_time = lpo->DCDSTime;
+                       else if (lpo->flTsFlag)
+                               obuf->recv_time = lpo->FlagTime;
+                       else
+                               obuf->recv_time = lpo->RecvTime;
+                       lpo->flTsDCDS = lpo->flTsFlag = 0; /* use only once... */
+
                        /*
-                        * Mimic Unix line discipline and assume CR/LF
-                        * line termination.  On Unix the CR terminates
-                        * the line containing the timecode, and
-                        * immediately after the LF terminates an empty
-                        * line.  So synthesize the empty LF-terminated
-                        * line using the same CR timestamp.  Both CR
-                        * and LF are stripped by refclock_gtlin().
+                        * Copy data to new buffer, convert CR to LF on
+                        * the fly. Stop after LF.
                         */
-                       buff = get_free_recv_buffer_alloc();
-                       buff->recv_time = cr_time;
+                       dptr = (char *)obuf->recv_buffer;
+                       eol  = FALSE;
+                       while (sptr != send && !eol) {
+                               ch  = *sptr++;
+                               if (ch == '\r')
+                                       ch = '\n';
+                               *dptr++ = ch;
+                               eol = (ch == '\n');
+                       }
+                       obuf->recv_length =
+                           (int)(dptr - (char *)obuf->recv_buffer);
+
+                       /*
+                        * If NL found, push this buffer and prepare to
+                        * get a new one.
+                        */
+                       if (eol) {
+                               add_full_recv_buffer(obuf);
+                               SetEvent(WaitableIoEventHandle);
+                               obuf = NULL;
+                       }
+               }
+
+               /*
+                * If we still have an output buffer, continue to fill
+                * it again.
+                */
+               if (obuf) {
+                       obuf->recv_length =
+                           (int)(dptr - (char *)obuf->recv_buffer);
+                       freerecvbuf(buff);
+                       buff = obuf;
+               } else {
+                       /* clear the current buffer, continue */
                        buff->recv_length = 0;
-                       buff->fd = rio->fd;
-                       buff->receiver = process_refclock_packet;
-                       buff->dstadr = NULL;
-                       buff->recv_peer = rio->srcclock;
-                       add_full_recv_buffer(buff);
-                       SetEvent(WaitableIoEventHandle);
-                       buff = get_free_recv_buffer_alloc();
                }
+       } else {
+               ZERO(*lpo);
+               buff->recv_length = 0;
        }
 
-       QueueSerialWait(rio, buff, lpo, TRUE);
-
-       return 1;
+       QueueSerialWait(rio, buff, lpo);
+       return 0;
 }
 
 
 /*
- * QueueRawSerialRead() returns TRUE if successful.
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 3 -- raw data input
+ *
+ * Raw data processing is fast enough to do without offloading to the
+ * worker pool, so this is rather short'n sweet...
+ * -------------------------------------------------------------------
  */
-static int
+
+static BOOL
 QueueRawSerialRead(
        struct refclockio *     rio,
        recvbuf_t *             buff,
-       olplus *                lpo
+       IoCtx_t *               lpo
        )
 {
-       BOOL            rc;
+       BOOL   rc;
 
-       buff->fd = _get_osfhandle(rio->fd);
-       lpo->request_type = RAW_SERIAL_READ;
+       lpo->onIoDone = OnRawSerialReadComplete;
        lpo->recv_buf = buff;
-       rc = ReadFile((HANDLE)buff->fd, buff->recv_buffer,
-                     sizeof(buff->recv_buffer), NULL, &lpo->ol);
-       if (!rc && ERROR_IO_PENDING != GetLastError()) {
-               msyslog(LOG_ERR, "Can't raw read from Refclock: %m");
-               freerecvbuf(buff);
-               return FALSE;
-       }
-
+       lpo->flRawMem = 0;
+       lpo->rio      = rio;
+       buff->fd      = rio->fd;
+
+       rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
+                     buff->recv_buffer,
+                     sizeof(buff->recv_buffer),
+                     NULL, &lpo->ol);
+       if (!rc)
+               return IoResultCheck(GetLastError(), lpo,
+                                    "Can't read raw from Refclock");
        return TRUE;
 }
 
 
-static int 
+static void 
 OnRawSerialReadComplete(
-       struct refclockio *     rio,
-       olplus *                lpo,
-       DWORD                   octets,
-       int                     errstatus
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
        )
 {
-       recvbuf_t *             rbufp;
-       l_fp                    arrival_time;
-       int                     consumed;
-
-       get_systime(&arrival_time);
+       struct refclockio *     rio;
+       recvbuf_t *             buff;
 
-       if (!rio->active)
-               return FALSE;
+       /* check and bail out if operation failed */
+       if (!IoResultCheck(lpo->errCode, lpo,
+                          "Raw read from Refclock failed"))
+               return;
 
-       rbufp = lpo->recv_buf;
+       /* get & validate context and buffer. */
+       rio  = lpo->rio;
+       buff = lpo->recv_buf;
+       NTP_INSIST((ULONG_PTR)rio == key);
 
        /* ignore 0 bytes read. */
-       if (NO_ERROR == errstatus && octets > 0) {
-               rbufp->recv_length = (int)octets;
-               rbufp->dstadr = NULL;
-               rbufp->recv_time = arrival_time;
-               rbufp->receiver = process_refclock_packet;
-               rbufp->recv_peer = rio->srcclock;
-               rbufp->fd = rio->fd; /* was handle */
-
-               add_full_recv_buffer(rbufp);
+       if (lpo->byteCount > 0) {
+               buff->recv_length = (int)lpo->byteCount;
+               buff->dstadr      = NULL;
+               buff->recv_time   = lpo->RecvTime;
+               buff->receiver    = process_refclock_packet;
+               buff->recv_peer   = rio->srcclock;
+               add_full_recv_buffer(buff);
                SetEvent(WaitableIoEventHandle);
-               rbufp = get_free_recv_buffer_alloc();
+               buff = get_free_recv_buffer_alloc();
+       }
+
+       buff->recv_length = 0;
+       QueueSerialWait(rio, buff, lpo);
+}
+
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 4 -- Overlapped serial output
+ *
+ * Again, no need to offload any work.
+ * -------------------------------------------------------------------
+ */
+
+/*
+ * async_write, clone of write(), used by some reflock drivers
+ */
+int    
+async_write(
+       int             fd,
+       const void *    data,
+       unsigned int    count
+       )
+{
+       IoCtx_t *       lpo;
+       BOOL            rc;
+
+       lpo  = IoCtxAlloc();
+       if (lpo == NULL) {
+               DPRINTF(1, ("async_write: out of memory\n"));
+               errno = ENOMEM;
+               return -1;
        }
 
-       QueueSerialWait(rio, rbufp, lpo, TRUE);
+       lpo->onIoDone  = OnSerialWriteComplete;
+       lpo->trans_buf = emalloc(count);
+       lpo->flRawMem  = 1;
+       memcpy(lpo->trans_buf, data, count);
+
+       rc = WriteFile((HANDLE)_get_osfhandle(fd),
+                      lpo->trans_buf, count,
+                      NULL, &lpo->ol);
+       if (!rc && !IoResultCheck(GetLastError(), lpo,
+                                 "Can't write to Refclock")) {
+               errno = EBADF;
+               return -1;
+       }
+       return count;
+}
 
-       return 1;
+static void
+OnSerialWriteComplete(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+       )
+{
+       /* set RIO and force silent cleanup if no error */
+       lpo->rio = (struct refclockio *)key;
+       if (ERROR_SUCCESS == lpo->errCode)
+               lpo->errCode = ERROR_OPERATION_ABORTED;
+       IoResultCheck(lpo->errCode, lpo,
+                     "Write to Refclock failed");
 }
 
 
-/*  Add a reference clock data structures I/O handles to
- *  the I/O completion port. Return 1 if any error.
+
+
+/*
+ * Add a reference clock data structures I/O handles to
+ * the I/O completion port. Return 1 if any error.
  */  
 int
 io_completion_port_add_clock_io(
        struct refclockio *rio
        )
 {
-       olplus *        lpo;
+       IoCtx_t *       lpo;
        recvbuf_t *     buff;
        HANDLE          h;
 
@@ -712,14 +971,14 @@ io_completion_port_add_clock_io(
                return 1;
        }
 
-       lpo = GetHeapAlloc("io_completion_port_add_clock_io");
+       lpo = IoCtxAlloc();
        if (NULL == lpo) {
                msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
                return 1;
        }
-
        buff = get_free_recv_buffer_alloc();
-       QueueSerialWait(rio, buff, lpo, TRUE);
+       buff->recv_length = 0;
+       QueueSerialWait(rio, buff, lpo);
 
        return 0;
 }
@@ -730,100 +989,74 @@ io_completion_port_add_clock_io(
  *  Note: As per the winsock documentation, we use WSARecvFrom. Using
  *       ReadFile() is less efficient.
  */
-static unsigned long 
+static BOOL 
 QueueSocketRecv(
-       SOCKET s,
-       recvbuf_t *buff,
-       olplus *lpo
+       SOCKET          s,
+       recvbuf_t *     buff,
+       IoCtx_t *       lpo
        )
 {
        WSABUF wsabuf;
-       DWORD Flags;
-       DWORD Result;
+       DWORD  Flags;
+       int    rc;
 
-       lpo->request_type = SOCK_RECV;
+       lpo->onIoDone = OnSocketRecv;
        lpo->recv_buf = buff;
-
-       if (buff != NULL) {
-               Flags = 0;
-               buff->fd = s;
-               buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
-               wsabuf.buf = (char *)buff->recv_buffer;
-               wsabuf.len = sizeof(buff->recv_buffer);
-
-               if (SOCKET_ERROR == WSARecvFrom(buff->fd, &wsabuf, 1, 
-                                               NULL, &Flags, 
-                                               &buff->recv_srcadr.sa, 
-                                               &buff->recv_srcadr_len, 
-                                               &lpo->ol, NULL)) {
-                       Result = GetLastError();
-                       switch (Result) {
-                               case NO_ERROR :
-                               case WSA_IO_PENDING :
-                                       break ;
-
-                               case WSAENOTSOCK :
-                                       msyslog(LOG_ERR, "Can't read from non-socket fd %d: %m", (int)buff->fd);
-                                       /* return the buffer */
-                                       freerecvbuf(buff);
-                                       return 0;
-                                       break;
-
-                               case WSAEFAULT :
-                                       msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
-                                       /* return the buffer */
-                                       freerecvbuf(buff);
-                                       return 0;
-                               break;
-
-                               default :
-                                 /* nop */ ;
-                       }
-               }
-       }
-       else 
-               return 0;
-       return 1;
+       lpo->flRawMem = 0;
+       lpo->rio      = NULL;
+
+       Flags = 0;
+       buff->fd = s;
+       buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
+       wsabuf.buf = (char *)buff->recv_buffer;
+       wsabuf.len = sizeof(buff->recv_buffer);
+
+       rc = WSARecvFrom(buff->fd, &wsabuf, 1, NULL, &Flags, 
+                        &buff->recv_srcadr.sa, &buff->recv_srcadr_len, 
+                        &lpo->ol, NULL);
+       if (SOCKET_ERROR == rc) 
+               return IoResultCheck(GetLastError(), lpo,
+                                    "Can't read from Socket");
+       return TRUE;
 }
 
 
-/* Returns 0 if any Error */
-static int 
-OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
+static void 
+OnSocketRecv(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+       )
 {
-       struct recvbuf *buff = NULL;
-       recvbuf_t *newbuff;
-       l_fp arrival_time;
-       struct interface * inter = (struct interface *) i;
+       recvbuf_t * buff;
+       recvbuf_t * newbuff;
+       struct interface * inter = (struct interface *)key;
        
-       get_systime(&arrival_time);
-
        NTP_REQUIRE(NULL != lpo);
        NTP_REQUIRE(NULL != lpo->recv_buf);
 
+       /* check and bail out if operation failed */
+       if (!IoResultCheck(lpo->errCode, lpo,
+                          "Read from Socket failed"))
+               return;
+
        /*
         * Convert the overlapped pointer back to a recvbuf pointer.
+        * Fetch items that are lost when the context is queued again.
         */
        buff = lpo->recv_buf;
-
-       /*
-        * If the socket is closed we get an Operation Aborted error
-        * Just clean up
-        */
-       if (errstatus == WSA_OPERATION_ABORTED)
-       {
-               freerecvbuf(buff);
-               lpo->recv_buf = NULL;
-               FreeHeap(lpo, "OnSocketRecv: Socket Closed");
-               return (1);
-       }
+       buff->recv_time   = lpo->RecvTime;
+       buff->recv_length = (int)lpo->byteCount;
 
        /*
         * Get a new recv buffer for the replacement socket receive
         */
        newbuff = get_free_recv_buffer_alloc();
-       QueueSocketRecv(inter->fd, newbuff, lpo);
-
+       if (NULL != newbuff) {
+               QueueSocketRecv(inter->fd, newbuff, lpo);
+       } else {
+               IoCtxFree(lpo);
+               msyslog(LOG_ERR, "Can't add I/O request to socket");
+       }
        DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", 
                    (MODE_BROADCAST == get_packet_mode(buff))
                        ? " **** Broadcast "
@@ -834,14 +1067,11 @@ OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
        /*
         * If we keep it add some info to the structure
         */
-       if (Bytes && !inter->ignore_packets) {
+       if (buff->recv_length && !inter->ignore_packets) {
                NTP_INSIST(buff->recv_srcadr_len <=
                           sizeof(buff->recv_srcadr));
-               memcpy(&buff->recv_time, &arrival_time,
-                      sizeof(buff->recv_time));        
-               buff->recv_length = (int) Bytes;
                buff->receiver = &receive; 
-               buff->dstadr = inter;
+               buff->dstadr   = inter;
                packets_received++;
                handler_pkts++;
                inter->received++;
@@ -857,25 +1087,28 @@ OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
                SetEvent(WaitableIoEventHandle);
        } else
                freerecvbuf(buff);
-
-       return 1;
 }
 
 
-/*  Add a socket handle to the I/O completion port, and send 
- *  NTP_RECVS_PER_SOCKET recv requests to the kernel.
+/*
+ * Add a socket handle to the I/O completion port, and send 
+ * NTP_RECVS_PER_SOCKET recv requests to the kernel.
  */
-extern int
-io_completion_port_add_socket(SOCKET fd, struct interface *inter)
+int
+io_completion_port_add_socket(
+       SOCKET                  fd,
+       struct interface *      inter
+       )
 {
-       olplus *lpo;
-       recvbuf_t *buff;
-       int n;
+       IoCtx_t *       lpo;
+       recvbuf_t *     buff;
+       int             n;
 
        if (fd != INVALID_SOCKET) {
                if (NULL == CreateIoCompletionPort((HANDLE)fd, 
                    hIoCompletionPort, (ULONG_PTR)inter, 0)) {
-                       msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
+                       msyslog(LOG_ERR,
+                               "Can't add socket to i/o completion port: %m");
                        return 1;
                }
        }
@@ -894,10 +1127,11 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter)
        for (n = 0; n < WINDOWS_RECVS_PER_SOCKET; n++) {
 
                buff = get_free_recv_buffer_alloc();
-               lpo = (olplus *) GetHeapAlloc("io_completion_port_add_socket");
+               lpo = IoCtxAlloc();
                if (lpo == NULL)
                {
-                       msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
+                       msyslog(LOG_ERR
+                               , "Can't allocate IO completion context: %m");
                        return 1;
                }
 
@@ -908,24 +1142,6 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter)
 }
 
 
-static int 
-OnWriteComplete(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
-{
-       void *buff;
-
-       UNUSED_ARG(i);
-       UNUSED_ARG(Bytes);
-       UNUSED_ARG(errstatus);
-
-       buff = lpo->trans_buf;
-       free(buff);
-       lpo->trans_buf = NULL;
-       FreeHeap(lpo, "OnWriteComplete");
-
-       return 1;
-}
-
-
 /*
  * io_completion_port_sendto() -- sendto() replacement for Windows
  *
@@ -955,9 +1171,8 @@ io_completion_port_sendto(
 
        Result = WSASendTo(fd, &wsabuf, 1, &octets_sent, 0,
                           &dest->sa, AddrLen, NULL, NULL);
-
+       errval = GetLastError();
        if (SOCKET_ERROR == Result) {
-               errval = GetLastError();
                if (ERROR_UNEXP_NET_ERR == errval) {
                        /*
                         * We get this error when trying to send if the
@@ -993,54 +1208,14 @@ io_completion_port_sendto(
 }
 
 
-/*
- * async_write, clone of write(), used by some reflock drivers
- */
-int    
-async_write(
-       int fd,
-       const void *data,
-       unsigned int count
-       )
-{
-       void *buff;
-       olplus *lpo;
-       DWORD BytesWritten;
-
-       buff = emalloc(count);
-       lpo = GetHeapAlloc("async_write");
-       if (lpo == NULL) {
-               free(buff);
-               DPRINTF(1, ("async_write: out of memory\n"));
-               errno = ENOMEM;
-               return -1;
-       }
-
-       lpo->request_type = SERIAL_WRITE;
-       lpo->trans_buf = buff;
-       memcpy(buff, data, count);
-
-       if (!WriteFile((HANDLE)_get_osfhandle(fd), buff, count,
-               &BytesWritten, &lpo->ol)
-               && ERROR_IO_PENDING != GetLastError()) {
-
-               msyslog(LOG_ERR, "async_write - error %m");
-               free(buff);
-               FreeHeap(lpo, "async_write");
-               errno = EBADF;
-               return -1;
-       }
-
-       return count;
-}
-
 
 /*
  * GetReceivedBuffers
  * Note that this is in effect the main loop for processing requests
  * both send and receive. This should be reimplemented
  */
-int GetReceivedBuffers()
+int
+GetReceivedBuffers()
 {
        DWORD   index;
        HANDLE  ready;
@@ -1070,12 +1245,14 @@ int GetReceivedBuffers()
                        break;
 
                case WAIT_TIMEOUT:
-                       msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects INFINITE timed out.");
+                       msyslog(LOG_ERR,
+                               "WaitForMultipleObjects INFINITE timed out.");
                        exit(1);
                        break;
 
                case WAIT_FAILED:
-                       msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects Failed: Error: %m");
+                       msyslog(LOG_ERR,
+                               "WaitForMultipleObjects Failed: Error: %m");
                        exit(1);
                        break;