CBDATA_CLASS_INIT(IpcIoFile);
+IpcIoFile::DiskerQueue::Owner *IpcIoFile::diskerQueueOwner = NULL;
IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
{
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
- Must(!diskerQueue && !workerQueue);
+ Must(!diskerQueueOwner && !diskerQueue && !workerQueue);
if (IamDiskProcess()) {
error_ = !DiskerOpen(dbName, flags, mode);
return;
// XXX: make capacity configurable
- diskerQueue =
- new DiskerQueue(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
+ diskerQueueOwner =
+ DiskerQueue::Init(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
+ diskerQueue = new DiskerQueue(dbName);
diskId = KidIdentifier;
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
{
assert(ioRequestor != NULL);
+ delete diskerQueueOwner;
+ diskerQueueOwner = NULL;
delete diskerQueue;
+ diskerQueue = NULL;
delete workerQueue;
+ workerQueue = NULL;
if (IamDiskProcess())
DiskerClose(dbName);
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
+ static DiskerQueue::Owner *diskerQueueOwner; ///< IPC queue owner for disker
static DiskerQueue *diskerQueue; ///< IPC queue for disker
WorkerQueue *workerQueue; ///< IPC queue for worker
RefCount<IORequestor> ioRequestor;
Server.h \
structs.h \
swap_log_op.h \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
- SwapDir.h MemStore.h MemStoreMap.h \
+ SwapDir.cc MemStore.cc \
+ SwapDir.h MemStore.h \
time.cc \
tools.cc \
tunnel.cc \
StoreSwapLogData.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
+ SwapDir.cc MemStore.cc \
url.cc \
URLScheme.cc \
urn.cc \
time.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
+ SwapDir.cc MemStore.cc \
url.cc \
URLScheme.cc \
urn.cc \
time.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
+ SwapDir.cc MemStore.cc \
url.cc \
URLScheme.cc \
urn.cc \
event.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
+ SwapDir.cc MemStore.cc \
url.cc \
URLScheme.cc \
urn.cc \
event.cc \
tools.cc \
tunnel.cc \
- SwapDir.cc MemStore.cc MemStoreMap.cc \
+ SwapDir.cc MemStore.cc \
urn.cc \
wccp2.cc \
whois.cc \
// XXX: support storage using more than one page per entry
-void
-MemStore::Init()
-{
- const int64_t entryLimit = EntryLimit();
- if (entryLimit <= 0)
- return; // no memory cache configured or a misconfiguration
-
- MemStoreMap *map = new MemStoreMap(ShmLabel, entryLimit);
- delete map; // we just wanted to initialize shared memory segments
-}
-
MemStore::MemStore(): map(NULL), theCurrentSize(0)
{
}
{
public:
/* RegisteredRunner API */
+ MemStoreRr(): owner(NULL) {}
virtual void run(const RunnerRegistry &);
virtual ~MemStoreRr();
+
+private:
+ MemStoreMap::Owner *owner;
};
RunnerRegistrationEntry(rrAfterConfig, MemStoreRr);
if (!UsingSmp())
return;
- if (IamMasterProcess())
- MemStore::Init();
+ if (IamMasterProcess()) {
+ Must(!owner);
+ const int64_t entryLimit = MemStore::EntryLimit();
+ if (entryLimit <= 0)
+ return; // no memory cache configured or a misconfiguration
+ owner = MemStoreMap::Init(ShmLabel, entryLimit);
+ }
}
MemStoreRr::~MemStoreRr()
{
- if (!UsingSmp())
- return;
-
- if (IamMasterProcess())
- MemStoreMap::Unlink(ShmLabel);
+ delete owner;
}
#ifndef SQUID_MEMSTORE_H
#define SQUID_MEMSTORE_H
+#include "ipc/mem/Page.h"
+#include "ipc/StoreMap.h"
#include "Store.h"
-#include "MemStoreMap.h"
+
+// StoreEntry restoration info not already stored by Ipc::StoreMap
+struct MemStoreMapExtras {
+ Ipc::Mem::PageId page; ///< shared memory page with the entry content
+ int64_t storedSize; ///< total size of the stored entry content
+};
+typedef Ipc::StoreMapWithExtras<MemStoreMapExtras> MemStoreMap;
/// Stores HTTP entities in RAM. Current implementation uses shared memory.
/// Unlike a disk store (SwapDir), operations are synchronous (and fast).
virtual void maintain();
virtual void updateSize(int64_t size, int sign);
- /// initializes shared memory segments before they are used by workers
- static void Init();
+ static int64_t EntryLimit();
protected:
bool willFit(int64_t needed);
// Ipc::StoreMapCleaner API
virtual void cleanReadable(const sfileno fileno);
- static int64_t EntryLimit();
-
private:
MemStoreMap *map; ///< index of mem-cached entries
uint64_t theCurrentSize; ///< currently used space in the storage area
+++ /dev/null
-/*
- * $Id$
- *
- * DEBUG: section 20 Memory Cache
- */
-
-#include "config.h"
-
-#include "Store.h"
-#include "MemStoreMap.h"
-
-MemStoreMap::MemStoreMap(const char *const aPath, const int limit):
- Ipc::StoreMap(aPath, limit, Shared::MemSize(limit))
-{
- assert(shm.mem());
- shared = new (shm.reserve(Shared::MemSize(limit))) Shared;
-}
-
-MemStoreMap::MemStoreMap(const char *const aPath):
- Ipc::StoreMap(aPath)
-{
- const int limit = entryLimit();
- assert(shm.mem());
- shared = reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(limit)));
-}
-
-MemStoreMap::Extras &
-MemStoreMap::extras(const sfileno fileno)
-{
- assert(0 <= fileno && fileno < entryLimit());
- assert(shared);
- return shared->extras[fileno];
-}
-
-const MemStoreMap::Extras &
-MemStoreMap::extras(const sfileno fileno) const
-{
- assert(0 <= fileno && fileno < entryLimit());
- assert(shared);
- return shared->extras[fileno];
-}
-
-size_t
-MemStoreMap::Shared::MemSize(int limit)
-{
- return sizeof(Shared) + limit*sizeof(Extras);
-}
+++ /dev/null
-#ifndef SQUID_MEMSTOREMAP_H
-#define SQUID_MEMSTOREMAP_H
-
-#include "ipc/StoreMap.h"
-#include "ipc/mem/Page.h"
-
-/// map of MemStore-cached entries
-class MemStoreMap: public Ipc::StoreMap
-{
-public:
- // StoreEntry restoration info not already stored by Ipc::StoreMap
- class Extras {
- public:
- Ipc::Mem::PageId page; ///< shared memory page with the entry content
- int64_t storedSize; ///< total size of the stored entry content
- };
-
-public:
- MemStoreMap(const char *const aPath, const int limit); ///< create a new shared StoreMap
- MemStoreMap(const char *const aPath); ///< open an existing shared StoreMap
-
- /// write access to the extras; call openForWriting() first!
- Extras &extras(const sfileno fileno);
- /// read-only access to the extras; call openForReading() first!
- const Extras &extras(const sfileno fileno) const;
-
-private:
- /// data shared by all MemStoreMaps with the same path
- class Shared {
- public:
- static size_t MemSize(int limit);
- Extras extras[]; ///< extras storage
- };
-
- Shared *shared; ///< pointer to shared memory
-};
-
-#endif /* SQUID_MEMSTOREMAP_H */
librock_la_SOURCES = \
rock/RockCommon.cc \
rock/RockCommon.h \
- rock/RockDirMap.cc \
- rock/RockDirMap.h \
rock/RockFile.cc \
rock/RockFile.h \
rock/RockIoState.cc \
+++ /dev/null
-/*
- * $Id$
- *
- * DEBUG: section 79 Disk IO Routines
- */
-
-#include "squid.h"
-
-#include "Store.h"
-#include "fs/rock/RockDirMap.h"
-
-Rock::DirMap::DirMap(const char *const aPath, const int limit):
- Ipc::StoreMap(aPath, limit, Shared::MemSize(limit))
-{
- assert(shm.mem());
- shared = new (shm.reserve(Shared::MemSize(limit))) Shared;
-}
-
-Rock::DirMap::DirMap(const char *const aPath):
- Ipc::StoreMap(aPath)
-{
- const int limit = entryLimit();
- assert(shm.mem());
- shared = reinterpret_cast<Shared *>(shm.reserve(Shared::MemSize(limit)));
-}
-
-Rock::DbCellHeader &
-Rock::DirMap::header(const sfileno fileno)
-{
- assert(0 <= fileno && fileno < entryLimit());
- assert(shared);
- return shared->headers[fileno];
-}
-
-const Rock::DbCellHeader &
-Rock::DirMap::header(const sfileno fileno) const
-{
- assert(0 <= fileno && fileno < entryLimit());
- assert(shared);
- return shared->headers[fileno];
-}
-
-int
-Rock::DirMap::AbsoluteEntryLimit()
-{
- const int sfilenoMax = 0xFFFFFF; // Core sfileno maximum
- return sfilenoMax;
-}
-
-size_t
-Rock::DirMap::Shared::MemSize(int limit)
-{
- return sizeof(Shared) + limit*sizeof(DbCellHeader);
-}
+++ /dev/null
-#ifndef SQUID_FS_ROCK_DIR_MAP_H
-#define SQUID_FS_ROCK_DIR_MAP_H
-
-#include "fs/rock/RockFile.h"
-#include "ipc/StoreMap.h"
-
-namespace Rock {
-
-/// \ingroup Rock
-/// map of used db slots indexed by sfileno
-class DirMap: public Ipc::StoreMap
-{
-public:
- DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap
- DirMap(const char *const aPath); ///< open an existing shared DirMap
-
- /// write access to the cell header; call openForWriting() first!
- DbCellHeader &header(const sfileno fileno);
- /// read-only access to the cell header; call openForReading() first!
- const DbCellHeader &header(const sfileno fileno) const;
-
- static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible
-
-private:
- /// data shared by all DirMaps with the same path
- class Shared {
- public:
- static size_t MemSize(int limit);
- DbCellHeader headers[]; ///< DbCellHeaders storage
- };
-
- Shared *shared; ///< pointer to shared memory
-};
-
-} // namespace Rock
-
-// We do not reuse struct _fileMap because we cannot control its size,
-// resulting in sfilenos that are pointing beyond the database.
-
-/*
- * Rock::DirMap does not implement Ipc::StoreMapCleaner API because we want
- * to avoid extra I/O necessary to mark the disk slot empty. We may create some
- * stale responses if Squid quits, but should save a lot of I/O in the common
- * cases. TODO: Consider cleaning on-disk slots on exit; always scheduling
- * but cancelling/merging cleanup I/O; scheduling cleanup I/O after a
- * configurable delay; etc.
- */
-
-#endif /* SQUID_FS_ROCK_DIR_MAP_H */
// must be divisible by 1024 due to cur_size and max_size KB madness
const int64_t Rock::SwapDir::HeaderSize = 16*1024;
-Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL)
+Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL),
+ mapOwner(NULL), map(NULL)
{
}
Rock::SwapDir::~SwapDir()
{
delete io;
+ delete mapOwner;
delete map;
safe_free(filePath);
}
const int64_t eLimitLo = 0; // dynamic shrinking unsupported
const int64_t eWanted = (maximumSize() - HeaderSize)/max_objsize;
const int64_t eAllowed = min(max(eLimitLo, eWanted), eLimitHi);
- map = new DirMap(path, eAllowed);
+ Must(!mapOwner);
+ mapOwner = DirMap::Init(path, eAllowed);
+ map = new DirMap(path);
}
const char *ioModule = UsingSmp() ? "IpcIo" : "Blocking";
assert(diskOffsetLimit() <= maximumSize());
// warn if maximum db size is not reachable due to sfileno limit
- if (map->entryLimit() == map->AbsoluteEntryLimit() &&
- totalWaste > roundingWasteMx) {
+ if (map->entryLimit() == eLimitHi && totalWaste > roundingWasteMx) {
debugs(47, 0, "Rock store cache_dir[" << index << "]:");
debugs(47, 0, "\tmaximum number of entries: " << map->entryLimit());
debugs(47, 0, "\tmaximum entry size: " << max_objsize << " bytes");
if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast<const cache_key *>(from.key), newLocation)) {
if (fileno == newLocation) {
slot->set(from);
- map->header(fileno) = header;
+ map->extras(fileno) = header;
// core will not updateSize: we do not add the entry to store_table
updateSize(from.swap_file_sz, +1);
} // else some other, newer entry got into our cell
}
e.swap_file_sz = header.payloadSize; // and will be copied to the map
slot->set(e);
- map->header(fileno) = header;
+ map->extras(fileno) = header;
// XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno.
// If that does not happen, the entry will not decrement the read level!
sio->swap_dirn = index;
sio->swap_filen = e.swap_filen;
- sio->payloadEnd = sizeof(DbCellHeader) + map->header(e.swap_filen).payloadSize;
+ sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize;
assert(sio->payloadEnd <= max_objsize); // the payload fits the slot
debugs(47,5, HERE << "dir " << index << " has old fileno: " <<
#include "SwapDir.h"
#include "DiskIO/IORequestor.h"
-#include "fs/rock/RockDirMap.h"
+#include "fs/rock/RockFile.h"
+#include "ipc/StoreMap.h"
class DiskIOStrategy;
class DiskFile;
const char *filePath; ///< location of cache storage file inside path/
private:
+ typedef Ipc::StoreMapWithExtras<DbCellHeader> DirMap;
+
DiskIOStrategy *io;
RefCount<DiskFile> theFile; ///< cache storage for this cache_dir
+ DirMap::Owner *mapOwner;
DirMap *map;
static const int64_t HeaderSize; ///< on-disk db header size
mem/Pages.h \
mem/PageStack.cc \
mem/PageStack.h \
+ mem/Pointer.h \
mem/Segment.cc \
mem/Segment.h
debugs(54, 7, HERE << "constructed " << id);
}
+/* QueueReaders */
+
+QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
+{
+ Must(theCapacity > 0);
+ new (theReaders) QueueReader[theCapacity];
+}
+
+size_t
+QueueReaders::sharedMemorySize() const
+{
+ return SharedMemorySize(theCapacity);
+}
+
+size_t
+QueueReaders::SharedMemorySize(const int capacity)
+{
+ return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
+}
+
// OneToOneUniQueue
-OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
- shm(id.termedBuf()), reader_(NULL)
+OneToOneUniQueue::Owner *
+OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
{
- const int sharedSize = Items2Bytes(maxItemSize, capacity);
- shm.create(sharedSize);
- shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
+ Must(maxItemSize > 0);
+ Must(capacity > 0);
+ return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity);
}
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
- reader_(NULL)
+OneToOneUniQueue::OneToOneUniQueue(const String &id):
+ shared(shm_old(Shared)(id.termedBuf())), reader_(NULL)
{
- shm.open();
- shared = reinterpret_cast<Shared *>(shm.mem());
- assert(shared);
- const int sharedSize =
- Items2Bytes(shared->theMaxItemSize, shared->theCapacity);
- assert(shared == reinterpret_cast<Shared *>(shm.reserve(sharedSize)));
}
void
return sizeof(Shared) + maxItemSize * size;
}
+QueueReader &
+OneToOneUniQueue::reader()
+{
+ Must(reader_);
+ return *reader_;
+}
+
OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
theCapacity(aCapacity)
{
}
-QueueReader &
-OneToOneUniQueue::reader()
+size_t
+OneToOneUniQueue::Shared::sharedMemorySize() const
{
- Must(reader_);
- return *reader_;
+ return SharedMemorySize(theMaxItemSize, theCapacity);
+}
+
+size_t
+OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity)
+{
+ return Items2Bytes(maxItemSize, capacity);
}
// OneToOneBiQueue
-OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
- popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
- pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
+OneToOneBiQueue::Owner *
+OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity)
{
+ UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity));
+ UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity));
+ Owner *const owner = new Owner;
+ owner->first = owner1;
+ owner->second = owner2;
+ return owner;
}
-OneToOneBiQueue::OneToOneBiQueue(const String &id):
- popQueue(new OneToOneUniQueue(QueueId(id, 2))),
- pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
+OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side)
{
+ OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1));
+ OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2));
+ switch (side) {
+ case Side1:
+ popQueue.reset(queue1);
+ pushQueue.reset(queue2);
+ break;
+ case Side2:
+ popQueue.reset(queue2);
+ pushQueue.reset(queue1);
+ break;
+ default:
+ Must(false);
+ }
}
void
// FewToOneBiQueue
-FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
- theLastPopWorker(0), theWorkerCount(aWorkerCount),
- shm(ReaderId(id).termedBuf()),
- reader(NULL)
+FewToOneBiQueue::Owner *
+FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity)
+{
+ return new Owner(id, workerCount, maxItemSize, capacity);
+}
+
+FewToOneBiQueue::FewToOneBiQueue(const String &id):
+ theLastPopWorker(0),
+ readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())),
+ reader(readers->theReaders)
{
- // create a new segment for the local and remote queue readers
- // TODO: all our queues and readers should use a single segment
- shm.create((theWorkerCount+1)*sizeof(QueueReader));
- reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
+ Must(readers->theCapacity > 1);
+
debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
- assert(theWorkerCount >= 0);
- biQueues.reserve(theWorkerCount);
- for (int i = 0; i < theWorkerCount; ++i) {
- OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
- QueueReader *remoteReader =
- new (shm.reserve(sizeof(QueueReader))) QueueReader;
+ biQueues.reserve(workerCount());
+ for (int i = 0; i < workerCount(); ++i) {
+ OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1);
+ QueueReader *const remoteReader = readers->theReaders + i + 1;
biQueue->readers(reader, remoteReader);
biQueues.push_back(biQueue);
}
OneToOneBiQueue *
FewToOneBiQueue::Attach(const String &id, const int workerId)
{
- // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
- Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
-
- Ipc::Mem::Segment &shm = *shmPtr;
- shm.open();
- assert(shm.size() >= static_cast<off_t>((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader)));
- QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
- QueueReader *remoteReader = &readers[0];
+ Ipc::Mem::Pointer<QueueReaders> readers = shm_old(QueueReaders)(ReaderId(id).termedBuf());
+ Must(workerId >= WorkerIdOffset);
+ Must(workerId < readers->theCapacity - 1 + WorkerIdOffset);
+ QueueReader *const remoteReader = readers->theReaders;
debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
- QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset];
+ QueueReader *const localReader =
+ readers->theReaders + workerId - WorkerIdOffset + 1;
debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
OneToOneBiQueue *const biQueue =
- new OneToOneBiQueue(QueueId(id, workerId));
+ new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2);
biQueue->readers(localReader, remoteReader);
+
+ // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
+ const Ipc::Mem::Pointer<QueueReaders> *const leakingReaders = new Ipc::Mem::Pointer<QueueReaders>(readers);
+ Must(leakingReaders); // silence unused variable warning
+
return biQueue;
}
FewToOneBiQueue::~FewToOneBiQueue()
{
- for (int i = 0; i < theWorkerCount; ++i)
+ for (int i = 0; i < workerCount(); ++i)
delete biQueues[i];
}
bool FewToOneBiQueue::validWorkerId(const int workerId) const
{
return WorkerIdOffset <= workerId &&
- workerId < WorkerIdOffset + theWorkerCount;
+ workerId < WorkerIdOffset + workerCount();
}
void
// we got a hint; we could reposition iteration to try popping from the
// workerId queue first; but it does not seem to help much and might
// introduce some bias so we do not do that for now:
- // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
+ // theLastPopWorker = (workerId + workerCount() - 1) % workerCount();
+}
+
+FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity):
+ readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1))
+{
+ biQueueOwners.reserve(workerCount);
+ for (int i = 0; i < workerCount; ++i) {
+ OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity);
+ biQueueOwners.push_back(queueOwner);
+ }
+}
+
+FewToOneBiQueue::Owner::~Owner()
+{
+ for (size_t i = 0; i < biQueueOwners.size(); ++i)
+ delete biQueueOwners[i];
+ delete readersOwner;
}
#include "Array.h"
#include "base/InstanceId.h"
#include "ipc/AtomicWord.h"
-#include "ipc/mem/Segment.h"
+#include "ipc/mem/Pointer.h"
#include "util.h"
+#include <memory>
+
class String;
/// State of the reading end of a queue (i.e., of the code calling pop()).
const InstanceId<QueueReader> id;
};
+/// shared array of QueueReaders
+class QueueReaders {
+public:
+ QueueReaders(const int aCapacity);
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const int capacity);
+
+ const int theCapacity; /// number of readers
+ QueueReader theReaders[]; /// readers
+};
/**
* Lockless fixed-capacity queue for a single writer and a single reader.
* different value). We can add support for blocked writers if needed.
*/
class OneToOneUniQueue {
+private:
+ struct Shared {
+ Shared(const unsigned int aMaxItemSize, const int aCapacity);
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const unsigned int maxItemSize, const int capacity);
+
+ unsigned int theIn; ///< input index, used only in push()
+ unsigned int theOut; ///< output index, used only in pop()
+
+ AtomicWord theSize; ///< number of items in the queue
+ const unsigned int theMaxItemSize; ///< maximum item size
+ const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+
+ char theBuffer[];
+ };
+
public:
// pop() and push() exceptions; TODO: use TextException instead
class Full {};
class ItemTooLarge {};
- OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
+ typedef Ipc::Mem::Owner<Shared> Owner;
+
+ /// initialize shared memory
+ static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+
OneToOneUniQueue(const String &id);
unsigned int maxItemSize() const { return shared->theMaxItemSize; }
void reader(QueueReader *aReader);
private:
- struct Shared {
- Shared(const unsigned int aMaxItemSize, const int aCapacity);
-
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
-
- AtomicWord theSize; ///< number of items in the queue
- const unsigned int theMaxItemSize; ///< maximum item size
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
-
- char theBuffer[];
- };
- Ipc::Mem::Segment shm; ///< shared memory segment
- Shared *shared; ///< pointer to shared memory
+ Ipc::Mem::Pointer<Shared> shared; ///< pointer to shared memory
QueueReader *reader_; ///< the state of the code popping from this queue
};
/// Lockless fixed-capacity bidirectional queue for two processes.
class OneToOneBiQueue {
+private:
+ typedef std::auto_ptr<OneToOneUniQueue::Owner> UniQueueOwner;
+
public:
typedef OneToOneUniQueue::Full Full;
typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
- /// Create a new shared queue.
- OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
- OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
+ typedef std::pair<UniQueueOwner, UniQueueOwner> Owner;
+
+ /// initialize shared memory
+ static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity);
+
+ enum Side { Side1 = 1, Side2 = 2 };
+ OneToOneBiQueue(const String &id, const Side side);
void readers(QueueReader *r1, QueueReader *r2);
void clearReaderSignal();
template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
//private:
- OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
- OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
+ std::auto_ptr<OneToOneUniQueue> popQueue; ///< queue to pop from for this process
+ std::auto_ptr<OneToOneUniQueue> pushQueue; ///< queue to push to for this process
};
/**
typedef OneToOneBiQueue::Full Full;
typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
- FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
+ class Owner {
+ public:
+ Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+ ~Owner();
+
+ private:
+ Vector<OneToOneBiQueue::Owner *> biQueueOwners;
+ Ipc::Mem::Owner<QueueReaders> *const readersOwner;
+ };
+
+ static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity);
+
+ FewToOneBiQueue(const String &id);
static OneToOneBiQueue *Attach(const String &id, const int workerId);
~FewToOneBiQueue();
bool validWorkerId(const int workerId) const;
- int workerCount() const { return theWorkerCount; }
+ int workerCount() const { return readers->theCapacity - 1; }
/// clears the reader notification received by the disker from worker
void clearReaderSignal(int workerId);
//private: XXX: make private by moving pop/push debugging into pop/push
int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
- const int theWorkerCount; ///< the total number of workers
- Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
- QueueReader *reader; ///< the state of the code popping from all biQueues
+ const Ipc::Mem::Pointer<QueueReaders> readers; ///< readers array
+ QueueReader *const reader; ///< the state of the code popping from all biQueues
enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now
};
FewToOneBiQueue::pop(int &workerId, Value &value)
{
// iterate all workers, starting after the one we visited last
- for (int i = 0; i < theWorkerCount; ++i) {
- theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
+ for (int i = 0; i < workerCount(); ++i) {
+ theLastPopWorker = (theLastPopWorker + 1) % workerCount();
if (biQueues[theLastPopWorker]->pop(value)) {
workerId = theLastPopWorker + WorkerIdOffset;
return true;
#include "Store.h"
#include "ipc/StoreMap.h"
-Ipc::StoreMap::StoreMap(const char *const aPath, const int limit,
- size_t sharedSizeExtra):
- cleaner(NULL), path(aPath), shm(aPath), shared(NULL)
+Ipc::StoreMap::Owner *
+Ipc::StoreMap::Init(const char *const path, const int limit, const size_t extrasSize)
{
assert(limit > 0); // we should not be created otherwise
- const size_t sharedSize = Shared::MemSize(limit);
- shm.create(sharedSize + sharedSizeExtra);
- shared = new (shm.reserve(sharedSize)) Shared(limit);
+ assert(extrasSize >= 0);
+ Owner *const owner = shm_new(Shared)(path, limit, extrasSize);
debugs(54, 5, HERE << "new map [" << path << "] created: " << limit);
+ return owner;
}
-Ipc::StoreMap::StoreMap(const char *const aPath):
- cleaner(NULL), path(aPath), shm(aPath), shared(NULL)
+Ipc::StoreMap::Owner *
+Ipc::StoreMap::Init(const char *const path, const int limit)
{
- shm.open();
- assert(shm.mem());
- shared = reinterpret_cast<Shared *>(shm.mem());
- assert(shared->limit > 0); // we should not be created otherwise
- // now that the limit is checked, check that nobody used our segment chunk
- const size_t sharedSize = Shared::MemSize(shared->limit);
- assert(shared == reinterpret_cast<Shared *>(shm.reserve(sharedSize)));
- debugs(54, 5, HERE << "attached map [" << path << "] created: " << shared->limit);
+ return Init(path, limit, 0);
}
-void
-Ipc::StoreMap::Unlink(const char *const path)
+Ipc::StoreMap::StoreMap(const char *const aPath): cleaner(NULL), path(aPath),
+ shared(shm_old(Shared)(aPath))
{
- Mem::Segment::Unlink(path);
+ assert(shared->limit > 0); // we should not be created otherwise
+ debugs(54, 5, HERE << "attached map [" << path << "] created: " <<
+ shared->limit);
}
Ipc::StoreMap::Slot *
/* Ipc::StoreMap::Shared */
-Ipc::StoreMap::Shared::Shared(const int aLimit): limit(aLimit), count(0)
+Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize):
+ limit(aLimit), extrasSize(anExtrasSize), count(0)
+{
+}
+
+size_t
+Ipc::StoreMap::Shared::sharedMemorySize() const
{
+ return SharedMemorySize(limit, extrasSize);
}
size_t
-Ipc::StoreMap::Shared::MemSize(int limit)
+Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize)
{
- return sizeof(Shared) + limit * sizeof(Slot);
+ return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize);
}
#define SQUID_IPC_STORE_MAP_H
#include "ipc/ReadWriteLock.h"
-#include "ipc/mem/Segment.h"
+#include "ipc/mem/Pointer.h"
#include "typedefs.h"
namespace Ipc {
public:
typedef StoreMapSlot Slot;
- StoreMap(const char *const aPath, const int limit, size_t sharedSizeExtra); ///< create a new shared StoreMap
- explicit StoreMap(const char *const aPath); ///< open an existing shared StoreMap
- static void Unlink(const char *const path); ///< unlink shared memory segment
+private:
+ struct Shared
+ {
+ Shared(const int aLimit, const size_t anExtrasSize);
+ size_t sharedMemorySize() const;
+ static size_t SharedMemorySize(const int limit, const size_t anExtrasSize);
+
+ const int limit; ///< maximum number of map slots
+ const size_t extrasSize; ///< size of slot extra data
+ AtomicWord count; ///< current number of map slots
+ Slot slots[]; ///< slots storage
+ };
+
+public:
+ typedef Mem::Owner<Shared> Owner;
+
+ /// initialize shared memory
+ static Owner *Init(const char *const path, const int limit);
+
+ StoreMap(const char *const aPath);
/// finds, reservers space for writing a new entry or returns nil
Slot *openForWriting(const cache_key *const key, sfileno &fileno);
void abortIo(const sfileno fileno);
bool full() const; ///< there are no empty slots left
- bool valid(int n) const; ///< whether n is a valid slot coordinate
+ bool valid(const int n) const; ///< whether n is a valid slot coordinate
int entryCount() const; ///< number of used slots
int entryLimit() const; ///< maximum number of slots that can be used
StoreMapCleaner *cleaner; ///< notified before a readable entry is freed
protected:
- class Shared {
- public:
- static size_t MemSize(int limit);
+ static Owner *Init(const char *const path, const int limit, const size_t extrasSize);
- Shared(const int aLimit);
-
- const AtomicWord limit; ///< maximum number of map slots
- AtomicWord count; ///< current number of map slots
-
- Slot slots[]; ///< slots storage
- };
-
-protected:
const String path; ///< cache_dir path, used for logging
- Ipc::Mem::Segment shm; ///< shared memory segment
+ Mem::Pointer<Shared> shared;
private:
int slotIndexByKey(const cache_key *const key) const;
void abortWriting(const sfileno fileno);
void freeIfNeeded(Slot &s);
void freeLocked(Slot &s, bool keepLocked);
- String sharedMemoryName();
+};
- Shared *shared; ///< pointer to shared memory
+/// StoreMap with extra slot data
+/// Note: ExtrasT must be POD, it is initialized with zeroes, no
+/// constructors or destructors are called
+template <class ExtrasT>
+class StoreMapWithExtras: public StoreMap
+{
+public:
+ typedef ExtrasT Extras;
+
+ /// initialize shared memory
+ static Owner *Init(const char *const path, const int limit);
+
+ StoreMapWithExtras(const char *const path);
+
+ /// write access to the extras; call openForWriting() first!
+ ExtrasT &extras(const sfileno fileno);
+ /// read-only access to the extras; call openForReading() first!
+ const ExtrasT &extras(const sfileno fileno) const;
+
+protected:
+
+ ExtrasT *sharedExtras; ///< pointer to extras in shared memory
};
/// API for adjusting external state when dirty map slot is being freed
virtual void cleanReadable(const sfileno fileno) = 0;
};
+// StoreMapWithExtras implementation
+
+template <class ExtrasT>
+StoreMap::Owner *
+StoreMapWithExtras<ExtrasT>::Init(const char *const path, const int limit)
+{
+ return StoreMap::Init(path, limit, sizeof(Extras));
+}
+
+template <class ExtrasT>
+StoreMapWithExtras<ExtrasT>::StoreMapWithExtras(const char *const path):
+ StoreMap(path)
+{
+ const size_t sharedSizeWithoutExtras =
+ Shared::SharedMemorySize(entryLimit(), 0);
+ sharedExtras = reinterpret_cast<Extras *>(reinterpret_cast<char *>(shared.getRaw()) + sharedSizeWithoutExtras);
+}
+
+template <class ExtrasT>
+ExtrasT &
+StoreMapWithExtras<ExtrasT>::extras(const sfileno fileno)
+{
+ return const_cast<ExtrasT &>(const_cast<const StoreMapWithExtras *>(this)->extras(fileno));
+}
+
+template <class ExtrasT>
+const ExtrasT &
+StoreMapWithExtras<ExtrasT>::extras(const sfileno fileno) const
+{
+ assert(sharedExtras);
+ assert(valid(fileno));
+ return sharedExtras[fileno];
+}
+
} // namespace Ipc
#include "ipc/mem/PagePool.h"
-static String
-PageIndexId(String id)
-{
- id.append("-index");
- return id;
-}
-
-
// Ipc::Mem::PagePool
-Ipc::Mem::PagePool::PagePool(const String &id, const unsigned int capacity, const size_t pageSize):
- pageIndex(PageIndexId(id), capacity),
- shm(id.termedBuf())
-{
- const off_t sharedSize = Shared::MemSize(capacity, pageSize);
- shm.create(sharedSize);
- assert(shm.mem());
- shared = new (shm.reserve(sharedSize)) Shared(capacity, pageSize);
-}
-
-Ipc::Mem::PagePool::PagePool(const String &id):
- pageIndex(PageIndexId(id)), shm(id.termedBuf())
+Ipc::Mem::PagePool::Owner *
+Ipc::Mem::PagePool::Init(const char *const id, const unsigned int capacity, const size_t pageSize)
{
- shm.open();
- shared = reinterpret_cast<Shared *>(shm.mem());
- assert(shared);
- const off_t sharedSize =
- Shared::MemSize(shared->theCapacity, shared->thePageSize);
- assert(shared == reinterpret_cast<Shared *>(shm.reserve(sharedSize)));
-}
-
-void
-Ipc::Mem::PagePool::Unlink(const String &id)
-{
- PageStack::Unlink(PageIndexId(id));
- Segment::Unlink(id.termedBuf());
-}
-
-bool
-Ipc::Mem::PagePool::get(PageId &page)
-{
- if (pageIndex.pop(page.number)) {
- page.pool = shared->theId;
- return true;
- }
- return false;
+ static uint32_t LastPagePoolId = 0;
+ if (++LastPagePoolId == 0)
+ ++LastPagePoolId; // skip zero pool id
+ return shm_new(PageStack)(id, LastPagePoolId, capacity, pageSize);
}
-void
-Ipc::Mem::PagePool::put(PageId &page)
+Ipc::Mem::PagePool::PagePool(const char *const id):
+ pageIndex(shm_old(PageStack)(id))
{
- Must(pageIdIsValid(page));
- pageIndex.push(page.number);
- page = PageId();
+ const size_t pagesDataOffset =
+ pageIndex->sharedMemorySize() - capacity() * pageSize();
+ theBuf = reinterpret_cast<char *>(pageIndex.getRaw()) + pagesDataOffset;
}
void *
Ipc::Mem::PagePool::pagePointer(const PageId &page)
{
- Must(pageIdIsValid(page));
- return shared->theBuf + shared->thePageSize * (page.number - 1);
-}
-
-bool
-Ipc::Mem::PagePool::pageIdIsValid(const PageId &page) const
-{
- return page.pool == shared->theId &&
- 0 < page.number && page.number <= shared->theCapacity;
-}
-
-
-// Ipc::Mem::PagePool::Shared
-
-static unsigned int LastPagePoolId = 0;
-
-Ipc::Mem::PagePool::Shared::Shared(const unsigned int aCapacity, size_t aPageSize):
- theId(++LastPagePoolId), theCapacity(aCapacity), thePageSize(aPageSize)
-{
- if (LastPagePoolId + 1 == 0)
- ++LastPagePoolId; // skip zero pool id
-}
-
-off_t
-Ipc::Mem::PagePool::Shared::MemSize(const unsigned int capacity, const size_t pageSize)
-{
- return static_cast<off_t>(sizeof(Shared)) +
- static_cast<off_t>(pageSize) * capacity;
+ Must(pageIndex->pageIdIsValid(page));
+ return theBuf + pageSize() * (page.number - 1);
}
#define SQUID_IPC_MEM_PAGE_POOL_H
#include "ipc/mem/PageStack.h"
-#include "ipc/mem/Segment.h"
+#include "ipc/mem/Pointer.h"
namespace Ipc {
namespace Mem {
-class PageId;
-
/// Atomic container of shared memory pages. Implemented using a collection of
/// Segments, each with a PageStack index of free pages. All pools must be
/// created by a single process.
class PagePool {
public:
- /// creates a new shared page pool that can hold up to capacity pages of pageSize size
- PagePool(const String &id, const unsigned int capacity, const size_t pageSize);
- /// attaches to the identified shared page pool
- PagePool(const String &id);
- /// unlinks shared memory segments
- static void Unlink(const String &id);
+ typedef Ipc::Mem::Owner<PageStack> Owner;
+
+ static Owner *Init(const char *const id, const unsigned int capacity, const size_t pageSize);
- unsigned int capacity() const { return shared->theCapacity; }
+ PagePool(const char *const id);
+
+ unsigned int capacity() const { return pageIndex->capacity(); }
+ size_t pageSize() const { return pageIndex->pageSize(); }
/// lower bound for the number of free pages
- unsigned int size() const { return pageIndex.size(); }
- size_t pageSize() const { return shared->thePageSize; }
+ unsigned int size() const { return pageIndex->size(); }
/// sets page ID and returns true unless no free pages are found
- bool get(PageId &page);
+ bool get(PageId &page) { return pageIndex->pop(page); }
/// makes identified page available as a free page to future get() callers
- void put(PageId &page);
+ void put(PageId &page) { return pageIndex->push(page); }
/// converts page handler into a temporary writeable shared memory pointer
void *pagePointer(const PageId &page);
private:
- inline bool pageIdIsValid(const PageId &page) const;
-
- struct Shared {
- Shared(const unsigned int aCapacity, const size_t aPageSize);
-
- /// total shared memory size required to share
- static off_t MemSize(const unsigned int capacity, const size_t pageSize);
-
- const unsigned int theId; ///< pool id
- const unsigned int theCapacity; ///< number of pages in the pool
- const size_t thePageSize; ///< page size
-
- // TODO: add padding to make pages system page-aligned?
- char theBuf[]; ///< pages storage
- };
-
- PageStack pageIndex; ///< free pages index
- Segment shm; ///< shared memory segment to store metadata (and pages)
- Shared *shared; ///< our metadata and page storage, shared among all pool users
+ Ipc::Mem::Pointer<PageStack> pageIndex; ///< free pages index
+ char *theBuf; ///< pages storage
};
} // namespace Mem
#include "config.h"
#include "base/TextException.h"
+#include "ipc/mem/Page.h"
#include "ipc/mem/PageStack.h"
/// used to mark a stack slot available for storing free page offsets
const Ipc::Mem::PageStack::Value Writable = 0;
-Ipc::Mem::PageStack::PageStack(const String &id, const unsigned int capacity):
- shm(id.termedBuf())
-{
- const size_t sharedSize = Shared::MemSize(capacity);
- shm.create(sharedSize);
- assert(shm.mem());
- shared = new (shm.reserve(sharedSize)) Shared(capacity);
-}
-
-Ipc::Mem::PageStack::PageStack(const String &id): shm(id.termedBuf())
-{
- shm.open();
- shared = reinterpret_cast<Shared *>(shm.mem());
- assert(shared);
- const off_t sharedSize = Shared::MemSize(shared->theCapacity);
- assert(shared == reinterpret_cast<Shared *>(shm.reserve(sharedSize)));
-}
-
-void
-Ipc::Mem::PageStack::Unlink(const String &id)
+Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize):
+ thePoolId(aPoolId), theCapacity(aCapacity), thePageSize(aPageSize),
+ theSize(theCapacity),
+ theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable))
{
- Segment::Unlink(id.termedBuf());
+ // initially, all pages are free
+ for (Offset i = 0; i < theSize; ++i)
+ theItems[i] = i + 1; // skip page number zero to keep numbers positive
}
/*
* approach is better? Same for push().
*/
bool
-Ipc::Mem::PageStack::pop(Value &value)
+Ipc::Mem::PageStack::pop(PageId &page)
{
// we may fail to dequeue, but be conservative to prevent long searches
- --shared->theSize;
+ --theSize;
// find a Readable slot, starting with theLastReadable and going left
- while (shared->theSize >= 0) {
- const Offset idx = shared->theLastReadable;
+ while (theSize >= 0) {
+ const Offset idx = theLastReadable;
// mark the slot at ids Writable while extracting its current value
- value = shared->theItems[idx].fetchAndAnd(0); // works if Writable is 0
+ const Value value = theItems[idx].fetchAndAnd(0); // works if Writable is 0
const bool popped = value != Writable;
// theItems[idx] is probably not Readable [any more]
// Whether we popped a Readable value or not, we should try going left
// to maintain the index (and make progress).
// We may fail if others already updated the index, but that is OK.
- shared->theLastReadable.swap_if(idx, shared->prev(idx)); // may fail or lie
+ theLastReadable.swap_if(idx, prev(idx)); // may fail or lie
if (popped) {
// the slot we emptied may already be filled, but that is OK
- shared->theFirstWritable = idx; // may lie
+ theFirstWritable = idx; // may lie
+ page.pool = thePoolId;
+ page.number = value;
return true;
}
// TODO: report suspiciously long loops
}
- ++shared->theSize;
+ ++theSize;
return false;
}
void
-Ipc::Mem::PageStack::push(const Value value)
+Ipc::Mem::PageStack::push(PageId &page)
{
- Must(value != Writable);
- Must(static_cast<Offset>(value) <= shared->theCapacity);
+ Must(pageIdIsValid(page));
// find a Writable slot, starting with theFirstWritable and going right
- while (shared->theSize < shared->theCapacity) {
- const Offset idx = shared->theFirstWritable;
- const bool pushed = shared->theItems[idx].swap_if(Writable, value);
+ while (theSize < theCapacity) {
+ const Offset idx = theFirstWritable;
+ const bool pushed = theItems[idx].swap_if(Writable, page.number);
// theItems[idx] is probably not Writable [any more];
- // Whether we pushed the value or not, we should try going right
+ // Whether we pushed the page number or not, we should try going right
// to maintain the index (and make progress).
// We may fail if others already updated the index, but that is OK.
- shared->theFirstWritable.swap_if(idx, shared->next(idx)); // may fail or lie
+ theFirstWritable.swap_if(idx, next(idx)); // may fail or lie
if (pushed) {
// the enqueued value may already by gone, but that is OK
- shared->theLastReadable = idx; // may lie
- ++shared->theSize;
+ theLastReadable = idx; // may lie
+ ++theSize;
+ page = PageId();
return;
}
// TODO: report suspiciously long loops
Must(false); // the number of pages cannot exceed theCapacity
}
-Ipc::Mem::PageStack::Shared::Shared(const unsigned int aCapacity):
- theCapacity(aCapacity), theSize(theCapacity),
- theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable))
+bool
+Ipc::Mem::PageStack::pageIdIsValid(const PageId &page) const
{
- // initially, all pages are free
- for (Offset i = 0; i < theSize; ++i)
- theItems[i] = i + 1; // skip page number zero to keep numbers positive
+ return page.pool == thePoolId && page.number != Writable &&
+ page.number <= capacity();
+}
+
+size_t
+Ipc::Mem::PageStack::sharedMemorySize() const
+{
+ return SharedMemorySize(thePoolId, theCapacity, thePageSize);
}
size_t
-Ipc::Mem::PageStack::Shared::MemSize(const unsigned int capacity)
+Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const unsigned int capacity, const size_t pageSize)
{
- return sizeof(Item) * capacity + sizeof(Shared);
+ return sizeof(PageStack) + capacity * (sizeof(Item) + pageSize);
}
#define SQUID_IPC_MEM_PAGE_STACK_H
#include "ipc/AtomicWord.h"
-#include "ipc/mem/Segment.h"
namespace Ipc {
namespace Mem {
+class PageId;
+
/// Atomic container of "free" page numbers inside a single SharedMemory space.
/// Assumptions: all page numbers are unique, positive, have an known maximum,
/// and can be temporary unavailable as long as they are never trully lost.
public:
typedef uint32_t Value; ///< stack item type (a free page number)
- /// creates a new shared stack that can hold up to capacity items
- PageStack(const String &id, const unsigned int capacity);
- /// attaches to the identified shared stack
- PageStack(const String &id);
- /// unlinks shared memory segment
- static void Unlink(const String &id);
+ PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize);
+ unsigned int capacity() const { return theCapacity; }
+ size_t pageSize() const { return thePageSize; }
/// lower bound for the number of free pages
- unsigned int size() const { return max(0, shared->theSize.get()); }
+ unsigned int size() const { return max(0, theSize.get()); }
/// sets value and returns true unless no free page numbers are found
- bool pop(Value &value);
+ bool pop(PageId &page);
/// makes value available as a free page number to future pop() callers
- void push(const Value value);
+ void push(PageId &page);
+
+ bool pageIdIsValid(const PageId &page) const;
+
+ /// total shared memory size required to share
+ static size_t SharedMemorySize(const uint32_t aPoolId, const unsigned int capacity, const size_t pageSize);
+ size_t sharedMemorySize() const;
private:
/// stack index and size type (may temporary go negative)
typedef int Offset;
- struct Shared {
- Shared(const unsigned int aCapacity);
-
- /// total shared memory size required to share
- static size_t MemSize(const unsigned int capacity);
-
- // these help iterate the stack in search of a free spot or a page
- Offset next(const Offset idx) const { return (idx + 1) % theCapacity; }
- Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; }
-
- const Offset theCapacity; ///< stack capacity, i.e. theItems size
- /// lower bound for the number of free pages (may get negative!)
- AtomicWordT<Offset> theSize;
+ // these help iterate the stack in search of a free spot or a page
+ Offset next(const Offset idx) const { return (idx + 1) % theCapacity; }
+ Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; }
- /// last readable item index; just a hint, not a guarantee
- AtomicWordT<Offset> theLastReadable;
- /// first writable item index; just a hint, not a guarantee
- AtomicWordT<Offset> theFirstWritable;
+ const uint32_t thePoolId; ///< pool ID
+ const Offset theCapacity; ///< stack capacity, i.e. theItems size
+ const size_t thePageSize; ///< page size, used to calculate shared memory size
+ /// lower bound for the number of free pages (may get negative!)
+ AtomicWordT<Offset> theSize;
- typedef AtomicWordT<Value> Item;
- Item theItems[]; ///< page number storage
- };
+ /// last readable item index; just a hint, not a guarantee
+ AtomicWordT<Offset> theLastReadable;
+ /// first writable item index; just a hint, not a guarantee
+ AtomicWordT<Offset> theFirstWritable;
- Segment shm; ///< shared memory segment to store metadata (and pages)
- Shared *shared; ///< our metadata, shared among all stack users
+ typedef AtomicWordT<Value> Item;
+ Item theItems[]; ///< page number storage
};
} // namespace Mem
{
public:
/* RegisteredRunner API */
+ SharedMemPagesRr(): owner(NULL) {}
virtual void run(const RunnerRegistry &);
virtual ~SharedMemPagesRr();
+
+private:
+ Ipc::Mem::PagePool::Owner *owner;
};
RunnerRegistrationEntry(rrAfterConfig, SharedMemPagesRr);
return;
}
- Must(!ThePagePool);
if (IamMasterProcess()) {
+ Must(!owner);
const size_t capacity = Ipc::Mem::Limit() / Ipc::Mem::PageSize();
- ThePagePool =
- new Ipc::Mem::PagePool(PagePoolId, capacity, Ipc::Mem::PageSize());
- } else
- ThePagePool = new Ipc::Mem::PagePool(PagePoolId);
+ owner = Ipc::Mem::PagePool::Init(PagePoolId, capacity, Ipc::Mem::PageSize());
+ }
+
+ Must(!ThePagePool);
+ ThePagePool = new Ipc::Mem::PagePool(PagePoolId);
}
SharedMemPagesRr::~SharedMemPagesRr()
delete ThePagePool;
ThePagePool = NULL;
- if (IamMasterProcess())
- Ipc::Mem::PagePool::Unlink(PagePoolId);
+ delete owner;
}
--- /dev/null
+/*
+ * $Id$
+ *
+ */
+
+#ifndef SQUID_IPC_MEM_POINTER_H
+#define SQUID_IPC_MEM_POINTER_H
+
+#include "base/TextException.h"
+#include "ipc/mem/Segment.h"
+#include "RefCount.h"
+
+namespace Ipc {
+
+namespace Mem {
+
+/// allocates/deallocates shared memory; creates and later destroys a
+/// Class object using that memory
+template <class Class>
+class Owner
+{
+public:
+ static Owner *New(const char *const id);
+ template <class P1>
+ static Owner *New(const char *const id, const P1 &p1);
+ template <class P1, class P2>
+ static Owner *New(const char *const id, const P1 &p1, const P2 &p2);
+ template <class P1, class P2, class P3>
+ static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3);
+
+ ~Owner();
+
+private:
+ Owner(const char *const id, const off_t sharedSize);
+
+ // not implemented
+ Owner(const Owner &);
+ Owner &operator =(const Owner &);
+
+ Segment theSegment; ///< shared memory segment that holds the object
+ Class *theObject; ///< shared object
+};
+
+template <class Class> class Pointer;
+
+/// attaches to a shared memory segment with Class object owned by Owner
+template <class Class>
+class Object: public RefCountable
+{
+public:
+ static Pointer<Class> Old(const char *const id);
+
+private:
+ explicit Object(const char *const id);
+
+ // not implemented
+ Object(const Object &);
+ Object &operator =(const Object &);
+
+ Segment theSegment; ///< shared memory segment that holds the object
+ Class *theObject; ///< shared object
+
+ friend class Pointer<Class>;
+};
+
+/// uses a refcounted pointer to Object<Class> as a parent, but
+/// translates its API to return raw Class pointers
+template <class Class>
+class Pointer: public RefCount< Object<Class> >
+{
+private:
+ typedef RefCount< Object<Class> > Base;
+
+public:
+ explicit Pointer(Object<Class> *const anObject): Base(anObject) {}
+
+ Class *operator ->() const { return Base::operator ->()->theObject; }
+ Class &operator *() const { return *Base::operator *().theObject; }
+ Class *const getRaw() const { return Base::getRaw()->theObject; }
+ Class *getRaw() { return Base::getRaw()->theObject; }
+};
+
+// Owner implementation
+
+template <class Class>
+Owner<Class>::Owner(const char *const id, const off_t sharedSize):
+ theSegment(id), theObject(NULL)
+{
+ theSegment.create(sharedSize);
+ Must(theSegment.mem());
+}
+
+template <class Class>
+Owner<Class>::~Owner()
+{
+ if (theObject)
+ theObject->~Class();
+}
+
+template <class Class>
+Owner<Class> *
+Owner<Class>::New(const char *const id)
+{
+ const off_t sharedSize = Class::SharedMemorySize();
+ Owner *const owner = new Owner(id, sharedSize);
+ owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class;
+ return owner;
+}
+
+template <class Class> template <class P1>
+Owner<Class> *
+Owner<Class>::New(const char *const id, const P1 &p1)
+{
+ const off_t sharedSize = Class::SharedMemorySize(p1);
+ Owner *const owner = new Owner(id, sharedSize);
+ owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1);
+ return owner;
+}
+
+template <class Class> template <class P1, class P2>
+Owner<Class> *
+Owner<Class>::New(const char *const id, const P1 &p1, const P2 &p2)
+{
+ const off_t sharedSize = Class::SharedMemorySize(p1, p2);
+ Owner *const owner = new Owner(id, sharedSize);
+ owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2);
+ return owner;
+}
+
+template <class Class> template <class P1, class P2, class P3>
+Owner<Class> *
+Owner<Class>::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3)
+{
+ const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3);
+ Owner *const owner = new Owner(id, sharedSize);
+ owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3);
+ return owner;
+}
+
+// Object implementation
+
+template <class Class>
+Object<Class>::Object(const char *const id): theSegment(id)
+{
+ theSegment.open();
+ Must(theSegment.mem());
+ theObject = reinterpret_cast<Class*>(theSegment.mem());
+ Must(static_cast<off_t>(theObject->sharedMemorySize()) == theSegment.size());
+}
+
+template <class Class>
+Pointer<Class>
+Object<Class>::Old(const char *const id)
+{
+ return Pointer<Class>(new Object(id));
+}
+
+// convenience macros for creating shared objects
+#define shm_new(Class) Ipc::Mem::Owner<Class>::New
+#define shm_old(Class) Ipc::Mem::Object<Class>::Old
+
+} // namespace Mem
+
+} // namespace Ipc
+
+#endif /* SQUID_IPC_MEM_POINTER_H */
Ipc::Mem::Segment::Segment(const char *const id):
theName(GenerateName(id)), theFD(-1), theMem(NULL),
- theSize(0), theReserved(0)
+ theSize(0), theReserved(0), doUnlink(false)
{
}
if (close(theFD) != 0)
debugs(54, 5, HERE << "close " << theName << ": " << xstrerror());
}
+ if (doUnlink)
+ unlink();
}
void
theSize = aSize;
theReserved = 0;
+ doUnlink = true;
debugs(54, 3, HERE << "created " << theName << " segment: " << theSize);
theMem = 0;
}
+void
+Ipc::Mem::Segment::unlink()
+{
+ if (shm_unlink(theName.termedBuf()) != 0)
+ debugs(54, 5, HERE << "shm_unlink(" << theName << "): " << xstrerror());
+ else
+ debugs(54, 3, HERE << "unlinked " << theName << " segment");
+}
+
void *
Ipc::Mem::Segment::reserve(size_t chunkSize)
{
return result;
}
-void
-Ipc::Mem::Segment::Unlink(const char *const id)
-{
- const String path = GenerateName(id);
- if (shm_unlink(path.termedBuf()) != 0)
- debugs(54, 5, HERE << "shm_unlink(" << path << "): " << xstrerror());
-}
-
/// determines the size of the underlying "file"
off_t
Ipc::Mem::Segment::statSize(const char *context) const
~Segment();
/// Create a new shared memory segment. Fails if a segment with
- /// the same name already exists.
+ /// the same name already exists. Unlinks the segment on destruction.
void create(const off_t aSize);
void open(); ///< Open an existing shared memory segment.
void *mem() { return reserve(0); } ///< pointer to the next chunk
void *reserve(size_t chunkSize); ///< reserve and return the next chunk
- static void Unlink(const char *const id); ///< unlink the segment
private:
void attach();
void detach();
+ void unlink(); ///< unlink the segment
off_t statSize(const char *context) const;
static String GenerateName(const char *id);
+ // not implemented
+ Segment(const Segment &);
+ Segment &operator =(const Segment &);
+
const String theName; ///< shared memory segment file name
int theFD; ///< shared memory segment file descriptor
void *theMem; ///< pointer to mmapped shared memory segment
off_t theSize; ///< shared memory segment size
off_t theReserved; ///< the total number of reserve()d bytes
+ bool doUnlink; ///< whether the segment should be unlinked on destruction
};
} // namespace Mem
#include "MemPool.h"
#include "icmp/IcmpSquid.h"
#include "icmp/net_db.h"
-#include "fs/rock/RockDirMap.h"
#if USE_LOADABLE_MODULES
#include "LoadableModules.h"