]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Added IPC Strand and Coordinator classes. Strands are jobs responsible
authorAlex Rousskov <rousskov@measurement-factory.com>
Mon, 26 Apr 2010 07:09:03 +0000 (01:09 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Mon, 26 Apr 2010 07:09:03 +0000 (01:09 -0600)
for registering Squid processes or threads with the central Coordinator
job. Coordinator will broadcast control signals and shared ports to
Strands.

Added a simple hierarchy of inter-process communication (IPC) classes to
support Coordinator and Strands.

Print current process number (KidIdentifier) when writing debug messages.
This allows to easily isolate per-process progress even when using a single
cache.log.

12 files changed:
1  2 
src/comm.cc
src/debug.cc
src/fde.h
src/ipc/Coordinator.cc
src/ipc/Coordinator.h
src/ipc/Kids.cc
src/ipc/Port.cc
src/ipc/Port.h
src/ipc/Strand.cc
src/ipc/Strand.h
src/ipc/UdsOp.cc
src/ipc/UdsOp.h

diff --cc src/comm.cc
index 1e8ca109adf359f8d23b189438255d75b42ab827,32f4a11f4b8725cc9585b1621d1d57dcd5957008..f2dd9881529f7d7e28e66cbed4d9d94aeac18271
@@@ -1883,7 -1883,11 +1883,11 @@@ commHandleWrite(int fd, void *data
             (long int) state->offset << ", sz " << (long int) state->size << ".");
  
      nleft = state->size - state->offset;
-     len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
 -    // XXX: if we use UDS and it is not connected then 'sendto' will be called.
++    // XXX: tmp hack: call sendto() for unconnected UDS sockets
+     if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
+         len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
+     else
+         len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
      debugs(5, 5, "commHandleWrite: write() returns " << len);
      fd_bytes(fd, len, FD_WRITE);
      statCounter.syscalls.sock.writes++;
@@@ -2409,3 -2413,99 +2413,99 @@@ CommSelectEngine::checkEvents(int timeo
          return EVENT_ERROR;
      };
  }
 -/**
 - * Create a unix-domain socket
 - */
 -    struct addrinfo AI;
++/// Create a unix-domain socket (UDS)
+ int
+ comm_open_uds(int sock_type,
+               int proto,
+               struct sockaddr_un* addr,
+               int flags)
+ {
++    // TODO: merge with comm_openex() when IpAddress becomes NetAddress
++
+     int new_socket;
 -
+     PROF_start(comm_open);
 -    debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path);
+     /* Create socket for accepting new connections. */
+     statCounter.syscalls.sock.sockets++;
+     /* Setup the socket addrinfo details for use */
++    struct addrinfo AI;
+     AI.ai_flags = 0;
+     AI.ai_family = PF_UNIX;
+     AI.ai_socktype = sock_type;
+     AI.ai_protocol = proto;
+     AI.ai_addrlen = SUN_LEN(addr);
+     AI.ai_addr = (sockaddr*)addr;
+     AI.ai_canonname = NULL;
+     AI.ai_next = NULL;
 -            debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror());
++    debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
+     if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+         /* Increase the number of reserved fd's if calls to socket()
+          * are failing because the open file table is full.  This
+          * limits the number of simultaneous clients */
+         if (limitError(errno)) {
 -            debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror());
++            debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
+             fdAdjustReserved();
+         } else {
 -    debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol );
++            debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
+         }
+         PROF_stop(comm_open);
+         return -1;
+     }
 -    debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket");
++    debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
+     /* update fdstat */
 -
++    debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
+     assert(!isOpen(new_socket));
 -    fd_table[new_socket].sock_family = AI.ai_family;
 -
 -    fd_table[new_socket].unix_addr = *addr;
