From: Juergen Perlinger Date: Sat, 18 Feb 2012 01:41:22 +0000 (+0100) Subject: [Bug 2140] Rework of Windows I/O completion port handling to avoid garbling serial... X-Git-Tag: NTP_4_2_7P258~7 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1153c719d0b554d643ab6afef6ed86d84e999d75;p=thirdparty%2Fntp.git [Bug 2140] Rework of Windows I/O completion port handling to avoid garbling serial input in UNIX line discipline emulation. bk: 4f3f01c2forj9sGdi9t78wQdVjBjZQ --- diff --git a/ChangeLog b/ChangeLog index e3056d226..35c405193 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,5 @@ +* [Bug 2140] Rework of Windows I/O completion port handling to avoid + garbling serial input in UNIX line discipline emulation. (4.2.7p257) 2012/02/17 Released by Harlan Stenn * [Bug 2135] defer calls to 'io_input' to main thread under Windows. (4.2.7p256) 2012/02/08 Released by Harlan Stenn diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c index ae59486fe..2b08ce973 100644 --- a/ports/winnt/ntpd/ntp_iocompletionport.c +++ b/ports/winnt/ntpd/ntp_iocompletionport.c @@ -1,3 +1,51 @@ +/* +----------------------------------------------------------------------- +This is the IO completion port handling for async/overlapped IO on +Windows >= Win2000. + +Some notes on the implementation: + ++ Only one thread is used to serve the IO completion port, for several + reasons: + + * First, there seems to be (have been?) trouble that locked up NTPD + when more than one thread was used for IOCPL. + + * Second, for the sake of the time stamp interpolation the threads + must run on the same CPU as the time interpolation thread. This + makes using more than one thread useless, as they would compete for + the same core and create contention. + ++ Some IO operations need a possibly lengthy postprocessing. Emulating + the UN*X line discipline is currently the only but prominent example. + To avoid the processing in the time-critical IOCPL thread, longer + processing is offloaded the worker thread pool. + ++ A fact that seems not as well-known as it should be is that all + ressources passed to an overlapped IO operation must be considered + owned by the OS until the result has been fetched/dequeued. This + includes all overlapped structures and buffers involved, so cleaning + up on shutdown must be carefully constructed. (This includes closing + all the IO handles and waiting for the results to be dequeued. + 'CancleIo()' cannot be used since it's broken beyond repair.) + + If this is not possible, then all ressources should be dropped into + oblivion -- otherwise "bad things (tm)" are bound to happen. + + Using a private heap that is silently dropped but not deleted is a + good way to avoid cluttering memory stats with IO context related + objects. Leak tracing becomes more interesting, though. + + +The current implementation is based on the work of Danny Mayer who wrote +the original implementation and Dave Hart who improved on the serial I/O +routines. This version still provides the 'user space PPS' emulation +feature. + +Juergen Perlinger (perlinger@ntp.org) Feb 2012 + +----------------------------------------------------------------------- +*/ #ifdef HAVE_CONFIG_H # include #endif @@ -21,34 +69,48 @@ #define CONTAINEROF(p, type, member) \ ((type *)((char *)(p) - offsetof(type, member))) -/* - * Request types - */ -enum { - SOCK_RECV, - SOCK_SEND, - SERIAL_WAIT, - SERIAL_READ, - RAW_SERIAL_READ, - SERIAL_WRITE -}; - #ifdef _MSC_VER # pragma warning(push) # pragma warning(disable: 201) /* nonstd extension nameless union */ #endif -typedef struct olplus_tag { - OVERLAPPED ol; - int request_type; +/* + * --------------------------------------------------------------------- + * I/O context structure + * + * This is an extended overlapped structure. Some fields are only used + * for serial I/O, others are used for all operations. The serial I/O is + * more interesting since the same context object is used for waiting, + * actual I/O and possibly offload processing in a worker thread until + * a complete operation cycle is done. + * + * In this case the I/O context is used to gather all the bits that are + * finally needed for the processing of the buffer. + * --------------------------------------------------------------------- + */ +struct IoCtx; +typedef struct IoCtx IoCtx_t; + +typedef void (*IoCompleteFunc)(ULONG_PTR, IoCtx_t *); + +struct IoCtx { + OVERLAPPED ol; /* 'kernel' part of the context */ union { - recvbuf_t * recv_buf; - void * trans_buf; + recvbuf_t * recv_buf; /* incoming -> buffer structure */ + void * trans_buf; /* outgoing -> char array */ }; -#ifdef DEBUG - struct olplus_tag * link; -#endif -} olplus; + IoCompleteFunc onIoDone; /* HL callback to execute */ + struct refclockio * rio; /* RIO backlink (for offload) */ + l_fp DCDSTime; /* timestamp of DCD set */ + l_fp FlagTime; /* timestamp of flag/event char */ + l_fp RecvTime; /* timestamp of callback */ + DWORD errCode; /* error code of last I/O */ + DWORD byteCount; /* byte count " */ + DWORD com_events; /* buffer for COM events */ + unsigned int flRawMem : 1; /* buffer is raw memory -> free */ + unsigned int flTsDCDS : 1; /* DCDSTime valid? */ + unsigned int flTsFlag : 1; /* FlagTime valid? */ +}; #ifdef _MSC_VER # pragma warning(pop) @@ -57,236 +119,215 @@ typedef struct olplus_tag { /* * local function definitions */ - void ntpd_addremove_semaphore(HANDLE, int); -static int QueueSerialWait(struct refclockio *, recvbuf_t *buff, - olplus *lpo, BOOL clear_timestamp); -static int QueueRawSerialRead(struct refclockio *, recvbuf_t *buff, - olplus *lpo); -static int OnSocketRecv(ULONG_PTR, olplus *, DWORD, int); -static int OnSerialWaitComplete(struct refclockio *, olplus *, - DWORD, int); -static int OnSerialReadComplete(struct refclockio *, olplus *, - DWORD, int); -static int OnRawSerialReadComplete(struct refclockio *, olplus *, - DWORD, int); -static int OnWriteComplete(ULONG_PTR, olplus *, DWORD, int); +static void ntpd_addremove_semaphore(HANDLE, int); + +/* Initiate/Request async IO operations */ +static BOOL QueueSerialWait (struct refclockio *, recvbuf_t *, IoCtx_t *); +static BOOL QueueSerialRead (struct refclockio *, recvbuf_t *, IoCtx_t *); +static BOOL QueueRawSerialRead(struct refclockio *, recvbuf_t *, IoCtx_t *); +static BOOL QueueSocketRecv (SOCKET , recvbuf_t *, IoCtx_t *); + + +/* High-level IO callback functions */ +static void OnSocketRecv (ULONG_PTR, IoCtx_t *); +static void OnSerialWaitComplete (ULONG_PTR, IoCtx_t *); +static void OnSerialReadComplete (ULONG_PTR, IoCtx_t *); +static void OnRawSerialReadComplete(ULONG_PTR, IoCtx_t *); +static void OnSerialWriteComplete (ULONG_PTR, IoCtx_t *); + +/* worker pool offload functions */ +static DWORD WINAPI OnSerialReadWorker(void * ctx); /* keep a list to traverse to free memory on debug builds */ #ifdef DEBUG static void free_io_completion_port_mem(void); -olplus * compl_info_list; -CRITICAL_SECTION compl_info_lock; -#define LOCK_COMPL() EnterCriticalSection(&compl_info_lock) -#define UNLOCK_COMPL() LeaveCriticalSection(&compl_info_lock) #endif -/* #define USE_HEAP */ - -#ifdef USE_HEAP -static HANDLE hHeapHandle = NULL; -#endif HANDLE WaitableExitEventHandle; HANDLE WaitableIoEventHandle; static HANDLE hIoCompletionPort; -#ifdef NTPNEEDNAMEDHANDLE -#define WAITABLEIOEVENTHANDLE "WaitableIoEventHandle" -#else -#define WAITABLEIOEVENTHANDLE NULL -#endif - DWORD ActiveWaitHandles; HANDLE WaitHandles[16]; -olplus * -GetHeapAlloc(char *fromfunc) -{ - olplus *lpo; - -#ifdef USE_HEAP - lpo = HeapAlloc(hHeapHandle, - HEAP_ZERO_MEMORY, - sizeof(olplus)); -#else - lpo = emalloc_zero(sizeof(*lpo)); -#endif - DPRINTF(3, ("Allocation %d memory for %s, ptr %x\n", sizeof(olplus), fromfunc, lpo)); +/* + * ------------------------------------------------------------------- + * We make a pool of our own for IO context objects -- the are owned by + * the system until a completion result is pulled from the queue, and + * they seriously go into the way of memory tracking until we can safely + * cancel an IO request. + * ------------------------------------------------------------------- + */ +static HANDLE hHeapHandle; -#ifdef DEBUG - LOCK_COMPL(); - LINK_SLIST(compl_info_list, lpo, link); - UNLOCK_COMPL(); -#endif +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * Create a new heap for IO context objects + */ +static void +IoCtxPoolInit(size_t initObjs) +{ + hHeapHandle = HeapCreate(0, initObjs*sizeof(IoCtx_t), 0); + if (hHeapHandle == NULL) + { + msyslog(LOG_ERR, "Can't initialize Heap: %m"); + exit(1); + } - return (lpo); } -void -FreeHeap(olplus *lpo, char *fromfunc) +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * + * Delete the IO context heap + * + * Since we do not know what callbacks are pending, we just drop the + * pool into oblivion. New allocs and frees will fail from this moment, + * but we simply don't care. At least the normal heap dump stats will + * show no leaks from IO context blocks. On the downside, we have to + * track them ourselves if something goes wrong. + */ +static void +IoCtxPoolDone() { -#ifdef DEBUG - olplus *unlinked; - - DPRINTF(3, ("Freeing memory for %s, ptr %x\n", fromfunc, lpo)); + hHeapHandle = NULL; +} - LOCK_COMPL(); - UNLINK_SLIST(unlinked, compl_info_list, lpo, link, - olplus); - UNLOCK_COMPL(); -#endif +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * Alloc & Free + * + * When the heap handle is NULL, these both will fail; Alloc with a NULL + * return and Free silently. + */ +static IoCtx_t * +IoCtxAlloc(void) +{ + IoCtx_t *lpo; -#ifdef USE_HEAP - HeapFree(hHeapHandle, 0, lpo); -#else - free(lpo); -#endif + if (hHeapHandle) + lpo = HeapAlloc(hHeapHandle, HEAP_ZERO_MEMORY, sizeof(IoCtx_t)); + else + lpo = NULL; + DPRINTF(3, ("Allocate IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo)); + return (lpo); } -/* This function will add an entry to the I/O completion port - * that will signal the I/O thread to exit (gracefully) - */ static void -signal_io_completion_port_exit() +IoCtxFree(IoCtx_t *lpo) { - if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) { - msyslog(LOG_ERR, "Can't request service thread to exit: %m"); - exit(1); - } + DPRINTF(3, ("Free IO ctx, heap=%p, ptr=%p\n", hHeapHandle, lpo)); + if (lpo && hHeapHandle) + HeapFree(hHeapHandle, 0, lpo); } + +/* + * ------------------------------------------------------------------- + * The IO completion thread and support functions + * + * There is only one completion thread, because it is locked to the same + * core as the time interpolation. Having more than one causes core + * contention and is not useful. + * ------------------------------------------------------------------- + */ +static HANDLE hIoCompletionThread; + +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * The IO completion worker thread + * + * Note that this thread does not enter an alertable wait state and that + * the only waiting point is the IO completion port. If stopping this + * thread with a special queued result packet does not work, + * 'TerminateThread()' is the only remaining weapon in the arsenal. A + * dangerous weapon -- it's like SIGKILL. + */ static unsigned WINAPI iocompletionthread(void *NotUsed) { - BOOL bSuccess; - int err; - DWORD octets; - ULONG_PTR key; - struct refclockio * rio; - OVERLAPPED * pol; - olplus * lpo; + DWORD err; + DWORD octets; + ULONG_PTR key; + OVERLAPPED * pol; + IoCtx_t * lpo; UNUSED_ARG(NotUsed); /* - * socket and refclock receive call gettimeofday() - * so the I/O thread needs to be on the same - * processor as the main and timing threads - * to ensure consistent QueryPerformanceCounter() - * results. + * Socket and refclock receive call gettimeofday() so the I/O + * thread needs to be on the same processor as the main and + * timing threads to ensure consistent QueryPerformanceCounter() + * results. + * + * This gets seriously into the way of efficient thread pooling + * on multicore systems. */ lock_thread_to_processor(GetCurrentThread()); - /* Set the thread priority high enough so I/O will - * preempt normal recv packet processing, but not - * higher than the timer sync thread. + /* + * Set the thread priority high enough so I/O will preempt + * normal recv packet processing, but not higher than the timer + * sync thread. */ if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) msyslog(LOG_ERR, "Can't set thread priority: %m"); - while (TRUE) { - bSuccess = GetQueuedCompletionStatus( + for(;;) { + if (GetQueuedCompletionStatus( hIoCompletionPort, &octets, &key, &pol, - INFINITE); + INFINITE)) + err = ERROR_SUCCESS; + else + err = GetLastError(); if (NULL == pol) { DPRINTF(2, ("Overlapped IO Thread Exiting\n")); break; /* fail */ } + lpo = CONTAINEROF(pol, IoCtx_t, ol); + get_systime(&lpo->RecvTime); + lpo->byteCount = octets; + lpo->errCode = err; handler_calls++; - lpo = CONTAINEROF(pol, olplus, ol); - rio = (struct refclockio *)key; - - /* - * Deal with errors - */ - if (bSuccess) - err = 0; - else - err = GetLastError(); - - /* - * Invoke the appropriate function based on - * the value of the request_type - */ - switch (lpo->request_type) { - - case SOCK_RECV: - OnSocketRecv(key, lpo, octets, err); - break; - - case SOCK_SEND: - NTP_INSIST(0); - break; - - case SERIAL_WAIT: - OnSerialWaitComplete(rio, lpo, octets, err); - break; - - case SERIAL_READ: - OnSerialReadComplete(rio, lpo, octets, err); - break; - - case RAW_SERIAL_READ: - OnRawSerialReadComplete(rio, lpo, octets, err); - break; - - case SERIAL_WRITE: - OnWriteComplete(key, lpo, octets, err); - break; - - default: - DPRINTF(1, ("Unknown request type %d found in completion port\n", - lpo->request_type)); - break; - } + (*lpo->onIoDone)(key, lpo); } return 0; } -/* Create/initialise the I/O creation port +/* + * ------------------------------------------------------------------- + * Create/initialise the I/O creation port */ void init_io_completion_port( void ) { - unsigned tid; - HANDLE thread; #ifdef DEBUG - InitializeCriticalSection(&compl_info_lock); atexit(&free_io_completion_port_mem); #endif -#ifdef USE_HEAP - /* - * Create a handle to the Heap - */ - hHeapHandle = HeapCreate(0, 20*sizeof(olplus), 0); - if (hHeapHandle == NULL) - { - msyslog(LOG_ERR, "Can't initialize Heap: %m"); - exit(1); - } -#endif + /* Create the context pool first. */ + IoCtxPoolInit(20); - /* Create the event used to signal an IO event - */ - WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, WAITABLEIOEVENTHANDLE); + /* Create the event used to signal an IO event */ + WaitableIoEventHandle = CreateEvent( + NULL, FALSE, FALSE, NULL); if (WaitableIoEventHandle == NULL) { msyslog(LOG_ERR, "Can't create I/O event handle: %m - another process may be running - EXITING"); exit(1); } - /* Create the event used to signal an exit event - */ + /* Create the event used to signal an exit event */ WaitableExitEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL); if (WaitableExitEventHandle == NULL) { msyslog(LOG_ERR, @@ -294,18 +335,16 @@ init_io_completion_port( exit(1); } - /* Create the IO completion port - */ - hIoCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + /* Create the IO completion port */ + hIoCompletionPort = CreateIoCompletionPort( + INVALID_HANDLE_VALUE, NULL, 0, 0); if (hIoCompletionPort == NULL) { msyslog(LOG_ERR, "Can't create I/O completion port: %m"); exit(1); } - /* - * Initialize the Wait Handles - */ + /* Initialize the Wait Handles table */ WaitHandles[0] = WaitableIoEventHandle; WaitHandles[1] = WaitableExitEventHandle; /* exit request */ WaitHandles[2] = WaitableTimerHandle; @@ -320,52 +359,102 @@ init_io_completion_port( addremove_io_semaphore = &ntpd_addremove_semaphore; /* - * Have one thread servicing I/O - there were 4, but this would - * somehow cause NTP to stop replying to ntpq requests; TODO + * Have one thread servicing I/O. See rationale in front matter. */ - thread = (HANDLE)_beginthreadex( + hIoCompletionThread = (HANDLE)_beginthreadex( NULL, 0, iocompletionthread, NULL, - CREATE_SUSPENDED, - &tid); - ResumeThread(thread); - CloseHandle(thread); + 0, + NULL); } - +/* + * ------------------------------------------------------------------- + * completion port teardown + */ void -ntpd_addremove_semaphore( - HANDLE sem, - int remove +uninit_io_completion_port( + void ) { - const size_t hnd_sz = sizeof(WaitHandles[0]); - u_int hi; + /* do noting if completion port already gone. */ + if (NULL == hIoCompletionPort) + return; - if (!remove) { - INSIST((ActiveWaitHandles + 1) < COUNTOF(WaitHandles)); - WaitHandles[ActiveWaitHandles] = sem; - ActiveWaitHandles++; + /* + * Service thread seems running. Terminate him with grace + * first and force later... + */ - return; + /* post exit request for thread. */ + PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0); + IoCtxPoolDone(); /* stop using the memory pool */ + + if (WAIT_TIMEOUT == WaitForSingleObject(hIoCompletionThread, 5000)) { + /* Everything is lost. Kill off with TerminateThread. */ + msyslog(LOG_ERR, "IO completion thread refuses to terminate"); + TerminateThread(hIoCompletionThread, ~0UL); } - /* removing semaphore */ + /* now reap all handles... */ + CloseHandle(hIoCompletionThread); + hIoCompletionThread = NULL; + CloseHandle(hIoCompletionPort); + hIoCompletionPort = NULL; +} + + +/* + * ------------------------------------------------------------------- + * external worker thread support (wait handle stuff) + * + * !Attention! + * + * - This function must only be called from the main thread. Changing + * a set of wait handles while someone is waiting on it creates + * undefined behaviour. Also there's no provision for mutual + * exclusion when accessing global values. + * + * - It's not possible to register a handle that is already in the table. + */ +static void +ntpd_addremove_semaphore( + HANDLE sem, + int remove + ) +{ + DWORD hi; + + /* search for a matching entry first. */ for (hi = 3; hi < ActiveWaitHandles; hi++) if (sem == WaitHandles[hi]) break; - if (hi == ActiveWaitHandles) - return; - - ActiveWaitHandles--; - if (hi < ActiveWaitHandles) - memmove(&WaitHandles[hi], - &WaitHandles[hi + 1], - (ActiveWaitHandles - hi) * hnd_sz); - WaitHandles[ActiveWaitHandles] = NULL; + if (remove) { + /* + * If found, eventually swap with last entry to keep + * the table dense. + */ + if (hi < ActiveWaitHandles) { + ActiveWaitHandles--; + if (hi < ActiveWaitHandles) + WaitHandles[hi] = + WaitHandles[ActiveWaitHandles]; + WaitHandles[ActiveWaitHandles] = NULL; + } + } else { + /* + * Make sure the entry is not found and there is enough + * room, then append to the table array. + */ + if (hi >= ActiveWaitHandles) { + NTP_INSIST(ActiveWaitHandles < COUNTOF(WaitHandles)); + WaitHandles[ActiveWaitHandles] = sem; + ActiveWaitHandles++; + } + } } @@ -375,330 +464,500 @@ free_io_completion_port_mem( void ) { - olplus * pci; - -#if defined(_MSC_VER) && defined (_DEBUG) - _CrtCheckMemory(); -#endif - LOCK_COMPL(); - while ((pci = compl_info_list) != NULL) { - -#if 0 /* sockaddr with received-from address in recvbuf */ - /* is sometimes modified by system after we free it */ - /* triggering heap corruption warning -- find a */ - /* better way to free it after I/O is surely done */ - /* this handles both xmit and recv buffs */ - if (pci->recv_buf != NULL) { - DPRINTF(1, ("freeing xmit/recv buff %p\n", pci->recv_buf)); - free(pci->recv_buf); - } -#endif - - FreeHeap(pci, "free_io_completion_port_mem"); - /* FreeHeap() removed this item from compl_info_list */ - } - UNLOCK_COMPL(); - -#if defined(_MSC_VER) && defined (_DEBUG) - _CrtCheckMemory(); -#endif + /* + * At the moment, do absolutely nothing. Returning memory here + * requires NO PENDING OVERLAPPED OPERATIONS AT ALL at this + * point in time, and as long we cannot be reasonable sure about + * that the simple advice is: + * + * HANDS OFF! + */ } #endif /* DEBUG */ -void -uninit_io_completion_port( - void +/* + * ------------------------------------------------------------------- + * Serial IO stuff + * + * Prelude -- common error checking code + * ------------------------------------------------------------------- + */ +extern char * NTstrerror(int err, BOOL *bfreebuf); + +static BOOL +IoResultCheck( + DWORD err, + IoCtx_t * ctx, + const char * msg ) { - if (hIoCompletionPort != NULL) { - /* Get each of the service threads to exit - */ - signal_io_completion_port_exit(); + char * msgbuf; + BOOL dynbuf; + + /* If the clock is not / no longer active, assume + * 'ERROR_OPERATION_ABORTED' and do the necessary cleanup. + */ + if (ctx->rio && !ctx->rio->active) + err = ERROR_OPERATION_ABORTED; + + switch (err) + { + /* The first ones are no real errors. */ + case ERROR_SUCCESS: /* all is good */ + case ERROR_IO_PENDING: /* callback pending */ + return TRUE; + + /* the next ones go silently -- only cleanup is done */ + case ERROR_INVALID_PARAMETER: /* handle already closed */ + case ERROR_OPERATION_ABORTED: /* handle closed while wait */ + break; + + + default: + /* We have to resort to the low level error formatting + * functions here, since the error code can be an + * overlapped result. Relying the value to be the same + * as the 'GetLastError()' result at this point of + * execution is shaky at best, and using SetLastError() + * to force it seems too nasty. + */ + msgbuf = NTstrerror(err, &dynbuf); + msyslog(LOG_ERR, "%s: err=%u, '%s'", msg, err, msgbuf); + if (dynbuf) + LocalFree(msgbuf); + break; + } + + /* If we end here, we have to mop up the buffer and context */ + if (ctx->flRawMem) { + if (ctx->trans_buf) + free(ctx->trans_buf); + } else { + if (ctx->recv_buf) + freerecvbuf(ctx->recv_buf); } + IoCtxFree(ctx); + return FALSE; } +/* + * ------------------------------------------------------------------- + * Serial IO stuff + * + * Part 1 -- COMM event handling + * ------------------------------------------------------------------- + */ -static int +static BOOL QueueSerialWait( struct refclockio * rio, recvbuf_t * buff, - olplus * lpo, - BOOL clear_timestamp + IoCtx_t * lpo ) { - DWORD err; + BOOL rc; - lpo->request_type = SERIAL_WAIT; + lpo->onIoDone = OnSerialWaitComplete; lpo->recv_buf = buff; - - if (clear_timestamp) - ZERO(buff->recv_time); - - buff->fd = _get_osfhandle(rio->fd); - if (!WaitCommEvent((HANDLE)buff->fd, - (DWORD *)&buff->recv_buffer, &lpo->ol)) { - err = GetLastError(); - if (ERROR_IO_PENDING != err) { - /* - * ERROR_INVALID_PARAMETER occurs after rio->fd - * is closed. - */ - if (ERROR_INVALID_PARAMETER != err) - msyslog(LOG_ERR, "Can't wait on Refclock: %m"); - freerecvbuf(buff); - return 0; - } - } - return 1; + lpo->flRawMem = 0; + lpo->rio = rio; + buff->fd = rio->fd; + + rc = WaitCommEvent((HANDLE)_get_osfhandle(rio->fd), + &lpo->com_events, &lpo->ol); + if (!rc) + return IoResultCheck(GetLastError(), lpo, + "Can't wait on Refclock"); + return TRUE; } +/* - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - */ -static int +static void OnSerialWaitComplete( - struct refclockio * rio, - olplus * lpo, - DWORD Bytes, - int errstatus + ULONG_PTR key, + IoCtx_t * lpo ) { - recvbuf_t *buff; - struct peer *pp; - l_fp arrival_time; - DWORD comm_mask; - DWORD modem_status; - static const l_fp zero_time = { 0 }; - BOOL rc; - - get_systime(&arrival_time); + struct refclockio * rio; + recvbuf_t * buff; + DWORD modem_status; - if (!rio->active) - return FALSE; + /* check and bail out if operation failed */ + if (!IoResultCheck(lpo->errCode, lpo, + "WaitCommEvent failed")) + return; - /* - * Get the recvbuf pointer from the overlapped buffer. - */ + /* get & validate context and buffer. */ + rio = lpo->rio; buff = lpo->recv_buf; - comm_mask = (*(DWORD *)&buff->recv_buffer); + NTP_INSIST((ULONG_PTR)rio == key); + #ifdef DEBUG - if (errstatus || comm_mask & - ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) { - msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x errstatus %d", - comm_mask, errstatus); - exit(-1); - } + if (lpo->com_events & ~(EV_RXFLAG | EV_RLSD | EV_RXCHAR)) { + msyslog(LOG_ERR, "WaitCommEvent returned unexpected mask %x", + lpo->com_events); + exit(-1); + } #endif - if (EV_RLSD & comm_mask) { - modem_status = 0; - GetCommModemStatus((HANDLE)buff->fd, &modem_status); - if (modem_status & MS_RLSD_ON) { - /* - * Use the timestamp from this PPS CD not - * the later end of line. - */ - buff->recv_time = arrival_time; - } - - if (EV_RLSD == comm_mask) { - /* - * if we didn't see an end of line yet - * issue another wait for it. - */ - QueueSerialWait(rio, buff, lpo, FALSE); - return TRUE; - } - } - - if (EV_RXCHAR & comm_mask) { /* raw discipline */ - QueueRawSerialRead(rio, buff, lpo); - return TRUE; - } - - /* - * We've detected the end of line of serial input. - * Use this timestamp unless we already have a CD PPS - * timestamp in buff->recv_time. - */ - if (memcmp(&buff->recv_time, &zero_time, sizeof buff->recv_time)) { - /* - * We will first see a user PPS timestamp here on either - * the first or second line of text. Log a one-time - * message while processing the second line. - */ - if (1 == rio->recvcount) { - pp = (struct peer *)rio->srcclock; - msyslog(LOG_NOTICE, "Using user-mode PPS timestamp for %s", - refnumtoa(&pp->srcadr)); - } - } else { - buff->recv_time = arrival_time; + /* Take note of changes on DCD; 'user space PPS' */ + if (EV_RLSD & lpo->com_events) { + modem_status = 0; + GetCommModemStatus((HANDLE)buff->fd, &modem_status); + if (modem_status & MS_RLSD_ON) { + lpo->DCDSTime = lpo->RecvTime; + lpo->flTsDCDS = 1; } + } - /* - * Now that we have a complete line waiting, read it. - * There is still a race here, but we're likely to win. - */ + /* If IO ready, read data. Go back waiting else. */ + if (EV_RXFLAG & lpo->com_events) { /* line discipline */ + lpo->FlagTime = lpo->RecvTime; + lpo->flTsFlag = 1; + QueueSerialRead(rio, buff, lpo); + } else if (EV_RXCHAR & lpo->com_events) { /* raw discipline */ + QueueRawSerialRead(rio, buff, lpo); + } else { /* idle... */ + QueueSerialWait(rio, buff, lpo); + } +} - lpo->request_type = SERIAL_READ; - rc = ReadFile( - (HANDLE)buff->fd, - buff->recv_buffer, - sizeof(buff->recv_buffer), - NULL, - &lpo->ol); +/* + * ------------------------------------------------------------------- + * Serial IO stuff + * + * Part 2 -- line discipline emulation + * + * Ideally this should *not* be done in the IO completion thread. + * We use a worker pool thread to offload the low-level processing. + * ------------------------------------------------------------------- + */ - if (!rc && ERROR_IO_PENDING != GetLastError()) { - msyslog(LOG_ERR, "Can't read from Refclock: %m"); - freerecvbuf(buff); - return 0; - } +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * Start & Queue a serial read for line discipline emulation. + */ +static BOOL +QueueSerialRead( + struct refclockio * rio, + recvbuf_t * buff, + IoCtx_t * lpo + ) +{ + BOOL rc; - return 1; + lpo->onIoDone = OnSerialReadComplete; + lpo->recv_buf = buff; + lpo->flRawMem = 0; + lpo->rio = rio; + buff->fd = rio->fd; + + rc = ReadFile((HANDLE)_get_osfhandle(rio->fd), + (char*)buff->recv_buffer + buff->recv_length, + sizeof(buff->recv_buffer) - buff->recv_length, + NULL, &lpo->ol); + if (!rc) + return IoResultCheck(GetLastError(), lpo, + "Can't read from Refclock"); + return TRUE; } - -/* Return 1 on Successful Read */ -static int +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * IO completion thread callback. Takes a time stamp and offloads the + * real work to the worker pool ASAP. + */ +static void OnSerialReadComplete( - struct refclockio * rio, - olplus * lpo, - DWORD Bytes, - int errstatus + ULONG_PTR key, + IoCtx_t * lpo ) { + struct refclockio * rio; recvbuf_t * buff; - l_fp cr_time; - int consumed; - if (!rio->active) - return FALSE; + /* check and bail out if operation failed */ + if (!IoResultCheck(lpo->errCode, lpo, + "Read from Refclock failed")) + return; - /* - * Get the recvbuf pointer from the overlapped buffer. - */ + /* get & validate context and buffer. */ + rio = lpo->rio; buff = lpo->recv_buf; + NTP_INSIST((ULONG_PTR)rio == key); + + /* Offload to worker pool */ + if (!QueueUserWorkItem(OnSerialReadWorker, lpo, WT_EXECUTEDEFAULT)) { + msyslog(LOG_ERR, + "Can't offload to worker thread, will skip data: %m"); + ZERO(*lpo); + buff->recv_length = 0; + QueueSerialWait(rio, buff, lpo); + } +} + +/* + * - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + * Worker pool offload function -- avoid lengthy operations in the IO + * completion thread (affects timing...) + * + * This function does the real work of emulating the UN*X line + * discipline. Since this involves allocation of additional buffers and + * string parsing/copying, it is offloaded to the worker thread pool so + * the IO completion thread can resume faster. + */ +static DWORD WINAPI +OnSerialReadWorker(void * ctx) +{ + IoCtx_t * lpo; + recvbuf_t * buff, *obuf; + struct refclockio * rio; + char *sptr, *send, *dptr; + BOOL eol; + char ch; + /* Get context back */ + lpo = (IoCtx_t*)ctx; + buff = lpo->recv_buf; + rio = lpo->rio; /* - * ignore 0 bytes read due to timeout's and closure on fd + * ignore 0 bytes read due to closure on fd. + * Eat the first line of input as it's possibly partial. */ - if (!errstatus && Bytes) { - buff->recv_length = (int) Bytes; - buff->receiver = process_refclock_packet; - buff->dstadr = NULL; - buff->recv_peer = rio->srcclock; + if (lpo->byteCount && rio->recvcount++) { + /* account for additional input */ + buff->recv_length += (int)lpo->byteCount; + /* - * Eat the first line of input as it's possibly - * partial and if a PPS is present, it may not - * have fired since the port was opened. + * Now mimic the Unix line discipline. */ - if (rio->recvcount++ > 0) { - cr_time = buff->recv_time; - buff->fd = rio->fd; /* was handle */ - add_full_recv_buffer(buff); + sptr = (char *)buff->recv_buffer; + send = sptr + buff->recv_length; + obuf = NULL; + dptr = NULL; + while (sptr != send) + { + /* get new buffer to store line */ + obuf = get_free_recv_buffer_alloc(); + obuf->recv_time = lpo->RecvTime; + obuf->recv_length = 0; + obuf->fd = rio->fd; + obuf->receiver = process_refclock_packet; + obuf->dstadr = NULL; + obuf->recv_peer = rio->srcclock; + + /* + * Time stamp assignment is interesting. If we + * have a DCD stamp, we use it, otherwise we use + * the FLAG char event time, and if that is also + * not / no longer available we use the arrival + * time. + */ + if (lpo->flTsDCDS) + obuf->recv_time = lpo->DCDSTime; + else if (lpo->flTsFlag) + obuf->recv_time = lpo->FlagTime; + else + obuf->recv_time = lpo->RecvTime; + lpo->flTsDCDS = lpo->flTsFlag = 0; /* use only once... */ + /* - * Mimic Unix line discipline and assume CR/LF - * line termination. On Unix the CR terminates - * the line containing the timecode, and - * immediately after the LF terminates an empty - * line. So synthesize the empty LF-terminated - * line using the same CR timestamp. Both CR - * and LF are stripped by refclock_gtlin(). + * Copy data to new buffer, convert CR to LF on + * the fly. Stop after LF. */ - buff = get_free_recv_buffer_alloc(); - buff->recv_time = cr_time; + dptr = (char *)obuf->recv_buffer; + eol = FALSE; + while (sptr != send && !eol) { + ch = *sptr++; + if (ch == '\r') + ch = '\n'; + *dptr++ = ch; + eol = (ch == '\n'); + } + obuf->recv_length = + (int)(dptr - (char *)obuf->recv_buffer); + + /* + * If NL found, push this buffer and prepare to + * get a new one. + */ + if (eol) { + add_full_recv_buffer(obuf); + SetEvent(WaitableIoEventHandle); + obuf = NULL; + } + } + + /* + * If we still have an output buffer, continue to fill + * it again. + */ + if (obuf) { + obuf->recv_length = + (int)(dptr - (char *)obuf->recv_buffer); + freerecvbuf(buff); + buff = obuf; + } else { + /* clear the current buffer, continue */ buff->recv_length = 0; - buff->fd = rio->fd; - buff->receiver = process_refclock_packet; - buff->dstadr = NULL; - buff->recv_peer = rio->srcclock; - add_full_recv_buffer(buff); - SetEvent(WaitableIoEventHandle); - buff = get_free_recv_buffer_alloc(); } + } else { + ZERO(*lpo); + buff->recv_length = 0; } - QueueSerialWait(rio, buff, lpo, TRUE); - - return 1; + QueueSerialWait(rio, buff, lpo); + return 0; } /* - * QueueRawSerialRead() returns TRUE if successful. + * ------------------------------------------------------------------- + * Serial IO stuff + * + * Part 3 -- raw data input + * + * Raw data processing is fast enough to do without offloading to the + * worker pool, so this is rather short'n sweet... + * ------------------------------------------------------------------- */ -static int + +static BOOL QueueRawSerialRead( struct refclockio * rio, recvbuf_t * buff, - olplus * lpo + IoCtx_t * lpo ) { - BOOL rc; + BOOL rc; - buff->fd = _get_osfhandle(rio->fd); - lpo->request_type = RAW_SERIAL_READ; + lpo->onIoDone = OnRawSerialReadComplete; lpo->recv_buf = buff; - rc = ReadFile((HANDLE)buff->fd, buff->recv_buffer, - sizeof(buff->recv_buffer), NULL, &lpo->ol); - if (!rc && ERROR_IO_PENDING != GetLastError()) { - msyslog(LOG_ERR, "Can't raw read from Refclock: %m"); - freerecvbuf(buff); - return FALSE; - } - + lpo->flRawMem = 0; + lpo->rio = rio; + buff->fd = rio->fd; + + rc = ReadFile((HANDLE)_get_osfhandle(rio->fd), + buff->recv_buffer, + sizeof(buff->recv_buffer), + NULL, &lpo->ol); + if (!rc) + return IoResultCheck(GetLastError(), lpo, + "Can't read raw from Refclock"); return TRUE; } -static int +static void OnRawSerialReadComplete( - struct refclockio * rio, - olplus * lpo, - DWORD octets, - int errstatus + ULONG_PTR key, + IoCtx_t * lpo ) { - recvbuf_t * rbufp; - l_fp arrival_time; - int consumed; - - get_systime(&arrival_time); + struct refclockio * rio; + recvbuf_t * buff; - if (!rio->active) - return FALSE; + /* check and bail out if operation failed */ + if (!IoResultCheck(lpo->errCode, lpo, + "Raw read from Refclock failed")) + return; - rbufp = lpo->recv_buf; + /* get & validate context and buffer. */ + rio = lpo->rio; + buff = lpo->recv_buf; + NTP_INSIST((ULONG_PTR)rio == key); /* ignore 0 bytes read. */ - if (NO_ERROR == errstatus && octets > 0) { - rbufp->recv_length = (int)octets; - rbufp->dstadr = NULL; - rbufp->recv_time = arrival_time; - rbufp->receiver = process_refclock_packet; - rbufp->recv_peer = rio->srcclock; - rbufp->fd = rio->fd; /* was handle */ - - add_full_recv_buffer(rbufp); + if (lpo->byteCount > 0) { + buff->recv_length = (int)lpo->byteCount; + buff->dstadr = NULL; + buff->recv_time = lpo->RecvTime; + buff->receiver = process_refclock_packet; + buff->recv_peer = rio->srcclock; + add_full_recv_buffer(buff); SetEvent(WaitableIoEventHandle); - rbufp = get_free_recv_buffer_alloc(); + buff = get_free_recv_buffer_alloc(); + } + + buff->recv_length = 0; + QueueSerialWait(rio, buff, lpo); +} + +/* + * ------------------------------------------------------------------- + * Serial IO stuff + * + * Part 4 -- Overlapped serial output + * + * Again, no need to offload any work. + * ------------------------------------------------------------------- + */ + +/* + * async_write, clone of write(), used by some reflock drivers + */ +int +async_write( + int fd, + const void * data, + unsigned int count + ) +{ + IoCtx_t * lpo; + BOOL rc; + + lpo = IoCtxAlloc(); + if (lpo == NULL) { + DPRINTF(1, ("async_write: out of memory\n")); + errno = ENOMEM; + return -1; } - QueueSerialWait(rio, rbufp, lpo, TRUE); + lpo->onIoDone = OnSerialWriteComplete; + lpo->trans_buf = emalloc(count); + lpo->flRawMem = 1; + memcpy(lpo->trans_buf, data, count); + + rc = WriteFile((HANDLE)_get_osfhandle(fd), + lpo->trans_buf, count, + NULL, &lpo->ol); + if (!rc && !IoResultCheck(GetLastError(), lpo, + "Can't write to Refclock")) { + errno = EBADF; + return -1; + } + return count; +} - return 1; +static void +OnSerialWriteComplete( + ULONG_PTR key, + IoCtx_t * lpo + ) +{ + /* set RIO and force silent cleanup if no error */ + lpo->rio = (struct refclockio *)key; + if (ERROR_SUCCESS == lpo->errCode) + lpo->errCode = ERROR_OPERATION_ABORTED; + IoResultCheck(lpo->errCode, lpo, + "Write to Refclock failed"); } -/* Add a reference clock data structures I/O handles to - * the I/O completion port. Return 1 if any error. + + +/* + * Add a reference clock data structures I/O handles to + * the I/O completion port. Return 1 if any error. */ int io_completion_port_add_clock_io( struct refclockio *rio ) { - olplus * lpo; + IoCtx_t * lpo; recvbuf_t * buff; HANDLE h; @@ -712,14 +971,14 @@ io_completion_port_add_clock_io( return 1; } - lpo = GetHeapAlloc("io_completion_port_add_clock_io"); + lpo = IoCtxAlloc(); if (NULL == lpo) { msyslog(LOG_ERR, "Can't allocate heap for completion port: %m"); return 1; } - buff = get_free_recv_buffer_alloc(); - QueueSerialWait(rio, buff, lpo, TRUE); + buff->recv_length = 0; + QueueSerialWait(rio, buff, lpo); return 0; } @@ -730,100 +989,74 @@ io_completion_port_add_clock_io( * Note: As per the winsock documentation, we use WSARecvFrom. Using * ReadFile() is less efficient. */ -static unsigned long +static BOOL QueueSocketRecv( - SOCKET s, - recvbuf_t *buff, - olplus *lpo + SOCKET s, + recvbuf_t * buff, + IoCtx_t * lpo ) { WSABUF wsabuf; - DWORD Flags; - DWORD Result; + DWORD Flags; + int rc; - lpo->request_type = SOCK_RECV; + lpo->onIoDone = OnSocketRecv; lpo->recv_buf = buff; - - if (buff != NULL) { - Flags = 0; - buff->fd = s; - buff->recv_srcadr_len = sizeof(buff->recv_srcadr); - wsabuf.buf = (char *)buff->recv_buffer; - wsabuf.len = sizeof(buff->recv_buffer); - - if (SOCKET_ERROR == WSARecvFrom(buff->fd, &wsabuf, 1, - NULL, &Flags, - &buff->recv_srcadr.sa, - &buff->recv_srcadr_len, - &lpo->ol, NULL)) { - Result = GetLastError(); - switch (Result) { - case NO_ERROR : - case WSA_IO_PENDING : - break ; - - case WSAENOTSOCK : - msyslog(LOG_ERR, "Can't read from non-socket fd %d: %m", (int)buff->fd); - /* return the buffer */ - freerecvbuf(buff); - return 0; - break; - - case WSAEFAULT : - msyslog(LOG_ERR, "The buffers parameter is incorrect: %m"); - /* return the buffer */ - freerecvbuf(buff); - return 0; - break; - - default : - /* nop */ ; - } - } - } - else - return 0; - return 1; + lpo->flRawMem = 0; + lpo->rio = NULL; + + Flags = 0; + buff->fd = s; + buff->recv_srcadr_len = sizeof(buff->recv_srcadr); + wsabuf.buf = (char *)buff->recv_buffer; + wsabuf.len = sizeof(buff->recv_buffer); + + rc = WSARecvFrom(buff->fd, &wsabuf, 1, NULL, &Flags, + &buff->recv_srcadr.sa, &buff->recv_srcadr_len, + &lpo->ol, NULL); + if (SOCKET_ERROR == rc) + return IoResultCheck(GetLastError(), lpo, + "Can't read from Socket"); + return TRUE; } -/* Returns 0 if any Error */ -static int -OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus) +static void +OnSocketRecv( + ULONG_PTR key, + IoCtx_t * lpo + ) { - struct recvbuf *buff = NULL; - recvbuf_t *newbuff; - l_fp arrival_time; - struct interface * inter = (struct interface *) i; + recvbuf_t * buff; + recvbuf_t * newbuff; + struct interface * inter = (struct interface *)key; - get_systime(&arrival_time); - NTP_REQUIRE(NULL != lpo); NTP_REQUIRE(NULL != lpo->recv_buf); + /* check and bail out if operation failed */ + if (!IoResultCheck(lpo->errCode, lpo, + "Read from Socket failed")) + return; + /* * Convert the overlapped pointer back to a recvbuf pointer. + * Fetch items that are lost when the context is queued again. */ buff = lpo->recv_buf; - - /* - * If the socket is closed we get an Operation Aborted error - * Just clean up - */ - if (errstatus == WSA_OPERATION_ABORTED) - { - freerecvbuf(buff); - lpo->recv_buf = NULL; - FreeHeap(lpo, "OnSocketRecv: Socket Closed"); - return (1); - } + buff->recv_time = lpo->RecvTime; + buff->recv_length = (int)lpo->byteCount; /* * Get a new recv buffer for the replacement socket receive */ newbuff = get_free_recv_buffer_alloc(); - QueueSocketRecv(inter->fd, newbuff, lpo); - + if (NULL != newbuff) { + QueueSocketRecv(inter->fd, newbuff, lpo); + } else { + IoCtxFree(lpo); + msyslog(LOG_ERR, "Can't add I/O request to socket"); + } DPRINTF(4, ("%sfd %d %s recv packet mode is %d\n", (MODE_BROADCAST == get_packet_mode(buff)) ? " **** Broadcast " @@ -834,14 +1067,11 @@ OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus) /* * If we keep it add some info to the structure */ - if (Bytes && !inter->ignore_packets) { + if (buff->recv_length && !inter->ignore_packets) { NTP_INSIST(buff->recv_srcadr_len <= sizeof(buff->recv_srcadr)); - memcpy(&buff->recv_time, &arrival_time, - sizeof(buff->recv_time)); - buff->recv_length = (int) Bytes; buff->receiver = &receive; - buff->dstadr = inter; + buff->dstadr = inter; packets_received++; handler_pkts++; inter->received++; @@ -857,25 +1087,28 @@ OnSocketRecv(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus) SetEvent(WaitableIoEventHandle); } else freerecvbuf(buff); - - return 1; } -/* Add a socket handle to the I/O completion port, and send - * NTP_RECVS_PER_SOCKET recv requests to the kernel. +/* + * Add a socket handle to the I/O completion port, and send + * NTP_RECVS_PER_SOCKET recv requests to the kernel. */ -extern int -io_completion_port_add_socket(SOCKET fd, struct interface *inter) +int +io_completion_port_add_socket( + SOCKET fd, + struct interface * inter + ) { - olplus *lpo; - recvbuf_t *buff; - int n; + IoCtx_t * lpo; + recvbuf_t * buff; + int n; if (fd != INVALID_SOCKET) { if (NULL == CreateIoCompletionPort((HANDLE)fd, hIoCompletionPort, (ULONG_PTR)inter, 0)) { - msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m"); + msyslog(LOG_ERR, + "Can't add socket to i/o completion port: %m"); return 1; } } @@ -894,10 +1127,11 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter) for (n = 0; n < WINDOWS_RECVS_PER_SOCKET; n++) { buff = get_free_recv_buffer_alloc(); - lpo = (olplus *) GetHeapAlloc("io_completion_port_add_socket"); + lpo = IoCtxAlloc(); if (lpo == NULL) { - msyslog(LOG_ERR, "Can't allocate heap for completion port: %m"); + msyslog(LOG_ERR + , "Can't allocate IO completion context: %m"); return 1; } @@ -908,24 +1142,6 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter) } -static int -OnWriteComplete(ULONG_PTR i, olplus *lpo, DWORD Bytes, int errstatus) -{ - void *buff; - - UNUSED_ARG(i); - UNUSED_ARG(Bytes); - UNUSED_ARG(errstatus); - - buff = lpo->trans_buf; - free(buff); - lpo->trans_buf = NULL; - FreeHeap(lpo, "OnWriteComplete"); - - return 1; -} - - /* * io_completion_port_sendto() -- sendto() replacement for Windows * @@ -955,9 +1171,8 @@ io_completion_port_sendto( Result = WSASendTo(fd, &wsabuf, 1, &octets_sent, 0, &dest->sa, AddrLen, NULL, NULL); - + errval = GetLastError(); if (SOCKET_ERROR == Result) { - errval = GetLastError(); if (ERROR_UNEXP_NET_ERR == errval) { /* * We get this error when trying to send if the @@ -993,54 +1208,14 @@ io_completion_port_sendto( } -/* - * async_write, clone of write(), used by some reflock drivers - */ -int -async_write( - int fd, - const void *data, - unsigned int count - ) -{ - void *buff; - olplus *lpo; - DWORD BytesWritten; - - buff = emalloc(count); - lpo = GetHeapAlloc("async_write"); - if (lpo == NULL) { - free(buff); - DPRINTF(1, ("async_write: out of memory\n")); - errno = ENOMEM; - return -1; - } - - lpo->request_type = SERIAL_WRITE; - lpo->trans_buf = buff; - memcpy(buff, data, count); - - if (!WriteFile((HANDLE)_get_osfhandle(fd), buff, count, - &BytesWritten, &lpo->ol) - && ERROR_IO_PENDING != GetLastError()) { - - msyslog(LOG_ERR, "async_write - error %m"); - free(buff); - FreeHeap(lpo, "async_write"); - errno = EBADF; - return -1; - } - - return count; -} - /* * GetReceivedBuffers * Note that this is in effect the main loop for processing requests * both send and receive. This should be reimplemented */ -int GetReceivedBuffers() +int +GetReceivedBuffers() { DWORD index; HANDLE ready; @@ -1070,12 +1245,14 @@ int GetReceivedBuffers() break; case WAIT_TIMEOUT: - msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects INFINITE timed out."); + msyslog(LOG_ERR, + "WaitForMultipleObjects INFINITE timed out."); exit(1); break; case WAIT_FAILED: - msyslog(LOG_ERR, "ntpd: WaitForMultipleObjects Failed: Error: %m"); + msyslog(LOG_ERR, + "WaitForMultipleObjects Failed: Error: %m"); exit(1); break;