]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IpcIoFile atomic queue v1.
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 16 Feb 2011 16:19:26 +0000 (19:19 +0300)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Wed, 16 Feb 2011 16:19:26 +0000 (19:19 +0300)
13 files changed:
src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/fs/rock/RockDirMap.cc
src/ipc/Coordinator.cc
src/ipc/Makefile.am
src/ipc/Messages.h
src/ipc/Queue.h [new file with mode: 0644]
src/ipc/SharedMemory.cc
src/ipc/SharedMemory.h
src/ipc/Strand.cc
src/ipc/StrandSearch.cc
src/ipc/StrandSearch.h
src/ipc/forward.h

index 0f7cd69f78740f84552289568ca5b1d0a683c17c..f7349e0c6ff4679779c963b326dbd6b552f1d776 100644 (file)
 #include "DiskIO/WriteRequest.h"
 #include "ipc/Messages.h"
 #include "ipc/Port.h"
-#include "ipc/StrandCoord.h"
+#include "ipc/StrandSearch.h"
 #include "ipc/UdsOp.h"
 
 CBDATA_CLASS_INIT(IpcIoFile);
 
-IpcIoFile::RequestMap IpcIoFile::TheRequestMap1;
-IpcIoFile::RequestMap IpcIoFile::TheRequestMap2;
-IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1;
-IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2;
-bool IpcIoFile::TimeoutCheckScheduled = false;
-unsigned int IpcIoFile::LastRequestId = 0;
+IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
+IpcIoFile::IpcIoFiles IpcIoFile::ipcIoFiles;
 
 static bool DiskerOpen(const String &path, int flags, mode_t mode);
 static void DiskerClose(const String &path);
@@ -31,8 +27,12 @@ static void DiskerClose(const String &path);
 IpcIoFile::IpcIoFile(char const *aDb):
     dbName(aDb),
     diskId(-1),
-    ioLevel(0),
-    error_(false)
+    workerQueue(NULL),
+    error_(false),
+    lastRequestId(0),
+    olderRequests(&requestMap1),
+    newerRequests(&requestMap2),
+    timeoutCheckScheduled(false)
 {
 }
 
@@ -43,14 +43,20 @@ IpcIoFile::~IpcIoFile()
 void
 IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 {
+    debugs(0,0,HERE);
     ioRequestor = callback;
     Must(diskId < 0); // we do not know our disker yet
+    Must(!diskerQueue && !workerQueue);
 
     if (IamDiskProcess()) {
         error_ = !DiskerOpen(dbName, flags, mode);
         if (error_)
             return;
 
+        // XXX: make capacity configurable
+        diskerQueue = new DiskerQueue(dbName.termedBuf(), Config.workers, 1024);
+        ipcIoFiles.insert(std::make_pair(KidIdentifier, this));
+
         Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
         ann.strand.tag = dbName;
         Ipc::TypedMsgHdr message;
@@ -59,36 +65,39 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
         ioRequestor->ioCompletedNotification();
         return;
-       }        
+    }
 
-    // XXX: use StrandSearchRequest instead
-    IpcIoRequest ipcIo;
-    ipcIo.requestorId = KidIdentifier;
-    ipcIo.command = IpcIo::cmdOpen;
-    ipcIo.len = dbName.size();
-    assert(ipcIo.len <= sizeof(ipcIo.buf));
-    memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len);    
+    Ipc::StrandSearchRequest request;
+    request.requestorId = KidIdentifier;
+    request.tag = dbName;
 
-    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
-    send(ipcIo, pending);
+    Ipc::TypedMsgHdr msg;
+    request.pack(msg);
+    Ipc::SendMessage(Ipc::coordinatorAddr, msg);
+
+    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
+    trackPendingRequest(*pending);
 }
 
 void
-IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) {
-    if (!ipcResponse) {
+IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
+    debugs(0,0,HERE);
+    Must(diskId < 0); // we do not know our disker yet
+    Must(!workerQueue);
+
+    if (!response) {
         debugs(79,1, HERE << "error: timeout");
         error_ = true;
-       } else
-    if (ipcResponse->xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
-        error_ = true;
-       } else {
-        diskId = ipcResponse->diskId;
-        if (diskId < 0) {
+    } else {
+        diskId = response->strand.kidId;
+        if (diskId >= 0) {
+            workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier);
+            ipcIoFiles.insert(std::make_pair(diskId, this));
+        } else {
             error_ = true;
             debugs(79,1, HERE << "error: no disker claimed " << dbName);
-               }
-       }
+        }
+    }
 
     ioRequestor->ioCompletedNotification();
 }
