]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
Merged from parent (trunk r10600).
authorAlex Rousskov <rousskov@measurement-factory.com>
Tue, 6 Jul 2010 23:09:44 +0000 (17:09 -0600)
committerAlex Rousskov <rousskov@measurement-factory.com>
Tue, 6 Jul 2010 23:09:44 +0000 (17:09 -0600)
29 files changed:
1  2 
configure.in
doc/release-notes/release-3.2.sgml
src/Makefile.am
src/cache_cf.cc
src/cf.data.pre
src/client_side.cc
src/comm.cc
src/comm.h
src/comm/ListenStateData.cc
src/comm/ListenStateData.h
src/debug.cc
src/enums.h
src/htcp.cc
src/icp_v2.cc
src/ip/Address.cc
src/ip/Address.h
src/ipc/Coordinator.cc
src/ipc/SharedListen.cc
src/ipc/SharedListen.h
src/ipc/StartListening.cc
src/ipc/StartListening.h
src/ipc/Strand.cc
src/ipc/TypedMsgHdr.cc
src/ipc/UdsOp.cc
src/main.cc
src/protos.h
src/snmp_core.cc
src/structs.h
src/tools.cc

diff --cc configure.in
Simple merge
Simple merge
diff --cc src/Makefile.am
index b4d2730cca3b93dddf7424076a9b9acca0d4cf63,589a298bb7115e453a538d0a1b9cc996f87f6195..e73f6b17c55e1e22295c8b648e3d47c3c6a2f632
@@@ -163,9 -161,10 +161,11 @@@ COMMON_LIBS = 
        acl/libstate.la \
        auth/libauth.la \
        acl/libapi.la \
+       base/libbase.la \
+       libsquid.la \
        ip/libip.la \
 -      fs/libfs.la
 +      fs/libfs.la \
 +      ipc/libipc.la
  
  EXTRA_PROGRAMS = \
        DiskIO/DiskDaemon/diskd \
diff --cc src/cache_cf.cc
Simple merge
diff --cc src/cf.data.pre
index eef92d8b5028ad40b720522b38c927deadf0f168,b7300e91a3d6eef561f70eab1449e6d6dbbec9ae..090b000cbee8b9efdda211955b42a6abc71de517
@@@ -6844,18 -7005,17 +7042,31 @@@ DOC_STAR
        Whether to lookup the EUI or MAC address of a connected client.
  DOC_END
  
+ NAME: max_filedescriptors max_filedesc
+ TYPE: int
+ DEFAULT: 0
+ LOC: Config.max_filedescriptors
+ DOC_START
+       The maximum number of filedescriptors supported.
+       The default "0" means Squid inherits the current ulimit setting.
+       Note: Changing this requires a restart of Squid. Also
+       not all comm loops supports large values.
+ DOC_END
 +NAME: workers
 +TYPE: int
 +LOC: Config.workers
 +DEFAULT: 1
 +DOC_START
 +      Number of main Squid processes or "workers" to fork and maintain.
 +      0: "no daemon" mode, like running "squid -N ..."
 +      1: "no SMP" mode, start one main Squid process daemon (default)
 +      N: start N main Squid process daemons (i.e., SMP mode)
 +
 +      In SMP mode, each worker does nearly all what a single Squid daemon
 +      does (e.g., listen on http_port and forward HTTP requests).
 +DOC_END
 +
  EOF
index 228c6fb7cff9a33ee2c70afffe244cc218f2aa20,a5391ee42b21f31e0dde8b2330f3904fae7a6638..ba03e748f09083a591b8f3a63d0f79408239ae9e
@@@ -93,6 -93,6 +93,7 @@@
  #include "clientStream.h"
  #include "comm.h"
  #include "comm/ListenStateData.h"
++#include "base/TextException.h"
  #include "ConnectionDetail.h"
  #include "eui/Config.h"
  #include "fde.h"
  #include "HttpRequest.h"
  #include "ident/Config.h"
  #include "ident/Ident.h"
- #include "ip/IpIntercept.h"
+ #include "ip/Intercept.h"
 +#include "ipc/StartListening.h"
  #include "MemBuf.h"
  #include "MemObject.h"
  #include "ProtoPort.h"
diff --cc src/comm.cc
index 475d167bf6beac575f35375ffb7ebb5fc5d6e385,25fbda66a7542ee683cb049f8c327381d37aa73f..6330270bbd1055b60c0a6b53011d1ffa467f7f79
@@@ -71,8 -71,6 +71,8 @@@ typedef enum 
  
  static void commStopHalfClosedMonitor(int fd);
  static IOCB commHalfClosedReader;
