*/
#include "config.h"
+#include "base/RunnersRegistry.h"
#include "base/TextException.h"
#include "DiskIO/IORequestor.h"
#include "DiskIO/IpcIo/IpcIoFile.h"
#include "DiskIO/WriteRequest.h"
#include "ipc/Messages.h"
#include "ipc/Port.h"
+#include "ipc/Queue.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
#include "ipc/mem/Pages.h"
CBDATA_CLASS_INIT(IpcIoFile);
-IpcIoFile::DiskerQueue::Owner *IpcIoFile::diskerQueueOwner = NULL;
-IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
+/// shared memory segment path to use for IpcIoFile maps
+static const char *const ShmLabel = "io_file";
+
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
+std::auto_ptr<IpcIoFile::Queue> IpcIoFile::queue;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
IpcIoFile::IpcIoFile(char const *aDb):
- dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
+ dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
olderRequests(&requestMap1), newerRequests(&requestMap2),
timeoutCheckScheduled(false)
{
{
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
- Must(!diskerQueueOwner && !diskerQueue && !workerQueue);
+
+ if (!queue.get())
+ queue.reset(new Queue(ShmLabel, IamWorkerProcess() ? Queue::groupA : Queue::groupB, KidIdentifier));
if (IamDiskProcess()) {
error_ = !DiskerOpen(dbName, flags, mode);
if (error_)
return;
- // XXX: make capacity configurable
- diskerQueueOwner =
- DiskerQueue::Init(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
- diskerQueue = new DiskerQueue(dbName);
diskId = KidIdentifier;
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
void
IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
Must(diskId < 0); // we do not know our disker yet
- Must(!workerQueue);
if (!response) {
debugs(79,1, HERE << "error: timeout");
} else {
diskId = response->strand.kidId;
if (diskId >= 0) {
- workerQueue = DiskerQueue::Attach(dbName, KidIdentifier);
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
Must(inserted);
{
assert(ioRequestor != NULL);
- delete diskerQueueOwner;
- diskerQueueOwner = NULL;
- delete diskerQueue;
- diskerQueue = NULL;
- delete workerQueue;
- workerQueue = NULL;
-
if (IamDiskProcess())
DiskerClose(dbName);
// XXX: else nothing to do?
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
// prevent queue overflows: check for responses to earlier requests
- handleResponses("before push");
+ HandleResponses("before push");
debugs(47, 7, HERE);
Must(diskId >= 0);
- Must(workerQueue);
Must(pending);
Must(pending->readRequest || pending->writeRequest);
memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
}
- debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+ debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId));
- if (workerQueue->push(ipcIo))
+ if (queue->push(diskId, ipcIo))
Notify(diskId); // must notify disker
trackPendingRequest(pending);
- } catch (const WorkerQueue::Full &) {
+ } catch (const Queue::Full &) {
debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
// TODO: grow queue size
}
void
-IpcIoFile::handleNotification()
-{
- debugs(47, 4, HERE << "notified");
- workerQueue->clearReaderSignal();
- handleResponses("after notification");
-}
-
-void
-IpcIoFile::handleResponses(const char *when)
+IpcIoFile::HandleResponses(const char *const when)
{
debugs(47, 4, HERE << "popping all " << when);
- Must(workerQueue);
IpcIoMsg ipcIo;
// get all responses we can: since we are not pushing, this will stop
- while (workerQueue->pop(ipcIo))
- handleResponse(ipcIo);
+ int diskId;
+ while (queue->pop(diskId, ipcIo)) {
+ const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+ Must(i != IpcIoFiles.end()); // TODO: warn but continue
+ i->second->handleResponse(ipcIo);
+ }
}
void
{
const int requestId = ipcIo.requestId;
debugs(47, 7, HERE << "popped disker response: " <<
- SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->popQueue->size());
+ SipcIo(KidIdentifier, ipcIo, diskId));
Must(requestId);
if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
{
const int from = msg.getInt();
debugs(47, 7, HERE << "from " << from);
- if (IamDiskProcess()) {
- const int workerId = from;
- DiskerHandleRequests(workerId);
- } else {
- const int diskId = from;
- const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
- Must(i != IpcIoFiles.end()); // TODO: warn but continue
- i->second->handleNotification();
- }
+ queue->clearReaderSignal(from);
+ if (IamDiskProcess())
+ DiskerHandleRequests();
+ else
+ HandleResponses("after notification");
}
/// handles open request timeout
}
void
-IpcIoFile::DiskerHandleRequests(const int workerWhoNotified)
+IpcIoFile::DiskerHandleRequests()
{
- Must(diskerQueue);
- diskerQueue->clearReaderSignal(workerWhoNotified);
-
int workerId = 0;
IpcIoMsg ipcIo;
- while (diskerQueue->pop(workerId, ipcIo))
+ while (queue->pop(workerId, ipcIo))
DiskerHandleRequest(workerId, ipcIo);
// TODO: If the loop keeps on looping, we probably should take a break
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
- Must(diskerQueue);
-
if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
debugs(0,0, HERE << "disker" << KidIdentifier <<
" should not receive " << ipcIo.command <<
else // ipcIo.command == IpcIo::cmdWrite
diskerWrite(ipcIo);
- debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId-1]->pushQueue->size());
+ debugs(47, 7, HERE << "pushing " << SipcIo(workerId, ipcIo, KidIdentifier));
try {
- if (diskerQueue->push(workerId, ipcIo))
+ if (queue->push(workerId, ipcIo))
Notify(workerId); // must notify worker
- } catch (const DiskerQueue::Full &) {
+ } catch (const Queue::Full &) {
// The worker queue should not overflow because the worker should pop()
// before push()ing and because if disker pops N requests at a time,
// we should make sure the worker pop() queue length is the worker
store_open_disk_fd--;
}
}
+
+
+/// initializes shared memory segments used by IpcIoFile
+class IpcIoRr: public RegisteredRunner
+{
+public:
+ /* RegisteredRunner API */
+ IpcIoRr(): owner(NULL) {}
+ virtual void run(const RunnerRegistry &);
+ virtual ~IpcIoRr();
+
+private:
+ Ipc::FewToFewBiQueue::Owner *owner;
+};
+
+RunnerRegistrationEntry(rrAfterConfig, IpcIoRr);
+
+
+void IpcIoRr::run(const RunnerRegistry &)
+{
+ if (!UsingSmp())
+ return;
+
+ if (IamMasterProcess()) {
+ Must(!owner);
+ // XXX: make capacity configurable
+ owner = Ipc::FewToFewBiQueue::Init(ShmLabel, Config.workers, 1, Config.cacheSwap.n_configured, 1 + Config.workers, sizeof(IpcIoMsg), 1024);
+ }
+}
+
+IpcIoRr::~IpcIoRr()
+{
+ delete owner;
+}
#include "DiskIO/DiskFile.h"
#include "DiskIO/IORequestor.h"
#include "ipc/forward.h"
-#include "ipc/Queue.h"
#include "ipc/mem/Page.h"
#include <list>
#include <map>
+#include <memory>
+
+namespace Ipc {
+class FewToFewBiQueue;
+} // Ipc
// TODO: expand to all classes
namespace IpcIo {
void checkTimeouts();
void scheduleTimeoutCheck();
- void handleNotification();
- void handleResponses(const char *when);
+ static void HandleResponses(const char *const when);
void handleResponse(IpcIoMsg &ipcIo);
- static void DiskerHandleRequests(const int workerId);
+ static void DiskerHandleRequests();
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
private:
- typedef Ipc::FewToOneBiQueue DiskerQueue;
- typedef Ipc::OneToOneBiQueue WorkerQueue;
-
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
- static DiskerQueue::Owner *diskerQueueOwner; ///< IPC queue owner for disker
- static DiskerQueue *diskerQueue; ///< IPC queue for disker
- WorkerQueue *workerQueue; ///< IPC queue for worker
RefCount<IORequestor> ioRequestor;
bool error_; ///< whether we have seen at least one I/O error (XXX)
typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
static IpcIoFilesMap IpcIoFiles;
+ typedef Ipc::FewToFewBiQueue Queue;
+ static std::auto_ptr<Queue> queue; ///< IPC queue
+
CBDATA_CLASS2(IpcIoFile);
};
#include "globals.h"
#include "ipc/Queue.h"
-/// constructs shared segment ID from parent queue ID and child queue index
+/// constructs Metadata ID from parent queue ID
static String
-QueueId(String id, const int idx)
+MetadataId(String id)
{
- id.append("__");
- id.append(xitoa(idx));
+ id.append("__metadata");
return id;
}
-/// constructs QueueReader ID from parent queue ID
+/// constructs one-to-one queues ID from parent queue ID
static String
-ReaderId(String id)
+QueuesId(String id)
+{
+ id.append("__queues");
+ return id;
+}
+
+/// constructs QueueReaders ID from parent queue ID
+static String
+ReadersId(String id)
{
id.append("__readers");
return id;
// OneToOneUniQueue
-Ipc::OneToOneUniQueue::Owner *
-Ipc::OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
-{
- Must(maxItemSize > 0);
- Must(capacity > 0);
- return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity);
-}
-
-Ipc::OneToOneUniQueue::OneToOneUniQueue(const String &id):
- shared(shm_old(Shared)(id.termedBuf())), reader_(NULL)
-{
-}
-
-void
-Ipc::OneToOneUniQueue::reader(QueueReader *aReader)
+Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
+ theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
+ theCapacity(aCapacity)
{
- Must(!reader_ && aReader);
- reader_ = aReader;
+ Must(theMaxItemSize > 0);
+ Must(theCapacity > 0);
}
int
Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
{
assert(maxItemSize > 0);
- size -= sizeof(Shared);
+ size -= sizeof(OneToOneUniQueue);
return size >= 0 ? size / maxItemSize : 0;
}
Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
{
assert(size >= 0);
- return sizeof(Shared) + maxItemSize * size;
+ return sizeof(OneToOneUniQueue) + maxItemSize * size;
}
-Ipc::QueueReader &
-Ipc::OneToOneUniQueue::reader()
-{
- Must(reader_);
- return *reader_;
-}
-Ipc::OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
- theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
- theCapacity(aCapacity)
-{
-}
+/* OneToOneUniQueues */
-size_t
-Ipc::OneToOneUniQueue::Shared::sharedMemorySize() const
+Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
{
- return SharedMemorySize(theMaxItemSize, theCapacity);
+ Must(theCapacity > 0);
+ for (int i = 0; i < theCapacity; ++i)
+ new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
}
size_t
-Ipc::OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
+Ipc::OneToOneUniQueues::sharedMemorySize() const
{
- return Items2Bytes(maxItemSize, capacity);
+ return sizeof(*this) + theCapacity * front().sharedMemorySize();
}
-
-// OneToOneBiQueue
-
-Ipc::OneToOneBiQueue::Owner *
-Ipc::OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
-{
- UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity));
- UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity));
- Owner *const owner = new Owner;
- owner->first = owner1;
- owner->second = owner2;
- return owner;
-}
-
-Ipc::OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
-{
- OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1));
- OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2));
- switch (side) {
- case Side1:
- popQueue.reset(queue1);
- pushQueue.reset(queue2);
- break;
- case Side2:
- popQueue.reset(queue2);
- pushQueue.reset(queue1);
- break;
- default:
- Must(false);
- }
-}
-
-void
-Ipc::OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+size_t
+Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
{
- popQueue->reader(r1);
- pushQueue->reader(r2);
+ const int queueSize =
+ OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
+ return sizeof(OneToOneUniQueues) + queueSize * capacity;
}
-void
-Ipc::OneToOneBiQueue::clearReaderSignal()
+const Ipc::OneToOneUniQueue &
+Ipc::OneToOneUniQueues::operator [](const int index) const
{
- debugs(54, 7, HERE << "reader: " << &popQueue->reader());
- popQueue->reader().clearSignal();
+ Must(0 <= index && index < theCapacity);
+ const size_t queueSize = index ? front().sharedMemorySize() : 0;
+ const char *const queue =
+ reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
+ return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
-// FewToOneBiQueue
+// FewToFewBiQueue
-Ipc::FewToOneBiQueue::Owner *
-Ipc::FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
+Ipc::FewToFewBiQueue::Owner *
+Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
{
- return new Owner(id, workerCount, maxItemSize, capacity);
+ return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
}
-Ipc::FewToOneBiQueue::FewToOneBiQueue(const String &id):
- theLastPopWorker(0),
- readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())),
- reader(readers->theReaders)
+Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
+ metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
+ queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
+ readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
+ theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
+ theLastPopProcessId(readers->theCapacity)
{
- Must(readers->theCapacity > 1);
+ Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
+ Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
- debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
-
- biQueues.reserve(workerCount());
- for (int i = 0; i < workerCount(); ++i) {
- OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1);
- QueueReader *const remoteReader = readers->theReaders + i + 1;
- biQueue->readers(reader, remoteReader);
- biQueues.push_back(biQueue);
- }
+ const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+ debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
}
-Ipc::OneToOneBiQueue *
-Ipc::FewToOneBiQueue::Attach(const String &id, const int workerId)
-{
- Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
- Must(workerId >= WorkerIdOffset);
- Must(workerId < readers->theCapacity - 1 + WorkerIdOffset);
- QueueReader *const remoteReader = readers->theReaders;
- debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
- QueueReader *const localReader =
- readers->theReaders + workerId - WorkerIdOffset + 1;
- debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
-
- OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2);
- biQueue->readers(localReader, remoteReader);
-
- // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
- const Mem::Pointer<QueueReaders> *const leakingReaders = new Mem::Pointer<QueueReaders>(readers);
- Must(leakingReaders); // silence unused variable warning
-
- return biQueue;
-}
-
-Ipc::FewToOneBiQueue::~FewToOneBiQueue()
-{
- for (int i = 0; i < workerCount(); ++i)
- delete biQueues[i];
+bool
+Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
+{
+ switch (group) {
+ case groupA:
+ return metadata->theGroupAIdOffset <= processId &&
+ processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
+ case groupB:
+ return metadata->theGroupBIdOffset <= processId &&
+ processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
+ }
+ return false;
+}
+
+Ipc::OneToOneUniQueue &
+Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
+{
+ Must(fromGroup != toGroup);
+ Must(validProcessId(fromGroup, fromProcessId));
+ Must(validProcessId(toGroup, toProcessId));
+ int index1;
+ int index2;
+ int offset;
+ if (fromGroup == groupA) {
+ index1 = fromProcessId - metadata->theGroupAIdOffset;
+ index2 = toProcessId - metadata->theGroupBIdOffset;
+ offset = 0;
+ } else {
+ index1 = toProcessId - metadata->theGroupAIdOffset;
+ index2 = fromProcessId - metadata->theGroupBIdOffset;
+ offset = metadata->theGroupASize * metadata->theGroupBSize;
+ }
+ const int index = offset + index1 * metadata->theGroupBSize + index2;
+ return (*queues)[index];
}
-bool
-Ipc::FewToOneBiQueue::validWorkerId(const int workerId) const
+Ipc::QueueReader &
+Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
{
- return WorkerIdOffset <= workerId &&
- workerId < WorkerIdOffset + workerCount();
+ Must(validProcessId(group, processId));
+ const int index = group == groupA ?
+ processId - metadata->theGroupAIdOffset :
+ metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
+ return readers->theReaders[index];
}
void
-Ipc::FewToOneBiQueue::clearReaderSignal(int workerId)
+Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
{
- debugs(54, 7, HERE << "reader: " << reader->id);
+ QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+ debugs(54, 7, HERE << "reader: " << localReader.id);
- assert(validWorkerId(workerId));
- reader->clearSignal();
+ Must(validProcessId(remoteGroup(), remoteProcessId));
+ localReader.clearSignal();
// we got a hint; we could reposition iteration to try popping from the
- // workerId queue first; but it does not seem to help much and might
+ // remoteProcessId queue first; but it does not seem to help much and might
// introduce some bias so we do not do that for now:
- // theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
+ // theLastPopProcessId = remoteProcessId;
}
-Ipc::FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
- readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1))
+Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
+ theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
+ theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
+{
+ Must(theGroupASize > 0);
+ Must(theGroupBSize > 0);
+}
+
+Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
+ metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
+ queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
+ readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
{
- biQueueOwners.reserve(workerCount);
- for (int i = 0; i < workerCount; ++i) {
- OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
- biQueueOwners.push_back(queueOwner);
- }
}
-Ipc::FewToOneBiQueue::Owner::~Owner()
+Ipc::FewToFewBiQueue::Owner::~Owner()
{
- for (size_t i = 0; i < biQueueOwners.size(); ++i)
- delete biQueueOwners[i];
+ delete metadataOwner;
+ delete queuesOwner;
delete readersOwner;
}
#include "ipc/mem/Pointer.h"
#include "util.h"
-#include <memory>
-
class String;
namespace Ipc {
* different value). We can add support for blocked writers if needed.
*/
class OneToOneUniQueue {
-private:
- struct Shared {
- Shared(const unsigned int aMaxItemSize, const int aCapacity);
- size_t sharedMemorySize() const;
- static size_t SharedMemorySize(const unsigned int maxItemSize, const int capacity);
-
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
-
- AtomicWord theSize; ///< number of items in the queue
- const unsigned int theMaxItemSize; ///< maximum item size
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
-
- char theBuffer[];
- };
-
public:
// pop() and push() exceptions; TODO: use TextException instead
class Full {};
class ItemTooLarge {};
- typedef Mem::Owner<Shared> Owner;
-
- /// initialize shared memory
- static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+ OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity);
- OneToOneUniQueue(const String &id);
+ unsigned int maxItemSize() const { return theMaxItemSize; }
+ int size() const { return theSize; }
+ int capacity() const { return theCapacity; }
+ int sharedMemorySize() const { return Items2Bytes(theMaxItemSize, theCapacity); }
- unsigned int maxItemSize() const { return shared->theMaxItemSize; }
- int size() const { return shared->theSize; }
- int capacity() const { return shared->theCapacity; }
-
- bool empty() const { return !shared->theSize; }
- bool full() const { return shared->theSize == shared->theCapacity; }
+ bool empty() const { return !theSize; }
+ bool full() const { return theSize == theCapacity; }
static int Bytes2Items(const unsigned int maxItemSize, int size);
static int Items2Bytes(const unsigned int maxItemSize, const int size);
/// returns true iff the value was set; [un]blocks the reader as needed
- template<class Value> bool pop(Value &value);
+ template<class Value> bool pop(Value &value, QueueReader *const reader = NULL);
/// returns true iff the caller must notify the reader of the pushed item
- template<class Value> bool push(const Value &value);
-
- QueueReader &reader();
- void reader(QueueReader *aReader);
+ template<class Value> bool push(const Value &value, QueueReader *const reader = NULL);
private:
- Mem::Pointer<Shared> shared; ///< pointer to shared memory
- QueueReader *reader_; ///< the state of the code popping from this queue
-};
-
-/// Lockless fixed-capacity bidirectional queue for two processes.
-class OneToOneBiQueue {
-private:
- typedef std::auto_ptr<OneToOneUniQueue::Owner> UniQueueOwner;
+ unsigned int theIn; ///< input index, used only in push()
+ unsigned int theOut; ///< output index, used only in pop()
-public:
- typedef OneToOneUniQueue::Full Full;
- typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+ AtomicWord theSize; ///< number of items in the queue
+ const unsigned int theMaxItemSize; ///< maximum item size
+ const int theCapacity; ///< maximum number of items, i.e. theBuffer size
- typedef std::pair<UniQueueOwner, UniQueueOwner> Owner;
+ char theBuffer[];
+};
- /// initialize shared memory
- static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+/// shared array of OneToOneUniQueues
+class OneToOneUniQueues {
+public:
+ OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity);
- enum Side { Side1 = 1, Side2 = 2 };
- OneToOneBiQueue(const String &id, const Side side);
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity);
- void readers(QueueReader *r1, QueueReader *r2);
- void clearReaderSignal();
+ const OneToOneUniQueue &operator [](const int index) const;
+ inline OneToOneUniQueue &operator [](const int index);
- /* wrappers to call the right OneToOneUniQueue method for this process */
- template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
- template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
+private:
+ inline const OneToOneUniQueue &front() const;
-//private:
- std::auto_ptr<OneToOneUniQueue> popQueue; ///< queue to pop from for this process
- std::auto_ptr<OneToOneUniQueue> pushQueue; ///< queue to push to for this process
+public:
+ const int theCapacity; /// number of OneToOneUniQueues
};
/**
* Lockless fixed-capacity bidirectional queue for a limited number
- * pricesses. Implements a star topology: Many worker processes
- * communicate with the one central process. The central process uses
- * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
- * created with the Attach() method. Each worker has a unique integer
- * ID in [1, workerCount] range.
+ * processes. Allows communication between two groups of processes:
+ * any process in one group may send data to and receive from any
+ * process in another group, but processes in the same group can not
+ * communicate. Process in each group has a unique integer ID in
+ * [groupIdOffset, groupIdOffset + groupSize) range.
*/
-class FewToOneBiQueue {
+class FewToFewBiQueue {
public:
- typedef OneToOneBiQueue::Full Full;
- typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
+ typedef OneToOneUniQueue::Full Full;
+ typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
+private:
+ /// Shared metadata for FewToFewBiQueue
+ struct Metadata {
+ Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset);
+ size_t sharedMemorySize() const { return sizeof(*this); }
+ static size_t SharedMemorySize(const int, const int, const int, const int) { return sizeof(Metadata); }
+
+ const int theGroupASize;
+ const int theGroupAIdOffset;
+ const int theGroupBSize;
+ const int theGroupBIdOffset;
+ };
+public:
class Owner {
public:
- Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+ Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
~Owner();
private:
- Vector<OneToOneBiQueue::Owner *> biQueueOwners;
+ Mem::Owner<Metadata> *const metadataOwner;
+ Mem::Owner<OneToOneUniQueues> *const queuesOwner;
Mem::Owner<QueueReaders> *const readersOwner;
};
- static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+ static Owner *Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity);
- FewToOneBiQueue(const String &id);
- static OneToOneBiQueue *Attach(const String &id, const int workerId);
- ~FewToOneBiQueue();
+ enum Group { groupA = 0, groupB = 1 };
+ FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId);
- bool validWorkerId(const int workerId) const;
- int workerCount() const { return readers->theCapacity - 1; }
+ Group localGroup() const { return theLocalGroup; }
+ Group remoteGroup() const { return theLocalGroup == groupA ? groupB : groupA; }
- /// clears the reader notification received by the disker from worker
- void clearReaderSignal(int workerId);
+ /// clears the reader notification received by the local process from the remote process
+ void clearReaderSignal(const int remoteProcessId);
- /// picks a worker and calls OneToOneUniQueue::pop() using its queue
- template <class Value> bool pop(int &workerId, Value &value);
+ /// picks a process and calls OneToOneUniQueue::pop() using its queue
+ template <class Value> bool pop(int &remoteProcessId, Value &value);
- /// calls OneToOneUniQueue::push() using the given worker queue
- template <class Value> bool push(const int workerId, const Value &value);
+ /// calls OneToOneUniQueue::push() using the given process queue
+ template <class Value> bool push(const int remoteProcessId, const Value &value);
-//private: XXX: make private by moving pop/push debugging into pop/push
- int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
- Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
+private:
+ bool validProcessId(const Group group, const int processId) const;
+ OneToOneUniQueue &oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId);
+ QueueReader &reader(const Group group, const int processId);
+ int remoteGroupSize() const { return theLocalGroup == groupA ? metadata->theGroupBSize : metadata->theGroupASize; }
+ int remoteGroupIdOffset() const { return theLocalGroup == groupA ? metadata->theGroupBIdOffset : metadata->theGroupAIdOffset; }
+private:
+ const Mem::Pointer<Metadata> metadata; ///< shared metadata
+ const Mem::Pointer<OneToOneUniQueues> queues; ///< unidirection one-to-one queues
const Mem::Pointer<QueueReaders> readers; ///< readers array
- QueueReader *const reader; ///< the state of the code popping from all biQueues
- enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
+ const Group theLocalGroup; ///< group of this queue
+ const int theLocalProcessId; ///< process ID of this queue
+ int theLastPopProcessId; ///< the ID of the last process we tried to pop() from
};
template <class Value>
bool
-OneToOneUniQueue::pop(Value &value)
+OneToOneUniQueue::pop(Value &value, QueueReader *const reader)
{
- if (sizeof(value) > shared->theMaxItemSize)
+ if (sizeof(value) > theMaxItemSize)
throw ItemTooLarge();
// A writer might push between the empty test and block() below, so we do
// not return false right after calling block(), but test again.
if (empty()) {
- reader().block();
+ if (!reader)
+ return false;
+
+ reader->block();
// A writer might push between the empty test and block() below,
// so we must test again as such a writer will not signal us.
if (empty())
return false;
}
- reader().unblock();
- const unsigned int pos =
- (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
- memcpy(&value, shared->theBuffer + pos, sizeof(value));
- --shared->theSize;
+ if (reader)
+ reader->unblock();
+
+ const unsigned int pos = (theOut++ % theCapacity) * theMaxItemSize;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ --theSize;
return true;
}
template <class Value>
bool
-OneToOneUniQueue::push(const Value &value)
+OneToOneUniQueue::push(const Value &value, QueueReader *const reader)
{
- if (sizeof(value) > shared->theMaxItemSize)
+ if (sizeof(value) > theMaxItemSize)
throw ItemTooLarge();
if (full())
throw Full();
const bool wasEmpty = empty();
- const unsigned int pos =
- shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
- memcpy(shared->theBuffer + pos, &value, sizeof(value));
- ++shared->theSize;
+ const unsigned int pos = theIn++ % theCapacity * theMaxItemSize;
+ memcpy(theBuffer + pos, &value, sizeof(value));
+ ++theSize;
+
+ return wasEmpty && (!reader || reader->raiseSignal());
+}
+
- return wasEmpty && reader().raiseSignal();
+// OneToOneUniQueues
+
+inline OneToOneUniQueue &
+OneToOneUniQueues::operator [](const int index)
+{
+ return const_cast<OneToOneUniQueue &>((*const_cast<const OneToOneUniQueues *>(this))[index]);
+}
+
+inline const OneToOneUniQueue &
+OneToOneUniQueues::front() const
+{
+ const char *const queue =
+ reinterpret_cast<const char *>(this) + sizeof(*this);
+ return *reinterpret_cast<const OneToOneUniQueue *>(queue);
}
-// FewToOneBiQueue
+// FewToFewBiQueue
template <class Value>
bool
-FewToOneBiQueue::pop(int &workerId, Value &value)
+FewToFewBiQueue::pop(int &remoteProcessId, Value &value)
{
- // iterate all workers, starting after the one we visited last
- for (int i = 0; i < workerCount(); ++i) {
- theLastPopWorker = (theLastPopWorker + 1) % workerCount();
- if (biQueues[theLastPopWorker]->pop(value)) {
- workerId = theLastPopWorker + WorkerIdOffset;
+ // iterate all remote group processes, starting after the one we visited last
+ QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
+ for (int i = 0; i < remoteGroupSize(); ++i) {
+ if (++theLastPopProcessId >= remoteGroupIdOffset() + remoteGroupSize())
+ theLastPopProcessId = remoteGroupIdOffset();
+ OneToOneUniQueue &queue = oneToOneQueue(remoteGroup(), theLastPopProcessId, theLocalGroup, theLocalProcessId);
+ if (queue.pop(value, &localReader)) {
+ remoteProcessId = theLastPopProcessId;
+ debugs(54, 7, HERE << "popped from " << remoteProcessId << " to " << theLocalProcessId << " at " << queue.size());
return true;
}
}
- return false; // no worker had anything to pop
+ return false; // no process had anything to pop
}
template <class Value>
bool
-FewToOneBiQueue::push(const int workerId, const Value &value)
+FewToFewBiQueue::push(const int remoteProcessId, const Value &value)
{
- assert(validWorkerId(workerId));
- return biQueues[workerId - WorkerIdOffset]->push(value);
+ OneToOneUniQueue &remoteQueue = oneToOneQueue(theLocalGroup, theLocalProcessId, remoteGroup(), remoteProcessId);
+ QueueReader &remoteReader = reader(remoteGroup(), remoteProcessId);
+ debugs(54, 7, HERE << "pushing from " << theLocalProcessId << " to " << remoteProcessId << " at " << remoteQueue.size());
+ return remoteQueue.push(value, &remoteReader);
}
} // namespace Ipc
static Owner *New(const char *const id, const P1 &p1, const P2 &p2);
template <class P1, class P2, class P3>
static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3);
+ template <class P1, class P2, class P3, class P4>
+ static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4);
~Owner();
typedef RefCount< Object<Class> > Base;
public:
- explicit Pointer(Object<Class> *const anObject): Base(anObject) {}
+ explicit Pointer(Object<Class> *const anObject = NULL): Base(anObject) {}
Class *operator ->() const { return Base::operator ->()->theObject; }
Class &operator *() const { return *Base::operator *().theObject; }
return owner;
}
+template <class Class> template <class P1, class P2, class P3, class P4>
+Owner<Class> *
+Owner<Class>::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3, const P4 &p4)
+{
+ const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3, p4);
+ Owner *const owner = new Owner(id, sharedSize);
+ owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3, p4);
+ return owner;
+}
+
// Object implementation
template <class Class>