while (ip_list != NULL) {
char *new_ip = strsep(&ip_list, " ");
// look for the IP in the list of clocks, and create an inert entry if not there
- int t = find_clock_source_record(new_ip, clock_info, clock_private_info);
- if (t == -1)
- t = create_clock_source_record(new_ip, clock_info, clock_private_info,
- 0); // don't use the mutex
-
- clock_info[t].flags |= (1 << clock_is_a_timing_peer);
+ if ((new_ip != NULL) && (new_ip[0] != 0)) {
+ int t = find_clock_source_record(new_ip, clock_info, clock_private_info);
+ if (t == -1)
+ t = create_clock_source_record(new_ip, clock_info, clock_private_info,
+ 0); // don't use the mutex
+ // if it is just about to become a timing peer, reset its sample history
+ clock_private_info[t].vacant_samples = MAX_TIMING_SAMPLES;
+ clock_private_info[t].next_sample_goes_here = 0;
+ clock_info[t].flags |= (1 << clock_is_a_timing_peer);
+ }
}
rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
if (rc != 0)
warn("Can't release mutex after set_timing_peers!");
-
+ debug(1, "Timing group start");
for (i = 0; i < MAX_CLOCKS; i++) {
if ((clock_info[i].flags & (1 << clock_is_a_timing_peer)) != 0)
- debug(3, "%s is in the timing peer group.", &clock_info[i].ip);
+ debug(1, "%s.", &clock_info[i].ip);
}
+ debug(1, "Timing group end");
+
} else {
warn("Unrecognised string on the control port.");
}
}
void handle_follow_up(char *buf, ssize_t recv_len, clock_source *clock_info,
- clock_source_private_data *clock_private_info, uint64_t reception_time, pthread_mutex_t *shm_mutex) {
+ clock_source_private_data *clock_private_info, uint64_t reception_time,
+ pthread_mutex_t *shm_mutex) {
struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
- if ((clock_private_info->current_stage == sync_seen) &&
- (clock_private_info->sequence_number ==
- ntohs(msg->header.sequenceId))) {
-
- uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]);
- uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]);
- packet_clock_id = packet_clock_id << 32;
- packet_clock_id = packet_clock_id + packet_clock_id_low;
-
- uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
- uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
- uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
- uint64_t preciseOriginTimestamp = seconds_hi;
- preciseOriginTimestamp = preciseOriginTimestamp << 32;
- preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
- preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
- preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
- // this result is called "t1" in the IEEE spec.
- // we already have "t2" and it seems as if we can't generate "t3"
- // and "t4", so use t1 - t2 as the clock-to-local offsets
-
- clock_private_info->current_stage = waiting_for_sync;
-
- // update the shared clock information
- uint64_t offset = preciseOriginTimestamp - clock_private_info->t2;
-
- int rc = pthread_mutex_lock(shm_mutex);
- if (rc != 0)
- warn("Can't acquire mutex to update a clock!");
- // update/set the clock_id
-
- clock_info->clock_id = packet_clock_id;
- clock_info->flags |= (1<<clock_is_valid);
- clock_info->local_time = clock_private_info->t2;
- clock_info->local_to_source_time_offset = offset;
- rc = pthread_mutex_unlock(shm_mutex);
- if (rc != 0)
- warn("Can't release mutex after updating a clock!");
-
- } else {
- debug(3,
- "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- "
- "current state is %u, sequence %u. Ignoring it. %s",
- ntohs(msg->header.sequenceId), sync_seen,
- clock_private_info->current_stage,
- clock_private_info->sequence_number,
- clock_info->ip);
- }
+ if ((clock_private_info->current_stage == sync_seen) &&
+ (clock_private_info->sequence_number == ntohs(msg->header.sequenceId))) {
+
+ uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]);
+ uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]);
+ packet_clock_id = packet_clock_id << 32;
+ packet_clock_id = packet_clock_id + packet_clock_id_low;
+
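+ // the preciseOriginTimestamp is a PTP timestamp: a 48-bit seconds field
+ // (16 high bits, then 32 low bits) followed by a 32-bit nanoseconds field,
+ // so assemble the seconds and convert the whole thing to nanoseconds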
+ uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
+ uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
+ uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
+ uint64_t preciseOriginTimestamp = seconds_hi;
+ preciseOriginTimestamp = preciseOriginTimestamp << 32;
+ preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
+ preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
+ preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
+ // this result is called "t1" in the IEEE 1588 spec.
+ // we already have "t2" (the local reception time of the matching Sync) but,
+ // without a Delay_Req/Delay_Resp exchange, we can't obtain "t3" and "t4",
+ // so use t1 - t2 as the clock-to-local offset
+
+ clock_private_info->current_stage = waiting_for_sync;
+
+ // update the shared clock information
+ uint64_t offset = preciseOriginTimestamp - clock_private_info->t2;
+
+ // now, if there was a valid offset previously,
+ // check if the offset should be clamped
+
+ if ((clock_info->flags & (1 << clock_is_valid)) &&
+ (clock_private_info->vacant_samples != MAX_TIMING_SAMPLES)) {
+
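+ // clamp any change to within 1 ms (in nanoseconds) of the previous
+ // estimate, so a single wild measurement can't drag the reported offset
+ // far away from its recent history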
+ const int64_t clamp = 1 * 1000 * 1000;
+ int64_t jitter = offset - clock_private_info->previous_estimated_offset;
+ if (jitter > clamp)
+ offset = clock_private_info->previous_estimated_offset + clamp;
+ else if (jitter < (-clamp))
+ offset = clock_private_info->previous_estimated_offset - clamp;
+ int64_t clamped_jitter = offset - clock_private_info->previous_estimated_offset;
+ debug(1, "clock: %" PRIx64 ", jitter: %+f ms, clamped_jitter: %+f ms.", clock_info->clock_id,
+ jitter * 0.000001, clamped_jitter * 0.000001);
+ }
+
+ // okay, so the offset may now have been clamped to be close to the estimated previous offset
+ // track our samples
+
+ clock_private_info->samples[clock_private_info->next_sample_goes_here].local =
+ clock_private_info->t2;
+ clock_private_info->samples[clock_private_info->next_sample_goes_here].local_to_remote_offset =
+ offset;
+ clock_private_info->next_sample_goes_here++;
+ if (clock_private_info->next_sample_goes_here == MAX_TIMING_SAMPLES)
+ clock_private_info->next_sample_goes_here = 0;
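+ // if this is the very first sample, seed the previous offsets with it so
+ // that the clamping and jitter calculations have a baseline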
+ if (clock_private_info->vacant_samples == MAX_TIMING_SAMPLES) {
+ clock_private_info->previous_offset = offset;
+ clock_private_info->previous_estimated_offset = offset;
+ }
+
+ if (clock_private_info->vacant_samples > 0)
+ clock_private_info->vacant_samples--;
+
+ // calculate the estimated offset by averaging the samples gathered so far;
+ // with only one sample, the raw (possibly clamped) offset is used as it stands
+
+ uint64_t estimated_offset = offset;
+
+ int sample_count = MAX_TIMING_SAMPLES - clock_private_info->vacant_samples;
+
+ if (sample_count > 1) {
+ int e;
+ long double offsets = 0;
+ for (e = 0; e < sample_count; e++) {
+ uint64_t ho = clock_private_info->samples[e].local_to_remote_offset;
+
+ offsets = offsets + 1.0 * ho;
+ }
+
+ offsets = offsets / sample_count;
+ estimated_offset = (uint64_t)offsets;
+ }
+
+ int64_t estimated_variation = estimated_offset - clock_private_info->previous_estimated_offset;
+ debug(2, "clock: %" PRIx64 ", estimated_jitter: %+f ms.", clock_info->clock_id,
+ estimated_variation * 0.000001);
+
+ clock_private_info->previous_estimated_offset = estimated_offset;
+
+ int rc = pthread_mutex_lock(shm_mutex);
+ if (rc != 0)
+ warn("Can't acquire mutex to update a clock!");
+ // update the shared clock record: clock_id, validity flag, local time and
+ // the estimated local-to-source offset
+
+ clock_info->clock_id = packet_clock_id;
+ clock_info->flags |= (1 << clock_is_valid);
+ clock_info->local_time = clock_private_info->t2;
+ clock_info->local_to_source_time_offset = estimated_offset;
+ rc = pthread_mutex_unlock(shm_mutex);
+ if (rc != 0)
+ warn("Can't release mutex after updating a clock!");
+
+ } else {
+ debug(3,
+ "Follow_Up %u was expected in state sync_seen (%u), but the current state "
+ "is %u with sequence %u. Ignoring it. %s",
+ ntohs(msg->header.sequenceId), sync_seen, clock_private_info->current_stage,
+ clock_private_info->sequence_number, clock_info->ip);
+ }
}
\ No newline at end of file