+/*
+-----------------------------------------------------------------------
+This is the IO completion port handling for async/overlapped IO on
+Windows >= Win2000.
+
+Some notes on the implementation:
+
++ Only one thread is used to serve the IO completion port, for several
+ reasons:
+
+ * First, there seems to be (have been?) trouble that locked up NTPD
+ when more than one thread was used for IOCPL.
+
+ * Second, for the sake of the time stamp interpolation the threads
+ must run on the same CPU as the time interpolation thread. This
+ makes using more than one thread useless, as they would compete for
+ the same core and create contention.
+
++ Some IO operations need a possibly lengthy postprocessing. Emulating
+ the UN*X line discipline is currently the only but prominent example.
+ To avoid the processing in the time-critical IOCPL thread, longer
+ processing is offloaded the worker thread pool.
+
++ A fact that seems not as well-known as it should be is that all
+ ressources passed to an overlapped IO operation must be considered
+ owned by the OS until the result has been fetched/dequeued. This
+ includes all overlapped structures and buffers involved, so cleaning
+ up on shutdown must be carefully constructed. (This includes closing
+ all the IO handles and waiting for the results to be dequeued.
+ 'CancleIo()' cannot be used since it's broken beyond repair.)
+
+ If this is not possible, then all ressources should be dropped into
+ oblivion -- otherwise "bad things (tm)" are bound to happen.
+
+ Using a private heap that is silently dropped but not deleted is a
+ good way to avoid cluttering memory stats with IO context related
+ objects. Leak tracing becomes more interesting, though.
+
+
+The current implementation is based on the work of Danny Mayer who wrote
+the original implementation and Dave Hart who improved on the serial I/O
+routines. This version still provides the 'user space PPS' emulation
+feature.
+
+Juergen Perlinger (perlinger@ntp.org) Feb 2012
+
+-----------------------------------------------------------------------
+*/
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
#define CONTAINEROF(p, type, member) \
((type *)((char *)(p) - offsetof(type, member)))
-/*
- * Request types
- */
-enum {
- SOCK_RECV,
- SOCK_SEND,
- SERIAL_WAIT,
- SERIAL_READ,
- RAW_SERIAL_READ,
- SERIAL_WRITE
-};
-
#ifdef _MSC_VER
# pragma warning(push)
# pragma warning(disable: 201) /* nonstd extension nameless union */
#endif
-typedef struct olplus_tag {
- OVERLAPPED ol;
- int request_type;
+/*
+ * ---------------------------------------------------------------------
+ * I/O context structure
+ *
+ * This is an extended overlapped structure. Some fields are only used
+ * for serial I/O, others are used for all operations. The serial I/O is
+ * more interesting since the same context object is used for waiting,
+ * actual I/O and possibly offload processing in a worker thread until
+ * a complete operation cycle is done.
+ *
+ * In this case the I/O context is used to gather all the bits that are
+ * finally needed for the processing of the buffer.
+ * ---------------------------------------------------------------------
+ */
+struct IoCtx;
+typedef struct IoCtx IoCtx_t;
+
+typedef void (*IoCompleteFunc)(ULONG_PTR, IoCtx_t *);
+
+struct IoCtx {
+ OVERLAPPED ol; /* 'kernel' part of the context */
union {
- recvbuf_t * recv_buf;
- void * trans_buf;
+ recvbuf_t * recv_buf; /* incoming -> buffer structure */
+ void * trans_buf; /* outgoing -> char array */
};
-#ifdef DEBUG
- struct olplus_tag * link;
-#endif
-} olplus;
+ IoCompleteFunc onIoDone; /* HL callback to execute */
+ struct refclockio * rio; /* RIO backlink (for offload) */
+ l_fp DCDSTime; /* timestamp of DCD set */
+ l_fp FlagTime; /* timestamp of flag/event char */
+ l_fp RecvTime; /* timestamp of callback */
+ DWORD errCode; /* error code of last I/O */
+ DWORD byteCount; /* byte count " */
+ DWORD com_events; /* buffer for COM events */
+ unsigned int flRawMem : 1; /* buffer is raw memory -> free */
+ unsigned int flTsDCDS : 1; /* DCDSTime valid? */
+ unsigned int flTsFlag : 1; /* FlagTime valid? */
+};
#ifdef _MSC_VER
# pragma warning(pop)
/*
* local function definitions
*/
- void ntpd_addremove_semaphore(HANDLE, int);
-static int QueueSerialWait(struct refclockio *, recvbuf_t *buff,
- olplus *lpo, BOOL clear_timestamp);
-static int QueueRawSerialRead(struct refclockio *, recvbuf_t *buff,
- olplus *lpo);
-static int OnSocketRecv(ULONG_PTR, olplus *, DWORD, int);
-static int OnSerialWaitComplete(struct refclockio *, olplus *,
- DWORD, int);
-static int OnSerialReadComplete(struct refclockio *, olplus *,
- DWORD, int);
-static int OnRawSerialReadComplete(struct refclockio *, olplus *,
- DWORD, int);
-static int OnWriteComplete(ULONG_PTR, olplus *, DWORD, int);
+static void ntpd_addremove_semaphore(HANDLE, int);
+
+/* Initiate/Request async IO operations */
+static BOOL QueueSerialWait (struct refclockio *, recvbuf_t *, IoCtx_t *);
+static BOOL QueueSerialRead (struct refclockio *, recvbuf_t *, IoCtx_t *);
+static BOOL QueueRawSerialRead(struct refclockio *, recvbuf_t *, IoCtx_t *);
+static BOOL QueueSocketRecv (SOCKET , recvbuf_t *, IoCtx_t *);
+
+
+/* High-level IO callback functions */
+static void OnSocketRecv (ULONG_PTR, IoCtx_t *);
+static void OnSerialWaitComplete (ULONG_PTR, IoCtx_t *);
+static void OnSerialReadComplete (ULONG_PTR, IoCtx_t *);
+static void OnRawSerialReadComplete(ULONG_PTR, IoCtx_t *);
+static void OnSerialWriteComplete (ULONG_PTR, IoCtx_t *);
+
+/* worker pool offload functions */
+static DWORD WINAPI OnSerialReadWorker(void * ctx);
/* keep a list to traverse to free memory on debug builds */
#ifdef DEBUG
static void free_io_completion_port_mem(void);
-olplus * compl_info_list;
-CRITICAL_SECTION compl_info_lock;
-#define LOCK_COMPL() EnterCriticalSection(&compl_info_lock)
-#define UNLOCK_COMPL() LeaveCriticalSection(&compl_info_lock)
#endif
-/* #define USE_HEAP */
-
-#ifdef USE_HEAP
-static HANDLE hHeapHandle = NULL;
-#endif
HANDLE WaitableExitEventHandle;
HANDLE WaitableIoEventHandle;
static HANDLE hIoCompletionPort;
-#ifdef NTPNEEDNAMEDHANDLE
-#define WAITABLEIOEVENTHANDLE "WaitableIoEventHandle"
-#else
-#define WAITABLEIOEVENTHANDLE NULL
-#endif
-
DWORD ActiveWaitHandles;
HANDLE WaitHandles[16];
-olplus *
-GetHeapAlloc(char *fromfunc)
-{
- olplus *lpo;
-
-#ifdef USE_HEAP
- lpo = HeapAlloc(hHeapHandle,
- HEAP_ZERO_MEMORY,
- sizeof(olplus));
-#else
- lpo = emalloc_zero(sizeof(*lpo));
-#endif
- DPRINTF(3, ("Allocation %d memory for %s, ptr %x\n", sizeof(olplus), fromfunc, lpo));
+/*
+ * -------------------------------------------------------------------
+ * We make a pool of our own for IO context objects -- the are owned by
+ * the system until a completion result is pulled from the queue, and
+ * they seriously go into the way of memory tracking until we can safely
+ * cancel an IO request.
+ * -------------------------------------------------------------------
+ */
+static HANDLE hHeapHandle;
-#ifdef DEBUG
- LOCK_COMPL();
- LINK_SLIST(compl_info_list, lpo, link);
- UNLOCK_COMPL();
-#endif
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Create a new heap for IO context objects
+ */
+static void
+IoCtxPoolInit(size_t initObjs)
+{
+ hHeapHandle = HeapCreate(0, initObjs*sizeof(IoCtx_t), 0);
+ if (hHeapHandle == NULL)
+ {
+ msyslog(LOG_ERR, "Can't initialize Heap: %m");
+ exit(1);
+ }
- return (lpo);
}
-void
-FreeHeap(olplus *lpo, char *fromfunc)
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ *
+ * Delete the IO context heap
+ *
+ * Since we do not know what callbacks are pending, we just drop the
+ * pool into oblivion. New allocs and frees will fail from this moment,
+ * but we simply don't care. At least the normal heap dump stats will
+ * show no leaks from IO context blocks. On the downside, we have to
+ * track them ourselves if something goes wrong.
+ */
+static void
+IoCtxPoolDone()
{
-#ifdef DEBUG
- olplus *unlinked;
-
- DPRINTF(3, ("Freeing memory for %s, ptr %x\n", fromfunc, lpo));
+ hHeapHandle = NULL;
+}
- LOCK_COMPL();
- UNLINK_SLIST(unlinked, compl_info_list, lpo, link,
- olplus);
- UNLOCK_COMPL();
-#endif
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Alloc & Free
+ *
+ * When the heap handle is NULL, these both will fail; Alloc with a NULL
+ * return and Free silently.
+ */
+static IoCtx_t *
+IoCtxAlloc(void)
+{
+ IoCtx_t *lpo;
-#ifdef USE_HEAP
- HeapFree(hHeapHandle, 0, lpo);
-#else
- free(lpo);
-#endif
+ if (hHeapHandle)
+ lpo = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(IoCtx_t));
+ else
+ lpo = NULL;
+ DPRINTF(3, ("Allocate IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo));
+ return (lpo);
}
-/* This function will add an entry to the I/O completion port
- * that will signal the I/O thread to exit (gracefully)
- */
static void
-signal_io_completion_port_exit()
+IoCtxFree(IoCtx_t *lpo)
{
- if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
- msyslog(LOG_ERR, "Can't request service thread to exit: %m");
- exit(1);
- }
+ DPRINTF(3, ("Free IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo));
+ if (lpo && hHeapHandle)
+ HeapFree(hHeapHandle, 0, lpo);
}
+
+/*
+ * -------------------------------------------------------------------
+ * The IO completion thread and support functions
+ *
+ * There is only one completion thread, because it is locked to the same
+ * core as the time interpolation. Having more than one causes core
+ * contention and is not useful.
+ * -------------------------------------------------------------------
+ */
+static HANDLE hIoCompletionThread;
+
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * The IO completion worker thread
+ *
+ * Note that this thread does not enter an alertable wait state and that
+ * the only waiting point is the IO completion port. If stopping this
+ * thread with a special queued result packet does not work,
+ * 'TerminateThread()' is the only remaining weapon in the arsenal. A
+ * dangerous weapon -- it's like SIGKILL.
+ */
static unsigned WINAPI
iocompletionthread(void *NotUsed)
{
- BOOL bSuccess;
- int err;
- DWORD octets;
- ULONG_PTR key;
- struct refclockio * rio;
- OVERLAPPED * pol;
- olplus * lpo;
+ DWORD err;
+ DWORD octets;
+ ULONG_PTR key;
+ OVERLAPPED * pol;
+ IoCtx_t * lpo;
UNUSED_ARG(NotUsed);
/*
- * socket and refclock receive call gettimeofday()
- * so the I/O thread needs to be on the same
- * processor as the main and timing threads
- * to ensure consistent QueryPerformanceCounter()
- * results.
+ * Socket and refclock receive call gettimeofday() so the I/O
+ * thread needs to be on the same processor as the main and
+ * timing threads to ensure consistent QueryPerformanceCounter()
+ * results.
+ *
+ * This gets seriously into the way of efficient thread pooling
+ * on multicore systems.
*/
lock_thread_to_processor(GetCurrentThread());
- /* Set the thread priority high enough so I/O will
- * preempt normal recv packet processing, but not
- * higher than the timer sync thread.
+ /*
+ * Set the thread priority high enough so I/O will preempt
+ * normal recv packet processing, but not higher than the timer
+ * sync thread.
*/
if (!SetThreadPriority(GetCurrentThread(),
THREAD_PRIORITY_ABOVE_NORMAL))
msyslog(LOG_ERR, "Can't set thread priority: %m");
- while (TRUE) {
- bSuccess = GetQueuedCompletionStatus(
+ for(;;) {
+ if (GetQueuedCompletionStatus(
hIoCompletionPort,
&octets,
&key,
&pol,
- INFINITE);
+ INFINITE))
+ err = ERROR_SUCCESS;
+ else
+ err = GetLastError();
if (NULL == pol) {
DPRINTF(2, ("Overlapped IO Thread Exiting\n"));
break; /* fail */
}
+ lpo = CONTAINEROF(pol, IoCtx_t, ol);
+ get_systime(&lpo->RecvTime);
+ lpo->byteCount = octets;
+ lpo->errCode = err;
handler_calls++;
- lpo = CONTAINEROF(pol, olplus, ol);
- rio = (struct refclockio *)key;
-
- /*
- * Deal with errors
- */
- if (bSuccess)
- err = 0;
- else
- err = GetLastError();
-
- /*
- * Invoke the appropriate function based on
- * the value of the request_type
- */
- switch (lpo->request_type) {
-
- case SOCK_RECV:
- OnSocketRecv(key, lpo, octets, err);
- break;
-
- case SOCK_SEND:
- NTP_INSIST(0);
- break;
-
- case SERIAL_WAIT:
- OnSerialWaitComplete(rio, lpo, octets, err);
- break;
-
- case SERIAL_READ:
- OnSerialReadComplete(rio, lpo, octets, err);
- break;
-
- case RAW_SERIAL_READ:
- OnRawSerialReadComplete(rio, lpo, octets, err);
- break;
-
- case SERIAL_WRITE:
- OnWriteComplete(key, lpo, octets, err);
- break;
-
- default:
- DPRINTF(1, ("Unknown request type %d found in completion port\n",
- lpo->request_type));
- break;
- }
+ (*lpo->onIoDone)(key, lpo);
}
return 0;
}
-/* Create/initialise the I/O creation port
+/*
+ * -------------------------------------------------------------------
+ * Create/initialise the I/O creation port
*/
void
init_io_completion_port(
void
)
{
- unsigned tid;
- HANDLE thread;
#ifdef DEBUG
- InitializeCriticalSection(&compl_info_lock);
atexit(&free_io_completion_port_mem);
#endif
-#ifdef USE_HEAP
- /*
- * Create a handle to the Heap
- */
- hHeapHandle = HeapCreate(0, 20*sizeof(olplus), 0);
- if (hHeapHandle == NULL)
- {
- msyslog(LOG_ERR, "Can't initialize Heap: %m");
- exit(1);
- }
-#endif
+ /* Create the context pool first. */
+ IoCtxPoolInit(20);
- /* Create the event used to signal an IO event
- */
- WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, WAITABLEIOEVENTHANDLE);
+ /* Create the event used to signal an IO event */
+ WaitableIoEventHandle = CreateEvent(
+ NULL, FALSE, FALSE, NULL);
if (WaitableIoEventHandle == NULL) {
msyslog(LOG_ERR,
"Can't create I/O event handle: %m - another process may be running - EXITING");
exit(1);
}
- /* Create the event used to signal an exit event
- */
+ /* Create the event used to signal an exit event */
WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
if (WaitableExitEventHandle == NULL) {
msyslog(LOG_ERR,
exit(1);
}
- /* Create the IO completion port
- */
- hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
+ /* Create the IO completion port */
+ hIoCompletionPort = CreateIoCompletionPort(
+ INVALID_HANDLE_VALUE, NULL, 0, 0);
if (hIoCompletionPort == NULL) {
msyslog(LOG_ERR, "Can't create I/O completion port: %m");
exit(1);
}
- /*
- * Initialize the Wait Handles
- */
+ /* Initialize the Wait Handles table */
WaitHandles[0] = WaitableIoEventHandle;
WaitHandles[1] = WaitableExitEventHandle; /* exit request */
WaitHandles[2] = WaitableTimerHandle;
addremove_io_semaphore = &ntpd_addremove_semaphore;
/*
- * Have one thread servicing I/O - there were 4, but this would
- * somehow cause NTP to stop replying to ntpq requests; TODO
+ * Have one thread servicing I/O. See rationale in front matter.
*/
- thread = (HANDLE)_beginthreadex(
+ hIoCompletionThread = (HANDLE)_beginthreadex(
NULL,
0,
iocompletionthread,
NULL,
- CREATE_SUSPENDED,
- &tid);
- ResumeThread(thread);
- CloseHandle(thread);
+ 0,
+ NULL);
}
-
+/*
+ * -------------------------------------------------------------------
+ * completion port teardown
+ */
void
-ntpd_addremove_semaphore(
- HANDLE sem,
- int remove
+uninit_io_completion_port(
+ void
)
{
- const size_t hnd_sz = sizeof(WaitHandles[0]);
- u_int hi;
+ /* do noting if completion port already gone. */
+ if (NULL == hIoCompletionPort)
+ return;
- if (!remove) {
- INSIST((ActiveWaitHandles + 1) < COUNTOF(WaitHandles));
- WaitHandles[ActiveWaitHandles] = sem;
- ActiveWaitHandles++;
+ /*
+ * Service thread seems running. Terminate him with grace
+ * first and force later...
+ */
- return;
+ /* post exit request for thread. */
+ PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0);
+ IoCtxPoolDone(); /* stop using the memory pool */
+
+ if (WAIT_TIMEOUT == WaitForSingleObject(hIoCompletionThread, 5000)) {
+ /* Everything is lost. Kill off with TerminateThread. */
+ msyslog(LOG_ERR, "IO completion thread refuses to terminate");
+ TerminateThread(hIoCompletionThread, ~0UL);
}
- /* removing semaphore */
+ /* now reap all handles... */
+ CloseHandle(hIoCompletionThread);
+ hIoCompletionThread = NULL;
+ CloseHandle(hIoCompletionPort);
+ hIoCompletionPort = NULL;
+}
+
+
+/*
+ * -------------------------------------------------------------------
+ * external worker thread support (wait handle stuff)
+ *
+ * !Attention!
+ *
+ * - This function must only be called from the main thread. Changing
+ * a set of wait handles while someone is waiting on it creates
+ * undefined behaviour. Also there's no provision for mutual
+ * exclusion when accessing global values.
+ *
+ * - It's not possible to register a handle that is already in the table.
+ */
+static void
+ntpd_addremove_semaphore(
+ HANDLE sem,
+ int remove
+ )
+{
+ DWORD hi;
+
+ /* search for a matching entry first. */
for (hi = 3; hi < ActiveWaitHandles; hi++)
if (sem == WaitHandles[hi])
break;
- if (hi == ActiveWaitHandles)
- return;
-
- ActiveWaitHandles--;
- if (hi < ActiveWaitHandles)
- memmove(&WaitHandles[hi],
- &WaitHandles[hi + 1],
- (ActiveWaitHandles - hi) * hnd_sz);
- WaitHandles[ActiveWaitHandles] = NULL;
+ if (remove) {
+ /*
+ * If found, eventually swap with last entry to keep
+ * the table dense.
+ */
+ if (hi < ActiveWaitHandles) {
+ ActiveWaitHandles--;
+ if (hi < ActiveWaitHandles)
+ WaitHandles[hi] =
+ WaitHandles[ActiveWaitHandles];
+ WaitHandles[ActiveWaitHandles] = NULL;
+ }
+ } else {
+ /*
+ * Make sure the entry is not found and there is enough
+ * room, then append to the table array.
+ */
+ if (hi >= ActiveWaitHandles) {
+ NTP_INSIST(ActiveWaitHandles < COUNTOF(WaitHandles));
+ WaitHandles[ActiveWaitHandles] = sem;
+ ActiveWaitHandles++;
+ }
+ }
}
void
)
{
- olplus * pci;
-
-#if defined(_MSC_VER) && defined (_DEBUG)
- _CrtCheckMemory();
-#endif
- LOCK_COMPL();
- while ((pci = compl_info_list) != NULL) {
-
-#if 0 /* sockaddr with received-from address in recvbuf */
- /* is sometimes modified by system after we free it */
- /* triggering heap corruption warning -- find a */
- /* better way to free it after I/O is surely done */
- /* this handles both xmit and recv buffs */
- if (pci->recv_buf != NULL) {
- DPRINTF(1, ("freeing xmit/recv buff %p\n", pci->recv_buf));
- free(pci->recv_buf);
- }
-#endif
-
- FreeHeap(pci, "free_io_completion_port_mem");
- /* FreeHeap() removed this item from compl_info_list */
- }
- UNLOCK_COMPL();
-
-#if defined(_MSC_VER) && defined (_DEBUG)
- _CrtCheckMemory();
-#endif
+ /*
+ * At the moment, do absolutely nothing. Returning memory here
+ * requires NO PENDING OVERLAPPED OPERATIONS AT ALL at this
+ * point in time, and as long we cannot be reasonable sure about
+ * that the simple advice is:
+ *
+ * HANDS OFF!
+ */
}
#endif /* DEBUG */
-void
-uninit_io_completion_port(
- void
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Prelude -- common error checking code
+ * -------------------------------------------------------------------
+ */
+extern char * NTstrerror(int err, BOOL *bfreebuf);
+
+static BOOL
+IoResultCheck(
+ DWORD err,
+ IoCtx_t * ctx,
+ const char * msg
)
{
- if (hIoCompletionPort != NULL) {
- /* Get each of the service threads to exit
- */
- signal_io_completion_port_exit();
+ char * msgbuf;
+ BOOL dynbuf;
+
+ /* If the clock is not / no longer active, assume
+ * 'ERROR_OPERATION_ABORTED' and do the necessary cleanup.
+ */
+ if (ctx->rio && !ctx->rio->active)
+ err = ERROR_OPERATION_ABORTED;
+
+ switch (err)
+ {
+ /* The first ones are no real errors. */
+ case ERROR_SUCCESS: /* all is good */
+ case ERROR_IO_PENDING: /* callback pending */
+ return TRUE;
+
+ /* the next ones go silently -- only cleanup is done */
+ case ERROR_INVALID_PARAMETER: /* handle already closed */
+ case ERROR_OPERATION_ABORTED: /* handle closed while wait */
+ break;
+
+
+ default:
+ /* We have to resort to the low level error formatting
+ * functions here, since the error code can be an
+ * overlapped result. Relying the value to be the same
+ * as the 'GetLastError()' result at this point of
+ * execution is shaky at best, and using SetLastError()
+ * to force it seems too nasty.
+ */
+ msgbuf = NTstrerror(err, &dynbuf);
+ msyslog(LOG_ERR, "%s: err=%u, '%s'", msg, err, msgbuf);
+ if (dynbuf)
+ LocalFree(msgbuf);
+ break;
+ }
+
+ /* If we end here, we have to mop up the buffer and context */
+ if (ctx->flRawMem) {
+ if (ctx->trans_buf)
+ free(ctx->trans_buf);
+ } else {
+ if (ctx->recv_buf)
+ freerecvbuf(ctx->recv_buf);
}
+ IoCtxFree(ctx);
+ return FALSE;
}
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 1 -- COMM event handling
+ * -------------------------------------------------------------------
+ */
-static int
+static BOOL
QueueSerialWait(
struct refclockio * rio,
recvbuf_t * buff,
- olplus * lpo,
- BOOL clear_timestamp
+ IoCtx_t * lpo
)
{
- DWORD err;
+ BOOL rc;
- lpo->request_type = SERIAL_WAIT;
+ lpo->onIoDone = OnSerialWaitComplete;
lpo->recv_buf = buff;
-
- if (clear_timestamp)
- ZERO(buff->recv_time);
-
- buff->fd = _get_osfhandle(rio->fd);
- if (!WaitCommEvent((HANDLE)buff->fd,
- (DWORD *)&buff->recv_buffer, &lpo->ol)) {
- err = GetLastError();
- if (ERROR_IO_PENDING != err) {
- /*
- * ERROR_INVALID_PARAMETER occurs after rio->fd
- * is closed.
- */
- if (ERROR_INVALID_PARAMETER != err)
- msyslog(LOG_ERR, "Can't wait on Refclock: %m");
- freerecvbuf(buff);
- return 0;
- }
- }
- return 1;
+ lpo->flRawMem = 0;
+ lpo->rio = rio;
+ buff->fd = rio->fd;
+
+ rc = WaitCommEvent((HANDLE)_get_osfhandle(rio->fd),
+ &lpo->com_events, &lpo->ol);
+ if (!rc)
+ return IoResultCheck(GetLastError(), lpo,
+ "Can't wait on Refclock");
+ return TRUE;
}
+/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */
-static int
+static void
OnSerialWaitComplete(
- struct refclockio * rio,
- olplus * lpo,
- DWORD Bytes,
- int errstatus
+ ULONG_PTR key,
+ IoCtx_t * lpo
)
{
- recvbuf_t *buff;
- struct peer *pp;
- l_fp arrival_time;
- DWORD comm_mask;
- DWORD modem_status;
- static const l_fp zero_time = { 0 };
- BOOL rc;
-
- get_systime(&arrival_time);
+ struct refclockio * rio;
+ recvbuf_t * buff;
+ DWORD modem_status;
- if (!rio->active)
- return FALSE;
+ /* check and bail out if operation failed */
+ if (!IoResultCheck(lpo->errCode, lpo,
+ "WaitCommEvent failed"))
+ return;
- /*
- * Get the recvbuf pointer from the overlapped buffer.
- */
+ /* get & validate context and buffer. */
+ rio = lpo->rio;
buff = lpo->recv_buf;
- comm_mask = (*(DWORD *)&buff->recv_buffer);
+ NTP_INSIST((ULONG_PTR)rio == key);
+
#ifdef DEBUG
- if (errstatus || comm_mask &
- ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) {
- msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x errstatus %d",
- comm_mask, errstatus);
- exit(-1);
- }
+ if (lpo->com_events & ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) {
+ msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x",
+ lpo->com_events);
+ exit(-1);
+ }
#endif
- if (EV_RLSD & comm_mask) {
- modem_status = 0;
- GetCommModemStatus((HANDLE)buff->fd, &modem_status);
- if (modem_status & MS_RLSD_ON) {
- /*
- * Use the timestamp from this PPS CD not
- * the later end of line.
- */
- buff->recv_time = arrival_time;
- }
-
- if (EV_RLSD == comm_mask) {
- /*
- * if we didn't see an end of line yet
- * issue another wait for it.
- */
- QueueSerialWait(rio, buff, lpo, FALSE);
- return TRUE;
- }
- }
-
- if (EV_RXCHAR & comm_mask) { /* raw discipline */
- QueueRawSerialRead(rio, buff, lpo);
- return TRUE;
- }
-
- /*
- * We've detected the end of line of serial input.
- * Use this timestamp unless we already have a CD PPS
- * timestamp in buff->recv_time.
- */
- if (memcmp(&buff->recv_time, &zero_time, sizeof buff->recv_time)) {
- /*
- * We will first see a user PPS timestamp here on either
- * the first or second line of text. Log a one-time
- * message while processing the second line.
- */
- if (1 == rio->recvcount) {
- pp = (struct peer *)rio->srcclock;
- msyslog(LOG_NOTICE, "Using user-mode PPS timestamp for %s",
- refnumtoa(&pp->srcadr));
- }
- } else {
- buff->recv_time = arrival_time;
+ /* Take note of changes on DCD; 'user space PPS' */
+ if (EV_RLSD & lpo->com_events) {
+ modem_status = 0;
+ GetCommModemStatus((HANDLE)buff->fd, &modem_status);
+ if (modem_status & MS_RLSD_ON) {
+ lpo->DCDSTime = lpo->RecvTime;
+ lpo->flTsDCDS = 1;
}
+ }
- /*
- * Now that we have a complete line waiting, read it.
- * There is still a race here, but we're likely to win.
- */
+ /* If IO ready, read data. Go back waiting else. */
+ if (EV_RXFLAG & lpo->com_events) { /* line discipline */
+ lpo->FlagTime = lpo->RecvTime;
+ lpo->flTsFlag = 1;
+ QueueSerialRead(rio, buff, lpo);
+ } else if (EV_RXCHAR & lpo->com_events) { /* raw discipline */
+ QueueRawSerialRead(rio, buff, lpo);
+ } else { /* idle... */
+ QueueSerialWait(rio, buff, lpo);
+ }
+}
- lpo->request_type = SERIAL_READ;
- rc = ReadFile(
- (HANDLE)buff->fd,
- buff->recv_buffer,
- sizeof(buff->recv_buffer),
- NULL,
- &lpo->ol);
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 2 -- line discipline emulation
+ *
+ * Ideally this should *not* be done in the IO completion thread.
+ * We use a worker pool thread to offload the low-level processing.
+ * -------------------------------------------------------------------
+ */
- if (!rc && ERROR_IO_PENDING != GetLastError()) {
- msyslog(LOG_ERR, "Can't read from Refclock: %m");
- freerecvbuf(buff);
- return 0;
- }
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Start & Queue a serial read for line discipline emulation.
+ */
+static BOOL
+QueueSerialRead(
+ struct refclockio * rio,
+ recvbuf_t * buff,
+ IoCtx_t * lpo
+ )
+{
+ BOOL rc;
- return 1;
+ lpo->onIoDone = OnSerialReadComplete;
+ lpo->recv_buf = buff;
+ lpo->flRawMem = 0;
+ lpo->rio = rio;
+ buff->fd = rio->fd;
+
+ rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
+ (char*)buff->recv_buffer + buff->recv_length,
+ sizeof(buff->recv_buffer) - buff->recv_length,
+ NULL, &lpo->ol);
+ if (!rc)
+ return IoResultCheck(GetLastError(), lpo,
+ "Can't read from Refclock");
+ return TRUE;
}
-
-/* Return 1 on Successful Read */
-static int
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * IO completion thread callback. Takes a time stamp and offloads the
+ * real work to the worker pool ASAP.
+ */
+static void
OnSerialReadComplete(
- struct refclockio * rio,
- olplus * lpo,
- DWORD Bytes,
- int errstatus
+ ULONG_PTR key,
+ IoCtx_t * lpo
)
{
+ struct refclockio * rio;
recvbuf_t * buff;
- l_fp cr_time;
- int consumed;
- if (!rio->active)
- return FALSE;
+ /* check and bail out if operation failed */
+ if (!IoResultCheck(lpo->errCode, lpo,
+ "Read from Refclock failed"))
+ return;
- /*
- * Get the recvbuf pointer from the overlapped buffer.
- */
+ /* get & validate context and buffer. */
+ rio = lpo->rio;
buff = lpo->recv_buf;
+ NTP_INSIST((ULONG_PTR)rio == key);
+
+ /* Offload to worker pool */
+ if (!QueueUserWorkItem(OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT)) {
+ msyslog(LOG_ERR,
+ "Can't offload to worker thread, will skip data: %m");
+ ZERO(*lpo);
+ buff->recv_length = 0;
+ QueueSerialWait(rio, buff, lpo);
+ }
+}
+
+/*
+ * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+ * Worker pool offload function -- avoid lengthy operations in the IO
+ * completion thread (affects timing...)
+ *
+ * This function does the real work of emulating the UN*X line
+ * discipline. Since this involves allocation of additional buffers and
+ * string parsing/copying, it is offloaded to the worker thread pool so
+ * the IO completion thread can resume faster.
+ */
+static DWORD WINAPI
+OnSerialReadWorker(void * ctx)
+{
+ IoCtx_t * lpo;
+ recvbuf_t * buff, *obuf;
+ struct refclockio * rio;
+ char *sptr, *send, *dptr;
+ BOOL eol;
+ char ch;
+ /* Get context back */
+ lpo = (IoCtx_t*)ctx;
+ buff = lpo->recv_buf;
+ rio = lpo->rio;
/*
- * ignore 0 bytes read due to timeout's and closure on fd
+ * ignore 0 bytes read due to closure on fd.
+ * Eat the first line of input as it's possibly partial.
*/
- if (!errstatus && Bytes) {
- buff->recv_length = (int) Bytes;
- buff->receiver = process_refclock_packet;
- buff->dstadr = NULL;
- buff->recv_peer = rio->srcclock;
+ if (lpo->byteCount && rio->recvcount++) {
+ /* account for additional input */
+ buff->recv_length += (int)lpo->byteCount;
+
/*
- * Eat the first line of input as it's possibly
- * partial and if a PPS is present, it may not
- * have fired since the port was opened.
+ * Now mimic the Unix line discipline.
*/
- if (rio->recvcount++ > 0) {
- cr_time = buff->recv_time;
- buff->fd = rio->fd; /* was handle */
- add_full_recv_buffer(buff);
+ sptr = (char *)buff->recv_buffer;
+ send = sptr + buff->recv_length;
+ obuf = NULL;
+ dptr = NULL;
+ while (sptr != send)
+ {
+ /* get new buffer to store line */
+ obuf = get_free_recv_buffer_alloc();
+ obuf->recv_time = lpo->RecvTime;
+ obuf->recv_length = 0;
+ obuf->fd = rio->fd;
+ obuf->receiver = process_refclock_packet;
+ obuf->dstadr = NULL;
+ obuf->recv_peer = rio->srcclock;
+
+ /*
+ * Time stamp assignment is interesting. If we
+ * have a DCD stamp, we use it, otherwise we use
+ * the FLAG char event time, and if that is also
+ * not / no longer available we use the arrival
+ * time.
+ */
+ if (lpo->flTsDCDS)
+ obuf->recv_time = lpo->DCDSTime;
+ else if (lpo->flTsFlag)
+ obuf->recv_time = lpo->FlagTime;
+ else
+ obuf->recv_time = lpo->RecvTime;
+ lpo->flTsDCDS = lpo->flTsFlag = 0; /* use only once... */
+
/*
- * Mimic Unix line discipline and assume CR/LF
- * line termination. On Unix the CR terminates
- * the line containing the timecode, and
- * immediately after the LF terminates an empty
- * line. So synthesize the empty LF-terminated
- * line using the same CR timestamp. Both CR
- * and LF are stripped by refclock_gtlin().
+ * Copy data to new buffer, convert CR to LF on
+ * the fly. Stop after LF.
*/
- buff = get_free_recv_buffer_alloc();
- buff->recv_time = cr_time;
+ dptr = (char *)obuf->recv_buffer;
+ eol = FALSE;
+ while (sptr != send && !eol) {
+ ch = *sptr++;
+ if (ch == '\r')
+ ch = '\n';
+ *dptr++ = ch;
+ eol = (ch == '\n');
+ }
+ obuf->recv_length =
+ (int)(dptr - (char *)obuf->recv_buffer);
+
+ /*
+ * If NL found, push this buffer and prepare to
+ * get a new one.
+ */
+ if (eol) {
+ add_full_recv_buffer(obuf);
+ SetEvent(WaitableIoEventHandle);
+ obuf = NULL;
+ }
+ }
+
+ /*
+ * If we still have an output buffer, continue to fill
+ * it again.
+ */
+ if (obuf) {
+ obuf->recv_length =
+ (int)(dptr - (char *)obuf->recv_buffer);
+ freerecvbuf(buff);
+ buff = obuf;
+ } else {
+ /* clear the current buffer, continue */
buff->recv_length = 0;
- buff->fd = rio->fd;
- buff->receiver = process_refclock_packet;
- buff->dstadr = NULL;
- buff->recv_peer = rio->srcclock;
- add_full_recv_buffer(buff);
- SetEvent(WaitableIoEventHandle);
- buff = get_free_recv_buffer_alloc();
}
+ } else {
+ ZERO(*lpo);
+ buff->recv_length = 0;
}
- QueueSerialWait(rio, buff, lpo, TRUE);
-
- return 1;
+ QueueSerialWait(rio, buff, lpo);
+ return 0;
}
/*
- * QueueRawSerialRead() returns TRUE if successful.
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 3 -- raw data input
+ *
+ * Raw data processing is fast enough to do without offloading to the
+ * worker pool, so this is rather short'n sweet...
+ * -------------------------------------------------------------------
*/
-static int
+
+static BOOL
QueueRawSerialRead(
struct refclockio * rio,
recvbuf_t * buff,
- olplus * lpo
+ IoCtx_t * lpo
)
{
- BOOL rc;
+ BOOL rc;
- buff->fd = _get_osfhandle(rio->fd);
- lpo->request_type = RAW_SERIAL_READ;
+ lpo->onIoDone = OnRawSerialReadComplete;
lpo->recv_buf = buff;
- rc = ReadFile((HANDLE)buff->fd, buff->recv_buffer,
- sizeof(buff->recv_buffer), NULL, &lpo->ol);
- if (!rc && ERROR_IO_PENDING != GetLastError()) {
- msyslog(LOG_ERR, "Can't raw read from Refclock: %m");
- freerecvbuf(buff);
- return FALSE;
- }
-
+ lpo->flRawMem = 0;
+ lpo->rio = rio;
+ buff->fd = rio->fd;
+
+ rc = ReadFile((HANDLE)_get_osfhandle(rio->fd),
+ buff->recv_buffer,
+ sizeof(buff->recv_buffer),
+ NULL, &lpo->ol);
+ if (!rc)
+ return IoResultCheck(GetLastError(), lpo,
+ "Can't read raw from Refclock");
return TRUE;
}
-static int
+static void
OnRawSerialReadComplete(
- struct refclockio * rio,
- olplus * lpo,
- DWORD octets,
- int errstatus
+ ULONG_PTR key,
+ IoCtx_t * lpo
)
{
- recvbuf_t * rbufp;
- l_fp arrival_time;
- int consumed;
-
- get_systime(&arrival_time);
+ struct refclockio * rio;
+ recvbuf_t * buff;
- if (!rio->active)
- return FALSE;
+ /* check and bail out if operation failed */
+ if (!IoResultCheck(lpo->errCode, lpo,
+ "Raw read from Refclock failed"))
+ return;
- rbufp = lpo->recv_buf;
+ /* get & validate context and buffer. */
+ rio = lpo->rio;
+ buff = lpo->recv_buf;
+ NTP_INSIST((ULONG_PTR)rio == key);
/* ignore 0 bytes read. */
- if (NO_ERROR == errstatus && octets > 0) {
- rbufp->recv_length = (int)octets;
- rbufp->dstadr = NULL;
- rbufp->recv_time = arrival_time;
- rbufp->receiver = process_refclock_packet;
- rbufp->recv_peer = rio->srcclock;
- rbufp->fd = rio->fd; /* was handle */
-
- add_full_recv_buffer(rbufp);
+ if (lpo->byteCount > 0) {
+ buff->recv_length = (int)lpo->byteCount;
+ buff->dstadr = NULL;
+ buff->recv_time = lpo->RecvTime;
+ buff->receiver = process_refclock_packet;
+ buff->recv_peer = rio->srcclock;
+ add_full_recv_buffer(buff);
SetEvent(WaitableIoEventHandle);
- rbufp = get_free_recv_buffer_alloc();
+ buff = get_free_recv_buffer_alloc();
+ }
+
+ buff->recv_length = 0;
+ QueueSerialWait(rio, buff, lpo);
+}
+
+/*
+ * -------------------------------------------------------------------
+ * Serial IO stuff
+ *
+ * Part 4 -- Overlapped serial output
+ *
+ * Again, no need to offload any work.
+ * -------------------------------------------------------------------
+ */
+
+/*
+ * async_write, clone of write(), used by some reflock drivers
+ */
+int
+async_write(
+ int fd,
+ const void * data,
+ unsigned int count
+ )
+{
+ IoCtx_t * lpo;
+ BOOL rc;
+
+ lpo = IoCtxAlloc();
+ if (lpo == NULL) {
+ DPRINTF(1, ("async_write: out of memory\n"));
+ errno = ENOMEM;
+ return -1;
}
- QueueSerialWait(rio, rbufp, lpo, TRUE);
+ lpo->onIoDone = OnSerialWriteComplete;
+ lpo->trans_buf = emalloc(count);
+ lpo->flRawMem = 1;
+ memcpy(lpo->trans_buf, data, count);
+
+ rc = WriteFile((HANDLE)_get_osfhandle(fd),
+ lpo->trans_buf, count,
+ NULL, &lpo->ol);
+ if (!rc && !IoResultCheck(GetLastError(), lpo,
+ "Can't write to Refclock")) {
+ errno = EBADF;
+ return -1;
+ }
+ return count;
+}
- return 1;
+static void
+OnSerialWriteComplete(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+ )
+{
+ /* set RIO and force silent cleanup if no error */
+ lpo->rio = (struct refclockio *)key;
+ if (ERROR_SUCCESS == lpo->errCode)
+ lpo->errCode = ERROR_OPERATION_ABORTED;
+ IoResultCheck(lpo->errCode, lpo,
+ "Write to Refclock failed");
}
-/* Add a reference clock data structures I/O handles to
- * the I/O completion port. Return 1 if any error.
+
+
+/*
+ * Add a reference clock data structures I/O handles to
+ * the I/O completion port. Return 1 if any error.
*/
int
io_completion_port_add_clock_io(
struct refclockio *rio
)
{
- olplus * lpo;
+ IoCtx_t * lpo;
recvbuf_t * buff;
HANDLE h;
return 1;
}
- lpo = GetHeapAlloc("io_completion_port_add_clock_io");
+ lpo = IoCtxAlloc();
if (NULL == lpo) {
msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
return 1;
}
-
buff = get_free_recv_buffer_alloc();
- QueueSerialWait(rio, buff, lpo, TRUE);
+ buff->recv_length = 0;
+ QueueSerialWait(rio, buff, lpo);
return 0;
}
* Note: As per the winsock documentation, we use WSARecvFrom. Using
* ReadFile() is less efficient.
*/
-static unsigned long
+static BOOL
QueueSocketRecv(
- SOCKET s,
- recvbuf_t *buff,
- olplus *lpo
+ SOCKET s,
+ recvbuf_t * buff,
+ IoCtx_t * lpo
)
{
WSABUF wsabuf;
- DWORD Flags;
- DWORD Result;
+ DWORD Flags;
+ int rc;
- lpo->request_type = SOCK_RECV;
+ lpo->onIoDone = OnSocketRecv;
lpo->recv_buf = buff;
-
- if (buff != NULL) {
- Flags = 0;
- buff->fd = s;
- buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
- wsabuf.buf = (char *)buff->recv_buffer;
- wsabuf.len = sizeof(buff->recv_buffer);
-
- if (SOCKET_ERROR == WSARecvFrom(buff->fd, &wsabuf, 1,
- NULL, &Flags,
- &buff->recv_srcadr.sa,
- &buff->recv_srcadr_len,
- &lpo->ol, NULL)) {
- Result = GetLastError();
- switch (Result) {
- case NO_ERROR :
- case WSA_IO_PENDING :
- break ;
-
- case WSAENOTSOCK :
- msyslog(LOG_ERR, "Can't read from non-socket fd %d: %m", (int)buff->fd);
- /* return the buffer */
- freerecvbuf(buff);
- return 0;
- break;
-
- case WSAEFAULT :
- msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
- /* return the buffer */
- freerecvbuf(buff);
- return 0;
- break;
-
- default :
- /* nop */ ;
- }
- }
- }
- else
- return 0;
- return 1;
+ lpo->flRawMem = 0;
+ lpo->rio = NULL;
+
+ Flags = 0;
+ buff->fd = s;
+ buff->recv_srcadr_len = sizeof(buff->recv_srcadr);
+ wsabuf.buf = (char *)buff->recv_buffer;
+ wsabuf.len = sizeof(buff->recv_buffer);
+
+ rc = WSARecvFrom(buff->fd, &wsabuf, 1, NULL, &Flags,
+ &buff->recv_srcadr.sa, &buff->recv_srcadr_len,
+ &lpo->ol, NULL);
+ if (SOCKET_ERROR == rc)
+ return IoResultCheck(GetLastError(), lpo,
+ "Can't read from Socket");
+ return TRUE;
}
-/* Returns 0 if any Error */
-static int
-OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
+static void
+OnSocketRecv(
+ ULONG_PTR key,
+ IoCtx_t * lpo
+ )
{
- struct recvbuf *buff = NULL;
- recvbuf_t *newbuff;
- l_fp arrival_time;
- struct interface * inter = (struct interface *) i;
+ recvbuf_t * buff;
+ recvbuf_t * newbuff;
+ struct interface * inter = (struct interface *)key;
- get_systime(&arrival_time);
-
NTP_REQUIRE(NULL != lpo);
NTP_REQUIRE(NULL != lpo->recv_buf);
+ /* check and bail out if operation failed */
+ if (!IoResultCheck(lpo->errCode, lpo,
+ "Read from Socket failed"))
+ return;
+
/*
* Convert the overlapped pointer back to a recvbuf pointer.
+ * Fetch items that are lost when the context is queued again.
*/
buff = lpo->recv_buf;
-
- /*
- * If the socket is closed we get an Operation Aborted error
- * Just clean up
- */
- if (errstatus == WSA_OPERATION_ABORTED)
- {
- freerecvbuf(buff);
- lpo->recv_buf = NULL;
- FreeHeap(lpo, "OnSocketRecv: Socket Closed");
- return (1);
- }
+ buff->recv_time = lpo->RecvTime;
+ buff->recv_length = (int)lpo->byteCount;
/*
* Get a new recv buffer for the replacement socket receive
*/
newbuff = get_free_recv_buffer_alloc();
- QueueSocketRecv(inter->fd, newbuff, lpo);
-
+ if (NULL != newbuff) {
+ QueueSocketRecv(inter->fd, newbuff, lpo);
+ } else {
+ IoCtxFree(lpo);
+ msyslog(LOG_ERR, "Can't add I/O request to socket");
+ }
DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n",
(MODE_BROADCAST == get_packet_mode(buff))
? " **** Broadcast "
/*
* If we keep it add some info to the structure
*/
- if (Bytes && !inter->ignore_packets) {
+ if (buff->recv_length && !inter->ignore_packets) {
NTP_INSIST(buff->recv_srcadr_len <=
sizeof(buff->recv_srcadr));
- memcpy(&buff->recv_time, &arrival_time,
- sizeof(buff->recv_time));
- buff->recv_length = (int) Bytes;
buff->receiver = &receive;
- buff->dstadr = inter;
+ buff->dstadr = inter;
packets_received++;
handler_pkts++;
inter->received++;
SetEvent(WaitableIoEventHandle);
} else
freerecvbuf(buff);
-
- return 1;
}
-/* Add a socket handle to the I/O completion port, and send
- * NTP_RECVS_PER_SOCKET recv requests to the kernel.
+/*
+ * Add a socket handle to the I/O completion port, and send
+ * NTP_RECVS_PER_SOCKET recv requests to the kernel.
*/
-extern int
-io_completion_port_add_socket(SOCKET fd, struct interface *inter)
+int
+io_completion_port_add_socket(
+ SOCKET fd,
+ struct interface * inter
+ )
{
- olplus *lpo;
- recvbuf_t *buff;
- int n;
+ IoCtx_t * lpo;
+ recvbuf_t * buff;
+ int n;
if (fd != INVALID_SOCKET) {
if (NULL == CreateIoCompletionPort((HANDLE)fd,
hIoCompletionPort, (ULONG_PTR)inter, 0)) {
- msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
+ msyslog(LOG_ERR,
+ "Can't add socket to i/o completion port: %m");
return 1;
}
}
for (n = 0; n < WINDOWS_RECVS_PER_SOCKET; n++) {
buff = get_free_recv_buffer_alloc();
- lpo = (olplus *) GetHeapAlloc("io_completion_port_add_socket");
+ lpo = IoCtxAlloc();
if (lpo == NULL)
{
- msyslog(LOG_ERR, "Can't allocate heap for completion port: %m");
+ msyslog(LOG_ERR
+ , "Can't allocate IO completion context: %m");
return 1;
}
}
-static int
-OnWriteComplete(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus)
-{
- void *buff;
-
- UNUSED_ARG(i);
- UNUSED_ARG(Bytes);
- UNUSED_ARG(errstatus);
-
- buff = lpo->trans_buf;
- free(buff);
- lpo->trans_buf = NULL;
- FreeHeap(lpo, "OnWriteComplete");
-
- return 1;
-}
-
-
/*
* io_completion_port_sendto() -- sendto() replacement for Windows
*
Result = WSASendTo(fd, &wsabuf, 1, &octets_sent, 0,
&dest->sa, AddrLen, NULL, NULL);
-
+ errval = GetLastError();
if (SOCKET_ERROR == Result) {
- errval = GetLastError();
if (ERROR_UNEXP_NET_ERR == errval) {
/*
* We get this error when trying to send if the
}
-/*
- * async_write, clone of write(), used by some reflock drivers
- */
-int
-async_write(
- int fd,
- const void *data,
- unsigned int count
- )
-{
- void *buff;
- olplus *lpo;
- DWORD BytesWritten;
-
- buff = emalloc(count);
- lpo = GetHeapAlloc("async_write");
- if (lpo == NULL) {
- free(buff);
- DPRINTF(1, ("async_write: out of memory\n"));
- errno = ENOMEM;
- return -1;
- }
-
- lpo->request_type = SERIAL_WRITE;
- lpo->trans_buf = buff;
- memcpy(buff, data, count);
-
- if (!WriteFile((HANDLE)_get_osfhandle(fd), buff, count,
- &BytesWritten, &lpo->ol)
- && ERROR_IO_PENDING != GetLastError()) {
-
- msyslog(LOG_ERR, "async_write - error %m");
- free(buff);
- FreeHeap(lpo, "async_write");
- errno = EBADF;
- return -1;
- }
-
- return count;
-}
-
/*
* GetReceivedBuffers
* Note that this is in effect the main loop for processing requests
* both send and receive. This should be reimplemented
*/
-int GetReceivedBuffers()
+int
+GetReceivedBuffers()
{
DWORD index;
HANDLE ready;
break;
case WAIT_TIMEOUT:
- msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects INFINITE timed out.");
+ msyslog(LOG_ERR,
+ "WaitForMultipleObjects INFINITE timed out.");
exit(1);
break;
case WAIT_FAILED:
- msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects Failed: Error: %m");
+ msyslog(LOG_ERR,
+ "WaitForMultipleObjects Failed: Error: %m");
exit(1);
break;