+     fd_open(new_socket, FD_SOCKET, NULL);
+     fdd_table[new_socket].close_file = NULL;
++
+     fdd_table[new_socket].close_line = 0;
++    fd_table[new_socket].unix_addr = *addr;
++
++    fd_table[new_socket].sock_family = AI.ai_family;
++
+     if (!(flags & COMM_NOCLOEXEC))
+         commSetCloseOnExec(new_socket);
+     if (flags & COMM_REUSEADDR)
+         commSetReuseAddr(new_socket);
+     if (flags & COMM_NONBLOCKING) {
+         if (commSetNonBlocking(new_socket) != COMM_OK) {
+             comm_close(new_socket);
+             PROF_stop(comm_open);
+             return -1;
+         }
+     }
+     if (flags & COMM_DOBIND) {
+         if (commBind(new_socket, AI) != COMM_OK) {
+             comm_close(new_socket);
+             PROF_stop(comm_open);
+             return -1;
+         }
+     }
+ #ifdef TCP_NODELAY
+     if (sock_type == SOCK_STREAM)
+         commSetTcpNoDelay(new_socket);
+ #endif
+     if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+         commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+     PROF_stop(comm_open);
+     return new_socket;
+ }
diff --cc src/debug.cc
index b82ffdd878e55bbdd51f6b5b4159d551e9d2522d,b82ffdd878e55bbdd51f6b5b4159d551e9d2522d..41b4907ad4176b360bebc978665431d957021325
@@@ -36,6 -36,6 +36,7 @@@
  #include "Debug.h"
  #include "SquidTime.h"
  #include "util.h"
++#include "ipc/Kids.h"
  
  /* for shutting_down flag in xassert() */
  #include "globals.h"
@@@ -52,6 -52,6 +53,7 @@@ FILE *debug_log = NULL
  static char *debug_log_file = NULL;
  static int Ctx_Lock = 0;
  static const char *debugLogTime(void);
++static const char *debugLogKid(void);
  static void ctx_print(void);
  #if HAVE_SYSLOG
  #ifdef LOG_LOCAL4
