]> git.ipfire.org Git - thirdparty/squid.git/blobdiff - src/DiskIO/IpcIo/IpcIoFile.h
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / DiskIO / IpcIo / IpcIoFile.h
index 10181eae1a00484d2a56c3b1b959d8c668ac4f1c..f89197ed82c7add134effb1a02b047f9eeee04fc 100644 (file)
@@ -1,3 +1,11 @@
+/*
+ * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
 #ifndef SQUID_IPC_IOFILE_H
 #define SQUID_IPC_IOFILE_H
 
 #include "DiskIO/DiskFile.h"
 #include "DiskIO/IORequestor.h"
 #include "ipc/forward.h"
+#include "ipc/mem/Page.h"
+#include "SquidString.h"
+#include <list>
 #include <map>
+#include <memory>
+
+namespace Ipc
+{
+class FewToFewBiQueue;
+} // Ipc
 
 // TODO: expand to all classes
-namespace IpcIo {
+namespace IpcIo
+{
 
 /// what kind of I/O the disker needs to do or have done
 typedef enum { cmdNone, cmdOpen, cmdRead, cmdWrite } Command;
 
-enum { BufCapacity = 32*1024 }; // XXX: must not exceed TypedMsgHdr.maxSize
-
 } // namespace IpcIo
 
-
-/// converts DiskIO requests to IPC messages
-// TODO: make this IpcIoMsg to make IpcIoRequest and IpcIoResponse similar
-class IpcIoRequest {
+/// converts DiskIO requests to IPC queue messages
+class IpcIoMsg
+{
 public:
-    IpcIoRequest();
-
-    explicit IpcIoRequest(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
+    IpcIoMsg();
 
 public:
-    int requestorId; ///< kidId of the requestor; used for response destination
-    unsigned int requestId; ///< unique for sender; matches request w/ response
+    unsigned int requestId; ///< unique for requestor; matches request w/ response
 
-    /* ReadRequest and WriteRequest parameters to pass to disker */
-    char buf[IpcIo::BufCapacity]; // XXX: inefficient
     off_t offset;
     size_t len;
+    Ipc::Mem::PageId page;
 
