]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Added IpcIo DiskIO module for communication with remote disk processes via UDS.
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 1 Feb 2011 05:01:43 +0000 (22:01 -0700)
committerAlex Rousskov <rousskov@measurement-factory.com>
Tue, 1 Feb 2011 05:01:43 +0000 (22:01 -0700)
Used IpcIo for Rock Store filesystem module.

Added StrandSearch API: Workers use it to ask Coordinator for the right
address (i.e., kid identifier) of the disk process for a given cache_dir path.
If Coordinator does not know the answer, it waits for more disk processes to
register. Implemented using generic tagging of kids (StrandCoord) and
searching for the right tag.

Raised UDS message size maximum to 36K in order to accommodate non-trivial
rock store I/O while we are using UDS messages for I/O content.

Fixed shutdown handling broken by hiding cache_dirs from Coordinator while
switching IamPrimaryProcess() logic to use NumberOfKids() which needs
cache_dir count.

23 files changed:
configure.ac
src/DiskIO/IpcIo/DiskIOIpcIo.cc [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoDiskIOModule.cc [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoDiskIOModule.h [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoFile.cc [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoFile.h [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoIOStrategy.cc [new file with mode: 0644]
src/DiskIO/IpcIo/IpcIoIOStrategy.h [new file with mode: 0644]
src/Makefile.am
src/cache_cf.cc
src/fs/rock/RockSwapDir.cc
src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Makefile.am
src/ipc/Messages.h
src/ipc/Strand.cc
src/ipc/Strand.h
src/ipc/StrandCoord.cc
src/ipc/StrandCoord.h
src/ipc/TypedMsgHdr.h
src/ipc/forward.h
src/structs.h
src/tools.cc

index 8dcdc7b21f421003459fc3d6a5aae6bc03c70963..79682c3c60bf562bcd799d690a0014584612afac 100644 (file)
@@ -633,6 +633,13 @@ for module in $squid_disk_module_candidates none; do
       DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/Mmapped/MmappedDiskIOModule.o"
       ;;
 
+    IpcIo)
+      AC_MSG_NOTICE([Enabling IpcIo DiskIO module])
+      DISK_LIBS="$DISK_LIBS libIpcIo.a"
+      DISK_MODULES="$DISK_MODULES IpcIo"
+      DISK_LINKOBJS="$DISK_LINKOBJS DiskIO/IpcIo/IpcIoDiskIOModule.o"
+      ;;
+
     Blocking)
       AC_MSG_NOTICE([Enabling Blocking DiskIO module])
       DISK_LIBS="$DISK_LIBS libBlocking.a"
@@ -721,8 +728,8 @@ for fs in $squid_storeio_module_candidates none; do
       STORE_TESTS="$STORE_TESTS tests/testCoss$EXEEXT"
       ;;
     rock)
-       if ! test "x$squid_disk_module_candidates_Mmapped" = "xyes"; then
-         AC_MSG_ERROR([Storage modeule Rock requires Mmapped DiskIO module])
+       if ! test "x$squid_disk_module_candidates_IpcIo" = "xyes"; then
+         AC_MSG_ERROR([Storage modeule Rock requires IpcIo DiskIO module])
        fi
        STORE_TESTS="$STORE_TESTS tests/testRock$EXEEXT"
        ;;
diff --git a/src/DiskIO/IpcIo/DiskIOIpcIo.cc b/src/DiskIO/IpcIo/DiskIOIpcIo.cc
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/DiskIO/IpcIo/IpcIoDiskIOModule.cc b/src/DiskIO/IpcIo/IpcIoDiskIOModule.cc
new file mode 100644 (file)
index 0000000..d0878e4
--- /dev/null
@@ -0,0 +1,37 @@
+#include "squid.h"
+#include "IpcIoDiskIOModule.h"
+#include "IpcIoIOStrategy.h"
+
+IpcIoDiskIOModule::IpcIoDiskIOModule()
+{
+    ModuleAdd(*this);
+}
+
+IpcIoDiskIOModule &
+IpcIoDiskIOModule::GetInstance()
+{
+    return Instance;
+}
+
+void
+IpcIoDiskIOModule::init()
+{}
+
+void
+IpcIoDiskIOModule::shutdown()
+{}
+
+
+DiskIOStrategy*
+IpcIoDiskIOModule::createStrategy()
+{
+    return new IpcIoIOStrategy();
+}
+
+IpcIoDiskIOModule IpcIoDiskIOModule::Instance;
+
+char const *
+IpcIoDiskIOModule::type () const
+{
+    return "IpcIo";
+}
diff --git a/src/DiskIO/IpcIo/IpcIoDiskIOModule.h b/src/DiskIO/IpcIo/IpcIoDiskIOModule.h
new file mode 100644 (file)
index 0000000..92e1cf2
--- /dev/null
@@ -0,0 +1,21 @@
+#ifndef SQUID_IPC_IODISKIOMODULE_H
+#define SQUID_IPC_IODISKIOMODULE_H
+
+#include "DiskIO/DiskIOModule.h"
+
+class IpcIoDiskIOModule : public DiskIOModule
+{
+
+public:
+    static IpcIoDiskIOModule &GetInstance();
+    IpcIoDiskIOModule();
+    virtual void init();
+    virtual void shutdown();
+    virtual char const *type () const;
+    virtual DiskIOStrategy* createStrategy();
+
+private:
+    static IpcIoDiskIOModule Instance;
+};
+
+#endif /* SQUID_IPC_IODISKIOMODULE_H */
diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc
new file mode 100644 (file)
index 0000000..9246e6c
--- /dev/null
@@ -0,0 +1,532 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 47    Store Directory Routines
+ */
+
+#include "config.h"
+#include "base/TextException.h"
+#include "DiskIO/IORequestor.h"
+#include "DiskIO/IpcIo/IpcIoFile.h"
+#include "DiskIO/ReadRequest.h"
+#include "DiskIO/WriteRequest.h"
+#include "ipc/Messages.h"
+#include "ipc/Port.h"
+#include "ipc/UdsOp.h"
+
+CBDATA_CLASS_INIT(IpcIoFile);
+
+IpcIoFile::RequestsMap IpcIoFile::TheRequestsMap;
+unsigned int IpcIoFile::LastRequestId = 0;
+
+static bool DiskerOpen(const String &path, int flags, mode_t mode);
+static void DiskerClose(const String &path);
+
+
+IpcIoFile::IpcIoFile(char const *aDb):
+    dbName(aDb),
+    diskId(-1),
+    ioLevel(0),
+    error_(false)
+{
+}
+
+IpcIoFile::~IpcIoFile()
+{
+}
+
+void
+IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
+{
+    ioRequestor = callback;
+    Must(diskId < 0); // we do not know our disker yet
+
+    if (IamDiskProcess()) {
+        error_ = !DiskerOpen(dbName, flags, mode);
+        ioRequestor->ioCompletedNotification();
+        return;
+       }        
+
+    // XXX: use StrandSearchRequest instead
+    IpcIoRequest ipcIo;
+    ipcIo.requestorId = KidIdentifier;
+    ipcIo.command = IpcIo::cmdOpen;
+    ipcIo.len = dbName.size();
+    assert(ipcIo.len <= sizeof(ipcIo.buf));
+    memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len);    
+
+    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+    send(ipcIo, pending);
+}
+
+void
+IpcIoFile::openCompleted(const IpcIoResponse &ipcResponse) {
+    if (ipcResponse.xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+        error_ = true;
+       } else {
+        diskId = ipcResponse.diskId;
+        if (diskId < 0) {
+            error_ = true;
+            debugs(79,1, HERE << "error: no disker claimed " << dbName);
+               }
+       }
+
+    ioRequestor->ioCompletedNotification();
+}
+
+/**
+ * Alias for IpcIoFile::open(...)
+ \copydoc IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
+ */
+void
+IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
+{
+    assert(false); // check
+    /* We use the same logic path for open */
+    open(flags, mode, callback);
+}
+
+void
+IpcIoFile::close()
+{
+    assert(ioRequestor != NULL);
+
+    if (IamDiskProcess())
+        DiskerClose(dbName);
+    // XXX: else nothing to do?  
+
+    ioRequestor->closeCompleted();
+}
+
+bool
+IpcIoFile::canRead() const
+{
+    return diskId >= 0;
+}
+
+bool
+IpcIoFile::canWrite() const
+{
+    return diskId >= 0;
+}
+
+bool
+IpcIoFile::error() const
+{
+    return error_;
+}
+
+void
+IpcIoFile::read(ReadRequest *readRequest)
+{
+    debugs(79,3, HERE << "(disker" << diskId << ", " << readRequest->len << ", " <<
+        readRequest->offset << ")");
+
+    assert(ioRequestor != NULL);
+    assert(readRequest->len >= 0);
+    assert(readRequest->offset >= 0);
+    Must(!error_);
+
+    //assert(minOffset < 0 || minOffset <= readRequest->offset);
+    //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
+
+    IpcIoRequest ipcIo;
+    ipcIo.requestorId = KidIdentifier;
+    ipcIo.command = IpcIo::cmdRead;
+    ipcIo.offset = readRequest->offset;
+    ipcIo.len = readRequest->len;
+
+    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+    pending->readRequest = readRequest;
+    send(ipcIo, pending);
+}
+
+void
+IpcIoFile::readCompleted(ReadRequest *readRequest,
+                         const IpcIoResponse &ipcResponse)
+{
+    if (ipcResponse.xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+        error_ = true;
+       }
+
+    const ssize_t rlen = error_ ? -1 : (ssize_t)readRequest->len;
+    const int errflag = error_ ? DISK_ERROR : DISK_OK;
+    // XXX: check buffering expectations of the recepient
+    ioRequestor->readCompleted(ipcResponse.buf, rlen, errflag, readRequest);
+}
+
+void
+IpcIoFile::write(WriteRequest *writeRequest)
+{
+    debugs(79,3, HERE << "(disker" << diskId << ", " << writeRequest->len << ", " <<
+        writeRequest->offset << ")");
+
+    assert(ioRequestor != NULL);
+    assert(writeRequest->len >= 0);
+    assert(writeRequest->len > 0); // TODO: work around mmap failures on zero-len?
+    assert(writeRequest->offset >= 0);
+    Must(!error_);
+
+    //assert(minOffset < 0 || minOffset <= writeRequest->offset);
+    //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
+
+    IpcIoRequest ipcIo;
+    ipcIo.requestorId = KidIdentifier;
+    ipcIo.command = IpcIo::cmdWrite;
+    ipcIo.offset = writeRequest->offset;
+    ipcIo.len = writeRequest->len;
+    assert(ipcIo.len <= sizeof(ipcIo.buf));
+    memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away
+
+    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+    pending->writeRequest = writeRequest;
+    send(ipcIo, pending);
+}
+
+void
+IpcIoFile::writeCompleted(WriteRequest *writeRequest,
+                          const IpcIoResponse &ipcResponse)
+{
+    if (ipcResponse.xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse.xerrno));
+        error_ = true;
+    } else
+    if (ipcResponse.len != writeRequest->len) {
+        debugs(79,1, HERE << "problem: " << ipcResponse.len << " < " << writeRequest->len);
+        error_ = true;
+    }
+
+    if (writeRequest->free_func)
+        (writeRequest->free_func)(const_cast<char*>(writeRequest->buf)); // broken API?
+
+    if (!error_) {
+        debugs(79,5, HERE << "wrote " << writeRequest->len << " to disker" <<
+            diskId << " at " << writeRequest->offset);
+       }
+
+    const ssize_t rlen = error_ ? 0 : (ssize_t)writeRequest->len;
+    const int errflag = error_ ? DISK_ERROR : DISK_OK;
+    ioRequestor->writeCompleted(errflag, rlen, writeRequest);
+}
+
+bool
+IpcIoFile::ioInProgress() const
+{
+    return ioLevel > 0; // XXX: todo
+}
+
+/// sends an I/O request to disker
+void
+IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
+{
+    if (++LastRequestId == 0) // don't use zero value as requestId
+        ++LastRequestId;
+    ipcIo.requestId = LastRequestId;
+    TheRequestsMap[ipcIo.requestId] = pending;
+
+    Ipc::TypedMsgHdr message;
+    ipcIo.pack(message);
+
+    Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen);
+    const String addr = diskId >= 0 ?
+        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) :
+        Ipc::coordinatorAddr;
+
+    debugs(47, 7, HERE << "asking disker" << diskId << " to " <<
+        ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId);
+
+    Ipc::SendMessage(addr, message);
+    ++ioLevel;
+
+    const double timeout = 10; // in seconds
+    eventAdd("IpcIoFile::requestTimedOut", &IpcIoFile::RequestTimedOut,
+             this, timeout, 0, false);
+}
+
+/// called when disker responds to our I/O request
+void
+IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw)
+{
+    IpcIoResponse response(raw);
+
+    const int requestId = response.requestId;
+    debugs(47, 7, HERE << "disker response to " <<
+        response.command << "; ipcIo" << KidIdentifier << '.' << requestId);
+
+    Must(requestId != 0);
+
+    IpcIoPendingRequest *pending = DequeueRequest(requestId);
+    Must(pending);
+
+    if (pending->readRequest)
+        pending->file->readCompleted(pending->readRequest, response);
+    else
+    if (pending->writeRequest)
+        pending->file->writeCompleted(pending->writeRequest, response);
+    else
+        pending->file->openCompleted(response);
+
+    // XXX: leaking if throwinig
+    delete pending;
+}
+
+
+/// Mgr::IpcIoFile::requestTimedOut wrapper
+void
+IpcIoFile::RequestTimedOut(void* param)
+{
+    debugs(47, 1, HERE << "bug: request timedout and we cannot handle that");
+    Must(param != NULL);
+    // XXX: cannot get to file because IpcIoFile is not cbdata-protected
+    // IpcIoFile* file = static_cast<IpcIoFile*>(param);
+
+    // TODO: notify the pending request (XXX: which one?)
+
+    // use async call to enable job call protection that time events lack
+    // CallJobHere(47, 5, mgrFwdr, IpcIoFile, requestTimedOut);
+}
+
+/// Called when Coordinator fails to start processing the request [in time]
+void
+IpcIoFile::requestTimedOut()
+{
+    debugs(47, 3, HERE);
+    assert(false); // TODO: notify the pending request (XXX: which one?)
+}
+
+/// called when we are no longer waiting for Coordinator to respond
+void
+IpcIoFile::removeTimeoutEvent()
+{
+    if (eventFind(&IpcIoFile::RequestTimedOut, this))
+        eventDelete(&IpcIoFile::RequestTimedOut, this);
+}
+
+/// returns and forgets the right IpcIoFile pending request
+IpcIoPendingRequest *
+IpcIoFile::DequeueRequest(unsigned int requestId)
+{
+    debugs(47, 3, HERE);
+    Must(requestId != 0);
+    RequestsMap::iterator i = TheRequestsMap.find(requestId);
+    if (i != TheRequestsMap.end()) {
+        IpcIoPendingRequest *pending = i->second;
+        TheRequestsMap.erase(i);
+        return pending;
+       }
+    return NULL;
+}
+
+int
+IpcIoFile::getFD() const 
+{
+    assert(false); // not supported; TODO: remove this method from API
+    return -1;
+}
+
+
+/* IpcIoRequest */
+
+IpcIoRequest::IpcIoRequest():
+    requestorId(0), requestId(0),
+    offset(0), len(0),
+    command(IpcIo::cmdNone)
+{
+}
+
+IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg)
+{
+    msg.checkType(Ipc::mtIpcIoRequest);
+    msg.getPod(requestorId);
+    msg.getPod(requestId);
+   
+    msg.getPod(offset);
+    msg.getPod(len);
+    msg.getPod(command);
+
+    if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
+        msg.getFixed(buf, len);
+}
+
+void
+IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.setType(Ipc::mtIpcIoRequest);
+    msg.putPod(requestorId);
+    msg.putPod(requestId);
+
+    msg.putPod(offset);
+    msg.putPod(len);
+    msg.putPod(command);
+
+    if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
+        msg.putFixed(buf, len);
+}
+
+
+/* IpcIoResponse */
+
+IpcIoResponse::IpcIoResponse():
+    diskId(-1),
+    requestId(0),
+    len(0),
+    xerrno(0)
+{
+}
+
+IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg)
+{
+    msg.checkType(Ipc::mtIpcIoResponse);
+    msg.getPod(diskId);
+    msg.getPod(requestId);
+    msg.getPod(len);
+    msg.getPod(command);
+    msg.getPod(xerrno);
+
+    if (command == IpcIo::cmdRead && !xerrno)
+        msg.getFixed(buf, len);
+}
+
+void
+IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const
+{
+    msg.setType(Ipc::mtIpcIoResponse);
+    msg.putPod(diskId);
+    msg.putPod(requestId);
+    msg.putPod(len);
+    msg.putPod(command);
+    msg.putPod(xerrno);
+
+    if (command == IpcIo::cmdRead && !xerrno)
+        msg.putFixed(buf, len);
+}
+
+
+/* IpcIoPendingRequest: */
+
+IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
+    file(aFile), readRequest(NULL), writeRequest(NULL)
+{
+}
+
+
+/* XXX: disker code that should probably be moved elsewhere */
+
+static int TheFile = -1; ///< db file descriptor
+
+static
+void diskerRead(const IpcIoRequest &request)
+{
+    debugs(47,5, HERE << "disker" << KidIdentifier << " reads " <<
+        request.len << " at " << request.offset <<
+        " ipcIo" << request.requestorId << '.' << request.requestId);
+
+    IpcIoResponse response;
+    response.diskId = KidIdentifier;
+    response.requestId = request.requestId;
+    response.command = request.command;
+
+    const ssize_t read = pread(TheFile, response.buf, request.len, request.offset);
+    if (read >= 0) {
+        response.xerrno = 0;
+        response.len = static_cast<size_t>(read); // safe because read > 0
+        debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
+            (response.len == request.len ? "all " : "just ") << read);
+       } else {
+        response.xerrno = errno;
+        response.len = 0;
+        debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
+            response.xerrno);
+       }
+    
+    Ipc::TypedMsgHdr message;
+    response.pack(message);
+    const String addr =
+        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
+    Ipc::SendMessage(addr, message);
+}
+
+static
+void diskerWrite(const IpcIoRequest &request)
+{
+    debugs(47,5, HERE << "disker" << KidIdentifier << " writes " <<
+        request.len << " at " << request.offset <<
+        " ipcIo" << request.requestorId << '.' << request.requestId);
+
+    IpcIoResponse response;
+    response.diskId = KidIdentifier;
+    response.requestId = request.requestId;
+    response.command = request.command;
+
+    const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset);
+    if (wrote >= 0) {
+        response.xerrno = 0;
+        response.len = static_cast<size_t>(wrote); // safe because wrote > 0
+        debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
+            (response.len == request.len ? "all " : "just ") << wrote);
+       } else {
+        response.xerrno = errno;
+        response.len = 0;
+        debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
+            response.xerrno);
+       }
+    
+    Ipc::TypedMsgHdr message;
+    response.pack(message);
+    const String addr =
+        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
+    Ipc::SendMessage(addr, message);
+}
+
+/// called when disker receives an I/O request
+void
+IpcIoFile::HandleRequest(const IpcIoRequest &request)
+{
+    switch (request.command) {
+    case IpcIo::cmdRead:
+        diskerRead(request);
+        break;
+
+    case IpcIo::cmdWrite:
+        diskerWrite(request);
+        break;
+
+    default:
+        debugs(0,0, HERE << "disker" << KidIdentifier <<
+               " should not receive " << request.command <<
+               " ipcIo" << request.requestorId << '.' << request.requestId);
+        break;
+       }
+}
+
+static bool
+DiskerOpen(const String &path, int flags, mode_t mode)
+{
+    assert(TheFile < 0);
+
+    TheFile = file_open(path.termedBuf(), flags);
+
+    if (TheFile < 0) {
+        const int xerrno = errno;
+        debugs(47,0, HERE << "rock db error opening " << path << ": " <<
+               xstrerr(xerrno));
+        return false;
+       }
+
+    store_open_disk_fd++;
+    debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
+       return true;
+}
+
+static void
+DiskerClose(const String &path)
+{
+    if (TheFile >= 0) {
+        file_close(TheFile);
+        debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
+        TheFile = -1;
+        store_open_disk_fd--;
+       }
+}
diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h
new file mode 100644 (file)
index 0000000..3b23c40
--- /dev/null
@@ -0,0 +1,140 @@
+#ifndef SQUID_IPC_IOFILE_H
+#define SQUID_IPC_IOFILE_H
+
+#include "base/AsyncCall.h"
+#include "cbdata.h"
+#include "DiskIO/DiskFile.h"
+#include "DiskIO/IORequestor.h"
+#include "ipc/forward.h"
+#include <map>
+
+// TODO: expand to all classes
+namespace IpcIo {
+
+/// what kind of I/O the disker needs to do or have done
+typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
+
+enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize
+
+} // namespace IpcIo
+
+
+/// converts DiskIO requests to IPC messages
+// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
+class IpcIoRequest {
+public:
+    IpcIoRequest();
+
+    explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+
+public:
+    int requestorId; ///< kidId of the requestor; used for response destination
+    unsigned int requestId; ///< unique for sender; matches request w/ response
+
+    /* ReadRequest and WriteRequest parameters to pass to disker */
+    char buf[IpcIo::BufCapacity]; // XXX: inefficient
+    off_t offset;
+    size_t len;
+
+    IpcIo::Command command; ///< what disker is supposed to do
+};
+
+/// disker response to IpcIoRequest
+class IpcIoResponse {
+public:
+    IpcIoResponse();
+
+    explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
+    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+
+public:
+    int diskId; ///< kidId of the responding disker
+    unsigned int requestId; ///< unique for sender; matches request w/ response
+
+    char buf[IpcIo::BufCapacity]; // XXX: inefficient
+    size_t len;
+
+    IpcIo::Command command; ///< what disker did
+
+    int xerrno; ///< I/O error code or zero
+};
+
+class IpcIoPendingRequest;
+
+class IpcIoFile: public DiskFile
+{
+
+public:
+    typedef RefCount<IpcIoFile> Pointer;
+
+    IpcIoFile(char const *aDb);
+    virtual ~IpcIoFile();
+
+    /* DiskFile API */
+    virtual void open(int flags, mode_t mode, RefCount<IORequestor> callback);
+    virtual void create(int flags, mode_t mode, RefCount<IORequestor> callback);
+    virtual void read(ReadRequest *);
+    virtual void write(WriteRequest *);
+    virtual void close();
+    virtual bool error() const;
+    virtual int getFD() const;
+    virtual bool canRead() const;
+    virtual bool canWrite() const;
+    virtual bool ioInProgress() const;
+
+    /// finds and calls the right IpcIoFile upon disker's response
+    static void HandleResponse(const Ipc::TypedMsgHdr &response);
+
+    /// disker entry point for remote I/O requests
+    static void HandleRequest(const IpcIoRequest &request);
+
+private:
+    void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+
+    void openCompleted(const IpcIoResponse &);
+    void readCompleted(ReadRequest *readRequest, const IpcIoResponse &);
+    void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse &);
+
+    static void RequestTimedOut(void* param);
+    void requestTimedOut();
+    void removeTimeoutEvent();
+    static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+
+private:
+    const String dbName; ///< the name of the file we are managing
+    int diskId; ///< the process ID of the disker we talk to
+    RefCount<IORequestor> ioRequestor;
+
+    int ioLevel; ///< number of pending I/O requests using this file
+
+    bool error_; ///< whether we have seen at least one I/O error (XXX)
+
+    /// maps requestId to the handleResponse callback
+    typedef std::map<unsigned int, IpcIoPendingRequest*> RequestsMap;
+    static RequestsMap TheRequestsMap; ///< pending requests map
+
+    static unsigned int LastRequestId; ///< last requestId used
+
+    CBDATA_CLASS2(IpcIoFile);
+};
+
+
+/// keeps original I/O request parameters while disker is handling the request
+class IpcIoPendingRequest
+{
+public:
+    IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
+
+public:
+    IpcIoFile::Pointer file; ///< the file object waiting for the response
+    ReadRequest *readRequest; ///< set if this is a read requests
+    WriteRequest *writeRequest; ///< set if this is a write request
+
+private:
+    IpcIoPendingRequest(const IpcIoPendingRequest &d); // not implemented
+    IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto
+};
+
+
+#endif /* SQUID_IPC_IOFILE_H */
diff --git a/src/DiskIO/IpcIo/IpcIoIOStrategy.cc b/src/DiskIO/IpcIo/IpcIoIOStrategy.cc
new file mode 100644 (file)
index 0000000..c5e46c6
--- /dev/null
@@ -0,0 +1,37 @@
+
+/*
+ * $Id$
+ *
+ * DEBUG: section 47    Store Directory Routines
+ */
+
+#include "IpcIoIOStrategy.h"
+#include "IpcIoFile.h"
+bool
+IpcIoIOStrategy::shedLoad()
+{
+    return false;
+}
+
+int
+IpcIoIOStrategy::load()
+{
+    /* Return 999 (99.9%) constant load */
+    return 999;
+}
+
+DiskFile::Pointer
+IpcIoIOStrategy::newFile (char const *path)
+{
+    return new IpcIoFile (path);
+}
+
+void
+IpcIoIOStrategy::unlinkFile(char const *path)
+{
+#if USE_UNLINKD
+    unlinkdUnlink(path);
+#else
+    ::unlink(path);
+#endif
+}
diff --git a/src/DiskIO/IpcIo/IpcIoIOStrategy.h b/src/DiskIO/IpcIo/IpcIoIOStrategy.h
new file mode 100644 (file)
index 0000000..3bd2945
--- /dev/null
@@ -0,0 +1,15 @@
+#ifndef SQUID_IPC_IOIOSTRATEGY_H
+#define SQUID_IPC_IOIOSTRATEGY_H
+#include "DiskIO/DiskIOStrategy.h"
+
+class IpcIoIOStrategy : public DiskIOStrategy
+{
+
+public:
+    virtual bool shedLoad();
+    virtual int load();
+    virtual RefCount<DiskFile> newFile(char const *path);
+    virtual void unlinkFile (char const *);
+};
+
+#endif /* SQUID_IPC_IOIOSTRATEGY_H */
index c4c0b438b04701e589d0a0eddb919eaa50b8b6a4..2300eba3b4aa4fc2b3a87dd727519a81c69ea48c 100644 (file)
@@ -156,7 +156,7 @@ AIOPS_SOURCE = DiskIO/DiskThreads/aiops.cc
 endif
 
 EXTRA_LIBRARIES = libAIO.a libBlocking.a libDiskDaemon.a libDiskThreads.a \
