]> git.ipfire.org Git - thirdparty/squid.git/blame - src/ipc/Queue.cc
Expose the underlying integer type to AtomicWord users.
[thirdparty/squid.git] / src / ipc / Queue.cc
CommitLineData
948230de
DK
1/*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8#include "config.h"
fa61cefe
AR
9#include "base/TextException.h"
10#include "Debug.h"
11#include "globals.h"
948230de
DK
12#include "ipc/Queue.h"
13
f5591061 14/// constructs Metadata ID from parent queue ID
948230de 15static String
f5591061 16MetadataId(String id)
948230de 17{
f5591061 18 id.append("__metadata");
948230de
DK
19 return id;
20}
21
f5591061 22/// constructs one-to-one queues ID from parent queue ID
fa61cefe 23static String
f5591061
DK
24QueuesId(String id)
25{
26 id.append("__queues");
27 return id;
28}
29
30/// constructs QueueReaders ID from parent queue ID
31static String
32ReadersId(String id)
fa61cefe
AR
33{
34 id.append("__readers");
35 return id;
36}
37
38
39/* QueueReader */
40
15cdbc7c 41InstanceIdDefinitions(Ipc::QueueReader, "ipcQR");
fa61cefe 42
15cdbc7c 43Ipc::QueueReader::QueueReader(): popBlocked(1), popSignal(0)
fa61cefe
AR
44{
45 debugs(54, 7, HERE << "constructed " << id);
46}
47
68353d5a
DK
48/* QueueReaders */
49
15cdbc7c 50Ipc::QueueReaders::QueueReaders(const int aCapacity): theCapacity(aCapacity)
68353d5a
DK
51{
52 Must(theCapacity > 0);
53 new (theReaders) QueueReader[theCapacity];
54}
55
56size_t
15cdbc7c 57Ipc::QueueReaders::sharedMemorySize() const
68353d5a
DK
58{
59 return SharedMemorySize(theCapacity);
60}
61
62size_t
15cdbc7c 63Ipc::QueueReaders::SharedMemorySize(const int capacity)
68353d5a
DK
64{
65 return sizeof(QueueReaders) + sizeof(QueueReader) * capacity;
66}
67
948230de
DK
68
69// OneToOneUniQueue
70
f5591061 71Ipc::OneToOneUniQueue::OneToOneUniQueue(const unsigned int aMaxItemSize, const int aCapacity):
9199139f
AR
72 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
73 theCapacity(aCapacity)
fa61cefe 74{
f5591061
DK
75 Must(theMaxItemSize > 0);
76 Must(theCapacity > 0);
fa61cefe
AR
77}
78
948230de 79int
15cdbc7c 80Ipc::OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
948230de
DK
81{
82 assert(maxItemSize > 0);
f5591061 83 size -= sizeof(OneToOneUniQueue);
948230de
DK
84 return size >= 0 ? size / maxItemSize : 0;
85}
86
87int
15cdbc7c 88Ipc::OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
948230de
DK
89{
90 assert(size >= 0);
f5591061 91 return sizeof(OneToOneUniQueue) + maxItemSize * size;
948230de
DK
92}
93
68353d5a 94
f5591061 95/* OneToOneUniQueues */
948230de 96
f5591061 97Ipc::OneToOneUniQueues::OneToOneUniQueues(const int aCapacity, const unsigned int maxItemSize, const int queueCapacity): theCapacity(aCapacity)
fa61cefe 98{
f5591061
DK
99 Must(theCapacity > 0);
100 for (int i = 0; i < theCapacity; ++i)
101 new (&(*this)[i]) OneToOneUniQueue(maxItemSize, queueCapacity);
68353d5a
DK
102}
103
104size_t
f5591061 105Ipc::OneToOneUniQueues::sharedMemorySize() const
68353d5a 106{
f5591061 107 return sizeof(*this) + theCapacity * front().sharedMemorySize();
fa61cefe
AR
108}
109
f5591061
DK
110size_t
111Ipc::OneToOneUniQueues::SharedMemorySize(const int capacity, const unsigned int maxItemSize, const int queueCapacity)
fa61cefe 112{
f5591061
DK
113 const int queueSize =
114 OneToOneUniQueue::Items2Bytes(maxItemSize, queueCapacity);
115 return sizeof(OneToOneUniQueues) + queueSize * capacity;
fa61cefe
AR
116}
117
f5591061
DK
118const Ipc::OneToOneUniQueue &
119Ipc::OneToOneUniQueues::operator [](const int index) const
fa61cefe 120{
f5591061
DK
121 Must(0 <= index && index < theCapacity);
122 const size_t queueSize = index ? front().sharedMemorySize() : 0;
123 const char *const queue =
124 reinterpret_cast<const char *>(this) + sizeof(*this) + index * queueSize;
125 return *reinterpret_cast<const OneToOneUniQueue *>(queue);
fa61cefe
AR
126}
127
948230de 128
f5591061 129// FewToFewBiQueue
948230de 130
f5591061
DK
131Ipc::FewToFewBiQueue::Owner *
132Ipc::FewToFewBiQueue::Init(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity)
68353d5a 133{
f5591061 134 return new Owner(id, groupASize, groupAIdOffset, groupBSize, groupBIdOffset, maxItemSize, capacity);
68353d5a
DK
135}
136
f5591061 137Ipc::FewToFewBiQueue::FewToFewBiQueue(const String &id, const Group aLocalGroup, const int aLocalProcessId):
9199139f
AR
138 metadata(shm_old(Metadata)(MetadataId(id).termedBuf())),
139 queues(shm_old(OneToOneUniQueues)(QueuesId(id).termedBuf())),
140 readers(shm_old(QueueReaders)(ReadersId(id).termedBuf())),
141 theLocalGroup(aLocalGroup), theLocalProcessId(aLocalProcessId),
142 theLastPopProcessId(readers->theCapacity)
948230de 143{
f5591061
DK
144 Must(queues->theCapacity == metadata->theGroupASize * metadata->theGroupBSize * 2);
145 Must(readers->theCapacity == metadata->theGroupASize + metadata->theGroupBSize);
68353d5a 146
f5591061
DK
147 const QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
148 debugs(54, 7, HERE << "queue " << id << " reader: " << localReader.id);
948230de
DK
149}
150
f5591061
DK
151bool
152Ipc::FewToFewBiQueue::validProcessId(const Group group, const int processId) const
153{
154 switch (group) {
155 case groupA:
156 return metadata->theGroupAIdOffset <= processId &&
9199139f 157 processId < metadata->theGroupAIdOffset + metadata->theGroupASize;
f5591061
DK
158 case groupB:
159 return metadata->theGroupBIdOffset <= processId &&
9199139f 160 processId < metadata->theGroupBIdOffset + metadata->theGroupBSize;
f5591061
DK
161 }
162 return false;
163}
164
0a11e039
AR
165int
166Ipc::FewToFewBiQueue::oneToOneQueueIndex(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
f5591061
DK
167{
168 Must(fromGroup != toGroup);
0a11e039
AR
169 assert(validProcessId(fromGroup, fromProcessId));
170 assert(validProcessId(toGroup, toProcessId));
f5591061
DK
171 int index1;
172 int index2;
173 int offset;
174 if (fromGroup == groupA) {
175 index1 = fromProcessId - metadata->theGroupAIdOffset;
176 index2 = toProcessId - metadata->theGroupBIdOffset;
177 offset = 0;
178 } else {
179 index1 = toProcessId - metadata->theGroupAIdOffset;
180 index2 = fromProcessId - metadata->theGroupBIdOffset;
181 offset = metadata->theGroupASize * metadata->theGroupBSize;
182 }
183 const int index = offset + index1 * metadata->theGroupBSize + index2;
0a11e039
AR
184 return index;
185}
186
187Ipc::OneToOneUniQueue &
188Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId)
189{
190 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
191}
192
193const Ipc::OneToOneUniQueue &
194Ipc::FewToFewBiQueue::oneToOneQueue(const Group fromGroup, const int fromProcessId, const Group toGroup, const int toProcessId) const
195{
196 return (*queues)[oneToOneQueueIndex(fromGroup, fromProcessId, toGroup, toProcessId)];
948230de
DK
197}
198
f5591061
DK
199Ipc::QueueReader &
200Ipc::FewToFewBiQueue::reader(const Group group, const int processId)
948230de 201{
f5591061
DK
202 Must(validProcessId(group, processId));
203 const int index = group == groupA ?
9199139f
AR
204 processId - metadata->theGroupAIdOffset :
205 metadata->theGroupASize + processId - metadata->theGroupBIdOffset;
f5591061 206 return readers->theReaders[index];
948230de 207}
fa61cefe
AR
208
209void
f5591061 210Ipc::FewToFewBiQueue::clearReaderSignal(const int remoteProcessId)
fa61cefe 211{
f5591061
DK
212 QueueReader &localReader = reader(theLocalGroup, theLocalProcessId);
213 debugs(54, 7, HERE << "reader: " << localReader.id);
fa61cefe 214
f5591061
DK
215 Must(validProcessId(remoteGroup(), remoteProcessId));
216 localReader.clearSignal();
fa61cefe
AR
217
218 // we got a hint; we could reposition iteration to try popping from the
f5591061 219 // remoteProcessId queue first; but it does not seem to help much and might
fa61cefe 220 // introduce some bias so we do not do that for now:
f5591061 221 // theLastPopProcessId = remoteProcessId;
68353d5a
DK
222}
223
f5591061 224Ipc::FewToFewBiQueue::Metadata::Metadata(const int aGroupASize, const int aGroupAIdOffset, const int aGroupBSize, const int aGroupBIdOffset):
9199139f
AR
225 theGroupASize(aGroupASize), theGroupAIdOffset(aGroupAIdOffset),
226 theGroupBSize(aGroupBSize), theGroupBIdOffset(aGroupBIdOffset)
f5591061
DK
227{
228 Must(theGroupASize > 0);
229 Must(theGroupBSize > 0);
230}
231
232Ipc::FewToFewBiQueue::Owner::Owner(const String &id, const int groupASize, const int groupAIdOffset, const int groupBSize, const int groupBIdOffset, const unsigned int maxItemSize, const int capacity):
9199139f
AR
233 metadataOwner(shm_new(Metadata)(MetadataId(id).termedBuf(), groupASize, groupAIdOffset, groupBSize, groupBIdOffset)),
234 queuesOwner(shm_new(OneToOneUniQueues)(QueuesId(id).termedBuf(), groupASize*groupBSize*2, maxItemSize, capacity)),
235 readersOwner(shm_new(QueueReaders)(ReadersId(id).termedBuf(), groupASize+groupBSize))
68353d5a 236{
68353d5a
DK
237}
238
f5591061 239Ipc::FewToFewBiQueue::Owner::~Owner()
68353d5a 240{
f5591061
DK
241 delete metadataOwner;
242 delete queuesOwner;
68353d5a 243 delete readersOwner;
fa61cefe 244}