@@@ -117,8 -117,8 +119,9 @@@ _db_print(const char *format,...
      va_start(args2, format);
      va_start(args3, format);
  
--    snprintf(f, BUFSIZ, "%s| %s",
++    snprintf(f, BUFSIZ, "%s%s| %s",
               debugLogTime(),
++             debugLogKid(),
               format);
  
      _db_print_file(f, args1);
@@@ -543,6 -543,6 +546,18 @@@ debugLogTime(void
      return buf;
  }
  
++static const char *
++debugLogKid(void)
++{
++    if (KidIdentifier != 0) {
++              static char buf[16];
++        snprintf(buf, sizeof(buf), " kid%d", KidIdentifier);
++              return buf;
++    }
++
++    return "";
++}
++
  void
  xassert(const char *msg, const char *file, int line)
  {
diff --cc src/fde.h
index a0b903d1a3d43ebf4c2c9cc1566575757c311ba1,68a6db6cc53ab41d3a57b931f762dcd3c609b547..6a7365c0410e03025dba2046375aa748f0503b3b
+++ b/src/fde.h
@@@ -60,6 -60,7 +60,7 @@@ public
      int sock_family;
      char ipaddr[MAX_IPSTRLEN];            /* dotted decimal address of peer */
      char desc[FD_DESC_SZ];
 -    struct sockaddr_un unix_addr;         // store address of unix domain socket when it's needed
++    struct sockaddr_un unix_addr; ///< unix domain socket (UDS) address, if any
  
      struct {
          unsigned int open:1;
index 0000000000000000000000000000000000000000,3a11401fce7410b42391ae28541f8a76bd952970..b0e4893e249a8f6db6f1a51b7e32bf5bf945a430
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,63 +1,64 @@@
 -    Port(coordinatorPathAddr)
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #include "config.h"
+ #include "ipc/Coordinator.h"
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+ Ipc::Coordinator::Coordinator():
 -    listen();
++    Port(coordinatorAddr)
+ {
+ }
+ void Ipc::Coordinator::start()
+ {
 -void Ipc::Coordinator::enrollStrand(const StrandData& strand)
++    Port::start();
+ }
+ Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+ {
+     for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+         if (iter->kidId == kidId)
+             return &(*iter);
+     }
+     return NULL;
+ }
 -void Ipc::Coordinator::handleRead(const Message& message)
++void Ipc::Coordinator::registerStrand(const StrandData& strand)
+ {
+     if (StrandData* found = findStrand(strand.kidId))
+         *found = strand;
+     else
+         strands.push_back(strand);
+ }
 -    case mtRegister:
++void Ipc::Coordinator::receive(const Message& message)
+ {
+     switch (message.type()) {
 -        debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
++    case mtRegistration:
+         debugs(54, 6, HERE << "Registration request");
+         handleRegistrationRequest(message.strand());
+         break;
+     default:
 -    // register strand
 -    enrollStrand(strand);
 -    // send back received message
 -    SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid));
++        debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+         break;
+     }
+ }
+ void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+ {
++    registerStrand(strand);
++
++    // send back an acknowledgement; TODO: remove as not needed?
++    SendMessage(MakeAddr(strandAddrPfx, strand.kidId),
++        Message(mtRegistration, strand.kidId, strand.pid));
+ }
index 0000000000000000000000000000000000000000,a3006fe907e0817ecf0da782dbdc342d98f0ad3a..c4620dc97026510803e67b36f8d7733dbab8b8df
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,52 +1,46 @@@
 -#include "SquidString.h"
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #ifndef SQUID_IPC_COORDINATOR_H
+ #define SQUID_IPC_COORDINATOR_H
+ #include "Array.h"
 -
 -/**
 -  Coordinator processes incoming queries about registration of running squid instances
 -  and store it KidIndentiers.
 -*/
 -class Coordinator: public Port, public RefCountable
+ #include "ipc/Port.h"
+ namespace Ipc
+ {
 -private:
 -    Coordinator(const Coordinator&); // not implemented
 -    Coordinator& operator =(const Coordinator&); // not implemented
 -
++///  Coordinates shared activities of Strands (Squid processes or threads)
++class Coordinator: public Port
+ {
 -public:
 -    virtual void start();
+ public:
+     Coordinator();
 -private:
 -    virtual void handleRead(const Message& message);
 -    StrandData* findStrand(int kidId);
 -    void enrollStrand(const StrandData& strand);
 -    void handleRegistrationRequest(const StrandData& strand);
++protected:
++    virtual void start(); // Port (AsyncJob) API
++    virtual void receive(const Message& message); // Port API
 -    Vector<StrandData> strands;
++    StrandData* findStrand(int kidId); ///< registered strand or NULL
++    void registerStrand(const StrandData &); ///< adds or updates existing
++    void handleRegistrationRequest(const StrandData &); ///< registers and ACKs
+ private:
 -}
++    Vector<StrandData> strands; ///< registered processes and threads
+     CBDATA_CLASS2(Coordinator);
++
++private:
++    Coordinator(const Coordinator&); // not implemented
++    Coordinator& operator =(const Coordinator&); // not implemented
+ };
++} // namespace Ipc
+ #endif /* SQUID_IPC_COORDINATOR_H */
diff --cc src/ipc/Kids.cc
index cf97689b4525126c9d12bc2e3303ffa8e9cda2ec,98932b8aaca46f73dd6ad93577d1f2a048c75a96..ede9d15d77896577883185d85d04e776830f6b27
@@@ -26,11 -26,16 +26,19 @@@ void Kids::init(size_t n
  
      storage.reserve(n);
  
+     char kid_name[32];
++
++    // add Kid records for all n main strands
      for (size_t i = 1; i <= n; ++i) {
-         char kid_name[32];
          snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i);
          storage.push_back(Kid(kid_name));
      }
++    // if coordination is needed, add a Kid record for Coordinator
+     if (n > 1) {
+         snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1));
+         storage.push_back(Kid(kid_name));
+     }
  }
  
  /// returns kid by pid