-       libMmapped.a
+       libMmapped.a libIpcIo.a
 noinst_LIBRARIES = $(DISK_LIBS)
 noinst_LTLIBRARIES = libsquid.la
 
@@ -716,6 +716,14 @@ libMmapped_a_SOURCES = \
                DiskIO/Mmapped/MmappedDiskIOModule.cc \
                DiskIO/Mmapped/MmappedDiskIOModule.h 
 
+libIpcIo_a_SOURCES = \
+               DiskIO/IpcIo/IpcIoFile.cc \
+               DiskIO/IpcIo/IpcIoFile.h \
+               DiskIO/IpcIo/IpcIoIOStrategy.cc \
+               DiskIO/IpcIo/IpcIoIOStrategy.h \
+               DiskIO/IpcIo/IpcIoDiskIOModule.cc \
+               DiskIO/IpcIo/IpcIoDiskIOModule.h 
+
 libDiskDaemon_a_SOURCES = \
                DiskIO/DiskDaemon/DiskdFile.cc \
                DiskIO/DiskDaemon/DiskdFile.h \
index 1ec303cba92d29e804b15d972c1302ed8c922e04..eaa610107cee621718fa4dfb9194991c1d219cd1 100644 (file)
@@ -1885,8 +1885,11 @@ static void
 parse_cachedir(SquidConfig::_cacheSwap * swap)
 {
     // coordinator does not need to handle cache_dir.
-    if (IamCoordinatorProcess())
+    if (IamCoordinatorProcess()) {
+        // make sure the NumberOfKids() is correct for coordinator
+        ++swap->n_processes; // XXX: does not work in reconfigure
         return;
+    }
 
     char *type_str;
     char *path_str;
@@ -1953,6 +1956,7 @@ parse_cachedir(SquidConfig::_cacheSwap * swap)
     sd->parse(swap->n_configured, path_str);
 
     ++swap->n_configured;
+    ++swap->n_processes;
 
     /* Update the max object size */
     update_maxobjsize();
index 207cd7e6fe992b9d94d23b045ac81aea99cb0f52..cc5324f1045210611c8613de230bbb80a124d7cb 100644 (file)
@@ -109,7 +109,7 @@ Rock::SwapDir::init()
     // are refcounted. We up our count once to avoid implicit delete's.
     RefCountReference();
 
-    DiskIOModule *m = DiskIOModule::Find("Mmapped"); // TODO: configurable?
+    DiskIOModule *m = DiskIOModule::Find("IpcIo"); // TODO: configurable?
     assert(m);
     io = m->createStrategy();
     io->init();
index d7f341949680a85164f5f784faf0b9cb3bba6b74..f1e6032132f64b87a1bdc87d8d671189de5ba042 100644 (file)
@@ -17,6 +17,7 @@
 #include "mgr/Request.h"
 #include "mgr/Response.h"
 #include "mgr/StoreToCommWriter.h"
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
@@ -45,10 +46,23 @@ Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
 
 void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
 {
+    debugs(54, 3, HERE << "registering kid" << strand.kidId <<
+           ' ' << strand.tag);
     if (StrandCoord* found = findStrand(strand.kidId))
         *found = strand;
     else
         strands_.push_back(strand);
+
+    // notify searchers waiting for this new strand, if any
+    typedef Searchers::iterator SRI;
+    for (SRI i = searchers.begin(); i != searchers.end();) {
+        if (i->tag == strand.tag) {
+            notifySearcher(*i, strand);
+            i = searchers.erase(i);
+               } else {
+            ++i;
+               }
+    }
 }
 
 void Ipc::Coordinator::receive(const TypedMsgHdr& message)
@@ -56,9 +70,20 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
     switch (message.type()) {
     case mtRegistration:
         debugs(54, 6, HERE << "Registration request");
-        handleRegistrationRequest(StrandCoord(message));
+        handleRegistrationRequest(HereIamMessage(message));
         break;
 
+    case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
+        IpcIoRequest io(message);
+        StrandSearchRequest sr;
+        sr.requestorId = io.requestorId;
+        sr.requestId = io.requestId;
+        sr.tag.limitInit(io.buf, io.len);
+        debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
+        handleSearchRequest(sr);
+        break;
+       }
+
     case mtSharedListenRequest:
         debugs(54, 6, HERE << "Shared listen request");
         handleSharedListenRequest(SharedListenRequest(message));
@@ -80,14 +105,14 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
     }
 }
 
