drbd_worker.c revision c37c8ecfee685fa42de8fd418ad8ca1e66408bd8
1/* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26#include <linux/module.h> 27#include <linux/drbd.h> 28#include <linux/sched.h> 29#include <linux/wait.h> 30#include <linux/mm.h> 31#include <linux/memcontrol.h> 32#include <linux/mm_inline.h> 33#include <linux/slab.h> 34#include <linux/random.h> 35#include <linux/string.h> 36#include <linux/scatterlist.h> 37 38#include "drbd_int.h" 39#include "drbd_req.h" 40 41static int w_make_ov_request(struct drbd_work *w, int cancel); 42 43 44/* endio handlers: 45 * drbd_md_io_complete (defined here) 46 * drbd_request_endio (defined here) 47 * drbd_peer_request_endio (defined here) 48 * bm_async_io_complete (defined in drbd_bitmap.c) 49 * 50 * For all these callbacks, note the following: 51 * The callbacks will be called in irq context by the IDE drivers, 52 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 53 * Try to get the locking right :) 54 * 55 */ 56 57 58/* About the global_state_lock 59 Each state transition on an device holds a read lock. 
In case we have 60 to evaluate the sync after dependencies, we grab a write lock, because 61 we need stable states on all devices for that. */ 62rwlock_t global_state_lock; 63 64/* used for synchronous meta data and bitmap IO 65 * submitted by drbd_md_sync_page_io() 66 */ 67void drbd_md_io_complete(struct bio *bio, int error) 68{ 69 struct drbd_md_io *md_io; 70 71 md_io = (struct drbd_md_io *)bio->bi_private; 72 md_io->error = error; 73 74 complete(&md_io->event); 75} 76 77/* reads on behalf of the partner, 78 * "submitted" by the receiver 79 */ 80void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 81{ 82 unsigned long flags = 0; 83 struct drbd_conf *mdev = peer_req->w.mdev; 84 85 spin_lock_irqsave(&mdev->tconn->req_lock, flags); 86 mdev->read_cnt += peer_req->i.size >> 9; 87 list_del(&peer_req->w.list); 88 if (list_empty(&mdev->read_ee)) 89 wake_up(&mdev->ee_wait); 90 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 91 __drbd_chk_io_error(mdev, false); 92 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags); 93 94 drbd_queue_work(&mdev->tconn->data.work, &peer_req->w); 95 put_ldev(mdev); 96} 97 98/* writes on behalf of the partner, or resync writes, 99 * "submitted" by the receiver, final stage. */ 100static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 101{ 102 unsigned long flags = 0; 103 struct drbd_conf *mdev = peer_req->w.mdev; 104 struct drbd_interval i; 105 int do_wake; 106 u64 block_id; 107 int do_al_complete_io; 108 109 /* after we moved peer_req to done_ee, 110 * we may no longer access it, 111 * it may be freed/reused already! 
112 * (as soon as we release the req_lock) */ 113 i = peer_req->i; 114 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 115 block_id = peer_req->block_id; 116 117 spin_lock_irqsave(&mdev->tconn->req_lock, flags); 118 mdev->writ_cnt += peer_req->i.size >> 9; 119 list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */ 120 list_add_tail(&peer_req->w.list, &mdev->done_ee); 121 122 /* 123 * Do not remove from the write_requests tree here: we did not send the 124 * Ack yet and did not wake possibly waiting conflicting requests. 125 * Removed from the tree from "drbd_process_done_ee" within the 126 * appropriate w.cb (e_end_block/e_end_resync_block) or from 127 * _drbd_clear_done_ee. 128 */ 129 130 do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee); 131 132 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 133 __drbd_chk_io_error(mdev, false); 134 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags); 135 136 if (block_id == ID_SYNCER) 137 drbd_rs_complete_io(mdev, i.sector); 138 139 if (do_wake) 140 wake_up(&mdev->ee_wait); 141 142 if (do_al_complete_io) 143 drbd_al_complete_io(mdev, &i); 144 145 wake_asender(mdev->tconn); 146 put_ldev(mdev); 147} 148 149/* writes on behalf of the partner, or resync writes, 150 * "submitted" by the receiver. 151 */ 152void drbd_peer_request_endio(struct bio *bio, int error) 153{ 154 struct drbd_peer_request *peer_req = bio->bi_private; 155 struct drbd_conf *mdev = peer_req->w.mdev; 156 int uptodate = bio_flagged(bio, BIO_UPTODATE); 157 int is_write = bio_data_dir(bio) == WRITE; 158 159 if (error && __ratelimit(&drbd_ratelimit_state)) 160 dev_warn(DEV, "%s: error=%d s=%llus\n", 161 is_write ? "write" : "read", error, 162 (unsigned long long)peer_req->i.sector); 163 if (!error && !uptodate) { 164 if (__ratelimit(&drbd_ratelimit_state)) 165 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", 166 is_write ? 
"write" : "read", 167 (unsigned long long)peer_req->i.sector); 168 /* strange behavior of some lower level drivers... 169 * fail the request by clearing the uptodate flag, 170 * but do not return any error?! */ 171 error = -EIO; 172 } 173 174 if (error) 175 set_bit(__EE_WAS_ERROR, &peer_req->flags); 176 177 bio_put(bio); /* no need for the bio anymore */ 178 if (atomic_dec_and_test(&peer_req->pending_bios)) { 179 if (is_write) 180 drbd_endio_write_sec_final(peer_req); 181 else 182 drbd_endio_read_sec_final(peer_req); 183 } 184} 185 186/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 187 */ 188void drbd_request_endio(struct bio *bio, int error) 189{ 190 unsigned long flags; 191 struct drbd_request *req = bio->bi_private; 192 struct drbd_conf *mdev = req->w.mdev; 193 struct bio_and_error m; 194 enum drbd_req_event what; 195 int uptodate = bio_flagged(bio, BIO_UPTODATE); 196 197 if (!error && !uptodate) { 198 dev_warn(DEV, "p %s: setting error to -EIO\n", 199 bio_data_dir(bio) == WRITE ? "write" : "read"); 200 /* strange behavior of some lower level drivers... 201 * fail the request by clearing the uptodate flag, 202 * but do not return any error?! */ 203 error = -EIO; 204 } 205 206 /* to avoid recursion in __req_mod */ 207 if (unlikely(error)) { 208 what = (bio_data_dir(bio) == WRITE) 209 ? WRITE_COMPLETED_WITH_ERROR 210 : (bio_rw(bio) == READ) 211 ? READ_COMPLETED_WITH_ERROR 212 : READ_AHEAD_COMPLETED_WITH_ERROR; 213 } else 214 what = COMPLETED_OK; 215 216 bio_put(req->private_bio); 217 req->private_bio = ERR_PTR(error); 218 219 /* not req_mod(), we need irqsave here! 
*/ 220 spin_lock_irqsave(&mdev->tconn->req_lock, flags); 221 __req_mod(req, what, &m); 222 spin_unlock_irqrestore(&mdev->tconn->req_lock, flags); 223 224 if (m.bio) 225 complete_master_bio(mdev, &m); 226} 227 228int w_read_retry_remote(struct drbd_work *w, int cancel) 229{ 230 struct drbd_request *req = container_of(w, struct drbd_request, w); 231 struct drbd_conf *mdev = w->mdev; 232 233 /* We should not detach for read io-error, 234 * but try to WRITE the P_DATA_REPLY to the failed location, 235 * to give the disk the chance to relocate that block */ 236 237 spin_lock_irq(&mdev->tconn->req_lock); 238 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) { 239 _req_mod(req, READ_RETRY_REMOTE_CANCELED); 240 spin_unlock_irq(&mdev->tconn->req_lock); 241 return 0; 242 } 243 spin_unlock_irq(&mdev->tconn->req_lock); 244 245 return w_send_read_req(w, 0); 246} 247 248void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, 249 struct drbd_peer_request *peer_req, void *digest) 250{ 251 struct hash_desc desc; 252 struct scatterlist sg; 253 struct page *page = peer_req->pages; 254 struct page *tmp; 255 unsigned len; 256 257 desc.tfm = tfm; 258 desc.flags = 0; 259 260 sg_init_table(&sg, 1); 261 crypto_hash_init(&desc); 262 263 while ((tmp = page_chain_next(page))) { 264 /* all but the last page will be fully used */ 265 sg_set_page(&sg, page, PAGE_SIZE, 0); 266 crypto_hash_update(&desc, &sg, sg.length); 267 page = tmp; 268 } 269 /* and now the last, possibly only partially used page */ 270 len = peer_req->i.size & (PAGE_SIZE - 1); 271 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 272 crypto_hash_update(&desc, &sg, sg.length); 273 crypto_hash_final(&desc, digest); 274} 275 276void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) 277{ 278 struct hash_desc desc; 279 struct scatterlist sg; 280 struct bio_vec *bvec; 281 int i; 282 283 desc.tfm = tfm; 284 desc.flags = 0; 285 286 sg_init_table(&sg, 1); 287 crypto_hash_init(&desc); 288 
289 __bio_for_each_segment(bvec, bio, i, 0) { 290 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset); 291 crypto_hash_update(&desc, &sg, sg.length); 292 } 293 crypto_hash_final(&desc, digest); 294} 295 296/* MAYBE merge common code with w_e_end_ov_req */ 297static int w_e_send_csum(struct drbd_work *w, int cancel) 298{ 299 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 300 struct drbd_conf *mdev = w->mdev; 301 int digest_size; 302 void *digest; 303 int err = 0; 304 305 if (unlikely(cancel)) 306 goto out; 307 308 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 309 goto out; 310 311 digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm); 312 digest = kmalloc(digest_size, GFP_NOIO); 313 if (digest) { 314 sector_t sector = peer_req->i.sector; 315 unsigned int size = peer_req->i.size; 316 drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest); 317 /* Free peer_req and pages before send. 318 * In case we block on congestion, we could otherwise run into 319 * some distributed deadlock, if the other side blocks on 320 * congestion as well, because our receiver blocks in 321 * drbd_alloc_pages due to pp_in_use > max_buffers. 
*/ 322 drbd_free_peer_req(mdev, peer_req); 323 peer_req = NULL; 324 inc_rs_pending(mdev); 325 err = drbd_send_drequest_csum(mdev, sector, size, 326 digest, digest_size, 327 P_CSUM_RS_REQUEST); 328 kfree(digest); 329 } else { 330 dev_err(DEV, "kmalloc() of digest failed.\n"); 331 err = -ENOMEM; 332 } 333 334out: 335 if (peer_req) 336 drbd_free_peer_req(mdev, peer_req); 337 338 if (unlikely(err)) 339 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n"); 340 return err; 341} 342 343#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 344 345static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) 346{ 347 struct drbd_peer_request *peer_req; 348 349 if (!get_ldev(mdev)) 350 return -EIO; 351 352 if (drbd_rs_should_slow_down(mdev, sector)) 353 goto defer; 354 355 /* GFP_TRY, because if there is no memory available right now, this may 356 * be rescheduled for later. It is "only" background resync, after all. */ 357 peer_req = drbd_alloc_peer_req(mdev, ID_SYNCER /* unused */, sector, 358 size, GFP_TRY); 359 if (!peer_req) 360 goto defer; 361 362 peer_req->w.cb = w_e_send_csum; 363 spin_lock_irq(&mdev->tconn->req_lock); 364 list_add(&peer_req->w.list, &mdev->read_ee); 365 spin_unlock_irq(&mdev->tconn->req_lock); 366 367 atomic_add(size >> 9, &mdev->rs_sect_ev); 368 if (drbd_submit_peer_request(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0) 369 return 0; 370 371 /* If it failed because of ENOMEM, retry should help. If it failed 372 * because bio_add_page failed (probably broken lower level driver), 373 * retry may or may not help. 374 * If it does not, you may need to force disconnect. 
*/ 375 spin_lock_irq(&mdev->tconn->req_lock); 376 list_del(&peer_req->w.list); 377 spin_unlock_irq(&mdev->tconn->req_lock); 378 379 drbd_free_peer_req(mdev, peer_req); 380defer: 381 put_ldev(mdev); 382 return -EAGAIN; 383} 384 385int w_resync_timer(struct drbd_work *w, int cancel) 386{ 387 struct drbd_conf *mdev = w->mdev; 388 switch (mdev->state.conn) { 389 case C_VERIFY_S: 390 w_make_ov_request(w, cancel); 391 break; 392 case C_SYNC_TARGET: 393 w_make_resync_request(w, cancel); 394 break; 395 } 396 397 return 0; 398} 399 400void resync_timer_fn(unsigned long data) 401{ 402 struct drbd_conf *mdev = (struct drbd_conf *) data; 403 404 if (list_empty(&mdev->resync_work.list)) 405 drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work); 406} 407 408static void fifo_set(struct fifo_buffer *fb, int value) 409{ 410 int i; 411 412 for (i = 0; i < fb->size; i++) 413 fb->values[i] = value; 414} 415 416static int fifo_push(struct fifo_buffer *fb, int value) 417{ 418 int ov; 419 420 ov = fb->values[fb->head_index]; 421 fb->values[fb->head_index++] = value; 422 423 if (fb->head_index >= fb->size) 424 fb->head_index = 0; 425 426 return ov; 427} 428 429static void fifo_add_val(struct fifo_buffer *fb, int value) 430{ 431 int i; 432 433 for (i = 0; i < fb->size; i++) 434 fb->values[i] += value; 435} 436 437static int drbd_rs_controller(struct drbd_conf *mdev) 438{ 439 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 440 unsigned int want; /* The number of sectors we want in the proxy */ 441 int req_sect; /* Number of sectors to request in this turn */ 442 int correction; /* Number of sectors more we need in the proxy*/ 443 int cps; /* correction per invocation of drbd_rs_controller() */ 444 int steps; /* Number of time steps to plan ahead */ 445 int curr_corr; 446 int max_sect; 447 448 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */ 449 mdev->rs_in_flight -= sect_in; 450 451 spin_lock(&mdev->peer_seq_lock); /* 
get an atomic view on mdev->rs_plan_s */ 452 453 steps = mdev->rs_plan_s.size; /* (mdev->ldev->dc.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 454 455 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */ 456 want = ((mdev->ldev->dc.resync_rate * 2 * SLEEP_TIME) / HZ) * steps; 457 } else { /* normal path */ 458 want = mdev->ldev->dc.c_fill_target ? mdev->ldev->dc.c_fill_target : 459 sect_in * mdev->ldev->dc.c_delay_target * HZ / (SLEEP_TIME * 10); 460 } 461 462 correction = want - mdev->rs_in_flight - mdev->rs_planed; 463 464 /* Plan ahead */ 465 cps = correction / steps; 466 fifo_add_val(&mdev->rs_plan_s, cps); 467 mdev->rs_planed += cps * steps; 468 469 /* What we do in this step */ 470 curr_corr = fifo_push(&mdev->rs_plan_s, 0); 471 spin_unlock(&mdev->peer_seq_lock); 472 mdev->rs_planed -= curr_corr; 473 474 req_sect = sect_in + curr_corr; 475 if (req_sect < 0) 476 req_sect = 0; 477 478 max_sect = (mdev->ldev->dc.c_max_rate * 2 * SLEEP_TIME) / HZ; 479 if (req_sect > max_sect) 480 req_sect = max_sect; 481 482 /* 483 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 484 sect_in, mdev->rs_in_flight, want, correction, 485 steps, cps, mdev->rs_planed, curr_corr, req_sect); 486 */ 487 488 return req_sect; 489} 490 491static int drbd_rs_number_requests(struct drbd_conf *mdev) 492{ 493 int number; 494 if (mdev->rs_plan_s.size) { /* mdev->ldev->dc.c_plan_ahead */ 495 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9); 496 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 497 } else { 498 mdev->c_sync_rate = mdev->ldev->dc.resync_rate; 499 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 500 } 501 502 /* ignore the amount of pending requests, the resync controller should 503 * throttle down to incoming reply rate soon enough anyways. 
*/ 504 return number; 505} 506 507int w_make_resync_request(struct drbd_work *w, int cancel) 508{ 509 struct drbd_conf *mdev = w->mdev; 510 unsigned long bit; 511 sector_t sector; 512 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 513 int max_bio_size; 514 int number, rollback_i, size; 515 int align, queued, sndbuf; 516 int i = 0; 517 518 if (unlikely(cancel)) 519 return 0; 520 521 if (mdev->rs_total == 0) { 522 /* empty resync? */ 523 drbd_resync_finished(mdev); 524 return 0; 525 } 526 527 if (!get_ldev(mdev)) { 528 /* Since we only need to access mdev->rsync a 529 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but 530 to continue resync with a broken disk makes no sense at 531 all */ 532 dev_err(DEV, "Disk broke down during resync!\n"); 533 return 0; 534 } 535 536 max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9; 537 number = drbd_rs_number_requests(mdev); 538 if (number == 0) 539 goto requeue; 540 541 for (i = 0; i < number; i++) { 542 /* Stop generating RS requests, when half of the send buffer is filled */ 543 mutex_lock(&mdev->tconn->data.mutex); 544 if (mdev->tconn->data.socket) { 545 queued = mdev->tconn->data.socket->sk->sk_wmem_queued; 546 sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf; 547 } else { 548 queued = 1; 549 sndbuf = 0; 550 } 551 mutex_unlock(&mdev->tconn->data.mutex); 552 if (queued > sndbuf / 2) 553 goto requeue; 554 555next_sector: 556 size = BM_BLOCK_SIZE; 557 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo); 558 559 if (bit == DRBD_END_OF_BITMAP) { 560 mdev->bm_resync_fo = drbd_bm_bits(mdev); 561 put_ldev(mdev); 562 return 0; 563 } 564 565 sector = BM_BIT_TO_SECT(bit); 566 567 if (drbd_rs_should_slow_down(mdev, sector) || 568 drbd_try_rs_begin_io(mdev, sector)) { 569 mdev->bm_resync_fo = bit; 570 goto requeue; 571 } 572 mdev->bm_resync_fo = bit + 1; 573 574 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) { 575 drbd_rs_complete_io(mdev, sector); 576 goto next_sector; 577 } 578 579#if DRBD_MAX_BIO_SIZE > 
BM_BLOCK_SIZE 580 /* try to find some adjacent bits. 581 * we stop if we have already the maximum req size. 582 * 583 * Additionally always align bigger requests, in order to 584 * be prepared for all stripe sizes of software RAIDs. 585 */ 586 align = 1; 587 rollback_i = i; 588 for (;;) { 589 if (size + BM_BLOCK_SIZE > max_bio_size) 590 break; 591 592 /* Be always aligned */ 593 if (sector & ((1<<(align+3))-1)) 594 break; 595 596 /* do not cross extent boundaries */ 597 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 598 break; 599 /* now, is it actually dirty, after all? 600 * caution, drbd_bm_test_bit is tri-state for some 601 * obscure reason; ( b == 0 ) would get the out-of-band 602 * only accidentally right because of the "oddly sized" 603 * adjustment below */ 604 if (drbd_bm_test_bit(mdev, bit+1) != 1) 605 break; 606 bit++; 607 size += BM_BLOCK_SIZE; 608 if ((BM_BLOCK_SIZE << align) <= size) 609 align++; 610 i++; 611 } 612 /* if we merged some, 613 * reset the offset to start the next drbd_bm_find_next from */ 614 if (size > BM_BLOCK_SIZE) 615 mdev->bm_resync_fo = bit + 1; 616#endif 617 618 /* adjust very last sectors, in case we are oddly sized */ 619 if (sector + (size>>9) > capacity) 620 size = (capacity-sector)<<9; 621 if (mdev->tconn->agreed_pro_version >= 89 && mdev->tconn->csums_tfm) { 622 switch (read_for_csum(mdev, sector, size)) { 623 case -EIO: /* Disk failure */ 624 put_ldev(mdev); 625 return -EIO; 626 case -EAGAIN: /* allocation failed, or ldev busy */ 627 drbd_rs_complete_io(mdev, sector); 628 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 629 i = rollback_i; 630 goto requeue; 631 case 0: 632 /* everything ok */ 633 break; 634 default: 635 BUG(); 636 } 637 } else { 638 int err; 639 640 inc_rs_pending(mdev); 641 err = drbd_send_drequest(mdev, P_RS_DATA_REQUEST, 642 sector, size, ID_SYNCER); 643 if (err) { 644 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n"); 645 dec_rs_pending(mdev); 646 put_ldev(mdev); 647 return err; 648 } 649 } 650 } 
651 652 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) { 653 /* last syncer _request_ was sent, 654 * but the P_RS_DATA_REPLY not yet received. sync will end (and 655 * next sync group will resume), as soon as we receive the last 656 * resync data block, and the last bit is cleared. 657 * until then resync "work" is "inactive" ... 658 */ 659 put_ldev(mdev); 660 return 0; 661 } 662 663 requeue: 664 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 665 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 666 put_ldev(mdev); 667 return 0; 668} 669 670static int w_make_ov_request(struct drbd_work *w, int cancel) 671{ 672 struct drbd_conf *mdev = w->mdev; 673 int number, i, size; 674 sector_t sector; 675 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 676 677 if (unlikely(cancel)) 678 return 1; 679 680 number = drbd_rs_number_requests(mdev); 681 682 sector = mdev->ov_position; 683 for (i = 0; i < number; i++) { 684 if (sector >= capacity) { 685 return 1; 686 } 687 688 size = BM_BLOCK_SIZE; 689 690 if (drbd_rs_should_slow_down(mdev, sector) || 691 drbd_try_rs_begin_io(mdev, sector)) { 692 mdev->ov_position = sector; 693 goto requeue; 694 } 695 696 if (sector + (size>>9) > capacity) 697 size = (capacity-sector)<<9; 698 699 inc_rs_pending(mdev); 700 if (drbd_send_ov_request(mdev, sector, size)) { 701 dec_rs_pending(mdev); 702 return 0; 703 } 704 sector += BM_SECT_PER_BIT; 705 } 706 mdev->ov_position = sector; 707 708 requeue: 709 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 710 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 711 return 1; 712} 713 714int w_ov_finished(struct drbd_work *w, int cancel) 715{ 716 struct drbd_conf *mdev = w->mdev; 717 kfree(w); 718 ov_out_of_sync_print(mdev); 719 drbd_resync_finished(mdev); 720 721 return 0; 722} 723 724static int w_resync_finished(struct drbd_work *w, int cancel) 725{ 726 struct drbd_conf *mdev = w->mdev; 727 kfree(w); 728 729 drbd_resync_finished(mdev); 730 731 return 0; 732} 733 734static void 
ping_peer(struct drbd_conf *mdev) 735{ 736 struct drbd_tconn *tconn = mdev->tconn; 737 738 clear_bit(GOT_PING_ACK, &tconn->flags); 739 request_ping(tconn); 740 wait_event(tconn->ping_wait, 741 test_bit(GOT_PING_ACK, &tconn->flags) || mdev->state.conn < C_CONNECTED); 742} 743 744int drbd_resync_finished(struct drbd_conf *mdev) 745{ 746 unsigned long db, dt, dbdt; 747 unsigned long n_oos; 748 union drbd_state os, ns; 749 struct drbd_work *w; 750 char *khelper_cmd = NULL; 751 int verify_done = 0; 752 753 /* Remove all elements from the resync LRU. Since future actions 754 * might set bits in the (main) bitmap, then the entries in the 755 * resync LRU would be wrong. */ 756 if (drbd_rs_del_all(mdev)) { 757 /* In case this is not possible now, most probably because 758 * there are P_RS_DATA_REPLY Packets lingering on the worker's 759 * queue (or even the read operations for those packets 760 * is not finished by now). Retry in 100ms. */ 761 762 schedule_timeout_interruptible(HZ / 10); 763 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC); 764 if (w) { 765 w->cb = w_resync_finished; 766 drbd_queue_work(&mdev->tconn->data.work, w); 767 return 1; 768 } 769 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n"); 770 } 771 772 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ; 773 if (dt <= 0) 774 dt = 1; 775 db = mdev->rs_total; 776 dbdt = Bit2KB(db/dt); 777 mdev->rs_paused /= HZ; 778 779 if (!get_ldev(mdev)) 780 goto out; 781 782 ping_peer(mdev); 783 784 spin_lock_irq(&mdev->tconn->req_lock); 785 os = drbd_read_state(mdev); 786 787 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 788 789 /* This protects us against multiple calls (that can happen in the presence 790 of application IO), and against connectivity loss just before we arrive here. */ 791 if (os.conn <= C_CONNECTED) 792 goto out_unlock; 793 794 ns = os; 795 ns.conn = C_CONNECTED; 796 797 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 798 verify_done ? 
"Online verify " : "Resync", 799 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 800 801 n_oos = drbd_bm_total_weight(mdev); 802 803 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 804 if (n_oos) { 805 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 806 n_oos, Bit2KB(1)); 807 khelper_cmd = "out-of-sync"; 808 } 809 } else { 810 D_ASSERT((n_oos - mdev->rs_failed) == 0); 811 812 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 813 khelper_cmd = "after-resync-target"; 814 815 if (mdev->tconn->csums_tfm && mdev->rs_total) { 816 const unsigned long s = mdev->rs_same_csum; 817 const unsigned long t = mdev->rs_total; 818 const int ratio = 819 (t == 0) ? 0 : 820 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 821 dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; " 822 "transferred %luK total %luK\n", 823 ratio, 824 Bit2KB(mdev->rs_same_csum), 825 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 826 Bit2KB(mdev->rs_total)); 827 } 828 } 829 830 if (mdev->rs_failed) { 831 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 832 833 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 834 ns.disk = D_INCONSISTENT; 835 ns.pdsk = D_UP_TO_DATE; 836 } else { 837 ns.disk = D_UP_TO_DATE; 838 ns.pdsk = D_INCONSISTENT; 839 } 840 } else { 841 ns.disk = D_UP_TO_DATE; 842 ns.pdsk = D_UP_TO_DATE; 843 844 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 845 if (mdev->p_uuid) { 846 int i; 847 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 848 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 849 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 850 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 851 } else { 852 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 853 } 854 } 855 856 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 857 /* for verify runs, we don't update uuids here, 858 * so there would be nothing to report. 
*/ 859 drbd_uuid_set_bm(mdev, 0UL); 860 drbd_print_uuids(mdev, "updated UUIDs"); 861 if (mdev->p_uuid) { 862 /* Now the two UUID sets are equal, update what we 863 * know of the peer. */ 864 int i; 865 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 866 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 867 } 868 } 869 } 870 871 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 872out_unlock: 873 spin_unlock_irq(&mdev->tconn->req_lock); 874 put_ldev(mdev); 875out: 876 mdev->rs_total = 0; 877 mdev->rs_failed = 0; 878 mdev->rs_paused = 0; 879 if (verify_done) 880 mdev->ov_start_sector = 0; 881 882 drbd_md_sync(mdev); 883 884 if (khelper_cmd) 885 drbd_khelper(mdev, khelper_cmd); 886 887 return 1; 888} 889 890/* helper */ 891static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req) 892{ 893 if (drbd_peer_req_has_active_page(peer_req)) { 894 /* This might happen if sendpage() has not finished */ 895 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 896 atomic_add(i, &mdev->pp_in_use_by_net); 897 atomic_sub(i, &mdev->pp_in_use); 898 spin_lock_irq(&mdev->tconn->req_lock); 899 list_add_tail(&peer_req->w.list, &mdev->net_ee); 900 spin_unlock_irq(&mdev->tconn->req_lock); 901 wake_up(&drbd_pp_wait); 902 } else 903 drbd_free_peer_req(mdev, peer_req); 904} 905 906/** 907 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 908 * @mdev: DRBD device. 909 * @w: work object. 
910 * @cancel: The connection will be closed anyways 911 */ 912int w_e_end_data_req(struct drbd_work *w, int cancel) 913{ 914 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 915 struct drbd_conf *mdev = w->mdev; 916 int err; 917 918 if (unlikely(cancel)) { 919 drbd_free_peer_req(mdev, peer_req); 920 dec_unacked(mdev); 921 return 0; 922 } 923 924 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 925 err = drbd_send_block(mdev, P_DATA_REPLY, peer_req); 926 } else { 927 if (__ratelimit(&drbd_ratelimit_state)) 928 dev_err(DEV, "Sending NegDReply. sector=%llus.\n", 929 (unsigned long long)peer_req->i.sector); 930 931 err = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req); 932 } 933 934 dec_unacked(mdev); 935 936 move_to_net_ee_or_free(mdev, peer_req); 937 938 if (unlikely(err)) 939 dev_err(DEV, "drbd_send_block() failed\n"); 940 return err; 941} 942 943/** 944 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS 945 * @mdev: DRBD device. 946 * @w: work object. 
947 * @cancel: The connection will be closed anyways 948 */ 949int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 950{ 951 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 952 struct drbd_conf *mdev = w->mdev; 953 int err; 954 955 if (unlikely(cancel)) { 956 drbd_free_peer_req(mdev, peer_req); 957 dec_unacked(mdev); 958 return 0; 959 } 960 961 if (get_ldev_if_state(mdev, D_FAILED)) { 962 drbd_rs_complete_io(mdev, peer_req->i.sector); 963 put_ldev(mdev); 964 } 965 966 if (mdev->state.conn == C_AHEAD) { 967 err = drbd_send_ack(mdev, P_RS_CANCEL, peer_req); 968 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 969 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 970 inc_rs_pending(mdev); 971 err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req); 972 } else { 973 if (__ratelimit(&drbd_ratelimit_state)) 974 dev_err(DEV, "Not sending RSDataReply, " 975 "partner DISKLESS!\n"); 976 err = 0; 977 } 978 } else { 979 if (__ratelimit(&drbd_ratelimit_state)) 980 dev_err(DEV, "Sending NegRSDReply. 
	    sector %llus.\n",
			(unsigned long long)peer_req->i.sector);

		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
	}

	/* we no longer owe the peer an answer for this request */
	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(err))
		dev_err(DEV, "drbd_send_block() failed\n");
	return err;
}

