/* thirdparty/squid: src/DiskIO/DiskThreads/aiops.cc */
1 /*
2 * $Id$
3 *
4 * DEBUG: section 43 AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #ifndef _REENTRANT
36 #error "_REENTRANT MUST be defined to build squid async io support."
37 #endif
38
39 #include "squid-old.h"
40 #include "DiskThreads.h"
41
42 #include <stdio.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <pthread.h>
46 #include <errno.h>
47 #include <dirent.h>
48 #include <signal.h>
49 #if HAVE_SCHED_H
50 #include <sched.h>
51 #endif
52 #include "CommIO.h"
53 #include "SquidTime.h"
54 #include "Store.h"
55
/* Panic threshold: a queue longer than this triggers a blocking sync */
#define RIDICULOUS_LENGTH	4096

/* Lifecycle states of a worker thread (visible via squidaio_stats) */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,	/* created, not yet in its wait loop */
    _THREAD_WAITING,		/* idle, blocked on the request queue */
    _THREAD_BUSY,		/* executing a request */
    _THREAD_FAILED,		/* pthread_create() failed for this slot */
    _THREAD_DONE		/* finished a request, about to loop */
};
typedef enum _squidaio_thread_status squidaio_thread_status;

/*
 * One queued asynchronous I/O operation.  Only the fields relevant to
 * the particular request_type are filled in by the squidaio_*() entry
 * points; the worker stores the syscall result in ret/err and
 * squidaio_cleanup_request() copies it out to *resultp.
 */
typedef struct squidaio_request_t {

    struct squidaio_request_t *next;	/* singly-linked queue link */
    squidaio_request_type request_type;	/* _AIO_OP_* discriminator */
    int cancelled;		/* set by squidaio_cancel() */
    char *path;			/* open/stat/unlink: pool-allocated copy */
    int oflag;			/* open(2) flags */
    mode_t mode;		/* open(2) creation mode */
    int fd;			/* read/write/close */
    char *bufferp;		/* caller-owned I/O buffer */
    size_t buflen;		/* bytes to read/write */
    off_t offset;		/* read: lseek target */
    int whence;			/* read: lseek whence */
    int ret;			/* syscall return value */
    int err;			/* errno captured right after the syscall */

    struct stat *tmpstatp;	/* worker-private stat buffer */

    struct stat *statp;		/* caller's stat buffer (filled on success) */
    squidaio_result_t *resultp;	/* completion target; NULL once cancelled */
} squidaio_request_t;

/*
 * A mutex-protected FIFO shared between the main thread and the worker
 * threads.  tailp always points at the 'next' field of the last element
 * (or at head when empty) so enqueueing is O(1).
 */
typedef struct squidaio_request_queue_t {
    pthread_mutex_t mutex;
    pthread_cond_t cond;	/* signalled when work is enqueued */
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;
    unsigned long requests;	/* lifetime enqueue counter */
    unsigned long blocked;	/* main failed to lock the queue */
} squidaio_request_queue_t;

typedef struct squidaio_thread_t squidaio_thread_t;

/* Per-worker bookkeeping; workers form a singly-linked list rooted at
 * the file-scope 'threads' variable. */
struct squidaio_thread_t {
    squidaio_thread_t *next;
    pthread_t thread;
    squidaio_thread_status status;

