]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IpcIoFile atomic queue v2:
authorDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 1 Mar 2011 02:44:52 +0000 (05:44 +0300)
committerDmitry Kurochkin <dmitry.kurochkin@measurement-factory.com>
Tue, 1 Mar 2011 02:44:52 +0000 (05:44 +0300)
* rework ipc/Queue, move implementation to .cc
* fix raw pointer use in IpcIoFile

src/DiskIO/IpcIo/IpcIoFile.cc
src/DiskIO/IpcIo/IpcIoFile.h
src/Makefile.am
src/ipc/Coordinator.cc
src/ipc/Makefile.am
src/ipc/Queue.h
src/ipc/StrandSearch.cc
src/ipc/StrandSearch.h

index 4514fa7153eee69123fe6f955f2c0147f2ca7c02..eb51a503f886458fcf8ec19534eaa1eb1dc7025b 100644 (file)
 CBDATA_CLASS_INIT(IpcIoFile);
 
 IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
-IpcIoFile::IpcIoFiles IpcIoFile::ipcIoFiles;
+const double IpcIoFile::Timeout = 7;
+IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
+IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
 
 static bool DiskerOpen(const String &path, int flags, mode_t mode);
 static void DiskerClose(const String &path);
 
 
 IpcIoFile::IpcIoFile(char const *aDb):
-    dbName(aDb),
-    diskId(-1),
-    workerQueue(NULL),
-    error_(false),
-    lastRequestId(0),
-    olderRequests(&requestMap1),
-    newerRequests(&requestMap2),
+    dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
+    olderRequests(&requestMap1), newerRequests(&requestMap2),
     timeoutCheckScheduled(false)
 {
 }
 
 IpcIoFile::~IpcIoFile()
 {
+    if (diskId >= 0) {
+        const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
+        // XXX: warn and continue?
+        Must(i != IpcIoFiles.end());
+        Must(i->second == this);
+        IpcIoFiles.erase(i);
+    }
 }
 
 void
@@ -53,8 +57,12 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
             return;
 
         // XXX: make capacity configurable
-        diskerQueue = new DiskerQueue(dbName.termedBuf(), Config.workers, 1024);
-        ipcIoFiles.insert(std::make_pair(KidIdentifier, this));
+        diskerQueue =
+            new DiskerQueue(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
+        diskId = KidIdentifier;
+        const bool inserted =
+            IpcIoFiles.insert(std::make_pair(diskId, this)).second;
+        Must(inserted);
 
         Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
         ann.strand.tag = dbName;
@@ -68,15 +76,16 @@ IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
 
     Ipc::StrandSearchRequest request;
     request.requestorId = KidIdentifier;
-    request.data = this;
     request.tag = dbName;
 
     Ipc::TypedMsgHdr msg;
     request.pack(msg);
     Ipc::SendMessage(Ipc::coordinatorAddr, msg);
 
-    IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
-    trackPendingRequest(*pending);
+    WaitingForOpen.push_back(this);
+
+    eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
+             this, Timeout, 0, false); // "this" pointer is used as id
 }
 
 void
@@ -84,18 +93,16 @@ IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
     Must(diskId < 0); // we do not know our disker yet
     Must(!workerQueue);
 
