]> git.ipfire.org Git - thirdparty/squid.git/blob - src/CollapsedForwarding.cc
Added debugging to allow automated store entry tracking during CF notifications.
[thirdparty/squid.git] / src / CollapsedForwarding.cc
1 /*
2 * DEBUG: section 17 Request Forwarding
3 *
4 */
5
6 #include "squid.h"
7 #include "ipc/mem/Segment.h"
8 #include "ipc/Messages.h"
9 #include "ipc/Port.h"
10 #include "ipc/TypedMsgHdr.h"
11 #include "CollapsedForwarding.h"
12 #include "globals.h"
13 #include "SquidConfig.h"
14 #include "Store.h"
15 #include "store_key_md5.h"
16 #include "tools.h"
17
18 /// shared memory segment path to use for CollapsedForwarding queue
19 static const char *const ShmLabel = "cf";
20 /// a single worker-to-worker queue capacity
21 // TODO: make configurable or compute from squid.conf settings if possible
22 static const int QueueCapacity = 1024;
23
24 std::auto_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
25
26 /// IPC queue message
27 class CollapsedForwardingMsg
28 {
29 public:
30 CollapsedForwardingMsg(): sender(-1) { key[0] = key[1] = 0; }
31
32 public:
33 int sender; /// kid ID of sending process
34 uint64_t key[2]; ///< StoreEntry key
35 };
36
37 // CollapsedForwarding
38
39 void
40 CollapsedForwarding::Init()
41 {
42 Must(!queue.get());
43 if (UsingSmp() && IamWorkerProcess())
44 queue.reset(new Queue(ShmLabel, KidIdentifier));
45 }
46
47 void
48 CollapsedForwarding::Broadcast(const cache_key *key)
49 {
50 if (!queue.get())
51 return;
52
53 CollapsedForwardingMsg msg;
54 msg.sender = KidIdentifier;
55 memcpy(msg.key, key, sizeof(msg.key));
56
57 debugs(17, 5, storeKeyText(key) << " to " << Config.workers << "-1 workers");
58
59 // TODO: send only to workers who are waiting for data
60 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
61 try {
62 if (workerId != KidIdentifier && queue->push(workerId, msg))
63 Notify(workerId);
64 } catch (const Queue::Full &) {
65 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
66 "queue overflow for kid" << workerId <<
67 " at " << queue->outSize(workerId) << " items");
68 // TODO: grow queue size
69 }
70 }
71 }
72
73 void
74 CollapsedForwarding::Notify(const int workerId)
75 {
76 // TODO: Count and report the total number of notifications, pops, pushes.
77 debugs(17, 7, "to kid" << workerId);
78 Ipc::TypedMsgHdr msg;
79 // TODO: add proper message type?
80 msg.setType(Ipc::mtCollapsedForwardingNotification);
81 msg.putInt(KidIdentifier);
82 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrPfx, workerId);
83 Ipc::SendMessage(addr, msg);
84 }
85
86 void
87 CollapsedForwarding::HandleNewData(const char *const when)
88 {
89 debugs(17, 4, "popping all " << when);
90 CollapsedForwardingMsg msg;
91 int workerId;
92 int poppedCount = 0;
93 while (queue->pop(workerId, msg)) {
94 debugs(17, 3, "message from kid" << workerId);
95 if (workerId != msg.sender) {
96 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
97 " != " << msg.sender);
98 }
99
100 const cache_key *key = reinterpret_cast<const cache_key*>(msg.key);
101 debugs(17, 7, "hadling " << storeKeyText(key));
102 Store::Root().syncCollapsed(key);
103 debugs(17, 7, "handled " << storeKeyText(key));
104
105 // XXX: stop and schedule an async call to continue
106 assert(++poppedCount < SQUID_MAXFD);
107 }
108 }
109
110 void
111 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
112 {
113 const int from = msg.getInt();
114 debugs(17, 7, HERE << "from " << from);
115 assert(queue.get());
116 queue->clearReaderSignal(from);
117 HandleNewData("after notification");
118 }
119
120 /// initializes shared queue used by CollapsedForwarding
121 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
122 {
123 public:
124 /* RegisteredRunner API */
125 CollapsedForwardingRr(): owner(NULL) {}
126 virtual ~CollapsedForwardingRr();
127
128 protected:
129 virtual void create(const RunnerRegistry &);
130 virtual void open(const RunnerRegistry &);
131
132 private:
133 Ipc::MultiQueue::Owner *owner;
134 };
135
136 RunnerRegistrationEntry(rrAfterConfig, CollapsedForwardingRr);
137
138 void CollapsedForwardingRr::create(const RunnerRegistry &)
139 {
140 Must(!owner);
141 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
142 sizeof(CollapsedForwardingMsg),
143 QueueCapacity);
144 }
145
146 void CollapsedForwardingRr::open(const RunnerRegistry &)
147 {
148 CollapsedForwarding::Init();
149 }
150
151 CollapsedForwardingRr::~CollapsedForwardingRr()
152 {
153 delete owner;
154 }