CBDATA_CLASS_INIT(IpcIoFile);
IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
-IpcIoFile::IpcIoFiles IpcIoFile::ipcIoFiles;
+const double IpcIoFile::Timeout = 7;
+IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
+IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
IpcIoFile::IpcIoFile(char const *aDb):
- dbName(aDb),
- diskId(-1),
- workerQueue(NULL),
- error_(false),
- lastRequestId(0),
- olderRequests(&requestMap1),
- newerRequests(&requestMap2),
+ dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
+ olderRequests(&requestMap1), newerRequests(&requestMap2),
timeoutCheckScheduled(false)
{
}
IpcIoFile::~IpcIoFile()
{
+ if (diskId >= 0) {
+ const IpcIoFilesMap::iterator i = IpcIoFiles.find(diskId);
+ // XXX: warn and continue?
+ Must(i != IpcIoFiles.end());
+ Must(i->second == this);
+ IpcIoFiles.erase(i);
+ }
}
void
return;
// XXX: make capacity configurable
- diskerQueue = new DiskerQueue(dbName.termedBuf(), Config.workers, 1024);
- ipcIoFiles.insert(std::make_pair(KidIdentifier, this));
+ diskerQueue =
+ new DiskerQueue(dbName, Config.workers, sizeof(IpcIoMsg), 1024);
+ diskId = KidIdentifier;
+ const bool inserted =
+ IpcIoFiles.insert(std::make_pair(diskId, this)).second;
+ Must(inserted);
Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
ann.strand.tag = dbName;
Ipc::StrandSearchRequest request;
request.requestorId = KidIdentifier;
- request.data = this;
request.tag = dbName;
Ipc::TypedMsgHdr msg;
request.pack(msg);
Ipc::SendMessage(Ipc::coordinatorAddr, msg);
- IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
- trackPendingRequest(*pending);
+ WaitingForOpen.push_back(this);
+
+ eventAdd("IpcIoFile::OpenTimeout", &IpcIoFile::OpenTimeout,
+ this, Timeout, 0, false); // "this" pointer is used as id
}
void
Must(diskId < 0); // we do not know our disker yet
Must(!workerQueue);
- // contain single pending open request at this time
- newerRequests->clear();
- olderRequests->clear();
-
if (!response) {
debugs(79,1, HERE << "error: timeout");
error_ = true;
} else {
diskId = response->strand.kidId;
if (diskId >= 0) {
- workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier - 1);
- ipcIoFiles.insert(std::make_pair(diskId, this));
+ workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
+ const bool inserted =
+ IpcIoFiles.insert(std::make_pair(diskId, this)).second;
+ Must(inserted);
} else {
error_ = true;
debugs(79,1, HERE << "error: no disker claimed " << dbName);
void
IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
{
- newerRequests->insert(std::make_pair(pending.id, &pending));
+ newerRequests->insert(std::make_pair(lastRequestId, &pending));
if (!timeoutCheckScheduled)
scheduleTimeoutCheck();
}
Must(pending.readRequest || pending.writeRequest);
IpcIoMsg ipcIo;
- ipcIo.requestId = pending.id;
+ ipcIo.requestId = lastRequestId;
if (pending.readRequest) {
ipcIo.command = IpcIo::cmdRead;
ipcIo.offset = pending.readRequest->offset;
IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
debugs(47, 7, HERE << "coordinator response to open request");
- Must(response.data);
- reinterpret_cast<IpcIoFile*>(response.data)->openCompleted(&response);
+ for (IpcIoFileList::iterator i = WaitingForOpen.begin();
+ i != WaitingForOpen.end(); ++i) {
+ if (response.strand.tag == (*i)->dbName) {
+ (*i)->openCompleted(&response);
+ WaitingForOpen.erase(i);
+ return;
+ }
+ }
+
+ debugs(47, 4, HERE << "LATE disker response to open for " <<
+ response.strand.tag);
+ // nothing we can do about it; completeIo() has been called already
}
void
DiskerHandleRequests();
else {
const int diskId = msg.getInt();
- const IpcIoFiles::const_iterator i = ipcIoFiles.find(diskId);
- Must(i != ipcIoFiles.end()); // XXX: warn and continue?
+ const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+ Must(i != IpcIoFiles.end()); // XXX: warn and continue?
i->second->handleResponses();
}
}
+/// handles open request timeout
+void
+IpcIoFile::OpenTimeout(void *const param)
+{
+ Must(param);
+ // the pointer is used for comparison only and not dereferenced
+ const IpcIoFile *const ipcIoFile =
+ reinterpret_cast<const IpcIoFile *>(param);
+ for (IpcIoFileList::iterator i = WaitingForOpen.begin();
+ i != WaitingForOpen.end(); ++i) {
+ if (*i == ipcIoFile) {
+ (*i)->openCompleted(NULL);
+ WaitingForOpen.erase(i);
+ break;
+ }
+ }
+}
+
/// IpcIoFile::checkTimeouts wrapper
void
IpcIoFile::CheckTimeouts(void *const param)
{
Must(param);
- reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
+ const int diskId = reinterpret_cast<uintptr_t>(param);
+ debugs(47, 7, HERE << "diskId=" << diskId);
+ const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
+ if (i != IpcIoFiles.end())
+ i->second->checkTimeouts();
}
void
for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
IpcIoPendingRequest *const pending = i->second;
- const int requestId = i->first;
+ const unsigned int requestId = i->first;
debugs(47, 7, HERE << "disker timeout; ipcIo" <<
- KidIdentifier << '.' << diskId << '.' << requestId);
+ KidIdentifier << '.' << requestId);
pending->completeIo(NULL); // no response
delete pending; // XXX: leaking if throwing
void
IpcIoFile::scheduleTimeoutCheck()
{
- // we check all older requests at once so some may be wait for 2*timeout
- const double timeout = 7; // in seconds
+ // we check all older requests at once so some may be wait for 2*Timeout
eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
- this, timeout, 0, false);
+ reinterpret_cast<void *>(diskId), Timeout, 0, false);
timeoutCheckScheduled = true;
}
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
file(aFile), readRequest(NULL), writeRequest(NULL)
{
+ Must(file != NULL);
if (++file->lastRequestId == 0) // don't use zero value as requestId
++file->lastRequestId;
- id = file->lastRequestId;
}
void
IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
{
- Must(file != NULL);
-
if (readRequest)
file->readCompleted(readRequest, response);
else
#include "DiskIO/IORequestor.h"
#include "ipc/forward.h"
#include "ipc/Queue.h"
+#include <list>
#include <map>
// TODO: expand to all classes
static void Notify(const int peerId);
+ static void OpenTimeout(void *const param);
static void CheckTimeouts(void *const param);
void checkTimeouts();
void scheduleTimeoutCheck();
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
private:
- typedef FewToOneBiQueue<IpcIoMsg> DiskerQueue;
- typedef OneToOneBiQueue<IpcIoMsg> WorkerQueue;
+ typedef FewToOneBiQueue DiskerQueue;
+ typedef OneToOneBiQueue WorkerQueue;
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
RequestMap *newerRequests; ///< newer requests (map2 or map1)
bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
- typedef std::map<int, IpcIoFile*> IpcIoFiles; ///< maps diskerId to IpcIoFile
- static IpcIoFiles ipcIoFiles;
+ static const double Timeout; ///< timeout value in seconds
+
+ typedef std::list<Pointer> IpcIoFileList;
+ static IpcIoFileList WaitingForOpen; ///< pending open requests
+
+ ///< maps diskerId to IpcIoFile, cleared in destructor
+ typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
+ static IpcIoFilesMap IpcIoFiles;
CBDATA_CLASS2(IpcIoFile);
};
void completeIo(const IpcIoMsg *const response);
public:
- IpcIoFile::Pointer file; ///< the file object waiting for the response
- unsigned int id; ///< request id
+ const IpcIoFile::Pointer file; ///< the file object waiting for the response
ReadRequest *readRequest; ///< set if this is a read requests
WriteRequest *writeRequest; ///< set if this is a write request
$(BUILT_SOURCES)
squid_LDADD = \
+ $(DISK_LIBS) \
$(COMMON_LIBS) \
comm/libcomm.la \
eui/libeui.la \
$(XTRA_OBJS) \
$(DISK_LINKOBJS) \
$(REPL_OBJS) \
- $(DISK_LIBS) \
$(DISK_OS_LIBS) \
$(CRYPTLIB) \
$(REGEXLIB) \
{
debugs(54, 3, HERE << "tell kid" << request.requestorId << " that " <<
request.tag << " is kid" << strand.kidId);
- const StrandSearchResponse response(request.data, strand);
+ const StrandSearchResponse response(strand);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
Kids.cc \
Kids.h \
Messages.h \
+ Queue.cc \
Queue.h \
StartListening.cc \
StartListening.h \
#include "Array.h"
#include "ipc/AtomicWord.h"
#include "ipc/SharedMemory.h"
-#include "SquidString.h"
#include "util.h"
-/// Lockless fixed-capacity queue for a single writer and a single
-/// reader. Does not manage shared memory segment.
-template <class Value>
+class String;
+
+/// Lockless fixed-capacity queue for a single writer and a single reader.
class OneToOneUniQueue {
public:
- OneToOneUniQueue(const int aCapacity);
+ class ItemTooLarge {};
+
+ OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
+ OneToOneUniQueue(const String &id);
- int size() const { return theSize; }
- int capacity() const { return theCapacity; }
+ unsigned int maxItemSize() const { return shared->theMaxItemSize; }
+ int size() const { return shared->theSize; }
+ int capacity() const { return shared->theCapacity; }
- bool empty() const { return !theSize; }
- bool full() const { return theSize == theCapacity; }
+ bool empty() const { return !shared->theSize; }
+ bool full() const { return shared->theSize == shared->theCapacity; }
+ static int Bytes2Items(const unsigned int maxItemSize, int size);
+ static int Items2Bytes(const unsigned int maxItemSize, const int size);
+
+ template <class Value>
bool pop(Value &value); ///< returns false iff the queue is empty
+ template <class Value>
bool push(const Value &value); ///< returns false iff the queue is full
- static int Bytes2Items(int size);
- static int Items2Bytes(const int size);
-
private:
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
+ struct Shared {
+ Shared(const unsigned int aMaxItemSize, const int aCapacity);
- AtomicWord theSize; ///< number of items in the queue
- const int theCapacity; ///< maximum number of items, i.e. theBuffer size
- Value theBuffer[];
+ unsigned int theIn; ///< input index, used only in push()
+ unsigned int theOut; ///< output index, used only in pop()
+
+ AtomicWord theSize; ///< number of items in the queue
+ const unsigned int theMaxItemSize; ///< maximum item size
+ const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+
+ char theBuffer[];
+ };
+
+ SharedMemory shm; ///< shared memory segment
+ Shared *shared; ///< pointer to shared memory
};
/// Lockless fixed-capacity bidirectional queue for two processes.
-/// Manages shared memory segment.
-template <class Value>
class OneToOneBiQueue {
public:
- typedef OneToOneUniQueue<Value> UniQueue;
-
/// Create a new shared queue.
- OneToOneBiQueue(const char *const id, const int aCapacity);
- OneToOneBiQueue(const char *const id); ///< Attach to existing shared queue.
+ OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
+ OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
int pushedSize() const { return pushQueue->size(); }
+ template <class Value>
bool pop(Value &value) { return popQueue->pop(value); }
+ template <class Value>
bool push(const Value &value) { return pushQueue->push(value); }
private:
- SharedMemory shm; ///< shared memory segment
- UniQueue *popQueue; ///< queue to pop from for this process
- UniQueue *pushQueue; ///< queue to push to for this process
+ OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
+ OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
};
/**
* pricesses. Implements a star topology: Many worker processes
* communicate with the one central process. The central process uses
* FewToOneBiQueue object, while workers use OneToOneBiQueue objects
- * created with the Attach() method. Each worker has a unique integer ID
- * in [0, workerCount) range.
+ * created with the Attach() method. Each worker has a unique integer
+ * ID in [0, workerCount) range.
*/
-template <class Value>
class FewToOneBiQueue {
public:
- typedef OneToOneBiQueue<Value> BiQueue;
-
- FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity);
- static BiQueue *Attach(const char *const id, const int workerId);
+ FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
+ static OneToOneBiQueue *Attach(const String &id, const int workerId);
~FewToOneBiQueue();
bool validWorkerId(const int workerId) const;
int workerCount() const { return theWorkerCount; }
int pushedSize(const int workerId) const;
+ template <class Value>
bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+ template <class Value>
bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
private:
- static String BiQueueId(String id, const int workerId);
-
int theLastPopWorkerId; ///< the last worker ID we pop()ed from
- Vector<BiQueue *> biQueues; ///< worker queues
+ Vector<OneToOneBiQueue *> biQueues; ///< worker queues
const int theWorkerCount; ///< number of worker processes
- const int theCapacity; ///< per-worker capacity
};
// OneToOneUniQueue
-template <class Value>
-OneToOneUniQueue<Value>::OneToOneUniQueue(const int aCapacity):
- theIn(0), theOut(0), theSize(0), theCapacity(aCapacity)
-{
- assert(theCapacity > 0);
-}
-
template <class Value>
bool
-OneToOneUniQueue<Value>::pop(Value &value)
+OneToOneUniQueue::pop(Value &value)
{
+ if (sizeof(value) > shared->theMaxItemSize)
+ throw ItemTooLarge();
+
if (empty())
return false;
- const unsigned int pos = theOut++ % theCapacity;
- value = theBuffer[pos];
- --theSize;
+ const unsigned int pos =
+ shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
+ memcpy(&value, shared->theBuffer + pos, sizeof(value));
+ --shared->theSize;
return true;
}
template <class Value>
bool
-OneToOneUniQueue<Value>::push(const Value &value)
+OneToOneUniQueue::push(const Value &value)
{
+ if (sizeof(value) > shared->theMaxItemSize)
+ throw ItemTooLarge();
+
if (full())
return false;
- const unsigned int pos = theIn++ % theCapacity;
- theBuffer[pos] = value;
- ++theSize;
+ const unsigned int pos =
+ shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
+ memcpy(shared->theBuffer + pos, &value, sizeof(value));
+ ++shared->theSize;
return true;
}
-template <class Value>
-int
-OneToOneUniQueue<Value>::Bytes2Items(int size)
-{
- assert(size >= 0);
- size -= sizeof(OneToOneUniQueue);
- return size >= 0 ? size / sizeof(Value) : 0;
-}
-
-template <class Value>
-int
-OneToOneUniQueue<Value>::Items2Bytes(const int size)
-{
- assert(size >= 0);
- return sizeof(OneToOneUniQueue) + sizeof(Value) * size;
-}
-
-
-// OneToOneBiQueue
-
-template <class Value>
-OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id, const int capacity) :
- shm(id)
-{
- const int uniSize = UniQueue::Items2Bytes(capacity);
- shm.create(uniSize * 2);
- char *const mem = reinterpret_cast<char *>(shm.mem());
- assert(mem);
- popQueue = new (mem) UniQueue(capacity);
- pushQueue = new (mem + uniSize) UniQueue(capacity);
-}
-
-template <class Value>
-OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id) :
- shm(id)
-{
- shm.open();
- char *const mem = reinterpret_cast<char *>(shm.mem());
- assert(mem);
- pushQueue = reinterpret_cast<UniQueue *>(mem);
- const int uniSize = pushQueue->Items2Bytes(pushQueue->capacity());
- popQueue = reinterpret_cast<UniQueue *>(mem + uniSize);
-}
// FewToOneBiQueue
-template <class Value>
-FewToOneBiQueue<Value>::FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity):
- theLastPopWorkerId(-1), theWorkerCount(aWorkerCount),
- theCapacity(aCapacity)
-{
- biQueues.reserve(theWorkerCount);
- for (int i = 0; i < theWorkerCount; ++i) {
- const String biQueueId = BiQueueId(id, i);
- biQueues.push_back(new BiQueue(biQueueId.termedBuf(), theCapacity));
- }
-}
-
-template <class Value>
-typename FewToOneBiQueue<Value>::BiQueue *
-FewToOneBiQueue<Value>::Attach(const char *const id, const int workerId)
-{
- return new BiQueue(BiQueueId(id, workerId).termedBuf());
-}
-
-template <class Value>
-FewToOneBiQueue<Value>::~FewToOneBiQueue()
-{
- for (int i = 0; i < theWorkerCount; ++i)
- delete biQueues[i];
-}
-
-template <class Value>
-bool FewToOneBiQueue<Value>::validWorkerId(const int workerId) const
-{
- return 0 <= workerId && workerId < theWorkerCount;
-}
-
-template <class Value>
-int FewToOneBiQueue<Value>::pushedSize(const int workerId) const
-{
- assert(validWorkerId(workerId));
- return biQueues[workerId]->pushedSize();
-}
-
template <class Value>
bool
-FewToOneBiQueue<Value>::pop(int &workerId, Value &value)
+FewToOneBiQueue::pop(int &workerId, Value &value)
{
++theLastPopWorkerId;
for (int i = 0; i < theWorkerCount; ++i) {
template <class Value>
bool
-FewToOneBiQueue<Value>::push(const int workerId, const Value &value)
+FewToOneBiQueue::push(const int workerId, const Value &value)
{
assert(validWorkerId(workerId));
return biQueues[workerId]->push(value);
}
-template <class Value>
-String
-FewToOneBiQueue<Value>::BiQueueId(String id, const int workerId)
-{
- id.append("__");
- id.append(xitoa(workerId));
- return id;
-}
-
#endif // SQUID_IPC_QUEUE_H
#include "ipc/TypedMsgHdr.h"
-Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1), data(0)
+Ipc::StrandSearchRequest::StrandSearchRequest(): requestorId(-1)
{
}
Ipc::StrandSearchRequest::StrandSearchRequest(const TypedMsgHdr &hdrMsg):
- requestorId(-1), data(NULL)
+ requestorId(-1)
{
hdrMsg.checkType(mtStrandSearchRequest);
hdrMsg.getPod(requestorId);
- hdrMsg.getPod(data);
hdrMsg.getString(tag);
}
{
hdrMsg.setType(mtStrandSearchRequest);
hdrMsg.putPod(requestorId);
- hdrMsg.putPod(data);
hdrMsg.putString(tag);
}
/* StrandSearchResponse */
-Ipc::StrandSearchResponse::StrandSearchResponse(void *const aData,
- const Ipc::StrandCoord &aStrand):
- data(aData), strand(aStrand)
+Ipc::StrandSearchResponse::StrandSearchResponse(const Ipc::StrandCoord &aStrand):
+ strand(aStrand)
{
}
-Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg):
- data(NULL)
+Ipc::StrandSearchResponse::StrandSearchResponse(const TypedMsgHdr &hdrMsg)
{
hdrMsg.checkType(mtStrandSearchResponse);
- hdrMsg.getPod(data);
strand.unpack(hdrMsg);
}
void Ipc::StrandSearchResponse::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.setType(mtStrandSearchResponse);
- hdrMsg.putPod(data);
strand.pack(hdrMsg);
}
public:
int requestorId; ///< sender-provided return address
- void *data; ///< sender-provided opaque pointer
String tag; ///< set when looking for a matching StrandCoord::tag
};
class StrandSearchResponse
{
public:
- StrandSearchResponse(void *const aData, const StrandCoord &strand);
+ StrandSearchResponse(const StrandCoord &strand);
explicit StrandSearchResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
- void *data; ///< a copy of the StrandSearchRequest::data
StrandCoord strand; ///< answer matching StrandSearchRequest criteria
};