-    // contain single pending open request at this time
-    newerRequests->clear();
-    olderRequests->clear();
-
     if (!response) {
         debugs(79,1, HERE << "error: timeout");
         error_ = true;
     } else {
         diskId = response->strand.kidId;
         if (diskId >= 0) {
-            workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier - 1);
-            ipcIoFiles.insert(std::make_pair(diskId, this));
+            workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
+            const bool inserted =
+                IpcIoFiles.insert(std::make_pair(diskId, this)).second;
+            Must(inserted);
         } else {
             error_ = true;
             debugs(79,1, HERE << "error: no disker claimed " << dbName);
@@ -251,7 +258,7 @@ IpcIoFile::ioInProgress() const
 void
 IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
 {
-    newerRequests->insert(std::make_pair(pending.id, &pending));
+    newerRequests->insert(std::make_pair(lastRequestId, &pending));
     if (!timeoutCheckScheduled)
         scheduleTimeoutCheck();
 }
@@ -266,7 +273,7 @@ IpcIoFile::push(IpcIoPendingRequest &pending)
     Must(pending.readRequest || pending.writeRequest);
 
     IpcIoMsg ipcIo;
-    ipcIo.requestId = pending.id;
+    ipcIo.requestId = lastRequestId;
     if (pending.readRequest) {
         ipcIo.command = IpcIo::cmdRead;
         ipcIo.offset = pending.readRequest->offset;
@@ -297,8 +304,18 @@ void
 IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
 {
     debugs(47, 7, HERE << "coordinator response to open request");
-    Must(response.data);
-    reinterpret_cast<IpcIoFile*>(response.data)->openCompleted(&response);
+    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
+         i != WaitingForOpen.end(); ++i) {
+        if (response.strand.tag == (*i)->dbName) {
+            (*i)->openCompleted(&response);
+            WaitingForOpen.erase(i);
+            return;
+        }
+    }
+
+    debugs(47, 4, HERE << "LATE disker response to open for " <<
+           response.strand.tag);
+    // nothing we can do about it; completeIo() has been called already
 }
 
 void
@@ -346,18 +363,40 @@ IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
         DiskerHandleRequests();
     else {
         const int diskId = msg.getInt();
-        const IpcIoFiles::const_iterator i = ipcIoFiles.find(diskId);
-        Must(i != ipcIoFiles.end()); // XXX: warn and continue?
+        const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+        Must(i != IpcIoFiles.end()); // XXX: warn and continue?
         i->second->handleResponses();
     }
 }
 
+/// handles open request timeout
+void
+IpcIoFile::OpenTimeout(void *const param)
+{
+    Must(param);
+    // the pointer is used for comparison only and not dereferenced
+    const IpcIoFile *const ipcIoFile =
+        reinterpret_cast<const IpcIoFile *>(param);
+    for (IpcIoFileList::iterator i = WaitingForOpen.begin();
+         i != WaitingForOpen.end(); ++i) {
+        if (*i == ipcIoFile) {
+            (*i)->openCompleted(NULL);
+            WaitingForOpen.erase(i);
+            break;
+        }
+    }
+}
+
 /// IpcIoFile::checkTimeouts wrapper
 void
 IpcIoFile::CheckTimeouts(void *const param)
 {
     Must(param);
-    reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
+    const int diskId = reinterpret_cast<uintptr_t>(param);
+    debugs(47, 7, HERE << "diskId=" << diskId);
+    const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+    if (i != IpcIoFiles.end())
+        i->second->checkTimeouts();
 }
 
 void
@@ -370,9 +409,9 @@ IpcIoFile::checkTimeouts()
     for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
         IpcIoPendingRequest *const pending = i->second;
 
-        const int requestId = i->first;
+        const unsigned int requestId = i->first;
         debugs(47, 7, HERE << "disker timeout; ipcIo" <<
-               KidIdentifier << '.' << diskId << '.' << requestId);
+               KidIdentifier << '.' << requestId);
 
         pending->completeIo(NULL); // no response
         delete pending; // XXX: leaking if throwing
@@ -388,10 +427,9 @@ IpcIoFile::checkTimeouts()
 void
 IpcIoFile::scheduleTimeoutCheck()
 {
-    // we check all older requests at once so some may be wait for 2*timeout
-    const double timeout = 7; // in seconds
+    // we check all older requests at once so some may be wait for 2*Timeout
     eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
-             this, timeout, 0, false);
+             reinterpret_cast<void *>(diskId), Timeout, 0, false);
     timeoutCheckScheduled = true;
 }
 
