]> git.ipfire.org Git - thirdparty/squid.git/blob - src/comm/ModKqueue.cc
SourceFormat Enforcement
[thirdparty/squid.git] / src / comm / ModKqueue.cc
1 /*
2 * Copyright (C) 1996-2015 The Squid Software Foundation and contributors
3 *
4 * Squid software is distributed under GPLv2+ license and includes
5 * contributions from numerous individuals and organizations.
6 * Please see the COPYING and CONTRIBUTORS files for details.
7 */
8
9 /* DEBUG: section 05 Socket Functions */
10
11 /*
12 * This code was originally written by Benno Rice and hacked on quite
13 * a bit by Adrian. Adrian then took it to the hybrid-ircd project to use
14 * in their new IO subsystem. After a year of modifications and some
15 * rather interesting changes (event aggregation) it's back in Squid.
16 * Thanks to the ircd-hybrid guys.
17 */
18
19 /*
20 * XXX Currently not implemented / supported by this module XXX
21 *
22 * - delay pools
23 * - deferred reads
24 * - flags.read_pending
25 *
26 * So, it's not entirely useful in a production setup since if a read
27 * is meant to be deferred it isn't (we're not even throwing the event
28 * away here). Eventually the rest of the code will be rewritten
29 * so deferred reads aren't required.
30 * -- adrian
31 */
32 #include "squid.h"
33
34 #if USE_KQUEUE
35 #include "comm/Loops.h"
36 #include "fde.h"
37 #include "globals.h"
38 #include "SquidTime.h"
39 #include "StatCounters.h"
40 #include "Store.h"
41
42 #include <cerrno>
43 #if HAVE_SYS_EVENT_H
44 #include <sys/event.h>
45 #endif
46
/* Capacity of the event array that Comm::DoSelect() hands to each kevent() call. */
47 #define KE_LENGTH 128
48
/*
 * Fallback for systems whose <sys/event.h> lacks EV_SET (it was not added
 * until FreeBSD 4.3). Fills every field of the kevent pointed to by kev_.
 */
#ifndef EV_SET
#define EV_SET(kev_, ident_, filter_, flags_, fflags_, data_, udata_) \
    do { \
        (kev_)->ident = (ident_); \
        (kev_)->filter = (filter_); \
        (kev_)->flags = (flags_); \
        (kev_)->fflags = (fflags_); \
        (kev_)->data = (data_); \
        (kev_)->udata = (udata_); \
    } while(0)
