From: hno <> Date: Sat, 11 Nov 2000 04:42:03 +0000 (+0000) Subject: Major rewrite of async-io to make it behave a bit more sane with X-Git-Tag: SQUID_3_0_PRE1~1776 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=55f0e6f71be85cb398494acf9a7852ce424a57b8;p=thirdparty%2Fsquid.git Major rewrite of async-io to make it behave a bit more sane with substantially less overhead. Some tuning work still remains to make it perform optimal. See the start of store_asyncufs.h for all the knobs. --- diff --git a/src/fs/aufs/Makefile.in b/src/fs/aufs/Makefile.in index a577db77d5..51e65dbc22 100644 --- a/src/fs/aufs/Makefile.in +++ b/src/fs/aufs/Makefile.in @@ -1,7 +1,7 @@ # # Makefile for the AUFS storage driver for the Squid Object Cache server # -# $Id: Makefile.in,v 1.1 2000/05/03 17:15:46 adrian Exp $ +# $Id: Makefile.in,v 1.2 2000/11/10 21:42:03 hno Exp $ # FS = aufs @@ -36,6 +36,7 @@ $(OUT): $(OBJS) $(RANLIB) $(OUT) $(OBJS): $(top_srcdir)/include/version.h ../../../include/autoconf.h +$(OBJS): store_asyncufs.h .c.o: @rm -f ../stamp diff --git a/src/fs/aufs/aiops.cc b/src/fs/aufs/aiops.cc index b42c3cf2b1..d7b5e94824 100644 --- a/src/fs/aufs/aiops.cc +++ b/src/fs/aufs/aiops.cc @@ -1,5 +1,5 @@ /* - * $Id: aiops.cc,v 1.3 2000/10/17 08:06:07 adrian Exp $ + * $Id: aiops.cc,v 1.4 2000/11/10 21:42:03 hno Exp $ * * DEBUG: section 43 AIOPS * AUTHOR: Stewart Forster @@ -49,13 +49,6 @@ #define RIDICULOUS_LENGTH 4096 -#if defined(_SQUID_LINUX_) -/* Linux requires proper use of mutexes or it will segfault deep in the - * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12. 
- */ -#define AIO_PROPER_MUTEX 1 -#endif - enum _aio_thread_status { _THREAD_STARTING = 0, _THREAD_WAITING, @@ -77,6 +70,7 @@ enum _aio_request_type { }; typedef struct aio_request_t { + struct aio_request_t *next; enum _aio_request_type request_type; int cancelled; char *path; @@ -93,20 +87,25 @@ typedef struct aio_request_t { struct stat *tmpstatp; struct stat *statp; aio_result_t *resultp; - struct aio_request_t *next; } aio_request_t; - -typedef struct aio_thread_t { +typedef struct aio_request_queue_t { + pthread_mutex_t mutex; + pthread_cond_t cond; + aio_request_t * volatile head; + aio_request_t * volatile * volatile tailp; + unsigned long requests; + unsigned long blocked; /* main failed to lock the queue */ +} aio_request_queue_t; + +typedef struct aio_thread_t aio_thread_t; +struct aio_thread_t { + aio_thread_t *next; pthread_t thread; enum _aio_thread_status status; - pthread_mutex_t mutex; /* Mutex for testing condition variable */ - pthread_cond_t cond; /* Condition variable */ - struct aio_request_t *volatile req; /* set by main, cleared by thread */ - struct aio_request_t *processed_req; /* reminder to main */ - struct aio_thread_t *next; -} aio_thread_t; - + struct aio_request_t *current_req; + unsigned long requests; +}; int aio_cancel(aio_result_t *); int aio_open(const char *, int, mode_t, aio_result_t *); @@ -121,7 +120,6 @@ int aio_sync(void); static void aio_init(void); static void aio_queue_request(aio_request_t *); -static void aio_process_request_queue(void); static void aio_cleanup_request(aio_request_t *); static void *aio_thread_loop(void *); static void aio_do_open(aio_request_t *); @@ -135,30 +133,31 @@ static void aio_do_truncate(aio_request_t *); static void *aio_do_opendir(aio_request_t *); #endif static void aio_debug(aio_request_t *); -static void aio_poll_threads(void); +static void aio_poll_queues(void); -static aio_thread_t *threads; +static aio_thread_t *threads = NULL; static int aio_initialised = 0; + #define AIO_LARGE_BUFS 
16384 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3 +#define AIO_MICRO_BUFS 128 -static MemPool *aio_large_bufs = NULL; // 16K -static MemPool *aio_medium_bufs = NULL; // 8K -static MemPool *aio_small_bufs = NULL; // 4K -static MemPool *aio_tiny_bufs = NULL; // 2K +static MemPool *aio_large_bufs = NULL; /* 16K */ +static MemPool *aio_medium_bufs = NULL; /* 8K */ +static MemPool *aio_small_bufs = NULL; /* 4K */ +static MemPool *aio_tiny_bufs = NULL; /* 2K */ +static MemPool *aio_micro_bufs = NULL; /* 128 bytes */ static int request_queue_len = 0; static MemPool *aio_request_pool = NULL; -static aio_request_t *request_queue_head = NULL; -static aio_request_t *request_queue_tail = NULL; -static aio_request_t *request_done_head = NULL; -static aio_request_t *request_done_tail = NULL; -static aio_thread_t *wait_threads = NULL; -static aio_thread_t *busy_threads_head = NULL; -static aio_thread_t *busy_threads_tail = NULL; +static MemPool *aio_thread_pool = NULL; +static aio_request_queue_t request_queue; +static struct { aio_request_t *head, **tailp; } request_queue2 = { NULL, &request_queue2.head }; +static aio_request_queue_t done_queue; +static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head }; static pthread_attr_t globattr; static struct sched_param globsched; static pthread_t main_thread; @@ -168,7 +167,9 @@ aio_get_pool(int size) { MemPool *p; if (size <= AIO_LARGE_BUFS) { - if (size <= AIO_TINY_BUFS) + if (size <= AIO_MICRO_BUFS) + p = aio_micro_bufs; + else if (size <= AIO_TINY_BUFS) p = aio_tiny_bufs; else if (size <= AIO_SMALL_BUFS) p = aio_small_bufs; @@ -195,6 +196,18 @@ aio_xmalloc(int size) return p; } +static char * +aio_xstrdup(const char *str) +{ + char *p; + int len = strlen(str)+1; + + p = aio_xmalloc(len); + strncpy(p, str, len); + + return p; +} + static void aio_xfree(void *p, int size) { @@ -206,6 +219,18 @@ aio_xfree(void *p, int
size) xfree(p); } +static void +aio_xstrfree(char *str) +{ + MemPool *pool; + int len = strlen(str)+1; + + if ( (pool = aio_get_pool(len)) != NULL) { + memPoolFree(pool, str); + } else + xfree(str); +} + static void aio_init(void) { @@ -229,32 +254,40 @@ aio_init(void) pthread_attr_setschedparam(&globattr, &globsched); #endif - /* Create threads and get them to sit in their wait loop */ - threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t)); + /* Initialize request queue */ + if (pthread_mutex_init(&(request_queue.mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(request_queue.cond), NULL)) + fatal("Failed to create condition variable"); + request_queue.head = NULL; + request_queue.tailp = &request_queue.head; + request_queue.requests = 0; + request_queue.blocked = 0; + + /* Initialize done queue */ + if (pthread_mutex_init(&(done_queue.mutex), NULL)) + fatal("Failed to create mutex"); + if (pthread_cond_init(&(done_queue.cond), NULL)) + fatal("Failed to create condition variable"); + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + done_queue.requests = 0; + done_queue.blocked = 0; + /* Create threads and get them to sit in their wait loop */ + aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t)); for (i = 0; i < NUMTHREADS; i++) { - threadp = &threads[i]; + threadp = memPoolAlloc(aio_thread_pool); threadp->status = _THREAD_STARTING; - if (pthread_mutex_init(&(threadp->mutex), NULL)) { - threadp->status = _THREAD_FAILED; - continue; - } - if (pthread_cond_init(&(threadp->cond), NULL)) { - threadp->status = _THREAD_FAILED; - continue; - } - threadp->req = NULL; - threadp->processed_req = NULL; + threadp->current_req = NULL; + threadp->requests = 0; + threadp->next = threads; + threads = threadp; if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) { fprintf(stderr, "Thread creation failed\n"); threadp->status = _THREAD_FAILED; continue; } - threadp->next = wait_threads; - wait_threads = 
threadp; -#if AIO_PROPER_MUTEX - pthread_mutex_lock(&threadp->mutex); -#endif } /* Create request pool */ @@ -263,6 +296,7 @@ aio_init(void) aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS); aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS); aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS); + aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS); aio_initialised = 1; } @@ -274,9 +308,6 @@ aio_thread_loop(void *ptr) aio_thread_t *threadp = ptr; aio_request_t *request; sigset_t new; -#if !AIO_PROPER_MUTEX - struct timespec wait_time; -#endif /* * Make sure to ignore signals which may possibly get sent to @@ -300,29 +331,25 @@ aio_thread_loop(void *ptr) sigaddset(&new, SIGALRM); pthread_sigmask(SIG_BLOCK, &new, NULL); - pthread_mutex_lock(&threadp->mutex); while (1) { -#if AIO_PROPER_MUTEX - while (threadp->req == NULL) { + threadp->current_req = request = NULL; + request = NULL; + /* Get a request to process */ threadp->status = _THREAD_WAITING; - pthread_cond_wait(&threadp->cond, &threadp->mutex); + pthread_mutex_lock(&request_queue.mutex); + while(!request_queue.head) { + pthread_cond_wait(&request_queue.cond, &request_queue.mutex); } -#else - /* The timeout is used to unlock the race condition where - * ->req is set between the check and pthread_cond_wait. - * The thread steps it's own clock on each timeout, to avoid a CPU - * spin situation if the main thread is suspended (paging), and - * squid_curtime is not being updated timely. 
- */ - wait_time.tv_sec = squid_curtime + 1; /* little quicker first time */ - wait_time.tv_nsec = 0; - while (threadp->req == NULL) { - threadp->status = _THREAD_WAITING; - pthread_cond_timedwait(&threadp->cond, &threadp->mutex, &wait_time); - wait_time.tv_sec += 3; /* then wait 3 seconds between each check */ - } -#endif - request = threadp->req; + request = request_queue.head; + if (request) + request_queue.head = request->next; + if (!request_queue.head) + request_queue.tailp = &request_queue.head; + pthread_mutex_unlock(&request_queue.mutex); + /* process the request */ + threadp->status = _THREAD_BUSY; + request->next = NULL; + threadp->current_req = request; errno = 0; if (!request->cancelled) { switch (request->request_type) { @@ -361,51 +388,73 @@ aio_thread_loop(void *ptr) request->ret = -1; request->err = EINTR; } - threadp->req = NULL; /* tells main thread that we are done */ - } /* while */ + threadp->status = _THREAD_DONE; + /* put the request in the done queue */ + pthread_mutex_lock(&done_queue.mutex); + *done_queue.tailp = request; + done_queue.tailp = &request->next; + pthread_mutex_unlock(&done_queue.mutex); + threadp->requests++; + } /* while forever */ return NULL; } /* aio_thread_loop */ static void -aio_do_request(aio_request_t * requestp) -{ - if (wait_threads == NULL && busy_threads_head == NULL) { - fprintf(stderr, "PANIC: No threads to service requests with!\n"); - exit(-1); - } - aio_queue_request(requestp); -} /* aio_do_request */ - - -static void -aio_queue_request(aio_request_t * requestp) +aio_queue_request(aio_request_t * request) { - aio_request_t *rp; - static int last_warn = 0; static int high_start = 0; - static int queue_high, queue_low; - int i; - + debug(41, 9) ("aio_queue_request: %p type=%d result=%p\n", + request, request->request_type, request->resultp); /* Mark it as not executed (failing result, no error) */ - requestp->ret = -1; - requestp->err = 0; - /* Queue it on the request queue */ - if (request_queue_head == 
NULL) { - request_queue_head = requestp; - request_queue_tail = requestp; + request->ret = -1; + request->err = 0; + /* Internal housekeeping */ + request_queue_len += 1; + request->resultp->_data = request; + /* Play some tricks with the request_queue2 queue */ + request->next = NULL; + if (!request_queue2.head) { + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { + /* Normal path */ + *request_queue.tailp = request; + request_queue.tailp = &request->next; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); } else { - request_queue_tail->next = requestp; - request_queue_tail = requestp; + /* Oops, the request queue is blocked, use request_queue2 */ + *request_queue2.tailp = request; + request_queue2.tailp = &request->next; + } + } else { + /* Secondary path. We have blocked requests to deal with */ + /* add the request to the chain */ + *request_queue2.tailp = request; + if (pthread_mutex_trylock(&request_queue.mutex) == 0) { + /* Ok, the queue is no longer blocked */ + *request_queue.tailp = request_queue2.head; + request_queue.tailp = &request->next; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } else { + /* still blocked, bump the blocked request chain */ + request_queue2.tailp = &request->next; + } + } + if (request_queue2.head) { + static int filter = 0; + static int filter_limit = 8; + if (++filter >= filter_limit) { + filter_limit += filter; + filter = 0; + debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n"); + } } - requestp->next = NULL; - request_queue_len += 1; - /* Poll done threads if needed */ - if (wait_threads == NULL) - aio_poll_threads(); - /* Kick it rolling */ - aio_process_request_queue(); /* Warn if out of threads */ - if (request_queue_len > (NUMTHREADS >> 1)) { + if (request_queue_len > MAGIC1) { + static int last_warn = 0; + static int queue_high, queue_low; if 
(high_start == 0) { high_start = squid_curtime; queue_high = request_queue_len; @@ -416,49 +465,17 @@ aio_queue_request(aio_request_t * requestp) if (request_queue_len < queue_low) queue_low = request_queue_len; if (squid_curtime >= (last_warn + 15) && - squid_curtime >= (high_start + 3)) { - debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O threads\n"); - debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n", - request_queue_len, queue_high, queue_low, squid_curtime - high_start); - debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n"); - debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n"); - debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS); - rp = request_queue_head; - for (i = 1; i <= NUMTHREADS; i++) { - switch (rp->request_type) { - case _AIO_OP_OPEN: - debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path); - break; - case _AIO_OP_READ: - debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_WRITE: - debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_CLOSE: - debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd); - break; - case _AIO_OP_UNLINK: - debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path); - break; - case _AIO_OP_TRUNCATE: - debug(43, 3) ("aio_queue_request: %d : truncate -> %s\n", i, rp->path); - break; - case _AIO_OP_STAT: - debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path); - break; - default: - debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type); - break; - } - if ((rp = rp->next) == NULL) - break; - } + squid_curtime >= (high_start + 5)) { + debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n"); + if (squid_curtime >= (high_start + 15)) + debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, 
low=%d, duration=%d\n", + request_queue_len, queue_high, queue_low, squid_curtime - high_start); last_warn = squid_curtime; } } else { high_start = 0; } + /* Warn if seriously overloaded */ if (request_queue_len > RIDICULOUS_LENGTH) { debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n"); debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n"); @@ -467,47 +484,6 @@ aio_queue_request(aio_request_t * requestp) } } /* aio_queue_request */ - -static void -aio_process_request_queue(void) -{ - aio_thread_t *threadp; - aio_request_t *requestp; - - for (;;) { - if (wait_threads == NULL || request_queue_head == NULL) - return; - - requestp = request_queue_head; - if ((request_queue_head = requestp->next) == NULL) - request_queue_tail = NULL; - requestp->next = NULL; - request_queue_len--; - - if (requestp->cancelled) { - aio_cleanup_request(requestp); - continue; - } - threadp = wait_threads; - wait_threads = threadp->next; - threadp->next = NULL; - - if (busy_threads_head != NULL) - busy_threads_tail->next = threadp; - else - busy_threads_head = threadp; - busy_threads_tail = threadp; - - threadp->status = _THREAD_BUSY; - threadp->req = threadp->processed_req = requestp; - pthread_cond_signal(&(threadp->cond)); -#if AIO_PROPER_MUTEX - pthread_mutex_unlock(&threadp->mutex); -#endif - } -} /* aio_process_request_queue */ - - static void aio_cleanup_request(aio_request_t * requestp) { @@ -521,11 +497,13 @@ aio_cleanup_request(aio_request_t * requestp) if (!cancelled && requestp->ret == 0) xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat)); aio_xfree(requestp->tmpstatp, sizeof(struct stat)); + aio_xstrfree(requestp->path); + break; case _AIO_OP_OPEN: if (cancelled && requestp->ret >= 0) /* The open() was cancelled but completed */ close(requestp->ret); - xfree(requestp->path); + aio_xstrfree(requestp->path); break; case _AIO_OP_CLOSE: if (cancelled && requestp->ret < 0) @@ -535,11 +513,13 @@ 
aio_cleanup_request(aio_request_t * requestp) case _AIO_OP_UNLINK: case _AIO_OP_TRUNCATE: case _AIO_OP_OPENDIR: - xfree(requestp->path); + aio_xstrfree(requestp->path); break; case _AIO_OP_READ: if (!cancelled && requestp->ret > 0) xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret); + aio_xfree(requestp->tmpbufp, requestp->buflen); + break; case _AIO_OP_WRITE: aio_xfree(requestp->tmpbufp, requestp->buflen); break; @@ -557,27 +537,16 @@ aio_cleanup_request(aio_request_t * requestp) int aio_cancel(aio_result_t * resultp) { - aio_thread_t *threadp; - aio_request_t *requestp; + aio_request_t *request = resultp->_data; - for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next) - if (threadp->processed_req->resultp == resultp) { - threadp->processed_req->cancelled = 1; - threadp->processed_req->resultp = NULL; - return 0; - } - for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next) - if (requestp->resultp == resultp) { - requestp->cancelled = 1; - requestp->resultp = NULL; - return 0; - } - for (requestp = request_done_head; requestp != NULL; requestp = requestp->next) - if (requestp->resultp == resultp) { - requestp->cancelled = 1; - requestp->resultp = NULL; - return 0; - } + if (request && request->resultp == resultp) { + debug(41, 9) ("aio_cancel: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + request->cancelled = 1; + request->resultp = NULL; + resultp->_data = NULL; + return 0; + } return 1; } /* aio_cancel */ @@ -586,28 +555,18 @@ int aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp) { aio_request_t *requestp; - int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } - len = strlen(path) + 1; - if ((requestp->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, requestp); - errno = ENOMEM; - return -1; - } - strncpy(requestp->path, path, len); + 
requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); requestp->oflag = oflag; requestp->mode = mode; requestp->resultp = resultp; requestp->request_type = _AIO_OP_OPEN; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -627,10 +586,7 @@ aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } + requestp = memPoolAlloc(aio_request_pool); requestp->fd = fd; requestp->bufferp = bufp; requestp->tmpbufp = (char *) aio_xmalloc(bufs); @@ -641,7 +597,7 @@ aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * requestp->request_type = _AIO_OP_READ; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -662,10 +618,7 @@ aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } + requestp = memPoolAlloc(aio_request_pool); requestp->fd = fd; requestp->tmpbufp = (char *) aio_xmalloc(bufs); xmemcpy(requestp->tmpbufp, bufp, bufs); @@ -676,7 +629,7 @@ aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * requestp->request_type = _AIO_OP_WRITE; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -696,16 +649,13 @@ aio_close(int fd, aio_result_t * resultp) if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } + requestp = memPoolAlloc(aio_request_pool); requestp->fd = fd; requestp->resultp = resultp; requestp->request_type = _AIO_OP_CLOSE; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -722,24 +672,18 @@ int aio_stat(const char 
*path, struct stat *sb, aio_result_t * resultp) { aio_request_t *requestp; - int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } - len = strlen(path) + 1; - requestp->path = (char *) xmalloc(len); - strncpy(requestp->path, path, len); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); requestp->statp = sb; requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat)); requestp->resultp = resultp; requestp->request_type = _AIO_OP_STAT; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -756,22 +700,16 @@ int aio_unlink(const char *path, aio_result_t * resultp) { aio_request_t *requestp; - int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } - len = strlen(path) + 1; - requestp->path = (char *) xmalloc(len); - strncpy(requestp->path, path, len); + requestp = memPoolAlloc(aio_request_pool); + requestp->path = aio_xstrdup(path); requestp->resultp = resultp; requestp->request_type = _AIO_OP_UNLINK; requestp->cancelled = 0; - aio_do_request(requestp); + aio_queue_request(requestp); return 0; } @@ -787,27 +725,17 @@ int aio_truncate(const char *path, off_t length, aio_result_t * resultp) { aio_request_t *requestp; - int len; if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } - len = strlen(path) + 1; - if ((requestp->path = (char *) xmalloc(len)) == NULL) { - memPoolFree(aio_request_pool, requestp); - errno = ENOMEM; - return -1; - } + requestp = memPoolAlloc(aio_request_pool); + requestp->path = (char *) aio_xstrdup(path); requestp->offset = length; - strncpy(requestp->path, path, len); requestp->resultp = resultp; requestp->request_type = _AIO_OP_TRUNCATE; requestp->cancelled = 0; - aio_do_request(requestp); + 
aio_queue_request(requestp); return 0; } @@ -821,7 +749,7 @@ aio_do_truncate(aio_request_t * requestp) #if AIO_OPENDIR -/* XXX aio_opendir NOT implemented? */ +/* XXX aio_opendir NOT implemented yet.. */ int aio_opendir(const char *path, aio_result_t * resultp) @@ -831,10 +759,7 @@ aio_opendir(const char *path, aio_result_t * resultp) if (!aio_initialised) aio_init(); - if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) { - errno = ENOMEM; - return -1; - } + requestp = memPoolAlloc(aio_request_pool); return -1; } @@ -846,82 +771,66 @@ aio_do_opendir(aio_request_t * requestp) #endif - -void -aio_poll_threads(void) -{ - aio_thread_t *prev; - aio_thread_t *threadp; - aio_request_t *requestp; - - do { /* while found completed thread */ - prev = NULL; - threadp = busy_threads_head; - while (threadp) { - debug(43, 9) ("aio_poll_threads: %p: request type %d -> status %d\n", - threadp, - threadp->processed_req->request_type, - threadp->status); -#if AIO_PROPER_MUTEX - if (threadp->req == NULL) - if (pthread_mutex_trylock(&threadp->mutex) == 0) - break; -#else - if (threadp->req == NULL) - break; -#endif - prev = threadp; - threadp = threadp->next; +static void +aio_poll_queues(void) +{ + /* kick "overflow" request queue */ + if (request_queue2.head && + pthread_mutex_trylock(&request_queue.mutex) == 0) { + *request_queue.tailp = request_queue2.head; + request_queue.tailp = request_queue2.tailp; + pthread_cond_signal(&request_queue.cond); + pthread_mutex_unlock(&request_queue.mutex); + request_queue2.head = NULL; + request_queue2.tailp = &request_queue2.head; + } + /* poll done queue */ + if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) { + struct aio_request_t *requests = done_queue.head; + done_queue.head = NULL; + done_queue.tailp = &done_queue.head; + pthread_mutex_unlock(&done_queue.mutex); + *done_requests.tailp = requests; + request_queue_len -= 1; + while(requests->next) { + requests = requests->next; + request_queue_len -= 1; } - if 
(threadp == NULL) - break; - - if (prev == NULL) - busy_threads_head = busy_threads_head->next; - else - prev->next = threadp->next; - - if (busy_threads_tail == threadp) - busy_threads_tail = prev; - - requestp = threadp->processed_req; - threadp->processed_req = NULL; - - threadp->next = wait_threads; - wait_threads = threadp; - - if (request_done_tail != NULL) - request_done_tail->next = requestp; - else - request_done_head = requestp; - request_done_tail = requestp; - } while (threadp); - - aio_process_request_queue(); -} /* aio_poll_threads */ + done_requests.tailp = &requests->next; + } + /* Give up the CPU to allow the threads to do their work */ + if (done_queue.head || request_queue.head) + sched_yield(); +} aio_result_t * aio_poll_done(void) { - aio_request_t *requestp; + aio_request_t *request; aio_result_t *resultp; int cancelled; + int polled = 0; AIO_REPOLL: - aio_poll_threads(); - if (request_done_head == NULL) { + request = done_requests.head; + if (request == NULL && !polled) { + aio_poll_queues(); + polled = 1; + request = done_requests.head; + } + if (!request) { return NULL; } - requestp = request_done_head; - request_done_head = requestp->next; - if (!request_done_head) - request_done_tail = NULL; - - resultp = requestp->resultp; - cancelled = requestp->cancelled; - aio_debug(requestp); - debug(43, 5) ("DONE: %d -> %d\n", requestp->ret, requestp->err); - aio_cleanup_request(requestp); + debug(41, 9) ("aio_poll_done: %p type=%d result=%p\n", + request, request->request_type, request->resultp); + done_requests.head = request->next; + if (!done_requests.head) + done_requests.tailp = &done_requests.head; + resultp = request->resultp; + cancelled = request->cancelled; + aio_debug(request); + debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err); + aio_cleanup_request(request); if (cancelled) goto AIO_REPOLL; return resultp; @@ -930,30 +839,15 @@ aio_poll_done(void) int aio_operations_pending(void) { - return request_queue_len + 
(request_done_head != NULL) + (busy_threads_head != NULL); -} - -int -aio_overloaded(void) -{ - static time_t last_warn = 0; - if (aio_operations_pending() > RIDICULOUS_LENGTH / 4) { - if (squid_curtime >= (last_warn + 15)) { - debug(43, 0) ("Warning: Async-IO overloaded\n"); - last_warn = squid_curtime; - } - return 1; - } - return 0; + return request_queue_len + (done_requests.head ? 1 : 0); } int aio_sync(void) { - int loop_count = 0; + /* XXX This might take a while if the queue is large.. */ do { - aio_poll_threads(); - assert(++loop_count < 10); + aio_poll_queues(); } while (request_queue_len > 0); return aio_operations_pending(); } @@ -965,26 +859,26 @@ aio_get_queue_len(void) } static void -aio_debug(aio_request_t * requestp) +aio_debug(aio_request_t * request) { - switch (requestp->request_type) { + switch (request->request_type) { case _AIO_OP_OPEN: - debug(43, 5) ("OPEN of %s to FD %d\n", requestp->path, requestp->ret); + debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret); break; case _AIO_OP_READ: - debug(43, 5) ("READ on fd: %d\n", requestp->fd); + debug(43, 5) ("READ on fd: %d\n", request->fd); break; case _AIO_OP_WRITE: - debug(43, 5) ("WRITE on fd: %d\n", requestp->fd); + debug(43, 5) ("WRITE on fd: %d\n", request->fd); break; case _AIO_OP_CLOSE: - debug(43, 5) ("CLOSE of fd: %d\n", requestp->fd); + debug(43, 5) ("CLOSE of fd: %d\n", request->fd); break; case _AIO_OP_UNLINK: - debug(43, 5) ("UNLINK of %s\n", requestp->path); + debug(43, 5) ("UNLINK of %s\n", request->path); break; case _AIO_OP_TRUNCATE: - debug(43, 5) ("UNLINK of %s\n", requestp->path); + debug(43, 5) ("UNLINK of %s\n", request->path); break; default: break; diff --git a/src/fs/aufs/async_io.cc b/src/fs/aufs/async_io.cc index af8a52b8b1..9f9565ca98 100644 --- a/src/fs/aufs/async_io.cc +++ b/src/fs/aufs/async_io.cc @@ -1,6 +1,6 @@ /* - * $Id: async_io.cc,v 1.5 2000/06/27 08:33:53 hno Exp $ + * $Id: async_io.cc,v 1.6 2000/11/10 21:42:03 hno Exp $ * * DEBUG: section 32 
Asynchronous Disk I/O * AUTHOR: Pete Bentley @@ -53,6 +53,9 @@ typedef struct aio_ctrl_t { AIOCB *done_handler; void *done_handler_data; aio_result_t result; + char *bufp; + FREE *free_func; + dlink_node node; } aio_ctrl_t; struct { @@ -71,7 +74,7 @@ typedef struct aio_unlinkq_t { struct aio_unlinkq_t *next; } aio_unlinkq_t; -static aio_ctrl_t *used_list = NULL; +static dlink_list used_list; static int initialised = 0; static OBJH aioStats; static MemPool *aio_ctrl_pool; @@ -107,7 +110,6 @@ void aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callback_data) { aio_ctrl_t *ctrlp; - int ret; assert(initialised); aio_counts.open++; @@ -117,16 +119,9 @@ aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callba ctrlp->done_handler_data = callback_data; ctrlp->operation = _AIO_OPEN; cbdataLock(callback_data); - if (aio_open(path, oflag, mode, &ctrlp->result) < 0) { - ret = open(path, oflag, mode); - if (callback) - (callback) (ctrlp->fd, callback_data, ret, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; + ctrlp->result.data = ctrlp; + aio_open(path, oflag, mode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); return; } @@ -143,14 +138,9 @@ aioClose(int fd) ctrlp->done_handler = NULL; ctrlp->done_handler_data = NULL; ctrlp->operation = _AIO_CLOSE; - if (aio_close(fd, &ctrlp->result) < 0) { - close(fd); /* Can't create thread - do a normal close */ - memPoolFree(aio_ctrl_pool, ctrlp); - aioFDWasClosed(fd); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; + ctrlp->result.data = ctrlp; + aio_close(fd, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); return; } @@ -158,23 +148,20 @@ void aioCancel(int fd) { aio_ctrl_t *curr; - aio_ctrl_t *prev; - aio_ctrl_t *next; AIOCB *done_handler; void *their_data; + dlink_node *m, *next; assert(initialised); aio_counts.cancel++; - prev = NULL; - curr = used_list; 
- for (curr = used_list;; curr = next) { - while (curr != NULL) { + for (m = used_list.head; m; m = next) { + while (m) { + curr = m->data; if (curr->fd == fd) break; - prev = curr; - curr = curr->next; + m = m->next; } - if (curr == NULL) + if (m == NULL) break; aio_cancel(&curr->result); @@ -188,12 +175,8 @@ aioCancel(int fd) done_handler(fd, their_data, -2, -2); cbdataUnlock(their_data); } - next = curr->next; - if (prev == NULL) - used_list = next; - else - prev->next = next; - + next = m->next; + dlinkDelete(m, &used_list); memPoolFree(aio_ctrl_pool, curr); } } @@ -207,23 +190,13 @@ aioWrite(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callba assert(initialised); aio_counts.write++; - for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next) - if (ctrlp->fd == fd) - break; - if (ctrlp != NULL) { - debug(0, 0) ("aioWrite: EWOULDBLOCK\n"); - errno = EWOULDBLOCK; - if (callback) - (callback) (fd, callback_data, -1, errno); - if (free_func) - free_func(bufp); - return; - } ctrlp = memPoolAlloc(aio_ctrl_pool); ctrlp->fd = fd; ctrlp->done_handler = callback; ctrlp->done_handler_data = callback_data; ctrlp->operation = _AIO_WRITE; + ctrlp->bufp = bufp; + ctrlp->free_func = free_func; if (offset >= 0) seekmode = SEEK_SET; else { @@ -231,22 +204,9 @@ aioWrite(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callba offset = 0; } cbdataLock(callback_data); - if (aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) { - if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL) - errno = EWOULDBLOCK; - if (callback) - (callback) (fd, callback_data, -1, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - } else { - ctrlp->next = used_list; - used_list = ctrlp; - } - /* - * aio_write copies the buffer so we can free it here - */ - if (free_func) - free_func(bufp); + ctrlp->result.data = ctrlp; + aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); } 
/* aioWrite */ @@ -258,15 +218,6 @@ aioRead(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callbac assert(initialised); aio_counts.read++; - for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next) - if (ctrlp->fd == fd) - break; - if (ctrlp != NULL) { - errno = EWOULDBLOCK; - if (callback) - (callback) (fd, callback_data, -1, errno); - return; - } ctrlp = memPoolAlloc(aio_ctrl_pool); ctrlp->fd = fd; ctrlp->done_handler = callback; @@ -279,17 +230,9 @@ aioRead(int fd, int offset, char *bufp, int len, AIOCB * callback, void *callbac offset = 0; } cbdataLock(callback_data); - if (aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) { - if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL) - errno = EWOULDBLOCK; - if (callback) - (callback) (fd, callback_data, -1, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; + ctrlp->result.data = ctrlp; + aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); return; } /* aioRead */ @@ -306,25 +249,16 @@ aioStat(char *path, struct stat *sb, AIOCB * callback, void *callback_data) ctrlp->done_handler_data = callback_data; ctrlp->operation = _AIO_STAT; cbdataLock(callback_data); - if (aio_stat(path, sb, &ctrlp->result) < 0) { - if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL) - errno = EWOULDBLOCK; - if (callback) - (callback) (ctrlp->fd, callback_data, -1, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; + ctrlp->result.data = ctrlp; + aio_stat(path, sb, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); return; } /* aioStat */ void -aioUnlink(const char *pathname, AIOCB * callback, void *callback_data) +aioUnlink(const char *path, AIOCB * callback, void *callback_data) { aio_ctrl_t *ctrlp; - char *path; assert(initialised); aio_counts.unlink++; ctrlp = 
memPoolAlloc(aio_ctrl_pool); @@ -332,27 +266,16 @@ aioUnlink(const char *pathname, AIOCB * callback, void *callback_data) ctrlp->done_handler = callback; ctrlp->done_handler_data = callback_data; ctrlp->operation = _AIO_UNLINK; - path = xstrdup(pathname); cbdataLock(callback_data); - if (aio_unlink(path, &ctrlp->result) < 0) { - int ret = unlink(path); - if (callback) - (callback) (ctrlp->fd, callback_data, ret, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - xfree(path); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; - xfree(path); + ctrlp->result.data = ctrlp; + aio_unlink(path, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); } /* aioUnlink */ void -aioTruncate(const char *pathname, off_t length, AIOCB * callback, void *callback_data) +aioTruncate(const char *path, off_t length, AIOCB * callback, void *callback_data) { aio_ctrl_t *ctrlp; - char *path; assert(initialised); aio_counts.unlink++; ctrlp = memPoolAlloc(aio_ctrl_pool); @@ -360,20 +283,10 @@ aioTruncate(const char *pathname, off_t length, AIOCB * callback, void *callback ctrlp->done_handler = callback; ctrlp->done_handler_data = callback_data; ctrlp->operation = _AIO_TRUNCATE; - path = xstrdup(pathname); cbdataLock(callback_data); - if (aio_truncate(path, length, &ctrlp->result) < 0) { - int ret = truncate(path, length); - if (callback) - (callback) (ctrlp->fd, callback_data, ret, errno); - cbdataUnlock(callback_data); - memPoolFree(aio_ctrl_pool, ctrlp); - xfree(path); - return; - } - ctrlp->next = used_list; - used_list = ctrlp; - xfree(path); + ctrlp->result.data = ctrlp; + aio_truncate(path, length, &ctrlp->result); + dlinkAdd(ctrlp, &ctrlp->node, &used_list); } /* aioTruncate */ @@ -382,7 +295,6 @@ aioCheckCallbacks(SwapDir * SD) { aio_result_t *resultp; aio_ctrl_t *ctrlp; - aio_ctrl_t *prev; AIOCB *done_handler; void *their_data; int retval = 0; @@ -392,26 +304,24 @@ aioCheckCallbacks(SwapDir * SD) for (;;) { if ((resultp = aio_poll_done()) 
== NULL) break; - prev = NULL; - for (ctrlp = used_list; ctrlp != NULL; prev = ctrlp, ctrlp = ctrlp->next) - if (&ctrlp->result == resultp) - break; + ctrlp = (aio_ctrl_t *)resultp->data; if (ctrlp == NULL) - continue; - if (prev == NULL) - used_list = ctrlp->next; - else - prev->next = ctrlp->next; + continue; /* XXX Should not happen */ + dlinkDelete(&ctrlp->node, &used_list); if ((done_handler = ctrlp->done_handler)) { their_data = ctrlp->done_handler_data; ctrlp->done_handler = NULL; ctrlp->done_handler_data = NULL; - if (cbdataValid(their_data)) + if (cbdataValid(their_data)) { retval = 1; /* Return that we've actually done some work */ done_handler(ctrlp->fd, their_data, ctrlp->result.aio_return, ctrlp->result.aio_errno); + } cbdataUnlock(their_data); } + /* free data if requested to aioWrite() */ + if (ctrlp->free_func) + ctrlp->free_func(ctrlp->bufp); if (ctrlp->operation == _AIO_CLOSE) aioFDWasClosed(ctrlp->fd); memPoolFree(aio_ctrl_pool, ctrlp); diff --git a/src/fs/aufs/store_asyncufs.h b/src/fs/aufs/store_asyncufs.h index baa3d1a303..737927d7df 100644 --- a/src/fs/aufs/store_asyncufs.h +++ b/src/fs/aufs/store_asyncufs.h @@ -10,14 +10,26 @@ #ifdef ASYNC_IO_THREADS #define NUMTHREADS ASYNC_IO_THREADS #else -#define NUMTHREADS 16 +#define NUMTHREADS (Config.cacheSwap.n_configured*16) #endif -#define MAGIC1 (NUMTHREADS/Config.cacheSwap.n_configured/2) +/* Queue limit where swapouts are deferred (load calculation) */ +#define MAGIC1 (NUMTHREADS*Config.cacheSwap.n_configured*5) +/* Queue limit where swapins are deferred (open/create fails) */ +#define MAGIC2 (NUMTHREADS*Config.cacheSwap.n_configured*20) + +/* Which operations to run async */ +#define ASYNC_OPEN 1 +#define ASYNC_CLOSE 0 +#define ASYNC_CREATE 1 +#define ASYNC_WRITE 0 +#define ASYNC_READ 1 struct _aio_result_t { int aio_return; int aio_errno; + void *_data; /* Internal housekeeping */ + void *data; /* Available to the caller */ }; typedef struct _aio_result_t aio_result_t; @@ -35,7 +47,6 @@ int 
aio_truncate(const char *, off_t length, aio_result_t *); int aio_opendir(const char *, aio_result_t *); aio_result_t *aio_poll_done(void); int aio_operations_pending(void); -int aio_overloaded(void); int aio_sync(void); int aio_get_queue_len(void); @@ -68,6 +79,9 @@ struct _aiostate_t { unsigned int reading:1; unsigned int writing:1; unsigned int opening:1; + unsigned int write_kicking:1; + unsigned int read_kicking:1; + unsigned int inreaddone:1; } flags; const char *read_buf; link_list *pending_writes; diff --git a/src/fs/aufs/store_dir_aufs.cc b/src/fs/aufs/store_dir_aufs.cc index cedb501f56..983ea03457 100644 --- a/src/fs/aufs/store_dir_aufs.cc +++ b/src/fs/aufs/store_dir_aufs.cc @@ -1,6 +1,6 @@ /* - * $Id: store_dir_aufs.cc,v 1.15 2000/11/10 09:04:52 adrian Exp $ + * $Id: store_dir_aufs.cc,v 1.16 2000/11/10 21:42:03 hno Exp $ * * DEBUG: section 47 Store Directory Routines * AUTHOR: Duane Wessels @@ -1374,10 +1374,8 @@ storeAufsDirCheckObj(SwapDir * SD, const StoreEntry * e) ql = aioQueueSize(); if (ql == 0) loadav = 0; - else if (ql >= MAGIC1) /* Queue is too long, don't even consider it */ - loadav = -1; - else - loadav = MAGIC1 * 1000 / ql; + loadav = ql * 1000 / MAGIC1; + debug(41, 9) ("storeAufsDirCheckObj: load=%d\n", loadav); return loadav; } diff --git a/src/fs/aufs/store_io_aufs.cc b/src/fs/aufs/store_io_aufs.cc index 4e932f00f8..88afc1d267 100644 --- a/src/fs/aufs/store_io_aufs.cc +++ b/src/fs/aufs/store_io_aufs.cc @@ -6,8 +6,16 @@ #include "squid.h" #include "store_asyncufs.h" +#if ASYNC_READ static AIOCB storeAufsReadDone; +#else +static DRCB storeAufsReadDone; +#endif +#if ASYNC_WRITE static AIOCB storeAufsWriteDone; +#else +static DWCB storeAufsWriteDone; +#endif static void storeAufsIOCallback(storeIOState * sio, int errflag); static AIOCB storeAufsOpenDone; static int storeAufsSomethingPending(storeIOState *); @@ -24,13 +32,25 @@ storeAufsOpen(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, sfileno f = e->swap_filen; char *path = 
storeAufsDirFullPath(SD, f, NULL); storeIOState *sio; +#if !ASYNC_OPEN + int fd; +#endif debug(78, 3) ("storeAufsOpen: fileno %08X\n", f); /* * we should detect some 'too many files open' condition and return * NULL here. */ - while (aioQueueSize() > MAGIC1) +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) return NULL; +#endif +#if !ASYNC_OPEN + fd = file_open(path, O_RDONLY); + if (fd < 0) { + debug(78, 3) ("storeAufsOpen: got failude (%d)\n", errno); + return NULL; + } +#endif sio = memAllocate(MEM_STORE_IO); cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO); sio->fsstate = memPoolAlloc(aio_state_pool); @@ -43,8 +63,12 @@ storeAufsOpen(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, sio->callback_data = callback_data; sio->e = e; cbdataLock(callback_data); - aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio); Opening_FD++; +#if ASYNC_OPEN + aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio); +#else + storeAufsOpenDone(fd, sio, fd, 0); +#endif store_open_disk_fd++; return sio; } @@ -57,6 +81,9 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c storeIOState *sio; sfileno filn; sdirno dirn; +#if !ASYNC_CREATE + int fd; +#endif /* Allocate a number */ dirn = SD->index; @@ -68,8 +95,17 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c * we should detect some 'too many files open' condition and return * NULL here. 
*/ - while (aioQueueSize() > MAGIC1) +#ifdef MAGIC2 + if (aioQueueSize() > MAGIC2) + return NULL; +#endif +#if !ASYNC_CREATE + fd = file_open(path, O_WRONLY | O_CREAT | O_TRUNC); + if (fd < 0) { + debug(78, 3) ("storeAufsCreate: got failude (%d)\n", errno); return NULL; + } +#endif sio = memAllocate(MEM_STORE_IO); cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO); sio->fsstate = memPoolAlloc(aio_state_pool); @@ -82,8 +118,12 @@ storeAufsCreate(SwapDir * SD, StoreEntry * e, STFNCB * file_callback, STIOCB * c sio->callback_data = callback_data; sio->e = (StoreEntry *) e; cbdataLock(callback_data); - aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio); Opening_FD++; +#if ASYNC_CREATE + aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio); +#else + storeAufsOpenDone(fd, sio, fd, 0); +#endif store_open_disk_fd++; /* now insert into the replacement policy */ @@ -105,7 +145,7 @@ storeAufsClose(SwapDir * SD, storeIOState * sio) aiostate->flags.close_request = 1; return; } - storeAufsIOCallback(sio, 0); + storeAufsIOCallback(sio, DISK_OK); } @@ -139,7 +179,11 @@ storeAufsRead(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t of sio->swap_dirn, sio->swap_filen, aiostate->fd); sio->offset = offset; aiostate->flags.reading = 1; +#if ASYNC_READ aioRead(aiostate->fd, offset, buf, size, storeAufsReadDone, sio); +#else + file_read(aiostate->fd, offset, buf, size, storeAufsReadDone, sio); +#endif } @@ -162,6 +206,7 @@ storeAufsWrite(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t o linklistPush(&(aiostate->pending_writes), q); return; } +#if ASYNC_WRITE if (aiostate->flags.writing) { struct _queued_write *q; debug(78, 3) ("storeAufsWrite: queuing write\n"); @@ -178,9 +223,15 @@ storeAufsWrite(SwapDir * SD, storeIOState * sio, char *buf, size_t size, off_t o * XXX it might be nice if aioWrite() gave is immediate * feedback here about EWOULDBLOCK instead of in the * callback function + * XXX Should never give 
EWOULDBLOCK under normal operations + * if it does then the MAGIC1/2 tuning is wrong. */ aioWrite(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio, free_func); +#else + file_write(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio, + free_func); +#endif } /* Unlink */ @@ -235,7 +286,7 @@ storeAufsOpenDone(int unused, void *my_data, int fd, int errflag) errno = errflag; debug(78, 0) ("storeAufsOpenDone: %s\n", xstrerror()); debug(78, 1) ("\t%s\n", storeAufsDirFullPath(INDEXSD(sio->swap_dirn), sio->swap_filen, NULL)); - storeAufsIOCallback(sio, errflag); + storeAufsIOCallback(sio, DISK_ERROR); return; } aiostate->fd = fd; @@ -248,8 +299,20 @@ storeAufsOpenDone(int unused, void *my_data, int fd, int errflag) debug(78, 3) ("storeAufsOpenDone: exiting\n"); } +/* + * XXX TODO + * if errflag == EWOULDBLOCK, then we'll need to re-queue the + * chunk at the beginning of the write_pending list and try + * again later. + * XXX Should not normally happen. + */ +#if ASYNC_READ static void storeAufsReadDone(int fd, void *my_data, int len, int errflag) +#else +static void +storeAufsReadDone(int fd, int errflag, size_t len, void *my_data) +#endif { storeIOState *sio = my_data; aiostate_t *aiostate = (aiostate_t *) sio->fsstate; @@ -258,6 +321,7 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag) ssize_t rlen; debug(78, 3) ("storeAufsReadDone: dirno %d, fileno %08X, FD %d, len %d\n", sio->swap_dirn, sio->swap_filen, fd, len); + aiostate->flags.inreaddone = 1; aiostate->flags.reading = 0; if (errflag) { debug(78, 3) ("storeAufsReadDone: got failure (%d)\n", errflag); @@ -266,6 +330,17 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag) rlen = (ssize_t) len; sio->offset += len; } +#if ASYNC_READ + /* translate errflag from errno to Squid disk error */ + errno = errflag; + if (errflag) + errflag = DISK_ERROR; + else + errflag = DISK_OK; +#else + if (errflag == DISK_EOF) + errflag = DISK_OK; /* EOF is signalled by len == 0, not errors... 
*/ +#endif assert(callback); assert(their_data); sio->read.callback = NULL; @@ -273,13 +348,9 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag) if (cbdataValid(their_data)) callback(their_data, aiostate->read_buf, rlen); cbdataUnlock(their_data); - /* - * XXX is this safe? The above callback may have caused sio - * to be freed/closed already? Philip Guenther - * says it fixes his FD leaks, with no side effects. - */ + aiostate->flags.inreaddone = 0; if (aiostate->flags.close_request) - storeAufsIOCallback(sio, DISK_OK); + storeAufsIOCallback(sio, errflag); } /* @@ -287,15 +358,29 @@ storeAufsReadDone(int fd, void *my_data, int len, int errflag) * if errflag == EWOULDBLOCK, then we'll need to re-queue the * chunk at the beginning of the write_pending list and try * again later. + * XXX Should not normally happen. */ +#if ASYNC_WRITE static void storeAufsWriteDone(int fd, void *my_data, int len, int errflag) +#else +static void +storeAufsWriteDone(int fd, int errflag, size_t len, void *my_data) +#endif { static int loop_detect = 0; storeIOState *sio = my_data; aiostate_t *aiostate = (aiostate_t *) sio->fsstate; - debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d\n", - sio->swap_dirn, sio->swap_filen, fd, len); + debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d, err=%d\n", + sio->swap_dirn, sio->swap_filen, fd, len, errflag); +#if ASYNC_WRITE + /* Translate from errno to Squid disk error */ + errno = errflag; + if (errflag) + errflag = errno == ENOSP ? 
DISK_NO_SPACE_LEFT : DISK_ERROR; + else + errflag = DISK_OK; +#endif assert(++loop_detect < 10); aiostate->flags.writing = 0; if (errflag) { @@ -305,10 +390,21 @@ storeAufsWriteDone(int fd, void *my_data, int len, int errflag) return; } sio->offset += len; - if (storeAufsKickWriteQueue(sio)) - (void) 0; +#if ASYNC_WRITE + if (!storeAufsKickWriteQueue(sio)) + 0; else if (aiostate->flags.close_request) storeAufsIOCallback(sio, errflag); +#else + if (!aiostate->flags.write_kicking) { + aiostate->flags.write_kicking = 1; + while (storeAufsKickWriteQueue(sio)) + (void) 0; + aiostate->flags.write_kicking = 0; + if (aiostate->flags.close_request) + storeAufsIOCallback(sio, errflag); + } +#endif loop_detect--; } @@ -350,6 +446,8 @@ storeAufsSomethingPending(storeIOState * sio) return 1; if (aiostate->flags.opening) return 1; + if (aiostate->flags.inreaddone) + return 1; return 0; }