From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Tue, 23 Mar 2021 19:18:09 +0000 (+0000) Subject: move to an array of sources. still problems X-Git-Tag: 1.1-dev~71 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4f5503e8d1af04ad89b007d63fc9b7ab82c02f27;p=thirdparty%2Fnqptp.git move to an array of sources. still problems --- diff --git a/nqptp-shm-structures.h b/nqptp-shm-structures.h index 8afa6f0..08bc9f2 100644 --- a/nqptp-shm-structures.h +++ b/nqptp-shm-structures.h @@ -33,8 +33,7 @@ struct clock_source { char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46) uint64_t clock_id; uint64_t reserved; - uint64_t source_time; // the time at the source at - uint64_t local_time; // the local time when the source time is valid + uint64_t local_time; // the local time when the offset was calculated uint64_t local_to_source_time_offset; // add this to the local time to get source time int flags; // not used yet int valid; // this entry is valid diff --git a/nqptp.c b/nqptp.c index 88a36b5..f5240e1 100644 --- a/nqptp.c +++ b/nqptp.c @@ -17,6 +17,8 @@ * Commercial licensing is also available. */ +#define DEBUG_LEVEL 2 + #include "debug.h" #include "nqptp-shm-structures.h" @@ -67,8 +69,6 @@ #define SIOCSHWTSTAMP 0x89b0 #endif -#define DEBUG_LEVEL 0 - // References from the IEEE Document ISBN 978-0-7381-5400-8 STD95773. // "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and // Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002) @@ -109,7 +109,7 @@ struct timing_samples { } timing_samples; struct ptpSource { - char *ip; // ipv4 or ipv6 + char ip[64]; // ipv4 or ipv6 uint64_t clock_id; uint16_t sequence_number; enum stage current_stage; @@ -120,6 +120,7 @@ struct ptpSource { // go int shared_clock_number; // which entry to use in the shared memory, could be -1! uint64_t sample_number; // should roll over in 2^61 seconds! + int in_use; struct ptpSource *next; } ptpSource; @@ -131,11 +132,13 @@ struct socket_info { int number; uint16_t port; }; - +struct ptpSource sources[MAX_SHARED_CLOCKS]; struct socket_info sockets[MAX_OPEN_SOCKETS]; unsigned int sockets_open = 0; // also doubles as where to put next one, as sockets are never closed. struct shm_structure *shared_memory = NULL; +struct ptpSource *clocks = NULL; // a one-way linked list + // struct sockaddr_in6 is bigger than struct sockaddr. #ifdef AF_INET6 @@ -177,6 +180,75 @@ uint64_t get_time_now() { return timespec_to_ns(&tn); } +struct ptpSource *find_source(char *sender_string, uint64_t packet_clock_id) { + struct ptpSource *response = NULL; + int i = 0; + int found = 0; + while ((found == 0) && (i < MAX_SHARED_CLOCKS)) { + if ((sources[i].in_use != 0) && (sources[i].clock_id == packet_clock_id) && (strcasecmp(sender_string,(const char *)&sources[i].ip) == 0)) + found = 1; + else + i++; + } + if (found != 0) + response = &sources[i]; + return response; +} + +struct ptpSource *create_source(char *sender_string, uint64_t packet_clock_id) { + struct ptpSource *response = NULL; + int i = 0; + int found = 0; + while ((found == 0) && (i < MAX_SHARED_CLOCKS)) { + if (sources[i].in_use == 0) + found = 1; + else + i++; + } + if (found != 0) { + memset(&sources[i],0,sizeof(struct ptpSource)); + sources[i].in_use = 1; + strncpy((char *)&sources[i].ip, sender_string, sizeof(ptpSource.ip)-1); + sources[i].clock_id = packet_clock_id; + sources[i].shared_clock_number = -1; + response = &sources[i]; + debug(1,"activated source %d with clock_id %" PRIx64 " on ip: %s.", i, sources[i].clock_id, &sources[i].ip); + } else { + die("Clock table full!"); + } + return response; +} + +void deactivate_old_sources(uint64_t reception_time) { + int i; + for (i = 0; i < MAX_SHARED_CLOCKS; i++) { + if (sources[i].in_use != 0) { + int64_t time_since_last_sync = reception_time - sources[i].t2; + if (time_since_last_sync > 1000000000) { + if (sources[i].shared_clock_number != -1) { + shared_memory->clocks[sources[i].shared_clock_number].valid = 0; + debug(1,"deactivated shared clock %d with clock_id %" PRIx64 " on ip: %s.",sources[i].shared_clock_number, shared_memory->clocks[sources[i].shared_clock_number].clock_id, &shared_memory->clocks[sources[i].shared_clock_number].ip); + } + sources[i].in_use = 0; + debug(1,"deactivated source %d with clock_id %" PRIx64 " on ip: %s.", i, sources[i].clock_id, &sources[i].ip); + } + } + } +} + + + + +/* +void listSourceRecords(struct ptpSource *list) { + struct ptpSource *p = list; + while (p != NULL) { + debug(1,"Clock ID: '%" PRIx64 "' at %s, with shm entry %d.", p->clock_id, p->ip, p->shared_clock_number); + p = p->next; + } +} + + struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip, uint64_t clock_id, uint8_t message_type) { struct ptpSource *response; @@ -206,6 +278,8 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip, uint64_t } else { // only create a record for a Sync message if (message_type == Sync) { + debug(1,"New before"); + listSourceRecords(clocks); response = (struct ptpSource *)malloc(sizeof(ptpSource)); if (response != NULL) { memset((void *)response, 0, sizeof(ptpSource)); @@ -214,11 +288,14 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip, uint64_t response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet response->shared_clock_number = -1; // none allocated yet. Hacky *insertion_point = response; - debug(2, - "Clock record created for Clock ID: '%" PRIu64 "', aka '%" PRIu64 "', aka '%" PRIx64 + debug(1, + "New clock record created for Clock ID: '%" PRIu64 "', aka '%" PRIu64 "', aka '%" PRIx64 "' at %s.", clock_id, clock_id, clock_id, ip); } + debug(1,"New after"); + listSourceRecords(clocks); + } else { response = NULL; } @@ -230,14 +307,18 @@ void deleteObseleteClockRecords(struct ptpSource **list, uint64_t time_now) { // debug(1,"delete -- time now: % " PRIx64 ".", time_now); struct ptpSource **temp = list; + debug(1,"check"); while (*temp != NULL) { struct ptpSource *p = *temp; + int deleting_something = 0; int64_t time_since_last_use = time_now - p->t2; // this is the time of the last sync record - debug(2, "checking record for Clock ID %" PRIx64 " at %s. Time difference is %" PRId64 ".", - p->clock_id, p->ip, time_since_last_use); + debug(1, "checking record for Clock ID %" PRIx64 " at %s. Shared clock is %d Time difference is %" PRId64 ".", + p->clock_id, p->ip, p->shared_clock_number, time_since_last_use); - if (time_since_last_use > 5000000000) { // drop them if idle - debug(2, "delete record for Clock ID %" PRIx64 " at %s.", p->clock_id, p->ip); + if (time_since_last_use > 1000000000) { // drop them if idle + debug(1,"Delete Before"); + listSourceRecords(clocks); + debug(1, "delete record for Clock ID %" PRIx64 " at %s.", p->clock_id, p->ip); if (p->shared_clock_number != -1) { int rc = pthread_mutex_lock(&shared_memory->shm_mutex); if (rc != 0) @@ -251,11 +332,14 @@ void deleteObseleteClockRecords(struct ptpSource **list, uint64_t time_now) { *temp = p->next; free(p->ip); // the IP was strdup'ed in free(p); + debug(1,"Delete After:"); + listSourceRecords(clocks); } else { temp = &p->next; } } } +*/ void debug_print_buffer(int level, char *buf, size_t buf_len) { // printf("Received %u bytes in a packet from %s:%d\n", buf_len, inet_ntoa(si_other.sin_addr), @@ -330,6 +414,9 @@ void termHandler(__attribute__((unused)) int k) { } int main(void) { + clocks = NULL; + shared_memory = NULL; + // memset(sources,0,sizeof(sources)); // level 0 is no messages, level 3 is most messages -- see debug.h debug_init(DEBUG_LEVEL, 0, 1, 1); debug(1, "startup"); @@ -346,7 +433,6 @@ int main(void) { sigaction(SIGTERM, &act2, NULL); ssize_t recv_len; - struct ptpSource *clocks = NULL; // a one-way linked list char buf[BUFLEN]; @@ -742,9 +828,11 @@ int main(void) { uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]); packet_clock_id = packet_clock_id << 32; packet_clock_id = packet_clock_id + packet_clock_id_low; - struct ptpSource *the_clock = - findOrCreateSource(&clocks, sender_string, packet_clock_id, - buf[0] & 0xF); // only create a record for a SYNC + + struct ptpSource *the_clock = find_source(sender_string, packet_clock_id); + if ((the_clock == NULL) && ((buf[0] & 0xF) == Sync)) { + the_clock = create_source(sender_string, packet_clock_id); + } if (the_clock != NULL) { switch (buf[0] & 0xF) { case Sync: { // if it's a sync @@ -779,37 +867,6 @@ int main(void) { } if (discard_sync == 0) { - // if necessary, initialise a new shared clock record - // hacky. - if (the_clock->shared_clock_number == -1) { - if (next_free_clock_source_entry == MAX_SHARED_CLOCKS) - die("No more shared clocks!"); - // associate and initialise a shared clock record - int i = 0; - while ((shared_memory->clocks[i].valid != 0) && (i < MAX_SHARED_CLOCKS)) { - i++; - } - if (i == MAX_SHARED_CLOCKS) - die("All %d clock entries are in use -- no more available!", - MAX_SHARED_CLOCKS); - the_clock->shared_clock_number = i; - int rc = pthread_mutex_lock(&shared_memory->shm_mutex); - if (rc != 0) - die("Can't acquire mutex to initialise a clock!"); - memset(&shared_memory->clocks[i], 0, sizeof(struct clock_source)); - strncpy((char *)&shared_memory->clocks[i].ip, the_clock->ip, - INET6_ADDRSTRLEN - 1); - shared_memory->clocks[i].clock_id = the_clock->clock_id; - shared_memory->clocks[i].valid = 1; - rc = pthread_mutex_unlock(&shared_memory->shm_mutex); - if (rc != 0) - die("Can't release mutex after initialising a clock!"); - debug(2, - "shared memory clock entry %d created for Clock ID: '%" PRIx64 - "' at %s.", - i, the_clock->clock_id, the_clock->ip); - } - the_clock->sequence_number = ntohs(msg->header.sequenceId); the_clock->t2 = reception_time; memset(&m, 0, sizeof(m)); @@ -1215,22 +1272,53 @@ int main(void) { // here, update the shared clock information + + + int rc = pthread_mutex_lock(&shared_memory->shm_mutex); if (rc != 0) warn("Can't acquire mutex to update a clock!"); + + // if necessary, initialise a new shared clock record + // hacky. + + if (the_clock->shared_clock_number == -1) { + + // associate and initialise a shared clock record + int i = 0; + while ((shared_memory->clocks[i].valid != 0) && (i < MAX_SHARED_CLOCKS)) { + i++; + } + if (i == MAX_SHARED_CLOCKS) + die("All %d clock entries are in use -- no more available!", MAX_SHARED_CLOCKS); + the_clock->shared_clock_number = i; + + strncpy((char *)&shared_memory->clocks[i].ip, (const char *)&the_clock->ip, + INET6_ADDRSTRLEN - 1); + shared_memory->clocks[i].clock_id = the_clock->clock_id; + shared_memory->clocks[i].valid = 1; + shared_memory->clocks[i].reserved = 0; + shared_memory->clocks[i].flags = 0; + debug(1, + "shared memory clock entry %d created for Clock ID: '%" PRIx64 + "' at %s.", + i, the_clock->clock_id, the_clock->ip); + + } + + // now update the clock shared_memory->clocks[the_clock->shared_clock_number].local_time = the_clock->t2; - shared_memory->clocks[the_clock->shared_clock_number].source_time = - estimated_offset + the_clock->t2; shared_memory->clocks[the_clock->shared_clock_number] .local_to_source_time_offset = estimated_offset; + rc = pthread_mutex_unlock(&shared_memory->shm_mutex); if (rc != 0) warn("Can't release mutex after updating a clock!"); // clang-format off - debug(2,"id: %20" PRIu64 ", time: 0x%" PRIx64 + debug(3,"id: %20" PRIu64 ", time: 0x%" PRIx64 ", offset: %" PRIx64 ", variation: %+f, turnaround: %f, ip: %s, sequence: %u samples: %d.", the_clock->clock_id, the_clock->t2 + estimated_offset, @@ -1276,8 +1364,9 @@ int main(void) { // check errno/WSAGetLastError(), call perror(), etc ... } // here, invalidate records and entries that are out of date - uint64_t tn = get_time_now(); - deleteObseleteClockRecords(&clocks, tn); + //uint64_t tn = get_time_now(); + debug(1,"check for obsolete sources at time %" PRIx64 ".", reception_time); + deactivate_old_sources(reception_time); } }