/*
 * $Id: aiops.cc,v 1.5 2000/12/09 04:43:30 wessels Exp $
 *
 * DEBUG: section 43    AIOPS
 * AUTHOR: Stewart Forster <slf@connect.com.au>
 *
 * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
 * ----------------------------------------------------------
 *
 * Squid is the result of efforts by numerous individuals from the
 * Internet community.  Development is led by Duane Wessels of the
 * National Laboratory for Applied Network Research and funded by the
 * National Science Foundation.  Squid is Copyrighted (C) 1998 by
 * Duane Wessels and the University of California San Diego.  Please
 * see the COPYRIGHT file for full details.  Squid incorporates
 * software developed and/or copyrighted by other sources.  Please see
 * the CREDITS file for full details.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
 *
 */

#include "squid.h"
#include "store_asyncufs.h"

#include <stdio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <pthread.h>
#include <errno.h>
#include <dirent.h>
#include <signal.h>
#if HAVE_SCHED_H
#include <sched.h>
#endif

#define RIDICULOUS_LENGTH 4096

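/*
 * How this module works, in brief: aio_init() spawns NUMTHREADS worker
 * threads.  The main squid thread queues requests on request_queue (or
 * on the main-thread-private overflow queue request_queue2 when the
 * mutex is busy) and never blocks on a worker.  Workers pop requests
 * off request_queue, perform the blocking I/O call, and append the
 * finished request to done_queue.  aio_poll_done() drains done_queue
 * and hands results back to the caller one at a time.  If the request
 * queue ever exceeds RIDICULOUS_LENGTH entries, aio_queue_request()
 * calls aio_sync() and blocks until the queue drains.
 */
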
enum _aio_thread_status {
    _THREAD_STARTING = 0,
    _THREAD_WAITING,
    _THREAD_BUSY,
    _THREAD_FAILED,
    _THREAD_DONE
};

enum _aio_request_type {
    _AIO_OP_NONE = 0,
    _AIO_OP_OPEN,
    _AIO_OP_READ,
    _AIO_OP_WRITE,
    _AIO_OP_CLOSE,
    _AIO_OP_UNLINK,
    _AIO_OP_TRUNCATE,
    _AIO_OP_OPENDIR,
    _AIO_OP_STAT
};

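/*
 * One aio_request_t describes a single in-flight operation.  tmpbufp
 * and tmpstatp are thread-private copies of the caller's buffers, so a
 * worker never touches caller memory that may be gone by the time the
 * operation completes; results are copied back to the caller's buffers
 * in aio_cleanup_request(), and only if the request was not cancelled.
 */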
typedef struct aio_request_t {
    struct aio_request_t *next;
    enum _aio_request_type request_type;
    int cancelled;
    char *path;
    int oflag;
    mode_t mode;
    int fd;
    char *bufferp;
    char *tmpbufp;
    int buflen;
    off_t offset;
    int whence;
    int ret;
    int err;
    struct stat *tmpstatp;
    struct stat *statp;
    aio_result_t *resultp;
} aio_request_t;

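/*
 * A mutex-protected FIFO shared between the main thread and the worker
 * threads.  New requests are appended at *tailp; consumers pop from
 * head while holding the mutex and are woken via cond.
 */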
typedef struct aio_request_queue_t {
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    aio_request_t *volatile head;
    aio_request_t *volatile *volatile tailp;
    unsigned long requests;
    unsigned long blocked;	/* main failed to lock the queue */
} aio_request_queue_t;

typedef struct aio_thread_t aio_thread_t;
struct aio_thread_t {
    aio_thread_t *next;
    pthread_t thread;
    enum _aio_thread_status status;
    struct aio_request_t *current_req;
    unsigned long requests;
};

int aio_cancel(aio_result_t *);
int aio_open(const char *, int, mode_t, aio_result_t *);
int aio_read(int, char *, int, off_t, int, aio_result_t *);
int aio_write(int, char *, int, off_t, int, aio_result_t *);
int aio_close(int, aio_result_t *);
int aio_stat(const char *, struct stat *, aio_result_t *);
int aio_unlink(const char *, aio_result_t *);
int aio_truncate(const char *, off_t length, aio_result_t *);
int aio_opendir(const char *, aio_result_t *);
aio_result_t *aio_poll_done(void);
int aio_sync(void);

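/*
 * A minimal usage sketch (hypothetical caller; the real callers live
 * elsewhere in the aufs layer).  Every aio_*() call returns
 * immediately; the caller keeps the aio_result_t alive and polls for
 * completion:
 *
 *     aio_result_t result;
 *     char buf[AIO_SMALL_BUFS];
 *     aio_read(fd, buf, sizeof(buf), 0, SEEK_SET, &result);
 *     ...
 *     aio_result_t *done;
 *     while ((done = aio_poll_done()) != NULL) {
 *         if (done == &result && done->aio_return > 0)
 *             ...   (buf now holds done->aio_return bytes)
 *     }
 */
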
static void aio_init(void);
static void aio_queue_request(aio_request_t *);
static void aio_cleanup_request(aio_request_t *);
static void *aio_thread_loop(void *);
static void aio_do_open(aio_request_t *);
static void aio_do_read(aio_request_t *);
static void aio_do_write(aio_request_t *);
static void aio_do_close(aio_request_t *);
static void aio_do_stat(aio_request_t *);
static void aio_do_unlink(aio_request_t *);
static void aio_do_truncate(aio_request_t *);
#if AIO_OPENDIR
static void *aio_do_opendir(aio_request_t *);
#endif
static void aio_debug(aio_request_t *);
static void aio_poll_queues(void);

static aio_thread_t *threads = NULL;
static int aio_initialised = 0;

#define AIO_LARGE_BUFS  16384
#define AIO_MEDIUM_BUFS (AIO_LARGE_BUFS >> 1)
#define AIO_SMALL_BUFS  (AIO_LARGE_BUFS >> 2)
#define AIO_TINY_BUFS   (AIO_LARGE_BUFS >> 3)
#define AIO_MICRO_BUFS  128

static MemPool *aio_large_bufs = NULL;	/* 16K */
static MemPool *aio_medium_bufs = NULL;	/* 8K */
static MemPool *aio_small_bufs = NULL;	/* 4K */
static MemPool *aio_tiny_bufs = NULL;	/* 2K */
static MemPool *aio_micro_bufs = NULL;	/* 128 bytes */

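/*
 * Request flow: the main thread appends to request_queue, from which
 * the workers pop.  When the trylock on request_queue fails, requests
 * park on request_queue2, which only the main thread touches and so
 * needs no lock.  Workers append finished requests to done_queue; the
 * main thread drains done_queue into done_requests, its private result
 * list, in aio_poll_queues().
 */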
static int request_queue_len = 0;
static MemPool *aio_request_pool = NULL;
static MemPool *aio_thread_pool = NULL;
static aio_request_queue_t request_queue;
static struct {
    aio_request_t *head, **tailp;
} request_queue2 = {
    NULL, &request_queue2.head
};
static aio_request_queue_t done_queue;
static struct {
    aio_request_t *head, **tailp;
} done_requests = {
    NULL, &done_requests.head
};
static pthread_attr_t globattr;
static struct sched_param globsched;
static pthread_t main_thread;

static MemPool *
aio_get_pool(int size)
{
    MemPool *p;
    if (size <= AIO_LARGE_BUFS) {
        if (size <= AIO_MICRO_BUFS)
            p = aio_micro_bufs;
        else if (size <= AIO_TINY_BUFS)
            p = aio_tiny_bufs;
        else if (size <= AIO_SMALL_BUFS)
            p = aio_small_bufs;
        else if (size <= AIO_MEDIUM_BUFS)
            p = aio_medium_bufs;
        else
            p = aio_large_bufs;
    } else
        p = NULL;
    return p;
}

static void *
aio_xmalloc(int size)
{
    void *p;
    MemPool *pool;

    if ((pool = aio_get_pool(size)) != NULL) {
        p = memPoolAlloc(pool);
    } else
        p = xmalloc(size);

    return p;
}

static char *
aio_xstrdup(const char *str)
{
    char *p;
    int len = strlen(str) + 1;

    p = (char *) aio_xmalloc(len);
    strncpy(p, str, len);

    return p;
}

static void
aio_xfree(void *p, int size)
{
    MemPool *pool;

    if ((pool = aio_get_pool(size)) != NULL) {
        memPoolFree(pool, p);
    } else
        xfree(p);
}

static void
aio_xstrfree(char *str)
{
    MemPool *pool;
    int len = strlen(str) + 1;

    if ((pool = aio_get_pool(len)) != NULL) {
        memPoolFree(pool, str);
    } else
        xfree(str);
}

static void
aio_init(void)
{
    int i;
    aio_thread_t *threadp;

    if (aio_initialised)
        return;

    pthread_attr_init(&globattr);
#if HAVE_PTHREAD_ATTR_SETSCOPE
    pthread_attr_setscope(&globattr, PTHREAD_SCOPE_SYSTEM);
#endif
    globsched.sched_priority = 1;
    main_thread = pthread_self();
#if HAVE_PTHREAD_SETSCHEDPARAM
    pthread_setschedparam(main_thread, SCHED_OTHER, &globsched);
#endif
    globsched.sched_priority = 2;
#if HAVE_PTHREAD_ATTR_SETSCHEDPARAM
    pthread_attr_setschedparam(&globattr, &globsched);
#endif

    /* Initialize request queue */
    if (pthread_mutex_init(&(request_queue.mutex), NULL))
        fatal("Failed to create mutex");
    if (pthread_cond_init(&(request_queue.cond), NULL))
        fatal("Failed to create condition variable");
    request_queue.head = NULL;
    request_queue.tailp = &request_queue.head;
    request_queue.requests = 0;
    request_queue.blocked = 0;

    /* Initialize done queue */
    if (pthread_mutex_init(&(done_queue.mutex), NULL))
        fatal("Failed to create mutex");
    if (pthread_cond_init(&(done_queue.cond), NULL))
        fatal("Failed to create condition variable");
    done_queue.head = NULL;
    done_queue.tailp = &done_queue.head;
    done_queue.requests = 0;
    done_queue.blocked = 0;

    /* Create threads and get them to sit in their wait loop */
    aio_thread_pool = memPoolCreate("aio_thread", sizeof(aio_thread_t));
    for (i = 0; i < NUMTHREADS; i++) {
        threadp = memPoolAlloc(aio_thread_pool);
        threadp->status = _THREAD_STARTING;
        threadp->current_req = NULL;
        threadp->requests = 0;
        threadp->next = threads;
        threads = threadp;
        if (pthread_create(&threadp->thread, &globattr, aio_thread_loop, threadp)) {
            fprintf(stderr, "Thread creation failed\n");
            threadp->status = _THREAD_FAILED;
            continue;
        }
    }

    /* Create request pool */
    aio_request_pool = memPoolCreate("aio_request", sizeof(aio_request_t));
    aio_large_bufs = memPoolCreate("aio_large_bufs", AIO_LARGE_BUFS);
    aio_medium_bufs = memPoolCreate("aio_medium_bufs", AIO_MEDIUM_BUFS);
    aio_small_bufs = memPoolCreate("aio_small_bufs", AIO_SMALL_BUFS);
    aio_tiny_bufs = memPoolCreate("aio_tiny_bufs", AIO_TINY_BUFS);
    aio_micro_bufs = memPoolCreate("aio_micro_bufs", AIO_MICRO_BUFS);

    aio_initialised = 1;
}


static void *
aio_thread_loop(void *ptr)
{
    aio_thread_t *threadp = ptr;
    aio_request_t *request;
    sigset_t newmask;

    /*
     * Make sure to ignore signals which may possibly get sent to
     * the parent squid thread.  Causes havoc with mutexes and
     * condition waits otherwise.
     */

    sigemptyset(&newmask);
    sigaddset(&newmask, SIGPIPE);
    sigaddset(&newmask, SIGCHLD);
#ifdef _SQUID_LINUX_THREADS_
    sigaddset(&newmask, SIGQUIT);
    sigaddset(&newmask, SIGTRAP);
#else
    sigaddset(&newmask, SIGUSR1);
    sigaddset(&newmask, SIGUSR2);
#endif
    sigaddset(&newmask, SIGHUP);
    sigaddset(&newmask, SIGTERM);
    sigaddset(&newmask, SIGINT);
    sigaddset(&newmask, SIGALRM);
    pthread_sigmask(SIG_BLOCK, &newmask, NULL);

    while (1) {
        threadp->current_req = request = NULL;
        /* Get a request to process */
        threadp->status = _THREAD_WAITING;
        pthread_mutex_lock(&request_queue.mutex);
        while (!request_queue.head) {
            pthread_cond_wait(&request_queue.cond, &request_queue.mutex);
        }
        request = request_queue.head;
        request_queue.head = request->next;
        if (!request_queue.head)
            request_queue.tailp = &request_queue.head;
        pthread_mutex_unlock(&request_queue.mutex);
        /* process the request */
        threadp->status = _THREAD_BUSY;
        request->next = NULL;
        threadp->current_req = request;
        errno = 0;
        if (!request->cancelled) {
            switch (request->request_type) {
            case _AIO_OP_OPEN:
                aio_do_open(request);
                break;
            case _AIO_OP_READ:
                aio_do_read(request);
                break;
            case _AIO_OP_WRITE:
                aio_do_write(request);
                break;
            case _AIO_OP_CLOSE:
                aio_do_close(request);
                break;
            case _AIO_OP_UNLINK:
                aio_do_unlink(request);
                break;
            case _AIO_OP_TRUNCATE:
                aio_do_truncate(request);
                break;
#if AIO_OPENDIR			/* Opendir not implemented yet */
            case _AIO_OP_OPENDIR:
                aio_do_opendir(request);
                break;
#endif
            case _AIO_OP_STAT:
                aio_do_stat(request);
                break;
            default:
                request->ret = -1;
                request->err = EINVAL;
                break;
            }
        } else {		/* cancelled */
            request->ret = -1;
            request->err = EINTR;
        }
        threadp->status = _THREAD_DONE;
        /* put the request in the done queue */
        pthread_mutex_lock(&done_queue.mutex);
        *done_queue.tailp = request;
        done_queue.tailp = &request->next;
        pthread_mutex_unlock(&done_queue.mutex);
        threadp->requests++;
    }				/* while forever */
    return NULL;
}				/* aio_thread_loop */
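
/*
 * Note on cancellation: a cancelled request still travels through the
 * done queue (with ret = -1, err = EINTR) so that aio_cleanup_request()
 * can free its buffers; aio_poll_done() then skips it instead of
 * returning it to the caller.
 */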

static void
aio_queue_request(aio_request_t * request)
{
    static int high_start = 0;
    debug(43, 9) ("aio_queue_request: %p type=%d result=%p\n",
        request, request->request_type, request->resultp);
    /* Mark it as not executed (failing result, no error) */
    request->ret = -1;
    request->err = 0;
    /* Internal housekeeping */
    request_queue_len += 1;
    request->resultp->_data = request;
    /* Queue the request: normal path via request_queue, overflow via request_queue2 */
    request->next = NULL;
    if (!request_queue2.head) {
        if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
            /* Normal path */
            *request_queue.tailp = request;
            request_queue.tailp = &request->next;
            pthread_cond_signal(&request_queue.cond);
            pthread_mutex_unlock(&request_queue.mutex);
        } else {
            /* Oops, the request queue is blocked, use request_queue2 */
            *request_queue2.tailp = request;
            request_queue2.tailp = &request->next;
        }
    } else {
        /* Secondary path.  We have blocked requests to deal with */
        /* add the request to the chain */
        *request_queue2.tailp = request;
        if (pthread_mutex_trylock(&request_queue.mutex) == 0) {
            /* Ok, the queue is no longer blocked */
            *request_queue.tailp = request_queue2.head;
            request_queue.tailp = &request->next;
            pthread_cond_signal(&request_queue.cond);
            pthread_mutex_unlock(&request_queue.mutex);
            request_queue2.head = NULL;
            request_queue2.tailp = &request_queue2.head;
        } else {
            /* still blocked, bump the blocked request chain */
            request_queue2.tailp = &request->next;
        }
    }
    if (request_queue2.head) {
        static int filter = 0;
        static int filter_limit = 8;
        if (++filter >= filter_limit) {
            filter_limit += filter;
            filter = 0;
            debug(43, 1) ("aio_queue_request: WARNING - Queue congestion\n");
        }
    }
    /* Warn if out of threads */
    if (request_queue_len > MAGIC1) {
        static int last_warn = 0;
        static int queue_high, queue_low;
        if (high_start == 0) {
            high_start = squid_curtime;
            queue_high = request_queue_len;
            queue_low = request_queue_len;
        }
        if (request_queue_len > queue_high)
            queue_high = request_queue_len;
        if (request_queue_len < queue_low)
            queue_low = request_queue_len;
        if (squid_curtime >= (last_warn + 15) &&
            squid_curtime >= (high_start + 5)) {
            debug(43, 1) ("aio_queue_request: WARNING - Disk I/O overloading\n");
            if (squid_curtime >= (high_start + 15))
                debug(43, 1) ("aio_queue_request: Queue Length: current=%d, high=%d, low=%d, duration=%d\n",
                    request_queue_len, queue_high, queue_low, squid_curtime - high_start);
            last_warn = squid_curtime;
        }
    } else {
        high_start = 0;
    }
    /* Warn if seriously overloaded */
    if (request_queue_len > RIDICULOUS_LENGTH) {
        debug(43, 0) ("aio_queue_request: Async request queue growing uncontrollably!\n");
        debug(43, 0) ("aio_queue_request: Syncing pending I/O operations.. (blocking)\n");
        aio_sync();
        debug(43, 0) ("aio_queue_request: Synced\n");
    }
}				/* aio_queue_request */

static void
aio_cleanup_request(aio_request_t * requestp)
{
    aio_result_t *resultp = requestp->resultp;
    int cancelled = requestp->cancelled;

    /* Free allocated structures and copy data back to user space if the */
    /* request hasn't been cancelled */
    switch (requestp->request_type) {
    case _AIO_OP_STAT:
        if (!cancelled && requestp->ret == 0)
            xmemcpy(requestp->statp, requestp->tmpstatp, sizeof(struct stat));
        aio_xfree(requestp->tmpstatp, sizeof(struct stat));
        aio_xstrfree(requestp->path);
        break;
    case _AIO_OP_OPEN:
        if (cancelled && requestp->ret >= 0)
            /* The open() was cancelled but completed */
            close(requestp->ret);
        aio_xstrfree(requestp->path);
        break;
    case _AIO_OP_CLOSE:
        if (cancelled && requestp->ret < 0)
            /* The close() was cancelled and never got executed */
            close(requestp->fd);
        break;
    case _AIO_OP_UNLINK:
    case _AIO_OP_TRUNCATE:
    case _AIO_OP_OPENDIR:
        aio_xstrfree(requestp->path);
        break;
    case _AIO_OP_READ:
        if (!cancelled && requestp->ret > 0)
            xmemcpy(requestp->bufferp, requestp->tmpbufp, requestp->ret);
        aio_xfree(requestp->tmpbufp, requestp->buflen);
        break;
    case _AIO_OP_WRITE:
        aio_xfree(requestp->tmpbufp, requestp->buflen);
        break;
    default:
        break;
    }
    if (resultp != NULL && !cancelled) {
        resultp->aio_return = requestp->ret;
        resultp->aio_errno = requestp->err;
    }
    memPoolFree(aio_request_pool, requestp);
}				/* aio_cleanup_request */


int
aio_cancel(aio_result_t * resultp)
{
    aio_request_t *request = resultp->_data;

    if (request && request->resultp == resultp) {
        debug(43, 9) ("aio_cancel: %p type=%d result=%p\n",
            request, request->request_type, request->resultp);
        request->cancelled = 1;
        request->resultp = NULL;
        resultp->_data = NULL;
        return 0;
    }
    return 1;
}				/* aio_cancel */


int
aio_open(const char *path, int oflag, mode_t mode, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->path = (char *) aio_xstrdup(path);
    requestp->oflag = oflag;
    requestp->mode = mode;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_OPEN;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_open(aio_request_t * requestp)
{
    requestp->ret = open(requestp->path, requestp->oflag, requestp->mode);
    requestp->err = errno;
}


int
aio_read(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->fd = fd;
    requestp->bufferp = bufp;
    requestp->tmpbufp = (char *) aio_xmalloc(bufs);
    requestp->buflen = bufs;
    requestp->offset = offset;
    requestp->whence = whence;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_READ;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_read(aio_request_t * requestp)
{
    lseek(requestp->fd, requestp->offset, requestp->whence);
    requestp->ret = read(requestp->fd, requestp->tmpbufp, requestp->buflen);
    requestp->err = errno;
}
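
/*
 * Note: the lseek()/read() pair in aio_do_read() is not atomic, so this
 * is only safe if at most one operation is outstanding per file
 * descriptor at a time (which appears to be how the aufs layer uses
 * it).  A pread()-based variant would avoid the seek entirely where
 * available.
 */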


int
aio_write(int fd, char *bufp, int bufs, off_t offset, int whence, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->fd = fd;
    requestp->tmpbufp = (char *) aio_xmalloc(bufs);
    xmemcpy(requestp->tmpbufp, bufp, bufs);
    requestp->buflen = bufs;
    requestp->offset = offset;
    requestp->whence = whence;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_WRITE;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_write(aio_request_t * requestp)
{
    requestp->ret = write(requestp->fd, requestp->tmpbufp, requestp->buflen);
    requestp->err = errno;
}
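
/*
 * Note: unlike aio_do_read(), aio_do_write() ignores the stored offset
 * and whence and writes at the current file position.  Presumably the
 * callers only ever write sequentially (or open with O_APPEND), but the
 * asymmetry is worth knowing about.
 */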


int
aio_close(int fd, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->fd = fd;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_CLOSE;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_close(aio_request_t * requestp)
{
    requestp->ret = close(requestp->fd);
    requestp->err = errno;
}


int
aio_stat(const char *path, struct stat *sb, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->path = (char *) aio_xstrdup(path);
    requestp->statp = sb;
    requestp->tmpstatp = (struct stat *) aio_xmalloc(sizeof(struct stat));
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_STAT;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_stat(aio_request_t * requestp)
{
    requestp->ret = stat(requestp->path, requestp->tmpstatp);
    requestp->err = errno;
}


int
aio_unlink(const char *path, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->path = aio_xstrdup(path);
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_UNLINK;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_unlink(aio_request_t * requestp)
{
    requestp->ret = unlink(requestp->path);
    requestp->err = errno;
}

int
aio_truncate(const char *path, off_t length, aio_result_t * resultp)
{
    aio_request_t *requestp;

    if (!aio_initialised)
        aio_init();
    requestp = memPoolAlloc(aio_request_pool);
    requestp->path = (char *) aio_xstrdup(path);
    requestp->offset = length;
    requestp->resultp = resultp;
    requestp->request_type = _AIO_OP_TRUNCATE;
    requestp->cancelled = 0;

    aio_queue_request(requestp);
    return 0;
}


static void
aio_do_truncate(aio_request_t * requestp)
{
    requestp->ret = truncate(requestp->path, requestp->offset);
    requestp->err = errno;
}


#if AIO_OPENDIR
/* XXX aio_opendir NOT implemented yet.. */

int
aio_opendir(const char *path, aio_result_t * resultp)
{
    if (!aio_initialised)
        aio_init();
    /* Not implemented; allocating a request here would only leak it */
    return -1;
}

static void
aio_do_opendir(aio_request_t * requestp)
{
    /* NOT IMPLEMENTED */
}

#endif

static void
aio_poll_queues(void)
{
    /* kick "overflow" request queue */
    if (request_queue2.head &&
        pthread_mutex_trylock(&request_queue.mutex) == 0) {
        *request_queue.tailp = request_queue2.head;
        request_queue.tailp = request_queue2.tailp;
        pthread_cond_signal(&request_queue.cond);
        pthread_mutex_unlock(&request_queue.mutex);
        request_queue2.head = NULL;
        request_queue2.tailp = &request_queue2.head;
    }
    /* poll done queue */
    if (done_queue.head && pthread_mutex_trylock(&done_queue.mutex) == 0) {
        aio_request_t *requests = done_queue.head;
        done_queue.head = NULL;
        done_queue.tailp = &done_queue.head;
        pthread_mutex_unlock(&done_queue.mutex);
        *done_requests.tailp = requests;
        request_queue_len -= 1;
        while (requests->next) {
            requests = requests->next;
            request_queue_len -= 1;
        }
        done_requests.tailp = &requests->next;
    }
    /* Give up the CPU to allow the threads to do their work */
    /*
     * For Andres' thoughts about yield(), see
     * http://www.squid-cache.org/mail-archive/squid-dev/200012/0001.html
     */
    if (done_queue.head || request_queue.head)
#ifndef _SQUID_SOLARIS_
        sched_yield();
#else
        yield();
#endif
}

aio_result_t *
aio_poll_done(void)
{
    aio_request_t *request;
    aio_result_t *resultp;
    int cancelled;
    int polled = 0;

  AIO_REPOLL:
    request = done_requests.head;
    if (request == NULL && !polled) {
        aio_poll_queues();
        polled = 1;
        request = done_requests.head;
    }
    if (!request) {
        return NULL;
    }
    debug(43, 9) ("aio_poll_done: %p type=%d result=%p\n",
        request, request->request_type, request->resultp);
    done_requests.head = request->next;
    if (!done_requests.head)
        done_requests.tailp = &done_requests.head;
    resultp = request->resultp;
    cancelled = request->cancelled;
    aio_debug(request);
    debug(43, 5) ("DONE: %d -> %d\n", request->ret, request->err);
    aio_cleanup_request(request);
    if (cancelled)
        goto AIO_REPOLL;
    return resultp;
}				/* aio_poll_done */

int
aio_operations_pending(void)
{
    return request_queue_len + (done_requests.head ? 1 : 0);
}

int
aio_sync(void)
{
    /* XXX This might take a while if the queue is large.. */
    do {
        aio_poll_queues();
    } while (request_queue_len > 0);
    return aio_operations_pending();
}

int
aio_get_queue_len(void)
{
    return request_queue_len;
}

static void
aio_debug(aio_request_t * request)
{
    switch (request->request_type) {
    case _AIO_OP_OPEN:
        debug(43, 5) ("OPEN of %s to FD %d\n", request->path, request->ret);
        break;
    case _AIO_OP_READ:
        debug(43, 5) ("READ on fd: %d\n", request->fd);
        break;
    case _AIO_OP_WRITE:
        debug(43, 5) ("WRITE on fd: %d\n", request->fd);
        break;
    case _AIO_OP_CLOSE:
        debug(43, 5) ("CLOSE of fd: %d\n", request->fd);
        break;
    case _AIO_OP_UNLINK:
        debug(43, 5) ("UNLINK of %s\n", request->path);
        break;
    case _AIO_OP_TRUNCATE:
        debug(43, 5) ("TRUNCATE of %s\n", request->path);
        break;
    default:
        break;
    }
}