+++ /dev/null
-/*
- mtr -- a network diagnostic tool
- Copyright (C) 2016 Matt Kimball
-
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License version 2 as
- published by the Free Software Foundation.
-
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
-
- You should have received a copy of the GNU General Public License along
- with this program; if not, write to the Free Software Foundation, Inc.,
- 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-*/
-
-#include "command.h"
-
-#include <errno.h>
-#include <io.h>
-#include <stdio.h>
-
-/*
- A completion routine to be called by Windows when a read from
- the command stream has completed.
-*/
-static
-void CALLBACK finish_read_command(
- DWORD status,
- DWORD size_read,
- OVERLAPPED * overlapped)
-{
- struct command_buffer_t *buffer;
- char *read_position;
-
- /*
- hEvent is unusuaed by ReadFileEx, so we use it to pass
- our command_buffer structure.
- */
- buffer = (struct command_buffer_t *) overlapped->hEvent;
-
- if (status) {
- /* When the stream is closed ERROR_BROKEN_PIPE will be the result */
- if (status == ERROR_BROKEN_PIPE) {
- buffer->platform.pipe_open = false;
- return;
- }
-
- fprintf(stderr, "ReadFileEx completion failure %d\n", status);
- exit(EXIT_FAILURE);
- }
-
- /* Copy from the overlapped I/O buffer to the incoming command buffer */
- read_position =
- &buffer->incoming_buffer[buffer->incoming_read_position];
- memcpy(read_position, buffer->platform.overlapped_buffer, size_read);
-
- /* Account for the newly read data */
- buffer->incoming_read_position += size_read;
- buffer->platform.read_active = false;
-}
-
-/*
- An APC which does nothing, to be used only to wake from the current
- alertable wait.
-*/
-static
-void CALLBACK empty_apc(
- ULONG * param)
-{
-}
-
-/* Wake from the next alertable wait without waiting for newly read data */
-static
-void queue_empty_apc(
- void)
-{
- if (QueueUserAPC((PAPCFUNC) empty_apc, GetCurrentThread(), 0) == 0) {
- fprintf(stderr, "Unexpected QueueUserAPC failure %d\n",
- GetLastError());
- exit(EXIT_FAILURE);
- }
-}
-
-/* Start a new overlapped I/O read from the command stream */
-void start_read_command(
- struct command_buffer_t *buffer)
-{
- HANDLE command_stream = (HANDLE) get_osfhandle(buffer->command_stream);
- int space_remaining =
- COMMAND_BUFFER_SIZE - buffer->incoming_read_position - 1;
- int err;
-
- /* If a read is already active, or the pipe is closed, do nothing */
- if (!buffer->platform.pipe_open || buffer->platform.read_active) {
- return;
- }
-
- memset(&buffer->platform.overlapped, 0, sizeof(OVERLAPPED));
- buffer->platform.overlapped.hEvent = (HANDLE) buffer;
-
- if (!ReadFileEx
- (command_stream, buffer->platform.overlapped_buffer,
- space_remaining, &buffer->platform.overlapped,
- finish_read_command)) {
-
- err = GetLastError();
-
- if (err == ERROR_BROKEN_PIPE) {
- /* If the command stream has been closed, we need to wake from
- the next altertable wait to exit the main loop */
- buffer->platform.pipe_open = false;
- queue_empty_apc();
-
- return;
- } else if (err != WAIT_IO_COMPLETION) {
- fprintf(stderr, "Unexpected ReadFileEx failure %d\n",
- GetLastError());
- exit(EXIT_FAILURE);
- }
- }
-
- /* Remember that we have started an overlapped read already */
- buffer->platform.read_active = true;
-}
-
-/* Initialize the command buffer, and start the first overlapped read */
-void init_command_buffer(
- struct command_buffer_t *command_buffer,
- int command_stream)
-{
- memset(command_buffer, 0, sizeof(struct command_buffer_t));
- command_buffer->command_stream = command_stream;
- command_buffer->platform.pipe_open = true;
-}
-
-/*
- Return with errno EPIPE if the command stream has been closed.
- Otherwise, not much to do for Cygwin, since we are using Overlapped I/O
- to read commands.
-*/
-int read_commands(
- struct command_buffer_t *buffer)
-{
- if (!buffer->platform.pipe_open) {
- errno = EPIPE;
- return -1;
- }
-
- return 0;
-}
#include "probe.h"
+#include <assert.h>
#include <errno.h>
+#include <error.h>
+#include <fcntl.h>
+#include <io.h>
#include <stdio.h>
+#include <unistd.h>
#include <winternl.h>
#include "protocols.h"
+
+/*
+ Implementation notes (or "Why this uses a worker thread")
+
+ Having done my time debugging various race conditions over the
+ last twenty-plus years as a software developer, both of my own
+ creation and discovered in the code of others, I almost always
+ try to structure my code to be single-threaded. However,
+ I think in this case, the ICMP service thread is unavoidable.
+
+ I would have liked to avoid multithreading entirely, but here are
+ the constraints:
+
+ a) mtr was originally a Unix program which used "raw sockets".
+ b) In order to port mtr to Windows, Cygwin is used to get a
+ Unix-like environment.
+ c) You can't use a raw socket to receive an ICMP reply on Windows.
+ However, Windows provides a separate API in the form of
+ ICMP.DLL for sending and receiving ICMP messages.
+ d) The ICMP API works asynchronously, and requires completion
+ through an asynchronous procedure call ("APC")
+ e) APCs are only delivered during blocking Win32 operations
+ which are flagged as "alertable." This prevents apps from
+ having APCs execute unexpectedly during an I/O operation.
+ f) Cygwin's implementation of POSIX functions does all I/O
+ through non-alertable I/O operations. This is reasonable
+ because APCs don't exist in the POSIX API.
+ g) Cygwin implements Unix-style signals at the application level,
+ since the Windows kernel doesn't have them. We want our
+ program to respond to SIGTERM and SIGKILL, at least.
+ h) Cygwin's signal implementation will deliver signals during
+ blocking I/O functions in the Cygwin library, but won't
+ respond to signals if the signal is sent while the application
+ is in a blocking Windows API call which Cygwin is not aware of.
+ i) Since we want to both send/receive ICMP probes and also respond
+ to Unix-style signals, we require two threads: one which
+ uses Cygwin's POSIX style blocking I/O and can respond to
+ signals, and one which uses alertable waits using Win32
+ blocking APIs.
+
+ The solution is to have the main thread using select() as the
+ blocking operation in its loop, and also to have an ICMP service
+ thread using WaitForSingleObjectEx() as its blocking operation.
+ The main thread will respond to signals. The ICMP service thread
+ will run the APCs completing ICMP.DLL requests.
+
+ These two threads communicate through a pair of pipes. One pipe
+ sends requests from the main thread to the ICMP service thread,
+ and another pipe sends the requests back as they complete.
+
+ We use the Cygwin pipe() to create the pipes, but in the ICMP
+ service thread we use the Win32 HANDLE that corresponds to the
+ receiving end of the input pipe to wait for ICMP requests.
+*/
+
+
+static DWORD WINAPI icmp_service_thread(LPVOID param);
+
/* Windows doesn't require any initialization at a privileged level */
void init_net_state_privileged(
struct net_state_t *net_state)
{
}
-/* Open the ICMP.DLL interface */
+/*
+    Convenience routine similar to error(), but for reporting Windows
+    error codes instead of errno codes.
+
+    Writes "<str> (code <win_error>)" to stderr and then calls
+    exit(exit_code), so this function never returns to its caller.
+*/
+void error_win(
+    int exit_code,
+    int win_error,
+    const char *str)
+{
+    fprintf(stderr, "%s (code %d)\n", str, win_error);
+    exit(exit_code);
+}
+
+/*
+    Open the ICMP.DLL interface and start the ICMP service thread.
+
+    Zeroes net_state, opens the ICMPv4 handle, creates one pipe for
+    each direction of communication with the service thread, makes
+    the read end of the result pipe non-blocking, and finally starts
+    icmp_service_thread with net_state as its parameter.  Every
+    failure is fatal and terminates the process.
+
+    NOTE(review): no assignment to platform.icmp6 is visible in this
+    function, so after the memset the icmp6 half of the open check
+    compares a zeroed handle -- confirm it is opened on a line not
+    shown here.
+*/
void init_net_state(
struct net_state_t *net_state)
{
+    HANDLE thread;
+    int in_pipe[2], out_pipe[2];
+    int err;
+
memset(net_state, 0, sizeof(struct net_state_t));
net_state->platform.icmp4 = IcmpCreateFile();
if (net_state->platform.icmp4 == INVALID_HANDLE_VALUE
&& net_state->platform.icmp6 == INVALID_HANDLE_VALUE) {
- fprintf(stderr, "Failure opening ICMP %d\n", GetLastError());
- exit(EXIT_FAILURE);
+
+        error_win(EXIT_FAILURE, GetLastError(), "Failure opening ICMP");
}
net_state->platform.ip4_socket_raw = false;
net_state->platform.ip6_socket_raw = false;
+
+    /*
+        We need a pipe for communication with the ICMP thread
+        in each direction.
+    */
+    if (pipe(in_pipe) == -1 || pipe(out_pipe) == -1) {
+        error(EXIT_FAILURE, errno, "Failure creating thread pipe");
+    }
+
+    net_state->platform.thread_in_pipe_read = in_pipe[0];
+    net_state->platform.thread_in_pipe_write = in_pipe[1];
+    net_state->platform.thread_out_pipe_read = out_pipe[0];
+    net_state->platform.thread_out_pipe_write = out_pipe[1];
+
+    /*
+        The service thread waits on the request pipe with Win32 calls,
+        so it needs the HANDLE behind the descriptor.
+    */
+    net_state->platform.thread_in_pipe_read_handle =
+        (HANDLE)get_osfhandle(in_pipe[0]);
+
+    /*
+        The read on the out pipe needs to be nonblocking because
+        it will be occasionally checked in the main thread.
+    */
+    err = fcntl(out_pipe[0], F_SETFL, O_NONBLOCK);
+    if (err == -1) {
+        error(
+            EXIT_FAILURE, errno,
+            "Failure setting pipe to non-blocking");
+    }
+
+    /* Spin up the ICMP service thread */
+    thread = CreateThread(
+        NULL, 0, icmp_service_thread, net_state, 0, NULL);
+
+    if (thread == NULL) {
+        error_win(
+            EXIT_FAILURE, GetLastError(),
+            "Failure creating ICMP service thread");
+    }
+
+    /*
+        Nothing in this function uses the thread handle after creation,
+        so release it instead of leaking it.  Closing the handle
+        does not stop the thread.
+    */
+    CloseHandle(thread);
}
/*
void platform_free_probe(
struct probe_t *probe)
{
- if (probe->platform.reply4) {
- free(probe->platform.reply4);
- probe->platform.reply4 = NULL;
- }
}
/* Report a windows error code using a platform-independent error string */
}
}
+/*
+    Hand a finished request from the ICMP service thread back to
+    the main thread for probe result reporting.
+
+    Only the pointer value is written to the result pipe; the
+    structure it refers to is not copied.  A failed write is fatal.
+*/
+static
+void queue_thread_result(struct icmp_thread_request_t *request)
+{
+    const int result_fd =
+        request->net_state->platform.thread_out_pipe_write;
+
+    /* Ownership of the request travels back through the result pipe */
+    if (write(result_fd, &request, sizeof request) == -1) {
+        error(
+            EXIT_FAILURE, errno,
+            "failure writing to probe result queue");
+    }
+}
+
+
/*
The overlapped I/O style completion routine to be called by
Windows during an altertable wait when an ICMP probe has
PIO_STATUS_BLOCK status,
ULONG reserved)
{
- struct probe_t *probe = (struct probe_t *) context;
- struct net_state_t *net_state = probe->platform.net_state;
+ struct icmp_thread_request_t *request =
+ (struct icmp_thread_request_t *) context;
int icmp_type;
int round_trip_us = 0;
int reply_count;
ICMP_ECHO_REPLY *reply4;
ICMPV6_ECHO_REPLY *reply6;
- if (probe->platform.ip_version == 6) {
- reply6 = probe->platform.reply6;
+ if (request->ip_version == 6) {
+ reply6 = request->reply6;
reply_count = Icmp6ParseReplies(reply6, sizeof(ICMPV6_ECHO_REPLY));
if (reply_count > 0) {
remote_addr6->sin6_scope_id = 0;
}
} else {
- reply4 = probe->platform.reply4;
+ reply4 = request->reply4;
reply_count = IcmpParseReplies(reply4, sizeof(ICMP_ECHO_REPLY));
if (reply_count > 0) {
icmp_type = ICMP_DEST_UNREACH;
}
- if (icmp_type != -1) {
- /* Record probe result */
- respond_to_probe(net_state, probe, icmp_type,
- &remote_addr, round_trip_us, 0, NULL);
- } else {
- report_win_error(probe->token, reply_status);
- free_probe(net_state, probe);
- }
+ request->icmp_type = icmp_type;
+ request->reply_status = reply_status;
+ request->remote_addr = remote_addr;
+ request->round_trip_us = round_trip_us;
+ queue_thread_result(request);
}
/* Use ICMP.DLL's send echo support to send a probe */
static
void icmp_send_probe(
- struct net_state_t *net_state,
- struct probe_t *probe,
- const struct probe_param_t *param,
- struct sockaddr_storage *src_sockaddr,
- struct sockaddr_storage *dest_sockaddr,
+ struct icmp_thread_request_t *request,
char *payload,
int payload_size)
{
struct sockaddr_in6 *src_sockaddr6;
struct sockaddr_in6 *dest_sockaddr6;
- if (param->timeout > 0) {
- timeout = 1000 * param->timeout;
+ if (request->timeout > 0) {
+ timeout = 1000 * request->timeout;
} else {
/*
IcmpSendEcho2 will return invalid argument on a timeout of
}
memset(&option, 0, sizeof(IP_OPTION_INFORMATION));
- option.Ttl = param->ttl;
+ option.Ttl = request->ttl;
- if (param->ip_version == 6) {
+ if (request->ip_version == 6) {
reply_size = sizeof(ICMPV6_ECHO_REPLY) + payload_size;
} else {
reply_size = sizeof(ICMP_ECHO_REPLY) + payload_size;
}
- probe->platform.reply4 = malloc(reply_size);
- if (probe->platform.reply4 == NULL) {
- perror("failure to allocate reply buffer");
- exit(EXIT_FAILURE);
+ request->reply4 = malloc(reply_size);
+ if (request->reply4 == NULL) {
+ error(EXIT_FAILURE, errno, "failure to allocate reply buffer");
}
- if (param->ip_version == 6) {
- src_sockaddr6 = (struct sockaddr_in6 *) src_sockaddr;
- dest_sockaddr6 = (struct sockaddr_in6 *) dest_sockaddr;
+ if (request->ip_version == 6) {
+ src_sockaddr6 = (struct sockaddr_in6 *) &request->src_sockaddr;
+ dest_sockaddr6 = (struct sockaddr_in6 *) &request->dest_sockaddr;
- send_result = Icmp6SendEcho2(net_state->platform.icmp6, NULL,
- (FARPROC) on_icmp_reply, probe,
+ send_result = Icmp6SendEcho2(request->net_state->platform.icmp6,
+ NULL,
+ (FARPROC) on_icmp_reply,
+ request,
src_sockaddr6, dest_sockaddr6,
payload, payload_size, &option,
- probe->platform.reply6, reply_size,
- timeout);
+ request->reply6,
+ reply_size, timeout);
} else {
- dest_sockaddr4 = (struct sockaddr_in *) dest_sockaddr;
+ dest_sockaddr4 = (struct sockaddr_in *) &request->dest_sockaddr;
- send_result = IcmpSendEcho2(net_state->platform.icmp4, NULL,
- (FARPROC) on_icmp_reply, probe,
+ send_result = IcmpSendEcho2(request->net_state->platform.icmp4,
+ NULL,
+ (FARPROC) on_icmp_reply,
+ request,
dest_sockaddr4->sin_addr.s_addr,
payload, payload_size, &option,
- probe->platform.reply4, reply_size,
- timeout);
+ request->reply4,
+ reply_size, timeout);
}
if (send_result == 0) {
err = GetLastError();
/*
- ERROR_IO_PENDING is expected for asynchronous probes,
- but any other error is unexpected.
- */
+ ERROR_IO_PENDING is expected when the probe is sent.
+ Other errors indicate the probe wasn't sent, and should
+ be reported in the main thread.
+ */
if (err != ERROR_IO_PENDING) {
- report_win_error(probe->token, err);
- free_probe(net_state, probe);
+ request->icmp_type = -1;
+ request->reply_status = err;
+ queue_thread_result(request);
}
}
}
/* Fill the payload of the packet as specified by the probe parameters */
static
int fill_payload(
- const struct probe_param_t *param,
+ const struct icmp_thread_request_t *request,
char *payload,
int payload_buffer_size)
{
int ip_icmp_size;
int payload_size;
- if (param->ip_version == 6) {
+ if (request->ip_version == 6) {
ip_icmp_size =
sizeof(struct IP6Header) + sizeof(struct ICMPHeader);
- } else if (param->ip_version == 4) {
+ } else if (request->ip_version == 4) {
ip_icmp_size = sizeof(struct IPHeader) + sizeof(struct ICMPHeader);
} else {
errno = EINVAL;
return -1;
}
- payload_size = param->packet_size - ip_icmp_size;
+ payload_size = request->packet_size - ip_icmp_size;
if (payload_size < 0) {
payload_size = 0;
}
return -1;
}
- memset(payload, param->bit_pattern, payload_size);
+ memset(payload, request->bit_pattern, payload_size);
return payload_size;
}
+/*
+    Service one probe request received from the main thread:
+    build the payload the request describes, then send the probe.
+    Failure to build the payload is fatal.
+*/
+static
+void icmp_handle_probe_request(struct icmp_thread_request_t *request)
+{
+    char packet_payload[PACKET_BUFFER_SIZE];
+    const int filled_size =
+        fill_payload(request, packet_payload, sizeof packet_payload);
+
+    if (filled_size < 0) {
+        error(EXIT_FAILURE, errno, "Error constructing packet");
+    }
+
+    icmp_send_probe(request, packet_payload, filled_size);
+}
+
+
+/*
+    The main loop of the ICMP service thread.  The loop starts
+    an overlapped read on the incoming request pipe, then waits
+    in an alertable wait for that read to complete.  Because
+    the wait is alertable, ICMP probes can complete through
+    APCs in that wait.
+
+    param is the struct net_state_t handed to CreateThread.  The
+    loop has no exit, so the thread never returns a value; every
+    failure detected here terminates the whole process.
+*/
+static
+DWORD WINAPI icmp_service_thread(LPVOID param) {
+    struct net_state_t *net_state;
+    struct icmp_thread_request_t *request;
+    DWORD wait_status;
+    OVERLAPPED overlapped;
+    HANDLE event;
+    BOOL success;
+    bool read_pending;
+    DWORD read_count;  /* GetOverlappedResult writes through an LPDWORD */
+    DWORD err;
+
+    /*
+        We need an event to signal completion of reads from the request
+        pipe.
+    */
+    event = CreateEvent(NULL, TRUE, FALSE, NULL);
+    if (event == NULL) {
+        error_win(
+            EXIT_FAILURE, GetLastError(),
+            "failure creating ICMP thread event");
+    }
+
+    net_state = (struct net_state_t *)param;
+    read_pending = false;
+    while (true) {
+        /*
+            Start a new read on the request pipe if none is
+            currently pending.
+        */
+        if (!read_pending) {
+            request = NULL;
+
+            /* The event is manual-reset, so clear it before reuse */
+            ResetEvent(event);
+
+            memset(&overlapped, 0, sizeof(OVERLAPPED));
+            overlapped.hEvent = event;
+
+            success = ReadFile(
+                net_state->platform.thread_in_pipe_read_handle,
+                &request,
+                sizeof(struct icmp_thread_request_t *),
+                NULL,
+                &overlapped);
+
+            if (!success) {
+                err = GetLastError();
+
+                /* ERROR_IO_PENDING only means the read is in flight */
+                if (err != ERROR_IO_PENDING) {
+                    error_win(
+                        EXIT_FAILURE, err,
+                        "failure starting overlapped thread pipe read");
+                }
+            }
+
+            read_pending = true;
+        }
+
+        /*
+            Wait for either the request read to complete, or
+            an APC which completes an ICMP probe.
+        */
+        wait_status = WaitForSingleObjectEx(
+            event,
+            INFINITE,
+            TRUE);
+
+        /*
+            If the event we waited on has been signalled, read
+            the request from the pipe.  Any other wait result
+            simply loops back into the wait.
+        */
+        if (wait_status == WAIT_OBJECT_0) {
+            read_pending = false;
+
+            read_count = 0;
+            success = GetOverlappedResult(
+                net_state->platform.thread_in_pipe_read_handle,
+                &overlapped,
+                &read_count,
+                FALSE);
+
+            if (!success) {
+                error_win(
+                    EXIT_FAILURE, GetLastError(),
+                    "failure completing overlapped thread pipe read");
+            }
+
+            if (read_count == 0) {
+                continue;
+            }
+
+            /*
+                A partially read pointer must never be dereferenced.
+                Check at runtime rather than with assert(), which
+                disappears when NDEBUG is defined.
+            */
+            if (read_count != sizeof(struct icmp_thread_request_t *)) {
+                error_win(
+                    EXIT_FAILURE, ERROR_INVALID_DATA,
+                    "short read from ICMP thread request pipe");
+            }
+
+            /* Start the new probe from the request */
+            icmp_handle_probe_request(request);
+        }
+    }
+}
+
+
+/*
+    Called on the main thread to ask the ICMP service thread to
+    start a new probe.
+
+    A request structure is allocated on the heap, filled in from
+    the probe parameters and the resolved addresses, and its pointer
+    is written to the request pipe.  Allocation or write failure
+    is fatal.
+*/
+static
+void queue_thread_request(
+    struct net_state_t *net_state,
+    struct probe_t *probe,
+    const struct probe_param_t *param,
+    struct sockaddr_storage *dest_sockaddr,
+    struct sockaddr_storage *src_sockaddr)
+{
+    struct icmp_thread_request_t *request;
+
+    request = malloc(sizeof *request);
+    if (request == NULL) {
+        error(EXIT_FAILURE, errno, "failure to allocate request");
+    }
+
+    /* Any field not assigned below starts out as zero */
+    memset(request, 0, sizeof *request);
+
+    /* Who the request belongs to, and the addresses it uses */
+    request->net_state = net_state;
+    request->probe = probe;
+    request->src_sockaddr = *src_sockaddr;
+    request->dest_sockaddr = *dest_sockaddr;
+
+    /* Copies of the probe parameters for the service thread */
+    request->ip_version = param->ip_version;
+    request->ttl = param->ttl;
+    request->timeout = param->timeout;
+    request->packet_size = param->packet_size;
+    request->bit_pattern = param->bit_pattern;
+
+    /*
+        Writing the pointer to the pipe passes ownership of the
+        request to the ICMP thread.
+    */
+    if (write(
+            net_state->platform.thread_in_pipe_write,
+            &request,
+            sizeof request) == -1) {
+        error(
+            EXIT_FAILURE, errno,
+            "failure writing to probe request queue");
+    }
+}
+
+
/* Decode the probe parameters and send a probe */
void send_probe(
struct net_state_t *net_state,
struct probe_t *probe;
struct sockaddr_storage dest_sockaddr;
struct sockaddr_storage src_sockaddr;
- char payload[PACKET_BUFFER_SIZE];
- int payload_size;
if (resolve_probe_addresses(net_state, param, &dest_sockaddr,
&src_sockaddr)) {
probe->platform.ip_version = param->ip_version;
- payload_size = fill_payload(param, payload, PACKET_BUFFER_SIZE);
- if (payload_size < 0) {
- perror("Error construction packet");
- exit(EXIT_FAILURE);
- }
+ queue_thread_request(
+ net_state, probe, param, &dest_sockaddr, &src_sockaddr);
+}
+
+/*
+    After we've received the result from the ICMP service thread,
+    report either the probe status, or any Windows error we
+    encountered while attempting to send the probe.
+
+    An icmp_type of -1 marks a request that carries a Windows
+    error in reply_status; in that case the error is reported
+    and the probe is freed.
+*/
+static
+void complete_icmp_result(struct icmp_thread_request_t *request)
+{
+    /*
+        We can de-const the net_state and probe, since we are back
+        on the main thread.
+    */
+    struct net_state_t *net_state =
+        (struct net_state_t *)request->net_state;
+    struct probe_t *probe = (struct probe_t *)request->probe;
+
- icmp_send_probe(net_state, probe, param,
- &src_sockaddr, &dest_sockaddr, payload, payload_size);
+    if (request->icmp_type == -1) {
+        report_win_error(probe->token, request->reply_status);
+        free_probe(net_state, probe);
+        return;
+    }
+
+    /* Record probe result */
+    respond_to_probe(
+        net_state, probe,
+        request->icmp_type, &request->remote_addr,
+        request->round_trip_us, 0, NULL);
}
/*
- On Windows, an implementation of receive_replies is unnecessary, because,
- unlike Unix, replies are completed using Overlapped I/O during an
- alertable wait, and don't require explicit reads.
+    Read the status of a completed probe from the ICMP service
+    thread, if one is waiting in the result pipe.
+
+    At most one result is consumed per call.  When nothing is
+    available (EAGAIN) or the read is interrupted (EINTR) the
+    function returns without doing anything.  Otherwise the result
+    is reported through complete_icmp_result, and the request and
+    its reply buffer are freed here.  Any other read outcome is
+    fatal.
*/
void receive_replies(
struct net_state_t *net_state)
{
+    int read_count;
+    struct icmp_thread_request_t *request;
+
+    read_count = read(
+        net_state->platform.thread_out_pipe_read,
+        &request,
+        sizeof(struct icmp_thread_request_t *));
+
+    if (read_count == -1) {
+        /*
+            EINTR and EAGAIN can occur under normal conditions, and
+            should be retried.  We will retry the next iteration
+            of the main loop.
+        */
+        if (errno == EINTR || errno == EAGAIN) {
+            return;
+        }
+
+        error(EXIT_FAILURE, errno, "thread result pipe read error");
+    }
+
+    /*
+        An end-of-file or partial read leaves request without a
+        valid pointer.  Check at runtime rather than with assert(),
+        which disappears when NDEBUG is defined.
+    */
+    if (read_count != (int) sizeof(struct icmp_thread_request_t *)) {
+        error(
+            EXIT_FAILURE, 0,
+            "unexpected read size %d from thread result pipe",
+            read_count);
+    }
+
+    complete_icmp_result(request);
+
+    /* The request is done with: free(NULL) is a no-op */
+    free(request->reply4);
+    request->reply4 = NULL;
+    free(request);
}
/*