}
+/* The pcli_* functions are used for the CLI proxy in the master */
+
static enum obj_type *pcli_pid_to_server(int proc_pid)
{
struct mworker_proc *child;
return -1;
}
+/* Parse the CLI request:
+ *
+ * - it can rewrite the buffer by trimming the prefix
+ * - fill dst with the destination server if there is one
+ *
+ * Return:
+ * - the amount of data to forward or
+ * - -1 if there is no end to the command or
+ * - 0 everything has been trimmed (only a prefix)
+ */
+#define PCLI_REQ_INIT 0
+#define PCLI_REQ_PFX 1
+#define PCLI_REQ_TRIM 2
+#define PCLI_REQ_CMD 3
+
+int pcli_parse_request(struct channel *req, int *target_pid)
+{
+ char *input = (char *)ci_head(req);
+ const char *end;
+ char *ptr, *trim = NULL, *pfx_b = NULL, *cmd_b = NULL;
+ struct buffer *buf = &req->buf;
+ int ret = 0;
+ int state = PCLI_REQ_INIT;
+
+ ptr = input;
+ end = b_stop(buf);
+
+ /* The while loop condition is checking the end of the command.
+ It is needed to iterate for each ptr++ done in the parser */
+ while (ptr < end && *ptr != '\n' && *ptr != '\r' && *ptr != ';') {
+ switch (state) {
+ /* The init state only trims the useless chars */
+ case PCLI_REQ_INIT:
+
+ /* skip every spaces at the start of the command */
+ if (*ptr == ' ') {
+ ptr++;
+ continue;
+ }
+ pfx_b = ptr; /* this is the start of the command or of the @ prefix */
+ state = PCLI_REQ_PFX;
+
+ /* the atprefix state looks for a @ prefix. If it finds
+ it, it will check to which server send the request.
+ It also ajust the trim pointer */
+ case PCLI_REQ_PFX:
+
+ if (*pfx_b != '@') {
+ /* there is no prefix */
+ pfx_b = NULL;
+ cmd_b = ptr;
+ state = PCLI_REQ_CMD;
+ continue;
+ }
+
+ if (*ptr != ' ') {
+ ptr++;
+ continue;
+ }
+ *ptr = '\0'; /* this the end of the prefix */
+ ptr++;
+ trim = ptr;
+ state = PCLI_REQ_TRIM;
+ break;
+
+ /* we really need to trim there because that's the only
+ way to know if we are going to send a command or if
+ there is only a prefix */
+ case PCLI_REQ_TRIM:
+ if (*ptr == ' ') {
+ ptr++;
+ continue;
+ }
+ cmd_b = trim = ptr;
+ state = PCLI_REQ_CMD;
+
+ /* just look for the end of the command */
+ case PCLI_REQ_CMD:
+ ptr++;
+ continue;
+ }
+ }
+
+ /* we didn't find a command separator, not enough data */
+ if (ptr >= end)
+ return -1;
+
+ if (!pfx_b && !cmd_b) {
+ /* probably just a \n or a ; */
+ return 1;
+ } else if (pfx_b && !cmd_b) {
+ /* it's only a prefix, we don't want to forward it */
+ *ptr = '\0';
+ trim = ptr + 1; /* we want to trim the whole command */
+ ret = 0;
+ } else if (cmd_b) {
+ /* command without a prefix */
+ *ptr = '\n';
+ ret = ptr - cmd_b + 1;
+ }
+
+ if (pfx_b)
+ *target_pid = pcli_prefix_to_pid(pfx_b);
+
+ /* trim the useless chars */
+ if (trim)
+ b_del(&req->buf, trim - input);
+
+ return ret;
+}
+
+int pcli_wait_for_request(struct stream *s, struct channel *req, int an_bit)
+{
+ int target_pid;
+ int to_forward;
+
+ target_pid = s->pcli_next_pid;
+
+read_again:
+ /* if the channel is closed for read, we won't receive any more data
+ from the client, but we don't want to forward this close to the
+ server */
+ channel_dont_close(req);
+
+ /* We don't know yet to which server we will connect */
+ channel_dont_connect(req);
+
+
+ /* we are not waiting for a response, there is no more request and we
+ * receive a close from the client, we can leave */
+ if (!(ci_data(req)) && req->flags & CF_SHUTR) {
+ channel_shutw_now(&s->res);
+ s->req.analysers &= ~AN_REQ_WAIT_CLI;
+ return 1;
+ }
+
+ req->flags |= CF_READ_DONTWAIT;
+
+ /* need more data */
+ if (!ci_data(req))
+ return 0;
+
+ /* If there is data available for analysis, log the end of the idle time. */
+ if (c_data(req) && s->logs.t_idle == -1)
+ s->logs.t_idle = tv_ms_elapsed(&s->logs.tv_accept, &now) - s->logs.t_handshake;
+
+ to_forward = pcli_parse_request(req, &target_pid);
+ if (to_forward > 0) {
+ /* enough data */
+
+ /* we didn't find the process, send an error and close */
+ if (target_pid < 0) {
+ pcli_reply_and_close(s, "Can't find the target CLI!\n");
+ return 0;
+ }
+
+ /* forward only 1 command */
+ channel_forward(req, to_forward);
+ /* we send only 1 command per request, and we write close after it */
+ channel_shutw_now(req);
+
+ /* remove the XFER_DATA analysers, which forwards all
+ * the data, we don't want to forward the next requests
+ * We need to add CF_FLT_ANALYZE to abort the forward too.
+ */
+ req->analysers &= ~(AN_REQ_FLT_XFER_DATA|AN_REQ_WAIT_CLI);
+ req->analysers |= AN_REQ_FLT_END|CF_FLT_ANALYZE;
+ s->res.analysers |= AN_RES_WAIT_CLI;
+
+ /* we can connect now */
+ s->target = pcli_pid_to_server(target_pid);
+ if (!s->target) {
+ s->target = &cli_applet.obj_type;
+ }
+
+ s->flags |= (SF_DIRECT | SF_ASSIGNED);
+ channel_auto_connect(req);
+
+ } else if (to_forward == 0) {
+ /* we only received a prefix without command, which
+ mean that we want to store it for every other
+ command for this session */
+ if (target_pid > -1) {
+ s->pcli_next_pid = target_pid;
+ // TODO: pcli_reply the prompt
+ } else {
+ // TODO: pcli_reply() error
+ s->pcli_next_pid = 0;
+ }
+
+ /* we trimmed things but we might have other commands to consume */
+ goto read_again;
+ } else if (to_forward == -1 && channel_full(req, global.tune.maxrewrite)) {
+ /* buffer is full and we didn't catch the end of a command */
+ goto send_help;
+ }
+
+ return 0;
+
+send_help:
+ b_reset(&req->buf);
+ b_putblk(&req->buf, "help\n", 5);
+ goto read_again;
+}
+
+int pcli_wait_for_response(struct stream *s, struct channel *rep, int an_bit)
+{
+ struct proxy *fe = strm_fe(s);
+ struct proxy *be = s->be;
+
+ rep->flags |= CF_READ_DONTWAIT; /* try to get back here ASAP */
+ rep->flags |= CF_NEVER_WAIT;
+
+ /* don't forward the close */
+ channel_dont_close(&s->res);
+ channel_dont_close(&s->req);
+
+ /* forward the data */
+ if (ci_data(rep)) {
+ c_adv(rep, ci_data(rep));
+ return 0;
+ }
+
+ if ((rep->flags & (CF_SHUTR|CF_READ_NULL))) {
+ /* stream cleanup */
+
+ s->si[1].flags |= SI_FL_NOLINGER | SI_FL_NOHALF;
+ si_shutr(&s->si[1]);
+ si_shutw(&s->si[1]);
+
+ /*
+ * starting from there this the same code as
+ * http_end_txn_clean_session().
+ *
+ * It allows to do frontend keepalive while reconnecting to a
+ * new server for each request.
+ */
+
+ if (s->flags & SF_BE_ASSIGNED) {
+ HA_ATOMIC_SUB(&be->beconn, 1);
+ if (unlikely(s->srv_conn))
+ sess_change_server(s, NULL);
+ }
+
+ s->logs.t_close = tv_ms_elapsed(&s->logs.tv_accept, &now);
+ stream_process_counters(s);
+
+ /* don't count other requests' data */
+ s->logs.bytes_in -= ci_data(&s->req);
+ s->logs.bytes_out -= ci_data(&s->res);
+
+ /* we may need to know the position in the queue */
+ pendconn_free(s);
+
+ /* let's do a final log if we need it */
+ if (!LIST_ISEMPTY(&fe->logformat) && s->logs.logwait &&
+ !(s->flags & SF_MONITOR) &&
+ (!(fe->options & PR_O_NULLNOLOG) || s->req.total)) {
+ s->do_log(s);
+ }
+
+ /* stop tracking content-based counters */
+ stream_stop_content_counters(s);
+ stream_update_time_stats(s);
+
+ s->logs.accept_date = date; /* user-visible date for logging */
+ s->logs.tv_accept = now; /* corrected date for internal use */
+ s->logs.t_handshake = 0; /* There are no handshake in keep alive connection. */
+ s->logs.t_idle = -1;
+ tv_zero(&s->logs.tv_request);
+ s->logs.t_queue = -1;
+ s->logs.t_connect = -1;
+ s->logs.t_data = -1;
+ s->logs.t_close = 0;
+ s->logs.prx_queue_pos = 0; /* we get the number of pending conns before us */
+ s->logs.srv_queue_pos = 0; /* we will get this number soon */
+
+ s->logs.bytes_in = s->req.total = ci_data(&s->req);
+ s->logs.bytes_out = s->res.total = ci_data(&s->res);
+
+ stream_del_srv_conn(s);
+ if (objt_server(s->target)) {
+ if (s->flags & SF_CURR_SESS) {
+ s->flags &= ~SF_CURR_SESS;
+ HA_ATOMIC_SUB(&objt_server(s->target)->cur_sess, 1);
+ }
+ if (may_dequeue_tasks(objt_server(s->target), be))
+ process_srv_queue(objt_server(s->target));
+ }
+
+ s->target = NULL;
+
+ /* only release our endpoint if we don't intend to reuse the
+ * connection.
+ */
+ if (!si_conn_ready(&s->si[1])) {
+ si_release_endpoint(&s->si[1]);
+ s->srv_conn = NULL;
+ }
+
+ s->si[1].state = s->si[1].prev_state = SI_ST_INI;
+ s->si[1].err_type = SI_ET_NONE;
+ s->si[1].conn_retries = 0; /* used for logging too */
+ s->si[1].exp = TICK_ETERNITY;
+ s->si[1].flags &= SI_FL_ISBACK | SI_FL_DONT_WAKE; /* we're in the context of process_stream */
+ s->req.flags &= ~(CF_SHUTW|CF_SHUTW_NOW|CF_AUTO_CONNECT|CF_WRITE_ERROR|CF_STREAMER|CF_STREAMER_FAST|CF_NEVER_WAIT|CF_WAKE_CONNECT|CF_WROTE_DATA);
+ s->res.flags &= ~(CF_SHUTR|CF_SHUTR_NOW|CF_READ_ATTACHED|CF_READ_ERROR|CF_READ_NOEXP|CF_STREAMER|CF_STREAMER_FAST|CF_WRITE_PARTIAL|CF_NEVER_WAIT|CF_WROTE_DATA);
+ s->flags &= ~(SF_DIRECT|SF_ASSIGNED|SF_ADDR_SET|SF_BE_ASSIGNED|SF_FORCE_PRST|SF_IGNORE_PRST);
+ s->flags &= ~(SF_CURR_SESS|SF_REDIRECTABLE|SF_SRV_REUSED);
+ s->flags &= ~(SF_ERR_MASK|SF_FINST_MASK|SF_REDISP);
+ /* reinitialise the current rule list pointer to NULL. We are sure that
+ * any rulelist match the NULL pointer.
+ */
+ s->current_rule_list = NULL;
+
+ s->be = strm_fe(s);
+ s->logs.logwait = strm_fe(s)->to_log;
+ s->logs.level = 0;
+ stream_del_srv_conn(s);
+ s->target = NULL;
+ /* re-init store persistence */
+ s->store_count = 0;
+ s->uniq_id = global.req_count++;
+
+ s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */
+
+ s->req.flags |= CF_WAKE_ONCE; /* need to be called again if there is some command left in the request */
+
+ s->req.analysers |= AN_REQ_WAIT_CLI;
+ s->res.analysers &= ~AN_RES_WAIT_CLI;
+
+ /* We must trim any excess data from the response buffer, because we
+ * may have blocked an invalid response from a server that we don't
+ * want to accidentely forward once we disable the analysers, nor do
+ * we want those data to come along with next response. A typical
+ * example of such data would be from a buggy server responding to
+ * a HEAD with some data, or sending more than the advertised
+ * content-length.
+ */
+ if (unlikely(ci_data(&s->res)))
+ b_set_data(&s->res.buf, co_data(&s->res));
+
+ /* Now we can realign the response buffer */
+ c_realign_if_empty(&s->res);
+
+ s->req.rto = strm_fe(s)->timeout.client;
+ s->req.wto = TICK_ETERNITY;
+
+ s->res.rto = TICK_ETERNITY;
+ s->res.wto = strm_fe(s)->timeout.client;
+
+ s->req.rex = TICK_ETERNITY;
+ s->req.wex = TICK_ETERNITY;
+ s->req.analyse_exp = TICK_ETERNITY;
+ s->res.rex = TICK_ETERNITY;
+ s->res.wex = TICK_ETERNITY;
+ s->res.analyse_exp = TICK_ETERNITY;
+ s->si[1].hcto = TICK_ETERNITY;
+
+ /* we're removing the analysers, we MUST re-enable events detection.
+ * We don't enable close on the response channel since it's either
+ * already closed, or in keep-alive with an idle connection handler.
+ */
+ channel_auto_read(&s->req);
+ channel_auto_close(&s->req);
+ channel_auto_read(&s->res);
+
+
+ return 1;
+ }
+ return 0;
+}
+
/*
* The mworker functions are used to initialize the CLI in the master process
*/
mworker_proxy->next = proxies_list;
proxies_list = mworker_proxy;
mworker_proxy->id = strdup("MASTER");
- mworker_proxy->mode = PR_MODE_TCP;
+ mworker_proxy->mode = PR_MODE_CLI;
mworker_proxy->state = PR_STNEW;
mworker_proxy->last_change = now.tv_sec;
mworker_proxy->cap = PR_CAP_LISTEN; /* this is a listen section */
/* no port specified */
newsrv->flags |= SRV_F_MAPPORTS;
newsrv->addr = *sk;
- newsrv->iweight = 1;
- newsrv->uweight = 1;
- mworker_proxy->srv_act++;
+ /* don't let the server participate to load balancing */
+ newsrv->iweight = 0;
+ newsrv->uweight = 0;
srv_lb_commit_status(newsrv);
child->srv = newsrv;