From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Mon, 19 Apr 2021 07:43:08 +0000 (+0100) Subject: Use the Delay Response to check if a Sync has been delayed and remove samples where... X-Git-Tag: 1.1-dev~27 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=595bc8886e7cd1d722d7a01cb85b18132bfbcd23;p=thirdparty%2Fnqptp.git Use the Delay Response to check if a Sync has been delayed and remove samples where the Delay Resp response is not within 20 ms of the Sync. Stop clamping and guessing where the Sync should have been. Temporarily (?) turn on averaging over the last minute. More meomory and more CPU. Maybe mickey-mouse averaging would actually be better. --- diff --git a/general-utilities.c b/general-utilities.c index a526a52..a7a5baf 100644 --- a/general-utilities.c +++ b/general-utilities.c @@ -22,6 +22,18 @@ #include #include +void hcton64(uint64_t num, uint8_t *p) { + uint64_t numc = num; + numc = numc >> 32; + uint32_t num_32 = numc; + uint32_t rev = htonl(num_32); + memcpy(p,&rev,sizeof(uint32_t)); + num_32 = num & 0xffffffff; + p = p + 4; + rev = htonl(num_32); + memcpy(p,&rev,sizeof(uint32_t)); +} + uint32_t nctohl(const uint8_t *p) { // read 4 characters from *p and do ntohl on them // this is to avoid possible aliasing violations uint32_t holder; diff --git a/general-utilities.h b/general-utilities.h index 72f010e..1b627da 100644 --- a/general-utilities.h +++ b/general-utilities.h @@ -35,8 +35,8 @@ #define SAFAMILY sa_family #endif -uint32_t -nctohl(const uint8_t *p); // read 4 characters from *p and do ntohl on them, avoiding aliasing +void hcton64(uint64_t num, uint8_t *p); +uint32_t nctohl(const uint8_t *p); // read 4 characters from *p and do ntohl on them, avoiding aliasing uint16_t nctohs(const uint8_t *p); uint64_t timespec_to_ns(struct timespec *tn); uint64_t get_time_now(); diff --git a/nqptp-clock-sources.h b/nqptp-clock-sources.h index ece2aca..dbca11f 100644 --- a/nqptp-clock-sources.h +++ b/nqptp-clock-sources.h @@ -26,6 +26,7 @@ enum stage { waiting_for_sync, sync_seen, + follow_up_seen }; typedef enum { @@ -35,7 +36,7 @@ typedef enum { clock_is_master } clock_flags; -#define MAX_TIMING_SAMPLES 11 +#define MAX_TIMING_SAMPLES 481 typedef struct { uint16_t sequence_number; uint64_t local, local_to_remote_offset; @@ -51,7 +52,7 @@ typedef struct { uint16_t sequence_number; uint16_t in_use; enum stage current_stage; - uint64_t t2, previous_offset, previous_estimated_offset; + uint64_t t1, t2, t3, previous_offset, previous_estimated_offset; // for garbage collection uint64_t time_of_last_use; // will be taken out of use if not used for a while and not in the // timing peer group diff --git a/nqptp-message-handlers.c b/nqptp-message-handlers.c index b158b51..2097cda 100644 --- a/nqptp-message-handlers.c +++ b/nqptp-message-handlers.c @@ -16,11 +16,13 @@ * * Commercial licensing is also available. */ +#include +#include +#include + #include "nqptp-message-handlers.h" #include "nqptp-ptp-definitions.h" #include "nqptp-utilities.h" -#include - #include "debug.h" #include "general-utilities.h" @@ -286,8 +288,7 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source_private_data *clo } void handle_sync(char *buf, __attribute__((unused)) ssize_t recv_len, - clock_source_private_data *clock_private_info, uint64_t reception_time) { - + clock_source_private_data *clock_private_info, uint64_t reception_time, SOCKADDR *from_sock_addr, int socket_number) { struct ptp_sync_message *msg = (struct ptp_sync_message *)buf; // this is just to see if anything interesting comes in the SYNC package // a non-zero origin timestamp @@ -342,7 +343,6 @@ void handle_sync(char *buf, __attribute__((unused)) ssize_t recv_len, */ clock_private_info->sequence_number = ntohs(msg->header.sequenceId); clock_private_info->t2 = reception_time; - // it turns out that we don't really need to send a Delay_Req // as a Follow_Up message always comes through @@ -351,6 +351,38 @@ void handle_sync(char *buf, __attribute__((unused)) ssize_t recv_len, // Delay_Resp time -- it contains the same information as the Follow_Up clock_private_info->current_stage = sync_seen; + + // send a delay request message + { + struct ptp_delay_req_message m; + memset(&m, 0, sizeof(m)); + m.header.transportSpecificAndMessageID = 0x11; // Table 19, pp 125, 1 byte field + m.header.reservedAndVersionPTP = 0x02; // 1 byte field + m.header.messageLength = htons(44); + m.header.flags = htons(0x608); + m.header.sourcePortID = htons(1); + m.header.controlOtherMessage = 5; // 1 byte field + m.header.sequenceId = htons(clock_private_info->sequence_number); + uint64_t sid = get_self_clock_id(); + memcpy(&m.header.clockIdentity,&sid,sizeof(uint64_t)); + struct msghdr header; + struct iovec io; + memset(&header, 0, sizeof(header)); + memset(&io, 0, sizeof(io)); + header.msg_name = from_sock_addr; + header.msg_namelen = sizeof(SOCKADDR); + header.msg_iov = &io; + header.msg_iov->iov_base = &m; + header.msg_iov->iov_len = sizeof(m); + header.msg_iovlen = 1; + clock_private_info->t3 = get_time_now(); // in case nothing better works + if ((sendmsg(socket_number, &header, 0)) == -1) { + //debug(1, "Error in sendmsg [errno = %d] to socket %d.", errno, socket_number); + //debug_print_buffer(1,(char *)&m, sizeof(m)); + } else { + //debug(1, "Success in sendmsg to socket %d.", socket_number); + } + } } } @@ -375,55 +407,61 @@ void handle_follow_up(char *buf, __attribute__((unused)) ssize_t recv_len, preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; + + // this result is called "t1" in the IEEE spec. + + clock_private_info->t1 = preciseOriginTimestamp; + // we already have "t2" and it seems as if we can't generate "t3" // and "t4", so use t1 - t2 as the clock-to-local offsets - clock_private_info->current_stage = waiting_for_sync; + clock_private_info->current_stage = follow_up_seen; - // update the shared clock information - uint64_t offset = preciseOriginTimestamp - clock_private_info->t2; - // now, if there was a valid offset previously, - // check if the offset should be clamped + } else { + debug(3, + "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- " + "current state is %u, sequence %u. Ignoring it. %s", + ntohs(msg->header.sequenceId), sync_seen, clock_private_info->current_stage, + clock_private_info->sequence_number, clock_private_info->ip); + } +} - if ((clock_private_info->flags & (1 << clock_is_valid)) && - (clock_private_info->vacant_samples != MAX_TIMING_SAMPLES)) { +void handle_delay_resp(char *buf, __attribute__((unused)) ssize_t recv_len, + clock_source_private_data *clock_private_info, + uint64_t reception_time) { + struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf; - /* - if (clock_private_info->previous_estimated_offset != 0) { - int64_t jitter = offset - clock_private_info->previous_estimated_offset; - int64_t skew = 0; - debug(1," reception interval is: %f", clock_private_info->reception_interval * - 0.000001); if (clock_private_info->reception_interval != 0) { uint64_t nominal_interval = - 125000000; // 125 ms skew = clock_private_info->reception_interval - nominal_interval; skew = - skew / 2; offset = offset + skew; // try to compensate is a packet arrives too early or too - late - } + if ((clock_private_info->current_stage == follow_up_seen) && + (clock_private_info->sequence_number == ntohs(msg->header.sequenceId))) { - int64_t compensated_jitter = offset - clock_private_info->previous_estimated_offset; - debug(1," jitter: %f, skew: %f, compensated_jitter: %f.", jitter * 0.000001, skew * - 0.000001, compensated_jitter * 0.000001); - } - */ - - /* - // only clamp if in the timing peer list - if ((clock_info->flags & (1 << clock_is_a_timing_peer)) != 0) { - const int64_t clamp = 1 * 1000 * 1000; // - int64_t jitter = offset - clock_private_info->previous_estimated_offset; - if (jitter > clamp) - offset = clock_private_info->previous_estimated_offset + clamp; - else if (jitter < (-clamp)) - offset = clock_private_info->previous_estimated_offset - clamp; - int64_t clamped_jitter = offset - clock_private_info->previous_estimated_offset; - debug(1, "clock: %" PRIx64 ", jitter: %+f ms, clamped_jitter: %+f ms.", - clock_info->clock_id, jitter * 0.000001, clamped_jitter * 0.000001); - } - // okay, so the offset may now have been clamped to be close to the estimated previous - offset - */ - } + uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]); + uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]); + packet_clock_id = packet_clock_id << 32; + packet_clock_id = packet_clock_id + packet_clock_id_low; + + uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]); + uint64_t receiveTimestamp = seconds_hi; + receiveTimestamp = receiveTimestamp << 32; + receiveTimestamp = receiveTimestamp + seconds_low; + receiveTimestamp = receiveTimestamp * 1000000000L; + receiveTimestamp = receiveTimestamp + nanoseconds; + + // this is t4 in the IEEE doc and should be close to t1 + // on some systems, it is identical to t1. + + //uint64_t delay_req_turnaround_time = reception_time - clock_private_info->t3; + uint64_t t4t1diff = receiveTimestamp - clock_private_info->t1; + //uint64_t t3t2diff = clock_private_info->t3 - clock_private_info->t2; + //debug(1,"t4t1diff: %f, delay_req_turnaround_time: %f, t3t2diff: %f.", t4t1diff * 0.000000001, delay_req_turnaround_time * 0.000000001, t3t2diff * 0.000000001); + + + if (t4t1diff < 20000000) { + // update the shared clock information + uint64_t offset = clock_private_info->t1 - clock_private_info->t2; // update our sample information @@ -443,76 +481,23 @@ void handle_follow_up(char *buf, __attribute__((unused)) ssize_t recv_len, if (clock_private_info->vacant_samples > 0) clock_private_info->vacant_samples--; - // okay, so now we have our samples, including the current one. - - // let's try to estimate when this Sync message _should_ have arrived. - // this might allow us to detect a sample that is anomalously late or early - // skewing the offset calculation. - int sample_count = MAX_TIMING_SAMPLES - clock_private_info->vacant_samples; int64_t divergence = 0; - if (sample_count > 1) { - int f; - uint64_t ts = 0; - for (f = 0; f < sample_count; f++) { - uint64_t ti = clock_private_info->samples[f].local >> 8; - uint16_t sequence_gap = - clock_private_info->sequence_number - clock_private_info->samples[f].sequence_number; - ti += (125000000 * (sequence_gap)) >> 8; - // debug(1, "ti: %f, sequence_gap: %u, sample count: %u", ti, sequence_gap, sample_count); - ts += ti; - } - ts = ts / sample_count; - ts = ts << 8; - // uint64_t estimated_t2 = (uint64_t)ts; - divergence = clock_private_info->t2 - ts; - // debug(1, "divergence is: %f ms. %d samples", divergence * 0.000001, sample_count); - } - - // calculate averages - - // here we will correct the offset by adding the divergence to it - - offset = offset + divergence; - - const int64_t clamp = 1000 * 1000; // WAG - int64_t jitter = offset - clock_private_info->previous_estimated_offset; - if (jitter > clamp) - offset = clock_private_info->previous_estimated_offset + clamp; - else if (jitter < (-clamp)) - offset = clock_private_info->previous_estimated_offset - clamp; - // int64_t clamped_jitter = offset - clock_private_info->previous_estimated_offset; - // debug(1, "clock: %" PRIx64 ", jitter: %+f ms, clamped_jitter: %+f ms.", - // clock_info->clock_id, - // jitter * 0.000001, clamped_jitter * 0.000001); - - // okay, so the offset may now have been clamped to be close to the estimated previous offset - uint64_t estimated_offset = offset; - /* - // here, calculate the average offset - - // int sample_count = MAX_TIMING_SAMPLES - clock_private_info->vacant_samples; - if (sample_count > 1) { - int e; - long double offsets = 0; - for (e = 0; e < sample_count; e++) { - uint64_t ho = clock_private_info->samples[e].local_to_remote_offset; - - offsets = offsets + 1.0 * ho; - } + if (sample_count > 1) { + int e; + long double offsets = 0; + for (e = 0; e < sample_count; e++) { + uint64_t ho = clock_private_info->samples[e].local_to_remote_offset; - offsets = offsets / sample_count; - estimated_offset = (uint64_t)offsets; - } - */ + offsets = offsets + 1.0 * ho; + } - // int64_t estimated_variation = estimated_offset - - // clock_private_info->previous_estimated_offset; debug(1, "clock: %" PRIx64 ", - // estimated_jitter: %+f ms, divergence: %+f.", clock_info->clock_id, - // estimated_variation * 0.000001, divergence * 0.000001); + offsets = offsets / sample_count; + estimated_offset = (uint64_t)offsets; + } clock_private_info->previous_estimated_offset = estimated_offset; @@ -531,12 +516,15 @@ void handle_follow_up(char *buf, __attribute__((unused)) ssize_t recv_len, // if we have need to wrap. if (clock_private_info->next_sample_goes_here == MAX_TIMING_SAMPLES) clock_private_info->next_sample_goes_here = 0; + } else { + debug(2,"Dropping an apparently slow timing exchange with a disparity of %f milliseconds on clock: %" PRIx64 ".", t4t1diff * 0.000001, clock_private_info->clock_id); + } - } else { + } else { debug(3, - "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- " + "Delay_Resp %u expecting to be in state follow_up_seen (%u). Stage error -- " "current state is %u, sequence %u. Ignoring it. %s", - ntohs(msg->header.sequenceId), sync_seen, clock_private_info->current_stage, + ntohs(msg->header.sequenceId), follow_up_seen, clock_private_info->current_stage, clock_private_info->sequence_number, clock_private_info->ip); } } \ No newline at end of file diff --git a/nqptp-message-handlers.h b/nqptp-message-handlers.h index 1debea5..5f67e02 100644 --- a/nqptp-message-handlers.h +++ b/nqptp-message-handlers.h @@ -20,6 +20,7 @@ #ifndef NQPTP_MESSAGE_HANDLERS_H #define NQPTP_MESSAGE_HANDLERS_H +#include "general-utilities.h" #include "nqptp-clock-sources.h" #include "nqptp-shm-structures.h" @@ -27,11 +28,14 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source_private_data *clo uint64_t reception_time); void handle_sync(char *buf, ssize_t recv_len, clock_source_private_data *clock_private_info, - uint64_t reception_time); + uint64_t reception_time, SOCKADDR *from_sock_addr, int socket_number); void handle_follow_up(char *buf, ssize_t recv_len, clock_source_private_data *clock_private_info, uint64_t reception_time); +void handle_delay_resp(char *buf, ssize_t recv_len, clock_source_private_data *clock_private_info, + uint64_t reception_time); + void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source_private_data *clock_private_info); diff --git a/nqptp-ptp-definitions.h b/nqptp-ptp-definitions.h index 9592082..01ea7d7 100644 --- a/nqptp-ptp-definitions.h +++ b/nqptp-ptp-definitions.h @@ -146,6 +146,28 @@ struct __attribute__((__packed__)) ptp_delay_resp { uint8_t requestingPortIdentity[10]; }; +// this is the extra part for a Pdelay_Req message (13.9, pp 131) +struct __attribute__((__packed__)) ptp_pdelay_req { + uint8_t originTimestamp[10]; + uint8_t reserved[10]; // to make it the same length as a Pdelay_Resp message +}; + +// this is the extra part for a Pdelay_Resp message (13.10, pp 131) +struct __attribute__((__packed__)) ptp_pdelay_resp { + uint8_t requestReceiptTimestamp[10]; + uint8_t requestingPortIdentity[10]; +}; + +struct __attribute__((__packed__)) ptp_pdelay_req_message { + struct ptp_common_message_header header; + struct ptp_pdelay_req pdelay_req; +}; + +struct __attribute__((__packed__)) ptp_pdelay_resp_message { + struct ptp_common_message_header header; + struct ptp_pdelay_resp pdelay_resp; +}; + struct __attribute__((__packed__)) ptp_sync_message { struct ptp_common_message_header header; struct ptp_sync sync; diff --git a/nqptp-utilities.c b/nqptp-utilities.c index 3b98b45..ad51fa5 100644 --- a/nqptp-utilities.c +++ b/nqptp-utilities.c @@ -18,6 +18,7 @@ */ #include "nqptp-utilities.h" +#include "nqptp-ptp-definitions.h" #include "general-utilities.h" #include #include @@ -201,7 +202,42 @@ uint64_t get_self_clock_id() { local_clock_id[3] = 0xFF; local_clock_id[4] = 0xFE; } + // it's in Network Byte Order! uint64_t result; memcpy(&result, local_clock_id, sizeof(result)); + // debug(1,"local_clock_id: %" PRIx64 ".", result); return result; } + + +void send_delay_req_message(int socket_number, SOCKADDR *from_sock_addr, uint16_t seqno) { + struct ptp_delay_req_message m; + memset(&m, 0, sizeof(m)); + m.header.transportSpecificAndMessageID = 0x11; // Table 19, pp 125, 1 byte field + m.header.reservedAndVersionPTP = 0x02; // 1 byte field + m.header.messageLength = htons(44); + m.header.flags = htons(0x608); + m.header.sourcePortID = htons(1); + m.header.controlOtherMessage = 5; // 1 byte field + m.header.sequenceId = htons(seqno); + m.header.logMessagePeriod = 0x7f; // Table 24, pp 128 + uint64_t sid = get_self_clock_id(); + memcpy(&m.header.clockIdentity,&sid,sizeof(uint64_t)); + struct msghdr header; + struct iovec io; + memset(&header, 0, sizeof(header)); + memset(&io, 0, sizeof(io)); + header.msg_name = from_sock_addr; + header.msg_namelen = sizeof(SOCKADDR); + header.msg_iov = &io; + header.msg_iov->iov_base = &m; + header.msg_iov->iov_len = sizeof(m); + header.msg_iovlen = 1; + uint64_t transmission_time = get_time_now(); // in case nothing better works + if ((sendmsg(socket_number, &header, 0)) == -1) { + debug(1, "Error in sendmsg [errno = %d]", errno); + } else { + debug_print_buffer(1,&m, sizeof(m)); + } +} + diff --git a/nqptp-utilities.h b/nqptp-utilities.h index f821f28..fb6d83d 100644 --- a/nqptp-utilities.h +++ b/nqptp-utilities.h @@ -37,5 +37,4 @@ typedef struct { void open_sockets_at_port(uint16_t port, sockets_open_bundle *sockets_open_stuff); void debug_print_buffer(int level, char *buf, size_t buf_len); uint64_t get_self_clock_id(); // a clock ID based on a MAC address - #endif \ No newline at end of file diff --git a/nqptp.c b/nqptp.c index 2fc2556..119902e 100644 --- a/nqptp.c +++ b/nqptp.c @@ -225,7 +225,7 @@ int main(void) { while (1) { struct epoll_event events[MAX_EVENTS]; - // the timeout is in seconds + // the timeout is in milliseconds int event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000); uint64_t reception_time = get_time_now(); // use this if other methods fail @@ -358,13 +358,16 @@ int main(void) { handle_announce(buf, recv_len, &clocks_private[the_clock], reception_time); break; case Sync: { // if it's a sync - handle_sync(buf, recv_len, &clocks_private[the_clock], reception_time); + handle_sync(buf, recv_len, &clocks_private[the_clock], reception_time, &from_sock_addr, socket_number); } break; - case Follow_Up: { handle_follow_up(buf, recv_len, &clocks_private[the_clock], reception_time); } break; + case Delay_Resp: { + handle_delay_resp(buf, recv_len, &clocks_private[the_clock], reception_time); + } break; default: + debug_print_buffer(2, buf, recv_len); // unusual messages will have debug level 1. break; } }