From: Alex Rousskov Date: Mon, 26 Apr 2010 07:09:03 +0000 (-0600) Subject: Added IPC Strand and Coordinator classes. Strands are jobs responsible X-Git-Tag: SQUID_3_2_0_1~93^2~32 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=ba568924a51f14ee6b10c7470e337d87477bd575;p=thirdparty%2Fsquid.git Added IPC Strand and Coordinator classes. Strands are jobs responsible for registering Squid processes or threads with the central Coordinator job. Coordinator will broadcast control signals and shared ports to Strands. Added a simple hierarchy of inter-process communication (IPC) classes to support Coordinator and Strands. Print current process number (KidIdentifier) when writing debug messages. This allows to easily isolate per-process progress even when using a single cache.log. --- ba568924a51f14ee6b10c7470e337d87477bd575 diff --cc src/comm.cc index 1e8ca109ad,32f4a11f4b..f2dd988152 --- a/src/comm.cc +++ b/src/comm.cc @@@ -1883,7 -1883,11 +1883,11 @@@ commHandleWrite(int fd, void *data (long int) state->offset << ", sz " << (long int) state->size << "."); nleft = state->size - state->offset; - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); - // XXX: if we use UDS and it is not connected then 'sendto' will be called. ++ // XXX: tmp hack: call sendto() for unconnected UDS sockets + if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL) + len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr)); + else + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); debugs(5, 5, "commHandleWrite: write() returns " << len); fd_bytes(fd, len, FD_WRITE); statCounter.syscalls.sock.writes++; @@@ -2409,3 -2413,99 +2413,99 @@@ CommSelectEngine::checkEvents(int timeo return EVENT_ERROR; }; } + -/** - * Create a unix-domain socket - */ ++/// Create a unix-domain socket (UDS) + int + comm_open_uds(int sock_type, + int proto, + struct sockaddr_un* addr, + int flags) + { ++ // TODO: merge with comm_openex() when IpAddress becomes NetAddress ++ + int new_socket; - struct addrinfo AI; + + PROF_start(comm_open); - + /* Create socket for accepting new connections. */ + statCounter.syscalls.sock.sockets++; + + /* Setup the socket addrinfo details for use */ ++ struct addrinfo AI; + AI.ai_flags = 0; + AI.ai_family = PF_UNIX; + AI.ai_socktype = sock_type; + AI.ai_protocol = proto; + AI.ai_addrlen = SUN_LEN(addr); + AI.ai_addr = (sockaddr*)addr; + AI.ai_canonname = NULL; + AI.ai_next = NULL; + - debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path); ++ debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path); + + if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) { + /* Increase the number of reserved fd's if calls to socket() + * are failing because the open file table is full. This + * limits the number of simultaneous clients */ + + if (limitError(errno)) { - debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror()); ++ debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror()); + fdAdjustReserved(); + } else { - debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror()); ++ debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror()); + } + + PROF_stop(comm_open); + return -1; + } + - debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol ); ++ debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol); + + /* update fdstat */ - debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket"); ++ debugs(50, 5, HERE << "FD " << new_socket << " is a new socket"); + + assert(!isOpen(new_socket)); - + fd_open(new_socket, FD_SOCKET, NULL); - fd_table[new_socket].sock_family = AI.ai_family; - - fd_table[new_socket].unix_addr = *addr; + + fdd_table[new_socket].close_file = NULL; ++ + fdd_table[new_socket].close_line = 0; + ++ fd_table[new_socket].unix_addr = *addr; ++ ++ fd_table[new_socket].sock_family = AI.ai_family; ++ + if (!(flags & COMM_NOCLOEXEC)) + commSetCloseOnExec(new_socket); + + if (flags & COMM_REUSEADDR) + commSetReuseAddr(new_socket); + + if (flags & COMM_NONBLOCKING) { + if (commSetNonBlocking(new_socket) != COMM_OK) { + comm_close(new_socket); + PROF_stop(comm_open); + return -1; + } + } + + if (flags & COMM_DOBIND) { + if (commBind(new_socket, AI) != COMM_OK) { + comm_close(new_socket); + PROF_stop(comm_open); + return -1; + } + } + + #ifdef TCP_NODELAY + if (sock_type == SOCK_STREAM) + commSetTcpNoDelay(new_socket); + + #endif + + if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM) + commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz); + + PROF_stop(comm_open); + + return new_socket; + } diff --cc src/debug.cc index b82ffdd878,b82ffdd878..41b4907ad4 --- a/src/debug.cc +++ b/src/debug.cc @@@ -36,6 -36,6 +36,7 @@@ #include "Debug.h" #include "SquidTime.h" #include "util.h" ++#include "ipc/Kids.h" /* for shutting_down flag in xassert() */ #include "globals.h" @@@ -52,6 -52,6 +53,7 @@@ FILE *debug_log = NULL static char *debug_log_file = NULL; static int Ctx_Lock = 0; static const char *debugLogTime(void); ++static const char *debugLogKid(void); static void ctx_print(void); #if HAVE_SYSLOG #ifdef LOG_LOCAL4 @@@ -117,8 -117,8 +119,9 @@@ _db_print(const char *format,... va_start(args2, format); va_start(args3, format); -- snprintf(f, BUFSIZ, "%s| %s", ++ snprintf(f, BUFSIZ, "%s%s| %s", debugLogTime(), ++ debugLogKid(), format); _db_print_file(f, args1); @@@ -543,6 -543,6 +546,18 @@@ debugLogTime(void return buf; } ++static const char * ++debugLogKid(void) ++{ ++ if (KidIdentifier != 0) { ++ static char buf[16]; ++ snprintf(buf, sizeof(buf), " kid%d", KidIdentifier); ++ return buf; ++ } ++ ++ return ""; ++} ++ void xassert(const char *msg, const char *file, int line) { diff --cc src/fde.h index a0b903d1a3,68a6db6cc5..6a7365c041 --- a/src/fde.h +++ b/src/fde.h @@@ -60,6 -60,7 +60,7 @@@ public int sock_family; char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */ char desc[FD_DESC_SZ]; - struct sockaddr_un unix_addr; // store address of unix domain socket when it's needed ++ struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any struct { unsigned int open:1; diff --cc src/ipc/Coordinator.cc index 0000000000,3a11401fce..b0e4893e24 mode 000000,100644..100644 --- a/src/ipc/Coordinator.cc +++ b/src/ipc/Coordinator.cc @@@ -1,0 -1,63 +1,64 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + + #include "config.h" + #include "ipc/Coordinator.h" + + + CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); + + + Ipc::Coordinator::Coordinator(): - Port(coordinatorPathAddr) ++ Port(coordinatorAddr) + { + } + + void Ipc::Coordinator::start() + { - listen(); ++ Port::start(); + } + + Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId) + { + for (Vector::iterator iter = strands.begin(); iter != strands.end(); ++iter) { + if (iter->kidId == kidId) + return &(*iter); + } + return NULL; + } + -void Ipc::Coordinator::enrollStrand(const StrandData& strand) ++void Ipc::Coordinator::registerStrand(const StrandData& strand) + { + if (StrandData* found = findStrand(strand.kidId)) + *found = strand; + else + strands.push_back(strand); + } + -void Ipc::Coordinator::handleRead(const Message& message) ++void Ipc::Coordinator::receive(const Message& message) + { + switch (message.type()) { - case mtRegister: ++ case mtRegistration: + debugs(54, 6, HERE << "Registration request"); + handleRegistrationRequest(message.strand()); + break; + + default: - debugs(54, 6, HERE << "Unhandled message of type: " << message.type()); ++ debugs(54, 6, HERE << "Unhandled message type: " << message.type()); + break; + } + } + + void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand) + { - // register strand - enrollStrand(strand); - // send back received message - SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid)); ++ registerStrand(strand); ++ ++ // send back an acknowledgement; TODO: remove as not needed? ++ SendMessage(MakeAddr(strandAddrPfx, strand.kidId), ++ Message(mtRegistration, strand.kidId, strand.pid)); + } diff --cc src/ipc/Coordinator.h index 0000000000,a3006fe907..c4620dc970 mode 000000,100644..100644 --- a/src/ipc/Coordinator.h +++ b/src/ipc/Coordinator.h @@@ -1,0 -1,52 +1,46 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + #ifndef SQUID_IPC_COORDINATOR_H + #define SQUID_IPC_COORDINATOR_H + + + #include "Array.h" -#include "SquidString.h" + #include "ipc/Port.h" + + + namespace Ipc + { + - -/** - Coordinator processes incoming queries about registration of running squid instances - and store it KidIndentiers. -*/ -class Coordinator: public Port, public RefCountable ++/// Coordinates shared activities of Strands (Squid processes or threads) ++class Coordinator: public Port + { -private: - Coordinator(const Coordinator&); // not implemented - Coordinator& operator =(const Coordinator&); // not implemented - + public: + Coordinator(); + -public: - virtual void start(); ++protected: ++ virtual void start(); // Port (AsyncJob) API ++ virtual void receive(const Message& message); // Port API + -private: - virtual void handleRead(const Message& message); - StrandData* findStrand(int kidId); - void enrollStrand(const StrandData& strand); - void handleRegistrationRequest(const StrandData& strand); ++ StrandData* findStrand(int kidId); ///< registered strand or NULL ++ void registerStrand(const StrandData &); ///< adds or updates existing ++ void handleRegistrationRequest(const StrandData &); ///< registers and ACKs + + private: - Vector strands; ++ Vector strands; ///< registered processes and threads + + CBDATA_CLASS2(Coordinator); ++ ++private: ++ Coordinator(const Coordinator&); // not implemented ++ Coordinator& operator =(const Coordinator&); // not implemented + }; + + -} ++} // namespace Ipc + + #endif /* SQUID_IPC_COORDINATOR_H */ diff --cc src/ipc/Kids.cc index cf97689b45,98932b8aac..ede9d15d77 --- a/src/ipc/Kids.cc +++ b/src/ipc/Kids.cc @@@ -26,11 -26,16 +26,19 @@@ void Kids::init(size_t n storage.reserve(n); + char kid_name[32]; ++ ++ // add Kid records for all n main strands for (size_t i = 1; i <= n; ++i) { - char kid_name[32]; snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i); storage.push_back(Kid(kid_name)); } + ++ // if coordination is needed, add a Kid record for Coordinator + if (n > 1) { + snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1)); + storage.push_back(Kid(kid_name)); + } } /// returns kid by pid diff --cc src/ipc/Port.cc index 0000000000,1dcd7ff654..5a01a700e6 mode 000000,100644..100644 --- a/src/ipc/Port.cc +++ b/src/ipc/Port.cc @@@ -1,0 -1,51 +1,64 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + + #include "config.h" + #include "ipc/Port.h" + - -const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator"; -const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid"; ++const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc"; ++const char Ipc::strandAddrPfx[] = DEFAULT_PREFIX "/var/run/squid"; + + + Ipc::Port::Port(const String& aListenAddr): - UdsOp(aListenAddr), - listenAddr(aListenAddr) ++ UdsOp(aListenAddr) + { - assert(listenAddr.size() > sizeof(DEFAULT_PREFIX)); ++ setOptions(COMM_NONBLOCKING | COMM_DOBIND); ++} ++ ++void Ipc::Port::start() ++{ ++ UdsOp::start(); ++ listen(); + } + + void Ipc::Port::listen() + { + debugs(54, 6, HERE); + AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead", + CommCbMemFunT(this, &Port::noteRead)); - comm_read(fd(), message.rawData(), message.size(), readHandler); ++ comm_read(fd(), buf.raw(), buf.size(), readHandler); + } + -String Ipc::Port::makeAddr(const char* pathAddr, int id) const ++bool Ipc::Port::doneAll() const ++{ ++ return false; // listen forever ++} ++ ++String Ipc::Port::MakeAddr(const char* pathAddr, int id) + { + assert(id >= 0); + String addr = pathAddr; + addr.append('-'); + addr.append(xitoa(id)); ++ addr.append(".ipc"); + return addr; + } + + void Ipc::Port::noteRead(const CommIoCbParams& params) + { - debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); - assert(params.data == this); ++ debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << ++ " [" << this << ']'); + if (params.flag == COMM_OK) { - assert(params.buf == (char*)&message); - assert(params.size == sizeof(Message)); - handleRead(message); ++ assert(params.buf == buf.raw()); ++ assert(params.size == buf.size()); ++ receive(buf); + } ++ // TODO: if there was a fatal error on our socket, close the socket before ++ // trying to listen again and print a level-1 error message. ++ + listen(); + } diff --cc src/ipc/Port.h index 0000000000,00332d33d1..8c87280cc2 mode 000000,100644..100644 --- a/src/ipc/Port.h +++ b/src/ipc/Port.h @@@ -1,0 -1,53 +1,53 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + #ifndef SQUID_IPC_PORT_H + #define SQUID_IPC_PORT_H + + + #include "SquidString.h" + #include "ipc/UdsOp.h" + + + namespace Ipc + { + + -/** - Base class implements functionality of local endpoint - is listening incoming connections -*/ ++/// Waits for and receives incoming IPC messages; kids handle the messages + class Port: public UdsOp + { + public: - Port(const String& aListenAddr); ++ Port(const String &aListenAddr); + -public: - /// start listening of incoming connections ++protected: ++ /// calculates IPC message address for strand #id at path ++ static String MakeAddr(const char *path, int id); ++ ++ virtual void start() = 0; // UdsOp (AsyncJob) API; has body ++ virtual bool doneAll() const; // UdsOp (AsyncJob) API ++ ++ /// read the next incoming message + void listen(); - String makeAddr(const char* pathAddr, int id) const; + -protected: - virtual void handleRead(const Message& message) = 0; ++ /// handle IPC message just read ++ virtual void receive(const Message& message) = 0; + + private: - void noteRead(const CommIoCbParams& params); ++ void noteRead(const CommIoCbParams ¶ms); // Comm callback API + + private: - String listenAddr; - Message message; ++ Message buf; ///< UDS read buffer filled by Comm + }; + + -extern const char coordinatorPathAddr[]; -extern const char strandPathAddr[]; - ++extern const char coordinatorAddr[]; ///< where coordinator listens ++extern const char strandAddrPfx[]; ///< strand's listening address prefix + -} ++} // namespace Ipc + + + #endif /* SQUID_IPC_PORT_H */ diff --cc src/ipc/Strand.cc index 0000000000,510a219561..a220d3bf5e mode 000000,100644..100644 --- a/src/ipc/Strand.cc +++ b/src/ipc/Strand.cc @@@ -1,0 -1,84 +1,70 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + - + #include "config.h" + #include "ipc/Strand.h" + #include "ipc/Kids.h" + + + CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand); + + + Ipc::Strand::Strand(): - Port(makeAddr(strandPathAddr, KidIdentifier)), ++ Port(MakeAddr(strandAddrPfx, KidIdentifier)), + isRegistered(false) + { + } + + void Ipc::Strand::start() + { - listen(); - setListenTimeout(&Strand::noteRegistrationTimeout, 6); - enroll(); ++ Port::start(); ++ registerSelf(); + } + -void Ipc::Strand::enroll() ++void Ipc::Strand::registerSelf() + { + debugs(54, 6, HERE); - assert(!registered()); - SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid())); ++ Must(!isRegistered); ++ SendMessage(coordinatorAddr, ++ Message(mtRegistration, KidIdentifier, getpid())); ++ setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable? + } + -void Ipc::Strand::handleRead(const Message& message) ++void Ipc::Strand::receive(const Message& message) + { + debugs(54, 6, HERE); + switch (message.type()) { + - case mtRegister: ++ case mtRegistration: + handleRegistrationResponse(message.strand()); + break; + + default: - debugs(54, 6, HERE << "Unhandled message of type: " << message.type()); ++ debugs(54, 6, HERE << "Unhandled message type: " << message.type()); + break; + } + } + + void Ipc::Strand::handleRegistrationResponse(const StrandData& strand) + { - // handle registration respond from coordinator - // coordinator returns the same message - isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid()); - debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered"); - setListenTimeout(NULL, -1); -} - -void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout) -{ - AsyncCall::Pointer listenTimeoutHandler = NULL; - if (timeout > 0) { - assert(timeoutHandler != NULL); - listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler", - CommCbMemFunT(this, timeoutHandler)); - } - setTimeout(listenTimeoutHandler, timeout); -} - -void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params) -{ - debugs(54, 6, HERE); - if (!registered()) { - debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered"); - exit(1); ++ // handle registration response from the coordinator; it could be stale ++ if (strand.kidId == KidIdentifier && strand.pid == getpid()) { ++ debugs(54, 6, "kid" << KidIdentifier << " registered"); ++ clearTimeout(); // we are done ++ } else { ++ // could be an ACK to the registration message of our dead predecessor ++ debugs(54, 6, "kid" << KidIdentifier << " is not yet registered"); ++ // keep listening, with a timeout + } + } + -bool Ipc::Strand::registered() const ++void Ipc::Strand::timedout() + { - return isRegistered; ++ debugs(54, 6, HERE << isRegistered); ++ if (!isRegistered) ++ fatalf("kid%d registration timed out", KidIdentifier); + } diff --cc src/ipc/Strand.h index 0000000000,785e1451fc..da207d005c mode 000000,100644..100644 --- a/src/ipc/Strand.h +++ b/src/ipc/Strand.h @@@ -1,0 -1,58 +1,48 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + #ifndef SQUID_IPC_STRAND_H + #define SQUID_IPC_STRAND_H + - -#include "SquidString.h" + #include "ipc/Port.h" + + + namespace Ipc + { + + -/** - Strand implement functionality of Coordinator's client and - send registration query to Coordinator with it KidIdentifier. -*/ -class Strand: public Port, public RefCountable ++/// Receives coordination messages on behalf of its process or thread ++class Strand: public Port + { -private: - typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&); - -private: - Strand(const Strand&); // not implemented - Strand& operator =(const Strand&); // not implemented - + public: + Strand(); + -public: - virtual void start(); - /// send register query - void enroll(); - bool registered() const; ++ virtual void start(); // Port (AsyncJob) API ++ ++protected: ++ virtual void timedout(); // Port (UsdOp) API ++ virtual void receive(const Message& message); // Port API + + private: - virtual void handleRead(const Message& message); ++ void registerSelf(); /// let Coordinator know this strand exists + void handleRegistrationResponse(const StrandData& strand); - void setListenTimeout(TimeoutHandler timeoutHandler, int timeout); - void noteRegistrationTimeout(const CommTimeoutCbParams& params); + + private: - bool isRegistered; ++ bool isRegistered; ///< whether Coordinator ACKed registration (unused) + + CBDATA_CLASS2(Strand); ++ ++private: ++ Strand(const Strand&); // not implemented ++ Strand& operator =(const Strand&); // not implemented + }; + + + } + + + #endif /* SQUID_IPC_STRAND_H */ diff --cc src/ipc/UdsOp.cc index 0000000000,0fdcd4eecc..5f7f7703c3 mode 000000,100644..100644 --- a/src/ipc/UdsOp.cc +++ b/src/ipc/UdsOp.cc @@@ -1,0 -1,156 +1,145 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + + #include "config.h" + #include "comm.h" + #include "ipc/UdsOp.h" + -#define SEND_RETRIES 4 -#define SEND_TIMEOUT 4 + -Ipc::Message::Message(): - data() ++Ipc::Message::Message() + { ++ data.messageType = mtNone; ++ data.strand.kidId = -1; + } + + Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid) + { + data.messageType = messageType; + data.strand.kidId = kidId; + data.strand.pid = pid; + } + -Ipc::MessageType Ipc::Message::type() const ++const Ipc::StrandData &Ipc::Message::strand() const + { - return data.messageType; ++ Must(data.messageType == mtRegistration); ++ return data.strand; + } + -const Ipc::StrandData& Ipc::Message::strand() const -{ - return data.strand; -} - -char* Ipc::Message::rawData() -{ - return (char*)&data; -} - -size_t Ipc::Message::size() -{ - return sizeof(data); -} - - -Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */): ++Ipc::UdsOp::UdsOp(const String& pathAddr): + AsyncJob("Ipc::UdsOp"), + addr(setAddr(pathAddr)), + options(COMM_NONBLOCKING), + fd_(-1) + { - debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr); - if (bind) { - unlink(pathAddr.termedBuf()); - options |= COMM_DOBIND; - } ++ debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr); + } + + Ipc::UdsOp::~UdsOp() + { + debugs(54, 5, HERE << '[' << this << ']'); - if (fd_ > 0) ++ if (fd_ >= 0) + comm_close(fd_); + } + -bool Ipc::UdsOp::doneAll() const ++void Ipc::UdsOp::setOptions(int newOptions) + { - return false; ++ options = newOptions; + } + + int Ipc::UdsOp::fd() + { + if (fd_ < 0) { ++ if (options & COMM_DOBIND) ++ unlink(addr.sun_path); + fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options); - Must(fd_ > 0); ++ Must(fd_ >= 0); + } + return fd_; + } + + struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr) + { + assert(pathAddr.size() != 0); + struct sockaddr_un unixAddr; + memset(&unixAddr, 0, sizeof(unixAddr)); + unixAddr.sun_family = AF_LOCAL; + xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path)); + return unixAddr; + } + -void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout) ++void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName) + { - commSetTimeout(fd(), timeout, timeoutHandler); ++ AsyncCall::Pointer handler = asyncCall(54,5, handlerName, ++ CommCbMemFunT(this, ++ &UdsOp::noteTimeout)); ++ commSetTimeout(fd(), seconds, handler); ++} ++ ++void Ipc::UdsOp::clearTimeout() ++{ ++ commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd) ++} ++ ++void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &) ++{ ++ timedout(); // our kid handles communication timeout + } + + + CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender); + + Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage): - UdsOp(pathAddr, false), ++ UdsOp(pathAddr), + message(aMessage), - retries(SEND_RETRIES), - timeout(SEND_TIMEOUT) ++ retries(4), // TODO: make configurable? ++ timeout(5), // TODO: make configurable? ++ writing(false) + { - assert(retries > 0); - assert(timeout >= 0); + } + + void Ipc::UdsSender::start() + { ++ UdsOp::start(); + write(); + if (timeout > 0) - { - AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout", - CommCbMemFunT(this, &UdsSender::noteTimeout)); - setTimeout(timeoutHandler, timeout); - } ++ setTimeout(timeout, "Ipc::UdsSender::noteTimeout"); + } + -bool Ipc::UdsSender::retry() ++bool Ipc::UdsSender::doneAll() const + { - if (retries > 0) - --retries; - return retries != 0; ++ return !writing && UdsOp::doneAll(); + } + + void Ipc::UdsSender::write() + { + debugs(54, 5, HERE); - AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite", - CommCbMemFunT(this, &UdsSender::noteWrite)); - comm_write(fd(), message.rawData(), message.size(), writeHandler); ++ AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote", ++ CommCbMemFunT(this, &UdsSender::wrote)); ++ comm_write(fd(), message.raw(), message.size(), writeHandler); ++ writing = true; + } + -void Ipc::UdsSender::noteWrite(const CommIoCbParams& params) ++void Ipc::UdsSender::wrote(const CommIoCbParams& params) + { + debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); - if (params.flag == COMM_OK || !retry()) - mustStop("done"); - else - write(); ++ writing = false; ++ if (params.flag != COMM_OK && retries-- > 0) ++ write(); // XXX: should we close on error so that fd() reopens? + } + -void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params) ++void Ipc::UdsSender::timedout() + { + debugs(54, 5, HERE); - mustStop("done"); ++ mustStop("timedout"); + } + + + void Ipc::SendMessage(const String& toAddress, const Message& message) + { + AsyncJob::AsyncStart(new UdsSender(toAddress, message)); + } diff --cc src/ipc/UdsOp.h index 0000000000,66c4ed4875..7801f25b98 mode 000000,100644..100644 --- a/src/ipc/UdsOp.h +++ b/src/ipc/UdsOp.h @@@ -1,0 -1,119 +1,124 @@@ + /* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + #ifndef SQUID_IPC_ASYNCUDSOP_H + #define SQUID_IPC_ASYNCUDSOP_H + + + #include "SquidString.h" + #include "CommCalls.h" + + + namespace Ipc + { + + -typedef enum {mtNone = 0, mtRegister} MessageType; ++typedef enum { mtNone = 0, mtRegistration } MessageType; + -/** - This one contains a registration info -*/ ++/// Strand registration information + struct StrandData + { - int kidId; ++ int kidId; + pid_t pid; + }; + -/** - This class contains data is used by UdsSender/UdsReceiver. -*/ ++/// information sent or received during IPC + class Message + { + public: + Message(); + Message(MessageType messageType, int kidId, pid_t pid); + -public: - MessageType type() const; ++ /// raw, type-independent access ++ int type() const { return data.messageType; } ++ char *raw() { return reinterpret_cast(&data); } ++ const char *raw() const { return reinterpret_cast(&data); } ++ size_t size() const { return sizeof(data); } ++ ++ /// type-dependent access + const StrandData& strand() const; - char* rawData(); - size_t size(); + + private: - struct { - MessageType messageType; - StrandData strand; - } data; ++ struct Data { ++ int messageType; ++ StrandData strand; ++ // TODO: redesign to better handle many type-specific datas like strand ++ } data; ///< everything being sent or received + }; + -/** - UdsOp implements common async UDS operation. -*/ ++/// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator) ++/// and receivers (e.g. Port or Coordinator) + class UdsOp: public AsyncJob + { -private: - UdsOp(const UdsOp&); // not implemented - UdsOp& operator= (const UdsOp&); // not implemented - + public: - UdsOp(const String& pathAddr, bool bind = true); ++ UdsOp(const String &pathAddr); + virtual ~UdsOp(); + + protected: - /// return an endpoint for communication, use fd() instead of fd_ - int fd(); - virtual bool doneAll() const; - void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout); ++ virtual void timedout() {} ///< called after setTimeout() if timed out ++ ++ int fd(); ///< creates if needed and returns raw UDS socket descriptor ++ ++ /// call timedout() if no UDS messages in a given number of seconds ++ void setTimeout(int seconds, const char *handlerName); ++ void clearTimeout(); ///< remove previously set timeout, if any ++ ++ void setOptions(int newOptions); ///< changes socket options ++ ++private: ++ /// Comm timeout callback; calls timedout() ++ void noteTimeout(const CommTimeoutCbParams &p); ++ ++ /// configures addr member ++ struct sockaddr_un setAddr(const String &pathAddr); + + private: - struct sockaddr_un setAddr(const String& pathAddr); ++ struct sockaddr_un addr; ///< UDS address ++ int options; ///< UDS options ++ int fd_; ///< UDS descriptor + + private: - struct sockaddr_un addr; - int options; - int fd_; ++ UdsOp(const UdsOp &); // not implemented ++ UdsOp &operator= (const UdsOp &); // not implemented + }; + -/** - Implement async write operation for UDS -*/ ++// XXX: move UdsSender code to UdsSender.{cc,h} ++/// attempts to send an IPC message a few times, with a timeout + class UdsSender: public UdsOp + { -private: - UdsSender(const UdsSender&); // not implemented - UdsSender& operator= (const UdsSender&); // not implemented - + public: + UdsSender(const String& pathAddr, const Message& aMessage); + -public: - /// start writing data - virtual void start(); ++protected: ++ virtual void start(); // UdsOp (AsyncJob) API ++ virtual bool doneAll() const; // UdsOp (AsyncJob) API ++ virtual void timedout(); // UdsOp API + + private: - /// update retries counter and check - bool retry(); - /// schedule writing - void write(); - void noteWrite(const CommIoCbParams& params); - void noteTimeout(const CommTimeoutCbParams& params); ++ void write(); ///< schedule writing ++ void wrote(const CommIoCbParams& params); ///< done writing or error + + private: - Message message; - int retries; - int timeout; ++ Message message; ///< what to send ++ int retries; ///< how many times to try after a write error ++ int timeout; ///< total time to send the message ++ bool writing; ///< whether Comm started and did not finish writing + + CBDATA_CLASS2(UdsSender); ++ ++private: ++ UdsSender(const UdsSender&); // not implemented ++ UdsSender& operator= (const UdsSender&); // not implemented + }; + + + void SendMessage(const String& toAddress, const Message& message); + + + } + + #endif /* SQUID_IPC_ASYNCUDSOP_H */