]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Implemented basic file descriptor exchange between IPC Strand and Coordinator.
authorAlex Rousskov <rousskov@measurement-factory.com>
Thu, 29 Apr 2010 22:35:11 +0000 (16:35 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Thu, 29 Apr 2010 22:35:11 +0000 (16:35 -0600)
Tested using on-disk file. The test hack will be removed.

Fixed Port code to allow it to receive more than one message with varying
msghdr buffer configurations. We must [re]allocate all msghdr buffers before
every read/recvmsg() call.

src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Messages.cc
src/ipc/Messages.h
src/ipc/Port.cc
src/ipc/Strand.cc
src/ipc/Strand.h

index 02f91a70c378249a17173a41b09b44ec65d6083c..a232a83b5f3fec98dfbf5585964f7b55f3dac3b5 100644 (file)
@@ -50,8 +50,13 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message)
         handleRegistrationRequest(StrandCoord(message));
         break;
 
+    case mtDescriptorGet:
+        debugs(54, 6, HERE << "Descriptor get request");
+        handleDescriptorGet(Descriptor(message));
+        break;
+
     default:
-        debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+        debugs(54, 1, HERE << "Unhandled message type: " << message.type());
         break;
     }
 }
@@ -66,6 +71,34 @@ void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
     SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
 }
 
