drbd_worker.c revision 5a75cc7cfbb98e896232902214432dae30653dfe
1/* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26#include <linux/module.h> 27#include <linux/drbd.h> 28#include <linux/sched.h> 29#include <linux/smp_lock.h> 30#include <linux/wait.h> 31#include <linux/mm.h> 32#include <linux/memcontrol.h> 33#include <linux/mm_inline.h> 34#include <linux/slab.h> 35#include <linux/random.h> 36#include <linux/string.h> 37#include <linux/scatterlist.h> 38 39#include "drbd_int.h" 40#include "drbd_req.h" 41 42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel); 43 44 45 46/* defined here: 47 drbd_md_io_complete 48 drbd_endio_sec 49 drbd_endio_pri 50 51 * more endio handlers: 52 atodb_endio in drbd_actlog.c 53 drbd_bm_async_io_complete in drbd_bitmap.c 54 55 * For all these callbacks, note the following: 56 * The callbacks will be called in irq context by the IDE drivers, 57 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 58 * Try to get the locking right :) 59 * 60 */ 61 62 63/* About the global_state_lock 64 Each state transition on an device holds a read lock. 
In case we have 65 to evaluate the sync after dependencies, we grab a write lock, because 66 we need stable states on all devices for that. */ 67rwlock_t global_state_lock; 68 69/* used for synchronous meta data and bitmap IO 70 * submitted by drbd_md_sync_page_io() 71 */ 72void drbd_md_io_complete(struct bio *bio, int error) 73{ 74 struct drbd_md_io *md_io; 75 76 md_io = (struct drbd_md_io *)bio->bi_private; 77 md_io->error = error; 78 79 complete(&md_io->event); 80} 81 82/* reads on behalf of the partner, 83 * "submitted" by the receiver 84 */ 85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local) 86{ 87 unsigned long flags = 0; 88 struct drbd_conf *mdev = e->mdev; 89 90 D_ASSERT(e->block_id != ID_VACANT); 91 92 spin_lock_irqsave(&mdev->req_lock, flags); 93 mdev->read_cnt += e->size >> 9; 94 list_del(&e->w.list); 95 if (list_empty(&mdev->read_ee)) 96 wake_up(&mdev->ee_wait); 97 if (test_bit(__EE_WAS_ERROR, &e->flags)) 98 __drbd_chk_io_error(mdev, FALSE); 99 spin_unlock_irqrestore(&mdev->req_lock, flags); 100 101 drbd_queue_work(&mdev->data.work, &e->w); 102 put_ldev(mdev); 103} 104 105static int is_failed_barrier(int ee_flags) 106{ 107 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED)) 108 == (EE_IS_BARRIER|EE_WAS_ERROR); 109} 110 111/* writes on behalf of the partner, or resync writes, 112 * "submitted" by the receiver, final stage. 
*/ 113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local) 114{ 115 unsigned long flags = 0; 116 struct drbd_conf *mdev = e->mdev; 117 sector_t e_sector; 118 int do_wake; 119 int is_syncer_req; 120 int do_al_complete_io; 121 122 /* if this is a failed barrier request, disable use of barriers, 123 * and schedule for resubmission */ 124 if (is_failed_barrier(e->flags)) { 125 drbd_bump_write_ordering(mdev, WO_bdev_flush); 126 spin_lock_irqsave(&mdev->req_lock, flags); 127 list_del(&e->w.list); 128 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED; 129 e->w.cb = w_e_reissue; 130 /* put_ldev actually happens below, once we come here again. */ 131 __release(local); 132 spin_unlock_irqrestore(&mdev->req_lock, flags); 133 drbd_queue_work(&mdev->data.work, &e->w); 134 return; 135 } 136 137 D_ASSERT(e->block_id != ID_VACANT); 138 139 /* after we moved e to done_ee, 140 * we may no longer access it, 141 * it may be freed/reused already! 142 * (as soon as we release the req_lock) */ 143 e_sector = e->sector; 144 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; 145 is_syncer_req = is_syncer_block_id(e->block_id); 146 147 spin_lock_irqsave(&mdev->req_lock, flags); 148 mdev->writ_cnt += e->size >> 9; 149 list_del(&e->w.list); /* has been on active_ee or sync_ee */ 150 list_add_tail(&e->w.list, &mdev->done_ee); 151 152 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet, 153 * neither did we wake possibly waiting conflicting requests. 154 * done from "drbd_process_done_ee" within the appropriate w.cb 155 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */ 156 157 do_wake = is_syncer_req 158 ? 
list_empty(&mdev->sync_ee) 159 : list_empty(&mdev->active_ee); 160 161 if (test_bit(__EE_WAS_ERROR, &e->flags)) 162 __drbd_chk_io_error(mdev, FALSE); 163 spin_unlock_irqrestore(&mdev->req_lock, flags); 164 165 if (is_syncer_req) 166 drbd_rs_complete_io(mdev, e_sector); 167 168 if (do_wake) 169 wake_up(&mdev->ee_wait); 170 171 if (do_al_complete_io) 172 drbd_al_complete_io(mdev, e_sector); 173 174 wake_asender(mdev); 175 put_ldev(mdev); 176} 177 178/* writes on behalf of the partner, or resync writes, 179 * "submitted" by the receiver. 180 */ 181void drbd_endio_sec(struct bio *bio, int error) 182{ 183 struct drbd_epoch_entry *e = bio->bi_private; 184 struct drbd_conf *mdev = e->mdev; 185 int uptodate = bio_flagged(bio, BIO_UPTODATE); 186 int is_write = bio_data_dir(bio) == WRITE; 187 188 if (error) 189 dev_warn(DEV, "%s: error=%d s=%llus\n", 190 is_write ? "write" : "read", error, 191 (unsigned long long)e->sector); 192 if (!error && !uptodate) { 193 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", 194 is_write ? "write" : "read", 195 (unsigned long long)e->sector); 196 /* strange behavior of some lower level drivers... 197 * fail the request by clearing the uptodate flag, 198 * but do not return any error?! */ 199 error = -EIO; 200 } 201 202 if (error) 203 set_bit(__EE_WAS_ERROR, &e->flags); 204 205 bio_put(bio); /* no need for the bio anymore */ 206 if (atomic_dec_and_test(&e->pending_bios)) { 207 if (is_write) 208 drbd_endio_write_sec_final(e); 209 else 210 drbd_endio_read_sec_final(e); 211 } 212} 213 214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 215 */ 216void drbd_endio_pri(struct bio *bio, int error) 217{ 218 struct drbd_request *req = bio->bi_private; 219 struct drbd_conf *mdev = req->mdev; 220 enum drbd_req_event what; 221 int uptodate = bio_flagged(bio, BIO_UPTODATE); 222 223 if (!error && !uptodate) { 224 dev_warn(DEV, "p %s: setting error to -EIO\n", 225 bio_data_dir(bio) == WRITE ? 
"write" : "read"); 226 /* strange behavior of some lower level drivers... 227 * fail the request by clearing the uptodate flag, 228 * but do not return any error?! */ 229 error = -EIO; 230 } 231 232 /* to avoid recursion in __req_mod */ 233 if (unlikely(error)) { 234 what = (bio_data_dir(bio) == WRITE) 235 ? write_completed_with_error 236 : (bio_rw(bio) == READ) 237 ? read_completed_with_error 238 : read_ahead_completed_with_error; 239 } else 240 what = completed_ok; 241 242 bio_put(req->private_bio); 243 req->private_bio = ERR_PTR(error); 244 245 req_mod(req, what); 246} 247 248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 249{ 250 struct drbd_request *req = container_of(w, struct drbd_request, w); 251 252 /* We should not detach for read io-error, 253 * but try to WRITE the P_DATA_REPLY to the failed location, 254 * to give the disk the chance to relocate that block */ 255 256 spin_lock_irq(&mdev->req_lock); 257 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) { 258 _req_mod(req, read_retry_remote_canceled); 259 spin_unlock_irq(&mdev->req_lock); 260 return 1; 261 } 262 spin_unlock_irq(&mdev->req_lock); 263 264 return w_send_read_req(mdev, w, 0); 265} 266 267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 268{ 269 ERR_IF(cancel) return 1; 270 dev_err(DEV, "resync inactive, but callback triggered??\n"); 271 return 1; /* Simply ignore this! 
*/
}