    struct squidaio_request_t *current_req;	/* request being executed */
    unsigned long requests;	/* number of requests this thread served */
};
108
109 static void squidaio_queue_request(squidaio_request_t *);
110 static void squidaio_cleanup_request(squidaio_request_t *);
111 SQUIDCEXTERN void *squidaio_thread_loop(void *);
112 static void squidaio_do_open(squidaio_request_t *);
113 static void squidaio_do_read(squidaio_request_t *);
114 static void squidaio_do_write(squidaio_request_t *);
115 static void squidaio_do_close(squidaio_request_t *);
116 static void squidaio_do_stat(squidaio_request_t *);
117 static void squidaio_do_unlink(squidaio_request_t *);
118 #if AIO_OPENDIR
119 static void *squidaio_do_opendir(squidaio_request_t *);
120 #endif
121 static void squidaio_debug(squidaio_request_t *);
122 static void squidaio_poll_queues(void);
123
124 static squidaio_thread_t *threads = NULL;
125 static int squidaio_initialised = 0;
126
127
128 #define AIO_LARGE_BUFS 16384
129 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
130 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
131 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
132 #define AIO_MICRO_BUFS 128
133
134 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
135 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
136 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
137 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
138 static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
139
140 static int request_queue_len = 0;
141 static MemAllocator *squidaio_request_pool = NULL;
142 static MemAllocator *squidaio_thread_pool = NULL;
143 static squidaio_request_queue_t request_queue;
144
145 static struct {
146 squidaio_request_t *head, **tailp;
147 }
148
149 request_queue2 = {
150
151 NULL, &request_queue2.head
152 };
153 static squidaio_request_queue_t done_queue;
154
155 static struct {
156 squidaio_request_t *head, **tailp;
157 }
158
159 done_requests = {
160
161 NULL, &done_requests.head
162 };
163 static pthread_attr_t globattr;
164 #if HAVE_SCHED_H
165
166 static struct sched_param globsched;
167 #endif
168 static pthread_t main_thread;
169
170 static MemAllocator *
171 squidaio_get_pool(int size)
172 {
173 if (size <= AIO_LARGE_BUFS) {
174 if (size <= AIO_MICRO_BUFS)
175 return squidaio_micro_bufs;
176 else if (size <= AIO_TINY_BUFS)
177 return squidaio_tiny_bufs;
178 else if (size <= AIO_SMALL_BUFS)
179 return squidaio_small_bufs;
180 else if (size <= AIO_MEDIUM_BUFS)
181 return squidaio_medium_bufs;
182 else
183 return squidaio_large_bufs;
184 }
185
186 return NULL;
187 }
188
189 void *
190 squidaio_xmalloc(int size)
191 {
192 void *p;
193 MemAllocator *pool;
194
195 if ((pool = squidaio_get_pool(size)) != NULL) {
196 p = pool->alloc();
197 } else
198 p = xmalloc(size);
199
200 return p;
201 }
202
203 static char *
204 squidaio_xstrdup(const char *str)
205 {
206 char *p;
207 int len = strlen(str) + 1;
208
209 p = (char *)squidaio_xmalloc(len);
210 strncpy(p, str, len);
211
212 return p;
213 }
214
215 void
216 squidaio_xfree(void *p, int size)
217 {
218 MemAllocator *pool;
219
220 if ((pool = squidaio_get_pool(size)) != NULL) {
221 pool->freeOne(p);
222 } else
223 xfree(p);
224 }
225
226 static void
227 squidaio_xstrfree(char *str)
228 {
229 MemAllocator *pool;
230 int len = strlen(str) + 1;
231
232 if ((pool = squidaio_get_pool(len)) != NULL) {
233 pool->freeOne(str);
234 } else
235 xfree(str);
236 }
237
/*
 * One-time initialisation of the DiskThreads subsystem: scheduling
 * attributes, the shared request/done queues, NUMTHREADS worker threads
 * and the memory pools.  Safe to call repeatedly; subsequent calls are
 * no-ops until squidaio_shutdown() clears squidaio_initialised.
 * Queues are initialised before any thread is created, since workers
 * block on request_queue immediately.
 */
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    pthread_attr_init(&globattr);

#if HAVE_PTHREAD_ATTR_SETSCOPE
    /* compete for CPU system-wide, not just within this process */
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 1;

#endif

    main_thread = pthread_self();

#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
    /* main thread at priority 1; workers (set below) at priority 2 */
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 2;

#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM

    pthread_attr_setschedparam(&globattr, &globsched);