/**
 * w_e_end_csum_rs_req() - Worker callback, answer a checksum based resync request
 * @w:		work object, embedded in a struct drbd_peer_request.
 * @cancel:	The connection will be closed anyways.
 *
 * Recomputes the checksum over the locally read data and compares it with
 * the digest the peer sent along (peer_req->digest).  If both match, only
 * a P_RS_IS_IN_SYNC ack is sent and the block is marked in sync locally;
 * otherwise the full block data is sent back as P_RS_DATA_REPLY.
 */
int w_e_end_csum_rs_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int err, eq = 0;

	if (unlikely(cancel)) {
		/* connection is going down; just drop the request */
		drbd_free_peer_req(mdev, peer_req);
		dec_unacked(mdev);
		return 0;
	}

	/* resync lru bookkeeping; only possible while we still hold the local disk */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->tconn->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->tconn->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		/* if the kmalloc failed, eq stays 0 and we fall through to
		 * sending the full block instead of just an ack */
		if (digest) {
			drbd_csum_ee(mdev, mdev->tconn->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			err = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(mdev);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			err = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		/* local read had an error: negative reply to the peer */
		err = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(err))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return err;
}

/**
 * w_e_end_ov_req() - Worker callback, hash a block for an online verify request
 * @w:		work object, embedded in a struct drbd_peer_request.
 * @cancel:	The connection will be closed anyways.
 *
 * Computes a digest over the local block and sends it to the peer as
 * P_OV_REPLY.  Returns non-zero when the digest allocation or the send
 * fails, which makes the caller (drbd_worker) tear down the connection.
 */
