]> git.ipfire.org Git - thirdparty/ntp.git/commitdiff
[winio2 - unlocked]
authorJuergen Perlinger <perlinger@ntp.org>
Mon, 24 Oct 2016 05:37:25 +0000 (07:37 +0200)
committerJuergen Perlinger <perlinger@ntp.org>
Mon, 24 Oct 2016 05:37:25 +0000 (07:37 +0200)
 - the great lock removal
 - the great renaming

bk: 580d9e150pFykOW-C6BG5aDl6QyLtQ

ChangeLog
ntpd/ntp_refclock.c
ports/winnt/include/ntp_iocpltypes.h
ports/winnt/ntpd/ntp_iocompletionport.c
ports/winnt/ntpd/ntp_iocpltypes.c

index ddeb2a506782109f33fb7c60d79915a9782896e0..434ee77489fa5765494493ace27ad690eff1c81f 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -50,6 +50,7 @@
 * Shim X509_get_signature_nid() if needed.
 * git author attribution cleanup
 * bk ignore file cleanup
+* remove locks in Wndows IO, use rpc-like thread synchronisation instead
 
 ---
 (4.2.8p8) 2016/06/02 Released by Harlan Stenn <stenn@ntp.org>
index 5a6f2df74adf6e2c0204fbe431edbe54cc775256..bc389012ef56436a4de52287c24b0290203d829f 100644 (file)
@@ -710,7 +710,7 @@ process_refclock_packet(
        if (rio->io_input == NULL || (*rio->io_input)(rb) != 0) {
                rio->recvcount++;
                packets_received++;
-               handler_pkts++;         
+               handler_pkts++;
                (*rio->clock_recv)(rb);
        }
 }
index c2d40acbe4c911f609a518dc4bbc23be306806a1..b3e320b4d125dbc3e177ad94f5f5482f256eb63b 100644 (file)
@@ -22,41 +22,38 @@ typedef struct interface    endpt;
 typedef struct recvbuf         recvbuf_t;
 
 /* ---------------------------------------------------------------------
- * shared lock to avoid concurrent deletes on IO related stuff like
- * RIO or ENDPOINT blocks.
- *
- * Basically we wwould need a multiple-reader/single-writer lock,
- * but for now we do full mutual exclusion.
+ * shared control structure for IO. Removal of communication handles
+ * or other detach-like operations must be done exclusively by the IO
+ * thread, or Bad Things (tm) are bound to happen!
  */
-typedef struct SharedLock SharedLock_t;
-typedef const struct SharedLock CSharedLock_t;
-struct SharedLock {
-       CRITICAL_SECTION        mutex[1];
+typedef struct IoHndPad IoHndPad_T;
+typedef const struct IoHndPad CIoHndPad_T;
+struct IoHndPad {
        volatile u_long         refc_count;
        union {
                RIO_t *          rio;   /*  RIO back-link (for offload) */
-               endpt *          ept;   /*  inetrface backlink          */
+               endpt *          ept;   /*  interface backlink          */
                ULONG_PTR        key;   /*  as key for IOCPL queue      */
                void *           any;
        }                       rsrc;   /* registered source            */
        HANDLE                  handles[2]; /* 0->COM/SOCK 1->BCASTSOCK */
+
+       /* COMPORT specific stuff */
        int                     riofd;  /* FD for comports              */
+       unsigned int            flDropEmpty : 1;        /* no empty line*/
+       unsigned int            flFirstSeen : 1;
 };
 
-typedef BOOL(__fastcall * LockPredicateT)(CSharedLock_t*);
+typedef BOOL(__fastcall * IoPreCheck_T)(CIoHndPad_T*);
 
-extern SharedLock_t* __fastcall        slCreate(void * rsrc);
-extern SharedLock_t* __fastcall        slAttach(SharedLock_t*);
-extern SharedLock_t* __fastcall        slDetach(SharedLock_t*);
-extern SharedLock_t* __fastcall        slAttachShared(SharedLock_t*);
-extern SharedLock_t* __fastcall        slDetachShared(SharedLock_t*);
-extern SharedLock_t* __fastcall        slAttachExclusive(SharedLock_t*);
-extern SharedLock_t* __fastcall        slDetachExclusive(SharedLock_t*);
+extern IoHndPad_T* __fastcall  iohpCreate(void * rsrc);
+extern IoHndPad_T* __fastcall  iohpAttach(IoHndPad_T*);
+extern IoHndPad_T* __fastcall  iohpDetach(IoHndPad_T*);
 
-extern BOOL __fastcall slRefClockOK(CSharedLock_t*);
-extern BOOL __fastcall slEndPointOK(CSharedLock_t*);
+extern BOOL __fastcall iohpRefClockOK(CIoHndPad_T*);
+extern BOOL __fastcall iohpEndPointOK(CIoHndPad_T*);
 
-extern BOOL    slQueueLocked(SharedLock_t*, LockPredicateT, recvbuf_t*);
+extern BOOL    iohpQueueLocked(CIoHndPad_T*, IoPreCheck_T, recvbuf_t*);
 
 
 /* ---------------------------------------------------------------------
@@ -131,7 +128,7 @@ struct IoCtx {
                SOCKET           sfd;           /*  socket descriptor           */
        }                       io;             /* the IO resource used         */
        IoCompleteFunc          onIoDone;       /* HL callback to execute       */
-       SharedLock_t *          slock;
+       IoHndPad_T *            iopad;
        DevCtx_t *              devCtx;
        DWORD                   errCode;        /* error code of last I/O       */
        DWORD                   byteCount;      /* byte count     "             */
