]> git.ipfire.org Git - thirdparty/collectd.git/commitdiff
add margalit implementation
authorSvetlana Shmidt <sshmidt@google.com>
Thu, 13 Aug 2020 10:00:01 +0000 (15:00 +0500)
committerGitHub <noreply@github.com>
Thu, 13 Aug 2020 10:00:01 +0000 (15:00 +0500)
distbench/margalit.c

index 406082c22c8fa96f5769cba1f2032029f5c3f794..790afa1d6c20e01c3dca091cf735c9d751624b6e 100644 (file)
@@ -1,3 +1,314 @@
-int counter = ~0; // Just an example
+/**
+ * collectd - src/daemon/distribution.c
+ * Copyright (C) 2019-2020  Google LLC
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a
+ * copy of this software and associated documentation files (the "Software"),
+ * to deal in the Software without restriction, including without limitation
+ * the rights to use, copy, modify, merge, publish, distribute, sublicense,
+ * and/or sell copies of the Software, and to permit persons to whom the
+ * Software is furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+ * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+ * DEALINGS IN THE SOFTWARE.
+ *
+ * Author:
+ *   Elene Margalitadze <elene.margalit at gmail.com>
+ **/
 
-int my_function() { return --counter; }
+#include "collectd.h"
+#include "distribution.h"
+#include <math.h>
+#include <pthread.h>
+
+typedef struct {
+  uint64_t bucket_counter;
+  double min_boundary;
+  double max_boundary;
+} bucket_t;
+
+struct distribution_s {
+  bucket_t *buckets;
+  size_t num_buckets;
+  uint64_t total_scalar_count; // count of all registered scalar metrics
+  double raw_data_sum;         // sum of all registered raw scalar metrics
+  pthread_mutex_t mutex;
+};
+
+/*Private bucket constructor, min_boundary is inclusive, max_boundary is
+exclusive because the max_boundary Infinity is exclusive and we want the other
+max_boundaries to be consistent with that.*/
+static bucket_t initialize_bucket(double min_boundary, double max_boundary) {
+  bucket_t new_bucket = {
+      .bucket_counter = 0,
+      .min_boundary = min_boundary,
+      .max_boundary = max_boundary,
+  };
+  return new_bucket;
+}
+
+distribution_t *distribution_new_linear(size_t num_buckets, double size) {
+  if ((num_buckets == 0) || (size <= 0)) {
+    errno = EINVAL;
+    return NULL;
+  }
+
+  distribution_t *new_distribution = calloc(1, sizeof(distribution_t));
+  bucket_t *buckets = calloc(num_buckets, sizeof(bucket_t));
+
+  if ((new_distribution == NULL) || (buckets == NULL)) {
+    free(new_distribution);
+    free(buckets);
+    return NULL;
+  }
+  new_distribution->buckets = buckets;
+
+  for (size_t i = 0; i < num_buckets; i++) {
+    if (i < num_buckets - 1) {
+      new_distribution->buckets[i] =
+          initialize_bucket(i * size, i * size + size);
+    } else {
+      new_distribution->buckets[i] = initialize_bucket(i * size, INFINITY);
+    }
+    double min_boundary = i * size;
+    double max_boundary = (i == num_buckets - 1) ? INFINITY : i * size + size;
+    new_distribution->buckets[i] = initialize_bucket(min_boundary, max_boundary);
+  }
+
+  new_distribution->num_buckets = num_buckets;
+  new_distribution->total_scalar_count = 0;
+  new_distribution->raw_data_sum = 0;
+  pthread_mutex_init(&new_distribution->mutex, NULL);
+  return new_distribution;
+}
+
+distribution_t *distribution_new_exponential(size_t num_buckets, double factor,
+                                             double base) {
+  if ((num_buckets == 0) || (factor <= 0) || (base <= 1)) {
+    errno = EINVAL;
+    return NULL;
+  }
+
+  distribution_t *new_distribution = calloc(1, sizeof(distribution_t));
+  bucket_t *buckets = calloc(num_buckets, sizeof(bucket_t));
+
+  if ((new_distribution == NULL) || (buckets == NULL)) {
+    free(new_distribution);
+    free(buckets);
+    return NULL;
+  }
+  new_distribution->buckets = buckets;
+
+  for (size_t i = 0; i < num_buckets; i++) {
+    double min_boundary = (i == 0) ? 0 : new_distribution->buckets[i - 1].max_boundary;
+    double max_boundary = (i == num_buckets - 1) ? INFINITY : factor * pow(base, i);
+    new_distribution->buckets[i] = initialize_bucket(min_boundary, max_boundary);
+  }
+
+  new_distribution->num_buckets = num_buckets;
+  new_distribution->total_scalar_count = 0;
+  new_distribution->raw_data_sum = 0;
+  pthread_mutex_init(&new_distribution->mutex, NULL);
+  return new_distribution;
+}
+
+distribution_t *distribution_new_custom(size_t num_bounds,
+                                        double *custom_max_boundaries) {
+
+  if ((num_bounds == 0) || (custom_max_boundaries == NULL)) {
+    errno = EINVAL;
+    return NULL;
+  }
+
+  for (size_t i = 1; i < num_bounds; i++) {
+    if ((custom_max_boundaries[i] <= custom_max_boundaries[i - 1]) ||
+        (custom_max_boundaries[i] == INFINITY)) {
+      errno = EINVAL;
+      return NULL;
+    }
+  }
+
+  distribution_t *new_distribution = calloc(1, sizeof(distribution_t));
+  bucket_t *buckets =
+      calloc(num_bounds + 1, sizeof(bucket_t)); //+1 for infinity bucket
+
+  if ((new_distribution == NULL) || (buckets == NULL)) {
+    free(new_distribution);
+    free(buckets);
+    return NULL;
+  }
+  new_distribution->buckets = buckets;
+
+  for (size_t i = 0; i < num_bounds + 1; i++) {
+    double min_boundary = (i == 0) ? 0 : new_distribution->buckets[i - 1].max_boundary;
+    double max_boundary = (i == num_bounds) ? INFINITY : custom_max_boundaries[i];
+    new_distribution->buckets[i] = initialize_bucket(min_boundary, max_boundary);
+  }
+
+  new_distribution->num_buckets =
+      num_bounds + 1; // plus one for infinity bucket
+  new_distribution->total_scalar_count = 0;
+  new_distribution->raw_data_sum = 0;
+  pthread_mutex_init(&new_distribution->mutex, NULL);
+  return new_distribution;
+}
+
+static int find_bucket(distribution_t *dist, size_t left, size_t right,
+                            double gauge) {
+  if (left > right) {
+    return -1;
+  }
+
+  int mid = left + (right - left) / 2;
+  if (gauge >= dist->buckets[mid].min_boundary &&
+      gauge < dist->buckets[mid].max_boundary) {
+    return mid;
+  }
+
+  if (gauge < dist->buckets[mid].min_boundary) {
+    return find_bucket(dist, left, mid - 1, gauge);
+  }
+
+  return find_bucket(dist, mid + 1, right, gauge);
+}
+
+int distribution_update(distribution_t *dist, double gauge) {
+  if ((dist == NULL) || (gauge <= 0)) {
+    return EINVAL;
+  }
+  /*
+  for(size_t i = 0; i < dist->num_buckets; i++) {
+    if(gauge >= dist->buckets[i].min_boundary && gauge <
+  dist->buckets[i].max_boundary) { dist->buckets[i].bucket_counter++;
+    }
+  }
+  */
+  size_t left = 0;
+  //pthread_mutex_lock(&dist->mutex);
+  size_t right = dist->num_buckets - 1;
+  int index = find_bucket(dist, left, right, gauge);
+
+  dist->buckets[index].bucket_counter++;
+  dist->total_scalar_count++;
+  dist->raw_data_sum += gauge;
+  //pthread_mutex_lock(&dist->mutex);
+  return 0;
+}
+
+double distribution_average(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return NAN;
+  }
+
+  pthread_mutex_lock(&dist->mutex);
+
+  if (dist->total_scalar_count == 0) {
+    return NAN;
+  }
+  double average = dist->raw_data_sum / (double) dist->total_scalar_count;
+  pthread_mutex_unlock(&dist->mutex);
+  return average;
+}
+
+double distribution_percentile(distribution_t *dist, double percent) {
+  if ((percent < 0) || (percent > 100) || (dist == NULL)) {
+    errno = EINVAL;
+    return NAN;
+  }
+  int sum = 0;
+  double bound = 0;
+  pthread_mutex_lock(&dist->mutex);
+  double target_amount = (percent / 100) * (double) dist->total_scalar_count;
+  for (size_t i = 0; i < dist->num_buckets; i++) {
+    sum += dist->buckets[i].bucket_counter;
+    if ((double)sum >= target_amount) {
+      bound = dist->buckets[i].max_boundary;
+      break;
+    }
+  }
+  pthread_mutex_unlock(&dist->mutex);
+  return bound;
+}
+
+distribution_t *distribution_clone(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return NULL;
+  }
+
+  distribution_t *new_distribution = calloc(1, sizeof(distribution_t));
+
+  if (new_distribution == NULL) {
+    free(new_distribution);
+    return NULL;
+  }
+  pthread_mutex_lock(&dist->mutex);
+  new_distribution->buckets = distribution_get_buckets(dist);
+  new_distribution->num_buckets = dist->num_buckets;
+
+  new_distribution->total_scalar_count = dist->total_scalar_count;
+  new_distribution->raw_data_sum = dist->raw_data_sum;
+  pthread_mutex_init(&new_distribution->mutex, NULL);
+  pthread_mutex_unlock(&dist->mutex);
+  return new_distribution;
+}
+
+void distribution_destroy(distribution_t *dist) {
+  if (dist == NULL) {
+    return;
+  }
+  free(dist->buckets);
+  free(dist);
+}
+
+bucket_t *distribution_get_buckets(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return NULL;
+  }
+
+  bucket_t *buckets = calloc(dist->num_buckets, sizeof(bucket_t));
+
+  if (buckets == NULL) {
+    free(buckets);
+    return NULL;
+  }
+  pthread_mutex_lock(&dist->mutex);
+  memcpy(buckets, dist->buckets, sizeof(bucket_t) * dist->num_buckets);
+  pthread_mutex_unlock(&dist->mutex);
+  return buckets;
+}
+
+int distribution_get_num_buckets(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return -1;
+  }
+
+  return dist->num_buckets;
+}
+
+int distribution_get_total_scalar_count(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return -1;
+  }
+  return dist->total_scalar_count;
+}
+
+double distribution_get_raw_data_sum(distribution_t *dist) {
+  if (dist == NULL) {
+    errno = EINVAL;
+    return NAN;
+  }
+  return dist->raw_data_sum;
+}