@@ -100,6 +109,7 @@ IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) {
 void
 IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
 {
+    debugs(0,0,HERE);
     assert(false); // check
     /* We use the same logic path for open */
     open(flags, mode, callback);
@@ -108,11 +118,15 @@ IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
 void
 IpcIoFile::close()
 {
+    debugs(0,0,HERE);
     assert(ioRequestor != NULL);
 
+    delete diskerQueue;
+    delete workerQueue;
+
     if (IamDiskProcess())
         DiskerClose(dbName);
-    // XXX: else nothing to do?  
+    // XXX: else nothing to do?
 
     ioRequestor->closeCompleted();
 }
@@ -149,31 +163,26 @@ IpcIoFile::read(ReadRequest *readRequest)
     //assert(minOffset < 0 || minOffset <= readRequest->offset);
     //assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
 
-    IpcIoRequest ipcIo;
-    ipcIo.requestorId = KidIdentifier;
-    ipcIo.command = IpcIo::cmdRead;
-    ipcIo.offset = readRequest->offset;
-    ipcIo.len = readRequest->len;
-
-    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
     pending->readRequest = readRequest;
-    send(ipcIo, pending);
+    push(*pending);
 }
 
 void
 IpcIoFile::readCompleted(ReadRequest *readRequest,
-                         const IpcIoResponse *ipcResponse)
+                         const IpcIoMsg *const response)
 {
+    debugs(0,0,HERE);
     bool ioError = false;
-    if (!ipcResponse) {
+    if (!response) {
         debugs(79,1, HERE << "error: timeout");
         ioError = true; // I/O timeout does not warrant setting error_?
-       } else
-    if (ipcResponse->xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
+    } else
+    if (response->xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
         ioError = error_ = true;
-       } else {
-        memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len);
+    } else {
+        memcpy(readRequest->buf, response->buf, response->len);
     }
 
     const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
@@ -196,34 +205,27 @@ IpcIoFile::write(WriteRequest *writeRequest)
     //assert(minOffset < 0 || minOffset <= writeRequest->offset);
     //assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
 
-    IpcIoRequest ipcIo;
-    ipcIo.requestorId = KidIdentifier;
-    ipcIo.command = IpcIo::cmdWrite;
-    ipcIo.offset = writeRequest->offset;
-    ipcIo.len = writeRequest->len;
-    assert(ipcIo.len <= sizeof(ipcIo.buf));
-    memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away
-
-    IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
     pending->writeRequest = writeRequest;
-    send(ipcIo, pending);
+    push(*pending);
 }
 
 void
 IpcIoFile::writeCompleted(WriteRequest *writeRequest,
-                          const IpcIoResponse *ipcResponse)
+                          const IpcIoMsg *const response)
 {
+    debugs(0,0,HERE);
     bool ioError = false;
-    if (!ipcResponse) {
+    if (!response) {
         debugs(79,1, HERE << "error: timeout");
         ioError = true; // I/O timeout does not warrant setting error_?
-       } else
-    if (ipcResponse->xerrno) {
-        debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
-        error_ = true;
     } else
-    if (ipcResponse->len != writeRequest->len) {
-        debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len);
+    if (response->xerrno) {
+        debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
+        ioError = error_ = true;
+    } else
+    if (response->len != writeRequest->len) {
+        debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len);
         error_ = true;
     }
 
@@ -243,111 +245,176 @@ IpcIoFile::writeCompleted(WriteRequest *writeRequest,
 bool
 IpcIoFile::ioInProgress() const
 {
-    return ioLevel > 0; // XXX: todo
+    return !olderRequests->empty() || !newerRequests->empty();
 }
 
-/// sends an I/O request to disker
+/// track a new pending request
 void
-IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
+IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
 {
-    if (++LastRequestId == 0) // don't use zero value as requestId
-        ++LastRequestId;
-    ipcIo.requestId = LastRequestId;
-    TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending));
-
-    Ipc::TypedMsgHdr message;
-    ipcIo.pack(message);
+    debugs(0,0,HERE);
+    newerRequests->insert(std::make_pair(pending.id, &pending));
+    if (!timeoutCheckScheduled)
+        scheduleTimeoutCheck();
+}
 
-    Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen);
-    const String addr = diskId >= 0 ?
-        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) :
-        Ipc::coordinatorAddr;
+/// push an I/O request to disker
+void
+IpcIoFile::push(IpcIoPendingRequest &pending)
+{
+    debugs(0,0,HERE);
+    Must(diskId >= 0);
+    Must(workerQueue);
+    Must(pending.readRequest || pending.writeRequest);
+
+    IpcIoMsg ipcIo;
+    if (pending.readRequest) {
+        ipcIo.command = IpcIo::cmdRead;
+        ipcIo.offset = pending.readRequest->offset;
+        ipcIo.len = pending.readRequest->len;
+        assert(ipcIo.len <= sizeof(ipcIo.buf));
+        memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away
+    } else { // pending.writeRequest
+        ipcIo.command = IpcIo::cmdWrite;
+        ipcIo.offset = pending.writeRequest->offset;
+        ipcIo.len = pending.writeRequest->len;
+        assert(ipcIo.len <= sizeof(ipcIo.buf));
+        memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away
+    }
 
-    debugs(47, 7, HERE << "asking disker" << diskId << " to " <<
-        ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId);
+    if (!workerQueue->push(ipcIo)) {
+        pending.completeIo(NULL);
+        return;
+    }
 
-    Ipc::SendMessage(addr, message);
-    ++ioLevel;
+    if (workerQueue->pushedSize() == 1)
+        Notify(diskId); // notify disker
 
-    if (!TimeoutCheckScheduled)
-        ScheduleTimeoutCheck();
+    trackPendingRequest(pending);
 }
 
-/// called when disker responds to our I/O request
+/// called when coordinator responds to worker open request
 void
-IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw)
+IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
 {
-    IpcIoResponse response(raw);
-
-    const int requestId = response.requestId;
-    debugs(47, 7, HERE << "disker response to " << response.command <<
-        "; ipcIo" << KidIdentifier << '.' << requestId);
+    debugs(47, 7, HERE << "coordinator response to open request");
+    Must(response.data);
+    reinterpret_cast<IpcIoFile*>(response.data)->openCompleted(&response);
+}
 
-    Must(requestId != 0);
+void
+IpcIoFile::handleResponses()
+{
+    Must(workerQueue);
+    IpcIoMsg ipcIo;
+    while (workerQueue->pop(ipcIo))
+        handleResponse(ipcIo);
+}
 
-    if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) {
-        pending->completeIo(&response);
+void
+IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
+{
+    const int requestId = ipcIo.requestId;
+    debugs(47, 7, HERE << "disker response to " << ipcIo.command <<
+           "; ipcIo" << KidIdentifier << '.' << requestId);
+    Must(requestId);
+    if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
+        pending->completeIo(&ipcIo);
         delete pending; // XXX: leaking if throwing
     } else {
-        debugs(47, 4, HERE << "LATE disker response to " << response.command <<
-            "; ipcIo" << KidIdentifier << '.' << requestId);
+        debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
+               "; ipcIo" << KidIdentifier << '.' << requestId);
         // nothing we can do about it; completeIo() has been called already
     }
 }
 
-/// Mgr::IpcIoFile::checkTimeous wrapper
 void
