]>
Commit | Line | Data |
---|---|---|
807feb1d | 1 | /* |
f70aedc4 | 2 | * Copyright (C) 1996-2021 The Squid Software Foundation and contributors |
807feb1d | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
807feb1d DK |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 17 Request Forwarding */ |
10 | ||
807feb1d | 11 | #include "squid.h" |
e4d13993 AR |
12 | #include "CollapsedForwarding.h" |
13 | #include "globals.h" | |
807feb1d DK |
14 | #include "ipc/mem/Segment.h" |
15 | #include "ipc/Messages.h" | |
16 | #include "ipc/Port.h" | |
17 | #include "ipc/TypedMsgHdr.h" | |
d366a7fa | 18 | #include "MemObject.h" |
ce49546e AR |
19 | #include "SquidConfig.h" |
20 | #include "Store.h" | |
21 | #include "store_key_md5.h" | |
807feb1d DK |
22 | #include "tools.h" |
23 | ||
24 | /// shared memory segment path to use for CollapsedForwarding queue | |
25 | static const char *const ShmLabel = "cf"; | |
26 | /// a single worker-to-worker queue capacity | |
27 | // TODO: make configurable or compute from squid.conf settings if possible | |
28 | static const int QueueCapacity = 1024; | |
29 | ||
829030b5 | 30 | std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue; |
807feb1d DK |
31 | |
32 | /// IPC queue message | |
33 | class CollapsedForwardingMsg | |
34 | { | |
35 | public: | |
6919be24 | 36 | CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {} |
807feb1d | 37 | |
d8ee9e8d EB |
38 | /// prints message parameters; suitable for cache manager reports |
39 | void stat(std::ostream &); | |
40 | ||
807feb1d | 41 | public: |
6919be24 AR |
42 | int sender; ///< kid ID of sending process |
43 | ||
44 | /// transients index, so that workers can find [private] entries to sync | |
9d4e9cfb | 45 | sfileno xitIndex; |
807feb1d DK |
46 | }; |
47 | ||
d8ee9e8d EB |
48 | void |
49 | CollapsedForwardingMsg::stat(std::ostream &os) | |
50 | { | |
51 | os << "sender: " << sender << ", xitIndex: " << xitIndex; | |
52 | } | |
53 | ||
807feb1d DK |
54 | // CollapsedForwarding |
55 | ||
56 | void | |
57 | CollapsedForwarding::Init() | |
58 | { | |
59 | Must(!queue.get()); | |
ce49546e AR |
60 | if (UsingSmp() && IamWorkerProcess()) |
61 | queue.reset(new Queue(ShmLabel, KidIdentifier)); | |
807feb1d DK |
62 | } |
63 | ||
64 | void | |
4310f8b0 | 65 | CollapsedForwarding::Broadcast(const StoreEntry &e, const bool includingThisWorker) |
807feb1d | 66 | { |
4310f8b0 | 67 | if (!e.hasTransients() || |
9d4e9cfb | 68 | !Store::Root().transientReaders(e)) { |
f54986ad | 69 | debugs(17, 7, "nobody reads " << e); |
d366a7fa AR |
70 | return; |
71 | } | |
72 | ||
4310f8b0 EB |
73 | debugs(17, 5, e); |
74 | Broadcast(e.mem_obj->xitTable.index, includingThisWorker); | |
75 | } | |
76 | ||
77 | void | |
78 | CollapsedForwarding::Broadcast(const sfileno index, const bool includingThisWorker) | |
79 | { | |
80 | if (!queue.get()) | |
81 | return; | |
82 | ||
807feb1d | 83 | CollapsedForwardingMsg msg; |
ce49546e | 84 | msg.sender = KidIdentifier; |
4310f8b0 | 85 | msg.xitIndex = index; |
ce49546e | 86 | |
4310f8b0 | 87 | debugs(17, 7, "entry " << index << " to " << Config.workers << (includingThisWorker ? "" : "-1") << " workers"); |
807feb1d DK |
88 | |
89 | // TODO: send only to workers who are waiting for data | |
807feb1d DK |
90 | for (int workerId = 1; workerId <= Config.workers; ++workerId) { |
91 | try { | |
4310f8b0 | 92 | if ((workerId != KidIdentifier || includingThisWorker) && queue->push(workerId, msg)) |
807feb1d DK |
93 | Notify(workerId); |
94 | } catch (const Queue::Full &) { | |
5c9846c7 AR |
95 | debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " << |
96 | "queue overflow for kid" << workerId << | |
97 | " at " << queue->outSize(workerId) << " items"); | |
807feb1d DK |
98 | // TODO: grow queue size |
99 | } | |
100 | } | |
101 | } | |
102 | ||
103 | void | |
104 | CollapsedForwarding::Notify(const int workerId) | |
105 | { | |
106 | // TODO: Count and report the total number of notifications, pops, pushes. | |
ce49546e | 107 | debugs(17, 7, "to kid" << workerId); |
807feb1d | 108 | Ipc::TypedMsgHdr msg; |
807feb1d DK |
109 | msg.setType(Ipc::mtCollapsedForwardingNotification); |
110 | msg.putInt(KidIdentifier); | |
1ee292b7 | 111 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId); |
807feb1d DK |
112 | Ipc::SendMessage(addr, msg); |
113 | } | |
114 | ||
115 | void | |
116 | CollapsedForwarding::HandleNewData(const char *const when) | |
117 | { | |
ce49546e | 118 | debugs(17, 4, "popping all " << when); |
807feb1d DK |
119 | CollapsedForwardingMsg msg; |
120 | int workerId; | |
121 | int poppedCount = 0; | |
122 | while (queue->pop(workerId, msg)) { | |
ce49546e AR |
123 | debugs(17, 3, "message from kid" << workerId); |
124 | if (workerId != msg.sender) { | |
125 | debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId << | |
126 | " != " << msg.sender); | |
807feb1d DK |
127 | } |
128 | ||
6919be24 AR |
129 | debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map"); |
130 | Store::Root().syncCollapsed(msg.xitIndex); | |
131 | debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map"); | |
ce49546e | 132 | |
807feb1d | 133 | // XXX: stop and schedule an async call to continue |
432ad89c FC |
134 | ++poppedCount; |
135 | assert(poppedCount < SQUID_MAXFD); | |
807feb1d DK |
136 | } |
137 | } | |
138 | ||
139 | void | |
140 | CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
141 | { | |
142 | const int from = msg.getInt(); | |
539283df | 143 | debugs(17, 7, "from " << from); |
ce49546e | 144 | assert(queue.get()); |
807feb1d DK |
145 | queue->clearReaderSignal(from); |
146 | HandleNewData("after notification"); | |
147 | } | |
148 | ||
d8ee9e8d EB |
149 | void |
150 | CollapsedForwarding::StatQueue(std::ostream &os) | |
151 | { | |
152 | if (queue.get()) { | |
153 | os << "Transients queues:\n"; | |
154 | queue->stat<CollapsedForwardingMsg>(os); | |
155 | } | |
156 | } | |
157 | ||
807feb1d DK |
158 | /// initializes shared queue used by CollapsedForwarding |
159 | class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner | |
160 | { | |
161 | public: | |
162 | /* RegisteredRunner API */ | |
163 | CollapsedForwardingRr(): owner(NULL) {} | |
164 | virtual ~CollapsedForwardingRr(); | |
165 | ||
166 | protected: | |
21b7990f AR |
167 | virtual void create(); |
168 | virtual void open(); | |
807feb1d DK |
169 | |
170 | private: | |
171 | Ipc::MultiQueue::Owner *owner; | |
172 | }; | |
173 | ||
21b7990f | 174 | RunnerRegistrationEntry(CollapsedForwardingRr); |
807feb1d | 175 | |
21b7990f | 176 | void CollapsedForwardingRr::create() |
807feb1d DK |
177 | { |
178 | Must(!owner); | |
179 | owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1, | |
180 | sizeof(CollapsedForwardingMsg), | |
181 | QueueCapacity); | |
182 | } | |
183 | ||
21b7990f | 184 | void CollapsedForwardingRr::open() |
807feb1d | 185 | { |
ce49546e | 186 | CollapsedForwarding::Init(); |
807feb1d DK |
187 | } |
188 | ||
189 | CollapsedForwardingRr::~CollapsedForwardingRr() | |
190 | { | |
191 | delete owner; | |
192 | } | |
f53969cc | 193 |