- static void comm_init_opened(int new_socket, IpAddress &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
- static int comm_apply_flags(int new_socket, IpAddress &addr, int flags, struct addrinfo *AI);
++static void comm_init_opened(int new_socket, Ip::Address &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
++static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
  
  
  struct comm_io_callback_t {
@@@ -744,29 -750,6 +751,29 @@@ comm_openex(int sock_type
  
  #endif
  
-                  IpAddress &addr,
 +    comm_init_opened(new_socket, addr, TOS, note, AI);
 +    new_socket = comm_apply_flags(new_socket, addr, flags, AI);
 +
 +    addr.FreeAddrInfo(AI);
 +
 +    PROF_stop(comm_open);
 +
 +    return new_socket;
 +}
 +
 +/// update FD tables after a local or remote (IPC) comm_openex();
 +void
 +comm_init_opened(int new_socket,
++                 Ip::Address &addr,
 +                 unsigned char TOS,
 +                 const char *note,
 +                 struct addrinfo *AI)
 +{
 +    assert(new_socket >= 0);
 +    assert(AI);
 +
 +    fde *F = NULL;
 +
      /* update fdstat */
      debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
  
      F->tos = TOS;
  
      F->sock_family = AI->ai_family;
-                  IpAddress &addr,
 +}
 +
 +/// apply flags after a local comm_open*() call;
 +/// returns new_socket or -1 on error
 +static int
 +comm_apply_flags(int new_socket,
++                 Ip::Address &addr,
 +                 int flags,
 +                 struct addrinfo *AI)
 +{
 +    assert(new_socket >= 0);
 +    assert(AI);
 +    const int sock_type = AI->ai_socktype;
  
      if (!(flags & COMM_NOCLOEXEC))
          commSetCloseOnExec(new_socket);
      return new_socket;
  }
  
-                    IpAddress &addr,
 +void
 +comm_import_opened(int fd,
++                   Ip::Address &addr,
 +                   int flags,
 +                   const char *note,
 +                   struct addrinfo *AI)
 +{
 +    debugs(5, 2, HERE << " FD " << fd << " at " << addr);
 +    assert(fd >= 0);
 +    assert(AI);
 +
 +    comm_init_opened(fd, addr, 0, note, AI);
 +
 +    if (!(flags & COMM_NOCLOEXEC))
 +        fd_table[fd].flags.close_on_exec = 1;
 +
 +    if (addr.GetPort() > (u_short) 0) {
 +#ifdef _SQUID_MSWIN_
 +        if (sock_type != SOCK_DGRAM)
 +#endif
 +            fd_table[fd].flags.nolinger = 1;
 +    }
 +
 +    if ((flags & COMM_TRANSPARENT))
 +        fd_table[fd].flags.transparent = 1;
 +
 +    if (flags & COMM_NONBLOCKING)
 +        fd_table[fd].flags.nonblocking = 1;
 +
 +#ifdef TCP_NODELAY
 +    if (AI->ai_socktype == SOCK_STREAM)
 +        fd_table[fd].flags.nodelay = 1;
 +#endif
 +
 +    /* no fd_table[fd].flags. updates needed for these conditions:
 +     * if ((flags & COMM_REUSEADDR)) ...
 +     * if ((flags & COMM_DOBIND) ...) ...
 +     */
 +}
 +
 +
  CBDATA_CLASS_INIT(ConnectStateData);
  
  void *
@@@ -2481,97 -2420,3 +2492,97 @@@ CommSelectEngine::checkEvents(int timeo
          return EVENT_ERROR;
      };
  }
-     // TODO: merge with comm_openex() when IpAddress becomes NetAddress
 +
 +/// Create a unix-domain socket (UDS) that only supports FD_MSGHDR I/O.
 +int
 +comm_open_uds(int sock_type,
 +              int proto,
 +              struct sockaddr_un* addr,
 +              int flags)
 +{
++    // TODO: merge with comm_openex() when Ip::Address becomes NetAddress
 +
 +    int new_socket;
 +
 +    PROF_start(comm_open);
 +    /* Create socket for accepting new connections. */
 +    statCounter.syscalls.sock.sockets++;
 +
 +    /* Setup the socket addrinfo details for use */
 +    struct addrinfo AI;
 +    AI.ai_flags = 0;
 +    AI.ai_family = PF_UNIX;
 +    AI.ai_socktype = sock_type;
 +    AI.ai_protocol = proto;
 +    AI.ai_addrlen = SUN_LEN(addr);
 +    AI.ai_addr = (sockaddr*)addr;
 +    AI.ai_canonname = NULL;
 +    AI.ai_next = NULL;
 +
 +    debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
 +
 +    if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
 +        /* Increase the number of reserved fd's if calls to socket()
 +         * are failing because the open file table is full.  This
 +         * limits the number of simultaneous clients */
 +
 +        if (limitError(errno)) {
 +            debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
 +            fdAdjustReserved();
 +        } else {
 +            debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
 +        }
 +
 +        PROF_stop(comm_open);
 +        return -1;
 +    }
 +
 +    debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
 +
 +    /* update fdstat */
 +    debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
 +
 +    assert(!isOpen(new_socket));
 +    fd_open(new_socket, FD_MSGHDR, NULL);
 +
 +    fdd_table[new_socket].close_file = NULL;
 +
 +    fdd_table[new_socket].close_line = 0;
 +
 +    fd_table[new_socket].sock_family = AI.ai_family;
 +
 +    if (!(flags & COMM_NOCLOEXEC))
 +        commSetCloseOnExec(new_socket);
 +
 +    if (flags & COMM_REUSEADDR)
 +        commSetReuseAddr(new_socket);
 +
 +    if (flags & COMM_NONBLOCKING) {
 +        if (commSetNonBlocking(new_socket) != COMM_OK) {
 +            comm_close(new_socket);
 +            PROF_stop(comm_open);
 +            return -1;
 +        }
 +    }
 +
 +    if (flags & COMM_DOBIND) {
 +        if (commBind(new_socket, AI) != COMM_OK) {
 +            comm_close(new_socket);
 +            PROF_stop(comm_open);
 +            return -1;
 +        }
 +    }
 +
 +#ifdef TCP_NODELAY
 +    if (sock_type == SOCK_STREAM)
 +        commSetTcpNoDelay(new_socket);
 +
 +#endif
 +
 +    if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
 +        commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
 +
 +    PROF_stop(comm_open);
 +
 +    return new_socket;
 +}
diff --cc src/comm.h
index 8cedf8ab9ff52fe40c7e8d737fac628a246c5ce2,331def76f0a85e62db961c3b1d9d33352c4532a6..5db4996a4025d202907aa2374c60dbe15ee7e992
@@@ -54,10 -52,7 +52,10 @@@ SQUIDCEXTERN int comm_connect_addr(int 
  SQUIDCEXTERN void comm_init(void);
  SQUIDCEXTERN void comm_exit(void);
  
- SQUIDCEXTERN int comm_open(int, int, IpAddress &, int, const char *note);
+ SQUIDCEXTERN int comm_open(int, int, Ip::Address &, int, const char *note);
 +SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags);
 +/// update Comm state after getting a comm_open() FD from another process
- SQUIDCEXTERN void comm_import_opened(int fd, IpAddress &addr, int flags, const char *note, struct addrinfo *AI);
++SQUIDCEXTERN void comm_import_opened(int fd, Ip::Address &addr, int flags, const char *note, struct addrinfo *AI);
  
  /**
   * Open a port specially bound for listening or sending through a specific port.
index 8f6e481d4f903ade8a0c09d309c94c5472e0739d,d3c44469dae94a184817fc086bcf9f08435482ea..4e5a304831b5c74806dde63863c3f5ada73f5b95
@@@ -141,7 -141,7 +141,7 @@@ Comm::ListenStateData::okToAccept(
      return false;
  }
  
--bool
++void
  Comm::ListenStateData::acceptOne()
  {
      /*
  
          if (newfd == COMM_NOMESSAGE) {
              /* register interest again */
--            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << *theCallback);
++            debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
              commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
--            return false;
++            return;
          }
  
          // A non-recoverable error; notify the caller */
--        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << *theCallback);
++        debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
          notify(-1, COMM_ERROR, errno, connDetails);
--        return false;
++        mayAcceptMore = false;
++        return;
      }
  
      debugs(5, 5, HERE << "accepted: FD " << fd <<
             " newfd: " << newfd << " from: " << connDetails.peer <<
--           " handler: " << *theCallback);
++           " handler: " << theCallback);
      notify(newfd, COMM_OK, 0, connDetails);
--    return true;
  }
  
  void
