]> git.ipfire.org Git - thirdparty/squid.git/blame - src/fs/aufs/aiops.cc
fix delaypools/delay-pools typo and remove references to HEAP_REPLACEMENT
[thirdparty/squid.git] / src / fs / aufs / aiops.cc
CommitLineData
cd748f27 1/*
2b6662ba 2 * $Id: aiops.cc,v 1.7 2001/01/12 00:37:32 wessels Exp $
cd748f27 3 *
4 * DEBUG: section 43 AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 *
2b6662ba 7 * SQUID Web Proxy Cache http://www.squid-cache.org/
cd748f27 8 * ----------------------------------------------------------
9 *
2b6662ba 10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
cd748f27 18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35#include "squid.h"
36#include "store_asyncufs.h"
37
38#include <stdio.h>
39#include <sys/types.h>
40#include <sys/stat.h>
41#include <fcntl.h>
42#include <pthread.h>
43#include <errno.h>
44#include <dirent.h>
45#include <signal.h>
46#if HAVE_SCHED_H
47#include <sched.h>
48#endif
49
/*
 * Pending-request count above which aio_queue_request() logs that the
 * queue is growing uncontrollably and calls the blocking aio_sync().
 */
50#define RIDICULOUS_LENGTH 4096
51
/* State of a worker thread, stored in aio_thread_t.status. */
cd748f27 52enum _aio_thread_status {
/* set by aio_init() just before the thread is created */
53 _THREAD_STARTING = 0,
/* set by the worker before it waits for a queued request */
54 _THREAD_WAITING,
/* set by the worker once it has dequeued a request */
55 _THREAD_BUSY,
/* set by aio_init() when pthread_create() fails */
56 _THREAD_FAILED,
/* set by the worker after a request ran, before it is put on the done queue */
57 _THREAD_DONE
58};
59
/*
 * Operation carried by an aio_request_t.  aio_thread_loop() dispatches
 * on this value; any value without a handler there completes with
 * ret = -1 and err = EINVAL.
 */
60enum _aio_request_type {
61 _AIO_OP_NONE = 0,
62 _AIO_OP_OPEN,
63 _AIO_OP_READ,
64 _AIO_OP_WRITE,
65 _AIO_OP_CLOSE,
66 _AIO_OP_UNLINK,
15a47d1d 67 _AIO_OP_TRUNCATE,
/* only dispatched when AIO_OPENDIR is enabled */
cd748f27 68 _AIO_OP_OPENDIR,
69 _AIO_OP_STAT
70};
71
/*
 * One queued I/O operation.  Allocated from aio_request_pool by the
 * aio_*() entry points and released by aio_cleanup_request().
 */
72typedef struct aio_request_t {
/* link used by the request, overflow and done lists */
55f0e6f7 73 struct aio_request_t *next;
cd748f27 74 enum _aio_request_type request_type;
/* set to 1 by aio_cancel() */
75 int cancelled;
/* private copy of the path (open, stat, unlink, truncate) */
76 char *path;
/* flags and mode passed to open() */
77 int oflag;
78 mode_t mode;
/* descriptor for read, write and close */
79 int fd;
/* caller's destination buffer for reads */
80 char *bufferp;
/* internal buffer the worker thread reads into / writes from */
81 char *tmpbufp;
82 int buflen;
/* lseek() offset for reads; new length for truncate */
83 off_t offset;
84 int whence;
/* system call return value and the errno captured after it */
85 int ret;
86 int err;
/* internal stat buffer, copied to statp on successful completion */
87 struct stat *tmpstatp;
88 struct stat *statp;
/* caller's result record; NULL once the request has been cancelled */
89 aio_result_t *resultp;
cd748f27 90} aio_request_t;
91
/*
 * Singly linked request list with its mutex and condition variable.
 * Used for request_queue and done_queue.
 */
