]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Use shared pages in IpcIoFile instead of passing data through shared queues.
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Mon, 25 Apr 2011 19:24:35 +0000 (23:24 +0400)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Mon, 25 Apr 2011 19:24:35 +0000 (23:24 +0400)
src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/ipc/mem/Page.h
src/ipc/mem/PagePool.cc
src/ipc/mem/PagePool.h
src/ipc/mem/PageStack.cc
src/ipc/mem/Pages.cc
src/ipc/mem/Pages.h

index ee323e2c31b52f1722b8b01cf72bc634a3cf48b5..1cc4a81f78b002083f49fdd1a48351776c8f3c2c 100644 (file)
@@ -14,6 +14,7 @@
 #include "ipc/Port.h"
 #include "ipc/StrandSearch.h"
 #include "ipc/UdsOp.h"
+#include "ipc/mem/Pages.h"
 
 CBDATA_CLASS_INIT(IpcIoFile);
 
@@ -201,7 +202,7 @@ IpcIoFile::read(ReadRequest *readRequest)
 
 void
 IpcIoFile::readCompleted(ReadRequest *readRequest,
-                         const IpcIoMsg *const response)
+                         IpcIoMsg *const response)
 {
     bool ioError = false;
     if (!response) {
@@ -211,10 +212,18 @@ IpcIoFile::readCompleted(ReadRequest *readRequest,
     if (response->xerrno) {
         debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
         ioError = error_ = true;
+    }
+    else
+    if (!response->page) {
+        debugs(79,1, HERE << "error: run out of shared memory pages");
+        ioError = true;
     } else {
-        memcpy(readRequest->buf, response->buf, response->len);
+        const char *const buf = Ipc::Mem::PagePointer(response->page);
+        memcpy(readRequest->buf, buf, response->len);
     }
 
+    Ipc::Mem::PutPage(response->page);
+
     const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
     const int errflag = ioError ? DISK_ERROR : DISK_OK;
     ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
@@ -300,24 +309,27 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
     Must(pending->readRequest || pending->writeRequest);
 
     IpcIoMsg ipcIo;
-    ipcIo.requestId = lastRequestId;
-    if (pending->readRequest) {
-        ipcIo.command = IpcIo::cmdRead;
-        ipcIo.offset = pending->readRequest->offset;
-        ipcIo.len = pending->readRequest->len;
-        assert(ipcIo.len <= sizeof(ipcIo.buf));
-        memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away
-    } else { // pending->writeRequest
-        ipcIo.command = IpcIo::cmdWrite;
-        ipcIo.offset = pending->writeRequest->offset;
-        ipcIo.len = pending->writeRequest->len;
-        assert(ipcIo.len <= sizeof(ipcIo.buf));
-        memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
-    }
+    try {
+        ipcIo.requestId = lastRequestId;
+        if (pending->readRequest) {
+            ipcIo.command = IpcIo::cmdRead;
+            ipcIo.offset = pending->readRequest->offset;
+            ipcIo.len = pending->readRequest->len;
+        } else { // pending->writeRequest
+            Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
+            if (!Ipc::Mem::GetPage(ipcIo.page)) {
+                ipcIo.len = 0;
+                throw TexcHere("run out of shared memory pages");
+            }
+            ipcIo.command = IpcIo::cmdWrite;
+            ipcIo.offset = pending->writeRequest->offset;
+            ipcIo.len = pending->writeRequest->len;
+            char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+            memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
+        }
 
-    debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+        debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
 
-    try {
         if (workerQueue->push(ipcIo))
             Notify(diskId); // must notify disker
         trackPendingRequest(pending);
@@ -325,7 +337,11 @@ IpcIoFile::push(IpcIoPendingRequest *const pending)
         debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
                SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
         // TODO: grow queue size
-
+        pending->completeIo(NULL); // XXX: should distinguish this from timeout
+        delete pending;
+    } catch (const TextException &e) {
+        debugs(47, DBG_IMPORTANT, HERE << e.what());
         pending->completeIo(NULL); // XXX: should distinguish this from timeout
         delete pending;
     }
@@ -370,7 +386,7 @@ IpcIoFile::handleResponses(const char *when)
 }
 
 void
-IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
+IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
 {
     const int requestId = ipcIo.requestId;
     debugs(47, 7, HERE << "popped disker response: " <<
@@ -530,7 +546,7 @@ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
 }
 
 void
-IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
+IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
 {
     if (readRequest)
         file->readCompleted(readRequest, response);
@@ -552,7 +568,14 @@ static int TheFile = -1; ///< db file descriptor
 static void
 diskerRead(IpcIoMsg &ipcIo)
 {
-    const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+    if (!Ipc::Mem::GetPage(ipcIo.page)) {
+        ipcIo.len = 0;
+        debugs(47,5, HERE << "run out of shared memory pages");
+        return;
+    }
+
+    char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+    const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
     statCounter.syscalls.disk.reads++;
     fd_bytes(TheFile, read, FD_READ);
 
@@ -573,7 +596,8 @@ diskerRead(IpcIoMsg &ipcIo)
 static void
 diskerWrite(IpcIoMsg &ipcIo)
 {
-    const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+    const char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+    const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
     statCounter.syscalls.disk.writes++;
     fd_bytes(TheFile, wrote, FD_WRITE);
 
@@ -589,6 +613,8 @@ diskerWrite(IpcIoMsg &ipcIo)
         debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
                ipcIo.xerrno);
     }
+
+    Ipc::Mem::PutPage(ipcIo.page);
 }
 
 void
index de8e85b9f05489760f643ceeaba874af69331184..fd0865e22b784fcc5e02c5d6375352a561412971 100644 (file)
@@ -7,6 +7,7 @@
 #include "DiskIO/IORequestor.h"
 #include "ipc/forward.h"
 #include "ipc/Queue.h"
+#include "ipc/mem/Page.h"
 #include <list>
 #include <map>
 
@@ -16,8 +17,6 @@ namespace IpcIo {
 /// what kind of I/O the disker needs to do or have done
 typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
 
-enum { BufCapacity = 32*1024 };
-
 } // namespace IpcIo
 
 
@@ -29,9 +28,9 @@ public:
 public:
     unsigned int requestId; ///< unique for requestor; matches request w/ response
 
-    char buf[IpcIo::BufCapacity]; // XXX: inefficient
     off_t offset;
     size_t len;
+    Ipc::Mem::PageId page;
 
     IpcIo::Command command; ///< what disker is supposed to do or did
 
@@ -70,7 +69,7 @@ public:
 protected:
     friend class IpcIoPendingRequest;
     void openCompleted(const Ipc::StrandSearchResponse *const response);
-    void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response);
+    void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
     void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
 
 private:
@@ -87,7 +86,7 @@ private:
 
     void handleNotification();
     void handleResponses(const char *when);
-    void handleResponse(const IpcIoMsg &ipcIo);
+    void handleResponse(IpcIoMsg &ipcIo);
 
     static void DiskerHandleRequests(const int workerId);
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
@@ -135,7 +134,7 @@ public:
     IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
 
     /// called when response is received and, with a nil response, on timeouts
-    void completeIo(const IpcIoMsg *const response);
+    void completeIo(IpcIoMsg *const response);
 
 public:
     const IpcIoFile::Pointer file; ///< the file object waiting for the response
index fabab730a78acbf71427410af3d05caca38434e0..b8edf95b9e0e3868f42852f5e6fea3f5b054a988 100644 (file)
@@ -19,6 +19,8 @@ class PageId {
 public:
     PageId(): pool(0), number(0) {}
 
+    operator bool() const { return pool && number; }
+
     uint32_t pool; ///< page pool ID within Squid
     // uint32_t segment; ///< memory segment ID within the pool; unused for now
     uint32_t number; ///< page number within the segment
index 8cd9bf0283f365f31ab656491d3dd6bd78e39847..843b28f1aea1334887af23a6fec63ec20a151d1e 100644 (file)
@@ -30,7 +30,7 @@ Ipc::Mem::PagePool::PagePool(const char *const id):
     theBuf = reinterpret_cast<char *>(pageIndex.getRaw()) + pagesDataOffset;
 }
 
-void *
+char *
 Ipc::Mem::PagePool::pagePointer(const PageId &page)
 {
     Must(pageIndex->pageIdIsValid(page));
index 2658fc4216e31f92f3b224109bfd2c541569a598..f2b0ef827b0ef4f8ecf6e80672f54e3affacc324 100644 (file)
@@ -34,7 +34,7 @@ public:
     /// makes identified page available as a free page to future get() callers
     void put(PageId &page) { return pageIndex->push(page); }
     /// converts page handler into a temporary writeable shared memory pointer
-    void *pagePointer(const PageId &page);
+    char *pagePointer(const PageId &page);
 
 private:
     Ipc::Mem::Pointer<PageStack> pageIndex; ///< free pages index
index fca378b72bffe399f7d99e2bd976cdd7b4914cc1..6061e3fbca04697befa40a3380a301a16a4cc5d5 100644 (file)
@@ -35,6 +35,8 @@ Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapac
 bool
 Ipc::Mem::PageStack::pop(PageId &page)
 {
+    Must(!page);
+
     // we may fail to dequeue, but be conservative to prevent long searches
     --theSize;
 
@@ -68,6 +70,9 @@ Ipc::Mem::PageStack::pop(PageId &page)
 void
 Ipc::Mem::PageStack::push(PageId &page)
 {
+    if (!page)
+        return;
+
     Must(pageIdIsValid(page));
     // find a Writable slot, starting with theFirstWritable and going right
     while (theSize < theCapacity) {
index c828c4e7f97516b40b4f9848e69d749a951ff40c..89b3ab3d8011570651e82445602cb023f9e90e3b 100644 (file)
@@ -39,7 +39,7 @@ Ipc::Mem::PutPage(PageId &page)
     ThePagePool->put(page);
 }
 
-void *
+char *
 Ipc::Mem::PagePointer(const PageId &page)
 {
     Must(ThePagePool);
index db22271435c9456093296ad84bbdb9e92c697d90..9e54c4bae1a22a203b70fe7162ee465dbe9b74fa 100644 (file)
@@ -21,7 +21,7 @@ bool GetPage(PageId &page);
 void PutPage(PageId &page);
 
 /// converts page handler into a temporary writeable shared memory pointer
-void *PagePointer(const PageId &page);
+char *PagePointer(const PageId &page);
 
 
 /* Limits and statistics */