]> git.ipfire.org Git - thirdparty/squid.git/blob - src/ipc/Queue.cc
Convert most Segment::mem() calls to reserve().
[thirdparty/squid.git] / src / ipc / Queue.cc
1 /*
2 * $Id$
3 *
4 * DEBUG: section 54 Interprocess Communication
5 *
6 */
7
8 #include "config.h"
9 #include "base/TextException.h"
10 #include "Debug.h"
11 #include "globals.h"
12 #include "ipc/Queue.h"
13
14 /// constructs shared segment ID from parent queue ID and child queue index
15 static String
16 QueueId(String id, const int idx)
17 {
18 id.append("__");
19 id.append(xitoa(idx));
20 return id;
21 }
22
23 /// constructs QueueReader ID from parent queue ID
24 static String
25 ReaderId(String id)
26 {
27 id.append("__readers");
28 return id;
29 }
30
31
32 /* QueueReader */
33
34 InstanceIdDefinitions(QueueReader, "ipcQR");
35
36 QueueReader::QueueReader(): popBlocked(1), popSignal(0)
37 {
38 debugs(54, 7, HERE << "constructed " << id);
39 }
40
41
42 // OneToOneUniQueue
43
44 OneToOneUniQueue::OneToOneUniQueue(const String &id, const unsigned int maxItemSize, const int capacity):
45 shm(id.termedBuf()), reader_(NULL)
46 {
47 const int sharedSize = Items2Bytes(maxItemSize, capacity);
48 shm.create(sharedSize);
49 shared = new (shm.reserve(sharedSize)) Shared(maxItemSize, capacity);
50 }
51
52 OneToOneUniQueue::OneToOneUniQueue(const String &id): shm(id.termedBuf()),
53 reader_(NULL)
54 {
55 shm.open();
56 shared = reinterpret_cast<Shared *>(shm.mem());
57 assert(shared);
58 const int mySharedSize =
59 Items2Bytes(shared->theMaxItemSize, shared->theCapacity);
60 assert(shared == reinterpret_cast<Shared *>(shm.reserve(mySharedSize)));
61 }
62
63 void
64 OneToOneUniQueue::reader(QueueReader *aReader)
65 {
66 Must(!reader_ && aReader);
67 reader_ = aReader;
68 }
69
70 int
71 OneToOneUniQueue::Bytes2Items(const unsigned int maxItemSize, int size)
72 {
73 assert(maxItemSize > 0);
74 size -= sizeof(Shared);
75 return size >= 0 ? size / maxItemSize : 0;
76 }
77
78 int
79 OneToOneUniQueue::Items2Bytes(const unsigned int maxItemSize, const int size)
80 {
81 assert(size >= 0);
82 return sizeof(Shared) + maxItemSize * size;
83 }
84
85 OneToOneUniQueue::Shared::Shared(const unsigned int aMaxItemSize, const int aCapacity):
86 theIn(0), theOut(0), theSize(0), theMaxItemSize(aMaxItemSize),
87 theCapacity(aCapacity)
88 {
89 }
90
91 QueueReader &
92 OneToOneUniQueue::reader()
93 {
94 Must(reader_);
95 return *reader_;
96 }
97
98
99 // OneToOneBiQueue
100
101 OneToOneBiQueue::OneToOneBiQueue(const String &id, const unsigned int maxItemSize, const int capacity):
102 popQueue(new OneToOneUniQueue(QueueId(id, 1), maxItemSize, capacity)),
103 pushQueue(new OneToOneUniQueue(QueueId(id, 2), maxItemSize, capacity))
104 {
105 }
106
107 OneToOneBiQueue::OneToOneBiQueue(const String &id):
108 popQueue(new OneToOneUniQueue(QueueId(id, 2))),
109 pushQueue(new OneToOneUniQueue(QueueId(id, 1)))
110 {
111 }
112
113 void
114 OneToOneBiQueue::readers(QueueReader *r1, QueueReader *r2)
115 {
116 popQueue->reader(r1);
117 pushQueue->reader(r2);
118 }
119
120 void
121 OneToOneBiQueue::clearReaderSignal()
122 {
123 debugs(54, 7, HERE << "reader: " << &popQueue->reader());
124 popQueue->reader().clearSignal();
125 }
126
127
128 // FewToOneBiQueue
129
130 FewToOneBiQueue::FewToOneBiQueue(const String &id, const int aWorkerCount, const unsigned int maxItemSize, const int capacity):
131 theLastPopWorker(0), theWorkerCount(aWorkerCount),
132 shm(ReaderId(id).termedBuf()),
133 reader(NULL)
134 {
135 // create a new segment for the local and remote queue readers
136 // TODO: all our queues and readers should use a single segment
137 shm.create((theWorkerCount+1)*sizeof(QueueReader));
138 reader = new (shm.reserve(sizeof(QueueReader))) QueueReader;
139 debugs(54, 7, HERE << "disker " << id << " reader: " << reader->id);
140
141 assert(theWorkerCount >= 0);
142 biQueues.reserve(theWorkerCount);
143 for (int i = 0; i < theWorkerCount; ++i) {
144 OneToOneBiQueue *const biQueue =
145 new OneToOneBiQueue(QueueId(id, i), maxItemSize, capacity);
146 QueueReader *remoteReader =
147 new (shm.reserve(sizeof(QueueReader))) QueueReader;
148 biQueue->readers(reader, remoteReader);
149 biQueues.push_back(biQueue);
150 }
151 }
152
153 OneToOneBiQueue *
154 FewToOneBiQueue::Attach(const String &id, const int workerId)
155 {
156 // XXX: remove this leak. By refcounting Ipc::Mem::Segments? By creating a global FewToOneBiQueue for each worker?
157 Ipc::Mem::Segment *shmPtr = new Ipc::Mem::Segment(ReaderId(id).termedBuf());
158
159 Ipc::Mem::Segment &shm = *shmPtr;
160 shm.open();
161 assert(shm.size() >= static_cast<off_t>((1 + workerId+1)*sizeof(QueueReader)));
162 QueueReader *readers = reinterpret_cast<QueueReader*>(shm.mem());
163 QueueReader *remoteReader = &readers[0];
164 debugs(54, 7, HERE << "disker " << id << " reader: " << remoteReader->id);
165 QueueReader *localReader = &readers[workerId+1];
166 debugs(54, 7, HERE << "local " << id << " reader: " << localReader->id);
167
168 OneToOneBiQueue *const biQueue =
169 new OneToOneBiQueue(QueueId(id, workerId));
170 biQueue->readers(localReader, remoteReader);
171 return biQueue;
172 }
173
174 FewToOneBiQueue::~FewToOneBiQueue()
175 {
176 for (int i = 0; i < theWorkerCount; ++i)
177 delete biQueues[i];
178 }
179
180 bool FewToOneBiQueue::validWorkerId(const int workerId) const
181 {
182 return 0 <= workerId && workerId < theWorkerCount;
183 }
184
185 void
186 FewToOneBiQueue::clearReaderSignal(int workerId)
187 {
188 debugs(54, 7, HERE << "reader: " << reader->id);
189
190 assert(validWorkerId(workerId));
191 reader->clearSignal();
192
193 // we got a hint; we could reposition iteration to try popping from the
194 // workerId queue first; but it does not seem to help much and might
195 // introduce some bias so we do not do that for now:
196 // theLastPopWorker = (workerId + theWorkerCount - 1) % theWorkerCount;
197 }