HA_ATOMIC_INC(b_orig(buf) + ds->ofs);
}
- /* in this loop, ofs always points to the counter byte that precedes
- * the message so that we can take our reference there if we have to
- * stop before the end (ret=0).
+ /* we were already there, adjust the offset to be relative to
+ * the buffer's head and remove us from the counter.
*/
- if (sc_opposite(sc)->state == SC_ST_EST) {
- /* we were already there, adjust the offset to be relative to
- * the buffer's head and remove us from the counter.
- */
- ofs = ds->ofs - b_head_ofs(buf);
- if (ds->ofs < b_head_ofs(buf))
- ofs += b_size(buf);
-
- BUG_ON(ofs >= buf->size);
- HA_ATOMIC_DEC(b_peek(buf, ofs));
-
- ret = 1;
- while (ofs + 1 < b_data(buf)) {
- struct dns_query *query;
- uint16_t original_qid;
- uint16_t new_qid;
-
- cnt = 1;
- len = b_peek_varint(buf, ofs + cnt, &msg_len);
- if (!len)
- break;
- cnt += len;
- BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
-
- /* retrieve available room on output channel */
- available_room = channel_recv_max(sc_ic(sc));
+ ofs = ds->ofs - b_head_ofs(buf);
+ if (ds->ofs < b_head_ofs(buf))
+ ofs += b_size(buf);
- /* tx_msg_offset null means we are at the start of a new message */
- if (!ds->tx_msg_offset) {
- uint16_t slen;
+ BUG_ON(ofs >= buf->size);
+ HA_ATOMIC_DEC(b_peek(buf, ofs));
- /* check if there is enough room to put message len and query id */
- if (available_room < sizeof(slen) + sizeof(new_qid)) {
- sc_need_room(sc);
- ret = 0;
- break;
- }
+ /* in following loop, ofs always points to the counter byte that
+ * precedes the message so that we can take our reference there if we
+ * have to stop before the end (ret=0).
+ */
+ ret = 1;
+ while (ofs + 1 < b_data(buf)) {
+ struct dns_query *query;
+ uint16_t original_qid;
+ uint16_t new_qid;
- /* put msg len into then channel */
- slen = (uint16_t)msg_len;
- slen = htons(slen);
- applet_putblk(appctx, (char *)&slen, sizeof(slen));
- available_room -= sizeof(slen);
-
- /* backup original query id */
- len = b_getblk(buf, (char *)&original_qid, sizeof(original_qid), ofs + cnt);
- if (!len) {
- /* should never happen since messages are atomically
- * written into ring
- */
- ret = 0;
- break;
- }
+ cnt = 1;
+ len = b_peek_varint(buf, ofs + cnt, &msg_len);
+ if (!len)
+ break;
+ cnt += len;
+ BUG_ON(msg_len + ofs + cnt + 1 > b_data(buf));
- /* generates new query id */
- new_qid = ++ds->query_counter;
- new_qid = htons(new_qid);
-
- /* put new query id into the channel */
- applet_putblk(appctx, (char *)&new_qid, sizeof(new_qid));
- available_room -= sizeof(new_qid);
-
- /* keep query id mapping */
-
- query = pool_alloc(dns_query_pool);
- if (query) {
- query->qid.key = new_qid;
- query->original_qid = original_qid;
- query->expire = tick_add(now_ms, 5000);
- LIST_INIT(&query->list);
- if (LIST_ISEMPTY(&ds->queries)) {
- /* enable task to handle expire */
- ds->task_exp->expire = query->expire;
- /* ensure this will be executed by the same
- * thread than ds_session_release
- * to ensure session_release is free
- * to destroy the task */
- task_queue(ds->task_exp);
- }
- LIST_APPEND(&ds->queries, &query->list);
- eb32_insert(&ds->query_ids, &query->qid);
- ds->onfly_queries++;
- }
+ /* retrieve available room on output channel */
+ available_room = channel_recv_max(sc_ic(sc));
- /* update the tx_offset to handle output in 16k streams */
- ds->tx_msg_offset = sizeof(original_qid);
+ /* tx_msg_offset null means we are at the start of a new message */
+ if (!ds->tx_msg_offset) {
+ uint16_t slen;
+ /* check if there is enough room to put message len and query id */
+ if (available_room < sizeof(slen) + sizeof(new_qid)) {
+ sc_need_room(sc);
+ ret = 0;
+ break;
}
- /* check if it remains available room on output chan */
- if (unlikely(!available_room)) {
- sc_need_room(sc);
+ /* put msg len into then channel */
+ slen = (uint16_t)msg_len;
+ slen = htons(slen);
+ applet_putblk(appctx, (char *)&slen, sizeof(slen));
+ available_room -= sizeof(slen);
+
+ /* backup original query id */
+ len = b_getblk(buf, (char *)&original_qid, sizeof(original_qid), ofs + cnt);
+ if (!len) {
+ /* should never happen since messages are atomically
+ * written into ring
+ */
ret = 0;
break;
}
- chunk_reset(&trash);
- if ((msg_len - ds->tx_msg_offset) > available_room) {
- /* remaining msg data is too large to be written in output channel at one time */
+ /* generates new query id */
+ new_qid = ++ds->query_counter;
+ new_qid = htons(new_qid);
+
+ /* put new query id into the channel */
+ applet_putblk(appctx, (char *)&new_qid, sizeof(new_qid));
+ available_room -= sizeof(new_qid);
+
+ /* keep query id mapping */
+
+ query = pool_alloc(dns_query_pool);
+ if (query) {
+ query->qid.key = new_qid;
+ query->original_qid = original_qid;
+ query->expire = tick_add(now_ms, 5000);
+ LIST_INIT(&query->list);
+ if (LIST_ISEMPTY(&ds->queries)) {
+ /* enable task to handle expire */
+ ds->task_exp->expire = query->expire;
+ /* ensure this will be executed by the same
+ * thread than ds_session_release
+ * to ensure session_release is free
+ * to destroy the task */
+ task_queue(ds->task_exp);
+ }
+ LIST_APPEND(&ds->queries, &query->list);
+ eb32_insert(&ds->query_ids, &query->qid);
+ ds->onfly_queries++;
+ }
- len = b_getblk(buf, trash.area, available_room, ofs + cnt + ds->tx_msg_offset);
+ /* update the tx_offset to handle output in 16k streams */
+ ds->tx_msg_offset = sizeof(original_qid);
- /* update offset to complete mesg forwarding later */
- ds->tx_msg_offset += len;
- }
- else {
- /* remaining msg data can be written in output channel at one time */
- len = b_getblk(buf, trash.area, msg_len - ds->tx_msg_offset, ofs + cnt + ds->tx_msg_offset);
+ }
- /* reset tx_msg_offset to mark forward fully processed */
- ds->tx_msg_offset = 0;
- }
- trash.data += len;
+ /* check if it remains available room on output chan */
+ if (unlikely(!available_room)) {
+ sc_need_room(sc);
+ ret = 0;
+ break;
+ }
- if (applet_putchk(appctx, &trash) == -1) {
- /* should never happen since we
- * check available_room is large
- * enough here.
- */
- ret = 0;
- break;
- }
+ chunk_reset(&trash);
+ if ((msg_len - ds->tx_msg_offset) > available_room) {
+ /* remaining msg data is too large to be written in output channel at one time */
- if (ds->tx_msg_offset) {
- /* msg was not fully processed, we must be awake to drain pending data */
+ len = b_getblk(buf, trash.area, available_room, ofs + cnt + ds->tx_msg_offset);
- sc_need_room(sc);
- ret = 0;
- break;
- }
- /* switch to next message */
- ofs += cnt + msg_len;
+ /* update offset to complete mesg forwarding later */
+ ds->tx_msg_offset += len;
+ }
+ else {
+ /* remaining msg data can be written in output channel at one time */
+ len = b_getblk(buf, trash.area, msg_len - ds->tx_msg_offset, ofs + cnt + ds->tx_msg_offset);
+
+ /* reset tx_msg_offset to mark forward fully processed */
+ ds->tx_msg_offset = 0;
}
+ trash.data += len;
- HA_ATOMIC_INC(b_peek(buf, ofs));
- ds->ofs = b_peek_ofs(buf, ofs);
+ if (applet_putchk(appctx, &trash) == -1) {
+ /* should never happen since we
+ * check available_room is large
+ * enough here.
+ */
+ ret = 0;
+ break;
+ }
+
+ if (ds->tx_msg_offset) {
+ /* msg was not fully processed, we must be awake to drain pending data */
+
+ sc_need_room(sc);
+ ret = 0;
+ break;
+ }
+ /* switch to next message */
+ ofs += cnt + msg_len;
}
+
+ HA_ATOMIC_INC(b_peek(buf, ofs));
+ ds->ofs = b_peek_ofs(buf, ofs);
+
HA_RWLOCK_RDUNLOCK(DNS_LOCK, &ring->lock);
if (ret) {