struct tevent_req *readv_req;
struct tevent_req *writev_req;
struct tevent_req *disconnect_req;
+ struct tevent_req *monitor_req;
};
+static void tstream_monitor_disconnect(struct tstream_context *stream, int err);
+
static int tstream_context_destructor(struct tstream_context *stream)
{
if (stream->readv_req) {
tevent_req_received(stream->disconnect_req);
}
+ if (stream->monitor_req != NULL) {
+ tevent_req_received(stream->monitor_req);
+ }
+
return 0;
}
stream->readv_req = NULL;
stream->writev_req = NULL;
stream->disconnect_req = NULL;
+ stream->monitor_req = NULL;
state = talloc_size(stream, psize);
if (state == NULL) {
ssize_t tstream_pending_bytes(struct tstream_context *stream)
{
- return stream->ops->pending_bytes(stream);
+ ssize_t ret;
+
+ ret = stream->ops->pending_bytes(stream);
+ if (ret == -1) {
+ int sys_errno = errno;
+ tstream_monitor_disconnect(stream, sys_errno);
+ errno = sys_errno;
+ }
+
+ return ret;
}
struct tstream_readv_state {
ret = state->ops->readv_recv(subreq, &sys_errno);
TALLOC_FREE(subreq);
if (ret == -1) {
+ tstream_monitor_disconnect(state->stream, sys_errno);
tevent_req_error(req, sys_errno);
return;
}
ret = state->ops->writev_recv(subreq, &sys_errno);
if (ret == -1) {
+ tstream_monitor_disconnect(state->stream, sys_errno);
tevent_req_error(req, sys_errno);
return;
}
ret = state->ops->disconnect_recv(subreq, &sys_errno);
if (ret == -1) {
+ tstream_monitor_disconnect(state->stream, sys_errno);
tevent_req_error(req, sys_errno);
return;
}
+ tstream_monitor_disconnect(state->stream, ECONNABORTED);
+
tevent_req_done(req);
}
return ret;
}
+struct tstream_monitor_state {
+ const struct tstream_context_ops *ops;
+ struct tstream_context *stream;
+ struct tevent_req *subreq;
+ int ret;
+};
+
+static void tstream_monitor_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_monitor_state *state =
+ tevent_req_data(req,
+ struct tstream_monitor_state);
+
+ TALLOC_FREE(state->subreq);
+
+ if (state->stream != NULL) {
+ state->stream->monitor_req = NULL;
+ state->stream = NULL;
+ }
+}
+
+static void tstream_monitor_done(struct tevent_req *subreq);
+
+struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream)
+{
+ struct tevent_req *req = NULL;
+ struct tstream_monitor_state *state = NULL;
+ struct tevent_req *subreq = NULL;
+ ssize_t pending;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_monitor_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->ops = stream->ops;
+ state->stream = stream;
+ state->ret = -1;
+
+ if (stream->disconnect_req != NULL) {
+ tevent_req_error(req, ECONNABORTED);
+ return tevent_req_post(req, ev);
+ }
+
+ if (stream->monitor_req != NULL) {
+ tevent_req_error(req, EALREADY);
+ return tevent_req_post(req, ev);
+ }
+ stream->monitor_req = req;
+
+ tevent_req_set_cleanup_fn(req, tstream_monitor_cleanup);
+ tevent_req_defer_callback(req, ev);
+
+ if (state->ops->monitor_send != NULL) {
+ subreq = state->ops->monitor_send(state,
+ ev,
+ stream);
+ if (tevent_req_nomem(subreq, req)) {
+ return tevent_req_post(req, ev);
+ }
+ tevent_req_set_callback(subreq, tstream_monitor_done, req);
+
+ state->subreq = subreq;
+ return req;
+ }
+
+ pending = stream->ops->pending_bytes(stream);
+ if (pending < 0) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
+
+ /*
+ * just remain forever pending until
+ * tstream_monitor_disconnect() is triggered
+ * in any way.
+ */
+ return req;
+}
+
+static void tstream_monitor_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req =
+ tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tstream_monitor_state *state =
+ tevent_req_data(req,
+ struct tstream_monitor_state);
+ ssize_t ret;
+ int sys_errno;
+
+ state->subreq = NULL;
+
+ ret = state->ops->monitor_recv(subreq, &sys_errno);
+ TALLOC_FREE(subreq);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
+ }
+
+ state->ret = ret;
+
+ tevent_req_done(req);
+}
+
+static void tstream_monitor_disconnect(struct tstream_context *stream, int err)
+{
+ if (stream == NULL) {
+ return;
+ }
+
+ if (stream->monitor_req == NULL) {
+ return;
+ }
+
+ /*
+ * tevent_req_defer_callback() was used
+ * so it's safe to call
+ */
+ tevent_req_error(stream->monitor_req, err);
+}
+
+int tstream_monitor_recv(struct tevent_req *req, int *perrno)
+{
+ struct tstream_monitor_state *state =
+ tevent_req_data(req,
+ struct tstream_monitor_state);
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
+ }
+
+ tevent_req_received(req);
+ return ret;
+}
int tstream_disconnect_recv(struct tevent_req *req,
int *perrno);
+/**
+ * @brief Monitor the state of the stream
+ *
+ * This waits forever until a connection error happens.
+ *
+ * @param[in] mem_ctx The talloc memory context to use.
+ *
+ * @param[in] ev The tevent_context to run on.
+ *
+ * @param[in] stream The tstream context to work on.
+ *
+ * @return A 'tevent_req' handle, where the caller can register
+ * a callback with tevent_req_set_callback(). NULL on
+ * fatal error.
+ */
+struct tevent_req *tstream_monitor_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream);
+
+/**
+ * @brief Get the result of a tstream_monitor_send().
+ *
+ * The caller can only have one outstanding tstream_monitor_send()
+ * at a time otherwise the caller will get *perrno = EBUSY.
+ *
+ * @param[in] req The tevent request from tstream_readv_send().
+ *
+ * @param[out] perrno The error number.
+ *
+ * @return -1 with perrno set to the actual errno.
+ * (0 is never returned!).
+ */
+int tstream_monitor_recv(struct tevent_req *req, int *perrno);
+
/**
* @}
*/