]> git.ipfire.org Git - thirdparty/squid.git/blob - src/DiskIO/DiskThreads/aiops_win32.cc
Merged from trunk
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
1 /*
2 * DEBUG: section 43 Windows AIOPS
3 * AUTHOR: Stewart Forster <slf@connect.com.au>
4 * AUTHOR: Robert Collins <robertc@squid-cache.org>
5 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
6 *
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
9 *
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
18 *
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
23 *
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
28 *
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
32 *
33 */
34
35 #include "squid.h"
36 #include "DiskIO/DiskThreads/CommIO.h"
37 #include "DiskThreads.h"
38 #include "SquidConfig.h"
39 #include "SquidTime.h"
40 #include "Store.h"
41
42 #include <stdio.h>
43 #include <sys/stat.h>
44 #include <fcntl.h>
45 #include <errno.h>
46 #include <dirent.h>
47 #include <signal.h>
48
/* Queue length above which squidaio_queue_request() forces a blocking sync. */
#define RIDICULOUS_LENGTH (4 * 1024)
50
/* States a worker thread records in squidaio_thread_t::status. */
typedef enum _squidaio_thread_status {
    _THREAD_STARTING = 0,   /* assigned by squidaio_init() before the thread is created */
    _THREAD_WAITING = 1,    /* at the top of the worker loop, about to fetch a request */
    _THREAD_BUSY = 2,       /* executing a request */
    _THREAD_FAILED = 3,     /* thread creation or a mutex release failed */
    _THREAD_DONE = 4        /* request executed, about to be put on the done queue */
} squidaio_thread_status;
59
60 typedef struct squidaio_request_t {
61
62 struct squidaio_request_t *next;
63 squidaio_request_type request_type;
64 int cancelled;
65 char *path;
66 int oflag;
67 mode_t mode;
68 int fd;
69 char *bufferp;
70 char *tmpbufp;
71 size_t buflen;
72 off_t offset;
73 int whence;
74 int ret;
75 int err;
76
77 struct stat *tmpstatp;
78
79 struct stat *statp;
80 squidaio_result_t *resultp;
81 } squidaio_request_t;
82
83 typedef struct squidaio_request_queue_t {
84 HANDLE mutex;
85 HANDLE cond; /* See Event objects */
86 squidaio_request_t *volatile head;
87 squidaio_request_t *volatile *volatile tailp;
88 unsigned long requests;
89 unsigned long blocked; /* main failed to lock the queue */
90 } squidaio_request_queue_t;
91
92 typedef struct squidaio_thread_t squidaio_thread_t;
93
94 struct squidaio_thread_t {
95 squidaio_thread_t *next;
96 HANDLE thread;
97 DWORD dwThreadId; /* thread ID */
98 squidaio_thread_status status;
99
100 struct squidaio_request_t *current_req;
101 unsigned long requests;
102 int volatile exit;
103 };
104
105 static void squidaio_queue_request(squidaio_request_t *);
106 static void squidaio_cleanup_request(squidaio_request_t *);
107 static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
108 static void squidaio_do_open(squidaio_request_t *);
109 static void squidaio_do_read(squidaio_request_t *);
110 static void squidaio_do_write(squidaio_request_t *);
111 static void squidaio_do_close(squidaio_request_t *);
112 static void squidaio_do_stat(squidaio_request_t *);
113 static void squidaio_do_unlink(squidaio_request_t *);
114 #if AIO_OPENDIR
115 static void *squidaio_do_opendir(squidaio_request_t *);
116 #endif
117 static void squidaio_debug(squidaio_request_t *);
118 static void squidaio_poll_queues(void);
119
120 static squidaio_thread_t *threads = NULL;
121 static int squidaio_initialised = 0;
122
/*
 * Buffer size classes served by the squidaio_*_bufs pools.
 * Each expansion is parenthesized so the macros behave as single values
 * inside larger expressions (e.g. AIO_SMALL_BUFS * 4).
 */