@@ -152,10 +149,10 @@ struct IoCtx {
 
 typedef BOOL (__fastcall *IoCtxStarterT)(IoCtx_t*, recvbuf_t*);
 
-extern IoCtx_t* __fastcall IoCtxAlloc(SharedLock_t*, DevCtx_t*);
+extern IoCtx_t* __fastcall IoCtxAlloc(IoHndPad_T*, DevCtx_t*);
 extern void    __fastcall IoCtxFree(IoCtx_t*);
 extern void    __fastcall IoCtxRelease(IoCtx_t*);
 
-extern BOOL    IoCtxStartLocked(IoCtx_t*, IoCtxStarterT, recvbuf_t*);
+extern BOOL    IoCtxStartChecked(IoCtx_t*, IoCtxStarterT, recvbuf_t*);
 
 #endif /*!defined(NTP_IOCPLTYPES_H)*/
index 90bdd99e18d999ef93f96764bb80b928b2db6060..251ba6a438ee8409baa9cf26b0f6cb5f81a71849 100644 (file)
@@ -77,6 +77,12 @@ enum io_packet_handling {
        PKT_SOCKET_ERROR
 };
 
+static const char * const st_packet_handling[3] = {
+       "accepted",
+       "dropped"
+       "error"
+};
+
 /*
  * local function definitions
  */
@@ -112,6 +118,7 @@ static void free_io_completion_port_mem(void);
        HANDLE  WaitableIoEventHandle;
 static HANDLE  hndIOCPLPort;
 static HANDLE  hMainThread;
+static HANDLE  hMainRpcDone;
 static BOOL    DoPPShack;
 
 DWORD  ActiveWaitHandles;
@@ -262,6 +269,11 @@ init_io_completion_port(void)
                msyslog(LOG_ERR, "Can't create exit event handle: %m");
                exit(1);
        }
+       hMainRpcDone = CreateEvent(NULL, FALSE, FALSE, NULL);
+       if (hMainRpcDone == NULL) {
+               msyslog(LOG_ERR, "Can't create RPC sync handle: %m");
+               exit(1);
+       }
 
        /* Create the IO completion port */
        hndIOCPLPort = CreateIoCompletionPort(
@@ -343,6 +355,8 @@ uninit_io_completion_port(
        hIoCompletionThread = NULL;
        CloseHandle(hndIOCPLPort);
        hndIOCPLPort = NULL;
+       CloseHandle(hMainRpcDone);
+       hMainRpcDone = NULL;
 }
 
 
@@ -410,6 +424,24 @@ free_io_completion_port_mem(void)
 }
 #endif /* DEBUG */
 
+void
+iocpl_notify(
+       IoHndPad_T *    iopad,
+       void            (*pfunc)(ULONG_PTR, IoCtx_t *),
+       UINT_PTR        fdn
+       )
+{
+       IoCtx_t xf;
+
+       memset(&xf, 0, sizeof(xf));
+       xf.iopad    = iopad;
+       xf.ppswake  = hMainRpcDone;
+       xf.onIoDone = pfunc;
+       xf.io.sfd   = fdn;
+       PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &xf.ol);
+       WaitForSingleObject(xf.ppswake, INFINITE);
+}
+
 /*
  * -------------------------------------------------------------------
  * APC callback for scheduling interface scans.
@@ -526,15 +558,14 @@ getRioFromIoCtx(
         * lock, the check for errors. If the error indicates the
         * operation was cancelled, let the operation fail silently.
         */
-       RIO_t *         rio = NULL;
-       SharedLock_t *  slock = slAttachShared(ctx->slock);
-       if (NULL != slock) {
-               rio = slock->rsrc.rio;
-               if (key != slock->rsrc.key)
+       RIO_t *         rio   = NULL;
+       IoHndPad_T *    iopad = ctx->iopad;
+       if (NULL != iopad) {
+               rio = iopad->rsrc.rio;
+               if (key != iopad->rsrc.key)
                        rio = NULL;
-               else if (ctx->io.hnd != slock->handles[0])
+               else if (ctx->io.hnd != iopad->handles[0])
                        rio = NULL;
-               slDetachShared(slock);
        }
        if (rio != NULL) switch (ctx->errCode) {
                /* When we got cancelled, don't spill messages */
@@ -577,14 +608,13 @@ getEndptFromIoCtx(
         * it out in both the input key and the registered source.
         */
        endpt *         ep    = NULL;
-       SharedLock_t *  slock = slAttachShared(ctx->slock);
-       if (slock != NULL) {
-               ep = slock->rsrc.ept;
-               if ((key >> 1) != (slock->rsrc.key >> 1))
+       IoHndPad_T *    iopad = ctx->iopad;
+       if (iopad != NULL) {
+               ep = iopad->rsrc.ept;
+               if ((key >> 1) != (iopad->rsrc.key >> 1))
                        ep = NULL;
-               else if (ctx->io.hnd != slock->handles[key & 1])
+               else if (ctx->io.hnd != iopad->handles[key & 1])
                        ep = NULL;
-               slDetachShared(slock);
        }
        if (ep == NULL)
                IoCtxRelease(ctx);
@@ -703,7 +733,7 @@ QueueSerialWait(
        lpo->recv_buf = buff;
        lpo->flRawMem = 0;
 
-       buff->fd = lpo->slock->riofd;
+       buff->fd = lpo->iopad->riofd;
        /* keep receive position for continuation of partial lines! */
        rc  = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
        return rc || IoResultCheck(GetLastError(), lpo, msg);
@@ -732,7 +762,7 @@ OnSerialWaitComplete(
        /* start next IO and leave if we hit an error */
        if (lpo->errCode != ERROR_SUCCESS) {
                memset(&lpo->aux, 0, sizeof(lpo->aux));
-               IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+               IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
                return;
        }
 
@@ -761,13 +791,13 @@ OnSerialWaitComplete(
                                dev->pps_data.cc_assert++;
                                dev->pps_data.ts_assert = lpo->aux.RecvTime;
                                DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n",
-                                       lpo->slock->rsrc.rio->fd,
+                                       lpo->iopad->rsrc.rio->fd,
                                        ulfptoa(&lpo->aux.RecvTime, 6)));
                        } else {
                                dev->pps_data.cc_clear++;
                                dev->pps_data.ts_clear = lpo->aux.RecvTime;
                                DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n",
-                                       lpo->slock->rsrc.rio->fd,
+                                       lpo->iopad->rsrc.rio->fd,
                                        ulfptoa(&lpo->aux.RecvTime, 6)));
                        }
                        /* Update PPS buffer, writing from low to high, with index
@@ -797,7 +827,7 @@ OnSerialWaitComplete(
                        lpo->aux.DCDSTime = lpo->aux.RecvTime;
                        lpo->aux.flTsDCDS = 1;
                        DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n",
-                               lpo->slock->rsrc.rio->fd,
+                               lpo->iopad->rsrc.rio->fd,
                                ulfptoa(&lpo->aux.RecvTime, 6)));
                }
        }
@@ -806,13 +836,13 @@ OnSerialWaitComplete(
        if (EV_RXFLAG & lpo->aux.com_events) {          /* line discipline */
                lpo->aux.FlagTime = lpo->aux.RecvTime;
                lpo->aux.flTsFlag = 1;
-               IoCtxStartLocked(lpo, QueueSerialRead, lpo->recv_buf);
+               IoCtxStartChecked(lpo, QueueSerialRead, lpo->recv_buf);
        } else if (EV_RXCHAR & lpo->aux.com_events) {   /* raw discipline */
                lpo->aux.FlagTime = lpo->aux.RecvTime;
                lpo->aux.flTsFlag = 1;
-               IoCtxStartLocked(lpo, QueueRawSerialRead, lpo->recv_buf);
+               IoCtxStartChecked(lpo, QueueRawSerialRead, lpo->recv_buf);
        } else {                                        /* idle... */
-               IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+               IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
        }
 }
 