/**
 * drbd_csum_ee() - compute a digest over the page chain of an epoch entry
 * @mdev:	DRBD device (not referenced here; kept for symmetry with drbd_csum_bio).
 * @tfm:	hash transform to use (csums-alg or verify-alg).
 * @e:		epoch entry whose e->pages chain is hashed.
 * @digest:	output buffer; must hold crypto_hash_digestsize(tfm) bytes.
 *
 * All pages but the last are hashed in full; the last page contributes
 * e->size % PAGE_SIZE bytes, or a full page if e->size is page aligned.
 */
void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = e->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	/* single-entry sg table, re-pointed at each page in turn */
	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = e->size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

/**
 * drbd_csum_bio() - compute a digest over all segments of a bio
 * @mdev:	DRBD device (not referenced here).
 * @tfm:	hash transform to use.
 * @bio:	bio whose bio_vecs are hashed, starting at segment 0.
 * @digest:	output buffer; must hold crypto_hash_digestsize(tfm) bytes.
 */
void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec *bvec;
	int i;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	__bio_for_each_segment(bvec, bio, i, 0) {
		sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* Worker callback for checksum based resync: hash the block just read
 * locally and send the digest to the peer (P_CSUM_RS_REQUEST).
 * Frees @e on every path. Returns 0 only if sending the request failed. */
static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok;

	D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef);

	if (unlikely(cancel)) {
		drbd_free_ee(mdev, e);
		return 1;
	}

	if (likely((e->flags & EE_WAS_ERROR) == 0)) {
		digest_size = crypto_hash_digestsize(mdev->csums_tfm);
		digest = kmalloc(digest_size, GFP_NOIO);
		if (digest) {
			drbd_csum_ee(mdev, mdev->csums_tfm, e, digest);

			inc_rs_pending(mdev);
			ok = drbd_send_drequest_csum(mdev,
						     e->sector,
344 e->size, 345 digest, 346 digest_size, 347 P_CSUM_RS_REQUEST); 348 kfree(digest); 349 } else { 350 dev_err(DEV, "kmalloc() of digest failed.\n"); 351 ok = 0; 352 } 353 } else 354 ok = 1; 355 356 drbd_free_ee(mdev, e); 357 358 if (unlikely(!ok)) 359 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n"); 360 return ok; 361} 362 363#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 364 365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) 366{ 367 struct drbd_epoch_entry *e; 368 369 if (!get_ldev(mdev)) 370 return -EIO; 371 372 if (drbd_rs_should_slow_down(mdev)) 373 goto defer; 374 375 /* GFP_TRY, because if there is no memory available right now, this may 376 * be rescheduled for later. It is "only" background resync, after all. */ 377 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 378 if (!e) 379 goto defer; 380 381 e->w.cb = w_e_send_csum; 382 spin_lock_irq(&mdev->req_lock); 383 list_add(&e->w.list, &mdev->read_ee); 384 spin_unlock_irq(&mdev->req_lock); 385 386 atomic_add(size >> 9, &mdev->rs_sect_ev); 387 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) 388 return 0; 389 390 drbd_free_ee(mdev, e); 391defer: 392 put_ldev(mdev); 393 return -EAGAIN; 394} 395 396void resync_timer_fn(unsigned long data) 397{ 398 struct drbd_conf *mdev = (struct drbd_conf *) data; 399 int queue; 400 401 queue = 1; 402 switch (mdev->state.conn) { 403 case C_VERIFY_S: 404 mdev->resync_work.cb = w_make_ov_request; 405 break; 406 case C_SYNC_TARGET: 407 mdev->resync_work.cb = w_make_resync_request; 408 break; 409 default: 410 queue = 0; 411 mdev->resync_work.cb = w_resync_inactive; 412 } 413 414 /* harmless race: list_empty outside data.work.q_lock */ 415 if (list_empty(&mdev->resync_work.list) && queue) 416 drbd_queue_work(&mdev->data.work, &mdev->resync_work); 417} 418 419static void fifo_set(struct fifo_buffer *fb, int value) 420{ 421 int i; 422 423 for (i = 0; i < fb->size; i++) 424 fb->values[i] += value; 425} 426 427static int 
fifo_push(struct fifo_buffer *fb, int value)
{
	/* push @value into the circular plan buffer; return the value that
	 * drops out at the current head position */
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

/* add @value to every slot of the plan fifo — spreads a correction
 * evenly over all planned steps */
static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

/**
 * drbd_rs_controller() - dynamic resync speed controller
 * @mdev:	DRBD device.
 *
 * Returns the number of sectors to request during the next SLEEP_TIME
 * interval, derived from how many sectors came in since the last turn
 * (rs_sect_in) and the configured fill/delay targets.  rs_plan_s is
 * accessed under peer_seq_lock.  The body divides by steps
 * (== rs_plan_s.size); the caller in w_make_resync_request only invokes
 * this when rs_plan_s.size is non-zero.
 */
int drbd_rs_controller(struct drbd_conf *mdev)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect; /* Number of sectors to request in this turn */
	int correction; /* Number of sectors more we need in the proxy*/
	int cps; /* correction per invocation of drbd_rs_controller() */
	int steps; /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;

	sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */
	mdev->rs_in_flight -= sect_in;

	spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */

	steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = mdev->sync_conf.c_fill_target ?
mdev->sync_conf.c_fill_target : 470 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10); 471 } 472 473 correction = want - mdev->rs_in_flight - mdev->rs_planed; 474 475 /* Plan ahead */ 476 cps = correction / steps; 477 fifo_add_val(&mdev->rs_plan_s, cps); 478 mdev->rs_planed += cps * steps; 479 480 /* What we do in this step */ 481 curr_corr = fifo_push(&mdev->rs_plan_s, 0); 482 spin_unlock(&mdev->peer_seq_lock); 483 mdev->rs_planed -= curr_corr; 484 485 req_sect = sect_in + curr_corr; 486 if (req_sect < 0) 487 req_sect = 0; 488 489 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ; 490 if (req_sect > max_sect) 491 req_sect = max_sect; 492 493 /* 494 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 495 sect_in, mdev->rs_in_flight, want, correction, 496 steps, cps, mdev->rs_planed, curr_corr, req_sect); 497 */ 498 499 return req_sect; 500} 501 502int w_make_resync_request(struct drbd_conf *mdev, 503 struct drbd_work *w, int cancel) 504{ 505 unsigned long bit; 506 sector_t sector; 507 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 508 int max_segment_size; 509 int number, rollback_i, size, pe, mx; 510 int align, queued, sndbuf; 511 int i = 0; 512 513 if (unlikely(cancel)) 514 return 1; 515 516 if (unlikely(mdev->state.conn < C_CONNECTED)) { 517 dev_err(DEV, "Confused in w_make_resync_request()! 
cstate < Connected"); 518 return 0; 519 } 520 521 if (mdev->state.conn != C_SYNC_TARGET) 522 dev_err(DEV, "%s in w_make_resync_request\n", 523 drbd_conn_str(mdev->state.conn)); 524 525 if (!get_ldev(mdev)) { 526 /* Since we only need to access mdev->rsync a 527 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but 528 to continue resync with a broken disk makes no sense at 529 all */ 530 dev_err(DEV, "Disk broke down during resync!\n"); 531 mdev->resync_work.cb = w_resync_inactive; 532 return 1; 533 } 534 535 /* starting with drbd 8.3.8, we can handle multi-bio EEs, 536 * if it should be necessary */ 537 max_segment_size = 538 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) : 539 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE; 540 541 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */ 542 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9); 543 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 544 } else { 545 mdev->c_sync_rate = mdev->sync_conf.rate; 546 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 547 } 548 549 /* Throttle resync on lower level disk activity, which may also be 550 * caused by application IO on Primary/SyncTarget. 551 * Keep this after the call to drbd_rs_controller, as that assumes 552 * to be called as precisely as possible every SLEEP_TIME, 553 * and would be confused otherwise. 
*/ 554 if (drbd_rs_should_slow_down(mdev)) 555 goto requeue; 556 557 mutex_lock(&mdev->data.mutex); 558 if (mdev->data.socket) 559 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req); 560 else 561 mx = 1; 562 mutex_unlock(&mdev->data.mutex); 563 564 /* For resync rates >160MB/sec, allow more pending RS requests */ 565 if (number > mx) 566 mx = number; 567 568 /* Limit the number of pending RS requests to no more than the peer's receive buffer */ 569 pe = atomic_read(&mdev->rs_pending_cnt); 570 if ((pe + number) > mx) { 571 number = mx - pe; 572 } 573 574 for (i = 0; i < number; i++) { 575 /* Stop generating RS requests, when half of the send buffer is filled */ 576 mutex_lock(&mdev->data.mutex); 577 if (mdev->data.socket) { 578 queued = mdev->data.socket->sk->sk_wmem_queued; 579 sndbuf = mdev->data.socket->sk->sk_sndbuf; 580 } else { 581 queued = 1; 582 sndbuf = 0; 583 } 584 mutex_unlock(&mdev->data.mutex); 585 if (queued > sndbuf / 2) 586 goto requeue; 587 588next_sector: 589 size = BM_BLOCK_SIZE; 590 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo); 591 592 if (bit == -1UL) { 593 mdev->bm_resync_fo = drbd_bm_bits(mdev); 594 mdev->resync_work.cb = w_resync_inactive; 595 put_ldev(mdev); 596 return 1; 597 } 598 599 sector = BM_BIT_TO_SECT(bit); 600 601 if (drbd_try_rs_begin_io(mdev, sector)) { 602 mdev->bm_resync_fo = bit; 603 goto requeue; 604 } 605 mdev->bm_resync_fo = bit + 1; 606 607 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) { 608 drbd_rs_complete_io(mdev, sector); 609 goto next_sector; 610 } 611 612#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE 613 /* try to find some adjacent bits. 614 * we stop if we have already the maximum req size. 615 * 616 * Additionally always align bigger requests, in order to 617 * be prepared for all stripe sizes of software RAIDs. 
618 */ 619 align = 1; 620 rollback_i = i; 621 for (;;) { 622 if (size + BM_BLOCK_SIZE > max_segment_size) 623 break; 624 625 /* Be always aligned */ 626 if (sector & ((1<<(align+3))-1)) 627 break; 628 629 /* do not cross extent boundaries */ 630 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 631 break; 632 /* now, is it actually dirty, after all? 633 * caution, drbd_bm_test_bit is tri-state for some 634 * obscure reason; ( b == 0 ) would get the out-of-band 635 * only accidentally right because of the "oddly sized" 636 * adjustment below */ 637 if (drbd_bm_test_bit(mdev, bit+1) != 1) 638 break; 639 bit++; 640 size += BM_BLOCK_SIZE; 641 if ((BM_BLOCK_SIZE << align) <= size) 642 align++; 643 i++; 644 } 645 /* if we merged some, 646 * reset the offset to start the next drbd_bm_find_next from */ 647 if (size > BM_BLOCK_SIZE) 648 mdev->bm_resync_fo = bit + 1; 649#endif 650 651 /* adjust very last sectors, in case we are oddly sized */ 652 if (sector + (size>>9) > capacity) 653 size = (capacity-sector)<<9; 654 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) { 655 switch (read_for_csum(mdev, sector, size)) { 656 case -EIO: /* Disk failure */ 657 put_ldev(mdev); 658 return 0; 659 case -EAGAIN: /* allocation failed, or ldev busy */ 660 drbd_rs_complete_io(mdev, sector); 661 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 662 i = rollback_i; 663 goto requeue; 664 case 0: 665 /* everything ok */ 666 break; 667 default: 668 BUG(); 669 } 670 } else { 671 inc_rs_pending(mdev); 672 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST, 673 sector, size, ID_SYNCER)) { 674 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n"); 675 dec_rs_pending(mdev); 676 put_ldev(mdev); 677 return 0; 678 } 679 } 680 } 681 682 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) { 683 /* last syncer _request_ was sent, 684 * but the P_RS_DATA_REPLY not yet received. 
sync will end (and 685 * next sync group will resume), as soon as we receive the last 686 * resync data block, and the last bit is cleared. 687 * until then resync "work" is "inactive" ... 688 */ 689 mdev->resync_work.cb = w_resync_inactive; 690 put_ldev(mdev); 691 return 1; 692 } 693 694 requeue: 695 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 696 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 697 put_ldev(mdev); 698 return 1; 699} 700 701static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 702{ 703 int number, i, size; 704 sector_t sector; 705 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 706 707 if (unlikely(cancel)) 708 return 1; 709 710 if (unlikely(mdev->state.conn < C_CONNECTED)) { 711 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected"); 712 return 0; 713 } 714 715 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ); 716 if (atomic_read(&mdev->rs_pending_cnt) > number) 717 goto requeue; 718 719 number -= atomic_read(&mdev->rs_pending_cnt); 720 721 sector = mdev->ov_position; 722 for (i = 0; i < number; i++) { 723 if (sector >= capacity) { 724 mdev->resync_work.cb = w_resync_inactive; 725 return 1; 726 } 727 728 size = BM_BLOCK_SIZE; 729 730 if (drbd_try_rs_begin_io(mdev, sector)) { 731 mdev->ov_position = sector; 732 goto requeue; 733 } 734 735 if (sector + (size>>9) > capacity) 736 size = (capacity-sector)<<9; 737 738 inc_rs_pending(mdev); 739 if (!drbd_send_ov_request(mdev, sector, size)) { 740 dec_rs_pending(mdev); 741 return 0; 742 } 743 sector += BM_SECT_PER_BIT; 744 } 745 mdev->ov_position = sector; 746 747 requeue: 748 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 749 return 1; 750} 751 752 753int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 754{ 755 kfree(w); 756 ov_oos_print(mdev); 757 drbd_resync_finished(mdev); 758 759 return 1; 760} 761 762static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int 
cancel) 763{ 764 kfree(w); 765 766 drbd_resync_finished(mdev); 767 768 return 1; 769} 770 771int drbd_resync_finished(struct drbd_conf *mdev) 772{ 773 unsigned long db, dt, dbdt; 774 unsigned long n_oos; 775 union drbd_state os, ns; 776 struct drbd_work *w; 777 char *khelper_cmd = NULL; 778 779 /* Remove all elements from the resync LRU. Since future actions 780 * might set bits in the (main) bitmap, then the entries in the 781 * resync LRU would be wrong. */ 782 if (drbd_rs_del_all(mdev)) { 783 /* In case this is not possible now, most probably because 784 * there are P_RS_DATA_REPLY Packets lingering on the worker's 785 * queue (or even the read operations for those packets 786 * is not finished by now). Retry in 100ms. */ 787 788 drbd_kick_lo(mdev); 789 __set_current_state(TASK_INTERRUPTIBLE); 790 schedule_timeout(HZ / 10); 791 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC); 792 if (w) { 793 w->cb = w_resync_finished; 794 drbd_queue_work(&mdev->data.work, w); 795 return 1; 796 } 797 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n"); 798 } 799 800 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ; 801 if (dt <= 0) 802 dt = 1; 803 db = mdev->rs_total; 804 dbdt = Bit2KB(db/dt); 805 mdev->rs_paused /= HZ; 806 807 if (!get_ldev(mdev)) 808 goto out; 809 810 spin_lock_irq(&mdev->req_lock); 811 os = mdev->state; 812 813 /* This protects us against multiple calls (that can happen in the presence 814 of application IO), and against connectivity loss just before we arrive here. */ 815 if (os.conn <= C_CONNECTED) 816 goto out_unlock; 817 818 ns = os; 819 ns.conn = C_CONNECTED; 820 821 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 822 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ? 
823 "Online verify " : "Resync", 824 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 825 826 n_oos = drbd_bm_total_weight(mdev); 827 828 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 829 if (n_oos) { 830 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 831 n_oos, Bit2KB(1)); 832 khelper_cmd = "out-of-sync"; 833 } 834 } else { 835 D_ASSERT((n_oos - mdev->rs_failed) == 0); 836 837 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 838 khelper_cmd = "after-resync-target"; 839 840 if (mdev->csums_tfm && mdev->rs_total) { 841 const unsigned long s = mdev->rs_same_csum; 842 const unsigned long t = mdev->rs_total; 843 const int ratio = 844 (t == 0) ? 0 : 845 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 846 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; " 847 "transferred %luK total %luK\n", 848 ratio, 849 Bit2KB(mdev->rs_same_csum), 850 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 851 Bit2KB(mdev->rs_total)); 852 } 853 } 854 855 if (mdev->rs_failed) { 856 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 857 858 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 859 ns.disk = D_INCONSISTENT; 860 ns.pdsk = D_UP_TO_DATE; 861 } else { 862 ns.disk = D_UP_TO_DATE; 863 ns.pdsk = D_INCONSISTENT; 864 } 865 } else { 866 ns.disk = D_UP_TO_DATE; 867 ns.pdsk = D_UP_TO_DATE; 868 869 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 870 if (mdev->p_uuid) { 871 int i; 872 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 873 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 874 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 875 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 876 } else { 877 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 878 } 879 } 880 881 drbd_uuid_set_bm(mdev, 0UL); 882 883 if (mdev->p_uuid) { 884 /* Now the two UUID sets are equal, update what we 885 * know of the peer. 
*/ 886 int i; 887 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 888 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 889 } 890 } 891 892 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 893out_unlock: 894 spin_unlock_irq(&mdev->req_lock); 895 put_ldev(mdev); 896out: 897 mdev->rs_total = 0; 898 mdev->rs_failed = 0; 899 mdev->rs_paused = 0; 900 mdev->ov_start_sector = 0; 901 902 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) { 903 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n"); 904 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished"); 905 } 906 907 if (khelper_cmd) 908 drbd_khelper(mdev, khelper_cmd); 909 910 return 1; 911} 912 913/* helper */ 914static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 915{ 916 if (drbd_ee_has_active_page(e)) { 917 /* This might happen if sendpage() has not finished */ 918 int i = DIV_ROUND_UP(e->size, PAGE_SIZE); 919 atomic_add(i, &mdev->pp_in_use_by_net); 920 atomic_sub(i, &mdev->pp_in_use); 921 spin_lock_irq(&mdev->req_lock); 922 list_add_tail(&e->w.list, &mdev->net_ee); 923 spin_unlock_irq(&mdev->req_lock); 924 wake_up(&drbd_pp_wait); 925 } else 926 drbd_free_ee(mdev, e); 927} 928 929/** 930 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 931 * @mdev: DRBD device. 932 * @w: work object. 933 * @cancel: The connection will be closed anyways 934 */ 935int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 936{ 937 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 938 int ok; 939 940 if (unlikely(cancel)) { 941 drbd_free_ee(mdev, e); 942 dec_unacked(mdev); 943 return 1; 944 } 945 946 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 947 ok = drbd_send_block(mdev, P_DATA_REPLY, e); 948 } else { 949 if (__ratelimit(&drbd_ratelimit_state)) 950 dev_err(DEV, "Sending NegDReply. 
sector=%llus.\n", 951 (unsigned long long)e->sector); 952 953 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e); 954 } 955 956 dec_unacked(mdev); 957 958 move_to_net_ee_or_free(mdev, e); 959 960 if (unlikely(!ok)) 961 dev_err(DEV, "drbd_send_block() failed\n"); 962 return ok; 963} 964 965/** 966 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS 967 * @mdev: DRBD device. 968 * @w: work object. 969 * @cancel: The connection will be closed anyways 970 */ 971int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 972{ 973 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 974 int ok; 975 976 if (unlikely(cancel)) { 977 drbd_free_ee(mdev, e); 978 dec_unacked(mdev); 979 return 1; 980 } 981 982 if (get_ldev_if_state(mdev, D_FAILED)) { 983 drbd_rs_complete_io(mdev, e->sector); 984 put_ldev(mdev); 985 } 986 987 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 988 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 989 inc_rs_pending(mdev); 990 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 991 } else { 992 if (__ratelimit(&drbd_ratelimit_state)) 993 dev_err(DEV, "Not sending RSDataReply, " 994 "partner DISKLESS!\n"); 995 ok = 1; 996 } 997 } else { 998 if (__ratelimit(&drbd_ratelimit_state)) 999 dev_err(DEV, "Sending NegRSDReply. 
sector %llus.\n", 1000 (unsigned long long)e->sector); 1001 1002 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1003 1004 /* update resync data with failure */ 1005 drbd_rs_failed_io(mdev, e->sector, e->size); 1006 } 1007 1008 dec_unacked(mdev); 1009 1010 move_to_net_ee_or_free(mdev, e); 1011 1012 if (unlikely(!ok)) 1013 dev_err(DEV, "drbd_send_block() failed\n"); 1014 return ok; 1015} 1016 1017int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1018{ 1019 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1020 struct digest_info *di; 1021 int digest_size; 1022 void *digest = NULL; 1023 int ok, eq = 0; 1024 1025 if (unlikely(cancel)) { 1026 drbd_free_ee(mdev, e); 1027 dec_unacked(mdev); 1028 return 1; 1029 } 1030 1031 if (get_ldev(mdev)) { 1032 drbd_rs_complete_io(mdev, e->sector); 1033 put_ldev(mdev); 1034 } 1035 1036 di = e->digest; 1037 1038 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1039 /* quick hack to try to avoid a race against reconfiguration. 1040 * a real fix would be much more involved, 1041 * introducing more locking mechanisms */ 1042 if (mdev->csums_tfm) { 1043 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 1044 D_ASSERT(digest_size == di->digest_size); 1045 digest = kmalloc(digest_size, GFP_NOIO); 1046 } 1047 if (digest) { 1048 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 1049 eq = !memcmp(digest, di->digest, digest_size); 1050 kfree(digest); 1051 } 1052 1053 if (eq) { 1054 drbd_set_in_sync(mdev, e->sector, e->size); 1055 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1056 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT; 1057 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e); 1058 } else { 1059 inc_rs_pending(mdev); 1060 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! 
*/
			e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */
			kfree(di);
			ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e);
		}
	} else {
		ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e);
		if (__ratelimit(&drbd_ratelimit_state))
			dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n");
	}

	dec_unacked(mdev);
	move_to_net_ee_or_free(mdev, e);

	if (unlikely(!ok))
		dev_err(DEV, "drbd_send_block/ack() failed\n");
	return ok;
}