-IpcIoFile::CheckTimeouts(void*)
+IpcIoFile::Notify(const int peerId)
 {
-    TimeoutCheckScheduled = false;
+    debugs(0,0,HERE);
+    Ipc::TypedMsgHdr msg;
+    msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
+    msg.putInt(KidIdentifier);
+    const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
+    Ipc::SendMessage(addr, msg);
+}
+
+void
+IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
+{
+    debugs(0,0,HERE);
+    if (IamDiskProcess())
+        DiskerHandleRequests();
+    else {
+        const int diskId = msg.getInt();
+        const IpcIoFiles::const_iterator i = ipcIoFiles.find(diskId);
+        Must(i != ipcIoFiles.end()); // XXX: warn and continue?
+        i->second->handleResponses();
+    }
+}
+
+/// IpcIoFile::checkTimeouts wrapper
+void
+IpcIoFile::CheckTimeouts(void *const param)
+{
+    debugs(0,0,HERE);
+    Must(param);
+    reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
+}
+
+void
+IpcIoFile::checkTimeouts()
+{
+    debugs(0,0,HERE);
+    timeoutCheckScheduled = false;
 
     // any old request would have timed out by now
     typedef RequestMap::const_iterator RMCI;
-    RequestMap &elders = *TheOlderRequests;
-    for (RMCI i = elders.begin(); i != elders.end(); ++i) {
-        IpcIoPendingRequest *pending = i->second;
+    for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
+        IpcIoPendingRequest *const pending = i->second;
 
         const int requestId = i->first;
         debugs(47, 7, HERE << "disker timeout; ipcIo" <<
-            KidIdentifier << '.' << requestId);
+               KidIdentifier << '.' << diskId << '.' << requestId);
 
         pending->completeIo(NULL); // no response
         delete pending; // XXX: leaking if throwing
-       }
-    elders.clear();
+    }
+    olderRequests->clear();
 
-    swap(TheOlderRequests, TheNewerRequests); // switches pointers around
-    if (!TheOlderRequests->empty())
-        ScheduleTimeoutCheck();
+    swap(olderRequests, newerRequests); // switches pointers around
+    if (!olderRequests->empty())
+        scheduleTimeoutCheck();
 }
 
 /// prepare to check for timeouts in a little while
 void
-IpcIoFile::ScheduleTimeoutCheck()
+IpcIoFile::scheduleTimeoutCheck()
 {
+    debugs(0,0,HERE);
     // we check all older requests at once so some may be wait for 2*timeout
     const double timeout = 7; // in seconds
     eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
-             NULL, timeout, 0, false);
-    TimeoutCheckScheduled = true;
+             this, timeout, 0, false);
+    timeoutCheckScheduled = true;
 }
 
 /// returns and forgets the right IpcIoFile pending request
 IpcIoPendingRequest *
-IpcIoFile::DequeueRequest(unsigned int requestId)
+IpcIoFile::dequeueRequest(const unsigned int requestId)
 {
     debugs(47, 3, HERE);
     Must(requestId != 0);
 
-       RequestMap *map = NULL;
-    RequestMap::iterator i = TheRequestMap1.find(requestId);
+    RequestMap *map = NULL;
+    RequestMap::iterator i = requestMap1.find(requestId);
 
-    if (i != TheRequestMap1.end())
-        map = &TheRequestMap1;
+    if (i != requestMap1.end())
+        map = &requestMap1;
     else {
-        i = TheRequestMap2.find(requestId);
-        if (i != TheRequestMap2.end())
-            map = &TheRequestMap2;
+        i = requestMap2.find(requestId);
+        if (i != requestMap2.end())
+            map = &requestMap2;
     }
 
     if (!map) // not found in both maps
@@ -359,100 +426,35 @@ IpcIoFile::DequeueRequest(unsigned int requestId)
 }
 
 int
-IpcIoFile::getFD() const 
+IpcIoFile::getFD() const
 {
     assert(false); // not supported; TODO: remove this method from API
     return -1;
 }
 
 
-/* IpcIoRequest */
-
-IpcIoRequest::IpcIoRequest():
-    requestorId(0), requestId(0),
-    offset(0), len(0),
-    command(IpcIo::cmdNone)
-{
-}
-
-IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg)
-{
-    msg.checkType(Ipc::mtIpcIoRequest);
-    msg.getPod(requestorId);
-    msg.getPod(requestId);
-   
-    msg.getPod(offset);
-    msg.getPod(len);
-    msg.getPod(command);
-
-    if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
-        msg.getFixed(buf, len);
-}
-
-void
-IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const
-{
-    msg.setType(Ipc::mtIpcIoRequest);
-    msg.putPod(requestorId);
-    msg.putPod(requestId);
-
-    msg.putPod(offset);
-    msg.putPod(len);
-    msg.putPod(command);
-
-    if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
-        msg.putFixed(buf, len);
-}
-
-
-/* IpcIoResponse */
-
-IpcIoResponse::IpcIoResponse():
-    diskId(-1),
-    requestId(0),
-    len(0),
-    xerrno(0)
-{
-}
-
-IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg)
-{
-    msg.checkType(Ipc::mtIpcIoResponse);
-    msg.getPod(diskId);
-    msg.getPod(requestId);
-    msg.getPod(len);
-    msg.getPod(command);
-    msg.getPod(xerrno);
-
-    if (command == IpcIo::cmdRead && !xerrno)
-        msg.getFixed(buf, len);
-}
+/* IpcIoMsg */
 
-void
-IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const
+IpcIoMsg::IpcIoMsg():
+    requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
 {
-    msg.setType(Ipc::mtIpcIoResponse);
-    msg.putPod(diskId);
-    msg.putPod(requestId);
-    msg.putPod(len);
-    msg.putPod(command);
-    msg.putPod(xerrno);
-
-    if (command == IpcIo::cmdRead && !xerrno)
-        msg.putFixed(buf, len);
 }
 
