]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Major rewrite of async-io to make it behave a bit more sane with
authorhno <>
Sat, 11 Nov 2000 04:42:03 +0000 (04:42 +0000)
committerhno <>
Sat, 11 Nov 2000 04:42:03 +0000 (04:42 +0000)
substantially less overhead.

Some tuning work still remains to make it perform optimal. See the start
of store_asyncufs.h for all the knobs.

src/fs/aufs/Makefile.in
src/fs/aufs/aiops.cc
src/fs/aufs/async_io.cc
src/fs/aufs/store_asyncufs.h
src/fs/aufs/store_dir_aufs.cc
src/fs/aufs/store_io_aufs.cc

index a577db77d54ed3abf978f191ee2fc8e2c0b178bc..51e65dbc2277b768c05527f0c6a677e9bd93be83 100644 (file)
@@ -1,7 +1,7 @@
 #
 #  Makefile for the AUFS storage driver for the Squid Object Cache server
 #
-#  $Id: Makefile.in,v 1.1 2000/05/03 17:15:46 adrian Exp $
+#  $Id: Makefile.in,v 1.2 2000/11/10 21:42:03 hno Exp $
 #
 
 FS             = aufs
@@ -36,6 +36,7 @@ $(OUT): $(OBJS)
        $(RANLIB) $(OUT)
 
 $(OBJS): $(top_srcdir)/include/version.h ../../../include/autoconf.h
+$(OBJS): store_asyncufs.h
 
 .c.o:
        @rm -f ../stamp
index b42c3cf2b1fb16740b9d6017e268b7fe501081b7..d7b5e94824837b6c417869914fce1e71161d5e25 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * $Id: aiops.cc,v 1.3 2000/10/17 08:06:07 adrian Exp $
+ * $Id: aiops.cc,v 1.4 2000/11/10 21:42:03 hno Exp $
  *
  * DEBUG: section 43    AIOPS
  * AUTHOR: Stewart Forster <slf@connect.com.au>
 
 #define RIDICULOUS_LENGTH      4096
 
-#if defined(_SQUID_LINUX_)
-/* Linux requires proper use of mutexes or it will segfault deep in the
- * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12.
- */
-#define AIO_PROPER_MUTEX 1
-#endif
-
 enum _aio_thread_status {
     _THREAD_STARTING = 0,
     _THREAD_WAITING,
@@ -77,6 +70,7 @@ enum _aio_request_type {
 };
 
 typedef struct aio_request_t {
+    struct aio_request_t *next;
     enum _aio_request_type request_type;
     int cancelled;
     char *path;
@@ -93,20 +87,25 @@ typedef struct aio_request_t {
     struct stat *tmpstatp;
     struct stat *statp;
     aio_result_t *resultp;
-    struct aio_request_t *next;
 } aio_request_t;
 
-
-typedef struct aio_thread_t {
+typedef struct aio_request_queue_t {
+    pthread_mutex_t mutex;
+    pthread_cond_t cond;
+    aio_request_t * volatile head;
+    aio_request_t * volatile * volatile tailp;
+    unsigned long requests;
+    unsigned long blocked; /* main failed to lock the queue */
+} aio_request_queue_t;
+
+typedef struct aio_thread_t aio_thread_t;
+struct aio_thread_t {
+    aio_thread_t *next;
     pthread_t thread;
     enum _aio_thread_status status;
-    pthread_mutex_t mutex;     /* Mutex for testing condition variable */
-    pthread_cond_t cond;       /* Condition variable */
-    struct aio_request_t *volatile req;                /* set by main, cleared by thread */
-    struct aio_request_t *processed_req;       /* reminder to main */
-    struct aio_thread_t *next;
-} aio_thread_t;
-
+    struct aio_request_t *current_req;
+    unsigned long requests;
+};
 
 int aio_cancel(aio_result_t *);
 int aio_open(const char *, int, mode_t, aio_result_t *);
@@ -121,7 +120,6 @@ int aio_sync(void);
 
 static void aio_init(void);
 static void aio_queue_request(aio_request_t *);
-static void aio_process_request_queue(void);
 static void aio_cleanup_request(aio_request_t *);
 static void *aio_thread_loop(void *);
 static void aio_do_open(aio_request_t *);
@@ -135,30 +133,31 @@ static void aio_do_truncate(aio_request_t *);
 static void *aio_do_opendir(aio_request_t *);
 #endif
 static void aio_debug(aio_request_t *);
-static void aio_poll_threads(void);
+static void aio_poll_queues(void);
 
-static aio_thread_t *threads;
+static aio_thread_t *threads = NULL;
 static int aio_initialised = 0;
 
+
 #define AIO_LARGE_BUFS  16384
 #define AIO_MEDIUM_BUFS        AIO_LARGE_BUFS >> 1
 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
 #define AIO_TINY_BUFS  AIO_LARGE_BUFS >> 3
+#define AIO_MICRO_BUFS 128
 
-static MemPool *aio_large_bufs = NULL;         // 16K
-static MemPool *aio_medium_bufs = NULL;                // 8K
-static MemPool *aio_small_bufs = NULL;         // 4K
-static MemPool *aio_tiny_bufs = NULL;          // 2K
+static MemPool *aio_large_bufs = NULL;         /* 16K */
+static MemPool *aio_medium_bufs = NULL;                /* 8K */
+static MemPool *aio_small_bufs = NULL;         /* 4K */
+static MemPool *aio_tiny_bufs = NULL;          /* 2K */
+static MemPool *aio_micro_bufs = NULL;         /* 128K */
 
 static int request_queue_len = 0;
 static MemPool *aio_request_pool = NULL;
-static aio_request_t *request_queue_head = NULL;
-static aio_request_t *request_queue_tail = NULL;
-static aio_request_t *request_done_head = NULL;
-static aio_request_t *request_done_tail = NULL;
-static aio_thread_t *wait_threads = NULL;
-static aio_thread_t *busy_threads_head = NULL;
-static aio_thread_t *busy_threads_tail = NULL;
+static MemPool *aio_thread_pool = NULL;
+static aio_request_queue_t request_queue;
+static struct { aio_request_t *head, **tailp; } request_queue2 = { NULL, &request_queue2.head };
+static aio_request_queue_t done_queue;
+static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head };
 static pthread_attr_t globattr;
 static struct sched_param globsched;
 static pthread_t main_thread;
@@ -168,7 +167,9 @@ aio_get_pool(int size)
 {
     MemPool *p;
     if (size <= AIO_LARGE_BUFS) {
-        if (size <= AIO_TINY_BUFS)
+        if (size <= AIO_MICRO_BUFS)
+           p = aio_micro_bufs;
+       else if (size <= AIO_TINY_BUFS)
            p = aio_tiny_bufs;
         else if (size <= AIO_SMALL_BUFS)
            p = aio_small_bufs;
@@ -195,6 +196,18 @@ aio_xmalloc(int size)
     return p;
 }
 
+static char *
+aio_xstrdup(const char *str)
+{
+    char *p;
+    int len = strlen(str)+1;
+
+    p = aio_xmalloc(len);
+    strncpy(p, str, len);
+
+    return p;
+}
+
 static void
 aio_xfree(void *p, int size)
 {
@@ -206,6 +219,18 @@ aio_xfree(void *p, int size)
         xfree(p);
 }
 
+static void
+aio_xstrfree(char *str)
+{
+    MemPool *pool;
+    int len = strlen(str)+1;
+
+    if ( (pool = aio_get_pool(len)) != NULL) {
+        memPoolFree(pool, str);
+    } else
+        xfree(str);
+}
+
 static void
 aio_init(void)
 {
@@ -229,32 +254,40 @@ aio_init(void)
     pthread_attr_setschedparam(&globattr, &globsched);
 #endif
 
-    /* Create threads and get them to sit in their wait loop */
-    threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t));
+    /* Initialize request queue */
+    if (pthread_mutex_init(&(request_queue.mutex), NULL))
+       fatal("Failed to create mutex");
+    if (pthread_cond_init(&(request_queue.cond), NULL))
+       fatal("Failed to create condition variable");
+    request_queue.head = NULL;
+    request_queue.tailp = &request_queue.head;
+    request_queue.requests = 0;
+    request_queue.blocked = 0;
+
+    /* Initialize done queue */
+    if (pthread_mutex_init(&(done_queue.mutex), NULL))
+       fatal("Failed to create mutex");
+    if (pthread_cond_init(&(done_queue.cond), NULL))
+       fatal("Failed to create condition variable");
+    done_queue.head = NULL;
+    done_queue.tailp = &done_queue.head;
+    done_queue.requests = 0;
+    done_queue.blocked = 0;
 
+    /* Create threads and get them to sit in their wait loop */
+    aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t));
     for (i = 0; i < NUMTHREADS; i++) {
-       threadp = &threads[i];
+       threadp = memPoolAlloc(aio_thread_pool);
        threadp->status = _THREAD_STARTING;
-       if (pthread_mutex_init(&(threadp->mutex), NULL)) {
-           threadp->status = _THREAD_FAILED;
-           continue;
-       }
-       if (pthread_cond_init(&(threadp->cond), NULL)) {
-           threadp->status = _THREAD_FAILED;
-           continue;
-       }
-       threadp->req = NULL;
-       threadp->processed_req = NULL;
+       threadp->current_req = NULL;
+       threadp->requests = 0;
+       threadp->next = threads;
+       threads = threadp;
        if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
-       threadp->next = wait_threads;
-       wait_threads = threadp;
-#if AIO_PROPER_MUTEX
-       pthread_mutex_lock(&threadp->mutex);
-#endif
     }
 
     /* Create request pool */