-void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
+void Ipc::Coordinator::handleRegistrationRequest(const HereIamMessage& msg)
 {
-    registerStrand(strand);
+    registerStrand(msg.strand);
 
     // send back an acknowledgement; TODO: remove as not needed?
     TypedMsgHdr message;
-    strand.pack(message);
-    SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
+    msg.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, msg.strand.kidId), message);
 }
 
 void
@@ -133,6 +158,45 @@ Ipc::Coordinator::handleCacheMgrResponse(const Mgr::Response& response)
     Mgr::Inquirer::HandleRemoteAck(response);
 }
 
+void
+Ipc::Coordinator::handleSearchRequest(const Ipc::StrandSearchRequest &request)
+{
+    // do we know of a strand with the given search tag?
+    const StrandCoord *strand = NULL;
+    typedef StrandCoords::const_iterator SCCI;
+    for (SCCI i = strands_.begin(); !strand && i != strands_.end(); ++i) {
+        if (i->tag == request.tag)
+            strand = &(*i);
+    }
+
+    if (strand) {
+        notifySearcher(request, *strand);
+        return;
+       }
+
+    searchers.push_back(request);
+    debugs(54, 3, HERE << "cannot yet tell kid" << request.requestorId <<
+        " who " << request.tag << " is");
+}
+
+void
+Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
+                                 const StrandCoord& strand)
+{
+    debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
+        request.tag << " is kid" << strand.kidId);
+    const StrandSearchResponse response0(request.requestId, strand);
+    // XXX: we should use StrandSearchResponse instead of converting it
+    IpcIoResponse io;
+    io.diskId = strand.kidId;
+    io.requestId = request.requestId;
+    io.command = IpcIo::cmdOpen;
+    TypedMsgHdr message;
+    io.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+
 int
 Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
                                    int &errNo)