int w_e_end_ov_req(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		err = 1;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);	/* EE_WAS_ERROR: report an all-zero digest */

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(mdev, peer_req);
	peer_req = NULL;
	inc_rs_pending(mdev);
	err = drbd_send_drequest_csum(mdev, sector, size, digest, digest_size, P_OV_REPLY);
	if (err)
		dec_rs_pending(mdev);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_peer_req(mdev, peer_req);
	dec_unacked(mdev);
	return err;
}

/**
 * drbd_ov_out_of_sync_found() - Record an out-of-sync range found by online verify
 * @mdev:	DRBD device.
 * @sector:	Start sector of the range.
 * @size:	Size of the range in bytes.
 *
 * Merges the range with the previously recorded one when they are
 * contiguous; ov_last_oos_size is kept in units of 512-byte sectors
 * (hence the size>>9).
 */
void drbd_ov_out_of_sync_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		/* contiguous with the last recorded range: just extend it */
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}

/**
 * w_e_end_ov_reply() - Worker callback, check a P_OV_REPLY against local data
 * @w:		work object, embedded in a struct drbd_peer_request.
 * @cancel:	The connection will be closed anyways.
 *
 * Compares the digest received from the peer with one computed over the
 * local block, reports the result via P_OV_RESULT, and finishes the
 * verify run once ov_left reaches zero.
 */
