]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskThreads/aiops_win32.cc
Cleanup: zap CVS Id tags
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 43 Windows AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 * AUTHOR: Robert Collins <robertc@squid-cache.org>
7 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
8 *
9 * SQUID Web Proxy Cache http://www.squid-cache.org/
10 * ----------------------------------------------------------
11 *
12 * Squid is the result of efforts by numerous individuals from
13 * the Internet community; see the CONTRIBUTORS file for full
14 * details. Many organizations have provided support for Squid's
15 * development; see the SPONSORS file for full details. Squid is
16 * Copyrighted (C) 2001 by the Regents of the University of
17 * California; see the COPYRIGHT file for full details. Squid
18 * incorporates software developed and/or copyrighted by other
19 * sources; see the CREDITS file for full details.
20 *
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
25 *
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
30 *
31 * You should have received a copy of the GNU General Public License
32 * along with this program; if not, write to the Free Software
33 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
34 *
35 */
36
37 #include "squid.h"
38 #include "squid_windows.h"
39 #include "DiskThreads.h"
40
41 #include <stdio.h>
42 #include <sys/types.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46 #include <dirent.h>
47 #include <signal.h>
48 #include "CommIO.h"
49 #include "SquidTime.h"
50 #include "Store.h"
51
52 #define RIDICULOUS_LENGTH 4096
53
54 enum _squidaio_thread_status {
55 _THREAD_STARTING = 0,
56 _THREAD_WAITING,
57 _THREAD_BUSY,
58 _THREAD_FAILED,
59 _THREAD_DONE
60 };
61 typedef enum _squidaio_thread_status squidaio_thread_status;
62
63 typedef struct squidaio_request_t {
64
65 struct squidaio_request_t *next;
66 squidaio_request_type request_type;
67 int cancelled;
68 char *path;
69 int oflag;
70 mode_t mode;
71 int fd;
72 char *bufferp;
73 char *tmpbufp;
74 size_t buflen;
75 off_t offset;
76 int whence;
77 int ret;
78 int err;
79
80 struct stat *tmpstatp;
81
82 struct stat *statp;
83 squidaio_result_t *resultp;
84 } squidaio_request_t;
85
86 typedef struct squidaio_request_queue_t {
87 HANDLE mutex;
88 HANDLE cond; /* See Event objects */
89 squidaio_request_t *volatile head;
90 squidaio_request_t *volatile *volatile tailp;
91 unsigned long requests;
92 unsigned long blocked; /* main failed to lock the queue */
93 } squidaio_request_queue_t;
94
95 typedef struct squidaio_thread_t squidaio_thread_t;
96
97 struct squidaio_thread_t {
98 squidaio_thread_t *next;
99 HANDLE thread;
100 DWORD dwThreadId; /* thread ID */
101 squidaio_thread_status status;
102
103 struct squidaio_request_t *current_req;
104 unsigned long requests;
105 int volatile exit;
106 };
107
108 static void squidaio_queue_request(squidaio_request_t *);
109 static void squidaio_cleanup_request(squidaio_request_t *);
110 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
111 static void squidaio_do_open(squidaio_request_t *);
112 static void squidaio_do_read(squidaio_request_t *);
113 static void squidaio_do_write(squidaio_request_t *);
114 static void squidaio_do_close(squidaio_request_t *);
115 static void squidaio_do_stat(squidaio_request_t *);
116 static void squidaio_do_unlink(squidaio_request_t *);
117 #if AIO_OPENDIR
118 static void *squidaio_do_opendir(squidaio_request_t *);
119 #endif
120 static void squidaio_debug(squidaio_request_t *);
121 static void squidaio_poll_queues(void);
122
123 static squidaio_thread_t *threads = NULL;
124 static int squidaio_initialised = 0;
125
126
127 #define AIO_LARGE_BUFS 16384
128 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
129 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
130 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
131 #define AIO_MICRO_BUFS 128
132
133 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
134 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
135 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
136 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
137 static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
138
139 static int request_queue_len = 0;
140 static MemAllocator *squidaio_request_pool = NULL;
141 static MemAllocator *squidaio_thread_pool = NULL;
142 static squidaio_request_queue_t request_queue;
143
144 static struct {
145 squidaio_request_t *head, **tailp;
146 }
147
148 request_queue2 = {
149
150 NULL, &request_queue2.head
151 };
152 static squidaio_request_queue_t done_queue;
153
154 static struct {
155 squidaio_request_t *head, **tailp;
156 }
157
158 done_requests = {
159
160 NULL, &done_requests.head
161 };
162
163 static HANDLE main_thread;
164
165 static MemAllocator *
166 squidaio_get_pool(int size)
167 {
168 if (size <= AIO_LARGE_BUFS) {
169 if (size <= AIO_MICRO_BUFS)
170 return squidaio_micro_bufs;
171 else if (size <= AIO_TINY_BUFS)
172 return squidaio_tiny_bufs;
173 else if (size <= AIO_SMALL_BUFS)
174 return squidaio_small_bufs;
175 else if (size <= AIO_MEDIUM_BUFS)
176 return squidaio_medium_bufs;
177 else
178 return squidaio_large_bufs;
179 }
180
181 return NULL;
182 }
183
184 void *
185 squidaio_xmalloc(int size)
186 {
187 void *p;
188 MemAllocator *pool;
189
190 if ((pool = squidaio_get_pool(size)) != NULL) {
191 p = pool->alloc();
192 } else
193 p = xmalloc(size);
194
195 return p;
196 }
197
198 static char *
199 squidaio_xstrdup(const char *str)
200 {
201 char *p;
202 int len = strlen(str) + 1;
203
204 p = (char *)squidaio_xmalloc(len);
205 strncpy(p, str, len);
206
207 return p;
208 }
209
210 void
211 squidaio_xfree(void *p, int size)
212 {
213 MemAllocator *pool;
214
215 if ((pool = squidaio_get_pool(size)) != NULL) {
216 pool->free(p);
217 } else
218 xfree(p);
219 }
220
221 static void
222 squidaio_xstrfree(char *str)
223 {
224 MemAllocator *pool;
225 int len = strlen(str) + 1;
226
227 if ((pool = squidaio_get_pool(len)) != NULL) {
228 pool->free(str);
229 } else
230 xfree(str);
231 }
232
233 void
234 squidaio_init(void)
235 {
236 int i;
237 squidaio_thread_t *threadp;
238
239 if (squidaio_initialised)
240 return;
241
242 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
243 GetCurrentThread(), /* pseudo handle to copy */
244 GetCurrentProcess(), /* pseudo handle, don't close */
245 &main_thread,
246 0, /* required access */
247 FALSE, /* child process's don't inherit the handle */
248 DUPLICATE_SAME_ACCESS)) {
249 /* spit errors */
250 fatal("Couldn't get current thread handle");
251 }
252
253 /* Initialize request queue */
254 if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
255 FALSE, /* start unowned (as per mutex_init) */
256 NULL) /* no name */
257 ) == NULL) {
258 fatal("Failed to create mutex");
259 }
260
261 if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
262 FALSE, /* auto signal reset - which I think is pthreads like ? */
263 FALSE, /* start non signaled */
264 NULL) /* no name */
265 ) == NULL) {
266 fatal("Failed to create condition variable");
267 }
268
269 request_queue.head = NULL;
270
271 request_queue.tailp = &request_queue.head;
272
273 request_queue.requests = 0;
274
275 request_queue.blocked = 0;
276
277 /* Initialize done queue */
278
279 if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
280 FALSE, /* start unowned (as per mutex_init) */
281 NULL) /* no name */
282 ) == NULL) {
283 fatal("Failed to create mutex");
284 }
285
286 if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
287 TRUE, /* manually signaled - which I think is pthreads like ? */
288 FALSE, /* start non signaled */
289 NULL) /* no name */
290 ) == NULL) {
291 fatal("Failed to create condition variable");
292 }
293
294 done_queue.head = NULL;
295
296 done_queue.tailp = &done_queue.head;
297
298 done_queue.requests = 0;
299
300 done_queue.blocked = 0;
301
302 CommIO::NotifyIOCompleted();
303
304 /* Create threads and get them to sit in their wait loop */
305 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
306
307 assert(NUMTHREADS);
308
309 for (i = 0; i < NUMTHREADS; i++) {
310 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
311 threadp->status = _THREAD_STARTING;
312 threadp->current_req = NULL;
313 threadp->requests = 0;
314 threadp->next = threads;
315 threads = threadp;
316
317 if ((threadp->thread = CreateThread(NULL, /* no security attributes */
318 0, /* use default stack size */
319 squidaio_thread_loop, /* thread function */
320 threadp, /* argument to thread function */
321 0, /* use default creation flags */
322 &(threadp->dwThreadId)) /* returns the thread identifier */
323 ) == NULL) {
324 fprintf(stderr, "Thread creation failed\n");
325 threadp->status = _THREAD_FAILED;
326 continue;
327 }
328
329 /* Set the new thread priority above parent process */
330 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
331 }
332
333 /* Create request pool */
334 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
335
336 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
337
338 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
339
340 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
341
342 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
343
344 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
345
346 squidaio_initialised = 1;
347 }
348
349 void
350 squidaio_shutdown(void)
351 {
352 squidaio_thread_t *threadp;
353 int i;
354 HANDLE * hthreads;
355
356 if (!squidaio_initialised)
357 return;
358
359 /* This is the same as in squidaio_sync */
360 do {
361 squidaio_poll_queues();
362 } while (request_queue_len > 0);
363
364 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
365
366 threadp = threads;
367
368 for (i = 0; i < NUMTHREADS; i++) {
369 threadp->exit = 1;
370 hthreads[i] = threadp->thread;
371 threadp = threadp->next;
372 }
373
374 ReleaseMutex(request_queue.mutex);
375 ResetEvent(request_queue.cond);
376 ReleaseMutex(done_queue.mutex);
377 ResetEvent(done_queue.cond);
378 Sleep(0);
379
380 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
381
382 for (i = 0; i < NUMTHREADS; i++) {
383 CloseHandle(hthreads[i]);
384 }
385
386 CloseHandle(main_thread);
387 CommIO::NotifyIOClose();
388
389 squidaio_initialised = 0;
390 xfree(hthreads);
391 }
392
393 static DWORD WINAPI
394 squidaio_thread_loop(LPVOID lpParam)
395 {
396 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
397 squidaio_request_t *request;
398 HANDLE cond; /* local copy of the event queue because win32 event handles
399 * don't atomically release the mutex as cond variables do. */
400
401 /* lock the thread info */
402
403 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
404 fatal("Can't get ownership of mutex\n");
405 }
406
407 /* duplicate the handle */
408 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
409 request_queue.cond, /* handle to copy */
410 GetCurrentProcess(), /* pseudo handle, don't close */
411 &cond,
412 0, /* required access */
413 FALSE, /* child process's don't inherit the handle */
414 DUPLICATE_SAME_ACCESS))
415 fatal("Can't duplicate mutex handle\n");
416
417 if (!ReleaseMutex(request_queue.mutex)) {
418 CloseHandle(cond);
419 fatal("Can't release mutex\n");
420 }
421
422 Sleep(0);
423
424 while (1) {
425 DWORD rv;
426 threadp->current_req = request = NULL;
427 request = NULL;
428 /* Get a request to process */
429 threadp->status = _THREAD_WAITING;
430
431 if (threadp->exit) {
432 CloseHandle(request_queue.mutex);
433 CloseHandle(cond);
434 return 0;
435 }
436
437 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
438
439 if (rv == WAIT_FAILED) {
440 CloseHandle(cond);
441 return 1;
442 }
443
444 while (!request_queue.head) {
445 if (!ReleaseMutex(request_queue.mutex)) {
446 CloseHandle(cond);
447 threadp->status = _THREAD_FAILED;
448 return 1;
449 }
450
451 Sleep(0);
452 rv = WaitForSingleObject(cond, INFINITE);
453
454 if (rv == WAIT_FAILED) {
455 CloseHandle(cond);
456 return 1;
457 }
458
459 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
460
461 if (rv == WAIT_FAILED) {
462 CloseHandle(cond);
463 return 1;
464 }
465 }
466
467 request = request_queue.head;
468
469 if (request)
470 request_queue.head = request->next;
471
472 if (!request_queue.head)
473 request_queue.tailp = &request_queue.head;
474
475 if (!ReleaseMutex(request_queue.mutex)) {
476 CloseHandle(cond);
477 return 1;
478 }
479
480 Sleep(0);
481
482 /* process the request */
483 threadp->status = _THREAD_BUSY;
484
485 request->next = NULL;
486
487 threadp->current_req = request;
488
489 errno = 0;
490
491 if (!request->cancelled) {
492 switch (request->request_type) {
493
494 case _AIO_OP_OPEN:
495 squidaio_do_open(request);
496 break;
497
498 case _AIO_OP_READ:
499 squidaio_do_read(request);
500 break;
501
502 case _AIO_OP_WRITE:
503 squidaio_do_write(request);
504 break;
505
506 case _AIO_OP_CLOSE:
507 squidaio_do_close(request);
508 break;
509
510 case _AIO_OP_UNLINK:
511 squidaio_do_unlink(request);
512 break;
513
514 #if AIO_OPENDIR /* Opendir not implemented yet */
515
516 case _AIO_OP_OPENDIR:
517 squidaio_do_opendir(request);
518 break;
519 #endif
520
521 case _AIO_OP_STAT:
522 squidaio_do_stat(request);
523 break;
524
525 default:
526 request->ret = -1;
527 request->err = EINVAL;
528 break;
529 }
530 } else { /* cancelled */
531 request->ret = -1;
532 request->err = EINTR;
533 }
534
535 threadp->status = _THREAD_DONE;
536 /* put the request in the done queue */
537 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
538
539 if (rv == WAIT_FAILED) {
540 CloseHandle(cond);
541 return 1;
542 }
543
544 *done_queue.tailp = request;
545 done_queue.tailp = &request->next;
546
547 if (!ReleaseMutex(done_queue.mutex)) {
548 CloseHandle(cond);
549 return 1;
550 }
551
552 CommIO::NotifyIOCompleted();
553 Sleep(0);
554 threadp->requests++;
555 } /* while forever */
556
557 CloseHandle(cond);
558
559 return 0;
560 } /* squidaio_thread_loop */
561
562 static void
563 squidaio_queue_request(squidaio_request_t * request)
564 {
565 static int high_start = 0;
566 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
567 /* Mark it as not executed (failing result, no error) */
568 request->ret = -1;
569 request->err = 0;
570 /* Internal housekeeping */
571 request_queue_len += 1;
572 request->resultp->_data = request;
573 /* Play some tricks with the request_queue2 queue */
574 request->next = NULL;
575
576 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
577 if (request_queue2.head) {
578 /* Grab blocked requests */
579 *request_queue.tailp = request_queue2.head;
580 request_queue.tailp = request_queue2.tailp;
581 }
582
583 /* Enqueue request */
584 *request_queue.tailp = request;
585
586 request_queue.tailp = &request->next;
587
588 if (!SetEvent(request_queue.cond))
589 fatal("Couldn't push queue");
590
591 if (!ReleaseMutex(request_queue.mutex)) {
592 /* unexpected error */
593 fatal("Couldn't push queue");
594 }
595
596 Sleep(0);
597
598 if (request_queue2.head) {
599 /* Clear queue of blocked requests */
600 request_queue2.head = NULL;
601 request_queue2.tailp = &request_queue2.head;
602 }
603 } else {
604 /* Oops, the request queue is blocked, use request_queue2 */
605 *request_queue2.tailp = request;
606 request_queue2.tailp = &request->next;
607 }
608
609 if (request_queue2.head) {
610 static int filter = 0;
611 static int filter_limit = 8;
612
613 if (++filter >= filter_limit) {
614 filter_limit += filter;
615 filter = 0;
616 debugs(43, 1, "squidaio_queue_request: WARNING - Queue congestion");
617 }
618 }
619
620 /* Warn if out of threads */
621 if (request_queue_len > MAGIC1) {
622 static int last_warn = 0;
623 static int queue_high, queue_low;
624
625 if (high_start == 0) {
626 high_start = (int)squid_curtime;
627 queue_high = request_queue_len;
628 queue_low = request_queue_len;
629 }
630
631 if (request_queue_len > queue_high)
632 queue_high = request_queue_len;
633
634 if (request_queue_len < queue_low)
635 queue_low = request_queue_len;
636
637 if (squid_curtime >= (last_warn + 15) &&
638 squid_curtime >= (high_start + 5)) {
639 debugs(43, 1, "squidaio_queue_request: WARNING - Disk I/O overloading");
640
641 if (squid_curtime >= (high_start + 15))
642 debugs(43, 1, "squidaio_queue_request: Queue Length: current=" <<
643 request_queue_len << ", high=" << queue_high <<
644 ", low=" << queue_low << ", duration=" <<
645 (long int) (squid_curtime - high_start));
646
647 last_warn = (int)squid_curtime;
648 }
649 } else {
650 high_start = 0;
651 }
652
653 /* Warn if seriously overloaded */
654 if (request_queue_len > RIDICULOUS_LENGTH) {
655 debugs(43, 0, "squidaio_queue_request: Async request queue growing uncontrollably!");
656 debugs(43, 0, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
657 squidaio_sync();
658 debugs(43, 0, "squidaio_queue_request: Synced");
659 }
660 } /* squidaio_queue_request */
661
662 static void
663 squidaio_cleanup_request(squidaio_request_t * requestp)
664 {
665 squidaio_result_t *resultp = requestp->resultp;
666 int cancelled = requestp->cancelled;
667
668 /* Free allocated structures and copy data back to user space if the */
669 /* request hasn't been cancelled */
670
671 switch (requestp->request_type) {
672
673 case _AIO_OP_STAT:
674
675 if (!cancelled && requestp->ret == 0)
676
677 xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
678
679 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
680
681 squidaio_xstrfree(requestp->path);
682
683 break;
684
685 case _AIO_OP_OPEN:
686 if (cancelled && requestp->ret >= 0)
687 /* The open() was cancelled but completed */
688 close(requestp->ret);
689
690 squidaio_xstrfree(requestp->path);
691
692 break;
693
694 case _AIO_OP_CLOSE:
695 if (cancelled && requestp->ret < 0)
696 /* The close() was cancelled and never got executed */
697 close(requestp->fd);
698
699 break;
700
701 case _AIO_OP_UNLINK:
702
703 case _AIO_OP_OPENDIR:
704 squidaio_xstrfree(requestp->path);
705
706 break;
707
708 case _AIO_OP_READ:
709 break;
710
711 case _AIO_OP_WRITE:
712 break;
713
714 default:
715 break;
716 }
717
718 if (resultp != NULL && !cancelled) {
719 resultp->aio_return = requestp->ret;
720 resultp->aio_errno = requestp->err;
721 }
722
723 squidaio_request_pool->free(requestp);
724 } /* squidaio_cleanup_request */
725
726
727 int
728 squidaio_cancel(squidaio_result_t * resultp)
729 {
730 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
731
732 if (request && request->resultp == resultp) {
733 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
734 request->cancelled = 1;
735 request->resultp = NULL;
736 resultp->_data = NULL;
737 resultp->result_type = _AIO_OP_NONE;
738 return 0;
739 }
740
741 return 1;
742 } /* squidaio_cancel */
743
744
745 int
746 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
747 {
748 squidaio_init();
749 squidaio_request_t *requestp;
750
751 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
752
753 requestp->path = (char *) squidaio_xstrdup(path);
754
755 requestp->oflag = oflag;
756
757 requestp->mode = mode;
758
759 requestp->resultp = resultp;
760
761 requestp->request_type = _AIO_OP_OPEN;
762
763 requestp->cancelled = 0;
764
765 resultp->result_type = _AIO_OP_OPEN;
766
767 squidaio_queue_request(requestp);
768
769 return 0;
770 }
771
772
773 static void
774 squidaio_do_open(squidaio_request_t * requestp)
775 {
776 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
777 requestp->err = errno;
778 }
779
780
781 int
782 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
783 {
784 squidaio_request_t *requestp;
785
786 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
787
788 requestp->fd = fd;
789
790 requestp->bufferp = bufp;
791
792 requestp->buflen = bufs;
793
794 requestp->offset = offset;
795
796 requestp->whence = whence;
797
798 requestp->resultp = resultp;
799
800 requestp->request_type = _AIO_OP_READ;
801
802 requestp->cancelled = 0;
803
804 resultp->result_type = _AIO_OP_READ;
805
806 squidaio_queue_request(requestp);
807
808 return 0;
809 }
810
811
812 static void
813 squidaio_do_read(squidaio_request_t * requestp)
814 {
815 lseek(requestp->fd, requestp->offset, requestp->whence);
816
817 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
818 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
819 WIN32_maperror(GetLastError());
820 requestp->ret = -1;
821 }
822
823 requestp->err = errno;
824 }
825
826
827 int
828 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
829 {
830 squidaio_request_t *requestp;
831
832 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
833
834 requestp->fd = fd;
835
836 requestp->bufferp = bufp;
837
838 requestp->buflen = bufs;
839
840 requestp->offset = offset;
841
842 requestp->whence = whence;
843
844 requestp->resultp = resultp;
845
846 requestp->request_type = _AIO_OP_WRITE;
847
848 requestp->cancelled = 0;
849
850 resultp->result_type = _AIO_OP_WRITE;
851
852 squidaio_queue_request(requestp);
853
854 return 0;
855 }
856
857
858 static void
859 squidaio_do_write(squidaio_request_t * requestp)
860 {
861 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
862 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
863 WIN32_maperror(GetLastError());
864 requestp->ret = -1;
865 }
866
867 requestp->err = errno;
868 }
869
870
871 int
872 squidaio_close(int fd, squidaio_result_t * resultp)
873 {
874 squidaio_request_t *requestp;
875
876 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
877
878 requestp->fd = fd;
879
880 requestp->resultp = resultp;
881
882 requestp->request_type = _AIO_OP_CLOSE;
883
884 requestp->cancelled = 0;
885
886 resultp->result_type = _AIO_OP_CLOSE;
887
888 squidaio_queue_request(requestp);
889
890 return 0;
891 }
892
893
894 static void
895 squidaio_do_close(squidaio_request_t * requestp)
896 {
897 if ((requestp->ret = close(requestp->fd)) < 0) {
898 debugs(43, 0, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
899 close(requestp->fd);
900 }
901
902 requestp->err = errno;
903 }
904
905
906 int
907
908 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
909 {
910 squidaio_init();
911 squidaio_request_t *requestp;
912
913 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
914
915 requestp->path = (char *) squidaio_xstrdup(path);
916
917 requestp->statp = sb;
918
919 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
920
921 requestp->resultp = resultp;
922
923 requestp->request_type = _AIO_OP_STAT;
924
925 requestp->cancelled = 0;
926
927 resultp->result_type = _AIO_OP_STAT;
928
929 squidaio_queue_request(requestp);
930
931 return 0;
932 }
933
934
935 static void
936 squidaio_do_stat(squidaio_request_t * requestp)
937 {
938 requestp->ret = stat(requestp->path, requestp->tmpstatp);
939 requestp->err = errno;
940 }
941
942
943 int
944 squidaio_unlink(const char *path, squidaio_result_t * resultp)
945 {
946 squidaio_init();
947 squidaio_request_t *requestp;
948
949 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
950
951 requestp->path = squidaio_xstrdup(path);
952
953 requestp->resultp = resultp;
954
955 requestp->request_type = _AIO_OP_UNLINK;
956
957 requestp->cancelled = 0;
958
959 resultp->result_type = _AIO_OP_UNLINK;
960
961 squidaio_queue_request(requestp);
962
963 return 0;
964 }
965
966
967 static void
968 squidaio_do_unlink(squidaio_request_t * requestp)
969 {
970 requestp->ret = unlink(requestp->path);
971 requestp->err = errno;
972 }
973
974 #if AIO_OPENDIR
975 /* XXX squidaio_opendir NOT implemented yet.. */
976
977 int
978 squidaio_opendir(const char *path, squidaio_result_t * resultp)
979 {
980 squidaio_request_t *requestp;
981 int len;
982
983 requestp = squidaio_request_pool->alloc();
984
985 resultp->result_type = _AIO_OP_OPENDIR;
986
987 return -1;
988 }
989
990 static void
991 squidaio_do_opendir(squidaio_request_t * requestp)
992 {
993 /* NOT IMPLEMENTED */
994 }
995
996 #endif
997
998 static void
999 squidaio_poll_queues(void)
1000 {
1001 /* kick "overflow" request queue */
1002
1003 if (request_queue2.head &&
1004 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
1005 *request_queue.tailp = request_queue2.head;
1006 request_queue.tailp = request_queue2.tailp;
1007
1008 if (!SetEvent(request_queue.cond))
1009 fatal("couldn't push queue\n");
1010
1011 if (!ReleaseMutex(request_queue.mutex)) {
1012 /* unexpected error */
1013 }
1014
1015 Sleep(0);
1016 request_queue2.head = NULL;
1017 request_queue2.tailp = &request_queue2.head;
1018 }
1019
1020 /* poll done queue */
1021 if (done_queue.head &&
1022 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
1023
1024 struct squidaio_request_t *requests = done_queue.head;
1025 done_queue.head = NULL;
1026 done_queue.tailp = &done_queue.head;
1027
1028 if (!ReleaseMutex(done_queue.mutex)) {
1029 /* unexpected error */
1030 }
1031
1032 Sleep(0);
1033 *done_requests.tailp = requests;
1034 request_queue_len -= 1;
1035
1036 while (requests->next) {
1037 requests = requests->next;
1038 request_queue_len -= 1;
1039 }
1040
1041 done_requests.tailp = &requests->next;
1042 }
1043 }
1044
1045 squidaio_result_t *
1046 squidaio_poll_done(void)
1047 {
1048 squidaio_request_t *request;
1049 squidaio_result_t *resultp;
1050 int cancelled;
1051 int polled = 0;
1052
1053 AIO_REPOLL:
1054 request = done_requests.head;
1055
1056 if (request == NULL && !polled) {
1057 CommIO::ResetNotifications();
1058 squidaio_poll_queues();
1059 polled = 1;
1060 request = done_requests.head;
1061 }
1062
1063 if (!request) {
1064 return NULL;
1065 }
1066
1067 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1068 done_requests.head = request->next;
1069
1070 if (!done_requests.head)
1071 done_requests.tailp = &done_requests.head;
1072
1073 resultp = request->resultp;
1074
1075 cancelled = request->cancelled;
1076
1077 squidaio_debug(request);
1078
1079 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1080
1081 squidaio_cleanup_request(request);
1082
1083 if (cancelled)
1084 goto AIO_REPOLL;
1085
1086 return resultp;
1087 } /* squidaio_poll_done */
1088
1089 int
1090 squidaio_operations_pending(void)
1091 {
1092 return request_queue_len + (done_requests.head ? 1 : 0);
1093 }
1094
1095 int
1096 squidaio_sync(void)
1097 {
1098 /* XXX This might take a while if the queue is large.. */
1099
1100 do {
1101 squidaio_poll_queues();
1102 } while (request_queue_len > 0);
1103
1104 return squidaio_operations_pending();
1105 }
1106
1107 int
1108 squidaio_get_queue_len(void)
1109 {
1110 return request_queue_len;
1111 }
1112
1113 static void
1114 squidaio_debug(squidaio_request_t * request)
1115 {
1116 switch (request->request_type) {
1117
1118 case _AIO_OP_OPEN:
1119 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1120 break;
1121
1122 case _AIO_OP_READ:
1123 debugs(43, 5, "READ on fd: " << request->fd);
1124 break;
1125
1126 case _AIO_OP_WRITE:
1127 debugs(43, 5, "WRITE on fd: " << request->fd);
1128 break;
1129
1130 case _AIO_OP_CLOSE:
1131 debugs(43, 5, "CLOSE of fd: " << request->fd);
1132 break;
1133
1134 case _AIO_OP_UNLINK:
1135 debugs(43, 5, "UNLINK of " << request->path);
1136 break;
1137
1138 default:
1139 break;
1140 }
1141 }
1142
1143 void
1144 squidaio_stats(StoreEntry * sentry)
1145 {
1146 squidaio_thread_t *threadp;
1147 int i;
1148
1149 if (!squidaio_initialised)
1150 return;
1151
1152 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1153
1154 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1155
1156 threadp = threads;
1157
1158 for (i = 0; i < NUMTHREADS; i++) {
1159 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1160 threadp = threadp->next;
1161 }
1162 }