-
/*
- * $Id: comm.cc,v 1.447 2008/02/26 21:49:34 amosjeffries Exp $
- *
- * DEBUG: section 5 Socket Functions
+ * DEBUG: section 05 Socket Functions
* AUTHOR: Harvest Derived
*
* SQUID Web Proxy Cache http://www.squid-cache.org/
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
- *
+ *
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
- *
+ *
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
#include "comm.h"
#include "event.h"
#include "fde.h"
+#include "comm/AcceptLimiter.h"
+#include "comm/comm_internal.h"
+#include "comm/ListenStateData.h"
#include "CommIO.h"
#include "CommRead.h"
#include "ConnectionDetail.h"
#include "pconn.h"
#include "SquidTime.h"
#include "CommCalls.h"
-#include "IPAddress.h"
-#include "IPInterception.h"
#include "DescriptorSet.h"
#include "icmp/net_db.h"
+#include "ip/Address.h"
+#include "ip/Intercept.h"
#if defined(_SQUID_CYGWIN_)
#include <sys/ioctl.h>
*/
typedef enum {
- IOCB_NONE,
- IOCB_READ,
- IOCB_WRITE
+ IOCB_NONE,
+ IOCB_READ,
+ IOCB_WRITE
} iocb_type;
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
+static void comm_init_opened(int new_socket, Ip::Address &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
+static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
struct comm_io_callback_t {
- iocb_type type;
- int fd;
- AsyncCall::Pointer callback;
- char *buf;
- FREE *freefunc;
- int size;
- int offset;
- comm_err_t errcode;
- int xerrno;
-
- bool active() const { return callback != NULL; }
+ iocb_type type;
+ int fd;
+ AsyncCall::Pointer callback;
+ char *buf;
+ FREE *freefunc;
+ int size;
+ int offset;
+ comm_err_t errcode;
+ int xerrno;
+
+ bool active() const { return callback != NULL; }
};
struct _comm_fd {
- int fd;
- comm_io_callback_t readcb;
- comm_io_callback_t writecb;
+ int fd;
+ comm_io_callback_t readcb;
+ comm_io_callback_t writecb;
};
typedef struct _comm_fd comm_fd_t;
comm_fd_t *commfd_table;
bool
commio_has_callback(int fd, iocb_type type, comm_io_callback_t *ccb)
{
- assert(ccb->fd == fd);
- assert(ccb->type == type);
- return ccb->active();
+ assert(ccb->fd == fd);
+ assert(ccb->type == type);
+ return ccb->active();
}
/*
*/
static void
commio_set_callback(int fd, iocb_type type, comm_io_callback_t *ccb,
- AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
+ AsyncCall::Pointer &cb, char *buf, FREE *freefunc, int size)
{
- assert(!ccb->active());
- assert(ccb->type == type);
- assert(cb != NULL);
- ccb->fd = fd;
- ccb->callback = cb;
- ccb->buf = buf;
- ccb->freefunc = freefunc;
- ccb->size = size;
- ccb->offset = 0;
+ assert(!ccb->active());
+ assert(ccb->type == type);
+ assert(cb != NULL);
+ ccb->fd = fd;
+ ccb->callback = cb;
+ ccb->buf = buf;
+ ccb->freefunc = freefunc;
+ ccb->size = size;
+ ccb->offset = 0;
}
commio_finish_callback(int fd, comm_io_callback_t *ccb, comm_err_t code, int xerrno)
{
debugs(5, 3, "commio_finish_callback: called for FD " << fd << " (" <<
- code << ", " << xerrno << ")");
- assert(ccb->active());
- assert(ccb->fd == fd);
- ccb->errcode = code;
- ccb->xerrno = xerrno;
-
- comm_io_callback_t cb = *ccb;
-
- /* We've got a copy; blow away the real one */
- /* XXX duplicate code from commio_cancel_callback! */
- ccb->xerrno = 0;
- ccb->callback = NULL; // cb has it
-
- /* free data */
- if (cb.freefunc) {
- cb.freefunc(cb.buf);
- cb.buf = NULL;
- }
-
- if (cb.callback != NULL) {
+ code << ", " << xerrno << ")");
+ assert(ccb->active());
+ assert(ccb->fd == fd);
+ ccb->errcode = code;
+ ccb->xerrno = xerrno;
+
+ comm_io_callback_t cb = *ccb;
+
+ /* We've got a copy; blow away the real one */
+ /* XXX duplicate code from commio_cancel_callback! */
+ ccb->xerrno = 0;
+ ccb->callback = NULL; // cb has it
+
+ /* free data */
+ if (cb.freefunc) {
+ cb.freefunc(cb.buf);
+ cb.buf = NULL;
+ }
+
+ if (cb.callback != NULL) {
typedef CommIoCbParams Params;
Params ¶ms = GetCommParams<Params>(cb.callback);
params.fd = cb.fd;
params.flag = cb.errcode;
params.xerrno = cb.xerrno;
ScheduleCallHere(cb.callback);
- }
+ }
}
commio_cancel_callback(int fd, comm_io_callback_t *ccb)
{
debugs(5, 3, "commio_cancel_callback: called for FD " << fd);
- assert(ccb->fd == fd);
- assert(ccb->active());
+ assert(ccb->fd == fd);
+ assert(ccb->active());
- ccb->xerrno = 0;
- ccb->callback = NULL;
+ ccb->xerrno = 0;
+ ccb->callback = NULL;
}
/*
* Call the given comm callback; assumes the callback is valid.
- *
+ *
* @param ccb io completion callback
*/
void
// defaults given by client
char *host;
u_short default_port;
- IPAddress default_addr;
+ Ip::Address default_addr;
// NP: CANNOT store the default addr:port together as it gets set/reset differently.
- IPAddress S;
+ DnsLookupDetails dns; ///< host lookup details
+ Ip::Address S;
AsyncCall::Pointer callback;
int fd;
static PF commHandleWrite;
static IPH commConnectDnsHandle;
-static PF comm_accept_try;
-
-class AcceptFD
-{
-
-public:
- AcceptFD(int aFd = -1): fd(aFd), theCallback(0), mayAcceptMore(false) {}
-
- void subscribe(AsyncCall::Pointer &call);
- void acceptNext();
- void notify(int newfd, comm_err_t, int xerrno, const ConnectionDetail &);
-
- int fd;
-
-private:
- bool acceptOne();
-
- AsyncCall::Pointer theCallback;
- bool mayAcceptMore;
-};
-
typedef enum {
COMM_CB_READ = 1,
- COMM_CB_DERIVED,
+ COMM_CB_DERIVED
} comm_callback_t;
-struct _fd_debug_t
-{
- char const *close_file;
- int close_line;
-};
-
-typedef struct _fd_debug_t fd_debug_t;
-
static MemAllocator *conn_close_pool = NULL;
-AcceptFD *fdc_table = NULL; // TODO: rename. And use Vector<>?
fd_debug_t *fdd_table = NULL;
-static bool
+bool
isOpen(const int fd)
{
- return fd_table[fd].flags.open != 0;
+ return fd_table[fd].flags.open != 0;
}
/**
commHandleRead(int fd, void *data)
{
comm_io_callback_t *ccb = (comm_io_callback_t *) data;
-
+
assert(data == COMMIO_FD_READCB(fd));
assert(commio_has_callback(fd, IOCB_READ, ccb));
/* Attempt a read */
if (retval < 0 && !ignoreErrno(errno)) {
debugs(5, 3, "comm_read_try: scheduling COMM_ERROR");
- ccb->offset = 0;
- commio_finish_callback(fd, ccb, COMM_ERROR, errno);
+ ccb->offset = 0;
+ commio_finish_callback(fd, ccb, COMM_ERROR, errno);
return;
};
/* Note - read 0 == socket EOF, which is a valid read */
if (retval >= 0) {
fd_bytes(fd, retval, FD_READ);
- ccb->offset = retval;
- commio_finish_callback(fd, ccb, COMM_OK, errno);
+ ccb->offset = retval;
+ commio_finish_callback(fd, ccb, COMM_OK, errno);
return;
}
comm_read(int fd, char *buf, int size, IOCB *handler, void *handler_data)
{
AsyncCall::Pointer call = commCbCall(5,4, "SomeCommReadHandler",
- CommIoCbPtrFun(handler, handler_data));
+ CommIoCbPtrFun(handler, handler_data));
comm_read(fd, buf, size, call);
}
if (!isOpen(fd)) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
return;
- }
+ }
comm_io_callback_t *cb = COMMIO_FD_READCB(fd);
// TODO: is "active" == "monitors FD"?
if (!cb->active()) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " inactive");
return;
- }
+ }
typedef CommCbFunPtrCallT<CommIoCbPtrFun> Call;
Call *call = dynamic_cast<Call*>(cb->callback.getRaw());
if (!call) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " lacks callback");
return;
- }
+ }
call->cancel("old comm_read_cancel");
comm_read_cancel(int fd, AsyncCall::Pointer &callback)
{
callback->cancel("comm_read_cancel");
-
+
if (!isOpen(fd)) {
debugs(5, 4, "comm_read_cancel fails: FD " << fd << " closed");
return;
AsyncCall::Pointer call = cb->callback;
assert(call != NULL); // XXX: should never fail (active() checks for callback==NULL)
-
+
/* Ok, we can be reasonably sure we won't lose any data here! */
assert(call == callback);
* synchronous wrapper around udp socket functions
*/
int
-comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, IPAddress &from)
+comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from)
{
statCounter.syscalls.sock.recvfroms++;
int x = 0;
int
comm_udp_recv(int fd, void *buf, size_t len, int flags)
{
- IPAddress nul;
+ Ip::Address nul;
return comm_udp_recvfrom(fd, buf, len, flags, nul);
}
u_short
comm_local_port(int fd)
{
- IPAddress temp;
+ Ip::Address temp;
struct addrinfo *addr = NULL;
fde *F = &fd_table[fd];
if (F->local_addr.GetPort())
return F->local_addr.GetPort();
+#if USE_IPV6
+ if (F->sock_family == AF_INET)
+ temp.SetIPv4();
+#endif
+
temp.InitAddrInfo(addr);
if (getsockname(fd, addr->ai_addr, &(addr->ai_addrlen)) ) {
F->local_addr.SetPort(temp.GetPort());
+#if 0 // seems to undo comm_open actions on the FD ...
// grab default socket information for this address
temp.GetAddrInfo(addr);
F->sock_family = addr->ai_family;
temp.FreeAddrInfo(addr);
+#endif
- debugs(5, 6, "comm_local_port: FD " << fd << ": port " << F->local_addr.GetPort());
+ debugs(5, 6, "comm_local_port: FD " << fd << ": port " << F->local_addr.GetPort() << "(family=" << F->sock_family << ")");
return F->local_addr.GetPort();
}
{
statCounter.syscalls.sock.binds++;
- if (bind(s, inaddr.ai_addr, inaddr.ai_addrlen) == 0)
+ if (bind(s, inaddr.ai_addr, inaddr.ai_addrlen) == 0) {
+ debugs(50, 6, "commBind: bind socket FD " << s << " to " << fd_table[s].local_addr);
return COMM_OK;
+ }
debugs(50, 0, "commBind: Cannot bind socket FD " << s << " to " << fd_table[s].local_addr << ": " << xstrerror());
int
comm_open(int sock_type,
int proto,
- IPAddress &addr,
+ Ip::Address &addr,
int flags,
const char *note)
{
return comm_openex(sock_type, proto, addr, flags, 0, note);
}
+int
+comm_open_listener(int sock_type,
+ int proto,
+ Ip::Address &addr,
+ int flags,
+ const char *note)
+{
+ int sock = -1;
+
+ /* all listener sockets require bind() */
+ flags |= COMM_DOBIND;
+
+ /* attempt native enabled port. */
+ sock = comm_openex(sock_type, proto, addr, flags, 0, note);
+
+ return sock;
+}
+
static bool
limitError(int const anErrno)
{
comm_set_tos(int fd, int tos)
{
#ifdef IP_TOS
- int x = setsockopt(fd, IPPROTO_IP, IP_TOS, (char *) &tos, sizeof(int));
- if (x < 0)
- debugs(50, 1, "comm_set_tos: setsockopt(IP_TOS) on FD " << fd << ": " << xstrerror());
- return x;
+ int x = setsockopt(fd, IPPROTO_IP, IP_TOS, (char *) &tos, sizeof(int));
+ if (x < 0)
+ debugs(50, 1, "comm_set_tos: setsockopt(IP_TOS) on FD " << fd << ": " << xstrerror());
+ return x;
#else
- debugs(50, 0, "WARNING: setsockopt(IP_TOS) not supported on this platform");
- return -1;
+ debugs(50, 0, "WARNING: setsockopt(IP_TOS) not supported on this platform");
+ return -1;
#endif
}
{
#ifdef IPV6_V6ONLY
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &tos, sizeof(int)) < 0) {
- debugs(50, 1, "comm_open: setsockopt(IPV6_V6ONLY) on FD " << fd << ": " << xstrerror());
+ debugs(50, 1, "comm_open: setsockopt(IPV6_V6ONLY) " << (tos?"ON":"OFF") << " for FD " << fd << ": " << xstrerror());
}
#else
debugs(50, 0, "WARNING: comm_open: setsockopt(IPV6_V6ONLY) not supported on this platform");
int tos = 1;
if (setsockopt(fd, SOL_IP, IP_TRANSPARENT, (char *) &tos, sizeof(int)) < 0) {
debugs(50, DBG_IMPORTANT, "comm_open: setsockopt(IP_TRANSPARENT) on FD " << fd << ": " << xstrerror());
- }
- else {
+ } else {
/* mark the socket as having transparent options */
fd_table[fd].flags.transparent = 1;
}
int
comm_openex(int sock_type,
int proto,
- IPAddress &addr,
+ Ip::Address &addr,
int flags,
unsigned char TOS,
const char *note)
{
int new_socket;
- fde *F = NULL;
int tos = 0;
struct addrinfo *AI = NULL;
debugs(50, 3, "comm_openex: Attempt open socket for: " << addr );
- if ((new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol)) < 0)
- {
+ new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+#if USE_IPV6
+ /* under IPv6 there is the possibility IPv6 is present but disabled. */
+ /* try again as IPv4-native if possible */
+ if ( new_socket < 0 && addr.IsIPv6() && addr.SetIPv4() ) {
+ /* attempt to open this IPv4-only. */
+ addr.FreeAddrInfo(AI);
+ /* Setup the socket addrinfo details for use */
+ addr.GetAddrInfo(AI);
+ AI->ai_socktype = sock_type;
+ AI->ai_protocol = proto;
+ debugs(50, 3, "comm_openex: Attempt fallback open socket for: " << addr );
+ new_socket = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+ debugs(50, 2, HERE << "attempt open " << note << " socket on: " << addr);
+ }
+#endif
+
+ if (new_socket < 0) {
/* Increase the number of reserved fd's if calls to socket()
* are failing because the open file table is full. This
* limits the number of simultaneous clients */
#if IPV6_SPECIAL_SPLITSTACK
- if( addr.IsIPv6() )
- comm_set_v6only(new_socket, tos);
+ if ( addr.IsIPv6() )
+ comm_set_v6only(new_socket, 1);
#endif
/* Windows Vista supports Dual-Sockets. BUT defaults them to V6ONLY. Turn it OFF. */
/* Other OS may have this administratively disabled for general use. Same deal. */
- if( addr.IsIPv6() )
+ if ( addr.IsIPv6() )
comm_set_v6only(new_socket, 0);
#endif
+ comm_init_opened(new_socket, addr, TOS, note, AI);
+ new_socket = comm_apply_flags(new_socket, addr, flags, AI);
+
+ addr.FreeAddrInfo(AI);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}
+
+/// update FD tables after a local or remote (IPC) comm_openex();
+void
+comm_init_opened(int new_socket,
+ Ip::Address &addr,
+ unsigned char TOS,
+ const char *note,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+
+ fde *F = NULL;
+
/* update fdstat */
debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
F->tos = TOS;
F->sock_family = AI->ai_family;
+}
+
+/// apply flags after a local comm_open*() call;
+/// returns new_socket or -1 on error
+static int
+comm_apply_flags(int new_socket,
+ Ip::Address &addr,
+ int flags,
+ struct addrinfo *AI)
+{
+ assert(new_socket >= 0);
+ assert(AI);
+ const int sock_type = AI->ai_socktype;
if (!(flags & COMM_NOCLOEXEC))
commSetCloseOnExec(new_socket);
if ((flags & COMM_REUSEADDR))
commSetReuseAddr(new_socket);
- if (addr.GetPort() > (u_short) 0)
- {
+ if (addr.GetPort() > (u_short) 0) {
#ifdef _SQUID_MSWIN_
-
if (sock_type != SOCK_DGRAM)
#endif
-
commSetNoLinger(new_socket);
if (opt_reuseaddr)
}
/* MUST be done before binding or face OS Error: "(99) Cannot assign requested address"... */
- if((flags & COMM_TRANSPARENT)) {
+ if ((flags & COMM_TRANSPARENT)) {
comm_set_transparent(new_socket);
}
- if (!addr.IsNoAddr())
- {
+ if ( (flags & COMM_DOBIND) || addr.GetPort() > 0 || !addr.IsAnyAddr() ) {
+ if ( !(flags & COMM_DOBIND) && addr.IsAnyAddr() )
+ debugs(5,1,"WARNING: Squid is attempting to bind() port " << addr << " without being a listener.");
+ if ( addr.IsNoAddr() )
+ debugs(5,0,"CRITICAL: Squid is attempting to bind() port " << addr << "!!");
+
if (commBind(new_socket, *AI) != COMM_OK) {
comm_close(new_socket);
- addr.FreeAddrInfo(AI);
return -1;
- PROF_stop(comm_open);
}
}
- addr.FreeAddrInfo(AI);
-
if (flags & COMM_NONBLOCKING)
- if (commSetNonBlocking(new_socket) == COMM_ERROR)
- {
+ if (commSetNonBlocking(new_socket) == COMM_ERROR) {
+ comm_close(new_socket);
return -1;
- PROF_stop(comm_open);
}
#ifdef TCP_NODELAY
if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
- PROF_stop(comm_open);
-
return new_socket;
}
+void
+comm_import_opened(int fd,
+ Ip::Address &addr,
+ int flags,
+ const char *note,
+ struct addrinfo *AI)
+{
+ debugs(5, 2, HERE << " FD " << fd << " at " << addr);
+ assert(fd >= 0);
+ assert(AI);
+
+ comm_init_opened(fd, addr, 0, note, AI);
+
+ if (!(flags & COMM_NOCLOEXEC))
+ fd_table[fd].flags.close_on_exec = 1;
+
+ if (addr.GetPort() > (u_short) 0) {
+#ifdef _SQUID_MSWIN_
+ if (sock_type != SOCK_DGRAM)
+#endif
+ fd_table[fd].flags.nolinger = 1;
+ }
+
+ if ((flags & COMM_TRANSPARENT))
+ fd_table[fd].flags.transparent = 1;
+
+ if (flags & COMM_NONBLOCKING)
+ fd_table[fd].flags.nonblocking = 1;
+
+#ifdef TCP_NODELAY
+ if (AI->ai_socktype == SOCK_STREAM)
+ fd_table[fd].flags.nodelay = 1;
+#endif
+
+ /* no fd_table[fd].flags. updates needed for these conditions:
+ * if ((flags & COMM_REUSEADDR)) ...
+ * if ((flags & COMM_DOBIND) ...) ...
+ */
+}
+
+
CBDATA_CLASS_INIT(ConnectStateData);
void *
commConnectStart(int fd, const char *host, u_short port, AsyncCall::Pointer &cb)
{
debugs(cb->debugSection, cb->debugLevel, "commConnectStart: FD " << fd <<
- ", cb " << cb << ", " << host << ":" << port); // TODO: just print *cb
+ ", cb " << cb << ", " << host << ":" << port); // TODO: just print *cb
ConnectStateData *cs;
cs = new ConnectStateData;
{
debugs(5, 5, "commConnectStart: FD " << fd << ", data " << data << ", " << host << ":" << port);
AsyncCall::Pointer call = commCbCall(5,3,
- "SomeCommConnectHandler", CommConnectCbPtrFun(callback, data));
+ "SomeCommConnectHandler", CommConnectCbPtrFun(callback, data));
commConnectStart(fd, host, port, call);
}
static void
-commConnectDnsHandle(const ipcache_addrs * ia, void *data)
+commConnectDnsHandle(const ipcache_addrs *ia, const DnsLookupDetails &details, void *data)
{
ConnectStateData *cs = (ConnectStateData *)data;
+ cs->dns = details;
if (ia == NULL) {
debugs(5, 3, "commConnectDnsHandle: Unknown host: " << cs->host);
-
- if (!dns_error_message) {
- dns_error_message = "Unknown DNS error";
- debugs(5, 1, "commConnectDnsHandle: Bad dns_error_message");
- }
-
- assert(dns_error_message != NULL);
cs->callCallback(COMM_ERR_DNS, 0);
return;
}
typedef CommConnectCbParams Params;
Params ¶ms = GetCommParams<Params>(callback);
params.fd = fd;
+ params.dns = dns;
params.flag = status;
params.xerrno = xerrno;
ScheduleCallHere(callback);
int
ConnectStateData::commResetFD()
{
- struct addrinfo *AI = NULL;
- IPAddress nul;
- int new_family = AF_UNSPEC;
// XXX: do we have to check this?
//
statCounter.syscalls.sock.sockets++;
- /* setup a bare-bones addrinfo */
- /* TODO INET6: for WinXP we may need to check the local_addr type and setup the family properly. */
- nul.GetAddrInfo(AI);
- new_family = AI->ai_family;
+ fde *F = &fd_table[fd];
- int fd2 = socket(AI->ai_family, AI->ai_socktype, AI->ai_protocol);
+ struct addrinfo *AI = NULL;
+ F->local_addr.GetAddrInfo(AI);
+ int new_family = AI->ai_family;
- nul.FreeAddrInfo(AI);
+ int fd2 = socket(new_family, AI->ai_socktype, AI->ai_protocol);
if (fd2 < 0) {
debugs(5, DBG_CRITICAL, HERE << "WARNING: FD " << fd2 << " socket failed to allocate: " << xstrerror());
if (ENFILE == errno || EMFILE == errno)
fdAdjustReserved();
+ F->local_addr.FreeAddrInfo(AI);
return 0;
}
close(fd2);
+ F->local_addr.FreeAddrInfo(AI);
return 0;
}
commResetSelect(fd);
close(fd2);
- fde *F = &fd_table[fd];
+
+ debugs(50, 3, "commResetFD: Reset socket FD " << fd << "->" << fd2 << " : family=" << new_family );
/* INET6: copy the new sockets family type to the FDE table */
- fd_table[fd].sock_family = new_family;
+ F->sock_family = new_family;
+
+ F->flags.called_connect = 0;
- fd_table[fd].flags.called_connect = 0;
/*
* yuck, this has assumptions about comm_open() arguments for
* the original socket
*/
/* MUST be done before binding or face OS Error: "(99) Cannot assign requested address"... */
- if( F->flags.transparent ) {
+ if ( F->flags.transparent ) {
comm_set_transparent(fd);
}
- AI = NULL;
- F->local_addr.GetAddrInfo(AI);
-
if (commBind(fd, *AI) != COMM_OK) {
debugs(5, DBG_CRITICAL, "WARNING: Reset of FD " << fd << " for " << F->local_addr << " failed to bind: " << xstrerror());
F->local_addr.FreeAddrInfo(AI);
comm_set_tos(fd, F->tos);
#if IPV6_SPECIAL_SPLITSTACK
-
- if( F->local_addr.IsIPv6() )
- comm_set_v6only(fd, F->tos);
-
+ if ( F->local_addr.IsIPv6() )
+ comm_set_v6only(fd, 1);
#endif
copyFDFlags(fd, F);
if (squid_curtime - connstart > Config.Timeout.connect)
return 0;
} else {
- if (tries > addrcount)
+ if (tries > addrcount) {
+ /* Flush bad address count in case we are
+ * skipping over incompatible protocol
+ */
+ ipcacheMarkAllGood(host);
return 0;
+ }
}
return commResetFD();
void
ConnectStateData::connect()
{
- if (S.IsAnyAddr())
- defaults();
+ defaults();
debugs(5,5, HERE << "to " << S);
callCallback(COMM_OK, 0);
break;
-#if USE_IPV6
case COMM_ERR_PROTOCOL:
+ debugs(5, 5, HERE "FD " << fd << ": COMM_ERR_PROTOCOL - try again");
/* problem using the desired protocol over this socket.
- * count the connection attempt, reset the socket, and immediately try again */
+ * skip to the next address and hope it's more compatible
+ * but do not mark the current address as bad
+ */
tries++;
- commResetFD();
- connect();
+ if (commRetryConnect()) {
+ /* Force an addr cycle to move forward to the next possible address */
+ ipcacheCycleAddr(host, NULL);
+ eventAdd("commReconnect", commReconnect, this, this->addrcount == 1 ? 0.05 : 0.0, 0);
+ } else {
+ debugs(5, 5, HERE << "FD " << fd << ": COMM_ERR_PROTOCOL - ERR tried too many times already.");
+ callCallback(COMM_ERR_CONNECT, errno);
+ }
break;
-#endif
default:
debugs(5, 5, HERE "FD " << fd << ": * - try again");
{
AsyncCall::Pointer call;
debugs(5, 3, HERE << "FD " << fd << " timeout " << timeout);
- if(handler != NULL)
- call=commCbCall(5,4, "SomeTimeoutHandler", CommTimeoutCbPtrFun(handler, data));
+ if (handler != NULL)
+ call=commCbCall(5,4, "SomeTimeoutHandler", CommTimeoutCbPtrFun(handler, data));
else
- call = NULL;
+ call = NULL;
return commSetTimeout(fd, timeout, call);
}
F->timeout = 0;
} else {
if (callback != NULL) {
- typedef CommTimeoutCbParams Params;
- Params ¶ms = GetCommParams<Params>(callback);
- params.fd = fd;
+ typedef CommTimeoutCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
F->timeoutHandler = callback;
}
}
int
-comm_connect_addr(int sock, const IPAddress &address)
+comm_connect_addr(int sock, const Ip::Address &address)
{
comm_err_t status = COMM_OK;
fde *F = &fd_table[sock];
assert(address.GetPort() != 0);
- debugs(5, 9, "comm_connect_addr: connecting socket " << sock << " to " << address << " (want family: " << F->sock_family << ")");
+ debugs(5, 9, HERE << "connecting socket FD " << sock << " to " << address << " (want family: " << F->sock_family << ")");
- /* BUG 2222 FIX: reset the FD when its found to be IPv4 in IPv6 mode */
- /* inverse case of IPv4 failing to connect on IPv6 socket is handeld post-connect.
+#if USE_IPV6
+ /* Handle IPv6 over IPv4-only socket case.
* this case must presently be handled here since the GetAddrInfo asserts on bad mappings.
- * eventually we want it to throw a Must() that gets handled there instead of this if.
- * NP: because commresetFD is private to ConnStateData we have to return an error and
+ * NP: because commResetFD is private to ConnStateData we have to return an error and
* trust its handled properly.
*/
-#if USE_IPV6
- if(F->sock_family == AF_INET && !address.IsIPv4()) {
+ if (F->sock_family == AF_INET && !address.IsIPv4()) {
+ errno = ENETUNREACH;
return COMM_ERR_PROTOCOL;
}
-#endif
+
+ /* Handle IPv4 over IPv6-only socket case.
+ * This case is presently handled here as it's both a known case and it's
+ * uncertain what error will be returned by the IPv6 stack in such case. It's
+ * possible this will also be handled by the errno checks below after connect()
+ * but needs carefull cross-platform verification, and verifying the address
+ * condition here is simple.
+ */
+ if (!F->local_addr.IsIPv4() && address.IsIPv4()) {
+ errno = ENETUNREACH;
+ return COMM_ERR_PROTOCOL;
+ }
+#endif /* USE_IPV6 */
address.GetAddrInfo(AI, F->sock_family);
/* Establish connection. */
errno = 0;
- if (!F->flags.called_connect)
- {
+ if (!F->flags.called_connect) {
F->flags.called_connect = 1;
statCounter.syscalls.sock.connects++;
errno = EINPROGRESS;
}
- if (x < 0)
- {
+ if (x < 0) {
debugs(5,5, "comm_connect_addr: sock=" << sock << ", addrinfo( " <<
- " flags=" << AI->ai_flags <<
- ", family=" << AI->ai_family <<
- ", socktype=" << AI->ai_socktype <<
- ", protocol=" << AI->ai_protocol <<
- ", &addr=" << AI->ai_addr <<
- ", addrlen=" << AI->ai_addrlen <<
- " )" );
+ " flags=" << AI->ai_flags <<
+ ", family=" << AI->ai_family <<
+ ", socktype=" << AI->ai_socktype <<
+ ", protocol=" << AI->ai_protocol <<
+ ", &addr=" << AI->ai_addr <<
+ ", addrlen=" << AI->ai_addrlen <<
+ " )" );
debugs(5, 9, "connect FD " << sock << ": (" << x << ") " << xstrerror());
debugs(14,9, "connecting to: " << address );
}
- } else
- {
+ } else {
#if defined(_SQUID_NEWSOS6_)
/* Makoto MATSUSHITA <matusita@ics.es.osaka-u.ac.jp> */
}
-/* Squid seems to be working fine without this code. With this code,
- * we leak memory on many connect requests because of EINPROGRESS.
- * If you find that this code is needed, please file a bug report. */
+ /* Squid seems to be working fine without this code. With this code,
+ * we leak memory on many connect requests because of EINPROGRESS.
+ * If you find that this code is needed, please file a bug report. */
#if 0
#ifdef _SQUID_LINUX_
/* 2007-11-27:
- * Linux Debian replaces our allocated AI pointer with garbage when
+ * Linux Debian replaces our allocated AI pointer with garbage when
* connect() fails. This leads to segmentation faults deallocating
* the system-allocated memory when we go to clean up our pointer.
* HACK: is to leak the memory returned since we can't deallocate.
*/
- if(errno != 0) {
+ if (errno != 0) {
AI = NULL;
}
#endif
status = COMM_OK;
else if (ignoreErrno(errno))
status = COMM_INPROGRESS;
+ else if (errno == EAFNOSUPPORT || errno == EINVAL)
+ return COMM_ERR_PROTOCOL;
else
-#if USE_IPV6
- if( address.IsIPv4() && F->sock_family == AF_INET6 ) {
-
- /* failover to trying IPv4-only link if an IPv6 one fails */
- /* to catch the edge case of apps listening on IPv4-localhost */
- F->sock_family = AF_INET;
- int res = comm_connect_addr(sock, address);
-
- /* if that fails too, undo our temporary socktype hack so the repeat works properly. */
- if(res == COMM_ERROR)
- F->sock_family = AF_INET6;
-
- return res;
- }
- else
-#endif
return COMM_ERROR;
address.NtoA(F->ipaddr, MAX_IPSTRLEN);
F->remote_port = address.GetPort(); /* remote_port is HS */
- if (status == COMM_OK)
- {
+ if (status == COMM_OK) {
debugs(5, 10, "comm_connect_addr: FD " << sock << " connected to " << address);
- } else if (status == COMM_INPROGRESS)
- {
+ } else if (status == COMM_INPROGRESS) {
debugs(5, 10, "comm_connect_addr: FD " << sock << " connection pending");
}
return status;
}
-/* Wait for an incoming connection on FD. FD should be a socket returned
- * from comm_listen. */
-static int
-comm_old_accept(int fd, ConnectionDetail &details)
-{
- PROF_start(comm_accept);
- statCounter.syscalls.sock.accepts++;
- int sock;
- struct addrinfo *gai = NULL;
- details.me.InitAddrInfo(gai);
-
- if ((sock = accept(fd, gai->ai_addr, &gai->ai_addrlen)) < 0) {
-
- details.me.FreeAddrInfo(gai);
-
- PROF_stop(comm_accept);
-
- if (ignoreErrno(errno))
- {
- debugs(50, 5, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_NOMESSAGE;
- } else if (ENFILE == errno || EMFILE == errno)
- {
- debugs(50, 3, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- } else
- {
- debugs(50, 1, "comm_old_accept: FD " << fd << ": " << xstrerror());
- return COMM_ERROR;
- }
- }
-
- details.peer = *gai;
-
- details.me.InitAddrInfo(gai);
-
- details.me.SetEmpty();
- getsockname(sock, gai->ai_addr, &gai->ai_addrlen);
- details.me = *gai;
-
- commSetCloseOnExec(sock);
-
- /* fdstat update */
- fd_open(sock, FD_SOCKET, "HTTP Request");
- fdd_table[sock].close_file = NULL;
- fdd_table[sock].close_line = 0;
- fde *F = &fd_table[sock];
- details.peer.NtoA(F->ipaddr,MAX_IPSTRLEN);
- F->remote_port = details.peer.GetPort();
- F->local_addr.SetPort(details.me.GetPort());
-#if USE_IPV6
- F->sock_family = AF_INET;
-#else
- F->sock_family = details.me.IsIPv4()?AF_INET:AF_INET6;
-#endif
- details.me.FreeAddrInfo(gai);
-
- commSetNonBlocking(sock);
-
- /* IFF the socket is (tproxy) transparent, pass the flag down to allow spoofing */
- F->flags.transparent = fd_table[fd].flags.transparent;
-
- PROF_stop(comm_accept);
- return sock;
-}
-
void
commCallCloseHandlers(int fd)
{
while (F->closeHandler != NULL) {
AsyncCall::Pointer call = F->closeHandler;
- F->closeHandler = call->Next();
- call->setNext(NULL);
- // If call is not canceled schedule it for execution else ignore it
- if(!call->canceled()){
- debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
- typedef CommCloseCbParams Params;
- Params ¶ms = GetCommParams<Params>(call);
- params.fd = fd;
- ScheduleCallHere(call);
- }
+ F->closeHandler = call->Next();
+ call->setNext(NULL);
+ // If call is not canceled schedule it for execution else ignore it
+ if (!call->canceled()) {
+ debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
+ typedef CommCloseCbParams Params;
+ Params ¶ms = GetCommParams<Params>(call);
+ params.fd = fd;
+ ScheduleCallHere(call);
+ }
}
}
L.l_linger = 0;
if (setsockopt(fd, SOL_SOCKET, SO_LINGER, (char *) &L, sizeof(L)) < 0)
- debugs(50, 0, "commResetTCPClose: FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_CRITICAL, "ERROR: Closing FD " << fd << " with TCP RST: " << xstrerror());
comm_close(fd);
}
-void
+void
comm_close_start(int fd, void *data)
{
#if USE_SSL
}
-
-void
+void
comm_close_complete(int fd, void *data)
{
#if USE_SSL
close(fd);
- fdc_table[fd] = AcceptFD(fd);
-
statCounter.syscalls.sock.closes++;
/* When an fd closes, give accept() a chance, if need be */
-
- if (fdNFree() >= RESERVED_FD)
- AcceptLimiter::Instance().kick();
-
+ Comm::AcceptLimiter::Instance().kick();
}
/*
* + call read handlers with ERR_CLOSING
* + call closing handlers
*
- * NOTE: COMM_ERR_CLOSING will NOT be called for CommReads' sitting in a
+ * NOTE: COMM_ERR_CLOSING will NOT be called for CommReads' sitting in a
* DeferredReadManager.
*/
void
if (F->closing())
return;
- if (shutting_down && (!F->flags.open || F->type == FD_FILE))
+ /* XXX: is this obsolete behind F->closing() ? */
+ if ( (shutting_down || reconfiguring) && (!F->flags.open || F->type == FD_FILE))
return;
/* The following fails because ipc.c is doing calls to pipe() to create sockets! */
F->flags.close_request = 1;
AsyncCall::Pointer startCall=commCbCall(5,4, "comm_close_start",
- CommCloseCbPtrFun(comm_close_start, NULL));
+ CommCloseCbPtrFun(comm_close_start, NULL));
typedef CommCloseCbParams Params;
Params &startParams = GetCommParams<Params>(startCall);
startParams.fd = fd;
commio_finish_callback(fd, COMMIO_FD_READCB(fd), COMM_ERR_CLOSING, errno);
}
- // notify accept handlers
- fdc_table[fd].notify(-1, COMM_ERR_CLOSING, 0, ConnectionDetail());
-
commCallCloseHandlers(fd);
if (F->pconn.uses)
F->pconn.pool->count(F->pconn.uses);
comm_empty_os_read_buffers(fd);
-
+
AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
- CommCloseCbPtrFun(comm_close_complete, NULL));
+ CommCloseCbPtrFun(comm_close_complete, NULL));
Params &completeParams = GetCommParams<Params>(completeCall);
completeParams.fd = fd;
- // must use async call to wait for all callbacks
+ // must use async call to wait for all callbacks
// scheduled before comm_close() to finish
ScheduleCallHere(completeCall);
/* Send a udp datagram to specified TO_ADDR. */
int
comm_udp_sendto(int fd,
- const IPAddress &to_addr,
+ const Ip::Address &to_addr,
const void *buf,
int len)
{
statCounter.syscalls.sock.sendtos++;
debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
- " using FD " << fd << " using Port " << comm_local_port(fd) );
+ " using FD " << fd << " using Port " << comm_local_port(fd) );
/* BUG: something in the above macro appears to occasionally be setting AI to garbage. */
/* AYJ: 2007-08-27 : or was it because I wasn't then setting 'fd_table[fd].sock_family' to fill properly. */
handler << ", data=" << data);
AsyncCall::Pointer call=commCbCall(5,4, "SomeCloseHandler",
- CommCloseCbPtrFun(handler, data));
+ CommCloseCbPtrFun(handler, data));
comm_add_close_handler(fd, call);
}
handler << ", data=" << data);
AsyncCall::Pointer p;
- for (p = fd_table[fd].closeHandler; p != NULL; p = p->Next()){
+ for (p = fd_table[fd].closeHandler; p != NULL; p = p->Next()) {
typedef CommCbFunPtrCallT<CommCloseCbPtrFun> Call;
const Call *call = dynamic_cast<const Call*>(p.getRaw());
if (!call) // method callbacks have their own comm_remove_close_handler
if (call->dialer.handler == handler && params.data == data)
break; /* This is our handler */
}
- assert(p != NULL);
- p->cancel("comm_remove_close_handler");
+
+ // comm_close removes all close handlers so our handler may be gone
+ if (p != NULL)
+ p->cancel("comm_remove_close_handler");
+ // TODO: should we remove the handler from the close handlers list?
}
// remove method-based close handler
comm_remove_close_handler(int fd, AsyncCall::Pointer &call)
{
assert (isOpen(fd));
- /* Find handler in list */
debugs(5, 5, "comm_remove_close_handler: FD " << fd << ", AsyncCall=" << call);
+ // comm_close removes all close handlers so our handler may be gone
+ // TODO: should we remove the handler from the close handlers list?
+#if 0
// Check to see if really exist the given AsyncCall in comm_close handlers
// TODO: optimize: this slow code is only needed for the assert() below
AsyncCall::Pointer p;
for (p = fd_table[fd].closeHandler; p != NULL && p != call; p = p->Next());
assert(p == call);
+#endif
call->cancel("comm_remove_close_handler");
}
}
void
-commSetCloseOnExec(int fd) {
+commSetCloseOnExec(int fd)
+{
#ifdef FD_CLOEXEC
int flags;
int dummy = 0;
#ifdef TCP_NODELAY
static void
-commSetTcpNoDelay(int fd) {
+commSetTcpNoDelay(int fd)
+{
int on = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)) < 0)
int on = 1;
#ifdef TCP_KEEPCNT
if (timeout && interval) {
- int count = (timeout + interval - 1) / interval;
- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ int count = (timeout + interval - 1) / interval;
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(on)) < 0)
+ debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
#ifdef TCP_KEEPIDLE
if (idle) {
- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(on)) < 0)
+ debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
#ifdef TCP_KEEPINTVL
if (interval) {
- if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(on)) < 0)
+ debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
void
-comm_init(void) {
+comm_init(void)
+{
fd_table =(fde *) xcalloc(Squid_MaxFD, sizeof(fde));
fdd_table = (fd_debug_t *)xcalloc(Squid_MaxFD, sizeof(fd_debug_t));
- fdc_table = new AcceptFD[Squid_MaxFD];
- for (int pos = 0; pos < Squid_MaxFD; ++pos) {
- fdc_table[pos] = AcceptFD(pos);
- }
+ /* make sure the accept() socket FIFO delay queue exists */
+ Comm::AcceptLimiter::Instance();
commfd_table = (comm_fd_t *) xcalloc(Squid_MaxFD, sizeof(comm_fd_t));
for (int pos = 0; pos < Squid_MaxFD; pos++) {
- commfd_table[pos].fd = pos;
- commfd_table[pos].readcb.fd = pos;
- commfd_table[pos].readcb.type = IOCB_READ;
- commfd_table[pos].writecb.fd = pos;
- commfd_table[pos].writecb.type = IOCB_WRITE;
+ commfd_table[pos].fd = pos;
+ commfd_table[pos].readcb.fd = pos;
+ commfd_table[pos].readcb.type = IOCB_READ;
+ commfd_table[pos].writecb.fd = pos;
+ commfd_table[pos].writecb.type = IOCB_WRITE;
}
/* XXX account fd_table */
/* Keep a few file descriptors free so that we don't run out of FD's
* after accepting a client but before it opens a socket or a file.
* Since Squid_MaxFD can be as high as several thousand, don't waste them */
- RESERVED_FD = XMIN(100, Squid_MaxFD / 4);
+ RESERVED_FD = min(100, Squid_MaxFD / 4);
conn_close_pool = memPoolCreate("close_handler", sizeof(close_handler));
}
void
-comm_exit(void) {
+comm_exit(void)
+{
delete TheHalfClosed;
TheHalfClosed = NULL;
safe_free(fd_table);
safe_free(fdd_table);
- if (fdc_table) {
- delete[] fdc_table;
- fdc_table = NULL;
- }
safe_free(commfd_table);
}
/* Write to FD. */
static void
-commHandleWrite(int fd, void *data) {
+commHandleWrite(int fd, void *data)
+{
comm_io_callback_t *state = (comm_io_callback_t *)data;
int len = 0;
int nleft;
debugs(5, 5, "commHandleWrite: write() returns " << len);
fd_bytes(fd, len, FD_WRITE);
statCounter.syscalls.sock.writes++;
+ // After each successful partial write,
+ // reset fde::writeStart to the current time.
+ fd_table[fd].writeStart = squid_curtime;
if (len == 0) {
/* Note we even call write if nleft == 0 */
comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
{
AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
- CommIoCbPtrFun(handler, handler_data));
+ CommIoCbPtrFun(handler, handler_data));
comm_write(fd, buf, size, call, free_func);
}
comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
assert(!ccb->active());
+ fd_table[fd].writeStart = squid_curtime;
/* Queue the write */
commio_set_callback(fd, IOCB_WRITE, ccb, callback,
- (char *)buf, free_func, size);
+ (char *)buf, free_func, size);
commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
}
/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
void
-comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data) {
+comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
+{
comm_write(fd, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
}
void
-comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback) {
+comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback)
+{
comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
}
* like to use it.
*/
int
-ignoreErrno(int ierrno) {
+ignoreErrno(int ierrno)
+{
switch (ierrno) {
case EINPROGRESS:
}
void
-commCloseAllSockets(void) {
+commCloseAllSockets(void)
+{
int fd;
fde *F = NULL;
AsyncCall::Pointer callback = F->timeoutHandler;
F->timeoutHandler = NULL;
debugs(5, 5, "commCloseAllSockets: FD " << fd << ": Calling timeout handler");
- ScheduleCallHere(callback);
+ ScheduleCallHere(callback);
} else {
- debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_close()");
- comm_close(fd);
+ debugs(5, 5, "commCloseAllSockets: FD " << fd << ": calling comm_reset_close()");
+ comm_reset_close(fd);
}
}
}
static bool
-AlreadyTimedOut(fde *F) {
+AlreadyTimedOut(fde *F)
+{
if (!F->flags.open)
return true;
return false;
}
+static bool
+writeTimedOut(int fd)
+{
+ if (!commio_has_callback(fd, IOCB_WRITE, COMMIO_FD_WRITECB(fd)))
+ return false;
+
+ if ((squid_curtime - fd_table[fd].writeStart) < Config.Timeout.write)
+ return false;
+
+ return true;
+}
+
void
-checkTimeouts(void) {
+checkTimeouts(void)
+{
int fd;
fde *F = NULL;
AsyncCall::Pointer callback;
for (fd = 0; fd <= Biggest_FD; fd++) {
F = &fd_table[fd];
- if (AlreadyTimedOut(F))
+ if (writeTimedOut(fd)) {
+ // We have an active write callback and we are timed out
+ commio_finish_callback(fd, COMMIO_FD_WRITECB(fd), COMM_ERROR, ETIMEDOUT);
+ } else if (AlreadyTimedOut(F))
continue;
- debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
+ debugs(5, 5, "checkTimeouts: FD " << fd << " Expired");
if (F->timeoutHandler != NULL) {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Call timeout handler");
callback = F->timeoutHandler;
F->timeoutHandler = NULL;
- ScheduleCallHere(callback);
+ ScheduleCallHere(callback);
} else {
debugs(5, 5, "checkTimeouts: FD " << fd << ": Forcing comm_close()");
comm_close(fd);
}
}
-/*
- * New-style listen and accept routines
- *
- * Listen simply registers our interest in an FD for listening,
- * and accept takes a callback to call when an FD has been
- * accept()ed.
- */
-int
-comm_listen(int sock) {
- int x;
-
- if ((x = listen(sock, Squid_MaxFD >> 2)) < 0) {
- debugs(50, 0, "comm_listen: listen(" << (Squid_MaxFD >> 2) << ", " << sock << "): " << xstrerror());
- return x;
- }
-
- if (Config.accept_filter && strcmp(Config.accept_filter, "none") != 0) {
-#ifdef SO_ACCEPTFILTER
- struct accept_filter_arg afa;
- bzero(&afa, sizeof(afa));
- debugs(5, DBG_CRITICAL, "Installing accept filter '" << Config.accept_filter << "' on FD " << sock);
- xstrncpy(afa.af_name, Config.accept_filter, sizeof(afa.af_name));
- x = setsockopt(sock, SOL_SOCKET, SO_ACCEPTFILTER, &afa, sizeof(afa));
- if (x < 0)
- debugs(5, 0, "SO_ACCEPTFILTER '" << Config.accept_filter << "': '" << xstrerror());
-#elif defined(TCP_DEFER_ACCEPT)
- int seconds = 30;
- if (strncmp(Config.accept_filter, "data=", 5) == 0)
- seconds = atoi(Config.accept_filter + 5);
- x = setsockopt(sock, IPPROTO_TCP, TCP_DEFER_ACCEPT, &seconds, sizeof(seconds));
- if (x < 0)
- debugs(5, 0, "TCP_DEFER_ACCEPT '" << Config.accept_filter << "': '" << xstrerror());
-#else
- debugs(5, 0, "accept_filter not supported on your OS");
-#endif
- }
-
- return sock;
-}
-
-void
-comm_accept(int fd, IOACB *handler, void *handler_data) {
- debugs(5, 5, "comm_accept: FD " << fd << " handler: " << (void*)handler);
- assert(isOpen(fd));
-
- AsyncCall::Pointer call = commCbCall(5,5, "SomeCommAcceptHandler",
- CommAcceptCbPtrFun(handler, handler_data));
- fdc_table[fd].subscribe(call);
-}
-
-void
-comm_accept(int fd, AsyncCall::Pointer &call) {
- debugs(5, 5, "comm_accept: FD " << fd << " AsyncCall: " << call);
- assert(isOpen(fd));
-
- fdc_table[fd].subscribe(call);
-}
-
-// Called when somebody wants to be notified when our socket accepts new
-// connection. We do not probe the FD until there is such interest.
-void
-AcceptFD::subscribe(AsyncCall::Pointer &call) {
- /* make sure we're not pending! */
- assert(!theCallback);
- theCallback = call;
-
-#if OPTIMISTIC_IO
- mayAcceptMore = true; // even if we failed to accept last time
-#endif
-
- if (mayAcceptMore)
- acceptNext();
- else
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
-}
-
-bool
-AcceptFD::acceptOne() {
- // If there is no callback and we accept, we will leak the accepted FD.
- // When we are running out of FDs, there is often no callback.
- if (!theCallback) {
- debugs(5, 5, "AcceptFD::acceptOne orphaned: FD " << fd);
- // XXX: can we remove this and similar "just in case" calls and
- // either listen always or listen only when there is a callback?
- if (!AcceptLimiter::Instance().deferring())
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- /*
- * We don't worry about running low on FDs here. Instead,
- * httpAccept() will use AcceptLimiter if we reach the limit
- * there.
- */
-
- /* Accept a new connection */
- ConnectionDetail connDetails;
- int newfd = comm_old_accept(fd, connDetails);
-
- /* Check for errors */
-
- if (newfd < 0) {
- assert(theCallback != NULL);
-
- if (newfd == COMM_NOMESSAGE) {
- /* register interest again */
- debugs(5, 5, HERE << "try later: FD " << fd <<
- " handler: " << *theCallback);
- commSetSelect(fd, COMM_SELECT_READ, comm_accept_try, NULL, 0);
- return false;
- }
-
- // A non-recoverable error; notify the caller */
- notify(-1, COMM_ERROR, errno, connDetails);
- return false;
- }
-
- assert(theCallback != NULL);
- debugs(5, 5, "AcceptFD::acceptOne accepted: FD " << fd <<
- " newfd: " << newfd << " from: " << connDetails.peer <<
- " handler: " << *theCallback);
- notify(newfd, COMM_OK, 0, connDetails);
- return true;
-}
-
-void
-AcceptFD::acceptNext() {
- mayAcceptMore = acceptOne();
-}
-
-void
-AcceptFD::notify(int newfd, comm_err_t errcode, int xerrno, const ConnectionDetail &connDetails)
+void CommIO::Initialise()
{
- if (theCallback != NULL) {
- typedef CommAcceptCbParams Params;
- Params ¶ms = GetCommParams<Params>(theCallback);
- params.fd = fd;
- params.nfd = newfd;
- params.details = connDetails;
- params.flag = errcode;
- params.xerrno = xerrno;
- ScheduleCallHere(theCallback);
- theCallback = NULL;
- }
-}
-
-/*
- * This callback is called whenever a filedescriptor is ready
- * to dupe itself and fob off an accept()ed connection
- */
-static void
-comm_accept_try(int fd, void *) {
- assert(isOpen(fd));
- fdc_table[fd].acceptNext();
-}
-
-void CommIO::Initialise() {
/* Initialize done pipe signal */
int DonePipe[2];
- if(pipe(DonePipe)) {}
+ if (pipe(DonePipe)) {}
DoneFD = DonePipe[1];
DoneReadFD = DonePipe[0];
fd_open(DoneReadFD, FD_PIPE, "async-io completetion event: main");
Initialised = true;
}
-void CommIO::NotifyIOClose() {
+void CommIO::NotifyIOClose()
+{
/* Close done pipe signal */
FlushPipe();
close(DoneFD);
int CommIO::DoneReadFD = -1;
void
-CommIO::FlushPipe() {
+CommIO::FlushPipe()
+{
char buf[256];
FD_READ_METHOD(DoneReadFD, buf, sizeof(buf));
}
void
-CommIO::NULLFDHandler(int fd, void *data) {
+CommIO::NULLFDHandler(int fd, void *data)
+{
FlushPipe();
commSetSelect(fd, COMM_SELECT_READ, NULLFDHandler, NULL, 0);
}
void
-CommIO::ResetNotifications() {
+CommIO::ResetNotifications()
+{
if (DoneSignalled) {
FlushPipe();
DoneSignalled = false;
}
}
-AcceptLimiter AcceptLimiter::Instance_;
-
-AcceptLimiter &AcceptLimiter::Instance() {
- return Instance_;
-}
-
-bool
-AcceptLimiter::deferring() const {
- return deferred.size() > 0;
-}
-
-void
-AcceptLimiter::defer (int fd, Acceptor::AcceptorFunction *aFunc, void *data) {
- debugs(5, 5, "AcceptLimiter::defer: FD " << fd << " handler: " << (void*)aFunc);
- Acceptor temp;
- temp.theFunction = aFunc;
- temp.acceptFD = fd;
- temp.theData = data;
- deferred.push_back(temp);
-}
-
-void
-AcceptLimiter::kick() {
- if (!deferring())
- return;
-
- /* Yes, this means the first on is the last off....
- * If the list container was a little more friendly, we could sensibly us it.
- */
- Acceptor temp = deferred.pop_back();
-
- comm_accept (temp.acceptFD, temp.theFunction, temp.theData);
-}
-
-/// Start waiting for a possibly half-closed connection to close
-// by scheduling a read callback to a monitoring handler that
+/// Start waiting for a possibly half-closed connection to close
+// by scheduling a read callback to a monitoring handler that
// will close the connection on read errors.
void
-commStartHalfClosedMonitor(int fd) {
+commStartHalfClosedMonitor(int fd)
+{
debugs(5, 5, HERE << "adding FD " << fd << " to " << *TheHalfClosed);
assert(isOpen(fd));
assert(!commHasHalfClosedMonitor(fd));
/// calls comm_read for those that do; re-schedules the check if needed
static
void
-commHalfClosedCheck(void *) {
+commHalfClosedCheck(void *)
+{
debugs(5, 5, HERE << "checking " << *TheHalfClosed);
typedef DescriptorSet::const_iterator DSCI;
const int fd = *i;
if (!fd_table[fd].halfClosedReader) { // not reading already
AsyncCall::Pointer call = commCbCall(5,4, "commHalfClosedReader",
- CommIoCbPtrFun(&commHalfClosedReader, NULL));
+ CommIoCbPtrFun(&commHalfClosedReader, NULL));
comm_read(fd, NULL, 0, call);
fd_table[fd].halfClosedReader = call;
}
/// checks whether we are waiting for possibly half-closed connection to close
// We are monitoring if the read handler for the fd is the monitoring handler.
bool
-commHasHalfClosedMonitor(int fd) {
+commHasHalfClosedMonitor(int fd)
+{
return TheHalfClosed->has(fd);
}
/// stop waiting for possibly half-closed connection to close
static void
-commStopHalfClosedMonitor(int const fd) {
+commStopHalfClosedMonitor(int const fd)
+{
debugs(5, 5, HERE << "removing FD " << fd << " from " << *TheHalfClosed);
// cancel the read if one was scheduled
/// I/O handler for the possibly half-closed connection monitoring code
static void
-commHalfClosedReader(int fd, char *, size_t size, comm_err_t flag, int, void *) {
+commHalfClosedReader(int fd, char *, size_t size, comm_err_t flag, int, void *)
+{
// there cannot be more data coming in on half-closed connections
- assert(size == 0);
+ assert(size == 0);
assert(commHasHalfClosedMonitor(fd)); // or we would have canceled the read
fd_table[fd].halfClosedReader = NULL; // done reading, for now
DeferredRead::DeferredRead (DeferrableRead *aReader, void *data, CommRead const &aRead) : theReader(aReader), theContext (data), theRead(aRead), cancelled(false) {}
-DeferredReadManager::~DeferredReadManager() {
+DeferredReadManager::~DeferredReadManager()
+{
flushReads();
assert (deferredReads.empty());
}
/// \endcond
void
-DeferredReadManager::delayRead(DeferredRead const &aRead) {
+DeferredReadManager::delayRead(DeferredRead const &aRead)
+{
debugs(5, 3, "Adding deferred read on FD " << aRead.theRead.fd);
CbDataList<DeferredRead> *temp = deferredReads.push_back(aRead);
- // We have to use a global function as a closer and point to temp
+ // We have to use a global function as a closer and point to temp
// instead of "this" because DeferredReadManager is not a job and
// is not even cbdata protected
AsyncCall::Pointer closer = commCbCall(5,4,
- "DeferredReadManager::CloseHandler",
- CommCloseCbPtrFun(&CloseHandler, temp));
+ "DeferredReadManager::CloseHandler",
+ CommCloseCbPtrFun(&CloseHandler, temp));
comm_add_close_handler(aRead.theRead.fd, closer);
temp->element.closer = closer; // remeber so that we can cancel
}
void
-DeferredReadManager::CloseHandler(int fd, void *thecbdata) {
+DeferredReadManager::CloseHandler(int fd, void *thecbdata)
+{
if (!cbdataReferenceValid (thecbdata))
return;
}
DeferredRead
-DeferredReadManager::popHead(CbDataListContainer<DeferredRead> &deferredReads) {
+DeferredReadManager::popHead(CbDataListContainer<DeferredRead> &deferredReads)
+{
assert (!deferredReads.empty());
DeferredRead &read = deferredReads.head->element;
}
void
-DeferredReadManager::kickReads(int const count) {
+DeferredReadManager::kickReads(int const count)
+{
/* if we had CbDataList::size() we could consolidate this and flushReads */
if (count < 1) {
}
void
-DeferredReadManager::flushReads() {
+DeferredReadManager::flushReads()
+{
CbDataListContainer<DeferredRead> reads;
reads = deferredReads;
deferredReads = CbDataListContainer<DeferredRead>();
+ // XXX: For fairness this SHOULD randomize the order
while (!reads.empty()) {
DeferredRead aRead = popHead(reads);
kickARead(aRead);
}
void
-DeferredReadManager::kickARead(DeferredRead const &aRead) {
+DeferredReadManager::kickARead(DeferredRead const &aRead)
+{
if (aRead.cancelled)
return;
+ if (aRead.theRead.fd>=0 && fd_table[aRead.theRead.fd].closing())
+ return;
+
debugs(5, 3, "Kicking deferred read on FD " << aRead.theRead.fd);
aRead.theReader(aRead.theContext, aRead.theRead);
}
void
-DeferredRead::markCancelled() {
+DeferredRead::markCancelled()
+{
cancelled = true;
}
-ConnectionDetail::ConnectionDetail() : me(), peer() {
+ConnectionDetail::ConnectionDetail() : me(), peer()
+{
}
int
-CommSelectEngine::checkEvents(int timeout) {
+CommSelectEngine::checkEvents(int timeout)
+{
static time_t last_timeout = 0;
/* No, this shouldn't be here. But it shouldn't be in each comm handler. -adrian */
return EVENT_ERROR;
};
}
+
+/// Create a unix-domain socket (UDS) that only supports FD_MSGHDR I/O.
+int
+comm_open_uds(int sock_type,
+ int proto,
+ struct sockaddr_un* addr,
+ int flags)
+{
+ // TODO: merge with comm_openex() when Ip::Address becomes NetAddress
+
+ int new_socket;
+
+ PROF_start(comm_open);
+ /* Create socket for accepting new connections. */
+ statCounter.syscalls.sock.sockets++;
+
+ /* Setup the socket addrinfo details for use */
+ struct addrinfo AI;
+ AI.ai_flags = 0;
+ AI.ai_family = PF_UNIX;
+ AI.ai_socktype = sock_type;
+ AI.ai_protocol = proto;
+ AI.ai_addrlen = SUN_LEN(addr);
+ AI.ai_addr = (sockaddr*)addr;
+ AI.ai_canonname = NULL;
+ AI.ai_next = NULL;
+
+ debugs(50, 3, HERE << "Attempt open socket for: " << addr->sun_path);
+
+ if ((new_socket = socket(AI.ai_family, AI.ai_socktype, AI.ai_protocol)) < 0) {
+ /* Increase the number of reserved fd's if calls to socket()
+ * are failing because the open file table is full. This
+ * limits the number of simultaneous clients */
+
+ if (limitError(errno)) {
+ debugs(50, DBG_IMPORTANT, HERE << "socket failure: " << xstrerror());
+ fdAdjustReserved();
+ } else {
+ debugs(50, DBG_CRITICAL, HERE << "socket failure: " << xstrerror());
+ }
+
+ PROF_stop(comm_open);
+ return -1;
+ }
+
+ debugs(50, 3, HERE "Opened UDS FD " << new_socket << " : family=" << AI.ai_family << ", type=" << AI.ai_socktype << ", protocol=" << AI.ai_protocol);
+
+ /* update fdstat */
+ debugs(50, 5, HERE << "FD " << new_socket << " is a new socket");
+
+ assert(!isOpen(new_socket));
+ fd_open(new_socket, FD_MSGHDR, NULL);
+
+ fdd_table[new_socket].close_file = NULL;
+
+ fdd_table[new_socket].close_line = 0;
+
+ fd_table[new_socket].sock_family = AI.ai_family;
+
+ if (!(flags & COMM_NOCLOEXEC))
+ commSetCloseOnExec(new_socket);
+
+ if (flags & COMM_REUSEADDR)
+ commSetReuseAddr(new_socket);
+
+ if (flags & COMM_NONBLOCKING) {
+ if (commSetNonBlocking(new_socket) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+ if (flags & COMM_DOBIND) {
+ if (commBind(new_socket, AI) != COMM_OK) {
+ comm_close(new_socket);
+ PROF_stop(comm_open);
+ return -1;
+ }
+ }
+
+#ifdef TCP_NODELAY
+ if (sock_type == SOCK_STREAM)
+ commSetTcpNoDelay(new_socket);
+
+#endif
+
+ if (Config.tcpRcvBufsz > 0 && sock_type == SOCK_STREAM)
+ commSetTcpRcvbuf(new_socket, Config.tcpRcvBufsz);
+
+ PROF_stop(comm_open);
+
+ return new_socket;
+}