public:
CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
+ /// prints message parameters; suitable for cache manager reports
+ void stat(std::ostream &);
+
public:
int sender; ///< kid ID of sending process
sfileno xitIndex;
};
+void
+CollapsedForwardingMsg::stat(std::ostream &os)
+{
+ os << "sender: " << sender << ", xitIndex: " << xitIndex;
+}
+
// CollapsedForwarding
void
HandleNewData("after notification");
}
+void
+CollapsedForwarding::StatQueue(std::ostream &os)
+{
+ if (queue.get()) {
+ os << "Transients queues:\n";
+ queue->stat<CollapsedForwardingMsg>(os);
+ }
+}
+
/// initializes shared queue used by CollapsedForwarding
class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
{
/// handle queue push notifications from worker or disker
static void HandleNotification(const Ipc::TypedMsgHdr &msg);
+ /// prints IPC message queue state; suitable for cache manager reports
+ static void StatQueue(std::ostream &);
+
private:
typedef Ipc::MultiQueue Queue;
static std::unique_ptr<Queue> queue; ///< IPC queue
operator <<(std::ostream &os, const SipcIo &sio)
{
return os << "ipcIo" << sio.worker << '.' << sio.msg.requestId <<
- (sio.msg.command == IpcIo::cmdRead ? 'r' : 'w') << sio.disker;
+ sio.msg.command << sio.disker;
}
+/* IpcIo::Command */
+
+std::ostream &
+operator <<(std::ostream &os, const IpcIo::Command command)
+{
+ switch (command) {
+ case IpcIo::cmdNone:
+ return os << '-';
+ case IpcIo::cmdOpen:
+ return os << 'o';
+ case IpcIo::cmdRead:
+ return os << 'r';
+ case IpcIo::cmdWrite:
+ return os << 'w';
+ }
+ // unreachable code
+ return os << static_cast<int>(command);
+}
+
+/* IpcIoFile */
+
IpcIoFile::IpcIoFile(char const *aDb):
dbName(aDb), diskId(-1), error_(false), lastRequestId(0),
olderRequests(&requestMap1), newerRequests(&requestMap2),
HandleResponses("after notification");
}
+void
+IpcIoFile::StatQueue(std::ostream &os)
+{
+ if (queue.get()) {
+ os << "SMP disk I/O queues:\n";
+ queue->stat<IpcIoMsg>(os);
+ }
+}
+
/// handles open request timeout
void
IpcIoFile::OpenTimeout(void *const param)
start.tv_usec = 0;
}
+void
+IpcIoMsg::stat(std::ostream &os)
+{
+ timeval elapsedTime;
+ tvSub(elapsedTime, start, current_time);
+ os << "id: " << requestId <<
+ ", offset: " << offset <<
+ ", size: " << len <<
+ ", page: " << page <<
+ ", command: " << command <<
+ ", start: " << start <<
+ ", elapsed: " << elapsedTime <<
+ ", errno: " << xerrno;
+}
+
/* IpcIoPendingRequest */
IpcIoPendingRequest::IpcIoPendingRequest(const IpcIoFile::Pointer &aFile):
} // namespace IpcIo
+std::ostream &operator <<(std::ostream &, IpcIo::Command);
+
/// converts DiskIO requests to IPC queue messages
class IpcIoMsg
{
public:
IpcIoMsg();
+ /// prints message parameters; suitable for cache manager reports
+ void stat(std::ostream &);
+
public:
unsigned int requestId; ///< unique for requestor; matches request w/ response
/// handle queue push notifications from worker or disker
static void HandleNotification(const Ipc::TypedMsgHdr &msg);
+ /// prints IPC message queue state; suitable for cache manager reports
+ static void StatQueue(std::ostream &);
+
DiskFile::Config config; ///< supported configuration options
protected:
ip/libip.la \
fs/libfs.la \
mgr/libmgr.la \
- ipc/libipc.la \
anyp/libanyp.la \
mem/libmem.la \
store/libstore.la \
sbuf/libsbuf.la \
DiskIO/libdiskio.la \
+ ipc/libipc.la \
$(top_builddir)/lib/libmisccontainers.la \
$(top_builddir)/lib/libmiscencoding.la \
$(top_builddir)/lib/libmiscutil.la \
event.cc \
tests/stub_external_acl.cc \
tests/stub_fatal.cc \
+ tests/stub_IpcIoFile.cc \
fatal.h \
fd.cc \
fd.h \
tests/stub_libsecurity.cc \
tests/stub_libstore.cc \
tests/stub_main_cc.cc \
+ tests/stub_IpcIoFile.cc \
mem_node.cc \
mime.cc \
mime.h \
tests/stub_libsecurity.cc \
tests/stub_libstore.cc \
tests/stub_main_cc.cc \
+ tests/stub_IpcIoFile.cc \
mem_node.cc \
mime.cc \
mime.h \
tests/stub_libsecurity.cc \
tests/stub_libstore.cc \
tests/stub_main_cc.cc \
+ tests/stub_IpcIoFile.cc \
mem_node.cc \
mime.cc \
mime.h \
debugs(20, DBG_IMPORTANT, "MemObject->data.origin_offset: " << (data_hdr.head ? data_hdr.head->nodeBuffer.offset : 0));
#endif
- debugs(20, DBG_IMPORTANT, "MemObject->start_ping: " << start_ping.tv_sec << "."<< std::setfill('0') << std::setw(6) << start_ping.tv_usec);
+ debugs(20, DBG_IMPORTANT, "MemObject->start_ping: " << start_ping);
debugs(20, DBG_IMPORTANT, "MemObject->inmem_hi: " << data_hdr.endOffset());
debugs(20, DBG_IMPORTANT, "MemObject->inmem_lo: " << inmem_lo);
debugs(20, DBG_IMPORTANT, "MemObject->nclients: " << nclients);
#include "rfc1123.h"
#include <ctime>
+#include <iosfwd>
/* NP: sys/time.h is provided by libcompat */
/* Use uint64_t to store milliseconds */
return !(a != b);
}
+/// prints <seconds>.<microseconds>
+std::ostream &operator <<(std::ostream &, const timeval &);
+
namespace Time
{
void
Icmp::Log(const Ip::Address &addr, const uint8_t type, const char* pkt_str, const int rtt, const int hops)
{
- debugs(42, 2, "pingerLog: " << std::setw(9) << current_time.tv_sec <<
- "." << std::setfill('0') << std::setw(6) <<
- current_time.tv_usec << " " << std::left << std::setfill(' ') <<
+ debugs(42, 2, "pingerLog: " << current_time << " " << std::left <<
std::setw(45) << addr << " " << type <<
" " << std::setw(15) << pkt_str << " " << rtt <<
"ms " << hops << " hops");
return sizeof(OneToOneUniQueue) + maxItemSize * size;
}
+/// start state reporting (by reporting queue parameters)
+/// The labels reflect whether the caller owns theIn or theOut data member and,
+/// hence, cannot report the other value reliably.
+void
+Ipc::OneToOneUniQueue::statOpen(std::ostream &os, const char *inLabel, const char *outLabel, const uint32_t count) const
+{
+ os << "{ size: " << count <<
+ ", capacity: " << theCapacity <<
+ ", " << inLabel << ": " << theIn <<
+ ", " << outLabel << ": " << theOut;
+}
+
+/// end state reporting started by statOpen()
+void
+Ipc::OneToOneUniQueue::statClose(std::ostream &os) const
+{
+ os << "}\n";
+}
+
/* OneToOneUniQueues */
Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
#include "ipc/mem/Pointer.h"
#include "util.h"
+#include <algorithm>
#include <atomic>
class String;
/// returns true iff the value was set; the value may be stale!
template<class Value> bool peek(Value &value) const;
+ /// prints incoming queue state; suitable for cache manager reports
+ template<class Value> void statIn(std::ostream &, int localProcessId, int remoteProcessId) const;
+ /// prints outgoing queue state; suitable for cache manager reports
+ template<class Value> void statOut(std::ostream &, int localProcessId, int remoteProcessId) const;
+
private:
+ void statOpen(std::ostream &, const char *inLabel, const char *outLabel, uint32_t count) const;
+ void statClose(std::ostream &) const;
+ template<class Value> void statSamples(std::ostream &, unsigned int start, uint32_t size) const;
+ template<class Value> void statRange(std::ostream &, unsigned int start, uint32_t n) const;
- unsigned int theIn; ///< input index, used only in push()
- unsigned int theOut; ///< output index, used only in pop()
+ // optimization: these non-std::atomic data members are in shared memory,
+ // but each is used only by one process (aside from obscured reporting)
+ unsigned int theIn; ///< current push() position; reporting aside, used only in push()
+ unsigned int theOut; ///< current pop() position; reporting aside, used only in pop()/peek()
std::atomic<uint32_t> theSize; ///< number of items in the queue
const unsigned int theMaxItemSize; ///< maximum item size
/// peeks at the item likely to be pop()ed next
template<class Value> bool peek(int &remoteProcessId, Value &value) const;
+ /// prints current state; suitable for cache manager reports
+ template<class Value> void stat(std::ostream &) const;
+
/// returns local reader's balance
QueueReader::Balance &localBalance() { return localReader().balance; }
return wasEmpty && (!reader || reader->raiseSignal());
}
+template <class Value>
+void
+OneToOneUniQueue::statIn(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " receiving from kid" << remoteProcessId << ": ";
+ // Nobody can modify our theOut so, after capturing some valid theSize value
+ // in count, we can reliably report all [theOut, theOut+count) items that
+ // were queued at theSize capturing time. We will miss new items push()ed by
+ // the other side, but it is OK -- we report state at the capturing time.
+ const auto count = theSize.load();
+ statOpen(os, "other", "popIndex", count);
+ statSamples<Value>(os, theOut, count);
+ statClose(os);
+}
+
+template <class Value>
+void
+OneToOneUniQueue::statOut(std::ostream &os, const int localProcessId, const int remoteProcessId) const
+{
+ os << " kid" << localProcessId << " sending to kid" << remoteProcessId << ": ";
+ // Nobody can modify our theIn so, after capturing some valid theSize value
+ // in count, we can reliably report all [theIn-count, theIn) items that were
+ // queued at theSize capturing time. We may report items already pop()ed by
+ // the other side, but that is OK because pop() does not modify items -- it
+ // only increments theOut.
+ const auto count = theSize.load();
+ statOpen(os, "pushIndex", "other", count);
+ statSamples<Value>(os, theIn - count, count); // unsigned offset underflow OK
+ statClose(os);
+}
+
+/// report a sample of [start, start + size) items
+template <class Value>
+void
+OneToOneUniQueue::statSamples(std::ostream &os, const unsigned int start, const uint32_t count) const
+{
+ if (!count) {
+ os << " ";
+ return;
+ }
+
+ os << ", items: [\n";
+ // report a few leading and trailing items, without repetitions
+ const auto sampleSize = std::min(3U, count); // leading (and max) sample
+ statRange<Value>(os, start, sampleSize);
+ if (sampleSize < count) { // the first sample did not show some items
+ // The `start` offset aside, the first sample reported all items
+ // below the sampleSize offset. The second sample needs to report
+ // the last sampleSize items (i.e. starting at count-sampleSize
+ // offset) except those already reported by the first sample.
+ const auto secondSampleOffset = std::max(sampleSize, count - sampleSize);
+ const auto secondSampleSize = std::min(sampleSize, count - sampleSize);
+
+ // but first we print a sample separator, unless there are no items
+ // between the samples or the separator hides the only unsampled item
+ const auto bothSamples = sampleSize + secondSampleSize;
+ if (bothSamples + 1U == count)
+ statRange<Value>(os, start + sampleSize, 1);
+ else if (count > bothSamples)
+ os << " # ... " << (count - bothSamples) << " items not shown ...\n";
+
+ statRange<Value>(os, start + secondSampleOffset, secondSampleSize);
+ }
+ os << " ]";
+}
+
+/// statSamples() helper that reports n items from start
+template <class Value>
+void
+OneToOneUniQueue::statRange(std::ostream &os, const unsigned int start, const uint32_t n) const
+{
+ assert(sizeof(Value) <= theMaxItemSize);
+ auto offset = start;
+ for (uint32_t i = 0; i < n; ++i) {
+ // XXX: Throughout this C++ header, these overflow wrapping tricks work
+ // only because theCapacity currently happens to be a power of 2 (e.g.,
+ // the highest offset (0xF...FFF) % 3 is 0 and so is the next offset).
+ const auto pos = (offset++ % theCapacity) * theMaxItemSize;
+ Value value;
+ memcpy(&value, theBuffer + pos, sizeof(value));
+ os << " { ";
+ value.stat(os);
+ os << " },\n";
+ }
+}
+
// OneToOneUniQueues
inline OneToOneUniQueue &
return false; // most likely, no process had anything to pop
}
+template <class Value>
+void
+BaseMultiQueue::stat(std::ostream &os) const
+{
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = inQueue(processId);
+ queue.statIn<Value>(os, theLocalProcessId, processId);
+ }
+
+ os << "\n";
+
+ for (int processId = remotesIdOffset(); processId < remotesIdOffset() + remotesCount(); ++processId) {
+ const auto &queue = outQueue(processId);
+ queue.statOut<Value>(os, theLocalProcessId, processId);
+ }
+}
+
// FewToFewBiQueue
template <class Value>
void
HierarchyLogEntry::stopPeerClock(const bool force)
{
- debugs(46, 5, "First connection started: " << firstConnStart_.tv_sec << "." <<
- std::setfill('0') << std::setw(6) << firstConnStart_.tv_usec <<
+ debugs(46, 5, "First connection started: " << firstConnStart_ <<
", current total response time value: " << (totalResponseTime_.tv_sec * 1000 + totalResponseTime_.tv_usec/1000) <<
(force ? ", force fixing" : ""));
if (!force && totalResponseTime_.tv_sec != -1)
#include "squid.h"
#include "base/AsyncCbdataCalls.h"
+#include "base/PackableStream.h"
#include "base/TextException.h"
#include "CacheDigest.h"
#include "CacheManager.h"
+#include "CollapsedForwarding.h"
#include "comm/Connection.h"
#include "comm/Read.h"
+#if HAVE_DISKIO_MODULE_IPCIO
+#include "DiskIO/IpcIo/IpcIoFile.h"
+#endif
#include "ETag.h"
#include "event.h"
#include "fde.h"
Root().stat(*output);
}
+/// reports the current state of Store-related queues
+static void
+StatQueues(StoreEntry *e)
+{
+ assert(e);
+ PackableStream stream(*e);
+ CollapsedForwarding::StatQueue(stream);
+ #if HAVE_DISKIO_MODULE_IPCIO
+ stream << "\n";
+ IpcIoFile::StatQueue(stream);
+ #endif
+ stream.flush();
+}
+
// XXX: new/delete operators need to be replaced with MEMPROXY_CLASS
// definitions but doing so exposes bug 4370, and maybe 4354 and 4355
void *
Mgr::RegisterAction("store_io", "Store IO Interface Stats", &Mgr::StoreIoAction::Create, 0, 1);
Mgr::RegisterAction("store_check_cachable_stats", "storeCheckCachable() Stats",
storeCheckCachableStats, 0, 1);
+ Mgr::RegisterAction("store_queues", "SMP Transients and Caching Queues", StatQueues, 0, 1);
}
void
void CollapsedForwarding::Broadcast(StoreEntry const&, const bool) STUB
void CollapsedForwarding::Broadcast(const sfileno, const bool) STUB
+void CollapsedForwarding::StatQueue(std::ostream &) STUB
--- /dev/null
+/*
+ * Copyright (C) 1996-2020 The Squid Software Foundation and contributors
+ *
+ * Squid software is distributed under GPLv2+ license and includes
+ * contributions from numerous individuals and organizations.
+ * Please see the COPYING and CONTRIBUTORS files for details.
+ */
+
+#include "squid.h"
+
+#if HAVE_DISKIO_MODULE_IPCIO
+#include "DiskIO/IpcIo/IpcIoFile.h"
+
+#define STUB_API "DiskIO/IocIo/IpcIoFile.cc"
+
+#include "tests/STUB.h"
+
+void IpcIoFile::StatQueue(std::ostream &) STUB
+
+#endif /* HAVE_DISKIO_MODULE_IPCIO */
+
void TimeEngine::tick() STUB
TimeEngine::~TimeEngine() {STUB_NOP}
+std::ostream &operator <<(std::ostream &os, const timeval &) STUB_RETVAL(os)
+
#include "squid.h"
#include "SquidTime.h"
+#include <iomanip>
+#include <ostream>
+
struct timeval current_time;
double current_dtime;
time_t squid_curtime = 0;
getCurrentTime();
}
+std::ostream &
+operator <<(std::ostream &os, const timeval &t)
+{
+ os << t.tv_sec << ".";
+ const auto savedFill = os.fill('0');
+ os << std::setw(6) << t.tv_usec;
+ os.fill(savedFill);
+ return os;
+}
+
const char *
Time::FormatStrf(time_t t)
{