#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "mgr/ActionWriter.h"
#include "Store.h"
CBDATA_NAMESPACED_CLASS_INIT(Mgr, ActionWriter);
-Mgr::ActionWriter::ActionWriter(const Action::Pointer &anAction, int aFd):
- StoreToCommWriter(aFd, anAction->createStoreEntry()),
+Mgr::ActionWriter::ActionWriter(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn):
+ StoreToCommWriter(conn, anAction->createStoreEntry()),
action(anAction)
{
- debugs(16, 5, HERE << "FD " << aFd << " action: " << action);
+ debugs(16, 5, HERE << conn << " action: " << action);
}
void
#ifndef SQUID_MGR_ACTION_WRITER_H
#define SQUID_MGR_ACTION_WRITER_H
+#include "comm/forward.h"
#include "HttpRequestMethod.h"
#include "mgr/StoreToCommWriter.h"
class ActionWriter: public StoreToCommWriter
{
public:
- ActionWriter(const Action::Pointer &anAction, int aFd);
+ ActionWriter(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn);
protected:
/* AsyncJob API */
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "mgr/Filler.h"
#include "mgr/Response.h"
#include "Store.h"
CBDATA_NAMESPACED_CLASS_INIT(Mgr, Filler);
-Mgr::Filler::Filler(const Action::Pointer &anAction, int aFd,
+Mgr::Filler::Filler(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn,
unsigned int aRequestId):
- StoreToCommWriter(aFd, anAction->createStoreEntry()),
+ StoreToCommWriter(conn, anAction->createStoreEntry()),
action(anAction),
requestId(aRequestId)
{
- debugs(16, 5, HERE << "FD " << aFd << " action: " << action);
+ debugs(16, 5, HERE << conn << " action: " << action);
}
void
#ifndef SQUID_MGR_FILLER_H
#define SQUID_MGR_FILLER_H
+#include "comm/forward.h"
#include "HttpRequestMethod.h"
#include "mgr/Action.h"
#include "mgr/StoreToCommWriter.h"
class Filler: public StoreToCommWriter
{
public:
- Filler(const Action::Pointer &anAction, int aFd, unsigned int aRequestId);
+ Filler(const Action::Pointer &anAction, const Comm::ConnectionPointer &conn, unsigned int aRequestId);
protected:
/* AsyncJob API */
#include "config.h"
#include "base/AsyncJobCalls.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "CommCalls.h"
#include "errorpage.h"
#include "HttpReply.h"
Mgr::Forwarder::RequestsMap Mgr::Forwarder::TheRequestsMap;
unsigned int Mgr::Forwarder::LastRequestId = 0;
-Mgr::Forwarder::Forwarder(int aFd, const ActionParams &aParams,
+Mgr::Forwarder::Forwarder(const Comm::ConnectionPointer &conn, const ActionParams &aParams,
HttpRequest* aRequest, StoreEntry* anEntry):
AsyncJob("Mgr::Forwarder"),
params(aParams),
- request(aRequest), entry(anEntry), fd(aFd), requestId(0), closer(NULL)
+ request(aRequest), entry(anEntry), clientConnection(conn), requestId(0), closer(NULL)
{
- debugs(16, 5, HERE << "FD " << aFd);
- Must(fd >= 0);
+ debugs(16, 5, HERE << clientConnection);
+ Must(Comm::IsConnOpen(clientConnection));
Must(request != NULL);
Must(entry != NULL);
closer = asyncCall(16, 5, "Mgr::Forwarder::noteCommClosed",
CommCbMemFunT<Forwarder, CommCloseCbParams>(this, &Forwarder::noteCommClosed));
- comm_add_close_handler(fd, closer);
+ comm_add_close_handler(clientConnection->fd, closer);
}
Mgr::Forwarder::~Forwarder()
void
Mgr::Forwarder::close()
{
- if (fd >= 0) {
+ if (Comm::IsConnOpen(clientConnection)) {
if (closer != NULL) {
- comm_remove_close_handler(fd, closer);
+ comm_remove_close_handler(clientConnection->fd, closer);
closer = NULL;
}
- comm_close(fd);
- fd = -1;
+ clientConnection->close();
}
}
++LastRequestId;
requestId = LastRequestId;
TheRequestsMap[requestId] = callback;
- Request mgrRequest(KidIdentifier, requestId, fd, params);
+ Request mgrRequest(KidIdentifier, requestId, clientConnection, params);
Ipc::TypedMsgHdr message;
try {
Mgr::Forwarder::noteCommClosed(const CommCloseCbParams& params)
{
debugs(16, 5, HERE);
- Must(fd == params.fd);
- fd = -1;
+ Must(!Comm::IsConnOpen(clientConnection));
mustStop("commClosed");
}
Mgr::Forwarder::callException(const std::exception& e)
{
try {
- if (entry != NULL && request != NULL && fd >= 0)
+ if (entry != NULL && request != NULL && Comm::IsConnOpen(clientConnection))
quitOnError("exception", errorCon(ERR_INVALID_RESP, HTTP_INTERNAL_SERVER_ERROR, request));
} catch (const std::exception& ex) {
debugs(16, DBG_CRITICAL, HERE << ex.what());
Mgr::Forwarder::Abort(void* param)
{
Forwarder* mgrFwdr = static_cast<Forwarder*>(param);
- if (mgrFwdr->fd >= 0)
- comm_close(mgrFwdr->fd);
+ if (Comm::IsConnOpen(mgrFwdr->clientConnection))
+ mgrFwdr->clientConnection->close();
}
#define SQUID_MGR_FORWARDER_H
#include "base/AsyncJob.h"
+#include "comm/forward.h"
#include "mgr/ActionParams.h"
#include <map>
class Forwarder: public AsyncJob
{
public:
- Forwarder(int aFd, const ActionParams &aParams, HttpRequest* aRequest,
+ Forwarder(const Comm::ConnectionPointer &conn, const ActionParams &aParams, HttpRequest* aRequest,
StoreEntry* anEntry);
virtual ~Forwarder();
ActionParams params; ///< action parameters to pass to the other side
HttpRequest* request; ///< HTTP client request for detailing errors
StoreEntry* entry; ///< Store entry expecting the response
- int fd; ///< HTTP client connection descriptor
+ Comm::ConnectionPointer clientConnection; ///< HTTP client connection descriptor
unsigned int requestId; ///< request id
AsyncCall::Pointer closer; ///< comm_close handler for the HTTP connection
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "mgr/Command.h"
#include "mgr/Filler.h"
#include "mgr/FunAction.h"
Mgr::FunAction::respond(const Request& request)
{
debugs(16, 5, HERE);
- const int fd = ImportHttpFdIntoComm(request.fd);
- Must(fd >= 0);
+ const Comm::ConnectionPointer client = ImportHttpFdIntoComm(request.fd);
+ Must(Comm::IsConnOpen(client));
Must(request.requestId != 0);
- AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId));
+ AsyncJob::Start(new Mgr::Filler(this, client, request.requestId));
}
void
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "HttpReply.h"
#include "ipc/Messages.h"
#include "ipc/TypedMsgHdr.h"
Mgr::InfoAction::respond(const Request& request)
{
debugs(16, 5, HERE);
- int fd = ImportHttpFdIntoComm(request.fd);
- Must(fd >= 0);
+ Comm::ConnectionPointer client = ImportHttpFdIntoComm(request.fd);
+ Must(Comm::IsConnOpen(client));
Must(request.requestId != 0);
- AsyncJob::Start(new Mgr::Filler(this, fd, request.requestId));
+ AsyncJob::Start(new Mgr::Filler(this, client, request.requestId));
}
void
#include "config.h"
#include "base/TextException.h"
+#include "comm.h"
#include "CommCalls.h"
+#include "comm/Connection.h"
#include "HttpReply.h"
#include "ipc/Coordinator.h"
#include "mgr/ActionWriter.h"
return c1.kidId < c2.kidId;
}
-Mgr::Inquirer::Inquirer(Action::Pointer anAction, int aFd,
+Mgr::Inquirer::Inquirer(Action::Pointer anAction, const Comm::ConnectionPointer &conn,
const Request &aCause, const Ipc::StrandCoords &coords):
AsyncJob("Mgr::Inquirer"),
aggrAction(anAction),
cause(aCause),
- fd(aFd),
+ clientConnection(conn),
strands(coords), pos(strands.begin()),
requestId(0), closer(NULL), timeout(aggrAction->atomic() ? 10 : 100)
{
- debugs(16, 5, HERE << "FD " << aFd << " action: " << aggrAction);
+ debugs(16, 5, HERE << conn << " action: " << aggrAction);
// order by ascending kid IDs; useful for non-aggregatable stats
std::sort(strands.begin(), strands.end(), LesserStrandByKidId);
closer = asyncCall(16, 5, "Mgr::Inquirer::noteCommClosed",
CommCbMemFunT<Inquirer, CommCloseCbParams>(this, &Inquirer::noteCommClosed));
- comm_add_close_handler(fd, closer);
+ comm_add_close_handler(clientConnection->fd, closer);
}
Mgr::Inquirer::~Inquirer()
void
Mgr::Inquirer::close()
{
- if (fd >= 0) {
+ if (Comm::IsConnOpen(clientConnection)) {
removeCloseHandler();
- comm_close(fd);
- fd = -1;
+ clientConnection->close();
}
}
Mgr::Inquirer::removeCloseHandler()
{
if (closer != NULL) {
- comm_remove_close_handler(fd, closer);
+ comm_remove_close_handler(clientConnection->fd, closer);
closer = NULL;
}
}
Mgr::Inquirer::start()
{
debugs(16, 5, HERE);
- Must(fd >= 0);
+ Must(Comm::IsConnOpen(clientConnection));
Must(aggrAction != NULL);
std::auto_ptr<HttpReply> reply(new HttpReply);
std::auto_ptr<MemBuf> replyBuf(reply->pack());
writer = asyncCall(16, 5, "Mgr::Inquirer::noteWroteHeader",
CommCbMemFunT<Inquirer, CommIoCbParams>(this, &Inquirer::noteWroteHeader));
- comm_write_mbuf(fd, replyBuf.get(), writer);
+ comm_write_mbuf(clientConnection, replyBuf.get(), writer);
}
/// called when we wrote the response header
debugs(16, 5, HERE);
writer = NULL;
Must(params.flag == COMM_OK);
- Must(params.fd == fd);
+ Must(clientConnection != NULL && params.fd == clientConnection->fd);
Must(params.size != 0);
// start inquiries at the initial pos
inquire();
const int kidId = pos->kidId;
debugs(16, 4, HERE << "inquire kid: " << kidId << status());
TheRequestsMap[requestId] = callback;
- Request mgrRequest(KidIdentifier, requestId, fd,
+ Request mgrRequest(KidIdentifier, requestId, clientConnection,
aggrAction->command().params);
Ipc::TypedMsgHdr message;
mgrRequest.pack(message);
Mgr::Inquirer::noteCommClosed(const CommCloseCbParams& params)
{
debugs(16, 5, HERE);
- Must(fd < 0 || fd == params.fd);
- fd = -1;
+ Must(!Comm::IsConnOpen(clientConnection) || clientConnection->fd == params.fd);
+ clientConnection = NULL; // AYJ: Do we actually have to NULL it?
mustStop("commClosed");
}
}
if (aggrAction->aggregatable()) {
removeCloseHandler();
- AsyncJob::Start(new ActionWriter(aggrAction, fd));
- fd = -1; // should not close fd because we passed it to ActionWriter
+ AsyncJob::Start(new ActionWriter(aggrAction, clientConnection));
+ clientConnection = NULL; // should not close fd because we passed it to ActionWriter
}
close();
}
{
static MemBuf buf;
buf.reset();
- buf.Printf(" [FD %d, requestId %u]", fd, requestId);
+ buf.Printf(" [FD %d, requestId %u]", clientConnection->fd, requestId);
buf.terminate();
return buf.content();
}
#include "base/AsyncJobCalls.h"
#include "base/AsyncJob.h"
+#include "comm/forward.h"
#include "ipc/StrandCoords.h"
#include "MemBuf.h"
#include "mgr/Action.h"
class Inquirer: public AsyncJob
{
public:
- Inquirer(Action::Pointer anAction, int aFd, const Request &aCause,
+ Inquirer(Action::Pointer anAction, const Comm::ConnectionPointer &conn, const Request &aCause,
const Ipc::StrandCoords &coords);
virtual ~Inquirer();
Action::Pointer aggrAction; //< action to aggregate
Request cause; ///< cache manager request received from HTTP client
- int fd; ///< HTTP client socket descriptor
+ Comm::ConnectionPointer clientConnection; ///< HTTP client socket descriptor
Ipc::StrandCoords strands; ///< all strands we want to query, in order
Ipc::StrandCoords::const_iterator pos; ///< strand we should query now
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "ipc/Messages.h"
#include "mgr/ActionParams.h"
#include "mgr/Request.h"
-Mgr::Request::Request(int aRequestorId, unsigned int aRequestId, int aFd,
+Mgr::Request::Request(int aRequestorId, unsigned int aRequestId, const Comm::ConnectionPointer &conn,
const ActionParams &aParams):
requestorId(aRequestorId), requestId(aRequestId),
- fd(aFd), params(aParams)
+ fd(conn->fd),
+ params(aParams)
{
Must(requestorId > 0);
Must(requestId != 0);
class Request
{
public:
- Request(int aRequestorId, unsigned int aRequestId, int aFd,
+ Request(int aRequestorId, unsigned int aRequestId, const Comm::ConnectionPointer &conn,
const ActionParams &aParams);
explicit Request(const Ipc::TypedMsgHdr& msg); ///< from recvmsg()
#include "config.h"
#include "base/TextException.h"
+#include "comm/Connection.h"
#include "CommCalls.h"
#include "ipc/FdNotes.h"
#include "mgr/StoreToCommWriter.h"
CBDATA_NAMESPACED_CLASS_INIT(Mgr, StoreToCommWriter);
-Mgr::StoreToCommWriter::StoreToCommWriter(int aFd, StoreEntry* anEntry):
+Mgr::StoreToCommWriter::StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry* anEntry):
AsyncJob("Mgr::StoreToCommWriter"),
- fd(aFd), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
+ clientConnection(conn), entry(anEntry), sc(NULL), writeOffset(0), closer(NULL)
{
- debugs(16, 6, HERE << "FD " << fd);
+ debugs(16, 6, HERE << clientConnection);
closer = asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommClosed",
CommCbMemFunT<StoreToCommWriter, CommCloseCbParams>(this, &StoreToCommWriter::noteCommClosed));
- comm_add_close_handler(fd, closer);
+ comm_add_close_handler(clientConnection->fd, closer);
}
Mgr::StoreToCommWriter::~StoreToCommWriter()
void
Mgr::StoreToCommWriter::close()
{
- if (fd >= 0) {
+ if (Comm::IsConnOpen(clientConnection)) {
if (closer != NULL) {
- comm_remove_close_handler(fd, closer);
+ comm_remove_close_handler(clientConnection->fd, closer);
closer = NULL;
}
- comm_close(fd);
- fd = -1;
+ clientConnection->close();
}
}
Mgr::StoreToCommWriter::start()
{
debugs(16, 6, HERE);
- Must(fd >= 0);
+ Must(Comm::IsConnOpen(clientConnection));
Must(entry != NULL);
entry->registerAbort(&StoreToCommWriter::Abort, this);
sc = storeClientListAdd(entry, this);
Mgr::StoreToCommWriter::scheduleCommWrite(const StoreIOBuffer& ioBuf)
{
debugs(16, 6, HERE);
- Must(fd >= 0);
+ Must(Comm::IsConnOpen(clientConnection));
Must(ioBuf.data != NULL);
// write filled buffer
typedef CommCbMemFunT<StoreToCommWriter, CommIoCbParams> MyDialer;
AsyncCall::Pointer writer =
asyncCall(16, 5, "Mgr::StoreToCommWriter::noteCommWrote",
MyDialer(this, &StoreToCommWriter::noteCommWrote));
- comm_write(fd, ioBuf.data, ioBuf.length, writer);
+ comm_write(clientConnection, ioBuf.data, ioBuf.length, writer);
}
void
{
debugs(16, 6, HERE);
Must(params.flag == COMM_OK);
- Must(params.fd == fd);
+ Must(clientConnection != NULL && params.fd == clientConnection->fd);
Must(params.size != 0);
writeOffset += params.size;
if (!doneAll())
Mgr::StoreToCommWriter::noteCommClosed(const CommCloseCbParams& params)
{
debugs(16, 6, HERE);
- Must(fd == params.fd);
- fd = -1;
+ Must(!Comm::IsConnOpen(clientConnection));
mustStop("commClosed");
}
Mgr::StoreToCommWriter::Abort(void* param)
{
StoreToCommWriter* mgrWriter = static_cast<StoreToCommWriter*>(param);
- if (mgrWriter->fd >= 0)
- comm_close(mgrWriter->fd);
+ if (Comm::IsConnOpen(mgrWriter->clientConnection))
+ mgrWriter->clientConnection->close();
}
-
-int
+Comm::ConnectionPointer
Mgr::ImportHttpFdIntoComm(int fd)
{
+ Comm::ConnectionPointer result = new Comm::Connection();
struct sockaddr_in addr;
socklen_t len = sizeof(addr);
if (getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
- Ip::Address ipAddr(addr);
+ result->fd = fd;
+ result->local = addr;
struct addrinfo* addr_info = NULL;
- ipAddr.GetAddrInfo(addr_info);
+ result->local.GetAddrInfo(addr_info);
addr_info->ai_socktype = SOCK_STREAM;
addr_info->ai_protocol = IPPROTO_TCP;
- comm_import_opened(fd, ipAddr, COMM_NONBLOCKING, Ipc::FdNote(Ipc::fdnHttpSocket), addr_info);
- ipAddr.FreeAddrInfo(addr_info);
+ comm_import_opened(result, Ipc::FdNote(Ipc::fdnHttpSocket), addr_info);
+ result->local.FreeAddrInfo(addr_info);
} else {
debugs(16, DBG_CRITICAL, HERE << "ERROR: FD " << fd << ' ' << xstrerror());
::close(fd);
fd = -1;
}
- return fd;
+ return result;
}
#define SQUID_MGR_STORE_TO_COMM_WRITER_H
#include "base/AsyncJob.h"
+#include "comm/forward.h"
#include "mgr/Action.h"
#include "StoreIOBuffer.h"
class StoreToCommWriter: public AsyncJob
{
public:
- StoreToCommWriter(int aFd, StoreEntry *anEntry);
+ StoreToCommWriter(const Comm::ConnectionPointer &conn, StoreEntry *anEntry);
virtual ~StoreToCommWriter();
protected:
void close();
protected:
- int fd; ///< HTTP client descriptor
+ Comm::ConnectionPointer clientConnection; ///< HTTP client descriptor
StoreEntry* entry; ///< store entry with the cache manager response
store_client* sc; ///< our registration with the store
};
/// import HTTP socket fd from another strand into our Comm state
-extern int ImportHttpFdIntoComm(int fd);
+extern Comm::ConnectionPointer ImportHttpFdIntoComm(int fd);
} // namespace Mgr