#define AIO_LARGE_BUFS  (16384)
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  (128)
128
129 static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
130 static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
131 static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
132 static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
133 static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
134
135 static int request_queue_len = 0;
136 static MemAllocator *squidaio_request_pool = NULL;
137 static MemAllocator *squidaio_thread_pool = NULL;
138 static squidaio_request_queue_t request_queue;
139
140 static struct {
141 squidaio_request_t *head, **tailp;
142 }
143
144 request_queue2 = {
145
146 NULL, &request_queue2.head
147 };
148 static squidaio_request_queue_t done_queue;
149
150 static struct {
151 squidaio_request_t *head, **tailp;
152 }
153
154 done_requests = {
155
156 NULL, &done_requests.head
157 };
158
159 static HANDLE main_thread;
160
161 static MemAllocator *
162 squidaio_get_pool(int size)
163 {
164 if (size <= AIO_LARGE_BUFS) {
165 if (size <= AIO_MICRO_BUFS)
166 return squidaio_micro_bufs;
167 else if (size <= AIO_TINY_BUFS)
168 return squidaio_tiny_bufs;
169 else if (size <= AIO_SMALL_BUFS)
170 return squidaio_small_bufs;
171 else if (size <= AIO_MEDIUM_BUFS)
172 return squidaio_medium_bufs;
173 else
174 return squidaio_large_bufs;
175 }
176
177 return NULL;
178 }
179
180 void *
181 squidaio_xmalloc(int size)
182 {
183 void *p;
184 MemAllocator *pool;
185
186 if ((pool = squidaio_get_pool(size)) != NULL) {
187 p = pool->alloc();
188 } else
189 p = xmalloc(size);
190
191 return p;
192 }
193
194 static char *
195 squidaio_xstrdup(const char *str)
196 {
197 char *p;
198 int len = strlen(str) + 1;
199
200 p = (char *)squidaio_xmalloc(len);
201 strncpy(p, str, len);
202
203 return p;
204 }
205
206 void
207 squidaio_xfree(void *p, int size)
208 {
209 MemAllocator *pool;
210
211 if ((pool = squidaio_get_pool(size)) != NULL) {
212 pool->free(p);
213 } else
214 xfree(p);
215 }
216
217 static void
218 squidaio_xstrfree(char *str)
219 {
220 MemAllocator *pool;
221 int len = strlen(str) + 1;
222
223 if ((pool = squidaio_get_pool(len)) != NULL) {
224 pool->free(str);
225 } else
226 xfree(str);
227 }
228
229 void
230 squidaio_init(void)
231 {
232 int i;
233 squidaio_thread_t *threadp;
234
235 if (squidaio_initialised)
236 return;
237
238 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
239 GetCurrentThread(), /* pseudo handle to copy */
240 GetCurrentProcess(), /* pseudo handle, don't close */
241 &main_thread,
242 0, /* required access */
243 FALSE, /* child process's don't inherit the handle */
244 DUPLICATE_SAME_ACCESS)) {
245 /* spit errors */
246 fatal("Couldn't get current thread handle");
247 }
248
249 /* Initialize request queue */
250 if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
251 FALSE, /* start unowned (as per mutex_init) */
252 NULL) /* no name */
253 ) == NULL) {
254 fatal("Failed to create mutex");
255 }
256
257 if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
258 FALSE, /* auto signal reset - which I think is pthreads like ? */
259 FALSE, /* start non signaled */
260 NULL) /* no name */
261 ) == NULL) {
262 fatal("Failed to create condition variable");
263 }
264
265 request_queue.head = NULL;
266
267 request_queue.tailp = &request_queue.head;
268
269 request_queue.requests = 0;
270
271 request_queue.blocked = 0;
272
273 /* Initialize done queue */
274
275 if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
276 FALSE, /* start unowned (as per mutex_init) */
277 NULL) /* no name */
278 ) == NULL) {
279 fatal("Failed to create mutex");
280 }
281
282 if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
283 TRUE, /* manually signaled - which I think is pthreads like ? */
284 FALSE, /* start non signaled */
285 NULL) /* no name */
286 ) == NULL) {
287 fatal("Failed to create condition variable");
288 }
289
290 done_queue.head = NULL;
291
292 done_queue.tailp = &done_queue.head;
293
294 done_queue.requests = 0;
295
296 done_queue.blocked = 0;
297
298 CommIO::NotifyIOCompleted();
299
300 /* Create threads and get them to sit in their wait loop */
301 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
302
303 assert(NUMTHREADS);
304
305 for (i = 0; i < NUMTHREADS; ++i) {
306 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
307 threadp->status = _THREAD_STARTING;
308 threadp->current_req = NULL;
309 threadp->requests = 0;
310 threadp->next = threads;
311 threads = threadp;
312
313 if ((threadp->thread = CreateThread(NULL, /* no security attributes */
314 0, /* use default stack size */
315 squidaio_thread_loop, /* thread function */
316 threadp, /* argument to thread function */
317 0, /* use default creation flags */
318 &(threadp->dwThreadId)) /* returns the thread identifier */
319 ) == NULL) {
320 fprintf(stderr, "Thread creation failed\n");
321 threadp->status = _THREAD_FAILED;
322 continue;
323 }
324
325 /* Set the new thread priority above parent process */
326 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
327 }
328
329 /* Create request pool */
330 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
331
332 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
333
334 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
335
336 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
337
338 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
339
340 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
341
342 squidaio_initialised = 1;
343 }
344
345 void
346 squidaio_shutdown(void)
347 {
348 squidaio_thread_t *threadp;
349 int i;
350 HANDLE * hthreads;
351
352 if (!squidaio_initialised)
353 return;
354
355 /* This is the same as in squidaio_sync */
356 do {
357 squidaio_poll_queues();
358 } while (request_queue_len > 0);
359
360 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
361
362 threadp = threads;
363
364 for (i = 0; i < NUMTHREADS; ++i) {
365 threadp->exit = 1;
366 hthreads[i] = threadp->thread;
367 threadp = threadp->next;
368 }
369
370 ReleaseMutex(request_queue.mutex);
371 ResetEvent(request_queue.cond);
372 ReleaseMutex(done_queue.mutex);
373 ResetEvent(done_queue.cond);
374 Sleep(0);
375
376 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
377
378 for (i = 0; i < NUMTHREADS; ++i) {
379 CloseHandle(hthreads[i]);
380 }
381
382 CloseHandle(main_thread);
383 CommIO::NotifyIOClose();
384
385 squidaio_initialised = 0;
386 xfree(hthreads);
387 }
388
389 static DWORD WINAPI
390 squidaio_thread_loop(LPVOID lpParam)
391 {
392 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
393 squidaio_request_t *request;
394 HANDLE cond; /* local copy of the event queue because win32 event handles
395 * don't atomically release the mutex as cond variables do. */
396
397 /* lock the thread info */
398
399 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
400 fatal("Can't get ownership of mutex\n");
401 }
402
403 /* duplicate the handle */
404 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
405 request_queue.cond, /* handle to copy */
406 GetCurrentProcess(), /* pseudo handle, don't close */
407 &cond,
408 0, /* required access */
409 FALSE, /* child process's don't inherit the handle */
410 DUPLICATE_SAME_ACCESS))
411 fatal("Can't duplicate mutex handle\n");
412
413 if (!ReleaseMutex(request_queue.mutex)) {
414 CloseHandle(cond);
415 fatal("Can't release mutex\n");
416 }
417
418 Sleep(0);
419
420 while (1) {
421 DWORD rv;
422 threadp->current_req = request = NULL;
423 request = NULL;
424 /* Get a request to process */
425 threadp->status = _THREAD_WAITING;
426
427 if (threadp->exit) {
428 CloseHandle(request_queue.mutex);
429 CloseHandle(cond);
430 return 0;
431 }
432
433 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
434
435 if (rv == WAIT_FAILED) {
436 CloseHandle(cond);
437 return 1;
438 }
439
440 while (!request_queue.head) {
441 if (!ReleaseMutex(request_queue.mutex)) {
442 CloseHandle(cond);
443 threadp->status = _THREAD_FAILED;
444 return 1;
445 }
446
447 Sleep(0);
448 rv = WaitForSingleObject(cond, INFINITE);
449
450 if (rv == WAIT_FAILED) {
451 CloseHandle(cond);
452 return 1;
453 }
454
455 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
456
457 if (rv == WAIT_FAILED) {
458 CloseHandle(cond);
459 return 1;
460 }
461 }
462
463 request = request_queue.head;
464
465 if (request)
466 request_queue.head = request->next;
467
468 if (!request_queue.head)
469 request_queue.tailp = &request_queue.head;
470
471 if (!ReleaseMutex(request_queue.mutex)) {
472 CloseHandle(cond);
473 return 1;
474 }
475
476 Sleep(0);
477
478 /* process the request */
479 threadp->status = _THREAD_BUSY;
480
481 request->next = NULL;
482
483 threadp->current_req = request;
484
485 errno = 0;
486
487 if (!request->cancelled) {
488 switch (request->request_type) {
489
490 case _AIO_OP_OPEN:
491 squidaio_do_open(request);
492 break;
493
494 case _AIO_OP_READ:
495 squidaio_do_read(request);
496 break;
497
498 case _AIO_OP_WRITE:
499 squidaio_do_write(request);
500 break;
501
502 case _AIO_OP_CLOSE:
503 squidaio_do_close(request);
504 break;
505
506 case _AIO_OP_UNLINK:
507 squidaio_do_unlink(request);
508 break;
509
510 #if AIO_OPENDIR /* Opendir not implemented yet */
511
512 case _AIO_OP_OPENDIR:
513 squidaio_do_opendir(request);
514 break;
515 #endif
516
517 case _AIO_OP_STAT:
518 squidaio_do_stat(request);
519 break;
520
521 default:
522 request->ret = -1;
523 request->err = EINVAL;
524 break;
525 }
526 } else { /* cancelled */
527 request->ret = -1;
528 request->err = EINTR;
529 }
530
531 threadp->status = _THREAD_DONE;
532 /* put the request in the done queue */
533 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
534
535 if (rv == WAIT_FAILED) {
536 CloseHandle(cond);
537 return 1;
538 }
539
540 *done_queue.tailp = request;
541 done_queue.tailp = &request->next;
542
543 if (!ReleaseMutex(done_queue.mutex)) {
544 CloseHandle(cond);
545 return 1;
546 }
547
548 CommIO::NotifyIOCompleted();
549 Sleep(0);
550 ++ threadp->requests;
551 } /* while forever */
552
553 CloseHandle(cond);
554
555 return 0;
556 } /* squidaio_thread_loop */
557
558 static void
559 squidaio_queue_request(squidaio_request_t * request)
560 {
561 static int high_start = 0;
562 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
563 /* Mark it as not executed (failing result, no error) */
564 request->ret = -1;
565 request->err = 0;
566 /* Internal housekeeping */
567 request_queue_len += 1;
568 request->resultp->_data = request;
569 /* Play some tricks with the request_queue2 queue */
570 request->next = NULL;
571
572 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
573 if (request_queue2.head) {
574 /* Grab blocked requests */
575 *request_queue.tailp = request_queue2.head;
576 request_queue.tailp = request_queue2.tailp;
577 }
578
579 /* Enqueue request */
580 *request_queue.tailp = request;
581
582 request_queue.tailp = &request->next;
583
584 if (!SetEvent(request_queue.cond))
585 fatal("Couldn't push queue");
586
587 if (!ReleaseMutex(request_queue.mutex)) {
588 /* unexpected error */
589 fatal("Couldn't push queue");
590 }
591
592 Sleep(0);
593
594 if (request_queue2.head) {
595 /* Clear queue of blocked requests */
596 request_queue2.head = NULL;
597 request_queue2.tailp = &request_queue2.head;
598 }
599 } else {
600 /* Oops, the request queue is blocked, use request_queue2 */
601 *request_queue2.tailp = request;
602 request_queue2.tailp = &request->next;
603 }
604
605 if (request_queue2.head) {
606 static int filter = 0;
607 static int filter_limit = 8;
608
609 if (++filter >= filter_limit) {
610 filter_limit += filter;
611 filter = 0;
612 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion");
613 }
614 }
615
616 /* Warn if out of threads */
617 if (request_queue_len > MAGIC1) {
618 static int last_warn = 0;
619 static int queue_high, queue_low;
620
621 if (high_start == 0) {
622 high_start = (int)squid_curtime;
623 queue_high = request_queue_len;
624 queue_low = request_queue_len;
625 }
626
627 if (request_queue_len > queue_high)
628 queue_high = request_queue_len;
629
630 if (request_queue_len < queue_low)
631 queue_low = request_queue_len;
632
633 if (squid_curtime >= (last_warn + 15) &&
634 squid_curtime >= (high_start + 5)) {
635 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
636
637 if (squid_curtime >= (high_start + 15))
638 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
639 request_queue_len << ", high=" << queue_high <<
640 ", low=" << queue_low << ", duration=" <<
641 (long int) (squid_curtime - high_start));
642
643 last_warn = (int)squid_curtime;
644 }
645 } else {
646 high_start = 0;
647 }
648
649 /* Warn if seriously overloaded */
650 if (request_queue_len > RIDICULOUS_LENGTH) {
651 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
652 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
653 squidaio_sync();
654 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
655 }
656 } /* squidaio_queue_request */
657
658 static void
659 squidaio_cleanup_request(squidaio_request_t * requestp)
660 {
661 squidaio_result_t *resultp = requestp->resultp;
662 int cancelled = requestp->cancelled;
663
664 /* Free allocated structures and copy data back to user space if the */
665 /* request hasn't been cancelled */
666
667 switch (requestp->request_type) {
668
669 case _AIO_OP_STAT:
670
671 if (!cancelled && requestp->ret == 0)
672 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
673
674 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
675
676 squidaio_xstrfree(requestp->path);
677
678 break;
679
680 case _AIO_OP_OPEN:
681 if (cancelled && requestp->ret >= 0)
682 /* The open() was cancelled but completed */
683 close(requestp->ret);
684
685 squidaio_xstrfree(requestp->path);
686
687 break;
688
689 case _AIO_OP_CLOSE:
690 if (cancelled && requestp->ret < 0)
691 /* The close() was cancelled and never got executed */
692 close(requestp->fd);
693
694 break;
695
696 case _AIO_OP_UNLINK:
697
698 case _AIO_OP_OPENDIR:
699 squidaio_xstrfree(requestp->path);
700
701 break;
702
703 case _AIO_OP_READ:
704 break;
705
706 case _AIO_OP_WRITE:
707 break;
708
709 default:
710 break;
711 }
712
713 if (resultp != NULL && !cancelled) {
714 resultp->aio_return = requestp->ret;
715 resultp->aio_errno = requestp->err;
716 }
717
718 squidaio_request_pool->free(requestp);
719 } /* squidaio_cleanup_request */
720
721 int
722 squidaio_cancel(squidaio_result_t * resultp)
723 {
724 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
725
726 if (request && request->resultp == resultp) {
727 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
728 request->cancelled = 1;
729 request->resultp = NULL;
730 resultp->_data = NULL;
731 resultp->result_type = _AIO_OP_NONE;
732 return 0;
733 }
734
735 return 1;
736 } /* squidaio_cancel */
737
738 int
739 squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
740 {
741 squidaio_init();
742 squidaio_request_t *requestp;
743
744 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
745
746 requestp->path = (char *) squidaio_xstrdup(path);
747
748 requestp->oflag = oflag;
749
750 requestp->mode = mode;
751
752 requestp->resultp = resultp;
753
754 requestp->request_type = _AIO_OP_OPEN;
755
756 requestp->cancelled = 0;
757
758 resultp->result_type = _AIO_OP_OPEN;
759
760 squidaio_queue_request(requestp);
761
762 return 0;
763 }
764
765 static void
766 squidaio_do_open(squidaio_request_t * requestp)
767 {
768 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
769 requestp->err = errno;
770 }
771
772 int
773 squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
774 {
775 squidaio_request_t *requestp;
776
777 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
778
779 requestp->fd = fd;
780
781 requestp->bufferp = bufp;
782
783 requestp->buflen = bufs;
784
785 requestp->offset = offset;
786
787 requestp->whence = whence;
788
789 requestp->resultp = resultp;
790
791 requestp->request_type = _AIO_OP_READ;
792
793 requestp->cancelled = 0;
794
795 resultp->result_type = _AIO_OP_READ;
796
797 squidaio_queue_request(requestp);
798
799 return 0;
800 }
801
802 static void
803 squidaio_do_read(squidaio_request_t * requestp)
804 {
805 lseek(requestp->fd, requestp->offset, requestp->whence);
806
807 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
808 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
809 WIN32_maperror(GetLastError());
810 requestp->ret = -1;
811 }
812
813 requestp->err = errno;
814 }
815
816 int
817 squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
818 {
819 squidaio_request_t *requestp;
820
821 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
822
823 requestp->fd = fd;
824
825 requestp->bufferp = bufp;
826
827 requestp->buflen = bufs;
828
829 requestp->offset = offset;
830
831 requestp->whence = whence;
832
833 requestp->resultp = resultp;
834
835 requestp->request_type = _AIO_OP_WRITE;
836
837 requestp->cancelled = 0;
838
839 resultp->result_type = _AIO_OP_WRITE;
840
841 squidaio_queue_request(requestp);
842
843 return 0;
844 }
845
846 static void
847 squidaio_do_write(squidaio_request_t * requestp)
848 {
849 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
850 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
851 WIN32_maperror(GetLastError());
852 requestp->ret = -1;
853 }
854
855 requestp->err = errno;
856 }
857
858 int
859 squidaio_close(int fd, squidaio_result_t * resultp)
860 {
861 squidaio_request_t *requestp;
862
863 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
864
865 requestp->fd = fd;
866
867 requestp->resultp = resultp;
868
869 requestp->request_type = _AIO_OP_CLOSE;
870
871 requestp->cancelled = 0;
872
873 resultp->result_type = _AIO_OP_CLOSE;
874
875 squidaio_queue_request(requestp);
876
877 return 0;
878 }
879
880 static void
881 squidaio_do_close(squidaio_request_t * requestp)
882 {
883 if ((requestp->ret = close(requestp->fd)) < 0) {
884 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
885 close(requestp->fd);
886 }
887
888 requestp->err = errno;
889 }
890
891 int
892
893 squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
894 {
895 squidaio_init();
896 squidaio_request_t *requestp;
897
898 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
899
900 requestp->path = (char *) squidaio_xstrdup(path);
901
902 requestp->statp = sb;
903
904 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
905
906 requestp->resultp = resultp;
907
908 requestp->request_type = _AIO_OP_STAT;
909
910 requestp->cancelled = 0;
911
912 resultp->result_type = _AIO_OP_STAT;
913
914 squidaio_queue_request(requestp);
915
916 return 0;
917 }
918
919 static void
920 squidaio_do_stat(squidaio_request_t * requestp)
921 {
922 requestp->ret = stat(requestp->path, requestp->tmpstatp);
923 requestp->err = errno;
924 }
925
926 int
927 squidaio_unlink(const char *path, squidaio_result_t * resultp)
928 {
929 squidaio_init();
930 squidaio_request_t *requestp;
931
932 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
933
934 requestp->path = squidaio_xstrdup(path);
935
936 requestp->resultp = resultp;
937
938 requestp->request_type = _AIO_OP_UNLINK;
939
940 requestp->cancelled = 0;
941
942 resultp->result_type = _AIO_OP_UNLINK;
943
944 squidaio_queue_request(requestp);
945
946 return 0;
947 }
948
949 static void
950 squidaio_do_unlink(squidaio_request_t * requestp)
951 {
952 requestp->ret = unlink(requestp->path);
953 requestp->err = errno;
954 }
955
#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

