From: Dmitry Kurochkin Date: Thu, 17 Feb 2011 11:37:59 +0000 (+0300) Subject: IpcIoFile atomic queue v2. X-Git-Tag: take04~1^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a1c9883034b4016c1a49fec9d93b86810ced5b5a;p=thirdparty%2Fsquid.git IpcIoFile atomic queue v2. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 30a344d69a..4514fa7153 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -43,7 +43,6 @@ IpcIoFile::~IpcIoFile() void IpcIoFile::open(int flags, mode_t mode, RefCount callback) { - debugs(0,0,HERE); ioRequestor = callback; Must(diskId < 0); // we do not know our disker yet Must(!diskerQueue && !workerQueue); @@ -69,6 +68,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) Ipc::StrandSearchRequest request; request.requestorId = KidIdentifier; + request.data = this; request.tag = dbName; Ipc::TypedMsgHdr msg; @@ -81,17 +81,20 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) void IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { - debugs(0,0,HERE); Must(diskId < 0); // we do not know our disker yet Must(!workerQueue); + // contain single pending open request at this time + newerRequests->clear(); + olderRequests->clear(); + if (!response) { debugs(79,1, HERE << "error: timeout"); error_ = true; } else { diskId = response->strand.kidId; if (diskId >= 0) { - workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier); + workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier - 1); ipcIoFiles.insert(std::make_pair(diskId, this)); } else { error_ = true; @@ -109,7 +112,6 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { void IpcIoFile::create(int flags, mode_t mode, RefCount callback) { - debugs(0,0,HERE); assert(false); // check /* We use the same logic path for open */ open(flags, mode, callback); @@ -118,7 +120,6 @@ IpcIoFile::create(int flags, mode_t mode, RefCount callback) void IpcIoFile::close() { - debugs(0,0,HERE); assert(ioRequestor != NULL); delete diskerQueue; @@ -172,7 +173,6 @@ void IpcIoFile::readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response) { - debugs(0,0,HERE); bool ioError = false; if (!response) { debugs(79,1, HERE << "error: timeout"); @@ -214,7 +214,6 @@ void IpcIoFile::writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response) { - debugs(0,0,HERE); bool ioError = false; if (!response) { debugs(79,1, HERE << "error: timeout"); @@ -252,7 +251,6 @@ IpcIoFile::ioInProgress() const void IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending) { - debugs(0,0,HERE); newerRequests->insert(std::make_pair(pending.id, &pending)); if (!timeoutCheckScheduled) scheduleTimeoutCheck(); @@ -262,12 +260,13 @@ IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending) void IpcIoFile::push(IpcIoPendingRequest &pending) { - debugs(0,0,HERE); + debugs(47, 7, HERE); Must(diskId >= 0); Must(workerQueue); Must(pending.readRequest || pending.writeRequest); IpcIoMsg ipcIo; + ipcIo.requestId = pending.id; if (pending.readRequest) { ipcIo.command = IpcIo::cmdRead; ipcIo.offset = pending.readRequest->offset; @@ -331,7 +330,7 @@ IpcIoFile::handleResponse(const IpcIoMsg &ipcIo) void IpcIoFile::Notify(const int peerId) { - debugs(0,0,HERE); + debugs(47, 7, HERE << "kid" << peerId); Ipc::TypedMsgHdr msg; msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? msg.putInt(KidIdentifier); @@ -342,7 +341,7 @@ IpcIoFile::Notify(const int peerId) void IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) { - debugs(0,0,HERE); + debugs(47, 7, HERE); if (IamDiskProcess()) DiskerHandleRequests(); else { @@ -357,7 +356,6 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) void IpcIoFile::CheckTimeouts(void *const param) { - debugs(0,0,HERE); Must(param); reinterpret_cast(param)->checkTimeouts(); } @@ -365,7 +363,6 @@ IpcIoFile::CheckTimeouts(void *const param) void IpcIoFile::checkTimeouts() { - debugs(0,0,HERE); timeoutCheckScheduled = false; // any old request would have timed out by now @@ -391,7 +388,6 @@ IpcIoFile::checkTimeouts() void IpcIoFile::scheduleTimeoutCheck() { - debugs(0,0,HERE); // we check all older requests at once so some may be wait for 2*timeout const double timeout = 7; // in seconds eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, @@ -403,7 +399,6 @@ IpcIoFile::scheduleTimeoutCheck() IpcIoPendingRequest * IpcIoFile::dequeueRequest(const unsigned int requestId) { - debugs(47, 3, HERE); Must(requestId != 0); RequestMap *map = NULL; @@ -445,7 +440,6 @@ IpcIoMsg::IpcIoMsg(): IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): file(aFile), readRequest(NULL), writeRequest(NULL) { - debugs(0,0,HERE); if (++file->lastRequestId == 0) // don't use zero value as requestId ++file->lastRequestId; id = file->lastRequestId; @@ -454,7 +448,6 @@ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): void IpcIoPendingRequest::completeIo(const IpcIoMsg *const response) { - debugs(0,0,HERE); Must(file != NULL); if (readRequest) @@ -477,7 +470,6 @@ static int TheFile = -1; ///< db file descriptor static void diskerRead(IpcIoMsg &ipcIo) { - debugs(0,0,HERE); const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); statCounter.syscalls.disk.reads++; fd_bytes(TheFile, read, FD_READ); @@ -499,7 +491,6 @@ diskerRead(IpcIoMsg &ipcIo) static void diskerWrite(IpcIoMsg &ipcIo) { - debugs(0,0,HERE); const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); statCounter.syscalls.disk.writes++; fd_bytes(TheFile, wrote, FD_WRITE); @@ -521,7 +512,6 @@ diskerWrite(IpcIoMsg &ipcIo) void IpcIoFile::DiskerHandleRequests() { - debugs(0,0,HERE); Must(diskerQueue); int workerId; IpcIoMsg ipcIo; @@ -533,7 +523,6 @@ IpcIoFile::DiskerHandleRequests() void IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) { - debugs(0,0,HERE); Must(diskerQueue); if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { @@ -556,13 +545,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) diskerQueue->push(workerId, ipcIo); // XXX: report warning if (diskerQueue->pushedSize(workerId) == 1) - Notify(workerId); // notify worker + Notify(workerId + 1); // notify worker } static bool DiskerOpen(const String &path, int flags, mode_t mode) { - debugs(0,0,HERE); assert(TheFile < 0); TheFile = file_open(path.termedBuf(), flags); @@ -582,11 +570,10 @@ DiskerOpen(const String &path, int flags, mode_t mode) static void DiskerClose(const String &path) { - debugs(0,0,HERE); if (TheFile >= 0) { file_close(TheFile); debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); TheFile = -1; store_open_disk_fd--; - } + } } diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index 8a8e5120dd..0e76e04c8a 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -67,6 +67,7 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) case mtIpcIoNotification: IpcIoFile::HandleNotification(message); + break; case mtCacheMgrRequest: handleCacheMgrRequest(Mgr::Request(message)); diff --git a/src/ipc/TypedMsgHdr.h b/src/ipc/TypedMsgHdr.h index e94d14f5f7..95afe00dd6 100644 --- a/src/ipc/TypedMsgHdr.h +++ b/src/ipc/TypedMsgHdr.h @@ -27,7 +27,7 @@ namespace Ipc class TypedMsgHdr: public msghdr { public: - enum {maxSize = 36*1024}; // XXX: so that disker I/O can fit + enum {maxSize = 4096}; public: TypedMsgHdr();