From d2a6dcba707c15484c255e7a569b90f7f1186383 Mon Sep 17 00:00:00 2001 From: Eduard Bagdasaryan Date: Mon, 18 Jun 2018 16:51:34 +0000 Subject: [PATCH] Identify collapsed transactions (#213) Added a new CF tag to the Squid request status %Ss access log field. This tag marks transactions that have waited for a CF initiator transaction. This wait may happen in two cases (or their combination): 1. Classic collapsing: A client request gets collapsed on arrival (e.g., TCP_CF_HIT or TCP_CF_MISS). 2. Collapsed revalidation: An internal revalidation request is collapsed (e.g., TCP_CF_REFRESH_MODIFIED). A CF tag approach is simple but the resulting access.log records cannot distinguish some cases. For example, a pure collapsed revalidation transaction (case 2) cannot be distinguished from these transactions: * a collapsed client that got collapsed on revalidation (case 1+2); * a collapsed client that initiated revalidation. We may want to log more collapsing details in the future. These changes do not affect CF initiating code. In order to track collapsed transactions, a new CollapsingHistory class was introduced. Since more and more non-logging code relies on ALE, this history is kept in ALE. ClientHttpRequest uses its logType field instead of the LogTags in ALE, so we also use logType for storing ClientHttpRequest's CollapsedHistory. Eventually, ClientHttpRequest should eliminate logType in favor of direct ALE use. Also: ICP code fixing/refactoring: * htcpSyncAle() and icpSyncAle() should not require the caller to supply correct LogTags because callers like fillChecklist() do not have access to that information (it is not stored in the transaction object unlike the other pieces of info that these functions copy to ALE). * Added icpUdpData::ale to preserve master transaction info when messages are queued. Several icpUdpData improvements were triggered by this change because ale is a (second!) non-POD member and icpUdpData was mistreated as a POD. They include: - Removed icpUdpData::start as unused. - Removed icpUdpData::len as set but otherwise unused. - Removed icpUdpData::logcode as essentially duplicating msg->opcode. * Update ICP ALE, if any, as soon as the transaction tags become known (instead of sometimes waiting for the ICP message to be logged). The ICP message may be dropped and/or never be logged, but we should keep ALE up to date because it is used in an increasingly many contexts. Also found and marked an ICP memory leak. It is best to fix that in a dedicated commit. Also supplied URN code with ALE. Full-featured Client-based classes already use ALE. We have not tested with URNs, but these changes may improve logging of transactions that involve URN resolution. Also fixed problematic StoreEntry::collapsingInitiator(). It could return true if the entry had transients but had nothing to do with collapsing. It also incorrectly assumed that a collapsed entry is always marked with ENTRY_FWD_HDR_WAIT. That assumption is wrong because Controller::allowCollapsing() does not set this flag for the entry. We did not find a better way to track StoreEntry objects associated with CF initiators than to add a new StoreEntry flag. Hitting an entry flagged with ENTRY_REQUIRES_COLLAPSING requires collapsing the request. --- src/AccessLogEntry.h | 2 +- src/CollapsingHistory.h | 28 +++++++++ src/FwdState.cc | 2 +- src/ICP.h | 34 ++--------- src/LogTags.cc | 23 +++++++- src/LogTags.h | 16 +++-- src/Makefile.am | 4 ++ src/Store.h | 8 ++- src/StoreClient.h | 11 +++- src/Transients.cc | 23 +++++++- src/Transients.h | 18 ++++-- src/acl/Asn.cc | 1 + src/adaptation/icap/History.cc | 1 - src/client_side_reply.cc | 94 ++++++++++++++++------------- src/client_side_reply.h | 3 + src/client_side_request.cc | 6 +- src/enums.h | 5 +- src/htcp.cc | 48 ++++++++++----- src/icp_v2.cc | 105 +++++++++++++++++++++++---------- src/icp_v3.cc | 2 +- src/ipc/StoreMap.cc | 5 ++ src/mime.cc | 3 +- src/neighbors.cc | 17 +++--- src/peer_digest.cc | 1 + src/refresh.cc | 1 + src/store.cc | 21 ++++--- src/store/Controller.cc | 33 +++++++++-- src/store/Controller.h | 3 + src/store_client.cc | 23 ++++++-- src/tests/stub_icp.cc | 5 +- src/tests/stub_store.cc | 1 + src/tunnel.cc | 4 +- src/urn.cc | 28 +++++---- src/urn.h | 6 +- 34 files changed, 393 insertions(+), 192 deletions(-) create mode 100644 src/CollapsingHistory.h diff --git a/src/AccessLogEntry.h b/src/AccessLogEntry.h index 53850e8868..3e436c0364 100644 --- a/src/AccessLogEntry.h +++ b/src/AccessLogEntry.h @@ -137,7 +137,7 @@ public: Ip::Address caddr; int64_t highOffset = 0; int64_t objectSize = 0; - LogTags code = LOG_TAG_NONE; + LogTags code; struct timeval start_time; ///< The time the master transaction started struct timeval trTime; ///< The response time const char *rfc931 = nullptr; diff --git a/src/CollapsingHistory.h b/src/CollapsingHistory.h new file mode 100644 index 0000000000..4fc42a0c98 --- /dev/null +++ b/src/CollapsingHistory.h @@ -0,0 +1,28 @@ +/* + * Copyright (C) 1996-2018 The Squid Software Foundation and contributors + * + * Squid software is distributed under GPLv2+ license and includes + * contributions from numerous individuals and organizations. + * Please see the COPYING and CONTRIBUTORS files for details. + */ + +#ifndef SQUID_COLLAPSING_HISTORY_H +#define SQUID_COLLAPSING_HISTORY_H + +/// collapsed forwarding history of a master transaction +class CollapsingHistory +{ +public: + /// whether at least one request was collapsed + bool collapsed() const { return revalidationCollapses || otherCollapses; } + + /* These stats count collapsing decisions, regardless of their outcome. */ + + /// the total number of collapsed internal revalidation requests + int revalidationCollapses = 0; + /// the total number of all other (a.k.a. "classic") collapsed requests + int otherCollapses = 0; +}; + +#endif /* SQUID_COLLAPSING_HISTORY_H */ + diff --git a/src/FwdState.cc b/src/FwdState.cc index f692a114a7..5a785e3663 100644 --- a/src/FwdState.cc +++ b/src/FwdState.cc @@ -378,7 +378,7 @@ FwdState::Start(const Comm::ConnectionPointer &clientConn, StoreEntry *entry, Ht return; case AnyP::PROTO_URN: - urnStart(request, entry); + urnStart(request, entry, al); return; default: diff --git a/src/ICP.h b/src/ICP.h index e51ea13404..2845134308 100644 --- a/src/ICP.h +++ b/src/ICP.h @@ -68,9 +68,6 @@ public: ICPState(icp_common_t &aHeader, HttpRequest *aRequest); virtual ~ICPState(); - /// whether the found entry warrants an ICP_HIT response - bool foundHit(const StoreEntry &) const; - icp_common_t header; HttpRequest *request; int fd; @@ -80,27 +77,13 @@ public: protected: /* StoreClient API */ + virtual LogTags *loggingTags() override; virtual void fillChecklist(ACLFilledChecklist &) const override; - mutable AccessLogEntryPointer al; -}; - -/// \ingroup ServerProtocolICPAPI -struct icpUdpData { - - /// IP address for the remote end. Because we reply to packets from unknown non-peers. - Ip::Address address; - - void *msg; - size_t len; - icpUdpData *next; -#ifndef LESS_TIMING - - struct timeval start; -#endif - LogTags logcode; + /// either confirms and starts processing a cache hit or returns false + bool confirmAndPrepHit(const StoreEntry &); - struct timeval queue_time; + mutable AccessLogEntryPointer al; }; extern Comm::ConnectionPointer icpIncomingConn; @@ -119,21 +102,12 @@ void icpCreateAndSend(icp_opcode, int flags, char const *url, int reqnum, int pa /// \ingroup ServerProtocolICPAPI icp_opcode icpGetCommonOpcode(); -/// \ingroup ServerProtocolICPAPI -int icpUdpSend(int, const Ip::Address &, icp_common_t *, const LogTags &, int, AccessLogEntryPointer); - -/// \ingroup ServerProtocolICPAPI -LogTags icpLogFromICPCode(icp_opcode opcode); - /// \ingroup ServerProtocolICPAPI void icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd); /// \ingroup ServerProtocolICPAPI PF icpHandleUdp; -/// \ingroup ServerProtocolICPAPI -PF icpUdpSendQueue; - /// \ingroup ServerProtocolICPAPI void icpHandleIcpV3(int, Ip::Address &, char *, int); diff --git a/src/LogTags.cc b/src/LogTags.cc index 0912eef548..d8878292e3 100644 --- a/src/LogTags.cc +++ b/src/LogTags.cc @@ -7,6 +7,7 @@ */ #include "squid.h" +#include "Debug.h" #include "LogTags.h" // old deprecated tag strings @@ -39,6 +40,14 @@ const char * LogTags::Str_[] = { "TYPE_MAX" }; +void +LogTags::update(const LogTags_ot t) +{ + assert(t < LOG_TYPE_MAX); + debugs(83, 7, Str_[oldType] << " to " << Str_[t]); + oldType = t; +} + /* * This method is documented in http://wiki.squid-cache.org/SquidFaq/SquidLogs#Squid_result_codes * Please keep the wiki up to date @@ -51,11 +60,21 @@ LogTags::c_str() const int pos = 0; // source tags - if (oldType && oldType < LOG_TYPE_MAX) - pos += snprintf(buf, sizeof(buf), "%s",Str_[oldType]); + const int protoLen = 3; + if (oldType && oldType < LOG_TYPE_MAX) { + assert(Str_[oldType][protoLen] == '_'); + snprintf(buf, protoLen + 1, "%s", Str_[oldType]); + pos += protoLen; + } else pos += snprintf(buf, sizeof(buf), "NONE"); + if (collapsingHistory.collapsed()) + pos += snprintf(buf + pos, sizeof(buf) - pos, "_CF"); + + const char *tag = Str_[oldType] + protoLen; + pos += snprintf(buf + pos, sizeof(buf) - pos, "%s", tag); + if (err.ignored) pos += snprintf(buf+pos,sizeof(buf)-pos, "_IGNORED"); diff --git a/src/LogTags.h b/src/LogTags.h index 4766637607..29ff99635a 100644 --- a/src/LogTags.h +++ b/src/LogTags.h @@ -9,6 +9,8 @@ #ifndef SQUID_SRC_LOGTAGS_H #define SQUID_SRC_LOGTAGS_H +#include "CollapsingHistory.h" + /** Squid transaction result code/tag set. * * These codes indicate how the request was received @@ -49,10 +51,10 @@ typedef enum { class LogTags { public: - LogTags(LogTags_ot t) : oldType(t) {assert(oldType < LOG_TYPE_MAX);} - // XXX: this operator does not reset flags - // TODO: either replace with a category-only setter or remove - LogTags &operator =(const LogTags_ot &t) {assert(t < LOG_TYPE_MAX); oldType = t; return *this;} + LogTags() = default; + explicit LogTags(const LogTags_ot t) { update(t); } + + void update(const LogTags_ot t); /// compute the status access.log field const char *c_str() const; @@ -78,8 +80,10 @@ private: public: // XXX: only until client_db.cc stats are redesigned. - // deprecated LogTag enum value - LogTags_ot oldType; + /// a set of client protocol, cache use, and other transaction outcome tags + LogTags_ot oldType = LOG_TAG_NONE; + /// controls CF tag presence + CollapsingHistory collapsingHistory; }; /// iterator for LogTags_ot enumeration diff --git a/src/Makefile.am b/src/Makefile.am index b3007da9b0..9511f470bc 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -256,6 +256,7 @@ squid_SOURCES = \ clientStreamForward.h \ CollapsedForwarding.cc \ CollapsedForwarding.h \ + CollapsingHistory.h \ CommandLine.cc \ CommandLine.h \ CompletionDispatcher.cc \ @@ -1467,6 +1468,7 @@ tests_testDiskIO_SOURCES = \ HttpReply.cc \ int.h \ int.cc \ + LogTags.cc \ MasterXaction.cc \ MasterXaction.h \ MemBuf.cc \ @@ -2935,6 +2937,7 @@ tests_testUfs_SOURCES = \ HttpReply.cc \ int.h \ int.cc \ + LogTags.cc \ RequestFlags.h \ RequestFlags.cc \ Transients.cc \ @@ -3119,6 +3122,7 @@ tests_testRock_SOURCES = \ HttpReply.cc \ int.h \ int.cc \ + LogTags.cc \ MasterXaction.cc \ MasterXaction.h \ MemBuf.cc \ diff --git a/src/Store.h b/src/Store.h index b65ce68911..bf20ae7f50 100644 --- a/src/Store.h +++ b/src/Store.h @@ -188,9 +188,11 @@ public: /// whether there is a corresponding locked shared memory table entry bool hasMemStore() const { return mem_obj && mem_obj->memCache.index >= 0; } - /// whether this is a collapsed forwarding-created public entry that still - /// has not received its response headers; new requests may collapse on it - bool collapsingInitiator() const; + /// whether this entry can feed collapsed requests and only them + bool hittingRequiresCollapsing() const { return EBIT_TEST(flags, ENTRY_REQUIRES_COLLAPSING); } + + /// allow or forbid collapsed requests feeding + void setCollapsingRequirement(const bool required); MemObject *mem_obj; RemovalPolicyNode repl; diff --git a/src/StoreClient.h b/src/StoreClient.h index 8c9f093837..2d4f92953c 100644 --- a/src/StoreClient.h +++ b/src/StoreClient.h @@ -17,6 +17,7 @@ typedef void STCB(void *, StoreIOBuffer); /* store callback */ class StoreEntry; class ACLFilledChecklist; +class LogTags; /// A StoreEntry::getPublic*() caller. class StoreClient @@ -31,17 +32,23 @@ public: /// An isNull() entry indicates a cache miss. virtual void created(StoreEntry *) = 0; + /// \return LogTags (if the class logs transactions) or nil (otherwise) + virtual LogTags *loggingTags() = 0; + protected: /// configure the ACL checklist with the current transaction state virtual void fillChecklist(ACLFilledChecklist &) const = 0; + /// \returns whether the caller must collapse on the given entry + /// Before returning true, updates common collapsing-related stats. + /// See also: StoreEntry::hittingRequiresCollapsing(). + bool startCollapsingOn(const StoreEntry &, const bool doingRevalidation); + // These methods only interpret Squid configuration. Their allowances are // provisional -- other factors may prevent collapsed forwarding. The first // two exist primarily to distinguish two major CF cases in callers code. /// whether Squid configuration allows us to become a CF initiator bool mayInitiateCollapsing() const { return onCollapsingPath(); } - /// whether Squid configuration allows collapsing on the initiatorEntry - bool mayCollapseOn(const StoreEntry &initiatorEntry) const; /// whether Squid configuration allows collapsing for this transaction bool onCollapsingPath() const; }; diff --git a/src/Transients.cc b/src/Transients.cc index 0fe09ed9bb..f79710ad79 100644 --- a/src/Transients.cc +++ b/src/Transients.cc @@ -166,6 +166,8 @@ Transients::get(const cache_key *key) e->mem_obj->xitTable.index = index; e->mem_obj->xitTable.io = Store::ioReading; anchor->exportInto(*e); + const bool collapsingRequired = EBIT_TEST(anchor->basics.flags, ENTRY_REQUIRES_COLLAPSING); + e->setCollapsingRequirement(collapsingRequired); // keep read lock to receive updates from others return e; } @@ -186,6 +188,20 @@ Transients::findCollapsed(const sfileno index) return NULL; } +void +Transients::clearCollapsingRequirement(const StoreEntry &e) +{ + assert(map); + assert(e.hasTransients()); + assert(isWriter(e)); + const auto idx = e.mem_obj->xitTable.index; + auto &anchor = map->writeableEntry(idx); + if (EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING)) { + EBIT_CLR(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING); + CollapsedForwarding::Broadcast(e); + } +} + void Transients::monitorIo(StoreEntry *e, const cache_key *key, const Store::IoStatus direction) { @@ -242,15 +258,16 @@ Transients::noteFreeMapSlice(const Ipc::StoreMapSliceId) } void -Transients::status(const StoreEntry &entry, bool &aborted, bool &waitingToBeFreed) const +Transients::status(const StoreEntry &entry, Transients::EntryStatus &entryStatus) const { assert(map); assert(entry.hasTransients()); const auto idx = entry.mem_obj->xitTable.index; const auto &anchor = isWriter(entry) ? map->writeableEntry(idx) : map->readableEntry(idx); - aborted = anchor.writerHalted; - waitingToBeFreed = anchor.waitingToBeFreed; + entryStatus.abortedByWriter = anchor.writerHalted; + entryStatus.waitingToBeFreed = anchor.waitingToBeFreed; + entryStatus.collapsed = EBIT_TEST(anchor.basics.flags, ENTRY_REQUIRES_COLLAPSING); } void diff --git a/src/Transients.h b/src/Transients.h index 8479f20074..7cc1811579 100644 --- a/src/Transients.h +++ b/src/Transients.h @@ -26,12 +26,24 @@ typedef Ipc::StoreMap TransientsMap; class Transients: public Store::Controlled, public Ipc::StoreMapCleaner { public: + /// shared entry metadata, used for synchronization + class EntryStatus + { + public: + bool abortedByWriter = false; ///< whether the entry was aborted + bool waitingToBeFreed = false; ///< whether the entry was marked for deletion + bool collapsed = false; ///< whether the entry allows collapsing + }; + Transients(); virtual ~Transients(); /// return a local, previously collapsed entry StoreEntry *findCollapsed(const sfileno xitIndex); + /// removes collapsing requirement (for future hits) + void clearCollapsingRequirement(const StoreEntry &e); + /// start listening for remote DELETE requests targeting either a complete /// StoreEntry (ioReading) or a being-formed miss StoreEntry (ioWriting) void monitorIo(StoreEntry*, const cache_key*, const Store::IoStatus); @@ -39,10 +51,8 @@ public: /// called when the in-transit entry has been successfully cached void completeWriting(const StoreEntry &e); - /// copies current shared entry metadata into parameters - /// \param aborted whether the entry was aborted - /// \param waitingToBeFreed whether the entry was marked for deletion - void status(const StoreEntry &e, bool &aborted, bool &waitingToBeFreed) const; + /// copies current shared entry metadata into entryStatus + void status(const StoreEntry &e, EntryStatus &entryStatus) const; /// number of entry readers some time ago int readers(const StoreEntry &e) const; diff --git a/src/acl/Asn.cc b/src/acl/Asn.cc index a40941c223..fdeca2834f 100644 --- a/src/acl/Asn.cc +++ b/src/acl/Asn.cc @@ -246,6 +246,7 @@ asnCacheStart(int as) asState->request = HttpRequest::FromUrl(asres, mx); assert(asState->request != NULL); + // XXX: Missing a hittingRequiresCollapsing() && startCollapsingOn() check. if ((e = storeGetPublic(asres, Http::METHOD_GET)) == NULL) { e = storeCreateEntry(asres, asres, RequestFlags(), Http::METHOD_GET); asState->sc = storeClientListAdd(e, asState); diff --git a/src/adaptation/icap/History.cc b/src/adaptation/icap/History.cc index 9f6262d059..91c0ca9eca 100644 --- a/src/adaptation/icap/History.cc +++ b/src/adaptation/icap/History.cc @@ -13,7 +13,6 @@ #include "SquidTime.h" Adaptation::Icap::History::History(): - logType(LOG_TAG_NONE), req_sz(0), concurrencyLevel(0) { diff --git a/src/client_side_reply.cc b/src/client_side_reply.cc index 2a0c71859e..821dfaad39 100644 --- a/src/client_side_reply.cc +++ b/src/client_side_reply.cc @@ -285,7 +285,7 @@ clientReplyContext::processExpired() return; } - http->logType = LOG_TCP_REFRESH; + http->logType.update(LOG_TCP_REFRESH); http->request->flags.refresh = true; #if STORE_CLIENT_LIST_DEBUG /* Prevent a race with the store client memory free routines @@ -295,6 +295,7 @@ clientReplyContext::processExpired() /* Prepare to make a new temporary request */ saveState(); + // TODO: Consider also allowing regular (non-collapsed) revalidation hits. // TODO: support collapsed revalidation for Vary-controlled entries bool collapsingAllowed = Config.onoff.collapsed_forwarding && !Store::Root().smpAware() && @@ -303,7 +304,7 @@ clientReplyContext::processExpired() StoreEntry *entry = nullptr; if (collapsingAllowed) { if (const auto e = storeGetPublicByRequest(http->request, ksRevalidation)) { - if (e->collapsingInitiator() && mayCollapseOn(*e)) { + if (e->hittingRequiresCollapsing() && startCollapsingOn(*e, true)) { entry = e; entry->lock("clientReplyContext::processExpired#alreadyRevalidating"); } else { @@ -433,7 +434,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) debugs(88, 3, "CF slave hit private non-shareable " << *http->storeEntry() << ". MISS"); // restore context to meet processMiss() expectations restoreState(); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); return; } @@ -446,7 +447,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) // request to origin was aborted if (EBIT_TEST(http->storeEntry()->flags, ENTRY_ABORTED)) { debugs(88, 3, "request to origin aborted '" << http->storeEntry()->url() << "', sending old entry to client"); - http->logType = LOG_TCP_REFRESH_FAIL_OLD; + http->logType.update(LOG_TCP_REFRESH_FAIL_OLD); sendClientOldEntry(); } @@ -454,7 +455,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) // origin replied 304 if (status == Http::scNotModified) { - http->logType = LOG_TCP_REFRESH_UNMODIFIED; + http->logType.update(LOG_TCP_REFRESH_UNMODIFIED); http->request->flags.staleIfHit = false; // old_entry is no longer stale // update headers on existing entry @@ -486,7 +487,7 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) old_rep->sline.status() << ") to client"); sendClientOldEntry(); } else { - http->logType = LOG_TCP_REFRESH_MODIFIED; + http->logType.update(LOG_TCP_REFRESH_MODIFIED); debugs(88, 3, "origin replied " << status << ", replacing existing entry and forwarding to client"); @@ -499,13 +500,13 @@ clientReplyContext::handleIMSReply(StoreIOBuffer result) // origin replied with an error else if (http->request->flags.failOnValidationError) { - http->logType = LOG_TCP_REFRESH_FAIL_ERR; + http->logType.update(LOG_TCP_REFRESH_FAIL_ERR); debugs(88, 3, "origin replied with error " << status << ", forwarding to client due to fail_on_validation_err"); sendClientUpstreamResponse(); } else { // ignore and let client have old entry - http->logType = LOG_TCP_REFRESH_FAIL_OLD; + http->logType.update(LOG_TCP_REFRESH_FAIL_OLD); debugs(88, 3, "origin replied with error " << status << ", sending old entry (" << old_rep->sline.status() << ") to client"); sendClientOldEntry(); @@ -553,7 +554,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) } else if (result.flags.error) { /* swap in failure */ debugs(88, 3, "clientCacheHit: swapin failure for " << http->uri); - http->logType = LOG_TCP_SWAPFAIL_MISS; + http->logType.update(LOG_TCP_SWAPFAIL_MISS); removeClientStoreReference(&sc, http); processMiss(); return; @@ -564,7 +565,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) // happen to regular hits because we are called asynchronously. if (!e->mayStartHitting()) { debugs(88, 3, "unsharable " << *e << ". MISS"); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); return; } @@ -575,7 +576,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) * object */ /* treat as a miss */ - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); return; } @@ -591,7 +592,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) if (http->request->storeId().cmp(e->mem_obj->storeId()) != 0) { debugs(33, DBG_IMPORTANT, "clientProcessHit: URL mismatch, '" << e->mem_obj->storeId() << "' != '" << http->request->storeId() << "'"); - http->logType = LOG_TCP_MISS; // we lack a more precise LOG_*_MISS code + http->logType.update(LOG_TCP_MISS); // we lack a more precise LOG_*_MISS code processMiss(); return; } @@ -623,7 +624,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) case VARY_CANCEL: /* varyEvaluateMatch found a object loop. Process as miss */ debugs(88, DBG_IMPORTANT, "clientProcessHit: Vary object loop!"); - http->logType = LOG_TCP_MISS; // we lack a more precise LOG_*_MISS code + http->logType.update(LOG_TCP_MISS); // we lack a more precise LOG_*_MISS code processMiss(); return; } @@ -638,12 +639,12 @@ clientReplyContext::cacheHit(StoreIOBuffer result) if (e->checkNegativeHit() && !r->flags.noCacheHack()) { debugs(88, 5, "negative-HIT"); - http->logType = LOG_TCP_NEGATIVE_HIT; + http->logType.update(LOG_TCP_NEGATIVE_HIT); sendMoreData(result); return; } else if (blockedHit()) { debugs(88, 5, "send_hit forces a MISS"); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); return; } else if (!http->flags.internal && refreshCheckHTTP(e, r)) { @@ -667,7 +668,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) * modification time. * XXX: BUG 1890 objects without Date do not get one added. */ - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); } else if (r->flags.noCache) { debugs(88, 3, "validate HIT object? NO. Client sent CC:no-cache. Do CLIENT_REFRESH_MISS"); @@ -675,7 +676,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) * This did not match a refresh pattern that overrides no-cache * we should honour the client no-cache header. */ - http->logType = LOG_TCP_CLIENT_REFRESH_MISS; + http->logType.update(LOG_TCP_CLIENT_REFRESH_MISS); processMiss(); } else if (r->url.getScheme() == AnyP::PROTO_HTTP || r->url.getScheme() == AnyP::PROTO_HTTPS) { debugs(88, 3, "validate HIT object? YES."); @@ -690,7 +691,7 @@ clientReplyContext::cacheHit(StoreIOBuffer result) * We don't know how to re-validate other protocols. Handle * them as if the object has expired. */ - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); } return; @@ -707,13 +708,13 @@ clientReplyContext::cacheHit(StoreIOBuffer result) #if USE_DELAY_POOLS if (e->store_status != STORE_OK) - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); else #endif if (e->mem_status == IN_MEMORY) - http->logType = LOG_TCP_MEM_HIT; + http->logType.update(LOG_TCP_MEM_HIT); else if (Config.onoff.offline) - http->logType = LOG_TCP_OFFLINE_HIT; + http->logType.update(LOG_TCP_OFFLINE_HIT); sendMoreData(result); } @@ -778,7 +779,7 @@ clientReplyContext::processMiss() if (http->redirect.status) { HttpReply *rep = new HttpReply; - http->logType = LOG_TCP_REDIRECT; + http->logType.update(LOG_TCP_REDIRECT); http->storeEntry()->releaseRequest(); rep->redirect(http->redirect.status, http->redirect.location); http->storeEntry()->replaceHttpReply(rep); @@ -821,7 +822,7 @@ clientReplyContext::processConditional(StoreIOBuffer &result) if (e->getReply()->sline.status() != Http::scOkay) { debugs(88, 4, "Reply code " << e->getReply()->sline.status() << " != 200"); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); processMiss(); return true; } @@ -946,6 +947,15 @@ clientReplyContext::created(StoreEntry *newEntry) identifyFoundObject(newEntry); } +LogTags * +clientReplyContext::loggingTags() +{ + // XXX: clientReplyContext code assumes that http cbdata is always valid. + // TODO: Either add cbdataReferenceValid(http) checks in all the relevant + // places, like this one, or remove cbdata protection of the http member. + return &http->logType; +} + void clientReplyContext::purgeFoundGet(StoreEntry *newEntry) { @@ -971,7 +981,7 @@ clientReplyContext::purgeFoundObject(StoreEntry *entry) assert (entry && !entry->isNull()); if (EBIT_TEST(entry->flags, ENTRY_SPECIAL)) { - http->logType = LOG_TCP_DENIED; + http->logType.update(LOG_TCP_DENIED); Ip::Address tmp_noaddr; tmp_noaddr.setNoAddr(); // TODO: make a global const ErrorState *err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, @@ -991,7 +1001,7 @@ clientReplyContext::purgeFoundObject(StoreEntry *entry) sc = storeClientListAdd(http->storeEntry(), this); - http->logType = LOG_TCP_HIT; + http->logType.update(LOG_TCP_HIT); reqofs = 0; @@ -1012,7 +1022,7 @@ clientReplyContext::purgeRequest() Config2.onoff.enable_purge); if (!Config2.onoff.enable_purge) { - http->logType = LOG_TCP_DENIED; + http->logType.update(LOG_TCP_DENIED); Ip::Address tmp_noaddr; tmp_noaddr.setNoAddr(); ErrorState *err = clientBuildError(ERR_ACCESS_DENIED, Http::scForbidden, NULL, @@ -1033,7 +1043,7 @@ clientReplyContext::purgeRequest() void clientReplyContext::purgeDoMissPurge() { - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); lookingforstore = 3; StoreEntry::getPublicByRequestMethod(this,http->request, Http::METHOD_GET); } @@ -1718,7 +1728,7 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) if (NULL == http->storeEntry()) { /** \li If no StoreEntry object is current assume this object isn't in the cache set MISS*/ debugs(85, 3, "StoreEntry is NULL - MISS"); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); doGetMoreData(); return; } @@ -1726,7 +1736,7 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) if (Config.onoff.offline) { /** \li If we are running in offline mode set to HIT */ debugs(85, 3, "offline HIT " << *e); - http->logType = LOG_TCP_HIT; + http->logType.update(LOG_TCP_HIT); doGetMoreData(); return; } @@ -1735,7 +1745,7 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) /** \li If redirection status is True force this to be a MISS */ debugs(85, 3, "REDIRECT status forced StoreEntry to NULL (no body on 3XX responses) " << *e); forgetHit(); - http->logType = LOG_TCP_REDIRECT; + http->logType.update(LOG_TCP_REDIRECT); doGetMoreData(); return; } @@ -1743,7 +1753,7 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) if (!e->validToSend()) { debugs(85, 3, "!storeEntryValidToSend MISS " << *e); forgetHit(); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); doGetMoreData(); return; } @@ -1751,7 +1761,7 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) if (EBIT_TEST(e->flags, ENTRY_SPECIAL)) { /* \li Special entries are always hits, no matter what the client says */ debugs(85, 3, "ENTRY_SPECIAL HIT " << *e); - http->logType = LOG_TCP_HIT; + http->logType.update(LOG_TCP_HIT); doGetMoreData(); return; } @@ -1759,21 +1769,21 @@ clientReplyContext::identifyFoundObject(StoreEntry *newEntry) if (r->flags.noCache) { debugs(85, 3, "no-cache REFRESH MISS " << *e); forgetHit(); - http->logType = LOG_TCP_CLIENT_REFRESH_MISS; + http->logType.update(LOG_TCP_CLIENT_REFRESH_MISS); doGetMoreData(); return; } - if (e->collapsingInitiator() && !mayCollapseOn(*e)) { + if (e->hittingRequiresCollapsing() && !startCollapsingOn(*e, false)) { debugs(85, 3, "prohibited CF MISS " << *e); forgetHit(); - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); doGetMoreData(); return; } debugs(85, 3, "default HIT " << *e); - http->logType = LOG_TCP_HIT; + http->logType.update(LOG_TCP_HIT); doGetMoreData(); } @@ -1829,7 +1839,7 @@ clientGetMoreData(clientStreamNode * aNode, ClientHttpRequest * http) } /* continue forwarding, not finished yet. */ - http->logType = LOG_TCP_MISS; + http->logType.update(LOG_TCP_MISS); context->doGetMoreData(); } else @@ -1965,7 +1975,7 @@ clientReplyContext::sendBodyTooLargeError() { Ip::Address tmp_noaddr; tmp_noaddr.setNoAddr(); // TODO: make a global const - http->logType = LOG_TCP_DENIED_REPLY; + http->logType.update(LOG_TCP_DENIED_REPLY); ErrorState *err = clientBuildError(ERR_TOO_BIG, Http::scForbidden, NULL, http->getConn() != NULL ? http->getConn()->clientConnection->remote : tmp_noaddr, http->request); @@ -1979,7 +1989,7 @@ clientReplyContext::sendBodyTooLargeError() void clientReplyContext::sendPreconditionFailedError() { - http->logType = LOG_TCP_HIT; + http->logType.update(LOG_TCP_HIT); Ip::Address tmp_noaddr; tmp_noaddr.setNoAddr(); ErrorState *const err = @@ -2000,9 +2010,9 @@ clientReplyContext::sendNotModified() // log as TCP_INM_HIT if code 304 generated for // If-None-Match request if (!http->request->flags.ims) - http->logType = LOG_TCP_INM_HIT; + http->logType.update(LOG_TCP_INM_HIT); else - http->logType = LOG_TCP_IMS_HIT; + http->logType.update(LOG_TCP_IMS_HIT); removeClientStoreReference(&sc, http); createStoreEntry(http->request->method, RequestFlags()); e = http->storeEntry(); @@ -2088,7 +2098,7 @@ clientReplyContext::processReplyAccessResult(const allow_t &accessAllowed) err_type page_id; page_id = aclGetDenyInfoPage(&Config.denyInfoList, AclMatchedName, 1); - http->logType = LOG_TCP_DENIED_REPLY; + http->logType.update(LOG_TCP_DENIED_REPLY); if (page_id == ERR_NONE) page_id = ERR_ACCESS_DENIED; diff --git a/src/client_side_reply.h b/src/client_side_reply.h index 4fd7aa2849..f596384284 100644 --- a/src/client_side_reply.h +++ b/src/client_side_reply.h @@ -74,7 +74,10 @@ public: /* state variable - replace with class to handle storeentries at some point */ int lookingforstore; + + /* StoreClient API */ virtual void created (StoreEntry *newEntry); + virtual LogTags *loggingTags(); ClientHttpRequest *http; int headers_sz; diff --git a/src/client_side_request.cc b/src/client_side_request.cc index 36bc046243..74c6f41d7c 100644 --- a/src/client_side_request.cc +++ b/src/client_side_request.cc @@ -151,7 +151,6 @@ ClientHttpRequest::ClientHttpRequest(ConnStateData * aConn) : uri(NULL), log_uri(NULL), req_sz(0), - logType(LOG_TAG_NONE), calloutContext(NULL), maxReplyBodySize_(0), entry_(NULL), @@ -777,7 +776,7 @@ ClientRequestContext::clientAccessCheckDone(const allow_t &answer) */ page_id = aclGetDenyInfoPage(&Config.denyInfoList, AclMatchedName, answer != ACCESS_AUTH_REQUIRED); - http->logType = LOG_TCP_DENIED; + http->logType.update(LOG_TCP_DENIED); if (auth_challenge) { #if USE_AUTH @@ -1533,7 +1532,8 @@ void ClientHttpRequest::httpStart() { PROF_start(httpStart); - logType = LOG_TAG_NONE; + // XXX: Re-initializes rather than updates. Should not be needed at all. + logType.update(LOG_TAG_NONE); debugs(85, 4, logType.c_str() << " for '" << uri << "'"); /* no one should have touched this */ diff --git a/src/enums.h b/src/enums.h index 646e6b403b..584eb22041 100644 --- a/src/enums.h +++ b/src/enums.h @@ -104,7 +104,10 @@ enum { ENTRY_NEGCACHED, ENTRY_VALIDATED, ENTRY_BAD_LENGTH, - ENTRY_ABORTED + ENTRY_ABORTED, + /// Whether the entry serves collapsed hits now. + /// Meaningful only for public entries. + ENTRY_REQUIRES_COLLAPSING }; /* diff --git a/src/htcp.cc b/src/htcp.cc index 56cbc11b93..333d82c934 100644 --- a/src/htcp.cc +++ b/src/htcp.cc @@ -135,6 +135,7 @@ public: /* StoreClient API */ void created(StoreEntry *); + virtual LogTags *loggingTags(); virtual void fillChecklist(ACLFilledChecklist &) const; public: @@ -145,12 +146,14 @@ public: size_t reqHdrsSz = 0; ///< size of the req_hdrs content HttpRequest::Pointer request; + /// optimization: nil until needed + mutable AccessLogEntryPointer al; + private: HttpRequest::Pointer checkHitRequest; Ip::Address from; htcpDataHeader *dhdr = nullptr; - mutable AccessLogEntryPointer al; }; class htcpDetail { @@ -254,7 +257,7 @@ static ssize_t htcpBuildTstOpData(char *buf, size_t buflen, htcpStuff * stuff); static void htcpHandleMsg(char *buf, int sz, Ip::Address &from); -static void htcpLogHtcp(Ip::Address &, int, LogTags, const char *, AccessLogEntryPointer); +static void htcpLogHtcp(Ip::Address &, const int, const LogTags_ot, const char *, AccessLogEntryPointer); static void htcpHandleTst(htcpDataHeader *, char *buf, int sz, Ip::Address &from); static void htcpRecv(int fd, void *data); @@ -268,13 +271,12 @@ static void htcpHandleTstRequest(htcpDataHeader *, char *buf, int sz, Ip::Addres static void htcpHandleTstResponse(htcpDataHeader *, char *, int, Ip::Address &); static void -htcpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, int opcode, LogTags logcode, const char *url) +htcpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, const int opcode, const char *url) { if (!al) al = new AccessLogEntry(); al->cache.caddr = caddr; al->htcp.opcode = htcpOpcodeStr[opcode]; - al->cache.code = logcode; al->url = url; // HTCP transactions do not wait al->cache.start_time = current_time; @@ -946,7 +948,7 @@ htcpSpecifier::created(StoreEntry *e) debugs(31, 3, "htcpCheckHit: NO; entry not valid to send" ); } else if (refreshCheckHTCP(e, checkHitRequest.getRaw())) { debugs(31, 3, "htcpCheckHit: NO; cached response is stale"); - } else if (e->collapsingInitiator() && !mayCollapseOn(*e)) { + } else if (e->hittingRequiresCollapsing() && !startCollapsingOn(*e, false)) { debugs(31, 3, "htcpCheckHit: NO; prohibited CF hit: " << *e); } else { debugs(31, 3, "htcpCheckHit: YES!?"); @@ -960,11 +962,20 @@ htcpSpecifier::created(StoreEntry *e) // e->abandon(); } +LogTags * +htcpSpecifier::loggingTags() +{ + // calling htcpSyncAle() here would not change cache.code + if (!al) + al = new AccessLogEntry(); + return &al->cache.code; +} + void htcpSpecifier::fillChecklist(ACLFilledChecklist &checklist) const { checklist.setRequest(request.getRaw()); - htcpSyncAle(al, from, dhdr->opcode, LOG_TAG_NONE, uri); + htcpSyncAle(al, from, dhdr->opcode, uri); checklist.al = al; } @@ -1121,13 +1132,13 @@ htcpHandleTstRequest(htcpDataHeader * dhdr, char *buf, int sz, Ip::Address &from if (!s->request) { debugs(31, 3, "htcpHandleTstRequest: failed to parse request"); - htcpLogHtcp(from, dhdr->opcode, LOG_UDP_INVALID, dash_str, nullptr); + htcpLogHtcp(from, dhdr->opcode, LOG_UDP_INVALID, dash_str, s->al); return; } if (!htcpAccessAllowed(Config.accessList.htcp, s, from)) { debugs(31, 3, "htcpHandleTstRequest: Access denied"); - htcpLogHtcp(from, dhdr->opcode, LOG_UDP_DENIED, s->uri, nullptr); + htcpLogHtcp(from, dhdr->opcode, LOG_UDP_DENIED, s->uri, s->al); return; } @@ -1171,17 +1182,20 @@ htcpHandleClr(htcpDataHeader * hdr, char *buf, int sz, Ip::Address &from) debugs(31, 3, "htcpHandleClr: htcpUnpackSpecifier failed"); htcpLogHtcp(from, hdr->opcode, LOG_UDP_INVALID, dash_str, nullptr); return; + } else { + s->setFrom(from); + s->setDataHeader(hdr); } if (!s->request) { debugs(31, 3, "htcpHandleTstRequest: failed to parse request"); - htcpLogHtcp(from, hdr->opcode, LOG_UDP_INVALID, dash_str, nullptr); + htcpLogHtcp(from, hdr->opcode, LOG_UDP_INVALID, dash_str, s->al); return; } if (!htcpAccessAllowed(Config.accessList.htcp_clr, s, from)) { debugs(31, 3, "htcpHandleClr: Access denied"); - htcpLogHtcp(from, hdr->opcode, LOG_UDP_DENIED, s->uri, nullptr); + htcpLogHtcp(from, hdr->opcode, LOG_UDP_DENIED, s->uri, s->al); return; } @@ -1196,12 +1210,12 @@ htcpHandleClr(htcpDataHeader * hdr, char *buf, int sz, Ip::Address &from) case 1: htcpClrReply(hdr, 1, from); /* hit */ - htcpLogHtcp(from, hdr->opcode, LOG_UDP_HIT, s->uri, nullptr); + htcpLogHtcp(from, hdr->opcode, LOG_UDP_HIT, s->uri, s->al); break; case 0: htcpClrReply(hdr, 0, from); /* miss */ - htcpLogHtcp(from, hdr->opcode, LOG_UDP_MISS, s->uri, nullptr); + htcpLogHtcp(from, hdr->opcode, LOG_UDP_MISS, s->uri, s->al); break; default: @@ -1607,14 +1621,16 @@ htcpClosePorts(void) } static void -htcpLogHtcp(Ip::Address &caddr, int opcode, LogTags logcode, const char *url, AccessLogEntryPointer al) +htcpLogHtcp(Ip::Address &caddr, const int opcode, const LogTags_ot logcode, const char *url, AccessLogEntryPointer al) { - if (LOG_TAG_NONE == logcode.oldType) - return; if (!Config.onoff.log_udp) return; - htcpSyncAle(al, caddr, opcode, logcode, url); + htcpSyncAle(al, caddr, opcode, url); + + assert(logcode != LOG_TAG_NONE); + al->cache.code.update(logcode); + accessLogLog(al, NULL); } diff --git a/src/icp_v2.cc b/src/icp_v2.cc index 2cba445e0e..756df4eeef 100644 --- a/src/icp_v2.cc +++ b/src/icp_v2.cc @@ -47,10 +47,20 @@ #include +/// a delayed icpUdpSend() call +class DelayedUdpSend { +public: + Ip::Address address; ///< remote peer (which may not be a cache_peer) + icp_common_t *msg = nullptr; ///< ICP message with network byte order fields + DelayedUdpSend *next = nullptr; ///< invasive FIFO queue of delayed ICP messages + AccessLogEntryPointer ale; ///< sender's master transaction summary + struct timeval queue_time = {0, 0}; ///< queuing timestamp +}; + static void icpIncomingConnectionOpened(const Comm::ConnectionPointer &conn, int errNo); /// \ingroup ServerProtocolICPInternal2 -static void icpLogIcp(const Ip::Address &, const LogTags &, int, const char *, int, AccessLogEntryPointer); +static void icpLogIcp(const Ip::Address &, const LogTags_ot, int, const char *, const int, AccessLogEntryPointer &); /// \ingroup ServerProtocolICPInternal2 static void icpHandleIcpV2(int, Ip::Address &, char *, int); @@ -58,14 +68,17 @@ static void icpHandleIcpV2(int, Ip::Address &, char *, int); /// \ingroup ServerProtocolICPInternal2 static void icpCount(void *, int, size_t, int); +static LogTags_ot icpLogFromICPCode(icp_opcode); + +static int icpUdpSend(int fd, const Ip::Address &to, icp_common_t * msg, int delay, AccessLogEntryPointer al); + static void -icpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, LogTags logcode, const char *url, int len, int delay) +icpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, const char *url, int len, int delay) { if (!al) al = new AccessLogEntry(); al->icp.opcode = ICP_QUERY; al->cache.caddr = caddr; - al->cache.code = logcode; al->url = url; // XXX: move to use icp.clientReply instead al->http.clientReplySz.payloadData = len; @@ -80,9 +93,9 @@ icpSyncAle(AccessLogEntryPointer &al, const Ip::Address &caddr, LogTags logcode, * IcpQueueHead is global so comm_incoming() knows whether or not * to call icpUdpSendQueue. */ -static icpUdpData *IcpQueueHead = NULL; +static DelayedUdpSend *IcpQueueHead = NULL; /// \ingroup ServerProtocolICPInternal2 -static icpUdpData *IcpQueueTail = NULL; +static DelayedUdpSend *IcpQueueTail = NULL; /// \ingroup ServerProtocolICPInternal2 Comm::ConnectionPointer icpIncomingConn = NULL; @@ -141,7 +154,7 @@ ICPState::~ICPState() } bool -ICPState::foundHit(const StoreEntry &e) const +ICPState::confirmAndPrepHit(const StoreEntry &e) { if (e.isNull()) return false; @@ -152,17 +165,26 @@ ICPState::foundHit(const StoreEntry &e) const if (!Config.onoff.icp_hit_stale && refreshCheckICP(&e, request)) return false; - if (e.collapsingInitiator() && !mayCollapseOn(e)) + if (e.hittingRequiresCollapsing() && !startCollapsingOn(e, false)) return false; return true; } +LogTags * +ICPState::loggingTags() +{ + // calling icpSyncAle(LOG_TAG_NONE) here would not change cache.code + if (!al) + al = new AccessLogEntry(); + return &al->cache.code; +} + void ICPState::fillChecklist(ACLFilledChecklist &checklist) const { checklist.setRequest(request); - icpSyncAle(al, from, LOG_TAG_NONE, url, 0, 0); + icpSyncAle(al, from, url, 0, 0); checklist.al = al; } @@ -179,7 +201,7 @@ public: ICPState(aHeader, aRequest),rtt(0),src_rtt(0),flags(0) {} ~ICP2State(); - void created(StoreEntry * newEntry); + virtual void created(StoreEntry * newEntry) override; int rtt; int src_rtt; @@ -195,7 +217,7 @@ ICP2State::created(StoreEntry *e) debugs(12, 5, "icpHandleIcpV2: OPCODE " << icp_opcode_str[header.opcode]); icp_opcode codeToSend; - if (foundHit(*e)) { + if (confirmAndPrepHit(*e)) { codeToSend = ICP_HIT; } else { #if USE_ICMP @@ -224,22 +246,32 @@ ICP2State::created(StoreEntry *e) /* End ICP2State */ -/// \ingroup ServerProtocolICPInternal2 +/// updates ALE (if any) and logs the transaction (if needed) static void -icpLogIcp(const Ip::Address &caddr, const LogTags &logcode, int len, const char *url, int delay, AccessLogEntry::Pointer al) +icpLogIcp(const Ip::Address &caddr, const LogTags_ot logcode, const int len, const char *url, int delay, AccessLogEntry::Pointer &al) { - if (LOG_TAG_NONE == logcode.oldType) - return; + assert(logcode != LOG_TAG_NONE); - if (LOG_ICP_QUERY == logcode.oldType) - return; + // Optimization: No premature (ALE creation in) icpSyncAle(). + if (al) { + icpSyncAle(al, caddr, url, len, delay); + al->cache.code.update(logcode); + } - clientdbUpdate(caddr, logcode, AnyP::PROTO_ICP, len); + if (logcode == LOG_ICP_QUERY) + return; // we never log queries - if (!Config.onoff.log_udp) + if (!Config.onoff.log_udp) { + clientdbUpdate(caddr, al ? al->cache.code : LogTags(logcode), AnyP::PROTO_ICP, len); return; + } - icpSyncAle(al, caddr, logcode, url, len, delay); + if (!al) { + // The above attempt to optimize ALE creation has failed. We do need it. + icpSyncAle(al, caddr, url, len, delay); + al->cache.code.update(logcode); + } + clientdbUpdate(caddr, al->cache.code, AnyP::PROTO_ICP, len); accessLogLog(al, NULL); } @@ -247,14 +279,14 @@ icpLogIcp(const Ip::Address &caddr, const LogTags &logcode, int len, const char void icpUdpSendQueue(int fd, void *) { - icpUdpData *q; + DelayedUdpSend *q; while ((q = IcpQueueHead) != NULL) { int delay = tvSubUsec(q->queue_time, current_time); /* increment delay to prevent looping */ - const int x = icpUdpSend(fd, q->address, (icp_common_t *) q->msg, q->logcode, ++delay, nullptr); + const int x = icpUdpSend(fd, q->address, q->msg, ++delay, q->ale); IcpQueueHead = q->next; - xfree(q); + delete q; if (x < 0) break; @@ -306,15 +338,16 @@ icp_common_t::CreateMessage( return (icp_common_t *)buf; } -int +// TODO: Move retries to icpCreateAndSend(); the other caller does not retry. +/// writes the given UDP msg to the socket; queues a retry on the first failure +/// \returns a negative number on failures +static int icpUdpSend(int fd, const Ip::Address &to, icp_common_t * msg, - const LogTags &logcode, int delay, AccessLogEntryPointer al) { - icpUdpData *queue; int x; int len; len = (int) ntohs(msg->length); @@ -325,17 +358,17 @@ icpUdpSend(int fd, if (x >= 0) { /* successfully written */ + const auto logcode = icpLogFromICPCode(static_cast(msg->opcode)); icpLogIcp(to, logcode, len, (char *) (msg + 1), delay, al); icpCount(msg, SENT, (size_t) len, delay); safe_free(msg); } else if (0 == delay) { /* send failed, but queue it */ - queue = (icpUdpData *) xcalloc(1, sizeof(icpUdpData)); + const auto queue = new DelayedUdpSend(); queue->address = to; queue->msg = msg; - queue->len = (int) ntohs(msg->length); queue->queue_time = current_time; - queue->logcode = logcode; + queue->ale = al; if (IcpQueueHead == NULL) { IcpQueueHead = queue; @@ -352,6 +385,7 @@ icpUdpSend(int fd, ++statCounter.icp.replies_queued; } else { /* don't queue it */ + // XXX: safe_free(msg) ++statCounter.icp.replies_dropped; } @@ -377,7 +411,7 @@ icpGetCommonOpcode() return ICP_ERR; } -LogTags +static LogTags_ot icpLogFromICPCode(icp_opcode opcode) { if (opcode == ICP_ERR) @@ -395,6 +429,12 @@ icpLogFromICPCode(icp_opcode opcode) if (opcode == ICP_MISS_NOFETCH) return LOG_UDP_MISS_NOFETCH; + if (opcode == ICP_DECHO) + return LOG_ICP_QUERY; + + if (opcode == ICP_QUERY) + return LOG_ICP_QUERY; + fatal("expected ICP opcode\n"); return LOG_UDP_INVALID; @@ -403,8 +443,11 @@ icpLogFromICPCode(icp_opcode opcode) void icpCreateAndSend(icp_opcode opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from, AccessLogEntry::Pointer al) { + // update potentially shared ALE ASAP; the ICP query itself may be delayed + if (al) + al->cache.code.update(icpLogFromICPCode(opcode)); icp_common_t *reply = icp_common_t::CreateMessage(opcode, flags, url, reqnum, pad); - icpUdpSend(fd, from, reply, icpLogFromICPCode(opcode), 0, al); + icpUdpSend(fd, from, reply, 0, al); } void @@ -417,7 +460,7 @@ icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd) * count this DENIED query in the clientdb, even though * we're not sending an ICP reply... */ - clientdbUpdate(from, LOG_UDP_DENIED, AnyP::PROTO_ICP, 0); + clientdbUpdate(from, LogTags(LOG_UDP_DENIED), AnyP::PROTO_ICP, 0); } else { icpCreateAndSend(ICP_DENIED, 0, url, reqnum, 0, fd, from, nullptr); } diff --git a/src/icp_v3.cc b/src/icp_v3.cc index 484084d64b..4541cf89a4 100644 --- a/src/icp_v3.cc +++ b/src/icp_v3.cc @@ -66,7 +66,7 @@ ICP3State::created(StoreEntry *e) debugs(12, 5, "icpHandleIcpV3: OPCODE " << icp_opcode_str[header.opcode]); icp_opcode codeToSend; - if (foundHit(*e)) { + if (confirmAndPrepHit(*e)) { codeToSend = ICP_HIT; } else if (icpGetCommonOpcode() == ICP_ERR) codeToSend = ICP_MISS; diff --git a/src/ipc/StoreMap.cc b/src/ipc/StoreMap.cc index 6640010510..fb9a28d8c8 100644 --- a/src/ipc/StoreMap.cc +++ b/src/ipc/StoreMap.cc @@ -805,7 +805,12 @@ Ipc::StoreMapAnchor::exportInto(StoreEntry &into) const into.lastModified(basics.lastmod); into.swap_file_sz = basics.swap_file_sz; into.refcount = basics.refcount; + const bool collapsingRequired = into.hittingRequiresCollapsing(); into.flags = basics.flags; + // There are possibly several flags we do not need to overwrite, + // and ENTRY_REQUIRES_COLLAPSING is one of them. + // TODO: check for other flags. + into.setCollapsingRequirement(collapsingRequired); } void diff --git a/src/mime.cc b/src/mime.cc index 2e87011ffa..efdbfd06e1 100644 --- a/src/mime.cc +++ b/src/mime.cc @@ -46,6 +46,7 @@ public: /* StoreClient API */ virtual void created(StoreEntry *); + virtual LogTags *loggingTags() { return nullptr; } // no access logging/ACLs virtual void fillChecklist(ACLFilledChecklist &) const; private: @@ -442,7 +443,7 @@ MimeIcon::created(StoreEntry *newEntry) void MimeIcon::fillChecklist(ACLFilledChecklist &) const { - // Unreachable: We never call mayInitiateCollapsing() or mayCollapseOn(). + // Unreachable: We never mayInitiateCollapsing() or startCollapsingOn(). assert(false); } diff --git a/src/neighbors.cc b/src/neighbors.cc index f55a87ee92..8922dc7f02 100644 --- a/src/neighbors.cc +++ b/src/neighbors.cc @@ -599,7 +599,6 @@ neighborsUdpPing(HttpRequest * request, int i; int reqnum = 0; int flags; - icp_common_t *query; int queries_sent = 0; int peers_pinged = 0; int parent_timeout = 0, parent_exprep = 0; @@ -659,8 +658,9 @@ neighborsUdpPing(HttpRequest * request, if (p->icp.port == echo_port) { debugs(15, 4, "neighborsUdpPing: Looks like a dumb cache, send DECHO ping"); - query = icp_common_t::CreateMessage(ICP_DECHO, 0, url, reqnum, 0); - icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0, nullptr); + // TODO: Get ALE from callback_data if possible. + icpCreateAndSend(ICP_DECHO, 0, url, reqnum, 0, + icpOutgoingConn->fd, p->in_addr, nullptr); } else { flags = 0; @@ -668,9 +668,9 @@ neighborsUdpPing(HttpRequest * request, if (p->icp.version == ICP_VERSION_2) flags |= ICP_FLAG_SRC_RTT; - query = icp_common_t::CreateMessage(ICP_QUERY, flags, url, reqnum, 0); - - icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0, nullptr); + // TODO: Get ALE from callback_data if possible. + icpCreateAndSend(ICP_QUERY, flags, url, reqnum, 0, + icpOutgoingConn->fd, p->in_addr, nullptr); } } } @@ -1402,7 +1402,6 @@ peerCountMcastPeersStart(void *data) // APIs) to pass around a few basic data points like start_ping and ping! CachePeer *p = (CachePeer *)data; MemObject *mem; - icp_common_t *query; int reqnum; // TODO: use class AnyP::Uri instead of constructing and re-parsing a string LOCAL_ARRAY(char, url, MAX_URL); @@ -1429,8 +1428,8 @@ peerCountMcastPeersStart(void *data) mcastSetTtl(icpOutgoingConn->fd, p->mcast.ttl); p->mcast.id = mem->id; reqnum = icpSetCacheKey((const cache_key *)fake->key); - query = icp_common_t::CreateMessage(ICP_QUERY, 0, url, reqnum, 0); - icpUdpSend(icpOutgoingConn->fd, p->in_addr, query, LOG_ICP_QUERY, 0, nullptr); + icpCreateAndSend(ICP_QUERY, 0, url, reqnum, 0, + icpOutgoingConn->fd, p->in_addr, psstate->al); fake->ping_status = PING_WAITING; eventAdd("peerCountMcastPeersDone", peerCountMcastPeersDone, diff --git a/src/peer_digest.cc b/src/peer_digest.cc index 70d682df8b..c55b1bad87 100644 --- a/src/peer_digest.cc +++ b/src/peer_digest.cc @@ -352,6 +352,7 @@ peerDigestRequest(PeerDigest * pd) old_e = fetch->old_entry = storeGetPublicByRequest(req); + // XXX: Missing a hittingRequiresCollapsing() && startCollapsingOn() check. if (old_e) { debugs(72, 5, "found old " << *old_e); diff --git a/src/refresh.cc b/src/refresh.cc index 5296dbec25..77ab6d7946 100644 --- a/src/refresh.cc +++ b/src/refresh.cc @@ -591,6 +591,7 @@ refreshCheckHTTP(const StoreEntry * entry, HttpRequest * request) ++ refreshCounts[rcHTTP].total; ++ refreshCounts[rcHTTP].status[reason]; request->flags.staleIfHit = refreshIsStaleIfHit(reason); + // TODO: Treat collapsed responses as fresh but second-hand. return (Config.onoff.offline || reason < 200) ? 0 : 1; } diff --git a/src/store.cc b/src/store.cc index a7098019f5..30b0307db8 100644 --- a/src/store.cc +++ b/src/store.cc @@ -452,7 +452,6 @@ StoreEntry::releaseRequest(const bool shareable) shareableWhenPrivate = false; // may already be false if (EBIT_TEST(flags, RELEASE_REQUEST)) return; - setPrivateKey(shareable, true); } @@ -1818,7 +1817,6 @@ StoreEntry::startWriting() /* TODO: when we store headers separately remove the header portion */ /* TODO: mark the length of the headers ? */ /* We ONLY want the headers */ - assert (isEmpty()); assert(mem_obj); @@ -1831,6 +1829,13 @@ StoreEntry::startWriting() rep->body.packInto(this); flush(); + + // The entry headers are written, new clients + // should not collapse anymore. + if (hittingRequiresCollapsing()) { + setCollapsingRequirement(false); + Store::Root().transientsClearCollapsingRequirement(*this); + } } char const * @@ -2079,13 +2084,13 @@ StoreEntry::describeTimestamps() const return buf; } -bool -StoreEntry::collapsingInitiator() const +void +StoreEntry::setCollapsingRequirement(const bool required) { - if (!publicKey()) - return false; - return EBIT_TEST(flags, ENTRY_FWD_HDR_WAIT) || - (hasTransients() && !hasMemStore() && !hasDisk()); + if (required) + EBIT_SET(flags, ENTRY_REQUIRES_COLLAPSING); + else + EBIT_CLR(flags, ENTRY_REQUIRES_COLLAPSING); } static std::ostream & diff --git a/src/store/Controller.cc b/src/store/Controller.cc index be514e0cac..a822b39f3d 100644 --- a/src/store/Controller.cc +++ b/src/store/Controller.cc @@ -613,6 +613,13 @@ Store::Controller::transientsDisconnect(StoreEntry &e) transients->disconnect(e); } +void +Store::Controller::transientsClearCollapsingRequirement(StoreEntry &e) +{ + if (transients) + transients->clearCollapsingRequirement(e); +} + void Store::Controller::handleIdleEntry(StoreEntry &e) { @@ -685,11 +692,15 @@ Store::Controller::allowCollapsing(StoreEntry *e, const RequestFlags &reqFlags, const HttpRequestMethod &reqMethod) { const KeyScope keyScope = reqFlags.refresh ? ksRevalidation : ksDefault; + // set the flag now so that it gets copied into the Transients entry + e->setCollapsingRequirement(true); if (e->makePublic(keyScope)) { // this is needed for both local and SMP collapsing debugs(20, 3, "may " << (transients && e->hasTransients() ? "SMP-" : "locally-") << "collapse " << *e); return true; } + // paranoid cleanup; the flag is meaningless for private entries + e->setCollapsingRequirement(false); return false; } @@ -739,11 +750,15 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) debugs(20, 7, "syncing " << *collapsed); - bool abortedByWriter = false; - bool waitingToBeFreed = false; - transients->status(*collapsed, abortedByWriter, waitingToBeFreed); + Transients::EntryStatus entryStatus; + transients->status(*collapsed, entryStatus); - if (waitingToBeFreed) { + if (!entryStatus.collapsed) { + debugs(20, 5, "removing collapsing requirement for " << *collapsed << " since remote writer probably got headers"); + collapsed->setCollapsingRequirement(false); + } + + if (entryStatus.waitingToBeFreed) { debugs(20, 3, "will release " << *collapsed << " due to waitingToBeFreed"); collapsed->release(true); // may already be marked } @@ -753,12 +768,18 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) assert(transients->isReader(*collapsed)); - if (abortedByWriter) { + if (entryStatus.abortedByWriter) { debugs(20, 3, "aborting " << *collapsed << " because its writer has aborted"); collapsed->abort(); return; } + if (entryStatus.collapsed && !collapsed->hittingRequiresCollapsing()) { + debugs(20, 3, "aborting " << *collapsed << " due to writer/reader collapsing state mismatch"); + collapsed->abort(); + return; + } + bool found = false; bool inSync = false; if (memStore && collapsed->mem_obj->memCache.io == MemObject::ioDone) { @@ -776,7 +797,7 @@ Store::Controller::syncCollapsed(const sfileno xitIndex) found = anchorToCache(*collapsed, inSync); } - if (waitingToBeFreed && !found) { + if (entryStatus.waitingToBeFreed && !found) { debugs(20, 3, "aborting unattached " << *collapsed << " because it was marked for deletion before we could attach it"); collapsed->abort(); diff --git a/src/store/Controller.h b/src/store/Controller.h index c715d91783..cdd88e13c3 100644 --- a/src/store/Controller.h +++ b/src/store/Controller.h @@ -124,6 +124,9 @@ public: /// disassociates the entry from the intransit table void transientsDisconnect(StoreEntry &); + /// removes collapsing requirement (for future hits) + void transientsClearCollapsingRequirement(StoreEntry &e); + /// disassociates the entry from the memory cache, preserving cached data void memoryDisconnect(StoreEntry &); diff --git a/src/store_client.cc b/src/store_client.cc index b13c561572..463a46ec97 100644 --- a/src/store_client.cc +++ b/src/store_client.cc @@ -62,17 +62,32 @@ StoreClient::onCollapsingPath() const } bool -StoreClient::mayCollapseOn(const StoreEntry &e) const +StoreClient::startCollapsingOn(const StoreEntry &e, const bool doingRevalidation) { - assert(e.collapsingInitiator()); // our result is not meaningful for regular hits - return onCollapsingPath(); + if (!e.hittingRequiresCollapsing()) + return false; // collapsing is impossible due to the entry state + + if (!onCollapsingPath()) + return false; // collapsing is impossible due to Squid configuration + + /* collapsing is possible; the caller must collapse */ + + if (const auto tags = loggingTags()) { + if (doingRevalidation) + tags->collapsingHistory.revalidationCollapses++; + else + tags->collapsingHistory.otherCollapses++; + } + + debugs(85, 5, e << " doingRevalidation=" << doingRevalidation); + return true; } void StoreClient::fillChecklist(ACLFilledChecklist &checklist) const { // TODO: Consider moving all CF-related methods into a new dedicated class. - Must(!"mayCollapse() caller must override fillChecklist()"); + Must(!"startCollapsingOn() caller must override fillChecklist()"); } /* store_client */ diff --git a/src/tests/stub_icp.cc b/src/tests/stub_icp.cc index 4cf29124df..0744eb5742 100644 --- a/src/tests/stub_icp.cc +++ b/src/tests/stub_icp.cc @@ -21,7 +21,8 @@ icp_common_t *icp_common_t::CreateMessage(icp_opcode opcode, int flags, const ch icp_opcode icp_common_t::getOpCode() const STUB_RETVAL(ICP_INVALID) ICPState::ICPState(icp_common_t &aHeader, HttpRequest *aRequest) STUB ICPState::~ICPState() STUB -bool ICPState::foundHit(const StoreEntry &) const STUB_RETVAL(false) +bool ICPState::confirmAndPrepHit(const StoreEntry &) STUB_RETVAL(false) +LogTags *ICPState::loggingTags() STUB_RETVAL(nullptr) void ICPState::fillChecklist(ACLFilledChecklist&) const STUB Comm::ConnectionPointer icpIncomingConn; @@ -32,8 +33,6 @@ HttpRequest* icpGetRequest(char *url, int reqnum, int fd, Ip::Address &from) STU bool icpAccessAllowed(Ip::Address &from, HttpRequest * icp_request) STUB_RETVAL(false) void icpCreateAndSend(icp_opcode, int flags, char const *url, int reqnum, int pad, int fd, const Ip::Address &from, AccessLogEntryPointer) STUB icp_opcode icpGetCommonOpcode() STUB_RETVAL(ICP_INVALID) -int icpUdpSend(int, const Ip::Address &, icp_common_t *, LogTags, int, AccessLogEntryPointer) STUB_RETVAL(0) -LogTags icpLogFromICPCode(icp_opcode opcode) STUB_RETVAL(LOG_TAG_NONE) void icpDenyAccess(Ip::Address &from, char *url, int reqnum, int fd) STUB void icpHandleIcpV3(int, Ip::Address &, char *, int) STUB void icpConnectionsOpen(void) STUB diff --git a/src/tests/stub_store.cc b/src/tests/stub_store.cc index a5e0ee67b3..0fc07d761f 100644 --- a/src/tests/stub_store.cc +++ b/src/tests/stub_store.cc @@ -95,6 +95,7 @@ void StoreEntry::touch() STUB void StoreEntry::release(const bool shareable) STUB void StoreEntry::append(char const *, int) STUB void StoreEntry::vappendf(const char *, va_list) STUB +void StoreEntry::setCollapsingRequirement(const bool required) STUB NullStoreEntry *NullStoreEntry::getInstance() STUB_RETVAL(NULL) const char *NullStoreEntry::getMD5Text() const STUB_RETVAL(NULL) diff --git a/src/tunnel.cc b/src/tunnel.cc index 28bfb4b745..49b14ba5ac 100644 --- a/src/tunnel.cc +++ b/src/tunnel.cc @@ -451,7 +451,7 @@ TunnelStateData::informUserOfPeerError(const char *errMsg, const size_t sz) server.len = 0; if (logTag_ptr) - *logTag_ptr = LOG_TCP_TUNNEL; + logTag_ptr->update(LOG_TCP_TUNNEL); if (!clientExpectsConnectResponse()) { // closing the connection is the best we can do here @@ -880,7 +880,7 @@ tunnelStartShoveling(TunnelStateData *tunnelState) assert(!tunnelState->waitingForConnectExchange()); *tunnelState->status_ptr = Http::scOkay; if (tunnelState->logTag_ptr) - *tunnelState->logTag_ptr = LOG_TCP_TUNNEL; + tunnelState->logTag_ptr->update(LOG_TCP_TUNNEL); if (cbdataReferenceValid(tunnelState)) { // Shovel any payload already pushed into reply buffer by the server response diff --git a/src/urn.cc b/src/urn.cc index 8cac2a6a0b..976d37f36b 100644 --- a/src/urn.cc +++ b/src/urn.cc @@ -9,6 +9,7 @@ /* DEBUG: section 52 URN Parsing */ #include "squid.h" +#include "AccessLogEntry.h" #include "acl/FilledChecklist.h" #include "cbdata.h" #include "errorpage.h" @@ -33,6 +34,8 @@ class UrnState : public StoreClient CBDATA_CLASS(UrnState); public: + explicit UrnState(const AccessLogEntry::Pointer &anAle): ale(anAle) {} + void created (StoreEntry *newEntry); void start (HttpRequest *, StoreEntry *); char *getHost(const SBuf &urlpath); @@ -40,23 +43,25 @@ public: virtual ~UrnState(); - StoreEntry *entry; - store_client *sc; - StoreEntry *urlres_e; + StoreEntry *entry = nullptr; + store_client *sc = nullptr; + StoreEntry *urlres_e = nullptr; HttpRequest::Pointer request; HttpRequest::Pointer urlres_r; struct { - bool force_menu; + bool force_menu = false; } flags; - char reqbuf[URN_REQBUF_SZ]; - int reqofs; + char reqbuf[URN_REQBUF_SZ] = { '\0' }; + int reqofs = 0; private: /* StoreClient API */ + virtual LogTags *loggingTags() { return ale ? &ale->cache.code : nullptr; } virtual void fillChecklist(ACLFilledChecklist &) const; - char *urlres; + char *urlres = nullptr; + AccessLogEntry::Pointer ale; ///< master transaction summary }; typedef struct { @@ -188,15 +193,16 @@ void UrnState::fillChecklist(ACLFilledChecklist &checklist) const { checklist.setRequest(request.getRaw()); + checklist.al = ale; } void UrnState::created(StoreEntry *e) { - if (e->isNull() || (e->collapsingInitiator() && !mayCollapseOn(*e))) { + if (e->isNull() || (e->hittingRequiresCollapsing() && !startCollapsingOn(*e, false))) { urlres_e = storeCreateEntry(urlres, urlres, RequestFlags(), Http::METHOD_GET); sc = storeClientListAdd(urlres_e, this); - FwdState::fwdStart(Comm::ConnectionPointer(), urlres_e, urlres_r.getRaw()); + FwdState::Start(Comm::ConnectionPointer(), urlres_e, urlres_r.getRaw(), ale); // TODO: StoreClients must either store/lock or abandon found entries. //if (!e->isNull()) // e->abandon(); @@ -218,9 +224,9 @@ UrnState::created(StoreEntry *e) } void -urnStart(HttpRequest * r, StoreEntry * e) +urnStart(HttpRequest *r, StoreEntry *e, const AccessLogEntryPointer &ale) { - UrnState *anUrn = new UrnState(); + const auto anUrn = new UrnState(ale); anUrn->start (r, e); } diff --git a/src/urn.h b/src/urn.h index e6b0f4125c..b190d03bf0 100644 --- a/src/urn.h +++ b/src/urn.h @@ -11,10 +11,14 @@ #ifndef SQUID_URN_H_ #define SQUID_URN_H_ +class AccessLogEntry; class HttpRequest; class StoreEntry; -void urnStart(HttpRequest *, StoreEntry *); +template class RefCount; +typedef RefCount AccessLogEntryPointer; + +void urnStart(HttpRequest *, StoreEntry *, const AccessLogEntryPointer &ale); #endif /* SQUID_URN_H_ */ -- 2.39.5