4 * DEBUG: section 43 Windows AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 * AUTHOR: Robert Collins <robertc@squid-cache.org>
7 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
9 * SQUID Web Proxy Cache http://www.squid-cache.org/
10 * ----------------------------------------------------------
12 * Squid is the result of efforts by numerous individuals from
13 * the Internet community; see the CONTRIBUTORS file for full
14 * details. Many organizations have provided support for Squid's
15 * development; see the SPONSORS file for full details. Squid is
16 * Copyrighted (C) 2001 by the Regents of the University of
17 * California; see the COPYRIGHT file for full details. Squid
18 * incorporates software developed and/or copyrighted by other
19 * sources; see the CREDITS file for full details.
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
31 * You should have received a copy of the GNU General Public License
32 * along with this program; if not, write to the Free Software
33 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
38 #include "squid_windows.h"
39 #include "DiskThreads.h"
42 #include <sys/types.h>
49 #include "SquidTime.h"
52 #define RIDICULOUS_LENGTH 4096
54 enum _squidaio_thread_status
{
61 typedef enum _squidaio_thread_status squidaio_thread_status
;
63 typedef struct squidaio_request_t
{
65 struct squidaio_request_t
*next
;
66 squidaio_request_type request_type
;
80 struct stat
*tmpstatp
;
83 squidaio_result_t
*resultp
;
86 typedef struct squidaio_request_queue_t
{
88 HANDLE cond
; /* See Event objects */
89 squidaio_request_t
*volatile head
;
90 squidaio_request_t
*volatile *volatile tailp
;
91 unsigned long requests
;
92 unsigned long blocked
; /* main failed to lock the queue */
93 } squidaio_request_queue_t
;
95 typedef struct squidaio_thread_t squidaio_thread_t
;
97 struct squidaio_thread_t
{
98 squidaio_thread_t
*next
;
100 DWORD dwThreadId
; /* thread ID */
101 squidaio_thread_status status
;
103 struct squidaio_request_t
*current_req
;
104 unsigned long requests
;
108 static void squidaio_queue_request(squidaio_request_t
*);
109 static void squidaio_cleanup_request(squidaio_request_t
*);
110 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
111 static void squidaio_do_open(squidaio_request_t
*);
112 static void squidaio_do_read(squidaio_request_t
*);
113 static void squidaio_do_write(squidaio_request_t
*);
114 static void squidaio_do_close(squidaio_request_t
*);
115 static void squidaio_do_stat(squidaio_request_t
*);
116 static void squidaio_do_unlink(squidaio_request_t
*);
118 static void *squidaio_do_opendir(squidaio_request_t
*);
120 static void squidaio_debug(squidaio_request_t
*);
121 static void squidaio_poll_queues(void);
123 static squidaio_thread_t
*threads
= NULL
;
124 static int squidaio_initialised
= 0;
127 #define AIO_LARGE_BUFS 16384
128 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
129 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
130 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
131 #define AIO_MICRO_BUFS 128
133 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
134 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
135 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
136 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
137 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128K */
139 static int request_queue_len
= 0;
140 static MemAllocator
*squidaio_request_pool
= NULL
;
141 static MemAllocator
*squidaio_thread_pool
= NULL
;
142 static squidaio_request_queue_t request_queue
;
145 squidaio_request_t
*head
, **tailp
;
150 NULL
, &request_queue2
.head
152 static squidaio_request_queue_t done_queue
;
155 squidaio_request_t
*head
, **tailp
;
160 NULL
, &done_requests
.head
163 static HANDLE main_thread
;
165 static MemAllocator
*
166 squidaio_get_pool(int size
)
168 if (size
<= AIO_LARGE_BUFS
) {
169 if (size
<= AIO_MICRO_BUFS
)
170 return squidaio_micro_bufs
;
171 else if (size
<= AIO_TINY_BUFS
)
172 return squidaio_tiny_bufs
;
173 else if (size
<= AIO_SMALL_BUFS
)
174 return squidaio_small_bufs
;
175 else if (size
<= AIO_MEDIUM_BUFS
)
176 return squidaio_medium_bufs
;
178 return squidaio_large_bufs
;
185 squidaio_xmalloc(int size
)
190 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
199 squidaio_xstrdup(const char *str
)
202 int len
= strlen(str
) + 1;
204 p
= (char *)squidaio_xmalloc(len
);
205 strncpy(p
, str
, len
);
211 squidaio_xfree(void *p
, int size
)
215 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
222 squidaio_xstrfree(char *str
)
225 int len
= strlen(str
) + 1;
227 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
237 squidaio_thread_t
*threadp
;
239 if (squidaio_initialised
)
242 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
243 GetCurrentThread(), /* pseudo handle to copy */
244 GetCurrentProcess(), /* pseudo handle, don't close */
246 0, /* required access */
247 FALSE
, /* child process's don't inherit the handle */
248 DUPLICATE_SAME_ACCESS
)) {
250 fatal("Couldn't get current thread handle");
253 /* Initialize request queue */
254 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
255 FALSE
, /* start unowned (as per mutex_init) */
258 fatal("Failed to create mutex");
261 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
262 FALSE
, /* auto signal reset - which I think is pthreads like ? */
263 FALSE
, /* start non signaled */
266 fatal("Failed to create condition variable");
269 request_queue
.head
= NULL
;
271 request_queue
.tailp
= &request_queue
.head
;
273 request_queue
.requests
= 0;
275 request_queue
.blocked
= 0;
277 /* Initialize done queue */
279 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
280 FALSE
, /* start unowned (as per mutex_init) */
283 fatal("Failed to create mutex");
286 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
287 TRUE
, /* manually signaled - which I think is pthreads like ? */
288 FALSE
, /* start non signaled */
291 fatal("Failed to create condition variable");
294 done_queue
.head
= NULL
;
296 done_queue
.tailp
= &done_queue
.head
;
298 done_queue
.requests
= 0;
300 done_queue
.blocked
= 0;
302 CommIO::NotifyIOCompleted();
304 /* Create threads and get them to sit in their wait loop */
305 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
309 for (i
= 0; i
< NUMTHREADS
; i
++) {
310 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
311 threadp
->status
= _THREAD_STARTING
;
312 threadp
->current_req
= NULL
;
313 threadp
->requests
= 0;
314 threadp
->next
= threads
;
317 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
318 0, /* use default stack size */
319 squidaio_thread_loop
, /* thread function */
320 threadp
, /* argument to thread function */
321 0, /* use default creation flags */
322 &(threadp
->dwThreadId
)) /* returns the thread identifier */
324 fprintf(stderr
, "Thread creation failed\n");
325 threadp
->status
= _THREAD_FAILED
;
329 /* Set the new thread priority above parent process */
330 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
333 /* Create request pool */
334 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
336 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
338 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
340 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
342 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
344 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
346 squidaio_initialised
= 1;
350 squidaio_shutdown(void)
352 squidaio_thread_t
*threadp
;
356 if (!squidaio_initialised
)
359 /* This is the same as in squidaio_sync */
361 squidaio_poll_queues();
362 } while (request_queue_len
> 0);
364 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
368 for (i
= 0; i
< NUMTHREADS
; i
++) {
370 hthreads
[i
] = threadp
->thread
;
371 threadp
= threadp
->next
;
374 ReleaseMutex(request_queue
.mutex
);
375 ResetEvent(request_queue
.cond
);
376 ReleaseMutex(done_queue
.mutex
);
377 ResetEvent(done_queue
.cond
);
380 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
382 for (i
= 0; i
< NUMTHREADS
; i
++) {
383 CloseHandle(hthreads
[i
]);
386 CloseHandle(main_thread
);
387 CommIO::NotifyIOClose();
389 squidaio_initialised
= 0;
394 squidaio_thread_loop(LPVOID lpParam
)
396 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
397 squidaio_request_t
*request
;
398 HANDLE cond
; /* local copy of the event queue because win32 event handles
399 * don't atomically release the mutex as cond variables do. */
401 /* lock the thread info */
403 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
404 fatal("Can't get ownership of mutex\n");
407 /* duplicate the handle */
408 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
409 request_queue
.cond
, /* handle to copy */
410 GetCurrentProcess(), /* pseudo handle, don't close */
412 0, /* required access */
413 FALSE
, /* child process's don't inherit the handle */
414 DUPLICATE_SAME_ACCESS
))
415 fatal("Can't duplicate mutex handle\n");
417 if (!ReleaseMutex(request_queue
.mutex
)) {
419 fatal("Can't release mutex\n");
426 threadp
->current_req
= request
= NULL
;
428 /* Get a request to process */
429 threadp
->status
= _THREAD_WAITING
;
432 CloseHandle(request_queue
.mutex
);
437 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
439 if (rv
== WAIT_FAILED
) {
444 while (!request_queue
.head
) {
445 if (!ReleaseMutex(request_queue
.mutex
)) {
447 threadp
->status
= _THREAD_FAILED
;
452 rv
= WaitForSingleObject(cond
, INFINITE
);
454 if (rv
== WAIT_FAILED
) {
459 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
461 if (rv
== WAIT_FAILED
) {
467 request
= request_queue
.head
;
470 request_queue
.head
= request
->next
;
472 if (!request_queue
.head
)
473 request_queue
.tailp
= &request_queue
.head
;
475 if (!ReleaseMutex(request_queue
.mutex
)) {
482 /* process the request */
483 threadp
->status
= _THREAD_BUSY
;
485 request
->next
= NULL
;
487 threadp
->current_req
= request
;
491 if (!request
->cancelled
) {
492 switch (request
->request_type
) {
495 squidaio_do_open(request
);
499 squidaio_do_read(request
);
503 squidaio_do_write(request
);
507 squidaio_do_close(request
);
511 squidaio_do_unlink(request
);
514 #if AIO_OPENDIR /* Opendir not implemented yet */
516 case _AIO_OP_OPENDIR
:
517 squidaio_do_opendir(request
);
522 squidaio_do_stat(request
);
527 request
->err
= EINVAL
;
530 } else { /* cancelled */
532 request
->err
= EINTR
;
535 threadp
->status
= _THREAD_DONE
;
536 /* put the request in the done queue */
537 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
539 if (rv
== WAIT_FAILED
) {
544 *done_queue
.tailp
= request
;
545 done_queue
.tailp
= &request
->next
;
547 if (!ReleaseMutex(done_queue
.mutex
)) {
552 CommIO::NotifyIOCompleted();
555 } /* while forever */
560 } /* squidaio_thread_loop */
563 squidaio_queue_request(squidaio_request_t
* request
)
565 static int high_start
= 0;
566 debugs(43, 9, "squidaio_queue_request: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
567 /* Mark it as not executed (failing result, no error) */
570 /* Internal housekeeping */
571 request_queue_len
+= 1;
572 request
->resultp
->_data
= request
;
573 /* Play some tricks with the request_queue2 queue */
574 request
->next
= NULL
;
576 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
577 if (request_queue2
.head
) {
578 /* Grab blocked requests */
579 *request_queue
.tailp
= request_queue2
.head
;
580 request_queue
.tailp
= request_queue2
.tailp
;
583 /* Enqueue request */
584 *request_queue
.tailp
= request
;
586 request_queue
.tailp
= &request
->next
;
588 if (!SetEvent(request_queue
.cond
))
589 fatal("Couldn't push queue");
591 if (!ReleaseMutex(request_queue
.mutex
)) {
592 /* unexpected error */
593 fatal("Couldn't push queue");
598 if (request_queue2
.head
) {
599 /* Clear queue of blocked requests */
600 request_queue2
.head
= NULL
;
601 request_queue2
.tailp
= &request_queue2
.head
;
604 /* Oops, the request queue is blocked, use request_queue2 */
605 *request_queue2
.tailp
= request
;
606 request_queue2
.tailp
= &request
->next
;
609 if (request_queue2
.head
) {
610 static int filter
= 0;
611 static int filter_limit
= 8;
613 if (++filter
>= filter_limit
) {
614 filter_limit
+= filter
;
616 debugs(43, 1, "squidaio_queue_request: WARNING - Queue congestion");
620 /* Warn if out of threads */
621 if (request_queue_len
> MAGIC1
) {
622 static int last_warn
= 0;
623 static int queue_high
, queue_low
;
625 if (high_start
== 0) {
626 high_start
= (int)squid_curtime
;
627 queue_high
= request_queue_len
;
628 queue_low
= request_queue_len
;
631 if (request_queue_len
> queue_high
)
632 queue_high
= request_queue_len
;
634 if (request_queue_len
< queue_low
)
635 queue_low
= request_queue_len
;
637 if (squid_curtime
>= (last_warn
+ 15) &&
638 squid_curtime
>= (high_start
+ 5)) {
639 debugs(43, 1, "squidaio_queue_request: WARNING - Disk I/O overloading");
641 if (squid_curtime
>= (high_start
+ 15))
642 debugs(43, 1, "squidaio_queue_request: Queue Length: current=" <<
643 request_queue_len
<< ", high=" << queue_high
<<
644 ", low=" << queue_low
<< ", duration=" <<
645 (long int) (squid_curtime
- high_start
));
647 last_warn
= (int)squid_curtime
;
653 /* Warn if seriously overloaded */
654 if (request_queue_len
> RIDICULOUS_LENGTH
) {
655 debugs(43, 0, "squidaio_queue_request: Async request queue growing uncontrollably!");
656 debugs(43, 0, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
658 debugs(43, 0, "squidaio_queue_request: Synced");
660 } /* squidaio_queue_request */
663 squidaio_cleanup_request(squidaio_request_t
* requestp
)
665 squidaio_result_t
*resultp
= requestp
->resultp
;
666 int cancelled
= requestp
->cancelled
;
668 /* Free allocated structures and copy data back to user space if the */
669 /* request hasn't been cancelled */
671 switch (requestp
->request_type
) {
675 if (!cancelled
&& requestp
->ret
== 0)
677 xmemcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
679 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
681 squidaio_xstrfree(requestp
->path
);
686 if (cancelled
&& requestp
->ret
>= 0)
687 /* The open() was cancelled but completed */
688 close(requestp
->ret
);
690 squidaio_xstrfree(requestp
->path
);
695 if (cancelled
&& requestp
->ret
< 0)
696 /* The close() was cancelled and never got executed */
703 case _AIO_OP_OPENDIR
:
704 squidaio_xstrfree(requestp
->path
);
718 if (resultp
!= NULL
&& !cancelled
) {
719 resultp
->aio_return
= requestp
->ret
;
720 resultp
->aio_errno
= requestp
->err
;
723 squidaio_request_pool
->free(requestp
);
724 } /* squidaio_cleanup_request */
728 squidaio_cancel(squidaio_result_t
* resultp
)
730 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
732 if (request
&& request
->resultp
== resultp
) {
733 debugs(43, 9, "squidaio_cancel: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
734 request
->cancelled
= 1;
735 request
->resultp
= NULL
;
736 resultp
->_data
= NULL
;
737 resultp
->result_type
= _AIO_OP_NONE
;
742 } /* squidaio_cancel */
746 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
749 squidaio_request_t
*requestp
;
751 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
753 requestp
->path
= (char *) squidaio_xstrdup(path
);
755 requestp
->oflag
= oflag
;
757 requestp
->mode
= mode
;
759 requestp
->resultp
= resultp
;
761 requestp
->request_type
= _AIO_OP_OPEN
;
763 requestp
->cancelled
= 0;
765 resultp
->result_type
= _AIO_OP_OPEN
;
767 squidaio_queue_request(requestp
);
774 squidaio_do_open(squidaio_request_t
* requestp
)
776 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
777 requestp
->err
= errno
;
782 squidaio_read(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
784 squidaio_request_t
*requestp
;
786 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
790 requestp
->bufferp
= bufp
;
792 requestp
->buflen
= bufs
;
794 requestp
->offset
= offset
;
796 requestp
->whence
= whence
;
798 requestp
->resultp
= resultp
;
800 requestp
->request_type
= _AIO_OP_READ
;
802 requestp
->cancelled
= 0;
804 resultp
->result_type
= _AIO_OP_READ
;
806 squidaio_queue_request(requestp
);
813 squidaio_do_read(squidaio_request_t
* requestp
)
815 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
817 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
818 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
819 WIN32_maperror(GetLastError());
823 requestp
->err
= errno
;
828 squidaio_write(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
830 squidaio_request_t
*requestp
;
832 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
836 requestp
->bufferp
= bufp
;
838 requestp
->buflen
= bufs
;
840 requestp
->offset
= offset
;
842 requestp
->whence
= whence
;
844 requestp
->resultp
= resultp
;
846 requestp
->request_type
= _AIO_OP_WRITE
;
848 requestp
->cancelled
= 0;
850 resultp
->result_type
= _AIO_OP_WRITE
;
852 squidaio_queue_request(requestp
);
859 squidaio_do_write(squidaio_request_t
* requestp
)
861 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
862 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
863 WIN32_maperror(GetLastError());
867 requestp
->err
= errno
;
872 squidaio_close(int fd
, squidaio_result_t
* resultp
)
874 squidaio_request_t
*requestp
;
876 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
880 requestp
->resultp
= resultp
;
882 requestp
->request_type
= _AIO_OP_CLOSE
;
884 requestp
->cancelled
= 0;
886 resultp
->result_type
= _AIO_OP_CLOSE
;
888 squidaio_queue_request(requestp
);
895 squidaio_do_close(squidaio_request_t
* requestp
)
897 if ((requestp
->ret
= close(requestp
->fd
)) < 0) {
898 debugs(43, 0, "squidaio_do_close: FD " << requestp
->fd
<< ", errno " << errno
);
902 requestp
->err
= errno
;
908 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
911 squidaio_request_t
*requestp
;
913 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
915 requestp
->path
= (char *) squidaio_xstrdup(path
);
917 requestp
->statp
= sb
;
919 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
921 requestp
->resultp
= resultp
;
923 requestp
->request_type
= _AIO_OP_STAT
;
925 requestp
->cancelled
= 0;
927 resultp
->result_type
= _AIO_OP_STAT
;
929 squidaio_queue_request(requestp
);
936 squidaio_do_stat(squidaio_request_t
* requestp
)
938 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
939 requestp
->err
= errno
;
944 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
947 squidaio_request_t
*requestp
;
949 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
951 requestp
->path
= squidaio_xstrdup(path
);
953 requestp
->resultp
= resultp
;
955 requestp
->request_type
= _AIO_OP_UNLINK
;
957 requestp
->cancelled
= 0;
959 resultp
->result_type
= _AIO_OP_UNLINK
;
961 squidaio_queue_request(requestp
);
968 squidaio_do_unlink(squidaio_request_t
* requestp
)
970 requestp
->ret
= unlink(requestp
->path
);
971 requestp
->err
= errno
;
975 /* XXX squidaio_opendir NOT implemented yet.. */
978 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
980 squidaio_request_t
*requestp
;
983 requestp
= squidaio_request_pool
->alloc();
985 resultp
->result_type
= _AIO_OP_OPENDIR
;
991 squidaio_do_opendir(squidaio_request_t
* requestp
)
993 /* NOT IMPLEMENTED */
999 squidaio_poll_queues(void)
1001 /* kick "overflow" request queue */
1003 if (request_queue2
.head
&&
1004 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
1005 *request_queue
.tailp
= request_queue2
.head
;
1006 request_queue
.tailp
= request_queue2
.tailp
;
1008 if (!SetEvent(request_queue
.cond
))
1009 fatal("couldn't push queue\n");
1011 if (!ReleaseMutex(request_queue
.mutex
)) {
1012 /* unexpected error */
1016 request_queue2
.head
= NULL
;
1017 request_queue2
.tailp
= &request_queue2
.head
;
1020 /* poll done queue */
1021 if (done_queue
.head
&&
1022 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
1024 struct squidaio_request_t
*requests
= done_queue
.head
;
1025 done_queue
.head
= NULL
;
1026 done_queue
.tailp
= &done_queue
.head
;
1028 if (!ReleaseMutex(done_queue
.mutex
)) {
1029 /* unexpected error */
1033 *done_requests
.tailp
= requests
;
1034 request_queue_len
-= 1;
1036 while (requests
->next
) {
1037 requests
= requests
->next
;
1038 request_queue_len
-= 1;
1041 done_requests
.tailp
= &requests
->next
;
1046 squidaio_poll_done(void)
1048 squidaio_request_t
*request
;
1049 squidaio_result_t
*resultp
;
1054 request
= done_requests
.head
;
1056 if (request
== NULL
&& !polled
) {
1057 CommIO::ResetNotifications();
1058 squidaio_poll_queues();
1060 request
= done_requests
.head
;
1067 debugs(43, 9, "squidaio_poll_done: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
1068 done_requests
.head
= request
->next
;
1070 if (!done_requests
.head
)
1071 done_requests
.tailp
= &done_requests
.head
;
1073 resultp
= request
->resultp
;
1075 cancelled
= request
->cancelled
;
1077 squidaio_debug(request
);
1079 debugs(43, 5, "DONE: " << request
->ret
<< " -> " << request
->err
);
1081 squidaio_cleanup_request(request
);
1087 } /* squidaio_poll_done */
1090 squidaio_operations_pending(void)
1092 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1098 /* XXX This might take a while if the queue is large.. */
1101 squidaio_poll_queues();
1102 } while (request_queue_len
> 0);
1104 return squidaio_operations_pending();
1108 squidaio_get_queue_len(void)
1110 return request_queue_len
;
1114 squidaio_debug(squidaio_request_t
* request
)
1116 switch (request
->request_type
) {
1119 debugs(43, 5, "OPEN of " << request
->path
<< " to FD " << request
->ret
);
1123 debugs(43, 5, "READ on fd: " << request
->fd
);
1127 debugs(43, 5, "WRITE on fd: " << request
->fd
);
1131 debugs(43, 5, "CLOSE of fd: " << request
->fd
);
1134 case _AIO_OP_UNLINK
:
1135 debugs(43, 5, "UNLINK of " << request
->path
);
1144 squidaio_stats(StoreEntry
* sentry
)
1146 squidaio_thread_t
*threadp
;
1149 if (!squidaio_initialised
)
1152 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1154 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1158 for (i
= 0; i
< NUMTHREADS
; i
++) {
1159 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1160 threadp
= threadp
->next
;