#include "ntp_stdlib.h"
#include "ntp_syslog.h"
+#include <isc/list.h>
#include "transmitbuff.h"
/*
#define TRANSMIT_INC 5 /* get 5 more at a time */
#define TRANSMIT_TOOMANY 40 /* this is way too many buffers */
+/*
+ * Maximum time in seconds to allow transmit request to complete
+ * After that we are free to delete it if we need the buffer
+ */
+
+#define MAX_TRANSMIT_SEND_TIME 60.0
+
/*
* Memory allocation
*/
static volatile u_long full_transmitbufs = 0; /* number of transmitbufs on fulllist */
static volatile u_long free_transmitbufs = 0; /* number of transmitbufs on freelist */
-static struct transmitbuf *volatile freelist = NULL; /* free buffers */
-static struct transmitbuf *volatile fulllist = NULL; /* lifo buffers with data */
-static struct transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */
+typedef struct transmitb transmitb_t;
+
+struct transmitb {
+ transmitbuf *tb;
+ ISC_LINK(transmitb_t) link;
+};
+
+ISC_LIST(transmitb_t) fulllist; /* Currently used transmit buffers */
+
+static transmitbuf *volatile freelist = NULL; /* free buffers */
+static transmitbuf *volatile beginlist = NULL; /* fifo buffers with data */
static u_long total_transmitbufs = 0; /* total transmitbufs currently in use */
static u_long lowater_additions = 0; /* number of times we have added memory */
-static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
+static transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
-#if defined(HAVE_SIGNALED_IO)
-# define TRANSMIT_BLOCK_IO() BLOCKIO()
-# define TRANSMIT_UNBLOCK_IO() UNBLOCKIO()
-#elif defined(HAVE_IO_COMPLETION_PORT)
-static CRITICAL_SECTION TransmitCritSection;
-# define TRANSMIT_BLOCK_IO() EnterCriticalSection(&TransmitCritSection)
-# define TRANSMIT_UNBLOCK_IO() LeaveCriticalSection(&TransmitCritSection)
-#else
-# define TRANSMIT_BLOCK_IO()
-# define TRANSMIT_UNBLOCK_IO()
-#endif
+static CRITICAL_SECTION TransmitLock;
+# define LOCK(lock) EnterCriticalSection(lock)
+# define UNLOCK(lock) LeaveCriticalSection(lock)
+static struct transmitbuf initial_bufs[TRANSMIT_INIT]; /* initial allocation */
+static int eventid = 0;
static void
-initialise_buffer(struct transmitbuf *buff)
+initialise_buffer(transmitbuf *buff)
{
- memset((char *) buff, 0, sizeof(struct transmitbuf));
+ memset((char *) buff, 0, sizeof(transmitbuf));
-#if defined HAVE_IO_COMPLETION_PORT
- buff->iocompletioninfo.overlapped.hEvent = CreateEvent(NULL, FALSE,FALSE, NULL);
buff->wsabuf.len = 0;
buff->wsabuf.buf = (char *) &buff->pkt;
-#endif
}
/*
* Init buffer free list and stat counters
*/
- freelist = 0;
+ freelist = NULL;
for (i = 0; i < TRANSMIT_INIT; i++)
{
initialise_buffer(&initial_bufs[i]);
- initial_bufs[i].next = (struct transmitbuf *) freelist;
+ initial_bufs[i].next = (transmitbuf *) freelist;
freelist = &initial_bufs[i];
}
- fulllist = 0;
+ ISC_LIST_INIT(fulllist);
free_transmitbufs = total_transmitbufs = TRANSMIT_INIT;
full_transmitbufs = lowater_additions = 0;
-#if defined(HAVE_IO_COMPLETION_PORT)
- InitializeCriticalSection(&TransmitCritSection);
-#endif
+ InitializeCriticalSection(&TransmitLock);
+}
+
+static void
+add_buffer_to_freelist(transmitbuf *tb)
+{
+ tb->next = freelist;
+ freelist = tb;
+ free_transmitbufs++;
+}
+
+static void
+delete_buffer_from_full_list(transmitbuf *tb) {
+
+ transmitb_t *next;
+ transmitb_t *lbuf = ISC_LIST_HEAD(fulllist);
+
+ while(lbuf != NULL) {
+ next = ISC_LIST_NEXT(lbuf, link);
+ if(lbuf->tb == tb) {
+ ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t);
+ free(lbuf);
+ break;
+ }
+ else
+ lbuf = next;
+ }
}
+/*
+ * routine to free up any buffer that has not been freed up
+ * after MAX_TRANSMIT_SEND_TIME seconds. Note that we are not being
+ * too careful here about the correct value of time since we just need
+ * and approximate measure of how much time has elapsed since the
+ * packet was sent and this routine is only called if we run out
+ * of tranmit buffers.
+ */
+static int
+free_unsent_buffers()
+{
+ int tot_freed = 0;
+ double elapsed_time;
+ time_t ct;
+ transmitbuf *buf;
+ transmitb_t *next;
+ transmitb_t *lbuf = ISC_LIST_HEAD(fulllist);
+
+ time(&ct); /* Current Time */
+
+ LOCK(&TransmitLock);
+ while(lbuf != NULL) {
+ next = ISC_LIST_NEXT(lbuf, link);
+ elapsed_time = difftime(ct, lbuf->tb->ts);
+ if (elapsed_time > MAX_TRANSMIT_SEND_TIME) {
+ ISC_LIST_DEQUEUE_TYPE(fulllist, lbuf, link, transmitb_t);
+ free(lbuf);
+ add_buffer_to_freelist(lbuf->tb);
+ tot_freed++;
+ }
+ else
+ lbuf = next;
+ }
+ UNLOCK(&TransmitLock);
+ return (tot_freed);
+}
static void
create_buffers(void)
{
- register struct transmitbuf *buf;
+ transmitbuf *buf;
int i;
+ if (free_unsent_buffers() > 0)
+ return;
- buf = (struct transmitbuf *)
- emalloc(TRANSMIT_INC*sizeof(struct transmitbuf));
+ buf = (transmitbuf *) emalloc(TRANSMIT_INC*sizeof(transmitbuf));
for (i = 0; i < TRANSMIT_INC; i++)
{
initialise_buffer(buf);
- buf->next = (struct transmitbuf *) freelist;
+ buf->next = (transmitbuf *) freelist;
freelist = buf;
buf++;
}
extern void
-free_transmit_buffer(
- struct transmitbuf *rb
- )
+free_transmit_buffer(transmitbuf *rb)
{
- TRANSMIT_BLOCK_IO();
- rb->next = freelist;
- freelist = rb;
- free_transmitbufs++;
- TRANSMIT_UNBLOCK_IO();
+ LOCK(&TransmitLock);
+ add_buffer_to_freelist(rb);
+ delete_buffer_from_full_list(rb);
+ UNLOCK(&TransmitLock);
}
-extern struct transmitbuf *
+extern transmitbuf *
get_free_transmit_buffer(void)
{
- struct transmitbuf * buffer = NULL;
- TRANSMIT_BLOCK_IO();
+ transmitb_t *lbuf = (transmitb_t *)malloc(sizeof(transmitb_t));
+
+ transmitbuf * buffer = NULL;
+ LOCK(&TransmitLock);
if (free_transmitbufs <= 0) {
create_buffers();
}
buffer = freelist;
freelist = buffer->next;
buffer->next = NULL;
+ time(&buffer->ts); /* Time we gave out the transmit buffer */
+ lbuf->tb = buffer;
+ ISC_LIST_APPEND(fulllist, lbuf, link);
--free_transmitbufs;
- TRANSMIT_UNBLOCK_IO();
+ UNLOCK(&TransmitLock);
return buffer;
}
#include "transmitbuff.h"
+#define BUFCHECK_SECS 10
+static void TransmitCheckThread(void *NotUsed);
+static BOOL bExit;
+static HANDLE TimerHandle;
+static HANDLE hHeapHandle = NULL;
+
static HANDLE hIoCompletionPort = NULL;
static HANDLE WaitableIoEventHandle = NULL;
-HANDLE WaitHandles[3] = { NULL, NULL, NULL };
+
+#define MAXHANDLES 3
+HANDLE WaitHandles[MAXHANDLES] = { NULL, NULL, NULL };
HANDLE
get_io_event()
return( WaitableIoEventHandle );
}
-
-static int
-OnExitRequest(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
-{
- (void) Info;
- (void) Bytes;
- (void) Key;
- return 0; /* auto-fail to request an exit */
-}
-
-static IoCompletionInfo ExitRequest;
/* This function will add an entry to the I/O completion port
* that will signal the I/O thread to exit (gracefully)
*/
static void
signal_io_completion_port_exit()
{
- ExitRequest.iofunction = OnExitRequest;
- if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, &ExitRequest.overlapped)) {
+ if (!PostQueuedCompletionStatus(hIoCompletionPort, 0, 0, 0)) {
msyslog(LOG_ERR, "Can't request service thread to exit: %m");
exit(1);
}
}
-
-static void __cdecl
+static void
iocompletionthread(void *NotUsed)
{
DWORD BytesTransferred = 0;
DWORD Key = 0;
- IoCompletionInfo * Overlapped = NULL;
- (void) NotUsed;
+ IoCompletionInfo * lpo = NULL;
/* Set the thread priority high enough so I/O will
* preempt normal recv packet processing, but not
msyslog(LOG_ERR, "Can't set thread priority: %m");
}
- for (;;) {
- while (GetQueuedCompletionStatus(hIoCompletionPort, &BytesTransferred, &Key, & (LPOVERLAPPED) Overlapped, INFINITE)) {
- if (Overlapped != NULL &&
- Overlapped->iofunction != NULL) {
- if (!Overlapped->iofunction(Key, Overlapped, BytesTransferred)) {
+ while (TRUE) {
+ GetQueuedCompletionStatus(hIoCompletionPort,
+ &BytesTransferred,
+ &Key,
+ & (LPOVERLAPPED) lpo,
+ INFINITE);
+ if (lpo == NULL)
+ {
#ifdef DEBUG
msyslog(LOG_INFO, "Overlapped IO Thread Exits: %m");
#endif
- return ; /* fail */
- }
+ break; /* fail */
+ }
+
+ /*
+ * Invoke the appropriate function based on
+ * the value of the request_type
+ */
+ switch(lpo->request_type)
+ {
+ case CLOCK_READ:
+ OnIoReadComplete(Key, lpo, BytesTransferred);
+ break;
+ case SOCK_RECV:
+ OnSocketRecv(Key, lpo, BytesTransferred);
+ break;
+ case SOCK_SEND:
+ case CLOCK_WRITE:
+ OnWriteComplete(Key, lpo, BytesTransferred);
+ break;
+ default:
+#if DEBUG
+ if (debug > 3) {
+ printf("Unknown request type %d found in completion port\n",
+ lpo->request_type);
}
+#endif
}
}
}
)
{
+ /*
+ * Create a handle to the Heap
+ */
+ hHeapHandle = HeapCreate(0, 20*sizeof(IoCompletionInfo), 0);
+
+ /* Set the exit flag */
+ bExit = FALSE;
+ /*
+ * Initialize the timer watch section
+ */
+// _beginthread(TransmitCheckThread, 0, NULL);
+
/* Create the event used to signal an IO event
*/
- WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, NULL);
+ WaitableIoEventHandle = CreateEvent(NULL, FALSE, FALSE, "WaitableIoEventHandle");
if (WaitableIoEventHandle == NULL) {
msyslog(LOG_ERR, "Can't create I/O event handle: %m");
exit(1);
/*
* Initialize the Wait Handles
*/
- WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, NULL); /* exit request */
+ WaitHandles[0] = CreateEvent(NULL, FALSE, FALSE, "WaitHandles0"); /* exit request */
WaitHandles[1] = get_timer_handle();
WaitHandles[2] = get_io_event();
void
)
{
+ /*
+ * Tell the timer handle to exit
+ */
+ bExit = TRUE;
+ SetEvent(TimerHandle);
+
if (hIoCompletionPort != NULL) {
/* Get each of the service threads to exit
*/
static int QueueIORead( struct refclockio *rio ) {
struct recvbuf *buff;
+ IoCompletionInfo *lpo;
+ lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+ HEAP_ZERO_MEMORY,
+ sizeof(IoCompletionInfo));
buff = get_free_recv_buffer();
if (buff == NULL)
return 0;
+ lpo->request_type = CLOCK_READ;
+ lpo->buff = buff;
+
buff->fd = rio->fd;
- buff->iocompletioninfo.iofunction = OnIoReadComplete;
- if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, &buff->iocompletioninfo.overlapped)) {
+ if (!ReadFile((HANDLE) buff->fd, &buff->recv_buffer, sizeof(buff->recv_buffer), NULL, (LPOVERLAPPED) lpo)) {
DWORD Result = GetLastError();
switch (Result) {
case NO_ERROR :
default:
msyslog(LOG_ERR, "Can't read from Refclock: %m");
freerecvbuf(buff);
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
return 0;
}
}
/* Return 1 on Successful Read */
static int
-OnIoReadComplete(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
+OnIoReadComplete(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
{
struct recvbuf *buff = NULL;
struct refclockio * rio = (struct refclockio *) i;
/* Convert the overlapped pointer back to a recvbuf pointer.
*/
- buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
-
+ buff = (struct recvbuf *) lpo->buff;
if (Bytes > 0) { /* ignore 0 bytes read due to timeout's */
get_systime(&buff->recv_time);
buff->recv_length = (int) Bytes;
else
freerecvbuf(buff);
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
+
QueueIORead( rio );
return 1;
}
static unsigned long QueueSocketRecv(SOCKET s) {
struct recvbuf *buff;
+ IoCompletionInfo *lpo;
+ int AddrLen;
+
+ lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+ HEAP_ZERO_MEMORY,
+ sizeof(IoCompletionInfo));
buff = get_free_recv_buffer();
+ lpo->request_type = SOCK_RECV;
+ lpo->buff = buff;
+
if (buff != NULL) {
DWORD BytesReceived = 0;
DWORD Flags = 0;
- buff->iocompletioninfo.iofunction = OnSocketRecv;
buff->fd = s;
- buff->AddressLength = sizeof(struct sockaddr_in);
+ AddrLen = sizeof(struct sockaddr_in);
if (SOCKET_ERROR == WSARecvFrom(buff->fd, &buff->wsabuff, 1,
&BytesReceived, &Flags,
- (struct sockaddr *) &buff->recv_srcadr, (LPINT) &buff->AddressLength,
- &buff->iocompletioninfo.overlapped, NULL)) {
+ (struct sockaddr *) &buff->recv_srcadr, (LPINT) &AddrLen,
+ (LPOVERLAPPED) lpo, NULL)) {
DWORD Result = WSAGetLastError();
switch (Result) {
case NO_ERROR :
case WSAENOTSOCK :
netsyslog(LOG_ERR, "Can't read from socket, because it isn't a socket: %m");
freerecvbuf(buff);
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
return 0;
break;
case WSAEFAULT :
netsyslog(LOG_ERR, "The buffers parameter is incorrect: %m");
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
freerecvbuf(buff);
return 0;
break;
/* Returns 0 if any Error */
static int
-OnSocketRecv(DWORD i, struct IoCompletionInfo *Info, DWORD Bytes)
+OnSocketRecv(DWORD i, IoCompletionInfo *lpo, DWORD Bytes)
{
struct recvbuf *buff = NULL;
struct interface * inter = (struct interface *) i;
/* Convert the overlapped pointer back to a recvbuf pointer.
*/
- buff = (struct recvbuf *) ( ((char *) Info) - offsetof(struct recvbuf, iocompletioninfo));
+ buff = (struct recvbuf *) lpo->buff;
get_systime(&buff->recv_time);
if (Bytes > 0 && inter->ignore_packets == ISC_FALSE) {
buff->receiver = receive;
buff->dstadr = inter;
#ifdef DEBUG
- if (debug > 3) {
- char strbuffer[256];
- DWORD strlength = sizeof(strbuffer);
- if (0 == WSAAddressToString((struct sockaddr *) &buff->recv_srcadr, buff->AddressLength, NULL, strbuffer, &strlength))
- printf("Received %d bytes from %s\n", Bytes, strbuffer);
- }
+ if (debug > 3)
+ printf("Received %d bytes from %s\n", stoa(&buff->recv_srcadr));
#endif
add_full_recv_buffer(buff);
if( !SetEvent( WaitableIoEventHandle ) ) {
else {
freerecvbuf(buff);
}
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
+
QueueSocketRecv(inter->fd);
return 1;
}
static int
-OnSendToComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+OnSendToComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes)
{
transmitbuf *buff = NULL;
(void) Bytes;
(void) Key;
- buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo));
+ buff = (struct transmitbuf *) lpo->buff;
free_transmit_buffer(buff);
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
return 1;
}
static int
-OnWriteComplete(DWORD Key, struct IoCompletionInfo *Info, DWORD Bytes)
+OnWriteComplete(DWORD Key, IoCompletionInfo *lpo, DWORD Bytes)
{
transmitbuf *buff = NULL;
(void) Bytes;
(void) Key;
- buff = (struct transmitbuf *) ( ((char *) Info) - offsetof(struct transmitbuf, iocompletioninfo));
+ buff = (struct transmitbuf *) lpo->buff;
free_transmit_buffer(buff);
+ /* Clear the heap */
+ if (lpo != NULL)
+ HeapFree(hHeapHandle, 0, lpo);
return 1;
}
transmitbuf *buff = NULL;
DWORD Result = ERROR_SUCCESS;
int errval;
+ int AddrLen;
+ IoCompletionInfo *lpo;
+
+ lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+ HEAP_ZERO_MEMORY,
+ sizeof(IoCompletionInfo));
if (len <= sizeof(buff->pkt)) {
buff = get_free_transmit_buffer();
buff->wsabuf.buf = buff->pkt;
buff->wsabuf.len = len;
- buff->iocompletioninfo.iofunction = OnSendToComplete;
- Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, sizeof(struct sockaddr_in), &buff->iocompletioninfo.overlapped, NULL);
+ AddrLen = sizeof(struct sockaddr_in);
+ lpo->request_type = SOCK_SEND;
+ lpo->buff = buff;
+
+ Result = WSASendTo(inter->fd, &buff->wsabuf, 1, &BytesSent, Flags, (struct sockaddr *) dest, AddrLen, (LPOVERLAPPED) lpo, NULL);
if(Result == SOCKET_ERROR)
{
default :
netsyslog(LOG_ERR, "WSASendTo - error sending message: %m");
+ free_transmit_buffer(buff);
break;
}
}
#ifdef DEBUG
- if (debug > 2) {
- char strbuffer[256];
- DWORD strlength = sizeof(strbuffer);
- if (0 == WSAAddressToString((LPSOCKADDR) dest, sizeof(*dest), NULL, strbuffer, &strlength))
- printf("WSASendTo - %d bytes to %s : %d\n", len, strbuffer, Result);
- }
+ if (debug > 3)
+ printf("WSASendTo - %d bytes to %s : %d\n", len, stoa(dest), Result);
#endif
return (Result);
}
char *pkt,
int len)
{
+ DWORD errval;
transmitbuf *buff = NULL;
DWORD lpNumberOfBytesWritten;
DWORD Result = ERROR_INSUFFICIENT_BUFFER;
+ IoCompletionInfo *lpo;
+
+ lpo = (IoCompletionInfo *) HeapAlloc(hHeapHandle,
+ HEAP_ZERO_MEMORY,
+ sizeof(IoCompletionInfo));
if (len <= sizeof(buff->pkt)) {
buff = get_free_transmit_buffer();
if (buff != NULL) {
+ lpo->request_type = CLOCK_WRITE;
+ lpo->buff = buff;
memcpy(&buff->pkt, pkt, len);
- buff->iocompletioninfo.iofunction = OnWriteComplete;
- Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, &buff->iocompletioninfo.overlapped);
+ Result = WriteFile(fd, buff->pkt, len, &lpNumberOfBytesWritten, (LPOVERLAPPED) lpo);
+
+ if(Result == SOCKET_ERROR)
+ {
+ errval = WSAGetLastError();
+ switch (errval) {
+ case NO_ERROR :
+ case WSA_IO_INCOMPLETE :
+ case WSA_WAIT_IO_COMPLETION :
+ case WSA_IO_PENDING :
+ Result = ERROR_SUCCESS;
+ break ;
+
+ default :
+ netsyslog(LOG_ERR, "WriteFile - error sending message: %m");
+ free_transmit_buffer(buff);
+ break;
+ }
+ }
#ifdef DEBUG
if (debug > 2) {
- printf("SendTo - %d bytes %d\n", len, Result);
+ printf("WriteFile - %d bytes %d\n", len, Result);
}
#endif
if (Result) return len;
return Result;
}
+/*
+ * GetReceivedBuffers
+ * Note that this is in effect the main loop for processing requests
+ * both send and receive. This should be reimplemented
+ */
struct recvbuf *GetReceivedBuffers()
{
-
- DWORD Index = WaitForMultipleObjectsEx(sizeof(WaitHandles)/sizeof(WaitHandles[0]), WaitHandles, FALSE, 1000, TRUE);
+ DWORD Index = WaitForMultipleObjectsEx(MAXHANDLES, WaitHandles, FALSE, INFINITE, TRUE);
switch (Index) {
case WAIT_OBJECT_0 + 0 : /* exit request */
exit(0);
break;
} /* switch */
+
return (getrecvbufs()); /* get received buffers */
}
+static void
+TransmitCheckThread(void *NotUsed)
+{
+
+ int SleepSecs = BUFCHECK_SECS;
+ while (TRUE)
+ {
+ Sleep(SleepSecs*1000);
+
+ /* If we are done we exit */
+ if (bExit)
+ return;
+
+ } /* while */
+}
#else
static int NonEmptyCompilationUnit;