]> git.ipfire.org Git - thirdparty/bird.git/commitdiff
Lib: Publish/subscribe queues
authorOndrej Zajicek <santiago@crfreenet.org>
Tue, 16 Sep 2025 13:34:44 +0000 (15:34 +0200)
committerMaria Matejka <mq@ucw.cz>
Wed, 13 May 2026 10:31:04 +0000 (12:31 +0200)
Implement a publish/subscribe messaging system with dynamic topic
management and resource tracking. The system allows multiple publishers
to send messages to named topics, which are then distributed to all
subscribers of those topics. Publishers and subscribers are managed as
resources and and automatically cleaned up when destroyed.

lib/Doc
lib/Makefile
lib/pubsub.c [new file with mode: 0644]
lib/pubsub.h [new file with mode: 0644]

diff --git a/lib/Doc b/lib/Doc
index 3877f3a38c05e551f09de1458a433d11fea4fc1a..238d08487d85171e1c946c4b7750b18346d30660 100644 (file)
--- a/lib/Doc
+++ b/lib/Doc
@@ -9,4 +9,5 @@ S resource.c
 S mempool.c
 S slab.c
 S event.c
+S pubsub.c
 S ../sysdep/unix/io.c
index 21aad3784b22a20431a2bc9ca93c25f7dbadadc7..216425659966f1961031acddba35e5a8580eb56b 100644 (file)
@@ -1,4 +1,4 @@
-src := bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c evpn.c flowspec.c idm.c ip.c lists.c mac.c md5.c mempool.c net.c patmatch.c printf.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c
+src := bitmap.c bitops.c blake2s.c blake2b.c checksum.c event.c evpn.c flowspec.c idm.c ip.c lists.c mac.c md5.c mempool.c net.c patmatch.c printf.c pubsub.c resource.c sha1.c sha256.c sha512.c slab.c slists.c strtoul.c tbf.c timer.c xmalloc.c
 obj := $(src-o-files)
 $(all-daemon)
 
