-
#ifdef HAVE_CONFIG_H
# include <config.h>
#endif
}
-
-
static void __cdecl
iocompletionthread(void *NotUsed)
{
* preempt normal recv packet processing, but not
* higher than the timer sync thread.
*/
- if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_BELOW_NORMAL)) {
+ if (!SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_ABOVE_NORMAL)) {
msyslog(LOG_ERR, "Can't set thread priority: %m");
}
for (;;) {
while (GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) Overlapped, INFINITE)) {
- if (Overlapped != NULL && Overlapped->iofunction != NULL) {
+ if (Overlapped != NULL &&
+ Overlapped->iofunction != NULL) {
if (!Overlapped->iofunction(Key, Overlapped, BytesTransferred)) {
- return ; /* fail */
+#ifdef DEBUG
+ msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m");
+#endif
+ return ; /* fail */
}
-
}
}
}
}
+static int QueueIORead( struct refclockio *rio ) {
+ struct recvbuf *buff;
+ buff = get_free_recv_buffer();
+
+ if (buff == NULL)
+ return 0;
+
+ buff->fd = rio->fd;
+ buff->iocompletioninfo.iofunction = OnIoReadComplete;
+ if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
+ DWORD Result = GetLastError();
+ switch (Result) {
+ case NO_ERROR :
+ case ERROR_HANDLE_EOF :
+ case ERROR_IO_PENDING :
+ break ;
+
+ default:
+ msyslog(LOG_ERR, "Can't read from Refclock: %m");
+ freerecvbuf(buff);
+ return 0;
+ }
+ }
+ return 1;
+}
+
+
+
+/* Return 1 on Successful Read */
static int
OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
{
add_full_recv_buffer(buff);
buff = NULL;
}
+ else
+ freerecvbuf(buff);
- /* If we used the last buffer, then get another
- */
- if (buff == NULL) {
- buff = get_free_recv_buffer();
- }
-
- if (buff != NULL) {
- buff->fd = rio->fd;
- buff->iocompletioninfo.iofunction = OnIoReadComplete;
- if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
- DWORD Result = GetLastError();
- switch (Result) {
- }
- }
- }
+ QueueIORead( rio );
return 1;
}
-
-
-
/* Add a reference clock data structures I/O handles to
- * the I/O completion port
+ * the I/O completion port. Return 1 if any error.
*/
int
io_completion_port_add_clock_io(
struct refclockio *rio
)
{
- struct recvbuf *buff = NULL;
if (NULL == CreateIoCompletionPort((HANDLE) rio->fd, hIoCompletionPort, (DWORD) rio, 0)) {
msyslog(LOG_ERR, "Can't add COM port to i/o completion port: %m");
return 1;
}
-
- if (NULL != (buff = get_free_recv_buffer())) {
- buff->fd = rio->fd;
- buff->iocompletioninfo.iofunction = OnIoReadComplete;
- if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
- DWORD Result = GetLastError();
- switch (Result) {
- }
- }
- }
- else return 1;
+ QueueIORead( rio );
return 0;
}
+/* Queue a receiver on a socket. Returns 0 if no buffer can be queud */
+static unsigned long QueueSocketRecv(SOCKET s) {
+
+ struct recvbuf *buff;
+ buff = get_free_recv_buffer();
-
-
-
-
-static int
-OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
-{
- struct recvbuf *buff = NULL;
- struct interface * inter = (struct interface *) i;
-
- /* Convert the overlapped pointer back to a recvbuf pointer.
- */
- buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
-
- buff->recv_length = (int) Bytes;
- get_systime(&buff->recv_time);
- buff->receiver = receive;
- buff->dstadr = inter;
- if (debug > 3) {
- char strbuffer[256];
- DWORD strlength = sizeof(strbuffer);
- if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength)) {
-#ifdef DEBUG
- if (debug)
- printf("Received %d bytes from %s\n", Bytes, strbuffer);
-#endif
- }
-
- }
-
- add_full_recv_buffer(buff);
- buff = NULL;
-
- if (NULL != (buff = get_free_recv_buffer())) {
+ if (buff != NULL) {
DWORD BytesReceived = 0;
DWORD Flags = 0;
buff->iocompletioninfo.iofunction = OnSocketRecv;
- buff->fd = inter->fd;
+ buff->fd = s;
buff->AddressLength = sizeof(struct sockaddr_in);
+
if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1,
&BytesReceived, &Flags,
(struct sockaddr *) &buff->recv_srcadr, &buff->AddressLength,
case WSA_IO_INCOMPLETE :
case WSA_WAIT_IO_COMPLETION :
case WSA_IO_PENDING :
-
break ;
case WSAENOTSOCK :
msyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
- break;
+ freerecvbuf(buff);
+ return 0;
+ break;
case WSAEFAULT :
msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
+ freerecvbuf(buff);
+ return 0;
break;
default :
/* nop */ ;
-
}
}
}
- else return 0;
+ else
+ return 0;
return 1;
}
-
+/* Returns 0 if any Error */
+static int
+OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
+{
+ struct recvbuf *buff = NULL;
+ struct interface * inter = (struct interface *) i;
+
+ /* Convert the overlapped pointer back to a recvbuf pointer.
+ */
+
+ buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
+ get_systime(&buff->recv_time);
+
+ if (Bytes > 0) {
+ buff->recv_length = (int) Bytes;
+ buff->receiver = receive;
+ buff->dstadr = inter;
+#ifdef DEBUG
+ if (debug > 3) {
+ char strbuffer[256];
+ DWORD strlength = sizeof(strbuffer);
+ if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength))
+ printf("Received %d bytes from %s\n", Bytes, strbuffer);
+ }
+#endif
+ add_full_recv_buffer(buff);
+ }
+ else {
+ freerecvbuf(buff);
+ }
+ QueueSocketRecv(inter->fd);
+ return 1;
+}
/* Add a socket handle to the I/O completion port, and send an I/O
extern void
io_completion_port_add_socket(struct interface *inter)
{
- struct recvbuf *buff = NULL;
-
if (NULL == CreateIoCompletionPort((HANDLE) inter->fd, hIoCompletionPort, (DWORD) inter, 0)) {
msyslog(LOG_ERR, "Can't add socket to i/o completion port: %m");
- exit(1);
}
+ else QueueSocketRecv(inter->fd);
+}
- if (NULL != (buff = get_free_recv_buffer())) {
- DWORD BytesReceived = 0;
- DWORD Flags = 0;
- buff->iocompletioninfo.iofunction = OnSocketRecv;
- buff->fd = inter->fd;
- buff->AddressLength = sizeof(struct sockaddr_in);
- buff->receiver = receive;
-
- if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1,
- &BytesReceived, &Flags,
- (struct sockaddr *) &buff->recv_srcadr, &buff->AddressLength,
- &buff->iocompletioninfo.overlapped, NULL)) {
- DWORD Result = WSAGetLastError();
- switch (Result) {
- case NO_ERROR :
- case WSA_IO_INCOMPLETE :
- case WSA_WAIT_IO_COMPLETION :
- case WSA_IO_PENDING :
-
- break ;
-
- case WSAENOTSOCK :
- msyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
- break;
-
- case WSAEFAULT :
- msyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
- break;
- default :
- /* nop */ ;
+static int
+OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+{
+ transmitbuf *buff = NULL;
+ (void) Bytes;
+ (void) Key;
- }
- }
- }
+ buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo));
+ free_transmit_buffer(buff);
+ return 1;
}
static int
-OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
{
transmitbuf *buff = NULL;
(void) Bytes;
return 1;
}
+
DWORD
io_completion_port_sendto(
struct interface *inter,
buff->wsabuf.buf = buff->pkt;
buff->wsabuf.len = len;
-
buff->iocompletioninfo.iofunction = OnSendToComplete;
Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL);
+#ifdef DEBUG
if (debug > 2) {
char strbuffer[256];
DWORD strlength = sizeof(strbuffer);
- if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength)) {
-#ifdef DEBUG
- if (debug)
+ if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength))
printf("SendTo - %d bytes to %s : %d\n", len, strbuffer, Result);
-#endif
- }
}
-
+#endif
}
else {
#ifdef DEBUG
if (debug)
printf("No more transmit buffers left - data discarded\n");
#endif
+ return ~ERROR_SUCCESS;
}
}
else {
#ifdef DEBUG
if (debug) printf("Packet too large\n");
#endif
- exit(1);
+ return ~ERROR_SUCCESS;
}
return Result;
}
+/*
+ * Async IO Write
+ */
+DWORD
+io_completion_port_write(
+ HANDLE fd,
+ char *pkt,
+ int len)
+{
+ transmitbuf *buff = NULL;
+ DWORD lpNumberOfBytesWritten;
+ DWORD Result = -1;
+
+ if (len <= sizeof(buff->pkt)) {
+ buff = get_free_transmit_buffer();
+ if (buff != NULL) {
+ DWORD BytesSent = 0;
+ DWORD Flags = 0;
+
+ memcpy(&buff->pkt, pkt, len);
+ buff->iocompletioninfo.iofunction = OnWriteComplete;
+
+ Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped);
+
+#ifdef DEBUG
+ if (debug > 2) {
+ printf("SendTo - %d bytes %d\n", len, Result);
+ }
+#endif
+ if (Result) return len;
+ }
+ else {
+#ifdef DEBUG
+ if (debug)
+ printf("No more transmit buffers left - data discarded\n");
+#endif
+
+ }
+ }
+ else {
+#ifdef DEBUG
+ if (debug) printf("Packet too large\n");
+#endif
+ }
+ return Result;
+}
#else
static int NonEmptyCompilationUnit;