+void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request)
+{
+    // XXX: hack: create descriptor here
+    char buffer[64];
+    snprintf(buffer, sizeof(buffer), "/tmp/squid_shared_file.txt");
+    static int fd = -1;
+    if (fd < 0) {
+        fd = open(buffer, O_CREAT | O_RDWR | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
+        int n = snprintf(buffer, sizeof(buffer), "coord: created %d\n", fd);
+        ssize_t bytes = write(fd, buffer, n);
+        Must(bytes == n);
+        debugs(54, 6, "Created FD " << fd << " for kid" << request.fromKid);
+    } else {
+        int n = snprintf(buffer, sizeof(buffer), "coord: updated %d\n", fd);
+        ssize_t bytes = write(fd, buffer, n);
+        Must(bytes == n);
+    }
+
+    debugs(54, 6, "Sending FD " << fd << " to kid" << request.fromKid);
+
+    Descriptor response(-1, fd);
+    TypedMsgHdr message;
+    response.pack(message);
+    SendMessage(MakeAddr(strandAddrPfx, request.fromKid), message);
+
+    // XXX: close(fd); fd should be opened until the message has not reached rec iver 
+}
+
 void Ipc::Coordinator::broadcastSignal(int sig) const
 {
     typedef Strands::const_iterator SCI;
index 1d81a4ed7ea2c37c434d83c3ecd3a548ab089014..80e150a76824f6b10a08498fac3daae4efacb44a 100644 (file)
@@ -35,6 +35,8 @@ protected:
     void registerStrand(const StrandCoord &); ///< adds or updates existing
     void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
 
+    void handleDescriptorGet(const Descriptor& request);
+
 private:
     typedef Vector<StrandCoord> Strands; ///< unsorted strands
     Strands strands; ///< registered processes and threads
index 7060a940f488900e82293c0cb2691188446c2765..5b0d585af83c922da439535bd815e803b6b1ffc7 100644 (file)
@@ -22,10 +22,40 @@ Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid)
 
 Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
 {
-       hdrMsg.getData(mtRegistration, this, sizeof(this));
+    hdrMsg.getData(mtRegistration, this, sizeof(this));
 }
 
 void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
 {
     hdrMsg.putData(mtRegistration, this, sizeof(this));
 }
+
+
+Ipc::Descriptor::Descriptor(): fromKid(-1), fd(-1)
+{
+}
+
+Ipc::Descriptor::Descriptor(int aFromKid, int aFd): fromKid(aFromKid), fd(aFd)
+{
+}
+
+Ipc::Descriptor::Descriptor(const TypedMsgHdr &hdrMsg): fromKid(-1), fd(-1)
+{
+    if (hdrMsg.type() == mtDescriptorGet) {
+        hdrMsg.getData(mtDescriptorGet, this, sizeof(this));
+        fd = -1;
+    } else {
+        hdrMsg.getData(mtDescriptorPut, this, sizeof(this));
+        fd = hdrMsg.getFd();
+    }
+}
+
+void Ipc::Descriptor::pack(TypedMsgHdr &hdrMsg) const
+{
+    if (fd >= 0) {
+        hdrMsg.putData(mtDescriptorPut, this, sizeof(this));
+        hdrMsg.putFd(fd);
+    } else {
+        hdrMsg.putData(mtDescriptorGet, this, sizeof(this));
+    }
+}
index d7b1aae232289fa4f00de9191539577c739f3996..d545b21b74c5b49b76817654ea9a8065e146f292 100644 (file)
@@ -19,15 +19,16 @@ namespace Ipc
 
 class TypedMsgHdr;
 
-typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType;
+typedef enum { mtNone = 0, mtRegistration,
+    mtDescriptorGet, mtDescriptorPut } MessageType;
 
 /// Strand location details
 class StrandCoord {
 public:
-       StrandCoord(); ///< unknown location
-       StrandCoord(int akidId, pid_t aPid); ///< from registrant
-       explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
-       void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+    StrandCoord(); ///< unknown location
+    StrandCoord(int akidId, pid_t aPid); ///< from registrant
+    explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
     int kidId; ///< internal Squid process number
@@ -38,11 +39,13 @@ public:
 class Descriptor
 {
 public:
-    explicit Descriptor(int fd); ///< from descriptor sender
-       explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
-       void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+    Descriptor(); ///< unknown descriptor
+    Descriptor(int fromKid, int fd); ///< from descriptor sender or requestor
+    explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 
 public:
+    int fromKid; /// the source of this message
     int fd; ///< raw descriptor value
 };
 
index 8d525005cbe66ab02cf39bc83cad98c9efa2478f..b02c18ee4960e2daf17f1091fd5d9c843ff5e93d 100644 (file)
@@ -18,7 +18,6 @@ Ipc::Port::Port(const String& aListenAddr):
     UdsOp(aListenAddr)
 {
     setOptions(COMM_NONBLOCKING | COMM_DOBIND);
-    buf.allocate();
 }
 
 void Ipc::Port::start()
@@ -30,6 +29,7 @@ void Ipc::Port::start()
 void Ipc::Port::listen()
 {
     debugs(54, 6, HERE);
+    buf.prepForReading();
     AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
         CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
     comm_read(fd(), buf.raw(), buf.size(), readHandler);
index 0c8f59ebced15cf2e67354ec7084ce513fe74ab8..3bf75c7dfaf602cd9d56bc1f904e528859e7596e 100644 (file)
@@ -38,15 +38,19 @@ void Ipc::Strand::registerSelf()
 
 void Ipc::Strand::receive(const TypedMsgHdr &message)
 {
-    debugs(54, 6, HERE);
+    debugs(54, 6, HERE << message.type());
     switch (message.type()) {
 
     case mtRegistration:
         handleRegistrationResponse(StrandCoord(message));
         break;
 
+    case mtDescriptorPut:
+        putDescriptor(Descriptor(message));
+        break;
+
     default:
-        debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+        debugs(54, 1, HERE << "Unhandled message type: " << message.type());
         break;
     }
 }
@@ -57,6 +61,12 @@ void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
     if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
         debugs(54, 6, "kid" << KidIdentifier << " registered");
         clearTimeout(); // we are done
+
+        debugs(54, 6, HERE << "requesting FD");
+        Descriptor request(KidIdentifier, -1);
+        TypedMsgHdr message;
+        request.pack(message);
+        SendMessage(coordinatorAddr, message);
     } else {
         // could be an ACK to the registration message of our dead predecessor
         debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
@@ -64,6 +74,16 @@ void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
     }
 }
 
+/// receive descriptor we asked for
+void Ipc::Strand::putDescriptor(const Descriptor &message)
+{
+    debugs(54, 6, HERE << "got FD " << message.fd);
+    char buffer[64];
+    const int n = snprintf(buffer, sizeof(buffer), "strand: kid%d wrote using FD %d\n", KidIdentifier, message.fd);
+    ssize_t bytes = write(message.fd, buffer, n);
+    Must(bytes == n);
+}
+
 void Ipc::Strand::timedout()
 {
     debugs(54, 6, HERE << isRegistered);
index 5a1e26e7b47319f301d65ebb3dae7ec5d4383bcd..33f644ab9775de011099b032508ca2551bb9149c 100644 (file)
@@ -15,6 +15,7 @@ namespace Ipc
 {
 
 class StrandCoord;
+class Descriptor;
 
 /// Receives coordination messages on behalf of its process or thread
 class Strand: public Port
@@ -31,6 +32,7 @@ protected:
 private:
     void registerSelf(); /// let Coordinator know this strand exists
     void handleRegistrationResponse(const StrandCoord &strand);
+    void putDescriptor(const Descriptor &message);
 
 private:
     bool isRegistered; ///< whether Coordinator ACKed registration (unused)