]> git.ipfire.org Git - thirdparty/squid.git/commitdiff
IPC patch
authorroot <root@ubuntu>
Fri, 16 Apr 2010 11:06:18 +0000 (18:06 +0700)
committerroot <root@ubuntu>
Fri, 16 Apr 2010 11:06:18 +0000 (18:06 +0700)
14 files changed:
src/comm.cc
src/comm.h
src/fde.h
src/ipc/Coordinator.cc [new file with mode: 0644]
src/ipc/Coordinator.h [new file with mode: 0644]
src/ipc/Kids.cc
src/ipc/Makefile.am
src/ipc/Port.cc [new file with mode: 0644]
src/ipc/Port.h [new file with mode: 0644]
src/ipc/Strand.cc [new file with mode: 0644]
src/ipc/Strand.h [new file with mode: 0644]
src/ipc/UdsOp.cc [new file with mode: 0644]
src/ipc/UdsOp.h [new file with mode: 0644]
src/main.cc

index 1e8ca109adf359f8d23b189438255d75b42ab827..32f4a11f4b8725cc9585b1621d1d57dcd5957008 100644 (file)
@@ -1883,7 +1883,11 @@ commHandleWrite(int fd, void *data)
            (long int) state->offset << ", sz " << (long int) state->size << ".");
 
     nleft = state->size - state->offset;
-    len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
+    // XXX: if we use UDS and it is not connected then 'sendto' will be called.
+    if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL)
+        len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr));
+    else
+        len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft);
     debugs(5, 5, "commHandleWrite: write() returns " << len);
     fd_bytes(fd, len, FD_WRITE);
     statCounter.syscalls.sock.writes++;
@@ -2409,3 +2413,99 @@ CommSelectEngine::checkEvents(int timeout)
         return EVENT_ERROR;
     };
 }
+
+/**
+ * Create a unix-domain socket
+ */
+int
+comm_open_uds(int sock_type,
+              int proto,
+              struct sockaddr_un* addr,
+              int flags)
+{
+    int new_socket;
+    struct addrinfo AI;
+
+    PROF_start(comm_open);
+
+    /* Create socket for accepting new connections. */
+    statCounter.syscalls.sock.sockets++;
+
+    /* Setup the socket addrinfo details for use */
+    AI.ai_flags = 0;
+    AI.ai_family = PF_UNIX;
+    AI.ai_socktype = sock_type;
+    AI.ai_protocol = proto;
+    AI.ai_addrlen = SUN_LEN(addr);
+    AI.ai_addr = (sockaddr*)addr;
+    AI.ai_canonname = NULL;
+    AI.ai_next = NULL;
+
+    debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path);
+
+    if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+        /* Increase the number of reserved fd's if calls to socket()
+         * are failing because the open file table is full.  This
+         * limits the number of simultaneous clients */
+
+        if (limitError(errno)) {
+            debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror());
+            fdAdjustReserved();
+        } else {
+            debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror());
+        }
+
+        PROF_stop(comm_open);
+        return -1;
+    }
+
+    debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol );
+
+    /* update fdstat */
+    debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket");
+
+    assert(!isOpen(new_socket));
+
+    fd_open(new_socket, FD_SOCKET, NULL);
+    fd_table[new_socket].sock_family = AI.ai_family;
+
+    fd_table[new_socket].unix_addr = *addr;
+
+    fdd_table[new_socket].close_file = NULL;
+    fdd_table[new_socket].close_line = 0;
+
+    if (!(flags & COMM_NOCLOEXEC))
+        commSetCloseOnExec(new_socket);
+
+    if (flags & COMM_REUSEADDR)
+        commSetReuseAddr(new_socket);
+
+    if (flags & COMM_NONBLOCKING) {
+        if (commSetNonBlocking(new_socket) != COMM_OK) {
+            comm_close(new_socket);
+            PROF_stop(comm_open);
+            return -1;
+        }
+    }
+
+    if (flags & COMM_DOBIND) {
+        if (commBind(new_socket, AI) != COMM_OK) {
+            comm_close(new_socket);
+            PROF_stop(comm_open);
+            return -1;
+        }
+    }
+
+#ifdef TCP_NODELAY
+    if (sock_type == SOCK_STREAM)
+        commSetTcpNoDelay(new_socket);
+
+#endif
+
+    if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+        commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+    PROF_stop(comm_open);
+
+    return new_socket;
+}
index e9c13cc9a5ae638e414b2b8ec8177c21cb4f592d..4e289d98a51f37042ff412c7987c386c0d644d27 100644 (file)
@@ -55,6 +55,7 @@ SQUIDCEXTERN void comm_init(void);
 SQUIDCEXTERN void comm_exit(void);
 
 SQUIDCEXTERN int comm_open(int, int, IpAddress &, int, const char *note);
+SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags);
 
 /**
  * Open a port specially bound for listening or sending through a specific port.
index a0b903d1a3d43ebf4c2c9cc1566575757c311ba1..68a6db6cc53ab41d3a57b931f762dcd3c609b547 100644 (file)
--- a/src/fde.h
+++ b/src/fde.h
@@ -60,6 +60,7 @@ public:
     int sock_family;
     char ipaddr[MAX_IPSTRLEN];            /* dotted decimal address of peer */
     char desc[FD_DESC_SZ];
+    struct sockaddr_un unix_addr;         // store address of unix domain socket when it's needed
 
     struct {
         unsigned int open:1;
diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc
new file mode 100644 (file)
index 0000000..3a11401
--- /dev/null
@@ -0,0 +1,63 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Coordinator.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+
+
+Ipc::Coordinator::Coordinator():
+    Port(coordinatorPathAddr)
+{
+}
+
+void Ipc::Coordinator::start()
+{
+    listen();
+}
+
+Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId)
+{
+    for (Vector<StrandData>::iterator iter = strands.begin(); iter != strands.end(); ++iter) {
+        if (iter->kidId == kidId)
+            return &(*iter);
+    }
+    return NULL;
+}
+
+void Ipc::Coordinator::enrollStrand(const StrandData& strand)
+{
+    if (StrandData* found = findStrand(strand.kidId))
+        *found = strand;
+    else
+        strands.push_back(strand);
+}
+
+void Ipc::Coordinator::handleRead(const Message& message)
+{
+    switch (message.type()) {
+    case mtRegister:
+        debugs(54, 6, HERE << "Registration request");
+        handleRegistrationRequest(message.strand());
+        break;
+
+    default:
+        debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
+        break;
+    }
+}
+
+void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand)
+{
+    // register strand
+    enrollStrand(strand);
+    // send back received message
+    SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid));
+}
diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h
new file mode 100644 (file)
index 0000000..a3006fe
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_COORDINATOR_H
+#define SQUID_IPC_COORDINATOR_H
+
+
+#include "Array.h"
+#include "SquidString.h"
+#include "ipc/Port.h"
+
+
+namespace Ipc
+{
+
+
+/**
+  Coordinator processes incoming queries about registration of running squid instances
+  and store it KidIndentiers.
+*/
+class Coordinator: public Port, public RefCountable
+{
+private:
+    Coordinator(const Coordinator&); // not implemented
+    Coordinator& operator =(const Coordinator&); // not implemented
+
+public:
+    Coordinator();
+
+public:
+    virtual void start();
+
+private:
+    virtual void handleRead(const Message& message);
+    StrandData* findStrand(int kidId);
+    void enrollStrand(const StrandData& strand);
+    void handleRegistrationRequest(const StrandData& strand);
+
+private:
+    Vector<StrandData> strands;
+
+    CBDATA_CLASS2(Coordinator);
+};
+
+
+}
+
+#endif /* SQUID_IPC_COORDINATOR_H */
index cf97689b4525126c9d12bc2e3303ffa8e9cda2ec..98932b8aaca46f73dd6ad93577d1f2a048c75a96 100644 (file)
@@ -26,11 +26,16 @@ void Kids::init(size_t n)
 
     storage.reserve(n);
 
+    char kid_name[32];
     for (size_t i = 1; i <= n; ++i) {
-        char kid_name[32];
         snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i);
         storage.push_back(Kid(kid_name));
     }
+
+    if (n > 1) {
+        snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1));
+        storage.push_back(Kid(kid_name));
+    }
 }
 
 /// returns kid by pid
index c2bcb17ad1ab73949e0bb32898fe42d8068355af..6e18342cfcf07ea622d6e4196b0e4cd518c51373 100644 (file)
@@ -7,4 +7,14 @@ libipc_la_SOURCES = \
        Kid.cc \
        Kid.h \
        Kids.cc \