#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(request_queue.cond), NULL))
        fatal("Failed to create condition variable");

    request_queue.head = NULL;

    request_queue.tailp = &request_queue.head;

    request_queue.requests = 0;

    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(done_queue.cond), NULL))
        fatal("Failed to create condition variable");

    done_queue.head = NULL;

    done_queue.tailp = &done_queue.head;

    done_queue.requests = 0;

    done_queue.blocked = 0;

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS);

    for (i = 0; i < NUMTHREADS; i++) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;

        /* A failed slot stays on the list (status _THREAD_FAILED) so the
         * list always holds NUMTHREADS entries for squidaio_stats(). */
        if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));

    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);

    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);

    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);

    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);

    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}
346
347 void
348 squidaio_shutdown(void)
349 {
350 if (!squidaio_initialised)
351 return;
352
353 /* This is the same as in squidaio_sync */
354 do {
355 squidaio_poll_queues();
356 } while (request_queue_len > 0);
357
358 CommIO::NotifyIOClose();
359
360 squidaio_initialised = 0;
361 }
362
363 void *
364 squidaio_thread_loop(void *ptr)
365 {
366 squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
367 squidaio_request_t *request;
368 sigset_t newSig;
369
370 /*
371 * Make sure to ignore signals which may possibly get sent to
372 * the parent squid thread. Causes havoc with mutex's and
373 * condition waits otherwise
374 */
375
376 sigemptyset(&newSig);
377 sigaddset(&newSig, SIGPIPE);
378 sigaddset(&newSig, SIGCHLD);
379 #if defined(_SQUID_LINUX_THREADS_)
380
381 sigaddset(&newSig, SIGQUIT);
382 sigaddset(&newSig, SIGTRAP);
383 #else
384
385 sigaddset(&newSig, SIGUSR1);
386 sigaddset(&newSig, SIGUSR2);
387 #endif
388
389 sigaddset(&newSig, SIGHUP);
390 sigaddset(&newSig, SIGTERM);
391 sigaddset(&newSig, SIGINT);
392 sigaddset(&newSig, SIGALRM);
393 pthread_sigmask(SIG_BLOCK, &newSig, NULL);
394
395 while (1) {
396 threadp->current_req = request = NULL;
397 request = NULL;
398 /* Get a request to process */
399 threadp->status = _THREAD_WAITING;
400 pthread_mutex_lock(&request_queue.mutex);
401
402 while (!request_queue.head) {
403 pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
404 }
405
406 request = request_queue.head;
407
408 if (request)
409 request_queue.head = request->next;
410
411 if (!request_queue.head)
412 request_queue.tailp = &request_queue.head;
413
414 pthread_mutex_unlock(&request_queue.mutex);
415
416 /* process the request */
417 threadp->status = _THREAD_BUSY;
418
419 request->next = NULL;
420
421 threadp->current_req = request;
422
423 errno = 0;
424
425 if (!request->cancelled) {
426 switch (request->request_type) {
427
428 case _AIO_OP_OPEN:
429 squidaio_do_open(request);
430 break;
431
432 case _AIO_OP_READ:
433 squidaio_do_read(request);
434 break;
435
436 case _AIO_OP_WRITE:
437 squidaio_do_write(request);
438 break;
439
440 case _AIO_OP_CLOSE:
441 squidaio_do_close(request);
442 break;
443
444 case _AIO_OP_UNLINK:
445 squidaio_do_unlink(request);
446 break;
447
448 #if AIO_OPENDIR /* Opendir not implemented yet */
449
450 case _AIO_OP_OPENDIR:
451 squidaio_do_opendir(request);
452 break;
453 #endif
454
455 case _AIO_OP_STAT:
456 squidaio_do_stat(request);
457 break;
458
459 default:
460 request->ret = -1;
461 request->err = EINVAL;
462 break;
463 }
464 } else { /* cancelled */
465 request->ret = -1;
466 request->err = EINTR;
467 }
468
469 threadp->status = _THREAD_DONE;
470 /* put the request in the done queue */
471 pthread_mutex_lock(&done_queue.mutex);
472 *done_queue.tailp = request;
473 done_queue.tailp = &request->next;
474 pthread_mutex_unlock(&done_queue.mutex);
475 CommIO::NotifyIOCompleted();
476 threadp->requests++;
477 } /* while forever */
478
479 return NULL;
480 } /* squidaio_thread_loop */
481
/*
 * Hand a freshly-built request to the worker threads.  Never blocks the
 * main thread: if the shared queue's mutex is contended the request is
 * parked on the main-thread-only request_queue2 overflow list and
 * flushed later.  Also emits rate-limited overload warnings and forces
 * a blocking sync when the queue grows past RIDICULOUS_LENGTH.
 */
