drbd_worker.c revision e64a32945902a178c9de9b38e0ea3290981605bc
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

 */

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_req.h"

static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel);
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel);



/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_endio_pri (defined here)
 *   drbd_endio_sec (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the sync-after dependencies, we grab a write lock, because
   we need stable states on all devices for that. */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;

	md_io = (struct drbd_md_io *)bio->bi_private;
	md_io->error = error;

	complete(&md_io->event);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = peer_req->mdev;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	mdev->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&mdev->read_ee))
		wake_up(&mdev->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	drbd_queue_work(&mdev->tconn->data.work, &peer_req->w);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage. */
static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_conf *mdev = peer_req->mdev;
	sector_t e_sector;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	e_sector = peer_req->i.sector;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;

	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	mdev->writ_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list); /* has been on active_ee or sync_ee */
	list_add_tail(&peer_req->w.list, &mdev->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removed from the tree from "drbd_process_done_ee" within the
	 * appropriate w.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &mdev->sync_ee : &mdev->active_ee);

	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(mdev, false);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(mdev, e_sector);

	if (do_wake)
		wake_up(&mdev->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(mdev, e_sector);

	wake_asender(mdev);
	put_ldev(mdev);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_endio_sec(struct bio *bio, int error)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_conf *mdev = peer_req->mdev;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;

	if (error && __ratelimit(&drbd_ratelimit_state))
		dev_warn(DEV, "%s: error=%d s=%llus\n",
			 is_write ? "write" : "read", error,
			 (unsigned long long)peer_req->i.sector);
	if (!error && !uptodate) {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_warn(DEV, "%s: setting error to -EIO s=%llus\n",
				 is_write ? "write" : "read",
				 (unsigned long long)peer_req->i.sector);
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_endio_pri(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_conf *mdev = req->mdev;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		dev_warn(DEV, "p %s: setting error to -EIO\n",
			 bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers...
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&mdev->tconn->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&mdev->tconn->req_lock, flags);

	if (m.bio)
		complete_master_bio(mdev, &m);
}

int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	/* We should not detach for read io-error,
	 * but try to WRITE the P_DATA_REPLY to the failed location,
	 * to give the disk the chance to relocate that block */

	spin_lock_irq(&mdev->tconn->req_lock);
	if (cancel || mdev->state.pdsk != D_UP_TO_DATE) {
		_req_mod(req, READ_RETRY_REMOTE_CANCELED);
		spin_unlock_irq(&mdev->tconn->req_lock);
		return 1;
	}
	spin_unlock_irq(&mdev->tconn->req_lock);

	return w_send_read_req(mdev, w, 0);
}

void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm,
		  struct drbd_peer_request *peer_req, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* TODO merge common code with w_e_end_ov_req */
int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req =
		container_of(w, struct drbd_peer_request, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
		/* Free e and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_pp_alloc due to pp_in_use > max_buffers. */
		drbd_free_ee(mdev, peer_req);
		peer_req = NULL;
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, sector, size,
					     digest, digest_size,
					     P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		dev_err(DEV, "kmalloc() of digest failed.\n");
		ok = 0;
	}

out:
	if (peer_req)
		drbd_free_ee(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_drequest(..., csum) failed\n");
	return ok;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size)
{
	struct drbd_peer_request *peer_req;

	if (!get_ldev(mdev))
		return -EIO;

	if (drbd_rs_should_slow_down(mdev, sector))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_ee(mdev, ID_SYNCER /* unused */, sector, size, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&mdev->tconn->req_lock);
	list_add(&peer_req->w.list, &mdev->read_ee);
	spin_unlock_irq(&mdev->tconn->req_lock);

	atomic_add(size >> 9, &mdev->rs_sect_ev);
	if (drbd_submit_ee(mdev, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&mdev->tconn->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&mdev->tconn->req_lock);

	drbd_free_ee(mdev, peer_req);
defer:
	put_ldev(mdev);
	return -EAGAIN;
}

int w_resync_timer(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	switch (mdev->state.conn) {
	case C_VERIFY_S:
		w_make_ov_request(mdev, w, cancel);
		break;
	case C_SYNC_TARGET:
		w_make_resync_request(mdev, w, cancel);
		break;
	}

	return 1;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	if (list_empty(&mdev->resync_work.list))
		drbd_queue_work(&mdev->tconn->data.work, &mdev->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

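/* The function below implements the dynamic resync-rate controller (a short
 * summary only; the code is authoritative): each SLEEP_TIME tick we note how
 * many resync sectors came back in (rs_sect_in), compare what is in flight
 * plus what is already planned against the desired fill level ("want", from
 * c_fill_target, or derived from c_delay_target), and spread the resulting
 * correction over the rs_plan_s fifo (one slot per planned step).  The value
 * popped from the fifo this tick, plus what just came in, is the number of
 * sectors we may request now, capped by c_max_rate. */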
static int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect;          /* Number of sectors to request in this turn */
	int correction;        /* Number of sectors more we need in the proxy */
	int cps;               /* correction per invocation of drbd_rs_controller() */
	int steps;             /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target :
			sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - mdev->rs_in_flight - mdev->rs_planed;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(&mdev->rs_plan_s, cps);
	mdev->rs_planed += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
	mdev->rs_planed -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, mdev->rs_in_flight, want, correction,
		 steps, cps, mdev->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

static int drbd_rs_number_requests(struct drbd_conf *mdev)
{
	int number;
	if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */
		number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9);
		mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		mdev->c_sync_rate = mdev->sync_conf.rate;
		number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}

	/* ignore the amount of pending requests, the resync controller should
	 * throttle down to incoming reply rate soon enough anyways. */
	return number;
}

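/* Rough outline of the request generator below: drbd_rs_number_requests()
 * says how many BM_BLOCK_SIZE requests this tick may issue.  We walk the
 * bitmap from bm_resync_fo, merge adjacent dirty bits into one aligned
 * request of at most max_bio_size, and then either read the block locally
 * first (checksum based resync, csums_tfm set) or send a plain
 * P_RS_DATA_REQUEST right away.  If the send buffer is half full, or the
 * controller returned 0, we stop and re-arm the resync timer. */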
static int w_make_resync_request(struct drbd_conf *mdev,
				 struct drbd_work *w, int cancel)
{
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(mdev->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 1;

	if (mdev->rs_total == 0) {
		/* empty resync? */
		drbd_resync_finished(mdev);
		return 1;
	}

	if (!get_ldev(mdev)) {
		/* Since we only need to access mdev->rsync a
		   get_ldev_if_state(mdev,D_FAILED) would be sufficient, but
		   to continue resync with a broken disk makes no sense at
		   all */
		dev_err(DEV, "Disk broke down during resync!\n");
		return 1;
	}

	max_bio_size = queue_max_hw_sectors(mdev->rq_queue) << 9;
	number = drbd_rs_number_requests(mdev);
	if (number == 0)
		goto requeue;

	for (i = 0; i < number; i++) {
		/* Stop generating RS requests, when half of the send buffer is filled */
		mutex_lock(&mdev->tconn->data.mutex);
		if (mdev->tconn->data.socket) {
			queued = mdev->tconn->data.socket->sk->sk_wmem_queued;
			sndbuf = mdev->tconn->data.socket->sk->sk_sndbuf;
		} else {
			queued = 1;
			sndbuf = 0;
		}
		mutex_unlock(&mdev->tconn->data.mutex);
		if (queued > sndbuf / 2)
			goto requeue;

next_sector:
		size = BM_BLOCK_SIZE;
		bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo);

		if (bit == DRBD_END_OF_BITMAP) {
			mdev->bm_resync_fo = drbd_bm_bits(mdev);
			put_ldev(mdev);
			return 1;
		}

		sector = BM_BIT_TO_SECT(bit);

		if (drbd_rs_should_slow_down(mdev, sector) ||
		    drbd_try_rs_begin_io(mdev, sector)) {
			mdev->bm_resync_fo = bit;
			goto requeue;
		}
		mdev->bm_resync_fo = bit + 1;

		if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) {
			drbd_rs_complete_io(mdev, sector);
			goto next_sector;
		}

#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE
		/* try to find some adjacent bits.
		 * we stop if we have already the maximum req size.
		 *
		 * Additionally always align bigger requests, in order to
		 * be prepared for all stripe sizes of software RAIDs.
		 */
		align = 1;
		rollback_i = i;
		for (;;) {
			if (size + BM_BLOCK_SIZE > max_bio_size)
				break;

			/* Be always aligned */
			if (sector & ((1<<(align+3))-1))
				break;

			/* do not cross extent boundaries */
			if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0)
				break;
			/* now, is it actually dirty, after all?
			 * caution, drbd_bm_test_bit is tri-state for some
			 * obscure reason; ( b == 0 ) would get the out-of-band
			 * only accidentally right because of the "oddly sized"
			 * adjustment below */
			if (drbd_bm_test_bit(mdev, bit+1) != 1)
				break;
			bit++;
			size += BM_BLOCK_SIZE;
			if ((BM_BLOCK_SIZE << align) <= size)
				align++;
			i++;
		}
		/* if we merged some,
		 * reset the offset to start the next drbd_bm_find_next from */
		if (size > BM_BLOCK_SIZE)
			mdev->bm_resync_fo = bit + 1;
#endif

		/* adjust very last sectors, in case we are oddly sized */
		if (sector + (size>>9) > capacity)
			size = (capacity-sector)<<9;
		if (mdev->tconn->agreed_pro_version >= 89 && mdev->csums_tfm) {
			switch (read_for_csum(mdev, sector, size)) {
			case -EIO: /* Disk failure */
				put_ldev(mdev);
				return 0;
			case -EAGAIN: /* allocation failed, or ldev busy */
				drbd_rs_complete_io(mdev, sector);
				mdev->bm_resync_fo = BM_SECT_TO_BIT(sector);
				i = rollback_i;
				goto requeue;
			case 0:
				/* everything ok */
				break;
			default:
				BUG();
			}
		} else {
			inc_rs_pending(mdev);
			if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST,
						sector, size, ID_SYNCER)) {
				dev_err(DEV, "drbd_send_drequest() failed, aborting...\n");
				dec_rs_pending(mdev);
				put_ldev(mdev);
				return 0;
			}
		}
	}

	if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) {
		/* last syncer _request_ was sent,
		 * but the P_RS_DATA_REPLY not yet received.  sync will end (and
		 * next sync group will resume), as soon as we receive the last
		 * resync data block, and the last bit is cleared.
		 * until then resync "work" is "inactive" ... */
655 */ 656 put_ldev(mdev); 657 return 1; 658 } 659 660 requeue: 661 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 662 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 663 put_ldev(mdev); 664 return 1; 665} 666 667static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 668{ 669 int number, i, size; 670 sector_t sector; 671 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 672 673 if (unlikely(cancel)) 674 return 1; 675 676 number = drbd_rs_number_requests(mdev); 677 678 sector = mdev->ov_position; 679 for (i = 0; i < number; i++) { 680 if (sector >= capacity) { 681 return 1; 682 } 683 684 size = BM_BLOCK_SIZE; 685 686 if (drbd_rs_should_slow_down(mdev, sector) || 687 drbd_try_rs_begin_io(mdev, sector)) { 688 mdev->ov_position = sector; 689 goto requeue; 690 } 691 692 if (sector + (size>>9) > capacity) 693 size = (capacity-sector)<<9; 694 695 inc_rs_pending(mdev); 696 if (!drbd_send_ov_request(mdev, sector, size)) { 697 dec_rs_pending(mdev); 698 return 0; 699 } 700 sector += BM_SECT_PER_BIT; 701 } 702 mdev->ov_position = sector; 703 704 requeue: 705 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 706 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 707 return 1; 708} 709 710int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 711{ 712 kfree(w); 713 ov_oos_print(mdev); 714 drbd_resync_finished(mdev); 715 716 return 1; 717} 718 719static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 720{ 721 kfree(w); 722 723 drbd_resync_finished(mdev); 724 725 return 1; 726} 727 728static void ping_peer(struct drbd_conf *mdev) 729{ 730 clear_bit(GOT_PING_ACK, &mdev->flags); 731 request_ping(mdev); 732 wait_event(mdev->misc_wait, 733 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED); 734} 735 736int drbd_resync_finished(struct drbd_conf *mdev) 737{ 738 unsigned long db, dt, dbdt; 739 unsigned long n_oos; 740 union drbd_state os, ns; 741 struct drbd_work *w; 742 char *khelper_cmd = NULL; 743 int verify_done = 0; 744 745 /* Remove all elements from the resync LRU. Since future actions 746 * might set bits in the (main) bitmap, then the entries in the 747 * resync LRU would be wrong. */ 748 if (drbd_rs_del_all(mdev)) { 749 /* In case this is not possible now, most probably because 750 * there are P_RS_DATA_REPLY Packets lingering on the worker's 751 * queue (or even the read operations for those packets 752 * is not finished by now). Retry in 100ms. */ 753 754 schedule_timeout_interruptible(HZ / 10); 755 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC); 756 if (w) { 757 w->cb = w_resync_finished; 758 drbd_queue_work(&mdev->tconn->data.work, w); 759 return 1; 760 } 761 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n"); 762 } 763 764 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ; 765 if (dt <= 0) 766 dt = 1; 767 db = mdev->rs_total; 768 dbdt = Bit2KB(db/dt); 769 mdev->rs_paused /= HZ; 770 771 if (!get_ldev(mdev)) 772 goto out; 773 774 ping_peer(mdev); 775 776 spin_lock_irq(&mdev->tconn->req_lock); 777 os = mdev->state; 778 779 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 780 781 /* This protects us against multiple calls (that can happen in the presence 782 of application IO), and against connectivity loss just before we arrive here. */ 783 if (os.conn <= C_CONNECTED) 784 goto out_unlock; 785 786 ns = os; 787 ns.conn = C_CONNECTED; 788 789 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 790 verify_done ? 
"Online verify " : "Resync", 791 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 792 793 n_oos = drbd_bm_total_weight(mdev); 794 795 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 796 if (n_oos) { 797 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 798 n_oos, Bit2KB(1)); 799 khelper_cmd = "out-of-sync"; 800 } 801 } else { 802 D_ASSERT((n_oos - mdev->rs_failed) == 0); 803 804 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 805 khelper_cmd = "after-resync-target"; 806 807 if (mdev->csums_tfm && mdev->rs_total) { 808 const unsigned long s = mdev->rs_same_csum; 809 const unsigned long t = mdev->rs_total; 810 const int ratio = 811 (t == 0) ? 0 : 812 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 813 dev_info(DEV, "%u %% had equal checksums, eliminated: %luK; " 814 "transferred %luK total %luK\n", 815 ratio, 816 Bit2KB(mdev->rs_same_csum), 817 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 818 Bit2KB(mdev->rs_total)); 819 } 820 } 821 822 if (mdev->rs_failed) { 823 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 824 825 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 826 ns.disk = D_INCONSISTENT; 827 ns.pdsk = D_UP_TO_DATE; 828 } else { 829 ns.disk = D_UP_TO_DATE; 830 ns.pdsk = D_INCONSISTENT; 831 } 832 } else { 833 ns.disk = D_UP_TO_DATE; 834 ns.pdsk = D_UP_TO_DATE; 835 836 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 837 if (mdev->p_uuid) { 838 int i; 839 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 840 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 841 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 842 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 843 } else { 844 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 845 } 846 } 847 848 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 849 /* for verify runs, we don't update uuids here, 850 * so there would be nothing to report. */ 851 drbd_uuid_set_bm(mdev, 0UL); 852 drbd_print_uuids(mdev, "updated UUIDs"); 853 if (mdev->p_uuid) { 854 /* Now the two UUID sets are equal, update what we 855 * know of the peer. */ 856 int i; 857 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 858 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 859 } 860 } 861 } 862 863 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 864out_unlock: 865 spin_unlock_irq(&mdev->tconn->req_lock); 866 put_ldev(mdev); 867out: 868 mdev->rs_total = 0; 869 mdev->rs_failed = 0; 870 mdev->rs_paused = 0; 871 if (verify_done) 872 mdev->ov_start_sector = 0; 873 874 drbd_md_sync(mdev); 875 876 if (khelper_cmd) 877 drbd_khelper(mdev, khelper_cmd); 878 879 return 1; 880} 881 882/* helper */ 883static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_peer_request *peer_req) 884{ 885 if (drbd_ee_has_active_page(peer_req)) { 886 /* This might happen if sendpage() has not finished */ 887 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 888 atomic_add(i, &mdev->pp_in_use_by_net); 889 atomic_sub(i, &mdev->pp_in_use); 890 spin_lock_irq(&mdev->tconn->req_lock); 891 list_add_tail(&peer_req->w.list, &mdev->net_ee); 892 spin_unlock_irq(&mdev->tconn->req_lock); 893 wake_up(&drbd_pp_wait); 894 } else 895 drbd_free_ee(mdev, peer_req); 896} 897 898/** 899 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 900 * @mdev: DRBD device. 901 * @w: work object. 
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		ok = drbd_send_block(mdev, P_DATA_REPLY, peer_req);
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. sector=%llus.\n",
				(unsigned long long)peer_req->i.sector);

		ok = drbd_send_ack(mdev, P_NEG_DREPLY, peer_req);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

/**
 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	int ok;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev_if_state(mdev, D_FAILED)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	if (mdev->state.conn == C_AHEAD) {
		ok = drbd_send_ack(mdev, P_RS_CANCEL, peer_req);
	} else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		if (likely(mdev->state.pdsk >= D_INCONSISTENT)) {
			inc_rs_pending(mdev);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
		} else {
			if (__ratelimit(&drbd_ratelimit_state))
				dev_err(DEV, "Not sending RSDataReply, "
					"partner DISKLESS!\n");
			ok = 1;
		}
	} else {
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegRSDReply. sector %llus.\n",
				(unsigned long long)peer_req->i.sector);

		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);

		/* update resync data with failure */
		drbd_rs_failed_io(mdev, peer_req->i.sector, peer_req->i.size);
	}

	dec_unacked(mdev);

	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block() failed\n");
	return ok;
}

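/* Checksum based resync reply, in short: the peer sent us a digest of the
 * block it has; we hash our local copy and compare.  If the digests match,
 * we only acknowledge with P_RS_IS_IN_SYNC; otherwise we ship the whole
 * block as P_RS_DATA_REPLY.  (Summary of the handler below.) */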
int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct digest_info *di;
	int digest_size;
	void *digest = NULL;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		/* quick hack to try to avoid a race against reconfiguration.
		 * a real fix would be much more involved,
		 * introducing more locking mechanisms */
		if (mdev->csums_tfm) {
			digest_size = crypto_hash_digestsize(mdev->csums_tfm);
			D_ASSERT(digest_size == di->digest_size);
			digest = kmalloc(digest_size, GFP_NOIO);
		}
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, peer_req, digest);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}

		if (eq) {
			drbd_set_in_sync(mdev, peer_req->i.sector, peer_req->i.size);
			/* rs_same_csums unit is BM_BLOCK_SIZE */
			mdev->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT;
			ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, peer_req);
		} else {
			inc_rs_pending(mdev);
			peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */
			peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, peer_req);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, peer_req);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, peer_req);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

/* TODO merge common code with w_e_send_csum */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (!digest) {
		ok = 0;	/* terminate the connection in case the allocation failed */
		goto out;
	}

	if (likely(!(peer_req->flags & EE_WAS_ERROR)))
		drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);
	else
		memset(digest, 0, digest_size);

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, peer_req);
	peer_req = NULL;
	inc_rs_pending(mdev);
	ok = drbd_send_drequest_csum(mdev, sector, size,
				     digest, digest_size,
				     P_OV_REPLY);
	if (!ok)
		dec_rs_pending(mdev);
	kfree(digest);

out:
	if (peer_req)
		drbd_free_ee(mdev, peer_req);
	dec_unacked(mdev);
	return ok;
}

void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct digest_info *di;
	void *digest;
	sector_t sector = peer_req->i.sector;
	unsigned int size = peer_req->i.size;
	int digest_size;
	int ok, eq = 0;

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, peer_req);
		dec_unacked(mdev);
		return 1;
	}

	/* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all
	 * the resync lru has been cleaned up already */
	if (get_ldev(mdev)) {
		drbd_rs_complete_io(mdev, peer_req->i.sector);
		put_ldev(mdev);
	}

	di = peer_req->digest;

	if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->verify_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->verify_tfm, peer_req, digest);

			D_ASSERT(digest_size == di->digest_size);
			eq = !memcmp(digest, di->digest, digest_size);
			kfree(digest);
		}
	}

	/* Free e and pages before send.
	 * In case we block on congestion, we could otherwise run into
	 * some distributed deadlock, if the other side blocks on
	 * congestion as well, because our receiver blocks in
	 * drbd_pp_alloc due to pp_in_use > max_buffers. */
	drbd_free_ee(mdev, peer_req);
	if (!eq)
		drbd_ov_oos_found(mdev, sector, size);
	else
		ov_oos_print(mdev);

	ok = drbd_send_ack_ex(mdev, P_OV_RESULT, sector, size,
			      eq ? ID_IN_SYNC : ID_OUT_OF_SYNC);

	dec_unacked(mdev);

	--mdev->ov_left;

	/* let's advance progress step marks only for every other megabyte */
	if ((mdev->ov_left & 0x200) == 0x200)
		drbd_advance_rs_marks(mdev, mdev->ov_left);

	if (mdev->ov_left == 0) {
		ov_oos_print(mdev);
		drbd_resync_finished(mdev);
	}

	return ok;
}

int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w);
	complete(&b->done);
	return 1;
}

int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w);
	struct p_barrier *p = &mdev->tconn->data.sbuf.barrier;
	int ok = 1;

	/* really avoid racing with tl_clear.  w.cb may have been referenced
	 * just before it was reassigned and re-queued, so double check that.
	 * actually, this race was harmless, since we only try to send the
	 * barrier packet here, and otherwise do nothing with the object.
	 * but compare with the head of w_clear_epoch */
	spin_lock_irq(&mdev->tconn->req_lock);
	if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED)
		cancel = 1;
	spin_unlock_irq(&mdev->tconn->req_lock);
	if (cancel)
		return 1;

	if (!drbd_get_data_sock(mdev))
		return 0;
	p->barrier = b->br_number;
	/* inc_ap_pending was done where this was queued.
	 * dec_ap_pending will be done in got_BarrierAck
	 * or (on connection loss) in w_clear_epoch. */
	ok = _drbd_send_cmd(mdev, mdev->tconn->data.socket, P_BARRIER,
			    &p->head, sizeof(*p), 0);
	drbd_put_data_sock(mdev);

	return ok;
}

int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (cancel)
		return 1;
	return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE);
}

int w_send_oos(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_oos(mdev, req);
	req_mod(req, OOS_HANDED_TO_NETWORK);

	return ok;
}

/**
 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_dblock(mdev, req);
	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);

	return ok;
}

/**
 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet
 * @mdev:	DRBD device.
 * @w:		work object.
 * @cancel:	The connection will be closed anyways
 */
int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);
	int ok;

	if (unlikely(cancel)) {
		req_mod(req, SEND_CANCELED);
		return 1;
	}

	ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->i.sector, req->i.size,
				(unsigned long)req);

	if (!ok) {
		/* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send();
		 * so this is probably redundant */
		if (mdev->state.conn >= C_CONNECTED)
			drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE));
	}
	req_mod(req, ok ? HANDED_OVER_TO_NETWORK : SEND_FAILED);

	return ok;
}

int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_request *req = container_of(w, struct drbd_request, w);

	if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG)
		drbd_al_begin_io(mdev, req->i.sector);
	/* Calling drbd_al_begin_io() out of the worker might deadlock
	   theoretically. Practically it cannot deadlock, since this is
	   only used when unfreezing IOs.
	   All the extents of the requests
	   that made it into the TL are already active */

	drbd_req_make_private_bio(req, req->master_bio);
	req->private_bio->bi_bdev = mdev->ldev->backing_bdev;
	generic_make_request(req->private_bio);

	return 1;
}

static int _drbd_may_sync_now(struct drbd_conf *mdev)
{
	struct drbd_conf *odev = mdev;

	while (1) {
		if (odev->sync_conf.after == -1)
			return 1;
		odev = minor_to_mdev(odev->sync_conf.after);
		if (!expect(odev))
			return 1;
		if ((odev->state.conn >= C_SYNC_SOURCE &&
		     odev->state.conn <= C_PAUSED_SYNC_T) ||
		    odev->state.aftr_isp || odev->state.peer_isp ||
		    odev->state.user_isp)
			return 0;
	}
}

/**
 * _drbd_pause_after() - Pause resync on all devices that may not resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and after_state_ch).
 */
static int _drbd_pause_after(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (!_drbd_may_sync_now(odev))
			rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL)
			       != SS_NOTHING_TO_DO);
	}

	return rv;
}

/**
 * _drbd_resume_next() - Resume resync on all devices that may resync now
 * @mdev:	DRBD device.
 *
 * Called from process context only (admin command and worker).
 */
static int _drbd_resume_next(struct drbd_conf *mdev)
{
	struct drbd_conf *odev;
	int i, rv = 0;

	for (i = 0; i < minor_count; i++) {
		odev = minor_to_mdev(i);
		if (!odev)
			continue;
		if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS)
			continue;
		if (odev->state.aftr_isp) {
			if (_drbd_may_sync_now(odev))
				rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0),
							CS_HARD, NULL)
				       != SS_NOTHING_TO_DO);
		}
	}
	return rv;
}

void resume_next_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_resume_next(mdev);
	write_unlock_irq(&global_state_lock);
}

void suspend_other_sg(struct drbd_conf *mdev)
{
	write_lock_irq(&global_state_lock);
	_drbd_pause_after(mdev);
	write_unlock_irq(&global_state_lock);
}

static int sync_after_error(struct drbd_conf *mdev, int o_minor)
{
	struct drbd_conf *odev;

	if (o_minor == -1)
		return NO_ERROR;
	if (o_minor < -1 || minor_to_mdev(o_minor) == NULL)
		return ERR_SYNC_AFTER;

	/* check for loops */
	odev = minor_to_mdev(o_minor);
	while (1) {
		if (odev == mdev)
			return ERR_SYNC_AFTER_CYCLE;

		/* dependency chain ends here, no cycles. */
		if (odev->sync_conf.after == -1)
			return NO_ERROR;

		/* follow the dependency chain */
		odev = minor_to_mdev(odev->sync_conf.after);
	}
}

int drbd_alter_sa(struct drbd_conf *mdev, int na)
{
	int changes;
	int retcode;

	write_lock_irq(&global_state_lock);
	retcode = sync_after_error(mdev, na);
	if (retcode == NO_ERROR) {
		mdev->sync_conf.after = na;
		do {
			changes  = _drbd_pause_after(mdev);
			changes |= _drbd_resume_next(mdev);
		} while (changes);
	}
	write_unlock_irq(&global_state_lock);
	return retcode;
}

void drbd_rs_controller_reset(struct drbd_conf *mdev)
{
	atomic_set(&mdev->rs_sect_in, 0);
	atomic_set(&mdev->rs_sect_ev, 0);
	mdev->rs_in_flight = 0;
	mdev->rs_planed = 0;
	spin_lock(&mdev->peer_seq_lock);
	fifo_set(&mdev->rs_plan_s, 0);
	spin_unlock(&mdev->peer_seq_lock);
}

void start_resync_timer_fn(unsigned long data)
{
	struct drbd_conf *mdev = (struct drbd_conf *) data;

	drbd_queue_work(&mdev->tconn->data.work, &mdev->start_resync_work);
}

int w_start_resync(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	if (atomic_read(&mdev->unacked_cnt) || atomic_read(&mdev->rs_pending_cnt)) {
		dev_warn(DEV, "w_start_resync later...\n");
		mdev->start_resync_timer.expires = jiffies + HZ/10;
		add_timer(&mdev->start_resync_timer);
		return 1;
	}

	drbd_start_resync(mdev, C_SYNC_SOURCE);
	clear_bit(AHEAD_TO_SYNC_SOURCE, &mdev->current_epoch->flags);
	return 1;
}

/**
 * drbd_start_resync() - Start the resync process
 * @mdev:	DRBD device.
 * @side:	Either C_SYNC_SOURCE or C_SYNC_TARGET
 *
 * This function might bring you directly into one of the
 * C_PAUSED_SYNC_* states.
 */
void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side)
{
	union drbd_state ns;
	int r;

	if (mdev->state.conn >= C_SYNC_SOURCE && mdev->state.conn < C_AHEAD) {
		dev_err(DEV, "Resync already running!\n");
		return;
	}

	if (mdev->state.conn < C_AHEAD) {
		/* In case a previous resync run was aborted by an IO error/detach on the peer. */
		drbd_rs_cancel_all(mdev);
		/* This should be done when we abort the resync. We definitely do not
		   want to have this for connections going back and forth between
		   Ahead/Behind and SyncSource/SyncTarget */
	}

	if (!test_bit(B_RS_H_DONE, &mdev->flags)) {
		if (side == C_SYNC_TARGET) {
			/* Since application IO was locked out during C_WF_BITMAP_T and
			   C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET
			   we check that we might make the data inconsistent. */
			r = drbd_khelper(mdev, "before-resync-target");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				dev_info(DEV, "before-resync-target handler returned %d, "
					 "dropping connection.\n", r);
				drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
				return;
			}
		} else /* C_SYNC_SOURCE */ {
			r = drbd_khelper(mdev, "before-resync-source");
			r = (r >> 8) & 0xff;
			if (r > 0) {
				if (r == 3) {
					dev_info(DEV, "before-resync-source handler returned %d, "
						 "ignoring. Old userland tools?", r);
				} else {
					dev_info(DEV, "before-resync-source handler returned %d, "
						 "dropping connection.\n", r);
					drbd_force_state(mdev, NS(conn, C_DISCONNECTING));
					return;
				}
			}
		}
	}

	if (current == mdev->tconn->worker.task) {
		/* The worker should not sleep waiting for drbd_state_lock(),
		   that can take long */
		if (test_and_set_bit(CLUSTER_ST_CHANGE, &mdev->flags)) {
			set_bit(B_RS_H_DONE, &mdev->flags);
			mdev->start_resync_timer.expires = jiffies + HZ/5;
			add_timer(&mdev->start_resync_timer);
			return;
		}
	} else {
		drbd_state_lock(mdev);
	}
	clear_bit(B_RS_H_DONE, &mdev->flags);

	if (!get_ldev_if_state(mdev, D_NEGOTIATING)) {
		drbd_state_unlock(mdev);
		return;
	}

	write_lock_irq(&global_state_lock);
	ns = mdev->state;

	ns.aftr_isp = !_drbd_may_sync_now(mdev);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL);
	ns = mdev->state;

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(mdev);
		unsigned long now = jiffies;
		int i;

		mdev->rs_failed    = 0;
		mdev->rs_paused    = 0;
		mdev->rs_same_csum = 0;
		mdev->rs_last_events = 0;
		mdev->rs_last_sect_ev = 0;
		mdev->rs_total     = tw;
		mdev->rs_start     = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			mdev->rs_mark_left[i] = tw;
			mdev->rs_mark_time[i] = now;
		}
		_drbd_pause_after(mdev);
	}
	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
			 drbd_conn_str(ns.conn),
			 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10),
			 (unsigned long) mdev->rs_total);
		if (side == C_SYNC_TARGET)
			mdev->bm_resync_fo = 0;

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync.  We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE && mdev->tconn->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(mdev);

		if (mdev->tconn->agreed_pro_version < 95 && mdev->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE)
				schedule_timeout_interruptible(
					mdev->tconn->net_conf->ping_int * HZ +
					mdev->tconn->net_conf->ping_timeo*HZ/9);
			drbd_resync_finished(mdev);
		}

		drbd_rs_controller_reset(mdev);
		/* ns.conn may already be != mdev->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&mdev->resync_timer, jiffies);

		drbd_md_sync(mdev);
	}
	put_ldev(mdev);
	drbd_state_unlock(mdev);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_conf *mdev = thi->mdev;
	struct drbd_work *w = NULL;
	LIST_HEAD(work_list);
	int intr = 0, i;

	sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev));

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(mdev, thi);

		if (down_trylock(&mdev->tconn->data.work.s)) {
			mutex_lock(&mdev->tconn->data.mutex);
			if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
				drbd_tcp_uncork(mdev->tconn->data.socket);
			mutex_unlock(&mdev->tconn->data.mutex);

			intr = down_interruptible(&mdev->tconn->data.work.s);

			mutex_lock(&mdev->tconn->data.mutex);
			if (mdev->tconn->data.socket && !mdev->tconn->net_conf->no_cork)
				drbd_tcp_cork(mdev->tconn->data.socket);
			mutex_unlock(&mdev->tconn->data.mutex);
		}

		if (intr) {
			D_ASSERT(intr == -EINTR);
			flush_signals(current);
			if (!expect(get_t_state(thi) != RUNNING))
				continue;
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;
		/* With this break, we have done a down() but not consumed
		   the entry from the list. The cleanup code takes care of
		   this...   */

		w = NULL;
		spin_lock_irq(&mdev->tconn->data.work.q_lock);
		if (!expect(!list_empty(&mdev->tconn->data.work.q))) {
			/* something terribly wrong in our logic.
			 * we were able to down() the semaphore,
			 * but the list is empty... doh.
			 *
			 * what is the best thing to do now?
			 * try again from scratch, restarting the receiver,
			 * asender, whatnot? could break even more ugly,
			 * e.g. when we are primary, but no good local data.
			 *
			 * I'll try to get away just starting over this loop.
			 */
			spin_unlock_irq(&mdev->tconn->data.work.q_lock);
			continue;
		}
		w = list_entry(mdev->tconn->data.work.q.next, struct drbd_work, list);
		list_del_init(&w->list);
		spin_unlock_irq(&mdev->tconn->data.work.q_lock);

		if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) {
\n"); */ 1690 if (mdev->state.conn >= C_CONNECTED) 1691 drbd_force_state(mdev, 1692 NS(conn, C_NETWORK_FAILURE)); 1693 } 1694 } 1695 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags)); 1696 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags)); 1697 1698 spin_lock_irq(&mdev->tconn->data.work.q_lock); 1699 i = 0; 1700 while (!list_empty(&mdev->tconn->data.work.q)) { 1701 list_splice_init(&mdev->tconn->data.work.q, &work_list); 1702 spin_unlock_irq(&mdev->tconn->data.work.q_lock); 1703 1704 while (!list_empty(&work_list)) { 1705 w = list_entry(work_list.next, struct drbd_work, list); 1706 list_del_init(&w->list); 1707 w->cb(mdev, w, 1); 1708 i++; /* dead debugging code */ 1709 } 1710 1711 spin_lock_irq(&mdev->tconn->data.work.q_lock); 1712 } 1713 sema_init(&mdev->tconn->data.work.s, 0); 1714 /* DANGEROUS race: if someone did queue his work within the spinlock, 1715 * but up() ed outside the spinlock, we could get an up() on the 1716 * semaphore without corresponding list entry. 1717 * So don't do that. 1718 */ 1719 spin_unlock_irq(&mdev->tconn->data.work.q_lock); 1720 1721 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1722 /* _drbd_set_state only uses stop_nowait. 1723 * wait here for the exiting receiver. */ 1724 drbd_thread_stop(&mdev->tconn->receiver); 1725 drbd_mdev_cleanup(mdev); 1726 1727 dev_info(DEV, "worker terminated\n"); 1728 1729 clear_bit(DEVICE_DYING, &mdev->flags); 1730 clear_bit(CONFIG_PENDING, &mdev->flags); 1731 wake_up(&mdev->state_wait); 1732 1733 return 0; 1734} 1735