#endif
61
62 static void kq_update_events(int, short, PF *);
63 static int kq;
64
65 static struct timespec zero_timespec;
66
67 static struct kevent *kqlst; /* kevent buffer */
68 static int kqmax; /* max structs to buffer */
69 static int kqoff; /* offset into the buffer */
70 static int max_poll_time = 1000;
71
72 static void commKQueueRegisterWithCacheManager(void);
73
74 /* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
75 /* Private functions */
76
77 void
78 kq_update_events(int fd, short filter, PF * handler)
79 {
80 PF *cur_handler;
81 int kep_flags;
82
83 switch (filter) {
84
85 case EVFILT_READ:
86 cur_handler = fd_table[fd].read_handler;
87 break;
88
89 case EVFILT_WRITE:
90 cur_handler = fd_table[fd].write_handler;
91 break;
92
93 default:
94 /* XXX bad! -- adrian */
95 return;
96 break;
97 }
98
99 if ((cur_handler == NULL && handler != NULL)
100 || (cur_handler != NULL && handler == NULL)) {
101
102 struct kevent *kep;
103
104 kep = kqlst + kqoff;
105
106 if (handler != NULL) {
107 kep_flags = (EV_ADD | EV_ONESHOT);
108 } else {
109 kep_flags = EV_DELETE;
110 }
111
112 EV_SET(kep, (uintptr_t) fd, filter, kep_flags, 0, 0, 0);
113
114 /* Check if we've used the last one. If we have then submit them all */
115 if (kqoff == kqmax - 1) {
116 int ret;
117
118 ret = kevent(kq, kqlst, kqmax, NULL, 0, &zero_timespec);
119 /* jdc -- someone needs to do error checking... */
120
121 if (ret == -1) {
122 perror("kq_update_events(): kevent()");
123 return;
124 }
125
126 kqoff = 0;
127 } else {
128 ++kqoff;
129 }
130 }
131 }
132
133 /* XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX */
134 /* Public functions */
135
136 /*
137 * comm_select_init
138 *
139 * This is a needed exported function which will be called to initialise
140 * the network loop code.
141 */
142 void
143 Comm::SelectLoopInit(void)
144 {
145 kq = kqueue();
146
147 if (kq < 0) {
148 fatal("comm_select_init: Couldn't open kqueue fd!\n");
149 }
150
151 kqmax = getdtablesize();
152
153 kqlst = (struct kevent *)xmalloc(sizeof(*kqlst) * kqmax);
154 zero_timespec.tv_sec = 0;
155 zero_timespec.tv_nsec = 0;
156
157 commKQueueRegisterWithCacheManager();
158 }
159
160 /*
161 * comm_setselect
162 *
163 * This is a needed exported function which will be called to register
164 * and deregister interest in a pending IO state for a given FD.
165 */
166 void
167 Comm::SetSelect(int fd, unsigned int type, PF * handler, void *client_data, time_t timeout)
168 {
169 fde *F = &fd_table[fd];
170 assert(fd >= 0);
171 assert(F->flags.open);
172 debugs(5, 5, HERE << "FD " << fd << ", type=" << type <<
173 ", handler=" << handler << ", client_data=" << client_data <<
174 ", timeout=" << timeout);
175
176 if (type & COMM_SELECT_READ) {
177 if (F->flags.read_pending)
178 kq_update_events(fd, EVFILT_WRITE, handler);
179
180 kq_update_events(fd, EVFILT_READ, handler);
181
182 F->read_handler = handler;
183 F->read_data = client_data;
184 }
185
186 if (type & COMM_SELECT_WRITE) {
187 kq_update_events(fd, EVFILT_WRITE, handler);
188 F->write_handler = handler;
189 F->write_data = client_data;
190 }
191
192 if (timeout)
193 F->timeout = squid_curtime + timeout;
194
195 }
196
197 void
198 Comm::ResetSelect(int fd)
199 {
200 fde *F = &fd_table[fd];
201 if (F->read_handler) {
202 kq_update_events(fd, EVFILT_READ, (PF *)1);
203 }
204 if (F->write_handler) {
205 kq_update_events(fd, EVFILT_WRITE, (PF *)1);
206 }
207 }
208
209 /*
210 * Check all connections for new connections and input data that is to be
211 * processed. Also check for connections with data queued and whether we can
212 * write it out.
213 */
214
215 /*
216 * comm_select
217 *
218 * Called to do the new-style IO, courtesy of of squid (like most of this
219 * new IO code). This routine handles the stuff we've hidden in
220 * comm_setselect and fd_table[] and calls callbacks for IO ready
221 * events.
222 */
223
224 Comm::Flag
225 Comm::DoSelect(int msec)
226 {
227 int num, i;
228
229 static struct kevent ke[KE_LENGTH];
230
231 struct timespec poll_time;
232
233 if (msec > max_poll_time)
234 msec = max_poll_time;
235
236 poll_time.tv_sec = msec / 1000;
237
238 poll_time.tv_nsec = (msec % 1000) * 1000000;
239
240 for (;;) {
241 num = kevent(kq, kqlst, kqoff, ke, KE_LENGTH, &poll_time);
242 ++statCounter.select_loops;
243 kqoff = 0;
244
245 if (num >= 0)
246 break;
247
248 if (ignoreErrno(errno))
249 break;
250
251 getCurrentTime();
252
253 return Comm::COMM_ERROR;
254
255 /* NOTREACHED */
256 }
257
258 getCurrentTime();
259
260 if (num == 0)
261 return Comm::OK; /* No error.. */
262
263 for (i = 0; i < num; ++i) {
264 int fd = (int) ke[i].ident;
265 PF *hdl = NULL;
266 fde *F = &fd_table[fd];
267
268 if (ke[i].flags & EV_ERROR) {
269 errno = ke[i].data;
270 /* XXX error == bad! -- adrian */
271 continue; /* XXX! */
272 }
273
274 if (ke[i].filter == EVFILT_READ || F->flags.read_pending) {
275 if ((hdl = F->read_handler) != NULL) {
276 F->read_handler = NULL;
277 F->flags.read_pending = 0;
278 hdl(fd, F->read_data);
279 }
280 }
281
282 if (ke[i].filter == EVFILT_WRITE) {
283 if ((hdl = F->write_handler) != NULL) {
284 F->write_handler = NULL;
285 hdl(fd, F->write_data);
286 }
287 }
288
289 if (ke[i].filter != EVFILT_WRITE && ke[i].filter != EVFILT_READ) {
290 /* Bad! -- adrian */
291 debugs(5, DBG_IMPORTANT, "comm_select: kevent returned " << ke[i].filter << "!");
292 }
293 }
294
295 return Comm::OK;
296 }
297
298 void
299 Comm::QuickPollRequired(void)
300 {
301 max_poll_time = 10;
302 }
303
/*
 * Cache manager registration hook called from Comm::SelectLoopInit().
 * Intentionally empty: this module registers nothing.
 */
static void
commKQueueRegisterWithCacheManager(void)
{
    /* nothing to register */
}
308
309 #endif /* USE_KQUEUE */
310