]> git.ipfire.org Git - thirdparty/nqptp.git/commitdiff
receives timing peer lists and sets flags accordingly.
authorMike Brady <4265913+mikebrady@users.noreply.github.com>
Wed, 7 Apr 2021 09:35:45 +0000 (10:35 +0100)
committerMike Brady <4265913+mikebrady@users.noreply.github.com>
Wed, 7 Apr 2021 09:35:45 +0000 (10:35 +0100)
nqptp-clock-sources.c
nqptp-clock-sources.h
nqptp-message-handlers.c
nqptp-message-handlers.h
nqptp-shm-structures.h
nqptp-utilities.c
nqptp.c

index d7a0e59bf36049784b22f02301ae625b1611edbc..bcc1d9b14a44bbd593b150d35be3e001a51b2124 100644 (file)
@@ -29,8 +29,7 @@
 #define FIELD_SIZEOF(t, f) (sizeof(((t *)0)->f))
 #endif
 
-int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
-                             clock_source *clocks_shared_info,
+int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
                              clock_source_private_data *clocks_private_info) {
   // return the index of the clock in the clock information arrays or -1
   int response = -1;
@@ -38,7 +37,6 @@ int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
   int found = 0;
   while ((found == 0) && (i < MAX_CLOCKS)) {
     if ((clocks_private_info[i].in_use != 0) &&
-        (clocks_shared_info[i].clock_id == packet_clock_id) &&
         (strcasecmp(sender_string, (const char *)&clocks_shared_info[i].ip) == 0))
       found = 1;
     else
@@ -49,9 +47,9 @@ int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
   return response;
 }
 
-int create_clock_source_record(char *sender_string, uint64_t packet_clock_id,
-                               clock_source *clocks_shared_info,
-                               clock_source_private_data *clocks_private_info) {
+int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
+                               clock_source_private_data *clocks_private_info, int use_lock) {
+  // sometimes, the mutex will already be locked
   // return the index of a clock entry in the clock information arrays or -1 if full
   // initialise the entries in the shared and private arrays
   int response = -1;
@@ -66,16 +64,16 @@ int create_clock_source_record(char *sender_string, uint64_t packet_clock_id,
 
   if (found == 1) {
     response = i;
-    int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
-    if (rc != 0)
-      warn("Can't acquire mutex to activate a new  clock!");
+    if (use_lock != 0) {
+      if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+        warn("Can't acquire mutex to activate a new  clock!");
+    }
     memset(&clocks_shared_info[i], 0, sizeof(clock_source));
     strncpy((char *)&clocks_shared_info[i].ip, sender_string, FIELD_SIZEOF(clock_source, ip) - 1);
-    clocks_shared_info[i].clock_id = packet_clock_id;
-    rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
-    if (rc != 0)
-      warn("Can't release mutex after activating a new clock!");
-
+    if (use_lock != 0) {
+      if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+        warn("Can't release mutex after activating a new clock!");
+    }
     memset(&clocks_private_info[i], 0, sizeof(clock_source_private_data));
     clocks_private_info[i].in_use = 1;
     clocks_private_info[i].t2 = 0;
@@ -92,9 +90,12 @@ void manage_clock_sources(uint64_t reception_time, clock_source *clocks_shared_i
                           clock_source_private_data *clocks_private_info) {
   debug(3, "manage_clock_sources");
   int i;
+  // do a garbage collect for clock records no longer in use
   for (i = 0; i < MAX_CLOCKS; i++) {
-    if (clocks_private_info[i].in_use != 0) {
-      int64_t time_since_last_sync = reception_time - clocks_private_info[i].t2;
+    // only if its in use and not a timing peer... don't need a mutex to check
+    if ((clocks_private_info[i].in_use != 0) && (clocks_shared_info[i].timing_peer == 0)) {
+      int64_t time_since_last_use = reception_time - clocks_private_info[i].time_of_last_use;
+      // using a sync timeout to determine when to drop the record...
       // the following give the sync receipt time in whole seconds
       // depending on the aPTPinitialLogSyncInterval and the aPTPsyncReceiptTimeout
       int64_t syncTimeout = (1 << (32 + aPTPinitialLogSyncInterval));
@@ -102,7 +103,7 @@ void manage_clock_sources(uint64_t reception_time, clock_source *clocks_shared_i
       syncTimeout = syncTimeout >> 32;
       // seconds to nanoseconds
       syncTimeout = syncTimeout * 1000000000;
-      if (time_since_last_sync > syncTimeout) {
+      if (time_since_last_use > syncTimeout) {
         debug(2, "deactivated source %d with clock_id %" PRIx64 " on ip: %s.", i,
               clocks_shared_info[i].clock_id, &clocks_shared_info[i].ip);
         int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
index 834462ed6f99e2de8d44e0d7b5173cc52fda1ebb..62d2b0b0f82bc0c9de42ed71244db9479592a7cf 100644 (file)
@@ -34,20 +34,20 @@ typedef struct {
   uint16_t in_use;
   enum stage current_stage;
   uint64_t t2;
-
+  // for garbage collection
+  uint64_t time_of_last_use; // will be taken out of use if not used for a while and not in the
+                             // timing peer group
+  // (A member of the timing peer group could appear and disappear)
   // for Announce Qualification
   uint64_t announce_times[4]; // we'll check qualification and currency using these
-  int announce_is_valid;      // this may mean it's a master clock_source
   int is_one_of_ours;         // true if it is one of our own clocks
 } clock_source_private_data;
 
-int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
-                             clock_source *clocks_shared_info,
+int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
                              clock_source_private_data *clocks_private_info);
 
-int create_clock_source_record(char *sender_string, uint64_t packet_clock_id,
-                               clock_source *clocks_shared_info,
-                               clock_source_private_data *clocks_private_info);
+int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
+                               clock_source_private_data *clocks_private_info, int use_lock);
 
 void update_clock_self_identifications(clock_source *clocks_shared_info,
                                        clock_source_private_data *clocks_private_info);
index e27c6dfe491e3ef12424629ecd174e5cac6b4ef4..48f0c3eb769991720ba483b06f03a7d74e933421 100644 (file)
  *
  * Commercial licensing is also available.
  */
-
 #include "nqptp-message-handlers.h"
 #include "nqptp-ptp-definitions.h"
 #include "nqptp-utilities.h"
+#include <string.h>
 
 #include "debug.h"
 #include "general-utilities.h"
 
+void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info,
+                                  clock_source_private_data *clock_private_info) {
+  if (recv_len != -1) {
+    buf[recv_len - 1] = 0; // make sure there's a null in it!
+    if (strstr(buf, "set_timing_peers ") == buf) {
+      char *ip_list = buf + strlen("set_timing_peers ");
+
+      int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
+      if (rc != 0)
+        warn("Can't acquire mutex to set_timing_peers!");
+      // turn off all is_timing_peers
+      int i;
+      for (i = 0; i < MAX_CLOCKS; i++)
+        clock_info[i].timing_peer = 0;
+
+      while (ip_list != NULL) {
+        char *new_ip = strsep(&ip_list, " ");
+        // look for the IP in the list of clocks, and create an inert entry if not there
+        int t = find_clock_source_record(new_ip, clock_info, clock_private_info);
+        if (t == -1)
+          t = create_clock_source_record(new_ip, clock_info, clock_private_info,
+                                         0); // don't use the mutex
+
+        clock_info[t].timing_peer = 1;
+      }
+
+      rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
+      if (rc != 0)
+        warn("Can't release mutex after set_timing_peers!");
+
+      for (i = 0; i < MAX_CLOCKS; i++) {
+        if (clock_info[i].timing_peer != 0)
+          debug(3, "%s is in the timing peer group.", &clock_info[i].ip);
+      }
+    } else {
+      warn("Unrecognised string on the control port.");
+    }
+  } else {
+    warn("Bad packet on the control port.");
+  }
+}
+
 void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
                      clock_source_private_data *clock_private_info, uint64_t reception_time) {
   // reject Announce messages from self
@@ -32,6 +74,7 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
     // make way for the new time
     if ((size_t)recv_len >= sizeof(struct ptp_announce_message)) {
       struct ptp_announce_message *msg = (struct ptp_announce_message *)buf;
+
       int i;
       // number of elements in the array is 4, hence the 4-1 stuff
       for (i = 4 - 1; i > 1 - 1; i--) {
@@ -63,37 +106,45 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
         i++;
       }
       if (valid_count >= foreign_master_threshold) {
-        if (clock_private_info->announce_is_valid == 0) {
+        if (clock_info->qualified == 0) {
           uint64_t grandmaster_clock_id = nctohl(&msg->announce.grandmasterIdentity[0]);
           uint64_t grandmaster_clock_id_low = nctohl(&msg->announce.grandmasterIdentity[4]);
           grandmaster_clock_id = grandmaster_clock_id << 32;
           grandmaster_clock_id = grandmaster_clock_id + grandmaster_clock_id_low;
 
-          debug(1,
-                "clock_id %" PRIx64 " on ip: %s, \"Announce\" message is Qualified -- See 9.3.2.5.",
+          debug(2,
+                "clock_id %" PRIx64 " at:    %s, \"Announce\" message is Qualified -- See 9.3.2.5.",
                 clock_info->clock_id, clock_info->ip);
           uint32_t clockQuality = msg->announce.grandmasterClockQuality;
           uint8_t clockClass = (clockQuality >> 24) & 0xff;
           uint8_t clockAccuracy = (clockQuality >> 16) & 0xff;
           uint16_t offsetScaledLogVariance = clockQuality & 0xffff;
-          debug(1, "    grandmasterIdentity:     %" PRIx64 ".", grandmaster_clock_id);
-          debug(1, "    grandmasterPriority1:    %u.", msg->announce.grandmasterPriority1);
-          debug(1, "    grandmasterClockQuality: 0x%x.", msg->announce.grandmasterClockQuality);
-          debug(1, "        clockClass:              %u.", clockClass); // See 7.6.2.4 clockClass
-          debug(1, "        clockAccuracy:           0x%x.",
+          debug(2, "    grandmasterIdentity:         %" PRIx64 ".", grandmaster_clock_id);
+          debug(2, "    grandmasterPriority1:        %u.", msg->announce.grandmasterPriority1);
+          debug(2, "    grandmasterClockQuality:     0x%x.", msg->announce.grandmasterClockQuality);
+          debug(2, "        clockClass:              %u.", clockClass); // See 7.6.2.4 clockClass
+          debug(2, "        clockAccuracy:           0x%x.",
                 clockAccuracy); // See 7.6.2.5 clockAccuracy
-          debug(1, "        offsetScaledLogVariance: %x.",
+          debug(2, "        offsetScaledLogVariance: 0x%x.",
                 offsetScaledLogVariance); // See 7.6.3 PTP variance
-          debug(1, "    grandmasterPriority2:    %u.", msg->announce.grandmasterPriority2);
+          debug(2, "    grandmasterPriority2:        %u.", msg->announce.grandmasterPriority2);
         }
-        clock_private_info->announce_is_valid = 1;
+        if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+          warn("Can't acquire mutex to set_timing_peers!");
+        clock_info->qualified = 1;
+        if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+          warn("Can't release mutex after set_timing_peers!");
       } else {
-        if (clock_private_info->announce_is_valid != 0)
+        if (clock_info->qualified != 0)
           debug(1,
                 "clock_id %" PRIx64
                 " on ip: %s \"Announce\" message is not Qualified -- See 9.3.2.5.",
                 clock_info->clock_id, clock_info->ip);
-        clock_private_info->announce_is_valid = 0;
+        if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+          warn("Can't acquire mutex to set_timing_peers!");
+        clock_info->qualified = 0;
+        if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+          warn("Can't release mutex after set_timing_peers!");
       }
     }
   }
index 2c7b36af5954c3294876377c1725788987c39912..a882dde00ef5870100b3c966b7131c92bbdff7e2 100644 (file)
@@ -26,4 +26,7 @@
 void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
                      clock_source_private_data *clock_private_info, uint64_t reception_time);
 
+void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info,
+                                  clock_source_private_data *clock_private_info);
+
 #endif
\ No newline at end of file
index 8832906ec60ebfdd83a4fa2583bebb30340f5173..99cd4a76ae59bf5a64eb1403dd6288b3415b0773 100644 (file)
 #define STORAGE_ID "/nqptp"
 #define MAX_CLOCKS 32
 #define NQPTP_SHM_STRUCTURES_VERSION 1
+#define NQPTP_CONTROL_PORT 9000
+
+// the control port will accept a packet with the first word being:
+// "set_timing_peers" followed by a space and then a space-delimited
+// list of ip numbers, either IPv4 or IPv6
+// the whole not to exceed 4096 characters in total
 
 #include <inttypes.h>
 #include <netinet/in.h>
 typedef struct {
   char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46)
   uint64_t clock_id;
-  uint64_t reserved;
   uint64_t local_time;                  // the local time when the offset was calculated
   uint64_t local_to_source_time_offset; // add this to the local time to get source time
-  int flags;                            // not used yet
-  int valid;                            // this entry is valid
+  uint8_t flags;                        // not used yet
+  uint8_t valid;                        // this entry is valid
+  uint8_t timing_peer;                  // true if this is in the current timing peer group
+  uint8_t qualified;                    // true if it has valid Announce messages
 } clock_source;
 
 struct shm_structure {
index 170c88cdef08db69debb6951732914bca34f040e..201bc7da5abc8168827e5a12b855fd3ccd55a678 100644 (file)
@@ -159,7 +159,7 @@ void debug_print_buffer(int level, char *buf, size_t buf_len) {
       debug(level, "SGNL: \"%s\".", obf);
       break;
     default:
-      debug(level, "      \"%s\".", obf);
+      debug(1, "XXXX  \"%s\".", obf); // output this at level 1
       break;
     }
     free(obf);
diff --git a/nqptp.c b/nqptp.c
index 36d81d0dc4b7496809307de2d711a88e5fc24801..74931d9420df40d8bb24be41e093c33cde88a345 100644 (file)
--- a/nqptp.c
+++ b/nqptp.c
@@ -137,6 +137,8 @@ int main(void) {
 
   open_sockets_at_port(319, &sockets_open_stuff);
   open_sockets_at_port(320, &sockets_open_stuff);
+  open_sockets_at_port(NQPTP_CONTROL_PORT,
+                       &sockets_open_stuff); // this for messages from the client
 
   // open a shared memory interface.
   int shm_fd = -1;
@@ -245,19 +247,30 @@ int main(void) {
           msg.msg_control = &control;
           msg.msg_controllen = sizeof(control);
 
+          uint16_t receiver_port = 0;
           // int msgsize = recv(udpsocket_fd, &msg_buffer, 4, 0);
           recv_len = recvmsg(socket_number, &msg, MSG_DONTWAIT);
 
-          if (recv_len != -1)
-            debug_print_buffer(2, buf, recv_len);
-
+          if (recv_len != -1) {
+            // get the receiver port
+            unsigned int jp;
+            for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) {
+              if (socket_number == sockets_open_stuff.sockets[jp].number)
+                receiver_port = sockets_open_stuff.sockets[jp].port;
+            }
+          }
           if (recv_len == -1) {
             if (errno == EAGAIN) {
               usleep(1000); // this can happen, it seems...
             } else {
               debug(1, "recvmsg() error %d", errno);
             }
+            // check if it's a control port message before checking for the length of the message.
+          } else if (receiver_port == NQPTP_CONTROL_PORT) {
+            handle_control_port_messages(buf, recv_len, (clock_source *)&shared_memory->clocks,
+                                         (clock_source_private_data *)&clocks_private);
           } else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) {
+            debug_print_buffer(2, buf, recv_len);
             debug(3, "Received %d bytes control message on reception.", msg.msg_controllen);
             // get the time
             int level, type;
@@ -309,36 +322,24 @@ int main(void) {
               sender_port = ntohs(sa4->sin_port);
             }
 
-            // check here if the sender port and receiver port are the same
-            // find the socket in the socket list
-            uint16_t receiver_port = 0;
-            unsigned int jp;
-            for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) {
-              if (socket_number == sockets_open_stuff.sockets[jp].number)
-                receiver_port = sockets_open_stuff.sockets[jp].port;
-            }
-
             if (sender_port == receiver_port) {
 
               char sender_string[256];
               memset(sender_string, 0, sizeof(sender_string));
               inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string));
-              // now, find or create a record for this ip / clock_id combination
-              struct ptp_common_message_header *mt = (struct ptp_common_message_header *)buf;
-              uint64_t packet_clock_id = nctohl(&mt->clockIdentity[0]);
-              uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]);
-              packet_clock_id = packet_clock_id << 32;
-              packet_clock_id = packet_clock_id + packet_clock_id_low;
-
-              int the_clock = find_clock_source_record(
-                  sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks,
-                  (clock_source_private_data *)&clocks_private);
+              // now, find or create a record for this ip
+              int the_clock =
+                  find_clock_source_record(sender_string, (clock_source *)&shared_memory->clocks,
+                                           (clock_source_private_data *)&clocks_private);
+              // not sure about requiring a Sync before creating it...
               if ((the_clock == -1) && ((buf[0] & 0xF) == Sync)) {
                 the_clock = create_clock_source_record(
-                    sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks,
-                    (clock_source_private_data *)&clocks_private);
+                    sender_string, (clock_source *)&shared_memory->clocks,
+                    (clock_source_private_data *)&clocks_private, 1); // the "1" means use mutexes
               }
               if (the_clock != -1) {
+                clocks_private[the_clock].time_of_last_use =
+                    reception_time; // for garbage collection
                 switch (buf[0] & 0xF) {
                 case Announce:
                   // needed to reject messages coming from self
@@ -404,10 +405,16 @@ int main(void) {
 
                 case Follow_Up: {
                   struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
+
                   if ((clocks_private[the_clock].current_stage == sync_seen) &&
                       (clocks_private[the_clock].sequence_number ==
                        ntohs(msg->header.sequenceId))) {
 
+                    uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]);
+                    uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]);
+                    packet_clock_id = packet_clock_id << 32;
+                    packet_clock_id = packet_clock_id + packet_clock_id_low;
+
                     uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
                     uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
                     uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
@@ -428,6 +435,9 @@ int main(void) {
                     int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
                     if (rc != 0)
                       warn("Can't acquire mutex to update a clock!");
+                    // update/set the clock_id
+
+                    shared_memory->clocks[the_clock].clock_id = packet_clock_id;
                     shared_memory->clocks[the_clock].valid = 1;
                     shared_memory->clocks[the_clock].local_time = clocks_private[the_clock].t2;
                     shared_memory->clocks[the_clock].local_to_source_time_offset = offset;