@@@ -182,7 -182,7 +182,7 @@@ Comm::ListenStateData::acceptNext(
  {
      assert(isOpen(fd));
      debugs(5, 2, HERE << "connection on FD " << fd);
--    mayAcceptMore = acceptOne();
++    acceptOne();
  }
  
  void
index c7bdc7be2dd822998382a573970aa86c015b1216,c7bdc7be2dd822998382a573970aa86c015b1216..66ed358786c465fe090d8c2c6912cdc19a4bd706
@@@ -41,7 -41,7 +41,7 @@@ private
      /// Method callback for whenever an FD is ready to accept a client connection.
      static void doAccept(int fd, void *data);
  
--    bool acceptOne();
++    void acceptOne();
      int oldAccept(ConnectionDetail &details);
  
      AsyncCall::Pointer theCallback;
diff --cc src/debug.cc
Simple merge
diff --cc src/enums.h
Simple merge
diff --cc src/htcp.cc
index 2e261aa295babb204dd3974d28a4ea73bbd503e4,fe6d5079890a2ce6a47b716000ed9aabdd59c221..fd17a3270b9448279e781b17624c3649c49e3471
@@@ -1500,21 -1480,26 +1500,21 @@@ htcpInit(void
          return;
      }
  
-     IpAddress incomingAddr = Config.Addrs.udp_incoming;
+     Ip::Address incomingAddr = Config.Addrs.udp_incoming;
      incomingAddr.SetPort(Config.Port.htcp);
  
 -    enter_suid();
 -    htcpInSocket = comm_open_listener(SOCK_DGRAM,
 -                                      IPPROTO_UDP,
 -                                      incomingAddr,
 -                                      COMM_NONBLOCKING,
 -                                      "HTCP Socket");
 -    leave_suid();
 -
 -    if (htcpInSocket < 0)
 -        fatal("Cannot open HTCP Socket");
 +    AsyncCall::Pointer call = asyncCall(31, 2,
 +                                        "htcpIncomingConnectionOpened",
 +                                        HtcpListeningStartedDialer(&htcpIncomingConnectionOpened));
  
 -    commSetSelect(htcpInSocket, COMM_SELECT_READ, htcpRecv, NULL, 0);
 -
 -    debugs(31, 1, "Accepting HTCP messages on port " << Config.Port.htcp << ", FD " << htcpInSocket << ".");
 +    Ipc::StartListening(SOCK_DGRAM,
 +                        IPPROTO_UDP,
 +                        incomingAddr,
 +                        COMM_NONBLOCKING,
 +                        Ipc::fdnInHtcpSocket, call);
  
      if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
-         IpAddress outgoingAddr = Config.Addrs.udp_outgoing;
+         Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;
          outgoingAddr.SetPort(Config.Port.htcp);
  
          enter_suid();
diff --cc src/icp_v2.cc
index 67bd9788820b8822aa8793fe7eb6c6bd20620c34,2afd304e893f7bf1857f7c49e6fcc687c6ea16fc..cb1e4cde5cbced6d37d664b0784347f0ed726c6b
  #include "SquidTime.h"
  #include "SwapDir.h"
  #include "icmp/net_db.h"
- #include "ip/IpAddress.h"
+ #include "ip/Address.h"
 +#include "ipc/StartListening.h"
  #include "rfc1738.h"
  
-     typedef void (*Handler)(int fd, int errNo, IpAddress& addr);
-     IcpListeningStartedDialer(Handler aHandler, IpAddress& anAddr):
 +/// dials icpIncomingConnectionOpened call
 +class IcpListeningStartedDialer: public CallDialer,
 +        public Ipc::StartListeningCb
 +{
 +public:
-     IpAddress addr;
++    typedef void (*Handler)(int fd, int errNo, Ip::Address& addr);
++    IcpListeningStartedDialer(Handler aHandler, Ip::Address& anAddr):
 +            handler(aHandler), addr(anAddr) {}
 +
 +    virtual void print(std::ostream &os) const {
 +        startPrint(os) <<
 +        ", address=" << addr << ')';
 +    }
 +
 +    virtual bool canDial(AsyncCall &) const { return true; }
 +    virtual void dial(AsyncCall &) { (handler)(fd, errNo, addr); }
 +
 +public:
 +    Handler handler;
- static void icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr);
++    Ip::Address addr;
 +};
 +
++static void icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr);
 +
  /// \ingroup ServerProtocolICPInternal2
- static void icpLogIcp(const IpAddress &, log_type, int, const char *, int);
+ static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
  
  /// \ingroup ServerProtocolICPInternal2
- static void icpHandleIcpV2(int, IpAddress &, char *, int);
+ static void icpHandleIcpV2(int, Ip::Address &, char *, int);
  
  /// \ingroup ServerProtocolICPInternal2
  static void icpCount(void *, int, size_t, int);
@@@ -738,31 -728,6 +736,31 @@@ icpConnectionsOpen(void
      theOutICPAddr.FreeAddrInfo(xai);
  }
  
- icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr)
 +static void
++icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr)
 +{
 +    theInIcpConnection = fd;
 +
 +    if (theInIcpConnection < 0)
 +        fatal("Cannot open ICP Port");
 +
 +    commSetSelect(theInIcpConnection,
 +                  COMM_SELECT_READ,
 +                  icpHandleUdp,
 +                  NULL,
 +                  0);
 +
 +    for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
 +        ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
 +
 +    debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << ".");
 +
 +    fd_note(theInIcpConnection, "Incoming ICP socket");
 +
 +    if (Config.Addrs.udp_outgoing.IsNoAddr())
 +        theOutIcpConnection = theInIcpConnection;
 +}
 +
  /**
   * icpConnectionShutdown only closes the 'in' socket if it is
   * different than the 'out' socket.
index 2591e02a421b53f8e386e6fe77faee120c2d998e,0fc8cf9b49f4540de307ff2bb928ff52ceded9b1..0dce34d28f49fee11a86bb6ec76911bfd16fb7f1
@@@ -879,12 -888,8 +888,14 @@@ Ip::Address::matchIPAddr(const Ip::Addr
      return 0;
  }
  
- int IpAddress::compareWhole(const IpAddress &rhs) const
++int
++Ip::Address::compareWhole(const Ip::Address &rhs) const
 +{
 +    return memcmp(this, &rhs, sizeof(*this));
 +}
 +
- bool IpAddress::operator ==(const IpAddress &s) const
+ bool
+ Ip::Address::operator ==(const Ip::Address &s) const
  {
      return (0 == matchIPAddr(s));
  }
index 8423b42864b327e37f09b2abf52246b7e303a0e2,12612cefbae483078e7bd0f1049442668fa62dd0..3f5ee88bdeae5e1a37f8f8d8ec18ab802fe817e5
@@@ -324,17 -319,10 +319,17 @@@ public
       \retval  1  IP rhs is greater (numerically) than that stored.
       \retval -1  IP rhs is less (numerically) than that stored.
       */
