\ingroup ServerProtocol
*/
+#include "comm/forward.h"
+#include "ip/Address.h"
#include "StoreClient.h"
/**
struct timeval queue_time;
};
+extern Comm::ConnectionPointer icpIncomingConn;
+extern Comm::ConnectionPointer icpOutgoingConn;
+extern Ip::Address theIcpPublicHostID;
+
/// \ingroup ServerProtocolICPAPI
HttpRequest* icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from);
debugs(9,3, HERE << "waiting for body production end or abort");
}
-#if 0
-bool
-ServerStateData::canSend(int fd) const
-{
- return fd >= 0 && !fd_table[fd].closing();
-}
-#endif
-
void
ServerStateData::sendMoreRequestBody()
{
const Comm::ConnectionPointer conn = dataDescriptor();
if (!Comm::IsConnOpen(conn)) {
- debugs(9,3, HERE << "cannot send request body to closing FD " << (conn != NULL?conn->fd:-1));
+ debugs(9,3, HERE << "cannot send request body to closing " << conn);
return; // wait for the kid's close handler; TODO: assert(closer);
}
if (getMoreRequestBody(buf) && buf.contentSize() > 0) {
debugs(9,3, HERE << "will write " << buf.contentSize() << " request body bytes");
typedef CommCbMemFunT<ServerStateData, CommIoCbParams> Dialer;
- requestSender = JobCallback(93,3,
- Dialer, this, ServerStateData::sentRequestBody);
- comm_write_mbuf(conn->fd, &buf, requestSender);
+ requestSender = JobCallback(93,3, Dialer, this, ServerStateData::sentRequestBody);
+ comm_write_mbuf(conn, &buf, requestSender);
} else {
debugs(9,3, HERE << "will wait for more request body bytes or eof");
requestSender = NULL;
#ifndef SQUID_SERVER_H
#define SQUID_SERVER_H
-#include "StoreIOBuffer.h"
-#include "forward.h"
-#include "BodyPipe.h"
-#include "base/AsyncJob.h"
-#include "CommCalls.h"
-
+#include "config.h"
#if USE_ADAPTATION
#include "adaptation/forward.h"
#include "adaptation/Initiator.h"
#endif
+#include "base/AsyncJob.h"
+#include "BodyPipe.h"
+#include "CommCalls.h"
+#include "forward.h"
+#include "StoreIOBuffer.h"
/**
* ServerStateData is a common base for server-side classes such as
* virgin responses using ICAP, and provide the client-side consumer with
* responses.
*
- \todo TODO: Rename to ServerStateDataInfoRecordHandler.
+ * \todo TODO: Rename to something clearer.
*/
class ServerStateData:
#if USE_ADAPTATION
// comm module will free the buffer
typedef CommCbMemFunT<Adaptation::Icap::Xaction, CommIoCbParams> Dialer;
writer = JobCallback(93, 3, Dialer, this, Adaptation::Icap::Xaction::noteCommWrote);
- comm_write_mbuf(connection->fd, &buf, writer);
+ comm_write_mbuf(connection, &buf, writer);
updateTimeout();
}
#ifndef _SQUID_BASE_SUBSCRIPTION_H
#define _SQUID_BASE_SUBSCRIPTION_H
-#include "RefCount.h"
#include "base/AsyncCall.h"
/**
}
/**
- \ingroup CacheManagerAPI
+ * \ingroup CacheManagerAPI
* Registers a C++-style action, via a poiner to a subclass of
* a CacheManagerAction object, whose run() method will be invoked when
* CacheManager identifies that the user has requested the action.
#define comm_close comm_lingering_close
#endif
-/// dials clientHttpConnectionOpened or clientHttpsConnectionOpened call
+/// dials clientListenerConnectionOpened call
class ListeningStartedDialer: public CallDialer, public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo, http_port_list *portCfg);
- ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg):
- handler(aHandler), portCfg(aPortCfg) {}
+ typedef void (*Handler)(int errNo, http_port_list *portCfg, bool uses_ssl);
+ ListeningStartedDialer(Handler aHandler, http_port_list *aPortCfg, bool aSslFlag):
+ handler(aHandler), portCfg(aPortCfg), uses_ssl(aSslFlag) {}
virtual void print(std::ostream &os) const {
startPrint(os) <<
- ", port=" << (void*)portCfg << ')';
+ ", " << (uses_ssl? "SSL " :"") << "port=" << (void*)portCfg << ')';
}
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo, portCfg); }
+ virtual void dial(AsyncCall &) { (handler)(errNo, portCfg, uses_ssl); }
public:
Handler handler;
private:
http_port_list *portCfg; ///< from Config.Sockaddr.http
+ bool uses_ssl;
};
-static void clientHttpConnectionOpened(int fd, int errNo, http_port_list *s);
-#if USE_SSL
-static void clientHttpsConnectionOpened(int fd, int errNo, http_port_list *s);
-#endif
+static void clientListenerConnectionOpened(int errNo, http_port_list *s, bool uses_ssl);
/* our socket-related context */
/* other */
static IOCB clientWriteComplete;
static IOCB clientWriteBodyComplete;
+static IOACB httpAccept;
+static IOACB httpsAccept;
static bool clientParseRequest(ConnStateData * conn, bool &do_next_read);
static PF clientLifetimeTimeout;
static ClientSocketContext *parseHttpRequestAbort(ConnStateData * conn, const char *uri);
static ConnStateData *connStateCreate(const Comm::ConnectionPointer &details, http_port_list *port);
-// TODO make this return the conn for use instead.
-int
-ClientSocketContext::fd() const
+const Comm::ConnectionPointer &
+ClientSocketContext::clientConn() const
{
assert (http);
assert (http->getConn() != NULL);
assert (http->getConn()->clientConn != NULL);
- return http->getConn()->clientConn->fd;
+ return http->getConn()->clientConn;
}
clientStreamNode *
AsyncCall::Pointer call = commCbCall(33, 5, "ClientSocketContext::wroteControlMsg",
CommIoCbPtrFun(&WroteControlMsg, this));
- comm_write_mbuf(fd(), mb, call);
+ comm_write_mbuf(clientConn(), mb, call);
delete mb;
}
/// called when we wrote the 1xx response
void
-ClientSocketContext::wroteControlMsg(int fd, char *, size_t, comm_err_t errflag, int xerrno)
+ClientSocketContext::wroteControlMsg(const Comm::ConnectionPointer &conn, char *, size_t, comm_err_t errflag, int xerrno)
{
if (errflag == COMM_ERR_CLOSING)
return;
// close on 1xx errors to be conservative and to simplify the code
// (if we do not close, we must notify the source of a failure!)
- comm_close(fd);
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
}
/// wroteControlMsg() wrapper: ClientSocketContext is not an AsyncJob
void
-ClientSocketContext::WroteControlMsg(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
+ClientSocketContext::WroteControlMsg(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
{
ClientSocketContext *context = static_cast<ClientSocketContext*>(data);
- context->wroteControlMsg(fd, bufnotused, size, errflag, xerrno);
+ context->wroteControlMsg(conn, bufnotused, size, errflag, xerrno);
}
#if USE_IDENT
noteSentBodyBytes (length);
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteBodyComplete",
CommIoCbPtrFun(clientWriteBodyComplete, this));
- comm_write(fd(), bodyData.data, length, call );
+ comm_write(clientConn(), bodyData.data, length, call );
return;
}
/* write */
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(fd(), &mb, call);
+ comm_write_mbuf(clientConn(), &mb, call);
} else
- writeComplete(fd(), NULL, 0, COMM_OK);
+ writeComplete(clientConn(), NULL, 0, COMM_OK);
}
/**
debugs(33,7, HERE << "sendStartOfMessage schedules clientWriteComplete");
AsyncCall::Pointer call = commCbCall(33, 5, "clientWriteComplete",
CommIoCbPtrFun(clientWriteComplete, this));
- comm_write_mbuf(fd(), mb, call);
+ comm_write_mbuf(clientConn(), mb, call);
delete mb;
}
const bool mustSendLastChunk = http->request->flags.chunked_reply &&
!http->request->flags.stream_error && !context->startOfOutput();
if (responseFinishedOrFailed(rep, receivedData) && !mustSendLastChunk) {
- context->writeComplete(http->getConn()->clientConn->fd, NULL, 0, COMM_OK);
+ context->writeComplete(http->getConn()->clientConn, NULL, 0, COMM_OK);
PROF_stop(clientSocketRecipient);
return;
}
}
static void
-clientWriteBodyComplete(int fd, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data)
+clientWriteBodyComplete(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data)
{
debugs(33,7, HERE << "clientWriteBodyComplete schedules clientWriteComplete");
- clientWriteComplete(fd, NULL, size, errflag, xerrno, data);
+ clientWriteComplete(conn, NULL, size, errflag, xerrno, data);
}
void
/** first update iterator "i" if needed */
if (!http->range_iter.debt()) {
- debugs(33, 5, "ClientSocketContext::canPackMoreRanges: At end of current range spec for FD " << fd());
+ debugs(33, 5, "ClientSocketContext::canPackMoreRanges: At end of current range spec for FD " << clientConn());
if (http->range_iter.pos.incrementable())
++http->range_iter.pos;
void
ClientSocketContext::pullData()
{
- debugs(33, 5, "ClientSocketContext::pullData: FD " << fd() <<
+ debugs(33, 5, "ClientSocketContext::pullData: FD " << clientConn() <<
" attempting to pull upstream data");
/* More data will be coming from the stream. */
if (!canPackMoreRanges()) {
debugs(33, 5, HERE << "Range request at end of returnable " <<
- "range sequence on FD " << fd());
+ "range sequence on " << clientConn());
if (http->request->flags.proxy_keepalive)
return STREAM_COMPLETE;
* no more data to send.
*/
void
-clientWriteComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
+clientWriteComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
{
ClientSocketContext *context = (ClientSocketContext *)data;
- context->writeComplete (fd, bufnotused, size, errflag);
+ context->writeComplete(conn, bufnotused, size, errflag);
}
/// remembers the abnormal connection termination for logging purposes
}
void
-ClientSocketContext::writeComplete(int aFileDescriptor, char *bufnotused, size_t size, comm_err_t errflag)
+ClientSocketContext::writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag)
{
StoreEntry *entry = http->storeEntry();
http->out.size += size;
- assert(aFileDescriptor > -1);
- debugs(33, 5, "clientWriteComplete: FD " << aFileDescriptor << ", sz " << size <<
+ assert(Comm::IsConnOpen(conn));
+ debugs(33, 5, HERE << conn << ", sz " << size <<
", err " << errflag << ", off " << http->out.size << ", len " <<
entry ? entry->objectLen() : 0);
clientUpdateSocketStats(http->logType, size);
if (errflag == COMM_ERR_CLOSING)
return;
- assert (Comm::IsConnOpen(http->getConn()->clientConn) && this->fd() == aFileDescriptor);
+ assert(Comm::IsConnOpen(clientConn()) && clientConn()->fd == conn->fd);
- if (errflag || clientHttpRequestStatus(aFileDescriptor, http)) {
+ if (errflag || clientHttpRequestStatus(conn->fd, http)) {
initiateClose("failure or true request status");
/* Do we leak here ? */
return;
break;
case STREAM_COMPLETE:
- debugs(33, 5, "clientWriteComplete: FD " << aFileDescriptor << " Keeping Alive");
+ debugs(33, 5, HERE << conn << " Keeping Alive");
keepaliveNextRequest();
return;
/// check FD after clientHttp[s]ConnectionOpened, adjust HttpSockets as needed
static bool
-OpenedHttpSocket(int fd, const char *msgIfFail)
+OpenedHttpSocket(const Comm::ConnectionPointer &clientConn, const char *msgIfFail)
{
- if (fd < 0) {
+ if (!Comm::IsConnOpen(clientConn)) {
Must(NHttpSockets > 0); // we tried to open some
--NHttpSockets; // there will be fewer sockets than planned
Must(HttpSockets[NHttpSockets] < 0); // no extra fds received
/// find any unused HttpSockets[] slot and store fd there or return false
static bool
-AddOpenedHttpSocket(int fd)
+AddOpenedHttpSocket(const Comm::ConnectionPointer &conn)
{
bool found = false;
for (int i = 0; i < NHttpSockets && !found; i++) {
if ((found = HttpSockets[i] < 0))
- HttpSockets[i] = fd;
+ HttpSockets[i] = conn->fd;
}
return found;
}
++bumpCount;
#endif
- /* AYJ: 2009-12-27: bit bumpy. new ConnAcceptor(...) should be doing all the Comm:: stuff ... */
+// NOTE: would the design here be better if we opened both the ConnAcceptor and IPC informative messages now?
+// that way we have at least one worker listening on the socket immediately with others joining in as
+// they receive the IPC message.
- const int openFlags = COMM_NONBLOCKING |
- (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
+ // Fill out a Comm::Connection which IPC will open as a listener for us
+ // then pass back so we can start a ConnAcceptor subscription.
+ s->listenConn = new Comm::Connection;
+ s->listenConn->local = s->s;
+ s->listenConn->flags = COMM_NONBLOCKING | (s->spoof_client_ip ? COMM_TRANSPARENT : 0);
- AsyncCall::Pointer callback = asyncCall(33,2,
- "clientHttpConnectionOpened",
- ListeningStartedDialer(&clientHttpConnectionOpened, s));
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->s, openFlags,
- Ipc::fdnHttpSocket, callback);
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
- HttpSockets[NHttpSockets++] = -1; // set in clientHttpConnectionOpened
+ AsyncCall::Pointer listenCall = asyncCall(33,2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, s, false));
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpSocket, listenCall, sub);
+
+ HttpSockets[NHttpSockets++] = -1; // set in clientListenerHttpConnectionOpened
}
#if USE_SSL
#endif
}
-/// process clientHttpConnectionsOpen result
-static void
-clientHttpConnectionOpened(int fd, int, http_port_list *s)
-{
- if (!OpenedHttpSocket(fd, "Cannot open HTTP Port"))
- return;
-
- Must(s);
-
- s->listenConn = new Comm::Connection();
- s->listenConn->local = s->s;
- s->listenConn->fd = fd;
-
- // TODO: hide most of this subscription stuff away.
- typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
- RefCount<AcceptCall> call = commCbCall(5, 5, "httpAccept", CommAcceptCbPtrFun(httpAccept, s));
- Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
- AsyncJob::Start(new Comm::ConnAcceptor(s->listenConn, "HTTP Listener", sub));
-
- debugs(1, 1, "Accepting" <<
- (s->intercepted ? " intercepted" : "") <<
- (s->spoof_client_ip ? " spoofing" : "") <<
- (s->sslBump ? " bumpy" : "") <<
- (s->accel ? " accelerated" : "")
- << " HTTP connections at " << s->s << ", FD " << fd << ".");
-
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
-}
-
#if USE_SSL
static void
clientHttpsConnectionsOpen(void)
{
+// XXX: de-dupe clientHttpConnectionsOpened and clientHttpsConnectionsOpened
+
https_port_list *s;
for (s = Config.Sockaddr.https; s; s = (https_port_list *)s->http.next) {
continue;
}
- AsyncCall::Pointer call = asyncCall(33, 2, "clientHttpsConnectionOpened",
- ListeningStartedDialer(&clientHttpsConnectionOpened, &s->http));
+ // Fill out a Comm::Connection which IPC will open as a listener for us
+ s->http.listenConn = new Comm::Connection;
+ s->http.listenConn->local = s->http.s;
+ s->http.listenConn->flags = COMM_NONBLOCKING | (s->http.spoof_client_ip ? COMM_TRANSPARENT : 0);
+
+ // setup the subscriptions such that new connections accepted by listenConn are handled by HTTPS
+ typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
+ RefCount<AcceptCall> subCall = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
+ Subscription::Pointer sub = new CallSubscription<AcceptCall>(subCall);
- Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->http.s, COMM_NONBLOCKING,
- Ipc::fdnHttpsSocket, call);
+ AsyncCall::Pointer listenCall = asyncCall(33, 2, "clientListenerConnectionOpened",
+ ListeningStartedDialer(&clientListenerConnectionOpened, &s->http, true));
+
+ Ipc::StartListening(SOCK_STREAM, IPPROTO_TCP, s->listenConn, Ipc::fdnHttpsSocket, listenCall, sub);
HttpSockets[NHttpSockets++] = -1;
}
}
+#endif
-/// process clientHttpsConnectionsOpen result
+/// process clientHttpConnectionsOpen result
static void
-clientHttpsConnectionOpened(int fd, int, http_port_list *s)
+clientListenerConnectionOpened(int, http_port_list *s, bool uses_ssl)
{
- if (!OpenedHttpSocket(fd, "Cannot open HTTPS Port"))
+ if (!OpenedHttpSocket(s->listenConn, (uses_ssl?"Cannot open HTTP Port":"Cannot open HTTPS Port")))
return;
Must(s);
+ Must(Comm::IsConnOpen(s->listenConn));
- s->listenConn = new Comm::Connection();
- s->listenConn->local = s->s;
- s->listenConn->fd = fd;
-
- // TODO: hide most of this subscription stuff away.
- typedef CommCbFunPtrCallT<CommAcceptCbPtrFun> AcceptCall;
- RefCount<AcceptCall> call = commCbCall(5, 5, "httpsAccept", CommAcceptCbPtrFun(httpsAccept, s));
- Subscription::Pointer sub = new CallSubscription<AcceptCall>(call);
- AsyncJob::Start(new Comm::ConnAcceptor(s->listenConn, "HTTPS Listener", sub));
-
- debugs(1, 1, "Accepting HTTPS connections at " << s->s << ", FD " << fd << ".");
+ debugs(1, 1, "Accepting" <<
+ (s->intercepted ? " intercepted" : "") <<
+ (s->spoof_client_ip ? " spoofing" : "") <<
+ (s->sslBump ? " bumpy" : "") <<
+ (s->accel ? " accelerated" : "")
+ << " HTTP" << (uses_ssl?"S":"") << " connections at " << s->listenConn << ".");
- Must(AddOpenedHttpSocket(fd)); // otherwise, we have received a fd we did not ask for
+ Must(AddOpenedHttpSocket(s->listenConn)); // otherwise, we have received a fd we did not ask for
}
-#endif
-
void
clientOpenListenSockets(void)
{
{
for (http_port_list *s = Config.Sockaddr.http; s; s = s->next) {
if (s->listenConn != NULL) {
- debugs(1, 1, "Closing HTTP port " << s->s);
+ debugs(1, 1, "Closing HTTP port " << s->listenConn->local);
s->listenConn->close();
s->listenConn = NULL;
}
#if USE_SSL
for (http_port_list *s = Config.Sockaddr.https; s; s = s->next) {
if (s->listenConn != NULL) {
- debugs(1, 1, "Closing HTTPS port " << s->s);
+ debugs(1, 1, "Closing HTTPS port " << s->listenConn->local);
s->listenConn->close();
s->listenConn = NULL;
}
ClientSocketContext();
~ClientSocketContext();
bool startOfOutput() const;
- void writeComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag);
+ void writeComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag);
void keepaliveNextRequest();
ClientHttpRequest *http; /* we own this */
HttpReply *reply;
size_t lengthToSend(Range<int64_t> const &available);
void noteSentBodyBytes(size_t);
void buildRangeHeader(HttpReply * rep);
- int fd() const;
+ const Comm::ConnectionPointer & clientConn() const;
clientStreamNode * getTail() const;
clientStreamNode * getClientReplyContext() const;
void connIsFinished();
void writeControlMsg(HttpControlMsg &msg);
protected:
- static void WroteControlMsg(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data);
- void wroteControlMsg(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno);
+ static IOCB WroteControlMsg; //(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data);
+ void wroteControlMsg(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag, int xerrno);
private:
CBDATA_CLASS(ClientSocketContext);
// called when comm_write has completed
static void
-SslBumpEstablish(int, char *, size_t, comm_err_t errflag, int, void *data)
+SslBumpEstablish(const Comm::ConnectionPointer &, char *, size_t, comm_err_t errflag, int, void *data)
{
ClientHttpRequest *r = static_cast<ClientHttpRequest*>(data);
debugs(85, 5, HERE << "responded to CONNECT: " << r << " ? " << errflag);
debugs(33, 7, HERE << "Confirming CONNECT tunnel on FD " << getConn()->clientConn);
// TODO: Unify with tunnel.cc and add a Server(?) header
- static const char *const conn_established =
- "HTTP/1.0 200 Connection established\r\n\r\n";
- comm_write(getConn()->clientConn->fd, conn_established, strlen(conn_established),
- &SslBumpEstablish, this, NULL);
+ static const char *const conn_established = "HTTP/1.0 200 Connection established\r\n\r\n";
+ comm_write(getConn()->clientConn, conn_established, strlen(conn_established), &SslBumpEstablish, this, NULL);
}
#endif
static void commStopHalfClosedMonitor(int fd);
static IOCB commHalfClosedReader;
-static void comm_init_opened(int new_socket, Ip::Address &addr, unsigned char TOS, const char *note, struct addrinfo *AI);
+static void comm_init_opened(const Comm::ConnectionPointer &conn, unsigned char TOS, const char *note, struct addrinfo *AI);
static int comm_apply_flags(int new_socket, Ip::Address &addr, int flags, struct addrinfo *AI);
return comm_openex(sock_type, proto, addr, flags, 0, note);
}
+void
+comm_open_listener(int sock_type,
+ int proto,
+ Comm::ConnectionPointer &conn,
+ const char *note)
+{
+ /* all listener sockets require bind() */
+ conn->flags |= COMM_DOBIND;
+
+ /* attempt native enabled port. */
+ conn->fd = comm_openex(sock_type, proto, conn->local, conn->flags, 0, note);
+}
+
int
comm_open_listener(int sock_type,
int proto,
if ( Ip::EnableIpv6&IPV6_SPECIAL_V4MAPPING && addr.IsIPv6() )
comm_set_v6only(new_socket, 0);
- comm_init_opened(new_socket, addr, TOS, note, AI);
+ // temporary for te transition. comm_openex will eventually have a conn to play with.
+ Comm::ConnectionPointer temp = new Comm::Connection;
+ temp->fd = new_socket;
+ temp->local = addr;
+ comm_init_opened(temp, TOS, note, AI);
new_socket = comm_apply_flags(new_socket, addr, flags, AI);
addr.FreeAddrInfo(AI);
/// update FD tables after a local or remote (IPC) comm_openex();
void
-comm_init_opened(int new_socket,
- Ip::Address &addr,
+comm_init_opened(const Comm::ConnectionPointer &conn,
unsigned char TOS,
const char *note,
struct addrinfo *AI)
{
- assert(new_socket >= 0);
+ assert(Comm::IsConnOpen(conn));
assert(AI);
- fde *F = NULL;
-
/* update fdstat */
- debugs(5, 5, "comm_open: FD " << new_socket << " is a new socket");
-
- assert(!isOpen(new_socket));
- fd_open(new_socket, FD_SOCKET, note);
-
- fdd_table[new_socket].close_file = NULL;
-
- fdd_table[new_socket].close_line = 0;
+ debugs(5, 5, HERE << conn << " is a new socket");
- F = &fd_table[new_socket];
+ assert(!isOpen(conn->fd)); // NP: global isOpen checks the fde entry for openness not the Comm::Connection
+ fd_open(conn->fd, FD_SOCKET, note);
- F->local_addr = addr;
+ fdd_table[conn->fd].close_file = NULL;
+ fdd_table[conn->fd].close_line = 0;
+ fde *F = &fd_table[conn->fd];
+ F->local_addr = conn->local;
F->tos = TOS;
-
F->sock_family = AI->ai_family;
}
}
void
-comm_import_opened(int fd,
- Ip::Address &addr,
- int flags,
+comm_import_opened(const Comm::ConnectionPointer &conn,
const char *note,
struct addrinfo *AI)
{
- debugs(5, 2, HERE << " FD " << fd << " at " << addr);
- assert(fd >= 0);
+ debugs(5, 2, HERE << conn);
+ assert(Comm::IsConnOpen(conn));
assert(AI);
- comm_init_opened(fd, addr, 0, note, AI);
+ comm_init_opened(conn, 0, note, AI);
- if (!(flags & COMM_NOCLOEXEC))
- fd_table[fd].flags.close_on_exec = 1;
+ if (!(conn->flags & COMM_NOCLOEXEC))
+ fd_table[conn->fd].flags.close_on_exec = 1;
- if (addr.GetPort() > (u_short) 0) {
+ if (conn->local.GetPort() > (u_short) 0) {
#ifdef _SQUID_MSWIN_
- if (sock_type != SOCK_DGRAM)
+ if (AI->ai_socktype != SOCK_DGRAM)
#endif
- fd_table[fd].flags.nolinger = 1;
+ fd_table[conn->fd].flags.nolinger = 1;
}
- if ((flags & COMM_TRANSPARENT))
- fd_table[fd].flags.transparent = 1;
+ if ((conn->flags & COMM_TRANSPARENT))
+ fd_table[conn->fd].flags.transparent = 1;
- if (flags & COMM_NONBLOCKING)
- fd_table[fd].flags.nonblocking = 1;
+ if (conn->flags & COMM_NONBLOCKING)
+ fd_table[conn->fd].flags.nonblocking = 1;
#ifdef TCP_NODELAY
if (AI->ai_socktype == SOCK_STREAM)
- fd_table[fd].flags.nodelay = 1;
+ fd_table[conn->fd].flags.nodelay = 1;
#endif
/* no fd_table[fd].flags. updates needed for these conditions:
* free_func is used to free the passed buffer when the write has completed.
*/
void
-comm_write(int fd, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
+comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, IOCB * handler, void *handler_data, FREE * free_func)
{
AsyncCall::Pointer call = commCbCall(5,5, "SomeCommWriteHander",
CommIoCbPtrFun(handler, handler_data));
- comm_write(fd, buf, size, call, free_func);
+ comm_write(conn, buf, size, call, free_func);
}
void
-comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
+comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func)
{
- debugs(5, 5, "comm_write: FD " << fd << ": sz " << size << ": asynCall " << callback);
+ debugs(5, 5, HERE << conn << ": sz " << size << ": asynCall " << callback);
/* Make sure we are open, not closing, and not writing */
- assert(isOpen(fd));
- assert(!fd_table[fd].closing());
- comm_io_callback_t *ccb = COMMIO_FD_WRITECB(fd);
+ assert(Comm::IsConnOpen(conn));
+ assert(!fd_table[conn->fd].closing());
+ comm_io_callback_t *ccb = COMMIO_FD_WRITECB(conn->fd);
assert(!ccb->active());
- fd_table[fd].writeStart = squid_curtime;
+ fd_table[conn->fd].writeStart = squid_curtime;
/* Queue the write */
- commio_set_callback(fd, IOCB_WRITE, ccb, callback,
+ commio_set_callback(conn->fd, IOCB_WRITE, ccb, callback,
(char *)buf, free_func, size);
- commSetSelect(fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
+ commSetSelect(conn->fd, COMM_SELECT_WRITE, commHandleWrite, ccb, 0);
}
/* a wrapper around comm_write to allow for MemBuf to be comm_written in a snap */
+#if 0
void
comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data)
{
{
comm_write(fd, mb->buf, mb->size, callback, mb->freeFunc());
}
+#endif
+
+void
+comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, IOCB * handler, void *handler_data)
+{
+ comm_write(conn, mb->buf, mb->size, handler, handler_data, mb->freeFunc());
+}
+
+void
+comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback)
+{
+ comm_write(conn, mb->buf, mb->size, callback, mb->freeFunc());
+}
/*
/// I/O handler for the possibly half-closed connection monitoring code
static void
-commHalfClosedReader(int fd, char *, size_t size, comm_err_t flag, int, void *)
+commHalfClosedReader(const Comm::ConnectionPointer &conn, char *, size_t size, comm_err_t flag, int, void *)
{
// there cannot be more data coming in on half-closed connections
assert(size == 0);
- assert(commHasHalfClosedMonitor(fd)); // or we would have canceled the read
+ assert(conn != NULL);
+ assert(commHasHalfClosedMonitor(conn->fd)); // or we would have canceled the read
- fd_table[fd].halfClosedReader = NULL; // done reading, for now
+ fd_table[conn->fd].halfClosedReader = NULL; // done reading, for now
// nothing to do if fd is being closed
if (flag == COMM_ERR_CLOSING)
// if read failed, close the connection
if (flag != COMM_OK) {
- debugs(5, 3, "commHalfClosedReader: closing FD " << fd);
- comm_close(fd);
+ debugs(5, 3, HERE << "closing " << conn);
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
return;
}
SQUIDCEXTERN int comm_open(int, int, Ip::Address &, int, const char *note);
SQUIDCEXTERN int comm_open_uds(int sock_type, int proto, struct sockaddr_un* addr, int flags);
/// update Comm state after getting a comm_open() FD from another process
-SQUIDCEXTERN void comm_import_opened(int fd, Ip::Address &addr, int flags, const char *note, struct addrinfo *AI);
+SQUIDCEXTERN void comm_import_opened(const Comm::ConnectionPointer &, const char *note, struct addrinfo *AI);
/**
* Open a port specially bound for listening or sending through a specific port.
* (in debugs or cachemgr) will occur in Native IPv4 format.
* A reconfigure is needed to reset the stored IP in most cases and attempt a port re-open.
*/
-SQUIDCEXTERN int comm_open_listener(int sock_type, int proto, Ip::Address &addr, int flags, const char *note);
+extern int comm_open_listener(int sock_type, int proto, Ip::Address &addr, int flags, const char *note);
+extern void comm_open_listener(int sock_type, int proto, Comm::ConnectionPointer &conn, const char *note);
SQUIDCEXTERN int comm_openex(int, int, Ip::Address &, int, unsigned char TOS, const char *);
SQUIDCEXTERN u_short comm_local_port(int fd);
SQUIDCEXTERN void commResetSelect(int);
SQUIDCEXTERN int comm_udp_sendto(int sock, const Ip::Address &to, const void *buf, int buflen);
-extern void comm_write(int fd, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
-extern void comm_write(int fd, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
-SQUIDCEXTERN void comm_write_mbuf(int fd, MemBuf *mb, IOCB * handler, void *handler_data);
-extern void comm_write_mbuf(int fd, MemBuf *mb, AsyncCall::Pointer &callback);
+extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int len, IOCB *callback, void *callback_data, FREE *func);
+extern void comm_write(const Comm::ConnectionPointer &conn, const char *buf, int size, AsyncCall::Pointer &callback, FREE * free_func = NULL);
+SQUIDCEXTERN void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, IOCB * handler, void *handler_data);
+extern void comm_write_mbuf(const Comm::ConnectionPointer &conn, MemBuf *mb, AsyncCall::Pointer &callback);
SQUIDCEXTERN void commCallCloseHandlers(int fd);
SQUIDCEXTERN int commSetTimeout(int fd, int, PF *, void *);
-extern int commSetTimeout(int fd, int, AsyncCall::Pointer &calback);
+extern int commSetTimeout(int fd, int, AsyncCall::Pointer &callback);
/**
* Set or clear the timeout for some action on an active connection.
#endif
static void idnsCacheQuery(idns_query * q);
static void idnsSendQuery(idns_query * q);
+static void idnsDoSendQueryVC(nsvc *vc);
static CNCB idnsInitVCConnected;
-
static IOCB idnsReadVCHeader;
-static void idnsDoSendQueryVC(nsvc *vc);
+static IOCB idnsReadVC;
+static IOCB idnsSentQueryVC;
static int idnsFromKnownNameserver(Ip::Address const &from);
static idns_query *idnsFindQuery(unsigned short id);
}
static void
-idnsSentQueryVC(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+idnsSentQueryVC(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
{
nsvc * vc = (nsvc *)data;
if (flag == COMM_ERR_CLOSING)
return;
- if (fd_table[fd].closing())
+ // XXX: irrelevant now that we have conn pointer?
+ if (!Comm::IsConnOpen(conn) || fd_table[conn->fd].closing())
return;
if (flag != COMM_OK || size <= 0) {
- comm_close(fd);
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
return;
}
commSetTimeout(vc->conn->fd, Config.Timeout.idns_query, NULL, NULL);
- comm_write_mbuf(vc->conn->fd, mb, idnsSentQueryVC, vc);
+ comm_write_mbuf(vc->conn, mb, idnsSentQueryVC, vc);
delete mb;
}
}
static void
-idnsReadVC(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+idnsReadVC(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
nsvc * vc = (nsvc *)data;
return;
if (flag != COMM_OK || len <= 0) {
- comm_close(fd);
+ if (Comm::IsConnOpen(conn)) {
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
+ }
return;
}
vc->msg->size += len; // XXX should not access -> size directly
if (vc->msg->contentSize() < vc->msglen) {
- comm_read(fd, buf + len, vc->msglen - vc->msg->contentSize(), idnsReadVC, vc);
+ comm_read(conn->fd, buf + len, vc->msglen - vc->msg->contentSize(), idnsReadVC, vc);
return;
}
assert(vc->ns < nns);
- debugs(78, 3, "idnsReadVC: FD " << fd << ": received " <<
- (int) vc->msg->contentSize() << " bytes via tcp from " <<
- nameservers[vc->ns].S << ".");
+ debugs(78, 3, HERE << conn << ": received " << vc->msg->contentSize() << " bytes via TCP from " << nameservers[vc->ns].S << ".");
idnsGrokReply(vc->msg->buf, vc->msg->contentSize());
vc->msg->clean();
- comm_read(fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
+ comm_read(conn->fd, (char *)&vc->msglen, 2 , idnsReadVCHeader, vc);
}
static void
-idnsReadVCHeader(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+idnsReadVCHeader(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
nsvc * vc = (nsvc *)data;
return;
if (flag != COMM_OK || len <= 0) {
- comm_close(fd);
+ if (Comm::IsConnOpen(conn)) {
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
+ }
return;
}
assert(vc->read_msglen <= 2);
if (vc->read_msglen < 2) {
- comm_read(fd, buf + len, 2 - vc->read_msglen, idnsReadVCHeader, vc);
+ comm_read(conn->fd, buf + len, 2 - vc->read_msglen, idnsReadVCHeader, vc);
return;
}
vc->msglen = ntohs(vc->msglen);
vc->msg->init(vc->msglen, vc->msglen);
- comm_read(fd, vc->msg->buf, vc->msglen, idnsReadVC, vc);
+ comm_read(conn->fd, vc->msg->buf, vc->msglen, idnsReadVC, vc);
}
/*
*/
#include "config.h"
-#include "errorpage.h"
#include "auth/UserRequest.h"
-#include "SquidTime.h"
-#include "Store.h"
+#include "comm/Connection.h"
+#include "errorpage.h"
+#include "fde.h"
#include "HttpReply.h"
#include "HttpRequest.h"
-#include "MemObject.h"
-#include "fde.h"
#include "MemBuf.h"
+#include "MemObject.h"
#include "rfc1738.h"
+#include "SquidTime.h"
+#include "Store.h"
#include "URLScheme.h"
#include "wordlist.h"
}
void
-errorSend(int fd, ErrorState * err)
+errorSend(const Comm::ConnectionPointer &conn, ErrorState * err)
{
HttpReply *rep;
- debugs(4, 3, "errorSend: FD " << fd << ", err=" << err);
- assert(fd >= 0);
+ debugs(4, 3, HERE << conn << ", err=" << err);
+ assert(Comm::IsConnOpen(conn));
/*
* ugh, this is how we make sure error codes get back to
* the client side for logging and error tracking.
rep = err->BuildHttpReply();
- comm_write_mbuf(fd, rep->pack(), errorSendComplete, err);
+ comm_write_mbuf(conn, rep->pack(), errorSendComplete, err);
delete rep;
}
* closing the FD, otherwise we do it ourselves.
*/
static void
-errorSendComplete(int fd, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
+errorSendComplete(const Comm::ConnectionPointer &conn, char *bufnotused, size_t size, comm_err_t errflag, int xerrno, void *data)
{
ErrorState *err = static_cast<ErrorState *>(data);
- debugs(4, 3, "errorSendComplete: FD " << fd << ", size=" << size);
+ debugs(4, 3, HERE << conn << ", size=" << size);
if (errflag != COMM_ERR_CLOSING) {
if (err->callback) {
debugs(4, 3, "errorSendComplete: callback");
- err->callback(fd, err->callback_data, size);
+ err->callback(conn->fd, err->callback_data, size);
} else {
- comm_close(fd);
debugs(4, 3, "errorSendComplete: comm_close");
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
}
}
#include "squid.h"
#include "auth/UserRequest.h"
#include "cbdata.h"
+#include "comm/forward.h"
#include "ip/Address.h"
/**
SQUIDCEXTERN void errorClean(void);
/**
- \ingroup ErrorPageAPI
+ * \ingroup ErrorPageAPI
*
* This function generates a error page from the info contained
* by err and then sends it to the client.
* The callback function errorSendComplete() is called after
- * the page has been written to the client socket (fd).
+ * the page has been written to the client (clientConn).
* errorSendComplete() deallocates err. We need to add
* err to the cbdata because comm_write() requires it
* for all callback data pointers.
*
- \note normally errorSend() should only be called from
- * routines in ssl.c and pass.c, where we don't have any
- * StoreEntry's. In client_side.c we must allocate a StoreEntry
- * for errors and use errorAppendEntry() to account for
- * persistent/pipeline connections.
+ * \note normally errorSend() should only be called from
+ * routines in ssl.c and pass.c, where we don't have any
+ * StoreEntry's. In client_side.c we must allocate a StoreEntry
+ * for errors and use errorAppendEntry() to account for
+ * persistent/pipeline connections.
*
- \param fd socket where page object is to be written
- \param err This object is destroyed after use in this function.
+ * \param clientConn socket where page object is to be written
+ * \param err This object is destroyed after use in this function.
*/
-SQUIDCEXTERN void errorSend(int fd, ErrorState *err);
+SQUIDCEXTERN void errorSend(const Comm::ConnectionPointer &conn, ErrorState *err);
/**
\ingroup ErrorPageAPI
ebuf = xstrdup(buf);
safe_free(ctrl.last_command);
-
safe_free(ctrl.last_reply);
-
ctrl.last_command = ebuf;
if (!Comm::IsConnOpen(ctrl.conn)) {
- debugs(9, 2, HERE << "cannot send to closing ctrl FD " << ctrl.conn->fd);
+ debugs(9, 2, HERE << "cannot send to closing ctrl " << ctrl.conn);
// TODO: assert(ctrl.closer != NULL);
return;
}
typedef CommCbMemFunT<FtpStateData, CommIoCbParams> Dialer;
AsyncCall::Pointer call = JobCallback(9, 5, Dialer, this, FtpStateData::ftpWriteCommandCallback);
- comm_write(ctrl.conn->fd,
- ctrl.last_command,
- strlen(ctrl.last_command),
- call);
+ comm_write(ctrl.conn, ctrl.last_command, strlen(ctrl.last_command), call);
scheduleReadControlReply(0);
}
return;
if (io.flag) {
- debugs(9, DBG_IMPORTANT, "ftpWriteCommandCallback: FD " << io.fd << ": " << xstrerr(io.xerrno));
+ debugs(9, DBG_IMPORTANT, "ftpWriteCommandCallback: " << io.conn << ": " << xstrerr(io.xerrno));
failed(ERR_WRITE_ERROR, io.xerrno);
/* failed closes ctrl.conn and frees ftpState */
return;
void
FtpStateData::scheduleReadControlReply(int buffered_ok)
{
- debugs(9, 3, HERE << "FD " << ctrl.conn->fd);
+ debugs(9, 3, HERE << ctrl.conn);
if (buffered_ok && ctrl.offset > 0) {
/* We've already read some reply data */
extern int opt_create_swap_dirs; /* 0 */
extern int opt_store_doublecheck; /* 0 */
extern int syslog_enable; /* 0 */
- extern int theInIcpConnection; /* -1 */
- extern int theOutIcpConnection; /* -1 */
extern int DnsSocketA; /* -1 */
extern int DnsSocketB; /* -1 */
-#if SQUID_SNMP
-
- extern int theInSnmpConnection; /* -1 */
- extern int theOutSnmpConnection; /* -1 */
- extern char *snmp_agentinfo;
-#endif
-
extern int n_disk_objects; /* 0 */
extern iostats IOStats;
int cso_recno;
int len;
char *buf; /* pts to a 4k page */
- int fd;
+ Comm::ConnectionPointer serverConn;
HttpRequest *req;
FwdState::Pointer fwd;
char replybuf[BUFSIZ];
/// \ingroup ServerProtocolGopherInternal
static void
-gopherStateFree(int fdnotused, void *data)
+gopherStateFree(int, void *data)
{
GopherStateData *gopherState = (GopherStateData *)data;
gopherTimeout(int fd, void *data)
{
GopherStateData *gopherState = (GopherStateData *)data;
- StoreEntry *entry = gopherState->entry;
- debugs(10, 4, "gopherTimeout: FD " << fd << ": '" << entry->url() << "'" );
+ debugs(10, 4, HERE << gopherState->serverConn << ": '" << gopherState->entry->url() << "'" );
gopherState->fwd->fail(errorCon(ERR_READ_TIMEOUT, HTTP_GATEWAY_TIMEOUT, gopherState->fwd->request));
- comm_close(fd);
+ if (Comm::IsConnOpen(gopherState->serverConn))
+ gopherState->serverConn->close();
}
/**
* Read until error or connection closed.
*/
static void
-gopherReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+gopherReadReply(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
GopherStateData *gopherState = (GopherStateData *)data;
StoreEntry *entry = gopherState->entry;
assert(buf == gopherState->replybuf);
if (EBIT_TEST(entry->flags, ENTRY_ABORTED)) {
- comm_close(fd);
+ gopherState->serverConn->close();
return;
}
/* leave one space for \0 in gopherToHTML */
+ debugs(10, 5, HERE << conn << " read len=" << len);
+
if (flag == COMM_OK && len > 0) {
#if DELAY_POOLS
delayId.bytesIn(len);
kb_incr(&statCounter.server.all.kbytes_in, len);
kb_incr(&statCounter.server.other.kbytes_in, len);
- }
- debugs(10, 5, "gopherReadReply: FD " << fd << " read len=" << len);
-
- if (flag == COMM_OK && len > 0) {
- commSetTimeout(fd, Config.Timeout.read, NULL, NULL);
+ commSetTimeout(conn->fd, Config.Timeout.read, NULL, NULL);
IOStats.Gopher.reads++;
for (clen = len - 1, bin = 0; clen; bin++)
if (ignoreErrno(errno)) {
do_next_read = 1;
} else {
- ErrorState *err;
- err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR, gopherState->fwd->request);
+ ErrorState *err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR, gopherState->fwd->request);
err->xerrno = errno;
gopherState->fwd->fail(err);
- comm_close(fd);
+ gopherState->serverConn->close();
do_next_read = 0;
}
} else if (len == 0 && entry->isEmpty()) {
gopherState->fwd->fail(errorCon(ERR_ZERO_SIZE_OBJECT, HTTP_SERVICE_UNAVAILABLE, gopherState->fwd->request));
- comm_close(fd);
+ gopherState->serverConn->close();
do_next_read = 0;
} else if (len == 0) {
/* Connection closed; retrieval done. */
gopherEndHTML(gopherState);
entry->timestampsSet();
-
entry->flush();
-
gopherState->fwd->complete();
-
- comm_close(fd);
-
+ gopherState->serverConn->close();
do_next_read = 0;
} else {
if (gopherState->conversion != gopher_ds::NORMAL) {
}
if (do_next_read)
- comm_read(fd, buf, read_sz, gopherReadReply, gopherState);
+ comm_read(conn->fd, buf, read_sz, gopherReadReply, gopherState);
return;
}
* This will be called when request write is complete. Schedule read of reply.
*/
static void
-gopherSendComplete(int fd, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data)
+gopherSendComplete(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t errflag, int xerrno, void *data)
{
GopherStateData *gopherState = (GopherStateData *) data;
StoreEntry *entry = gopherState->entry;
- debugs(10, 5, "gopherSendComplete: FD " << fd << " size: " << size << " errflag: " << errflag);
+ debugs(10, 5, HERE << conn << " size: " << size << " errflag: " << errflag);
if (size > 0) {
- fd_bytes(fd, size, FD_WRITE);
+ fd_bytes(conn->fd, size, FD_WRITE);
kb_incr(&statCounter.server.all.kbytes_out, size);
kb_incr(&statCounter.server.other.kbytes_out, size);
}
err->port = gopherState->fwd->request->port;
err->url = xstrdup(entry->url());
gopherState->fwd->fail(err);
- comm_close(fd);
+ gopherState->serverConn->close();
if (buf)
memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. */
/* Schedule read reply. */
AsyncCall::Pointer call = commCbCall(10,5, "gopherReadReply",
CommIoCbPtrFun(gopherReadReply, gopherState));
- entry->delayAwareRead(fd, gopherState->replybuf, BUFSIZ, call);
+ entry->delayAwareRead(conn->fd, gopherState->replybuf, BUFSIZ, call);
if (buf)
memFree(buf, MEM_4K_BUF); /* Allocated by gopherSendRequest. */
snprintf(buf, 4096, "%s\r\n", gopherState->request);
}
- debugs(10, 5, "gopherSendRequest: FD " << fd);
- comm_write(fd, buf, strlen(buf), gopherSendComplete, gopherState, NULL);
+ debugs(10, 5, HERE << gopherState->serverConn);
+ comm_write(gopherState->serverConn, buf, strlen(buf), gopherSendComplete, gopherState, NULL);
if (EBIT_TEST(gopherState->entry->flags, ENTRY_CACHABLE))
gopherState->entry->setPublicKey(); /* Make it public */
return;
}
- gopherState->fd = fwd->serverConnection()->fd; // TODO: save the serverConnection() in gopher instead of the FD
- gopherState->fwd = fwd;
+ gopherState->serverConn = fwd->serverConnection();
gopherSendRequest(fwd->serverConnection()->fd, gopherState);
commSetTimeout(fwd->serverConnection()->fd, Config.Timeout.read, gopherTimeout, gopherState);
}
*/
#include "squid.h"
+#include "comm.h"
+#include "comm/Connection.h"
#include "helper.h"
+#include "MemBuf.h"
#include "SquidMath.h"
#include "SquidTime.h"
#include "Store.h"
-#include "comm.h"
-#include "MemBuf.h"
#include "wordlist.h"
#define HELPER_MAX_ARGS 64
CBDATA_CLASS_INIT(statefulhelper);
CBDATA_TYPE(helper_stateful_server);
+void
+HelperServerBase::closePipesSafely()
+{
+#ifdef _SQUID_MSWIN_
+ int no = index + 1;
+
+ shutdown(writePipe->fd, SD_BOTH);
+#endif
+
+ flags.closing = 1;
+ if (readPipe->fd == writePipe->fd)
+ readPipe->fd = -1;
+ else
+ readPipe->close();
+ writePipe->close();
+
+#ifdef _SQUID_MSWIN_
+ if (hIpc) {
+ if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
+ getCurrentTime();
+ debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
+ " #" << no << " (" << hlp->cmdline->key << "," <<
+ (long int)pid << ") didn't exit in 5 seconds");
+ }
+ CloseHandle(hIpc);
+ }
+#endif
+}
+
+void
+HelperServerBase::closeWritePipeSafely()
+{
+#ifdef _SQUID_MSWIN_
+ int no = index + 1;
+
+ shutdown(writePipe->fd, (readPipe->fd == writePipe->fd ? SD_BOTH : SD_SEND));
+#endif
+
+ flags.closing = 1;
+ if (readPipe->fd == writePipe->fd)
+ readPipe->fd = -1;
+ writePipe->close();
+
+#ifdef _SQUID_MSWIN_
+ if (hIpc) {
+ if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
+ getCurrentTime();
+ debugs(84, DBG_IMPORTANT, "WARNING: " << hlp->id_name <<
+ " #" << no << " (" << hlp->cmdline->key << "," <<
+ (long int)pid << ") didn't exit in 5 seconds");
+ }
+ CloseHandle(hIpc);
+ }
+#endif
+}
+
void
helperOpenServers(helper * hlp)
{
srv->pid = pid;
srv->index = k;
srv->addr = hlp->addr;
- srv->rfd = rfd;
- srv->wfd = wfd;
+ srv->readPipe = new Comm::Connection;
+ srv->readPipe->fd = rfd;
+ srv->writePipe = new Comm::Connection;
+ srv->writePipe->fd = wfd;
srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
srv->wqueue = new MemBuf;
srv->roffset = 0;
comm_add_close_handler(rfd, helperServerFree, srv);
- comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
+ comm_read(srv->readPipe->fd, srv->rbuf, srv->rbuf_sz - 1, helperHandleRead, srv);
}
hlp->last_restart = squid_curtime;
srv->stats.releases = 0;
srv->index = k;
srv->addr = hlp->addr;
- srv->rfd = rfd;
- srv->wfd = wfd;
+ srv->readPipe = new Comm::Connection;
+ srv->readPipe->fd = rfd;
+ srv->writePipe = new Comm::Connection;
+ srv->writePipe->fd = wfd;
srv->rbuf = (char *)memAllocBuf(BUF_8KB, &srv->rbuf_sz);
srv->roffset = 0;
srv->parent = cbdataReference(hlp);
comm_add_close_handler(rfd, helperStatefulServerFree, srv);
- comm_read(srv->rfd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
+ comm_read(srv->readPipe->fd, srv->rbuf, srv->rbuf_sz - 1, helperStatefulHandleRead, srv);
}
hlp->last_restart = squid_curtime;
double tt = 0.001 * (srv->requests[0] ? tvSubMsec(srv->requests[0]->dispatch_time, current_time) : tvSubMsec(srv->dispatch_time, srv->answer_time));
storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index + 1,
- srv->rfd,
+ srv->readPipe->fd,
srv->pid,
srv->stats.uses,
srv->stats.pending ? 'B' : ' ',
double tt = 0.001 * tvSubMsec(srv->dispatch_time, srv->flags.busy ? current_time : srv->answer_time);
storeAppendPrintf(sentry, "%7d\t%7d\t%7d\t%11d\t%c%c%c%c%c\t%7.3f\t%7d\t%s\n",
srv->index + 1,
- srv->rfd,
+ srv->readPipe->fd,
srv->pid,
srv->stats.uses,
srv->flags.busy ? 'B' : ' ',
helperShutdown(helper * hlp)
{
dlink_node *link = hlp->servers.head;
-#ifdef _SQUID_MSWIN_
-
- HANDLE hIpc;
- pid_t pid;
- int no;
-#endif
while (link) {
helper_server *srv;
continue;
}
- srv->flags.closing = 1;
-#ifdef _SQUID_MSWIN_
-
- hIpc = srv->hIpc;
- pid = srv->pid;
- no = srv->index + 1;
- shutdown(srv->wfd, SD_BOTH);
-#endif
-
debugs(84, 3, "helperShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
/* the rest of the details is dealt with in the helperServerFree
* close handler
*/
- comm_close(srv->rfd);
-#ifdef _SQUID_MSWIN_
-
- if (hIpc) {
- if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
- getCurrentTime();
- debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
- " #" << no << " (" << hlp->cmdline->key << "," <<
- (long int)pid << ") didn't exit in 5 seconds");
-
- }
-
- CloseHandle(hIpc);
- }
-
-#endif
-
+ srv->closePipesSafely();
}
}
{
dlink_node *link = hlp->servers.head;
helper_stateful_server *srv;
-#ifdef _SQUID_MSWIN_
-
- HANDLE hIpc;
- pid_t pid;
- int no;
-#endif
while (link) {
srv = (helper_stateful_server *)link->data;
}
}
- srv->flags.closing = 1;
-#ifdef _SQUID_MSWIN_
-
- hIpc = srv->hIpc;
- pid = srv->pid;
- no = srv->index + 1;
- shutdown(srv->wfd, SD_BOTH);
-#endif
-
debugs(84, 3, "helperStatefulShutdown: " << hlp->id_name << " #" << srv->index + 1 << " shutting down.");
/* the rest of the details is dealt with in the helperStatefulServerFree
* close handler
*/
- comm_close(srv->rfd);
-#ifdef _SQUID_MSWIN_
-
- if (hIpc) {
- if (WaitForSingleObject(hIpc, 5000) != WAIT_OBJECT_0) {
- getCurrentTime();
- debugs(84, 1, "helperShutdown: WARNING: " << hlp->id_name <<
- " #" << no << " (" << hlp->cmdline->key << "," <<
- (long int)pid << ") didn't exit in 5 seconds");
- }
-
- CloseHandle(hIpc);
- }
-
-#endif
-
+ srv->closePipesSafely();
}
}
safe_free(srv->requests);
- if (srv->wfd != srv->rfd && srv->wfd != -1)
- comm_close(srv->wfd);
+ if (Comm::IsConnOpen(srv->writePipe))
+ srv->closeWritePipeSafely();
dlinkDelete(&srv->link, &hlp->servers);
}
/* TODO: walk the local queue of requests and carry them all out */
- if (srv->wfd != srv->rfd && srv->wfd != -1)
- comm_close(srv->wfd);
+ if (Comm::IsConnOpen(srv->writePipe))
+ srv->closeWritePipeSafely();
dlinkDelete(&srv->link, &hlp->servers);
static void
-helperHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
char *t = NULL;
helper_server *srv = (helper_server *)data;
return;
}
- assert(fd == srv->rfd);
+ assert(conn->fd == srv->readPipe->fd);
debugs(84, 5, "helperHandleRead: " << len << " bytes from " << hlp->id_name << " #" << srv->index + 1);
if (flag != COMM_OK || len <= 0) {
if (len < 0)
- debugs(84, 1, "helperHandleRead: FD " << fd << " read: " << xstrerror());
-
- comm_close(fd);
+ debugs(84, 1, "helperHandleRead: FD " << conn->fd << " read: " << xstrerror());
+ srv->closePipesSafely();
return;
}
if (!srv->flags.shutdown) {
helperKickQueue(hlp);
} else if (!srv->flags.closing && !srv->stats.pending) {
- int wfd = srv->wfd;
- srv->wfd = -1;
- if (srv->rfd == wfd)
- srv->rfd = -1;
- srv->flags.closing=1;
- comm_close(wfd);
+ srv->closeWritePipeSafely();
return;
}
}
- if (srv->rfd != -1)
- comm_read(fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
+ if (Comm::IsConnOpen(srv->readPipe))
+ comm_read(srv->readPipe->fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1, helperHandleRead, srv);
}
static void
-helperStatefulHandleRead(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperStatefulHandleRead(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
char *t = NULL;
helper_stateful_server *srv = (helper_stateful_server *)data;
return;
}
- assert(fd == srv->rfd);
+ assert(conn->fd == srv->readPipe->fd);
debugs(84, 5, "helperStatefulHandleRead: " << len << " bytes from " <<
hlp->id_name << " #" << srv->index + 1);
if (flag != COMM_OK || len <= 0) {
if (len < 0)
- debugs(84, 1, "helperStatefulHandleRead: FD " << fd << " read: " << xstrerror());
-
- comm_close(fd);
+ debugs(84, 1, "helperStatefulHandleRead: FD " << conn->fd << " read: " << xstrerror());
+ srv->closePipesSafely();
return;
}
helperStatefulReleaseServer(srv);
}
- if (srv->rfd != -1)
- comm_read(srv->rfd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
+ if (Comm::IsConnOpen(srv->readPipe))
+ comm_read(srv->readPipe->fd, srv->rbuf + srv->roffset, srv->rbuf_sz - srv->roffset - 1,
helperStatefulHandleRead, srv);
}
static void
-helperDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+helperDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
helper_server *srv = (helper_server *)data;
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->wfd,
+ comm_write(srv->writePipe,
srv->writebuf->content(),
srv->writebuf->contentSize(),
helperDispatchWriteDone, /* Handler */
srv->writebuf = srv->wqueue;
srv->wqueue = new MemBuf;
srv->flags.writing = 1;
- comm_write(srv->wfd,
+ comm_write(srv->writePipe,
srv->writebuf->content(),
srv->writebuf->contentSize(),
helperDispatchWriteDone, /* Handler */
}
static void
-helperStatefulDispatchWriteDone(int fd, char *buf, size_t len, comm_err_t flag,
+helperStatefulDispatchWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag,
int xerrno, void *data)
{
/* nothing! */
srv->flags.reserved = 1;
srv->request = r;
srv->dispatch_time = current_time;
- comm_write(srv->wfd,
+ comm_write(srv->writePipe,
r->buf,
strlen(r->buf),
helperStatefulDispatchWriteDone, /* Handler */
if (!srv->flags.shutdown) {
helperStatefulKickQueue(srv->parent);
} else if (!srv->flags.closing && !srv->flags.reserved && !srv->flags.busy) {
- int wfd = srv->wfd;
- srv->wfd = -1;
- if (srv->rfd == wfd)
- srv->rfd = -1;
- srv->flags.closing=1;
- comm_close(wfd);
+ srv->closeWritePipeSafely();
return;
}
}
#include "squid.h"
#include "cbdata.h"
-#include "ip/Address.h"
+#include "comm/forward.h"
#include "HelperChildConfig.h"
+#include "ip/Address.h"
class helper_request;
-typedef struct _helper_flags helper_flags;
-
-typedef struct _helper_stateful_flags helper_stateful_flags;
-
typedef void HLPSCB(void *, void *lastserver, char *buf);
class helper
*/
class HelperServerBase
{
+public:
+ /** Closes pipes to the helper safely.
+ * Handles the case where the read and write pipes are the same FD.
+ */
+ void closePipesSafely();
+
+ /** Closes the reading pipe.
+ * If the read and write sockets are the same the write pipe will
+ * also be closed. Otherwise its left open for later handling.
+ */
+ void closeWritePipeSafely();
+
public:
int index;
int pid;
Ip::Address addr;
- int rfd;
- int wfd;
+ Comm::ConnectionPointer readPipe;
+ Comm::ConnectionPointer writePipe;
void *hIpc;
char *rbuf;
struct timeval answer_time;
dlink_node link;
+
+ struct _helper_flags {
+ unsigned int busy:1;
+ unsigned int writing:1;
+ unsigned int closing:1;
+ unsigned int shutdown:1;
+ unsigned int reserved:1;
+ } flags;
+
};
class helper_server : public HelperServerBase
helper *parent;
helper_request **requests;
- struct _helper_flags {
- unsigned int writing:1;
- unsigned int closing:1;
- unsigned int shutdown:1;
- } flags;
-
struct {
int uses;
unsigned int pending;
statefulhelper *parent;
helper_stateful_request *request;
- struct _helper_stateful_flags {
- unsigned int busy:1;
- unsigned int closing:1;
- unsigned int shutdown:1;
- unsigned int reserved:1;
- } flags;
-
struct {
int uses;
int submits;
public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo);
+ typedef void (*Handler)(int errNo);
HtcpListeningStartedDialer(Handler aHandler): handler(aHandler) {}
virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
-
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo); }
+ virtual void dial(AsyncCall &) { (handler)(errNo); }
public:
Handler handler;
RR_RESPONSE
};
-static void htcpIncomingConnectionOpened(int fd, int errNo);
+static void htcpIncomingConnectionOpened(int errNo);
static uint32_t msg_id_counter = 0;
-static int htcpInSocket = -1;
-static int htcpOutSocket = -1;
+static Comm::ConnectionPointer htcpOutgoingConn = NULL;
+static Comm::ConnectionPointer htcpIncomingConn = NULL;
#define N_QUERIED_KEYS 8192
static uint32_t queried_id[N_QUERIED_KEYS];
static cache_key queried_keys[N_QUERIED_KEYS][SQUID_MD5_DIGEST_LENGTH];
}
static void
-
htcpSend(const char *buf, int len, Ip::Address &to)
{
- int x;
-
- debugs(31, 3, "htcpSend: " << to );
+ debugs(31, 3, HERE << to);
htcpHexdump("htcpSend", buf, len);
- x = comm_udp_sendto(htcpOutSocket,
- to,
- buf,
- len);
-
- if (x < 0)
- debugs(31, 3, "htcpSend: FD " << htcpOutSocket << " sendto: " << xstrerror());
+ if (comm_udp_sendto(htcpOutgoingConn->fd, to, buf, len) < 0)
+ debugs(31, 3, HERE << htcpOutgoingConn << " sendto: " << xstrerror());
else
statCounter.htcp.pkts_sent++;
}
*/
void
-
htcpSpecifier::setFrom(Ip::Address &aSocket)
{
from = aSocket;
htcpInit(void)
{
if (Config.Port.htcp <= 0) {
- debugs(31, 1, "HTCP Disabled.");
+ debugs(31, DBG_IMPORTANT, "HTCP Disabled.");
return;
}
- Ip::Address incomingAddr = Config.Addrs.udp_incoming;
- incomingAddr.SetPort(Config.Port.htcp);
+ htcpIncomingConn = new Comm::Connection;
+ htcpIncomingConn->local = Config.Addrs.udp_incoming;
+ htcpIncomingConn->local.SetPort(Config.Port.htcp);
- if (!Ip::EnableIpv6 && !incomingAddr.SetIPv4()) {
- debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << incomingAddr << " is not an IPv4 address.");
+ if (!Ip::EnableIpv6 && !htcpIncomingConn->local.SetIPv4()) {
+ debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << htcpIncomingConn->local << " is not an IPv4 address.");
fatal("HTCP port cannot be opened.");
}
/* split-stack for now requires default IPv4-only HTCP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && incomingAddr.IsAnyAddr()) {
- incomingAddr.SetIPv4();
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && htcpIncomingConn->local.IsAnyAddr()) {
+ htcpIncomingConn->local.SetIPv4();
}
AsyncCall::Pointer call = asyncCall(31, 2,
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
- incomingAddr,
- COMM_NONBLOCKING,
- Ipc::fdnInHtcpSocket, call);
+ htcpIncomingConn,
+ Ipc::fdnInHtcpSocket, call, Subscription::Pointer());
if (!Config.Addrs.udp_outgoing.IsNoAddr()) {
- Ip::Address outgoingAddr = Config.Addrs.udp_outgoing;
- outgoingAddr.SetPort(Config.Port.htcp);
+ htcpOutgoingConn = new Comm::Connection;
+ htcpOutgoingConn->local = Config.Addrs.udp_outgoing;
+ htcpOutgoingConn->local.SetPort(Config.Port.htcp);
- if (!Ip::EnableIpv6 && !outgoingAddr.SetIPv4()) {
- debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << outgoingAddr << " is not an IPv4 address.");
+ if (!Ip::EnableIpv6 && !htcpOutgoingConn->local.SetIPv4()) {
+ debugs(31, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << htcpOutgoingConn->local << " is not an IPv4 address.");
fatal("HTCP port cannot be opened.");
}
/* split-stack for now requires default IPv4-only HTCP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && outgoingAddr.IsAnyAddr()) {
- outgoingAddr.SetIPv4();
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && htcpOutgoingConn->local.IsAnyAddr()) {
+ htcpOutgoingConn->local.SetIPv4();
}
enter_suid();
- htcpOutSocket = comm_open_listener(SOCK_DGRAM,
- IPPROTO_UDP,
- outgoingAddr,
- COMM_NONBLOCKING,
- "Outgoing HTCP Socket");
+ comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, htcpOutgoingConn, "Outgoing HTCP Socket");
leave_suid();
- if (htcpOutSocket < 0)
+ if (!Comm::IsConnOpen(htcpOutgoingConn))
fatal("Cannot open Outgoing HTCP Socket");
- commSetSelect(htcpOutSocket, COMM_SELECT_READ, htcpRecv, NULL, 0);
+ commSetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0);
- debugs(31, 1, "Outgoing HTCP messages on port " << Config.Port.htcp << ", FD " << htcpOutSocket << ".");
-
- fd_note(htcpInSocket, "Incoming HTCP socket");
+ debugs(31, DBG_IMPORTANT, "Sending HTCP messages from " << htcpOutgoingConn->local);
}
if (!htcpDetailPool) {
}
static void
-htcpIncomingConnectionOpened(int fd, int errNo)
+htcpIncomingConnectionOpened(int)
{
- htcpInSocket = fd;
-
- if (htcpInSocket < 0)
+ if (!Comm::IsConnOpen(htcpIncomingConn))
fatal("Cannot open HTCP Socket");
- commSetSelect(htcpInSocket, COMM_SELECT_READ, htcpRecv, NULL, 0);
+ commSetSelect(htcpIncomingConn->fd, COMM_SELECT_READ, htcpRecv, NULL, 0);
- debugs(31, 1, "Accepting HTCP messages on port " << Config.Port.htcp << ", FD " << htcpInSocket << ".");
+ debugs(31, DBG_CRITICAL, "Accepting HTCP messages on " << htcpIncomingConn->local);
- if (Config.Addrs.udp_outgoing.IsNoAddr())
- htcpOutSocket = htcpInSocket;
+ if (Config.Addrs.udp_outgoing.IsNoAddr()) {
+ htcpOutgoingConn = htcpIncomingConn;
+ debugs(31, DBG_IMPORTANT, "Sending HTCP messages from " << htcpOutgoingConn->local);
+ }
}
int
MemBuf mb;
http_state_flags flags;
- if (htcpInSocket < 0)
+ if (!Comm::IsConnOpen(htcpIncomingConn))
return 0;
old_squid_format = p->options.htcp_oldsquid;
MemBuf mb;
http_state_flags flags;
- if (htcpInSocket < 0)
+ if (!Comm::IsConnOpen(htcpIncomingConn))
return;
old_squid_format = p->options.htcp_oldsquid;
void
htcpSocketShutdown(void)
{
- if (htcpInSocket < 0)
+ if (!Comm::IsConnOpen(htcpIncomingConn))
return;
- if (htcpInSocket != htcpOutSocket) {
- debugs(12, 1, "FD " << htcpInSocket << " Closing HTCP socket");
- comm_close(htcpInSocket);
- }
-
+ debugs(12, DBG_IMPORTANT, "Stop accepting HTCP on " << htcpIncomingConn->local);
/*
- * Here we set 'htcpInSocket' to -1 even though the HTCP 'in'
+ * Here we just unlink htcpIncomingConn because the HTCP 'in'
* and 'out' sockets might be just one FD. This prevents this
* function from executing repeatedly. When we are really ready to
* exit or restart, main will comm_close the 'out' descriptor.
*/
- htcpInSocket = -1;
+ htcpIncomingConn = NULL;
/*
* Normally we only write to the outgoing HTCP socket, but
/* XXX Don't we need this handler to read replies while shutting down?
* I think there should be a separate hander for reading replies..
*/
- assert(htcpOutSocket > -1);
+ assert(Comm::IsConnOpen(htcpOutgoingConn));
- commSetSelect(htcpOutSocket, COMM_SELECT_READ, NULL, NULL, 0);
+ commSetSelect(htcpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
}
void
{
htcpSocketShutdown();
- if (htcpOutSocket > -1) {
- debugs(12, 1, "FD " << htcpOutSocket << " Closing HTCP socket");
- comm_close(htcpOutSocket);
- htcpOutSocket = -1;
+ if (htcpOutgoingConn != NULL) {
+ debugs(12, DBG_IMPORTANT, "Stop sending HTCP from " << htcpOutgoingConn->local);
+ htcpOutgoingConn = NULL;
}
}
request->peer_host=_peer?_peer->host:NULL;
buildRequestPrefix(request, orig_request, entry, &mb, flags);
debugs(11, 6, HERE << serverConnection << ":\n" << mb.buf);
- comm_write_mbuf(serverConnection->fd, &mb, requestSender);
+ comm_write_mbuf(serverConnection, &mb, requestSender);
return true;
}
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
requestSender = JobCallback(11,5,
Dialer, this, HttpStateData::wroteLast);
- comm_write(serverConnection->fd, "\r\n", 2, requestSender);
+ comm_write(serverConnection, "\r\n", 2, requestSender);
return true;
#else
return false;
flags.sentLastChunk = true;
typedef CommCbMemFunT<HttpStateData, CommIoCbParams> Dialer;
- requestSender = JobCallback(11,5,
- Dialer, this, HttpStateData::wroteLast);
- comm_write(serverConnection->fd, "0\r\n\r\n", 5, requestSender);
+ requestSender = JobCallback(11,5, Dialer, this, HttpStateData::wroteLast);
+ comm_write(serverConnection, "0\r\n\r\n", 5, requestSender);
return true;
}
public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo, Ip::Address& addr);
- IcpListeningStartedDialer(Handler aHandler, Ip::Address& anAddr):
- handler(aHandler), addr(anAddr) {}
-
- virtual void print(std::ostream &os) const {
- startPrint(os) <<
- ", address=" << addr << ')';
- }
+ typedef void (*Handler)(int errNo);
+ IcpListeningStartedDialer(Handler aHandler):
+ handler(aHandler) {}
+ virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo, addr); }
+ virtual void dial(AsyncCall &) { (handler)(errNo); }
public:
Handler handler;
- Ip::Address addr;
};
-static void icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr);
+static void icpIncomingConnectionOpened(int errNo);
/// \ingroup ServerProtocolICPInternal2
static void icpLogIcp(const Ip::Address &, log_type, int, const char *, int);
static icpUdpData *IcpQueueTail = NULL;
/// \ingroup ServerProtocolICPInternal2
-Ip::Address theOutICPAddr;
+Comm::ConnectionPointer icpIncomingConn = NULL;
+/// \ingroup ServerProtocolICPInternal2
+Comm::ConnectionPointer icpOutgoingConn = NULL;
+
+/** \ingroup ServerProtocolICPInternal2
+ * ICP v2 uses the outgoing address as host ID.
+ * NP: this *may* be identical to icpOutgoingConn->local
+ * but when IN/OUT sockets are shared we can't guarantee that
+ * so a separate variable is used for now.
+ *
+ * We have one for private use (sent only by this local cache)
+ * and one for public use (for external caches to contact us)
+ */
+Ip::Address theIcpPrivateHostID;
+
+/// \see theIcpPrivateHostID
+Ip::Address theIcpPublicHostID;
+
/* icp_common_t */
_icp_common_t::_icp_common_t() : opcode(ICP_INVALID), version(0), length(0), reqnum(0), flags(0), pad(0), shostid(0)
headerp->pad = htonl(pad);
- theOutICPAddr.GetInAddr( *((struct in_addr*)&headerp->shostid) );
+ theIcpPrivateHostID.GetInAddr( *((struct in_addr*)&headerp->shostid) );
urloffset = buf + sizeof(icp_common_t);
icpConnectionsOpen(void)
{
uint16_t port;
- Ip::Address addr;
-
- struct addrinfo *xai = NULL;
- int x;
+// Ip::Address addr;
if ((port = Config.Port.icp) <= 0)
return;
- addr = Config.Addrs.udp_incoming;
- addr.SetPort(port);
+ icpIncomingConn = new Comm::Connection;
+ icpIncomingConn->local = Config.Addrs.udp_incoming;
+ icpIncomingConn->local.SetPort(port);
- if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
- debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
+ if (!Ip::EnableIpv6 && !icpIncomingConn->local.SetIPv4()) {
+ debugs(12, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpIncomingConn->local << " is not an IPv4 address.");
fatal("ICP port cannot be opened.");
}
/* split-stack for now requires default IPv4-only ICP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
- addr.SetIPv4();
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpIncomingConn->local.IsAnyAddr()) {
+ icpIncomingConn->local.SetIPv4();
}
AsyncCall::Pointer call = asyncCall(12, 2,
"icpIncomingConnectionOpened",
- IcpListeningStartedDialer(&icpIncomingConnectionOpened, addr));
+ IcpListeningStartedDialer(&icpIncomingConnectionOpened));
Ipc::StartListening(SOCK_DGRAM,
IPPROTO_UDP,
- addr,
- COMM_NONBLOCKING,
- Ipc::fdnInIcpSocket, call);
+ icpIncomingConn,
+ Ipc::fdnInIcpSocket, call, Subscription::Pointer());
- addr.SetEmpty(); // clear for next use.
- addr = Config.Addrs.udp_outgoing;
- if ( !addr.IsNoAddr() ) {
- enter_suid();
- addr.SetPort(port);
+ if ( !Config.Addrs.udp_outgoing.IsNoAddr() ) {
+ icpOutgoingConn = new Comm::Connection;
+ icpOutgoingConn->local = Config.Addrs.udp_outgoing;
+ icpOutgoingConn->local.SetPort(port);
- if (!Ip::EnableIpv6 && !addr.SetIPv4()) {
- debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << addr << " is not an IPv4 address.");
+ if (!Ip::EnableIpv6 && !icpOutgoingConn->local.SetIPv4()) {
+ debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << icpOutgoingConn->local << " is not an IPv4 address.");
fatal("ICP port cannot be opened.");
}
/* split-stack for now requires default IPv4-only ICP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && addr.IsAnyAddr()) {
- addr.SetIPv4();
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && icpOutgoingConn->local.IsAnyAddr()) {
+ icpOutgoingConn->local.SetIPv4();
}
- theOutIcpConnection = comm_open_listener(SOCK_DGRAM,
- IPPROTO_UDP,
- addr,
- COMM_NONBLOCKING,
- "ICP Port");
+ enter_suid();
+ comm_open_listener(SOCK_DGRAM, IPPROTO_UDP, icpOutgoingConn, "Outgoing ICP Port");
leave_suid();
- if (theOutIcpConnection < 0)
+ if (!Comm::IsConnOpen(icpOutgoingConn))
fatal("Cannot open Outgoing ICP Port");
- commSetSelect(theOutIcpConnection,
- COMM_SELECT_READ,
- icpHandleUdp,
- NULL,
- 0);
+ debugs(12, DBG_CRITICAL, "Sending ICP messages from " << icpOutgoingConn->local);
- debugs(12, 1, "Outgoing ICP messages on port " << addr.GetPort() << ", FD " << theOutIcpConnection << ".");
-
- fd_note(theOutIcpConnection, "Outgoing ICP socket");
+ commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
}
- theOutICPAddr.SetEmpty();
-
- theOutICPAddr.InitAddrInfo(xai);
-
- x = getsockname(theOutIcpConnection, xai->ai_addr, &xai->ai_addrlen);
-
- if (x < 0)
- debugs(50, 1, "theOutIcpConnection FD " << theOutIcpConnection << ": getsockname: " << xstrerror());
+ // Ensure that we have the IP address(es) to use for Host ID.
+ // The outgoing address is used as 'private' host ID used only on packets we send
+ struct addrinfo *xai = NULL;
+ theIcpPrivateHostID.InitAddrInfo(xai);
+ if (getsockname(icpOutgoingConn->fd, xai->ai_addr, &xai->ai_addrlen) < 0)
+ debugs(50, DBG_IMPORTANT, "ERROR: Unable to identify ICP host ID to use for " << icpOutgoingConn
+ << ": getsockname: " << xstrerror());
else
- theOutICPAddr = *xai;
-
- theOutICPAddr.FreeAddrInfo(xai);
+ theIcpPrivateHostID = *xai;
+ theIcpPrivateHostID.FreeAddrInfo(xai);
}
static void
-icpIncomingConnectionOpened(int fd, int errNo, Ip::Address& addr)
+icpIncomingConnectionOpened(int errNo)
{
- theInIcpConnection = fd;
-
- if (theInIcpConnection < 0)
+ if (!Comm::IsConnOpen(icpIncomingConn))
fatal("Cannot open ICP Port");
- commSetSelect(theInIcpConnection,
- COMM_SELECT_READ,
- icpHandleUdp,
- NULL,
- 0);
+ commSetSelect(icpIncomingConn->fd, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
for (const wordlist *s = Config.mcast_group_list; s; s = s->next)
- ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
+ ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL); // XXX: pass the icpIncomingConn for mcastJoinGroups usage.
- debugs(12, 1, "Accepting ICP messages at " << addr << ", FD " << theInIcpConnection << ".");
+ debugs(12, DBG_IMPORTANT, "Accepting ICP messages on " << icpIncomingConn->local);
- fd_note(theInIcpConnection, "Incoming ICP socket");
+ fd_note(icpIncomingConn->fd, "Incoming ICP port");
- if (Config.Addrs.udp_outgoing.IsNoAddr())
- theOutIcpConnection = theInIcpConnection;
+ if (Config.Addrs.udp_outgoing.IsNoAddr()) {
+ icpOutgoingConn = icpIncomingConn;
+ debugs(12, DBG_IMPORTANT, "Sending ICP messages from " << icpOutgoingConn->local);
+ }
+
+ // Ensure that we have the IP address(es) to use for Host ID.
+ // The listening address is used as 'public' host ID which can be used to contact us
+ struct addrinfo *xai = NULL;
+ theIcpPublicHostID.InitAddrInfo(xai); // reset xai
+ if (getsockname(icpIncomingConn->fd, xai->ai_addr, &xai->ai_addrlen) < 0)
+ debugs(50, DBG_IMPORTANT, "ERROR: Unable to identify ICP host ID to use for " << icpIncomingConn
+ << ": getsockname: " << xstrerror());
+ else
+ theIcpPublicHostID = *xai;
+ theIcpPublicHostID.FreeAddrInfo(xai);
}
/**
void
icpConnectionShutdown(void)
{
- if (theInIcpConnection < 0)
+ if (!Comm::IsConnOpen(icpIncomingConn))
return;
- if (theInIcpConnection != theOutIcpConnection) {
- debugs(12, 1, "FD " << theInIcpConnection << " Closing ICP connection");
- comm_close(theInIcpConnection);
- }
+ debugs(12, DBG_IMPORTANT, "Stop receiving ICP on " << icpIncomingConn->local);
- /**
- * Here we set 'theInIcpConnection' to -1 even though the ICP 'in'
- * and 'out' sockets might be just one FD. This prevents this
- * function from executing repeatedly. When we are really ready to
- * exit or restart, main will comm_close the 'out' descriptor.
+ /** Release the 'in' socket for lazy closure.
+ * in and out sockets may be sharing one same FD.
+ * This prevents this function from executing repeatedly.
*/
- theInIcpConnection = -1;
+ icpIncomingConn = NULL;
/**
* Normally we only write to the outgoing ICP socket, but
* to that specific interface. During shutdown, we must
* disable reading on the outgoing socket.
*/
- assert(theOutIcpConnection > -1);
+ assert(Comm::IsConnOpen(icpOutgoingConn));
- commSetSelect(theOutIcpConnection, COMM_SELECT_READ, NULL, NULL, 0);
+ commSetSelect(icpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
}
void
{
icpConnectionShutdown();
- if (theOutIcpConnection > -1) {
- debugs(12, 1, "FD " << theOutIcpConnection << " Closing ICP connection");
- comm_close(theOutIcpConnection);
- theOutIcpConnection = -1;
+ if (icpOutgoingConn != NULL) {
+ debugs(12, DBG_IMPORTANT, "Stop sending ICP from " << icpOutgoingConn->local);
+ icpOutgoingConn = NULL;
}
}
char buf[4096];
} IdentStateData;
-// TODO: make these all a series of Async jobs. They are self-contained callbacks now.
+// TODO: make these all a series of Async job calls. They are self-contained callbacks now.
static IOCB ReadReply;
static PF Close;
static PF Timeout;
mb.Printf("%d, %d\r\n",
conn->remote.GetPort(),
conn->local.GetPort());
- comm_write_mbuf(conn->fd, &mb, NULL, state);
+ comm_write_mbuf(conn, &mb, NULL, state);
comm_read(conn->fd, state->buf, BUFSIZ, Ident::ReadReply, state);
commSetTimeout(conn->fd, Ident::TheConfig.timeout, Ident::Timeout, state);
}
void
-Ident::ReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+Ident::ReadReply(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
IdentStateData *state = (IdentStateData *)data;
char *ident = NULL;
char *t = NULL;
assert(buf == state->buf);
- assert(fd == state->conn->fd);
+ assert(conn->fd == state->conn->fd);
if (flag != COMM_OK || len <= 0) {
state->conn->close();
if ((t = strchr(buf, '\n')))
*t = '\0';
- debugs(30, 5, HERE << "FD " << fd << ": Read '" << buf << "'");
+ debugs(30, 5, HERE << conn << ": Read '" << buf << "'");
if (strstr(buf, "USERID")) {
if ((ident = strrchr(buf, ':'))) {
#include "config.h"
+#include "base/Subscription.h"
#include "comm.h"
+#include "comm/Connection.h"
#include "ipc/Coordinator.h"
#include "ipc/FdNotes.h"
#include "ipc/SharedListen.h"
" needs shared listen FD for " << request.params.addr);
Listeners::const_iterator i = listeners.find(request.params);
int errNo = 0;
- const int sock = (i != listeners.end()) ?
+ const Comm::ConnectionPointer c = (i != listeners.end()) ?
i->second : openListenSocket(request, errNo);
- debugs(54, 3, HERE << "sending shared listen FD " << sock << " for " <<
+ debugs(54, 3, HERE << "sending shared listen " << c << " for " <<
request.params.addr << " to kid" << request.requestorId <<
" mapId=" << request.mapId);
- SharedListenResponse response(sock, errNo, request.mapId);
+ SharedListenResponse response(c, errNo, request.mapId);
TypedMsgHdr message;
response.pack(message);
SendMessage(MakeAddr(strandAddrPfx, request.requestorId), message);
}
-int
+Comm::ConnectionPointer
Ipc::Coordinator::openListenSocket(const SharedListenRequest& request,
int &errNo)
{
debugs(54, 6, HERE << "opening listen FD at " << p.addr << " for kid" <<
request.requestorId);
- Ip::Address addr = p.addr; // comm_open_listener may modify it
+ Comm::ConnectionPointer conn = new Comm::Connection;
+ conn->local = p.addr; // comm_open_listener may modify it
+ conn->flags = p.flags;
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, addr, p.flags,
- FdNote(p.fdNote));
- errNo = (sock >= 0) ? 0 : errno;
+ comm_open_listener(p.sock_type, p.proto, conn, FdNote(p.fdNote));
+ errNo = Comm::IsConnOpen(conn) ? 0 : errno;
leave_suid();
+ debugs(54, 6, HERE << "tried listening on " << conn << " for kid" <<
+ request.requestorId);
+
// cache positive results
- if (sock >= 0)
- listeners[request.params] = sock;
+ if (Comm::IsConnOpen(conn))
+ listeners[request.params] = conn;
- return sock;
+ return conn;
}
void Ipc::Coordinator::broadcastSignal(int sig) const
void handleSharedListenRequest(const SharedListenRequest& request);
/// calls comm_open_listener()
- int openListenSocket(const SharedListenRequest& request, int &errNo);
+ Comm::ConnectionPointer openListenSocket(const SharedListenRequest& request, int &errNo);
private:
typedef Vector<StrandCoord> Strands; ///< unsorted strands
Strands strands; ///< registered processes and threads
- typedef std::map<OpenListenerParams, int> Listeners; ///< params:fd map
+ typedef std::map<OpenListenerParams, Comm::ConnectionPointer> Listeners; ///< params:fd map
Listeners listeners; ///< cached comm_open_listener() results
static Coordinator* TheInstance; ///< the only class instance in existence
typedef CommCbMemFunT<Port, CommIoCbParams> Dialer;
AsyncCall::Pointer readHandler = JobCallback(54, 6,
Dialer, this, Port::noteRead);
- comm_read(fd(), buf.raw(), buf.size(), readHandler);
+ comm_read(conn()->fd, buf.raw(), buf.size(), readHandler);
}
bool Ipc::Port::doneAll() const
void Ipc::Port::noteRead(const CommIoCbParams& params)
{
- debugs(54, 6, HERE << "FD " << params.fd << " flag " << params.flag <<
+ debugs(54, 6, HERE << params.conn << " flag " << params.flag <<
" [" << this << ']');
if (params.flag == COMM_OK) {
assert(params.buf == buf.raw());
*/
#include "config.h"
-#include <map>
-#include "comm.h"
#include "base/TextException.h"
+#include "comm.h"
+#include "comm/Connection.h"
#include "ipc/Port.h"
#include "ipc/Messages.h"
#include "ipc/Kids.h"
#include "ipc/StartListening.h"
#include "ipc/SharedListen.h"
+#include <map>
/// holds information necessary to handle JoinListen response
class PendingOpenRequest
}
-Ipc::SharedListenResponse::SharedListenResponse(int aFd, int anErrNo, int aMapId):
- fd(aFd), errNo(anErrNo), mapId(aMapId)
+Ipc::SharedListenResponse::SharedListenResponse(const Comm::ConnectionPointer &c, int anErrNo, int aMapId):
+ conn(c), errNo(anErrNo), mapId(aMapId)
{
}
Ipc::SharedListenResponse::SharedListenResponse(const TypedMsgHdr &hdrMsg):
- fd(-1), errNo(0), mapId(-1)
+ conn(NULL), errNo(0), mapId(-1)
{
hdrMsg.getData(mtSharedListenResponse, this, sizeof(*this));
- fd = hdrMsg.getFd();
+ conn = new Comm::Connection;
+ conn->fd = hdrMsg.getFd();
+ // other conn details are passed in OpenListenerParams and filled out by SharedListenJoin()
}
void Ipc::SharedListenResponse::pack(TypedMsgHdr &hdrMsg) const
{
hdrMsg.putData(mtSharedListenResponse, this, sizeof(*this));
- hdrMsg.putFd(fd);
+ hdrMsg.putFd(conn->fd);
}
void Ipc::SharedListenJoined(const SharedListenResponse &response)
{
- const int fd = response.fd;
+ Comm::ConnectionPointer c = response.conn;
- debugs(54, 3, HERE << "got listening FD " << fd << " errNo=" <<
+ // Dont debugs c fully since only FD is filled right now.
+ debugs(54, 3, HERE << "got listening FD " << c->fd << " errNo=" <<
response.errNo << " mapId=" << response.mapId);
Must(TheSharedListenRequestMap.find(response.mapId) != TheSharedListenRequestMap.end());
Must(por.callback != NULL);
TheSharedListenRequestMap.erase(response.mapId);
- if (fd >= 0) {
+ if (Comm::IsConnOpen(c)) {
OpenListenerParams &p = por.params;
+ c->local = p.addr;
+ c->flags = p.flags;
+ // XXX: leave the comm AI stuff to comm_import_opened()?
struct addrinfo *AI = NULL;
p.addr.GetAddrInfo(AI);
AI->ai_socktype = p.sock_type;
AI->ai_protocol = p.proto;
- comm_import_opened(fd, p.addr, p.flags, FdNote(p.fdNote), AI);
+ comm_import_opened(c, FdNote(p.fdNote), AI);
p.addr.FreeAddrInfo(AI);
}
- StartListeningCb *cbd =
- dynamic_cast<StartListeningCb*>(por.callback->getDialer());
+ StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(por.callback->getDialer());
Must(cbd);
- cbd->fd = fd;
+ cbd->conn = c;
cbd->errNo = response.errNo;
+ cbd->handlerSubscription = por.params.handlerSubscription;
ScheduleCallHere(por.callback);
}
/// "shared listen" is when concurrent processes are listening on the same fd
-/// comm_open_listener() parameters holder
+/// Comm::ConnAcceptor parameters holder
+/// all the details necessary to recreate a Comm::Connection and fde entry for the kid listener FD
class OpenListenerParams
{
public:
bool operator <(const OpenListenerParams &p) const; ///< useful for map<>
+ // bits to re-create the fde entry
int sock_type;
int proto;
+ int fdNote; ///< index into fd_note() comment strings
+
+ // bits to re-create the listener Comm::Connection descriptor
Ip::Address addr; ///< will be memset and memcopied
int flags;
- int fdNote; ///< index into fd_note() comment strings
+
+ /// handler to subscribe to Comm::ConnAcceptor when we get the response
+ Subscription::Pointer handlerSubscription;
};
class TypedMsgHdr;
class SharedListenResponse
{
public:
- SharedListenResponse(int fd, int errNo, int mapId);
+ SharedListenResponse(const Comm::ConnectionPointer &c, int errNo, int mapId);
explicit SharedListenResponse(const TypedMsgHdr &hdrMsg); ///< from recvmsg()
void pack(TypedMsgHdr &hdrMsg) const; ///< prepare for sendmsg()
public:
- int fd; ///< opened listening socket or -1
+ Comm::ConnectionPointer conn; ///< opened listening socket or -1
int errNo; ///< errno value from comm_open_sharedListen() call
int mapId; ///< to map future response to the requestor's callback
};
*/
#include "config.h"
-#include "comm.h"
+#include "base/Subscription.h"
#include "base/TextException.h"
+#include "comm.h"
+#include "comm/ConnAcceptor.h"
+#include "comm/Connection.h"
#include "ipc/SharedListen.h"
#include "ipc/StartListening.h"
-Ipc::StartListeningCb::StartListeningCb(): fd(-1), errNo(0)
+Ipc::StartListeningCb::StartListeningCb(): conn(NULL), errNo(0)
{
}
std::ostream &Ipc::StartListeningCb::startPrint(std::ostream &os) const
{
- return os << "(FD " << fd << ", err=" << errNo;
+ return os << "(" << conn << ", err=" << errNo;
}
-
-void Ipc::StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback)
+void
+Ipc::StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &sub)
{
- OpenListenerParams p;
- p.sock_type = sock_type;
- p.proto = proto;
- p.addr = addr;
- p.flags = flags;
- p.fdNote = fdNote;
-
if (UsingSmp()) { // if SMP is on, share
+ OpenListenerParams p;
+ p.sock_type = sock_type;
+ p.proto = proto;
+ p.addr = listenConn->local;
+ p.flags = listenConn->flags;
+ p.fdNote = fdNote;
+ p.handlerSubscription = sub;
+
Ipc::JoinSharedListen(p, callback);
return; // wait for the call back
}
+ StartListeningCb *cbd = dynamic_cast<StartListeningCb*>(callback->getDialer());
+ Must(cbd);
+ cbd->conn = listenConn;
+
enter_suid();
- const int sock = comm_open_listener(p.sock_type, p.proto, p.addr, p.flags,
- FdNote(p.fdNote));
- const int errNo = (sock >= 0) ? 0 : errno;
+ if (sock_type == SOCK_STREAM) {
+ // TCP: setup the subscriptions such that new connections accepted by listenConn are handled by HTTP
+ AsyncJob::Start(new Comm::ConnAcceptor(cbd->conn, FdNote(fdNote), sub));
+ } else if (sock_type == SOCK_DGRAM) {
+ // UDP: setup the listener socket, but do not set a subscriber
+ Comm::ConnectionPointer udpConn = listenConn;
+ comm_open_listener(sock_type, proto, udpConn, FdNote(fdNote));
+ } else {
+ fatalf("Invalid Socket Type (%d)",sock_type);
+ }
+ cbd->errNo = cbd->conn->isOpen() ? 0 : errno;
leave_suid();
- debugs(54, 3, HERE << "opened listen FD " << sock << " for " << p.addr);
-
- StartListeningCb *cbd =
- dynamic_cast<StartListeningCb*>(callback->getDialer());
- Must(cbd);
- cbd->fd = sock;
- cbd->errNo = errNo;
+ debugs(54, 3, HERE << "opened listen " << cbd->conn);
ScheduleCallHere(callback);
}
#define SQUID_IPC_START_LISTENING_H
#include "config.h"
+#include "base/AsyncCall.h"
+#include "base/Subscription.h"
+#include "comm/forward.h"
#include "ip/forward.h"
#include "ipc/FdNotes.h"
-#include "base/AsyncCall.h"
#if HAVE_IOSFWD
#include <iosfwd>
std::ostream &startPrint(std::ostream &os) const;
public:
- int fd; ///< opened listening socket or -1
+ Comm::ConnectionPointer conn; ///< opened listening socket
int errNo; ///< errno value from the comm_open_listener() call
+ Subscription::Pointer handlerSubscription; ///< The subscription we will pass on to the ConnAcceptor
};
/// Depending on whether SMP is on, either ask Coordinator to send us
-/// the listening FD or call comm_open_listener() directly.
-extern void StartListening(int sock_type, int proto, Ip::Address &addr,
- int flags, FdNoteId fdNote, AsyncCall::Pointer &callback);
+/// the listening FD or start a connection acceptor directly.
+extern void StartListening(int sock_type, int proto, const Comm::ConnectionPointer &listenConn,
+ FdNoteId fdNote, AsyncCall::Pointer &callback, const Subscription::Pointer &handlerSub);
} // namespace Ipc;
*/
#include "config.h"
+#include "base/Subscription.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "ipc/Strand.h"
#include "ipc/Messages.h"
#include "ipc/SharedListen.h"
#include "ipc/Kids.h"
-
CBDATA_NAMESPACED_CLASS_INIT(Ipc, Strand);
Ipc::UdsOp::UdsOp(const String& pathAddr):
AsyncJob("Ipc::UdsOp"),
address(PathToAddress(pathAddr)),
- options(COMM_NONBLOCKING),
- fd_(-1)
+ options(COMM_NONBLOCKING)
{
debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
}
Ipc::UdsOp::~UdsOp()
{
debugs(54, 5, HERE << '[' << this << ']');
- if (fd_ >= 0)
- comm_close(fd_);
+ if (Comm::IsConnOpen(conn_))
+ conn_->close();
+ conn_ = NULL;
}
void Ipc::UdsOp::setOptions(int newOptions)
options = newOptions;
}
-int Ipc::UdsOp::fd()
+Comm::ConnectionPointer &
+Ipc::UdsOp::conn()
{
- if (fd_ < 0) {
+ if (!Comm::IsConnOpen(conn_)) {
if (options & COMM_DOBIND)
unlink(address.sun_path);
- fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
- Must(fd_ >= 0);
+ if (conn_ == NULL)
+ conn_ = new Comm::Connection;
+ conn_->fd = comm_open_uds(SOCK_DGRAM, 0, &address, options);
+ Must(Comm::IsConnOpen(conn_));
}
- return fd_;
+ return conn_;
}
void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
typedef CommCbMemFunT<UdsOp, CommTimeoutCbParams> Dialer;
AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
Dialer(CbcPointer<UdsOp>(this), &UdsOp::noteTimeout));
- commSetTimeout(fd(), seconds, handler);
+ commSetTimeout(conn()->fd, seconds, handler);
}
void Ipc::UdsOp::clearTimeout()
{
- commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+ commSetTimeout(conn()->fd, -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
}
void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
AsyncCall::Pointer writeHandler = JobCallback(54, 5,
Dialer, this, UdsSender::wrote);
- comm_write(fd(), message.raw(), message.size(), writeHandler);
+ comm_write(conn(), message.raw(), message.size(), writeHandler);
writing = true;
}
void Ipc::UdsSender::wrote(const CommIoCbParams& params)
{
- debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']');
writing = false;
if (params.flag != COMM_OK && retries-- > 0) {
sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
- write(); // XXX: should we close on error so that fd() reopens?
+ write(); // XXX: should we close on error so that conn() reopens?
}
}
#include "SquidString.h"
#include "base/AsyncJob.h"
+#include "comm/forward.h"
#include "ipc/TypedMsgHdr.h"
class CommTimeoutCbParams;
protected:
virtual void timedout() {} ///< called after setTimeout() if timed out
- int fd(); ///< creates if needed and returns raw UDS socket descriptor
+ Comm::ConnectionPointer &conn(); ///< creates if needed and returns raw UDS socket descriptor
/// call timedout() if no UDS messages in a given number of seconds
void setTimeout(int seconds, const char *handlerName);
private:
int options; ///< UDS options
- int fd_; ///< UDS descriptor
+ Comm::ConnectionPointer conn_; ///< UDS descriptor
private:
UdsOp(const UdsOp &); // not implemented
#include "squid.h"
#include "AccessLogEntry.h"
+#include "acl/Acl.h"
+#include "acl/Asn.h"
+#if USE_ADAPTATION
+#include "adaptation/Config.h"
+#endif
+#if USE_ECAP
+#include "adaptation/ecap/Config.h"
+#endif
#if ICAP_CLIENT
+#include "adaptation/icap/Config.h"
#include "adaptation/icap/icap_log.h"
#endif
#include "auth/Gadgets.h"
+#include "base/Subscription.h"
#include "base/TextException.h"
-#include "ConfigParser.h"
-#include "errorpage.h"
-#include "event.h"
-#include "EventLoop.h"
-#include "ExternalACL.h"
-#include "Store.h"
-#include "ICP.h"
-#include "ident/Ident.h"
-#include "HttpReply.h"
-#include "pconn.h"
-#include "Mem.h"
-#include "acl/Asn.h"
-#include "acl/Acl.h"
-#include "htcp.h"
-#include "StoreFileSystem.h"
-#include "DiskIO/DiskIOModule.h"
#include "comm.h"
-#include "ipc/Kids.h"
-#include "ipc/Coordinator.h"
-#include "ipc/Strand.h"
-#include "ip/tools.h"
#if USE_EPOLL
#include "comm_epoll.h"
#endif
#if defined(USE_SELECT) || defined(USE_SELECT_WIN32)
#include "comm_select.h"
#endif
-#include "SquidTime.h"
-#include "SwapDir.h"
+#include "ConfigParser.h"
+#include "DiskIO/DiskIOModule.h"
+#include "errorpage.h"
+#if USE_SQUID_ESI
+#include "esi/Module.h"
+#endif
+#include "event.h"
+#include "EventLoop.h"
+#include "ExternalACL.h"
#include "forward.h"
-#include "MemPool.h"
+#include "fs/Module.h"
+#include "htcp.h"
+#include "HttpReply.h"
#include "icmp/IcmpSquid.h"
#include "icmp/net_db.h"
-#include "PeerSelectState.h"
+#include "ICP.h"
+#include "ident/Ident.h"
+#include "ip/tools.h"
+#include "ipc/Coordinator.h"
+#include "ipc/Kids.h"
+#include "ipc/Strand.h"
#if USE_LOADABLE_MODULES
#include "LoadableModules.h"
#endif
+#include "Mem.h"
+#include "MemPool.h"
+#include "pconn.h"
+#include "PeerSelectState.h"
+#include "SquidTime.h"
+#include "Store.h"
+#include "StoreFileSystem.h"
+#include "SwapDir.h"
-#if ICAP_CLIENT
-#include "adaptation/icap/Config.h"
-#endif
-#if USE_ECAP
-#include "adaptation/ecap/Config.h"
-#endif
-#if USE_ADAPTATION
-#include "adaptation/Config.h"
-#endif
-
-#if USE_SQUID_ESI
-#include "esi/Module.h"
-#endif
-
-#include "fs/Module.h"
#if USE_WIN32_SERVICE
*/
#include "squid.h"
+#include "comm/Connection.h"
+// XXX: for icpIncomingConn - need to pass it as a generic parameter.
+#include "ICP.h"
int
mcastSetTtl(int fd, int mcast_ttl)
mcastJoinGroups(const ipcache_addrs *ia, const DnsLookupDetails &, void *datanotused)
{
#ifdef IP_MULTICAST_TTL
- int fd = theInIcpConnection;
-
- struct ip_mreq mr;
- int i;
- int x;
- char c = 0;
if (ia == NULL) {
- debugs(7, 0, "comm_join_mcast_groups: Unknown host");
+ debugs(7, DBG_CRITICAL, "ERROR: Attempting to join multicast group. Cannot resolve group hostname in DNS.");
return;
}
- for (i = 0; i < (int) ia->count; i++) {
+ for (int i = 0; i < (int) ia->count; i++) {
debugs(7, 9, "Listening for ICP requests on " << ia->in_addrs[i] );
if ( ! ia->in_addrs[i].IsIPv4() ) {
- debugs(7, 9, "ERROR: IPv6 Multicast Listen has not been implemented!");
+ debugs(7, 2, "ERROR: IPv6 Multicast Listen has not been implemented!");
continue;
}
+ struct ip_mreq mr;
ia->in_addrs[i].GetInAddr(mr.imr_multiaddr);
-
mr.imr_interface.s_addr = INADDR_ANY;
- x = setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
- (char *) &mr, sizeof(struct ip_mreq));
-
- if (x < 0)
- debugs(7, 1, "comm_join_mcast_groups: FD " << fd << ", IP=" << ia->in_addrs[i]);
- x = setsockopt(fd, IPPROTO_IP, IP_MULTICAST_LOOP, &c, 1);
+ if (setsockopt(icpIncomingConn->fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) &mr, sizeof(struct ip_mreq)) < 0)
+ debugs(7, DBG_IMPORTANT, "ERROR: Join failed for " << icpIncomingConn << ", Multicast IP=" << ia->in_addrs[i]);
- if (x < 0)
- debugs(7, 1, "Can't disable multicast loopback: " << xstrerror());
+ char c = 0;
+ if (setsockopt(icpIncomingConn->fd, IPPROTO_IP, IP_MULTICAST_LOOP, &c, 1) < 0)
+ debugs(7, DBG_IMPORTANT, "ERROR: " << icpIncomingConn << " can't disable multicast loopback: " << xstrerror());
}
#endif
"Peer Cache Statistics",
neighborDumpPeers, 0, 1);
- if (theInIcpConnection >= 0) {
+ if (Comm::IsConnOpen(icpIncomingConn)) {
manager->registerAction("non_peers",
"List of Unknown sites sending ICP messages",
neighborDumpNonPeers, 0, 1);
void
neighbors_init(void)
{
- Ip::Address nul;
- struct addrinfo *AI = NULL;
struct servent *sep = NULL;
const char *me = getMyHostname();
peer *thisPeer = NULL;
peer *next = NULL;
- int fd = theInIcpConnection;
neighborsRegisterWithCacheManager();
- /* setup addrinfo for use */
- nul.InitAddrInfo(AI);
-
- if (fd >= 0) {
-
- if (getsockname(fd, AI->ai_addr, &AI->ai_addrlen) < 0)
- debugs(15, 1, "getsockname(" << fd << "," << AI->ai_addr << "," << &AI->ai_addrlen << ") failed.");
+ if (Comm::IsConnOpen(icpIncomingConn)) {
for (thisPeer = Config.peers; thisPeer; thisPeer = next) {
http_port_list *s = NULL;
if (thisPeer->http_port != s->s.GetPort())
continue;
- debugs(15, 1, "WARNING: Peer looks like this host");
+ debugs(15, DBG_IMPORTANT, "WARNING: Peer looks like this host");
- debugs(15, 1, " Ignoring " <<
+ debugs(15, DBG_IMPORTANT, " Ignoring " <<
neighborTypeStr(thisPeer) << " " << thisPeer->host <<
"/" << thisPeer->http_port << "/" <<
thisPeer->icp.port);
peerRefreshDNS((void *) 1);
- if (ICP_INVALID == echo_hdr.opcode) {
+ if (echo_hdr.opcode == ICP_INVALID) {
echo_hdr.opcode = ICP_SECHO;
echo_hdr.version = ICP_VERSION_CURRENT;
echo_hdr.length = 0;
echo_hdr.reqnum = 0;
echo_hdr.flags = 0;
echo_hdr.pad = 0;
- nul = *AI;
- nul.GetInAddr( *((struct in_addr*)&echo_hdr.shostid) );
+ theIcpPublicHostID.GetInAddr( *((struct in_addr*)&echo_hdr.shostid) );
sep = getservbyname("echo", "udp");
echo_port = sep ? ntohs((u_short) sep->s_port) : 7;
}
first_ping = Config.peers;
- nul.FreeAddrInfo(AI);
}
int
} else
#endif
{
- if (Config.Port.icp <= 0 || theOutIcpConnection <= 0) {
+ if (Config.Port.icp <= 0 || !Comm::IsConnOpen(icpOutgoingConn)) {
debugs(15, DBG_CRITICAL, "ICP is disabled! Cannot send ICP request to peer.");
continue;
} else {
if (p->type == PEER_MULTICAST)
- mcastSetTtl(theOutIcpConnection, p->mcast.ttl);
+ mcastSetTtl(icpOutgoingConn->fd, p->mcast.ttl);
if (p->icp.port == echo_port) {
debugs(15, 4, "neighborsUdpPing: Looks like a dumb cache, send DECHO ping");
echo_hdr.reqnum = reqnum;
query = _icp_common_t::createMessage(ICP_DECHO, 0, url, reqnum, 0);
- icpUdpSend(theOutIcpConnection,p->in_addr,query,LOG_ICP_QUERY,0);
+ icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0);
} else {
flags = 0;
query = _icp_common_t::createMessage(ICP_QUERY, flags, url, reqnum, 0);
- icpUdpSend(theOutIcpConnection, p->in_addr, query, LOG_ICP_QUERY, 0);
+ icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0);
}
}
}
mem->start_ping = current_time;
mem->ping_reply_callback = peerCountHandleIcpReply;
mem->ircb_data = psstate;
- mcastSetTtl(theOutIcpConnection, p->mcast.ttl);
+ mcastSetTtl(icpOutgoingConn->fd, p->mcast.ttl);
p->mcast.id = mem->id;
reqnum = icpSetCacheKey((const cache_key *)fake->key);
query = _icp_common_t::createMessage(ICP_QUERY, 0, url, reqnum, 0);
- icpUdpSend(theOutIcpConnection,
- p->in_addr,
- query,
- LOG_ICP_QUERY,
- 0);
+ icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0);
fake->ping_status = PING_WAITING;
eventAdd("peerCountMcastPeersDone",
peerCountMcastPeersDone,
#include "squid.h"
#include "CacheManager.h"
-#include "Store.h"
#include "comm.h"
-#include "pconn.h"
+#include "comm/Connection.h"
#include "fde.h"
+#include "pconn.h"
+#include "Store.h"
#define PCONN_FDS_SZ 8 /* pconn set size, increase for better memcache hit rate */
}
int
-IdleConnList::findFDIndex (int fd)
+IdleConnList::findFDIndex(int fd)
{
int index;
}
void
-IdleConnList::read(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+IdleConnList::read(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
- debugs(48, 3, "IdleConnList::read: " << len << " bytes from FD " << fd);
+ debugs(48, 3, "IdleConnList::read: " << len << " bytes from " << conn);
if (flag == COMM_ERR_CLOSING) {
/* Bail out early on COMM_ERR_CLOSING - close handlers will tidy up for us */
}
IdleConnList *list = (IdleConnList *) data;
- if (list->removeFD(fd)) /* might delete list */
- comm_close(fd);
+ if (list && list->removeFD(conn->fd)) { /* might delete list */
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
+ }
}
void
*/
#include "squid.h"
+#include "comm/Connection.h"
#include "event.h"
#include "fde.h"
+#include "ICP.h"
#include "SquidTime.h"
static IPH send_announce;
if (0 == Config.onoff.announce)
return;
- if (theOutIcpConnection < 0)
+ if (!Comm::IsConnOpen(icpOutgoingConn))
return;
ipcache_nbgethostbyname(Config.Announce.host, send_announce, NULL);
LOCAL_ARRAY(char, tbuf, 256);
LOCAL_ARRAY(char, sndbuf, BUFSIZ);
- Ip::Address S;
char *host = Config.Announce.host;
char *file = NULL;
u_short port = Config.Announce.port;
int l;
int n;
int fd;
- int x;
if (ia == NULL) {
debugs(27, 1, "send_announce: Unknown host '" << host << "'");
}
}
- S = ia->in_addrs[0];
+ Ip::Address S = ia->in_addrs[0];
S.SetPort(port);
- assert(theOutIcpConnection > 0);
- x = comm_udp_sendto(theOutIcpConnection, S, sndbuf, strlen(sndbuf) + 1);
+ assert(Comm::IsConnOpen(icpOutgoingConn));
- if (x < 0)
- debugs(27, 1, "send_announce: FD " << theOutIcpConnection << ": " << xstrerror());
+ if (comm_udp_sendto(icpOutgoingConn->fd, S, sndbuf, strlen(sndbuf) + 1) < 0)
+ debugs(27, 1, "ERROR: Failed to announce to " << S << " from " << icpOutgoingConn->local << ": " << xstrerror());
}
#include "ipc/StartListening.h"
#include "ip/Address.h"
#include "ip/tools.h"
+#include "snmp_core.h"
#define SNMP_REQUEST_SIZE 4096
#define MAX_PROTOSTAT 5
public Ipc::StartListeningCb
{
public:
- typedef void (*Handler)(int fd, int errNo);
+ typedef void (*Handler)(const Comm::ConnectionPointer &conn, int errNo);
SnmpListeningStartedDialer(Handler aHandler): handler(aHandler) {}
virtual void print(std::ostream &os) const { startPrint(os) << ')'; }
-
virtual bool canDial(AsyncCall &) const { return true; }
- virtual void dial(AsyncCall &) { (handler)(fd, errNo); }
+ virtual void dial(AsyncCall &) { (handler)(conn, errNo); }
public:
Handler handler;
};
+static void snmpPortOpened(const Comm::ConnectionPointer &conn, int errNo);
-Ip::Address theOutSNMPAddr;
-
typedef struct _mib_tree_entry mib_tree_entry;
typedef oid *(instance_Fn) (oid * name, snint * len, mib_tree_entry * current, oid_ParseFn ** Fn);
mib_tree_entry *mib_tree_head;
mib_tree_entry *mib_tree_last;
-static void snmpIncomingConnectionOpened(int fd, int errNo);
-static void snmpOutgoingConnectionOpened(int fd, int errNo);
+Comm::ConnectionPointer snmpIncomingConn;
+Comm::ConnectionPointer snmpOutgoingConn;
static mib_tree_entry * snmpAddNodeStr(const char *base_str, int o, oid_ParseFn * parsefunction, instance_Fn * instancefunction);
static mib_tree_entry *snmpAddNode(oid * name, int len, oid_ParseFn * parsefunction, instance_Fn * instancefunction, int children,...);
{
debugs(49, 5, "snmpConnectionOpen: Called");
- if (Config.Port.snmp > 0) {
- Config.Addrs.snmp_incoming.SetPort(Config.Port.snmp);
+ if (Config.Port.snmp <= 0)
+ return;
- if (!Ip::EnableIpv6 && !Config.Addrs.snmp_incoming.SetIPv4()) {
- debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << Config.Addrs.snmp_incoming << " is not an IPv4 address.");
- fatal("SNMP port cannot be opened.");
- }
- /* split-stack for now requires IPv4-only SNMP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && Config.Addrs.snmp_incoming.IsAnyAddr()) {
- Config.Addrs.snmp_incoming.SetIPv4();
- }
+ snmpIncomingConn = new Comm::Connection;
+ snmpIncomingConn->local = Config.Addrs.snmp_incoming;
+ snmpIncomingConn->local.SetPort(Config.Port.snmp);
- AsyncCall::Pointer call = asyncCall(49, 2,
- "snmpIncomingConnectionOpened",
- SnmpListeningStartedDialer(&snmpIncomingConnectionOpened));
+ if (!Ip::EnableIpv6 && !snmpIncomingConn->local.SetIPv4()) {
+ debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << snmpIncomingConn->local << " is not an IPv4 address.");
+ fatal("SNMP port cannot be opened.");
+ }
+ /* split-stack for now requires IPv4-only SNMP */
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && snmpIncomingConn->local.IsAnyAddr()) {
+ snmpIncomingConn->local.SetIPv4();
+ }
- Ipc::StartListening(SOCK_DGRAM,
- IPPROTO_UDP,
- Config.Addrs.snmp_incoming,
- COMM_NONBLOCKING,
- Ipc::fdnInSnmpSocket, call);
+ AsyncCall::Pointer call = asyncCall(49, 2, "snmpIncomingConnectionOpened",
+ SnmpListeningStartedDialer(&snmpPortOpened));
+ Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpIncomingConn, Ipc::fdnInSnmpSocket, call, Subscription::Pointer());
- if (!Config.Addrs.snmp_outgoing.IsNoAddr()) {
- Config.Addrs.snmp_outgoing.SetPort(Config.Port.snmp);
+ if (!Config.Addrs.snmp_outgoing.IsNoAddr()) {
+ snmpOutgoingConn = new Comm::Connection;
+ snmpOutgoingConn->local = Config.Addrs.snmp_outgoing;
+ snmpOutgoingConn->local.SetPort(Config.Port.snmp);
- if (!Ip::EnableIpv6 && !Config.Addrs.snmp_outgoing.SetIPv4()) {
- debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << Config.Addrs.snmp_outgoing << " is not an IPv4 address.");
- fatal("SNMP port cannot be opened.");
- }
- /* split-stack for now requires IPv4-only SNMP */
- if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && Config.Addrs.snmp_outgoing.IsAnyAddr()) {
- Config.Addrs.snmp_outgoing.SetIPv4();
- }
- AsyncCall::Pointer call = asyncCall(49, 2,
- "snmpOutgoingConnectionOpened",
- SnmpListeningStartedDialer(&snmpOutgoingConnectionOpened));
-
- Ipc::StartListening(SOCK_DGRAM,
- IPPROTO_UDP,
- Config.Addrs.snmp_outgoing,
- COMM_NONBLOCKING,
- Ipc::fdnOutSnmpSocket, call);
+ if (!Ip::EnableIpv6 && !snmpOutgoingConn->local.SetIPv4()) {
+ debugs(49, DBG_CRITICAL, "ERROR: IPv6 is disabled. " << snmpOutgoingConn->local << " is not an IPv4 address.");
+ fatal("SNMP port cannot be opened.");
}
+ /* split-stack for now requires IPv4-only SNMP */
+ if (Ip::EnableIpv6&IPV6_SPECIAL_SPLITSTACK && snmpOutgoingConn->local.IsAnyAddr()) {
+ snmpOutgoingConn->local.SetIPv4();
+ }
+ AsyncCall::Pointer call = asyncCall(49, 2, "snmpOutgoingConnectionOpened",
+ SnmpListeningStartedDialer(&snmpPortOpened));
+ Ipc::StartListening(SOCK_DGRAM, IPPROTO_UDP, snmpOutgoingConn, Ipc::fdnOutSnmpSocket, call, Subscription::Pointer());
+ } else {
+ snmpOutgoingConn = snmpIncomingConn;
+ debugs(1, DBG_IMPORTANT, "Sending SNMP messages from " << snmpOutgoingConn->local);
}
}
static void
-snmpIncomingConnectionOpened(int fd, int errNo)
+snmpPortOpened(const Comm::ConnectionPointer &conn, int errNo)
{
- theInSnmpConnection = fd;
- if (theInSnmpConnection < 0)
- fatal("Cannot open Incoming SNMP Port");
+ if (!Comm::IsConnOpen(conn))
+ fatalf("Cannot open SNMP %s Port",(conn->fd == snmpIncomingConn->fd?"receiving":"sending"));
- commSetSelect(theInSnmpConnection, COMM_SELECT_READ, snmpHandleUdp, NULL,
- 0);
+ commSetSelect(conn->fd, COMM_SELECT_READ, snmpHandleUdp, NULL, 0);
- debugs(1, 1, "Accepting SNMP messages on " << Config.Addrs.snmp_incoming <<
- ", FD " << theInSnmpConnection << ".");
-
- if (Config.Addrs.snmp_outgoing.IsNoAddr())
- theOutSnmpConnection = theInSnmpConnection;
-}
-
-static void
-snmpOutgoingConnectionOpened(int fd, int errNo)
-{
- theOutSnmpConnection = fd;
- if (theOutSnmpConnection < 0)
- fatal("Cannot open Outgoing SNMP Port");
-
- commSetSelect(theOutSnmpConnection, COMM_SELECT_READ, snmpHandleUdp, NULL,
- 0);
-
- debugs(1, 1, "Outgoing SNMP messages on " << Config.Addrs.snmp_outgoing <<
- ", FD " << theOutSnmpConnection << ".");
-
- {
- struct addrinfo *xaddr = NULL;
- int x;
-
-
- theOutSNMPAddr.SetEmpty();
-
- theOutSNMPAddr.InitAddrInfo(xaddr);
-
- x = getsockname(theOutSnmpConnection, xaddr->ai_addr, &xaddr->ai_addrlen);
-
- if (x < 0)
- debugs(51, 1, "theOutSnmpConnection FD " << theOutSnmpConnection << ": getsockname: " << xstrerror());
- else
- theOutSNMPAddr = *xaddr;
-
- theOutSNMPAddr.FreeAddrInfo(xaddr);
- }
+ if (conn->fd == snmpIncomingConn->fd)
+ debugs(1, DBG_IMPORTANT, "Accepting SNMP messages on " << snmpIncomingConn->local);
+ else if (conn->fd == snmpOutgoingConn->fd)
+ debugs(1, DBG_IMPORTANT, "Sending SNMP messages from " << snmpOutgoingConn->local);
+ else
+ fatalf("Lost SNMP port (%d) on FD %d", (int)conn->local.GetPort(), conn->fd);
}
void
snmpConnectionShutdown(void)
{
- if (theInSnmpConnection < 0)
+ if (!Comm::IsConnOpen(snmpIncomingConn))
return;
- if (theInSnmpConnection != theOutSnmpConnection) {
- debugs(49, 1, "FD " << theInSnmpConnection << " Closing SNMP socket");
- comm_close(theInSnmpConnection);
- }
-
- /*
- * Here we set 'theInSnmpConnection' to -1 even though the SNMP 'in'
- * and 'out' sockets might be just one FD. This prevents this
- * function from executing repeatedly. When we are really ready to
- * exit or restart, main will comm_close the 'out' descriptor.
- */
- theInSnmpConnection = -1;
+ // Perform lazy closure. So as not to step on outgoing connection when sharing.
+ debugs(49, DBG_IMPORTANT, "Closing SNMP receiving port " << snmpIncomingConn->local);
+ snmpIncomingConn = NULL;
/*
* Normally we only write to the outgoing SNMP socket, but we
* specific interface. During shutdown, we must disable reading
* on the outgoing socket.
*/
- assert(theOutSnmpConnection > -1);
+ assert(Comm::IsConnOpen(snmpOutgoingConn));
- commSetSelect(theOutSnmpConnection, COMM_SELECT_READ, NULL, NULL, 0);
+ commSetSelect(snmpOutgoingConn->fd, COMM_SELECT_READ, NULL, NULL, 0);
}
void
{
snmpConnectionShutdown();
- if (theOutSnmpConnection > -1) {
- debugs(49, 1, "FD " << theOutSnmpConnection << " Closing SNMP socket");
- comm_close(theOutSnmpConnection);
- /* make sure the SNMP out connection is unset */
- theOutSnmpConnection = -1;
- }
+ if (!Comm::IsConnOpen(snmpOutgoingConn))
+ return;
+
+ debugs(49, DBG_IMPORTANT, "Closing SNMP sending port " << snmpOutgoingConn->local);
+ snmpOutgoingConn = NULL;
}
/*
--- /dev/null
+#ifndef _SQUID_SNMP_CORE_H
+#define _SQUID_SNMP_CORE_H
+
+#include "config.h"
+
+#if SQUID_SNMP
+#include "comm/forward.h"
+
+extern Comm::ConnectionPointer snmpOutgoingConn;
+// PRIVATE? extern int theInSnmpConnection;
+// DEAD? extern char *snmp_agentinfo;
+
+#endif /* SQUID_SNMP */
+
+#endif /* _SQUID_SNMP_CORE_H */
*/
#include "squid.h"
-#include "compat/initgroups.h"
-#include "compat/getaddrinfo.h"
-#include "compat/getnameinfo.h"
-#include "compat/tempnam.h"
+// XXX: these should be already pulled in...
+//#include "compat/initgroups.h"
+//#include "compat/getaddrinfo.h"
+//#include "compat/getnameinfo.h"
+//#include "compat/tempnam.h"
+
+#include "base/Subscription.h"
#include "fde.h"
+#include "ICP.h"
#include "ip/Intercept.h"
+#include "ipc/Coordinator.h"
+#include "ipc/Kids.h"
#include "MemBuf.h"
#include "ProtoPort.h"
#include "SquidMath.h"
#include "SquidTime.h"
-#include "ipc/Kids.h"
-#include "ipc/Coordinator.h"
#include "SwapDir.h"
#include "wordlist.h"
void
releaseServerSockets(void)
{
- int i;
/* Release the main ports as early as possible */
- for (i = 0; i < NHttpSockets; i++) {
+ // clear both http_port and https_port lists.
+ for (int i = 0; i < NHttpSockets; i++) {
if (HttpSockets[i] >= 0)
close(HttpSockets[i]);
}
- if (theInIcpConnection >= 0)
- close(theInIcpConnection);
+ // clear icp_port's
+ icpConnectionClose();
- if (theOutIcpConnection >= 0 && theOutIcpConnection != theInIcpConnection)
- close(theOutIcpConnection);
+ // XXX: Why not the HTCP, SNMP, DNS ports as well?
}
static char *
class Connection;
void *operator new(size_t);
void operator delete (void *);
- static void ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
- static void ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
- static void WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
- static void WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
+ static void ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
+ static void ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data);
+ static void WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
+ static void WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data);
bool noConnections() const;
char *url;
/* Read from server side and queue it for writing to the client */
void
-TunnelStateData::ReadServer(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadServer(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- assert (cbdataReferenceValid (tunnelState));
+ assert(cbdataReferenceValid(tunnelState));
tunnelState->readServer(buf, len, errcode, xerrno);
}
void
TunnelStateData::readServer(char *buf, size_t len, comm_err_t errcode, int xerrno)
{
- debugs(26, 3, HERE << server.conn << ", read " << len << " bytes");
+ debugs(26, 3, HERE << server.conn << ", read " << len << " bytes");
/*
* Bail out early on COMM_ERR_CLOSING
kb_incr(&statCounter.server.other.kbytes_in, len);
}
- copy (len, errcode, xerrno, server, client, WriteClientDone);
+ copy(len, errcode, xerrno, server, client, WriteClientDone);
}
void
/* XXX fixme xstrerror and xerrno... */
errno = xerrno;
- debugs(50, debugLevelForError(xerrno), "TunnelStateData::Connection::error: FD " << conn->fd <<
- ": read/write failure: " << xstrerror());
+ debugs(50, debugLevelForError(xerrno), HERE << conn << ": read/write failure: " << xstrerror());
if (!ignoreErrno(xerrno))
conn->close();
/* Read from client side and queue it for writing to the server */
void
-TunnelStateData::ReadClient(int fd, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
+TunnelStateData::ReadClient(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t errcode, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- assert (cbdataReferenceValid (tunnelState));
+ assert(cbdataReferenceValid(tunnelState));
tunnelState->readClient(buf, len, errcode, xerrno);
}
to.conn->close();
}
} else if (cbdataReferenceValid(this))
- comm_write(to.conn->fd, from.buf, len, completion, this, NULL);
+ comm_write(to.conn, from.buf, len, completion, this, NULL);
cbdataInternalUnlock(this); /* ??? */
}
/* Writes data from the client buffer to the server side */
void
-TunnelStateData::WriteServerDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteServerDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- assert (cbdataReferenceValid (tunnelState));
+ assert(cbdataReferenceValid(tunnelState));
tunnelState->writeServerDone(buf, len, flag, xerrno);
}
/* Writes data from the server buffer to the client side */
void
-TunnelStateData::WriteClientDone(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+TunnelStateData::WriteClientDone(const Comm::ConnectionPointer &, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
- assert (cbdataReferenceValid (tunnelState));
+ assert(cbdataReferenceValid(tunnelState));
tunnelState->writeClientDone(buf, len, flag, xerrno);
}
void
-TunnelStateData::Connection::dataSent (size_t amount)
+TunnelStateData::Connection::dataSent(size_t amount)
{
assert(amount == (size_t)len);
len =0;
}
static void
-tunnelConnectedWriteDone(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+tunnelConnectedWriteDone(const Comm::ConnectionPointer &conn, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
if (flag != COMM_OK) {
- tunnelErrorComplete(fd, data, 0);
+ tunnelErrorComplete(conn->fd, data, 0);
return;
}
TunnelStateData *tunnelState = (TunnelStateData *)data;
debugs(26, 3, HERE << server << ", tunnelState=" << tunnelState);
*tunnelState->status_ptr = HTTP_OK;
- comm_write(tunnelState->client.conn->fd, conn_established, strlen(conn_established),
+ comm_write(tunnelState->client.conn, conn_established, strlen(conn_established),
tunnelConnectedWriteDone, tunnelState, NULL);
}
static void
-tunnelErrorComplete(int fdnotused, void *data, size_t sizenotused)
+tunnelErrorComplete(int /*const Comm::ConnectionPointer &*/, void *data, size_t)
{
TunnelStateData *tunnelState = (TunnelStateData *)data;
assert(tunnelState != NULL);
err->port = conn->remote.GetPort();
err->callback = tunnelErrorComplete;
err->callback_data = tunnelState;
- errorSend(tunnelState->client.conn->fd, err);
+ errorSend(tunnelState->client.conn, err);
}
return;
}
if (answer == 0) {
err = errorCon(ERR_FORWARDING_DENIED, HTTP_FORBIDDEN, request);
*status_ptr = HTTP_FORBIDDEN;
- errorSend(http->getConn()->clientConn->fd, err);
+ errorSend(http->getConn()->clientConn, err);
return;
}
}
packerClean(&p);
mb.append("\r\n", 2);
- comm_write_mbuf(server->fd, &mb, tunnelConnectedWriteDone, tunnelState);
+ comm_write_mbuf(server, &mb, tunnelConnectedWriteDone, tunnelState);
commSetTimeout(server->fd, Config.Timeout.read, tunnelTimeout, tunnelState);
}
*tunnelState->status_ptr = HTTP_SERVICE_UNAVAILABLE;
err->callback = tunnelErrorComplete;
err->callback_data = tunnelState;
- errorSend(tunnelState->client.conn->fd, err);
+ errorSend(tunnelState->client.conn, err);
return;
}
public:
~WhoisState();
- void readReply (int fd, char *aBuffer, size_t aBufferLength, comm_err_t flag, int xerrno);
+ void readReply(const Comm::ConnectionPointer &, char *aBuffer, size_t aBufferLength, comm_err_t flag, int xerrno);
void setReplyToOK(StoreEntry *sentry);
StoreEntry *entry;
HttpRequest *request;
}
static void
-whoisWriteComplete(int fd, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
+whoisWriteComplete(const Comm::ConnectionPointer &, char *buf, size_t size, comm_err_t flag, int xerrno, void *data)
{
xfree(buf);
}
whoisStart(FwdState * fwd)
{
WhoisState *p;
- int fd = fwd->serverConnection()->fd;
char *buf;
size_t l;
CBDATA_INIT_TYPE(WhoisState);
p->dataWritten = false;
p->entry->lock();
- comm_add_close_handler(fd, whoisClose, p);
+ comm_add_close_handler(fwd->serverConnection()->fd, whoisClose, p);
l = p->request->urlpath.size() + 3;
String str_print=p->request->urlpath.substr(1,p->request->urlpath.size());
snprintf(buf, l, SQUIDSTRINGPH"\r\n", SQUIDSTRINGPRINT(str_print));
- comm_write(fd, buf, strlen(buf), whoisWriteComplete, p, NULL);
- comm_read(fd, p->buf, BUFSIZ, whoisReadReply, p);
- commSetTimeout(fd, Config.Timeout.read, whoisTimeout, p);
+ comm_write(fwd->serverConnection(), buf, strlen(buf), whoisWriteComplete, p, NULL);
+ comm_read(fwd->serverConnection()->fd, p->buf, BUFSIZ, whoisReadReply, p);
+ commSetTimeout(fwd->serverConnection()->fd, Config.Timeout.read, whoisTimeout, p);
}
/* PRIVATE */
}
static void
-whoisReadReply(int fd, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
+whoisReadReply(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data)
{
WhoisState *p = (WhoisState *)data;
- p->readReply(fd, buf, len, flag, xerrno);
+ p->readReply(conn, buf, len, flag, xerrno);
}
void
}
void
-WhoisState::readReply (int fd, char *aBuffer, size_t aBufferLength, comm_err_t flag, int xerrno)
+WhoisState::readReply(const Comm::ConnectionPointer &conn, char *aBuffer, size_t aBufferLength, comm_err_t flag, int xerrno)
{
int do_next_read = 0;
}
aBuffer[aBufferLength] = '\0';
- debugs(75, 3, "whoisReadReply: FD " << fd << " read " << aBufferLength << " bytes");
+ debugs(75, 3, HERE << conn << " read " << aBufferLength << " bytes");
debugs(75, 5, "{" << aBuffer << "}");
if (flag == COMM_OK && aBufferLength > 0) {
do_next_read = 1;
} else if (flag != COMM_OK || aBufferLength < 0) {
- debugs(50, 2, "whoisReadReply: FD " << fd << ": read failure: " << xstrerror() << ".");
+ debugs(50, 2, HERE << conn << ": read failure: " << xstrerror() << ".");
if (ignoreErrno(errno)) {
do_next_read = 1;
err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR, fwd->request);
err->xerrno = errno;
fwd->fail(err);
- comm_close(fd);
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
do_next_read = 0;
}
} else {
debugs(75, 3, "whoisReadReply: Done: " << entry->url() );
- comm_close(fd);
+ Comm::ConnectionPointer nonConst = conn;
+ nonConst->close();
do_next_read = 0;
}
if (do_next_read)
- comm_read(fd, aBuffer, BUFSIZ, whoisReadReply, this);
+ comm_read(conn->fd, aBuffer, BUFSIZ, whoisReadReply, this);
}
static void