drbd_worker.c revision a80ca1ae81fc52e304e753f6de4ef248df364f9e
/*
   drbd_worker.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.

   Copyright (C) 2001-2008, LINBIT Information Technologies GmbH.
   Copyright (C) 1999-2008, Philipp Reisner <philipp.reisner@linbit.com>.
   Copyright (C) 2002-2008, Lars Ellenberg <lars.ellenberg@linbit.com>.

   drbd is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2, or (at your option)
   any later version.

   drbd is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with drbd; see the file COPYING.  If not, write to
   the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.

*/

#include <linux/module.h>
#include <linux/drbd.h>
#include <linux/sched.h>
#include <linux/wait.h>
#include <linux/mm.h>
#include <linux/memcontrol.h>
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/string.h>
#include <linux/scatterlist.h>

#include "drbd_int.h"
#include "drbd_protocol.h"
#include "drbd_req.h"

static int make_ov_request(struct drbd_device *, int);
static int make_resync_request(struct drbd_device *, int);

/* endio handlers:
 *   drbd_md_io_complete (defined here)
 *   drbd_request_endio (defined here)
 *   drbd_peer_request_endio (defined here)
 *   bm_async_io_complete (defined in drbd_bitmap.c)
 *
 * For all these callbacks, note the following:
 * The callbacks will be called in irq context by the IDE drivers,
 * and in Softirqs/Tasklets/BH context by the SCSI drivers.
 * Try to get the locking right :)
 *
 */


/* About the global_state_lock
   Each state transition on a device holds a read lock. In case we have
   to evaluate the resync after dependencies, we grab a write lock, because
   we need stable states on all devices for that.  */
rwlock_t global_state_lock;

/* used for synchronous meta data and bitmap IO
 * submitted by drbd_md_sync_page_io()
 */
void drbd_md_io_complete(struct bio *bio, int error)
{
	struct drbd_md_io *md_io;
	struct drbd_device *device;

	md_io = (struct drbd_md_io *)bio->bi_private;
	device = container_of(md_io, struct drbd_device, md_io);

	md_io->error = error;

	/* We grabbed an extra reference in _drbd_md_sync_page_io() to be able
	 * to timeout on the lower level device, and eventually detach from it.
	 * If this io completion runs after that timeout expired, this
	 * drbd_md_put_buffer() may allow us to finally try and re-attach.
	 * During normal operation, this only puts that extra reference
	 * down to 1 again.
	 * Make sure we first drop the reference, and only then signal
	 * completion, or we may (in drbd_al_read_log()) cycle so fast into the
	 * next drbd_md_sync_page_io(), that we trigger the
	 * ASSERT(atomic_read(&device->md_io_in_use) == 1) there.
	 */
	drbd_md_put_buffer(device);
	md_io->done = 1;
	wake_up(&device->misc_wait);
	bio_put(bio);
	if (device->ldev) /* special case: drbd_md_read() during drbd_adm_attach() */
		put_ldev(device);
}

/* reads on behalf of the partner,
 * "submitted" by the receiver
 */
static void drbd_endio_read_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->read_cnt += peer_req->i.size >> 9;
	list_del(&peer_req->w.list);
	if (list_empty(&device->read_ee))
		wake_up(&device->ee_wait);
	if (test_bit(__EE_WAS_ERROR, &peer_req->flags))
		__drbd_chk_io_error(device, DRBD_READ_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	drbd_queue_work(&peer_device->connection->sender_work, &peer_req->w);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver, final stage.  */
void drbd_endio_write_sec_final(struct drbd_peer_request *peer_req) __releases(local)
{
	unsigned long flags = 0;
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	struct drbd_interval i;
	int do_wake;
	u64 block_id;
	int do_al_complete_io;

	/* after we moved peer_req to done_ee,
	 * we may no longer access it,
	 * it may be freed/reused already!
	 * (as soon as we release the req_lock) */
	i = peer_req->i;
	do_al_complete_io = peer_req->flags & EE_CALL_AL_COMPLETE_IO;
	block_id = peer_req->block_id;

	spin_lock_irqsave(&device->resource->req_lock, flags);
	device->writ_cnt += peer_req->i.size >> 9;
	list_move_tail(&peer_req->w.list, &device->done_ee);

	/*
	 * Do not remove from the write_requests tree here: we did not send the
	 * Ack yet and did not wake possibly waiting conflicting requests.
	 * Removal from the tree happens in "drbd_process_done_ee" within the
	 * appropriate dw.cb (e_end_block/e_end_resync_block) or from
	 * _drbd_clear_done_ee.
	 */

	do_wake = list_empty(block_id == ID_SYNCER ? &device->sync_ee : &device->active_ee);

	/* FIXME do we want to detach for failed REQ_DISCARD?
	 * ((peer_req->flags & (EE_WAS_ERROR|EE_IS_TRIM)) == EE_WAS_ERROR) */
	if (peer_req->flags & EE_WAS_ERROR)
		__drbd_chk_io_error(device, DRBD_WRITE_ERROR);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);

	if (block_id == ID_SYNCER)
		drbd_rs_complete_io(device, i.sector);

	if (do_wake)
		wake_up(&device->ee_wait);

	if (do_al_complete_io)
		drbd_al_complete_io(device, &i);

	wake_asender(peer_device->connection);
	put_ldev(device);
}

/* writes on behalf of the partner, or resync writes,
 * "submitted" by the receiver.
 */
void drbd_peer_request_endio(struct bio *bio, int error)
{
	struct drbd_peer_request *peer_req = bio->bi_private;
	struct drbd_device *device = peer_req->peer_device->device;
	int uptodate = bio_flagged(bio, BIO_UPTODATE);
	int is_write = bio_data_dir(bio) == WRITE;
	int is_discard = !!(bio->bi_rw & REQ_DISCARD);

	if (error && __ratelimit(&drbd_ratelimit_state))
		drbd_warn(device, "%s: error=%d s=%llus\n",
"discard" : "write") 186 : "read", error, 187 (unsigned long long)peer_req->i.sector); 188 if (!error && !uptodate) { 189 if (__ratelimit(&drbd_ratelimit_state)) 190 drbd_warn(device, "%s: setting error to -EIO s=%llus\n", 191 is_write ? "write" : "read", 192 (unsigned long long)peer_req->i.sector); 193 /* strange behavior of some lower level drivers... 194 * fail the request by clearing the uptodate flag, 195 * but do not return any error?! */ 196 error = -EIO; 197 } 198 199 if (error) 200 set_bit(__EE_WAS_ERROR, &peer_req->flags); 201 202 bio_put(bio); /* no need for the bio anymore */ 203 if (atomic_dec_and_test(&peer_req->pending_bios)) { 204 if (is_write) 205 drbd_endio_write_sec_final(peer_req); 206 else 207 drbd_endio_read_sec_final(peer_req); 208 } 209} 210 211/* read, readA or write requests on R_PRIMARY coming from drbd_make_request 212 */ 213void drbd_request_endio(struct bio *bio, int error) 214{ 215 unsigned long flags; 216 struct drbd_request *req = bio->bi_private; 217 struct drbd_device *device = req->device; 218 struct bio_and_error m; 219 enum drbd_req_event what; 220 int uptodate = bio_flagged(bio, BIO_UPTODATE); 221 222 if (!error && !uptodate) { 223 drbd_warn(device, "p %s: setting error to -EIO\n", 224 bio_data_dir(bio) == WRITE ? "write" : "read"); 225 /* strange behavior of some lower level drivers... 226 * fail the request by clearing the uptodate flag, 227 * but do not return any error?! */ 228 error = -EIO; 229 } 230 231 232 /* If this request was aborted locally before, 233 * but now was completed "successfully", 234 * chances are that this caused arbitrary data corruption. 235 * 236 * "aborting" requests, or force-detaching the disk, is intended for 237 * completely blocked/hung local backing devices which do no longer 238 * complete requests at all, not even do error completions. In this 239 * situation, usually a hard-reset and failover is the only way out. 240 * 241 * By "aborting", basically faking a local error-completion, 242 * we allow for a more graceful swichover by cleanly migrating services. 243 * Still the affected node has to be rebooted "soon". 244 * 245 * By completing these requests, we allow the upper layers to re-use 246 * the associated data pages. 247 * 248 * If later the local backing device "recovers", and now DMAs some data 249 * from disk into the original request pages, in the best case it will 250 * just put random data into unused pages; but typically it will corrupt 251 * meanwhile completely unrelated data, causing all sorts of damage. 252 * 253 * Which means delayed successful completion, 254 * especially for READ requests, 255 * is a reason to panic(). 256 * 257 * We assume that a delayed *error* completion is OK, 258 * though we still will complain noisily about it. 259 */ 260 if (unlikely(req->rq_state & RQ_LOCAL_ABORTED)) { 261 if (__ratelimit(&drbd_ratelimit_state)) 262 drbd_emerg(device, "delayed completion of aborted local request; disk-timeout may be too aggressive\n"); 263 264 if (!error) 265 panic("possible random memory corruption caused by delayed completion of aborted local request\n"); 266 } 267 268 /* to avoid recursion in __req_mod */ 269 if (unlikely(error)) { 270 if (bio->bi_rw & REQ_DISCARD) 271 what = (error == -EOPNOTSUPP) 272 ? DISCARD_COMPLETED_NOTSUPP 273 : DISCARD_COMPLETED_WITH_ERROR; 274 else 275 what = (bio_data_dir(bio) == WRITE) 276 ? WRITE_COMPLETED_WITH_ERROR 277 : (bio_rw(bio) == READ) 278 ? 
			  ? READ_COMPLETED_WITH_ERROR
			  : READ_AHEAD_COMPLETED_WITH_ERROR;
	} else
		what = COMPLETED_OK;

	bio_put(req->private_bio);
	req->private_bio = ERR_PTR(error);

	/* not req_mod(), we need irqsave here! */
	spin_lock_irqsave(&device->resource->req_lock, flags);
	__req_mod(req, what, &m);
	spin_unlock_irqrestore(&device->resource->req_lock, flags);
	put_ldev(device);

	if (m.bio)
		complete_master_bio(device, &m);
}

void drbd_csum_ee(struct crypto_hash *tfm, struct drbd_peer_request *peer_req, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct page *page = peer_req->pages;
	struct page *tmp;
	unsigned len;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	while ((tmp = page_chain_next(page))) {
		/* all but the last page will be fully used */
		sg_set_page(&sg, page, PAGE_SIZE, 0);
		crypto_hash_update(&desc, &sg, sg.length);
		page = tmp;
	}
	/* and now the last, possibly only partially used page */
	len = peer_req->i.size & (PAGE_SIZE - 1);
	sg_set_page(&sg, page, len ?: PAGE_SIZE, 0);
	crypto_hash_update(&desc, &sg, sg.length);
	crypto_hash_final(&desc, digest);
}

void drbd_csum_bio(struct crypto_hash *tfm, struct bio *bio, void *digest)
{
	struct hash_desc desc;
	struct scatterlist sg;
	struct bio_vec bvec;
	struct bvec_iter iter;

	desc.tfm = tfm;
	desc.flags = 0;

	sg_init_table(&sg, 1);
	crypto_hash_init(&desc);

	bio_for_each_segment(bvec, bio, iter) {
		sg_set_page(&sg, bvec.bv_page, bvec.bv_len, bvec.bv_offset);
		crypto_hash_update(&desc, &sg, sg.length);
	}
	crypto_hash_final(&desc, digest);
}

/* MAYBE merge common code with w_e_end_ov_req */
static int w_e_send_csum(struct drbd_work *w, int cancel)
{
	struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w);
	struct drbd_peer_device *peer_device = peer_req->peer_device;
	struct drbd_device *device = peer_device->device;
	int digest_size;
	void *digest;
	int err = 0;

	if (unlikely(cancel))
		goto out;

	if (unlikely((peer_req->flags & EE_WAS_ERROR) != 0))
		goto out;

	digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm);
	digest = kmalloc(digest_size, GFP_NOIO);
	if (digest) {
		sector_t sector = peer_req->i.sector;
		unsigned int size = peer_req->i.size;
		drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest);
		/* Free peer_req and pages before send.
		 * In case we block on congestion, we could otherwise run into
		 * some distributed deadlock, if the other side blocks on
		 * congestion as well, because our receiver blocks in
		 * drbd_alloc_pages due to pp_in_use > max_buffers. */
		drbd_free_peer_req(device, peer_req);
		peer_req = NULL;
		inc_rs_pending(device);
		err = drbd_send_drequest_csum(peer_device, sector, size,
					      digest, digest_size,
					      P_CSUM_RS_REQUEST);
		kfree(digest);
	} else {
		drbd_err(device, "kmalloc() of digest failed.\n");
		err = -ENOMEM;
	}

out:
	if (peer_req)
		drbd_free_peer_req(device, peer_req);

	if (unlikely(err))
		drbd_err(device, "drbd_send_drequest(..., csum) failed\n");
	return err;
}

#define GFP_TRY	(__GFP_HIGHMEM | __GFP_NOWARN)

static int read_for_csum(struct drbd_peer_device *peer_device, sector_t sector, int size)
{
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_request *peer_req;

	if (!get_ldev(device))
		return -EIO;

	if (drbd_rs_should_slow_down(device, sector))
		goto defer;

	/* GFP_TRY, because if there is no memory available right now, this may
	 * be rescheduled for later. It is "only" background resync, after all. */
	peer_req = drbd_alloc_peer_req(peer_device, ID_SYNCER /* unused */, sector,
				       size, true /* has real payload */, GFP_TRY);
	if (!peer_req)
		goto defer;

	peer_req->w.cb = w_e_send_csum;
	spin_lock_irq(&device->resource->req_lock);
	list_add(&peer_req->w.list, &device->read_ee);
	spin_unlock_irq(&device->resource->req_lock);

	atomic_add(size >> 9, &device->rs_sect_ev);
	if (drbd_submit_peer_request(device, peer_req, READ, DRBD_FAULT_RS_RD) == 0)
		return 0;

	/* If it failed because of ENOMEM, retry should help.  If it failed
	 * because bio_add_page failed (probably broken lower level driver),
	 * retry may or may not help.
	 * If it does not, you may need to force disconnect. */
	spin_lock_irq(&device->resource->req_lock);
	list_del(&peer_req->w.list);
	spin_unlock_irq(&device->resource->req_lock);

	drbd_free_peer_req(device, peer_req);
defer:
	put_ldev(device);
	return -EAGAIN;
}

int w_resync_timer(struct drbd_work *w, int cancel)
{
	struct drbd_device *device =
		container_of(w, struct drbd_device, resync_work);

	switch (device->state.conn) {
	case C_VERIFY_S:
		make_ov_request(device, cancel);
		break;
	case C_SYNC_TARGET:
		make_resync_request(device, cancel);
		break;
	}

	return 0;
}

void resync_timer_fn(unsigned long data)
{
	struct drbd_device *device = (struct drbd_device *) data;

	if (list_empty(&device->resync_work.list))
		drbd_queue_work(&first_peer_device(device)->connection->sender_work,
				&device->resync_work);
}

static void fifo_set(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] = value;
}

static int fifo_push(struct fifo_buffer *fb, int value)
{
	int ov;

	ov = fb->values[fb->head_index];
	fb->values[fb->head_index++] = value;

	if (fb->head_index >= fb->size)
		fb->head_index = 0;

	return ov;
}

static void fifo_add_val(struct fifo_buffer *fb, int value)
{
	int i;

	for (i = 0; i < fb->size; i++)
		fb->values[i] += value;
}

struct fifo_buffer *fifo_alloc(int fifo_size)
{
	struct fifo_buffer *fb;

	fb = kzalloc(sizeof(struct fifo_buffer) + sizeof(int) * fifo_size, GFP_NOIO);
	if (!fb)
		return NULL;

	fb->head_index = 0;
	fb->size = fifo_size;
	fb->total = 0;

	return fb;
}
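/* A rough walk-through of the feedback loop in the controller below, using
 * made-up example numbers purely for illustration (none of these values are
 * taken from this file or from any real configuration): assume steps = 20,
 * want = c_fill_target = 1000 sectors, rs_in_flight = 600, plan->total = 100,
 * and sect_in = 200 sectors came back since the last invocation.  Then
 * correction = 1000 - 600 - 100 = 300, so cps = 300 / 20 = 15 is added to
 * every slot of the plan fifo and plan->total becomes 400.  fifo_push() then
 * pops the oldest slot; if that slot now holds 40 (say, 25 planned earlier
 * plus this round's 15), curr_corr = 40, plan->total drops to 360, and
 * req_sect = sect_in + curr_corr = 240 sectors are requested in this step,
 * clamped to max_sect = (c_max_rate * 2 * SLEEP_TIME) / HZ.
 */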
static int drbd_rs_controller(struct drbd_device *device, unsigned int sect_in)
{
	struct disk_conf *dc;
	unsigned int want;	/* The number of sectors we want in the proxy */
	int req_sect;		/* Number of sectors to request in this turn */
	int correction;		/* Number of sectors more we need in the proxy*/
	int cps;		/* correction per invocation of drbd_rs_controller() */
	int steps;		/* Number of time steps to plan ahead */
	int curr_corr;
	int max_sect;
	struct fifo_buffer *plan;

	dc = rcu_dereference(device->ldev->disk_conf);
	plan = rcu_dereference(device->rs_plan_s);

	steps = plan->size; /* (dc->c_plan_ahead * 10 * SLEEP_TIME) / HZ; */

	if (device->rs_in_flight + sect_in == 0) { /* At start of resync */
		want = ((dc->resync_rate * 2 * SLEEP_TIME) / HZ) * steps;
	} else { /* normal path */
		want = dc->c_fill_target ? dc->c_fill_target :
			sect_in * dc->c_delay_target * HZ / (SLEEP_TIME * 10);
	}

	correction = want - device->rs_in_flight - plan->total;

	/* Plan ahead */
	cps = correction / steps;
	fifo_add_val(plan, cps);
	plan->total += cps * steps;

	/* What we do in this step */
	curr_corr = fifo_push(plan, 0);
	plan->total -= curr_corr;

	req_sect = sect_in + curr_corr;
	if (req_sect < 0)
		req_sect = 0;

	max_sect = (dc->c_max_rate * 2 * SLEEP_TIME) / HZ;
	if (req_sect > max_sect)
		req_sect = max_sect;

	/*
	drbd_warn(device, "si=%u if=%d wa=%u co=%d st=%d cps=%d pl=%d cc=%d rs=%d\n",
		 sect_in, device->rs_in_flight, want, correction,
		 steps, cps, device->rs_planed, curr_corr, req_sect);
	*/

	return req_sect;
}

static int drbd_rs_number_requests(struct drbd_device *device)
{
	unsigned int sect_in;  /* Number of sectors that came in since the last turn */
	int number, mxb;

	sect_in = atomic_xchg(&device->rs_sect_in, 0);
	device->rs_in_flight -= sect_in;

	rcu_read_lock();
	mxb = drbd_get_max_buffers(device) / 2;
	if (rcu_dereference(device->rs_plan_s)->size) {
		number = drbd_rs_controller(device, sect_in) >> (BM_BLOCK_SHIFT - 9);
		device->c_sync_rate = number * HZ * (BM_BLOCK_SIZE / 1024) / SLEEP_TIME;
	} else {
		device->c_sync_rate = rcu_dereference(device->ldev->disk_conf)->resync_rate;
		number = SLEEP_TIME * device->c_sync_rate / ((BM_BLOCK_SIZE / 1024) * HZ);
	}
	rcu_read_unlock();

	/* Don't have more than "max-buffers"/2 in-flight.
	 * Otherwise we may cause the remote site to stall on drbd_alloc_pages(),
	 * potentially causing a distributed deadlock on congestion during
	 * online-verify or (checksum-based) resync, if max-buffers,
	 * socket buffer sizes and resync rate settings are mis-configured. */
	if (mxb - device->rs_in_flight < number)
		number = mxb - device->rs_in_flight;

	return number;
}

static int make_resync_request(struct drbd_device *const device, int cancel)
{
	struct drbd_peer_device *const peer_device = first_peer_device(device);
	struct drbd_connection *const connection = peer_device ? peer_device->connection : NULL;
	unsigned long bit;
	sector_t sector;
	const sector_t capacity = drbd_get_capacity(device->this_bdev);
	int max_bio_size;
	int number, rollback_i, size;
	int align, queued, sndbuf;
	int i = 0;

	if (unlikely(cancel))
		return 0;

	if (device->rs_total == 0) {
		/* empty resync?
*/ 603 drbd_resync_finished(device); 604 return 0; 605 } 606 607 if (!get_ldev(device)) { 608 /* Since we only need to access device->rsync a 609 get_ldev_if_state(device,D_FAILED) would be sufficient, but 610 to continue resync with a broken disk makes no sense at 611 all */ 612 drbd_err(device, "Disk broke down during resync!\n"); 613 return 0; 614 } 615 616 max_bio_size = queue_max_hw_sectors(device->rq_queue) << 9; 617 number = drbd_rs_number_requests(device); 618 if (number <= 0) 619 goto requeue; 620 621 for (i = 0; i < number; i++) { 622 /* Stop generating RS requests, when half of the send buffer is filled */ 623 mutex_lock(&connection->data.mutex); 624 if (connection->data.socket) { 625 queued = connection->data.socket->sk->sk_wmem_queued; 626 sndbuf = connection->data.socket->sk->sk_sndbuf; 627 } else { 628 queued = 1; 629 sndbuf = 0; 630 } 631 mutex_unlock(&connection->data.mutex); 632 if (queued > sndbuf / 2) 633 goto requeue; 634 635next_sector: 636 size = BM_BLOCK_SIZE; 637 bit = drbd_bm_find_next(device, device->bm_resync_fo); 638 639 if (bit == DRBD_END_OF_BITMAP) { 640 device->bm_resync_fo = drbd_bm_bits(device); 641 put_ldev(device); 642 return 0; 643 } 644 645 sector = BM_BIT_TO_SECT(bit); 646 647 if (drbd_rs_should_slow_down(device, sector) || 648 drbd_try_rs_begin_io(device, sector)) { 649 device->bm_resync_fo = bit; 650 goto requeue; 651 } 652 device->bm_resync_fo = bit + 1; 653 654 if (unlikely(drbd_bm_test_bit(device, bit) == 0)) { 655 drbd_rs_complete_io(device, sector); 656 goto next_sector; 657 } 658 659#if DRBD_MAX_BIO_SIZE > BM_BLOCK_SIZE 660 /* try to find some adjacent bits. 661 * we stop if we have already the maximum req size. 662 * 663 * Additionally always align bigger requests, in order to 664 * be prepared for all stripe sizes of software RAIDs. 665 */ 666 align = 1; 667 rollback_i = i; 668 while (i < number) { 669 if (size + BM_BLOCK_SIZE > max_bio_size) 670 break; 671 672 /* Be always aligned */ 673 if (sector & ((1<<(align+3))-1)) 674 break; 675 676 /* do not cross extent boundaries */ 677 if (((bit+1) & BM_BLOCKS_PER_BM_EXT_MASK) == 0) 678 break; 679 /* now, is it actually dirty, after all? 
680 * caution, drbd_bm_test_bit is tri-state for some 681 * obscure reason; ( b == 0 ) would get the out-of-band 682 * only accidentally right because of the "oddly sized" 683 * adjustment below */ 684 if (drbd_bm_test_bit(device, bit+1) != 1) 685 break; 686 bit++; 687 size += BM_BLOCK_SIZE; 688 if ((BM_BLOCK_SIZE << align) <= size) 689 align++; 690 i++; 691 } 692 /* if we merged some, 693 * reset the offset to start the next drbd_bm_find_next from */ 694 if (size > BM_BLOCK_SIZE) 695 device->bm_resync_fo = bit + 1; 696#endif 697 698 /* adjust very last sectors, in case we are oddly sized */ 699 if (sector + (size>>9) > capacity) 700 size = (capacity-sector)<<9; 701 if (connection->agreed_pro_version >= 89 && 702 connection->csums_tfm) { 703 switch (read_for_csum(peer_device, sector, size)) { 704 case -EIO: /* Disk failure */ 705 put_ldev(device); 706 return -EIO; 707 case -EAGAIN: /* allocation failed, or ldev busy */ 708 drbd_rs_complete_io(device, sector); 709 device->bm_resync_fo = BM_SECT_TO_BIT(sector); 710 i = rollback_i; 711 goto requeue; 712 case 0: 713 /* everything ok */ 714 break; 715 default: 716 BUG(); 717 } 718 } else { 719 int err; 720 721 inc_rs_pending(device); 722 err = drbd_send_drequest(peer_device, P_RS_DATA_REQUEST, 723 sector, size, ID_SYNCER); 724 if (err) { 725 drbd_err(device, "drbd_send_drequest() failed, aborting...\n"); 726 dec_rs_pending(device); 727 put_ldev(device); 728 return err; 729 } 730 } 731 } 732 733 if (device->bm_resync_fo >= drbd_bm_bits(device)) { 734 /* last syncer _request_ was sent, 735 * but the P_RS_DATA_REPLY not yet received. sync will end (and 736 * next sync group will resume), as soon as we receive the last 737 * resync data block, and the last bit is cleared. 738 * until then resync "work" is "inactive" ... 739 */ 740 put_ldev(device); 741 return 0; 742 } 743 744 requeue: 745 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 746 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 747 put_ldev(device); 748 return 0; 749} 750 751static int make_ov_request(struct drbd_device *device, int cancel) 752{ 753 int number, i, size; 754 sector_t sector; 755 const sector_t capacity = drbd_get_capacity(device->this_bdev); 756 bool stop_sector_reached = false; 757 758 if (unlikely(cancel)) 759 return 1; 760 761 number = drbd_rs_number_requests(device); 762 763 sector = device->ov_position; 764 for (i = 0; i < number; i++) { 765 if (sector >= capacity) 766 return 1; 767 768 /* We check for "finished" only in the reply path: 769 * w_e_end_ov_reply(). 770 * We need to send at least one request out. 
*/ 771 stop_sector_reached = i > 0 772 && verify_can_do_stop_sector(device) 773 && sector >= device->ov_stop_sector; 774 if (stop_sector_reached) 775 break; 776 777 size = BM_BLOCK_SIZE; 778 779 if (drbd_rs_should_slow_down(device, sector) || 780 drbd_try_rs_begin_io(device, sector)) { 781 device->ov_position = sector; 782 goto requeue; 783 } 784 785 if (sector + (size>>9) > capacity) 786 size = (capacity-sector)<<9; 787 788 inc_rs_pending(device); 789 if (drbd_send_ov_request(first_peer_device(device), sector, size)) { 790 dec_rs_pending(device); 791 return 0; 792 } 793 sector += BM_SECT_PER_BIT; 794 } 795 device->ov_position = sector; 796 797 requeue: 798 device->rs_in_flight += (i << (BM_BLOCK_SHIFT - 9)); 799 if (i == 0 || !stop_sector_reached) 800 mod_timer(&device->resync_timer, jiffies + SLEEP_TIME); 801 return 1; 802} 803 804int w_ov_finished(struct drbd_work *w, int cancel) 805{ 806 struct drbd_device_work *dw = 807 container_of(w, struct drbd_device_work, w); 808 struct drbd_device *device = dw->device; 809 kfree(dw); 810 ov_out_of_sync_print(device); 811 drbd_resync_finished(device); 812 813 return 0; 814} 815 816static int w_resync_finished(struct drbd_work *w, int cancel) 817{ 818 struct drbd_device_work *dw = 819 container_of(w, struct drbd_device_work, w); 820 struct drbd_device *device = dw->device; 821 kfree(dw); 822 823 drbd_resync_finished(device); 824 825 return 0; 826} 827 828static void ping_peer(struct drbd_device *device) 829{ 830 struct drbd_connection *connection = first_peer_device(device)->connection; 831 832 clear_bit(GOT_PING_ACK, &connection->flags); 833 request_ping(connection); 834 wait_event(connection->ping_wait, 835 test_bit(GOT_PING_ACK, &connection->flags) || device->state.conn < C_CONNECTED); 836} 837 838int drbd_resync_finished(struct drbd_device *device) 839{ 840 unsigned long db, dt, dbdt; 841 unsigned long n_oos; 842 union drbd_state os, ns; 843 struct drbd_device_work *dw; 844 char *khelper_cmd = NULL; 845 int verify_done = 0; 846 847 /* Remove all elements from the resync LRU. Since future actions 848 * might set bits in the (main) bitmap, then the entries in the 849 * resync LRU would be wrong. */ 850 if (drbd_rs_del_all(device)) { 851 /* In case this is not possible now, most probably because 852 * there are P_RS_DATA_REPLY Packets lingering on the worker's 853 * queue (or even the read operations for those packets 854 * is not finished by now). Retry in 100ms. 
*/ 855 856 schedule_timeout_interruptible(HZ / 10); 857 dw = kmalloc(sizeof(struct drbd_device_work), GFP_ATOMIC); 858 if (dw) { 859 dw->w.cb = w_resync_finished; 860 dw->device = device; 861 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 862 &dw->w); 863 return 1; 864 } 865 drbd_err(device, "Warn failed to drbd_rs_del_all() and to kmalloc(dw).\n"); 866 } 867 868 dt = (jiffies - device->rs_start - device->rs_paused) / HZ; 869 if (dt <= 0) 870 dt = 1; 871 872 db = device->rs_total; 873 /* adjust for verify start and stop sectors, respective reached position */ 874 if (device->state.conn == C_VERIFY_S || device->state.conn == C_VERIFY_T) 875 db -= device->ov_left; 876 877 dbdt = Bit2KB(db/dt); 878 device->rs_paused /= HZ; 879 880 if (!get_ldev(device)) 881 goto out; 882 883 ping_peer(device); 884 885 spin_lock_irq(&device->resource->req_lock); 886 os = drbd_read_state(device); 887 888 verify_done = (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T); 889 890 /* This protects us against multiple calls (that can happen in the presence 891 of application IO), and against connectivity loss just before we arrive here. */ 892 if (os.conn <= C_CONNECTED) 893 goto out_unlock; 894 895 ns = os; 896 ns.conn = C_CONNECTED; 897 898 drbd_info(device, "%s done (total %lu sec; paused %lu sec; %lu K/sec)\n", 899 verify_done ? "Online verify" : "Resync", 900 dt + device->rs_paused, device->rs_paused, dbdt); 901 902 n_oos = drbd_bm_total_weight(device); 903 904 if (os.conn == C_VERIFY_S || os.conn == C_VERIFY_T) { 905 if (n_oos) { 906 drbd_alert(device, "Online verify found %lu %dk block out of sync!\n", 907 n_oos, Bit2KB(1)); 908 khelper_cmd = "out-of-sync"; 909 } 910 } else { 911 D_ASSERT(device, (n_oos - device->rs_failed) == 0); 912 913 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) 914 khelper_cmd = "after-resync-target"; 915 916 if (first_peer_device(device)->connection->csums_tfm && device->rs_total) { 917 const unsigned long s = device->rs_same_csum; 918 const unsigned long t = device->rs_total; 919 const int ratio = 920 (t == 0) ? 0 : 921 (t < 100000) ? ((s*100)/t) : (s/(t/100)); 922 drbd_info(device, "%u %% had equal checksums, eliminated: %luK; " 923 "transferred %luK total %luK\n", 924 ratio, 925 Bit2KB(device->rs_same_csum), 926 Bit2KB(device->rs_total - device->rs_same_csum), 927 Bit2KB(device->rs_total)); 928 } 929 } 930 931 if (device->rs_failed) { 932 drbd_info(device, " %lu failed blocks\n", device->rs_failed); 933 934 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 935 ns.disk = D_INCONSISTENT; 936 ns.pdsk = D_UP_TO_DATE; 937 } else { 938 ns.disk = D_UP_TO_DATE; 939 ns.pdsk = D_INCONSISTENT; 940 } 941 } else { 942 ns.disk = D_UP_TO_DATE; 943 ns.pdsk = D_UP_TO_DATE; 944 945 if (os.conn == C_SYNC_TARGET || os.conn == C_PAUSED_SYNC_T) { 946 if (device->p_uuid) { 947 int i; 948 for (i = UI_BITMAP ; i <= UI_HISTORY_END ; i++) 949 _drbd_uuid_set(device, i, device->p_uuid[i]); 950 drbd_uuid_set(device, UI_BITMAP, device->ldev->md.uuid[UI_CURRENT]); 951 _drbd_uuid_set(device, UI_CURRENT, device->p_uuid[UI_CURRENT]); 952 } else { 953 drbd_err(device, "device->p_uuid is NULL! BUG\n"); 954 } 955 } 956 957 if (!(os.conn == C_VERIFY_S || os.conn == C_VERIFY_T)) { 958 /* for verify runs, we don't update uuids here, 959 * so there would be nothing to report. */ 960 drbd_uuid_set_bm(device, 0UL); 961 drbd_print_uuids(device, "updated UUIDs"); 962 if (device->p_uuid) { 963 /* Now the two UUID sets are equal, update what we 964 * know of the peer. 
*/ 965 int i; 966 for (i = UI_CURRENT ; i <= UI_HISTORY_END ; i++) 967 device->p_uuid[i] = device->ldev->md.uuid[i]; 968 } 969 } 970 } 971 972 _drbd_set_state(device, ns, CS_VERBOSE, NULL); 973out_unlock: 974 spin_unlock_irq(&device->resource->req_lock); 975 put_ldev(device); 976out: 977 device->rs_total = 0; 978 device->rs_failed = 0; 979 device->rs_paused = 0; 980 981 /* reset start sector, if we reached end of device */ 982 if (verify_done && device->ov_left == 0) 983 device->ov_start_sector = 0; 984 985 drbd_md_sync(device); 986 987 if (khelper_cmd) 988 drbd_khelper(device, khelper_cmd); 989 990 return 1; 991} 992 993/* helper */ 994static void move_to_net_ee_or_free(struct drbd_device *device, struct drbd_peer_request *peer_req) 995{ 996 if (drbd_peer_req_has_active_page(peer_req)) { 997 /* This might happen if sendpage() has not finished */ 998 int i = (peer_req->i.size + PAGE_SIZE -1) >> PAGE_SHIFT; 999 atomic_add(i, &device->pp_in_use_by_net); 1000 atomic_sub(i, &device->pp_in_use); 1001 spin_lock_irq(&device->resource->req_lock); 1002 list_add_tail(&peer_req->w.list, &device->net_ee); 1003 spin_unlock_irq(&device->resource->req_lock); 1004 wake_up(&drbd_pp_wait); 1005 } else 1006 drbd_free_peer_req(device, peer_req); 1007} 1008 1009/** 1010 * w_e_end_data_req() - Worker callback, to send a P_DATA_REPLY packet in response to a P_DATA_REQUEST 1011 * @device: DRBD device. 1012 * @w: work object. 1013 * @cancel: The connection will be closed anyways 1014 */ 1015int w_e_end_data_req(struct drbd_work *w, int cancel) 1016{ 1017 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1018 struct drbd_peer_device *peer_device = peer_req->peer_device; 1019 struct drbd_device *device = peer_device->device; 1020 int err; 1021 1022 if (unlikely(cancel)) { 1023 drbd_free_peer_req(device, peer_req); 1024 dec_unacked(device); 1025 return 0; 1026 } 1027 1028 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1029 err = drbd_send_block(peer_device, P_DATA_REPLY, peer_req); 1030 } else { 1031 if (__ratelimit(&drbd_ratelimit_state)) 1032 drbd_err(device, "Sending NegDReply. sector=%llus.\n", 1033 (unsigned long long)peer_req->i.sector); 1034 1035 err = drbd_send_ack(peer_device, P_NEG_DREPLY, peer_req); 1036 } 1037 1038 dec_unacked(device); 1039 1040 move_to_net_ee_or_free(device, peer_req); 1041 1042 if (unlikely(err)) 1043 drbd_err(device, "drbd_send_block() failed\n"); 1044 return err; 1045} 1046 1047/** 1048 * w_e_end_rsdata_req() - Worker callback to send a P_RS_DATA_REPLY packet in response to a P_RS_DATA_REQUEST 1049 * @w: work object. 
1050 * @cancel: The connection will be closed anyways 1051 */ 1052int w_e_end_rsdata_req(struct drbd_work *w, int cancel) 1053{ 1054 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1055 struct drbd_peer_device *peer_device = peer_req->peer_device; 1056 struct drbd_device *device = peer_device->device; 1057 int err; 1058 1059 if (unlikely(cancel)) { 1060 drbd_free_peer_req(device, peer_req); 1061 dec_unacked(device); 1062 return 0; 1063 } 1064 1065 if (get_ldev_if_state(device, D_FAILED)) { 1066 drbd_rs_complete_io(device, peer_req->i.sector); 1067 put_ldev(device); 1068 } 1069 1070 if (device->state.conn == C_AHEAD) { 1071 err = drbd_send_ack(peer_device, P_RS_CANCEL, peer_req); 1072 } else if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1073 if (likely(device->state.pdsk >= D_INCONSISTENT)) { 1074 inc_rs_pending(device); 1075 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1076 } else { 1077 if (__ratelimit(&drbd_ratelimit_state)) 1078 drbd_err(device, "Not sending RSDataReply, " 1079 "partner DISKLESS!\n"); 1080 err = 0; 1081 } 1082 } else { 1083 if (__ratelimit(&drbd_ratelimit_state)) 1084 drbd_err(device, "Sending NegRSDReply. sector %llus.\n", 1085 (unsigned long long)peer_req->i.sector); 1086 1087 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1088 1089 /* update resync data with failure */ 1090 drbd_rs_failed_io(device, peer_req->i.sector, peer_req->i.size); 1091 } 1092 1093 dec_unacked(device); 1094 1095 move_to_net_ee_or_free(device, peer_req); 1096 1097 if (unlikely(err)) 1098 drbd_err(device, "drbd_send_block() failed\n"); 1099 return err; 1100} 1101 1102int w_e_end_csum_rs_req(struct drbd_work *w, int cancel) 1103{ 1104 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1105 struct drbd_peer_device *peer_device = peer_req->peer_device; 1106 struct drbd_device *device = peer_device->device; 1107 struct digest_info *di; 1108 int digest_size; 1109 void *digest = NULL; 1110 int err, eq = 0; 1111 1112 if (unlikely(cancel)) { 1113 drbd_free_peer_req(device, peer_req); 1114 dec_unacked(device); 1115 return 0; 1116 } 1117 1118 if (get_ldev(device)) { 1119 drbd_rs_complete_io(device, peer_req->i.sector); 1120 put_ldev(device); 1121 } 1122 1123 di = peer_req->digest; 1124 1125 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1126 /* quick hack to try to avoid a race against reconfiguration. 1127 * a real fix would be much more involved, 1128 * introducing more locking mechanisms */ 1129 if (peer_device->connection->csums_tfm) { 1130 digest_size = crypto_hash_digestsize(peer_device->connection->csums_tfm); 1131 D_ASSERT(device, digest_size == di->digest_size); 1132 digest = kmalloc(digest_size, GFP_NOIO); 1133 } 1134 if (digest) { 1135 drbd_csum_ee(peer_device->connection->csums_tfm, peer_req, digest); 1136 eq = !memcmp(digest, di->digest, digest_size); 1137 kfree(digest); 1138 } 1139 1140 if (eq) { 1141 drbd_set_in_sync(device, peer_req->i.sector, peer_req->i.size); 1142 /* rs_same_csums unit is BM_BLOCK_SIZE */ 1143 device->rs_same_csum += peer_req->i.size >> BM_BLOCK_SHIFT; 1144 err = drbd_send_ack(peer_device, P_RS_IS_IN_SYNC, peer_req); 1145 } else { 1146 inc_rs_pending(device); 1147 peer_req->block_id = ID_SYNCER; /* By setting block_id, digest pointer becomes invalid! 
*/ 1148 peer_req->flags &= ~EE_HAS_DIGEST; /* This peer request no longer has a digest pointer */ 1149 kfree(di); 1150 err = drbd_send_block(peer_device, P_RS_DATA_REPLY, peer_req); 1151 } 1152 } else { 1153 err = drbd_send_ack(peer_device, P_NEG_RS_DREPLY, peer_req); 1154 if (__ratelimit(&drbd_ratelimit_state)) 1155 drbd_err(device, "Sending NegDReply. I guess it gets messy.\n"); 1156 } 1157 1158 dec_unacked(device); 1159 move_to_net_ee_or_free(device, peer_req); 1160 1161 if (unlikely(err)) 1162 drbd_err(device, "drbd_send_block/ack() failed\n"); 1163 return err; 1164} 1165 1166int w_e_end_ov_req(struct drbd_work *w, int cancel) 1167{ 1168 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1169 struct drbd_peer_device *peer_device = peer_req->peer_device; 1170 struct drbd_device *device = peer_device->device; 1171 sector_t sector = peer_req->i.sector; 1172 unsigned int size = peer_req->i.size; 1173 int digest_size; 1174 void *digest; 1175 int err = 0; 1176 1177 if (unlikely(cancel)) 1178 goto out; 1179 1180 digest_size = crypto_hash_digestsize(peer_device->connection->verify_tfm); 1181 digest = kmalloc(digest_size, GFP_NOIO); 1182 if (!digest) { 1183 err = 1; /* terminate the connection in case the allocation failed */ 1184 goto out; 1185 } 1186 1187 if (likely(!(peer_req->flags & EE_WAS_ERROR))) 1188 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1189 else 1190 memset(digest, 0, digest_size); 1191 1192 /* Free e and pages before send. 1193 * In case we block on congestion, we could otherwise run into 1194 * some distributed deadlock, if the other side blocks on 1195 * congestion as well, because our receiver blocks in 1196 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1197 drbd_free_peer_req(device, peer_req); 1198 peer_req = NULL; 1199 inc_rs_pending(device); 1200 err = drbd_send_drequest_csum(peer_device, sector, size, digest, digest_size, P_OV_REPLY); 1201 if (err) 1202 dec_rs_pending(device); 1203 kfree(digest); 1204 1205out: 1206 if (peer_req) 1207 drbd_free_peer_req(device, peer_req); 1208 dec_unacked(device); 1209 return err; 1210} 1211 1212void drbd_ov_out_of_sync_found(struct drbd_device *device, sector_t sector, int size) 1213{ 1214 if (device->ov_last_oos_start + device->ov_last_oos_size == sector) { 1215 device->ov_last_oos_size += size>>9; 1216 } else { 1217 device->ov_last_oos_start = sector; 1218 device->ov_last_oos_size = size>>9; 1219 } 1220 drbd_set_out_of_sync(device, sector, size); 1221} 1222 1223int w_e_end_ov_reply(struct drbd_work *w, int cancel) 1224{ 1225 struct drbd_peer_request *peer_req = container_of(w, struct drbd_peer_request, w); 1226 struct drbd_peer_device *peer_device = peer_req->peer_device; 1227 struct drbd_device *device = peer_device->device; 1228 struct digest_info *di; 1229 void *digest; 1230 sector_t sector = peer_req->i.sector; 1231 unsigned int size = peer_req->i.size; 1232 int digest_size; 1233 int err, eq = 0; 1234 bool stop_sector_reached = false; 1235 1236 if (unlikely(cancel)) { 1237 drbd_free_peer_req(device, peer_req); 1238 dec_unacked(device); 1239 return 0; 1240 } 1241 1242 /* after "cancel", because after drbd_disconnect/drbd_rs_cancel_all 1243 * the resync lru has been cleaned up already */ 1244 if (get_ldev(device)) { 1245 drbd_rs_complete_io(device, peer_req->i.sector); 1246 put_ldev(device); 1247 } 1248 1249 di = peer_req->digest; 1250 1251 if (likely((peer_req->flags & EE_WAS_ERROR) == 0)) { 1252 digest_size = 
crypto_hash_digestsize(peer_device->connection->verify_tfm); 1253 digest = kmalloc(digest_size, GFP_NOIO); 1254 if (digest) { 1255 drbd_csum_ee(peer_device->connection->verify_tfm, peer_req, digest); 1256 1257 D_ASSERT(device, digest_size == di->digest_size); 1258 eq = !memcmp(digest, di->digest, digest_size); 1259 kfree(digest); 1260 } 1261 } 1262 1263 /* Free peer_req and pages before send. 1264 * In case we block on congestion, we could otherwise run into 1265 * some distributed deadlock, if the other side blocks on 1266 * congestion as well, because our receiver blocks in 1267 * drbd_alloc_pages due to pp_in_use > max_buffers. */ 1268 drbd_free_peer_req(device, peer_req); 1269 if (!eq) 1270 drbd_ov_out_of_sync_found(device, sector, size); 1271 else 1272 ov_out_of_sync_print(device); 1273 1274 err = drbd_send_ack_ex(peer_device, P_OV_RESULT, sector, size, 1275 eq ? ID_IN_SYNC : ID_OUT_OF_SYNC); 1276 1277 dec_unacked(device); 1278 1279 --device->ov_left; 1280 1281 /* let's advance progress step marks only for every other megabyte */ 1282 if ((device->ov_left & 0x200) == 0x200) 1283 drbd_advance_rs_marks(device, device->ov_left); 1284 1285 stop_sector_reached = verify_can_do_stop_sector(device) && 1286 (sector + (size>>9)) >= device->ov_stop_sector; 1287 1288 if (device->ov_left == 0 || stop_sector_reached) { 1289 ov_out_of_sync_print(device); 1290 drbd_resync_finished(device); 1291 } 1292 1293 return err; 1294} 1295 1296/* FIXME 1297 * We need to track the number of pending barrier acks, 1298 * and to be able to wait for them. 1299 * See also comment in drbd_adm_attach before drbd_suspend_io. 1300 */ 1301static int drbd_send_barrier(struct drbd_connection *connection) 1302{ 1303 struct p_barrier *p; 1304 struct drbd_socket *sock; 1305 1306 sock = &connection->data; 1307 p = conn_prepare_command(connection, sock); 1308 if (!p) 1309 return -EIO; 1310 p->barrier = connection->send.current_epoch_nr; 1311 p->pad = 0; 1312 connection->send.current_epoch_writes = 0; 1313 1314 return conn_send_command(connection, sock, P_BARRIER, sizeof(*p), NULL, 0); 1315} 1316 1317int w_send_write_hint(struct drbd_work *w, int cancel) 1318{ 1319 struct drbd_device *device = 1320 container_of(w, struct drbd_device, unplug_work); 1321 struct drbd_socket *sock; 1322 1323 if (cancel) 1324 return 0; 1325 sock = &first_peer_device(device)->connection->data; 1326 if (!drbd_prepare_command(first_peer_device(device), sock)) 1327 return -EIO; 1328 return drbd_send_command(first_peer_device(device), sock, P_UNPLUG_REMOTE, 0, NULL, 0); 1329} 1330 1331static void re_init_if_first_write(struct drbd_connection *connection, unsigned int epoch) 1332{ 1333 if (!connection->send.seen_any_write_yet) { 1334 connection->send.seen_any_write_yet = true; 1335 connection->send.current_epoch_nr = epoch; 1336 connection->send.current_epoch_writes = 0; 1337 } 1338} 1339 1340static void maybe_send_barrier(struct drbd_connection *connection, unsigned int epoch) 1341{ 1342 /* re-init if first write on this connection */ 1343 if (!connection->send.seen_any_write_yet) 1344 return; 1345 if (connection->send.current_epoch_nr != epoch) { 1346 if (connection->send.current_epoch_writes) 1347 drbd_send_barrier(connection); 1348 connection->send.current_epoch_nr = epoch; 1349 } 1350} 1351 1352int w_send_out_of_sync(struct drbd_work *w, int cancel) 1353{ 1354 struct drbd_request *req = container_of(w, struct drbd_request, w); 1355 struct drbd_device *device = req->device; 1356 struct drbd_peer_device *const peer_device = first_peer_device(device); 
1357 struct drbd_connection *const connection = peer_device->connection; 1358 int err; 1359 1360 if (unlikely(cancel)) { 1361 req_mod(req, SEND_CANCELED); 1362 return 0; 1363 } 1364 1365 /* this time, no connection->send.current_epoch_writes++; 1366 * If it was sent, it was the closing barrier for the last 1367 * replicated epoch, before we went into AHEAD mode. 1368 * No more barriers will be sent, until we leave AHEAD mode again. */ 1369 maybe_send_barrier(connection, req->epoch); 1370 1371 err = drbd_send_out_of_sync(peer_device, req); 1372 req_mod(req, OOS_HANDED_TO_NETWORK); 1373 1374 return err; 1375} 1376 1377/** 1378 * w_send_dblock() - Worker callback to send a P_DATA packet in order to mirror a write request 1379 * @w: work object. 1380 * @cancel: The connection will be closed anyways 1381 */ 1382int w_send_dblock(struct drbd_work *w, int cancel) 1383{ 1384 struct drbd_request *req = container_of(w, struct drbd_request, w); 1385 struct drbd_device *device = req->device; 1386 struct drbd_peer_device *const peer_device = first_peer_device(device); 1387 struct drbd_connection *connection = peer_device->connection; 1388 int err; 1389 1390 if (unlikely(cancel)) { 1391 req_mod(req, SEND_CANCELED); 1392 return 0; 1393 } 1394 1395 re_init_if_first_write(connection, req->epoch); 1396 maybe_send_barrier(connection, req->epoch); 1397 connection->send.current_epoch_writes++; 1398 1399 err = drbd_send_dblock(peer_device, req); 1400 req_mod(req, err ? SEND_FAILED : HANDED_OVER_TO_NETWORK); 1401 1402 return err; 1403} 1404 1405/** 1406 * w_send_read_req() - Worker callback to send a read request (P_DATA_REQUEST) packet 1407 * @w: work object. 1408 * @cancel: The connection will be closed anyways 1409 */ 1410int w_send_read_req(struct drbd_work *w, int cancel) 1411{ 1412 struct drbd_request *req = container_of(w, struct drbd_request, w); 1413 struct drbd_device *device = req->device; 1414 struct drbd_peer_device *const peer_device = first_peer_device(device); 1415 struct drbd_connection *connection = peer_device->connection; 1416 int err; 1417 1418 if (unlikely(cancel)) { 1419 req_mod(req, SEND_CANCELED); 1420 return 0; 1421 } 1422 1423 /* Even read requests may close a write epoch, 1424 * if there was any yet. */ 1425 maybe_send_barrier(connection, req->epoch); 1426 1427 err = drbd_send_drequest(peer_device, P_DATA_REQUEST, req->i.sector, req->i.size, 1428 (unsigned long)req); 1429 1430 req_mod(req, err ? 
SEND_FAILED : HANDED_OVER_TO_NETWORK); 1431 1432 return err; 1433} 1434 1435int w_restart_disk_io(struct drbd_work *w, int cancel) 1436{ 1437 struct drbd_request *req = container_of(w, struct drbd_request, w); 1438 struct drbd_device *device = req->device; 1439 1440 if (bio_data_dir(req->master_bio) == WRITE && req->rq_state & RQ_IN_ACT_LOG) 1441 drbd_al_begin_io(device, &req->i, false); 1442 1443 drbd_req_make_private_bio(req, req->master_bio); 1444 req->private_bio->bi_bdev = device->ldev->backing_bdev; 1445 generic_make_request(req->private_bio); 1446 1447 return 0; 1448} 1449 1450static int _drbd_may_sync_now(struct drbd_device *device) 1451{ 1452 struct drbd_device *odev = device; 1453 int resync_after; 1454 1455 while (1) { 1456 if (!odev->ldev || odev->state.disk == D_DISKLESS) 1457 return 1; 1458 rcu_read_lock(); 1459 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1460 rcu_read_unlock(); 1461 if (resync_after == -1) 1462 return 1; 1463 odev = minor_to_device(resync_after); 1464 if (!odev) 1465 return 1; 1466 if ((odev->state.conn >= C_SYNC_SOURCE && 1467 odev->state.conn <= C_PAUSED_SYNC_T) || 1468 odev->state.aftr_isp || odev->state.peer_isp || 1469 odev->state.user_isp) 1470 return 0; 1471 } 1472} 1473 1474/** 1475 * _drbd_pause_after() - Pause resync on all devices that may not resync now 1476 * @device: DRBD device. 1477 * 1478 * Called from process context only (admin command and after_state_ch). 1479 */ 1480static int _drbd_pause_after(struct drbd_device *device) 1481{ 1482 struct drbd_device *odev; 1483 int i, rv = 0; 1484 1485 rcu_read_lock(); 1486 idr_for_each_entry(&drbd_devices, odev, i) { 1487 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1488 continue; 1489 if (!_drbd_may_sync_now(odev)) 1490 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 1), CS_HARD, NULL) 1491 != SS_NOTHING_TO_DO); 1492 } 1493 rcu_read_unlock(); 1494 1495 return rv; 1496} 1497 1498/** 1499 * _drbd_resume_next() - Resume resync on all devices that may resync now 1500 * @device: DRBD device. 1501 * 1502 * Called from process context only (admin command and worker). 
1503 */ 1504static int _drbd_resume_next(struct drbd_device *device) 1505{ 1506 struct drbd_device *odev; 1507 int i, rv = 0; 1508 1509 rcu_read_lock(); 1510 idr_for_each_entry(&drbd_devices, odev, i) { 1511 if (odev->state.conn == C_STANDALONE && odev->state.disk == D_DISKLESS) 1512 continue; 1513 if (odev->state.aftr_isp) { 1514 if (_drbd_may_sync_now(odev)) 1515 rv |= (__drbd_set_state(_NS(odev, aftr_isp, 0), 1516 CS_HARD, NULL) 1517 != SS_NOTHING_TO_DO) ; 1518 } 1519 } 1520 rcu_read_unlock(); 1521 return rv; 1522} 1523 1524void resume_next_sg(struct drbd_device *device) 1525{ 1526 write_lock_irq(&global_state_lock); 1527 _drbd_resume_next(device); 1528 write_unlock_irq(&global_state_lock); 1529} 1530 1531void suspend_other_sg(struct drbd_device *device) 1532{ 1533 write_lock_irq(&global_state_lock); 1534 _drbd_pause_after(device); 1535 write_unlock_irq(&global_state_lock); 1536} 1537 1538/* caller must hold global_state_lock */ 1539enum drbd_ret_code drbd_resync_after_valid(struct drbd_device *device, int o_minor) 1540{ 1541 struct drbd_device *odev; 1542 int resync_after; 1543 1544 if (o_minor == -1) 1545 return NO_ERROR; 1546 if (o_minor < -1 || o_minor > MINORMASK) 1547 return ERR_RESYNC_AFTER; 1548 1549 /* check for loops */ 1550 odev = minor_to_device(o_minor); 1551 while (1) { 1552 if (odev == device) 1553 return ERR_RESYNC_AFTER_CYCLE; 1554 1555 /* You are free to depend on diskless, non-existing, 1556 * or not yet/no longer existing minors. 1557 * We only reject dependency loops. 1558 * We cannot follow the dependency chain beyond a detached or 1559 * missing minor. 1560 */ 1561 if (!odev || !odev->ldev || odev->state.disk == D_DISKLESS) 1562 return NO_ERROR; 1563 1564 rcu_read_lock(); 1565 resync_after = rcu_dereference(odev->ldev->disk_conf)->resync_after; 1566 rcu_read_unlock(); 1567 /* dependency chain ends here, no cycles. */ 1568 if (resync_after == -1) 1569 return NO_ERROR; 1570 1571 /* follow the dependency chain */ 1572 odev = minor_to_device(resync_after); 1573 } 1574} 1575 1576/* caller must hold global_state_lock */ 1577void drbd_resync_after_changed(struct drbd_device *device) 1578{ 1579 int changes; 1580 1581 do { 1582 changes = _drbd_pause_after(device); 1583 changes |= _drbd_resume_next(device); 1584 } while (changes); 1585} 1586 1587void drbd_rs_controller_reset(struct drbd_device *device) 1588{ 1589 struct fifo_buffer *plan; 1590 1591 atomic_set(&device->rs_sect_in, 0); 1592 atomic_set(&device->rs_sect_ev, 0); 1593 device->rs_in_flight = 0; 1594 1595 /* Updating the RCU protected object in place is necessary since 1596 this function gets called from atomic context. 
1597 It is valid since all other updates also lead to an completely 1598 empty fifo */ 1599 rcu_read_lock(); 1600 plan = rcu_dereference(device->rs_plan_s); 1601 plan->total = 0; 1602 fifo_set(plan, 0); 1603 rcu_read_unlock(); 1604} 1605 1606void start_resync_timer_fn(unsigned long data) 1607{ 1608 struct drbd_device *device = (struct drbd_device *) data; 1609 1610 drbd_queue_work(&first_peer_device(device)->connection->sender_work, 1611 &device->start_resync_work); 1612} 1613 1614int w_start_resync(struct drbd_work *w, int cancel) 1615{ 1616 struct drbd_device *device = 1617 container_of(w, struct drbd_device, start_resync_work); 1618 1619 if (atomic_read(&device->unacked_cnt) || atomic_read(&device->rs_pending_cnt)) { 1620 drbd_warn(device, "w_start_resync later...\n"); 1621 device->start_resync_timer.expires = jiffies + HZ/10; 1622 add_timer(&device->start_resync_timer); 1623 return 0; 1624 } 1625 1626 drbd_start_resync(device, C_SYNC_SOURCE); 1627 clear_bit(AHEAD_TO_SYNC_SOURCE, &device->flags); 1628 return 0; 1629} 1630 1631/** 1632 * drbd_start_resync() - Start the resync process 1633 * @device: DRBD device. 1634 * @side: Either C_SYNC_SOURCE or C_SYNC_TARGET 1635 * 1636 * This function might bring you directly into one of the 1637 * C_PAUSED_SYNC_* states. 1638 */ 1639void drbd_start_resync(struct drbd_device *device, enum drbd_conns side) 1640{ 1641 struct drbd_peer_device *peer_device = first_peer_device(device); 1642 struct drbd_connection *connection = peer_device ? peer_device->connection : NULL; 1643 union drbd_state ns; 1644 int r; 1645 1646 if (device->state.conn >= C_SYNC_SOURCE && device->state.conn < C_AHEAD) { 1647 drbd_err(device, "Resync already running!\n"); 1648 return; 1649 } 1650 1651 if (!test_bit(B_RS_H_DONE, &device->flags)) { 1652 if (side == C_SYNC_TARGET) { 1653 /* Since application IO was locked out during C_WF_BITMAP_T and 1654 C_WF_SYNC_UUID we are still unmodified. Before going to C_SYNC_TARGET 1655 we check that we might make the data inconsistent. */ 1656 r = drbd_khelper(device, "before-resync-target"); 1657 r = (r >> 8) & 0xff; 1658 if (r > 0) { 1659 drbd_info(device, "before-resync-target handler returned %d, " 1660 "dropping connection.\n", r); 1661 conn_request_state(connection, NS(conn, C_DISCONNECTING), CS_HARD); 1662 return; 1663 } 1664 } else /* C_SYNC_SOURCE */ { 1665 r = drbd_khelper(device, "before-resync-source"); 1666 r = (r >> 8) & 0xff; 1667 if (r > 0) { 1668 if (r == 3) { 1669 drbd_info(device, "before-resync-source handler returned %d, " 1670 "ignoring. 
Old userland tools?", r); 1671 } else { 1672 drbd_info(device, "before-resync-source handler returned %d, " 1673 "dropping connection.\n", r); 1674 conn_request_state(connection, 1675 NS(conn, C_DISCONNECTING), CS_HARD); 1676 return; 1677 } 1678 } 1679 } 1680 } 1681 1682 if (current == connection->worker.task) { 1683 /* The worker should not sleep waiting for state_mutex, 1684 that can take long */ 1685 if (!mutex_trylock(device->state_mutex)) { 1686 set_bit(B_RS_H_DONE, &device->flags); 1687 device->start_resync_timer.expires = jiffies + HZ/5; 1688 add_timer(&device->start_resync_timer); 1689 return; 1690 } 1691 } else { 1692 mutex_lock(device->state_mutex); 1693 } 1694 clear_bit(B_RS_H_DONE, &device->flags); 1695 1696 /* req_lock: serialize with drbd_send_and_submit() and others 1697 * global_state_lock: for stable sync-after dependencies */ 1698 spin_lock_irq(&device->resource->req_lock); 1699 write_lock(&global_state_lock); 1700 /* Did some connection breakage or IO error race with us? */ 1701 if (device->state.conn < C_CONNECTED 1702 || !get_ldev_if_state(device, D_NEGOTIATING)) { 1703 write_unlock(&global_state_lock); 1704 spin_unlock_irq(&device->resource->req_lock); 1705 mutex_unlock(device->state_mutex); 1706 return; 1707 } 1708 1709 ns = drbd_read_state(device); 1710 1711 ns.aftr_isp = !_drbd_may_sync_now(device); 1712 1713 ns.conn = side; 1714 1715 if (side == C_SYNC_TARGET) 1716 ns.disk = D_INCONSISTENT; 1717 else /* side == C_SYNC_SOURCE */ 1718 ns.pdsk = D_INCONSISTENT; 1719 1720 r = __drbd_set_state(device, ns, CS_VERBOSE, NULL); 1721 ns = drbd_read_state(device); 1722 1723 if (ns.conn < C_CONNECTED) 1724 r = SS_UNKNOWN_ERROR; 1725 1726 if (r == SS_SUCCESS) { 1727 unsigned long tw = drbd_bm_total_weight(device); 1728 unsigned long now = jiffies; 1729 int i; 1730 1731 device->rs_failed = 0; 1732 device->rs_paused = 0; 1733 device->rs_same_csum = 0; 1734 device->rs_last_events = 0; 1735 device->rs_last_sect_ev = 0; 1736 device->rs_total = tw; 1737 device->rs_start = now; 1738 for (i = 0; i < DRBD_SYNC_MARKS; i++) { 1739 device->rs_mark_left[i] = tw; 1740 device->rs_mark_time[i] = now; 1741 } 1742 _drbd_pause_after(device); 1743 } 1744 write_unlock(&global_state_lock); 1745 spin_unlock_irq(&device->resource->req_lock); 1746 1747 if (r == SS_SUCCESS) { 1748 /* reset rs_last_bcast when a resync or verify is started, 1749 * to deal with potential jiffies wrap. */ 1750 device->rs_last_bcast = jiffies - HZ; 1751 1752 drbd_info(device, "Began resync as %s (will sync %lu KB [%lu bits set]).\n", 1753 drbd_conn_str(ns.conn), 1754 (unsigned long) device->rs_total << (BM_BLOCK_SHIFT-10), 1755 (unsigned long) device->rs_total); 1756 if (side == C_SYNC_TARGET) 1757 device->bm_resync_fo = 0; 1758 1759 /* Since protocol 96, we must serialize drbd_gen_and_send_sync_uuid 1760 * with w_send_oos, or the sync target will get confused as to 1761 * how much bits to resync. We cannot do that always, because for an 1762 * empty resync and protocol < 95, we need to do it here, as we call 1763 * drbd_resync_finished from here in that case. 1764 * We drbd_gen_and_send_sync_uuid here for protocol < 96, 1765 * and from after_state_ch otherwise. */ 1766 if (side == C_SYNC_SOURCE && connection->agreed_pro_version < 96) 1767 drbd_gen_and_send_sync_uuid(peer_device); 1768 1769 if (connection->agreed_pro_version < 95 && device->rs_total == 0) { 1770 /* This still has a race (about when exactly the peers 1771 * detect connection loss) that can lead to a full sync 1772 * on next handshake. 
			 * on next handshake.  In 8.3.9 we fixed this with explicit
			 * resync-finished notifications, but the fix
			 * introduces a protocol change.  Sleeping for some
			 * time longer than the ping interval + timeout on the
			 * SyncSource, to give the SyncTarget the chance to
			 * detect connection loss, then waiting for a ping
			 * response (implicit in drbd_resync_finished) reduces
			 * the race considerably, but does not solve it. */
			if (side == C_SYNC_SOURCE) {
				struct net_conf *nc;
				int timeo;

				rcu_read_lock();
				nc = rcu_dereference(connection->net_conf);
				timeo = nc->ping_int * HZ + nc->ping_timeo * HZ / 9;
				rcu_read_unlock();
				schedule_timeout_interruptible(timeo);
			}
			drbd_resync_finished(device);
		}

		drbd_rs_controller_reset(device);
		/* ns.conn may already be != device->state.conn,
		 * we may have been paused in between, or become paused until
		 * the timer triggers.
		 * No matter, that is handled in resync_timer_fn() */
		if (ns.conn == C_SYNC_TARGET)
			mod_timer(&device->resync_timer, jiffies);

		drbd_md_sync(device);
	}
	put_ldev(device);
	mutex_unlock(device->state_mutex);
}

static void update_on_disk_bitmap(struct drbd_device *device)
{
	struct sib_info sib = { .sib_reason = SIB_SYNC_PROGRESS, };
	device->rs_last_bcast = jiffies;

	if (!get_ldev(device))
		return;

	drbd_bm_write_lazy(device, 0);
	if (drbd_bm_total_weight(device) <= device->rs_failed)
		drbd_resync_finished(device);
	drbd_bcast_event(device, &sib);
	/* update timestamp, in case it took a while to write out stuff */
	device->rs_last_bcast = jiffies;
	put_ldev(device);
}

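/* A lazy bitmap writeout is wanted only while this device is in one of the
 * (possibly paused) resync states, and then only if the resync has just
 * finished (nothing left to sync but failed blocks), or if the last
 * writeout was more than two seconds ago. */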
bool wants_lazy_bitmap_update(struct drbd_device *device)
{
	enum drbd_conns connection_state = device->state.conn;
	return
	/* only do a lazy writeout, if device is in some resync state */
	   (connection_state == C_SYNC_SOURCE
	||  connection_state == C_SYNC_TARGET
	||  connection_state == C_PAUSED_SYNC_S
	||  connection_state == C_PAUSED_SYNC_T) &&
	/* AND
	 * either we just finished, or the last lazy update
	 * was some time ago already. */
	   (drbd_bm_total_weight(device) <= device->rs_failed
	||  time_after(jiffies, device->rs_last_bcast + 2*HZ));
}

static void try_update_all_on_disk_bitmaps(struct drbd_connection *connection)
{
	struct drbd_peer_device *peer_device;
	int vnr;

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		if (!wants_lazy_bitmap_update(device))
			continue;
		kref_get(&device->kref);
		rcu_read_unlock();
		update_on_disk_bitmap(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();
}

static bool dequeue_work_batch(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	list_splice_init(&queue->q, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

static bool dequeue_work_item(struct drbd_work_queue *queue, struct list_head *work_list)
{
	spin_lock_irq(&queue->q_lock);
	if (!list_empty(&queue->q))
		list_move(queue->q.next, work_list);
	spin_unlock_irq(&queue->q_lock);
	return !list_empty(work_list);
}

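/* Wait until there is work for the sender.
 * Dequeue a single work item if one is already queued.  Otherwise uncork
 * the data socket (if tcp-cork is in use) and sleep until new work is
 * queued or a signal is pending.  While waiting, send the epoch separating
 * barrier if the current transfer log epoch still needs to be closed, and
 * opportunistically write out bitmap pages of resyncing devices.
 * Before returning, restore the cork state according to the current
 * net_conf. */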
static void wait_for_work(struct drbd_connection *connection, struct list_head *work_list)
{
	DEFINE_WAIT(wait);
	struct net_conf *nc;
	int uncork, cork;

	dequeue_work_item(&connection->sender_work, work_list);
	if (!list_empty(work_list))
		return;

	/* Still nothing to do?
	 * Maybe we still need to close the current epoch,
	 * even if no new requests are queued yet.
	 *
	 * Also, poke TCP, just in case.
	 * Then wait for new work (or signal). */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	uncork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	if (uncork) {
		mutex_lock(&connection->data.mutex);
		if (connection->data.socket)
			drbd_tcp_uncork(connection->data.socket);
		mutex_unlock(&connection->data.mutex);
	}

	for (;;) {
		int send_barrier;
		prepare_to_wait(&connection->sender_work.q_wait, &wait, TASK_INTERRUPTIBLE);
		spin_lock_irq(&connection->resource->req_lock);
		spin_lock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		/* dequeue single item only,
		 * we still use drbd_queue_work_front() in some places */
		if (!list_empty(&connection->sender_work.q))
			list_move(connection->sender_work.q.next, work_list);
		spin_unlock(&connection->sender_work.q_lock);	/* FIXME get rid of this one? */
		if (!list_empty(work_list) || signal_pending(current)) {
			spin_unlock_irq(&connection->resource->req_lock);
			break;
		}

		/* We found nothing new to do, no to-be-communicated request,
		 * no other work item.  We may still need to close the last
		 * epoch.  Next incoming request epoch will be connection ->
		 * current transfer log epoch number.  If that is different
		 * from the epoch of the last request we communicated, it is
		 * safe to send the epoch separating barrier now.
		 */
		send_barrier =
			atomic_read(&connection->current_tle_nr) !=
			connection->send.current_epoch_nr;
		spin_unlock_irq(&connection->resource->req_lock);

		if (send_barrier)
			maybe_send_barrier(connection,
					connection->send.current_epoch_nr + 1);
		/* drbd_send() may have called flush_signals() */
		if (get_t_state(&connection->worker) != RUNNING)
			break;
		schedule();
		/* may be woken up for other things but new work, too,
		 * e.g. if the current epoch got closed.
		 * In which case we send the barrier above. */

		try_update_all_on_disk_bitmaps(connection);
	}
	finish_wait(&connection->sender_work.q_wait, &wait);

	/* someone may have changed the config while we have been waiting above. */
	rcu_read_lock();
	nc = rcu_dereference(connection->net_conf);
	cork = nc ? nc->tcp_cork : 0;
	rcu_read_unlock();
	mutex_lock(&connection->data.mutex);
	if (connection->data.socket) {
		if (cork)
			drbd_tcp_cork(connection->data.socket);
		else if (!uncork)
			drbd_tcp_uncork(connection->data.socket);
	}
	mutex_unlock(&connection->data.mutex);
}

/* Main loop of the per-connection worker thread. */
int drbd_worker(struct drbd_thread *thi)
{
	struct drbd_connection *connection = thi->connection;
	struct drbd_work *w = NULL;
	struct drbd_peer_device *peer_device;
	LIST_HEAD(work_list);
	int vnr;

	while (get_t_state(thi) == RUNNING) {
		drbd_thread_current_set_cpu(thi);

		/* as long as we use drbd_queue_work_front(),
		 * we may only dequeue single work items here, not batches. */
		if (list_empty(&work_list))
			wait_for_work(connection, &work_list);

		if (signal_pending(current)) {
			flush_signals(current);
			if (get_t_state(thi) == RUNNING) {
				drbd_warn(connection, "Worker got an unexpected signal\n");
				continue;
			}
			break;
		}

		if (get_t_state(thi) != RUNNING)
			break;

		while (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			if (w->cb(w, connection->cstate < C_WF_REPORT_PARAMS) == 0)
				continue;
			if (connection->cstate >= C_WF_REPORT_PARAMS)
				conn_request_state(connection, NS(conn, C_NETWORK_FAILURE), CS_HARD);
		}
	}

	/* drain any remaining work, calling the callbacks with cancel == 1 */
	do {
		while (!list_empty(&work_list)) {
			w = list_first_entry(&work_list, struct drbd_work, list);
			list_del_init(&w->list);
			w->cb(w, 1);
		}
		dequeue_work_batch(&connection->sender_work, &work_list);
	} while (!list_empty(&work_list));

	rcu_read_lock();
	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
		struct drbd_device *device = peer_device->device;
		D_ASSERT(device, device->state.disk == D_DISKLESS && device->state.conn == C_STANDALONE);
		kref_get(&device->kref);
		rcu_read_unlock();
		drbd_device_cleanup(device);
		kref_put(&device->kref, drbd_destroy_device);
		rcu_read_lock();
	}
	rcu_read_unlock();

	return 0;
}