/*
 * Copyright (C) 1996-2014 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */

/* DEBUG: section 43    Windows AIOPS */

#include "squid.h"
#include "DiskIO/DiskThreads/CommIO.h"
#include "DiskThreads.h"
#include "fd.h"
#include "SquidConfig.h"
#include "SquidTime.h"
#include "Store.h"

#include <cerrno>
#include <csignal>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>

#define RIDICULOUS_LENGTH 4096

enum _squidaio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};
typedef enum _squidaio_thread_status squidaio_thread_status;

typedef struct squidaio_request_t {

    struct squidaio_request_t *next;
    squidaio_request_type request_type;
    int cancelled;
    char *path;
    int oflag;
    mode_t mode;
    int fd;
    char *bufferp;
    char *tmpbufp;
    size_t buflen;
    off_t offset;
    int whence;
    int ret;
    int err;

    struct stat *tmpstatp;

    struct stat *statp;
    squidaio_result_t *resultp;
} squidaio_request_t;

typedef struct squidaio_request_queue_t {
    HANDLE mutex;
    HANDLE cond;            /* See Event objects */
    squidaio_request_t *volatile head;
    squidaio_request_t *volatile *volatile tailp;
    unsigned long requests;
    unsigned long blocked;  /* main failed to lock the queue */
} squidaio_request_queue_t;

typedef struct squidaio_thread_t squidaio_thread_t;

struct squidaio_thread_t {
    squidaio_thread_t *next;
    HANDLE thread;
    DWORD dwThreadId;       /* thread ID */
    squidaio_thread_status status;

    struct squidaio_request_t *current_req;
    unsigned long requests;
    int volatile exit;
};

static void squidaio_queue_request(squidaio_request_t *);
static void squidaio_cleanup_request(squidaio_request_t *);
static DWORD WINAPI squidaio_thread_loop(LPVOID lpParam);
static void squidaio_do_open(squidaio_request_t *);
static void squidaio_do_read(squidaio_request_t *);
static void squidaio_do_write(squidaio_request_t *);
static void squidaio_do_close(squidaio_request_t *);
static void squidaio_do_stat(squidaio_request_t *);
static void squidaio_do_unlink(squidaio_request_t *);
#if AIO_OPENDIR
static void *squidaio_do_opendir(squidaio_request_t *);
#endif
static void squidaio_debug(squidaio_request_t *);
static void squidaio_poll_queues(void);

static squidaio_thread_t *threads = NULL;
static int squidaio_initialised = 0;

#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128

static MemAllocator *squidaio_large_bufs = NULL;    /* 16K */
static MemAllocator *squidaio_medium_bufs = NULL;   /* 8K */
static MemAllocator *squidaio_small_bufs = NULL;    /* 4K */
static MemAllocator *squidaio_tiny_bufs = NULL;     /* 2K */
static MemAllocator *squidaio_micro_bufs = NULL;    /* 128 bytes */

static int request_queue_len = 0;
static MemAllocator *squidaio_request_pool = NULL;
static MemAllocator *squidaio_thread_pool = NULL;
static squidaio_request_queue_t request_queue;

static struct {
    squidaio_request_t *head, **tailp;
} request_queue2 = {
    NULL, &request_queue2.head
};

static squidaio_request_queue_t done_queue;

static struct {
    squidaio_request_t *head, **tailp;
} done_requests = {
    NULL, &done_requests.head
};

static HANDLE main_thread;

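/* Map an allocation size onto the smallest fixed-size buffer pool that can
 * hold it, or return NULL so the caller falls back to the plain heap. */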
static MemAllocator *
squidaio_get_pool(int size)
{
    if (size <= AIO_LARGE_BUFS) {
        if (size <= AIO_MICRO_BUFS)
            return squidaio_micro_bufs;
        else if (size <= AIO_TINY_BUFS)
            return squidaio_tiny_bufs;
        else if (size <= AIO_SMALL_BUFS)
            return squidaio_small_bufs;
        else if (size <= AIO_MEDIUM_BUFS)
            return squidaio_medium_bufs;
        else
            return squidaio_large_bufs;
    }

    return NULL;
}

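/* Allocate a buffer from the matching pool, or from the heap when the size
 * exceeds AIO_LARGE_BUFS. Buffers must be released with squidaio_xfree(). */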
void *
squidaio_xmalloc(int size)
{
    void *p;
    MemAllocator *pool;

    if ((pool = squidaio_get_pool(size)) != NULL) {
        p = pool->alloc();
    } else
        p = xmalloc(size);

    return p;
}

static char *
squidaio_xstrdup(const char *str)
{
    char *p;
    int len = strlen(str) + 1;

    p = (char *)squidaio_xmalloc(len);
    strncpy(p, str, len);

    return p;
}

void
squidaio_xfree(void *p, int size)
{
    MemAllocator *pool;

    if ((pool = squidaio_get_pool(size)) != NULL) {
        pool->freeOne(p);
    } else
        xfree(p);
}

static void
squidaio_xstrfree(char *str)
{
    MemAllocator *pool;
    int len = strlen(str) + 1;

    if ((pool = squidaio_get_pool(len)) != NULL) {
        pool->freeOne(str);
    } else
        xfree(str);
}

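/* One-time initialisation: duplicate a real handle to the main thread, create
 * the request and done queues with their Win32 mutex/event pairs, start
 * NUMTHREADS worker threads and create the memory pools. */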
void
squidaio_init(void)
{
    int i;
    squidaio_thread_t *threadp;

    if (squidaio_initialised)
        return;

    if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
                         GetCurrentThread(),  /* pseudo handle to copy */
                         GetCurrentProcess(), /* pseudo handle, don't close */
                         &main_thread,
                         0,                   /* required access */
                         FALSE,               /* child processes do not inherit the handle */
                         DUPLICATE_SAME_ACCESS)) {
        /* report errors */
        fatal("Couldn't get current thread handle");
    }

    /* Initialize request queue */
    if ((request_queue.mutex = CreateMutex(NULL,  /* no inheritance */
                                           FALSE, /* start unowned (as per mutex_init) */
                                           NULL)  /* no name */
        ) == NULL) {
        fatal("Failed to create mutex");
    }

    if ((request_queue.cond = CreateEvent(NULL,  /* no inheritance */
                                          FALSE, /* auto-reset - roughly pthreads-like */
                                          FALSE, /* start non-signaled */
                                          NULL)  /* no name */
        ) == NULL) {
        fatal("Failed to create condition variable");
    }

    request_queue.head = NULL;

    request_queue.tailp = &request_queue.head;

    request_queue.requests = 0;

    request_queue.blocked = 0;

    /* Initialize done queue */

    if ((done_queue.mutex = CreateMutex(NULL,  /* no inheritance */
                                        FALSE, /* start unowned (as per mutex_init) */
                                        NULL)  /* no name */
        ) == NULL) {
        fatal("Failed to create mutex");
    }

    if ((done_queue.cond = CreateEvent(NULL,  /* no inheritance */
                                       TRUE,  /* manual reset - roughly pthreads-like */
                                       FALSE, /* start non-signaled */
                                       NULL)  /* no name */
        ) == NULL) {
        fatal("Failed to create condition variable");
    }

    done_queue.head = NULL;

    done_queue.tailp = &done_queue.head;

    done_queue.requests = 0;

    done_queue.blocked = 0;

    // Initialize the thread I/O pipes before creating any threads
    // see bug 3189 comment 5 about race conditions.
    CommIO::Initialize();

    /* Create threads and get them to sit in their wait loop */
    squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));

    assert(NUMTHREADS);

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;

        if ((threadp->thread = CreateThread(NULL,                   /* no security attributes */
                                            0,                      /* use default stack size */
                                            squidaio_thread_loop,   /* thread function */
                                            threadp,                /* argument to thread function */
                                            0,                      /* use default creation flags */
                                            &(threadp->dwThreadId)) /* returns the thread identifier */
            ) == NULL) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }

        /* Set the new thread's priority above that of the parent process */
        SetThreadPriority(threadp->thread, THREAD_PRIORITY_ABOVE_NORMAL);
    }

    /* Create request pool */
    squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));

    squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);

    squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);

    squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);

    squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);

    squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);

    squidaio_initialised = 1;
}

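/* Drain any outstanding requests, ask every worker thread to exit, wait up to
 * two seconds for them to terminate and release all handles. */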
void
squidaio_shutdown(void)
{
    squidaio_thread_t *threadp;
    int i;
    HANDLE *hthreads;

    if (!squidaio_initialised)
        return;

    /* This is the same as in squidaio_sync */
    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    hthreads = (HANDLE *) xcalloc(NUMTHREADS, sizeof(HANDLE));

    threadp = threads;

    for (i = 0; i < NUMTHREADS; ++i) {
        threadp->exit = 1;
        hthreads[i] = threadp->thread;
        threadp = threadp->next;
    }

    ReleaseMutex(request_queue.mutex);
    ResetEvent(request_queue.cond);
    ReleaseMutex(done_queue.mutex);
    ResetEvent(done_queue.cond);
    Sleep(0);

    WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);

    for (i = 0; i < NUMTHREADS; ++i) {
        CloseHandle(hthreads[i]);
    }

    CloseHandle(main_thread);
    CommIO::NotifyIOClose();

    squidaio_initialised = 0;
    xfree(hthreads);
}

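/* Worker thread body: wait for a request on request_queue, execute it, then
 * append the finished request to done_queue and notify the main thread. */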
static DWORD WINAPI
squidaio_thread_loop(LPVOID lpParam)
{
    squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
    squidaio_request_t *request;
    HANDLE cond;    /* local copy of the event handle, because win32 event handles
                     * don't atomically release the mutex as cond variables do. */

    /* lock the thread info */

    if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
        fatal("Can't get ownership of mutex\n");
    }

    /* duplicate the handle */
    if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
                         request_queue.cond,  /* handle to copy */
                         GetCurrentProcess(), /* pseudo handle, don't close */
                         &cond,
                         0,                   /* required access */
                         FALSE,               /* child processes do not inherit the handle */
                         DUPLICATE_SAME_ACCESS))
        fatal("Can't duplicate event handle\n");

    if (!ReleaseMutex(request_queue.mutex)) {
        CloseHandle(cond);
        fatal("Can't release mutex\n");
    }

    Sleep(0);

    while (1) {
        DWORD rv;
        threadp->current_req = request = NULL;
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;

        if (threadp->exit) {
            CloseHandle(request_queue.mutex);
            CloseHandle(cond);
            return 0;
        }

        rv = WaitForSingleObject(request_queue.mutex, INFINITE);

        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }

        while (!request_queue.head) {
            if (!ReleaseMutex(request_queue.mutex)) {
                CloseHandle(cond);
                threadp->status = _THREAD_FAILED;
                return 1;
            }

            Sleep(0);
            rv = WaitForSingleObject(cond, INFINITE);

            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }

            rv = WaitForSingleObject(request_queue.mutex, INFINITE);

            if (rv == WAIT_FAILED) {
                CloseHandle(cond);
                return 1;
            }
        }

        request = request_queue.head;

        if (request)
            request_queue.head = request->next;

        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;

        if (!ReleaseMutex(request_queue.mutex)) {
            CloseHandle(cond);
            return 1;
        }

        Sleep(0);

        /* process the request */
        threadp->status = _THREAD_BUSY;

        request->next = NULL;

        threadp->current_req = request;

        errno = 0;

        if (!request->cancelled) {
            switch (request->request_type) {

            case _AIO_OP_OPEN:
                squidaio_do_open(request);
                break;

            case _AIO_OP_READ:
                squidaio_do_read(request);
                break;

            case _AIO_OP_WRITE:
                squidaio_do_write(request);
                break;

            case _AIO_OP_CLOSE:
                squidaio_do_close(request);
                break;

            case _AIO_OP_UNLINK:
                squidaio_do_unlink(request);
                break;

#if AIO_OPENDIR         /* Opendir not implemented yet */

            case _AIO_OP_OPENDIR:
                squidaio_do_opendir(request);
                break;
#endif

            case _AIO_OP_STAT:
                squidaio_do_stat(request);
                break;

            default:
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {    /* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }

        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        rv = WaitForSingleObject(done_queue.mutex, INFINITE);

        if (rv == WAIT_FAILED) {
            CloseHandle(cond);
            return 1;
        }

        *done_queue.tailp = request;
        done_queue.tailp = &request->next;

        if (!ReleaseMutex(done_queue.mutex)) {
            CloseHandle(cond);
            return 1;
        }

        CommIO::NotifyIOCompleted();
        Sleep(0);
        ++ threadp->requests;
    } /* while forever */

    CloseHandle(cond);

    return 0;
} /* squidaio_thread_loop */

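/* Hand a request to the worker threads. If the queue mutex cannot be taken
 * without blocking, park the request on request_queue2 instead; warn when the
 * queue keeps growing and force a blocking sync when it becomes ridiculous. */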
static void
squidaio_queue_request(squidaio_request_t * request)
{
    static int high_start = 0;
    debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Play some tricks with the request_queue2 queue */
    request->next = NULL;

    if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
        if (request_queue2.head) {
            /* Grab blocked requests */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = request_queue2.tailp;
        }

        /* Enqueue request */
        *request_queue.tailp = request;

        request_queue.tailp = &request->next;

        if (!SetEvent(request_queue.cond))
            fatal("Couldn't push queue");

        if (!ReleaseMutex(request_queue.mutex)) {
            /* unexpected error */
            fatal("Couldn't push queue");
        }

        Sleep(0);

        if (request_queue2.head) {
            /* Clear queue of blocked requests */
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        }
    } else {
        /* Oops, the request queue is blocked, use request_queue2 */
        *request_queue2.tailp = request;
        request_queue2.tailp = &request->next;
    }

    if (request_queue2.head) {
        static int filter = 0;
        static int filter_limit = 8;

        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Queue congestion");
        }
    }

    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;

        if (high_start == 0) {
            high_start = (int)squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }

        if (request_queue_len > queue_high)
            queue_high = request_queue_len;

        if (request_queue_len < queue_low)
            queue_low = request_queue_len;

        if (squid_curtime >= (last_warn + 15) &&
                squid_curtime >= (high_start + 5)) {
            debugs(43, DBG_IMPORTANT, "squidaio_queue_request: WARNING - Disk I/O overloading");

            if (squid_curtime >= (high_start + 15))
                debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
                       request_queue_len << ", high=" << queue_high <<
                       ", low=" << queue_low << ", duration=" <<
                       (long int) (squid_curtime - high_start));

            last_warn = (int)squid_curtime;
        }
    } else {
        high_start = 0;
    }

    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
        squidaio_sync();
        debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
    }
} /* squidaio_queue_request */

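/* Copy results back to the caller (unless the request was cancelled), release
 * temporary buffers and strings, and return the request to its pool. */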
static void
squidaio_cleanup_request(squidaio_request_t * requestp)
{
    squidaio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space
     * if the request hasn't been cancelled */

    switch (requestp->request_type) {

    case _AIO_OP_STAT:

        if (!cancelled && requestp->ret == 0)
            memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));

        squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);

        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);

        break;

    case _AIO_OP_UNLINK:

    case _AIO_OP_OPENDIR:
        squidaio_xstrfree(requestp->path);

        break;

    case _AIO_OP_READ:
        break;

    case _AIO_OP_WRITE:
        break;

    default:
        break;
    }

    if (resultp != NULL && !cancelled) {
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }

    squidaio_request_pool->freeOne(requestp);
} /* squidaio_cleanup_request */

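/* Detach a pending request from its result. The request still runs in a
 * worker thread, but its outcome is discarded. Returns 0 on success. */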
int
squidaio_cancel(squidaio_result_t * resultp)
{
    squidaio_request_t *request = (squidaio_request_t *)resultp->_data;

    if (request && request->resultp == resultp) {
        debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
        request->cancelled = 1;
        request->resultp = NULL;
        resultp->_data = NULL;
        resultp->result_type = _AIO_OP_NONE;
        return 0;
    }

    return 1;
} /* squidaio_cancel */

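/* The public entry points below each allocate a request, fill in the
 * operation-specific fields and queue it; the matching squidaio_do_*()
 * helper performs the blocking system call inside a worker thread. */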
int
squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = (char *) squidaio_xstrdup(path);

    requestp->oflag = oflag;

    requestp->mode = mode;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_OPEN;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_OPEN;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_open(squidaio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}

int
squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;

    requestp->bufferp = bufp;

    requestp->buflen = bufs;

    requestp->offset = offset;

    requestp->whence = whence;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_READ;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_READ;

    squidaio_queue_request(requestp);

    return 0;
}

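/* Position the CRT descriptor with lseek(), then read through the underlying
 * Win32 handle; on failure WIN32_maperror() translates the Win32 error code
 * so the errno saved below is meaningful. */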
static void
squidaio_do_read(squidaio_request_t * requestp)
{
    lseek(requestp->fd, requestp->offset, requestp->whence);

    if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
                  requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
        WIN32_maperror(GetLastError());
        requestp->ret = -1;
    }

    requestp->err = errno;
}

int
squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;

    requestp->bufferp = bufp;

    requestp->buflen = bufs;

    requestp->offset = offset;

    requestp->whence = whence;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_WRITE;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_WRITE;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_write(squidaio_request_t * requestp)
{
    if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
                   requestp->buflen, (LPDWORD)&requestp->ret, NULL)) {
        WIN32_maperror(GetLastError());
        requestp->ret = -1;
    }

    requestp->err = errno;
}

int
squidaio_close(int fd, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->fd = fd;

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_CLOSE;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_CLOSE;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_close(squidaio_request_t * requestp)
{
    if ((requestp->ret = close(requestp->fd)) < 0) {
        debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
        close(requestp->fd);
    }

    requestp->err = errno;
}

int
squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = (char *) squidaio_xstrdup(path);

    requestp->statp = sb;

    requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_STAT;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_STAT;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_stat(squidaio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}

int
squidaio_unlink(const char *path, squidaio_result_t * resultp)
{
    squidaio_init();
    squidaio_request_t *requestp;

    requestp = (squidaio_request_t *)squidaio_request_pool->alloc();

    requestp->path = squidaio_xstrdup(path);

    requestp->resultp = resultp;

    requestp->request_type = _AIO_OP_UNLINK;

    requestp->cancelled = 0;

    resultp->result_type = _AIO_OP_UNLINK;

    squidaio_queue_request(requestp);

    return 0;
}

static void
squidaio_do_unlink(squidaio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}

#if AIO_OPENDIR
/* XXX squidaio_opendir NOT implemented yet.. */

int
squidaio_opendir(const char *path, squidaio_result_t * resultp)
{
    squidaio_request_t *requestp;
    int len;

    requestp = squidaio_request_pool->alloc();

    resultp->result_type = _AIO_OP_OPENDIR;

    return -1;
}

static void
squidaio_do_opendir(squidaio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif

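/* Flush the overflow queue into the worker queue when possible, and move any
 * finished requests from done_queue onto the private done_requests list. */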
static void
squidaio_poll_queues(void)
{
    /* kick "overflow" request queue */

    if (request_queue2.head &&
            (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0)) {
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;

        if (!SetEvent(request_queue.cond))
            fatal("couldn't push queue\n");

        if (!ReleaseMutex(request_queue.mutex)) {
            /* unexpected error */
        }

        Sleep(0);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }

    /* poll done queue */
    if (done_queue.head &&
            (WaitForSingleObject(done_queue.mutex, 0) == WAIT_OBJECT_0)) {

        struct squidaio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;

        if (!ReleaseMutex(done_queue.mutex)) {
            /* unexpected error */
        }

        Sleep(0);
        *done_requests.tailp = requests;
        request_queue_len -= 1;

        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }

        done_requests.tailp = &requests->next;
    }
}

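/* Pop the next completed request and return its result to the caller, or NULL
 * when nothing has finished; cancelled requests are cleaned up and skipped. */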
squidaio_result_t *
squidaio_poll_done(void)
{
    squidaio_request_t *request;
    squidaio_result_t *resultp;
    int cancelled;
    int polled = 0;

AIO_REPOLL:
    request = done_requests.head;

    if (request == NULL && !polled) {
        CommIO::ResetNotifications();
        squidaio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }

    if (!request) {
        return NULL;
    }

    debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
    done_requests.head = request->next;

    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;

    resultp = request->resultp;

    cancelled = request->cancelled;

    squidaio_debug(request);

    debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);

    squidaio_cleanup_request(request);

    if (cancelled)
        goto AIO_REPOLL;

    return resultp;
} /* squidaio_poll_done */

int
squidaio_operations_pending(void)
{
    return request_queue_len + (done_requests.head ? 1 : 0);
}

int
squidaio_sync(void)
{
    /* XXX This might take a while if the queue is large.. */

    do {
        squidaio_poll_queues();
    } while (request_queue_len > 0);

    return squidaio_operations_pending();
}

int
squidaio_get_queue_len(void)
{
    return request_queue_len;
}

static void
squidaio_debug(squidaio_request_t * request)
{
    switch (request->request_type) {

    case _AIO_OP_OPEN:
        debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
        break;

    case _AIO_OP_READ:
        debugs(43, 5, "READ on fd: " << request->fd);
        break;

    case _AIO_OP_WRITE:
        debugs(43, 5, "WRITE on fd: " << request->fd);
        break;

    case _AIO_OP_CLOSE:
        debugs(43, 5, "CLOSE of fd: " << request->fd);
        break;

    case _AIO_OP_UNLINK:
        debugs(43, 5, "UNLINK of " << request->path);
        break;

    default:
        break;
    }
}

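/* Append the per-thread request counters to a cache manager report. */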
void
squidaio_stats(StoreEntry * sentry)
{
    squidaio_thread_t *threadp;
    int i;

    if (!squidaio_initialised)
        return;

    storeAppendPrintf(sentry, "\n\nThreads Status:\n");

    storeAppendPrintf(sentry, "#\tID\t# Requests\n");

    threadp = threads;

    for (i = 0; i < NUMTHREADS; ++i) {
        storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
        threadp = threadp->next;
    }
}