@@ -263,6 +296,7 @@ aio_init(void)
     aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS);
     aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS);
     aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS);
+    aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS);
 
     aio_initialised = 1;
 }
@@ -274,9 +308,6 @@ aio_thread_loop(void *ptr)
     aio_thread_t *threadp = ptr;
     aio_request_t *request;
     sigset_t new;
-#if !AIO_PROPER_MUTEX
-    struct timespec wait_time;
-#endif
 
     /*
      * Make sure to ignore signals which may possibly get sent to
@@ -300,29 +331,25 @@ aio_thread_loop(void *ptr)
     sigaddset(&new, SIGALRM);
     pthread_sigmask(SIG_BLOCK, &new, NULL);
 
-    pthread_mutex_lock(&threadp->mutex);
     while (1) {
-#if AIO_PROPER_MUTEX
-       while (threadp->req == NULL) {
+       threadp->current_req = request = NULL;
+       request = NULL;
+       /* Get a request to process */
            threadp->status = _THREAD_WAITING;
-           pthread_cond_wait(&threadp->cond, &threadp->mutex);
+       pthread_mutex_lock(&request_queue.mutex);
+       while(!request_queue.head) {
+           pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
        }
-#else
-       /* The timeout is used to unlock the race condition where
-        * ->req is set between the check and pthread_cond_wait.
-        * The thread steps it's own clock on each timeout, to avoid a CPU
-        * spin situation if the main thread is suspended (paging), and
-        * squid_curtime is not being updated timely.
-        */
-       wait_time.tv_sec = squid_curtime + 1;   /* little quicker first time */
-       wait_time.tv_nsec = 0;
-       while (threadp->req == NULL) {
-           threadp->status = _THREAD_WAITING;
-           pthread_cond_timedwait(&threadp->cond, &threadp->mutex, &wait_time);
-           wait_time.tv_sec += 3;      /* then wait 3 seconds between each check */
-       }
-#endif
-       request = threadp->req;
+       request = request_queue.head;
+       if (request)
+           request_queue.head = request->next;
+           if (!request_queue.head)
+               request_queue.tailp = &request_queue.head;
+       pthread_mutex_unlock(&request_queue.mutex);
+       /* process the request */
+       threadp->status = _THREAD_BUSY;
+       request->next = NULL;
+       threadp->current_req = request;
        errno = 0;
        if (!request->cancelled) {
            switch (request->request_type) {
@@ -361,51 +388,73 @@ aio_thread_loop(void *ptr)
            request->ret = -1;
            request->err = EINTR;
        }
-       threadp->req = NULL;    /* tells main thread that we are done */
-    }                          /* while */
+       threadp->status = _THREAD_DONE;
+       /* put the request in the done queue */
+       pthread_mutex_lock(&done_queue.mutex);
+       *done_queue.tailp = request;
+       done_queue.tailp = &request->next;
+       pthread_mutex_unlock(&done_queue.mutex);
+       threadp->requests++;
+    }                          /* while forever */
     return NULL;
 }                              /* aio_thread_loop */
 
 static void
-aio_do_request(aio_request_t * requestp)
-{
-    if (wait_threads == NULL && busy_threads_head == NULL) {
-       fprintf(stderr, "PANIC: No threads to service requests with!\n");
-       exit(-1);
-    }
-    aio_queue_request(requestp);
-}                              /* aio_do_request */
-
-
-static void
-aio_queue_request(aio_request_t * requestp)
+aio_queue_request(aio_request_t * request)
 {
-    aio_request_t *rp;
-    static int last_warn = 0;
     static int high_start = 0;
-    static int queue_high, queue_low;
-    int i;
-
+    debug(41, 9) ("aio_queue_request: %p type=%d result=%p\n",
+           request, request->request_type, request->resultp);
     /* Mark it as not executed (failing result, no error) */
-    requestp->ret = -1;
-    requestp->err = 0;
-    /* Queue it on the request queue */
-    if (request_queue_head == NULL) {
-       request_queue_head = requestp;
-       request_queue_tail = requestp;
+    request->ret = -1;
+    request->err = 0;
+    /* Internal housekeeping */
+    request_queue_len += 1;
+    request->resultp->_data = request;
+    /* Play some tricks with the request_queue2 queue */
+    request->next = NULL;
+    if (!request_queue2.head) {
+       if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
+           /* Normal path */
+           *request_queue.tailp = request;
+           request_queue.tailp = &request->next;
+           pthread_cond_signal(&request_queue.cond);
+           pthread_mutex_unlock(&request_queue.mutex);
     } else {
-       request_queue_tail->next = requestp;
-       request_queue_tail = requestp;
+           /* Oops, the request queue is blocked, use request_queue2 */
+           *request_queue2.tailp = request;
+           request_queue2.tailp = &request->next;
+    }
+    } else {
+       /* Secondary path. We have blocked requests to deal with */
+       /* add the request to the chain */
+       *request_queue2.tailp = request;
+       if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
+           /* Ok, the queue is no longer blocked */
+           *request_queue.tailp = request_queue2.head;
+           request_queue.tailp = &request->next;
+           pthread_cond_signal(&request_queue.cond);
+           pthread_mutex_unlock(&request_queue.mutex);
+           request_queue2.head = NULL;
+           request_queue2.tailp = &request_queue2.head;
+       } else {
+           /* still blocked, bump the blocked request chain */
+           request_queue2.tailp = &request->next;
+       }
+    }
+    if (request_queue2.head) {
+       static int filter = 0;
+       static int filter_limit = 8;
+       if (++filter >= filter_limit) {
+           filter_limit += filter;
+           filter = 0;
+           debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n");
+       }
     }