static void
squidaio_queue_request(squidaio_request_t * request)
{
    /* time the queue first exceeded MAGIC1; 0 while not overloaded */
    static int high_start = 0;
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
    request->next = NULL;

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;

        request_queue.tailp = &request->next;

        pthread_cond_signal(&request_queue.cond);

        pthread_mutex_unlock(&request_queue.mutex);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    /* warn with exponential backoff while the overflow list is in use */
    if (request_queue2.head) {
        static int filter = 0;
        static int filter_limit = 8;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, 1, "squidaio_queue_request: WARNING - Queue congestion");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;

        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        /* at most one warning per 15s, and only after 5s sustained load */
        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, 1, "squidaio_queue_request: WARNING - Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, 1, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, 0, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, 0, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, 0, "squidaio_queue_request: Synced");
    }
}				/* squidaio_queue_request */
575
/*
 * Release per-request resources after completion and, unless the
 * request was cancelled, publish ret/err (and stat data) to the
 * caller's result structure.  Finally returns the request to its pool.
 */
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the */
    /* request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:

        /* copy the worker's private stat buffer out to the caller */
        if (!cancelled && requestp->ret == 0)
            memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:
        /* fall through: both operations only hold a path to free */

    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    /* resultp is NULL when the caller cancelled (see squidaio_cancel) */
    if (resultp != NULL && !cancelled) {
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->freeOne(requestp);
}				/* squidaio_cleanup_request */
638
639
640 int
641 squidaio_cancel(squidaio_result_t * resultp)
642 {
643 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
644
645 if (request && request->resultp == resultp) {
646 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
647 request->cancelled = 1;
648 request->resultp = NULL;
649 resultp->_data = NULL;
650 resultp->result_type = _AIO_OP_NONE;
651 return 0;
652 }
653
654 return 1;
655 } /* squidaio_cancel */
656
657
658 int
659 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
660 {
661 squidaio_init();
662 squidaio_request_t *requestp;
663
664 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
665
666 requestp->path = (char *) squidaio_xstrdup(path);
667
668 requestp->oflag = oflag;
669
670 requestp->mode = mode;
671
672 requestp->resultp = resultp;
673
674 requestp->request_type = _AIO_OP_OPEN;
675
676 requestp->cancelled = 0;
677
678 resultp->result_type = _AIO_OP_OPEN;
679
680 squidaio_queue_request(requestp);
681
682 return 0;
683 }
684
685
/*
 * Worker-side open(2).  errno is captured immediately after the call;
 * it is only meaningful when ret is -1.
 */
static void
squidaio_do_open(squidaio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}
692
693
694 int
695 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
696 {
697 squidaio_request_t *requestp;
698
699 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
700
701 requestp->fd = fd;
702
703 requestp->bufferp = bufp;
704
705 requestp->buflen = bufs;
706
707 requestp->offset = offset;
708
709 requestp->whence = whence;
710
711 requestp->resultp = resultp;
712
713 requestp->request_type = _AIO_OP_READ;
714
715 requestp->cancelled = 0;
716
717 resultp->result_type = _AIO_OP_READ;
718
719 squidaio_queue_request(requestp);
720
721 return 0;
722 }
723
724
/*
 * Worker-side read(2): seek then read.  NOTE(review): the lseek()
 * result is unchecked, so a seek failure only surfaces (if at all) as
 * the subsequent read() error; the lseek/read pair is also not atomic,
 * which presumably relies on one outstanding request per fd — confirm.
 * errno is captured immediately and only meaningful when ret is -1.
 */