index 11856b1d3e2d0f7a0ca956668bab6d4c988e7682..f2f4da03a4ab4d2d19fcbbd3190f4ca8f8fdeaa2 100644 (file)
 #include "ipc/Port.h"
 #include "ipc/SharedListen.h"
 #include "ipc/StrandCoords.h"
+#include "ipc/StrandSearch.h"
 #include "mgr/forward.h"
 
+#include <list>
 #include <map>
 
 namespace Ipc
@@ -40,7 +42,12 @@ protected:
 
     StrandCoord* findStrand(int kidId); ///< registered strand or NULL
     void registerStrand(const StrandCoord &); ///< adds or updates existing
-    void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
+    void handleRegistrationRequest(const HereIamMessage &); ///< register,ACK
+
+    /// answer the waiting search request
+    void notifySearcher(const StrandSearchRequest &request, const StrandCoord&);
+    /// answers or queues the request if the answer is not yet known
+    void handleSearchRequest(const StrandSearchRequest &request);
 
     /// returns cached socket or calls openListenSocket()
     void handleSharedListenRequest(const SharedListenRequest& request);
@@ -53,6 +60,9 @@ protected:
 private:
     StrandCoords strands_; ///< registered processes and threads
 
+    typedef std::list<StrandSearchRequest> Searchers; ///< search requests
+    Searchers searchers; ///< yet unanswered search requests in arrival order
+
     typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
     Listeners listeners; ///< cached comm_open_listener() results
 