/* Online verify, source side: hash the block we just read locally and ask
 * the peer to compare (P_OV_REPLY).  Frees @e on every path; cancel and
 * local read error are silently treated as "ok". */
int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	int digest_size;
	void *digest;
	int ok = 1;

	if (unlikely(cancel))
		goto out;

	if (unlikely((e->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(mdev->verify_tfm);
	/* FIXME if this allocation fails, online verify will not terminate! */
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		drbd_csum_ee(mdev, mdev->verify_tfm, e, digest);
		inc_rs_pending(mdev);
		ok = drbd_send_drequest_csum(mdev, e->sector, e->size,
					     digest, digest_size, P_OV_REPLY);
		if (!ok)
			dec_rs_pending(mdev);
		kfree(digest);
	}

out:
	drbd_free_ee(mdev, e);

	dec_unacked(mdev);

	return ok;
}

/* Record a block found out of sync during online verify: extend the current
 * out-of-sync run if @sector is contiguous with it, else start a new run
 * (sizes kept in sectors, hence size>>9).  Marks the block in the bitmap
 * and flags the bitmap for write-out after resync. */
void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size)
{
	if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) {
		mdev->ov_last_oos_size += size>>9;
	} else {
		mdev->ov_last_oos_start = sector;
		mdev->ov_last_oos_size = size>>9;
	}
	drbd_set_out_of_sync(mdev, sector, size);
	set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags);
}

