From: Dmitry Kurochkin
Date: Mon, 25 Apr 2011 15:14:10 +0000 (+0400)
Subject: Rework shared object design and management API.
X-Git-Tag: take07~40
X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=68353d5a4eac7ec0c6afaa40e0bc36df8cd14176;p=thirdparty%2Fsquid.git

Rework shared object design and management API.

Before the patch, each shared object was responsible for allocating and
deallocating the shared memory it uses. As a result, each object had a
shared and a non-shared portion, and shared classes provided a pair of
static methods for creating shared segments and for attaching to existing
ones. This contradicts how normal objects behave: they are not responsible
for managing the memory they use; they use the memory they are given.
Moreover, the old approach mixed shared memory management with object
initialization logic. The patch improves this.

On the user side, the patch provides two functions for managing shared
objects:

* shm_new - allocates/deallocates shared memory and initializes the object
* shm_old - gives refcounted access to the object created by shm_new

The shm_new function returns a so-called Owner object. The Owner is not
used for working with the shared object; it only performs shared memory
allocation/deallocation and object initialization. This function is
typically called in the Squid master process to allocate shared memory on
startup. On exit, the Owner object is deleted and the shared object is
deallocated.

The shm_old function returns a refcounted smart pointer to the shared
object. It does not allocate shared memory or initialize the object; it
simply points to the object owned by the Owner. The smart pointer provides
a simple way of working with the shared object.

On the internal side, the patch removes shared memory
allocation/deallocation from the shared object classes. There are no more
separate local/shared parts; a shared object class is now implemented much
like an ordinary class. The additional requirements for "shared" classes
are: the object must be a POD with no pointers or references; it must
provide a static SharedMemorySize() method for shared memory size
calculation; and it may need to use atomic primitives for safe updates of
its data members.

All existing "shared" classes and code were converted to the new API.
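[Editorial illustration, not part of the patch] A minimal sketch of how a
hypothetical shared class could be declared and managed under the new API.
The class name SharedCounters, the segment id "example-counters", and the
masterInit/kidUse/masterCleanup helpers are invented for this example;
shm_new(), shm_old(), Ipc::Mem::Owner and Ipc::Mem::Pointer are the
primitives introduced in src/ipc/mem/Pointer.h by this patch.

    #include "ipc/mem/Pointer.h" // shm_new/shm_old, Ipc::Mem::Owner, Ipc::Mem::Pointer

    /// hypothetical shared class: no pointers or references, sized at
    /// runtime via a flexible array member living in the same segment
    class SharedCounters
    {
    public:
        explicit SharedCounters(const int aCapacity): capacity(aCapacity) {}

        /// shared memory size for the given capacity (used by shm_new)
        static size_t SharedMemorySize(const int capacity) {
            return sizeof(SharedCounters) + capacity * sizeof(int);
        }
        /// size of this instance (used by shm_old to validate the segment)
        size_t sharedMemorySize() const { return SharedMemorySize(capacity); }

        const int capacity; ///< number of counters
        int counters[];     ///< counter storage
    };

    /// Owner kept by the master process for the object lifetime
    static Ipc::Mem::Owner<SharedCounters> *TheCountersOwner = NULL;

    /// master process, on startup: allocate the segment, construct the object
    static void masterInit()
    {
        TheCountersOwner = shm_new(SharedCounters)("example-counters", 64);
    }

    /// worker/disker process: attach to the already initialized object
    static void kidUse()
    {
        Ipc::Mem::Pointer<SharedCounters> counters =
            shm_old(SharedCounters)("example-counters");
        // the Pointer behaves like a raw pointer; real shared classes would
        // likely use atomic primitives for updates like this
        ++counters->counters[0];
    }

    /// master process, on exit: destroy the object and release the segment
    static void masterCleanup()
    {
        delete TheCountersOwner;
        TheCountersOwner = NULL;
    }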
--- diff --git a/src/DiskIO/IpcIo/IpcIoFile.cc b/src/DiskIO/IpcIo/IpcIoFile.cc index ebf102738a..16009d481c 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.cc +++ b/src/DiskIO/IpcIo/IpcIoFile.cc @@ -17,6 +17,7 @@ CBDATA_CLASS_INIT(IpcIoFile); +IpcIoFile::DiskerQueue::Owner *IpcIoFile::diskerQueueOwner = NULL; IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL; const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen; @@ -66,7 +67,7 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) { ioRequestor = callback; Must(diskId < 0); // we do not know our disker yet - Must(!diskerQueue && !workerQueue); + Must(!diskerQueueOwner && !diskerQueue && !workerQueue); if (IamDiskProcess()) { error_ = !DiskerOpen(dbName, flags, mode); @@ -74,8 +75,9 @@ IpcIoFile::open(int flags, mode_t mode, RefCount callback) return; // XXX: make capacity configurable - diskerQueue = - new DiskerQueue(dbName, Config.workers, sizeof(IpcIoMsg), 1024); + diskerQueueOwner = + DiskerQueue::Init(dbName, Config.workers, sizeof(IpcIoMsg), 1024); + diskerQueue = new DiskerQueue(dbName); diskId = KidIdentifier; const bool inserted = IpcIoFiles.insert(std::make_pair(diskId, this)).second; @@ -146,8 +148,12 @@ IpcIoFile::close() { assert(ioRequestor != NULL); + delete diskerQueueOwner; + diskerQueueOwner = NULL; delete diskerQueue; + diskerQueue = NULL; delete workerQueue; + workerQueue = NULL; if (IamDiskProcess()) DiskerClose(dbName); diff --git a/src/DiskIO/IpcIo/IpcIoFile.h b/src/DiskIO/IpcIo/IpcIoFile.h index 368c8235c4..de8e85b9f0 100644 --- a/src/DiskIO/IpcIo/IpcIoFile.h +++ b/src/DiskIO/IpcIo/IpcIoFile.h @@ -98,6 +98,7 @@ private: const String dbName; ///< the name of the file we are managing int diskId; ///< the process ID of the disker we talk to + static DiskerQueue::Owner *diskerQueueOwner; ///< IPC queue owner for disker static DiskerQueue *diskerQueue; ///< IPC queue for disker WorkerQueue *workerQueue; ///< IPC queue for worker RefCount ioRequestor; diff --git a/src/Makefile.am b/src/Makefile.am index e2014bd5e7..a69779baa8 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -466,8 +466,8 @@ squid_SOURCES = \ Server.h \ structs.h \ swap_log_op.h \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ - SwapDir.h MemStore.h MemStoreMap.h \ + SwapDir.cc MemStore.cc \ + SwapDir.h MemStore.h \ time.cc \ tools.cc \ tunnel.cc \ @@ -1368,7 +1368,7 @@ tests_testCacheManager_SOURCES = \ StoreSwapLogData.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -1675,7 +1675,7 @@ tests_testEvent_SOURCES = \ time.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -1850,7 +1850,7 @@ tests_testEventLoop_SOURCES = \ time.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -2192,7 +2192,7 @@ tests_testHttpRequest_SOURCES = \ event.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ + SwapDir.cc MemStore.cc \ url.cc \ URLScheme.cc \ urn.cc \ @@ -2931,7 +2931,7 @@ tests_testURL_SOURCES = \ event.cc \ tools.cc \ tunnel.cc \ - SwapDir.cc MemStore.cc MemStoreMap.cc \ + SwapDir.cc MemStore.cc \ urn.cc \ wccp2.cc \ whois.cc \ diff --git a/src/MemStore.cc b/src/MemStore.cc index 612bc5a78b..d0cb698671 100644 --- a/src/MemStore.cc +++ b/src/MemStore.cc @@ -18,17 +18,6 @@ static const char *ShmLabel = "cache_mem"; // XXX: 
support storage using more than one page per entry -void -MemStore::Init() -{ - const int64_t entryLimit = EntryLimit(); - if (entryLimit <= 0) - return; // no memory cache configured or a misconfiguration - - MemStoreMap *map = new MemStoreMap(ShmLabel, entryLimit); - delete map; // we just wanted to initialize shared memory segments -} - MemStore::MemStore(): map(NULL), theCurrentSize(0) { } @@ -363,8 +352,12 @@ class MemStoreRr: public RegisteredRunner { public: /* RegisteredRunner API */ + MemStoreRr(): owner(NULL) {} virtual void run(const RunnerRegistry &); virtual ~MemStoreRr(); + +private: + MemStoreMap::Owner *owner; }; RunnerRegistrationEntry(rrAfterConfig, MemStoreRr); @@ -375,15 +368,16 @@ void MemStoreRr::run(const RunnerRegistry &) if (!UsingSmp()) return; - if (IamMasterProcess()) - MemStore::Init(); + if (IamMasterProcess()) { + Must(!owner); + const int64_t entryLimit = MemStore::EntryLimit(); + if (entryLimit <= 0) + return; // no memory cache configured or a misconfiguration + owner = MemStoreMap::Init(ShmLabel, entryLimit); + } } MemStoreRr::~MemStoreRr() { - if (!UsingSmp()) - return; - - if (IamMasterProcess()) - MemStoreMap::Unlink(ShmLabel); + delete owner; } diff --git a/src/MemStore.h b/src/MemStore.h index f95875a9df..393f6a2e3d 100644 --- a/src/MemStore.h +++ b/src/MemStore.h @@ -5,8 +5,16 @@ #ifndef SQUID_MEMSTORE_H #define SQUID_MEMSTORE_H +#include "ipc/mem/Page.h" +#include "ipc/StoreMap.h" #include "Store.h" -#include "MemStoreMap.h" + +// StoreEntry restoration info not already stored by Ipc::StoreMap +struct MemStoreMapExtras { + Ipc::Mem::PageId page; ///< shared memory page with the entry content + int64_t storedSize; ///< total size of the stored entry content +}; +typedef Ipc::StoreMapWithExtras MemStoreMap; /// Stores HTTP entities in RAM. Current implementation uses shared memory. /// Unlike a disk store (SwapDir), operations are synchronous (and fast). 
@@ -35,8 +43,7 @@ public: virtual void maintain(); virtual void updateSize(int64_t size, int sign); - /// initializes shared memory segments before they are used by workers - static void Init(); + static int64_t EntryLimit(); protected: bool willFit(int64_t needed); @@ -48,8 +55,6 @@ protected: // Ipc::StoreMapCleaner API virtual void cleanReadable(const sfileno fileno); - static int64_t EntryLimit(); - private: MemStoreMap *map; ///< index of mem-cached entries uint64_t theCurrentSize; ///< currently used space in the storage area diff --git a/src/MemStoreMap.cc b/src/MemStoreMap.cc deleted file mode 100644 index 30e65bb6d4..0000000000 --- a/src/MemStoreMap.cc +++ /dev/null @@ -1,47 +0,0 @@ -/* - * $Id$ - * - * DEBUG: section 20 Memory Cache - */ - -#include "config.h" - -#include "Store.h" -#include "MemStoreMap.h" - -MemStoreMap::MemStoreMap(const char *const aPath, const int limit): - Ipc::StoreMap(aPath, limit, Shared::MemSize(limit)) -{ - assert(shm.mem()); - shared = new (shm.reserve(Shared::MemSize(limit))) Shared; -} - -MemStoreMap::MemStoreMap(const char *const aPath): - Ipc::StoreMap(aPath) -{ - const int limit = entryLimit(); - assert(shm.mem()); - shared = reinterpret_cast(shm.reserve(Shared::MemSize(limit))); -} - -MemStoreMap::Extras & -MemStoreMap::extras(const sfileno fileno) -{ - assert(0 <= fileno && fileno < entryLimit()); - assert(shared); - return shared->extras[fileno]; -} - -const MemStoreMap::Extras & -MemStoreMap::extras(const sfileno fileno) const -{ - assert(0 <= fileno && fileno < entryLimit()); - assert(shared); - return shared->extras[fileno]; -} - -size_t -MemStoreMap::Shared::MemSize(int limit) -{ - return sizeof(Shared) + limit*sizeof(Extras); -} diff --git a/src/MemStoreMap.h b/src/MemStoreMap.h deleted file mode 100644 index e9379cdb36..0000000000 --- a/src/MemStoreMap.h +++ /dev/null @@ -1,38 +0,0 @@ -#ifndef SQUID_MEMSTOREMAP_H -#define SQUID_MEMSTOREMAP_H - -#include "ipc/StoreMap.h" -#include "ipc/mem/Page.h" - -/// map of MemStore-cached entries -class MemStoreMap: public Ipc::StoreMap -{ -public: - // StoreEntry restoration info not already stored by Ipc::StoreMap - class Extras { - public: - Ipc::Mem::PageId page; ///< shared memory page with the entry content - int64_t storedSize; ///< total size of the stored entry content - }; - -public: - MemStoreMap(const char *const aPath, const int limit); ///< create a new shared StoreMap - MemStoreMap(const char *const aPath); ///< open an existing shared StoreMap - - /// write access to the extras; call openForWriting() first! - Extras &extras(const sfileno fileno); - /// read-only access to the extras; call openForReading() first! 
- const Extras &extras(const sfileno fileno) const; - -private: - /// data shared by all MemStoreMaps with the same path - class Shared { - public: - static size_t MemSize(int limit); - Extras extras[]; ///< extras storage - }; - - Shared *shared; ///< pointer to shared memory -}; - -#endif /* SQUID_MEMSTOREMAP_H */ diff --git a/src/fs/Makefile.am b/src/fs/Makefile.am index 243d0ed7d5..853408c7c7 100644 --- a/src/fs/Makefile.am +++ b/src/fs/Makefile.am @@ -31,8 +31,6 @@ libufs_la_SOURCES = \ librock_la_SOURCES = \ rock/RockCommon.cc \ rock/RockCommon.h \ - rock/RockDirMap.cc \ - rock/RockDirMap.h \ rock/RockFile.cc \ rock/RockFile.h \ rock/RockIoState.cc \ diff --git a/src/fs/rock/RockDirMap.cc b/src/fs/rock/RockDirMap.cc deleted file mode 100644 index 160afd73eb..0000000000 --- a/src/fs/rock/RockDirMap.cc +++ /dev/null @@ -1,54 +0,0 @@ -/* - * $Id$ - * - * DEBUG: section 79 Disk IO Routines - */ - -#include "squid.h" - -#include "Store.h" -#include "fs/rock/RockDirMap.h" - -Rock::DirMap::DirMap(const char *const aPath, const int limit): - Ipc::StoreMap(aPath, limit, Shared::MemSize(limit)) -{ - assert(shm.mem()); - shared = new (shm.reserve(Shared::MemSize(limit))) Shared; -} - -Rock::DirMap::DirMap(const char *const aPath): - Ipc::StoreMap(aPath) -{ - const int limit = entryLimit(); - assert(shm.mem()); - shared = reinterpret_cast(shm.reserve(Shared::MemSize(limit))); -} - -Rock::DbCellHeader & -Rock::DirMap::header(const sfileno fileno) -{ - assert(0 <= fileno && fileno < entryLimit()); - assert(shared); - return shared->headers[fileno]; -} - -const Rock::DbCellHeader & -Rock::DirMap::header(const sfileno fileno) const -{ - assert(0 <= fileno && fileno < entryLimit()); - assert(shared); - return shared->headers[fileno]; -} - -int -Rock::DirMap::AbsoluteEntryLimit() -{ - const int sfilenoMax = 0xFFFFFF; // Core sfileno maximum - return sfilenoMax; -} - -size_t -Rock::DirMap::Shared::MemSize(int limit) -{ - return sizeof(Shared) + limit*sizeof(DbCellHeader); -} diff --git a/src/fs/rock/RockDirMap.h b/src/fs/rock/RockDirMap.h deleted file mode 100644 index 9ca865316e..0000000000 --- a/src/fs/rock/RockDirMap.h +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef SQUID_FS_ROCK_DIR_MAP_H -#define SQUID_FS_ROCK_DIR_MAP_H - -#include "fs/rock/RockFile.h" -#include "ipc/StoreMap.h" - -namespace Rock { - -/// \ingroup Rock -/// map of used db slots indexed by sfileno -class DirMap: public Ipc::StoreMap -{ -public: - DirMap(const char *const aPath, const int limit); ///< create a new shared DirMap - DirMap(const char *const aPath); ///< open an existing shared DirMap - - /// write access to the cell header; call openForWriting() first! - DbCellHeader &header(const sfileno fileno); - /// read-only access to the cell header; call openForReading() first! - const DbCellHeader &header(const sfileno fileno) const; - - static int AbsoluteEntryLimit(); ///< maximum entryLimit() possible - -private: - /// data shared by all DirMaps with the same path - class Shared { - public: - static size_t MemSize(int limit); - DbCellHeader headers[]; ///< DbCellHeaders storage - }; - - Shared *shared; ///< pointer to shared memory -}; - -} // namespace Rock - -// We do not reuse struct _fileMap because we cannot control its size, -// resulting in sfilenos that are pointing beyond the database. - -/* - * Rock::DirMap does not implement Ipc::StoreMapCleaner API because we want - * to avoid extra I/O necessary to mark the disk slot empty. 
We may create some - * stale responses if Squid quits, but should save a lot of I/O in the common - * cases. TODO: Consider cleaning on-disk slots on exit; always scheduling - * but cancelling/merging cleanup I/O; scheduling cleanup I/O after a - * configurable delay; etc. - */ - -#endif /* SQUID_FS_ROCK_DIR_MAP_H */ diff --git a/src/fs/rock/RockSwapDir.cc b/src/fs/rock/RockSwapDir.cc index 57ed5ab684..ce06bc823e 100644 --- a/src/fs/rock/RockSwapDir.cc +++ b/src/fs/rock/RockSwapDir.cc @@ -20,13 +20,15 @@ // must be divisible by 1024 due to cur_size and max_size KB madness const int64_t Rock::SwapDir::HeaderSize = 16*1024; -Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), map(NULL) +Rock::SwapDir::SwapDir(): ::SwapDir("rock"), filePath(NULL), io(NULL), + mapOwner(NULL), map(NULL) { } Rock::SwapDir::~SwapDir() { delete io; + delete mapOwner; delete map; safe_free(filePath); } @@ -199,7 +201,9 @@ Rock::SwapDir::init() const int64_t eLimitLo = 0; // dynamic shrinking unsupported const int64_t eWanted = (maximumSize() - HeaderSize)/max_objsize; const int64_t eAllowed = min(max(eLimitLo, eWanted), eLimitHi); - map = new DirMap(path, eAllowed); + Must(!mapOwner); + mapOwner = DirMap::Init(path, eAllowed); + map = new DirMap(path); } const char *ioModule = UsingSmp() ? "IpcIo" : "Blocking"; @@ -293,8 +297,7 @@ Rock::SwapDir::validateOptions() assert(diskOffsetLimit() <= maximumSize()); // warn if maximum db size is not reachable due to sfileno limit - if (map->entryLimit() == map->AbsoluteEntryLimit() && - totalWaste > roundingWasteMx) { + if (map->entryLimit() == eLimitHi && totalWaste > roundingWasteMx) { debugs(47, 0, "Rock store cache_dir[" << index << "]:"); debugs(47, 0, "\tmaximum number of entries: " << map->entryLimit()); debugs(47, 0, "\tmaximum entry size: " << max_objsize << " bytes"); @@ -325,7 +328,7 @@ Rock::SwapDir::addEntry(const int fileno, const DbCellHeader &header, const Stor if (Ipc::StoreMapSlot *slot = map->openForWriting(reinterpret_cast(from.key), newLocation)) { if (fileno == newLocation) { slot->set(from); - map->header(fileno) = header; + map->extras(fileno) = header; // core will not updateSize: we do not add the entry to store_table updateSize(from.swap_file_sz, +1); } // else some other, newer entry got into our cell @@ -383,7 +386,7 @@ Rock::SwapDir::createStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreI } e.swap_file_sz = header.payloadSize; // and will be copied to the map slot->set(e); - map->header(fileno) = header; + map->extras(fileno) = header; // XXX: We rely on our caller, storeSwapOutStart(), to set e.fileno. // If that does not happen, the entry will not decrement the read level! 
@@ -446,7 +449,7 @@ Rock::SwapDir::openStoreIO(StoreEntry &e, StoreIOState::STFNCB *cbFile, StoreIOS sio->swap_dirn = index; sio->swap_filen = e.swap_filen; - sio->payloadEnd = sizeof(DbCellHeader) + map->header(e.swap_filen).payloadSize; + sio->payloadEnd = sizeof(DbCellHeader) + map->extras(e.swap_filen).payloadSize; assert(sio->payloadEnd <= max_objsize); // the payload fits the slot debugs(47,5, HERE << "dir " << index << " has old fileno: " << diff --git a/src/fs/rock/RockSwapDir.h b/src/fs/rock/RockSwapDir.h index b0c58bf269..5d9ecc55c2 100644 --- a/src/fs/rock/RockSwapDir.h +++ b/src/fs/rock/RockSwapDir.h @@ -3,7 +3,8 @@ #include "SwapDir.h" #include "DiskIO/IORequestor.h" -#include "fs/rock/RockDirMap.h" +#include "fs/rock/RockFile.h" +#include "ipc/StoreMap.h" class DiskIOStrategy; class DiskFile; @@ -74,8 +75,11 @@ protected: const char *filePath; ///< location of cache storage file inside path/ private: + typedef Ipc::StoreMapWithExtras DirMap; + DiskIOStrategy *io; RefCount theFile; ///< cache storage for this cache_dir + DirMap::Owner *mapOwner; DirMap *map; static const int64_t HeaderSize; ///< on-disk db header size diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index 6079e11782..14d3d0af03 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -53,6 +53,7 @@ libipc_la_SOURCES = \ mem/Pages.h \ mem/PageStack.cc \ mem/PageStack.h \ + mem/Pointer.h \ mem/Segment.cc \ mem/Segment.h diff --git a/src/ipc/Queue.cc b/src/ipc/Queue.cc index 2492cf7457..a4294be48e 100644 --- a/src/ipc/Queue.cc +++ b/src/ipc/Queue.cc @@ -38,26 +38,40 @@ QueueReader::QueueReader(): popBlocked(1), popSignal(0) debugs(54, 7, HERE << "constructed " << id); } +/* QueueReaders */ + +QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity) +{ + Must(theCapacity > 0); + new (theReaders) QueueReader[theCapacity]; +} + +size_t +QueueReaders::sharedMemorySize() const +{ + return SharedMemorySize(theCapacity); +} + +size_t +QueueReaders::SharedMemorySize(const int capacity) +{ + return sizeof(QueueReaders) + sizeof(QueueReader) * capacity; +} + // OneToOneUniQueue -OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity): - shm(id.termedBuf()), reader_(NULL) +OneToOneUniQueue::Owner * +OneToOneUniQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) { - const int sharedSize = Items2Bytes(maxItemSize, capacity); - shm.create(sharedSize); - shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity); + Must(maxItemSize > 0); + Must(capacity > 0); + return shm_new(Shared)(id.termedBuf(), maxItemSize, capacity); } -OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()), - reader_(NULL) +OneToOneUniQueue::OneToOneUniQueue(const String &id): + shared(shm_old(Shared)(id.termedBuf())), reader_(NULL) { - shm.open(); - shared = reinterpret_cast(shm.mem()); - assert(shared); - const int sharedSize = - Items2Bytes(shared->theMaxItemSize, shared->theCapacity); - assert(shared == reinterpret_cast(shm.reserve(sharedSize))); } void @@ -82,32 +96,61 @@ OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size) return sizeof(Shared) + maxItemSize * size; } +QueueReader & +OneToOneUniQueue::reader() +{ + Must(reader_); + return *reader_; +} + OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity): theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize), theCapacity(aCapacity) { } -QueueReader & -OneToOneUniQueue::reader() +size_t 
+OneToOneUniQueue::Shared::sharedMemorySize() const { - Must(reader_); - return *reader_; + return SharedMemorySize(theMaxItemSize, theCapacity); +} + +size_t +OneToOneUniQueue::Shared::SharedMemorySize(const unsigned int maxItemSize, const int capacity) +{ + return Items2Bytes(maxItemSize, capacity); } // OneToOneBiQueue -OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity): - popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)), - pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity)) +OneToOneBiQueue::Owner * +OneToOneBiQueue::Init(const String &id, const unsigned int maxItemSize, const int capacity) { + UniQueueOwner owner1(OneToOneUniQueue::Init(QueueId(id, Side1), maxItemSize, capacity)); + UniQueueOwner owner2(OneToOneUniQueue::Init(QueueId(id, Side2), maxItemSize, capacity)); + Owner *const owner = new Owner; + owner->first = owner1; + owner->second = owner2; + return owner; } -OneToOneBiQueue::OneToOneBiQueue(const String &id): - popQueue(new OneToOneUniQueue(QueueId(id, 2))), - pushQueue(new OneToOneUniQueue(QueueId(id, 1))) +OneToOneBiQueue::OneToOneBiQueue(const String &id, const Side side) { + OneToOneUniQueue *const queue1 = new OneToOneUniQueue(QueueId(id, Side1)); + OneToOneUniQueue *const queue2 = new OneToOneUniQueue(QueueId(id, Side2)); + switch (side) { + case Side1: + popQueue.reset(queue1); + pushQueue.reset(queue2); + break; + case Side2: + popQueue.reset(queue2); + pushQueue.reset(queue1); + break; + default: + Must(false); + } } void @@ -127,24 +170,25 @@ OneToOneBiQueue::clearReaderSignal() // FewToOneBiQueue -FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity): - theLastPopWorker(0), theWorkerCount(aWorkerCount), - shm(ReaderId(id).termedBuf()), - reader(NULL) +FewToOneBiQueue::Owner * +FewToOneBiQueue::Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity) +{ + return new Owner(id, workerCount, maxItemSize, capacity); +} + +FewToOneBiQueue::FewToOneBiQueue(const String &id): + theLastPopWorker(0), + readers(shm_old(QueueReaders)(ReaderId(id).termedBuf())), + reader(readers->theReaders) { - // create a new segment for the local and remote queue readers - // TODO: all our queues and readers should use a single segment - shm.create((theWorkerCount+1)*sizeof(QueueReader)); - reader = new (shm.reserve(sizeof(QueueReader))) QueueReader; + Must(readers->theCapacity > 1); + debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id); - assert(theWorkerCount >= 0); - biQueues.reserve(theWorkerCount); - for (int i = 0; i < theWorkerCount; ++i) { - OneToOneBiQueue *const biQueue = - new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity); - QueueReader *remoteReader = - new (shm.reserve(sizeof(QueueReader))) QueueReader; + biQueues.reserve(workerCount()); + for (int i = 0; i < workerCount(); ++i) { + OneToOneBiQueue *const biQueue = new OneToOneBiQueue(QueueId(id, i + WorkerIdOffset), OneToOneBiQueue::Side1); + QueueReader *const remoteReader = readers->theReaders + i + 1; biQueue->readers(reader, remoteReader); biQueues.push_back(biQueue); } @@ -153,34 +197,36 @@ FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const OneToOneBiQueue * FewToOneBiQueue::Attach(const String &id, const int workerId) { - // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker? 
- Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf()); - - Ipc::Mem::Segment &shm = *shmPtr; - shm.open(); - assert(shm.size() >= static_cast((1 + workerId+1 - WorkerIdOffset)*sizeof(QueueReader))); - QueueReader *readers = reinterpret_cast(shm.mem()); - QueueReader *remoteReader = &readers[0]; + Ipc::Mem::Pointer readers = shm_old(QueueReaders)(ReaderId(id).termedBuf()); + Must(workerId >= WorkerIdOffset); + Must(workerId < readers->theCapacity - 1 + WorkerIdOffset); + QueueReader *const remoteReader = readers->theReaders; debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id); - QueueReader *localReader = &readers[workerId+1 - WorkerIdOffset]; + QueueReader *const localReader = + readers->theReaders + workerId - WorkerIdOffset + 1; debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id); OneToOneBiQueue *const biQueue = - new OneToOneBiQueue(QueueId(id, workerId)); + new OneToOneBiQueue(QueueId(id, workerId), OneToOneBiQueue::Side2); biQueue->readers(localReader, remoteReader); + + // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker? + const Ipc::Mem::Pointer *const leakingReaders = new Ipc::Mem::Pointer(readers); + Must(leakingReaders); // silence unused variable warning + return biQueue; } FewToOneBiQueue::~FewToOneBiQueue() { - for (int i = 0; i < theWorkerCount; ++i) + for (int i = 0; i < workerCount(); ++i) delete biQueues[i]; } bool FewToOneBiQueue::validWorkerId(const int workerId) const { return WorkerIdOffset <= workerId && - workerId < WorkerIdOffset + theWorkerCount; + workerId < WorkerIdOffset + workerCount(); } void @@ -194,5 +240,22 @@ FewToOneBiQueue::clearReaderSignal(int workerId) // we got a hint; we could reposition iteration to try popping from the // workerId queue first; but it does not seem to help much and might // introduce some bias so we do not do that for now: - // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount; + // theLastPopWorker = (workerId + workerCount() - 1) % workerCount(); +} + +FewToOneBiQueue::Owner::Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity): + readersOwner(shm_new(QueueReaders)(ReaderId(id).termedBuf(), workerCount + 1)) +{ + biQueueOwners.reserve(workerCount); + for (int i = 0; i < workerCount; ++i) { + OneToOneBiQueue::Owner *const queueOwner = OneToOneBiQueue::Init(QueueId(id, i + WorkerIdOffset), maxItemSize, capacity); + biQueueOwners.push_back(queueOwner); + } +} + +FewToOneBiQueue::Owner::~Owner() +{ + for (size_t i = 0; i < biQueueOwners.size(); ++i) + delete biQueueOwners[i]; + delete readersOwner; } diff --git a/src/ipc/Queue.h b/src/ipc/Queue.h index 847b3ad9e9..99698003a5 100644 --- a/src/ipc/Queue.h +++ b/src/ipc/Queue.h @@ -9,9 +9,11 @@ #include "Array.h" #include "base/InstanceId.h" #include "ipc/AtomicWord.h" -#include "ipc/mem/Segment.h" +#include "ipc/mem/Pointer.h" #include "util.h" +#include + class String; /// State of the reading end of a queue (i.e., of the code calling pop()). @@ -45,6 +47,16 @@ public: const InstanceId id; }; +/// shared array of QueueReaders +class QueueReaders { +public: + QueueReaders(const int aCapacity); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int capacity); + + const int theCapacity; /// number of readers + QueueReader theReaders[]; /// readers +}; /** * Lockless fixed-capacity queue for a single writer and a single reader. @@ -57,12 +69,32 @@ public: * different value). 
We can add support for blocked writers if needed. */ class OneToOneUniQueue { +private: + struct Shared { + Shared(const unsigned int aMaxItemSize, const int aCapacity); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const unsigned int maxItemSize, const int capacity); + + unsigned int theIn; ///< input index, used only in push() + unsigned int theOut; ///< output index, used only in pop() + + AtomicWord theSize; ///< number of items in the queue + const unsigned int theMaxItemSize; ///< maximum item size + const int theCapacity; ///< maximum number of items, i.e. theBuffer size + + char theBuffer[]; + }; + public: // pop() and push() exceptions; TODO: use TextException instead class Full {}; class ItemTooLarge {}; - OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity); + typedef Ipc::Mem::Owner Owner; + + /// initialize shared memory + static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity); + OneToOneUniQueue(const String &id); unsigned int maxItemSize() const { return shared->theMaxItemSize; } @@ -85,33 +117,27 @@ public: void reader(QueueReader *aReader); private: - struct Shared { - Shared(const unsigned int aMaxItemSize, const int aCapacity); - - unsigned int theIn; ///< input index, used only in push() - unsigned int theOut; ///< output index, used only in pop() - - AtomicWord theSize; ///< number of items in the queue - const unsigned int theMaxItemSize; ///< maximum item size - const int theCapacity; ///< maximum number of items, i.e. theBuffer size - - char theBuffer[]; - }; - Ipc::Mem::Segment shm; ///< shared memory segment - Shared *shared; ///< pointer to shared memory + Ipc::Mem::Pointer shared; ///< pointer to shared memory QueueReader *reader_; ///< the state of the code popping from this queue }; /// Lockless fixed-capacity bidirectional queue for two processes. class OneToOneBiQueue { +private: + typedef std::auto_ptr UniQueueOwner; + public: typedef OneToOneUniQueue::Full Full; typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge; - /// Create a new shared queue. - OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity); - OneToOneBiQueue(const String &id); ///< Attach to existing shared queue. 
+ typedef std::pair Owner; + + /// initialize shared memory + static Owner *Init(const String &id, const unsigned int maxItemSize, const int capacity); + + enum Side { Side1 = 1, Side2 = 2 }; + OneToOneBiQueue(const String &id, const Side side); void readers(QueueReader *r1, QueueReader *r2); void clearReaderSignal(); @@ -121,8 +147,8 @@ public: template bool push(const Value &value) { return pushQueue->push(value); } //private: - OneToOneUniQueue *const popQueue; ///< queue to pop from for this process - OneToOneUniQueue *const pushQueue; ///< queue to push to for this process + std::auto_ptr popQueue; ///< queue to pop from for this process + std::auto_ptr pushQueue; ///< queue to push to for this process }; /** @@ -138,12 +164,24 @@ public: typedef OneToOneBiQueue::Full Full; typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge; - FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity); + class Owner { + public: + Owner(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity); + ~Owner(); + + private: + Vector biQueueOwners; + Ipc::Mem::Owner *const readersOwner; + }; + + static Owner *Init(const String &id, const int workerCount, const unsigned int maxItemSize, const int capacity); + + FewToOneBiQueue(const String &id); static OneToOneBiQueue *Attach(const String &id, const int workerId); ~FewToOneBiQueue(); bool validWorkerId(const int workerId) const; - int workerCount() const { return theWorkerCount; } + int workerCount() const { return readers->theCapacity - 1; } /// clears the reader notification received by the disker from worker void clearReaderSignal(int workerId); @@ -157,10 +195,9 @@ public: //private: XXX: make private by moving pop/push debugging into pop/push int theLastPopWorker; ///< the ID of the last worker we tried to pop() from Vector biQueues; ///< worker queues indexed by worker ID - const int theWorkerCount; ///< the total number of workers - Ipc::Mem::Segment shm; ///< shared memory segment to store the reader - QueueReader *reader; ///< the state of the code popping from all biQueues + const Ipc::Mem::Pointer readers; ///< readers array + QueueReader *const reader; ///< the state of the code popping from all biQueues enum { WorkerIdOffset = 1 }; ///< worker ID offset, always 1 for now }; @@ -221,8 +258,8 @@ bool FewToOneBiQueue::pop(int &workerId, Value &value) { // iterate all workers, starting after the one we visited last - for (int i = 0; i < theWorkerCount; ++i) { - theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount; + for (int i = 0; i < workerCount(); ++i) { + theLastPopWorker = (theLastPopWorker + 1) % workerCount(); if (biQueues[theLastPopWorker]->pop(value)) { workerId = theLastPopWorker + WorkerIdOffset; return true; diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index 1b332802be..3fbe6dc7ea 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -9,34 +9,28 @@ #include "Store.h" #include "ipc/StoreMap.h" -Ipc::StoreMap::StoreMap(const char *const aPath, const int limit, - size_t sharedSizeExtra): - cleaner(NULL), path(aPath), shm(aPath), shared(NULL) +Ipc::StoreMap::Owner * +Ipc::StoreMap::Init(const char *const path, const int limit, const size_t extrasSize) { assert(limit > 0); // we should not be created otherwise - const size_t sharedSize = Shared::MemSize(limit); - shm.create(sharedSize + sharedSizeExtra); - shared = new (shm.reserve(sharedSize)) Shared(limit); + assert(extrasSize >= 0); + Owner *const owner = shm_new(Shared)(path, 
limit, extrasSize); debugs(54, 5, HERE << "new map [" << path << "] created: " << limit); + return owner; } -Ipc::StoreMap::StoreMap(const char *const aPath): - cleaner(NULL), path(aPath), shm(aPath), shared(NULL) +Ipc::StoreMap::Owner * +Ipc::StoreMap::Init(const char *const path, const int limit) { - shm.open(); - assert(shm.mem()); - shared = reinterpret_cast(shm.mem()); - assert(shared->limit > 0); // we should not be created otherwise - // now that the limit is checked, check that nobody used our segment chunk - const size_t sharedSize = Shared::MemSize(shared->limit); - assert(shared == reinterpret_cast(shm.reserve(sharedSize))); - debugs(54, 5, HERE << "attached map [" << path << "] created: " << shared->limit); + return Init(path, limit, 0); } -void -Ipc::StoreMap::Unlink(const char *const path) +Ipc::StoreMap::StoreMap(const char *const aPath): cleaner(NULL), path(aPath), + shared(shm_old(Shared)(aPath)) { - Mem::Segment::Unlink(path); + assert(shared->limit > 0); // we should not be created otherwise + debugs(54, 5, HERE << "attached map [" << path << "] created: " << + shared->limit); } Ipc::StoreMap::Slot * @@ -312,13 +306,20 @@ Ipc::StoreMapSlot::set(const StoreEntry &from) /* Ipc::StoreMap::Shared */ -Ipc::StoreMap::Shared::Shared(const int aLimit): limit(aLimit), count(0) +Ipc::StoreMap::Shared::Shared(const int aLimit, const size_t anExtrasSize): + limit(aLimit), extrasSize(anExtrasSize), count(0) +{ +} + +size_t +Ipc::StoreMap::Shared::sharedMemorySize() const { + return SharedMemorySize(limit, extrasSize); } size_t -Ipc::StoreMap::Shared::MemSize(int limit) +Ipc::StoreMap::Shared::SharedMemorySize(const int limit, const size_t extrasSize) { - return sizeof(Shared) + limit * sizeof(Slot); + return sizeof(Shared) + limit * (sizeof(Slot) + extrasSize); } diff --git a/src/ipc/StoreMap.h b/src/ipc/StoreMap.h index 7356471954..cbd9fef306 100644 --- a/src/ipc/StoreMap.h +++ b/src/ipc/StoreMap.h @@ -2,7 +2,7 @@ #define SQUID_IPC_STORE_MAP_H #include "ipc/ReadWriteLock.h" -#include "ipc/mem/Segment.h" +#include "ipc/mem/Pointer.h" #include "typedefs.h" namespace Ipc { @@ -53,9 +53,26 @@ class StoreMap public: typedef StoreMapSlot Slot; - StoreMap(const char *const aPath, const int limit, size_t sharedSizeExtra); ///< create a new shared StoreMap - explicit StoreMap(const char *const aPath); ///< open an existing shared StoreMap - static void Unlink(const char *const path); ///< unlink shared memory segment +private: + struct Shared + { + Shared(const int aLimit, const size_t anExtrasSize); + size_t sharedMemorySize() const; + static size_t SharedMemorySize(const int limit, const size_t anExtrasSize); + + const int limit; ///< maximum number of map slots + const size_t extrasSize; ///< size of slot extra data + AtomicWord count; ///< current number of map slots + Slot slots[]; ///< slots storage + }; + +public: + typedef Mem::Owner Owner; + + /// initialize shared memory + static Owner *Init(const char *const path, const int limit); + + StoreMap(const char *const aPath); /// finds, reservers space for writing a new entry or returns nil Slot *openForWriting(const cache_key *const key, sfileno &fileno); @@ -79,7 +96,7 @@ public: void abortIo(const sfileno fileno); bool full() const; ///< there are no empty slots left - bool valid(int n) const; ///< whether n is a valid slot coordinate + bool valid(const int n) const; ///< whether n is a valid slot coordinate int entryCount() const; ///< number of used slots int entryLimit() const; ///< maximum number of slots that can be used @@ 
-89,21 +106,10 @@ public: StoreMapCleaner *cleaner; ///< notified before a readable entry is freed protected: - class Shared { - public: - static size_t MemSize(int limit); + static Owner *Init(const char *const path, const int limit, const size_t extrasSize); - Shared(const int aLimit); - - const AtomicWord limit; ///< maximum number of map slots - AtomicWord count; ///< current number of map slots - - Slot slots[]; ///< slots storage - }; - -protected: const String path; ///< cache_dir path, used for logging - Ipc::Mem::Segment shm; ///< shared memory segment + Mem::Pointer shared; private: int slotIndexByKey(const cache_key *const key) const; @@ -113,9 +119,30 @@ private: void abortWriting(const sfileno fileno); void freeIfNeeded(Slot &s); void freeLocked(Slot &s, bool keepLocked); - String sharedMemoryName(); +}; - Shared *shared; ///< pointer to shared memory +/// StoreMap with extra slot data +/// Note: ExtrasT must be POD, it is initialized with zeroes, no +/// constructors or destructors are called +template +class StoreMapWithExtras: public StoreMap +{ +public: + typedef ExtrasT Extras; + + /// initialize shared memory + static Owner *Init(const char *const path, const int limit); + + StoreMapWithExtras(const char *const path); + + /// write access to the extras; call openForWriting() first! + ExtrasT &extras(const sfileno fileno); + /// read-only access to the extras; call openForReading() first! + const ExtrasT &extras(const sfileno fileno) const; + +protected: + + ExtrasT *sharedExtras; ///< pointer to extras in shared memory }; /// API for adjusting external state when dirty map slot is being freed @@ -128,6 +155,40 @@ public: virtual void cleanReadable(const sfileno fileno) = 0; }; +// StoreMapWithExtras implementation + +template +StoreMap::Owner * +StoreMapWithExtras::Init(const char *const path, const int limit) +{ + return StoreMap::Init(path, limit, sizeof(Extras)); +} + +template +StoreMapWithExtras::StoreMapWithExtras(const char *const path): + StoreMap(path) +{ + const size_t sharedSizeWithoutExtras = + Shared::SharedMemorySize(entryLimit(), 0); + sharedExtras = reinterpret_cast(reinterpret_cast(shared.getRaw()) + sharedSizeWithoutExtras); +} + +template +ExtrasT & +StoreMapWithExtras::extras(const sfileno fileno) +{ + return const_cast(const_cast(this)->extras(fileno)); +} + +template +const ExtrasT & +StoreMapWithExtras::extras(const sfileno fileno) const +{ + assert(sharedExtras); + assert(valid(fileno)); + return sharedExtras[fileno]; +} + } // namespace Ipc diff --git a/src/ipc/mem/PagePool.cc b/src/ipc/mem/PagePool.cc index 9885daf19b..8cd9bf0283 100644 --- a/src/ipc/mem/PagePool.cc +++ b/src/ipc/mem/PagePool.cc @@ -11,91 +11,28 @@ #include "ipc/mem/PagePool.h" -static String -PageIndexId(String id) -{ - id.append("-index"); - return id; -} - - // Ipc::Mem::PagePool -Ipc::Mem::PagePool::PagePool(const String &id, const unsigned int capacity, const size_t pageSize): - pageIndex(PageIndexId(id), capacity), - shm(id.termedBuf()) -{ - const off_t sharedSize = Shared::MemSize(capacity, pageSize); - shm.create(sharedSize); - assert(shm.mem()); - shared = new (shm.reserve(sharedSize)) Shared(capacity, pageSize); -} - -Ipc::Mem::PagePool::PagePool(const String &id): - pageIndex(PageIndexId(id)), shm(id.termedBuf()) +Ipc::Mem::PagePool::Owner * +Ipc::Mem::PagePool::Init(const char *const id, const unsigned int capacity, const size_t pageSize) { - shm.open(); - shared = reinterpret_cast(shm.mem()); - assert(shared); - const off_t sharedSize = - 
Shared::MemSize(shared->theCapacity, shared->thePageSize); - assert(shared == reinterpret_cast(shm.reserve(sharedSize))); -} - -void -Ipc::Mem::PagePool::Unlink(const String &id) -{ - PageStack::Unlink(PageIndexId(id)); - Segment::Unlink(id.termedBuf()); -} - -bool -Ipc::Mem::PagePool::get(PageId &page) -{ - if (pageIndex.pop(page.number)) { - page.pool = shared->theId; - return true; - } - return false; + static uint32_t LastPagePoolId = 0; + if (++LastPagePoolId == 0) + ++LastPagePoolId; // skip zero pool id + return shm_new(PageStack)(id, LastPagePoolId, capacity, pageSize); } -void -Ipc::Mem::PagePool::put(PageId &page) +Ipc::Mem::PagePool::PagePool(const char *const id): + pageIndex(shm_old(PageStack)(id)) { - Must(pageIdIsValid(page)); - pageIndex.push(page.number); - page = PageId(); + const size_t pagesDataOffset = + pageIndex->sharedMemorySize() - capacity() * pageSize(); + theBuf = reinterpret_cast(pageIndex.getRaw()) + pagesDataOffset; } void * Ipc::Mem::PagePool::pagePointer(const PageId &page) { - Must(pageIdIsValid(page)); - return shared->theBuf + shared->thePageSize * (page.number - 1); -} - -bool -Ipc::Mem::PagePool::pageIdIsValid(const PageId &page) const -{ - return page.pool == shared->theId && - 0 < page.number && page.number <= shared->theCapacity; -} - - -// Ipc::Mem::PagePool::Shared - -static unsigned int LastPagePoolId = 0; - -Ipc::Mem::PagePool::Shared::Shared(const unsigned int aCapacity, size_t aPageSize): - theId(++LastPagePoolId), theCapacity(aCapacity), thePageSize(aPageSize) -{ - if (LastPagePoolId + 1 == 0) - ++LastPagePoolId; // skip zero pool id -} - -off_t -Ipc::Mem::PagePool::Shared::MemSize(const unsigned int capacity, const size_t pageSize) -{ - return static_cast(sizeof(Shared)) + - static_cast(pageSize) * capacity; + Must(pageIndex->pageIdIsValid(page)); + return theBuf + pageSize() * (page.number - 1); } diff --git a/src/ipc/mem/PagePool.h b/src/ipc/mem/PagePool.h index 4c4b96b6c8..2658fc4216 100644 --- a/src/ipc/mem/PagePool.h +++ b/src/ipc/mem/PagePool.h @@ -7,58 +7,38 @@ #define SQUID_IPC_MEM_PAGE_POOL_H #include "ipc/mem/PageStack.h" -#include "ipc/mem/Segment.h" +#include "ipc/mem/Pointer.h" namespace Ipc { namespace Mem { -class PageId; - /// Atomic container of shared memory pages. Implemented using a collection of /// Segments, each with a PageStack index of free pages. All pools must be /// created by a single process. 
class PagePool { public: - /// creates a new shared page pool that can hold up to capacity pages of pageSize size - PagePool(const String &id, const unsigned int capacity, const size_t pageSize); - /// attaches to the identified shared page pool - PagePool(const String &id); - /// unlinks shared memory segments - static void Unlink(const String &id); + typedef Ipc::Mem::Owner Owner; + + static Owner *Init(const char *const id, const unsigned int capacity, const size_t pageSize); - unsigned int capacity() const { return shared->theCapacity; } + PagePool(const char *const id); + + unsigned int capacity() const { return pageIndex->capacity(); } + size_t pageSize() const { return pageIndex->pageSize(); } /// lower bound for the number of free pages - unsigned int size() const { return pageIndex.size(); } - size_t pageSize() const { return shared->thePageSize; } + unsigned int size() const { return pageIndex->size(); } /// sets page ID and returns true unless no free pages are found - bool get(PageId &page); + bool get(PageId &page) { return pageIndex->pop(page); } /// makes identified page available as a free page to future get() callers - void put(PageId &page); + void put(PageId &page) { return pageIndex->push(page); } /// converts page handler into a temporary writeable shared memory pointer void *pagePointer(const PageId &page); private: - inline bool pageIdIsValid(const PageId &page) const; - - struct Shared { - Shared(const unsigned int aCapacity, const size_t aPageSize); - - /// total shared memory size required to share - static off_t MemSize(const unsigned int capacity, const size_t pageSize); - - const unsigned int theId; ///< pool id - const unsigned int theCapacity; ///< number of pages in the pool - const size_t thePageSize; ///< page size - - // TODO: add padding to make pages system page-aligned? 
- char theBuf[]; ///< pages storage - }; - - PageStack pageIndex; ///< free pages index - Segment shm; ///< shared memory segment to store metadata (and pages) - Shared *shared; ///< our metadata and page storage, shared among all pool users + Ipc::Mem::Pointer pageIndex; ///< free pages index + char *theBuf; ///< pages storage }; } // namespace Mem diff --git a/src/ipc/mem/PageStack.cc b/src/ipc/mem/PageStack.cc index 9899fc36d7..fca378b72b 100644 --- a/src/ipc/mem/PageStack.cc +++ b/src/ipc/mem/PageStack.cc @@ -8,34 +8,21 @@ #include "config.h" #include "base/TextException.h" +#include "ipc/mem/Page.h" #include "ipc/mem/PageStack.h" /// used to mark a stack slot available for storing free page offsets const Ipc::Mem::PageStack::Value Writable = 0; -Ipc::Mem::PageStack::PageStack(const String &id, const unsigned int capacity): - shm(id.termedBuf()) -{ - const size_t sharedSize = Shared::MemSize(capacity); - shm.create(sharedSize); - assert(shm.mem()); - shared = new (shm.reserve(sharedSize)) Shared(capacity); -} - -Ipc::Mem::PageStack::PageStack(const String &id): shm(id.termedBuf()) -{ - shm.open(); - shared = reinterpret_cast(shm.mem()); - assert(shared); - const off_t sharedSize = Shared::MemSize(shared->theCapacity); - assert(shared == reinterpret_cast(shm.reserve(sharedSize))); -} - -void -Ipc::Mem::PageStack::Unlink(const String &id) +Ipc::Mem::PageStack::PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize): + thePoolId(aPoolId), theCapacity(aCapacity), thePageSize(aPageSize), + theSize(theCapacity), + theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable)) { - Segment::Unlink(id.termedBuf()); + // initially, all pages are free + for (Offset i = 0; i < theSize; ++i) + theItems[i] = i + 1; // skip page number zero to keep numbers positive } /* @@ -46,56 +33,58 @@ Ipc::Mem::PageStack::Unlink(const String &id) * approach is better? Same for push(). */ bool -Ipc::Mem::PageStack::pop(Value &value) +Ipc::Mem::PageStack::pop(PageId &page) { // we may fail to dequeue, but be conservative to prevent long searches - --shared->theSize; + --theSize; // find a Readable slot, starting with theLastReadable and going left - while (shared->theSize >= 0) { - const Offset idx = shared->theLastReadable; + while (theSize >= 0) { + const Offset idx = theLastReadable; // mark the slot at ids Writable while extracting its current value - value = shared->theItems[idx].fetchAndAnd(0); // works if Writable is 0 + const Value value = theItems[idx].fetchAndAnd(0); // works if Writable is 0 const bool popped = value != Writable; // theItems[idx] is probably not Readable [any more] // Whether we popped a Readable value or not, we should try going left // to maintain the index (and make progress). // We may fail if others already updated the index, but that is OK. 
- shared->theLastReadable.swap_if(idx, shared->prev(idx)); // may fail or lie + theLastReadable.swap_if(idx, prev(idx)); // may fail or lie if (popped) { // the slot we emptied may already be filled, but that is OK - shared->theFirstWritable = idx; // may lie + theFirstWritable = idx; // may lie + page.pool = thePoolId; + page.number = value; return true; } // TODO: report suspiciously long loops } - ++shared->theSize; + ++theSize; return false; } void -Ipc::Mem::PageStack::push(const Value value) +Ipc::Mem::PageStack::push(PageId &page) { - Must(value != Writable); - Must(static_cast(value) <= shared->theCapacity); + Must(pageIdIsValid(page)); // find a Writable slot, starting with theFirstWritable and going right - while (shared->theSize < shared->theCapacity) { - const Offset idx = shared->theFirstWritable; - const bool pushed = shared->theItems[idx].swap_if(Writable, value); + while (theSize < theCapacity) { + const Offset idx = theFirstWritable; + const bool pushed = theItems[idx].swap_if(Writable, page.number); // theItems[idx] is probably not Writable [any more]; - // Whether we pushed the value or not, we should try going right + // Whether we pushed the page number or not, we should try going right // to maintain the index (and make progress). // We may fail if others already updated the index, but that is OK. - shared->theFirstWritable.swap_if(idx, shared->next(idx)); // may fail or lie + theFirstWritable.swap_if(idx, next(idx)); // may fail or lie if (pushed) { // the enqueued value may already by gone, but that is OK - shared->theLastReadable = idx; // may lie - ++shared->theSize; + theLastReadable = idx; // may lie + ++theSize; + page = PageId(); return; } // TODO: report suspiciously long loops @@ -103,17 +92,21 @@ Ipc::Mem::PageStack::push(const Value value) Must(false); // the number of pages cannot exceed theCapacity } -Ipc::Mem::PageStack::Shared::Shared(const unsigned int aCapacity): - theCapacity(aCapacity), theSize(theCapacity), - theLastReadable(prev(theSize)), theFirstWritable(next(theLastReadable)) +bool +Ipc::Mem::PageStack::pageIdIsValid(const PageId &page) const { - // initially, all pages are free - for (Offset i = 0; i < theSize; ++i) - theItems[i] = i + 1; // skip page number zero to keep numbers positive + return page.pool == thePoolId && page.number != Writable && + page.number <= capacity(); +} + +size_t +Ipc::Mem::PageStack::sharedMemorySize() const +{ + return SharedMemorySize(thePoolId, theCapacity, thePageSize); } size_t -Ipc::Mem::PageStack::Shared::MemSize(const unsigned int capacity) +Ipc::Mem::PageStack::SharedMemorySize(const uint32_t, const unsigned int capacity, const size_t pageSize) { - return sizeof(Item) * capacity + sizeof(Shared); + return sizeof(PageStack) + capacity * (sizeof(Item) + pageSize); } diff --git a/src/ipc/mem/PageStack.h b/src/ipc/mem/PageStack.h index 329c425bcc..8c571702c9 100644 --- a/src/ipc/mem/PageStack.h +++ b/src/ipc/mem/PageStack.h @@ -7,12 +7,13 @@ #define SQUID_IPC_MEM_PAGE_STACK_H #include "ipc/AtomicWord.h" -#include "ipc/mem/Segment.h" namespace Ipc { namespace Mem { +class PageId; + /// Atomic container of "free" page numbers inside a single SharedMemory space. /// Assumptions: all page numbers are unique, positive, have an known maximum, /// and can be temporary unavailable as long as they are never trully lost. 
@@ -20,50 +21,45 @@ class PageStack { public: typedef uint32_t Value; ///< stack item type (a free page number) - /// creates a new shared stack that can hold up to capacity items - PageStack(const String &id, const unsigned int capacity); - /// attaches to the identified shared stack - PageStack(const String &id); - /// unlinks shared memory segment - static void Unlink(const String &id); + PageStack(const uint32_t aPoolId, const unsigned int aCapacity, const size_t aPageSize); + unsigned int capacity() const { return theCapacity; } + size_t pageSize() const { return thePageSize; } /// lower bound for the number of free pages - unsigned int size() const { return max(0, shared->theSize.get()); } + unsigned int size() const { return max(0, theSize.get()); } /// sets value and returns true unless no free page numbers are found - bool pop(Value &value); + bool pop(PageId &page); /// makes value available as a free page number to future pop() callers - void push(const Value value); + void push(PageId &page); + + bool pageIdIsValid(const PageId &page) const; + + /// total shared memory size required to share + static size_t SharedMemorySize(const uint32_t aPoolId, const unsigned int capacity, const size_t pageSize); + size_t sharedMemorySize() const; private: /// stack index and size type (may temporary go negative) typedef int Offset; - struct Shared { - Shared(const unsigned int aCapacity); - - /// total shared memory size required to share - static size_t MemSize(const unsigned int capacity); - - // these help iterate the stack in search of a free spot or a page - Offset next(const Offset idx) const { return (idx + 1) % theCapacity; } - Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; } - - const Offset theCapacity; ///< stack capacity, i.e. theItems size - /// lower bound for the number of free pages (may get negative!) - AtomicWordT theSize; + // these help iterate the stack in search of a free spot or a page + Offset next(const Offset idx) const { return (idx + 1) % theCapacity; } + Offset prev(const Offset idx) const { return (theCapacity + idx - 1) % theCapacity; } - /// last readable item index; just a hint, not a guarantee - AtomicWordT theLastReadable; - /// first writable item index; just a hint, not a guarantee - AtomicWordT theFirstWritable; + const uint32_t thePoolId; ///< pool ID + const Offset theCapacity; ///< stack capacity, i.e. theItems size + const size_t thePageSize; ///< page size, used to calculate shared memory size + /// lower bound for the number of free pages (may get negative!) 
+ AtomicWordT theSize; - typedef AtomicWordT Item; - Item theItems[]; ///< page number storage - }; + /// last readable item index; just a hint, not a guarantee + AtomicWordT theLastReadable; + /// first writable item index; just a hint, not a guarantee + AtomicWordT theFirstWritable; - Segment shm; ///< shared memory segment to store metadata (and pages) - Shared *shared; ///< our metadata, shared among all stack users + typedef AtomicWordT Item; + Item theItems[]; ///< page number storage }; } // namespace Mem diff --git a/src/ipc/mem/Pages.cc b/src/ipc/mem/Pages.cc index 7f113bee04..c828c4e7f9 100644 --- a/src/ipc/mem/Pages.cc +++ b/src/ipc/mem/Pages.cc @@ -68,8 +68,12 @@ class SharedMemPagesRr: public RegisteredRunner { public: /* RegisteredRunner API */ + SharedMemPagesRr(): owner(NULL) {} virtual void run(const RunnerRegistry &); virtual ~SharedMemPagesRr(); + +private: + Ipc::Mem::PagePool::Owner *owner; }; RunnerRegistrationEntry(rrAfterConfig, SharedMemPagesRr); @@ -94,13 +98,14 @@ void SharedMemPagesRr::run(const RunnerRegistry &) return; } - Must(!ThePagePool); if (IamMasterProcess()) { + Must(!owner); const size_t capacity = Ipc::Mem::Limit() / Ipc::Mem::PageSize(); - ThePagePool = - new Ipc::Mem::PagePool(PagePoolId, capacity, Ipc::Mem::PageSize()); - } else - ThePagePool = new Ipc::Mem::PagePool(PagePoolId); + owner = Ipc::Mem::PagePool::Init(PagePoolId, capacity, Ipc::Mem::PageSize()); + } + + Must(!ThePagePool); + ThePagePool = new Ipc::Mem::PagePool(PagePoolId); } SharedMemPagesRr::~SharedMemPagesRr() @@ -110,6 +115,5 @@ SharedMemPagesRr::~SharedMemPagesRr() delete ThePagePool; ThePagePool = NULL; - if (IamMasterProcess()) - Ipc::Mem::PagePool::Unlink(PagePoolId); + delete owner; } diff --git a/src/ipc/mem/Pointer.h b/src/ipc/mem/Pointer.h new file mode 100644 index 0000000000..3726b97807 --- /dev/null +++ b/src/ipc/mem/Pointer.h @@ -0,0 +1,166 @@ +/* + * $Id$ + * + */ + +#ifndef SQUID_IPC_MEM_POINTER_H +#define SQUID_IPC_MEM_POINTER_H + +#include "base/TextException.h" +#include "ipc/mem/Segment.h" +#include "RefCount.h" + +namespace Ipc { + +namespace Mem { + +/// allocates/deallocates shared memory; creates and later destroys a +/// Class object using that memory +template +class Owner +{ +public: + static Owner *New(const char *const id); + template + static Owner *New(const char *const id, const P1 &p1); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2); + template + static Owner *New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3); + + ~Owner(); + +private: + Owner(const char *const id, const off_t sharedSize); + + // not implemented + Owner(const Owner &); + Owner &operator =(const Owner &); + + Segment theSegment; ///< shared memory segment that holds the object + Class *theObject; ///< shared object +}; + +template class Pointer; + +/// attaches to a shared memory segment with Class object owned by Owner +template +class Object: public RefCountable +{ +public: + static Pointer Old(const char *const id); + +private: + explicit Object(const char *const id); + + // not implemented + Object(const Object &); + Object &operator =(const Object &); + + Segment theSegment; ///< shared memory segment that holds the object + Class *theObject; ///< shared object + + friend class Pointer; +}; + +/// uses a refcounted pointer to Object as a parent, but +/// translates its API to return raw Class pointers +template +class Pointer: public RefCount< Object > +{ +private: + typedef RefCount< Object > Base; + +public: + explicit 
Pointer(Object *const anObject): Base(anObject) {} + + Class *operator ->() const { return Base::operator ->()->theObject; } + Class &operator *() const { return *Base::operator *().theObject; } + Class *const getRaw() const { return Base::getRaw()->theObject; } + Class *getRaw() { return Base::getRaw()->theObject; } +}; + +// Owner implementation + +template +Owner::Owner(const char *const id, const off_t sharedSize): + theSegment(id), theObject(NULL) +{ + theSegment.create(sharedSize); + Must(theSegment.mem()); +} + +template +Owner::~Owner() +{ + if (theObject) + theObject->~Class(); +} + +template +Owner * +Owner::New(const char *const id) +{ + const off_t sharedSize = Class::SharedMemorySize(); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class; + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1) +{ + const off_t sharedSize = Class::SharedMemorySize(p1); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1); + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2); + return owner; +} + +template template +Owner * +Owner::New(const char *const id, const P1 &p1, const P2 &p2, const P3 &p3) +{ + const off_t sharedSize = Class::SharedMemorySize(p1, p2, p3); + Owner *const owner = new Owner(id, sharedSize); + owner->theObject = new (owner->theSegment.reserve(sharedSize)) Class(p1, p2, p3); + return owner; +} + +// Object implementation + +template +Object::Object(const char *const id): theSegment(id) +{ + theSegment.open(); + Must(theSegment.mem()); + theObject = reinterpret_cast(theSegment.mem()); + Must(static_cast(theObject->sharedMemorySize()) == theSegment.size()); +} + +template +Pointer +Object::Old(const char *const id) +{ + return Pointer(new Object(id)); +} + +// convenience macros for creating shared objects +#define shm_new(Class) Ipc::Mem::Owner::New +#define shm_old(Class) Ipc::Mem::Object::Old + +} // namespace Mem + +} // namespace Ipc + +#endif /* SQUID_IPC_MEM_POINTER_H */ diff --git a/src/ipc/mem/Segment.cc b/src/ipc/mem/Segment.cc index b3e80d0598..77be28fb53 100644 --- a/src/ipc/mem/Segment.cc +++ b/src/ipc/mem/Segment.cc @@ -18,7 +18,7 @@ Ipc::Mem::Segment::Segment(const char *const id): theName(GenerateName(id)), theFD(-1), theMem(NULL), - theSize(0), theReserved(0) + theSize(0), theReserved(0), doUnlink(false) { } @@ -28,6 +28,8 @@ Ipc::Mem::Segment::~Segment() { if (close(theFD) != 0) debugs(54, 5, HERE << "close " << theName << ": " << xstrerror()); } + if (doUnlink) + unlink(); } void @@ -52,6 +54,7 @@ Ipc::Mem::Segment::create(const off_t aSize) theSize = aSize; theReserved = 0; + doUnlink = true; debugs(54, 3, HERE << "created " << theName << " segment: " << theSize); @@ -112,6 +115,15 @@ Ipc::Mem::Segment::detach() theMem = 0; } +void +Ipc::Mem::Segment::unlink() +{ + if (shm_unlink(theName.termedBuf()) != 0) + debugs(54, 5, HERE << "shm_unlink(" << theName << "): " << xstrerror()); + else + debugs(54, 3, HERE << "unlinked " << theName << " segment"); +} + void * Ipc::Mem::Segment::reserve(size_t chunkSize) { @@ -125,14 +137,6 @@ Ipc::Mem::Segment::reserve(size_t chunkSize) return result; } -void -Ipc::Mem::Segment::Unlink(const char 
*const id) -{ - const String path = GenerateName(id); - if (shm_unlink(path.termedBuf()) != 0) - debugs(54, 5, HERE << "shm_unlink(" << path << "): " << xstrerror()); -} - /// determines the size of the underlying "file" off_t Ipc::Mem::Segment::statSize(const char *context) const diff --git a/src/ipc/mem/Segment.h b/src/ipc/mem/Segment.h index 9712144ea1..b45a99fba7 100644 --- a/src/ipc/mem/Segment.h +++ b/src/ipc/mem/Segment.h @@ -20,7 +20,7 @@ public: ~Segment(); /// Create a new shared memory segment. Fails if a segment with - /// the same name already exists. + /// the same name already exists. Unlinks the segment on destruction. void create(const off_t aSize); void open(); ///< Open an existing shared memory segment. @@ -29,20 +29,25 @@ public: void *mem() { return reserve(0); } ///< pointer to the next chunk void *reserve(size_t chunkSize); ///< reserve and return the next chunk - static void Unlink(const char *const id); ///< unlink the segment private: void attach(); void detach(); + void unlink(); ///< unlink the segment off_t statSize(const char *context) const; static String GenerateName(const char *id); + // not implemented + Segment(const Segment &); + Segment &operator =(const Segment &); + const String theName; ///< shared memory segment file name int theFD; ///< shared memory segment file descriptor void *theMem; ///< pointer to mmapped shared memory segment off_t theSize; ///< shared memory segment size off_t theReserved; ///< the total number of reserve()d bytes + bool doUnlink; ///< whether the segment should be unlinked on destruction }; } // namespace Mem diff --git a/src/main.cc b/src/main.cc index b9aa4ba847..248fa6b3b1 100644 --- a/src/main.cc +++ b/src/main.cc @@ -76,7 +76,6 @@ #include "MemPool.h" #include "icmp/IcmpSquid.h" #include "icmp/net_db.h" -#include "fs/rock/RockDirMap.h" #if USE_LOADABLE_MODULES #include "LoadableModules.h"