/*
 * Placeholder for an asynchronous opendir(). Tags 'resultp' with
 * _AIO_OP_OPENDIR and returns -1 without queueing anything.
 * No request is allocated: nothing would ever queue or release it.
 */
int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    (void) path;    /* unused until the operation is implemented */

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

/* Worker-side opendir handler: intentionally does nothing. */
static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
    (void) requestp;
}

#endif
979
980 static void
981 squidaio_poll_queues(void)
982 {
983 /* kick "overflow" request queue */
984
985 if (request_queue2.head &&
986 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
987 *request_queue.tailp = request_queue2.head;
988 request_queue.tailp = request_queue2.tailp;
989
990 if (!SetEvent(request_queue.cond))
991 fatal("couldn't push queue\n");
992
993 if (!ReleaseMutex(request_queue.mutex)) {
994 /* unexpected error */
995 }
996
997 Sleep(0);
998 request_queue2.head = NULL;
999 request_queue2.tailp = &request_queue2.head;
1000 }
1001
1002 /* poll done queue */
1003 if (done_queue.head &&
1004 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
1005
1006 struct squidaio_request_t *requests = done_queue.head;
1007 done_queue.head = NULL;
1008 done_queue.tailp = &done_queue.head;
1009
1010 if (!ReleaseMutex(done_queue.mutex)) {
1011 /* unexpected error */
1012 }
1013
1014 Sleep(0);
1015 *done_requests.tailp = requests;
1016 request_queue_len -= 1;
1017
1018 while (requests->next) {
1019 requests = requests->next;
1020 request_queue_len -= 1;
1021 }
1022
1023 done_requests.tailp = &requests->next;
1024 }
1025 }
1026
1027 squidaio_result_t *
1028 squidaio_poll_done(void)
1029 {
1030 squidaio_request_t *request;
1031 squidaio_result_t *resultp;
1032 int cancelled;
1033 int polled = 0;
1034
1035 AIO_REPOLL:
1036 request = done_requests.head;
1037
1038 if (request == NULL && !polled) {
1039 CommIO::ResetNotifications();
1040 squidaio_poll_queues();
1041 polled = 1;
1042 request = done_requests.head;
1043 }
1044
1045 if (!request) {
1046 return NULL;
1047 }
1048
1049 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
1050 done_requests.head = request->next;
1051
1052 if (!done_requests.head)
1053 done_requests.tailp = &done_requests.head;
1054
1055 resultp = request->resultp;
1056
1057 cancelled = request->cancelled;
1058
1059 squidaio_debug(request);
1060
1061 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
1062
1063 squidaio_cleanup_request(request);
1064
1065 if (cancelled)
1066 goto AIO_REPOLL;
1067
1068 return resultp;
1069 } /* squidaio_poll_done */
1070
1071 int
1072 squidaio_operations_pending(void)
1073 {
1074 return request_queue_len + (done_requests.head ? 1 : 0);
1075 }
1076
1077 int
1078 squidaio_sync(void)
1079 {
1080 /* XXX This might take a while if the queue is large.. */
1081
1082 do {
1083 squidaio_poll_queues();
1084 } while (request_queue_len > 0);
1085
1086 return squidaio_operations_pending();
1087 }
1088
1089 int
1090 squidaio_get_queue_len(void)
1091 {
1092 return request_queue_len;
1093 }
1094
1095 static void
1096 squidaio_debug(squidaio_request_t * request)
1097 {
1098 switch (request->request_type) {
1099
1100 case _AIO_OP_OPEN:
1101 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
1102 break;
1103
1104 case _AIO_OP_READ:
1105 debugs(43, 5, "READ on fd: " << request->fd);
1106 break;
1107
1108 case _AIO_OP_WRITE:
1109 debugs(43, 5, "WRITE on fd: " << request->fd);
1110 break;
1111
1112 case _AIO_OP_CLOSE:
1113 debugs(43, 5, "CLOSE of fd: " << request->fd);
1114 break;
1115
1116 case _AIO_OP_UNLINK:
1117 debugs(43, 5, "UNLINK of " << request->path);
1118 break;
1119
1120 default:
1121 break;
1122 }
1123 }
1124
1125 void
1126 squidaio_stats(StoreEntry * sentry)
1127 {
1128 squidaio_thread_t *threadp;
1129 int i;
1130
1131 if (!squidaio_initialised)
1132 return;
1133
1134 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1135
1136 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1137
1138 threadp = threads;
1139
1140 for (i = 0; i < NUMTHREADS; ++i) {
1141 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1142 threadp = threadp->next;
1143 }
1144 }