* Commercial licensing is also available.
*/
+#define DEBUG_LEVEL 2
+
#include "debug.h"
#include "nqptp-shm-structures.h"
#define SIOCSHWTSTAMP 0x89b0
#endif
-#define DEBUG_LEVEL 0
-
// References from the IEEE Document ISBN 978-0-7381-5400-8 STD95773.
// "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and
// Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002)
} timing_samples;
struct ptpSource {
- char *ip; // ipv4 or ipv6
+ char ip[64]; // ipv4 or ipv6
uint64_t clock_id;
uint16_t sequence_number;
enum stage current_stage;
// go
int shared_clock_number; // which entry to use in the shared memory, could be -1!
uint64_t sample_number; // should roll over in 2^61 seconds!
+ int in_use;
struct ptpSource *next;
} ptpSource;
int number;
uint16_t port;
};
-
+struct ptpSource sources[MAX_SHARED_CLOCKS];
struct socket_info sockets[MAX_OPEN_SOCKETS];
unsigned int sockets_open =
0; // also doubles as where to put next one, as sockets are never closed.
struct shm_structure *shared_memory = NULL;
+struct ptpSource *clocks = NULL; // a one-way linked list
+
// struct sockaddr_in6 is bigger than struct sockaddr.
#ifdef AF_INET6
return timespec_to_ns(&tn);
}
+struct ptpSource *find_source(char *sender_string, uint64_t packet_clock_id) {
+ struct ptpSource *response = NULL;
+ int i = 0;
+ int found = 0;
+ while ((found == 0) && (i < MAX_SHARED_CLOCKS)) {
+ if ((sources[i].in_use != 0) && (sources[i].clock_id == packet_clock_id) && (strcasecmp(sender_string,(const char *)&sources[i].ip) == 0))
+ found = 1;
+ else
+ i++;
+ }
+ if (found != 0)
+ response = &sources[i];
+ return response;
+}
+
+struct ptpSource *create_source(char *sender_string, uint64_t packet_clock_id) {
+ struct ptpSource *response = NULL;
+ int i = 0;
+ int found = 0;
+ while ((found == 0) && (i < MAX_SHARED_CLOCKS)) {
+ if (sources[i].in_use == 0)
+ found = 1;
+ else
+ i++;
+ }
+ if (found != 0) {
+ memset(&sources[i],0,sizeof(struct ptpSource));
+ sources[i].in_use = 1;
+ strncpy((char *)&sources[i].ip, sender_string, sizeof(ptpSource.ip)-1);
+ sources[i].clock_id = packet_clock_id;
+ sources[i].shared_clock_number = -1;
+ response = &sources[i];
+ debug(1,"activated source %d with clock_id %" PRIx64 " on ip: %s.", i, sources[i].clock_id, &sources[i].ip);
+ } else {
+ die("Clock table full!");
+ }
+ return response;
+}
+
+void deactivate_old_sources(uint64_t reception_time) {
+ int i;
+ for (i = 0; i < MAX_SHARED_CLOCKS; i++) {
+ if (sources[i].in_use != 0) {
+ int64_t time_since_last_sync = reception_time - sources[i].t2;
+ if (time_since_last_sync > 1000000000) {
+ if (sources[i].shared_clock_number != -1) {
+ shared_memory->clocks[sources[i].shared_clock_number].valid = 0;
+ debug(1,"deactivated shared clock %d with clock_id %" PRIx64 " on ip: %s.",sources[i].shared_clock_number, shared_memory->clocks[sources[i].shared_clock_number].clock_id, &shared_memory->clocks[sources[i].shared_clock_number].ip);
+ }
+ sources[i].in_use = 0;
+ debug(1,"deactivated source %d with clock_id %" PRIx64 " on ip: %s.", i, sources[i].clock_id, &sources[i].ip);
+ }
+ }
+ }
+}
+
+
+
+
+/*
+void listSourceRecords(struct ptpSource *list) {
+ struct ptpSource *p = list;
+ while (p != NULL) {
+ debug(1,"Clock ID: '%" PRIx64 "' at %s, with shm entry %d.", p->clock_id, p->ip, p->shared_clock_number);
+ p = p->next;
+ }
+}
+
+
struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip, uint64_t clock_id,
uint8_t message_type) {
struct ptpSource *response;
} else {
// only create a record for a Sync message
if (message_type == Sync) {
+ debug(1,"New before");
+ listSourceRecords(clocks);
response = (struct ptpSource *)malloc(sizeof(ptpSource));
if (response != NULL) {
memset((void *)response, 0, sizeof(ptpSource));
response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet
response->shared_clock_number = -1; // none allocated yet. Hacky
*insertion_point = response;
- debug(2,
- "Clock record created for Clock ID: '%" PRIu64 "', aka '%" PRIu64 "', aka '%" PRIx64
+ debug(1,
+ "New clock record created for Clock ID: '%" PRIu64 "', aka '%" PRIu64 "', aka '%" PRIx64
"' at %s.",
clock_id, clock_id, clock_id, ip);
}
+ debug(1,"New after");
+ listSourceRecords(clocks);
+
} else {
response = NULL;
}
// debug(1,"delete -- time now: % " PRIx64 ".", time_now);
struct ptpSource **temp = list;
+ debug(1,"check");
while (*temp != NULL) {
struct ptpSource *p = *temp;
+ int deleting_something = 0;
int64_t time_since_last_use = time_now - p->t2; // this is the time of the last sync record
- debug(2, "checking record for Clock ID %" PRIx64 " at %s. Time difference is %" PRId64 ".",
- p->clock_id, p->ip, time_since_last_use);
+ debug(1, "checking record for Clock ID %" PRIx64 " at %s. Shared clock is %d Time difference is %" PRId64 ".",
+ p->clock_id, p->ip, p->shared_clock_number, time_since_last_use);
- if (time_since_last_use > 5000000000) { // drop them if idle
- debug(2, "delete record for Clock ID %" PRIx64 " at %s.", p->clock_id, p->ip);
+ if (time_since_last_use > 1000000000) { // drop them if idle
+ debug(1,"Delete Before");
+ listSourceRecords(clocks);
+ debug(1, "delete record for Clock ID %" PRIx64 " at %s.", p->clock_id, p->ip);
if (p->shared_clock_number != -1) {
int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
if (rc != 0)
*temp = p->next;
free(p->ip); // the IP was strdup'ed in
free(p);
+ debug(1,"Delete After:");
+ listSourceRecords(clocks);
} else {
temp = &p->next;
}
}
}
+*/
void debug_print_buffer(int level, char *buf, size_t buf_len) {
// printf("Received %u bytes in a packet from %s:%d\n", buf_len, inet_ntoa(si_other.sin_addr),
}
int main(void) {
+ clocks = NULL;
+ shared_memory = NULL;
+ // memset(sources,0,sizeof(sources));
// level 0 is no messages, level 3 is most messages -- see debug.h
debug_init(DEBUG_LEVEL, 0, 1, 1);
debug(1, "startup");
sigaction(SIGTERM, &act2, NULL);
ssize_t recv_len;
- struct ptpSource *clocks = NULL; // a one-way linked list
char buf[BUFLEN];
uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]);
packet_clock_id = packet_clock_id << 32;
packet_clock_id = packet_clock_id + packet_clock_id_low;
- struct ptpSource *the_clock =
- findOrCreateSource(&clocks, sender_string, packet_clock_id,
- buf[0] & 0xF); // only create a record for a SYNC
+
+ struct ptpSource *the_clock = find_source(sender_string, packet_clock_id);
+ if ((the_clock == NULL) && ((buf[0] & 0xF) == Sync)) {
+ the_clock = create_source(sender_string, packet_clock_id);
+ }
if (the_clock != NULL) {
switch (buf[0] & 0xF) {
case Sync: { // if it's a sync
}
if (discard_sync == 0) {
- // if necessary, initialise a new shared clock record
- // hacky.
- if (the_clock->shared_clock_number == -1) {
- if (next_free_clock_source_entry == MAX_SHARED_CLOCKS)
- die("No more shared clocks!");
- // associate and initialise a shared clock record
- int i = 0;
- while ((shared_memory->clocks[i].valid != 0) && (i < MAX_SHARED_CLOCKS)) {
- i++;
- }
- if (i == MAX_SHARED_CLOCKS)
- die("All %d clock entries are in use -- no more available!",
- MAX_SHARED_CLOCKS);
- the_clock->shared_clock_number = i;
- int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
- if (rc != 0)
- die("Can't acquire mutex to initialise a clock!");
- memset(&shared_memory->clocks[i], 0, sizeof(struct clock_source));
- strncpy((char *)&shared_memory->clocks[i].ip, the_clock->ip,
- INET6_ADDRSTRLEN - 1);
- shared_memory->clocks[i].clock_id = the_clock->clock_id;
- shared_memory->clocks[i].valid = 1;
- rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
- if (rc != 0)
- die("Can't release mutex after initialising a clock!");
- debug(2,
- "shared memory clock entry %d created for Clock ID: '%" PRIx64
- "' at %s.",
- i, the_clock->clock_id, the_clock->ip);
- }
-
the_clock->sequence_number = ntohs(msg->header.sequenceId);
the_clock->t2 = reception_time;
memset(&m, 0, sizeof(m));
// here, update the shared clock information
+
+
+
int rc = pthread_mutex_lock(&shared_memory->shm_mutex);
if (rc != 0)
warn("Can't acquire mutex to update a clock!");
+
+ // if necessary, initialise a new shared clock record
+ // hacky.
+
+ if (the_clock->shared_clock_number == -1) {
+
+ // associate and initialise a shared clock record
+ int i = 0;
+ while ((shared_memory->clocks[i].valid != 0) && (i < MAX_SHARED_CLOCKS)) {
+ i++;
+ }
+ if (i == MAX_SHARED_CLOCKS)
+ die("All %d clock entries are in use -- no more available!", MAX_SHARED_CLOCKS);
+ the_clock->shared_clock_number = i;
+
+ strncpy((char *)&shared_memory->clocks[i].ip, (const char *)&the_clock->ip,
+ INET6_ADDRSTRLEN - 1);
+ shared_memory->clocks[i].clock_id = the_clock->clock_id;
+ shared_memory->clocks[i].valid = 1;
+ shared_memory->clocks[i].reserved = 0;
+ shared_memory->clocks[i].flags = 0;
+ debug(1,
+ "shared memory clock entry %d created for Clock ID: '%" PRIx64
+ "' at %s.",
+ i, the_clock->clock_id, the_clock->ip);
+
+ }
+
+ // now update the clock
shared_memory->clocks[the_clock->shared_clock_number].local_time =
the_clock->t2;
- shared_memory->clocks[the_clock->shared_clock_number].source_time =
- estimated_offset + the_clock->t2;
shared_memory->clocks[the_clock->shared_clock_number]
.local_to_source_time_offset = estimated_offset;
+
rc = pthread_mutex_unlock(&shared_memory->shm_mutex);
if (rc != 0)
warn("Can't release mutex after updating a clock!");
// clang-format off
- debug(2,"id: %20" PRIu64 ", time: 0x%" PRIx64
+ debug(3,"id: %20" PRIu64 ", time: 0x%" PRIx64
", offset: %" PRIx64
", variation: %+f, turnaround: %f, ip: %s, sequence: %u samples: %d.",
the_clock->clock_id, the_clock->t2 + estimated_offset,
// check errno/WSAGetLastError(), call perror(), etc ...
}
// here, invalidate records and entries that are out of date
- uint64_t tn = get_time_now();
- deleteObseleteClockRecords(&clocks, tn);
+ //uint64_t tn = get_time_now();
+ debug(1,"check for obsolete sources at time %" PRIx64 ".", reception_time);
+ deactivate_old_sources(reception_time);
}
}