]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Windows port: Added Windows threads support to DiskThreads Disk module
authorserassio <>
Thu, 7 Sep 2006 01:36:42 +0000 (01:36 +0000)
committerserassio <>
Thu, 7 Sep 2006 01:36:42 +0000 (01:36 +0000)
configure.in
src/DiskIO/DiskThreads/aiops_win32.cc [new file with mode: 0755]
src/Makefile.am

index 7311d4b0fc43c50d900648c634590700b7a6c730..13df799d76d3d7777c277e7c1e48e0a931e336a8 100644 (file)
@@ -1,7 +1,7 @@
 
 dnl  Configuration input file for Squid
 dnl
-dnl  $Id: configure.in,v 1.435 2006/09/04 20:15:21 serassio Exp $
+dnl  $Id: configure.in,v 1.436 2006/09/06 19:36:42 serassio Exp $
 dnl
 dnl
 dnl
@@ -11,7 +11,7 @@ AM_CONFIG_HEADER(include/autoconf.h)
 AC_CONFIG_AUX_DIR(cfgaux)
 AC_CONFIG_SRCDIR([src/main.cc])
 AM_INIT_AUTOMAKE([tar-ustar])
-AC_REVISION($Revision: 1.435 $)dnl
+AC_REVISION($Revision: 1.436 $)dnl
 AC_PREFIX_DEFAULT(/usr/local/squid)
 AM_MAINTAINER_MODE
 
@@ -627,6 +627,7 @@ if test -z "$FOUND_DISKTHREADS" && test -n "$NEED_DISKTHREADS"; then
     DISK_MODULES="$DISK_MODULES DiskThreads"
     DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/DiskThreads/DiskThreadsDiskIOModule.o"
 fi
+
 if test -z "$FOUND_AIO" && test -n "$NEED_AIO"; then
     echo "adding AIO, as it is used by an active, legacy Store Module"
     DISK_LIBS="$DISK_LIBS libAIO.a"
@@ -639,8 +640,17 @@ for fs in $DISK_MODULES none; do
     case "$fs" in
     DiskThreads)
        if test -z "$with_pthreads"; then
-           echo "DiskThreads IO Module used, pthreads support automatically enabled"
-           with_pthreads=yes
+            case "$host_os" in
+            mingw|mingw32|cygwin|cygwin32)
+               AM_CONDITIONAL(USE_AIOPS_WIN32, true)
+               echo "DiskThreads IO Module used, Windows threads support automatically enabled"
+                ;;
+            *)
+               AM_CONDITIONAL(USE_AIOPS_WIN32, false)
+               echo "DiskThreads IO Module used, pthreads support automatically enabled"
+               with_pthreads=yes
+                ;;
+            esac
        fi
        ;;
     AIO)
