]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Adjust FewToOneBiQueue to use IDs in [1, workerCount] range.
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 19 Apr 2011 11:00:37 +0000 (15:00 +0400)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 19 Apr 2011 11:00:37 +0000 (15:00 +0400)
This allows using KidIdentifier as queue id directly without +-1 math.

src/DiskIO/IpcIo/IpcIoFile.cc
src/ipc/Queue.cc
src/ipc/Queue.h

index 11a3bd15dca4dae44e2e6ef626a80177514ed17f..ebf102738ac43438eca48c326d738e910b5e85df 100644 (file)
@@ -116,8 +116,7 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
     } else {
         diskId = response->strand.kidId;
         if (diskId >= 0) {
-            // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs.
-            workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
+            workerQueue = DiskerQueue::Attach(dbName, KidIdentifier);
             const bool inserted =
                 IpcIoFiles.insert(std::make_pair(diskId, this)).second;
             Must(inserted);
@@ -401,7 +400,7 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
     debugs(47, 7, HERE << "from " << from);
     if (IamDiskProcess()) {
         const int workerId = from;
-        DiskerHandleRequests(workerId - 1);
+        DiskerHandleRequests(workerId);
     } else {
         const int diskId = from;
         const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
@@ -628,18 +627,18 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
     else // ipcIo.command == IpcIo::cmdWrite
         diskerWrite(ipcIo);
 
-    debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
+    debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
 
     try {
         if (diskerQueue->push(workerId, ipcIo))
-            Notify(workerId + 1); // must notify worker
+            Notify(workerId); // must notify worker
     } catch (const DiskerQueue::Full &) {
         // The worker queue should not overflow because the worker should pop()
         // before push()ing and because if disker pops N requests at a time,
         // we should make sure the worker pop() queue length is the worker
         // push queue length plus N+1. XXX: implement the N+1 difference.
         debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
-               SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len
+               SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
 
         // the I/O request we could not push will timeout
     }
index c09331909f3a3e294a7fd5a37a8da8b0e57e69ec..2492cf7457096448da1aed714bd9ebc352c47e17 100644 (file)
@@ -142,7 +142,7 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const
     biQueues.reserve(theWorkerCount);
     for (int i = 0; i < theWorkerCount; ++i) {
         OneToOneBiQueue *const biQueue =
-            new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
+            new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
         QueueReader *remoteReader =
             new (shm.reserve(sizeof(QueueReader))) QueueReader;
         biQueue->readers(reader, remoteReader);
@@ -158,11 +158,11 @@ FewToOneBiQueue::Attach(const String &id, const int workerId)
 
     Ipc::Mem::Segment &shm = *shmPtr;
     shm.open();
-    assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
+    assert(shm.size() >= static_cast<off_t>((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader)));
     QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
     QueueReader *remoteReader = &readers[0];
     debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
-    QueueReader *localReader = &readers[workerId+1];
+    QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset];
     debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
 
     OneToOneBiQueue *const biQueue =
@@ -179,7 +179,8 @@ FewToOneBiQueue::~FewToOneBiQueue()
 
 bool FewToOneBiQueue::validWorkerId(const int workerId) const
 {
-    return 0 <= workerId && workerId < theWorkerCount;
+    return WorkerIdOffset <= workerId &&
+        workerId < WorkerIdOffset + theWorkerCount;
 }
 
 void
index a766d103d556783a95e2b302d5a83cda80c30d0e..847b3ad9e95f527ebefea731c00be8a01110eea9 100644 (file)
@@ -131,7 +131,7 @@ public:
  * communicate with the one central process. The central process uses
  * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
  * created with the Attach() method. Each worker has a unique integer
- * ID in [0, workerCount) range.
+ * ID in [1, workerCount] range.
  */
 class FewToOneBiQueue {
 public:
@@ -161,6 +161,8 @@ public:
 
     Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
     QueueReader *reader; ///< the state of the code popping from all biQueues
+
+    enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
 };
 
 
@@ -222,7 +224,7 @@ FewToOneBiQueue::pop(int &workerId, Value &value)
     for (int i = 0; i < theWorkerCount; ++i) {
         theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
         if (biQueues[theLastPopWorker]->pop(value)) {
-            workerId = theLastPopWorker;
+            workerId = theLastPopWorker + WorkerIdOffset;
             return true;
         }
     }
@@ -234,7 +236,7 @@ bool
 FewToOneBiQueue::push(const int workerId, const Value &value)
 {
     assert(validWorkerId(workerId));
-    return biQueues[workerId]->push(value);
+    return biQueues[workerId - WorkerIdOffset]->push(value);
 }
 
 #endif // SQUID_IPC_QUEUE_H