// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>
#include <linux/signal.h>
#include <linux/iversion.h>
#include <linux/ktime.h>
#include <linux/netfs.h>

#include "super.h"
#include "mds_client.h"
#include "cache.h"
#include "metric.h"
#include <linux/ceph/osd_client.h>
#include <linux/ceph/striper.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped) and are writing the most recently dirtied
 * pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

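/*
 * Writeback congestion thresholds.  CONGESTION_ON_THRESH converts the
 * configured congestion_kb into a page count; the "off" threshold is
 * 3/4 of the "on" threshold, providing hysteresis.  For example, with
 * congestion_kb = 8192 and 4K pages, congestion is signalled above
 * 2048 pages under writeback and cleared below 1536.
 */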
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))

static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata);

static inline struct ceph_snap_context *page_snap_context(struct page *page)
{
	if (PagePrivate(page))
		return (void *)page->private;
	return NULL;
}

/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc;
	int ret;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (PageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		BUG_ON(!PagePrivate(page));
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
	if (__ceph_have_pending_cap_snap(ci)) {
		struct ceph_cap_snap *capsnap =
				list_last_entry(&ci->i_cap_snaps,
						struct ceph_cap_snap,
						ci_item);
		snapc = ceph_get_snap_context(capsnap->context);
		capsnap->dirty_pages++;
	} else {
		BUG_ON(!ci->i_head_snapc);
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		++ci->i_wrbuffer_ref_head;
	}
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

	/*
	 * Reference snap context in page->private.  Also set
	 * PagePrivate so that we get invalidatepage callback.
	 */
	BUG_ON(PagePrivate(page));
	page->private = (unsigned long)snapc;
	SetPagePrivate(page);

	ret = __set_page_dirty_nobuffers(page);
	WARN_ON(!PageLocked(page));
	WARN_ON(!page->mapping);

	return ret;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned int offset,
				unsigned int length)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = page_snap_context(page);

	wait_on_page_fscache(page);

	inode = page->mapping->host;
	ci = ceph_inode(inode);

	if (offset != 0 || length != PAGE_SIZE) {
		dout("%p invalidatepage %p idx %lu partial dirty page %u~%u\n",
		     inode, page, page->index, offset, length);
		return;
	}

	WARN_ON(!PageLocked(page));
	if (!PagePrivate(page))
		return;

	dout("%p invalidatepage %p idx %lu full dirty page\n",
	     inode, page, page->index);

	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);
	page->private = 0;
	ClearPagePrivate(page);
}

static int ceph_releasepage(struct page *page, gfp_t gfp)
{
	dout("%p releasepage %p idx %lu (%sdirty)\n", page->mapping->host,
	     page, page->index, PageDirty(page) ? "" : "not ");

	if (PageFsCache(page)) {
		if (!(gfp & __GFP_DIRECT_RECLAIM) || !(gfp & __GFP_FS))
			return 0;
		wait_on_page_fscache(page);
	}
	return !PagePrivate(page);
}

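/*
 * Expand a netfs read request to cover whole RADOS stripe units: the
 * start is rounded down and the length rounded up, so the resulting
 * OSD reads line up with object-stripe boundaries.
 */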
static void ceph_netfs_expand_readahead(struct netfs_read_request *rreq)
{
	struct inode *inode = rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_layout *lo = &ci->i_layout;
	u32 blockoff;
	u64 blockno;

	/* Expand the start downward */
	blockno = div_u64_rem(rreq->start, lo->stripe_unit, &blockoff);
	rreq->start = blockno * lo->stripe_unit;
	rreq->len += blockoff;

	/* Now, round up the length to the next block */
	rreq->len = roundup(rreq->len, lo->stripe_unit);
}

static bool ceph_netfs_clamp_length(struct netfs_read_subrequest *subreq)
{
	struct inode *inode = subreq->rreq->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);
	u64 objno, objoff;
	u32 xlen;

	/* Truncate the extent at the end of the current block */
	ceph_calc_file_object_mapping(&ci->i_layout, subreq->start, subreq->len,
				      &objno, &objoff, &xlen);
	subreq->len = min(xlen, fsc->mount_options->rsize);
	return true;
}

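/*
 * Completion callback for the async OSD read issued by
 * ceph_netfs_issue_op() below: record read latency metrics, treat a
 * missing object as a successful zero-length read, and hand the
 * result back to netfs.
 */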
static void finish_netfs_read(struct ceph_osd_request *req)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(req->r_inode);
	struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
	struct netfs_read_subrequest *subreq = req->r_priv;
	int num_pages;
	int err = req->r_result;

	ceph_update_read_latency(&fsc->mdsc->metric, req->r_start_latency,
				 req->r_end_latency, err);

	dout("%s: result %d subreq->len=%zu i_size=%lld\n", __func__, req->r_result,
	     subreq->len, i_size_read(req->r_inode));

	/* no object means success but no data */
	if (err == -ENOENT)
		err = 0;
	else if (err == -EBLOCKLISTED)
		fsc->blocklisted = true;

	if (err >= 0 && err < subreq->len)
		__set_bit(NETFS_SREQ_CLEAR_TAIL, &subreq->flags);

	netfs_subreq_terminated(subreq, err, true);

	num_pages = calc_pages_for(osd_data->alignment, osd_data->length);
	ceph_put_page_vector(osd_data->pages, num_pages, false);
	iput(req->r_inode);
}

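/*
 * Issue one netfs read subrequest as a single CEPH_OSD_OP_READ.  The
 * target pages are taken straight from the pagecache xarray, so the
 * OSD reply lands directly in the pagecache.
 */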
static void ceph_netfs_issue_op(struct netfs_read_subrequest *subreq)
{
	struct netfs_read_request *rreq = subreq->rreq;
	struct inode *inode = rreq->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_osd_request *req;
	struct ceph_vino vino = ceph_vino(inode);
	struct iov_iter iter;
	struct page **pages;
	size_t page_off;
	int err = 0;
	u64 len = subreq->len;

	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, vino, subreq->start, &len,
			0, 1, CEPH_OSD_OP_READ,
			CEPH_OSD_FLAG_READ | fsc->client->osdc.client->options->read_from_replica,
			NULL, ci->i_truncate_seq, ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		req = NULL;
		goto out;
	}

	dout("%s: pos=%llu orig_len=%zu len=%llu\n", __func__, subreq->start, subreq->len, len);
	iov_iter_xarray(&iter, READ, &rreq->mapping->i_pages, subreq->start, len);
	err = iov_iter_get_pages_alloc(&iter, &pages, len, &page_off);
	if (err < 0) {
		dout("%s: iov_iter_get_pages_alloc returned %d\n", __func__, err);
		goto out;
	}

	/* should always give us a page-aligned read */
	WARN_ON_ONCE(page_off);
	len = err;

	osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0, false, false);
	req->r_callback = finish_netfs_read;
	req->r_priv = subreq;
	req->r_inode = inode;
	ihold(inode);

	err = ceph_osdc_start_request(req->r_osdc, req, false);
	if (err)
		iput(inode);
out:
	ceph_osdc_put_request(req);
	if (err)
		netfs_subreq_terminated(subreq, err, false);
	dout("%s: result %d\n", __func__, err);
}

static void ceph_init_rreq(struct netfs_read_request *rreq, struct file *file)
{
}

static void ceph_readahead_cleanup(struct address_space *mapping, void *priv)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	int got = (uintptr_t)priv;

	if (got)
		ceph_put_cap_refs(ci, got);
}

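/*
 * Hooks handed to the generic netfs read path; they supply the
 * ceph-specific pieces: cap handling, stripe-aware request expansion
 * and clamping, and the actual OSD reads.
 */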
const struct netfs_read_request_ops ceph_netfs_read_ops = {
	.init_rreq		= ceph_init_rreq,
	.is_cache_enabled	= ceph_is_cache_enabled,
	.begin_cache_operation	= ceph_begin_cache_operation,
	.issue_op		= ceph_netfs_issue_op,
	.expand_readahead	= ceph_netfs_expand_readahead,
	.clamp_length		= ceph_netfs_clamp_length,
	.check_write_begin	= ceph_netfs_check_write_begin,
	.cleanup		= ceph_readahead_cleanup,
};

/* read a single page, without unlocking it. */
static int ceph_readpage(struct file *file, struct page *page)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_vino vino = ceph_vino(inode);
	u64 off = page_offset(page);
	u64 len = PAGE_SIZE;

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		/*
		 * Uptodate inline data should have been added
		 * into page cache while getting Fcr caps.
		 */
		if (off == 0) {
			unlock_page(page);
			return -EINVAL;
		}
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		unlock_page(page);
		return 0;
	}

	dout("readpage ino %llx.%llx file %p off %llu len %llu page %p index %lu\n",
	     vino.ino, vino.snap, file, off, len, page, page->index);

	return netfs_readpage(file, page, &ceph_netfs_read_ops, NULL);
}

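/*
 * Start readahead.  Unlike normal read(2) callers, fadvise()/madvise()
 * callers may not already hold Fc caps, so take a cap reference here;
 * it is dropped from ceph_readahead_cleanup() when the read completes.
 */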
static void ceph_readahead(struct readahead_control *ractl)
{
	struct inode *inode = file_inode(ractl->file);
	struct ceph_file_info *fi = ractl->file->private_data;
	struct ceph_rw_context *rw_ctx;
	int got = 0;
	int ret = 0;

	if (ceph_inode(inode)->i_inline_version != CEPH_INLINE_NONE)
		return;

	rw_ctx = ceph_find_rw_context(fi);
	if (!rw_ctx) {
		/*
		 * readahead callers do not necessarily hold Fcb caps
		 * (e.g. fadvise, madvise).
		 */
		int want = CEPH_CAP_FILE_CACHE;

		ret = ceph_try_get_caps(inode, CEPH_CAP_FILE_RD, want, true, &got);
		if (ret < 0)
			dout("start_read %p, error getting cap\n", inode);
		else if (!(got & want))
			dout("start_read %p, no cache cap\n", inode);

		if (ret <= 0)
			return;
	}
	netfs_readahead(ractl, &ceph_netfs_read_ops, (void *)(uintptr_t)got);
}

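/*
 * Consistent snapshot of the inode's size/truncate state for one
 * writeback pass, captured under i_ceph_lock by get_oldest_context().
 */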
struct ceph_writeback_ctl
{
	loff_t i_size;
	u64 truncate_size;
	u32 truncate_seq;
	bool size_stable;
	bool head_snapc;
};

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *
get_oldest_context(struct inode *inode, struct ceph_writeback_ctl *ctl,
		   struct ceph_snap_context *page_snapc)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (!capsnap->dirty_pages)
			continue;

		/* get i_size, truncate_{seq,size} for page_snapc? */
		if (snapc && capsnap->context != page_snapc)
			continue;

		if (ctl) {
			if (capsnap->writing) {
				ctl->i_size = i_size_read(inode);
				ctl->size_stable = false;
			} else {
				ctl->i_size = capsnap->size;
				ctl->size_stable = true;
			}
			ctl->truncate_size = capsnap->truncate_size;
			ctl->truncate_seq = capsnap->truncate_seq;
			ctl->head_snapc = false;
		}

		if (snapc)
			break;

		snapc = ceph_get_snap_context(capsnap->context);
		if (!page_snapc ||
		    page_snapc == snapc ||
		    page_snapc->seq > snapc->seq)
			break;
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
		if (ctl) {
			ctl->i_size = i_size_read(inode);
			ctl->truncate_size = ci->i_truncate_size;
			ctl->truncate_seq = ci->i_truncate_seq;
			ctl->size_stable = false;
			ctl->head_snapc = true;
		}
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

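/*
 * Work out how many bytes of @page, starting at @start, are safe to
 * write back: clamp to the size recorded in the matching capsnap (or
 * the live i_size) and to the end of the page.
 */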
static u64 get_writepages_data_length(struct inode *inode,
				      struct page *page, u64 start)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = page_snap_context(page);
	struct ceph_cap_snap *capsnap = NULL;
	u64 end = i_size_read(inode);

	if (snapc != ci->i_head_snapc) {
		bool found = false;
		spin_lock(&ci->i_ceph_lock);
		list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
			if (capsnap->context == snapc) {
				if (!capsnap->writing)
					end = capsnap->size;
				found = true;
				break;
			}
		}
		spin_unlock(&ci->i_ceph_lock);
		WARN_ON(!found);
	}
	if (end > page_offset(page) + PAGE_SIZE)
		end = page_offset(page) + PAGE_SIZE;
	return end > start ? end - start : 0;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, mark the mapping for error, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode = page->mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_snap_context *snapc, *oldest;
	loff_t page_off = page_offset(page);
	int err;
	loff_t len = PAGE_SIZE;
	struct ceph_writeback_ctl ceph_wbc;
	struct ceph_osd_client *osdc = &fsc->client->osdc;
	struct ceph_osd_request *req;

	dout("writepage %p idx %lu\n", page, page->index);

	/* verify this is a writeable snap context */
	snapc = page_snap_context(page);
	if (!snapc) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		return 0;
	}
	oldest = get_oldest_context(inode, &ceph_wbc, snapc);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, snapc);
		/* we should only noop if called by kswapd */
		WARN_ON(!(current->flags & PF_MEMALLOC));
		ceph_put_snap_context(oldest);
		redirty_page_for_writepage(wbc, page);
		return 0;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (page_off >= ceph_wbc.i_size) {
		dout("%p page eof %llu\n", page, ceph_wbc.i_size);
		page->mapping->a_ops->invalidatepage(page, 0, PAGE_SIZE);
		return 0;
	}

	if (ceph_wbc.i_size < page_off + len)
		len = ceph_wbc.i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%llu snapc %p seq %lld\n",
	     inode, page, page->index, page_off, len, snapc, snapc->seq);

	if (atomic_long_inc_return(&fsc->writeback_count) >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	set_page_writeback(page);
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode), page_off, &len, 0, 1,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE, snapc,
				    ceph_wbc.truncate_seq, ceph_wbc.truncate_size,
				    true);
	if (IS_ERR(req)) {
		redirty_page_for_writepage(wbc, page);
		end_page_writeback(page);
		return PTR_ERR(req);
	}

	/* it may be a short write due to an object boundary */
	WARN_ON_ONCE(len > PAGE_SIZE);
	osd_req_op_extent_osd_data_pages(req, 0, &page, len, 0, false, false);
	dout("writepage %llu~%llu (%llu bytes)\n", page_off, len, len);

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(osdc, req, true);
	if (!err)
		err = ceph_osdc_wait_request(osdc, req);

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, err);

	ceph_osdc_put_request(req);
	if (err == 0)
		err = len;

	if (err < 0) {
		struct writeback_control tmp_wbc;
		if (!wbc)
			wbc = &tmp_wbc;
		if (err == -ERESTARTSYS) {
			/* killed by SIGKILL */
			dout("writepage interrupted page %p\n", page);
			redirty_page_for_writepage(wbc, page);
			end_page_writeback(page);
			return err;
		}
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		dout("writepage setting page/mapping error %d %p\n",
		     err, page);
		mapping_set_error(&inode->i_data, err);
		wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);	/* page's reference */

	if (atomic_long_dec_return(&fsc->writeback_count) <
	    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
		clear_bdi_congested(inode_to_bdi(inode), BLK_RW_ASYNC);

	return err;
}

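/*
 * Write one dirty page and unlock it.  An inode reference is held
 * across the write so a final iput() cannot tear the inode down while
 * writeback is in flight.
 */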
static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
	ihold(inode);
	err = writepage_nounlock(page, wbc);
	if (err == -ERESTARTSYS) {
		/* direct memory reclaimer was killed by SIGKILL. return 0
		 * to prevent caller from setting mapping/page error */
		err = 0;
	}
	unlock_page(page);
	iput(inode);
	return err;
}

/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req)
{
	struct inode *inode = req->r_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_data *osd_data;
	struct page *page;
	int num_pages, total_pages = 0;
	int i, j;
	int rc = req->r_result;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	bool remove_page;

	dout("writepages_finish %p rc %d\n", inode, rc);
	if (rc < 0) {
		mapping_set_error(mapping, rc);
		ceph_set_error_write(ci);
		if (rc == -EBLOCKLISTED)
			fsc->blocklisted = true;
	} else {
		ceph_clear_error_write(ci);
	}

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, rc);

	/*
	 * We lost the cache cap, need to truncate the page before
	 * it is unlocked, otherwise we'd truncate it later in the
	 * page truncation thread, possibly losing some data that
	 * raced its way in
	 */
	remove_page = !(ceph_caps_issued(ci) &
			(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));

	/* clean all pages */
	for (i = 0; i < req->r_num_ops; i++) {
		if (req->r_ops[i].op != CEPH_OSD_OP_WRITE)
			break;

		osd_data = osd_req_op_extent_osd_data(req, i);
		BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
		num_pages = calc_pages_for((u64)osd_data->alignment,
					   (u64)osd_data->length);
		total_pages += num_pages;
		for (j = 0; j < num_pages; j++) {
			page = osd_data->pages[j];
			BUG_ON(!page);
			WARN_ON(!PageUptodate(page));

			if (atomic_long_dec_return(&fsc->writeback_count) <
			     CONGESTION_OFF_THRESH(
					fsc->mount_options->congestion_kb))
				clear_bdi_congested(inode_to_bdi(inode),
						    BLK_RW_ASYNC);

			ceph_put_snap_context(page_snap_context(page));
			page->private = 0;
			ClearPagePrivate(page);
			dout("unlocking %p\n", page);
			end_page_writeback(page);

			if (remove_page)
				generic_error_remove_page(inode->i_mapping,
							  page);

			unlock_page(page);
		}
		dout("writepages_finish %p wrote %llu bytes cleaned %d pages\n",
		     inode, osd_data->length, rc >= 0 ? num_pages : 0);

		release_pages(osd_data->pages, num_pages);
	}

	ceph_put_wrbuffer_cap_refs(ci, total_pages, snapc);

	osd_data = osd_req_op_extent_osd_data(req, 0);
	if (osd_data->pages_from_pool)
		mempool_free(osd_data->pages, ceph_wb_pagevec_pool);
	else
		kfree(osd_data->pages);
	ceph_osdc_put_request(req);
}

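/*
 * Strategy for ceph_writepages_start() below: gather dirty pages for
 * the oldest snap context only, pack physically contiguous runs into
 * single extent ops, and loop back over the file for newer snap
 * contexts once the older ones have been submitted.
 */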
/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_vino vino = ceph_vino(inode);
	pgoff_t index, start_index, end = -1;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int rc = 0;
	unsigned int wsize = i_blocksize(inode);
	struct ceph_osd_request *req = NULL;
	struct ceph_writeback_ctl ceph_wbc;
	bool should_loop, range_whole = false;
	bool done = false;

	dout("writepages_start %p (mode=%s)\n", inode,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
		if (ci->i_wrbuffer_ref > 0) {
			pr_warn_ratelimited(
				"writepage_start %p %lld forced umount\n",
				inode, ceph_ino(inode));
		}
		mapping_set_error(mapping, -EIO);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;

	pagevec_init(&pvec);

	start_index = wbc->range_cyclic ? mapping->writeback_index : 0;
	index = start_index;

retry:
	/* find oldest snap context with dirty data */
	snapc = get_oldest_context(inode, &ceph_wbc, NULL);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);

	should_loop = false;
	if (ceph_wbc.head_snapc && snapc != last_snapc) {
		/* where to start/end? */
		if (wbc->range_cyclic) {
			index = start_index;
			end = -1;
			if (index > 0)
				should_loop = true;
			dout(" cyclic, start at %lu\n", index);
		} else {
			index = wbc->range_start >> PAGE_SHIFT;
			end = wbc->range_end >> PAGE_SHIFT;
			if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
				range_whole = true;
			dout(" not cyclic, %lu to %lu\n", index, end);
		}
	} else if (!ceph_wbc.head_snapc) {
		/* Do not respect wbc->range_{start,end}. Dirty pages
		 * in that range can be associated with newer snapc.
		 * They are not writeable until all dirty pages
		 * associated with 'snapc' get written */
		if (index > 0)
			should_loop = true;
		dout(" non-head snapc, range whole\n");
	}

	ceph_put_snap_context(last_snapc);
	last_snapc = snapc;

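	/*
	 * Gather dirty pages in pagevec-sized batches, locking and
	 * keeping only pages that belong to the snap context chosen
	 * above; anything else is unlocked, skipped or redirtied.
	 */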
	while (!done && index <= end) {
		int num_ops = 0, op_idx;
		unsigned i, pvec_pages, max_pages, locked_pages = 0;
		struct page **pages = NULL, **data_pages;
		struct page *page;
		pgoff_t strip_unit_end = 0;
		u64 offset = 0, len = 0;
		bool from_pool = false;

		max_pages = wsize >> PAGE_SHIFT;

get_more_pages:
		pvec_pages = pagevec_lookup_range_tag(&pvec, mapping, &index,
						      end, PAGECACHE_TAG_DIRTY);
		dout("pagevec_lookup_range_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				continue;
			}
			/* only if matching snap context */
			pgsnapc = page_snap_context(page);
			if (pgsnapc != snapc) {
				dout("page snapc %p %lld != oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				if (!should_loop &&
				    !ceph_wbc.head_snapc &&
				    wbc->sync_mode != WB_SYNC_NONE)
					should_loop = true;
				unlock_page(page);
				continue;
			}
			if (page_offset(page) >= ceph_wbc.i_size) {
				dout("%p page eof %llu\n",
				     page, ceph_wbc.i_size);
				if ((ceph_wbc.size_stable ||
				    page_offset(page) >= i_size_read(inode)) &&
				    clear_page_dirty_for_io(page))
					mapping->a_ops->invalidatepage(page,
								0, PAGE_SIZE);
				unlock_page(page);
				continue;
			}
			if (strip_unit_end && (page->index > strip_unit_end)) {
				dout("end of strip unit %p\n", page);
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				if (wbc->sync_mode == WB_SYNC_NONE) {
					dout("%p under writeback\n", page);
					unlock_page(page);
					continue;
				}
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				continue;
			}

			/*
			 * We have something to write.  If this is
			 * the first locked page this time through,
			 * calculate max possible write size and
			 * allocate a page array
			 */
			if (locked_pages == 0) {
				u64 objnum;
				u64 objoff;
				u32 xlen;

				/* prepare async write request */
				offset = (u64)page_offset(page);
				ceph_calc_file_object_mapping(&ci->i_layout,
							      offset, wsize,
							      &objnum, &objoff,
							      &xlen);
				len = xlen;

				num_ops = 1;
				strip_unit_end = page->index +
					((len - 1) >> PAGE_SHIFT);

				BUG_ON(pages);
				max_pages = calc_pages_for(0, (u64)len);
				pages = kmalloc_array(max_pages,
						      sizeof(*pages),
						      GFP_NOFS);
				if (!pages) {
					from_pool = true;
					pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
					BUG_ON(!pages);
				}

				len = 0;
			} else if (page->index !=
				   (offset + len) >> PAGE_SHIFT) {
				if (num_ops >= (from_pool ?  CEPH_OSD_SLAB_OPS :
							     CEPH_OSD_MAX_OPS)) {
					redirty_page_for_writepage(wbc, page);
					unlock_page(page);
					break;
				}

				num_ops++;
				offset = (u64)page_offset(page);
				len = 0;
			}

			/* note position of first page in pvec */
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			if (atomic_long_inc_return(&fsc->writeback_count) >
			    CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(inode_to_bdi(inode),
						  BLK_RW_ASYNC);
			}


			pages[locked_pages++] = page;
			pvec.pages[i] = NULL;

			len += PAGE_SIZE;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			unsigned j, n = 0;
			/* shift unused page to beginning of pvec */
			for (j = 0; j < pvec_pages; j++) {
				if (!pvec.pages[j])
					continue;
				if (n < j)
					pvec.pages[n] = pvec.pages[j];
				n++;
			}
			pvec.nr = n;

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_release(&pvec);
				goto get_more_pages;
			}
		}

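		/*
		 * We now hold a batch of locked pages.  Carve it into
		 * OSD requests: contiguous runs share one extent op,
		 * and a fresh request is started when the op budget of
		 * the current one is exhausted.
		 */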
new_request:
		offset = page_offset(pages[0]);
		len = wsize;

		req = ceph_osdc_new_request(&fsc->client->osdc,
					&ci->i_layout, vino,
					offset, &len, 0, num_ops,
					CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
					snapc, ceph_wbc.truncate_seq,
					ceph_wbc.truncate_size, false);
		if (IS_ERR(req)) {
			req = ceph_osdc_new_request(&fsc->client->osdc,
						&ci->i_layout, vino,
						offset, &len, 0,
						min(num_ops,
						    CEPH_OSD_SLAB_OPS),
						CEPH_OSD_OP_WRITE,
						CEPH_OSD_FLAG_WRITE,
						snapc, ceph_wbc.truncate_seq,
						ceph_wbc.truncate_size, true);
			BUG_ON(IS_ERR(req));
		}
		BUG_ON(len < page_offset(pages[locked_pages - 1]) +
			     PAGE_SIZE - offset);

		req->r_callback = writepages_finish;
		req->r_inode = inode;

		/* Format the osd request message and submit the write */
		len = 0;
		data_pages = pages;
		op_idx = 0;
		for (i = 0; i < locked_pages; i++) {
			u64 cur_offset = page_offset(pages[i]);
			if (offset + len != cur_offset) {
				if (op_idx + 1 == req->r_num_ops)
					break;
				osd_req_op_extent_dup_last(req, op_idx,
							   cur_offset - offset);
				dout("writepages got pages at %llu~%llu\n",
				     offset, len);
				osd_req_op_extent_osd_data_pages(req, op_idx,
							data_pages, len, 0,
							from_pool, false);
				osd_req_op_extent_update(req, op_idx, len);

				len = 0;
				offset = cur_offset;
				data_pages = pages + i;
				op_idx++;
			}

			set_page_writeback(pages[i]);
			len += PAGE_SIZE;
		}

		if (ceph_wbc.size_stable) {
			len = min(len, ceph_wbc.i_size - offset);
		} else if (i == locked_pages) {
			/* writepages_finish() clears writeback pages
			 * according to the data length, so make sure
			 * data length covers all locked pages */
			u64 min_len = len + 1 - PAGE_SIZE;
			len = get_writepages_data_length(inode, pages[i - 1],
							 offset);
			len = max(len, min_len);
		}
		dout("writepages got pages at %llu~%llu\n", offset, len);

		osd_req_op_extent_osd_data_pages(req, op_idx, data_pages, len,
						 0, from_pool, false);
		osd_req_op_extent_update(req, op_idx, len);

		BUG_ON(op_idx + 1 != req->r_num_ops);

		from_pool = false;
		if (i < locked_pages) {
			BUG_ON(num_ops <= req->r_num_ops);
			num_ops -= req->r_num_ops;
			locked_pages -= i;

			/* allocate new pages array for next request */
			data_pages = pages;
			pages = kmalloc_array(locked_pages, sizeof(*pages),
					      GFP_NOFS);
			if (!pages) {
				from_pool = true;
				pages = mempool_alloc(ceph_wb_pagevec_pool, GFP_NOFS);
				BUG_ON(!pages);
			}
			memcpy(pages, data_pages + i,
			       locked_pages * sizeof(*pages));
			memset(data_pages + i, 0,
			       locked_pages * sizeof(*pages));
		} else {
			BUG_ON(num_ops != req->r_num_ops);
			index = pages[i - 1]->index + 1;
			/* request message now owns the pages array */
			pages = NULL;
		}

		req->r_mtime = inode->i_mtime;
		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		wbc->nr_to_write -= i;
		if (pages)
			goto new_request;

		/*
		 * We stop writing back only if we are not doing
		 * integrity sync. In case of integrity sync we have to
		 * keep going until we have written all the pages
		 * we tagged for writeback prior to entering this loop.
		 */
		if (wbc->nr_to_write <= 0 && wbc->sync_mode == WB_SYNC_NONE)
			done = true;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		end = start_index - 1; /* OK even when start_index == 0 */

		/* to write dirty pages associated with next snapc,
		 * we need to wait until current writes complete */
		if (wbc->sync_mode != WB_SYNC_NONE &&
		    start_index == 0 && /* all dirty pages were checked */
		    !ceph_wbc.head_snapc) {
			struct page *page;
			unsigned i, nr;
			index = 0;
			while ((index <= end) &&
			       (nr = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_WRITEBACK))) {
				for (i = 0; i < nr; i++) {
					page = pvec.pages[i];
					if (page_snap_context(page) != snapc)
						continue;
					wait_on_page_writeback(page);
				}
				pagevec_release(&pvec);
				cond_resched();
			}
		}

		start_index = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	ceph_osdc_put_request(req);
	ceph_put_snap_context(last_snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}

/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/**
 * ceph_find_incompatible - find an incompatible context and return it
 * @page: page being dirtied
 *
 * We are only allowed to write into/dirty a page if the page is
 * clean, or already dirty within the same snap context. Returns a
 * conflicting context if there is one, NULL if there isn't, or a
 * negative error code on other errors.
 *
 * Must be called with page lock held.
 */
static struct ceph_snap_context *
ceph_find_incompatible(struct page *page)
{
	struct inode *inode = page->mapping->host;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_inode_info *ci = ceph_inode(inode);

	if (READ_ONCE(fsc->mount_state) >= CEPH_MOUNT_SHUTDOWN) {
		dout(" page %p forced umount\n", page);
		return ERR_PTR(-EIO);
	}

	for (;;) {
		struct ceph_snap_context *snapc, *oldest;

		wait_on_page_writeback(page);

		snapc = page_snap_context(page);
		if (!snapc || snapc == ci->i_head_snapc)
			break;

		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL, NULL);
		if (snapc->seq > oldest->seq) {
			/* not writeable -- return it for the caller to deal with */
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n", page, snapc);
			return ceph_get_snap_context(snapc);
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n", page, snapc);
		if (clear_page_dirty_for_io(page)) {
			int r = writepage_nounlock(page, NULL);
			if (r < 0)
				return ERR_PTR(r);
		}
	}
	return NULL;
}

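/*
 * Called from the netfs library before a write_begin is allowed to
 * proceed: if the page is dirty in an older snap context, drop the
 * page, kick writeback, wait for that context to become writeable or
 * written, and return -EAGAIN so the caller retries.
 */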
static int ceph_netfs_check_write_begin(struct file *file, loff_t pos, unsigned int len,
					struct page *page, void **_fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc;

	snapc = ceph_find_incompatible(page);
	if (snapc) {
		int r;

		unlock_page(page);
		put_page(page);
		if (IS_ERR(snapc))
			return PTR_ERR(snapc);

		ceph_queue_writeback(inode);
		r = wait_event_killable(ci->i_cap_wq,
					context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
		return r == 0 ? -EAGAIN : r;
	}
	return 0;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file_inode(file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = NULL;
	pgoff_t index = pos >> PAGE_SHIFT;
	int r;

	/*
	 * Uninlining should have already been done and everything updated, EXCEPT
	 * for inline_version sent to the MDS.
	 */
	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		page = grab_cache_page_write_begin(mapping, index, flags);
		if (!page)
			return -ENOMEM;

		/*
		 * The inline_version on a new inode is set to 1. If that's the
		 * case, then the page is brand new and isn't yet Uptodate.
		 */
		r = 0;
		if (index == 0 && ci->i_inline_version != 1) {
			if (!PageUptodate(page)) {
				WARN_ONCE(1, "ceph: write_begin called on still-inlined inode (inline_version %llu)!\n",
					  ci->i_inline_version);
				r = -EINVAL;
			}
			goto out;
		}
		zero_user_segment(page, 0, PAGE_SIZE);
		SetPageUptodate(page);
		goto out;
	}

	r = netfs_write_begin(file, inode->i_mapping, pos, len, 0, &page, NULL,
			      &ceph_netfs_read_ops, NULL);
out:
	if (r == 0)
		wait_on_page_fscache(page);
	if (r < 0) {
		if (page)
			put_page(page);
	} else {
		WARN_ON_ONCE(!PageLocked(page));
		*pagep = page;
	}
	return r;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file_inode(file);
	bool check_cap = false;

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	/* zero the stale part of the page if we did a short copy */
	if (!PageUptodate(page)) {
		if (copied < len) {
			copied = 0;
			goto out;
		}
		SetPageUptodate(page);
	}

	/* did file size increase? */
	if (pos+copied > i_size_read(inode))
		check_cap = ceph_inode_set_size(inode, pos+copied);

	set_page_dirty(page);

out:
	unlock_page(page);
	put_page(page);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(struct kiocb *iocb, struct iov_iter *iter)
{
	WARN_ON(1);
	return -EINVAL;
}

const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readahead = ceph_readahead,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,
};

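/*
 * Block every signal except SIGKILL while a page fault waits on caps,
 * so only a fatal signal can interrupt the wait.
 */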
static void ceph_block_sigs(sigset_t *oldset)
{
	sigset_t mask;
	siginitsetinv(&mask, sigmask(SIGKILL));
	sigprocmask(SIG_BLOCK, &mask, oldset);
}

static void ceph_restore_sigs(sigset_t *oldset)
{
	sigprocmask(SIG_SETMASK, oldset, NULL);
}

/*
 * vm ops
 */
static vm_fault_t ceph_filemap_fault(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct page *pinned_page = NULL;
	loff_t off = (loff_t)vmf->pgoff << PAGE_SHIFT;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	ceph_block_sigs(&oldset);

	dout("filemap_fault %p %llx.%llx %llu~%zd trying to get caps\n",
	     inode, ceph_vinop(inode), off, (size_t)PAGE_SIZE);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_CACHE;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_RD, want, -1,
			    &got, &pinned_page);
	if (err < 0)
		goto out_restore;

	dout("filemap_fault %p %llu~%zd got cap refs on %s\n",
	     inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));

	if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
	    ci->i_inline_version == CEPH_INLINE_NONE) {
		CEPH_DEFINE_RW_CONTEXT(rw_ctx, got);
		ceph_add_rw_context(fi, &rw_ctx);
		ret = filemap_fault(vmf);
		ceph_del_rw_context(fi, &rw_ctx);
		dout("filemap_fault %p %llu~%zd drop cap refs %s ret %x\n",
		     inode, off, (size_t)PAGE_SIZE,
		     ceph_cap_string(got), ret);
	} else
		err = -EAGAIN;

	if (pinned_page)
		put_page(pinned_page);
	ceph_put_cap_refs(ci, got);

	if (err != -EAGAIN)
		goto out_restore;

	/* read inline data */
	if (off >= PAGE_SIZE) {
		/* does not support inline data > PAGE_SIZE */
		ret = VM_FAULT_SIGBUS;
	} else {
		struct address_space *mapping = inode->i_mapping;
		struct page *page = find_or_create_page(mapping, 0,
						mapping_gfp_constraint(mapping,
						~__GFP_FS));
		if (!page) {
			ret = VM_FAULT_OOM;
			goto out_inline;
		}
		err = __ceph_do_getattr(inode, page,
					CEPH_STAT_CAP_INLINE_DATA, true);
		if (err < 0 || off >= i_size_read(inode)) {
			unlock_page(page);
			put_page(page);
			ret = vmf_error(err);
			goto out_inline;
		}
		if (err < PAGE_SIZE)
			zero_user_segment(page, err, PAGE_SIZE);
		else
			flush_dcache_page(page);
		SetPageUptodate(page);
		vmf->page = page;
		ret = VM_FAULT_MAJOR | VM_FAULT_LOCKED;
out_inline:
		dout("filemap_fault %p %llu~%zd read inline data ret %x\n",
		     inode, off, (size_t)PAGE_SIZE, ret);
	}
out_restore:
	ceph_restore_sigs(&oldset);
	if (err < 0)
		ret = vmf_error(err);

	return ret;
}

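/*
 * Make a read-only page writable: uninline any inline data first, take
 * Fb caps for the affected range, then retry until the page's snap
 * context is writeable.
 */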
static vm_fault_t ceph_page_mkwrite(struct vm_fault *vmf)
{
	struct vm_area_struct *vma = vmf->vma;
	struct inode *inode = file_inode(vma->vm_file);
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_file_info *fi = vma->vm_file->private_data;
	struct ceph_cap_flush *prealloc_cf;
	struct page *page = vmf->page;
	loff_t off = page_offset(page);
	loff_t size = i_size_read(inode);
	size_t len;
	int want, got, err;
	sigset_t oldset;
	vm_fault_t ret = VM_FAULT_SIGBUS;

	prealloc_cf = ceph_alloc_cap_flush();
	if (!prealloc_cf)
		return VM_FAULT_OOM;

	sb_start_pagefault(inode->i_sb);
	ceph_block_sigs(&oldset);

	if (ci->i_inline_version != CEPH_INLINE_NONE) {
		struct page *locked_page = NULL;
		if (off == 0) {
			lock_page(page);
			locked_page = page;
		}
		err = ceph_uninline_data(vma->vm_file, locked_page);
		if (locked_page)
			unlock_page(locked_page);
		if (err < 0)
			goto out_free;
	}

	if (off + PAGE_SIZE <= size)
		len = PAGE_SIZE;
	else
		len = size & ~PAGE_MASK;

	dout("page_mkwrite %p %llx.%llx %llu~%zd getting caps i_size %llu\n",
	     inode, ceph_vinop(inode), off, len, size);
	if (fi->fmode & CEPH_FILE_MODE_LAZY)
		want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
	else
		want = CEPH_CAP_FILE_BUFFER;

	got = 0;
	err = ceph_get_caps(vma->vm_file, CEPH_CAP_FILE_WR, want, off + len,
			    &got, NULL);
	if (err < 0)
		goto out_free;

	dout("page_mkwrite %p %llu~%zd got cap refs on %s\n",
	     inode, off, len, ceph_cap_string(got));

	/* Update time before taking page lock */
	file_update_time(vma->vm_file);
	inode_inc_iversion_raw(inode);

	do {
		struct ceph_snap_context *snapc;

		lock_page(page);

		if (page_mkwrite_check_truncate(page, inode) < 0) {
			unlock_page(page);
			ret = VM_FAULT_NOPAGE;
			break;
		}

		snapc = ceph_find_incompatible(page);
		if (!snapc) {
			/* success.  we'll keep the page locked. */
			set_page_dirty(page);
			ret = VM_FAULT_LOCKED;
			break;
		}

		unlock_page(page);

		if (IS_ERR(snapc)) {
			ret = VM_FAULT_SIGBUS;
			break;
		}

		ceph_queue_writeback(inode);
		err = wait_event_killable(ci->i_cap_wq,
				context_is_writeable_or_written(inode, snapc));
		ceph_put_snap_context(snapc);
	} while (err == 0);

	if (ret == VM_FAULT_LOCKED ||
	    ci->i_inline_version != CEPH_INLINE_NONE) {
		int dirty;
		spin_lock(&ci->i_ceph_lock);
		ci->i_inline_version = CEPH_INLINE_NONE;
		dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
					       &prealloc_cf);
		spin_unlock(&ci->i_ceph_lock);
		if (dirty)
			__mark_inode_dirty(inode, dirty);
	}

	dout("page_mkwrite %p %llu~%zd dropping cap refs on %s ret %x\n",
	     inode, off, len, ceph_cap_string(got), ret);
	ceph_put_cap_refs_async(ci, got);
out_free:
	ceph_restore_sigs(&oldset);
	sb_end_pagefault(inode->i_sb);
	ceph_free_cap_flush(prealloc_cf);
	if (err < 0)
		ret = vmf_error(err);
	return ret;
}

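/*
 * Copy inline data received from the MDS into page 0 of the mapping,
 * zero-filling the rest of the page.
 */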
void ceph_fill_inline_data(struct inode *inode, struct page *locked_page,
			   char *data, size_t len)
{
	struct address_space *mapping = inode->i_mapping;
	struct page *page;

	if (locked_page) {
		page = locked_page;
	} else {
		if (i_size_read(inode) == 0)
			return;
		page = find_or_create_page(mapping, 0,
					   mapping_gfp_constraint(mapping,
					   ~__GFP_FS));
		if (!page)
			return;
		if (PageUptodate(page)) {
			unlock_page(page);
			put_page(page);
			return;
		}
	}

	dout("fill_inline_data %p %llx.%llx len %zu locked_page %p\n",
	     inode, ceph_vinop(inode), len, locked_page);

	if (len > 0) {
		void *kaddr = kmap_atomic(page);
		memcpy(kaddr, data, len);
		kunmap_atomic(kaddr);
	}

	if (page != locked_page) {
		if (len < PAGE_SIZE)
			zero_user_segment(page, len, PAGE_SIZE);
		else
			flush_dcache_page(page);

		SetPageUptodate(page);
		unlock_page(page);
		put_page(page);
	}
}

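/*
 * Migrate inline file data out to a RADOS object so the file can be
 * written through the normal path.  Two OSD requests are used: a
 * CREATE, then a WRITE guarded by a CMPXATTR on the object's
 * "inline_version" xattr, so racing uninline attempts cannot clobber
 * data written under a newer inline_version.
 */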
28127bdd
YZ
1607int ceph_uninline_data(struct file *filp, struct page *locked_page)
1608{
1609 struct inode *inode = file_inode(filp);
1610 struct ceph_inode_info *ci = ceph_inode(inode);
1611 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
1612 struct ceph_osd_request *req;
1613 struct page *page = NULL;
1614 u64 len, inline_version;
1615 int err = 0;
1616 bool from_pagecache = false;
1617
1618 spin_lock(&ci->i_ceph_lock);
1619 inline_version = ci->i_inline_version;
1620 spin_unlock(&ci->i_ceph_lock);
1621
1622 dout("uninline_data %p %llx.%llx inline_version %llu\n",
1623 inode, ceph_vinop(inode), inline_version);
1624
1625 if (inline_version == 1 || /* initial version, no data */
1626 inline_version == CEPH_INLINE_NONE)
1627 goto out;
1628
1629 if (locked_page) {
1630 page = locked_page;
1631 WARN_ON(!PageUptodate(page));
1632 } else if (ceph_caps_issued(ci) &
1633 (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) {
1634 page = find_get_page(inode->i_mapping, 0);
1635 if (page) {
1636 if (PageUptodate(page)) {
1637 from_pagecache = true;
1638 lock_page(page);
1639 } else {
09cbfeaf 1640 put_page(page);
28127bdd
YZ
1641 page = NULL;
1642 }
1643 }
1644 }
1645
1646 if (page) {
1647 len = i_size_read(inode);
09cbfeaf
KS
1648 if (len > PAGE_SIZE)
1649 len = PAGE_SIZE;
28127bdd
YZ
1650 } else {
1651 page = __page_cache_alloc(GFP_NOFS);
1652 if (!page) {
1653 err = -ENOMEM;
1654 goto out;
1655 }
1656 err = __ceph_do_getattr(inode, page,
1657 CEPH_STAT_CAP_INLINE_DATA, true);
1658 if (err < 0) {
1659 /* no inline data */
1660 if (err == -ENODATA)
1661 err = 0;
1662 goto out;
1663 }
1664 len = err;
1665 }
1666
1667 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
1668 ceph_vino(inode), 0, &len, 0, 1,
54ea0046 1669 CEPH_OSD_OP_CREATE, CEPH_OSD_FLAG_WRITE,
34b759b4 1670 NULL, 0, 0, false);
28127bdd
YZ
1671 if (IS_ERR(req)) {
1672 err = PTR_ERR(req);
1673 goto out;
1674 }
1675
fac02ddf 1676 req->r_mtime = inode->i_mtime;
28127bdd
YZ
1677 err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
1678 if (!err)
1679 err = ceph_osdc_wait_request(&fsc->client->osdc, req);
1680 ceph_osdc_put_request(req);
1681 if (err < 0)
1682 goto out;
1683
	req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
				    ceph_vino(inode), 0, &len, 1, 3,
				    CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
				    NULL, ci->i_truncate_seq,
				    ci->i_truncate_size, false);
	if (IS_ERR(req)) {
		err = PTR_ERR(req);
		goto out;
	}

	osd_req_op_extent_osd_data_pages(req, 1, &page, len, 0, false, false);

	{
		__le64 xattr_buf = cpu_to_le64(inline_version);
		err = osd_req_op_xattr_init(req, 0, CEPH_OSD_OP_CMPXATTR,
					    "inline_version", &xattr_buf,
					    sizeof(xattr_buf),
					    CEPH_OSD_CMPXATTR_OP_GT,
					    CEPH_OSD_CMPXATTR_MODE_U64);
		if (err)
			goto out_put;
	}

	{
		char xattr_buf[32];
		int xattr_len = snprintf(xattr_buf, sizeof(xattr_buf),
					 "%llu", inline_version);
		err = osd_req_op_xattr_init(req, 2, CEPH_OSD_OP_SETXATTR,
					    "inline_version",
					    xattr_buf, xattr_len, 0, 0);
		if (err)
			goto out_put;
	}

	req->r_mtime = inode->i_mtime;
	err = ceph_osdc_start_request(&fsc->client->osdc, req, false);
	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, req);

	ceph_update_write_latency(&fsc->mdsc->metric, req->r_start_latency,
				  req->r_end_latency, err);

out_put:
	ceph_osdc_put_request(req);
	if (err == -ECANCELED)
		err = 0;
out:
	if (page && page != locked_page) {
		if (from_pagecache) {
			unlock_page(page);
			put_page(page);
		} else
			__free_pages(page, 0);
	}

	dout("uninline_data %p %llx.%llx inline_version %llu = %d\n",
	     inode, ceph_vinop(inode), inline_version, err);
	return err;
}

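/*
 * mmap() support: page faults are served via ceph_filemap_fault() and
 * writable-page notifications via ceph_page_mkwrite().  ceph_mmap()
 * refuses the mapping when the address_space has no ->readpage to
 * back faults with.
 */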
static const struct vm_operations_struct ceph_vmops = {
	.fault = ceph_filemap_fault,
	.page_mkwrite = ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	return 0;
}

enum {
	POOL_READ = 1,
	POOL_WRITE = 2,
};

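/*
 * Look up (or probe and cache) this client's access rights to the
 * given data pool and namespace.  Results are kept in an rb-tree
 * keyed by pool id and namespace, protected by mdsc->pool_perm_rwsem.
 */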
static int __ceph_pool_perm_get(struct ceph_inode_info *ci,
				s64 pool, struct ceph_string *pool_ns)
{
	struct ceph_fs_client *fsc = ceph_inode_to_client(&ci->vfs_inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	struct ceph_osd_request *rd_req = NULL, *wr_req = NULL;
	struct rb_node **p, *parent;
	struct ceph_pool_perm *perm;
	struct page **pages;
	size_t pool_ns_len;
	int err = 0, err2 = 0, have = 0;

	down_read(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	while (*p) {
		perm = rb_entry(*p, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	up_read(&mdsc->pool_perm_rwsem);
	if (*p)
		goto out;

	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s no perm cached\n",
		     pool, (int)pool_ns->len, pool_ns->str);
	else
		dout("__ceph_pool_perm_get pool %lld no perm cached\n", pool);

	down_write(&mdsc->pool_perm_rwsem);
	p = &mdsc->pool_perm_tree.rb_node;
	parent = NULL;
	while (*p) {
		parent = *p;
		perm = rb_entry(parent, struct ceph_pool_perm, node);
		if (pool < perm->pool)
			p = &(*p)->rb_left;
		else if (pool > perm->pool)
			p = &(*p)->rb_right;
		else {
			int ret = ceph_compare_string(pool_ns,
						      perm->pool_ns,
						      perm->pool_ns_len);
			if (ret < 0)
				p = &(*p)->rb_left;
			else if (ret > 0)
				p = &(*p)->rb_right;
			else {
				have = perm->perm;
				break;
			}
		}
	}
	if (*p) {
		up_write(&mdsc->pool_perm_rwsem);
		goto out;
	}

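	/*
	 * No cached entry: probe the pool by issuing a STAT read and an
	 * exclusive CREATE write against the inode's first object.
	 * -ENOENT still proves read access and -EEXIST still proves
	 * write access; only -EPERM means the capability is missing.
	 */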
	rd_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!rd_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	rd_req->r_flags = CEPH_OSD_FLAG_READ;
	osd_req_op_init(rd_req, 0, CEPH_OSD_OP_STAT, 0);
	rd_req->r_base_oloc.pool = pool;
	if (pool_ns)
		rd_req->r_base_oloc.pool_ns = ceph_get_string(pool_ns);
	ceph_oid_printf(&rd_req->r_base_oid, "%llx.00000000", ci->i_vino.ino);

	err = ceph_osdc_alloc_messages(rd_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	wr_req = ceph_osdc_alloc_request(&fsc->client->osdc, NULL,
					 1, false, GFP_NOFS);
	if (!wr_req) {
		err = -ENOMEM;
		goto out_unlock;
	}

	wr_req->r_flags = CEPH_OSD_FLAG_WRITE;
	osd_req_op_init(wr_req, 0, CEPH_OSD_OP_CREATE, CEPH_OSD_OP_FLAG_EXCL);
	ceph_oloc_copy(&wr_req->r_base_oloc, &rd_req->r_base_oloc);
	ceph_oid_copy(&wr_req->r_base_oid, &rd_req->r_base_oid);

	err = ceph_osdc_alloc_messages(wr_req, GFP_NOFS);
	if (err)
		goto out_unlock;

	/* one page should be large enough for STAT data */
	pages = ceph_alloc_page_vector(1, GFP_KERNEL);
	if (IS_ERR(pages)) {
		err = PTR_ERR(pages);
		goto out_unlock;
	}

	osd_req_op_raw_data_in_pages(rd_req, 0, pages, PAGE_SIZE,
				     0, false, true);
	err = ceph_osdc_start_request(&fsc->client->osdc, rd_req, false);

	wr_req->r_mtime = ci->vfs_inode.i_mtime;
	err2 = ceph_osdc_start_request(&fsc->client->osdc, wr_req, false);

	if (!err)
		err = ceph_osdc_wait_request(&fsc->client->osdc, rd_req);
	if (!err2)
		err2 = ceph_osdc_wait_request(&fsc->client->osdc, wr_req);

	if (err >= 0 || err == -ENOENT)
		have |= POOL_READ;
	else if (err != -EPERM) {
		if (err == -EBLOCKLISTED)
			fsc->blocklisted = true;
		goto out_unlock;
	}

	if (err2 == 0 || err2 == -EEXIST)
		have |= POOL_WRITE;
	else if (err2 != -EPERM) {
		if (err2 == -EBLOCKLISTED)
			fsc->blocklisted = true;
		err = err2;
		goto out_unlock;
	}

	pool_ns_len = pool_ns ? pool_ns->len : 0;
	perm = kmalloc(sizeof(*perm) + pool_ns_len + 1, GFP_NOFS);
	if (!perm) {
		err = -ENOMEM;
		goto out_unlock;
	}

	perm->pool = pool;
	perm->perm = have;
	perm->pool_ns_len = pool_ns_len;
	if (pool_ns_len > 0)
		memcpy(perm->pool_ns, pool_ns->str, pool_ns_len);
	perm->pool_ns[pool_ns_len] = 0;

	rb_link_node(&perm->node, parent, p);
	rb_insert_color(&perm->node, &mdsc->pool_perm_tree);
	err = 0;
out_unlock:
	up_write(&mdsc->pool_perm_rwsem);

	ceph_osdc_put_request(rd_req);
	ceph_osdc_put_request(wr_req);
out:
	if (!err)
		err = have;
	if (pool_ns)
		dout("__ceph_pool_perm_get pool %lld ns %.*s result = %d\n",
		     pool, (int)pool_ns->len, pool_ns->str, err);
	else
		dout("__ceph_pool_perm_get pool %lld result = %d\n", pool, err);
	return err;
}

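/*
 * Check whether this inode's layout allows the access in @need
 * (CEPH_CAP_FILE_RD and/or CEPH_CAP_FILE_WR), consulting the cached
 * CEPH_I_POOL_* flags first and falling back to __ceph_pool_perm_get().
 */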
int ceph_pool_perm_check(struct inode *inode, int need)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_string *pool_ns;
	s64 pool;
	int ret, flags;

	if (ci->i_vino.snap != CEPH_NOSNAP) {
		/*
		 * Pool permission check needs to write to the first object.
		 * But for a snapshot, the head of the first object may have
		 * already been deleted. Skip the check to avoid creating an
		 * orphan object.
		 */
		return 0;
	}

	if (ceph_test_mount_opt(ceph_inode_to_client(inode),
				NOPOOLPERM))
		return 0;

	spin_lock(&ci->i_ceph_lock);
	flags = ci->i_ceph_flags;
	pool = ci->i_layout.pool_id;
	spin_unlock(&ci->i_ceph_lock);
check:
	if (flags & CEPH_I_POOL_PERM) {
		if ((need & CEPH_CAP_FILE_RD) && !(flags & CEPH_I_POOL_RD)) {
			dout("ceph_pool_perm_check pool %lld no read perm\n",
			     pool);
			return -EPERM;
		}
		if ((need & CEPH_CAP_FILE_WR) && !(flags & CEPH_I_POOL_WR)) {
			dout("ceph_pool_perm_check pool %lld no write perm\n",
			     pool);
			return -EPERM;
		}
		return 0;
	}

	pool_ns = ceph_try_get_string(ci->i_layout.pool_ns);
	ret = __ceph_pool_perm_get(ci, pool, pool_ns);
	ceph_put_string(pool_ns);
	if (ret < 0)
		return ret;

	flags = CEPH_I_POOL_PERM;
	if (ret & POOL_READ)
		flags |= CEPH_I_POOL_RD;
	if (ret & POOL_WRITE)
		flags |= CEPH_I_POOL_WR;

	spin_lock(&ci->i_ceph_lock);
	if (pool == ci->i_layout.pool_id &&
	    pool_ns == rcu_dereference_raw(ci->i_layout.pool_ns)) {
		ci->i_ceph_flags |= flags;
	} else {
		pool = ci->i_layout.pool_id;
		flags = ci->i_ceph_flags;
	}
	spin_unlock(&ci->i_ceph_lock);
	goto check;
}

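/*
 * Drop all cached pool permission entries, e.g. when the mds client
 * is being torn down.
 */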
void ceph_pool_perm_destroy(struct ceph_mds_client *mdsc)
{
	struct ceph_pool_perm *perm;
	struct rb_node *n;

	while (!RB_EMPTY_ROOT(&mdsc->pool_perm_tree)) {
		n = rb_first(&mdsc->pool_perm_tree);
		perm = rb_entry(n, struct ceph_pool_perm, node);
		rb_erase(n, &mdsc->pool_perm_tree);
		kfree(perm);
	}
}