From: Mike Brady <4265913+mikebrady@users.noreply.github.com> Date: Sun, 21 Mar 2021 09:46:34 +0000 (+0000) Subject: Include Clock ID in records and public record. garbage collection written but not... X-Git-Tag: 1.1-dev~94 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=2a1f8c5430c72196746ed4f466b7ab1d3ecfcc53;p=thirdparty%2Fnqptp.git Include Clock ID in records and public record. garbage collection written but not tested. Generation of clock id needs overhaul --- diff --git a/nqptp-shm-structures.h b/nqptp-shm-structures.h index 07350ba..0adf0f6 100644 --- a/nqptp-shm-structures.h +++ b/nqptp-shm-structures.h @@ -28,23 +28,22 @@ #include struct clock_source { - char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46) - uint64_t source_time; // the time at the source at - uint64_t local_time; // the local time when the source time is valid - uint64_t local_to_source_time_offset; // add this to the local time to get source time - int flags; // not used yet - int valid; // this entry is valid -}; - -struct shm_basic_structure { + char ip[64]; // 64 is nicely aligned and bigger than INET6_ADDRSTRLEN (46) + uint64_t clock_id; + uint64_t reserved; + uint64_t source_time; // the time at the source at + uint64_t local_time; // the local time when the source time is valid + uint64_t local_to_source_time_offset; // add this to the local time to get source time + int flags; // not used yet + int valid; // this entry is valid }; struct shm_structure { - pthread_mutex_t shm_mutex; // for safely accessing the structure - uint16_t size_of_clock_array; // check this is equal to MAX_SHARED_CLOCKS - uint16_t version; // check this is equal to NQPTP_SHM_STRUCTURES_VERSION - uint32_t flags; - struct clock_source clocks[MAX_SHARED_CLOCKS]; + pthread_mutex_t shm_mutex; // for safely accessing the structure + uint16_t size_of_clock_array; // check this is equal to MAX_SHARED_CLOCKS + uint16_t version; // check this is equal to NQPTP_SHM_STRUCTURES_VERSION + uint32_t flags; + struct clock_source clocks[MAX_SHARED_CLOCKS]; }; #endif diff --git a/nqptp.c b/nqptp.c index 3eb60f7..2b6c711 100644 --- a/nqptp.c +++ b/nqptp.c @@ -17,7 +17,7 @@ * Commercial licensing is also available. */ - #include "nqptp-shm-structures.h" +#include "nqptp-shm-structures.h" #include #include //printf @@ -51,20 +51,19 @@ #include #ifndef SO_TIMESTAMPING -# define SO_TIMESTAMPING 37 -# define SCM_TIMESTAMPING SO_TIMESTAMPING +#define SO_TIMESTAMPING 37 +#define SCM_TIMESTAMPING SO_TIMESTAMPING #endif #ifndef SO_TIMESTAMPNS -# define SO_TIMESTAMPNS 35 +#define SO_TIMESTAMPNS 35 #endif #ifndef SIOCGSTAMPNS -# define SIOCGSTAMPNS 0x8907 +#define SIOCGSTAMPNS 0x8907 #endif #ifndef SIOCSHWTSTAMP -# define SIOCSHWTSTAMP 0x89b0 +#define SIOCSHWTSTAMP 0x89b0 #endif - // References from the IEEE Document ISBN 978-0-7381-5400-8 STD95773. // "IEEE Standard for a Precision Clock Synchronization Protocol for Networked Measurement and // Control Systems" The IEEE Std 1588-2008 (Revision of IEEE Std 1588-2002) @@ -105,8 +104,8 @@ struct timing_samples { } timing_samples; struct ptpSource { - char *ip; // ipv4 or ipv6 - + char *ip; // ipv4 or ipv6 + uint64_t clock_id; uint16_t sequence_number; enum stage current_stage; uint64_t t1, t2, t3, t4, t5, previous_offset, previous_estimated_offset; @@ -114,8 +113,8 @@ struct ptpSource { int vacant_samples; // the number of elements in the timing_samples array that are not yet used int next_sample_goes_here; // point to where in the timing samples array the next entries should // go - int shared_clock_number; // which entry to use in the shared memory, could be zero! - uint64_t sample_number; // should roll over in 2^61 seconds! + int shared_clock_number; // which entry to use in the shared memory, could be -1! + uint64_t sample_number; // should roll over in 2^61 seconds! struct ptpSource *next; } ptpSource; @@ -123,7 +122,6 @@ struct ptpSource { #define MAX_OPEN_SOCKETS 32 // up to 32 sockets open on ports 319 and 320 - struct socket_info { int number; uint16_t port; @@ -136,7 +134,6 @@ unsigned int sockets_open = struct shm_structure *shared_memory = 0; - // struct sockaddr_in6 is bigger than struct sockaddr. #ifdef AF_INET6 #define SOCKADDR struct sockaddr_storage @@ -182,7 +179,7 @@ uint64_t get_time_now() { return timespec_to_ns(&tn); } -struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) { +struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip, uint64_t clock_id) { struct ptpSource *response; struct ptpSource **insertion_point = list; // in case the list is empty struct ptpSource *crawler = *list; @@ -190,10 +187,11 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) { // fprintf(stderr, "No clocks recorded\n"); insertion_point = list; } else { - while ((crawler->next != NULL) && (strcasecmp(ip, crawler->ip) != 0)) { + while ((crawler->next != NULL) && + ((crawler->clock_id != clock_id) || (strcasecmp(ip, crawler->ip) != 0))) { crawler = crawler->next; } - if (strcasecmp(ip, crawler->ip) == 0) { + if ((crawler->clock_id == clock_id) && (strcasecmp(ip, crawler->ip) == 0)) { // found, so no insertion insertion_point = NULL; } else { @@ -212,8 +210,9 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) { if (response != NULL) { memset((void *)response, 0, sizeof(ptpSource)); response->ip = strdup(ip); + response->clock_id = clock_id; response->vacant_samples = MAX_TIMING_SAMPLES; // no valid samples yet - response->shared_clock_number = -1; // none allocated yet. Hacky + response->shared_clock_number = -1; // none allocated yet. Hacky *insertion_point = response; fprintf(stderr, "Clock record created for \"%s\".\n", ip); } @@ -221,6 +220,30 @@ struct ptpSource *findOrCreateSource(struct ptpSource **list, char *ip) { return response; } +void deleteObseleteClockRecords(struct ptpSource **list, uint64_t time_now) { + struct ptpSource **temp = list; + + while (*temp != NULL) { + struct ptpSource *p = *temp; + int64_t time_since_last_use = time_now - p->t2; // this is the time of the last sync record + if (time_since_last_use > 60000000000) { + // discard this one... + if (p->shared_clock_number != -1) { + + int rc = pthread_mutex_lock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't acquire mutex to delete a clock!\n"); + memset(&shared_memory->clocks[p->shared_clock_number], 0, sizeof(struct clock_source)); + rc = pthread_mutex_unlock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't release mutex after deleting a clock!\n"); + } + temp = &p->next; + free(p); + } + } +} + void print_buffer(char *buf, size_t buf_len) { uint64_t time_now = get_time_now(); if (time_then == 0) { @@ -358,7 +381,6 @@ int main(void) { } (void)umask(oldumask); - if (fchown(shm_fd, -1, grp != NULL ? grp->gr_gid : 0) < 0) { fprintf(stderr, "Failed to set ownership.\n"); } @@ -439,15 +461,15 @@ int main(void) { setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping_flags, sizeof(so_timestamping_flags)); -/* - int val = 0; - socklen_t len = sizeof(val); - if (getsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &val, &len) < 0) - fprintf(stderr, "%s: %s\n", "getsockopt SO_TIMESTAMPING", strerror(errno)); - else - fprintf(stderr, "SO_TIMESTAMPING requested: %d, obtained: %d\n", so_timestamping_flags, - val); -*/ + /* + int val = 0; + socklen_t len = sizeof(val); + if (getsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &val, &len) < 0) + fprintf(stderr, "%s: %s\n", "getsockopt SO_TIMESTAMPING", strerror(errno)); + else + fprintf(stderr, "SO_TIMESTAMPING requested: %d, obtained: %d\n", + so_timestamping_flags, val); + */ /* if (ret == 0) setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS, &yes, sizeof(yes)); @@ -513,14 +535,14 @@ int main(void) { if (ret == 0) setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &so_timestamping_flags, sizeof(so_timestamping_flags)); -/* int val; - socklen_t len = sizeof(val); - if (getsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &val, &len) < 0) - fprintf(stderr, "%s: %s\n", "getsockopt SO_TIMESTAMPING", strerror(errno)); - else - fprintf(stderr, "SO_TIMESTAMPING requested: %d, obtained: %d\n", so_timestamping_flags, - val); -*/ + /* int val; + socklen_t len = sizeof(val); + if (getsockopt(fd, SOL_SOCKET, SO_TIMESTAMPING, &val, &len) < 0) + fprintf(stderr, "%s: %s\n", "getsockopt SO_TIMESTAMPING", strerror(errno)); + else + fprintf(stderr, "SO_TIMESTAMPING requested: %d, obtained: %d\n", + so_timestamping_flags, val); + */ /* if (ret == 0) setsockopt(fd, SOL_SOCKET, SO_TIMESTAMPNS, &yes, sizeof(yes)); @@ -568,9 +590,9 @@ int main(void) { timeout.tv_sec = 10; timeout.tv_usec = 0; int retval = select(smax + 1, &readSockSet, NULL, NULL, &timeout); - + uint64_t reception_time = get_time_now(); // use this if other methods fail if (retval > 0) { - uint64_t reception_time = get_time_now(); // use this if other methods fail + unsigned t; for (t = 0; t < sockets_open; t++) { if (FD_ISSET(sockets[t].number, &readSockSet)) { @@ -646,7 +668,8 @@ int main(void) { reception_time = reception_time * 1000000000; reception_time = reception_time + ts->tv_nsec; } else { - fprintf(stderr, "Can't establish a reception time -- falling back on get_time_now() \n"); + fprintf(stderr, + "Can't establish a reception time -- falling back on get_time_now() \n"); } } @@ -672,7 +695,8 @@ int main(void) { sender_port = ntohs(sa4->sin_port); } -// if ((sender_port == sockets[t].port) && (connection_ip_family == AF_INET)) { + // if ((sender_port == sockets[t].port) && (connection_ip_family == + // AF_INET)) { if (sender_port == sockets[t].port) { char sender_string[256]; memset(sender_string, 0, sizeof(sender_string)); @@ -684,33 +708,38 @@ int main(void) { // print_buffer(buf, recv_len); - // now, find or create a record for this ip - struct ptpSource *the_clock = findOrCreateSource(&clocks, sender_string); - - switch (buf[0] & 0xF) { - case Sync: { // if it's a sync - struct ptp_sync_message *msg = (struct ptp_sync_message *)buf; - if (msg->header.correctionField != 0) - fprintf(stderr, "correctionField: %" PRIx64 ".\n", - msg->header.correctionField); - // fprintf(stderr, "SYNC %u.\n", ntohs(msg->header.sequenceId)); - int discard_sync = 0; - - if ((the_clock->current_stage != nothing_seen) && - (the_clock->current_stage != waiting_for_sync)) { - - // here, we have an unexpected SYNC. It could be because the - // previous transaction sequence failed for some reason - // But, if that is so, the SYNC will have a newer sequence number - // so, ignore it if it's older. - - uint16_t new_sync_sequence_number = ntohs(msg->header.sequenceId); - int16_t sequence_number_difference = - (the_clock->sequence_number - new_sync_sequence_number); - if ((sequence_number_difference > 0) && (sequence_number_difference < 8)) - discard_sync = 1; - - // clang-format off + // now, find or create a record for this ip / clock_id combination + struct ptp_common_message_header *mt = (struct ptp_common_message_header *)buf; + uint64_t packet_clock_id = nctohl(&mt->clockIdentity[0]); + uint64_t packet_clock_id_low = nctohl(&mt->clockIdentity[4]); + packet_clock_id = packet_clock_id << 32; + packet_clock_id = packet_clock_id + packet_clock_id_low; + struct ptpSource *the_clock = + findOrCreateSource(&clocks, sender_string, packet_clock_id); + + switch (buf[0] & 0xF) { + case Sync: { // if it's a sync + struct ptp_sync_message *msg = (struct ptp_sync_message *)buf; + if (msg->header.correctionField != 0) + fprintf(stderr, "correctionField: %" PRIx64 ".\n", msg->header.correctionField); + // fprintf(stderr, "SYNC %u.\n", ntohs(msg->header.sequenceId)); + int discard_sync = 0; + + if ((the_clock->current_stage != nothing_seen) && + (the_clock->current_stage != waiting_for_sync)) { + + // here, we have an unexpected SYNC. It could be because the + // previous transaction sequence failed for some reason + // But, if that is so, the SYNC will have a newer sequence number + // so, ignore it if it's older. + + uint16_t new_sync_sequence_number = ntohs(msg->header.sequenceId); + int16_t sequence_number_difference = + (the_clock->sequence_number - new_sync_sequence_number); + if ((sequence_number_difference > 0) && (sequence_number_difference < 8)) + discard_sync = 1; + + // clang-format off /* fprintf(stderr, "Sync %u expecting to be in state nothing_seen (%u) or waiting_for_sync " @@ -720,138 +749,138 @@ int main(void) { discard_sync ? " Discarded because it is older." : "", the_clock->ip); */ - // clang-format on - + // clang-format on + } + if (discard_sync == 0) { + + // if necessary, initialise a new shared clock record + // hacky. + if (the_clock->shared_clock_number == -1) { + if (next_free_clock_source_entry == MAX_SHARED_CLOCKS) + fprintf(stderr, "No more shared clocks!\n"); + // associate and initialise a shared clock record + int i = next_free_clock_source_entry++; + the_clock->shared_clock_number = i; + int rc = pthread_mutex_lock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't acquire mutex to initialise a clock!\n"); + memset(&shared_memory->clocks[i], 0, sizeof(struct clock_source)); + strncpy((char *)&shared_memory->clocks[i].ip, the_clock->ip, + INET6_ADDRSTRLEN - 1); + shared_memory->clocks[i].valid = 1; + rc = pthread_mutex_unlock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't release mutex after initialising a clock!\n"); } - if (discard_sync == 0) { - - // if necessary, initialise a new shared clock record - // hacky. - if (the_clock->shared_clock_number == -1) { - if (next_free_clock_source_entry == MAX_SHARED_CLOCKS) - fprintf(stderr,"No more shared clocks!\n"); - // associate and initialise a shared clock record - int i = next_free_clock_source_entry++; - the_clock->shared_clock_number = i; - int rc = pthread_mutex_lock(&shared_memory->shm_mutex); - if (rc != 0) - fprintf(stderr,"Can't acquire mutex to initialise a clock!\n"); - memset(&shared_memory->clocks[i],0,sizeof(struct clock_source)); - strncpy((char *)&shared_memory->clocks[i].ip, the_clock->ip, INET6_ADDRSTRLEN-1); - shared_memory->clocks[i].valid = 1; - rc = pthread_mutex_unlock(&shared_memory->shm_mutex); - if (rc != 0) - fprintf(stderr,"Can't release mutex after initialising a clock!\n"); - } - - - - the_clock->sequence_number = ntohs(msg->header.sequenceId); - the_clock->t2 = reception_time; - memset(&m, 0, sizeof(m)); - m.header.transportSpecificAndMessageID = 0x11; - m.header.reservedAndVersionPTP = 0x02; - m.header.messageLength = htons(44); - m.header.flags = htons(0x608); - m.header.sourcePortID = htons(1); - m.header.controlOtherMessage = 5; - m.header.sequenceId = htons(the_clock->sequence_number); - struct ifaddrs *ifaddr = NULL; - struct ifaddrs *ifa = NULL; - - if ((status = getifaddrs(&ifaddr) == -1)) { - fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status)); - } else { - int found = 0; - for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { - if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) { - struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr; - if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) { - memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen); - found = 1; - } + + the_clock->sequence_number = ntohs(msg->header.sequenceId); + the_clock->t2 = reception_time; + memset(&m, 0, sizeof(m)); + m.header.transportSpecificAndMessageID = 0x11; + m.header.reservedAndVersionPTP = 0x02; + m.header.messageLength = htons(44); + m.header.flags = htons(0x608); + m.header.sourcePortID = htons(1); + m.header.controlOtherMessage = 5; + m.header.sequenceId = htons(the_clock->sequence_number); + struct ifaddrs *ifaddr = NULL; + struct ifaddrs *ifa = NULL; + + if ((status = getifaddrs(&ifaddr) == -1)) { + fprintf(stderr, "getifaddrs: %s\n", gai_strerror(status)); + } else { + int found = 0; + for (ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) { + if ((ifa->ifa_addr) && (ifa->ifa_addr->sa_family == AF_PACKET)) { + struct sockaddr_ll *s = (struct sockaddr_ll *)ifa->ifa_addr; + if ((strcmp(ifa->ifa_name, "lo") != 0) && (found == 0)) { + memcpy(&m.header.clockIdentity, &s->sll_addr, s->sll_halen); + found = 1; } } - freeifaddrs(ifaddr); } + freeifaddrs(ifaddr); + } - struct msghdr header; - struct iovec io; - memset(&header, 0, sizeof(header)); - memset(&io, 0, sizeof(io)); - header.msg_name = &from_sock_addr; - header.msg_namelen = sizeof(from_sock_addr); - header.msg_iov = &io; - header.msg_iov->iov_base = &m; - header.msg_iov->iov_len = sizeof(m); - header.msg_iovlen = 1; - uint64_t transmission_time = get_time_now(); // in case nothing better works - if ((sendmsg(sockets[t].number, &header, 0)) == -1) { - fprintf(stderr, "Error in sendmsg,\t [errno = %d]\n", errno); - } + struct msghdr header; + struct iovec io; + memset(&header, 0, sizeof(header)); + memset(&io, 0, sizeof(io)); + header.msg_name = &from_sock_addr; + header.msg_namelen = sizeof(from_sock_addr); + header.msg_iov = &io; + header.msg_iov->iov_base = &m; + header.msg_iov->iov_len = sizeof(m); + header.msg_iovlen = 1; + uint64_t transmission_time = get_time_now(); // in case nothing better works + if ((sendmsg(sockets[t].number, &header, 0)) == -1) { + fprintf(stderr, "Error in sendmsg,\t [errno = %d]\n", errno); + } - // Obtain the sent packet timestamp. - char data[256]; - struct msghdr msg; - struct iovec entry; - struct sockaddr_in from_addr; - struct { - struct cmsghdr cm; - char control[512]; - } control; - - memset(&msg, 0, sizeof(msg)); - msg.msg_iov = &entry; - msg.msg_iovlen = 1; - entry.iov_base = data; - entry.iov_len = sizeof(data); - msg.msg_name = (caddr_t)&from_addr; - msg.msg_namelen = sizeof(from_addr); - msg.msg_control = &control; - msg.msg_controllen = sizeof(control); - if (recvmsg(sockets[t].number, &msg, MSG_ERRQUEUE) == -1) { - // can't get the transmission time directly - // possibly because it's not implemented - struct timespec tv_ioctl; - tv_ioctl.tv_sec = 0; - tv_ioctl.tv_nsec = 0; - int error = ioctl(sockets[t].number, SIOCGSTAMPNS, &tv_ioctl); - if (error == 0) { // somnetimes, even this doesn't work, so we fall back on the earlier get_time_now(); - transmission_time = tv_ioctl.tv_sec; - transmission_time = transmission_time * 1000000000; - transmission_time = transmission_time + tv_ioctl.tv_nsec; - } + // Obtain the sent packet timestamp. + char data[256]; + struct msghdr msg; + struct iovec entry; + struct sockaddr_in from_addr; + struct { + struct cmsghdr cm; + char control[512]; + } control; + + memset(&msg, 0, sizeof(msg)); + msg.msg_iov = &entry; + msg.msg_iovlen = 1; + entry.iov_base = data; + entry.iov_len = sizeof(data); + msg.msg_name = (caddr_t)&from_addr; + msg.msg_namelen = sizeof(from_addr); + msg.msg_control = &control; + msg.msg_controllen = sizeof(control); + if (recvmsg(sockets[t].number, &msg, MSG_ERRQUEUE) == -1) { + // can't get the transmission time directly + // possibly because it's not implemented + struct timespec tv_ioctl; + tv_ioctl.tv_sec = 0; + tv_ioctl.tv_nsec = 0; + int error = ioctl(sockets[t].number, SIOCGSTAMPNS, &tv_ioctl); + if (error == 0) { // somnetimes, even this doesn't work, so we fall back on + // the earlier get_time_now(); + transmission_time = tv_ioctl.tv_sec; + transmission_time = transmission_time * 1000000000; + transmission_time = transmission_time + tv_ioctl.tv_nsec; + } + } else { + // get the time + int level, type; + struct cmsghdr *cm; + struct timespec *ts = NULL; + for (cm = CMSG_FIRSTHDR(&msg); cm != NULL; cm = CMSG_NXTHDR(&msg, cm)) { + level = cm->cmsg_level; + type = cm->cmsg_type; + if (SOL_SOCKET == level && SO_TIMESTAMPING == type) { + /* + struct timespec *stamp = (struct timespec + *)CMSG_DATA(cm); fprintf(stderr, "SO_TIMESTAMPING Tx: "); + fprintf(stderr, "SW %ld.%09ld\n", + (long)stamp->tv_sec, (long)stamp->tv_nsec); stamp++; + // skip deprecated HW transformed + stamp++; + fprintf(stderr, "SO_TIMESTAMPING Tx: "); + fprintf(stderr, "HW raw %ld.%09ld\n", + (long)stamp->tv_sec, (long)stamp->tv_nsec); + */ + ts = (struct timespec *)CMSG_DATA(cm); + transmission_time = ts->tv_sec; + transmission_time = transmission_time * 1000000000; + transmission_time = transmission_time + ts->tv_nsec; } else { - // get the time - int level, type; - struct cmsghdr *cm; - struct timespec *ts = NULL; - for (cm = CMSG_FIRSTHDR(&msg); cm != NULL; cm = CMSG_NXTHDR(&msg, cm)) { - level = cm->cmsg_level; - type = cm->cmsg_type; - if (SOL_SOCKET == level && SO_TIMESTAMPING == type) { - /* - struct timespec *stamp = (struct timespec - *)CMSG_DATA(cm); fprintf(stderr, "SO_TIMESTAMPING Tx: "); - fprintf(stderr, "SW %ld.%09ld\n", - (long)stamp->tv_sec, (long)stamp->tv_nsec); stamp++; - // skip deprecated HW transformed - stamp++; - fprintf(stderr, "SO_TIMESTAMPING Tx: "); - fprintf(stderr, "HW raw %ld.%09ld\n", - (long)stamp->tv_sec, (long)stamp->tv_nsec); - */ - ts = (struct timespec *)CMSG_DATA(cm); - transmission_time = ts->tv_sec; - transmission_time = transmission_time * 1000000000; - transmission_time = transmission_time + ts->tv_nsec; - } else { - // fprintf(stderr, "Can't establish a transmission time! Falling back on get_time_now().\n"); - } - } + // fprintf(stderr, "Can't establish a transmission time! Falling back on + // get_time_now().\n"); } + } + } - // clang-format off + // clang-format off /* // fprintf(stderr, "DREQ to %s\n", the_clock->ip); if (sendto(sockets[t].number, &m, sizeof(m), 0, @@ -861,135 +890,136 @@ int main(void) { return 4; } */ - // clang-format on - - the_clock->t3 = transmission_time; - // int64_t ttd = transmission_time - the_clock->t3; - // fprintf(stderr, "transmission time delta: %f.\n", ttd*0.000000001); + // clang-format on - the_clock->current_stage = sync_seen; - - } - } break; - - case Follow_Up: { - struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; - if ((the_clock->current_stage == sync_seen) && - (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { - uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); - uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); - uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); - uint64_t preciseOriginTimestamp = seconds_hi; - preciseOriginTimestamp = preciseOriginTimestamp << 32; - preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; - preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; - preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; - the_clock->t1 = preciseOriginTimestamp; - the_clock->current_stage = follow_up_seen; - } else { - if (the_clock->current_stage != waiting_for_sync) { -/* - fprintf( - stderr, - "Follow_Up %u expecting to be in state sync_seen (%u). Stage error -- " - "current state is %u, sequence %u. Ignoring it. %s\n", - ntohs(msg->header.sequenceId), sync_seen, the_clock->current_stage, - the_clock->sequence_number, the_clock->ip); -*/ - } - } - } break; - case Delay_Resp: { - struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf; - if ((the_clock->current_stage == follow_up_seen) && - (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { - uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]); - uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]); - uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]); - uint64_t receiveTimestamp = seconds_hi; - receiveTimestamp = receiveTimestamp << 32; - receiveTimestamp = receiveTimestamp + seconds_low; - receiveTimestamp = receiveTimestamp * 1000000000L; - receiveTimestamp = receiveTimestamp + nanoseconds; - the_clock->t4 = receiveTimestamp; + the_clock->t3 = transmission_time; + // int64_t ttd = transmission_time - the_clock->t3; + // fprintf(stderr, "transmission time delta: %f.\n", ttd*0.000000001); + the_clock->current_stage = sync_seen; + } + } break; + + case Follow_Up: { + struct ptp_follow_up_message *msg = (struct ptp_follow_up_message *)buf; + if ((the_clock->current_stage == sync_seen) && + (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { + uint16_t seconds_hi = nctohs(&msg->follow_up.preciseOriginTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->follow_up.preciseOriginTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->follow_up.preciseOriginTimestamp[6]); + uint64_t preciseOriginTimestamp = seconds_hi; + preciseOriginTimestamp = preciseOriginTimestamp << 32; + preciseOriginTimestamp = preciseOriginTimestamp + seconds_low; + preciseOriginTimestamp = preciseOriginTimestamp * 1000000000L; + preciseOriginTimestamp = preciseOriginTimestamp + nanoseconds; + the_clock->t1 = preciseOriginTimestamp; + the_clock->current_stage = follow_up_seen; + } else { + if (the_clock->current_stage != waiting_for_sync) { /* - // reference: Figure 12 - (t4 - t1) [always positive, a difference of two distant clock times] - less (t3 -t2) [always positive, a difference of two local clock times] - is equal to t(m->s) + t(s->m), thus twice the propagation time - assuming symmetrical delays + fprintf( + stderr, + "Follow_Up %u expecting to be in state sync_seen + (%u). Stage error -- " "current state is %u, sequence %u. Ignoring it. + %s\n", ntohs(msg->header.sequenceId), sync_seen, the_clock->current_stage, + the_clock->sequence_number, the_clock->ip); */ + } + } + } break; + case Delay_Resp: { + struct ptp_delay_resp_message *msg = (struct ptp_delay_resp_message *)buf; + if ((the_clock->current_stage == follow_up_seen) && + (the_clock->sequence_number == ntohs(msg->header.sequenceId))) { + uint16_t seconds_hi = nctohs(&msg->delay_resp.receiveTimestamp[0]); + uint32_t seconds_low = nctohl(&msg->delay_resp.receiveTimestamp[2]); + uint32_t nanoseconds = nctohl(&msg->delay_resp.receiveTimestamp[6]); + uint64_t receiveTimestamp = seconds_hi; + receiveTimestamp = receiveTimestamp << 32; + receiveTimestamp = receiveTimestamp + seconds_low; + receiveTimestamp = receiveTimestamp * 1000000000L; + receiveTimestamp = receiveTimestamp + nanoseconds; + the_clock->t4 = receiveTimestamp; - // int64_t distant_time_difference = the_clock->t4 - the_clock->t1; - // int64_t local_time_difference = the_clock->t3 - the_clock->t2; - // int64_t double_propagation_time = distant_time_difference - distant_time_difference; // better be positive - // fprintf(stderr, "distant_time_difference: %" PRId64 ", local_time_difference: %" PRId64 " , double_propagation_time %" PRId64 ".\n", distant_time_difference, local_time_difference, double_propagation_time); - - the_clock->t5 = - reception_time; // t5 - t3 gives us the out-and-back time locally - // -- an instantaneous quality index - // t5 - t2 gives us an overall interchange time - // from the Sync to the Delay Resp - - if ((the_clock->t5 - the_clock->t2) < 25 * 1000000) { - if ((the_clock->t4 - the_clock->t1) < 60 * 1000000) { - - // calculate delay and calculate offset - // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", - // t4: - // %" PRIx64 - // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: - // %" PRIx64 " = - // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", - // t4-t1, t4-t1, t3-t2, t3-t2); - - uint64_t instantaneous_offset = the_clock->t1 - the_clock->t2; - int64_t change_in_offset = - instantaneous_offset - the_clock->previous_offset; - - // now, decide whether to keep the sample for averaging, etc. - the_clock->sample_number++; - if (the_clock->sample_number == 16) { // discard the approx first two seconds! - // remove previous samples before this number - the_clock->vacant_samples = - MAX_TIMING_SAMPLES; // invalidate all the previous samples used for - // averaging, etc. - the_clock->next_sample_goes_here = 0; - } + /* + // reference: Figure 12 + (t4 - t1) [always positive, a difference of two distant clock times] + less (t3 -t2) [always positive, a difference of two local clock times] + is equal to t(m->s) + t(s->m), thus twice the propagation time + assuming symmetrical delays + */ - int64_t discontinuity_threshold = 250000000; // nanoseconds - if ((change_in_offset > discontinuity_threshold) || - (change_in_offset < (-discontinuity_threshold))) { - fprintf(stderr, - "large discontinuity of %+f seconds detected, sequence %u\n", - change_in_offset * 0.000000001, the_clock->sequence_number); - the_clock->vacant_samples = - MAX_TIMING_SAMPLES; // invalidate all the previous samples used for - // averaging, etc. - the_clock->next_sample_goes_here = 0; - } + // int64_t distant_time_difference = the_clock->t4 - the_clock->t1; + // int64_t local_time_difference = the_clock->t3 - the_clock->t2; + // int64_t double_propagation_time = distant_time_difference - + // distant_time_difference; // better be positive fprintf(stderr, + // "distant_time_difference: %" PRId64 ", local_time_difference: %" PRId64 " , + // double_propagation_time %" PRId64 ".\n", distant_time_difference, + // local_time_difference, double_propagation_time); + + the_clock->t5 = + reception_time; // t5 - t3 gives us the out-and-back time locally + // -- an instantaneous quality index + // t5 - t2 gives us an overall interchange time + // from the Sync to the Delay Resp + + if ((the_clock->t5 - the_clock->t2) < 25 * 1000000) { + if ((the_clock->t4 - the_clock->t1) < 60 * 1000000) { + + // calculate delay and calculate offset + // fprintf(stderr, "t1: %016" PRIx64 ", t2: %" PRIx64 ", t3: %" PRIx64 ", + // t4: + // %" PRIx64 + // ".\n",t1,t2,t3,t4); fprintf(stderr, "nominal remote transaction time: + // %" PRIx64 " = + // %" PRIu64 "ns; local transaction time: %" PRIx64 " = %" PRId64 "ns.\n", + // t4-t1, t4-t1, t3-t2, t3-t2); + + uint64_t instantaneous_offset = the_clock->t1 - the_clock->t2; + int64_t change_in_offset = + instantaneous_offset - the_clock->previous_offset; + + // now, decide whether to keep the sample for averaging, etc. + the_clock->sample_number++; + if (the_clock->sample_number == + 16) { // discard the approx first two seconds! + // remove previous samples before this number + the_clock->vacant_samples = + MAX_TIMING_SAMPLES; // invalidate all the previous samples used for + // averaging, etc. + the_clock->next_sample_goes_here = 0; + } - // now, store the remote and local times in the array - the_clock->samples[the_clock->next_sample_goes_here].local = - the_clock->t2; - the_clock->samples[the_clock->next_sample_goes_here].remote = - the_clock->t1; - uint64_t diff = the_clock->t1 - the_clock->t2; - the_clock->samples[the_clock->next_sample_goes_here].local_to_remote_offset = diff; - the_clock->next_sample_goes_here++; - if (the_clock->next_sample_goes_here == MAX_TIMING_SAMPLES) - the_clock->next_sample_goes_here = 0; - if (the_clock->vacant_samples > 0) - the_clock->vacant_samples--; + int64_t discontinuity_threshold = 250000000; // nanoseconds + if ((change_in_offset > discontinuity_threshold) || + (change_in_offset < (-discontinuity_threshold))) { + fprintf(stderr, + "large discontinuity of %+f seconds detected, sequence %u\n", + change_in_offset * 0.000000001, the_clock->sequence_number); + the_clock->vacant_samples = + MAX_TIMING_SAMPLES; // invalidate all the previous samples used for + // averaging, etc. + the_clock->next_sample_goes_here = 0; + } + // now, store the remote and local times in the array + the_clock->samples[the_clock->next_sample_goes_here].local = the_clock->t2; + the_clock->samples[the_clock->next_sample_goes_here].remote = the_clock->t1; + uint64_t diff = the_clock->t1 - the_clock->t2; + the_clock->samples[the_clock->next_sample_goes_here] + .local_to_remote_offset = diff; + the_clock->next_sample_goes_here++; + if (the_clock->next_sample_goes_here == MAX_TIMING_SAMPLES) + the_clock->next_sample_goes_here = 0; + if (the_clock->vacant_samples > 0) + the_clock->vacant_samples--; - uint64_t estimated_offset = instantaneous_offset; + uint64_t estimated_offset = instantaneous_offset; - // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, - // delay*0.000000001); + // fprintf(stderr, "Offset: %" PRIx64 ", delay %f.\n", offset, + // delay*0.000000001); - // clang-format off + // clang-format off /* // here, let's try to use the t1 - remote time and t2 - local time @@ -1051,38 +1081,35 @@ int main(void) { // uint64_t offset = the_clock->t1 - the_clock->t2; uint64_t estimated_offset = remote_estimate - the_clock->t2; */ - // clang-format on - - + // clang-format on - // here, calculate the average offset + // here, calculate the average offset - int e; - long double offsets = 0; - int sample_count = MAX_TIMING_SAMPLES - the_clock->vacant_samples; - for (e = 0; e < sample_count; e++) { - uint64_t ho = the_clock->samples[e].local_to_remote_offset; - ho = ho >> 12; + int e; + long double offsets = 0; + int sample_count = MAX_TIMING_SAMPLES - the_clock->vacant_samples; + for (e = 0; e < sample_count; e++) { + uint64_t ho = the_clock->samples[e].local_to_remote_offset; + ho = ho >> 12; - offsets = offsets + 1.0 * ho; - } + offsets = offsets + 1.0 * ho; + } - offsets = offsets / sample_count; + offsets = offsets / sample_count; - // uint64_t offset = (uint64_t)offsets; + // uint64_t offset = (uint64_t)offsets; - estimated_offset = (uint64_t)offsets; + estimated_offset = (uint64_t)offsets; - estimated_offset = estimated_offset << 12; - - // just to keep the print line happy - long double gradient = 1.0; - // uint64_t offset = the_clock->t1 - the_clock->t2; + estimated_offset = estimated_offset << 12; + // just to keep the print line happy + long double gradient = 1.0; + // uint64_t offset = the_clock->t1 - the_clock->t2; -// clang-format on + // clang-format on -// clang-format off + // clang-format off /* // here, use a Savitzky–Golay filter to smooth the last 9 offsets // see https://en.wikipedia.org/wiki/Savitzky–Golay_filter @@ -1118,29 +1145,32 @@ int main(void) { long double gradient = 1.0; int sample_count = window_size; */ -// clang-format on + // clang-format on - int64_t variation = 0; + int64_t variation = 0; - if (the_clock->previous_estimated_offset != 0) { - variation = estimated_offset - the_clock->previous_estimated_offset; - } else { - estimated_offset = instantaneous_offset; - } - - // here, update the shared clock information + if (the_clock->previous_estimated_offset != 0) { + variation = estimated_offset - the_clock->previous_estimated_offset; + } else { + estimated_offset = instantaneous_offset; + } - int rc = pthread_mutex_lock(&shared_memory->shm_mutex); - if (rc != 0) - fprintf(stderr,"Can't acquire mutex to update a clock!\n"); - shared_memory->clocks[the_clock->shared_clock_number].local_time = the_clock->t2; - shared_memory->clocks[the_clock->shared_clock_number].source_time = estimated_offset + the_clock->t2; - shared_memory->clocks[the_clock->shared_clock_number].local_to_source_time_offset = estimated_offset; - rc = pthread_mutex_unlock(&shared_memory->shm_mutex); - if (rc != 0) - fprintf(stderr,"Can't release mutex after updating a clock!\n"); + // here, update the shared clock information + + int rc = pthread_mutex_lock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't acquire mutex to update a clock!\n"); + shared_memory->clocks[the_clock->shared_clock_number].local_time = + the_clock->t2; + shared_memory->clocks[the_clock->shared_clock_number].source_time = + estimated_offset + the_clock->t2; + shared_memory->clocks[the_clock->shared_clock_number] + .local_to_source_time_offset = estimated_offset; + rc = pthread_mutex_unlock(&shared_memory->shm_mutex); + if (rc != 0) + fprintf(stderr, "Can't release mutex after updating a clock!\n"); - // clang-format off + // clang-format off fprintf(stderr, "estimated offset: %" PRIx64 @@ -1150,40 +1180,38 @@ int main(void) { (the_clock->t5 - the_clock->t2) * 0.000000001, (gradient - 1.0) * 1000000, the_clock->ip, the_clock->sequence_number, sample_count); - // clang-format on + // clang-format on - the_clock->previous_estimated_offset = estimated_offset; - the_clock->previous_offset = instantaneous_offset; - } else { - // fprintf(stderr, - // "t4 - t1 (sync and delay response) time %f is too long. - // Discarding. %s\n", (the_clock->t4 - the_clock->t1)*0.000000001, - // the_clock->ip); - } + the_clock->previous_estimated_offset = estimated_offset; + the_clock->previous_offset = instantaneous_offset; } else { - // fprintf(stderr, "t5 - t2 time %f (total transaction time) is too long. - // Discarding. %s\n", (the_clock->t5 - the_clock->t2)*0.000000001, + // fprintf(stderr, + // "t4 - t1 (sync and delay response) time %f is too long. + // Discarding. %s\n", (the_clock->t4 - the_clock->t1)*0.000000001, // the_clock->ip); } - the_clock->current_stage = nothing_seen; } else { - if (the_clock->current_stage != waiting_for_sync) { -/* - fprintf(stderr, - "Delay_Resp %u expecting to be in state follow_up_seen (%u). Stage " - "error -- " - "current state is %u, sequence %u. Ignoring it. %s\n", - ntohs(msg->header.sequenceId), follow_up_seen, - the_clock->current_stage, the_clock->sequence_number, - the_clock->ip); -*/ - } + // fprintf(stderr, "t5 - t2 time %f (total transaction time) is too long. + // Discarding. %s\n", (the_clock->t5 - the_clock->t2)*0.000000001, + // the_clock->ip); + } + the_clock->current_stage = nothing_seen; + } else { + if (the_clock->current_stage != waiting_for_sync) { + /* + fprintf(stderr, + "Delay_Resp %u expecting to be in state + follow_up_seen (%u). Stage " "error -- " "current state is %u, sequence %u. + Ignoring it. %s\n", ntohs(msg->header.sequenceId), follow_up_seen, + the_clock->current_stage, + the_clock->sequence_number, the_clock->ip); + */ } - } break; - default: - break; } - + } break; + default: + break; + } } } } @@ -1192,6 +1220,8 @@ int main(void) { } else if (retval < 0) { // check errno/WSAGetLastError(), call perror(), etc ... } + // here, invalidate records and entries that are out of date + // deleteObseleteClockRecords(&clocks, reception_time); } }