diff --git a/src/DiskIO/DiskThreads/aiops_win32.cc b/src/DiskIO/DiskThreads/aiops_win32.cc
new file mode 100755 (executable)
index 0000000..0b96213
--- /dev/null
@@ -0,0 +1,1226 @@
+/*
+ * $Id: aiops_win32.cc,v 1.1 2006/09/06 19:36:42 serassio Exp $
+ *
+ * DEBUG: section 43    AIOPS
+ * AUTHOR: Stewart Forster <slf@connect.com.au>
+ *
+ * SQUID Web Proxy Cache          http://www.squid-cache.org/
+ * ----------------------------------------------------------
+ *
+ *  Squid is the result of efforts by numerous individuals from
+ *  the Internet community; see the CONTRIBUTORS file for full
+ *  details.   Many organizations have provided support for Squid's
+ *  development; see the SPONSORS file for full details.  Squid is
+ *  Copyrighted (C) 2001 by the Regents of the University of
+ *  California; see the COPYRIGHT file for full details.  Squid
+ *  incorporates software developed and/or copyrighted by other
+ *  sources; see the CREDITS file for full details.
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *  
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *  
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
+ *
+ */
+
+#include "squid.h"
+#include "squid_windows.h"
+#include "DiskThreads.h"
+
+#include       <stdio.h>
+#include       <sys/types.h>
+#include       <sys/stat.h>
+#include       <fcntl.h>
+#include       <errno.h>
+#include       <dirent.h>
+#include       <signal.h>
+#include "CommIO.h"
+#include "SquidTime.h"
+#include "Store.h"
+
+#define RIDICULOUS_LENGTH      4096
+
+enum _squidaio_thread_status {
+    _THREAD_STARTING = 0,
+    _THREAD_WAITING,
+    _THREAD_BUSY,
+    _THREAD_FAILED,
+    _THREAD_DONE
+};
+typedef enum _squidaio_thread_status squidaio_thread_status;
+
+typedef struct squidaio_request_t
+{
+
+    struct squidaio_request_t *next;
+    squidaio_request_type request_type;
+    int cancelled;
+    char *path;
+    int oflag;
+    mode_t mode;
+    int fd;
+    char *bufferp;
+    char *tmpbufp;
+    int buflen;
+    off_t offset;
+    int whence;
+    int ret;
+    int err;
+
+    struct stat *tmpstatp;
+
+    struct stat *statp;
+    squidaio_result_t *resultp;
+}
+
+squidaio_request_t;
+
+typedef struct squidaio_request_queue_t
+{
+    HANDLE mutex;
+    HANDLE cond; /* See Event objects */
+    squidaio_request_t *volatile head;
+    squidaio_request_t *volatile *volatile tailp;
+    unsigned long requests;
+    unsigned long blocked;     /* main failed to lock the queue */
+}
+
+squidaio_request_queue_t;
+
+typedef struct squidaio_thread_t squidaio_thread_t;
+
+struct squidaio_thread_t
+{
+    squidaio_thread_t *next;
+    HANDLE thread;
+    DWORD dwThreadId; /* thread ID */
+    squidaio_thread_status status;
+
+    struct squidaio_request_t *current_req;
+    unsigned long requests;
+    int volatile exit;
+};
+
+static void squidaio_queue_request(squidaio_request_t *);
+static void squidaio_cleanup_request(squidaio_request_t *);
+static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
+static void squidaio_do_open(squidaio_request_t *);
+static void squidaio_do_read(squidaio_request_t *);
+static void squidaio_do_write(squidaio_request_t *);
+static void squidaio_do_close(squidaio_request_t *);
+static void squidaio_do_stat(squidaio_request_t *);
+#if USE_TRUNCATE
+static void squidaio_do_truncate(squidaio_request_t *);
+#else
+static void squidaio_do_unlink(squidaio_request_t *);
+#endif
+#if AIO_OPENDIR
+static void *squidaio_do_opendir(squidaio_request_t *);
+#endif
+static void squidaio_debug(squidaio_request_t *);
+static void squidaio_poll_queues(void);
+
+static squidaio_thread_t *threads = NULL;
+static int squidaio_initialised = 0;
+
+
+#define AIO_LARGE_BUFS  16384
+#define AIO_MEDIUM_BUFS        AIO_LARGE_BUFS >> 1
+#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
+#define AIO_TINY_BUFS  AIO_LARGE_BUFS >> 3
+#define AIO_MICRO_BUFS 128
+
+static MemAllocator *squidaio_large_bufs = NULL;       /* 16K */
+static MemAllocator *squidaio_medium_bufs = NULL;      /* 8K */
+static MemAllocator *squidaio_small_bufs = NULL;       /* 4K */
+static MemAllocator *squidaio_tiny_bufs = NULL;        /* 2K */
+static MemAllocator *squidaio_micro_bufs = NULL;       /* 128K */
+
+static int request_queue_len = 0;
+static MemAllocator *squidaio_request_pool = NULL;
+static MemAllocator *squidaio_thread_pool = NULL;
+static squidaio_request_queue_t request_queue;
+
+static struct
+{
+    squidaio_request_t *head, **tailp;
+}
+
+request_queue2 = {
+
+                     NULL, &request_queue2.head
+                 };
+static squidaio_request_queue_t done_queue;
+
+static struct
+{
+    squidaio_request_t *head, **tailp;
+}
+
+done_requests = {
+
+                    NULL, &done_requests.head
+                };
+
+static HANDLE main_thread;
+
+static MemAllocator *
+squidaio_get_pool(int size)
+{
+    if (size <= AIO_LARGE_BUFS) {
+        if (size <= AIO_MICRO_BUFS)
+            return squidaio_micro_bufs;
+        else if (size <= AIO_TINY_BUFS)
+            return squidaio_tiny_bufs;
+        else if (size <= AIO_SMALL_BUFS)
+            return squidaio_small_bufs;
+        else if (size <= AIO_MEDIUM_BUFS)
+            return squidaio_medium_bufs;
+        else
+            return squidaio_large_bufs;
+    }
+
+    return NULL;
+}
+
+void *
+squidaio_xmalloc(int size)
+{
+    void *p;
+    MemAllocator *pool;
+
+    if ((pool = squidaio_get_pool(size)) != NULL) {
+        p = pool->alloc();
+    } else
+        p = xmalloc(size);
+
+    return p;
+}
+
+static char *
+squidaio_xstrdup(const char *str)
+{
+    char *p;
+    int len = strlen(str) + 1;
+
+    p = (char *)squidaio_xmalloc(len);
+    strncpy(p, str, len);
+
+    return p;
+}
+
+void
+squidaio_xfree(void *p, int size)
+{
+    MemAllocator *pool;
+
+    if ((pool = squidaio_get_pool(size)) != NULL) {
+        pool->free(p);
+    } else
+        xfree(p);
+}
+
+static void
+squidaio_xstrfree(char *str)
+{
+    MemAllocator *pool;
+    int len = strlen(str) + 1;
+
+    if ((pool = squidaio_get_pool(len)) != NULL) {
+        pool->free(str);
+    } else
+        xfree(str);
+}
+
+void
+squidaio_init(void)
+{
+    int i;
+    squidaio_thread_t *threadp;
+
+    if (squidaio_initialised)
+        return;
+
+    if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
+                         GetCurrentThread(),  /* pseudo handle to copy */
+                         GetCurrentProcess(), /* pseudo handle, don't close */
+                         &main_thread,
+                         0,                   /* required access */
+                         FALSE,               /* child process's don't inherit the handle */
+                         DUPLICATE_SAME_ACCESS)) {
+        /* spit errors */
+        fatal("Couldn't get current thread handle");
+    }
+
+    /* Initialize request queue */
+    if ((request_queue.mutex = CreateMutex(NULL,    /* no inheritance */
+                                           FALSE,   /* start unowned (as per mutex_init) */
+                                           NULL)    /* no name */
+        ) == NULL) {
+        fatal("Failed to create mutex");
+    }
+
+    if ((request_queue.cond = CreateEvent(NULL,     /* no inheritance */
+                                          FALSE,    /* auto signal reset - which I think is pthreads like ? */
+                                          FALSE,    /* start non signaled */
+                                          NULL)     /* no name */
+        ) == NULL) {
+        fatal("Failed to create condition variable");
+    }
+
+    request_queue.head = NULL;
+
+    request_queue.tailp = &request_queue.head;
+
+    request_queue.requests = 0;
+
+    request_queue.blocked = 0;
+
+    /* Initialize done queue */
+
+    if ((done_queue.mutex = CreateMutex(NULL,  /* no inheritance */
+                                        FALSE, /* start unowned (as per mutex_init) */
+                                        NULL)  /* no name */
+        ) == NULL) {
+        fatal("Failed to create mutex");
+    }
+
+    if ((done_queue.cond = CreateEvent(NULL,  /* no inheritance */
+                                       TRUE,  /* manually signaled - which I think is pthreads like ? */
+                                       FALSE, /* start non signaled */
+                                       NULL)  /* no name */
+        ) == NULL) {
+        fatal("Failed to create condition variable");
+    }
+
+    done_queue.head = NULL;
+
+    done_queue.tailp = &done_queue.head;
+
+    done_queue.requests = 0;
+
+    done_queue.blocked = 0;
+
+    CommIO::NotifyIOCompleted();
+
+    /* Create threads and get them to sit in their wait loop */
+    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
+
+    assert(NUMTHREADS);
+
+    for (i = 0; i < NUMTHREADS; i++) {
+        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
+        threadp->status = _THREAD_STARTING;
+        threadp->current_req = NULL;
+        threadp->requests = 0;
+        threadp->next = threads;
+        threads = threadp;
+
+        if ((threadp->thread = CreateThread(NULL,                   /* no security attributes */
+                                            0,                      /* use default stack size */
+                                            squidaio_thread_loop,   /* thread function */
+                                            threadp,                /* argument to thread function */
+                                            0,                      /* use default creation flags */
+                                            &(threadp->dwThreadId)) /* returns the thread identifier */
+            ) == NULL) {
+            fprintf(stderr, "Thread creation failed\n");
+            threadp->status = _THREAD_FAILED;
+            continue;
+        }
+
+        /* Set the new thread priority above parent process */
+        SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
+    }
+
+    /* Create request pool */
+    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
+
+    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
+
+    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
+
+    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
+
+    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
+
+    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
+
+    squidaio_initialised = 1;
+}
+
+void
+squidaio_shutdown(void)
+{
+    squidaio_thread_t *threadp;
+    int i;
+    HANDLE * hthreads;
+
+    if (!squidaio_initialised)
+        return;
+
+    /* This is the same as in squidaio_sync */
+    do {
+        squidaio_poll_queues();
+    } while (request_queue_len > 0);
+
+    hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
+
+    threadp = threads;
+
+    for (i = 0; i < NUMTHREADS; i++) {
+        threadp->exit = 1;
+        hthreads[i] = threadp->thread;
+        threadp = threadp->next;
+    }
+
+    ReleaseMutex(request_queue.mutex);
+    ResetEvent(request_queue.cond);
+    ReleaseMutex(done_queue.mutex);
+    ResetEvent(done_queue.cond);
+    Sleep(0);
+
+    WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
+
+    for (i = 0; i < NUMTHREADS; i++) {
+        CloseHandle(hthreads[i]);
+    }
+
+    CloseHandle(main_thread);
+    CommIO::NotifyIOClose();
+
+    squidaio_initialised = 0;
+    xfree(hthreads);
+}
+
+static DWORD WINAPI
+squidaio_thread_loop(LPVOID lpParam)
+{
+    squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
+    squidaio_request_t *request;
+    HANDLE cond; /* local copy of the event queue because win32 event handles
+                              * don't atomically release the mutex as cond variables do. */
+
+    /* lock the thread info */
+
+    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
+        fatal("Can't get ownership of mutex\n");
+    }
+
+    /* duplicate the handle */
+    if (!DuplicateHandle(GetCurrentProcess(),    /* pseudo handle, don't close */
+                         request_queue.cond,     /* handle to copy */
+                         GetCurrentProcess(),    /* pseudo handle, don't close */
+                         &cond,
+                         0,                      /* required access */
+                         FALSE,                  /* child process's don't inherit the handle */
+                         DUPLICATE_SAME_ACCESS))
+        fatal("Can't duplicate mutex handle\n");
+
+    if (!ReleaseMutex(request_queue.mutex)) {
+        CloseHandle(cond);
+        fatal("Can't release mutex\n");
+    }
+
+    Sleep(0);
+
+    while (1) {
+        DWORD rv;
+        threadp->current_req = request = NULL;
+        request = NULL;
+        /* Get a request to process */
+        threadp->status = _THREAD_WAITING;
+
+        if (threadp->exit) {
+            CloseHandle(request_queue.mutex);
+            CloseHandle(cond);
+            return 0;
+        }
+
+        rv = WaitForSingleObject(request_queue.mutex, INFINITE);
+
+        if (rv == WAIT_FAILED) {
+            CloseHandle(cond);
+            return 1;
+        }
+
+        while (!request_queue.head) {
+            if (!ReleaseMutex(request_queue.mutex)) {
+                CloseHandle(cond);
+                threadp->status = _THREAD_FAILED;
+                return 1;
+            }
+
+            Sleep(0);
+            rv = WaitForSingleObject(cond, INFINITE);
+
+            if (rv == WAIT_FAILED) {
+                CloseHandle(cond);
+                return 1;
+            }
+
+            rv = WaitForSingleObject(request_queue.mutex, INFINITE);
+
+            if (rv == WAIT_FAILED) {
+                CloseHandle(cond);
+                return 1;
+            }
+        }
+
+        request = request_queue.head;
+
+        if (request)
+            request_queue.head = request->next;
+
+        if (!request_queue.head)
+            request_queue.tailp = &request_queue.head;
+
+        if (!ReleaseMutex(request_queue.mutex)) {
+            CloseHandle(cond);
+            return 1;
+        }
+
+        Sleep(0);
+
+        /* process the request */
+        threadp->status = _THREAD_BUSY;
+
+        request->next = NULL;
+
+        threadp->current_req = request;
+
+        errno = 0;
+
+        if (!request->cancelled) {
+            switch (request->request_type) {
+
+            case _AIO_OP_OPEN:
+                squidaio_do_open(request);
+                break;
+
+            case _AIO_OP_READ:
+                squidaio_do_read(request);
+                break;
+
+            case _AIO_OP_WRITE:
+                squidaio_do_write(request);
+                break;
+
+            case _AIO_OP_CLOSE:
+                squidaio_do_close(request);
+                break;
+
+#if USE_TRUNCATE
+
+            case _AIO_OP_TRUNCATE:
+                squidaio_do_truncate(request);
+                break;
+#else
+
+            case _AIO_OP_UNLINK:
+                squidaio_do_unlink(request);
+                break;
+
+#endif
+#if AIO_OPENDIR                        /* Opendir not implemented yet */
+
+            case _AIO_OP_OPENDIR:
+                squidaio_do_opendir(request);
+                break;
+#endif
+
+            case _AIO_OP_STAT:
+                squidaio_do_stat(request);
+                break;
+
+            default:
+                request->ret = -1;
+                request->err = EINVAL;
+                break;
+            }
+        } else {               /* cancelled */
+            request->ret = -1;
+            request->err = EINTR;
+        }
+
+        threadp->status = _THREAD_DONE;
+        /* put the request in the done queue */
+        rv = WaitForSingleObject(done_queue.mutex, INFINITE);
+
+        if (rv == WAIT_FAILED) {
+            CloseHandle(cond);
+            return 1;
+        }
+
+        *done_queue.tailp = request;
+        done_queue.tailp = &request->next;
+
+        if (!ReleaseMutex(done_queue.mutex)) {
+            CloseHandle(cond);
+            return 1;
+        }
+
+        CommIO::NotifyIOCompleted();
+        Sleep(0);
+        threadp->requests++;
+    }                          /* while forever */
+
+    CloseHandle(cond);
+
+    return 0;
+}                              /* squidaio_thread_loop */
+
+static void
+squidaio_queue_request(squidaio_request_t * request)
+{
+    static int high_start = 0;
+    debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",
+                  request, request->request_type, request->resultp);
+    /* Mark it as not executed (failing result, no error) */
+    request->ret = -1;
+    request->err = 0;
+    /* Internal housekeeping */
+    request_queue_len += 1;
+    request->resultp->_data = request;
+    /* Play some tricks with the request_queue2 queue */
+    request->next = NULL;
+
+    if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
+        if (request_queue2.head) {
+            /* Grab blocked requests */
+            *request_queue.tailp = request_queue2.head;
+            request_queue.tailp = request_queue2.tailp;
+        }
+
+        /* Enqueue request */
+        *request_queue.tailp = request;
+
+        request_queue.tailp = &request->next;
+
+        if (!SetEvent(request_queue.cond))
+            fatal("Couldn't push queue");
+
+        if (!ReleaseMutex(request_queue.mutex)) {
+            /* unexpected error */
+            fatal("Couldn't push queue");
+        }
+
+        Sleep(0);
+
+        if (request_queue2.head) {
+            /* Clear queue of blocked requests */
+            request_queue2.head = NULL;
+            request_queue2.tailp = &request_queue2.head;
+        }
+    } else {
+        /* Oops, the request queue is blocked, use request_queue2 */
+        *request_queue2.tailp = request;
+        request_queue2.tailp = &request->next;
+    }
+
+    if (request_queue2.head) {
+        static int filter = 0;
+        static int filter_limit = 8;
+
+        if (++filter >= filter_limit) {
+            filter_limit += filter;
+            filter = 0;
+            debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n");
+        }
+    }
+
+    /* Warn if out of threads */
+    if (request_queue_len > MAGIC1) {
+        static int last_warn = 0;
+        static int queue_high, queue_low;
+
+        if (high_start == 0) {
+            high_start = (int)squid_curtime;
+            queue_high = request_queue_len;
+            queue_low = request_queue_len;
+        }
+
+        if (request_queue_len > queue_high)
+            queue_high = request_queue_len;
+
+        if (request_queue_len < queue_low)
+            queue_low = request_queue_len;
+
+        if (squid_curtime >= (last_warn + 15) &&
+                squid_curtime >= (high_start + 5)) {
+            debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n");
+
+            if (squid_curtime >= (high_start + 15))
+                debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n",
+                              request_queue_len, queue_high, queue_low, (long int) (squid_curtime - high_start));
+
+            last_warn = (int)squid_curtime;
+        }
+    } else {
+        high_start = 0;
+    }
+
+    /* Warn if seriously overloaded */
+    if (request_queue_len > RIDICULOUS_LENGTH) {
+        debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n");
+        debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n");
+        squidaio_sync();
+        debug(43, 0) ("squidaio_queue_request: Synced\n");
+    }
+}                              /* squidaio_queue_request */
+
+static void
+squidaio_cleanup_request(squidaio_request_t * requestp)
+{
+    squidaio_result_t *resultp = requestp->resultp;
+    int cancelled = requestp->cancelled;
+
+    /* Free allocated structures and copy data back to user space if the */
+    /* request hasn't been cancelled */
+
+    switch (requestp->request_type) {
+
+    case _AIO_OP_STAT:
+
+        if (!cancelled && requestp->ret == 0)
+
+            xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
+
+        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
+
+        squidaio_xstrfree(requestp->path);
+
+        break;
+
+    case _AIO_OP_OPEN:
+        if (cancelled && requestp->ret >= 0)
+            /* The open() was cancelled but completed */
+            close(requestp->ret);
+
+        squidaio_xstrfree(requestp->path);
+
+        break;
+
+    case _AIO_OP_CLOSE:
+        if (cancelled && requestp->ret < 0)
+            /* The close() was cancelled and never got executed */
+            close(requestp->fd);
+
+        break;
+
+    case _AIO_OP_UNLINK:
+
+    case _AIO_OP_TRUNCATE:
+
+    case _AIO_OP_OPENDIR:
+        squidaio_xstrfree(requestp->path);
+
+        break;
+
+    case _AIO_OP_READ:
+        break;
+
+    case _AIO_OP_WRITE:
+        break;
+
+    default:
+        break;
+    }
+
+    if (resultp != NULL && !cancelled) {
+        resultp->aio_return = requestp->ret;
+        resultp->aio_errno = requestp->err;
+    }
+
+    squidaio_request_pool->free(requestp);
+}                              /* squidaio_cleanup_request */
+
+
+int
+squidaio_cancel(squidaio_result_t * resultp)
+{
+    squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
+
+    if (request && request->resultp == resultp) {
+        debug(43, 9) ("squidaio_cancel: %p type=%d result=%p\n",
+                      request, request->request_type, request->resultp);
+        request->cancelled = 1;
+        request->resultp = NULL;
+        resultp->_data = NULL;
+        resultp->result_type = _AIO_OP_NONE;
+        return 0;
+    }
+
+    return 1;
+}                              /* squidaio_cancel */
+
+
+int
+squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
+{
+    squidaio_init();
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->path = (char *) squidaio_xstrdup(path);
+
+    requestp->oflag = oflag;
+
+    requestp->mode = mode;
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_OPEN;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_OPEN;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_open(squidaio_request_t * requestp)
+{
+    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
+    requestp->err = errno;
+}
+
+
+int
+squidaio_read(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp)
+{
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->fd = fd;
+
+    requestp->bufferp = bufp;
+
+    requestp->buflen = bufs;
+
+    requestp->offset = offset;
+
+    requestp->whence = whence;
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_READ;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_READ;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_read(squidaio_request_t * requestp)
+{
+    lseek(requestp->fd, requestp->offset, requestp->whence);
+
+    if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
+                  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
+        WIN32_maperror(GetLastError());
+        requestp->ret = -1;
+    }
+
+    requestp->err = errno;
+}
+
+
+int
+squidaio_write(int fd, char *bufp, int bufs, off_t offset, int whence, squidaio_result_t * resultp)
+{
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->fd = fd;
+
+    requestp->bufferp = bufp;
+
+    requestp->buflen = bufs;
+
+    requestp->offset = offset;
+
+    requestp->whence = whence;
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_WRITE;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_WRITE;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_write(squidaio_request_t * requestp)
+{
+    if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
+                   requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
+        WIN32_maperror(GetLastError());
+        requestp->ret = -1;
+    }
+
+    requestp->err = errno;
+}
+
+
+int
+squidaio_close(int fd, squidaio_result_t * resultp)
+{
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->fd = fd;
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_CLOSE;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_CLOSE;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_close(squidaio_request_t * requestp)
+{
+    if((requestp->ret = close(requestp->fd)) < 0) {
+        debug(43, 0) ("squidaio_do_close: FD %d, errno %d\n", requestp->fd, errno);
+        close(requestp->fd);
+    }
+
+    requestp->err = errno;
+}
+
+
+int
+
+squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
+{
+    squidaio_init();
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->path = (char *) squidaio_xstrdup(path);
+
+    requestp->statp = sb;
+
+    requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_STAT;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_STAT;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_stat(squidaio_request_t * requestp)
+{
+    requestp->ret = stat(requestp->path, requestp->tmpstatp);
+    requestp->err = errno;
+}
+
+
+#if USE_TRUNCATE
+int
+squidaio_truncate(const char *path, off_t length, squidaio_result_t * resultp)
+{
+    squidaio_init();
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->path = (char *) squidaio_xstrdup(path);
+
+    requestp->offset = length;
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_TRUNCATE;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_TRUNCATE;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_truncate(squidaio_request_t * requestp)
+{
+    requestp->ret = truncate(requestp->path, requestp->offset);
+    requestp->err = errno;
+}
+
+
+#else
+int
+squidaio_unlink(const char *path, squidaio_result_t * resultp)
+{
+    squidaio_init();
+    squidaio_request_t *requestp;
+
+    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
+
+    requestp->path = squidaio_xstrdup(path);
+
+    requestp->resultp = resultp;
+
+    requestp->request_type = _AIO_OP_UNLINK;
+
+    requestp->cancelled = 0;
+
+    resultp->result_type = _AIO_OP_UNLINK;
+
+    squidaio_queue_request(requestp);
+
+    return 0;
+}
+
+
+static void
+squidaio_do_unlink(squidaio_request_t * requestp)
+{
+    requestp->ret = unlink(requestp->path);
+    requestp->err = errno;
+}
+
+#endif
+
+#if AIO_OPENDIR
+/* XXX squidaio_opendir NOT implemented yet.. */
+
+int
+squidaio_opendir(const char *path, squidaio_result_t * resultp)
+{
+    squidaio_request_t *requestp;
+    int len;
+
+    requestp = squidaio_request_pool->alloc();
+
+    resultp->result_type = _AIO_OP_OPENDIR;
+
+    return -1;
+}
+
+static void
+squidaio_do_opendir(squidaio_request_t * requestp)
+{
+    /* NOT IMPLEMENTED */
+}
+
+#endif
+
+static void
+squidaio_poll_queues(void)
+{
+    /* kick "overflow" request queue */
+
+    if (request_queue2.head &&
+            (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
+        *request_queue.tailp = request_queue2.head;
+        request_queue.tailp = request_queue2.tailp;
+
+        if (!SetEvent(request_queue.cond))
+            fatal("couldn't push queue\n");
+
+        if (!ReleaseMutex(request_queue.mutex)) {
+            /* unexpected error */
+        }
+
+        Sleep(0);
+        request_queue2.head = NULL;
+        request_queue2.tailp = &request_queue2.head;
+    }
+
+    /* poll done queue */
+    if (done_queue.head &&
+            (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
+
+        struct squidaio_request_t *requests = done_queue.head;
+        done_queue.head = NULL;
+        done_queue.tailp = &done_queue.head;
+
+        if (!ReleaseMutex(done_queue.mutex)) {
+            /* unexpected error */
+        }
+
+        Sleep(0);
+        *done_requests.tailp = requests;
+        request_queue_len -= 1;
+
+        while (requests->next) {
+            requests = requests->next;
+            request_queue_len -= 1;
+        }
+
+        done_requests.tailp = &requests->next;
+    }
+}
+
+squidaio_result_t *
+squidaio_poll_done(void)
+{
+    squidaio_request_t *request;
+    squidaio_result_t *resultp;
+    int cancelled;
+    int polled = 0;
+
+AIO_REPOLL:
+    request = done_requests.head;
+
+    if (request == NULL && !polled) {
+        CommIO::ResetNotifications();
+        squidaio_poll_queues();
+        polled = 1;
+        request = done_requests.head;
+    }
+
+    if (!request) {
+        return NULL;
+    }
+
+    debug(43, 9) ("squidaio_poll_done: %p type=%d result=%p\n",
+                  request, request->request_type, request->resultp);
+    done_requests.head = request->next;
+
+    if (!done_requests.head)
+        done_requests.tailp = &done_requests.head;
+
+    resultp = request->resultp;
+
+    cancelled = request->cancelled;
+
+    squidaio_debug(request);
+
+    debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
+
+    squidaio_cleanup_request(request);
+
+    if (cancelled)
+        goto AIO_REPOLL;
+
+    return resultp;
+}                              /* squidaio_poll_done */
+
+int
+squidaio_operations_pending(void)
+{
+    return request_queue_len + (done_requests.head ? 1 : 0);
+}
+
+int
+squidaio_sync(void)
+{
+    /* XXX This might take a while if the queue is large.. */
+
+    do {
+        squidaio_poll_queues();
+    } while (request_queue_len > 0);
+
+    return squidaio_operations_pending();
+}
+
+int
+squidaio_get_queue_len(void)
+{
+    return request_queue_len;
+}
+
+static void
+squidaio_debug(squidaio_request_t * request)
+{
+    switch (request->request_type) {
+
+    case _AIO_OP_OPEN:
+        debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
+        break;
+
+    case _AIO_OP_READ:
+        debug(43, 5) ("READ on fd: %d\n", request->fd);
+        break;
+
+    case _AIO_OP_WRITE:
+        debug(43, 5) ("WRITE on fd: %d\n", request->fd);
+        break;
+
+    case _AIO_OP_CLOSE:
+        debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
+        break;
+
+    case _AIO_OP_UNLINK:
+        debug(43, 5) ("UNLINK of %s\n", request->path);
+        break;
+
+    case _AIO_OP_TRUNCATE:
+        debug(43, 5) ("UNLINK of %s\n", request->path);
+        break;
+
+    default:
+        break;
+    }
+}
+
+void
+squidaio_stats(StoreEntry * sentry)
+{
+    squidaio_thread_t *threadp;
+    int i;
+
+    if (!squidaio_initialised)
+        return;
+
+    storeAppendPrintf(sentry, "\n\nThreads Status:\n");
+
+    storeAppendPrintf(sentry, "#\tID\t# Requests\n");
+
+    threadp = threads;
+
+    for (i = 0; i < NUMTHREADS; i++) {
+        storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
+        threadp = threadp->next;
+    }
+}
index 93543804ed58e38329848eafa3ed9ca1f9d93970..e384d3929b2e3868d6ae06fd1a3e0ffab9e66634 100644 (file)
@@ -1,7 +1,7 @@
 #
 #  Makefile for the Squid Object Cache server
 #
-#  $Id: Makefile.am,v 1.164 2006/09/04 20:15:21 serassio Exp $
+#  $Id: Makefile.am,v 1.165 2006/09/06 19:36:42 serassio Exp $
 #
 #  Uncomment and customize the following to suit your needs:
 #
@@ -162,6 +162,12 @@ else
 AIO_WIN32_SOURCES =
 endif
 
+if USE_AIOPS_WIN32
+AIOPS_SOURCE = DiskIO/DiskThreads/aiops_win32.cc
+else
+AIOPS_SOURCE = DiskIO/DiskThreads/aiops.cc
+endif
+
 IDENT_ALL_SOURCE = ACLIdent.cc ACLIdent.h ident.cc
 if ENABLE_IDENT
 IDENT_SOURCE = $(IDENT_ALL_SOURCE)
@@ -278,7 +284,9 @@ EXTRA_squid_SOURCES = \
        $(SNMP_ALL_SOURCE) \
        unlinkd.cc \
        $(SSL_ALL_SOURCE) \
-       $(WIN32_ALL_SOURCE)
+       $(WIN32_ALL_SOURCE) \
+       DiskIO/DiskThreads/aiops.cc \
+       DiskIO/DiskThreads/aiops_win32.cc
 
 squid_ACLSOURCES = \
        $(ARP_ACL_SOURCE) \
@@ -938,14 +946,14 @@ libDiskDaemon_a_SOURCES = \
                DiskIO/DiskDaemon/DiskDaemonDiskIOModule.h
 
 libDiskThreads_a_SOURCES = \
-       DiskIO/DiskThreads/aiops.cc \
-       DiskIO/DiskThreads/async_io.cc \
-       DiskIO/DiskThreads/DiskThreads.h \
-       DiskIO/DiskThreads/DiskThreadsDiskFile.cc \
-       DiskIO/DiskThreads/DiskThreadsDiskFile.h \
-       DiskIO/DiskThreads/DiskThreadsDiskIOModule.h \
-       DiskIO/DiskThreads/DiskThreadsIOStrategy.cc \
-       DiskIO/DiskThreads/DiskThreadsIOStrategy.h
+               $(AIOPS_SOURCE) \
+               DiskIO/DiskThreads/async_io.cc \
+               DiskIO/DiskThreads/DiskThreads.h \
+               DiskIO/DiskThreads/DiskThreadsDiskFile.cc \
+               DiskIO/DiskThreads/DiskThreadsDiskFile.h \
+               DiskIO/DiskThreads/DiskThreadsDiskIOModule.h \
+               DiskIO/DiskThreads/DiskThreadsIOStrategy.cc \
+               DiskIO/DiskThreads/DiskThreadsIOStrategy.h
 
 DiskIO_DiskDaemon_diskd_SOURCES = DiskIO/DiskDaemon/diskd.cc 
 DiskIO_DiskDaemon_diskd_LDADD = $(top_builddir)/lib/libmiscutil.a @XTRA_LIBS@