/* src/DiskIO/DiskThreads/aiops.cc (gitweb scrape header removed) */
1 /*
2 * $Id$
3 *
4 * DEBUG: section 43 AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #ifndef _REENTRANT
36 #error "_REENTRANT MUST be defined to build squid async io support."
37 #endif
38
39 #include "squid.h"
40 #include "DiskThreads.h"
41
42 #include <stdio.h>
43 #include <sys/types.h>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <pthread.h>
47 #include <errno.h>
48 #include <dirent.h>
49 #include <signal.h>
50 #if HAVE_SCHED_H
51 #include <sched.h>
52 #endif
53 #include "CommIO.h"
54 #include "SquidTime.h"
55 #include "Store.h"
56
57 #define RIDICULOUS_LENGTH 4096
58
/* Lifecycle states for a worker thread (squidaio_thread_t.status). */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,   /* created; not yet in its wait loop */
    _THREAD_WAITING,        /* idle, blocked on request_queue.cond */
    _THREAD_BUSY,           /* executing a request */
    _THREAD_FAILED,         /* pthread_create() failed for this slot */
    _THREAD_DONE            /* finished a request, about to loop again */
};
typedef enum _squidaio_thread_status squidaio_thread_status;
67
/*
 * One queued I/O operation.  Allocated from squidaio_request_pool,
 * linked through 'next' while on a queue, and released in
 * squidaio_cleanup_request().  Only the fields relevant to the
 * request_type are populated by the squidaio_* entry points.
 */
typedef struct squidaio_request_t {

    struct squidaio_request_t *next;    /* singly-linked queue link */
    squidaio_request_type request_type; /* which _AIO_OP_* to perform */
    int cancelled;                      /* set by squidaio_cancel() */
    char *path;                         /* open/stat/unlink target; pool-owned copy */
    int oflag;                          /* open(2) flags */
    mode_t mode;                        /* open(2) creation mode */
    int fd;                             /* read/write/close descriptor */
    char *bufferp;                      /* caller's I/O buffer */
    size_t buflen;                      /* bytes to read/write */
    off_t offset;                       /* seek offset (used by the read path) */
    int whence;                         /* seek origin (used by the read path) */
    int ret;                            /* syscall return value */
    int err;                            /* captured errno */

    struct stat *tmpstatp;              /* worker-side private stat buffer */

    struct stat *statp;                 /* caller's stat buffer (copy target) */
    squidaio_result_t *resultp;         /* completion handle; NULLed on cancel */
} squidaio_request_t;
89
/*
 * A mutex-protected singly linked queue shared between the main thread
 * and the worker threads (used for both request_queue and done_queue).
 */
typedef struct squidaio_request_queue_t {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;   /* points at the last 'next' slot */
    unsigned long requests;     /* initialised to 0; not updated elsewhere in this file */
    unsigned long blocked; /* main failed to lock the queue */
} squidaio_request_queue_t;
98
typedef struct squidaio_thread_t squidaio_thread_t;

/* Per-worker bookkeeping; workers are linked into the file-scope 'threads' list. */
struct squidaio_thread_t {
    squidaio_thread_t *next;        /* next worker in 'threads' */
    pthread_t thread;               /* pthread handle */
    squidaio_thread_status status;  /* see _squidaio_thread_status */