index e2d13504d9dc99754ad79ca8a6c9c9fea4a93635..522adc8d9e10af7c43c10866c96816020f02da99 100644 (file)
@@ -16,6 +16,8 @@ libipc_la_SOURCES = \
        StrandCoord.cc \
        StrandCoord.h \
        StrandCoords.h \
+       StrandSearch.cc \
+       StrandSearch.h \
        SharedListen.cc \
        SharedListen.h \
        TypedMsgHdr.cc \
index 57ae03632d05cc325c6048052a83f5f7fb8f0a25..e90e8bbd759332fa95d099846e85fd72c1e24559 100644 (file)
@@ -18,7 +18,9 @@ namespace Ipc
 
 /// message class identifier
 typedef enum { mtNone = 0, mtRegistration,
+               mtStrandSearchRequest, mtStrandSearchResponse,
                mtSharedListenRequest, mtSharedListenResponse,
+               mtIpcIoRequest, mtIpcIoResponse,
                mtCacheMgrRequest, mtCacheMgrResponse
              } MessageType;
 
index a1e9f59c5dc69b54b451de1fc031185c18a57cb7..9c7e926249bfeb2d479f4a422bdcebb8b2908874 100644 (file)
@@ -15,6 +15,8 @@
 #include "mgr/Request.h"
 #include "mgr/Response.h"
 #include "mgr/Forwarder.h"
