static int TheFile = -1; ///< db file descriptor
-static
-void diskerRead(const IpcIoRequest &request)
+static void
+diskerRead(IpcIoMsg &ipcIo)
{
- debugs(47,5, HERE << "disker" << KidIdentifier << " reads " <<
- request.len << " at " << request.offset <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
-
- IpcIoResponse response;
- response.diskId = KidIdentifier;
- response.requestId = request.requestId;
- response.command = request.command;
-
- const ssize_t read = pread(TheFile, response.buf, request.len, request.offset);
+ debugs(0,0,HERE);
+ const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+ statCounter.syscalls.disk.reads++;
+ fd_bytes(TheFile, read, FD_READ);
+
if (read >= 0) {
- response.xerrno = 0;
- response.len = static_cast<size_t>(read); // safe because read > 0
+ ipcIo.xerrno = 0;
+ const size_t len = static_cast<size_t>(read); // safe because read > 0
debugs(47,8, HERE << "disker" << KidIdentifier << " read " <<
- (response.len == request.len ? "all " : "just ") << read);
- } else {
- response.xerrno = errno;
- response.len = 0;
+ (len == ipcIo.len ? "all " : "just ") << read);
+ ipcIo.len = len;
+ } else {
+ ipcIo.xerrno = errno;
+ ipcIo.len = 0;
debugs(47,5, HERE << "disker" << KidIdentifier << " read error: " <<
- response.xerrno);
- }
-
- Ipc::TypedMsgHdr message;
- response.pack(message);
- const String addr =
- Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
- Ipc::SendMessage(addr, message);
+ ipcIo.xerrno);
+ }
}
-static
-void diskerWrite(const IpcIoRequest &request)
+static void
+diskerWrite(IpcIoMsg &ipcIo)
{
- debugs(47,5, HERE << "disker" << KidIdentifier << " writes " <<
- request.len << " at " << request.offset <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
-
- IpcIoResponse response;
- response.diskId = KidIdentifier;
- response.requestId = request.requestId;
- response.command = request.command;
-
- const ssize_t wrote = pwrite(TheFile, request.buf, request.len, request.offset);
+ debugs(0,0,HERE);
+ const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+ statCounter.syscalls.disk.writes++;
+ fd_bytes(TheFile, wrote, FD_WRITE);
+
if (wrote >= 0) {
- response.xerrno = 0;
- response.len = static_cast<size_t>(wrote); // safe because wrote > 0
+ ipcIo.xerrno = 0;
+ const size_t len = static_cast<size_t>(wrote); // safe because wrote > 0
debugs(47,8, HERE << "disker" << KidIdentifier << " wrote " <<
- (response.len == request.len ? "all " : "just ") << wrote);
- } else {
- response.xerrno = errno;
- response.len = 0;
+ (len == ipcIo.len ? "all " : "just ") << wrote);
+ ipcIo.len = len;
+ } else {
+ ipcIo.xerrno = errno;
+ ipcIo.len = 0;
debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
- response.xerrno);
- }
-
- Ipc::TypedMsgHdr message;
- response.pack(message);
- const String addr =
- Ipc::Port::MakeAddr(Ipc::strandAddrPfx, request.requestorId);
- Ipc::SendMessage(addr, message);
+ ipcIo.xerrno);
+ }
}
-/// called when disker receives an I/O request
void
-IpcIoFile::HandleRequest(const IpcIoRequest &request)
+IpcIoFile::DiskerHandleRequests()
{
- switch (request.command) {
- case IpcIo::cmdRead:
- diskerRead(request);
- break;
+ debugs(0,0,HERE);
+ Must(diskerQueue);
+ int workerId;
+ IpcIoMsg ipcIo;
+ while (diskerQueue->pop(workerId, ipcIo))
+ DiskerHandleRequest(workerId, ipcIo);
+}
- case IpcIo::cmdWrite:
- diskerWrite(request);
- break;
+/// called when disker receives an I/O request
+void
+IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
+{
+ debugs(0,0,HERE);
+ Must(diskerQueue);
- default:
+ if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
debugs(0,0, HERE << "disker" << KidIdentifier <<
- " should not receive " << request.command <<
- " ipcIo" << request.requestorId << '.' << request.requestId);
- break;
- }
+ " should not receive " << ipcIo.command <<
+ " ipcIo" << workerId << '.' << ipcIo.requestId);
+ return;
+ }
+
+ debugs(47,5, HERE << "disker" << KidIdentifier <<
+ (ipcIo.command == IpcIo::cmdRead ? " reads " : " writes ") <<
+ ipcIo.len << " at " << ipcIo.offset <<
+ " ipcIo" << workerId << '.' << ipcIo.requestId);
+
+ if (ipcIo.command == IpcIo::cmdRead)
+ diskerRead(ipcIo);
+ else // ipcIo.command == IpcIo::cmdWrite
+ diskerWrite(ipcIo);
+
+ diskerQueue->push(workerId, ipcIo); // XXX: report warning
+
+ if (diskerQueue->pushedSize(workerId) == 1)
+ Notify(workerId); // notify worker
}
static bool