}
static struct nfsd_file *
-ff_local_open_fh(struct pnfs_layout_segment *lseg, u32 ds_idx,
+ff_local_open_fh(struct pnfs_layout_segment *lseg, u32 ds_idx, u32 dss_id,
struct nfs_client *clp, const struct cred *cred,
struct nfs_fh *fh, fmode_t mode)
{
#if IS_ENABLED(CONFIG_NFS_LOCALIO)
struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, ds_idx);
- return nfs_local_open_fh(clp, cred, fh, &mirror->dss[0].nfl, mode);
+ return nfs_local_open_fh(clp, cred, fh, &mirror->dss[dss_id].nfl, mode);
#else
return NULL;
#endif
static void
ff_layout_mark_ds_unreachable(struct pnfs_layout_segment *lseg, u32 idx)
{
- struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
+ struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx, 0);
if (devid)
nfs4_mark_deviceid_unavailable(devid);
static void
ff_layout_mark_ds_reachable(struct pnfs_layout_segment *lseg, u32 idx)
{
- struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
+ struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx, 0);
if (devid)
nfs4_mark_deviceid_available(devid);
/* mirrors are initially sorted by efficiency */
for (idx = start_idx; idx < fls->mirror_array_cnt; idx++) {
mirror = FF_LAYOUT_COMP(lseg, idx);
- ds = nfs4_ff_layout_prepare_ds(lseg, mirror, false);
+ ds = nfs4_ff_layout_prepare_ds(lseg, mirror, 0, false);
if (IS_ERR(ds))
continue;
for (i = 0; i < pgio->pg_mirror_count; i++) {
mirror = FF_LAYOUT_COMP(pgio->pg_lseg, i);
- ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, mirror, true);
+ ds = nfs4_ff_layout_prepare_ds(pgio->pg_lseg, mirror, 0, true);
if (IS_ERR(ds)) {
if (!ff_layout_no_fallback_to_mds(pgio->pg_lseg))
goto out_mds;
{
struct pnfs_layout_hdr *lo = lseg->pls_layout;
struct inode *inode = lo->plh_inode;
- struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
+ struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx, 0);
struct nfs4_slot_table *tbl = &clp->cl_session->fc_slot_table;
switch (op_status) {
struct pnfs_layout_segment *lseg,
u32 idx)
{
- struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx);
+ struct nfs4_deviceid_node *devid = FF_LAYOUT_DEVID_NODE(lseg, idx, 0);
switch (op_status) {
case NFS_OK:
mirror = FF_LAYOUT_COMP(lseg, idx);
err = ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
- mirror, offset, length, status, opnum,
+ mirror, 0, offset, length, status, opnum,
nfs_io_gfp_mask());
switch (status) {
hdr->args.pgbase, (size_t)hdr->args.count, offset);
mirror = FF_LAYOUT_COMP(lseg, idx);
- ds = nfs4_ff_layout_prepare_ds(lseg, mirror, false);
+ ds = nfs4_ff_layout_prepare_ds(lseg, mirror, 0, false);
if (IS_ERR(ds)) {
ds_fatal_error = nfs_error_is_fatal(PTR_ERR(ds));
goto out_failed;
}
ds_clnt = nfs4_ff_find_or_create_ds_client(mirror, ds->ds_clp,
- hdr->inode);
+ hdr->inode, 0);
if (IS_ERR(ds_clnt))
goto out_failed;
- ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, hdr->cred);
+ ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, hdr->cred, 0);
if (!ds_cred)
goto out_failed;
- vers = nfs4_ff_layout_ds_version(mirror);
+ vers = nfs4_ff_layout_ds_version(mirror, 0);
dprintk("%s USE DS: %s cl_count %d vers %d\n", __func__,
ds->ds_remotestr, refcount_read(&ds->ds_clp->cl_count), vers);
hdr->pgio_done_cb = ff_layout_read_done_cb;
refcount_inc(&ds->ds_clp->cl_count);
hdr->ds_clp = ds->ds_clp;
- fh = nfs4_ff_layout_select_ds_fh(mirror);
+ fh = nfs4_ff_layout_select_ds_fh(mirror, 0);
if (fh)
hdr->args.fh = fh;
- nfs4_ff_layout_select_ds_stateid(mirror, &hdr->args.stateid);
+ nfs4_ff_layout_select_ds_stateid(mirror, 0, &hdr->args.stateid);
/*
* Note that if we ever decide to split across DSes,
hdr->mds_offset = offset;
/* Start IO accounting for local read */
- localio = ff_local_open_fh(lseg, idx, ds->ds_clp, ds_cred, fh, FMODE_READ);
+ localio = ff_local_open_fh(lseg, idx, 0, ds->ds_clp, ds_cred, fh, FMODE_READ);
if (localio) {
hdr->task.tk_start = ktime_get();
ff_layout_read_record_layoutstats_start(&hdr->task, hdr);
bool ds_fatal_error = false;
mirror = FF_LAYOUT_COMP(lseg, idx);
- ds = nfs4_ff_layout_prepare_ds(lseg, mirror, true);
+ ds = nfs4_ff_layout_prepare_ds(lseg, mirror, 0, true);
if (IS_ERR(ds)) {
ds_fatal_error = nfs_error_is_fatal(PTR_ERR(ds));
goto out_failed;
}
ds_clnt = nfs4_ff_find_or_create_ds_client(mirror, ds->ds_clp,
- hdr->inode);
+ hdr->inode, 0);
if (IS_ERR(ds_clnt))
goto out_failed;
- ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, hdr->cred);
+ ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, hdr->cred, 0);
if (!ds_cred)
goto out_failed;
- vers = nfs4_ff_layout_ds_version(mirror);
+ vers = nfs4_ff_layout_ds_version(mirror, 0);
dprintk("%s ino %lu sync %d req %zu@%llu DS: %s cl_count %d vers %d\n",
__func__, hdr->inode->i_ino, sync, (size_t) hdr->args.count,
refcount_inc(&ds->ds_clp->cl_count);
hdr->ds_clp = ds->ds_clp;
hdr->ds_commit_idx = idx;
- fh = nfs4_ff_layout_select_ds_fh(mirror);
+ fh = nfs4_ff_layout_select_ds_fh(mirror, 0);
if (fh)
hdr->args.fh = fh;
- nfs4_ff_layout_select_ds_stateid(mirror, &hdr->args.stateid);
+ nfs4_ff_layout_select_ds_stateid(mirror, 0, &hdr->args.stateid);
/*
* Note that if we ever decide to split across DSes,
hdr->args.offset = offset;
/* Start IO accounting for local write */
- localio = ff_local_open_fh(lseg, idx, ds->ds_clp, ds_cred, fh,
+ localio = ff_local_open_fh(lseg, idx, 0, ds->ds_clp, ds_cred, fh,
FMODE_READ|FMODE_WRITE);
if (localio) {
hdr->task.tk_start = ktime_get();
idx = calc_ds_index_from_commit(lseg, data->ds_commit_index);
mirror = FF_LAYOUT_COMP(lseg, idx);
- ds = nfs4_ff_layout_prepare_ds(lseg, mirror, true);
+ ds = nfs4_ff_layout_prepare_ds(lseg, mirror, 0, true);
if (IS_ERR(ds))
goto out_err;
ds_clnt = nfs4_ff_find_or_create_ds_client(mirror, ds->ds_clp,
- data->inode);
+ data->inode, 0);
if (IS_ERR(ds_clnt))
goto out_err;
- ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, data->cred);
+ ds_cred = ff_layout_get_ds_cred(mirror, &lseg->pls_range, data->cred, 0);
if (!ds_cred)
goto out_err;
- vers = nfs4_ff_layout_ds_version(mirror);
+ vers = nfs4_ff_layout_ds_version(mirror, 0);
dprintk("%s ino %lu, how %d cl_count %d vers %d\n", __func__,
data->inode->i_ino, how, refcount_read(&ds->ds_clp->cl_count),
data->args.fh = fh;
/* Start IO accounting for local commit */
- localio = ff_local_open_fh(lseg, idx, ds->ds_clp, ds_cred, fh,
+ localio = ff_local_open_fh(lseg, idx, 0, ds->ds_clp, ds_cred, fh,
FMODE_READ|FMODE_WRITE);
if (localio) {
data->task.tk_start = ktime_get();
}
static inline struct nfs4_deviceid_node *
-FF_LAYOUT_DEVID_NODE(struct pnfs_layout_segment *lseg, u32 idx)
+FF_LAYOUT_DEVID_NODE(struct pnfs_layout_segment *lseg, u32 idx, u32 dss_id)
{
struct nfs4_ff_layout_mirror *mirror = FF_LAYOUT_COMP(lseg, idx);
if (mirror != NULL) {
- struct nfs4_ff_layout_ds *mirror_ds = mirror->dss[0].mirror_ds;
+ struct nfs4_ff_layout_ds *mirror_ds = mirror->dss[dss_id].mirror_ds;
if (!IS_ERR_OR_NULL(mirror_ds))
return &mirror_ds->id_node;
}
static inline int
-nfs4_ff_layout_ds_version(const struct nfs4_ff_layout_mirror *mirror)
+nfs4_ff_layout_ds_version(const struct nfs4_ff_layout_mirror *mirror, u32 dss_id)
{
- return mirror->dss[0].mirror_ds->ds_versions[0].version;
+ return mirror->dss[dss_id].mirror_ds->ds_versions[0].version;
+}
+
+static inline u32
+nfs4_ff_layout_calc_dss_id(const u64 stripe_unit, const u32 dss_count, const loff_t offset)
+{
+ u64 tmp = offset;
+
+ if (dss_count == 1 || stripe_unit == 0)
+ return 0;
+
+ do_div(tmp, stripe_unit);
+
+ return do_div(tmp, dss_count);
}
struct nfs4_ff_layout_ds *
void nfs4_ff_layout_put_deviceid(struct nfs4_ff_layout_ds *mirror_ds);
void nfs4_ff_layout_free_deviceid(struct nfs4_ff_layout_ds *mirror_ds);
int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
- struct nfs4_ff_layout_mirror *mirror, u64 offset,
- u64 length, int status, enum nfs_opnum4 opnum,
- gfp_t gfp_flags);
+ struct nfs4_ff_layout_mirror *mirror,
+ u32 dss_id, u64 offset, u64 length, int status,
+ enum nfs_opnum4 opnum, gfp_t gfp_flags);
void ff_layout_send_layouterror(struct pnfs_layout_segment *lseg);
int ff_layout_encode_ds_ioerr(struct xdr_stream *xdr, const struct list_head *head);
void ff_layout_free_ds_ioerr(struct list_head *head);
struct list_head *head,
unsigned int maxnum);
struct nfs_fh *
-nfs4_ff_layout_select_ds_fh(struct nfs4_ff_layout_mirror *mirror);
+nfs4_ff_layout_select_ds_fh(struct nfs4_ff_layout_mirror *mirror, u32 dss_id);
void
nfs4_ff_layout_select_ds_stateid(const struct nfs4_ff_layout_mirror *mirror,
- nfs4_stateid *stateid);
+ u32 dss_id,
+ nfs4_stateid *stateid);
struct nfs4_pnfs_ds *
nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg,
struct nfs4_ff_layout_mirror *mirror,
+ u32 dss_id,
bool fail_return);
struct rpc_clnt *
nfs4_ff_find_or_create_ds_client(struct nfs4_ff_layout_mirror *mirror,
struct nfs_client *ds_clp,
- struct inode *inode);
+ struct inode *inode,
+ u32 dss_id);
const struct cred *ff_layout_get_ds_cred(struct nfs4_ff_layout_mirror *mirror,
const struct pnfs_layout_range *range,
- const struct cred *mdscred);
+ const struct cred *mdscred,
+ u32 dss_id);
bool ff_layout_avoid_mds_available_ds(struct pnfs_layout_segment *lseg);
bool ff_layout_avoid_read_on_rw(struct pnfs_layout_segment *lseg);
}
int ff_layout_track_ds_error(struct nfs4_flexfile_layout *flo,
- struct nfs4_ff_layout_mirror *mirror, u64 offset,
- u64 length, int status, enum nfs_opnum4 opnum,
- gfp_t gfp_flags)
+ struct nfs4_ff_layout_mirror *mirror,
+ u32 dss_id, u64 offset, u64 length, int status,
+ enum nfs_opnum4 opnum, gfp_t gfp_flags)
{
struct nfs4_ff_layout_ds_err *dserr;
if (status == 0)
return 0;
- if (IS_ERR_OR_NULL(mirror->dss[0].mirror_ds))
+ if (IS_ERR_OR_NULL(mirror->dss[dss_id].mirror_ds))
return -EINVAL;
dserr = kmalloc(sizeof(*dserr), gfp_flags);
dserr->length = length;
dserr->status = status;
dserr->opnum = opnum;
- nfs4_stateid_copy(&dserr->stateid, &mirror->dss[0].stateid);
- memcpy(&dserr->deviceid, &mirror->dss[0].mirror_ds->id_node.deviceid,
+ nfs4_stateid_copy(&dserr->stateid, &mirror->dss[dss_id].stateid);
+ memcpy(&dserr->deviceid, &mirror->dss[dss_id].mirror_ds->id_node.deviceid,
NFS4_DEVICEID4_SIZE);
spin_lock(&flo->generic_hdr.plh_inode->i_lock);
}
static const struct cred *
-ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode)
+ff_layout_get_mirror_cred(struct nfs4_ff_layout_mirror *mirror, u32 iomode, u32 dss_id)
{
const struct cred *cred, __rcu **pcred;
if (iomode == IOMODE_READ)
- pcred = &mirror->dss[0].ro_cred;
+ pcred = &mirror->dss[dss_id].ro_cred;
else
- pcred = &mirror->dss[0].rw_cred;
+ pcred = &mirror->dss[dss_id].rw_cred;
rcu_read_lock();
do {
}
struct nfs_fh *
-nfs4_ff_layout_select_ds_fh(struct nfs4_ff_layout_mirror *mirror)
+nfs4_ff_layout_select_ds_fh(struct nfs4_ff_layout_mirror *mirror, u32 dss_id)
{
/* FIXME: For now assume there is only 1 version available for the DS */
- return &mirror->dss[0].fh_versions[0];
+ return &mirror->dss[dss_id].fh_versions[0];
}
void
nfs4_ff_layout_select_ds_stateid(const struct nfs4_ff_layout_mirror *mirror,
- nfs4_stateid *stateid)
+ u32 dss_id,
+ nfs4_stateid *stateid)
{
- if (nfs4_ff_layout_ds_version(mirror) == 4)
- nfs4_stateid_copy(stateid, &mirror->dss[0].stateid);
+ if (nfs4_ff_layout_ds_version(mirror, dss_id) == 4)
+ nfs4_stateid_copy(stateid, &mirror->dss[dss_id].stateid);
}
static bool
ff_layout_init_mirror_ds(struct pnfs_layout_hdr *lo,
- struct nfs4_ff_layout_mirror *mirror)
+ struct nfs4_ff_layout_mirror *mirror,
+ u32 dss_id)
{
if (mirror == NULL)
goto outerr;
- if (mirror->dss[0].mirror_ds == NULL) {
+ if (mirror->dss[dss_id].mirror_ds == NULL) {
struct nfs4_deviceid_node *node;
struct nfs4_ff_layout_ds *mirror_ds = ERR_PTR(-ENODEV);
node = nfs4_find_get_deviceid(NFS_SERVER(lo->plh_inode),
- &mirror->dss[0].devid, lo->plh_lc_cred,
+ &mirror->dss[dss_id].devid, lo->plh_lc_cred,
GFP_KERNEL);
if (node)
mirror_ds = FF_LAYOUT_MIRROR_DS(node);
/* check for race with another call to this function */
- if (cmpxchg(&mirror->dss[0].mirror_ds, NULL, mirror_ds) &&
+ if (cmpxchg(&mirror->dss[dss_id].mirror_ds, NULL, mirror_ds) &&
mirror_ds != ERR_PTR(-ENODEV))
nfs4_put_deviceid_node(node);
}
- if (IS_ERR(mirror->dss[0].mirror_ds))
+ if (IS_ERR(mirror->dss[dss_id].mirror_ds))
goto outerr;
return true;
* nfs4_ff_layout_prepare_ds - prepare a DS connection for an RPC call
* @lseg: the layout segment we're operating on
* @mirror: layout mirror describing the DS to use
+ * @dss_id: DS stripe id to select stripe to use
* @fail_return: return layout on connect failure?
*
* Try to prepare a DS connection to accept an RPC call. This involves
struct nfs4_pnfs_ds *
nfs4_ff_layout_prepare_ds(struct pnfs_layout_segment *lseg,
struct nfs4_ff_layout_mirror *mirror,
+ u32 dss_id,
bool fail_return)
{
struct nfs4_pnfs_ds *ds;
unsigned int max_payload;
int status = -EAGAIN;
- if (!ff_layout_init_mirror_ds(lseg->pls_layout, mirror))
+ if (!ff_layout_init_mirror_ds(lseg->pls_layout, mirror, dss_id))
goto noconnect;
- ds = mirror->dss[0].mirror_ds->ds;
+ ds = mirror->dss[dss_id].mirror_ds->ds;
if (READ_ONCE(ds->ds_clp))
goto out;
/* matching smp_wmb() in _nfs4_pnfs_v3/4_ds_connect */
/* FIXME: For now we assume the server sent only one version of NFS
* to use for the DS.
*/
- status = nfs4_pnfs_ds_connect(s, ds, &mirror->dss[0].mirror_ds->id_node,
+ status = nfs4_pnfs_ds_connect(s, ds, &mirror->dss[dss_id].mirror_ds->id_node,
dataserver_timeo, dataserver_retrans,
- mirror->dss[0].mirror_ds->ds_versions[0].version,
- mirror->dss[0].mirror_ds->ds_versions[0].minor_version);
+ mirror->dss[dss_id].mirror_ds->ds_versions[0].version,
+ mirror->dss[dss_id].mirror_ds->ds_versions[0].minor_version);
/* connect success, check rsize/wsize limit */
if (!status) {
max_payload =
nfs_block_size(rpc_max_payload(ds->ds_clp->cl_rpcclient),
NULL);
- if (mirror->dss[0].mirror_ds->ds_versions[0].rsize > max_payload)
- mirror->dss[0].mirror_ds->ds_versions[0].rsize = max_payload;
- if (mirror->dss[0].mirror_ds->ds_versions[0].wsize > max_payload)
- mirror->dss[0].mirror_ds->ds_versions[0].wsize = max_payload;
+ if (mirror->dss[dss_id].mirror_ds->ds_versions[0].rsize > max_payload)
+ mirror->dss[dss_id].mirror_ds->ds_versions[0].rsize = max_payload;
+ if (mirror->dss[dss_id].mirror_ds->ds_versions[0].wsize > max_payload)
+ mirror->dss[dss_id].mirror_ds->ds_versions[0].wsize = max_payload;
goto out;
}
noconnect:
ff_layout_track_ds_error(FF_LAYOUT_FROM_HDR(lseg->pls_layout),
- mirror, lseg->pls_range.offset,
+ mirror, dss_id, lseg->pls_range.offset,
lseg->pls_range.length, NFS4ERR_NXIO,
OP_ILLEGAL, GFP_NOIO);
ff_layout_send_layouterror(lseg);
const struct cred *
ff_layout_get_ds_cred(struct nfs4_ff_layout_mirror *mirror,
const struct pnfs_layout_range *range,
- const struct cred *mdscred)
+ const struct cred *mdscred,
+ u32 dss_id)
{
const struct cred *cred;
- if (mirror && !mirror->dss[0].mirror_ds->ds_versions[0].tightly_coupled) {
- cred = ff_layout_get_mirror_cred(mirror, range->iomode);
+ if (mirror && !mirror->dss[dss_id].mirror_ds->ds_versions[0].tightly_coupled) {
+ cred = ff_layout_get_mirror_cred(mirror, range->iomode, dss_id);
if (!cred)
cred = get_cred(mdscred);
} else {
* @mirror: pointer to the mirror
* @ds_clp: nfs_client for the DS
* @inode: pointer to inode
+ * @dss_id: DS stripe id
*
* Find or create a DS rpc client with th MDS server rpc client auth flavor
* in the nfs_client cl_ds_clients list.
*/
struct rpc_clnt *
nfs4_ff_find_or_create_ds_client(struct nfs4_ff_layout_mirror *mirror,
- struct nfs_client *ds_clp, struct inode *inode)
+ struct nfs_client *ds_clp, struct inode *inode,
+ u32 dss_id)
{
- switch (mirror->dss[0].mirror_ds->ds_versions[0].version) {
+ switch (mirror->dss[dss_id].mirror_ds->ds_versions[0].version) {
case 3:
/* For NFSv3 DS, flavor is set when creating DS connections */
return ds_clp->cl_rpcclient;
{
struct nfs4_ff_layout_mirror *mirror;
struct nfs4_deviceid_node *devid;
- u32 idx;
+ u32 idx, dss_id;
for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
mirror = FF_LAYOUT_COMP(lseg, idx);
- if (mirror) {
- if (!mirror->dss[0].mirror_ds)
+ if (!mirror)
+ continue;
+ for (dss_id = 0; dss_id < mirror->dss_count; dss_id++) {
+ if (!mirror->dss[dss_id].mirror_ds)
return true;
- if (IS_ERR(mirror->dss[0].mirror_ds))
+ if (IS_ERR(mirror->dss[dss_id].mirror_ds))
continue;
- devid = &mirror->dss[0].mirror_ds->id_node;
+ devid = &mirror->dss[dss_id].mirror_ds->id_node;
if (!nfs4_test_deviceid_unavailable(devid))
return true;
}
{
struct nfs4_ff_layout_mirror *mirror;
struct nfs4_deviceid_node *devid;
- u32 idx;
+ u32 idx, dss_id;
for (idx = 0; idx < FF_LAYOUT_MIRROR_COUNT(lseg); idx++) {
mirror = FF_LAYOUT_COMP(lseg, idx);
- if (!mirror || IS_ERR(mirror->dss[0].mirror_ds))
- return false;
- if (!mirror->dss[0].mirror_ds)
- continue;
- devid = &mirror->dss[0].mirror_ds->id_node;
- if (nfs4_test_deviceid_unavailable(devid))
+ if (!mirror)
return false;
+ for (dss_id = 0; dss_id < mirror->dss_count; dss_id++) {
+ if (IS_ERR(mirror->dss[dss_id].mirror_ds))
+ return false;
+ if (!mirror->dss[dss_id].mirror_ds)
+ continue;
+ devid = &mirror->dss[dss_id].mirror_ds->id_node;
+ if (nfs4_test_deviceid_unavailable(devid))
+ return false;
+ }
}
return FF_LAYOUT_MIRROR_COUNT(lseg) != 0;