IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
pending->readRequest = readRequest;
- push(*pending);
+ push(pending);
}
void
IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
pending->writeRequest = writeRequest;
- push(*pending);
+ push(pending);
}
void
/// track a new pending request
void
-IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
+IpcIoFile::trackPendingRequest(IpcIoPendingRequest *const pending)
{
- newerRequests->insert(std::make_pair(lastRequestId, &pending));
+ newerRequests->insert(std::make_pair(lastRequestId, pending));
if (!timeoutCheckScheduled)
scheduleTimeoutCheck();
}
/// push an I/O request to disker
void
-IpcIoFile::push(IpcIoPendingRequest &pending)
+IpcIoFile::push(IpcIoPendingRequest *const pending)
{
debugs(47, 7, HERE);
Must(diskId >= 0);
Must(workerQueue);
- Must(pending.readRequest || pending.writeRequest);
+ Must(pending);
+ Must(pending->readRequest || pending->writeRequest);
IpcIoMsg ipcIo;
ipcIo.requestId = lastRequestId;
- if (pending.readRequest) {
+ if (pending->readRequest) {
ipcIo.command = IpcIo::cmdRead;
- ipcIo.offset = pending.readRequest->offset;
- ipcIo.len = pending.readRequest->len;
+ ipcIo.offset = pending->readRequest->offset;
+ ipcIo.len = pending->readRequest->len;
assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away
- } else { // pending.writeRequest
+ memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away
+ } else { // pending->writeRequest
ipcIo.command = IpcIo::cmdWrite;
- ipcIo.offset = pending.writeRequest->offset;
- ipcIo.len = pending.writeRequest->len;
+ ipcIo.offset = pending->writeRequest->offset;
+ ipcIo.len = pending->writeRequest->len;
assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away
+ memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
}
- if (!workerQueue->push(ipcIo)) {
- pending.completeIo(NULL);
- return;
+ try {
+ if (workerQueue->push(ipcIo))
+ Notify(diskId); // notify disker
+ trackPendingRequest(pending);
+ } catch (const WorkerQueue::Full &) {
+ // XXX: grow queue size?
+ pending->completeIo(NULL);
+ delete pending;
}
-
- if (workerQueue->pushedSize() == 1)
- Notify(diskId); // notify disker
-
- trackPendingRequest(pending);
}
/// called when coordinator responds to worker open request
IpcIoFile::handleResponses()
{
Must(workerQueue);
- IpcIoMsg ipcIo;
- while (workerQueue->pop(ipcIo))
- handleResponse(ipcIo);
+ try {
+ while (true) {
+ IpcIoMsg ipcIo;
+ workerQueue->pop(ipcIo); // XXX: notify disker?
+ handleResponse(ipcIo);
+ }
+ } catch (const WorkerQueue::Empty &) {}
}
void
IpcIoFile::DiskerHandleRequests()
{
Must(diskerQueue);
- int workerId;
- IpcIoMsg ipcIo;
- while (diskerQueue->pop(workerId, ipcIo))
- DiskerHandleRequest(workerId, ipcIo);
+ try {
+ while (true) {
+ int workerId;
+ IpcIoMsg ipcIo;
+ diskerQueue->pop(workerId, ipcIo); // XXX: notify worker?
+ DiskerHandleRequest(workerId, ipcIo);
+ }
+ } catch (const DiskerQueue::Empty &) {}
}
/// called when disker receives an I/O request
else // ipcIo.command == IpcIo::cmdWrite
diskerWrite(ipcIo);
- diskerQueue->push(workerId, ipcIo); // XXX: report warning
-
- if (diskerQueue->pushedSize(workerId) == 1)
- Notify(workerId + 1); // notify worker
+ try {
+ if (diskerQueue->push(workerId, ipcIo))
+ Notify(workerId + 1); // notify worker
+ } catch (const DiskerQueue::Full &) {
+ // XXX: grow queue size?
+ }
}
static bool
/// Lockless fixed-capacity queue for a single writer and a single reader.
class OneToOneUniQueue {
public:
+ class Empty {};
+ class Full {};
class ItemTooLarge {};
OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity);
static int Items2Bytes(const unsigned int maxItemSize, const int size);
template <class Value>
- bool pop(Value &value); ///< returns false iff the queue is empty
+ bool pop(Value &value); ///< returns true iff the queue was full
template <class Value>
- bool push(const Value &value); ///< returns false iff the queue is full
+ bool push(const Value &value); ///< returns true iff the queue was empty
private:
struct Shared {
/// Lockless fixed-capacity bidirectional queue for two processes.
class OneToOneBiQueue {
public:
+ typedef OneToOneUniQueue::Empty Empty;
+ typedef OneToOneUniQueue::Full Full;
+ typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge;
+
/// Create a new shared queue.
OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity);
OneToOneBiQueue(const String &id); ///< Attach to existing shared queue.
- int pushedSize() const { return pushQueue->size(); }
-
template <class Value>
bool pop(Value &value) { return popQueue->pop(value); }
template <class Value>
*/
class FewToOneBiQueue {
public:
+ typedef OneToOneBiQueue::Empty Empty;
+ typedef OneToOneBiQueue::Full Full;
+ typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge;
+
FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity);
static OneToOneBiQueue *Attach(const String &id, const int workerId);
~FewToOneBiQueue();
bool validWorkerId(const int workerId) const;
int workerCount() const { return theWorkerCount; }
- int pushedSize(const int workerId) const;
template <class Value>
- bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+ bool pop(int &workerId, Value &value); ///< returns false iff the queue was full
template <class Value>
- bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
+ bool push(const int workerId, const Value &value); ///< returns false iff the queue was empty
private:
int theLastPopWorkerId; ///< the last worker ID we pop()ed from
throw ItemTooLarge();
if (empty())
- return false;
+ throw Empty();
+ const bool wasFull = full();
const unsigned int pos =
shared->theOut++ % shared->theCapacity * shared->theMaxItemSize;
memcpy(&value, shared->theBuffer + pos, sizeof(value));
--shared->theSize;
- return true;
+ return wasFull;
}
template <class Value>
throw ItemTooLarge();
if (full())
- return false;
+ throw Full();
+ const bool wasEmpty = empty();
const unsigned int pos =
shared->theIn++ % shared->theCapacity * shared->theMaxItemSize;
memcpy(shared->theBuffer + pos, &value, sizeof(value));
++shared->theSize;
- return true;
+ return wasEmpty;
}
++theLastPopWorkerId;
for (int i = 0; i < theWorkerCount; ++i) {
theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
- if (biQueues[theLastPopWorkerId]->pop(value)) {
+ try {
+ const bool wasFull = biQueues[theLastPopWorkerId]->pop(value);
workerId = theLastPopWorkerId;
- return true;
- }
+ return wasFull;
+ } catch (const Empty &) {}
}
- return false;
+ throw Empty();
}
template <class Value>