diff --git a/lib/pubsub.c b/lib/pubsub.c
new file mode 100644 (file)
index 0000000..7adf5af
--- /dev/null
@@ -0,0 +1,257 @@
+/*
+ *     BIRD Library -- Publish/Subscribe Queue
+ *
+ *     (c) 2025 Ondrej Zajicek <santiago@crfreenet.org>
+ *     (c) 2025 CZ.NIC z.s.p.o.
+ *
+ *     Can be freely distributed and used under the terms of the GNU GPL.
+ */
+
+/**
+ * DOC: Publish/Subscribe Queues
+ *
+ * BIRD implements a publish/subscribe messaging system with dynamic topic
+ * management and resource tracking. The system allows multiple publishers to
+ * send messages to named topics, which are then distributed to all subscribers
+ * of those topics.
+ *
+ * The system is built around four main components: queues, topics, publishers,
+ * and subscribers. A &ps_queue serves as the central coordination point,
+ * maintaining list of topics. Topics are created dynamically when first
+ * referenced and can have multiple publishers and subscribers attached.
+ *
+ * Publishers and subscribers are implemented as managed resources. Each
+ * publisher or subscriber can be attached to only one topic. When publishers or
+ * subscribers are destroyed, they automatically detach from their associated
+ * topics.
+ *
+ * The ps_init_queue() function initializes a new message queue with a given
+ * name and memory pool. Topics are created on-demand through ps_get_topic().
+ * Publishers attach to topics using ps_attach() and can send messages via
+ * ps_publish(), which sends notification to all subscribers. Subscribers use
+ * ps_subscribe() to register for topic updates. When a subscriber joins
+ * a topic with attached publishers, these publishers are notified of the new
+ * subscription through their subscribe hooks.
+ */
+
+#undef LOCAL_DEBUG
+
+#include "nest/bird.h"
+
+#include "lib/event.h"
+#include "lib/resource.h"
+#include "lib/pubsub.h"
+
+static void ps_publisher_free(resource *r);
+static void ps_publisher_dump(struct dump_request *dreq, resource *r);
+static void ps_subscriber_free(resource *r);
+static void ps_subscriber_dump(struct dump_request *dreq, resource *r);
+static void ps_event_loop(void *ptr);
+
+
+static struct resclass ps_publisher_class = {
+  .name = "Publisher",
+  .size = sizeof(ps_publisher),
+  .free = ps_publisher_free,
+  .dump = ps_publisher_dump,
+};
+
+static struct resclass ps_subscriber_class = {
+  .name = "Subscriber",
+  .size = sizeof(ps_subscriber),
+  .free = ps_subscriber_free,
+  .dump = ps_subscriber_dump,
+};
+
+
+void
+ps_init_queue(ps_queue *q, pool *p, const char *name)
+{
+  q->name = name;
+  q->pool = rp_new(p, name);
+  q->event = ev_new_init(q->pool, ps_event_loop, q);
+  init_list(&q->topics);
+  init_list(&q->topics_pending);
+}
+
+ps_topic *
+ps_get_topic(ps_queue *q, const char *name)
+{
+  ps_topic *t;
+
+  WALK_LIST(t, q->topics)
+    if (!strcmp(t->name, name))
+      return t;
+
+  WALK_LIST(t, q->topics_pending)
+    if (!strcmp(t->name, name))
+      return t;
+
+  t = mb_allocz(q->pool, sizeof(struct ps_topic));
+  strncpy(t->name, name, sizeof(t->name)-1);
+
+  init_list(&t->publishers);
+  init_list(&t->subscribers);
+  add_tail(&q->topics, &t->n);
+
+  DBG("%s: New topic '%s', total %u\n",
+      q->name, t->name, list_length(&q->topics) + list_length(&q->topics_pending));
+
+  return t;
+}
+
+ps_publisher *
+ps_publisher_new(pool *p, void (*subscribe_hook)(struct ps_publisher *), void *data)
+{
+  ps_publisher *pub = ralloc(p, &ps_publisher_class);
+  pub->subscribe_hook = subscribe_hook;
+  pub->data = data;
+  return pub;
+}
+
+static void
+ps_publisher_free(resource *r)
+{
+  ps_publisher *pub = (void *) r;
+
+  if (pub->topic)
+    ps_detach(pub);
+}
+
+static void
+ps_publisher_dump(struct dump_request *dreq, resource *r)
+{
+  ps_publisher *pub = (void *) r;
+
+  RDUMP("(queue %p, topic '%s')", pub->queue, pub->topic ? pub->topic->name : "NULL");
+  RDUMP("(subscribe_hook %p, data %p)", pub->subscribe_hook, pub->data);
+}
+
+void
+ps_attach(ps_publisher *pub, ps_queue *q, ps_topic *t)
+{
+  ASSERT(!pub->queue && !pub->topic);
+
+  pub->queue = q;
+  pub->topic = t;
+  add_tail(&t->publishers, &pub->n);
+
+  DBG("%s: Publisher %p added to topic '%s', total %u\n",
+      q->name, pub, t->name, list_length(&t->publishers));
+}
+
+void
+ps_detach(ps_publisher *pub)
+{
+  ASSERT(pub->queue && pub->topic);
+  ps_queue *q UNUSED = pub->queue;
+  ps_topic *t UNUSED = pub->topic;
+
+  pub->queue = NULL;
+  pub->topic = NULL;
+  rem_node(&pub->n);
+
+  DBG("%s: Publisher %p removed from topic '%s', total %u\n",
+      q->name, pub, t->name, list_length(&t->publishers));
+}
+
+void
+ps_publish(ps_publisher *pub, void *msg, uint length)
+{
+  ASSERT(pub->queue && pub->topic);
+  ps_topic *t = pub->topic;
+
+  DBG("%s: Message from publisher %p on topic '%s', notifying %u subscribers\n",
+      pub->queue->name, pub, t->name, list_length(&t->subscribers));
+
+  /* Ping subscribers */
+  ps_subscriber *sub; node *n;
+  WALK_LIST2(sub, n, t->subscribers, n)
+    sub->notify_hook(sub, msg, length);
+}
+
+
+ps_subscriber *
+ps_subscriber_new(pool *p, void (*notify_hook)(struct ps_subscriber *, void *, uint), void *data)
+{
+  ps_subscriber *sub = ralloc(p, &ps_subscriber_class);
+  sub->notify_hook = notify_hook;
+  sub->data = data;
+  return sub;
+}
+
+static void
+ps_subscriber_free(resource *r)
+{
+  ps_subscriber *sub = (void *) r;
+
+  if (sub->topic)
+    ps_unsubscribe(sub);
+}
+
+static void
+ps_subscriber_dump(struct dump_request *dreq, resource *r)
+{
+  ps_subscriber *sub = (void *) r;
+
+  RDUMP("(queue %p, topic '%s')", sub->queue, sub->topic ? sub->topic->name : "NULL");
+  RDUMP("(notify_hook %p, data %p)", sub->notify_hook, sub->data);
+}
+
+void
+ps_subscribe(ps_subscriber *sub, ps_queue *q, ps_topic *t)
+{
+  ASSERT(!sub->queue && !sub->topic);
+
+  sub->queue = q;
+  sub->topic = t;
+  add_tail(&t->subscribers, &sub->n);
+
+  DBG("%s: Subscriber %p added to topic '%s', total %u\n",
+      q->name, sub, t->name, list_length(&t->subscribers));
+
+  if (EMPTY_LIST(t->publishers))
+    return;
+
+  /* Ping publishers */
+  rem_node(&t->n);
+  add_tail(&q->topics_pending, &t->n);
+
+  if (!ev_active(q->event))
+    ev_schedule(q->event);
+}
+
+void
+ps_unsubscribe(ps_subscriber *sub)
+{
+  ASSERT(sub->queue && sub->topic);
+  ps_queue *q UNUSED = sub->queue;
+  ps_topic *t UNUSED = sub->topic;
+
+  sub->queue = NULL;
+  sub->topic = NULL;
+  rem_node(&sub->n);
+
+  DBG("%s: Subscriber %p removed from topic '%s', total %u\n",
+      q->name, sub, t->name, list_length(&t->subscribers));
+}
+
+static void
+ps_event_loop(void *ptr)
+{
+  ps_queue *q = ptr;
+
+  ps_topic *t;
+  WALK_LIST_FIRST(t, q->topics_pending)
+  {
+    DBG("%s: Subscription change on topic '%s', notifying %u publishers\n",
+       q->name, t->name, list_length(&t->publishers));
+
+    rem_node(&t->n);
+    add_tail(&q->topics, &t->n);
+
+    struct ps_publisher *pub; node *n;
+    WALK_LIST2(pub, n, t->publishers, n)
+      CALL(pub->subscribe_hook, pub);
+  }
+}
diff --git a/lib/pubsub.h b/lib/pubsub.h
new file mode 100644 (file)
index 0000000..d5be1e0
--- /dev/null
@@ -0,0 +1,73 @@
+/*
+ *     BIRD Library -- Publish/Subscribe Queue
+ *
+ *     (c) 2025 Ondrej Zajicek <santiago@crfreenet.org>
+ *     (c) 2025 CZ.NIC z.s.p.o.
+ *
+ *     Can be freely distributed and used under the terms of the GNU GPL.
+ */
+
+#ifndef _BIRD_PUBSUB_H_
+#define _BIRD_PUBSUB_H_
+
+#include "lib/event.h"
+#include "lib/lists.h"
+#include "lib/resource.h"
+
+typedef struct ps_queue
+{
+  const char *name;
+  pool *pool;
+  event *event;
+  list topics;
+  list topics_pending;
+} ps_queue;
+
+typedef struct ps_topic
+{
+  node n;
+  char name[16];
+  list publishers;
+  list subscribers;
+} ps_topic;
+
+typedef struct ps_publisher
+{
+  resource r;
+  node n;
+  void (*subscribe_hook)(struct ps_publisher *);
+  void *data;
+  struct ps_queue *queue;
+  struct ps_topic *topic;
+} ps_publisher;
+
+typedef struct ps_subscriber
+{
+  resource r;
+  node n;
+  void (*notify_hook)(struct ps_subscriber *, void *, uint);
+  void *data;
+  struct ps_queue *queue;
+  struct ps_topic *topic;
+} ps_subscriber;
+
+
+void ps_init_queue(ps_queue *q, pool *p, const char *name);
+ps_topic *ps_get_topic(ps_queue *q, const char *name);
+
+ps_publisher *ps_publisher_new(pool *p, void (*subscribe_hook)(struct ps_publisher *), void *data);
+void ps_attach(ps_publisher *pub, ps_queue *q, ps_topic *t);
+void ps_detach(ps_publisher *pub);
+void ps_publish(ps_publisher *pub, void *msg, uint length);
+
+static inline void ps_attach_topic(ps_publisher *pub, ps_queue *q, const char *name)
+{ ps_attach(pub, q, ps_get_topic(q, name)); }
+
+ps_subscriber * ps_subscriber_new(pool *p, void (*notify_hook)(struct ps_subscriber *, void *, uint), void *data);
+void ps_subscribe(ps_subscriber *sub, ps_queue *q, ps_topic *t);
+void ps_unsubscribe(ps_subscriber *sub);
+
+static inline void ps_subscribe_topic(ps_subscriber *sub, ps_queue *q, const char *name)
+{ ps_subscribe(sub, q, ps_get_topic(q, name)); }
+
+#endif