int w_e_end_ov_reply(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_conf *mdev = w->mdev;
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int err, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_peer_req(mdev, peer_req);
		dec_unacked(mdev);
		return 0;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->tconn->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->tconn->verify_tfm, peer_req, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
		/* NOTE(review): on kmalloc failure eq stays 0, i.e. the block
		 * is reported out of sync rather than the request failed. */
	}

	/* Free peer_req and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_alloc_pages due to pp_in_use > max_buffers. */
	drbd_free_peer_req(mdev, peer_req);
	if (!eq)
		drbd_ov_out_of_sync_found(mdev, sector, size);
	else
		ov_out_of_sync_print(mdev);

	err = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
			       eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		/* whole verify run finished: print the last range and clean up */
		ov_out_of_sync_print(mdev);
		drbd_resync_finished(mdev);
	}

	return err;
}

/* Complete the barrier embedded in a struct drbd_wq_barrier; whoever
 * queued it is blocked in wait_for_completion(&b->done). */
int w_prev_work_done(struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);

	complete(&b->done);
	return 0;
}

/* Send a P_BARRIER packet for the given epoch, unless we lost the race
 * against tl_clear / connection loss (see the comment below). */
int w_send_barrier(struct drbd_work *w, int cancel)
{
	struct drbd_socket *sock;
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct drbd_conf *mdev = w->mdev;
	struct p_barrier *p;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->tconn->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (cancel)
		return 0;

	sock = &mdev->tconn->data;
	p = drbd_prepare_command(mdev, sock);
	if (!p)
		return -EIO;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	return drbd_send_command(mdev, sock, P_BARRIER, sizeof(*p), NULL, 0);
}

