+/*
+ * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
#ifndef SQUID_IPC_IOFILE_H
#define SQUID_IPC_IOFILE_H
#include "DiskIO/DiskFile.h"
#include "DiskIO/IORequestor.h"
#include "ipc/forward.h"
+#include "ipc/mem/Page.h"
+#include "SquidString.h"
+#include <list>
#include <map>
+#include <memory>
+
+namespace Ipc
+{
+class FewToFewBiQueue;
+} // Ipc
// TODO: expand to all classes
-namespace IpcIo {
+namespace IpcIo
+{
/// what kind of I/O the disker needs to do or have done
typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
-enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize
-
} // namespace IpcIo
-
-/// converts DiskIO requests to IPC messages
-// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
-class IpcIoRequest {
+/// converts DiskIO requests to IPC queue messages
+class IpcIoMsg
+{
public:
- IpcIoRequest();
-
- explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
- void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+ IpcIoMsg();
public:
- int requestorId; ///< kidId of the requestor; used for response destination
- unsigned int requestId; ///< unique for sender; matches request w/ response
+ unsigned int requestId; ///< unique for requestor; matches request w/ response
- /* ReadRequest and WriteRequest parameters to pass to disker */
- char buf[IpcIo::BufCapacity]; // XXX: inefficient
off_t offset;
size_t len;
+ Ipc::Mem::PageId page;
- IpcIo::Command command; ///< what disker is supposed to do
-};
-
-/// disker response to IpcIoRequest
-class IpcIoResponse {
-public:
- IpcIoResponse();
-
- explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
- void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
-
-public:
- int diskId; ///< kidId of the responding disker
- unsigned int requestId; ///< unique for sender; matches request w/ response
-
- char buf[IpcIo::BufCapacity]; // XXX: inefficient
- size_t len;
-
- IpcIo::Command command; ///< what disker did
+ IpcIo::Command command; ///< what disker is supposed to do or did
+ struct timeval start; ///< when the I/O request was converted to IpcIoMsg
int xerrno; ///< I/O error code or zero
};
class IpcIoFile: public DiskFile
{
+ CBDATA_CLASS(IpcIoFile);
public:
typedef RefCount<IpcIoFile> Pointer;
virtual ~IpcIoFile();
/* DiskFile API */
+ virtual void configure(const Config &cfg);
virtual void open(int flags, mode_t mode, RefCount<IORequestor> callback);
virtual void create(int flags, mode_t mode, RefCount<IORequestor> callback);
virtual void read(ReadRequest *);
virtual bool canWrite() const;
virtual bool ioInProgress() const;
- /// finds and calls the right IpcIoFile upon disker's response
- static void HandleResponse(const Ipc::TypedMsgHdr &response);
+ /// handle open response from coordinator
+ static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
+
+ /// handle queue push notifications from worker or disker
+ static void HandleNotification(const Ipc::TypedMsgHdr &msg);
- /// disker entry point for remote I/O requests
- static void HandleRequest(const IpcIoRequest &request);
+ DiskFile::Config config; ///< supported configuration options
protected:
friend class IpcIoPendingRequest;
- void openCompleted(const IpcIoResponse *response);
- void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
- void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+ void openCompleted(const Ipc::StrandSearchResponse *const response);
+ void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
+ void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
+ bool canWait() const;
private:
- void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+ void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending);
+ void push(IpcIoPendingRequest *const pending);
+ IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
- static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+ static void Notify(const int peerId);
- static void CheckTimeouts(void* param);
- static void ScheduleTimeoutCheck();
+ static void OpenTimeout(void *const param);
+ static void CheckTimeouts(void *const param);
+ void checkTimeouts();
+ void scheduleTimeoutCheck();
+
+ static void HandleResponses(const char *const when);
+ void handleResponse(IpcIoMsg &ipcIo);
+
+ static void DiskerHandleMoreRequests(void*);
+ static void DiskerHandleRequests();
+ static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+ static bool WaitBeforePop();
private:
const String dbName; ///< the name of the file we are managing
int diskId; ///< the process ID of the disker we talk to
RefCount<IORequestor> ioRequestor;
- int ioLevel; ///< number of pending I/O requests using this file
-
bool error_; ///< whether we have seen at least one I/O error (XXX)
+ unsigned int lastRequestId; ///< last requestId used
+
/// maps requestId to the handleResponse callback
typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
- static RequestMap TheRequestMap1; ///< older (or newer) pending requests
- static RequestMap TheRequestMap2; ///< newer (or older) pending requests
- static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
- static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
- static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
+ RequestMap requestMap1; ///< older (or newer) pending requests
+ RequestMap requestMap2; ///< newer (or older) pending requests
+ RequestMap *olderRequests; ///< older requests (map1 or map2)
+ RequestMap *newerRequests; ///< newer requests (map2 or map1)
+ bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
- static unsigned int LastRequestId; ///< last requestId used
+ static const double Timeout; ///< timeout value in seconds
- CBDATA_CLASS2(IpcIoFile);
-};
+ typedef std::list<Pointer> IpcIoFileList;
+ static IpcIoFileList WaitingForOpen; ///< pending open requests
+ ///< maps diskerId to IpcIoFile, cleared in destructor
+ typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
+ static IpcIoFilesMap IpcIoFiles;
+
+ typedef Ipc::FewToFewBiQueue Queue;
+ static std::unique_ptr<Queue> queue; ///< IPC queue
+
+ /// whether we are waiting for an event to handle still queued I/O requests
+ static bool DiskerHandleMoreRequestsScheduled;
+};
/// keeps original I/O request parameters while disker is handling the request
class IpcIoPendingRequest
IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
/// called when response is received and, with a nil response, on timeouts
- void completeIo(IpcIoResponse *response);
+ void completeIo(IpcIoMsg *const response);
public:
- IpcIoFile::Pointer file; ///< the file object waiting for the response
+ const IpcIoFile::Pointer file; ///< the file object waiting for the response
ReadRequest *readRequest; ///< set if this is a read requests
WriteRequest *writeRequest; ///< set if this is a write request
IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto
};
-
#endif /* SQUID_IPC_IOFILE_H */
+