-    requestp->next = NULL;
-    request_queue_len += 1;
-    /* Poll done threads if needed */
-    if (wait_threads == NULL)
-       aio_poll_threads();
-    /* Kick it rolling */
-    aio_process_request_queue();
     /* Warn if out of threads */
-    if (request_queue_len > (NUMTHREADS >> 1)) {
+    if (request_queue_len > MAGIC1) {
+       static int last_warn = 0;
+       static int queue_high, queue_low;
        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
@@ -416,49 +465,17 @@ aio_queue_request(aio_request_t * requestp)
        if (request_queue_len < queue_low)
            queue_low = request_queue_len;
        if (squid_curtime >= (last_warn + 15) &&
-           squid_curtime >= (high_start + 3)) {
-           debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O threads\n");
-           debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
-               request_queue_len, queue_high, queue_low, squid_curtime - high_start);
-           debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n");
-           debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n");
-           debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS);
-           rp = request_queue_head;
-           for (i = 1; i <= NUMTHREADS; i++) {
-               switch (rp->request_type) {
-               case _AIO_OP_OPEN:
-                   debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path);
-                   break;
-               case _AIO_OP_READ:
-                   debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd);
-                   break;
-               case _AIO_OP_WRITE:
-                   debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd);
-                   break;
-               case _AIO_OP_CLOSE:
-                   debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd);
-                   break;
-               case _AIO_OP_UNLINK:
-                   debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path);
-                   break;
-               case _AIO_OP_TRUNCATE:
-                   debug(43, 3) ("aio_queue_request: %d : truncate -> %s\n", i, rp->path);
-                   break;
-               case _AIO_OP_STAT:
-                   debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path);
-                   break;
-               default:
-                   debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type);
-                   break;
-               }
-               if ((rp = rp->next) == NULL)
-                   break;
-           }
+           squid_curtime >= (high_start + 5)) {
+           debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n");
+           if (squid_curtime >= (high_start + 15))
+               debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
+                       request_queue_len, queue_high, queue_low, squid_curtime - high_start);
            last_warn = squid_curtime;
        }
     } else {
        high_start = 0;
     }
+    /* Warn if seriously overloaded */
     if (request_queue_len > RIDICULOUS_LENGTH) {
        debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n");
        debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n");
@@ -467,47 +484,6 @@ aio_queue_request(aio_request_t * requestp)
     }
 }                              /* aio_queue_request */
 
-
-static void
-aio_process_request_queue(void)
-{
-    aio_thread_t *threadp;
-    aio_request_t *requestp;
-
-    for (;;) {
-       if (wait_threads == NULL || request_queue_head == NULL)
-           return;
-
-       requestp = request_queue_head;
-       if ((request_queue_head = requestp->next) == NULL)
-           request_queue_tail = NULL;
-       requestp->next = NULL;
-       request_queue_len--;
-
-       if (requestp->cancelled) {
-           aio_cleanup_request(requestp);
-           continue;
-       }
-       threadp = wait_threads;
-       wait_threads = threadp->next;
-       threadp->next = NULL;
-
-       if (busy_threads_head != NULL)
-           busy_threads_tail->next = threadp;
-       else
-           busy_threads_head = threadp;
-       busy_threads_tail = threadp;
-
-       threadp->status = _THREAD_BUSY;
-       threadp->req = threadp->processed_req = requestp;
-       pthread_cond_signal(&(threadp->cond));
-#if AIO_PROPER_MUTEX
-       pthread_mutex_unlock(&threadp->mutex);
-#endif
-    }
-}                              /* aio_process_request_queue */
-
-
 static void
 aio_cleanup_request(aio_request_t * requestp)
 {
@@ -521,11 +497,13 @@ aio_cleanup_request(aio_request_t * requestp)
        if (!cancelled && requestp->ret == 0)
            xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
        aio_xfree(requestp->tmpstatp, sizeof(struct stat));
+       aio_xstrfree(requestp->path);
+       break;
     case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);
-       xfree(requestp->path);
+       aio_xstrfree(requestp->path);
        break;
     case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
@@ -535,11 +513,13 @@ aio_cleanup_request(aio_request_t * requestp)
     case _AIO_OP_UNLINK:
     case _AIO_OP_TRUNCATE:
     case _AIO_OP_OPENDIR:
-       xfree(requestp->path);
+       aio_xstrfree(requestp->path);
        break;
     case _AIO_OP_READ:
        if (!cancelled && requestp->ret > 0)
            xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret);
+       aio_xfree(requestp->tmpbufp, requestp->buflen);
+       break;
     case _AIO_OP_WRITE:
        aio_xfree(requestp->tmpbufp, requestp->buflen);
        break;
@@ -557,27 +537,16 @@ aio_cleanup_request(aio_request_t * requestp)
 int
 aio_cancel(aio_result_t * resultp)
 {
-    aio_thread_t *threadp;
-    aio_request_t *requestp;
+    aio_request_t *request = resultp->_data;
 
-    for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next)
-       if (threadp->processed_req->resultp == resultp) {
-           threadp->processed_req->cancelled = 1;
-           threadp->processed_req->resultp = NULL;
-           return 0;
-       }
-    for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next)
-       if (requestp->resultp == resultp) {
-           requestp->cancelled = 1;
-           requestp->resultp = NULL;
-           return 0;
-       }
-    for (requestp = request_done_head; requestp != NULL; requestp = requestp->next)
-       if (requestp->resultp == resultp) {
-           requestp->cancelled = 1;
-           requestp->resultp = NULL;
-           return 0;
-       }
+    if (request && request->resultp == resultp) {
+       debug(41, 9) ("aio_cancel: %p type=%d result=%p\n",
+               request, request->request_type, request->resultp);
+       request->cancelled = 1;
+       request->resultp = NULL;
+       resultp->_data = NULL;
+       return 0;
+    }
     return 1;
 }                              /* aio_cancel */
 
