From: root Date: Fri, 16 Apr 2010 11:06:18 +0000 (+0700) Subject: IPC patch X-Git-Tag: SQUID_3_2_0_1~93^2~32^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=10cefb7b033e060e72da03da435430ae3f707cf2;p=thirdparty%2Fsquid.git IPC patch --- diff --git a/src/comm.cc b/src/comm.cc index 1e8ca109ad..32f4a11f4b 100644 --- a/src/comm.cc +++ b/src/comm.cc @@ -1883,7 +1883,11 @@ commHandleWrite(int fd, void *data) (long int) state->offset << ", sz " << (long int) state->size << "."); nleft = state->size - state->offset; - len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); + // XXX: if we use UDS and it is not connected then 'sendto' will be called. + if (!fd_table[fd].flags.called_connect && fd_table[fd].unix_addr.sun_family == AF_LOCAL) + len = sendto(fd, state->buf + state->offset, nleft, 0, (sockaddr*)&fd_table[fd].unix_addr, SUN_LEN(&fd_table[fd].unix_addr)); + else + len = FD_WRITE_METHOD(fd, state->buf + state->offset, nleft); debugs(5, 5, "commHandleWrite: write() returns " << len); fd_bytes(fd, len, FD_WRITE); statCounter.syscalls.sock.writes++; @@ -2409,3 +2413,99 @@ CommSelectEngine::checkEvents(int timeout) return EVENT_ERROR; }; } + +/** + * Create a unix-domain socket + */ +int +comm_open_uds(int sock_type, + int proto, + struct sockaddr_un* addr, + int flags) +{ + int new_socket; + struct addrinfo AI; + + PROF_start(comm_open); + + /* Create socket for accepting new connections. */ + statCounter.syscalls.sock.sockets++; + + /* Setup the socket addrinfo details for use */ + AI.ai_flags = 0; + AI.ai_family = PF_UNIX; + AI.ai_socktype = sock_type; + AI.ai_protocol = proto; + AI.ai_addrlen = SUN_LEN(addr); + AI.ai_addr = (sockaddr*)addr; + AI.ai_canonname = NULL; + AI.ai_next = NULL; + + debugs(50, 3, "comm_openex: Attempt open socket for: " << addr->sun_path); + + if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) { + /* Increase the number of reserved fd's if calls to socket() + * are failing because the open file table is full. This + * limits the number of simultaneous clients */ + + if (limitError(errno)) { + debugs(50, DBG_IMPORTANT, "comm_open: socket failure: " << xstrerror()); + fdAdjustReserved(); + } else { + debugs(50, DBG_CRITICAL, "comm_open: socket failure: " << xstrerror()); + } + + PROF_stop(comm_open); + return -1; + } + + debugs(50, 3, "comm_openex: Opened socket FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol ); + + /* update fdstat */ + debugs(50, 5, "comm_open: FD " << new_socket << " is a new socket"); + + assert(!isOpen(new_socket)); + + fd_open(new_socket, FD_SOCKET, NULL); + fd_table[new_socket].sock_family = AI.ai_family; + + fd_table[new_socket].unix_addr = *addr; + + fdd_table[new_socket].close_file = NULL; + fdd_table[new_socket].close_line = 0; + + if (!(flags & COMM_NOCLOEXEC)) + commSetCloseOnExec(new_socket); + + if (flags & COMM_REUSEADDR) + commSetReuseAddr(new_socket); + + if (flags & COMM_NONBLOCKING) { + if (commSetNonBlocking(new_socket) != COMM_OK) { + comm_close(new_socket); + PROF_stop(comm_open); + return -1; + } + } + + if (flags & COMM_DOBIND) { + if (commBind(new_socket, AI) != COMM_OK) { + comm_close(new_socket); + PROF_stop(comm_open); + return -1; + } + } + +#ifdef TCP_NODELAY + if (sock_type == SOCK_STREAM) + commSetTcpNoDelay(new_socket); + +#endif + + if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM) + commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz); + + PROF_stop(comm_open); + + return new_socket; +} diff --git a/src/comm.h b/src/comm.h index e9c13cc9a5..4e289d98a5 100644 --- a/src/comm.h +++ b/src/comm.h @@ -55,6 +55,7 @@ SQUIDCEXTERN void comm_init(void); SQUIDCEXTERN void comm_exit(void); SQUIDCEXTERN int comm_open(int, int, IpAddress &, int, const char *note); +SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags); /** * Open a port specially bound for listening or sending through a specific port. diff --git a/src/fde.h b/src/fde.h index a0b903d1a3..68a6db6cc5 100644 --- a/src/fde.h +++ b/src/fde.h @@ -60,6 +60,7 @@ public: int sock_family; char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */ char desc[FD_DESC_SZ]; + struct sockaddr_un unix_addr; // store address of unix domain socket when it's needed struct { unsigned int open:1; diff --git a/src/ipc/Coordinator.cc b/src/ipc/Coordinator.cc new file mode 100644 index 0000000000..3a11401fce --- /dev/null +++ b/src/ipc/Coordinator.cc @@ -0,0 +1,63 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + +#include "config.h" +#include "ipc/Coordinator.h" + + +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator); + + +Ipc::Coordinator::Coordinator(): + Port(coordinatorPathAddr) +{ +} + +void Ipc::Coordinator::start() +{ + listen(); +} + +Ipc::StrandData* Ipc::Coordinator::findStrand(int kidId) +{ + for (Vector::iterator iter = strands.begin(); iter != strands.end(); ++iter) { + if (iter->kidId == kidId) + return &(*iter); + } + return NULL; +} + +void Ipc::Coordinator::enrollStrand(const StrandData& strand) +{ + if (StrandData* found = findStrand(strand.kidId)) + *found = strand; + else + strands.push_back(strand); +} + +void Ipc::Coordinator::handleRead(const Message& message) +{ + switch (message.type()) { + case mtRegister: + debugs(54, 6, HERE << "Registration request"); + handleRegistrationRequest(message.strand()); + break; + + default: + debugs(54, 6, HERE << "Unhandled message of type: " << message.type()); + break; + } +} + +void Ipc::Coordinator::handleRegistrationRequest(const StrandData& strand) +{ + // register strand + enrollStrand(strand); + // send back received message + SendMessage(makeAddr(strandPathAddr, strand.kidId), Message(mtRegister, strand.kidId, strand.pid)); +} diff --git a/src/ipc/Coordinator.h b/src/ipc/Coordinator.h new file mode 100644 index 0000000000..a3006fe907 --- /dev/null +++ b/src/ipc/Coordinator.h @@ -0,0 +1,52 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_COORDINATOR_H +#define SQUID_IPC_COORDINATOR_H + + +#include "Array.h" +#include "SquidString.h" +#include "ipc/Port.h" + + +namespace Ipc +{ + + +/** + Coordinator processes incoming queries about registration of running squid instances + and store it KidIndentiers. +*/ +class Coordinator: public Port, public RefCountable +{ +private: + Coordinator(const Coordinator&); // not implemented + Coordinator& operator =(const Coordinator&); // not implemented + +public: + Coordinator(); + +public: + virtual void start(); + +private: + virtual void handleRead(const Message& message); + StrandData* findStrand(int kidId); + void enrollStrand(const StrandData& strand); + void handleRegistrationRequest(const StrandData& strand); + +private: + Vector strands; + + CBDATA_CLASS2(Coordinator); +}; + + +} + +#endif /* SQUID_IPC_COORDINATOR_H */ diff --git a/src/ipc/Kids.cc b/src/ipc/Kids.cc index cf97689b45..98932b8aac 100644 --- a/src/ipc/Kids.cc +++ b/src/ipc/Kids.cc @@ -26,11 +26,16 @@ void Kids::init(size_t n) storage.reserve(n); + char kid_name[32]; for (size_t i = 1; i <= n; ++i) { - char kid_name[32]; snprintf(kid_name, sizeof(kid_name), "(squid-%d)", (int)i); storage.push_back(Kid(kid_name)); } + + if (n > 1) { + snprintf(kid_name, sizeof(kid_name), "(squid-coord-%d)", (int)(n + 1)); + storage.push_back(Kid(kid_name)); + } } /// returns kid by pid diff --git a/src/ipc/Makefile.am b/src/ipc/Makefile.am index c2bcb17ad1..6e18342cfc 100644 --- a/src/ipc/Makefile.am +++ b/src/ipc/Makefile.am @@ -7,4 +7,14 @@ libipc_la_SOURCES = \ Kid.cc \ Kid.h \ Kids.cc \ - Kids.h + Kids.h \ + Coordinator.cc \ + Coordinator.h \ + UdsOp.cc \ + UdsOp.h \ + Port.cc \ + Port.h \ + Strand.cc \ + Strand.h + +DEFS += -DDEFAULT_PREFIX=\"$(prefix)\" diff --git a/src/ipc/Port.cc b/src/ipc/Port.cc new file mode 100644 index 0000000000..1dcd7ff654 --- /dev/null +++ b/src/ipc/Port.cc @@ -0,0 +1,51 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + +#include "config.h" +#include "ipc/Port.h" + + +const char Ipc::coordinatorPathAddr[] = DEFAULT_PREFIX "/ipc/coordinator"; +const char Ipc::strandPathAddr[] = DEFAULT_PREFIX "/ipc/squid"; + + +Ipc::Port::Port(const String& aListenAddr): + UdsOp(aListenAddr), + listenAddr(aListenAddr) +{ + assert(listenAddr.size() > sizeof(DEFAULT_PREFIX)); +} + +void Ipc::Port::listen() +{ + debugs(54, 6, HERE); + AsyncCall::Pointer readHandler = asyncCall(54, 6, "Ipc::Port::noteRead", + CommCbMemFunT(this, &Port::noteRead)); + comm_read(fd(), message.rawData(), message.size(), readHandler); +} + +String Ipc::Port::makeAddr(const char* pathAddr, int id) const +{ + assert(id >= 0); + String addr = pathAddr; + addr.append('-'); + addr.append(xitoa(id)); + return addr; +} + +void Ipc::Port::noteRead(const CommIoCbParams& params) +{ + debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); + assert(params.data == this); + if (params.flag == COMM_OK) { + assert(params.buf == (char*)&message); + assert(params.size == sizeof(Message)); + handleRead(message); + } + listen(); +} diff --git a/src/ipc/Port.h b/src/ipc/Port.h new file mode 100644 index 0000000000..00332d33d1 --- /dev/null +++ b/src/ipc/Port.h @@ -0,0 +1,53 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_PORT_H +#define SQUID_IPC_PORT_H + + +#include "SquidString.h" +#include "ipc/UdsOp.h" + + +namespace Ipc +{ + + +/** + Base class implements functionality of local endpoint + is listening incoming connections +*/ +class Port: public UdsOp +{ +public: + Port(const String& aListenAddr); + +public: + /// start listening of incoming connections + void listen(); + String makeAddr(const char* pathAddr, int id) const; + +protected: + virtual void handleRead(const Message& message) = 0; + +private: + void noteRead(const CommIoCbParams& params); + +private: + String listenAddr; + Message message; +}; + + +extern const char coordinatorPathAddr[]; +extern const char strandPathAddr[]; + + +} + + +#endif /* SQUID_IPC_PORT_H */ diff --git a/src/ipc/Strand.cc b/src/ipc/Strand.cc new file mode 100644 index 0000000000..510a219561 --- /dev/null +++ b/src/ipc/Strand.cc @@ -0,0 +1,84 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + +#include "config.h" +#include "ipc/Strand.h" +#include "ipc/Kids.h" + + +CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand); + + +Ipc::Strand::Strand(): + Port(makeAddr(strandPathAddr, KidIdentifier)), + isRegistered(false) +{ +} + +void Ipc::Strand::start() +{ + listen(); + setListenTimeout(&Strand::noteRegistrationTimeout, 6); + enroll(); +} + +void Ipc::Strand::enroll() +{ + debugs(54, 6, HERE); + assert(!registered()); + SendMessage(coordinatorPathAddr, Message(mtRegister, KidIdentifier, getpid())); +} + +void Ipc::Strand::handleRead(const Message& message) +{ + debugs(54, 6, HERE); + switch (message.type()) { + + case mtRegister: + handleRegistrationResponse(message.strand()); + break; + + default: + debugs(54, 6, HERE << "Unhandled message of type: " << message.type()); + break; + } +} + +void Ipc::Strand::handleRegistrationResponse(const StrandData& strand) +{ + // handle registration respond from coordinator + // coordinator returns the same message + isRegistered = (strand.kidId == KidIdentifier && strand.pid == getpid()); + debugs(54, 6, "Kid " << KidIdentifier << " is " << (char*)(isRegistered ? "" : "NOT ") << "registered"); + setListenTimeout(NULL, -1); +} + +void Ipc::Strand::setListenTimeout(TimeoutHandler timeoutHandler, int timeout) +{ + AsyncCall::Pointer listenTimeoutHandler = NULL; + if (timeout > 0) { + assert(timeoutHandler != NULL); + listenTimeoutHandler = asyncCall(54, 6, "Ipc::Strand::timeoutHandler", + CommCbMemFunT(this, timeoutHandler)); + } + setTimeout(listenTimeoutHandler, timeout); +} + +void Ipc::Strand::noteRegistrationTimeout(const CommTimeoutCbParams& params) +{ + debugs(54, 6, HERE); + if (!registered()) { + debugs(54, 6, HERE << "Kid " << KidIdentifier << " is not registered"); + exit(1); + } +} + +bool Ipc::Strand::registered() const +{ + return isRegistered; +} diff --git a/src/ipc/Strand.h b/src/ipc/Strand.h new file mode 100644 index 0000000000..785e1451fc --- /dev/null +++ b/src/ipc/Strand.h @@ -0,0 +1,58 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_STRAND_H +#define SQUID_IPC_STRAND_H + + +#include "SquidString.h" +#include "ipc/Port.h" + + +namespace Ipc +{ + + +/** + Strand implement functionality of Coordinator's client and + send registration query to Coordinator with it KidIdentifier. +*/ +class Strand: public Port, public RefCountable +{ +private: + typedef void (Strand::*TimeoutHandler)(const CommTimeoutCbParams&); + +private: + Strand(const Strand&); // not implemented + Strand& operator =(const Strand&); // not implemented + +public: + Strand(); + +public: + virtual void start(); + /// send register query + void enroll(); + bool registered() const; + +private: + virtual void handleRead(const Message& message); + void handleRegistrationResponse(const StrandData& strand); + void setListenTimeout(TimeoutHandler timeoutHandler, int timeout); + void noteRegistrationTimeout(const CommTimeoutCbParams& params); + +private: + bool isRegistered; + + CBDATA_CLASS2(Strand); +}; + + +} + + +#endif /* SQUID_IPC_STRAND_H */ diff --git a/src/ipc/UdsOp.cc b/src/ipc/UdsOp.cc new file mode 100644 index 0000000000..0fdcd4eecc --- /dev/null +++ b/src/ipc/UdsOp.cc @@ -0,0 +1,156 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + + +#include "config.h" +#include "comm.h" +#include "ipc/UdsOp.h" + +#define SEND_RETRIES 4 +#define SEND_TIMEOUT 4 + +Ipc::Message::Message(): + data() +{ +} + +Ipc::Message::Message(MessageType messageType, int kidId, pid_t pid) +{ + data.messageType = messageType; + data.strand.kidId = kidId; + data.strand.pid = pid; +} + +Ipc::MessageType Ipc::Message::type() const +{ + return data.messageType; +} + +const Ipc::StrandData& Ipc::Message::strand() const +{ + return data.strand; +} + +char* Ipc::Message::rawData() +{ + return (char*)&data; +} + +size_t Ipc::Message::size() +{ + return sizeof(data); +} + + +Ipc::UdsOp::UdsOp(const String& pathAddr, bool bind /* = true */): + AsyncJob("Ipc::UdsOp"), + addr(setAddr(pathAddr)), + options(COMM_NONBLOCKING), + fd_(-1) +{ + debugs(54, 5, HERE << '[' << this << "] pathAddr " << pathAddr); + if (bind) { + unlink(pathAddr.termedBuf()); + options |= COMM_DOBIND; + } +} + +Ipc::UdsOp::~UdsOp() +{ + debugs(54, 5, HERE << '[' << this << ']'); + if (fd_ > 0) + comm_close(fd_); +} + +bool Ipc::UdsOp::doneAll() const +{ + return false; +} + +int Ipc::UdsOp::fd() +{ + if (fd_ < 0) { + fd_ = comm_open_uds(SOCK_DGRAM, 0, &addr, options); + Must(fd_ > 0); + } + return fd_; +} + +struct sockaddr_un Ipc::UdsOp::setAddr(const String& pathAddr) +{ + assert(pathAddr.size() != 0); + struct sockaddr_un unixAddr; + memset(&unixAddr, 0, sizeof(unixAddr)); + unixAddr.sun_family = AF_LOCAL; + xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path)); + return unixAddr; +} + +void Ipc::UdsOp::setTimeout(AsyncCall::Pointer& timeoutHandler, int timeout) +{ + commSetTimeout(fd(), timeout, timeoutHandler); +} + + +CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender); + +Ipc::UdsSender::UdsSender(const String& pathAddr, const Message& aMessage): + UdsOp(pathAddr, false), + message(aMessage), + retries(SEND_RETRIES), + timeout(SEND_TIMEOUT) +{ + assert(retries > 0); + assert(timeout >= 0); +} + +void Ipc::UdsSender::start() +{ + write(); + if (timeout > 0) + { + AsyncCall::Pointer timeoutHandler = asyncCall(54, 5, "Ipc::UdsSender::noteTimeout", + CommCbMemFunT(this, &UdsSender::noteTimeout)); + setTimeout(timeoutHandler, timeout); + } +} + +bool Ipc::UdsSender::retry() +{ + if (retries > 0) + --retries; + return retries != 0; +} + +void Ipc::UdsSender::write() +{ + debugs(54, 5, HERE); + AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::noteWrite", + CommCbMemFunT(this, &UdsSender::noteWrite)); + comm_write(fd(), message.rawData(), message.size(), writeHandler); +} + +void Ipc::UdsSender::noteWrite(const CommIoCbParams& params) +{ + debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']'); + if (params.flag == COMM_OK || !retry()) + mustStop("done"); + else + write(); +} + +void Ipc::UdsSender::noteTimeout(const CommTimeoutCbParams& params) +{ + debugs(54, 5, HERE); + mustStop("done"); +} + + +void Ipc::SendMessage(const String& toAddress, const Message& message) +{ + AsyncJob::AsyncStart(new UdsSender(toAddress, message)); +} diff --git a/src/ipc/UdsOp.h b/src/ipc/UdsOp.h new file mode 100644 index 0000000000..66c4ed4875 --- /dev/null +++ b/src/ipc/UdsOp.h @@ -0,0 +1,119 @@ +/* + * $Id$ + * + * DEBUG: section 54 Interprocess Communication + * + */ + +#ifndef SQUID_IPC_ASYNCUDSOP_H +#define SQUID_IPC_ASYNCUDSOP_H + + +#include "SquidString.h" +#include "CommCalls.h" + + +namespace Ipc +{ + + +typedef enum {mtNone = 0, mtRegister} MessageType; + +/** + This one contains a registration info +*/ +struct StrandData +{ + int kidId; + pid_t pid; +}; + +/** + This class contains data is used by UdsSender/UdsReceiver. +*/ +class Message +{ +public: + Message(); + Message(MessageType messageType, int kidId, pid_t pid); + +public: + MessageType type() const; + const StrandData& strand() const; + char* rawData(); + size_t size(); + +private: + struct { + MessageType messageType; + StrandData strand; + } data; +}; + +/** + UdsOp implements common async UDS operation. +*/ +class UdsOp: public AsyncJob +{ +private: + UdsOp(const UdsOp&); // not implemented + UdsOp& operator= (const UdsOp&); // not implemented + +public: + UdsOp(const String& pathAddr, bool bind = true); + virtual ~UdsOp(); + +protected: + /// return an endpoint for communication, use fd() instead of fd_ + int fd(); + virtual bool doneAll() const; + void setTimeout(AsyncCall::Pointer& timeoutHandler, int aTimeout); + +private: + struct sockaddr_un setAddr(const String& pathAddr); + +private: + struct sockaddr_un addr; + int options; + int fd_; +}; + +/** + Implement async write operation for UDS +*/ +class UdsSender: public UdsOp +{ +private: + UdsSender(const UdsSender&); // not implemented + UdsSender& operator= (const UdsSender&); // not implemented + +public: + UdsSender(const String& pathAddr, const Message& aMessage); + +public: + /// start writing data + virtual void start(); + +private: + /// update retries counter and check + bool retry(); + /// schedule writing + void write(); + void noteWrite(const CommIoCbParams& params); + void noteTimeout(const CommTimeoutCbParams& params); + +private: + Message message; + int retries; + int timeout; + + CBDATA_CLASS2(UdsSender); +}; + + +void SendMessage(const String& toAddress, const Message& message); + + +} + +#endif /* SQUID_IPC_ASYNCUDSOP_H */ diff --git a/src/main.cc b/src/main.cc index 46d56db140..7a07e1df8b 100644 --- a/src/main.cc +++ b/src/main.cc @@ -56,6 +56,8 @@ #include "DiskIO/DiskIOModule.h" #include "comm.h" #include "ipc/Kids.h" +#include "ipc/Coordinator.h" +#include "ipc/Strand.h" #if USE_EPOLL #include "comm_epoll.h" #endif @@ -1396,6 +1398,13 @@ SquidMain(int argc, char **argv) mainLoop.setTimeService(&time_engine); + if (!opt_no_daemon && Config.main_processes > 1) { + if (KidIdentifier == Config.main_processes + 1) + AsyncJob::AsyncStart(new Ipc::Coordinator); + else if (KidIdentifier != 0) + AsyncJob::AsyncStart(new Ipc::Strand); + } + /* at this point we are finished the synchronous startup. */ starting_up = 0; @@ -1606,7 +1615,7 @@ watch_child(char *argv[]) mainStartScript(argv[0]); // start each kid that needs to be [re]started; once - for (size_t i = 0; i < TheKids.count(); ++i) { + for (int i = TheKids.count() - 1; i >= 0; --i) { Kid& kid = TheKids.get(i); if (kid.hopeless() || kid.exitedHappy() || kid.running()) continue;