int repl = 0;
unsigned int maj_ver, min_ver;
int prev_state;
+ int msg_done = 0;
if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
co_skip(sc_oc(sc), co_data(sc_oc(sc)));
applet_wont_consume(appctx);
goto out;
}
+
+ /* check if we've already hit the rx limit (i.e. we've
+ * already gone through send_msgs and we don't want to
+ * process input messages again). We must absolutely
+ * leave via send_msgs otherwise we can leave the
+ * connection in a stuck state if acks are missing for
+ * example.
+ */
+ if (msg_done >= peers_max_updates_at_once) {
+ applet_have_more_data(appctx); // make sure to come back here
+ goto send_msgs;
+ }
+
applet_will_consume(appctx);
/* local peer is assigned of a lesson, start it */
/* skip consumed message */
co_skip(sc_oc(sc), totl);
+
+ /* make sure we don't process too many at once */
+ if (msg_done >= peers_max_updates_at_once)
+ goto send_msgs;
+ msg_done++;
+
/* loop on that state to peek next message */
goto switchstate;