debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
+ // protect DiskerHandleRequest() from pop queue overflow
+ if (pendingRequests() >= QueueCapacity)
+ throw Ipc::OneToOneUniQueue::Full();
+
if (queue->push(diskId, ipcIo))
Notify(diskId); // must notify disker
trackPendingRequest(ipcIo.requestId, pending);
if (queue->push(workerId, ipcIo))
Notify(workerId); // must notify worker
} catch (const Queue::Full &) {
- // The worker queue should not overflow because the worker should pop()
- // before push()ing and because if disker pops N requests at a time,
- // we should make sure the worker pop() queue length is the worker
- // push queue length plus N+1. XXX: implement the N+1 difference.
+ // The worker pop queue should not overflow because the worker can
+ // push only if pendingRequests() is less than QueueCapacity.
debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue for " <<
DbName << " overflow: " <<
SipcIo(workerId, ipcIo, KidIdentifier)); // TODO: report queue len
class IpcIoPendingRequest;
+/// In a worker process, represents a single (remote) cache_dir disker file.
+/// In a disker process, used as a bunch of static methods handling that file.
class IpcIoFile: public DiskFile
{
CBDATA_CLASS(IpcIoFile);
void push(IpcIoPendingRequest *const pending);
IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
+ /// the total number of I/O requests in push queue and pop queue
+ /// (but no, the implementation does not add push and pop queue sizes)
+ size_t pendingRequests() const { return olderRequests->size() + newerRequests->size(); }
+
static void Notify(const int peerId);
static void OpenTimeout(void *const param);