(long int) state->offset << ", sz " << (long int) state->size << ".");
nleft = state->size - state->offset;
- len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
- // XXX: if we use UDS and it is not connected then 'sendto' will be called.
++ // XXX: tmp hack: call sendto() for unconnected UDS sockets
+ if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
+ len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
+ else
+ len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
debugs(5, 5, "commHandleWrite: write() returns " << len);
fd_bytes(fd, len, FD_WRITE);
statCounter.syscalls.sock.writes++;
return EVENT_ERROR;
};
}
-/**
- * Create a unix-domain socket
- */
+
- struct addrinfo AI;
++/// Create a unix-domain socket (UDS)
+ int
+ comm_open_uds(int sock_type,
+ int proto,
+ struct sockaddr_un* addr,
+ int flags)
+ {
++ // TODO: merge with comm_openex() when IpAddress becomes NetAddress
++
+ int new_socket;
-
+
+ PROF_start(comm_open);
- debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path);
+ /* Create socket for accepting new connections. */
+ statCounter.syscalls.sock.sockets++;
+
+ /* Setup the socket addrinfo details for use */
++ struct addrinfo AI;
+ AI.ai_flags = 0;
+ AI.ai_family = PF_UNIX;
+ AI.ai_socktype = sock_type;
+ AI.ai_protocol = proto;
+ AI.ai_addrlen = SUN_LEN(addr);
+ AI.ai_addr = (sockaddr*)addr;
+ AI.ai_canonname = NULL;
+ AI.ai_next = NULL;
+
- debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror());
++ debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
+
+ if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+ /* Increase the number of reserved fd's if calls to socket()
+ * are failing because the open file table is full. This
+ * limits the number of simultaneous clients */
+
+ if (limitError(errno)) {
- debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror());
++ debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
+ fdAdjustReserved();
+ } else {
- debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol );
++ debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
+ }
+
+ PROF_stop(comm_open);
+ return -1;
+ }
+
- debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket");
++ debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
+
+ /* update fdstat */
-
++ debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
+
+ assert(!isOpen(new_socket));
- fd_table[new_socket].sock_family = AI.ai_family;
-
- fd_table[new_socket].unix_addr = *addr;
+ fd_open(new_socket, FD_SOCKET, NULL);
+
+ fdd_table[new_socket].close_file = NULL;
++
+ fdd_table[new_socket].close_line = 0;
+
++ fd_table[new_socket].unix_addr = *addr;
++
++ fd_table[new_socket].sock_family = AI.ai_family;
++
+ if (!(flags & COMM_NOCLOEXEC))
+ commSetCloseOnExec(new_socket);
+
+ if (flags & COMM_REUSEADDR)
+ commSetReuseAddr(new_socket);
+
+ if (flags & COMM_NONBLOCKING) {
+ if (commSetNonBlocking(new_socket) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ if (flags & COMM_DOBIND) {
+ if (commBind(new_socket, AI) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ #ifdef TCP_NODELAY
+ if (sock_type == SOCK_STREAM)
+ commSetTcpNoDelay(new_socket);
+
+ #endif
+
+ if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+ commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+ }
#include "Debug.h"
#include "SquidTime.h"
#include "util.h"
++#include "ipc/Kids.h"
/* for shutting_down flag in xassert() */
#include "globals.h"
static char *debug_log_file = NULL;
static int Ctx_Lock = 0;
static const char *debugLogTime(void);
++static const char *debugLogKid(void);
static void ctx_print(void);
#if HAVE_SYSLOG
#ifdef LOG_LOCAL4
va_start(args2, format);
va_start(args3, format);
-- snprintf(f, BUFSIZ, "%s| %s",
++ snprintf(f, BUFSIZ, "%s%s| %s",
debugLogTime(),
++ debugLogKid(),
format);
_db_print_file(f, args1);
return buf;
}
++static const char *
++debugLogKid(void)
++{
++ if (KidIdentifier != 0) {
++ static char buf[16];
++ snprintf(buf, sizeof(buf), " kid%d", KidIdentifier);
++ return buf;
++ }
++
++ return "";
++}
++
void
xassert(const char *msg, const char *file, int line)
{
int sock_family;
char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */
char desc[FD_DESC_SZ];
- struct sockaddr_un unix_addr; // store address of unix domain socket when it's needed
++ struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any
struct {
unsigned int open:1;
--- /dev/null
- Port(coordinatorPathAddr)
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+ #include "config.h"
+ #include "ipc/Coordinator.h"
+
+
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+
+
+ Ipc::Coordinator::Coordinator():
- listen();
++ Port(coordinatorAddr)
+ {
+ }
+
+ void Ipc::Coordinator::start()
+ {
-void Ipc::Coordinator::enrollStrand(const StrandData& strand)
++ Port::start();
+ }
+
+ Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+ {
+ for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+ if (iter->kidId == kidId)
+ return &(*iter);
+ }
+ return NULL;
+ }
+
-void Ipc::Coordinator::handleRead(const Message& message)
++void Ipc::Coordinator::registerStrand(const StrandData& strand)
+ {
+ if (StrandData* found = findStrand(strand.kidId))
+ *found = strand;
+ else
+ strands.push_back(strand);
+ }
+
- case mtRegister:
++void Ipc::Coordinator::receive(const Message& message)
+ {
+ switch (message.type()) {
- debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
++ case mtRegistration:
+ debugs(54, 6, HERE << "Registration request");
+ handleRegistrationRequest(message.strand());
+ break;
+
+ default:
- // register strand
- enrollStrand(strand);
- // send back received message
- SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid));
++ debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+ break;
+ }
+ }
+
+ void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+ {
++ registerStrand(strand);
++
++ // send back an acknowledgement; TODO: remove as not needed?
++ SendMessage(MakeAddr(strandAddrPfx, strand.kidId),
++ Message(mtRegistration, strand.kidId, strand.pid));
+ }
--- /dev/null
-#include "SquidString.h"
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+ #ifndef SQUID_IPC_COORDINATOR_H
+ #define SQUID_IPC_COORDINATOR_H
+
+
+ #include "Array.h"
-
-/**
- Coordinator processes incoming queries about registration of running squid instances
- and store it KidIndentiers.
-*/
-class Coordinator: public Port, public RefCountable
+ #include "ipc/Port.h"
+
+
+ namespace Ipc
+ {
+
-private:
- Coordinator(const Coordinator&); // not implemented
- Coordinator& operator =(const Coordinator&); // not implemented
-
++/// Coordinates shared activities of Strands (Squid processes or threads)
++class Coordinator: public Port
+ {
-public:
- virtual void start();
+ public:
+ Coordinator();
+
-private:
- virtual void handleRead(const Message& message);
- StrandData* findStrand(int kidId);
- void enrollStrand(const StrandData& strand);
- void handleRegistrationRequest(const StrandData& strand);
++protected:
++ virtual void start(); // Port (AsyncJob) API
++ virtual void receive(const Message& message); // Port API
+
- Vector<StrandData> strands;
++ StrandData* findStrand(int kidId); ///< registered strand or NULL
++ void registerStrand(const StrandData &); ///< adds or updates existing
++ void handleRegistrationRequest(const StrandData &); ///< registers and ACKs
+
+ private:
-}
++ Vector<StrandData> strands; ///< registered processes and threads
+
+ CBDATA_CLASS2(Coordinator);
++
++private:
++ Coordinator(const Coordinator&); // not implemented
++ Coordinator& operator =(const Coordinator&); // not implemented
+ };
+
+
++} // namespace Ipc
+
+ #endif /* SQUID_IPC_COORDINATOR_H */
storage.reserve(n);
+ char kid_name[32];
++
++ // add Kid records for all n main strands
for (size_t i = 1; i <= n; ++i) {
- char kid_name[32];
snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i);
storage.push_back(Kid(kid_name));
}
+
++ // if coordination is needed, add a Kid record for Coordinator
+ if (n > 1) {
+ snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1));
+ storage.push_back(Kid(kid_name));
+ }
}
/// returns kid by pid
--- /dev/null
-
-const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator";
-const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid";
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+ #include "config.h"
+ #include "ipc/Port.h"
+
- UdsOp(aListenAddr),
- listenAddr(aListenAddr)
++const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
++const char Ipc::strandAddrPfx[] = DEFAULT_PREFIX "/var/run/squid";
+
+
+ Ipc::Port::Port(const String& aListenAddr):
- assert(listenAddr.size() > sizeof(DEFAULT_PREFIX));
++ UdsOp(aListenAddr)
+ {
- comm_read(fd(), message.rawData(), message.size(), readHandler);
++ setOptions(COMM_NONBLOCKING | COMM_DOBIND);
++}
++
++void Ipc::Port::start()
++{
++ UdsOp::start();
++ listen();
+ }
+
+ void Ipc::Port::listen()
+ {
+ debugs(54, 6, HERE);
+ AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
+ CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
-String Ipc::Port::makeAddr(const char* pathAddr, int id) const
++ comm_read(fd(), buf.raw(), buf.size(), readHandler);
+ }
+
- debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
- assert(params.data == this);
++bool Ipc::Port::doneAll() const
++{
++ return false; // listen forever
++}
++
++String Ipc::Port::MakeAddr(const char* pathAddr, int id)
+ {
+ assert(id >= 0);
+ String addr = pathAddr;
+ addr.append('-');
+ addr.append(xitoa(id));
++ addr.append(".ipc");
+ return addr;
+ }
+
+ void Ipc::Port::noteRead(const CommIoCbParams& params)
+ {
- assert(params.buf == (char*)&message);
- assert(params.size == sizeof(Message));
- handleRead(message);
++ debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag <<
++ " [" << this << ']');
+ if (params.flag == COMM_OK) {
++ assert(params.buf == buf.raw());
++ assert(params.size == buf.size());
++ receive(buf);
+ }
++ // TODO: if there was a fatal error on our socket, close the socket before
++ // trying to listen again and print a level-1 error message.
++
+ listen();
+ }
--- /dev/null
-/**
- Base class implements functionality of local endpoint
- is listening incoming connections
-*/
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+ #ifndef SQUID_IPC_PORT_H
+ #define SQUID_IPC_PORT_H
+
+
+ #include "SquidString.h"
+ #include "ipc/UdsOp.h"
+
+
+ namespace Ipc
+ {
+
+
- Port(const String& aListenAddr);
++/// Waits for and receives incoming IPC messages; kids handle the messages
+ class Port: public UdsOp
+ {
+ public:
-public:
- /// start listening of incoming connections
++ Port(const String &aListenAddr);
+
- String makeAddr(const char* pathAddr, int id) const;
++protected:
++ /// calculates IPC message address for strand #id at path
++ static String MakeAddr(const char *path, int id);
++
++ virtual void start() = 0; // UdsOp (AsyncJob) API; has body
++ virtual bool doneAll() const; // UdsOp (AsyncJob) API
++
++ /// read the next incoming message
+ void listen();
-protected:
- virtual void handleRead(const Message& message) = 0;
+
- void noteRead(const CommIoCbParams& params);
++ /// handle IPC message just read
++ virtual void receive(const Message& message) = 0;
+
+ private:
- String listenAddr;
- Message message;
++ void noteRead(const CommIoCbParams ¶ms); // Comm callback API
+
+ private:
-extern const char coordinatorPathAddr[];
-extern const char strandPathAddr[];
-
++ Message buf; ///< UDS read buffer filled by Comm
+ };
+
+
-}
++extern const char coordinatorAddr[]; ///< where coordinator listens
++extern const char strandAddrPfx[]; ///< strand's listening address prefix
+
++} // namespace Ipc
+
+
+ #endif /* SQUID_IPC_PORT_H */
--- /dev/null
-
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
- Port(makeAddr(strandPathAddr, KidIdentifier)),
+ #include "config.h"
+ #include "ipc/Strand.h"
+ #include "ipc/Kids.h"
+
+
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
+
+
+ Ipc::Strand::Strand():
- listen();
- setListenTimeout(&Strand::noteRegistrationTimeout, 6);
- enroll();
++ Port(MakeAddr(strandAddrPfx, KidIdentifier)),
+ isRegistered(false)
+ {
+ }
+
+ void Ipc::Strand::start()
+ {
-void Ipc::Strand::enroll()
++ Port::start();
++ registerSelf();
+ }
+
- assert(!registered());
- SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid()));
++void Ipc::Strand::registerSelf()
+ {
+ debugs(54, 6, HERE);
-void Ipc::Strand::handleRead(const Message& message)
++ Must(!isRegistered);
++ SendMessage(coordinatorAddr,
++ Message(mtRegistration, KidIdentifier, getpid()));
++ setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
+ }
+
- case mtRegister:
++void Ipc::Strand::receive(const Message& message)
+ {
+ debugs(54, 6, HERE);
+ switch (message.type()) {
+
- debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
++ case mtRegistration:
+ handleRegistrationResponse(message.strand());
+ break;
+
+ default:
- // handle registration respond from coordinator
- // coordinator returns the same message
- isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid());
- debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered");
- setListenTimeout(NULL, -1);
-}
-
-void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout)
-{
- AsyncCall::Pointer listenTimeoutHandler = NULL;
- if (timeout > 0) {
- assert(timeoutHandler != NULL);
- listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler",
- CommCbMemFunT<Strand, CommTimeoutCbParams>(this, timeoutHandler));
- }
- setTimeout(listenTimeoutHandler, timeout);
-}
-
-void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params)
-{
- debugs(54, 6, HERE);
- if (!registered()) {
- debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered");
- exit(1);
++ debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+ break;
+ }
+ }
+
+ void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+ {
-bool Ipc::Strand::registered() const
++ // handle registration response from the coordinator; it could be stale
++ if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
++ debugs(54, 6, "kid" << KidIdentifier << " registered");
++ clearTimeout(); // we are done
++ } else {
++ // could be an ACK to the registration message of our dead predecessor
++ debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
++ // keep listening, with a timeout
+ }
+ }
+
- return isRegistered;
++void Ipc::Strand::timedout()
+ {
++ debugs(54, 6, HERE << isRegistered);
++ if (!isRegistered)
++ fatalf("kid%d registration timed out", KidIdentifier);
+ }
--- /dev/null
-
-#include "SquidString.h"
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+ #ifndef SQUID_IPC_STRAND_H
+ #define SQUID_IPC_STRAND_H
+
-/**
- Strand implement functionality of Coordinator's client and
- send registration query to Coordinator with it KidIdentifier.
-*/
-class Strand: public Port, public RefCountable
+ #include "ipc/Port.h"
+
+
+ namespace Ipc
+ {
+
+
-private:
- typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&);
-
-private:
- Strand(const Strand&); // not implemented
- Strand& operator =(const Strand&); // not implemented
-
++/// Receives coordination messages on behalf of its process or thread
++class Strand: public Port
+ {
-public:
- virtual void start();
- /// send register query
- void enroll();
- bool registered() const;
+ public:
+ Strand();
+
- virtual void handleRead(const Message& message);
++ virtual void start(); // Port (AsyncJob) API
++
++protected:
++ virtual void timedout(); // Port (UsdOp) API
++ virtual void receive(const Message& message); // Port API
+
+ private:
- void setListenTimeout(TimeoutHandler timeoutHandler, int timeout);
- void noteRegistrationTimeout(const CommTimeoutCbParams& params);
++ void registerSelf(); /// let Coordinator know this strand exists
+ void handleRegistrationResponse(const StrandData& strand);
- bool isRegistered;
+
+ private:
++ bool isRegistered; ///< whether Coordinator ACKed registration (unused)
+
+ CBDATA_CLASS2(Strand);
++
++private:
++ Strand(const Strand&); // not implemented
++ Strand& operator =(const Strand&); // not implemented
+ };
+
+
+ }
+
+
+ #endif /* SQUID_IPC_STRAND_H */
--- /dev/null
-#define SEND_RETRIES 4
-#define SEND_TIMEOUT 4
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+ #include "config.h"
+ #include "comm.h"
+ #include "ipc/UdsOp.h"
+
-Ipc::Message::Message():
- data()
+
-Ipc::MessageType Ipc::Message::type() const
++Ipc::Message::Message()
+ {
++ data.messageType = mtNone;
++ data.strand.kidId = -1;
+ }
+
+ Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
+ {
+ data.messageType = messageType;
+ data.strand.kidId = kidId;
+ data.strand.pid = pid;
+ }
+
- return data.messageType;
++const Ipc::StrandData &Ipc::Message::strand() const
+ {
-const Ipc::StrandData& Ipc::Message::strand() const
-{
- return data.strand;
-}
-
-char* Ipc::Message::rawData()
-{
- return (char*)&data;
-}
-
-size_t Ipc::Message::size()
-{
- return sizeof(data);
-}
-
-
-Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */):
++ Must(data.messageType == mtRegistration);
++ return data.strand;
+ }
+
- debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr);
- if (bind) {
- unlink(pathAddr.termedBuf());
- options |= COMM_DOBIND;
- }
++Ipc::UdsOp::UdsOp(const String& pathAddr):
+ AsyncJob("Ipc::UdsOp"),
+ addr(setAddr(pathAddr)),
+ options(COMM_NONBLOCKING),
+ fd_(-1)
+ {
- if (fd_ > 0)
++ debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
+ }
+
+ Ipc::UdsOp::~UdsOp()
+ {
+ debugs(54, 5, HERE << '[' << this << ']');
-bool Ipc::UdsOp::doneAll() const
++ if (fd_ >= 0)
+ comm_close(fd_);
+ }
+
- return false;
++void Ipc::UdsOp::setOptions(int newOptions)
+ {
- Must(fd_ > 0);
++ options = newOptions;
+ }
+
+ int Ipc::UdsOp::fd()
+ {
+ if (fd_ < 0) {
++ if (options & COMM_DOBIND)
++ unlink(addr.sun_path);
+ fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
-void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout)
++ Must(fd_ >= 0);
+ }
+ return fd_;
+ }
+
+ struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
+ {
+ assert(pathAddr.size() != 0);
+ struct sockaddr_un unixAddr;
+ memset(&unixAddr, 0, sizeof(unixAddr));
+ unixAddr.sun_family = AF_LOCAL;
+ xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+ return unixAddr;
+ }
+
- commSetTimeout(fd(), timeout, timeoutHandler);
++void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
+ {
- UdsOp(pathAddr, false),
++ AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
++ CommCbMemFunT<UdsOp, CommTimeoutCbParams>(this,
++ &UdsOp::noteTimeout));
++ commSetTimeout(fd(), seconds, handler);
++}
++
++void Ipc::UdsOp::clearTimeout()
++{
++ commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
++}
++
++void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
++{
++ timedout(); // our kid handles communication timeout
+ }
+
+
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
+
+ Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
- retries(SEND_RETRIES),
- timeout(SEND_TIMEOUT)
++ UdsOp(pathAddr),
+ message(aMessage),
- assert(retries > 0);
- assert(timeout >= 0);
++ retries(4), // TODO: make configurable?
++ timeout(5), // TODO: make configurable?
++ writing(false)
+ {
- {
- AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout",
- CommCbMemFunT<UdsSender, CommTimeoutCbParams>(this, &UdsSender::noteTimeout));
- setTimeout(timeoutHandler, timeout);
- }
+ }
+
+ void Ipc::UdsSender::start()
+ {
++ UdsOp::start();
+ write();
+ if (timeout > 0)
-bool Ipc::UdsSender::retry()
++ setTimeout(timeout, "Ipc::UdsSender::noteTimeout");
+ }
+
- if (retries > 0)
- --retries;
- return retries != 0;
++bool Ipc::UdsSender::doneAll() const
+ {
- AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite",
- CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::noteWrite));
- comm_write(fd(), message.rawData(), message.size(), writeHandler);
++ return !writing && UdsOp::doneAll();
+ }
+
+ void Ipc::UdsSender::write()
+ {
+ debugs(54, 5, HERE);
-void Ipc::UdsSender::noteWrite(const CommIoCbParams& params)
++ AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote",
++ CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::wrote));
++ comm_write(fd(), message.raw(), message.size(), writeHandler);
++ writing = true;
+ }
+
- if (params.flag == COMM_OK || !retry())
- mustStop("done");
- else
- write();
++void Ipc::UdsSender::wrote(const CommIoCbParams& params)
+ {
+ debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
-void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params)
++ writing = false;
++ if (params.flag != COMM_OK && retries-- > 0)
++ write(); // XXX: should we close on error so that fd() reopens?
+ }
+
- mustStop("done");
++void Ipc::UdsSender::timedout()
+ {
+ debugs(54, 5, HERE);
++ mustStop("timedout");
+ }
+
+
+ void Ipc::SendMessage(const String& toAddress, const Message& message)
+ {
+ AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+ }
--- /dev/null
-typedef enum {mtNone = 0, mtRegister} MessageType;
+ /*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+ #ifndef SQUID_IPC_ASYNCUDSOP_H
+ #define SQUID_IPC_ASYNCUDSOP_H
+
+
+ #include "SquidString.h"
+ #include "CommCalls.h"
+
+
+ namespace Ipc
+ {
+
+
-/**
- This one contains a registration info
-*/
++typedef enum { mtNone = 0, mtRegistration } MessageType;
+
- int kidId;
++/// Strand registration information
+ struct StrandData
+ {
-/**
- This class contains data is used by UdsSender/UdsReceiver.
-*/
++ int kidId;
+ pid_t pid;
+ };
+
-public:
- MessageType type() const;
++/// information sent or received during IPC
+ class Message
+ {
+ public:
+ Message();
+ Message(MessageType messageType, int kidId, pid_t pid);
+
- char* rawData();
- size_t size();
++ /// raw, type-independent access
++ int type() const { return data.messageType; }
++ char *raw() { return reinterpret_cast<char*>(&data); }
++ const char *raw() const { return reinterpret_cast<const char*>(&data); }
++ size_t size() const { return sizeof(data); }
++
++ /// type-dependent access
+ const StrandData& strand() const;
- struct {
- MessageType messageType;
- StrandData strand;
- } data;
+
+ private:
-/**
- UdsOp implements common async UDS operation.
-*/
++ struct Data {
++ int messageType;
++ StrandData strand;
++ // TODO: redesign to better handle many type-specific datas like strand
++ } data; ///< everything being sent or received
+ };
+
-private:
- UdsOp(const UdsOp&); // not implemented
- UdsOp& operator= (const UdsOp&); // not implemented
-
++/// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator)
++/// and receivers (e.g. Port or Coordinator)
+ class UdsOp: public AsyncJob
+ {
- UdsOp(const String& pathAddr, bool bind = true);
+ public:
- /// return an endpoint for communication, use fd() instead of fd_
- int fd();
- virtual bool doneAll() const;
- void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout);
++ UdsOp(const String &pathAddr);
+ virtual ~UdsOp();
+
+ protected:
- struct sockaddr_un setAddr(const String& pathAddr);
++ virtual void timedout() {} ///< called after setTimeout() if timed out
++
++ int fd(); ///< creates if needed and returns raw UDS socket descriptor
++
++ /// call timedout() if no UDS messages in a given number of seconds
++ void setTimeout(int seconds, const char *handlerName);
++ void clearTimeout(); ///< remove previously set timeout, if any
++
++ void setOptions(int newOptions); ///< changes socket options
++
++private:
++ /// Comm timeout callback; calls timedout()
++ void noteTimeout(const CommTimeoutCbParams &p);
++
++ /// configures addr member
++ struct sockaddr_un setAddr(const String &pathAddr);
+
+ private:
- struct sockaddr_un addr;
- int options;
- int fd_;
++ struct sockaddr_un addr; ///< UDS address
++ int options; ///< UDS options
++ int fd_; ///< UDS descriptor
+
+ private:
-/**
- Implement async write operation for UDS
-*/
++ UdsOp(const UdsOp &); // not implemented
++ UdsOp &operator= (const UdsOp &); // not implemented
+ };
+
-private:
- UdsSender(const UdsSender&); // not implemented
- UdsSender& operator= (const UdsSender&); // not implemented
-
++// XXX: move UdsSender code to UdsSender.{cc,h}
++/// attempts to send an IPC message a few times, with a timeout
+ class UdsSender: public UdsOp
+ {
-public:
- /// start writing data
- virtual void start();
+ public:
+ UdsSender(const String& pathAddr, const Message& aMessage);
+
- /// update retries counter and check
- bool retry();
- /// schedule writing
- void write();
- void noteWrite(const CommIoCbParams& params);
- void noteTimeout(const CommTimeoutCbParams& params);
++protected:
++ virtual void start(); // UdsOp (AsyncJob) API
++ virtual bool doneAll() const; // UdsOp (AsyncJob) API
++ virtual void timedout(); // UdsOp API
+
+ private:
- Message message;
- int retries;
- int timeout;
++ void write(); ///< schedule writing
++ void wrote(const CommIoCbParams& params); ///< done writing or error
+
+ private:
++ Message message; ///< what to send
++ int retries; ///< how many times to try after a write error
++ int timeout; ///< total time to send the message
++ bool writing; ///< whether Comm started and did not finish writing
+
+ CBDATA_CLASS2(UdsSender);
++
++private:
++ UdsSender(const UdsSender&); // not implemented
++ UdsSender& operator= (const UdsSender&); // not implemented
+ };
+
+
+ void SendMessage(const String& toAddress, const Message& message);
+
+
+ }
+
+ #endif /* SQUID_IPC_ASYNCUDSOP_H */