]> git.ipfire.org Git - thirdparty/nqptp.git/commitdiff
Works reasonably well. Two filters -- least square line of best fit and average....
authorMike Brady <4265913+mikebrady@users.noreply.github.com>
Tue, 16 Mar 2021 08:33:32 +0000 (08:33 +0000)
committerMike Brady <4265913+mikebrady@users.noreply.github.com>
Tue, 16 Mar 2021 08:33:32 +0000 (08:33 +0000)
nqptp.c

diff --git a/nqptp.c b/nqptp.c
index 93c29ae1a437d5904136c7306d5068af2423b9cb..1c341f838d7d197c45ee0450ca769b6c690e36b8 100644 (file)
--- a/nqptp.c
+++ b/nqptp.c
 // "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and
 // Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002)
 
+// transaction tracking
+enum stage {
+  nothing_seen,
+  sync_seen,
+  follow_up_seen,
+  waiting_for_sync // this when you are waiting out a sync for a new cycle
+};
+
 // Table 19
 enum messageType {
   Sync,
@@ -61,9 +69,21 @@ enum messageType {
   Reserved_F
 };
 
+#define MAX_TIMING_SAMPLES 1
+struct timing_samples {
+  uint64_t local, remote;
+} timing_samples;
+
 struct ptpSource {
-  char *ip; // ipv4 or ipv6
+  char *ip;               // ipv4 or ipv6
+  int discarding_packets; // true if discarding packets for a period
+  uint64_t discard_until_time;
+  uint16_t sequence_number;
+  enum stage current_stage;
   uint64_t t1, t2, t3, t4, t5, previous_offset;
+  struct timing_samples samples[MAX_TIMING_SAMPLES];
+  int vacant_samples; // the number of elements in the timing_samples array that are not yet used
+  int next_sample_goes_here;
   struct ptpSource *next;
 } ptpSource;
 
@@ -156,6 +176,7 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) {
     if (response != NULL) {
       memset((void *)response, 0, sizeof(ptpSource));
       response->ip = strdup(ip);
+      response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet
       *insertion_point = response;
       fprintf(stderr, "Clock record created for \"%s\".\n", ip);
     }
@@ -367,14 +388,8 @@ int main(void) {
         ret |= setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes));
       }
 #endif
-
-                       //if (ret == 0)
-                       //      ret = setsockopt(fd, SOL_SOCKET, SO_TIMESTAMP, &yes, sizeof(yes));
-
-                       if (ret != 0)
-                               fprintf(stderr, "unable to enable timestamping.\n");
-
-
+      if (ret != 0)
+        fprintf(stderr, "unable to enable timestamping.\n");
 
       if (!ret)
         ret = bind(fd, p->ai_addr, p->ai_addrlen);
