From: Dmitry Kurochkin Date: Wed, 16 Feb 2011 16:19:26 +0000 (+0300) Subject: IpcIoFile atomic queue v1. X-Git-Tag: take04~1^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9a51593dfc8ef323a688ba66f5b363aee85e2e5d;p=thirdparty%2Fsquid.git IpcIoFile atomic queue v1. --- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index 0f7cd69f78..f7349e0c6f 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -12,17 +12,13 @@ #include "DiskIO/WriteRequest.h" #include "ipc/Messages.h" #include "ipc/Port.h" -#include "ipc/StrandCoord.h" +#include "ipc/StrandSearch.h" #include "ipc/UdsOp.h" CBDATA_CLASS_INIT(IpcIoFile); -IpcIoFile::RequestMap IpcIoFile::TheRequestMap1; -IpcIoFile::RequestMap IpcIoFile::TheRequestMap2; -IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1; -IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2; -bool IpcIoFile::TimeoutCheckScheduled = false; -unsigned int IpcIoFile::LastRequestId = 0; +IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL; +IpcIoFile::IpcIoFiles IpcIoFile::ipcIoFiles; static bool DiskerOpen(const String &path, int flags, mode_t mode); static void DiskerClose(const String &path); @@ -31,8 +27,12 @@ static void DiskerClose(const String &path); IpcIoFile::IpcIoFile(char const *aDb): dbName(aDb), diskId(-1), - ioLevel(0), - error_(false) + workerQueue(NULL), + error_(false), + lastRequestId(0), + olderRequests(&requestMap1), + newerRequests(&requestMap2), + timeoutCheckScheduled(false) { } @@ -43,14 +43,20 @@ IpcIoFile::~IpcIoFile() void IpcIoFile::open(int flags, mode_t mode, RefCount callback) { + debugs(0,0,HERE); ioRequestor = callback; Must(diskId < 0); // we do not know our disker yet + Must(!diskerQueue && !workerQueue); if (IamDiskProcess()) { error_ = !DiskerOpen(dbName, flags, mode); if (error_) return; + // XXX: make capacity configurable + diskerQueue = new DiskerQueue(dbName.termedBuf(), Config.workers, 1024); + ipcIoFiles.insert(std::make_pair(KidIdentifier, this)); + Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid())); ann.strand.tag = dbName; Ipc::TypedMsgHdr message; @@ -59,36 +65,39 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) ioRequestor->ioCompletedNotification(); return; - } + } - // XXX: use StrandSearchRequest instead - IpcIoRequest ipcIo; - ipcIo.requestorId = KidIdentifier; - ipcIo.command = IpcIo::cmdOpen; - ipcIo.len = dbName.size(); - assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len); + Ipc::StrandSearchRequest request; + request.requestorId = KidIdentifier; + request.tag = dbName; - IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); - send(ipcIo, pending); + Ipc::TypedMsgHdr msg; + request.pack(msg); + Ipc::SendMessage(Ipc::coordinatorAddr, msg); + + IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); + trackPendingRequest(*pending); } void -IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) { - if (!ipcResponse) { +IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) { + debugs(0,0,HERE); + Must(diskId < 0); // we do not know our disker yet + Must(!workerQueue); + + if (!response) { debugs(79,1, HERE << "error: timeout"); error_ = true; - } else - if (ipcResponse->xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); - error_ = true; - } else { - diskId = ipcResponse->diskId; - if (diskId < 0) { + } else { + diskId = response->strand.kidId; + if (diskId >= 0) { + workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier); + ipcIoFiles.insert(std::make_pair(diskId, this)); + } else { error_ = true; debugs(79,1, HERE << "error: no disker claimed " << dbName); - } - } + } + } ioRequestor->ioCompletedNotification(); } @@ -100,6 +109,7 @@ IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) { void IpcIoFile::create(int flags, mode_t mode, RefCount callback) { + debugs(0,0,HERE); assert(false); // check /* We use the same logic path for open */ open(flags, mode, callback); @@ -108,11 +118,15 @@ IpcIoFile::create(int flags, mode_t mode, RefCount callback) void IpcIoFile::close() { + debugs(0,0,HERE); assert(ioRequestor != NULL); + delete diskerQueue; + delete workerQueue; + if (IamDiskProcess()) DiskerClose(dbName); - // XXX: else nothing to do? + // XXX: else nothing to do? ioRequestor->closeCompleted(); } @@ -149,31 +163,26 @@ IpcIoFile::read(ReadRequest *readRequest) //assert(minOffset < 0 || minOffset <= readRequest->offset); //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); - IpcIoRequest ipcIo; - ipcIo.requestorId = KidIdentifier; - ipcIo.command = IpcIo::cmdRead; - ipcIo.offset = readRequest->offset; - ipcIo.len = readRequest->len; - - IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); + IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); pending->readRequest = readRequest; - send(ipcIo, pending); + push(*pending); } void IpcIoFile::readCompleted(ReadRequest *readRequest, - const IpcIoResponse *ipcResponse) + const IpcIoMsg *const response) { + debugs(0,0,HERE); bool ioError = false; - if (!ipcResponse) { + if (!response) { debugs(79,1, HERE << "error: timeout"); ioError = true; // I/O timeout does not warrant setting error_? - } else - if (ipcResponse->xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); + } else + if (response->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); ioError = error_ = true; - } else { - memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len); + } else { + memcpy(readRequest->buf, response->buf, response->len); } const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len; @@ -196,34 +205,27 @@ IpcIoFile::write(WriteRequest *writeRequest) //assert(minOffset < 0 || minOffset <= writeRequest->offset); //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); - IpcIoRequest ipcIo; - ipcIo.requestorId = KidIdentifier; - ipcIo.command = IpcIo::cmdWrite; - ipcIo.offset = writeRequest->offset; - ipcIo.len = writeRequest->len; - assert(ipcIo.len <= sizeof(ipcIo.buf)); - memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away - - IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); + IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this); pending->writeRequest = writeRequest; - send(ipcIo, pending); + push(*pending); } void IpcIoFile::writeCompleted(WriteRequest *writeRequest, - const IpcIoResponse *ipcResponse) + const IpcIoMsg *const response) { + debugs(0,0,HERE); bool ioError = false; - if (!ipcResponse) { + if (!response) { debugs(79,1, HERE << "error: timeout"); ioError = true; // I/O timeout does not warrant setting error_? - } else - if (ipcResponse->xerrno) { - debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno)); - error_ = true; } else - if (ipcResponse->len != writeRequest->len) { - debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len); + if (response->xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(response->xerrno)); + ioError = error_ = true; + } else + if (response->len != writeRequest->len) { + debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len); error_ = true; } @@ -243,111 +245,176 @@ IpcIoFile::writeCompleted(WriteRequest *writeRequest, bool IpcIoFile::ioInProgress() const { - return ioLevel > 0; // XXX: todo + return !olderRequests->empty() || !newerRequests->empty(); } -/// sends an I/O request to disker +/// track a new pending request void -IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending) +IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending) { - if (++LastRequestId == 0) // don't use zero value as requestId - ++LastRequestId; - ipcIo.requestId = LastRequestId; - TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending)); - - Ipc::TypedMsgHdr message; - ipcIo.pack(message); + debugs(0,0,HERE); + newerRequests->insert(std::make_pair(pending.id, &pending)); + if (!timeoutCheckScheduled) + scheduleTimeoutCheck(); +} - Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen); - const String addr = diskId >= 0 ? - Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) : - Ipc::coordinatorAddr; +/// push an I/O request to disker +void +IpcIoFile::push(IpcIoPendingRequest &pending) +{ + debugs(0,0,HERE); + Must(diskId >= 0); + Must(workerQueue); + Must(pending.readRequest || pending.writeRequest); + + IpcIoMsg ipcIo; + if (pending.readRequest) { + ipcIo.command = IpcIo::cmdRead; + ipcIo.offset = pending.readRequest->offset; + ipcIo.len = pending.readRequest->len; + assert(ipcIo.len <= sizeof(ipcIo.buf)); + memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away + } else { // pending.writeRequest + ipcIo.command = IpcIo::cmdWrite; + ipcIo.offset = pending.writeRequest->offset; + ipcIo.len = pending.writeRequest->len; + assert(ipcIo.len <= sizeof(ipcIo.buf)); + memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away + } - debugs(47, 7, HERE << "asking disker" << diskId << " to " << - ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId); + if (!workerQueue->push(ipcIo)) { + pending.completeIo(NULL); + return; + } - Ipc::SendMessage(addr, message); - ++ioLevel; + if (workerQueue->pushedSize() == 1) + Notify(diskId); // notify disker - if (!TimeoutCheckScheduled) - ScheduleTimeoutCheck(); + trackPendingRequest(pending); } -/// called when disker responds to our I/O request +/// called when coordinator responds to worker open request void -IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw) +IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response) { - IpcIoResponse response(raw); - - const int requestId = response.requestId; - debugs(47, 7, HERE << "disker response to " << response.command << - "; ipcIo" << KidIdentifier << '.' << requestId); + debugs(47, 7, HERE << "coordinator response to open request"); + Must(response.data); + reinterpret_cast(response.data)->openCompleted(&response); +} - Must(requestId != 0); +void +IpcIoFile::handleResponses() +{ + Must(workerQueue); + IpcIoMsg ipcIo; + while (workerQueue->pop(ipcIo)) + handleResponse(ipcIo); +} - if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) { - pending->completeIo(&response); +void +IpcIoFile::handleResponse(const IpcIoMsg &ipcIo) +{ + const int requestId = ipcIo.requestId; + debugs(47, 7, HERE << "disker response to " << ipcIo.command << + "; ipcIo" << KidIdentifier << '.' << requestId); + Must(requestId); + if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) { + pending->completeIo(&ipcIo); delete pending; // XXX: leaking if throwing } else { - debugs(47, 4, HERE << "LATE disker response to " << response.command << - "; ipcIo" << KidIdentifier << '.' << requestId); + debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command << + "; ipcIo" << KidIdentifier << '.' << requestId); // nothing we can do about it; completeIo() has been called already } } -/// Mgr::IpcIoFile::checkTimeous wrapper void -IpcIoFile::CheckTimeouts(void*) +IpcIoFile::Notify(const int peerId) { - TimeoutCheckScheduled = false; + debugs(0,0,HERE); + Ipc::TypedMsgHdr msg; + msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type? + msg.putInt(KidIdentifier); + const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId); + Ipc::SendMessage(addr, msg); +} + +void +IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg) +{ + debugs(0,0,HERE); + if (IamDiskProcess()) + DiskerHandleRequests(); + else { + const int diskId = msg.getInt(); + const IpcIoFiles::const_iterator i = ipcIoFiles.find(diskId); + Must(i != ipcIoFiles.end()); // XXX: warn and continue? + i->second->handleResponses(); + } +} + +/// IpcIoFile::checkTimeouts wrapper +void +IpcIoFile::CheckTimeouts(void *const param) +{ + debugs(0,0,HERE); + Must(param); + reinterpret_cast(param)->checkTimeouts(); +} + +void +IpcIoFile::checkTimeouts() +{ + debugs(0,0,HERE); + timeoutCheckScheduled = false; // any old request would have timed out by now typedef RequestMap::const_iterator RMCI; - RequestMap &elders = *TheOlderRequests; - for (RMCI i = elders.begin(); i != elders.end(); ++i) { - IpcIoPendingRequest *pending = i->second; + for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) { + IpcIoPendingRequest *const pending = i->second; const int requestId = i->first; debugs(47, 7, HERE << "disker timeout; ipcIo" << - KidIdentifier << '.' << requestId); + KidIdentifier << '.' << diskId << '.' << requestId); pending->completeIo(NULL); // no response delete pending; // XXX: leaking if throwing - } - elders.clear(); + } + olderRequests->clear(); - swap(TheOlderRequests, TheNewerRequests); // switches pointers around - if (!TheOlderRequests->empty()) - ScheduleTimeoutCheck(); + swap(olderRequests, newerRequests); // switches pointers around + if (!olderRequests->empty()) + scheduleTimeoutCheck(); } /// prepare to check for timeouts in a little while void -IpcIoFile::ScheduleTimeoutCheck() +IpcIoFile::scheduleTimeoutCheck() { + debugs(0,0,HERE); // we check all older requests at once so some may be wait for 2*timeout const double timeout = 7; // in seconds eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts, - NULL, timeout, 0, false); - TimeoutCheckScheduled = true; + this, timeout, 0, false); + timeoutCheckScheduled = true; } /// returns and forgets the right IpcIoFile pending request IpcIoPendingRequest * -IpcIoFile::DequeueRequest(unsigned int requestId) +IpcIoFile::dequeueRequest(const unsigned int requestId) { debugs(47, 3, HERE); Must(requestId != 0); - RequestMap *map = NULL; - RequestMap::iterator i = TheRequestMap1.find(requestId); + RequestMap *map = NULL; + RequestMap::iterator i = requestMap1.find(requestId); - if (i != TheRequestMap1.end()) - map = &TheRequestMap1; + if (i != requestMap1.end()) + map = &requestMap1; else { - i = TheRequestMap2.find(requestId); - if (i != TheRequestMap2.end()) - map = &TheRequestMap2; + i = requestMap2.find(requestId); + if (i != requestMap2.end()) + map = &requestMap2; } if (!map) // not found in both maps @@ -359,100 +426,35 @@ IpcIoFile::DequeueRequest(unsigned int requestId) } int -IpcIoFile::getFD() const +IpcIoFile::getFD() const { assert(false); // not supported; TODO: remove this method from API return -1; } -/* IpcIoRequest */ - -IpcIoRequest::IpcIoRequest(): - requestorId(0), requestId(0), - offset(0), len(0), - command(IpcIo::cmdNone) -{ -} - -IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg) -{ - msg.checkType(Ipc::mtIpcIoRequest); - msg.getPod(requestorId); - msg.getPod(requestId); - - msg.getPod(offset); - msg.getPod(len); - msg.getPod(command); - - if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite) - msg.getFixed(buf, len); -} - -void -IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const -{ - msg.setType(Ipc::mtIpcIoRequest); - msg.putPod(requestorId); - msg.putPod(requestId); - - msg.putPod(offset); - msg.putPod(len); - msg.putPod(command); - - if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite) - msg.putFixed(buf, len); -} - - -/* IpcIoResponse */ - -IpcIoResponse::IpcIoResponse(): - diskId(-1), - requestId(0), - len(0), - xerrno(0) -{ -} - -IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg) -{ - msg.checkType(Ipc::mtIpcIoResponse); - msg.getPod(diskId); - msg.getPod(requestId); - msg.getPod(len); - msg.getPod(command); - msg.getPod(xerrno); - - if (command == IpcIo::cmdRead && !xerrno) - msg.getFixed(buf, len); -} +/* IpcIoMsg */ -void -IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const +IpcIoMsg::IpcIoMsg(): + requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0) { - msg.setType(Ipc::mtIpcIoResponse); - msg.putPod(diskId); - msg.putPod(requestId); - msg.putPod(len); - msg.putPod(command); - msg.putPod(xerrno); - - if (command == IpcIo::cmdRead && !xerrno) - msg.putFixed(buf, len); } - /* IpcIoPendingRequest */ IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): file(aFile), readRequest(NULL), writeRequest(NULL) { + debugs(0,0,HERE); + if (++file->lastRequestId == 0) // don't use zero value as requestId + ++file->lastRequestId; + id = file->lastRequestId; } void -IpcIoPendingRequest::completeIo(IpcIoResponse *response) +IpcIoPendingRequest::completeIo(const IpcIoMsg *const response) { + debugs(0,0,HERE); Must(file != NULL); if (readRequest) @@ -460,8 +462,10 @@ IpcIoPendingRequest::completeIo(IpcIoResponse *response) else if (writeRequest) file->writeCompleted(writeRequest, response); - else - file->openCompleted(response); + else { + Must(!response); // only timeouts are handled here + file->openCompleted(NULL); + } } @@ -470,94 +474,89 @@ IpcIoPendingRequest::completeIo(IpcIoResponse *response) static int TheFile = -1; ///< db file descriptor -static -void diskerRead(const IpcIoRequest &request) +static void +diskerRead(IpcIoMsg &ipcIo) { - debugs(47,5, HERE << "disker" << KidIdentifier << " reads " << - request.len << " at " << request.offset << - " ipcIo" << request.requestorId << '.' << request.requestId); - - IpcIoResponse response; - response.diskId = KidIdentifier; - response.requestId = request.requestId; - response.command = request.command; - - const ssize_t read = pread(TheFile, response.buf, request.len, request.offset); + debugs(0,0,HERE); + const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); if (read >= 0) { - response.xerrno = 0; - response.len = static_cast(read); // safe because read > 0 + ipcIo.xerrno = 0; + const size_t len = static_cast(read); // safe because read > 0 debugs(47,8, HERE << "disker" << KidIdentifier << " read " << - (response.len == request.len ? "all " : "just ") << read); - } else { - response.xerrno = errno; - response.len = 0; + (len == ipcIo.len ? "all " : "just ") << read); + ipcIo.len = len; + } else { + ipcIo.xerrno = errno; + ipcIo.len = 0; debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << - response.xerrno); - } - - Ipc::TypedMsgHdr message; - response.pack(message); - const String addr = - Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId); - Ipc::SendMessage(addr, message); + ipcIo.xerrno); + } } -static -void diskerWrite(const IpcIoRequest &request) +static void +diskerWrite(IpcIoMsg &ipcIo) { - debugs(47,5, HERE << "disker" << KidIdentifier << " writes " << - request.len << " at " << request.offset << - " ipcIo" << request.requestorId << '.' << request.requestId); - - IpcIoResponse response; - response.diskId = KidIdentifier; - response.requestId = request.requestId; - response.command = request.command; - - const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset); + debugs(0,0,HERE); + const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset); if (wrote >= 0) { - response.xerrno = 0; - response.len = static_cast(wrote); // safe because wrote > 0 + ipcIo.xerrno = 0; + const size_t len = static_cast(wrote); // safe because wrote > 0 debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << - (response.len == request.len ? "all " : "just ") << wrote); - } else { - response.xerrno = errno; - response.len = 0; + (len == ipcIo.len ? "all " : "just ") << wrote); + ipcIo.len = len; + } else { + ipcIo.xerrno = errno; + ipcIo.len = 0; debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << - response.xerrno); - } - - Ipc::TypedMsgHdr message; - response.pack(message); - const String addr = - Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId); - Ipc::SendMessage(addr, message); + ipcIo.xerrno); + } } -/// called when disker receives an I/O request void -IpcIoFile::HandleRequest(const IpcIoRequest &request) +IpcIoFile::DiskerHandleRequests() { - switch (request.command) { - case IpcIo::cmdRead: - diskerRead(request); - break; + debugs(0,0,HERE); + Must(diskerQueue); + int workerId; + IpcIoMsg ipcIo; + while (diskerQueue->pop(workerId, ipcIo)) + DiskerHandleRequest(workerId, ipcIo); +} - case IpcIo::cmdWrite: - diskerWrite(request); - break; +/// called when disker receives an I/O request +void +IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo) +{ + debugs(0,0,HERE); + Must(diskerQueue); - default: + if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) { debugs(0,0, HERE << "disker" << KidIdentifier << - " should not receive " << request.command << - " ipcIo" << request.requestorId << '.' << request.requestId); - break; - } + " should not receive " << ipcIo.command << + " ipcIo" << workerId << '.' << ipcIo.requestId); + return; + } + + debugs(47,5, HERE << "disker" << KidIdentifier << + (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") << + ipcIo.len << " at " << ipcIo.offset << + " ipcIo" << workerId << '.' << ipcIo.requestId); + + if (ipcIo.command == IpcIo::cmdRead) + diskerRead(ipcIo); + else // ipcIo.command == IpcIo::cmdWrite + diskerWrite(ipcIo); + + diskerQueue->push(workerId, ipcIo); // XXX: report warning + + if (diskerQueue->pushedSize(workerId) == 1) + Notify(workerId); // notify worker } static bool DiskerOpen(const String &path, int flags, mode_t mode) { + debugs(0,0,HERE); assert(TheFile < 0); TheFile = file_open(path.termedBuf(), flags); @@ -567,16 +566,17 @@ DiskerOpen(const String &path, int flags, mode_t mode) debugs(47,0, HERE << "rock db error opening " << path << ": " << xstrerr(xerrno)); return false; - } + } store_open_disk_fd++; debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); - return true; + return true; } static void DiskerClose(const String &path) { + debugs(0,0,HERE); if (TheFile >= 0) { file_close(TheFile); debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 10181eae1a..99573a22ba 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -6,6 +6,7 @@ #include "DiskIO/DiskFile.h" #include "DiskIO/IORequestor.h" #include "ipc/forward.h" +#include "ipc/Queue.h" #include // TODO: expand to all classes @@ -19,43 +20,19 @@ enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize } // namespace IpcIo -/// converts DiskIO requests to IPC messages -// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar -class IpcIoRequest { +/// converts DiskIO requests to IPC queue messages +class IpcIoMsg { public: - IpcIoRequest(); - - explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() - void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + IpcIoMsg(); public: - int requestorId; ///< kidId of the requestor; used for response destination - unsigned int requestId; ///< unique for sender; matches request w/ response + unsigned int requestId; ///< unique for requestor; matches request w/ response - /* ReadRequest and WriteRequest parameters to pass to disker */ char buf[IpcIo::BufCapacity]; // XXX: inefficient off_t offset; size_t len; - IpcIo::Command command; ///< what disker is supposed to do -}; - -/// disker response to IpcIoRequest -class IpcIoResponse { -public: - IpcIoResponse(); - - explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() - void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() - -public: - int diskId; ///< kidId of the responding disker - unsigned int requestId; ///< unique for sender; matches request w/ response - - char buf[IpcIo::BufCapacity]; // XXX: inefficient - size_t len; - - IpcIo::Command command; ///< what disker did + IpcIo::Command command; ///< what disker is supposed to do or did int xerrno; ///< I/O error code or zero }; @@ -83,44 +60,59 @@ public: virtual bool canWrite() const; virtual bool ioInProgress() const; - /// finds and calls the right IpcIoFile upon disker's response - static void HandleResponse(const Ipc::TypedMsgHdr &response); + /// handle open response from coordinator + static void HandleOpenResponse(const Ipc::StrandSearchResponse &response); - /// disker entry point for remote I/O requests - static void HandleRequest(const IpcIoRequest &request); + /// handle queue push notifications from worker or disker + static void HandleNotification(const Ipc::TypedMsgHdr &msg); protected: friend class IpcIoPendingRequest; - void openCompleted(const IpcIoResponse *response); - void readCompleted(ReadRequest *readRequest, const IpcIoResponse *); - void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *); + void openCompleted(const Ipc::StrandSearchResponse *const response); + void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response); + void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response); private: - void send(IpcIoRequest &request, IpcIoPendingRequest *pending); + void trackPendingRequest(IpcIoPendingRequest &pending); + void push(IpcIoPendingRequest &pending); + IpcIoPendingRequest *dequeueRequest(const unsigned int requestId); + + static void Notify(const int peerId); - static IpcIoPendingRequest *DequeueRequest(unsigned int requestId); + static void CheckTimeouts(void *const param); + void checkTimeouts(); + void scheduleTimeoutCheck(); - static void CheckTimeouts(void* param); - static void ScheduleTimeoutCheck(); + void handleResponses(); + void handleResponse(const IpcIoMsg &ipcIo); + + static void DiskerHandleRequests(); + static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo); private: + typedef FewToOneBiQueue DiskerQueue; + typedef OneToOneBiQueue WorkerQueue; + const String dbName; ///< the name of the file we are managing int diskId; ///< the process ID of the disker we talk to + static DiskerQueue *diskerQueue; ///< IPC queue for disker + WorkerQueue *workerQueue; ///< IPC queue for worker RefCount ioRequestor; - int ioLevel; ///< number of pending I/O requests using this file - bool error_; ///< whether we have seen at least one I/O error (XXX) + unsigned int lastRequestId; ///< last requestId used + /// maps requestId to the handleResponse callback typedef std::map RequestMap; - static RequestMap TheRequestMap1; ///< older (or newer) pending requests - static RequestMap TheRequestMap2; ///< newer (or older) pending requests - static RequestMap *TheOlderRequests; ///< older requests (map1 or map2) - static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1) - static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call + RequestMap requestMap1; ///< older (or newer) pending requests + RequestMap requestMap2; ///< newer (or older) pending requests + RequestMap *olderRequests; ///< older requests (map1 or map2) + RequestMap *newerRequests; ///< newer requests (map2 or map1) + bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call - static unsigned int LastRequestId; ///< last requestId used + typedef std::map IpcIoFiles; ///< maps diskerId to IpcIoFile + static IpcIoFiles ipcIoFiles; CBDATA_CLASS2(IpcIoFile); }; @@ -133,10 +125,11 @@ public: IpcIoPendingRequest(const IpcIoFile::Pointer &aFile); /// called when response is received and, with a nil response, on timeouts - void completeIo(IpcIoResponse *response); + void completeIo(const IpcIoMsg *const response); public: IpcIoFile::Pointer file; ///< the file object waiting for the response + unsigned int id; ///< request id ReadRequest *readRequest; ///< set if this is a read requests WriteRequest *writeRequest; ///< set if this is a write request diff --git a/src/fs/rock/RockDirMap.cc b/src/fs/rock/RockDirMap.cc index 6424265eee..d2b124ffa1 100644 --- a/src/fs/rock/RockDirMap.cc +++ b/src/fs/rock/RockDirMap.cc @@ -9,10 +9,8 @@ #include "Store.h" #include "fs/rock/RockDirMap.h" -static const char SharedMemoryName[] = "RockDirMap"; - Rock::DirMap::DirMap(const char *const aPath, const int limit): - path(aPath), shm(sharedMemoryName()) + path(aPath), shm(aPath) { shm.create(SharedSize(limit)); assert(shm.mem()); @@ -23,7 +21,7 @@ Rock::DirMap::DirMap(const char *const aPath, const int limit): } Rock::DirMap::DirMap(const char *const aPath): - path(aPath), shm(sharedMemoryName()) + path(aPath), shm(aPath) { shm.open(); assert(shm.mem()); @@ -271,22 +269,6 @@ Rock::DirMap::freeLocked(Slot &s) " in map [" << path << ']'); } -String -Rock::DirMap::sharedMemoryName() -{ - String result; - const char *begin = path.termedBuf(); - for (const char *end = strchr(begin, '/'); end; end = strchr(begin, '/')) { - if (begin != end) { - result.append(begin, end - begin); - result.append('.'); - } - begin = end + 1; - } - result.append(begin); - return result; -} - int Rock::DirMap::SharedSize(const int limit) { diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index afdec48cad..4c6a40638d 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -17,7 +17,6 @@ #include "mgr/Request.h" #include "mgr/Response.h" #include "mgr/StoreToCommWriter.h" -#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); @@ -77,16 +76,13 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) handleRegistrationRequest(HereIamMessage(message)); break; - case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest - IpcIoRequest io(message); - StrandSearchRequest sr; - sr.requestorId = io.requestorId; - sr.requestId = io.requestId; - sr.tag.limitInit(io.buf, io.len); - debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag); + case mtStrandSearchRequest: { + const StrandSearchRequest sr(message); + debugs(54, 6, HERE << "Strand search request: " << sr.requestorId << + " tag: " << sr.tag); handleSearchRequest(sr); break; - } + } case mtSharedListenRequest: debugs(54, 6, HERE << "Shared listen request"); @@ -189,14 +185,9 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, { debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << request.tag << " is kid" << strand.kidId); - const StrandSearchResponse response0(request.requestId, strand); - // XXX: we should use StrandSearchResponse instead of converting it - IpcIoResponse io; - io.diskId = strand.kidId; - io.requestId = request.requestId; - io.command = IpcIo::cmdOpen; + const StrandSearchResponse response(request.data, strand); TypedMsgHdr message; - io.pack(message); + response.pack(message); SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); } diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index cff479eb29..eaa3d491a9 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -12,6 +12,7 @@ libipc_la_SOURCES = \ Kids.cc \ Kids.h \ Messages.h \ + Queue.h \ StartListening.cc \ StartListening.h \ StrandCoord.cc \ diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index e90e8bbd75..e3ee4fa9a2 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -20,7 +20,7 @@ namespace Ipc typedef enum { mtNone = 0, mtRegistration, mtStrandSearchRequest, mtStrandSearchResponse, mtSharedListenRequest, mtSharedListenResponse, - mtIpcIoRequest, mtIpcIoResponse, + mtIpcIoNotification, mtCacheMgrRequest, mtCacheMgrResponse } MessageType; diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h new file mode 100644 index 0000000000..8c5554d66b --- /dev/null +++ b/src/ipc/Queue.h @@ -0,0 +1,251 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_QUEUE_H +#define SQUID_IPC_QUEUE_H + +#include "Array.h" +#include "ipc/AtomicWord.h" +#include "ipc/SharedMemory.h" +#include "SquidString.h" +#include "util.h" + +/// Lockless fixed-capacity queue for a single writer and a single +/// reader. Does not manage shared memory segment. +template +class OneToOneUniQueue { +public: + OneToOneUniQueue(const int aCapacity); + + int size() const { return theSize; } + int capacity() const { return theCapacity; } + + bool empty() const { return !theSize; } + bool full() const { return theSize == theCapacity; } + + bool pop(Value &value); ///< returns false iff the queue is empty + bool push(const Value &value); ///< returns false iff the queue is full + + static int Bytes2Items(int size); + static int Items2Bytes(const int size); + +private: + unsigned int theIn; ///< input index, used only in push() + unsigned int theOut; ///< output index, used only in pop() + + AtomicWord theSize; ///< number of items in the queue + const int theCapacity; ///< maximum number of items, i.e. theBuffer size + Value theBuffer[]; +}; + +/// Lockless fixed-capacity bidirectional queue for two processes. +/// Manages shared memory segment. +template +class OneToOneBiQueue { +public: + typedef OneToOneUniQueue UniQueue; + + /// Create a new shared queue. + OneToOneBiQueue(const char *const id, const int aCapacity); + OneToOneBiQueue(const char *const id); ///< Attach to existing shared queue. + + int pushedSize() const { return pushQueue->size(); } + + bool pop(Value &value) { return popQueue->pop(value); } + bool push(const Value &value) { return pushQueue->push(value); } + +private: + SharedMemory shm; ///< shared memory segment + UniQueue *popQueue; ///< queue to pop from for this process + UniQueue *pushQueue; ///< queue to push to for this process +}; + +/** + * Lockless fixed-capacity bidirectional queue for a limited number + * pricesses. Implements a star topology: Many worker processes + * communicate with the one central process. The central process uses + * FewToOneBiQueue object, while workers use OneToOneBiQueue objects + * created with the Attach() method. Each worker has a unique integer ID + * in [0, workerCount) range. + */ +template +class FewToOneBiQueue { +public: + typedef OneToOneBiQueue BiQueue; + + FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity); + static BiQueue *Attach(const char *const id, const int workerId); + ~FewToOneBiQueue(); + + bool validWorkerId(const int workerId) const; + int workerCount() const { return theWorkerCount; } + int pushedSize(const int workerId) const; + + bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty + bool push(const int workerId, const Value &value); ///< returns false iff the queue is full + +private: + static String BiQueueId(String id, const int workerId); + + int theLastPopWorkerId; ///< the last worker ID we pop()ed from + Vector biQueues; ///< worker queues + const int theWorkerCount; ///< number of worker processes + const int theCapacity; ///< per-worker capacity +}; + + +// OneToOneUniQueue + +template +OneToOneUniQueue::OneToOneUniQueue(const int aCapacity): + theIn(0), theOut(0), theSize(0), theCapacity(aCapacity) +{ + assert(theCapacity > 0); +} + +template +bool +OneToOneUniQueue::pop(Value &value) +{ + if (empty()) + return false; + + const unsigned int pos = theOut++ % theCapacity; + value = theBuffer[pos]; + --theSize; + return true; +} + +template +bool +OneToOneUniQueue::push(const Value &value) +{ + if (full()) + return false; + + const unsigned int pos = theIn++ % theCapacity; + theBuffer[pos] = value; + ++theSize; + return true; +} + +template +int +OneToOneUniQueue::Bytes2Items(int size) +{ + assert(size >= 0); + size -= sizeof(OneToOneUniQueue); + return size >= 0 ? size / sizeof(Value) : 0; +} + +template +int +OneToOneUniQueue::Items2Bytes(const int size) +{ + assert(size >= 0); + return sizeof(OneToOneUniQueue) + sizeof(Value) * size; +} + + +// OneToOneBiQueue + +template +OneToOneBiQueue::OneToOneBiQueue(const char *const id, const int capacity) : + shm(id) +{ + const int uniSize = UniQueue::Items2Bytes(capacity); + shm.create(uniSize * 2); + char *const mem = reinterpret_cast(shm.mem()); + assert(mem); + popQueue = new (mem) UniQueue(capacity); + pushQueue = new (mem + uniSize) UniQueue(capacity); +} + +template +OneToOneBiQueue::OneToOneBiQueue(const char *const id) : + shm(id) +{ + shm.open(); + char *const mem = reinterpret_cast(shm.mem()); + assert(mem); + pushQueue = reinterpret_cast(mem); + const int uniSize = pushQueue->Items2Bytes(pushQueue->capacity()); + popQueue = reinterpret_cast(mem + uniSize); +} + +// FewToOneBiQueue + +template +FewToOneBiQueue::FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity): + theLastPopWorkerId(-1), theWorkerCount(aWorkerCount), + theCapacity(aCapacity) +{ + biQueues.reserve(theWorkerCount); + for (int i = 0; i < theWorkerCount; ++i) { + const String biQueueId = BiQueueId(id, i); + biQueues.push_back(new BiQueue(biQueueId.termedBuf(), theCapacity)); + } +} + +template +typename FewToOneBiQueue::BiQueue * +FewToOneBiQueue::Attach(const char *const id, const int workerId) +{ + return new BiQueue(BiQueueId(id, workerId).termedBuf()); +} + +template +FewToOneBiQueue::~FewToOneBiQueue() +{ + for (int i = 0; i < theWorkerCount; ++i) + delete biQueues[i]; +} + +template +bool FewToOneBiQueue::validWorkerId(const int workerId) const +{ + return 0 <= workerId && workerId < theWorkerCount; +} + +template +int FewToOneBiQueue::pushedSize(const int workerId) const +{ + assert(validWorkerId(workerId)); + return biQueues[workerId]->pushedSize(); +} + +template +bool +FewToOneBiQueue::pop(int &workerId, Value &value) +{ + ++theLastPopWorkerId; + for (int i = 0; i < theWorkerCount; ++i) { + theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount; + if (biQueues[theLastPopWorkerId]->pop(value)) { + workerId = theLastPopWorkerId; + return true; + } + } + return false; +} + +template +bool +FewToOneBiQueue::push(const int workerId, const Value &value) +{ + assert(validWorkerId(workerId)); + return biQueues[workerId]->push(value); +} + +template +String +FewToOneBiQueue::BiQueueId(String id, const int workerId) +{ + id.append("__"); + id.append(xitoa(workerId)); + return id; +} + +#endif // SQUID_IPC_QUEUE_H diff --git a/src/ipc/SharedMemory.cc b/src/ipc/SharedMemory.cc index e7a64e7337..22633b12ee 100644 --- a/src/ipc/SharedMemory.cc +++ b/src/ipc/SharedMemory.cc @@ -16,7 +16,7 @@ #include #include -SharedMemory::SharedMemory(const String &id): +SharedMemory::SharedMemory(const char *const id): theName(GenerateName(id)), theFD(-1), theSize(-1), theMem(NULL) { } @@ -112,11 +112,18 @@ SharedMemory::detach() theMem = 0; } -/// Generate name for shared memory segment. +/// Generate name for shared memory segment. Replaces all slashes with dots. String -SharedMemory::GenerateName(const String &id) +SharedMemory::GenerateName(const char *id) { String name("/squid-"); + for (const char *slash = strchr(id, '/'); slash; slash = strchr(id, '/')) { + if (id != slash) { + name.append(id, slash - id); + name.append('.'); + } + id = slash + 1; + } name.append(id); return name; } diff --git a/src/ipc/SharedMemory.h b/src/ipc/SharedMemory.h index 8fd8d1c4ea..bba5d1e1ad 100644 --- a/src/ipc/SharedMemory.h +++ b/src/ipc/SharedMemory.h @@ -12,7 +12,7 @@ class SharedMemory { public: /// Create a shared memory segment. - SharedMemory(const String &id); + SharedMemory(const char *const id); ~SharedMemory(); /// Create a new shared memory segment. Fails if a segment with @@ -28,7 +28,7 @@ private: void attach(); void detach(); - static String GenerateName(const String &id); + static String GenerateName(const char *id); const String theName; ///< shared memory segment file name int theFD; ///< shared memory segment file descriptor diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index 54f82585ab..8a8e5120dd 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -11,6 +11,7 @@ #include "ipc/StrandCoord.h" #include "ipc/Messages.h" #include "ipc/SharedListen.h" +#include "ipc/StrandSearch.h" #include "ipc/Kids.h" #include "mgr/Request.h" #include "mgr/Response.h" @@ -60,13 +61,12 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) SharedListenJoined(SharedListenResponse(message)); break; - case mtIpcIoRequest: - IpcIoFile::HandleRequest(IpcIoRequest(message)); + case mtStrandSearchResponse: + IpcIoFile::HandleOpenResponse(StrandSearchResponse(message)); break; - case mtIpcIoResponse: - IpcIoFile::HandleResponse(message); - break; + case mtIpcIoNotification: + IpcIoFile::HandleNotification(message); case mtCacheMgrRequest: handleCacheMgrRequest(Mgr::Request(message)); diff --git a/src/ipc/StrandSearch.cc b/src/ipc/StrandSearch.cc index 7568ed8c0a..b43690bd2e 100644 --- a/src/ipc/StrandSearch.cc +++ b/src/ipc/StrandSearch.cc @@ -12,16 +12,16 @@ #include "ipc/TypedMsgHdr.h" -Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), requestId(0) +Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), data(0) { } Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg): - requestorId(-1), requestId(0) + requestorId(-1), data(NULL) { hdrMsg.checkType(mtStrandSearchRequest); hdrMsg.getPod(requestorId); - hdrMsg.getPod(requestId); + hdrMsg.getPod(data); hdrMsg.getString(tag); } @@ -29,30 +29,30 @@ void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const { hdrMsg.setType(mtStrandSearchRequest); hdrMsg.putPod(requestorId); - hdrMsg.putPod(requestId); + hdrMsg.putPod(data); hdrMsg.putString(tag); } /* StrandSearchResponse */ -Ipc::StrandSearchResponse::StrandSearchResponse(int aRequestId, +Ipc::StrandSearchResponse::StrandSearchResponse(void *const aData, const Ipc::StrandCoord &aStrand): - requestId(aRequestId), strand(aStrand) + data(aData), strand(aStrand) { } Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg): - requestId(0) + data(NULL) { hdrMsg.checkType(mtStrandSearchResponse); - hdrMsg.getPod(requestId); + hdrMsg.getPod(data); strand.unpack(hdrMsg); } void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const { hdrMsg.setType(mtStrandSearchResponse); - hdrMsg.putPod(requestId); + hdrMsg.putPod(data); strand.pack(hdrMsg); } diff --git a/src/ipc/StrandSearch.h b/src/ipc/StrandSearch.h index a249a5df5e..2bd1d81e79 100644 --- a/src/ipc/StrandSearch.h +++ b/src/ipc/StrandSearch.h @@ -24,7 +24,7 @@ public: public: int requestorId; ///< sender-provided return address - unsigned int requestId; ///< sender-provided for response:request matching + void *data; ///< sender-provided opaque pointer String tag; ///< set when looking for a matching StrandCoord::tag }; @@ -32,12 +32,12 @@ public: class StrandSearchResponse { public: - StrandSearchResponse(int requestId, const StrandCoord &strand); + StrandSearchResponse(void *const aData, const StrandCoord &strand); explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg() void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() public: - unsigned int requestId; ///< a copy of the StrandSearchRequest::requestId + void *data; ///< a copy of the StrandSearchRequest::data StrandCoord strand; ///< answer matching StrandSearchRequest criteria }; diff --git a/src/ipc/forward.h b/src/ipc/forward.h index 5a159b71e3..45a58a8e91 100644 --- a/src/ipc/forward.h +++ b/src/ipc/forward.h @@ -14,6 +14,7 @@ namespace Ipc class TypedMsgHdr; class StrandCoord; class HereIamMessage; +class StrandSearchResponse; } // namespace Ipc