/*
- * $Id$
- *
- * DEBUG: section 54 Interprocess Communication
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
*
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
*/
-#include "config.h"
+
+/* DEBUG: section 54 Interprocess Communication */
+
+#include "squid.h"
#include "base/TextException.h"
#include "comm.h"
-#include "CommCalls.h"
#include "comm/Connection.h"
#include "comm/Write.h"
+#include "CommCalls.h"
#include "ipc/UdsOp.h"
-
Ipc::UdsOp::UdsOp(const String& pathAddr):
- AsyncJob("Ipc::UdsOp"),
- address(PathToAddress(pathAddr)),
- options(COMM_NONBLOCKING)
+ AsyncJob("Ipc::UdsOp"),
+ address(PathToAddress(pathAddr)),
+ options(COMM_NONBLOCKING)
{
debugs(54, 5, HERE << '[' << this << "] pathAddr=" << pathAddr);
}
typedef CommCbMemFunT<UdsOp, CommTimeoutCbParams> Dialer;
AsyncCall::Pointer handler = asyncCall(54,5, handlerName,
Dialer(CbcPointer<UdsOp>(this), &UdsOp::noteTimeout));
- commSetTimeout(conn()->fd, seconds, handler);
+ commSetConnTimeout(conn(), seconds, handler);
}
void Ipc::UdsOp::clearTimeout()
{
- commSetTimeout(conn()->fd, -1, NULL, NULL); // TODO: add Comm::ClearTimeout(fd)
+ commUnsetConnTimeout(conn());
}
void Ipc::UdsOp::noteTimeout(const CommTimeoutCbParams &)
timedout(); // our kid handles communication timeout
}
-
struct sockaddr_un
Ipc::PathToAddress(const String& pathAddr) {
assert(pathAddr.size() != 0);
return unixAddr;
}
-
CBDATA_NAMESPACED_CLASS_INIT(Ipc, UdsSender);
Ipc::UdsSender::UdsSender(const String& pathAddr, const TypedMsgHdr& aMessage):
- UdsOp(pathAddr),
- message(aMessage),
- retries(10), // TODO: make configurable?
- timeout(10), // TODO: make configurable?
- writing(false)
+ UdsOp(pathAddr),
+ message(aMessage),
+ retries(10), // TODO: make configurable?
+ timeout(10), // TODO: make configurable?
+ sleeping(false),
+ writing(false)
{
message.address(address);
}
+void Ipc::UdsSender::swanSong()
+{
+ // did we abort while waiting between retries?
+ if (sleeping)
+ cancelSleep();
+
+ UdsOp::swanSong();
+}
+
void Ipc::UdsSender::start()
{
UdsOp::start();
bool Ipc::UdsSender::doneAll() const
{
- return !writing && UdsOp::doneAll();
+ return !writing && !sleeping && UdsOp::doneAll();
}
void Ipc::UdsSender::write()
void Ipc::UdsSender::wrote(const CommIoCbParams& params)
{
- debugs(54, 5, HERE << params.conn << " flag " << params.flag << " [" << this << ']');
+ debugs(54, 5, HERE << params.conn << " flag " << params.flag << " retries " << retries << " [" << this << ']');
writing = false;
- if (params.flag != COMM_OK && retries-- > 0) {
- sleep(1); // do not spend all tries at once; XXX: use an async timed event instead of blocking here; store the time when we started writing so that we do not sleep if not needed?
- write(); // XXX: should we close on error so that conn() reopens?
+ if (params.flag != Comm::OK && retries-- > 0) {
+ // perhaps a fresh connection and more time will help?
+ conn()->close();
+ startSleep();
+ }
+}
+
+/// pause for a while before resending the message
+void Ipc::UdsSender::startSleep()
+{
+ Must(!sleeping);
+ sleeping = true;
+ eventAdd("Ipc::UdsSender::DelayedRetry",
+ Ipc::UdsSender::DelayedRetry,
+ new Pointer(this), 1, 0, false); // TODO: Use Fibonacci increments
+}
+
+/// stop sleeping (or do nothing if we were not)
+void Ipc::UdsSender::cancelSleep()
+{
+ if (sleeping) {
+ // Why not delete the event? See Comm::ConnOpener::cancelSleep().
+ sleeping = false;
+ debugs(54, 9, "stops sleeping");
+ }
+}
+
+/// legacy wrapper for Ipc::UdsSender::delayedRetry()
+void Ipc::UdsSender::DelayedRetry(void *data)
+{
+ Pointer *ptr = static_cast<Pointer*>(data);
+ assert(ptr);
+ if (UdsSender *us = dynamic_cast<UdsSender*>(ptr->valid())) {
+ // get back inside AsyncJob protection by scheduling an async job call
+ typedef NullaryMemFunT<Ipc::UdsSender> Dialer;
+ AsyncCall::Pointer call = JobCallback(54, 4, Dialer, us, Ipc::UdsSender::delayedRetry);
+ ScheduleCallHere(call);
+ }
+ delete ptr;
+}
+
+/// make another sending attempt after a pause
+void Ipc::UdsSender::delayedRetry()
+{
+ debugs(54, 5, HERE << sleeping);
+ if (sleeping) {
+ sleeping = false;
+ write(); // reopens the connection if needed
}
}
mustStop("timedout");
}
-
void Ipc::SendMessage(const String& toAddress, const TypedMsgHdr &message)
{
AsyncJob::Start(new UdsSender(toAddress, message));
}
+
+const Comm::ConnectionPointer &
+Ipc::ImportFdIntoComm(const Comm::ConnectionPointer &conn, int socktype, int protocol, Ipc::FdNoteId noteId)
+{
+ struct sockaddr_storage addr;
+ socklen_t len = sizeof(addr);
+ if (getsockname(conn->fd, reinterpret_cast<sockaddr*>(&addr), &len) == 0) {
+ conn->remote = addr;
+ struct addrinfo* addr_info = NULL;
+ conn->remote.getAddrInfo(addr_info);
+ addr_info->ai_socktype = socktype;
+ addr_info->ai_protocol = protocol;
+ comm_import_opened(conn, Ipc::FdNote(noteId), addr_info);
+ Ip::Address::FreeAddr(addr_info);
+ } else {
+ int xerrno = errno;
+ debugs(54, DBG_CRITICAL, "ERROR: Ipc::ImportFdIntoComm: " << conn << ' ' << xstrerr(xerrno));
+ conn->close();
+ }
+ return conn;
+}
+