Used IpcIo for Rock Store filesystem module.
Added StrandSearch API: Workers use it to ask Coordinator for the right
address (i.e., kid identifier) of the disk process for a given cache_dir path.
If Coordinator does not know the answer, it waits for more disk processes to
register. Implemented using generic tagging of kids (StrandCoord) and
searching for the right tag.
Raised UDS message size maximum to 36K in order to accommodate non-trivial
rock store I/O while we are using UDS messages for I/O content.
Fixed shutdown handling broken by hiding cache_dirs from Coordinator while
switching IamPrimaryProcess() logic to use NumberOfKids() which needs
cache_dir count.
DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/Mmapped/MmappedDiskIOModule.o"
;;
+ IpcIo)
+ AC_MSG_NOTICE([Enabling IpcIo DiskIO module])
+ DISK_LIBS="$DISK_LIBS libIpcIo.a"
+ DISK_MODULES="$DISK_MODULES IpcIo"
+ DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/IpcIo/IpcIoDiskIOModule.o"
+ ;;
+
Blocking)
AC_MSG_NOTICE([Enabling Blocking DiskIO module])
DISK_LIBS="$DISK_LIBS libBlocking.a"
STORE_TESTS="$STORE_TESTS tests/testCoss$EXEEXT"
;;
rock)
- if ! test "x$squid_disk_module_candidates_Mmapped" = "xyes"; then
- AC_MSG_ERROR([Storage modeule Rock requires Mmapped DiskIO module])
+ if ! test "x$squid_disk_module_candidates_IpcIo" = "xyes"; then
+ AC_MSG_ERROR([Storage modeule Rock requires IpcIo DiskIO module])
fi
STORE_TESTS="$STORE_TESTS tests/testRock$EXEEXT"
;;
--- /dev/null
+#include "squid.h"
+#include "IpcIoDiskIOModule.h"
+#include "IpcIoIOStrategy.h"
+
+IpcIoDiskIOModule::IpcIoDiskIOModule()
+{
+ ModuleAdd(*this);
+}
+
+IpcIoDiskIOModule &
+IpcIoDiskIOModule::GetInstance()
+{
+ return Instance;
+}
+
+void
+IpcIoDiskIOModule::init()
+{}
+
+void
+IpcIoDiskIOModule::shutdown()
+{}
+
+
+DiskIOStrategy*
+IpcIoDiskIOModule::createStrategy()
+{
+ return new IpcIoIOStrategy();
+}
+
+IpcIoDiskIOModule IpcIoDiskIOModule::Instance;
+
+char const *
+IpcIoDiskIOModule::type () const
+{
+ return "IpcIo";
+}
--- /dev/null
+#ifndef SQUID_IPC_IODISKIOMODULE_H
+#define SQUID_IPC_IODISKIOMODULE_H
+
+#include "DiskIO/DiskIOModule.h"
+
+class IpcIoDiskIOModule : public DiskIOModule
+{
+
+public:
+ static IpcIoDiskIOModule &GetInstance();
+ IpcIoDiskIOModule();
+ virtual void init();
+ virtual void shutdown();
+ virtual char const *type () const;
+ virtual DiskIOStrategy* createStrategy();
+
+private:
+ static IpcIoDiskIOModule Instance;
+};
+
+#endif /* SQUID_IPC_IODISKIOMODULE_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 47 Store Directory Routines
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "DiskIO/IORequestor.h"
+#include "DiskIO/IpcIo/IpcIoFile.h"
+#include "DiskIO/ReadRequest.h"
+#include "DiskIO/WriteRequest.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/UdsOp.h"
+
+CBDATA_CLASS_INIT(IpcIoFile);
+
+IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap;
+unsigned int IpcIoFile::LastRequestId = 0;
+
+static bool DiskerOpen(const String &path, int flags, mode_t mode);
+static void DiskerClose(const String &path);
+
+
+IpcIoFile::IpcIoFile(char const *aDb):
+ dbName(aDb),
+ diskId(-1),
+ ioLevel(0),
+ error_(false)
+{
+}
+
+IpcIoFile::~IpcIoFile()
+{
+}
+
+void
+IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
+{
+ ioRequestor = callback;
+ Must(diskId < 0); // we do not know our disker yet
+
+ if (IamDiskProcess()) {
+ error_ = !DiskerOpen(dbName, flags, mode);
+ ioRequestor->ioCompletedNotification();
+ return;
+ }
+
+ // XXX: use StrandSearchRequest instead
+ IpcIoRequest ipcIo;
+ ipcIo.requestorId = KidIdentifier;
+ ipcIo.command = IpcIo::cmdOpen;
+ ipcIo.len = dbName.size();
+ assert(ipcIo.len <= sizeof(ipcIo.buf));
+ memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len);
+
+ IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+ send(ipcIo, pending);
+}
+
+void
+IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) {
+ if (ipcResponse.xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+ error_ = true;
+ } else {
+ diskId = ipcResponse.diskId;
+ if (diskId < 0) {
+ error_ = true;
+ debugs(79,1, HERE << "error: no disker claimed " << dbName);
+ }
+ }
+
+ ioRequestor->ioCompletedNotification();
+}
+
+/**
+ * Alias for IpcIoFile::open(...)
+ \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
+ */
+void
+IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
+{
+ assert(false); // check
+ /* We use the same logic path for open */
+ open(flags, mode, callback);
+}
+
+void
+IpcIoFile::close()
+{
+ assert(ioRequestor != NULL);
+
+ if (IamDiskProcess())
+ DiskerClose(dbName);
+ // XXX: else nothing to do?
+
+ ioRequestor->closeCompleted();
+}
+
+bool
+IpcIoFile::canRead() const
+{
+ return diskId >= 0;
+}
+
+bool
+IpcIoFile::canWrite() const
+{
+ return diskId >= 0;
+}
+
+bool
+IpcIoFile::error() const
+{
+ return error_;
+}
+
+void
+IpcIoFile::read(ReadRequest *readRequest)
+{
+ debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
+ readRequest->offset << ")");
+
+ assert(ioRequestor != NULL);
+ assert(readRequest->len >= 0);
+ assert(readRequest->offset >= 0);
+ Must(!error_);
+
+ //assert(minOffset < 0 || minOffset <= readRequest->offset);
+ //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
+
+ IpcIoRequest ipcIo;
+ ipcIo.requestorId = KidIdentifier;
+ ipcIo.command = IpcIo::cmdRead;
+ ipcIo.offset = readRequest->offset;
+ ipcIo.len = readRequest->len;
+
+ IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+ pending->readRequest = readRequest;
+ send(ipcIo, pending);
+}
+
+void
+IpcIoFile::readCompleted(ReadRequest *readRequest,
+ const IpcIoResponse &ipcResponse)
+{
+ if (ipcResponse.xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+ error_ = true;
+ }
+
+ const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len;
+ const int errflag = error_ ? DISK_ERROR : DISK_OK;
+ // XXX: check buffering expectations of the recepient
+ ioRequestor->readCompleted(ipcResponse.buf, rlen, errflag, readRequest);
+}
+
+void
+IpcIoFile::write(WriteRequest *writeRequest)
+{
+ debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
+ writeRequest->offset << ")");
+
+ assert(ioRequestor != NULL);
+ assert(writeRequest->len >= 0);
+ assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
+ assert(writeRequest->offset >= 0);
+ Must(!error_);
+
+ //assert(minOffset < 0 || minOffset <= writeRequest->offset);
+ //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
+
+ IpcIoRequest ipcIo;
+ ipcIo.requestorId = KidIdentifier;
+ ipcIo.command = IpcIo::cmdWrite;
+ ipcIo.offset = writeRequest->offset;
+ ipcIo.len = writeRequest->len;
+ assert(ipcIo.len <= sizeof(ipcIo.buf));
+ memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away
+
+ IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+ pending->writeRequest = writeRequest;
+ send(ipcIo, pending);
+}
+
+void
+IpcIoFile::writeCompleted(WriteRequest *writeRequest,
+ const IpcIoResponse &ipcResponse)
+{
+ if (ipcResponse.xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+ error_ = true;
+ } else
+ if (ipcResponse.len != writeRequest->len) {
+ debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len);
+ error_ = true;
+ }
+
+ if (writeRequest->free_func)
+ (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
+
+ if (!error_) {
+ debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
+ diskId << " at " << writeRequest->offset);
+ }
+
+ const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len;
+ const int errflag = error_ ? DISK_ERROR : DISK_OK;
+ ioRequestor->writeCompleted(errflag, rlen, writeRequest);
+}
+
+bool
+IpcIoFile::ioInProgress() const
+{
+ return ioLevel > 0; // XXX: todo
+}
+
+/// sends an I/O request to disker
+void
+IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
+{
+ if (++LastRequestId == 0) // don't use zero value as requestId
+ ++LastRequestId;
+ ipcIo.requestId = LastRequestId;
+ TheRequestsMap[ipcIo.requestId] = pending;
+
+ Ipc::TypedMsgHdr message;
+ ipcIo.pack(message);
+
+ Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen);
+ const String addr = diskId >= 0 ?
+ Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) :
+ Ipc::coordinatorAddr;
+
+ debugs(47, 7, HERE << "asking disker" << diskId << " to " <<
+ ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId);
+
+ Ipc::SendMessage(addr, message);
+ ++ioLevel;
+
+ const double timeout = 10; // in seconds
+ eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut,
+ this, timeout, 0, false);
+}
+
+/// called when disker responds to our I/O request
+void
+IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw)
+{
+ IpcIoResponse response(raw);
+
+ const int requestId = response.requestId;
+ debugs(47, 7, HERE << "disker response to " <<
+ response.command << "; ipcIo" << KidIdentifier << '.' << requestId);
+
+ Must(requestId != 0);
+
+ IpcIoPendingRequest *pending = DequeueRequest(requestId);
+ Must(pending);
+
+ if (pending->readRequest)
+ pending->file->readCompleted(pending->readRequest, response);
+ else
+ if (pending->writeRequest)
+ pending->file->writeCompleted(pending->writeRequest, response);
+ else
+ pending->file->openCompleted(response);
+
+ // XXX: leaking if throwinig
+ delete pending;
+}
+
+
+/// Mgr::IpcIoFile::requestTimedOut wrapper
+void
+IpcIoFile::RequestTimedOut(void* param)
+{
+ debugs(47, 1, HERE << "bug: request timedout and we cannot handle that");
+ Must(param != NULL);
+ // XXX: cannot get to file because IpcIoFile is not cbdata-protected
+ // IpcIoFile* file = static_cast<IpcIoFile*>(param);
+
+ // TODO: notify the pending request (XXX: which one?)
+
+ // use async call to enable job call protection that time events lack
+ // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut);
+}
+
+/// Called when Coordinator fails to start processing the request [in time]
+void
+IpcIoFile::requestTimedOut()
+{
+ debugs(47, 3, HERE);
+ assert(false); // TODO: notify the pending request (XXX: which one?)
+}
+
+/// called when we are no longer waiting for Coordinator to respond
+void
+IpcIoFile::removeTimeoutEvent()
+{
+ if (eventFind(&IpcIoFile::RequestTimedOut, this))
+ eventDelete(&IpcIoFile::RequestTimedOut, this);
+}
+
+/// returns and forgets the right IpcIoFile pending request
+IpcIoPendingRequest *
+IpcIoFile::DequeueRequest(unsigned int requestId)
+{
+ debugs(47, 3, HERE);
+ Must(requestId != 0);
+ RequestsMap::iterator i = TheRequestsMap.find(requestId);
+ if (i != TheRequestsMap.end()) {
+ IpcIoPendingRequest *pending = i->second;
+ TheRequestsMap.erase(i);
+ return pending;
+ }
+ return NULL;
+}
+
+int
+IpcIoFile::getFD() const
+{
+ assert(false); // not supported; TODO: remove this method from API
+ return -1;
+}
+
+
+/* IpcIoRequest */
+
+IpcIoRequest::IpcIoRequest():
+ requestorId(0), requestId(0),
+ offset(0), len(0),
+ command(IpcIo::cmdNone)
+{
+}
+
+IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg)
+{
+ msg.checkType(Ipc::mtIpcIoRequest);
+ msg.getPod(requestorId);
+ msg.getPod(requestId);
+
+ msg.getPod(offset);
+ msg.getPod(len);
+ msg.getPod(command);
+
+ if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
+ msg.getFixed(buf, len);
+}
+
+void
+IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const
+{
+ msg.setType(Ipc::mtIpcIoRequest);
+ msg.putPod(requestorId);
+ msg.putPod(requestId);
+
+ msg.putPod(offset);
+ msg.putPod(len);
+ msg.putPod(command);
+
+ if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
+ msg.putFixed(buf, len);
+}
+
+
+/* IpcIoResponse */
+
+IpcIoResponse::IpcIoResponse():
+ diskId(-1),
+ requestId(0),
+ len(0),
+ xerrno(0)
+{
+}
+
+IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg)
+{
+ msg.checkType(Ipc::mtIpcIoResponse);
+ msg.getPod(diskId);
+ msg.getPod(requestId);
+ msg.getPod(len);
+ msg.getPod(command);
+ msg.getPod(xerrno);
+
+ if (command == IpcIo::cmdRead && !xerrno)
+ msg.getFixed(buf, len);
+}
+
+void
+IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const
+{
+ msg.setType(Ipc::mtIpcIoResponse);
+ msg.putPod(diskId);
+ msg.putPod(requestId);
+ msg.putPod(len);
+ msg.putPod(command);
+ msg.putPod(xerrno);
+
+ if (command == IpcIo::cmdRead && !xerrno)
+ msg.putFixed(buf, len);
+}
+
+
+/* IpcIoPendingRequest: */
+
+IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
+ file(aFile), readRequest(NULL), writeRequest(NULL)
+{
+}
+
+
+/* XXX: disker code that should probably be moved elsewhere */
+
+static int TheFile = -1; ///< db file descriptor
+
+static
+void diskerRead(const IpcIoRequest &request)
+{
+ debugs(47,5, HERE << "disker" << KidIdentifier << " reads " <<
+ request.len << " at " << request.offset <<
+ " ipcIo" << request.requestorId << '.' << request.requestId);
+
+ IpcIoResponse response;
+ response.diskId = KidIdentifier;
+ response.requestId = request.requestId;
+ response.command = request.command;
+
+ const ssize_t read = pread(TheFile, response.buf, request.len, request.offset);
+ if (read >= 0) {
+ response.xerrno = 0;
+ response.len = static_cast<size_t>(read); // safe because read > 0
+ debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
+ (response.len == request.len ? "all " : "just ") << read);
+ } else {
+ response.xerrno = errno;
+ response.len = 0;
+ debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
+ response.xerrno);
+ }
+
+ Ipc::TypedMsgHdr message;
+ response.pack(message);
+ const String addr =
+ Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
+ Ipc::SendMessage(addr, message);
+}
+
+static
+void diskerWrite(const IpcIoRequest &request)
+{
+ debugs(47,5, HERE << "disker" << KidIdentifier << " writes " <<
+ request.len << " at " << request.offset <<
+ " ipcIo" << request.requestorId << '.' << request.requestId);
+
+ IpcIoResponse response;
+ response.diskId = KidIdentifier;
+ response.requestId = request.requestId;
+ response.command = request.command;
+
+ const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset);
+ if (wrote >= 0) {
+ response.xerrno = 0;
+ response.len = static_cast<size_t>(wrote); // safe because wrote > 0
+ debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
+ (response.len == request.len ? "all " : "just ") << wrote);
+ } else {
+ response.xerrno = errno;
+ response.len = 0;
+ debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
+ response.xerrno);
+ }
+
+ Ipc::TypedMsgHdr message;
+ response.pack(message);
+ const String addr =
+ Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
+ Ipc::SendMessage(addr, message);
+}
+
+/// called when disker receives an I/O request
+void
+IpcIoFile::HandleRequest(const IpcIoRequest &request)
+{
+ switch (request.command) {
+ case IpcIo::cmdRead:
+ diskerRead(request);
+ break;
+
+ case IpcIo::cmdWrite:
+ diskerWrite(request);
+ break;
+
+ default:
+ debugs(0,0, HERE << "disker" << KidIdentifier <<
+ " should not receive " << request.command <<
+ " ipcIo" << request.requestorId << '.' << request.requestId);
+ break;
+ }
+}
+
+static bool
+DiskerOpen(const String &path, int flags, mode_t mode)
+{
+ assert(TheFile < 0);
+
+ TheFile = file_open(path.termedBuf(), flags);
+
+ if (TheFile < 0) {
+ const int xerrno = errno;
+ debugs(47,0, HERE << "rock db error opening " << path << ": " <<
+ xstrerr(xerrno));
+ return false;
+ }
+
+ store_open_disk_fd++;
+ debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
+ return true;
+}
+
+static void
+DiskerClose(const String &path)
+{
+ if (TheFile >= 0) {
+ file_close(TheFile);
+ debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
+ TheFile = -1;
+ store_open_disk_fd--;
+ }
+}
--- /dev/null
+#ifndef SQUID_IPC_IOFILE_H
+#define SQUID_IPC_IOFILE_H
+
+#include "base/AsyncCall.h"
+#include "cbdata.h"
+#include "DiskIO/DiskFile.h"
+#include "DiskIO/IORequestor.h"
+#include "ipc/forward.h"
+#include <map>
+
+// TODO: expand to all classes
+namespace IpcIo {
+
+/// what kind of I/O the disker needs to do or have done
+typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
+
+enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize
+
+} // namespace IpcIo
+
+
+/// converts DiskIO requests to IPC messages
+// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
+class IpcIoRequest {
+public:
+ IpcIoRequest();
+
+ explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+ void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+
+public:
+ int requestorId; ///< kidId of the requestor; used for response destination
+ unsigned int requestId; ///< unique for sender; matches request w/ response
+
+ /* ReadRequest and WriteRequest parameters to pass to disker */
+ char buf[IpcIo::BufCapacity]; // XXX: inefficient
+ off_t offset;
+ size_t len;
+
+ IpcIo::Command command; ///< what disker is supposed to do
+};
+
+/// disker response to IpcIoRequest
+class IpcIoResponse {
+public:
+ IpcIoResponse();
+
+ explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+ void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+
+public:
+ int diskId; ///< kidId of the responding disker
+ unsigned int requestId; ///< unique for sender; matches request w/ response
+
+ char buf[IpcIo::BufCapacity]; // XXX: inefficient
+ size_t len;
+
+ IpcIo::Command command; ///< what disker did
+
+ int xerrno; ///< I/O error code or zero
+};
+
+class IpcIoPendingRequest;
+
+class IpcIoFile: public DiskFile
+{
+
+public:
+ typedef RefCount<IpcIoFile> Pointer;
+
+ IpcIoFile(char const *aDb);
+ virtual ~IpcIoFile();
+
+ /* DiskFile API */
+ virtual void open(int flags, mode_t mode, RefCount<IORequestor> callback);
+ virtual void create(int flags, mode_t mode, RefCount<IORequestor> callback);
+ virtual void read(ReadRequest *);
+ virtual void write(WriteRequest *);
+ virtual void close();
+ virtual bool error() const;
+ virtual int getFD() const;
+ virtual bool canRead() const;
+ virtual bool canWrite() const;
+ virtual bool ioInProgress() const;
+
+ /// finds and calls the right IpcIoFile upon disker's response
+ static void HandleResponse(const Ipc::TypedMsgHdr &response);
+
+ /// disker entry point for remote I/O requests
+ static void HandleRequest(const IpcIoRequest &request);
+
+private:
+ void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+
+ void openCompleted(const IpcIoResponse &);
+ void readCompleted(ReadRequest *readRequest, const IpcIoResponse &);
+ void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &);
+
+ static void RequestTimedOut(void* param);
+ void requestTimedOut();
+ void removeTimeoutEvent();
+ static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+
+private:
+ const String dbName; ///< the name of the file we are managing
+ int diskId; ///< the process ID of the disker we talk to
+ RefCount<IORequestor> ioRequestor;
+
+ int ioLevel; ///< number of pending I/O requests using this file
+
+ bool error_; ///< whether we have seen at least one I/O error (XXX)
+
+ /// maps requestId to the handleResponse callback
+ typedef std::map<unsigned int, IpcIoPendingRequest*> RequestsMap;
+ static RequestsMap TheRequestsMap; ///< pending requests map
+
+ static unsigned int LastRequestId; ///< last requestId used
+
+ CBDATA_CLASS2(IpcIoFile);
+};
+
+
+/// keeps original I/O request parameters while disker is handling the request
+class IpcIoPendingRequest
+{
+public:
+ IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
+
+public:
+ IpcIoFile::Pointer file; ///< the file object waiting for the response
+ ReadRequest *readRequest; ///< set if this is a read requests
+ WriteRequest *writeRequest; ///< set if this is a write request
+
+private:
+ IpcIoPendingRequest(const IpcIoPendingRequest &d); // not implemented
+ IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto
+};
+
+
+#endif /* SQUID_IPC_IOFILE_H */
--- /dev/null
+
+/*
+ * $Id$
+ *
+ * DEBUG: section 47 Store Directory Routines
+ */
+
+#include "IpcIoIOStrategy.h"
+#include "IpcIoFile.h"
+bool
+IpcIoIOStrategy::shedLoad()
+{
+ return false;
+}
+
+int
+IpcIoIOStrategy::load()
+{
+ /* Return 999 (99.9%) constant load */
+ return 999;
+}
+
+DiskFile::Pointer
+IpcIoIOStrategy::newFile (char const *path)
+{
+ return new IpcIoFile (path);
+}
+
+void
+IpcIoIOStrategy::unlinkFile(char const *path)
+{
+#if USE_UNLINKD
+ unlinkdUnlink(path);
+#else
+ ::unlink(path);
+#endif
+}
--- /dev/null
+#ifndef SQUID_IPC_IOIOSTRATEGY_H
+#define SQUID_IPC_IOIOSTRATEGY_H
+#include "DiskIO/DiskIOStrategy.h"
+
+class IpcIoIOStrategy : public DiskIOStrategy
+{
+
+public:
+ virtual bool shedLoad();
+ virtual int load();
+ virtual RefCount<DiskFile> newFile(char const *path);
+ virtual void unlinkFile (char const *);
+};
+
+#endif /* SQUID_IPC_IOIOSTRATEGY_H */
endif
EXTRA_LIBRARIES = libAIO.a libBlocking.a libDiskDaemon.a libDiskThreads.a \
- libMmapped.a
+ libMmapped.a libIpcIo.a
noinst_LIBRARIES = $(DISK_LIBS)
noinst_LTLIBRARIES = libsquid.la
DiskIO/Mmapped/MmappedDiskIOModule.cc \
DiskIO/Mmapped/MmappedDiskIOModule.h
+libIpcIo_a_SOURCES = \
+ DiskIO/IpcIo/IpcIoFile.cc \
+ DiskIO/IpcIo/IpcIoFile.h \
+ DiskIO/IpcIo/IpcIoIOStrategy.cc \
+ DiskIO/IpcIo/IpcIoIOStrategy.h \
+ DiskIO/IpcIo/IpcIoDiskIOModule.cc \
+ DiskIO/IpcIo/IpcIoDiskIOModule.h
+
libDiskDaemon_a_SOURCES = \
DiskIO/DiskDaemon/DiskdFile.cc \
DiskIO/DiskDaemon/DiskdFile.h \
parse_cachedir(SquidConfig::_cacheSwap * swap)
{
// coordinator does not need to handle cache_dir.
- if (IamCoordinatorProcess())
+ if (IamCoordinatorProcess()) {
+ // make sure the NumberOfKids() is correct for coordinator
+ ++swap->n_processes; // XXX: does not work in reconfigure
return;
+ }
char *type_str;
char *path_str;
sd->parse(swap->n_configured, path_str);
++swap->n_configured;
+ ++swap->n_processes;
/* Update the max object size */
update_maxobjsize();
// are refcounted. We up our count once to avoid implicit delete's.
RefCountReference();
- DiskIOModule *m = DiskIOModule::Find("Mmapped"); // TODO: configurable?
+ DiskIOModule *m = DiskIOModule::Find("IpcIo"); // TODO: configurable?
assert(m);
io = m->createStrategy();
io->init();
#include "mgr/Request.h"
#include "mgr/Response.h"
#include "mgr/StoreToCommWriter.h"
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
{
+ debugs(54, 3, HERE << "registering kid" << strand.kidId <<
+ ' ' << strand.tag);
if (StrandCoord* found = findStrand(strand.kidId))
*found = strand;
else
strands_.push_back(strand);
+
+ // notify searchers waiting for this new strand, if any
+ typedef Searchers::iterator SRI;
+ for (SRI i = searchers.begin(); i != searchers.end();) {
+ if (i->tag == strand.tag) {
+ notifySearcher(*i, strand);
+ i = searchers.erase(i);
+ } else {
+ ++i;
+ }
+ }
}
void Ipc::Coordinator::receive(const TypedMsgHdr& message)
switch (message.type()) {
case mtRegistration:
debugs(54, 6, HERE << "Registration request");
- handleRegistrationRequest(StrandCoord(message));
+ handleRegistrationRequest(HereIamMessage(message));
break;
+ case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
+ IpcIoRequest io(message);
+ StrandSearchRequest sr;
+ sr.requestorId = io.requestorId;
+ sr.requestId = io.requestId;
+ sr.tag.limitInit(io.buf, io.len);
+ debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
+ handleSearchRequest(sr);
+ break;
+ }
+
case mtSharedListenRequest:
debugs(54, 6, HERE << "Shared listen request");
handleSharedListenRequest(SharedListenRequest(message));
}
}
-void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
{
- registerStrand(strand);
+ registerStrand(msg.strand);
// send back an acknowledgement; TODO: remove as not needed?
TypedMsgHdr message;
- strand.pack(message);
- SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
+ msg.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
}
void
Mgr::Inquirer::HandleRemoteAck(response);
}
+void
+Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
+{
+ // do we know of a strand with the given search tag?
+ const StrandCoord *strand = NULL;
+ typedef StrandCoords::const_iterator SCCI;
+ for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
+ if (i->tag == request.tag)
+ strand = &(*i);
+ }
+
+ if (strand) {
+ notifySearcher(request, *strand);
+ return;
+ }
+
+ searchers.push_back(request);
+ debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
+ " who " << request.tag << " is");
+}
+
+void
+Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
+ const StrandCoord& strand)
+{
+ debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
+ request.tag << " is kid" << strand.kidId);
+ const StrandSearchResponse response0(request.requestId, strand);
+ // XXX: we should use StrandSearchResponse instead of converting it
+ IpcIoResponse io;
+ io.diskId = strand.kidId;
+ io.requestId = request.requestId;
+ io.command = IpcIo::cmdOpen;
+ TypedMsgHdr message;
+ io.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+
int
Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
int &errNo)
#include "ipc/Port.h"
#include "ipc/SharedListen.h"
#include "ipc/StrandCoords.h"
+#include "ipc/StrandSearch.h"
#include "mgr/forward.h"
+#include <list>
#include <map>
namespace Ipc
StrandCoord* findStrand(int kidId); ///< registered strand or NULL
void registerStrand(const StrandCoord &); ///< adds or updates existing
- void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
+ void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK
+
+ /// answer the waiting search request
+ void notifySearcher(const StrandSearchRequest &request, const StrandCoord&);
+ /// answers or queues the request if the answer is not yet known
+ void handleSearchRequest(const StrandSearchRequest &request);
/// returns cached socket or calls openListenSocket()
void handleSharedListenRequest(const SharedListenRequest& request);
private:
StrandCoords strands_; ///< registered processes and threads
+ typedef std::list<StrandSearchRequest> Searchers; ///< search requests
+ Searchers searchers; ///< yet unanswered search requests in arrival order
+
typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
Listeners listeners; ///< cached comm_open_listener() results
StrandCoord.cc \
StrandCoord.h \
StrandCoords.h \
+ StrandSearch.cc \
+ StrandSearch.h \
SharedListen.cc \
SharedListen.h \
TypedMsgHdr.cc \
/// message class identifier
typedef enum { mtNone = 0, mtRegistration,
+ mtStrandSearchRequest, mtStrandSearchResponse,
mtSharedListenRequest, mtSharedListenResponse,
+ mtIpcIoRequest, mtIpcIoResponse,
mtCacheMgrRequest, mtCacheMgrResponse
} MessageType;
#include "mgr/Request.h"
#include "mgr/Response.h"
#include "mgr/Forwarder.h"
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
+#include "SwapDir.h" /* XXX: scope boundary violation */
#include "CacheManager.h"
{
debugs(54, 6, HERE);
Must(!isRegistered);
+
+ HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
+
+ // announce that we are responsible for our cache_dir if needed
+ // XXX: misplaced
+ if (IamDiskProcess()) {
+ const int myDisk = KidIdentifier % Config.cacheSwap.n_processes;
+ if (const SwapDir *sd = dynamic_cast<const SwapDir*>(INDEXSD(myDisk))) {
+ ann.strand.tag = sd->path;
+ ann.strand.tag.append("/rock"); // XXX: scope boundary violation
+ }
+ }
+
TypedMsgHdr message;
- StrandCoord(KidIdentifier, getpid()).pack(message);
+ ann.pack(message);
SendMessage(coordinatorAddr, message);
setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
}
switch (message.type()) {
case mtRegistration:
- handleRegistrationResponse(StrandCoord(message));
+ handleRegistrationResponse(HereIamMessage(message));
break;
case mtSharedListenResponse:
SharedListenJoined(SharedListenResponse(message));
break;
+ case mtIpcIoRequest:
+ IpcIoFile::HandleRequest(IpcIoRequest(message));
+ break;
+
+ case mtIpcIoResponse:
+ IpcIoFile::HandleResponse(message);
+ break;
+
case mtCacheMgrRequest:
handleCacheMgrRequest(Mgr::Request(message));
break;
}
}
-void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
+void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
{
// handle registration response from the coordinator; it could be stale
- if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
+ if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
debugs(54, 6, "kid" << KidIdentifier << " registered");
clearTimeout(); // we are done
} else {
#ifndef SQUID_IPC_STRAND_H
#define SQUID_IPC_STRAND_H
+#include "ipc/forward.h"
#include "ipc/Port.h"
#include "mgr/forward.h"
private:
void registerSelf(); /// let Coordinator know this strand exists
- void handleRegistrationResponse(const StrandCoord &strand);
+ void handleRegistrationResponse(const HereIamMessage &msg);
void handleCacheMgrRequest(const Mgr::Request& request);
void handleCacheMgrResponse(const Mgr::Response& response);
#include "config.h"
+#include "Debug.h"
#include "ipc/Messages.h"
#include "ipc/StrandCoord.h"
#include "ipc/TypedMsgHdr.h"
{
}
-Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
+void
+Ipc::StrandCoord::unpack(const TypedMsgHdr &hdrMsg)
{
- hdrMsg.checkType(mtRegistration);
- hdrMsg.getPod(*this);
+ hdrMsg.getPod(kidId);
+ hdrMsg.getPod(pid);
+ hdrMsg.getString(tag);
+debugs(0,0, HERE << "getting tag: " << tag);
}
void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putPod(kidId);
+ hdrMsg.putPod(pid);
+debugs(0,0, HERE << "putting tag: " << tag);
+ hdrMsg.putString(tag);
+}
+
+
+Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand):
+ strand(aStrand)
+{
+}
+
+Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg)
+{
+ hdrMsg.checkType(mtRegistration);
+ strand.unpack(hdrMsg);
+}
+
+void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.setType(mtRegistration);
- hdrMsg.putPod(*this);
+ strand.pack(hdrMsg);
}
#define SQUID_IPC_STRAND_COORD_H
#include "ipc/forward.h"
+#include "SquidString.h"
#include <sys/types.h>
namespace Ipc
{
public:
StrandCoord(); ///< unknown location
- StrandCoord(int akidId, pid_t aPid); ///< from registrant
- explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ StrandCoord(int akidId, pid_t aPid);
+
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+ void unpack(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
public:
int kidId; ///< internal Squid process number
pid_t pid; ///< OS process or thread identifier
+
+ String tag; ///< optional unique well-known key (e.g., cache_dir path)
};
+/// strand registration with Coordinator (also used as an ACK)
+class HereIamMessage
+{
+public:
+ explicit HereIamMessage(const StrandCoord &strand); ///< from registrant
+ explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ StrandCoord strand; ///< registrant coordinates and related details
+};
+
+
+
} // namespace Ipc;
#endif /* SQUID_IPC_STRAND_COORD_H */
class TypedMsgHdr: public msghdr
{
public:
- enum {maxSize = 4096};
+ enum {maxSize = 36*1024}; // XXX: so that disker I/O can fit
public:
TypedMsgHdr();
class TypedMsgHdr;
class StrandCoord;
+class HereIamMessage;
} // namespace Ipc
RefCount<class Store> *swapDirs;
int n_allocated;
int n_configured;
+ ///< number of disk processes (set even when n_configured is not)
+ int n_processes;
} cacheSwap;
/*
* I'm sick of having to keep doing this ..
// when there is a master and worker process, the master delegates
// primary functions to its only kid
- if (Config.workers == 1)
+ if (NumberOfKids() == 1)
return IamWorkerProcess();
// in SMP mode, multiple kids delegate primary functions to the coordinator
// XXX: detect and abort when called before workers/cache_dirs are parsed
// XXX: this is not always the case as there are other cache_dir types
- const int rockDirs = Config.cacheSwap.n_configured;
+ const int rockDirs = Config.cacheSwap.n_processes;
const bool needCoord = Config.workers > 1 || rockDirs > 0;
return (needCoord ? 1 : 0) + Config.workers + rockDirs;