int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel)
{
	struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w);
	struct
digest_info *di; 1129 int digest_size; 1130 void *digest; 1131 int ok, eq = 0; 1132 1133 if (unlikely(cancel)) { 1134 drbd_free_ee(mdev, e); 1135 dec_unacked(mdev); 1136 return 1; 1137 } 1138 1139 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1140 * the resync lru has been cleaned up already */ 1141 if (get_ldev(mdev)) { 1142 drbd_rs_complete_io(mdev, e->sector); 1143 put_ldev(mdev); 1144 } 1145 1146 di = e->digest; 1147 1148 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1149 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1150 digest = kmalloc(digest_size, GFP_NOIO); 1151 if (digest) { 1152 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); 1153 1154 D_ASSERT(digest_size == di->digest_size); 1155 eq = !memcmp(digest, di->digest, digest_size); 1156 kfree(digest); 1157 } 1158 } else { 1159 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1160 if (__ratelimit(&drbd_ratelimit_state)) 1161 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 1162 } 1163 1164 dec_unacked(mdev); 1165 if (!eq) 1166 drbd_ov_oos_found(mdev, e->sector, e->size); 1167 else 1168 ov_oos_print(mdev); 1169 1170 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size, 1171 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1172 1173 drbd_free_ee(mdev, e); 1174 1175 if (--mdev->ov_left == 0) { 1176 ov_oos_print(mdev); 1177 drbd_resync_finished(mdev); 1178 } 1179 1180 return ok; 1181} 1182 1183int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1184{ 1185 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w); 1186 complete(&b->done); 1187 return 1; 1188} 1189 1190int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1191{ 1192 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w); 1193 struct p_barrier *p = &mdev->data.sbuf.barrier; 1194 int ok = 1; 1195 1196 /* really avoid racing with tl_clear. w.cb may have been referenced 1197 * just before it was reassigned and re-queued, so double check that. 
1198 * actually, this race was harmless, since we only try to send the 1199 * barrier packet here, and otherwise do nothing with the object. 1200 * but compare with the head of w_clear_epoch */ 1201 spin_lock_irq(&mdev->req_lock); 1202 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED) 1203 cancel = 1; 1204 spin_unlock_irq(&mdev->req_lock); 1205 if (cancel) 1206 return 1; 1207 1208 if (!drbd_get_data_sock(mdev)) 1209 return 0; 1210 p->barrier = b->br_number; 1211 /* inc_ap_pending was done where this was queued. 1212 * dec_ap_pending will be done in got_BarrierAck 1213 * or (on connection loss) in w_clear_epoch. */ 1214 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER, 1215 (struct p_header80 *)p, sizeof(*p), 0); 1216 drbd_put_data_sock(mdev); 1217 1218 return ok; 1219} 1220 1221int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1222{ 1223 if (cancel) 1224 return 1; 1225 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE); 1226} 1227 1228/** 1229 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1230 * @mdev: DRBD device. 1231 * @w: work object. 1232 * @cancel: The connection will be closed anyways 1233 */ 1234int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1235{ 1236 struct drbd_request *req = container_of(w, struct drbd_request, w); 1237 int ok; 1238 1239 if (unlikely(cancel)) { 1240 req_mod(req, send_canceled); 1241 return 1; 1242 } 1243 1244 ok = drbd_send_dblock(mdev, req); 1245 req_mod(req, ok ? handed_over_to_network : send_failed); 1246 1247 return ok; 1248} 1249 1250/** 1251 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1252 * @mdev: DRBD device. 1253 * @w: work object. 
1254 * @cancel: The connection will be closed anyways 1255 */ 1256int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1257{ 1258 struct drbd_request *req = container_of(w, struct drbd_request, w); 1259 int ok; 1260 1261 if (unlikely(cancel)) { 1262 req_mod(req, send_canceled); 1263 return 1; 1264 } 1265 1266 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size, 1267 (unsigned long)req); 1268 1269 if (!ok) { 1270 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send(); 1271 * so this is probably redundant */ 1272 if (mdev->state.conn >= C_CONNECTED) 1273 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); 1274 } 1275 req_mod(req, ok ? handed_over_to_network : send_failed); 1276 1277 return ok; 1278} 1279 1280int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1281{ 1282 struct drbd_request *req = container_of(w, struct drbd_request, w); 1283 1284 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1285 drbd_al_begin_io(mdev, req->sector); 1286 /* Calling drbd_al_begin_io() out of the worker might deadlocks 1287 theoretically. Practically it can not deadlock, since this is 1288 only used when unfreezing IOs. 
All the extents of the requests 1289 that made it into the TL are already active */ 1290 1291 drbd_req_make_private_bio(req, req->master_bio); 1292 req->private_bio->bi_bdev = mdev->ldev->backing_bdev; 1293 generic_make_request(req->private_bio); 1294 1295 return 1; 1296} 1297 1298static int _drbd_may_sync_now(struct drbd_conf *mdev) 1299{ 1300 struct drbd_conf *odev = mdev; 1301 1302 while (1) { 1303 if (odev->sync_conf.after == -1) 1304 return 1; 1305 odev = minor_to_mdev(odev->sync_conf.after); 1306 ERR_IF(!odev) return 1; 1307 if ((odev->state.conn >= C_SYNC_SOURCE && 1308 odev->state.conn <= C_PAUSED_SYNC_T) || 1309 odev->state.aftr_isp || odev->state.peer_isp || 1310 odev->state.user_isp) 1311 return 0; 1312 } 1313} 1314 1315/** 1316 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1317 * @mdev: DRBD device. 1318 * 1319 * Called from process context only (admin command and after_state_ch). 1320 */ 1321static int _drbd_pause_after(struct drbd_conf *mdev) 1322{ 1323 struct drbd_conf *odev; 1324 int i, rv = 0; 1325 1326 for (i = 0; i < minor_count; i++) { 1327 odev = minor_to_mdev(i); 1328 if (!odev) 1329 continue; 1330 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1331 continue; 1332 if (!_drbd_may_sync_now(odev)) 1333 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1334 != SS_NOTHING_TO_DO); 1335 } 1336 1337 return rv; 1338} 1339 1340/** 1341 * _drbd_resume_next() - Resume resync on all devices that may resync now 1342 * @mdev: DRBD device. 1343 * 1344 * Called from process context only (admin command and worker). 
1345 */ 1346static int _drbd_resume_next(struct drbd_conf *mdev) 1347{ 1348 struct drbd_conf *odev; 1349 int i, rv = 0; 1350 1351 for (i = 0; i < minor_count; i++) { 1352 odev = minor_to_mdev(i); 1353 if (!odev) 1354 continue; 1355 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1356 continue; 1357 if (odev->state.aftr_isp) { 1358 if (_drbd_may_sync_now(odev)) 1359 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1360 CS_HARD, NULL) 1361 != SS_NOTHING_TO_DO) ; 1362 } 1363 } 1364 return rv; 1365} 1366 1367void resume_next_sg(struct drbd_conf *mdev) 1368{ 1369 write_lock_irq(&global_state_lock); 1370 _drbd_resume_next(mdev); 1371 write_unlock_irq(&global_state_lock); 1372} 1373 1374void suspend_other_sg(struct drbd_conf *mdev) 1375{ 1376 write_lock_irq(&global_state_lock); 1377 _drbd_pause_after(mdev); 1378 write_unlock_irq(&global_state_lock); 1379} 1380 1381static int sync_after_error(struct drbd_conf *mdev, int o_minor) 1382{ 1383 struct drbd_conf *odev; 1384 1385 if (o_minor == -1) 1386 return NO_ERROR; 1387 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL) 1388 return ERR_SYNC_AFTER; 1389 1390 /* check for loops */ 1391 odev = minor_to_mdev(o_minor); 1392 while (1) { 1393 if (odev == mdev) 1394 return ERR_SYNC_AFTER_CYCLE; 1395 1396 /* dependency chain ends here, no cycles. 
*/ 1397 if (odev->sync_conf.after == -1) 1398 return NO_ERROR; 1399 1400 /* follow the dependency chain */ 1401 odev = minor_to_mdev(odev->sync_conf.after); 1402 } 1403} 1404 1405int drbd_alter_sa(struct drbd_conf *mdev, int na) 1406{ 1407 int changes; 1408 int retcode; 1409 1410 write_lock_irq(&global_state_lock); 1411 retcode = sync_after_error(mdev, na); 1412 if (retcode == NO_ERROR) { 1413 mdev->sync_conf.after = na; 1414 do { 1415 changes = _drbd_pause_after(mdev); 1416 changes |= _drbd_resume_next(mdev); 1417 } while (changes); 1418 } 1419 write_unlock_irq(&global_state_lock); 1420 return retcode; 1421} 1422 1423static void ping_peer(struct drbd_conf *mdev) 1424{ 1425 clear_bit(GOT_PING_ACK, &mdev->flags); 1426 request_ping(mdev); 1427 wait_event(mdev->misc_wait, 1428 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED); 1429} 1430 1431/** 1432 * drbd_start_resync() - Start the resync process 1433 * @mdev: DRBD device. 1434 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1435 * 1436 * This function might bring you directly into one of the 1437 * C_PAUSED_SYNC_* states. 1438 */ 1439void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) 1440{ 1441 union drbd_state ns; 1442 int r; 1443 1444 if (mdev->state.conn >= C_SYNC_SOURCE) { 1445 dev_err(DEV, "Resync already running!\n"); 1446 return; 1447 } 1448 1449 /* In case a previous resync run was aborted by an IO error/detach on the peer. */ 1450 drbd_rs_cancel_all(mdev); 1451 1452 if (side == C_SYNC_TARGET) { 1453 /* Since application IO was locked out during C_WF_BITMAP_T and 1454 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1455 we check that we might make the data inconsistent. 
*/ 1456 r = drbd_khelper(mdev, "before-resync-target"); 1457 r = (r >> 8) & 0xff; 1458 if (r > 0) { 1459 dev_info(DEV, "before-resync-target handler returned %d, " 1460 "dropping connection.\n", r); 1461 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 1462 return; 1463 } 1464 } 1465 1466 drbd_state_lock(mdev); 1467 1468 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) { 1469 drbd_state_unlock(mdev); 1470 return; 1471 } 1472 1473 if (side == C_SYNC_TARGET) { 1474 mdev->bm_resync_fo = 0; 1475 } else /* side == C_SYNC_SOURCE */ { 1476 u64 uuid; 1477 1478 get_random_bytes(&uuid, sizeof(u64)); 1479 drbd_uuid_set(mdev, UI_BITMAP, uuid); 1480 drbd_send_sync_uuid(mdev, uuid); 1481 1482 D_ASSERT(mdev->state.disk == D_UP_TO_DATE); 1483 } 1484 1485 write_lock_irq(&global_state_lock); 1486 ns = mdev->state; 1487 1488 ns.aftr_isp = !_drbd_may_sync_now(mdev); 1489 1490 ns.conn = side; 1491 1492 if (side == C_SYNC_TARGET) 1493 ns.disk = D_INCONSISTENT; 1494 else /* side == C_SYNC_SOURCE */ 1495 ns.pdsk = D_INCONSISTENT; 1496 1497 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 1498 ns = mdev->state; 1499 1500 if (ns.conn < C_CONNECTED) 1501 r = SS_UNKNOWN_ERROR; 1502 1503 if (r == SS_SUCCESS) { 1504 unsigned long tw = drbd_bm_total_weight(mdev); 1505 unsigned long now = jiffies; 1506 int i; 1507 1508 mdev->rs_failed = 0; 1509 mdev->rs_paused = 0; 1510 mdev->rs_same_csum = 0; 1511 mdev->rs_last_events = 0; 1512 mdev->rs_last_sect_ev = 0; 1513 mdev->rs_total = tw; 1514 mdev->rs_start = now; 1515 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1516 mdev->rs_mark_left[i] = tw; 1517 mdev->rs_mark_time[i] = now; 1518 } 1519 _drbd_pause_after(mdev); 1520 } 1521 write_unlock_irq(&global_state_lock); 1522 put_ldev(mdev); 1523 1524 if (r == SS_SUCCESS) { 1525 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1526 drbd_conn_str(ns.conn), 1527 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10), 1528 (unsigned long) mdev->rs_total); 1529 1530 if (mdev->rs_total == 0) { 1531 
/* Peer still reachable? Beware of failing before-resync-target handlers! */ 1532 ping_peer(mdev); 1533 drbd_resync_finished(mdev); 1534 } 1535 1536 atomic_set(&mdev->rs_sect_in, 0); 1537 atomic_set(&mdev->rs_sect_ev, 0); 1538 mdev->rs_in_flight = 0; 1539 mdev->rs_planed = 0; 1540 spin_lock(&mdev->peer_seq_lock); 1541 fifo_set(&mdev->rs_plan_s, 0); 1542 spin_unlock(&mdev->peer_seq_lock); 1543 /* ns.conn may already be != mdev->state.conn, 1544 * we may have been paused in between, or become paused until 1545 * the timer triggers. 1546 * No matter, that is handled in resync_timer_fn() */ 1547 if (ns.conn == C_SYNC_TARGET) 1548 mod_timer(&mdev->resync_timer, jiffies); 1549 1550 drbd_md_sync(mdev); 1551 } 1552 drbd_state_unlock(mdev); 1553} 1554 1555int drbd_worker(struct drbd_thread *thi) 1556{ 1557 struct drbd_conf *mdev = thi->mdev; 1558 struct drbd_work *w = NULL; 1559 LIST_HEAD(work_list); 1560 int intr = 0, i; 1561 1562 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev)); 1563 1564 while (get_t_state(thi) == Running) { 1565 drbd_thread_current_set_cpu(mdev); 1566 1567 if (down_trylock(&mdev->data.work.s)) { 1568 mutex_lock(&mdev->data.mutex); 1569 if (mdev->data.socket && !mdev->net_conf->no_cork) 1570 drbd_tcp_uncork(mdev->data.socket); 1571 mutex_unlock(&mdev->data.mutex); 1572 1573 intr = down_interruptible(&mdev->data.work.s); 1574 1575 mutex_lock(&mdev->data.mutex); 1576 if (mdev->data.socket && !mdev->net_conf->no_cork) 1577 drbd_tcp_cork(mdev->data.socket); 1578 mutex_unlock(&mdev->data.mutex); 1579 } 1580 1581 if (intr) { 1582 D_ASSERT(intr == -EINTR); 1583 flush_signals(current); 1584 ERR_IF (get_t_state(thi) == Running) 1585 continue; 1586 break; 1587 } 1588 1589 if (get_t_state(thi) != Running) 1590 break; 1591 /* With this break, we have done a down() but not consumed 1592 the entry from the list. The cleanup code takes care of 1593 this... 
*/ 1594 1595 w = NULL; 1596 spin_lock_irq(&mdev->data.work.q_lock); 1597 ERR_IF(list_empty(&mdev->data.work.q)) { 1598 /* something terribly wrong in our logic. 1599 * we were able to down() the semaphore, 1600 * but the list is empty... doh. 1601 * 1602 * what is the best thing to do now? 1603 * try again from scratch, restarting the receiver, 1604 * asender, whatnot? could break even more ugly, 1605 * e.g. when we are primary, but no good local data. 1606 * 1607 * I'll try to get away just starting over this loop. 1608 */ 1609 spin_unlock_irq(&mdev->data.work.q_lock); 1610 continue; 1611 } 1612 w = list_entry(mdev->data.work.q.next, struct drbd_work, list); 1613 list_del_init(&w->list); 1614 spin_unlock_irq(&mdev->data.work.q_lock); 1615 1616 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) { 1617 /* dev_warn(DEV, "worker: a callback failed! \n"); */ 1618 if (mdev->state.conn >= C_CONNECTED) 1619 drbd_force_state(mdev, 1620 NS(conn, C_NETWORK_FAILURE)); 1621 } 1622 } 1623 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags)); 1624 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags)); 1625 1626 spin_lock_irq(&mdev->data.work.q_lock); 1627 i = 0; 1628 while (!list_empty(&mdev->data.work.q)) { 1629 list_splice_init(&mdev->data.work.q, &work_list); 1630 spin_unlock_irq(&mdev->data.work.q_lock); 1631 1632 while (!list_empty(&work_list)) { 1633 w = list_entry(work_list.next, struct drbd_work, list); 1634 list_del_init(&w->list); 1635 w->cb(mdev, w, 1); 1636 i++; /* dead debugging code */ 1637 } 1638 1639 spin_lock_irq(&mdev->data.work.q_lock); 1640 } 1641 sema_init(&mdev->data.work.s, 0); 1642 /* DANGEROUS race: if someone did queue his work within the spinlock, 1643 * but up() ed outside the spinlock, we could get an up() on the 1644 * semaphore without corresponding list entry. 1645 * So don't do that. 
1646 */ 1647 spin_unlock_irq(&mdev->data.work.q_lock); 1648 1649 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1650 /* _drbd_set_state only uses stop_nowait. 1651 * wait here for the Exiting receiver. */ 1652 drbd_thread_stop(&mdev->receiver); 1653 drbd_mdev_cleanup(mdev); 1654 1655 dev_info(DEV, "worker terminated\n"); 1656 1657 clear_bit(DEVICE_DYING, &mdev->flags); 1658 clear_bit(CONFIG_PENDING, &mdev->flags); 1659 wake_up(&mdev->state_wait); 1660 1661 return 0; 1662} 1663