/*
- * DEBUG: section 5 Socket Functions
+ * DEBUG: section 05 Socket Functions
* AUTHOR: Harvest Derived
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
#include "comm.h"
#include "event.h"
#include "fde.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
#include "CommIO.h"
#include "CommRead.h"
#include "ConnectionDetail.h"
#include "CommCalls.h"
#include "DescriptorSet.h"
#include "icmp/net_db.h"
-#include "ip/IpAddress.h"
-#include "ip/IpIntercept.h"
+#include "ip/Address.h"
+#include "ip/Intercept.h"
#if defined(_SQUID_CYGWIN_)
#include <sys/ioctl.h>
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
+static void comm_init_opened(int new_socket, Ip::Address &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
+static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
struct comm_io_callback_t {
// defaults given by client
char *host;
u_short default_port;
- IpAddress default_addr;
+ Ip::Address default_addr;
// NP: CANNOT store the default addr:port together as it gets set/reset differently.
DnsLookupDetails dns; ///< host lookup details
- IpAddress S;
+ Ip::Address S;
AsyncCall::Pointer callback;
int fd;
static PF commHandleWrite;
static IPH commConnectDnsHandle;
-static PF comm_accept_try;
-
-class AcceptFD
-{
-
-public:
- AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
-
- void subscribe(AsyncCall::Pointer &call);
- void acceptNext();
- void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
-
- int fd;
-
-private:
- bool acceptOne();
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-};
-
typedef enum {
COMM_CB_READ = 1,
COMM_CB_DERIVED
} comm_callback_t;
-struct _fd_debug_t {
- char const *close_file;
- int close_line;
-};
-
-typedef struct _fd_debug_t fd_debug_t;
-
static MemAllocator *conn_close_pool = NULL;
-AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
fd_debug_t *fdd_table = NULL;
-static bool
+bool
isOpen(const int fd)
{
return fd_table[fd].flags.open != 0;
* synchronous wrapper around udp socket functions
*/
int
-comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, IpAddress &from)
+comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from)
{
statCounter.syscalls.sock.recvfroms++;
int x = 0;
int
comm_udp_recv(int fd, void *buf, size_t len, int flags)
{
- IpAddress nul;
+ Ip::Address nul;
return comm_udp_recvfrom(fd, buf, len, flags, nul);
}
u_short
comm_local_port(int fd)
{
- IpAddress temp;
+ Ip::Address temp;
struct addrinfo *addr = NULL;
fde *F = &fd_table[fd];
int
comm_open(int sock_type,
int proto,
- IpAddress &addr,
+ Ip::Address &addr,
int flags,
const char *note)
{
int
comm_open_listener(int sock_type,
int proto,
- IpAddress &addr,
+ Ip::Address &addr,
int flags,
const char *note)
{
/* attempt native enabled port. */
sock = comm_openex(sock_type, proto, addr, flags, 0, note);
-#if USE_IPV6
- /* under IPv6 there is the possibility IPv6 is present but disabled. */
- /* try again as IPv4-native */
- if ( sock < 0 && addr.IsIPv6() && addr.SetIPv4() ) {
- /* attempt to open this IPv4-only. */
- sock = comm_openex(sock_type, proto, addr, flags, 0, note);
- debugs(50, 2, HERE << "attempt open " << note << " socket on: " << addr);
- }
-#endif
-
return sock;
}
int
comm_openex(int sock_type,
int proto,
- IpAddress &addr,
+ Ip::Address &addr,
int flags,
unsigned char TOS,
const char *note)
{
int new_socket;
- fde *F = NULL;
int tos = 0;
struct addrinfo *AI = NULL;
debugs(50, 3, "comm_openex: Attempt open socket for: " << addr );
- if ((new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol)) < 0) {
+ new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+#if USE_IPV6
+ /* under IPv6 there is the possibility IPv6 is present but disabled. */
+ /* try again as IPv4-native if possible */
+ if ( new_socket < 0 && addr.IsIPv6() && addr.SetIPv4() ) {
+ /* attempt to open this IPv4-only. */
+ addr.FreeAddrInfo(AI);
+ /* Setup the socket addrinfo details for use */
+ addr.GetAddrInfo(AI);
+ AI->ai_socktype = sock_type;
+ AI->ai_protocol = proto;
+ debugs(50, 3, "comm_openex: Attempt fallback open socket for: " << addr );
+ new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+ debugs(50, 2, HERE << "attempt open " << note << " socket on: " << addr);
+ }
+#endif
+
+ if (new_socket < 0) {
/* Increase the number of reserved fd's if calls to socket()
* are failing because the open file table is full. This
* limits the number of simultaneous clients */
#endif
+ comm_init_opened(new_socket, addr, TOS, note, AI);
+ new_socket = comm_apply_flags(new_socket, addr, flags, AI);
+
+ addr.FreeAddrInfo(AI);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}
+
+/// update FD tables after a local or remote (IPC) comm_openex();
+void
+comm_init_opened(int new_socket,
+ Ip::Address &addr,
+ unsigned char TOS,
+ const char *note,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+
+ fde *F = NULL;
+
/* update fdstat */
debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
F->tos = TOS;
F->sock_family = AI->ai_family;
+}
+
+/// apply flags after a local comm_open*() call;
+/// returns new_socket or -1 on error
+static int
+comm_apply_flags(int new_socket,
+ Ip::Address &addr,
+ int flags,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+ const int sock_type = AI->ai_socktype;
if (!(flags & COMM_NOCLOEXEC))
commSetCloseOnExec(new_socket);
if (commBind(new_socket, *AI) != COMM_OK) {
comm_close(new_socket);
- addr.FreeAddrInfo(AI);
return -1;
- PROF_stop(comm_open);
}
}
- addr.FreeAddrInfo(AI);
-
if (flags & COMM_NONBLOCKING)
if (commSetNonBlocking(new_socket) == COMM_ERROR) {
+ comm_close(new_socket);
return -1;
- PROF_stop(comm_open);
}
#ifdef TCP_NODELAY
if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
- PROF_stop(comm_open);
-
return new_socket;
}
+void
+comm_import_opened(int fd,
+ Ip::Address &addr,
+ int flags,
+ const char *note,
+ struct addrinfo *AI)
+{
+ debugs(5, 2, HERE << " FD " << fd << " at " << addr);
+ assert(fd >= 0);
+ assert(AI);
+
+ comm_init_opened(fd, addr, 0, note, AI);
+
+ if (!(flags & COMM_NOCLOEXEC))
+ fd_table[fd].flags.close_on_exec = 1;
+
+ if (addr.GetPort() > (u_short) 0) {
+#ifdef _SQUID_MSWIN_
+ if (sock_type != SOCK_DGRAM)
+#endif
+ fd_table[fd].flags.nolinger = 1;
+ }
+
+ if ((flags & COMM_TRANSPARENT))
+ fd_table[fd].flags.transparent = 1;
+
+ if (flags & COMM_NONBLOCKING)
+ fd_table[fd].flags.nonblocking = 1;
+
+#ifdef TCP_NODELAY
+ if (AI->ai_socktype == SOCK_STREAM)
+ fd_table[fd].flags.nodelay = 1;
+#endif
+
+ /* no fd_table[fd].flags. updates needed for these conditions:
+ * if ((flags & COMM_REUSEADDR)) ...
+ * if ((flags & COMM_DOBIND) ...) ...
+ */
+}
+
+
CBDATA_CLASS_INIT(ConnectStateData);
void *
int
ConnectStateData::commResetFD()
{
- struct addrinfo *AI = NULL;
- IpAddress nul;
- int new_family = AF_UNSPEC;
// XXX: do we have to check this?
//
statCounter.syscalls.sock.sockets++;
- /* setup a bare-bones addrinfo */
- /* TODO INET6: for WinXP we may need to check the local_addr type and setup the family properly. */
- nul.GetAddrInfo(AI);
- new_family = AI->ai_family;
+ fde *F = &fd_table[fd];
- int fd2 = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+ struct addrinfo *AI = NULL;
+ F->local_addr.GetAddrInfo(AI);
+ int new_family = AI->ai_family;
- nul.FreeAddrInfo(AI);
+ int fd2 = socket(new_family, AI->ai_socktype, AI->ai_protocol);
if (fd2 < 0) {
debugs(5, DBG_CRITICAL, HERE << "WARNING: FD " << fd2 << " socket failed to allocate: " << xstrerror());
if (ENFILE == errno || EMFILE == errno)
fdAdjustReserved();
+ F->local_addr.FreeAddrInfo(AI);
return 0;
}
close(fd2);
+ F->local_addr.FreeAddrInfo(AI);
return 0;
}
commResetSelect(fd);
close(fd2);
- fde *F = &fd_table[fd];
debugs(50, 3, "commResetFD: Reset socket FD " << fd << "->" << fd2 << " : family=" << new_family );
/* INET6: copy the new sockets family type to the FDE table */
- fd_table[fd].sock_family = new_family;
+ F->sock_family = new_family;
+
+ F->flags.called_connect = 0;
- fd_table[fd].flags.called_connect = 0;
/*
* yuck, this has assumptions about comm_open() arguments for
* the original socket
comm_set_transparent(fd);
}
- AI = NULL;
- F->local_addr.GetAddrInfo(AI);
-
if (commBind(fd, *AI) != COMM_OK) {
debugs(5, DBG_CRITICAL, "WARNING: Reset of FD " << fd << " for " << F->local_addr << " failed to bind: " << xstrerror());
F->local_addr.FreeAddrInfo(AI);
if (squid_curtime - connstart > Config.Timeout.connect)
return 0;
} else {
- if (tries > addrcount)
+ if (tries > addrcount) {
+ /* Flush bad address count in case we are
+ * skipping over incompatible protocol
+ */
+ ipcacheMarkAllGood(host);
return 0;
+ }
}
return commResetFD();
void
ConnectStateData::connect()
{
- if (S.IsAnyAddr())
- defaults();
+ defaults();
debugs(5,5, HERE << "to " << S);
callCallback(COMM_OK, 0);
break;
-#if USE_IPV6
case COMM_ERR_PROTOCOL:
+ debugs(5, 5, HERE "FD " << fd << ": COMM_ERR_PROTOCOL - try again");
/* problem using the desired protocol over this socket.
- * count the connection attempt, reset the socket, and immediately try again */
+ * skip to the next address and hope it's more compatible
+ * but do not mark the current address as bad
+ */
tries++;
- commResetFD();
- connect();
+ if (commRetryConnect()) {
+ /* Force an addr cycle to move forward to the next possible address */
+ ipcacheCycleAddr(host, NULL);
+ eventAdd("commReconnect", commReconnect, this, this->addrcount == 1 ? 0.05 : 0.0, 0);
+ } else {
+ debugs(5, 5, HERE << "FD " << fd << ": COMM_ERR_PROTOCOL - ERR tried too many times already.");
+ callCallback(COMM_ERR_CONNECT, errno);
+ }
break;
-#endif
default:
debugs(5, 5, HERE "FD " << fd << ": * - try again");
}
int
-comm_connect_addr(int sock, const IpAddress &address)
+comm_connect_addr(int sock, const Ip::Address &address)
{
comm_err_t status = COMM_OK;
fde *F = &fd_table[sock];
assert(address.GetPort() != 0);
- debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family << ")");
+ debugs(5, 9, HERE << "connecting socket FD " << sock << " to " << address << " (want family: " << F->sock_family << ")");
- /* BUG 2222 FIX: reset the FD when its found to be IPv4 in IPv6 mode */
- /* inverse case of IPv4 failing to connect on IPv6 socket is handeld post-connect.
+#if USE_IPV6
+ /* Handle IPv6 over IPv4-only socket case.
* this case must presently be handled here since the GetAddrInfo asserts on bad mappings.
- * eventually we want it to throw a Must() that gets handled there instead of this if.
- * NP: because commresetFD is private to ConnStateData we have to return an error and
+ * NP: because commResetFD is private to ConnStateData we have to return an error and
* trust its handled properly.
*/
-#if USE_IPV6
if (F->sock_family == AF_INET && !address.IsIPv4()) {
+ errno = ENETUNREACH;
return COMM_ERR_PROTOCOL;
}
-#endif
+
+ /* Handle IPv4 over IPv6-only socket case.
+ * This case is presently handled here as it's both a known case and it's
+ * uncertain what error will be returned by the IPv6 stack in such case. It's
+ * possible this will also be handled by the errno checks below after connect()
+ * but needs carefull cross-platform verification, and verifying the address
+ * condition here is simple.
+ */
+ if (!F->local_addr.IsIPv4() && address.IsIPv4()) {
+ errno = ENETUNREACH;
+ return COMM_ERR_PROTOCOL;
+ }
+#endif /* USE_IPV6 */
address.GetAddrInfo(AI, F->sock_family);
status = COMM_OK;
else if (ignoreErrno(errno))
status = COMM_INPROGRESS;
+ else if (errno == EAFNOSUPPORT || errno == EINVAL)
+ return COMM_ERR_PROTOCOL;
else
-#if USE_IPV6
- if ( F->sock_family == AF_INET6 && address.IsIPv4() ) {
-
- /* failover to trying IPv4-only link if an IPv6 one fails */
- /* to catch the edge case of apps listening on IPv4-localhost */
- F->sock_family = AF_INET;
- int res = comm_connect_addr(sock, address);
-
- /* if that fails too, undo our temporary socktype hack so the repeat works properly. */
- if (res == COMM_ERROR)
- F->sock_family = AF_INET6;
-
- return res;
- } else
-#endif
- return COMM_ERROR;
+ return COMM_ERROR;
address.NtoA(F->ipaddr, MAX_IPSTRLEN);
return status;
}
-/* Wait for an incoming connection on FD. FD should be a socket returned
- * from comm_listen. */
-static int
-comm_old_accept(int fd, ConnectionDetail &details)
-{
- PROF_start(comm_accept);
- statCounter.syscalls.sock.accepts++;
- int sock;
- struct addrinfo *gai = NULL;
- details.me.InitAddrInfo(gai);
-
- if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
-
- details.me.FreeAddrInfo(gai);
-
- PROF_stop(comm_accept);
-
- if (ignoreErrno(errno)) {
- debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_NOMESSAGE;
- } else if (ENFILE == errno || EMFILE == errno) {
- debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- } else {
- debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- }
- }
-
- details.peer = *gai;
-
- details.me.InitAddrInfo(gai);
-
- details.me.SetEmpty();
- getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
- details.me = *gai;
-
- commSetCloseOnExec(sock);
-
- /* fdstat update */
- fd_open(sock, FD_SOCKET, "HTTP Request");
- fdd_table[sock].close_file = NULL;
- fdd_table[sock].close_line = 0;
- fde *F = &fd_table[sock];
- details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
- F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
-#if USE_IPV6
- F->sock_family = AF_INET;
-#else
- F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
-#endif
- details.me.FreeAddrInfo(gai);
-
- commSetNonBlocking(sock);
-
- /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
- F->flags.transparent = fd_table[fd].flags.transparent;
-
- PROF_stop(comm_accept);
- return sock;
-}
-
void
commCallCloseHandlers(int fd)
{
L.l_linger = 0;
if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &L, sizeof(L)) < 0)
- debugs(50, 0, "commResetTCPClose: FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_CRITICAL, "ERROR: Closing FD " << fd << " with TCP RST: " << xstrerror());
comm_close(fd);
}
}
-
void
comm_close_complete(int fd, void *data)
{
close(fd);
- fdc_table[fd] = AcceptFD(fd);
-
statCounter.syscalls.sock.closes++;
/* When an fd closes, give accept() a chance, if need be */
-
- if (fdNFree() >= RESERVED_FD)
- AcceptLimiter::Instance().kick();
-
+ Comm::AcceptLimiter::Instance().kick();
}
/*
commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
}
- // notify accept handlers
- fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
-
commCallCloseHandlers(fd);
if (F->pconn.uses)
/* Send a udp datagram to specified TO_ADDR. */
int
comm_udp_sendto(int fd,
- const IpAddress &to_addr,
+ const Ip::Address &to_addr,
const void *buf,
int len)
{
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
- fdc_table = new AcceptFD[Squid_MaxFD];
- for (int pos = 0; pos < Squid_MaxFD; ++pos) {
- fdc_table[pos] = AcceptFD(pos);
- }
+ /* make sure the accept() socket FIFO delay queue exists */
+ Comm::AcceptLimiter::Instance();
commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
for (int pos = 0; pos < Squid_MaxFD; pos++) {
safe_free(fd_table);
safe_free(fdd_table);
- if (fdc_table) {
- delete[] fdc_table;
- fdc_table = NULL;
- }
safe_free(commfd_table);
}
debugs(5, 5, "commCloseAllSockets: FD " << fd << ": Calling timeout handler");
ScheduleCallHere(callback);
} else {
- debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_close()");
- comm_close(fd);
+ debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_reset_close()");
+ comm_reset_close(fd);
}
}
}
}
}
-/*
- * New-style listen and accept routines
- *
- * Listen simply registers our interest in an FD for listening,
- * and accept takes a callback to call when an FD has been
- * accept()ed.
- */
-int
-comm_listen(int sock)
-{
- int x;
-
- if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
- debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
- return x;
- }
-
- if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
- struct accept_filter_arg afa;
- bzero(&afa, sizeof(afa));
- debugs(5, DBG_CRITICAL, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
- xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
- if (x < 0)
- debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
- int seconds = 30;
- if (strncmp(Config.accept_filter, "data=", 5) == 0)
- seconds = atoi(Config.accept_filter + 5);
- x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
- if (x < 0)
- debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
- debugs(5, 0, "accept_filter not supported on your OS");
-#endif
- }
-
- return sock;
-}
-
-void
-comm_accept(int fd, IOACB *handler, void *handler_data)
-{
- debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
- assert(isOpen(fd));
-
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
- CommAcceptCbPtrFun(handler, handler_data));
- fdc_table[fd].subscribe(call);
-}
-
-void
-comm_accept(int fd, AsyncCall::Pointer &call)
-{
- debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
- assert(isOpen(fd));
-
- fdc_table[fd].subscribe(call);
-}
-
-// Called when somebody wants to be notified when our socket accepts new
-// connection. We do not probe the FD until there is such interest.
-void
-AcceptFD::subscribe(AsyncCall::Pointer &call)
-{
- /* make sure we're not pending! */
- assert(!theCallback);
- theCallback = call;
-
-#if OPTIMISTIC_IO
- mayAcceptMore = true; // even if we failed to accept last time
-#endif
-
- if (mayAcceptMore)
- acceptNext();
- else
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-}
-
-bool
-AcceptFD::acceptOne()
-{
- // If there is no callback and we accept, we will leak the accepted FD.
- // When we are running out of FDs, there is often no callback.
- if (!theCallback) {
- debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
- // XXX: can we remove this and similar "just in case" calls and
- // either listen always or listen only when there is a callback?
- if (!AcceptLimiter::Instance().deferring())
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- /*
- * We don't worry about running low on FDs here. Instead,
- * httpAccept() will use AcceptLimiter if we reach the limit
- * there.
- */
-
- /* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = comm_old_accept(fd, connDetails);
-
- /* Check for errors */
-
- if (newfd < 0) {
- assert(theCallback != NULL);
-
- if (newfd == COMM_NOMESSAGE) {
- /* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd <<
- " handler: " << *theCallback);
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- // A non-recoverable error; notify the caller */
- notify(-1, COMM_ERROR, errno, connDetails);
- return false;
- }
-
- assert(theCallback != NULL);
- debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << *theCallback);
- notify(newfd, COMM_OK, 0, connDetails);
- return true;
-}
-
-void
-AcceptFD::acceptNext()
-{
- mayAcceptMore = acceptOne();
-}
-
-void
-AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
-{
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = fd;
- params.nfd = newfd;
- params.details = connDetails;
- params.flag = errcode;
- params.xerrno = xerrno;
- ScheduleCallHere(theCallback);
- theCallback = NULL;
- }
-}
-
-/*
- * This callback is called whenever a filedescriptor is ready
- * to dupe itself and fob off an accept()ed connection
- */
-static void
-comm_accept_try(int fd, void *)
-{
- assert(isOpen(fd));
- fdc_table[fd].acceptNext();
-}
-
void CommIO::Initialise()
{
/* Initialize done pipe signal */
}
}
-AcceptLimiter AcceptLimiter::Instance_;
-
-AcceptLimiter &AcceptLimiter::Instance()
-{
- return Instance_;
-}
-
-bool
-AcceptLimiter::deferring() const
-{
- return deferred.size() > 0;
-}
-
-void
-AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data)
-{
- debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc);
- Acceptor temp;
- temp.theFunction = aFunc;
- temp.acceptFD = fd;
- temp.theData = data;
- deferred.push_back(temp);
-}
-
-void
-AcceptLimiter::kick()
-{
- if (!deferring())
- return;
-
- /* Yes, this means the first on is the last off....
- * If the list container was a little more friendly, we could sensibly us it.
- */
- Acceptor temp = deferred.pop_back();
-
- comm_accept (temp.acceptFD, temp.theFunction, temp.theData);
-}
-
/// Start waiting for a possibly half-closed connection to close
// by scheduling a read callback to a monitoring handler that
// will close the connection on read errors.
return EVENT_ERROR;
};
}
+
+/// Create a unix-domain socket (UDS) that only supports FD_MSGHDR I/O.
+int
+comm_open_uds(int sock_type,
+ int proto,
+ struct sockaddr_un* addr,
+ int flags)
+{
+ // TODO: merge with comm_openex() when Ip::Address becomes NetAddress
+
+ int new_socket;
+
+ PROF_start(comm_open);
+ /* Create socket for accepting new connections. */
+ statCounter.syscalls.sock.sockets++;
+
+ /* Setup the socket addrinfo details for use */
+ struct addrinfo AI;
+ AI.ai_flags = 0;
+ AI.ai_family = PF_UNIX;
+ AI.ai_socktype = sock_type;
+ AI.ai_protocol = proto;
+ AI.ai_addrlen = SUN_LEN(addr);
+ AI.ai_addr = (sockaddr*)addr;
+ AI.ai_canonname = NULL;
+ AI.ai_next = NULL;
+
+ debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
+
+ if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+ /* Increase the number of reserved fd's if calls to socket()
+ * are failing because the open file table is full. This
+ * limits the number of simultaneous clients */
+
+ if (limitError(errno)) {
+ debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
+ fdAdjustReserved();
+ } else {
+ debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
+ }
+
+ PROF_stop(comm_open);
+ return -1;
+ }
+
+ debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
+
+ /* update fdstat */
+ debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
+
+ assert(!isOpen(new_socket));
+ fd_open(new_socket, FD_MSGHDR, NULL);
+
+ fdd_table[new_socket].close_file = NULL;
+
+ fdd_table[new_socket].close_line = 0;
+
+ fd_table[new_socket].sock_family = AI.ai_family;
+
+ if (!(flags & COMM_NOCLOEXEC))
+ commSetCloseOnExec(new_socket);
+
+ if (flags & COMM_REUSEADDR)
+ commSetReuseAddr(new_socket);
+
+ if (flags & COMM_NONBLOCKING) {
+ if (commSetNonBlocking(new_socket) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ if (flags & COMM_DOBIND) {
+ if (commBind(new_socket, AI) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+#ifdef TCP_NODELAY
+ if (sock_type == SOCK_STREAM)
+ commSetTcpNoDelay(new_socket);
+
+#endif
+
+ if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+ commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}