static void
squidaio_do_read(squidaio_request_t * requestp)
{
    lseek(requestp->fd, requestp->offset, requestp->whence);
    requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
    requestp->err = errno;
}
732
733
734 int
735 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
736 {
737 squidaio_request_t *requestp;
738
739 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
740
741 requestp->fd = fd;
742
743 requestp->bufferp = bufp;
744
745 requestp->buflen = bufs;
746
747 requestp->offset = offset;
748
749 requestp->whence = whence;
750
751 requestp->resultp = resultp;
752
753 requestp->request_type = _AIO_OP_WRITE;
754
755 requestp->cancelled = 0;
756
757 resultp->result_type = _AIO_OP_WRITE;
758
759 squidaio_queue_request(requestp);
760
761 return 0;
762 }
763
764
/*
 * Worker-side write(2).  Note that the request's offset/whence fields
 * are ignored here: the write lands at the fd's current file position.
 * NOTE(review): presumably callers only perform sequential/append
 * writes — confirm before relying on offset for writes.
 * errno is captured immediately and only meaningful when ret is -1.
 */
static void
squidaio_do_write(squidaio_request_t * requestp)
{
    requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
    requestp->err = errno;
}
771
772
773 int
774 squidaio_close(int fd, squidaio_result_t * resultp)
775 {
776 squidaio_request_t *requestp;
777
778 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
779
780 requestp->fd = fd;
781
782 requestp->resultp = resultp;
783
784 requestp->request_type = _AIO_OP_CLOSE;
785
786 requestp->cancelled = 0;
787
788 resultp->result_type = _AIO_OP_CLOSE;
789
790 squidaio_queue_request(requestp);
791
792 return 0;
793 }
794
795
/*
 * Worker-side close(2).  errno is captured immediately after the call;
 * it is only meaningful when ret is -1.
 */
static void
squidaio_do_close(squidaio_request_t * requestp)
{
    requestp->ret = close(requestp->fd);
    requestp->err = errno;
}
802
803
804 int
805
806 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
807 {
808 squidaio_init();
809 squidaio_request_t *requestp;
810
811 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
812
813 requestp->path = (char *) squidaio_xstrdup(path);
814
815 requestp->statp = sb;
816
817 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
818
819 requestp->resultp = resultp;
820
821 requestp->request_type = _AIO_OP_STAT;
822
823 requestp->cancelled = 0;
824
825 resultp->result_type = _AIO_OP_STAT;
826
827 squidaio_queue_request(requestp);
828
829 return 0;
830 }
831
832
/*
 * Worker-side stat(2) into the request's private tmpstatp buffer (the
 * caller's buffer is filled later by squidaio_cleanup_request).
 * errno is captured immediately; meaningful only when ret is -1.
 */
static void
squidaio_do_stat(squidaio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}
839
840
841 int
842 squidaio_unlink(const char *path, squidaio_result_t * resultp)
843 {
844 squidaio_init();
845 squidaio_request_t *requestp;
846
847 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
848
849 requestp->path = squidaio_xstrdup(path);
850
851 requestp->resultp = resultp;
852
853 requestp->request_type = _AIO_OP_UNLINK;
854
855 requestp->cancelled = 0;
856
857 resultp->result_type = _AIO_OP_UNLINK;
858
859 squidaio_queue_request(requestp);
860
861 return 0;
862 }
863
864
/*
 * Worker-side unlink(2).  errno is captured immediately after the call;
 * it is only meaningful when ret is -1.
 */
