]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskThreads/aiops.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 43 AIOPS */
10
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14
15 #include "squid.h"
16 #include "DiskIO/DiskThreads/CommIO.h"
17 #include "DiskThreads.h"
18 #include "SquidConfig.h"
19 #include "SquidTime.h"
20 #include "Store.h"
21
22 /*
23 * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
24 * XXX: convert to MEMPROXY_CLASS() API
25 */
26 #include "mem/Pool.h"
27
28 #include <cerrno>
29 #include <csignal>
30 #include <sys/stat.h>
31 #include <fcntl.h>
32 #include <pthread.h>
33 #include <dirent.h>
34 #if HAVE_SCHED_H
35 #include <sched.h>
36 #endif
37
38 #define RIDICULOUS_LENGTH 4096
39
/* Lifecycle states of an I/O worker thread. */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};
typedef enum _squidaio_thread_status squidaio_thread_status;

/* One queued asynchronous I/O request, linked into a singly-linked FIFO. */
typedef struct squidaio_request_t {

    struct squidaio_request_t *next;    /* next request in the queue (NULL at tail) */
    squidaio_request_type request_type; /* which operation: open/read/write/close/stat/unlink */
    int cancelled;              /* non-zero once squidaio_cancel() marked it */
    char *path;                 /* pathname for open/stat/unlink; freed in cleanup */
    int oflag;                  /* open(2) flags */
    mode_t mode;                /* open(2) creation mode */
    int fd;                     /* target fd for read/write/close */
    char *bufferp;              /* caller-owned data buffer for read/write */
    size_t buflen;              /* size of bufferp */
    off_t offset;               /* seek offset (used by the read path) */
    int whence;                 /* seek whence (used by the read path) */
    int ret;                    /* syscall return value, filled in by the worker */
    int err;                    /* errno captured right after the syscall */

    struct stat *tmpstatp;      /* worker-private stat buffer, copied to statp on success */

    struct stat *statp;         /* caller's stat buffer */
    squidaio_result_t *resultp; /* caller's result handle; NULLed on cancellation */
} squidaio_request_t;

/* A mutex/condvar-protected FIFO shared between the main thread and workers. */
typedef struct squidaio_request_queue_t {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;   /* points at the tail element's next pointer */
    unsigned long requests;     /* lifetime count of requests passed through */
    unsigned long blocked; /* main failed to lock the queue */
} squidaio_request_queue_t;

typedef struct squidaio_thread_t squidaio_thread_t;

/* Per-worker-thread bookkeeping; threads form a singly-linked list. */
struct squidaio_thread_t {
    squidaio_thread_t *next;
    pthread_t thread;
    squidaio_thread_status status;

    struct squidaio_request_t *current_req;     /* request being serviced (NULL when idle) */
    unsigned long requests;     /* number of requests this thread has handled */
};
90
/* Forward declarations for queue housekeeping and worker-side helpers. */
static void squidaio_queue_request(squidaio_request_t *);
static void squidaio_cleanup_request(squidaio_request_t *);
void *squidaio_thread_loop(void *);
static void squidaio_do_open(squidaio_request_t *);
static void squidaio_do_read(squidaio_request_t *);
static void squidaio_do_write(squidaio_request_t *);
static void squidaio_do_close(squidaio_request_t *);
static void squidaio_do_stat(squidaio_request_t *);
static void squidaio_do_unlink(squidaio_request_t *);
#if AIO_OPENDIR
static void *squidaio_do_opendir(squidaio_request_t *);
#endif
static void squidaio_debug(squidaio_request_t *);
static void squidaio_poll_queues(void);

static squidaio_thread_t *threads = NULL;   /* linked list of worker threads */
static int squidaio_initialised = 0;        /* guards one-time squidaio_init() */

/* Buffer size classes served from dedicated memory pools. */
#define AIO_LARGE_BUFS 16384
#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
#define AIO_MICRO_BUFS 128

static MemAllocator *squidaio_large_bufs = NULL;    /* 16K */
static MemAllocator *squidaio_medium_bufs = NULL;   /* 8K */
static MemAllocator *squidaio_small_bufs = NULL;    /* 4K */
static MemAllocator *squidaio_tiny_bufs = NULL;     /* 2K */
static MemAllocator *squidaio_micro_bufs = NULL;    /* 128 bytes */

