2 * DEBUG: section 43 Windows AIOPS
3 * AUTHOR: Stewart Forster <slf@connect.com.au>
4 * AUTHOR: Robert Collins <robertc@squid-cache.org>
5 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
36 #include "DiskIO/DiskThreads/CommIO.h"
37 #include "DiskThreads.h"
39 #include "SquidConfig.h"
40 #include "SquidTime.h"
49 #define RIDICULOUS_LENGTH 4096
51 enum _squidaio_thread_status
{
58 typedef enum _squidaio_thread_status squidaio_thread_status
;
60 typedef struct squidaio_request_t
{
62 struct squidaio_request_t
*next
;
63 squidaio_request_type request_type
;
77 struct stat
*tmpstatp
;
80 squidaio_result_t
*resultp
;
83 typedef struct squidaio_request_queue_t
{
85 HANDLE cond
; /* See Event objects */
86 squidaio_request_t
*volatile head
;
87 squidaio_request_t
*volatile *volatile tailp
;
88 unsigned long requests
;
89 unsigned long blocked
; /* main failed to lock the queue */
90 } squidaio_request_queue_t
;
92 typedef struct squidaio_thread_t squidaio_thread_t
;
94 struct squidaio_thread_t
{
95 squidaio_thread_t
*next
;
97 DWORD dwThreadId
; /* thread ID */
98 squidaio_thread_status status
;
100 struct squidaio_request_t
*current_req
;
101 unsigned long requests
;
105 static void squidaio_queue_request(squidaio_request_t
*);
106 static void squidaio_cleanup_request(squidaio_request_t
*);
107 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
108 static void squidaio_do_open(squidaio_request_t
*);
109 static void squidaio_do_read(squidaio_request_t
*);
110 static void squidaio_do_write(squidaio_request_t
*);
111 static void squidaio_do_close(squidaio_request_t
*);
112 static void squidaio_do_stat(squidaio_request_t
*);
113 static void squidaio_do_unlink(squidaio_request_t
*);
115 static void *squidaio_do_opendir(squidaio_request_t
*);
117 static void squidaio_debug(squidaio_request_t
*);
118 static void squidaio_poll_queues(void);
120 static squidaio_thread_t
*threads
= NULL
;
121 static int squidaio_initialised
= 0;
/*
 * Buffer size classes (in bytes) used to pick a memory pool.
 * Every derived size is fully parenthesized so the macro expands to a
 * single value regardless of the operators surrounding its use
 * (e.g. AIO_MEDIUM_BUFS + 1 must be 8193, not 16384 >> 2).
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
129 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
130 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
131 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
132 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
133 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128 bytes */
135 static int request_queue_len
= 0;
136 static MemAllocator
*squidaio_request_pool
= NULL
;
137 static MemAllocator
*squidaio_thread_pool
= NULL
;
138 static squidaio_request_queue_t request_queue
;
141 squidaio_request_t
*head
, **tailp
;
146 NULL
, &request_queue2
.head
148 static squidaio_request_queue_t done_queue
;
151 squidaio_request_t
*head
, **tailp
;
156 NULL
, &done_requests
.head
159 static HANDLE main_thread
;
161 static MemAllocator
*
162 squidaio_get_pool(int size
)
164 if (size
<= AIO_LARGE_BUFS
) {
165 if (size
<= AIO_MICRO_BUFS
)
166 return squidaio_micro_bufs
;
167 else if (size
<= AIO_TINY_BUFS
)
168 return squidaio_tiny_bufs
;
169 else if (size
<= AIO_SMALL_BUFS
)
170 return squidaio_small_bufs
;
171 else if (size
<= AIO_MEDIUM_BUFS
)
172 return squidaio_medium_bufs
;
174 return squidaio_large_bufs
;
181 squidaio_xmalloc(int size
)
186 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
195 squidaio_xstrdup(const char *str
)
198 int len
= strlen(str
) + 1;
200 p
= (char *)squidaio_xmalloc(len
);
201 strncpy(p
, str
, len
);
207 squidaio_xfree(void *p
, int size
)
211 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
218 squidaio_xstrfree(char *str
)
221 int len
= strlen(str
) + 1;
223 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
233 squidaio_thread_t
*threadp
;
235 if (squidaio_initialised
)
238 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
239 GetCurrentThread(), /* pseudo handle to copy */
240 GetCurrentProcess(), /* pseudo handle, don't close */
242 0, /* required access */
243 FALSE
, /* child processes don't inherit the handle */
244 DUPLICATE_SAME_ACCESS
)) {
246 fatal("Couldn't get current thread handle");
249 /* Initialize request queue */
250 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
251 FALSE
, /* start unowned (as per mutex_init) */
254 fatal("Failed to create mutex");
257 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
258 FALSE
, /* auto-reset event - which I think is pthreads like ? */
259 FALSE
, /* start non signaled */
262 fatal("Failed to create condition variable");
265 request_queue
.head
= NULL
;
267 request_queue
.tailp
= &request_queue
.head
;
269 request_queue
.requests
= 0;
271 request_queue
.blocked
= 0;
273 /* Initialize done queue */
275 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
276 FALSE
, /* start unowned (as per mutex_init) */
279 fatal("Failed to create mutex");
282 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
283 TRUE
, /* manual-reset event - which I think is pthreads like ? */
284 FALSE
, /* start non signaled */
287 fatal("Failed to create condition variable");
290 done_queue
.head
= NULL
;
292 done_queue
.tailp
= &done_queue
.head
;
294 done_queue
.requests
= 0;
296 done_queue
.blocked
= 0;
298 // Initialize the thread I/O pipes before creating any threads
299 // see bug 3189 comment 5 about race conditions.
300 CommIO::Initialize();
302 /* Create threads and get them to sit in their wait loop */
303 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
307 for (i
= 0; i
< NUMTHREADS
; ++i
) {
308 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
309 threadp
->status
= _THREAD_STARTING
;
310 threadp
->current_req
= NULL
;
311 threadp
->requests
= 0;
312 threadp
->next
= threads
;
315 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
316 0, /* use default stack size */
317 squidaio_thread_loop
, /* thread function */
318 threadp
, /* argument to thread function */
319 0, /* use default creation flags */
320 &(threadp
->dwThreadId
)) /* returns the thread identifier */
322 fprintf(stderr
, "Thread creation failed\n");
323 threadp
->status
= _THREAD_FAILED
;
327 /* Set the new thread priority above parent process */
328 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
331 /* Create request pool */
332 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
334 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
336 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
338 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
340 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
342 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
344 squidaio_initialised
= 1;
348 squidaio_shutdown(void)
350 squidaio_thread_t
*threadp
;
354 if (!squidaio_initialised
)
357 /* This is the same as in squidaio_sync */
359 squidaio_poll_queues();
360 } while (request_queue_len
> 0);
362 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
366 for (i
= 0; i
< NUMTHREADS
; ++i
) {
368 hthreads
[i
] = threadp
->thread
;
369 threadp
= threadp
->next
;
372 ReleaseMutex(request_queue
.mutex
);
373 ResetEvent(request_queue
.cond
);
374 ReleaseMutex(done_queue
.mutex
);
375 ResetEvent(done_queue
.cond
);
378 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
380 for (i
= 0; i
< NUMTHREADS
; ++i
) {
381 CloseHandle(hthreads
[i
]);
384 CloseHandle(main_thread
);
385 CommIO::NotifyIOClose();
387 squidaio_initialised
= 0;
392 squidaio_thread_loop(LPVOID lpParam
)
394 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
395 squidaio_request_t
*request
;
396 HANDLE cond
; /* local copy of the event queue because win32 event handles
397 * don't atomically release the mutex as cond variables do. */
399 /* lock the thread info */
401 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
402 fatal("Can't get ownership of mutex\n");
405 /* duplicate the handle */
406 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
407 request_queue
.cond
, /* handle to copy */
408 GetCurrentProcess(), /* pseudo handle, don't close */
410 0, /* required access */
411 FALSE
, /* child processes don't inherit the handle */
412 DUPLICATE_SAME_ACCESS
))
413 fatal("Can't duplicate mutex handle\n");
415 if (!ReleaseMutex(request_queue
.mutex
)) {
417 fatal("Can't release mutex\n");
424 threadp
->current_req
= request
= NULL
;
426 /* Get a request to process */
427 threadp
->status
= _THREAD_WAITING
;
430 CloseHandle(request_queue
.mutex
);
435 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
437 if (rv
== WAIT_FAILED
) {
442 while (!request_queue
.head
) {
443 if (!ReleaseMutex(request_queue
.mutex
)) {
445 threadp
->status
= _THREAD_FAILED
;
450 rv
= WaitForSingleObject(cond
, INFINITE
);
452 if (rv
== WAIT_FAILED
) {
457 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
459 if (rv
== WAIT_FAILED
) {
465 request
= request_queue
.head
;
468 request_queue
.head
= request
->next
;
470 if (!request_queue
.head
)
471 request_queue
.tailp
= &request_queue
.head
;
473 if (!ReleaseMutex(request_queue
.mutex
)) {
480 /* process the request */
481 threadp
->status
= _THREAD_BUSY
;
483 request
->next
= NULL
;
485 threadp
->current_req
= request
;
489 if (!request
->cancelled
) {
490 switch (request
->request_type
) {
493 squidaio_do_open(request
);
497 squidaio_do_read(request
);
501 squidaio_do_write(request
);
505 squidaio_do_close(request
);
509 squidaio_do_unlink(request
);
512 #if AIO_OPENDIR /* Opendir not implemented yet */
514 case _AIO_OP_OPENDIR
:
515 squidaio_do_opendir(request
);
520 squidaio_do_stat(request
);
525 request
->err
= EINVAL
;
528 } else { /* cancelled */
530 request
->err
= EINTR
;
533 threadp
->status
= _THREAD_DONE
;
534 /* put the request in the done queue */
535 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
537 if (rv
== WAIT_FAILED
) {
542 *done_queue
.tailp
= request
;
543 done_queue
.tailp
= &request
->next
;
545 if (!ReleaseMutex(done_queue
.mutex
)) {
550 CommIO::NotifyIOCompleted();
552 ++ threadp
->requests
;
553 } /* while forever */
558 } /* squidaio_thread_loop */
561 squidaio_queue_request(squidaio_request_t
* request
)
563 static int high_start
= 0;
564 debugs(43, 9, "squidaio_queue_request: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
565 /* Mark it as not executed (failing result, no error) */
568 /* Internal housekeeping */
569 request_queue_len
+= 1;
570 request
->resultp
->_data
= request
;
571 /* Play some tricks with the request_queue2 queue */
572 request
->next
= NULL
;
574 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
575 if (request_queue2
.head
) {
576 /* Grab blocked requests */
577 *request_queue
.tailp
= request_queue2
.head
;
578 request_queue
.tailp
= request_queue2
.tailp
;
581 /* Enqueue request */
582 *request_queue
.tailp
= request
;
584 request_queue
.tailp
= &request
->next
;
586 if (!SetEvent(request_queue
.cond
))
587 fatal("Couldn't push queue");
589 if (!ReleaseMutex(request_queue
.mutex
)) {
590 /* unexpected error */
591 fatal("Couldn't push queue");
596 if (request_queue2
.head
) {
597 /* Clear queue of blocked requests */
598 request_queue2
.head
= NULL
;
599 request_queue2
.tailp
= &request_queue2
.head
;
602 /* Oops, the request queue is blocked, use request_queue2 */
603 *request_queue2
.tailp
= request
;
604 request_queue2
.tailp
= &request
->next
;
607 if (request_queue2
.head
) {
608 static int filter
= 0;
609 static int filter_limit
= 8;
611 if (++filter
>= filter_limit
) {
612 filter_limit
+= filter
;
614 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: WARNING - Queue congestion");
618 /* Warn if out of threads */
619 if (request_queue_len
> MAGIC1
) {
620 static int last_warn
= 0;
621 static int queue_high
, queue_low
;
623 if (high_start
== 0) {
624 high_start
= (int)squid_curtime
;
625 queue_high
= request_queue_len
;
626 queue_low
= request_queue_len
;
629 if (request_queue_len
> queue_high
)
630 queue_high
= request_queue_len
;
632 if (request_queue_len
< queue_low
)
633 queue_low
= request_queue_len
;
635 if (squid_curtime
>= (last_warn
+ 15) &&
636 squid_curtime
>= (high_start
+ 5)) {
637 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: WARNING - Disk I/O overloading");
639 if (squid_curtime
>= (high_start
+ 15))
640 debugs(43, DBG_IMPORTANT
, "squidaio_queue_request: Queue Length: current=" <<
641 request_queue_len
<< ", high=" << queue_high
<<
642 ", low=" << queue_low
<< ", duration=" <<
643 (long int) (squid_curtime
- high_start
));
645 last_warn
= (int)squid_curtime
;
651 /* Warn if seriously overloaded */
652 if (request_queue_len
> RIDICULOUS_LENGTH
) {
653 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Async request queue growing uncontrollably!");
654 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
656 debugs(43, DBG_CRITICAL
, "squidaio_queue_request: Synced");
658 } /* squidaio_queue_request */
661 squidaio_cleanup_request(squidaio_request_t
* requestp
)
663 squidaio_result_t
*resultp
= requestp
->resultp
;
664 int cancelled
= requestp
->cancelled
;
666 /* Free allocated structures and copy data back to user space if the */
667 /* request hasn't been cancelled */
669 switch (requestp
->request_type
) {
673 if (!cancelled
&& requestp
->ret
== 0)
674 memcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
676 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
678 squidaio_xstrfree(requestp
->path
);
683 if (cancelled
&& requestp
->ret
>= 0)
684 /* The open() was cancelled but completed */
685 close(requestp
->ret
);
687 squidaio_xstrfree(requestp
->path
);
692 if (cancelled
&& requestp
->ret
< 0)
693 /* The close() was cancelled and never got executed */
700 case _AIO_OP_OPENDIR
:
701 squidaio_xstrfree(requestp
->path
);
715 if (resultp
!= NULL
&& !cancelled
) {
716 resultp
->aio_return
= requestp
->ret
;
717 resultp
->aio_errno
= requestp
->err
;
720 squidaio_request_pool
->freeOne(requestp
);
721 } /* squidaio_cleanup_request */
724 squidaio_cancel(squidaio_result_t
* resultp
)
726 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
728 if (request
&& request
->resultp
== resultp
) {
729 debugs(43, 9, "squidaio_cancel: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
730 request
->cancelled
= 1;
731 request
->resultp
= NULL
;
732 resultp
->_data
= NULL
;
733 resultp
->result_type
= _AIO_OP_NONE
;
738 } /* squidaio_cancel */
741 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
744 squidaio_request_t
*requestp
;
746 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
748 requestp
->path
= (char *) squidaio_xstrdup(path
);
750 requestp
->oflag
= oflag
;
752 requestp
->mode
= mode
;
754 requestp
->resultp
= resultp
;
756 requestp
->request_type
= _AIO_OP_OPEN
;
758 requestp
->cancelled
= 0;
760 resultp
->result_type
= _AIO_OP_OPEN
;
762 squidaio_queue_request(requestp
);
768 squidaio_do_open(squidaio_request_t
* requestp
)
770 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
771 requestp
->err
= errno
;
775 squidaio_read(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
777 squidaio_request_t
*requestp
;
779 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
783 requestp
->bufferp
= bufp
;
785 requestp
->buflen
= bufs
;
787 requestp
->offset
= offset
;
789 requestp
->whence
= whence
;
791 requestp
->resultp
= resultp
;
793 requestp
->request_type
= _AIO_OP_READ
;
795 requestp
->cancelled
= 0;
797 resultp
->result_type
= _AIO_OP_READ
;
799 squidaio_queue_request(requestp
);
805 squidaio_do_read(squidaio_request_t
* requestp
)
807 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
809 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
810 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
811 WIN32_maperror(GetLastError());
815 requestp
->err
= errno
;
819 squidaio_write(int fd
, char *bufp
, size_t bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
821 squidaio_request_t
*requestp
;
823 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
827 requestp
->bufferp
= bufp
;
829 requestp
->buflen
= bufs
;
831 requestp
->offset
= offset
;
833 requestp
->whence
= whence
;
835 requestp
->resultp
= resultp
;
837 requestp
->request_type
= _AIO_OP_WRITE
;
839 requestp
->cancelled
= 0;
841 resultp
->result_type
= _AIO_OP_WRITE
;
843 squidaio_queue_request(requestp
);
849 squidaio_do_write(squidaio_request_t
* requestp
)
851 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
852 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
853 WIN32_maperror(GetLastError());
857 requestp
->err
= errno
;
861 squidaio_close(int fd
, squidaio_result_t
* resultp
)
863 squidaio_request_t
*requestp
;
865 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
869 requestp
->resultp
= resultp
;
871 requestp
->request_type
= _AIO_OP_CLOSE
;
873 requestp
->cancelled
= 0;
875 resultp
->result_type
= _AIO_OP_CLOSE
;
877 squidaio_queue_request(requestp
);
883 squidaio_do_close(squidaio_request_t
* requestp
)
885 if ((requestp
->ret
= close(requestp
->fd
)) < 0) {
886 debugs(43, DBG_CRITICAL
, "squidaio_do_close: FD " << requestp
->fd
<< ", errno " << errno
);
890 requestp
->err
= errno
;
895 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
898 squidaio_request_t
*requestp
;
900 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
902 requestp
->path
= (char *) squidaio_xstrdup(path
);
904 requestp
->statp
= sb
;
906 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
908 requestp
->resultp
= resultp
;
910 requestp
->request_type
= _AIO_OP_STAT
;
912 requestp
->cancelled
= 0;
914 resultp
->result_type
= _AIO_OP_STAT
;
916 squidaio_queue_request(requestp
);
922 squidaio_do_stat(squidaio_request_t
* requestp
)
924 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
925 requestp
->err
= errno
;
929 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
932 squidaio_request_t
*requestp
;
934 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
936 requestp
->path
= squidaio_xstrdup(path
);
938 requestp
->resultp
= resultp
;
940 requestp
->request_type
= _AIO_OP_UNLINK
;
942 requestp
->cancelled
= 0;
944 resultp
->result_type
= _AIO_OP_UNLINK
;
946 squidaio_queue_request(requestp
);
952 squidaio_do_unlink(squidaio_request_t
* requestp
)
954 requestp
->ret
= unlink(requestp
->path
);
955 requestp
->err
= errno
;
959 /* XXX squidaio_opendir NOT implemented yet.. */
962 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
964 squidaio_request_t
*requestp
;
967 requestp
= squidaio_request_pool
->alloc();
969 resultp
->result_type
= _AIO_OP_OPENDIR
;
975 squidaio_do_opendir(squidaio_request_t
* requestp
)
977 /* NOT IMPLEMENTED */
983 squidaio_poll_queues(void)
985 /* kick "overflow" request queue */
987 if (request_queue2
.head
&&
988 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
989 *request_queue
.tailp
= request_queue2
.head
;
990 request_queue
.tailp
= request_queue2
.tailp
;
992 if (!SetEvent(request_queue
.cond
))
993 fatal("couldn't push queue\n");
995 if (!ReleaseMutex(request_queue
.mutex
)) {
996 /* unexpected error */
1000 request_queue2
.head
= NULL
;
1001 request_queue2
.tailp
= &request_queue2
.head
;
1004 /* poll done queue */
1005 if (done_queue
.head
&&
1006 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
1008 struct squidaio_request_t
*requests
= done_queue
.head
;
1009 done_queue
.head
= NULL
;
1010 done_queue
.tailp
= &done_queue
.head
;
1012 if (!ReleaseMutex(done_queue
.mutex
)) {
1013 /* unexpected error */
1017 *done_requests
.tailp
= requests
;
1018 request_queue_len
-= 1;
1020 while (requests
->next
) {
1021 requests
= requests
->next
;
1022 request_queue_len
-= 1;
1025 done_requests
.tailp
= &requests
->next
;
1030 squidaio_poll_done(void)
1032 squidaio_request_t
*request
;
1033 squidaio_result_t
*resultp
;
1038 request
= done_requests
.head
;
1040 if (request
== NULL
&& !polled
) {
1041 CommIO::ResetNotifications();
1042 squidaio_poll_queues();
1044 request
= done_requests
.head
;
1051 debugs(43, 9, "squidaio_poll_done: " << request
<< " type=" << request
->request_type
<< " result=" << request
->resultp
);
1052 done_requests
.head
= request
->next
;
1054 if (!done_requests
.head
)
1055 done_requests
.tailp
= &done_requests
.head
;
1057 resultp
= request
->resultp
;
1059 cancelled
= request
->cancelled
;
1061 squidaio_debug(request
);
1063 debugs(43, 5, "DONE: " << request
->ret
<< " -> " << request
->err
);
1065 squidaio_cleanup_request(request
);
1071 } /* squidaio_poll_done */
1074 squidaio_operations_pending(void)
1076 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1082 /* XXX This might take a while if the queue is large.. */
1085 squidaio_poll_queues();
1086 } while (request_queue_len
> 0);
1088 return squidaio_operations_pending();
1092 squidaio_get_queue_len(void)
1094 return request_queue_len
;
1098 squidaio_debug(squidaio_request_t
* request
)
1100 switch (request
->request_type
) {
1103 debugs(43, 5, "OPEN of " << request
->path
<< " to FD " << request
->ret
);
1107 debugs(43, 5, "READ on fd: " << request
->fd
);
1111 debugs(43, 5, "WRITE on fd: " << request
->fd
);
1115 debugs(43, 5, "CLOSE of fd: " << request
->fd
);
1118 case _AIO_OP_UNLINK
:
1119 debugs(43, 5, "UNLINK of " << request
->path
);
1128 squidaio_stats(StoreEntry
* sentry
)
1130 squidaio_thread_t
*threadp
;
1133 if (!squidaio_initialised
)
1136 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1138 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1142 for (i
= 0; i
< NUMTHREADS
; ++i
) {
1143 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1144 threadp
= threadp
->next
;