2 * Copyright (C) 1996-2016 The Squid Software Foundation and contributors
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
9 /* DEBUG: section 43 Windows AIOPS */
12 #include "DiskIO/DiskThreads/CommIO.h"
13 #include "DiskThreads.h"
16 #include "SquidConfig.h"
17 #include "SquidTime.h"
26 #define RIDICULOUS_LENGTH 4096
28 enum _squidaio_thread_status
{
35 typedef enum _squidaio_thread_status squidaio_thread_status
;
37 typedef struct squidaio_request_t
{
39 struct squidaio_request_t
*next
;
40 squidaio_request_type request_type
;
54 struct stat
*tmpstatp
;
57 squidaio_result_t
*resultp
;
60 typedef struct squidaio_request_queue_t
{
62 HANDLE cond
; /* See Event objects */
63 squidaio_request_t
*volatile head
;
64 squidaio_request_t
*volatile *volatile tailp
;
65 unsigned long requests
;
66 unsigned long blocked
; /* main failed to lock the queue */
67 } squidaio_request_queue_t
;
69 typedef struct squidaio_thread_t squidaio_thread_t
;
71 struct squidaio_thread_t
{
72 squidaio_thread_t
*next
;
74 DWORD dwThreadId
; /* thread ID */
75 squidaio_thread_status status
;
77 struct squidaio_request_t
*current_req
;
78 unsigned long requests
;
82 static void squidaio_queue_request(squidaio_request_t
*);
83 static void squidaio_cleanup_request(squidaio_request_t
*);
84 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
85 static void squidaio_do_open(squidaio_request_t
*);
86 static void squidaio_do_read(squidaio_request_t
*);
87 static void squidaio_do_write(squidaio_request_t
*);
88 static void squidaio_do_close(squidaio_request_t
*);
89 static void squidaio_do_stat(squidaio_request_t
*);
90 static void squidaio_do_unlink(squidaio_request_t
*);
92 static void *squidaio_do_opendir(squidaio_request_t
*);
94 static void squidaio_debug(squidaio_request_t
*);
95 static void squidaio_poll_queues(void);
97 static squidaio_thread_t
*threads
= NULL
;
98 static int squidaio_initialised
= 0;
/*
 * Buffer-pool size classes for pooled AIO I/O buffers.
 * Each expansion is fully parenthesized: without parentheses,
 * an expression such as `AIO_MEDIUM_BUFS * 2` would expand to
 * `AIO_LARGE_BUFS >> 1 * 2` == `16384 >> 2`, silently yielding the
 * wrong value because `*` binds tighter than `>>`.
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
106 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
107 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
108 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
109 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
110 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128K */
112 static int request_queue_len
= 0;
113 static MemAllocator
*squidaio_request_pool
= NULL
;
114 static MemAllocator
*squidaio_thread_pool
= NULL
;
115 static squidaio_request_queue_t request_queue
;
118 squidaio_request_t
*head
, **tailp
;
123 NULL
, &request_queue2
.head
125 static squidaio_request_queue_t done_queue
;
128 squidaio_request_t
*head
, **tailp
;
133 NULL
, &done_requests
.head
136 static HANDLE main_thread
;
138 static MemAllocator
*
139 squidaio_get_pool(int size
)
141 if (size
<= AIO_LARGE_BUFS
) {
142 if (size
<= AIO_MICRO_BUFS
)
143 return squidaio_micro_bufs
;
144 else if (size
<= AIO_TINY_BUFS
)
145 return squidaio_tiny_bufs
;
146 else if (size
<= AIO_SMALL_BUFS
)
147 return squidaio_small_bufs
;
148 else if (size
<= AIO_MEDIUM_BUFS
)
149 return squidaio_medium_bufs
;
151 return squidaio_large_bufs
;
158 squidaio_xmalloc(int size
)
163 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
172 squidaio_xstrdup(const char *str
)
175 int len
= strlen(str
) + 1;
177 p
= (char *)squidaio_xmalloc(len
);
178 strncpy(p
, str
, len
);
184 squidaio_xfree(void *p
, int size
)
188 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
195 squidaio_xstrfree(char *str
)
198 int len
= strlen(str
) + 1;
200 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
210 squidaio_thread_t
*threadp
;
212 if (squidaio_initialised
)
215 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
216 GetCurrentThread(), /* pseudo handle to copy */
217 GetCurrentProcess(), /* pseudo handle, don't close */
219 0, /* required access */
220 FALSE
, /* child process's don't inherit the handle */
221 DUPLICATE_SAME_ACCESS
)) {
223 fatal("Couldn't get current thread handle");
226 /* Initialize request queue */
227 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
228 FALSE
, /* start unowned (as per mutex_init) */
231 fatal("Failed to create mutex");
234 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
235 FALSE
, /* auto signal reset - which I think is pthreads like ? */
236 FALSE
, /* start non signaled */
239 fatal("Failed to create condition variable");
242 request_queue
.head
= NULL
;
244 request_queue
.tailp
= &request_queue
.head
;
246 request_queue
.requests
= 0;
248 request_queue
.blocked
= 0;
250 /* Initialize done queue */
252 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
253 FALSE
, /* start unowned (as per mutex_init) */
256 fatal("Failed to create mutex");
259 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
260 TRUE
, /* manually signaled - which I think is pthreads like ? */
261 FALSE
, /* start non signaled */
264 fatal("Failed to create condition variable");
267 done_queue
.head
= NULL
;
269 done_queue
.tailp
= &done_queue
.head
;
271 done_queue
.requests
= 0;
273 done_queue
.blocked
= 0;
275 // Initialize the thread I/O pipes before creating any threads
276 // see bug 3189 comment 5 about race conditions.
277 CommIO::Initialize();
279 /* Create threads and get them to sit in their wait loop */
280 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
284 for (i
= 0; i
< NUMTHREADS
; ++i
) {
285 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
286 threadp
->status
= _THREAD_STARTING
;
287 threadp
->current_req
= NULL
;
288 threadp
->requests
= 0;
289 threadp
->next
= threads
;
292 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
293 0, /* use default stack size */
294 squidaio_thread_loop
, /* thread function */
295 threadp
, /* argument to thread function */
296 0, /* use default creation flags */
297 &(threadp
->dwThreadId
)) /* returns the thread identifier */
299 fprintf(stderr
, "Thread creation failed\n");
300 threadp
->status
= _THREAD_FAILED
;
304 /* Set the new thread priority above parent process */
305 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
308 /* Create request pool */
309 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
311 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
313 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
315 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
317 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
319 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
321 squidaio_initialised
= 1;
325 squidaio_shutdown(void)
327 squidaio_thread_t
*threadp
;
331 if (!squidaio_initialised
)
334 /* This is the same as in squidaio_sync */
336 squidaio_poll_queues();
337 } while (request_queue_len
> 0);
339 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
343 for (i
= 0; i
< NUMTHREADS
; ++i
) {
345 hthreads
[i
] = threadp
->thread
;
346 threadp
= threadp
->next
;
349 ReleaseMutex(request_queue
.mutex
);
350 ResetEvent(request_queue
.cond
);
351 ReleaseMutex(done_queue
.mutex
);
352 ResetEvent(done_queue
.cond
);
355 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
357 for (i
= 0; i
< NUMTHREADS
; ++i
) {
358 CloseHandle(hthreads
[i
]);
361 CloseHandle(main_thread
);
362 CommIO::NotifyIOClose();
364 squidaio_initialised
= 0;
369 squidaio_thread_loop(LPVOID lpParam
)
371 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
372 squidaio_request_t
*request
;
373 HANDLE cond
; /* local copy of the event queue because win32 event handles
374 * don't atomically release the mutex as cond variables do. */
376 /* lock the thread info */
378 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
379 fatal("Can't get ownership of mutex\n");
382 /* duplicate the handle */
383 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
384 request_queue
.cond
, /* handle to copy */
385 GetCurrentProcess(), /* pseudo handle, don't close */
387 0, /* required access */
388 FALSE
, /* child process's don't inherit the handle */
389 DUPLICATE_SAME_ACCESS
))
390 fatal("Can't duplicate mutex handle\n");
392 if (!ReleaseMutex(request_queue
.mutex
)) {
394 fatal("Can't release mutex\n");
401 threadp
->current_req
= request
= NULL
;
403 /* Get a request to process */
404 threadp
->status
= _THREAD_WAITING
;
407 CloseHandle(request_queue
.mutex
);
412 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
414 if (rv
== WAIT_FAILED
) {
419 while (!request_queue
.head
) {
420 if (!ReleaseMutex(request_queue
.mutex
)) {
422 threadp
->status
= _THREAD_FAILED
;
427 rv
= WaitForSingleObject(cond
, INFINITE
);
429 if (rv
== WAIT_FAILED
) {
434 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
436 if (rv
== WAIT_FAILED
) {
442 request
= request_queue
.head
;
445 request_queue
.head
= request
->next
;
447 if (!request_queue
.head
)
448 request_queue
.tailp
= &request_queue
.head
;
450 if (!ReleaseMutex(request_queue
.mutex
)) {
457 /* process the request */
458 threadp
->status
= _THREAD_BUSY
;
460 request
->next
= NULL
;
462 threadp
->current_req
= request
;
466 if (!request
->cancelled
) {
467 switch (request
->request_type
) {
470 squidaio_do_open(request
);
474 squidaio_do_read(request
);
478 squidaio_do_write(request
);
482 squidaio_do_close(request
);
486 squidaio_do_unlink(request
);
489 #if AIO_OPENDIR /* Opendir not implemented yet */
491 case _AIO_OP_OPENDIR
:
492 squidaio_do_opendir(request
);
497 squidaio_do_stat(request
);
502 request
->err
= EINVAL
;
505 } else { /* cancelled */
507 request
->err
= EINTR
;
510 threadp
->status
= _THREAD_DONE
;
511 /* put the request in the done queue */
512 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
514 if (rv
== WAIT_FAILED
) {
519 *done_queue
.tailp
= request
;
520 done_queue
.tailp
= &request
->next
;
522 if (!ReleaseMutex(done_queue
.mutex
)) {
527 CommIO::NotifyIOCompleted();
529 ++ threadp
->requests
;
530 } /* while forever */
535 } /* squidaio_thread_loop */
538 squidaio_queue_request(squidaio_request_t
* request
)
540 static int high_start
= 0;
541 debugs(43, 9, "squidaio_queue_request: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
542 /* Mark it as not executed (failing result, no error) */
545 /* Internal housekeeping */
546 request_queue_len
+= 1;
547 request
->resultp
->_data
= request
;
548 /* Play some tricks with the request_queue2 queue */
549 request
->next
= NULL
;
551 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
552 if (request_queue2
.head
) {
553 /* Grab blocked requests */
554 *request_queue
.tailp
= request_queue2
.head
;
555 request_queue
.tailp
= request_queue2
.tailp
;
558 /* Enqueue request */
559 *request_queue
.tailp
= request
;
561 request_queue
.tailp
= &request
->next
;
563 if (!SetEvent(request_queue
.cond
))
564 fatal("Couldn't push queue");
566 if (!ReleaseMutex(request_queue
.mutex
)) {
567 /* unexpected error */
568 fatal("Couldn't push queue");
573 if (request_queue2
.head
) {
574 /* Clear queue of blocked requests */
575 request_queue2
.head
= NULL
;
576 request_queue2
.tailp
= &request_queue2
.head
;
579 /* Oops, the request queue is blocked, use request_queue2 */
580 *request_queue2
.tailp
= request
;
581 request_queue2
.tailp
= &request
->next
;
584 if (request_queue2
.head
) {
585 static uint64_t filter
= 0;
586 static uint64_t filter_limit
= 8196;
588 if (++filter
>= filter_limit
) {
589 filter_limit
+= filter
;
591 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: WARNING - Queue congestion (growing to " << filter_limit
<< ")");
595 /* Warn if out of threads */
596 if (request_queue_len
> MAGIC1
) {
597 static int last_warn
= 0;
598 static int queue_high
, queue_low
;
600 if (high_start
== 0) {
601 high_start
= (int)squid_curtime
;
602 queue_high
= request_queue_len
;
603 queue_low
= request_queue_len
;
606 if (request_queue_len
> queue_high
)
607 queue_high
= request_queue_len
;
609 if (request_queue_len
< queue_low
)
610 queue_low
= request_queue_len
;
612 if (squid_curtime
>= (last_warn
+ 15) &&
613 squid_curtime
>= (high_start
+ 5)) {
614 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: WARNING - Disk I/O overloading");
616 if (squid_curtime
>= (high_start
+ 15))
617 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: Queue Length: current=" <<
618 request_queue_len
<< ", high=" << queue_high
<<
619 ", low=" << queue_low
<< ", duration=" <<
620 (long int) (squid_curtime
- high_start
));
622 last_warn
= (int)squid_curtime
;
628 /* Warn if seriously overloaded */
629 if (request_queue_len
> RIDICULOUS_LENGTH
) {
630 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Async request queue growing uncontrollably!");
631 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
633 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Synced");
635 } /* squidaio_queue_request */
638 squidaio_cleanup_request(squidaio_request_t
* requestp
)
640 squidaio_result_t
*resultp
= requestp
->resultp
;
641 int cancelled
= requestp
->cancelled
;
643 /* Free allocated structures and copy data back to user space if the */
644 /* request hasn't been cancelled */
646 switch (requestp
->request_type
) {
650 if (!cancelled
&& requestp
->ret
== 0)
651 memcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
653 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
655 squidaio_xstrfree(requestp
->path
);
660 if (cancelled
&& requestp
->ret
>= 0)
661 /* The open() was cancelled but completed */
662 close(requestp
->ret
);
664 squidaio_xstrfree(requestp
->path
);
669 if (cancelled
&& requestp
->ret
< 0)
670 /* The close() was cancelled and never got executed */
677 case _AIO_OP_OPENDIR
:
678 squidaio_xstrfree(requestp
->path
);
692 if (resultp
!= NULL
&& !cancelled
) {
693 resultp
->aio_return
= requestp
->ret
;
694 resultp
->aio_errno
= requestp
->err
;
697 squidaio_request_pool
->freeOne(requestp
);
698 } /* squidaio_cleanup_request */
701 squidaio_cancel(squidaio_result_t
* resultp
)
703 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
705 if (request
&& request
->resultp
== resultp
) {
706 debugs(43, 9, "squidaio_cancel: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
707 request
->cancelled
= 1;
708 request
->resultp
= NULL
;
709 resultp
->_data
= NULL
;
710 resultp
->result_type
= _AIO_OP_NONE
;
715 } /* squidaio_cancel */
718 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
721 squidaio_request_t
*requestp
;
723 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
725 requestp
->path
= (char *) squidaio_xstrdup(path
);
727 requestp
->oflag
= oflag
;
729 requestp
->mode
= mode
;
731 requestp
->resultp
= resultp
;
733 requestp
->request_type
= _AIO_OP_OPEN
;
735 requestp
->cancelled
= 0;
737 resultp
->result_type
= _AIO_OP_OPEN
;
739 squidaio_queue_request(requestp
);
745 squidaio_do_open(squidaio_request_t
* requestp
)
747 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
748 requestp
->err
= errno
;
752 squidaio_read(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
754 squidaio_request_t
*requestp
;
756 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
760 requestp
->bufferp
= bufp
;
762 requestp
->buflen
= bufs
;
764 requestp
->offset
= offset
;
766 requestp
->whence
= whence
;
768 requestp
->resultp
= resultp
;
770 requestp
->request_type
= _AIO_OP_READ
;
772 requestp
->cancelled
= 0;
774 resultp
->result_type
= _AIO_OP_READ
;
776 squidaio_queue_request(requestp
);
782 squidaio_do_read(squidaio_request_t
* requestp
)
784 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
786 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
787 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
788 WIN32_maperror(GetLastError());
792 requestp
->err
= errno
;
796 squidaio_write(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
798 squidaio_request_t
*requestp
;
800 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
804 requestp
->bufferp
= bufp
;
806 requestp
->buflen
= bufs
;
808 requestp
->offset
= offset
;
810 requestp
->whence
= whence
;
812 requestp
->resultp
= resultp
;
814 requestp
->request_type
= _AIO_OP_WRITE
;
816 requestp
->cancelled
= 0;
818 resultp
->result_type
= _AIO_OP_WRITE
;
820 squidaio_queue_request(requestp
);
826 squidaio_do_write(squidaio_request_t
* requestp
)
828 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
829 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
830 WIN32_maperror(GetLastError());
834 requestp
->err
= errno
;
838 squidaio_close(int fd
, squidaio_result_t
* resultp
)
840 squidaio_request_t
*requestp
;
842 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
846 requestp
->resultp
= resultp
;
848 requestp
->request_type
= _AIO_OP_CLOSE
;
850 requestp
->cancelled
= 0;
852 resultp
->result_type
= _AIO_OP_CLOSE
;
854 squidaio_queue_request(requestp
);
860 squidaio_do_close(squidaio_request_t
* requestp
)
862 if ((requestp
->ret
= close(requestp
->fd
)) < 0) {
863 debugs(43, DBG_CRITICAL
, "squidaio_do_close: FD " << requestp
->fd
<< ", errno " << errno
);
867 requestp
->err
= errno
;
872 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
875 squidaio_request_t
*requestp
;
877 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
879 requestp
->path
= (char *) squidaio_xstrdup(path
);
881 requestp
->statp
= sb
;
883 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
885 requestp
->resultp
= resultp
;
887 requestp
->request_type
= _AIO_OP_STAT
;
889 requestp
->cancelled
= 0;
891 resultp
->result_type
= _AIO_OP_STAT
;
893 squidaio_queue_request(requestp
);
899 squidaio_do_stat(squidaio_request_t
* requestp
)
901 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
902 requestp
->err
= errno
;
906 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
909 squidaio_request_t
*requestp
;
911 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
913 requestp
->path
= squidaio_xstrdup(path
);
915 requestp
->resultp
= resultp
;
917 requestp
->request_type
= _AIO_OP_UNLINK
;
919 requestp
->cancelled
= 0;
921 resultp
->result_type
= _AIO_OP_UNLINK
;
923 squidaio_queue_request(requestp
);
929 squidaio_do_unlink(squidaio_request_t
* requestp
)
931 requestp
->ret
= unlink(requestp
->path
);
932 requestp
->err
= errno
;
936 /* XXX squidaio_opendir NOT implemented yet.. */
939 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
941 squidaio_request_t
*requestp
;
944 requestp
= squidaio_request_pool
->alloc();
946 resultp
->result_type
= _AIO_OP_OPENDIR
;
952 squidaio_do_opendir(squidaio_request_t
* requestp
)
954 /* NOT IMPLEMENTED */
960 squidaio_poll_queues(void)
962 /* kick "overflow" request queue */
964 if (request_queue2
.head
&&
965 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
966 *request_queue
.tailp
= request_queue2
.head
;
967 request_queue
.tailp
= request_queue2
.tailp
;
969 if (!SetEvent(request_queue
.cond
))
970 fatal("couldn't push queue\n");
972 if (!ReleaseMutex(request_queue
.mutex
)) {
973 /* unexpected error */
977 request_queue2
.head
= NULL
;
978 request_queue2
.tailp
= &request_queue2
.head
;
981 /* poll done queue */
982 if (done_queue
.head
&&
983 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
985 struct squidaio_request_t
*requests
= done_queue
.head
;
986 done_queue
.head
= NULL
;
987 done_queue
.tailp
= &done_queue
.head
;
989 if (!ReleaseMutex(done_queue
.mutex
)) {
990 /* unexpected error */
994 *done_requests
.tailp
= requests
;
995 request_queue_len
-= 1;
997 while (requests
->next
) {
998 requests
= requests
->next
;
999 request_queue_len
-= 1;
1002 done_requests
.tailp
= &requests
->next
;
1007 squidaio_poll_done(void)
1009 squidaio_request_t
*request
;
1010 squidaio_result_t
*resultp
;
1015 request
= done_requests
.head
;
1017 if (request
== NULL
&& !polled
) {
1018 CommIO::ResetNotifications();
1019 squidaio_poll_queues();
1021 request
= done_requests
.head
;
1028 debugs(43, 9, "squidaio_poll_done: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
1029 done_requests
.head
= request
->next
;
1031 if (!done_requests
.head
)
1032 done_requests
.tailp
= &done_requests
.head
;
1034 resultp
= request
->resultp
;
1036 cancelled
= request
->cancelled
;
1038 squidaio_debug(request
);
1040 debugs(43, 5, "DONE: " << request
->ret
<< " -> " << request
->err
);
1042 squidaio_cleanup_request(request
);
1048 } /* squidaio_poll_done */
1051 squidaio_operations_pending(void)
1053 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1059 /* XXX This might take a while if the queue is large.. */
1062 squidaio_poll_queues();
1063 } while (request_queue_len
> 0);
1065 return squidaio_operations_pending();
1069 squidaio_get_queue_len(void)
1071 return request_queue_len
;
1075 squidaio_debug(squidaio_request_t
* request
)
1077 switch (request
->request_type
) {
1080 debugs(43, 5, "OPEN of " << request
->path
<< " to FD " << request
->ret
);
1084 debugs(43, 5, "READ on fd: " << request
->fd
);
1088 debugs(43, 5, "WRITE on fd: " << request
->fd
);
1092 debugs(43, 5, "CLOSE of fd: " << request
->fd
);
1095 case _AIO_OP_UNLINK
:
1096 debugs(43, 5, "UNLINK of " << request
->path
);
1105 squidaio_stats(StoreEntry
* sentry
)
1107 squidaio_thread_t
*threadp
;
1110 if (!squidaio_initialised
)
1113 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1115 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1119 for (i
= 0; i
< NUMTHREADS
; ++i
) {
1120 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1121 threadp
= threadp
->next
;