#include "squid.h"
#include "base/AsyncCall.h"
-#include "StoreIOBuffer.h"
#include "comm.h"
-#include "event.h"
-#include "fde.h"
+#include "ClientInfo.h"
+#include "CommCalls.h"
#include "comm/AcceptLimiter.h"
#include "comm/comm_internal.h"
#include "comm/Connection.h"
#include "comm/Loops.h"
#include "comm/Write.h"
#include "comm/TcpAcceptor.h"
-#include "CommIO.h"
#include "CommRead.h"
-#include "MemBuf.h"
-#include "pconn.h"
-#include "SquidTime.h"
-#include "CommCalls.h"
+#include "compat/cmsg.h"
#include "DescriptorSet.h"
+#include "event.h"
+#include "fde.h"
+#include "globals.h"
#include "icmp/net_db.h"
#include "ip/Address.h"
#include "ip/Intercept.h"
#include "ip/QosConfig.h"
#include "ip/tools.h"
-#include "ClientInfo.h"
+#include "MemBuf.h"
+#include "pconn.h"
+#include "protos.h"
+#include "profiler/Profiler.h"
+#include "SquidTime.h"
+#include "StatCounters.h"
+#include "StoreIOBuffer.h"
#if USE_SSL
#include "ssl/support.h"
#endif
#ifdef HAVE_NETINET_TCP_H
#include <netinet/tcp.h>
#endif
+#if HAVE_SYS_UN_H
+#include <sys/un.h>
+#endif
+#if HAVE_MATH_H
+#include <math.h>
+#endif
+#if HAVE_ERRNO_H
+#include <errno.h>
+#endif
/*
* New C-like simple comm code. This stuff is a mess and doesn't really buy us anything.
assert(data == COMMIO_FD_READCB(fd));
assert(ccb->active());
/* Attempt a read */
- statCounter.syscalls.sock.reads++;
+ ++ statCounter.syscalls.sock.reads;
errno = 0;
int retval;
retval = FD_READ_METHOD(fd, ccb->buf, ccb->size);
#endif
}
-
/**
* Return whether the FD has a pending completed callback.
* NP: does not work.
Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
}
-
/**
* synchronous wrapper around udp socket functions
*/
int
comm_udp_recvfrom(int fd, void *buf, size_t len, int flags, Ip::Address &from)
{
- statCounter.syscalls.sock.recvfroms++;
+ ++ statCounter.syscalls.sock.recvfroms;
int x = 0;
struct addrinfo *AI = NULL;
return send(s, buf, len, flags);
}
-
bool
comm_has_incomplete_write(int fd)
{
temp.InitAddrInfo(addr);
if (getsockname(fd, addr->ai_addr, &(addr->ai_addrlen)) ) {
- debugs(50, 1, "comm_local_port: Failed to retrieve TCP/UDP port number for socket: FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "comm_local_port: Failed to retrieve TCP/UDP port number for socket: FD " << fd << ": " << xstrerror());
temp.FreeAddrInfo(addr);
return 0;
}
static comm_err_t
commBind(int s, struct addrinfo &inaddr)
{
- statCounter.syscalls.sock.binds++;
+ ++ statCounter.syscalls.sock.binds;
if (bind(s, inaddr.ai_addr, inaddr.ai_addrlen) == 0) {
debugs(50, 6, "commBind: bind socket FD " << s << " to " << fd_table[s].local_addr);
{
#ifdef IPV6_V6ONLY
if (setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &tos, sizeof(int)) < 0) {
- debugs(50, 1, "comm_open: setsockopt(IPV6_V6ONLY) " << (tos?"ON":"OFF") << " for FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "comm_open: setsockopt(IPV6_V6ONLY) " << (tos?"ON":"OFF") << " for FD " << fd << ": " << xstrerror());
}
#else
debugs(50, 0, "WARNING: comm_open: setsockopt(IPV6_V6ONLY) not supported on this platform");
PROF_start(comm_open);
/* Create socket for accepting new connections. */
- statCounter.syscalls.sock.sockets++;
+ ++ statCounter.syscalls.sock.sockets;
/* Setup the socket addrinfo details for use */
addr.GetAddrInfo(AI);
if ( (flags & COMM_DOBIND) || addr.GetPort() > 0 || !addr.IsAnyAddr() ) {
if ( !(flags & COMM_DOBIND) && addr.IsAnyAddr() )
- debugs(5,1,"WARNING: Squid is attempting to bind() port " << addr << " without being a listener.");
+ debugs(5, DBG_IMPORTANT,"WARNING: Squid is attempting to bind() port " << addr << " without being a listener.");
if ( addr.IsNoAddr() )
debugs(5,0,"CRITICAL: Squid is attempting to bind() port " << addr << "!!");
*/
}
-// Legacy pre-AsyncCalls API for FD timeouts.
-int
-commSetTimeout(int fd, int timeout, CTCB * handler, void *data)
-{
- AsyncCall::Pointer call;
- debugs(5, 3, HERE << "FD " << fd << " timeout " << timeout);
- if (handler != NULL)
- call=commCbCall(5,4, "SomeTimeoutHandler", CommTimeoutCbPtrFun(handler, data));
- else
- call = NULL;
- return commSetTimeout(fd, timeout, call);
-}
-
-// Legacy pre-Comm::Connection API for FD timeouts
-// still used by non-socket FD code dealing with pipes and IPC sockets.
-int
-commSetTimeout(int fd, int timeout, AsyncCall::Pointer &callback)
+// XXX: now that raw-FD timeouts are only unset for pipes and files this SHOULD be a no-op.
+// With handler already unset. Leaving this present until that can be verified for all code paths.
+void
+commUnsetFdTimeout(int fd)
{
- debugs(5, 3, HERE << "FD " << fd << " timeout " << timeout);
+ debugs(5, 3, HERE << "Remove timeout for FD " << fd);
assert(fd >= 0);
assert(fd < Squid_MaxFD);
fde *F = &fd_table[fd];
assert(F->flags.open);
- if (timeout < 0) {
- F->timeoutHandler = NULL;
- F->timeout = 0;
- } else {
- if (callback != NULL) {
- typedef CommTimeoutCbParams Params;
- Params ¶ms = GetCommParams<Params>(callback);
- params.fd = fd;
- F->timeoutHandler = callback;
- }
-
- F->timeout = squid_curtime + (time_t) timeout;
- }
-
- return F->timeout;
+ F->timeoutHandler = NULL;
+ F->timeout = 0;
}
int
if (!F->flags.called_connect) {
F->flags.called_connect = 1;
- statCounter.syscalls.sock.connects++;
+ ++ statCounter.syscalls.sock.connects;
x = connect(sock, AI->ai_addr, AI->ai_addrlen);
debugs(14,9, "connecting to: " << address );
}
} else {
-#if defined(_SQUID_NEWSOS6_)
+#if _SQUID_NEWSOS6_
/* Makoto MATSUSHITA <matusita@ics.es.osaka-u.ac.jp> */
connect(sock, AI->ai_addr, AI->ai_addrlen);
if (x == 0)
errno = err;
-#if defined(_SQUID_SOLARIS_)
+#if _SQUID_SOLARIS_
/*
* Solaris 2.4's socket emulation doesn't allow you
* to determine the error from a failed non-blocking
F->remote_port = address.GetPort(); /* remote_port is HS */
if (status == COMM_OK) {
- debugs(5, 10, "comm_connect_addr: FD " << sock << " connected to " << address);
+ debugs(5, DBG_DATA, "comm_connect_addr: FD " << sock << " connected to " << address);
} else if (status == COMM_INPROGRESS) {
- debugs(5, 10, "comm_connect_addr: FD " << sock << " connection pending");
+ debugs(5, DBG_DATA, "comm_connect_addr: FD " << sock << " connection pending");
}
return status;
// If call is not canceled schedule it for execution else ignore it
if (!call->canceled()) {
debugs(5, 5, "commCallCloseHandlers: ch->handler=" << call);
- typedef CommCloseCbParams Params;
- Params ¶ms = GetCommParams<Params>(call);
- params.fd = fd;
ScheduleCallHere(call);
}
}
}
static void
-commLingerTimeout(int fd, void *unused)
+commLingerTimeout(const FdeCbParams ¶ms)
{
- debugs(5, 3, "commLingerTimeout: FD " << fd);
- comm_close(fd);
+ debugs(5, 3, "commLingerTimeout: FD " << params.fd);
+ comm_close(params.fd);
}
/*
comm_lingering_close(int fd)
{
#if USE_SSL
-
if (fd_table[fd].ssl)
- ssl_shutdown_method(fd);
-
+ ssl_shutdown_method(fd_table[fd].ssl);
#endif
if (shutdown(fd, 1) < 0) {
}
fd_note(fd, "lingering close");
- commSetTimeout(fd, 10, commLingerTimeout, NULL);
+ AsyncCall::Pointer call = commCbCall(5,4, "commLingerTimeout", FdeCbPtrFun(commLingerTimeout, NULL));
+
+ debugs(5, 3, HERE << "FD " << fd << " timeout " << timeout);
+ assert(fd_table[fd].flags.open);
+ if (callback != NULL) {
+ typedef FdeCbParams Params;
+ Params ¶ms = GetCommParams<Params>(callback);
+ params.fd = fd;
+ fd_table[fd].timeoutHandler = callback;
+ fd_table[fd].timeout = squid_curtime + static_cast<time_t>(10);
+ }
+
Comm::SetSelect(fd, COMM_SELECT_READ, commLingerClose, NULL, 0);
}
comm_close(fd);
}
+#if USE_SSL
void
-comm_close_start(int fd, void *data)
+commStartSslClose(const FdeCbParams ¶ms)
{
-#if USE_SSL
- fde *F = &fd_table[fd];
- if (F->ssl)
- ssl_shutdown_method(fd);
-
-#endif
-
+ assert(&fd_table[params.fd].ssl);
+ ssl_shutdown_method(fd_table[params.fd].ssl);
}
+#endif
void
-comm_close_complete(int fd, void *data)
+comm_close_complete(const FdeCbParams ¶ms)
{
#if USE_SSL
- fde *F = &fd_table[fd];
+ fde *F = &fd_table[params.fd];
if (F->ssl) {
SSL_free(F->ssl);
F->dynamicSslContext = NULL;
}
#endif
- fd_close(fd); /* update fdstat */
-
- close(fd);
+ fd_close(params.fd); /* update fdstat */
+ close(params.fd);
- statCounter.syscalls.sock.closes++;
+ ++ statCounter.syscalls.sock.closes;
- /* When an fd closes, give accept() a chance, if need be */
+ /* When one connection closes, give accept() a chance, if need be */
Comm::AcceptLimiter::Instance().kick();
}
return;
/* The following fails because ipc.c is doing calls to pipe() to create sockets! */
- assert(isOpen(fd));
+ if (!isOpen(fd)) {
+ debugs(50, DBG_IMPORTANT, HERE << "BUG 3556: FD " << fd << " is not an open socket.");
+ // XXX: do we need to run close(fd) or fd_close(fd) here?
+ return;
+ }
assert(F->type != FD_FILE);
F->flags.close_request = 1;
- AsyncCall::Pointer startCall=commCbCall(5,4, "comm_close_start",
- CommCloseCbPtrFun(comm_close_start, NULL));
- typedef CommCloseCbParams Params;
- Params &startParams = GetCommParams<Params>(startCall);
- startParams.fd = fd;
- ScheduleCallHere(startCall);
+#if USE_SSL
+ if (F->ssl) {
+ AsyncCall::Pointer startCall=commCbCall(5,4, "commStartSslClose",
+ FdeCbPtrFun(commStartSslClose, NULL));
+ FdeCbParams &startParams = GetCommParams<FdeCbParams>(startCall);
+ startParams.fd = fd;
+ ScheduleCallHere(startCall);
+ }
+#endif
// a half-closed fd may lack a reader, so we stop monitoring explicitly
if (commHasHalfClosedMonitor(fd))
commStopHalfClosedMonitor(fd);
- commSetTimeout(fd, -1, NULL, NULL);
+ commUnsetFdTimeout(fd);
// notify read/write handlers after canceling select reservations, if any
if (COMMIO_FD_WRITECB(fd)->active()) {
comm_empty_os_read_buffers(fd);
-
AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete",
- CommCloseCbPtrFun(comm_close_complete, NULL));
- Params &completeParams = GetCommParams<Params>(completeCall);
+ FdeCbPtrFun(comm_close_complete, NULL));
+ FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall);
completeParams.fd = fd;
// must use async call to wait for all callbacks
// scheduled before comm_close() to finish
struct addrinfo *AI = NULL;
PROF_start(comm_udp_sendto);
- statCounter.syscalls.sock.sendtos++;
+ ++ statCounter.syscalls.sock.sendtos;
debugs(50, 3, "comm_udp_sendto: Attempt to send UDP packet to " << to_addr <<
" using FD " << fd << " using Port " << comm_local_port(fd) );
if (ECONNREFUSED != errno)
#endif
- debugs(50, 1, "comm_udp_sendto: FD " << fd << ", (family=" << fd_table[fd].sock_family << ") " << to_addr << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "comm_udp_sendto: FD " << fd << ", (family=" << fd_table[fd].sock_family << ") " << to_addr << ": " << xstrerror());
return COMM_ERROR;
}
void
-comm_add_close_handler(int fd, PF * handler, void *data)
+comm_add_close_handler(int fd, CLCB * handler, void *data)
{
debugs(5, 5, "comm_add_close_handler: FD " << fd << ", handler=" <<
handler << ", data=" << data);
fd_table[fd].closeHandler = call;
}
-
// remove function-based close handler
void
-comm_remove_close_handler(int fd, PF * handler, void *data)
+comm_remove_close_handler(int fd, CLCB * handler, void *data)
{
assert (isOpen(fd));
/* Find handler in list */
int on = 1;
if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof(on)) < 0)
- debugs(50, 1, "commSetReuseAddr: FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "commSetReuseAddr: FD " << fd << ": " << xstrerror());
}
static void
commSetTcpRcvbuf(int fd, int size)
{
if (setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (char *) &size, sizeof(size)) < 0)
- debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
if (setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (char *) &size, sizeof(size)) < 0)
- debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
#ifdef TCP_WINDOW_CLAMP
if (setsockopt(fd, SOL_TCP, TCP_WINDOW_CLAMP, (char *) &size, sizeof(size)) < 0)
- debugs(50, 1, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "commSetTcpRcvbuf: FD " << fd << ", SIZE " << size << ": " << xstrerror());
#endif
}
int on = 1;
if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &on, sizeof(on)) < 0)
- debugs(50, 1, "commSetTcpNoDelay: FD " << fd << ": " << xstrerror());
+ debugs(50, DBG_IMPORTANT, "commSetTcpNoDelay: FD " << fd << ": " << xstrerror());
fd_table[fd].flags.nodelay = 1;
}
if (timeout && interval) {
int count = (timeout + interval - 1) / interval;
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &count, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ debugs(5, DBG_IMPORTANT, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
#ifdef TCP_KEEPIDLE
if (idle) {
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ debugs(5, DBG_IMPORTANT, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
#ifdef TCP_KEEPINTVL
if (interval) {
if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &interval, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ debugs(5, DBG_IMPORTANT, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
#endif
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (char *) &on, sizeof(on)) < 0)
- debugs(5, 1, "commSetKeepalive: FD " << fd << ": " << xstrerror());
+ debugs(5, DBG_IMPORTANT, "commSetKeepalive: FD " << fd << ": " << xstrerror());
}
void
int fd;
fde *F = NULL;
- for (fd = 0; fd <= Biggest_FD; fd++) {
+ for (fd = 0; fd <= Biggest_FD; ++fd) {
F = &fd_table[fd];
if (!F->flags.open)
fde *F = NULL;
AsyncCall::Pointer callback;
- for (fd = 0; fd <= Biggest_FD; fd++) {
+ for (fd = 0; fd <= Biggest_FD; ++fd) {
F = &fd_table[fd];
if (writeTimedOut(fd)) {
}
}
-void CommIO::Initialise()
-{
- /* Initialize done pipe signal */
- int DonePipe[2];
- if (pipe(DonePipe)) {}
- DoneFD = DonePipe[1];
- DoneReadFD = DonePipe[0];
- fd_open(DoneReadFD, FD_PIPE, "async-io completetion event: main");
- fd_open(DoneFD, FD_PIPE, "async-io completetion event: threads");
- commSetNonBlocking(DoneReadFD);
- commSetNonBlocking(DoneFD);
- Comm::SetSelect(DoneReadFD, COMM_SELECT_READ, NULLFDHandler, NULL, 0);
- Initialised = true;
-}
-
-void CommIO::NotifyIOClose()
-{
- /* Close done pipe signal */
- FlushPipe();
- close(DoneFD);
- close(DoneReadFD);
- fd_close(DoneFD);
- fd_close(DoneReadFD);
- Initialised = false;
-}
-
-bool CommIO::Initialised = false;
-bool CommIO::DoneSignalled = false;
-int CommIO::DoneFD = -1;
-int CommIO::DoneReadFD = -1;
-
-void
-CommIO::FlushPipe()
-{
- char buf[256];
- FD_READ_METHOD(DoneReadFD, buf, sizeof(buf));
-}
-
-void
-CommIO::NULLFDHandler(int fd, void *data)
-{
- FlushPipe();
- Comm::SetSelect(fd, COMM_SELECT_READ, NULLFDHandler, NULL, 0);
-}
-
-void
-CommIO::ResetNotifications()
-{
- if (DoneSignalled) {
- FlushPipe();
- DoneSignalled = false;
- }
-}
-
/// Start waiting for a possibly half-closed connection to close
// by scheduling a read callback to a monitoring handler that
// will close the connection on read errors.
commPlanHalfClosedCheck(); // make sure this fd will be checked again
}
-
CommRead::CommRead() : conn(NULL), buf(NULL), len(0), callback(NULL) {}
CommRead::CommRead(const Comm::ConnectionPointer &c, char *buf_, int len_, AsyncCall::Pointer &callback_)
// We have to use a global function as a closer and point to temp
// instead of "this" because DeferredReadManager is not a job and
// is not even cbdata protected
+ // XXX: and yet we use cbdata protection functions on it??
AsyncCall::Pointer closer = commCbCall(5,4,
"DeferredReadManager::CloseHandler",
CommCloseCbPtrFun(&CloseHandler, temp));
}
void
-DeferredReadManager::CloseHandler(int fd, void *thecbdata)
+DeferredReadManager::CloseHandler(const CommCloseCbParams ¶ms)
{
- if (!cbdataReferenceValid (thecbdata))
+ if (!cbdataReferenceValid(params.data))
return;
- CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)thecbdata;
+ CbDataList<DeferredRead> *temp = (CbDataList<DeferredRead> *)params.data;
temp->element.closer = NULL;
temp->element.markCancelled();
assert (!deferredReads.empty());
DeferredRead &read = deferredReads.head->element;
+
+ // NOTE: at this point the connection has been paused/stalled for an unknown
+ // amount of time. We must re-validate that it is active and usable.
+
+ // If the connection has been closed already. Cancel this read.
+ if (!Comm::IsConnOpen(read.theRead.conn)) {
+ if (read.closer != NULL) {
+ read.closer->cancel("Connection closed before.");
+ read.closer = NULL;
+ }
+ read.markCancelled();
+ }
+
if (!read.cancelled) {
comm_remove_close_handler(read.theRead.conn->fd, read.closer);
read.closer = NULL;
PROF_start(comm_open);
/* Create socket for accepting new connections. */
- statCounter.syscalls.sock.sockets++;
+ ++ statCounter.syscalls.sock.sockets;
/* Setup the socket addrinfo details for use */
struct addrinfo AI;