This change allows using KidIdentifier as the queue ID directly, without any +1/-1 offset math at the call sites.
} else {
diskId = response->strand.kidId;
if (diskId >= 0) {
- // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs.
- workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
+ workerQueue = DiskerQueue::Attach(dbName, KidIdentifier);
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
Must(inserted);
debugs(47, 7, HERE << "from " << from);
if (IamDiskProcess()) {
const int workerId = from;
- DiskerHandleRequests(workerId - 1);
+ DiskerHandleRequests(workerId);
} else {
const int diskId = from;
const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
else // ipcIo.command == IpcIo::cmdWrite
diskerWrite(ipcIo);
- debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
+ debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
try {
if (diskerQueue->push(workerId, ipcIo))
- Notify(workerId + 1); // must notify worker
+ Notify(workerId); // must notify worker
} catch (const DiskerQueue::Full &) {
// The worker queue should not overflow because the worker should pop()
// before push()ing and because if disker pops N requests at a time,
// we should make sure the worker pop() queue length is the worker
// push queue length plus N+1. XXX: implement the N+1 difference.
debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
- SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len
+ SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
// the I/O request we could not push will timeout
}
biQueues.reserve(theWorkerCount);
for (int i = 0; i < theWorkerCount; ++i) {
OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
+ new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
QueueReader *remoteReader =
new (shm.reserve(sizeof(QueueReader))) QueueReader;
biQueue->readers(reader, remoteReader);
Ipc::Mem::Segment &shm = *shmPtr;
shm.open();
- assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
+ assert(shm.size() >= static_cast<off_t>((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader)));
QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
QueueReader *remoteReader = &readers[0];
debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
- QueueReader *localReader = &readers[workerId+1];
+ QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset];
debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
OneToOneBiQueue *const biQueue =
bool FewToOneBiQueue::validWorkerId(const int workerId) const
{
- return 0 <= workerId && workerId < theWorkerCount;
+ return WorkerIdOffset <= workerId &&
+ workerId < WorkerIdOffset + theWorkerCount;
}
void
* communicate with the one central process. The central process uses
* FewToOneBiQueue object, while workers use OneToOneBiQueue objects
* created with the Attach() method. Each worker has a unique integer
- * ID in [0, workerCount) range.
+ * ID in [1, workerCount] range.
*/
class FewToOneBiQueue {
public:
Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
QueueReader *reader; ///< the state of the code popping from all biQueues
+
+ enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
};
for (int i = 0; i < theWorkerCount; ++i) {
theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
if (biQueues[theLastPopWorker]->pop(value)) {
- workerId = theLastPopWorker;
+ workerId = theLastPopWorker + WorkerIdOffset;
return true;
}
}
FewToOneBiQueue::push(const int workerId, const Value &value)
{
    // Callers address workers by kid ID; biQueues is 0-based, so shift
    // by WorkerIdOffset before indexing the per-worker queue.
    assert(validWorkerId(workerId));
    const int index = workerId - WorkerIdOffset;
    return biQueues[index]->push(value);
}
#endif // SQUID_IPC_QUEUE_H