55f0e6f7 92typedef struct aio_request_queue_t {
93 pthread_mutex_t mutex;
94 pthread_cond_t cond;
/* first request, NULL when the list is empty */
f0debecb 95 aio_request_t *volatile head;
/* address of the link to store the next appended request in */
96 aio_request_t *volatile *volatile tailp;
/* NOTE(review): initialised to 0 in aio_init(); not updated in the code shown */
55f0e6f7 97 unsigned long requests;
f0debecb 98 unsigned long blocked; /* main failed to lock the queue */
55f0e6f7 99} aio_request_queue_t;
100
/* Bookkeeping for one worker thread; records are chained on "threads". */
101typedef struct aio_thread_t aio_thread_t;
102struct aio_thread_t {
103 aio_thread_t *next;
cd748f27 104 pthread_t thread;
105 enum _aio_thread_status status;
/* request being processed, NULL while the thread is waiting */
55f0e6f7 106 struct aio_request_t *current_req;
/* number of requests this thread has completed */
107 unsigned long requests;
108};
cd748f27 109
/* Entry points: each queues one request and reports through the aio_result_t. */
110int aio_cancel(aio_result_t *);
111int aio_open(const char *, int, mode_t, aio_result_t *);
112int aio_read(int, char *, int, off_t, int, aio_result_t *);
113int aio_write(int, char *, int, off_t, int, aio_result_t *);
114int aio_close(int, aio_result_t *);
115int aio_unlink(const char *, aio_result_t *);
15a47d1d 116int aio_truncate(const char *, off_t length, aio_result_t *);
cd748f27 117int aio_opendir(const char *, aio_result_t *);
118aio_result_t *aio_poll_done();
119int aio_sync(void);
120
/* File-local helpers; the aio_do_*() functions run on the worker threads. */
121static void aio_init(void);
122static void aio_queue_request(aio_request_t *);
cd748f27 123static void aio_cleanup_request(aio_request_t *);
124static void *aio_thread_loop(void *);
125static void aio_do_open(aio_request_t *);
126static void aio_do_read(aio_request_t *);
127static void aio_do_write(aio_request_t *);
128static void aio_do_close(aio_request_t *);
129static void aio_do_stat(aio_request_t *);
130static void aio_do_unlink(aio_request_t *);
15a47d1d 131static void aio_do_truncate(aio_request_t *);
cd748f27 132#if AIO_OPENDIR
133static void *aio_do_opendir(aio_request_t *);
134#endif
135static void aio_debug(aio_request_t *);
55f0e6f7 136static void aio_poll_queues(void);
cd748f27 137
/* Head of the list of worker thread records built by aio_init(). */
55f0e6f7 138static aio_thread_t *threads = NULL;
/* Set to 1 at the end of aio_init(); checked by every entry point. */
cd748f27 139static int aio_initialised = 0;
140
55f0e6f7 141
/*
 * Buffer size classes served by the aio_*_bufs memory pools.
 *
 * Each derived size is fully parenthesized so that the macro expands
 * to a single value inside any larger expression; without the
 * parentheses "AIO_MEDIUM_BUFS * 2" would parse as "16384 >> (1 * 2)".
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
58cd5bbd 147
/* Buffer pools, one per size class; created in aio_init(). */
f0debecb 148static MemPool *aio_large_bufs = NULL; /* 16K */
55f0e6f7 149static MemPool *aio_medium_bufs = NULL; /* 8K */
f0debecb 150static MemPool *aio_small_bufs = NULL; /* 4K */
151static MemPool *aio_tiny_bufs = NULL; /* 2K */
152static MemPool *aio_micro_bufs = NULL; /* 128 bytes */
58cd5bbd 153
/*
 * Requests submitted but not yet collected: incremented by
 * aio_queue_request(), decremented by aio_poll_queues() for every
 * completed request it takes off done_queue.
 */
cd748f27 154static int request_queue_len = 0;
155static MemPool *aio_request_pool = NULL;
55f0e6f7 156static MemPool *aio_thread_pool = NULL;
/* Requests waiting for a worker thread; guarded by its mutex. */
157static aio_request_queue_t request_queue;
/*
 * Overflow list for requests that could not be appended to
 * request_queue because pthread_mutex_trylock() on it failed.
 */
f0debecb 158static struct {
159 aio_request_t *head, **tailp;
160} request_queue2 = {
161
162 NULL, &request_queue2.head
163};
/* Completed requests appended by the worker threads; guarded by its mutex. */
55f0e6f7 164static aio_request_queue_t done_queue;
/*
 * Completed requests already moved off done_queue by aio_poll_queues()
 * and waiting to be returned by aio_poll_done().
 */
