#include "sysincl.h"
+#include "array.h"
#include "ntp_io.h"
#include "ntp_core.h"
#include "ntp_sources.h"
struct sockaddr u;
};
+struct Message {
+ union sockaddr_in46 name;
+ struct iovec iov;
+ NTP_Receive_Buffer buf;
+ /* Aligned buffer for control messages */
+ struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)];
+};
+
+struct MessageHeader {
+ struct msghdr msg_hdr;
+ unsigned int msg_len;
+};
+
+#define MAX_RECV_MESSAGES 1
+
+/* Arrays of Message and MessageHeader */
+static ARR_Instance recv_messages;
+static ARR_Instance recv_headers;
+
/* The server/peer and client sockets for IPv4 and IPv6 */
static int server_sock_fd4;
static int client_sock_fd4;
}
/* ================================================== */
+
+static void
+prepare_buffers(unsigned int n)
+{
+ struct MessageHeader *hdr;
+ struct Message *msg;
+ unsigned int i;
+
+ for (i = 0; i < n; i++) {
+ msg = ARR_GetElement(recv_messages, i);
+ hdr = ARR_GetElement(recv_headers, i);
+
+ msg->iov.iov_base = &msg->buf;
+ msg->iov.iov_len = sizeof (msg->buf);
+ hdr->msg_hdr.msg_name = &msg->name;
+ hdr->msg_hdr.msg_namelen = sizeof (msg->name);
+ hdr->msg_hdr.msg_iov = &msg->iov;
+ hdr->msg_hdr.msg_iovlen = 1;
+ hdr->msg_hdr.msg_control = &msg->cmsgbuf;
+ hdr->msg_hdr.msg_controllen = sizeof (msg->cmsgbuf);
+ hdr->msg_hdr.msg_flags = 0;
+ hdr->msg_len = 0;
+ }
+}
+
+/* ================================================== */
+
void
NIO_Initialise(int family)
{
assert(!initialised);
initialised = 1;
+ recv_messages = ARR_CreateInstance(sizeof (struct Message));
+ ARR_SetSize(recv_messages, MAX_RECV_MESSAGES);
+ recv_headers = ARR_CreateInstance(sizeof (struct MessageHeader));
+ ARR_SetSize(recv_headers, MAX_RECV_MESSAGES);
+ prepare_buffers(MAX_RECV_MESSAGES);
+
server_port = CNF_GetNTPPort();
client_port = CNF_GetAcquisitionPort();
close_socket(server_sock_fd6);
server_sock_fd6 = client_sock_fd6 = INVALID_SOCK_FD;
#endif
+ ARR_DestroyInstance(recv_headers);
+ ARR_DestroyInstance(recv_messages);
initialised = 0;
}
/* ================================================== */
static void
-read_from_socket(int sock_fd, int event, void *anything)
+process_receive(struct msghdr *hdr, int length, int sock_fd)
{
- /* This should only be called when there is something
- to read, otherwise it will block. */
-
- int status;
- NTP_Receive_Buffer message;
- union sockaddr_in46 where_from;
- unsigned int flags = 0;
- struct timeval now;
- double now_err;
NTP_Remote_Address remote_addr;
NTP_Local_Address local_addr;
- struct cmsghdr cmsgbuf[CMSGBUF_SIZE / sizeof (struct cmsghdr)];
- struct msghdr msg;
- struct iovec iov;
struct cmsghdr *cmsg;
-
- assert(initialised);
+ struct timeval now;
+ double now_err;
SCH_GetLastEventTime(&now, &now_err, NULL);
- iov.iov_base = &message.ntp_pkt;
- iov.iov_len = sizeof(message);
- msg.msg_name = &where_from;
- msg.msg_namelen = sizeof(where_from);
- msg.msg_iov = &iov;
- msg.msg_iovlen = 1;
- msg.msg_control = (void *) cmsgbuf;
- msg.msg_controllen = sizeof(cmsgbuf);
- msg.msg_flags = 0;
-
- status = recvmsg(sock_fd, &msg, flags);
-
- /* Don't bother checking if read failed or why if it did. More
- likely than not, it will be connection refused, resulting from a
- previous sendto() directing a datagram at a port that is not
- listening (which appears to generate an ICMP response, and on
- some architectures e.g. Linux this is translated into an error
- reponse on a subsequent recvfrom). */
-
- if (status > 0) {
- if (msg.msg_namelen > sizeof (where_from))
- LOG_FATAL(LOGF_NtpIO, "Truncated source address");
+ if (hdr->msg_namelen > sizeof (union sockaddr_in46)) {
+ DEBUG_LOG(LOGF_NtpIO, "Truncated source address");
+ return;
+ }
- UTI_SockaddrToIPAndPort(&where_from.u, &remote_addr.ip_addr, &remote_addr.port);
+ UTI_SockaddrToIPAndPort((struct sockaddr *)hdr->msg_name,
+ &remote_addr.ip_addr, &remote_addr.port);
- local_addr.ip_addr.family = IPADDR_UNSPEC;
- local_addr.sock_fd = sock_fd;
+ local_addr.ip_addr.family = IPADDR_UNSPEC;
+ local_addr.sock_fd = sock_fd;
- for (cmsg = CMSG_FIRSTHDR(&msg); cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
+ for (cmsg = CMSG_FIRSTHDR(hdr); cmsg; cmsg = CMSG_NXTHDR(hdr, cmsg)) {
#ifdef HAVE_IN_PKTINFO
- if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
- struct in_pktinfo ipi;
+ if (cmsg->cmsg_level == IPPROTO_IP && cmsg->cmsg_type == IP_PKTINFO) {
+ struct in_pktinfo ipi;
- memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
- local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr);
- local_addr.ip_addr.family = IPADDR_INET4;
- }
+ memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
+ local_addr.ip_addr.addr.in4 = ntohl(ipi.ipi_spec_dst.s_addr);
+ local_addr.ip_addr.family = IPADDR_INET4;
+ }
#endif
#ifdef HAVE_IN6_PKTINFO
- if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
- struct in6_pktinfo ipi;
-
- memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
- memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr,
- sizeof (local_addr.ip_addr.addr.in6));
- local_addr.ip_addr.family = IPADDR_INET6;
- }
+ if (cmsg->cmsg_level == IPPROTO_IPV6 && cmsg->cmsg_type == IPV6_PKTINFO) {
+ struct in6_pktinfo ipi;
+
+ memcpy(&ipi, CMSG_DATA(cmsg), sizeof(ipi));
+ memcpy(&local_addr.ip_addr.addr.in6, &ipi.ipi6_addr.s6_addr,
+ sizeof (local_addr.ip_addr.addr.in6));
+ local_addr.ip_addr.family = IPADDR_INET6;
+ }
#endif
#ifdef SO_TIMESTAMP
- if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) {
- struct timeval tv;
+ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SO_TIMESTAMP) {
+ struct timeval tv;
- memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv));
- LCL_CookTime(&tv, &now, &now_err);
- }
-#endif
+ memcpy(&tv, CMSG_DATA(cmsg), sizeof(tv));
+ LCL_CookTime(&tv, &now, &now_err);
}
+#endif
+ }
- DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d",
- status, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port,
- UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd);
+ DEBUG_LOG(LOGF_NtpIO, "Received %d bytes from %s:%d to %s fd %d",
+ length, UTI_IPToString(&remote_addr.ip_addr), remote_addr.port,
+ UTI_IPToString(&local_addr.ip_addr), local_addr.sock_fd);
- if (status >= NTP_NORMAL_PACKET_LENGTH) {
+ /* Just ignore the packet if it's not of a recognized length */
+ if (length < NTP_NORMAL_PACKET_LENGTH || length > sizeof (NTP_Receive_Buffer))
+ return;
- NSR_ProcessReceive((NTP_Packet *) &message.ntp_pkt, &now, now_err,
- &remote_addr, &local_addr, status);
+ NSR_ProcessReceive((NTP_Packet *)hdr->msg_iov[0].iov_base, &now, now_err,
+ &remote_addr, &local_addr, length);
+}
- } else {
+/* ================================================== */
- /* Just ignore the packet if it's not of a recognized length */
+static void
+read_from_socket(int sock_fd, int event, void *anything)
+{
+ /* This should only be called when there is something
+ to read, otherwise it will block */
- }
+ struct MessageHeader *hdr;
+ unsigned int i, n;
+ int status;
+
+ hdr = ARR_GetElements(recv_headers);
+ n = ARR_GetSize(recv_headers);
+ assert(n >= 1);
+
+ n = 1;
+ status = recvmsg(sock_fd, &hdr[0].msg_hdr, 0);
+ if (status >= 0)
+ hdr[0].msg_len = status;
+
+ if (status < 0) {
+ DEBUG_LOG(LOGF_NtpIO, "Could not receive from fd %d : %s", sock_fd,
+ strerror(errno));
+ return;
}
+
+ for (i = 0; i < n; i++) {
+ hdr = ARR_GetElement(recv_headers, i);
+ process_receive(&hdr->msg_hdr, hdr->msg_len, sock_fd);
+ }
+
+ /* Restore the buffers to their original state */
+ prepare_buffers(n);
}
/* ================================================== */