From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Thu, 8 Apr 2021 08:22:53 +0000 (+0100) Subject: working with timing peer lists, it seems X-Git-Tag: 1.1-dev~40 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=c002c4927266473a9c2d48289679398ab30eebb5;p=thirdparty%2Fnqptp.git working with timing peer lists, it seems --- diff --git a/nqptp-clock-sources.c b/nqptp-clock-sources.c index d5bee8a..cc97b71 100644 --- a/nqptp-clock-sources.c +++ b/nqptp-clock-sources.c @@ -78,6 +78,7 @@ int create_clock_source_record(char *sender_string, clock_source *clocks_shared_ clocks_private_info[i].in_use = 1; clocks_private_info[i].t2 = 0; clocks_private_info[i].current_stage = waiting_for_sync; + clocks_private_info[i].vacant_samples = MAX_TIMING_SAMPLES; debug(2, "activated source %d with clock_id %" PRIx64 " on ip: %s.", i, clocks_shared_info[i].clock_id, &clocks_shared_info[i].ip); } else { diff --git a/nqptp-clock-sources.h b/nqptp-clock-sources.h index d53d07b..6f46172 100644 --- a/nqptp-clock-sources.h +++ b/nqptp-clock-sources.h @@ -28,7 +28,7 @@ enum stage { sync_seen, }; -#define MAX_TIMING_SAMPLES 480 +#define MAX_TIMING_SAMPLES 19 typedef struct { uint64_t local, local_to_remote_offset; } timing_samples; diff --git a/nqptp-message-handlers.c b/nqptp-message-handlers.c index 81d2b89..add262f 100644 --- a/nqptp-message-handlers.c +++ b/nqptp-message-handlers.c @@ -42,22 +42,28 @@ void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clo while (ip_list != NULL) { char *new_ip = strsep(&ip_list, " "); // look for the IP in the list of clocks, and create an inert entry if not there - int t = find_clock_source_record(new_ip, clock_info, clock_private_info); - if (t == -1) - t = create_clock_source_record(new_ip, clock_info, clock_private_info, - 0); // don't use the mutex - - clock_info[t].flags |= (1 << clock_is_a_timing_peer); + if ((new_ip != NULL) && (new_ip[0] != 0)) { + int t = find_clock_source_record(new_ip, clock_info, clock_private_info); + if (t == -1) + t = create_clock_source_record(new_ip, clock_info, clock_private_info, + 0); // don't use the mutex + // if it is just about to become a timing peer, reset its sample count + clock_private_info[t].vacant_samples = MAX_TIMING_SAMPLES; + clock_private_info[t].next_sample_goes_here = 0; + clock_info[t].flags |= (1 << clock_is_a_timing_peer); + } } rc = pthread_mutex_unlock(&shared_memory->shm_mutex); if (rc != 0) warn("Can't release mutex after set_timing_peers!"); - + debug(1, "Timing group start"); for (i = 0; i < MAX_CLOCKS; i++) { if ((clock_info[i].flags & (1 << clock_is_a_timing_peer)) != 0) - debug(3, "%s is in the timing peer group.", &clock_info[i].ip); + debug(1, "%s.", &clock_info[i].ip); } + debug(1, "Timing group end"); + } else { warn("Unrecognised string on the control port."); } @@ -152,55 +158,115 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, } void handle_follow_up(char *buf, ssize_t recv_len, clock_source *clock_info, - clock_source_private_data *clock_private_info, uint64_t reception_time, pthread_mutex_t *shm_mutex) { + clock_source_private_data *clock_private_info, uint64_t reception_time, + pthread_mutex_t *shm_mutex) { struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; - if ((clock_private_info->current_stage == sync_seen) && - (clock_private_info->sequence_number == - ntohs(msg->header.sequenceId))) { - - uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]); - uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]); - packet_clock_id = packet_clock_id << 32; - packet_clock_id = packet_clock_id + packet_clock_id_low; - - uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); - uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); - uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); - uint64_t preciseOriginTimestamp = seconds_hi; - preciseOriginTimestamp = preciseOriginTimestamp << 32; - preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; - preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; - preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; - // this result is called "t1" in the IEEE spec. - // we already have "t2" and it seems as if we can't generate "t3" - // and "t4", so use t1 - t2 as the clock-to-local offsets - - clock_private_info->current_stage = waiting_for_sync; - - // update the shared clock information - uint64_t offset = preciseOriginTimestamp - clock_private_info->t2; - - int rc = pthread_mutex_lock(shm_mutex); - if (rc != 0) - warn("Can't acquire mutex to update a clock!"); - // update/set the clock_id - - clock_info->clock_id = packet_clock_id; - clock_info->flags |= (1<local_time = clock_private_info->t2; - clock_info->local_to_source_time_offset = offset; - rc = pthread_mutex_unlock(shm_mutex); - if (rc != 0) - warn("Can't release mutex after updating a clock!"); - - } else { - debug(3, - "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- " - "current state is %u, sequence %u. Ignoring it. %s", - ntohs(msg->header.sequenceId), sync_seen, - clock_private_info->current_stage, - clock_private_info->sequence_number, - clock_info->ip); - } + if ((clock_private_info->current_stage == sync_seen) && + (clock_private_info->sequence_number == ntohs(msg->header.sequenceId))) { + + uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]); + uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]); + packet_clock_id = packet_clock_id << 32; + packet_clock_id = packet_clock_id + packet_clock_id_low; + + uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); + uint64_t preciseOriginTimestamp = seconds_hi; + preciseOriginTimestamp = preciseOriginTimestamp << 32; + preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; + preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; + preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; + // this result is called "t1" in the IEEE spec. + // we already have "t2" and it seems as if we can't generate "t3" + // and "t4", so use t1 - t2 as the clock-to-local offsets + + clock_private_info->current_stage = waiting_for_sync; + + // update the shared clock information + uint64_t offset = preciseOriginTimestamp - clock_private_info->t2; + + // now, if there was a valid offset previously, + // check if the offset should be clamped + + if ((clock_info->flags & (1 << clock_is_valid)) && + (clock_private_info->vacant_samples != MAX_TIMING_SAMPLES)) { + + const int64_t clamp = 1 * 1000 * 1000; + int64_t jitter = offset - clock_private_info->previous_estimated_offset; + if (jitter > clamp) + offset = clock_private_info->previous_estimated_offset + clamp; + else if (jitter < (-clamp)) + offset = clock_private_info->previous_estimated_offset - clamp; + int64_t clamped_jitter = offset - clock_private_info->previous_estimated_offset; + debug(1, "clock: %" PRIx64 ", jitter: %+f ms, clamped_jitter: %+f ms.", clock_info->clock_id, + jitter * 0.000001, clamped_jitter * 0.000001); + } + + // okay, so the offset may now have been clamped to be close to the estimated previous offset + // track our samples + + clock_private_info->samples[clock_private_info->next_sample_goes_here].local = + clock_private_info->t2; + clock_private_info->samples[clock_private_info->next_sample_goes_here].local_to_remote_offset = + offset; + clock_private_info->next_sample_goes_here++; + if (clock_private_info->next_sample_goes_here == MAX_TIMING_SAMPLES) + clock_private_info->next_sample_goes_here = 0; + if (clock_private_info->vacant_samples == MAX_TIMING_SAMPLES) { + clock_private_info->previous_offset = offset; + clock_private_info->previous_estimated_offset = offset; + } + + if (clock_private_info->vacant_samples > 0) + clock_private_info->vacant_samples--; + + // calculate averages + + uint64_t estimated_offset = offset; + + // here, calculate the average offset + + int sample_count = MAX_TIMING_SAMPLES - clock_private_info->vacant_samples; + + if (sample_count > 1) { + int e; + long double offsets = 0; + for (e = 0; e < sample_count; e++) { + uint64_t ho = clock_private_info->samples[e].local_to_remote_offset; + + offsets = offsets + 1.0 * ho; + } + + offsets = offsets / sample_count; + estimated_offset = (uint64_t)offsets; + } + + int64_t estimated_variation = estimated_offset - clock_private_info->previous_estimated_offset; + debug(2, "clock: %" PRIx64 ", estimated_jitter: %+f ms.", clock_info->clock_id, + estimated_variation * 0.000001); + + clock_private_info->previous_estimated_offset = estimated_offset; + + int rc = pthread_mutex_lock(shm_mutex); + if (rc != 0) + warn("Can't acquire mutex to update a clock!"); + // update/set the clock_id + + clock_info->clock_id = packet_clock_id; + clock_info->flags |= (1 << clock_is_valid); + clock_info->local_time = clock_private_info->t2; + clock_info->local_to_source_time_offset = estimated_offset; + rc = pthread_mutex_unlock(shm_mutex); + if (rc != 0) + warn("Can't release mutex after updating a clock!"); + + } else { + debug(3, + "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- " + "current state is %u, sequence %u. Ignoring it. %s", + ntohs(msg->header.sequenceId), sync_seen, clock_private_info->current_stage, + clock_private_info->sequence_number, clock_info->ip); + } } \ No newline at end of file diff --git a/nqptp-message-handlers.h b/nqptp-message-handlers.h index 1b69311..056e754 100644 --- a/nqptp-message-handlers.h +++ b/nqptp-message-handlers.h @@ -27,7 +27,8 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, clock_source_private_data *clock_private_info, uint64_t reception_time); void handle_follow_up(char *buf, ssize_t recv_len, clock_source *clock_info, - clock_source_private_data *clock_private_info, uint64_t reception_time, pthread_mutex_t *shm_mutex); + clock_source_private_data *clock_private_info, uint64_t reception_time, + pthread_mutex_t *shm_mutex); void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info, clock_source_private_data *clock_private_info); diff --git a/nqptp.c b/nqptp.c index eb4df14..6a2e247 100644 --- a/nqptp.c +++ b/nqptp.c @@ -405,7 +405,8 @@ int main(void) { case Follow_Up: { handle_follow_up(buf, recv_len, &shared_memory->clocks[the_clock], - &clocks_private[the_clock], reception_time, &shared_memory->shm_mutex); + &clocks_private[the_clock], reception_time, + &shared_memory->shm_mutex); } break; default: break;