@@ -839,10 +869,10 @@ QueueSerialReadCommon(
        lpo->flRawMem = 0;
 
        /* 'buff->recv_length' must be set already! */
-       buff->fd        = lpo->slock->riofd;
+       buff->fd        = lpo->iopad->riofd;
        buff->dstadr    = NULL;
        buff->receiver  = process_refclock_packet;
-       buff->recv_peer = lpo->slock->rsrc.rio->srcclock;
+       buff->recv_peer = lpo->iopad->rsrc.rio->srcclock;
 
        rc = ReadFile(lpo->io.hnd,
                (char*)buff->recv_buffer + buff->recv_length,
@@ -911,7 +941,7 @@ OnSerialReadComplete(
 wait_again:
        /* make sure the read is issued again */
        memset(&lpo->aux, 0, sizeof(lpo->aux));
-       IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+       IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
 }
 
 
@@ -926,122 +956,180 @@ wait_again:
  * the IO completion thread can resume faster.
  *
  * !!ATTENTION!!
- * This function runs on an arbitrary worker thread, and not under the
- * protection of the shared lock! Accessing the RIO structure must set
- * the lock explicitely!
+ * This function runs on an arbitrary worker thread. The resource
+ * management with regard to IO is synchronised only between the main
+ * thread and the IO worker thread, so decisions about queueing and
+ * starting new IO must be made by either of them.
+ *
+ * Since the IO thread sticks in the IOCPL queue and is not alertable,
+ * we could either use the APC queue to the main thread or the IOCPL
+ * queue to the IO thread.
+ *
+ * We separate the effort -- filtering based on the RIO state is done
+ * by the main thread, restarting the IO by the IO thread to reduce
+ * delays.
+ */
+
+/* -------------------------------------------------------------------
+ * IOCPL deferred bouncer -- start a new serial wait from IOCPL thread
+ */
+static void
+OnDeferredStartWait(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+)
+{
+       IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
+}
+
+/* -------------------------------------------------------------------
+ * APC deferred bouncer -- put buffer to receive queueor eventually
+ * discard it if source is already disabled. Runs in the context
+ * of the main thread exclusively.
+ */
+static void WINAPI
+OnEnqueAPC(
+       ULONG_PTR arg
+)
+{
+       recvbuf_t *     buff  = (recvbuf_t*)arg;
+       IoHndPad_T *    iopad = (IoHndPad_T*)buff->recv_peer;
+       RIO_t *         rio   = iopad->rsrc.rio;
+
+       /* Down below we make a nasty hack to transport the iopad
+        * pointer in the buffer so we can avoid another temporary
+        * allocation. We must undo this here.
+       */
+       if (NULL != rio) {
+               /* OK, refclock still attached */
+               buff->recv_peer = rio->srcclock;
+               if (iohpQueueLocked(iopad, iohpRefClockOK, buff))
+                       ++rio->srcclock->received;
+       } else {
+               /* refclock detached while in flight... */
+               freerecvbuf(buff);
+       }
+       iohpDetach(iopad); /* one unit owned by this callback! */
+}
+
+/* -------------------------------------------------------------------
+ * worker pool thread worker doing the string processing
  */
 static DWORD WINAPI
 OnSerialReadWorker(
        void *  ctx
        )
 {
-       IoCtx_t *       lpo;
-       SharedLock_t *  slock;
-       recvbuf_t *     buff, *obuf;
+       IoCtx_t *       lpo  = (IoCtx_t*)ctx;
+       IoHndPad_T *    iop  = lpo->iopad;
+       recvbuf_t *     buff = lpo->recv_buf;
+       recvbuf_t *     obuf = NULL;
        char            *sptr, *send, *dptr;
        BOOL            eol;
-       char            ch;
-       BOOL            active;
-       u_long          rcvcnt;
-       RIO_t *         rio;
-
-       /* Get context back */
-       lpo  = (IoCtx_t*)ctx;
-       buff = lpo->recv_buf;
-
-       /* query the lock structure under mutual exclusion */
-       active = FALSE;
-       rcvcnt = 0;
-       if (NULL != (slock = slAttachShared(lpo->slock))) {
-               if (NULL != (rio = slock->rsrc.rio)) {
-                       active = TRUE;
-                       rcvcnt = InterlockedIncrement(&rio->recvcount) - 1;
-               }
-               slDetachShared(slock);
-       }
-
-       /* bail out if we're disconnected now */
-       if (!active) {
-               IoCtxRelease(ctx);
-               return 0;
-       }
+       int             ch;
 
-       /* Ignore zero-byte reads due to closure on fd.
-        * Eat the first line of input as it's possibly partial.
+       /* We should never gat a zero-byte read here. If we do, nothing
+        * really bad happens, just a useless rescan of data we have
+        * already processed. But somethings not quite right in logic
+        * and we croak loudly in debug builds.
         */
-       if (lpo->byteCount && rcvcnt) {
-               /* account for additional input */
-               buff->recv_length += (int)lpo->byteCount;
-
-               /*
-                * Now mimic the Unix line discipline.
-                */
-               sptr = (char *)buff->recv_buffer;
-               send = sptr + buff->recv_length;
-               obuf = NULL;
-               dptr = NULL;
-
-               /* hack #1: eat away leading CR/LF if there is any */
-               while (sptr != send) {
-                       ch = *sptr;
-                       if (ch != '\n' && ch != '\r')
-                               break;
-                       sptr++;
-               }
+       DEBUG_INSIST(lpo->byteCount > 0);
 
-               while (sptr != send) {
-                       /* get new buffer to store line */
-                       obuf = get_free_recv_buffer_alloc();
-                       obuf->fd        = buff->fd;
-                       obuf->receiver  = buff->receiver;
-                       obuf->dstadr    = NULL;
-                       obuf->recv_peer = buff->recv_peer;
-                       set_serial_recv_time(obuf, lpo);
-
-                       /* Copy data to new buffer, convert CR to LF on
-                        * the fly.  Stop after either.
-                        */
-                       dptr = (char *)obuf->recv_buffer;
-                       eol = FALSE;
-                       while (sptr != send && !eol) {
-                               ch = *sptr++;
-                               if ('\r' == ch)
-                                       ch = '\n';
-                               *dptr++ = ch;
-                               eol = ('\n' == ch);
-                       }
-                       obuf->recv_length =
-                               (int)(dptr - (char *)obuf->recv_buffer);
+       /* Account for additional input and then mimic the UNIX line
+        * discipline. This is an implict state machine -- the
+        * implementation is very low-level to gather speed.
+        */
+       buff->recv_length += (int)lpo->byteCount;
+       sptr = (char *)buff->recv_buffer;
+       send = sptr + buff->recv_length;
+       if (sptr == send)
+               goto st_read_fresh;
+
+st_new_obuf:
+       /* Get new receive buffer to store the line. */
+       obuf = get_free_recv_buffer_alloc();
+       obuf->fd        = buff->fd;
+       obuf->receiver  = buff->receiver;
+       obuf->dstadr    = NULL;
+       obuf->recv_peer = buff->recv_peer;
+       set_serial_recv_time(obuf, lpo);
+
+st_copy_start:
+       /* Copy data to new buffer, convert CR to LF on the fly.
+        * Stop after either.
+        */
+       dptr = (char *)obuf->recv_buffer;
+       do {
+               ch = *sptr++;
+               if ('\r' == ch)
+                       ch = '\n';
+               *dptr++ = ch;
+               eol = ('\n' == ch);
+       } while (!(eol || sptr == send));
+       obuf->recv_length = (int)(dptr - (char *)obuf->recv_buffer);
+
+       /* If we're not at EOL, we need more data to continue the line.
+        * But this can only be done if there's more room in the buffer;
+        * if we have already reached the maximum size, treat the whole
+        * buffer as part of a mega-line and pass it on.
+        */
+       if (!eol) {
+               if (obuf->recv_length < sizeof(obuf->recv_buffer))
+                       goto st_read_more;
+               else
+                       goto st_pass_buffer;
+       }
 
-                       /* If NL found, push this buffer and prepare to
-                        * get a new one. Be prepared for concurrent
-                        * removal of the clock...
-                        */
-                       if (eol) {
-                               slQueueLocked(lpo->slock, slRefClockOK, obuf);
-                               obuf = NULL; /* consumed in any case */
-                       }
-               }
+       /* if we should drop empty lines, do it here. */
+       if (obuf->recv_length < 2 && iop->flDropEmpty) {
+               obuf->recv_length = 0;
+               if (sptr != send)
+                       goto st_copy_start;
+               else
+                       goto st_read_more;
+       }
 
-               /* If we still have an output buffer, prepare it to be
-                * used for added input from the ComPort. Otherwise
-                * use the current input buffer again.
-                */
-               if (obuf) {
-                       obuf->recv_length =
-                               (int)(dptr - (char *)obuf->recv_buffer);
-                       freerecvbuf(buff);
-                       buff = obuf;
-               } else {
-                       /* clear the current buffer, continue */
-                       buff->recv_length = 0;
-               }
-       } else {
-               buff->recv_length = 0;
+       if ( ! iop->flFirstSeen) {
+               iop->flFirstSeen = 1;
+               obuf->recv_length = 0;
+               if (sptr != send)
+                       goto st_copy_start;
+               else
+                       goto st_read_more;
        }
 
-       /* start next round -- must hold the lock during that! */
-       IoCtxStartLocked(lpo, QueueSerialWait, buff);
+st_pass_buffer:
+       /* if we arrive here, we can spin off another text line to the
+        * receive queue. We use a hack to supplant the RIO pointer in
+        * the receive buffer with the IOPAD to save us a temporary
+        * workspace allocation. Note the callback owns one refcount
+        * unit to keep the IOPAD alive! Also checking that the RIO in
+        * the IOPAD matches the RIO in the buffer is dangerous: That
+        * pointer is manipulated by the other threads!
+        */
+       obuf->recv_peer = (struct peer*)iohpAttach(lpo->iopad);
+       QueueUserAPC(OnEnqueAPC, hMainThread, (ULONG_PTR)obuf);
+       if (sptr != send)
+               goto st_new_obuf;
+       buff->recv_length = 0;
+       goto st_read_fresh;
+
+st_read_more:
+       /* read more data into current OBUF, which is valid and will
+        * replace BUFF.
+        */
+       lpo->recv_buf = obuf;
+       freerecvbuf(buff);
+
+st_read_fresh:
+       /* Start next round. This is deferred to the IOCPL thread, as
+        * read access to the IOPAD is unsafe from a worker thread
+        * for anything but the flags. If the IOCPL handle is gone,
+        * just mop up the pieces.
+        */
+       lpo->onIoDone = OnDeferredStartWait;
+       if (!(hndIOCPLPort && PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &lpo->ol)))
+               IoCtxRelease(lpo);
        return 0;
 }
 
@@ -1091,10 +1179,10 @@ OnRawSerialReadComplete(
        if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
                buff->recv_length = (int)lpo->byteCount;
                set_serial_recv_time(buff, lpo);
-               slQueueLocked(lpo->slock, slRefClockOK, buff);
+               iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
                buff = get_free_recv_buffer_alloc();
        }
-       IoCtxStartLocked(lpo, QueueSerialWait, buff);
+       IoCtxStartChecked(lpo, QueueSerialWait, buff);
 }
 
 
@@ -1290,7 +1378,7 @@ io_completion_port_add_clock_io(
 
        IoCtx_t *       lpo;
        HANDLE          h;
-       SharedLock_t *  slock = NULL;
+       IoHndPad_T *    iopad = NULL;
 
        /* preset to clear state for error cleanup:*/
        rio->ioreg_ctx  = NULL;
@@ -1304,14 +1392,14 @@ io_completion_port_add_clock_io(
        }
 
        ;
-       if ( ! (rio->ioreg_ctx = slock = slCreate(rio))) {
+       if ( ! (rio->ioreg_ctx = iopad = iohpCreate(rio))) {
                msyslog(LOG_ERR, "%s: Failed to create shared lock",
                        msgh);
                goto fail;
        }
-       slock->handles[0] = h;
-       slock->riofd      = rio->fd;
-       slock->rsrc.rio   = rio;
+       iopad->handles[0] = h;
+       iopad->riofd      = rio->fd;
+       iopad->rsrc.rio   = rio;
 
        if (!(rio->device_ctx = DevCtxAttach(serial_devctx(h)))) {
                msyslog(LOG_ERR, "%s: Failed to allocate device context",
@@ -1319,7 +1407,7 @@ io_completion_port_add_clock_io(
                goto fail;
        }
 
-       if ( ! (lpo = IoCtxAlloc(slock, rio->device_ctx))) {
+       if ( ! (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
                msyslog(LOG_ERR, "%: Failed to allocate IO context",
                        msgh);
                goto fail;
@@ -1335,29 +1423,45 @@ io_completion_port_add_clock_io(
        return QueueSerialWait(lpo, get_free_recv_buffer_alloc());
 
 fail:
-       rio->ioreg_ctx  = slDetach(rio->ioreg_ctx);
+       rio->ioreg_ctx  = iohpDetach(rio->ioreg_ctx);
        rio->device_ctx = DevCtxDetach(rio->device_ctx);
        return FALSE;
 }
 
 /* ----------------------------------------------------------------- */
+static void
+OnSerialDetach(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+)
+{
+       /* Make sure the key matches the context info in the shared
+       * lock, the check for errors. If the error indicates the
+       * operation was cancelled, let the operation fail silently.
+       */
+       IoHndPad_T *    iopad = lpo->iopad;
+
+       INSIST(NULL != iopad);
+       if (iopad->handles[0] == lpo->io.hnd) {
+               iopad->handles[0] = INVALID_HANDLE_VALUE;
+               iopad->handles[1] = INVALID_HANDLE_VALUE;
+               iopad->rsrc.rio   = NULL;
+               iopad->riofd      = -1;
+       }
+       SetEvent(lpo->ppswake);
+}
+
+
 void
 io_completion_port_remove_clock_io(
        RIO_t *rio
        )
 {
-       SharedLock_t *  slock = NULL;
-       if (rio && NULL != (slock = slAttachExclusive(rio->ioreg_ctx))) {
-               slDetach(slock);
-
-               slock->handles[0] = INVALID_HANDLE_VALUE;
-               slock->handles[1] = INVALID_HANDLE_VALUE;
-               slock->rsrc.rio   = NULL;
-               slock->riofd      = -1;
+       IoHndPad_T *    iopad = (IoHndPad_T*)rio->ioreg_ctx;
 
-               rio->device_ctx = DevCtxDetach(rio->device_ctx);
-               rio->ioreg_ctx  = slDetachExclusive(slock);
-       }
+       INSIST(hndIOCPLPort && hMainRpcDone);
+       if (iopad)
+               iocpl_notify(iopad, OnSerialDetach, _get_osfhandle(rio->fd));
 }
 
 /*
@@ -1393,7 +1497,7 @@ QueueSocketRecv(
        buff->fd              = lpo->io.sfd;
        buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
        buff->receiver        = receive;
-       buff->dstadr          = lpo->slock->rsrc.ept;
+       buff->dstadr          = lpo->iopad->rsrc.ept;
 
        wsabuf.buf = (char *)buff->recv_buffer;
        wsabuf.len = sizeof(buff->recv_buffer);
@@ -1414,15 +1518,16 @@ OnSocketRecv(
        static const char * const msg =
                "OnSocketRecv: receive from socket failed";
 
-       recvbuf_t *     buff    = NULL;
-       SharedLock_t *  slock   = NULL;
-       BOOL epOK = TRUE;
-       int retCode = PKT_OK;
+       recvbuf_t *     buff    = NULL;
+       IoHndPad_T *    iopad   = NULL;
+       endpt *         ep      = NULL;
+       int             rc;
 
-       /* Make sure this endpoint is not closed. */
-       endpt * ep = getEndptFromIoCtx(lpo, key);
-       retCode = socketErrorCheck(lpo, msg);
+       /* order is important -- check first, then get endpoint! */
+       rc = socketErrorCheck(lpo, msg);
+       ep = getEndptFromIoCtx(lpo, key);
 
+       /* Make sure this endpoint is not closed. */
        if (ep == NULL)
                return;
 
@@ -1430,10 +1535,10 @@ OnSocketRecv(
         * Since we must not use the context object once it is in
         * another IO, we go through some pains to read everything
         * before going out for another read request.
-        * We also need an extra hold to the SLOCK structure.
+        * We also need an extra hold to the IOPAD structure.
         */
-       slock = slAttach(lpo->slock);
-       if (retCode == PKT_OK && lpo->byteCount > 0) {
+       iopad = iohpAttach(lpo->iopad);
+       if (rc == PKT_OK && lpo->byteCount > 0) {
                /* keep input buffer, create new one for IO */
                buff              = lpo->recv_buf;
                lpo->recv_buf     = get_free_recv_buffer_alloc();
@@ -1443,8 +1548,8 @@ OnSocketRecv(
 
        } /* Note: else we use the current buffer again */
 
-       if (retCode != PKT_SOCKET_ERROR) {
-               IoCtxStartLocked(lpo, QueueSocketRecv, lpo->recv_buf);
+       if (rc != PKT_SOCKET_ERROR) {
+               IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf);
        }  else {
                freerecvbuf(lpo->recv_buf);
                IoCtxFree(lpo);
@@ -1464,23 +1569,18 @@ OnSocketRecv(
                        (int)buff->fd, stoa(&buff->recv_srcadr),
                        get_packet_mode(buff)));
 
-               if (slAttachShared(slock)) {
-                       if (slEndPointOK(slock)) {
-                               InterlockedIncrement(&ep->received);
-                               slDetachShared(slock);
-                               InterlockedIncrement(&packets_received);
-                               InterlockedIncrement(&handler_pkts);
-                       } else {
-                               slDetachShared(slock);
-                       }
+               if (iohpEndPointOK(iopad)) {
+                       InterlockedIncrement(&ep->received);
+                       InterlockedIncrement(&packets_received);
+                       InterlockedIncrement(&handler_pkts);
                }
 
                DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s, state = %s\n",
                        buff->recv_length, (int)buff->fd, buff,
-                       stoa(&buff->recv_srcadr), epOK? "Accepted" : "Ignored"));
-               slQueueLocked(slock, slEndPointOK, buff);
+                       stoa(&buff->recv_srcadr), st_packet_handling[rc]));
+               iohpQueueLocked(iopad, iohpEndPointOK, buff);
        }
-       slDetach(slock);
+       iohpDetach(iopad);
 }
 
 /* ----------------------------------------------------------------- */
@@ -1494,26 +1594,23 @@ OnSocketSend(
        static const char * const msg =
                "OnSocketSend: send to socket failed";
 
-       SharedLock_t *  slock = NULL;
-       endpt *         ep = getEndptFromIoCtx(lpo, key);
-       int             retCode = socketErrorCheck(lpo, msg);
+       IoHndPad_T *    iopad   = NULL;
+       endpt *         ep      = NULL;
+       int             rc;
+
+       /* order is important -- check first, then get endpoint! */
+       rc = socketErrorCheck(lpo, msg);
+       ep = getEndptFromIoCtx(lpo, key);
+
        /* Make sure this endpoint is not closed. */
        if (ep == NULL)
                return;
 
-       if (retCode != PKT_OK) {
-               slock = slAttachShared(lpo->slock);
-               if (slock) {
-                       if (slEndPointOK(slock)) {
-                               InterlockedIncrement(&ep->notsent);
-                               InterlockedDecrement(&ep->sent);
-                               slDetachShared(slock);
-                               InterlockedIncrement(&packets_notsent);
-                               InterlockedDecrement(&packets_sent);
-                       } else {
-                               slDetachShared(slock);
-                       }
-               }
+       if (rc != PKT_OK) {
+               InterlockedIncrement(&ep->notsent);
+               InterlockedDecrement(&ep->sent);
+               InterlockedIncrement(&packets_notsent);
+               InterlockedDecrement(&packets_sent);
        }
        IoCtxRelease(lpo);
 }
@@ -1522,6 +1619,23 @@ OnSocketSend(
  * register and de-register interface endpoints with the IO engine
  * --------------------------------------------------------------------
  */
+static void
+OnInterfaceDetach(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+       )
+{
+       IoHndPad_T *    iopad = lpo->iopad;
+
+       INSIST(NULL != iopad);
+       iopad->handles[0] = INVALID_HANDLE_VALUE;
+       iopad->handles[1] = INVALID_HANDLE_VALUE;
+       iopad->rsrc.ept = NULL;
+
+       SetEvent(lpo->ppswake);
+}
+
+/* ----------------------------------------------------------------- */
 BOOL
 io_completion_port_add_interface(
        endpt * ep
@@ -1530,7 +1644,7 @@ io_completion_port_add_interface(
        /* Registering an endpoint is simple: allocate a shared lock for
         * the enpoint and return if the allocation was successful.
         */
-       ep->ioreg_ctx = slCreate(ep);
+       ep->ioreg_ctx = iohpCreate(ep);
        return ep->ioreg_ctx != NULL;
 }
 /* ----------------------------------------------------------------- */
@@ -1544,16 +1658,11 @@ io_completion_port_remove_interface(
         * endpoint pointer. Do an additional detach and leave the
         * write lock.
         */
-       SharedLock_t *  slock = slAttachExclusive(ep->ioreg_ctx);
-       if (slock != NULL) {
-               slDetach(slock);
+       IoHndPad_T *    iopad = (IoHndPad_T*)ep->ioreg_ctx;
 
-               slock->handles[0] = INVALID_HANDLE_VALUE;
-               slock->handles[1] = INVALID_HANDLE_VALUE;
-               slock->rsrc.ept   = NULL;
-
-               ep->ioreg_ctx = slDetachExclusive(slock);
-       }
+       INSIST(hndIOCPLPort && hMainRpcDone);
+       if (iopad)
+               iocpl_notify(iopad, OnInterfaceDetach, -1);
 }
 
 /* --------------------------------------------------------------------
@@ -1561,7 +1670,24 @@ io_completion_port_remove_interface(
  * --------------------------------------------------------------------
  */
 
-/* Add a socket handle to the I/O completion port, and send 
+static void
+OnSocketDetach(
+       ULONG_PTR       key,
+       IoCtx_t *       lpo
+       )
+{
+       IoHndPad_T *    iopad = lpo->iopad;
+
+       INSIST(NULL != iopad);
+       if (iopad->handles[0] == lpo->io.hnd)
+               iopad->handles[0] = INVALID_HANDLE_VALUE;
+       if (iopad->handles[1] == lpo->io.hnd)
+               iopad->handles[1] = INVALID_HANDLE_VALUE;
+
+       SetEvent(lpo->ppswake);
+}
+
+/* Add a socket handle to the I/O completion port, and send
  * NTP_RECVS_PER_SOCKET receive requests to the kernel.
  */
 BOOL
@@ -1580,18 +1706,17 @@ io_completion_port_add_socket(
        IoCtx_t *       lpo;
        size_t          n;
        ULONG_PTR       key;
-       SharedLock_t *  slock = NULL;
+       IoHndPad_T *    iopad = NULL;
 
        key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
 
-       if (NULL == (slock = slAttachExclusive(ep->ioreg_ctx))) {
+       if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) {
                msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting",
                        ep);
                exit(1);
        } else {
-               endpt * rep = slock->rsrc.ept;
-               slock->handles[!!bcast] = (HANDLE)sfd;
-               slDetachExclusive(slock);
+               endpt * rep = iopad->rsrc.ept;
+               iopad->handles[!!bcast] = (HANDLE)sfd;
                INSIST(rep == ep);
        }
 
@@ -1613,7 +1738,7 @@ io_completion_port_add_socket(
        return TRUE;
 
 fail:
-       ep->ioreg_ctx = slDetach(ep->ioreg_ctx);
+       ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx);
        return FALSE;
 }
 /* ----------------------------------------------------------------- */
@@ -1626,15 +1751,11 @@ io_completion_port_remove_socket(
        /* Lock the shared lock for write, then search the given
         * socket handle and replace it with an invalid handle value.
         */
-       SharedLock_t *  lp = slAttachExclusive(ep->ioreg_ctx);
-       HANDLE          sh = (HANDLE)fd;
-       if (lp != NULL) {
-               if (lp->handles[0] == sh)
-                       lp->handles[0] = INVALID_HANDLE_VALUE;
-               else if (lp->handles[1] == sh)
-                       lp->handles[1] = INVALID_HANDLE_VALUE;
-               slDetachExclusive(lp);
-       }
+       IoHndPad_T *    iopad = (IoHndPad_T*)ep->ioreg_ctx;
+
+       INSIST(hndIOCPLPort && hMainRpcDone);
+       if (iopad)
+               iocpl_notify(iopad, OnSocketDetach, fd);
 }
 
 
@@ -1738,7 +1859,8 @@ GetReceivedBuffers(void)
                        timer();
                        break;
 
-               case WAIT_IO_COMPLETION: /* loop */
+               case WAIT_IO_COMPLETION: /* there might be something after APC */
+                       have_packet = !!full_recvbuffs();
                        break;
 
                case WAIT_TIMEOUT:
index 538217a7b826dc3313c04797350da14ba513d3c2..0e8bd7b56ab2a89a4bce85d02e3f3a718bec3a47 100644 (file)
  * lock is NOT aquired, and all IO handles or FDs are set to an
  * invalid value.
  */
-SharedLock_t*  __fastcall
-slCreate(
+IoHndPad_T*  __fastcall
+iohpCreate(
        void *  src
        )
 {
-       SharedLock_t* retv;
+       IoHndPad_T* retv;
 
-       retv = IOCPLPoolAlloc(sizeof(SharedLock_t), "Lock");
+       retv = IOCPLPoolAlloc(sizeof(IoHndPad_T), "Lock");
        if (retv != NULL) {
-               InitializeCriticalSection(retv->mutex);
                retv->refc_count = 1;
                retv->rsrc.any   = src;
                retv->handles[0] = INVALID_HANDLE_VALUE;
@@ -52,9 +51,9 @@ slCreate(
  * Attach to a lock. This just increments the use count, but does not
  * aquire the internal lock. Return a pointer to the lock.
  */
-SharedLock_t*  __fastcall
-slAttach(
-       SharedLock_t *  lp
+IoHndPad_T*  __fastcall
+iohpAttach(
+       IoHndPad_T *    lp
        )
 {
        if (lp != NULL)
@@ -70,84 +69,25 @@ slAttach(
  *
  * THE CALLER MUST NOT OWN THE INTERNAL LOCK WHEN DOING THIS!
  */
-SharedLock_t*  __fastcall
-slDetach(
-       SharedLock_t *  lp
+IoHndPad_T*  __fastcall
+iohpDetach(
+       IoHndPad_T *    lp
        )
 {
        if (lp != NULL && !InterlockedDecrement(&lp->refc_count)) {
-               DeleteCriticalSection(lp->mutex);
-               memset(lp, 0xFF, sizeof(SharedLock_t));
+               memset(lp, 0xFF, sizeof(IoHndPad_T));
                IOCPLPoolFree(lp, "Lock");
        }
        return NULL;
 }
 
-/* --------------------------------------------------------------------
- * Attach and aquire the lock for READ access. (This might block)
- */
-SharedLock_t*  __fastcall
-slAttachShared(
-       SharedLock_t *  lp
-       )
-{
-       if (NULL != (lp = slAttach(lp)))
-               EnterCriticalSection(lp->mutex);
-       return lp;
-}
-
-/* --------------------------------------------------------------------
- * Release the READ lock and detach from shared lock.
- * Alwys returns NULL.
- *
- * THE CALLER MUST OWN THE READ LOCK WHEN DOING THIS.
- */
-SharedLock_t*  __fastcall
-slDetachShared(
-       SharedLock_t *  lp
-       )
-{
-       if (lp != NULL)
-               LeaveCriticalSection(lp->mutex);
-       return slDetach(lp);
-}
-
-/* --------------------------------------------------------------------
- * Attach and aquire the lock for WRITE access. (This might block)
- */
-SharedLock_t*  __fastcall
-slAttachExclusive(
-       SharedLock_t *  lp
-)
-{
-       if (NULL != (lp = slAttach(lp)))
-               EnterCriticalSection(lp->mutex);
-       return lp;
-}
-
-/* --------------------------------------------------------------------
- * Release the WRITE lock and detach from shared lock.
- * Alwys returns NULL.
- *
- * THE CALLER MUST OWN THE WRITE LOCK WHEN DOING THIS.
- */
-SharedLock_t*  __fastcall
-slDetachExclusive(
-       SharedLock_t *  lp
-       )
-{
-       if (lp != NULL)
-               LeaveCriticalSection(lp->mutex);
-       return slDetach(lp);
-}
-
 /* --------------------------------------------------------------------
  * Predicate function: Is there an attached RIO, and is the RIO in
  * active state?
  */
 BOOL __fastcall
-slRefClockOK(
-       const SharedLock_t *    lp
+iohpRefClockOK(
+       const IoHndPad_T *      lp
        )
 {
        return  lp->rsrc.rio && lp->rsrc.rio->active;
@@ -158,8 +98,8 @@ slRefClockOK(
  * interface accepting packets?
  */
 BOOL __fastcall
-slEndPointOK(
-const SharedLock_t *   lp
+iohpEndPointOK(
+const IoHndPad_T *     lp
 )
 {
        return  lp->rsrc.ept && !lp->rsrc.ept->ignore_packets;
@@ -175,18 +115,17 @@ const SharedLock_t *      lp
  * independent of the function result!
  */
 BOOL
-slQueueLocked(
-       SharedLock_t *  lp,
-       LockPredicateT  pred,
+iohpQueueLocked(
+       CIoHndPad_T *   lp,
+       IoPreCheck_T    pred,
        recvbuf_t *     buf
        )
 {
        BOOL    done = FALSE;
-       if (slAttachShared(lp)) {
+       if (lp) {
                done = (*pred)(lp);
                if (done)
                        add_full_recv_buffer(buf);
-               slDetachShared(lp);
        }
        if (done)
                SetEvent(WaitableIoEventHandle);
@@ -253,7 +192,7 @@ DevCtxDetach(
  */
 IoCtx_t * __fastcall
 IoCtxAlloc(
-       SharedLock_t *  lock,
+       IoHndPad_T *    lock,
        DevCtx_t *      devCtx
        )
 {
@@ -261,7 +200,7 @@ IoCtxAlloc(
 
        ctx = (IoCtx_t *)IOCPLPoolAlloc(sizeof(IoCtx_t), "IO ctx");
        if (ctx != NULL) {
-               ctx->slock = slAttach(lock);
+               ctx->iopad = iohpAttach(lock);
                ctx->devCtx = DevCtxAttach(devCtx);
        }
        return ctx;
@@ -280,7 +219,7 @@ IoCtxFree(
        )
 {
        if (ctx) {
-               ctx->slock  = slDetach(ctx->slock);
+               ctx->iopad  = iohpDetach(ctx->iopad);
                ctx->devCtx = DevCtxDetach(ctx->devCtx);
                IOCPLPoolFree(ctx, "IO ctx");
        }
@@ -322,15 +261,13 @@ IoCtxAlive(
        )
 {
        return ctx                      &&
-               ctx->slock              &&
-               ctx->slock->rsrc.any;
+               ctx->iopad              &&
+               ctx->iopad->rsrc.any;
 }
 
 /* --------------------------------------------------------------------
  * Start an IO operation on a given context object with a specified
  * function and buffer.
- * This locks the shared lock on the context, checks for the lock
- * being active, and only then runs the starter function.
  *
  * Returns TRUE if the starter was executed successfully, FALSE in
  * all other cases.
@@ -339,22 +276,21 @@ IoCtxAlive(
  * call IN ANY CASE, independent of the function result!
  */
 BOOL
-IoCtxStartLocked(
+IoCtxStartChecked(
        IoCtx_t *       lpo,
        IoCtxStarterT   func,
        recvbuf_t *     buf
        )
 {
-       BOOL            done = FALSE;
-       SharedLock_t *  slock = slAttachShared(lpo->slock);
-       if (slock != NULL) {
-               if ((lpo->io.hnd == slock->handles[0]) ||
-                   (lpo->io.hnd == slock->handles[1])  )
+       BOOL            done  = FALSE;
+       IoHndPad_T *    iopad = lpo->iopad;
+       if (iopad != NULL) {
+               if ((lpo->io.hnd == iopad->handles[0]) ||
+                   (lpo->io.hnd == iopad->handles[1])  )
                {
                        done = (func)(lpo, buf);
-                       lpo = NULL; /* consumed by 'func' */
+                       lpo  = NULL; /* consumed by 'func' */
                }
-               slDetachShared(slock);
        }
        if (lpo != NULL) {
                freerecvbuf(buf);