-     int matchIPAddr(const IpAddress &rhs) const;
+     int matchIPAddr(const Address &rhs) const;
  
-     int compareWhole(const IpAddress &rhs) const;
 +    /** Compare taking IP, port, protocol, etc. into account. Returns an
 +        integer  less  than,  equal  to,  or greater than zero if the object
 +        is found, respectively, to be less than, to match, or to be greater
 +        than rhs. The exact ordering algorithm is not specified and may change.
 +    */
++    int compareWhole(const Ip::Address &rhs) const;
 +
      /**
-      *  Get RFC 3493 addrinfo structure from the IpAddress data
+      *  Get RFC 3493 addrinfo structure from the Ip::Address data
       *  for protocol-neutral socket operations.
       *  Should be passed a NULL pointer of type struct addrinfo* it will
       *  allocate memory for the structures involved. (see FreeAddrInfo to clear).
index a714330e3180393126a3686ffdb6b9fce8a7d082,0000000000000000000000000000000000000000..0f2eac99757a924859f61364d9bf3332777f072a
mode 100644,000000..100644
--- /dev/null
@@@ -1,139 -1,0 +1,139 @@@
-     IpAddress addr = p.addr; // comm_open_listener may modify it
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +
 +#include "config.h"
 +#include "comm.h"
 +#include "ipc/Coordinator.h"
 +#include "ipc/FdNotes.h"
 +#include "ipc/SharedListen.h"
 +
 +
 +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
 +Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
 +
 +
 +Ipc::Coordinator::Coordinator():
 +        Port(coordinatorAddr)
 +{
 +}
 +
 +void Ipc::Coordinator::start()
 +{
 +    Port::start();
 +}
 +
 +Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
 +{
 +    typedef Strands::iterator SI;
 +    for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
 +        if (iter->kidId == kidId)
 +            return &(*iter);
 +    }
 +    return NULL;
 +}
 +
 +void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
 +{
 +    if (StrandCoord* found = findStrand(strand.kidId))
 +        *found = strand;
 +    else
 +        strands.push_back(strand);
 +}
 +
 +void Ipc::Coordinator::receive(const TypedMsgHdr& message)
 +{
 +    switch (message.type()) {
 +    case mtRegistration:
 +        debugs(54, 6, HERE << "Registration request");
 +        handleRegistrationRequest(StrandCoord(message));
 +        break;
 +
 +    case mtSharedListenRequest:
 +        debugs(54, 6, HERE << "Shared listen request");
 +        handleSharedListenRequest(SharedListenRequest(message));
 +        break;
 +
 +    default:
 +        debugs(54, 1, HERE << "Unhandled message type: " << message.type());
 +        break;
 +    }
 +}
 +
 +void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
 +{
 +    registerStrand(strand);
 +
 +    // send back an acknowledgement; TODO: remove as not needed?
 +    TypedMsgHdr message;
 +    strand.pack(message);
 +    SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
 +}
 +
 +void
 +Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
 +{
 +    debugs(54, 4, HERE << "kid" << request.requestorId <<
 +           " needs shared listen FD for " << request.params.addr);
 +    Listeners::const_iterator i = listeners.find(request.params);
 +    int errNo = 0;
 +    const int sock = (i != listeners.end()) ?
 +                     i->second : openListenSocket(request, errNo);
 +
 +    debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
 +           request.params.addr << " to kid" << request.requestorId <<
 +           " mapId=" << request.mapId);
 +
 +    SharedListenResponse response(sock, errNo, request.mapId);
 +    TypedMsgHdr message;
 +    response.pack(message);
 +    SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
 +}
 +
 +int
 +Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
 +                                   int &errNo)
 +{
 +    const OpenListenerParams &p = request.params;
 +
 +    debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
 +           request.requestorId);
 +
++    Ip::Address addr = p.addr; // comm_open_listener may modify it
 +
 +    enter_suid();
 +    const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
 +                                        FdNote(p.fdNote));
 +    errNo = (sock >= 0) ? 0 : errno;
 +    leave_suid();
 +
 +    // cache positive results
 +    if (sock >= 0)
 +        listeners[request.params] = sock;
 +
 +    return sock;
 +}
 +
 +void Ipc::Coordinator::broadcastSignal(int sig) const
 +{
 +    typedef Strands::const_iterator SCI;
 +    for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
 +        debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
 +               ", PID=" << iter->pid);
 +        kill(iter->pid, sig);
 +    }
 +}
 +
 +Ipc::Coordinator* Ipc::Coordinator::Instance()
 +{
 +    if (!TheInstance)
 +        TheInstance = new Coordinator;
 +    // XXX: if the Coordinator job quits, this pointer will become invalid
 +    // we could make Coordinator death fatal, except during exit, but since
 +    // Strands do not re-register, even process death would be pointless.
 +    return TheInstance;
 +}
index ada9cefac4eb1a4ac21fb671be82d9b87e953854,0000000000000000000000000000000000000000..77e2e1c2247a157a9def2d8bbd261d5fc8ed8271
mode 100644,000000..100644
--- /dev/null
@@@ -1,149 -1,0 +1,150 @@@
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +#include "config.h"
 +#include <map>
 +#include "comm.h"
++#include "base/TextException.h"
 +#include "ipc/Port.h"
 +#include "ipc/Messages.h"
 +#include "ipc/Kids.h"
 +#include "ipc/TypedMsgHdr.h"
 +#include "ipc/StartListening.h"
 +#include "ipc/SharedListen.h"
 +
 +
 +/// holds information necessary to handle JoinListen response
 +class PendingOpenRequest
 +{
 +public:
 +    Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
 +    AsyncCall::Pointer callback; // who to notify
 +};
 +
 +/// maps ID assigned at request time to the response callback
 +typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
 +static SharedListenRequestMap TheSharedListenRequestMap;
 +
 +static int
 +AddToMap(const PendingOpenRequest &por)
 +{
 +    // find unused ID using linear seach; there should not be many entries
 +    for (int id = 0; true; ++id) {
 +        if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
 +            TheSharedListenRequestMap[id] = por;
 +            return id;
 +        }
 +    }
 +    assert(false); // not reached
 +    return -1;
 +}
 +
 +Ipc::OpenListenerParams::OpenListenerParams()
 +{
 +    xmemset(this, 0, sizeof(*this));
 +}
 +
 +bool
 +Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const
 +{
 +    if (sock_type != p.sock_type)
 +        return sock_type < p.sock_type;
 +
 +    if (proto != p.proto)
 +        return proto < p.proto;
 +
 +    // ignore flags and fdNote differences because they do not affect binding
 +
 +    return addr.compareWhole(p.addr) < 0;
 +}
 +
 +
 +
 +Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
 +{
 +    // caller will then set public data members
 +}
 +
 +Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
 +{
 +    hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this));
 +}
 +
 +void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
 +{
 +    hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this));
 +}
 +
 +
 +Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
 +        fd(aFd), errNo(anErrNo), mapId(aMapId)
 +{
 +}
 +
 +Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
 +        fd(-1), errNo(0), mapId(-1)
 +{
 +    hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
 +    fd = hdrMsg.getFd();
 +}
 +
 +void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
 +{
 +    hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
 +    hdrMsg.putFd(fd);
 +}
 +
 +
 +void Ipc::JoinSharedListen(const OpenListenerParams &params,
 +                           AsyncCall::Pointer &callback)
 +{
 +    PendingOpenRequest por;
 +    por.params = params;
 +    por.callback = callback;
 +
 +    SharedListenRequest request;
 +    request.requestorId = KidIdentifier;
 +    request.params = por.params;
 +    request.mapId = AddToMap(por);
 +
 +    debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
 +           " mapId=" << request.mapId);
 +
 +    TypedMsgHdr message;
 +    request.pack(message);
 +    SendMessage(coordinatorAddr, message);
 +}
 +
 +void Ipc::SharedListenJoined(const SharedListenResponse &response)
 +{
 +    const int fd = response.fd;
 +
 +    debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
 +           response.errNo << " mapId=" << response.mapId);
 +
 +    Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
 +    PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
 +    Must(por.callback != NULL);
 +    TheSharedListenRequestMap.erase(response.mapId);
 +
 +    if (fd >= 0) {
 +        OpenListenerParams &p = por.params;
 +        struct addrinfo *AI = NULL;
 +        p.addr.GetAddrInfo(AI);
 +        AI->ai_socktype = p.sock_type;
 +        AI->ai_protocol = p.proto;
 +        comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
 +        p.addr.FreeAddrInfo(AI);
 +    }
 +
 +    StartListeningCb *cbd =
 +        dynamic_cast<StartListeningCb*>(por.callback->getDialer());
 +    Must(cbd);
 +    cbd->fd = fd;
 +    cbd->errNo = response.errNo;
 +    ScheduleCallHere(por.callback);
 +}
index 3b77b88a5f0e55f4b940de59c9bd5ac88c4a4dc0,0000000000000000000000000000000000000000..8d93ef43c34cab7541deb433a6e6cd5003bc008e
mode 100644,000000..100644
--- /dev/null
@@@ -1,74 -1,0 +1,74 @@@
-     IpAddress addr; ///< will be memset and memcopied
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +#ifndef SQUID_IPC_SHARED_LISTEN_H
 +#define SQUID_IPC_SHARED_LISTEN_H
 +
 +#include "base/AsyncCall.h"
 +
 +namespace Ipc
 +{
 +
 +/// "shared listen" is when concurrent processes are listening on the same fd
 +
 +/// comm_open_listener() parameters holder
 +class OpenListenerParams
 +{
 +public:
 +    OpenListenerParams();
 +
 +    bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
 +
 +    int sock_type;
 +    int proto;
++    Ip::Address addr; ///< will be memset and memcopied
 +    int flags;
 +    int fdNote; ///< index into fd_note() comment strings
 +};
 +
 +class TypedMsgHdr;
 +
 +/// a request for a listen socket with given parameters
 +class SharedListenRequest
 +{
 +public:
 +    SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
 +    explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
 +    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 +
 +public:
 +    int requestorId; ///< kidId of the requestor
 +
 +    OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
 +
 +    int mapId; ///< to map future response to the requestor's callback
 +};
 +
 +/// a response to SharedListenRequest
 +class SharedListenResponse
 +{
 +public:
 +    SharedListenResponse(int fd, int errNo, int mapId);
 +    explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
 +    void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
 +
 +public:
 +    int fd; ///< opened listening socket or -1
 +    int errNo; ///< errno value from comm_open_sharedListen() call
 +    int mapId; ///< to map future response to the requestor's callback
 +};
 +
 +/// prepare and send SharedListenRequest to Coordinator
 +extern void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &);
 +
 +/// process Coordinator response to SharedListenRequest
 +extern void SharedListenJoined(const SharedListenResponse &response);
 +
 +} // namespace Ipc;
 +
 +
 +#endif /* SQUID_IPC_SHARED_LISTEN_H */
