struct smbdirect_send_io *request =
container_of(wc->wr_cqe, struct smbdirect_send_io, cqe);
struct smbdirect_socket *sc = request->socket;
+ struct smbdirect_send_io *sibling, *next;
int lcredits = 0;
log_rdma_send(INFO, "smbdirect_send_io 0x%p completed wc->status=%s\n",
request, ib_wc_status_msg(wc->status));
+ /*
+ * Free possible siblings and then the main send_io
+ */
+ list_for_each_entry_safe(sibling, next, &request->sibling_list, sibling_list) {
+ list_del_init(&sibling->sibling_list);
+ smbd_free_send_io(sibling);
+ lcredits += 1;
+ }
/* Note this frees wc->wr_cqe, but not wc */
smbd_free_send_io(request);
lcredits += 1;
/* Post the send request */
static int smbd_post_send(struct smbdirect_socket *sc,
- struct smbdirect_send_io *request)
+ struct smbdirect_send_batch *batch,
+ struct smbdirect_send_io *request)
{
int i;
}
request->cqe.done = send_done;
-
request->wr.next = NULL;
- request->wr.wr_cqe = &request->cqe;
request->wr.sg_list = request->sge;
request->wr.num_sge = request->num_sge;
request->wr.opcode = IB_WR_SEND;
+
+ if (batch) {
+ request->wr.wr_cqe = NULL;
+ request->wr.send_flags = 0;
+ if (!list_empty(&batch->msg_list)) {
+ struct smbdirect_send_io *last;
+
+ last = list_last_entry(&batch->msg_list,
+ struct smbdirect_send_io,
+ sibling_list);
+ last->wr.next = &request->wr;
+ }
+ list_add_tail(&request->sibling_list, &batch->msg_list);
+ batch->wr_cnt++;
+ return 0;
+ }
+
+ request->wr.wr_cqe = &request->cqe;
request->wr.send_flags = IB_SEND_SIGNALED;
return smbd_ib_post_send(sc, &request->wr);
}
+static void smbd_send_batch_init(struct smbdirect_send_batch *batch,
+ bool need_invalidate_rkey,
+ unsigned int remote_key)
+{
+ INIT_LIST_HEAD(&batch->msg_list);
+ batch->wr_cnt = 0;
+ batch->need_invalidate_rkey = need_invalidate_rkey;
+ batch->remote_key = remote_key;
+}
+
+static int smbd_send_batch_flush(struct smbdirect_socket *sc,
+ struct smbdirect_send_batch *batch,
+ bool is_last)
+{
+ struct smbdirect_send_io *first, *last;
+ int ret = 0;
+
+ if (list_empty(&batch->msg_list))
+ return 0;
+
+ first = list_first_entry(&batch->msg_list,
+ struct smbdirect_send_io,
+ sibling_list);
+ last = list_last_entry(&batch->msg_list,
+ struct smbdirect_send_io,
+ sibling_list);
+
+ if (batch->need_invalidate_rkey) {
+ first->wr.opcode = IB_WR_SEND_WITH_INV;
+ first->wr.ex.invalidate_rkey = batch->remote_key;
+ batch->need_invalidate_rkey = false;
+ batch->remote_key = 0;
+ }
+
+ last->wr.send_flags = IB_SEND_SIGNALED;
+ last->wr.wr_cqe = &last->cqe;
+
+ /*
+ * Remove last from batch->msg_list
+ * and splice the rest of batch->msg_list
+ * to last->sibling_list.
+ *
+ * batch->msg_list is a valid empty list
+ * at the end.
+ */
+ list_del_init(&last->sibling_list);
+ list_splice_tail_init(&batch->msg_list, &last->sibling_list);
+ batch->wr_cnt = 0;
+
+ ret = smbd_ib_post_send(sc, &first->wr);
+ if (ret) {
+ struct smbdirect_send_io *sibling, *next;
+
+ list_for_each_entry_safe(sibling, next, &last->sibling_list, sibling_list) {
+ list_del_init(&sibling->sibling_list);
+ smbd_free_send_io(sibling);
+ }
+ smbd_free_send_io(last);
+ }
+
+ return ret;
+}
+
static int wait_for_credits(struct smbdirect_socket *sc,
wait_queue_head_t *waitq, atomic_t *total_credits,
int needed)
} while (true);
}
-static int wait_for_send_lcredit(struct smbdirect_socket *sc)
+static int wait_for_send_lcredit(struct smbdirect_socket *sc,
+ struct smbdirect_send_batch *batch)
{
+ if (batch && (atomic_read(&sc->send_io.lcredits.count) <= 1)) {
+ int ret;
+
+ ret = smbd_send_batch_flush(sc, batch, false);
+ if (ret)
+ return ret;
+ }
+
return wait_for_credits(sc,
&sc->send_io.lcredits.wait_queue,
&sc->send_io.lcredits.count,
1);
}
-static int wait_for_send_credits(struct smbdirect_socket *sc)
+static int wait_for_send_credits(struct smbdirect_socket *sc,
+ struct smbdirect_send_batch *batch)
{
+ if (batch &&
+ (batch->wr_cnt >= 16 || atomic_read(&sc->send_io.credits.count) <= 1)) {
+ int ret;
+
+ ret = smbd_send_batch_flush(sc, batch, false);
+ if (ret)
+ return ret;
+ }
+
return wait_for_credits(sc,
&sc->send_io.credits.wait_queue,
&sc->send_io.credits.count,
}
static int smbd_post_send_iter(struct smbdirect_socket *sc,
+ struct smbdirect_send_batch *batch,
struct iov_iter *iter,
int *_remaining_data_length)
{
struct smbdirect_data_transfer *packet;
int new_credits = 0;
- rc = wait_for_send_lcredit(sc);
+ rc = wait_for_send_lcredit(sc, batch);
if (rc) {
log_outgoing(ERR, "disconnected not sending on wait_lcredit\n");
rc = -EAGAIN;
goto err_wait_lcredit;
}
- rc = wait_for_send_credits(sc);
+ rc = wait_for_send_credits(sc, batch);
if (rc) {
log_outgoing(ERR, "disconnected not sending on wait_credit\n");
rc = -EAGAIN;
le32_to_cpu(packet->data_length),
le32_to_cpu(packet->remaining_data_length));
- rc = smbd_post_send(sc, request);
+ rc = smbd_post_send(sc, batch, request);
if (!rc)
return 0;
int remaining_data_length = 0;
sc->statistics.send_empty++;
- return smbd_post_send_iter(sc, NULL, &remaining_data_length);
+ return smbd_post_send_iter(sc, NULL, NULL, &remaining_data_length);
}
static int smbd_post_send_full_iter(struct smbdirect_socket *sc,
+ struct smbdirect_send_batch *batch,
struct iov_iter *iter,
int *_remaining_data_length)
{
*/
while (iov_iter_count(iter) > 0) {
- rc = smbd_post_send_iter(sc, iter, _remaining_data_length);
+ rc = smbd_post_send_iter(sc, batch, iter, _remaining_data_length);
if (rc < 0)
break;
}
struct smbdirect_socket_parameters *sp = &sc->parameters;
struct smb_rqst *rqst;
struct iov_iter iter;
+ struct smbdirect_send_batch batch;
unsigned int remaining_data_length, klen;
int rc, i, rqst_idx;
+ int error = 0;
if (sc->status != SMBDIRECT_SOCKET_CONNECTED)
return -EAGAIN;
num_rqst, remaining_data_length);
rqst_idx = 0;
+ smbd_send_batch_init(&batch, false, 0);
do {
rqst = &rqst_array[rqst_idx];
klen += rqst->rq_iov[i].iov_len;
iov_iter_kvec(&iter, ITER_SOURCE, rqst->rq_iov, rqst->rq_nvec, klen);
- rc = smbd_post_send_full_iter(sc, &iter, &remaining_data_length);
- if (rc < 0)
+ rc = smbd_post_send_full_iter(sc, &batch, &iter, &remaining_data_length);
+ if (rc < 0) {
+ error = rc;
break;
+ }
if (iov_iter_count(&rqst->rq_iter) > 0) {
/* And then the data pages if there are any */
- rc = smbd_post_send_full_iter(sc, &rqst->rq_iter,
+ rc = smbd_post_send_full_iter(sc, &batch, &rqst->rq_iter,
&remaining_data_length);
- if (rc < 0)
+ if (rc < 0) {
+ error = rc;
break;
+ }
}
} while (++rqst_idx < num_rqst);
+ rc = smbd_send_batch_flush(sc, &batch, true);
+ if (unlikely(!rc && error))
+ rc = error;
+
/*
* As an optimization, we don't wait for individual I/O to finish
* before sending the next one.