@@ -398,6 +413,7 @@ int main(void) {
 
   if (sockets_open > 0) {
     while (1) {
+      uint64_t discard_interval = 50000000; // 50 ms.
       fd_set readSockSet;
       struct timeval timeout;
       FD_ZERO(&readSockSet);
@@ -427,16 +443,16 @@ int main(void) {
             if (recv_len == -1) {
               die("recvfrom()");
             } else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) {
-               // get the time
+              // get the time
 
-                                                       struct timeval tv_ioctl;
-                                                       tv_ioctl.tv_sec = 0;
-                                                       tv_ioctl.tv_usec = 0;
-                                                       int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
-                                                       uint64_t reception_time = tv_ioctl.tv_sec;
-                                                       reception_time = reception_time * 1000000;
-                                                       reception_time = reception_time + tv_ioctl.tv_usec;
-                                                       reception_time = reception_time * 1000;
+              struct timeval tv_ioctl;
+              tv_ioctl.tv_sec = 0;
+              tv_ioctl.tv_usec = 0;
+              int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
+              uint64_t reception_time = tv_ioctl.tv_sec;
+              reception_time = reception_time * 1000000;
+              reception_time = reception_time + tv_ioctl.tv_usec;
+              reception_time = reception_time * 1000;
 
               // check its credentials
               // the sending and receiving ports must be the same (i.e. 319 -> 319 or 320 -> 320)
@@ -465,107 +481,283 @@ int main(void) {
                 memset(sender_string, 0, sizeof(sender_string));
                 inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string));
 
-                // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string, sender_port,
+                // fprintf(stderr, "connection from %s:%u on port %u\n", sender_string,
+                // sender_port,
                 //        sockets[t].port);
 
                 // print_buffer(buf, recv_len);
 
                 // now, find or create a record for this ip
                 struct ptpSource *the_clock = findOrCreateSource(&clocks, sender_string);
-                switch (buf[0] & 0xF) {
-                case Sync: { // if it's a sync
-                  struct ptp_sync_message *msg = (struct ptp_sync_message *)buf;
-                  the_clock->t2 = reception_time;
-                  memset(&m, 0, sizeof(m));
-                  m.header.transportSpecificAndMessageID = 0x11;
-                  m.header.reservedAndVersionPTP = 0x02;
-                  m.header.messageLength = htons(44);
-                  m.header.flags = htons(0x608);
-                  m.header.sourcePortID = htons(1);
-                  m.header.controlOtherMessage = 5;
-                  m.header.sequenceId = msg->header.sequenceId;
-                  struct ifaddrs *ifaddr = NULL;
-                  struct ifaddrs *ifa = NULL;
-
-                  if ((status = getifaddrs(&ifaddr) == -1)) {
-                    fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status));
-                  } else {
-                    int found = 0;
-                    for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
-                      if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) {
-                        struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr;
-                        if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) {
-                          memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen);
-                          found = 1;
+                if (the_clock->discarding_packets != 0) {
+                  int64_t discard_time_remaining = the_clock->discard_until_time - reception_time;
+                  if (discard_time_remaining <= 0)
+                    the_clock->discarding_packets = 0;
+                }
+
+                if (the_clock->discarding_packets == 0) {
+                  switch (buf[0] & 0xF) {
+                  case Sync: { // if it's a sync
+                    struct ptp_sync_message *msg = (struct ptp_sync_message *)buf;
+                    if ((the_clock->current_stage != nothing_seen) &&
+                        (the_clock->current_stage != waiting_for_sync)) {
+
+                      fprintf(stderr,
+                              "Sync expecting to be in state nothing_seen (%u) or waiting_for_sync "
+                              "(%u). Stage error -- "
+                              "current state is %u. Discarding. %s\n",
+                              nothing_seen, waiting_for_sync, the_clock->current_stage,
+                              the_clock->ip);
+
+                      the_clock->current_stage = waiting_for_sync;
+                      // the_clock->discarding_packets = 1;
+                      the_clock->discard_until_time = reception_time + discard_interval;
+                    }
+                    the_clock->sequence_number = ntohs(msg->header.sequenceId);
+                    the_clock->t2 = reception_time;
+                    memset(&m, 0, sizeof(m));
+                    m.header.transportSpecificAndMessageID = 0x11;
+                    m.header.reservedAndVersionPTP = 0x02;
+                    m.header.messageLength = htons(44);
+                    m.header.flags = htons(0x608);
+                    m.header.sourcePortID = htons(1);
+                    m.header.controlOtherMessage = 5;
+                    m.header.sequenceId = htons(the_clock->sequence_number);
+                    struct ifaddrs *ifaddr = NULL;
+                    struct ifaddrs *ifa = NULL;
+
+                    if ((status = getifaddrs(&ifaddr) == -1)) {
+                      fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status));
+                    } else {
+                      int found = 0;
+                      for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) {
+                        if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) {
+                          struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr;
+                          if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) {
+                            memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen);
+                            found = 1;
+                          }
                         }
                       }
+                      freeifaddrs(ifaddr);
+                    }
+                    // fprintf(stderr, "DREQ to %s\n", the_clock->ip);
+                    if (sendto(sockets[t].number, &m, sizeof(m), 0,
+                               (const struct sockaddr *)&from_sock_addr,
+                               from_sock_addr_length) == -1) {
+                      fprintf(stderr, "sendto: %s\n", strerror(errno));
+                      return 4;
                     }