+#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: scope boundary violation */
+#include "SwapDir.h" /* XXX: scope boundary violation */
 #include "CacheManager.h"
 
 
@@ -37,8 +39,21 @@ void Ipc::Strand::registerSelf()
 {
     debugs(54, 6, HERE);
     Must(!isRegistered);
+
+    HereIamMessage ann(StrandCoord(KidIdentifier, getpid()));
+
+    // announce that we are responsible for our cache_dir if needed
+    // XXX: misplaced
+    if (IamDiskProcess()) {
+        const int myDisk = KidIdentifier % Config.cacheSwap.n_processes;
+        if (const SwapDir *sd = dynamic_cast<const SwapDir*>(INDEXSD(myDisk))) {
+            ann.strand.tag = sd->path;
+            ann.strand.tag.append("/rock"); // XXX: scope boundary violation
+               }
+       }
+
     TypedMsgHdr message;
-    StrandCoord(KidIdentifier, getpid()).pack(message);
+    ann.pack(message);
     SendMessage(coordinatorAddr, message);
     setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
 }
@@ -49,13 +64,21 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
     switch (message.type()) {
 
     case mtRegistration:
-        handleRegistrationResponse(StrandCoord(message));
+        handleRegistrationResponse(HereIamMessage(message));
         break;
 
     case mtSharedListenResponse:
         SharedListenJoined(SharedListenResponse(message));
         break;
 
+    case mtIpcIoRequest:
+        IpcIoFile::HandleRequest(IpcIoRequest(message));
+        break;
+
+    case mtIpcIoResponse:
+        IpcIoFile::HandleResponse(message);
+        break;
+
     case mtCacheMgrRequest:
         handleCacheMgrRequest(Mgr::Request(message));
         break;
@@ -70,10 +93,10 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
     }
 }
 