@@ -440,16 +478,14 @@ IpcIoMsg::IpcIoMsg():
 IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
     file(aFile), readRequest(NULL), writeRequest(NULL)
 {
+    Must(file != NULL);
     if (++file->lastRequestId == 0) // don't use zero value as requestId
         ++file->lastRequestId;
-    id = file->lastRequestId;
 }
 
 void
 IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
 {
-    Must(file != NULL);
-
     if (readRequest)
         file->readCompleted(readRequest, response);
     else
index 99573a22ba26d3a5533f8c3bda179d474a8e2e1e..c92833396e4b93b5da37744133fb676946a33a5c 100644 (file)
@@ -7,6 +7,7 @@
 #include "DiskIO/IORequestor.h"
 #include "ipc/forward.h"
 #include "ipc/Queue.h"
+#include <list>
 #include <map>
 
 // TODO: expand to all classes
@@ -79,6 +80,7 @@ private:
 
     static void Notify(const int peerId);
 
+    static void OpenTimeout(void *const param);
     static void CheckTimeouts(void *const param);
     void checkTimeouts();
     void scheduleTimeoutCheck();
@@ -90,8 +92,8 @@ private:
     static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
 
 private:
-    typedef FewToOneBiQueue<IpcIoMsg> DiskerQueue;
-    typedef OneToOneBiQueue<IpcIoMsg> WorkerQueue;
+    typedef FewToOneBiQueue DiskerQueue;
+    typedef OneToOneBiQueue WorkerQueue;
 
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
@@ -111,8 +113,14 @@ private:
     RequestMap *newerRequests; ///< newer requests (map2 or map1)
     bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
 
-    typedef std::map<int, IpcIoFile*> IpcIoFiles; ///< maps diskerId to IpcIoFile
-    static IpcIoFiles ipcIoFiles;
+    static const double Timeout; ///< timeout value in seconds
+
+    typedef std::list<Pointer> IpcIoFileList;
+    static IpcIoFileList WaitingForOpen; ///< pending open requests
+
+    ///< maps diskerId to IpcIoFile, cleared in destructor
+    typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
+    static IpcIoFilesMap IpcIoFiles;
 
     CBDATA_CLASS2(IpcIoFile);
 };
@@ -128,8 +136,7 @@ public:
     void completeIo(const IpcIoMsg *const response);
 
 public:
-    IpcIoFile::Pointer file; ///< the file object waiting for the response
-    unsigned int id; ///< request id
+    const IpcIoFile::Pointer file; ///< the file object waiting for the response
     ReadRequest *readRequest; ///< set if this is a read requests
     WriteRequest *writeRequest; ///< set if this is a write request
 
index 2300eba3b4aa4fc2b3a87dd727519a81c69ea48c..89d76e77a44c49c5947cc9683c7954ce21c2fdfa 100644 (file)
@@ -536,6 +536,7 @@ nodist_squid_SOURCES = \
        $(BUILT_SOURCES)
 
 squid_LDADD = \
+       $(DISK_LIBS) \
        $(COMMON_LIBS) \
        comm/libcomm.la \
        eui/libeui.la \
@@ -544,7 +545,6 @@ squid_LDADD = \
        $(XTRA_OBJS) \
        $(DISK_LINKOBJS) \
        $(REPL_OBJS) \
-       $(DISK_LIBS) \
        $(DISK_OS_LIBS) \
        $(CRYPTLIB) \
        $(REGEXLIB) \
