]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskThreads/aiops_win32.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 43 Windows AIOPS */
10
11 #include "squid.h"
12 #include "DiskIO/DiskThreads/CommIO.h"
13 #include "DiskThreads.h"
14 #include "fd.h"
15 #include "mem/Pool.h"
16 #include "SquidConfig.h"
17 #include "SquidTime.h"
18 #include "Store.h"
19
20 #include <cerrno>
21 #include <csignal>
22 #include <sys/stat.h>
23 #include <fcntl.h>
24 #include <dirent.h>
25
26 #define RIDICULOUS_LENGTH 4096
27
28 enum _squidaio_thread_status {
29 _THREAD_STARTING = 0,
30 _THREAD_WAITING,
31 _THREAD_BUSY,
32 _THREAD_FAILED,
33 _THREAD_DONE
34 };
35 typedef enum _squidaio_thread_status squidaio_thread_status;
36
37 typedef struct squidaio_request_t {
38
39 struct squidaio_request_t *next;
40 squidaio_request_type request_type;
41 int cancelled;
42 char *path;
43 int oflag;
44 mode_t mode;
45 int fd;
46 char *bufferp;
47 char *tmpbufp;
48 size_t buflen;
49 off_t offset;
50 int whence;
51 int ret;
52 int err;
53
54 struct stat *tmpstatp;
55
56 struct stat *statp;
57 squidaio_result_t *resultp;
58 } squidaio_request_t;
59
60 typedef struct squidaio_request_queue_t {
61 HANDLE mutex;
62 HANDLE cond; /* See Event objects */
63 squidaio_request_t *volatile head;
64 squidaio_request_t *volatile *volatile tailp;
65 unsigned long requests;
66 unsigned long blocked; /* main failed to lock the queue */
67 } squidaio_request_queue_t;
68
69 typedef struct squidaio_thread_t squidaio_thread_t;
70
71 struct squidaio_thread_t {
72 squidaio_thread_t *next;
73 HANDLE thread;
74 DWORD dwThreadId; /* thread ID */
75 squidaio_thread_status status;
76
77 struct squidaio_request_t *current_req;
78 unsigned long requests;
79 int volatile exit;
80 };
81
82 static void squidaio_queue_request(squidaio_request_t *);
83 static void squidaio_cleanup_request(squidaio_request_t *);
84 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
85 static void squidaio_do_open(squidaio_request_t *);
86 static void squidaio_do_read(squidaio_request_t *);
87 static void squidaio_do_write(squidaio_request_t *);
88 static void squidaio_do_close(squidaio_request_t *);
89 static void squidaio_do_stat(squidaio_request_t *);
90 static void squidaio_do_unlink(squidaio_request_t *);
91 #if AIO_OPENDIR
92 static void *squidaio_do_opendir(squidaio_request_t *);
93 #endif
94 static void squidaio_debug(squidaio_request_t *);
95 static void squidaio_poll_queues(void);
96
97 static squidaio_thread_t *threads = NULL;
98 static int squidaio_initialised = 0;
99
100 #define AIO_LARGE_BUFS 16384
101 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
102 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
103 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
104 #define AIO_MICRO_BUFS 128
105
106 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
107 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
108 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
109 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
110 static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
111
112 static int request_queue_len = 0;
113 static MemAllocator *squidaio_request_pool = NULL;
114 static MemAllocator *squidaio_thread_pool = NULL;
115 static squidaio_request_queue_t request_queue;
116
117 static struct {
118 squidaio_request_t *head, **tailp;
119 }
120
121 request_queue2 = {
122
123 NULL, &request_queue2.head
124 };
125 static squidaio_request_queue_t done_queue;
126
127 static struct {
128 squidaio_request_t *head, **tailp;
129 }
130
131 done_requests = {
132
133 NULL, &done_requests.head
134 };
135
136 static HANDLE main_thread;
137
138 static MemAllocator *
139 squidaio_get_pool(int size)
140 {
141 if (size <= AIO_LARGE_BUFS) {
142 if (size <= AIO_MICRO_BUFS)
143 return squidaio_micro_bufs;
144 else if (size <= AIO_TINY_BUFS)
145 return squidaio_tiny_bufs;
146 else if (size <= AIO_SMALL_BUFS)
147 return squidaio_small_bufs;
148 else if (size <= AIO_MEDIUM_BUFS)
149 return squidaio_medium_bufs;
150 else
151 return squidaio_large_bufs;
152 }
153
154 return NULL;
155 }
156
157 void *
158 squidaio_xmalloc(int size)
159 {
160 void *p;
161 MemAllocator *pool;
162
163 if ((pool = squidaio_get_pool(size)) != NULL) {
164 p = pool->alloc();
165 } else
166 p = xmalloc(size);
167
168 return p;
169 }
170
171 static char *
172 squidaio_xstrdup(const char *str)
173 {
174 char *p;
175 int len = strlen(str) + 1;
176
177 p = (char *)squidaio_xmalloc(len);
178 strncpy(p, str, len);
179
180 return p;
181 }
182
183 void
184 squidaio_xfree(void *p, int size)
185 {
186 MemAllocator *pool;
187
188 if ((pool = squidaio_get_pool(size)) != NULL) {
189 pool->freeOne(p);
190 } else
191 xfree(p);
192 }
193
194 static void
195 squidaio_xstrfree(char *str)
196 {
197 MemAllocator *pool;
198 int len = strlen(str) + 1;
199
200 if ((pool = squidaio_get_pool(len)) != NULL) {
201 pool->freeOne(str);
202 } else
203 xfree(str);
204 }
205
206 void
207 squidaio_init(void)
208 {
209 int i;
210 squidaio_thread_t *threadp;
211
212 if (squidaio_initialised)
213 return;
214
215 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
216 GetCurrentThread(), /* pseudo handle to copy */
217 GetCurrentProcess(), /* pseudo handle, don't close */
218 &main_thread,
219 0, /* required access */
220 FALSE, /* child process's don't inherit the handle */
221 DUPLICATE_SAME_ACCESS)) {
222 /* spit errors */
223 fatal("Couldn't get current thread handle");
224 }
225
226 /* Initialize request queue */
227 if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
228 FALSE, /* start unowned (as per mutex_init) */
229 NULL) /* no name */
230 ) == NULL) {
231 fatal("Failed to create mutex");
232 }
233
234 if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
235 FALSE, /* auto signal reset - which I think is pthreads like ? */
236 FALSE, /* start non signaled */
237 NULL) /* no name */
238 ) == NULL) {
239 fatal("Failed to create condition variable");
240 }
241
242 request_queue.head = NULL;
243
244 request_queue.tailp = &request_queue.head;
245
246 request_queue.requests = 0;
247
248 request_queue.blocked = 0;
249
250 /* Initialize done queue */
251
252 if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
253 FALSE, /* start unowned (as per mutex_init) */
254 NULL) /* no name */
255 ) == NULL) {
256 fatal("Failed to create mutex");
257 }
258
259 if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
260 TRUE, /* manually signaled - which I think is pthreads like ? */
261 FALSE, /* start non signaled */
262 NULL) /* no name */
263 ) == NULL) {
264 fatal("Failed to create condition variable");
265 }
266
267 done_queue.head = NULL;
268
269 done_queue.tailp = &done_queue.head;
270
271 done_queue.requests = 0;
272
273 done_queue.blocked = 0;
274
275 // Initialize the thread I/O pipes before creating any threads
276 // see bug 3189 comment 5 about race conditions.
277 CommIO::Initialize();
278
279 /* Create threads and get them to sit in their wait loop */
280 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
281
282 assert(NUMTHREADS);
283
284 for (i = 0; i < NUMTHREADS; ++i) {
285 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
286 threadp->status = _THREAD_STARTING;
287 threadp->current_req = NULL;
288 threadp->requests = 0;
289 threadp->next = threads;
290 threads = threadp;
291
292 if ((threadp->thread = CreateThread(NULL, /* no security attributes */
293 0, /* use default stack size */
294 squidaio_thread_loop, /* thread function */
295 threadp, /* argument to thread function */
296 0, /* use default creation flags */
297 &(threadp->dwThreadId)) /* returns the thread identifier */
298 ) == NULL) {
299 fprintf(stderr, "Thread creation failed\n");
300 threadp->status = _THREAD_FAILED;
301 continue;
302 }
303
304 /* Set the new thread priority above parent process */
305 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
306 }
307
308 /* Create request pool */
309 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
310
311 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
312
313 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
314
315 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
316
317 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
318
319 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
320
321 squidaio_initialised = 1;
322 }
323
324 void
325 squidaio_shutdown(void)
326 {
327 squidaio_thread_t *threadp;
328 int i;
329 HANDLE * hthreads;
330
331 if (!squidaio_initialised)
332 return;
333
334 /* This is the same as in squidaio_sync */
335 do {
336 squidaio_poll_queues();
337 } while (request_queue_len > 0);
338
339 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
340
341 threadp = threads;
342
343 for (i = 0; i < NUMTHREADS; ++i) {
344 threadp->exit = 1;
345 hthreads[i] = threadp->thread;
346 threadp = threadp->next;
347 }
348
349 ReleaseMutex(request_queue.mutex);
350 ResetEvent(request_queue.cond);
351 ReleaseMutex(done_queue.mutex);
352 ResetEvent(done_queue.cond);
353 Sleep(0);
354
355 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
356
357 for (i = 0; i < NUMTHREADS; ++i) {
358 CloseHandle(hthreads[i]);
359 }
360
361 CloseHandle(main_thread);
362 CommIO::NotifyIOClose();
363
364 squidaio_initialised = 0;
365 xfree(hthreads);
366 }
367
368 static DWORD WINAPI
369 squidaio_thread_loop(LPVOID lpParam)
370 {
371 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
372 squidaio_request_t *request;
373 HANDLE cond; /* local copy of the event queue because win32 event handles
374 * don't atomically release the mutex as cond variables do. */
375
376 /* lock the thread info */
377
378 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
379 fatal("Can't get ownership of mutex\n");
380 }
381
382 /* duplicate the handle */
383 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
384 request_queue.cond, /* handle to copy */
385 GetCurrentProcess(), /* pseudo handle, don't close */
386 &cond,
387 0, /* required access */
388 FALSE, /* child process's don't inherit the handle */
389 DUPLICATE_SAME_ACCESS))
390 fatal("Can't duplicate mutex handle\n");
391
392 if (!ReleaseMutex(request_queue.mutex)) {
393 CloseHandle(cond);
394 fatal("Can't release mutex\n");
395 }
396
397 Sleep(0);
398
399 while (1) {
400 DWORD rv;
401 threadp->current_req = request = NULL;
402 request = NULL;
403 /* Get a request to process */
404 threadp->status = _THREAD_WAITING;
405
406 if (threadp->exit) {
407 CloseHandle(request_queue.mutex);
408 CloseHandle(cond);
409 return 0;
410 }
411
412 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
413
414 if (rv == WAIT_FAILED) {
415 CloseHandle(cond);
416 return 1;
417 }
418
419 while (!request_queue.head) {
420 if (!ReleaseMutex(request_queue.mutex)) {
421 CloseHandle(cond);
422 threadp->status = _THREAD_FAILED;
423 return 1;
424 }
425
426 Sleep(0);
427 rv = WaitForSingleObject(cond, INFINITE);
428
429 if (rv == WAIT_FAILED) {
430 CloseHandle(cond);
431 return 1;
432 }
433
434 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
435
436 if (rv == WAIT_FAILED) {
437 CloseHandle(cond);
438 return 1;
439 }
440 }
441
442 request = request_queue.head;
443
444 if (request)
445 request_queue.head = request->next;
446
447 if (!request_queue.head)
448 request_queue.tailp = &request_queue.head;
449
450 if (!ReleaseMutex(request_queue.mutex)) {
451 CloseHandle(cond);
452 return 1;
453 }
454
455 Sleep(0);
456
457 /* process the request */
458 threadp->status = _THREAD_BUSY;
459
460 request->next = NULL;
461
462 threadp->current_req = request;
463
464 errno = 0;
465
466 if (!request->cancelled) {
467 switch (request->request_type) {
468
469 case _AIO_OP_OPEN:
470 squidaio_do_open(request);
471 break;
472
473 case _AIO_OP_READ:
474 squidaio_do_read(request);
475 break;
476
477 case _AIO_OP_WRITE:
478 squidaio_do_write(request);
479 break;
480
481 case _AIO_OP_CLOSE:
482 squidaio_do_close(request);
483 break;
484
485 case _AIO_OP_UNLINK:
486 squidaio_do_unlink(request);
487 break;
488
489 #if AIO_OPENDIR /* Opendir not implemented yet */
490
491 case _AIO_OP_OPENDIR:
492 squidaio_do_opendir(request);
493 break;
494 #endif
495
496 case _AIO_OP_STAT:
497 squidaio_do_stat(request);
498 break;
499
500 default:
501 request->ret = -1;
502 request->err = EINVAL;
503 break;
504 }
505 } else { /* cancelled */
506 request->ret = -1;
507 request->err = EINTR;
508 }
509
510 threadp->status = _THREAD_DONE;
511 /* put the request in the done queue */
512 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
513
514 if (rv == WAIT_FAILED) {
515 CloseHandle(cond);
516 return 1;
517 }
518
519 *done_queue.tailp = request;
520 done_queue.tailp = &request->next;
521
522 if (!ReleaseMutex(done_queue.mutex)) {
523 CloseHandle(cond);
524 return 1;
525 }
526
527 CommIO::NotifyIOCompleted();
528 Sleep(0);
529 ++ threadp->requests;
530 } /* while forever */
531
532 CloseHandle(cond);
533
534 return 0;
535 } /* squidaio_thread_loop */
536
537 static void
538 squidaio_queue_request(squidaio_request_t * request)
539 {
540 static int high_start = 0;
541 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
542 /* Mark it as not executed (failing result, no error) */
543 request->ret = -1;
544 request->err = 0;
545 /* Internal housekeeping */
546 request_queue_len += 1;
547 request->resultp->_data = request;
548 /* Play some tricks with the request_queue2 queue */
549 request->next = NULL;
550
551 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
552 if (request_queue2.head) {
553 /* Grab blocked requests */
554 *request_queue.tailp = request_queue2.head;
555 request_queue.tailp = request_queue2.tailp;
556 }
557
558 /* Enqueue request */
559 *request_queue.tailp = request;
560
561 request_queue.tailp = &request->next;
562
563 if (!SetEvent(request_queue.cond))
564 fatal("Couldn't push queue");
565
566 if (!ReleaseMutex(request_queue.mutex)) {
567 /* unexpected error */
568 fatal("Couldn't push queue");
569 }
570
571 Sleep(0);
572
573 if (request_queue2.head) {
574 /* Clear queue of blocked requests */
575 request_queue2.head = NULL;
576 request_queue2.tailp = &request_queue2.head;
577 }
578 } else {
579 /* Oops, the request queue is blocked, use request_queue2 */
580 *request_queue2.tailp = request;
581 request_queue2.tailp = &request->next;
582 }
583
584 if (request_queue2.head) {
585 static uint64_t filter = 0;
586 static uint64_t filter_limit = 8196;
587
588 if (++filter >= filter_limit) {
589 filter_limit += filter;
590 filter = 0;
591 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion (growing to " << filter_limit << ")");
592 }
593 }
594
595 /* Warn if out of threads */
596 if (request_queue_len > MAGIC1) {
597 static int last_warn = 0;
598 static int queue_high, queue_low;
599
600 if (high_start == 0) {
601 high_start = (int)squid_curtime;
602 queue_high = request_queue_len;
603 queue_low = request_queue_len;
604 }
605
606 if (request_queue_len > queue_high)
607 queue_high = request_queue_len;
608
609 if (request_queue_len < queue_low)
610 queue_low = request_queue_len;
611
612 if (squid_curtime >= (last_warn + 15) &&
613 squid_curtime >= (high_start + 5)) {
614 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
615
616 if (squid_curtime >= (high_start + 15))
617 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
618 request_queue_len << ", high=" << queue_high <<
619 ", low=" << queue_low << ", duration=" <<
620 (long int) (squid_curtime - high_start));
621
622 last_warn = (int)squid_curtime;
623 }
624 } else {
625 high_start = 0;
626 }
627
628 /* Warn if seriously overloaded */
629 if (request_queue_len > RIDICULOUS_LENGTH) {
630 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
631 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
632 squidaio_sync();
633 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
634 }
635 } /* squidaio_queue_request */
636
637 static void
638 squidaio_cleanup_request(squidaio_request_t * requestp)
639 {
640 squidaio_result_t *resultp = requestp->resultp;
641 int cancelled = requestp->cancelled;
642
643 /* Free allocated structures and copy data back to user space if the */
644 /* request hasn't been cancelled */
645
646 switch (requestp->request_type) {
647
648 case _AIO_OP_STAT:
649
650 if (!cancelled && requestp->ret == 0)
651 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
652
653 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
654
655 squidaio_xstrfree(requestp->path);
656
657 break;
658
659 case _AIO_OP_OPEN:
660 if (cancelled && requestp->ret >= 0)
661 /* The open() was cancelled but completed */
662 close(requestp->ret);
663
664 squidaio_xstrfree(requestp->path);
665
666 break;
667
668 case _AIO_OP_CLOSE:
669 if (cancelled && requestp->ret < 0)
670 /* The close() was cancelled and never got executed */
671 close(requestp->fd);
672
673 break;
674
675 case _AIO_OP_UNLINK:
676
677 case _AIO_OP_OPENDIR:
678 squidaio_xstrfree(requestp->path);
679
680 break;
681
682 case _AIO_OP_READ:
683 break;
684
685 case _AIO_OP_WRITE:
686 break;
687
688 default:
689 break;
690 }
691
692 if (resultp != NULL && !cancelled) {
693 resultp->aio_return = requestp->ret;
694 resultp->aio_errno = requestp->err;
695 }
696
697 squidaio_request_pool->freeOne(requestp);
698 } /* squidaio_cleanup_request */
699
700 int
701 squidaio_cancel(squidaio_result_t * resultp)
702 {
703 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
704
705 if (request && request->resultp == resultp) {
706 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
707 request->cancelled = 1;
708 request->resultp = NULL;
709 resultp->_data = NULL;
710 resultp->result_type = _AIO_OP_NONE;
711 return 0;
712 }
713
714 return 1;
715 } /* squidaio_cancel */
716
717 int
718 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
719 {
720 squidaio_init();
721 squidaio_request_t *requestp;
722
723 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
724
725 requestp->path = (char *) squidaio_xstrdup(path);
726
727 requestp->oflag = oflag;
728
729 requestp->mode = mode;
730
731 requestp->resultp = resultp;
732
733 requestp->request_type = _AIO_OP_OPEN;
734
735 requestp->cancelled = 0;
736
737 resultp->result_type = _AIO_OP_OPEN;
738
739 squidaio_queue_request(requestp);
740
741 return 0;
742 }
743
744 static void
745 squidaio_do_open(squidaio_request_t * requestp)
746 {
747 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
748 requestp->err = errno;
749 }
750
751 int
752 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
753 {
754 squidaio_request_t *requestp;
755
756 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
757
758 requestp->fd = fd;
759
760 requestp->bufferp = bufp;
761
762 requestp->buflen = bufs;
763
764 requestp->offset = offset;
765
766 requestp->whence = whence;
767
768 requestp->resultp = resultp;
769
770 requestp->request_type = _AIO_OP_READ;
771
772 requestp->cancelled = 0;
773
774 resultp->result_type = _AIO_OP_READ;
775
776 squidaio_queue_request(requestp);
777
778 return 0;
779 }
780
781 static void
782 squidaio_do_read(squidaio_request_t * requestp)
783 {
784 lseek(requestp->fd, requestp->offset, requestp->whence);
785
786 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
787 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
788 WIN32_maperror(GetLastError());
789 requestp->ret = -1;
790 }
791
792 requestp->err = errno;
793 }
794
795 int
796 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
797 {
798 squidaio_request_t *requestp;
799
800 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
801
802 requestp->fd = fd;
803
804 requestp->bufferp = bufp;
805
806 requestp->buflen = bufs;
807
808 requestp->offset = offset;
809
810 requestp->whence = whence;
811
812 requestp->resultp = resultp;
813
814 requestp->request_type = _AIO_OP_WRITE;
815
816 requestp->cancelled = 0;
817
818 resultp->result_type = _AIO_OP_WRITE;
819
820 squidaio_queue_request(requestp);
821
822 return 0;
823 }
824
825 static void
826 squidaio_do_write(squidaio_request_t * requestp)
827 {
828 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
829 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
830 WIN32_maperror(GetLastError());
831 requestp->ret = -1;
832 }
833
834 requestp->err = errno;
835 }
836
837 int
838 squidaio_close(int fd, squidaio_result_t * resultp)
839 {
840 squidaio_request_t *requestp;
841
842 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
843
844 requestp->fd = fd;
845
846 requestp->resultp = resultp;
847
848 requestp->request_type = _AIO_OP_CLOSE;
849
850 requestp->cancelled = 0;
851
852 resultp->result_type = _AIO_OP_CLOSE;
853
854 squidaio_queue_request(requestp);
855
856 return 0;
857 }
858
859 static void
860 squidaio_do_close(squidaio_request_t * requestp)
861 {
862 if ((requestp->ret = close(requestp->fd)) < 0) {
863 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
864 close(requestp->fd);
865 }
866
867 requestp->err = errno;
868 }
869
870 int
871
872 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
873 {
874 squidaio_init();
875 squidaio_request_t *requestp;
876
877 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
878
879 requestp->path = (char *) squidaio_xstrdup(path);
880
881 requestp->statp = sb;
882
883 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
884
885 requestp->resultp = resultp;
886
887 requestp->request_type = _AIO_OP_STAT;
888
889 requestp->cancelled = 0;
890
891 resultp->result_type = _AIO_OP_STAT;
892
893 squidaio_queue_request(requestp);
894
895 return 0;
896 }
897
898 static void
899 squidaio_do_stat(squidaio_request_t * requestp)
900 {
901 requestp->ret = stat(requestp->path, requestp->tmpstatp);
902 requestp->err = errno;
903 }
904
905 int
906 squidaio_unlink(const char *path, squidaio_result_t * resultp)
907 {
908 squidaio_init();
909 squidaio_request_t *requestp;
910
911 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
912
913 requestp->path = squidaio_xstrdup(path);
914
915 requestp->resultp = resultp;
916
917 requestp->request_type = _AIO_OP_UNLINK;
918
919 requestp->cancelled = 0;
920
921 resultp->result_type = _AIO_OP_UNLINK;
922
923 squidaio_queue_request(requestp);
924
925 return 0;
926 }
927
928 static void
929 squidaio_do_unlink(squidaio_request_t * requestp)
930 {
931 requestp->ret = unlink(requestp->path);
932 requestp->err = errno;
933 }
934
935 #if AIO_OPENDIR
936 /* XXX squidaio_opendir NOT implemented yet.. */
937
938 int
939 squidaio_opendir(const char *path, squidaio_result_t * resultp)
940 {
941 squidaio_request_t *requestp;
942 int len;
943
944 requestp = squidaio_request_pool->alloc();
945
946 resultp->result_type = _AIO_OP_OPENDIR;
947
948 return -1;
949 }
950
951 static void
952 squidaio_do_opendir(squidaio_request_t * requestp)
953 {
954 /* NOT IMPLEMENTED */
955 }
956
957 #endif
958
959 static void
960 squidaio_poll_queues(void)
961 {
962 /* kick "overflow" request queue */
963
964 if (request_queue2.head &&
965 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
966 *request_queue.tailp = request_queue2.head;
967 request_queue.tailp = request_queue2.tailp;
968
969 if (!SetEvent(request_queue.cond))
970 fatal("couldn't push queue\n");
971
972 if (!ReleaseMutex(request_queue.mutex)) {
973 /* unexpected error */
974 }
975
976 Sleep(0);
977 request_queue2.head = NULL;
978 request_queue2.tailp = &request_queue2.head;
979 }
980
981 /* poll done queue */
982 if (done_queue.head &&
983 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
984
985 struct squidaio_request_t *requests = done_queue.head;
986 done_queue.head = NULL;
987 done_queue.tailp = &done_queue.head;
988
989 if (!ReleaseMutex(done_queue.mutex)) {
990 /* unexpected error */
991 }
992
993 Sleep(0);
994 *done_requests.tailp = requests;
995 request_queue_len -= 1;
996
997 while (requests->next) {
998 requests = requests->next;
999 request_queue_len -= 1;
1000 }
1001
1002 done_requests.tailp = &requests->next;
1003 }
1004 }
1005
1006 squidaio_result_t *
1007 squidaio_poll_done(void)
1008 {
1009 squidaio_request_t *request;
1010 squidaio_result_t *resultp;
1011 int cancelled;
1012 int polled = 0;
1013
1014 AIO_REPOLL:
1015 request = done_requests.head;
1016
1017 if (request == NULL && !polled) {
1018 CommIO::ResetNotifications();
1019 squidaio_poll_queues();
1020 polled = 1;
1021 request = done_requests.head;
1022 }
1023
1024 if (!request) {
1025 return NULL;
1026 }
1027
1028 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1029 done_requests.head = request->next;
1030
1031 if (!done_requests.head)
1032 done_requests.tailp = &done_requests.head;
1033
1034 resultp = request->resultp;
1035
1036 cancelled = request->cancelled;
1037
1038 squidaio_debug(request);
1039
1040 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1041
1042 squidaio_cleanup_request(request);
1043
1044 if (cancelled)
1045 goto AIO_REPOLL;
1046
1047 return resultp;
1048 } /* squidaio_poll_done */
1049
1050 int
1051 squidaio_operations_pending(void)
1052 {
1053 return request_queue_len + (done_requests.head ? 1 : 0);
1054 }
1055
1056 int
1057 squidaio_sync(void)
1058 {
1059 /* XXX This might take a while if the queue is large.. */
1060
1061 do {
1062 squidaio_poll_queues();
1063 } while (request_queue_len > 0);
1064
1065 return squidaio_operations_pending();
1066 }
1067
1068 int
1069 squidaio_get_queue_len(void)
1070 {
1071 return request_queue_len;
1072 }
1073
1074 static void
1075 squidaio_debug(squidaio_request_t * request)
1076 {
1077 switch (request->request_type) {
1078
1079 case _AIO_OP_OPEN:
1080 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1081 break;
1082
1083 case _AIO_OP_READ:
1084 debugs(43, 5, "READ on fd: " << request->fd);
1085 break;
1086
1087 case _AIO_OP_WRITE:
1088 debugs(43, 5, "WRITE on fd: " << request->fd);
1089 break;
1090
1091 case _AIO_OP_CLOSE:
1092 debugs(43, 5, "CLOSE of fd: " << request->fd);
1093 break;
1094
1095 case _AIO_OP_UNLINK:
1096 debugs(43, 5, "UNLINK of " << request->path);
1097 break;
1098
1099 default:
1100 break;
1101 }
1102 }
1103
1104 void
1105 squidaio_stats(StoreEntry * sentry)
1106 {
1107 squidaio_thread_t *threadp;
1108 int i;
1109
1110 if (!squidaio_initialised)
1111 return;
1112
1113 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1114
1115 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1116
1117 threadp = threads;
1118
1119 for (i = 0; i < NUMTHREADS; ++i) {
1120 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1121 threadp = threadp->next;
1122 }
1123 }
1124