/*
- * $Id: aiops.cc,v 1.3 2000/10/17 08:06:07 adrian Exp $
+ * $Id: aiops.cc,v 1.4 2000/11/10 21:42:03 hno Exp $
*
* DEBUG: section 43 AIOPS
* AUTHOR: Stewart Forster <slf@connect.com.au>
#define RIDICULOUS_LENGTH 4096
-#if defined(_SQUID_LINUX_)
-/* Linux requires proper use of mutexes or it will segfault deep in the
- * thread libraries. Observed on Alpha SMP Linux 2.2.10-ac12.
- */
-#define AIO_PROPER_MUTEX 1
-#endif
-
enum _aio_thread_status {
_THREAD_STARTING = 0,
_THREAD_WAITING,
};
typedef struct aio_request_t {
+ struct aio_request_t *next;
enum _aio_request_type request_type;
int cancelled;
char *path;
struct stat *tmpstatp;
struct stat *statp;
aio_result_t *resultp;
- struct aio_request_t *next;
} aio_request_t;
-
-typedef struct aio_thread_t {
+typedef struct aio_request_queue_t {
+ pthread_mutex_t mutex;
+ pthread_cond_t cond;
+ aio_request_t * volatile head;
+ aio_request_t * volatile * volatile tailp;
+ unsigned long requests;
+ unsigned long blocked; /* main failed to lock the queue */
+} aio_request_queue_t;
+
+typedef struct aio_thread_t aio_thread_t;
+struct aio_thread_t {
+ aio_thread_t *next;
pthread_t thread;
enum _aio_thread_status status;
- pthread_mutex_t mutex; /* Mutex for testing condition variable */
- pthread_cond_t cond; /* Condition variable */
- struct aio_request_t *volatile req; /* set by main, cleared by thread */
- struct aio_request_t *processed_req; /* reminder to main */
- struct aio_thread_t *next;
-} aio_thread_t;
-
+ struct aio_request_t *current_req;
+ unsigned long requests;
+};
int aio_cancel(aio_result_t *);
int aio_open(const char *, int, mode_t, aio_result_t *);
static void aio_init(void);
static void aio_queue_request(aio_request_t *);
-static void aio_process_request_queue(void);
static void aio_cleanup_request(aio_request_t *);
static void *aio_thread_loop(void *);
static void aio_do_open(aio_request_t *);
static void *aio_do_opendir(aio_request_t *);
#endif
static void aio_debug(aio_request_t *);
-static void aio_poll_threads(void);
+static void aio_poll_queues(void);
-static aio_thread_t *threads;
+static aio_thread_t *threads = NULL;
static int aio_initialised = 0;
+
#define AIO_LARGE_BUFS 16384
#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
+#define AIO_MICRO_BUFS 128
-static MemPool *aio_large_bufs = NULL; // 16K
-static MemPool *aio_medium_bufs = NULL; // 8K
-static MemPool *aio_small_bufs = NULL; // 4K
-static MemPool *aio_tiny_bufs = NULL; // 2K
+static MemPool *aio_large_bufs = NULL; /* 16K */
+static MemPool *aio_medium_bufs = NULL; /* 8K */
+static MemPool *aio_small_bufs = NULL; /* 4K */
+static MemPool *aio_tiny_bufs = NULL; /* 2K */
+static MemPool *aio_micro_bufs = NULL;	/* 128 bytes */
static int request_queue_len = 0;
static MemPool *aio_request_pool = NULL;
-static aio_request_t *request_queue_head = NULL;
-static aio_request_t *request_queue_tail = NULL;
-static aio_request_t *request_done_head = NULL;
-static aio_request_t *request_done_tail = NULL;
-static aio_thread_t *wait_threads = NULL;
-static aio_thread_t *busy_threads_head = NULL;
-static aio_thread_t *busy_threads_tail = NULL;
+static MemPool *aio_thread_pool = NULL;
+static aio_request_queue_t request_queue;
+static struct { aio_request_t *head, **tailp; } request_queue2 = { NULL, &request_queue2.head };
+static aio_request_queue_t done_queue;
+static struct { aio_request_t *head, **tailp; } done_requests = { NULL, &done_requests.head };
static pthread_attr_t globattr;
static struct sched_param globsched;
static pthread_t main_thread;
{
MemPool *p;
if (size <= AIO_LARGE_BUFS) {
- if (size <= AIO_TINY_BUFS)
+ if (size <= AIO_MICRO_BUFS)
+ p = aio_micro_bufs;
+ else if (size <= AIO_TINY_BUFS)
p = aio_tiny_bufs;
else if (size <= AIO_SMALL_BUFS)
p = aio_small_bufs;
return p;
}
+static char *
+aio_xstrdup(const char *str)
+{
+ char *p;
+ int len = strlen(str)+1;
+
+ p = aio_xmalloc(len);
+ strncpy(p, str, len);
+
+ return p;
+}
+
static void
aio_xfree(void *p, int size)
{
xfree(p);
}
+static void
+aio_xstrfree(char *str)
+{
+ MemPool *pool;
+ int len = strlen(str)+1;
+
+ if ( (pool = aio_get_pool(len)) != NULL) {
+ memPoolFree(pool, str);
+ } else
+ xfree(str);
+}
+
static void
aio_init(void)
{
pthread_attr_setschedparam(&globattr, &globsched);
#endif
- /* Create threads and get them to sit in their wait loop */
- threads = xcalloc(NUMTHREADS, sizeof(aio_thread_t));
+ /* Initialize request queue */
+ if (pthread_mutex_init(&(request_queue.mutex), NULL))
+ fatal("Failed to create mutex");
+ if (pthread_cond_init(&(request_queue.cond), NULL))
+ fatal("Failed to create condition variable");
+ request_queue.head = NULL;
+ request_queue.tailp = &request_queue.head;
+ request_queue.requests = 0;
+ request_queue.blocked = 0;
+
+ /* Initialize done queue */
+ if (pthread_mutex_init(&(done_queue.mutex), NULL))
+ fatal("Failed to create mutex");
+ if (pthread_cond_init(&(done_queue.cond), NULL))
+ fatal("Failed to create condition variable");
+ done_queue.head = NULL;
+ done_queue.tailp = &done_queue.head;
+ done_queue.requests = 0;
+ done_queue.blocked = 0;
+ /* Create threads and get them to sit in their wait loop */
+ aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t));
for (i = 0; i < NUMTHREADS; i++) {
- threadp = &threads[i];
+ threadp = memPoolAlloc(aio_thread_pool);
threadp->status = _THREAD_STARTING;
- if (pthread_mutex_init(&(threadp->mutex), NULL)) {
- threadp->status = _THREAD_FAILED;
- continue;
- }
- if (pthread_cond_init(&(threadp->cond), NULL)) {
- threadp->status = _THREAD_FAILED;
- continue;
- }
- threadp->req = NULL;
- threadp->processed_req = NULL;
+ threadp->current_req = NULL;
+ threadp->requests = 0;
+ threadp->next = threads;
+ threads = threadp;
if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) {
fprintf(stderr, "Thread creation failed\n");
threadp->status = _THREAD_FAILED;
continue;
}
- threadp->next = wait_threads;
- wait_threads = threadp;
-#if AIO_PROPER_MUTEX
- pthread_mutex_lock(&threadp->mutex);
-#endif
}
/* Create request pool */
aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS);
aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS);
aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS);
+ aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS);
aio_initialised = 1;
}
aio_thread_t *threadp = ptr;
aio_request_t *request;
sigset_t new;
-#if !AIO_PROPER_MUTEX
- struct timespec wait_time;
-#endif
/*
* Make sure to ignore signals which may possibly get sent to
sigaddset(&new, SIGALRM);
pthread_sigmask(SIG_BLOCK, &new, NULL);
- pthread_mutex_lock(&threadp->mutex);
while (1) {
-#if AIO_PROPER_MUTEX
- while (threadp->req == NULL) {
+ threadp->current_req = request = NULL;
+ request = NULL;
+ /* Get a request to process */
threadp->status = _THREAD_WAITING;
- pthread_cond_wait(&threadp->cond, &threadp->mutex);
+ pthread_mutex_lock(&request_queue.mutex);
+ while(!request_queue.head) {
+ pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
}
-#else
- /* The timeout is used to unlock the race condition where
- * ->req is set between the check and pthread_cond_wait.
- * The thread steps it's own clock on each timeout, to avoid a CPU
- * spin situation if the main thread is suspended (paging), and
- * squid_curtime is not being updated timely.
- */
- wait_time.tv_sec = squid_curtime + 1; /* little quicker first time */
- wait_time.tv_nsec = 0;
- while (threadp->req == NULL) {
- threadp->status = _THREAD_WAITING;
- pthread_cond_timedwait(&threadp->cond, &threadp->mutex, &wait_time);
- wait_time.tv_sec += 3; /* then wait 3 seconds between each check */
- }
-#endif
- request = threadp->req;
+ request = request_queue.head;
+ if (request)
+ request_queue.head = request->next;
+ if (!request_queue.head)
+ request_queue.tailp = &request_queue.head;
+ pthread_mutex_unlock(&request_queue.mutex);
+ /* process the request */
+ threadp->status = _THREAD_BUSY;
+ request->next = NULL;
+ threadp->current_req = request;
errno = 0;
if (!request->cancelled) {
switch (request->request_type) {
request->ret = -1;
request->err = EINTR;
}
- threadp->req = NULL; /* tells main thread that we are done */
- } /* while */
+ threadp->status = _THREAD_DONE;
+ /* put the request in the done queue */
+ pthread_mutex_lock(&done_queue.mutex);
+ *done_queue.tailp = request;
+ done_queue.tailp = &request->next;
+ pthread_mutex_unlock(&done_queue.mutex);
+ threadp->requests++;
+ } /* while forever */
return NULL;
} /* aio_thread_loop */
static void
-aio_do_request(aio_request_t * requestp)
-{
- if (wait_threads == NULL && busy_threads_head == NULL) {
- fprintf(stderr, "PANIC: No threads to service requests with!\n");
- exit(-1);
- }
- aio_queue_request(requestp);
-} /* aio_do_request */
-
-
-static void
-aio_queue_request(aio_request_t * requestp)
+aio_queue_request(aio_request_t * request)
{
- aio_request_t *rp;
- static int last_warn = 0;
static int high_start = 0;
- static int queue_high, queue_low;
- int i;
-
+    debug(43, 9) ("aio_queue_request: %p type=%d result=%p\n",
+ request, request->request_type, request->resultp);
/* Mark it as not executed (failing result, no error) */
- requestp->ret = -1;
- requestp->err = 0;
- /* Queue it on the request queue */
- if (request_queue_head == NULL) {
- request_queue_head = requestp;
- request_queue_tail = requestp;
+ request->ret = -1;
+ request->err = 0;
+ /* Internal housekeeping */
+ request_queue_len += 1;
+ request->resultp->_data = request;
+ /* Play some tricks with the request_queue2 queue */
+ request->next = NULL;
+ if (!request_queue2.head) {
+ if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
+ /* Normal path */
+ *request_queue.tailp = request;
+ request_queue.tailp = &request->next;
+ pthread_cond_signal(&request_queue.cond);
+ pthread_mutex_unlock(&request_queue.mutex);
} else {
- request_queue_tail->next = requestp;
- request_queue_tail = requestp;
+ /* Oops, the request queue is blocked, use request_queue2 */
+ *request_queue2.tailp = request;
+ request_queue2.tailp = &request->next;
+ }
+ } else {
+ /* Secondary path. We have blocked requests to deal with */
+ /* add the request to the chain */
+ *request_queue2.tailp = request;
+ if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
+ /* Ok, the queue is no longer blocked */
+ *request_queue.tailp = request_queue2.head;
+ request_queue.tailp = &request->next;
+ pthread_cond_signal(&request_queue.cond);
+ pthread_mutex_unlock(&request_queue.mutex);
+ request_queue2.head = NULL;
+ request_queue2.tailp = &request_queue2.head;
+ } else {
+ /* still blocked, bump the blocked request chain */
+ request_queue2.tailp = &request->next;
+ }
+ }
+ if (request_queue2.head) {
+ static int filter = 0;
+ static int filter_limit = 8;
+ if (++filter >= filter_limit) {
+ filter_limit += filter;
+ filter = 0;
+ debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n");
+ }
}
- requestp->next = NULL;
- request_queue_len += 1;
- /* Poll done threads if needed */
- if (wait_threads == NULL)
- aio_poll_threads();
- /* Kick it rolling */
- aio_process_request_queue();
/* Warn if out of threads */
- if (request_queue_len > (NUMTHREADS >> 1)) {
+ if (request_queue_len > MAGIC1) {
+ static int last_warn = 0;
+ static int queue_high, queue_low;
if (high_start == 0) {
high_start = squid_curtime;
queue_high = request_queue_len;
if (request_queue_len < queue_low)
queue_low = request_queue_len;
if (squid_curtime >= (last_warn + 15) &&
- squid_curtime >= (high_start + 3)) {
- debug(43, 1) ("aio_queue_request: WARNING - Running out of I/O threads\n");
- debug(43, 2) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
- request_queue_len, queue_high, queue_low, squid_curtime - high_start);
- debug(43, 1) ("aio_queue_request: Perhaps you should increase NUMTHREADS\n");
- debug(43, 1) ("aio_queue_request: Or install more disks to share the load\n");
- debug(43, 3) ("aio_queue_request: First %d items on request queue\n", NUMTHREADS);
- rp = request_queue_head;
- for (i = 1; i <= NUMTHREADS; i++) {
- switch (rp->request_type) {
- case _AIO_OP_OPEN:
- debug(43, 3) ("aio_queue_request: %d : open -> %s\n", i, rp->path);
- break;
- case _AIO_OP_READ:
- debug(43, 3) ("aio_queue_request: %d : read -> FD = %d\n", i, rp->fd);
- break;
- case _AIO_OP_WRITE:
- debug(43, 3) ("aio_queue_request: %d : write -> FD = %d\n", i, rp->fd);
- break;
- case _AIO_OP_CLOSE:
- debug(43, 3) ("aio_queue_request: %d : close -> FD = %d\n", i, rp->fd);
- break;
- case _AIO_OP_UNLINK:
- debug(43, 3) ("aio_queue_request: %d : unlink -> %s\n", i, rp->path);
- break;
- case _AIO_OP_TRUNCATE:
- debug(43, 3) ("aio_queue_request: %d : truncate -> %s\n", i, rp->path);
- break;
- case _AIO_OP_STAT:
- debug(43, 3) ("aio_queue_request: %d : stat -> %s\n", i, rp->path);
- break;
- default:
- debug(43, 1) ("aio_queue_request: %d : Unimplemented request type: %d\n", i, rp->request_type);
- break;
- }
- if ((rp = rp->next) == NULL)
- break;
- }
+ squid_curtime >= (high_start + 5)) {
+ debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n");
+ if (squid_curtime >= (high_start + 15))
+ debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
+ request_queue_len, queue_high, queue_low, squid_curtime - high_start);
last_warn = squid_curtime;
}
} else {
high_start = 0;
}
+ /* Warn if seriously overloaded */
if (request_queue_len > RIDICULOUS_LENGTH) {
debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n");
debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n");
}
} /* aio_queue_request */
-
-static void
-aio_process_request_queue(void)
-{
- aio_thread_t *threadp;
- aio_request_t *requestp;
-
- for (;;) {
- if (wait_threads == NULL || request_queue_head == NULL)
- return;
-
- requestp = request_queue_head;
- if ((request_queue_head = requestp->next) == NULL)
- request_queue_tail = NULL;
- requestp->next = NULL;
- request_queue_len--;
-
- if (requestp->cancelled) {
- aio_cleanup_request(requestp);
- continue;
- }
- threadp = wait_threads;
- wait_threads = threadp->next;
- threadp->next = NULL;
-
- if (busy_threads_head != NULL)
- busy_threads_tail->next = threadp;
- else
- busy_threads_head = threadp;
- busy_threads_tail = threadp;
-
- threadp->status = _THREAD_BUSY;
- threadp->req = threadp->processed_req = requestp;
- pthread_cond_signal(&(threadp->cond));
-#if AIO_PROPER_MUTEX
- pthread_mutex_unlock(&threadp->mutex);
-#endif
- }
-} /* aio_process_request_queue */
-
-
static void
aio_cleanup_request(aio_request_t * requestp)
{
if (!cancelled && requestp->ret == 0)
xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
aio_xfree(requestp->tmpstatp, sizeof(struct stat));
+ aio_xstrfree(requestp->path);
+ break;
case _AIO_OP_OPEN:
if (cancelled && requestp->ret >= 0)
/* The open() was cancelled but completed */
close(requestp->ret);
- xfree(requestp->path);
+ aio_xstrfree(requestp->path);
break;
case _AIO_OP_CLOSE:
if (cancelled && requestp->ret < 0)
case _AIO_OP_UNLINK:
case _AIO_OP_TRUNCATE:
case _AIO_OP_OPENDIR:
- xfree(requestp->path);
+ aio_xstrfree(requestp->path);
break;
case _AIO_OP_READ:
if (!cancelled && requestp->ret > 0)
xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret);
+ aio_xfree(requestp->tmpbufp, requestp->buflen);
+ break;
case _AIO_OP_WRITE:
aio_xfree(requestp->tmpbufp, requestp->buflen);
break;
int
aio_cancel(aio_result_t * resultp)
{
- aio_thread_t *threadp;
- aio_request_t *requestp;
+ aio_request_t *request = resultp->_data;
- for (threadp = busy_threads_head; threadp != NULL; threadp = threadp->next)
- if (threadp->processed_req->resultp == resultp) {
- threadp->processed_req->cancelled = 1;
- threadp->processed_req->resultp = NULL;
- return 0;
- }
- for (requestp = request_queue_head; requestp != NULL; requestp = requestp->next)
- if (requestp->resultp == resultp) {
- requestp->cancelled = 1;
- requestp->resultp = NULL;
- return 0;
- }
- for (requestp = request_done_head; requestp != NULL; requestp = requestp->next)
- if (requestp->resultp == resultp) {
- requestp->cancelled = 1;
- requestp->resultp = NULL;
- return 0;
- }
+ if (request && request->resultp == resultp) {
+	debug(43, 9) ("aio_cancel: %p type=%d result=%p\n",
+ request, request->request_type, request->resultp);
+ request->cancelled = 1;
+ request->resultp = NULL;
+ resultp->_data = NULL;
+ return 0;
+ }
return 1;
} /* aio_cancel */
aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp)
{
aio_request_t *requestp;
- int len;
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
- len = strlen(path) + 1;
- if ((requestp->path = (char *) xmalloc(len)) == NULL) {
- memPoolFree(aio_request_pool, requestp);
- errno = ENOMEM;
- return -1;
- }
- strncpy(requestp->path, path, len);
+ requestp = memPoolAlloc(aio_request_pool);
+ requestp->path = (char *) aio_xstrdup(path);
requestp->oflag = oflag;
requestp->mode = mode;
requestp->resultp = resultp;
requestp->request_type = _AIO_OP_OPEN;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
+ requestp = memPoolAlloc(aio_request_pool);
requestp->fd = fd;
requestp->bufferp = bufp;
requestp->tmpbufp = (char *) aio_xmalloc(bufs);
requestp->request_type = _AIO_OP_READ;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
+ requestp = memPoolAlloc(aio_request_pool);
requestp->fd = fd;
requestp->tmpbufp = (char *) aio_xmalloc(bufs);
xmemcpy(requestp->tmpbufp, bufp, bufs);
requestp->request_type = _AIO_OP_WRITE;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
+ requestp = memPoolAlloc(aio_request_pool);
requestp->fd = fd;
requestp->resultp = resultp;
requestp->request_type = _AIO_OP_CLOSE;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
aio_stat(const char *path, struct stat *sb, aio_result_t * resultp)
{
aio_request_t *requestp;
- int len;
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
- len = strlen(path) + 1;
- requestp->path = (char *) xmalloc(len);
- strncpy(requestp->path, path, len);
+ requestp = memPoolAlloc(aio_request_pool);
+ requestp->path = (char *) aio_xstrdup(path);
requestp->statp = sb;
requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat));
requestp->resultp = resultp;
requestp->request_type = _AIO_OP_STAT;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
aio_unlink(const char *path, aio_result_t * resultp)
{
aio_request_t *requestp;
- int len;
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
- len = strlen(path) + 1;
- requestp->path = (char *) xmalloc(len);
- strncpy(requestp->path, path, len);
+ requestp = memPoolAlloc(aio_request_pool);
+ requestp->path = aio_xstrdup(path);
requestp->resultp = resultp;
requestp->request_type = _AIO_OP_UNLINK;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
aio_truncate(const char *path, off_t length, aio_result_t * resultp)
{
aio_request_t *requestp;
- int len;
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
- len = strlen(path) + 1;
- if ((requestp->path = (char *) xmalloc(len)) == NULL) {
- memPoolFree(aio_request_pool, requestp);
- errno = ENOMEM;
- return -1;
- }
+ requestp = memPoolAlloc(aio_request_pool);
+ requestp->path = (char *) aio_xstrdup(path);
requestp->offset = length;
- strncpy(requestp->path, path, len);
requestp->resultp = resultp;
requestp->request_type = _AIO_OP_TRUNCATE;
requestp->cancelled = 0;
- aio_do_request(requestp);
+ aio_queue_request(requestp);
return 0;
}
#if AIO_OPENDIR
-/* XXX aio_opendir NOT implemented? */
+/* XXX aio_opendir NOT implemented yet.. */
int
aio_opendir(const char *path, aio_result_t * resultp)
if (!aio_initialised)
aio_init();
- if ((requestp = memPoolAlloc(aio_request_pool)) == NULL) {
- errno = ENOMEM;
- return -1;
- }
+ requestp = memPoolAlloc(aio_request_pool);
return -1;
}
#endif
-
-void
-aio_poll_threads(void)
-{
- aio_thread_t *prev;
- aio_thread_t *threadp;
- aio_request_t *requestp;
-
- do { /* while found completed thread */
- prev = NULL;
- threadp = busy_threads_head;
- while (threadp) {
- debug(43, 9) ("aio_poll_threads: %p: request type %d -> status %d\n",
- threadp,
- threadp->processed_req->request_type,
- threadp->status);
-#if AIO_PROPER_MUTEX
- if (threadp->req == NULL)
- if (pthread_mutex_trylock(&threadp->mutex) == 0)
- break;
-#else
- if (threadp->req == NULL)
- break;
-#endif
- prev = threadp;
- threadp = threadp->next;
+static void
+aio_poll_queues(void)
+{
+ /* kick "overflow" request queue */
+ if (request_queue2.head &&
+ pthread_mutex_trylock(&request_queue.mutex) == 0) {
+ *request_queue.tailp = request_queue2.head;
+ request_queue.tailp = request_queue2.tailp;
+ pthread_cond_signal(&request_queue.cond);
+ pthread_mutex_unlock(&request_queue.mutex);
+ request_queue2.head = NULL;
+ request_queue2.tailp = &request_queue2.head;
+ }
+ /* poll done queue */
+ if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
+ struct aio_request_t *requests = done_queue.head;
+ done_queue.head = NULL;
+ done_queue.tailp = &done_queue.head;
+ pthread_mutex_unlock(&done_queue.mutex);
+ *done_requests.tailp = requests;
+ request_queue_len -= 1;
+ while(requests->next) {
+ requests = requests->next;
+ request_queue_len -= 1;
}
- if (threadp == NULL)
- break;
-
- if (prev == NULL)
- busy_threads_head = busy_threads_head->next;
- else
- prev->next = threadp->next;
-
- if (busy_threads_tail == threadp)
- busy_threads_tail = prev;
-
- requestp = threadp->processed_req;
- threadp->processed_req = NULL;
-
- threadp->next = wait_threads;
- wait_threads = threadp;
-
- if (request_done_tail != NULL)
- request_done_tail->next = requestp;
- else
- request_done_head = requestp;
- request_done_tail = requestp;
- } while (threadp);
-
- aio_process_request_queue();
-} /* aio_poll_threads */
+ done_requests.tailp = &requests->next;
+ }
+ /* Give up the CPU to allow the threads to do their work */
+ if (done_queue.head || request_queue.head)
+ sched_yield();
+}
aio_result_t *
aio_poll_done(void)
{
- aio_request_t *requestp;
+ aio_request_t *request;
aio_result_t *resultp;
int cancelled;
+ int polled = 0;
AIO_REPOLL:
- aio_poll_threads();
- if (request_done_head == NULL) {
+ request = done_requests.head;
+ if (request == NULL && !polled) {
+ aio_poll_queues();
+ polled = 1;
+ request = done_requests.head;
+ }
+ if (!request) {
return NULL;
}
- requestp = request_done_head;
- request_done_head = requestp->next;
- if (!request_done_head)
- request_done_tail = NULL;
-
- resultp = requestp->resultp;
- cancelled = requestp->cancelled;
- aio_debug(requestp);
- debug(43, 5) ("DONE: %d -> %d\n", requestp->ret, requestp->err);
- aio_cleanup_request(requestp);
+    debug(43, 9) ("aio_poll_done: %p type=%d result=%p\n",
+ request, request->request_type, request->resultp);
+ done_requests.head = request->next;
+ if (!done_requests.head)
+ done_requests.tailp = &done_requests.head;
+ resultp = request->resultp;
+ cancelled = request->cancelled;
+ aio_debug(request);
+ debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
+ aio_cleanup_request(request);
if (cancelled)
goto AIO_REPOLL;
return resultp;
int
aio_operations_pending(void)
{
- return request_queue_len + (request_done_head != NULL) + (busy_threads_head != NULL);
-}
-
-int
-aio_overloaded(void)
-{
- static time_t last_warn = 0;
- if (aio_operations_pending() > RIDICULOUS_LENGTH / 4) {
- if (squid_curtime >= (last_warn + 15)) {
- debug(43, 0) ("Warning: Async-IO overloaded\n");
- last_warn = squid_curtime;
- }
- return 1;
- }
- return 0;
+ return request_queue_len + (done_requests.head ? 1 : 0);
}
int
aio_sync(void)
{
- int loop_count = 0;
+ /* XXX This might take a while if the queue is large.. */
do {
- aio_poll_threads();
- assert(++loop_count < 10);
+ aio_poll_queues();
} while (request_queue_len > 0);
return aio_operations_pending();
}
}
static void
-aio_debug(aio_request_t * requestp)
+aio_debug(aio_request_t * request)
{
- switch (requestp->request_type) {
+ switch (request->request_type) {
case _AIO_OP_OPEN:
- debug(43, 5) ("OPEN of %s to FD %d\n", requestp->path, requestp->ret);
+ debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
break;
case _AIO_OP_READ:
- debug(43, 5) ("READ on fd: %d\n", requestp->fd);
+ debug(43, 5) ("READ on fd: %d\n", request->fd);
break;
case _AIO_OP_WRITE:
- debug(43, 5) ("WRITE on fd: %d\n", requestp->fd);
+ debug(43, 5) ("WRITE on fd: %d\n", request->fd);
break;
case _AIO_OP_CLOSE:
- debug(43, 5) ("CLOSE of fd: %d\n", requestp->fd);
+ debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
break;
case _AIO_OP_UNLINK:
- debug(43, 5) ("UNLINK of %s\n", requestp->path);
+ debug(43, 5) ("UNLINK of %s\n", request->path);
break;
case _AIO_OP_TRUNCATE:
-	debug(43, 5) ("UNLINK of %s\n", requestp->path);
+	debug(43, 5) ("TRUNCATE of %s\n", request->path);
break;
default:
break;
/*
- * $Id: async_io.cc,v 1.5 2000/06/27 08:33:53 hno Exp $
+ * $Id: async_io.cc,v 1.6 2000/11/10 21:42:03 hno Exp $
*
* DEBUG: section 32 Asynchronous Disk I/O
* AUTHOR: Pete Bentley <pete@demon.net>
AIOCB *done_handler;
void *done_handler_data;
aio_result_t result;
+ char *bufp;
+ FREE *free_func;
+ dlink_node node;
} aio_ctrl_t;
struct {
struct aio_unlinkq_t *next;
} aio_unlinkq_t;
-static aio_ctrl_t *used_list = NULL;
+static dlink_list used_list;
static int initialised = 0;
static OBJH aioStats;
static MemPool *aio_ctrl_pool;
aioOpen(const char *path, int oflag, mode_t mode, AIOCB * callback, void *callback_data)
{
aio_ctrl_t *ctrlp;
- int ret;
assert(initialised);
aio_counts.open++;
ctrlp->done_handler_data = callback_data;
ctrlp->operation = _AIO_OPEN;
cbdataLock(callback_data);
- if (aio_open(path, oflag, mode, &ctrlp->result) < 0) {
- ret = open(path, oflag, mode);
- if (callback)
- (callback) (ctrlp->fd, callback_data, ret, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
+ ctrlp->result.data = ctrlp;
+ aio_open(path, oflag, mode, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
return;
}
ctrlp->done_handler = NULL;
ctrlp->done_handler_data = NULL;
ctrlp->operation = _AIO_CLOSE;
- if (aio_close(fd, &ctrlp->result) < 0) {
- close(fd); /* Can't create thread - do a normal close */
- memPoolFree(aio_ctrl_pool, ctrlp);
- aioFDWasClosed(fd);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
+ ctrlp->result.data = ctrlp;
+ aio_close(fd, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
return;
}
aioCancel(int fd)
{
aio_ctrl_t *curr;
- aio_ctrl_t *prev;
- aio_ctrl_t *next;
AIOCB *done_handler;
void *their_data;
+ dlink_node *m, *next;
assert(initialised);
aio_counts.cancel++;
- prev = NULL;
- curr = used_list;
- for (curr = used_list;; curr = next) {
- while (curr != NULL) {
+ for (m = used_list.head; m; m = next) {
+ while (m) {
+ curr = m->data;
if (curr->fd == fd)
break;
- prev = curr;
- curr = curr->next;
+ m = m->next;
}
- if (curr == NULL)
+ if (m == NULL)
break;
aio_cancel(&curr->result);
done_handler(fd, their_data, -2, -2);
cbdataUnlock(their_data);
}
- next = curr->next;
- if (prev == NULL)
- used_list = next;
- else
- prev->next = next;
-
+ next = m->next;
+ dlinkDelete(m, &used_list);
memPoolFree(aio_ctrl_pool, curr);
}
}
assert(initialised);
aio_counts.write++;
- for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next)
- if (ctrlp->fd == fd)
- break;
- if (ctrlp != NULL) {
- debug(0, 0) ("aioWrite: EWOULDBLOCK\n");
- errno = EWOULDBLOCK;
- if (callback)
- (callback) (fd, callback_data, -1, errno);
- if (free_func)
- free_func(bufp);
- return;
- }
ctrlp = memPoolAlloc(aio_ctrl_pool);
ctrlp->fd = fd;
ctrlp->done_handler = callback;
ctrlp->done_handler_data = callback_data;
ctrlp->operation = _AIO_WRITE;
+ ctrlp->bufp = bufp;
+ ctrlp->free_func = free_func;
if (offset >= 0)
seekmode = SEEK_SET;
else {
offset = 0;
}
cbdataLock(callback_data);
- if (aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) {
- if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
- errno = EWOULDBLOCK;
- if (callback)
- (callback) (fd, callback_data, -1, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- } else {
- ctrlp->next = used_list;
- used_list = ctrlp;
- }
- /*
- * aio_write copies the buffer so we can free it here
- */
- if (free_func)
- free_func(bufp);
+ ctrlp->result.data = ctrlp;
+ aio_write(fd, bufp, len, offset, seekmode, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
} /* aioWrite */
assert(initialised);
aio_counts.read++;
- for (ctrlp = used_list; ctrlp != NULL; ctrlp = ctrlp->next)
- if (ctrlp->fd == fd)
- break;
- if (ctrlp != NULL) {
- errno = EWOULDBLOCK;
- if (callback)
- (callback) (fd, callback_data, -1, errno);
- return;
- }
ctrlp = memPoolAlloc(aio_ctrl_pool);
ctrlp->fd = fd;
ctrlp->done_handler = callback;
offset = 0;
}
cbdataLock(callback_data);
- if (aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result) < 0) {
- if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
- errno = EWOULDBLOCK;
- if (callback)
- (callback) (fd, callback_data, -1, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
+ ctrlp->result.data = ctrlp;
+ aio_read(fd, bufp, len, offset, seekmode, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
return;
} /* aioRead */
ctrlp->done_handler_data = callback_data;
ctrlp->operation = _AIO_STAT;
cbdataLock(callback_data);
- if (aio_stat(path, sb, &ctrlp->result) < 0) {
- if (errno == ENOMEM || errno == EAGAIN || errno == EINVAL)
- errno = EWOULDBLOCK;
- if (callback)
- (callback) (ctrlp->fd, callback_data, -1, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
+ ctrlp->result.data = ctrlp;
+ aio_stat(path, sb, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
return;
} /* aioStat */
void
-aioUnlink(const char *pathname, AIOCB * callback, void *callback_data)
+aioUnlink(const char *path, AIOCB * callback, void *callback_data)
{
aio_ctrl_t *ctrlp;
- char *path;
assert(initialised);
aio_counts.unlink++;
ctrlp = memPoolAlloc(aio_ctrl_pool);
ctrlp->done_handler = callback;
ctrlp->done_handler_data = callback_data;
ctrlp->operation = _AIO_UNLINK;
- path = xstrdup(pathname);
cbdataLock(callback_data);
- if (aio_unlink(path, &ctrlp->result) < 0) {
- int ret = unlink(path);
- if (callback)
- (callback) (ctrlp->fd, callback_data, ret, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- xfree(path);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
- xfree(path);
+ ctrlp->result.data = ctrlp;
+ aio_unlink(path, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
} /* aioUnlink */
void
-aioTruncate(const char *pathname, off_t length, AIOCB * callback, void *callback_data)
+aioTruncate(const char *path, off_t length, AIOCB * callback, void *callback_data)
{
aio_ctrl_t *ctrlp;
- char *path;
assert(initialised);
aio_counts.unlink++;
ctrlp = memPoolAlloc(aio_ctrl_pool);
ctrlp->done_handler = callback;
ctrlp->done_handler_data = callback_data;
ctrlp->operation = _AIO_TRUNCATE;
- path = xstrdup(pathname);
cbdataLock(callback_data);
- if (aio_truncate(path, length, &ctrlp->result) < 0) {
- int ret = truncate(path, length);
- if (callback)
- (callback) (ctrlp->fd, callback_data, ret, errno);
- cbdataUnlock(callback_data);
- memPoolFree(aio_ctrl_pool, ctrlp);
- xfree(path);
- return;
- }
- ctrlp->next = used_list;
- used_list = ctrlp;
- xfree(path);
+ ctrlp->result.data = ctrlp;
+ aio_truncate(path, length, &ctrlp->result);
+ dlinkAdd(ctrlp, &ctrlp->node, &used_list);
} /* aioTruncate */
{
aio_result_t *resultp;
aio_ctrl_t *ctrlp;
- aio_ctrl_t *prev;
AIOCB *done_handler;
void *their_data;
int retval = 0;
for (;;) {
if ((resultp = aio_poll_done()) == NULL)
break;
- prev = NULL;
- for (ctrlp = used_list; ctrlp != NULL; prev = ctrlp, ctrlp = ctrlp->next)
- if (&ctrlp->result == resultp)
- break;
+ ctrlp = (aio_ctrl_t *)resultp->data;
if (ctrlp == NULL)
- continue;
- if (prev == NULL)
- used_list = ctrlp->next;
- else
- prev->next = ctrlp->next;
+ continue; /* XXX Should not happen */
+ dlinkDelete(&ctrlp->node, &used_list);
if ((done_handler = ctrlp->done_handler)) {
their_data = ctrlp->done_handler_data;
ctrlp->done_handler = NULL;
ctrlp->done_handler_data = NULL;
- if (cbdataValid(their_data))
+ if (cbdataValid(their_data)) {
retval = 1; /* Return that we've actually done some work */
done_handler(ctrlp->fd, their_data,
ctrlp->result.aio_return, ctrlp->result.aio_errno);
+ }
cbdataUnlock(their_data);
}
+ /* free data if requested to aioWrite() */
+ if (ctrlp->free_func)
+ ctrlp->free_func(ctrlp->bufp);
if (ctrlp->operation == _AIO_CLOSE)
aioFDWasClosed(ctrlp->fd);
memPoolFree(aio_ctrl_pool, ctrlp);
#include "squid.h"
#include "store_asyncufs.h"
+#if ASYNC_READ
static AIOCB storeAufsReadDone;
+#else
+static DRCB storeAufsReadDone;
+#endif
+#if ASYNC_WRITE
static AIOCB storeAufsWriteDone;
+#else
+static DWCB storeAufsWriteDone;
+#endif
static void storeAufsIOCallback(storeIOState * sio, int errflag);
static AIOCB storeAufsOpenDone;
static int storeAufsSomethingPending(storeIOState *);
sfileno f = e->swap_filen;
char *path = storeAufsDirFullPath(SD, f, NULL);
storeIOState *sio;
+#if !ASYNC_OPEN
+ int fd;
+#endif
debug(78, 3) ("storeAufsOpen: fileno %08X\n", f);
/*
* we should detect some 'too many files open' condition and return
* NULL here.
*/
- while (aioQueueSize() > MAGIC1)
+#ifdef MAGIC2
+ if (aioQueueSize() > MAGIC2)
return NULL;
+#endif
+#if !ASYNC_OPEN
+ fd = file_open(path, O_RDONLY);
+ if (fd < 0) {
+ debug(78, 3) ("storeAufsOpen: got failure (%d)\n", errno);
+ return NULL;
+ }
+#endif
sio = memAllocate(MEM_STORE_IO);
cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO);
sio->fsstate = memPoolAlloc(aio_state_pool);
sio->callback_data = callback_data;
sio->e = e;
cbdataLock(callback_data);
- aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio);
Opening_FD++;
+#if ASYNC_OPEN
+ aioOpen(path, O_RDONLY, 0644, storeAufsOpenDone, sio);
+#else
+ storeAufsOpenDone(fd, sio, fd, 0);
+#endif
store_open_disk_fd++;
return sio;
}
storeIOState *sio;
sfileno filn;
sdirno dirn;
+#if !ASYNC_CREATE
+ int fd;
+#endif
/* Allocate a number */
dirn = SD->index;
* we should detect some 'too many files open' condition and return
* NULL here.
*/
- while (aioQueueSize() > MAGIC1)
+#ifdef MAGIC2
+ if (aioQueueSize() > MAGIC2)
+ return NULL;
+#endif
+#if !ASYNC_CREATE
+ fd = file_open(path, O_WRONLY | O_CREAT | O_TRUNC);
+ if (fd < 0) {
+ debug(78, 3) ("storeAufsCreate: got failure (%d)\n", errno);
return NULL;
+ }
+#endif
sio = memAllocate(MEM_STORE_IO);
cbdataAdd(sio, storeAufsIOFreeEntry, MEM_STORE_IO);
sio->fsstate = memPoolAlloc(aio_state_pool);
sio->callback_data = callback_data;
sio->e = (StoreEntry *) e;
cbdataLock(callback_data);
- aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio);
Opening_FD++;
+#if ASYNC_CREATE
+ aioOpen(path, O_WRONLY | O_CREAT | O_TRUNC, 0644, storeAufsOpenDone, sio);
+#else
+ storeAufsOpenDone(fd, sio, fd, 0);
+#endif
store_open_disk_fd++;
/* now insert into the replacement policy */
aiostate->flags.close_request = 1;
return;
}
- storeAufsIOCallback(sio, 0);
+ storeAufsIOCallback(sio, DISK_OK);
}
sio->swap_dirn, sio->swap_filen, aiostate->fd);
sio->offset = offset;
aiostate->flags.reading = 1;
+#if ASYNC_READ
aioRead(aiostate->fd, offset, buf, size, storeAufsReadDone, sio);
+#else
+ file_read(aiostate->fd, offset, buf, size, storeAufsReadDone, sio);
+#endif
}
linklistPush(&(aiostate->pending_writes), q);
return;
}
+#if ASYNC_WRITE
if (aiostate->flags.writing) {
struct _queued_write *q;
debug(78, 3) ("storeAufsWrite: queuing write\n");
* XXX it might be nice if aioWrite() gave is immediate
* feedback here about EWOULDBLOCK instead of in the
* callback function
+ * XXX Should never give EWOULDBLOCK under normal operations
+ * if it does then the MAGIC1/2 tuning is wrong.
*/
aioWrite(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio,
free_func);
+#else
+ file_write(aiostate->fd, offset, buf, size, storeAufsWriteDone, sio,
+ free_func);
+#endif
}
/* Unlink */
errno = errflag;
debug(78, 0) ("storeAufsOpenDone: %s\n", xstrerror());
debug(78, 1) ("\t%s\n", storeAufsDirFullPath(INDEXSD(sio->swap_dirn), sio->swap_filen, NULL));
- storeAufsIOCallback(sio, errflag);
+ storeAufsIOCallback(sio, DISK_ERROR);
return;
}
aiostate->fd = fd;
debug(78, 3) ("storeAufsOpenDone: exiting\n");
}
+/*
+ * XXX TODO
+ * if errflag == EWOULDBLOCK, then we'll need to re-queue the
+ * chunk at the beginning of the write_pending list and try
+ * again later.
+ * XXX Should not normally happen.
+ */
+#if ASYNC_READ
static void
storeAufsReadDone(int fd, void *my_data, int len, int errflag)
+#else
+static void
+storeAufsReadDone(int fd, int errflag, size_t len, void *my_data)
+#endif
{
storeIOState *sio = my_data;
aiostate_t *aiostate = (aiostate_t *) sio->fsstate;
ssize_t rlen;
debug(78, 3) ("storeAufsReadDone: dirno %d, fileno %08X, FD %d, len %d\n",
sio->swap_dirn, sio->swap_filen, fd, len);
+ aiostate->flags.inreaddone = 1;
aiostate->flags.reading = 0;
if (errflag) {
debug(78, 3) ("storeAufsReadDone: got failure (%d)\n", errflag);
rlen = (ssize_t) len;
sio->offset += len;
}
+#if ASYNC_READ
+ /* translate errflag from errno to Squid disk error */
+ errno = errflag;
+ if (errflag)
+ errflag = DISK_ERROR;
+ else
+ errflag = DISK_OK;
+#else
+ if (errflag == DISK_EOF)
+ errflag = DISK_OK; /* EOF is signalled by len == 0, not errors... */
+#endif
assert(callback);
assert(their_data);
sio->read.callback = NULL;
if (cbdataValid(their_data))
callback(their_data, aiostate->read_buf, rlen);
cbdataUnlock(their_data);
- /*
- * XXX is this safe? The above callback may have caused sio
- * to be freed/closed already? Philip Guenther <guenther@gac.edu>
- * says it fixes his FD leaks, with no side effects.
- */
+ aiostate->flags.inreaddone = 0;
if (aiostate->flags.close_request)
- storeAufsIOCallback(sio, DISK_OK);
+ storeAufsIOCallback(sio, errflag);
}
/*
* if errflag == EWOULDBLOCK, then we'll need to re-queue the
* chunk at the beginning of the write_pending list and try
* again later.
+ * XXX Should not normally happen.
*/
+#if ASYNC_WRITE
static void
storeAufsWriteDone(int fd, void *my_data, int len, int errflag)
+#else
+static void
+storeAufsWriteDone(int fd, int errflag, size_t len, void *my_data)
+#endif
{
static int loop_detect = 0;
storeIOState *sio = my_data;
aiostate_t *aiostate = (aiostate_t *) sio->fsstate;
- debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d\n",
- sio->swap_dirn, sio->swap_filen, fd, len);
+ debug(78, 3) ("storeAufsWriteDone: dirno %d, fileno %08X, FD %d, len %d, err=%d\n",
+ sio->swap_dirn, sio->swap_filen, fd, len, errflag);
+#if ASYNC_WRITE
+ /* Translate from errno to Squid disk error */
+ errno = errflag;
+ if (errflag)
+ errflag = errno == ENOSPC ? DISK_NO_SPACE_LEFT : DISK_ERROR;
+ else
+ errflag = DISK_OK;
+#endif
assert(++loop_detect < 10);
aiostate->flags.writing = 0;
if (errflag) {
return;
}
sio->offset += len;
- if (storeAufsKickWriteQueue(sio))
- (void) 0;
+#if ASYNC_WRITE
+ if (storeAufsKickWriteQueue(sio))
+ (void) 0;
else if (aiostate->flags.close_request)
storeAufsIOCallback(sio, errflag);
+#else
+ if (!aiostate->flags.write_kicking) {
+ aiostate->flags.write_kicking = 1;
+ while (storeAufsKickWriteQueue(sio))
+ (void) 0;
+ aiostate->flags.write_kicking = 0;
+ if (aiostate->flags.close_request)
+ storeAufsIOCallback(sio, errflag);
+ }
+#endif
loop_detect--;
}
return 1;
if (aiostate->flags.opening)
return 1;
+ if (aiostate->flags.inreaddone)
+ return 1;
return 0;
}