diff --cc src/ipc/Port.cc
index 0000000000000000000000000000000000000000,1dcd7ff6549ef2b1df1031b019f12508833ffb8d..5a01a700e619bef3b0914088dbcdea157029f8de
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,51 +1,64 @@@
 -
 -const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator";
 -const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid";
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #include "config.h"
+ #include "ipc/Port.h"
 -    UdsOp(aListenAddr),
 -    listenAddr(aListenAddr)
++const char Ipc::coordinatorAddr[] = DEFAULT_PREFIX "/var/run/coordinator.ipc";
++const char Ipc::strandAddrPfx[] = DEFAULT_PREFIX "/var/run/squid";
+ Ipc::Port::Port(const String& aListenAddr):
 -    assert(listenAddr.size() > sizeof(DEFAULT_PREFIX));
++    UdsOp(aListenAddr)
+ {
 -    comm_read(fd(), message.rawData(), message.size(), readHandler);
++    setOptions(COMM_NONBLOCKING | COMM_DOBIND);
++}
++
++void Ipc::Port::start()
++{
++    UdsOp::start();
++    listen();
+ }
+ void Ipc::Port::listen()
+ {
+     debugs(54, 6, HERE);
+     AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
+         CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
 -String Ipc::Port::makeAddr(const char* pathAddr, int id) const
++    comm_read(fd(), buf.raw(), buf.size(), readHandler);
+ }
 -    debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
 -    assert(params.data == this);
++bool Ipc::Port::doneAll() const
++{
++    return false; // listen forever
++}
++
++String Ipc::Port::MakeAddr(const char* pathAddr, int id)
+ {
+     assert(id >= 0);
+     String addr = pathAddr;
+     addr.append('-');
+     addr.append(xitoa(id));
++    addr.append(".ipc");
+     return addr;
+ }
+ void Ipc::Port::noteRead(const CommIoCbParams& params)
+ {
 -        assert(params.buf == (char*)&message);
 -        assert(params.size == sizeof(Message));
 -        handleRead(message);
++    debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag <<
++        " [" << this << ']');
+     if (params.flag == COMM_OK) {
++        assert(params.buf == buf.raw());
++        assert(params.size == buf.size());
++        receive(buf);
+     }
++    // TODO: if there was a fatal error on our socket, close the socket before
++      // trying to listen again and print a level-1 error message.
++
+     listen();
+ }
diff --cc src/ipc/Port.h
index 0000000000000000000000000000000000000000,00332d33d1f01c00c1b94e986eea6a245bab6441..8c87280cc23ca2f891f581c60bd0d316baec0bf4
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,53 +1,53 @@@
 -/**
 -   Base class implements functionality of local endpoint
 -   is listening incoming connections
 -*/
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #ifndef SQUID_IPC_PORT_H
+ #define SQUID_IPC_PORT_H
+ #include "SquidString.h"
+ #include "ipc/UdsOp.h"
+ namespace Ipc
+ {
 -    Port(const String& aListenAddr);
++/// Waits for and receives incoming IPC messages; kids handle the messages
+ class Port: public UdsOp
+ {
+ public:
 -public:
 -    /// start listening of incoming connections
++    Port(const String &aListenAddr);
 -    String makeAddr(const char* pathAddr, int id) const;
++protected:
++    /// calculates IPC message address for strand #id at path
++    static String MakeAddr(const char *path, int id);
++
++    virtual void start() = 0; // UdsOp (AsyncJob) API; has body
++    virtual bool doneAll() const; // UdsOp (AsyncJob) API
++
++    /// read the next incoming message
+     void listen();
 -protected:
 -    virtual void handleRead(const Message& message) = 0;
 -    void noteRead(const CommIoCbParams& params);
++    /// handle IPC message just read
++    virtual void receive(const Message& message) = 0;
+ private:
 -    String listenAddr;
 -    Message message;
++    void noteRead(const CommIoCbParams &params); // Comm callback API
+ private:
 -extern const char coordinatorPathAddr[];
 -extern const char strandPathAddr[];
 -
++    Message buf; ///< UDS read buffer filled by Comm
+ };
 -}