index ff61538cc3f7774875762841116b57a8d5743c1d,0000000000000000000000000000000000000000..77fa86f778e6cdaa1b1cd8a330a2d531c12fee98
mode 100644,000000..100644
--- /dev/null
@@@ -1,58 -1,0 +1,58 @@@
- #include "TextException.h"
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +#include "config.h"
 +#include "comm.h"
- void Ipc::StartListening(int sock_type, int proto, IpAddress &addr,
++#include "base/TextException.h"
 +#include "ipc/SharedListen.h"
 +#include "ipc/StartListening.h"
 +
 +
 +Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
 +{
 +}
 +
 +Ipc::StartListeningCb::~StartListeningCb()
 +{
 +}
 +
 +std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
 +{
 +    return os << "(FD " << fd << ", err=" << errNo;
 +}
 +
 +
++void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
 +                         int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
 +{
 +    OpenListenerParams p;
 +    p.sock_type = sock_type;
 +    p.proto = proto;
 +    p.addr = addr;
 +    p.flags = flags;
 +    p.fdNote = fdNote;
 +
 +    if (UsingSmp()) { // if SMP is on, share
 +        Ipc::JoinSharedListen(p, callback);
 +        return; // wait for the call back
 +    }
 +
 +    enter_suid();
 +    const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
 +                                        FdNote(p.fdNote));
 +    const int errNo = (sock >= 0) ? 0 : errno;
 +    leave_suid();
 +
 +    debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
 +
 +    StartListeningCb *cbd =
 +        dynamic_cast<StartListeningCb*>(callback->getDialer());
 +    Must(cbd);
 +    cbd->fd = sock;
 +    cbd->errNo = errNo;
 +    ScheduleCallHere(callback);
 +}
index f2180d7543eb83923915341ffb56037323bb8b00,0000000000000000000000000000000000000000..1950cc4644b29821f780cfeb8e0bd4f710b65dcf
mode 100644,000000..100644
--- /dev/null
@@@ -1,43 -1,0 +1,42 @@@
- class IpAddress;
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +#ifndef SQUID_IPC_START_LISTENING_H
 +#define SQUID_IPC_START_LISTENING_H
 +
 +#include <iosfwd>
