#define FIELD_SIZEOF(t, f) (sizeof(((t *)0)->f))
#endif
-int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
- clock_source *clocks_shared_info,
+int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
clock_source_private_data *clocks_private_info) {
// return the index of the clock in the clock information arrays or -1
int response = -1;
int found = 0;
while ((found == 0) && (i < MAX_CLOCKS)) {
if ((clocks_private_info[i].in_use != 0) &&
- (clocks_shared_info[i].clock_id == packet_clock_id) &&
(strcasecmp(sender_string, (const char *)&clocks_shared_info[i].ip) == 0))
found = 1;
else
return response;
}
-int create_clock_source_record(char *sender_string, uint64_t packet_clock_id,
- clock_source *clocks_shared_info,
- clock_source_private_data *clocks_private_info) {
+int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
+ clock_source_private_data *clocks_private_info, int use_lock) {
+ // sometimes, the mutex will already be locked
// return the index of a clock entry in the clock information arrays or -1 if full
// initialise the entries in the shared and private arrays
int response = -1;
if (found == 1) {
response = i;
- int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
- if (rc != 0)
- warn("Can't acquire mutex to activate a new clock!");
+ if (use_lock != 0) {
+ if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+ warn("Can't acquire mutex to activate a new clock!");
+ }
memset(&clocks_shared_info[i], 0, sizeof(clock_source));
strncpy((char *)&clocks_shared_info[i].ip, sender_string, FIELD_SIZEOF(clock_source, ip) - 1);
- clocks_shared_info[i].clock_id = packet_clock_id;
- rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
- if (rc != 0)
- warn("Can't release mutex after activating a new clock!");
-
+ if (use_lock != 0) {
+ if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+ warn("Can't release mutex after activating a new clock!");
+ }
memset(&clocks_private_info[i], 0, sizeof(clock_source_private_data));
clocks_private_info[i].in_use = 1;
clocks_private_info[i].t2 = 0;
clock_source_private_data *clocks_private_info) {
debug(3, "manage_clock_sources");
int i;
+ // do a garbage collect for clock records no longer in use
for (i = 0; i < MAX_CLOCKS; i++) {
- if (clocks_private_info[i].in_use != 0) {
- int64_t time_since_last_sync = reception_time - clocks_private_info[i].t2;
+ // only if its in use and not a timing peer... don't need a mutex to check
+ if ((clocks_private_info[i].in_use != 0) && (clocks_shared_info[i].timing_peer == 0)) {
+ int64_t time_since_last_use = reception_time - clocks_private_info[i].time_of_last_use;
+ // using a sync timeout to determine when to drop the record...
// the following give the sync receipt time in whole seconds
// depending on the aPTPinitialLogSyncInterval and the aPTPsyncReceiptTimeout
int64_t syncTimeout = (1 << (32 + aPTPinitialLogSyncInterval));
syncTimeout = syncTimeout >> 32;
// seconds to nanoseconds
syncTimeout = syncTimeout * 1000000000;
- if (time_since_last_sync > syncTimeout) {
+ if (time_since_last_use > syncTimeout) {
debug(2, "deactivated source %d with clock_id %" PRIx64 " on ip: %s.", i,
clocks_shared_info[i].clock_id, &clocks_shared_info[i].ip);
int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
uint16_t in_use;
enum stage current_stage;
uint64_t t2;
-
+ // for garbage collection
+ uint64_t time_of_last_use; // will be taken out of use if not used for a while and not in the
+ // timing peer group
+ // (A member of the timing peer group could appear and disappear)
// for Announce Qualification
uint64_t announce_times[4]; // we'll check qualification and currency using these
- int announce_is_valid; // this may mean it's a master clock_source
int is_one_of_ours; // true if it is one of our own clocks
} clock_source_private_data;
-int find_clock_source_record(char *sender_string, uint64_t packet_clock_id,
- clock_source *clocks_shared_info,
+int find_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
clock_source_private_data *clocks_private_info);
-int create_clock_source_record(char *sender_string, uint64_t packet_clock_id,
- clock_source *clocks_shared_info,
- clock_source_private_data *clocks_private_info);
+int create_clock_source_record(char *sender_string, clock_source *clocks_shared_info,
+ clock_source_private_data *clocks_private_info, int use_lock);
void update_clock_self_identifications(clock_source *clocks_shared_info,
clock_source_private_data *clocks_private_info);
*
* Commercial licensing is also available.
*/
-
#include "nqptp-message-handlers.h"
#include "nqptp-ptp-definitions.h"
#include "nqptp-utilities.h"
+#include <string.h>
#include "debug.h"
#include "general-utilities.h"
+void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info,
+ clock_source_private_data *clock_private_info) {
+ if (recv_len != -1) {
+ buf[recv_len - 1] = 0; // make sure there's a null in it!
+ if (strstr(buf, "set_timing_peers ") == buf) {
+ char *ip_list = buf + strlen("set_timing_peers ");
+
+ int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
+ if (rc != 0)
+ warn("Can't acquire mutex to set_timing_peers!");
+ // turn off all is_timing_peers
+ int i;
+ for (i = 0; i < MAX_CLOCKS; i++)
+ clock_info[i].timing_peer = 0;
+
+ while (ip_list != NULL) {
+ char *new_ip = strsep(&ip_list, " ");
+ // look for the IP in the list of clocks, and create an inert entry if not there
+ int t = find_clock_source_record(new_ip, clock_info, clock_private_info);
+ if (t == -1)
+ t = create_clock_source_record(new_ip, clock_info, clock_private_info,
+ 0); // don't use the mutex
+
+ clock_info[t].timing_peer = 1;
+ }
+
+ rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
+ if (rc != 0)
+ warn("Can't release mutex after set_timing_peers!");
+
+ for (i = 0; i < MAX_CLOCKS; i++) {
+ if (clock_info[i].timing_peer != 0)
+ debug(3, "%s is in the timing peer group.", &clock_info[i].ip);
+ }
+ } else {
+ warn("Unrecognised string on the control port.");
+ }
+ } else {
+ warn("Bad packet on the control port.");
+ }
+}
+
void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
clock_source_private_data *clock_private_info, uint64_t reception_time) {
// reject Announce messages from self
// make way for the new time
if ((size_t)recv_len >= sizeof(struct ptp_announce_message)) {
struct ptp_announce_message *msg = (struct ptp_announce_message *)buf;
+
int i;
// number of elements in the array is 4, hence the 4-1 stuff
for (i = 4 - 1; i > 1 - 1; i--) {
i++;
}
if (valid_count >= foreign_master_threshold) {
- if (clock_private_info->announce_is_valid == 0) {
+ if (clock_info->qualified == 0) {
uint64_t grandmaster_clock_id = nctohl(&msg->announce.grandmasterIdentity[0]);
uint64_t grandmaster_clock_id_low = nctohl(&msg->announce.grandmasterIdentity[4]);
grandmaster_clock_id = grandmaster_clock_id << 32;
grandmaster_clock_id = grandmaster_clock_id + grandmaster_clock_id_low;
- debug(1,
- "clock_id %" PRIx64 " on ip: %s, \"Announce\" message is Qualified -- See 9.3.2.5.",
+ debug(2,
+ "clock_id %" PRIx64 " at: %s, \"Announce\" message is Qualified -- See 9.3.2.5.",
clock_info->clock_id, clock_info->ip);
uint32_t clockQuality = msg->announce.grandmasterClockQuality;
uint8_t clockClass = (clockQuality >> 24) & 0xff;
uint8_t clockAccuracy = (clockQuality >> 16) & 0xff;
uint16_t offsetScaledLogVariance = clockQuality & 0xffff;
- debug(1, " grandmasterIdentity: %" PRIx64 ".", grandmaster_clock_id);
- debug(1, " grandmasterPriority1: %u.", msg->announce.grandmasterPriority1);
- debug(1, " grandmasterClockQuality: 0x%x.", msg->announce.grandmasterClockQuality);
- debug(1, " clockClass: %u.", clockClass); // See 7.6.2.4 clockClass
- debug(1, " clockAccuracy: 0x%x.",
+ debug(2, " grandmasterIdentity: %" PRIx64 ".", grandmaster_clock_id);
+ debug(2, " grandmasterPriority1: %u.", msg->announce.grandmasterPriority1);
+ debug(2, " grandmasterClockQuality: 0x%x.", msg->announce.grandmasterClockQuality);
+ debug(2, " clockClass: %u.", clockClass); // See 7.6.2.4 clockClass
+ debug(2, " clockAccuracy: 0x%x.",
clockAccuracy); // See 7.6.2.5 clockAccuracy
- debug(1, " offsetScaledLogVariance: %x.",
+ debug(2, " offsetScaledLogVariance: 0x%x.",
offsetScaledLogVariance); // See 7.6.3 PTP variance
- debug(1, " grandmasterPriority2: %u.", msg->announce.grandmasterPriority2);
+ debug(2, " grandmasterPriority2: %u.", msg->announce.grandmasterPriority2);
}
- clock_private_info->announce_is_valid = 1;
+ if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+ warn("Can't acquire mutex to set_timing_peers!");
+ clock_info->qualified = 1;
+ if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+ warn("Can't release mutex after set_timing_peers!");
} else {
- if (clock_private_info->announce_is_valid != 0)
+ if (clock_info->qualified != 0)
debug(1,
"clock_id %" PRIx64
" on ip: %s \"Announce\" message is not Qualified -- See 9.3.2.5.",
clock_info->clock_id, clock_info->ip);
- clock_private_info->announce_is_valid = 0;
+ if (pthread_mutex_lock(&shared_memory->shm_mutex) != 0)
+ warn("Can't acquire mutex to set_timing_peers!");
+ clock_info->qualified = 0;
+ if (pthread_mutex_unlock(&shared_memory->shm_mutex) != 0)
+ warn("Can't release mutex after set_timing_peers!");
}
}
}
void handle_announce(char *buf, ssize_t recv_len, clock_source *clock_info,
clock_source_private_data *clock_private_info, uint64_t reception_time);
+void handle_control_port_messages(char *buf, ssize_t recv_len, clock_source *clock_info,
+ clock_source_private_data *clock_private_info);
+
#endif
\ No newline at end of file
#define STORAGE_ID "/nqptp"
#define MAX_CLOCKS 32
#define NQPTP_SHM_STRUCTURES_VERSION 1
+#define NQPTP_CONTROL_PORT 9000
+
+// the control port will accept a packet with the first word being:
+// "set_timing_peers" followed by a space and then a space-delimited
+// list of ip numbers, either IPv4 or IPv6
+// the whole not to exceed 4096 characters in total
#include <inttypes.h>
#include <netinet/in.h>
typedef struct {
char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46)
uint64_t clock_id;
- uint64_t reserved;
uint64_t local_time; // the local time when the offset was calculated
uint64_t local_to_source_time_offset; // add this to the local time to get source time
- int flags; // not used yet
- int valid; // this entry is valid
+ uint8_t flags; // not used yet
+ uint8_t valid; // this entry is valid
+ uint8_t timing_peer; // true if this is in the current timing peer group
+ uint8_t qualified; // true if it has valid Announce messages
} clock_source;
struct shm_structure {
debug(level, "SGNL: \"%s\".", obf);
break;
default:
- debug(level, " \"%s\".", obf);
+ debug(1, "XXXX \"%s\".", obf); // output this at level 1
break;
}
free(obf);
open_sockets_at_port(319, &sockets_open_stuff);
open_sockets_at_port(320, &sockets_open_stuff);
+ open_sockets_at_port(NQPTP_CONTROL_PORT,
+ &sockets_open_stuff); // this for messages from the client
// open a shared memory interface.
int shm_fd = -1;
msg.msg_control = &control;
msg.msg_controllen = sizeof(control);
+ uint16_t receiver_port = 0;
// int msgsize = recv(udpsocket_fd, &msg_buffer, 4, 0);
recv_len = recvmsg(socket_number, &msg, MSG_DONTWAIT);
- if (recv_len != -1)
- debug_print_buffer(2, buf, recv_len);
-
+ if (recv_len != -1) {
+ // get the receiver port
+ unsigned int jp;
+ for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) {
+ if (socket_number == sockets_open_stuff.sockets[jp].number)
+ receiver_port = sockets_open_stuff.sockets[jp].port;
+ }
+ }
if (recv_len == -1) {
if (errno == EAGAIN) {
usleep(1000); // this can happen, it seems...
} else {
debug(1, "recvmsg() error %d", errno);
}
+ // check if it's a control port message before checking for the length of the message.
+ } else if (receiver_port == NQPTP_CONTROL_PORT) {
+ handle_control_port_messages(buf, recv_len, (clock_source *)&shared_memory->clocks,
+ (clock_source_private_data *)&clocks_private);
} else if (recv_len >= (ssize_t)sizeof(struct ptp_common_message_header)) {
+ debug_print_buffer(2, buf, recv_len);
debug(3, "Received %d bytes control message on reception.", msg.msg_controllen);
// get the time
int level, type;
sender_port = ntohs(sa4->sin_port);
}
- // check here if the sender port and receiver port are the same
- // find the socket in the socket list
- uint16_t receiver_port = 0;
- unsigned int jp;
- for (jp = 0; jp < sockets_open_stuff.sockets_open; jp++) {
- if (socket_number == sockets_open_stuff.sockets[jp].number)
- receiver_port = sockets_open_stuff.sockets[jp].port;
- }
-
if (sender_port == receiver_port) {
char sender_string[256];
memset(sender_string, 0, sizeof(sender_string));
inet_ntop(connection_ip_family, sender_addr, sender_string, sizeof(sender_string));
- // now, find or create a record for this ip / clock_id combination
- struct ptp_common_message_header *mt = (struct ptp_common_message_header *)buf;
- uint64_t packet_clock_id = nctohl(&mt->clockIdentity[0]);
- uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]);
- packet_clock_id = packet_clock_id << 32;
- packet_clock_id = packet_clock_id + packet_clock_id_low;
-
- int the_clock = find_clock_source_record(
- sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks,
- (clock_source_private_data *)&clocks_private);
+ // now, find or create a record for this ip
+ int the_clock =
+ find_clock_source_record(sender_string, (clock_source *)&shared_memory->clocks,
+ (clock_source_private_data *)&clocks_private);
+ // not sure about requiring a Sync before creating it...
if ((the_clock == -1) && ((buf[0] & 0xF) == Sync)) {
the_clock = create_clock_source_record(
- sender_string, packet_clock_id, (clock_source *)&shared_memory->clocks,
- (clock_source_private_data *)&clocks_private);
+ sender_string, (clock_source *)&shared_memory->clocks,
+ (clock_source_private_data *)&clocks_private, 1); // the "1" means use mutexes
}
if (the_clock != -1) {
+ clocks_private[the_clock].time_of_last_use =
+ reception_time; // for garbage collection
switch (buf[0] & 0xF) {
case Announce:
// needed to reject messages coming from self
case Follow_Up: {
struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf;
+
if ((clocks_private[the_clock].current_stage == sync_seen) &&
(clocks_private[the_clock].sequence_number ==
ntohs(msg->header.sequenceId))) {
+ uint64_t packet_clock_id = nctohl(&msg->header.clockIdentity[0]);
+ uint64_t packet_clock_id_low = nctohl(&msg->header.clockIdentity[4]);
+ packet_clock_id = packet_clock_id << 32;
+ packet_clock_id = packet_clock_id + packet_clock_id_low;
+
uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]);
uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]);
uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]);
int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
if (rc != 0)
warn("Can't acquire mutex to update a clock!");
+ // update/set the clock_id
+
+ shared_memory->clocks[the_clock].clock_id = packet_clock_id;
shared_memory->clocks[the_clock].valid = 1;
shared_memory->clocks[the_clock].local_time = clocks_private[the_clock].t2;
shared_memory->clocks[the_clock].local_to_source_time_offset = offset;