static int request_queue_len = 0;   /* requests submitted but not yet reaped */
static MemAllocator *squidaio_request_pool = NULL;
static MemAllocator *squidaio_thread_pool = NULL;
static squidaio_request_queue_t request_queue;  /* main -> workers */

/* Overflow list used when request_queue.mutex cannot be taken without blocking. */
static struct {
    squidaio_request_t *head, **tailp;
}

request_queue2 = {

    NULL, &request_queue2.head
};
static squidaio_request_queue_t done_queue;     /* workers -> main */

/* Completed requests drained from done_queue, awaiting squidaio_poll_done(). */
static struct {
    squidaio_request_t *head, **tailp;
}

done_requests = {

    NULL, &done_requests.head
};
static pthread_attr_t globattr;     /* attributes used to create worker threads */
#if HAVE_SCHED_H

static struct sched_param globsched;    /* scheduling-priority scratch value */
#endif
static pthread_t main_thread;
150
151 static MemAllocator *
152 squidaio_get_pool(int size)
153 {
154 if (size <= AIO_LARGE_BUFS) {
155 if (size <= AIO_MICRO_BUFS)
156 return squidaio_micro_bufs;
157 else if (size <= AIO_TINY_BUFS)
158 return squidaio_tiny_bufs;
159 else if (size <= AIO_SMALL_BUFS)
160 return squidaio_small_bufs;
161 else if (size <= AIO_MEDIUM_BUFS)
162 return squidaio_medium_bufs;
163 else
164 return squidaio_large_bufs;
165 }
166
167 return NULL;
168 }
169
170 void *
171 squidaio_xmalloc(int size)
172 {
173 void *p;
174 MemAllocator *pool;
175
176 if ((pool = squidaio_get_pool(size)) != NULL) {
177 p = pool->alloc();
178 } else
179 p = xmalloc(size);
180
181 return p;
182 }
183
184 static char *
185 squidaio_xstrdup(const char *str)
186 {
187 char *p;
188 int len = strlen(str) + 1;
189
190 p = (char *)squidaio_xmalloc(len);
191 strncpy(p, str, len);
192
193 return p;
194 }
195
196 void
197 squidaio_xfree(void *p, int size)
198 {
199 MemAllocator *pool;
200
201 if ((pool = squidaio_get_pool(size)) != NULL) {
202 pool->freeOne(p);
203 } else
204 xfree(p);
205 }
206
207 static void
208 squidaio_xstrfree(char *str)
209 {
210 MemAllocator *pool;
211 int len = strlen(str) + 1;
212
213 if ((pool = squidaio_get_pool(len)) != NULL) {
214 pool->freeOne(str);
215 } else
216 xfree(str);
217 }
218
/*
 * One-time module initialisation: configure worker-thread attributes and
 * scheduling priorities, initialise the request and done queues, set up
 * the CommIO notification pipes, spawn NUMTHREADS worker threads, and
 * create the memory pools for requests and I/O buffers.
 * Idempotent: subsequent calls return immediately.
 */
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    pthread_attr_init(&globattr);

#if HAVE_PTHREAD_ATTR_SETSCOPE
    /* contend for CPU with all threads in the system, not just this process */
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 1;
#endif

    main_thread = pthread_self();

#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM
    /* main thread runs at priority 1 ... */
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
#if HAVE_SCHED_H
    globsched.sched_priority = 2;
#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    /* ... worker threads slightly higher, at priority 2 */
    pthread_attr_setschedparam(&globattr, &globsched);
#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(request_queue.cond), NULL))
        fatal("Failed to create condition variable");

    request_queue.head = NULL;

    request_queue.tailp = &request_queue.head;

    request_queue.requests = 0;

    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(done_queue.cond), NULL))
        fatal("Failed to create condition variable");

    done_queue.head = NULL;

    done_queue.tailp = &done_queue.head;

    done_queue.requests = 0;

    done_queue.blocked = 0;

    // Initialize the thread I/O pipes before creating any threads
    // see bug 3189 comment 5 about race conditions.
    CommIO::Initialize();

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS);

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;

        /* a thread that fails to start stays on the list, marked _THREAD_FAILED */
        if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));

    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);

    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);

    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);

    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);

    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}
331
332 void
333 squidaio_shutdown(void)
334 {
335 if (!squidaio_initialised)
336 return;
337
338 /* This is the same as in squidaio_sync */
339 do {
340 squidaio_poll_queues();
341 } while (request_queue_len > 0);
342
343 CommIO::NotifyIOClose();
344
345 squidaio_initialised = 0;
346 }
347
/*
 * Worker thread entry point. Blocks the signals handled by the main
 * squid thread, then loops forever: condition-wait for a request on
 * request_queue, execute it (unless cancelled), append it to done_queue,
 * and wake the main thread through the CommIO pipe.
 * Never returns during normal operation.
 */
void *
squidaio_thread_loop(void *ptr)
{
    squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
    squidaio_request_t *request;
    sigset_t newSig;

    /*
     * Make sure to ignore signals which may possibly get sent to
     * the parent squid thread. Causes havoc with mutex's and
     * condition waits otherwise
     */

    sigemptyset(&newSig);
    sigaddset(&newSig, SIGPIPE);
    sigaddset(&newSig, SIGCHLD);
#if defined(_SQUID_LINUX_THREADS_)

    sigaddset(&newSig, SIGQUIT);
    sigaddset(&newSig, SIGTRAP);
#else

    sigaddset(&newSig, SIGUSR1);
    sigaddset(&newSig, SIGUSR2);
#endif

    sigaddset(&newSig, SIGHUP);
    sigaddset(&newSig, SIGTERM);
    sigaddset(&newSig, SIGINT);
    sigaddset(&newSig, SIGALRM);
    pthread_sigmask(SIG_BLOCK, &newSig, NULL);

    while (1) {
        threadp->current_req = request = NULL;
        request = NULL;
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;
        pthread_mutex_lock(&request_queue.mutex);

        while (!request_queue.head) {
            pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
        }

        request = request_queue.head;

        if (request)
            request_queue.head = request->next;

        /* queue drained: point the tail back at the head slot */
        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;

        pthread_mutex_unlock(&request_queue.mutex);

        /* process the request */
        threadp->status = _THREAD_BUSY;

        request->next = NULL;

        threadp->current_req = request;

        errno = 0;

        if (!request->cancelled) {
            switch (request->request_type) {

            case _AIO_OP_OPEN:
                squidaio_do_open(request);
                break;

            case _AIO_OP_READ:
                squidaio_do_read(request);
                break;

            case _AIO_OP_WRITE:
                squidaio_do_write(request);
                break;

            case _AIO_OP_CLOSE:
                squidaio_do_close(request);
                break;

            case _AIO_OP_UNLINK:
                squidaio_do_unlink(request);
                break;

#if AIO_OPENDIR /* Opendir not implemented yet */

            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(request);
                break;
#endif

            case _AIO_OP_STAT:
                squidaio_do_stat(request);
                break;

            default:
                /* unknown op: report failure without touching anything */
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {        /* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }

        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        pthread_mutex_lock(&done_queue.mutex);
        *done_queue.tailp = request;
        done_queue.tailp = &request->next;
        pthread_mutex_unlock(&done_queue.mutex);
        /* wake the main thread's comm loop */
        CommIO::NotifyIOCompleted();
        ++ threadp->requests;
    } /* while forever */

    return NULL;
} /* squidaio_thread_loop */
466
/*
 * Hand a filled-in request to the worker threads.
 * Normally appends to request_queue and signals a waiting worker; if the
 * queue mutex is already held the request is parked on the lock-free
 * request_queue2 overflow list instead, which gets spliced into the main
 * queue on a later enqueue or by squidaio_poll_queues(). Also emits
 * congestion / overload warnings based on queue length, and forces a
 * blocking squidaio_sync() when the queue grows past RIDICULOUS_LENGTH.
 */
static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;      /* wall-clock start of the current overload episode */
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
    request->next = NULL;

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;

        request_queue.tailp = &request->next;

        pthread_cond_signal(&request_queue.cond);

        pthread_mutex_unlock(&request_queue.mutex);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    if (request_queue2.head) {
        /* warn at exponentially-widening intervals while congestion persists */
        static int filter = 0;
        static int filter_limit = 8;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;

        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        /* at most one warning per 15s, and only after 5s of sustained overload */
        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
    }
} /* squidaio_queue_request */
560
/*
 * Release per-request resources after completion and, unless the request
 * was cancelled, copy results (return value, errno, stat data) back into
 * the caller's squidaio_result_t. Cancelled-but-completed opens and
 * cancelled-but-unexecuted closes have their fd closed here so no
 * descriptor leaks. Finally the request object goes back to its pool.
 */
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the */
    /* request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:

        if (!cancelled && requestp->ret == 0)
            memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:
        /* fallthrough: both ops only own their path string */

    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    /* resultp is NULL when the request was cancelled via squidaio_cancel() */
    if (resultp != NULL && !cancelled) {
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->freeOne(requestp);
} /* squidaio_cleanup_request */
623
624 int
625 squidaio_cancel(squidaio_result_t * resultp)
626 {
627 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
628
629 if (request && request->resultp == resultp) {
630 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
631 request->cancelled = 1;
632 request->resultp = NULL;
633 resultp->_data = NULL;
634 resultp->result_type = _AIO_OP_NONE;
635 return 0;
636 }
637
638 return 1;
639 } /* squidaio_cancel */
640
641 int
642 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
643 {
644 squidaio_init();
645 squidaio_request_t *requestp;
646
647 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
648
649 requestp->path = (char *) squidaio_xstrdup(path);
650
651 requestp->oflag = oflag;
652
653 requestp->mode = mode;
654
655 requestp->resultp = resultp;
656
657 requestp->request_type = _AIO_OP_OPEN;
658
659 requestp->cancelled = 0;
660
661 resultp->result_type = _AIO_OP_OPEN;
662
663 squidaio_queue_request(requestp);
664
665 return 0;
666 }
667
/* Worker-side open(2): stores the new fd (or -1) in ret and errno in err. */
static void
squidaio_do_open(squidaio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}
674
675 int
676 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
677 {
678 squidaio_request_t *requestp;
679
680 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
681
682 requestp->fd = fd;
683
684 requestp->bufferp = bufp;
685
686 requestp->buflen = bufs;
687
688 requestp->offset = offset;
689
690 requestp->whence = whence;
691
692 requestp->resultp = resultp;
693
694 requestp->request_type = _AIO_OP_READ;
695
696 requestp->cancelled = 0;
697
698 resultp->result_type = _AIO_OP_READ;
699
700 squidaio_queue_request(requestp);
701
702 return 0;
703 }
704
/*
 * Worker-side read: position the fd with lseek() then read().
 * ret is the byte count or -1; err holds errno from whichever of the
 * two calls ran last.
 */
static void
squidaio_do_read(squidaio_request_t * requestp)
{
    if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
        requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
    else
        requestp->ret = -1;
    requestp->err = errno;
}
714
715 int
716 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
717 {
718 squidaio_request_t *requestp;
719
720 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
721
722 requestp->fd = fd;
723
724 requestp->bufferp = bufp;
725
726 requestp->buflen = bufs;
727
728 requestp->offset = offset;
729
730 requestp->whence = whence;
731
732 requestp->resultp = resultp;
733
734 requestp->request_type = _AIO_OP_WRITE;
735
736 requestp->cancelled = 0;
737
738 resultp->result_type = _AIO_OP_WRITE;
739
740 squidaio_queue_request(requestp);
741
742 return 0;
743 }
744
/*
 * Worker-side write. NOTE(review): the offset/whence recorded by
 * squidaio_write() are not used here — data is written at the fd's
 * current file position.
 */
static void
squidaio_do_write(squidaio_request_t * requestp)
{
    requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
    requestp->err = errno;
}
751
752 int
753 squidaio_close(int fd, squidaio_result_t * resultp)
754 {
755 squidaio_request_t *requestp;
756
757 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
758
759 requestp->fd = fd;
760
761 requestp->resultp = resultp;
762
763 requestp->request_type = _AIO_OP_CLOSE;
764
765 requestp->cancelled = 0;
766
767 resultp->result_type = _AIO_OP_CLOSE;
768
769 squidaio_queue_request(requestp);
770
771 return 0;
772 }
773
/* Worker-side close(2): stores the return value in ret and errno in err. */
static void
squidaio_do_close(squidaio_request_t * requestp)
{
    requestp->ret = close(requestp->fd);
    requestp->err = errno;
}
780
781 int
782
783 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
784 {
785 squidaio_init();
786 squidaio_request_t *requestp;
787
788 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
789
790 requestp->path = (char *) squidaio_xstrdup(path);
791
792 requestp->statp = sb;
793
794 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
795
796 requestp->resultp = resultp;
797
798 requestp->request_type = _AIO_OP_STAT;
799
800 requestp->cancelled = 0;
801
802 resultp->result_type = _AIO_OP_STAT;
803
804 squidaio_queue_request(requestp);
805
806 return 0;
807 }
808
/* Worker-side stat(2) into the request's private tmpstatp buffer. */
static void
squidaio_do_stat(squidaio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}
815
816 int
817 squidaio_unlink(const char *path, squidaio_result_t * resultp)
818 {
819 squidaio_init();
820 squidaio_request_t *requestp;
821
822 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
823
824 requestp->path = squidaio_xstrdup(path);
825
826 requestp->resultp = resultp;
827
828 requestp->request_type = _AIO_OP_UNLINK;
829
830 requestp->cancelled = 0;
831
832 resultp->result_type = _AIO_OP_UNLINK;
833
834 squidaio_queue_request(requestp);
835
836 return 0;
837 }
838
/* Worker-side unlink(2): stores the return value in ret and errno in err. */
static void
squidaio_do_unlink(squidaio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}
845
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/* Stub: would queue an opendir request; currently always fails with -1. */
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;    /* NOTE(review): unused, and requestp leaks back nothing — dead stub code */

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

/* Stub worker: intentionally does nothing. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif
869
/*
 * Main-thread housekeeping: first splice any requests parked on the
 * request_queue2 overflow list into the worker queue, then drain
 * completed requests from done_queue onto the local done_requests list,
 * decrementing request_queue_len once per drained request.
 * Both locks are taken with trylock only, so this never blocks.
 */
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */

    if (request_queue2.head &&
            pthread_mutex_trylock(&request_queue.mutex) == 0) {
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {

        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;
        pthread_mutex_unlock(&done_queue.mutex);
        /* append the drained chain, counting one per request in it */
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}
903
904 squidaio_result_t *
905 squidaio_poll_done(void)
906 {
907 squidaio_request_t *request;
908 squidaio_result_t *resultp;
909 int cancelled;
910 int polled = 0;
911
912 AIO_REPOLL:
913 request = done_requests.head;
914
915 if (request == NULL && !polled) {
916 CommIO::ResetNotifications();
917 squidaio_poll_queues();
918 polled = 1;
919 request = done_requests.head;
920 }
921
922 if (!request) {
923 return NULL;
924 }
925
926 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
927 done_requests.head = request->next;
928
929 if (!done_requests.head)
930 done_requests.tailp = &done_requests.head;
931
932 resultp = request->resultp;
933
934 cancelled = request->cancelled;
935
936 squidaio_debug(request);
937
938 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
939
940 squidaio_cleanup_request(request);
941
942 if (cancelled)
943 goto AIO_REPOLL;
944
945 return resultp;
946 } /* squidaio_poll_done */
947
948 int
949 squidaio_operations_pending(void)
950 {
951 return request_queue_len + (done_requests.head ? 1 : 0);
952 }
953
954 int
955 squidaio_sync(void)
956 {
957 /* XXX This might take a while if the queue is large.. */
958
959 do {
960 squidaio_poll_queues();
961 } while (request_queue_len > 0);
962
963 return squidaio_operations_pending();
964 }
965
/* Number of requests submitted but not yet reaped by squidaio_poll_done(). */
int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}
971
972 static void
973 squidaio_debug(squidaio_request_t * request)
974 {
975 switch (request->request_type) {
976
977 case _AIO_OP_OPEN:
978 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
979 break;
980
981 case _AIO_OP_READ:
982 debugs(43, 5, "READ on fd: " << request->fd);
983 break;
984
985 case _AIO_OP_WRITE:
986 debugs(43, 5, "WRITE on fd: " << request->fd);
987 break;
988
989 case _AIO_OP_CLOSE:
990 debugs(43, 5, "CLOSE of fd: " << request->fd);
991 break;
992
993 case _AIO_OP_UNLINK:
994 debugs(43, 5, "UNLINK of " << request->path);
995 break;
996
997 default:
998 break;
999 }
1000 }
1001
1002 void
1003 squidaio_stats(StoreEntry * sentry)
1004 {
1005 squidaio_thread_t *threadp;
1006 int i;
1007
1008 if (!squidaio_initialised)
1009 return;
1010
1011 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1012
1013 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1014
1015 threadp = threads;
1016
1017 for (i = 0; i < NUMTHREADS; ++i) {
1018 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1019 threadp = threadp->next;
1020 }
1021 }
1022