index 4c6a40638d6c8f5fdb2187a6ef7e07b22c2dfb35..8479da88be450659ce87872e709bde128b0d3846 100644 (file)
@@ -185,7 +185,7 @@ Ipc::Coordinator::notifySearcher(const Ipc::StrandSearchRequest &request,
 {
     debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
         request.tag << " is kid" << strand.kidId);
-    const StrandSearchResponse response(request.data, strand);
+    const StrandSearchResponse response(strand);
     TypedMsgHdr message;
     response.pack(message);
     SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
index eaa3d491a97aacc6388dc96675be9aaab94474ba..da53ddb4f0f50d6cc960de9e45ef85f37dcdfbe2 100644 (file)
@@ -12,6 +12,7 @@ libipc_la_SOURCES = \
        Kids.cc \
        Kids.h \
        Messages.h \
+       Queue.cc \
        Queue.h \
        StartListening.cc \
        StartListening.h \
index 8c5554d66bd069c45674b5b5739765c0752ff3d7..25839851b6ca6ac73122fe4403b0e8f6a1f01937 100644 (file)
@@ -9,57 +9,68 @@
 #include "Array.h"
 #include "ipc/AtomicWord.h"
 #include "ipc/SharedMemory.h"
-#include "SquidString.h"
 #include "util.h"
 
-/// Lockless fixed-capacity queue for a single writer and a single
-/// reader. Does not manage shared memory segment.
-template <class Value>
+class String;
+
+/// Lockless fixed-capacity queue for a single writer and a single reader.
 class OneToOneUniQueue {
 public:
-    OneToOneUniQueue(const int aCapacity);
+    class ItemTooLarge {};
+
+    OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
+    OneToOneUniQueue(const String &id);
 
-    int size() const { return theSize; }
-    int capacity() const { return theCapacity; }
+    unsigned int maxItemSize() const { return shared->theMaxItemSize; }
+    int size() const { return shared->theSize; }
+    int capacity() const { return shared->theCapacity; }
 
-    bool empty() const { return !theSize; }
-    bool full() const { return theSize == theCapacity; }
+    bool empty() const { return !shared->theSize; }
+    bool full() const { return shared->theSize == shared->theCapacity; }
 
+    static int Bytes2Items(const unsigned int maxItemSize, int size);
+    static int Items2Bytes(const unsigned int maxItemSize, const int size);
+
+    template <class Value>
     bool pop(Value &value); ///< returns false iff the queue is empty
+    template <class Value>
     bool push(const Value &value); ///< returns false iff the queue is full
 
-    static int Bytes2Items(int size);
-    static int Items2Bytes(const int size);
-
 private:
-    unsigned int theIn; ///< input index, used only in push()
-    unsigned int theOut; ///< output index, used only in pop()
+    struct Shared {
+        Shared(const unsigned int aMaxItemSize, const int aCapacity);
 
-    AtomicWord theSize; ///< number of items in the queue
-    const int theCapacity; ///< maximum number of items, i.e. theBuffer size
-    Value theBuffer[];
+        unsigned int theIn; ///< input index, used only in push()
+        unsigned int theOut; ///< output index, used only in pop()
+
+        AtomicWord theSize; ///< number of items in the queue
+        const unsigned int theMaxItemSize; ///< maximum item size
+        const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+
+        char theBuffer[];
+    };
+
+    SharedMemory shm; ///< shared memory segment
+    Shared *shared; ///< pointer to shared memory
 };
 
 /// Lockless fixed-capacity bidirectional queue for two processes.
-/// Manages shared memory segment.
-template <class Value>
 class OneToOneBiQueue {
 public:
-    typedef OneToOneUniQueue<Value> UniQueue;
-
     /// Create a new shared queue.
-    OneToOneBiQueue(const char *const id, const int aCapacity);
-    OneToOneBiQueue(const char *const id); ///< Attach to existing shared queue.
+    OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
+    OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
 
     int pushedSize() const { return pushQueue->size(); }
 
+    template <class Value>
     bool pop(Value &value) { return popQueue->pop(value); }
+    template <class Value>
     bool push(const Value &value) { return pushQueue->push(value); }
 
 private:
-    SharedMemory shm; ///< shared memory segment
-    UniQueue *popQueue; ///< queue to pop from for this process
-    UniQueue *pushQueue; ///< queue to push to for this process
+    OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
+    OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
 };
 
 /**
@@ -67,158 +78,73 @@ private:
  * pricesses. Implements a star topology: Many worker processes
  * communicate with the one central process. The central process uses
  * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
- * created with the Attach() method. Each worker has a unique integer ID
- * in [0, workerCount) range.
+ * created with the Attach() method. Each worker has a unique integer
+ * ID in [0, workerCount) range.
  */
-template <class Value>
 class FewToOneBiQueue {
 public:
-    typedef OneToOneBiQueue<Value> BiQueue;
-
-    FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity);
-    static BiQueue *Attach(const char *const id, const int workerId);
+    FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
+    static OneToOneBiQueue *Attach(const String &id, const int workerId);
     ~FewToOneBiQueue();
 
     bool validWorkerId(const int workerId) const;
     int workerCount() const { return theWorkerCount; }
     int pushedSize(const int workerId) const;
 
+    template <class Value>
     bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+    template <class Value>
     bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
 
 private:
-    static String BiQueueId(String id, const int workerId);
-
     int theLastPopWorkerId; ///< the last worker ID we pop()ed from
-    Vector<BiQueue *> biQueues; ///< worker queues
+    Vector<OneToOneBiQueue *> biQueues; ///< worker queues
     const int theWorkerCount; ///< number of worker processes
-    const int theCapacity; ///< per-worker capacity
 };
 
 
 // OneToOneUniQueue
 
