/*
- * $Id$
- *
- * DEBUG: section 54 Interprocess Communication
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
+/* DEBUG: section 54 Interprocess Communication */
-#include "config.h"
+#include "squid.h"
+#include "base/TextException.h"
#include "comm.h"
+#include "comm/Connection.h"
+#include "comm/Write.h"
#include "CommCalls.h"
#include "ipc/UdsOp.h"
-
Ipc::UdsOp::UdsOp(const String& pathAddr):
- AsyncJob("Ipc::UdsOp"),
- address(PathToAddress(pathAddr)),
- options(COMM_NONBLOCKING),
- fd_(-1)
+ AsyncJob("Ipc::UdsOp"),
+ address(PathToAddress(pathAddr)),
+ options(COMM_NONBLOCKING)
{
debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
}
Ipc::UdsOp::~UdsOp()
{
debugs(54, 5, HERE << '[' << this << ']');
- if (fd_ >= 0)
- comm_close(fd_);
+ if (Comm::IsConnOpen(conn_))
+ conn_->close();
+ conn_ = NULL;
}
void Ipc::UdsOp::setOptions(int newOptions)
options = newOptions;
}
-int Ipc::UdsOp::fd()
+Comm::ConnectionPointer &
+Ipc::UdsOp::conn()
{
- if (fd_ < 0) {
+ if (!Comm::IsConnOpen(conn_)) {
if (options & COMM_DOBIND)
unlink(address.sun_path);
- fd_ = comm_open_uds(SOCK_DGRAM, 0, &address, options);
- Must(fd_ >= 0);
+ if (conn_ == NULL)
+ conn_ = new Comm::Connection;
+ conn_->fd = comm_open_uds(SOCK_DGRAM, 0, &address, options);
+ Must(Comm::IsConnOpen(conn_));
}
- return fd_;
+ return conn_;
}
void Ipc::UdsOp::setTimeout(int seconds, const char *handlerName)
{
+ typedef CommCbMemFunT<UdsOp, CommTimeoutCbParams> Dialer;
AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
- CommCbMemFunT<UdsOp, CommTimeoutCbParams>(this,
- &UdsOp::noteTimeout));
- commSetTimeout(fd(), seconds, handler);
+ Dialer(CbcPointer<UdsOp>(this), &UdsOp::noteTimeout));
+ commSetConnTimeout(conn(), seconds, handler);
}
void Ipc::UdsOp::clearTimeout()
{
- commSetTimeout(fd(), -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+ commUnsetConnTimeout(conn());
}
void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
timedout(); // our kid handles communication timeout
}
-
struct sockaddr_un
Ipc::PathToAddress(const String& pathAddr) {
assert(pathAddr.size() != 0);
return unixAddr;
}
-
CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
- UdsOp(pathAddr),
- message(aMessage),
- retries(10), // TODO: make configurable?
- timeout(10), // TODO: make configurable?
- writing(false)
+ UdsOp(pathAddr),
+ message(aMessage),
+ retries(10), // TODO: make configurable?
+ timeout(10), // TODO: make configurable?
+ sleeping(false),
+ writing(false)
{
message.address(address);
}
+void Ipc::UdsSender::swanSong()
+{
+ // did we abort while waiting between retries?
+ if (sleeping)
+ cancelSleep();
+
+ UdsOp::swanSong();
+}
+
void Ipc::UdsSender::start()
{
UdsOp::start();
bool Ipc::UdsSender::doneAll() const
{
- return !writing && UdsOp::doneAll();
+ return !writing && !sleeping && UdsOp::doneAll();
}
void Ipc::UdsSender::write()
{
debugs(54, 5, HERE);
- AsyncCall::Pointer writeHandler = asyncCall(54, 5, "Ipc::UdsSender::wrote",
- CommCbMemFunT<UdsSender, CommIoCbParams>(this, &UdsSender::wrote));
- comm_write(fd(), message.raw(), message.size(), writeHandler);
+ typedef CommCbMemFunT<UdsSender, CommIoCbParams> Dialer;
+ AsyncCall::Pointer writeHandler = JobCallback(54, 5,
+ Dialer, this, UdsSender::wrote);
+ Comm::Write(conn(), message.raw(), message.size(), writeHandler, NULL);
writing = true;
}
void Ipc::UdsSender::wrote(const CommIoCbParams& params)
{
- debugs(54, 5, HERE << "FD " << params.fd << " flag " << params.flag << " [" << this << ']');
+ debugs(54, 5, HERE << params.conn << " flag " << params.flag << " retries " << retries << " [" << this << ']');
writing = false;
- if (params.flag != COMM_OK && retries-- > 0) {
- sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
- write(); // XXX: should we close on error so that fd() reopens?
+ if (params.flag != Comm::OK && retries-- > 0) {
+ // perhaps a fresh connection and more time will help?
+ conn()->close();
+ startSleep();
+ }
+}
+
+/// pause for a while before resending the message
+void Ipc::UdsSender::startSleep()
+{
+ Must(!sleeping);
+ sleeping = true;
+ eventAdd("Ipc::UdsSender::DelayedRetry",
+ Ipc::UdsSender::DelayedRetry,
+ new Pointer(this), 1, 0, false); // TODO: Use Fibonacci increments
+}
+
+/// stop sleeping (or do nothing if we were not)
+void Ipc::UdsSender::cancelSleep()
+{
+ if (sleeping) {
+ // Why not delete the event? See Comm::ConnOpener::cancelSleep().
+ sleeping = false;
+ debugs(54, 9, "stops sleeping");
+ }
+}
+
+/// legacy wrapper for Ipc::UdsSender::delayedRetry()
+void Ipc::UdsSender::DelayedRetry(void *data)
+{
+ Pointer *ptr = static_cast<Pointer*>(data);
+ assert(ptr);
+ if (UdsSender *us = dynamic_cast<UdsSender*>(ptr->valid())) {
+ // get back inside AsyncJob protection by scheduling an async job call
+ typedef NullaryMemFunT<Ipc::UdsSender> Dialer;
+ AsyncCall::Pointer call = JobCallback(54, 4, Dialer, us, Ipc::UdsSender::delayedRetry);
+ ScheduleCallHere(call);
+ }
+ delete ptr;
+}
+
+/// make another sending attempt after a pause
+void Ipc::UdsSender::delayedRetry()
+{
+ debugs(54, 5, HERE << sleeping);
+ if (sleeping) {
+ sleeping = false;
+ write(); // reopens the connection if needed
}
}
mustStop("timedout");
}
-
void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
{
- AsyncJob::AsyncStart(new UdsSender(toAddress, message));
+ AsyncJob::Start(new UdsSender(toAddress, message));
}
+
+const Comm::ConnectionPointer &
+Ipc::ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, Ipc::FdNoteId noteId)
+{
+ struct sockaddr_storage addr;
+ socklen_t len = sizeof(addr);
+ if (getsockname(conn->fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
+ conn->remote = addr;
+ struct addrinfo* addr_info = NULL;
+ conn->remote.getAddrInfo(addr_info);
+ addr_info->ai_socktype = socktype;
+ addr_info->ai_protocol = protocol;
+ comm_import_opened(conn, Ipc::FdNote(noteId), addr_info);
+ Ip::Address::FreeAddr(addr_info);
+ } else {
+ int xerrno = errno;
+ debugs(54, DBG_CRITICAL, "ERROR: Ipc::ImportFdIntoComm: " << conn << ' ' << xstrerr(xerrno));
+ conn->close();
+ }
+ return conn;
+}
+