f0debecb 165static struct {
166 aio_request_t *head, **tailp;
167} done_requests = {
168
169 NULL, &done_requests.head
170};
/* Thread attributes and scheduling parameters set up in aio_init(). */
cd748f27 171static pthread_attr_t globattr;
172static struct sched_param globsched;
/* Thread that called aio_init(). */
173static pthread_t main_thread;
174
58cd5bbd 175static MemPool *
176aio_get_pool(int size)
177{
178 MemPool *p;
179 if (size <= AIO_LARGE_BUFS) {
f0debecb 180 if (size <= AIO_MICRO_BUFS)
181 p = aio_micro_bufs;
55f0e6f7 182 else if (size <= AIO_TINY_BUFS)
f0debecb 183 p = aio_tiny_bufs;
184 else if (size <= AIO_SMALL_BUFS)
185 p = aio_small_bufs;
186 else if (size <= AIO_MEDIUM_BUFS)
187 p = aio_medium_bufs;
188 else
189 p = aio_large_bufs;
58cd5bbd 190 } else
191 p = NULL;
192 return p;
193}
194
195static void *
196aio_xmalloc(int size)
197{
198 void *p;
199 MemPool *pool;
200
f0debecb 201 if ((pool = aio_get_pool(size)) != NULL) {
58cd5bbd 202 p = memPoolAlloc(pool);
203 } else
204 p = xmalloc(size);
205
206 return p;
207}
208
/*
 * Duplicate a NUL-terminated string into memory obtained from
 * aio_xmalloc().  Release the copy with aio_xstrfree().
 */
