};
struct openmetrics_request {
+ struct ostream *output;
+
enum openmetrics_request_state state;
struct stats_metrics_iter *stats_iter;
const struct metric *metric;
openmetrics_export_metric_value(req, out, req->metric, timestamp);
}
+static int
+openmetrics_send_buffer(struct openmetrics_request *req, buffer_t *buffer)
+{
+ ssize_t sent;
+
+ if (buffer->used == 0)
+ return 1;
+
+ sent = o_stream_send(req->output, buffer->data, buffer->used);
+ if (sent < 0)
+ return -1;
+
+ /* Max buffer size is enormous */
+ i_assert((size_t)sent == buffer->used);
+
+ if (o_stream_get_buffer_used_size(req->output) >= IO_BLOCK_SIZE)
+ return 0;
+ return 1;
+}
+
static void openmetrics_export_next(struct openmetrics_request *req)
{
/* Determine what to export next. */
}
}
+static void openmetrics_handle_write_error(struct openmetrics_request *req)
+{
+ i_info("openmetrics: write(%s) failed: %s",
+ o_stream_get_name(req->output),
+ o_stream_get_error(req->output));
+ o_stream_destroy(&req->output);
+}
+
static void openmetrics_request_deinit(struct openmetrics_request *req)
{
stats_metrics_iterate_deinit(&req->stats_iter);
array_free(&req->sub_metric_stack);
}
-static void
-openmetrics_export(struct openmetrics_request *req,
- struct http_server_response *resp)
+static int openmetrics_export(struct openmetrics_request *req)
{
- string_t *out = t_str_new(2048);
int64_t timestamp;
+ string_t *out;
+ int ret;
+ ret = o_stream_flush(req->output);
+ if (ret < 0) {
+ openmetrics_handle_write_error(req);
+ return -1;
+ }
+ if (ret == 0) {
+ /* Output stream buffer needs to be flushed further */
+ return 0;
+ }
+
+ if (req->state == OPENMETRICS_REQUEST_STATE_FINISHED) {
+ /* All metrics were exported already, so we can finish the
+ HTTP request now. */
+ o_stream_destroy(&req->output);
+ return 1;
+ }
+
+ /* Record timestamp for metrics export */
i_assert(ioloop_timeval.tv_usec < 1000000);
timestamp = ((int64_t)ioloop_timeval.tv_sec * 1000 +
(int64_t)ioloop_timeval.tv_usec / 1000);
- while (req->state != OPENMETRICS_REQUEST_STATE_FINISHED)
+ /* Export metrics into a string buffer and write that buffer to the
+ output stream after each (sub-)metric, so that the string buffer
+ stays small. The output stream buffer can grow bigger, but writing is
+ stopped for later resumption when the output stream buffer has grown
+ beyond an optimal size. */
+ out = t_str_new(1024);
+ for (;;) {
+ str_truncate(out, 0);
+
openmetrics_export_continue(req, out, timestamp);
+ ret = openmetrics_send_buffer(req, out);
+ if (ret < 0) {
+ openmetrics_handle_write_error(req);
+ return -1;
+ }
+ if (req->state == OPENMETRICS_REQUEST_STATE_FINISHED) {
+ /* Finished export of metrics, but the output stream
+ buffer may still contain data. */
+ break;
+ }
+ if (ret == 0) {
+ /* Output stream buffer is filled up beyond the optimal
+ size; wait until we can write more. */
+ return ret;
+ }
+ }
+
+ /* Cleanup everything except the output stream */
openmetrics_request_deinit(req);
- http_server_response_set_payload_data(
- resp, str_data(out), str_len(out));
+ /* Finished; flush output */
+ ret = o_stream_finish(req->output);
+ if (ret < 0) {
+ openmetrics_handle_write_error(req);
+ return -1;
+ }
+ return ret;
+}
+
+static void openmetrics_request_destroy(struct openmetrics_request *req)
+{
+ o_stream_destroy(&req->output);
+ openmetrics_request_deinit(req);
}
static void
{
const struct http_request *hreq = http_server_request_get(hsreq);
struct http_server_response *hsresp;
- struct openmetrics_request req;
+ struct openmetrics_request *req;
+ pool_t pool;
if (strcmp(hreq->method, "OPTIONS") == 0) {
hsresp = http_server_response_create(hsreq, 200, "OK");
return;
}
+ pool = http_server_request_get_pool(hsreq);
+ req = p_new(pool, struct openmetrics_request, 1);
+
+ http_server_request_set_destroy_callback(
+ hsreq, openmetrics_request_destroy, req);
+
hsresp = http_server_response_create(hsreq, 200, "OK");
http_server_response_add_header(
hsresp, "Content-Type",
"text/plain; version="OPENMETRICS_CONTENT_VERSION"; "
"charset=utf-8");
- i_zero(&req);
- openmetrics_export(&req, hsresp);
+ req->output = http_server_response_get_payload_output(
+ hsresp, SIZE_MAX, FALSE);
- http_server_response_submit(hsresp);
+ o_stream_set_flush_callback(req->output, openmetrics_export, req);
+ o_stream_set_flush_pending(req->output, TRUE);
}
void stats_service_openmetrics_init(void)