static void
squidaio_do_unlink(squidaio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}
871
872 #if AIO_OPENDIR
873 /* XXX squidaio_opendir NOT implemented yet.. */
874
875 int
876 squidaio_opendir(const char *path, squidaio_result_t * resultp)
877 {
878 squidaio_request_t *requestp;
879 int len;
880
881 requestp = squidaio_request_pool->alloc();
882
883 resultp->result_type = _AIO_OP_OPENDIR;
884
885 return -1;
886 }
887
/* Worker-side opendir stub; never reached while AIO_OPENDIR is off. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}
893
894 #endif
895
/*
 * Non-blocking queue service, run on the main thread: flush the
 * request_queue2 overflow list into the shared request queue, then move
 * completed requests from the shared done queue onto the private
 * done_requests list, decrementing request_queue_len per request.
 * Both mutexes are only trylock'd — contention simply defers the work
 * to the next poll.
 */
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */

    if (request_queue2.head &&
            pthread_mutex_trylock(&request_queue.mutex) == 0) {
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {

        /* detach the whole batch under the lock, splice it in afterwards */
        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;
        pthread_mutex_unlock(&done_queue.mutex);
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        /* walk to the batch tail, counting each completion off the total */
        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}
929
/*
 * Collect one completed, non-cancelled request and return its result
 * structure, or NULL when nothing is ready.  Cancelled requests are
 * cleaned up and skipped transparently (via the AIO_REPOLL loop).
 */
squidaio_result_t *
squidaio_poll_done(void)
{
    squidaio_request_t *request;
    squidaio_result_t *resultp;
    int cancelled;
    int polled = 0;

AIO_REPOLL:
    request = done_requests.head;

    /* nothing staged: re-arm the completion notification and poll the
     * shared queues once before reporting "nothing done" */
    if (request == NULL && !polled) {
        CommIO::ResetNotifications();
        squidaio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }

    if (!request) {
        return NULL;
    }

    debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
    done_requests.head = request->next;

    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;

    /* snapshot before cleanup returns the request to its pool */
    resultp = request->resultp;

    cancelled = request->cancelled;

    squidaio_debug(request);

    debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);

    squidaio_cleanup_request(request);

    if (cancelled)
        goto AIO_REPOLL;

    return resultp;
}				/* squidaio_poll_done */
973
974 int
975 squidaio_operations_pending(void)
976 {
977 return request_queue_len + (done_requests.head ? 1 : 0);
978 }
979
980 int
981 squidaio_sync(void)
982 {
983 /* XXX This might take a while if the queue is large.. */
984
985 do {
986 squidaio_poll_queues();
987 } while (request_queue_len > 0);
988
989 return squidaio_operations_pending();
990 }
991
/* Requests queued or executing, excluding collected completions. */
int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}
997
998 static void
999 squidaio_debug(squidaio_request_t * request)
1000 {
1001 switch (request->request_type) {
1002
1003 case _AIO_OP_OPEN:
1004 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1005 break;
1006
1007 case _AIO_OP_READ:
1008 debugs(43, 5, "READ on fd: " << request->fd);
1009 break;
1010
1011 case _AIO_OP_WRITE:
1012 debugs(43, 5, "WRITE on fd: " << request->fd);
1013 break;
1014
1015 case _AIO_OP_CLOSE:
1016 debugs(43, 5, "CLOSE of fd: " << request->fd);
1017 break;
1018
1019 case _AIO_OP_UNLINK:
1020 debugs(43, 5, "UNLINK of " << request->path);
1021 break;
1022
1023 default:
1024 break;
1025 }
1026 }
1027
1028 void
1029 squidaio_stats(StoreEntry * sentry)
1030 {
1031 squidaio_thread_t *threadp;
1032 int i;
1033
1034 if (!squidaio_initialised)
1035 return;
1036
1037 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1038
1039 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1040
1041 threadp = threads;
1042
1043 for (i = 0; i < NUMTHREADS; i++) {
1044 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1045 threadp = threadp->next;
1046 }
1047 }