]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskThreads/aiops_win32.cc
Cleanup: un-wrap C++ header includes
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
1 /*
2 * DEBUG: section 43 Windows AIOPS
3 * AUTHOR: Stewart Forster <slf@connect.com.au>
4 * AUTHOR: Robert Collins <robertc@squid-cache.org>
5 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #include "squid.h"
36 #include "DiskIO/DiskThreads/CommIO.h"
37 #include "DiskThreads.h"
38 #include "fd.h"
39 #include "SquidConfig.h"
40 #include "SquidTime.h"
41 #include "Store.h"
42
43 #include <csignal>
44 #include <sys/stat.h>
45 #include <fcntl.h>
46 #include <errno.h>
47 #include <dirent.h>
48
49 #define RIDICULOUS_LENGTH 4096
50
51 enum _squidaio_thread_status {
52 _THREAD_STARTING = 0,
53 _THREAD_WAITING,
54 _THREAD_BUSY,
55 _THREAD_FAILED,
56 _THREAD_DONE
57 };
58 typedef enum _squidaio_thread_status squidaio_thread_status;
59
60 typedef struct squidaio_request_t {
61
62 struct squidaio_request_t *next;
63 squidaio_request_type request_type;
64 int cancelled;
65 char *path;
66 int oflag;
67 mode_t mode;
68 int fd;
69 char *bufferp;
70 char *tmpbufp;
71 size_t buflen;
72 off_t offset;
73 int whence;
74 int ret;
75 int err;
76
77 struct stat *tmpstatp;
78
79 struct stat *statp;
80 squidaio_result_t *resultp;
81 } squidaio_request_t;
82
83 typedef struct squidaio_request_queue_t {
84 HANDLE mutex;
85 HANDLE cond; /* See Event objects */
86 squidaio_request_t *volatile head;
87 squidaio_request_t *volatile *volatile tailp;
88 unsigned long requests;
89 unsigned long blocked; /* main failed to lock the queue */
90 } squidaio_request_queue_t;
91
92 typedef struct squidaio_thread_t squidaio_thread_t;
93
94 struct squidaio_thread_t {
95 squidaio_thread_t *next;
96 HANDLE thread;
97 DWORD dwThreadId; /* thread ID */
98 squidaio_thread_status status;
99
100 struct squidaio_request_t *current_req;
101 unsigned long requests;
102 int volatile exit;
103 };
104
105 static void squidaio_queue_request(squidaio_request_t *);
106 static void squidaio_cleanup_request(squidaio_request_t *);
107 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
108 static void squidaio_do_open(squidaio_request_t *);
109 static void squidaio_do_read(squidaio_request_t *);
110 static void squidaio_do_write(squidaio_request_t *);
111 static void squidaio_do_close(squidaio_request_t *);
112 static void squidaio_do_stat(squidaio_request_t *);
113 static void squidaio_do_unlink(squidaio_request_t *);
114 #if AIO_OPENDIR
115 static void *squidaio_do_opendir(squidaio_request_t *);
116 #endif
117 static void squidaio_debug(squidaio_request_t *);
118 static void squidaio_poll_queues(void);
119
120 static squidaio_thread_t *threads = NULL;
121 static int squidaio_initialised = 0;
122
123 #define AIO_LARGE_BUFS 16384
124 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
125 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
126 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
127 #define AIO_MICRO_BUFS 128
128
129 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
130 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
131 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
132 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
133 static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
134
135 static int request_queue_len = 0;
136 static MemAllocator *squidaio_request_pool = NULL;
137 static MemAllocator *squidaio_thread_pool = NULL;
138 static squidaio_request_queue_t request_queue;
139
140 static struct {
141 squidaio_request_t *head, **tailp;
142 }
143
144 request_queue2 = {
145
146 NULL, &request_queue2.head
147 };
148 static squidaio_request_queue_t done_queue;
149
150 static struct {
151 squidaio_request_t *head, **tailp;
152 }
153
154 done_requests = {
155
156 NULL, &done_requests.head
157 };
158
159 static HANDLE main_thread;
160
161 static MemAllocator *
162 squidaio_get_pool(int size)
163 {
164 if (size <= AIO_LARGE_BUFS) {
165 if (size <= AIO_MICRO_BUFS)
166 return squidaio_micro_bufs;
167 else if (size <= AIO_TINY_BUFS)
168 return squidaio_tiny_bufs;
169 else if (size <= AIO_SMALL_BUFS)
170 return squidaio_small_bufs;
171 else if (size <= AIO_MEDIUM_BUFS)
172 return squidaio_medium_bufs;
173 else
174 return squidaio_large_bufs;
175 }
176
177 return NULL;
178 }
179
180 void *
181 squidaio_xmalloc(int size)
182 {
183 void *p;
184 MemAllocator *pool;
185
186 if ((pool = squidaio_get_pool(size)) != NULL) {
187 p = pool->alloc();
188 } else
189 p = xmalloc(size);
190
191 return p;
192 }
193
194 static char *
195 squidaio_xstrdup(const char *str)
196 {
197 char *p;
198 int len = strlen(str) + 1;
199
200 p = (char *)squidaio_xmalloc(len);
201 strncpy(p, str, len);
202
203 return p;
204 }
205
206 void
207 squidaio_xfree(void *p, int size)
208 {
209 MemAllocator *pool;
210
211 if ((pool = squidaio_get_pool(size)) != NULL) {
212 pool->freeOne(p);
213 } else
214 xfree(p);
215 }
216
217 static void
218 squidaio_xstrfree(char *str)
219 {
220 MemAllocator *pool;
221 int len = strlen(str) + 1;
222
223 if ((pool = squidaio_get_pool(len)) != NULL) {
224 pool->freeOne(str);
225 } else
226 xfree(str);
227 }
228
229 void
230 squidaio_init(void)
231 {
232 int i;
233 squidaio_thread_t *threadp;
234
235 if (squidaio_initialised)
236 return;
237
238 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
239 GetCurrentThread(), /* pseudo handle to copy */
240 GetCurrentProcess(), /* pseudo handle, don't close */
241 &main_thread,
242 0, /* required access */
243 FALSE, /* child process's don't inherit the handle */
244 DUPLICATE_SAME_ACCESS)) {
245 /* spit errors */
246 fatal("Couldn't get current thread handle");
247 }
248
249 /* Initialize request queue */
250 if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
251 FALSE, /* start unowned (as per mutex_init) */
252 NULL) /* no name */
253 ) == NULL) {
254 fatal("Failed to create mutex");
255 }
256
257 if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
258 FALSE, /* auto signal reset - which I think is pthreads like ? */
259 FALSE, /* start non signaled */
260 NULL) /* no name */
261 ) == NULL) {
262 fatal("Failed to create condition variable");
263 }
264
265 request_queue.head = NULL;
266
267 request_queue.tailp = &request_queue.head;
268
269 request_queue.requests = 0;
270
271 request_queue.blocked = 0;
272
273 /* Initialize done queue */
274
275 if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
276 FALSE, /* start unowned (as per mutex_init) */
277 NULL) /* no name */
278 ) == NULL) {
279 fatal("Failed to create mutex");
280 }
281
282 if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
283 TRUE, /* manually signaled - which I think is pthreads like ? */
284 FALSE, /* start non signaled */
285 NULL) /* no name */
286 ) == NULL) {
287 fatal("Failed to create condition variable");
288 }
289
290 done_queue.head = NULL;
291
292 done_queue.tailp = &done_queue.head;
293
294 done_queue.requests = 0;
295
296 done_queue.blocked = 0;
297
298 // Initialize the thread I/O pipes before creating any threads
299 // see bug 3189 comment 5 about race conditions.
300 CommIO::Initialize();
301
302 /* Create threads and get them to sit in their wait loop */
303 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
304
305 assert(NUMTHREADS);
306
307 for (i = 0; i < NUMTHREADS; ++i) {
308 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
309 threadp->status = _THREAD_STARTING;
310 threadp->current_req = NULL;
311 threadp->requests = 0;
312 threadp->next = threads;
313 threads = threadp;
314
315 if ((threadp->thread = CreateThread(NULL, /* no security attributes */
316 0, /* use default stack size */
317 squidaio_thread_loop, /* thread function */
318 threadp, /* argument to thread function */
319 0, /* use default creation flags */
320 &(threadp->dwThreadId)) /* returns the thread identifier */
321 ) == NULL) {
322 fprintf(stderr, "Thread creation failed\n");
323 threadp->status = _THREAD_FAILED;
324 continue;
325 }
326
327 /* Set the new thread priority above parent process */
328 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
329 }
330
331 /* Create request pool */
332 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
333
334 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
335
336 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
337
338 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
339
340 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
341
342 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
343
344 squidaio_initialised = 1;
345 }
346
347 void
348 squidaio_shutdown(void)
349 {
350 squidaio_thread_t *threadp;
351 int i;
352 HANDLE * hthreads;
353
354 if (!squidaio_initialised)
355 return;
356
357 /* This is the same as in squidaio_sync */
358 do {
359 squidaio_poll_queues();
360 } while (request_queue_len > 0);
361
362 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
363
364 threadp = threads;
365
366 for (i = 0; i < NUMTHREADS; ++i) {
367 threadp->exit = 1;
368 hthreads[i] = threadp->thread;
369 threadp = threadp->next;
370 }
371
372 ReleaseMutex(request_queue.mutex);
373 ResetEvent(request_queue.cond);
374 ReleaseMutex(done_queue.mutex);
375 ResetEvent(done_queue.cond);
376 Sleep(0);
377
378 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
379
380 for (i = 0; i < NUMTHREADS; ++i) {
381 CloseHandle(hthreads[i]);
382 }
383
384 CloseHandle(main_thread);
385 CommIO::NotifyIOClose();
386
387 squidaio_initialised = 0;
388 xfree(hthreads);
389 }
390
391 static DWORD WINAPI
392 squidaio_thread_loop(LPVOID lpParam)
393 {
394 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
395 squidaio_request_t *request;
396 HANDLE cond; /* local copy of the event queue because win32 event handles
397 * don't atomically release the mutex as cond variables do. */
398
399 /* lock the thread info */
400
401 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
402 fatal("Can't get ownership of mutex\n");
403 }
404
405 /* duplicate the handle */
406 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
407 request_queue.cond, /* handle to copy */
408 GetCurrentProcess(), /* pseudo handle, don't close */
409 &cond,
410 0, /* required access */
411 FALSE, /* child process's don't inherit the handle */
412 DUPLICATE_SAME_ACCESS))
413 fatal("Can't duplicate mutex handle\n");
414
415 if (!ReleaseMutex(request_queue.mutex)) {
416 CloseHandle(cond);
417 fatal("Can't release mutex\n");
418 }
419
420 Sleep(0);
421
422 while (1) {
423 DWORD rv;
424 threadp->current_req = request = NULL;
425 request = NULL;
426 /* Get a request to process */
427 threadp->status = _THREAD_WAITING;
428
429 if (threadp->exit) {
430 CloseHandle(request_queue.mutex);
431 CloseHandle(cond);
432 return 0;
433 }
434
435 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
436
437 if (rv == WAIT_FAILED) {
438 CloseHandle(cond);
439 return 1;
440 }
441
442 while (!request_queue.head) {
443 if (!ReleaseMutex(request_queue.mutex)) {
444 CloseHandle(cond);
445 threadp->status = _THREAD_FAILED;
446 return 1;
447 }
448
449 Sleep(0);
450 rv = WaitForSingleObject(cond, INFINITE);
451
452 if (rv == WAIT_FAILED) {
453 CloseHandle(cond);
454 return 1;
455 }
456
457 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
458
459 if (rv == WAIT_FAILED) {
460 CloseHandle(cond);
461 return 1;
462 }
463 }
464
465 request = request_queue.head;
466
467 if (request)
468 request_queue.head = request->next;
469
470 if (!request_queue.head)
471 request_queue.tailp = &request_queue.head;
472
473 if (!ReleaseMutex(request_queue.mutex)) {
474 CloseHandle(cond);
475 return 1;
476 }
477
478 Sleep(0);
479
480 /* process the request */
481 threadp->status = _THREAD_BUSY;
482
483 request->next = NULL;
484
485 threadp->current_req = request;
486
487 errno = 0;
488
489 if (!request->cancelled) {
490 switch (request->request_type) {
491
492 case _AIO_OP_OPEN:
493 squidaio_do_open(request);
494 break;
495
496 case _AIO_OP_READ:
497 squidaio_do_read(request);
498 break;
499
500 case _AIO_OP_WRITE:
501 squidaio_do_write(request);
502 break;
503
504 case _AIO_OP_CLOSE:
505 squidaio_do_close(request);
506 break;
507
508 case _AIO_OP_UNLINK:
509 squidaio_do_unlink(request);
510 break;
511
512 #if AIO_OPENDIR /* Opendir not implemented yet */
513
514 case _AIO_OP_OPENDIR:
515 squidaio_do_opendir(request);
516 break;
517 #endif
518
519 case _AIO_OP_STAT:
520 squidaio_do_stat(request);
521 break;
522
523 default:
524 request->ret = -1;
525 request->err = EINVAL;
526 break;
527 }
528 } else { /* cancelled */
529 request->ret = -1;
530 request->err = EINTR;
531 }
532
533 threadp->status = _THREAD_DONE;
534 /* put the request in the done queue */
535 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
536
537 if (rv == WAIT_FAILED) {
538 CloseHandle(cond);
539 return 1;
540 }
541
542 *done_queue.tailp = request;
543 done_queue.tailp = &request->next;
544
545 if (!ReleaseMutex(done_queue.mutex)) {
546 CloseHandle(cond);
547 return 1;
548 }
549
550 CommIO::NotifyIOCompleted();
551 Sleep(0);
552 ++ threadp->requests;
553 } /* while forever */
554
555 CloseHandle(cond);
556
557 return 0;
558 } /* squidaio_thread_loop */
559
560 static void
561 squidaio_queue_request(squidaio_request_t * request)
562 {
563 static int high_start = 0;
564 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
565 /* Mark it as not executed (failing result, no error) */
566 request->ret = -1;
567 request->err = 0;
568 /* Internal housekeeping */
569 request_queue_len += 1;
570 request->resultp->_data = request;
571 /* Play some tricks with the request_queue2 queue */
572 request->next = NULL;
573
574 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
575 if (request_queue2.head) {
576 /* Grab blocked requests */
577 *request_queue.tailp = request_queue2.head;
578 request_queue.tailp = request_queue2.tailp;
579 }
580
581 /* Enqueue request */
582 *request_queue.tailp = request;
583
584 request_queue.tailp = &request->next;
585
586 if (!SetEvent(request_queue.cond))
587 fatal("Couldn't push queue");
588
589 if (!ReleaseMutex(request_queue.mutex)) {
590 /* unexpected error */
591 fatal("Couldn't push queue");
592 }
593
594 Sleep(0);
595
596 if (request_queue2.head) {
597 /* Clear queue of blocked requests */
598 request_queue2.head = NULL;
599 request_queue2.tailp = &request_queue2.head;
600 }
601 } else {
602 /* Oops, the request queue is blocked, use request_queue2 */
603 *request_queue2.tailp = request;
604 request_queue2.tailp = &request->next;
605 }
606
607 if (request_queue2.head) {
608 static int filter = 0;
609 static int filter_limit = 8;
610
611 if (++filter >= filter_limit) {
612 filter_limit += filter;
613 filter = 0;
614 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion");
615 }
616 }
617
618 /* Warn if out of threads */
619 if (request_queue_len > MAGIC1) {
620 static int last_warn = 0;
621 static int queue_high, queue_low;
622
623 if (high_start == 0) {
624 high_start = (int)squid_curtime;
625 queue_high = request_queue_len;
626 queue_low = request_queue_len;
627 }
628
629 if (request_queue_len > queue_high)
630 queue_high = request_queue_len;
631
632 if (request_queue_len < queue_low)
633 queue_low = request_queue_len;
634
635 if (squid_curtime >= (last_warn + 15) &&
636 squid_curtime >= (high_start + 5)) {
637 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
638
639 if (squid_curtime >= (high_start + 15))
640 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
641 request_queue_len << ", high=" << queue_high <<
642 ", low=" << queue_low << ", duration=" <<
643 (long int) (squid_curtime - high_start));
644
645 last_warn = (int)squid_curtime;
646 }
647 } else {
648 high_start = 0;
649 }
650
651 /* Warn if seriously overloaded */
652 if (request_queue_len > RIDICULOUS_LENGTH) {
653 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
654 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
655 squidaio_sync();
656 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
657 }
658 } /* squidaio_queue_request */
659
660 static void
661 squidaio_cleanup_request(squidaio_request_t * requestp)
662 {
663 squidaio_result_t *resultp = requestp->resultp;
664 int cancelled = requestp->cancelled;
665
666 /* Free allocated structures and copy data back to user space if the */
667 /* request hasn't been cancelled */
668
669 switch (requestp->request_type) {
670
671 case _AIO_OP_STAT:
672
673 if (!cancelled && requestp->ret == 0)
674 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
675
676 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
677
678 squidaio_xstrfree(requestp->path);
679
680 break;
681
682 case _AIO_OP_OPEN:
683 if (cancelled && requestp->ret >= 0)
684 /* The open() was cancelled but completed */
685 close(requestp->ret);
686
687 squidaio_xstrfree(requestp->path);
688
689 break;
690
691 case _AIO_OP_CLOSE:
692 if (cancelled && requestp->ret < 0)
693 /* The close() was cancelled and never got executed */
694 close(requestp->fd);
695
696 break;
697
698 case _AIO_OP_UNLINK:
699
700 case _AIO_OP_OPENDIR:
701 squidaio_xstrfree(requestp->path);
702
703 break;
704
705 case _AIO_OP_READ:
706 break;
707
708 case _AIO_OP_WRITE:
709 break;
710
711 default:
712 break;
713 }
714
715 if (resultp != NULL && !cancelled) {
716 resultp->aio_return = requestp->ret;
717 resultp->aio_errno = requestp->err;
718 }
719
720 squidaio_request_pool->freeOne(requestp);
721 } /* squidaio_cleanup_request */
722
723 int
724 squidaio_cancel(squidaio_result_t * resultp)
725 {
726 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
727
728 if (request && request->resultp == resultp) {
729 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
730 request->cancelled = 1;
731 request->resultp = NULL;
732 resultp->_data = NULL;
733 resultp->result_type = _AIO_OP_NONE;
734 return 0;
735 }
736
737 return 1;
738 } /* squidaio_cancel */
739
740 int
741 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
742 {
743 squidaio_init();
744 squidaio_request_t *requestp;
745
746 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
747
748 requestp->path = (char *) squidaio_xstrdup(path);
749
750 requestp->oflag = oflag;
751
752 requestp->mode = mode;
753
754 requestp->resultp = resultp;
755
756 requestp->request_type = _AIO_OP_OPEN;
757
758 requestp->cancelled = 0;
759
760 resultp->result_type = _AIO_OP_OPEN;
761
762 squidaio_queue_request(requestp);
763
764 return 0;
765 }
766
767 static void
768 squidaio_do_open(squidaio_request_t * requestp)
769 {
770 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
771 requestp->err = errno;
772 }
773
774 int
775 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
776 {
777 squidaio_request_t *requestp;
778
779 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
780
781 requestp->fd = fd;
782
783 requestp->bufferp = bufp;
784
785 requestp->buflen = bufs;
786
787 requestp->offset = offset;
788
789 requestp->whence = whence;
790
791 requestp->resultp = resultp;
792
793 requestp->request_type = _AIO_OP_READ;
794
795 requestp->cancelled = 0;
796
797 resultp->result_type = _AIO_OP_READ;
798
799 squidaio_queue_request(requestp);
800
801 return 0;
802 }
803
804 static void
805 squidaio_do_read(squidaio_request_t * requestp)
806 {
807 lseek(requestp->fd, requestp->offset, requestp->whence);
808
809 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
810 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
811 WIN32_maperror(GetLastError());
812 requestp->ret = -1;
813 }
814
815 requestp->err = errno;
816 }
817
818 int
819 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
820 {
821 squidaio_request_t *requestp;
822
823 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
824
825 requestp->fd = fd;
826
827 requestp->bufferp = bufp;
828
829 requestp->buflen = bufs;
830
831 requestp->offset = offset;
832
833 requestp->whence = whence;
834
835 requestp->resultp = resultp;
836
837 requestp->request_type = _AIO_OP_WRITE;
838
839 requestp->cancelled = 0;
840
841 resultp->result_type = _AIO_OP_WRITE;
842
843 squidaio_queue_request(requestp);
844
845 return 0;
846 }
847
848 static void
849 squidaio_do_write(squidaio_request_t * requestp)
850 {
851 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
852 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
853 WIN32_maperror(GetLastError());
854 requestp->ret = -1;
855 }
856
857 requestp->err = errno;
858 }
859
860 int
861 squidaio_close(int fd, squidaio_result_t * resultp)
862 {
863 squidaio_request_t *requestp;
864
865 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
866
867 requestp->fd = fd;
868
869 requestp->resultp = resultp;
870
871 requestp->request_type = _AIO_OP_CLOSE;
872
873 requestp->cancelled = 0;
874
875 resultp->result_type = _AIO_OP_CLOSE;
876
877 squidaio_queue_request(requestp);
878
879 return 0;
880 }
881
882 static void
883 squidaio_do_close(squidaio_request_t * requestp)
884 {
885 if ((requestp->ret = close(requestp->fd)) < 0) {
886 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
887 close(requestp->fd);
888 }
889
890 requestp->err = errno;
891 }
892
893 int
894
895 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
896 {
897 squidaio_init();
898 squidaio_request_t *requestp;
899
900 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
901
902 requestp->path = (char *) squidaio_xstrdup(path);
903
904 requestp->statp = sb;
905
906 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
907
908 requestp->resultp = resultp;
909
910 requestp->request_type = _AIO_OP_STAT;
911
912 requestp->cancelled = 0;
913
914 resultp->result_type = _AIO_OP_STAT;
915
916 squidaio_queue_request(requestp);
917
918 return 0;
919 }
920
921 static void
922 squidaio_do_stat(squidaio_request_t * requestp)
923 {
924 requestp->ret = stat(requestp->path, requestp->tmpstatp);
925 requestp->err = errno;
926 }
927
928 int
929 squidaio_unlink(const char *path, squidaio_result_t * resultp)
930 {
931 squidaio_init();
932 squidaio_request_t *requestp;
933
934 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
935
936 requestp->path = squidaio_xstrdup(path);
937
938 requestp->resultp = resultp;
939
940 requestp->request_type = _AIO_OP_UNLINK;
941
942 requestp->cancelled = 0;
943
944 resultp->result_type = _AIO_OP_UNLINK;
945
946 squidaio_queue_request(requestp);
947
948 return 0;
949 }
950
951 static void
952 squidaio_do_unlink(squidaio_request_t * requestp)
953 {
954 requestp->ret = unlink(requestp->path);
955 requestp->err = errno;
956 }
957
958 #if AIO_OPENDIR
959 /* XXX squidaio_opendir NOT implemented yet.. */
960
961 int
962 squidaio_opendir(const char *path, squidaio_result_t * resultp)
963 {
964 squidaio_request_t *requestp;
965 int len;
966
967 requestp = squidaio_request_pool->alloc();
968
969 resultp->result_type = _AIO_OP_OPENDIR;
970
971 return -1;
972 }
973
974 static void
975 squidaio_do_opendir(squidaio_request_t * requestp)
976 {
977 /* NOT IMPLEMENTED */
978 }
979
980 #endif
981
982 static void
983 squidaio_poll_queues(void)
984 {
985 /* kick "overflow" request queue */
986
987 if (request_queue2.head &&
988 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
989 *request_queue.tailp = request_queue2.head;
990 request_queue.tailp = request_queue2.tailp;
991
992 if (!SetEvent(request_queue.cond))
993 fatal("couldn't push queue\n");
994
995 if (!ReleaseMutex(request_queue.mutex)) {
996 /* unexpected error */
997 }
998
999 Sleep(0);
1000 request_queue2.head = NULL;
1001 request_queue2.tailp = &request_queue2.head;
1002 }
1003
1004 /* poll done queue */
1005 if (done_queue.head &&
1006 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
1007
1008 struct squidaio_request_t *requests = done_queue.head;
1009 done_queue.head = NULL;
1010 done_queue.tailp = &done_queue.head;
1011
1012 if (!ReleaseMutex(done_queue.mutex)) {
1013 /* unexpected error */
1014 }
1015
1016 Sleep(0);
1017 *done_requests.tailp = requests;
1018 request_queue_len -= 1;
1019
1020 while (requests->next) {
1021 requests = requests->next;
1022 request_queue_len -= 1;
1023 }
1024
1025 done_requests.tailp = &requests->next;
1026 }
1027 }
1028
1029 squidaio_result_t *
1030 squidaio_poll_done(void)
1031 {
1032 squidaio_request_t *request;
1033 squidaio_result_t *resultp;
1034 int cancelled;
1035 int polled = 0;
1036
1037 AIO_REPOLL:
1038 request = done_requests.head;
1039
1040 if (request == NULL && !polled) {
1041 CommIO::ResetNotifications();
1042 squidaio_poll_queues();
1043 polled = 1;
1044 request = done_requests.head;
1045 }
1046
1047 if (!request) {
1048 return NULL;
1049 }
1050
1051 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1052 done_requests.head = request->next;
1053
1054 if (!done_requests.head)
1055 done_requests.tailp = &done_requests.head;
1056
1057 resultp = request->resultp;
1058
1059 cancelled = request->cancelled;
1060
1061 squidaio_debug(request);
1062
1063 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1064
1065 squidaio_cleanup_request(request);
1066
1067 if (cancelled)
1068 goto AIO_REPOLL;
1069
1070 return resultp;
1071 } /* squidaio_poll_done */
1072
1073 int
1074 squidaio_operations_pending(void)
1075 {
1076 return request_queue_len + (done_requests.head ? 1 : 0);
1077 }
1078
1079 int
1080 squidaio_sync(void)
1081 {
1082 /* XXX This might take a while if the queue is large.. */
1083
1084 do {
1085 squidaio_poll_queues();
1086 } while (request_queue_len > 0);
1087
1088 return squidaio_operations_pending();
1089 }
1090
1091 int
1092 squidaio_get_queue_len(void)
1093 {
1094 return request_queue_len;
1095 }
1096
1097 static void
1098 squidaio_debug(squidaio_request_t * request)
1099 {
1100 switch (request->request_type) {
1101
1102 case _AIO_OP_OPEN:
1103 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1104 break;
1105
1106 case _AIO_OP_READ:
1107 debugs(43, 5, "READ on fd: " << request->fd);
1108 break;
1109
1110 case _AIO_OP_WRITE:
1111 debugs(43, 5, "WRITE on fd: " << request->fd);
1112 break;
1113
1114 case _AIO_OP_CLOSE:
1115 debugs(43, 5, "CLOSE of fd: " << request->fd);
1116 break;
1117
1118 case _AIO_OP_UNLINK:
1119 debugs(43, 5, "UNLINK of " << request->path);
1120 break;
1121
1122 default:
1123 break;
1124 }
1125 }
1126
1127 void
1128 squidaio_stats(StoreEntry * sentry)
1129 {
1130 squidaio_thread_t *threadp;
1131 int i;
1132
1133 if (!squidaio_initialised)
1134 return;
1135
1136 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1137
1138 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1139
1140 threadp = threads;
1141
1142 for (i = 0; i < NUMTHREADS; ++i) {
1143 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1144 threadp = threadp->next;
1145 }
1146 }