    struct squidaio_request_t *current_req; /* request being serviced, or NULL */
    unsigned long requests;         /* requests completed by this worker */
};
109
110 static void squidaio_queue_request(squidaio_request_t *);
111 static void squidaio_cleanup_request(squidaio_request_t *);
112 static void *squidaio_thread_loop(void *);
113 static void squidaio_do_open(squidaio_request_t *);
114 static void squidaio_do_read(squidaio_request_t *);
115 static void squidaio_do_write(squidaio_request_t *);
116 static void squidaio_do_close(squidaio_request_t *);
117 static void squidaio_do_stat(squidaio_request_t *);
118 static void squidaio_do_unlink(squidaio_request_t *);
119 #if AIO_OPENDIR
120 static void *squidaio_do_opendir(squidaio_request_t *);
121 #endif
122 static void squidaio_debug(squidaio_request_t *);
123 static void squidaio_poll_queues(void);
124
/* Head of the worker-thread list; NULL until squidaio_init() runs. */
static squidaio_thread_t *threads = NULL;
/* One-time setup guard, set in squidaio_init(), cleared in squidaio_shutdown(). */
static int squidaio_initialised = 0;
127
128
/*
 * Buffer size classes served by the dedicated MemAllocator pools below.
 * Each derived expansion is parenthesized so the shift cannot interact
 * with operators at the use site (unparenthesized macro bodies are a
 * classic precedence hazard).
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
134
/* Size-classed buffer pools; see squidaio_get_pool() for the mapping. */
static MemAllocator *squidaio_large_bufs = NULL;    /* 16K */
static MemAllocator *squidaio_medium_bufs = NULL;   /* 8K */
static MemAllocator *squidaio_small_bufs = NULL;    /* 4K */
static MemAllocator *squidaio_tiny_bufs = NULL;     /* 2K */
static MemAllocator *squidaio_micro_bufs = NULL;    /* 128 bytes (AIO_MICRO_BUFS; old "128K" comment was wrong) */

/* Requests submitted but not yet collected onto done_requests */
static int request_queue_len = 0;
static MemAllocator *squidaio_request_pool = NULL;  /* squidaio_request_t allocator */
static MemAllocator *squidaio_thread_pool = NULL;   /* squidaio_thread_t allocator */
/* Main queue feeding the worker threads */
static squidaio_request_queue_t request_queue;
145
146 static struct {
147 squidaio_request_t *head, **tailp;
148 }
149
150 request_queue2 = {
151
152 NULL, &request_queue2.head
153 };
154 static squidaio_request_queue_t done_queue;
155
156 static struct {
157 squidaio_request_t *head, **tailp;
158 }
159
160 done_requests = {
161
162 NULL, &done_requests.head
163 };
/* Attributes applied to every worker thread (scope, priority, stack size). */
static pthread_attr_t globattr;
#if HAVE_SCHED_H

/* Scratch scheduling parameters reused for main thread and workers. */
static struct sched_param globsched;
#endif
/* The main squid thread, remembered so its priority can be set in squidaio_init(). */
static pthread_t main_thread;
170
171 static MemAllocator *
172 squidaio_get_pool(int size)
173 {
174 if (size <= AIO_LARGE_BUFS) {
175 if (size <= AIO_MICRO_BUFS)
176 return squidaio_micro_bufs;
177 else if (size <= AIO_TINY_BUFS)
178 return squidaio_tiny_bufs;
179 else if (size <= AIO_SMALL_BUFS)
180 return squidaio_small_bufs;
181 else if (size <= AIO_MEDIUM_BUFS)
182 return squidaio_medium_bufs;
183 else
184 return squidaio_large_bufs;
185 }
186
187 return NULL;
188 }
189
190 void *
191 squidaio_xmalloc(int size)
192 {
193 void *p;
194 MemAllocator *pool;
195
196 if ((pool = squidaio_get_pool(size)) != NULL) {
197 p = pool->alloc();
198 } else
199 p = xmalloc(size);
200
201 return p;
202 }
203
204 static char *
205 squidaio_xstrdup(const char *str)
206 {
207 char *p;
208 int len = strlen(str) + 1;
209
210 p = (char *)squidaio_xmalloc(len);
211 strncpy(p, str, len);
212
213 return p;
214 }
215
216 void
217 squidaio_xfree(void *p, int size)
218 {
219 MemAllocator *pool;
220
221 if ((pool = squidaio_get_pool(size)) != NULL) {
222 pool->free(p);
223 } else
224 xfree(p);
225 }
226
227 static void
228 squidaio_xstrfree(char *str)
229 {
230 MemAllocator *pool;
231 int len = strlen(str) + 1;
232
233 if ((pool = squidaio_get_pool(len)) != NULL) {
234 pool->free(str);
235 } else
236 xfree(str);
237 }
238
/*
 * One-time module setup: configure worker thread attributes, initialise
 * the request and done queues, spawn NUMTHREADS worker threads and
 * create the memory pools.  Idempotent — returns immediately once
 * initialised.  Called from every squidaio_* entry point that can be
 * the first use of the module.
 */
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    pthread_attr_init(&globattr);