-
 /* IpcIoPendingRequest */
 
 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
     file(aFile), readRequest(NULL), writeRequest(NULL)
 {
+    debugs(0,0,HERE);
+    if (++file->lastRequestId == 0) // don't use zero value as requestId
+        ++file->lastRequestId;
+    id = file->lastRequestId;
 }
 
 void
-IpcIoPendingRequest::completeIo(IpcIoResponse *response)
+IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
 {
+    debugs(0,0,HERE);
     Must(file != NULL);
 
     if (readRequest)
@@ -460,8 +462,10 @@ IpcIoPendingRequest::completeIo(IpcIoResponse *response)
     else
     if (writeRequest)
         file->writeCompleted(writeRequest, response);
-    else
-        file->openCompleted(response);
+    else {
+        Must(!response); // only timeouts are handled here
+        file->openCompleted(NULL);
+    }
 }
 
 
@@ -470,94 +474,89 @@ IpcIoPendingRequest::completeIo(IpcIoResponse *response)
 
 static int TheFile = -1; ///< db file descriptor
 
-static
-void diskerRead(const IpcIoRequest &request)
+static void
+diskerRead(IpcIoMsg &ipcIo)
 {
-    debugs(47,5, HERE << "disker" << KidIdentifier << " reads " <<
-        request.len << " at " << request.offset <<
-        " ipcIo" << request.requestorId << '.' << request.requestId);
-
-    IpcIoResponse response;
-    response.diskId = KidIdentifier;
-    response.requestId = request.requestId;
-    response.command = request.command;
-
-    const ssize_t read = pread(TheFile, response.buf, request.len, request.offset);
+    debugs(0,0,HERE);
+    const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
     if (read >= 0) {
-        response.xerrno = 0;
-        response.len = static_cast<size_t>(read); // safe because read > 0
+        ipcIo.xerrno = 0;
+        const size_t len = static_cast<size_t>(read); // safe because read > 0
         debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
-            (response.len == request.len ? "all " : "just ") << read);
-       } else {
-        response.xerrno = errno;
-        response.len = 0;
+            (len == ipcIo.len ? "all " : "just ") << read);
+        ipcIo.len = len;
+    } else {
+        ipcIo.xerrno = errno;
+        ipcIo.len = 0;
         debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
-            response.xerrno);
-       }
-    
-    Ipc::TypedMsgHdr message;
-    response.pack(message);
-    const String addr =
-        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
-    Ipc::SendMessage(addr, message);
+            ipcIo.xerrno);
+    }
 }
 
-static
-void diskerWrite(const IpcIoRequest &request)
+static void
+diskerWrite(IpcIoMsg &ipcIo)
 {
-    debugs(47,5, HERE << "disker" << KidIdentifier << " writes " <<
-        request.len << " at " << request.offset <<
-        " ipcIo" << request.requestorId << '.' << request.requestId);
-
-    IpcIoResponse response;
-    response.diskId = KidIdentifier;
-    response.requestId = request.requestId;
-    response.command = request.command;
-
-    const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset);
+    debugs(0,0,HERE);
+    const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
     if (wrote >= 0) {
-        response.xerrno = 0;
-        response.len = static_cast<size_t>(wrote); // safe because wrote > 0
+        ipcIo.xerrno = 0;
+        const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
         debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
-            (response.len == request.len ? "all " : "just ") << wrote);
-       } else {
-        response.xerrno = errno;
-        response.len = 0;
+            (len == ipcIo.len ? "all " : "just ") << wrote);
+        ipcIo.len = len;
+    } else {
+        ipcIo.xerrno = errno;
+        ipcIo.len = 0;
         debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
-            response.xerrno);
-       }
-    
-    Ipc::TypedMsgHdr message;
-    response.pack(message);
-    const String addr =
-        Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
-    Ipc::SendMessage(addr, message);
+               ipcIo.xerrno);
+    }
 }
 
-/// called when disker receives an I/O request
 void
-IpcIoFile::HandleRequest(const IpcIoRequest &request)
+IpcIoFile::DiskerHandleRequests()
 {
-    switch (request.command) {
-    case IpcIo::cmdRead:
-        diskerRead(request);
-        break;
+    debugs(0,0,HERE);
+    Must(diskerQueue);
+    int workerId;
+    IpcIoMsg ipcIo;
+    while (diskerQueue->pop(workerId, ipcIo))
+        DiskerHandleRequest(workerId, ipcIo);
+}
 
-    case IpcIo::cmdWrite:
-        diskerWrite(request);
-        break;
+/// called when disker receives an I/O request
+void
+IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
+{
+    debugs(0,0,HERE);
+    Must(diskerQueue);
 
-    default:
+    if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
         debugs(0,0, HERE << "disker" << KidIdentifier <<
-               " should not receive " << request.command <<
-               " ipcIo" << request.requestorId << '.' << request.requestId);
-        break;
-       }
+               " should not receive " << ipcIo.command <<
+               " ipcIo" << workerId << '.' << ipcIo.requestId);
+        return;
+    }
+
+    debugs(47,5, HERE << "disker" << KidIdentifier <<
+           (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
+           ipcIo.len << " at " << ipcIo.offset <<
+           " ipcIo" << workerId << '.' << ipcIo.requestId);
+
+    if (ipcIo.command == IpcIo::cmdRead)
+        diskerRead(ipcIo);
+    else // ipcIo.command == IpcIo::cmdWrite
+        diskerWrite(ipcIo);
+
+    diskerQueue->push(workerId, ipcIo); // XXX: report warning
+
+    if (diskerQueue->pushedSize(workerId) == 1)
+        Notify(workerId); // notify worker
 }
 
 static bool
 DiskerOpen(const String &path, int flags, mode_t mode)
 {
+    debugs(0,0,HERE);
     assert(TheFile < 0);
 
     TheFile = file_open(path.termedBuf(), flags);
@@ -567,16 +566,17 @@ DiskerOpen(const String &path, int flags, mode_t mode)
         debugs(47,0, HERE << "rock db error opening " << path << ": " <<
                xstrerr(xerrno));
         return false;
-       }
+    }
 
     store_open_disk_fd++;
     debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
