]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskThreads/aiops_win32.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
CommitLineData
595c7973 1/*
bde978a6 2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
595c7973 3 *
bbc27441
AJ
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
595c7973 7 */
8
bbc27441
AJ
9/* DEBUG: section 43 Windows AIOPS */
10
582c2af2 11#include "squid.h"
1ff991dc 12#include "DiskIO/DiskThreads/CommIO.h"
595c7973 13#include "DiskThreads.h"
1e1a9021 14#include "fd.h"
bb1373a6 15#include "mem/Pool.h"
4d5904f7 16#include "SquidConfig.h"
489520a9
AJ
17#include "SquidTime.h"
18#include "Store.h"
595c7973 19
1a30fdf5 20#include <cerrno>
074d6a40 21#include <csignal>
4d5904f7
FC
22#include <sys/stat.h>
23#include <fcntl.h>
4d5904f7 24#include <dirent.h>
595c7973 25
f53969cc 26#define RIDICULOUS_LENGTH 4096
595c7973 27
28enum _squidaio_thread_status {
29 _THREAD_STARTING = 0,
30 _THREAD_WAITING,
31 _THREAD_BUSY,
32 _THREAD_FAILED,
33 _THREAD_DONE
34};
35typedef enum _squidaio_thread_status squidaio_thread_status;
36
26ac0430 37typedef struct squidaio_request_t {
595c7973 38
39 struct squidaio_request_t *next;
40 squidaio_request_type request_type;
41 int cancelled;
42 char *path;
43 int oflag;
44 mode_t mode;
45 int fd;
46 char *bufferp;
47 char *tmpbufp;
ee139403 48 size_t buflen;
595c7973 49 off_t offset;
50 int whence;
51 int ret;
52 int err;
53
54 struct stat *tmpstatp;
55
56 struct stat *statp;
57 squidaio_result_t *resultp;
2fadd50d 58} squidaio_request_t;
595c7973 59
26ac0430 60typedef struct squidaio_request_queue_t {
595c7973 61 HANDLE mutex;
62 HANDLE cond; /* See Event objects */
63 squidaio_request_t *volatile head;
64 squidaio_request_t *volatile *volatile tailp;
65 unsigned long requests;
f53969cc 66 unsigned long blocked; /* main failed to lock the queue */
2fadd50d 67} squidaio_request_queue_t;
595c7973 68
69typedef struct squidaio_thread_t squidaio_thread_t;
70
26ac0430 71struct squidaio_thread_t {
595c7973 72 squidaio_thread_t *next;
73 HANDLE thread;
74 DWORD dwThreadId; /* thread ID */
75 squidaio_thread_status status;
76
77 struct squidaio_request_t *current_req;
78 unsigned long requests;
79 int volatile exit;
80};
81
82static void squidaio_queue_request(squidaio_request_t *);
83static void squidaio_cleanup_request(squidaio_request_t *);
84static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
85static void squidaio_do_open(squidaio_request_t *);
86static void squidaio_do_read(squidaio_request_t *);
87static void squidaio_do_write(squidaio_request_t *);
88static void squidaio_do_close(squidaio_request_t *);
89static void squidaio_do_stat(squidaio_request_t *);
595c7973 90static void squidaio_do_unlink(squidaio_request_t *);
595c7973 91#if AIO_OPENDIR
92static void *squidaio_do_opendir(squidaio_request_t *);
93#endif
94static void squidaio_debug(squidaio_request_t *);
95static void squidaio_poll_queues(void);
96
97static squidaio_thread_t *threads = NULL;
98static int squidaio_initialised = 0;
99
595c7973 100#define AIO_LARGE_BUFS 16384
f53969cc
SM
101#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
102#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
103#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
104#define AIO_MICRO_BUFS 128
595c7973 105
f53969cc
SM
106static MemAllocator *squidaio_large_bufs = NULL; /* 16K */
107static MemAllocator *squidaio_medium_bufs = NULL; /* 8K */
108static MemAllocator *squidaio_small_bufs = NULL; /* 4K */
109static MemAllocator *squidaio_tiny_bufs = NULL; /* 2K */
110static MemAllocator *squidaio_micro_bufs = NULL; /* 128K */
595c7973 111
112static int request_queue_len = 0;
113static MemAllocator *squidaio_request_pool = NULL;
114static MemAllocator *squidaio_thread_pool = NULL;
115static squidaio_request_queue_t request_queue;
116
26ac0430 117static struct {
595c7973 118 squidaio_request_t *head, **tailp;
119}
120
121request_queue2 = {
122
26ac0430
AJ
123 NULL, &request_queue2.head
124};
595c7973 125static squidaio_request_queue_t done_queue;
126
26ac0430 127static struct {
595c7973 128 squidaio_request_t *head, **tailp;
129}
130
131done_requests = {
132
26ac0430
AJ
133 NULL, &done_requests.head
134};
595c7973 135
136static HANDLE main_thread;
137
138static MemAllocator *
139squidaio_get_pool(int size)
140{
141 if (size <= AIO_LARGE_BUFS) {
142 if (size <= AIO_MICRO_BUFS)
143 return squidaio_micro_bufs;
144 else if (size <= AIO_TINY_BUFS)
145 return squidaio_tiny_bufs;
146 else if (size <= AIO_SMALL_BUFS)
147 return squidaio_small_bufs;
148 else if (size <= AIO_MEDIUM_BUFS)
149 return squidaio_medium_bufs;
150 else
151 return squidaio_large_bufs;
152 }
153
154 return NULL;
155}
156
157void *
158squidaio_xmalloc(int size)
159{
160 void *p;
161 MemAllocator *pool;
162
163 if ((pool = squidaio_get_pool(size)) != NULL) {
164 p = pool->alloc();
165 } else
166 p = xmalloc(size);
167
168 return p;
169}
170
171static char *
172squidaio_xstrdup(const char *str)
173{
174 char *p;
175 int len = strlen(str) + 1;
176
177 p = (char *)squidaio_xmalloc(len);
178 strncpy(p, str, len);
179
180 return p;
181}
182
183void
184squidaio_xfree(void *p, int size)
185{
186 MemAllocator *pool;
187
188 if ((pool = squidaio_get_pool(size)) != NULL) {
1e1a9021 189 pool->freeOne(p);
595c7973 190 } else
191 xfree(p);
192}
193
194static void
195squidaio_xstrfree(char *str)
196{
197 MemAllocator *pool;
198 int len = strlen(str) + 1;
199
200 if ((pool = squidaio_get_pool(len)) != NULL) {
1e1a9021 201 pool->freeOne(str);
595c7973 202 } else
203 xfree(str);
204}
205
206void
207squidaio_init(void)
208{
209 int i;
210 squidaio_thread_t *threadp;
211
212 if (squidaio_initialised)
213 return;
214
215 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
216 GetCurrentThread(), /* pseudo handle to copy */
217 GetCurrentProcess(), /* pseudo handle, don't close */
218 &main_thread,
219 0, /* required access */
220 FALSE, /* child process's don't inherit the handle */
221 DUPLICATE_SAME_ACCESS)) {
222 /* spit errors */
223 fatal("Couldn't get current thread handle");
224 }
225
226 /* Initialize request queue */
227 if ((request_queue.mutex = CreateMutex(NULL, /* no inheritance */
228 FALSE, /* start unowned (as per mutex_init) */
229 NULL) /* no name */
230 ) == NULL) {
231 fatal("Failed to create mutex");
232 }
233
234 if ((request_queue.cond = CreateEvent(NULL, /* no inheritance */
235 FALSE, /* auto signal reset - which I think is pthreads like ? */
236 FALSE, /* start non signaled */
237 NULL) /* no name */
238 ) == NULL) {
239 fatal("Failed to create condition variable");
240 }
241
242 request_queue.head = NULL;
243
244 request_queue.tailp = &request_queue.head;
245
246 request_queue.requests = 0;
247
248 request_queue.blocked = 0;
249
250 /* Initialize done queue */
251
252 if ((done_queue.mutex = CreateMutex(NULL, /* no inheritance */
253 FALSE, /* start unowned (as per mutex_init) */
254 NULL) /* no name */
255 ) == NULL) {
256 fatal("Failed to create mutex");
257 }
258
259 if ((done_queue.cond = CreateEvent(NULL, /* no inheritance */
260 TRUE, /* manually signaled - which I think is pthreads like ? */
261 FALSE, /* start non signaled */
262 NULL) /* no name */
263 ) == NULL) {
264 fatal("Failed to create condition variable");
265 }
266
267 done_queue.head = NULL;
268
269 done_queue.tailp = &done_queue.head;
270
271 done_queue.requests = 0;
272
273 done_queue.blocked = 0;
274
1e1a9021
AJ
275 // Initialize the thread I/O pipes before creating any threads
276 // see bug 3189 comment 5 about race conditions.
277 CommIO::Initialize();
595c7973 278
279 /* Create threads and get them to sit in their wait loop */
280 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
281
282 assert(NUMTHREADS);
283
cb4185f1 284 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 285 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
286 threadp->status = _THREAD_STARTING;
287 threadp->current_req = NULL;
288 threadp->requests = 0;
289 threadp->next = threads;
290 threads = threadp;
291
292 if ((threadp->thread = CreateThread(NULL, /* no security attributes */
293 0, /* use default stack size */
294 squidaio_thread_loop, /* thread function */
295 threadp, /* argument to thread function */
296 0, /* use default creation flags */
297 &(threadp->dwThreadId)) /* returns the thread identifier */
298 ) == NULL) {
299 fprintf(stderr, "Thread creation failed\n");
300 threadp->status = _THREAD_FAILED;
301 continue;
302 }
303
304 /* Set the new thread priority above parent process */
305 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
306 }
307
308 /* Create request pool */
309 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
310
311 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
312
313 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
314
315 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
316
317 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
318
319 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
320
321 squidaio_initialised = 1;
322}
323
324void
325squidaio_shutdown(void)
326{
327 squidaio_thread_t *threadp;
328 int i;
329 HANDLE * hthreads;
330
331 if (!squidaio_initialised)
332 return;
333
334 /* This is the same as in squidaio_sync */
335 do {
336 squidaio_poll_queues();
337 } while (request_queue_len > 0);
338
339 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
340
341 threadp = threads;
342
cb4185f1 343 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 344 threadp->exit = 1;
345 hthreads[i] = threadp->thread;
346 threadp = threadp->next;
347 }
348
349 ReleaseMutex(request_queue.mutex);
350 ResetEvent(request_queue.cond);
351 ReleaseMutex(done_queue.mutex);
352 ResetEvent(done_queue.cond);
353 Sleep(0);
354
355 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
356
cb4185f1 357 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 358 CloseHandle(hthreads[i]);
359 }
360
361 CloseHandle(main_thread);
362 CommIO::NotifyIOClose();
363
364 squidaio_initialised = 0;
365 xfree(hthreads);
366}
367
368static DWORD WINAPI
369squidaio_thread_loop(LPVOID lpParam)
370{
371 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
372 squidaio_request_t *request;
373 HANDLE cond; /* local copy of the event queue because win32 event handles
374 * don't atomically release the mutex as cond variables do. */
375
376 /* lock the thread info */
377
378 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
379 fatal("Can't get ownership of mutex\n");
380 }
381
382 /* duplicate the handle */
383 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
384 request_queue.cond, /* handle to copy */
385 GetCurrentProcess(), /* pseudo handle, don't close */
386 &cond,
387 0, /* required access */
388 FALSE, /* child process's don't inherit the handle */
389 DUPLICATE_SAME_ACCESS))
390 fatal("Can't duplicate mutex handle\n");
391
392 if (!ReleaseMutex(request_queue.mutex)) {
393 CloseHandle(cond);
394 fatal("Can't release mutex\n");
395 }
396
397 Sleep(0);
398
399 while (1) {
400 DWORD rv;
401 threadp->current_req = request = NULL;
402 request = NULL;
403 /* Get a request to process */
404 threadp->status = _THREAD_WAITING;
405
406 if (threadp->exit) {
407 CloseHandle(request_queue.mutex);
408 CloseHandle(cond);
409 return 0;
410 }
411
412 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
413
414 if (rv == WAIT_FAILED) {
415 CloseHandle(cond);
416 return 1;
417 }
418
419 while (!request_queue.head) {
420 if (!ReleaseMutex(request_queue.mutex)) {
421 CloseHandle(cond);
422 threadp->status = _THREAD_FAILED;
423 return 1;
424 }
425
426 Sleep(0);
427 rv = WaitForSingleObject(cond, INFINITE);
428
429 if (rv == WAIT_FAILED) {
430 CloseHandle(cond);
431 return 1;
432 }
433
434 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
435
436 if (rv == WAIT_FAILED) {
437 CloseHandle(cond);
438 return 1;
439 }
440 }
441
442 request = request_queue.head;
443
444 if (request)
445 request_queue.head = request->next;
446
447 if (!request_queue.head)
448 request_queue.tailp = &request_queue.head;
449
450 if (!ReleaseMutex(request_queue.mutex)) {
451 CloseHandle(cond);
452 return 1;
453 }
454
455 Sleep(0);
456
457 /* process the request */
458 threadp->status = _THREAD_BUSY;
459
460 request->next = NULL;
461
462 threadp->current_req = request;
463
464 errno = 0;
465
466 if (!request->cancelled) {
467 switch (request->request_type) {
468
469 case _AIO_OP_OPEN:
470 squidaio_do_open(request);
471 break;
472
473 case _AIO_OP_READ:
474 squidaio_do_read(request);
475 break;
476
477 case _AIO_OP_WRITE:
478 squidaio_do_write(request);
479 break;
480
481 case _AIO_OP_CLOSE:
482 squidaio_do_close(request);
483 break;
484
595c7973 485 case _AIO_OP_UNLINK:
486 squidaio_do_unlink(request);
487 break;
488
f53969cc 489#if AIO_OPENDIR /* Opendir not implemented yet */
595c7973 490
491 case _AIO_OP_OPENDIR:
492 squidaio_do_opendir(request);
493 break;
494#endif
495
496 case _AIO_OP_STAT:
497 squidaio_do_stat(request);
498 break;
499
500 default:
501 request->ret = -1;
502 request->err = EINVAL;
503 break;
504 }
f53969cc 505 } else { /* cancelled */
595c7973 506 request->ret = -1;
507 request->err = EINTR;
508 }
509
510 threadp->status = _THREAD_DONE;
511 /* put the request in the done queue */
512 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
513
514 if (rv == WAIT_FAILED) {
515 CloseHandle(cond);
516 return 1;
517 }
518
519 *done_queue.tailp = request;
520 done_queue.tailp = &request->next;
521
522 if (!ReleaseMutex(done_queue.mutex)) {
523 CloseHandle(cond);
524 return 1;
525 }
526
527 CommIO::NotifyIOCompleted();
528 Sleep(0);
cb4185f1 529 ++ threadp->requests;
f53969cc 530 } /* while forever */
595c7973 531
532 CloseHandle(cond);
533
534 return 0;
f53969cc 535} /* squidaio_thread_loop */
595c7973 536
537static void
538squidaio_queue_request(squidaio_request_t * request)
539{
540 static int high_start = 0;
bf8fe701 541 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 542 /* Mark it as not executed (failing result, no error) */
543 request->ret = -1;
544 request->err = 0;
545 /* Internal housekeeping */
546 request_queue_len += 1;
547 request->resultp->_data = request;
548 /* Play some tricks with the request_queue2 queue */
549 request->next = NULL;
550
551 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
552 if (request_queue2.head) {
553 /* Grab blocked requests */
554 *request_queue.tailp = request_queue2.head;
555 request_queue.tailp = request_queue2.tailp;
556 }
557
558 /* Enqueue request */
559 *request_queue.tailp = request;
560
561 request_queue.tailp = &request->next;
562
563 if (!SetEvent(request_queue.cond))
564 fatal("Couldn't push queue");
565
566 if (!ReleaseMutex(request_queue.mutex)) {
567 /* unexpected error */
568 fatal("Couldn't push queue");
569 }
570
571 Sleep(0);
572
573 if (request_queue2.head) {
574 /* Clear queue of blocked requests */
575 request_queue2.head = NULL;
576 request_queue2.tailp = &request_queue2.head;
577 }
578 } else {
579 /* Oops, the request queue is blocked, use request_queue2 */
580 *request_queue2.tailp = request;
581 request_queue2.tailp = &request->next;
582 }
583
584 if (request_queue2.head) {
585 static int filter = 0;
586 static int filter_limit = 8;
587
588 if (++filter >= filter_limit) {
589 filter_limit += filter;
590 filter = 0;
e0236918 591 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion");
595c7973 592 }
593 }
594
595 /* Warn if out of threads */
596 if (request_queue_len > MAGIC1) {
597 static int last_warn = 0;
598 static int queue_high, queue_low;
599
600 if (high_start == 0) {
601 high_start = (int)squid_curtime;
602 queue_high = request_queue_len;
603 queue_low = request_queue_len;
604 }
605
606 if (request_queue_len > queue_high)
607 queue_high = request_queue_len;
608
609 if (request_queue_len < queue_low)
610 queue_low = request_queue_len;
611
612 if (squid_curtime >= (last_warn + 15) &&
613 squid_curtime >= (high_start + 5)) {
e0236918 614 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");
595c7973 615
616 if (squid_curtime >= (high_start + 15))
e0236918 617 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
bf8fe701 618 request_queue_len << ", high=" << queue_high <<
619 ", low=" << queue_low << ", duration=" <<
620 (long int) (squid_curtime - high_start));
595c7973 621
622 last_warn = (int)squid_curtime;
623 }
624 } else {
625 high_start = 0;
626 }
627
628 /* Warn if seriously overloaded */
629 if (request_queue_len > RIDICULOUS_LENGTH) {
fa84c01d
FC
630 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
631 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
595c7973 632 squidaio_sync();
fa84c01d 633 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
595c7973 634 }
f53969cc 635} /* squidaio_queue_request */
595c7973 636
637static void
638squidaio_cleanup_request(squidaio_request_t * requestp)
639{
640 squidaio_result_t *resultp = requestp->resultp;
641 int cancelled = requestp->cancelled;
642
643 /* Free allocated structures and copy data back to user space if the */
644 /* request hasn't been cancelled */
645
646 switch (requestp->request_type) {
647
648 case _AIO_OP_STAT:
649
650 if (!cancelled && requestp->ret == 0)
41d00cd3 651 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
595c7973 652
653 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
654
655 squidaio_xstrfree(requestp->path);
656
657 break;
658
659 case _AIO_OP_OPEN:
660 if (cancelled && requestp->ret >= 0)
661 /* The open() was cancelled but completed */
662 close(requestp->ret);
663
664 squidaio_xstrfree(requestp->path);
665
666 break;
667
668 case _AIO_OP_CLOSE:
669 if (cancelled && requestp->ret < 0)
670 /* The close() was cancelled and never got executed */
671 close(requestp->fd);
672
673 break;
674
675 case _AIO_OP_UNLINK:
676
595c7973 677 case _AIO_OP_OPENDIR:
678 squidaio_xstrfree(requestp->path);
679
680 break;
681
682 case _AIO_OP_READ:
683 break;
684
685 case _AIO_OP_WRITE:
686 break;
687
688 default:
689 break;
690 }
691
692 if (resultp != NULL && !cancelled) {
693 resultp->aio_return = requestp->ret;
694 resultp->aio_errno = requestp->err;
695 }
696
1e1a9021 697 squidaio_request_pool->freeOne(requestp);
f53969cc 698} /* squidaio_cleanup_request */
595c7973 699
595c7973 700int
701squidaio_cancel(squidaio_result_t * resultp)
702{
703 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
704
705 if (request && request->resultp == resultp) {
bf8fe701 706 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 707 request->cancelled = 1;
708 request->resultp = NULL;
709 resultp->_data = NULL;
710 resultp->result_type = _AIO_OP_NONE;
711 return 0;
712 }
713
714 return 1;
f53969cc 715} /* squidaio_cancel */
595c7973 716
595c7973 717int
718squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
719{
720 squidaio_init();
721 squidaio_request_t *requestp;
722
723 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
724
725 requestp->path = (char *) squidaio_xstrdup(path);
726
727 requestp->oflag = oflag;
728
729 requestp->mode = mode;
730
731 requestp->resultp = resultp;
732
733 requestp->request_type = _AIO_OP_OPEN;
734
735 requestp->cancelled = 0;
736
737 resultp->result_type = _AIO_OP_OPEN;
738
739 squidaio_queue_request(requestp);
740
741 return 0;
742}
743
595c7973 744static void
745squidaio_do_open(squidaio_request_t * requestp)
746{
747 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
748 requestp->err = errno;
749}
750
595c7973 751int
ee139403 752squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
595c7973 753{
754 squidaio_request_t *requestp;
755
756 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
757
758 requestp->fd = fd;
759
760 requestp->bufferp = bufp;
761
762 requestp->buflen = bufs;
763
764 requestp->offset = offset;
765
766 requestp->whence = whence;
767
768 requestp->resultp = resultp;
769
770 requestp->request_type = _AIO_OP_READ;
771
772 requestp->cancelled = 0;
773
774 resultp->result_type = _AIO_OP_READ;
775
776 squidaio_queue_request(requestp);
777
778 return 0;
779}
780
595c7973 781static void
782squidaio_do_read(squidaio_request_t * requestp)
783{
784 lseek(requestp->fd, requestp->offset, requestp->whence);
785
786 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
787 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
788 WIN32_maperror(GetLastError());
789 requestp->ret = -1;
790 }
791
792 requestp->err = errno;
793}
794
595c7973 795int
ee139403 796squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
595c7973 797{
798 squidaio_request_t *requestp;
799
800 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
801
802 requestp->fd = fd;
803
804 requestp->bufferp = bufp;
805
806 requestp->buflen = bufs;
807
808 requestp->offset = offset;
809
810 requestp->whence = whence;
811
812 requestp->resultp = resultp;
813
814 requestp->request_type = _AIO_OP_WRITE;
815
816 requestp->cancelled = 0;
817
818 resultp->result_type = _AIO_OP_WRITE;
819
820 squidaio_queue_request(requestp);
821
822 return 0;
823}
824
595c7973 825static void
826squidaio_do_write(squidaio_request_t * requestp)
827{
828 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
829 requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
830 WIN32_maperror(GetLastError());
831 requestp->ret = -1;
832 }
833
834 requestp->err = errno;
835}
836
595c7973 837int
838squidaio_close(int fd, squidaio_result_t * resultp)
839{
840 squidaio_request_t *requestp;
841
842 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
843
844 requestp->fd = fd;
845
846 requestp->resultp = resultp;
847
848 requestp->request_type = _AIO_OP_CLOSE;
849
850 requestp->cancelled = 0;
851
852 resultp->result_type = _AIO_OP_CLOSE;
853
854 squidaio_queue_request(requestp);
855
856 return 0;
857}
858
595c7973 859static void
860squidaio_do_close(squidaio_request_t * requestp)
861{
26ac0430 862 if ((requestp->ret = close(requestp->fd)) < 0) {
fa84c01d 863 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
595c7973 864 close(requestp->fd);
865 }
866
867 requestp->err = errno;
868}
869
595c7973 870int
871
872squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
873{
874 squidaio_init();
875 squidaio_request_t *requestp;
876
877 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
878
879 requestp->path = (char *) squidaio_xstrdup(path);
880
881 requestp->statp = sb;
882
883 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
884
885 requestp->resultp = resultp;
886
887 requestp->request_type = _AIO_OP_STAT;
888
889 requestp->cancelled = 0;
890
891 resultp->result_type = _AIO_OP_STAT;
892
893 squidaio_queue_request(requestp);
894
895 return 0;
896}
897
595c7973 898static void
899squidaio_do_stat(squidaio_request_t * requestp)
900{
901 requestp->ret = stat(requestp->path, requestp->tmpstatp);
902 requestp->err = errno;
903}
904
595c7973 905int
906squidaio_unlink(const char *path, squidaio_result_t * resultp)
907{
908 squidaio_init();
909 squidaio_request_t *requestp;
910
911 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
912
913 requestp->path = squidaio_xstrdup(path);
914
915 requestp->resultp = resultp;
916
917 requestp->request_type = _AIO_OP_UNLINK;
918
919 requestp->cancelled = 0;
920
921 resultp->result_type = _AIO_OP_UNLINK;
922
923 squidaio_queue_request(requestp);
924
925 return 0;
926}
927
595c7973 928static void
929squidaio_do_unlink(squidaio_request_t * requestp)
930{
931 requestp->ret = unlink(requestp->path);
932 requestp->err = errno;
933}
934
595c7973 935#if AIO_OPENDIR
936/* XXX squidaio_opendir NOT implemented yet.. */
937
938int
939squidaio_opendir(const char *path, squidaio_result_t * resultp)
940{
941 squidaio_request_t *requestp;
942 int len;
943
944 requestp = squidaio_request_pool->alloc();
945
946 resultp->result_type = _AIO_OP_OPENDIR;
947
948 return -1;
949}
950
951static void
952squidaio_do_opendir(squidaio_request_t * requestp)
953{
954 /* NOT IMPLEMENTED */
955}
956
957#endif
958
959static void
960squidaio_poll_queues(void)
961{
962 /* kick "overflow" request queue */
963
964 if (request_queue2.head &&
965 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
966 *request_queue.tailp = request_queue2.head;
967 request_queue.tailp = request_queue2.tailp;
968
969 if (!SetEvent(request_queue.cond))
970 fatal("couldn't push queue\n");
971
972 if (!ReleaseMutex(request_queue.mutex)) {
973 /* unexpected error */
974 }
975
976 Sleep(0);
977 request_queue2.head = NULL;
978 request_queue2.tailp = &request_queue2.head;
979 }
980
981 /* poll done queue */
982 if (done_queue.head &&
983 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
984
985 struct squidaio_request_t *requests = done_queue.head;
986 done_queue.head = NULL;
987 done_queue.tailp = &done_queue.head;
988
989 if (!ReleaseMutex(done_queue.mutex)) {
990 /* unexpected error */
991 }
992
993 Sleep(0);
994 *done_requests.tailp = requests;
995 request_queue_len -= 1;
996
997 while (requests->next) {
998 requests = requests->next;
999 request_queue_len -= 1;
1000 }
1001
1002 done_requests.tailp = &requests->next;
1003 }
1004}
1005
1006squidaio_result_t *
1007squidaio_poll_done(void)
1008{
1009 squidaio_request_t *request;
1010 squidaio_result_t *resultp;
1011 int cancelled;
1012 int polled = 0;
1013
1014AIO_REPOLL:
1015 request = done_requests.head;
1016
1017 if (request == NULL && !polled) {
1018 CommIO::ResetNotifications();
1019 squidaio_poll_queues();
1020 polled = 1;
1021 request = done_requests.head;
1022 }
1023
1024 if (!request) {
1025 return NULL;
1026 }
1027
bf8fe701 1028 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 1029 done_requests.head = request->next;
1030
1031 if (!done_requests.head)
1032 done_requests.tailp = &done_requests.head;
1033
1034 resultp = request->resultp;
1035
1036 cancelled = request->cancelled;
1037
1038 squidaio_debug(request);
1039
bf8fe701 1040 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
595c7973 1041
1042 squidaio_cleanup_request(request);
1043
1044 if (cancelled)
1045 goto AIO_REPOLL;
1046
1047 return resultp;
f53969cc 1048} /* squidaio_poll_done */
595c7973 1049
1050int
1051squidaio_operations_pending(void)
1052{
1053 return request_queue_len + (done_requests.head ? 1 : 0);
1054}
1055
1056int
1057squidaio_sync(void)
1058{
1059 /* XXX This might take a while if the queue is large.. */
1060
1061 do {
1062 squidaio_poll_queues();
1063 } while (request_queue_len > 0);
1064
1065 return squidaio_operations_pending();
1066}
1067
1068int
1069squidaio_get_queue_len(void)
1070{
1071 return request_queue_len;
1072}
1073
1074static void
1075squidaio_debug(squidaio_request_t * request)
1076{
1077 switch (request->request_type) {
1078
1079 case _AIO_OP_OPEN:
bf8fe701 1080 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
595c7973 1081 break;
1082
1083 case _AIO_OP_READ:
bf8fe701 1084 debugs(43, 5, "READ on fd: " << request->fd);
595c7973 1085 break;
1086
1087 case _AIO_OP_WRITE:
bf8fe701 1088 debugs(43, 5, "WRITE on fd: " << request->fd);
595c7973 1089 break;
1090
1091 case _AIO_OP_CLOSE:
bf8fe701 1092 debugs(43, 5, "CLOSE of fd: " << request->fd);
595c7973 1093 break;
1094
1095 case _AIO_OP_UNLINK:
bf8fe701 1096 debugs(43, 5, "UNLINK of " << request->path);
595c7973 1097 break;
1098
595c7973 1099 default:
1100 break;
1101 }
1102}
1103
1104void
1105squidaio_stats(StoreEntry * sentry)
1106{
1107 squidaio_thread_t *threadp;
1108 int i;
1109
1110 if (!squidaio_initialised)
1111 return;
1112
1113 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1114
1115 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1116
1117 threadp = threads;
1118
cb4185f1 1119 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 1120 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1121 threadp = threadp->next;
1122 }
1123}
f53969cc 1124