++extern const char coordinatorAddr[]; ///< where coordinator listens
++extern const char strandAddrPfx[]; ///< strand's listening address prefix
++} // namespace Ipc
+ #endif /* SQUID_IPC_PORT_H */
index 0000000000000000000000000000000000000000,510a2195612b382445c7607480125afa8e626f14..a220d3bf5ea019379ded3c9709362303ab1a21af
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,84 +1,70 @@@
 -
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
 -    Port(makeAddr(strandPathAddr, KidIdentifier)),
+ #include "config.h"
+ #include "ipc/Strand.h"
+ #include "ipc/Kids.h"
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
+ Ipc::Strand::Strand():
 -    listen();
 -    setListenTimeout(&Strand::noteRegistrationTimeout, 6);
 -    enroll();
++    Port(MakeAddr(strandAddrPfx, KidIdentifier)),
+     isRegistered(false)
+ {
+ }
+ void Ipc::Strand::start()
+ {
 -void Ipc::Strand::enroll()
++    Port::start();
++    registerSelf();
+ }
 -    assert(!registered());
 -    SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid()));
++void Ipc::Strand::registerSelf()
+ {
+     debugs(54, 6, HERE);
 -void Ipc::Strand::handleRead(const Message& message)
++    Must(!isRegistered);
++    SendMessage(coordinatorAddr,
++              Message(mtRegistration, KidIdentifier, getpid()));
++    setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
+ }
 -    case mtRegister:
++void Ipc::Strand::receive(const Message& message)
+ {
+     debugs(54, 6, HERE);
+     switch (message.type()) {
 -        debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
++    case mtRegistration:
+         handleRegistrationResponse(message.strand());
+         break;
+     default:
 -    // handle registration respond from coordinator
 -    // coordinator returns the same message
 -    isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid());
 -    debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered");
 -    setListenTimeout(NULL, -1);
 -}
 -
 -void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout)
 -{
 -    AsyncCall::Pointer listenTimeoutHandler = NULL;
 -    if (timeout > 0) {
 -        assert(timeoutHandler != NULL);
 -        listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler",
 -            CommCbMemFunT<Strand, CommTimeoutCbParams>(this, timeoutHandler));
 -    }
 -    setTimeout(listenTimeoutHandler, timeout);
 -}
 -
 -void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params)
 -{
 -    debugs(54, 6, HERE);
 -    if (!registered()) {
 -        debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered");
 -        exit(1);
++        debugs(54, 6, HERE << "Unhandled message type: " << message.type());
+         break;
+     }
+ }
+ void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+ {
 -bool Ipc::Strand::registered() const
++    // handle registration response from the coordinator; it could be stale
++    if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
++        debugs(54, 6, "kid" << KidIdentifier << " registered");
++        clearTimeout(); // we are done
++    } else {
++        // could be an ACK to the registration message of our dead predecessor
++        debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
++        // keep listening, with a timeout
+     }
+ }
 -    return isRegistered;
++void Ipc::Strand::timedout()
+ {
++    debugs(54, 6, HERE << isRegistered);
++    if (!isRegistered)
++        fatalf("kid%d registration timed out", KidIdentifier);
+ }
index 0000000000000000000000000000000000000000,785e1451fc1f93d4a4d0b32c1127bdb2670144ca..da207d005cc32e8cc083790b162d958c2f291653
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,58 +1,48 @@@
 -
 -#include "SquidString.h"
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #ifndef SQUID_IPC_STRAND_H
+ #define SQUID_IPC_STRAND_H
 -/**
 -  Strand implement functionality of Coordinator's client and
 -  send registration query to Coordinator with it KidIdentifier.
 -*/
 -class Strand: public Port, public RefCountable
+ #include "ipc/Port.h"
+ namespace Ipc
+ {
 -private:
 -    typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&);
 -
 -private:
 -    Strand(const Strand&); // not implemented
 -    Strand& operator =(const Strand&); // not implemented
 -
