#include "DiskIO/WriteRequest.h"
#include "ipc/Messages.h"
#include "ipc/Port.h"
-#include "ipc/StrandCoord.h"
+#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
CBDATA_CLASS_INIT(IpcIoFile);
-IpcIoFile::RequestMap IpcIoFile::TheRequestMap1;
-IpcIoFile::RequestMap IpcIoFile::TheRequestMap2;
-IpcIoFile::RequestMap *IpcIoFile::TheOlderRequests = &IpcIoFile::TheRequestMap1;
-IpcIoFile::RequestMap *IpcIoFile::TheNewerRequests = &IpcIoFile::TheRequestMap2;
-bool IpcIoFile::TimeoutCheckScheduled = false;
-unsigned int IpcIoFile::LastRequestId = 0;
+IpcIoFile::DiskerQueue *IpcIoFile::diskerQueue = NULL;
+IpcIoFile::IpcIoFiles IpcIoFile::ipcIoFiles;
static bool DiskerOpen(const String &path, int flags, mode_t mode);
static void DiskerClose(const String &path);
IpcIoFile::IpcIoFile(char const *aDb):
dbName(aDb),
diskId(-1),
- ioLevel(0),
- error_(false)
+ workerQueue(NULL),
+ error_(false),
+ lastRequestId(0),
+ olderRequests(&requestMap1),
+ newerRequests(&requestMap2),
+ timeoutCheckScheduled(false)
{
}
void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
+ debugs(0,0,HERE);
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
+ Must(!diskerQueue && !workerQueue);
if (IamDiskProcess()) {
error_ = !DiskerOpen(dbName, flags, mode);
if (error_)
return;
+ // XXX: make capacity configurable
+ diskerQueue = new DiskerQueue(dbName.termedBuf(), Config.workers, 1024);
+ ipcIoFiles.insert(std::make_pair(KidIdentifier, this));
+
Ipc::HereIamMessage ann(Ipc::StrandCoord(KidIdentifier, getpid()));
ann.strand.tag = dbName;
Ipc::TypedMsgHdr message;
ioRequestor->ioCompletedNotification();
return;
- }
+ }
- // XXX: use StrandSearchRequest instead
- IpcIoRequest ipcIo;
- ipcIo.requestorId = KidIdentifier;
- ipcIo.command = IpcIo::cmdOpen;
- ipcIo.len = dbName.size();
- assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, dbName.rawBuf(), ipcIo.len);
+ Ipc::StrandSearchRequest request;
+ request.requestorId = KidIdentifier;
+ request.tag = dbName;
- IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
- send(ipcIo, pending);
+ Ipc::TypedMsgHdr msg;
+ request.pack(msg);
+ Ipc::SendMessage(Ipc::coordinatorAddr, msg);
+
+ IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
+ trackPendingRequest(*pending);
}
void
-IpcIoFile::openCompleted(const IpcIoResponse *ipcResponse) {
- if (!ipcResponse) {
+IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
+ debugs(0,0,HERE);
+ Must(diskId < 0); // we do not know our disker yet
+ Must(!workerQueue);
+
+ if (!response) {
debugs(79,1, HERE << "error: timeout");
error_ = true;
- } else
- if (ipcResponse->xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
- error_ = true;
- } else {
- diskId = ipcResponse->diskId;
- if (diskId < 0) {
+ } else {
+ diskId = response->strand.kidId;
+ if (diskId >= 0) {
+ workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier);
+ ipcIoFiles.insert(std::make_pair(diskId, this));
+ } else {
error_ = true;
debugs(79,1, HERE << "error: no disker claimed " << dbName);
- }
- }
+ }
+ }
ioRequestor->ioCompletedNotification();
}
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
+ debugs(0,0,HERE);
assert(false); // check
/* We use the same logic path for open */
open(flags, mode, callback);
void
IpcIoFile::close()
{
+ debugs(0,0,HERE);
assert(ioRequestor != NULL);
+ delete diskerQueue;
+ delete workerQueue;
+
if (IamDiskProcess())
DiskerClose(dbName);
- // XXX: else nothing to do?
+ // XXX: else nothing to do?
ioRequestor->closeCompleted();
}
//assert(minOffset < 0 || minOffset <= readRequest->offset);
//assert(maxOffset < 0 || readRequest->offset + readRequest->len <= (uint64_t)maxOffset);
- IpcIoRequest ipcIo;
- ipcIo.requestorId = KidIdentifier;
- ipcIo.command = IpcIo::cmdRead;
- ipcIo.offset = readRequest->offset;
- ipcIo.len = readRequest->len;
-
- IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+ IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
pending->readRequest = readRequest;
- send(ipcIo, pending);
+ push(*pending);
}
void
IpcIoFile::readCompleted(ReadRequest *readRequest,
- const IpcIoResponse *ipcResponse)
+ const IpcIoMsg *const response)
{
+ debugs(0,0,HERE);
bool ioError = false;
- if (!ipcResponse) {
+ if (!response) {
debugs(79,1, HERE << "error: timeout");
ioError = true; // I/O timeout does not warrant setting error_?
- } else
- if (ipcResponse->xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
+ } else
+ if (response->xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
ioError = error_ = true;
- } else {
- memcpy(readRequest->buf, ipcResponse->buf, ipcResponse->len);
+ } else {
+ memcpy(readRequest->buf, response->buf, response->len);
}
const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
//assert(minOffset < 0 || minOffset <= writeRequest->offset);
//assert(maxOffset < 0 || writeRequest->offset + writeRequest->len <= (uint64_t)maxOffset);
- IpcIoRequest ipcIo;
- ipcIo.requestorId = KidIdentifier;
- ipcIo.command = IpcIo::cmdWrite;
- ipcIo.offset = writeRequest->offset;
- ipcIo.len = writeRequest->len;
- assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, writeRequest->buf, ipcIo.len); // optimize away
-
- IpcIoPendingRequest *pending = new IpcIoPendingRequest(this);
+ IpcIoPendingRequest *const pending = new IpcIoPendingRequest(this);
pending->writeRequest = writeRequest;
- send(ipcIo, pending);
+ push(*pending);
}
void
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
- const IpcIoResponse *ipcResponse)
+ const IpcIoMsg *const response)
{
+ debugs(0,0,HERE);
bool ioError = false;
- if (!ipcResponse) {
+ if (!response) {
debugs(79,1, HERE << "error: timeout");
ioError = true; // I/O timeout does not warrant setting error_?
- } else
- if (ipcResponse->xerrno) {
- debugs(79,1, HERE << "error: " << xstrerr(ipcResponse->xerrno));
- error_ = true;
} else
- if (ipcResponse->len != writeRequest->len) {
- debugs(79,1, HERE << "problem: " << ipcResponse->len << " < " << writeRequest->len);
+ if (response->xerrno) {
+ debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
+ ioError = error_ = true;
+ } else
+ if (response->len != writeRequest->len) {
+ debugs(79,1, HERE << "problem: " << response->len << " < " << writeRequest->len);
error_ = true;
}
bool
IpcIoFile::ioInProgress() const
{
- return ioLevel > 0; // XXX: todo
+ return !olderRequests->empty() || !newerRequests->empty();
}
-/// sends an I/O request to disker
+/// track a new pending request
void
-IpcIoFile::send(IpcIoRequest &ipcIo, IpcIoPendingRequest *pending)
+IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
{
- if (++LastRequestId == 0) // don't use zero value as requestId
- ++LastRequestId;
- ipcIo.requestId = LastRequestId;
- TheNewerRequests->insert(std::make_pair(ipcIo.requestId, pending));
-
- Ipc::TypedMsgHdr message;
- ipcIo.pack(message);
+ debugs(0,0,HERE);
+ newerRequests->insert(std::make_pair(pending.id, &pending));
+ if (!timeoutCheckScheduled)
+ scheduleTimeoutCheck();
+}
- Must(diskId >= 0 || ipcIo.command == IpcIo::cmdOpen);
- const String addr = diskId >= 0 ?
- Ipc::Port::MakeAddr(Ipc::strandAddrPfx, diskId) :
- Ipc::coordinatorAddr;
+/// push an I/O request to disker
+void
+IpcIoFile::push(IpcIoPendingRequest &pending)
+{
+ debugs(0,0,HERE);
+ Must(diskId >= 0);
+ Must(workerQueue);
+ Must(pending.readRequest || pending.writeRequest);
+
+ IpcIoMsg ipcIo;
+ if (pending.readRequest) {
+ ipcIo.command = IpcIo::cmdRead;
+ ipcIo.offset = pending.readRequest->offset;
+ ipcIo.len = pending.readRequest->len;
+ assert(ipcIo.len <= sizeof(ipcIo.buf));
+ memcpy(ipcIo.buf, pending.readRequest->buf, ipcIo.len); // optimize away
+ } else { // pending.writeRequest
+ ipcIo.command = IpcIo::cmdWrite;
+ ipcIo.offset = pending.writeRequest->offset;
+ ipcIo.len = pending.writeRequest->len;
+ assert(ipcIo.len <= sizeof(ipcIo.buf));
+ memcpy(ipcIo.buf, pending.writeRequest->buf, ipcIo.len); // optimize away
+ }
- debugs(47, 7, HERE << "asking disker" << diskId << " to " <<
- ipcIo.command << "; ipcIo" << KidIdentifier << '.' << ipcIo.requestId);
+ if (!workerQueue->push(ipcIo)) {
+ pending.completeIo(NULL);
+ return;
+ }
- Ipc::SendMessage(addr, message);
- ++ioLevel;
+ if (workerQueue->pushedSize() == 1)
+ Notify(diskId); // notify disker
- if (!TimeoutCheckScheduled)
- ScheduleTimeoutCheck();
+ trackPendingRequest(pending);
}
-/// called when disker responds to our I/O request
+/// called when coordinator responds to worker open request
void
-IpcIoFile::HandleResponse(const Ipc::TypedMsgHdr &raw)
+IpcIoFile::HandleOpenResponse(const Ipc::StrandSearchResponse &response)
{
- IpcIoResponse response(raw);
-
- const int requestId = response.requestId;
- debugs(47, 7, HERE << "disker response to " << response.command <<
- "; ipcIo" << KidIdentifier << '.' << requestId);
+ debugs(47, 7, HERE << "coordinator response to open request");
+ Must(response.data);
+ reinterpret_cast<IpcIoFile*>(response.data)->openCompleted(&response);
+}
- Must(requestId != 0);
+void
+IpcIoFile::handleResponses()
+{
+ Must(workerQueue);
+ IpcIoMsg ipcIo;
+ while (workerQueue->pop(ipcIo))
+ handleResponse(ipcIo);
+}
- if (IpcIoPendingRequest *pending = DequeueRequest(requestId)) {
- pending->completeIo(&response);
+void
+IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
+{
+ const int requestId = ipcIo.requestId;
+ debugs(47, 7, HERE << "disker response to " << ipcIo.command <<
+ "; ipcIo" << KidIdentifier << '.' << requestId);
+ Must(requestId);
+ if (IpcIoPendingRequest *const pending = dequeueRequest(requestId)) {
+ pending->completeIo(&ipcIo);
delete pending; // XXX: leaking if throwing
} else {
- debugs(47, 4, HERE << "LATE disker response to " << response.command <<
- "; ipcIo" << KidIdentifier << '.' << requestId);
+ debugs(47, 4, HERE << "LATE disker response to " << ipcIo.command <<
+ "; ipcIo" << KidIdentifier << '.' << requestId);
// nothing we can do about it; completeIo() has been called already
}
}
-/// Mgr::IpcIoFile::checkTimeous wrapper
void
-IpcIoFile::CheckTimeouts(void*)
+IpcIoFile::Notify(const int peerId)
{
- TimeoutCheckScheduled = false;
+ debugs(0,0,HERE);
+ Ipc::TypedMsgHdr msg;
+ msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
+ msg.putInt(KidIdentifier);
+ const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, peerId);
+ Ipc::SendMessage(addr, msg);
+}
+
+void
+IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
+{
+ debugs(0,0,HERE);
+ if (IamDiskProcess())
+ DiskerHandleRequests();
+ else {
+ const int diskId = msg.getInt();
+ const IpcIoFiles::const_iterator i = ipcIoFiles.find(diskId);
+ Must(i != ipcIoFiles.end()); // XXX: warn and continue?
+ i->second->handleResponses();
+ }
+}
+
+/// IpcIoFile::checkTimeouts wrapper
+void
+IpcIoFile::CheckTimeouts(void *const param)
+{
+ debugs(0,0,HERE);
+ Must(param);
+ reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
+}
+
+void
+IpcIoFile::checkTimeouts()
+{
+ debugs(0,0,HERE);
+ timeoutCheckScheduled = false;
// any old request would have timed out by now
typedef RequestMap::const_iterator RMCI;
- RequestMap &elders = *TheOlderRequests;
- for (RMCI i = elders.begin(); i != elders.end(); ++i) {
- IpcIoPendingRequest *pending = i->second;
+ for (RMCI i = olderRequests->begin(); i != olderRequests->end(); ++i) {
+ IpcIoPendingRequest *const pending = i->second;
const int requestId = i->first;
debugs(47, 7, HERE << "disker timeout; ipcIo" <<
- KidIdentifier << '.' << requestId);
+ KidIdentifier << '.' << diskId << '.' << requestId);
pending->completeIo(NULL); // no response
delete pending; // XXX: leaking if throwing
- }
- elders.clear();
+ }
+ olderRequests->clear();
- swap(TheOlderRequests, TheNewerRequests); // switches pointers around
- if (!TheOlderRequests->empty())
- ScheduleTimeoutCheck();
+ swap(olderRequests, newerRequests); // switches pointers around
+ if (!olderRequests->empty())
+ scheduleTimeoutCheck();
}
/// prepare to check for timeouts in a little while
void
-IpcIoFile::ScheduleTimeoutCheck()
+IpcIoFile::scheduleTimeoutCheck()
{
+ debugs(0,0,HERE);
// we check all older requests at once so some may be wait for 2*timeout
const double timeout = 7; // in seconds
eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
- NULL, timeout, 0, false);
- TimeoutCheckScheduled = true;
+ this, timeout, 0, false);
+ timeoutCheckScheduled = true;
}
/// returns and forgets the right IpcIoFile pending request
IpcIoPendingRequest *
-IpcIoFile::DequeueRequest(unsigned int requestId)
+IpcIoFile::dequeueRequest(const unsigned int requestId)
{
debugs(47, 3, HERE);
Must(requestId != 0);
- RequestMap *map = NULL;
- RequestMap::iterator i = TheRequestMap1.find(requestId);
+ RequestMap *map = NULL;
+ RequestMap::iterator i = requestMap1.find(requestId);
- if (i != TheRequestMap1.end())
- map = &TheRequestMap1;
+ if (i != requestMap1.end())
+ map = &requestMap1;
else {
- i = TheRequestMap2.find(requestId);
- if (i != TheRequestMap2.end())
- map = &TheRequestMap2;
+ i = requestMap2.find(requestId);
+ if (i != requestMap2.end())
+ map = &requestMap2;
}
if (!map) // not found in both maps
}
int
-IpcIoFile::getFD() const
+IpcIoFile::getFD() const
{
assert(false); // not supported; TODO: remove this method from API
return -1;
}
-/* IpcIoRequest */
-
-IpcIoRequest::IpcIoRequest():
- requestorId(0), requestId(0),
- offset(0), len(0),
- command(IpcIo::cmdNone)
-{
-}
-
-IpcIoRequest::IpcIoRequest(const Ipc::TypedMsgHdr& msg)
-{
- msg.checkType(Ipc::mtIpcIoRequest);
- msg.getPod(requestorId);
- msg.getPod(requestId);
-
- msg.getPod(offset);
- msg.getPod(len);
- msg.getPod(command);
-
- if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
- msg.getFixed(buf, len);
-}
-
-void
-IpcIoRequest::pack(Ipc::TypedMsgHdr& msg) const
-{
- msg.setType(Ipc::mtIpcIoRequest);
- msg.putPod(requestorId);
- msg.putPod(requestId);
-
- msg.putPod(offset);
- msg.putPod(len);
- msg.putPod(command);
-
- if (command == IpcIo::cmdOpen || command == IpcIo::cmdWrite)
- msg.putFixed(buf, len);
-}
-
-
-/* IpcIoResponse */
-
-IpcIoResponse::IpcIoResponse():
- diskId(-1),
- requestId(0),
- len(0),
- xerrno(0)
-{
-}
-
-IpcIoResponse::IpcIoResponse(const Ipc::TypedMsgHdr& msg)
-{
- msg.checkType(Ipc::mtIpcIoResponse);
- msg.getPod(diskId);
- msg.getPod(requestId);
- msg.getPod(len);
- msg.getPod(command);
- msg.getPod(xerrno);
-
- if (command == IpcIo::cmdRead && !xerrno)
- msg.getFixed(buf, len);
-}
+/* IpcIoMsg */
-void
-IpcIoResponse::pack(Ipc::TypedMsgHdr& msg) const
+IpcIoMsg::IpcIoMsg():
+ requestId(0), offset(0), len(0), command(IpcIo::cmdNone), xerrno(0)
{
- msg.setType(Ipc::mtIpcIoResponse);
- msg.putPod(diskId);
- msg.putPod(requestId);
- msg.putPod(len);
- msg.putPod(command);
- msg.putPod(xerrno);
-
- if (command == IpcIo::cmdRead && !xerrno)
- msg.putFixed(buf, len);
}
-
/* IpcIoPendingRequest */
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
file(aFile), readRequest(NULL), writeRequest(NULL)
{
+ debugs(0,0,HERE);
+ if (++file->lastRequestId == 0) // don't use zero value as requestId
+ ++file->lastRequestId;
+ id = file->lastRequestId;
}
void
-IpcIoPendingRequest::completeIo(IpcIoResponse *response)
+IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
{
+ debugs(0,0,HERE);
Must(file != NULL);
if (readRequest)
else
if (writeRequest)
file->writeCompleted(writeRequest, response);
- else
- file->openCompleted(response);
+ else {
+ Must(!response); // only timeouts are handled here
+ file->openCompleted(NULL);
+ }
}
static int TheFile = -1; ///< db file descriptor
-static
-void diskerRead(const IpcIoRequest &request)
+static void
+diskerRead(IpcIoMsg &ipcIo)
{
- debugs(47,5, HERE << "disker" << KidIdentifier << " reads " <<
- request.len << " at " << request.offset <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
-
- IpcIoResponse response;
- response.diskId = KidIdentifier;
- response.requestId = request.requestId;
- response.command = request.command;
-
- const ssize_t read = pread(TheFile, response.buf, request.len, request.offset);
+ debugs(0,0,HERE);
+ const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
if (read >= 0) {
- response.xerrno = 0;
- response.len = static_cast<size_t>(read); // safe because read > 0
+ ipcIo.xerrno = 0;
+ const size_t len = static_cast<size_t>(read); // safe because read > 0
debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
- (response.len == request.len ? "all " : "just ") << read);
- } else {
- response.xerrno = errno;
- response.len = 0;
+ (len == ipcIo.len ? "all " : "just ") << read);
+ ipcIo.len = len;
+ } else {
+ ipcIo.xerrno = errno;
+ ipcIo.len = 0;
debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
- response.xerrno);
- }
-
- Ipc::TypedMsgHdr message;
- response.pack(message);
- const String addr =
- Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
- Ipc::SendMessage(addr, message);
+ ipcIo.xerrno);
+ }
}
-static
-void diskerWrite(const IpcIoRequest &request)
+static void
+diskerWrite(IpcIoMsg &ipcIo)
{
- debugs(47,5, HERE << "disker" << KidIdentifier << " writes " <<
- request.len << " at " << request.offset <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
-
- IpcIoResponse response;
- response.diskId = KidIdentifier;
- response.requestId = request.requestId;
- response.command = request.command;
-
- const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset);
+ debugs(0,0,HERE);
+ const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
if (wrote >= 0) {
- response.xerrno = 0;
- response.len = static_cast<size_t>(wrote); // safe because wrote > 0
+ ipcIo.xerrno = 0;
+ const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
- (response.len == request.len ? "all " : "just ") << wrote);
- } else {
- response.xerrno = errno;
- response.len = 0;
+ (len == ipcIo.len ? "all " : "just ") << wrote);
+ ipcIo.len = len;
+ } else {
+ ipcIo.xerrno = errno;
+ ipcIo.len = 0;
debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
- response.xerrno);
- }
-
- Ipc::TypedMsgHdr message;
- response.pack(message);
- const String addr =
- Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
- Ipc::SendMessage(addr, message);
+ ipcIo.xerrno);
+ }
}
-/// called when disker receives an I/O request
void
-IpcIoFile::HandleRequest(const IpcIoRequest &request)
+IpcIoFile::DiskerHandleRequests()
{
- switch (request.command) {
- case IpcIo::cmdRead:
- diskerRead(request);
- break;
+ debugs(0,0,HERE);
+ Must(diskerQueue);
+ int workerId;
+ IpcIoMsg ipcIo;
+ while (diskerQueue->pop(workerId, ipcIo))
+ DiskerHandleRequest(workerId, ipcIo);
+}
- case IpcIo::cmdWrite:
- diskerWrite(request);
- break;
+/// called when disker receives an I/O request
+void
+IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
+{
+ debugs(0,0,HERE);
+ Must(diskerQueue);
- default:
+ if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
debugs(0,0, HERE << "disker" << KidIdentifier <<
- " should not receive " << request.command <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
- break;
- }
+ " should not receive " << ipcIo.command <<
+ " ipcIo" << workerId << '.' << ipcIo.requestId);
+ return;
+ }
+
+ debugs(47,5, HERE << "disker" << KidIdentifier <<
+ (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
+ ipcIo.len << " at " << ipcIo.offset <<
+ " ipcIo" << workerId << '.' << ipcIo.requestId);
+
+ if (ipcIo.command == IpcIo::cmdRead)
+ diskerRead(ipcIo);
+ else // ipcIo.command == IpcIo::cmdWrite
+ diskerWrite(ipcIo);
+
+ diskerQueue->push(workerId, ipcIo); // XXX: report warning
+
+ if (diskerQueue->pushedSize(workerId) == 1)
+ Notify(workerId); // notify worker
}
static bool
DiskerOpen(const String &path, int flags, mode_t mode)
{
+ debugs(0,0,HERE);
assert(TheFile < 0);
TheFile = file_open(path.termedBuf(), flags);
debugs(47,0, HERE << "rock db error opening " << path << ": " <<
xstrerr(xerrno));
return false;
- }
+ }
store_open_disk_fd++;
debugs(79,3, HERE << "rock db opened " << path << ": FD " << TheFile);
- return true;
+ return true;
}
static void
DiskerClose(const String &path)
{
+ debugs(0,0,HERE);
if (TheFile >= 0) {
file_close(TheFile);
debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
#include "DiskIO/DiskFile.h"
#include "DiskIO/IORequestor.h"
#include "ipc/forward.h"
+#include "ipc/Queue.h"
#include <map>
// TODO: expand to all classes
} // namespace IpcIo
-/// converts DiskIO requests to IPC messages
-// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
-class IpcIoRequest {
+/// converts DiskIO requests to IPC queue messages
+class IpcIoMsg {
public:
- IpcIoRequest();
-
- explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
- void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+ IpcIoMsg();
public:
- int requestorId; ///< kidId of the requestor; used for response destination
- unsigned int requestId; ///< unique for sender; matches request w/ response
+ unsigned int requestId; ///< unique for requestor; matches request w/ response
- /* ReadRequest and WriteRequest parameters to pass to disker */
char buf[IpcIo::BufCapacity]; // XXX: inefficient
off_t offset;
size_t len;
- IpcIo::Command command; ///< what disker is supposed to do
-};
-
-/// disker response to IpcIoRequest
-class IpcIoResponse {
-public:
- IpcIoResponse();
-
- explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
- void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
-
-public:
- int diskId; ///< kidId of the responding disker
- unsigned int requestId; ///< unique for sender; matches request w/ response
-
- char buf[IpcIo::BufCapacity]; // XXX: inefficient
- size_t len;
-
- IpcIo::Command command; ///< what disker did
+ IpcIo::Command command; ///< what disker is supposed to do or did
int xerrno; ///< I/O error code or zero
};
virtual bool canWrite() const;
virtual bool ioInProgress() const;
- /// finds and calls the right IpcIoFile upon disker's response
- static void HandleResponse(const Ipc::TypedMsgHdr &response);
+ /// handle open response from coordinator
+ static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
- /// disker entry point for remote I/O requests
- static void HandleRequest(const IpcIoRequest &request);
+ /// handle queue push notifications from worker or disker
+ static void HandleNotification(const Ipc::TypedMsgHdr &msg);
protected:
friend class IpcIoPendingRequest;
- void openCompleted(const IpcIoResponse *response);
- void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
- void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+ void openCompleted(const Ipc::StrandSearchResponse *const response);
+ void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response);
+ void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
private:
- void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+ void trackPendingRequest(IpcIoPendingRequest &pending);
+ void push(IpcIoPendingRequest &pending);
+ IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
+
+ static void Notify(const int peerId);
- static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+ static void CheckTimeouts(void *const param);
+ void checkTimeouts();
+ void scheduleTimeoutCheck();
- static void CheckTimeouts(void* param);
- static void ScheduleTimeoutCheck();
+ void handleResponses();
+ void handleResponse(const IpcIoMsg &ipcIo);
+
+ static void DiskerHandleRequests();
+ static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
private:
+ typedef FewToOneBiQueue<IpcIoMsg> DiskerQueue;
+ typedef OneToOneBiQueue<IpcIoMsg> WorkerQueue;
+
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
+ static DiskerQueue *diskerQueue; ///< IPC queue for disker
+ WorkerQueue *workerQueue; ///< IPC queue for worker
RefCount<IORequestor> ioRequestor;
- int ioLevel; ///< number of pending I/O requests using this file
-
bool error_; ///< whether we have seen at least one I/O error (XXX)
+ unsigned int lastRequestId; ///< last requestId used
+
/// maps requestId to the handleResponse callback
typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
- static RequestMap TheRequestMap1; ///< older (or newer) pending requests
- static RequestMap TheRequestMap2; ///< newer (or older) pending requests
- static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
- static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
- static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
+ RequestMap requestMap1; ///< older (or newer) pending requests
+ RequestMap requestMap2; ///< newer (or older) pending requests
+ RequestMap *olderRequests; ///< older requests (map1 or map2)
+ RequestMap *newerRequests; ///< newer requests (map2 or map1)
+ bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
- static unsigned int LastRequestId; ///< last requestId used
+ typedef std::map<int, IpcIoFile*> IpcIoFiles; ///< maps diskerId to IpcIoFile
+ static IpcIoFiles ipcIoFiles;
CBDATA_CLASS2(IpcIoFile);
};
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
/// called when response is received and, with a nil response, on timeouts
- void completeIo(IpcIoResponse *response);
+ void completeIo(const IpcIoMsg *const response);
public:
IpcIoFile::Pointer file; ///< the file object waiting for the response
+ unsigned int id; ///< request id
ReadRequest *readRequest; ///< set if this is a read requests
WriteRequest *writeRequest; ///< set if this is a write request
--- /dev/null
+/*
+ * $Id$
+ *
+ */
+
+#ifndef SQUID_IPC_QUEUE_H
+#define SQUID_IPC_QUEUE_H
+
+#include "Array.h"
+#include "ipc/AtomicWord.h"
+#include "ipc/SharedMemory.h"
+#include "SquidString.h"
+#include "util.h"
+
+/// Lockless fixed-capacity queue for a single writer and a single
+/// reader. Does not manage shared memory segment.
+template <class Value>
+class OneToOneUniQueue {
+public:
+ OneToOneUniQueue(const int aCapacity);
+
+ int size() const { return theSize; }
+ int capacity() const { return theCapacity; }
+
+ bool empty() const { return !theSize; }
+ bool full() const { return theSize == theCapacity; }
+
+ bool pop(Value &value); ///< returns false iff the queue is empty
+ bool push(const Value &value); ///< returns false iff the queue is full
+
+ static int Bytes2Items(int size);
+ static int Items2Bytes(const int size);
+
+private:
+ unsigned int theIn; ///< input index, used only in push()
+ unsigned int theOut; ///< output index, used only in pop()
+
+ AtomicWord theSize; ///< number of items in the queue
+ const int theCapacity; ///< maximum number of items, i.e. theBuffer size
+ Value theBuffer[];
+};
+
+/// Lockless fixed-capacity bidirectional queue for two processes.
+/// Manages shared memory segment.
+template <class Value>
+class OneToOneBiQueue {
+public:
+ typedef OneToOneUniQueue<Value> UniQueue;
+
+ /// Create a new shared queue.
+ OneToOneBiQueue(const char *const id, const int aCapacity);
+ OneToOneBiQueue(const char *const id); ///< Attach to existing shared queue.
+
+ int pushedSize() const { return pushQueue->size(); }
+
+ bool pop(Value &value) { return popQueue->pop(value); }
+ bool push(const Value &value) { return pushQueue->push(value); }
+
+private:
+ SharedMemory shm; ///< shared memory segment
+ UniQueue *popQueue; ///< queue to pop from for this process
+ UniQueue *pushQueue; ///< queue to push to for this process
+};
+
+/**
+ * Lockless fixed-capacity bidirectional queue for a limited number
+ * pricesses. Implements a star topology: Many worker processes
+ * communicate with the one central process. The central process uses
+ * FewToOneBiQueue object, while workers use OneToOneBiQueue objects
+ * created with the Attach() method. Each worker has a unique integer ID
+ * in [0, workerCount) range.
+ */
+template <class Value>
+class FewToOneBiQueue {
+public:
+ typedef OneToOneBiQueue<Value> BiQueue;
+
+ FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity);
+ static BiQueue *Attach(const char *const id, const int workerId);
+ ~FewToOneBiQueue();
+
+ bool validWorkerId(const int workerId) const;
+ int workerCount() const { return theWorkerCount; }
+ int pushedSize(const int workerId) const;
+
+ bool pop(int &workerId, Value &value); ///< returns false iff the queue is empty
+ bool push(const int workerId, const Value &value); ///< returns false iff the queue is full
+
+private:
+ static String BiQueueId(String id, const int workerId);
+
+ int theLastPopWorkerId; ///< the last worker ID we pop()ed from
+ Vector<BiQueue *> biQueues; ///< worker queues
+ const int theWorkerCount; ///< number of worker processes
+ const int theCapacity; ///< per-worker capacity
+};
+
+
+// OneToOneUniQueue
+
+template <class Value>
+OneToOneUniQueue<Value>::OneToOneUniQueue(const int aCapacity):
+ theIn(0), theOut(0), theSize(0), theCapacity(aCapacity)
+{
+ assert(theCapacity > 0);
+}
+
+template <class Value>
+bool
+OneToOneUniQueue<Value>::pop(Value &value)
+{
+ if (empty())
+ return false;
+
+ const unsigned int pos = theOut++ % theCapacity;
+ value = theBuffer[pos];
+ --theSize;
+ return true;
+}
+
+template <class Value>
+bool
+OneToOneUniQueue<Value>::push(const Value &value)
+{
+ if (full())
+ return false;
+
+ const unsigned int pos = theIn++ % theCapacity;
+ theBuffer[pos] = value;
+ ++theSize;
+ return true;
+}
+
+template <class Value>
+int
+OneToOneUniQueue<Value>::Bytes2Items(int size)
+{
+ assert(size >= 0);
+ size -= sizeof(OneToOneUniQueue);
+ return size >= 0 ? size / sizeof(Value) : 0;
+}
+
+template <class Value>
+int
+OneToOneUniQueue<Value>::Items2Bytes(const int size)
+{
+ assert(size >= 0);
+ return sizeof(OneToOneUniQueue) + sizeof(Value) * size;
+}
+
+
+// OneToOneBiQueue
+
+template <class Value>
+OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id, const int capacity) :
+ shm(id)
+{
+ const int uniSize = UniQueue::Items2Bytes(capacity);
+ shm.create(uniSize * 2);
+ char *const mem = reinterpret_cast<char *>(shm.mem());
+ assert(mem);
+ popQueue = new (mem) UniQueue(capacity);
+ pushQueue = new (mem + uniSize) UniQueue(capacity);
+}
+
+template <class Value>
+OneToOneBiQueue<Value>::OneToOneBiQueue(const char *const id) :
+ shm(id)
+{
+ shm.open();
+ char *const mem = reinterpret_cast<char *>(shm.mem());
+ assert(mem);
+ pushQueue = reinterpret_cast<UniQueue *>(mem);
+ const int uniSize = pushQueue->Items2Bytes(pushQueue->capacity());
+ popQueue = reinterpret_cast<UniQueue *>(mem + uniSize);
+}
+
+// FewToOneBiQueue
+
+template <class Value>
+FewToOneBiQueue<Value>::FewToOneBiQueue(const char *const id, const int aWorkerCount, const int aCapacity):
+ theLastPopWorkerId(-1), theWorkerCount(aWorkerCount),
+ theCapacity(aCapacity)
+{
+ biQueues.reserve(theWorkerCount);
+ for (int i = 0; i < theWorkerCount; ++i) {
+ const String biQueueId = BiQueueId(id, i);
+ biQueues.push_back(new BiQueue(biQueueId.termedBuf(), theCapacity));
+ }
+}
+
+template <class Value>
+typename FewToOneBiQueue<Value>::BiQueue *
+FewToOneBiQueue<Value>::Attach(const char *const id, const int workerId)
+{
+ return new BiQueue(BiQueueId(id, workerId).termedBuf());
+}
+
+template <class Value>
+FewToOneBiQueue<Value>::~FewToOneBiQueue()
+{
+ for (int i = 0; i < theWorkerCount; ++i)
+ delete biQueues[i];
+}
+
+template <class Value>
+bool FewToOneBiQueue<Value>::validWorkerId(const int workerId) const
+{
+ return 0 <= workerId && workerId < theWorkerCount;
+}
+
+template <class Value>
+int FewToOneBiQueue<Value>::pushedSize(const int workerId) const
+{
+ assert(validWorkerId(workerId));
+ return biQueues[workerId]->pushedSize();
+}
+
+template <class Value>
+bool
+FewToOneBiQueue<Value>::pop(int &workerId, Value &value)
+{
+ ++theLastPopWorkerId;
+ for (int i = 0; i < theWorkerCount; ++i) {
+ theLastPopWorkerId = (theLastPopWorkerId + 1) % theWorkerCount;
+ if (biQueues[theLastPopWorkerId]->pop(value)) {
+ workerId = theLastPopWorkerId;
+ return true;
+ }
+ }
+ return false;
+}
+
+template <class Value>
+bool
+FewToOneBiQueue<Value>::push(const int workerId, const Value &value)
+{
+ assert(validWorkerId(workerId));
+ return biQueues[workerId]->push(value);
+}
+
+template <class Value>
+String
+FewToOneBiQueue<Value>::BiQueueId(String id, const int workerId)
+{
+ id.append("__");
+ id.append(xitoa(workerId));
+ return id;
+}
+
+#endif // SQUID_IPC_QUEUE_H