2 * $Id: aiops_win32.cc,v 1.1 2006/09/06 19:36:42 serassio Exp $
4 * DEBUG: section 43 AIOPS
5 * AUTHOR: Stewart Forster <slf@connect.com.au>
7 * SQUID Web Proxy Cache http://www.squid-cache.org/
8 * ----------------------------------------------------------
10 * Squid is the result of efforts by numerous individuals from
11 * the Internet community; see the CONTRIBUTORS file for full
12 * details. Many organizations have provided support for Squid's
13 * development; see the SPONSORS file for full details. Squid is
14 * Copyrighted (C) 2001 by the Regents of the University of
15 * California; see the COPYRIGHT file for full details. Squid
16 * incorporates software developed and/or copyrighted by other
17 * sources; see the CREDITS file for full details.
19 * This program is free software; you can redistribute it and/or modify
20 * it under the terms of the GNU General Public License as published by
21 * the Free Software Foundation; either version 2 of the License, or
22 * (at your option) any later version.
24 * This program is distributed in the hope that it will be useful,
25 * but WITHOUT ANY WARRANTY; without even the implied warranty of
26 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
27 * GNU General Public License for more details.
29 * You should have received a copy of the GNU General Public License
30 * along with this program; if not, write to the Free Software
31 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
36 #include "squid_windows.h"
37 #include "DiskThreads.h"
40 #include <sys/types.h>
47 #include "SquidTime.h"
50 #define RIDICULOUS_LENGTH 4096
52 enum _squidaio_thread_status
{
59 typedef enum _squidaio_thread_status squidaio_thread_status
;
61 typedef struct squidaio_request_t
64 struct squidaio_request_t
*next
;
65 squidaio_request_type request_type
;
79 struct stat
*tmpstatp
;
82 squidaio_result_t
*resultp
;
87 typedef struct squidaio_request_queue_t
90 HANDLE cond
; /* See Event objects */
91 squidaio_request_t
*volatile head
;
92 squidaio_request_t
*volatile *volatile tailp
;
93 unsigned long requests
;
94 unsigned long blocked
; /* main failed to lock the queue */
97 squidaio_request_queue_t
;
99 typedef struct squidaio_thread_t squidaio_thread_t
;
101 struct squidaio_thread_t
103 squidaio_thread_t
*next
;
105 DWORD dwThreadId
; /* thread ID */
106 squidaio_thread_status status
;
108 struct squidaio_request_t
*current_req
;
109 unsigned long requests
;
113 static void squidaio_queue_request(squidaio_request_t
*);
114 static void squidaio_cleanup_request(squidaio_request_t
*);
115 static DWORD WINAPI
squidaio_thread_loop( LPVOID lpParam
);
116 static void squidaio_do_open(squidaio_request_t
*);
117 static void squidaio_do_read(squidaio_request_t
*);
118 static void squidaio_do_write(squidaio_request_t
*);
119 static void squidaio_do_close(squidaio_request_t
*);
120 static void squidaio_do_stat(squidaio_request_t
*);
122 static void squidaio_do_truncate(squidaio_request_t
*);
124 static void squidaio_do_unlink(squidaio_request_t
*);
127 static void *squidaio_do_opendir(squidaio_request_t
*);
129 static void squidaio_debug(squidaio_request_t
*);
130 static void squidaio_poll_queues(void);
132 static squidaio_thread_t
*threads
= NULL
;
133 static int squidaio_initialised
= 0;
136 #define AIO_LARGE_BUFS 16384
137 #define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
138 #define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
139 #define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
140 #define AIO_MICRO_BUFS 128
142 static MemAllocator
*squidaio_large_bufs
= NULL
; /* 16K */
143 static MemAllocator
*squidaio_medium_bufs
= NULL
; /* 8K */
144 static MemAllocator
*squidaio_small_bufs
= NULL
; /* 4K */
145 static MemAllocator
*squidaio_tiny_bufs
= NULL
; /* 2K */
146 static MemAllocator
*squidaio_micro_bufs
= NULL
; /* 128K */
148 static int request_queue_len
= 0;
149 static MemAllocator
*squidaio_request_pool
= NULL
;
150 static MemAllocator
*squidaio_thread_pool
= NULL
;
151 static squidaio_request_queue_t request_queue
;
155 squidaio_request_t
*head
, **tailp
;
160 NULL
, &request_queue2
.head
162 static squidaio_request_queue_t done_queue
;
166 squidaio_request_t
*head
, **tailp
;
171 NULL
, &done_requests
.head
174 static HANDLE main_thread
;
176 static MemAllocator
*
177 squidaio_get_pool(int size
)
179 if (size
<= AIO_LARGE_BUFS
) {
180 if (size
<= AIO_MICRO_BUFS
)
181 return squidaio_micro_bufs
;
182 else if (size
<= AIO_TINY_BUFS
)
183 return squidaio_tiny_bufs
;
184 else if (size
<= AIO_SMALL_BUFS
)
185 return squidaio_small_bufs
;
186 else if (size
<= AIO_MEDIUM_BUFS
)
187 return squidaio_medium_bufs
;
189 return squidaio_large_bufs
;
196 squidaio_xmalloc(int size
)
201 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
210 squidaio_xstrdup(const char *str
)
213 int len
= strlen(str
) + 1;
215 p
= (char *)squidaio_xmalloc(len
);
216 strncpy(p
, str
, len
);
222 squidaio_xfree(void *p
, int size
)
226 if ((pool
= squidaio_get_pool(size
)) != NULL
) {
233 squidaio_xstrfree(char *str
)
236 int len
= strlen(str
) + 1;
238 if ((pool
= squidaio_get_pool(len
)) != NULL
) {
248 squidaio_thread_t
*threadp
;
250 if (squidaio_initialised
)
253 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
254 GetCurrentThread(), /* pseudo handle to copy */
255 GetCurrentProcess(), /* pseudo handle, don't close */
257 0, /* required access */
258 FALSE
, /* child process's don't inherit the handle */
259 DUPLICATE_SAME_ACCESS
)) {
261 fatal("Couldn't get current thread handle");
264 /* Initialize request queue */
265 if ((request_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
266 FALSE
, /* start unowned (as per mutex_init) */
269 fatal("Failed to create mutex");
272 if ((request_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
273 FALSE
, /* auto signal reset - which I think is pthreads like ? */
274 FALSE
, /* start non signaled */
277 fatal("Failed to create condition variable");
280 request_queue
.head
= NULL
;
282 request_queue
.tailp
= &request_queue
.head
;
284 request_queue
.requests
= 0;
286 request_queue
.blocked
= 0;
288 /* Initialize done queue */
290 if ((done_queue
.mutex
= CreateMutex(NULL
, /* no inheritance */
291 FALSE
, /* start unowned (as per mutex_init) */
294 fatal("Failed to create mutex");
297 if ((done_queue
.cond
= CreateEvent(NULL
, /* no inheritance */
298 TRUE
, /* manually signaled - which I think is pthreads like ? */
299 FALSE
, /* start non signaled */
302 fatal("Failed to create condition variable");
305 done_queue
.head
= NULL
;
307 done_queue
.tailp
= &done_queue
.head
;
309 done_queue
.requests
= 0;
311 done_queue
.blocked
= 0;
313 CommIO::NotifyIOCompleted();
315 /* Create threads and get them to sit in their wait loop */
316 squidaio_thread_pool
= memPoolCreate("aio_thread", sizeof(squidaio_thread_t
));
320 for (i
= 0; i
< NUMTHREADS
; i
++) {
321 threadp
= (squidaio_thread_t
*)squidaio_thread_pool
->alloc();
322 threadp
->status
= _THREAD_STARTING
;
323 threadp
->current_req
= NULL
;
324 threadp
->requests
= 0;
325 threadp
->next
= threads
;
328 if ((threadp
->thread
= CreateThread(NULL
, /* no security attributes */
329 0, /* use default stack size */
330 squidaio_thread_loop
, /* thread function */
331 threadp
, /* argument to thread function */
332 0, /* use default creation flags */
333 &(threadp
->dwThreadId
)) /* returns the thread identifier */
335 fprintf(stderr
, "Thread creation failed\n");
336 threadp
->status
= _THREAD_FAILED
;
340 /* Set the new thread priority above parent process */
341 SetThreadPriority(threadp
->thread
,THREAD_PRIORITY_ABOVE_NORMAL
);
344 /* Create request pool */
345 squidaio_request_pool
= memPoolCreate("aio_request", sizeof(squidaio_request_t
));
347 squidaio_large_bufs
= memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS
);
349 squidaio_medium_bufs
= memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS
);
351 squidaio_small_bufs
= memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS
);
353 squidaio_tiny_bufs
= memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS
);
355 squidaio_micro_bufs
= memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS
);
357 squidaio_initialised
= 1;
361 squidaio_shutdown(void)
363 squidaio_thread_t
*threadp
;
367 if (!squidaio_initialised
)
370 /* This is the same as in squidaio_sync */
372 squidaio_poll_queues();
373 } while (request_queue_len
> 0);
375 hthreads
= (HANDLE
*) xcalloc (NUMTHREADS
, sizeof (HANDLE
));
379 for (i
= 0; i
< NUMTHREADS
; i
++) {
381 hthreads
[i
] = threadp
->thread
;
382 threadp
= threadp
->next
;
385 ReleaseMutex(request_queue
.mutex
);
386 ResetEvent(request_queue
.cond
);
387 ReleaseMutex(done_queue
.mutex
);
388 ResetEvent(done_queue
.cond
);
391 WaitForMultipleObjects(NUMTHREADS
, hthreads
, TRUE
, 2000);
393 for (i
= 0; i
< NUMTHREADS
; i
++) {
394 CloseHandle(hthreads
[i
]);
397 CloseHandle(main_thread
);
398 CommIO::NotifyIOClose();
400 squidaio_initialised
= 0;
405 squidaio_thread_loop(LPVOID lpParam
)
407 squidaio_thread_t
*threadp
= (squidaio_thread_t
*)lpParam
;
408 squidaio_request_t
*request
;
409 HANDLE cond
; /* local copy of the event queue because win32 event handles
410 * don't atomically release the mutex as cond variables do. */
412 /* lock the thread info */
414 if (WAIT_FAILED
== WaitForSingleObject(request_queue
.mutex
, INFINITE
)) {
415 fatal("Can't get ownership of mutex\n");
418 /* duplicate the handle */
419 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
420 request_queue
.cond
, /* handle to copy */
421 GetCurrentProcess(), /* pseudo handle, don't close */
423 0, /* required access */
424 FALSE
, /* child process's don't inherit the handle */
425 DUPLICATE_SAME_ACCESS
))
426 fatal("Can't duplicate mutex handle\n");
428 if (!ReleaseMutex(request_queue
.mutex
)) {
430 fatal("Can't release mutex\n");
437 threadp
->current_req
= request
= NULL
;
439 /* Get a request to process */
440 threadp
->status
= _THREAD_WAITING
;
443 CloseHandle(request_queue
.mutex
);
448 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
450 if (rv
== WAIT_FAILED
) {
455 while (!request_queue
.head
) {
456 if (!ReleaseMutex(request_queue
.mutex
)) {
458 threadp
->status
= _THREAD_FAILED
;
463 rv
= WaitForSingleObject(cond
, INFINITE
);
465 if (rv
== WAIT_FAILED
) {
470 rv
= WaitForSingleObject(request_queue
.mutex
, INFINITE
);
472 if (rv
== WAIT_FAILED
) {
478 request
= request_queue
.head
;
481 request_queue
.head
= request
->next
;
483 if (!request_queue
.head
)
484 request_queue
.tailp
= &request_queue
.head
;
486 if (!ReleaseMutex(request_queue
.mutex
)) {
493 /* process the request */
494 threadp
->status
= _THREAD_BUSY
;
496 request
->next
= NULL
;
498 threadp
->current_req
= request
;
502 if (!request
->cancelled
) {
503 switch (request
->request_type
) {
506 squidaio_do_open(request
);
510 squidaio_do_read(request
);
514 squidaio_do_write(request
);
518 squidaio_do_close(request
);
523 case _AIO_OP_TRUNCATE
:
524 squidaio_do_truncate(request
);
529 squidaio_do_unlink(request
);
533 #if AIO_OPENDIR /* Opendir not implemented yet */
535 case _AIO_OP_OPENDIR
:
536 squidaio_do_opendir(request
);
541 squidaio_do_stat(request
);
546 request
->err
= EINVAL
;
549 } else { /* cancelled */
551 request
->err
= EINTR
;
554 threadp
->status
= _THREAD_DONE
;
555 /* put the request in the done queue */
556 rv
= WaitForSingleObject(done_queue
.mutex
, INFINITE
);
558 if (rv
== WAIT_FAILED
) {
563 *done_queue
.tailp
= request
;
564 done_queue
.tailp
= &request
->next
;
566 if (!ReleaseMutex(done_queue
.mutex
)) {
571 CommIO::NotifyIOCompleted();
574 } /* while forever */
579 } /* squidaio_thread_loop */
582 squidaio_queue_request(squidaio_request_t
* request
)
584 static int high_start
= 0;
585 debug(43, 9) ("squidaio_queue_request: %p type=%d result=%p\n",
586 request
, request
->request_type
, request
->resultp
);
587 /* Mark it as not executed (failing result, no error) */
590 /* Internal housekeeping */
591 request_queue_len
+= 1;
592 request
->resultp
->_data
= request
;
593 /* Play some tricks with the request_queue2 queue */
594 request
->next
= NULL
;
596 if (WaitForSingleObject(request_queue
.mutex
, 0) == WAIT_OBJECT_0
) {
597 if (request_queue2
.head
) {
598 /* Grab blocked requests */
599 *request_queue
.tailp
= request_queue2
.head
;
600 request_queue
.tailp
= request_queue2
.tailp
;
603 /* Enqueue request */
604 *request_queue
.tailp
= request
;
606 request_queue
.tailp
= &request
->next
;
608 if (!SetEvent(request_queue
.cond
))
609 fatal("Couldn't push queue");
611 if (!ReleaseMutex(request_queue
.mutex
)) {
612 /* unexpected error */
613 fatal("Couldn't push queue");
618 if (request_queue2
.head
) {
619 /* Clear queue of blocked requests */
620 request_queue2
.head
= NULL
;
621 request_queue2
.tailp
= &request_queue2
.head
;
624 /* Oops, the request queue is blocked, use request_queue2 */
625 *request_queue2
.tailp
= request
;
626 request_queue2
.tailp
= &request
->next
;
629 if (request_queue2
.head
) {
630 static int filter
= 0;
631 static int filter_limit
= 8;
633 if (++filter
>= filter_limit
) {
634 filter_limit
+= filter
;
636 debug(43, 1) ("squidaio_queue_request: WARNING - Queue congestion\n");
640 /* Warn if out of threads */
641 if (request_queue_len
> MAGIC1
) {
642 static int last_warn
= 0;
643 static int queue_high
, queue_low
;
645 if (high_start
== 0) {
646 high_start
= (int)squid_curtime
;
647 queue_high
= request_queue_len
;
648 queue_low
= request_queue_len
;
651 if (request_queue_len
> queue_high
)
652 queue_high
= request_queue_len
;
654 if (request_queue_len
< queue_low
)
655 queue_low
= request_queue_len
;
657 if (squid_curtime
>= (last_warn
+ 15) &&
658 squid_curtime
>= (high_start
+ 5)) {
659 debug(43, 1) ("squidaio_queue_request: WARNING - Disk I/O overloading\n");
661 if (squid_curtime
>= (high_start
+ 15))
662 debug(43, 1) ("squidaio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%ld\n",
663 request_queue_len
, queue_high
, queue_low
, (long int) (squid_curtime
- high_start
));
665 last_warn
= (int)squid_curtime
;
671 /* Warn if seriously overloaded */
672 if (request_queue_len
> RIDICULOUS_LENGTH
) {
673 debug(43, 0) ("squidaio_queue_request: Async request queue growing uncontrollably!\n");
674 debug(43, 0) ("squidaio_queue_request: Syncing pending I/O operations.. (blocking)\n");
676 debug(43, 0) ("squidaio_queue_request: Synced\n");
678 } /* squidaio_queue_request */
681 squidaio_cleanup_request(squidaio_request_t
* requestp
)
683 squidaio_result_t
*resultp
= requestp
->resultp
;
684 int cancelled
= requestp
->cancelled
;
686 /* Free allocated structures and copy data back to user space if the */
687 /* request hasn't been cancelled */
689 switch (requestp
->request_type
) {
693 if (!cancelled
&& requestp
->ret
== 0)
695 xmemcpy(requestp
->statp
, requestp
->tmpstatp
, sizeof(struct stat
));
697 squidaio_xfree(requestp
->tmpstatp
, sizeof(struct stat
));
699 squidaio_xstrfree(requestp
->path
);
704 if (cancelled
&& requestp
->ret
>= 0)
705 /* The open() was cancelled but completed */
706 close(requestp
->ret
);
708 squidaio_xstrfree(requestp
->path
);
713 if (cancelled
&& requestp
->ret
< 0)
714 /* The close() was cancelled and never got executed */
721 case _AIO_OP_TRUNCATE
:
723 case _AIO_OP_OPENDIR
:
724 squidaio_xstrfree(requestp
->path
);
738 if (resultp
!= NULL
&& !cancelled
) {
739 resultp
->aio_return
= requestp
->ret
;
740 resultp
->aio_errno
= requestp
->err
;
743 squidaio_request_pool
->free(requestp
);
744 } /* squidaio_cleanup_request */
748 squidaio_cancel(squidaio_result_t
* resultp
)
750 squidaio_request_t
*request
= (squidaio_request_t
*)resultp
->_data
;
752 if (request
&& request
->resultp
== resultp
) {
753 debug(43, 9) ("squidaio_cancel: %p type=%d result=%p\n",
754 request
, request
->request_type
, request
->resultp
);
755 request
->cancelled
= 1;
756 request
->resultp
= NULL
;
757 resultp
->_data
= NULL
;
758 resultp
->result_type
= _AIO_OP_NONE
;
763 } /* squidaio_cancel */
767 squidaio_open(const char *path
, int oflag
, mode_t mode
, squidaio_result_t
* resultp
)
770 squidaio_request_t
*requestp
;
772 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
774 requestp
->path
= (char *) squidaio_xstrdup(path
);
776 requestp
->oflag
= oflag
;
778 requestp
->mode
= mode
;
780 requestp
->resultp
= resultp
;
782 requestp
->request_type
= _AIO_OP_OPEN
;
784 requestp
->cancelled
= 0;
786 resultp
->result_type
= _AIO_OP_OPEN
;
788 squidaio_queue_request(requestp
);
795 squidaio_do_open(squidaio_request_t
* requestp
)
797 requestp
->ret
= open(requestp
->path
, requestp
->oflag
, requestp
->mode
);
798 requestp
->err
= errno
;
803 squidaio_read(int fd
, char *bufp
, int bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
805 squidaio_request_t
*requestp
;
807 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
811 requestp
->bufferp
= bufp
;
813 requestp
->buflen
= bufs
;
815 requestp
->offset
= offset
;
817 requestp
->whence
= whence
;
819 requestp
->resultp
= resultp
;
821 requestp
->request_type
= _AIO_OP_READ
;
823 requestp
->cancelled
= 0;
825 resultp
->result_type
= _AIO_OP_READ
;
827 squidaio_queue_request(requestp
);
834 squidaio_do_read(squidaio_request_t
* requestp
)
836 lseek(requestp
->fd
, requestp
->offset
, requestp
->whence
);
838 if (!ReadFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
839 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
840 WIN32_maperror(GetLastError());
844 requestp
->err
= errno
;
849 squidaio_write(int fd
, char *bufp
, int bufs
, off_t offset
, int whence
, squidaio_result_t
* resultp
)
851 squidaio_request_t
*requestp
;
853 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
857 requestp
->bufferp
= bufp
;
859 requestp
->buflen
= bufs
;
861 requestp
->offset
= offset
;
863 requestp
->whence
= whence
;
865 requestp
->resultp
= resultp
;
867 requestp
->request_type
= _AIO_OP_WRITE
;
869 requestp
->cancelled
= 0;
871 resultp
->result_type
= _AIO_OP_WRITE
;
873 squidaio_queue_request(requestp
);
880 squidaio_do_write(squidaio_request_t
* requestp
)
882 if (!WriteFile((HANDLE
)_get_osfhandle(requestp
->fd
), requestp
->bufferp
,
883 requestp
->buflen
, (LPDWORD
)&requestp
->ret
, NULL
)) {
884 WIN32_maperror(GetLastError());
888 requestp
->err
= errno
;
893 squidaio_close(int fd
, squidaio_result_t
* resultp
)
895 squidaio_request_t
*requestp
;
897 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
901 requestp
->resultp
= resultp
;
903 requestp
->request_type
= _AIO_OP_CLOSE
;
905 requestp
->cancelled
= 0;
907 resultp
->result_type
= _AIO_OP_CLOSE
;
909 squidaio_queue_request(requestp
);
916 squidaio_do_close(squidaio_request_t
* requestp
)
918 if((requestp
->ret
= close(requestp
->fd
)) < 0) {
919 debug(43, 0) ("squidaio_do_close: FD %d, errno %d\n", requestp
->fd
, errno
);
923 requestp
->err
= errno
;
929 squidaio_stat(const char *path
, struct stat
*sb
, squidaio_result_t
* resultp
)
932 squidaio_request_t
*requestp
;
934 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
936 requestp
->path
= (char *) squidaio_xstrdup(path
);
938 requestp
->statp
= sb
;
940 requestp
->tmpstatp
= (struct stat
*) squidaio_xmalloc(sizeof(struct stat
));
942 requestp
->resultp
= resultp
;
944 requestp
->request_type
= _AIO_OP_STAT
;
946 requestp
->cancelled
= 0;
948 resultp
->result_type
= _AIO_OP_STAT
;
950 squidaio_queue_request(requestp
);
957 squidaio_do_stat(squidaio_request_t
* requestp
)
959 requestp
->ret
= stat(requestp
->path
, requestp
->tmpstatp
);
960 requestp
->err
= errno
;
966 squidaio_truncate(const char *path
, off_t length
, squidaio_result_t
* resultp
)
969 squidaio_request_t
*requestp
;
971 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
973 requestp
->path
= (char *) squidaio_xstrdup(path
);
975 requestp
->offset
= length
;
977 requestp
->resultp
= resultp
;
979 requestp
->request_type
= _AIO_OP_TRUNCATE
;
981 requestp
->cancelled
= 0;
983 resultp
->result_type
= _AIO_OP_TRUNCATE
;
985 squidaio_queue_request(requestp
);
992 squidaio_do_truncate(squidaio_request_t
* requestp
)
994 requestp
->ret
= truncate(requestp
->path
, requestp
->offset
);
995 requestp
->err
= errno
;
1001 squidaio_unlink(const char *path
, squidaio_result_t
* resultp
)
1004 squidaio_request_t
*requestp
;
1006 requestp
= (squidaio_request_t
*)squidaio_request_pool
->alloc();
1008 requestp
->path
= squidaio_xstrdup(path
);
1010 requestp
->resultp
= resultp
;
1012 requestp
->request_type
= _AIO_OP_UNLINK
;
1014 requestp
->cancelled
= 0;
1016 resultp
->result_type
= _AIO_OP_UNLINK
;
1018 squidaio_queue_request(requestp
);
1025 squidaio_do_unlink(squidaio_request_t
* requestp
)
1027 requestp
->ret
= unlink(requestp
->path
);
1028 requestp
->err
= errno
;
1034 /* XXX squidaio_opendir NOT implemented yet.. */
1037 squidaio_opendir(const char *path
, squidaio_result_t
* resultp
)
1039 squidaio_request_t
*requestp
;
1042 requestp
= squidaio_request_pool
->alloc();
1044 resultp
->result_type
= _AIO_OP_OPENDIR
;
1050 squidaio_do_opendir(squidaio_request_t
* requestp
)
1052 /* NOT IMPLEMENTED */
1058 squidaio_poll_queues(void)
1060 /* kick "overflow" request queue */
1062 if (request_queue2
.head
&&
1063 (WaitForSingleObject(request_queue
.mutex
, 0 )== WAIT_OBJECT_0
)) {
1064 *request_queue
.tailp
= request_queue2
.head
;
1065 request_queue
.tailp
= request_queue2
.tailp
;
1067 if (!SetEvent(request_queue
.cond
))
1068 fatal("couldn't push queue\n");
1070 if (!ReleaseMutex(request_queue
.mutex
)) {
1071 /* unexpected error */
1075 request_queue2
.head
= NULL
;
1076 request_queue2
.tailp
= &request_queue2
.head
;
1079 /* poll done queue */
1080 if (done_queue
.head
&&
1081 (WaitForSingleObject(done_queue
.mutex
, 0)==WAIT_OBJECT_0
)) {
1083 struct squidaio_request_t
*requests
= done_queue
.head
;
1084 done_queue
.head
= NULL
;
1085 done_queue
.tailp
= &done_queue
.head
;
1087 if (!ReleaseMutex(done_queue
.mutex
)) {
1088 /* unexpected error */
1092 *done_requests
.tailp
= requests
;
1093 request_queue_len
-= 1;
1095 while (requests
->next
) {
1096 requests
= requests
->next
;
1097 request_queue_len
-= 1;
1100 done_requests
.tailp
= &requests
->next
;
1105 squidaio_poll_done(void)
1107 squidaio_request_t
*request
;
1108 squidaio_result_t
*resultp
;
1113 request
= done_requests
.head
;
1115 if (request
== NULL
&& !polled
) {
1116 CommIO::ResetNotifications();
1117 squidaio_poll_queues();
1119 request
= done_requests
.head
;
1126 debug(43, 9) ("squidaio_poll_done: %p type=%d result=%p\n",
1127 request
, request
->request_type
, request
->resultp
);
1128 done_requests
.head
= request
->next
;
1130 if (!done_requests
.head
)
1131 done_requests
.tailp
= &done_requests
.head
;
1133 resultp
= request
->resultp
;
1135 cancelled
= request
->cancelled
;
1137 squidaio_debug(request
);
1139 debug(43, 5) ("DONE: %d -> %d\n", request
->ret
, request
->err
);
1141 squidaio_cleanup_request(request
);
1147 } /* squidaio_poll_done */
1150 squidaio_operations_pending(void)
1152 return request_queue_len
+ (done_requests
.head
? 1 : 0);
1158 /* XXX This might take a while if the queue is large.. */
1161 squidaio_poll_queues();
1162 } while (request_queue_len
> 0);
1164 return squidaio_operations_pending();
1168 squidaio_get_queue_len(void)
1170 return request_queue_len
;
1174 squidaio_debug(squidaio_request_t
* request
)
1176 switch (request
->request_type
) {
1179 debug(43, 5) ("OPEN of %s to FD %d\n", request
->path
, request
->ret
);
1183 debug(43, 5) ("READ on fd: %d\n", request
->fd
);
1187 debug(43, 5) ("WRITE on fd: %d\n", request
->fd
);
1191 debug(43, 5) ("CLOSE of fd: %d\n", request
->fd
);
1194 case _AIO_OP_UNLINK
:
1195 debug(43, 5) ("UNLINK of %s\n", request
->path
);
1198 case _AIO_OP_TRUNCATE
:
1199 debug(43, 5) ("UNLINK of %s\n", request
->path
);
1208 squidaio_stats(StoreEntry
* sentry
)
1210 squidaio_thread_t
*threadp
;
1213 if (!squidaio_initialised
)
1216 storeAppendPrintf(sentry
, "\n\nThreads Status:\n");
1218 storeAppendPrintf(sentry
, "#\tID\t# Requests\n");
1222 for (i
= 0; i
< NUMTHREADS
; i
++) {
1223 storeAppendPrintf(sentry
, "%i\t0x%lx\t%ld\n", i
+ 1, threadp
->dwThreadId
, threadp
->requests
);
1224 threadp
= threadp
->next
;