++/// Receives coordination messages on behalf of its process or thread
++class Strand: public Port
+ {
 -public:
 -    virtual void start();
 -    /// send register query
 -    void enroll();
 -    bool registered() const;
+ public:
+     Strand();
 -    virtual void handleRead(const Message& message);
++    virtual void start(); // Port (AsyncJob) API
++
++protected:
++    virtual void timedout(); // Port (UsdOp) API
++    virtual void receive(const Message& message); // Port API
+ private:
 -    void setListenTimeout(TimeoutHandler timeoutHandler, int timeout);
 -    void noteRegistrationTimeout(const CommTimeoutCbParams& params);
++    void registerSelf(); /// let Coordinator know this strand exists
+     void handleRegistrationResponse(const StrandData& strand);
 -    bool isRegistered;
+ private:
++    bool isRegistered; ///< whether Coordinator ACKed registration (unused)
+     CBDATA_CLASS2(Strand);
++
++private:
++    Strand(const Strand&); // not implemented
++    Strand& operator =(const Strand&); // not implemented
+ };
+ }
+ #endif /* SQUID_IPC_STRAND_H */
index 0000000000000000000000000000000000000000,0fdcd4eeccf47aa3a81601588e727f1aef7497b4..5f7f7703c367a912631787ed1bae46c5b29b9711
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,156 +1,145 @@@
 -#define SEND_RETRIES 4
 -#define SEND_TIMEOUT 4
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #include "config.h"
+ #include "comm.h"
+ #include "ipc/UdsOp.h"
 -Ipc::Message::Message():
 -    data()
 -Ipc::MessageType Ipc::Message::type() const
++Ipc::Message::Message()
+ {
++    data.messageType = mtNone;
++    data.strand.kidId = -1;
+ }
+ Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
+ {
+     data.messageType = messageType;
+     data.strand.kidId = kidId;
+     data.strand.pid = pid;
+ }
 -    return data.messageType;
++const Ipc::StrandData &Ipc::Message::strand() const
+ {
 -const Ipc::StrandData& Ipc::Message::strand() const
 -{
 -    return data.strand;
 -}
 -
 -char* Ipc::Message::rawData()
 -{
 -    return (char*)&data;
 -}
 -
 -size_t Ipc::Message::size()
 -{
 -    return sizeof(data);
 -}
 -
 -
 -Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */):
++    Must(data.messageType == mtRegistration);
++      return data.strand;
+ }
 -    debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr);
 -    if (bind) {
 -        unlink(pathAddr.termedBuf());
 -        options |= COMM_DOBIND;
 -    }
++Ipc::UdsOp::UdsOp(const String& pathAddr):
+     AsyncJob("Ipc::UdsOp"),
+     addr(setAddr(pathAddr)),
+     options(COMM_NONBLOCKING),
+     fd_(-1)
+ {
 -    if (fd_ > 0)
++    debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
+ }
+ Ipc::UdsOp::~UdsOp()
+ {
+     debugs(54, 5, HERE << '[' << this << ']');
 -bool Ipc::UdsOp::doneAll() const
++    if (fd_ >= 0)
+         comm_close(fd_);
+ }
 -    return false;
++void Ipc::UdsOp::setOptions(int newOptions)
+ {
 -        Must(fd_ > 0);
++    options = newOptions;
+ }
+ int Ipc::UdsOp::fd()
+ {
+     if (fd_ < 0) {
++        if (options & COMM_DOBIND)
++            unlink(addr.sun_path);
+         fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
 -void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout)
++        Must(fd_ >= 0);
+     }
+     return fd_;
+ }
+ struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
+ {
+     assert(pathAddr.size() != 0);
+     struct sockaddr_un unixAddr;
+     memset(&unixAddr, 0, sizeof(unixAddr));
+     unixAddr.sun_family = AF_LOCAL;
+     xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+     return unixAddr;
+ }
 -    commSetTimeout(fd(), timeout, timeoutHandler);
