2 * $Id: aiops_win32.cc,v 1.2 2006/09/09 15:29:59 serassio Exp $
4 * DEBUG: section 43 Windows AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
6 * AUTHOR: Robert Collins <robertc@squid-cache.org>
7 * AUTHOR: Guido Serassio <serassio@squid-cache.org>
9 * SQUID Web Proxy Cache http://www.squid-cache.org/
10 * ----------------------------------------------------------
12 * Squid is the result of efforts by numerous individuals from
13 * the Internet community; see the CONTRIBUTORS file for full
14 * details. Many organizations have provided support for Squid's
15 * development; see the SPONSORS file for full details. Squid is
16 * Copyrighted (C) 2001 by the Regents of the University of
17 * California; see the COPYRIGHT file for full details. Squid
18 * incorporates software developed and/or copyrighted by other
19 * sources; see the CREDITS file for full details.
21 * This program is free software; you can redistribute it and/or modify
22 * it under the terms of the GNU General Public License as published by
23 * the Free Software Foundation; either version 2 of the License, or
24 * (at your option) any later version.
26 * This program is distributed in the hope that it will be useful,
27 * but WITHOUT ANY WARRANTY; without even the implied warranty of
28 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
29 * GNU General Public License for more details.
31 * You should have received a copy of the GNU General Public License
32 * along with this program; if not, write to the Free Software
33 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
38 #include "squid_windows.h"
39 #include "DiskThreads.h"
42 #include <sys/types.h>
49 #include "SquidTime.h"
52 #define RIDICULOUS_LENGTH 4096
54 enum _squidaio_thread_status
{
61 typedef enum _squidaio_thread_status squidaio_thread_status
;
63 typedef struct squidaio_request_t
66 struct squidaio_request_t
*next
;
67 squidaio_request_type request_type
;
81 struct stat
*tmpstatp
;
84 squidaio_result_t
*resultp
;
89 typedef struct squidaio_request_queue_t
92 HANDLE cond
; /* See Event objects */
93 squidaio_request_t
*volatile head
;
94 squidaio_request_t
*volatile *volatile tailp
;
95 unsigned long requests
;
96 unsigned long blocked
; /* main failed to lock the queue */
99 squidaio_request_queue_t
;
101 typedef struct squidaio_thread_t squidaio_thread_t
;
103 struct squidaio_thread_t
105 squidaio_thread_t
*next
;
107 DWORD dwThreadId
; /* thread ID */
108 squidaio_thread_status status
;
110 struct squidaio_request_t
*current_req
;
111 unsigned long requests
;
115 static void squidaio_queue_request(squidaio_request_t
*);
116 static void squidaio_cleanup_request(squidaio_request_t
*);
117 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
118 static void squidaio_do_open(squidaio_request_t
*);
119 static void squidaio_do_read(squidaio_request_t
*);
120 static void squidaio_do_write(squidaio_request_t
*);
121 static void squidaio_do_close(squidaio_request_t
*);
122 static void squidaio_do_stat(squidaio_request_t
*);
124 static void squidaio_do_truncate(squidaio_request_t
*);
126 static void squidaio_do_unlink(squidaio_request_t
*);
129 static void *squidaio_do_opendir(squidaio_request_t
*);
131 static void squidaio_debug(squidaio_request_t
*);
132 static void squidaio_poll_queues(void);
134 static squidaio_thread_t
*threads
= NULL
;
135 static int squidaio_initialised
= 0;
/*
 * Buffer size classes, in bytes, used to pick an allocator pool.
 *
 * The derived sizes are wrapped in parentheses so that each macro expands
 * to a single operand: without them an expression such as
 * AIO_MEDIUM_BUFS * 2 would parse as AIO_LARGE_BUFS >> (1 * 2).
 */
#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128
144 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
145 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
146 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
147 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
148 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128K */
150 static int request_queue_len
= 0;
151 static MemAllocator
*squidaio_request_pool
= NULL
;
152 static MemAllocator
*squidaio_thread_pool
= NULL
;
153 static squidaio_request_queue_t request_queue
;
157 squidaio_request_t
*head
, **tailp
;
162 NULL
, &request_queue2
.head
164 static squidaio_request_queue_t done_queue
;
168 squidaio_request_t
*head
, **tailp
;
173 NULL
, &done_requests
.head
176 static HANDLE main_thread
;
178 static MemAllocator
*
179 squidaio_get_pool(int size
)
181 if (size
<= AIO_LARGE_BUFS
) {
182 if (size
<= AIO_MICRO_BUFS
)
183 return squidaio_micro_bufs
;
184 else if (size
<= AIO_TINY_BUFS
)
185 return squidaio_tiny_bufs
;
186 else if (size
<= AIO_SMALL_BUFS
)
187 return squidaio_small_bufs
;
188 else if (size
<= AIO_MEDIUM_BUFS
)
189 return squidaio_medium_bufs
;
191 return squidaio_large_bufs
;
198 squidaio_xmalloc(int size
)
203 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
212 squidaio_xstrdup(const char *str
)
215 int len
= strlen(str
) + 1;
217 p
= (char *)squidaio_xmalloc(len
);
218 strncpy(p
, str
, len
);
224 squidaio_xfree(void *p
, int size
)
228 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
235 squidaio_xstrfree(char *str
)
238 int len
= strlen(str
) + 1;
240 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
250 squidaio_thread_t
*threadp
;
252 if (squidaio_initialised
)
255 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
256 GetCurrentThread(), /* pseudo handle to copy */
257 GetCurrentProcess(), /* pseudo handle, don't close */
259 0, /* required access */
260 FALSE
, /* child process's don't inherit the handle */
261 DUPLICATE_SAME_ACCESS
)) {
263 fatal("Couldn't get current thread handle");
266 /* Initialize request queue */
267 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
268 FALSE
, /* start unowned (as per mutex_init) */
271 fatal("Failed to create mutex");
274 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
275 FALSE
, /* auto signal reset - which I think is pthreads like ? */
276 FALSE
, /* start non signaled */
279 fatal("Failed to create condition variable");
282 request_queue
.head
= NULL
;
284 request_queue
.tailp
= &request_queue
.head
;
286 request_queue
.requests
= 0;
288 request_queue
.blocked
= 0;
290 /* Initialize done queue */
292 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
293 FALSE
, /* start unowned (as per mutex_init) */
296 fatal("Failed to create mutex");
299 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
300 TRUE
, /* manually signaled - which I think is pthreads like ? */
301 FALSE
, /* start non signaled */
304 fatal("Failed to create condition variable");
307 done_queue
.head
= NULL
;
309 done_queue
.tailp
= &done_queue
.head
;
311 done_queue
.requests
= 0;
313 done_queue
.blocked
= 0;
315 CommIO::NotifyIOCompleted();
317 /* Create threads and get them to sit in their wait loop */
318 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
322 for (i
= 0; i
< NUMTHREADS
; i
++) {
323 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
324 threadp
->status
= _THREAD_STARTING
;
325 threadp
->current_req
= NULL
;
326 threadp
->requests
= 0;
327 threadp
->next
= threads
;
330 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
331 0, /* use default stack size */
332 squidaio_thread_loop
, /* thread function */
333 threadp
, /* argument to thread function */
334 0, /* use default creation flags */
335 &(threadp
->dwThreadId
)) /* returns the thread identifier */
337 fprintf(stderr
, "Thread creation failed\n");
338 threadp
->status
= _THREAD_FAILED
;
342 /* Set the new thread priority above parent process */
343 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
346 /* Create request pool */
347 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
349 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
351 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
353 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
355 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
357 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
359 squidaio_initialised
= 1;
363 squidaio_shutdown(void)
365 squidaio_thread_t
*threadp
;
369 if (!squidaio_initialised
)
372 /* This is the same as in squidaio_sync */
374 squidaio_poll_queues();
375 } while (request_queue_len
> 0);
377 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
381 for (i
= 0; i
< NUMTHREADS
; i
++) {
383 hthreads
[i
] = threadp
->thread
;
384 threadp
= threadp
->next
;
387 ReleaseMutex(request_queue
.mutex
);
388 ResetEvent(request_queue
.cond
);
389 ReleaseMutex(done_queue
.mutex
);
390 ResetEvent(done_queue
.cond
);
393 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
395 for (i
= 0; i
< NUMTHREADS
; i
++) {
396 CloseHandle(hthreads
[i
]);
399 CloseHandle(main_thread
);
400 CommIO::NotifyIOClose();
402 squidaio_initialised
= 0;
407 squidaio_thread_loop(LPVOID lpParam
)
409 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
410 squidaio_request_t
*request
;
411 HANDLE cond
; /* local copy of the event queue because win32 event handles
412 * don't atomically release the mutex as cond variables do. */
414 /* lock the thread info */
416 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
417 fatal("Can't get ownership of mutex\n");
420 /* duplicate the handle */
421 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
422 request_queue
.cond
, /* handle to copy */
423 GetCurrentProcess(), /* pseudo handle, don't close */
425 0, /* required access */
426 FALSE
, /* child process's don't inherit the handle */
427 DUPLICATE_SAME_ACCESS
))
428 fatal("Can't duplicate mutex handle\n");
430 if (!ReleaseMutex(request_queue
.mutex
)) {
432 fatal("Can't release mutex\n");
439 threadp
->current_req
= request
= NULL
;
441 /* Get a request to process */
442 threadp
->status
= _THREAD_WAITING
;
445 CloseHandle(request_queue
.mutex
);
450 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
452 if (rv
== WAIT_FAILED
) {
457 while (!request_queue
.head
) {
458 if (!ReleaseMutex(request_queue
.mutex
)) {
460 threadp
->status
= _THREAD_FAILED
;
465 rv
= WaitForSingleObject(cond
, INFINITE
);
467 if (rv
== WAIT_FAILED
) {
472 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
474 if (rv
== WAIT_FAILED
) {
480 request
= request_queue
.head
;
483 request_queue
.head
= request
->next
;
485 if (!request_queue
.head
)
486 request_queue
.tailp
= &request_queue
.head
;
488 if (!ReleaseMutex(request_queue
.mutex
)) {
495 /* process the request */
496 threadp
->status
= _THREAD_BUSY
;
498 request
->next
= NULL
;
500 threadp
->current_req
= request
;
504 if (!request
->cancelled
) {
505 switch (request
->request_type
) {
508 squidaio_do_open(request
);
512 squidaio_do_read(request
);
516 squidaio_do_write(request
);
520 squidaio_do_close(request
);
525 case _AIO_OP_TRUNCATE
:
526 squidaio_do_truncate(request
);
531 squidaio_do_unlink(request
);
535 #if AIO_OPENDIR /* Opendir not implemented yet */
537 case _AIO_OP_OPENDIR
:
538 squidaio_do_opendir(request
);
543 squidaio_do_stat(request
);
548 request
->err
= EINVAL
;
551 } else { /* cancelled */
553 request
->err
= EINTR
;
556 threadp
->status
= _THREAD_DONE
;
557 /* put the request in the done queue */
558 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
560 if (rv
== WAIT_FAILED
) {
565 *done_queue
.tailp
= request
;
566 done_queue
.tailp
= &request
->next
;
568 if (!ReleaseMutex(done_queue
.mutex
)) {
573 CommIO::NotifyIOCompleted();
576 } /* while forever */
581 } /* squidaio_thread_loop */
584 squidaio_queue_request(squidaio_request_t
* request
)
586 static int high_start
= 0;
587 debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",
588 request
, request
->request_type
, request
->resultp
);
589 /* Mark it as not executed (failing result, no error) */
592 /* Internal housekeeping */
593 request_queue_len
+= 1;
594 request
->resultp
->_data
= request
;
595 /* Play some tricks with the request_queue2 queue */
596 request
->next
= NULL
;
598 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
599 if (request_queue2
.head
) {
600 /* Grab blocked requests */
601 *request_queue
.tailp
= request_queue2
.head
;
602 request_queue
.tailp
= request_queue2
.tailp
;
605 /* Enqueue request */
606 *request_queue
.tailp
= request
;
608 request_queue
.tailp
= &request
->next
;
610 if (!SetEvent(request_queue
.cond
))
611 fatal("Couldn't push queue");
613 if (!ReleaseMutex(request_queue
.mutex
)) {
614 /* unexpected error */
615 fatal("Couldn't push queue");
620 if (request_queue2
.head
) {
621 /* Clear queue of blocked requests */
622 request_queue2
.head
= NULL
;
623 request_queue2
.tailp
= &request_queue2
.head
;
626 /* Oops, the request queue is blocked, use request_queue2 */
627 *request_queue2
.tailp
= request
;
628 request_queue2
.tailp
= &request
->next
;
631 if (request_queue2
.head
) {
632 static int filter
= 0;
633 static int filter_limit
= 8;
635 if (++filter
>= filter_limit
) {
636 filter_limit
+= filter
;
638 debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n");
642 /* Warn if out of threads */
643 if (request_queue_len
> MAGIC1
) {
644 static int last_warn
= 0;
645 static int queue_high
, queue_low
;
647 if (high_start
== 0) {
648 high_start
= (int)squid_curtime
;
649 queue_high
= request_queue_len
;
650 queue_low
= request_queue_len
;
653 if (request_queue_len
> queue_high
)
654 queue_high
= request_queue_len
;
656 if (request_queue_len
< queue_low
)
657 queue_low
= request_queue_len
;
659 if (squid_curtime
>= (last_warn
+ 15) &&
660 squid_curtime
>= (high_start
+ 5)) {
661 debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n");
663 if (squid_curtime
>= (high_start
+ 15))
664 debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n",
665 request_queue_len
, queue_high
, queue_low
, (long int) (squid_curtime
- high_start
));
667 last_warn
= (int)squid_curtime
;
673 /* Warn if seriously overloaded */
674 if (request_queue_len
> RIDICULOUS_LENGTH
) {
675 debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n");
676 debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n");
678 debug(43, 0) ("squidaio_queue_request: Synced\n");
680 } /* squidaio_queue_request */
683 squidaio_cleanup_request(squidaio_request_t
* requestp
)
685 squidaio_result_t
*resultp
= requestp
->resultp
;
686 int cancelled
= requestp
->cancelled
;
688 /* Free allocated structures and copy data back to user space if the */
689 /* request hasn't been cancelled */
691 switch (requestp
->request_type
) {
695 if (!cancelled
&& requestp
->ret
== 0)
697 xmemcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
699 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
701 squidaio_xstrfree(requestp
->path
);
706 if (cancelled
&& requestp
->ret
>= 0)
707 /* The open() was cancelled but completed */
708 close(requestp
->ret
);
710 squidaio_xstrfree(requestp
->path
);
715 if (cancelled
&& requestp
->ret
< 0)
716 /* The close() was cancelled and never got executed */
723 case _AIO_OP_TRUNCATE
:
725 case _AIO_OP_OPENDIR
:
726 squidaio_xstrfree(requestp
->path
);
740 if (resultp
!= NULL
&& !cancelled
) {
741 resultp
->aio_return
= requestp
->ret
;
742 resultp
->aio_errno
= requestp
->err
;
745 squidaio_request_pool
->free(requestp
);
746 } /* squidaio_cleanup_request */
750 squidaio_cancel(squidaio_result_t
* resultp
)
752 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
754 if (request
&& request
->resultp
== resultp
) {
755 debug(43, 9) ("squidaio_cancel: %p type=%d result=%p\n",
756 request
, request
->request_type
, request
->resultp
);
757 request
->cancelled
= 1;
758 request
->resultp
= NULL
;
759 resultp
->_data
= NULL
;
760 resultp
->result_type
= _AIO_OP_NONE
;
765 } /* squidaio_cancel */
769 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
772 squidaio_request_t
*requestp
;
774 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
776 requestp
->path
= (char *) squidaio_xstrdup(path
);
778 requestp
->oflag
= oflag
;
780 requestp
->mode
= mode
;
782 requestp
->resultp
= resultp
;
784 requestp
->request_type
= _AIO_OP_OPEN
;
786 requestp
->cancelled
= 0;
788 resultp
->result_type
= _AIO_OP_OPEN
;
790 squidaio_queue_request(requestp
);
797 squidaio_do_open(squidaio_request_t
* requestp
)
799 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
800 requestp
->err
= errno
;
805 squidaio_read(int fd
, char *bufp
, int bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
807 squidaio_request_t
*requestp
;
809 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
813 requestp
->bufferp
= bufp
;
815 requestp
->buflen
= bufs
;
817 requestp
->offset
= offset
;
819 requestp
->whence
= whence
;
821 requestp
->resultp
= resultp
;
823 requestp
->request_type
= _AIO_OP_READ
;
825 requestp
->cancelled
= 0;
827 resultp
->result_type
= _AIO_OP_READ
;
829 squidaio_queue_request(requestp
);
836 squidaio_do_read(squidaio_request_t
* requestp
)
838 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
840 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
841 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
842 WIN32_maperror(GetLastError());
846 requestp
->err
= errno
;
851 squidaio_write(int fd
, char *bufp
, int bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
853 squidaio_request_t
*requestp
;
855 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
859 requestp
->bufferp
= bufp
;
861 requestp
->buflen
= bufs
;
863 requestp
->offset
= offset
;
865 requestp
->whence
= whence
;
867 requestp
->resultp
= resultp
;
869 requestp
->request_type
= _AIO_OP_WRITE
;
871 requestp
->cancelled
= 0;
873 resultp
->result_type
= _AIO_OP_WRITE
;
875 squidaio_queue_request(requestp
);
882 squidaio_do_write(squidaio_request_t
* requestp
)
884 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
885 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
886 WIN32_maperror(GetLastError());
890 requestp
->err
= errno
;
895 squidaio_close(int fd
, squidaio_result_t
* resultp
)
897 squidaio_request_t
*requestp
;
899 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
903 requestp
->resultp
= resultp
;
905 requestp
->request_type
= _AIO_OP_CLOSE
;
907 requestp
->cancelled
= 0;
909 resultp
->result_type
= _AIO_OP_CLOSE
;
911 squidaio_queue_request(requestp
);
918 squidaio_do_close(squidaio_request_t
* requestp
)
920 if((requestp
->ret
= close(requestp
->fd
)) < 0) {
921 debug(43, 0) ("squidaio_do_close: FD %d, errno %d\n", requestp
->fd
, errno
);
925 requestp
->err
= errno
;
931 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
934 squidaio_request_t
*requestp
;
936 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
938 requestp
->path
= (char *) squidaio_xstrdup(path
);
940 requestp
->statp
= sb
;
942 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
944 requestp
->resultp
= resultp
;
946 requestp
->request_type
= _AIO_OP_STAT
;
948 requestp
->cancelled
= 0;
950 resultp
->result_type
= _AIO_OP_STAT
;
952 squidaio_queue_request(requestp
);
959 squidaio_do_stat(squidaio_request_t
* requestp
)
961 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
962 requestp
->err
= errno
;
968 squidaio_truncate(const char *path
, off_t length
, squidaio_result_t
* resultp
)
971 squidaio_request_t
*requestp
;
973 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
975 requestp
->path
= (char *) squidaio_xstrdup(path
);
977 requestp
->offset
= length
;
979 requestp
->resultp
= resultp
;
981 requestp
->request_type
= _AIO_OP_TRUNCATE
;
983 requestp
->cancelled
= 0;
985 resultp
->result_type
= _AIO_OP_TRUNCATE
;
987 squidaio_queue_request(requestp
);
994 squidaio_do_truncate(squidaio_request_t
* requestp
)
996 requestp
->ret
= truncate(requestp
->path
, requestp
->offset
);
997 requestp
->err
= errno
;
1003 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
1006 squidaio_request_t
*requestp
;
1008 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
1010 requestp
->path
= squidaio_xstrdup(path
);
1012 requestp
->resultp
= resultp
;
1014 requestp
->request_type
= _AIO_OP_UNLINK
;
1016 requestp
->cancelled
= 0;
1018 resultp
->result_type
= _AIO_OP_UNLINK
;
1020 squidaio_queue_request(requestp
);
1027 squidaio_do_unlink(squidaio_request_t
* requestp
)
1029 requestp
->ret
= unlink(requestp
->path
);
1030 requestp
->err
= errno
;
1036 /* XXX squidaio_opendir NOT implemented yet.. */
1039 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
1041 squidaio_request_t
*requestp
;
1044 requestp
= squidaio_request_pool
->alloc();
1046 resultp
->result_type
= _AIO_OP_OPENDIR
;
1052 squidaio_do_opendir(squidaio_request_t
* requestp
)
1054 /* NOT IMPLEMENTED */
1060 squidaio_poll_queues(void)
1062 /* kick "overflow" request queue */
1064 if (request_queue2
.head
&&
1065 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
1066 *request_queue
.tailp
= request_queue2
.head
;
1067 request_queue
.tailp
= request_queue2
.tailp
;
1069 if (!SetEvent(request_queue
.cond
))
1070 fatal("couldn't push queue\n");
1072 if (!ReleaseMutex(request_queue
.mutex
)) {
1073 /* unexpected error */
1077 request_queue2
.head
= NULL
;
1078 request_queue2
.tailp
= &request_queue2
.head
;
1081 /* poll done queue */
1082 if (done_queue
.head
&&
1083 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
1085 struct squidaio_request_t
*requests
= done_queue
.head
;
1086 done_queue
.head
= NULL
;
1087 done_queue
.tailp
= &done_queue
.head
;
1089 if (!ReleaseMutex(done_queue
.mutex
)) {
1090 /* unexpected error */
1094 *done_requests
.tailp
= requests
;
1095 request_queue_len
-= 1;
1097 while (requests
->next
) {
1098 requests
= requests
->next
;
1099 request_queue_len
-= 1;
1102 done_requests
.tailp
= &requests
->next
;
1107 squidaio_poll_done(void)
1109 squidaio_request_t
*request
;
1110 squidaio_result_t
*resultp
;
1115 request
= done_requests
.head
;
1117 if (request
== NULL
&& !polled
) {
1118 CommIO::ResetNotifications();
1119 squidaio_poll_queues();
1121 request
= done_requests
.head
;
1128 debug(43, 9) ("squidaio_poll_done: %p type=%d result=%p\n",
1129 request
, request
->request_type
, request
->resultp
);
1130 done_requests
.head
= request
->next
;
1132 if (!done_requests
.head
)
1133 done_requests
.tailp
= &done_requests
.head
;
1135 resultp
= request
->resultp
;
1137 cancelled
= request
->cancelled
;
1139 squidaio_debug(request
);
1141 debug(43, 5) ("DONE: %d -> %d\n", request
->ret
, request
->err
);
1143 squidaio_cleanup_request(request
);
1149 } /* squidaio_poll_done */
1152 squidaio_operations_pending(void)
1154 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1160 /* XXX This might take a while if the queue is large.. */
1163 squidaio_poll_queues();
1164 } while (request_queue_len
> 0);
1166 return squidaio_operations_pending();
1170 squidaio_get_queue_len(void)
1172 return request_queue_len
;
1176 squidaio_debug(squidaio_request_t
* request
)
1178 switch (request
->request_type
) {
1181 debug(43, 5) ("OPEN of %s to FD %d\n", request
->path
, request
->ret
);
1185 debug(43, 5) ("READ on fd: %d\n", request
->fd
);
1189 debug(43, 5) ("WRITE on fd: %d\n", request
->fd
);
1193 debug(43, 5) ("CLOSE of fd: %d\n", request
->fd
);
1196 case _AIO_OP_UNLINK
:
1197 debug(43, 5) ("UNLINK of %s\n", request
->path
);
1200 case _AIO_OP_TRUNCATE
:
1201 debug(43, 5) ("UNLINK of %s\n", request
->path
);
1210 squidaio_stats(StoreEntry
* sentry
)
1212 squidaio_thread_t
*threadp
;
1215 if (!squidaio_initialised
)
1218 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1220 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1224 for (i
= 0; i
< NUMTHREADS
; i
++) {
1225 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1226 threadp
= threadp
->next
;