]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Commit the basic code to implement async io via posix aio.
authoradrian <>
Sun, 12 Aug 2001 20:58:24 +0000 (20:58 +0000)
committeradrian <>
Sun, 12 Aug 2001 20:58:24 +0000 (20:58 +0000)
only read is done at the moment - write and sync haven't been
implemented yet.

src/fs/coss/async_io.cc
src/fs/coss/async_io.h

index 3288d977ee5df60c0ea60d4fcf1d4d5495bd584d..55d46cf4d87e0dda2f39c10fd9ecf8784011786e 100644 (file)
@@ -11,7 +11,7 @@
  * supports are read/write, and since COSS works on a single file
  * per storedir it should work just fine.
  *
- * $Id: async_io.cc,v 1.2 2001/08/12 14:02:01 adrian Exp $
+ * $Id: async_io.cc,v 1.3 2001/08/12 14:58:24 adrian Exp $
  */
 
 #include "squid.h"
@@ -57,16 +57,47 @@ void
 a_file_read(async_queue_t *q, int fd, void *buf, int req_len, off_t offset,
   DRCB *callback, void *data)
 {
+       int slot;
+       async_queue_entry_t *qe;
+
        assert(q->aq_state == AQ_STATE_SETUP);
 
 #if 0
        file_read(fd, buf, req_len, offset, callback, data);
 #endif
        /* Find a free slot */
+       slot = a_file_findslot(q);
+       if (slot < 0) {
                /* No free slot? Callback error, and return */
+               fatal("Aiee! out of aiocb slots!\n");
+       }
 
        /* Mark slot as ours */
+       qe = &q->aq_queue[slot];
+       qe->aq_e_state = AQ_ENTRY_USED;
+       qe->aq_e_callback.read = callback;
+       qe->aq_e_callback_data = data;
+       qe->aq_e_type = AQ_ENTRY_READ;
+       qe->aq_e_free = NULL;
+       qe->aq_e_buf = buf;
+       qe->aq_e_fd = fd;
+
+       qe->aq_e_aiocb.aio_fildes = fd;
+       qe->aq_e_aiocb.aio_nbytes = req_len;
+       qe->aq_e_aiocb.aio_offset = offset;
+       qe->aq_e_aiocb.aio_buf = buf;
+
+       /* Account */
+       q->aq_numpending++;
+
+       /* Lock */
+       cbdataLock(data);
+
        /* Initiate aio */
+       if (aio_read(&qe->aq_e_aiocb) < 0) {
+               debug(1, 1)("Aiee! aio_read() returned error (%d)!\n", errno);
+               assert(1==0);
+       }
 }
 
 
@@ -76,9 +107,7 @@ a_file_write(async_queue_t *q, int fd, off_t offset, void *buf, int len,
 {
        assert(q->aq_state == AQ_STATE_SETUP);
 
-#if 0
        file_write(fd, offset, buf, len, callback, data, freefunc);
-#endif
        /* Find a free slot */
                /* No free slot? Callback error, and return */
        /* Mark slot as ours */
@@ -97,16 +126,60 @@ a_file_write(async_queue_t *q, int fd, off_t offset, void *buf, int len,
 int
 a_file_callback(async_queue_t *q)
 {
+       int i;
+       int completed = 0;
+       int retval, reterr;
+       DRCB *rc;
+       DWCB *wc;
+       FREE *freefunc;
+       void *callback_data;
+       void *buf;
+       int fd;
+       async_queue_entry_t *aqe;
+       async_queue_entry_type_t type;
+
        assert(q->aq_state == AQ_STATE_SETUP);
 
        /* Loop through all slots */
-               /* Active, get status */
-                       /* Ready? Grab the state locally */
-                       /* Free the state */
-                       /* Call callback */
-
+       for (i = 0; i < MAX_ASYNCOP; i++) {
+               if (q->aq_queue[i].aq_e_state == AQ_ENTRY_USED) {
+                       aqe = &q->aq_queue[i];
+                       /* Active, get status */
+                       reterr = aio_error(&aqe->aq_e_aiocb);
+                       if (reterr < 0) {
+                               fatal("aio_error returned an error!\n");
+                       }
+                       if (reterr != EINPROGRESS) {
+                               /* Get the return code */
+                               retval = aio_return(&aqe->aq_e_aiocb);
+
+                               /* Get the callback parameters */
+                               callback_data = aqe->aq_e_callback_data;
+                               freefunc = aqe->aq_e_free;
+                               rc = aqe->aq_e_callback.read;
+                               wc = aqe->aq_e_callback.write;
+                               buf = aqe->aq_e_buf;
+                               fd = aqe->aq_e_fd;
+                               type = aqe->aq_e_type;
+
+                               /* Free slot */
+                               bzero(aqe, sizeof(async_queue_entry_t));
+                               aqe->aq_e_state = AQ_ENTRY_FREE;
+                               q->aq_numpending--;
+
+                               /* Callback */
+                               if (cbdataValid(callback_data)) {
+                                       if (type == AQ_ENTRY_READ)
+                                               rc(fd, buf, retval, reterr, callback_data);
+                                       if (type == AQ_ENTRY_WRITE)
+                                               wc(fd, reterr, retval, callback_data);
+                               }
+                               cbdataUnlock(callback_data);
+                       }
+               }
 
-       return 0;
+       }
+       return completed;
 }
 
 
index 348a5a58f290457fdfab0d4a5c884de38f75864e..71f8d4a0e953f25f67e193d8e3c7815ea7142c46 100644 (file)
@@ -13,6 +13,12 @@ typedef enum {
        AQ_ENTRY_USED
 } async_queue_entry_state_t;
 
+typedef enum {
+       AQ_ENTRY_NONE,
+       AQ_ENTRY_READ,
+       AQ_ENTRY_WRITE
+} async_queue_entry_type_t;
+
 
 typedef struct _async_queue_entry async_queue_entry_t;
 typedef struct _async_queue async_queue_t;
@@ -20,12 +26,16 @@ typedef struct _async_queue async_queue_t;
 /* An async queue entry */
 struct _async_queue_entry {
        async_queue_entry_state_t aq_e_state;
+       async_queue_entry_type_t aq_e_type;
        struct aiocb aq_e_aiocb;
        union {
                DRCB *read;
                DWCB *write;
-       } callback;
-       void *callback_data;
+       } aq_e_callback;
+       void *aq_e_callback_data;
+       FREE *aq_e_free;
+       int aq_e_fd;
+       void *aq_e_buf;
 };
 
 /* An async queue */