-                    freeifaddrs(ifaddr);
-                  }
-                  // fprintf(stderr, "DREQ to %s\n", the_clock->ip);
-                  the_clock->t3 = get_time_now();
-                  if (sendto(sockets[t].number, &m, sizeof(m), 0,
-                             (const struct sockaddr *)&from_sock_addr,
-                             from_sock_addr_length) == -1) {
-                    fprintf(stderr, "sendto: %s\n", strerror(errno));
-                    return 4;
-                  }
-                } break;
-
-                case Follow_Up: {
-                  struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
-                  uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
-                  uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
-                  uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
-                  uint64_t preciseOriginTimestamp = seconds_hi;
-                  preciseOriginTimestamp = preciseOriginTimestamp << 32;
-                  preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
-                  preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
-                  preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
-                  the_clock->t1 = preciseOriginTimestamp;
-                } break;
-                case Delay_Resp: {
-                  struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf;
-                  uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]);
-                  uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]);
-                  uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]);
-                  uint64_t receiveTimestamp = seconds_hi;
-                  receiveTimestamp = receiveTimestamp << 32;
-                  receiveTimestamp = receiveTimestamp + seconds_low;
-                  receiveTimestamp = receiveTimestamp * 1000000000L;
-                  receiveTimestamp = receiveTimestamp + nanoseconds;
-                  the_clock->t4 = receiveTimestamp;
-                  the_clock->t5 = reception_time; // t5 - t3 gives us the out-and-back time locally
-                                                  // -- an instantaneous quality index
-
-                  // calculate delay and calculate offset
-                  // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", t4: %"
-                  // PRIx64
-                  // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: %" PRIx64
-                  // " =
-                  // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", t4-t1,
-                  // t4-t1, t3-t2, t3-t2);
-                  uint64_t offset = the_clock->t1 - the_clock->t2;
-                  if (the_clock->previous_offset == 0)
-                    fprintf(stderr, "offset: %" PRIx64 ".\n", offset);
-                  else {
-                    int64_t variation = offset - the_clock->previous_offset;
-                    fprintf(stderr,
-                            "remote transaction time: %f, offset: %" PRIx64
-                            ", variation: %+f, turnaround: %f ip: %s.\n",
-                            (the_clock->t4 - the_clock->t1) * 0.000000001, offset,
-                            variation * 0.000000001, (the_clock->t5 - the_clock->t2) * 0.000000001, the_clock->ip);
-                  }
-                  the_clock->previous_offset = offset;
-                  // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, delay*0.000000001);
 
