From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Wed, 7 Apr 2021 09:35:45 +0000 (+0100) Subject: receives timing peer lists and sets flags accordingly. X-Git-Tag: 1.1-dev~43 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=44038d64833a54fce818a8f33f3cf6a07723431d;p=thirdparty%2Fnqptp.git receives timing peer lists and sets flags accordingly. --- diff --git a/nqptp-clock-sources.c b/nqptp-clock-sources.c index d7a0e59..bcc1d9b 100644 --- a/nqptp-clock-sources.c +++ b/nqptp-clock-sources.c @@ -29,8 +29,7 @@ #define FIELD_SIZEOF(t, f) (sizeof(((t *)0)->f)) #endif -int find_clock_source_record(char *sender_string, uint64_t packet_clock_id, - clock_source *clocks_shared_info, +int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info, clock_source_private_data *clocks_private_info) { // return the index of the clock in the clock information arrays or -1 int response = -1; @@ -38,7 +37,6 @@ int find_clock_source_record(char *sender_string, uint64_t packet_clock_id, int found = 0; while ((found == 0) && (i < MAX_CLOCKS)) { if ((clocks_private_info[i].in_use != 0) && - (clocks_shared_info[i].clock_id == packet_clock_id) && (strcasecmp(sender_string, (const char *)&clocks_shared_info[i].ip) == 0)) found = 1; else @@ -49,9 +47,9 @@ int find_clock_source_record(char *sender_string, uint64_t packet_clock_id, return response; } -int create_clock_source_record(char *sender_string, uint64_t packet_clock_id, - clock_source *clocks_shared_info, - clock_source_private_data *clocks_private_info) { +int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info, + clock_source_private_data *clocks_private_info, int use_lock) { + // sometimes, the mutex will already be locked // return the index of a clock entry in the clock information arrays or -1 if full // initialise the entries in the shared and private arrays int response = -1; @@ -66,16 +64,16 @@ int create_clock_source_record(char *sender_string, uint64_t packet_clock_id, if (found == 1) { response = i; - int rc = pthread_mutex_lock(&shared_memory->shm_mutex); - if (rc != 0) - warn("Can't acquire mutex to activate a new clock!"); + if (use_lock != 0) { + if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0) + warn("Can't acquire mutex to activate a new clock!"); + } memset(&clocks_shared_info[i], 0, sizeof(clock_source)); strncpy((char *)&clocks_shared_info[i].ip, sender_string, FIELD_SIZEOF(clock_source, ip) - 1); - clocks_shared_info[i].clock_id = packet_clock_id; - rc = pthread_mutex_unlock(&shared_memory->shm_mutex); - if (rc != 0) - warn("Can't release mutex after activating a new clock!"); - + if (use_lock != 0) { + if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0) + warn("Can't release mutex after activating a new clock!"); + } memset(&clocks_private_info[i], 0, sizeof(clock_source_private_data)); clocks_private_info[i].in_use = 1; clocks_private_info[i].t2 = 0; @@ -92,9 +90,12 @@ void manage_clock_sources(uint64_t reception_time, clock_source *clocks_shared_i clock_source_private_data *clocks_private_info) { debug(3, "manage_clock_sources"); int i; + // do a garbage collect for clock records no longer in use for (i = 0; i < MAX_CLOCKS; i++) { - if (clocks_private_info[i].in_use != 0) { - int64_t time_since_last_sync = reception_time - clocks_private_info[i].t2; + // only if its in use and not a timing peer... don't need a mutex to check + if ((clocks_private_info[i].in_use != 0) && (clocks_shared_info[i].timing_peer == 0)) { + int64_t time_since_last_use = reception_time - clocks_private_info[i].time_of_last_use; + // using a sync timeout to determine when to drop the record... // the following give the sync receipt time in whole seconds // depending on the aPTPinitialLogSyncInterval and the aPTPsyncReceiptTimeout int64_t syncTimeout = (1 << (32 + aPTPinitialLogSyncInterval)); @@ -102,7 +103,7 @@ void manage_clock_sources(uint64_t reception_time, clock_source *clocks_shared_i syncTimeout = syncTimeout >> 32; // seconds to nanoseconds syncTimeout = syncTimeout * 1000000000; - if (time_since_last_sync > syncTimeout) { + if (time_since_last_use > syncTimeout) { debug(2, "deactivated source %d with clock_id %" PRIx64 " on ip: %s.", i, clocks_shared_info[i].clock_id, &clocks_shared_info[i].ip); int rc = pthread_mutex_lock(&shared_memory->shm_mutex); diff --git a/nqptp-clock-sources.h b/nqptp-clock-sources.h index 834462e..62d2b0b 100644 --- a/nqptp-clock-sources.h +++ b/nqptp-clock-sources.h @@ -34,20 +34,20 @@ typedef struct { uint16_t in_use; enum stage current_stage; uint64_t t2; - + // for garbage collection + uint64_t time_of_last_use; // will be taken out of use if not used for a while and not in the + // timing peer group + // (A member of the timing peer group could appear and disappear) // for Announce Qualification uint64_t announce_times[4]; // we'll check qualification and currency using these - int announce_is_valid; // this may mean it's a master clock_source int is_one_of_ours; // true if it is one of our own clocks } clock_source_private_data; -int find_clock_source_record(char *sender_string, uint64_t packet_clock_id, - clock_source *clocks_shared_info, +int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info, clock_source_private_data *clocks_private_info); -int create_clock_source_record(char *sender_string, uint64_t packet_clock_id, - clock_source *clocks_shared_info, - clock_source_private_data *clocks_private_info); +int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info, + clock_source_private_data *clocks_private_info, int use_lock); void update_clock_self_identifications(clock_source *clocks_shared_info, clock_source_private_data *clocks_private_info); diff --git a/nqptp-message-handlers.c b/nqptp-message-handlers.c index e27c6df..48f0c3e 100644 --- a/nqptp-message-handlers.c +++ b/nqptp-message-handlers.c @@ -16,14 +16,56 @@ * * Commercial licensing is also available. */ - #include "nqptp-message-handlers.h" #include "nqptp-ptp-definitions.h" #include "nqptp-utilities.h" +#include #include "debug.h" #include "general-utilities.h" +void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info, + clock_source_private_data *clock_private_info) { + if (recv_len != -1) { + buf[recv_len - 1] = 0; // make sure there's a null in it! + if (strstr(buf, "set_timing_peers ") == buf) { + char *ip_list = buf + strlen("set_timing_peers "); + + int rc = pthread_mutex_lock(&shared_memory->shm_mutex); + if (rc != 0) + warn("Can't acquire mutex to set_timing_peers!"); + // turn off all is_timing_peers + int i; + for (i = 0; i < MAX_CLOCKS; i++) + clock_info[i].timing_peer = 0; + + while (ip_list != NULL) { + char *new_ip = strsep(&ip_list, " "); + // look for the IP in the list of clocks, and create an inert entry if not there + int t = find_clock_source_record(new_ip, clock_info, clock_private_info); + if (t == -1) + t = create_clock_source_record(new_ip, clock_info, clock_private_info, + 0); // don't use the mutex + + clock_info[t].timing_peer = 1; + } + + rc = pthread_mutex_unlock(&shared_memory->shm_mutex); + if (rc != 0) + warn("Can't release mutex after set_timing_peers!"); + + for (i = 0; i < MAX_CLOCKS; i++) { + if (clock_info[i].timing_peer != 0) + debug(3, "%s is in the timing peer group.", &clock_info[i].ip); + } + } else { + warn("Unrecognised string on the control port."); + } + } else { + warn("Bad packet on the control port."); + } +} + void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, clock_source_private_data *clock_private_info, uint64_t reception_time) { // reject Announce messages from self @@ -32,6 +74,7 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, // make way for the new time if ((size_t)recv_len >= sizeof(struct ptp_announce_message)) { struct ptp_announce_message *msg = (struct ptp_announce_message *)buf; + int i; // number of elements in the array is 4, hence the 4-1 stuff for (i = 4 - 1; i > 1 - 1; i--) { @@ -63,37 +106,45 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, i++; } if (valid_count >= foreign_master_threshold) { - if (clock_private_info->announce_is_valid == 0) { + if (clock_info->qualified == 0) { uint64_t grandmaster_clock_id = nctohl(&msg->announce.grandmasterIdentity[0]); uint64_t grandmaster_clock_id_low = nctohl(&msg->announce.grandmasterIdentity[4]); grandmaster_clock_id = grandmaster_clock_id << 32; grandmaster_clock_id = grandmaster_clock_id + grandmaster_clock_id_low; - debug(1, - "clock_id %" PRIx64 " on ip: %s, \"Announce\" message is Qualified -- See 9.3.2.5.", + debug(2, + "clock_id %" PRIx64 " at: %s, \"Announce\" message is Qualified -- See 9.3.2.5.", clock_info->clock_id, clock_info->ip); uint32_t clockQuality = msg->announce.grandmasterClockQuality; uint8_t clockClass = (clockQuality >> 24) & 0xff; uint8_t clockAccuracy = (clockQuality >> 16) & 0xff; uint16_t offsetScaledLogVariance = clockQuality & 0xffff; - debug(1, " grandmasterIdentity: %" PRIx64 ".", grandmaster_clock_id); - debug(1, " grandmasterPriority1: %u.", msg->announce.grandmasterPriority1); - debug(1, " grandmasterClockQuality: 0x%x.", msg->announce.grandmasterClockQuality); - debug(1, " clockClass: %u.", clockClass); // See 7.6.2.4 clockClass - debug(1, " clockAccuracy: 0x%x.", + debug(2, " grandmasterIdentity: %" PRIx64 ".", grandmaster_clock_id); + debug(2, " grandmasterPriority1: %u.", msg->announce.grandmasterPriority1); + debug(2, " grandmasterClockQuality: 0x%x.", msg->announce.grandmasterClockQuality); + debug(2, " clockClass: %u.", clockClass); // See 7.6.2.4 clockClass + debug(2, " clockAccuracy: 0x%x.", clockAccuracy); // See 7.6.2.5 clockAccuracy - debug(1, " offsetScaledLogVariance: %x.", + debug(2, " offsetScaledLogVariance: 0x%x.", offsetScaledLogVariance); // See 7.6.3 PTP variance - debug(1, " grandmasterPriority2: %u.", msg->announce.grandmasterPriority2); + debug(2, " grandmasterPriority2: %u.", msg->announce.grandmasterPriority2); } - clock_private_info->announce_is_valid = 1; + if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0) + warn("Can't acquire mutex to set_timing_peers!"); + clock_info->qualified = 1; + if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0) + warn("Can't release mutex after set_timing_peers!"); } else { - if (clock_private_info->announce_is_valid != 0) + if (clock_info->qualified != 0) debug(1, "clock_id %" PRIx64 " on ip: %s \"Announce\" message is not Qualified -- See 9.3.2.5.", clock_info->clock_id, clock_info->ip); - clock_private_info->announce_is_valid = 0; + if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0) + warn("Can't acquire mutex to set_timing_peers!"); + clock_info->qualified = 0; + if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0) + warn("Can't release mutex after set_timing_peers!"); } } } diff --git a/nqptp-message-handlers.h b/nqptp-message-handlers.h index 2c7b36a..a882dde 100644 --- a/nqptp-message-handlers.h +++ b/nqptp-message-handlers.h @@ -26,4 +26,7 @@ void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info, clock_source_private_data *clock_private_info, uint64_t reception_time); +void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info, + clock_source_private_data *clock_private_info); + #endif \ No newline at end of file diff --git a/nqptp-shm-structures.h b/nqptp-shm-structures.h index 8832906..99cd4a7 100644 --- a/nqptp-shm-structures.h +++ b/nqptp-shm-structures.h @@ -23,6 +23,12 @@ #define STORAGE_ID "/nqptp" #define MAX_CLOCKS 32 #define NQPTP_SHM_STRUCTURES_VERSION 1 +#define NQPTP_CONTROL_PORT 9000 + +// the control port will accept a packet with the first word being: +// "set_timing_peers" followed by a space and then a space-delimited +// list of ip numbers, either IPv4 or IPv6 +// the whole not to exceed 4096 characters in total #include #include @@ -31,11 +37,12 @@ typedef struct { char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46) uint64_t clock_id; - uint64_t reserved; uint64_t local_time; // the local time when the offset was calculated uint64_t local_to_source_time_offset; // add this to the local time to get source time - int flags; // not used yet - int valid; // this entry is valid + uint8_t flags; // not used yet + uint8_t valid; // this entry is valid + uint8_t timing_peer; // true if this is in the current timing peer group + uint8_t qualified; // true if it has valid Announce messages } clock_source; struct shm_structure { diff --git a/nqptp-utilities.c b/nqptp-utilities.c index 170c88c..201bc7d 100644 --- a/nqptp-utilities.c +++ b/nqptp-utilities.c @@ -159,7 +159,7 @@ void debug_print_buffer(int level, char *buf, size_t buf_len) { debug(level, "SGNL: \"%s\".", obf); break; default: - debug(level, " \"%s\".", obf); + debug(1, "XXXX \"%s\".", obf); // output this at level 1 break; } free(obf); diff --git a/nqptp.c b/nqptp.c index 36d81d0..74931d9 100644 --- a/nqptp.c +++ b/nqptp.c @@ -137,6 +137,8 @@ int main(void) { open_sockets_at_port(319, &sockets_open_stuff); open_sockets_at_port(320, &sockets_open_stuff); + open_sockets_at_port(NQPTP_CONTROL_PORT, + &sockets_open_stuff); // this for messages from the client // open a shared memory interface. int shm_fd = -1; @@ -245,19 +247,30 @@ int main(void) { msg.msg_control = &control; msg.msg_controllen = sizeof(control); + uint16_t receiver_port = 0; // int msgsize = recv(udpsocket_fd, &msg_buffer, 4, 0); recv_len = recvmsg(socket_number, &msg, MSG_DONTWAIT); - if (recv_len != -1) - debug_print_buffer(2, buf, recv_len); - + if (recv_len != -1) { + // get the receiver port + unsigned int jp; + for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) { + if (socket_number == sockets_open_stuff.sockets[jp].number) + receiver_port = sockets_open_stuff.sockets[jp].port; + } + } if (recv_len == -1) { if (errno == EAGAIN) { usleep(1000); // this can happen, it seems... } else { debug(1, "recvmsg() error %d", errno); } + // check if it's a control port message before checking for the length of the message. + } else if (receiver_port == NQPTP_CONTROL_PORT) { + handle_control_port_messages(buf, recv_len, (clock_source *)&shared_memory->clocks, + (clock_source_private_data *)&clocks_private); } else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) { + debug_print_buffer(2, buf, recv_len); debug(3, "Received %d bytes control message on reception.", msg.msg_controllen); // get the time int level, type; @@ -309,36 +322,24 @@ int main(void) { sender_port = ntohs(sa4->sin_port); } - // check here if the sender port and receiver port are the same - // find the socket in the socket list - uint16_t receiver_port = 0; - unsigned int jp; - for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) { - if (socket_number == sockets_open_stuff.sockets[jp].number) - receiver_port = sockets_open_stuff.sockets[jp].port; - } - if (sender_port == receiver_port) { char sender_string[256]; memset(sender_string, 0, sizeof(sender_string)); inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string)); - // now, find or create a record for this ip / clock_id combination - struct ptp_common_message_header *mt = (struct ptp_common_message_header *)buf; - uint64_t packet_clock_id = nctohl(&mt->clockIdentity[0]); - uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]); - packet_clock_id = packet_clock_id << 32; - packet_clock_id = packet_clock_id + packet_clock_id_low; - - int the_clock = find_clock_source_record( - sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks, - (clock_source_private_data *)&clocks_private); + // now, find or create a record for this ip + int the_clock = + find_clock_source_record(sender_string, (clock_source *)&shared_memory->clocks, + (clock_source_private_data *)&clocks_private); + // not sure about requiring a Sync before creating it... if ((the_clock == -1) && ((buf[0] & 0xF) == Sync)) { the_clock = create_clock_source_record( - sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks, - (clock_source_private_data *)&clocks_private); + sender_string, (clock_source *)&shared_memory->clocks, + (clock_source_private_data *)&clocks_private, 1); // the "1" means use mutexes } if (the_clock != -1) { + clocks_private[the_clock].time_of_last_use = + reception_time; // for garbage collection switch (buf[0] & 0xF) { case Announce: // needed to reject messages coming from self @@ -404,10 +405,16 @@ int main(void) { case Follow_Up: { struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; + if ((clocks_private[the_clock].current_stage == sync_seen) && (clocks_private[the_clock].sequence_number == ntohs(msg->header.sequenceId))) { + uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]); + uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]); + packet_clock_id = packet_clock_id << 32; + packet_clock_id = packet_clock_id + packet_clock_id_low; + uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); @@ -428,6 +435,9 @@ int main(void) { int rc = pthread_mutex_lock(&shared_memory->shm_mutex); if (rc != 0) warn("Can't acquire mutex to update a clock!"); + // update/set the clock_id + + shared_memory->clocks[the_clock].clock_id = packet_clock_id; shared_memory->clocks[the_clock].valid = 1; shared_memory->clocks[the_clock].local_time = clocks_private[the_clock].t2; shared_memory->clocks[the_clock].local_to_source_time_offset = offset;