From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Tue, 16 Mar 2021 08:33:32 +0000 (+0000) Subject: Works reasonably well. Two filters -- least square line of best fit and average.... X-Git-Tag: 1.1-dev~109 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1014c75f42871b1ec25d66d65bf2113ea09c9da5;p=thirdparty%2Fnqptp.git Works reasonably well. Two filters -- least square line of best fit and average. Neither is great. Not using recvmsg. Signal sequence checking is crude. --- diff --git a/nqptp.c b/nqptp.c index 93c29ae..1c341f8 100644 --- a/nqptp.c +++ b/nqptp.c @@ -41,6 +41,14 @@ // "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and // Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002) +// transaction tracking +enum stage { + nothing_seen, + sync_seen, + follow_up_seen, + waiting_for_sync // this when you are waiting out a sync for a new cycle +}; + // Table 19 enum messageType { Sync, @@ -61,9 +69,21 @@ enum messageType { Reserved_F }; +#define MAX_TIMING_SAMPLES 1 +struct timing_samples { + uint64_t local, remote; +} timing_samples; + struct ptpSource { - char *ip; // ipv4 or ipv6 + char *ip; // ipv4 or ipv6 + int discarding_packets; // true if discarding packets for a period + uint64_t discard_until_time; + uint16_t sequence_number; + enum stage current_stage; uint64_t t1, t2, t3, t4, t5, previous_offset; + struct timing_samples samples[MAX_TIMING_SAMPLES]; + int vacant_samples; // the number of elements in the timing_samples array that are not yet used + int next_sample_goes_here; struct ptpSource *next; } ptpSource; @@ -156,6 +176,7 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) { if (response != NULL) { memset((void *)response, 0, sizeof(ptpSource)); response->ip = strdup(ip); + response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet *insertion_point = response; fprintf(stderr, "Clock record created for \"%s\".\n", ip); } @@ -367,14 +388,8 @@ int main(void) { ret |= setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes)); } #endif - - //if (ret == 0) - // ret = setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes)); - - if (ret != 0) - fprintf(stderr, "unable to enable timestamping.\n"); - - + if (ret != 0) + fprintf(stderr, "unable to enable timestamping.\n"); if (!ret) ret = bind(fd, p->ai_addr, p->ai_addrlen); @@ -398,6 +413,7 @@ int main(void) { if (sockets_open > 0) { while (1) { + uint64_t discard_interval = 50000000; // 50 ms. fd_set readSockSet; struct timeval timeout; FD_ZERO(&readSockSet); @@ -427,16 +443,16 @@ int main(void) { if (recv_len == -1) { die("recvfrom()"); } else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) { - // get the time + // get the time - struct timeval tv_ioctl; - tv_ioctl.tv_sec = 0; - tv_ioctl.tv_usec = 0; - int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl); - uint64_t reception_time = tv_ioctl.tv_sec; - reception_time = reception_time * 1000000; - reception_time = reception_time + tv_ioctl.tv_usec; - reception_time = reception_time * 1000; + struct timeval tv_ioctl; + tv_ioctl.tv_sec = 0; + tv_ioctl.tv_usec = 0; + int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl); + uint64_t reception_time = tv_ioctl.tv_sec; + reception_time = reception_time * 1000000; + reception_time = reception_time + tv_ioctl.tv_usec; + reception_time = reception_time * 1000; // check its credentials // the sending and receiving ports must be the same (i.e. 319 -> 319 or 320 -> 320) @@ -465,107 +481,283 @@ int main(void) { memset(sender_string, 0, sizeof(sender_string)); inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string)); - // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string, sender_port, + // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string, + // sender_port, // sockets[t].port); // print_buffer(buf, recv_len); // now, find or create a record for this ip struct ptpSource *the_clock = findOrCreateSource(&clocks, sender_string); - switch (buf[0] & 0xF) { - case Sync: { // if it's a sync - struct ptp_sync_message *msg = (struct ptp_sync_message *)buf; - the_clock->t2 = reception_time; - memset(&m, 0, sizeof(m)); - m.header.transportSpecificAndMessageID = 0x11; - m.header.reservedAndVersionPTP = 0x02; - m.header.messageLength = htons(44); - m.header.flags = htons(0x608); - m.header.sourcePortID = htons(1); - m.header.controlOtherMessage = 5; - m.header.sequenceId = msg->header.sequenceId; - struct ifaddrs *ifaddr = NULL; - struct ifaddrs *ifa = NULL; - - if ((status = getifaddrs(&ifaddr) == -1)) { - fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status)); - } else { - int found = 0; - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) { - struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr; - if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) { - memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen); - found = 1; + if (the_clock->discarding_packets != 0) { + int64_t discard_time_remaining = the_clock->discard_until_time - reception_time; + if (discard_time_remaining <= 0) + the_clock->discarding_packets = 0; + } + + if (the_clock->discarding_packets == 0) { + switch (buf[0] & 0xF) { + case Sync: { // if it's a sync + struct ptp_sync_message *msg = (struct ptp_sync_message *)buf; + if ((the_clock->current_stage != nothing_seen) && + (the_clock->current_stage != waiting_for_sync)) { + + fprintf(stderr, + "Sync expecting to be in state nothing_seen (%u) or waiting_for_sync " + "(%u). Stage error -- " + "current state is %u. Discarding. %s\n", + nothing_seen, waiting_for_sync, the_clock->current_stage, + the_clock->ip); + + the_clock->current_stage = waiting_for_sync; + // the_clock->discarding_packets = 1; + the_clock->discard_until_time = reception_time + discard_interval; + } + the_clock->sequence_number = ntohs(msg->header.sequenceId); + the_clock->t2 = reception_time; + memset(&m, 0, sizeof(m)); + m.header.transportSpecificAndMessageID = 0x11; + m.header.reservedAndVersionPTP = 0x02; + m.header.messageLength = htons(44); + m.header.flags = htons(0x608); + m.header.sourcePortID = htons(1); + m.header.controlOtherMessage = 5; + m.header.sequenceId = htons(the_clock->sequence_number); + struct ifaddrs *ifaddr = NULL; + struct ifaddrs *ifa = NULL; + + if ((status = getifaddrs(&ifaddr) == -1)) { + fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status)); + } else { + int found = 0; + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) { + struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr; + if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) { + memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen); + found = 1; + } } } + freeifaddrs(ifaddr); + } + // fprintf(stderr, "DREQ to %s\n", the_clock->ip); + if (sendto(sockets[t].number, &m, sizeof(m), 0, + (const struct sockaddr *)&from_sock_addr, + from_sock_addr_length) == -1) { + fprintf(stderr, "sendto: %s\n", strerror(errno)); + return 4; } - freeifaddrs(ifaddr); - } - // fprintf(stderr, "DREQ to %s\n", the_clock->ip); - the_clock->t3 = get_time_now(); - if (sendto(sockets[t].number, &m, sizeof(m), 0, - (const struct sockaddr *)&from_sock_addr, - from_sock_addr_length) == -1) { - fprintf(stderr, "sendto: %s\n", strerror(errno)); - return 4; - } - } break; - - case Follow_Up: { - struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; - uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); - uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); - uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); - uint64_t preciseOriginTimestamp = seconds_hi; - preciseOriginTimestamp = preciseOriginTimestamp << 32; - preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; - preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; - preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; - the_clock->t1 = preciseOriginTimestamp; - } break; - case Delay_Resp: { - struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf; - uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]); - uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]); - uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]); - uint64_t receiveTimestamp = seconds_hi; - receiveTimestamp = receiveTimestamp << 32; - receiveTimestamp = receiveTimestamp + seconds_low; - receiveTimestamp = receiveTimestamp * 1000000000L; - receiveTimestamp = receiveTimestamp + nanoseconds; - the_clock->t4 = receiveTimestamp; - the_clock->t5 = reception_time; // t5 - t3 gives us the out-and-back time locally - // -- an instantaneous quality index - - // calculate delay and calculate offset - // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", t4: %" - // PRIx64 - // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: %" PRIx64 - // " = - // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", t4-t1, - // t4-t1, t3-t2, t3-t2); - uint64_t offset = the_clock->t1 - the_clock->t2; - if (the_clock->previous_offset == 0) - fprintf(stderr, "offset: %" PRIx64 ".\n", offset); - else { - int64_t variation = offset - the_clock->previous_offset; - fprintf(stderr, - "remote transaction time: %f, offset: %" PRIx64 - ", variation: %+f, turnaround: %f ip: %s.\n", - (the_clock->t4 - the_clock->t1) * 0.000000001, offset, - variation * 0.000000001, (the_clock->t5 - the_clock->t2) * 0.000000001, the_clock->ip); - } - the_clock->previous_offset = offset; - // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, delay*0.000000001); - } break; - default: - break; - } + struct timeval tv_ioctl; + tv_ioctl.tv_sec = 0; + tv_ioctl.tv_usec = 0; + int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl); + uint64_t transmission_time = tv_ioctl.tv_sec; + transmission_time = transmission_time * 1000000; + transmission_time = transmission_time + tv_ioctl.tv_usec; + transmission_time = transmission_time * 1000; + the_clock->t3 = transmission_time; + // int64_t ttd = transmission_time - the_clock->t3; + // fprintf(stderr, "transmission time delta: %f.\n", ttd*0.000000001); + + the_clock->current_stage = sync_seen; + } break; + + case Follow_Up: { + struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; + if ((the_clock->current_stage == sync_seen) && + (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { + uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); + uint64_t preciseOriginTimestamp = seconds_hi; + preciseOriginTimestamp = preciseOriginTimestamp << 32; + preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; + preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; + preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; + the_clock->t1 = preciseOriginTimestamp; + the_clock->current_stage = follow_up_seen; + } else { + if (the_clock->current_stage != waiting_for_sync) { + + fprintf(stderr, + "Follow_Up expecting to be in state sync_seen (%u). Stage error -- " + "current state is %u. Discarding. %s\n", + sync_seen, the_clock->current_stage, the_clock->ip); + + the_clock->current_stage = waiting_for_sync; + // the_clock->discarding_packets = 1; + the_clock->discard_until_time = reception_time + discard_interval; + } + } + } break; + case Delay_Resp: { + struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf; + if ((the_clock->current_stage == follow_up_seen) && + (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { + uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]); + uint64_t receiveTimestamp = seconds_hi; + receiveTimestamp = receiveTimestamp << 32; + receiveTimestamp = receiveTimestamp + seconds_low; + receiveTimestamp = receiveTimestamp * 1000000000L; + receiveTimestamp = receiveTimestamp + nanoseconds; + the_clock->t4 = receiveTimestamp; + the_clock->t5 = + reception_time; // t5 - t3 gives us the out-and-back time locally + // -- an instantaneous quality index + // t5 - t2 gives us an overall interchange time + // from the Sync to the Delay Resp + + if ((the_clock->t5 - the_clock->t2) < 15 * 1000000) { + if ((the_clock->t4 - the_clock->t1) < 60 * 1000000) { + + // calculate delay and calculate offset + // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", + // t4: + // %" PRIx64 + // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: + // %" PRIx64 " = + // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", + // t4-t1, t4-t1, t3-t2, t3-t2); + + // now, store the remote and local times in the array + the_clock->samples[the_clock->next_sample_goes_here].local = + the_clock->t2; + the_clock->samples[the_clock->next_sample_goes_here].remote = + the_clock->t1; + the_clock->next_sample_goes_here++; + if (the_clock->next_sample_goes_here == MAX_TIMING_SAMPLES) + the_clock->next_sample_goes_here = 0; + if (the_clock->vacant_samples > 0) + the_clock->vacant_samples--; + + // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, + // delay*0.000000001); + + // clang-format off + + /* + + // here, let's try to use the t1 - remote time and t2 - local time + // records to estimate the relationship between the local clock (x) and + // remote clock(y) estimate a figure for drift between the local + // clock (x) and the remote clock (y) + + // if we plug in a local interval, we will get back what that is in remote + // time + + // calculate the line of best fit for relating the local time and the remote + // time we will calculate the slope, which is the drift. See + // https://www.varsitytutors.com/hotmath/hotmath_help/topics/line-of-best-fit + + long double y_bar = 0; // remote timestamp average + long double x_bar = 0; // local timestamp average + int sample_count = 0; + long double gradient; + long double intercept; + int i; + for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) { + y_bar += (1.0 * the_clock->samples[i].remote); + x_bar += (1.0 * the_clock->samples[i].local); + sample_count++; + } - } else { - // fprintf(stderr, "Packet dropped because ports don't match.\n"); + y_bar = y_bar / sample_count; + x_bar = x_bar / sample_count; + + long double xid, yid; + long double mtl, mbl; + mtl = 0; + mbl = 0; + for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) { + xid = 1.0 * the_clock->samples[i].local - x_bar; + yid = 1.0 * the_clock->samples[i].remote - y_bar; + mtl = mtl + xid * yid; + mbl = mbl + xid * xid; + } + + if (mbl) + gradient = (1.0 * mtl) / mbl; + else { + gradient = 1.0; + } + + // intercept = y_bar - gradient * x_bar + + intercept = 1.0 * y_bar - gradient * x_bar; + + // y = mx + c + // remote = gradient * local + intercept + + long double remote_f = gradient * the_clock->t2 + intercept; + uint64_t remote_estimate = (uint64_t)remote_f; + // fprintf(stderr, "remote actual: %" PRIx64 ", remote estimated: %" PRIx64 + // ".\n", the_clock->t1, remote_estimate); + + // uint64_t offset = the_clock->t1 - the_clock->t2; + uint64_t offset = remote_estimate - the_clock->t2; + // clang-format on + */ + + // here, calculate the average offset + + int e; + long double offsets = 0; + for (e = 0; e < MAX_TIMING_SAMPLES - the_clock->vacant_samples; e++) { + offsets = offsets + 1.0 * (the_clock->samples[e].remote - + the_clock->samples[e].local); + } + + offsets = offsets / (MAX_TIMING_SAMPLES - the_clock->vacant_samples); + + uint64_t offset = (uint64_t)offsets; + long double gradient = 1.0; + // uint64_t offset = the_clock->t1 - the_clock->t2; + + if (the_clock->previous_offset == 0) + fprintf(stderr, "offset: %" PRIx64 ".\n", offset); + else { + int64_t variation = offset - the_clock->previous_offset; + fprintf(stderr, + "remote transaction time: %f, offset: %" PRIx64 + ", variation: %+f, turnaround: %f delta (ppm): %+Lf ip: %s.\n", + (the_clock->t4 - the_clock->t1) * 0.000000001, offset, + variation * 0.000000001, + (the_clock->t5 - the_clock->t2) * 0.000000001, (gradient - 1.0) * 1000000, the_clock->ip); + } + the_clock->previous_offset = offset; + } else { + fprintf(stderr, + "t4 - t1 (sync and delay response) time is too long. Discarding. %s\n", + the_clock->ip); + } + } else { + fprintf(stderr, + "t5 - t2 time (cycle time) is too long. Discarding. %s\n", + the_clock->ip); + } + the_clock->current_stage = nothing_seen; + } else { + if (the_clock->current_stage != waiting_for_sync) { + + fprintf(stderr, + "Delay_Resp expecting to be in state sync_seen (%u). Stage error -- " + "current state is %u. Discarding. %s\n", + sync_seen, the_clock->current_stage, the_clock->ip); + + the_clock->current_stage = waiting_for_sync; + // the_clock->discarding_packets = 1; + the_clock->discard_until_time = reception_time + discard_interval; + } + } + } break; + default: + break; + } + } } } }