/* Ask the peer to unplug its request queue (P_UNPLUG_REMOTE, no payload). */
int w_send_write_hint(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;
	struct drbd_socket *sock;

	if (cancel)
		return 0;
	sock = &mdev->tconn->data;
	if (!drbd_prepare_command(mdev, sock))
		return -EIO;
	return drbd_send_command(mdev, sock, P_UNPLUG_REMOTE, 0, NULL, 0);
}

/* Tell the peer that we will not ship this request's data, only mark the
 * range out of sync on its side (Ahead/Behind mode). */
int w_send_out_of_sync(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}

	err = drbd_send_out_of_sync(mdev, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return err;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}

	err = drbd_send_dblock(mdev, req);
	/* advance the request state machine accordingly */
	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;
	int err;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 0;
	}

	/* the request pointer doubles as the block_id cookie echoed in the reply */
	err = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
				 (unsigned long)req);

	req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK);

	return err;
}

/* Re-submit a request to the local backing device, used when IO is
 * unfrozen (see the comment below about drbd_al_begin_io). */
int w_restart_disk_io(struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	struct drbd_conf *mdev = w->mdev;

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, &req->i);
	/* Calling drbd_al_begin_io() out of the worker might deadlocks
	   theoretically. Practically it can not deadlock, since this is
	   only used when unfreezing IOs.
All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 0;
}

/* Walk the resync-after dependency chain starting at @mdev and return 1
 * if no device on the chain is currently (paused-)syncing, 0 otherwise.
 * Caller holds global_state_lock (see the comment block at the top of
 * this file). */