#if HAVE_PTHREAD_ATTR_SETSCOPE

    /* Contend for CPU system-wide, not just within this process */
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 1;

#endif

    main_thread = pthread_self();

#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM

    /* Main thread gets priority 1 ... */
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 2;

#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM

    /* ... worker threads get priority 2 */
    pthread_attr_setschedparam(&globattr, &globsched);

#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(request_queue.cond), NULL))
        fatal("Failed to create condition variable");

    request_queue.head = NULL;

    request_queue.tailp = &request_queue.head;

    request_queue.requests = 0;

    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(done_queue.cond), NULL))
        fatal("Failed to create condition variable");

    done_queue.head = NULL;

    done_queue.tailp = &done_queue.head;

    done_queue.requests = 0;

    done_queue.blocked = 0;

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS);

    for (i = 0; i < NUMTHREADS; i++) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        /* Linked before pthread_create so squidaio_stats() can always
         * walk exactly NUMTHREADS entries, even for failed slots */
        threadp->next = threads;
        threads = threadp;

        if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));

    /* Size-classed pools used by squidaio_xmalloc()/squidaio_xfree() */
    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);

    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);

    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);

    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);

    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}
347
348 void
349 squidaio_shutdown(void)
350 {
351 if (!squidaio_initialised)
352 return;
353
354 /* This is the same as in squidaio_sync */
355 do {
356 squidaio_poll_queues();
357 } while (request_queue_len > 0);
358
359 CommIO::NotifyIOClose();
360
361 squidaio_initialised = 0;
362 }
363
/*
 * Worker thread body: block on the request queue, execute one request
 * at a time via the squidaio_do_* handlers, and post the finished
 * request onto the done queue.  Loops forever; the trailing return is
 * never reached in normal operation.
 */
