/* IpcIoFile */
IpcIoFile::IpcIoFile(char const *aDb):
- dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
+ dbName(aDb),
+ myPid(getpid()),
+ diskId(-1),
+ error_(false),
+ lastRequestId(0),
olderRequests(&requestMap1), newerRequests(&requestMap2),
timeoutCheckScheduled(false)
{
+ assert(myPid >= 0);
}
IpcIoFile::~IpcIoFile()
queue->localRateLimit().store(config.ioRate);
- Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
+ Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, myPid));
ann.strand.tag = dbName;
Ipc::TypedMsgHdr message;
ann.pack(message);
++lastRequestId;
ipcIo.requestId = lastRequestId;
ipcIo.start = current_time;
+ ipcIo.workerPid = myPid;
if (pending->readRequest) {
ipcIo.command = IpcIo::cmdRead;
ipcIo.offset = pending->readRequest->offset;
if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
CallBack(pending->codeContext, [&] {
debugs(47, 7, "popped disker response to " << SipcIo(KidIdentifier, ipcIo, diskId));
- pending->completeIo(&ipcIo);
+ if (myPid == ipcIo.workerPid)
+ pending->completeIo(&ipcIo);
+ else
+ debugs(47, 5, "ignoring response meant for our predecessor PID: " << ipcIo.workerPid);
delete pending; // XXX: leaking if throwing
});
} else {
requestId(0),
offset(0),
len(0),
+ workerPid(-1), // Unix-like systems use process IDs starting from 0
command(IpcIo::cmdNone),
xerrno(0)
{
os << "id: " << requestId <<
", offset: " << offset <<
", size: " << len <<
+ ", workerPid: " << workerPid <<
", page: " << page <<
", command: " << command <<
", start: " << start <<
return false;
// is there an I/O request we could potentially delay?
- int processId;
+ int kidId;
IpcIoMsg ipcIo;
- if (!queue->peek(processId, ipcIo)) {
+ if (!queue->peek(kidId, ipcIo)) {
// unlike pop(), peek() is not reliable and does not block reader
// so we must proceed with pop() even if it is likely to fail
return false;
ipcIo.len << " at " << ipcIo.offset <<
" ipcIo" << workerId << '.' << ipcIo.requestId);
+ const auto workerPid = ipcIo.workerPid;
+ assert(workerPid >= 0);
+
if (ipcIo.command == IpcIo::cmdRead)
diskerRead(ipcIo);
else // ipcIo.command == IpcIo::cmdWrite
diskerWrite(ipcIo);
+ assert(ipcIo.workerPid == workerPid);
+
debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
try {
off_t offset;
size_t len;
Ipc::Mem::PageId page;
+ pid_t workerPid; ///< the process ID of the I/O requestor
IpcIo::Command command; ///< what disker is supposed to do or did
struct timeval start; ///< when the I/O request was converted to IpcIoMsg
private:
const String dbName; ///< the name of the file we are managing
- int diskId; ///< the process ID of the disker we talk to
+ const pid_t myPid; ///< optimization: cached process ID of our process
+ int diskId; ///< the kid ID of the disker we talk to
RefCount<IORequestor> ioRequestor;
bool error_; ///< whether we have seen at least one I/O error (XXX)