(long int) state->offset << ", sz " << (long int) state->size << ".");
nleft = state->size - state->offset;
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+ // XXX: if we use UDS and it is not connected then 'sendto' will be called.
+ if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
+ len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
+ else
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
debugs(5, 5, "commHandleWrite: write() returns " << len);
fd_bytes(fd, len, FD_WRITE);
statCounter.syscalls.sock.writes++;
return EVENT_ERROR;
};
}
+
+/**
+ * Create a unix-domain socket
+ */
+int
+comm_open_uds(int sock_type,
+ int proto,
+ struct sockaddr_un* addr,
+ int flags)
+{
+ int new_socket;
+ struct addrinfo AI;
+
+ PROF_start(comm_open);
+
+ /* Create socket for accepting new connections. */
+ statCounter.syscalls.sock.sockets++;
+
+ /* Setup the socket addrinfo details for use */
+ AI.ai_flags = 0;
+ AI.ai_family = PF_UNIX;
+ AI.ai_socktype = sock_type;
+ AI.ai_protocol = proto;
+ AI.ai_addrlen = SUN_LEN(addr);
+ AI.ai_addr = (sockaddr*)addr;
+ AI.ai_canonname = NULL;
+ AI.ai_next = NULL;
+
+ debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path);
+
+ if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+ /* Increase the number of reserved fd's if calls to socket()
+ * are failing because the open file table is full. This
+ * limits the number of simultaneous clients */
+
+ if (limitError(errno)) {
+ debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror());
+ fdAdjustReserved();
+ } else {
+ debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror());
+ }
+
+ PROF_stop(comm_open);
+ return -1;
+ }
+
+ debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol );
+
+ /* update fdstat */
+ debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket");
+
+ assert(!isOpen(new_socket));
+
+ fd_open(new_socket, FD_SOCKET, NULL);
+ fd_table[new_socket].sock_family = AI.ai_family;
+
+ fd_table[new_socket].unix_addr = *addr;
+
+ fdd_table[new_socket].close_file = NULL;
+ fdd_table[new_socket].close_line = 0;
+
+ if (!(flags & COMM_NOCLOEXEC))
+ commSetCloseOnExec(new_socket);
+
+ if (flags & COMM_REUSEADDR)
+ commSetReuseAddr(new_socket);
+
+ if (flags & COMM_NONBLOCKING) {
+ if (commSetNonBlocking(new_socket) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ if (flags & COMM_DOBIND) {
+ if (commBind(new_socket, AI) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+#ifdef TCP_NODELAY
+ if (sock_type == SOCK_STREAM)
+ commSetTcpNoDelay(new_socket);
+
+#endif
+
+ if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+ commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}
SQUIDCEXTERN void comm_exit(void);
SQUIDCEXTERN int comm_open(int, int, IpAddress &, int, const char *note);
+SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags);
/**
* Open a port specially bound for listening or sending through a specific port.
int sock_family;
char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */
char desc[FD_DESC_SZ];
+ struct sockaddr_un unix_addr; // store address of unix domain socket when it's needed
struct {
unsigned int open:1;
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Coordinator.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+
+
+Ipc::Coordinator::Coordinator():
+ Port(coordinatorPathAddr)
+{
+}
+
+void Ipc::Coordinator::start()
+{
+ listen();
+}
+
+Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+{
+ for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+ if (iter->kidId == kidId)
+ return &(*iter);
+ }
+ return NULL;
+}
+
+void Ipc::Coordinator::enrollStrand(const StrandData& strand)
+{
+ if (StrandData* found = findStrand(strand.kidId))
+ *found = strand;
+ else
+ strands.push_back(strand);
+}
+
+void Ipc::Coordinator::handleRead(const Message& message)
+{
+ switch (message.type()) {
+ case mtRegister:
+ debugs(54, 6, HERE << "Registration request");
+ handleRegistrationRequest(message.strand());
+ break;
+
+ default:
+ debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
+ break;
+ }
+}
+
+void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+{
+ // register strand
+ enrollStrand(strand);
+ // send back received message
+ SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid));
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_COORDINATOR_H
+#define SQUID_IPC_COORDINATOR_H
+
+
+#include "Array.h"
+#include "SquidString.h"
+#include "ipc/Port.h"
+
+
+namespace Ipc
+{
+
+
+/**
+ Coordinator processes incoming queries about registration of running squid instances
+ and store it KidIndentiers.
+*/
+class Coordinator: public Port, public RefCountable
+{
+private:
+ Coordinator(const Coordinator&); // not implemented
+ Coordinator& operator =(const Coordinator&); // not implemented
+
+public:
+ Coordinator();
+
+public:
+ virtual void start();
+
+private:
+ virtual void handleRead(const Message& message);
+ StrandData* findStrand(int kidId);
+ void enrollStrand(const StrandData& strand);
+ void handleRegistrationRequest(const StrandData& strand);
+
+private:
+ Vector<StrandData> strands;
+
+ CBDATA_CLASS2(Coordinator);
+};
+
+
+}
+
+#endif /* SQUID_IPC_COORDINATOR_H */
storage.reserve(n);
+ char kid_name[32];
for (size_t i = 1; i <= n; ++i) {
- char kid_name[32];
snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i);
storage.push_back(Kid(kid_name));
}
+
+ if (n > 1) {
+ snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1));
+ storage.push_back(Kid(kid_name));
+ }
}
/// returns kid by pid
Kid.cc \
Kid.h \
Kids.cc \
- Kids.h
+ Kids.h \
+ Coordinator.cc \
+ Coordinator.h \
+ UdsOp.cc \
+ UdsOp.h \
+ Port.cc \
+ Port.h \
+ Strand.cc \
+ Strand.h
+
+DEFS += -DDEFAULT_PREFIX=\"$(prefix)\"
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Port.h"
+
+
+const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator";
+const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid";
+
+
+Ipc::Port::Port(const String& aListenAddr):
+ UdsOp(aListenAddr),
+ listenAddr(aListenAddr)
+{
+ assert(listenAddr.size() > sizeof(DEFAULT_PREFIX));
+}
+
+void Ipc::Port::listen()
+{
+ debugs(54, 6, HERE);
+ AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
+ CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
+ comm_read(fd(), message.rawData(), message.size(), readHandler);
+}
+
+String Ipc::Port::makeAddr(const char* pathAddr, int id) const
+{
+ assert(id >= 0);
+ String addr = pathAddr;
+ addr.append('-');
+ addr.append(xitoa(id));
+ return addr;
+}
+
+void Ipc::Port::noteRead(const CommIoCbParams& params)
+{
+ debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ assert(params.data == this);
+ if (params.flag == COMM_OK) {
+ assert(params.buf == (char*)&message);
+ assert(params.size == sizeof(Message));
+ handleRead(message);
+ }
+ listen();
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_PORT_H
+#define SQUID_IPC_PORT_H
+
+
+#include "SquidString.h"
+#include "ipc/UdsOp.h"
+
+
+namespace Ipc
+{
+
+
+/**
+ Base class implements functionality of local endpoint
+ is listening incoming connections
+*/
+class Port: public UdsOp
+{
+public:
+ Port(const String& aListenAddr);
+
+public:
+ /// start listening of incoming connections
+ void listen();
+ String makeAddr(const char* pathAddr, int id) const;
+
+protected:
+ virtual void handleRead(const Message& message) = 0;
+
+private:
+ void noteRead(const CommIoCbParams& params);
+
+private:
+ String listenAddr;
+ Message message;
+};
+
+
+extern const char coordinatorPathAddr[];
+extern const char strandPathAddr[];
+
+
+}
+
+
+#endif /* SQUID_IPC_PORT_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Strand.h"
+#include "ipc/Kids.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
+
+
+Ipc::Strand::Strand():
+ Port(makeAddr(strandPathAddr, KidIdentifier)),
+ isRegistered(false)
+{
+}
+
+void Ipc::Strand::start()
+{
+ listen();
+ setListenTimeout(&Strand::noteRegistrationTimeout, 6);
+ enroll();
+}
+
+void Ipc::Strand::enroll()
+{
+ debugs(54, 6, HERE);
+ assert(!registered());
+ SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid()));
+}
+
+void Ipc::Strand::handleRead(const Message& message)
+{
+ debugs(54, 6, HERE);
+ switch (message.type()) {
+
+ case mtRegister:
+ handleRegistrationResponse(message.strand());
+ break;
+
+ default:
+ debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
+ break;
+ }
+}
+
+void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+{
+ // handle registration respond from coordinator
+ // coordinator returns the same message
+ isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid());
+ debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered");
+ setListenTimeout(NULL, -1);
+}
+
+void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout)
+{
+ AsyncCall::Pointer listenTimeoutHandler = NULL;
+ if (timeout > 0) {
+ assert(timeoutHandler != NULL);
+ listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler",
+ CommCbMemFunT<Strand, CommTimeoutCbParams>(this, timeoutHandler));
+ }
+ setTimeout(listenTimeoutHandler, timeout);
+}
+
+void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params)
+{
+ debugs(54, 6, HERE);
+ if (!registered()) {
+ debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered");
+ exit(1);
+ }
+}
+
+bool Ipc::Strand::registered() const
+{
+ return isRegistered;
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_STRAND_H
+#define SQUID_IPC_STRAND_H
+
+
+#include "SquidString.h"
+#include "ipc/Port.h"
+
+
+namespace Ipc
+{
+
+
+/**
+ Strand implement functionality of Coordinator's client and
+ send registration query to Coordinator with it KidIdentifier.
+*/
+class Strand: public Port, public RefCountable
+{
+private:
+ typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&);
+
+private:
+ Strand(const Strand&); // not implemented
+ Strand& operator =(const Strand&); // not implemented
+
+public:
+ Strand();
+
+public:
+ virtual void start();
+ /// send register query
+ void enroll();
+ bool registered() const;
+
+private:
+ virtual void handleRead(const Message& message);
+ void handleRegistrationResponse(const StrandData& strand);
+ void setListenTimeout(TimeoutHandler timeoutHandler, int timeout);
+ void noteRegistrationTimeout(const CommTimeoutCbParams& params);
+
+private:
+ bool isRegistered;
+
+ CBDATA_CLASS2(Strand);
+};
+
+
+}
+
+
+#endif /* SQUID_IPC_STRAND_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "ipc/UdsOp.h"
+
+#define SEND_RETRIES 4
+#define SEND_TIMEOUT 4
+
+Ipc::Message::Message():
+ data()
+{
+}
+
+Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
+{
+ data.messageType = messageType;
+ data.strand.kidId = kidId;
+ data.strand.pid = pid;
+}
+
+Ipc::MessageType Ipc::Message::type() const
+{
+ return data.messageType;
+}
+
+const Ipc::StrandData& Ipc::Message::strand() const
+{
+ return data.strand;
+}
+
+char* Ipc::Message::rawData()
+{
+ return (char*)&data;
+}
+
+size_t Ipc::Message::size()
+{
+ return sizeof(data);
+}
+
+
+Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */):
+ AsyncJob("Ipc::UdsOp"),
+ addr(setAddr(pathAddr)),
+ options(COMM_NONBLOCKING),
+ fd_(-1)
+{
+ debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr);
+ if (bind) {
+ unlink(pathAddr.termedBuf());
+ options |= COMM_DOBIND;
+ }
+}
+
+Ipc::UdsOp::~UdsOp()
+{
+ debugs(54, 5, HERE << '[' << this << ']');
+ if (fd_ > 0)
+ comm_close(fd_);
+}
+
+bool Ipc::UdsOp::doneAll() const
+{
+ return false;
+}
+
+int Ipc::UdsOp::fd()
+{
+ if (fd_ < 0) {
+ fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
+ Must(fd_ > 0);
+ }
+ return fd_;
+}
+
+struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
+{
+ assert(pathAddr.size() != 0);
+ struct sockaddr_un unixAddr;
+ memset(&unixAddr, 0, sizeof(unixAddr));
+ unixAddr.sun_family = AF_LOCAL;
+ xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+ return unixAddr;
+}
+
+void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout)
+{
+ commSetTimeout(fd(), timeout, timeoutHandler);
+}
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
+
+Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
+ UdsOp(pathAddr, false),
+ message(aMessage),
+ retries(SEND_RETRIES),
+ timeout(SEND_TIMEOUT)
+{
+ assert(retries > 0);
+ assert(timeout >= 0);
+}
+
+void Ipc::UdsSender::start()
+{
+ write();
+ if (timeout > 0)
+ {
+ AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout",
+ CommCbMemFunT<UdsSender, CommTimeoutCbParams>(this, &UdsSender::noteTimeout));
+ setTimeout(timeoutHandler, timeout);
+ }
+}
+
+bool Ipc::UdsSender::retry()
+{
+ if (retries > 0)
+ --retries;
+ return retries != 0;
+}
+
+void Ipc::UdsSender::write()
+{
+ debugs(54, 5, HERE);
+ AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite",
+ CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::noteWrite));
+ comm_write(fd(), message.rawData(), message.size(), writeHandler);
+}
+
+void Ipc::UdsSender::noteWrite(const CommIoCbParams& params)
+{
+ debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ if (params.flag == COMM_OK || !retry())
+ mustStop("done");
+ else
+ write();
+}
+
+void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params)
+{
+ debugs(54, 5, HERE);
+ mustStop("done");
+}
+
+
+void Ipc::SendMessage(const String& toAddress, const Message& message)
+{
+ AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_ASYNCUDSOP_H
+#define SQUID_IPC_ASYNCUDSOP_H
+
+
+#include "SquidString.h"
+#include "CommCalls.h"
+
+
+namespace Ipc
+{
+
+
+typedef enum {mtNone = 0, mtRegister} MessageType;
+
+/**
+ This one contains a registration info
+*/
+struct StrandData
+{
+ int kidId;
+ pid_t pid;
+};
+
+/**
+ This class contains data is used by UdsSender/UdsReceiver.
+*/
+class Message
+{
+public:
+ Message();
+ Message(MessageType messageType, int kidId, pid_t pid);
+
+public:
+ MessageType type() const;
+ const StrandData& strand() const;
+ char* rawData();
+ size_t size();
+
+private:
+ struct {
+ MessageType messageType;
+ StrandData strand;
+ } data;
+};
+
+/**
+ UdsOp implements common async UDS operation.
+*/
+class UdsOp: public AsyncJob
+{
+private:
+ UdsOp(const UdsOp&); // not implemented
+ UdsOp& operator= (const UdsOp&); // not implemented
+
+public:
+ UdsOp(const String& pathAddr, bool bind = true);
+ virtual ~UdsOp();
+
+protected:
+ /// return an endpoint for communication, use fd() instead of fd_
+ int fd();
+ virtual bool doneAll() const;
+ void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout);
+
+private:
+ struct sockaddr_un setAddr(const String& pathAddr);
+
+private:
+ struct sockaddr_un addr;
+ int options;
+ int fd_;
+};
+
+/**
+ Implement async write operation for UDS
+*/
+class UdsSender: public UdsOp
+{
+private:
+ UdsSender(const UdsSender&); // not implemented
+ UdsSender& operator= (const UdsSender&); // not implemented
+
+public:
+ UdsSender(const String& pathAddr, const Message& aMessage);
+
+public:
+ /// start writing data
+ virtual void start();
+
+private:
+ /// update retries counter and check
+ bool retry();
+ /// schedule writing
+ void write();
+ void noteWrite(const CommIoCbParams& params);
+ void noteTimeout(const CommTimeoutCbParams& params);
+
+private:
+ Message message;
+ int retries;
+ int timeout;
+
+ CBDATA_CLASS2(UdsSender);
+};
+
+
+void SendMessage(const String& toAddress, const Message& message);
+
+
+}
+
+#endif /* SQUID_IPC_ASYNCUDSOP_H */
#include "DiskIO/DiskIOModule.h"
#include "comm.h"
#include "ipc/Kids.h"
+#include "ipc/Coordinator.h"
+#include "ipc/Strand.h"
#if USE_EPOLL
#include "comm_epoll.h"
#endif
mainLoop.setTimeService(&time_engine);
+ if (!opt_no_daemon && Config.main_processes > 1) {
+ if (KidIdentifier == Config.main_processes + 1)
+ AsyncJob::AsyncStart(new Ipc::Coordinator);
+ else if (KidIdentifier != 0)
+ AsyncJob::AsyncStart(new Ipc::Strand);
+ }
+
/* at this point we are finished the synchronous startup. */
starting_up = 0;
mainStartScript(argv[0]);
// start each kid that needs to be [re]started; once
- for (size_t i = 0; i < TheKids.count(); ++i) {
+ for (int i = TheKids.count() - 1; i >= 0; --i) {
Kid& kid = TheKids.get(i);
if (kid.hopeless() || kid.exitedHappy() || kid.running())
continue;