-       return true;
+    return true;
 }
 
 static void
 DiskerClose(const String &path)
 {
+    debugs(0,0,HERE);
     if (TheFile >= 0) {
         file_close(TheFile);
         debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
index 10181eae1a00484d2a56c3b1b959d8c668ac4f1c..99573a22ba26d3a5533f8c3bda179d474a8e2e1e 100644 (file)
@@ -6,6 +6,7 @@
 #include "DiskIO/DiskFile.h"
 #include "DiskIO/IORequestor.h"
 #include "ipc/forward.h"
+#include "ipc/Queue.h"
 #include <map>
 
 // TODO: expand to all classes
@@ -19,43 +20,19 @@ enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize
 } // namespace IpcIo
 
 
-/// converts DiskIO requests to IPC messages
-// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
-class IpcIoRequest {
+/// converts DiskIO requests to IPC queue messages
+class IpcIoMsg {
 public:
-    IpcIoRequest();
-
-    explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    IpcIoMsg();
 
 public:
-    int requestorId; ///< kidId of the requestor; used for response destination
-    unsigned int requestId; ///< unique for sender; matches request w/ response
+    unsigned int requestId; ///< unique for requestor; matches request w/ response
 
-    /* ReadRequest and WriteRequest parameters to pass to disker */
     char buf[IpcIo::BufCapacity]; // XXX: inefficient
     off_t offset;
     size_t len;
 
-    IpcIo::Command command; ///< what disker is supposed to do
-};
-
-/// disker response to IpcIoRequest
-class IpcIoResponse {
-public:
-    IpcIoResponse();
-
-    explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
-
-public:
-    int diskId; ///< kidId of the responding disker
-    unsigned int requestId; ///< unique for sender; matches request w/ response
-
-    char buf[IpcIo::BufCapacity]; // XXX: inefficient
-    size_t len;
-
-    IpcIo::Command command; ///< what disker did
+    IpcIo::Command command; ///< what disker is supposed to do or did
 
     int xerrno; ///< I/O error code or zero
 };
@@ -83,44 +60,59 @@ public:
     virtual bool canWrite() const;
     virtual bool ioInProgress() const;
 
-    /// finds and calls the right IpcIoFile upon disker's response
-    static void HandleResponse(const Ipc::TypedMsgHdr &response);
+    /// handle open response from coordinator
+    static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
 
-    /// disker entry point for remote I/O requests
-    static void HandleRequest(const IpcIoRequest &request);
+    /// handle queue push notifications from worker or disker
+    static void HandleNotification(const Ipc::TypedMsgHdr &msg);
 
 protected:
     friend class IpcIoPendingRequest;
-    void openCompleted(const IpcIoResponse *response);
-    void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
-    void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+    void openCompleted(const Ipc::StrandSearchResponse *const response);
+    void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response);
+    void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
 
 private:
-    void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+    void trackPendingRequest(IpcIoPendingRequest &pending);
+    void push(IpcIoPendingRequest &pending);
+    IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
+
+    static void Notify(const int peerId);
 
-    static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+    static void CheckTimeouts(void *const param);
+    void checkTimeouts();
+    void scheduleTimeoutCheck();
 
-    static void CheckTimeouts(void* param);
-    static void ScheduleTimeoutCheck();
+    void handleResponses();
+    void handleResponse(const IpcIoMsg &ipcIo);
+
+    static void DiskerHandleRequests();
+    static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
 
 private:
+    typedef FewToOneBiQueue<IpcIoMsg> DiskerQueue;
+    typedef OneToOneBiQueue<IpcIoMsg> WorkerQueue;
+
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
+    static DiskerQueue *diskerQueue; ///< IPC queue for disker
+    WorkerQueue *workerQueue; ///< IPC queue for worker
     RefCount<IORequestor> ioRequestor;
 
-    int ioLevel; ///< number of pending I/O requests using this file
-
     bool error_; ///< whether we have seen at least one I/O error (XXX)
 
+    unsigned int lastRequestId; ///< last requestId used
+
     /// maps requestId to the handleResponse callback
     typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
-    static RequestMap TheRequestMap1; ///< older (or newer) pending requests
-    static RequestMap TheRequestMap2; ///< newer (or older) pending requests
-    static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
-    static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
-    static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
+    RequestMap requestMap1; ///< older (or newer) pending requests
+    RequestMap requestMap2; ///< newer (or older) pending requests
+    RequestMap *olderRequests; ///< older requests (map1 or map2)
+    RequestMap *newerRequests; ///< newer requests (map2 or map1)
+    bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
 
-    static unsigned int LastRequestId; ///< last requestId used
+    typedef std::map<int, IpcIoFile*> IpcIoFiles; ///< maps diskerId to IpcIoFile
+    static IpcIoFiles ipcIoFiles;
 
     CBDATA_CLASS2(IpcIoFile);
 };
@@ -133,10 +125,11 @@ public:
     IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
 
     /// called when response is received and, with a nil response, on timeouts
-    void completeIo(IpcIoResponse *response);
+    void completeIo(const IpcIoMsg *const response);
 
 public:
     IpcIoFile::Pointer file; ///< the file object waiting for the response
+    unsigned int id; ///< request id
     ReadRequest *readRequest; ///< set if this is a read requests
     WriteRequest *writeRequest; ///< set if this is a write request
 
index 6424265eee845917a9d6a100be284c949a37eeb7..d2b124ffa147585e3a7b7049c9839439a622fb06 100644 (file)
@@ -9,10 +9,8 @@
 #include "Store.h"
 #include "fs/rock/RockDirMap.h"
 
-static const char SharedMemoryName[] = "RockDirMap";
-
 Rock::DirMap::DirMap(const char *const aPath, const int limit):