-                } break;
-                default:
-                  break;
-                }
+                    struct timeval tv_ioctl;
+                    tv_ioctl.tv_sec = 0;
+                    tv_ioctl.tv_usec = 0;
+                    int error = ioctl(sockets[t].number, SIOCGSTAMP, &tv_ioctl);
+                    uint64_t transmission_time = tv_ioctl.tv_sec;
+                    transmission_time = transmission_time * 1000000;
+                    transmission_time = transmission_time + tv_ioctl.tv_usec;
+                    transmission_time = transmission_time * 1000;
+                    the_clock->t3 = transmission_time;
+                    // int64_t ttd = transmission_time - the_clock->t3;
+                    // fprintf(stderr, "transmission time delta: %f.\n", ttd*0.000000001);
+
+                    the_clock->current_stage = sync_seen;
+                  } break;
+
+                  case Follow_Up: {
+                    struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
+                    if ((the_clock->current_stage == sync_seen) &&
+                        (the_clock->sequence_number == ntohs(msg->header.sequenceId))) {
+                      uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
+                      uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
+                      uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
+                      uint64_t preciseOriginTimestamp = seconds_hi;
+                      preciseOriginTimestamp = preciseOriginTimestamp << 32;
+                      preciseOriginTimestamp = preciseOriginTimestamp + seconds_low;
+                      preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L;
+                      preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds;
+                      the_clock->t1 = preciseOriginTimestamp;
+                      the_clock->current_stage = follow_up_seen;
+                    } else {
+                      if (the_clock->current_stage != waiting_for_sync) {
+
+                        fprintf(stderr,
+                                "Follow_Up expecting to be in state sync_seen (%u). Stage error -- "
+                                "current state is %u. Discarding. %s\n",
+                                sync_seen, the_clock->current_stage, the_clock->ip);
+
+                        the_clock->current_stage = waiting_for_sync;
+                        // the_clock->discarding_packets = 1;
+                        the_clock->discard_until_time = reception_time + discard_interval;
+                      }
+                    }
+                  } break;
+                  case Delay_Resp: {
+                    struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf;
+                    if ((the_clock->current_stage == follow_up_seen) &&
+                        (the_clock->sequence_number == ntohs(msg->header.sequenceId))) {
+                      uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]);
+                      uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]);
+                      uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]);
+                      uint64_t receiveTimestamp = seconds_hi;
+                      receiveTimestamp = receiveTimestamp << 32;
+                      receiveTimestamp = receiveTimestamp + seconds_low;
+                      receiveTimestamp = receiveTimestamp * 1000000000L;
+                      receiveTimestamp = receiveTimestamp + nanoseconds;
+                      the_clock->t4 = receiveTimestamp;
+                      the_clock->t5 =
+                          reception_time; // t5 - t3 gives us the out-and-back time locally
+                                          // -- an instantaneous quality index
+                                          // t5 - t2 gives us an overall interchange time
+                                          // from the Sync to the Delay Resp
+
+                      if ((the_clock->t5 - the_clock->t2) < 15 * 1000000) {
+                        if ((the_clock->t4 - the_clock->t1) < 60 * 1000000) {
+
+                          // calculate delay and calculate offset
+                          // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ",
+                          // t4:
+                          // %" PRIx64
+                          // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time:
+                          // %" PRIx64 " =
+                          // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n",
+                          // t4-t1, t4-t1, t3-t2, t3-t2);
+
+                          // now, store the remote and local times in the array
+                          the_clock->samples[the_clock->next_sample_goes_here].local =
+                              the_clock->t2;
+                          the_clock->samples[the_clock->next_sample_goes_here].remote =
+                              the_clock->t1;
+                          the_clock->next_sample_goes_here++;
+                          if (the_clock->next_sample_goes_here == MAX_TIMING_SAMPLES)
+                            the_clock->next_sample_goes_here = 0;
+                          if (the_clock->vacant_samples > 0)
+                            the_clock->vacant_samples--;
+
+                          // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset,
+                          // delay*0.000000001);
+
+                          // clang-format off
+
+                                                                                       /*
+
+                        // here, let's try to use the t1 - remote time and t2 - local time
+                        // records to estimate the relationship between the local clock (x) and
+                        // remote clock(y) estimate a figure for drift between the local
+                        // clock (x) and the remote clock (y)
+
+                        // if we plug in a local interval, we will get back what that is in remote
+                        // time
+
+                        // calculate the line of best fit for relating the local time and the remote
+                        // time we will calculate the slope, which is the drift. See
+                        // https://www.varsitytutors.com/hotmath/hotmath_help/topics/line-of-best-fit
+
+                        long double y_bar = 0; // remote timestamp average
+                        long double x_bar = 0; // local timestamp average
+                        int sample_count = 0;
+                        long double gradient;
+                        long double intercept;
+                        int i;
+                        for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) {
+                          y_bar += (1.0 * the_clock->samples[i].remote);
+                          x_bar += (1.0 * the_clock->samples[i].local);
+                          sample_count++;
+                        }
 
-              } else {
-                // fprintf(stderr, "Packet dropped because ports don't match.\n");
+                        y_bar = y_bar / sample_count;
+                        x_bar = x_bar / sample_count;
+
+                        long double xid, yid;
+                        long double mtl, mbl;
+                        mtl = 0;
+                        mbl = 0;
+                        for (i = 0; i < MAX_TIMING_SAMPLES - the_clock->vacant_samples; i++) {
+                          xid = 1.0 * the_clock->samples[i].local - x_bar;
+                          yid = 1.0 * the_clock->samples[i].remote - y_bar;
+                          mtl = mtl + xid * yid;
+                          mbl = mbl + xid * xid;
+                        }
+
+                        if (mbl)
+                          gradient = (1.0 * mtl) / mbl;
+                        else {
+                          gradient = 1.0;
+                        }
+
+                        // intercept = y_bar  - gradient * x_bar
+
+                        intercept = 1.0 * y_bar - gradient * x_bar;
+
+                        // y = mx + c
+                        // remote = gradient * local + intercept
+
+                        long double remote_f = gradient * the_clock->t2 + intercept;
+                        uint64_t remote_estimate = (uint64_t)remote_f;
+                        // fprintf(stderr, "remote actual: %" PRIx64 ", remote estimated: %" PRIx64
+                        // ".\n", the_clock->t1, remote_estimate);
+
+                        // uint64_t offset = the_clock->t1 - the_clock->t2;
+                        uint64_t offset = remote_estimate - the_clock->t2;
+                        // clang-format on
+                      */
+
+                      // here, calculate the average offset
+
+                      int e;
+                      long double offsets = 0;
+                      for (e = 0; e < MAX_TIMING_SAMPLES - the_clock->vacant_samples; e++) {
+                        offsets = offsets + 1.0 * (the_clock->samples[e].remote -
+                                                   the_clock->samples[e].local);
+                      }
+
+                      offsets = offsets / (MAX_TIMING_SAMPLES - the_clock->vacant_samples);
+
+                      uint64_t offset = (uint64_t)offsets;
+                      long double gradient = 1.0;
+                      // uint64_t offset = the_clock->t1 - the_clock->t2;
+
+                        if (the_clock->previous_offset == 0)
+                          fprintf(stderr, "offset: %" PRIx64 ".\n", offset);
+                        else {
+                          int64_t variation = offset - the_clock->previous_offset;
+                          fprintf(stderr,
+                                  "remote transaction time: %f, offset: %" PRIx64
+                                  ", variation: %+f, turnaround: %f delta (ppm): %+Lf ip: %s.\n",
+                                  (the_clock->t4 - the_clock->t1) * 0.000000001, offset,
+                                  variation * 0.000000001,
+                                  (the_clock->t5 - the_clock->t2) * 0.000000001, (gradient - 1.0) * 1000000, the_clock->ip);
+                        }
+                        the_clock->previous_offset = offset;
+                      } else {
+                       fprintf(stderr,
+                              "t4 - t1 (sync and delay response) time is too long. Discarding. %s\n",
+                              the_clock->ip);
+                      }
+                      } else {
+                       fprintf(stderr,
+                              "t5 - t2 time (cycle time) is too long. Discarding. %s\n",
+                              the_clock->ip);
+                      }
+                      the_clock->current_stage = nothing_seen;
+                    } else {
+                      if (the_clock->current_stage != waiting_for_sync) {
+
+                      fprintf(stderr,
+                              "Delay_Resp expecting to be in state sync_seen (%u). Stage error -- "
+                              "current state is %u. Discarding. %s\n",
+                              sync_seen, the_clock->current_stage, the_clock->ip);
+
+                        the_clock->current_stage = waiting_for_sync;
+                        // the_clock->discarding_packets = 1;
+                        the_clock->discard_until_time = reception_time + discard_interval;
+                      }
+                    }
+                  } break;
+                  default:
+                    break;
+                  }
+                }
               }
             }
           }