static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
+static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
+ int n, struct ibv_mr **ppmr)
+{
+ void *buf;
+ buf = memalign(pctx->pagesize, n);
+ if (!buf) {
+ sprintf(ibw_lasterr, "couldn't allocate memory\n");
+ return NULL;
+ }
+
+ *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
+ if (!*ppmr) {
+ sprintf(ibw_lasterr, "couldn't allocate mr\n");
+ free(buf);
+ return NULL;
+ }
+
+ return buf;
+}
+
+static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
+{
+ if (*ppmr!=NULL) {
+ ibv_dereg_mr(*ppmr);
+ *ppmr = NULL;
+ }
+ if (*ppbuf) {
+ free(*ppbuf);
+ *ppbuf = NULL;
+ }
+}
static int ibw_init_memory(struct ibw_conn *conn)
{
struct ibv_wc wc;
int rc;
++ struct ibv_cq *ev_cq;
++ void *ev_ctx;
-- rc = ibv_poll_cq(pconn->cq, 1, &wc);
-- if (rc!=1) {
-- sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
++ /* TODO: check whether if it's good to have more channels here... */
++ rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
++ if (rc) {
++ sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
goto error;
}
-- if (wc.status) {
-- sprintf(ibw_lasterr, "cq completion failed status %d\n",
-- wc.status);
++ if (ev_cq != pconn->cq) {
++ sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
++ (unsigned int)ev_cq, (unsigned int)pconn->cq);
++ goto error;
++ }
++ rc = ibv_req_notify_cq(pconn->cq, 0);
++ if (rc) {
++ sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
goto error;
}
-- switch(wc.opcode) {
-- case IBV_WC_SEND:
-- {
-- struct ibw_wr *p;
--
-- DEBUG(10, ("send completion\n"));
-- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->opts.max_send_wr);
-
- assert(wc.wr_id < pctx->qsize);
-- p = pconn->wr_index[wc.wr_id];
- if (p->msg_large) {
- ibw_free_mr(&p->msg_large, &p->mr_large);
- }
-
-- DLIST_REMOVE(pconn->wr_list_used, p);
-- DLIST_ADD(pconn->wr_list_avail, p);
++ while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
++ if (wc.status) {
++ sprintf(ibw_lasterr, "cq completion failed status %d\n",
++ wc.status);
++ goto error;
}
-- break;
--
-- case IBV_WC_RDMA_WRITE:
-- DEBUG(10, ("rdma write completion\n"));
-- break;
-- case IBV_WC_RDMA_READ:
-- DEBUG(10, ("rdma read completion\n"));
-- break;
++ switch(wc.opcode) {
++ case IBV_WC_SEND:
++ DEBUG(10, ("send completion\n"));
++ if (ibw_wc_send(conn, &wc))
++ goto error;
++ break;
-- case IBV_WC_RECV:
-- {
- int recv_index;
- struct ibw_wr *p;
-
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert(wc.wr_id < pctx->qsize);
- p = pconn->wr_index[wc.wr_id];
-
- DLIST_REMOVE(pconn->wr_list_used, p);
- DLIST_ADD(pconn->wr_list_avail, p);
++ case IBV_WC_RDMA_WRITE:
++ DEBUG(10, ("rdma write completion\n"));
++ break;
+
++ case IBV_WC_RDMA_READ:
++ DEBUG(10, ("rdma read completion\n"));
++ break;
+
++ case IBV_WC_RECV:
DEBUG(10, ("recv completion\n"));
- assert(pconn->cm_id->qp->qp_num==wc.qp_num);
- assert((int)wc.wr_id > pctx->opts.max_send_wr);
- recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
- assert(recv_index < pctx->opts.max_recv_wr);
- assert(wc.byte_len <= pctx->opts.recv_bufsize);
-
- /* TODO: take care of fragmented messages !!! */
- pctx->receive_func(conn,
- pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
- wc.byte_len);
- assert(wc.byte_len <= pctx->max_msg_size);
-
- pctx->receive_func(conn, p->msg, wc.byte_len);
-- if (ibw_refill_cq_recv(conn))
++ if (ibw_wc_recv(conn, &wc))
goto error;
-- }
-- break;
++ break;
-- default:
-- sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
++ default:
++ sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
++ goto error;
++ }
++ }
++ if (rc!=0) {
++ sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
goto error;
}
pctx->connstate_func(NULL, conn);
}
++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
++{
++ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
++ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
++ struct ibw_wr *p;
++
++ assert(pconn->cm_id->qp->qp_num==wc->qp_num);
++ assert(wc->wr_id < pctx->opts.max_send_wr);
++
++ p = pconn->wr_index[wc->wr_id];
++ if (p->msg_large) {
++ ibw_free_mr(&p->msg_large, &p->mr_large);
++ }
++
++ DLIST_REMOVE(pconn->wr_list_used, p);
++ DLIST_ADD(pconn->wr_list_avail, p);
++
++ return 0;
++}
++
++static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
++ char **pp, uint32_t add_len, int info)
++{
++ /* allocate more if necessary - it's an "evergrowing" buffer... */
++ if (part->len + add_len > part->bufsize) {
++ if (part->buf==NULL) {
++ assert(part->len==0);
++ part->buf = talloc_size(memctx, add_len);
++ if (part->buf==NULL) {
++ sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
++ add_len, info);
++ return -1;
++ }
++ part->bufsize = add_len;
++ } else {
++ part->buf = talloc_realloc_size(memctx,
++ part->buf, part->len + add_len);
++ if (part->buf==NULL) {
++ sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
++ part->len, add_len, info);
++ return -1;
++ }
++ }
++ part->bufsize = part->len + add_len;
++ }
++
++ /* consume pp */
++ memcpy(part->buf + part->len, *pp, add_len);
++ *pp += add_len;
++ part->len += add_len;
++ part->to_read -= add_len;
++
++ return 0;
++}
++
++static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
++{
++ if (part->bufsize > threshold) {
++ talloc_free(part->buf);
++ part->buf = talloc_size(memctx, threshold);
++ if (part->buf==NULL) {
++ sprintf(ibw_lasterr, "talloc_size failed\n");
++ return -1;
++ }
++ part->bufsize = threshold;
++ }
++ return 0;
++}
++
++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
++{
++ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
++ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
++ int recv_index;
++ char *p;
++ uint32_t remain;
++ struct ibw_part *part;
++
++ assert(pconn->cm_id->qp->qp_num==wc->qp_num);
++ assert((int)wc->wr_id > pctx->opts.max_send_wr);
++ recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
++ assert(recv_index < pctx->opts.max_recv_wr);
++ assert(wc->byte_len <= pctx->opts.recv_bufsize);
++
++ p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
++ part = &pconn->part;
++
++ remain = wc->byte_len;
++ while(remain) {
++ /* here always true: (part->len!=0 && part->to_read!=0) ||
++ (part->len==0 && part->to_read==0) */
++ if (part->len) { /* is there a partial msg to be continued? */
++ int read_len = (part->to_read<=remain) ? part->to_read : remain;
++ if (ibw_append_to_part(pconn, part, &p, read_len, 421))
++ goto error;
++ remain -= read_len;
++
++ if (part->len<=sizeof(uint32_t) && part->to_read==0) {
++ assert(part->len==sizeof(uint32_t));
++ /* set it again now... */
++ part->to_read = *((uint32_t *)(part->buf));
++ if (part->to_read<sizeof(uint32_t)) {
++ sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
++ goto error;
++ }
++ part->to_read -= sizeof(uint32_t); /* it's already read */
++ }
++
++ if (part->to_read==0) {
++ pctx->receive_func(conn, part->buf, part->len);
++ part->len = 0; /* tells not having partial data (any more) */
++ if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
++ goto error;
++ }
++ } else {
++ if (remain>=sizeof(uint32_t)) {
++ uint32_t msglen = *(uint32_t *)p;
++ if (msglen<sizeof(uint32_t)) {
++ sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
++ goto error;
++ }
++
++ /* mostly awaited case: */
++ if (msglen<=remain) {
++ pctx->receive_func(conn, p, msglen);
++ p += msglen;
++ remain -= msglen;
++ } else {
++ part->to_read = msglen;
++ /* part->len is already 0 */
++ if (ibw_append_to_part(pconn, part, &p, remain, 422))
++ goto error;
++ remain = 0; /* to be continued ... */
++ /* part->to_read > 0 here */
++ }
++ } else { /* edge case: */
++ part->to_read = sizeof(uint32_t);
++ /* part->len is already 0 */
++ if (ibw_append_to_part(pconn, part, &p, remain, 423))
++ goto error;
++ remain = 0;
++ /* part->to_read > 0 here */
++ }
++ }
++ } /* <remain> is always decreased at least by 1 */
++
++ if (ibw_refill_cq_recv(conn))
++ goto error;
++
++ return 0;
++
++error:
++ DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
++ conn->state = IBWC_ERROR;
++ return -1;
++}
++
static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
{
int i;
opts->max_send_wr = 256;
opts->max_recv_wr = 1024;
+ opts->avg_send_size = 1024;
+ opts->recv_bufsize = 256;
++ opts->recv_threshold = 1 * 1024 * 1024;
for(i=0; i<nattr; i++) {
name = attr[i].name;
opts->max_send_wr = atoi(value);
else if (strcmp(name, "max_recv_wr")==0)
opts->max_recv_wr = atoi(value);
+ else if (strcmp(name, "avg_send_size")==0)
+ opts->avg_send_size = atoi(value);
+ else if (strcmp(name, "recv_bufsize")==0)
+ opts->recv_bufsize = atoi(value);
++ else if (strcmp(name, "recv_threshold")==0)
++ opts->recv_threshold = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
};
struct ibv_send_wr *bad_wr;
- if (n + sizeof(long)<=pctx->opts.avg_send_size) {
- assert(p->msg==(char *)buf);
- assert(n<=pctx->max_msg_size);
++ if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
+ assert((p->msg + sizeof(long))==(char *)buf);
+ list.lkey = pconn->mr_send->lkey;
+ list.addr = (uintptr_t) p->msg;
+
+ *((uint32_t *)p->msg) = htonl(n);
+ } else {
+ assert((p->msg_large + sizeof(long))==(char *)buf);
+ assert(p->mr_large!=NULL);
+ list.lkey = p->mr_large->lkey;
+ list.addr = (uintptr_t) p->msg_large;
+
+ *((uint32_t *)p->msg_large) = htonl(n);
+ }
return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
}