#include <linux/ceph/ceph_debug.h>

#include <linux/backing-dev.h>
#include <linux/fs.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/writeback.h>	/* generic_writepages */
#include <linux/slab.h>
#include <linux/pagevec.h>
#include <linux/task_io_accounting_ops.h>

#include "super.h"
#include "mds_client.h"
#include <linux/ceph/osd_client.h>

/*
 * Ceph address space ops.
 *
 * There are a few funny things going on here.
 *
 * The page->private field is used to reference a struct
 * ceph_snap_context for _every_ dirty page.  This indicates which
 * snapshot the page was logically dirtied in, and thus which snap
 * context needs to be associated with the osd write during writeback.
 *
 * Similarly, struct ceph_inode_info maintains a set of counters to
 * count dirty pages on the inode.  In the absence of snapshots,
 * i_wrbuffer_ref == i_wrbuffer_ref_head == the dirty page count.
 *
 * When a snapshot is taken (that is, when the client receives
 * notification that a snapshot was taken), each inode with caps and
 * with dirty pages (dirty pages implies there is a cap) gets a new
 * ceph_cap_snap in the i_cap_snaps list (which is sorted in ascending
 * order, new snaps go to the tail).  The i_wrbuffer_ref_head count is
 * moved to capsnap->dirty. (Unless a sync write is currently in
 * progress.  In that case, the capsnap is said to be "pending", new
 * writes cannot start, and the capsnap isn't "finalized" until the
 * write completes (or fails) and a final size/mtime for the inode for
 * that snap can be settled upon.)  i_wrbuffer_ref_head is reset to 0.
 *
 * On writeback, we must submit writes to the osd IN SNAP ORDER.  So,
 * we look for the first capsnap in i_cap_snaps and write out pages in
 * that snap context _only_.  Then we move on to the next capsnap,
 * eventually reaching the "live" or "head" context (i.e., pages that
 * are not yet snapped), which holds the most recently dirtied pages.
 *
 * Invalidate and so forth must take care to ensure the dirty page
 * accounting is preserved.
 */

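/*
 * Write congestion thresholds: mark the bdi congested once the number
 * of pages we have under writeback exceeds the mount's congestion_kb
 * (converted from KB to pages), and only clear it again once we drop
 * back below roughly three quarters of that, so the flag doesn't flap.
 */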
#define CONGESTION_ON_THRESH(congestion_kb) (congestion_kb >> (PAGE_SHIFT-10))
#define CONGESTION_OFF_THRESH(congestion_kb)				\
	(CONGESTION_ON_THRESH(congestion_kb) -				\
	 (CONGESTION_ON_THRESH(congestion_kb) >> 2))



/*
 * Dirty a page.  Optimistically adjust accounting, on the assumption
 * that we won't race with invalidate.  If we do, readjust.
 */
static int ceph_set_page_dirty(struct page *page)
{
	struct address_space *mapping = page->mapping;
	struct inode *inode;
	struct ceph_inode_info *ci;
	int undo = 0;
	struct ceph_snap_context *snapc;

	if (unlikely(!mapping))
		return !TestSetPageDirty(page);

	if (TestSetPageDirty(page)) {
		dout("%p set_page_dirty %p idx %lu -- already dirty\n",
		     mapping->host, page, page->index);
		return 0;
	}

	inode = mapping->host;
	ci = ceph_inode(inode);

	/*
	 * Note that we're grabbing a snapc ref here without holding
	 * any locks!
	 */
	snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);

	/* dirty the head */
	spin_lock(&ci->i_ceph_lock);
	if (ci->i_head_snapc == NULL)
		ci->i_head_snapc = ceph_get_snap_context(snapc);
	++ci->i_wrbuffer_ref_head;
	if (ci->i_wrbuffer_ref == 0)
		ihold(inode);
	++ci->i_wrbuffer_ref;
	dout("%p set_page_dirty %p idx %lu head %d/%d -> %d/%d "
	     "snapc %p seq %lld (%d snaps)\n",
	     mapping->host, page, page->index,
	     ci->i_wrbuffer_ref-1, ci->i_wrbuffer_ref_head-1,
	     ci->i_wrbuffer_ref, ci->i_wrbuffer_ref_head,
	     snapc, snapc->seq, snapc->num_snaps);
	spin_unlock(&ci->i_ceph_lock);

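	/*
	 * The rest essentially open-codes __set_page_dirty_nobuffers():
	 * tag the page dirty in the radix tree under tree_lock, watching
	 * for a racing truncate, and additionally stash the snap context
	 * reference in page->private so writeback knows which snapc this
	 * page belongs to.
	 */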
	/* now adjust page */
	spin_lock_irq(&mapping->tree_lock);
	if (page->mapping) {	/* Race with truncate? */
		WARN_ON_ONCE(!PageUptodate(page));
		account_page_dirtied(page, page->mapping);
		radix_tree_tag_set(&mapping->page_tree,
				page_index(page), PAGECACHE_TAG_DIRTY);

		/*
		 * Reference snap context in page->private.  Also set
		 * PagePrivate so that we get invalidatepage callback.
		 */
		page->private = (unsigned long)snapc;
		SetPagePrivate(page);
	} else {
		dout("ANON set_page_dirty %p (raced truncate?)\n", page);
		undo = 1;
	}

	spin_unlock_irq(&mapping->tree_lock);

	if (undo)
		/* whoops, we failed to dirty the page */
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);

	__mark_inode_dirty(mapping->host, I_DIRTY_PAGES);

	BUG_ON(!PageDirty(page));
	return 1;
}

/*
 * If we are truncating the full page (i.e. offset == 0), adjust the
 * dirty page counters appropriately.  Only called if there is private
 * data on the page.
 */
static void ceph_invalidatepage(struct page *page, unsigned long offset)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_snap_context *snapc = (void *)page->private;

	BUG_ON(!PageLocked(page));
	BUG_ON(!page->private);
	BUG_ON(!PagePrivate(page));
	BUG_ON(!page->mapping);

	inode = page->mapping->host;

	/*
	 * We can get non-dirty pages here due to races between
	 * set_page_dirty and truncate_complete_page; just spit out a
	 * warning, in case we end up with accounting problems later.
	 */
	if (!PageDirty(page))
		pr_err("%p invalidatepage %p page not dirty\n", inode, page);

	if (offset == 0)
		ClearPageChecked(page);

	ci = ceph_inode(inode);
	if (offset == 0) {
		dout("%p invalidatepage %p idx %lu full dirty page %lu\n",
		     inode, page, page->index, offset);
		ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
		ceph_put_snap_context(snapc);
		page->private = 0;
		ClearPagePrivate(page);
	} else {
		dout("%p invalidatepage %p idx %lu partial dirty page\n",
		     inode, page, page->index);
	}
}

/* just a sanity check */
static int ceph_releasepage(struct page *page, gfp_t g)
{
	struct inode *inode = page->mapping ? page->mapping->host : NULL;
	dout("%p releasepage %p idx %lu\n", inode, page, page->index);
	WARN_ON(PageDirty(page));
	WARN_ON(page->private);
	WARN_ON(PagePrivate(page));
	return 0;
}

/*
 * read a single page, without unlocking it.
 */
static int readpage_nounlock(struct file *filp, struct page *page)
{
	struct inode *inode = filp->f_dentry->d_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	int err = 0;
	u64 len = PAGE_CACHE_SIZE;

	dout("readpage inode %p file %p page %p index %lu\n",
	     inode, filp, page, page->index);
	err = ceph_osdc_readpages(osdc, ceph_vino(inode), &ci->i_layout,
				  (u64)page->index << PAGE_CACHE_SHIFT, &len,
				  ci->i_truncate_seq, ci->i_truncate_size,
				  &page, 1, 0);
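	/* a nonexistent object just means a hole; treat the range as zeros */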
	if (err == -ENOENT)
		err = 0;
	if (err < 0) {
		SetPageError(page);
		goto out;
	} else if (err < PAGE_CACHE_SIZE) {
		/* zero fill remainder of page */
		zero_user_segment(page, err, PAGE_CACHE_SIZE);
	}
	SetPageUptodate(page);

out:
	return err < 0 ? err : 0;
}

static int ceph_readpage(struct file *filp, struct page *page)
{
	int r = readpage_nounlock(filp, page);
	unlock_page(page);
	return r;
}

/*
 * Finish an async read(ahead) op.
 */
static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
{
	struct inode *inode = req->r_inode;
	struct ceph_osd_reply_head *replyhead;
	int rc, bytes;
	int i;

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	rc = le32_to_cpu(replyhead->result);
	bytes = le32_to_cpu(msg->hdr.data_len);

	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

	/* unlock all pages, zeroing any data we didn't read */
	for (i = 0; i < req->r_num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
		struct page *page = req->r_pages[i];

		if (bytes < (int)PAGE_CACHE_SIZE) {
			/* zero (remainder of) page */
			int s = bytes < 0 ? 0 : bytes;
			zero_user_segment(page, s, PAGE_CACHE_SIZE);
		}
		dout("finish_read %p uptodate %p idx %lu\n", inode, page,
		     page->index);
		flush_dcache_page(page);
		SetPageUptodate(page);
		unlock_page(page);
		page_cache_release(page);
	}
	kfree(req->r_pages);
}

/*
 * start an async read(ahead) operation.  return nr_pages we submitted
 * a read for on success, or negative error code.
 */
static int start_read(struct inode *inode, struct list_head *page_list, int max)
{
	struct ceph_osd_client *osdc =
		&ceph_inode_to_client(inode)->client->osdc;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct page *page = list_entry(page_list->prev, struct page, lru);
	struct ceph_osd_request *req;
	u64 off;
	u64 len;
	int i;
	struct page **pages;
	pgoff_t next_index;
	int nr_pages = 0;
	int ret;

	off = (u64)page->index << PAGE_CACHE_SHIFT;

	/* count pages */
	next_index = page->index;
	list_for_each_entry_reverse(page, page_list, lru) {
		if (page->index != next_index)
			break;
		nr_pages++;
		next_index++;
		if (max && nr_pages == max)
			break;
	}
	len = nr_pages << PAGE_CACHE_SHIFT;
	dout("start_read %p nr_pages %d is %lld~%lld\n", inode, nr_pages,
	     off, len);

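	/*
	 * Build the OSD read.  Note that ceph_osdc_new_request() may trim
	 * len down to the striping/object boundary, in which case we only
	 * add (and pull off page_list) the pages that actually fit in
	 * this request.
	 */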
	req = ceph_osdc_new_request(osdc, &ci->i_layout, ceph_vino(inode),
				    off, &len,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0,
				    ci->i_truncate_seq, ci->i_truncate_size,
				    NULL, false, 1, 0);
	if (!req)
		return -ENOMEM;

	/* build page vector */
	nr_pages = len >> PAGE_CACHE_SHIFT;
	pages = kmalloc(sizeof(*pages) * nr_pages, GFP_NOFS);
	ret = -ENOMEM;
	if (!pages)
		goto out;
	for (i = 0; i < nr_pages; ++i) {
		page = list_entry(page_list->prev, struct page, lru);
		BUG_ON(PageLocked(page));
		list_del(&page->lru);

		dout("start_read %p adding %p idx %lu\n", inode, page,
		     page->index);
		if (add_to_page_cache_lru(page, &inode->i_data, page->index,
					  GFP_NOFS)) {
			page_cache_release(page);
			dout("start_read %p add_to_page_cache failed %p\n",
			     inode, page);
			nr_pages = i;
			goto out_pages;
		}
		pages[i] = page;
	}
	req->r_pages = pages;
	req->r_num_pages = nr_pages;
	req->r_callback = finish_read;
	req->r_inode = inode;

	dout("start_read %p starting %p %lld~%lld\n", inode, req, off, len);
	ret = ceph_osdc_start_request(osdc, req, false);
	if (ret < 0)
		goto out_pages;
	ceph_osdc_put_request(req);
	return nr_pages;

out_pages:
	ceph_release_page_vector(pages, nr_pages);
out:
	ceph_osdc_put_request(req);
	return ret;
}


/*
 * Read multiple pages.  Leave pages we don't read + unlock in page_list;
 * the caller (VM) cleans them up.
 */
static int ceph_readpages(struct file *file, struct address_space *mapping,
			  struct list_head *page_list, unsigned nr_pages)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	int rc = 0;
	int max = 0;

	if (fsc->mount_options->rsize >= PAGE_CACHE_SIZE)
		max = (fsc->mount_options->rsize + PAGE_CACHE_SIZE - 1)
			>> PAGE_SHIFT;

	dout("readpages %p file %p nr_pages %d max %d\n", inode, file, nr_pages,
	     max);
	while (!list_empty(page_list)) {
		rc = start_read(inode, page_list, max);
		if (rc < 0)
			goto out;
		BUG_ON(rc == 0);
	}
out:
	dout("readpages %p file %p ret %d\n", inode, file, rc);
	return rc;
}

/*
 * Get ref for the oldest snapc for an inode with dirty data... that is, the
 * only snap context we are allowed to write back.
 */
static struct ceph_snap_context *get_oldest_context(struct inode *inode,
						    u64 *snap_size)
{
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_snap_context *snapc = NULL;
	struct ceph_cap_snap *capsnap = NULL;

	spin_lock(&ci->i_ceph_lock);
	list_for_each_entry(capsnap, &ci->i_cap_snaps, ci_item) {
		dout(" cap_snap %p snapc %p has %d dirty pages\n", capsnap,
		     capsnap->context, capsnap->dirty_pages);
		if (capsnap->dirty_pages) {
			snapc = ceph_get_snap_context(capsnap->context);
			if (snap_size)
				*snap_size = capsnap->size;
			break;
		}
	}
	if (!snapc && ci->i_wrbuffer_ref_head) {
		snapc = ceph_get_snap_context(ci->i_head_snapc);
		dout(" head snapc %p has %d dirty pages\n",
		     snapc, ci->i_wrbuffer_ref_head);
	}
	spin_unlock(&ci->i_ceph_lock);
	return snapc;
}

/*
 * Write a single page, but leave the page locked.
 *
 * If we get a write error, set the page error bit, but still adjust the
 * dirty page accounting (i.e., page is no longer dirty).
 */
static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
{
	struct inode *inode;
	struct ceph_inode_info *ci;
	struct ceph_fs_client *fsc;
	struct ceph_osd_client *osdc;
	loff_t page_off = (u64)page->index << PAGE_CACHE_SHIFT;
	int len = PAGE_CACHE_SIZE;
	loff_t i_size;
	int err = 0;
	struct ceph_snap_context *snapc, *oldest;
	u64 snap_size = 0;
	long writeback_stat;

	dout("writepage %p idx %lu\n", page, page->index);

	if (!page->mapping || !page->mapping->host) {
		dout("writepage %p - no mapping\n", page);
		return -EFAULT;
	}
	inode = page->mapping->host;
	ci = ceph_inode(inode);
	fsc = ceph_inode_to_client(inode);
	osdc = &fsc->client->osdc;

	/* verify this is a writeable snap context */
	snapc = (void *)page->private;
	if (snapc == NULL) {
		dout("writepage %p page %p not dirty?\n", inode, page);
		goto out;
	}
	oldest = get_oldest_context(inode, &snap_size);
	if (snapc->seq > oldest->seq) {
		dout("writepage %p page %p snapc %p not writeable - noop\n",
		     inode, page, (void *)page->private);
		/* we should only noop if called by kswapd */
		WARN_ON((current->flags & PF_MEMALLOC) == 0);
		ceph_put_snap_context(oldest);
		goto out;
	}
	ceph_put_snap_context(oldest);

	/* is this a partial page at end of file? */
	if (snap_size)
		i_size = snap_size;
	else
		i_size = i_size_read(inode);
	if (i_size < page_off + len)
		len = i_size - page_off;

	dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
	     inode, page, page->index, page_off, len, snapc);

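	/*
	 * Account this page against the global writeback counter and mark
	 * the bdi congested once we cross the congestion threshold, so the
	 * VM throttles further writeback against this filesystem.
	 */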
	writeback_stat = atomic_long_inc_return(&fsc->writeback_count);
	if (writeback_stat >
	    CONGESTION_ON_THRESH(fsc->mount_options->congestion_kb))
		set_bdi_congested(&fsc->backing_dev_info, BLK_RW_ASYNC);

	set_page_writeback(page);
	err = ceph_osdc_writepages(osdc, ceph_vino(inode),
				   &ci->i_layout, snapc,
				   page_off, len,
				   ci->i_truncate_seq, ci->i_truncate_size,
				   &inode->i_mtime,
				   &page, 1, 0, 0, true);
	if (err < 0) {
		dout("writepage setting page/mapping error %d %p\n", err, page);
		SetPageError(page);
		mapping_set_error(&inode->i_data, err);
		if (wbc)
			wbc->pages_skipped++;
	} else {
		dout("writepage cleaned page %p\n", page);
		err = 0;  /* vfs expects us to return 0 */
	}
	page->private = 0;
	ClearPagePrivate(page);
	end_page_writeback(page);
	ceph_put_wrbuffer_cap_refs(ci, 1, snapc);
	ceph_put_snap_context(snapc);  /* page's reference */
out:
	return err;
}

static int ceph_writepage(struct page *page, struct writeback_control *wbc)
{
	int err;
	struct inode *inode = page->mapping->host;
	BUG_ON(!inode);
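	/* pin the inode so it can't go away while we write the page out */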
	ihold(inode);
	err = writepage_nounlock(page, wbc);
	unlock_page(page);
	iput(inode);
	return err;
}


/*
 * lame release_pages helper.  release_pages() isn't exported to
 * modules.
 */
static void ceph_release_pages(struct page **pages, int num)
{
	struct pagevec pvec;
	int i;

	pagevec_init(&pvec, 0);
	for (i = 0; i < num; i++) {
		if (pagevec_add(&pvec, pages[i]) == 0)
			pagevec_release(&pvec);
	}
	pagevec_release(&pvec);
}


/*
 * async writeback completion handler.
 *
 * If we get an error, set the mapping error bit, but not the individual
 * page error bits.
 */
static void writepages_finish(struct ceph_osd_request *req,
			      struct ceph_msg *msg)
{
	struct inode *inode = req->r_inode;
	struct ceph_osd_reply_head *replyhead;
	struct ceph_osd_op *op;
	struct ceph_inode_info *ci = ceph_inode(inode);
	unsigned wrote;
	struct page *page;
	int i;
	struct ceph_snap_context *snapc = req->r_snapc;
	struct address_space *mapping = inode->i_mapping;
	__s32 rc = -EIO;
	u64 bytes = 0;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	long writeback_stat;
	unsigned issued = ceph_caps_issued(ci);

	/* parse reply */
	replyhead = msg->front.iov_base;
	WARN_ON(le32_to_cpu(replyhead->num_ops) == 0);
	op = (void *)(replyhead + 1);
	rc = le32_to_cpu(replyhead->result);
	bytes = le64_to_cpu(op->extent.length);

	if (rc >= 0) {
		/*
		 * Assume we wrote the pages we originally sent.  The
		 * osd might reply with fewer pages if our writeback
		 * raced with a truncation and was adjusted at the osd,
		 * so don't believe the reply.
		 */
		wrote = req->r_num_pages;
	} else {
		wrote = 0;
		mapping_set_error(mapping, rc);
	}
	dout("writepages_finish %p rc %d bytes %llu wrote %d (pages)\n",
	     inode, rc, bytes, wrote);

	/* clean all pages */
	for (i = 0; i < req->r_num_pages; i++) {
		page = req->r_pages[i];
		BUG_ON(!page);
		WARN_ON(!PageUptodate(page));

		writeback_stat =
			atomic_long_dec_return(&fsc->writeback_count);
		if (writeback_stat <
		    CONGESTION_OFF_THRESH(fsc->mount_options->congestion_kb))
			clear_bdi_congested(&fsc->backing_dev_info,
					    BLK_RW_ASYNC);

		ceph_put_snap_context((void *)page->private);
		page->private = 0;
		ClearPagePrivate(page);
		dout("unlocking %d %p\n", i, page);
		end_page_writeback(page);

		/*
		 * We lost the cache cap, need to truncate the page before
		 * it is unlocked, otherwise we'd truncate it later in the
		 * page truncation thread, possibly losing some data that
		 * raced its way in
		 */
		if ((issued & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0)
			generic_error_remove_page(inode->i_mapping, page);

		unlock_page(page);
	}
	dout("%p wrote+cleaned %d pages\n", inode, wrote);
	ceph_put_wrbuffer_cap_refs(ci, req->r_num_pages, snapc);

	ceph_release_pages(req->r_pages, req->r_num_pages);
	if (req->r_pages_from_pool)
		mempool_free(req->r_pages,
			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
	else
		kfree(req->r_pages);
	ceph_osdc_put_request(req);
}

/*
 * allocate a page vec, either directly, or if necessary, via the
 * mempool.  we avoid the mempool if we can because req->r_num_pages
 * may be less than the maximum write size.
 */
static void alloc_page_vec(struct ceph_fs_client *fsc,
			   struct ceph_osd_request *req)
{
	req->r_pages = kmalloc(sizeof(struct page *) * req->r_num_pages,
			       GFP_NOFS);
	if (!req->r_pages) {
		req->r_pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
		req->r_pages_from_pool = 1;
		WARN_ON(!req->r_pages);
	}
}

/*
 * initiate async writeback
 */
static int ceph_writepages_start(struct address_space *mapping,
				 struct writeback_control *wbc)
{
	struct inode *inode = mapping->host;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_fs_client *fsc;
	pgoff_t index, start, end;
	int range_whole = 0;
	int should_loop = 1;
	pgoff_t max_pages = 0, max_pages_ever = 0;
	struct ceph_snap_context *snapc = NULL, *last_snapc = NULL, *pgsnapc;
	struct pagevec pvec;
	int done = 0;
	int rc = 0;
	unsigned wsize = 1 << inode->i_blkbits;
	struct ceph_osd_request *req = NULL;
	int do_sync;
	u64 snap_size = 0;

	/*
	 * Include a 'sync' in the OSD request if this is a data
	 * integrity write (e.g., O_SYNC write or fsync()), or if our
	 * cap is being revoked.
	 */
	do_sync = wbc->sync_mode == WB_SYNC_ALL;
	if (ceph_caps_revoking(ci, CEPH_CAP_FILE_BUFFER))
		do_sync = 1;
	dout("writepages_start %p dosync=%d (mode=%s)\n",
	     inode, do_sync,
	     wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
	     (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));

	fsc = ceph_inode_to_client(inode);
	if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
		pr_warning("writepage_start %p on forced umount\n", inode);
		return -EIO; /* we're in a forced umount, don't write! */
	}
	if (fsc->mount_options->wsize && fsc->mount_options->wsize < wsize)
		wsize = fsc->mount_options->wsize;
	if (wsize < PAGE_CACHE_SIZE)
		wsize = PAGE_CACHE_SIZE;
	max_pages_ever = wsize >> PAGE_CACHE_SHIFT;

	pagevec_init(&pvec, 0);

	/* where to start/end? */
	if (wbc->range_cyclic) {
		start = mapping->writeback_index; /* Start from prev offset */
		end = -1;
		dout(" cyclic, start at %lu\n", start);
	} else {
		start = wbc->range_start >> PAGE_CACHE_SHIFT;
		end = wbc->range_end >> PAGE_CACHE_SHIFT;
		if (wbc->range_start == 0 && wbc->range_end == LLONG_MAX)
			range_whole = 1;
		should_loop = 0;
		dout(" not cyclic, %lu to %lu\n", start, end);
	}
	index = start;

retry:
	/* find oldest snap context with dirty data */
	ceph_put_snap_context(snapc);
	snapc = get_oldest_context(inode, &snap_size);
	if (!snapc) {
		/* hmm, why does writepages get called when there
		   is no dirty data? */
		dout(" no snap context with dirty data?\n");
		goto out;
	}
	dout(" oldest snapc is %p seq %lld (%d snaps)\n",
	     snapc, snapc->seq, snapc->num_snaps);
	if (last_snapc && snapc != last_snapc) {
		/* if we switched to a newer snapc, restart our scan at the
		 * start of the original file range. */
		dout("  snapc differs from last pass, restarting at %lu\n",
		     index);
		index = start;
	}
	last_snapc = snapc;

	while (!done && index <= end) {
		unsigned i;
		int first;
		pgoff_t next;
		int pvec_pages, locked_pages;
		struct page *page;
		int want;
		u64 offset, len;
		struct ceph_osd_request_head *reqhead;
		struct ceph_osd_op *op;
		long writeback_stat;

		next = 0;
		locked_pages = 0;
		max_pages = max_pages_ever;

get_more_pages:
		first = -1;
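		/*
		 * Ask for at most a pagevec's worth of dirty pages, capped
		 * by how many more pages this request can take and by the
		 * end of the requested writeback range.
		 */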
		want = min(end - index,
			   min((pgoff_t)PAGEVEC_SIZE,
			       max_pages - (pgoff_t)locked_pages) - 1)
			+ 1;
		pvec_pages = pagevec_lookup_tag(&pvec, mapping, &index,
						PAGECACHE_TAG_DIRTY,
						want);
		dout("pagevec_lookup_tag got %d\n", pvec_pages);
		if (!pvec_pages && !locked_pages)
			break;
		for (i = 0; i < pvec_pages && locked_pages < max_pages; i++) {
			page = pvec.pages[i];
			dout("? %p idx %lu\n", page, page->index);
			if (locked_pages == 0)
				lock_page(page);  /* first page */
			else if (!trylock_page(page))
				break;

			/* only dirty pages, or our accounting breaks */
			if (unlikely(!PageDirty(page)) ||
			    unlikely(page->mapping != mapping)) {
				dout("!dirty or !mapping %p\n", page);
				unlock_page(page);
				break;
			}
			if (!wbc->range_cyclic && page->index > end) {
				dout("end of range %p\n", page);
				done = 1;
				unlock_page(page);
				break;
			}
			if (next && (page->index != next)) {
				dout("not consecutive %p\n", page);
				unlock_page(page);
				break;
			}
			if (wbc->sync_mode != WB_SYNC_NONE) {
				dout("waiting on writeback %p\n", page);
				wait_on_page_writeback(page);
			}
			if ((snap_size && page_offset(page) > snap_size) ||
			    (!snap_size &&
			     page_offset(page) > i_size_read(inode))) {
				dout("%p page eof %llu\n", page, snap_size ?
				     snap_size : i_size_read(inode));
				done = 1;
				unlock_page(page);
				break;
			}
			if (PageWriteback(page)) {
				dout("%p under writeback\n", page);
				unlock_page(page);
				break;
			}

			/* only if matching snap context */
			pgsnapc = (void *)page->private;
			if (pgsnapc->seq > snapc->seq) {
				dout("page snapc %p %lld > oldest %p %lld\n",
				     pgsnapc, pgsnapc->seq, snapc, snapc->seq);
				unlock_page(page);
				if (!locked_pages)
					continue; /* keep looking for snap */
				break;
			}

			if (!clear_page_dirty_for_io(page)) {
				dout("%p !clear_page_dirty_for_io\n", page);
				unlock_page(page);
				break;
			}

			/* ok */
			if (locked_pages == 0) {
				/* prepare async write request */
				offset = (unsigned long long)page->index
					<< PAGE_CACHE_SHIFT;
				len = wsize;
				req = ceph_osdc_new_request(&fsc->client->osdc,
					    &ci->i_layout,
					    ceph_vino(inode),
					    offset, &len,
					    CEPH_OSD_OP_WRITE,
					    CEPH_OSD_FLAG_WRITE |
						    CEPH_OSD_FLAG_ONDISK,
					    snapc, do_sync,
					    ci->i_truncate_seq,
					    ci->i_truncate_size,
					    &inode->i_mtime, true, 1, 0);

				if (!req) {
					rc = -ENOMEM;
					unlock_page(page);
					break;
				}

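				/*
				 * ceph_osdc_new_request() may have trimmed
				 * len to an object boundary; don't gather
				 * more pages than this request can carry.
				 */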
				max_pages = req->r_num_pages;

				alloc_page_vec(fsc, req);
				req->r_callback = writepages_finish;
				req->r_inode = inode;
			}

			/* note position of first page in pvec */
			if (first < 0)
				first = i;
			dout("%p will write page %p idx %lu\n",
			     inode, page, page->index);

			writeback_stat =
			       atomic_long_inc_return(&fsc->writeback_count);
			if (writeback_stat > CONGESTION_ON_THRESH(
				    fsc->mount_options->congestion_kb)) {
				set_bdi_congested(&fsc->backing_dev_info,
						  BLK_RW_ASYNC);
			}

			set_page_writeback(page);
			req->r_pages[locked_pages] = page;
			locked_pages++;
			next = page->index + 1;
		}

		/* did we get anything? */
		if (!locked_pages)
			goto release_pvec_pages;
		if (i) {
			int j;
			BUG_ON(!locked_pages || first < 0);

			if (pvec_pages && i == pvec_pages &&
			    locked_pages < max_pages) {
				dout("reached end pvec, trying for more\n");
				pagevec_reinit(&pvec);
				goto get_more_pages;
			}

			/* shift unused pages over in the pvec...  we
			 * will need to release them below. */
			for (j = i; j < pvec_pages; j++) {
				dout(" pvec leftover page %p\n",
				     pvec.pages[j]);
				pvec.pages[j-i+first] = pvec.pages[j];
			}
			pvec.nr -= i-first;
		}

		/* submit the write */
		offset = (u64)req->r_pages[0]->index << PAGE_CACHE_SHIFT;
		len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
			  (u64)locked_pages << PAGE_CACHE_SHIFT);
		dout("writepages got %d pages at %llu~%llu\n",
		     locked_pages, offset, len);

		/* revise final length, page count */
		req->r_num_pages = locked_pages;
		reqhead = req->r_request->front.iov_base;
		op = (void *)(reqhead + 1);
		op->extent.length = cpu_to_le64(len);
		op->payload_len = cpu_to_le32(len);
		req->r_request->hdr.data_len = cpu_to_le32(len);

		rc = ceph_osdc_start_request(&fsc->client->osdc, req, true);
		BUG_ON(rc);
		req = NULL;

		/* continue? */
		index = next;
		wbc->nr_to_write -= locked_pages;
		if (wbc->nr_to_write <= 0)
			done = 1;

release_pvec_pages:
		dout("pagevec_release on %d pages (%p)\n", (int)pvec.nr,
		     pvec.nr ? pvec.pages[0] : NULL);
		pagevec_release(&pvec);

		if (locked_pages && !done)
			goto retry;
	}

	if (should_loop && !done) {
		/* more to do; loop back to beginning of file */
		dout("writepages looping back to beginning of file\n");
		should_loop = 0;
		index = 0;
		goto retry;
	}

	if (wbc->range_cyclic || (range_whole && wbc->nr_to_write > 0))
		mapping->writeback_index = index;

out:
	if (req)
		ceph_osdc_put_request(req);
	ceph_put_snap_context(snapc);
	dout("writepages done, rc = %d\n", rc);
	return rc;
}



/*
 * See if a given @snapc is either writeable, or already written.
 */
static int context_is_writeable_or_written(struct inode *inode,
					   struct ceph_snap_context *snapc)
{
	struct ceph_snap_context *oldest = get_oldest_context(inode, NULL);
	int ret = !oldest || snapc->seq <= oldest->seq;

	ceph_put_snap_context(oldest);
	return ret;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 *
 * called with page locked.
 * return success with page locked,
 * or any failure (incl -EAGAIN) with page unlocked.
 */
static int ceph_update_writeable_page(struct file *file,
			    loff_t pos, unsigned len,
			    struct page *page)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ceph_inode_info *ci = ceph_inode(inode);
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	loff_t page_off = pos & PAGE_CACHE_MASK;
	int pos_in_page = pos & ~PAGE_CACHE_MASK;
	int end_in_page = pos_in_page + len;
	loff_t i_size;
	int r;
	struct ceph_snap_context *snapc, *oldest;

retry_locked:
	/*
	 * writepages currently holds the page lock while writing, so a
	 * locked page shouldn't be under writeback; wait anyway in case
	 * that ever changes.
	 */
	wait_on_page_writeback(page);

	/* check snap context */
	BUG_ON(!ci->i_snap_realm);
	down_read(&mdsc->snap_rwsem);
	BUG_ON(!ci->i_snap_realm->cached_context);
	snapc = (void *)page->private;
	if (snapc && snapc != ci->i_head_snapc) {
		/*
		 * this page is already dirty in another (older) snap
		 * context!  is it writeable now?
		 */
		oldest = get_oldest_context(inode, NULL);
		up_read(&mdsc->snap_rwsem);

		if (snapc->seq > oldest->seq) {
			ceph_put_snap_context(oldest);
			dout(" page %p snapc %p not current or oldest\n",
			     page, snapc);
			/*
			 * queue for writeback, and wait for snapc to
			 * be writeable or written
			 */
			snapc = ceph_get_snap_context(snapc);
			unlock_page(page);
			ceph_queue_writeback(inode);
			r = wait_event_interruptible(ci->i_cap_wq,
			       context_is_writeable_or_written(inode, snapc));
			ceph_put_snap_context(snapc);
			if (r == -ERESTARTSYS)
				return r;
			return -EAGAIN;
		}
		ceph_put_snap_context(oldest);

		/* yay, writeable, do it now (without dropping page lock) */
		dout(" page %p snapc %p not current, but oldest\n",
		     page, snapc);
		if (!clear_page_dirty_for_io(page))
			goto retry_locked;
		r = writepage_nounlock(page, NULL);
		if (r < 0)
			goto fail_nosnap;
		goto retry_locked;
	}

	if (PageUptodate(page)) {
		dout(" page %p already uptodate\n", page);
		return 0;
	}

	/* full page? */
	if (pos_in_page == 0 && len == PAGE_CACHE_SIZE)
		return 0;

	/* past end of file? */
	i_size = inode->i_size;   /* caller holds i_mutex */

	if (i_size + len > inode->i_sb->s_maxbytes) {
		/* file is too big */
		r = -EINVAL;
		goto fail;
	}

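	/*
	 * If the page lies entirely beyond EOF, or the write starts at the
	 * beginning of the page and runs to (or past) EOF, there is no
	 * existing data we need to preserve: zero whatever the write won't
	 * cover rather than reading the page in.
	 */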
	if (page_off >= i_size ||
	    (pos_in_page == 0 && (pos+len) >= i_size &&
	     end_in_page - pos_in_page != PAGE_CACHE_SIZE)) {
		dout(" zeroing %p 0 - %d and %d - %d\n",
		     page, pos_in_page, end_in_page, (int)PAGE_CACHE_SIZE);
		zero_user_segments(page,
				   0, pos_in_page,
				   end_in_page, PAGE_CACHE_SIZE);
		return 0;
	}

	/* we need to read it. */
	up_read(&mdsc->snap_rwsem);
	r = readpage_nounlock(file, page);
	if (r < 0)
		goto fail_nosnap;
	goto retry_locked;

fail:
	up_read(&mdsc->snap_rwsem);
fail_nosnap:
	unlock_page(page);
	return r;
}

/*
 * We are only allowed to write into/dirty the page if the page is
 * clean, or already dirty within the same snap context.
 */
static int ceph_write_begin(struct file *file, struct address_space *mapping,
			    loff_t pos, unsigned len, unsigned flags,
			    struct page **pagep, void **fsdata)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct page *page;
	pgoff_t index = pos >> PAGE_CACHE_SHIFT;
	int r;

	do {
		/* get a page */
		page = grab_cache_page_write_begin(mapping, index, 0);
		if (!page)
			return -ENOMEM;
		*pagep = page;

		dout("write_begin file %p inode %p page %p %d~%d\n", file,
		     inode, page, (int)pos, (int)len);

		r = ceph_update_writeable_page(file, pos, len, page);
	} while (r == -EAGAIN);

	return r;
}

/*
 * we don't do anything in here that simple_write_end doesn't do
 * except adjust dirty page accounting and drop read lock on
 * mdsc->snap_rwsem.
 */
static int ceph_write_end(struct file *file, struct address_space *mapping,
			  loff_t pos, unsigned len, unsigned copied,
			  struct page *page, void *fsdata)
{
	struct inode *inode = file->f_dentry->d_inode;
	struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
	struct ceph_mds_client *mdsc = fsc->mdsc;
	unsigned from = pos & (PAGE_CACHE_SIZE - 1);
	int check_cap = 0;

	dout("write_end file %p inode %p page %p %d~%d (%d)\n", file,
	     inode, page, (int)pos, (int)copied, (int)len);

	/* zero the stale part of the page if we did a short copy */
	if (copied < len)
		zero_user_segment(page, from+copied, len);

	/* did file size increase? */
	/* (no need for i_size_read(); the caller holds i_mutex) */
	if (pos+copied > inode->i_size)
		check_cap = ceph_inode_set_size(inode, pos+copied);

	if (!PageUptodate(page))
		SetPageUptodate(page);

	set_page_dirty(page);

	unlock_page(page);
	up_read(&mdsc->snap_rwsem);
	page_cache_release(page);

	if (check_cap)
		ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL);

	return copied;
}

/*
 * we set .direct_IO to indicate direct io is supported, but since we
 * intercept O_DIRECT reads and writes early, this function should
 * never get called.
 */
static ssize_t ceph_direct_io(int rw, struct kiocb *iocb,
			      const struct iovec *iov,
			      loff_t pos, unsigned long nr_segs)
{
	WARN_ON(1);
	return -EINVAL;
}

const struct address_space_operations ceph_aops = {
	.readpage = ceph_readpage,
	.readpages = ceph_readpages,
	.writepage = ceph_writepage,
	.writepages = ceph_writepages_start,
	.write_begin = ceph_write_begin,
	.write_end = ceph_write_end,
	.set_page_dirty = ceph_set_page_dirty,
	.invalidatepage = ceph_invalidatepage,
	.releasepage = ceph_releasepage,
	.direct_IO = ceph_direct_io,
};


/*
 * vm ops
 */

/*
 * Reuse write_begin here for simplicity.
 */
static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
{
	struct inode *inode = vma->vm_file->f_dentry->d_inode;
	struct page *page = vmf->page;
	struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
	loff_t off = (u64)page->index << PAGE_CACHE_SHIFT;
	loff_t size, len;
	int ret;

	size = i_size_read(inode);
	if (off + PAGE_CACHE_SIZE <= size)
		len = PAGE_CACHE_SIZE;
	else
		len = size & ~PAGE_CACHE_MASK;

	dout("page_mkwrite %p %llu~%llu page %p idx %lu\n", inode,
	     off, len, page, page->index);

	lock_page(page);

	ret = VM_FAULT_NOPAGE;
	if ((off > size) ||
	    (page->mapping != inode->i_mapping))
		goto out;

	ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
	if (ret == 0) {
		/* success.  we'll keep the page locked. */
		set_page_dirty(page);
		up_read(&mdsc->snap_rwsem);
		ret = VM_FAULT_LOCKED;
	} else {
		if (ret == -ENOMEM)
			ret = VM_FAULT_OOM;
		else
			ret = VM_FAULT_SIGBUS;
	}
out:
	dout("page_mkwrite %p %llu~%llu = %d\n", inode, off, len, ret);
	if (ret != VM_FAULT_LOCKED)
		unlock_page(page);
	return ret;
}

static struct vm_operations_struct ceph_vmops = {
	.fault		= filemap_fault,
	.page_mkwrite	= ceph_page_mkwrite,
};

int ceph_mmap(struct file *file, struct vm_area_struct *vma)
{
	struct address_space *mapping = file->f_mapping;

	if (!mapping->a_ops->readpage)
		return -ENOEXEC;
	file_accessed(file);
	vma->vm_ops = &ceph_vmops;
	vma->vm_flags |= VM_CAN_NONLINEAR;
	return 0;
}