-       Kids.h
+       Kids.h \
+       Coordinator.cc \
+       Coordinator.h \
+       UdsOp.cc \
+       UdsOp.h \
+       Port.cc \
+       Port.h \
+       Strand.cc \
+       Strand.h
+
+DEFS += -DDEFAULT_PREFIX=\"$(prefix)\"
diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc
new file mode 100644 (file)
index 0000000..1dcd7ff
--- /dev/null
@@ -0,0 +1,51 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Port.h"
+
+
+const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator";
+const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid";
+
+
+Ipc::Port::Port(const String& aListenAddr):
+    UdsOp(aListenAddr),
+    listenAddr(aListenAddr)
+{
+    assert(listenAddr.size() > sizeof(DEFAULT_PREFIX));
+}
+
+void Ipc::Port::listen()
+{
+    debugs(54, 6, HERE);
+    AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead",
+        CommCbMemFunT<Port, CommIoCbParams>(this, &Port::noteRead));
+    comm_read(fd(), message.rawData(), message.size(), readHandler);
+}
+
+String Ipc::Port::makeAddr(const char* pathAddr, int id) const
+{
+    assert(id >= 0);
+    String addr = pathAddr;
+    addr.append('-');
+    addr.append(xitoa(id));
+    return addr;
+}
+
+void Ipc::Port::noteRead(const CommIoCbParams& params)
+{
+    debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+    assert(params.data == this);
+    if (params.flag == COMM_OK) {
+        assert(params.buf == (char*)&message);
+        assert(params.size == sizeof(Message));
+        handleRead(message);
+    }
+    listen();
+}
diff --git a/src/ipc/Port.h b/src/ipc/Port.h
new file mode 100644 (file)
index 0000000..00332d3
--- /dev/null
@@ -0,0 +1,53 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_PORT_H
+#define SQUID_IPC_PORT_H
+
+
+#include "SquidString.h"
+#include "ipc/UdsOp.h"
+
+
+namespace Ipc
+{
+
+
+/**
+   Base class implements functionality of local endpoint
+   is listening incoming connections
+*/
+class Port: public UdsOp
+{
+public:
+    Port(const String& aListenAddr);
+
+public:
+    /// start listening of incoming connections
+    void listen();
+    String makeAddr(const char* pathAddr, int id) const;
+
+protected:
+    virtual void handleRead(const Message& message) = 0;
+
+private:
+    void noteRead(const CommIoCbParams& params);
+
+private:
+    String listenAddr;
+    Message message;
+};
+
+
+extern const char coordinatorPathAddr[];
+extern const char strandPathAddr[];
+
+
+}
+
+
+#endif /* SQUID_IPC_PORT_H */
diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc
new file mode 100644 (file)
index 0000000..510a219
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "ipc/Strand.h"
+#include "ipc/Kids.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
+
+
+Ipc::Strand::Strand():
+    Port(makeAddr(strandPathAddr, KidIdentifier)),
+    isRegistered(false)
+{
+}
+
+void Ipc::Strand::start()
+{
+    listen();
+    setListenTimeout(&Strand::noteRegistrationTimeout, 6);
+    enroll();
+}
+
+void Ipc::Strand::enroll()
+{
+    debugs(54, 6, HERE);
+    assert(!registered());
+    SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid()));
+}
+
+void Ipc::Strand::handleRead(const Message& message)
+{
+    debugs(54, 6, HERE);
+    switch (message.type()) {
+
+    case mtRegister:
+        handleRegistrationResponse(message.strand());
+        break;
+
+    default:
+        debugs(54, 6, HERE << "Unhandled message of type: " << message.type());
+        break;
+    }
+}
+
+void Ipc::Strand::handleRegistrationResponse(const StrandData& strand)
+{
+    // handle registration respond from coordinator
+    // coordinator returns the same message
+    isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid());
+    debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered");
+    setListenTimeout(NULL, -1);
+}
+
+void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout)
+{
+    AsyncCall::Pointer listenTimeoutHandler = NULL;
+    if (timeout > 0) {
+        assert(timeoutHandler != NULL);
+        listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler",
+            CommCbMemFunT<Strand, CommTimeoutCbParams>(this, timeoutHandler));
+    }
+    setTimeout(listenTimeoutHandler, timeout);
+}
+
+void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params)
+{
+    debugs(54, 6, HERE);
+    if (!registered()) {
+        debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered");
+        exit(1);
+    }
+}
+
+bool Ipc::Strand::registered() const
+{
+    return isRegistered;
+}
diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h
new file mode 100644 (file)
index 0000000..785e145
--- /dev/null
@@ -0,0 +1,58 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_STRAND_H
+#define SQUID_IPC_STRAND_H
+
+
+#include "SquidString.h"
+#include "ipc/Port.h"
+
+
+namespace Ipc
+{
+
+
+/**
+  Strand implement functionality of Coordinator's client and
+  send registration query to Coordinator with it KidIdentifier.
+*/
+class Strand: public Port, public RefCountable
+{
+private:
+    typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&);
+
+private:
+    Strand(const Strand&); // not implemented
+    Strand& operator =(const Strand&); // not implemented
+
+public:
+    Strand();
+
+public:
+    virtual void start();
+    /// send register query
+    void enroll();
+    bool registered() const;
+
+private:
+    virtual void handleRead(const Message& message);
+    void handleRegistrationResponse(const StrandData& strand);
+    void setListenTimeout(TimeoutHandler timeoutHandler, int timeout);
+    void noteRegistrationTimeout(const CommTimeoutCbParams& params);
+
+private:
+    bool isRegistered;
+
+    CBDATA_CLASS2(Strand);
+};
+
+
+}
+
+
+#endif /* SQUID_IPC_STRAND_H */
diff --git a/src/ipc/UdsOp.cc b/src/ipc/UdsOp.cc
new file mode 100644 (file)
index 0000000..0fdcd4e
--- /dev/null
@@ -0,0 +1,156 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "ipc/UdsOp.h"
+
+#define SEND_RETRIES 4
+#define SEND_TIMEOUT 4
+
+Ipc::Message::Message():
+    data()
+{
+}
+
+Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid)
+{
+    data.messageType = messageType;
+    data.strand.kidId = kidId;
+    data.strand.pid = pid;
+}
+
+Ipc::MessageType Ipc::Message::type() const
+{
+    return data.messageType;
+}
+
+const Ipc::StrandData& Ipc::Message::strand() const
+{
+    return data.strand;
+}
+
+char* Ipc::Message::rawData()
+{
+    return (char*)&data;
+}
+
+size_t Ipc::Message::size()
+{
+    return sizeof(data);
+}
+
+
+Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */):
+    AsyncJob("Ipc::UdsOp"),
+    addr(setAddr(pathAddr)),
+    options(COMM_NONBLOCKING),
+    fd_(-1)
+{
+    debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr);
+    if (bind) {
+        unlink(pathAddr.termedBuf());
+        options |= COMM_DOBIND;
+    }
+}
+
+Ipc::UdsOp::~UdsOp()
+{
+    debugs(54, 5, HERE << '[' << this << ']');
+    if (fd_ > 0)
+        comm_close(fd_);
+}
+
+bool Ipc::UdsOp::doneAll() const
+{
+    return false;
+}
+
+int Ipc::UdsOp::fd()
+{
+    if (fd_ < 0) {
+        fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options);
+        Must(fd_ > 0);
+    }
+    return fd_;
+}
+
+struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr)
+{
+    assert(pathAddr.size() != 0);
+    struct sockaddr_un unixAddr;
+    memset(&unixAddr, 0, sizeof(unixAddr));
+    unixAddr.sun_family = AF_LOCAL;
+    xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+    return unixAddr;
+}
+
+void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout)
+{
+    commSetTimeout(fd(), timeout, timeoutHandler);
+}
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
+
+Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage):
+    UdsOp(pathAddr, false),
+    message(aMessage),
+    retries(SEND_RETRIES),
+    timeout(SEND_TIMEOUT)
+{
+    assert(retries > 0);
+    assert(timeout >= 0);
+}
+
+void Ipc::UdsSender::start()
+{
+    write();
+    if (timeout > 0)
+    {
+        AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout",
+            CommCbMemFunT<UdsSender, CommTimeoutCbParams>(this, &UdsSender::noteTimeout));
+        setTimeout(timeoutHandler, timeout);
+    }
+}
+
+bool Ipc::UdsSender::retry()
+{
+    if (retries > 0)
+        --retries;
+    return retries != 0;
+}
+
+void Ipc::UdsSender::write()
+{
+    debugs(54, 5, HERE);
+    AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite",
+        CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::noteWrite));
+    comm_write(fd(), message.rawData(), message.size(), writeHandler);
+}
+
+void Ipc::UdsSender::noteWrite(const CommIoCbParams& params)
+{
+    debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+    if (params.flag == COMM_OK || !retry())
+        mustStop("done");
+    else
+        write();
+}
+
+void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params)
+{
+    debugs(54, 5, HERE);
+    mustStop("done");
+}
+
+
+void Ipc::SendMessage(const String& toAddress, const Message& message)
+{
+    AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+}
diff --git a/src/ipc/UdsOp.h b/src/ipc/UdsOp.h
new file mode 100644 (file)
index 0000000..66c4ed4
--- /dev/null
@@ -0,0 +1,119 @@
+/*
+ * $Id$
+ *
+ * DEBUG: section 54    Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_ASYNCUDSOP_H
+#define SQUID_IPC_ASYNCUDSOP_H
+
+
+#include "SquidString.h"
+#include "CommCalls.h"
+
+
+namespace Ipc
+{
+
+
+typedef enum {mtNone = 0, mtRegister} MessageType;
+
+/**
+   This one contains a registration info
+*/
+struct StrandData
+{
+    int   kidId;
+    pid_t pid;
+};
+
+/**
+  This class contains data is used by UdsSender/UdsReceiver.
+*/
+class Message
+{
+public:
+    Message();
+    Message(MessageType messageType, int kidId, pid_t pid);
+
+public:
+    MessageType type() const;
+    const StrandData& strand() const;
+    char*  rawData();
+    size_t size();
+
+private:
+    struct {
+        MessageType messageType;
+        StrandData  strand;
+    } data;
+};
+
+/**
+  UdsOp implements common async UDS operation.
+*/
+class UdsOp: public AsyncJob
+{
+private:
+    UdsOp(const UdsOp&); // not implemented
+    UdsOp& operator= (const UdsOp&); // not implemented
+
+public:
+    UdsOp(const String& pathAddr, bool bind = true);
+    virtual ~UdsOp();
+
+protected:
+    /// return an endpoint for communication, use fd() instead of fd_
+    int fd();
+    virtual bool doneAll() const;
+    void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout);
+
+private:
+    struct sockaddr_un setAddr(const String& pathAddr);
+
+private:
+    struct sockaddr_un addr;
+    int  options;
+    int  fd_;
+};
+
+/**
+  Implement async write operation for UDS
+*/
+class UdsSender: public UdsOp
+{
+private:
+    UdsSender(const UdsSender&); // not implemented
+    UdsSender& operator= (const UdsSender&); // not implemented
+
+public:
+    UdsSender(const String& pathAddr, const Message& aMessage);
+
+public:
+    /// start writing data
+    virtual void start();
+
+private:
+    /// update retries counter and check
+    bool retry();
+    /// schedule writing
+    void write();
+    void noteWrite(const CommIoCbParams& params);
+    void noteTimeout(const CommTimeoutCbParams& params);
+
+private:
+    Message message;
+    int retries;
+    int timeout;
+
+    CBDATA_CLASS2(UdsSender);
+};
+
+
+void SendMessage(const String& toAddress, const Message& message);
+
+
+}
+
+#endif /* SQUID_IPC_ASYNCUDSOP_H */
index 46d56db140753590117be94f6a156d3c2cbdce73..7a07e1df8b04706e960e4b9d4e84d3aafe482319 100644 (file)
@@ -56,6 +56,8 @@
 #include "DiskIO/DiskIOModule.h"
 #include "comm.h"
 #include "ipc/Kids.h"
+#include "ipc/Coordinator.h"
+#include "ipc/Strand.h"
 #if USE_EPOLL
 #include "comm_epoll.h"
 #endif
@@ -1396,6 +1398,13 @@ SquidMain(int argc, char **argv)
 
     mainLoop.setTimeService(&time_engine);
 
+    if (!opt_no_daemon && Config.main_processes > 1) {
+        if (KidIdentifier == Config.main_processes + 1)
+            AsyncJob::AsyncStart(new Ipc::Coordinator);
+        else if (KidIdentifier != 0)
+            AsyncJob::AsyncStart(new Ipc::Strand);
+    }
+
     /* at this point we are finished the synchronous startup. */
     starting_up = 0;
 
@@ -1606,7 +1615,7 @@ watch_child(char *argv[])
         mainStartScript(argv[0]);
 
         // start each kid that needs to be [re]started; once
-        for (size_t i = 0; i < TheKids.count(); ++i) {
+        for (int i = TheKids.count() - 1; i >= 0; --i) {
             Kid& kid = TheKids.get(i);
             if (kid.hopeless() || kid.exitedHappy() || kid.running())
                 continue;