drbd_worker.c revision a8cd15ba7919eaf1f416857f983a502cc261af26
1/* 2 drbd_worker.c 3 4 This file is part of DRBD by Philipp Reisner and Lars Ellenberg. 5 6 Copyright (C) 2001-2008, LINBIT Information Technologies GmbH. 7 Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>. 8 Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>. 9 10 drbd is free software; you can redistribute it and/or modify 11 it under the terms of the GNU General Public License as published by 12 the Free Software Foundation; either version 2, or (at your option) 13 any later version. 14 15 drbd is distributed in the hope that it will be useful, 16 but WITHOUT ANY WARRANTY; without even the implied warranty of 17 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 18 GNU General Public License for more details. 19 20 You should have received a copy of the GNU General Public License 21 along with drbd; see the file COPYING. If not, write to 22 the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA. 23 24*/ 25 26#include <linux/module.h> 27#include <linux/drbd.h> 28#include <linux/sched.h> 29#include <linux/wait.h> 30#include <linux/mm.h> 31#include <linux/memcontrol.h> 32#include <linux/mm_inline.h> 33#include <linux/slab.h> 34#include <linux/random.h> 35#include <linux/string.h> 36#include <linux/scatterlist.h> 37 38#include "drbd_int.h" 39#include "drbd_protocol.h" 40#include "drbd_req.h" 41 42static int w_make_ov_request(struct drbd_work *, int); 43 44 45/* endio handlers: 46 * drbd_md_io_complete (defined here) 47 * drbd_request_endio (defined here) 48 * drbd_peer_request_endio (defined here) 49 * bm_async_io_complete (defined in drbd_bitmap.c) 50 * 51 * For all these callbacks, note the following: 52 * The callbacks will be called in irq context by the IDE drivers, 53 * and in Softirqs/Tasklets/BH context by the SCSI drivers. 54 * Try to get the locking right :) 55 * 56 */ 57 58 59/* About the global_state_lock 60 Each state transition on an device holds a read lock. In case we have 61 to evaluate the resync after dependencies, we grab a write lock, because 62 we need stable states on all devices for that. */ 63rwlock_t global_state_lock; 64 65/* used for synchronous meta data and bitmap IO 66 * submitted by drbd_md_sync_page_io() 67 */ 68void drbd_md_io_complete(struct bio *bio, int error) 69{ 70 struct drbd_md_io *md_io; 71 struct drbd_device *device; 72 73 md_io = (struct drbd_md_io *)bio->bi_private; 74 device = container_of(md_io, struct drbd_device, md_io); 75 76 md_io->error = error; 77 78 /* We grabbed an extra reference in _drbd_md_sync_page_io() to be able 79 * to timeout on the lower level device, and eventually detach from it. 80 * If this io completion runs after that timeout expired, this 81 * drbd_md_put_buffer() may allow us to finally try and re-attach. 82 * During normal operation, this only puts that extra reference 83 * down to 1 again. 84 * Make sure we first drop the reference, and only then signal 85 * completion, or we may (in drbd_al_read_log()) cycle so fast into the 86 * next drbd_md_sync_page_io(), that we trigger the 87 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there. 
88 */ 89 drbd_md_put_buffer(device); 90 md_io->done = 1; 91 wake_up(&device->misc_wait); 92 bio_put(bio); 93 if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */ 94 put_ldev(device); 95} 96 97/* reads on behalf of the partner, 98 * "submitted" by the receiver 99 */ 100static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local) 101{ 102 unsigned long flags = 0; 103 struct drbd_device *device = peer_req->peer_device->device; 104 105 spin_lock_irqsave(&device->resource->req_lock, flags); 106 device->read_cnt += peer_req->i.size >> 9; 107 list_del(&peer_req->w.list); 108 if (list_empty(&device->read_ee)) 109 wake_up(&device->ee_wait); 110 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 111 __drbd_chk_io_error(device, DRBD_READ_ERROR); 112 spin_unlock_irqrestore(&device->resource->req_lock, flags); 113 114 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 115 &peer_req->w); 116 put_ldev(device); 117} 118 119/* writes on behalf of the partner, or resync writes, 120 * "submitted" by the receiver, final stage. */ 121static void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local) 122{ 123 unsigned long flags = 0; 124 struct drbd_device *device = peer_req->peer_device->device; 125 struct drbd_interval i; 126 int do_wake; 127 u64 block_id; 128 int do_al_complete_io; 129 130 /* after we moved peer_req to done_ee, 131 * we may no longer access it, 132 * it may be freed/reused already! 133 * (as soon as we release the req_lock) */ 134 i = peer_req->i; 135 do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO; 136 block_id = peer_req->block_id; 137 138 spin_lock_irqsave(&device->resource->req_lock, flags); 139 device->writ_cnt += peer_req->i.size >> 9; 140 list_move_tail(&peer_req->w.list, &device->done_ee); 141 142 /* 143 * Do not remove from the write_requests tree here: we did not send the 144 * Ack yet and did not wake possibly waiting conflicting requests. 145 * Removed from the tree from "drbd_process_done_ee" within the 146 * appropriate dw.cb (e_end_block/e_end_resync_block) or from 147 * _drbd_clear_done_ee. 148 */ 149 150 do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee); 151 152 if (test_bit(__EE_WAS_ERROR, &peer_req->flags)) 153 __drbd_chk_io_error(device, DRBD_WRITE_ERROR); 154 spin_unlock_irqrestore(&device->resource->req_lock, flags); 155 156 if (block_id == ID_SYNCER) 157 drbd_rs_complete_io(device, i.sector); 158 159 if (do_wake) 160 wake_up(&device->ee_wait); 161 162 if (do_al_complete_io) 163 drbd_al_complete_io(device, &i); 164 165 wake_asender(first_peer_device(device)->connection); 166 put_ldev(device); 167} 168 169/* writes on behalf of the partner, or resync writes, 170 * "submitted" by the receiver. 171 */ 172void drbd_peer_request_endio(struct bio *bio, int error) 173{ 174 struct drbd_peer_request *peer_req = bio->bi_private; 175 struct drbd_device *device = peer_req->peer_device->device; 176 int uptodate = bio_flagged(bio, BIO_UPTODATE); 177 int is_write = bio_data_dir(bio) == WRITE; 178 179 if (error && __ratelimit(&drbd_ratelimit_state)) 180 drbd_warn(device, "%s: error=%d s=%llus\n", 181 is_write ? "write" : "read", error, 182 (unsigned long long)peer_req->i.sector); 183 if (!error && !uptodate) { 184 if (__ratelimit(&drbd_ratelimit_state)) 185 drbd_warn(device, "%s: setting error to -EIO s=%llus\n", 186 is_write ? "write" : "read", 187 (unsigned long long)peer_req->i.sector); 188 /* strange behavior of some lower level drivers... 
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}

	if (error)
		set_bit(__EE_WAS_ERROR, &peer_req->flags);

	bio_put(bio); /* no need for the bio anymore */
	if (atomic_dec_and_test(&peer_req->pending_bios)) {
		if (is_write)
			drbd_endio_write_sec_final(peer_req);
		else
			drbd_endio_read_sec_final(peer_req);
	}
}

/* read, readA or write requests on R_PRIMARY coming from drbd_make_request
 */
void drbd_request_endio(struct bio *bio, int error)
{
	unsigned long flags;
	struct drbd_request *req = bio->bi_private;
	struct drbd_device *device = req->device;
	struct bio_and_error m;
	enum drbd_req_event what;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);

	if (!error && !uptodate) {
		drbd_warn(device, "p %s: setting error to -EIO\n",
			  bio_data_dir(bio) == WRITE ? "write" : "read");
		/* strange behavior of some lower level drivers
		 * fail the request by clearing the uptodate flag,
		 * but do not return any error?! */
		error = -EIO;
	}


	/* If this request was aborted locally before,
	 * but now was completed "successfully",
	 * chances are that this caused arbitrary data corruption.
	 *
	 * "aborting" requests, or force-detaching the disk, is intended for
	 * completely blocked/hung local backing devices which no longer
	 * complete requests at all, not even do error completions. In this
	 * situation, usually a hard-reset and failover is the only way out.
	 *
	 * By "aborting", basically faking a local error-completion,
	 * we allow for a more graceful switchover by cleanly migrating services.
	 * Still the affected node has to be rebooted "soon".
	 *
	 * By completing these requests, we allow the upper layers to re-use
	 * the associated data pages.
	 *
	 * If later the local backing device "recovers", and now DMAs some data
	 * from disk into the original request pages, in the best case it will
	 * just put random data into unused pages; but typically it will corrupt
	 * meanwhile completely unrelated data, causing all sorts of damage.
	 *
	 * Which means delayed successful completion,
	 * especially for READ requests,
	 * is a reason to panic().
	 *
	 * We assume that a delayed *error* completion is OK,
	 * though we still will complain noisily about it.
	 */
	if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) {
		if (__ratelimit(&drbd_ratelimit_state))
			drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n");

		if (!error)
			panic("possible random memory corruption caused by delayed completion of aborted local request\n");
	}

	/* to avoid recursion in __req_mod */
	if (unlikely(error)) {
		what = (bio_data_dir(bio) == WRITE)
			? WRITE_COMPLETED_WITH_ERROR
			: (bio_rw(bio) == READ)
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here!
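	 * (as the endio-handler note at the top of this file points out, this
	 * completion callback may run in hard-irq or softirq context depending
	 * on the lower-level driver, so the irq-saving lock variant is required)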
*/ 277 spin_lock_irqsave(&device->resource->req_lock, flags); 278 __req_mod(req, what, &m); 279 spin_unlock_irqrestore(&device->resource->req_lock, flags); 280 put_ldev(device); 281 282 if (m.bio) 283 complete_master_bio(device, &m); 284} 285 286void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest) 287{ 288 struct hash_desc desc; 289 struct scatterlist sg; 290 struct page *page = peer_req->pages; 291 struct page *tmp; 292 unsigned len; 293 294 desc.tfm = tfm; 295 desc.flags = 0; 296 297 sg_init_table(&sg, 1); 298 crypto_hash_init(&desc); 299 300 while ((tmp = page_chain_next(page))) { 301 /* all but the last page will be fully used */ 302 sg_set_page(&sg, page, PAGE_SIZE, 0); 303 crypto_hash_update(&desc, &sg, sg.length); 304 page = tmp; 305 } 306 /* and now the last, possibly only partially used page */ 307 len = peer_req->i.size & (PAGE_SIZE - 1); 308 sg_set_page(&sg, page, len ?: PAGE_SIZE, 0); 309 crypto_hash_update(&desc, &sg, sg.length); 310 crypto_hash_final(&desc, digest); 311} 312 313void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest) 314{ 315 struct hash_desc desc; 316 struct scatterlist sg; 317 struct bio_vec bvec; 318 struct bvec_iter iter; 319 320 desc.tfm = tfm; 321 desc.flags = 0; 322 323 sg_init_table(&sg, 1); 324 crypto_hash_init(&desc); 325 326 bio_for_each_segment(bvec, bio, iter) { 327 sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset); 328 crypto_hash_update(&desc, &sg, sg.length); 329 } 330 crypto_hash_final(&desc, digest); 331} 332 333/* MAYBE merge common code with w_e_end_ov_req */ 334static int w_e_send_csum(struct drbd_work *w, int cancel) 335{ 336 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 337 struct drbd_device *device = peer_req->peer_device->device; 338 int digest_size; 339 void *digest; 340 int err = 0; 341 342 if (unlikely(cancel)) 343 goto out; 344 345 if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0)) 346 goto out; 347 348 digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm); 349 digest = kmalloc(digest_size, GFP_NOIO); 350 if (digest) { 351 sector_t sector = peer_req->i.sector; 352 unsigned int size = peer_req->i.size; 353 drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest); 354 /* Free peer_req and pages before send. 355 * In case we block on congestion, we could otherwise run into 356 * some distributed deadlock, if the other side blocks on 357 * congestion as well, because our receiver blocks in 358 * drbd_alloc_pages due to pp_in_use > max_buffers. 
*/ 359 drbd_free_peer_req(device, peer_req); 360 peer_req = NULL; 361 inc_rs_pending(device); 362 err = drbd_send_drequest_csum(first_peer_device(device), sector, size, 363 digest, digest_size, 364 P_CSUM_RS_REQUEST); 365 kfree(digest); 366 } else { 367 drbd_err(device, "kmalloc() of digest failed.\n"); 368 err = -ENOMEM; 369 } 370 371out: 372 if (peer_req) 373 drbd_free_peer_req(device, peer_req); 374 375 if (unlikely(err)) 376 drbd_err(device, "drbd_send_drequest(..., csum) failed\n"); 377 return err; 378} 379 380#define GFP_TRY (__GFP_HIGHMEM | __GFP_NOWARN) 381 382static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size) 383{ 384 struct drbd_device *device = peer_device->device; 385 struct drbd_peer_request *peer_req; 386 387 if (!get_ldev(device)) 388 return -EIO; 389 390 if (drbd_rs_should_slow_down(device, sector)) 391 goto defer; 392 393 /* GFP_TRY, because if there is no memory available right now, this may 394 * be rescheduled for later. It is "only" background resync, after all. */ 395 peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector, 396 size, GFP_TRY); 397 if (!peer_req) 398 goto defer; 399 400 peer_req->w.cb = w_e_send_csum; 401 spin_lock_irq(&device->resource->req_lock); 402 list_add(&peer_req->w.list, &device->read_ee); 403 spin_unlock_irq(&device->resource->req_lock); 404 405 atomic_add(size >> 9, &device->rs_sect_ev); 406 if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0) 407 return 0; 408 409 /* If it failed because of ENOMEM, retry should help. If it failed 410 * because bio_add_page failed (probably broken lower level driver), 411 * retry may or may not help. 412 * If it does not, you may need to force disconnect. */ 413 spin_lock_irq(&device->resource->req_lock); 414 list_del(&peer_req->w.list); 415 spin_unlock_irq(&device->resource->req_lock); 416 417 drbd_free_peer_req(device, peer_req); 418defer: 419 put_ldev(device); 420 return -EAGAIN; 421} 422 423int w_resync_timer(struct drbd_work *w, int cancel) 424{ 425 struct drbd_device *device = 426 container_of(w, struct drbd_device, resync_work); 427 428 switch (device->state.conn) { 429 case C_VERIFY_S: 430 w_make_ov_request(w, cancel); 431 break; 432 case C_SYNC_TARGET: 433 w_make_resync_request(w, cancel); 434 break; 435 } 436 437 return 0; 438} 439 440void resync_timer_fn(unsigned long data) 441{ 442 struct drbd_device *device = (struct drbd_device *) data; 443 444 if (list_empty(&device->resync_work.list)) 445 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 446 &device->resync_work); 447} 448 449static void fifo_set(struct fifo_buffer *fb, int value) 450{ 451 int i; 452 453 for (i = 0; i < fb->size; i++) 454 fb->values[i] = value; 455} 456 457static int fifo_push(struct fifo_buffer *fb, int value) 458{ 459 int ov; 460 461 ov = fb->values[fb->head_index]; 462 fb->values[fb->head_index++] = value; 463 464 if (fb->head_index >= fb->size) 465 fb->head_index = 0; 466 467 return ov; 468} 469 470static void fifo_add_val(struct fifo_buffer *fb, int value) 471{ 472 int i; 473 474 for (i = 0; i < fb->size; i++) 475 fb->values[i] += value; 476} 477 478struct fifo_buffer *fifo_alloc(int fifo_size) 479{ 480 struct fifo_buffer *fb; 481 482 fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO); 483 if (!fb) 484 return NULL; 485 486 fb->head_index = 0; 487 fb->size = fifo_size; 488 fb->total = 0; 489 490 return fb; 491} 492 493static int drbd_rs_controller(struct drbd_device *device) 494{ 495 struct 
disk_conf *dc;
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	unsigned int want;     /* The number of sectors we want in the proxy */
	int req_sect;          /* Number of sectors to request in this turn */
	int correction;        /* Number of sectors more we need in the proxy */
	int cps;               /* correction per invocation of drbd_rs_controller() */
	int steps;             /* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	/* Feedback controller: based on how many resync sectors came back since
	 * the last invocation, decide how many sectors to request during the
	 * next SLEEP_TIME interval so that the configured fill/delay target is
	 * approached, never exceeding c_max_rate. */
	sect_in = atomic_xchg(&device->rs_sect_in, 0); /* Number of sectors that came in */
	device->rs_in_flight -= sect_in;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

static int drbd_rs_number_requests(struct drbd_device *device)
{
	int number;

	rcu_read_lock();
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* ignore the number of pending requests; the resync controller should
	 * throttle down to the incoming reply rate soon enough anyway. */
	return number;
}

int w_make_resync_request(struct drbd_work *w, int cancel)
{
	struct drbd_device_work *dw = device_work(w);
	struct drbd_device *device = dw->device;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync?
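		 * (rs_total == 0 means no bits are set in the sync bitmap,
		 *  so there is nothing to request and we can finish right away)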
*/ 585 drbd_resync_finished(device); 586 return 0; 587 } 588 589 if (!get_ldev(device)) { 590 /* Since we only need to access device->rsync a 591 get_ldev_if_state(device,D_FAILED) would be sufficient, but 592 to continue resync with a broken disk makes no sense at 593 all */ 594 drbd_err(device, "Disk broke down during resync!\n"); 595 return 0; 596 } 597 598 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 599 number = drbd_rs_number_requests(device); 600 if (number == 0) 601 goto requeue; 602 603 for (i = 0; i < number; i++) { 604 /* Stop generating RS requests, when half of the send buffer is filled */ 605 mutex_lock(&first_peer_device(device)->connection->data.mutex); 606 if (first_peer_device(device)->connection->data.socket) { 607 queued = first_peer_device(device)->connection->data.socket->sk->sk_wmem_queued; 608 sndbuf = first_peer_device(device)->connection->data.socket->sk->sk_sndbuf; 609 } else { 610 queued = 1; 611 sndbuf = 0; 612 } 613 mutex_unlock(&first_peer_device(device)->connection->data.mutex); 614 if (queued > sndbuf / 2) 615 goto requeue; 616 617next_sector: 618 size = BM_BLOCK_SIZE; 619 bit = drbd_bm_find_next(device, device->bm_resync_fo); 620 621 if (bit == DRBD_END_OF_BITMAP) { 622 device->bm_resync_fo = drbd_bm_bits(device); 623 put_ldev(device); 624 return 0; 625 } 626 627 sector = BM_BIT_TO_SECT(bit); 628 629 if (drbd_rs_should_slow_down(device, sector) || 630 drbd_try_rs_begin_io(device, sector)) { 631 device->bm_resync_fo = bit; 632 goto requeue; 633 } 634 device->bm_resync_fo = bit + 1; 635 636 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 637 drbd_rs_complete_io(device, sector); 638 goto next_sector; 639 } 640 641#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 642 /* try to find some adjacent bits. 643 * we stop if we have already the maximum req size. 644 * 645 * Additionally always align bigger requests, in order to 646 * be prepared for all stripe sizes of software RAIDs. 647 */ 648 align = 1; 649 rollback_i = i; 650 for (;;) { 651 if (size + BM_BLOCK_SIZE > max_bio_size) 652 break; 653 654 /* Be always aligned */ 655 if (sector & ((1<<(align+3))-1)) 656 break; 657 658 /* do not cross extent boundaries */ 659 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 660 break; 661 /* now, is it actually dirty, after all? 
662 * caution, drbd_bm_test_bit is tri-state for some 663 * obscure reason; ( b == 0 ) would get the out-of-band 664 * only accidentally right because of the "oddly sized" 665 * adjustment below */ 666 if (drbd_bm_test_bit(device, bit+1) != 1) 667 break; 668 bit++; 669 size += BM_BLOCK_SIZE; 670 if ((BM_BLOCK_SIZE << align) <= size) 671 align++; 672 i++; 673 } 674 /* if we merged some, 675 * reset the offset to start the next drbd_bm_find_next from */ 676 if (size > BM_BLOCK_SIZE) 677 device->bm_resync_fo = bit + 1; 678#endif 679 680 /* adjust very last sectors, in case we are oddly sized */ 681 if (sector + (size>>9) > capacity) 682 size = (capacity-sector)<<9; 683 if (first_peer_device(device)->connection->agreed_pro_version >= 89 && 684 first_peer_device(device)->connection->csums_tfm) { 685 switch (read_for_csum(first_peer_device(device), sector, size)) { 686 case -EIO: /* Disk failure */ 687 put_ldev(device); 688 return -EIO; 689 case -EAGAIN: /* allocation failed, or ldev busy */ 690 drbd_rs_complete_io(device, sector); 691 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 692 i = rollback_i; 693 goto requeue; 694 case 0: 695 /* everything ok */ 696 break; 697 default: 698 BUG(); 699 } 700 } else { 701 int err; 702 703 inc_rs_pending(device); 704 err = drbd_send_drequest(first_peer_device(device), P_RS_DATA_REQUEST, 705 sector, size, ID_SYNCER); 706 if (err) { 707 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 708 dec_rs_pending(device); 709 put_ldev(device); 710 return err; 711 } 712 } 713 } 714 715 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 716 /* last syncer _request_ was sent, 717 * but the P_RS_DATA_REPLY not yet received. sync will end (and 718 * next sync group will resume), as soon as we receive the last 719 * resync data block, and the last bit is cleared. 720 * until then resync "work" is "inactive" ... 721 */ 722 put_ldev(device); 723 return 0; 724 } 725 726 requeue: 727 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 728 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 729 put_ldev(device); 730 return 0; 731} 732 733static int w_make_ov_request(struct drbd_work *w, int cancel) 734{ 735 struct drbd_device *device = device_work(w)->device; 736 int number, i, size; 737 sector_t sector; 738 const sector_t capacity = drbd_get_capacity(device->this_bdev); 739 bool stop_sector_reached = false; 740 741 if (unlikely(cancel)) 742 return 1; 743 744 number = drbd_rs_number_requests(device); 745 746 sector = device->ov_position; 747 for (i = 0; i < number; i++) { 748 if (sector >= capacity) 749 return 1; 750 751 /* We check for "finished" only in the reply path: 752 * w_e_end_ov_reply(). 753 * We need to send at least one request out. 
*/ 754 stop_sector_reached = i > 0 755 && verify_can_do_stop_sector(device) 756 && sector >= device->ov_stop_sector; 757 if (stop_sector_reached) 758 break; 759 760 size = BM_BLOCK_SIZE; 761 762 if (drbd_rs_should_slow_down(device, sector) || 763 drbd_try_rs_begin_io(device, sector)) { 764 device->ov_position = sector; 765 goto requeue; 766 } 767 768 if (sector + (size>>9) > capacity) 769 size = (capacity-sector)<<9; 770 771 inc_rs_pending(device); 772 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 773 dec_rs_pending(device); 774 return 0; 775 } 776 sector += BM_SECT_PER_BIT; 777 } 778 device->ov_position = sector; 779 780 requeue: 781 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 782 if (i == 0 || !stop_sector_reached) 783 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 784 return 1; 785} 786 787int w_ov_finished(struct drbd_work *w, int cancel) 788{ 789 struct drbd_device_work *dw = 790 container_of(w, struct drbd_device_work, w); 791 struct drbd_device *device = dw->device; 792 kfree(dw); 793 ov_out_of_sync_print(device); 794 drbd_resync_finished(device); 795 796 return 0; 797} 798 799static int w_resync_finished(struct drbd_work *w, int cancel) 800{ 801 struct drbd_device_work *dw = 802 container_of(w, struct drbd_device_work, w); 803 struct drbd_device *device = dw->device; 804 kfree(dw); 805 806 drbd_resync_finished(device); 807 808 return 0; 809} 810 811static void ping_peer(struct drbd_device *device) 812{ 813 struct drbd_connection *connection = first_peer_device(device)->connection; 814 815 clear_bit(GOT_PING_ACK, &connection->flags); 816 request_ping(connection); 817 wait_event(connection->ping_wait, 818 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 819} 820 821int drbd_resync_finished(struct drbd_device *device) 822{ 823 unsigned long db, dt, dbdt; 824 unsigned long n_oos; 825 union drbd_state os, ns; 826 struct drbd_device_work *dw; 827 char *khelper_cmd = NULL; 828 int verify_done = 0; 829 830 /* Remove all elements from the resync LRU. Since future actions 831 * might set bits in the (main) bitmap, then the entries in the 832 * resync LRU would be wrong. */ 833 if (drbd_rs_del_all(device)) { 834 /* In case this is not possible now, most probably because 835 * there are P_RS_DATA_REPLY Packets lingering on the worker's 836 * queue (or even the read operations for those packets 837 * is not finished by now). Retry in 100ms. 
*/ 838 839 schedule_timeout_interruptible(HZ / 10); 840 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 841 if (dw) { 842 dw->w.cb = w_resync_finished; 843 dw->device = device; 844 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 845 &dw->w); 846 return 1; 847 } 848 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 849 } 850 851 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 852 if (dt <= 0) 853 dt = 1; 854 855 db = device->rs_total; 856 /* adjust for verify start and stop sectors, respective reached position */ 857 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 858 db -= device->ov_left; 859 860 dbdt = Bit2KB(db/dt); 861 device->rs_paused /= HZ; 862 863 if (!get_ldev(device)) 864 goto out; 865 866 ping_peer(device); 867 868 spin_lock_irq(&device->resource->req_lock); 869 os = drbd_read_state(device); 870 871 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 872 873 /* This protects us against multiple calls (that can happen in the presence 874 of application IO), and against connectivity loss just before we arrive here. */ 875 if (os.conn <= C_CONNECTED) 876 goto out_unlock; 877 878 ns = os; 879 ns.conn = C_CONNECTED; 880 881 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 882 verify_done ? "Online verify" : "Resync", 883 dt + device->rs_paused, device->rs_paused, dbdt); 884 885 n_oos = drbd_bm_total_weight(device); 886 887 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 888 if (n_oos) { 889 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 890 n_oos, Bit2KB(1)); 891 khelper_cmd = "out-of-sync"; 892 } 893 } else { 894 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 895 896 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 897 khelper_cmd = "after-resync-target"; 898 899 if (first_peer_device(device)->connection->csums_tfm && device->rs_total) { 900 const unsigned long s = device->rs_same_csum; 901 const unsigned long t = device->rs_total; 902 const int ratio = 903 (t == 0) ? 0 : 904 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 905 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 906 "transferred %luK total %luK\n", 907 ratio, 908 Bit2KB(device->rs_same_csum), 909 Bit2KB(device->rs_total - device->rs_same_csum), 910 Bit2KB(device->rs_total)); 911 } 912 } 913 914 if (device->rs_failed) { 915 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 916 917 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 918 ns.disk = D_INCONSISTENT; 919 ns.pdsk = D_UP_TO_DATE; 920 } else { 921 ns.disk = D_UP_TO_DATE; 922 ns.pdsk = D_INCONSISTENT; 923 } 924 } else { 925 ns.disk = D_UP_TO_DATE; 926 ns.pdsk = D_UP_TO_DATE; 927 928 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 929 if (device->p_uuid) { 930 int i; 931 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 932 _drbd_uuid_set(device, i, device->p_uuid[i]); 933 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 934 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 935 } else { 936 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 937 } 938 } 939 940 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 941 /* for verify runs, we don't update uuids here, 942 * so there would be nothing to report. */ 943 drbd_uuid_set_bm(device, 0UL); 944 drbd_print_uuids(device, "updated UUIDs"); 945 if (device->p_uuid) { 946 /* Now the two UUID sets are equal, update what we 947 * know of the peer. 
*/ 948 int i; 949 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 950 device->p_uuid[i] = device->ldev->md.uuid[i]; 951 } 952 } 953 } 954 955 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 956out_unlock: 957 spin_unlock_irq(&device->resource->req_lock); 958 put_ldev(device); 959out: 960 device->rs_total = 0; 961 device->rs_failed = 0; 962 device->rs_paused = 0; 963 964 /* reset start sector, if we reached end of device */ 965 if (verify_done && device->ov_left == 0) 966 device->ov_start_sector = 0; 967 968 drbd_md_sync(device); 969 970 if (khelper_cmd) 971 drbd_khelper(device, khelper_cmd); 972 973 return 1; 974} 975 976/* helper */ 977static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 978{ 979 if (drbd_peer_req_has_active_page(peer_req)) { 980 /* This might happen if sendpage() has not finished */ 981 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 982 atomic_add(i, &device->pp_in_use_by_net); 983 atomic_sub(i, &device->pp_in_use); 984 spin_lock_irq(&device->resource->req_lock); 985 list_add_tail(&peer_req->w.list, &device->net_ee); 986 spin_unlock_irq(&device->resource->req_lock); 987 wake_up(&drbd_pp_wait); 988 } else 989 drbd_free_peer_req(device, peer_req); 990} 991 992/** 993 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 994 * @device: DRBD device. 995 * @w: work object. 996 * @cancel: The connection will be closed anyways 997 */ 998int w_e_end_data_req(struct drbd_work *w, int cancel) 999{ 1000 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1001 struct drbd_device *device = peer_req->peer_device->device; 1002 int err; 1003 1004 if (unlikely(cancel)) { 1005 drbd_free_peer_req(device, peer_req); 1006 dec_unacked(device); 1007 return 0; 1008 } 1009 1010 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1011 err = drbd_send_block(first_peer_device(device), P_DATA_REPLY, peer_req); 1012 } else { 1013 if (__ratelimit(&drbd_ratelimit_state)) 1014 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1015 (unsigned long long)peer_req->i.sector); 1016 1017 err = drbd_send_ack(first_peer_device(device), P_NEG_DREPLY, peer_req); 1018 } 1019 1020 dec_unacked(device); 1021 1022 move_to_net_ee_or_free(device, peer_req); 1023 1024 if (unlikely(err)) 1025 drbd_err(device, "drbd_send_block() failed\n"); 1026 return err; 1027} 1028 1029/** 1030 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1031 * @w: work object. 
1032 * @cancel: The connection will be closed anyways 1033 */ 1034int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1035{ 1036 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1037 struct drbd_device *device = peer_req->peer_device->device; 1038 int err; 1039 1040 if (unlikely(cancel)) { 1041 drbd_free_peer_req(device, peer_req); 1042 dec_unacked(device); 1043 return 0; 1044 } 1045 1046 if (get_ldev_if_state(device, D_FAILED)) { 1047 drbd_rs_complete_io(device, peer_req->i.sector); 1048 put_ldev(device); 1049 } 1050 1051 if (device->state.conn == C_AHEAD) { 1052 err = drbd_send_ack(first_peer_device(device), P_RS_CANCEL, peer_req); 1053 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1054 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1055 inc_rs_pending(device); 1056 err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req); 1057 } else { 1058 if (__ratelimit(&drbd_ratelimit_state)) 1059 drbd_err(device, "Not sending RSDataReply, " 1060 "partner DISKLESS!\n"); 1061 err = 0; 1062 } 1063 } else { 1064 if (__ratelimit(&drbd_ratelimit_state)) 1065 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1066 (unsigned long long)peer_req->i.sector); 1067 1068 err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req); 1069 1070 /* update resync data with failure */ 1071 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1072 } 1073 1074 dec_unacked(device); 1075 1076 move_to_net_ee_or_free(device, peer_req); 1077 1078 if (unlikely(err)) 1079 drbd_err(device, "drbd_send_block() failed\n"); 1080 return err; 1081} 1082 1083int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1084{ 1085 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1086 struct drbd_device *device = peer_req->peer_device->device; 1087 struct digest_info *di; 1088 int digest_size; 1089 void *digest = NULL; 1090 int err, eq = 0; 1091 1092 if (unlikely(cancel)) { 1093 drbd_free_peer_req(device, peer_req); 1094 dec_unacked(device); 1095 return 0; 1096 } 1097 1098 if (get_ldev(device)) { 1099 drbd_rs_complete_io(device, peer_req->i.sector); 1100 put_ldev(device); 1101 } 1102 1103 di = peer_req->digest; 1104 1105 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1106 /* quick hack to try to avoid a race against reconfiguration. 1107 * a real fix would be much more involved, 1108 * introducing more locking mechanisms */ 1109 if (first_peer_device(device)->connection->csums_tfm) { 1110 digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->csums_tfm); 1111 D_ASSERT(device, digest_size == di->digest_size); 1112 digest = kmalloc(digest_size, GFP_NOIO); 1113 } 1114 if (digest) { 1115 drbd_csum_ee(first_peer_device(device)->connection->csums_tfm, peer_req, digest); 1116 eq = !memcmp(digest, di->digest, digest_size); 1117 kfree(digest); 1118 } 1119 1120 if (eq) { 1121 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1122 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1123 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1124 err = drbd_send_ack(first_peer_device(device), P_RS_IS_IN_SYNC, peer_req); 1125 } else { 1126 inc_rs_pending(device); 1127 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! 
*/ 1128 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1129 kfree(di); 1130 err = drbd_send_block(first_peer_device(device), P_RS_DATA_REPLY, peer_req); 1131 } 1132 } else { 1133 err = drbd_send_ack(first_peer_device(device), P_NEG_RS_DREPLY, peer_req); 1134 if (__ratelimit(&drbd_ratelimit_state)) 1135 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1136 } 1137 1138 dec_unacked(device); 1139 move_to_net_ee_or_free(device, peer_req); 1140 1141 if (unlikely(err)) 1142 drbd_err(device, "drbd_send_block/ack() failed\n"); 1143 return err; 1144} 1145 1146int w_e_end_ov_req(struct drbd_work *w, int cancel) 1147{ 1148 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1149 struct drbd_device *device = peer_req->peer_device->device; 1150 sector_t sector = peer_req->i.sector; 1151 unsigned int size = peer_req->i.size; 1152 int digest_size; 1153 void *digest; 1154 int err = 0; 1155 1156 if (unlikely(cancel)) 1157 goto out; 1158 1159 digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm); 1160 digest = kmalloc(digest_size, GFP_NOIO); 1161 if (!digest) { 1162 err = 1; /* terminate the connection in case the allocation failed */ 1163 goto out; 1164 } 1165 1166 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1167 drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest); 1168 else 1169 memset(digest, 0, digest_size); 1170 1171 /* Free e and pages before send. 1172 * In case we block on congestion, we could otherwise run into 1173 * some distributed deadlock, if the other side blocks on 1174 * congestion as well, because our receiver blocks in 1175 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1176 drbd_free_peer_req(device, peer_req); 1177 peer_req = NULL; 1178 inc_rs_pending(device); 1179 err = drbd_send_drequest_csum(first_peer_device(device), sector, size, digest, digest_size, P_OV_REPLY); 1180 if (err) 1181 dec_rs_pending(device); 1182 kfree(digest); 1183 1184out: 1185 if (peer_req) 1186 drbd_free_peer_req(device, peer_req); 1187 dec_unacked(device); 1188 return err; 1189} 1190 1191void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1192{ 1193 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1194 device->ov_last_oos_size += size>>9; 1195 } else { 1196 device->ov_last_oos_start = sector; 1197 device->ov_last_oos_size = size>>9; 1198 } 1199 drbd_set_out_of_sync(device, sector, size); 1200} 1201 1202int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1203{ 1204 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1205 struct drbd_device *device = peer_req->peer_device->device; 1206 struct digest_info *di; 1207 void *digest; 1208 sector_t sector = peer_req->i.sector; 1209 unsigned int size = peer_req->i.size; 1210 int digest_size; 1211 int err, eq = 0; 1212 bool stop_sector_reached = false; 1213 1214 if (unlikely(cancel)) { 1215 drbd_free_peer_req(device, peer_req); 1216 dec_unacked(device); 1217 return 0; 1218 } 1219 1220 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1221 * the resync lru has been cleaned up already */ 1222 if (get_ldev(device)) { 1223 drbd_rs_complete_io(device, peer_req->i.sector); 1224 put_ldev(device); 1225 } 1226 1227 di = peer_req->digest; 1228 1229 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1230 digest_size = crypto_hash_digestsize(first_peer_device(device)->connection->verify_tfm); 1231 digest = 
kmalloc(digest_size, GFP_NOIO); 1232 if (digest) { 1233 drbd_csum_ee(first_peer_device(device)->connection->verify_tfm, peer_req, digest); 1234 1235 D_ASSERT(device, digest_size == di->digest_size); 1236 eq = !memcmp(digest, di->digest, digest_size); 1237 kfree(digest); 1238 } 1239 } 1240 1241 /* Free peer_req and pages before send. 1242 * In case we block on congestion, we could otherwise run into 1243 * some distributed deadlock, if the other side blocks on 1244 * congestion as well, because our receiver blocks in 1245 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1246 drbd_free_peer_req(device, peer_req); 1247 if (!eq) 1248 drbd_ov_out_of_sync_found(device, sector, size); 1249 else 1250 ov_out_of_sync_print(device); 1251 1252 err = drbd_send_ack_ex(first_peer_device(device), P_OV_RESULT, sector, size, 1253 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1254 1255 dec_unacked(device); 1256 1257 --device->ov_left; 1258 1259 /* let's advance progress step marks only for every other megabyte */ 1260 if ((device->ov_left & 0x200) == 0x200) 1261 drbd_advance_rs_marks(device, device->ov_left); 1262 1263 stop_sector_reached = verify_can_do_stop_sector(device) && 1264 (sector + (size>>9)) >= device->ov_stop_sector; 1265 1266 if (device->ov_left == 0 || stop_sector_reached) { 1267 ov_out_of_sync_print(device); 1268 drbd_resync_finished(device); 1269 } 1270 1271 return err; 1272} 1273 1274/* FIXME 1275 * We need to track the number of pending barrier acks, 1276 * and to be able to wait for them. 1277 * See also comment in drbd_adm_attach before drbd_suspend_io. 1278 */ 1279static int drbd_send_barrier(struct drbd_connection *connection) 1280{ 1281 struct p_barrier *p; 1282 struct drbd_socket *sock; 1283 1284 sock = &connection->data; 1285 p = conn_prepare_command(connection, sock); 1286 if (!p) 1287 return -EIO; 1288 p->barrier = connection->send.current_epoch_nr; 1289 p->pad = 0; 1290 connection->send.current_epoch_writes = 0; 1291 1292 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1293} 1294 1295int w_send_write_hint(struct drbd_work *w, int cancel) 1296{ 1297 struct drbd_device *device = 1298 container_of(w, struct drbd_device, unplug_work); 1299 struct drbd_socket *sock; 1300 1301 if (cancel) 1302 return 0; 1303 sock = &first_peer_device(device)->connection->data; 1304 if (!drbd_prepare_command(first_peer_device(device), sock)) 1305 return -EIO; 1306 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1307} 1308 1309static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1310{ 1311 if (!connection->send.seen_any_write_yet) { 1312 connection->send.seen_any_write_yet = true; 1313 connection->send.current_epoch_nr = epoch; 1314 connection->send.current_epoch_writes = 0; 1315 } 1316} 1317 1318static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1319{ 1320 /* re-init if first write on this connection */ 1321 if (!connection->send.seen_any_write_yet) 1322 return; 1323 if (connection->send.current_epoch_nr != epoch) { 1324 if (connection->send.current_epoch_writes) 1325 drbd_send_barrier(connection); 1326 connection->send.current_epoch_nr = epoch; 1327 } 1328} 1329 1330int w_send_out_of_sync(struct drbd_work *w, int cancel) 1331{ 1332 struct drbd_request *req = container_of(w, struct drbd_request, w); 1333 struct drbd_device *device = req->device; 1334 struct drbd_connection *connection = first_peer_device(device)->connection; 1335 int err; 1336 1337 if (unlikely(cancel)) 
{ 1338 req_mod(req, SEND_CANCELED); 1339 return 0; 1340 } 1341 1342 /* this time, no connection->send.current_epoch_writes++; 1343 * If it was sent, it was the closing barrier for the last 1344 * replicated epoch, before we went into AHEAD mode. 1345 * No more barriers will be sent, until we leave AHEAD mode again. */ 1346 maybe_send_barrier(connection, req->epoch); 1347 1348 err = drbd_send_out_of_sync(first_peer_device(device), req); 1349 req_mod(req, OOS_HANDED_TO_NETWORK); 1350 1351 return err; 1352} 1353 1354/** 1355 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1356 * @w: work object. 1357 * @cancel: The connection will be closed anyways 1358 */ 1359int w_send_dblock(struct drbd_work *w, int cancel) 1360{ 1361 struct drbd_request *req = container_of(w, struct drbd_request, w); 1362 struct drbd_device *device = req->device; 1363 struct drbd_connection *connection = first_peer_device(device)->connection; 1364 int err; 1365 1366 if (unlikely(cancel)) { 1367 req_mod(req, SEND_CANCELED); 1368 return 0; 1369 } 1370 1371 re_init_if_first_write(connection, req->epoch); 1372 maybe_send_barrier(connection, req->epoch); 1373 connection->send.current_epoch_writes++; 1374 1375 err = drbd_send_dblock(first_peer_device(device), req); 1376 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1377 1378 return err; 1379} 1380 1381/** 1382 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1383 * @w: work object. 1384 * @cancel: The connection will be closed anyways 1385 */ 1386int w_send_read_req(struct drbd_work *w, int cancel) 1387{ 1388 struct drbd_request *req = container_of(w, struct drbd_request, w); 1389 struct drbd_device *device = req->device; 1390 struct drbd_connection *connection = first_peer_device(device)->connection; 1391 int err; 1392 1393 if (unlikely(cancel)) { 1394 req_mod(req, SEND_CANCELED); 1395 return 0; 1396 } 1397 1398 /* Even read requests may close a write epoch, 1399 * if there was any yet. */ 1400 maybe_send_barrier(connection, req->epoch); 1401 1402 err = drbd_send_drequest(first_peer_device(device), P_DATA_REQUEST, req->i.sector, req->i.size, 1403 (unsigned long)req); 1404 1405 req_mod(req, err ? 
SEND_FAILED : HANDED_OVER_TO_NETWORK); 1406 1407 return err; 1408} 1409 1410int w_restart_disk_io(struct drbd_work *w, int cancel) 1411{ 1412 struct drbd_request *req = container_of(w, struct drbd_request, w); 1413 struct drbd_device *device = req->device; 1414 1415 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1416 drbd_al_begin_io(device, &req->i, false); 1417 1418 drbd_req_make_private_bio(req, req->master_bio); 1419 req->private_bio->bi_bdev = device->ldev->backing_bdev; 1420 generic_make_request(req->private_bio); 1421 1422 return 0; 1423} 1424 1425static int _drbd_may_sync_now(struct drbd_device *device) 1426{ 1427 struct drbd_device *odev = device; 1428 int resync_after; 1429 1430 while (1) { 1431 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1432 return 1; 1433 rcu_read_lock(); 1434 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1435 rcu_read_unlock(); 1436 if (resync_after == -1) 1437 return 1; 1438 odev = minor_to_device(resync_after); 1439 if (!odev) 1440 return 1; 1441 if ((odev->state.conn >= C_SYNC_SOURCE && 1442 odev->state.conn <= C_PAUSED_SYNC_T) || 1443 odev->state.aftr_isp || odev->state.peer_isp || 1444 odev->state.user_isp) 1445 return 0; 1446 } 1447} 1448 1449/** 1450 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1451 * @device: DRBD device. 1452 * 1453 * Called from process context only (admin command and after_state_ch). 1454 */ 1455static int _drbd_pause_after(struct drbd_device *device) 1456{ 1457 struct drbd_device *odev; 1458 int i, rv = 0; 1459 1460 rcu_read_lock(); 1461 idr_for_each_entry(&drbd_devices, odev, i) { 1462 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1463 continue; 1464 if (!_drbd_may_sync_now(odev)) 1465 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1466 != SS_NOTHING_TO_DO); 1467 } 1468 rcu_read_unlock(); 1469 1470 return rv; 1471} 1472 1473/** 1474 * _drbd_resume_next() - Resume resync on all devices that may resync now 1475 * @device: DRBD device. 1476 * 1477 * Called from process context only (admin command and worker). 
1478 */ 1479static int _drbd_resume_next(struct drbd_device *device) 1480{ 1481 struct drbd_device *odev; 1482 int i, rv = 0; 1483 1484 rcu_read_lock(); 1485 idr_for_each_entry(&drbd_devices, odev, i) { 1486 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1487 continue; 1488 if (odev->state.aftr_isp) { 1489 if (_drbd_may_sync_now(odev)) 1490 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1491 CS_HARD, NULL) 1492 != SS_NOTHING_TO_DO) ; 1493 } 1494 } 1495 rcu_read_unlock(); 1496 return rv; 1497} 1498 1499void resume_next_sg(struct drbd_device *device) 1500{ 1501 write_lock_irq(&global_state_lock); 1502 _drbd_resume_next(device); 1503 write_unlock_irq(&global_state_lock); 1504} 1505 1506void suspend_other_sg(struct drbd_device *device) 1507{ 1508 write_lock_irq(&global_state_lock); 1509 _drbd_pause_after(device); 1510 write_unlock_irq(&global_state_lock); 1511} 1512 1513/* caller must hold global_state_lock */ 1514enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1515{ 1516 struct drbd_device *odev; 1517 int resync_after; 1518 1519 if (o_minor == -1) 1520 return NO_ERROR; 1521 if (o_minor < -1 || o_minor > MINORMASK) 1522 return ERR_RESYNC_AFTER; 1523 1524 /* check for loops */ 1525 odev = minor_to_device(o_minor); 1526 while (1) { 1527 if (odev == device) 1528 return ERR_RESYNC_AFTER_CYCLE; 1529 1530 /* You are free to depend on diskless, non-existing, 1531 * or not yet/no longer existing minors. 1532 * We only reject dependency loops. 1533 * We cannot follow the dependency chain beyond a detached or 1534 * missing minor. 1535 */ 1536 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1537 return NO_ERROR; 1538 1539 rcu_read_lock(); 1540 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1541 rcu_read_unlock(); 1542 /* dependency chain ends here, no cycles. */ 1543 if (resync_after == -1) 1544 return NO_ERROR; 1545 1546 /* follow the dependency chain */ 1547 odev = minor_to_device(resync_after); 1548 } 1549} 1550 1551/* caller must hold global_state_lock */ 1552void drbd_resync_after_changed(struct drbd_device *device) 1553{ 1554 int changes; 1555 1556 do { 1557 changes = _drbd_pause_after(device); 1558 changes |= _drbd_resume_next(device); 1559 } while (changes); 1560} 1561 1562void drbd_rs_controller_reset(struct drbd_device *device) 1563{ 1564 struct fifo_buffer *plan; 1565 1566 atomic_set(&device->rs_sect_in, 0); 1567 atomic_set(&device->rs_sect_ev, 0); 1568 device->rs_in_flight = 0; 1569 1570 /* Updating the RCU protected object in place is necessary since 1571 this function gets called from atomic context. 
1572 It is valid since all other updates also lead to an completely 1573 empty fifo */ 1574 rcu_read_lock(); 1575 plan = rcu_dereference(device->rs_plan_s); 1576 plan->total = 0; 1577 fifo_set(plan, 0); 1578 rcu_read_unlock(); 1579} 1580 1581void start_resync_timer_fn(unsigned long data) 1582{ 1583 struct drbd_device *device = (struct drbd_device *) data; 1584 1585 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 1586 &device->start_resync_work); 1587} 1588 1589int w_start_resync(struct drbd_work *w, int cancel) 1590{ 1591 struct drbd_device *device = 1592 container_of(w, struct drbd_device, start_resync_work); 1593 1594 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1595 drbd_warn(device, "w_start_resync later...\n"); 1596 device->start_resync_timer.expires = jiffies + HZ/10; 1597 add_timer(&device->start_resync_timer); 1598 return 0; 1599 } 1600 1601 drbd_start_resync(device, C_SYNC_SOURCE); 1602 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1603 return 0; 1604} 1605 1606/** 1607 * drbd_start_resync() - Start the resync process 1608 * @device: DRBD device. 1609 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1610 * 1611 * This function might bring you directly into one of the 1612 * C_PAUSED_SYNC_* states. 1613 */ 1614void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1615{ 1616 union drbd_state ns; 1617 int r; 1618 1619 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1620 drbd_err(device, "Resync already running!\n"); 1621 return; 1622 } 1623 1624 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1625 if (side == C_SYNC_TARGET) { 1626 /* Since application IO was locked out during C_WF_BITMAP_T and 1627 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1628 we check that we might make the data inconsistent. */ 1629 r = drbd_khelper(device, "before-resync-target"); 1630 r = (r >> 8) & 0xff; 1631 if (r > 0) { 1632 drbd_info(device, "before-resync-target handler returned %d, " 1633 "dropping connection.\n", r); 1634 conn_request_state(first_peer_device(device)->connection, NS(conn, C_DISCONNECTING), CS_HARD); 1635 return; 1636 } 1637 } else /* C_SYNC_SOURCE */ { 1638 r = drbd_khelper(device, "before-resync-source"); 1639 r = (r >> 8) & 0xff; 1640 if (r > 0) { 1641 if (r == 3) { 1642 drbd_info(device, "before-resync-source handler returned %d, " 1643 "ignoring. Old userland tools?", r); 1644 } else { 1645 drbd_info(device, "before-resync-source handler returned %d, " 1646 "dropping connection.\n", r); 1647 conn_request_state(first_peer_device(device)->connection, 1648 NS(conn, C_DISCONNECTING), CS_HARD); 1649 return; 1650 } 1651 } 1652 } 1653 } 1654 1655 if (current == first_peer_device(device)->connection->worker.task) { 1656 /* The worker should not sleep waiting for state_mutex, 1657 that can take long */ 1658 if (!mutex_trylock(device->state_mutex)) { 1659 set_bit(B_RS_H_DONE, &device->flags); 1660 device->start_resync_timer.expires = jiffies + HZ/5; 1661 add_timer(&device->start_resync_timer); 1662 return; 1663 } 1664 } else { 1665 mutex_lock(device->state_mutex); 1666 } 1667 clear_bit(B_RS_H_DONE, &device->flags); 1668 1669 write_lock_irq(&global_state_lock); 1670 /* Did some connection breakage or IO error race with us? 
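	 * If the connection dropped, or we cannot even get a reference on the
	 * local disk anymore, abandon starting the resync here.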
	 */
	if (device->state.conn < C_CONNECTED
	    || !get_ldev_if_state(device, D_NEGOTIATING)) {
		write_unlock_irq(&global_state_lock);
		mutex_unlock(device->state_mutex);
		return;
	}

	ns = drbd_read_state(device);

	ns.aftr_isp = !_drbd_may_sync_now(device);

	ns.conn = side;

	if (side == C_SYNC_TARGET)
		ns.disk = D_INCONSISTENT;
	else /* side == C_SYNC_SOURCE */
		ns.pdsk = D_INCONSISTENT;

	r = __drbd_set_state(device, ns, CS_VERBOSE, NULL);
	ns = drbd_read_state(device);

	if (ns.conn < C_CONNECTED)
		r = SS_UNKNOWN_ERROR;

	if (r == SS_SUCCESS) {
		unsigned long tw = drbd_bm_total_weight(device);
		unsigned long now = jiffies;
		int i;

		device->rs_failed = 0;
		device->rs_paused = 0;
		device->rs_same_csum = 0;
		device->rs_last_events = 0;
		device->rs_last_sect_ev = 0;
		device->rs_total = tw;
		device->rs_start = now;
		for (i = 0; i < DRBD_SYNC_MARKS; i++) {
			device->rs_mark_left[i] = tw;
			device->rs_mark_time[i] = now;
		}
		_drbd_pause_after(device);
	}
	write_unlock_irq(&global_state_lock);

	if (r == SS_SUCCESS) {
		/* reset rs_last_bcast when a resync or verify is started,
		 * to deal with potential jiffies wrap. */
		device->rs_last_bcast = jiffies - HZ;

		drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n",
		     drbd_conn_str(ns.conn),
		     (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10),
		     (unsigned long) device->rs_total);
		if (side == C_SYNC_TARGET)
			device->bm_resync_fo = 0;

		/* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid
		 * with w_send_oos, or the sync target will get confused as to
		 * how many bits to resync. We cannot do that always, because for an
		 * empty resync and protocol < 95, we need to do it here, as we call
		 * drbd_resync_finished from here in that case.
		 * We drbd_gen_and_send_sync_uuid here for protocol < 96,
		 * and from after_state_ch otherwise. */
		if (side == C_SYNC_SOURCE &&
		    first_peer_device(device)->connection->agreed_pro_version < 96)
			drbd_gen_and_send_sync_uuid(first_peer_device(device));

		if (first_peer_device(device)->connection->agreed_pro_version < 95 &&
		    device->rs_total == 0) {
			/* This still has a race (about when exactly the peers
			 * detect connection loss) that can lead to a full sync
			 * on next handshake. In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change. Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
	mutex_unlock(device->state_mutex);
}

/* If the resource already closed the current epoch, but we did not
 * (because we have not yet seen new requests), we should send the
 * corresponding barrier now. Must be checked within the same spinlock
 * that is used to check for new requests. */
static bool need_to_send_barrier(struct drbd_connection *connection)
{
	if (!connection->send.seen_any_write_yet)
		return false;

	/* Skip barriers that do not contain any writes.
	 * This may happen during AHEAD mode. */
	if (!connection->send.current_epoch_writes)
		return false;

	/* ->req_lock is held when requests are queued on
	 * connection->sender_work, and put into ->transfer_log.
	 * It is also held when ->current_tle_nr is increased.
	 * So either there are already new requests queued,
	 * and corresponding barriers will be sent there.
	 * Or nothing new is queued yet, so the difference will be 1.
	 */
	if (atomic_read(&connection->current_tle_nr) !=
	    connection->send.current_epoch_nr + 1)
		return false;

	return true;
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	if (!list_empty(&queue->q))
		list_move(queue->q.next, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_item(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		/* dequeue single item only,
		 * we still use drbd_queue_work_front() in some places */
		if (!list_empty(&connection->sender_work.q))
			list_move(connection->sender_work.q.next, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
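		/* req_lock is still held here, so checking for newly queued work
		 * and deciding whether to close the current epoch with a barrier
		 * (need_to_send_barrier) happen atomically with respect to new
		 * requests being added to the transfer log. */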
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}
		send_barrier = need_to_send_barrier(connection);
		spin_unlock_irq(&connection->resource->req_lock);
		if (send_barrier) {
			drbd_send_barrier(connection);
			connection->send.current_epoch_nr++;
		}
		schedule();
		/* may be woken up for things other than new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}

int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_device_work *dw = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		/* as long as we use drbd_queue_work_front(),
		 * we may only dequeue single work items here, not batches. */
		if (list_empty(&work_list))
			wait_for_work(connection, &work_list);

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		while (!list_empty(&work_list)) {
			dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
			list_del_init(&dw->w.list);
			if (dw->w.cb(&dw->w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	do {
		while (!list_empty(&work_list)) {
			dw = list_first_entry(&work_list, struct drbd_device_work, w.list);
			list_del_init(&dw->w.list);
			dw->w.cb(&dw->w, 1);
		}
		dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}