]> git.ipfire.org Git - thirdparty/squid.git/blob - src/CollapsedForwarding.cc
Docs: Copyright updates for 2018 (#114)
[thirdparty/squid.git] / src / CollapsedForwarding.cc
1 /*
2 * Copyright (C) 1996-2018 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 17 Request Forwarding */
10
11 #include "squid.h"
12 #include "CollapsedForwarding.h"
13 #include "globals.h"
14 #include "ipc/mem/Segment.h"
15 #include "ipc/Messages.h"
16 #include "ipc/Port.h"
17 #include "ipc/TypedMsgHdr.h"
18 #include "MemObject.h"
19 #include "SquidConfig.h"
20 #include "Store.h"
21 #include "store_key_md5.h"
22 #include "tools.h"
23
24 /// shared memory segment path to use for CollapsedForwarding queue
25 static const char *const ShmLabel = "cf";
26 /// a single worker-to-worker queue capacity
27 // TODO: make configurable or compute from squid.conf settings if possible
28 static const int QueueCapacity = 1024;
29
30 std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue;
31
32 /// IPC queue message
33 class CollapsedForwardingMsg
34 {
35 public:
36 CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {}
37
38 public:
39 int sender; ///< kid ID of sending process
40
41 /// transients index, so that workers can find [private] entries to sync
42 sfileno xitIndex;
43 };
44
45 // CollapsedForwarding
46
47 void
48 CollapsedForwarding::Init()
49 {
50 Must(!queue.get());
51 if (UsingSmp() && IamWorkerProcess())
52 queue.reset(new Queue(ShmLabel, KidIdentifier));
53 }
54
55 void
56 CollapsedForwarding::Broadcast(const StoreEntry &e)
57 {
58 if (!queue.get())
59 return;
60
61 if (!e.mem_obj || e.mem_obj->xitTable.index < 0 ||
62 !Store::Root().transientReaders(e)) {
63 debugs(17, 7, "nobody reads " << e);
64 return;
65 }
66
67 CollapsedForwardingMsg msg;
68 msg.sender = KidIdentifier;
69 msg.xitIndex = e.mem_obj->xitTable.index;
70
71 debugs(17, 5, e << " to " << Config.workers << "-1 workers");
72
73 // TODO: send only to workers who are waiting for data
74 for (int workerId = 1; workerId <= Config.workers; ++workerId) {
75 try {
76 if (workerId != KidIdentifier && queue->push(workerId, msg))
77 Notify(workerId);
78 } catch (const Queue::Full &) {
79 debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " <<
80 "queue overflow for kid" << workerId <<
81 " at " << queue->outSize(workerId) << " items");
82 // TODO: grow queue size
83 }
84 }
85 }
86
87 void
88 CollapsedForwarding::Notify(const int workerId)
89 {
90 // TODO: Count and report the total number of notifications, pops, pushes.
91 debugs(17, 7, "to kid" << workerId);
92 Ipc::TypedMsgHdr msg;
93 msg.setType(Ipc::mtCollapsedForwardingNotification);
94 msg.putInt(KidIdentifier);
95 const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId);
96 Ipc::SendMessage(addr, msg);
97 }
98
99 void
100 CollapsedForwarding::HandleNewData(const char *const when)
101 {
102 debugs(17, 4, "popping all " << when);
103 CollapsedForwardingMsg msg;
104 int workerId;
105 int poppedCount = 0;
106 while (queue->pop(workerId, msg)) {
107 debugs(17, 3, "message from kid" << workerId);
108 if (workerId != msg.sender) {
109 debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId <<
110 " != " << msg.sender);
111 }
112
113 debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map");
114 Store::Root().syncCollapsed(msg.xitIndex);
115 debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map");
116
117 // XXX: stop and schedule an async call to continue
118 ++poppedCount;
119 assert(poppedCount < SQUID_MAXFD);
120 }
121 }
122
123 void
124 CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg)
125 {
126 const int from = msg.getInt();
127 debugs(17, 7, "from " << from);
128 assert(queue.get());
129 queue->clearReaderSignal(from);
130 HandleNewData("after notification");
131 }
132
133 /// initializes shared queue used by CollapsedForwarding
134 class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner
135 {
136 public:
137 /* RegisteredRunner API */
138 CollapsedForwardingRr(): owner(NULL) {}
139 virtual ~CollapsedForwardingRr();
140
141 protected:
142 virtual void create();
143 virtual void open();
144
145 private:
146 Ipc::MultiQueue::Owner *owner;
147 };
148
149 RunnerRegistrationEntry(CollapsedForwardingRr);
150
151 void CollapsedForwardingRr::create()
152 {
153 Must(!owner);
154 owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1,
155 sizeof(CollapsedForwardingMsg),
156 QueueCapacity);
157 }
158
159 void CollapsedForwardingRr::open()
160 {
161 CollapsedForwarding::Init();
162 }
163
164 CollapsedForwardingRr::~CollapsedForwardingRr()
165 {
166 delete owner;
167 }
168