++void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
+ {
 -    UdsOp(pathAddr, false),
++    AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
++        CommCbMemFunT<UdsOp, CommTimeoutCbParams>(this,
++            &UdsOp::noteTimeout));
++    commSetTimeout(fd(), seconds, handler);
++}
++
++void Ipc::UdsOp::clearTimeout()
++{
++    commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
++}
++
++void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
++{
++    timedout(); // our kid handles communication timeout
+ }
+ CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
+ Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
 -    retries(SEND_RETRIES),
 -    timeout(SEND_TIMEOUT)
++    UdsOp(pathAddr),
+     message(aMessage),
 -    assert(retries > 0);
 -    assert(timeout >= 0);
++    retries(4), // TODO: make configurable?
++    timeout(5), // TODO: make configurable?
++    writing(false)
+ {
 -    {
 -        AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout",
 -            CommCbMemFunT<UdsSender, CommTimeoutCbParams>(this, &UdsSender::noteTimeout));
 -        setTimeout(timeoutHandler, timeout);
 -    }
+ }
+ void Ipc::UdsSender::start()
+ {
++    UdsOp::start();
+     write();
+     if (timeout > 0)
 -bool Ipc::UdsSender::retry()
++        setTimeout(timeout, "Ipc::UdsSender::noteTimeout");
+ }
 -    if (retries > 0)
 -        --retries;
 -    return retries != 0;
++bool Ipc::UdsSender::doneAll() const
+ {
 -    AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite",
 -        CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::noteWrite));
 -    comm_write(fd(), message.rawData(), message.size(), writeHandler);
++    return !writing && UdsOp::doneAll();
+ }
+ void Ipc::UdsSender::write()
+ {
+     debugs(54, 5, HERE);
 -void Ipc::UdsSender::noteWrite(const CommIoCbParams& params)
++    AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote",
++        CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::wrote));
++    comm_write(fd(), message.raw(), message.size(), writeHandler);
++    writing = true;
+ }
 -    if (params.flag == COMM_OK || !retry())
 -        mustStop("done");
 -    else
 -        write();
++void Ipc::UdsSender::wrote(const CommIoCbParams& params)
+ {
+     debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
 -void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params)
++    writing = false;
++    if (params.flag != COMM_OK && retries-- > 0)
++        write(); // XXX: should we close on error so that fd() reopens?
+ }
 -    mustStop("done");
++void Ipc::UdsSender::timedout()
+ {
+     debugs(54, 5, HERE);
++    mustStop("timedout");
+ }
+ void Ipc::SendMessage(const String& toAddress, const Message& message)
+ {
+     AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+ }
diff --cc src/ipc/UdsOp.h
index 0000000000000000000000000000000000000000,66c4ed48758214b7c841ad9cbc5bb1033d9fc783..7801f25b98f15bde3ae166785036a163ed7a8d46
mode 000000,100644..100644
--- /dev/null
@@@ -1,0 -1,119 +1,124 @@@
 -typedef enum {mtNone = 0, mtRegister} MessageType;
