PKT_SOCKET_ERROR
};
+static const char * const st_packet_handling[3] = {
+ "accepted",
+ "dropped"
+ "error"
+};
+
/*
* local function definitions
*/
HANDLE WaitableIoEventHandle;
static HANDLE hndIOCPLPort;
static HANDLE hMainThread;
+static HANDLE hMainRpcDone;
static BOOL DoPPShack;
DWORD ActiveWaitHandles;
msyslog(LOG_ERR, "Can't create exit event handle: %m");
exit(1);
}
+ hMainRpcDone = CreateEvent(NULL, FALSE, FALSE, NULL);
+ if (hMainRpcDone == NULL) {
+ msyslog(LOG_ERR, "Can't create RPC sync handle: %m");
+ exit(1);
+ }
/* Create the IO completion port */
hndIOCPLPort = CreateIoCompletionPort(
hIoCompletionThread = NULL;
CloseHandle(hndIOCPLPort);
hndIOCPLPort = NULL;
+ CloseHandle(hMainRpcDone);
+ hMainRpcDone = NULL;
}
}
#endif /* DEBUG */
+void
+iocpl_notify(
+ IoHndPad_T * iopad,
+ void (*pfunc)(ULONG_PTR, IoCtx_t *),
+ UINT_PTR fdn
+ )
+{
+ IoCtx_t xf;
+
+ memset(&xf, 0, sizeof(xf));
+ xf.iopad = iopad;
+ xf.ppswake = hMainRpcDone;
+ xf.onIoDone = pfunc;
+ xf.io.sfd = fdn;
+ PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &xf.ol);
+ WaitForSingleObject(xf.ppswake, INFINITE);
+}
+
/*
* -------------------------------------------------------------------
* APC callback for scheduling interface scans.
* lock, the check for errors. If the error indicates the
* operation was cancelled, let the operation fail silently.
*/
- RIO_t * rio = NULL;
- SharedLock_t * slock = slAttachShared(ctx->slock);
- if (NULL != slock) {
- rio = slock->rsrc.rio;
- if (key != slock->rsrc.key)
+ RIO_t * rio = NULL;
+ IoHndPad_T * iopad = ctx->iopad;
+ if (NULL != iopad) {
+ rio = iopad->rsrc.rio;
+ if (key != iopad->rsrc.key)
rio = NULL;
- else if (ctx->io.hnd != slock->handles[0])
+ else if (ctx->io.hnd != iopad->handles[0])
rio = NULL;
- slDetachShared(slock);
}
if (rio != NULL) switch (ctx->errCode) {
/* When we got cancelled, don't spill messages */
* it out in both the input key and the registered source.
*/
endpt * ep = NULL;
- SharedLock_t * slock = slAttachShared(ctx->slock);
- if (slock != NULL) {
- ep = slock->rsrc.ept;
- if ((key >> 1) != (slock->rsrc.key >> 1))
+ IoHndPad_T * iopad = ctx->iopad;
+ if (iopad != NULL) {
+ ep = iopad->rsrc.ept;
+ if ((key >> 1) != (iopad->rsrc.key >> 1))
ep = NULL;
- else if (ctx->io.hnd != slock->handles[key & 1])
+ else if (ctx->io.hnd != iopad->handles[key & 1])
ep = NULL;
- slDetachShared(slock);
}
if (ep == NULL)
IoCtxRelease(ctx);
lpo->recv_buf = buff;
lpo->flRawMem = 0;
- buff->fd = lpo->slock->riofd;
+ buff->fd = lpo->iopad->riofd;
/* keep receive position for continuation of partial lines! */
rc = WaitCommEvent(lpo->io.hnd, &lpo->aux.com_events, &lpo->ol);
return rc || IoResultCheck(GetLastError(), lpo, msg);
/* start next IO and leave if we hit an error */
if (lpo->errCode != ERROR_SUCCESS) {
memset(&lpo->aux, 0, sizeof(lpo->aux));
- IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+ IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
return;
}
dev->pps_data.cc_assert++;
dev->pps_data.ts_assert = lpo->aux.RecvTime;
DPRINTF(2, ("upps-real: fd %d DCD PPS Rise at %s\n",
- lpo->slock->rsrc.rio->fd,
+ lpo->iopad->rsrc.rio->fd,
ulfptoa(&lpo->aux.RecvTime, 6)));
} else {
dev->pps_data.cc_clear++;
dev->pps_data.ts_clear = lpo->aux.RecvTime;
DPRINTF(2, ("upps-real: fd %d DCD PPS Fall at %s\n",
- lpo->slock->rsrc.rio->fd,
+ lpo->iopad->rsrc.rio->fd,
ulfptoa(&lpo->aux.RecvTime, 6)));
}
/* Update PPS buffer, writing from low to high, with index
lpo->aux.DCDSTime = lpo->aux.RecvTime;
lpo->aux.flTsDCDS = 1;
DPRINTF(2, ("upps-hack: fd %d DCD PPS Rise at %s\n",
- lpo->slock->rsrc.rio->fd,
+ lpo->iopad->rsrc.rio->fd,
ulfptoa(&lpo->aux.RecvTime, 6)));
}
}
if (EV_RXFLAG & lpo->aux.com_events) { /* line discipline */
lpo->aux.FlagTime = lpo->aux.RecvTime;
lpo->aux.flTsFlag = 1;
- IoCtxStartLocked(lpo, QueueSerialRead, lpo->recv_buf);
+ IoCtxStartChecked(lpo, QueueSerialRead, lpo->recv_buf);
} else if (EV_RXCHAR & lpo->aux.com_events) { /* raw discipline */
lpo->aux.FlagTime = lpo->aux.RecvTime;
lpo->aux.flTsFlag = 1;
- IoCtxStartLocked(lpo, QueueRawSerialRead, lpo->recv_buf);
+ IoCtxStartChecked(lpo, QueueRawSerialRead, lpo->recv_buf);
} else { /* idle... */
- IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+ IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
}
}
lpo->flRawMem = 0;
/* 'buff->recv_length' must be set already! */
- buff->fd = lpo->slock->riofd;
+ buff->fd = lpo->iopad->riofd;
buff->dstadr = NULL;
buff->receiver = process_refclock_packet;
- buff->recv_peer = lpo->slock->rsrc.rio->srcclock;
+ buff->recv_peer = lpo->iopad->rsrc.rio->srcclock;
rc = ReadFile(lpo->io.hnd,
(char*)buff->recv_buffer + buff->recv_length,
wait_again:
/* make sure the read is issued again */
memset(&lpo->aux, 0, sizeof(lpo->aux));
- IoCtxStartLocked(lpo, QueueSerialWait, lpo->recv_buf);
+ IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
}
* the IO completion thread can resume faster.
*
* !!ATTENTION!!
- * This function runs on an arbitrary worker thread, and not under the
- * protection of the shared lock! Accessing the RIO structure must set
- * the lock explicitely!
+ * This function runs on an arbitrary worker thread. The resource
+ * management with regard to IO is synchronised only between the main
+ * thread and the IO worker thread, so decisions about queueing and
+ * starting new IO must be made by either of them.
+ *
+ * Since the IO thread sticks in the IOCPL queue and is not alertable,
+ * we could either use the APC queue to the main thread or the IOCPL
+ * queue to the IO thread.
+ *
+ * We separate the effort -- filtering based on the RIO state is done
+ * by the main thread, restarting the IO by the IO thread to reduce
+ * delays.
+ */
+
+/* -------------------------------------------------------------------
+ * IOCPL deferred bouncer -- start a new serial wait from IOCPL thread
+ */
+static void
+OnDeferredStartWait(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+)
+{
+ IoCtxStartChecked(lpo, QueueSerialWait, lpo->recv_buf);
+}
+
+/* -------------------------------------------------------------------
+ * APC deferred bouncer -- put buffer to receive queueor eventually
+ * discard it if source is already disabled. Runs in the context
+ * of the main thread exclusively.
+ */
+static void WINAPI
+OnEnqueAPC(
+ ULONG_PTR arg
+)
+{
+ recvbuf_t * buff = (recvbuf_t*)arg;
+ IoHndPad_T * iopad = (IoHndPad_T*)buff->recv_peer;
+ RIO_t * rio = iopad->rsrc.rio;
+
+ /* Down below we make a nasty hack to transport the iopad
+ * pointer in the buffer so we can avoid another temporary
+ * allocation. We must undo this here.
+ */
+ if (NULL != rio) {
+ /* OK, refclock still attached */
+ buff->recv_peer = rio->srcclock;
+ if (iohpQueueLocked(iopad, iohpRefClockOK, buff))
+ ++rio->srcclock->received;
+ } else {
+ /* refclock detached while in flight... */
+ freerecvbuf(buff);
+ }
+ iohpDetach(iopad); /* one unit owned by this callback! */
+}
+
+/* -------------------------------------------------------------------
+ * worker pool thread worker doing the string processing
*/
static DWORD WINAPI
OnSerialReadWorker(
void * ctx
)
{
- IoCtx_t * lpo;
- SharedLock_t * slock;
- recvbuf_t * buff, *obuf;
+ IoCtx_t * lpo = (IoCtx_t*)ctx;
+ IoHndPad_T * iop = lpo->iopad;
+ recvbuf_t * buff = lpo->recv_buf;
+ recvbuf_t * obuf = NULL;
char *sptr, *send, *dptr;
BOOL eol;
- char ch;
- BOOL active;
- u_long rcvcnt;
- RIO_t * rio;
-
- /* Get context back */
- lpo = (IoCtx_t*)ctx;
- buff = lpo->recv_buf;
-
- /* query the lock structure under mutual exclusion */
- active = FALSE;
- rcvcnt = 0;
- if (NULL != (slock = slAttachShared(lpo->slock))) {
- if (NULL != (rio = slock->rsrc.rio)) {
- active = TRUE;
- rcvcnt = InterlockedIncrement(&rio->recvcount) - 1;
- }
- slDetachShared(slock);
- }
-
- /* bail out if we're disconnected now */
- if (!active) {
- IoCtxRelease(ctx);
- return 0;
- }
+ int ch;
- /* Ignore zero-byte reads due to closure on fd.
- * Eat the first line of input as it's possibly partial.
+ /* We should never gat a zero-byte read here. If we do, nothing
+ * really bad happens, just a useless rescan of data we have
+ * already processed. But somethings not quite right in logic
+ * and we croak loudly in debug builds.
*/
- if (lpo->byteCount && rcvcnt) {
- /* account for additional input */
- buff->recv_length += (int)lpo->byteCount;
-
- /*
- * Now mimic the Unix line discipline.
- */
- sptr = (char *)buff->recv_buffer;
- send = sptr + buff->recv_length;
- obuf = NULL;
- dptr = NULL;
-
- /* hack #1: eat away leading CR/LF if there is any */
- while (sptr != send) {
- ch = *sptr;
- if (ch != '\n' && ch != '\r')
- break;
- sptr++;
- }
+ DEBUG_INSIST(lpo->byteCount > 0);
- while (sptr != send) {
- /* get new buffer to store line */
- obuf = get_free_recv_buffer_alloc();
- obuf->fd = buff->fd;
- obuf->receiver = buff->receiver;
- obuf->dstadr = NULL;
- obuf->recv_peer = buff->recv_peer;
- set_serial_recv_time(obuf, lpo);
-
- /* Copy data to new buffer, convert CR to LF on
- * the fly. Stop after either.
- */
- dptr = (char *)obuf->recv_buffer;
- eol = FALSE;
- while (sptr != send && !eol) {
- ch = *sptr++;
- if ('\r' == ch)
- ch = '\n';
- *dptr++ = ch;
- eol = ('\n' == ch);
- }
- obuf->recv_length =
- (int)(dptr - (char *)obuf->recv_buffer);
+ /* Account for additional input and then mimic the UNIX line
+ * discipline. This is an implict state machine -- the
+ * implementation is very low-level to gather speed.
+ */
+ buff->recv_length += (int)lpo->byteCount;
+ sptr = (char *)buff->recv_buffer;
+ send = sptr + buff->recv_length;
+ if (sptr == send)
+ goto st_read_fresh;
+
+st_new_obuf:
+ /* Get new receive buffer to store the line. */
+ obuf = get_free_recv_buffer_alloc();
+ obuf->fd = buff->fd;
+ obuf->receiver = buff->receiver;
+ obuf->dstadr = NULL;
+ obuf->recv_peer = buff->recv_peer;
+ set_serial_recv_time(obuf, lpo);
+
+st_copy_start:
+ /* Copy data to new buffer, convert CR to LF on the fly.
+ * Stop after either.
+ */
+ dptr = (char *)obuf->recv_buffer;
+ do {
+ ch = *sptr++;
+ if ('\r' == ch)
+ ch = '\n';
+ *dptr++ = ch;
+ eol = ('\n' == ch);
+ } while (!(eol || sptr == send));
+ obuf->recv_length = (int)(dptr - (char *)obuf->recv_buffer);
+
+ /* If we're not at EOL, we need more data to continue the line.
+ * But this can only be done if there's more room in the buffer;
+ * if we have already reached the maximum size, treat the whole
+ * buffer as part of a mega-line and pass it on.
+ */
+ if (!eol) {
+ if (obuf->recv_length < sizeof(obuf->recv_buffer))
+ goto st_read_more;
+ else
+ goto st_pass_buffer;
+ }
- /* If NL found, push this buffer and prepare to
- * get a new one. Be prepared for concurrent
- * removal of the clock...
- */
- if (eol) {
- slQueueLocked(lpo->slock, slRefClockOK, obuf);
- obuf = NULL; /* consumed in any case */
- }
- }
+ /* if we should drop empty lines, do it here. */
+ if (obuf->recv_length < 2 && iop->flDropEmpty) {
+ obuf->recv_length = 0;
+ if (sptr != send)
+ goto st_copy_start;
+ else
+ goto st_read_more;
+ }
- /* If we still have an output buffer, prepare it to be
- * used for added input from the ComPort. Otherwise
- * use the current input buffer again.
- */
- if (obuf) {
- obuf->recv_length =
- (int)(dptr - (char *)obuf->recv_buffer);
- freerecvbuf(buff);
- buff = obuf;
- } else {
- /* clear the current buffer, continue */
- buff->recv_length = 0;
- }
- } else {
- buff->recv_length = 0;
+ if ( ! iop->flFirstSeen) {
+ iop->flFirstSeen = 1;
+ obuf->recv_length = 0;
+ if (sptr != send)
+ goto st_copy_start;
+ else
+ goto st_read_more;
}
- /* start next round -- must hold the lock during that! */
- IoCtxStartLocked(lpo, QueueSerialWait, buff);
+st_pass_buffer:
+ /* if we arrive here, we can spin off another text line to the
+ * receive queue. We use a hack to supplant the RIO pointer in
+ * the receive buffer with the IOPAD to save us a temporary
+ * workspace allocation. Note the callback owns one refcount
+ * unit to keep the IOPAD alive! Also checking that the RIO in
+ * the IOPAD matches the RIO in the buffer is dangerous: That
+ * pointer is manipulated by the other threads!
+ */
+ obuf->recv_peer = (struct peer*)iohpAttach(lpo->iopad);
+ QueueUserAPC(OnEnqueAPC, hMainThread, (ULONG_PTR)obuf);
+ if (sptr != send)
+ goto st_new_obuf;
+ buff->recv_length = 0;
+ goto st_read_fresh;
+
+st_read_more:
+ /* read more data into current OBUF, which is valid and will
+ * replace BUFF.
+ */
+ lpo->recv_buf = obuf;
+ freerecvbuf(buff);
+
+st_read_fresh:
+ /* Start next round. This is deferred to the IOCPL thread, as
+ * read access to the IOPAD is unsafe from a worker thread
+ * for anything but the flags. If the IOCPL handle is gone,
+ * just mop up the pieces.
+ */
+ lpo->onIoDone = OnDeferredStartWait;
+ if (!(hndIOCPLPort && PostQueuedCompletionStatus(hndIOCPLPort, 1, 0, &lpo->ol)))
+ IoCtxRelease(lpo);
return 0;
}
if (lpo->errCode == ERROR_SUCCESS && lpo->byteCount > 0) {
buff->recv_length = (int)lpo->byteCount;
set_serial_recv_time(buff, lpo);
- slQueueLocked(lpo->slock, slRefClockOK, buff);
+ iohpQueueLocked(lpo->iopad, iohpRefClockOK, buff);
buff = get_free_recv_buffer_alloc();
}
- IoCtxStartLocked(lpo, QueueSerialWait, buff);
+ IoCtxStartChecked(lpo, QueueSerialWait, buff);
}
IoCtx_t * lpo;
HANDLE h;
- SharedLock_t * slock = NULL;
+ IoHndPad_T * iopad = NULL;
/* preset to clear state for error cleanup:*/
rio->ioreg_ctx = NULL;
}
;
- if ( ! (rio->ioreg_ctx = slock = slCreate(rio))) {
+ if ( ! (rio->ioreg_ctx = iopad = iohpCreate(rio))) {
msyslog(LOG_ERR, "%s: Failed to create shared lock",
msgh);
goto fail;
}
- slock->handles[0] = h;
- slock->riofd = rio->fd;
- slock->rsrc.rio = rio;
+ iopad->handles[0] = h;
+ iopad->riofd = rio->fd;
+ iopad->rsrc.rio = rio;
if (!(rio->device_ctx = DevCtxAttach(serial_devctx(h)))) {
msyslog(LOG_ERR, "%s: Failed to allocate device context",
goto fail;
}
- if ( ! (lpo = IoCtxAlloc(slock, rio->device_ctx))) {
+ if ( ! (lpo = IoCtxAlloc(iopad, rio->device_ctx))) {
msyslog(LOG_ERR, "%: Failed to allocate IO context",
msgh);
goto fail;
return QueueSerialWait(lpo, get_free_recv_buffer_alloc());
fail:
- rio->ioreg_ctx = slDetach(rio->ioreg_ctx);
+ rio->ioreg_ctx = iohpDetach(rio->ioreg_ctx);
rio->device_ctx = DevCtxDetach(rio->device_ctx);
return FALSE;
}
/* ----------------------------------------------------------------- */
+static void
+OnSerialDetach(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+)
+{
+ /* Make sure the key matches the context info in the shared
+ * lock, the check for errors. If the error indicates the
+ * operation was cancelled, let the operation fail silently.
+ */
+ IoHndPad_T * iopad = lpo->iopad;
+
+ INSIST(NULL != iopad);
+ if (iopad->handles[0] == lpo->io.hnd) {
+ iopad->handles[0] = INVALID_HANDLE_VALUE;
+ iopad->handles[1] = INVALID_HANDLE_VALUE;
+ iopad->rsrc.rio = NULL;
+ iopad->riofd = -1;
+ }
+ SetEvent(lpo->ppswake);
+}
+
+
void
io_completion_port_remove_clock_io(
RIO_t *rio
)
{
- SharedLock_t * slock = NULL;
- if (rio && NULL != (slock = slAttachExclusive(rio->ioreg_ctx))) {
- slDetach(slock);
-
- slock->handles[0] = INVALID_HANDLE_VALUE;
- slock->handles[1] = INVALID_HANDLE_VALUE;
- slock->rsrc.rio = NULL;
- slock->riofd = -1;
+ IoHndPad_T * iopad = (IoHndPad_T*)rio->ioreg_ctx;
- rio->device_ctx = DevCtxDetach(rio->device_ctx);
- rio->ioreg_ctx = slDetachExclusive(slock);
- }
+ INSIST(hndIOCPLPort && hMainRpcDone);
+ if (iopad)
+ iocpl_notify(iopad, OnSerialDetach, _get_osfhandle(rio->fd));
}
/*
buff->fd = lpo->io.sfd;
buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
buff->receiver = receive;
- buff->dstadr = lpo->slock->rsrc.ept;
+ buff->dstadr = lpo->iopad->rsrc.ept;
wsabuf.buf = (char *)buff->recv_buffer;
wsabuf.len = sizeof(buff->recv_buffer);
static const char * const msg =
"OnSocketRecv: receive from socket failed";
- recvbuf_t * buff = NULL;
- SharedLock_t * slock = NULL;
- BOOL epOK = TRUE;
- int retCode = PKT_OK;
+ recvbuf_t * buff = NULL;
+ IoHndPad_T * iopad = NULL;
+ endpt * ep = NULL;
+ int rc;
- /* Make sure this endpoint is not closed. */
- endpt * ep = getEndptFromIoCtx(lpo, key);
- retCode = socketErrorCheck(lpo, msg);
+ /* order is important -- check first, then get endpoint! */
+ rc = socketErrorCheck(lpo, msg);
+ ep = getEndptFromIoCtx(lpo, key);
+ /* Make sure this endpoint is not closed. */
if (ep == NULL)
return;
* Since we must not use the context object once it is in
* another IO, we go through some pains to read everything
* before going out for another read request.
- * We also need an extra hold to the SLOCK structure.
+ * We also need an extra hold to the IOPAD structure.
*/
- slock = slAttach(lpo->slock);
- if (retCode == PKT_OK && lpo->byteCount > 0) {
+ iopad = iohpAttach(lpo->iopad);
+ if (rc == PKT_OK && lpo->byteCount > 0) {
/* keep input buffer, create new one for IO */
buff = lpo->recv_buf;
lpo->recv_buf = get_free_recv_buffer_alloc();
} /* Note: else we use the current buffer again */
- if (retCode != PKT_SOCKET_ERROR) {
- IoCtxStartLocked(lpo, QueueSocketRecv, lpo->recv_buf);
+ if (rc != PKT_SOCKET_ERROR) {
+ IoCtxStartChecked(lpo, QueueSocketRecv, lpo->recv_buf);
} else {
freerecvbuf(lpo->recv_buf);
IoCtxFree(lpo);
(int)buff->fd, stoa(&buff->recv_srcadr),
get_packet_mode(buff)));
- if (slAttachShared(slock)) {
- if (slEndPointOK(slock)) {
- InterlockedIncrement(&ep->received);
- slDetachShared(slock);
- InterlockedIncrement(&packets_received);
- InterlockedIncrement(&handler_pkts);
- } else {
- slDetachShared(slock);
- }
+ if (iohpEndPointOK(iopad)) {
+ InterlockedIncrement(&ep->received);
+ InterlockedIncrement(&packets_received);
+ InterlockedIncrement(&handler_pkts);
}
DPRINTF(2, ("Received %d bytes fd %d in buffer %p from %s, state = %s\n",
buff->recv_length, (int)buff->fd, buff,
- stoa(&buff->recv_srcadr), epOK? "Accepted" : "Ignored"));
- slQueueLocked(slock, slEndPointOK, buff);
+ stoa(&buff->recv_srcadr), st_packet_handling[rc]));
+ iohpQueueLocked(iopad, iohpEndPointOK, buff);
}
- slDetach(slock);
+ iohpDetach(iopad);
}
/* ----------------------------------------------------------------- */
static const char * const msg =
"OnSocketSend: send to socket failed";
- SharedLock_t * slock = NULL;
- endpt * ep = getEndptFromIoCtx(lpo, key);
- int retCode = socketErrorCheck(lpo, msg);
+ IoHndPad_T * iopad = NULL;
+ endpt * ep = NULL;
+ int rc;
+
+ /* order is important -- check first, then get endpoint! */
+ rc = socketErrorCheck(lpo, msg);
+ ep = getEndptFromIoCtx(lpo, key);
+
/* Make sure this endpoint is not closed. */
if (ep == NULL)
return;
- if (retCode != PKT_OK) {
- slock = slAttachShared(lpo->slock);
- if (slock) {
- if (slEndPointOK(slock)) {
- InterlockedIncrement(&ep->notsent);
- InterlockedDecrement(&ep->sent);
- slDetachShared(slock);
- InterlockedIncrement(&packets_notsent);
- InterlockedDecrement(&packets_sent);
- } else {
- slDetachShared(slock);
- }
- }
+ if (rc != PKT_OK) {
+ InterlockedIncrement(&ep->notsent);
+ InterlockedDecrement(&ep->sent);
+ InterlockedIncrement(&packets_notsent);
+ InterlockedDecrement(&packets_sent);
}
IoCtxRelease(lpo);
}
* register and de-register interface endpoints with the IO engine
* --------------------------------------------------------------------
*/
+static void
+OnInterfaceDetach(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+ )
+{
+ IoHndPad_T * iopad = lpo->iopad;
+
+ INSIST(NULL != iopad);
+ iopad->handles[0] = INVALID_HANDLE_VALUE;
+ iopad->handles[1] = INVALID_HANDLE_VALUE;
+ iopad->rsrc.ept = NULL;
+
+ SetEvent(lpo->ppswake);
+}
+
+/* ----------------------------------------------------------------- */
BOOL
io_completion_port_add_interface(
endpt * ep
/* Registering an endpoint is simple: allocate a shared lock for
* the enpoint and return if the allocation was successful.
*/
- ep->ioreg_ctx = slCreate(ep);
+ ep->ioreg_ctx = iohpCreate(ep);
return ep->ioreg_ctx != NULL;
}
/* ----------------------------------------------------------------- */
* endpoint pointer. Do an additional detach and leave the
* write lock.
*/
- SharedLock_t * slock = slAttachExclusive(ep->ioreg_ctx);
- if (slock != NULL) {
- slDetach(slock);
+ IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx;
- slock->handles[0] = INVALID_HANDLE_VALUE;
- slock->handles[1] = INVALID_HANDLE_VALUE;
- slock->rsrc.ept = NULL;
-
- ep->ioreg_ctx = slDetachExclusive(slock);
- }
+ INSIST(hndIOCPLPort && hMainRpcDone);
+ if (iopad)
+ iocpl_notify(iopad, OnInterfaceDetach, -1);
}
/* --------------------------------------------------------------------
* --------------------------------------------------------------------
*/
-/* Add a socket handle to the I/O completion port, and send
+static void
+OnSocketDetach(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+ )
+{
+ IoHndPad_T * iopad = lpo->iopad;
+
+ INSIST(NULL != iopad);
+ if (iopad->handles[0] == lpo->io.hnd)
+ iopad->handles[0] = INVALID_HANDLE_VALUE;
+ if (iopad->handles[1] == lpo->io.hnd)
+ iopad->handles[1] = INVALID_HANDLE_VALUE;
+
+ SetEvent(lpo->ppswake);
+}
+
+/* Add a socket handle to the I/O completion port, and send
* NTP_RECVS_PER_SOCKET receive requests to the kernel.
*/
BOOL
IoCtx_t * lpo;
size_t n;
ULONG_PTR key;
- SharedLock_t * slock = NULL;
+ IoHndPad_T * iopad = NULL;
key = ((ULONG_PTR)ep & ~(ULONG_PTR)1u) + !!bcast;
- if (NULL == (slock = slAttachExclusive(ep->ioreg_ctx))) {
+ if (NULL == (iopad = (IoHndPad_T*)ep->ioreg_ctx)) {
msyslog(LOG_CRIT, "io_completion_port_add_socket: endpt = %p not registered, exiting",
ep);
exit(1);
} else {
- endpt * rep = slock->rsrc.ept;
- slock->handles[!!bcast] = (HANDLE)sfd;
- slDetachExclusive(slock);
+ endpt * rep = iopad->rsrc.ept;
+ iopad->handles[!!bcast] = (HANDLE)sfd;
INSIST(rep == ep);
}
return TRUE;
fail:
- ep->ioreg_ctx = slDetach(ep->ioreg_ctx);
+ ep->ioreg_ctx = iohpDetach(ep->ioreg_ctx);
return FALSE;
}
/* ----------------------------------------------------------------- */
/* Lock the shared lock for write, then search the given
* socket handle and replace it with an invalid handle value.
*/
- SharedLock_t * lp = slAttachExclusive(ep->ioreg_ctx);
- HANDLE sh = (HANDLE)fd;
- if (lp != NULL) {
- if (lp->handles[0] == sh)
- lp->handles[0] = INVALID_HANDLE_VALUE;
- else if (lp->handles[1] == sh)
- lp->handles[1] = INVALID_HANDLE_VALUE;
- slDetachExclusive(lp);
- }
+ IoHndPad_T * iopad = (IoHndPad_T*)ep->ioreg_ctx;
+
+ INSIST(hndIOCPLPort && hMainRpcDone);
+ if (iopad)
+ iocpl_notify(iopad, OnSocketDetach, fd);
}
timer();
break;
- case WAIT_IO_COMPLETION: /* loop */
+ case WAIT_IO_COMPLETION: /* there might be something after APC */
+ have_packet = !!full_recvbuffs();
break;
case WAIT_TIMEOUT: