/* Source: git.ipfire.org mirror of thirdparty/squid.git,
 * blob e3b4ba3875d0ccad2611d6fedbcbb5e650da5342,
 * src/DiskIO/DiskThreads/aiops.cc */
1 /*
2 * Copyright (C) 1996-2022 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 43 AIOPS */
10
11 #ifndef _REENTRANT
12 #error "_REENTRANT MUST be defined to build squid async io support."
13 #endif
14
15 #include "squid.h"
16 #include "DiskIO/DiskThreads/CommIO.h"
17 #include "DiskThreads.h"
18 #include "SquidConfig.h"
19 #include "Store.h"
20
21 /*
22 * struct stat and squidaio_xstrdup use explicit pool alloc()/freeOne().
23 * XXX: convert to MEMPROXY_CLASS() API
24 */
25 #include "mem/Pool.h"
26
27 #include <cerrno>
28 #include <csignal>
29 #include <sys/stat.h>
30 #include <fcntl.h>
31 #include <pthread.h>
32 #include <dirent.h>
33 #if HAVE_SCHED_H
34 #include <sched.h>
35 #endif
36
37 #define RIDICULOUS_LENGTH 4096
38
/* Lifecycle states of a worker thread. Written by the worker itself
 * (and by squidaio_init on pthread_create failure); read without
 * synchronization by the main thread for informational purposes. */
enum _squidaio_thread_status {
    _THREAD_STARTING = 0,   /* created, not yet entered its wait loop */
    _THREAD_WAITING,        /* blocked waiting for a request */
    _THREAD_BUSY,           /* executing a request */
    _THREAD_FAILED,         /* pthread_create() failed for this slot */
    _THREAD_DONE            /* finished a request, about to loop again */
};
typedef enum _squidaio_thread_status squidaio_thread_status;
47
/* One queued asynchronous I/O operation. Allocated from
 * squidaio_request_pool and linked through 'next' on the request,
 * overflow, and done queues. */
typedef struct squidaio_request_t {

    struct squidaio_request_t *next;    /* singly-linked queue link */
    squidaio_request_type request_type; /* _AIO_OP_* discriminator */
    int cancelled;          /* set by squidaio_cancel(); outcome discarded */
    char *path;             /* open/stat/unlink target; squidaio_xstrdup()'d, freed in cleanup */
    int oflag;              /* open() flags */
    mode_t mode;            /* open() mode */
    int fd;                 /* descriptor for read/write/close */
    char *bufferp;          /* caller-owned read/write buffer */
    size_t buflen;          /* buffer length */
    off_t offset;           /* lseek() offset (reads) */
    int whence;             /* lseek() whence (reads) */
    int ret;                /* syscall return value */
    int err;                /* errno captured right after the syscall */

    struct stat *tmpstatp;  /* worker-private stat buffer (squidaio_xmalloc'd) */

    struct stat *statp;     /* caller's stat buffer; filled on successful stat */
    squidaio_result_t *resultp; /* caller's result handle; nullptr once cancelled */
} squidaio_request_t;
69
/* A mutex/condvar-protected FIFO shared between the main thread and the
 * worker threads (used for both request_queue and done_queue). */
typedef struct squidaio_request_queue_t {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    squidaio_request_t *volatile head;              /* oldest entry; nullptr when empty */
    squidaio_request_t *volatile *volatile tailp;   /* points at last 'next' (or at head) */
    unsigned long requests;     /* lifetime count of queued requests */
    unsigned long blocked; /* main failed to lock the queue */
} squidaio_request_queue_t;
78
typedef struct squidaio_thread_t squidaio_thread_t;

/* Per-worker bookkeeping; all workers live on the global 'threads' list. */
struct squidaio_thread_t {
    squidaio_thread_t *next;        /* next entry on the global list */
    pthread_t thread;
    squidaio_thread_status status;  /* informational; written by the worker */

    struct squidaio_request_t *current_req; /* request being processed, if any */
    unsigned long requests;         /* lifetime count of handled requests */
};
89
90 static void squidaio_queue_request(squidaio_request_t *);
91 static void squidaio_cleanup_request(squidaio_request_t *);
92 void *squidaio_thread_loop(void *);
93 static void squidaio_do_open(squidaio_request_t *);
94 static void squidaio_do_read(squidaio_request_t *);
95 static void squidaio_do_write(squidaio_request_t *);
96 static void squidaio_do_close(squidaio_request_t *);
97 static void squidaio_do_stat(squidaio_request_t *);
98 static void squidaio_do_unlink(squidaio_request_t *);
99 #if AIO_OPENDIR
100 static void *squidaio_do_opendir(squidaio_request_t *);
101 #endif
102 static void squidaio_debug(squidaio_request_t *);
103 static void squidaio_poll_queues(void);
104
105 static squidaio_thread_t *threads = nullptr;
106 static int squidaio_initialised = 0;
107
108 #define AIO_LARGE_BUFS 16384
109 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
110 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
111 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
112 #define AIO_MICRO_BUFS 128
113
114 static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
115 static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
116 static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
117 static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
118 static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
119
120 static int request_queue_len = 0;
121 static Mem::Allocator *squidaio_request_pool = nullptr;
122 static Mem::Allocator *squidaio_thread_pool = nullptr;
123 static squidaio_request_queue_t request_queue;
124
125 static struct {
126 squidaio_request_t *head, **tailp;
127 }
128
129 request_queue2 = {
130
131 nullptr, &request_queue2.head
132 };
133 static squidaio_request_queue_t done_queue;
134
135 static struct {
136 squidaio_request_t *head, **tailp;
137 }
138
139 done_requests = {
140
141 nullptr, &done_requests.head
142 };
143 static pthread_attr_t globattr;
144 #if HAVE_SCHED_H
145
146 static struct sched_param globsched;
147 #endif
148 static pthread_t main_thread;
149
150 static Mem::Allocator *
151 squidaio_get_pool(int size)
152 {
153 if (size <= AIO_LARGE_BUFS) {
154 if (size <= AIO_MICRO_BUFS)
155 return squidaio_micro_bufs;
156 else if (size <= AIO_TINY_BUFS)
157 return squidaio_tiny_bufs;
158 else if (size <= AIO_SMALL_BUFS)
159 return squidaio_small_bufs;
160 else if (size <= AIO_MEDIUM_BUFS)
161 return squidaio_medium_bufs;
162 else
163 return squidaio_large_bufs;
164 }
165
166 return nullptr;
167 }
168
169 void *
170 squidaio_xmalloc(int size)
171 {
172 void *p;
173
174 if (const auto pool = squidaio_get_pool(size)) {
175 p = pool->alloc();
176 } else
177 p = xmalloc(size);
178
179 return p;
180 }
181
182 static char *
183 squidaio_xstrdup(const char *str)
184 {
185 char *p;
186 int len = strlen(str) + 1;
187
188 p = (char *)squidaio_xmalloc(len);
189 strncpy(p, str, len);
190
191 return p;
192 }
193
194 void
195 squidaio_xfree(void *p, int size)
196 {
197 if (const auto pool = squidaio_get_pool(size)) {
198 pool->freeOne(p);
199 } else
200 xfree(p);
201 }
202
203 static void
204 squidaio_xstrfree(char *str)
205 {
206 int len = strlen(str) + 1;
207
208 if (const auto pool = squidaio_get_pool(len)) {
209 pool->freeOne(str);
210 } else
211 xfree(str);
212 }
213
/*
 * One-time module initialization: configure worker-thread attributes and
 * scheduling, initialize the request and done queues, open the
 * notification pipes, spawn NUMTHREADS workers, and create the request
 * and buffer memory pools. Subsequent calls are no-ops.
 */
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    pthread_attr_init(&globattr);

#if HAVE_PTHREAD_ATTR_SETSCOPE

    /* compete for CPU system-wide, not just within this process */
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 1;

#endif

    main_thread = pthread_self();

#if HAVE_SCHED_H && HAVE_PTHREAD_SETSCHEDPARAM

    /* main thread at priority 1; workers (below) get priority 2 */
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);

#endif
#if HAVE_SCHED_H

    globsched.sched_priority = 2;

#endif
#if HAVE_SCHED_H && HAVE_PTHREAD_ATTR_SETSCHEDPARAM

    pthread_attr_setschedparam(&globattr, &globsched);

#endif

    /* Give each thread a smaller 256KB stack, should be more than sufficient */
    pthread_attr_setstacksize(&globattr, 256 * 1024);

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), nullptr))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(request_queue.cond), nullptr))
        fatal("Failed to create condition variable");

    request_queue.head = nullptr;

    request_queue.tailp = &request_queue.head;

    request_queue.requests = 0;

    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), nullptr))
        fatal("Failed to create mutex");

    if (pthread_cond_init(&(done_queue.cond), nullptr))
        fatal("Failed to create condition variable");

    done_queue.head = nullptr;

    done_queue.tailp = &done_queue.head;

    done_queue.requests = 0;

    done_queue.blocked = 0;

    // Initialize the thread I/O pipes before creating any threads
    // see bug 3189 comment 5 about race conditions.
    CommIO::Initialize();

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS != 0);

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = nullptr;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;

        /* a failed slot stays on the list as _THREAD_FAILED, so the
         * NUMTHREADS-entry walk in squidaio_stats() remains valid */
        if (pthread_create(&threadp->thread, &globattr, squidaio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));

    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);

    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);

    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);

    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);

    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}
326
/*
 * Drain all pending async operations (blocking until the queues are
 * empty) and close the worker notification pipe. Worker threads are left
 * running; only the queues are flushed and the module marked
 * uninitialized.
 */
void
squidaio_shutdown(void)
{
    if (!squidaio_initialised)
        return;

    /* This is the same as in squidaio_sync */
    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    CommIO::NotifyIOClose();

    squidaio_initialised = 0;
}
342
343 void *
344 squidaio_thread_loop(void *ptr)
345 {
346 squidaio_thread_t *threadp = (squidaio_thread_t *)ptr;
347 squidaio_request_t *request;
348 sigset_t newSig;
349
350 /*
351 * Make sure to ignore signals which may possibly get sent to
352 * the parent squid thread. Causes havoc with mutex's and
353 * condition waits otherwise
354 */
355
356 sigemptyset(&newSig);
357 sigaddset(&newSig, SIGPIPE);
358 sigaddset(&newSig, SIGCHLD);
359 #if defined(_SQUID_LINUX_THREADS_)
360
361 sigaddset(&newSig, SIGQUIT);
362 sigaddset(&newSig, SIGTRAP);
363 #else
364
365 sigaddset(&newSig, SIGUSR1);
366 sigaddset(&newSig, SIGUSR2);
367 #endif
368
369 sigaddset(&newSig, SIGHUP);
370 sigaddset(&newSig, SIGTERM);
371 sigaddset(&newSig, SIGINT);
372 sigaddset(&newSig, SIGALRM);
373 pthread_sigmask(SIG_BLOCK, &newSig, nullptr);
374
375 while (1) {
376 threadp->current_req = request = nullptr;
377 request = nullptr;
378 /* Get a request to process */
379 threadp->status = _THREAD_WAITING;
380 pthread_mutex_lock(&request_queue.mutex);
381
382 while (!request_queue.head) {
383 pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
384 }
385
386 request = request_queue.head;
387
388 if (request)
389 request_queue.head = request->next;
390
391 if (!request_queue.head)
392 request_queue.tailp = &request_queue.head;
393
394 pthread_mutex_unlock(&request_queue.mutex);
395
396 /* process the request */
397 threadp->status = _THREAD_BUSY;
398
399 request->next = nullptr;
400
401 threadp->current_req = request;
402
403 errno = 0;
404
405 if (!request->cancelled) {
406 switch (request->request_type) {
407
408 case _AIO_OP_OPEN:
409 squidaio_do_open(request);
410 break;
411
412 case _AIO_OP_READ:
413 squidaio_do_read(request);
414 break;
415
416 case _AIO_OP_WRITE:
417 squidaio_do_write(request);
418 break;
419
420 case _AIO_OP_CLOSE:
421 squidaio_do_close(request);
422 break;
423
424 case _AIO_OP_UNLINK:
425 squidaio_do_unlink(request);
426 break;
427
428 #if AIO_OPENDIR /* Opendir not implemented yet */
429
430 case _AIO_OP_OPENDIR:
431 squidaio_do_opendir(request);
432 break;
433 #endif
434
435 case _AIO_OP_STAT:
436 squidaio_do_stat(request);
437 break;
438
439 default:
440 request->ret = -1;
441 request->err = EINVAL;
442 break;
443 }
444 } else { /* cancelled */
445 request->ret = -1;
446 request->err = EINTR;
447 }
448
449 threadp->status = _THREAD_DONE;
450 /* put the request in the done queue */
451 pthread_mutex_lock(&done_queue.mutex);
452 *done_queue.tailp = request;
453 done_queue.tailp = &request->next;
454 pthread_mutex_unlock(&done_queue.mutex);
455 CommIO::NotifyIOCompleted();
456 ++ threadp->requests;
457 } /* while forever */
458
459 return nullptr;
460 } /* squidaio_thread_loop */
461
/*
 * Hand a prepared request to the worker threads. Never blocks on the
 * queue mutex: when the lock cannot be taken immediately the request is
 * parked on the main-thread-only overflow queue (request_queue2) and
 * spliced in on a later call. Also emits rate-limited congestion and
 * overload warnings, and past RIDICULOUS_LENGTH forces a blocking
 * squidaio_sync().
 */
static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;  /* start time of the current overload episode */
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;  /* lets squidaio_cancel() find this request */
    /* Play some tricks with the request_queue2 queue */
    request->next = nullptr;

    if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;

        request_queue.tailp = &request->next;

        pthread_cond_signal(&request_queue.cond);

        pthread_mutex_unlock(&request_queue.mutex);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = nullptr;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    if (request_queue2.head) {
        /* warn with exponential backoff while requests sit in overflow */
        static uint64_t filter = 0;
        static uint64_t filter_limit = 8192;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;

        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        /* at most one warning per 15s, and only 5s into an episode */
        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
    }
} /* squidaio_queue_request */
555
/*
 * Per-type teardown after a request is reaped: free request-owned
 * storage, copy results back to the caller (unless cancelled), undo
 * work completed on behalf of a cancelled open/close, and recycle the
 * request object into its pool.
 */
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the */
    /* request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:

        if (!cancelled && requestp->ret == 0)
            memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:
        /* fallthrough: both cases only own a path string */

    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    if (resultp != nullptr && !cancelled) {
        /* hand the syscall outcome back to the caller */
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->freeOne(requestp);
} /* squidaio_cleanup_request */
618
619 int
620 squidaio_cancel(squidaio_result_t * resultp)
621 {
622 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
623
624 if (request && request->resultp == resultp) {
625 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
626 request->cancelled = 1;
627 request->resultp = nullptr;
628 resultp->_data = nullptr;
629 resultp->result_type = _AIO_OP_NONE;
630 return 0;
631 }
632
633 return 1;
634 } /* squidaio_cancel */
635
636 int
637 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
638 {
639 squidaio_init();
640 squidaio_request_t *requestp;
641
642 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
643
644 requestp->path = (char *) squidaio_xstrdup(path);
645
646 requestp->oflag = oflag;
647
648 requestp->mode = mode;
649
650 requestp->resultp = resultp;
651
652 requestp->request_type = _AIO_OP_OPEN;
653
654 requestp->cancelled = 0;
655
656 resultp->result_type = _AIO_OP_OPEN;
657
658 squidaio_queue_request(requestp);
659
660 return 0;
661 }
662
663 static void
664 squidaio_do_open(squidaio_request_t * requestp)
665 {
666 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
667 requestp->err = errno;
668 }
669
670 int
671 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
672 {
673 squidaio_request_t *requestp;
674
675 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
676
677 requestp->fd = fd;
678
679 requestp->bufferp = bufp;
680
681 requestp->buflen = bufs;
682
683 requestp->offset = offset;
684
685 requestp->whence = whence;
686
687 requestp->resultp = resultp;
688
689 requestp->request_type = _AIO_OP_READ;
690
691 requestp->cancelled = 0;
692
693 resultp->result_type = _AIO_OP_READ;
694
695 squidaio_queue_request(requestp);
696
697 return 0;
698 }
699
700 static void
701 squidaio_do_read(squidaio_request_t * requestp)
702 {
703 if (lseek(requestp->fd, requestp->offset, requestp->whence) >= 0)
704 requestp->ret = read(requestp->fd, requestp->bufferp, requestp->buflen);
705 else
706 requestp->ret = -1;
707 requestp->err = errno;
708 }
709
710 int
711 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
712 {
713 squidaio_request_t *requestp;
714
715 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
716
717 requestp->fd = fd;
718
719 requestp->bufferp = bufp;
720
721 requestp->buflen = bufs;
722
723 requestp->offset = offset;
724
725 requestp->whence = whence;
726
727 requestp->resultp = resultp;
728
729 requestp->request_type = _AIO_OP_WRITE;
730
731 requestp->cancelled = 0;
732
733 resultp->result_type = _AIO_OP_WRITE;
734
735 squidaio_queue_request(requestp);
736
737 return 0;
738 }
739
740 static void
741 squidaio_do_write(squidaio_request_t * requestp)
742 {
743 requestp->ret = write(requestp->fd, requestp->bufferp, requestp->buflen);
744 requestp->err = errno;
745 }
746
747 int
748 squidaio_close(int fd, squidaio_result_t * resultp)
749 {
750 squidaio_request_t *requestp;
751
752 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
753
754 requestp->fd = fd;
755
756 requestp->resultp = resultp;
757
758 requestp->request_type = _AIO_OP_CLOSE;
759
760 requestp->cancelled = 0;
761
762 resultp->result_type = _AIO_OP_CLOSE;
763
764 squidaio_queue_request(requestp);
765
766 return 0;
767 }
768
769 static void
770 squidaio_do_close(squidaio_request_t * requestp)
771 {
772 requestp->ret = close(requestp->fd);
773 requestp->err = errno;
774 }
775
776 int
777
778 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
779 {
780 squidaio_init();
781 squidaio_request_t *requestp;
782
783 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
784
785 requestp->path = (char *) squidaio_xstrdup(path);
786
787 requestp->statp = sb;
788
789 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
790
791 requestp->resultp = resultp;
792
793 requestp->request_type = _AIO_OP_STAT;
794
795 requestp->cancelled = 0;
796
797 resultp->result_type = _AIO_OP_STAT;
798
799 squidaio_queue_request(requestp);
800
801 return 0;
802 }
803
804 static void
805 squidaio_do_stat(squidaio_request_t * requestp)
806 {
807 requestp->ret = stat(requestp->path, requestp->tmpstatp);
808 requestp->err = errno;
809 }
810
811 int
812 squidaio_unlink(const char *path, squidaio_result_t * resultp)
813 {
814 squidaio_init();
815 squidaio_request_t *requestp;
816
817 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
818
819 requestp->path = squidaio_xstrdup(path);
820
821 requestp->resultp = resultp;
822
823 requestp->request_type = _AIO_OP_UNLINK;
824
825 requestp->cancelled = 0;
826
827 resultp->result_type = _AIO_OP_UNLINK;
828
829 squidaio_queue_request(requestp);
830
831 return 0;
832 }
833
834 static void
835 squidaio_do_unlink(squidaio_request_t * requestp)
836 {
837 requestp->ret = unlink(requestp->path);
838 requestp->err = errno;
839 }
840
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/* Stub: allocates a request and tags the result type, then always fails
 * with -1. NOTE(review): the allocated request and the 'len' local are
 * unused; this block is compiled out unless AIO_OPENDIR is defined. */
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif
864
/*
 * Main-thread queue housekeeping; never blocks (trylock only).
 * Splices overflow requests (request_queue2) into the worker queue, and
 * drains completed requests from done_queue onto the private
 * done_requests list, decrementing request_queue_len once per request.
 */
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */

    if (request_queue2.head &&
            pthread_mutex_trylock(&request_queue.mutex) == 0) {
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = nullptr;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {

        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = nullptr;
        done_queue.tailp = &done_queue.head;
        pthread_mutex_unlock(&done_queue.mutex);
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        /* account for the rest of the drained chain */
        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}
898
/*
 * Return the result of the next completed operation, or nullptr when
 * nothing is ready. Refills done_requests (at most once per call) when
 * it is empty. Cancelled requests are cleaned up and silently skipped
 * via the AIO_REPOLL loop.
 */
squidaio_result_t *
squidaio_poll_done(void)
{
    squidaio_request_t *request;
    squidaio_result_t *resultp;
    int cancelled;
    int polled = 0;     /* ensures only one refill attempt per call */

AIO_REPOLL:
    request = done_requests.head;

    if (request == nullptr && !polled) {
        CommIO::ResetNotifications();
        squidaio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }

    if (!request) {
        return nullptr;
    }

    debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
    done_requests.head = request->next;

    /* list emptied: reset the tail pointer back to the head slot */
    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;

    resultp = request->resultp;

    cancelled = request->cancelled;

    squidaio_debug(request);

    debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);

    /* frees the request; for cancelled requests resultp is stale */
    squidaio_cleanup_request(request);

    if (cancelled)
        goto AIO_REPOLL;

    return resultp;
} /* squidaio_poll_done */
942
943 int
944 squidaio_operations_pending(void)
945 {
946 return request_queue_len + (done_requests.head ? 1 : 0);
947 }
948
949 int
950 squidaio_sync(void)
951 {
952 /* XXX This might take a while if the queue is large.. */
953
954 do {
955 squidaio_poll_queues();
956 } while (request_queue_len > 0);
957
958 return squidaio_operations_pending();
959 }
960
/* Number of requests queued or in flight (excludes already-reaped
 * completions waiting on the done_requests list). */
int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}
966
967 static void
968 squidaio_debug(squidaio_request_t * request)
969 {
970 switch (request->request_type) {
971
972 case _AIO_OP_OPEN:
973 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
974 break;
975
976 case _AIO_OP_READ:
977 debugs(43, 5, "READ on fd: " << request->fd);
978 break;
979
980 case _AIO_OP_WRITE:
981 debugs(43, 5, "WRITE on fd: " << request->fd);
982 break;
983
984 case _AIO_OP_CLOSE:
985 debugs(43, 5, "CLOSE of fd: " << request->fd);
986 break;
987
988 case _AIO_OP_UNLINK:
989 debugs(43, 5, "UNLINK of " << request->path);
990 break;
991
992 default:
993 break;
994 }
995 }
996
997 void
998 squidaio_stats(StoreEntry * sentry)
999 {
1000 squidaio_thread_t *threadp;
1001 int i;
1002
1003 if (!squidaio_initialised)
1004 return;
1005
1006 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1007
1008 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1009
1010 threadp = threads;
1011
1012 for (i = 0; i < NUMTHREADS; ++i) {
1013 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, (unsigned long)threadp->thread, threadp->requests);
1014 threadp = threadp->next;
1015 }
1016 }
1017