acl/libstate.la \
auth/libauth.la \
acl/libapi.la \
+ base/libbase.la \
+ libsquid.la \
ip/libip.la \
- fs/libfs.la
+ fs/libfs.la \
+ ipc/libipc.la
EXTRA_PROGRAMS = \
DiskIO/DiskDaemon/diskd \
Whether to lookup the EUI or MAC address of a connected client.
DOC_END
+ NAME: max_filedescriptors max_filedesc
+ TYPE: int
+ DEFAULT: 0
+ LOC: Config.max_filedescriptors
+ DOC_START
+ The maximum number of filedescriptors supported.
+
+ The default "0" means Squid inherits the current ulimit setting.
+
+ Note: Changing this requires a restart of Squid. Also
+ not all comm loops supports large values.
+ DOC_END
+
+NAME: workers
+TYPE: int
+LOC: Config.workers
+DEFAULT: 1
+DOC_START
+ Number of main Squid processes or "workers" to fork and maintain.
+ 0: "no daemon" mode, like running "squid -N ..."
+ 1: "no SMP" mode, start one main Squid process daemon (default)
+ N: start N main Squid process daemons (i.e., SMP mode)
+
+ In SMP mode, each worker does nearly all what a single Squid daemon
+ does (e.g., listen on http_port and forward HTTP requests).
+DOC_END
+
EOF
#include "clientStream.h"
#include "comm.h"
#include "comm/ListenStateData.h"
++#include "base/TextException.h"
#include "ConnectionDetail.h"
#include "eui/Config.h"
#include "fde.h"
#include "HttpRequest.h"
#include "ident/Config.h"
#include "ident/Ident.h"
- #include "ip/IpIntercept.h"
+ #include "ip/Intercept.h"
+#include "ipc/StartListening.h"
#include "MemBuf.h"
#include "MemObject.h"
#include "ProtoPort.h"
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
- static void comm_init_opened(int new_socket, IpAddress &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
- static int comm_apply_flags(int new_socket, IpAddress &addr, int flags, struct addrinfo *AI);
++static void comm_init_opened(int new_socket, Ip::Address &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
++static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
struct comm_io_callback_t {
#endif
- IpAddress &addr,
+ comm_init_opened(new_socket, addr, TOS, note, AI);
+ new_socket = comm_apply_flags(new_socket, addr, flags, AI);
+
+ addr.FreeAddrInfo(AI);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}
+
+/// update FD tables after a local or remote (IPC) comm_openex();
+void
+comm_init_opened(int new_socket,
++ Ip::Address &addr,
+ unsigned char TOS,
+ const char *note,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+
+ fde *F = NULL;
+
/* update fdstat */
debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
F->tos = TOS;
F->sock_family = AI->ai_family;
- IpAddress &addr,
+}
+
+/// apply flags after a local comm_open*() call;
+/// returns new_socket or -1 on error
+static int
+comm_apply_flags(int new_socket,
++ Ip::Address &addr,
+ int flags,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+ const int sock_type = AI->ai_socktype;
if (!(flags & COMM_NOCLOEXEC))
commSetCloseOnExec(new_socket);
return new_socket;
}
- IpAddress &addr,
+void
+comm_import_opened(int fd,
++ Ip::Address &addr,
+ int flags,
+ const char *note,
+ struct addrinfo *AI)
+{
+ debugs(5, 2, HERE << " FD " << fd << " at " << addr);
+ assert(fd >= 0);
+ assert(AI);
+
+ comm_init_opened(fd, addr, 0, note, AI);
+
+ if (!(flags & COMM_NOCLOEXEC))
+ fd_table[fd].flags.close_on_exec = 1;
+
+ if (addr.GetPort() > (u_short) 0) {
+#ifdef _SQUID_MSWIN_
+ if (sock_type != SOCK_DGRAM)
+#endif
+ fd_table[fd].flags.nolinger = 1;
+ }
+
+ if ((flags & COMM_TRANSPARENT))
+ fd_table[fd].flags.transparent = 1;
+
+ if (flags & COMM_NONBLOCKING)
+ fd_table[fd].flags.nonblocking = 1;
+
+#ifdef TCP_NODELAY
+ if (AI->ai_socktype == SOCK_STREAM)
+ fd_table[fd].flags.nodelay = 1;
+#endif
+
+ /* no fd_table[fd].flags. updates needed for these conditions:
+ * if ((flags & COMM_REUSEADDR)) ...
+ * if ((flags & COMM_DOBIND) ...) ...
+ */
+}
+
+
CBDATA_CLASS_INIT(ConnectStateData);
void *
return EVENT_ERROR;
};
}
- // TODO: merge with comm_openex() when IpAddress becomes NetAddress
+
+/// Create a unix-domain socket (UDS) that only supports FD_MSGHDR I/O.
+int
+comm_open_uds(int sock_type,
+ int proto,
+ struct sockaddr_un* addr,
+ int flags)
+{
++ // TODO: merge with comm_openex() when Ip::Address becomes NetAddress
+
+ int new_socket;
+
+ PROF_start(comm_open);
+ /* Create socket for accepting new connections. */
+ statCounter.syscalls.sock.sockets++;
+
+ /* Setup the socket addrinfo details for use */
+ struct addrinfo AI;
+ AI.ai_flags = 0;
+ AI.ai_family = PF_UNIX;
+ AI.ai_socktype = sock_type;
+ AI.ai_protocol = proto;
+ AI.ai_addrlen = SUN_LEN(addr);
+ AI.ai_addr = (sockaddr*)addr;
+ AI.ai_canonname = NULL;
+ AI.ai_next = NULL;
+
+ debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
+
+ if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+ /* Increase the number of reserved fd's if calls to socket()
+ * are failing because the open file table is full. This
+ * limits the number of simultaneous clients */
+
+ if (limitError(errno)) {
+ debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
+ fdAdjustReserved();
+ } else {
+ debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
+ }
+
+ PROF_stop(comm_open);
+ return -1;
+ }
+
+ debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
+
+ /* update fdstat */
+ debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
+
+ assert(!isOpen(new_socket));
+ fd_open(new_socket, FD_MSGHDR, NULL);
+
+ fdd_table[new_socket].close_file = NULL;
+
+ fdd_table[new_socket].close_line = 0;
+
+ fd_table[new_socket].sock_family = AI.ai_family;
+
+ if (!(flags & COMM_NOCLOEXEC))
+ commSetCloseOnExec(new_socket);
+
+ if (flags & COMM_REUSEADDR)
+ commSetReuseAddr(new_socket);
+
+ if (flags & COMM_NONBLOCKING) {
+ if (commSetNonBlocking(new_socket) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ if (flags & COMM_DOBIND) {
+ if (commBind(new_socket, AI) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+#ifdef TCP_NODELAY
+ if (sock_type == SOCK_STREAM)
+ commSetTcpNoDelay(new_socket);
+
+#endif
+
+ if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+ commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}
SQUIDCEXTERN void comm_init(void);
SQUIDCEXTERN void comm_exit(void);
- SQUIDCEXTERN int comm_open(int, int, IpAddress &, int, const char *note);
+ SQUIDCEXTERN int comm_open(int, int, Ip::Address &, int, const char *note);
+SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags);
+/// update Comm state after getting a comm_open() FD from another process
- SQUIDCEXTERN void comm_import_opened(int fd, IpAddress &addr, int flags, const char *note, struct addrinfo *AI);
++SQUIDCEXTERN void comm_import_opened(int fd, Ip::Address &addr, int flags, const char *note, struct addrinfo *AI);
/**
* Open a port specially bound for listening or sending through a specific port.
return false;
}
--bool
++void
Comm::ListenStateData::acceptOne()
{
/*
if (newfd == COMM_NOMESSAGE) {
/* register interest again */
-- debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << *theCallback);
++ debugs(5, 5, HERE << "try later: FD " << fd << " handler: " << theCallback);
commSetSelect(fd, COMM_SELECT_READ, doAccept, this, 0);
-- return false;
++ return;
}
// A non-recoverable error; notify the caller */
-- debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << *theCallback);
++ debugs(5, 5, HERE << "non-recoverable error: FD " << fd << " handler: " << theCallback);
notify(-1, COMM_ERROR, errno, connDetails);
-- return false;
++ mayAcceptMore = false;
++ return;
}
debugs(5, 5, HERE << "accepted: FD " << fd <<
" newfd: " << newfd << " from: " << connDetails.peer <<
-- " handler: " << *theCallback);
++ " handler: " << theCallback);
notify(newfd, COMM_OK, 0, connDetails);
-- return true;
}
void
{
assert(isOpen(fd));
debugs(5, 2, HERE << "connection on FD " << fd);
-- mayAcceptMore = acceptOne();
++ acceptOne();
}
void
/// Method callback for whenever an FD is ready to accept a client connection.
static void doAccept(int fd, void *data);
-- bool acceptOne();
++ void acceptOne();
int oldAccept(ConnectionDetail &details);
AsyncCall::Pointer theCallback;
return;
}
- IpAddress incomingAddr = Config.Addrs.udp_incoming;
+ Ip::Address incomingAddr = Config.Addrs.udp_incoming;
incomingAddr.SetPort(Config.Port.htcp);
- enter_suid();
- htcpInSocket = comm_open_listener(SOCK_DGRAM,
- IPPROTO_UDP,
- incomingAddr,
- COMM_NONBLOCKING,
- "HTCP Socket");
- leave_suid();
-
- if (htcpInSocket < 0)
- fatal("Cannot open HTCP Socket");
+ AsyncCall::Pointer call = asyncCall(31, 2,
+ "htcpIncomingConnectionOpened",
+ HtcpListeningStartedDialer(&htcpIncomingConnectionOpened));
- commSetSelect(htcpInSocket, COMM_SELECT_READ, htcpRecv, NULL, 0);
-
- debugs(31, 1, "Accepting HTCP messages on port " << Config.Port.htcp << ", FD " << htcpInSocket << ".");
+ Ipc::StartListening(SOCK_DGRAM,
+ IPPROTO_UDP,
+ incomingAddr,
+ COMM_NONBLOCKING,
+ Ipc::fdnInHtcpSocket, call);
if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
- IpAddress outgoingAddr = Config.Addrs.udp_outgoing;
+ Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;
outgoingAddr.SetPort(Config.Port.htcp);
enter_suid();
#include "SquidTime.h"
#include "SwapDir.h"
#include "icmp/net_db.h"
- #include "ip/IpAddress.h"
+ #include "ip/Address.h"
+#include "ipc/StartListening.h"
#include "rfc1738.h"
- typedef void (*Handler)(int fd, int errNo, IpAddress& addr);
- IcpListeningStartedDialer(Handler aHandler, IpAddress& anAddr):
+/// dials icpIncomingConnectionOpened call
+class IcpListeningStartedDialer: public CallDialer,
+ public Ipc::StartListeningCb
+{
+public:
- IpAddress addr;
++ typedef void (*Handler)(int fd, int errNo, Ip::Address& addr);
++ IcpListeningStartedDialer(Handler aHandler, Ip::Address& anAddr):
+ handler(aHandler), addr(anAddr) {}
+
+ virtual void print(std::ostream &os) const {
+ startPrint(os) <<
+ ", address=" << addr << ')';
+ }
+
+ virtual bool canDial(AsyncCall &) const { return true; }
+ virtual void dial(AsyncCall &) { (handler)(fd, errNo, addr); }
+
+public:
+ Handler handler;
- static void icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr);
++ Ip::Address addr;
+};
+
++static void icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr);
+
/// \ingroup ServerProtocolICPInternal2
- static void icpLogIcp(const IpAddress &, log_type, int, const char *, int);
+ static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
/// \ingroup ServerProtocolICPInternal2
- static void icpHandleIcpV2(int, IpAddress &, char *, int);
+ static void icpHandleIcpV2(int, Ip::Address &, char *, int);
/// \ingroup ServerProtocolICPInternal2
static void icpCount(void *, int, size_t, int);
theOutICPAddr.FreeAddrInfo(xai);
}
- icpIncomingConnectionOpened(int fd, int errNo, IpAddress& addr)
+static void
++icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr)
+{
+ theInIcpConnection = fd;
+
+ if (theInIcpConnection < 0)
+ fatal("Cannot open ICP Port");
+
+ commSetSelect(theInIcpConnection,
+ COMM_SELECT_READ,
+ icpHandleUdp,
+ NULL,
+ 0);
+
+ for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
+ ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
+
+ debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << ".");
+
+ fd_note(theInIcpConnection, "Incoming ICP socket");
+
+ if (Config.Addrs.udp_outgoing.IsNoAddr())
+ theOutIcpConnection = theInIcpConnection;
+}
+
/**
* icpConnectionShutdown only closes the 'in' socket if it is
* different than the 'out' socket.
return 0;
}
- int IpAddress::compareWhole(const IpAddress &rhs) const
++int
++Ip::Address::compareWhole(const Ip::Address &rhs) const
+{
+ return memcmp(this, &rhs, sizeof(*this));
+}
+
- bool IpAddress::operator ==(const IpAddress &s) const
+ bool
+ Ip::Address::operator ==(const Ip::Address &s) const
{
return (0 == matchIPAddr(s));
}
\retval 1 IP rhs is greater (numerically) than that stored.
\retval -1 IP rhs is less (numerically) than that stored.
*/
- int matchIPAddr(const IpAddress &rhs) const;
+ int matchIPAddr(const Address &rhs) const;
- int compareWhole(const IpAddress &rhs) const;
+ /** Compare taking IP, port, protocol, etc. into account. Returns an
+ integer less than, equal to, or greater than zero if the object
+ is found, respectively, to be less than, to match, or to be greater
+ than rhs. The exact ordering algorithm is not specified and may change.
+ */
++ int compareWhole(const Ip::Address &rhs) const;
+
/**
- * Get RFC 3493 addrinfo structure from the IpAddress data
+ * Get RFC 3493 addrinfo structure from the Ip::Address data
* for protocol-neutral socket operations.
* Should be passed a NULL pointer of type struct addrinfo* it will
* allocate memory for the structures involved. (see FreeAddrInfo to clear).
--- /dev/null
- IpAddress addr = p.addr; // comm_open_listener may modify it
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "ipc/Coordinator.h"
+#include "ipc/FdNotes.h"
+#include "ipc/SharedListen.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Coordinator);
+Ipc::Coordinator* Ipc::Coordinator::TheInstance = NULL;
+
+
+Ipc::Coordinator::Coordinator():
+ Port(coordinatorAddr)
+{
+}
+
+void Ipc::Coordinator::start()
+{
+ Port::start();
+}
+
+Ipc::StrandCoord* Ipc::Coordinator::findStrand(int kidId)
+{
+ typedef Strands::iterator SI;
+ for (SI iter = strands.begin(); iter != strands.end(); ++iter) {
+ if (iter->kidId == kidId)
+ return &(*iter);
+ }
+ return NULL;
+}
+
+void Ipc::Coordinator::registerStrand(const StrandCoord& strand)
+{
+ if (StrandCoord* found = findStrand(strand.kidId))
+ *found = strand;
+ else
+ strands.push_back(strand);
+}
+
+void Ipc::Coordinator::receive(const TypedMsgHdr& message)
+{
+ switch (message.type()) {
+ case mtRegistration:
+ debugs(54, 6, HERE << "Registration request");
+ handleRegistrationRequest(StrandCoord(message));
+ break;
+
+ case mtSharedListenRequest:
+ debugs(54, 6, HERE << "Shared listen request");
+ handleSharedListenRequest(SharedListenRequest(message));
+ break;
+
+ default:
+ debugs(54, 1, HERE << "Unhandled message type: " << message.type());
+ break;
+ }
+}
+
+void Ipc::Coordinator::handleRegistrationRequest(const StrandCoord& strand)
+{
+ registerStrand(strand);
+
+ // send back an acknowledgement; TODO: remove as not needed?
+ TypedMsgHdr message;
+ strand.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, strand.kidId), message);
+}
+
+void
+Ipc::Coordinator::handleSharedListenRequest(const SharedListenRequest& request)
+{
+ debugs(54, 4, HERE << "kid" << request.requestorId <<
+ " needs shared listen FD for " << request.params.addr);
+ Listeners::const_iterator i = listeners.find(request.params);
+ int errNo = 0;
+ const int sock = (i != listeners.end()) ?
+ i->second : openListenSocket(request, errNo);
+
+ debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+ request.params.addr << " to kid" << request.requestorId <<
+ " mapId=" << request.mapId);
+
+ SharedListenResponse response(sock, errNo, request.mapId);
+ TypedMsgHdr message;
+ response.pack(message);
+ SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
+}
+
+int
+Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
+ int &errNo)
+{
+ const OpenListenerParams &p = request.params;
+
+ debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
+ request.requestorId);
+
++ Ip::Address addr = p.addr; // comm_open_listener may modify it
+
+ enter_suid();
+ const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
+ FdNote(p.fdNote));
+ errNo = (sock >= 0) ? 0 : errno;
+ leave_suid();
+
+ // cache positive results
+ if (sock >= 0)
+ listeners[request.params] = sock;
+
+ return sock;
+}
+
+void Ipc::Coordinator::broadcastSignal(int sig) const
+{
+ typedef Strands::const_iterator SCI;
+ for (SCI iter = strands.begin(); iter != strands.end(); ++iter) {
+ debugs(54, 5, HERE << "signal " << sig << " to kid" << iter->kidId <<
+ ", PID=" << iter->pid);
+ kill(iter->pid, sig);
+ }
+}
+
+Ipc::Coordinator* Ipc::Coordinator::Instance()
+{
+ if (!TheInstance)
+ TheInstance = new Coordinator;
+ // XXX: if the Coordinator job quits, this pointer will become invalid
+ // we could make Coordinator death fatal, except during exit, but since
+ // Strands do not re-register, even process death would be pointless.
+ return TheInstance;
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include <map>
+#include "comm.h"
++#include "base/TextException.h"
+#include "ipc/Port.h"
+#include "ipc/Messages.h"
+#include "ipc/Kids.h"
+#include "ipc/TypedMsgHdr.h"
+#include "ipc/StartListening.h"
+#include "ipc/SharedListen.h"
+
+
+/// holds information necessary to handle JoinListen response
+class PendingOpenRequest
+{
+public:
+ Ipc::OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+ AsyncCall::Pointer callback; // who to notify
+};
+
+/// maps ID assigned at request time to the response callback
+typedef std::map<int, PendingOpenRequest> SharedListenRequestMap;
+static SharedListenRequestMap TheSharedListenRequestMap;
+
+static int
+AddToMap(const PendingOpenRequest &por)
+{
+ // find unused ID using linear seach; there should not be many entries
+ for (int id = 0; true; ++id) {
+ if (TheSharedListenRequestMap.find(id) == TheSharedListenRequestMap.end()) {
+ TheSharedListenRequestMap[id] = por;
+ return id;
+ }
+ }
+ assert(false); // not reached
+ return -1;
+}
+
+Ipc::OpenListenerParams::OpenListenerParams()
+{
+ xmemset(this, 0, sizeof(*this));
+}
+
+bool
+Ipc::OpenListenerParams::operator <(const OpenListenerParams &p) const
+{
+ if (sock_type != p.sock_type)
+ return sock_type < p.sock_type;
+
+ if (proto != p.proto)
+ return proto < p.proto;
+
+ // ignore flags and fdNote differences because they do not affect binding
+
+ return addr.compareWhole(p.addr) < 0;
+}
+
+
+
+Ipc::SharedListenRequest::SharedListenRequest(): requestorId(-1), mapId(-1)
+{
+ // caller will then set public data members
+}
+
+Ipc::SharedListenRequest::SharedListenRequest(const TypedMsgHdr &hdrMsg)
+{
+ hdrMsg.getData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+void Ipc::SharedListenRequest::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putData(mtSharedListenRequest, this, sizeof(*this));
+}
+
+
+Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
+ fd(aFd), errNo(anErrNo), mapId(aMapId)
+{
+}
+
+Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
+ fd(-1), errNo(0), mapId(-1)
+{
+ hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
+ fd = hdrMsg.getFd();
+}
+
+void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
+{
+ hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
+ hdrMsg.putFd(fd);
+}
+
+
+void Ipc::JoinSharedListen(const OpenListenerParams ¶ms,
+ AsyncCall::Pointer &callback)
+{
+ PendingOpenRequest por;
+ por.params = params;
+ por.callback = callback;
+
+ SharedListenRequest request;
+ request.requestorId = KidIdentifier;
+ request.params = por.params;
+ request.mapId = AddToMap(por);
+
+ debugs(54, 3, HERE << "getting listening FD for " << request.params.addr <<
+ " mapId=" << request.mapId);
+
+ TypedMsgHdr message;
+ request.pack(message);
+ SendMessage(coordinatorAddr, message);
+}
+
+void Ipc::SharedListenJoined(const SharedListenResponse &response)
+{
+ const int fd = response.fd;
+
+ debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+ response.errNo << " mapId=" << response.mapId);
+
+ Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
+ PendingOpenRequest por = TheSharedListenRequestMap[response.mapId];
+ Must(por.callback != NULL);
+ TheSharedListenRequestMap.erase(response.mapId);
+
+ if (fd >= 0) {
+ OpenListenerParams &p = por.params;
+ struct addrinfo *AI = NULL;
+ p.addr.GetAddrInfo(AI);
+ AI->ai_socktype = p.sock_type;
+ AI->ai_protocol = p.proto;
+ comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+ p.addr.FreeAddrInfo(AI);
+ }
+
+ StartListeningCb *cbd =
+ dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+ Must(cbd);
+ cbd->fd = fd;
+ cbd->errNo = response.errNo;
+ ScheduleCallHere(por.callback);
+}
--- /dev/null
- IpAddress addr; ///< will be memset and memcopied
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_SHARED_LISTEN_H
+#define SQUID_IPC_SHARED_LISTEN_H
+
+#include "base/AsyncCall.h"
+
+namespace Ipc
+{
+
+/// "shared listen" is when concurrent processes are listening on the same fd
+
+/// comm_open_listener() parameters holder
+class OpenListenerParams
+{
+public:
+ OpenListenerParams();
+
+ bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
+
+ int sock_type;
+ int proto;
++ Ip::Address addr; ///< will be memset and memcopied
+ int flags;
+ int fdNote; ///< index into fd_note() comment strings
+};
+
+class TypedMsgHdr;
+
+/// a request for a listen socket with given parameters
+class SharedListenRequest
+{
+public:
+ SharedListenRequest(); ///< from OpenSharedListen() which then sets public data
+ explicit SharedListenRequest(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int requestorId; ///< kidId of the requestor
+
+ OpenListenerParams params; ///< actual comm_open_sharedListen() parameters
+
+ int mapId; ///< to map future response to the requestor's callback
+};
+
+/// a response to SharedListenRequest
+class SharedListenResponse
+{
+public:
+ SharedListenResponse(int fd, int errNo, int mapId);
+ explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
+ void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
+
+public:
+ int fd; ///< opened listening socket or -1
+ int errNo; ///< errno value from comm_open_sharedListen() call
+ int mapId; ///< to map future response to the requestor's callback
+};
+
+/// prepare and send SharedListenRequest to Coordinator
+extern void JoinSharedListen(const OpenListenerParams &, AsyncCall::Pointer &);
+
+/// process Coordinator response to SharedListenRequest
+extern void SharedListenJoined(const SharedListenResponse &response);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_SHARED_LISTEN_H */
--- /dev/null
- #include "TextException.h"
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
+#include "comm.h"
- void Ipc::StartListening(int sock_type, int proto, IpAddress &addr,
++#include "base/TextException.h"
+#include "ipc/SharedListen.h"
+#include "ipc/StartListening.h"
+
+
+Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+{
+}
+
+Ipc::StartListeningCb::~StartListeningCb()
+{
+}
+
+std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
+{
+ return os << "(FD " << fd << ", err=" << errNo;
+}
+
+
++void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
+ int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+{
+ OpenListenerParams p;
+ p.sock_type = sock_type;
+ p.proto = proto;
+ p.addr = addr;
+ p.flags = flags;
+ p.fdNote = fdNote;
+
+ if (UsingSmp()) { // if SMP is on, share
+ Ipc::JoinSharedListen(p, callback);
+ return; // wait for the call back
+ }
+
+ enter_suid();
+ const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
+ FdNote(p.fdNote));
+ const int errNo = (sock >= 0) ? 0 : errno;
+ leave_suid();
+
+ debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
+
+ StartListeningCb *cbd =
+ dynamic_cast<StartListeningCb*>(callback->getDialer());
+ Must(cbd);
+ cbd->fd = sock;
+ cbd->errNo = errNo;
+ ScheduleCallHere(callback);
+}
--- /dev/null
- class IpAddress;
-
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#ifndef SQUID_IPC_START_LISTENING_H
+#define SQUID_IPC_START_LISTENING_H
+
+#include <iosfwd>
++#include "ip/forward.h"
+#include "ipc/FdNotes.h"
+#include "base/AsyncCall.h"
+
- extern void StartListening(int sock_type, int proto, IpAddress &addr,
+namespace Ipc
+{
+
+/// common API for all StartListening() callbacks
+class StartListeningCb
+{
+public:
+ StartListeningCb();
+ virtual ~StartListeningCb();
+
+ /// starts printing arguments, return os
+ std::ostream &startPrint(std::ostream &os) const;
+
+public:
+ int fd; ///< opened listening socket or -1
+ int errNo; ///< errno value from the comm_open_listener() call
+};
+
+/// Depending on whether SMP is on, either ask Coordinator to send us
+/// the listening FD or call comm_open_listener() directly.
++extern void StartListening(int sock_type, int proto, Ip::Address &addr,
+ int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+
+} // namespace Ipc;
+
+
+#endif /* SQUID_IPC_START_LISTENING_H */
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+#include "config.h"
++#include "base/TextException.h"
+#include "ipc/Strand.h"
+#include "ipc/Messages.h"
+#include "ipc/SharedListen.h"
+#include "ipc/Kids.h"
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
+
+
+Ipc::Strand::Strand():
+ Port(MakeAddr(strandAddrPfx, KidIdentifier)),
+ isRegistered(false)
+{
+}
+
+void Ipc::Strand::start()
+{
+ Port::start();
+ registerSelf();
+}
+
+void Ipc::Strand::registerSelf()
+{
+ debugs(54, 6, HERE);
+ Must(!isRegistered);
+ TypedMsgHdr message;
+ StrandCoord(KidIdentifier, getpid()).pack(message);
+ SendMessage(coordinatorAddr, message);
+ setTimeout(6, "Ipc::Strand::timeoutHandler"); // TODO: make 6 configurable?
+}
+
+void Ipc::Strand::receive(const TypedMsgHdr &message)
+{
+ debugs(54, 6, HERE << message.type());
+ switch (message.type()) {
+
+ case mtRegistration:
+ handleRegistrationResponse(StrandCoord(message));
+ break;
+
+ case mtSharedListenResponse:
+ SharedListenJoined(SharedListenResponse(message));
+ break;
+
+ default:
+ debugs(54, 1, HERE << "Unhandled message type: " << message.type());
+ break;
+ }
+}
+
+void Ipc::Strand::handleRegistrationResponse(const StrandCoord &strand)
+{
+ // handle registration response from the coordinator; it could be stale
+ if (strand.kidId == KidIdentifier && strand.pid == getpid()) {
+ debugs(54, 6, "kid" << KidIdentifier << " registered");
+ clearTimeout(); // we are done
+ } else {
+ // could be an ACK to the registration message of our dead predecessor
+ debugs(54, 6, "kid" << KidIdentifier << " is not yet registered");
+ // keep listening, with a timeout
+ }
+}
+
+void Ipc::Strand::timedout()
+{
+ debugs(54, 6, HERE << isRegistered);
+ if (!isRegistered)
+ fatalf("kid%d registration timed out", KidIdentifier);
+}
--- /dev/null
- #include "TextException.h"
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include <string.h>
++#include "protos.h"
++#include "base/TextException.h"
+#include "ipc/TypedMsgHdr.h"
+
+Ipc::TypedMsgHdr::TypedMsgHdr()
+{
+ xmemset(this, 0, sizeof(*this));
+ sync();
+}
+
+Ipc::TypedMsgHdr::TypedMsgHdr(const TypedMsgHdr &tmh)
+{
+ xmemcpy(this, &tmh, sizeof(*this));
+ sync();
+}
+
+Ipc::TypedMsgHdr &Ipc::TypedMsgHdr::operator =(const TypedMsgHdr &tmh)
+{
+ if (this != &tmh) { // skip assignment to self
+ xmemcpy(this, &tmh, sizeof(*this));
+ sync();
+ }
+ return *this;
+}
+
+// update msghdr and ios pointers based on msghdr counters
+void Ipc::TypedMsgHdr::sync()
+{
+ if (msg_name) { // we have a name
+ msg_name = &name;
+ } else {
+ Must(!msg_namelen && !msg_name);
+ }
+
+ if (msg_iov) { // we have a data component
+ Must(msg_iovlen == 1);
+ msg_iov = ios;
+ ios[0].iov_base = &data;
+ Must(ios[0].iov_len == sizeof(data));
+ } else {
+ Must(!msg_iovlen && !msg_iov);
+ }
+
+ if (msg_control) { // we have a control component
+ Must(msg_controllen > 0);
+ msg_control = &ctrl;
+ } else {
+ Must(!msg_controllen && !msg_control);
+ }
+}
+
+
+
+int
+Ipc::TypedMsgHdr::type() const
+{
+ Must(msg_iovlen == 1);
+ return data.type_;
+}
+
+void
+Ipc::TypedMsgHdr::address(const struct sockaddr_un& addr)
+{
+ allocName();
+ name = addr;
+ msg_name = &name;
+ msg_namelen = SUN_LEN(&name);
+}
+
+void
+Ipc::TypedMsgHdr::getData(int destType, void *raw, size_t size) const
+{
+ Must(type() == destType);
+ Must(size == data.size);
+ xmemcpy(raw, data.raw, size);
+}
+
+void
+Ipc::TypedMsgHdr::putData(int aType, const void *raw, size_t size)
+{
+ Must(size <= sizeof(data.raw));
+ allocData();
+ data.type_ = aType;
+ data.size = size;
+ xmemcpy(data.raw, raw, size);
+}
+
+void
+Ipc::TypedMsgHdr::putFd(int fd)
+{
+ Must(fd >= 0);
+ allocControl();
+
+ const int fdCount = 1;
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(this);
+ cmsg->cmsg_level = SOL_SOCKET;
+ cmsg->cmsg_type = SCM_RIGHTS;
+ cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fdCount);
+
+ int *fdStore = reinterpret_cast<int*>(CMSG_DATA(cmsg));
+ xmemcpy(fdStore, &fd, fdCount * sizeof(int));
+ msg_controllen = cmsg->cmsg_len;
+}
+
+int
+Ipc::TypedMsgHdr::getFd() const
+{
+ Must(msg_control && msg_controllen);
+
+ struct cmsghdr *cmsg = CMSG_FIRSTHDR(this);
+ Must(cmsg->cmsg_level == SOL_SOCKET);
+ Must(cmsg->cmsg_type == SCM_RIGHTS);
+
+ const int fdCount = 1;
+ const int *fdStore = reinterpret_cast<const int*>(CMSG_DATA(cmsg));
+ int fd = -1;
+ xmemcpy(&fd, fdStore, fdCount * sizeof(int));
+ return fd;
+}
+
+void
+Ipc::TypedMsgHdr::prepForReading()
+{
+ xmemset(this, 0, sizeof(*this));
+ allocName();
+ allocData();
+ allocControl();
+}
+
+/// initialize io vector with one io record
+void
+Ipc::TypedMsgHdr::allocData()
+{
+ Must(!msg_iovlen && !msg_iov);
+ msg_iovlen = 1;
+ msg_iov = ios;
+ ios[0].iov_base = &data;
+ ios[0].iov_len = sizeof(data);
+ data.type_ = 0;
+ data.size = 0;
+}
+
+void
+Ipc::TypedMsgHdr::allocName()
+{
+ Must(!msg_name && !msg_namelen);
+ msg_name = &name;
+ msg_namelen = sizeof(name); // is that the right size?
+}
+
+void
+Ipc::TypedMsgHdr::allocControl()
+{
+ Must(!msg_control && !msg_controllen);
+ msg_control = &ctrl;
+ msg_controllen = sizeof(ctrl);
+}
--- /dev/null
+/*
+ * $Id$
+ *
+ * DEBUG: section 54 Interprocess Communication
+ *
+ */
+
+
+#include "config.h"
+#include "comm.h"
+#include "CommCalls.h"
++#include "base/TextException.h"
+#include "ipc/UdsOp.h"
+
+
+Ipc::UdsOp::UdsOp(const String& pathAddr):
+ AsyncJob("Ipc::UdsOp"),
+ address(PathToAddress(pathAddr)),
+ options(COMM_NONBLOCKING),
+ fd_(-1)
+{
+ debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
+}
+
+Ipc::UdsOp::~UdsOp()
+{
+ debugs(54, 5, HERE << '[' << this << ']');
+ if (fd_ >= 0)
+ comm_close(fd_);
+}
+
+void Ipc::UdsOp::setOptions(int newOptions)
+{
+ options = newOptions;
+}
+
+int Ipc::UdsOp::fd()
+{
+ if (fd_ < 0) {
+ if (options & COMM_DOBIND)
+ unlink(address.sun_path);
+ fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
+ Must(fd_ >= 0);
+ }
+ return fd_;
+}
+
+void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
+{
+ AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
+ CommCbMemFunT<UdsOp, CommTimeoutCbParams>(this,
+ &UdsOp::noteTimeout));
+ commSetTimeout(fd(), seconds, handler);
+}
+
+void Ipc::UdsOp::clearTimeout()
+{
+ commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+}
+
+void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
+{
+ timedout(); // our kid handles communication timeout
+}
+
+
+struct sockaddr_un
+Ipc::PathToAddress(const String& pathAddr) {
+ assert(pathAddr.size() != 0);
+ struct sockaddr_un unixAddr;
+ memset(&unixAddr, 0, sizeof(unixAddr));
+ unixAddr.sun_family = AF_LOCAL;
+ xstrncpy(unixAddr.sun_path, pathAddr.termedBuf(), sizeof(unixAddr.sun_path));
+ return unixAddr;
+}
+
+
+CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
+
+Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
+ UdsOp(pathAddr),
+ message(aMessage),
+ retries(10), // TODO: make configurable?
+ timeout(10), // TODO: make configurable?
+ writing(false)
+{
+ message.address(address);
+}
+
+void Ipc::UdsSender::start()
+{
+ UdsOp::start();
+ write();
+ if (timeout > 0)
+ setTimeout(timeout, "Ipc::UdsSender::noteTimeout");
+}
+
+bool Ipc::UdsSender::doneAll() const
+{
+ return !writing && UdsOp::doneAll();
+}
+
+void Ipc::UdsSender::write()
+{
+ debugs(54, 5, HERE);
+ AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote",
+ CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::wrote));
+ comm_write(fd(), message.raw(), message.size(), writeHandler);
+ writing = true;
+}
+
+void Ipc::UdsSender::wrote(const CommIoCbParams& params)
+{
+ debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ writing = false;
+ if (params.flag != COMM_OK && retries-- > 0) {
+ sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
+ write(); // XXX: should we close on error so that fd() reopens?
+ }
+}
+
+void Ipc::UdsSender::timedout()
+{
+ debugs(54, 5, HERE);
+ mustStop("timedout");
+}
+
+
+void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
+{
+ AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+}
#endif
#ifndef _SQUID_MSWIN_
--#ifdef KILL_PARENT_OPT
++#if KILL_PARENT_OPT
if (getppid() > 1) {
debugs(1, 1, "Killing master process, pid " << getppid());
static void
serverConnectionsOpen(void)
{
- clientOpenListenSockets();
- icpConnectionsOpen();
-#if USE_HTCP
+ if (IamPrimaryProcess()) {
+#if USE_WCCP
- htcpInit();
+ wccpConnectionOpen();
#endif
-#if SQUID_SNMP
- snmpConnectionOpen();
-#endif
-#if USE_WCCP
+#if USE_WCCPv2
- wccpConnectionOpen();
+ wccp2ConnectionOpen();
#endif
+ }
+ // Coordinator does not start proxying services
+ if (!IamCoordinatorProcess()) {
+ clientOpenListenSockets();
+ icpConnectionsOpen();
+#if USE_HTCP
-#if USE_WCCPv2
+ htcpInit();
+#endif
- #ifdef SQUID_SNMP
++#if SQUID_SNMP
- wccp2ConnectionOpen();
+ snmpConnectionOpen();
#endif
- clientdbInit();
- icmpEngine.Open();
- netdbInit();
- asnInit();
- ACL::Initialize();
- peerSelectInit();
-
- carpInit();
- peerUserHashInit();
- peerSourceHashInit();
+ clientdbInit();
+ icmpEngine.Open();
+ netdbInit();
+ asnInit();
+ ACL::Initialize();
+ peerSelectInit();
+
+ carpInit();
+ peerUserHashInit();
+ peerSourceHashInit();
+ }
}
static void
serverConnectionsClose(void)
{
assert(shutting_down || reconfiguring);
- clientHttpConnectionsClose();
- icpConnectionShutdown();
-#if USE_HTCP
- htcpSocketShutdown();
-#endif
+ if (IamPrimaryProcess()) {
+#if USE_WCCP
- icmpEngine.Close();
-#if SQUID_SNMP
+ wccpConnectionClose();
+#endif
+#if USE_WCCPv2
- snmpConnectionShutdown();
+ wccp2ConnectionClose();
#endif
-#if USE_WCCP
+ }
+ if (!IamCoordinatorProcess()) {
+ clientHttpConnectionsClose();
+ icpConnectionShutdown();
+#if USE_HTCP
- wccpConnectionClose();
+ htcpSocketShutdown();
#endif
-#if USE_WCCPv2
- wccp2ConnectionClose();
+ icmpEngine.Close();
- #ifdef SQUID_SNMP
++#if SQUID_SNMP
+
+ snmpConnectionShutdown();
#endif
- asnFreeMemory();
+ asnFreeMemory();
+ }
}
static void
#endif
redirectInit();
- authenticateInit(&Config.authConfiguration);
+ authenticateInit(&Auth::TheConfig);
externalAclInit();
+
+ if (IamPrimaryProcess()) {
#if USE_WCCP
- wccpInit();
+ wccpInit();
#endif
#if USE_WCCPv2
*
*/
#include "squid.h"
- #include "comm.h"
- #include "cache_snmp.h"
#include "acl/FilledChecklist.h"
- #include "ip/IpAddress.h"
+ #include "cache_snmp.h"
+ #include "comm.h"
+#include "ipc/StartListening.h"
+ #include "compat/strsep.h"
+ #include "ip/Address.h"
#define SNMP_REQUEST_SIZE 4096
#define MAX_PROTOSTAT 5
-
+/// dials snmpConnectionOpened call
+class SnmpListeningStartedDialer: public CallDialer,
+ public Ipc::StartListeningCb
+{
+public:
+ typedef void (*Handler)(int fd, int errNo);
+ SnmpListeningStartedDialer(Handler aHandler): handler(aHandler) {}
+
+ virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
+
+ virtual bool canDial(AsyncCall &) const { return true; }
+ virtual void dial(AsyncCall &) { (handler)(fd, errNo); }
+
+public:
+ Handler handler;
+};
+
+
- IpAddress theOutSNMPAddr;
+ Ip::Address theOutSNMPAddr;
typedef struct _mib_tree_entry mib_tree_entry;
typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn);
char *accept_filter;
int umask;
+ int max_filedescriptors;
+ int workers;
#if USE_LOADABLE_MODULES
wordlist *loadable_module_names;
*/
#include "squid.h"
- #include "ProtoPort.h"
- #include "SwapDir.h"
+ #include "compat/initgroups.h"
+ #include "compat/getaddrinfo.h"
+ #include "compat/getnameinfo.h"
+ #include "compat/tempnam.h"
#include "fde.h"
+ #include "ip/Intercept.h"
#include "MemBuf.h"
- #include "wordlist.h"
+ #include "ProtoPort.h"
#include "SquidMath.h"
#include "SquidTime.h"
- #include "ip/IpIntercept.h"
+#include "ipc/Kids.h"
+#include "ipc/Coordinator.h"
+ #include "SwapDir.h"
+ #include "wordlist.h"
#if HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
static int state = 0;
/* no debugs() here; bad things happen if the signal is delivered during _db_print() */
+ DebugSignal = sig;
+
if (state == 0) {
- #ifndef MEM_GEN_TRACE
+ #if !MEM_GEN_TRACE
Debug::parseOptions("ALL,7");
#else