static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (!odev->ldev)
			return 1;
		if (odev->ldev->dc.resync_after == -1)
			return 1;
		odev = minor_to_mdev(odev->ldev->dc.resync_after);
		if (!expect(odev))
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	idr_for_each_entry(&minors, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	idr_for_each_entry(&minors, odev, i) {
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO) ;
		}
	}
	return rv;
}

/* Locked wrapper around _drbd_resume_next(). */
void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

/* Locked wrapper around _drbd_pause_after(). */
void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

/* Validate a new resync-after minor @o_minor for @mdev: reject unknown
 * minors and dependency cycles.  Called with global_state_lock held
 * (see drbd_alter_sa). */
static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* NOTE(review): unlike _drbd_may_sync_now(), this walk does
		 * not check odev->ldev for NULL before dereferencing it --
		 * verify that all devices on the chain are guaranteed to
		 * have a backing device attached here. */

		/* dependency chain ends here, no cycles. */
		if (odev->ldev->dc.resync_after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->ldev->dc.resync_after);
	}
}

/* Change the resync-after dependency of @mdev to minor @na, then iterate
 * pause/resume until the pause states of all devices are stable. */
int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->ldev->dc.resync_after = na;
		do {
			changes = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

/* Reset the state of the resync rate controller for a fresh resync run. */
void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}

