]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IpcIoFile atomic queue v2.
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Thu, 17 Feb 2011 11:37:59 +0000 (14:37 +0300)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Thu, 17 Feb 2011 11:37:59 +0000 (14:37 +0300)
src/DiskIO/IpcIo/IpcIoFile.cc
src/ipc/Strand.cc
src/ipc/TypedMsgHdr.h

index 30a344d69ac1c47df738c5c67d99d07edeea2611..4514fa7153eee69123fe6f955f2c0147f2ca7c02 100644 (file)
@@ -43,7 +43,6 @@ IpcIoFile::~IpcIoFile()
 void
 IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 {
-    debugs(0,0,HERE);
     ioRequestor = callback;
     Must(diskId < 0); // we do not know our disker yet
     Must(!diskerQueue && !workerQueue);
@@ -69,6 +68,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
     Ipc::StrandSearchRequest request;
     request.requestorId = KidIdentifier;
+    request.data = this;
     request.tag = dbName;
 
     Ipc::TypedMsgHdr msg;
@@ -81,17 +81,20 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
 void
 IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
-    debugs(0,0,HERE);
     Must(diskId < 0); // we do not know our disker yet
     Must(!workerQueue);
 
+    // contain single pending open request at this time
+    newerRequests->clear();
+    olderRequests->clear();
+
     if (!response) {
         debugs(79,1, HERE << "error: timeout");
         error_ = true;
     } else {
         diskId = response->strand.kidId;
         if (diskId >= 0) {
-            workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier);
+            workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier - 1);
             ipcIoFiles.insert(std::make_pair(diskId, this));
         } else {
             error_ = true;
@@ -109,7 +112,6 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
 void
 IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
 {
-    debugs(0,0,HERE);
     assert(false); // check
     /* We use the same logic path for open */
     open(flags, mode, callback);
@@ -118,7 +120,6 @@ IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
 void
 IpcIoFile::close()
 {
-    debugs(0,0,HERE);
     assert(ioRequestor != NULL);
 
     delete diskerQueue;
@@ -172,7 +173,6 @@ void
 IpcIoFile::readCompleted(ReadRequest *readRequest,
                          const IpcIoMsg *const response)
 {
-    debugs(0,0,HERE);
     bool ioError = false;
     if (!response) {
         debugs(79,1, HERE << "error: timeout");
@@ -214,7 +214,6 @@ void
 IpcIoFile::writeCompleted(WriteRequest *writeRequest,
                           const IpcIoMsg *const response)
 {
-    debugs(0,0,HERE);
     bool ioError = false;
     if (!response) {
         debugs(79,1, HERE << "error: timeout");
@@ -252,7 +251,6 @@ IpcIoFile::ioInProgress() const
 void
 IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
 {
-    debugs(0,0,HERE);
     newerRequests->insert(std::make_pair(pending.id, &pending));
     if (!timeoutCheckScheduled)
         scheduleTimeoutCheck();
@@ -262,12 +260,13 @@ IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
 void
 IpcIoFile::push(IpcIoPendingRequest &pending)
 {
-    debugs(0,0,HERE);
+    debugs(47, 7, HERE);
     Must(diskId >= 0);
     Must(workerQueue);
     Must(pending.readRequest || pending.writeRequest);
 
     IpcIoMsg ipcIo;
+    ipcIo.requestId = pending.id;
     if (pending.readRequest) {
         ipcIo.command = IpcIo::cmdRead;
         ipcIo.offset = pending.readRequest->offset;
@@ -331,7 +330,7 @@ IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
 void
 IpcIoFile::Notify(const int peerId)
 {
-    debugs(0,0,HERE);
+    debugs(47, 7, HERE << "kid" << peerId);
     Ipc::TypedMsgHdr msg;
     msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
     msg.putInt(KidIdentifier);
@@ -342,7 +341,7 @@ IpcIoFile::Notify(const int peerId)
 void
 IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
 {
-    debugs(0,0,HERE);
+    debugs(47, 7, HERE);
     if (IamDiskProcess())
         DiskerHandleRequests();
     else {
@@ -357,7 +356,6 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
 void
 IpcIoFile::CheckTimeouts(void *const param)
 {
-    debugs(0,0,HERE);
     Must(param);
     reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
 }
@@ -365,7 +363,6 @@ IpcIoFile::CheckTimeouts(void *const param)
 void
 IpcIoFile::checkTimeouts()
 {
-    debugs(0,0,HERE);
     timeoutCheckScheduled = false;
 
     // any old request would have timed out by now
@@ -391,7 +388,6 @@ IpcIoFile::checkTimeouts()
 void
 IpcIoFile::scheduleTimeoutCheck()
 {
-    debugs(0,0,HERE);
     // we check all older requests at once so some may be wait for 2*timeout
     const double timeout = 7; // in seconds
     eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
@@ -403,7 +399,6 @@ IpcIoFile::scheduleTimeoutCheck()
 IpcIoPendingRequest *
 IpcIoFile::dequeueRequest(const unsigned int requestId)
 {
-    debugs(47, 3, HERE);
     Must(requestId != 0);
 
     RequestMap *map = NULL;
@@ -445,7 +440,6 @@ IpcIoMsg::IpcIoMsg():
 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
     file(aFile), readRequest(NULL), writeRequest(NULL)
 {
-    debugs(0,0,HERE);
     if (++file->lastRequestId == 0) // don't use zero value as requestId
         ++file->lastRequestId;
     id = file->lastRequestId;
@@ -454,7 +448,6 @@ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
 void
 IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
 {
-    debugs(0,0,HERE);
     Must(file != NULL);
 
     if (readRequest)
@@ -477,7 +470,6 @@ static int TheFile = -1; ///< db file descriptor
 static void
 diskerRead(IpcIoMsg &ipcIo)
 {
-    debugs(0,0,HERE);
     const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
     statCounter.syscalls.disk.reads++;
     fd_bytes(TheFile, read, FD_READ);
@@ -499,7 +491,6 @@ diskerRead(IpcIoMsg &ipcIo)
 static void
 diskerWrite(IpcIoMsg &ipcIo)
 {
-    debugs(0,0,HERE);
     const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
     statCounter.syscalls.disk.writes++;
     fd_bytes(TheFile, wrote, FD_WRITE);
@@ -521,7 +512,6 @@ diskerWrite(IpcIoMsg &ipcIo)
 void
 IpcIoFile::DiskerHandleRequests()
 {
-    debugs(0,0,HERE);
     Must(diskerQueue);
     int workerId;
     IpcIoMsg ipcIo;
@@ -533,7 +523,6 @@ IpcIoFile::DiskerHandleRequests()
 void
 IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
 {
-    debugs(0,0,HERE);
     Must(diskerQueue);
 
     if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
@@ -556,13 +545,12 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
     diskerQueue->push(workerId, ipcIo); // XXX: report warning
 
     if (diskerQueue->pushedSize(workerId) == 1)
-        Notify(workerId); // notify worker
+        Notify(workerId + 1); // notify worker
 }
 
 static bool
 DiskerOpen(const String &path, int flags, mode_t mode)
 {
-    debugs(0,0,HERE);
     assert(TheFile < 0);
 
     TheFile = file_open(path.termedBuf(), flags);
@@ -582,11 +570,10 @@ DiskerOpen(const String &path, int flags, mode_t mode)
 static void
 DiskerClose(const String &path)
 {
-    debugs(0,0,HERE);
     if (TheFile >= 0) {
         file_close(TheFile);
         debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
         TheFile = -1;
         store_open_disk_fd--;
-       }
+    }
 }
index 8a8e5120dd3420a6911673cb54bf6eef7a310e2c..0e76e04c8a4b557b917d031dd9ede26745a3747c 100644 (file)
@@ -67,6 +67,7 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
 
     case mtIpcIoNotification:
         IpcIoFile::HandleNotification(message);
+        break;
 
     case mtCacheMgrRequest:
         handleCacheMgrRequest(Mgr::Request(message));
index e94d14f5f7625bb1b8f671cbd8a0511fb4ce6370..95afe00dd67201606bae090f2e26e68ac79c77fb 100644 (file)
@@ -27,7 +27,7 @@ namespace Ipc
 class TypedMsgHdr: public msghdr
 {
 public:
-    enum {maxSize = 36*1024}; // XXX: so that disker I/O can fit
+    enum {maxSize = 4096};
 
 public:
     TypedMsgHdr();