From: Dmitry Kurochkin Date: Mon, 25 Apr 2011 19:24:35 +0000 (+0400) Subject: Use shared pages in IpcIoFile instead of passing data through shared queues. X-Git-Tag: take07~38 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=8ed94021d5a7d09070f81cddaf52d23b02454e40;p=thirdparty%2Fsquid.git Use shared pages in IpcIoFile instead of passing data through shared queues. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index ee323e2c31..1cc4a81f78 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -14,6 +14,7 @@ #include "ipc/Port.h" #include "ipc/StrandSearch.h" #include "ipc/UdsOp.h" +#include "ipc/mem/Pages.h" CBDATA_CLASS_INIT(IpcIoFile); @@ -201,7 +202,7 @@ IpcIoFile::read(ReadRequest *readRequest) void IpcIoFile::readCompleted(ReadRequest *readRequest, - const IpcIoMsg *const response) + IpcIoMsg *const response) { bool ioError = false; if (!response) { @@ -211,10 +212,18 @@ IpcIoFile::readCompleted(ReadRequest *readRequest, if (response->xerrno) { debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); ioError = error_ = true; + } + else + if (!response->page) { + debugs(79,1, HERE << "error: run out of shared memory pages"); + ioError = true; } else { - memcpy(readRequest->buf, response->buf, response->len); + const char *const buf = Ipc::Mem::PagePointer(response->page); + memcpy(readRequest->buf, buf, response->len); } + Ipc::Mem::PutPage(response->page); + const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; const int errflag = ioError ? DISK_ERROR : DISK_OK; ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest); @@ -300,24 +309,27 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) Must(pending->readRequest || pending->writeRequest); IpcIoMsg ipcIo; - ipcIo.requestId = lastRequestId; - if (pending->readRequest) { - ipcIo.command = IpcIo::cmdRead; - ipcIo.offset = pending->readRequest->offset; - ipcIo.len = pending->readRequest->len; - assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away - } else { // pending->writeRequest - ipcIo.command = IpcIo::cmdWrite; - ipcIo.offset = pending->writeRequest->offset; - ipcIo.len = pending->writeRequest->len; - assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away - } + try { + ipcIo.requestId = lastRequestId; + if (pending->readRequest) { + ipcIo.command = IpcIo::cmdRead; + ipcIo.offset = pending->readRequest->offset; + ipcIo.len = pending->readRequest->len; + } else { // pending->writeRequest + Must(pending->writeRequest->len <= Ipc::Mem::PageSize()); + if (!Ipc::Mem::GetPage(ipcIo.page)) { + ipcIo.len = 0; + throw TexcHere("run out of shared memory pages"); + } + ipcIo.command = IpcIo::cmdWrite; + ipcIo.offset = pending->writeRequest->offset; + ipcIo.len = pending->writeRequest->len; + char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away + } - debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size()); + debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size()); - try { if (workerQueue->push(ipcIo)) Notify(diskId); // must notify disker trackPendingRequest(pending); @@ -325,7 +337,11 @@ IpcIoFile::push(IpcIoPendingRequest *const pending) debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " << SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len // TODO: grow queue size - + + pending->completeIo(NULL); // XXX: should distinguish this from timeout + delete pending; + } catch (const TextException &e) { + debugs(47, DBG_IMPORTANT, HERE << e.what()); pending->completeIo(NULL); // XXX: should distinguish this from timeout delete pending; } @@ -370,7 +386,7 @@ IpcIoFile::handleResponses(const char *when) } void -IpcIoFile::handleResponse(const IpcIoMsg &ipcIo) +IpcIoFile::handleResponse(IpcIoMsg &ipcIo) { const int requestId = ipcIo.requestId; debugs(47, 7, HERE << "popped disker response: " << @@ -530,7 +546,7 @@ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): } void -IpcIoPendingRequest::completeIo(const IpcIoMsg *const response) +IpcIoPendingRequest::completeIo(IpcIoMsg *const response) { if (readRequest) file->readCompleted(readRequest, response); @@ -552,7 +568,14 @@ static int TheFile = -1; ///< db file descriptor static void diskerRead(IpcIoMsg &ipcIo) { - const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); + if (!Ipc::Mem::GetPage(ipcIo.page)) { + ipcIo.len = 0; + debugs(47,5, HERE << "run out of shared memory pages"); + return; + } + + char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); statCounter.syscalls.disk.reads++; fd_bytes(TheFile, read, FD_READ); @@ -573,7 +596,8 @@ diskerRead(IpcIoMsg &ipcIo) static void diskerWrite(IpcIoMsg &ipcIo) { - const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); + const char *const buf = Ipc::Mem::PagePointer(ipcIo.page); + const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset); statCounter.syscalls.disk.writes++; fd_bytes(TheFile, wrote, FD_WRITE); @@ -589,6 +613,8 @@ diskerWrite(IpcIoMsg &ipcIo) debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << ipcIo.xerrno); } + + Ipc::Mem::PutPage(ipcIo.page); } void diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index de8e85b9f0..fd0865e22b 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -7,6 +7,7 @@ #include "DiskIO/IORequestor.h" #include "ipc/forward.h" #include "ipc/Queue.h" +#include "ipc/mem/Page.h" #include #include @@ -16,8 +17,6 @@ namespace IpcIo { /// what kind of I/O the disker needs to do or have done typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command; -enum { BufCapacity = 32*1024 }; - } // namespace IpcIo @@ -29,9 +28,9 @@ public: public: unsigned int requestId; ///< unique for requestor; matches request w/ response - char buf[IpcIo::BufCapacity]; // XXX: inefficient off_t offset; size_t len; + Ipc::Mem::PageId page; IpcIo::Command command; ///< what disker is supposed to do or did @@ -70,7 +69,7 @@ public: protected: friend class IpcIoPendingRequest; void openCompleted(const Ipc::StrandSearchResponse *const response); - void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response); + void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response); void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); private: @@ -87,7 +86,7 @@ private: void handleNotification(); void handleResponses(const char *when); - void handleResponse(const IpcIoMsg &ipcIo); + void handleResponse(IpcIoMsg &ipcIo); static void DiskerHandleRequests(const int workerId); static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); @@ -135,7 +134,7 @@ public: IpcIoPendingRequest(const IpcIoFile::Pointer &aFile); /// called when response is received and, with a nil response, on timeouts - void completeIo(const IpcIoMsg *const response); + void completeIo(IpcIoMsg *const response); public: const IpcIoFile::Pointer file; ///< the file object waiting for the response diff --git a/src/ipc/mem/Page.h b/src/ipc/mem/Page.h index fabab730a7..b8edf95b9e 100644 --- a/src/ipc/mem/Page.h +++ b/src/ipc/mem/Page.h @@ -19,6 +19,8 @@ class PageId { public: PageId(): pool(0), number(0) {} + operator bool() const { return pool && number; } + uint32_t pool; ///< page pool ID within Squid // uint32_t segment; ///< memory segment ID within the pool; unused for now uint32_t number; ///< page number within the segment diff --git a/src/ipc/mem/PagePool.cc b/src/ipc/mem/PagePool.cc index 8cd9bf0283..843b28f1ae 100644 --- a/src/ipc/mem/PagePool.cc +++ b/src/ipc/mem/PagePool.cc @@ -30,7 +30,7 @@ Ipc::Mem::PagePool::PagePool(const char *const id): theBuf = reinterpret_cast(pageIndex.getRaw()) + pagesDataOffset; } -void * +char * Ipc::Mem::PagePool::pagePointer(const PageId &page) { Must(pageIndex->pageIdIsValid(page)); diff --git a/src/ipc/mem/PagePool.h b/src/ipc/mem/PagePool.h index 2658fc4216..f2b0ef827b 100644 --- a/src/ipc/mem/PagePool.h +++ b/src/ipc/mem/PagePool.h @@ -34,7 +34,7 @@ public: /// makes identified page available as a free page to future get() callers void put(PageId &page) { return pageIndex->push(page); } /// converts page handler into a temporary writeable shared memory pointer - void *pagePointer(const PageId &page); + char *pagePointer(const PageId &page); private: Ipc::Mem::Pointer pageIndex; ///< free pages index diff --git a/src/ipc/mem/PageStack.cc b/src/ipc/mem/PageStack.cc index fca378b72b..6061e3fbca 100644 --- a/src/ipc/mem/PageStack.cc +++ b/src/ipc/mem/PageStack.cc @@ -35,6 +35,8 @@ Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapac bool Ipc::Mem::PageStack::pop(PageId &page) { + Must(!page); + // we may fail to dequeue, but be conservative to prevent long searches --theSize; @@ -68,6 +70,9 @@ Ipc::Mem::PageStack::pop(PageId &page) void Ipc::Mem::PageStack::push(PageId &page) { + if (!page) + return; + Must(pageIdIsValid(page)); // find a Writable slot, starting with theFirstWritable and going right while (theSize < theCapacity) { diff --git a/src/ipc/mem/Pages.cc b/src/ipc/mem/Pages.cc index c828c4e7f9..89b3ab3d80 100644 --- a/src/ipc/mem/Pages.cc +++ b/src/ipc/mem/Pages.cc @@ -39,7 +39,7 @@ Ipc::Mem::PutPage(PageId &page) ThePagePool->put(page); } -void * +char * Ipc::Mem::PagePointer(const PageId &page) { Must(ThePagePool); diff --git a/src/ipc/mem/Pages.h b/src/ipc/mem/Pages.h index db22271435..9e54c4bae1 100644 --- a/src/ipc/mem/Pages.h +++ b/src/ipc/mem/Pages.h @@ -21,7 +21,7 @@ bool GetPage(PageId &page); void PutPage(PageId &page); /// converts page handler into a temporary writeable shared memory pointer -void *PagePointer(const PageId &page); +char *PagePointer(const PageId &page); /* Limits and statistics */