++#include "ip/forward.h"
 +#include "ipc/FdNotes.h"
 +#include "base/AsyncCall.h"
 +
- extern void StartListening(int sock_type, int proto, IpAddress &addr,
 +namespace Ipc
 +{
 +
 +/// common API for all StartListening() callbacks
 +class StartListeningCb
 +{
 +public:
 +    StartListeningCb();
 +    virtual ~StartListeningCb();
 +
 +    /// starts printing arguments, return os
 +    std::ostream &startPrint(std::ostream &os) const;
 +
 +public:
 +    int fd; ///< opened listening socket or -1
 +    int errNo; ///< errno value from the comm_open_listener() call
 +};
 +
 +/// Depending on whether SMP is on, either ask Coordinator to send us
 +/// the listening FD or call comm_open_listener() directly.
++extern void StartListening(int sock_type, int proto, Ip::Address &addr,
 +                           int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
 +
 +} // namespace Ipc;
 +
 +
 +#endif /* SQUID_IPC_START_LISTENING_H */
index 622f8c4e6bd8e02e545480101b94024c1c6ec948,0000000000000000000000000000000000000000..384e44794f4b601399ec24a5f06152edcd1e7a7b
mode 100644,000000..100644
--- /dev/null
@@@ -1,77 -1,0 +1,78 @@@
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +#include "config.h"
++#include "base/TextException.h"
 +#include "ipc/Strand.h"
 +#include "ipc/Messages.h"
 +#include "ipc/SharedListen.h"
 +#include "ipc/Kids.h"
 +
 +
 +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
 +
 +
 +Ipc::Strand::Strand():
 +        Port(MakeAddr(strandAddrPfx, KidIdentifier)),
 +        isRegistered(false)
 +{
 +}
 +
 +void Ipc::Strand::start()
 +{
 +    Port::start();
 +    registerSelf();
 +}
 +
 +void Ipc::Strand::registerSelf()
 +{
 +    debugs(54, 6, HERE);
 +    Must(!isRegistered);
 +    TypedMsgHdr message;
 +    StrandCoord(KidIdentifier, getpid()).pack(message);
 +    SendMessage(coordinatorAddr, message);
 +    setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
 +}
 +
 +void Ipc::Strand::receive(const TypedMsgHdr &message)
 +{
 +    debugs(54, 6, HERE << message.type());
 +    switch (message.type()) {
 +
 +    case mtRegistration:
 +        handleRegistrationResponse(StrandCoord(message));
 +        break;
 +
 +    case mtSharedListenResponse:
 +        SharedListenJoined(SharedListenResponse(message));
 +        break;
 +
 +    default:
 +        debugs(54, 1, HERE << "Unhandled message type: " << message.type());
 +        break;
 +    }
 +}
 +
 +void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
 +{
 +    // handle registration response from the coordinator; it could be stale
 +    if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
 +        debugs(54, 6, "kid" << KidIdentifier << " registered");
 +        clearTimeout(); // we are done
 +    } else {
 +        // could be an ACK to the registration message of our dead predecessor
 +        debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
 +        // keep listening, with a timeout
 +    }
 +}
 +
 +void Ipc::Strand::timedout()
 +{
 +    debugs(54, 6, HERE << isRegistered);
 +    if (!isRegistered)
 +        fatalf("kid%d registration timed out", KidIdentifier);
 +}
index 0b12ab48614f0075c89b85e360c80483a5a872ab,0000000000000000000000000000000000000000..622e142dbbbde20cbc6a4a4673f334ea553b36f2
mode 100644,000000..100644
--- /dev/null
@@@ -1,167 -1,0 +1,168 @@@
- #include "TextException.h"
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +
 +#include "config.h"
 +#include <string.h>
++#include "protos.h"
++#include "base/TextException.h"
 +#include "ipc/TypedMsgHdr.h"
 +
 +Ipc::TypedMsgHdr::TypedMsgHdr()
 +{
 +    xmemset(this, 0, sizeof(*this));
 +    sync();
 +}
 +
 +Ipc::TypedMsgHdr::TypedMsgHdr(const TypedMsgHdr &tmh)
 +{
 +    xmemcpy(this, &tmh, sizeof(*this));
 +    sync();
 +}
 +
 +Ipc::TypedMsgHdr &Ipc::TypedMsgHdr::operator =(const TypedMsgHdr &tmh)
 +{
 +    if (this != &tmh) { // skip assignment to self
 +        xmemcpy(this, &tmh, sizeof(*this));
 +        sync();
 +    }
 +    return *this;
 +}
 +
 +// update msghdr and ios pointers based on msghdr counters
 +void Ipc::TypedMsgHdr::sync()
 +{
 +    if (msg_name) { // we have a name
 +        msg_name = &name;
 +    } else {
 +        Must(!msg_namelen && !msg_name);
 +    }
 +
 +    if (msg_iov) { // we have a data component
 +        Must(msg_iovlen == 1);
 +        msg_iov = ios;
 +        ios[0].iov_base = &data;
 +        Must(ios[0].iov_len == sizeof(data));
 +    } else {
 +        Must(!msg_iovlen && !msg_iov);
 +    }
 +
 +    if (msg_control) { // we have a control component
 +        Must(msg_controllen > 0);
 +        msg_control = &ctrl;
 +    } else {
 +        Must(!msg_controllen && !msg_control);
 +    }
 +}
 +
 +
 +
 +int
 +Ipc::TypedMsgHdr::type() const
 +{
 +    Must(msg_iovlen == 1);
 +    return data.type_;
 +}
 +
 +void
 +Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr)
 +{
 +    allocName();
 +    name = addr;
 +    msg_name = &name;
 +    msg_namelen = SUN_LEN(&name);
 +}
 +
 +void
 +Ipc::TypedMsgHdr::getData(int destType, void *raw, size_t size) const
 +{
 +    Must(type() == destType);
 +    Must(size == data.size);
 +    xmemcpy(raw, data.raw, size);
 +}
 +
 +void
 +Ipc::TypedMsgHdr::putData(int aType, const void *raw, size_t size)
 +{
 +    Must(size <= sizeof(data.raw));
 +    allocData();
 +    data.type_ = aType;
 +    data.size = size;
 +    xmemcpy(data.raw, raw, size);
 +}
 +
 +void
 +Ipc::TypedMsgHdr::putFd(int fd)
 +{
 +    Must(fd >= 0);
 +    allocControl();
 +
 +    const int fdCount = 1;
 +
 +    struct cmsghdr *cmsg = CMSG_FIRSTHDR(this);
 +    cmsg->cmsg_level = SOL_SOCKET;
 +    cmsg->cmsg_type = SCM_RIGHTS;
 +    cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fdCount);
 +
 +    int *fdStore = reinterpret_cast<int*>(CMSG_DATA(cmsg));
 +    xmemcpy(fdStore, &fd, fdCount * sizeof(int));
 +    msg_controllen = cmsg->cmsg_len;
 +}
 +
 +int
 +Ipc::TypedMsgHdr::getFd() const
 +{
 +    Must(msg_control && msg_controllen);
 +
 +    struct cmsghdr *cmsg = CMSG_FIRSTHDR(this);
 +    Must(cmsg->cmsg_level == SOL_SOCKET);
 +    Must(cmsg->cmsg_type == SCM_RIGHTS);
 +
 +    const int fdCount = 1;
 +    const int *fdStore = reinterpret_cast<const int*>(CMSG_DATA(cmsg));
 +    int fd = -1;
 +    xmemcpy(&fd, fdStore, fdCount * sizeof(int));
 +    return fd;
 +}
 +
 +void
 +Ipc::TypedMsgHdr::prepForReading()
 +{
 +    xmemset(this, 0, sizeof(*this));
 +    allocName();
 +    allocData();
 +    allocControl();
 +}
 +
 +/// initialize io vector with one io record
 +void
 +Ipc::TypedMsgHdr::allocData()
 +{
 +    Must(!msg_iovlen && !msg_iov);
 +    msg_iovlen = 1;
 +    msg_iov = ios;
 +    ios[0].iov_base = &data;
 +    ios[0].iov_len = sizeof(data);
 +    data.type_ = 0;
 +    data.size = 0;
 +}
 +
 +void
 +Ipc::TypedMsgHdr::allocName()
 +{
 +    Must(!msg_name && !msg_namelen);
 +    msg_name = &name;
 +    msg_namelen = sizeof(name); // is that the right size?
 +}
 +
 +void
 +Ipc::TypedMsgHdr::allocControl()
 +{
 +    Must(!msg_control && !msg_controllen);
 +    msg_control = &ctrl;
 +    msg_controllen = sizeof(ctrl);
 +}
