From: Sven Dietricht Date: Tue, 16 Nov 1999 15:09:11 +0000 (-0000) Subject: ntp_iocompletionport.c: X-Git-Tag: NTP_4_0_98_G~45 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a9591a2db54e0486209a67ff3aa7338bec7e2789;p=thirdparty%2Fntp.git ntp_iocompletionport.c: Cleanup bk: 38317397nYdKnxQQsWTnuC0hgSUk3g --- diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c index c4826f7eb5..2627b7cccf 100644 --- a/ports/winnt/ntpd/ntp_iocompletionport.c +++ b/ports/winnt/ntpd/ntp_iocompletionport.c @@ -1,4 +1,3 @@ - #ifdef HAVE_CONFIG_H # include #endif @@ -45,8 +44,6 @@ signal_io_completion_port_exit() } - - static void __cdecl iocompletionthread(void *NotUsed) { @@ -59,17 +56,20 @@ iocompletionthread(void *NotUsed) * preempt normal recv packet processing, but not * higher than the timer sync thread. */ - if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) { + if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) { msyslog(LOG_ERR, "Can't set thread priority: %m"); } for (;;) { while (GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) Overlapped, INFINITE)) { - if (Overlapped != NULL && Overlapped->iofunction != NULL) { + if (Overlapped != NULL && + Overlapped->iofunction != NULL) { if (!Overlapped->iofunction(Key, Overlapped, BytesTransferred)) { - return ; /* fail */ +#ifdef DEBUG + msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m"); +#endif + return ; /* fail */ } - } } } @@ -108,8 +108,37 @@ uninit_io_completion_port( } +static int QueueIORead( struct refclockio *rio ) { + struct recvbuf *buff; + buff = get_free_recv_buffer(); + + if (buff == NULL) + return 0; + + buff->fd = rio->fd; + buff->iocompletioninfo.iofunction = OnIoReadComplete; + if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) { + DWORD Result = GetLastError(); + switch (Result) { + case NO_ERROR : + case ERROR_HANDLE_EOF : + case ERROR_IO_PENDING : + break ; + + default: + msyslog(LOG_ERR, "Can't read from Refclock: %m"); + freerecvbuf(buff); + return 0; + } + } + return 1; +} + + + +/* Return 1 on Successful Read */ static int OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) { @@ -129,96 +158,43 @@ OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) add_full_recv_buffer(buff); buff = NULL; } + else + freerecvbuf(buff); - /* If we used the last buffer, then get another - */ - if (buff == NULL) { - buff = get_free_recv_buffer(); - } - - if (buff != NULL) { - buff->fd = rio->fd; - buff->iocompletioninfo.iofunction = OnIoReadComplete; - if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) { - DWORD Result = GetLastError(); - switch (Result) { - } - } - } + QueueIORead( rio ); return 1; } - - - /* Add a reference clock data structures I/O handles to - * the I/O completion port + * the I/O completion port. Return 1 if any error. */ int io_completion_port_add_clock_io( struct refclockio *rio ) { - struct recvbuf *buff = NULL; if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) { msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m"); return 1; } - - if (NULL != (buff = get_free_recv_buffer())) { - buff->fd = rio->fd; - buff->iocompletioninfo.iofunction = OnIoReadComplete; - if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) { - DWORD Result = GetLastError(); - switch (Result) { - } - } - } - else return 1; + QueueIORead( rio ); return 0; } +/* Queue a receiver on a socket. Returns 0 if no buffer can be queud */ +static unsigned long QueueSocketRecv(SOCKET s) { + + struct recvbuf *buff; + buff = get_free_recv_buffer(); - - - - -static int -OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) -{ - struct recvbuf *buff = NULL; - struct interface * inter = (struct interface *) i; - - /* Convert the overlapped pointer back to a recvbuf pointer. - */ - buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo)); - - buff->recv_length = (int) Bytes; - get_systime(&buff->recv_time); - buff->receiver = receive; - buff->dstadr = inter; - if (debug > 3) { - char strbuffer[256]; - DWORD strlength = sizeof(strbuffer); - if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength)) { -#ifdef DEBUG - if (debug) - printf("Received %d bytes from %s\n", Bytes, strbuffer); -#endif - } - - } - - add_full_recv_buffer(buff); - buff = NULL; - - if (NULL != (buff = get_free_recv_buffer())) { + if (buff != NULL) { DWORD BytesReceived = 0; DWORD Flags = 0; buff->iocompletioninfo.iofunction = OnSocketRecv; - buff->fd = inter->fd; + buff->fd = s; buff->AddressLength = sizeof(struct sockaddr_in); + if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1, &BytesReceived, &Flags, (struct sockaddr *) &buff->recv_srcadr, &buff->AddressLength, @@ -229,29 +205,64 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) case WSA_IO_INCOMPLETE : case WSA_WAIT_IO_COMPLETION : case WSA_IO_PENDING : - break ; case WSAENOTSOCK : msyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m"); - break; + freerecvbuf(buff); + return 0; + break; case WSAEFAULT : msyslog(LOG_ERR, "The buffers parameter is incorrect: %m"); + freerecvbuf(buff); + return 0; break; default : /* nop */ ; - } } } - else return 0; + else + return 0; return 1; } - +/* Returns 0 if any Error */ +static int +OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) +{ + struct recvbuf *buff = NULL; + struct interface * inter = (struct interface *) i; + + /* Convert the overlapped pointer back to a recvbuf pointer. + */ + + buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo)); + get_systime(&buff->recv_time); + + if (Bytes > 0) { + buff->recv_length = (int) Bytes; + buff->receiver = receive; + buff->dstadr = inter; +#ifdef DEBUG + if (debug > 3) { + char strbuffer[256]; + DWORD strlength = sizeof(strbuffer); + if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength)) + printf("Received %d bytes from %s\n", Bytes, strbuffer); + } +#endif + add_full_recv_buffer(buff); + } + else { + freerecvbuf(buff); + } + QueueSocketRecv(inter->fd); + return 1; +} /* Add a socket handle to the I/O completion port, and send an I/O @@ -263,54 +274,29 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) extern void io_completion_port_add_socket(struct interface *inter) { - struct recvbuf *buff = NULL; - if (NULL == CreateIoCompletionPort((HANDLE) inter->fd, hIoCompletionPort, (DWORD) inter, 0)) { msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m"); - exit(1); } + else QueueSocketRecv(inter->fd); +} - if (NULL != (buff = get_free_recv_buffer())) { - DWORD BytesReceived = 0; - DWORD Flags = 0; - buff->iocompletioninfo.iofunction = OnSocketRecv; - buff->fd = inter->fd; - buff->AddressLength = sizeof(struct sockaddr_in); - buff->receiver = receive; - - if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1, - &BytesReceived, &Flags, - (struct sockaddr *) &buff->recv_srcadr, &buff->AddressLength, - &buff->iocompletioninfo.overlapped, NULL)) { - DWORD Result = WSAGetLastError(); - switch (Result) { - case NO_ERROR : - case WSA_IO_INCOMPLETE : - case WSA_WAIT_IO_COMPLETION : - case WSA_IO_PENDING : - - break ; - - case WSAENOTSOCK : - msyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m"); - break; - - case WSAEFAULT : - msyslog(LOG_ERR, "The buffers parameter is incorrect: %m"); - break; - default : - /* nop */ ; +static int +OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) +{ + transmitbuf *buff = NULL; + (void) Bytes; + (void) Key; - } - } - } + buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo)); + free_transmit_buffer(buff); + return 1; } static int -OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) +OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) { transmitbuf *buff = NULL; (void) Bytes; @@ -322,6 +308,7 @@ OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) return 1; } + DWORD io_completion_port_sendto( struct interface *inter, @@ -342,38 +329,81 @@ io_completion_port_sendto( buff->wsabuf.buf = buff->pkt; buff->wsabuf.len = len; - buff->iocompletioninfo.iofunction = OnSendToComplete; Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL); +#ifdef DEBUG if (debug > 2) { char strbuffer[256]; DWORD strlength = sizeof(strbuffer); - if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength)) { -#ifdef DEBUG - if (debug) + if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength)) printf("SendTo - %d bytes to %s : %d\n", len, strbuffer, Result); -#endif - } } - +#endif } else { #ifdef DEBUG if (debug) printf("No more transmit buffers left - data discarded\n"); #endif + return ~ERROR_SUCCESS; } } else { #ifdef DEBUG if (debug) printf("Packet too large\n"); #endif - exit(1); + return ~ERROR_SUCCESS; } return Result; } +/* + * Async IO Write + */ +DWORD +io_completion_port_write( + HANDLE fd, + char *pkt, + int len) +{ + transmitbuf *buff = NULL; + DWORD lpNumberOfBytesWritten; + DWORD Result = -1; + + if (len <= sizeof(buff->pkt)) { + buff = get_free_transmit_buffer(); + if (buff != NULL) { + DWORD BytesSent = 0; + DWORD Flags = 0; + + memcpy(&buff->pkt, pkt, len); + buff->iocompletioninfo.iofunction = OnWriteComplete; + + Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped); + +#ifdef DEBUG + if (debug > 2) { + printf("SendTo - %d bytes %d\n", len, Result); + } +#endif + if (Result) return len; + } + else { +#ifdef DEBUG + if (debug) + printf("No more transmit buffers left - data discarded\n"); +#endif + + } + } + else { +#ifdef DEBUG + if (debug) printf("Packet too large\n"); +#endif + } + return Result; +} #else static int NonEmptyCompilationUnit;