6 #ifndef SQUID_IPC_QUEUE_H
7 #define SQUID_IPC_QUEUE_H
10 #include "base/InstanceId.h"
11 #include "ipc/AtomicWord.h"
12 #include "ipc/mem/Segment.h"
17 /// State of the reading end of a queue (i.e., of the code calling pop()).
18 /// Multiple queues attached to one reader share this state.
21 QueueReader(); // the initial state is "blocked without a signal"
23 /// whether the reader is waiting for a notification signal
24 bool blocked() const { return popBlocked
== 1; }
26 /// marks the reader as blocked, waiting for a notification signal
27 void block() { popBlocked
.swap_if(0, 1); }
29 /// removes the block() effects
30 void unblock() { popBlocked
.swap_if(1, 0); }
32 /// if reader is blocked and not notified, marks the notification signal
33 /// as sent and not received, returning true; otherwise, returns false
34 bool raiseSignal() { return blocked() && popSignal
.swap_if(0,1); }
36 /// marks sent reader notification as received (also removes pop blocking)
37 void clearSignal() { unblock(); popSignal
.swap_if(1,0); }
40 AtomicWord popBlocked
; ///< whether the reader is blocked on pop()
41 AtomicWord popSignal
; ///< whether writer has sent and reader has not received notification
44 /// unique ID for debugging which reader is used (works across processes)
45 const InstanceId
<QueueReader
> id
;
50 * Lockless fixed-capacity queue for a single writer and a single reader.
52 * If the queue is empty, the reader is considered "blocked" and needs
53 * an out-of-band notification message to notice the next pushed item.
55 * Current implementation assumes that the writer cannot get blocked: if the
56 * queue is full, the writer will just not push and come back later (with a
57 * different value). We can add support for blocked writers if needed.
59 class OneToOneUniQueue
{
61 // pop() and push() exceptions; TODO: use TextException instead
63 class ItemTooLarge
{};
65 OneToOneUniQueue(const String
&id
, const unsigned int maxItemSize
, const int capacity
);
66 OneToOneUniQueue(const String
&id
);
68 unsigned int maxItemSize() const { return shared
->theMaxItemSize
; }
69 int size() const { return shared
->theSize
; }
70 int capacity() const { return shared
->theCapacity
; }
72 bool empty() const { return !shared
->theSize
; }
73 bool full() const { return shared
->theSize
== shared
->theCapacity
; }
75 static int Bytes2Items(const unsigned int maxItemSize
, int size
);
76 static int Items2Bytes(const unsigned int maxItemSize
, const int size
);
78 /// returns true iff the value was set; [un]blocks the reader as needed
79 template<class Value
> bool pop(Value
&value
);
81 /// returns true iff the caller must notify the reader of the pushed item
82 template<class Value
> bool push(const Value
&value
);
84 QueueReader
&reader();
85 void reader(QueueReader
*aReader
);
89 Shared(const unsigned int aMaxItemSize
, const int aCapacity
);
91 unsigned int theIn
; ///< input index, used only in push()
92 unsigned int theOut
; ///< output index, used only in pop()
94 AtomicWord theSize
; ///< number of items in the queue
95 const unsigned int theMaxItemSize
; ///< maximum item size
96 const int theCapacity
; ///< maximum number of items, i.e. theBuffer size
101 Ipc::Mem::Segment shm
; ///< shared memory segment
102 Shared
*shared
; ///< pointer to shared memory
103 QueueReader
*reader_
; ///< the state of the code popping from this queue
106 /// Lockless fixed-capacity bidirectional queue for two processes.
107 class OneToOneBiQueue
{
109 typedef OneToOneUniQueue::Full Full
;
110 typedef OneToOneUniQueue::ItemTooLarge ItemTooLarge
;
112 /// Create a new shared queue.
113 OneToOneBiQueue(const String
&id
, const unsigned int maxItemSize
, const int capacity
);
114 OneToOneBiQueue(const String
&id
); ///< Attach to existing shared queue.
116 void readers(QueueReader
*r1
, QueueReader
*r2
);
117 void clearReaderSignal();
119 /* wrappers to call the right OneToOneUniQueue method for this process */
120 template<class Value
> bool pop(Value
&value
) { return popQueue
->pop(value
); }
121 template<class Value
> bool push(const Value
&value
) { return pushQueue
->push(value
); }
124 OneToOneUniQueue
*const popQueue
; ///< queue to pop from for this process
125 OneToOneUniQueue
*const pushQueue
; ///< queue to push to for this process
129 * Lockless fixed-capacity bidirectional queue for a limited number
130 * of processes. Implements a star topology: Many worker processes
131 * communicate with the one central process. The central process uses
132 * a FewToOneBiQueue object, while workers use OneToOneBiQueue objects
133 * created with the Attach() method. Each worker has a unique integer
134 * ID in [1, workerCount] range.
136 class FewToOneBiQueue
{
138 typedef OneToOneBiQueue::Full Full
;
139 typedef OneToOneBiQueue::ItemTooLarge ItemTooLarge
;
141 FewToOneBiQueue(const String
&id
, const int aWorkerCount
, const unsigned int maxItemSize
, const int capacity
);
142 static OneToOneBiQueue
*Attach(const String
&id
, const int workerId
);
145 bool validWorkerId(const int workerId
) const;
146 int workerCount() const { return theWorkerCount
; }
148 /// clears the reader notification received by the disker from worker
149 void clearReaderSignal(int workerId
);
151 /// picks a worker and calls OneToOneUniQueue::pop() using its queue
152 template <class Value
> bool pop(int &workerId
, Value
&value
);
154 /// calls OneToOneUniQueue::push() using the given worker queue
155 template <class Value
> bool push(const int workerId
, const Value
&value
);
157 //private: XXX: make private by moving pop/push debugging into pop/push
158 int theLastPopWorker
; ///< the ID of the last worker we tried to pop() from
159 Vector
<OneToOneBiQueue
*> biQueues
; ///< worker queues indexed by worker ID
160 const int theWorkerCount
; ///< the total number of workers
162 Ipc::Mem::Segment shm
; ///< shared memory segment to store the reader
163 QueueReader
*reader
; ///< the state of the code popping from all biQueues
165 enum { WorkerIdOffset
= 1 }; ///< worker ID offset, always 1 for now
171 template <class Value
>
173 OneToOneUniQueue::pop(Value
&value
)
175 if (sizeof(value
) > shared
->theMaxItemSize
)
176 throw ItemTooLarge();
178 // A writer might push between the empty test and block() below, so we do
179 // not return false right after calling block(), but test again.
182 // A writer might push between the empty test and block() below,
183 // so we must test again as such a writer will not signal us.
189 const unsigned int pos
=
190 (shared
->theOut
++ % shared
->theCapacity
) * shared
->theMaxItemSize
;
191 memcpy(&value
, shared
->theBuffer
+ pos
, sizeof(value
));
197 template <class Value
>
199 OneToOneUniQueue::push(const Value
&value
)
201 if (sizeof(value
) > shared
->theMaxItemSize
)
202 throw ItemTooLarge();
207 const bool wasEmpty
= empty();
208 const unsigned int pos
=
209 shared
->theIn
++ % shared
->theCapacity
* shared
->theMaxItemSize
;
210 memcpy(shared
->theBuffer
+ pos
, &value
, sizeof(value
));
213 return wasEmpty
&& reader().raiseSignal();
219 template <class Value
>
221 FewToOneBiQueue::pop(int &workerId
, Value
&value
)
223 // iterate all workers, starting after the one we visited last
224 for (int i
= 0; i
< theWorkerCount
; ++i
) {
225 theLastPopWorker
= (theLastPopWorker
+ 1) % theWorkerCount
;
226 if (biQueues
[theLastPopWorker
]->pop(value
)) {
227 workerId
= theLastPopWorker
+ WorkerIdOffset
;
231 return false; // no worker had anything to pop
234 template <class Value
>
236 FewToOneBiQueue::push(const int workerId
, const Value
&value
)
238 assert(validWorkerId(workerId
));
239 return biQueues
[workerId
- WorkerIdOffset
]->push(value
);
242 #endif // SQUID_IPC_QUEUE_H