]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
lib/tsocket: add tstream_monitor_send/recv infrastructure
authorStefan Metzmacher <metze@samba.org>
Thu, 15 May 2025 10:01:12 +0000 (12:01 +0200)
committerStefan Metzmacher <metze@samba.org>
Wed, 18 Jun 2025 17:52:37 +0000 (17:52 +0000)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
lib/tsocket/tsocket.c
lib/tsocket/tsocket.h
lib/tsocket/tsocket_internal.h

index 92d8e9272637c4874df5af16b9991f9311f7aed5..68fdb6c3b407080bd6d61e334ef388f06d9e4c86 100644 (file)
@@ -446,8 +446,11 @@ struct tstream_context {
        struct tevent_req *readv_req;
        struct tevent_req *writev_req;
        struct tevent_req *disconnect_req;
+       struct tevent_req *monitor_req;
 };
 
+static void tstream_monitor_disconnect(struct tstream_context *stream, int err);
+
 static int tstream_context_destructor(struct tstream_context *stream)
 {
        if (stream->readv_req) {
@@ -462,6 +465,10 @@ static int tstream_context_destructor(struct tstream_context *stream)
                tevent_req_received(stream->disconnect_req);
        }
 
+       if (stream->monitor_req != NULL) {
+               tevent_req_received(stream->monitor_req);
+       }
+
        return 0;
 }
 
@@ -485,6 +492,7 @@ struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx,
        stream->readv_req       = NULL;
        stream->writev_req      = NULL;
        stream->disconnect_req  = NULL;
