]> git.ipfire.org Git - thirdparty/squid.git/blame - src/CollapsedForwarding.cc
Various fixes related to overlapping and collapsed entry caching.
[thirdparty/squid.git] / src / CollapsedForwarding.cc
CommitLineData
807feb1d
DK
1/*
2 * DEBUG: section 17 Request Forwarding
3 *
4 */
5
6#include "squid.h"
7#include "ipc/mem/Segment.h"
8#include "ipc/Messages.h"
9#include "ipc/Port.h"
10#include "ipc/TypedMsgHdr.h"
11#include "CollapsedForwarding.h"
807feb1d 12#include "globals.h"
ce49546e
AR
13#include "SquidConfig.h"
14#include "Store.h"
15#include "store_key_md5.h"
807feb1d
DK
16#include "tools.h"
17
18/// shared memory segment path to use for CollapsedForwarding queue
19static const char *const ShmLabel = "cf";
20/// a single worker-to-worker queue capacity
21// TODO: make configurable or compute from squid.conf settings if possible
22static const int QueueCapacity = 1024;
23
24std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
25
/// IPC queue message.
/// Passed by value through the CollapsedForwarding queue, which is created
/// with sizeof(CollapsedForwardingMsg) as its item size, so the member layout
/// must stay fixed-size.
class CollapsedForwardingMsg
{
public:
    /// builds a message with an invalid sender (-1) and an all-zero key
    CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }

    int sender; ///< kid ID of sending process
    uint64_t key[2]; ///< StoreEntry key
};
36
37// CollapsedForwarding
38
39void
40CollapsedForwarding::Init()
41{
42 Must(!queue.get());
ce49546e
AR
43 if (UsingSmp() && IamWorkerProcess())
44 queue.reset(new Queue(ShmLabel, KidIdentifier));
807feb1d
DK
45}
46
47void
99921d9d 48CollapsedForwarding::Broadcast(const StoreEntry &e)
807feb1d 49{
ce49546e
AR
50 if (!queue.get())
51 return;
52
807feb1d 53 CollapsedForwardingMsg msg;
ce49546e 54 msg.sender = KidIdentifier;
99921d9d 55 memcpy(msg.key, e.key, sizeof(msg.key));
ce49546e 56
99921d9d
AR
57 debugs(17, 5, storeKeyText(static_cast<cache_key*>(e.key)) << " to " <<
58 Config.workers << "-1 workers");
807feb1d
DK
59
60 // TODO: send only to workers who are waiting for data
807feb1d
DK
61 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
62 try {
ce49546e 63 if (workerId != KidIdentifier && queue->push(workerId, msg))
807feb1d
DK
64 Notify(workerId);
65 } catch (const Queue::Full &) {
5c9846c7
AR
66 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
67 "queue overflow for kid" << workerId <<
68 " at " << queue->outSize(workerId) << " items");
807feb1d
DK
69 // TODO: grow queue size
70 }
71 }
72}
73
74void
75CollapsedForwarding::Notify(const int workerId)
76{
77 // TODO: Count and report the total number of notifications, pops, pushes.
ce49546e 78 debugs(17, 7, "to kid" << workerId);
807feb1d
DK
79 Ipc::TypedMsgHdr msg;
80 // TODO: add proper message type?
81 msg.setType(Ipc::mtCollapsedForwardingNotification);
82 msg.putInt(KidIdentifier);
83 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
84 Ipc::SendMessage(addr, msg);
85}
86
87void
88CollapsedForwarding::HandleNewData(const char *const when)
89{
ce49546e 90 debugs(17, 4, "popping all " << when);
807feb1d
DK
91 CollapsedForwardingMsg msg;
92 int workerId;
93 int poppedCount = 0;
94 while (queue->pop(workerId, msg)) {
ce49546e
AR
95 debugs(17, 3, "message from kid" << workerId);
96 if (workerId != msg.sender) {
97 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
98 " != " << msg.sender);
807feb1d
DK
99 }
100
5c9846c7
AR
101 const cache_key *key = reinterpret_cast<const cache_key*>(msg.key);
102 debugs(17, 7, "hadling " << storeKeyText(key));
103 Store::Root().syncCollapsed(key);
104 debugs(17, 7, "handled " << storeKeyText(key));
ce49546e 105
807feb1d
DK
106 // XXX: stop and schedule an async call to continue
107 assert(++poppedCount < SQUID_MAXFD);
108 }
109}
110
111void
112CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
113{
114 const int from = msg.getInt();
115 debugs(17, 7, HERE << "from " << from);
ce49546e 116 assert(queue.get());
807feb1d
DK
117 queue->clearReaderSignal(from);
118 HandleNewData("after notification");
119}
120
121/// initializes shared queue used by CollapsedForwarding
122class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
123{
124public:
125 /* RegisteredRunner API */
126 CollapsedForwardingRr(): owner(NULL) {}
127 virtual ~CollapsedForwardingRr();
128
129protected:
130 virtual void create(const RunnerRegistry &);
131 virtual void open(const RunnerRegistry &);
132
133private:
134 Ipc::MultiQueue::Owner *owner;
135};
136
137RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
138
139void CollapsedForwardingRr::create(const RunnerRegistry &)
140{
141 Must(!owner);
142 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
143 sizeof(CollapsedForwardingMsg),
144 QueueCapacity);
145}
146
147void CollapsedForwardingRr::open(const RunnerRegistry &)
148{
ce49546e 149 CollapsedForwarding::Init();
807feb1d
DK
150}
151
152CollapsedForwardingRr::~CollapsedForwardingRr()
153{
154 delete owner;
155}