-    path(aPath), shm(sharedMemoryName())
+    path(aPath), shm(aPath)
 {
     shm.create(SharedSize(limit));
     assert(shm.mem());
@@ -23,7 +21,7 @@ Rock::DirMap::DirMap(const char *const aPath, const int limit):
 }
 
 Rock::DirMap::DirMap(const char *const aPath):
-    path(aPath), shm(sharedMemoryName())
+    path(aPath), shm(aPath)
 {
     shm.open();
     assert(shm.mem());
@@ -271,22 +269,6 @@ Rock::DirMap::freeLocked(Slot &s)
            " in map [" << path << ']');
 }
 
-String
-Rock::DirMap::sharedMemoryName()
-{
-    String result;
-    const char *begin = path.termedBuf();
-    for (const char *end = strchr(begin, '/'); end; end = strchr(begin, '/')) {
-        if (begin != end) {
-            result.append(begin, end - begin);
-            result.append('.');
-        }
-        begin = end + 1;
-    }
-    result.append(begin);
-    return result;
-}
-
 int
 Rock::DirMap::SharedSize(const int limit)
 {
index afdec48cadd9c0a51508298d4d5140a2b85f5aa5..4c6a40638d6c8f5fdb2187a6ef7e07b22c2dfb35 100644 (file)
@@ -17,7 +17,6 @@
 #include "mgr/Request.h"
 #include "mgr/Response.h"
 #include "mgr/StoreToCommWriter.h"
-#include "DiskIO/IpcIo/IpcIoFile.h" /* XXX: layering violation */
 
 
 CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
@@ -77,16 +76,13 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
         handleRegistrationRequest(HereIamMessage(message));
         break;
 
-    case mtIpcIoRequest: { // XXX: this should have been mtStrandSearchRequest
-        IpcIoRequest io(message);
-        StrandSearchRequest sr;
-        sr.requestorId = io.requestorId;
-        sr.requestId = io.requestId;
-        sr.tag.limitInit(io.buf, io.len);
-        debugs(54, 6, HERE << "Strand search request: " << io.requestorId << ' ' << io.requestId << ' ' << io.len << " cmd=" << io.command << " tag: " << sr.tag);
+    case mtStrandSearchRequest: {
+        const StrandSearchRequest sr(message);
+        debugs(54, 6, HERE << "Strand search request: " << sr.requestorId <<
+               " tag: " << sr.tag);
         handleSearchRequest(sr);
         break;
-       }
+    }
 
     case mtSharedListenRequest:
         debugs(54, 6, HERE << "Shared listen request");
@@ -189,14 +185,9 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
 {
     debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
         request.tag << " is kid" << strand.kidId);
-    const StrandSearchResponse response0(request.requestId, strand);
-    // XXX: we should use StrandSearchResponse instead of converting it
-    IpcIoResponse io;
-    io.diskId = strand.kidId;
-    io.requestId = request.requestId;
-    io.command = IpcIo::cmdOpen;
+    const StrandSearchResponse response(request.data, strand);
     TypedMsgHdr message;
-    io.pack(message);
+    response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
 }
 
index cff479eb2951ba9faebcc37d348b715bfaf39d07..eaa3d491a97aacc6388dc96675be9aaab94474ba 100644 (file)
@@ -12,6 +12,7 @@ libipc_la_SOURCES = \
        Kids.cc \
        Kids.h \
        Messages.h \
+       Queue.h \
        StartListening.cc \
        StartListening.h \
        StrandCoord.cc \
index e90e8bbd759332fa95d099846e85fd72c1e24559..e3ee4fa9a2c36a39d8a4b7c13b88072382cbcc40 100644 (file)
@@ -20,7 +20,7 @@ namespace Ipc
 typedef enum { mtNone = 0, mtRegistration,
                mtStrandSearchRequest, mtStrandSearchResponse,
                mtSharedListenRequest, mtSharedListenResponse,
-               mtIpcIoRequest, mtIpcIoResponse,
+               mtIpcIoNotification,
                mtCacheMgrRequest, mtCacheMgrResponse
              } MessageType;
 
diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h
new file mode 100644 (file)
index 0000000..8c5554d
--- /dev/null
@@ -0,0 +1,251 @@
+/*
+ * $Id$
+ *
+ */
+
+#ifndef SQUID_IPC_QUEUE_H
+#define SQUID_IPC_QUEUE_H
+
+#include "Array.h"
+#include "ipc/AtomicWord.h"
+#include "ipc/SharedMemory.h"
+#include "SquidString.h"
+#include "util.h"
+
+/// Lockless fixed-capacity queue for a single writer and a single
+/// reader. Does not manage shared memory segment.
+template <class Value>
+class OneToOneUniQueue {
+public:
+    OneToOneUniQueue(const int aCapacity);
+
+    int size() const { return theSize; }
+    int capacity() const { return theCapacity; }
+
+    bool empty() const { return !theSize; }
+    bool full() const { return theSize == theCapacity; }
+
+    bool pop(Value &value); ///< returns false iff the queue is empty
+    bool push(const Value &value); ///< returns false iff the queue is full
+
+    static int Bytes2Items(int size);
+    static int Items2Bytes(const int size);
+
+private:
+    unsigned int theIn; ///< input index, used only in push()
+    unsigned int theOut; ///< output index, used only in pop()
+
+    AtomicWord theSize; ///< number of items in the queue
+    const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+    Value theBuffer[];
+};
+
+/// Lockless fixed-capacity bidirectional queue for two processes.
+/// Manages shared memory segment.
+template <class Value>
+class OneToOneBiQueue {
+public:
+    typedef OneToOneUniQueue<Value> UniQueue;
+
+    /// Create a new shared queue.
+    OneToOneBiQueue(const char *const id, const int aCapacity);
+    OneToOneBiQueue(const char *const id); ///< Attach to existing shared queue.
+
+    int pushedSize() const { return pushQueue->size(); }
+
+    bool pop(Value &value) { return popQueue->pop(value); }
+    bool push(const Value &value) { return pushQueue->push(value); }
+
+private:
+    SharedMemory shm; ///< shared memory segment
+    UniQueue *popQueue; ///< queue to pop from for this process
+    UniQueue *pushQueue; ///< queue to push to for this process
+};
+
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * pricesses. Implements a star topology: Many worker processes
+ * communicate with the one central process. The central process uses
+ * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
+ * created with the Attach() method. Each worker has a unique integer ID
+ * in [0, workerCount) range.
+ */
+template <class Value>
+class FewToOneBiQueue {
+public:
+    typedef OneToOneBiQueue<Value> BiQueue;
+
+    FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity);
+    static BiQueue *Attach(const char *const id, const int workerId);
+    ~FewToOneBiQueue();
+
+    bool validWorkerId(const int workerId) const;
+    int workerCount() const { return theWorkerCount; }
+    int pushedSize(const int workerId) const;
+
+    bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+    bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
+
+private:
+    static String BiQueueId(String id, const int workerId);
+
+    int theLastPopWorkerId; ///< the last worker ID we pop()ed from
+    Vector<BiQueue *> biQueues; ///< worker queues
+    const int theWorkerCount; ///< number of worker processes
+    const int theCapacity; ///< per-worker capacity
+};
+
+
+// OneToOneUniQueue
+
+template <class Value>
+OneToOneUniQueue<Value>::OneToOneUniQueue(const int aCapacity):
+    theIn(0), theOut(0), theSize(0), theCapacity(aCapacity)
+{
+    assert(theCapacity > 0);
+}
+
+template <class Value>
+bool
+OneToOneUniQueue<Value>::pop(Value &value)
+{
+    if (empty())
+        return false;
+
+    const unsigned int pos = theOut++ % theCapacity;
+    value = theBuffer[pos];
+    --theSize;
+    return true;
+}
+
+template <class Value>
+bool
+OneToOneUniQueue<Value>::push(const Value &value)
+{
+    if (full())
+        return false;
+
+    const unsigned int pos = theIn++ % theCapacity;
+    theBuffer[pos] = value;
+    ++theSize;
+    return true;
+}
+
+template <class Value>
+int
+OneToOneUniQueue<Value>::Bytes2Items(int size)
+{
+    assert(size >= 0);
+    size -= sizeof(OneToOneUniQueue);
+    return size >= 0 ? size / sizeof(Value) : 0;
+}
+
+template <class Value>
+int
+OneToOneUniQueue<Value>::Items2Bytes(const int size)
+{
+    assert(size >= 0);
+    return sizeof(OneToOneUniQueue) + sizeof(Value) * size;
+}
+
+
+// OneToOneBiQueue
+
+template <class Value>
+OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id, const int capacity) :
+    shm(id)
+{
+    const int uniSize = UniQueue::Items2Bytes(capacity);
+    shm.create(uniSize * 2);
+    char *const mem = reinterpret_cast<char *>(shm.mem());
+    assert(mem);
+    popQueue = new (mem) UniQueue(capacity);
+    pushQueue = new (mem + uniSize) UniQueue(capacity);
+}
+
+template <class Value>
+OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id) :
+    shm(id)
+{
+    shm.open();
+    char *const mem = reinterpret_cast<char *>(shm.mem());
+    assert(mem);
+    pushQueue = reinterpret_cast<UniQueue *>(mem);
+    const int uniSize = pushQueue->Items2Bytes(pushQueue->capacity());
+    popQueue = reinterpret_cast<UniQueue *>(mem + uniSize);
+}
+
+// FewToOneBiQueue
+
+template <class Value>
+FewToOneBiQueue<Value>::FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity):
+    theLastPopWorkerId(-1), theWorkerCount(aWorkerCount),
+    theCapacity(aCapacity)
+{
+    biQueues.reserve(theWorkerCount);
+    for (int i = 0; i < theWorkerCount; ++i) {
+        const String biQueueId = BiQueueId(id, i);
+        biQueues.push_back(new BiQueue(biQueueId.termedBuf(), theCapacity));
+    }
+}
+
+template <class Value>
+typename FewToOneBiQueue<Value>::BiQueue *
+FewToOneBiQueue<Value>::Attach(const char *const id, const int workerId)
+{
+    return new BiQueue(BiQueueId(id, workerId).termedBuf());
+}
+
+template <class Value>
+FewToOneBiQueue<Value>::~FewToOneBiQueue()
+{
+    for (int i = 0; i < theWorkerCount; ++i)
+        delete biQueues[i];
+}
+
+template <class Value>
+bool FewToOneBiQueue<Value>::validWorkerId(const int workerId) const
+{
+    return 0 <= workerId && workerId < theWorkerCount;
+}
+
+template <class Value>
+int FewToOneBiQueue<Value>::pushedSize(const int workerId) const
+{
+    assert(validWorkerId(workerId));
+    return biQueues[workerId]->pushedSize();
+}
+
+template <class Value>
+bool
+FewToOneBiQueue<Value>::pop(int &workerId, Value &value)
+{
+    ++theLastPopWorkerId;
+    for (int i = 0; i < theWorkerCount; ++i) {
+        theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
+        if (biQueues[theLastPopWorkerId]->pop(value)) {
+            workerId = theLastPopWorkerId;
+            return true;
+        }
+    }
+    return false;
+}
+
+template <class Value>
+bool
+FewToOneBiQueue<Value>::push(const int workerId, const Value &value)
+{
+    assert(validWorkerId(workerId));
+    return biQueues[workerId]->push(value);
+}
+
+template <class Value>
+String
+FewToOneBiQueue<Value>::BiQueueId(String id, const int workerId)
+{
+    id.append("__");
+    id.append(xitoa(workerId));
+    return id;
+}
+
+#endif // SQUID_IPC_QUEUE_H
index e7a64e7337fe9974e7dfca05b4a85a35b2a0b0d4..22633b12eec8506b084648e5fda05bcec69f42fc 100644 (file)
@@ -16,7 +16,7 @@
 #include <sys/types.h>
 #include <unistd.h>
 
