]>
Commit | Line | Data |
---|---|---|
807feb1d | 1 | /* |
4ac4a490 | 2 | * Copyright (C) 1996-2017 The Squid Software Foundation and contributors |
807feb1d | 3 | * |
bbc27441 AJ |
4 | * Squid software is distributed under GPLv2+ license and includes |
5 | * contributions from numerous individuals and organizations. | |
6 | * Please see the COPYING and CONTRIBUTORS files for details. | |
807feb1d DK |
7 | */ |
8 | ||
bbc27441 AJ |
9 | /* DEBUG: section 17 Request Forwarding */ |
10 | ||
807feb1d | 11 | #include "squid.h" |
e4d13993 AR |
12 | #include "CollapsedForwarding.h" |
13 | #include "globals.h" | |
807feb1d DK |
14 | #include "ipc/mem/Segment.h" |
15 | #include "ipc/Messages.h" | |
16 | #include "ipc/Port.h" | |
17 | #include "ipc/TypedMsgHdr.h" | |
d366a7fa | 18 | #include "MemObject.h" |
ce49546e AR |
19 | #include "SquidConfig.h" |
20 | #include "Store.h" | |
21 | #include "store_key_md5.h" | |
807feb1d DK |
22 | #include "tools.h" |
23 | ||
24 | /// shared memory segment path to use for CollapsedForwarding queue | |
25 | static const char *const ShmLabel = "cf"; | |
26 | /// a single worker-to-worker queue capacity | |
27 | // TODO: make configurable or compute from squid.conf settings if possible | |
28 | static const int QueueCapacity = 1024; | |
29 | ||
829030b5 | 30 | std::unique_ptr<CollapsedForwarding::Queue> CollapsedForwarding::queue; |
807feb1d DK |
31 | |
32 | /// IPC queue message | |
33 | class CollapsedForwardingMsg | |
34 | { | |
35 | public: | |
6919be24 | 36 | CollapsedForwardingMsg(): sender(-1), xitIndex(-1) {} |
807feb1d DK |
37 | |
38 | public: | |
6919be24 AR |
39 | int sender; ///< kid ID of sending process |
40 | ||
41 | /// transients index, so that workers can find [private] entries to sync | |
9d4e9cfb | 42 | sfileno xitIndex; |
807feb1d DK |
43 | }; |
44 | ||
45 | // CollapsedForwarding | |
46 | ||
47 | void | |
48 | CollapsedForwarding::Init() | |
49 | { | |
50 | Must(!queue.get()); | |
ce49546e AR |
51 | if (UsingSmp() && IamWorkerProcess()) |
52 | queue.reset(new Queue(ShmLabel, KidIdentifier)); | |
807feb1d DK |
53 | } |
54 | ||
55 | void | |
99921d9d | 56 | CollapsedForwarding::Broadcast(const StoreEntry &e) |
807feb1d | 57 | { |
ce49546e AR |
58 | if (!queue.get()) |
59 | return; | |
60 | ||
d366a7fa | 61 | if (!e.mem_obj || e.mem_obj->xitTable.index < 0 || |
9d4e9cfb | 62 | !Store::Root().transientReaders(e)) { |
f54986ad | 63 | debugs(17, 7, "nobody reads " << e); |
d366a7fa AR |
64 | return; |
65 | } | |
66 | ||
807feb1d | 67 | CollapsedForwardingMsg msg; |
ce49546e | 68 | msg.sender = KidIdentifier; |
6919be24 | 69 | msg.xitIndex = e.mem_obj->xitTable.index; |
ce49546e | 70 | |
6919be24 | 71 | debugs(17, 5, e << " to " << Config.workers << "-1 workers"); |
807feb1d DK |
72 | |
73 | // TODO: send only to workers who are waiting for data | |
807feb1d DK |
74 | for (int workerId = 1; workerId <= Config.workers; ++workerId) { |
75 | try { | |
ce49546e | 76 | if (workerId != KidIdentifier && queue->push(workerId, msg)) |
807feb1d DK |
77 | Notify(workerId); |
78 | } catch (const Queue::Full &) { | |
5c9846c7 AR |
79 | debugs(17, DBG_IMPORTANT, "ERROR: Collapsed forwarding " << |
80 | "queue overflow for kid" << workerId << | |
81 | " at " << queue->outSize(workerId) << " items"); | |
807feb1d DK |
82 | // TODO: grow queue size |
83 | } | |
84 | } | |
85 | } | |
86 | ||
87 | void | |
88 | CollapsedForwarding::Notify(const int workerId) | |
89 | { | |
90 | // TODO: Count and report the total number of notifications, pops, pushes. | |
ce49546e | 91 | debugs(17, 7, "to kid" << workerId); |
807feb1d | 92 | Ipc::TypedMsgHdr msg; |
807feb1d DK |
93 | msg.setType(Ipc::mtCollapsedForwardingNotification); |
94 | msg.putInt(KidIdentifier); | |
1ee292b7 | 95 | const String addr = Ipc::Port::MakeAddr(Ipc::strandAddrLabel, workerId); |
807feb1d DK |
96 | Ipc::SendMessage(addr, msg); |
97 | } | |
98 | ||
99 | void | |
100 | CollapsedForwarding::HandleNewData(const char *const when) | |
101 | { | |
ce49546e | 102 | debugs(17, 4, "popping all " << when); |
807feb1d DK |
103 | CollapsedForwardingMsg msg; |
104 | int workerId; | |
105 | int poppedCount = 0; | |
106 | while (queue->pop(workerId, msg)) { | |
ce49546e AR |
107 | debugs(17, 3, "message from kid" << workerId); |
108 | if (workerId != msg.sender) { | |
109 | debugs(17, DBG_IMPORTANT, "mismatching kid IDs: " << workerId << | |
110 | " != " << msg.sender); | |
807feb1d DK |
111 | } |
112 | ||
6919be24 AR |
113 | debugs(17, 7, "handling entry " << msg.xitIndex << " in transients_map"); |
114 | Store::Root().syncCollapsed(msg.xitIndex); | |
115 | debugs(17, 7, "handled entry " << msg.xitIndex << " in transients_map"); | |
ce49546e | 116 | |
807feb1d | 117 | // XXX: stop and schedule an async call to continue |
432ad89c FC |
118 | ++poppedCount; |
119 | assert(poppedCount < SQUID_MAXFD); | |
807feb1d DK |
120 | } |
121 | } | |
122 | ||
123 | void | |
124 | CollapsedForwarding::HandleNotification(const Ipc::TypedMsgHdr &msg) | |
125 | { | |
126 | const int from = msg.getInt(); | |
539283df | 127 | debugs(17, 7, "from " << from); |
ce49546e | 128 | assert(queue.get()); |
807feb1d DK |
129 | queue->clearReaderSignal(from); |
130 | HandleNewData("after notification"); | |
131 | } | |
132 | ||
133 | /// initializes shared queue used by CollapsedForwarding | |
134 | class CollapsedForwardingRr: public Ipc::Mem::RegisteredRunner | |
135 | { | |
136 | public: | |
137 | /* RegisteredRunner API */ | |
138 | CollapsedForwardingRr(): owner(NULL) {} | |
139 | virtual ~CollapsedForwardingRr(); | |
140 | ||
141 | protected: | |
21b7990f AR |
142 | virtual void create(); |
143 | virtual void open(); | |
807feb1d DK |
144 | |
145 | private: | |
146 | Ipc::MultiQueue::Owner *owner; | |
147 | }; | |
148 | ||
21b7990f | 149 | RunnerRegistrationEntry(CollapsedForwardingRr); |
807feb1d | 150 | |
21b7990f | 151 | void CollapsedForwardingRr::create() |
807feb1d DK |
152 | { |
153 | Must(!owner); | |
154 | owner = Ipc::MultiQueue::Init(ShmLabel, Config.workers, 1, | |
155 | sizeof(CollapsedForwardingMsg), | |
156 | QueueCapacity); | |
157 | } | |
158 | ||
21b7990f | 159 | void CollapsedForwardingRr::open() |
807feb1d | 160 | { |
ce49546e | 161 | CollapsedForwarding::Init(); |
807feb1d DK |
162 | } |
163 | ||
164 | CollapsedForwardingRr::~CollapsedForwardingRr() | |
165 | { | |
166 | delete owner; | |
167 | } | |
f53969cc | 168 |