From: Juergen Perlinger Date: Mon, 24 Oct 2016 05:37:25 +0000 (+0200) Subject: [winio2 - unlocked] X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bd5116cbcb3d7d46f36e4d548240b8d3ad392d0f;p=thirdparty%2Fntp.git [winio2 - unlocked] - the great lock removal - the great renaming bk: 580d9e150pFykOW-C6BG5aDl6QyLtQ --- diff --git a/ChangeLog b/ChangeLog index ddeb2a506..434ee7748 100644 --- a/ChangeLog +++ b/ChangeLog @@ -50,6 +50,7 @@ * Shim X509_get_signature_nid() if needed. * git author attribution cleanup * bk ignore file cleanup +* remove locks in Wndows IO, use rpc-like thread synchronisation instead --- (4.2.8p8) 2016/06/02 Released by Harlan Stenn diff --git a/ntpd/ntp_refclock.c b/ntpd/ntp_refclock.c index 5a6f2df74..bc389012e 100644 --- a/ntpd/ntp_refclock.c +++ b/ntpd/ntp_refclock.c @@ -710,7 +710,7 @@ process_refclock_packet( if (rio->io_input == NULL || (*rio->io_input)(rb) != 0) { rio->recvcount++; packets_received++; - handler_pkts++; + handler_pkts++; (*rio->clock_recv)(rb); } } diff --git a/ports/winnt/include/ntp_iocpltypes.h b/ports/winnt/include/ntp_iocpltypes.h index c2d40acbe..b3e320b4d 100644 --- a/ports/winnt/include/ntp_iocpltypes.h +++ b/ports/winnt/include/ntp_iocpltypes.h @@ -22,41 +22,38 @@ typedef struct interface endpt; typedef struct recvbuf recvbuf_t; /* --------------------------------------------------------------------- - * shared lock to avoid concurrent deletes on IO related stuff like - * RIO or ENDPOINT blocks. - * - * Basically we wwould need a multiple-reader/single-writer lock, - * but for now we do full mutual exclusion. + * shared control structure for IO. Removal of communication handles + * or other detach-like operations must be done exclusively by the IO + * thread, or Bad Things (tm) are bound to happen! */ -typedef struct SharedLock SharedLock_t; -typedef const struct SharedLock CSharedLock_t; -struct SharedLock { - CRITICAL_SECTION mutex[1]; +typedef struct IoHndPad IoHndPad_T; +typedef const struct IoHndPad CIoHndPad_T; +struct IoHndPad { volatile u_long refc_count; union { RIO_t * rio; /* RIO back-link (for offload) */ - endpt * ept; /* inetrface backlink */ + endpt * ept; /* interface backlink */ ULONG_PTR key; /* as key for IOCPL queue */ void * any; } rsrc; /* registered source */ HANDLE handles[2]; /* 0->COM/SOCK 1->BCASTSOCK */ + + /* COMPORT specific stuff */ int riofd; /* FD for comports */ + unsigned int flDropEmpty : 1; /* no empty line*/ + unsigned int flFirstSeen : 1; }; -typedef BOOL(__fastcall * LockPredicateT)(CSharedLock_t*); +typedef BOOL(__fastcall * IoPreCheck_T)(CIoHndPad_T*); -extern SharedLock_t* __fastcall slCreate(void * rsrc); -extern SharedLock_t* __fastcall slAttach(SharedLock_t*); -extern SharedLock_t* __fastcall slDetach(SharedLock_t*); -extern SharedLock_t* __fastcall slAttachShared(SharedLock_t*); -extern SharedLock_t* __fastcall slDetachShared(SharedLock_t*); -extern SharedLock_t* __fastcall slAttachExclusive(SharedLock_t*); -extern SharedLock_t* __fastcall slDetachExclusive(SharedLock_t*); +extern IoHndPad_T* __fastcall iohpCreate(void * rsrc); +extern IoHndPad_T* __fastcall iohpAttach(IoHndPad_T*); +extern IoHndPad_T* __fastcall iohpDetach(IoHndPad_T*); -extern BOOL __fastcall slRefClockOK(CSharedLock_t*); -extern BOOL __fastcall slEndPointOK(CSharedLock_t*); +extern BOOL __fastcall iohpRefClockOK(CIoHndPad_T*); +extern BOOL __fastcall iohpEndPointOK(CIoHndPad_T*); -extern BOOL slQueueLocked(SharedLock_t*, LockPredicateT, recvbuf_t*); +extern BOOL iohpQueueLocked(CIoHndPad_T*, IoPreCheck_T, recvbuf_t*); /* --------------------------------------------------------------------- @@ -131,7 +128,7 @@ struct IoCtx { SOCKET sfd; /* socket descriptor */ } io; /* the IO resource used */ IoCompleteFunc onIoDone; /* HL callback to execute */ - SharedLock_t * slock; + IoHndPad_T * iopad; DevCtx_t * devCtx; DWORD errCode; /* error code of last I/O */ DWORD byteCount; /* byte count " */ @@ -152,10 +149,10 @@ struct IoCtx { typedef BOOL (__fastcall *IoCtxStarterT)(IoCtx_t*, recvbuf_t*); -extern IoCtx_t* __fastcall IoCtxAlloc(SharedLock_t*, DevCtx_t*); +extern IoCtx_t* __fastcall IoCtxAlloc(IoHndPad_T*, DevCtx_t*); extern void __fastcall IoCtxFree(IoCtx_t*); extern void __fastcall IoCtxRelease(IoCtx_t*); -extern BOOL IoCtxStartLocked(IoCtx_t*, IoCtxStarterT, recvbuf_t*); +extern BOOL IoCtxStartChecked(IoCtx_t*, IoCtxStarterT, recvbuf_t*); #endif /*!defined(NTP_IOCPLTYPES_H)*/ diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c index 90bdd99e1..251ba6a43 100644 --- a/ports/winnt/ntpd/ntp_iocompletionport.c +++ b/ports/winnt/ntpd/ntp_iocompletionport.c @@ -77,6 +77,12 @@ enum io_packet_handling { PKT_SOCKET_ERROR }; +static const char * const st_packet_handling[3] = { + "accepted", + "dropped" + "error" +}; + /* * local function definitions */ @@ -112,6 +118,7 @@ static void free_io_completion_port_mem(void); HANDLE WaitableIoEventHandle; static HANDLE hndIOCPLPort; static HANDLE hMainThread; +static HANDLE hMainRpcDone; static BOOL DoPPShack; DWORD ActiveWaitHandles; @@ -262,6 +269,11 @@ init_io_completion_port(void) msyslog(LOG_ERR, "Can't create exit event handle: %m"); exit(1); } + hMainRpcDone = CreateEvent(NULL, FALSE, FALSE, NULL); + if (hMainRpcDone == NULL) { + msyslog(LOG_ERR, "Can't create RPC sync handle: %m"); + exit(1); + } /* Create the IO completion port */ hndIOCPLPort = CreateIoCompletionPort( @@ -343,6 +355,8 @@ uninit_io_completion_port( hIoCompletionThread = NULL; CloseHandle(hndIOCPLPort); hndIOCPLPort = NULL; + CloseHandle(hMainRpcDone); + hMainRpcDone = NULL; } @@ -410,6 +424,24 @@ free_io_completion_port_mem(void) } #endif /* DEBUG */ +void +iocpl_notify( + IoHndPad_T * iopad, + void (*pfunc)(ULONG_PTR, IoCtx_t *), + UINT_PTR fdn + ) +{ + IoCtx_t xf; + + memset(&xf, 0, sizeof(xf)); + xf.iopad = iopad; + xf.ppswake = hMainRpcDone; + xf.onIoDone = pfunc; + xf.io.sfd = fdn; + PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &xf.ol); + WaitForSingleObject(xf.ppswake, INFINITE); +} + /* * ------------------------------------------------------------------- * APC callback for scheduling interface scans. @@ -526,15 +558,14 @@ getRioFromIoCtx( * lock, the check for errors. If the error indicates the * operation was cancelled, let the operation fail silently. */ - RIO_t * rio = NULL; - SharedLock_t * slock = slAttachShared(ctx->slock); - if (NULL != slock) { - rio = slock->rsrc.rio; - if (key != slock->rsrc.key) + RIO_t * rio = NULL; + IoHndPad_T * iopad = ctx->iopad; + if (NULL != iopad) { + rio = iopad->rsrc.rio; + if (key != iopad->rsrc.key) rio = NULL; - else if (ctx->io.hnd != slock->handles[0]) + else if (ctx->io.hnd != iopad->handles[0]) rio = NULL; - slDetachShared(slock); } if (rio != NULL) switch (ctx->errCode) { /* When we got cancelled, don't spill messages */ @@ -577,14 +608,13 @@ getEndptFromIoCtx( * it out in both the input key and the registered source. */ endpt * ep = NULL; - SharedLock_t * slock = slAttachShared(ctx->slock); - if (slock != NULL) { - ep = slock->rsrc.ept; - if ((key >> 1) != (slock->rsrc.key >> 1)) + IoHndPad_T * iopad = ctx->iopad; + if (iopad != NULL) { + ep = iopad->rsrc.ept; + if ((key >> 1) != (iopad->rsrc.key >> 1)) ep = NULL; - else if (ctx->io.hnd != slock->handles[key & 1]) + else if (ctx->io.hnd != iopad->handles[key & 1]) ep = NULL; - slDetachShared(slock); } if (ep == NULL) IoCtxRelease(ctx); @@ -703,7 +733,7 @@ QueueSerialWait( lpo->recv_buf = buff; lpo->flRawMem = 0; - buff->fd = lpo->slock->riofd; + buff->fd = lpo->iopad->riofd; /* keep receive position for continuation of partial lines! */ rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol); return rc || IoResultCheck(GetLastError(), lpo, msg); @@ -732,7 +762,7 @@ OnSerialWaitComplete( /* start next IO and leave if we hit an error */ if (lpo->errCode != ERROR_SUCCESS) { memset(&lpo->aux, 0, sizeof(lpo->aux)); - IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf); + IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); return; } @@ -761,13 +791,13 @@ OnSerialWaitComplete( dev->pps_data.cc_assert++; dev->pps_data.ts_assert = lpo->aux.RecvTime; DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n", - lpo->slock->rsrc.rio->fd, + lpo->iopad->rsrc.rio->fd, ulfptoa(&lpo->aux.RecvTime, 6))); } else { dev->pps_data.cc_clear++; dev->pps_data.ts_clear = lpo->aux.RecvTime; DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n", - lpo->slock->rsrc.rio->fd, + lpo->iopad->rsrc.rio->fd, ulfptoa(&lpo->aux.RecvTime, 6))); } /* Update PPS buffer, writing from low to high, with index @@ -797,7 +827,7 @@ OnSerialWaitComplete( lpo->aux.DCDSTime = lpo->aux.RecvTime; lpo->aux.flTsDCDS = 1; DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n", - lpo->slock->rsrc.rio->fd, + lpo->iopad->rsrc.rio->fd, ulfptoa(&lpo->aux.RecvTime, 6))); } } @@ -806,13 +836,13 @@ OnSerialWaitComplete( if (EV_RXFLAG & lpo->aux.com_events) { /* line discipline */ lpo->aux.FlagTime = lpo->aux.RecvTime; lpo->aux.flTsFlag = 1; - IoCtxStartLocked(lpo, QueueSerialRead, lpo->recv_buf); + IoCtxStartChecked(lpo, QueueSerialRead, lpo->recv_buf); } else if (EV_RXCHAR & lpo->aux.com_events) { /* raw discipline */ lpo->aux.FlagTime = lpo->aux.RecvTime; lpo->aux.flTsFlag = 1; - IoCtxStartLocked(lpo, QueueRawSerialRead, lpo->recv_buf); + IoCtxStartChecked(lpo, QueueRawSerialRead, lpo->recv_buf); } else { /* idle... */ - IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf); + IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); } } @@ -839,10 +869,10 @@ QueueSerialReadCommon( lpo->flRawMem = 0; /* 'buff->recv_length' must be set already! */ - buff->fd = lpo->slock->riofd; + buff->fd = lpo->iopad->riofd; buff->dstadr = NULL; buff->receiver = process_refclock_packet; - buff->recv_peer = lpo->slock->rsrc.rio->srcclock; + buff->recv_peer = lpo->iopad->rsrc.rio->srcclock; rc = ReadFile(lpo->io.hnd, (char*)buff->recv_buffer + buff->recv_length, @@ -911,7 +941,7 @@ OnSerialReadComplete( wait_again: /* make sure the read is issued again */ memset(&lpo->aux, 0, sizeof(lpo->aux)); - IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf); + IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); } @@ -926,122 +956,180 @@ wait_again: * the IO completion thread can resume faster. * * !!ATTENTION!! - * This function runs on an arbitrary worker thread, and not under the - * protection of the shared lock! Accessing the RIO structure must set - * the lock explicitely! + * This function runs on an arbitrary worker thread. The resource + * management with regard to IO is synchronised only between the main + * thread and the IO worker thread, so decisions about queueing and + * starting new IO must be made by either of them. + * + * Since the IO thread sticks in the IOCPL queue and is not alertable, + * we could either use the APC queue to the main thread or the IOCPL + * queue to the IO thread. + * + * We separate the effort -- filtering based on the RIO state is done + * by the main thread, restarting the IO by the IO thread to reduce + * delays. + */ + +/* ------------------------------------------------------------------- + * IOCPL deferred bouncer -- start a new serial wait from IOCPL thread + */ +static void +OnDeferredStartWait( + ULONG_PTR key, + IoCtx_t * lpo +) +{ + IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf); +} + +/* ------------------------------------------------------------------- + * APC deferred bouncer -- put buffer to receive queueor eventually + * discard it if source is already disabled. Runs in the context + * of the main thread exclusively. + */ +static void WINAPI +OnEnqueAPC( + ULONG_PTR arg +) +{ + recvbuf_t * buff = (recvbuf_t*)arg; + IoHndPad_T * iopad = (IoHndPad_T*)buff->recv_peer; + RIO_t * rio = iopad->rsrc.rio; + + /* Down below we make a nasty hack to transport the iopad + * pointer in the buffer so we can avoid another temporary + * allocation. We must undo this here. + */ + if (NULL != rio) { + /* OK, refclock still attached */ + buff->recv_peer = rio->srcclock; + if (iohpQueueLocked(iopad, iohpRefClockOK, buff)) + ++rio->srcclock->received; + } else { + /* refclock detached while in flight... */ + freerecvbuf(buff); + } + iohpDetach(iopad); /* one unit owned by this callback! */ +} + +/* ------------------------------------------------------------------- + * worker pool thread worker doing the string processing */ static DWORD WINAPI OnSerialReadWorker( void * ctx ) { - IoCtx_t * lpo; - SharedLock_t * slock; - recvbuf_t * buff, *obuf; + IoCtx_t * lpo = (IoCtx_t*)ctx; + IoHndPad_T * iop = lpo->iopad; + recvbuf_t * buff = lpo->recv_buf; + recvbuf_t * obuf = NULL; char *sptr, *send, *dptr; BOOL eol; - char ch; - BOOL active; - u_long rcvcnt; - RIO_t * rio; - - /* Get context back */ - lpo = (IoCtx_t*)ctx; - buff = lpo->recv_buf; - - /* query the lock structure under mutual exclusion */ - active = FALSE; - rcvcnt = 0; - if (NULL != (slock = slAttachShared(lpo->slock))) { - if (NULL != (rio = slock->rsrc.rio)) { - active = TRUE; - rcvcnt = InterlockedIncrement(&rio->recvcount) - 1; - } - slDetachShared(slock); - } - - /* bail out if we're disconnected now */ - if (!active) { - IoCtxRelease(ctx); - return 0; - } + int ch; - /* Ignore zero-byte reads due to closure on fd. - * Eat the first line of input as it's possibly partial. + /* We should never gat a zero-byte read here. If we do, nothing + * really bad happens, just a useless rescan of data we have + * already processed. But somethings not quite right in logic + * and we croak loudly in debug builds. */ - if (lpo->byteCount && rcvcnt) { - /* account for additional input */ - buff->recv_length += (int)lpo->byteCount; - - /* - * Now mimic the Unix line discipline. - */ - sptr = (char *)buff->recv_buffer; - send = sptr + buff->recv_length; - obuf = NULL; - dptr = NULL; - - /* hack #1: eat away leading CR/LF if there is any */ - while (sptr != send) { - ch = *sptr; - if (ch != '\n' && ch != '\r') - break; - sptr++; - } + DEBUG_INSIST(lpo->byteCount > 0); - while (sptr != send) { - /* get new buffer to store line */ - obuf = get_free_recv_buffer_alloc(); - obuf->fd = buff->fd; - obuf->receiver = buff->receiver; - obuf->dstadr = NULL; - obuf->recv_peer = buff->recv_peer; - set_serial_recv_time(obuf, lpo); - - /* Copy data to new buffer, convert CR to LF on - * the fly. Stop after either. - */ - dptr = (char *)obuf->recv_buffer; - eol = FALSE; - while (sptr != send && !eol) { - ch = *sptr++; - if ('\r' == ch) - ch = '\n'; - *dptr++ = ch; - eol = ('\n' == ch); - } - obuf->recv_length = - (int)(dptr - (char *)obuf->recv_buffer); + /* Account for additional input and then mimic the UNIX line + * discipline. This is an implict state machine -- the + * implementation is very low-level to gather speed. + */ + buff->recv_length += (int)lpo->byteCount; + sptr = (char *)buff->recv_buffer; + send = sptr + buff->recv_length; + if (sptr == send) + goto st_read_fresh; + +st_new_obuf: + /* Get new receive buffer to store the line. */ + obuf = get_free_recv_buffer_alloc(); + obuf->fd = buff->fd; + obuf->receiver = buff->receiver; + obuf->dstadr = NULL; + obuf->recv_peer = buff->recv_peer; + set_serial_recv_time(obuf, lpo); + +st_copy_start: + /* Copy data to new buffer, convert CR to LF on the fly. + * Stop after either. + */ + dptr = (char *)obuf->recv_buffer; + do { + ch = *sptr++; + if ('\r' == ch) + ch = '\n'; + *dptr++ = ch; + eol = ('\n' == ch); + } while (!(eol || sptr == send)); + obuf->recv_length = (int)(dptr - (char *)obuf->recv_buffer); + + /* If we're not at EOL, we need more data to continue the line. + * But this can only be done if there's more room in the buffer; + * if we have already reached the maximum size, treat the whole + * buffer as part of a mega-line and pass it on. + */ + if (!eol) { + if (obuf->recv_length < sizeof(obuf->recv_buffer)) + goto st_read_more; + else + goto st_pass_buffer; + } - /* If NL found, push this buffer and prepare to - * get a new one. Be prepared for concurrent - * removal of the clock... - */ - if (eol) { - slQueueLocked(lpo->slock, slRefClockOK, obuf); - obuf = NULL; /* consumed in any case */ - } - } + /* if we should drop empty lines, do it here. */ + if (obuf->recv_length < 2 && iop->flDropEmpty) { + obuf->recv_length = 0; + if (sptr != send) + goto st_copy_start; + else + goto st_read_more; + } - /* If we still have an output buffer, prepare it to be - * used for added input from the ComPort. Otherwise - * use the current input buffer again. - */ - if (obuf) { - obuf->recv_length = - (int)(dptr - (char *)obuf->recv_buffer); - freerecvbuf(buff); - buff = obuf; - } else { - /* clear the current buffer, continue */ - buff->recv_length = 0; - } - } else { - buff->recv_length = 0; + if ( ! iop->flFirstSeen) { + iop->flFirstSeen = 1; + obuf->recv_length = 0; + if (sptr != send) + goto st_copy_start; + else + goto st_read_more; } - /* start next round -- must hold the lock during that! */ - IoCtxStartLocked(lpo, QueueSerialWait, buff); +st_pass_buffer: + /* if we arrive here, we can spin off another text line to the + * receive queue. We use a hack to supplant the RIO pointer in + * the receive buffer with the IOPAD to save us a temporary + * workspace allocation. Note the callback owns one refcount + * unit to keep the IOPAD alive! Also checking that the RIO in + * the IOPAD matches the RIO in the buffer is dangerous: That + * pointer is manipulated by the other threads! + */ + obuf->recv_peer = (struct peer*)iohpAttach(lpo->iopad); + QueueUserAPC(OnEnqueAPC, hMainThread, (ULONG_PTR)obuf); + if (sptr != send) + goto st_new_obuf; + buff->recv_length = 0; + goto st_read_fresh; + +st_read_more: + /* read more data into current OBUF, which is valid and will + * replace BUFF. + */ + lpo->recv_buf = obuf; + freerecvbuf(buff); + +st_read_fresh: + /* Start next round. This is deferred to the IOCPL thread, as + * read access to the IOPAD is unsafe from a worker thread + * for anything but the flags. If the IOCPL handle is gone, + * just mop up the pieces. + */ + lpo->onIoDone = OnDeferredStartWait; + if (!(hndIOCPLPort && PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &lpo->ol))) + IoCtxRelease(lpo); return 0; } @@ -1091,10 +1179,10 @@ OnRawSerialReadComplete( if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) { buff->recv_length = (int)lpo->byteCount; set_serial_recv_time(buff, lpo); - slQueueLocked(lpo->slock, slRefClockOK, buff); + iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff); buff = get_free_recv_buffer_alloc(); } - IoCtxStartLocked(lpo, QueueSerialWait, buff); + IoCtxStartChecked(lpo, QueueSerialWait, buff); } @@ -1290,7 +1378,7 @@ io_completion_port_add_clock_io( IoCtx_t * lpo; HANDLE h; - SharedLock_t * slock = NULL; + IoHndPad_T * iopad = NULL; /* preset to clear state for error cleanup:*/ rio->ioreg_ctx = NULL; @@ -1304,14 +1392,14 @@ io_completion_port_add_clock_io( } ; - if ( ! (rio->ioreg_ctx = slock = slCreate(rio))) { + if ( ! (rio->ioreg_ctx = iopad = iohpCreate(rio))) { msyslog(LOG_ERR, "%s: Failed to create shared lock", msgh); goto fail; } - slock->handles[0] = h; - slock->riofd = rio->fd; - slock->rsrc.rio = rio; + iopad->handles[0] = h; + iopad->riofd = rio->fd; + iopad->rsrc.rio = rio; if (!(rio->device_ctx = DevCtxAttach(serial_devctx(h)))) { msyslog(LOG_ERR, "%s: Failed to allocate device context", @@ -1319,7 +1407,7 @@ io_completion_port_add_clock_io( goto fail; } - if ( ! (lpo = IoCtxAlloc(slock, rio->device_ctx))) { + if ( ! (lpo = IoCtxAlloc(iopad, rio->device_ctx))) { msyslog(LOG_ERR, "%: Failed to allocate IO context", msgh); goto fail; @@ -1335,29 +1423,45 @@ io_completion_port_add_clock_io( return QueueSerialWait(lpo, get_free_recv_buffer_alloc()); fail: - rio->ioreg_ctx = slDetach(rio->ioreg_ctx); + rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx); rio->device_ctx = DevCtxDetach(rio->device_ctx); return FALSE; } /* ----------------------------------------------------------------- */ +static void +OnSerialDetach( + ULONG_PTR key, + IoCtx_t * lpo +) +{ + /* Make sure the key matches the context info in the shared + * lock, the check for errors. If the error indicates the + * operation was cancelled, let the operation fail silently. + */ + IoHndPad_T * iopad = lpo->iopad; + + INSIST(NULL != iopad); + if (iopad->handles[0] == lpo->io.hnd) { + iopad->handles[0] = INVALID_HANDLE_VALUE; + iopad->handles[1] = INVALID_HANDLE_VALUE; + iopad->rsrc.rio = NULL; + iopad->riofd = -1; + } + SetEvent(lpo->ppswake); +} + + void io_completion_port_remove_clock_io( RIO_t *rio ) { - SharedLock_t * slock = NULL; - if (rio && NULL != (slock = slAttachExclusive(rio->ioreg_ctx))) { - slDetach(slock); - - slock->handles[0] = INVALID_HANDLE_VALUE; - slock->handles[1] = INVALID_HANDLE_VALUE; - slock->rsrc.rio = NULL; - slock->riofd = -1; + IoHndPad_T * iopad = (IoHndPad_T*)rio->ioreg_ctx; - rio->device_ctx = DevCtxDetach(rio->device_ctx); - rio->ioreg_ctx = slDetachExclusive(slock); - } + INSIST(hndIOCPLPort && hMainRpcDone); + if (iopad) + iocpl_notify(iopad, OnSerialDetach, _get_osfhandle(rio->fd)); } /* @@ -1393,7 +1497,7 @@ QueueSocketRecv( buff->fd = lpo->io.sfd; buff->recv_srcadr_len = sizeof(buff->recv_srcadr); buff->receiver = receive; - buff->dstadr = lpo->slock->rsrc.ept; + buff->dstadr = lpo->iopad->rsrc.ept; wsabuf.buf = (char *)buff->recv_buffer; wsabuf.len = sizeof(buff->recv_buffer); @@ -1414,15 +1518,16 @@ OnSocketRecv( static const char * const msg = "OnSocketRecv: receive from socket failed"; - recvbuf_t * buff = NULL; - SharedLock_t * slock = NULL; - BOOL epOK = TRUE; - int retCode = PKT_OK; + recvbuf_t * buff = NULL; + IoHndPad_T * iopad = NULL; + endpt * ep = NULL; + int rc; - /* Make sure this endpoint is not closed. */ - endpt * ep = getEndptFromIoCtx(lpo, key); - retCode = socketErrorCheck(lpo, msg); + /* order is important -- check first, then get endpoint! */ + rc = socketErrorCheck(lpo, msg); + ep = getEndptFromIoCtx(lpo, key); + /* Make sure this endpoint is not closed. */ if (ep == NULL) return; @@ -1430,10 +1535,10 @@ OnSocketRecv( * Since we must not use the context object once it is in * another IO, we go through some pains to read everything * before going out for another read request. - * We also need an extra hold to the SLOCK structure. + * We also need an extra hold to the IOPAD structure. */ - slock = slAttach(lpo->slock); - if (retCode == PKT_OK && lpo->byteCount > 0) { + iopad = iohpAttach(lpo->iopad); + if (rc == PKT_OK && lpo->byteCount > 0) { /* keep input buffer, create new one for IO */ buff = lpo->recv_buf; lpo->recv_buf = get_free_recv_buffer_alloc(); @@ -1443,8 +1548,8 @@ OnSocketRecv( } /* Note: else we use the current buffer again */ - if (retCode != PKT_SOCKET_ERROR) { - IoCtxStartLocked(lpo, QueueSocketRecv, lpo->recv_buf); + if (rc != PKT_SOCKET_ERROR) { + IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf); } else { freerecvbuf(lpo->recv_buf); IoCtxFree(lpo); @@ -1464,23 +1569,18 @@ OnSocketRecv( (int)buff->fd, stoa(&buff->recv_srcadr), get_packet_mode(buff))); - if (slAttachShared(slock)) { - if (slEndPointOK(slock)) { - InterlockedIncrement(&ep->received); - slDetachShared(slock); - InterlockedIncrement(&packets_received); - InterlockedIncrement(&handler_pkts); - } else { - slDetachShared(slock); - } + if (iohpEndPointOK(iopad)) { + InterlockedIncrement(&ep->received); + InterlockedIncrement(&packets_received); + InterlockedIncrement(&handler_pkts); } DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s, state = %s\n", buff->recv_length, (int)buff->fd, buff, - stoa(&buff->recv_srcadr), epOK? "Accepted" : "Ignored")); - slQueueLocked(slock, slEndPointOK, buff); + stoa(&buff->recv_srcadr), st_packet_handling[rc])); + iohpQueueLocked(iopad, iohpEndPointOK, buff); } - slDetach(slock); + iohpDetach(iopad); } /* ----------------------------------------------------------------- */ @@ -1494,26 +1594,23 @@ OnSocketSend( static const char * const msg = "OnSocketSend: send to socket failed"; - SharedLock_t * slock = NULL; - endpt * ep = getEndptFromIoCtx(lpo, key); - int retCode = socketErrorCheck(lpo, msg); + IoHndPad_T * iopad = NULL; + endpt * ep = NULL; + int rc; + + /* order is important -- check first, then get endpoint! */ + rc = socketErrorCheck(lpo, msg); + ep = getEndptFromIoCtx(lpo, key); + /* Make sure this endpoint is not closed. */ if (ep == NULL) return; - if (retCode != PKT_OK) { - slock = slAttachShared(lpo->slock); - if (slock) { - if (slEndPointOK(slock)) { - InterlockedIncrement(&ep->notsent); - InterlockedDecrement(&ep->sent); - slDetachShared(slock); - InterlockedIncrement(&packets_notsent); - InterlockedDecrement(&packets_sent); - } else { - slDetachShared(slock); - } - } + if (rc != PKT_OK) { + InterlockedIncrement(&ep->notsent); + InterlockedDecrement(&ep->sent); + InterlockedIncrement(&packets_notsent); + InterlockedDecrement(&packets_sent); } IoCtxRelease(lpo); } @@ -1522,6 +1619,23 @@ OnSocketSend( * register and de-register interface endpoints with the IO engine * -------------------------------------------------------------------- */ +static void +OnInterfaceDetach( + ULONG_PTR key, + IoCtx_t * lpo + ) +{ + IoHndPad_T * iopad = lpo->iopad; + + INSIST(NULL != iopad); + iopad->handles[0] = INVALID_HANDLE_VALUE; + iopad->handles[1] = INVALID_HANDLE_VALUE; + iopad->rsrc.ept = NULL; + + SetEvent(lpo->ppswake); +} + +/* ----------------------------------------------------------------- */ BOOL io_completion_port_add_interface( endpt * ep @@ -1530,7 +1644,7 @@ io_completion_port_add_interface( /* Registering an endpoint is simple: allocate a shared lock for * the enpoint and return if the allocation was successful. */ - ep->ioreg_ctx = slCreate(ep); + ep->ioreg_ctx = iohpCreate(ep); return ep->ioreg_ctx != NULL; } /* ----------------------------------------------------------------- */ @@ -1544,16 +1658,11 @@ io_completion_port_remove_interface( * endpoint pointer. Do an additional detach and leave the * write lock. */ - SharedLock_t * slock = slAttachExclusive(ep->ioreg_ctx); - if (slock != NULL) { - slDetach(slock); + IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx; - slock->handles[0] = INVALID_HANDLE_VALUE; - slock->handles[1] = INVALID_HANDLE_VALUE; - slock->rsrc.ept = NULL; - - ep->ioreg_ctx = slDetachExclusive(slock); - } + INSIST(hndIOCPLPort && hMainRpcDone); + if (iopad) + iocpl_notify(iopad, OnInterfaceDetach, -1); } /* -------------------------------------------------------------------- @@ -1561,7 +1670,24 @@ io_completion_port_remove_interface( * -------------------------------------------------------------------- */ -/* Add a socket handle to the I/O completion port, and send +static void +OnSocketDetach( + ULONG_PTR key, + IoCtx_t * lpo + ) +{ + IoHndPad_T * iopad = lpo->iopad; + + INSIST(NULL != iopad); + if (iopad->handles[0] == lpo->io.hnd) + iopad->handles[0] = INVALID_HANDLE_VALUE; + if (iopad->handles[1] == lpo->io.hnd) + iopad->handles[1] = INVALID_HANDLE_VALUE; + + SetEvent(lpo->ppswake); +} + +/* Add a socket handle to the I/O completion port, and send * NTP_RECVS_PER_SOCKET receive requests to the kernel. */ BOOL @@ -1580,18 +1706,17 @@ io_completion_port_add_socket( IoCtx_t * lpo; size_t n; ULONG_PTR key; - SharedLock_t * slock = NULL; + IoHndPad_T * iopad = NULL; key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast; - if (NULL == (slock = slAttachExclusive(ep->ioreg_ctx))) { + if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) { msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting", ep); exit(1); } else { - endpt * rep = slock->rsrc.ept; - slock->handles[!!bcast] = (HANDLE)sfd; - slDetachExclusive(slock); + endpt * rep = iopad->rsrc.ept; + iopad->handles[!!bcast] = (HANDLE)sfd; INSIST(rep == ep); } @@ -1613,7 +1738,7 @@ io_completion_port_add_socket( return TRUE; fail: - ep->ioreg_ctx = slDetach(ep->ioreg_ctx); + ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx); return FALSE; } /* ----------------------------------------------------------------- */ @@ -1626,15 +1751,11 @@ io_completion_port_remove_socket( /* Lock the shared lock for write, then search the given * socket handle and replace it with an invalid handle value. */ - SharedLock_t * lp = slAttachExclusive(ep->ioreg_ctx); - HANDLE sh = (HANDLE)fd; - if (lp != NULL) { - if (lp->handles[0] == sh) - lp->handles[0] = INVALID_HANDLE_VALUE; - else if (lp->handles[1] == sh) - lp->handles[1] = INVALID_HANDLE_VALUE; - slDetachExclusive(lp); - } + IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx; + + INSIST(hndIOCPLPort && hMainRpcDone); + if (iopad) + iocpl_notify(iopad, OnSocketDetach, fd); } @@ -1738,7 +1859,8 @@ GetReceivedBuffers(void) timer(); break; - case WAIT_IO_COMPLETION: /* loop */ + case WAIT_IO_COMPLETION: /* there might be something after APC */ + have_packet = !!full_recvbuffs(); break; case WAIT_TIMEOUT: diff --git a/ports/winnt/ntpd/ntp_iocpltypes.c b/ports/winnt/ntpd/ntp_iocpltypes.c index 538217a7b..0e8bd7b56 100644 --- a/ports/winnt/ntpd/ntp_iocpltypes.c +++ b/ports/winnt/ntpd/ntp_iocpltypes.c @@ -29,16 +29,15 @@ * lock is NOT aquired, and all IO handles or FDs are set to an * invalid value. */ -SharedLock_t* __fastcall -slCreate( +IoHndPad_T* __fastcall +iohpCreate( void * src ) { - SharedLock_t* retv; + IoHndPad_T* retv; - retv = IOCPLPoolAlloc(sizeof(SharedLock_t), "Lock"); + retv = IOCPLPoolAlloc(sizeof(IoHndPad_T), "Lock"); if (retv != NULL) { - InitializeCriticalSection(retv->mutex); retv->refc_count = 1; retv->rsrc.any = src; retv->handles[0] = INVALID_HANDLE_VALUE; @@ -52,9 +51,9 @@ slCreate( * Attach to a lock. This just increments the use count, but does not * aquire the internal lock. Return a pointer to the lock. */ -SharedLock_t* __fastcall -slAttach( - SharedLock_t * lp +IoHndPad_T* __fastcall +iohpAttach( + IoHndPad_T * lp ) { if (lp != NULL) @@ -70,84 +69,25 @@ slAttach( * * THE CALLER MUST NOT OWN THE INTERNAL LOCK WHEN DOING THIS! */ -SharedLock_t* __fastcall -slDetach( - SharedLock_t * lp +IoHndPad_T* __fastcall +iohpDetach( + IoHndPad_T * lp ) { if (lp != NULL && !InterlockedDecrement(&lp->refc_count)) { - DeleteCriticalSection(lp->mutex); - memset(lp, 0xFF, sizeof(SharedLock_t)); + memset(lp, 0xFF, sizeof(IoHndPad_T)); IOCPLPoolFree(lp, "Lock"); } return NULL; } -/* -------------------------------------------------------------------- - * Attach and aquire the lock for READ access. (This might block) - */ -SharedLock_t* __fastcall -slAttachShared( - SharedLock_t * lp - ) -{ - if (NULL != (lp = slAttach(lp))) - EnterCriticalSection(lp->mutex); - return lp; -} - -/* -------------------------------------------------------------------- - * Release the READ lock and detach from shared lock. - * Alwys returns NULL. - * - * THE CALLER MUST OWN THE READ LOCK WHEN DOING THIS. - */ -SharedLock_t* __fastcall -slDetachShared( - SharedLock_t * lp - ) -{ - if (lp != NULL) - LeaveCriticalSection(lp->mutex); - return slDetach(lp); -} - -/* -------------------------------------------------------------------- - * Attach and aquire the lock for WRITE access. (This might block) - */ -SharedLock_t* __fastcall -slAttachExclusive( - SharedLock_t * lp -) -{ - if (NULL != (lp = slAttach(lp))) - EnterCriticalSection(lp->mutex); - return lp; -} - -/* -------------------------------------------------------------------- - * Release the WRITE lock and detach from shared lock. - * Alwys returns NULL. - * - * THE CALLER MUST OWN THE WRITE LOCK WHEN DOING THIS. - */ -SharedLock_t* __fastcall -slDetachExclusive( - SharedLock_t * lp - ) -{ - if (lp != NULL) - LeaveCriticalSection(lp->mutex); - return slDetach(lp); -} - /* -------------------------------------------------------------------- * Predicate function: Is there an attached RIO, and is the RIO in * active state? */ BOOL __fastcall -slRefClockOK( - const SharedLock_t * lp +iohpRefClockOK( + const IoHndPad_T * lp ) { return lp->rsrc.rio && lp->rsrc.rio->active; @@ -158,8 +98,8 @@ slRefClockOK( * interface accepting packets? */ BOOL __fastcall -slEndPointOK( -const SharedLock_t * lp +iohpEndPointOK( +const IoHndPad_T * lp ) { return lp->rsrc.ept && !lp->rsrc.ept->ignore_packets; @@ -175,18 +115,17 @@ const SharedLock_t * lp * independent of the function result! */ BOOL -slQueueLocked( - SharedLock_t * lp, - LockPredicateT pred, +iohpQueueLocked( + CIoHndPad_T * lp, + IoPreCheck_T pred, recvbuf_t * buf ) { BOOL done = FALSE; - if (slAttachShared(lp)) { + if (lp) { done = (*pred)(lp); if (done) add_full_recv_buffer(buf); - slDetachShared(lp); } if (done) SetEvent(WaitableIoEventHandle); @@ -253,7 +192,7 @@ DevCtxDetach( */ IoCtx_t * __fastcall IoCtxAlloc( - SharedLock_t * lock, + IoHndPad_T * lock, DevCtx_t * devCtx ) { @@ -261,7 +200,7 @@ IoCtxAlloc( ctx = (IoCtx_t *)IOCPLPoolAlloc(sizeof(IoCtx_t), "IO ctx"); if (ctx != NULL) { - ctx->slock = slAttach(lock); + ctx->iopad = iohpAttach(lock); ctx->devCtx = DevCtxAttach(devCtx); } return ctx; @@ -280,7 +219,7 @@ IoCtxFree( ) { if (ctx) { - ctx->slock = slDetach(ctx->slock); + ctx->iopad = iohpDetach(ctx->iopad); ctx->devCtx = DevCtxDetach(ctx->devCtx); IOCPLPoolFree(ctx, "IO ctx"); } @@ -322,15 +261,13 @@ IoCtxAlive( ) { return ctx && - ctx->slock && - ctx->slock->rsrc.any; + ctx->iopad && + ctx->iopad->rsrc.any; } /* -------------------------------------------------------------------- * Start an IO operation on a given context object with a specified * function and buffer. - * This locks the shared lock on the context, checks for the lock - * being active, and only then runs the starter function. * * Returns TRUE if the starter was executed successfully, FALSE in * all other cases. @@ -339,22 +276,21 @@ IoCtxAlive( * call IN ANY CASE, independent of the function result! */ BOOL -IoCtxStartLocked( +IoCtxStartChecked( IoCtx_t * lpo, IoCtxStarterT func, recvbuf_t * buf ) { - BOOL done = FALSE; - SharedLock_t * slock = slAttachShared(lpo->slock); - if (slock != NULL) { - if ((lpo->io.hnd == slock->handles[0]) || - (lpo->io.hnd == slock->handles[1]) ) + BOOL done = FALSE; + IoHndPad_T * iopad = lpo->iopad; + if (iopad != NULL) { + if ((lpo->io.hnd == iopad->handles[0]) || + (lpo->io.hnd == iopad->handles[1]) ) { done = (func)(lpo, buf); - lpo = NULL; /* consumed by 'func' */ + lpo = NULL; /* consumed by 'func' */ } - slDetachShared(slock); } if (lpo != NULL) { freerecvbuf(buf);