CBDATA_CLASS_INIT(IpcIoFile);
IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
-const double IpcIoFile::Timeout = 7;
+const double IpcIoFile::Timeout = 7; // seconds; XXX: ALL,9 may require more
IpcIoFile::IpcIoFileList IpcIoFile::WaitingForOpen;
IpcIoFile::IpcIoFilesMap IpcIoFile::IpcIoFiles;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
+/// IpcIo wrapper for debugs() streams; XXX: find a better class name
+struct SipcIo {
+ SipcIo(int aWorker, const IpcIoMsg &aMsg, int aDisker):
+ worker(aWorker), msg(aMsg), disker(aDisker) {}
+
+ int worker;
+ const IpcIoMsg &msg;
+ int disker;
+};
+
+std::ostream &
+operator <<(std::ostream &os, const SipcIo &sio)
+{
+ return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
+ (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
+}
+
IpcIoFile::IpcIoFile(char const *aDb):
dbName(aDb), diskId(-1), workerQueue(NULL), error_(false), lastRequestId(0),
} else {
diskId = response->strand.kidId;
if (diskId >= 0) {
+ // XXX: Remove this +-1 math! FewToOneBiQueue API must use kid IDs.
workerQueue = DiskerQueue::Attach(dbName, KidIdentifier - 1);
const bool inserted =
IpcIoFiles.insert(std::make_pair(diskId, this)).second;
void
IpcIoFile::push(IpcIoPendingRequest *const pending)
{
+ // prevent queue overflows: check for responses to earlier requests
+ handleResponses("before push");
+
debugs(47, 7, HERE);
Must(diskId >= 0);
Must(workerQueue);
memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
}
+ debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+
try {
if (workerQueue->push(ipcIo))
- Notify(diskId); // notify disker
+ Notify(diskId); // must notify disker
trackPendingRequest(pending);
} catch (const WorkerQueue::Full &) {
- // XXX: grow queue size?
- pending->completeIo(NULL);
+ debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
+ SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
+ // TODO: grow queue size
+
+ pending->completeIo(NULL); // XXX: should distinguish this from timeout
delete pending;
}
}
}
void
-IpcIoFile::handleResponses()
+IpcIoFile::handleNotification()
{
+ debugs(47, 4, HERE << "notified");
+ workerQueue->clearReaderSignal();
+ handleResponses("after notification");
+}
+
+void
+IpcIoFile::handleResponses(const char *when)
+{
+ debugs(47, 4, HERE << "popping all " << when);
Must(workerQueue);
- try {
- while (true) {
- IpcIoMsg ipcIo;
- workerQueue->pop(ipcIo); // XXX: notify disker?
- handleResponse(ipcIo);
- }
- } catch (const WorkerQueue::Empty &) {}
+ IpcIoMsg ipcIo;
+ // get all responses we can: since we are not pushing, this will stop
+ while (workerQueue->pop(ipcIo))
+ handleResponse(ipcIo);
}
void
IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
{
const int requestId = ipcIo.requestId;
- debugs(47, 7, HERE << "disker response to " << ipcIo.command <<
- "; ipcIo" << KidIdentifier << '.' << requestId);
+ debugs(47, 7, HERE << "popped disker response: " <<
+ SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->popQueue->size());
+
Must(requestId);
if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
pending->completeIo(&ipcIo);
void
IpcIoFile::Notify(const int peerId)
{
+ // TODO: Count and report the total number of notifications, pops, pushes.
debugs(47, 7, HERE << "kid" << peerId);
Ipc::TypedMsgHdr msg;
msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
void
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
{
- debugs(47, 7, HERE);
- if (IamDiskProcess())
- DiskerHandleRequests();
- else {
- const int diskId = msg.getInt();
+ const int from = msg.getInt();
+ debugs(47, 7, HERE << "from " << from);
+ if (IamDiskProcess()) {
+ const int workerId = from;
+ DiskerHandleRequests(workerId - 1);
+ } else {
+ const int diskId = from;
const IpcIoFilesMap::const_iterator i = IpcIoFiles.find(diskId);
- Must(i != IpcIoFiles.end()); // XXX: warn and continue?
- i->second->handleResponses();
+ Must(i != IpcIoFiles.end()); // TODO: warn but continue
+ i->second->handleNotification();
}
}
}
void
-IpcIoFile::DiskerHandleRequests()
+IpcIoFile::DiskerHandleRequests(const int workerWhoNotified)
{
Must(diskerQueue);
- try {
- while (true) {
- int workerId;
- IpcIoMsg ipcIo;
- diskerQueue->pop(workerId, ipcIo); // XXX: notify worker?
- DiskerHandleRequest(workerId, ipcIo);
- }
- } catch (const DiskerQueue::Empty &) {}
+ diskerQueue->clearReaderSignal(workerWhoNotified);
+
+ int workerId = 0;
+ IpcIoMsg ipcIo;
+ while (diskerQueue->pop(workerId, ipcIo))
+ DiskerHandleRequest(workerId, ipcIo);
+
+ // TODO: If the loop keeps on looping, we probably should take a break
+ // once in a while to update clock, read Coordinator messages, etc.
+ // This can be combined with "elevator" optimization where we get up to N
+ // requests first, then reorder the popped requests to optimize seek time,
+ // then do I/O, then take a break, and come back for the next set of I/O
+ // requests.
}
/// called when disker receives an I/O request
else // ipcIo.command == IpcIo::cmdWrite
diskerWrite(ipcIo);
+ debugs(47, 7, HERE << "pushing " << SipcIo(workerId+1, ipcIo, KidIdentifier) << " at " << diskerQueue->biQueues[workerId]->pushQueue->size());
+
try {
if (diskerQueue->push(workerId, ipcIo))
- Notify(workerId + 1); // notify worker
+ Notify(workerId + 1); // must notify worker
} catch (const DiskerQueue::Full &) {
- // XXX: grow queue size?
+ // The worker queue should not overflow because the worker should pop()
+ // before push()ing and because if disker pops N requests at a time,
+ // we should make sure the worker pop() queue length is the worker
+ // push queue length plus N+1. XXX: implement the N+1 difference.
+ debugs(47, DBG_IMPORTANT, "BUG: Worker I/O pop queue overflow: " <<
+ SipcIo(workerId+1, ipcIo, KidIdentifier)); // TODO: report queue len
+
+ // the I/O request we could not push will timeout
}
}
void checkTimeouts();
void scheduleTimeoutCheck();
- void handleResponses();
+ void handleNotification();
+ void handleResponses(const char *when);
void handleResponse(const IpcIoMsg &ipcIo);
- static void DiskerHandleRequests();
+ static void DiskerHandleRequests(const int workerId);
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
private:
*/
#include "config.h"
+#include "base/TextException.h"
+#include "Debug.h"
+#include "globals.h"
#include "ipc/Queue.h"
-
+/// constructs shared segment ID from parent queue ID and child queue index
static String
QueueId(String id, const int idx)
{
return id;
}
+/// constructs QueueReader ID from parent queue ID
+static String
+ReaderId(String id)
+{
+ id.append("__readers");
+ return id;
+}
+
+
+/* QueueReader */
+
+InstanceIdDefinitions(QueueReader, "ipcQR");
+
+QueueReader::QueueReader(): popBlocked(1), popSignal(0)
+{
+ debugs(54, 7, HERE << "constructed " << id);
+}
+
// OneToOneUniQueue
OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
- shm(id.termedBuf())
+ shm(id.termedBuf()), reader_(NULL)
{
- shm.create(Items2Bytes(maxItemSize, capacity));
- assert(shm.mem());
- shared = new (shm.mem()) Shared(maxItemSize, capacity);
+ const int sharedSize = Items2Bytes(maxItemSize, capacity);
+ shm.create(sharedSize);
+ shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
}
-OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf())
+OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
+ reader_(NULL)
{
shm.open();
shared = reinterpret_cast<Shared *>(shm.mem());
assert(shared);
}
+void
+OneToOneUniQueue::reader(QueueReader *aReader)
+{
+ Must(!reader_ && aReader);
+ reader_ = aReader;
+}
+
int
OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
{
{
}
+QueueReader &
+OneToOneUniQueue::reader()
+{
+ Must(reader_);
+ return *reader_;
+}
+
// OneToOneBiQueue
{
}
+void
+OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
+{
+ popQueue->reader(r1);
+ pushQueue->reader(r2);
+}
+
+void
+OneToOneBiQueue::clearReaderSignal()
+{
+ debugs(54, 7, HERE << "reader: " << &popQueue->reader());
+ popQueue->reader().clearSignal();
+}
+
// FewToOneBiQueue
FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
- theLastPopWorkerId(-1), theWorkerCount(aWorkerCount)
+ theLastPopWorker(0), theWorkerCount(aWorkerCount),
+ shm(ReaderId(id).termedBuf()),
+ reader(NULL)
{
+ // create a new segment for the local and remote queue readers
+ // TODO: all our queues and readers should use a single segment
+ shm.create((theWorkerCount+1)*sizeof(QueueReader));
+ reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
+ debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
+
assert(theWorkerCount >= 0);
biQueues.reserve(theWorkerCount);
for (int i = 0; i < theWorkerCount; ++i) {
OneToOneBiQueue *const biQueue =
new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
+ QueueReader *remoteReader =
+ new (shm.reserve(sizeof(QueueReader))) QueueReader;
+ biQueue->readers(reader, remoteReader);
biQueues.push_back(biQueue);
}
}
OneToOneBiQueue *
FewToOneBiQueue::Attach(const String &id, const int workerId)
{
- return new OneToOneBiQueue(QueueId(id, workerId));
+ // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
+ Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
+
+ Ipc::Mem::Segment &shm = *shmPtr;
+ shm.open();
+ assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
+ QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
+ QueueReader *remoteReader = &readers[0];
+ debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
+ QueueReader *localReader = &readers[workerId+1];
+ debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
+
+ OneToOneBiQueue *const biQueue =
+ new OneToOneBiQueue(QueueId(id, workerId));
+ biQueue->readers(localReader, remoteReader);
+ return biQueue;
}
FewToOneBiQueue::~FewToOneBiQueue()
{
return 0 <= workerId && workerId < theWorkerCount;
}
+
+void
+FewToOneBiQueue::clearReaderSignal(int workerId)
+{
+ debugs(54, 7, HERE << "reader: " << reader->id);
+
+ assert(validWorkerId(workerId));
+ reader->clearSignal();
+
+ // we got a hint; we could reposition iteration to try popping from the
+ // workerId queue first; but it does not seem to help much and might
+ // introduce some bias so we do not do that for now:
+ // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
+}
#define SQUID_IPC_QUEUE_H
#include "Array.h"
+#include "base/InstanceId.h"
#include "ipc/AtomicWord.h"
#include "ipc/mem/Segment.h"
#include "util.h"
class String;
-/// Lockless fixed-capacity queue for a single writer and a single reader.
+/// State of the reading end of a queue (i.e., of the code calling pop()).
+/// Multiple queues attached to one reader share this state.
+class QueueReader {
+public:
+ QueueReader(); // the initial state is "blocked without a signal"
+
+ /// whether the reader is waiting for a notification signal
+ bool blocked() const { return popBlocked == 1; }
+
+ /// marks the reader as blocked, waiting for a notification signal
+ void block() { popBlocked.swap_if(0, 1); }
+
+ /// removes the block() effects
+ void unblock() { popBlocked.swap_if(1, 0); }
+
+ /// if reader is blocked and not notified, marks the notification signal
+ /// as sent and not received, returning true; otherwise, returns false
+ bool raiseSignal() { return blocked() && popSignal.swap_if(0,1); }
+
+ /// marks sent reader notification as received (also removes pop blocking)
+ void clearSignal() { unblock(); popSignal.swap_if(1,0); }
+
+private:
+ AtomicWord popBlocked; ///< whether the reader is blocked on pop()
+ AtomicWord popSignal; ///< whether writer has sent and reader has not received notification
+
+public:
+ /// unique ID for debugging which reader is used (works across processes)
+ const InstanceId<QueueReader> id;
+};
+
+
+/**
+ * Lockless fixed-capacity queue for a single writer and a single reader.
+ *
+ * If the queue is empty, the reader is considered "blocked" and needs
+ * an out-of-band notification message to notice the next pushed item.
+ *
+ * Current implementation assumes that the writer cannot get blocked: if the
+ * queue is full, the writer will just not push and come back later (with a
+ * different value). We can add support for blocked writers if needed.
+ */
class OneToOneUniQueue {
public:
- class Empty {};
+ // pop() and push() exceptions; TODO: use TextException instead
class Full {};
class ItemTooLarge {};
static int Bytes2Items(const unsigned int maxItemSize, int size);
static int Items2Bytes(const unsigned int maxItemSize, const int size);
- template <class Value>
- bool pop(Value &value); ///< returns true iff the queue was full
- template <class Value>
- bool push(const Value &value); ///< returns true iff the queue was empty
+ /// returns true iff the value was set; [un]blocks the reader as needed
+ template<class Value> bool pop(Value &value);
+
+ /// returns true iff the caller must notify the reader of the pushed item
+ template<class Value> bool push(const Value &value);
+
+ QueueReader &reader();
+ void reader(QueueReader *aReader);
private:
struct Shared {
Ipc::Mem::Segment shm; ///< shared memory segment
Shared *shared; ///< pointer to shared memory
+ QueueReader *reader_; ///< the state of the code popping from this queue
};
/// Lockless fixed-capacity bidirectional queue for two processes.
class OneToOneBiQueue {
public:
- typedef OneToOneUniQueue::Empty Empty;
typedef OneToOneUniQueue::Full Full;
typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
- template <class Value>
- bool pop(Value &value) { return popQueue->pop(value); }
- template <class Value>
- bool push(const Value &value) { return pushQueue->push(value); }
+ void readers(QueueReader *r1, QueueReader *r2);
+ void clearReaderSignal();
-private:
+ /* wrappers to call the right OneToOneUniQueue method for this process */
+ template<class Value> bool pop(Value &value) { return popQueue->pop(value); }
+ template<class Value> bool push(const Value &value) { return pushQueue->push(value); }
+
+//private:
OneToOneUniQueue *const popQueue; ///< queue to pop from for this process
OneToOneUniQueue *const pushQueue; ///< queue to push to for this process
};
*/
class FewToOneBiQueue {
public:
- typedef OneToOneBiQueue::Empty Empty;
typedef OneToOneBiQueue::Full Full;
typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
bool validWorkerId(const int workerId) const;
int workerCount() const { return theWorkerCount; }
- template <class Value>
- bool pop(int &workerId, Value &value); ///< returns false iff the queue was full
- template <class Value>
- bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty
+ /// clears the reader notification received by the disker from worker
+ void clearReaderSignal(int workerId);
-private:
- int theLastPopWorkerId; ///< the last worker ID we pop()ed from
- Vector<OneToOneBiQueue *> biQueues; ///< worker queues
- const int theWorkerCount; ///< number of worker processes
+ /// picks a worker and calls OneToOneUniQueue::pop() using its queue
+ template <class Value> bool pop(int &workerId, Value &value);
+
+ /// calls OneToOneUniQueue::push() using the given worker queue
+ template <class Value> bool push(const int workerId, const Value &value);
+
+//private: XXX: make private by moving pop/push debugging into pop/push
+ int theLastPopWorker; ///< the ID of the last worker we tried to pop() from
+ Vector<OneToOneBiQueue *> biQueues; ///< worker queues indexed by worker ID
+ const int theWorkerCount; ///< the total number of workers
+
+ Ipc::Mem::Segment shm; ///< shared memory segment to store the reader
+ QueueReader *reader; ///< the state of the code popping from all biQueues
};
if (sizeof(value) > shared->theMaxItemSize)
throw ItemTooLarge();
- if (empty())
- throw Empty();
+ // A writer might push between the empty test and block() below, so we do
+ // not return false right after calling block(), but test again.
+ if (empty()) {
+ reader().block();
+ // A writer might push between the empty test and block() below,
+ // so we must test again as such a writer will not signal us.
+ if (empty())
+ return false;
+ }
- const bool wasFull = full();
+ reader().unblock();
const unsigned int pos =
- shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
+ (shared->theOut++ % shared->theCapacity) * shared->theMaxItemSize;
memcpy(&value, shared->theBuffer + pos, sizeof(value));
--shared->theSize;
- return wasFull;
+
+ return true;
}
template <class Value>
shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
memcpy(shared->theBuffer + pos, &value, sizeof(value));
++shared->theSize;
- return wasEmpty;
+
+ return wasEmpty && reader().raiseSignal();
}
bool
FewToOneBiQueue::pop(int &workerId, Value &value)
{
- ++theLastPopWorkerId;
+ // iterate all workers, starting after the one we visited last
for (int i = 0; i < theWorkerCount; ++i) {
- theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
- try {
- const bool wasFull = biQueues[theLastPopWorkerId]->pop(value);
- workerId = theLastPopWorkerId;
- return wasFull;
- } catch (const Empty &) {}
+ theLastPopWorker = (theLastPopWorker + 1) % theWorkerCount;
+ if (biQueues[theLastPopWorker]->pop(value)) {
+ workerId = theLastPopWorker;
+ return true;
+ }
}
- throw Empty();
+ return false; // no worker had anything to pop
}
template <class Value>