+       stream->monitor_req     = NULL;
 
        state = talloc_size(stream, psize);
        if (state == NULL) {
@@ -508,7 +516,16 @@ void *_tstream_context_data(struct tstream_context *stream)
 
 ssize_t tstream_pending_bytes(struct tstream_context *stream)
 {
-       return stream->ops->pending_bytes(stream);
+       ssize_t ret;
+
+       ret = stream->ops->pending_bytes(stream);
+       if (ret == -1) {
+               int sys_errno = errno;
+               tstream_monitor_disconnect(stream, sys_errno);
+               errno = sys_errno;
+       }
+
+       return ret;
 }
 
 struct tstream_readv_state {
@@ -610,6 +627,7 @@ static void tstream_readv_done(struct tevent_req *subreq)
        ret = state->ops->readv_recv(subreq, &sys_errno);
        TALLOC_FREE(subreq);
        if (ret == -1) {
+               tstream_monitor_disconnect(state->stream, sys_errno);
                tevent_req_error(req, sys_errno);
                return;
        }
@@ -732,6 +750,7 @@ static void tstream_writev_done(struct tevent_req *subreq)
 
        ret = state->ops->writev_recv(subreq, &sys_errno);
        if (ret == -1) {
+               tstream_monitor_disconnect(state->stream, sys_errno);
                tevent_req_error(req, sys_errno);
                return;
        }
@@ -832,10 +851,13 @@ static void tstream_disconnect_done(struct tevent_req *subreq)
 
        ret = state->ops->disconnect_recv(subreq, &sys_errno);
        if (ret == -1) {
+               tstream_monitor_disconnect(state->stream, sys_errno);
                tevent_req_error(req, sys_errno);
                return;
        }
 
+       tstream_monitor_disconnect(state->stream, ECONNABORTED);
+
        tevent_req_done(req);
 }
 
@@ -850,3 +872,144 @@ int tstream_disconnect_recv(struct tevent_req *req,
        return ret;
 }
 
+struct tstream_monitor_state {
+       const struct tstream_context_ops *ops;
+       struct tstream_context *stream;
+       struct tevent_req *subreq;
+       int ret;
+};
+
+static void tstream_monitor_cleanup(struct tevent_req *req,
+                                   enum tevent_req_state req_state)
+{
+       struct tstream_monitor_state *state =
+               tevent_req_data(req,
+               struct tstream_monitor_state);
+
+       TALLOC_FREE(state->subreq);
+
+       if (state->stream != NULL) {
+               state->stream->monitor_req = NULL;
+               state->stream = NULL;
+       }
+}
+
+static void tstream_monitor_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct tstream_context *stream)
+{
+       struct tevent_req *req = NULL;
+       struct tstream_monitor_state *state = NULL;
+       struct tevent_req *subreq = NULL;
+       ssize_t pending;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct tstream_monitor_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->ops = stream->ops;
+       state->stream = stream;
+       state->ret = -1;
+
+       if (stream->disconnect_req != NULL) {
+               tevent_req_error(req, ECONNABORTED);
+               return tevent_req_post(req, ev);
+       }
+
+       if (stream->monitor_req != NULL) {
+               tevent_req_error(req, EALREADY);
+               return tevent_req_post(req, ev);
+       }
+       stream->monitor_req = req;
+
+       tevent_req_set_cleanup_fn(req, tstream_monitor_cleanup);
+       tevent_req_defer_callback(req, ev);
+
+       if (state->ops->monitor_send != NULL) {
+               subreq = state->ops->monitor_send(state,
+                                                 ev,
+                                                 stream);
+               if (tevent_req_nomem(subreq, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               tevent_req_set_callback(subreq, tstream_monitor_done, req);
+
+               state->subreq = subreq;
+               return req;
+       }
+
+       pending = stream->ops->pending_bytes(stream);
+       if (pending < 0) {
+               tevent_req_error(req, errno);
+               return tevent_req_post(req, ev);
+       }
+
+       /*
+        * just remain forever pending until
+        * tstream_monitor_disconnect() is triggered
+        * in any way.
+        */
+       return req;
+}
+
+static void tstream_monitor_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req =
+               tevent_req_callback_data(subreq,
+               struct tevent_req);
+       struct tstream_monitor_state *state =
+               tevent_req_data(req,
+               struct tstream_monitor_state);
+       ssize_t ret;
+       int sys_errno;
+
+       state->subreq = NULL;
+
+       ret = state->ops->monitor_recv(subreq, &sys_errno);
+       TALLOC_FREE(subreq);
+       if (ret == -1) {
+               tevent_req_error(req, sys_errno);
+               return;
+       }
+
+       state->ret = ret;
+
+       tevent_req_done(req);
+}
+
+static void tstream_monitor_disconnect(struct tstream_context *stream, int err)
+{
+       if (stream == NULL) {
+               return;
+       }
+
+       if (stream->monitor_req == NULL) {
+               return;
+       }
+
+       /*
+        * tevent_req_defer_callback() was used
+        * so it's safe to call
+        */
+       tevent_req_error(stream->monitor_req, err);
+}
+
+int tstream_monitor_recv(struct tevent_req *req, int *perrno)
+{
+       struct tstream_monitor_state *state =
+               tevent_req_data(req,
+               struct tstream_monitor_state);
+       int ret;
+
+       ret = tsocket_simple_int_recv(req, perrno);
+       if (ret == 0) {
+               ret = state->ret;
+       }
+
+       tevent_req_received(req);
+       return ret;
+}
index 07e10c84a8a91c77aa43db273da67b98095d630b..deae754c28e995f3d7cf1339109300866129eb1d 100644 (file)
@@ -472,6 +472,40 @@ struct tevent_req *tstream_disconnect_send(TALLOC_CTX *mem_ctx,
 int tstream_disconnect_recv(struct tevent_req *req,
                            int *perrno);
 
+/**
+ * @brief Monitor the state of the stream
+ *
+ * This waits forever until a connection error happens.
+ *
+ * @param[in]  mem_ctx   The talloc memory context to use.
+ *
+ * @param[in]  ev        The tevent_context to run on.
+ *
+ * @param[in]  stream    The tstream context to work on.
+ *
+ * @return               A 'tevent_req' handle, where the caller can register
+ *                       a callback with tevent_req_set_callback(). NULL on
+ *                       fatal error.
+ */
+struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx,
+                                       struct tevent_context *ev,
+                                       struct tstream_context *stream);
+
+/**
+ * @brief Get the result of a tstream_monitor_send().
+ *
+ * The caller can only have one outstanding tstream_monitor_send()
+ * at a time otherwise the caller will get *perrno = EBUSY.
+ *
+ * @param[in]  req      The tevent request from tstream_readv_send().
+ *
+ * @param[out] perrno   The error number.
+ *
+ * @return              -1 with perrno set to the actual errno.
+ *                      (0 is never returned!).
+ */
+int tstream_monitor_recv(struct tevent_req *req, int *perrno);
+
 /**
  * @}
  */
index 154b2ce6f890c66b0cdc6643f1bdabfdd5447dbe..d334c99992ce85b498d2339187cc43febf36ad35 100644 (file)
@@ -122,6 +122,14 @@ struct tstream_context_ops {
                                              struct tstream_context *stream);
        int (*disconnect_recv)(struct tevent_req *req,
                               int *perrno);
+
+       /*
+        * Optional
+        */
+       struct tevent_req *(*monitor_send)(TALLOC_CTX *mem_ctx,
+                                          struct tevent_context *ev,
+                                          struct tstream_context *stream);
+       int (*monitor_recv)(struct tevent_req *req, int *perrno);
 };
 
 struct tstream_context *_tstream_context_create(TALLOC_CTX *mem_ctx,