-SharedMemory::SharedMemory(const String &id):
+SharedMemory::SharedMemory(const char *const id):
     theName(GenerateName(id)), theFD(-1), theSize(-1), theMem(NULL)
 {
 }
@@ -112,11 +112,18 @@ SharedMemory::detach()
     theMem = 0;
 }
 
-/// Generate name for shared memory segment.
+/// Generate name for shared memory segment. Replaces all slashes with dots.
 String
-SharedMemory::GenerateName(const String &id)
+SharedMemory::GenerateName(const char *id)
 {
     String name("/squid-");
+    for (const char *slash = strchr(id, '/'); slash; slash = strchr(id, '/')) {
+        if (id != slash) {
+            name.append(id, slash - id);
+            name.append('.');
+        }
+        id = slash + 1;
+    }
     name.append(id);
     return name;
 }
index 8fd8d1c4ea1cc7ddeff7587c3d9da2ced648abec..bba5d1e1ad560e21d012c5e9be09e4e79a759ef8 100644 (file)
@@ -12,7 +12,7 @@
 class SharedMemory {
 public:
     /// Create a shared memory segment.
-    SharedMemory(const String &id);
+    SharedMemory(const char *const id);
     ~SharedMemory();
 
     /// Create a new shared memory segment. Fails if a segment with
@@ -28,7 +28,7 @@ private:
     void attach();
     void detach();
 
-    static String GenerateName(const String &id);
+    static String GenerateName(const char *id);
 
     const String theName; ///< shared memory segment file name
     int theFD; ///< shared memory segment file descriptor
index 54f82585ab2ae880dd132937f807fc4f6333f91d..8a8e5120dd3420a6911673cb54bf6eef7a310e2c 100644 (file)
@@ -11,6 +11,7 @@
 #include "ipc/StrandCoord.h"
 #include "ipc/Messages.h"
 #include "ipc/SharedListen.h"
+#include "ipc/StrandSearch.h"
 #include "ipc/Kids.h"
 #include "mgr/Request.h"
 #include "mgr/Response.h"
@@ -60,13 +61,12 @@ void Ipc::Strand::receive(const TypedMsgHdr &message)
         SharedListenJoined(SharedListenResponse(message));
         break;
 
-    case mtIpcIoRequest:
-        IpcIoFile::HandleRequest(IpcIoRequest(message));
+    case mtStrandSearchResponse:
+        IpcIoFile::HandleOpenResponse(StrandSearchResponse(message));
         break;
 
-    case mtIpcIoResponse:
-        IpcIoFile::HandleResponse(message);
-        break;
+    case mtIpcIoNotification:
+        IpcIoFile::HandleNotification(message);
 
     case mtCacheMgrRequest:
         handleCacheMgrRequest(Mgr::Request(message));
index 7568ed8c0af848a1ba9474cd8a778d1a97fbac4b..b43690bd2ed9f175706d1682995a8b2d951bca19 100644 (file)
 #include "ipc/TypedMsgHdr.h"
 
 
-Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), requestId(0)
+Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), data(0)
 {
 }
 
 Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg):
-    requestorId(-1), requestId(0)
+    requestorId(-1), data(NULL)
 {
     hdrMsg.checkType(mtStrandSearchRequest);
     hdrMsg.getPod(requestorId);
-    hdrMsg.getPod(requestId);
+    hdrMsg.getPod(data);
     hdrMsg.getString(tag);
 }
 
@@ -29,30 +29,30 @@ void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtStrandSearchRequest);
     hdrMsg.putPod(requestorId);
-    hdrMsg.putPod(requestId);
+    hdrMsg.putPod(data);
     hdrMsg.putString(tag);
 }
 
 
 /* StrandSearchResponse */
 
-Ipc::StrandSearchResponse::StrandSearchResponse(int aRequestId,
+Ipc::StrandSearchResponse::StrandSearchResponse(void *const aData,
     const Ipc::StrandCoord &aStrand):
-    requestId(aRequestId), strand(aStrand)
+    data(aData), strand(aStrand)
 {
 }
 
 Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg):
-    requestId(0)
+    data(NULL)
 {
     hdrMsg.checkType(mtStrandSearchResponse);
-    hdrMsg.getPod(requestId);
+    hdrMsg.getPod(data);
     strand.unpack(hdrMsg);
 }
 
 void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtStrandSearchResponse);
-    hdrMsg.putPod(requestId);
+    hdrMsg.putPod(data);
     strand.pack(hdrMsg);
 }
index a249a5df5e7cfe8bea12a1f41f18314192e34d13..2bd1d81e79c9d534ef2b3f7c2046f9e3db2cb8c5 100644 (file)
@@ -24,7 +24,7 @@ public:
 
 public:
     int requestorId; ///< sender-provided return address
-    unsigned int requestId; ///< sender-provided for response:request matching
+    void *data; ///< sender-provided opaque pointer
     String tag; ///< set when looking for a matching StrandCoord::tag
 };
 
@@ -32,12 +32,12 @@ public:
 class StrandSearchResponse
 {
 public:
-    StrandSearchResponse(int requestId, const StrandCoord &strand);
+    StrandSearchResponse(void *const aData, const StrandCoord &strand);
     explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
     void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
-    unsigned int requestId; ///< a copy of the StrandSearchRequest::requestId
+    void *data; ///< a copy of the StrandSearchRequest::data
     StrandCoord strand; ///< answer matching StrandSearchRequest criteria
 };
 
index 5a159b71e3b9a3f7bc2c5915f4e1d83009520b9c..45a58a8e91576859e8007d5f8353cda3a5553fb1 100644 (file)
@@ -14,6 +14,7 @@ namespace Ipc
 class TypedMsgHdr;
 class StrandCoord;
 class HereIamMessage;
+class StrandSearchResponse;
 
 } // namespace Ipc