#if defined(WORK_FORK)
typedef struct blocking_child_tag {
- int reusable;
- int pid;
- int req_write_pipe; /* parent */
- int resp_read_pipe;
- void * resp_read_ctx;
- int req_read_pipe; /* child */
- int resp_write_pipe;
- int ispipe;
+ int reusable;
+ int pid;
+ int req_write_pipe; /* parent */
+ int resp_read_pipe;
+ void * resp_read_ctx;
+ int req_read_pipe; /* child */
+ int resp_write_pipe;
+ int ispipe;
+ volatile u_int resp_ready_seen; /* signal/scan */
+ volatile u_int resp_ready_done; /* consumer/mainloop */
} blocking_child;
#elif defined(WORK_THREAD)
typedef struct blocking_child_tag {
-/*
- * blocking workitems and blocking_responses are dynamically-sized
- * one-dimensional arrays of pointers to blocking worker requests and
- * responses.
- *
- * IMPORTANT: This structure is shared between threads, and all access
- * that is not atomic (especially queue operations) must hold the
- * 'accesslock' semaphore to avoid data races.
- *
- * The resource management (thread/semaphore creation/destruction)
- * functions and functions just testing a handle are safe because these
- * are only changed by the main thread when no worker is running on the
- * same data structure.
- */
+ /*
+ * blocking workitems and blocking_responses are
+ * dynamically-sized one-dimensional arrays of pointers to
+ * blocking worker requests and responses.
+ *
+ * IMPORTANT: This structure is shared between threads, and all
+ * access that is not atomic (especially queue operations) must
+ * hold the 'accesslock' semaphore to avoid data races.
+ *
+ * The resource management (thread/semaphore
+ * creation/destruction) functions and functions just testing a
+ * handle are safe because these are only changed by the main
+ * thread when no worker is running on the same data structure.
+ */
int reusable;
sem_ref accesslock; /* shared access lock */
thr_ref thread_ref; /* thread 'handle' */
int resp_write_pipe; /* child */
int ispipe;
void * resp_read_ctx; /* child */
+ volatile u_int resp_ready_seen; /* signal/scan */
+ volatile u_int resp_ready_done; /* consumer/mainloop */
#else
sem_ref responses_pending; /* signalling */
#endif
#endif /* WORK_THREAD */
+/* we need some global tag to indicate any blocking child may be ready: */
+extern volatile u_int blocking_child_ready_seen;/* signal/scan */
+extern volatile u_int blocking_child_ready_done;/* consumer/mainloop */
+
extern blocking_child ** blocking_children;
extern size_t blocking_children_alloc;
extern int worker_per_query; /* boolean */
blocking_pipe_header *, size_t,
const blocking_pipe_header *);
extern void process_blocking_resp(blocking_child *);
+extern void harvest_blocking_responses(void);
extern int send_blocking_req_internal(blocking_child *,
blocking_pipe_header *,
void *);
# endif
#endif
+#if defined(HAVE_SIGNALED_IO) && defined(DEBUG_TIMING)
+# undef DEBUG_TIMING
+#endif
/*
* setsockopt does not always have the same arg declaration
const sockaddr_u *, const sockaddr_u *);
static int create_sockets (u_short);
static SOCKET open_socket (sockaddr_u *, int, int, endpt *);
-static char * fdbits (int, fd_set *);
static void set_reuseaddr (int);
static isc_boolean_t socket_broadcast_enable (struct interface *, SOCKET, sockaddr_u *);
+
+#if !defined(HAVE_IO_COMPLETION_PORT) && !defined(HAVE_SIGNALED_IO)
+static char * fdbits (int, const fd_set *);
+#endif
#ifdef OS_MISSES_SPECIFIC_ROUTE_UPDATES
static isc_boolean_t socket_broadcast_disable (struct interface *, sockaddr_u *);
#endif
#if !defined(HAVE_IO_COMPLETION_PORT)
static inline int read_network_packet (SOCKET, struct interface *, l_fp);
static void ntpd_addremove_io_fd (int, int, int);
-static input_handler_t input_handler;
+static void input_handler_scan (const l_fp*, const fd_set*);
+static int/*BOOL*/ sanitize_fdset (int errc);
#ifdef REFCLOCK
static inline int read_refclock_packet (SOCKET, struct refclockio *, l_fp);
#endif
+#ifdef HAVE_SIGNALED_IO
+static void input_handler (l_fp*);
+#endif
#endif
-
#ifndef HAVE_IO_COMPLETION_PORT
void
maintain_activefds(
addremove_io_fd = &ntpd_addremove_io_fd;
#endif
-#ifdef SYS_WINNT
+#if defined(SYS_WINNT)
init_io_completion_port();
-#endif
-
-#if defined(HAVE_SIGNALED_IO)
+#elif defined(HAVE_SIGNALED_IO)
(void) set_signal(input_handler);
#endif
}
UNUSED_ARG(is_pipe);
#ifdef HAVE_SIGNALED_IO
- init_socket_sig(fd);
+ if (!remove_it)
+ init_socket_sig(fd);
#endif /* not HAVE_SIGNALED_IO */
maintain_activefds(fd, remove_it);
{
return (broadcast_client_enabled);
}
+
/*
* Check to see if the address is a multicast address
*/
}
-#if !defined(HAVE_IO_COMPLETION_PORT)
+#if !defined(HAVE_IO_COMPLETION_PORT) && !defined(HAVE_SIGNALED_IO)
/*
* fdbits - generate ascii representation of fd_set (FAU debug support)
* HFDF format - highest fd first.
*/
static char *
fdbits(
- int count,
- fd_set *set
+ int count,
+ const fd_set* set
)
{
static char buffer[256];
return buffer;
}
-
+#endif
#ifdef REFCLOCK
/*
* and - lacking a hardware reference clock - I have
* yet to learn about anything else that is.
*/
+ ++handler_calls;
rdfdes = activefds;
# if !defined(VMS) && !defined(SYS_VXWORKS)
nfound = select(maxactivefd + 1, &rdfdes, NULL,
/* make select() wake up after one second */
{
struct timeval t1;
-
- t1.tv_sec = 1;
+ t1.tv_sec = 1;
t1.tv_usec = 0;
nfound = select(maxactivefd + 1,
&rdfdes, NULL, NULL,
&t1);
}
# endif /* VMS, VxWorks */
+ if (nfound < 0 && sanitize_fdset(errno)) {
+ struct timeval t1;
+ t1.tv_sec = 0;
+ t1.tv_usec = 0;
+ rdfdes = activefds;
+ nfound = select(maxactivefd + 1,
+ &rdfdes, NULL, NULL,
+ &t1);
+ }
+
if (nfound > 0) {
l_fp ts;
get_systime(&ts);
- input_handler(&ts);
+ input_handler_scan(&ts, &rdfdes);
} else if (nfound == -1 && errno != EINTR) {
msyslog(LOG_ERR, "select() error: %m");
}
# endif /* HAVE_SIGNALED_IO */
}
+#ifdef HAVE_SIGNALED_IO
/*
* input_handler - receive packets asynchronously
+ *
+ * ALWAYS IN SIGNAL HANDLER CONTEXT -- only async-safe functions allowed!
*/
-static void
+static RETSIGTYPE
input_handler(
l_fp * cts
)
{
- int buflen;
int n;
- u_int idx;
- int doing;
- SOCKET fd;
- blocking_child *c;
struct timeval tvzero;
- l_fp ts; /* Timestamp at BOselect() gob */
-#ifdef DEBUG_TIMING
- l_fp ts_e; /* Timestamp at EOselect() gob */
-#endif
fd_set fds;
- size_t select_count;
- endpt * ep;
-#ifdef REFCLOCK
- struct refclockio *rp;
- int saved_errno;
- const char * clk;
-#endif
-#ifdef HAS_ROUTING_SOCKET
- struct asyncio_reader * asyncio_reader;
- struct asyncio_reader * next_asyncio_reader;
-#endif
-
- handler_calls++;
- select_count = 0;
-
- /*
- * If we have something to do, freeze a timestamp.
- * See below for the other cases (nothing left to do or error)
- */
- ts = *cts;
+
+ ++handler_calls;
/*
* Do a poll to see who has data
tvzero.tv_sec = tvzero.tv_usec = 0;
n = select(maxactivefd + 1, &fds, NULL, NULL, &tvzero);
+ if (n < 0 && sanitize_fdset(errno)) {
+ fds = activefds;
+ tvzero.tv_sec = tvzero.tv_usec = 0;
+ n = select(maxactivefd + 1, &fds, NULL, NULL, &tvzero);
+ }
+ if (n > 0)
+ input_handler_scan(cts, &fds);
+}
+#endif /* HAVE_SIGNALED_IO */
+
+
+/*
+ * Try to sanitize the global FD set
+ *
+ * SIGNAL HANDLER CONTEXT if HAVE_SIGNALED_IO, ordinary userspace otherwise
+ */
+static int/*BOOL*/
+sanitize_fdset(
+ int errc
+ )
+{
+ int j, b, maxscan;
+# ifndef HAVE_SIGNALED_IO
/*
- * If there are no packets waiting just return
+ * extended FAU debugging output
*/
- if (n < 0) {
- int err = errno;
- int j, b, prior;
- /*
- * extended FAU debugging output
- */
- if (err != EINTR)
- msyslog(LOG_ERR,
- "select(%d, %s, 0L, 0L, &0.0) error: %m",
- maxactivefd + 1,
- fdbits(maxactivefd, &activefds));
- if (err != EBADF)
- goto ih_return;
- for (j = 0, prior = 0; j <= maxactivefd; j++) {
- if (FD_ISSET(j, &activefds)) {
- if (-1 != read(j, &b, 0)) {
- prior = j;
- continue;
- }
- msyslog(LOG_ERR,
- "Removing bad file descriptor %d from select set",
- j);
- FD_CLR(j, &activefds);
- if (j == maxactivefd)
- maxactivefd = prior;
+ if (errc != EINTR) {
+ msyslog(LOG_ERR,
+ "select(%d, %s, 0L, 0L, &0.0) error: %m",
+ maxactivefd + 1,
+ fdbits(maxactivefd, &activefds));
+ }
+# endif
+
+ if (errc != EBADF)
+ return FALSE;
+
+ /* if we have oviously bad FDs, try to sanitize the FD set. */
+ for (j = 0, maxscan = 0; j <= maxactivefd; j++) {
+ if (FD_ISSET(j, &activefds)) {
+ if (-1 != read(j, &b, 0)) {
+ maxscan = j;
+ continue;
}
+# ifndef HAVE_SIGNALED_IO
+ msyslog(LOG_ERR,
+ "Removing bad file descriptor %d from select set",
+ j);
+# endif
+ FD_CLR(j, &activefds);
}
- goto ih_return;
}
- else if (n == 0)
- goto ih_return;
+ if (maxactivefd != maxscan)
+ maxactivefd = maxscan;
+ return TRUE;
+}
+
+/*
+ * scan the known FDs (clocks, servers, ...) for presence in a 'fd_set'.
+ *
+ * SIGNAL HANDLER CONTEXT if HAVE_SIGNALED_IO, ordinary userspace otherwise
+ */
+static void
+input_handler_scan(
+ const l_fp * cts,
+ const fd_set * pfds
+ )
+{
+ int buflen;
+ u_int idx;
+ int doing;
+ SOCKET fd;
+ blocking_child *c;
+ l_fp ts; /* Timestamp at BOselect() gob */
+
+#if defined(DEBUG_TIMING)
+ l_fp ts_e; /* Timestamp at EOselect() gob */
+#endif
+ endpt * ep;
+#ifdef REFCLOCK
+ struct refclockio *rp;
+ int saved_errno;
+ const char * clk;
+#endif
+#ifdef HAS_ROUTING_SOCKET
+ struct asyncio_reader * asyncio_reader;
+ struct asyncio_reader * next_asyncio_reader;
+#endif
++handler_pkts;
+ ts = *cts;
#ifdef REFCLOCK
/*
* Check out the reference clocks first, if any
*/
-
- if (refio != NULL) {
- for (rp = refio; rp != NULL; rp = rp->next) {
- fd = rp->fd;
-
- if (!FD_ISSET(fd, &fds))
- continue;
- ++select_count;
- buflen = read_refclock_packet(fd, rp, ts);
- /*
- * The first read must succeed after select()
- * indicates readability, or we've reached
- * a permanent EOF. http://bugs.ntp.org/1732
- * reported ntpd munching CPU after a USB GPS
- * was unplugged because select was indicating
- * EOF but ntpd didn't remove the descriptor
- * from the activefds set.
- */
- if (buflen < 0 && EAGAIN != errno) {
- saved_errno = errno;
- clk = refnumtoa(&rp->srcclock->srcadr);
- errno = saved_errno;
- msyslog(LOG_ERR, "%s read: %m", clk);
- maintain_activefds(fd, TRUE);
- } else if (0 == buflen) {
- clk = refnumtoa(&rp->srcclock->srcadr);
- msyslog(LOG_ERR, "%s read EOF", clk);
- maintain_activefds(fd, TRUE);
- } else {
- /* drain any remaining refclock input */
- do {
- buflen = read_refclock_packet(fd, rp, ts);
- } while (buflen > 0);
- }
+
+ for (rp = refio; rp != NULL; rp = rp->next) {
+ fd = rp->fd;
+
+ if (!FD_ISSET(fd, pfds))
+ continue;
+ buflen = read_refclock_packet(fd, rp, ts);
+ /*
+ * The first read must succeed after select() indicates
+ * readability, or we've reached a permanent EOF.
+ * http://bugs.ntp.org/1732 reported ntpd munching CPU
+ * after a USB GPS was unplugged because select was
+ * indicating EOF but ntpd didn't remove the descriptor
+ * from the activefds set.
+ */
+ if (buflen < 0 && EAGAIN != errno) {
+ saved_errno = errno;
+ clk = refnumtoa(&rp->srcclock->srcadr);
+ errno = saved_errno;
+ msyslog(LOG_ERR, "%s read: %m", clk);
+ maintain_activefds(fd, TRUE);
+ } else if (0 == buflen) {
+ clk = refnumtoa(&rp->srcclock->srcadr);
+ msyslog(LOG_ERR, "%s read EOF", clk);
+ maintain_activefds(fd, TRUE);
+ } else {
+ /* drain any remaining refclock input */
+ do {
+ buflen = read_refclock_packet(fd, rp, ts);
+ } while (buflen > 0);
}
}
#endif /* REFCLOCK */
}
if (fd < 0)
continue;
- if (FD_ISSET(fd, &fds))
+ if (FD_ISSET(fd, pfds))
do {
- ++select_count;
buflen = read_network_packet(
fd, ep, ts);
} while (buflen > 0);
while (asyncio_reader != NULL) {
/* callback may unlink and free asyncio_reader */
next_asyncio_reader = asyncio_reader->link;
- if (FD_ISSET(asyncio_reader->fd, &fds)) {
- ++select_count;
+ if (FD_ISSET(asyncio_reader->fd, pfds))
(*asyncio_reader->receiver)(asyncio_reader);
- }
asyncio_reader = next_asyncio_reader;
}
#endif /* HAS_ROUTING_SOCKET */
c = blocking_children[idx];
if (NULL == c || -1 == c->resp_read_pipe)
continue;
- if (FD_ISSET(c->resp_read_pipe, &fds)) {
- select_count++;
- process_blocking_resp(c);
+ if (FD_ISSET(c->resp_read_pipe, pfds)) {
+ ++c->resp_ready_seen;
+ ++blocking_child_ready_seen;
}
}
- /*
- * Done everything from that select.
- * If nothing to do, just return.
- * If an error occurred, complain and return.
- */
- if (select_count == 0) { /* We really had nothing to do */
-#ifdef DEBUG
- if (debug)
- msyslog(LOG_DEBUG, "input_handler: select() returned 0");
-#endif /* DEBUG */
- goto ih_return;
- }
/* We've done our work */
-#ifdef DEBUG_TIMING
+#if defined(DEBUG_TIMING)
get_systime(&ts_e);
/*
* (ts_e - ts) is the amount of time we spent
"input_handler: Processed a gob of fd's in %s msec",
lfptoms(&ts_e, 6));
#endif /* DEBUG_TIMING */
- /* We're done... */
- ih_return:
- return;
}
-#endif /* !HAVE_IO_COMPLETION_PORT */
/*