// "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and
// Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002)
+// transaction tracking
+enum stage {
+ nothing_seen,
+ sync_seen,
+ follow_up_seen,
+ waiting_for_sync // this when you are waiting out a sync for a new cycle
+};
+
// Table 19
enum messageType {
Sync,
Reserved_F
};
+#define MAX_TIMING_SAMPLES 1
+struct timing_samples {
+ uint64_t local, remote;
+} timing_samples;
+
struct ptpSource {
- char *ip; // ipv4 or ipv6
+ char *ip; // ipv4 or ipv6
+ int discarding_packets; // true if discarding packets for a period
+ uint64_t discard_until_time;
+ uint16_t sequence_number;
+ enum stage current_stage;
uint64_t t1, t2, t3, t4, t5, previous_offset;
+ struct timing_samples samples[MAX_TIMING_SAMPLES];
+ int vacant_samples; // the number of elements in the timing_samples array that are not yet used
+ int next_sample_goes_here;
struct ptpSource *next;
} ptpSource;
if (response != NULL) {
memset((void *)response, 0, sizeof(ptpSource));
response->ip = strdup(ip);
+ response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet
*insertion_point = response;
fprintf(stderr, "Clock record created for \"%s\".\n", ip);
}
ret |= setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes));
}
#endif
-
- //if (ret == 0)
- // ret = setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes));
-
- if (ret != 0)
- fprintf(stderr, "unable to enable timestamping.\n");
-
-
+ if (ret != 0)
+ fprintf(stderr, "unable to enable timestamping.\n");
if (!ret)
ret = bind(fd, p->ai_addr, p->ai_addrlen);
if (sockets_open > 0) {
while (1) {
+ uint64_t discard_interval = 50000000; // 50 ms.
fd_set readSockSet;
struct timeval timeout;
FD_ZERO(&readSockSet);
if (recv_len == -1) {
die("recvfrom()");
} else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) {
- // get the time
+ // get the time
- struct timeval tv_ioctl;
- tv_ioctl.tv_sec = 0;
- tv_ioctl.tv_usec = 0;
- int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
- uint64_t reception_time = tv_ioctl.tv_sec;
- reception_time = reception_time * 1000000;
- reception_time = reception_time + tv_ioctl.tv_usec;
- reception_time = reception_time * 1000;
+ struct timeval tv_ioctl;
+ tv_ioctl.tv_sec = 0;
+ tv_ioctl.tv_usec = 0;
+ int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
+ uint64_t reception_time = tv_ioctl.tv_sec;
+ reception_time = reception_time * 1000000;
+ reception_time = reception_time + tv_ioctl.tv_usec;
+ reception_time = reception_time * 1000;
// check its credentials
// the sending and receiving ports must be the same (i.e. 319 -> 319 or 320 -> 320)
memset(sender_string, 0, sizeof(sender_string));
inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string));
- // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string, sender_port,
+ // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string,
+ // sender_port,
// sockets[t].port);
// print_buffer(buf, recv_len);
// now, find or create a record for this ip
struct ptpSource *the_clock = findOrCreateSource(&clocks, sender_string);
- switch (buf[0] & 0xF) {
- case Sync: { // if it's a sync
- struct ptp_sync_message *msg = (struct ptp_sync_message *)buf;
- the_clock->t2 = reception_time;
- memset(&m, 0, sizeof(m));
- m.header.transportSpecificAndMessageID = 0x11;
- m.header.reservedAndVersionPTP = 0x02;
- m.header.messageLength = htons(44);
- m.header.flags = htons(0x608);
- m.header.sourcePortID = htons(1);
- m.header.controlOtherMessage = 5;
- m.header.sequenceId = msg->header.sequenceId;
- struct ifaddrs *ifaddr = NULL;
- struct ifaddrs *ifa = NULL;
-
- if ((status = getifaddrs(&ifaddr) == -1)) {
- fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status));
- } else {
- int found = 0;
- for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
- if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) {
- struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr;
- if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) {
- memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen);
- found = 1;
+ if (the_clock->discarding_packets != 0) {
+ int64_t discard_time_remaining = the_clock->discard_until_time - reception_time;
+ if (discard_time_remaining <= 0)
+ the_clock->discarding_packets = 0;
+ }
+
+ if (the_clock->discarding_packets == 0) {
+ switch (buf[0] & 0xF) {
+ case Sync: { // if it's a sync
+ struct ptp_sync_message *msg = (struct ptp_sync_message *)buf;
+ if ((the_clock->current_stage != nothing_seen) &&
+ (the_clock->current_stage != waiting_for_sync)) {
+
+ fprintf(stderr,
+ "Sync expecting to be in state nothing_seen (%u) or waiting_for_sync "
+ "(%u). Stage error -- "
+ "current state is %u. Discarding. %s\n",
+ nothing_seen, waiting_for_sync, the_clock->current_stage,
+ the_clock->ip);
+
+ the_clock->current_stage = waiting_for_sync;
+ // the_clock->discarding_packets = 1;
+ the_clock->discard_until_time = reception_time + discard_interval;
+ }
+ the_clock->sequence_number = ntohs(msg->header.sequenceId);
+ the_clock->t2 = reception_time;
+ memset(&m, 0, sizeof(m));
+ m.header.transportSpecificAndMessageID = 0x11;
+ m.header.reservedAndVersionPTP = 0x02;
+ m.header.messageLength = htons(44);
+ m.header.flags = htons(0x608);
+ m.header.sourcePortID = htons(1);
+ m.header.controlOtherMessage = 5;
+ m.header.sequenceId = htons(the_clock->sequence_number);
+ struct ifaddrs *ifaddr = NULL;
+ struct ifaddrs *ifa = NULL;
+
+ if ((status = getifaddrs(&ifaddr) == -1)) {
+ fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status));
+ } else {
+ int found = 0;
+ for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
+ if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) {
+ struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr;
+ if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) {
+ memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen);
+ found = 1;
+ }
}
}
+ freeifaddrs(ifaddr);
+ }
+ // fprintf(stderr, "DREQ to %s\n", the_clock->ip);
+ if (sendto(sockets[t].number, &m, sizeof(m), 0,
+ (const struct sockaddr *)&from_sock_addr,
+ from_sock_addr_length) == -1) {
+ fprintf(stderr, "sendto: %s\n", strerror(errno));
+ return 4;
}
- freeifaddrs(ifaddr);
- }
- // fprintf(stderr, "DREQ to %s\n", the_clock->ip);
- the_clock->t3 = get_time_now();
- if (sendto(sockets[t].number, &m, sizeof(m), 0,
- (const struct sockaddr *)&from_sock_addr,
- from_sock_addr_length) == -1) {
- fprintf(stderr, "sendto: %s\n", strerror(errno));
- return 4;
- }
- } break;
-
- case Follow_Up: {
- struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
- uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
- uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
- uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
- uint64_t preciseOriginTimestamp = seconds_hi;
- preciseOriginTimestamp = preciseOriginTimestamp << 32;
- preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
- preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
- preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
- the_clock->t1 = preciseOriginTimestamp;
- } break;
- case Delay_Resp: {
- struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf;
- uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]);
- uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]);
- uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]);
- uint64_t receiveTimestamp = seconds_hi;
- receiveTimestamp = receiveTimestamp << 32;
- receiveTimestamp = receiveTimestamp + seconds_low;
- receiveTimestamp = receiveTimestamp * 1000000000L;
- receiveTimestamp = receiveTimestamp + nanoseconds;
- the_clock->t4 = receiveTimestamp;
- the_clock->t5 = reception_time; // t5 - t3 gives us the out-and-back time locally
- // -- an instantaneous quality index
-
- // calculate delay and calculate offset
- // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", t4: %"
- // PRIx64
- // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: %" PRIx64
- // " =
- // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", t4-t1,
- // t4-t1, t3-t2, t3-t2);
- uint64_t offset = the_clock->t1 - the_clock->t2;
- if (the_clock->previous_offset == 0)
- fprintf(stderr, "offset: %" PRIx64 ".\n", offset);
- else {
- int64_t variation = offset - the_clock->previous_offset;
- fprintf(stderr,
- "remote transaction time: %f, offset: %" PRIx64
- ", variation: %+f, turnaround: %f ip: %s.\n",
- (the_clock->t4 - the_clock->t1) * 0.000000001, offset,
- variation * 0.000000001, (the_clock->t5 - the_clock->t2) * 0.000000001, the_clock->ip);
- }
- the_clock->previous_offset = offset;
- // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, delay*0.000000001);
- } break;
- default:
- break;
- }
+ struct timeval tv_ioctl;
+ tv_ioctl.tv_sec = 0;
+ tv_ioctl.tv_usec = 0;
+ int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
+ uint64_t transmission_time = tv_ioctl.tv_sec;
+ transmission_time = transmission_time * 1000000;
+ transmission_time = transmission_time + tv_ioctl.tv_usec;
+ transmission_time = transmission_time * 1000;
+ the_clock->t3 = transmission_time;
+ // int64_t ttd = transmission_time - the_clock->t3;
+ // fprintf(stderr, "transmission time delta: %f.\n", ttd*0.000000001);
+
+ the_clock->current_stage = sync_seen;
+ } break;
+
+ case Follow_Up: {
+ struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
+ if ((the_clock->current_stage == sync_seen) &&
+ (the_clock->sequence_number == ntohs(msg->header.sequenceId))) {
+ uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
+ uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
+ uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
+ uint64_t preciseOriginTimestamp = seconds_hi;
+ preciseOriginTimestamp = preciseOriginTimestamp << 32;
+ preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
+ preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
+ preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
+ the_clock->t1 = preciseOriginTimestamp;
+ the_clock->current_stage = follow_up_seen;
+ } else {
+ if (the_clock->current_stage != waiting_for_sync) {
+
+ fprintf(stderr,
+ "Follow_Up expecting to be in state sync_seen (%u). Stage error -- "
+ "current state is %u. Discarding. %s\n",
+ sync_seen, the_clock->current_stage, the_clock->ip);
+
+ the_clock->current_stage = waiting_for_sync;
+ // the_clock->discarding_packets = 1;
+ the_clock->discard_until_time = reception_time + discard_interval;
+ }
+ }
+ } break;
+ case Delay_Resp: {
+ struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf;
+ if ((the_clock->current_stage == follow_up_seen) &&
+ (the_clock->sequence_number == ntohs(msg->header.sequenceId))) {
+ uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]);
+ uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]);
+ uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]);
+ uint64_t receiveTimestamp = seconds_hi;
+ receiveTimestamp = receiveTimestamp << 32;
+ receiveTimestamp = receiveTimestamp + seconds_low;
+ receiveTimestamp = receiveTimestamp * 1000000000L;
+ receiveTimestamp = receiveTimestamp + nanoseconds;
+ the_clock->t4 = receiveTimestamp;
+ the_clock->t5 =
+ reception_time; // t5 - t3 gives us the out-and-back time locally
+ // -- an instantaneous quality index
+ // t5 - t2 gives us an overall interchange time
+ // from the Sync to the Delay Resp
+
+ if ((the_clock->t5 - the_clock->t2) < 15 * 1000000) {
+ if ((the_clock->t4 - the_clock->t1) < 60 * 1000000) {
+
+ // calculate delay and calculate offset
+ // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ",
+ // t4:
+ // %" PRIx64
+ // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time:
+ // %" PRIx64 " =
+ // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n",
+ // t4-t1, t4-t1, t3-t2, t3-t2);
+
+ // now, store the remote and local times in the array
+ the_clock->samples[the_clock->next_sample_goes_here].local =
+ the_clock->t2;
+ the_clock->samples[the_clock->next_sample_goes_here].remote =
+ the_clock->t1;
+ the_clock->next_sample_goes_here++;
+ if (the_clock->next_sample_goes_here == MAX_TIMING_SAMPLES)
+ the_clock->next_sample_goes_here = 0;
+ if (the_clock->vacant_samples > 0)
+ the_clock->vacant_samples--;
+
+ // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset,
+ // delay*0.000000001);
+
+ // clang-format off
+
+ /*
+
+ // here, let's try to use the t1 - remote time and t2 - local time
+ // records to estimate the relationship between the local clock (x) and
+ // remote clock(y) estimate a figure for drift between the local
+ // clock (x) and the remote clock (y)
+
+ // if we plug in a local interval, we will get back what that is in remote
+ // time
+
+ // calculate the line of best fit for relating the local time and the remote
+ // time we will calculate the slope, which is the drift. See
+ // https://www.varsitytutors.com/hotmath/hotmath_help/topics/line-of-best-fit
+
+ long double y_bar = 0; // remote timestamp average
+ long double x_bar = 0; // local timestamp average
+ int sample_count = 0;
+ long double gradient;
+ long double intercept;
+ int i;
+ for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) {
+ y_bar += (1.0 * the_clock->samples[i].remote);
+ x_bar += (1.0 * the_clock->samples[i].local);
+ sample_count++;
+ }
- } else {
- // fprintf(stderr, "Packet dropped because ports don't match.\n");
+ y_bar = y_bar / sample_count;
+ x_bar = x_bar / sample_count;
+
+ long double xid, yid;
+ long double mtl, mbl;
+ mtl = 0;
+ mbl = 0;
+ for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) {
+ xid = 1.0 * the_clock->samples[i].local - x_bar;
+ yid = 1.0 * the_clock->samples[i].remote - y_bar;
+ mtl = mtl + xid * yid;
+ mbl = mbl + xid * xid;
+ }
+
+ if (mbl)
+ gradient = (1.0 * mtl) / mbl;
+ else {
+ gradient = 1.0;
+ }
+
+ // intercept = y_bar - gradient * x_bar
+
+ intercept = 1.0 * y_bar - gradient * x_bar;
+
+ // y = mx + c
+ // remote = gradient * local + intercept
+
+ long double remote_f = gradient * the_clock->t2 + intercept;
+ uint64_t remote_estimate = (uint64_t)remote_f;
+ // fprintf(stderr, "remote actual: %" PRIx64 ", remote estimated: %" PRIx64
+ // ".\n", the_clock->t1, remote_estimate);
+
+ // uint64_t offset = the_clock->t1 - the_clock->t2;
+ uint64_t offset = remote_estimate - the_clock->t2;
+ // clang-format on
+ */
+
+ // here, calculate the average offset
+
+ int e;
+ long double offsets = 0;
+ for (e = 0; e < MAX_TIMING_SAMPLES - the_clock->vacant_samples; e++) {
+ offsets = offsets + 1.0 * (the_clock->samples[e].remote -
+ the_clock->samples[e].local);
+ }
+
+ offsets = offsets / (MAX_TIMING_SAMPLES - the_clock->vacant_samples);
+
+ uint64_t offset = (uint64_t)offsets;
+ long double gradient = 1.0;
+ // uint64_t offset = the_clock->t1 - the_clock->t2;
+
+ if (the_clock->previous_offset == 0)
+ fprintf(stderr, "offset: %" PRIx64 ".\n", offset);
+ else {
+ int64_t variation = offset - the_clock->previous_offset;
+ fprintf(stderr,
+ "remote transaction time: %f, offset: %" PRIx64
+ ", variation: %+f, turnaround: %f delta (ppm): %+Lf ip: %s.\n",
+ (the_clock->t4 - the_clock->t1) * 0.000000001, offset,
+ variation * 0.000000001,
+ (the_clock->t5 - the_clock->t2) * 0.000000001, (gradient - 1.0) * 1000000, the_clock->ip);
+ }
+ the_clock->previous_offset = offset;
+ } else {
+ fprintf(stderr,
+ "t4 - t1 (sync and delay response) time is too long. Discarding. %s\n",
+ the_clock->ip);
+ }
+ } else {
+ fprintf(stderr,
+ "t5 - t2 time (cycle time) is too long. Discarding. %s\n",
+ the_clock->ip);
+ }
+ the_clock->current_stage = nothing_seen;
+ } else {
+ if (the_clock->current_stage != waiting_for_sync) {
+
+ fprintf(stderr,
+ "Delay_Resp expecting to be in state sync_seen (%u). Stage error -- "
+ "current state is %u. Discarding. %s\n",
+ sync_seen, the_clock->current_stage, the_clock->ip);
+
+ the_clock->current_stage = waiting_for_sync;
+ // the_clock->discarding_packets = 1;
+ the_clock->discard_until_time = reception_time + discard_interval;
+ }
+ }
+ } break;
+ default:
+ break;
+ }
+ }
}
}
}