@@ -586,28 +555,18 @@ int
 aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp)
 {
     aio_request_t *requestp;
-    int len;
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
-    len = strlen(path) + 1;
-    if ((requestp->path = (char *) xmalloc(len)) == NULL) {
-       memPoolFree(aio_request_pool, requestp);
-       errno = ENOMEM;
-       return -1;
-    }
-    strncpy(requestp->path, path, len);
+    requestp = memPoolAlloc(aio_request_pool);
+    requestp->path = (char *) aio_xstrdup(path);
     requestp->oflag = oflag;
     requestp->mode = mode;
     requestp->resultp = resultp;
     requestp->request_type = _AIO_OP_OPEN;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -627,10 +586,7 @@ aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t *
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
+    requestp = memPoolAlloc(aio_request_pool);
     requestp->fd = fd;
     requestp->bufferp = bufp;
     requestp->tmpbufp = (char *) aio_xmalloc(bufs);
@@ -641,7 +597,7 @@ aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t *
     requestp->request_type = _AIO_OP_READ;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -662,10 +618,7 @@ aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t *
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
+    requestp = memPoolAlloc(aio_request_pool);
     requestp->fd = fd;
     requestp->tmpbufp = (char *) aio_xmalloc(bufs);
     xmemcpy(requestp->tmpbufp, bufp, bufs);
@@ -676,7 +629,7 @@ aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t *
     requestp->request_type = _AIO_OP_WRITE;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -696,16 +649,13 @@ aio_close(int fd, aio_result_t * resultp)
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
+    requestp = memPoolAlloc(aio_request_pool);
     requestp->fd = fd;
     requestp->resultp = resultp;
     requestp->request_type = _AIO_OP_CLOSE;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -722,24 +672,18 @@ int
 aio_stat(const char *path, struct stat *sb, aio_result_t * resultp)
 {
     aio_request_t *requestp;
-    int len;
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
-    len = strlen(path) + 1;
-    requestp->path = (char *) xmalloc(len);
-    strncpy(requestp->path, path, len);
+    requestp = memPoolAlloc(aio_request_pool);
+    requestp->path = (char *) aio_xstrdup(path);
     requestp->statp = sb;
     requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat));
     requestp->resultp = resultp;
     requestp->request_type = _AIO_OP_STAT;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -756,22 +700,16 @@ int
 aio_unlink(const char *path, aio_result_t * resultp)
 {
     aio_request_t *requestp;
-    int len;
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
-    len = strlen(path) + 1;
-    requestp->path = (char *) xmalloc(len);
-    strncpy(requestp->path, path, len);
+    requestp = memPoolAlloc(aio_request_pool);
+    requestp->path = aio_xstrdup(path);
     requestp->resultp = resultp;
     requestp->request_type = _AIO_OP_UNLINK;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -787,27 +725,17 @@ int
 aio_truncate(const char *path, off_t length, aio_result_t * resultp)
 {
     aio_request_t *requestp;
-    int len;
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
-    len = strlen(path) + 1;
-    if ((requestp->path = (char *) xmalloc(len)) == NULL) {
-       memPoolFree(aio_request_pool, requestp);
-       errno = ENOMEM;
-       return -1;
-    }
+    requestp = memPoolAlloc(aio_request_pool);
+    requestp->path = (char *) aio_xstrdup(path);
     requestp->offset = length;
-    strncpy(requestp->path, path, len);
     requestp->resultp = resultp;
     requestp->request_type = _AIO_OP_TRUNCATE;
     requestp->cancelled = 0;
 
-    aio_do_request(requestp);
+    aio_queue_request(requestp);
     return 0;
 }
 
@@ -821,7 +749,7 @@ aio_do_truncate(aio_request_t * requestp)
 
 
 #if AIO_OPENDIR
-/* XXX aio_opendir NOT implemented? */
+/* XXX aio_opendir NOT implemented yet.. */
 
 int
 aio_opendir(const char *path, aio_result_t * resultp)
@@ -831,10 +759,7 @@ aio_opendir(const char *path, aio_result_t * resultp)
 
     if (!aio_initialised)
        aio_init();
-    if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
-       errno = ENOMEM;
-       return -1;
-    }
+    requestp = memPoolAlloc(aio_request_pool);
     return -1;
 }
 
@@ -846,82 +771,66 @@ aio_do_opendir(aio_request_t * requestp)
 
 #endif
 
