From: Danny Mayer Date: Sun, 24 Jul 2005 19:50:24 +0000 (-0400) Subject: Revamp of I/O Completion ports for Windows X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4ac9b6273fa366d4e0167c42d46570b3a7fb3fb3;p=thirdparty%2Fntp.git Revamp of I/O Completion ports for Windows bk: 42e3f100iYHOeVbnGFCdl1WPvpLpEA --- diff --git a/include/recvbuff.h b/include/recvbuff.h index 4e5ac7b575..544ef5cfb0 100644 --- a/include/recvbuff.h +++ b/include/recvbuff.h @@ -56,9 +56,7 @@ struct recvbuf { #define recv_srcclock X_from_where.X_recv_srcclock #define recv_peer X_from_where.X_recv_peer #if defined HAVE_IO_COMPLETION_PORT - IoCompletionInfo iocompletioninfo; WSABUF wsabuff; - DWORD AddressLength; #else struct sockaddr_storage srcadr; /* where packet came from */ #endif diff --git a/libntp/recvbuff.c b/libntp/recvbuff.c index 1a9848fd05..2e2fc79488 100644 --- a/libntp/recvbuff.c +++ b/libntp/recvbuff.c @@ -24,12 +24,18 @@ static struct recvbuf *volatile fulllist; /* lifo buffers with data */ static struct recvbuf *volatile beginlist; /* fifo buffers with data */ #if defined(HAVE_IO_COMPLETION_PORT) -static CRITICAL_SECTION RecvCritSection; -# define RECV_BLOCK_IO() EnterCriticalSection(&RecvCritSection) -# define RECV_UNBLOCK_IO() LeaveCriticalSection(&RecvCritSection) + +/* + * For Windows we need to set up a lock to manipulate the + * recv buffers to prevent corruption. We keep it lock for as + * short a time as possible + */ +static CRITICAL_SECTION RecvLock; +# define LOCK() EnterCriticalSection(&RecvLock) +# define UNLOCK() LeaveCriticalSection(&RecvLock) #else -# define RECV_BLOCK_IO() -# define RECV_UNBLOCK_IO() +# define LOCK() +# define UNLOCK() #endif u_long @@ -62,7 +68,6 @@ initialise_buffer(struct recvbuf *buff) memset((char *) buff, 0, sizeof(struct recvbuf)); #if defined HAVE_IO_COMPLETION_PORT - buff->iocompletioninfo.overlapped.hEvent = CreateEvent(NULL, FALSE,FALSE, NULL); buff->wsabuff.len = RX_BUFF_SIZE; buff->wsabuff.buf = (char *) buff->recv_buffer; #endif @@ -114,7 +119,7 @@ init_recvbuff(int nbufs) full_recvbufs = lowater_adds = 0; #if defined(HAVE_IO_COMPLETION_PORT) - InitializeCriticalSection(&RecvCritSection); + InitializeCriticalSection(&RecvLock); #endif } @@ -131,7 +136,7 @@ getrecvbufs(void) { struct recvbuf *rb = NULL; /* nothing has arrived */; - RECV_BLOCK_IO(); + LOCK(); if (full_recvbufs == 0) { #ifdef DEBUG @@ -166,7 +171,7 @@ getrecvbufs(void) } } } - RECV_UNBLOCK_IO(); + UNLOCK(); /* * Return the chain @@ -182,13 +187,13 @@ freerecvbuf( struct recvbuf *rb ) { - RECV_BLOCK_IO(); + LOCK(); BLOCKIO(); rb->next = (struct recvbuf *) freelist; freelist = rb; free_recvbufs++; UNBLOCKIO(); - RECV_UNBLOCK_IO(); + UNLOCK(); } @@ -197,7 +202,7 @@ add_full_recv_buffer( struct recvbuf *rb ) { - RECV_BLOCK_IO(); + LOCK(); if (full_recvbufs == 0) { beginlist = rb; @@ -211,14 +216,14 @@ add_full_recv_buffer( fulllist = rb; full_recvbufs++; - RECV_UNBLOCK_IO(); + UNLOCK(); } struct recvbuf * get_free_recv_buffer(void) { struct recvbuf * buffer = NULL; - RECV_BLOCK_IO(); + LOCK(); if (free_recvbufs <= RECV_LOWAT) { if (total_recvbufs >= RECV_TOOMANY) { @@ -239,7 +244,7 @@ get_free_recv_buffer(void) --free_recvbufs; } - RECV_UNBLOCK_IO(); + UNLOCK(); return buffer; } @@ -247,13 +252,13 @@ struct recvbuf * get_full_recv_buffer(void) { struct recvbuf * buffer = NULL; - RECV_BLOCK_IO(); + LOCK(); if (full_recvbufs > 0) { --full_recvbufs; buffer = beginlist; beginlist = buffer->next; buffer->next = NULL; } - RECV_UNBLOCK_IO(); + UNLOCK(); return buffer; } diff --git a/ports/winnt/include/ntp_iocompletionport.h b/ports/winnt/include/ntp_iocompletionport.h index f04eb9ebca..d02c0f20d6 100644 --- a/ports/winnt/include/ntp_iocompletionport.h +++ b/ports/winnt/include/ntp_iocompletionport.h @@ -8,10 +8,22 @@ struct IoCompletionInfo; struct refclockio; +/* + * Request types + */ +enum { + SOCK_RECV, + SOCK_SEND, + CLOCK_READ, + CLOCK_WRITE +}; + typedef int IoCompletionInfoFunction(DWORD Key, struct IoCompletionInfo *, DWORD Bytes); typedef struct IoCompletionInfo { OVERLAPPED overlapped; + int request_type; + LPVOID buff; IoCompletionInfoFunction * iofunction; } IoCompletionInfo; @@ -26,12 +38,13 @@ extern DWORD io_completion_port_sendto (struct interface *, struct pkt *, int, s extern HANDLE get_io_event (void); -static int OnSocketRecv(DWORD, struct IoCompletionInfo *, DWORD); - struct recvbuf *GetReceivedBuffers(void); static int QueueIORead( struct refclockio * ); -static int OnIoReadComplete(DWORD, struct IoCompletionInfo *, DWORD); + +static int OnSocketRecv(DWORD, IoCompletionInfo *, DWORD); +static int OnIoReadComplete(DWORD, IoCompletionInfo *, DWORD); +static int OnWriteComplete(DWORD, IoCompletionInfo *, DWORD); # endif #endif diff --git a/ports/winnt/include/transmitbuff.h b/ports/winnt/include/transmitbuff.h index 3094efe1cc..33487c26e7 100644 --- a/ports/winnt/include/transmitbuff.h +++ b/ports/winnt/include/transmitbuff.h @@ -6,8 +6,6 @@ # include "ntp_iocompletionport.h" #endif -#define TRANSMIT_BUF_LENGTH 1024 - /* * Format of a transmitbuf. These are used by the asynchronous receive * routine to store incoming packets and related information. @@ -16,10 +14,8 @@ typedef struct transmitbuf { struct transmitbuf *next; /* next buffer in chain */ -#if defined HAVE_IO_COMPLETION_PORT - IoCompletionInfo iocompletioninfo; - WSABUF wsabuf; -#endif + WSABUF wsabuf; + time_t ts; /* Time stamp for the request */ /* * union { @@ -37,7 +33,7 @@ extern void init_transmitbuff P((void)); /* freetransmitbuf - make a single transmitbuf available for reuse */ -extern void free_transmit_buffer P((struct transmitbuf *)); +extern void free_transmit_buffer P((transmitbuf *)); /* Get a free buffer (typically used so an async * read can directly place data into the buffer @@ -45,13 +41,7 @@ extern void free_transmit_buffer P((struct transmitbuf *)); * The buffer is removed from the free list. Make sure * you put it back with freetransmitbuf() or */ -extern struct transmitbuf *get_free_transmit_buffer P((void)); - - - - - - +extern transmitbuf *get_free_transmit_buffer P((void)); #endif /* defined __transmitbuff_h */ diff --git a/ports/winnt/libntp/transmitbuff.c b/ports/winnt/libntp/transmitbuff.c index d098ee9415..ec3093c6c1 100644 --- a/ports/winnt/libntp/transmitbuff.c +++ b/ports/winnt/libntp/transmitbuff.c @@ -8,6 +8,7 @@ #include "ntp_stdlib.h" #include "ntp_syslog.h" +#include #include "transmitbuff.h" /* @@ -18,45 +19,51 @@ #define TRANSMIT_INC 5 /* get 5 more at a time */ #define TRANSMIT_TOOMANY 40 /* this is way too many buffers */ +/* + * Maximum time in seconds to allow transmit request to complete + * After that we are free to delete it if we need the buffer + */ + +#define MAX_TRANSMIT_SEND_TIME 60.0 + /* * Memory allocation */ static volatile u_long full_transmitbufs = 0; /* number of transmitbufs on fulllist */ static volatile u_long free_transmitbufs = 0; /* number of transmitbufs on freelist */ -static struct transmitbuf *volatile freelist = NULL; /* free buffers */ -static struct transmitbuf *volatile fulllist = NULL; /* lifo buffers with data */ -static struct transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */ +typedef struct transmitb transmitb_t; + +struct transmitb { + transmitbuf *tb; + ISC_LINK(transmitb_t) link; +}; + +ISC_LIST(transmitb_t) fulllist; /* Currently used transmit buffers */ + +static transmitbuf *volatile freelist = NULL; /* free buffers */ +static transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */ static u_long total_transmitbufs = 0; /* total transmitbufs currently in use */ static u_long lowater_additions = 0; /* number of times we have added memory */ -static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */ +static transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */ -#if defined(HAVE_SIGNALED_IO) -# define TRANSMIT_BLOCK_IO() BLOCKIO() -# define TRANSMIT_UNBLOCK_IO() UNBLOCKIO() -#elif defined(HAVE_IO_COMPLETION_PORT) -static CRITICAL_SECTION TransmitCritSection; -# define TRANSMIT_BLOCK_IO() EnterCriticalSection(&TransmitCritSection) -# define TRANSMIT_UNBLOCK_IO() LeaveCriticalSection(&TransmitCritSection) -#else -# define TRANSMIT_BLOCK_IO() -# define TRANSMIT_UNBLOCK_IO() -#endif +static CRITICAL_SECTION TransmitLock; +# define LOCK(lock) EnterCriticalSection(lock) +# define UNLOCK(lock) LeaveCriticalSection(lock) +static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */ +static int eventid = 0; static void -initialise_buffer(struct transmitbuf *buff) +initialise_buffer(transmitbuf *buff) { - memset((char *) buff, 0, sizeof(struct transmitbuf)); + memset((char *) buff, 0, sizeof(transmitbuf)); -#if defined HAVE_IO_COMPLETION_PORT - buff->iocompletioninfo.overlapped.hEvent = CreateEvent(NULL, FALSE,FALSE, NULL); buff->wsabuf.len = 0; buff->wsabuf.buf = (char *) &buff->pkt; -#endif } @@ -67,36 +74,97 @@ init_transmitbuff(void) /* * Init buffer free list and stat counters */ - freelist = 0; + freelist = NULL; for (i = 0; i < TRANSMIT_INIT; i++) { initialise_buffer(&initial_bufs[i]); - initial_bufs[i].next = (struct transmitbuf *) freelist; + initial_bufs[i].next = (transmitbuf *) freelist; freelist = &initial_bufs[i]; } - fulllist = 0; + ISC_LIST_INIT(fulllist); free_transmitbufs = total_transmitbufs = TRANSMIT_INIT; full_transmitbufs = lowater_additions = 0; -#if defined(HAVE_IO_COMPLETION_PORT) - InitializeCriticalSection(&TransmitCritSection); -#endif + InitializeCriticalSection(&TransmitLock); +} + +static void +add_buffer_to_freelist(transmitbuf *tb) +{ + tb->next = freelist; + freelist = tb; + free_transmitbufs++; +} + +static void +delete_buffer_from_full_list(transmitbuf *tb) { + + transmitb_t *next; + transmitb_t *lbuf = ISC_LIST_HEAD(fulllist); + + while(lbuf != NULL) { + next = ISC_LIST_NEXT(lbuf, link); + if(lbuf->tb == tb) { + ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t); + free(lbuf); + break; + } + else + lbuf = next; + } } +/* + * routine to free up any buffer that has not been freed up + * after MAX_TRANSMIT_SEND_TIME seconds. Note that we are not being + * too careful here about the correct value of time since we just need + * and approximate measure of how much time has elapsed since the + * packet was sent and this routine is only called if we run out + * of tranmit buffers. + */ +static int +free_unsent_buffers() +{ + int tot_freed = 0; + double elapsed_time; + time_t ct; + transmitbuf *buf; + transmitb_t *next; + transmitb_t *lbuf = ISC_LIST_HEAD(fulllist); + + time(&ct); /* Current Time */ + + LOCK(&TransmitLock); + while(lbuf != NULL) { + next = ISC_LIST_NEXT(lbuf, link); + elapsed_time = difftime(ct, lbuf->tb->ts); + if (elapsed_time > MAX_TRANSMIT_SEND_TIME) { + ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t); + free(lbuf); + add_buffer_to_freelist(lbuf->tb); + tot_freed++; + } + else + lbuf = next; + } + UNLOCK(&TransmitLock); + return (tot_freed); +} static void create_buffers(void) { - register struct transmitbuf *buf; + transmitbuf *buf; int i; + if (free_unsent_buffers() > 0) + return; - buf = (struct transmitbuf *) - emalloc(TRANSMIT_INC*sizeof(struct transmitbuf)); + buf = (transmitbuf *) emalloc(TRANSMIT_INC*sizeof(transmitbuf)); for (i = 0; i < TRANSMIT_INC; i++) { initialise_buffer(buf); - buf->next = (struct transmitbuf *) freelist; + buf->next = (transmitbuf *) freelist; freelist = buf; buf++; } @@ -108,31 +176,33 @@ create_buffers(void) extern void -free_transmit_buffer( - struct transmitbuf *rb - ) +free_transmit_buffer(transmitbuf *rb) { - TRANSMIT_BLOCK_IO(); - rb->next = freelist; - freelist = rb; - free_transmitbufs++; - TRANSMIT_UNBLOCK_IO(); + LOCK(&TransmitLock); + add_buffer_to_freelist(rb); + delete_buffer_from_full_list(rb); + UNLOCK(&TransmitLock); } -extern struct transmitbuf * +extern transmitbuf * get_free_transmit_buffer(void) { - struct transmitbuf * buffer = NULL; - TRANSMIT_BLOCK_IO(); + transmitb_t *lbuf = (transmitb_t *)malloc(sizeof(transmitb_t)); + + transmitbuf * buffer = NULL; + LOCK(&TransmitLock); if (free_transmitbufs <= 0) { create_buffers(); } buffer = freelist; freelist = buffer->next; buffer->next = NULL; + time(&buffer->ts); /* Time we gave out the transmit buffer */ + lbuf->tb = buffer; + ISC_LIST_APPEND(fulllist, lbuf, link); --free_transmitbufs; - TRANSMIT_UNBLOCK_IO(); + UNLOCK(&TransmitLock); return buffer; } diff --git a/ports/winnt/ntpd/nt_clockstuff.c b/ports/winnt/ntpd/nt_clockstuff.c index 9678c3127e..b6a03cb73d 100644 --- a/ports/winnt/ntpd/nt_clockstuff.c +++ b/ports/winnt/ntpd/nt_clockstuff.c @@ -430,7 +430,7 @@ static void StartClockThread(void) /* init sync objects */ InitializeCriticalSection(&TimerCritialSection); - TimerThreadExitRequest = CreateEvent(NULL, FALSE, FALSE, NULL); + TimerThreadExitRequest = CreateEvent(NULL, FALSE, FALSE, "TimerThreadExitRequest"); ClockThreadHandle = CreateThread(NULL, 0, ClockThread, NULL, 0, &tid); if (ClockThreadHandle != NULL) { diff --git a/ports/winnt/ntpd/ntp_iocompletionport.c b/ports/winnt/ntpd/ntp_iocompletionport.c index 2bfe406e1e..9cd14fb7e1 100644 --- a/ports/winnt/ntpd/ntp_iocompletionport.c +++ b/ports/winnt/ntpd/ntp_iocompletionport.c @@ -18,10 +18,18 @@ #include "transmitbuff.h" +#define BUFCHECK_SECS 10 +static void TransmitCheckThread(void *NotUsed); +static BOOL bExit; +static HANDLE TimerHandle; +static HANDLE hHeapHandle = NULL; + static HANDLE hIoCompletionPort = NULL; static HANDLE WaitableIoEventHandle = NULL; -HANDLE WaitHandles[3] = { NULL, NULL, NULL }; + +#define MAXHANDLES 3 +HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL }; HANDLE get_io_event() @@ -29,38 +37,24 @@ get_io_event() return( WaitableIoEventHandle ); } - -static int -OnExitRequest(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) -{ - (void) Info; - (void) Bytes; - (void) Key; - return 0; /* auto-fail to request an exit */ -} - -static IoCompletionInfo ExitRequest; /* This function will add an entry to the I/O completion port * that will signal the I/O thread to exit (gracefully) */ static void signal_io_completion_port_exit() { - ExitRequest.iofunction = OnExitRequest; - if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, &ExitRequest.overlapped)) { + if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) { msyslog(LOG_ERR, "Can't request service thread to exit: %m"); exit(1); } } - -static void __cdecl +static void iocompletionthread(void *NotUsed) { DWORD BytesTransferred = 0; DWORD Key = 0; - IoCompletionInfo * Overlapped = NULL; - (void) NotUsed; + IoCompletionInfo * lpo = NULL; /* Set the thread priority high enough so I/O will * preempt normal recv packet processing, but not @@ -70,17 +64,43 @@ iocompletionthread(void *NotUsed) msyslog(LOG_ERR, "Can't set thread priority: %m"); } - for (;;) { - while (GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) Overlapped, INFINITE)) { - if (Overlapped != NULL && - Overlapped->iofunction != NULL) { - if (!Overlapped->iofunction(Key, Overlapped, BytesTransferred)) { + while (TRUE) { + GetQueuedCompletionStatus(hIoCompletionPort, + &BytesTransferred, + &Key, + & (LPOVERLAPPED) lpo, + INFINITE); + if (lpo == NULL) + { #ifdef DEBUG msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m"); #endif - return ; /* fail */ - } + break; /* fail */ + } + + /* + * Invoke the appropriate function based on + * the value of the request_type + */ + switch(lpo->request_type) + { + case CLOCK_READ: + OnIoReadComplete(Key, lpo, BytesTransferred); + break; + case SOCK_RECV: + OnSocketRecv(Key, lpo, BytesTransferred); + break; + case SOCK_SEND: + case CLOCK_WRITE: + OnWriteComplete(Key, lpo, BytesTransferred); + break; + default: +#if DEBUG + if (debug > 3) { + printf("Unknown request type %d found in completion port\n", + lpo->request_type); } +#endif } } } @@ -93,9 +113,21 @@ init_io_completion_port( ) { + /* + * Create a handle to the Heap + */ + hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0); + + /* Set the exit flag */ + bExit = FALSE; + /* + * Initialize the timer watch section + */ +// _beginthread(TransmitCheckThread, 0, NULL); + /* Create the event used to signal an IO event */ - WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL); + WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableIoEventHandle"); if (WaitableIoEventHandle == NULL) { msyslog(LOG_ERR, "Can't create I/O event handle: %m"); exit(1); @@ -112,7 +144,7 @@ init_io_completion_port( /* * Initialize the Wait Handles */ - WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, NULL); /* exit request */ + WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, "WaitHandles0"); /* exit request */ WaitHandles[1] = get_timer_handle(); WaitHandles[2] = get_io_event(); @@ -128,6 +160,12 @@ uninit_io_completion_port( void ) { + /* + * Tell the timer handle to exit + */ + bExit = TRUE; + SetEvent(TimerHandle); + if (hIoCompletionPort != NULL) { /* Get each of the service threads to exit */ @@ -139,15 +177,21 @@ uninit_io_completion_port( static int QueueIORead( struct refclockio *rio ) { struct recvbuf *buff; + IoCompletionInfo *lpo; + lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle, + HEAP_ZERO_MEMORY, + sizeof(IoCompletionInfo)); buff = get_free_recv_buffer(); if (buff == NULL) return 0; + lpo->request_type = CLOCK_READ; + lpo->buff = buff; + buff->fd = rio->fd; - buff->iocompletioninfo.iofunction = OnIoReadComplete; - if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) { + if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, (LPOVERLAPPED) lpo)) { DWORD Result = GetLastError(); switch (Result) { case NO_ERROR : @@ -158,6 +202,9 @@ static int QueueIORead( struct refclockio *rio ) { default: msyslog(LOG_ERR, "Can't read from Refclock: %m"); freerecvbuf(buff); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); return 0; } } @@ -168,15 +215,14 @@ static int QueueIORead( struct refclockio *rio ) { /* Return 1 on Successful Read */ static int -OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) +OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes) { struct recvbuf *buff = NULL; struct refclockio * rio = (struct refclockio *) i; /* Convert the overlapped pointer back to a recvbuf pointer. */ - buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo)); - + buff = (struct recvbuf *) lpo->buff; if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */ get_systime(&buff->recv_time); buff->recv_length = (int) Bytes; @@ -196,6 +242,10 @@ OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) else freerecvbuf(buff); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); + QueueIORead( rio ); return 1; } @@ -221,19 +271,27 @@ io_completion_port_add_clock_io( static unsigned long QueueSocketRecv(SOCKET s) { struct recvbuf *buff; + IoCompletionInfo *lpo; + int AddrLen; + + lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle, + HEAP_ZERO_MEMORY, + sizeof(IoCompletionInfo)); buff = get_free_recv_buffer(); + lpo->request_type = SOCK_RECV; + lpo->buff = buff; + if (buff != NULL) { DWORD BytesReceived = 0; DWORD Flags = 0; - buff->iocompletioninfo.iofunction = OnSocketRecv; buff->fd = s; - buff->AddressLength = sizeof(struct sockaddr_in); + AddrLen = sizeof(struct sockaddr_in); if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1, &BytesReceived, &Flags, - (struct sockaddr *) &buff->recv_srcadr, (LPINT) &buff->AddressLength, - &buff->iocompletioninfo.overlapped, NULL)) { + (struct sockaddr *) &buff->recv_srcadr, (LPINT) &AddrLen, + (LPOVERLAPPED) lpo, NULL)) { DWORD Result = WSAGetLastError(); switch (Result) { case NO_ERROR : @@ -245,11 +303,17 @@ static unsigned long QueueSocketRecv(SOCKET s) { case WSAENOTSOCK : netsyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m"); freerecvbuf(buff); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); return 0; break; case WSAEFAULT : netsyslog(LOG_ERR, "The buffers parameter is incorrect: %m"); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); freerecvbuf(buff); return 0; break; @@ -267,7 +331,7 @@ static unsigned long QueueSocketRecv(SOCKET s) { /* Returns 0 if any Error */ static int -OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) +OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes) { struct recvbuf *buff = NULL; struct interface * inter = (struct interface *) i; @@ -275,7 +339,7 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) /* Convert the overlapped pointer back to a recvbuf pointer. */ - buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo)); + buff = (struct recvbuf *) lpo->buff; get_systime(&buff->recv_time); if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) { @@ -283,12 +347,8 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) buff->receiver = receive; buff->dstadr = inter; #ifdef DEBUG - if (debug > 3) { - char strbuffer[256]; - DWORD strlength = sizeof(strbuffer); - if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength)) - printf("Received %d bytes from %s\n", Bytes, strbuffer); - } + if (debug > 3) + printf("Received %d bytes from %s\n", stoa(&buff->recv_srcadr)); #endif add_full_recv_buffer(buff); if( !SetEvent( WaitableIoEventHandle ) ) { @@ -302,6 +362,10 @@ OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes) else { freerecvbuf(buff); } + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); + QueueSocketRecv(inter->fd); return 1; } @@ -327,29 +391,35 @@ io_completion_port_add_socket(SOCKET fd, struct interface *inter) static int -OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) +OnSendToComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes) { transmitbuf *buff = NULL; (void) Bytes; (void) Key; - buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo)); + buff = (struct transmitbuf *) lpo->buff; free_transmit_buffer(buff); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); return 1; } static int -OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes) +OnWriteComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes) { transmitbuf *buff = NULL; (void) Bytes; (void) Key; - buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo)); + buff = (struct transmitbuf *) lpo->buff; free_transmit_buffer(buff); + /* Clear the heap */ + if (lpo != NULL) + HeapFree(hHeapHandle, 0, lpo); return 1; } @@ -364,6 +434,12 @@ io_completion_port_sendto( transmitbuf *buff = NULL; DWORD Result = ERROR_SUCCESS; int errval; + int AddrLen; + IoCompletionInfo *lpo; + + lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle, + HEAP_ZERO_MEMORY, + sizeof(IoCompletionInfo)); if (len <= sizeof(buff->pkt)) { buff = get_free_transmit_buffer(); @@ -375,8 +451,11 @@ io_completion_port_sendto( buff->wsabuf.buf = buff->pkt; buff->wsabuf.len = len; - buff->iocompletioninfo.iofunction = OnSendToComplete; - Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL); + AddrLen = sizeof(struct sockaddr_in); + lpo->request_type = SOCK_SEND; + lpo->buff = buff; + + Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, AddrLen, (LPOVERLAPPED) lpo, NULL); if(Result == SOCKET_ERROR) { @@ -392,16 +471,13 @@ io_completion_port_sendto( default : netsyslog(LOG_ERR, "WSASendTo - error sending message: %m"); + free_transmit_buffer(buff); break; } } #ifdef DEBUG - if (debug > 2) { - char strbuffer[256]; - DWORD strlength = sizeof(strbuffer); - if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength)) - printf("WSASendTo - %d bytes to %s : %d\n", len, strbuffer, Result); - } + if (debug > 3) + printf("WSASendTo - %d bytes to %s : %d\n", len, stoa(dest), Result); #endif return (Result); } @@ -431,22 +507,47 @@ io_completion_port_write( char *pkt, int len) { + DWORD errval; transmitbuf *buff = NULL; DWORD lpNumberOfBytesWritten; DWORD Result = ERROR_INSUFFICIENT_BUFFER; + IoCompletionInfo *lpo; + + lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle, + HEAP_ZERO_MEMORY, + sizeof(IoCompletionInfo)); if (len <= sizeof(buff->pkt)) { buff = get_free_transmit_buffer(); if (buff != NULL) { + lpo->request_type = CLOCK_WRITE; + lpo->buff = buff; memcpy(&buff->pkt, pkt, len); - buff->iocompletioninfo.iofunction = OnWriteComplete; - Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped); + Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, (LPOVERLAPPED) lpo); + + if(Result == SOCKET_ERROR) + { + errval = WSAGetLastError(); + switch (errval) { + case NO_ERROR : + case WSA_IO_INCOMPLETE : + case WSA_WAIT_IO_COMPLETION : + case WSA_IO_PENDING : + Result = ERROR_SUCCESS; + break ; + + default : + netsyslog(LOG_ERR, "WriteFile - error sending message: %m"); + free_transmit_buffer(buff); + break; + } + } #ifdef DEBUG if (debug > 2) { - printf("SendTo - %d bytes %d\n", len, Result); + printf("WriteFile - %d bytes %d\n", len, Result); } #endif if (Result) return len; @@ -467,10 +568,14 @@ io_completion_port_write( return Result; } +/* + * GetReceivedBuffers + * Note that this is in effect the main loop for processing requests + * both send and receive. This should be reimplemented + */ struct recvbuf *GetReceivedBuffers() { - - DWORD Index = WaitForMultipleObjectsEx(sizeof(WaitHandles)/sizeof(WaitHandles[0]), WaitHandles, FALSE, 1000, TRUE); + DWORD Index = WaitForMultipleObjectsEx(MAXHANDLES, WaitHandles, FALSE, INFINITE, TRUE); switch (Index) { case WAIT_OBJECT_0 + 0 : /* exit request */ exit(0); @@ -501,8 +606,24 @@ struct recvbuf *GetReceivedBuffers() break; } /* switch */ + return (getrecvbufs()); /* get received buffers */ } +static void +TransmitCheckThread(void *NotUsed) +{ + + int SleepSecs = BUFCHECK_SECS; + while (TRUE) + { + Sleep(SleepSecs*1000); + + /* If we are done we exit */ + if (bExit) + return; + + } /* while */ +} #else static int NonEmptyCompilationUnit;