-void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
+void Ipc::Strand::handleRegistrationResponse(const HereIamMessage &msg)
 {
     // handle registration response from the coordinator; it could be stale
-    if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
+    if (msg.strand.kidId == KidIdentifier && msg.strand.pid == getpid()) {
         debugs(54, 6, "kid" << KidIdentifier << " registered");
         clearTimeout(); // we are done
     } else {
index d01cf2b7f29d7aab08a0c48f615637765e0dd179..c6f84ba9fcec7411d754c74155ce55dc7ed2f273 100644 (file)
@@ -8,6 +8,7 @@
 #ifndef SQUID_IPC_STRAND_H
 #define SQUID_IPC_STRAND_H
 
+#include "ipc/forward.h"
 #include "ipc/Port.h"
 #include "mgr/forward.h"
 
@@ -31,7 +32,7 @@ protected:
 
 private:
     void registerSelf(); /// let Coordinator know this strand exists
-    void handleRegistrationResponse(const StrandCoord &strand);
+    void handleRegistrationResponse(const HereIamMessage &msg);
     void handleCacheMgrRequest(const Mgr::Request& request);
     void handleCacheMgrResponse(const Mgr::Response& response);
 
index 3fdf45a9fc7c9c203cd06d44ebb2ff70c19324cc..703a7aafd24a4b009bbe94cf58b7702e8a5e9a43 100644 (file)
@@ -7,6 +7,7 @@
 
 
 #include "config.h"
+#include "Debug.h"
 #include "ipc/Messages.h"
 #include "ipc/StrandCoord.h"
 #include "ipc/TypedMsgHdr.h"
@@ -20,14 +21,37 @@ Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid)
 {
 }
 
-Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
+void
+Ipc::StrandCoord::unpack(const TypedMsgHdr &hdrMsg)
 {
-    hdrMsg.checkType(mtRegistration);
-    hdrMsg.getPod(*this);
+    hdrMsg.getPod(kidId);
+    hdrMsg.getPod(pid);
+    hdrMsg.getString(tag);
+debugs(0,0, HERE << "getting tag: " << tag);
 }
 
 void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
+{
+    hdrMsg.putPod(kidId);
+    hdrMsg.putPod(pid);
+debugs(0,0, HERE << "putting tag: " << tag);
+    hdrMsg.putString(tag);
+}
+
+
+Ipc::HereIamMessage::HereIamMessage(const StrandCoord &aStrand):
+    strand(aStrand)
+{
+}
+
+Ipc::HereIamMessage::HereIamMessage(const TypedMsgHdr &hdrMsg)
+{
+    hdrMsg.checkType(mtRegistration);
+    strand.unpack(hdrMsg);
+}
+
+void Ipc::HereIamMessage::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtRegistration);
-    hdrMsg.putPod(*this);
+    strand.pack(hdrMsg);
 }
