drbd_worker.c revision 13d42685bec1f012dcbc5d187490eb1d15ec8219
1/* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24 */ 25 26#include <linux/module.h> 27#include <linux/drbd.h> 28#include <linux/sched.h> 29#include <linux/smp_lock.h> 30#include <linux/wait.h> 31#include <linux/mm.h> 32#include <linux/memcontrol.h> 33#include <linux/mm_inline.h> 34#include <linux/slab.h> 35#include <linux/random.h> 36#include <linux/string.h> 37#include <linux/scatterlist.h> 38 39#include "drbd_int.h" 40#include "drbd_req.h" 41 42static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel); 43 44 45 46/* defined here: 47 drbd_md_io_complete 48 drbd_endio_sec 49 drbd_endio_pri 50 51 * more endio handlers: 52 atodb_endio in drbd_actlog.c 53 drbd_bm_async_io_complete in drbd_bitmap.c 54 55 * For all these callbacks, note the following: 56 * The callbacks will be called in irq context by the IDE drivers, 57 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 58 * Try to get the locking right :) 59 * 60 */ 61 62 63/* About the global_state_lock 64 Each state transition on an device holds a read lock. In case we have 65 to evaluate the sync after dependencies, we grab a write lock, because 66 we need stable states on all devices for that. */ 67rwlock_t global_state_lock; 68 69/* used for synchronous meta data and bitmap IO 70 * submitted by drbd_md_sync_page_io() 71 */ 72void drbd_md_io_complete(struct bio *bio, int error) 73{ 74 struct drbd_md_io *md_io; 75 76 md_io = (struct drbd_md_io *)bio->bi_private; 77 md_io->error = error; 78 79 complete(&md_io->event); 80} 81 82/* reads on behalf of the partner, 83 * "submitted" by the receiver 84 */ 85void drbd_endio_read_sec_final(struct drbd_epoch_entry *e) __releases(local) 86{ 87 unsigned long flags = 0; 88 struct drbd_conf *mdev = e->mdev; 89 90 D_ASSERT(e->block_id != ID_VACANT); 91 92 spin_lock_irqsave(&mdev->req_lock, flags); 93 mdev->read_cnt += e->size >> 9; 94 list_del(&e->w.list); 95 if (list_empty(&mdev->read_ee)) 96 wake_up(&mdev->ee_wait); 97 if (test_bit(__EE_WAS_ERROR, &e->flags)) 98 __drbd_chk_io_error(mdev, FALSE); 99 spin_unlock_irqrestore(&mdev->req_lock, flags); 100 101 drbd_queue_work(&mdev->data.work, &e->w); 102 put_ldev(mdev); 103} 104 105static int is_failed_barrier(int ee_flags) 106{ 107 return (ee_flags & (EE_IS_BARRIER|EE_WAS_ERROR|EE_RESUBMITTED)) 108 == (EE_IS_BARRIER|EE_WAS_ERROR); 109} 110 111/* writes on behalf of the partner, or resync writes, 112 * "submitted" by the receiver, final stage. */ 113static void drbd_endio_write_sec_final(struct drbd_epoch_entry *e) __releases(local) 114{ 115 unsigned long flags = 0; 116 struct drbd_conf *mdev = e->mdev; 117 sector_t e_sector; 118 int do_wake; 119 int is_syncer_req; 120 int do_al_complete_io; 121 122 /* if this is a failed barrier request, disable use of barriers, 123 * and schedule for resubmission */ 124 if (is_failed_barrier(e->flags)) { 125 drbd_bump_write_ordering(mdev, WO_bdev_flush); 126 spin_lock_irqsave(&mdev->req_lock, flags); 127 list_del(&e->w.list); 128 e->flags = (e->flags & ~EE_WAS_ERROR) | EE_RESUBMITTED; 129 e->w.cb = w_e_reissue; 130 /* put_ldev actually happens below, once we come here again. */ 131 __release(local); 132 spin_unlock_irqrestore(&mdev->req_lock, flags); 133 drbd_queue_work(&mdev->data.work, &e->w); 134 return; 135 } 136 137 D_ASSERT(e->block_id != ID_VACANT); 138 139 /* after we moved e to done_ee, 140 * we may no longer access it, 141 * it may be freed/reused already! 142 * (as soon as we release the req_lock) */ 143 e_sector = e->sector; 144 do_al_complete_io = e->flags & EE_CALL_AL_COMPLETE_IO; 145 is_syncer_req = is_syncer_block_id(e->block_id); 146 147 spin_lock_irqsave(&mdev->req_lock, flags); 148 mdev->writ_cnt += e->size >> 9; 149 list_del(&e->w.list); /* has been on active_ee or sync_ee */ 150 list_add_tail(&e->w.list, &mdev->done_ee); 151 152 /* No hlist_del_init(&e->colision) here, we did not send the Ack yet, 153 * neither did we wake possibly waiting conflicting requests. 154 * done from "drbd_process_done_ee" within the appropriate w.cb 155 * (e_end_block/e_end_resync_block) or from _drbd_clear_done_ee */ 156 157 do_wake = is_syncer_req 158 ? list_empty(&mdev->sync_ee) 159 : list_empty(&mdev->active_ee); 160 161 if (test_bit(__EE_WAS_ERROR, &e->flags)) 162 __drbd_chk_io_error(mdev, FALSE); 163 spin_unlock_irqrestore(&mdev->req_lock, flags); 164 165 if (is_syncer_req) 166 drbd_rs_complete_io(mdev, e_sector); 167 168 if (do_wake) 169 wake_up(&mdev->ee_wait); 170 171 if (do_al_complete_io) 172 drbd_al_complete_io(mdev, e_sector); 173 174 wake_asender(mdev); 175 put_ldev(mdev); 176} 177 178/* writes on behalf of the partner, or resync writes, 179 * "submitted" by the receiver. 180 */ 181void drbd_endio_sec(struct bio *bio, int error) 182{ 183 struct drbd_epoch_entry *e = bio->bi_private; 184 struct drbd_conf *mdev = e->mdev; 185 int uptodate = bio_flagged(bio, BIO_UPTODATE); 186 int is_write = bio_data_dir(bio) == WRITE; 187 188 if (error) 189 dev_warn(DEV, "%s: error=%d s=%llus\n", 190 is_write ? "write" : "read", error, 191 (unsigned long long)e->sector); 192 if (!error && !uptodate) { 193 dev_warn(DEV, "%s: setting error to -EIO s=%llus\n", 194 is_write ? "write" : "read", 195 (unsigned long long)e->sector); 196 /* strange behavior of some lower level drivers... 197 * fail the request by clearing the uptodate flag, 198 * but do not return any error?! */ 199 error = -EIO; 200 } 201 202 if (error) 203 set_bit(__EE_WAS_ERROR, &e->flags); 204 205 bio_put(bio); /* no need for the bio anymore */ 206 if (atomic_dec_and_test(&e->pending_bios)) { 207 if (is_write) 208 drbd_endio_write_sec_final(e); 209 else 210 drbd_endio_read_sec_final(e); 211 } 212} 213 214/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 215 */ 216void drbd_endio_pri(struct bio *bio, int error) 217{ 218 struct drbd_request *req = bio->bi_private; 219 struct drbd_conf *mdev = req->mdev; 220 enum drbd_req_event what; 221 int uptodate = bio_flagged(bio, BIO_UPTODATE); 222 223 if (!error && !uptodate) { 224 dev_warn(DEV, "p %s: setting error to -EIO\n", 225 bio_data_dir(bio) == WRITE ? "write" : "read"); 226 /* strange behavior of some lower level drivers... 227 * fail the request by clearing the uptodate flag, 228 * but do not return any error?! */ 229 error = -EIO; 230 } 231 232 /* to avoid recursion in __req_mod */ 233 if (unlikely(error)) { 234 what = (bio_data_dir(bio) == WRITE) 235 ? write_completed_with_error 236 : (bio_rw(bio) == READ) 237 ? read_completed_with_error 238 : read_ahead_completed_with_error; 239 } else 240 what = completed_ok; 241 242 bio_put(req->private_bio); 243 req->private_bio = ERR_PTR(error); 244 245 req_mod(req, what); 246} 247 248int w_read_retry_remote(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 249{ 250 struct drbd_request *req = container_of(w, struct drbd_request, w); 251 252 /* We should not detach for read io-error, 253 * but try to WRITE the P_DATA_REPLY to the failed location, 254 * to give the disk the chance to relocate that block */ 255 256 spin_lock_irq(&mdev->req_lock); 257 if (cancel || mdev->state.pdsk != D_UP_TO_DATE) { 258 _req_mod(req, read_retry_remote_canceled); 259 spin_unlock_irq(&mdev->req_lock); 260 return 1; 261 } 262 spin_unlock_irq(&mdev->req_lock); 263 264 return w_send_read_req(mdev, w, 0); 265} 266 267int w_resync_inactive(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 268{ 269 ERR_IF(cancel) return 1; 270 dev_err(DEV, "resync inactive, but callback triggered??\n"); 271 return 1; /* Simply ignore this! */ 272} 273 274void drbd_csum_ee(struct drbd_conf *mdev, struct crypto_hash *tfm, struct drbd_epoch_entry *e, void *digest) 275{ 276 struct hash_desc desc; 277 struct scatterlist sg; 278 struct page *page = e->pages; 279 struct page *tmp; 280 unsigned len; 281 282 desc.tfm = tfm; 283 desc.flags = 0; 284 285 sg_init_table(&sg, 1); 286 crypto_hash_init(&desc); 287 288 while ((tmp = page_chain_next(page))) { 289 /* all but the last page will be fully used */ 290 sg_set_page(&sg, page, PAGE_SIZE, 0); 291 crypto_hash_update(&desc, &sg, sg.length); 292 page = tmp; 293 } 294 /* and now the last, possibly only partially used page */ 295 len = e->size & (PAGE_SIZE - 1); 296 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 297 crypto_hash_update(&desc, &sg, sg.length); 298 crypto_hash_final(&desc, digest); 299} 300 301void drbd_csum_bio(struct drbd_conf *mdev, struct crypto_hash *tfm, struct bio *bio, void *digest) 302{ 303 struct hash_desc desc; 304 struct scatterlist sg; 305 struct bio_vec *bvec; 306 int i; 307 308 desc.tfm = tfm; 309 desc.flags = 0; 310 311 sg_init_table(&sg, 1); 312 crypto_hash_init(&desc); 313 314 __bio_for_each_segment(bvec, bio, i, 0) { 315 sg_set_page(&sg, bvec->bv_page, bvec->bv_len, bvec->bv_offset); 316 crypto_hash_update(&desc, &sg, sg.length); 317 } 318 crypto_hash_final(&desc, digest); 319} 320 321static int w_e_send_csum(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 322{ 323 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 324 int digest_size; 325 void *digest; 326 int ok; 327 328 D_ASSERT(e->block_id == DRBD_MAGIC + 0xbeef); 329 330 if (unlikely(cancel)) { 331 drbd_free_ee(mdev, e); 332 return 1; 333 } 334 335 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 336 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 337 digest = kmalloc(digest_size, GFP_NOIO); 338 if (digest) { 339 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 340 341 inc_rs_pending(mdev); 342 ok = drbd_send_drequest_csum(mdev, 343 e->sector, 344 e->size, 345 digest, 346 digest_size, 347 P_CSUM_RS_REQUEST); 348 kfree(digest); 349 } else { 350 dev_err(DEV, "kmalloc() of digest failed.\n"); 351 ok = 0; 352 } 353 } else 354 ok = 1; 355 356 drbd_free_ee(mdev, e); 357 358 if (unlikely(!ok)) 359 dev_err(DEV, "drbd_send_drequest(..., csum) failed\n"); 360 return ok; 361} 362 363#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 364 365static int read_for_csum(struct drbd_conf *mdev, sector_t sector, int size) 366{ 367 struct drbd_epoch_entry *e; 368 369 if (!get_ldev(mdev)) 370 return -EIO; 371 372 if (drbd_rs_should_slow_down(mdev)) 373 goto defer; 374 375 /* GFP_TRY, because if there is no memory available right now, this may 376 * be rescheduled for later. It is "only" background resync, after all. */ 377 e = drbd_alloc_ee(mdev, DRBD_MAGIC+0xbeef, sector, size, GFP_TRY); 378 if (!e) 379 goto defer; 380 381 e->w.cb = w_e_send_csum; 382 spin_lock_irq(&mdev->req_lock); 383 list_add(&e->w.list, &mdev->read_ee); 384 spin_unlock_irq(&mdev->req_lock); 385 386 atomic_add(size >> 9, &mdev->rs_sect_ev); 387 if (drbd_submit_ee(mdev, e, READ, DRBD_FAULT_RS_RD) == 0) 388 return 0; 389 390 /* drbd_submit_ee currently fails for one reason only: 391 * not being able to allocate enough bios. 392 * Is dropping the connection going to help? */ 393 spin_lock_irq(&mdev->req_lock); 394 list_del(&e->w.list); 395 spin_unlock_irq(&mdev->req_lock); 396 397 drbd_free_ee(mdev, e); 398defer: 399 put_ldev(mdev); 400 return -EAGAIN; 401} 402 403void resync_timer_fn(unsigned long data) 404{ 405 struct drbd_conf *mdev = (struct drbd_conf *) data; 406 int queue; 407 408 queue = 1; 409 switch (mdev->state.conn) { 410 case C_VERIFY_S: 411 mdev->resync_work.cb = w_make_ov_request; 412 break; 413 case C_SYNC_TARGET: 414 mdev->resync_work.cb = w_make_resync_request; 415 break; 416 default: 417 queue = 0; 418 mdev->resync_work.cb = w_resync_inactive; 419 } 420 421 /* harmless race: list_empty outside data.work.q_lock */ 422 if (list_empty(&mdev->resync_work.list) && queue) 423 drbd_queue_work(&mdev->data.work, &mdev->resync_work); 424} 425 426static void fifo_set(struct fifo_buffer *fb, int value) 427{ 428 int i; 429 430 for (i = 0; i < fb->size; i++) 431 fb->values[i] = value; 432} 433 434static int fifo_push(struct fifo_buffer *fb, int value) 435{ 436 int ov; 437 438 ov = fb->values[fb->head_index]; 439 fb->values[fb->head_index++] = value; 440 441 if (fb->head_index >= fb->size) 442 fb->head_index = 0; 443 444 return ov; 445} 446 447static void fifo_add_val(struct fifo_buffer *fb, int value) 448{ 449 int i; 450 451 for (i = 0; i < fb->size; i++) 452 fb->values[i] += value; 453} 454 455int drbd_rs_controller(struct drbd_conf *mdev) 456{ 457 unsigned int sect_in; /* Number of sectors that came in since the last turn */ 458 unsigned int want; /* The number of sectors we want in the proxy */ 459 int req_sect; /* Number of sectors to request in this turn */ 460 int correction; /* Number of sectors more we need in the proxy*/ 461 int cps; /* correction per invocation of drbd_rs_controller() */ 462 int steps; /* Number of time steps to plan ahead */ 463 int curr_corr; 464 int max_sect; 465 466 sect_in = atomic_xchg(&mdev->rs_sect_in, 0); /* Number of sectors that came in */ 467 mdev->rs_in_flight -= sect_in; 468 469 spin_lock(&mdev->peer_seq_lock); /* get an atomic view on mdev->rs_plan_s */ 470 471 steps = mdev->rs_plan_s.size; /* (mdev->sync_conf.c_plan_ahead * 10 * SLEEP_TIME) / HZ; */ 472 473 if (mdev->rs_in_flight + sect_in == 0) { /* At start of resync */ 474 want = ((mdev->sync_conf.rate * 2 * SLEEP_TIME) / HZ) * steps; 475 } else { /* normal path */ 476 want = mdev->sync_conf.c_fill_target ? mdev->sync_conf.c_fill_target : 477 sect_in * mdev->sync_conf.c_delay_target * HZ / (SLEEP_TIME * 10); 478 } 479 480 correction = want - mdev->rs_in_flight - mdev->rs_planed; 481 482 /* Plan ahead */ 483 cps = correction / steps; 484 fifo_add_val(&mdev->rs_plan_s, cps); 485 mdev->rs_planed += cps * steps; 486 487 /* What we do in this step */ 488 curr_corr = fifo_push(&mdev->rs_plan_s, 0); 489 spin_unlock(&mdev->peer_seq_lock); 490 mdev->rs_planed -= curr_corr; 491 492 req_sect = sect_in + curr_corr; 493 if (req_sect < 0) 494 req_sect = 0; 495 496 max_sect = (mdev->sync_conf.c_max_rate * 2 * SLEEP_TIME) / HZ; 497 if (req_sect > max_sect) 498 req_sect = max_sect; 499 500 /* 501 dev_warn(DEV, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n", 502 sect_in, mdev->rs_in_flight, want, correction, 503 steps, cps, mdev->rs_planed, curr_corr, req_sect); 504 */ 505 506 return req_sect; 507} 508 509int w_make_resync_request(struct drbd_conf *mdev, 510 struct drbd_work *w, int cancel) 511{ 512 unsigned long bit; 513 sector_t sector; 514 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 515 int max_segment_size; 516 int number, rollback_i, size, pe, mx; 517 int align, queued, sndbuf; 518 int i = 0; 519 520 if (unlikely(cancel)) 521 return 1; 522 523 if (unlikely(mdev->state.conn < C_CONNECTED)) { 524 dev_err(DEV, "Confused in w_make_resync_request()! cstate < Connected"); 525 return 0; 526 } 527 528 if (mdev->state.conn != C_SYNC_TARGET) 529 dev_err(DEV, "%s in w_make_resync_request\n", 530 drbd_conn_str(mdev->state.conn)); 531 532 if (mdev->rs_total == 0) { 533 /* empty resync? */ 534 drbd_resync_finished(mdev); 535 return 1; 536 } 537 538 if (!get_ldev(mdev)) { 539 /* Since we only need to access mdev->rsync a 540 get_ldev_if_state(mdev,D_FAILED) would be sufficient, but 541 to continue resync with a broken disk makes no sense at 542 all */ 543 dev_err(DEV, "Disk broke down during resync!\n"); 544 mdev->resync_work.cb = w_resync_inactive; 545 return 1; 546 } 547 548 /* starting with drbd 8.3.8, we can handle multi-bio EEs, 549 * if it should be necessary */ 550 max_segment_size = 551 mdev->agreed_pro_version < 94 ? queue_max_segment_size(mdev->rq_queue) : 552 mdev->agreed_pro_version < 95 ? DRBD_MAX_SIZE_H80_PACKET : DRBD_MAX_SEGMENT_SIZE; 553 554 if (mdev->rs_plan_s.size) { /* mdev->sync_conf.c_plan_ahead */ 555 number = drbd_rs_controller(mdev) >> (BM_BLOCK_SHIFT - 9); 556 mdev->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME; 557 } else { 558 mdev->c_sync_rate = mdev->sync_conf.rate; 559 number = SLEEP_TIME * mdev->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ); 560 } 561 562 /* Throttle resync on lower level disk activity, which may also be 563 * caused by application IO on Primary/SyncTarget. 564 * Keep this after the call to drbd_rs_controller, as that assumes 565 * to be called as precisely as possible every SLEEP_TIME, 566 * and would be confused otherwise. */ 567 if (drbd_rs_should_slow_down(mdev)) 568 goto requeue; 569 570 mutex_lock(&mdev->data.mutex); 571 if (mdev->data.socket) 572 mx = mdev->data.socket->sk->sk_rcvbuf / sizeof(struct p_block_req); 573 else 574 mx = 1; 575 mutex_unlock(&mdev->data.mutex); 576 577 /* For resync rates >160MB/sec, allow more pending RS requests */ 578 if (number > mx) 579 mx = number; 580 581 /* Limit the number of pending RS requests to no more than the peer's receive buffer */ 582 pe = atomic_read(&mdev->rs_pending_cnt); 583 if ((pe + number) > mx) { 584 number = mx - pe; 585 } 586 587 for (i = 0; i < number; i++) { 588 /* Stop generating RS requests, when half of the send buffer is filled */ 589 mutex_lock(&mdev->data.mutex); 590 if (mdev->data.socket) { 591 queued = mdev->data.socket->sk->sk_wmem_queued; 592 sndbuf = mdev->data.socket->sk->sk_sndbuf; 593 } else { 594 queued = 1; 595 sndbuf = 0; 596 } 597 mutex_unlock(&mdev->data.mutex); 598 if (queued > sndbuf / 2) 599 goto requeue; 600 601next_sector: 602 size = BM_BLOCK_SIZE; 603 bit = drbd_bm_find_next(mdev, mdev->bm_resync_fo); 604 605 if (bit == -1UL) { 606 mdev->bm_resync_fo = drbd_bm_bits(mdev); 607 mdev->resync_work.cb = w_resync_inactive; 608 put_ldev(mdev); 609 return 1; 610 } 611 612 sector = BM_BIT_TO_SECT(bit); 613 614 if (drbd_try_rs_begin_io(mdev, sector)) { 615 mdev->bm_resync_fo = bit; 616 goto requeue; 617 } 618 mdev->bm_resync_fo = bit + 1; 619 620 if (unlikely(drbd_bm_test_bit(mdev, bit) == 0)) { 621 drbd_rs_complete_io(mdev, sector); 622 goto next_sector; 623 } 624 625#if DRBD_MAX_SEGMENT_SIZE > BM_BLOCK_SIZE 626 /* try to find some adjacent bits. 627 * we stop if we have already the maximum req size. 628 * 629 * Additionally always align bigger requests, in order to 630 * be prepared for all stripe sizes of software RAIDs. 631 */ 632 align = 1; 633 rollback_i = i; 634 for (;;) { 635 if (size + BM_BLOCK_SIZE > max_segment_size) 636 break; 637 638 /* Be always aligned */ 639 if (sector & ((1<<(align+3))-1)) 640 break; 641 642 /* do not cross extent boundaries */ 643 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 644 break; 645 /* now, is it actually dirty, after all? 646 * caution, drbd_bm_test_bit is tri-state for some 647 * obscure reason; ( b == 0 ) would get the out-of-band 648 * only accidentally right because of the "oddly sized" 649 * adjustment below */ 650 if (drbd_bm_test_bit(mdev, bit+1) != 1) 651 break; 652 bit++; 653 size += BM_BLOCK_SIZE; 654 if ((BM_BLOCK_SIZE << align) <= size) 655 align++; 656 i++; 657 } 658 /* if we merged some, 659 * reset the offset to start the next drbd_bm_find_next from */ 660 if (size > BM_BLOCK_SIZE) 661 mdev->bm_resync_fo = bit + 1; 662#endif 663 664 /* adjust very last sectors, in case we are oddly sized */ 665 if (sector + (size>>9) > capacity) 666 size = (capacity-sector)<<9; 667 if (mdev->agreed_pro_version >= 89 && mdev->csums_tfm) { 668 switch (read_for_csum(mdev, sector, size)) { 669 case -EIO: /* Disk failure */ 670 put_ldev(mdev); 671 return 0; 672 case -EAGAIN: /* allocation failed, or ldev busy */ 673 drbd_rs_complete_io(mdev, sector); 674 mdev->bm_resync_fo = BM_SECT_TO_BIT(sector); 675 i = rollback_i; 676 goto requeue; 677 case 0: 678 /* everything ok */ 679 break; 680 default: 681 BUG(); 682 } 683 } else { 684 inc_rs_pending(mdev); 685 if (!drbd_send_drequest(mdev, P_RS_DATA_REQUEST, 686 sector, size, ID_SYNCER)) { 687 dev_err(DEV, "drbd_send_drequest() failed, aborting...\n"); 688 dec_rs_pending(mdev); 689 put_ldev(mdev); 690 return 0; 691 } 692 } 693 } 694 695 if (mdev->bm_resync_fo >= drbd_bm_bits(mdev)) { 696 /* last syncer _request_ was sent, 697 * but the P_RS_DATA_REPLY not yet received. sync will end (and 698 * next sync group will resume), as soon as we receive the last 699 * resync data block, and the last bit is cleared. 700 * until then resync "work" is "inactive" ... 701 */ 702 mdev->resync_work.cb = w_resync_inactive; 703 put_ldev(mdev); 704 return 1; 705 } 706 707 requeue: 708 mdev->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 709 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 710 put_ldev(mdev); 711 return 1; 712} 713 714static int w_make_ov_request(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 715{ 716 int number, i, size; 717 sector_t sector; 718 const sector_t capacity = drbd_get_capacity(mdev->this_bdev); 719 720 if (unlikely(cancel)) 721 return 1; 722 723 if (unlikely(mdev->state.conn < C_CONNECTED)) { 724 dev_err(DEV, "Confused in w_make_ov_request()! cstate < Connected"); 725 return 0; 726 } 727 728 number = SLEEP_TIME*mdev->sync_conf.rate / ((BM_BLOCK_SIZE/1024)*HZ); 729 if (atomic_read(&mdev->rs_pending_cnt) > number) 730 goto requeue; 731 732 number -= atomic_read(&mdev->rs_pending_cnt); 733 734 sector = mdev->ov_position; 735 for (i = 0; i < number; i++) { 736 if (sector >= capacity) { 737 mdev->resync_work.cb = w_resync_inactive; 738 return 1; 739 } 740 741 size = BM_BLOCK_SIZE; 742 743 if (drbd_try_rs_begin_io(mdev, sector)) { 744 mdev->ov_position = sector; 745 goto requeue; 746 } 747 748 if (sector + (size>>9) > capacity) 749 size = (capacity-sector)<<9; 750 751 inc_rs_pending(mdev); 752 if (!drbd_send_ov_request(mdev, sector, size)) { 753 dec_rs_pending(mdev); 754 return 0; 755 } 756 sector += BM_SECT_PER_BIT; 757 } 758 mdev->ov_position = sector; 759 760 requeue: 761 mod_timer(&mdev->resync_timer, jiffies + SLEEP_TIME); 762 return 1; 763} 764 765 766int w_ov_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 767{ 768 kfree(w); 769 ov_oos_print(mdev); 770 drbd_resync_finished(mdev); 771 772 return 1; 773} 774 775static int w_resync_finished(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 776{ 777 kfree(w); 778 779 drbd_resync_finished(mdev); 780 781 return 1; 782} 783 784static void ping_peer(struct drbd_conf *mdev) 785{ 786 clear_bit(GOT_PING_ACK, &mdev->flags); 787 request_ping(mdev); 788 wait_event(mdev->misc_wait, 789 test_bit(GOT_PING_ACK, &mdev->flags) || mdev->state.conn < C_CONNECTED); 790} 791 792int drbd_resync_finished(struct drbd_conf *mdev) 793{ 794 unsigned long db, dt, dbdt; 795 unsigned long n_oos; 796 union drbd_state os, ns; 797 struct drbd_work *w; 798 char *khelper_cmd = NULL; 799 800 /* Remove all elements from the resync LRU. Since future actions 801 * might set bits in the (main) bitmap, then the entries in the 802 * resync LRU would be wrong. */ 803 if (drbd_rs_del_all(mdev)) { 804 /* In case this is not possible now, most probably because 805 * there are P_RS_DATA_REPLY Packets lingering on the worker's 806 * queue (or even the read operations for those packets 807 * is not finished by now). Retry in 100ms. */ 808 809 drbd_kick_lo(mdev); 810 __set_current_state(TASK_INTERRUPTIBLE); 811 schedule_timeout(HZ / 10); 812 w = kmalloc(sizeof(struct drbd_work), GFP_ATOMIC); 813 if (w) { 814 w->cb = w_resync_finished; 815 drbd_queue_work(&mdev->data.work, w); 816 return 1; 817 } 818 dev_err(DEV, "Warn failed to drbd_rs_del_all() and to kmalloc(w).\n"); 819 } 820 821 dt = (jiffies - mdev->rs_start - mdev->rs_paused) / HZ; 822 if (dt <= 0) 823 dt = 1; 824 db = mdev->rs_total; 825 dbdt = Bit2KB(db/dt); 826 mdev->rs_paused /= HZ; 827 828 if (!get_ldev(mdev)) 829 goto out; 830 831 ping_peer(mdev); 832 833 spin_lock_irq(&mdev->req_lock); 834 os = mdev->state; 835 836 /* This protects us against multiple calls (that can happen in the presence 837 of application IO), and against connectivity loss just before we arrive here. */ 838 if (os.conn <= C_CONNECTED) 839 goto out_unlock; 840 841 ns = os; 842 ns.conn = C_CONNECTED; 843 844 dev_info(DEV, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 845 (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) ? 846 "Online verify " : "Resync", 847 dt + mdev->rs_paused, mdev->rs_paused, dbdt); 848 849 n_oos = drbd_bm_total_weight(mdev); 850 851 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 852 if (n_oos) { 853 dev_alert(DEV, "Online verify found %lu %dk block out of sync!\n", 854 n_oos, Bit2KB(1)); 855 khelper_cmd = "out-of-sync"; 856 } 857 } else { 858 D_ASSERT((n_oos - mdev->rs_failed) == 0); 859 860 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 861 khelper_cmd = "after-resync-target"; 862 863 if (mdev->csums_tfm && mdev->rs_total) { 864 const unsigned long s = mdev->rs_same_csum; 865 const unsigned long t = mdev->rs_total; 866 const int ratio = 867 (t == 0) ? 0 : 868 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 869 dev_info(DEV, "%u %% had equal check sums, eliminated: %luK; " 870 "transferred %luK total %luK\n", 871 ratio, 872 Bit2KB(mdev->rs_same_csum), 873 Bit2KB(mdev->rs_total - mdev->rs_same_csum), 874 Bit2KB(mdev->rs_total)); 875 } 876 } 877 878 if (mdev->rs_failed) { 879 dev_info(DEV, " %lu failed blocks\n", mdev->rs_failed); 880 881 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 882 ns.disk = D_INCONSISTENT; 883 ns.pdsk = D_UP_TO_DATE; 884 } else { 885 ns.disk = D_UP_TO_DATE; 886 ns.pdsk = D_INCONSISTENT; 887 } 888 } else { 889 ns.disk = D_UP_TO_DATE; 890 ns.pdsk = D_UP_TO_DATE; 891 892 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 893 if (mdev->p_uuid) { 894 int i; 895 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 896 _drbd_uuid_set(mdev, i, mdev->p_uuid[i]); 897 drbd_uuid_set(mdev, UI_BITMAP, mdev->ldev->md.uuid[UI_CURRENT]); 898 _drbd_uuid_set(mdev, UI_CURRENT, mdev->p_uuid[UI_CURRENT]); 899 } else { 900 dev_err(DEV, "mdev->p_uuid is NULL! BUG\n"); 901 } 902 } 903 904 drbd_uuid_set_bm(mdev, 0UL); 905 906 if (mdev->p_uuid) { 907 /* Now the two UUID sets are equal, update what we 908 * know of the peer. */ 909 int i; 910 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 911 mdev->p_uuid[i] = mdev->ldev->md.uuid[i]; 912 } 913 } 914 915 _drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 916out_unlock: 917 spin_unlock_irq(&mdev->req_lock); 918 put_ldev(mdev); 919out: 920 mdev->rs_total = 0; 921 mdev->rs_failed = 0; 922 mdev->rs_paused = 0; 923 mdev->ov_start_sector = 0; 924 925 drbd_md_sync(mdev); 926 927 if (test_and_clear_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags)) { 928 dev_warn(DEV, "Writing the whole bitmap, due to failed kmalloc\n"); 929 drbd_queue_bitmap_io(mdev, &drbd_bm_write, NULL, "write from resync_finished"); 930 } 931 932 if (khelper_cmd) 933 drbd_khelper(mdev, khelper_cmd); 934 935 return 1; 936} 937 938/* helper */ 939static void move_to_net_ee_or_free(struct drbd_conf *mdev, struct drbd_epoch_entry *e) 940{ 941 if (drbd_ee_has_active_page(e)) { 942 /* This might happen if sendpage() has not finished */ 943 int i = (e->size + PAGE_SIZE -1) >> PAGE_SHIFT; 944 atomic_add(i, &mdev->pp_in_use_by_net); 945 atomic_sub(i, &mdev->pp_in_use); 946 spin_lock_irq(&mdev->req_lock); 947 list_add_tail(&e->w.list, &mdev->net_ee); 948 spin_unlock_irq(&mdev->req_lock); 949 wake_up(&drbd_pp_wait); 950 } else 951 drbd_free_ee(mdev, e); 952} 953 954/** 955 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 956 * @mdev: DRBD device. 957 * @w: work object. 958 * @cancel: The connection will be closed anyways 959 */ 960int w_e_end_data_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 961{ 962 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 963 int ok; 964 965 if (unlikely(cancel)) { 966 drbd_free_ee(mdev, e); 967 dec_unacked(mdev); 968 return 1; 969 } 970 971 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 972 ok = drbd_send_block(mdev, P_DATA_REPLY, e); 973 } else { 974 if (__ratelimit(&drbd_ratelimit_state)) 975 dev_err(DEV, "Sending NegDReply. sector=%llus.\n", 976 (unsigned long long)e->sector); 977 978 ok = drbd_send_ack(mdev, P_NEG_DREPLY, e); 979 } 980 981 dec_unacked(mdev); 982 983 move_to_net_ee_or_free(mdev, e); 984 985 if (unlikely(!ok)) 986 dev_err(DEV, "drbd_send_block() failed\n"); 987 return ok; 988} 989 990/** 991 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUESTRS 992 * @mdev: DRBD device. 993 * @w: work object. 994 * @cancel: The connection will be closed anyways 995 */ 996int w_e_end_rsdata_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 997{ 998 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 999 int ok; 1000 1001 if (unlikely(cancel)) { 1002 drbd_free_ee(mdev, e); 1003 dec_unacked(mdev); 1004 return 1; 1005 } 1006 1007 if (get_ldev_if_state(mdev, D_FAILED)) { 1008 drbd_rs_complete_io(mdev, e->sector); 1009 put_ldev(mdev); 1010 } 1011 1012 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1013 if (likely(mdev->state.pdsk >= D_INCONSISTENT)) { 1014 inc_rs_pending(mdev); 1015 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 1016 } else { 1017 if (__ratelimit(&drbd_ratelimit_state)) 1018 dev_err(DEV, "Not sending RSDataReply, " 1019 "partner DISKLESS!\n"); 1020 ok = 1; 1021 } 1022 } else { 1023 if (__ratelimit(&drbd_ratelimit_state)) 1024 dev_err(DEV, "Sending NegRSDReply. sector %llus.\n", 1025 (unsigned long long)e->sector); 1026 1027 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1028 1029 /* update resync data with failure */ 1030 drbd_rs_failed_io(mdev, e->sector, e->size); 1031 } 1032 1033 dec_unacked(mdev); 1034 1035 move_to_net_ee_or_free(mdev, e); 1036 1037 if (unlikely(!ok)) 1038 dev_err(DEV, "drbd_send_block() failed\n"); 1039 return ok; 1040} 1041 1042int w_e_end_csum_rs_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1043{ 1044 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1045 struct digest_info *di; 1046 int digest_size; 1047 void *digest = NULL; 1048 int ok, eq = 0; 1049 1050 if (unlikely(cancel)) { 1051 drbd_free_ee(mdev, e); 1052 dec_unacked(mdev); 1053 return 1; 1054 } 1055 1056 if (get_ldev(mdev)) { 1057 drbd_rs_complete_io(mdev, e->sector); 1058 put_ldev(mdev); 1059 } 1060 1061 di = e->digest; 1062 1063 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1064 /* quick hack to try to avoid a race against reconfiguration. 1065 * a real fix would be much more involved, 1066 * introducing more locking mechanisms */ 1067 if (mdev->csums_tfm) { 1068 digest_size = crypto_hash_digestsize(mdev->csums_tfm); 1069 D_ASSERT(digest_size == di->digest_size); 1070 digest = kmalloc(digest_size, GFP_NOIO); 1071 } 1072 if (digest) { 1073 drbd_csum_ee(mdev, mdev->csums_tfm, e, digest); 1074 eq = !memcmp(digest, di->digest, digest_size); 1075 kfree(digest); 1076 } 1077 1078 if (eq) { 1079 drbd_set_in_sync(mdev, e->sector, e->size); 1080 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1081 mdev->rs_same_csum += e->size >> BM_BLOCK_SHIFT; 1082 ok = drbd_send_ack(mdev, P_RS_IS_IN_SYNC, e); 1083 } else { 1084 inc_rs_pending(mdev); 1085 e->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! */ 1086 e->flags &= ~EE_HAS_DIGEST; /* This e no longer has a digest pointer */ 1087 kfree(di); 1088 ok = drbd_send_block(mdev, P_RS_DATA_REPLY, e); 1089 } 1090 } else { 1091 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1092 if (__ratelimit(&drbd_ratelimit_state)) 1093 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 1094 } 1095 1096 dec_unacked(mdev); 1097 move_to_net_ee_or_free(mdev, e); 1098 1099 if (unlikely(!ok)) 1100 dev_err(DEV, "drbd_send_block/ack() failed\n"); 1101 return ok; 1102} 1103 1104int w_e_end_ov_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1105{ 1106 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1107 int digest_size; 1108 void *digest; 1109 int ok = 1; 1110 1111 if (unlikely(cancel)) 1112 goto out; 1113 1114 if (unlikely((e->flags & EE_WAS_ERROR) != 0)) 1115 goto out; 1116 1117 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1118 /* FIXME if this allocation fails, online verify will not terminate! */ 1119 digest = kmalloc(digest_size, GFP_NOIO); 1120 if (digest) { 1121 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); 1122 inc_rs_pending(mdev); 1123 ok = drbd_send_drequest_csum(mdev, e->sector, e->size, 1124 digest, digest_size, P_OV_REPLY); 1125 if (!ok) 1126 dec_rs_pending(mdev); 1127 kfree(digest); 1128 } 1129 1130out: 1131 drbd_free_ee(mdev, e); 1132 1133 dec_unacked(mdev); 1134 1135 return ok; 1136} 1137 1138void drbd_ov_oos_found(struct drbd_conf *mdev, sector_t sector, int size) 1139{ 1140 if (mdev->ov_last_oos_start + mdev->ov_last_oos_size == sector) { 1141 mdev->ov_last_oos_size += size>>9; 1142 } else { 1143 mdev->ov_last_oos_start = sector; 1144 mdev->ov_last_oos_size = size>>9; 1145 } 1146 drbd_set_out_of_sync(mdev, sector, size); 1147 set_bit(WRITE_BM_AFTER_RESYNC, &mdev->flags); 1148} 1149 1150int w_e_end_ov_reply(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1151{ 1152 struct drbd_epoch_entry *e = container_of(w, struct drbd_epoch_entry, w); 1153 struct digest_info *di; 1154 int digest_size; 1155 void *digest; 1156 int ok, eq = 0; 1157 1158 if (unlikely(cancel)) { 1159 drbd_free_ee(mdev, e); 1160 dec_unacked(mdev); 1161 return 1; 1162 } 1163 1164 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1165 * the resync lru has been cleaned up already */ 1166 if (get_ldev(mdev)) { 1167 drbd_rs_complete_io(mdev, e->sector); 1168 put_ldev(mdev); 1169 } 1170 1171 di = e->digest; 1172 1173 if (likely((e->flags & EE_WAS_ERROR) == 0)) { 1174 digest_size = crypto_hash_digestsize(mdev->verify_tfm); 1175 digest = kmalloc(digest_size, GFP_NOIO); 1176 if (digest) { 1177 drbd_csum_ee(mdev, mdev->verify_tfm, e, digest); 1178 1179 D_ASSERT(digest_size == di->digest_size); 1180 eq = !memcmp(digest, di->digest, digest_size); 1181 kfree(digest); 1182 } 1183 } else { 1184 ok = drbd_send_ack(mdev, P_NEG_RS_DREPLY, e); 1185 if (__ratelimit(&drbd_ratelimit_state)) 1186 dev_err(DEV, "Sending NegDReply. I guess it gets messy.\n"); 1187 } 1188 1189 dec_unacked(mdev); 1190 if (!eq) 1191 drbd_ov_oos_found(mdev, e->sector, e->size); 1192 else 1193 ov_oos_print(mdev); 1194 1195 ok = drbd_send_ack_ex(mdev, P_OV_RESULT, e->sector, e->size, 1196 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1197 1198 drbd_free_ee(mdev, e); 1199 1200 if (--mdev->ov_left == 0) { 1201 ov_oos_print(mdev); 1202 drbd_resync_finished(mdev); 1203 } 1204 1205 return ok; 1206} 1207 1208int w_prev_work_done(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1209{ 1210 struct drbd_wq_barrier *b = container_of(w, struct drbd_wq_barrier, w); 1211 complete(&b->done); 1212 return 1; 1213} 1214 1215int w_send_barrier(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1216{ 1217 struct drbd_tl_epoch *b = container_of(w, struct drbd_tl_epoch, w); 1218 struct p_barrier *p = &mdev->data.sbuf.barrier; 1219 int ok = 1; 1220 1221 /* really avoid racing with tl_clear. w.cb may have been referenced 1222 * just before it was reassigned and re-queued, so double check that. 1223 * actually, this race was harmless, since we only try to send the 1224 * barrier packet here, and otherwise do nothing with the object. 1225 * but compare with the head of w_clear_epoch */ 1226 spin_lock_irq(&mdev->req_lock); 1227 if (w->cb != w_send_barrier || mdev->state.conn < C_CONNECTED) 1228 cancel = 1; 1229 spin_unlock_irq(&mdev->req_lock); 1230 if (cancel) 1231 return 1; 1232 1233 if (!drbd_get_data_sock(mdev)) 1234 return 0; 1235 p->barrier = b->br_number; 1236 /* inc_ap_pending was done where this was queued. 1237 * dec_ap_pending will be done in got_BarrierAck 1238 * or (on connection loss) in w_clear_epoch. */ 1239 ok = _drbd_send_cmd(mdev, mdev->data.socket, P_BARRIER, 1240 (struct p_header80 *)p, sizeof(*p), 0); 1241 drbd_put_data_sock(mdev); 1242 1243 return ok; 1244} 1245 1246int w_send_write_hint(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1247{ 1248 if (cancel) 1249 return 1; 1250 return drbd_send_short_cmd(mdev, P_UNPLUG_REMOTE); 1251} 1252 1253/** 1254 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1255 * @mdev: DRBD device. 1256 * @w: work object. 1257 * @cancel: The connection will be closed anyways 1258 */ 1259int w_send_dblock(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1260{ 1261 struct drbd_request *req = container_of(w, struct drbd_request, w); 1262 int ok; 1263 1264 if (unlikely(cancel)) { 1265 req_mod(req, send_canceled); 1266 return 1; 1267 } 1268 1269 ok = drbd_send_dblock(mdev, req); 1270 req_mod(req, ok ? handed_over_to_network : send_failed); 1271 1272 return ok; 1273} 1274 1275/** 1276 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1277 * @mdev: DRBD device. 1278 * @w: work object. 1279 * @cancel: The connection will be closed anyways 1280 */ 1281int w_send_read_req(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1282{ 1283 struct drbd_request *req = container_of(w, struct drbd_request, w); 1284 int ok; 1285 1286 if (unlikely(cancel)) { 1287 req_mod(req, send_canceled); 1288 return 1; 1289 } 1290 1291 ok = drbd_send_drequest(mdev, P_DATA_REQUEST, req->sector, req->size, 1292 (unsigned long)req); 1293 1294 if (!ok) { 1295 /* ?? we set C_TIMEOUT or C_BROKEN_PIPE in drbd_send(); 1296 * so this is probably redundant */ 1297 if (mdev->state.conn >= C_CONNECTED) 1298 drbd_force_state(mdev, NS(conn, C_NETWORK_FAILURE)); 1299 } 1300 req_mod(req, ok ? handed_over_to_network : send_failed); 1301 1302 return ok; 1303} 1304 1305int w_restart_disk_io(struct drbd_conf *mdev, struct drbd_work *w, int cancel) 1306{ 1307 struct drbd_request *req = container_of(w, struct drbd_request, w); 1308 1309 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1310 drbd_al_begin_io(mdev, req->sector); 1311 /* Calling drbd_al_begin_io() out of the worker might deadlocks 1312 theoretically. Practically it can not deadlock, since this is 1313 only used when unfreezing IOs. All the extents of the requests 1314 that made it into the TL are already active */ 1315 1316 drbd_req_make_private_bio(req, req->master_bio); 1317 req->private_bio->bi_bdev = mdev->ldev->backing_bdev; 1318 generic_make_request(req->private_bio); 1319 1320 return 1; 1321} 1322 1323static int _drbd_may_sync_now(struct drbd_conf *mdev) 1324{ 1325 struct drbd_conf *odev = mdev; 1326 1327 while (1) { 1328 if (odev->sync_conf.after == -1) 1329 return 1; 1330 odev = minor_to_mdev(odev->sync_conf.after); 1331 ERR_IF(!odev) return 1; 1332 if ((odev->state.conn >= C_SYNC_SOURCE && 1333 odev->state.conn <= C_PAUSED_SYNC_T) || 1334 odev->state.aftr_isp || odev->state.peer_isp || 1335 odev->state.user_isp) 1336 return 0; 1337 } 1338} 1339 1340/** 1341 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1342 * @mdev: DRBD device. 1343 * 1344 * Called from process context only (admin command and after_state_ch). 1345 */ 1346static int _drbd_pause_after(struct drbd_conf *mdev) 1347{ 1348 struct drbd_conf *odev; 1349 int i, rv = 0; 1350 1351 for (i = 0; i < minor_count; i++) { 1352 odev = minor_to_mdev(i); 1353 if (!odev) 1354 continue; 1355 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1356 continue; 1357 if (!_drbd_may_sync_now(odev)) 1358 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1359 != SS_NOTHING_TO_DO); 1360 } 1361 1362 return rv; 1363} 1364 1365/** 1366 * _drbd_resume_next() - Resume resync on all devices that may resync now 1367 * @mdev: DRBD device. 1368 * 1369 * Called from process context only (admin command and worker). 1370 */ 1371static int _drbd_resume_next(struct drbd_conf *mdev) 1372{ 1373 struct drbd_conf *odev; 1374 int i, rv = 0; 1375 1376 for (i = 0; i < minor_count; i++) { 1377 odev = minor_to_mdev(i); 1378 if (!odev) 1379 continue; 1380 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1381 continue; 1382 if (odev->state.aftr_isp) { 1383 if (_drbd_may_sync_now(odev)) 1384 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1385 CS_HARD, NULL) 1386 != SS_NOTHING_TO_DO) ; 1387 } 1388 } 1389 return rv; 1390} 1391 1392void resume_next_sg(struct drbd_conf *mdev) 1393{ 1394 write_lock_irq(&global_state_lock); 1395 _drbd_resume_next(mdev); 1396 write_unlock_irq(&global_state_lock); 1397} 1398 1399void suspend_other_sg(struct drbd_conf *mdev) 1400{ 1401 write_lock_irq(&global_state_lock); 1402 _drbd_pause_after(mdev); 1403 write_unlock_irq(&global_state_lock); 1404} 1405 1406static int sync_after_error(struct drbd_conf *mdev, int o_minor) 1407{ 1408 struct drbd_conf *odev; 1409 1410 if (o_minor == -1) 1411 return NO_ERROR; 1412 if (o_minor < -1 || minor_to_mdev(o_minor) == NULL) 1413 return ERR_SYNC_AFTER; 1414 1415 /* check for loops */ 1416 odev = minor_to_mdev(o_minor); 1417 while (1) { 1418 if (odev == mdev) 1419 return ERR_SYNC_AFTER_CYCLE; 1420 1421 /* dependency chain ends here, no cycles. */ 1422 if (odev->sync_conf.after == -1) 1423 return NO_ERROR; 1424 1425 /* follow the dependency chain */ 1426 odev = minor_to_mdev(odev->sync_conf.after); 1427 } 1428} 1429 1430int drbd_alter_sa(struct drbd_conf *mdev, int na) 1431{ 1432 int changes; 1433 int retcode; 1434 1435 write_lock_irq(&global_state_lock); 1436 retcode = sync_after_error(mdev, na); 1437 if (retcode == NO_ERROR) { 1438 mdev->sync_conf.after = na; 1439 do { 1440 changes = _drbd_pause_after(mdev); 1441 changes |= _drbd_resume_next(mdev); 1442 } while (changes); 1443 } 1444 write_unlock_irq(&global_state_lock); 1445 return retcode; 1446} 1447 1448/** 1449 * drbd_start_resync() - Start the resync process 1450 * @mdev: DRBD device. 1451 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1452 * 1453 * This function might bring you directly into one of the 1454 * C_PAUSED_SYNC_* states. 1455 */ 1456void drbd_start_resync(struct drbd_conf *mdev, enum drbd_conns side) 1457{ 1458 union drbd_state ns; 1459 int r; 1460 1461 if (mdev->state.conn >= C_SYNC_SOURCE) { 1462 dev_err(DEV, "Resync already running!\n"); 1463 return; 1464 } 1465 1466 /* In case a previous resync run was aborted by an IO error/detach on the peer. */ 1467 drbd_rs_cancel_all(mdev); 1468 1469 if (side == C_SYNC_TARGET) { 1470 /* Since application IO was locked out during C_WF_BITMAP_T and 1471 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1472 we check that we might make the data inconsistent. */ 1473 r = drbd_khelper(mdev, "before-resync-target"); 1474 r = (r >> 8) & 0xff; 1475 if (r > 0) { 1476 dev_info(DEV, "before-resync-target handler returned %d, " 1477 "dropping connection.\n", r); 1478 drbd_force_state(mdev, NS(conn, C_DISCONNECTING)); 1479 return; 1480 } 1481 } 1482 1483 drbd_state_lock(mdev); 1484 1485 if (!get_ldev_if_state(mdev, D_NEGOTIATING)) { 1486 drbd_state_unlock(mdev); 1487 return; 1488 } 1489 1490 if (side == C_SYNC_TARGET) { 1491 mdev->bm_resync_fo = 0; 1492 } else /* side == C_SYNC_SOURCE */ { 1493 u64 uuid; 1494 1495 get_random_bytes(&uuid, sizeof(u64)); 1496 drbd_uuid_set(mdev, UI_BITMAP, uuid); 1497 drbd_send_sync_uuid(mdev, uuid); 1498 1499 D_ASSERT(mdev->state.disk == D_UP_TO_DATE); 1500 } 1501 1502 write_lock_irq(&global_state_lock); 1503 ns = mdev->state; 1504 1505 ns.aftr_isp = !_drbd_may_sync_now(mdev); 1506 1507 ns.conn = side; 1508 1509 if (side == C_SYNC_TARGET) 1510 ns.disk = D_INCONSISTENT; 1511 else /* side == C_SYNC_SOURCE */ 1512 ns.pdsk = D_INCONSISTENT; 1513 1514 r = __drbd_set_state(mdev, ns, CS_VERBOSE, NULL); 1515 ns = mdev->state; 1516 1517 if (ns.conn < C_CONNECTED) 1518 r = SS_UNKNOWN_ERROR; 1519 1520 if (r == SS_SUCCESS) { 1521 unsigned long tw = drbd_bm_total_weight(mdev); 1522 unsigned long now = jiffies; 1523 int i; 1524 1525 mdev->rs_failed = 0; 1526 mdev->rs_paused = 0; 1527 mdev->rs_same_csum = 0; 1528 mdev->rs_last_events = 0; 1529 mdev->rs_last_sect_ev = 0; 1530 mdev->rs_total = tw; 1531 mdev->rs_start = now; 1532 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1533 mdev->rs_mark_left[i] = tw; 1534 mdev->rs_mark_time[i] = now; 1535 } 1536 _drbd_pause_after(mdev); 1537 } 1538 write_unlock_irq(&global_state_lock); 1539 put_ldev(mdev); 1540 1541 if (r == SS_SUCCESS) { 1542 dev_info(DEV, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1543 drbd_conn_str(ns.conn), 1544 (unsigned long) mdev->rs_total << (BM_BLOCK_SHIFT-10), 1545 (unsigned long) mdev->rs_total); 1546 1547 if (mdev->agreed_pro_version < 95 && mdev->rs_total == 0) { 1548 /* This still has a race (about when exactly the peers 1549 * detect connection loss) that can lead to a full sync 1550 * on next handshake. In 8.3.9 we fixed this with explicit 1551 * resync-finished notifications, but the fix 1552 * introduces a protocol change. Sleeping for some 1553 * time longer than the ping interval + timeout on the 1554 * SyncSource, to give the SyncTarget the chance to 1555 * detect connection loss, then waiting for a ping 1556 * response (implicit in drbd_resync_finished) reduces 1557 * the race considerably, but does not solve it. */ 1558 if (side == C_SYNC_SOURCE) 1559 schedule_timeout_interruptible( 1560 mdev->net_conf->ping_int * HZ + 1561 mdev->net_conf->ping_timeo*HZ/9); 1562 drbd_resync_finished(mdev); 1563 } 1564 1565 atomic_set(&mdev->rs_sect_in, 0); 1566 atomic_set(&mdev->rs_sect_ev, 0); 1567 mdev->rs_in_flight = 0; 1568 mdev->rs_planed = 0; 1569 spin_lock(&mdev->peer_seq_lock); 1570 fifo_set(&mdev->rs_plan_s, 0); 1571 spin_unlock(&mdev->peer_seq_lock); 1572 /* ns.conn may already be != mdev->state.conn, 1573 * we may have been paused in between, or become paused until 1574 * the timer triggers. 1575 * No matter, that is handled in resync_timer_fn() */ 1576 if (ns.conn == C_SYNC_TARGET) 1577 mod_timer(&mdev->resync_timer, jiffies); 1578 1579 drbd_md_sync(mdev); 1580 } 1581 drbd_state_unlock(mdev); 1582} 1583 1584int drbd_worker(struct drbd_thread *thi) 1585{ 1586 struct drbd_conf *mdev = thi->mdev; 1587 struct drbd_work *w = NULL; 1588 LIST_HEAD(work_list); 1589 int intr = 0, i; 1590 1591 sprintf(current->comm, "drbd%d_worker", mdev_to_minor(mdev)); 1592 1593 while (get_t_state(thi) == Running) { 1594 drbd_thread_current_set_cpu(mdev); 1595 1596 if (down_trylock(&mdev->data.work.s)) { 1597 mutex_lock(&mdev->data.mutex); 1598 if (mdev->data.socket && !mdev->net_conf->no_cork) 1599 drbd_tcp_uncork(mdev->data.socket); 1600 mutex_unlock(&mdev->data.mutex); 1601 1602 intr = down_interruptible(&mdev->data.work.s); 1603 1604 mutex_lock(&mdev->data.mutex); 1605 if (mdev->data.socket && !mdev->net_conf->no_cork) 1606 drbd_tcp_cork(mdev->data.socket); 1607 mutex_unlock(&mdev->data.mutex); 1608 } 1609 1610 if (intr) { 1611 D_ASSERT(intr == -EINTR); 1612 flush_signals(current); 1613 ERR_IF (get_t_state(thi) == Running) 1614 continue; 1615 break; 1616 } 1617 1618 if (get_t_state(thi) != Running) 1619 break; 1620 /* With this break, we have done a down() but not consumed 1621 the entry from the list. The cleanup code takes care of 1622 this... */ 1623 1624 w = NULL; 1625 spin_lock_irq(&mdev->data.work.q_lock); 1626 ERR_IF(list_empty(&mdev->data.work.q)) { 1627 /* something terribly wrong in our logic. 1628 * we were able to down() the semaphore, 1629 * but the list is empty... doh. 1630 * 1631 * what is the best thing to do now? 1632 * try again from scratch, restarting the receiver, 1633 * asender, whatnot? could break even more ugly, 1634 * e.g. when we are primary, but no good local data. 1635 * 1636 * I'll try to get away just starting over this loop. 1637 */ 1638 spin_unlock_irq(&mdev->data.work.q_lock); 1639 continue; 1640 } 1641 w = list_entry(mdev->data.work.q.next, struct drbd_work, list); 1642 list_del_init(&w->list); 1643 spin_unlock_irq(&mdev->data.work.q_lock); 1644 1645 if (!w->cb(mdev, w, mdev->state.conn < C_CONNECTED)) { 1646 /* dev_warn(DEV, "worker: a callback failed! \n"); */ 1647 if (mdev->state.conn >= C_CONNECTED) 1648 drbd_force_state(mdev, 1649 NS(conn, C_NETWORK_FAILURE)); 1650 } 1651 } 1652 D_ASSERT(test_bit(DEVICE_DYING, &mdev->flags)); 1653 D_ASSERT(test_bit(CONFIG_PENDING, &mdev->flags)); 1654 1655 spin_lock_irq(&mdev->data.work.q_lock); 1656 i = 0; 1657 while (!list_empty(&mdev->data.work.q)) { 1658 list_splice_init(&mdev->data.work.q, &work_list); 1659 spin_unlock_irq(&mdev->data.work.q_lock); 1660 1661 while (!list_empty(&work_list)) { 1662 w = list_entry(work_list.next, struct drbd_work, list); 1663 list_del_init(&w->list); 1664 w->cb(mdev, w, 1); 1665 i++; /* dead debugging code */ 1666 } 1667 1668 spin_lock_irq(&mdev->data.work.q_lock); 1669 } 1670 sema_init(&mdev->data.work.s, 0); 1671 /* DANGEROUS race: if someone did queue his work within the spinlock, 1672 * but up() ed outside the spinlock, we could get an up() on the 1673 * semaphore without corresponding list entry. 1674 * So don't do that. 1675 */ 1676 spin_unlock_irq(&mdev->data.work.q_lock); 1677 1678 D_ASSERT(mdev->state.disk == D_DISKLESS && mdev->state.conn == C_STANDALONE); 1679 /* _drbd_set_state only uses stop_nowait. 1680 * wait here for the Exiting receiver. */ 1681 drbd_thread_stop(&mdev->receiver); 1682 drbd_mdev_cleanup(mdev); 1683 1684 dev_info(DEV, "worker terminated\n"); 1685 1686 clear_bit(DEVICE_DYING, &mdev->flags); 1687 clear_bit(CONFIG_PENDING, &mdev->flags); 1688 wake_up(&mdev->state_wait); 1689 1690 return 0; 1691} 1692