From: Dmitry Kurochkin Date: Tue, 19 Apr 2011 11:00:37 +0000 (+0400) Subject: Adjust FewToOneBiQueue to use IDs in [1, workerCount] range. X-Git-Tag: take06~7 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=5e44782ee7b532848b596d92ccf6ee5b738834ec;p=thirdparty%2Fsquid.git Adjust FewToOneBiQueue to use IDs in [1, workerCount] range. This allows using KidIdentifier as queue id directly without +-1 math. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 11a3bd15dc..ebf102738a 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -116,8 +116,7 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { } else { diskId = response->strand.kidId; if (diskId >= 0) { - // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs. - workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1); + workerQueue = DiskerQueue::Attach(dbName, KidIdentifier); const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second; Must(inserted); @@ -401,7 +400,7 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) debugs(47, 7, HERE << "from " << from); if (IamDiskProcess()) { const int workerId = from; - DiskerHandleRequests(workerId - 1); + DiskerHandleRequests(workerId); } else { const int diskId = from; const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId); @@ -628,18 +627,18 @@ IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) else // ipcIo.command == IpcIo::cmdWrite diskerWrite(ipcIo); - debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size()); + debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size()); try { if (diskerQueue->push(workerId, ipcIo)) - Notify(workerId + 1); // must notify worker + Notify(workerId); // must notify worker } catch (const DiskerQueue::Full &) { // The worker queue should not overflow because the worker should pop() // before push()ing and because if disker pops N requests at a time, // we should make sure the worker pop() queue length is the worker // push queue length plus N+1. XXX: implement the N+1 difference. debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " << - SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len + SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len // the I/O request we could not push will timeout } diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index c09331909f..2492cf7457 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -142,7 +142,7 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const biQueues.reserve(theWorkerCount); for (int i = 0; i < theWorkerCount; ++i) { OneToOneBiQueue *const biQueue = - new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity); + new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity); QueueReader *remoteReader = new (shm.reserve(sizeof(QueueReader))) QueueReader; biQueue->readers(reader, remoteReader); @@ -158,11 +158,11 @@ FewToOneBiQueue::Attach(const String &id, const int workerId) Ipc::Mem::Segment &shm = *shmPtr; shm.open(); - assert(shm.size() >= static_cast((1 + workerId+1)*sizeof(QueueReader))); + assert(shm.size() >= static_cast((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader))); QueueReader *readers = reinterpret_cast(shm.mem()); QueueReader *remoteReader = &readers[0]; debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id); - QueueReader *localReader = &readers[workerId+1]; + QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset]; debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id); OneToOneBiQueue *const biQueue = @@ -179,7 +179,8 @@ FewToOneBiQueue::~FewToOneBiQueue() bool FewToOneBiQueue::validWorkerId(const int workerId) const { - return 0 <= workerId && workerId < theWorkerCount; + return WorkerIdOffset <= workerId && + workerId < WorkerIdOffset + theWorkerCount; } void diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index a766d103d5..847b3ad9e9 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -131,7 +131,7 @@ public: * communicate with the one central process. The central process uses * FewToOneBiQueue object, while workers use OneToOneBiQueue objects * created with the Attach() method. Each worker has a unique integer - * ID in [0, workerCount) range. + * ID in [1, workerCount] range. */ class FewToOneBiQueue { public: @@ -161,6 +161,8 @@ public: Ipc::Mem::Segment shm; ///< shared memory segment to store the reader QueueReader *reader; ///< the state of the code popping from all biQueues + + enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now }; @@ -222,7 +224,7 @@ FewToOneBiQueue::pop(int &workerId, Value &value) for (int i = 0; i < theWorkerCount; ++i) { theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount; if (biQueues[theLastPopWorker]->pop(value)) { - workerId = theLastPopWorker; + workerId = theLastPopWorker + WorkerIdOffset; return true; } } @@ -234,7 +236,7 @@ bool FewToOneBiQueue::push(const int workerId, const Value &value) { assert(validWorkerId(workerId)); - return biQueues[workerId]->push(value); + return biQueues[workerId - WorkerIdOffset]->push(value); } #endif // SQUID_IPC_QUEUE_H