/* Timer callback: defer the actual resync start to the worker thread. */
void start_resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
}

/* Worker callback: start resyncing as SyncSource once all outstanding
 * acks and resync replies have drained; otherwise retry via the timer. */
int w_start_resync(struct drbd_work *w, int cancel)
{
	struct drbd_conf *mdev = w->mdev;

	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
		dev_warn(DEV, "w_start_resync later...\n");
		mdev->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&mdev->start_resync_timer);
		return 0;
	}

	drbd_start_resync(mdev, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
	return 0;
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
1483 */ 1484void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) 1485{ 1486 union drbd_state ns; 1487 int r; 1488 1489 if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) { 1490 dev_err(DEV, "Resync already running!\n"); 1491 return; 1492 } 1493 1494 if (mdev->state.conn < C_AHEAD) { 1495 /* In case a previous resync run was aborted by an IO error/detach on the peer. */ 1496 drbd_rs_cancel_all(mdev); 1497 /* This should be done when we abort the resync. We definitely do not 1498 want to have this for connections going back and forth between 1499 Ahead/Behind and SyncSource/SyncTarget */ 1500 } 1501 1502 if (!test_bit(B_RS_H_DONE, &mdev->flags)) { 1503 if (side == C_SYNC_TARGET) { 1504 /* Since application IO was locked out during C_WF_BITMAP_T and 1505 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1506 we check that we might make the data inconsistent. */ 1507 r = drbd_khelper(mdev, "before-resync-target"); 1508 r = (r >> 8) & 0xff; 1509 if (r > 0) { 1510 dev_info(DEV, "before-resync-target handler returned %d, " 1511 "dropping connection.\n", r); 1512 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD); 1513 return; 1514 } 1515 } else /* C_SYNC_SOURCE */ { 1516 r = drbd_khelper(mdev, "before-resync-source"); 1517 r = (r >> 8) & 0xff; 1518 if (r > 0) { 1519 if (r == 3) { 1520 dev_info(DEV, "before-resync-source handler returned %d, " 1521 "ignoring. 
Old userland tools?", r); 1522 } else { 1523 dev_info(DEV, "before-resync-source handler returned %d, " 1524 "dropping connection.\n", r); 1525 conn_request_state(mdev->tconn, NS(conn, C_DISCONNECTING), CS_HARD); 1526 return; 1527 } 1528 } 1529 } 1530 } 1531 1532 if (current == mdev->tconn->worker.task) { 1533 /* The worker should not sleep waiting for state_mutex, 1534 that can take long */ 1535 if (!mutex_trylock(mdev->state_mutex)) { 1536 set_bit(B_RS_H_DONE, &mdev->flags); 1537 mdev->start_resync_timer.expires = jiffies + HZ/5; 1538 add_timer(&mdev->start_resync_timer); 1539 return; 1540 } 1541 } else { 1542 mutex_lock(mdev->state_mutex); 1543 } 1544 clear_bit(B_RS_H_DONE, &mdev->flags); 1545 1546 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) { 1547 mutex_unlock(mdev->state_mutex); 1548 return; 1549 } 1550 1551 write_lock_irq(&global_state_lock); 1552 ns = drbd_read_state(mdev); 1553 1554 ns.aftr_isp = !_drbd_may_sync_now(mdev); 1555 1556 ns.conn = side; 1557 1558 if (side == C_SYNC_TARGET) 1559 ns.disk = D_INCONSISTENT; 1560 else /* side == C_SYNC_SOURCE */ 1561 ns.pdsk = D_INCONSISTENT; 1562 1563 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 1564 ns = drbd_read_state(mdev); 1565 1566 if (ns.conn < C_CONNECTED) 1567 r = SS_UNKNOWN_ERROR; 1568 1569 if (r == SS_SUCCESS) { 1570 unsigned long tw = drbd_bm_total_weight(mdev); 1571 unsigned long now = jiffies; 1572 int i; 1573 1574 mdev->rs_failed = 0; 1575 mdev->rs_paused = 0; 1576 mdev->rs_same_csum = 0; 1577 mdev->rs_last_events = 0; 1578 mdev->rs_last_sect_ev = 0; 1579 mdev->rs_total = tw; 1580 mdev->rs_start = now; 1581 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1582 mdev->rs_mark_left[i] = tw; 1583 mdev->rs_mark_time[i] = now; 1584 } 1585 _drbd_pause_after(mdev); 1586 } 1587 write_unlock_irq(&global_state_lock); 1588 1589 if (r == SS_SUCCESS) { 1590 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1591 drbd_conn_str(ns.conn), 1592 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10), 
1593 (unsigned long) mdev->rs_total); 1594 if (side == C_SYNC_TARGET) 1595 mdev->bm_resync_fo = 0; 1596 1597 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1598 * with w_send_oos, or the sync target will get confused as to 1599 * how much bits to resync. We cannot do that always, because for an 1600 * empty resync and protocol < 95, we need to do it here, as we call 1601 * drbd_resync_finished from here in that case. 1602 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1603 * and from after_state_ch otherwise. */ 1604 if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96) 1605 drbd_gen_and_send_sync_uuid(mdev); 1606 1607 if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) { 1608 /* This still has a race (about when exactly the peers 1609 * detect connection loss) that can lead to a full sync 1610 * on next handshake. In 8.3.9 we fixed this with explicit 1611 * resync-finished notifications, but the fix 1612 * introduces a protocol change. Sleeping for some 1613 * time longer than the ping interval + timeout on the 1614 * SyncSource, to give the SyncTarget the chance to 1615 * detect connection loss, then waiting for a ping 1616 * response (implicit in drbd_resync_finished) reduces 1617 * the race considerably, but does not solve it. */ 1618 if (side == C_SYNC_SOURCE) 1619 schedule_timeout_interruptible( 1620 mdev->tconn->net_conf->ping_int * HZ + 1621 mdev->tconn->net_conf->ping_timeo*HZ/9); 1622 drbd_resync_finished(mdev); 1623 } 1624 1625 drbd_rs_controller_reset(mdev); 1626 /* ns.conn may already be != mdev->state.conn, 1627 * we may have been paused in between, or become paused until 1628 * the timer triggers. 
1629 * No matter, that is handled in resync_timer_fn() */ 1630 if (ns.conn == C_SYNC_TARGET) 1631 mod_timer(&mdev->resync_timer, jiffies); 1632 1633 drbd_md_sync(mdev); 1634 } 1635 put_ldev(mdev); 1636 mutex_unlock(mdev->state_mutex); 1637} 1638 1639int drbd_worker(struct drbd_thread *thi) 1640{ 1641 struct drbd_tconn *tconn = thi->tconn; 1642 struct drbd_work *w = NULL; 1643 struct drbd_conf *mdev; 1644 LIST_HEAD(work_list); 1645 int vnr, intr = 0; 1646 1647 while (get_t_state(thi) == RUNNING) { 1648 drbd_thread_current_set_cpu(thi); 1649 1650 if (down_trylock(&tconn->data.work.s)) { 1651 mutex_lock(&tconn->data.mutex); 1652 if (tconn->data.socket && !tconn->net_conf->no_cork) 1653 drbd_tcp_uncork(tconn->data.socket); 1654 mutex_unlock(&tconn->data.mutex); 1655 1656 intr = down_interruptible(&tconn->data.work.s); 1657 1658 mutex_lock(&tconn->data.mutex); 1659 if (tconn->data.socket && !tconn->net_conf->no_cork) 1660 drbd_tcp_cork(tconn->data.socket); 1661 mutex_unlock(&tconn->data.mutex); 1662 } 1663 1664 if (intr) { 1665 flush_signals(current); 1666 if (get_t_state(thi) == RUNNING) { 1667 conn_warn(tconn, "Worker got an unexpected signal\n"); 1668 continue; 1669 } 1670 break; 1671 } 1672 1673 if (get_t_state(thi) != RUNNING) 1674 break; 1675 /* With this break, we have done a down() but not consumed 1676 the entry from the list. The cleanup code takes care of 1677 this... */ 1678 1679 w = NULL; 1680 spin_lock_irq(&tconn->data.work.q_lock); 1681 if (list_empty(&tconn->data.work.q)) { 1682 /* something terribly wrong in our logic. 1683 * we were able to down() the semaphore, 1684 * but the list is empty... doh. 1685 * 1686 * what is the best thing to do now? 1687 * try again from scratch, restarting the receiver, 1688 * asender, whatnot? could break even more ugly, 1689 * e.g. when we are primary, but no good local data. 1690 * 1691 * I'll try to get away just starting over this loop. 
1692 */ 1693 conn_warn(tconn, "Work list unexpectedly empty\n"); 1694 spin_unlock_irq(&tconn->data.work.q_lock); 1695 continue; 1696 } 1697 w = list_entry(tconn->data.work.q.next, struct drbd_work, list); 1698 list_del_init(&w->list); 1699 spin_unlock_irq(&tconn->data.work.q_lock); 1700 1701 if (w->cb(w, tconn->cstate < C_WF_REPORT_PARAMS)) { 1702 /* dev_warn(DEV, "worker: a callback failed! \n"); */ 1703 if (tconn->cstate >= C_WF_REPORT_PARAMS) 1704 conn_request_state(tconn, NS(conn, C_NETWORK_FAILURE), CS_HARD); 1705 } 1706 } 1707 1708 spin_lock_irq(&tconn->data.work.q_lock); 1709 while (!list_empty(&tconn->data.work.q)) { 1710 list_splice_init(&tconn->data.work.q, &work_list); 1711 spin_unlock_irq(&tconn->data.work.q_lock); 1712 1713 while (!list_empty(&work_list)) { 1714 w = list_entry(work_list.next, struct drbd_work, list); 1715 list_del_init(&w->list); 1716 w->cb(w, 1); 1717 } 1718 1719 spin_lock_irq(&tconn->data.work.q_lock); 1720 } 1721 sema_init(&tconn->data.work.s, 0); 1722 /* DANGEROUS race: if someone did queue his work within the spinlock, 1723 * but up() ed outside the spinlock, we could get an up() on the 1724 * semaphore without corresponding list entry. 1725 * So don't do that. 1726 */ 1727 spin_unlock_irq(&tconn->data.work.q_lock); 1728 1729 drbd_thread_stop(&tconn->receiver); 1730 idr_for_each_entry(&tconn->volumes, mdev, vnr) { 1731 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1732 /* _drbd_set_state only uses stop_nowait. 1733 * wait here for the exiting receiver. */ 1734 drbd_mdev_cleanup(mdev); 1735 } 1736 clear_bit(OBJECT_DYING, &tconn->flags); 1737 clear_bit(CONFIG_PENDING, &tconn->flags); 1738 wake_up(&tconn->ping_wait); 1739 1740 return 0; 1741} 1742