]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: stream: move the conn_stream specific calls to the stream-int
authorWilly Tarreau <w@1wt.eu>
Sat, 17 Nov 2018 18:51:07 +0000 (19:51 +0100)
committerWilly Tarreau <w@1wt.eu>
Sat, 17 Nov 2018 18:53:45 +0000 (19:53 +0100)
There are still some unwelcome synchronous calls to si_cs_recv() in
process_stream(). Let's have a new function si_sync_recv() to perform
a synchronous receive call on a stream interface regardless of the type
of its endpoint, and move these calls there. For now it only implements
conn_streams since it doesn't seem useful to support applets there. The
function implements an extra check for the stream interface to be in an
established state before attempting anything.

include/proto/stream_interface.h
src/stream.c

index 7fc6d7e21d125b63239b2d70d0952900164b3ffb..79cba856271b9fdeec88c195a5ed9564a6be9f53 100644 (file)
@@ -381,6 +381,33 @@ static inline void si_chk_rcv(struct stream_interface *si)
        si->ops->chk_rcv(si);
 }
 
+/* This tries to perform a synchronous receive on the stream interface to
+ * try to collect last arrived data. In practice it's only implemented on
+ * conn_streams. Returns 0 if nothing was done, non-zero if new data or a
+ * shutdown were collected. This may result on some delayed receive calls
+ * to be programmed and performed later, though it doesn't provide any
+ * such guarantee.
+ */
+static inline int si_sync_recv(struct stream_interface *si)
+{
+       struct conn_stream *cs;
+
+       if (si->state != SI_ST_EST)
+               return 0;
+
+       cs = objt_cs(si->end);
+       if (!cs)
+               return 0; // only conn_streams are supported
+
+       if (si->wait_event.wait_reason & SUB_CAN_RECV)
+               return 0; // already subscribed
+
+       if (si->flags & SI_FL_WAIT_ROOM && c_size(si_ic(si)))
+               return 0; // already failed
+
+       return si_cs_recv(cs);
+}
+
 /* Calls chk_snd on the connection using the data layer */
 static inline void si_chk_snd(struct stream_interface *si)
 {
index 6e73f8c710044c8ba849a98b27abe21e6ca64206..e9270bdacc01209fd97d98e8103254bbf90c2800 100644 (file)
@@ -1685,7 +1685,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
        unsigned int req_ana_back;
        struct channel *req, *res;
        struct stream_interface *si_f, *si_b;
-       struct conn_stream *cs;
 
        activity[tid].stream++;
 
@@ -1696,15 +1695,9 @@ struct task *process_stream(struct task *t, void *context, unsigned short state)
        si_b = &s->si[1];
 
        /* First, attempt to receive pending data from I/O layers */
-       cs = objt_cs(si_f->end);
-       if (cs && !(si_f->wait_event.wait_reason & SUB_CAN_RECV) &&
-           (!(si_f->flags & SI_FL_WAIT_ROOM) || !c_size(req)))
-               si_cs_recv(cs);
-
-       cs = objt_cs(si_b->end);
-       if (cs && !(si_b->wait_event.wait_reason & SUB_CAN_RECV) &&
-           (!(si_b->flags & SI_FL_WAIT_ROOM) || !c_size(res)))
-               si_cs_recv(cs);
+       si_sync_recv(si_f);
+       si_sync_recv(si_b);
+
 redo:
 
        //DPRINTF(stderr, "%s:%d: cs=%d ss=%d(%d) rqf=0x%08x rpf=0x%08x\n", __FUNCTION__, __LINE__,