-template <class Value>
-OneToOneUniQueue<Value>::OneToOneUniQueue(const int aCapacity):
-    theIn(0), theOut(0), theSize(0), theCapacity(aCapacity)
-{
-    assert(theCapacity > 0);
-}
-
 template <class Value>
 bool
-OneToOneUniQueue<Value>::pop(Value &value)
+OneToOneUniQueue::pop(Value &value)
 {
+    if (sizeof(value) > shared->theMaxItemSize)
+        throw ItemTooLarge();
+
     if (empty())
         return false;
 
-    const unsigned int pos = theOut++ % theCapacity;
-    value = theBuffer[pos];
-    --theSize;
+    const unsigned int pos =
+        shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
+    memcpy(&value, shared->theBuffer + pos, sizeof(value));
+    --shared->theSize;
     return true;
 }
 
 template <class Value>
 bool
-OneToOneUniQueue<Value>::push(const Value &value)
+OneToOneUniQueue::push(const Value &value)
 {
+    if (sizeof(value) > shared->theMaxItemSize)
+        throw ItemTooLarge();
+
     if (full())
         return false;
 
-    const unsigned int pos = theIn++ % theCapacity;
-    theBuffer[pos] = value;
-    ++theSize;
+    const unsigned int pos =
+        shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
+    memcpy(shared->theBuffer + pos, &value, sizeof(value));
+    ++shared->theSize;
     return true;
 }
 
-template <class Value>
-int
-OneToOneUniQueue<Value>::Bytes2Items(int size)
-{
-    assert(size >= 0);
-    size -= sizeof(OneToOneUniQueue);
-    return size >= 0 ? size / sizeof(Value) : 0;
-}
-
-template <class Value>
-int
-OneToOneUniQueue<Value>::Items2Bytes(const int size)
-{
-    assert(size >= 0);
-    return sizeof(OneToOneUniQueue) + sizeof(Value) * size;
-}
-
-
-// OneToOneBiQueue
-
-template <class Value>
-OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id, const int capacity) :
-    shm(id)
-{
-    const int uniSize = UniQueue::Items2Bytes(capacity);
-    shm.create(uniSize * 2);
-    char *const mem = reinterpret_cast<char *>(shm.mem());
-    assert(mem);
-    popQueue = new (mem) UniQueue(capacity);
-    pushQueue = new (mem + uniSize) UniQueue(capacity);
-}
-
-template <class Value>
-OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id) :
-    shm(id)
-{
-    shm.open();
-    char *const mem = reinterpret_cast<char *>(shm.mem());
-    assert(mem);
-    pushQueue = reinterpret_cast<UniQueue *>(mem);
-    const int uniSize = pushQueue->Items2Bytes(pushQueue->capacity());
-    popQueue = reinterpret_cast<UniQueue *>(mem + uniSize);
-}
 
 // FewToOneBiQueue
 