index 741a33e0e4ae9c30d45ef9695279ba067aa81c76,0000000000000000000000000000000000000000..f1d05e38527fed36db84ca24d318d1e4510bd2b4
mode 100644,000000..100644
--- /dev/null
@@@ -1,131 -1,0 +1,132 @@@
 +/*
 + * $Id$
 + *
 + * DEBUG: section 54    Interprocess Communication
 + *
 + */
 +
 +
 +#include "config.h"
 +#include "comm.h"
 +#include "CommCalls.h"
++#include "base/TextException.h"
 +#include "ipc/UdsOp.h"
 +
 +
 +Ipc::UdsOp::UdsOp(const String& pathAddr):
 +        AsyncJob("Ipc::UdsOp"),
 +        address(PathToAddress(pathAddr)),
 +        options(COMM_NONBLOCKING),
 +        fd_(-1)
 +{
 +    debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
 +}
 +
 +Ipc::UdsOp::~UdsOp()
 +{
 +    debugs(54, 5, HERE << '[' << this << ']');
 +    if (fd_ >= 0)
 +        comm_close(fd_);
 +}
 +
 +void Ipc::UdsOp::setOptions(int newOptions)
 +{
 +    options = newOptions;
 +}
 +
 +int Ipc::UdsOp::fd()
 +{
 +    if (fd_ < 0) {
 +        if (options & COMM_DOBIND)
 +            unlink(address.sun_path);
 +        fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
 +        Must(fd_ >= 0);
 +    }
 +    return fd_;
 +}
 +
 +void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
 +{
 +    AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
 +                                           CommCbMemFunT<UdsOp, CommTimeoutCbParams>(this,
 +                                                   &UdsOp::noteTimeout));
 +    commSetTimeout(fd(), seconds, handler);
 +}
 +
 +void Ipc::UdsOp::clearTimeout()
 +{
 +    commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
 +}
 +
 +void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
 +{
 +    timedout(); // our kid handles communication timeout
 +}
 +
 +
 +struct sockaddr_un
 +Ipc::PathToAddress(const String& pathAddr) {
 +    assert(pathAddr.size() != 0);
 +    struct sockaddr_un unixAddr;
 +    memset(&unixAddr, 0, sizeof(unixAddr));
 +    unixAddr.sun_family = AF_LOCAL;
 +    xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
 +    return unixAddr;
 +}
 +
 +
 +CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
 +
 +Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
 +        UdsOp(pathAddr),
 +        message(aMessage),
 +        retries(10), // TODO: make configurable?
 +        timeout(10), // TODO: make configurable?
 +        writing(false)
 +{
 +    message.address(address);
 +}
 +
 +void Ipc::UdsSender::start()
 +{
 +    UdsOp::start();
 +    write();
 +    if (timeout > 0)
 +        setTimeout(timeout, "Ipc::UdsSender::noteTimeout");
 +}
 +
 +bool Ipc::UdsSender::doneAll() const
 +{
 +    return !writing && UdsOp::doneAll();
 +}
 +
 +void Ipc::UdsSender::write()
 +{
 +    debugs(54, 5, HERE);
 +    AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote",
 +                                      CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::wrote));
 +    comm_write(fd(), message.raw(), message.size(), writeHandler);
 +    writing = true;
 +}
 +
 +void Ipc::UdsSender::wrote(const CommIoCbParams& params)
 +{
 +    debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
 +    writing = false;
 +    if (params.flag != COMM_OK && retries-- > 0) {
 +        sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
 +        write(); // XXX: should we close on error so that fd() reopens?
 +    }
 +}
 +
 +void Ipc::UdsSender::timedout()
 +{
 +    debugs(54, 5, HERE);
 +    mustStop("timedout");
 +}
 +
 +
 +void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
 +{
 +    AsyncJob::AsyncStart(new UdsSender(toAddress, message));
 +}