static char *
aio_xstrdup(const char *str)
{
    int bytes = strlen(str) + 1;        /* includes the terminator */
    char *copy = aio_xmalloc(bytes);

    memcpy(copy, str, bytes);
    return copy;
}
220
58cd5bbd 221static void
222aio_xfree(void *p, int size)
223{
224 MemPool *pool;
225
f0debecb 226 if ((pool = aio_get_pool(size)) != NULL) {
227 memPoolFree(pool, p);
58cd5bbd 228 } else
f0debecb 229 xfree(p);
58cd5bbd 230}
231
55f0e6f7 232static void
233aio_xstrfree(char *str)
234{
235 MemPool *pool;
f0debecb 236 int len = strlen(str) + 1;
55f0e6f7 237
f0debecb 238 if ((pool = aio_get_pool(len)) != NULL) {
239 memPoolFree(pool, str);
55f0e6f7 240 } else
f0debecb 241 xfree(str);
55f0e6f7 242}
243
cd748f27 244static void
245aio_init(void)
246{
247 int i;
248 aio_thread_t *threadp;
249
250 if (aio_initialised)
251 return;
252
253 pthread_attr_init(&globattr);
254#if HAVE_PTHREAD_ATTR_SETSCOPE
255 pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
256#endif
257 globsched.sched_priority = 1;
258 main_thread = pthread_self();
259#if HAVE_PTHREAD_SETSCHEDPARAM
260 pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
261#endif
262 globsched.sched_priority = 2;
263#if HAVE_PTHREAD_ATTR_SETSCHEDPARAM
264 pthread_attr_setschedparam(&globattr, &globsched);
265#endif
266
55f0e6f7 267 /* Initialize request queue */
268 if (pthread_mutex_init(&(request_queue.mutex), NULL))
269 fatal("Failed to create mutex");
270 if (pthread_cond_init(&(request_queue.cond), NULL))
271 fatal("Failed to create condition variable");
272 request_queue.head = NULL;
273 request_queue.tailp = &request_queue.head;
274 request_queue.requests = 0;
275 request_queue.blocked = 0;
276
277 /* Initialize done queue */
278 if (pthread_mutex_init(&(done_queue.mutex), NULL))
279 fatal("Failed to create mutex");
280 if (pthread_cond_init(&(done_queue.cond), NULL))
281 fatal("Failed to create condition variable");
282 done_queue.head = NULL;
283 done_queue.tailp = &done_queue.head;
284 done_queue.requests = 0;
285 done_queue.blocked = 0;
cd748f27 286
55f0e6f7 287 /* Create threads and get them to sit in their wait loop */
288 aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t));
cd748f27 289 for (i = 0; i < NUMTHREADS; i++) {
55f0e6f7 290 threadp = memPoolAlloc(aio_thread_pool);
cd748f27 291 threadp->status = _THREAD_STARTING;
55f0e6f7 292 threadp->current_req = NULL;
293 threadp->requests = 0;
294 threadp->next = threads;
295 threads = threadp;
cd748f27 296 if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) {
297 fprintf(stderr, "Thread creation failed\n");
298 threadp->status = _THREAD_FAILED;
299 continue;
300 }
cd748f27 301 }
302
303 /* Create request pool */
304 aio_request_pool = memPoolCreate("aio_request", sizeof(aio_request_t));
58cd5bbd 305 aio_large_bufs = memPoolCreate("aio_large_bufs", AIO_LARGE_BUFS);
306 aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS);
307 aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS);
308 aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS);
55f0e6f7 309 aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS);
cd748f27 310
311 aio_initialised = 1;
312}
313
314
315static void *
316aio_thread_loop(void *ptr)
317{
318 aio_thread_t *threadp = ptr;
319 aio_request_t *request;
320 sigset_t new;
cd748f27 321
322 /*
323 * Make sure to ignore signals which may possibly get sent to
324 * the parent squid thread. Causes havoc with mutex's and
325 * condition waits otherwise
326 */
327
328 sigemptyset(&new);
329 sigaddset(&new, SIGPIPE);
330 sigaddset(&new, SIGCHLD);
331#ifdef _SQUID_LINUX_THREADS_
332 sigaddset(&new, SIGQUIT);
333 sigaddset(&new, SIGTRAP);
334#else
335 sigaddset(&new, SIGUSR1);
336 sigaddset(&new, SIGUSR2);
337#endif
338 sigaddset(&new, SIGHUP);
339 sigaddset(&new, SIGTERM);
340 sigaddset(&new, SIGINT);
341 sigaddset(&new, SIGALRM);
342 pthread_sigmask(SIG_BLOCK, &new, NULL);
343
cd748f27 344 while (1) {
55f0e6f7 345 threadp->current_req = request = NULL;
346 request = NULL;
347 /* Get a request to process */
f0debecb 348 threadp->status = _THREAD_WAITING;
55f0e6f7 349 pthread_mutex_lock(&request_queue.mutex);
f0debecb 350 while (!request_queue.head) {
55f0e6f7 351 pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
cd748f27 352 }
55f0e6f7 353 request = request_queue.head;
354 if (request)
355 request_queue.head = request->next;
f0debecb 356 if (!request_queue.head)
357 request_queue.tailp = &request_queue.head;
55f0e6f7 358 pthread_mutex_unlock(&request_queue.mutex);
359 /* process the request */
360 threadp->status = _THREAD_BUSY;
361 request->next = NULL;
362 threadp->current_req = request;
cd748f27 363 errno = 0;
364 if (!request->cancelled) {
365 switch (request->request_type) {
366 case _AIO_OP_OPEN:
367 aio_do_open(request);
368 break;
369 case _AIO_OP_READ:
370 aio_do_read(request);
371 break;
372 case _AIO_OP_WRITE:
373 aio_do_write(request);
374 break;
375 case _AIO_OP_CLOSE:
376 aio_do_close(request);
377 break;
378 case _AIO_OP_UNLINK:
379 aio_do_unlink(request);
380 break;
15a47d1d 381 case _AIO_OP_TRUNCATE:
382 aio_do_truncate(request);
383 break;
cd748f27 384#if AIO_OPENDIR /* Opendir not implemented yet */
385 case _AIO_OP_OPENDIR:
386 aio_do_opendir(request);
387 break;
388#endif
389 case _AIO_OP_STAT:
390 aio_do_stat(request);
391 break;
392 default:
393 request->ret = -1;
394 request->err = EINVAL;
395 break;
396 }
397 } else { /* cancelled */
398 request->ret = -1;
399 request->err = EINTR;
400 }
55f0e6f7 401 threadp->status = _THREAD_DONE;
402 /* put the request in the done queue */
403 pthread_mutex_lock(&done_queue.mutex);
404 *done_queue.tailp = request;
405 done_queue.tailp = &request->next;
406 pthread_mutex_unlock(&done_queue.mutex);
407 threadp->requests++;
408 } /* while forever */
cd748f27 409 return NULL;
410} /* aio_thread_loop */
411
412static void
55f0e6f7 413aio_queue_request(aio_request_t * request)
cd748f27 414{
cd748f27 415 static int high_start = 0;
55f0e6f7 416 debug(41, 9) ("aio_queue_request: %p type=%d result=%p\n",
f0debecb 417 request, request->request_type, request->resultp);
cd748f27 418 /* Mark it as not executed (failing result, no error) */
55f0e6f7 419 request->ret = -1;
420 request->err = 0;
421 /* Internal housekeeping */
422 request_queue_len += 1;
423 request->resultp->_data = request;
424 /* Play some tricks with the request_queue2 queue */
425 request->next = NULL;
426 if (!request_queue2.head) {
427 if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
428 /* Normal path */
429 *request_queue.tailp = request;
430 request_queue.tailp = &request->next;
431 pthread_cond_signal(&request_queue.cond);
432 pthread_mutex_unlock(&request_queue.mutex);
f0debecb 433 } else {
55f0e6f7 434 /* Oops, the request queue is blocked, use request_queue2 */
435 *request_queue2.tailp = request;
436 request_queue2.tailp = &request->next;
f0debecb 437 }
55f0e6f7 438 } else {
439 /* Secondary path. We have blocked requests to deal with */
440 /* add the request to the chain */
441 *request_queue2.tailp = request;
442 if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
443 /* Ok, the queue is no longer blocked */
444 *request_queue.tailp = request_queue2.head;
445 request_queue.tailp = &request->next;
446 pthread_cond_signal(&request_queue.cond);
447 pthread_mutex_unlock(&request_queue.mutex);
448 request_queue2.head = NULL;
449 request_queue2.tailp = &request_queue2.head;
450 } else {
451 /* still blocked, bump the blocked request chain */
452 request_queue2.tailp = &request->next;
453 }
454 }
455 if (request_queue2.head) {
456 static int filter = 0;
457 static int filter_limit = 8;
458 if (++filter >= filter_limit) {
459 filter_limit += filter;
460 filter = 0;
461 debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n");
462 }
cd748f27 463 }
cd748f27 464 /* Warn if out of threads */
55f0e6f7 465 if (request_queue_len > MAGIC1) {
466 static int last_warn = 0;
467 static int queue_high, queue_low;
cd748f27 468 if (high_start == 0) {
469 high_start = squid_curtime;
470 queue_high = request_queue_len;
471 queue_low = request_queue_len;
472 }
473 if (request_queue_len > queue_high)
474 queue_high = request_queue_len;
475 if (request_queue_len < queue_low)
476 queue_low = request_queue_len;
477 if (squid_curtime >= (last_warn + 15) &&
55f0e6f7 478 squid_curtime >= (high_start + 5)) {
479 debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n");
480 if (squid_curtime >= (high_start + 15))
481 debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
f0debecb 482 request_queue_len, queue_high, queue_low, squid_curtime - high_start);
cd748f27 483 last_warn = squid_curtime;
484 }
485 } else {
486 high_start = 0;
487 }
55f0e6f7 488 /* Warn if seriously overloaded */
cd748f27 489 if (request_queue_len > RIDICULOUS_LENGTH) {
490 debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n");
491 debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n");
492 aio_sync();
493 debug(43, 0) ("aio_queue_request: Synced\n");
494 }
495} /* aio_queue_request */
496
cd748f27 497static void
498aio_cleanup_request(aio_request_t * requestp)
499{
500 aio_result_t *resultp = requestp->resultp;
501 int cancelled = requestp->cancelled;
502
503 /* Free allocated structures and copy data back to user space if the */
504 /* request hasn't been cancelled */
505 switch (requestp->request_type) {
506 case _AIO_OP_STAT:
507 if (!cancelled && requestp->ret == 0)
508 xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
58cd5bbd 509 aio_xfree(requestp->tmpstatp, sizeof(struct stat));
55f0e6f7 510 aio_xstrfree(requestp->path);
511 break;
cd748f27 512 case _AIO_OP_OPEN:
513 if (cancelled && requestp->ret >= 0)
514 /* The open() was cancelled but completed */
515 close(requestp->ret);
55f0e6f7 516 aio_xstrfree(requestp->path);
cd748f27 517 break;
518 case _AIO_OP_CLOSE:
519 if (cancelled && requestp->ret < 0)
520 /* The close() was cancelled and never got executed */
521 close(requestp->fd);
522 break;
523 case _AIO_OP_UNLINK:
15a47d1d 524 case _AIO_OP_TRUNCATE:
cd748f27 525 case _AIO_OP_OPENDIR:
55f0e6f7 526 aio_xstrfree(requestp->path);
cd748f27 527 break;
528 case _AIO_OP_READ:
529 if (!cancelled && requestp->ret > 0)
530 xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret);
55f0e6f7 531 aio_xfree(requestp->tmpbufp, requestp->buflen);
532 break;
cd748f27 533 case _AIO_OP_WRITE:
58cd5bbd 534 aio_xfree(requestp->tmpbufp, requestp->buflen);
cd748f27 535 break;
536 default:
537 break;
538 }
539 if (resultp != NULL && !cancelled) {
540 resultp->aio_return = requestp->ret;
541 resultp->aio_errno = requestp->err;
542 }
543 memPoolFree(aio_request_pool, requestp);
544} /* aio_cleanup_request */
545
546
547int
548aio_cancel(aio_result_t * resultp)
549{
55f0e6f7 550 aio_request_t *request = resultp->_data;
cd748f27 551
55f0e6f7 552 if (request && request->resultp == resultp) {
553 debug(41, 9) ("aio_cancel: %p type=%d result=%p\n",
f0debecb 554 request, request->request_type, request->resultp);
55f0e6f7 555 request->cancelled = 1;
556 request->resultp = NULL;
557 resultp->_data = NULL;
558 return 0;
559 }
cd748f27 560 return 1;
561} /* aio_cancel */
562
563
564int
565aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp)
566{
567 aio_request_t *requestp;
cd748f27 568
569 if (!aio_initialised)
570 aio_init();
55f0e6f7 571 requestp = memPoolAlloc(aio_request_pool);
572 requestp->path = (char *) aio_xstrdup(path);
cd748f27 573 requestp->oflag = oflag;
574 requestp->mode = mode;
575 requestp->resultp = resultp;
576 requestp->request_type = _AIO_OP_OPEN;
577 requestp->cancelled = 0;
578
55f0e6f7 579 aio_queue_request(requestp);
cd748f27 580 return 0;
581}
582
583
584static void
585aio_do_open(aio_request_t * requestp)
586{
587 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
588 requestp->err = errno;
589}
590
591
592int
593aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
594{
595 aio_request_t *requestp;
596
597 if (!aio_initialised)
598 aio_init();
55f0e6f7 599 requestp = memPoolAlloc(aio_request_pool);
cd748f27 600 requestp->fd = fd;
601 requestp->bufferp = bufp;
58cd5bbd 602 requestp->tmpbufp = (char *) aio_xmalloc(bufs);
cd748f27 603 requestp->buflen = bufs;
604 requestp->offset = offset;
605 requestp->whence = whence;
606 requestp->resultp = resultp;
607 requestp->request_type = _AIO_OP_READ;
608 requestp->cancelled = 0;
609
55f0e6f7 610 aio_queue_request(requestp);
cd748f27 611 return 0;
612}
613
614
615static void
616aio_do_read(aio_request_t * requestp)
617{
618 lseek(requestp->fd, requestp->offset, requestp->whence);
619 requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen);
620 requestp->err = errno;
621}
622
623
624int
625aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
626{
627 aio_request_t *requestp;
628
629 if (!aio_initialised)
630 aio_init();
55f0e6f7 631 requestp = memPoolAlloc(aio_request_pool);
cd748f27 632 requestp->fd = fd;
58cd5bbd 633 requestp->tmpbufp = (char *) aio_xmalloc(bufs);
cd748f27 634 xmemcpy(requestp->tmpbufp, bufp, bufs);
635 requestp->buflen = bufs;
636 requestp->offset = offset;
637 requestp->whence = whence;
638 requestp->resultp = resultp;
639 requestp->request_type = _AIO_OP_WRITE;
640 requestp->cancelled = 0;
641
55f0e6f7 642 aio_queue_request(requestp);
cd748f27 643 return 0;
644}
645
646
647static void
648aio_do_write(aio_request_t * requestp)
649{
650 requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen);
651 requestp->err = errno;
652}
653
654
655int
656aio_close(int fd, aio_result_t * resultp)
657{
658 aio_request_t *requestp;
659
660 if (!aio_initialised)
661 aio_init();
55f0e6f7 662 requestp = memPoolAlloc(aio_request_pool);
cd748f27 663 requestp->fd = fd;
664 requestp->resultp = resultp;
665 requestp->request_type = _AIO_OP_CLOSE;
666 requestp->cancelled = 0;
667
55f0e6f7 668 aio_queue_request(requestp);
cd748f27 669 return 0;
670}
671
672
673static void
674aio_do_close(aio_request_t * requestp)
675{
676 requestp->ret = close(requestp->fd);
677 requestp->err = errno;
678}
679
680
681int
682aio_stat(const char *path, struct stat *sb, aio_result_t * resultp)
683{
684 aio_request_t *requestp;
cd748f27 685
686 if (!aio_initialised)
687 aio_init();
55f0e6f7 688 requestp = memPoolAlloc(aio_request_pool);
689 requestp->path = (char *) aio_xstrdup(path);
cd748f27 690 requestp->statp = sb;
58cd5bbd 691 requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat));
cd748f27 692 requestp->resultp = resultp;
693 requestp->request_type = _AIO_OP_STAT;
694 requestp->cancelled = 0;
695
55f0e6f7 696 aio_queue_request(requestp);
cd748f27 697 return 0;
698}
699
700
701static void
702aio_do_stat(aio_request_t * requestp)
703{
704 requestp->ret = stat(requestp->path, requestp->tmpstatp);
705 requestp->err = errno;
706}
707
708
709int
710aio_unlink(const char *path, aio_result_t * resultp)
711{
712 aio_request_t *requestp;
cd748f27 713
714 if (!aio_initialised)
715 aio_init();
55f0e6f7 716 requestp = memPoolAlloc(aio_request_pool);
717 requestp->path = aio_xstrdup(path);
cd748f27 718 requestp->resultp = resultp;
719 requestp->request_type = _AIO_OP_UNLINK;
720 requestp->cancelled = 0;
721
55f0e6f7 722 aio_queue_request(requestp);
cd748f27 723 return 0;
724}
725
726
727static void
728aio_do_unlink(aio_request_t * requestp)
729{
730 requestp->ret = unlink(requestp->path);
731 requestp->err = errno;
732}
733
15a47d1d 734int
735aio_truncate(const char *path, off_t length, aio_result_t * resultp)
736{
737 aio_request_t *requestp;
15a47d1d 738
739 if (!aio_initialised)
740 aio_init();
55f0e6f7 741 requestp = memPoolAlloc(aio_request_pool);
742 requestp->path = (char *) aio_xstrdup(path);
15a47d1d 743 requestp->offset = length;
15a47d1d 744 requestp->resultp = resultp;
745 requestp->request_type = _AIO_OP_TRUNCATE;
746 requestp->cancelled = 0;
747
55f0e6f7 748 aio_queue_request(requestp);
15a47d1d 749 return 0;
750}
751
752
753static void
754aio_do_truncate(aio_request_t * requestp)
755{
756 requestp->ret = truncate(requestp->path, requestp->offset);
757 requestp->err = errno;
758}
759
cd748f27 760
#if AIO_OPENDIR
/* XXX aio_opendir NOT implemented yet.. */