-template <class Value>
-FewToOneBiQueue<Value>::FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity):
-    theLastPopWorkerId(-1), theWorkerCount(aWorkerCount),
-    theCapacity(aCapacity)
-{
-    biQueues.reserve(theWorkerCount);
-    for (int i = 0; i < theWorkerCount; ++i) {
-        const String biQueueId = BiQueueId(id, i);
-        biQueues.push_back(new BiQueue(biQueueId.termedBuf(), theCapacity));
-    }
-}
-
-template <class Value>
-typename FewToOneBiQueue<Value>::BiQueue *
-FewToOneBiQueue<Value>::Attach(const char *const id, const int workerId)
-{
-    return new BiQueue(BiQueueId(id, workerId).termedBuf());
-}
-
-template <class Value>
-FewToOneBiQueue<Value>::~FewToOneBiQueue()
-{
-    for (int i = 0; i < theWorkerCount; ++i)
-        delete biQueues[i];
-}
-
-template <class Value>
-bool FewToOneBiQueue<Value>::validWorkerId(const int workerId) const
-{
-    return 0 <= workerId && workerId < theWorkerCount;
-}
-
-template <class Value>
-int FewToOneBiQueue<Value>::pushedSize(const int workerId) const
-{
-    assert(validWorkerId(workerId));
-    return biQueues[workerId]->pushedSize();
-}
-
 template <class Value>
 bool
-FewToOneBiQueue<Value>::pop(int &workerId, Value &value)
+FewToOneBiQueue::pop(int &workerId, Value &value)
 {
     ++theLastPopWorkerId;
     for (int i = 0; i < theWorkerCount; ++i) {
@@ -233,19 +159,10 @@ FewToOneBiQueue<Value>::pop(int &workerId, Value &value)
 
 template <class Value>
 bool
-FewToOneBiQueue<Value>::push(const int workerId, const Value &value)
+FewToOneBiQueue::push(const int workerId, const Value &value)
 {
     assert(validWorkerId(workerId));
     return biQueues[workerId]->push(value);
 }
 
-template <class Value>
-String
-FewToOneBiQueue<Value>::BiQueueId(String id, const int workerId)
-{
-    id.append("__");
-    id.append(xitoa(workerId));
-    return id;
-}
-
 #endif // SQUID_IPC_QUEUE_H
index b43690bd2ed9f175706d1682995a8b2d951bca19..a5ff3c8afdc130239f56e2b5fc2a5e62a67d0ce7 100644 (file)
 #include "ipc/TypedMsgHdr.h"
 
 
-Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), data(0)
+Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1)
 {
 }
 
 Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg):
-    requestorId(-1), data(NULL)
+    requestorId(-1)
 {
     hdrMsg.checkType(mtStrandSearchRequest);
     hdrMsg.getPod(requestorId);
-    hdrMsg.getPod(data);
     hdrMsg.getString(tag);
 }
 
@@ -29,30 +28,25 @@ void Ipc::StrandSearchRequest::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtStrandSearchRequest);
     hdrMsg.putPod(requestorId);
-    hdrMsg.putPod(data);
     hdrMsg.putString(tag);
 }
 
 
 /* StrandSearchResponse */
 
-Ipc::StrandSearchResponse::StrandSearchResponse(void *const aData,
-    const Ipc::StrandCoord &aStrand):
-    data(aData), strand(aStrand)
+Ipc::StrandSearchResponse::StrandSearchResponse(const Ipc::StrandCoord &aStrand):
+    strand(aStrand)
 {
 }
 
-Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg):
-    data(NULL)
+Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg)
 {
     hdrMsg.checkType(mtStrandSearchResponse);
-    hdrMsg.getPod(data);
     strand.unpack(hdrMsg);
 }
 
 void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.setType(mtStrandSearchResponse);
-    hdrMsg.putPod(data);
     strand.pack(hdrMsg);
 }
index 2bd1d81e79c9d534ef2b3f7c2046f9e3db2cb8c5..913f66198702a49b7217a507cf3838205b14a0aa 100644 (file)
@@ -24,7 +24,6 @@ public:
 
 public:
     int requestorId; ///< sender-provided return address
-    void *data; ///< sender-provided opaque pointer
     String tag; ///< set when looking for a matching StrandCoord::tag
 };
 
@@ -32,12 +31,11 @@ public:
 class StrandSearchResponse
 {
 public:
-    StrandSearchResponse(void *const aData, const StrandCoord &strand);
+    StrandSearchResponse(const StrandCoord &strand);
     explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
     void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
-    void *data; ///< a copy of the StrandSearchRequest::data
     StrandCoord strand; ///< answer matching StrandSearchRequest criteria
 };