diff --cc src/main.cc
index 362a5a6f46a425de3586e3856e08f4e67381774c,b251d44af37bc594f60b123a3ed31afd701c3a7b..9e480fec4eeafa7446001c13d8c0d68e954e7c3a
@@@ -599,7 -590,7 +604,7 @@@ shut_down(int sig
  
  #endif
  #ifndef _SQUID_MSWIN_
--#ifdef KILL_PARENT_OPT
++#if KILL_PARENT_OPT
  
      if (getppid() > 1) {
          debugs(1, 1, "Killing master process, pid " << getppid());
  static void
  serverConnectionsOpen(void)
  {
 -    clientOpenListenSockets();
 -    icpConnectionsOpen();
 -#if USE_HTCP
 +    if (IamPrimaryProcess()) {
 +#if USE_WCCP
  
 -    htcpInit();
 +        wccpConnectionOpen();
  #endif
 -#if SQUID_SNMP
  
 -    snmpConnectionOpen();
 -#endif
 -#if USE_WCCP
 +#if USE_WCCPv2
  
 -    wccpConnectionOpen();
 +        wccp2ConnectionOpen();
  #endif
 +    }
 +    // Coordinator does not start proxying services
 +    if (!IamCoordinatorProcess()) {
 +        clientOpenListenSockets();
 +        icpConnectionsOpen();
 +#if USE_HTCP
  
 -#if USE_WCCPv2
 +        htcpInit();
 +#endif
- #ifdef SQUID_SNMP
++#if SQUID_SNMP
  
 -    wccp2ConnectionOpen();
 +        snmpConnectionOpen();
  #endif
  
 -    clientdbInit();
 -    icmpEngine.Open();
 -    netdbInit();
 -    asnInit();
 -    ACL::Initialize();
 -    peerSelectInit();
 -
 -    carpInit();
 -    peerUserHashInit();
 -    peerSourceHashInit();
 +        clientdbInit();
 +        icmpEngine.Open();
 +        netdbInit();
 +        asnInit();
 +        ACL::Initialize();
 +        peerSelectInit();
 +
 +        carpInit();
 +        peerUserHashInit();
 +        peerSourceHashInit();
 +    }
  }
  
  static void
  serverConnectionsClose(void)
  {
      assert(shutting_down || reconfiguring);
 -    clientHttpConnectionsClose();
 -    icpConnectionShutdown();
 -#if USE_HTCP
  
 -    htcpSocketShutdown();
 -#endif
 +    if (IamPrimaryProcess()) {
 +#if USE_WCCP
  
 -    icmpEngine.Close();
 -#if SQUID_SNMP
 +        wccpConnectionClose();
 +#endif
 +#if USE_WCCPv2
  
 -    snmpConnectionShutdown();
 +        wccp2ConnectionClose();
  #endif
 -#if USE_WCCP
 +    }
 +    if (!IamCoordinatorProcess()) {
 +        clientHttpConnectionsClose();
 +        icpConnectionShutdown();
 +#if USE_HTCP
  
 -    wccpConnectionClose();
 +        htcpSocketShutdown();
  #endif
 -#if USE_WCCPv2
  
 -    wccp2ConnectionClose();
 +        icmpEngine.Close();
- #ifdef SQUID_SNMP
++#if SQUID_SNMP
 +
 +        snmpConnectionShutdown();
  #endif
  
 -    asnFreeMemory();
 +        asnFreeMemory();
 +    }
  }
  
  static void
@@@ -782,13 -773,11 +804,13 @@@ mainReconfigureFinish(void *
  #endif
  
      redirectInit();
-     authenticateInit(&Config.authConfiguration);
+     authenticateInit(&Auth::TheConfig);
      externalAclInit();
 +
 +    if (IamPrimaryProcess()) {
  #if USE_WCCP
  
 -    wccpInit();
 +        wccpInit();
  #endif
  #if USE_WCCPv2
  
diff --cc src/protos.h
Simple merge
index 287fbef1b5ecd147e4972a486bc5570685352a47,02caad56910e9719e9b238f070da758d0189b678..74dce91e7eecb08db832b4f246ebda3aae6ffc02
   *
   */
  #include "squid.h"
- #include "comm.h"
- #include "cache_snmp.h"
  #include "acl/FilledChecklist.h"
- #include "ip/IpAddress.h"
+ #include "cache_snmp.h"
+ #include "comm.h"
 +#include "ipc/StartListening.h"
+ #include "compat/strsep.h"
+ #include "ip/Address.h"
  
  #define SNMP_REQUEST_SIZE 4096
  #define MAX_PROTOSTAT 5
  
 +/// dials snmpConnectionOpened call
 +class SnmpListeningStartedDialer: public CallDialer,
 +        public Ipc::StartListeningCb
 +{
 +public:
 +    typedef void (*Handler)(int fd, int errNo);
 +    SnmpListeningStartedDialer(Handler aHandler): handler(aHandler) {}
 +
 +    virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
 +
 +    virtual bool canDial(AsyncCall &) const { return true; }
 +    virtual void dial(AsyncCall &) { (handler)(fd, errNo); }
 +
 +public:
 +    Handler handler;
 +};
 +
 +
- IpAddress theOutSNMPAddr;
+ Ip::Address theOutSNMPAddr;
  
  typedef struct _mib_tree_entry mib_tree_entry;
  typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn);
diff --cc src/structs.h
index 09806d00497d4d1148dc0dc58f23e7e4d8376a4a,aac9ffcfe7b75f59831bb7fc5d977a779d46fb82..819d63029baa19796378ab6bccbc294ebb95bde7
@@@ -617,7 -610,7 +610,8 @@@ struct SquidConfig 
  
      char *accept_filter;
      int umask;
+     int max_filedescriptors;
 +    int workers;
  
  #if USE_LOADABLE_MODULES
      wordlist *loadable_module_names;
diff --cc src/tools.cc
index 50768dbe5228d80ba4f38c051c893bd9a4331383,c8b8e4d2e0888b5c7e9fb9a665899a48839c366d..fe02b68cfe8c5c8b93e752ff4621346707ccae6d
   */
  
  #include "squid.h"
- #include "ProtoPort.h"
- #include "SwapDir.h"
+ #include "compat/initgroups.h"
+ #include "compat/getaddrinfo.h"
+ #include "compat/getnameinfo.h"
+ #include "compat/tempnam.h"
  #include "fde.h"
+ #include "ip/Intercept.h"
  #include "MemBuf.h"
- #include "wordlist.h"
+ #include "ProtoPort.h"
  #include "SquidMath.h"
  #include "SquidTime.h"
- #include "ip/IpIntercept.h"
 +#include "ipc/Kids.h"
 +#include "ipc/Coordinator.h"
+ #include "SwapDir.h"
+ #include "wordlist.h"
  
  #if HAVE_SYS_PRCTL_H
  #include <sys/prctl.h>
@@@ -416,10 -408,8 +420,10 @@@ sigusr2_handle(int sig
      static int state = 0;
      /* no debugs() here; bad things happen if the signal is delivered during _db_print() */
  
 +    DebugSignal = sig;
 +
      if (state == 0) {
- #ifndef MEM_GEN_TRACE
+ #if !MEM_GEN_TRACE
          Debug::parseOptions("ALL,7");
  #else