+ /*
+  * $Id$
+  *
+  * DEBUG: section 54    Interprocess Communication
+  *
+  */
+ #ifndef SQUID_IPC_ASYNCUDSOP_H
+ #define SQUID_IPC_ASYNCUDSOP_H
+ #include "SquidString.h"
+ #include "CommCalls.h"
+ namespace Ipc
+ {
 -/**
 -   This one contains a registration info
 -*/
++typedef enum { mtNone = 0, mtRegistration } MessageType;
 -    int   kidId;
++/// Strand registration information
+ struct StrandData
+ {
 -/**
 -  This class contains data is used by UdsSender/UdsReceiver.
 -*/
++    int kidId;
+     pid_t pid;
+ };
 -public:
 -    MessageType type() const;
++/// information sent or received during IPC
+ class Message
+ {
+ public:
+     Message();
+     Message(MessageType messageType, int kidId, pid_t pid);
 -    char*  rawData();
 -    size_t size();
++    /// raw, type-independent access
++    int type() const { return data.messageType; }
++    char *raw() { return reinterpret_cast<char*>(&data); }
++    const char *raw() const { return reinterpret_cast<const char*>(&data); }
++    size_t size() const { return sizeof(data); }
++
++    /// type-dependent access
+     const StrandData& strand() const;
 -    struct {
 -        MessageType messageType;
 -        StrandData  strand;
 -    } data;
+ private:
 -/**
 -  UdsOp implements common async UDS operation.
 -*/
++    struct Data {
++        int messageType;
++        StrandData strand;
++        // TODO: redesign to better handle many type-specific datas like strand
++    } data; ///< everything being sent or received
+ };
 -private:
 -    UdsOp(const UdsOp&); // not implemented
 -    UdsOp& operator= (const UdsOp&); // not implemented
 -
++/// code shared by unix-domain socket senders (e.g., UdsSender or Coordinator)
++/// and receivers (e.g. Port or Coordinator)
+ class UdsOp: public AsyncJob
+ {
 -    UdsOp(const String& pathAddr, bool bind = true);
+ public:
 -    /// return an endpoint for communication, use fd() instead of fd_
 -    int fd();
 -    virtual bool doneAll() const;
 -    void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout);
++    UdsOp(const String &pathAddr);
+     virtual ~UdsOp();
+ protected:
 -    struct sockaddr_un setAddr(const String& pathAddr);
++    virtual void timedout() {} ///< called after setTimeout() if timed out
++
++    int fd(); ///< creates if needed and returns raw UDS socket descriptor
++
++    /// call timedout() if no UDS messages in a given number of seconds
++    void setTimeout(int seconds, const char *handlerName);
++    void clearTimeout(); ///< remove previously set timeout, if any
++
++      void setOptions(int newOptions); ///< changes socket options
++
++private:
++    /// Comm timeout callback; calls timedout()
++    void noteTimeout(const CommTimeoutCbParams &p);
++
++    /// configures addr member
++    struct sockaddr_un setAddr(const String &pathAddr);
+ private:
 -    struct sockaddr_un addr;
 -    int  options;
 -    int  fd_;
++    struct sockaddr_un addr; ///< UDS address
++    int options; ///< UDS options
++    int fd_; ///< UDS descriptor
+ private:
 -/**
 -  Implement async write operation for UDS
 -*/
++    UdsOp(const UdsOp &); // not implemented
++    UdsOp &operator= (const UdsOp &); // not implemented
+ };
 -private:
 -    UdsSender(const UdsSender&); // not implemented
 -    UdsSender& operator= (const UdsSender&); // not implemented
 -
++// XXX: move UdsSender code to UdsSender.{cc,h}
++/// attempts to send an IPC message a few times, with a timeout
+ class UdsSender: public UdsOp
+ {
 -public:
 -    /// start writing data
 -    virtual void start();
+ public:
+     UdsSender(const String& pathAddr, const Message& aMessage);
 -    /// update retries counter and check
 -    bool retry();
 -    /// schedule writing
 -    void write();
 -    void noteWrite(const CommIoCbParams& params);
 -    void noteTimeout(const CommTimeoutCbParams& params);
++protected:
++    virtual void start(); // UdsOp (AsyncJob) API
++    virtual bool doneAll() const; // UdsOp (AsyncJob) API
++    virtual void timedout(); // UdsOp API
+ private:
 -    Message message;
 -    int retries;
 -    int timeout;
++    void write(); ///< schedule writing
++    void wrote(const CommIoCbParams& params); ///< done writing or error
+ private:
++    Message message; ///< what to send
++    int retries; ///< how many times to try after a write error
++    int timeout; ///< total time to send the message
++    bool writing; ///< whether Comm started and did not finish writing
+     CBDATA_CLASS2(UdsSender);
++
++private:
++    UdsSender(const UdsSender&); // not implemented
++    UdsSender& operator= (const UdsSender&); // not implemented
+ };
+ void SendMessage(const String& toAddress, const Message& message);
+ }
+ #endif /* SQUID_IPC_ASYNCUDSOP_H */