/*
 * Placeholder for an asynchronous opendir().  Nothing is queued and
 * resultp is left untouched; the call always fails.
 *
 * No request record is allocated here: since nothing is queued, an
 * allocated record would never be released.
 *
 * Returns -1.
 */
int
aio_opendir(const char *path, aio_result_t * resultp)
{
    if (!aio_initialised)
        aio_init();
    (void) path;
    (void) resultp;
    return -1;
}

/*
 * Worker side of aio_opendir(); not implemented.  The return type
 * matches the forward declaration near the top of the file.
 */
static void *
aio_do_opendir(aio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
    (void) requestp;
    return NULL;
}

#endif
783
55f0e6f7 784static void
785aio_poll_queues(void)
786{
787 /* kick "overflow" request queue */
788 if (request_queue2.head &&
4672d0cd 789 pthread_mutex_trylock(&request_queue.mutex) == 0) {
55f0e6f7 790 *request_queue.tailp = request_queue2.head;
791 request_queue.tailp = request_queue2.tailp;
792 pthread_cond_signal(&request_queue.cond);
793 pthread_mutex_unlock(&request_queue.mutex);
794 request_queue2.head = NULL;
795 request_queue2.tailp = &request_queue2.head;
796 }
797 /* poll done queue */
798 if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
799 struct aio_request_t *requests = done_queue.head;
800 done_queue.head = NULL;
801 done_queue.tailp = &done_queue.head;
802 pthread_mutex_unlock(&done_queue.mutex);
803 *done_requests.tailp = requests;
804 request_queue_len -= 1;
4672d0cd 805 while (requests->next) {
55f0e6f7 806 requests = requests->next;
807 request_queue_len -= 1;
cd748f27 808 }
55f0e6f7 809 done_requests.tailp = &requests->next;
810 }
811 /* Give up the CPU to allow the threads to do their work */
4672d0cd 812 /*
813 * For Andres thoughts about yield(), see
814 * http://www.squid-cache.org/mail-archive/squid-dev/200012/0001.html
815 */
55f0e6f7 816 if (done_queue.head || request_queue.head)
4672d0cd 817#ifndef _SQUID_SOLARIS_
55f0e6f7 818 sched_yield();
4672d0cd 819#else
820 yield();
821#endif
55f0e6f7 822}
cd748f27 823
824aio_result_t *
825aio_poll_done(void)
826{
55f0e6f7 827 aio_request_t *request;
cd748f27 828 aio_result_t *resultp;
829 int cancelled;
55f0e6f7 830 int polled = 0;
cd748f27 831
832 AIO_REPOLL:
55f0e6f7 833 request = done_requests.head;
834 if (request == NULL && !polled) {
835 aio_poll_queues();
836 polled = 1;
837 request = done_requests.head;
838 }
839 if (!request) {
cd748f27 840 return NULL;
841 }
55f0e6f7 842 debug(41, 9) ("aio_poll_done: %p type=%d result=%p\n",
f0debecb 843 request, request->request_type, request->resultp);
55f0e6f7 844 done_requests.head = request->next;
845 if (!done_requests.head)
846 done_requests.tailp = &done_requests.head;
847 resultp = request->resultp;
848 cancelled = request->cancelled;
849 aio_debug(request);
850 debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
851 aio_cleanup_request(request);
cd748f27 852 if (cancelled)
853 goto AIO_REPOLL;
854 return resultp;
855} /* aio_poll_done */
856
857int
858aio_operations_pending(void)
859{
55f0e6f7 860 return request_queue_len + (done_requests.head ? 1 : 0);
cd748f27 861}
862
863int
864aio_sync(void)
865{
55f0e6f7 866 /* XXX This might take a while if the queue is large.. */
cd748f27 867 do {
55f0e6f7 868 aio_poll_queues();
cd748f27 869 } while (request_queue_len > 0);
870 return aio_operations_pending();
871}
872
873int
874aio_get_queue_len(void)
875{
876 return request_queue_len;
877}
878
879static void
55f0e6f7 880aio_debug(aio_request_t * request)
cd748f27 881{
55f0e6f7 882 switch (request->request_type) {
cd748f27 883 case _AIO_OP_OPEN:
55f0e6f7 884 debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
cd748f27 885 break;
886 case _AIO_OP_READ:
55f0e6f7 887 debug(43, 5) ("READ on fd: %d\n", request->fd);
cd748f27 888 break;
889 case _AIO_OP_WRITE:
55f0e6f7 890 debug(43, 5) ("WRITE on fd: %d\n", request->fd);
cd748f27 891 break;
892 case _AIO_OP_CLOSE:
55f0e6f7 893 debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
cd748f27 894 break;
895 case _AIO_OP_UNLINK:
55f0e6f7 896 debug(43, 5) ("UNLINK of %s\n", request->path);
cd748f27 897 break;
15a47d1d 898 case _AIO_OP_TRUNCATE:
55f0e6f7 899 debug(43, 5) ("UNLINK of %s\n", request->path);
15a47d1d 900 break;
cd748f27 901 default:
902 break;
903 }
904}