void
IpcIoFile::open(int flags, mode_t mode, RefCount<IORequestor> callback)
{
- debugs(0,0,HERE);
ioRequestor = callback;
Must(diskId < 0); // we do not know our disker yet
Must(!diskerQueue && !workerQueue);
Ipc::StrandSearchRequest request;
request.requestorId = KidIdentifier;
+ request.data = this;
request.tag = dbName;
Ipc::TypedMsgHdr msg;
void
IpcIoFile::openCompleted(const Ipc::StrandSearchResponse *const response) {
- debugs(0,0,HERE);
Must(diskId < 0); // we do not know our disker yet
Must(!workerQueue);
+ // contain single pending open request at this time
+ newerRequests->clear();
+ olderRequests->clear();
+
if (!response) {
debugs(79,1, HERE << "error: timeout");
error_ = true;
} else {
diskId = response->strand.kidId;
if (diskId >= 0) {
- workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier);
+ workerQueue = DiskerQueue::Attach(dbName.termedBuf(), KidIdentifier - 1);
ipcIoFiles.insert(std::make_pair(diskId, this));
} else {
error_ = true;
void
IpcIoFile::create(int flags, mode_t mode, RefCount<IORequestor> callback)
{
- debugs(0,0,HERE);
assert(false); // check
/* We use the same logic path for open */
open(flags, mode, callback);
void
IpcIoFile::close()
{
- debugs(0,0,HERE);
assert(ioRequestor != NULL);
delete diskerQueue;
IpcIoFile::readCompleted(ReadRequest *readRequest,
const IpcIoMsg *const response)
{
- debugs(0,0,HERE);
bool ioError = false;
if (!response) {
debugs(79,1, HERE << "error: timeout");
IpcIoFile::writeCompleted(WriteRequest *writeRequest,
const IpcIoMsg *const response)
{
- debugs(0,0,HERE);
bool ioError = false;
if (!response) {
debugs(79,1, HERE << "error: timeout");
void
IpcIoFile::trackPendingRequest(IpcIoPendingRequest &pending)
{
- debugs(0,0,HERE);
newerRequests->insert(std::make_pair(pending.id, &pending));
if (!timeoutCheckScheduled)
scheduleTimeoutCheck();
void
IpcIoFile::push(IpcIoPendingRequest &pending)
{
- debugs(0,0,HERE);
+ debugs(47, 7, HERE);
Must(diskId >= 0);
Must(workerQueue);
Must(pending.readRequest || pending.writeRequest);
IpcIoMsg ipcIo;
+ ipcIo.requestId = pending.id;
if (pending.readRequest) {
ipcIo.command = IpcIo::cmdRead;
ipcIo.offset = pending.readRequest->offset;
void
IpcIoFile::Notify(const int peerId)
{
- debugs(0,0,HERE);
+ debugs(47, 7, HERE << "kid" << peerId);
Ipc::TypedMsgHdr msg;
msg.setType(Ipc::mtIpcIoNotification); // TODO: add proper message type?
msg.putInt(KidIdentifier);
void
IpcIoFile::HandleNotification(const Ipc::TypedMsgHdr &msg)
{
- debugs(0,0,HERE);
+ debugs(47, 7, HERE);
if (IamDiskProcess())
DiskerHandleRequests();
else {
void
IpcIoFile::CheckTimeouts(void *const param)
{
- debugs(0,0,HERE);
Must(param);
reinterpret_cast<IpcIoFile*>(param)->checkTimeouts();
}
void
IpcIoFile::checkTimeouts()
{
- debugs(0,0,HERE);
timeoutCheckScheduled = false;
// any old request would have timed out by now
void
IpcIoFile::scheduleTimeoutCheck()
{
- debugs(0,0,HERE);
// we check all older requests at once so some may be wait for 2*timeout
const double timeout = 7; // in seconds
eventAdd("IpcIoFile::CheckTimeouts", &IpcIoFile::CheckTimeouts,
IpcIoPendingRequest *
IpcIoFile::dequeueRequest(const unsigned int requestId)
{
- debugs(47, 3, HERE);
Must(requestId != 0);
RequestMap *map = NULL;
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
file(aFile), readRequest(NULL), writeRequest(NULL)
{
- debugs(0,0,HERE);
if (++file->lastRequestId == 0) // don't use zero value as requestId
++file->lastRequestId;
id = file->lastRequestId;
void
IpcIoPendingRequest::completeIo(const IpcIoMsg *const response)
{
- debugs(0,0,HERE);
Must(file != NULL);
if (readRequest)
static void
diskerRead(IpcIoMsg &ipcIo)
{
- debugs(0,0,HERE);
const ssize_t read = pread(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
statCounter.syscalls.disk.reads++;
fd_bytes(TheFile, read, FD_READ);
static void
diskerWrite(IpcIoMsg &ipcIo)
{
- debugs(0,0,HERE);
const ssize_t wrote = pwrite(TheFile, ipcIo.buf, ipcIo.len, ipcIo.offset);
statCounter.syscalls.disk.writes++;
fd_bytes(TheFile, wrote, FD_WRITE);
void
IpcIoFile::DiskerHandleRequests()
{
- debugs(0,0,HERE);
Must(diskerQueue);
int workerId;
IpcIoMsg ipcIo;
void
IpcIoFile::DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo)
{
- debugs(0,0,HERE);
Must(diskerQueue);
if (ipcIo.command != IpcIo::cmdRead && ipcIo.command != IpcIo::cmdWrite) {
diskerQueue->push(workerId, ipcIo); // XXX: report warning
if (diskerQueue->pushedSize(workerId) == 1)
- Notify(workerId); // notify worker
+ Notify(workerId + 1); // notify worker
}
static bool
DiskerOpen(const String &path, int flags, mode_t mode)
{
- debugs(0,0,HERE);
assert(TheFile < 0);
TheFile = file_open(path.termedBuf(), flags);
static void
DiskerClose(const String &path)
{
- debugs(0,0,HERE);
if (TheFile >= 0) {
file_close(TheFile);
debugs(79,3, HERE << "rock db closed " << path << ": FD " << TheFile);
TheFile = -1;
store_open_disk_fd--;
- }
+ }
}