From: Alex Rousskov Date: Tue, 1 Feb 2011 05:01:43 +0000 (-0700) Subject: Added IpcIo DiskIO module for communication with remote disk processes via UDS. X-Git-Tag: take01~11^2 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=254912f3ce622519bf0d040cb44f9fbfd9da30e3;p=thirdparty%2Fsquid.git Added IpcIo DiskIO module for communication with remote disk processes via UDS. Used IpcIo for Rock Store filesystem module. Added StrandSearch API: Workers use it to ask Coordinator for the right address (i.e., kid identifier) of the disk process for a given cache_dir path. If Coordinator does not know the answer, it waits for more disk processes to register. Implemented using generic tagging of kids (StrandCoord) and searching for the right tag. Raised UDS message size maximum to 36K in order to accommodate non-trivial rock store I/O while we are using UDS messages for I/O content. Fixed shutdown handling broken by hiding cache_dirs from Coordinator while switching IamPrimaryProcess() logic to use NumberOfKids() which needs cache_dir count. --- diff --git a/configure.ac b/configure.ac index 8dcdc7b21f..79682c3c60 100644 --- a/configure.ac +++ b/configure.ac @@ -633,6 +633,13 @@ for module in $squid_disk_module_candidates none; do DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/Mmapped/MmappedDiskIOModule.o" ;; + IpcIo) + AC_MSG_NOTICE([Enabling IpcIo DiskIO module]) + DISK_LIBS="$DISK_LIBS libIpcIo.a" + DISK_MODULES="$DISK_MODULES IpcIo" + DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/IpcIo/IpcIoDiskIOModule.o" + ;; + Blocking) AC_MSG_NOTICE([Enabling Blocking DiskIO module]) DISK_LIBS="$DISK_LIBS libBlocking.a" @@ -721,8 +728,8 @@ for fs in $squid_storeio_module_candidates none; do STORE_TESTS="$STORE_TESTS tests/testCoss$EXEEXT" ;; rock) - if ! test "x$squid_disk_module_candidates_Mmapped" = "xyes"; then - AC_MSG_ERROR([Storage modeule Rock requires Mmapped DiskIO module]) + if ! test "x$squid_disk_module_candidates_IpcIo" = "xyes"; then + AC_MSG_ERROR([Storage modeule Rock requires IpcIo DiskIO module]) fi STORE_TESTS="$STORE_TESTS tests/testRock$EXEEXT" ;; diff --git a/src/DiskIO/IpcIo/DiskIOIpcIo.cc b/src/DiskIO/IpcIo/DiskIOIpcIo.cc new file mode 100644 index 0000000000..e69de29bb2 diff --git a/src/DiskIO/IpcIo/IpcIoDiskIOModule.cc b/src/DiskIO/IpcIo/IpcIoDiskIOModule.cc new file mode 100644 index 0000000000..d0878e43e7 --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoDiskIOModule.cc @@ -0,0 +1,37 @@ +#include "squid.h" +#include "IpcIoDiskIOModule.h" +#include "IpcIoIOStrategy.h" + +IpcIoDiskIOModule::IpcIoDiskIOModule() +{ + ModuleAdd(*this); +} + +IpcIoDiskIOModule & +IpcIoDiskIOModule::GetInstance() +{ + return Instance; +} + +void +IpcIoDiskIOModule::init() +{} + +void +IpcIoDiskIOModule::shutdown() +{} + + +DiskIOStrategy* +IpcIoDiskIOModule::createStrategy() +{ + return new IpcIoIOStrategy(); +} + +IpcIoDiskIOModule IpcIoDiskIOModule::Instance; + +char const * +IpcIoDiskIOModule::type () const +{ + return "IpcIo"; +} diff --git a/src/DiskIO/IpcIo/IpcIoDiskIOModule.h b/src/DiskIO/IpcIo/IpcIoDiskIOModule.h new file mode 100644 index 0000000000..92e1cf244e --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoDiskIOModule.h @@ -0,0 +1,21 @@ +#ifndef SQUID_IPC_IODISKIOMODULE_H +#define SQUID_IPC_IODISKIOMODULE_H + +#include "DiskIO/DiskIOModule.h" + +class IpcIoDiskIOModule : public DiskIOModule +{ + +public: + static IpcIoDiskIOModule &GetInstance(); + IpcIoDiskIOModule(); + virtual void init(); + virtual void shutdown(); + virtual char const *type () const; + virtual DiskIOStrategy* createStrategy(); + +private: + static IpcIoDiskIOModule Instance; +}; + +#endif /* SQUID_IPC_IODISKIOMODULE_H */ diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc new file mode 100644 index 0000000000..9246e6c479 --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -0,0 +1,532 @@ +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "config.h" +#include "base/TextException.h" +#include "DiskIO/IORequestor.h" +#include "DiskIO/IpcIo/IpcIoFile.h" +#include "DiskIO/ReadRequest.h" +#include "DiskIO/WriteRequest.h" +#include "ipc/Messages.h" +#include "ipc/Port.h" +#include "ipc/UdsOp.h" + +CBDATA_CLASS_INIT(IpcIoFile); + +IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap; +unsigned int IpcIoFile::LastRequestId = 0; + +static bool DiskerOpen(const String &path, int flags, mode_t mode); +static void DiskerClose(const String &path); + + +IpcIoFile::IpcIoFile(char const *aDb): + dbName(aDb), + diskId(-1), + ioLevel(0), + error_(false) +{ +} + +IpcIoFile::~IpcIoFile() +{ +} + +void +IpcIoFile::open(int flags, mode_t mode, RefCount callback) +{ + ioRequestor = callback; + Must(diskId < 0); // we do not know our disker yet + + if (IamDiskProcess()) { + error_ = !DiskerOpen(dbName, flags, mode); + ioRequestor->ioCompletedNotification(); + return; + } + + // XXX: use StrandSearchRequest instead + IpcIoRequest ipcIo; + ipcIo.requestorId = KidIdentifier; + ipcIo.command = IpcIo::cmdOpen; + ipcIo.len = dbName.size(); + assert(ipcIo.len <= sizeof(ipcIo.buf)); + memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len); + + IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); + send(ipcIo, pending); +} + +void +IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) { + if (ipcResponse.xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); + error_ = true; + } else { + diskId = ipcResponse.diskId; + if (diskId < 0) { + error_ = true; + debugs(79,1, HERE << "error: no disker claimed " << dbName); + } + } + + ioRequestor->ioCompletedNotification(); +} + +/** + * Alias for IpcIoFile::open(...) + \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount callback) + */ +void +IpcIoFile::create(int flags, mode_t mode, RefCount callback) +{ + assert(false); // check + /* We use the same logic path for open */ + open(flags, mode, callback); +} + +void +IpcIoFile::close() +{ + assert(ioRequestor != NULL); + + if (IamDiskProcess()) + DiskerClose(dbName); + // XXX: else nothing to do? + + ioRequestor->closeCompleted(); +} + +bool +IpcIoFile::canRead() const +{ + return diskId >= 0; +} + +bool +IpcIoFile::canWrite() const +{ + return diskId >= 0; +} + +bool +IpcIoFile::error() const +{ + return error_; +} + +void +IpcIoFile::read(ReadRequest *readRequest) +{ + debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " << + readRequest->offset << ")"); + + assert(ioRequestor != NULL); + assert(readRequest->len >= 0); + assert(readRequest->offset >= 0); + Must(!error_); + + //assert(minOffset < 0 || minOffset <= readRequest->offset); + //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset); + + IpcIoRequest ipcIo; + ipcIo.requestorId = KidIdentifier; + ipcIo.command = IpcIo::cmdRead; + ipcIo.offset = readRequest->offset; + ipcIo.len = readRequest->len; + + IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); + pending->readRequest = readRequest; + send(ipcIo, pending); +} + +void +IpcIoFile::readCompleted(ReadRequest *readRequest, + const IpcIoResponse &ipcResponse) +{ + if (ipcResponse.xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); + error_ = true; + } + + const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len; + const int errflag = error_ ? DISK_ERROR : DISK_OK; + // XXX: check buffering expectations of the recepient + ioRequestor->readCompleted(ipcResponse.buf, rlen, errflag, readRequest); +} + +void +IpcIoFile::write(WriteRequest *writeRequest) +{ + debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " << + writeRequest->offset << ")"); + + assert(ioRequestor != NULL); + assert(writeRequest->len >= 0); + assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len? + assert(writeRequest->offset >= 0); + Must(!error_); + + //assert(minOffset < 0 || minOffset <= writeRequest->offset); + //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset); + + IpcIoRequest ipcIo; + ipcIo.requestorId = KidIdentifier; + ipcIo.command = IpcIo::cmdWrite; + ipcIo.offset = writeRequest->offset; + ipcIo.len = writeRequest->len; + assert(ipcIo.len <= sizeof(ipcIo.buf)); + memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away + + IpcIoPendingRequest *pending = new IpcIoPendingRequest(this); + pending->writeRequest = writeRequest; + send(ipcIo, pending); +} + +void +IpcIoFile::writeCompleted(WriteRequest *writeRequest, + const IpcIoResponse &ipcResponse) +{ + if (ipcResponse.xerrno) { + debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno)); + error_ = true; + } else + if (ipcResponse.len != writeRequest->len) { + debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len); + error_ = true; + } + + if (writeRequest->free_func) + (writeRequest->free_func)(const_cast(writeRequest->buf)); // broken API? + + if (!error_) { + debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" << + diskId << " at " << writeRequest->offset); + } + + const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len; + const int errflag = error_ ? DISK_ERROR : DISK_OK; + ioRequestor->writeCompleted(errflag, rlen, writeRequest); +} + +bool +IpcIoFile::ioInProgress() const +{ + return ioLevel > 0; // XXX: todo +} + +/// sends an I/O request to disker +void +IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending) +{ + if (++LastRequestId == 0) // don't use zero value as requestId + ++LastRequestId; + ipcIo.requestId = LastRequestId; + TheRequestsMap[ipcIo.requestId] = pending; + + Ipc::TypedMsgHdr message; + ipcIo.pack(message); + + Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen); + const String addr = diskId >= 0 ? + Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) : + Ipc::coordinatorAddr; + + debugs(47, 7, HERE << "asking disker" << diskId << " to " << + ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId); + + Ipc::SendMessage(addr, message); + ++ioLevel; + + const double timeout = 10; // in seconds + eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut, + this, timeout, 0, false); +} + +/// called when disker responds to our I/O request +void +IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw) +{ + IpcIoResponse response(raw); + + const int requestId = response.requestId; + debugs(47, 7, HERE << "disker response to " << + response.command << "; ipcIo" << KidIdentifier << '.' << requestId); + + Must(requestId != 0); + + IpcIoPendingRequest *pending = DequeueRequest(requestId); + Must(pending); + + if (pending->readRequest) + pending->file->readCompleted(pending->readRequest, response); + else + if (pending->writeRequest) + pending->file->writeCompleted(pending->writeRequest, response); + else + pending->file->openCompleted(response); + + // XXX: leaking if throwinig + delete pending; +} + + +/// Mgr::IpcIoFile::requestTimedOut wrapper +void +IpcIoFile::RequestTimedOut(void* param) +{ + debugs(47, 1, HERE << "bug: request timedout and we cannot handle that"); + Must(param != NULL); + // XXX: cannot get to file because IpcIoFile is not cbdata-protected + // IpcIoFile* file = static_cast(param); + + // TODO: notify the pending request (XXX: which one?) + + // use async call to enable job call protection that time events lack + // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut); +} + +/// Called when Coordinator fails to start processing the request [in time] +void +IpcIoFile::requestTimedOut() +{ + debugs(47, 3, HERE); + assert(false); // TODO: notify the pending request (XXX: which one?) +} + +/// called when we are no longer waiting for Coordinator to respond +void +IpcIoFile::removeTimeoutEvent() +{ + if (eventFind(&IpcIoFile::RequestTimedOut, this)) + eventDelete(&IpcIoFile::RequestTimedOut, this); +} + +/// returns and forgets the right IpcIoFile pending request +IpcIoPendingRequest * +IpcIoFile::DequeueRequest(unsigned int requestId) +{ + debugs(47, 3, HERE); + Must(requestId != 0); + RequestsMap::iterator i = TheRequestsMap.find(requestId); + if (i != TheRequestsMap.end()) { + IpcIoPendingRequest *pending = i->second; + TheRequestsMap.erase(i); + return pending; + } + return NULL; +} + +int +IpcIoFile::getFD() const +{ + assert(false); // not supported; TODO: remove this method from API + return -1; +} + + +/* IpcIoRequest */ + +IpcIoRequest::IpcIoRequest(): + requestorId(0), requestId(0), + offset(0), len(0), + command(IpcIo::cmdNone) +{ +} + +IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg) +{ + msg.checkType(Ipc::mtIpcIoRequest); + msg.getPod(requestorId); + msg.getPod(requestId); + + msg.getPod(offset); + msg.getPod(len); + msg.getPod(command); + + if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite) + msg.getFixed(buf, len); +} + +void +IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.setType(Ipc::mtIpcIoRequest); + msg.putPod(requestorId); + msg.putPod(requestId); + + msg.putPod(offset); + msg.putPod(len); + msg.putPod(command); + + if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite) + msg.putFixed(buf, len); +} + + +/* IpcIoResponse */ + +IpcIoResponse::IpcIoResponse(): + diskId(-1), + requestId(0), + len(0), + xerrno(0) +{ +} + +IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg) +{ + msg.checkType(Ipc::mtIpcIoResponse); + msg.getPod(diskId); + msg.getPod(requestId); + msg.getPod(len); + msg.getPod(command); + msg.getPod(xerrno); + + if (command == IpcIo::cmdRead && !xerrno) + msg.getFixed(buf, len); +} + +void +IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const +{ + msg.setType(Ipc::mtIpcIoResponse); + msg.putPod(diskId); + msg.putPod(requestId); + msg.putPod(len); + msg.putPod(command); + msg.putPod(xerrno); + + if (command == IpcIo::cmdRead && !xerrno) + msg.putFixed(buf, len); +} + + +/* IpcIoPendingRequest: */ + +IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile): + file(aFile), readRequest(NULL), writeRequest(NULL) +{ +} + + +/* XXX: disker code that should probably be moved elsewhere */ + +static int TheFile = -1; ///< db file descriptor + +static +void diskerRead(const IpcIoRequest &request) +{ + debugs(47,5, HERE << "disker" << KidIdentifier << " reads " << + request.len << " at " << request.offset << + " ipcIo" << request.requestorId << '.' << request.requestId); + + IpcIoResponse response; + response.diskId = KidIdentifier; + response.requestId = request.requestId; + response.command = request.command; + + const ssize_t read = pread(TheFile, response.buf, request.len, request.offset); + if (read >= 0) { + response.xerrno = 0; + response.len = static_cast(read); // safe because read > 0 + debugs(47,8, HERE << "disker" << KidIdentifier << " read " << + (response.len == request.len ? "all " : "just ") << read); + } else { + response.xerrno = errno; + response.len = 0; + debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " << + response.xerrno); + } + + Ipc::TypedMsgHdr message; + response.pack(message); + const String addr = + Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId); + Ipc::SendMessage(addr, message); +} + +static +void diskerWrite(const IpcIoRequest &request) +{ + debugs(47,5, HERE << "disker" << KidIdentifier << " writes " << + request.len << " at " << request.offset << + " ipcIo" << request.requestorId << '.' << request.requestId); + + IpcIoResponse response; + response.diskId = KidIdentifier; + response.requestId = request.requestId; + response.command = request.command; + + const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset); + if (wrote >= 0) { + response.xerrno = 0; + response.len = static_cast(wrote); // safe because wrote > 0 + debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " << + (response.len == request.len ? "all " : "just ") << wrote); + } else { + response.xerrno = errno; + response.len = 0; + debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " << + response.xerrno); + } + + Ipc::TypedMsgHdr message; + response.pack(message); + const String addr = + Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId); + Ipc::SendMessage(addr, message); +} + +/// called when disker receives an I/O request +void +IpcIoFile::HandleRequest(const IpcIoRequest &request) +{ + switch (request.command) { + case IpcIo::cmdRead: + diskerRead(request); + break; + + case IpcIo::cmdWrite: + diskerWrite(request); + break; + + default: + debugs(0,0, HERE << "disker" << KidIdentifier << + " should not receive " << request.command << + " ipcIo" << request.requestorId << '.' << request.requestId); + break; + } +} + +static bool +DiskerOpen(const String &path, int flags, mode_t mode) +{ + assert(TheFile < 0); + + TheFile = file_open(path.termedBuf(), flags); + + if (TheFile < 0) { + const int xerrno = errno; + debugs(47,0, HERE << "rock db error opening " << path << ": " << + xstrerr(xerrno)); + return false; + } + + store_open_disk_fd++; + debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile); + return true; +} + +static void +DiskerClose(const String &path) +{ + if (TheFile >= 0) { + file_close(TheFile); + debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile); + TheFile = -1; + store_open_disk_fd--; + } +} diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h new file mode 100644 index 0000000000..3b23c40227 --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -0,0 +1,140 @@ +#ifndef SQUID_IPC_IOFILE_H +#define SQUID_IPC_IOFILE_H + +#include "base/AsyncCall.h" +#include "cbdata.h" +#include "DiskIO/DiskFile.h" +#include "DiskIO/IORequestor.h" +#include "ipc/forward.h" +#include + +// TODO: expand to all classes +namespace IpcIo { + +/// what kind of I/O the disker needs to do or have done +typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command; + +enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize + +} // namespace IpcIo + + +/// converts DiskIO requests to IPC messages +// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar +class IpcIoRequest { +public: + IpcIoRequest(); + + explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() + void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + +public: + int requestorId; ///< kidId of the requestor; used for response destination + unsigned int requestId; ///< unique for sender; matches request w/ response + + /* ReadRequest and WriteRequest parameters to pass to disker */ + char buf[IpcIo::BufCapacity]; // XXX: inefficient + off_t offset; + size_t len; + + IpcIo::Command command; ///< what disker is supposed to do +}; + +/// disker response to IpcIoRequest +class IpcIoResponse { +public: + IpcIoResponse(); + + explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg() + void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg() + +public: + int diskId; ///< kidId of the responding disker + unsigned int requestId; ///< unique for sender; matches request w/ response + + char buf[IpcIo::BufCapacity]; // XXX: inefficient + size_t len; + + IpcIo::Command command; ///< what disker did + + int xerrno; ///< I/O error code or zero +}; + +class IpcIoPendingRequest; + +class IpcIoFile: public DiskFile +{ + +public: + typedef RefCount Pointer; + + IpcIoFile(char const *aDb); + virtual ~IpcIoFile(); + + /* DiskFile API */ + virtual void open(int flags, mode_t mode, RefCount callback); + virtual void create(int flags, mode_t mode, RefCount callback); + virtual void read(ReadRequest *); + virtual void write(WriteRequest *); + virtual void close(); + virtual bool error() const; + virtual int getFD() const; + virtual bool canRead() const; + virtual bool canWrite() const; + virtual bool ioInProgress() const; + + /// finds and calls the right IpcIoFile upon disker's response + static void HandleResponse(const Ipc::TypedMsgHdr &response); + + /// disker entry point for remote I/O requests + static void HandleRequest(const IpcIoRequest &request); + +private: + void send(IpcIoRequest &request, IpcIoPendingRequest *pending); + + void openCompleted(const IpcIoResponse &); + void readCompleted(ReadRequest *readRequest, const IpcIoResponse &); + void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &); + + static void RequestTimedOut(void* param); + void requestTimedOut(); + void removeTimeoutEvent(); + static IpcIoPendingRequest *DequeueRequest(unsigned int requestId); + +private: + const String dbName; ///< the name of the file we are managing + int diskId; ///< the process ID of the disker we talk to + RefCount ioRequestor; + + int ioLevel; ///< number of pending I/O requests using this file + + bool error_; ///< whether we have seen at least one I/O error (XXX) + + /// maps requestId to the handleResponse callback + typedef std::map RequestsMap; + static RequestsMap TheRequestsMap; ///< pending requests map + + static unsigned int LastRequestId; ///< last requestId used + + CBDATA_CLASS2(IpcIoFile); +}; + + +/// keeps original I/O request parameters while disker is handling the request +class IpcIoPendingRequest +{ +public: + IpcIoPendingRequest(const IpcIoFile::Pointer &aFile); + +public: + IpcIoFile::Pointer file; ///< the file object waiting for the response + ReadRequest *readRequest; ///< set if this is a read requests + WriteRequest *writeRequest; ///< set if this is a write request + +private: + IpcIoPendingRequest(const IpcIoPendingRequest &d); // not implemented + IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto +}; + + +#endif /* SQUID_IPC_IOFILE_H */ diff --git a/src/DiskIO/IpcIo/IpcIoIOStrategy.cc b/src/DiskIO/IpcIo/IpcIoIOStrategy.cc new file mode 100644 index 0000000000..c5e46c6b0d --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoIOStrategy.cc @@ -0,0 +1,37 @@ + +/* + * $Id$ + * + * DEBUG: section 47 Store Directory Routines + */ + +#include "IpcIoIOStrategy.h" +#include "IpcIoFile.h" +bool +IpcIoIOStrategy::shedLoad() +{ + return false; +} + +int +IpcIoIOStrategy::load() +{ + /* Return 999 (99.9%) constant load */ + return 999; +} + +DiskFile::Pointer +IpcIoIOStrategy::newFile (char const *path) +{ + return new IpcIoFile (path); +} + +void +IpcIoIOStrategy::unlinkFile(char const *path) +{ +#if USE_UNLINKD + unlinkdUnlink(path); +#else + ::unlink(path); +#endif +} diff --git a/src/DiskIO/IpcIo/IpcIoIOStrategy.h b/src/DiskIO/IpcIo/IpcIoIOStrategy.h new file mode 100644 index 0000000000..3bd2945bbc --- /dev/null +++ b/src/DiskIO/IpcIo/IpcIoIOStrategy.h @@ -0,0 +1,15 @@ +#ifndef SQUID_IPC_IOIOSTRATEGY_H +#define SQUID_IPC_IOIOSTRATEGY_H +#include "DiskIO/DiskIOStrategy.h" + +class IpcIoIOStrategy : public DiskIOStrategy +{ + +public: + virtual bool shedLoad(); + virtual int load(); + virtual RefCount newFile(char const *path); + virtual void unlinkFile (char const *); +}; + +#endif /* SQUID_IPC_IOIOSTRATEGY_H */ diff --git a/src/Makefile.am b/src/Makefile.am index c4c0b438b0..2300eba3b4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -156,7 +156,7 @@ AIOPS_SOURCE = DiskIO/DiskThreads/aiops.cc endif EXTRA_LIBRARIES = libAIO.a libBlocking.a libDiskDaemon.a libDiskThreads.a \ - libMmapped.a + libMmapped.a libIpcIo.a noinst_LIBRARIES = $(DISK_LIBS) noinst_LTLIBRARIES = libsquid.la @@ -716,6 +716,14 @@ libMmapped_a_SOURCES = \ DiskIO/Mmapped/MmappedDiskIOModule.cc \ DiskIO/Mmapped/MmappedDiskIOModule.h +libIpcIo_a_SOURCES = \ + DiskIO/IpcIo/IpcIoFile.cc \ + DiskIO/IpcIo/IpcIoFile.h \ + DiskIO/IpcIo/IpcIoIOStrategy.cc \ + DiskIO/IpcIo/IpcIoIOStrategy.h \ + DiskIO/IpcIo/IpcIoDiskIOModule.cc \ + DiskIO/IpcIo/IpcIoDiskIOModule.h + libDiskDaemon_a_SOURCES = \ DiskIO/DiskDaemon/DiskdFile.cc \ DiskIO/DiskDaemon/DiskdFile.h \ diff --git a/src/cache_cf.cc b/src/cache_cf.cc index 1ec303cba9..eaa610107c 100644 --- a/src/cache_cf.cc +++ b/src/cache_cf.cc @@ -1885,8 +1885,11 @@ static void parse_cachedir(SquidConfig::_cacheSwap * swap) { // coordinator does not need to handle cache_dir. - if (IamCoordinatorProcess()) + if (IamCoordinatorProcess()) { + // make sure the NumberOfKids() is correct for coordinator + ++swap->n_processes; // XXX: does not work in reconfigure return; + } char *type_str; char *path_str; @@ -1953,6 +1956,7 @@ parse_cachedir(SquidConfig::_cacheSwap * swap) sd->parse(swap->n_configured, path_str); ++swap->n_configured; + ++swap->n_processes; /* Update the max object size */ update_maxobjsize(); diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 207cd7e6fe..cc5324f104 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -109,7 +109,7 @@ Rock::SwapDir::init() // are refcounted. We up our count once to avoid implicit delete's. RefCountReference(); - DiskIOModule *m = DiskIOModule::Find("Mmapped"); // TODO: configurable? + DiskIOModule *m = DiskIOModule::Find("IpcIo"); // TODO: configurable? assert(m); io = m->createStrategy(); io->init(); diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index d7f3419496..f1e6032132 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -17,6 +17,7 @@ #include "mgr/Request.h" #include "mgr/Response.h" #include "mgr/StoreToCommWriter.h" +#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); @@ -45,10 +46,23 @@ Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId) void Ipc::Coordinator::registerStrand(const StrandCoord& strand) { + debugs(54, 3, HERE << "registering kid" << strand.kidId << + ' ' << strand.tag); if (StrandCoord* found = findStrand(strand.kidId)) *found = strand; else strands_.push_back(strand); + + // notify searchers waiting for this new strand, if any + typedef Searchers::iterator SRI; + for (SRI i = searchers.begin(); i != searchers.end();) { + if (i->tag == strand.tag) { + notifySearcher(*i, strand); + i = searchers.erase(i); + } else { + ++i; + } + } } void Ipc::Coordinator::receive(const TypedMsgHdr& message) @@ -56,9 +70,20 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) switch (message.type()) { case mtRegistration: debugs(54, 6, HERE << "Registration request"); - handleRegistrationRequest(StrandCoord(message)); + handleRegistrationRequest(HereIamMessage(message)); break; + case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest + IpcIoRequest io(message); + StrandSearchRequest sr; + sr.requestorId = io.requestorId; + sr.requestId = io.requestId; + sr.tag.limitInit(io.buf, io.len); + debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag); + handleSearchRequest(sr); + break; + } + case mtSharedListenRequest: debugs(54, 6, HERE << "Shared listen request"); handleSharedListenRequest(SharedListenRequest(message)); @@ -80,14 +105,14 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) } } -void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) +void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg) { - registerStrand(strand); + registerStrand(msg.strand); // send back an acknowledgement; TODO: remove as not needed? TypedMsgHdr message; - strand.pack(message); - SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); + msg.pack(message); + SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message); } void @@ -133,6 +158,45 @@ Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response) Mgr::Inquirer::HandleRemoteAck(response); } +void +Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request) +{ + // do we know of a strand with the given search tag? + const StrandCoord *strand = NULL; + typedef StrandCoords::const_iterator SCCI; + for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) { + if (i->tag == request.tag) + strand = &(*i); + } + + if (strand) { + notifySearcher(request, *strand); + return; + } + + searchers.push_back(request); + debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId << + " who " << request.tag << " is"); +} + +void +Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request, + const StrandCoord& strand) +{ + debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " << + request.tag << " is kid" << strand.kidId); + const StrandSearchResponse response0(request.requestId, strand); + // XXX: we should use StrandSearchResponse instead of converting it + IpcIoResponse io; + io.diskId = strand.kidId; + io.requestId = request.requestId; + io.command = IpcIo::cmdOpen; + TypedMsgHdr message; + io.pack(message); + SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message); +} + + int Ipc::Coordinator::openListenSocket(const SharedListenRequest& request, int &errNo) diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index 11856b1d3e..f2f4da03a4 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -14,8 +14,10 @@ #include "ipc/Port.h" #include "ipc/SharedListen.h" #include "ipc/StrandCoords.h" +#include "ipc/StrandSearch.h" #include "mgr/forward.h" +#include #include namespace Ipc @@ -40,7 +42,12 @@ protected: StrandCoord* findStrand(int kidId); ///< registered strand or NULL void registerStrand(const StrandCoord &); ///< adds or updates existing - void handleRegistrationRequest(const StrandCoord &); ///< register,ACK + void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK + + /// answer the waiting search request + void notifySearcher(const StrandSearchRequest &request, const StrandCoord&); + /// answers or queues the request if the answer is not yet known + void handleSearchRequest(const StrandSearchRequest &request); /// returns cached socket or calls openListenSocket() void handleSharedListenRequest(const SharedListenRequest& request); @@ -53,6 +60,9 @@ protected: private: StrandCoords strands_; ///< registered processes and threads + typedef std::list Searchers; ///< search requests + Searchers searchers; ///< yet unanswered search requests in arrival order + typedef std::map Listeners; ///< params:fd map Listeners listeners; ///< cached comm_open_listener() results diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index e2d13504d9..522adc8d9e 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -16,6 +16,8 @@ libipc_la_SOURCES = \ StrandCoord.cc \ StrandCoord.h \ StrandCoords.h \ + StrandSearch.cc \ + StrandSearch.h \ SharedListen.cc \ SharedListen.h \ TypedMsgHdr.cc \ diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index 57ae03632d..e90e8bbd75 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -18,7 +18,9 @@ namespace Ipc /// message class identifier typedef enum { mtNone = 0, mtRegistration, + mtStrandSearchRequest, mtStrandSearchResponse, mtSharedListenRequest, mtSharedListenResponse, + mtIpcIoRequest, mtIpcIoResponse, mtCacheMgrRequest, mtCacheMgrResponse } MessageType; diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index a1e9f59c5d..9c7e926249 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -15,6 +15,8 @@ #include "mgr/Request.h" #include "mgr/Response.h" #include "mgr/Forwarder.h" +#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */ +#include "SwapDir.h" /* XXX: scope boundary violation */ #include "CacheManager.h" @@ -37,8 +39,21 @@ void Ipc::Strand::registerSelf() { debugs(54, 6, HERE); Must(!isRegistered); + + HereIamMessage ann(StrandCoord(KidIdentifier, getpid())); + + // announce that we are responsible for our cache_dir if needed + // XXX: misplaced + if (IamDiskProcess()) { + const int myDisk = KidIdentifier % Config.cacheSwap.n_processes; + if (const SwapDir *sd = dynamic_cast(INDEXSD(myDisk))) { + ann.strand.tag = sd->path; + ann.strand.tag.append("/rock"); // XXX: scope boundary violation + } + } + TypedMsgHdr message; - StrandCoord(KidIdentifier, getpid()).pack(message); + ann.pack(message); SendMessage(coordinatorAddr, message); setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable? } @@ -49,13 +64,21 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) switch (message.type()) { case mtRegistration: - handleRegistrationResponse(StrandCoord(message)); + handleRegistrationResponse(HereIamMessage(message)); break; case mtSharedListenResponse: SharedListenJoined(SharedListenResponse(message)); break; + case mtIpcIoRequest: + IpcIoFile::HandleRequest(IpcIoRequest(message)); + break; + + case mtIpcIoResponse: + IpcIoFile::HandleResponse(message); + break; + case mtCacheMgrRequest: handleCacheMgrRequest(Mgr::Request(message)); break; @@ -70,10 +93,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message) } } -void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand) +void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg) { // handle registration response from the coordinator; it could be stale - if (strand.kidId == KidIdentifier && strand.pid == getpid()) { + if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) { debugs(54, 6, "kid" << KidIdentifier << " registered"); clearTimeout(); // we are done } else { diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h index d01cf2b7f2..c6f84ba9fc 100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@ -8,6 +8,7 @@ #ifndef SQUID_IPC_STRAND_H #define SQUID_IPC_STRAND_H +#include "ipc/forward.h" #include "ipc/Port.h" #include "mgr/forward.h" @@ -31,7 +32,7 @@ protected: private: void registerSelf(); /// let Coordinator know this strand exists - void handleRegistrationResponse(const StrandCoord &strand); + void handleRegistrationResponse(const HereIamMessage &msg); void handleCacheMgrRequest(const Mgr::Request& request); void handleCacheMgrResponse(const Mgr::Response& response); diff --git a/src/ipc/StrandCoord.cc b/src/ipc/StrandCoord.cc index 3fdf45a9fc..703a7aafd2 100644 --- a/src/ipc/StrandCoord.cc +++ b/src/ipc/StrandCoord.cc @@ -7,6 +7,7 @@ #include "config.h" +#include "Debug.h" #include "ipc/Messages.h" #include "ipc/StrandCoord.h" #include "ipc/TypedMsgHdr.h" @@ -20,14 +21,37 @@ Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid) { } -Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0) +void +Ipc::StrandCoord::unpack(const TypedMsgHdr &hdrMsg) { - hdrMsg.checkType(mtRegistration); - hdrMsg.getPod(*this); + hdrMsg.getPod(kidId); + hdrMsg.getPod(pid); + hdrMsg.getString(tag); +debugs(0,0, HERE << "getting tag: " << tag); } void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const +{ + hdrMsg.putPod(kidId); + hdrMsg.putPod(pid); +debugs(0,0, HERE << "putting tag: " << tag); + hdrMsg.putString(tag); +} + + +Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand): + strand(aStrand) +{ +} + +Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg) +{ + hdrMsg.checkType(mtRegistration); + strand.unpack(hdrMsg); +} + +void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const { hdrMsg.setType(mtRegistration); - hdrMsg.putPod(*this); + strand.pack(hdrMsg); } diff --git a/src/ipc/StrandCoord.h b/src/ipc/StrandCoord.h index 02f2056de8..0970f0cfcf 100644 --- a/src/ipc/StrandCoord.h +++ b/src/ipc/StrandCoord.h @@ -7,6 +7,7 @@ #define SQUID_IPC_STRAND_COORD_H #include "ipc/forward.h" +#include "SquidString.h" #include namespace Ipc @@ -17,15 +18,32 @@ class StrandCoord { public: StrandCoord(); ///< unknown location - StrandCoord(int akidId, pid_t aPid); ///< from registrant - explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + StrandCoord(int akidId, pid_t aPid); + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + void unpack(const TypedMsgHdr &hdrMsg); ///< from recvmsg() public: int kidId; ///< internal Squid process number pid_t pid; ///< OS process or thread identifier + + String tag; ///< optional unique well-known key (e.g., cache_dir path) }; +/// strand registration with Coordinator (also used as an ACK) +class HereIamMessage +{ +public: + explicit HereIamMessage(const StrandCoord &strand); ///< from registrant + explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + +public: + StrandCoord strand; ///< registrant coordinates and related details +}; + + + } // namespace Ipc; #endif /* SQUID_IPC_STRAND_COORD_H */ diff --git a/src/ipc/TypedMsgHdr.h b/src/ipc/TypedMsgHdr.h index 95afe00dd6..e94d14f5f7 100644 --- a/src/ipc/TypedMsgHdr.h +++ b/src/ipc/TypedMsgHdr.h @@ -27,7 +27,7 @@ namespace Ipc class TypedMsgHdr: public msghdr { public: - enum {maxSize = 4096}; + enum {maxSize = 36*1024}; // XXX: so that disker I/O can fit public: TypedMsgHdr(); diff --git a/src/ipc/forward.h b/src/ipc/forward.h index 7325abb6bc..5a159b71e3 100644 --- a/src/ipc/forward.h +++ b/src/ipc/forward.h @@ -13,6 +13,7 @@ namespace Ipc class TypedMsgHdr; class StrandCoord; +class HereIamMessage; } // namespace Ipc diff --git a/src/structs.h b/src/structs.h index 2f70ffe98b..dd034b4fc7 100644 --- a/src/structs.h +++ b/src/structs.h @@ -497,6 +497,8 @@ struct SquidConfig { RefCount *swapDirs; int n_allocated; int n_configured; + ///< number of disk processes (set even when n_configured is not) + int n_processes; } cacheSwap; /* * I'm sick of having to keep doing this .. diff --git a/src/tools.cc b/src/tools.cc index 0cfe32a1ed..5d9a1218e2 100644 --- a/src/tools.cc +++ b/src/tools.cc @@ -855,7 +855,7 @@ IamPrimaryProcess() // when there is a master and worker process, the master delegates // primary functions to its only kid - if (Config.workers == 1) + if (NumberOfKids() == 1) return IamWorkerProcess(); // in SMP mode, multiple kids delegate primary functions to the coordinator @@ -872,7 +872,7 @@ NumberOfKids() // XXX: detect and abort when called before workers/cache_dirs are parsed // XXX: this is not always the case as there are other cache_dir types - const int rockDirs = Config.cacheSwap.n_configured; + const int rockDirs = Config.cacheSwap.n_processes; const bool needCoord = Config.workers > 1 || rockDirs > 0; return (needCoord ? 1 : 0) + Config.workers + rockDirs;