2 * $Id: aiops_win32.cc,v 1.5 2007/08/16 23:32:28 hno Exp $
4 * DEBUG: section 43 Windows AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 * AUTHOR: Robert Collins <robertc@squid-cache.org>
7 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
9 * SQUID Web Proxy Cache http://www.squid-cache.org/
10 * ----------------------------------------------------------
12 * Squid is the result of efforts by numerous individuals from
13 * the Internet community; see the CONTRIBUTORS file for full
14 * details. Many organizations have provided support for Squid's
15 * development; see the SPONSORS file for full details. Squid is
16 * Copyrighted (C) 2001 by the Regents of the University of
17 * California; see the COPYRIGHT file for full details. Squid
18 * incorporates software developed and/or copyrighted by other
19 * sources; see the CREDITS file for full details.
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
31 * You should have received a copy of the GNU General Public License
32 * along with this program; if not, write to the Free Software
33 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
38 #include "squid_windows.h"
39 #include "DiskThreads.h"
42 #include <sys/types.h>
49 #include "SquidTime.h"
52 #define RIDICULOUS_LENGTH 4096
54 enum _squidaio_thread_status
{
61 typedef enum _squidaio_thread_status squidaio_thread_status
;
63 typedef struct squidaio_request_t
66 struct squidaio_request_t
*next
;
67 squidaio_request_type request_type
;
81 struct stat
*tmpstatp
;
84 squidaio_result_t
*resultp
;
89 typedef struct squidaio_request_queue_t
92 HANDLE cond
; /* See Event objects */
93 squidaio_request_t
*volatile head
;
94 squidaio_request_t
*volatile *volatile tailp
;
95 unsigned long requests
;
96 unsigned long blocked
; /* main failed to lock the queue */
99 squidaio_request_queue_t
;
101 typedef struct squidaio_thread_t squidaio_thread_t
;
103 struct squidaio_thread_t
105 squidaio_thread_t
*next
;
107 DWORD dwThreadId
; /* thread ID */
108 squidaio_thread_status status
;
110 struct squidaio_request_t
*current_req
;
111 unsigned long requests
;
/* Forward declarations of the file-local helpers. */
/* Hands a request to the worker threads through request_queue, or parks it
 * on request_queue2 when the queue mutex cannot be taken immediately. */
115 static void squidaio_queue_request(squidaio_request_t
*);
/* Releases per-request allocations, copies ret/err into the caller's result
 * when the request was not cancelled, and returns the request to its pool. */
116 static void squidaio_cleanup_request(squidaio_request_t
*);
/* Thread function passed to CreateThread() for every worker thread. */
117 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
/* Per-operation workers, dispatched from squidaio_thread_loop() according
 * to request_type. */
118 static void squidaio_do_open(squidaio_request_t
*);
119 static void squidaio_do_read(squidaio_request_t
*);
120 static void squidaio_do_write(squidaio_request_t
*);
121 static void squidaio_do_close(squidaio_request_t
*);
122 static void squidaio_do_stat(squidaio_request_t
*);
123 static void squidaio_do_unlink(squidaio_request_t
*);
/* Marked "NOT IMPLEMENTED" at its definition; its dispatch is guarded by
 * AIO_OPENDIR (presumably this prototype is too -- confirm). */
125 static void *squidaio_do_opendir(squidaio_request_t
*);
/* Emits a level-5 debug line describing a request. */
127 static void squidaio_debug(squidaio_request_t
*);
/* Flushes request_queue2 into request_queue and moves finished requests
 * from done_queue onto done_requests; both steps are skipped when the
 * corresponding mutex is not immediately available. */
128 static void squidaio_poll_queues(void);
/* Worker-thread records, linked through their `next` field: each record
 * created by the init code gets next = threads.  Presumably the list head;
 * the assignment back to `threads` is not visible here -- confirm. */
130 static squidaio_thread_t
*threads
= NULL
;
/* Set to 1 at the end of the init code and back to 0 at the end of
 * squidaio_shutdown(); tested on entry by init, shutdown and stats. */
131 static int squidaio_initialised
= 0;
/*
 * Buffer size classes, in bytes, for the squidaio_*_bufs memory pools.
 * The derived sizes are parenthesized so that each macro expands to a
 * single value regardless of the operators surrounding it at the use site
 * (e.g. AIO_MEDIUM_BUFS + 1 or AIO_TINY_BUFS * 2).
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
/* One allocator per buffer size class.  All start out NULL and are assigned
 * from memPoolCreate() with the matching AIO_*_BUFS size by the init code;
 * squidaio_get_pool() picks among them by requested size. */
140 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
141 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
142 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
143 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
144 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128 bytes */
146 static int request_queue_len
= 0;
147 static MemAllocator
*squidaio_request_pool
= NULL
;
148 static MemAllocator
*squidaio_thread_pool
= NULL
;
149 static squidaio_request_queue_t request_queue
;
153 squidaio_request_t
*head
, **tailp
;
158 NULL
, &request_queue2
.head
160 static squidaio_request_queue_t done_queue
;
164 squidaio_request_t
*head
, **tailp
;
169 NULL
, &done_requests
.head
172 static HANDLE main_thread
;
174 static MemAllocator
*
175 squidaio_get_pool(int size
)
177 if (size
<= AIO_LARGE_BUFS
) {
178 if (size
<= AIO_MICRO_BUFS
)
179 return squidaio_micro_bufs
;
180 else if (size
<= AIO_TINY_BUFS
)
181 return squidaio_tiny_bufs
;
182 else if (size
<= AIO_SMALL_BUFS
)
183 return squidaio_small_bufs
;
184 else if (size
<= AIO_MEDIUM_BUFS
)
185 return squidaio_medium_bufs
;
187 return squidaio_large_bufs
;
194 squidaio_xmalloc(int size
)
199 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
208 squidaio_xstrdup(const char *str
)
211 int len
= strlen(str
) + 1;
213 p
= (char *)squidaio_xmalloc(len
);
214 strncpy(p
, str
, len
);
220 squidaio_xfree(void *p
, int size
)
224 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
231 squidaio_xstrfree(char *str
)
234 int len
= strlen(str
) + 1;
236 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
246 squidaio_thread_t
*threadp
;
248 if (squidaio_initialised
)
251 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
252 GetCurrentThread(), /* pseudo handle to copy */
253 GetCurrentProcess(), /* pseudo handle, don't close */
255 0, /* required access */
256 FALSE
, /* child processes don't inherit the handle */
257 DUPLICATE_SAME_ACCESS
)) {
259 fatal("Couldn't get current thread handle");
262 /* Initialize request queue */
263 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
264 FALSE
, /* start unowned (as per mutex_init) */
267 fatal("Failed to create mutex");
270 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
271 FALSE
, /* auto-reset event: releases one waiter per SetEvent(), then resets itself (roughly pthread_cond_signal-like) */
272 FALSE
, /* start non signaled */
275 fatal("Failed to create condition variable");
278 request_queue
.head
= NULL
;
280 request_queue
.tailp
= &request_queue
.head
;
282 request_queue
.requests
= 0;
284 request_queue
.blocked
= 0;
286 /* Initialize done queue */
288 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
289 FALSE
, /* start unowned (as per mutex_init) */
292 fatal("Failed to create mutex");
295 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
296 TRUE
, /* manual-reset event: stays signaled until ResetEvent() is called (original note likened this to pthreads) */
297 FALSE
, /* start non signaled */
300 fatal("Failed to create condition variable");
303 done_queue
.head
= NULL
;
305 done_queue
.tailp
= &done_queue
.head
;
307 done_queue
.requests
= 0;
309 done_queue
.blocked
= 0;
311 CommIO::NotifyIOCompleted();
313 /* Create threads and get them to sit in their wait loop */
314 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
318 for (i
= 0; i
< NUMTHREADS
; i
++) {
319 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
320 threadp
->status
= _THREAD_STARTING
;
321 threadp
->current_req
= NULL
;
322 threadp
->requests
= 0;
323 threadp
->next
= threads
;
326 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
327 0, /* use default stack size */
328 squidaio_thread_loop
, /* thread function */
329 threadp
, /* argument to thread function */
330 0, /* use default creation flags */
331 &(threadp
->dwThreadId
)) /* returns the thread identifier */
333 fprintf(stderr
, "Thread creation failed\n");
334 threadp
->status
= _THREAD_FAILED
;
338 /* Set the new thread priority above parent process */
339 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
342 /* Create request pool */
343 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
345 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
347 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
349 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
351 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
353 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
355 squidaio_initialised
= 1;
359 squidaio_shutdown(void)
361 squidaio_thread_t
*threadp
;
365 if (!squidaio_initialised
)
368 /* This is the same as in squidaio_sync */
370 squidaio_poll_queues();
371 } while (request_queue_len
> 0);
373 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
377 for (i
= 0; i
< NUMTHREADS
; i
++) {
379 hthreads
[i
] = threadp
->thread
;
380 threadp
= threadp
->next
;
383 ReleaseMutex(request_queue
.mutex
);
384 ResetEvent(request_queue
.cond
);
385 ReleaseMutex(done_queue
.mutex
);
386 ResetEvent(done_queue
.cond
);
389 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
391 for (i
= 0; i
< NUMTHREADS
; i
++) {
392 CloseHandle(hthreads
[i
]);
395 CloseHandle(main_thread
);
396 CommIO::NotifyIOClose();
398 squidaio_initialised
= 0;
403 squidaio_thread_loop(LPVOID lpParam
)
405 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
406 squidaio_request_t
*request
;
407 HANDLE cond
; /* local copy of the event queue because win32 event handles
408 * don't atomically release the mutex as cond variables do. */
410 /* lock the thread info */
412 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
413 fatal("Can't get ownership of mutex\n");
416 /* duplicate the handle */
417 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
418 request_queue
.cond
, /* handle to copy */
419 GetCurrentProcess(), /* pseudo handle, don't close */
421 0, /* required access */
422 FALSE
, /* child processes don't inherit the handle */
423 DUPLICATE_SAME_ACCESS
))
424 fatal("Can't duplicate mutex handle\n");
426 if (!ReleaseMutex(request_queue
.mutex
)) {
428 fatal("Can't release mutex\n");
435 threadp
->current_req
= request
= NULL
;
437 /* Get a request to process */
438 threadp
->status
= _THREAD_WAITING
;
441 CloseHandle(request_queue
.mutex
);
446 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
448 if (rv
== WAIT_FAILED
) {
453 while (!request_queue
.head
) {
454 if (!ReleaseMutex(request_queue
.mutex
)) {
456 threadp
->status
= _THREAD_FAILED
;
461 rv
= WaitForSingleObject(cond
, INFINITE
);
463 if (rv
== WAIT_FAILED
) {
468 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
470 if (rv
== WAIT_FAILED
) {
476 request
= request_queue
.head
;
479 request_queue
.head
= request
->next
;
481 if (!request_queue
.head
)
482 request_queue
.tailp
= &request_queue
.head
;
484 if (!ReleaseMutex(request_queue
.mutex
)) {
491 /* process the request */
492 threadp
->status
= _THREAD_BUSY
;
494 request
->next
= NULL
;
496 threadp
->current_req
= request
;
500 if (!request
->cancelled
) {
501 switch (request
->request_type
) {
504 squidaio_do_open(request
);
508 squidaio_do_read(request
);
512 squidaio_do_write(request
);
516 squidaio_do_close(request
);
520 squidaio_do_unlink(request
);
523 #if AIO_OPENDIR /* Opendir not implemented yet */
525 case _AIO_OP_OPENDIR
:
526 squidaio_do_opendir(request
);
531 squidaio_do_stat(request
);
536 request
->err
= EINVAL
;
539 } else { /* cancelled */
541 request
->err
= EINTR
;
544 threadp
->status
= _THREAD_DONE
;
545 /* put the request in the done queue */
546 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
548 if (rv
== WAIT_FAILED
) {
553 *done_queue
.tailp
= request
;
554 done_queue
.tailp
= &request
->next
;
556 if (!ReleaseMutex(done_queue
.mutex
)) {
561 CommIO::NotifyIOCompleted();
564 } /* while forever */
569 } /* squidaio_thread_loop */
572 squidaio_queue_request(squidaio_request_t
* request
)
574 static int high_start
= 0;
575 debugs(43, 9, "squidaio_queue_request: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
576 /* Mark it as not executed (failing result, no error) */
579 /* Internal housekeeping */
580 request_queue_len
+= 1;
581 request
->resultp
->_data
= request
;
582 /* Play some tricks with the request_queue2 queue */
583 request
->next
= NULL
;
585 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
586 if (request_queue2
.head
) {
587 /* Grab blocked requests */
588 *request_queue
.tailp
= request_queue2
.head
;
589 request_queue
.tailp
= request_queue2
.tailp
;
592 /* Enqueue request */
593 *request_queue
.tailp
= request
;
595 request_queue
.tailp
= &request
->next
;
597 if (!SetEvent(request_queue
.cond
))
598 fatal("Couldn't push queue");
600 if (!ReleaseMutex(request_queue
.mutex
)) {
601 /* unexpected error */
602 fatal("Couldn't push queue");
607 if (request_queue2
.head
) {
608 /* Clear queue of blocked requests */
609 request_queue2
.head
= NULL
;
610 request_queue2
.tailp
= &request_queue2
.head
;
613 /* Oops, the request queue is blocked, use request_queue2 */
614 *request_queue2
.tailp
= request
;
615 request_queue2
.tailp
= &request
->next
;
618 if (request_queue2
.head
) {
619 static int filter
= 0;
620 static int filter_limit
= 8;
622 if (++filter
>= filter_limit
) {
623 filter_limit
+= filter
;
625 debugs(43, 1, "squidaio_queue_request: WARNING - Queue congestion");
629 /* Warn if out of threads */
630 if (request_queue_len
> MAGIC1
) {
631 static int last_warn
= 0;
632 static int queue_high
, queue_low
;
634 if (high_start
== 0) {
635 high_start
= (int)squid_curtime
;
636 queue_high
= request_queue_len
;
637 queue_low
= request_queue_len
;
640 if (request_queue_len
> queue_high
)
641 queue_high
= request_queue_len
;
643 if (request_queue_len
< queue_low
)
644 queue_low
= request_queue_len
;
646 if (squid_curtime
>= (last_warn
+ 15) &&
647 squid_curtime
>= (high_start
+ 5)) {
648 debugs(43, 1, "squidaio_queue_request: WARNING - Disk I/O overloading");
650 if (squid_curtime
>= (high_start
+ 15))
651 debugs(43, 1, "squidaio_queue_request: Queue Length: current=" <<
652 request_queue_len
<< ", high=" << queue_high
<<
653 ", low=" << queue_low
<< ", duration=" <<
654 (long int) (squid_curtime
- high_start
));
656 last_warn
= (int)squid_curtime
;
662 /* Warn if seriously overloaded */
663 if (request_queue_len
> RIDICULOUS_LENGTH
) {
664 debugs(43, 0, "squidaio_queue_request: Async request queue growing uncontrollably!");
665 debugs(43, 0, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
667 debugs(43, 0, "squidaio_queue_request: Synced");
669 } /* squidaio_queue_request */
672 squidaio_cleanup_request(squidaio_request_t
* requestp
)
674 squidaio_result_t
*resultp
= requestp
->resultp
;
675 int cancelled
= requestp
->cancelled
;
677 /* Free allocated structures and copy data back to user space if the */
678 /* request hasn't been cancelled */
680 switch (requestp
->request_type
) {
684 if (!cancelled
&& requestp
->ret
== 0)
686 xmemcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
688 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
690 squidaio_xstrfree(requestp
->path
);
695 if (cancelled
&& requestp
->ret
>= 0)
696 /* The open() was cancelled but completed */
697 close(requestp
->ret
);
699 squidaio_xstrfree(requestp
->path
);
704 if (cancelled
&& requestp
->ret
< 0)
705 /* The close() was cancelled and never got executed */
712 case _AIO_OP_OPENDIR
:
713 squidaio_xstrfree(requestp
->path
);
727 if (resultp
!= NULL
&& !cancelled
) {
728 resultp
->aio_return
= requestp
->ret
;
729 resultp
->aio_errno
= requestp
->err
;
732 squidaio_request_pool
->free(requestp
);
733 } /* squidaio_cleanup_request */
737 squidaio_cancel(squidaio_result_t
* resultp
)
739 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
741 if (request
&& request
->resultp
== resultp
) {
742 debugs(43, 9, "squidaio_cancel: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
743 request
->cancelled
= 1;
744 request
->resultp
= NULL
;
745 resultp
->_data
= NULL
;
746 resultp
->result_type
= _AIO_OP_NONE
;
751 } /* squidaio_cancel */
755 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
758 squidaio_request_t
*requestp
;
760 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
762 requestp
->path
= (char *) squidaio_xstrdup(path
);
764 requestp
->oflag
= oflag
;
766 requestp
->mode
= mode
;
768 requestp
->resultp
= resultp
;
770 requestp
->request_type
= _AIO_OP_OPEN
;
772 requestp
->cancelled
= 0;
774 resultp
->result_type
= _AIO_OP_OPEN
;
776 squidaio_queue_request(requestp
);
783 squidaio_do_open(squidaio_request_t
* requestp
)
785 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
786 requestp
->err
= errno
;
791 squidaio_read(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
793 squidaio_request_t
*requestp
;
795 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
799 requestp
->bufferp
= bufp
;
801 requestp
->buflen
= bufs
;
803 requestp
->offset
= offset
;
805 requestp
->whence
= whence
;
807 requestp
->resultp
= resultp
;
809 requestp
->request_type
= _AIO_OP_READ
;
811 requestp
->cancelled
= 0;
813 resultp
->result_type
= _AIO_OP_READ
;
815 squidaio_queue_request(requestp
);
822 squidaio_do_read(squidaio_request_t
* requestp
)
824 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
826 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
827 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
828 WIN32_maperror(GetLastError());
832 requestp
->err
= errno
;
837 squidaio_write(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
839 squidaio_request_t
*requestp
;
841 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
845 requestp
->bufferp
= bufp
;
847 requestp
->buflen
= bufs
;
849 requestp
->offset
= offset
;
851 requestp
->whence
= whence
;
853 requestp
->resultp
= resultp
;
855 requestp
->request_type
= _AIO_OP_WRITE
;
857 requestp
->cancelled
= 0;
859 resultp
->result_type
= _AIO_OP_WRITE
;
861 squidaio_queue_request(requestp
);
868 squidaio_do_write(squidaio_request_t
* requestp
)
870 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
871 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
872 WIN32_maperror(GetLastError());
876 requestp
->err
= errno
;
881 squidaio_close(int fd
, squidaio_result_t
* resultp
)
883 squidaio_request_t
*requestp
;
885 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
889 requestp
->resultp
= resultp
;
891 requestp
->request_type
= _AIO_OP_CLOSE
;
893 requestp
->cancelled
= 0;
895 resultp
->result_type
= _AIO_OP_CLOSE
;
897 squidaio_queue_request(requestp
);
904 squidaio_do_close(squidaio_request_t
* requestp
)
906 if((requestp
->ret
= close(requestp
->fd
)) < 0) {
907 debugs(43, 0, "squidaio_do_close: FD " << requestp
->fd
<< ", errno " << errno
);
911 requestp
->err
= errno
;
917 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
920 squidaio_request_t
*requestp
;
922 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
924 requestp
->path
= (char *) squidaio_xstrdup(path
);
926 requestp
->statp
= sb
;
928 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
930 requestp
->resultp
= resultp
;
932 requestp
->request_type
= _AIO_OP_STAT
;
934 requestp
->cancelled
= 0;
936 resultp
->result_type
= _AIO_OP_STAT
;
938 squidaio_queue_request(requestp
);
945 squidaio_do_stat(squidaio_request_t
* requestp
)
947 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
948 requestp
->err
= errno
;
953 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
956 squidaio_request_t
*requestp
;
958 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
960 requestp
->path
= squidaio_xstrdup(path
);
962 requestp
->resultp
= resultp
;
964 requestp
->request_type
= _AIO_OP_UNLINK
;
966 requestp
->cancelled
= 0;
968 resultp
->result_type
= _AIO_OP_UNLINK
;
970 squidaio_queue_request(requestp
);
977 squidaio_do_unlink(squidaio_request_t
* requestp
)
979 requestp
->ret
= unlink(requestp
->path
);
980 requestp
->err
= errno
;
984 /* XXX squidaio_opendir NOT implemented yet.. */
987 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
989 squidaio_request_t
*requestp
;
992 requestp
= squidaio_request_pool
->alloc();
994 resultp
->result_type
= _AIO_OP_OPENDIR
;
1000 squidaio_do_opendir(squidaio_request_t
* requestp
)
1002 /* NOT IMPLEMENTED */
1008 squidaio_poll_queues(void)
1010 /* kick "overflow" request queue */
1012 if (request_queue2
.head
&&
1013 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
1014 *request_queue
.tailp
= request_queue2
.head
;
1015 request_queue
.tailp
= request_queue2
.tailp
;
1017 if (!SetEvent(request_queue
.cond
))
1018 fatal("couldn't push queue\n");
1020 if (!ReleaseMutex(request_queue
.mutex
)) {
1021 /* unexpected error */
1025 request_queue2
.head
= NULL
;
1026 request_queue2
.tailp
= &request_queue2
.head
;
1029 /* poll done queue */
1030 if (done_queue
.head
&&
1031 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
1033 struct squidaio_request_t
*requests
= done_queue
.head
;
1034 done_queue
.head
= NULL
;
1035 done_queue
.tailp
= &done_queue
.head
;
1037 if (!ReleaseMutex(done_queue
.mutex
)) {
1038 /* unexpected error */
1042 *done_requests
.tailp
= requests
;
1043 request_queue_len
-= 1;
1045 while (requests
->next
) {
1046 requests
= requests
->next
;
1047 request_queue_len
-= 1;
1050 done_requests
.tailp
= &requests
->next
;
1055 squidaio_poll_done(void)
1057 squidaio_request_t
*request
;
1058 squidaio_result_t
*resultp
;
1063 request
= done_requests
.head
;
1065 if (request
== NULL
&& !polled
) {
1066 CommIO::ResetNotifications();
1067 squidaio_poll_queues();
1069 request
= done_requests
.head
;
1076 debugs(43, 9, "squidaio_poll_done: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
1077 done_requests
.head
= request
->next
;
1079 if (!done_requests
.head
)
1080 done_requests
.tailp
= &done_requests
.head
;
1082 resultp
= request
->resultp
;
1084 cancelled
= request
->cancelled
;
1086 squidaio_debug(request
);
1088 debugs(43, 5, "DONE: " << request
->ret
<< " -> " << request
->err
);
1090 squidaio_cleanup_request(request
);
1096 } /* squidaio_poll_done */
1099 squidaio_operations_pending(void)
1101 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1107 /* XXX This might take a while if the queue is large.. */
1110 squidaio_poll_queues();
1111 } while (request_queue_len
> 0);
1113 return squidaio_operations_pending();
1117 squidaio_get_queue_len(void)
1119 return request_queue_len
;
1123 squidaio_debug(squidaio_request_t
* request
)
1125 switch (request
->request_type
) {
1128 debugs(43, 5, "OPEN of " << request
->path
<< " to FD " << request
->ret
);
1132 debugs(43, 5, "READ on fd: " << request
->fd
);
1136 debugs(43, 5, "WRITE on fd: " << request
->fd
);
1140 debugs(43, 5, "CLOSE of fd: " << request
->fd
);
1143 case _AIO_OP_UNLINK
:
1144 debugs(43, 5, "UNLINK of " << request
->path
);
1153 squidaio_stats(StoreEntry
* sentry
)
1155 squidaio_thread_t
*threadp
;
1158 if (!squidaio_initialised
)
1161 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1163 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1167 for (i
= 0; i
< NUMTHREADS
; i
++) {
1168 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1169 threadp
= threadp
->next
;