static void *
squidaio_thread_loop(void *ptr)
{
    squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
    squidaio_request_t *request;
    sigset_t newSig;

    /*
     * Make sure to ignore signals which may possibly get sent to
     * the parent squid thread. Causes havoc with mutex's and
     * condition waits otherwise
     */

    sigemptyset(&newSig);
    sigaddset(&newSig, SIGPIPE);
    sigaddset(&newSig, SIGCHLD);
#ifdef _SQUID_LINUX_THREADS_

    sigaddset(&newSig, SIGQUIT);
    sigaddset(&newSig, SIGTRAP);
#else

    sigaddset(&newSig, SIGUSR1);
    sigaddset(&newSig, SIGUSR2);
#endif

    sigaddset(&newSig, SIGHUP);
    sigaddset(&newSig, SIGTERM);
    sigaddset(&newSig, SIGINT);
    sigaddset(&newSig, SIGALRM);
    pthread_sigmask(SIG_BLOCK, &newSig, NULL);

    while (1) {
        threadp->current_req = request = NULL;
        request = NULL;     /* redundant; already cleared on the line above */
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;
        pthread_mutex_lock(&request_queue.mutex);

        while (!request_queue.head) {
            pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
        }

        /* Pop the head request while still holding the queue mutex */
        request = request_queue.head;

        if (request)    /* defensive: head is non-NULL after the wait loop */
            request_queue.head = request->next;

        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;  /* queue drained; reset tail */

        pthread_mutex_unlock(&request_queue.mutex);

        /* process the request */
        threadp->status = _THREAD_BUSY;

        request->next = NULL;

        threadp->current_req = request;

        /* cleared so the do_* handlers capture a fresh errno */
        errno = 0;

        if (!request->cancelled) {
            switch (request->request_type) {

            case _AIO_OP_OPEN:
                squidaio_do_open(request);
                break;

            case _AIO_OP_READ:
                squidaio_do_read(request);
                break;

            case _AIO_OP_WRITE:
                squidaio_do_write(request);
                break;

            case _AIO_OP_CLOSE:
                squidaio_do_close(request);
                break;

            case _AIO_OP_UNLINK:
                squidaio_do_unlink(request);
                break;

#if AIO_OPENDIR /* Opendir not implemented yet */

            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(request);
                break;
#endif

            case _AIO_OP_STAT:
                squidaio_do_stat(request);
                break;

            default:
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {        /* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }

        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        pthread_mutex_lock(&done_queue.mutex);
        *done_queue.tailp = request;
        done_queue.tailp = &request->next;
        pthread_mutex_unlock(&done_queue.mutex);
        /* Wake the main thread's I/O notification mechanism */
        CommIO::NotifyIOCompleted();
        threadp->requests++;
    }               /* while forever */

    return NULL;
}               /* squidaio_thread_loop */
482
/*
 * Hand a request to the worker threads.  Uses a non-blocking trylock on
 * the main queue; if a worker currently holds the mutex, the request is
 * parked on the mutex-free request_queue2 overflow list (presumably
 * main-thread-only — see squidaio_poll_queues()) and spliced in later.
 * Also emits congestion and overload warnings, and forces a blocking
 * sync when the queue grows past RIDICULOUS_LENGTH.
 */
static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;  /* time the queue first exceeded MAGIC1 */
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
    request->next = NULL;

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;

        request_queue.tailp = &request->next;

        pthread_cond_signal(&request_queue.cond);

        pthread_mutex_unlock(&request_queue.mutex);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    if (request_queue2.head) {
        /* Warn at exponentially lengthening intervals while requests
         * keep landing on the overflow queue */
        static int filter = 0;
        static int filter_limit = 8;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, 1, "squidaio_queue_request: WARNING - Queue congestion");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;           /* timestamp of the last warning */
        static int queue_high, queue_low;   /* extremes observed since high_start */

        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        /* At most one warning per 15s, and only after 5s of overload */
        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, 1, "squidaio_queue_request: WARNING - Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, 1, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, 0, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, 0, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, 0, "squidaio_queue_request: Synced");
    }
}               /* squidaio_queue_request */
576
/*
 * Tear down a completed (or cancelled) request popped off done_requests:
 * copy results back to the caller unless cancelled, release per-request
 * allocations, and return the request to its pool.
 */
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the */
    /* request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:
        /* Copy the worker's private stat buffer into the caller's */
        if (!cancelled && requestp->ret == 0)

            xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:
        /* fallthrough: both only own a path string */
    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    if (resultp != NULL && !cancelled) {
        /* Hand the syscall outcome back through the caller's handle */
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->free(requestp);
}               /* squidaio_cleanup_request */
640
641
642 int
643 squidaio_cancel(squidaio_result_t * resultp)
644 {
645 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
646
647 if (request && request->resultp == resultp) {
648 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
649 request->cancelled = 1;
650 request->resultp = NULL;
651 resultp->_data = NULL;
652 resultp->result_type = _AIO_OP_NONE;
653 return 0;
654 }
655
656 return 1;
657 } /* squidaio_cancel */
658
659
660 int
661 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
662 {
663 squidaio_init();
664 squidaio_request_t *requestp;
665
666 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
667
668 requestp->path = (char *) squidaio_xstrdup(path);
669
670 requestp->oflag = oflag;
671
672 requestp->mode = mode;
673
674 requestp->resultp = resultp;
675
676 requestp->request_type = _AIO_OP_OPEN;
677
678 requestp->cancelled = 0;
679
680 resultp->result_type = _AIO_OP_OPEN;
681
682 squidaio_queue_request(requestp);
683
684 return 0;
685 }
686
687
/* Worker-side open(2): fd (or -1) in ret; errno captured in err.
 * errno is zeroed by the worker loop before dispatch, so err is 0 on
 * a clean success. */
static void
squidaio_do_open(squidaio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}
694
695
696 int
697 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
698 {
699 squidaio_request_t *requestp;
700
701 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
702
703 requestp->fd = fd;
704
705 requestp->bufferp = bufp;
706
707 requestp->buflen = bufs;
708
709 requestp->offset = offset;
710
711 requestp->whence = whence;
712
713 requestp->resultp = resultp;
714
715 requestp->request_type = _AIO_OP_READ;
716
717 requestp->cancelled = 0;
718
719 resultp->result_type = _AIO_OP_READ;
720
721 squidaio_queue_request(requestp);
722
723 return 0;
724 }
725
726
727 static void
728 squidaio_do_read(squidaio_request_t * requestp)
729 {
730 lseek(requestp->fd, requestp->offset, requestp->whence);
731 requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
732 requestp->err = errno;
733 }
734
735
736 int
737 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
738 {
739 squidaio_request_t *requestp;
740
741 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
742
743 requestp->fd = fd;
744
745 requestp->bufferp = bufp;
746
747 requestp->buflen = bufs;
748
749 requestp->offset = offset;
750
751 requestp->whence = whence;
752
753 requestp->resultp = resultp;
754
755 requestp->request_type = _AIO_OP_WRITE;
756
757 requestp->cancelled = 0;
758
759 resultp->result_type = _AIO_OP_WRITE;
760
761 squidaio_queue_request(requestp);
762
763 return 0;
764 }
765
766
/*
 * Worker-side write(2) at the file's current position.
 * NOTE(review): requestp->offset/whence are ignored here (unlike the
 * read path, which seeks first) — presumably callers open with O_APPEND
 * or write strictly sequentially; confirm before reusing this for
 * random-access writes.
 */
static void
squidaio_do_write(squidaio_request_t * requestp)
{
    requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
    requestp->err = errno;
}
773
774
775 int
776 squidaio_close(int fd, squidaio_result_t * resultp)
777 {
778 squidaio_request_t *requestp;
779
780 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
781
782 requestp->fd = fd;
783
784 requestp->resultp = resultp;
785
786 requestp->request_type = _AIO_OP_CLOSE;
787
788 requestp->cancelled = 0;
789
790 resultp->result_type = _AIO_OP_CLOSE;
791
792 squidaio_queue_request(requestp);
793
794 return 0;
795 }
796
797
/* Worker-side close(2): 0 or -1 in ret; errno captured in err. */
static void
squidaio_do_close(squidaio_request_t * requestp)
{
    requestp->ret = close(requestp->fd);
    requestp->err = errno;
}
804
805
806 int
807
808 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
809 {
810 squidaio_init();
811 squidaio_request_t *requestp;
812
813 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
814
815 requestp->path = (char *) squidaio_xstrdup(path);
816
817 requestp->statp = sb;
818
819 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
820
821 requestp->resultp = resultp;
822
823 requestp->request_type = _AIO_OP_STAT;
824
825 requestp->cancelled = 0;
826
827 resultp->result_type = _AIO_OP_STAT;
828
829 squidaio_queue_request(requestp);
830
831 return 0;
832 }
833
834
/*
 * Worker-side stat(2) into the request's private tmpstatp buffer; the
 * result is copied to the caller's buffer in squidaio_cleanup_request().
 */
static void
squidaio_do_stat(squidaio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}
841
842
843 int
844 squidaio_unlink(const char *path, squidaio_result_t * resultp)
845 {
846 squidaio_init();
847 squidaio_request_t *requestp;
848
849 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
850
851 requestp->path = squidaio_xstrdup(path);
852
853 requestp->resultp = resultp;
854
855 requestp->request_type = _AIO_OP_UNLINK;
856
857 requestp->cancelled = 0;
858
859 resultp->result_type = _AIO_OP_UNLINK;
860
861 squidaio_queue_request(requestp);
862
863 return 0;
864 }
865
866
/* Worker-side unlink(2): 0 or -1 in ret; errno captured in err. */
static void
squidaio_do_unlink(squidaio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}
873
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/*
 * Stub: would queue an asynchronous opendir().  Currently only tags the
 * result type and fails.
 * NOTE(review): 'len' is unused and the allocated request is neither
 * queued nor freed (leak) — fix before ever enabling AIO_OPENDIR.
 */
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

/* Stub worker handler for _AIO_OP_OPENDIR. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif
897
/*
 * Main-thread housekeeping: splice any overflow requests into the
 * workers' queue, and drain the done queue into done_requests for
 * collection by squidaio_poll_done().  Uses trylock throughout so the
 * main thread never blocks waiting on a worker.
 */
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */

    if (request_queue2.head &&
            pthread_mutex_trylock(&request_queue.mutex) == 0) {
        /* Splice the whole overflow list onto the main queue */
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {

        /* Steal the entire done list, then release the lock quickly */
        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;
        pthread_mutex_unlock(&done_queue.mutex);
        /* Append to done_requests; request_queue_len drops by one per
         * completed request (the head, then each link walked below) */
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}
931
/*
 * Collect one completed request.  Returns its squidaio_result_t, or
 * NULL when nothing has completed.  Cancelled requests are cleaned up
 * and skipped (AIO_REPOLL), since squidaio_cancel() already detached
 * their result handle.
 */
squidaio_result_t *
squidaio_poll_done(void)
{
    squidaio_request_t *request;
    squidaio_result_t *resultp;
    int cancelled;
    int polled = 0;     /* poll the queues at most once per call */

AIO_REPOLL:
    request = done_requests.head;

    if (request == NULL && !polled) {
        /* Nothing staged yet; re-arm notifications and poll once */
        CommIO::ResetNotifications();
        squidaio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }

    if (!request) {
        return NULL;
    }

    debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Unlink the head of done_requests */
    done_requests.head = request->next;

    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;

    /* Save what cleanup below will invalidate */
    resultp = request->resultp;

    cancelled = request->cancelled;

    squidaio_debug(request);

    debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);

    squidaio_cleanup_request(request);

    if (cancelled)
        goto AIO_REPOLL;

    return resultp;
}               /* squidaio_poll_done */
975
976 int
977 squidaio_operations_pending(void)
978 {
979 return request_queue_len + (done_requests.head ? 1 : 0);
980 }
981
982 int
983 squidaio_sync(void)
984 {
985 /* XXX This might take a while if the queue is large.. */
986
987 do {
988 squidaio_poll_queues();
989 } while (request_queue_len > 0);
990
991 return squidaio_operations_pending();
992 }
993
/* Requests submitted but not yet moved onto the done_requests list. */
int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}
999
/* Trace the operation type and key argument of a finished request
 * at debug level 5.  Unknown/untraced types are silently ignored. */
static void
squidaio_debug(squidaio_request_t * request)
{
    switch (request->request_type) {

    case _AIO_OP_OPEN:
        debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
        break;

    case _AIO_OP_READ:
        debugs(43, 5, "READ on fd: " << request->fd);
        break;

    case _AIO_OP_WRITE:
        debugs(43, 5, "WRITE on fd: " << request->fd);
        break;

    case _AIO_OP_CLOSE:
        debugs(43, 5, "CLOSE of fd: " << request->fd);
        break;

    case _AIO_OP_UNLINK:
        debugs(43, 5, "UNLINK of " << request->path);
        break;

    default:
        break;
    }
}
1029
1030 void
1031 squidaio_stats(StoreEntry * sentry)
1032 {
1033 squidaio_thread_t *threadp;
1034 int i;
1035
1036 if (!squidaio_initialised)
1037 return;
1038
1039 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1040
1041 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1042
1043 threadp = threads;
1044
1045 for (i = 0; i < NUMTHREADS; i++) {
1046 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1047 threadp = threadp->next;
1048 }
1049 }