journal.c revision 77c320eb46e216c17aee5c943949229ccfed6904
1/* 2 * bcache journalling code, for btree insertions 3 * 4 * Copyright 2012 Google, Inc. 5 */ 6 7#include "bcache.h" 8#include "btree.h" 9#include "debug.h" 10#include "request.h" 11 12#include <trace/events/bcache.h> 13 14/* 15 * Journal replay/recovery: 16 * 17 * This code is all driven from run_cache_set(); we first read the journal 18 * entries, do some other stuff, then we mark all the keys in the journal 19 * entries (same as garbage collection would), then we replay them - reinserting 20 * them into the cache in precisely the same order as they appear in the 21 * journal. 22 * 23 * We only journal keys that go in leaf nodes, which simplifies things quite a 24 * bit. 25 */ 26 27static void journal_read_endio(struct bio *bio, int error) 28{ 29 struct closure *cl = bio->bi_private; 30 closure_put(cl); 31} 32 33static int journal_read_bucket(struct cache *ca, struct list_head *list, 34 struct btree_op *op, unsigned bucket_index) 35{ 36 struct journal_device *ja = &ca->journal; 37 struct bio *bio = &ja->bio; 38 39 struct journal_replay *i; 40 struct jset *j, *data = ca->set->journal.w[0].data; 41 unsigned len, left, offset = 0; 42 int ret = 0; 43 sector_t bucket = bucket_to_sector(ca->set, ca->sb.d[bucket_index]); 44 45 pr_debug("reading %llu", (uint64_t) bucket); 46 47 while (offset < ca->sb.bucket_size) { 48reread: left = ca->sb.bucket_size - offset; 49 len = min_t(unsigned, left, PAGE_SECTORS * 8); 50 51 bio_reset(bio); 52 bio->bi_sector = bucket + offset; 53 bio->bi_bdev = ca->bdev; 54 bio->bi_rw = READ; 55 bio->bi_size = len << 9; 56 57 bio->bi_end_io = journal_read_endio; 58 bio->bi_private = &op->cl; 59 bch_bio_map(bio, data); 60 61 closure_bio_submit(bio, &op->cl, ca); 62 closure_sync(&op->cl); 63 64 /* This function could be simpler now since we no longer write 65 * journal entries that overlap bucket boundaries; this means 66 * the start of a bucket will always have a valid journal entry 67 * if it has any journal entries at all. 68 */ 69 70 j = data; 71 while (len) { 72 struct list_head *where; 73 size_t blocks, bytes = set_bytes(j); 74 75 if (j->magic != jset_magic(ca->set)) 76 return ret; 77 78 if (bytes > left << 9) 79 return ret; 80 81 if (bytes > len << 9) 82 goto reread; 83 84 if (j->csum != csum_set(j)) 85 return ret; 86 87 blocks = set_blocks(j, ca->set); 88 89 while (!list_empty(list)) { 90 i = list_first_entry(list, 91 struct journal_replay, list); 92 if (i->j.seq >= j->last_seq) 93 break; 94 list_del(&i->list); 95 kfree(i); 96 } 97 98 list_for_each_entry_reverse(i, list, list) { 99 if (j->seq == i->j.seq) 100 goto next_set; 101 102 if (j->seq < i->j.last_seq) 103 goto next_set; 104 105 if (j->seq > i->j.seq) { 106 where = &i->list; 107 goto add; 108 } 109 } 110 111 where = list; 112add: 113 i = kmalloc(offsetof(struct journal_replay, j) + 114 bytes, GFP_KERNEL); 115 if (!i) 116 return -ENOMEM; 117 memcpy(&i->j, j, bytes); 118 list_add(&i->list, where); 119 ret = 1; 120 121 ja->seq[bucket_index] = j->seq; 122next_set: 123 offset += blocks * ca->sb.block_size; 124 len -= blocks * ca->sb.block_size; 125 j = ((void *) j) + blocks * block_bytes(ca); 126 } 127 } 128 129 return ret; 130} 131 132int bch_journal_read(struct cache_set *c, struct list_head *list, 133 struct btree_op *op) 134{ 135#define read_bucket(b) \ 136 ({ \ 137 int ret = journal_read_bucket(ca, list, op, b); \ 138 __set_bit(b, bitmap); \ 139 if (ret < 0) \ 140 return ret; \ 141 ret; \ 142 }) 143 144 struct cache *ca; 145 unsigned iter; 146 147 for_each_cache(ca, c, iter) { 148 struct journal_device *ja = &ca->journal; 149 unsigned long bitmap[SB_JOURNAL_BUCKETS / BITS_PER_LONG]; 150 unsigned i, l, r, m; 151 uint64_t seq; 152 153 bitmap_zero(bitmap, SB_JOURNAL_BUCKETS); 154 pr_debug("%u journal buckets", ca->sb.njournal_buckets); 155 156 /* 157 * Read journal buckets ordered by golden ratio hash to quickly 158 * find a sequence of buckets with valid journal entries 159 */ 160 for (i = 0; i < ca->sb.njournal_buckets; i++) { 161 l = (i * 2654435769U) % ca->sb.njournal_buckets; 162 163 if (test_bit(l, bitmap)) 164 break; 165 166 if (read_bucket(l)) 167 goto bsearch; 168 } 169 170 /* 171 * If that fails, check all the buckets we haven't checked 172 * already 173 */ 174 pr_debug("falling back to linear search"); 175 176 for (l = find_first_zero_bit(bitmap, ca->sb.njournal_buckets); 177 l < ca->sb.njournal_buckets; 178 l = find_next_zero_bit(bitmap, ca->sb.njournal_buckets, l + 1)) 179 if (read_bucket(l)) 180 goto bsearch; 181 182 if (list_empty(list)) 183 continue; 184bsearch: 185 /* Binary search */ 186 m = r = find_next_bit(bitmap, ca->sb.njournal_buckets, l + 1); 187 pr_debug("starting binary search, l %u r %u", l, r); 188 189 while (l + 1 < r) { 190 seq = list_entry(list->prev, struct journal_replay, 191 list)->j.seq; 192 193 m = (l + r) >> 1; 194 read_bucket(m); 195 196 if (seq != list_entry(list->prev, struct journal_replay, 197 list)->j.seq) 198 l = m; 199 else 200 r = m; 201 } 202 203 /* 204 * Read buckets in reverse order until we stop finding more 205 * journal entries 206 */ 207 pr_debug("finishing up: m %u njournal_buckets %u", 208 m, ca->sb.njournal_buckets); 209 l = m; 210 211 while (1) { 212 if (!l--) 213 l = ca->sb.njournal_buckets - 1; 214 215 if (l == m) 216 break; 217 218 if (test_bit(l, bitmap)) 219 continue; 220 221 if (!read_bucket(l)) 222 break; 223 } 224 225 seq = 0; 226 227 for (i = 0; i < ca->sb.njournal_buckets; i++) 228 if (ja->seq[i] > seq) { 229 seq = ja->seq[i]; 230 ja->cur_idx = ja->discard_idx = 231 ja->last_idx = i; 232 233 } 234 } 235 236 if (!list_empty(list)) 237 c->journal.seq = list_entry(list->prev, 238 struct journal_replay, 239 list)->j.seq; 240 241 return 0; 242#undef read_bucket 243} 244 245void bch_journal_mark(struct cache_set *c, struct list_head *list) 246{ 247 atomic_t p = { 0 }; 248 struct bkey *k; 249 struct journal_replay *i; 250 struct journal *j = &c->journal; 251 uint64_t last = j->seq; 252 253 /* 254 * journal.pin should never fill up - we never write a journal 255 * entry when it would fill up. But if for some reason it does, we 256 * iterate over the list in reverse order so that we can just skip that 257 * refcount instead of bugging. 258 */ 259 260 list_for_each_entry_reverse(i, list, list) { 261 BUG_ON(last < i->j.seq); 262 i->pin = NULL; 263 264 while (last-- != i->j.seq) 265 if (fifo_free(&j->pin) > 1) { 266 fifo_push_front(&j->pin, p); 267 atomic_set(&fifo_front(&j->pin), 0); 268 } 269 270 if (fifo_free(&j->pin) > 1) { 271 fifo_push_front(&j->pin, p); 272 i->pin = &fifo_front(&j->pin); 273 atomic_set(i->pin, 1); 274 } 275 276 for (k = i->j.start; 277 k < end(&i->j); 278 k = bkey_next(k)) { 279 unsigned j; 280 281 for (j = 0; j < KEY_PTRS(k); j++) { 282 struct bucket *g = PTR_BUCKET(c, k, j); 283 atomic_inc(&g->pin); 284 285 if (g->prio == BTREE_PRIO && 286 !ptr_stale(c, k, j)) 287 g->prio = INITIAL_PRIO; 288 } 289 290 __bch_btree_mark_key(c, 0, k); 291 } 292 } 293} 294 295int bch_journal_replay(struct cache_set *s, struct list_head *list, 296 struct btree_op *op) 297{ 298 int ret = 0, keys = 0, entries = 0; 299 struct bkey *k; 300 struct journal_replay *i = 301 list_entry(list->prev, struct journal_replay, list); 302 303 uint64_t start = i->j.last_seq, end = i->j.seq, n = start; 304 305 list_for_each_entry(i, list, list) { 306 BUG_ON(i->pin && atomic_read(i->pin) != 1); 307 308 cache_set_err_on(n != i->j.seq, s, 309"bcache: journal entries %llu-%llu missing! (replaying %llu-%llu)", 310 n, i->j.seq - 1, start, end); 311 312 for (k = i->j.start; 313 k < end(&i->j); 314 k = bkey_next(k)) { 315 trace_bcache_journal_replay_key(k); 316 317 bkey_copy(op->keys.top, k); 318 bch_keylist_push(&op->keys); 319 320 op->journal = i->pin; 321 atomic_inc(op->journal); 322 323 ret = bch_btree_insert(op, s); 324 if (ret) 325 goto err; 326 327 BUG_ON(!bch_keylist_empty(&op->keys)); 328 keys++; 329 330 cond_resched(); 331 } 332 333 if (i->pin) 334 atomic_dec(i->pin); 335 n = i->j.seq + 1; 336 entries++; 337 } 338 339 pr_info("journal replay done, %i keys in %i entries, seq %llu", 340 keys, entries, end); 341 342 while (!list_empty(list)) { 343 i = list_first_entry(list, struct journal_replay, list); 344 list_del(&i->list); 345 kfree(i); 346 } 347err: 348 closure_sync(&op->cl); 349 return ret; 350} 351 352/* Journalling */ 353 354static void btree_flush_write(struct cache_set *c) 355{ 356 /* 357 * Try to find the btree node with that references the oldest journal 358 * entry, best is our current candidate and is locked if non NULL: 359 */ 360 struct btree *b, *best = NULL; 361 unsigned iter; 362 363 for_each_cached_btree(b, c, iter) { 364 if (!down_write_trylock(&b->lock)) 365 continue; 366 367 if (!btree_node_dirty(b) || 368 !btree_current_write(b)->journal) { 369 rw_unlock(true, b); 370 continue; 371 } 372 373 if (!best) 374 best = b; 375 else if (journal_pin_cmp(c, 376 btree_current_write(best), 377 btree_current_write(b))) { 378 rw_unlock(true, best); 379 best = b; 380 } else 381 rw_unlock(true, b); 382 } 383 384 if (best) 385 goto out; 386 387 /* We can't find the best btree node, just pick the first */ 388 list_for_each_entry(b, &c->btree_cache, list) 389 if (!b->level && btree_node_dirty(b)) { 390 best = b; 391 rw_lock(true, best, best->level); 392 goto found; 393 } 394 395out: 396 if (!best) 397 return; 398found: 399 if (btree_node_dirty(best)) 400 bch_btree_node_write(best, NULL); 401 rw_unlock(true, best); 402} 403 404#define last_seq(j) ((j)->seq - fifo_used(&(j)->pin) + 1) 405 406static void journal_discard_endio(struct bio *bio, int error) 407{ 408 struct journal_device *ja = 409 container_of(bio, struct journal_device, discard_bio); 410 struct cache *ca = container_of(ja, struct cache, journal); 411 412 atomic_set(&ja->discard_in_flight, DISCARD_DONE); 413 414 closure_wake_up(&ca->set->journal.wait); 415 closure_put(&ca->set->cl); 416} 417 418static void journal_discard_work(struct work_struct *work) 419{ 420 struct journal_device *ja = 421 container_of(work, struct journal_device, discard_work); 422 423 submit_bio(0, &ja->discard_bio); 424} 425 426static void do_journal_discard(struct cache *ca) 427{ 428 struct journal_device *ja = &ca->journal; 429 struct bio *bio = &ja->discard_bio; 430 431 if (!ca->discard) { 432 ja->discard_idx = ja->last_idx; 433 return; 434 } 435 436 switch (atomic_read(&ja->discard_in_flight)) { 437 case DISCARD_IN_FLIGHT: 438 return; 439 440 case DISCARD_DONE: 441 ja->discard_idx = (ja->discard_idx + 1) % 442 ca->sb.njournal_buckets; 443 444 atomic_set(&ja->discard_in_flight, DISCARD_READY); 445 /* fallthrough */ 446 447 case DISCARD_READY: 448 if (ja->discard_idx == ja->last_idx) 449 return; 450 451 atomic_set(&ja->discard_in_flight, DISCARD_IN_FLIGHT); 452 453 bio_init(bio); 454 bio->bi_sector = bucket_to_sector(ca->set, 455 ca->sb.d[ja->discard_idx]); 456 bio->bi_bdev = ca->bdev; 457 bio->bi_rw = REQ_WRITE|REQ_DISCARD; 458 bio->bi_max_vecs = 1; 459 bio->bi_io_vec = bio->bi_inline_vecs; 460 bio->bi_size = bucket_bytes(ca); 461 bio->bi_end_io = journal_discard_endio; 462 463 closure_get(&ca->set->cl); 464 INIT_WORK(&ja->discard_work, journal_discard_work); 465 schedule_work(&ja->discard_work); 466 } 467} 468 469static void journal_reclaim(struct cache_set *c) 470{ 471 struct bkey *k = &c->journal.key; 472 struct cache *ca; 473 uint64_t last_seq; 474 unsigned iter, n = 0; 475 atomic_t p; 476 477 while (!atomic_read(&fifo_front(&c->journal.pin))) 478 fifo_pop(&c->journal.pin, p); 479 480 last_seq = last_seq(&c->journal); 481 482 /* Update last_idx */ 483 484 for_each_cache(ca, c, iter) { 485 struct journal_device *ja = &ca->journal; 486 487 while (ja->last_idx != ja->cur_idx && 488 ja->seq[ja->last_idx] < last_seq) 489 ja->last_idx = (ja->last_idx + 1) % 490 ca->sb.njournal_buckets; 491 } 492 493 for_each_cache(ca, c, iter) 494 do_journal_discard(ca); 495 496 if (c->journal.blocks_free) 497 return; 498 499 /* 500 * Allocate: 501 * XXX: Sort by free journal space 502 */ 503 504 for_each_cache(ca, c, iter) { 505 struct journal_device *ja = &ca->journal; 506 unsigned next = (ja->cur_idx + 1) % ca->sb.njournal_buckets; 507 508 /* No space available on this device */ 509 if (next == ja->discard_idx) 510 continue; 511 512 ja->cur_idx = next; 513 k->ptr[n++] = PTR(0, 514 bucket_to_sector(c, ca->sb.d[ja->cur_idx]), 515 ca->sb.nr_this_dev); 516 } 517 518 bkey_init(k); 519 SET_KEY_PTRS(k, n); 520 521 if (n) 522 c->journal.blocks_free = c->sb.bucket_size >> c->block_bits; 523 524 if (!journal_full(&c->journal)) 525 __closure_wake_up(&c->journal.wait); 526} 527 528void bch_journal_next(struct journal *j) 529{ 530 atomic_t p = { 1 }; 531 532 j->cur = (j->cur == j->w) 533 ? &j->w[1] 534 : &j->w[0]; 535 536 /* 537 * The fifo_push() needs to happen at the same time as j->seq is 538 * incremented for last_seq() to be calculated correctly 539 */ 540 BUG_ON(!fifo_push(&j->pin, p)); 541 atomic_set(&fifo_back(&j->pin), 1); 542 543 j->cur->data->seq = ++j->seq; 544 j->cur->need_write = false; 545 j->cur->data->keys = 0; 546 547 if (fifo_full(&j->pin)) 548 pr_debug("journal_pin full (%zu)", fifo_used(&j->pin)); 549} 550 551static void journal_write_endio(struct bio *bio, int error) 552{ 553 struct journal_write *w = bio->bi_private; 554 555 cache_set_err_on(error, w->c, "journal io error"); 556 closure_put(&w->c->journal.io); 557} 558 559static void journal_write(struct closure *); 560 561static void journal_write_done(struct closure *cl) 562{ 563 struct journal *j = container_of(cl, struct journal, io); 564 struct journal_write *w = (j->cur == j->w) 565 ? &j->w[1] 566 : &j->w[0]; 567 568 __closure_wake_up(&w->wait); 569 continue_at_nobarrier(cl, journal_write, system_wq); 570} 571 572static void journal_write_unlocked(struct closure *cl) 573 __releases(c->journal.lock) 574{ 575 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 576 struct cache *ca; 577 struct journal_write *w = c->journal.cur; 578 struct bkey *k = &c->journal.key; 579 unsigned i, sectors = set_blocks(w->data, c) * c->sb.block_size; 580 581 struct bio *bio; 582 struct bio_list list; 583 bio_list_init(&list); 584 585 if (!w->need_write) { 586 /* 587 * XXX: have to unlock closure before we unlock journal lock, 588 * else we race with bch_journal(). But this way we race 589 * against cache set unregister. Doh. 590 */ 591 set_closure_fn(cl, NULL, NULL); 592 closure_sub(cl, CLOSURE_RUNNING + 1); 593 spin_unlock(&c->journal.lock); 594 return; 595 } else if (journal_full(&c->journal)) { 596 journal_reclaim(c); 597 spin_unlock(&c->journal.lock); 598 599 btree_flush_write(c); 600 continue_at(cl, journal_write, system_wq); 601 } 602 603 c->journal.blocks_free -= set_blocks(w->data, c); 604 605 w->data->btree_level = c->root->level; 606 607 bkey_copy(&w->data->btree_root, &c->root->key); 608 bkey_copy(&w->data->uuid_bucket, &c->uuid_bucket); 609 610 for_each_cache(ca, c, i) 611 w->data->prio_bucket[ca->sb.nr_this_dev] = ca->prio_buckets[0]; 612 613 w->data->magic = jset_magic(c); 614 w->data->version = BCACHE_JSET_VERSION; 615 w->data->last_seq = last_seq(&c->journal); 616 w->data->csum = csum_set(w->data); 617 618 for (i = 0; i < KEY_PTRS(k); i++) { 619 ca = PTR_CACHE(c, k, i); 620 bio = &ca->journal.bio; 621 622 atomic_long_add(sectors, &ca->meta_sectors_written); 623 624 bio_reset(bio); 625 bio->bi_sector = PTR_OFFSET(k, i); 626 bio->bi_bdev = ca->bdev; 627 bio->bi_rw = REQ_WRITE|REQ_SYNC|REQ_META|REQ_FLUSH|REQ_FUA; 628 bio->bi_size = sectors << 9; 629 630 bio->bi_end_io = journal_write_endio; 631 bio->bi_private = w; 632 bch_bio_map(bio, w->data); 633 634 trace_bcache_journal_write(bio); 635 bio_list_add(&list, bio); 636 637 SET_PTR_OFFSET(k, i, PTR_OFFSET(k, i) + sectors); 638 639 ca->journal.seq[ca->journal.cur_idx] = w->data->seq; 640 } 641 642 atomic_dec_bug(&fifo_back(&c->journal.pin)); 643 bch_journal_next(&c->journal); 644 journal_reclaim(c); 645 646 spin_unlock(&c->journal.lock); 647 648 while ((bio = bio_list_pop(&list))) 649 closure_bio_submit(bio, cl, c->cache[0]); 650 651 continue_at(cl, journal_write_done, NULL); 652} 653 654static void journal_write(struct closure *cl) 655{ 656 struct cache_set *c = container_of(cl, struct cache_set, journal.io); 657 658 spin_lock(&c->journal.lock); 659 journal_write_unlocked(cl); 660} 661 662static void __journal_try_write(struct cache_set *c, bool noflush) 663 __releases(c->journal.lock) 664{ 665 struct closure *cl = &c->journal.io; 666 struct journal_write *w = c->journal.cur; 667 668 w->need_write = true; 669 670 if (!closure_trylock(cl, &c->cl)) 671 spin_unlock(&c->journal.lock); 672 else if (noflush && journal_full(&c->journal)) { 673 spin_unlock(&c->journal.lock); 674 continue_at(cl, journal_write, system_wq); 675 } else 676 journal_write_unlocked(cl); 677} 678 679#define journal_try_write(c) __journal_try_write(c, false) 680 681void bch_journal_meta(struct cache_set *c, struct closure *cl) 682{ 683 struct journal_write *w; 684 685 if (CACHE_SYNC(&c->sb)) { 686 spin_lock(&c->journal.lock); 687 w = c->journal.cur; 688 689 if (cl) 690 BUG_ON(!closure_wait(&w->wait, cl)); 691 692 __journal_try_write(c, true); 693 } 694} 695 696static void journal_write_work(struct work_struct *work) 697{ 698 struct cache_set *c = container_of(to_delayed_work(work), 699 struct cache_set, 700 journal.work); 701 spin_lock(&c->journal.lock); 702 journal_try_write(c); 703} 704 705/* 706 * Entry point to the journalling code - bio_insert() and btree_invalidate() 707 * pass bch_journal() a list of keys to be journalled, and then 708 * bch_journal() hands those same keys off to btree_insert_async() 709 */ 710 711void bch_journal(struct closure *cl) 712{ 713 struct btree_op *op = container_of(cl, struct btree_op, cl); 714 struct cache_set *c = op->c; 715 struct journal_write *w; 716 size_t b, n = ((uint64_t *) op->keys.top) - op->keys.list; 717 718 if (op->type != BTREE_INSERT || 719 !CACHE_SYNC(&c->sb)) 720 goto out; 721 722 /* 723 * If we're looping because we errored, might already be waiting on 724 * another journal write: 725 */ 726 while (atomic_read(&cl->parent->remaining) & CLOSURE_WAITING) 727 closure_sync(cl->parent); 728 729 spin_lock(&c->journal.lock); 730 731 if (journal_full(&c->journal)) { 732 trace_bcache_journal_full(c); 733 734 closure_wait(&c->journal.wait, cl); 735 736 journal_reclaim(c); 737 spin_unlock(&c->journal.lock); 738 739 btree_flush_write(c); 740 continue_at(cl, bch_journal, bcache_wq); 741 } 742 743 w = c->journal.cur; 744 b = __set_blocks(w->data, w->data->keys + n, c); 745 746 if (b * c->sb.block_size > PAGE_SECTORS << JSET_BITS || 747 b > c->journal.blocks_free) { 748 trace_bcache_journal_entry_full(c); 749 750 /* 751 * XXX: If we were inserting so many keys that they won't fit in 752 * an _empty_ journal write, we'll deadlock. For now, handle 753 * this in bch_keylist_realloc() - but something to think about. 754 */ 755 BUG_ON(!w->data->keys); 756 757 BUG_ON(!closure_wait(&w->wait, cl)); 758 759 journal_try_write(c); 760 continue_at(cl, bch_journal, bcache_wq); 761 } 762 763 memcpy(end(w->data), op->keys.list, n * sizeof(uint64_t)); 764 w->data->keys += n; 765 766 op->journal = &fifo_back(&c->journal.pin); 767 atomic_inc(op->journal); 768 769 if (op->flush_journal) { 770 closure_wait(&w->wait, cl->parent); 771 journal_try_write(c); 772 } else if (!w->need_write) { 773 schedule_delayed_work(&c->journal.work, 774 msecs_to_jiffies(c->journal_delay_ms)); 775 spin_unlock(&c->journal.lock); 776 } else { 777 spin_unlock(&c->journal.lock); 778 } 779out: 780 bch_btree_insert_async(cl); 781} 782 783void bch_journal_free(struct cache_set *c) 784{ 785 free_pages((unsigned long) c->journal.w[1].data, JSET_BITS); 786 free_pages((unsigned long) c->journal.w[0].data, JSET_BITS); 787 free_fifo(&c->journal.pin); 788} 789 790int bch_journal_alloc(struct cache_set *c) 791{ 792 struct journal *j = &c->journal; 793 794 closure_init_unlocked(&j->io); 795 spin_lock_init(&j->lock); 796 INIT_DELAYED_WORK(&j->work, journal_write_work); 797 798 c->journal_delay_ms = 100; 799 800 j->w[0].c = c; 801 j->w[1].c = c; 802 803 if (!(init_fifo(&j->pin, JOURNAL_PIN, GFP_KERNEL)) || 804 !(j->w[0].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS)) || 805 !(j->w[1].data = (void *) __get_free_pages(GFP_KERNEL, JSET_BITS))) 806 return -ENOMEM; 807 808 return 0; 809} 810