From: Alex Rousskov Date: Thu, 29 Apr 2010 22:35:11 +0000 (-0600) Subject: Implemented basic file descriptor exchange between IPC Strand and Coordinator. X-Git-Tag: SQUID_3_2_0_1~93^2~28 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=7230e9c74aa96bc804d62086e370352016380ac7;p=thirdparty%2Fsquid.git Implemented basic file descriptor exchange between IPC Strand and Coordinator. Tested using on-disk file. The test hack will be removed. Fixed Port code to allow it to receive more than one message with varying msghdr buffer configurations. We must [re]allocate all msghdr buffers before every read/recvmsg() call. --- diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc index 02f91a70c3..a232a83b5f 100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@ -50,8 +50,13 @@ void Ipc::Coordinator::receive(const TypedMsgHdr& message) handleRegistrationRequest(StrandCoord(message)); break; + case mtDescriptorGet: + debugs(54, 6, HERE << "Descriptor get request"); + handleDescriptorGet(Descriptor(message)); + break; + default: - debugs(54, 6, HERE << "Unhandled message type: " << message.type()); + debugs(54, 1, HERE << "Unhandled message type: " << message.type()); break; } } @@ -66,6 +71,34 @@ void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand) SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message); } +void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request) +{ + // XXX: hack: create descriptor here + char buffer[64]; + snprintf(buffer, sizeof(buffer), "/tmp/squid_shared_file.txt"); + static int fd = -1; + if (fd < 0) { + fd = open(buffer, O_CREAT | O_RDWR | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH); + int n = snprintf(buffer, sizeof(buffer), "coord: created %d\n", fd); + ssize_t bytes = write(fd, buffer, n); + Must(bytes == n); + debugs(54, 6, "Created FD " << fd << " for kid" << request.fromKid); + } else { + int n = snprintf(buffer, sizeof(buffer), "coord: updated %d\n", fd); + ssize_t bytes = write(fd, buffer, n); + Must(bytes == n); + } + + debugs(54, 6, "Sending FD " << fd << " to kid" << request.fromKid); + + Descriptor response(-1, fd); + TypedMsgHdr message; + response.pack(message); + SendMessage(MakeAddr(strandAddrPfx, request.fromKid), message); + + // XXX: close(fd); fd should be opened until the message has not reached rec iver +} + void Ipc::Coordinator::broadcastSignal(int sig) const { typedef Strands::const_iterator SCI; diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h index 1d81a4ed7e..80e150a768 100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@ -35,6 +35,8 @@ protected: void registerStrand(const StrandCoord &); ///< adds or updates existing void handleRegistrationRequest(const StrandCoord &); ///< register,ACK + void handleDescriptorGet(const Descriptor& request); + private: typedef Vector Strands; ///< unsorted strands Strands strands; ///< registered processes and threads diff --git a/src/ipc/Messages.cc b/src/ipc/Messages.cc index 7060a940f4..5b0d585af8 100644 --- a/src/ipc/Messages.cc +++ b/src/ipc/Messages.cc @@ -22,10 +22,40 @@ Ipc::StrandCoord::StrandCoord(int aKidId, pid_t aPid): kidId(aKidId), pid(aPid) Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0) { - hdrMsg.getData(mtRegistration, this, sizeof(this)); + hdrMsg.getData(mtRegistration, this, sizeof(this)); } void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const { hdrMsg.putData(mtRegistration, this, sizeof(this)); } + + +Ipc::Descriptor::Descriptor(): fromKid(-1), fd(-1) +{ +} + +Ipc::Descriptor::Descriptor(int aFromKid, int aFd): fromKid(aFromKid), fd(aFd) +{ +} + +Ipc::Descriptor::Descriptor(const TypedMsgHdr &hdrMsg): fromKid(-1), fd(-1) +{ + if (hdrMsg.type() == mtDescriptorGet) { + hdrMsg.getData(mtDescriptorGet, this, sizeof(this)); + fd = -1; + } else { + hdrMsg.getData(mtDescriptorPut, this, sizeof(this)); + fd = hdrMsg.getFd(); + } +} + +void Ipc::Descriptor::pack(TypedMsgHdr &hdrMsg) const +{ + if (fd >= 0) { + hdrMsg.putData(mtDescriptorPut, this, sizeof(this)); + hdrMsg.putFd(fd); + } else { + hdrMsg.putData(mtDescriptorGet, this, sizeof(this)); + } +} diff --git a/src/ipc/Messages.h b/src/ipc/Messages.h index d7b1aae232..d545b21b74 100644 --- a/src/ipc/Messages.h +++ b/src/ipc/Messages.h @@ -19,15 +19,16 @@ namespace Ipc class TypedMsgHdr; -typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType; +typedef enum { mtNone = 0, mtRegistration, + mtDescriptorGet, mtDescriptorPut } MessageType; /// Strand location details class StrandCoord { public: - StrandCoord(); ///< unknown location - StrandCoord(int akidId, pid_t aPid); ///< from registrant - explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg() - void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + StrandCoord(); ///< unknown location + StrandCoord(int akidId, pid_t aPid); ///< from registrant + explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() public: int kidId; ///< internal Squid process number @@ -38,11 +39,13 @@ public: class Descriptor { public: - explicit Descriptor(int fd); ///< from descriptor sender - explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg() - void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() + Descriptor(); ///< unknown descriptor + Descriptor(int fromKid, int fd); ///< from descriptor sender or requestor + explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg() + void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg() public: + int fromKid; /// the source of this message int fd; ///< raw descriptor value }; diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc index 8d525005cb..b02c18ee49 100644 --- a/src/ipc/Port.cc +++ b/src/ipc/Port.cc @@ -18,7 +18,6 @@ Ipc::Port::Port(const String& aListenAddr): UdsOp(aListenAddr) { setOptions(COMM_NONBLOCKING | COMM_DOBIND); - buf.allocate(); } void Ipc::Port::start() @@ -30,6 +29,7 @@ void Ipc::Port::start() void Ipc::Port::listen() { debugs(54, 6, HERE); + buf.prepForReading(); AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead", CommCbMemFunT(this, &Port::noteRead)); comm_read(fd(), buf.raw(), buf.size(), readHandler); diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc index 0c8f59ebce..3bf75c7dfa 100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@ -38,15 +38,19 @@ void Ipc::Strand::registerSelf() void Ipc::Strand::receive(const TypedMsgHdr &message) { - debugs(54, 6, HERE); + debugs(54, 6, HERE << message.type()); switch (message.type()) { case mtRegistration: handleRegistrationResponse(StrandCoord(message)); break; + case mtDescriptorPut: + putDescriptor(Descriptor(message)); + break; + default: - debugs(54, 6, HERE << "Unhandled message type: " << message.type()); + debugs(54, 1, HERE << "Unhandled message type: " << message.type()); break; } } @@ -57,6 +61,12 @@ void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand) if (strand.kidId == KidIdentifier && strand.pid == getpid()) { debugs(54, 6, "kid" << KidIdentifier << " registered"); clearTimeout(); // we are done + + debugs(54, 6, HERE << "requesting FD"); + Descriptor request(KidIdentifier, -1); + TypedMsgHdr message; + request.pack(message); + SendMessage(coordinatorAddr, message); } else { // could be an ACK to the registration message of our dead predecessor debugs(54, 6, "kid" << KidIdentifier << " is not yet registered"); @@ -64,6 +74,16 @@ void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand) } } +/// receive descriptor we asked for +void Ipc::Strand::putDescriptor(const Descriptor &message) +{ + debugs(54, 6, HERE << "got FD " << message.fd); + char buffer[64]; + const int n = snprintf(buffer, sizeof(buffer), "strand: kid%d wrote using FD %d\n", KidIdentifier, message.fd); + ssize_t bytes = write(message.fd, buffer, n); + Must(bytes == n); +} + void Ipc::Strand::timedout() { debugs(54, 6, HERE << isRegistered); diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h index 5a1e26e7b4..33f644ab97 100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@ -15,6 +15,7 @@ namespace Ipc { class StrandCoord; +class Descriptor; /// Receives coordination messages on behalf of its process or thread class Strand: public Port @@ -31,6 +32,7 @@ protected: private: void registerSelf(); /// let Coordinator know this strand exists void handleRegistrationResponse(const StrandCoord &strand); + void putDescriptor(const Descriptor &message); private: bool isRegistered; ///< whether Coordinator ACKed registration (unused)