index 02f2056de88ae47f97db42be88a57c54aa94b3e5..0970f0cfcfd41b3b030b67a5394b6c826cca3932 100644 (file)
@@ -7,6 +7,7 @@
 #define SQUID_IPC_STRAND_COORD_H
 
 #include "ipc/forward.h"
+#include "SquidString.h"
 #include <sys/types.h>
 
 namespace Ipc
@@ -17,15 +18,32 @@ class StrandCoord
 {
 public:
     StrandCoord(); ///< unknown location
-    StrandCoord(int akidId, pid_t aPid); ///< from registrant
-    explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    StrandCoord(int akidId, pid_t aPid);
+
     void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+    void unpack(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
 
 public:
     int kidId; ///< internal Squid process number
     pid_t pid; ///< OS process or thread identifier
+
+    String tag; ///< optional unique well-known key (e.g., cache_dir path)
 };
 
+/// strand registration with Coordinator (also used as an ACK)
+class HereIamMessage
+{
+public:
+    explicit HereIamMessage(const StrandCoord &strand); ///< from registrant
+    explicit HereIamMessage(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+    StrandCoord strand; ///< registrant coordinates and related details
+};
+
+
+
 } // namespace Ipc;
 
 #endif /* SQUID_IPC_STRAND_COORD_H */
index 95afe00dd67201606bae090f2e26e68ac79c77fb..e94d14f5f7625bb1b8f671cbd8a0511fb4ce6370 100644 (file)
@@ -27,7 +27,7 @@ namespace Ipc
 class TypedMsgHdr: public msghdr
 {
 public:
-    enum {maxSize = 4096};
+    enum {maxSize = 36*1024}; // XXX: so that disker I/O can fit
 
 public:
     TypedMsgHdr();
index 7325abb6bc68c91982783669ce19fa581ebb5464..5a159b71e3b9a3f7bc2c5915f4e1d83009520b9c 100644 (file)
@@ -13,6 +13,7 @@ namespace Ipc
 
 class TypedMsgHdr;
 class StrandCoord;
+class HereIamMessage;
 
 } // namespace Ipc
 
index 2f70ffe98b4b27b88b6e156e6ffbd53294065d7d..dd034b4fc717d5532a557c606d35b254250e53ce 100644 (file)
@@ -497,6 +497,8 @@ struct SquidConfig {
         RefCount<class Store> *swapDirs;
         int n_allocated;
         int n_configured;
+        ///< number of disk processes (set even when n_configured is not)
+        int n_processes;
     } cacheSwap;
     /*
      * I'm sick of having to keep doing this ..
index 0cfe32a1ed6f57edaa104330e92a7a26eb06dc9f..5d9a1218e24b4d665419304e78e330b5e4009957 100644 (file)
@@ -855,7 +855,7 @@ IamPrimaryProcess()
 
     // when there is a master and worker process, the master delegates
     // primary functions to its only kid
-    if (Config.workers == 1)
+    if (NumberOfKids() == 1)
         return IamWorkerProcess();
 
     // in SMP mode, multiple kids delegate primary functions to the coordinator
@@ -872,7 +872,7 @@ NumberOfKids()
     // XXX: detect and abort when called before workers/cache_dirs are parsed
 
     // XXX: this is not always the case as there are other cache_dir types
-    const int rockDirs = Config.cacheSwap.n_configured;
+    const int rockDirs = Config.cacheSwap.n_processes;
 
     const bool needCoord = Config.workers > 1 || rockDirs > 0;
     return (needCoord ? 1 : 0) + Config.workers + rockDirs;