return collecty_queue_submit(self->queue, source, object, value);
}
+int collecty_daemon_flush_source(
+ collecty_daemon* self, collecty_source* source, const char* object) {
+ return collecty_queue_flush_source(self->queue, source, object);
+}
+
static int collecty_daemon_bus_version(sd_bus* bus, const char* path, const char* interface,
const char* property, sd_bus_message* reply, void* data, sd_bus_error* error) {
return sd_bus_message_append(reply, "s", PACKAGE_VERSION);
STAILQ_HEAD(queue, collecty_queue_object) queue;
};
+static int collecty_queue_object_equals(
+ collecty_queue* self, struct collecty_queue_object* o, const char* object) {
+ // Objects match if they are both NULL
+ if (!o->object && !object)
+ return 1;
+
+ // If both have objects, we need to compare
+ if (o->object && object)
+ return collecty_string_equals(o->object, object);
+
+ // Otherwise we have no match
+ return 0;
+}
+
static int collecty_queue_heartbeat(sd_event_source* source, uint64_t usec, void* data) {
collecty_queue* self = data;
int r;
if (o->source != source)
continue;
- // If both objects are NULL we have a match
- if (!o->object && !object)
+ // Check if the object matches
+ if (collecty_queue_object_equals(self, o, object))
return o;
-
- // If both have objects, we need to compare
- else if (o->object && object)
- if (collecty_string_equals(o->object, object))
- return o;
}
// No match found
return r;
}
+static int collecty_queue_flush_object(collecty_queue* self, struct collecty_queue_object* o) {
+ int r;
+
+ // Call the source to write its data
+ r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
+ collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
+ }
+
+ // Remove the object from the queue
+ STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+
+ // Free the object
+ collecty_queue_free_object(o);
+
+ return 0;
+}
+
int collecty_queue_flush(collecty_queue* self) {
struct collecty_queue_object* o = NULL;
int r;
if (!o)
break;
- // Call the source to write its data
- r = collecty_source_commit(o->source, o->object, o->num_samples, (const char**)o->samples);
- if (r < 0) {
- ERROR(self->ctx, "Failed to write samples for %s(%s): %s\n",
- collecty_source_name(o->source), (o->object) ? o->object : NULL, strerror(-r));
- }
+ // Flush the object
+ r = collecty_queue_flush_object(self, o);
+ if (r < 0)
+ return r;
+ }
- // Remove the object from the queue
- STAILQ_REMOVE(&self->queue, o, collecty_queue_object, nodes);
+ return 0;
+}
- // Free the object
- collecty_queue_free_object(o);
+int collecty_queue_flush_source(collecty_queue* self,
+ collecty_source* source, const char* object) {
+ struct collecty_queue_object* o = NULL;
+
+ STAILQ_FOREACH(o, &self->queue, nodes) {
+ // Continue if the source does not match
+ if (o->source != source)
+ continue;
+
+ // Continue if the object does not match
+ if (!collecty_queue_object_equals(self, o, object))
+ continue;
+
+ // Flush the object
+ return collecty_queue_flush_object(self, o);
}
return 0;