From: Peter Somogyi Date: Wed, 20 Dec 2006 16:42:58 +0000 (+0100) Subject: Made receiver handle partial packets. X-Git-Tag: tevent-0.9.20~348^2~2986^2~1 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=efd2903e0fa4f52f42b9ad33274d224db70db4b9;p=thirdparty%2Fsamba.git Made receiver handle partial packets. (This used to be ctdb commit 808fd658552e489825fb22453755e225549ebfcc) --- efd2903e0fa4f52f42b9ad33274d224db70db4b9 diff --cc ctdb/ib/ibwrapper.c index c04505bc474,b70b6caad6b..db6e303638b --- a/ctdb/ib/ibwrapper.c +++ b/ctdb/ib/ibwrapper.c @@@ -49,38 -49,7 +49,40 @@@ static char ibw_lasterr[IBW_LASTERR_BUF static void ibw_event_handler_verbs(struct event_context *ev, struct fd_event *fde, uint16_t flags, void *private_data); static int ibw_fill_cq(struct ibw_conn *conn); ++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); ++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); +static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn, + int n, struct ibv_mr **ppmr) +{ + void *buf; + buf = memalign(pctx->pagesize, n); + if (!buf) { + sprintf(ibw_lasterr, "couldn't allocate memory\n"); + return NULL; + } + + *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE); + if (!*ppmr) { + sprintf(ibw_lasterr, "couldn't allocate mr\n"); + free(buf); + return NULL; + } + + return buf; +} + +static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr) +{ + if (*ppmr!=NULL) { + ibv_dereg_mr(*ppmr); + *ppmr = NULL; + } + if (*ppbuf) { + free(*ppbuf); + *ppbuf = NULL; + } +} static int ibw_init_memory(struct ibw_conn *conn) { @@@ -503,67 -489,62 +505,61 @@@ static void ibw_event_handler_verbs(str struct ibv_wc wc; int rc; ++ struct ibv_cq *ev_cq; ++ void *ev_ctx; -- rc = ibv_poll_cq(pconn->cq, 1, &wc); -- if (rc!=1) { -- sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); ++ /* TODO: check whether if it's good to have more channels here... */ ++ rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx); ++ if (rc) { ++ sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc); goto error; } -- if (wc.status) { -- sprintf(ibw_lasterr, "cq completion failed status %d\n", -- wc.status); ++ if (ev_cq != pconn->cq) { ++ sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n", ++ (unsigned int)ev_cq, (unsigned int)pconn->cq); ++ goto error; ++ } ++ rc = ibv_req_notify_cq(pconn->cq, 0); ++ if (rc) { ++ sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc); goto error; } -- switch(wc.opcode) { -- case IBV_WC_SEND: -- { -- struct ibw_wr *p; -- -- DEBUG(10, ("send completion\n")); -- assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert(wc.wr_id < pctx->opts.max_send_wr); - - assert(wc.wr_id < pctx->qsize); -- p = pconn->wr_index[wc.wr_id]; - if (p->msg_large) { - ibw_free_mr(&p->msg_large, &p->mr_large); - } - -- DLIST_REMOVE(pconn->wr_list_used, p); -- DLIST_ADD(pconn->wr_list_avail, p); ++ while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) { ++ if (wc.status) { ++ sprintf(ibw_lasterr, "cq completion failed status %d\n", ++ wc.status); ++ goto error; } -- break; -- -- case IBV_WC_RDMA_WRITE: -- DEBUG(10, ("rdma write completion\n")); -- break; -- case IBV_WC_RDMA_READ: -- DEBUG(10, ("rdma read completion\n")); -- break; ++ switch(wc.opcode) { ++ case IBV_WC_SEND: ++ DEBUG(10, ("send completion\n")); ++ if (ibw_wc_send(conn, &wc)) ++ goto error; ++ break; -- case IBV_WC_RECV: -- { - int recv_index; - struct ibw_wr *p; - - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert(wc.wr_id < pctx->qsize); - p = pconn->wr_index[wc.wr_id]; - - DLIST_REMOVE(pconn->wr_list_used, p); - DLIST_ADD(pconn->wr_list_avail, p); ++ case IBV_WC_RDMA_WRITE: ++ DEBUG(10, ("rdma write completion\n")); ++ break; + ++ case IBV_WC_RDMA_READ: ++ DEBUG(10, ("rdma read completion\n")); ++ break; + ++ case IBV_WC_RECV: DEBUG(10, ("recv completion\n")); - assert(pconn->cm_id->qp->qp_num==wc.qp_num); - assert((int)wc.wr_id > pctx->opts.max_send_wr); - recv_index = (int)wc.wr_id - pctx->opts.max_send_wr; - assert(recv_index < pctx->opts.max_recv_wr); - assert(wc.byte_len <= pctx->opts.recv_bufsize); - - /* TODO: take care of fragmented messages !!! */ - pctx->receive_func(conn, - pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize), - wc.byte_len); - assert(wc.byte_len <= pctx->max_msg_size); - - pctx->receive_func(conn, p->msg, wc.byte_len); -- if (ibw_refill_cq_recv(conn)) ++ if (ibw_wc_recv(conn, &wc)) goto error; -- } -- break; ++ break; -- default: -- sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); ++ default: ++ sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode); ++ goto error; ++ } ++ } ++ if (rc!=0) { ++ sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc); goto error; } @@@ -574,6 -555,6 +570,163 @@@ error pctx->connstate_func(NULL, conn); } ++static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc) ++{ ++ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); ++ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); ++ struct ibw_wr *p; ++ ++ assert(pconn->cm_id->qp->qp_num==wc->qp_num); ++ assert(wc->wr_id < pctx->opts.max_send_wr); ++ ++ p = pconn->wr_index[wc->wr_id]; ++ if (p->msg_large) { ++ ibw_free_mr(&p->msg_large, &p->mr_large); ++ } ++ ++ DLIST_REMOVE(pconn->wr_list_used, p); ++ DLIST_ADD(pconn->wr_list_avail, p); ++ ++ return 0; ++} ++ ++static inline int ibw_append_to_part(void *memctx, struct ibw_part *part, ++ char **pp, uint32_t add_len, int info) ++{ ++ /* allocate more if necessary - it's an "evergrowing" buffer... */ ++ if (part->len + add_len > part->bufsize) { ++ if (part->buf==NULL) { ++ assert(part->len==0); ++ part->buf = talloc_size(memctx, add_len); ++ if (part->buf==NULL) { ++ sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n", ++ add_len, info); ++ return -1; ++ } ++ part->bufsize = add_len; ++ } else { ++ part->buf = talloc_realloc_size(memctx, ++ part->buf, part->len + add_len); ++ if (part->buf==NULL) { ++ sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n", ++ part->len, add_len, info); ++ return -1; ++ } ++ } ++ part->bufsize = part->len + add_len; ++ } ++ ++ /* consume pp */ ++ memcpy(part->buf + part->len, *pp, add_len); ++ *pp += add_len; ++ part->len += add_len; ++ part->to_read -= add_len; ++ ++ return 0; ++} ++ ++static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold) ++{ ++ if (part->bufsize > threshold) { ++ talloc_free(part->buf); ++ part->buf = talloc_size(memctx, threshold); ++ if (part->buf==NULL) { ++ sprintf(ibw_lasterr, "talloc_size failed\n"); ++ return -1; ++ } ++ part->bufsize = threshold; ++ } ++ return 0; ++} ++ ++static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc) ++{ ++ struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv); ++ struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv); ++ int recv_index; ++ char *p; ++ uint32_t remain; ++ struct ibw_part *part; ++ ++ assert(pconn->cm_id->qp->qp_num==wc->qp_num); ++ assert((int)wc->wr_id > pctx->opts.max_send_wr); ++ recv_index = (int)wc->wr_id - pctx->opts.max_send_wr; ++ assert(recv_index < pctx->opts.max_recv_wr); ++ assert(wc->byte_len <= pctx->opts.recv_bufsize); ++ ++ p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize); ++ part = &pconn->part; ++ ++ remain = wc->byte_len; ++ while(remain) { ++ /* here always true: (part->len!=0 && part->to_read!=0) || ++ (part->len==0 && part->to_read==0) */ ++ if (part->len) { /* is there a partial msg to be continued? */ ++ int read_len = (part->to_read<=remain) ? part->to_read : remain; ++ if (ibw_append_to_part(pconn, part, &p, read_len, 421)) ++ goto error; ++ remain -= read_len; ++ ++ if (part->len<=sizeof(uint32_t) && part->to_read==0) { ++ assert(part->len==sizeof(uint32_t)); ++ /* set it again now... */ ++ part->to_read = *((uint32_t *)(part->buf)); ++ if (part->to_readto_read); ++ goto error; ++ } ++ part->to_read -= sizeof(uint32_t); /* it's already read */ ++ } ++ ++ if (part->to_read==0) { ++ pctx->receive_func(conn, part->buf, part->len); ++ part->len = 0; /* tells not having partial data (any more) */ ++ if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold)) ++ goto error; ++ } ++ } else { ++ if (remain>=sizeof(uint32_t)) { ++ uint32_t msglen = *(uint32_t *)p; ++ if (msglenreceive_func(conn, p, msglen); ++ p += msglen; ++ remain -= msglen; ++ } else { ++ part->to_read = msglen; ++ /* part->len is already 0 */ ++ if (ibw_append_to_part(pconn, part, &p, remain, 422)) ++ goto error; ++ remain = 0; /* to be continued ... */ ++ /* part->to_read > 0 here */ ++ } ++ } else { /* edge case: */ ++ part->to_read = sizeof(uint32_t); ++ /* part->len is already 0 */ ++ if (ibw_append_to_part(pconn, part, &p, remain, 423)) ++ goto error; ++ remain = 0; ++ /* part->to_read > 0 here */ ++ } ++ } ++ } /* is always decreased at least by 1 */ ++ ++ if (ibw_refill_cq_recv(conn)) ++ goto error; ++ ++ return 0; ++ ++error: ++ DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr)); ++ conn->state = IBWC_ERROR; ++ return -1; ++} ++ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts) { int i; @@@ -581,8 -562,6 +734,9 @@@ opts->max_send_wr = 256; opts->max_recv_wr = 1024; + opts->avg_send_size = 1024; + opts->recv_bufsize = 256; ++ opts->recv_threshold = 1 * 1024 * 1024; for(i=0; imax_send_wr = atoi(value); else if (strcmp(name, "max_recv_wr")==0) opts->max_recv_wr = atoi(value); + else if (strcmp(name, "avg_send_size")==0) + opts->avg_send_size = atoi(value); + else if (strcmp(name, "recv_bufsize")==0) + opts->recv_bufsize = atoi(value); ++ else if (strcmp(name, "recv_threshold")==0) ++ opts->recv_threshold = atoi(value); else { sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name); return -1; @@@ -843,20 -810,8 +999,20 @@@ int ibw_send(struct ibw_conn *conn, voi }; struct ibv_send_wr *bad_wr; - if (n + sizeof(long)<=pctx->opts.avg_send_size) { - assert(p->msg==(char *)buf); - assert(n<=pctx->max_msg_size); ++ if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) { + assert((p->msg + sizeof(long))==(char *)buf); + list.lkey = pconn->mr_send->lkey; + list.addr = (uintptr_t) p->msg; + + *((uint32_t *)p->msg) = htonl(n); + } else { + assert((p->msg_large + sizeof(long))==(char *)buf); + assert(p->mr_large!=NULL); + list.lkey = p->mr_large->lkey; + list.addr = (uintptr_t) p->msg_large; + + *((uint32_t *)p->msg_large) = htonl(n); + } return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr); } diff --cc ctdb/ib/ibwrapper_internal.h index b819c483d3d,04d82f9565d..6e34917755b --- a/ctdb/ib/ibwrapper_internal.h +++ b/ctdb/ib/ibwrapper_internal.h @@@ -22,10 -22,8 +22,11 @@@ */ struct ibw_opts { -- int max_send_wr; -- int max_recv_wr; - int avg_send_size; - int recv_bufsize; ++ uint32_t max_send_wr; ++ uint32_t max_recv_wr; ++ uint32_t avg_send_size; ++ uint32_t recv_bufsize; ++ uint32_t recv_threshold; }; struct ibw_wr { @@@ -54,8 -48,10 +55,15 @@@ struct ibw_ctx_priv ibw_receive_fn_t receive_func; /* see ibw_init */ long pagesize; /* sysconf result for memalign */ - int qsize; /* opts.max_send_wr + opts.max_recv_wr */ - int max_msg_size; /* see ibw_init */ +}; + ++struct ibw_part { ++ char *buf; /* talloced memory buffer */ ++ uint32_t bufsize; /* allocated size of buf - always grows */ ++ uint32_t len; /* message part length */ ++ uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */ + }; + struct ibw_conn_priv { struct ibv_comp_channel *verbs_channel; struct fd_event *verbs_channel_event; @@@ -70,10 -65,5 +78,11 @@@ struct ibw_wr *wr_list_avail; struct ibw_wr *wr_list_used; struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */ + + /* buf_recv is a ring buffer */ + char *buf_recv; /* max_recv_wr * avg_recv_size */ + struct ibv_mr *mr_recv; - int recv_index; /* index of the next recv buffer */ ++ int recv_index; /* index of the next recv buffer when refilling */ ++ struct ibw_part part; };