Tested using on-disk file. The test hack will be removed.
Fixed Port code to allow it to receive more than one message with varying
msghdr buffer configurations. We must [re]allocate all msghdr buffers before
every read/recvmsg() call.
handleRegistrationRequest(StrandCoord(message));
break;
+ case mtDescriptorGet:
+ debugs(54, 6, HERE << "Descriptor get request");
+ handleDescriptorGet(Descriptor(message));
+ break;
+
default:
- debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+ debugs(54, 1, HERE << "Unhandled message type: " << message.type());
break;
}
}
SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
}
+void Ipc::Coordinator::handleDescriptorGet(const Descriptor& request)
+{
+ // XXX: hack: create descriptor here
+ char buffer[64];
+ snprintf(buffer, sizeof(buffer), "/tmp/squid_shared_file.txt");
+ static int fd = -1;
+ if (fd < 0) {
+ fd = open(buffer, O_CREAT | O_RDWR | O_APPEND, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH);
+ int n = snprintf(buffer, sizeof(buffer), "coord: created %d\n", fd);
+ ssize_t bytes = write(fd, buffer, n);
+ Must(bytes == n);
+ debugs(54, 6, "Created FD " << fd << " for kid" << request.fromKid);
+ } else {
+ int n = snprintf(buffer, sizeof(buffer), "coord: updated %d\n", fd);
+ ssize_t bytes = write(fd, buffer, n);
+ Must(bytes == n);
+ }
+
+ debugs(54, 6, "Sending FD " << fd << " to kid" << request.fromKid);
+
+ Descriptor response(-1, fd);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.fromKid), message);
+
+ // XXX: close(fd); fd should be opened until the message has not reached rec iver
+}
+
void Ipc::Coordinator::broadcastSignal(int sig) const
{
typedef Strands::const_iterator SCI;
void registerStrand(const StrandCoord &); ///< adds or updates existing
void handleRegistrationRequest(const StrandCoord &); ///< register,ACK
+ void handleDescriptorGet(const Descriptor& request);
+
private:
typedef Vector<StrandCoord> Strands; ///< unsorted strands
Strands strands; ///< registered processes and threads
Ipc::StrandCoord::StrandCoord(const TypedMsgHdr &hdrMsg): kidId(-1), pid(0)
{
- hdrMsg.getData(mtRegistration, this, sizeof(this));
+ hdrMsg.getData(mtRegistration, this, sizeof(this));
}
void Ipc::StrandCoord::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.putData(mtRegistration, this, sizeof(this));
}
+
+
+Ipc::Descriptor::Descriptor(): fromKid(-1), fd(-1)
+{
+}
+
+Ipc::Descriptor::Descriptor(int aFromKid, int aFd): fromKid(aFromKid), fd(aFd)
+{
+}
+
+Ipc::Descriptor::Descriptor(const TypedMsgHdr &hdrMsg): fromKid(-1), fd(-1)
+{
+ if (hdrMsg.type() == mtDescriptorGet) {
+ hdrMsg.getData(mtDescriptorGet, this, sizeof(this));
+ fd = -1;
+ } else {
+ hdrMsg.getData(mtDescriptorPut, this, sizeof(this));
+ fd = hdrMsg.getFd();
+ }
+}
+
+void Ipc::Descriptor::pack(TypedMsgHdr &hdrMsg) const
+{
+ if (fd >= 0) {
+ hdrMsg.putData(mtDescriptorPut, this, sizeof(this));
+ hdrMsg.putFd(fd);
+ } else {
+ hdrMsg.putData(mtDescriptorGet, this, sizeof(this));
+ }
+}
class TypedMsgHdr;
-typedef enum { mtNone = 0, mtRegistration, mtDescriptor } MessageType;
+typedef enum { mtNone = 0, mtRegistration,
+ mtDescriptorGet, mtDescriptorPut } MessageType;
/// Strand location details
class StrandCoord {
public:
- StrandCoord(); ///< unknown location
- StrandCoord(int akidId, pid_t aPid); ///< from registrant
- explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
- void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+ StrandCoord(); ///< unknown location
+ StrandCoord(int akidId, pid_t aPid); ///< from registrant
+ explicit StrandCoord(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
int kidId; ///< internal Squid process number
class Descriptor
{
public:
- explicit Descriptor(int fd); ///< from descriptor sender
- explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
- void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+ Descriptor(); ///< unknown descriptor
+ Descriptor(int fromKid, int fd); ///< from descriptor sender or requestor
+ explicit Descriptor(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
+ int fromKid; /// the source of this message
int fd; ///< raw descriptor value
};
UdsOp(aListenAddr)
{
setOptions(COMM_NONBLOCKING | COMM_DOBIND);
- buf.allocate();
}
void Ipc::Port::start()
void Ipc::Port::listen()
{
debugs(54, 6, HERE);
+ buf.prepForReading();
AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
comm_read(fd(), buf.raw(), buf.size(), readHandler);
void Ipc::Strand::receive(const TypedMsgHdr &message)
{
- debugs(54, 6, HERE);
+ debugs(54, 6, HERE << message.type());
switch (message.type()) {
case mtRegistration:
handleRegistrationResponse(StrandCoord(message));
break;
+ case mtDescriptorPut:
+ putDescriptor(Descriptor(message));
+ break;
+
default:
- debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+ debugs(54, 1, HERE << "Unhandled message type: " << message.type());
break;
}
}
if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
debugs(54, 6, "kid" << KidIdentifier << " registered");
clearTimeout(); // we are done
+
+ debugs(54, 6, HERE << "requesting FD");
+ Descriptor request(KidIdentifier, -1);
+ TypedMsgHdr message;
+ request.pack(message);
+ SendMessage(coordinatorAddr, message);
} else {
// could be an ACK to the registration message of our dead predecessor
debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
}
}
+/// receive descriptor we asked for
+void Ipc::Strand::putDescriptor(const Descriptor &message)
+{
+ debugs(54, 6, HERE << "got FD " << message.fd);
+ char buffer[64];
+ const int n = snprintf(buffer, sizeof(buffer), "strand: kid%d wrote using FD %d\n", KidIdentifier, message.fd);
+ ssize_t bytes = write(message.fd, buffer, n);
+ Must(bytes == n);
+}
+
void Ipc::Strand::timedout()
{
debugs(54, 6, HERE << isRegistered);
{
class StrandCoord;
+class Descriptor;
/// Receives coordination messages on behalf of its process or thread
class Strand: public Port
private:
void registerSelf(); /// let Coordinator know this strand exists
void handleRegistrationResponse(const StrandCoord &strand);
+ void putDescriptor(const Descriptor &message);
private:
bool isRegistered; ///< whether Coordinator ACKed registration (unused)