#include "ipc/Port.h"
#include "ipc/StrandSearch.h"
#include "ipc/UdsOp.h"
+#include "ipc/mem/Pages.h"
CBDATA_CLASS_INIT(IpcIoFile);
void
IpcIoFile::readCompleted(ReadRequest *readRequest,
- const IpcIoMsg *const response)
+ IpcIoMsg *const response)
{
bool ioError = false;
if (!response) {
if (response->xerrno) {
debugs(79,1, HERE << "error: " << xstrerr(response->xerrno));
ioError = error_ = true;
+ }
+ else
+ if (!response->page) {
+ debugs(79,1, HERE << "error: run out of shared memory pages");
+ ioError = true;
} else {
- memcpy(readRequest->buf, response->buf, response->len);
+ const char *const buf = Ipc::Mem::PagePointer(response->page);
+ memcpy(readRequest->buf, buf, response->len);
}
+ Ipc::Mem::PutPage(response->page);
+
const ssize_t rlen = ioError ? -1 : (ssize_t)readRequest->len;
const int errflag = ioError ? DISK_ERROR : DISK_OK;
ioRequestor->readCompleted(readRequest->buf, rlen, errflag, readRequest);
Must(pending->readRequest || pending->writeRequest);
IpcIoMsg ipcIo;
- ipcIo.requestId = lastRequestId;
- if (pending->readRequest) {
- ipcIo.command = IpcIo::cmdRead;
- ipcIo.offset = pending->readRequest->offset;
- ipcIo.len = pending->readRequest->len;
- assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, pending->readRequest->buf, ipcIo.len); // optimize away
- } else { // pending->writeRequest
- ipcIo.command = IpcIo::cmdWrite;
- ipcIo.offset = pending->writeRequest->offset;
- ipcIo.len = pending->writeRequest->len;
- assert(ipcIo.len <= sizeof(ipcIo.buf));
- memcpy(ipcIo.buf, pending->writeRequest->buf, ipcIo.len); // optimize away
- }
+ try {
+ ipcIo.requestId = lastRequestId;
+ if (pending->readRequest) {
+ ipcIo.command = IpcIo::cmdRead;
+ ipcIo.offset = pending->readRequest->offset;
+ ipcIo.len = pending->readRequest->len;
+ } else { // pending->writeRequest
+ Must(pending->writeRequest->len <= Ipc::Mem::PageSize());
+ if (!Ipc::Mem::GetPage(ipcIo.page)) {
+ ipcIo.len = 0;
+ throw TexcHere("run out of shared memory pages");
+ }
+ ipcIo.command = IpcIo::cmdWrite;
+ ipcIo.offset = pending->writeRequest->offset;
+ ipcIo.len = pending->writeRequest->len;
+ char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+ memcpy(buf, pending->writeRequest->buf, ipcIo.len); // optimize away
+ }
- debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
+ debugs(47, 7, HERE << "pushing " << SipcIo(KidIdentifier, ipcIo, diskId) << " at " << workerQueue->pushQueue->size());
- try {
if (workerQueue->push(ipcIo))
Notify(diskId); // must notify disker
trackPendingRequest(pending);
debugs(47, DBG_IMPORTANT, "Worker I/O push queue overflow: " <<
SipcIo(KidIdentifier, ipcIo, diskId)); // TODO: report queue len
// TODO: grow queue size
-
+
+ pending->completeIo(NULL); // XXX: should distinguish this from timeout
+ delete pending;
+ } catch (const TextException &e) {
+ debugs(47, DBG_IMPORTANT, HERE << e.what());
pending->completeIo(NULL); // XXX: should distinguish this from timeout
delete pending;
}
}
void
-IpcIoFile::handleResponse(const IpcIoMsg &ipcIo)
+IpcIoFile::handleResponse(IpcIoMsg &ipcIo)
{
const int requestId = ipcIo.requestId;
debugs(47, 7, HERE << "popped disker response: " <<
}
void
-IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
+IpcIoPendingRequest::completeIo(IpcIoMsg *const response)
{
if (readRequest)
file->readCompleted(readRequest, response);
static void
diskerRead(IpcIoMsg &ipcIo)
{
- const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+ if (!Ipc::Mem::GetPage(ipcIo.page)) {
+ ipcIo.len = 0;
+ debugs(47,5, HERE << "run out of shared memory pages");
+ return;
+ }
+
+ char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+ const ssize_t read = pread(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
statCounter.syscalls.disk.reads++;
fd_bytes(TheFile, read, FD_READ);
static void
diskerWrite(IpcIoMsg &ipcIo)
{
- const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
+ const char *const buf = Ipc::Mem::PagePointer(ipcIo.page);
+ const ssize_t wrote = pwrite(TheFile, buf, min(ipcIo.len, Ipc::Mem::PageSize()), ipcIo.offset);
statCounter.syscalls.disk.writes++;
fd_bytes(TheFile, wrote, FD_WRITE);
debugs(47,5, HERE << "disker" << KidIdentifier << " write error: " <<
ipcIo.xerrno);
}
+
+ Ipc::Mem::PutPage(ipcIo.page);
}
void
#include "DiskIO/IORequestor.h"
#include "ipc/forward.h"
#include "ipc/Queue.h"
+#include "ipc/mem/Page.h"
#include <list>
#include <map>
/// what kind of I/O the disker needs to do or have done
typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
-enum { BufCapacity = 32*1024 };
-
} // namespace IpcIo
public:
unsigned int requestId; ///< unique for requestor; matches request w/ response
- char buf[IpcIo::BufCapacity]; // XXX: inefficient
off_t offset;
size_t len;
+ Ipc::Mem::PageId page;
IpcIo::Command command; ///< what disker is supposed to do or did
protected:
friend class IpcIoPendingRequest;
void openCompleted(const Ipc::StrandSearchResponse *const response);
- void readCompleted(ReadRequest *readRequest, const IpcIoMsg *const response);
+ void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
private:
void handleNotification();
void handleResponses(const char *when);
- void handleResponse(const IpcIoMsg &ipcIo);
+ void handleResponse(IpcIoMsg &ipcIo);
static void DiskerHandleRequests(const int workerId);
static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
/// called when response is received and, with a nil response, on timeouts
- void completeIo(const IpcIoMsg *const response);
+ void completeIo(IpcIoMsg *const response);
public:
const IpcIoFile::Pointer file; ///< the file object waiting for the response