-
-void
-aio_poll_threads(void)
-{
-    aio_thread_t *prev;
-    aio_thread_t *threadp;
-    aio_request_t *requestp;
-
-    do {                       /* while found completed thread */
-       prev = NULL;
-       threadp = busy_threads_head;
-       while (threadp) {
-           debug(43, 9) ("aio_poll_threads: %p: request type %d -> status %d\n",
-               threadp,
-               threadp->processed_req->request_type,
-               threadp->status);
-#if AIO_PROPER_MUTEX
-           if (threadp->req == NULL)
-               if (pthread_mutex_trylock(&threadp->mutex) == 0)
-                   break;
-#else
-           if (threadp->req == NULL)
-               break;
-#endif
-           prev = threadp;
-           threadp = threadp->next;
+static void
+aio_poll_queues(void)
+{
+    /* kick "overflow" request queue */
+    if (request_queue2.head &&
+           pthread_mutex_trylock(&request_queue.mutex) == 0) {
+       *request_queue.tailp = request_queue2.head;
+       request_queue.tailp = request_queue2.tailp;
+       pthread_cond_signal(&request_queue.cond);
+       pthread_mutex_unlock(&request_queue.mutex);
+       request_queue2.head = NULL;
+       request_queue2.tailp = &request_queue2.head;
+    }
+    /* poll done queue */
+    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
+       struct aio_request_t *requests = done_queue.head;
+       done_queue.head = NULL;
+       done_queue.tailp = &done_queue.head;
+       pthread_mutex_unlock(&done_queue.mutex);
+       *done_requests.tailp = requests;
+       request_queue_len -= 1;
+       while(requests->next) {
+           requests = requests->next;
+           request_queue_len -= 1;
        }
-       if (threadp == NULL)
-           break;
-
-       if (prev == NULL)
-           busy_threads_head = busy_threads_head->next;
-       else
-           prev->next = threadp->next;
-
-       if (busy_threads_tail == threadp)
-           busy_threads_tail = prev;
-
-       requestp = threadp->processed_req;
-       threadp->processed_req = NULL;
-
-       threadp->next = wait_threads;
-       wait_threads = threadp;
-
-       if (request_done_tail != NULL)
-           request_done_tail->next = requestp;
-       else
-           request_done_head = requestp;
-       request_done_tail = requestp;
-    } while (threadp);
-
-    aio_process_request_queue();
-}                              /* aio_poll_threads */
+       done_requests.tailp = &requests->next;
+    }
+    /* Give up the CPU to allow the threads to do their work */
+    if (done_queue.head || request_queue.head)
+       sched_yield();
+}
 
 aio_result_t *
 aio_poll_done(void)
 {
-    aio_request_t *requestp;
+    aio_request_t *request;
     aio_result_t *resultp;
     int cancelled;
+    int polled = 0;
 
   AIO_REPOLL:
-    aio_poll_threads();
-    if (request_done_head == NULL) {
+    request = done_requests.head;
+    if (request == NULL && !polled) {
+       aio_poll_queues();
+       polled = 1;
+       request = done_requests.head;
+    }
+    if (!request) {
        return NULL;
     }
-    requestp = request_done_head;
-    request_done_head = requestp->next;
-    if (!request_done_head)
-       request_done_tail = NULL;
-
-    resultp = requestp->resultp;
-    cancelled = requestp->cancelled;
-    aio_debug(requestp);
-    debug(43, 5) ("DONE: %d -> %d\n", requestp->ret, requestp->err);
-    aio_cleanup_request(requestp);
+    debug(41, 9) ("aio_poll_done: %p type=%d result=%p\n",
+           request, request->request_type, request->resultp);
+    done_requests.head = request->next;
+    if (!done_requests.head)
+       done_requests.tailp = &done_requests.head;
+    resultp = request->resultp;
+    cancelled = request->cancelled;
+    aio_debug(request);
+    debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
+    aio_cleanup_request(request);
     if (cancelled)
        goto AIO_REPOLL;
     return resultp;
@@ -930,30 +839,15 @@ aio_poll_done(void)
 int
 aio_operations_pending(void)
 {
-    return request_queue_len + (request_done_head != NULL) + (busy_threads_head != NULL);
-}
-
-int
-aio_overloaded(void)
-{
-    static time_t last_warn = 0;
-    if (aio_operations_pending() > RIDICULOUS_LENGTH / 4) {
-       if (squid_curtime >= (last_warn + 15)) {
-           debug(43, 0) ("Warning: Async-IO overloaded\n");
-           last_warn = squid_curtime;
-       }
-       return 1;
-    }
-    return 0;
+    return request_queue_len + (done_requests.head ? 1 : 0);
 }
 
 int
 aio_sync(void)
 {
-    int loop_count = 0;
+    /* XXX This might take a while if the queue is large.. */
     do {
-       aio_poll_threads();
-       assert(++loop_count < 10);
+       aio_poll_queues();
     } while (request_queue_len > 0);
     return aio_operations_pending();
 }
@@ -965,26 +859,26 @@ aio_get_queue_len(void)
 }
 
 static void
-aio_debug(aio_request_t * requestp)
+aio_debug(aio_request_t * request)
 {
-    switch (requestp->request_type) {
+    switch (request->request_type) {
     case _AIO_OP_OPEN:
-       debug(43, 5) ("OPEN of %s to FD %d\n", requestp->path, requestp->ret);
+       debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
        break;
     case _AIO_OP_READ:
-       debug(43, 5) ("READ on fd: %d\n", requestp->fd);
+       debug(43, 5) ("READ on fd: %d\n", request->fd);
        break;
     case _AIO_OP_WRITE:
-       debug(43, 5) ("WRITE on fd: %d\n", requestp->fd);
+       debug(43, 5) ("WRITE on fd: %d\n", request->fd);
        break;
     case _AIO_OP_CLOSE:
-       debug(43, 5) ("CLOSE of fd: %d\n", requestp->fd);
+       debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
        break;
     case _AIO_OP_UNLINK:
-       debug(43, 5) ("UNLINK of %s\n", requestp->path);
+       debug(43, 5) ("UNLINK of %s\n", request->path);
        break;
     case _AIO_OP_TRUNCATE:
-       debug(43, 5) ("UNLINK of %s\n", requestp->path);
+       debug(43, 5) ("UNLINK of %s\n", request->path);
        break;
     default:
        break;
index af8a52b8b1789ec640053bb4ab8bbb4bcb608f54..9f9565ca98bda13c9439414b6b74e4dfd715369a 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: async_io.cc,v 1.5 2000/06/27 08:33:53 hno Exp $
+ * $Id: async_io.cc,v 1.6 2000/11/10 21:42:03 hno Exp $
  *
  * DEBUG: section 32    Asynchronous Disk I/O
  * AUTHOR: Pete Bentley <pete@demon.net>
@@ -53,6 +53,9 @@ typedef struct aio_ctrl_t {
     AIOCB *done_handler;
     void *done_handler_data;
     aio_result_t result;
+    char *bufp;
+    FREE *free_func;
+    dlink_node node;
 } aio_ctrl_t;
 
 struct {
@@ -71,7 +74,7 @@ typedef struct aio_unlinkq_t {
     struct aio_unlinkq_t *next;
 } aio_unlinkq_t;
 
-static aio_ctrl_t *used_list = NULL;
+static dlink_list used_list;
 static int initialised = 0;
 static OBJH aioStats;
 static MemPool *aio_ctrl_pool;
@@ -107,7 +110,6 @@ void
 aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callback_data)
 {
     aio_ctrl_t *ctrlp;
-    int ret;
 
     assert(initialised);
     aio_counts.open++;
@@ -117,16 +119,9 @@ aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callba
     ctrlp->done_handler_data = callback_data;
     ctrlp->operation = _AIO_OPEN;
     cbdataLock(callback_data);
-    if (aio_open(path, oflag, mode, &ctrlp->result) < 0) {
-       ret = open(path, oflag, mode);
-       if (callback)
-           (callback) (ctrlp->fd, callback_data, ret, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
+    ctrlp->result.data = ctrlp;
+    aio_open(path, oflag, mode, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
     return;
 }
 
@@ -143,14 +138,9 @@ aioClose(int fd)
     ctrlp->done_handler = NULL;
     ctrlp->done_handler_data = NULL;
     ctrlp->operation = _AIO_CLOSE;
-    if (aio_close(fd, &ctrlp->result) < 0) {
-       close(fd);              /* Can't create thread - do a normal close */
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       aioFDWasClosed(fd);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
+    ctrlp->result.data = ctrlp;
+    aio_close(fd, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
     return;
 }
 
@@ -158,23 +148,20 @@ void
 aioCancel(int fd)
 {
     aio_ctrl_t *curr;
-    aio_ctrl_t *prev;
-    aio_ctrl_t *next;
     AIOCB *done_handler;
     void *their_data;
+    dlink_node *m, *next;
 
     assert(initialised);
     aio_counts.cancel++;
-    prev = NULL;
-    curr = used_list;
-    for (curr = used_list;; curr = next) {
-       while (curr != NULL) {
+    for (m = used_list.head; m; m = next) {
+       while (m) {
+           curr = m->data;
            if (curr->fd == fd)
                break;
-           prev = curr;
-           curr = curr->next;
+           m = m->next;
        }
-       if (curr == NULL)
+       if (m == NULL)
            break;
 
        aio_cancel(&curr->result);
@@ -188,12 +175,8 @@ aioCancel(int fd)
                done_handler(fd, their_data, -2, -2);
            cbdataUnlock(their_data);
        }
-       next = curr->next;
-       if (prev == NULL)
-           used_list = next;
-       else
-           prev->next = next;
-
+       next = m->next;
+       dlinkDelete(m, &used_list);
        memPoolFree(aio_ctrl_pool, curr);
     }
 }
@@ -207,23 +190,13 @@ aioWrite(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callba
 
     assert(initialised);
     aio_counts.write++;
-    for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next)
-       if (ctrlp->fd == fd)
-           break;
-    if (ctrlp != NULL) {
-       debug(0, 0) ("aioWrite: EWOULDBLOCK\n");
-       errno = EWOULDBLOCK;
-       if (callback)
-           (callback) (fd, callback_data, -1, errno);
-       if (free_func)
-           free_func(bufp);
-       return;
-    }
     ctrlp = memPoolAlloc(aio_ctrl_pool);
     ctrlp->fd = fd;
     ctrlp->done_handler = callback;
     ctrlp->done_handler_data = callback_data;
     ctrlp->operation = _AIO_WRITE;
+    ctrlp->bufp = bufp;
+    ctrlp->free_func = free_func;
     if (offset >= 0)
        seekmode = SEEK_SET;
     else {
@@ -231,22 +204,9 @@ aioWrite(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callba
        offset = 0;
     }
     cbdataLock(callback_data);
-    if (aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) {
-       if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
-           errno = EWOULDBLOCK;
-       if (callback)
-           (callback) (fd, callback_data, -1, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-    } else {
-       ctrlp->next = used_list;
-       used_list = ctrlp;
-    }
-    /*
-     * aio_write copies the buffer so we can free it here
-     */
-    if (free_func)
-       free_func(bufp);
+    ctrlp->result.data = ctrlp;
+    aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
 }                              /* aioWrite */
 
 
@@ -258,15 +218,6 @@ aioRead(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callbac
 
     assert(initialised);
     aio_counts.read++;
-    for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next)
-       if (ctrlp->fd == fd)
-           break;
-    if (ctrlp != NULL) {
-       errno = EWOULDBLOCK;
-       if (callback)
-           (callback) (fd, callback_data, -1, errno);
-       return;
-    }
     ctrlp = memPoolAlloc(aio_ctrl_pool);
     ctrlp->fd = fd;
     ctrlp->done_handler = callback;
@@ -279,17 +230,9 @@ aioRead(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callbac
        offset = 0;
     }
     cbdataLock(callback_data);
-    if (aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) {
-       if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
-           errno = EWOULDBLOCK;
-       if (callback)
-           (callback) (fd, callback_data, -1, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
+    ctrlp->result.data = ctrlp;
+    aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
     return;
 }                              /* aioRead */
 
@@ -306,25 +249,16 @@ aioStat(char *path, struct stat *sb, AIOCB * callback, void *callback_data)
     ctrlp->done_handler_data = callback_data;
     ctrlp->operation = _AIO_STAT;
     cbdataLock(callback_data);
-    if (aio_stat(path, sb, &ctrlp->result) < 0) {
-       if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
-           errno = EWOULDBLOCK;
-       if (callback)
-           (callback) (ctrlp->fd, callback_data, -1, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
+    ctrlp->result.data = ctrlp;
+    aio_stat(path, sb, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
     return;
 }                              /* aioStat */
 
 void
-aioUnlink(const char *pathname, AIOCB * callback, void *callback_data)
+aioUnlink(const char *path, AIOCB * callback, void *callback_data)
 {
     aio_ctrl_t *ctrlp;
-    char *path;
     assert(initialised);
     aio_counts.unlink++;
     ctrlp = memPoolAlloc(aio_ctrl_pool);
@@ -332,27 +266,16 @@ aioUnlink(const char *pathname, AIOCB * callback, void *callback_data)
     ctrlp->done_handler = callback;
     ctrlp->done_handler_data = callback_data;
     ctrlp->operation = _AIO_UNLINK;
-    path = xstrdup(pathname);
     cbdataLock(callback_data);
-    if (aio_unlink(path, &ctrlp->result) < 0) {
-       int ret = unlink(path);
-        if (callback)
-           (callback) (ctrlp->fd, callback_data, ret, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       xfree(path);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
-    xfree(path);
+    ctrlp->result.data = ctrlp;
+    aio_unlink(path, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
 }                              /* aioUnlink */
 
 void
-aioTruncate(const char *pathname, off_t length, AIOCB * callback, void *callback_data)
+aioTruncate(const char *path, off_t length, AIOCB * callback, void *callback_data)
 {
     aio_ctrl_t *ctrlp;
-    char *path;
     assert(initialised);
     aio_counts.unlink++;
     ctrlp = memPoolAlloc(aio_ctrl_pool);
@@ -360,20 +283,10 @@ aioTruncate(const char *pathname, off_t length, AIOCB * callback, void *callback
     ctrlp->done_handler = callback;
     ctrlp->done_handler_data = callback_data;
     ctrlp->operation = _AIO_TRUNCATE;
-    path = xstrdup(pathname);
     cbdataLock(callback_data);
-    if (aio_truncate(path, length, &ctrlp->result) < 0) {
-       int ret = truncate(path, length);
-        if (callback)
-           (callback) (ctrlp->fd, callback_data, ret, errno);
-       cbdataUnlock(callback_data);
-       memPoolFree(aio_ctrl_pool, ctrlp);
-       xfree(path);
-       return;
-    }
-    ctrlp->next = used_list;
-    used_list = ctrlp;
-    xfree(path);
+    ctrlp->result.data = ctrlp;
+    aio_truncate(path, length, &ctrlp->result);
+    dlinkAdd(ctrlp, &ctrlp->node, &used_list);
 }                              /* aioTruncate */
 
 
@@ -382,7 +295,6 @@ aioCheckCallbacks(SwapDir * SD)
 {
     aio_result_t *resultp;
     aio_ctrl_t *ctrlp;
-    aio_ctrl_t *prev;
     AIOCB *done_handler;
     void *their_data;
     int retval = 0;
@@ -392,26 +304,24 @@ aioCheckCallbacks(SwapDir * SD)
     for (;;) {
        if ((resultp = aio_poll_done()) == NULL)
            break;
-       prev = NULL;
-       for (ctrlp = used_list; ctrlp != NULL; prev = ctrlp, ctrlp = ctrlp->next)
-           if (&ctrlp->result == resultp)
-               break;
+       ctrlp = (aio_ctrl_t *)resultp->data;
        if (ctrlp == NULL)
-           continue;
-       if (prev == NULL)
-           used_list = ctrlp->next;
-       else
-           prev->next = ctrlp->next;
+           continue; /* XXX Should not happen */
+       dlinkDelete(&ctrlp->node, &used_list);
        if ((done_handler = ctrlp->done_handler)) {
            their_data = ctrlp->done_handler_data;
            ctrlp->done_handler = NULL;
            ctrlp->done_handler_data = NULL;
-           if (cbdataValid(their_data))
+           if (cbdataValid(their_data)) {
                 retval = 1; /* Return that we've actually done some work */
                done_handler(ctrlp->fd, their_data,
                    ctrlp->result.aio_return, ctrlp->result.aio_errno);
+           }
            cbdataUnlock(their_data);
        }
+       /* free data if requested to aioWrite() */
+       if (ctrlp->free_func)
+           ctrlp->free_func(ctrlp->bufp);
        if (ctrlp->operation == _AIO_CLOSE)
            aioFDWasClosed(ctrlp->fd);
        memPoolFree(aio_ctrl_pool, ctrlp);
index baa3d1a3033cb9e3d44fbf2d181a1457594b4682..737927d7dfa09a9a05be2bbd6e9da9e206fdcc02 100644 (file)
 #ifdef ASYNC_IO_THREADS
 #define NUMTHREADS ASYNC_IO_THREADS
 #else
-#define NUMTHREADS 16
+#define NUMTHREADS (Config.cacheSwap.n_configured*16)
 #endif
 
-#define MAGIC1 (NUMTHREADS/Config.cacheSwap.n_configured/2)
+/* Queue limit where swapouts are deferred (load calculation) */
+#define MAGIC1 (NUMTHREADS*Config.cacheSwap.n_configured*5)
+/* Queue limit where swapins are deferred (open/create fails) */
+#define MAGIC2 (NUMTHREADS*Config.cacheSwap.n_configured*20)
+
+/* Which operations to run async */
+#define ASYNC_OPEN 1
+#define ASYNC_CLOSE 0
+#define ASYNC_CREATE 1
+#define ASYNC_WRITE 0
+#define ASYNC_READ 1
 
 struct _aio_result_t {
     int aio_return;
     int aio_errno;
+    void *_data; /* Internal housekeeping */
+    void *data; /* Available to the caller */
 };
 
 typedef struct _aio_result_t aio_result_t;
@@ -35,7 +47,6 @@ int aio_truncate(const char *, off_t length, aio_result_t *);
 int aio_opendir(const char *, aio_result_t *);
 aio_result_t *aio_poll_done(void);
 int aio_operations_pending(void);
-int aio_overloaded(void);
 int aio_sync(void);
 int aio_get_queue_len(void);
 
@@ -68,6 +79,9 @@ struct _aiostate_t {
        unsigned int reading:1;
        unsigned int writing:1;
        unsigned int opening:1;
+       unsigned int write_kicking:1;
+       unsigned int read_kicking:1;
+       unsigned int inreaddone:1;
     } flags;
     const char *read_buf;
     link_list *pending_writes;
index cedb501f56730e83d14b5fe6776fa32eff6615bf..983ea0345747af3b6d3c6ed88f957b9276caa024 100644 (file)
@@ -1,6 +1,6 @@
 
 /*
- * $Id: store_dir_aufs.cc,v 1.15 2000/11/10 09:04:52 adrian Exp $
+ * $Id: store_dir_aufs.cc,v 1.16 2000/11/10 21:42:03 hno Exp $
  *
  * DEBUG: section 47    Store Directory Routines
  * AUTHOR: Duane Wessels
@@ -1374,10 +1374,8 @@ storeAufsDirCheckObj(SwapDir * SD, const StoreEntry * e)
     ql = aioQueueSize();
     if (ql == 0)
        loadav = 0;
-    else if (ql >= MAGIC1)     /* Queue is too long, don't even consider it */
-       loadav = -1;
-    else
-       loadav = MAGIC1 * 1000 / ql;
+    loadav = ql * 1000 / MAGIC1;
+    debug(41, 9) ("storeAufsDirCheckObj: load=%d\n", loadav);
     return loadav;
 }
 
index 4e932f00f8a213b8a0e288e0609d4f3a9c71d4ca..88afc1d267ede943e27caf515ff1dde7299789c4 100644 (file)
@@ -6,8 +6,16 @@
 #include "squid.h"
 #include "store_asyncufs.h"
 
+#if ASYNC_READ
 static AIOCB storeAufsReadDone;
+#else
+static DRCB storeAufsReadDone;
+#endif
+#if ASYNC_WRITE
 static AIOCB storeAufsWriteDone;
+#else
+static DWCB storeAufsWriteDone;
+#endif
 static void storeAufsIOCallback(storeIOState * sio, int errflag);
 static AIOCB storeAufsOpenDone;
 static int storeAufsSomethingPending(storeIOState *);
@@ -24,13 +32,25 @@ storeAufsOpen(SwapDir * SD, StoreEntry * e, STFNCB * file_callback,
     sfileno f = e->swap_filen;
     char *path = storeAufsDirFullPath(SD, f, NULL);
     storeIOState *sio;
+#if !ASYNC_OPEN
+    int fd;
+#endif
     debug(78, 3) ("storeAufsOpen: fileno %08X\n", f);
     /*
      * we should detect some 'too many files open' condition and return
      * NULL here.
      */
-    while (aioQueueSize() > MAGIC1)
+#ifdef MAGIC2
+    if (aioQueueSize() > MAGIC2)
        return NULL;
+#endif
+#if !ASYNC_OPEN
+    fd = file_open(path, O_RDONLY);
+    if (fd < 0) {
+       debug(78, 3) ("storeAufsOpen: got failude (%d)\n", errno);
+       return NULL;
+    }
+#endif
     sio = memAllocate(MEM_STORE_IO);
     cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO);
     sio->fsstate = memPoolAlloc(aio_state_pool);
@@ -43,8 +63,12 @@ storeAufsOpen(SwapDir * SD, StoreEntry * e, STFNCB * file_callback,
     sio->callback_data = callback_data;
     sio->e = e;
     cbdataLock(callback_data);
-    aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio);
     Opening_FD++;
+#if ASYNC_OPEN
+    aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio);
+#else
+    storeAufsOpenDone(fd, sio, fd, 0);
+#endif
     store_open_disk_fd++;
     return sio;
 }
@@ -57,6 +81,9 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c
     storeIOState *sio;
     sfileno filn;
     sdirno dirn;
+#if !ASYNC_CREATE
+    int fd;
+#endif
 
     /* Allocate a number */
     dirn = SD->index;
@@ -68,8 +95,17 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c
      * we should detect some 'too many files open' condition and return
      * NULL here.
      */
-    while (aioQueueSize() > MAGIC1)
+#ifdef MAGIC2
+    if (aioQueueSize() > MAGIC2)
+       return NULL;
+#endif
+#if !ASYNC_CREATE
+    fd = file_open(path, O_WRONLY | O_CREAT | O_TRUNC);
+    if (fd < 0) {
+       debug(78, 3) ("storeAufsCreate: got failude (%d)\n", errno);
        return NULL;
+    }
+#endif
     sio = memAllocate(MEM_STORE_IO);
     cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO);
     sio->fsstate = memPoolAlloc(aio_state_pool);
@@ -82,8 +118,12 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c
     sio->callback_data = callback_data;
     sio->e = (StoreEntry *) e;
     cbdataLock(callback_data);
-    aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio);
     Opening_FD++;
+#if ASYNC_CREATE
+    aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio);
+#else
+    storeAufsOpenDone(fd, sio, fd, 0);
+#endif
     store_open_disk_fd++;
 
     /* now insert into the replacement policy */
@@ -105,7 +145,7 @@ storeAufsClose(SwapDir * SD, storeIOState * sio)
        aiostate->flags.close_request = 1;
        return;
     }
-    storeAufsIOCallback(sio, 0);
+    storeAufsIOCallback(sio, DISK_OK);
 }
 
 
@@ -139,7 +179,11 @@ storeAufsRead(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t of
        sio->swap_dirn, sio->swap_filen, aiostate->fd);
     sio->offset = offset;
     aiostate->flags.reading = 1;
+#if ASYNC_READ
     aioRead(aiostate->fd, offset, buf, size, storeAufsReadDone, sio);
+#else
+    file_read(aiostate->fd, offset, buf, size, storeAufsReadDone, sio);
+#endif
 }
 
 
@@ -162,6 +206,7 @@ storeAufsWrite(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t o
        linklistPush(&(aiostate->pending_writes), q);
        return;
     }
+#if ASYNC_WRITE
     if (aiostate->flags.writing) {
        struct _queued_write *q;
        debug(78, 3) ("storeAufsWrite: queuing write\n");
@@ -178,9 +223,15 @@ storeAufsWrite(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t o
      * XXX it might be nice if aioWrite() gave is immediate
      * feedback here about EWOULDBLOCK instead of in the
      * callback function
+     * XXX Should never give EWOULDBLOCK under normal operations
+     * if it does then the MAGIC1/2 tuning is wrong.
      */
     aioWrite(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio,
        free_func);
+#else
+    file_write(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio,
+       free_func);
+#endif
 }
 
 /* Unlink */
@@ -235,7 +286,7 @@ storeAufsOpenDone(int unused, void *my_data, int fd, int errflag)
        errno = errflag;
        debug(78, 0) ("storeAufsOpenDone: %s\n", xstrerror());
        debug(78, 1) ("\t%s\n", storeAufsDirFullPath(INDEXSD(sio->swap_dirn), sio->swap_filen, NULL));
-       storeAufsIOCallback(sio, errflag);
+       storeAufsIOCallback(sio, DISK_ERROR);
        return;
     }
     aiostate->fd = fd;
@@ -248,8 +299,20 @@ storeAufsOpenDone(int unused, void *my_data, int fd, int errflag)
     debug(78, 3) ("storeAufsOpenDone: exiting\n");
 }
 
+/*
+ * XXX TODO
+ * if errflag == EWOULDBLOCK, then we'll need to re-queue the
+ * chunk at the beginning of the write_pending list and try
+ * again later.
+ * XXX Should not normally happen. 
+ */
+#if ASYNC_READ
 static void
 storeAufsReadDone(int fd, void *my_data, int len, int errflag)
+#else
+static void
+storeAufsReadDone(int fd, int errflag, size_t len, void *my_data)
+#endif
 {
     storeIOState *sio = my_data;
     aiostate_t *aiostate = (aiostate_t *) sio->fsstate;
@@ -258,6 +321,7 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag)
     ssize_t rlen;
     debug(78, 3) ("storeAufsReadDone: dirno %d, fileno %08X, FD %d, len %d\n",
        sio->swap_dirn, sio->swap_filen, fd, len);
+    aiostate->flags.inreaddone = 1;
     aiostate->flags.reading = 0;
     if (errflag) {
        debug(78, 3) ("storeAufsReadDone: got failure (%d)\n", errflag);
@@ -266,6 +330,17 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag)
        rlen = (ssize_t) len;
        sio->offset += len;
     }
+#if ASYNC_READ
+    /* translate errflag from errno to Squid disk error */
+    errno = errflag;
+    if (errflag)
+       errflag = DISK_ERROR;
+    else
+       errflag = DISK_OK;
+#else
+    if (errflag == DISK_EOF)
+       errflag = DISK_OK; /* EOF is signalled by len == 0, not errors... */
+#endif
     assert(callback);
     assert(their_data);
     sio->read.callback = NULL;
@@ -273,13 +348,9 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag)
     if (cbdataValid(their_data))
        callback(their_data, aiostate->read_buf, rlen);
     cbdataUnlock(their_data);
-    /* 
-     * XXX is this safe?  The above callback may have caused sio
-     * to be freed/closed already? Philip Guenther <guenther@gac.edu>
-     * says it fixes his FD leaks, with no side effects.
-     */
+    aiostate->flags.inreaddone = 0;
     if (aiostate->flags.close_request)
-       storeAufsIOCallback(sio, DISK_OK);
+       storeAufsIOCallback(sio, errflag);
 }
 
 /*
@@ -287,15 +358,29 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag)
  * if errflag == EWOULDBLOCK, then we'll need to re-queue the
  * chunk at the beginning of the write_pending list and try
  * again later.
+ * XXX Should not normally happen. 
  */
+#if ASYNC_WRITE
 static void
 storeAufsWriteDone(int fd, void *my_data, int len, int errflag)
+#else
+static void
+storeAufsWriteDone(int fd, int errflag, size_t len, void *my_data)
+#endif
 {
     static int loop_detect = 0;
     storeIOState *sio = my_data;
     aiostate_t *aiostate = (aiostate_t *) sio->fsstate;
-    debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d\n",
-       sio->swap_dirn, sio->swap_filen, fd, len);
+    debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d, err=%d\n",
+       sio->swap_dirn, sio->swap_filen, fd, len, errflag);
+#if ASYNC_WRITE
+    /* Translate from errno to Squid disk error */
+    errno = errflag;
+    if (errflag)
+       errflag = errno == ENOSP ? DISK_NO_SPACE_LEFT : DISK_ERROR;
+    else
+       errflag = DISK_OK;
+#endif
     assert(++loop_detect < 10);
     aiostate->flags.writing = 0;
     if (errflag) {
@@ -305,10 +390,21 @@ storeAufsWriteDone(int fd, void *my_data, int len, int errflag)
        return;
     }
     sio->offset += len;
-    if (storeAufsKickWriteQueue(sio))
-       (void) 0;
+#if ASYNC_WRITE
+    if (!storeAufsKickWriteQueue(sio))
+       0;
     else if (aiostate->flags.close_request)
        storeAufsIOCallback(sio, errflag);
+#else
+    if (!aiostate->flags.write_kicking) {
+       aiostate->flags.write_kicking = 1;
+       while (storeAufsKickWriteQueue(sio))
+           (void) 0;
+       aiostate->flags.write_kicking = 0;
+       if (aiostate->flags.close_request)
+           storeAufsIOCallback(sio, errflag);
+    }
+#endif
     loop_detect--;
 }
 
@@ -350,6 +446,8 @@ storeAufsSomethingPending(storeIOState * sio)
        return 1;
     if (aiostate->flags.opening)
        return 1;
+    if (aiostate->flags.inreaddone)
+       return 1;
     return 0;
 }