]> git.ipfire.org Git - thirdparty/squid.git/blame - src/DiskIO/DiskThreads/aiops_win32.cc
Maintenance: replace most NULL with nullptr (#1402)
[thirdparty/squid.git] / src / DiskIO / DiskThreads / aiops_win32.cc
CommitLineData
/*
 * Copyright (C) 1996-2023 The Squid Software Foundation and contributors
 *
 * Squid software is distributed under GPLv2+ license and includes
 * contributions from numerous individuals and organizations.
 * Please see the COPYING and CONTRIBUTORS files for details.
 */
8
/* DEBUG: section 43 Windows AIOPS */
10
582c2af2 11#include "squid.h"
1ff991dc 12#include "DiskIO/DiskThreads/CommIO.h"
595c7973 13#include "DiskThreads.h"
1e1a9021 14#include "fd.h"
bb1373a6 15#include "mem/Pool.h"
4d5904f7 16#include "SquidConfig.h"
489520a9 17#include "Store.h"
595c7973 18
1a30fdf5 19#include <cerrno>
074d6a40 20#include <csignal>
4d5904f7
FC
21#include <sys/stat.h>
22#include <fcntl.h>
4d5904f7 23#include <dirent.h>
595c7973 24
f53969cc 25#define RIDICULOUS_LENGTH 4096
595c7973 26
/*
 * Progress marker of a worker thread, kept in squidaio_thread_t::status.
 * The visible code only ever writes it; it is never read back.
 */
typedef enum _squidaio_thread_status {
    _THREAD_STARTING = 0, /* set by squidaio_init() before the thread is created */
    _THREAD_WAITING,      /* set at the top of each worker loop iteration */
    _THREAD_BUSY,         /* a request has been dequeued and is being executed */
    _THREAD_FAILED,       /* thread creation or a queue operation failed */
    _THREAD_DONE          /* request executed, about to be put on the done queue */
} squidaio_thread_status;
35
26ac0430 36typedef struct squidaio_request_t {
595c7973 37
38 struct squidaio_request_t *next;
39 squidaio_request_type request_type;
40 int cancelled;
41 char *path;
42 int oflag;
43 mode_t mode;
44 int fd;
45 char *bufferp;
46 char *tmpbufp;
ee139403 47 size_t buflen;
595c7973 48 off_t offset;
49 int whence;
50 int ret;
51 int err;
52
53 struct stat *tmpstatp;
54
55 struct stat *statp;
56 squidaio_result_t *resultp;
2fadd50d 57} squidaio_request_t;
595c7973 58
26ac0430 59typedef struct squidaio_request_queue_t {
595c7973 60 HANDLE mutex;
61 HANDLE cond; /* See Event objects */
62 squidaio_request_t *volatile head;
63 squidaio_request_t *volatile *volatile tailp;
64 unsigned long requests;
f53969cc 65 unsigned long blocked; /* main failed to lock the queue */
2fadd50d 66} squidaio_request_queue_t;
595c7973 67
68typedef struct squidaio_thread_t squidaio_thread_t;
69
26ac0430 70struct squidaio_thread_t {
595c7973 71 squidaio_thread_t *next;
72 HANDLE thread;
73 DWORD dwThreadId; /* thread ID */
74 squidaio_thread_status status;
75
76 struct squidaio_request_t *current_req;
77 unsigned long requests;
78 int volatile exit;
79};
80
81static void squidaio_queue_request(squidaio_request_t *);
82static void squidaio_cleanup_request(squidaio_request_t *);
83static DWORD WINAPI squidaio_thread_loop( LPVOID lpParam );
84static void squidaio_do_open(squidaio_request_t *);
85static void squidaio_do_read(squidaio_request_t *);
86static void squidaio_do_write(squidaio_request_t *);
87static void squidaio_do_close(squidaio_request_t *);
88static void squidaio_do_stat(squidaio_request_t *);
595c7973 89static void squidaio_do_unlink(squidaio_request_t *);
595c7973 90#if AIO_OPENDIR
91static void *squidaio_do_opendir(squidaio_request_t *);
92#endif
93static void squidaio_debug(squidaio_request_t *);
94static void squidaio_poll_queues(void);
95
a1b1756c 96static squidaio_thread_t *threads = nullptr;
595c7973 97static int squidaio_initialised = 0;
98
595c7973 99#define AIO_LARGE_BUFS 16384
f53969cc
SM
100#define AIO_MEDIUM_BUFS AIO_LARGE_BUFS >> 1
101#define AIO_SMALL_BUFS AIO_LARGE_BUFS >> 2
102#define AIO_TINY_BUFS AIO_LARGE_BUFS >> 3
103#define AIO_MICRO_BUFS 128
595c7973 104
341876ec
AJ
105static Mem::Allocator *squidaio_large_bufs = nullptr; /* 16K */
106static Mem::Allocator *squidaio_medium_bufs = nullptr; /* 8K */
107static Mem::Allocator *squidaio_small_bufs = nullptr; /* 4K */
108static Mem::Allocator *squidaio_tiny_bufs = nullptr; /* 2K */
109static Mem::Allocator *squidaio_micro_bufs = nullptr; /* 128K */
595c7973 110
111static int request_queue_len = 0;
341876ec
AJ
112static Mem::Allocator *squidaio_request_pool = nullptr;
113static Mem::Allocator *squidaio_thread_pool = nullptr;
595c7973 114static squidaio_request_queue_t request_queue;
115
26ac0430 116static struct {
595c7973 117 squidaio_request_t *head, **tailp;
118}
119
120request_queue2 = {
121
a1b1756c 122 nullptr, &request_queue2.head
26ac0430 123};
595c7973 124static squidaio_request_queue_t done_queue;
125
26ac0430 126static struct {
595c7973 127 squidaio_request_t *head, **tailp;
128}
129
130done_requests = {
131
a1b1756c 132 nullptr, &done_requests.head
26ac0430 133};
595c7973 134
135static HANDLE main_thread;
136
341876ec 137static Mem::Allocator *
595c7973 138squidaio_get_pool(int size)
139{
140 if (size <= AIO_LARGE_BUFS) {
141 if (size <= AIO_MICRO_BUFS)
142 return squidaio_micro_bufs;
143 else if (size <= AIO_TINY_BUFS)
144 return squidaio_tiny_bufs;
145 else if (size <= AIO_SMALL_BUFS)
146 return squidaio_small_bufs;
147 else if (size <= AIO_MEDIUM_BUFS)
148 return squidaio_medium_bufs;
149 else
150 return squidaio_large_bufs;
151 }
152
a1b1756c 153 return nullptr;
595c7973 154}
155
156void *
157squidaio_xmalloc(int size)
158{
159 void *p;
341876ec 160 if (const auto pool = squidaio_get_pool(size)) {
595c7973 161 p = pool->alloc();
162 } else
163 p = xmalloc(size);
164
165 return p;
166}
167
168static char *
169squidaio_xstrdup(const char *str)
170{
171 char *p;
172 int len = strlen(str) + 1;
173
174 p = (char *)squidaio_xmalloc(len);
175 strncpy(p, str, len);
176
177 return p;
178}
179
180void
181squidaio_xfree(void *p, int size)
182{
341876ec 183 if (const auto pool = squidaio_get_pool(size)) {
1e1a9021 184 pool->freeOne(p);
595c7973 185 } else
186 xfree(p);
187}
188
189static void
190squidaio_xstrfree(char *str)
191{
595c7973 192 int len = strlen(str) + 1;
193
341876ec 194 if (const auto pool = squidaio_get_pool(len)) {
1e1a9021 195 pool->freeOne(str);
595c7973 196 } else
197 xfree(str);
198}
199
200void
201squidaio_init(void)
202{
203 int i;
204 squidaio_thread_t *threadp;
205
206 if (squidaio_initialised)
207 return;
208
209 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
210 GetCurrentThread(), /* pseudo handle to copy */
211 GetCurrentProcess(), /* pseudo handle, don't close */
212 &main_thread,
213 0, /* required access */
214 FALSE, /* child process's don't inherit the handle */
215 DUPLICATE_SAME_ACCESS)) {
216 /* spit errors */
217 fatal("Couldn't get current thread handle");
218 }
219
220 /* Initialize request queue */
a1b1756c 221 if ((request_queue.mutex = CreateMutex(nullptr, /* no inheritance */
595c7973 222 FALSE, /* start unowned (as per mutex_init) */
a1b1756c 223 nullptr) /* no name */
595c7973 224 ) == NULL) {
225 fatal("Failed to create mutex");
226 }
227
a1b1756c 228 if ((request_queue.cond = CreateEvent(nullptr, /* no inheritance */
595c7973 229 FALSE, /* auto signal reset - which I think is pthreads like ? */
230 FALSE, /* start non signaled */
a1b1756c 231 nullptr) /* no name */
595c7973 232 ) == NULL) {
233 fatal("Failed to create condition variable");
234 }
235
a1b1756c 236 request_queue.head = nullptr;
595c7973 237
238 request_queue.tailp = &request_queue.head;
239
240 request_queue.requests = 0;
241
242 request_queue.blocked = 0;
243
244 /* Initialize done queue */
245
a1b1756c 246 if ((done_queue.mutex = CreateMutex(nullptr, /* no inheritance */
595c7973 247 FALSE, /* start unowned (as per mutex_init) */
a1b1756c 248 nullptr) /* no name */
595c7973 249 ) == NULL) {
250 fatal("Failed to create mutex");
251 }
252
a1b1756c 253 if ((done_queue.cond = CreateEvent(nullptr, /* no inheritance */
595c7973 254 TRUE, /* manually signaled - which I think is pthreads like ? */
255 FALSE, /* start non signaled */
a1b1756c 256 nullptr) /* no name */
595c7973 257 ) == NULL) {
258 fatal("Failed to create condition variable");
259 }
260
a1b1756c 261 done_queue.head = nullptr;
595c7973 262
263 done_queue.tailp = &done_queue.head;
264
265 done_queue.requests = 0;
266
267 done_queue.blocked = 0;
268
1e1a9021
AJ
269 // Initialize the thread I/O pipes before creating any threads
270 // see bug 3189 comment 5 about race conditions.
271 CommIO::Initialize();
595c7973 272
273 /* Create threads and get them to sit in their wait loop */
274 squidaio_thread_pool = memPoolCreate("aio_thread", sizeof(squidaio_thread_t));
275
6c1219b9 276 assert(NUMTHREADS > 0);
595c7973 277
cb4185f1 278 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 279 threadp = (squidaio_thread_t *)squidaio_thread_pool->alloc();
280 threadp->status = _THREAD_STARTING;
a1b1756c 281 threadp->current_req = nullptr;
595c7973 282 threadp->requests = 0;
283 threadp->next = threads;
284 threads = threadp;
285
a1b1756c 286 if ((threadp->thread = CreateThread(nullptr, /* no security attributes */
595c7973 287 0, /* use default stack size */
288 squidaio_thread_loop, /* thread function */
289 threadp, /* argument to thread function */
290 0, /* use default creation flags */
291 &(threadp->dwThreadId)) /* returns the thread identifier */
292 ) == NULL) {
293 fprintf(stderr, "Thread creation failed\n");
294 threadp->status = _THREAD_FAILED;
295 continue;
296 }
297
298 /* Set the new thread priority above parent process */
299 SetThreadPriority(threadp->thread,THREAD_PRIORITY_ABOVE_NORMAL);
300 }
301
302 /* Create request pool */
303 squidaio_request_pool = memPoolCreate("aio_request", sizeof(squidaio_request_t));
304
305 squidaio_large_bufs = memPoolCreate("squidaio_large_bufs", AIO_LARGE_BUFS);
306
307 squidaio_medium_bufs = memPoolCreate("squidaio_medium_bufs", AIO_MEDIUM_BUFS);
308
309 squidaio_small_bufs = memPoolCreate("squidaio_small_bufs", AIO_SMALL_BUFS);
310
311 squidaio_tiny_bufs = memPoolCreate("squidaio_tiny_bufs", AIO_TINY_BUFS);
312
313 squidaio_micro_bufs = memPoolCreate("squidaio_micro_bufs", AIO_MICRO_BUFS);
314
315 squidaio_initialised = 1;
316}
317
318void
319squidaio_shutdown(void)
320{
321 squidaio_thread_t *threadp;
322 int i;
323 HANDLE * hthreads;
324
325 if (!squidaio_initialised)
326 return;
327
328 /* This is the same as in squidaio_sync */
329 do {
330 squidaio_poll_queues();
331 } while (request_queue_len > 0);
332
333 hthreads = (HANDLE *) xcalloc (NUMTHREADS, sizeof (HANDLE));
334
335 threadp = threads;
336
cb4185f1 337 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 338 threadp->exit = 1;
339 hthreads[i] = threadp->thread;
340 threadp = threadp->next;
341 }
342
343 ReleaseMutex(request_queue.mutex);
344 ResetEvent(request_queue.cond);
345 ReleaseMutex(done_queue.mutex);
346 ResetEvent(done_queue.cond);
347 Sleep(0);
348
349 WaitForMultipleObjects(NUMTHREADS, hthreads, TRUE, 2000);
350
cb4185f1 351 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 352 CloseHandle(hthreads[i]);
353 }
354
355 CloseHandle(main_thread);
356 CommIO::NotifyIOClose();
357
358 squidaio_initialised = 0;
359 xfree(hthreads);
360}
361
362static DWORD WINAPI
363squidaio_thread_loop(LPVOID lpParam)
364{
365 squidaio_thread_t *threadp = (squidaio_thread_t *)lpParam;
366 squidaio_request_t *request;
367 HANDLE cond; /* local copy of the event queue because win32 event handles
368 * don't atomically release the mutex as cond variables do. */
369
370 /* lock the thread info */
371
372 if (WAIT_FAILED == WaitForSingleObject(request_queue.mutex, INFINITE)) {
373 fatal("Can't get ownership of mutex\n");
374 }
375
376 /* duplicate the handle */
377 if (!DuplicateHandle(GetCurrentProcess(), /* pseudo handle, don't close */
378 request_queue.cond, /* handle to copy */
379 GetCurrentProcess(), /* pseudo handle, don't close */
380 &cond,
381 0, /* required access */
382 FALSE, /* child process's don't inherit the handle */
383 DUPLICATE_SAME_ACCESS))
384 fatal("Can't duplicate mutex handle\n");
385
386 if (!ReleaseMutex(request_queue.mutex)) {
387 CloseHandle(cond);
388 fatal("Can't release mutex\n");
389 }
390
391 Sleep(0);
392
393 while (1) {
394 DWORD rv;
a1b1756c
AJ
395 threadp->current_req = request = nullptr;
396 request = nullptr;
595c7973 397 /* Get a request to process */
398 threadp->status = _THREAD_WAITING;
399
400 if (threadp->exit) {
401 CloseHandle(request_queue.mutex);
402 CloseHandle(cond);
403 return 0;
404 }
405
406 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
407
408 if (rv == WAIT_FAILED) {
409 CloseHandle(cond);
410 return 1;
411 }
412
413 while (!request_queue.head) {
414 if (!ReleaseMutex(request_queue.mutex)) {
415 CloseHandle(cond);
416 threadp->status = _THREAD_FAILED;
417 return 1;
418 }
419
420 Sleep(0);
421 rv = WaitForSingleObject(cond, INFINITE);
422
423 if (rv == WAIT_FAILED) {
424 CloseHandle(cond);
425 return 1;
426 }
427
428 rv = WaitForSingleObject(request_queue.mutex, INFINITE);
429
430 if (rv == WAIT_FAILED) {
431 CloseHandle(cond);
432 return 1;
433 }
434 }
435
436 request = request_queue.head;
437
438 if (request)
439 request_queue.head = request->next;
440
441 if (!request_queue.head)
442 request_queue.tailp = &request_queue.head;
443
444 if (!ReleaseMutex(request_queue.mutex)) {
445 CloseHandle(cond);
446 return 1;
447 }
448
449 Sleep(0);
450
451 /* process the request */
452 threadp->status = _THREAD_BUSY;
453
a1b1756c 454 request->next = nullptr;
595c7973 455
456 threadp->current_req = request;
457
458 errno = 0;
459
460 if (!request->cancelled) {
461 switch (request->request_type) {
462
463 case _AIO_OP_OPEN:
464 squidaio_do_open(request);
465 break;
466
467 case _AIO_OP_READ:
468 squidaio_do_read(request);
469 break;
470
471 case _AIO_OP_WRITE:
472 squidaio_do_write(request);
473 break;
474
475 case _AIO_OP_CLOSE:
476 squidaio_do_close(request);
477 break;
478
595c7973 479 case _AIO_OP_UNLINK:
480 squidaio_do_unlink(request);
481 break;
482
f53969cc 483#if AIO_OPENDIR /* Opendir not implemented yet */
595c7973 484
485 case _AIO_OP_OPENDIR:
486 squidaio_do_opendir(request);
487 break;
488#endif
489
490 case _AIO_OP_STAT:
491 squidaio_do_stat(request);
492 break;
493
494 default:
495 request->ret = -1;
496 request->err = EINVAL;
497 break;
498 }
f53969cc 499 } else { /* cancelled */
595c7973 500 request->ret = -1;
501 request->err = EINTR;
502 }
503
504 threadp->status = _THREAD_DONE;
505 /* put the request in the done queue */
506 rv = WaitForSingleObject(done_queue.mutex, INFINITE);
507
508 if (rv == WAIT_FAILED) {
509 CloseHandle(cond);
510 return 1;
511 }
512
513 *done_queue.tailp = request;
514 done_queue.tailp = &request->next;
515
516 if (!ReleaseMutex(done_queue.mutex)) {
517 CloseHandle(cond);
518 return 1;
519 }
520
521 CommIO::NotifyIOCompleted();
522 Sleep(0);
cb4185f1 523 ++ threadp->requests;
f53969cc 524 } /* while forever */
595c7973 525
526 CloseHandle(cond);
527
528 return 0;
f53969cc 529} /* squidaio_thread_loop */
595c7973 530
531static void
532squidaio_queue_request(squidaio_request_t * request)
533{
534 static int high_start = 0;
bf8fe701 535 debugs(43, 9, "squidaio_queue_request: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 536 /* Mark it as not executed (failing result, no error) */
537 request->ret = -1;
538 request->err = 0;
539 /* Internal housekeeping */
540 request_queue_len += 1;
541 request->resultp->_data = request;
542 /* Play some tricks with the request_queue2 queue */
a1b1756c 543 request->next = nullptr;
595c7973 544
545 if (WaitForSingleObject(request_queue.mutex, 0) == WAIT_OBJECT_0) {
546 if (request_queue2.head) {
547 /* Grab blocked requests */
548 *request_queue.tailp = request_queue2.head;
549 request_queue.tailp = request_queue2.tailp;
550 }
551
552 /* Enqueue request */
553 *request_queue.tailp = request;
554
555 request_queue.tailp = &request->next;
556
557 if (!SetEvent(request_queue.cond))
558 fatal("Couldn't push queue");
559
560 if (!ReleaseMutex(request_queue.mutex)) {
561 /* unexpected error */
562 fatal("Couldn't push queue");
563 }
564
565 Sleep(0);
566
567 if (request_queue2.head) {
568 /* Clear queue of blocked requests */
a1b1756c 569 request_queue2.head = nullptr;
595c7973 570 request_queue2.tailp = &request_queue2.head;
571 }
572 } else {
573 /* Oops, the request queue is blocked, use request_queue2 */
574 *request_queue2.tailp = request;
575 request_queue2.tailp = &request->next;
576 }
577
578 if (request_queue2.head) {
8ab8bb53
AJ
579 static uint64_t filter = 0;
580 static uint64_t filter_limit = 8196;
595c7973 581
582 if (++filter >= filter_limit) {
583 filter_limit += filter;
584 filter = 0;
d816f28d 585 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Queue congestion (growing to " << filter_limit << ")");
595c7973 586 }
587 }
588
589 /* Warn if out of threads */
590 if (request_queue_len > MAGIC1) {
591 static int last_warn = 0;
592 static int queue_high, queue_low;
593
594 if (high_start == 0) {
595 high_start = (int)squid_curtime;
596 queue_high = request_queue_len;
597 queue_low = request_queue_len;
598 }
599
600 if (request_queue_len > queue_high)
601 queue_high = request_queue_len;
602
603 if (request_queue_len < queue_low)
604 queue_low = request_queue_len;
605
606 if (squid_curtime >= (last_warn + 15) &&
607 squid_curtime >= (high_start + 5)) {
d816f28d 608 debugs(43, DBG_IMPORTANT, "WARNING: squidaio_queue_request: Disk I/O overloading");
595c7973 609
610 if (squid_curtime >= (high_start + 15))
e0236918 611 debugs(43, DBG_IMPORTANT, "squidaio_queue_request: Queue Length: current=" <<
bf8fe701 612 request_queue_len << ", high=" << queue_high <<
613 ", low=" << queue_low << ", duration=" <<
614 (long int) (squid_curtime - high_start));
595c7973 615
616 last_warn = (int)squid_curtime;
617 }
618 } else {
619 high_start = 0;
620 }
621
622 /* Warn if seriously overloaded */
623 if (request_queue_len > RIDICULOUS_LENGTH) {
fa84c01d
FC
624 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Async request queue growing uncontrollably!");
625 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Syncing pending I/O operations.. (blocking)");
595c7973 626 squidaio_sync();
fa84c01d 627 debugs(43, DBG_CRITICAL, "squidaio_queue_request: Synced");
595c7973 628 }
f53969cc 629} /* squidaio_queue_request */
595c7973 630
631static void
632squidaio_cleanup_request(squidaio_request_t * requestp)
633{
634 squidaio_result_t *resultp = requestp->resultp;
635 int cancelled = requestp->cancelled;
636
637 /* Free allocated structures and copy data back to user space if the */
638 /* request hasn't been cancelled */
639
640 switch (requestp->request_type) {
641
642 case _AIO_OP_STAT:
643
644 if (!cancelled && requestp->ret == 0)
41d00cd3 645 memcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
595c7973 646
647 squidaio_xfree(requestp->tmpstatp, sizeof(struct stat));
648
649 squidaio_xstrfree(requestp->path);
650
651 break;
652
653 case _AIO_OP_OPEN:
654 if (cancelled && requestp->ret >= 0)
655 /* The open() was cancelled but completed */
656 close(requestp->ret);
657
658 squidaio_xstrfree(requestp->path);
659
660 break;
661
662 case _AIO_OP_CLOSE:
663 if (cancelled && requestp->ret < 0)
664 /* The close() was cancelled and never got executed */
665 close(requestp->fd);
666
667 break;
668
669 case _AIO_OP_UNLINK:
670
595c7973 671 case _AIO_OP_OPENDIR:
672 squidaio_xstrfree(requestp->path);
673
674 break;
675
676 case _AIO_OP_READ:
677 break;
678
679 case _AIO_OP_WRITE:
680 break;
681
682 default:
683 break;
684 }
685
686 if (resultp != NULL && !cancelled) {
687 resultp->aio_return = requestp->ret;
688 resultp->aio_errno = requestp->err;
689 }
690
1e1a9021 691 squidaio_request_pool->freeOne(requestp);
f53969cc 692} /* squidaio_cleanup_request */
595c7973 693
595c7973 694int
695squidaio_cancel(squidaio_result_t * resultp)
696{
697 squidaio_request_t *request = (squidaio_request_t *)resultp->_data;
698
699 if (request && request->resultp == resultp) {
bf8fe701 700 debugs(43, 9, "squidaio_cancel: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 701 request->cancelled = 1;
a1b1756c
AJ
702 request->resultp = nullptr;
703 resultp->_data = nullptr;
595c7973 704 resultp->result_type = _AIO_OP_NONE;
705 return 0;
706 }
707
708 return 1;
f53969cc 709} /* squidaio_cancel */
595c7973 710
595c7973 711int
712squidaio_open(const char *path, int oflag, mode_t mode, squidaio_result_t * resultp)
713{
714 squidaio_init();
715 squidaio_request_t *requestp;
716
717 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
718
719 requestp->path = (char *) squidaio_xstrdup(path);
720
721 requestp->oflag = oflag;
722
723 requestp->mode = mode;
724
725 requestp->resultp = resultp;
726
727 requestp->request_type = _AIO_OP_OPEN;
728
729 requestp->cancelled = 0;
730
731 resultp->result_type = _AIO_OP_OPEN;
732
733 squidaio_queue_request(requestp);
734
735 return 0;
736}
737
595c7973 738static void
739squidaio_do_open(squidaio_request_t * requestp)
740{
741 requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
742 requestp->err = errno;
743}
744
595c7973 745int
ee139403 746squidaio_read(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
595c7973 747{
748 squidaio_request_t *requestp;
749
750 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
751
752 requestp->fd = fd;
753
754 requestp->bufferp = bufp;
755
756 requestp->buflen = bufs;
757
758 requestp->offset = offset;
759
760 requestp->whence = whence;
761
762 requestp->resultp = resultp;
763
764 requestp->request_type = _AIO_OP_READ;
765
766 requestp->cancelled = 0;
767
768 resultp->result_type = _AIO_OP_READ;
769
770 squidaio_queue_request(requestp);
771
772 return 0;
773}
774
595c7973 775static void
776squidaio_do_read(squidaio_request_t * requestp)
777{
778 lseek(requestp->fd, requestp->offset, requestp->whence);
779
780 if (!ReadFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
a1b1756c 781 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
595c7973 782 WIN32_maperror(GetLastError());
783 requestp->ret = -1;
784 }
785
786 requestp->err = errno;
787}
788
595c7973 789int
ee139403 790squidaio_write(int fd, char *bufp, size_t bufs, off_t offset, int whence, squidaio_result_t * resultp)
595c7973 791{
792 squidaio_request_t *requestp;
793
794 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
795
796 requestp->fd = fd;
797
798 requestp->bufferp = bufp;
799
800 requestp->buflen = bufs;
801
802 requestp->offset = offset;
803
804 requestp->whence = whence;
805
806 requestp->resultp = resultp;
807
808 requestp->request_type = _AIO_OP_WRITE;
809
810 requestp->cancelled = 0;
811
812 resultp->result_type = _AIO_OP_WRITE;
813
814 squidaio_queue_request(requestp);
815
816 return 0;
817}
818
595c7973 819static void
820squidaio_do_write(squidaio_request_t * requestp)
821{
822 if (!WriteFile((HANDLE)_get_osfhandle(requestp->fd), requestp->bufferp,
a1b1756c 823 requestp->buflen, (LPDWORD)&requestp->ret, nullptr)) {
595c7973 824 WIN32_maperror(GetLastError());
825 requestp->ret = -1;
826 }
827
828 requestp->err = errno;
829}
830
595c7973 831int
832squidaio_close(int fd, squidaio_result_t * resultp)
833{
834 squidaio_request_t *requestp;
835
836 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
837
838 requestp->fd = fd;
839
840 requestp->resultp = resultp;
841
842 requestp->request_type = _AIO_OP_CLOSE;
843
844 requestp->cancelled = 0;
845
846 resultp->result_type = _AIO_OP_CLOSE;
847
848 squidaio_queue_request(requestp);
849
850 return 0;
851}
852
595c7973 853static void
854squidaio_do_close(squidaio_request_t * requestp)
855{
26ac0430 856 if ((requestp->ret = close(requestp->fd)) < 0) {
fa84c01d 857 debugs(43, DBG_CRITICAL, "squidaio_do_close: FD " << requestp->fd << ", errno " << errno);
595c7973 858 close(requestp->fd);
859 }
860
861 requestp->err = errno;
862}
863
595c7973 864int
865
866squidaio_stat(const char *path, struct stat *sb, squidaio_result_t * resultp)
867{
868 squidaio_init();
869 squidaio_request_t *requestp;
870
871 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
872
873 requestp->path = (char *) squidaio_xstrdup(path);
874
875 requestp->statp = sb;
876
877 requestp->tmpstatp = (struct stat *) squidaio_xmalloc(sizeof(struct stat));
878
879 requestp->resultp = resultp;
880
881 requestp->request_type = _AIO_OP_STAT;
882
883 requestp->cancelled = 0;
884
885 resultp->result_type = _AIO_OP_STAT;
886
887 squidaio_queue_request(requestp);
888
889 return 0;
890}
891
595c7973 892static void
893squidaio_do_stat(squidaio_request_t * requestp)
894{
895 requestp->ret = stat(requestp->path, requestp->tmpstatp);
896 requestp->err = errno;
897}
898
595c7973 899int
900squidaio_unlink(const char *path, squidaio_result_t * resultp)
901{
902 squidaio_init();
903 squidaio_request_t *requestp;
904
905 requestp = (squidaio_request_t *)squidaio_request_pool->alloc();
906
907 requestp->path = squidaio_xstrdup(path);
908
909 requestp->resultp = resultp;
910
911 requestp->request_type = _AIO_OP_UNLINK;
912
913 requestp->cancelled = 0;
914
915 resultp->result_type = _AIO_OP_UNLINK;
916
917 squidaio_queue_request(requestp);
918
919 return 0;
920}
921
595c7973 922static void
923squidaio_do_unlink(squidaio_request_t * requestp)
924{
925 requestp->ret = unlink(requestp->path);
926 requestp->err = errno;
927}
928
595c7973 929#if AIO_OPENDIR
930/* XXX squidaio_opendir NOT implemented yet.. */
931
932int
933squidaio_opendir(const char *path, squidaio_result_t * resultp)
934{
935 squidaio_request_t *requestp;
936 int len;
937
938 requestp = squidaio_request_pool->alloc();
939
940 resultp->result_type = _AIO_OP_OPENDIR;
941
942 return -1;
943}
944
945static void
946squidaio_do_opendir(squidaio_request_t * requestp)
947{
948 /* NOT IMPLEMENTED */
949}
950
951#endif
952
953static void
954squidaio_poll_queues(void)
955{
956 /* kick "overflow" request queue */
957
958 if (request_queue2.head &&
959 (WaitForSingleObject(request_queue.mutex, 0 )== WAIT_OBJECT_0)) {
960 *request_queue.tailp = request_queue2.head;
961 request_queue.tailp = request_queue2.tailp;
962
963 if (!SetEvent(request_queue.cond))
964 fatal("couldn't push queue\n");
965
966 if (!ReleaseMutex(request_queue.mutex)) {
967 /* unexpected error */
968 }
969
970 Sleep(0);
a1b1756c 971 request_queue2.head = nullptr;
595c7973 972 request_queue2.tailp = &request_queue2.head;
973 }
974
975 /* poll done queue */
976 if (done_queue.head &&
977 (WaitForSingleObject(done_queue.mutex, 0)==WAIT_OBJECT_0)) {
978
979 struct squidaio_request_t *requests = done_queue.head;
a1b1756c 980 done_queue.head = nullptr;
595c7973 981 done_queue.tailp = &done_queue.head;
982
983 if (!ReleaseMutex(done_queue.mutex)) {
984 /* unexpected error */
985 }
986
987 Sleep(0);
988 *done_requests.tailp = requests;
989 request_queue_len -= 1;
990
991 while (requests->next) {
992 requests = requests->next;
993 request_queue_len -= 1;
994 }
995
996 done_requests.tailp = &requests->next;
997 }
998}
999
1000squidaio_result_t *
1001squidaio_poll_done(void)
1002{
1003 squidaio_request_t *request;
1004 squidaio_result_t *resultp;
1005 int cancelled;
1006 int polled = 0;
1007
1008AIO_REPOLL:
1009 request = done_requests.head;
1010
1011 if (request == NULL && !polled) {
1012 CommIO::ResetNotifications();
1013 squidaio_poll_queues();
1014 polled = 1;
1015 request = done_requests.head;
1016 }
1017
1018 if (!request) {
a1b1756c 1019 return nullptr;
595c7973 1020 }
1021
bf8fe701 1022 debugs(43, 9, "squidaio_poll_done: " << request << " type=" << request->request_type << " result=" << request->resultp);
595c7973 1023 done_requests.head = request->next;
1024
1025 if (!done_requests.head)
1026 done_requests.tailp = &done_requests.head;
1027
1028 resultp = request->resultp;
1029
1030 cancelled = request->cancelled;
1031
1032 squidaio_debug(request);
1033
bf8fe701 1034 debugs(43, 5, "DONE: " << request->ret << " -> " << request->err);
595c7973 1035
1036 squidaio_cleanup_request(request);
1037
1038 if (cancelled)
1039 goto AIO_REPOLL;
1040
1041 return resultp;
f53969cc 1042} /* squidaio_poll_done */
595c7973 1043
1044int
1045squidaio_operations_pending(void)
1046{
1047 return request_queue_len + (done_requests.head ? 1 : 0);
1048}
1049
1050int
1051squidaio_sync(void)
1052{
1053 /* XXX This might take a while if the queue is large.. */
1054
1055 do {
1056 squidaio_poll_queues();
1057 } while (request_queue_len > 0);
1058
1059 return squidaio_operations_pending();
1060}
1061
1062int
1063squidaio_get_queue_len(void)
1064{
1065 return request_queue_len;
1066}
1067
1068static void
1069squidaio_debug(squidaio_request_t * request)
1070{
1071 switch (request->request_type) {
1072
1073 case _AIO_OP_OPEN:
bf8fe701 1074 debugs(43, 5, "OPEN of " << request->path << " to FD " << request->ret);
595c7973 1075 break;
1076
1077 case _AIO_OP_READ:
bf8fe701 1078 debugs(43, 5, "READ on fd: " << request->fd);
595c7973 1079 break;
1080
1081 case _AIO_OP_WRITE:
bf8fe701 1082 debugs(43, 5, "WRITE on fd: " << request->fd);
595c7973 1083 break;
1084
1085 case _AIO_OP_CLOSE:
bf8fe701 1086 debugs(43, 5, "CLOSE of fd: " << request->fd);
595c7973 1087 break;
1088
1089 case _AIO_OP_UNLINK:
bf8fe701 1090 debugs(43, 5, "UNLINK of " << request->path);
595c7973 1091 break;
1092
595c7973 1093 default:
1094 break;
1095 }
1096}
1097
1098void
1099squidaio_stats(StoreEntry * sentry)
1100{
1101 squidaio_thread_t *threadp;
1102 int i;
1103
1104 if (!squidaio_initialised)
1105 return;
1106
1107 storeAppendPrintf(sentry, "\n\nThreads Status:\n");
1108
1109 storeAppendPrintf(sentry, "#\tID\t# Requests\n");
1110
1111 threadp = threads;
1112
cb4185f1 1113 for (i = 0; i < NUMTHREADS; ++i) {
595c7973 1114 storeAppendPrintf(sentry, "%i\t0x%lx\t%ld\n", i + 1, threadp->dwThreadId, threadp->requests);
1115 threadp = threadp->next;
1116 }
1117}
f53969cc 1118