-    IpcIo::Command command; ///< what disker is supposed to do
-};
-
-/// disker response to IpcIoRequest
-class IpcIoResponse {
-public:
-    IpcIoResponse();
-
-    explicit IpcIoResponse(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
-    void pack(Ipc::TypedMsgHdr& msg) const; ///< prepare for sendmsg()
-
-public:
-    int diskId; ///< kidId of the responding disker
-    unsigned int requestId; ///< unique for sender; matches request w/ response
-
-    char buf[IpcIo::BufCapacity]; // XXX: inefficient
-    size_t len;
-
-    IpcIo::Command command; ///< what disker did
+    IpcIo::Command command; ///< what disker is supposed to do or did
+    struct timeval start; ///< when the I/O request was converted to IpcIoMsg
 
     int xerrno; ///< I/O error code or zero
 };
@@ -64,6 +57,7 @@ class IpcIoPendingRequest;
 
 class IpcIoFile: public DiskFile
 {
+    CBDATA_CLASS(IpcIoFile);
 
 public:
     typedef RefCount<IpcIoFile> Pointer;
@@ -72,6 +66,7 @@ public:
     virtual ~IpcIoFile();
 
     /* DiskFile API */
+    virtual void configure(const Config &cfg);
     virtual void open(int flags, mode_t mode, RefCount<IORequestor> callback);
     virtual void create(int flags, mode_t mode, RefCount<IORequestor> callback);
     virtual void read(ReadRequest *);
@@ -83,48 +78,73 @@ public:
     virtual bool canWrite() const;
     virtual bool ioInProgress() const;
 
-    /// finds and calls the right IpcIoFile upon disker's response
-    static void HandleResponse(const Ipc::TypedMsgHdr &response);
+    /// handle open response from coordinator
+    static void HandleOpenResponse(const Ipc::StrandSearchResponse &response);
+
+    /// handle queue push notifications from worker or disker
+    static void HandleNotification(const Ipc::TypedMsgHdr &msg);
 
-    /// disker entry point for remote I/O requests
-    static void HandleRequest(const IpcIoRequest &request);
+    DiskFile::Config config; ///< supported configuration options
 
 protected:
     friend class IpcIoPendingRequest;
-    void openCompleted(const IpcIoResponse *response);
-    void readCompleted(ReadRequest *readRequest, const IpcIoResponse *);
-    void writeCompleted(WriteRequest *writeRequest, const IpcIoResponse *);
+    void openCompleted(const Ipc::StrandSearchResponse *const response);
+    void readCompleted(ReadRequest *readRequest, IpcIoMsg *const response);
+    void writeCompleted(WriteRequest *writeRequest, const IpcIoMsg *const response);
+    bool canWait() const;
 
 private:
-    void send(IpcIoRequest &request, IpcIoPendingRequest *pending);
+    void trackPendingRequest(const unsigned int id, IpcIoPendingRequest *const pending);
+    void push(IpcIoPendingRequest *const pending);
+    IpcIoPendingRequest *dequeueRequest(const unsigned int requestId);
 
-    static IpcIoPendingRequest *DequeueRequest(unsigned int requestId);
+    static void Notify(const int peerId);
 
-    static void CheckTimeouts(void* param);
-    static void ScheduleTimeoutCheck();
+    static void OpenTimeout(void *const param);
+    static void CheckTimeouts(void *const param);
+    void checkTimeouts();
+    void scheduleTimeoutCheck();
+
+    static void HandleResponses(const char *const when);
+    void handleResponse(IpcIoMsg &ipcIo);
+
+    static void DiskerHandleMoreRequests(void*);
+    static void DiskerHandleRequests();
+    static void DiskerHandleRequest(const int workerId, IpcIoMsg &ipcIo);
+    static bool WaitBeforePop();
 
 private:
     const String dbName; ///< the name of the file we are managing
     int diskId; ///< the process ID of the disker we talk to
     RefCount<IORequestor> ioRequestor;
 
-    int ioLevel; ///< number of pending I/O requests using this file
-
     bool error_; ///< whether we have seen at least one I/O error (XXX)
 
+    unsigned int lastRequestId; ///< last requestId used
+
     /// maps requestId to the handleResponse callback
     typedef std::map<unsigned int, IpcIoPendingRequest*> RequestMap;
-    static RequestMap TheRequestMap1; ///< older (or newer) pending requests
-    static RequestMap TheRequestMap2; ///< newer (or older) pending requests
-    static RequestMap *TheOlderRequests; ///< older requests (map1 or map2)
-    static RequestMap *TheNewerRequests; ///< newer requests (map2 or map1)
-    static bool TimeoutCheckScheduled; ///< we expect a CheckTimeouts() call
+    RequestMap requestMap1; ///< older (or newer) pending requests
+    RequestMap requestMap2; ///< newer (or older) pending requests
+    RequestMap *olderRequests; ///< older requests (map1 or map2)
+    RequestMap *newerRequests; ///< newer requests (map2 or map1)
+    bool timeoutCheckScheduled; ///< we expect a CheckTimeouts() call
 
-    static unsigned int LastRequestId; ///< last requestId used
+    static const double Timeout; ///< timeout value in seconds
 
-    CBDATA_CLASS2(IpcIoFile);
-};
+    typedef std::list<Pointer> IpcIoFileList;
+    static IpcIoFileList WaitingForOpen; ///< pending open requests
 
+    ///< maps diskerId to IpcIoFile, cleared in destructor
+    typedef std::map<int, IpcIoFile*> IpcIoFilesMap;
+    static IpcIoFilesMap IpcIoFiles;
+
+    typedef Ipc::FewToFewBiQueue Queue;
+    static std::unique_ptr<Queue> queue; ///< IPC queue
+
+    /// whether we are waiting for an event to handle still queued I/O requests
+    static bool DiskerHandleMoreRequestsScheduled;
+};
 
 /// keeps original I/O request parameters while disker is handling the request
 class IpcIoPendingRequest
@@ -133,10 +153,10 @@ public:
     IpcIoPendingRequest(const IpcIoFile::Pointer &aFile);
 
     /// called when response is received and, with a nil response, on timeouts
-    void completeIo(IpcIoResponse *response);
+    void completeIo(IpcIoMsg *const response);
 
 public:
-    IpcIoFile::Pointer file; ///< the file object waiting for the response
+    const IpcIoFile::Pointer file; ///< the file object waiting for the response
     ReadRequest *readRequest; ///< set if this is a read requests
     WriteRequest *writeRequest; ///< set if this is a write request
 
@@ -145,5 +165,5 @@ private:
     IpcIoPendingRequest &operator =(const IpcIoPendingRequest &d); // ditto
 };
 
-
 #endif /* SQUID_IPC_IOFILE_H */
+