1// Copyright 2006 Google Inc. All Rights Reserved.
2
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6
7//      http://www.apache.org/licenses/LICENSE-2.0
8
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// worker.cc : individual tasks that can be run in combination to
16// stress the system
17
18#include <errno.h>
19#include <pthread.h>
20#include <sched.h>
21#include <signal.h>
22#include <stdlib.h>
23#include <stdio.h>
24#include <stdint.h>
25#include <string.h>
26#include <time.h>
27#include <unistd.h>
28
29#include <sys/select.h>
30#include <sys/stat.h>
31#include <sys/types.h>
32#include <sys/times.h>
33
34// These are necessary, but on by default
35// #define __USE_GNU
36// #define __USE_LARGEFILE64
37#include <fcntl.h>
38#include <sys/socket.h>
39#include <netdb.h>
40#include <arpa/inet.h>
41#include <linux/unistd.h>  // for gettid
42
43// For size of block device
44#include <sys/ioctl.h>
45#include <linux/fs.h>
46// For asynchronous I/O
47#ifdef HAVE_LIBAIO_H
48#include <libaio.h>
49#endif
50
51#include <sys/syscall.h>
52
53#include <set>
54#include <string>
55
56// This file must work with autoconf on its public version,
57// so these includes are correct.
58#include "error_diag.h"  // NOLINT
59#include "os.h"          // NOLINT
60#include "pattern.h"     // NOLINT
61#include "queue.h"       // NOLINT
62#include "sat.h"         // NOLINT
63#include "sattypes.h"    // NOLINT
64#include "worker.h"      // NOLINT
65
66// Syscalls
// Some distributions' libc headers don't expose __NR_gettid; define a fallback.
68#if !defined(__NR_gettid)
69  #define __NR_gettid             224
70#endif
71
72#define gettid() syscall(__NR_gettid)
73#if !defined(CPU_SETSIZE)
74_syscall3(int, sched_getaffinity, pid_t, pid,
75          unsigned int, len, cpu_set_t*, mask)
76_syscall3(int, sched_setaffinity, pid_t, pid,
77          unsigned int, len, cpu_set_t*, mask)
78#endif
79
80namespace {
81  // Get HW core ID from cpuid instruction.
82  inline int apicid(void) {
83    int cpu;
84#if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
85    __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
86#elif defined(STRESSAPPTEST_CPU_ARMV7A)
87  #warning "Unsupported CPU type ARMV7A: unable to determine core ID."
88    cpu = 0;
89#else
90  #warning "Unsupported CPU type: unable to determine core ID."
91    cpu = 0;
92#endif
93    return (cpu >> 24);
94  }
95
96  // Work around the sad fact that there are two (gnu, xsi) incompatible
97  // versions of strerror_r floating around google. Awesome.
98  bool sat_strerror(int err, char *buf, int len) {
99    buf[0] = 0;
100    char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
101    int retval = reinterpret_cast<int64>(errmsg);
102    if (retval == 0)
103      return true;
104    if (retval == -1)
105      return false;
106    if (errmsg != buf) {
107      strncpy(buf, errmsg, len);
108      buf[len - 1] = 0;
109    }
110    return true;
111  }
112
113
114  inline uint64 addr_to_tag(void *address) {
115    return reinterpret_cast<uint64>(address);
116  }
117}
118
119#if !defined(O_DIRECT)
120// Sometimes this isn't available.
121// Disregard if it's not defined.
122  #define O_DIRECT            0
123#endif
124
125// A struct to hold captured errors, for later reporting.
// One captured miscompare.  Records are queued by CheckRegion() and later
// expanded/printed by ProcessError()/ProcessTagError(), which fill in the
// reread value and the physical-address fields.
struct ErrorRecord {
  uint64 actual;  // This is the actual value read.
  uint64 reread;  // This is the actual value, reread.
  uint64 expected;  // This is what it should have been.
  uint64 *vaddr;  // This is where it was (or wasn't).
  char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
  uint64 paddr;  // This is the bus address, if available.
  uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
  uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
};
136
137// This is a helper function to create new threads with pthreads.
// pthread entry point: unwraps the WorkerThread* passed by SpawnThread()
// and runs its StartRoutine().  Always returns NULL (no exit value used).
static void *ThreadSpawnerGeneric(void *ptr) {
  WorkerThread *worker = static_cast<WorkerThread*>(ptr);
  worker->StartRoutine();
  return NULL;
}
143
// Create the synchronization objects used to coordinate workers.
// Must be called after all workers have been added (num_workers_ is final).
void WorkerStatus::Initialize() {
  sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
  sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
#ifdef _POSIX_BARRIERS
  // Sized for every worker plus the controlling thread, so Pause/Resume
  // can rendezvous with all workers at once.
  sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
                                       num_workers_ + 1));
#endif
}
152
// Tear down the synchronization objects created by Initialize().
// The sat_asserts require that none of them are still in use.
void WorkerStatus::Destroy() {
  sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
  sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
#ifdef _POSIX_BARRIERS
  sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
#endif
}
160
// Ask all workers to pause and wait until every worker has reached the
// pause barrier.  No-op if the status was already PAUSE.
void WorkerStatus::PauseWorkers() {
  // SetStatus() yields the previous status (see Resume/Stop below); only
  // rendezvous on the barrier if this call actually initiated the pause.
  if (SetStatus(PAUSE) != PAUSE)
    WaitOnPauseBarrier();
}
165
// Let paused workers continue.  Only waits on the barrier if the previous
// status really was PAUSE (i.e. workers are parked on the second barrier
// wait inside ContinueRunning()).
void WorkerStatus::ResumeWorkers() {
  if (SetStatus(RUN) == PAUSE)
    WaitOnPauseBarrier();
}
170
// Tell all workers to exit.  If they were paused, release them from the
// pause barrier first so they can observe the STOP status and return.
void WorkerStatus::StopWorkers() {
  if (SetStatus(STOP) == PAUSE)
    WaitOnPauseBarrier();
}
175
// Called periodically by worker loops.  Returns true while the worker
// should keep working, false once STOP has been requested.  Blocks (on
// the pause barrier, twice) while the status is PAUSE.
bool WorkerStatus::ContinueRunning() {
  // This loop is an optimization.  We use it to immediately re-check the status
  // after resuming from a pause, instead of returning and waiting for the next
  // call to this function.
  for (;;) {
    switch (GetStatus()) {
      case RUN:
        return true;
      case PAUSE:
        // Wait for the other workers to call this function so that
        // PauseWorkers() can return.
        WaitOnPauseBarrier();
        // Wait for ResumeWorkers() to be called.
        WaitOnPauseBarrier();
        break;
      case STOP:
        return false;
    }
  }
}
196
197bool WorkerStatus::ContinueRunningNoPause() {
198  return (GetStatus() != STOP);
199}
200
// Deregister the calling worker: decrement num_workers_ and shrink the
// pause barrier to match.  Honors a pending PAUSE first so the controlling
// thread is never left waiting on this (exiting) worker.
void WorkerStatus::RemoveSelf() {
  // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
  for (;;) {
    AcquireStatusReadLock();
    if (status_ != PAUSE)
      break;
    // We need to obey PauseWorkers() just like ContinueRunning() would, so that
    // the other threads won't wait on pause_barrier_ forever.
    ReleaseStatusLock();
    // Wait for the other workers to call this function so that PauseWorkers()
    // can return.
    WaitOnPauseBarrier();
    // Wait for ResumeWorkers() to be called.
    WaitOnPauseBarrier();
  }

  // This lock would be unnecessary if we held a write lock instead of a read
  // lock on status_rwlock_, but that would also force all threads calling
  // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
  AcquireNumWorkersLock();
  // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
  // in use because (status != PAUSE).
#ifdef _POSIX_BARRIERS
  // New size is num_workers_ (i.e. the remaining workers plus the
  // controlling thread, since we are about to decrement).
  sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
  sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
#endif
  --num_workers_;
  ReleaseNumWorkersLock();

  // Release status_rwlock_.
  ReleaseStatusLock();
}
233
234
235// Parent thread class.
// Set safe defaults; real configuration happens later in InitThread().
WorkerThread::WorkerThread() {
  status_ = false;
  pages_copied_ = 0;
  errorcount_ = 0;
  runduration_usec_ = 1;
  priority_ = Normal;
  worker_status_ = NULL;
  thread_spawner_ = &ThreadSpawnerGeneric;  // Default pthread trampoline.
  tag_mode_ = false;
}
246
// No directly-owned resources; nothing to clean up here.
WorkerThread::~WorkerThread() {}
248
249// Constructors. Just init some default values.
// Default to filling zero pages; SetFillPages() supplies the real count.
FillThread::FillThread() {
  num_pages_to_fill_ = 0;
}
253
254// Initialize file name to empty.
// Start with empty names and no in-flight CRC page.
FileThread::FileThread() {
  filename_ = "";
  devicename_ = "";
  pass_ = 0;
  page_io_ = true;
  crc_page_ = -1;   // -1: not currently checking data read back from file.
  local_page_ = NULL;
}
263
264// If file thread used bounce buffer in memory, account for the extra
265// copy for memory bandwidth calculation.
266float FileThread::GetMemoryCopiedData() {
267  if (!os_->normal_mem())
268    return GetCopiedData();
269  else
270    return 0;
271}
272
273// Initialize target hostname to be invalid.
// Mark the target address as not-yet-configured and the socket as unset.
NetworkThread::NetworkThread() {
  snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
  sock_ = 0;
}
278
// Nothing beyond base-class initialization is needed.
// No slave-specific state to set up.
NetworkSlaveThread::NetworkSlaveThread() {
}
282
// Nothing beyond base-class initialization is needed.
// No listener-specific state to set up.
NetworkListenThread::NetworkListenThread() {
}
286
287// Init member variables.
// Wire this worker to its owning Sat instance, OS layer, pattern list and
// shared status object, and register it with the worker pool.
void WorkerThread::InitThread(int thread_num_init,
                              class Sat *sat_init,
                              class OsLayer *os_init,
                              class PatternList *patternlist_init,
                              WorkerStatus *worker_status) {
  sat_assert(worker_status);
  worker_status->AddWorkers(1);

  thread_num_ = thread_num_init;
  sat_ = sat_init;
  os_ = os_init;
  patternlist_ = patternlist_init;
  worker_status_ = worker_status;

  // Default to every CPU available to the process; callers may narrow
  // cpu_mask_ before the thread is spawned.
  AvailableCpus(&cpu_mask_);
  tag_ = 0xffffffff;

  tag_mode_ = sat_->tag_mode();
}
307
308
309// Use pthreads to prioritize a system thread.
// Bind the thread to its CPU mask and log where it landed.  Always
// returns true: a failed bind is logged (priority 11) but not fatal.
bool WorkerThread::InitPriority() {
  // This doesn't affect performance that much, and may not be too safe.

  bool ret = BindToCpus(&cpu_mask_);
  if (!ret)
    logprintf(11, "Log: Bind to %s failed.\n",
              cpuset_format(&cpu_mask_).c_str());

  logprintf(11, "Log: Thread %d running on apic ID %d mask %s (%s).\n",
            thread_num_, apicid(),
            CurrentCpusFormat().c_str(),
            cpuset_format(&cpu_mask_).c_str());
#if 0
  // Disabled code: would raise the thread to SCHED_FIFO when
  // priority_ == High.  Left here intentionally (see comment above).
  if (priority_ == High) {
    sched_param param;
    param.sched_priority = 1;
    // Set the priority; others are unchanged.
    logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
              param.sched_priority);
    if (sched_setscheduler(0, SCHED_FIFO, &param)) {
      char buf[256];
      sat_strerror(errno, buf, sizeof(buf));
      logprintf(0, "Process Error: sched_setscheduler "
                   "failed - error %d %s\n",
                errno, buf);
    }
  }
#endif
  return true;
}
340
341// Use pthreads to create a system thread.
342int WorkerThread::SpawnThread() {
343  // Create the new thread.
344  int result = pthread_create(&thread_, NULL, thread_spawner_, this);
345  if (result) {
346    char buf[256];
347    sat_strerror(result, buf, sizeof(buf));
348    logprintf(0, "Process Error: pthread_create "
349                  "failed - error %d %s\n", result,
350              buf);
351    status_ = false;
352    return false;
353  }
354
355  // 0 is pthreads success.
356  return true;
357}
358
359// Kill the worker thread with SIGINT.
360bool WorkerThread::KillThread() {
361  return (pthread_kill(thread_, SIGINT) == 0);
362}
363
364// Block until thread has exited.
365bool WorkerThread::JoinThread() {
366  int result = pthread_join(thread_, NULL);
367
368  if (result) {
369    logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
370    status_ = false;
371  }
372
373  // 0 is pthreads success.
374  return (!result);
375}
376
377
// Thread body: set affinity/priority, time the Work() routine, then
// deregister from the shared WorkerStatus before exiting.
void WorkerThread::StartRoutine() {
  InitPriority();
  StartThreadTimer();
  Work();
  StopThreadTimer();
  worker_status_->RemoveSelf();
}
385
386
387// Thread work loop. Execute until marked finished.
// Base-class work loop: idles one second at a time until told to stop.
// Subclasses (e.g. FillThread) override this with real work.
bool WorkerThread::Work() {
  do {
    logprintf(9, "Log: ...\n");
    // Sleep for 1 second.
    sat_sleep(1);
  } while (IsReadyToRun());

  return false;
}
397
398
399// Returns CPU mask of CPUs available to this process,
400// Conceptually, each bit represents a logical CPU, ie:
401//   mask = 3  (11b):   cpu0, 1
402//   mask = 13 (1101b): cpu0, 2, 3
// Fill 'cpuset' with the CPUs available to this process.  Returns true
// on success, false (with an empty mask) when affinity is unsupported.
bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
  CPU_ZERO(cpuset);
#ifdef HAVE_SCHED_GETAFFINITY
  // NOTE(review): queries the parent process (getppid()), presumably to
  // get the mask before any per-thread narrowing — confirm intent.
  return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
#else
  return 0;
#endif
}
411
412
413// Returns CPU mask of CPUs this thread is bound to,
414// Conceptually, each bit represents a logical CPU, ie:
415//   mask = 3  (11b):   cpu0, 1
416//   mask = 13 (1101b): cpu0, 2, 3
// Fill 'cpuset' with the CPUs the calling thread is currently bound to
// (pid 0 = self).  Returns true on success, false when unsupported.
bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
  CPU_ZERO(cpuset);
#ifdef HAVE_SCHED_GETAFFINITY
  return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
#else
  return 0;
#endif
}
425
426
427// Bind worker thread to specified CPU(s)
428//   Args:
429//     thread_mask: cpu_set_t representing CPUs, ie
430//                  mask = 1  (01b):   cpu0
431//                  mask = 3  (11b):   cpu0, 1
432//                  mask = 13 (1101b): cpu0, 2, 3
433//
434//   Returns true on success, false otherwise.
bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
  cpu_set_t process_mask;
  AvailableCpus(&process_mask);
  // Fast path: requesting the full process mask needs no syscall.
  if (cpuset_isequal(thread_mask, &process_mask))
    return true;

  logprintf(11, "Log: available CPU mask - %s\n",
            cpuset_format(&process_mask).c_str());
  if (!cpuset_issubset(thread_mask, &process_mask)) {
    // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
    logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
              cpuset_format(thread_mask).c_str(),
              cpuset_format(&process_mask).c_str());
    return false;
  }
#ifdef HAVE_SCHED_GETAFFINITY
  // gettid() targets just this thread, not the whole process.
  return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
#else
  return 0;
#endif
}
456
457
458// A worker thread can yield itself to give up CPU until it's scheduled again.
459//   Returns true on success, false on error.
460bool WorkerThread::YieldSelf() {
461  return (sched_yield() == 0);
462}
463
464
465// Fill this page with its pattern.
// Write pe->pattern into every 64-bit word of the page.  In tag_mode_,
// every eighth word instead receives its own virtual address as a tag
// (one tag per 64 bytes, assuming 8-byte words) so later checks can
// detect misdirected writes.  Returns 1 on success, 0 if 'pe' is null.
bool WorkerThread::FillPage(struct page_entry *pe) {
  // Error check arguments.
  if (pe == 0) {
    logprintf(0, "Process Error: Fill Page entry null\n");
    return 0;
  }

  // Mask is the bitmask of indexes used by the pattern.
  // It is the pattern size -1. Size is always a power of 2.
  uint64 *memwords = static_cast<uint64*>(pe->addr);
  int length = sat_->page_length();

  if (tag_mode_) {
    // Select tag or data as appropriate.
    for (int i = 0; i < length / wordsize_; i++) {
      datacast_t data;

      if ((i & 0x7) == 0) {
        // First word of each group of 8 holds the address tag.
        data.l64 = addr_to_tag(&memwords[i]);
      } else {
        // Each 64-bit word is assembled from two 32-bit pattern entries.
        data.l32.l = pe->pattern->pattern(i << 1);
        data.l32.h = pe->pattern->pattern((i << 1) + 1);
      }
      memwords[i] = data.l64;
    }
  } else {
    // Just fill in untagged data directly.
    for (int i = 0; i < length / wordsize_; i++) {
      datacast_t data;

      data.l32.l = pe->pattern->pattern(i << 1);
      data.l32.h = pe->pattern->pattern((i << 1) + 1);
      memwords[i] = data.l64;
    }
  }

  return 1;
}
504
505
506// Tell the thread how many pages to fill.
// Tell the thread how many pages to fill before Work() completes.
void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
  num_pages_to_fill_ = num_pages_to_fill_init;
}
510
511// Fill this page with a random pattern.
// Pick a random pattern from patternlist_, record it in the page entry,
// and fill the page with it.  Returns 0 on any argument/pattern failure.
bool FillThread::FillPageRandom(struct page_entry *pe) {
  // Error check arguments.
  if (pe == 0) {
    logprintf(0, "Process Error: Fill Page entry null\n");
    return 0;
  }
  if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
    logprintf(0, "Process Error: No data patterns available\n");
    return 0;
  }

  // Choose a random pattern for this block.
  pe->pattern = patternlist_->GetRandomPattern();
  if (pe->pattern == 0) {
    logprintf(0, "Process Error: Null data pattern\n");
    return 0;
  }

  // Actually fill the page.
  return FillPage(pe);
}
533
534
// Memory fill work loop. Execute until allotted pages filled.
// Fill loop: pop empty pages from the Sat queue, fill each with a random
// pattern, and push it back as valid.  Runs until num_pages_to_fill_
// pages are done or a stop is requested; any queue/fill failure bails out.
bool FillThread::Work() {
  bool result = true;

  logprintf(9, "Log: Starting fill thread %d\n", thread_num_);

  // We want to fill num_pages_to_fill pages, and
  // stop when we've filled that many.
  // We also want to capture early break
  struct page_entry pe;
  int64 loops = 0;
  while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
    result = result && sat_->GetEmpty(&pe);
    if (!result) {
      logprintf(0, "Process Error: fill_thread failed to pop pages, "
                "bailing\n");
      break;
    }

    // Fill the page with pattern
    result = result && FillPageRandom(&pe);
    if (!result) break;

    // Put the page back on the queue.
    result = result && sat_->PutValid(&pe);
    if (!result) {
      logprintf(0, "Process Error: fill_thread failed to push pages, "
                "bailing\n");
      break;
    }
    loops++;
  }

  // Fill in thread status.
  pages_copied_ = loops;
  status_ = result;
  logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
            thread_num_, status_, pages_copied_);
  return result;
}
575
576
577// Print error information about a data miscompare.
// Report one queued miscompare: reread the word, find the first bad byte,
// translate to a physical address/DIMM, log (parseable when priority < 5),
// then write the expected value back to repair memory.
//   error:    captured record with vaddr/expected/actual already set.
//   priority: log priority for the report line.
//   message:  short label for the log line (e.g. "Hardware Error").
void WorkerThread::ProcessError(struct ErrorRecord *error,
                                int priority,
                                const char *message) {
  char dimm_string[256] = "";

  int apic_id = apicid();

  // Determine if this is a write or read error.
  os_->Flush(error->vaddr);
  error->reread = *(error->vaddr);

  char *good = reinterpret_cast<char*>(&(error->expected));
  char *bad = reinterpret_cast<char*>(&(error->actual));

  sat_assert(error->expected != error->actual);
  unsigned int offset = 0;
  // Find the first differing byte.  The bound is sizeof - 1 on purpose:
  // if only the last byte differs, the loop falls through with
  // offset == sizeof - 1, which is still the correct answer.
  for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
    if (good[offset] != bad[offset])
      break;
  }

  error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;

  // Find physical address if possible.
  error->paddr = os_->VirtualToPhysical(error->vbyteaddr);

  // Pretty print DIMM mapping if available.
  os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));

  // Report parseable error.
  if (priority < 5) {
    // Run miscompare error through diagnoser for logging and reporting.
    os_->error_diagnoser_->AddMiscompareError(dimm_string,
                                              reinterpret_cast<uint64>
                                              (error->vaddr), 1);

    logprintf(priority,
              "%s: miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
              "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
              message,
              apic_id,
              CurrentCpusFormat().c_str(),
              error->vaddr,
              error->paddr,
              dimm_string,
              error->actual,
              error->reread,
              error->expected);
  }


  // Overwrite incorrect data with correct data to prevent
  // future miscompares when this data is reused.
  *(error->vaddr) = error->expected;
  os_->Flush(error->vaddr);
}
634
635
636
637// Print error information about a data miscompare.
// FileThread variant of miscompare reporting: same reread/locate/repair
// flow as WorkerThread::ProcessError(), but routes the error to the HDD
// diagnoser when it came from a file read-back (crc_page_ != -1).
void FileThread::ProcessError(struct ErrorRecord *error,
                              int priority,
                              const char *message) {
  char dimm_string[256] = "";

  // Determine if this is a write or read error.
  os_->Flush(error->vaddr);
  error->reread = *(error->vaddr);

  char *good = reinterpret_cast<char*>(&(error->expected));
  char *bad = reinterpret_cast<char*>(&(error->actual));

  sat_assert(error->expected != error->actual);
  unsigned int offset = 0;
  // Find the first differing byte (bound is sizeof - 1: a mismatch only
  // in the last byte leaves offset == sizeof - 1, still correct).
  for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
    if (good[offset] != bad[offset])
      break;
  }

  error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;

  // Find physical address if possible.
  error->paddr = os_->VirtualToPhysical(error->vbyteaddr);

  // Pretty print DIMM mapping if available.
  os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));

  // If crc_page_ is valid, ie checking content read back from file,
  // track src/dst memory addresses. Otherwise categorize as general
  // memory miscompare for CRC checking everywhere else.
  if (crc_page_ != -1) {
    int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
                                static_cast<char*>(page_recs_[crc_page_].dst);
    os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
                                                 crc_page_,
                                                 miscompare_byteoffset,
                                                 page_recs_[crc_page_].src,
                                                 page_recs_[crc_page_].dst);
  } else {
    os_->error_diagnoser_->AddMiscompareError(dimm_string,
                                              reinterpret_cast<uint64>
                                              (error->vaddr), 1);
  }

  logprintf(priority,
            "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
            "reread:0x%016llx expected:0x%016llx\n",
            message,
            devicename_.c_str(),
            error->vaddr,
            error->paddr,
            dimm_string,
            error->actual,
            error->reread,
            error->expected);

  // Overwrite incorrect data with correct data to prevent
  // future miscompares when this data is reused.
  *(error->vaddr) = error->expected;
  os_->Flush(error->vaddr);
}
699
700
701// Do a word by word result check of a region.
702// Print errors on mismatches.
703int WorkerThread::CheckRegion(void *addr,
704                              class Pattern *pattern,
705                              int64 length,
706                              int offset,
707                              int64 pattern_offset) {
708  uint64 *memblock = static_cast<uint64*>(addr);
709  const int kErrorLimit = 128;
710  int errors = 0;
711  int overflowerrors = 0;  // Count of overflowed errors.
712  bool page_error = false;
713  string errormessage("Hardware Error");
714  struct ErrorRecord
715    recorded[kErrorLimit];  // Queued errors for later printing.
716
717  // For each word in the data region.
718  for (int i = 0; i < length / wordsize_; i++) {
719    uint64 actual = memblock[i];
720    uint64 expected;
721
722    // Determine the value that should be there.
723    datacast_t data;
724    int index = 2 * i + pattern_offset;
725    data.l32.l = pattern->pattern(index);
726    data.l32.h = pattern->pattern(index + 1);
727    expected = data.l64;
728    // Check tags if necessary.
729    if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
730      expected = addr_to_tag(&memblock[i]);
731    }
732
733
734    // If the value is incorrect, save an error record for later printing.
735    if (actual != expected) {
736      if (errors < kErrorLimit) {
737        recorded[errors].actual = actual;
738        recorded[errors].expected = expected;
739        recorded[errors].vaddr = &memblock[i];
740        errors++;
741      } else {
742        page_error = true;
743        // If we have overflowed the error queue, just print the errors now.
744        logprintf(10, "Log: Error record overflow, too many miscompares!\n");
745        errormessage = "Page Error";
746        break;
747      }
748    }
749  }
750
751  // Find if this is a whole block corruption.
752  if (page_error && !tag_mode_) {
753    int patsize = patternlist_->Size();
754    for (int pat = 0; pat < patsize; pat++) {
755      class Pattern *altpattern = patternlist_->GetPattern(pat);
756      const int kGood = 0;
757      const int kBad = 1;
758      const int kGoodAgain = 2;
759      const int kNoMatch = 3;
760      int state = kGood;
761      unsigned int badstart = 0;
762      unsigned int badend = 0;
763
764      // Don't match against ourself!
765      if (pattern == altpattern)
766        continue;
767
768      for (int i = 0; i < length / wordsize_; i++) {
769        uint64 actual = memblock[i];
770        datacast_t expected;
771        datacast_t possible;
772
773        // Determine the value that should be there.
774        int index = 2 * i + pattern_offset;
775
776        expected.l32.l = pattern->pattern(index);
777        expected.l32.h = pattern->pattern(index + 1);
778
779        possible.l32.l = pattern->pattern(index);
780        possible.l32.h = pattern->pattern(index + 1);
781
782        if (state == kGood) {
783          if (actual == expected.l64) {
784            continue;
785          } else if (actual == possible.l64) {
786            badstart = i;
787            badend = i;
788            state = kBad;
789            continue;
790          } else {
791            state = kNoMatch;
792            break;
793          }
794        } else if (state == kBad) {
795          if (actual == possible.l64) {
796            badend = i;
797            continue;
798          } else if (actual == expected.l64) {
799            state = kGoodAgain;
800            continue;
801          } else {
802            state = kNoMatch;
803            break;
804          }
805        } else if (state == kGoodAgain) {
806          if (actual == expected.l64) {
807            continue;
808          } else {
809            state = kNoMatch;
810            break;
811          }
812        }
813      }
814
815      if ((state == kGoodAgain) || (state == kBad)) {
816        unsigned int blockerrors = badend - badstart + 1;
817        errormessage = "Block Error";
818        ProcessError(&recorded[0], 0, errormessage.c_str());
819        logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
820                  "%d bytes from offset 0x%x to 0x%x\n",
821                  &memblock[badstart],
822                  altpattern->name(), pattern->name(),
823                  blockerrors * wordsize_,
824                  offset + badstart * wordsize_,
825                  offset + badend * wordsize_);
826        errorcount_ += blockerrors;
827        return blockerrors;
828      }
829    }
830  }
831
832
833  // Process error queue after all errors have been recorded.
834  for (int err = 0; err < errors; err++) {
835    int priority = 5;
836    if (errorcount_ + err < 30)
837      priority = 0;  // Bump up the priority for the first few errors.
838    ProcessError(&recorded[err], priority, errormessage.c_str());
839  }
840
841  if (page_error) {
842    // For each word in the data region.
843    int error_recount = 0;
844    for (int i = 0; i < length / wordsize_; i++) {
845      uint64 actual = memblock[i];
846      uint64 expected;
847      datacast_t data;
848      // Determine the value that should be there.
849      int index = 2 * i + pattern_offset;
850
851      data.l32.l = pattern->pattern(index);
852      data.l32.h = pattern->pattern(index + 1);
853      expected = data.l64;
854
855      // Check tags if necessary.
856      if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
857        expected = addr_to_tag(&memblock[i]);
858      }
859
860      // If the value is incorrect, save an error record for later printing.
861      if (actual != expected) {
862        if (error_recount < kErrorLimit) {
863          // We already reported these.
864          error_recount++;
865        } else {
866          // If we have overflowed the error queue, print the errors now.
867          struct ErrorRecord er;
868          er.actual = actual;
869          er.expected = expected;
870          er.vaddr = &memblock[i];
871
872          // Do the error printout. This will take a long time and
873          // likely change the machine state.
874          ProcessError(&er, 12, errormessage.c_str());
875          overflowerrors++;
876        }
877      }
878    }
879  }
880
881  // Keep track of observed errors.
882  errorcount_ += errors + overflowerrors;
883  return errors + overflowerrors;
884}
885
// Total data this thread has copied, in megabytes.
float WorkerThread::GetCopiedData() {
  return pages_copied_ * sat_->page_length() / kMegabyte;
}
889
890// Calculate the CRC of a region.
891// Result check if the CRC mismatches.
// Verify a page by Adler checksum, 4KB at a time, falling back to the
// slow word-by-word CheckRegion() only for blocks whose CRC mismatches.
// Returns the total number of miscompared words found.
int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
  const int blocksize = 4096;
  const int blockwords = blocksize / wordsize_;
  int errors = 0;

  const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
  uint64 *memblock = static_cast<uint64*>(srcpe->addr);
  int blocks = sat_->page_length() / blocksize;
  for (int currentblock = 0; currentblock < blocks; currentblock++) {
    uint64 *memslice = memblock + currentblock * blockwords;

    AdlerChecksum crc;
    // Tagged pages need the address-aware checksum variant.
    if (tag_mode_) {
      AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
    } else {
      CalculateAdlerChecksum(memslice, blocksize, &crc);
    }

    // If the CRC does not match, we'd better look closer.
    if (!crc.Equals(*expectedcrc)) {
      logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
                "CRC mismatch %s != %s\n",
                crc.ToHexString().c_str(),
                expectedcrc->ToHexString().c_str());
      int errorcount = CheckRegion(memslice,
                                   srcpe->pattern,
                                   blocksize,
                                   currentblock * blocksize, 0);
      if (errorcount == 0) {
        // CRC disagreed but the slow compare found nothing: suspicious,
        // so log it loudly.
        logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
                     "but no miscompares found.\n",
                  crc.ToHexString().c_str(),
                  expectedcrc->ToHexString().c_str());
      }
      errors += errorcount;
    }
  }

  // For odd length transfers, we should never hit this.
  int leftovers = sat_->page_length() % blocksize;
  if (leftovers) {
    uint64 *memslice = memblock + blocks * blockwords;
    errors += CheckRegion(memslice,
                          srcpe->pattern,
                          leftovers,
                          blocks * blocksize, 0);
  }
  return errors;
}
941
942
943// Print error information about a data miscompare.
// Report a tag miscompare: like ProcessError(), but also resolves and
// logs the physical address/DIMM of the address the bad tag points at,
// and classifies the error as read vs write via the reread value.
void WorkerThread::ProcessTagError(struct ErrorRecord *error,
                                   int priority,
                                   const char *message) {
  char dimm_string[256] = "";
  char tag_dimm_string[256] = "";
  bool read_error = false;

  int apic_id = apicid();

  // Determine if this is a write or read error.
  os_->Flush(error->vaddr);
  error->reread = *(error->vaddr);

  // Distinguish read and write errors: if the reread differs from the
  // original bad read, the memory cell held good data all along.
  if (error->actual != error->reread) {
    read_error = true;
  }

  sat_assert(error->expected != error->actual);

  error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);

  // Find physical address if possible.
  error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
  error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);

  // Pretty print DIMM mapping if available.
  os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
  // Pretty print DIMM mapping if available.
  os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));

  // Report parseable error.
  if (priority < 5) {
    logprintf(priority,
              "%s: Tag from %p(0x%llx:%s) (%s) "
              "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
              "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
              message,
              error->tagvaddr, error->tagpaddr,
              tag_dimm_string,
              read_error ? "read error" : "write error",
              apic_id,
              CurrentCpusFormat().c_str(),
              error->vaddr,
              error->paddr,
              dimm_string,
              error->actual,
              error->reread,
              error->expected);
  }

  errorcount_ += 1;

  // Overwrite incorrect data with correct data to prevent
  // future miscompares when this data is reused.
  *(error->vaddr) = error->expected;
  os_->Flush(error->vaddr);
}
1002
1003
1004// Print out and log a tag error.
1005bool WorkerThread::ReportTagError(
1006    uint64 *mem64,
1007    uint64 actual,
1008    uint64 tag) {
1009  struct ErrorRecord er;
1010  er.actual = actual;
1011
1012  er.expected = tag;
1013  er.vaddr = mem64;
1014
1015  // Generate vaddr from tag.
1016  er.tagvaddr = reinterpret_cast<uint64*>(actual);
1017
1018  ProcessTagError(&er, 0, "Hardware Error");
1019  return true;
1020}
1021
1022// C implementation of Adler memory copy, with memory tagging.
1023bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1024                                    uint64 *srcmem64,
1025                                    unsigned int size_in_bytes,
1026                                    AdlerChecksum *checksum,
1027                                    struct page_entry *pe) {
1028  // Use this data wrapper to access memory with 64bit read/write.
1029  datacast_t data;
1030  datacast_t dstdata;
1031  unsigned int count = size_in_bytes / sizeof(data);
1032
1033  if (count > ((1U) << 19)) {
1034    // Size is too large, must be strictly less than 512 KB.
1035    return false;
1036  }
1037
1038  uint64 a1 = 1;
1039  uint64 a2 = 1;
1040  uint64 b1 = 0;
1041  uint64 b2 = 0;
1042
1043  class Pattern *pattern = pe->pattern;
1044
1045  unsigned int i = 0;
1046  while (i < count) {
1047    // Process 64 bits at a time.
1048    if ((i & 0x7) == 0) {
1049      data.l64 = srcmem64[i];
1050      dstdata.l64 = dstmem64[i];
1051      uint64 src_tag = addr_to_tag(&srcmem64[i]);
1052      uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1053      // Detect if tags have been corrupted.
1054      if (data.l64 != src_tag)
1055        ReportTagError(&srcmem64[i], data.l64, src_tag);
1056      if (dstdata.l64 != dst_tag)
1057        ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1058
1059      data.l32.l = pattern->pattern(i << 1);
1060      data.l32.h = pattern->pattern((i << 1) + 1);
1061      a1 = a1 + data.l32.l;
1062      b1 = b1 + a1;
1063      a1 = a1 + data.l32.h;
1064      b1 = b1 + a1;
1065
1066      data.l64  = dst_tag;
1067      dstmem64[i] = data.l64;
1068
1069    } else {
1070      data.l64 = srcmem64[i];
1071      a1 = a1 + data.l32.l;
1072      b1 = b1 + a1;
1073      a1 = a1 + data.l32.h;
1074      b1 = b1 + a1;
1075      dstmem64[i] = data.l64;
1076    }
1077    i++;
1078
1079    data.l64 = srcmem64[i];
1080    a2 = a2 + data.l32.l;
1081    b2 = b2 + a2;
1082    a2 = a2 + data.l32.h;
1083    b2 = b2 + a2;
1084    dstmem64[i] = data.l64;
1085    i++;
1086  }
1087  checksum->Set(a1, a2, b1, b2);
1088  return true;
1089}
1090
1091// x86_64 SSE2 assembly implementation of Adler memory copy, with address
1092// tagging added as a second step. This is useful for debugging failures
1093// that only occur when SSE / nontemporal writes are used.
1094bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1095                                       uint64 *srcmem64,
1096                                       unsigned int size_in_bytes,
1097                                       AdlerChecksum *checksum,
1098                                       struct page_entry *pe) {
1099  // Do ASM copy, ignore checksum.
1100  AdlerChecksum ignored_checksum;
1101  os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1102
1103  // Force cache flush.
1104  int length = size_in_bytes / sizeof(*dstmem64);
1105  for (int i = 0; i < length; i += sizeof(*dstmem64)) {
1106    os_->FastFlush(dstmem64 + i);
1107    os_->FastFlush(srcmem64 + i);
1108  }
1109  // Check results.
1110  AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1111  // Patch up address tags.
1112  TagAddrC(dstmem64, size_in_bytes);
1113  return true;
1114}
1115
1116// Retag pages..
1117bool WorkerThread::TagAddrC(uint64 *memwords,
1118                            unsigned int size_in_bytes) {
1119  // Mask is the bitmask of indexes used by the pattern.
1120  // It is the pattern size -1. Size is always a power of 2.
1121
1122  // Select tag or data as appropriate.
1123  int length = size_in_bytes / wordsize_;
1124  for (int i = 0; i < length; i += 8) {
1125    datacast_t data;
1126    data.l64 = addr_to_tag(&memwords[i]);
1127    memwords[i] = data.l64;
1128  }
1129  return true;
1130}
1131
1132// C implementation of Adler memory crc.
1133bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1134                                 unsigned int size_in_bytes,
1135                                 AdlerChecksum *checksum,
1136                                 struct page_entry *pe) {
1137  // Use this data wrapper to access memory with 64bit read/write.
1138  datacast_t data;
1139  unsigned int count = size_in_bytes / sizeof(data);
1140
1141  if (count > ((1U) << 19)) {
1142    // Size is too large, must be strictly less than 512 KB.
1143    return false;
1144  }
1145
1146  uint64 a1 = 1;
1147  uint64 a2 = 1;
1148  uint64 b1 = 0;
1149  uint64 b2 = 0;
1150
1151  class Pattern *pattern = pe->pattern;
1152
1153  unsigned int i = 0;
1154  while (i < count) {
1155    // Process 64 bits at a time.
1156    if ((i & 0x7) == 0) {
1157      data.l64 = srcmem64[i];
1158      uint64 src_tag = addr_to_tag(&srcmem64[i]);
1159      // Check that tags match expected.
1160      if (data.l64 != src_tag)
1161        ReportTagError(&srcmem64[i], data.l64, src_tag);
1162
1163      data.l32.l = pattern->pattern(i << 1);
1164      data.l32.h = pattern->pattern((i << 1) + 1);
1165      a1 = a1 + data.l32.l;
1166      b1 = b1 + a1;
1167      a1 = a1 + data.l32.h;
1168      b1 = b1 + a1;
1169    } else {
1170      data.l64 = srcmem64[i];
1171      a1 = a1 + data.l32.l;
1172      b1 = b1 + a1;
1173      a1 = a1 + data.l32.h;
1174      b1 = b1 + a1;
1175    }
1176    i++;
1177
1178    data.l64 = srcmem64[i];
1179    a2 = a2 + data.l32.l;
1180    b2 = b2 + a2;
1181    a2 = a2 + data.l32.h;
1182    b2 = b2 + a2;
1183    i++;
1184  }
1185  checksum->Set(a1, a2, b1, b2);
1186  return true;
1187}
1188
1189// Copy a block of memory quickly, while keeping a CRC of the data.
1190// Result check if the CRC mismatches.
1191int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1192                              struct page_entry *srcpe) {
1193  int errors = 0;
1194  const int blocksize = 4096;
1195  const int blockwords = blocksize / wordsize_;
1196  int blocks = sat_->page_length() / blocksize;
1197
1198  // Base addresses for memory copy
1199  uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1200  uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1201  // Remember the expected CRC
1202  const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1203
1204  for (int currentblock = 0; currentblock < blocks; currentblock++) {
1205    uint64 *targetmem = targetmembase + currentblock * blockwords;
1206    uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1207
1208    AdlerChecksum crc;
1209    if (tag_mode_) {
1210      AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1211    } else {
1212      AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1213    }
1214
1215    // Investigate miscompares.
1216    if (!crc.Equals(*expectedcrc)) {
1217      logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1218                "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1219                expectedcrc->ToHexString().c_str());
1220      int errorcount = CheckRegion(sourcemem,
1221                                   srcpe->pattern,
1222                                   blocksize,
1223                                   currentblock * blocksize, 0);
1224      if (errorcount == 0) {
1225        logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1226                     "but no miscompares found. Retrying with fresh data.\n",
1227                  crc.ToHexString().c_str(),
1228                  expectedcrc->ToHexString().c_str());
1229        if (!tag_mode_) {
1230          // Copy the data originally read from this region back again.
1231          // This data should have any corruption read originally while
1232          // calculating the CRC.
1233          memcpy(sourcemem, targetmem, blocksize);
1234          errorcount = CheckRegion(sourcemem,
1235                                   srcpe->pattern,
1236                                   blocksize,
1237                                   currentblock * blocksize, 0);
1238          if (errorcount == 0) {
1239            int apic_id = apicid();
1240            logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1241                         "CRC mismatch %s != %s, "
1242                         "but no miscompares found on second pass.\n",
1243                      apic_id, CurrentCpusFormat().c_str(),
1244                      crc.ToHexString().c_str(),
1245                      expectedcrc->ToHexString().c_str());
1246            struct ErrorRecord er;
1247            er.actual = sourcemem[0];
1248            er.expected = 0x0;
1249            er.vaddr = sourcemem;
1250            ProcessError(&er, 0, "Hardware Error");
1251          }
1252        }
1253      }
1254      errors += errorcount;
1255    }
1256  }
1257
1258  // For odd length transfers, we should never hit this.
1259  int leftovers = sat_->page_length() % blocksize;
1260  if (leftovers) {
1261    uint64 *targetmem = targetmembase + blocks * blockwords;
1262    uint64 *sourcemem = sourcemembase + blocks * blockwords;
1263
1264    errors += CheckRegion(sourcemem,
1265                          srcpe->pattern,
1266                          leftovers,
1267                          blocks * blocksize, 0);
1268    int leftoverwords = leftovers / wordsize_;
1269    for (int i = 0; i < leftoverwords; i++) {
1270      targetmem[i] = sourcemem[i];
1271    }
1272  }
1273
1274  // Update pattern reference to reflect new contents.
1275  dstpe->pattern = srcpe->pattern;
1276
1277  // Clean clean clean the errors away.
1278  if (errors) {
1279    // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1280    // cause bad data to be propogated across the page.
1281    FillPage(dstpe);
1282  }
1283  return errors;
1284}
1285
1286
1287
1288// Invert a block of memory quickly, traversing downwards.
1289int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1290  const int blocksize = 4096;
1291  const int blockwords = blocksize / wordsize_;
1292  int blocks = sat_->page_length() / blocksize;
1293
1294  // Base addresses for memory copy
1295  unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1296
1297  for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1298    unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1299    for (int i = blockwords - 32; i >= 0; i -= 32) {
1300      for (int index = i + 31; index >= i; --index) {
1301        unsigned int actual = sourcemem[index];
1302        sourcemem[index] = ~actual;
1303      }
1304      OsLayer::FastFlush(&sourcemem[i]);
1305    }
1306  }
1307
1308  return 0;
1309}
1310
1311// Invert a block of memory, traversing upwards.
1312int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1313  const int blocksize = 4096;
1314  const int blockwords = blocksize / wordsize_;
1315  int blocks = sat_->page_length() / blocksize;
1316
1317  // Base addresses for memory copy
1318  unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1319
1320  for (int currentblock = 0; currentblock < blocks; currentblock++) {
1321    unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1322    for (int i = 0; i < blockwords; i += 32) {
1323      for (int index = i; index <= i + 31; ++index) {
1324        unsigned int actual = sourcemem[index];
1325        sourcemem[index] = ~actual;
1326      }
1327      OsLayer::FastFlush(&sourcemem[i]);
1328    }
1329  }
1330  return 0;
1331}
1332
1333// Copy a block of memory quickly, while keeping a CRC of the data.
1334// Result check if the CRC mismatches. Warm the CPU while running
1335int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1336                                  struct page_entry *srcpe) {
1337  int errors = 0;
1338  const int blocksize = 4096;
1339  const int blockwords = blocksize / wordsize_;
1340  int blocks = sat_->page_length() / blocksize;
1341
1342  // Base addresses for memory copy
1343  uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1344  uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1345  // Remember the expected CRC
1346  const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1347
1348  for (int currentblock = 0; currentblock < blocks; currentblock++) {
1349    uint64 *targetmem = targetmembase + currentblock * blockwords;
1350    uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1351
1352    AdlerChecksum crc;
1353    if (tag_mode_) {
1354      AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1355    } else {
1356      os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1357    }
1358
1359    // Investigate miscompares.
1360    if (!crc.Equals(*expectedcrc)) {
1361      logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1362                "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1363                expectedcrc->ToHexString().c_str());
1364      int errorcount = CheckRegion(sourcemem,
1365                                   srcpe->pattern,
1366                                   blocksize,
1367                                   currentblock * blocksize, 0);
1368      if (errorcount == 0) {
1369        logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1370                     "but no miscompares found. Retrying with fresh data.\n",
1371                  crc.ToHexString().c_str(),
1372                  expectedcrc->ToHexString().c_str());
1373        if (!tag_mode_) {
1374          // Copy the data originally read from this region back again.
1375          // This data should have any corruption read originally while
1376          // calculating the CRC.
1377          memcpy(sourcemem, targetmem, blocksize);
1378          errorcount = CheckRegion(sourcemem,
1379                                   srcpe->pattern,
1380                                   blocksize,
1381                                   currentblock * blocksize, 0);
1382          if (errorcount == 0) {
1383            int apic_id = apicid();
1384            logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1385                         "CRC mismatch %s != %s, "
1386                         "but no miscompares found on second pass.\n",
1387                      apic_id, CurrentCpusFormat().c_str(),
1388                      crc.ToHexString().c_str(),
1389                      expectedcrc->ToHexString().c_str());
1390            struct ErrorRecord er;
1391            er.actual = sourcemem[0];
1392            er.expected = 0x0;
1393            er.vaddr = sourcemem;
1394            ProcessError(&er, 0, "Hardware Error");
1395          }
1396        }
1397      }
1398      errors += errorcount;
1399    }
1400  }
1401
1402  // For odd length transfers, we should never hit this.
1403  int leftovers = sat_->page_length() % blocksize;
1404  if (leftovers) {
1405    uint64 *targetmem = targetmembase + blocks * blockwords;
1406    uint64 *sourcemem = sourcemembase + blocks * blockwords;
1407
1408    errors += CheckRegion(sourcemem,
1409                          srcpe->pattern,
1410                          leftovers,
1411                          blocks * blocksize, 0);
1412    int leftoverwords = leftovers / wordsize_;
1413    for (int i = 0; i < leftoverwords; i++) {
1414      targetmem[i] = sourcemem[i];
1415    }
1416  }
1417
1418  // Update pattern reference to reflect new contents.
1419  dstpe->pattern = srcpe->pattern;
1420
1421  // Clean clean clean the errors away.
1422  if (errors) {
1423    // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1424    // cause bad data to be propogated across the page.
1425    FillPage(dstpe);
1426  }
1427  return errors;
1428}
1429
1430
1431
1432// Memory check work loop. Execute until done, then exhaust pages.
1433bool CheckThread::Work() {
1434  struct page_entry pe;
1435  bool result = true;
1436  int64 loops = 0;
1437
1438  logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1439
1440  // We want to check all the pages, and
1441  // stop when there aren't any left.
1442  while (true) {
1443    result = result && sat_->GetValid(&pe);
1444    if (!result) {
1445      if (IsReadyToRunNoPause())
1446        logprintf(0, "Process Error: check_thread failed to pop pages, "
1447                  "bailing\n");
1448      else
1449        result = true;
1450      break;
1451    }
1452
1453    // Do the result check.
1454    CrcCheckPage(&pe);
1455
1456    // Push pages back on the valid queue if we are still going,
1457    // throw them out otherwise.
1458    if (IsReadyToRunNoPause())
1459      result = result && sat_->PutValid(&pe);
1460    else
1461      result = result && sat_->PutEmpty(&pe);
1462    if (!result) {
1463      logprintf(0, "Process Error: check_thread failed to push pages, "
1464                "bailing\n");
1465      break;
1466    }
1467    loops++;
1468  }
1469
1470  pages_copied_ = loops;
1471  status_ = result;
1472  logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1473            thread_num_, status_, pages_copied_);
1474  return result;
1475}
1476
1477
1478// Memory copy work loop. Execute until marked done.
1479bool CopyThread::Work() {
1480  struct page_entry src;
1481  struct page_entry dst;
1482  bool result = true;
1483  int64 loops = 0;
1484
1485  logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1486            thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1487
1488  while (IsReadyToRun()) {
1489    // Pop the needed pages.
1490    result = result && sat_->GetValid(&src, tag_);
1491    result = result && sat_->GetEmpty(&dst, tag_);
1492    if (!result) {
1493      logprintf(0, "Process Error: copy_thread failed to pop pages, "
1494                "bailing\n");
1495      break;
1496    }
1497
1498    // Force errors for unittests.
1499    if (sat_->error_injection()) {
1500      if (loops == 8) {
1501        char *addr = reinterpret_cast<char*>(src.addr);
1502        int offset = random() % sat_->page_length();
1503        addr[offset] = 0xba;
1504      }
1505    }
1506
1507    // We can use memcpy, or CRC check while we copy.
1508    if (sat_->warm()) {
1509      CrcWarmCopyPage(&dst, &src);
1510    } else if (sat_->strict()) {
1511      CrcCopyPage(&dst, &src);
1512    } else {
1513      memcpy(dst.addr, src.addr, sat_->page_length());
1514      dst.pattern = src.pattern;
1515    }
1516
1517    result = result && sat_->PutValid(&dst);
1518    result = result && sat_->PutEmpty(&src);
1519
1520    // Copy worker-threads yield themselves at the end of each copy loop,
1521    // to avoid threads from preempting each other in the middle of the inner
1522    // copy-loop. Cooperations between Copy worker-threads results in less
1523    // unnecessary cache thrashing (which happens when context-switching in the
1524    // middle of the inner copy-loop).
1525    YieldSelf();
1526
1527    if (!result) {
1528      logprintf(0, "Process Error: copy_thread failed to push pages, "
1529                "bailing\n");
1530      break;
1531    }
1532    loops++;
1533  }
1534
1535  pages_copied_ = loops;
1536  status_ = result;
1537  logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1538            thread_num_, status_, pages_copied_);
1539  return result;
1540}
1541
1542// Memory invert work loop. Execute until marked done.
1543bool InvertThread::Work() {
1544  struct page_entry src;
1545  bool result = true;
1546  int64 loops = 0;
1547
1548  logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1549
1550  while (IsReadyToRun()) {
1551    // Pop the needed pages.
1552    result = result && sat_->GetValid(&src);
1553    if (!result) {
1554      logprintf(0, "Process Error: invert_thread failed to pop pages, "
1555                "bailing\n");
1556      break;
1557    }
1558
1559    if (sat_->strict())
1560      CrcCheckPage(&src);
1561
1562    // For the same reason CopyThread yields itself (see YieldSelf comment
1563    // in CopyThread::Work(), InvertThread yields itself after each invert
1564    // operation to improve cooperation between different worker threads
1565    // stressing the memory/cache.
1566    InvertPageUp(&src);
1567    YieldSelf();
1568    InvertPageDown(&src);
1569    YieldSelf();
1570    InvertPageDown(&src);
1571    YieldSelf();
1572    InvertPageUp(&src);
1573    YieldSelf();
1574
1575    if (sat_->strict())
1576      CrcCheckPage(&src);
1577
1578    result = result && sat_->PutValid(&src);
1579    if (!result) {
1580      logprintf(0, "Process Error: invert_thread failed to push pages, "
1581                "bailing\n");
1582      break;
1583    }
1584    loops++;
1585  }
1586
1587  pages_copied_ = loops * 2;
1588  status_ = result;
1589  logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1590            thread_num_, status_, pages_copied_);
1591  return result;
1592}
1593
1594
1595// Set file name to use for File IO.
1596void FileThread::SetFile(const char *filename_init) {
1597  filename_ = filename_init;
1598  devicename_ = os_->FindFileDevice(filename_);
1599}
1600
1601// Open the file for access.
1602bool FileThread::OpenFile(int *pfile) {
1603  bool no_O_DIRECT = false;
1604  int flags = O_RDWR | O_CREAT | O_SYNC;
1605  int fd = open(filename_.c_str(), flags | O_DIRECT, 0644);
1606  if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
1607    no_O_DIRECT = true;
1608    fd = open(filename_.c_str(), flags, 0644); // Try without O_DIRECT
1609  }
1610  if (fd < 0) {
1611    logprintf(0, "Process Error: Failed to create file %s!!\n",
1612              filename_.c_str());
1613    pages_copied_ = 0;
1614    return false;
1615  }
1616  if (no_O_DIRECT)
1617    os_->ActivateFlushPageCache(); // Not using O_DIRECT fixed EINVAL
1618  *pfile = fd;
1619  return true;
1620}
1621
1622// Close the file.
1623bool FileThread::CloseFile(int fd) {
1624  close(fd);
1625  return true;
1626}
1627
1628// Check sector tagging.
1629bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1630  int page_length = sat_->page_length();
1631  struct FileThread::SectorTag *tag =
1632    (struct FileThread::SectorTag *)(src->addr);
1633
1634  // Tag each sector.
1635  unsigned char magic = ((0xba + thread_num_) & 0xff);
1636  for (int sec = 0; sec < page_length / 512; sec++) {
1637    tag[sec].magic = magic;
1638    tag[sec].block = block & 0xff;
1639    tag[sec].sector = sec & 0xff;
1640    tag[sec].pass = pass_ & 0xff;
1641  }
1642  return true;
1643}
1644
1645bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1646  int page_length = sat_->page_length();
1647  // Fill the file with our data.
1648  int64 size = write(fd, src->addr, page_length);
1649
1650  if (size != page_length) {
1651    os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1652    errorcount_++;
1653    logprintf(0, "Block Error: file_thread failed to write, "
1654              "bailing\n");
1655    return false;
1656  }
1657  return true;
1658}
1659
1660// Write the data to the file.
1661bool FileThread::WritePages(int fd) {
1662  int strict = sat_->strict();
1663
1664  // Start fresh at beginning of file for each batch of pages.
1665  lseek64(fd, 0, SEEK_SET);
1666  for (int i = 0; i < sat_->disk_pages(); i++) {
1667    struct page_entry src;
1668    if (!GetValidPage(&src))
1669      return false;
1670    // Save expected pattern.
1671    page_recs_[i].pattern = src.pattern;
1672    page_recs_[i].src = src.addr;
1673
1674    // Check data correctness.
1675    if (strict)
1676      CrcCheckPage(&src);
1677
1678    SectorTagPage(&src, i);
1679
1680    bool result = WritePageToFile(fd, &src);
1681
1682    if (!PutEmptyPage(&src))
1683      return false;
1684
1685    if (!result)
1686      return false;
1687  }
1688  return os_->FlushPageCache(); // If O_DIRECT worked, this will be a NOP.
1689}
1690
1691// Copy data from file into memory block.
1692bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1693  int page_length = sat_->page_length();
1694
1695  // Do the actual read.
1696  int64 size = read(fd, dst->addr, page_length);
1697  if (size != page_length) {
1698    os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1699    logprintf(0, "Block Error: file_thread failed to read, "
1700              "bailing\n");
1701    errorcount_++;
1702    return false;
1703  }
1704  return true;
1705}
1706
1707// Check sector tagging.
1708bool FileThread::SectorValidatePage(const struct PageRec &page,
1709                                    struct page_entry *dst, int block) {
1710  // Error injection.
1711  static int calls = 0;
1712  calls++;
1713
1714  // Do sector tag compare.
1715  int firstsector = -1;
1716  int lastsector = -1;
1717  bool badsector = false;
1718  int page_length = sat_->page_length();
1719
1720  // Cast data block into an array of tagged sectors.
1721  struct FileThread::SectorTag *tag =
1722  (struct FileThread::SectorTag *)(dst->addr);
1723
1724  sat_assert(sizeof(*tag) == 512);
1725
1726  // Error injection.
1727  if (sat_->error_injection()) {
1728    if (calls == 2) {
1729      for (int badsec = 8; badsec < 17; badsec++)
1730        tag[badsec].pass = 27;
1731    }
1732    if (calls == 18) {
1733      (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1734    }
1735  }
1736
1737  // Check each sector for the correct tag we added earlier,
1738  // then revert the tag to the to normal data pattern.
1739  unsigned char magic = ((0xba + thread_num_) & 0xff);
1740  for (int sec = 0; sec < page_length / 512; sec++) {
1741    // Check magic tag.
1742    if ((tag[sec].magic != magic) ||
1743        (tag[sec].block != (block & 0xff)) ||
1744        (tag[sec].sector != (sec & 0xff)) ||
1745        (tag[sec].pass != (pass_ & 0xff))) {
1746      // Offset calculation for tag location.
1747      int offset = sec * sizeof(SectorTag);
1748      if (tag[sec].block != (block & 0xff))
1749        offset += 1 * sizeof(uint8);
1750      else if (tag[sec].sector != (sec & 0xff))
1751        offset += 2 * sizeof(uint8);
1752      else if (tag[sec].pass != (pass_ & 0xff))
1753        offset += 3 * sizeof(uint8);
1754
1755      // Run sector tag error through diagnoser for logging and reporting.
1756      errorcount_ += 1;
1757      os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1758                                                  offset,
1759                                                  tag[sec].sector,
1760                                                  page.src, page.dst);
1761
1762      logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1763                "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1764                block * page_length + 512 * sec,
1765                (pass_ & 0xff), (unsigned int)tag[sec].pass,
1766                sec, (unsigned int)tag[sec].sector,
1767                block, (unsigned int)tag[sec].block,
1768                magic, (unsigned int)tag[sec].magic,
1769                filename_.c_str());
1770
1771      // Keep track of first and last bad sector.
1772      if (firstsector == -1)
1773        firstsector = (block * page_length / 512) + sec;
1774      lastsector = (block * page_length / 512) + sec;
1775      badsector = true;
1776    }
1777    // Patch tag back to proper pattern.
1778    unsigned int *addr = (unsigned int *)(&tag[sec]);
1779    *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1780  }
1781
1782  // If we found sector errors:
1783  if (badsector == true) {
1784    logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1785              firstsector * 512,
1786              ((lastsector + 1) * 512) - 1,
1787              filename_.c_str());
1788
1789    // Either exit immediately, or patch the data up and continue.
1790    if (sat_->stop_on_error()) {
1791      exit(1);
1792    } else {
1793      // Patch up bad pages.
1794      for (int block = (firstsector * 512) / page_length;
1795          block <= (lastsector * 512) / page_length;
1796          block++) {
1797        unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1798        int length = page_length / wordsize_;
1799        for (int i = 0; i < length; i++) {
1800          memblock[i] = dst->pattern->pattern(i);
1801        }
1802      }
1803    }
1804  }
1805  return true;
1806}
1807
1808// Get memory for an incoming data transfer..
1809bool FileThread::PagePrepare() {
1810  // We can only do direct IO to SAT pages if it is normal mem.
1811  page_io_ = os_->normal_mem();
1812
1813  // Init a local buffer if we need it.
1814  if (!page_io_) {
1815#ifdef HAVE_POSIX_MEMALIGN
1816    int result = posix_memalign(&local_page_, 512, sat_->page_length());
1817#else
1818    local_page_ = memalign(512, sat_->page_length());
1819    int result = (local_page_ == 0);
1820#endif
1821    if (result) {
1822      logprintf(0, "Process Error: disk thread posix_memalign "
1823                   "returned %d (fail)\n",
1824                result);
1825      status_ = false;
1826      return false;
1827    }
1828  }
1829  return true;
1830}
1831
1832
1833// Remove memory allocated for data transfer.
1834bool FileThread::PageTeardown() {
1835  // Free a local buffer if we need to.
1836  if (!page_io_) {
1837    free(local_page_);
1838  }
1839  return true;
1840}
1841
1842
1843
1844// Get memory for an incoming data transfer..
1845bool FileThread::GetEmptyPage(struct page_entry *dst) {
1846  if (page_io_) {
1847    if (!sat_->GetEmpty(dst))
1848      return false;
1849  } else {
1850    dst->addr = local_page_;
1851    dst->offset = 0;
1852    dst->pattern = 0;
1853  }
1854  return true;
1855}
1856
1857// Get memory for an outgoing data transfer..
1858bool FileThread::GetValidPage(struct page_entry *src) {
1859  struct page_entry tmp;
1860  if (!sat_->GetValid(&tmp))
1861    return false;
1862  if (page_io_) {
1863    *src = tmp;
1864    return true;
1865  } else {
1866    src->addr = local_page_;
1867    src->offset = 0;
1868    CrcCopyPage(src, &tmp);
1869    if (!sat_->PutValid(&tmp))
1870      return false;
1871  }
1872  return true;
1873}
1874
1875
1876// Throw out a used empty page.
1877bool FileThread::PutEmptyPage(struct page_entry *src) {
1878  if (page_io_) {
1879    if (!sat_->PutEmpty(src))
1880      return false;
1881  }
1882  return true;
1883}
1884
1885// Throw out a used, filled page.
1886bool FileThread::PutValidPage(struct page_entry *src) {
1887  if (page_io_) {
1888    if (!sat_->PutValid(src))
1889      return false;
1890  }
1891  return true;
1892}
1893
// Copy data from file into memory blocks.
// Reads sat_->disk_pages() pages sequentially from the start of the
// file, validates each one against the pattern recorded when it was
// written, and pushes the pages back onto the valid queue.
// Returns false on a procedural failure (queue exhaustion or a failed
// read); data miscompares are counted and only clear the return value.
bool FileThread::ReadPages(int fd) {
  int page_length = sat_->page_length();
  int strict = sat_->strict();
  bool result = true;

  // Read our data back out of the file, into its new location.
  lseek64(fd, 0, SEEK_SET);
  for (int i = 0; i < sat_->disk_pages(); i++) {
    struct page_entry dst;
    if (!GetEmptyPage(&dst))
      return false;
    // Retrieve expected pattern.
    dst.pattern = page_recs_[i].pattern;
    // Update the page record with the read destination address.
    page_recs_[i].dst = dst.addr;

    // Read from the file into destination page.
    if (!ReadPageFromFile(fd, &dst)) {
        PutEmptyPage(&dst);
        return false;
    }

    // Sector-level validation of the received page against its record.
    SectorValidatePage(page_recs_[i], &dst, i);

    // Ensure that the transfer ended up with correct data.
    if (strict) {
      // Record page index currently CRC checked, so a concurrent error
      // report can attribute the failure to this block.
      crc_page_ = i;
      int errors = CrcCheckPage(&dst);
      if (errors) {
        logprintf(5, "Log: file miscompare at block %d, "
                  "offset %x-%x. File: %s\n",
                  i, i * page_length, ((i + 1) * page_length) - 1,
                  filename_.c_str());
        result = false;
      }
      crc_page_ = -1;
      errorcount_ += errors;
    }
    if (!PutValidPage(&dst))
      return false;
  }
  return result;
}
1939
// File IO work loop. Execute until marked done.
// Each pass writes sat_->disk_pages() pages to the data file and then
// reads them back for verification. Returns false only on setup
// (procedural) failure; data errors are accounted via errorcount_.
bool FileThread::Work() {
  bool result = true;
  int64 loops = 0;

  logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
            thread_num_,
            filename_.c_str(),
            devicename_.c_str());

  // Allocate transfer buffers (see PagePrepare).
  if (!PagePrepare()) {
    status_ = false;
    return false;
  }

  // Open the data IO file.
  int fd = 0;
  if (!OpenFile(&fd)) {
    status_ = false;
    return false;
  }

  pass_ = 0;

  // Load patterns into page records.
  // NOTE(review): page_recs_ and the Pattern objects allocated here are
  // not freed in this function — confirm ownership/teardown elsewhere.
  page_recs_ = new struct PageRec[sat_->disk_pages()];
  for (int i = 0; i < sat_->disk_pages(); i++) {
    page_recs_[i].pattern = new struct Pattern();
  }

  // Loop until done.
  while (IsReadyToRun()) {
    // Do the file write.
    if (!(result = result && WritePages(fd)))
      break;

    // Do the file read.
    if (!(result = result && ReadPages(fd)))
      break;

    loops++;
    pass_ = loops;
  }

  pages_copied_ = loops * sat_->disk_pages();

  // Clean up.
  CloseFile(fd);
  PageTeardown();

  logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
            thread_num_, status_, pages_copied_);
  // Failure to read from device indicates hardware,
  // rather than procedural SW error.
  status_ = true;
  return true;
}
1997
1998bool NetworkThread::IsNetworkStopSet() {
1999  return !IsReadyToRunNoPause();
2000}
2001
bool NetworkSlaveThread::IsNetworkStopSet() {
  // This thread has no completion status.
  // It finishes whenever there is no more data to be
  // passed back.
  return true;
}
2008
2009// Set ip name to use for Network IO.
2010void NetworkThread::SetIP(const char *ipaddr_init) {
2011  strncpy(ipaddr_, ipaddr_init, 256);
2012}
2013
2014// Create a socket.
2015// Return 0 on error.
2016bool NetworkThread::CreateSocket(int *psocket) {
2017  int sock = socket(AF_INET, SOCK_STREAM, 0);
2018  if (sock == -1) {
2019    logprintf(0, "Process Error: Cannot open socket\n");
2020    pages_copied_ = 0;
2021    status_ = false;
2022    return false;
2023  }
2024  *psocket = sock;
2025  return true;
2026}
2027
// Close the socket.
// Always reports success; any error returned by close() is ignored.
bool NetworkThread::CloseSocket(int sock) {
  close(sock);
  return true;
}
2033
2034// Initiate the tcp connection.
2035bool NetworkThread::Connect(int sock) {
2036  struct sockaddr_in dest_addr;
2037  dest_addr.sin_family = AF_INET;
2038  dest_addr.sin_port = htons(kNetworkPort);
2039  memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2040
2041  // Translate dot notation to u32.
2042  if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2043    logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2044    pages_copied_ = 0;
2045    status_ = false;
2046    return false;
2047  }
2048
2049  if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2050                    sizeof(struct sockaddr))) {
2051    logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2052    pages_copied_ = 0;
2053    status_ = false;
2054    return false;
2055  }
2056  return true;
2057}
2058
2059// Initiate the tcp connection.
2060bool NetworkListenThread::Listen() {
2061  struct sockaddr_in sa;
2062
2063  memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2064
2065  sa.sin_family = AF_INET;
2066  sa.sin_addr.s_addr = INADDR_ANY;
2067  sa.sin_port = htons(kNetworkPort);
2068
2069  if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2070    char buf[256];
2071    sat_strerror(errno, buf, sizeof(buf));
2072    logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2073    pages_copied_ = 0;
2074    status_ = false;
2075    return false;
2076  }
2077  listen(sock_, 3);
2078  return true;
2079}
2080
2081// Wait for a connection from a network traffic generation thread.
2082bool NetworkListenThread::Wait() {
2083    fd_set rfds;
2084    struct timeval tv;
2085    int retval;
2086
2087    // Watch sock_ to see when it has input.
2088    FD_ZERO(&rfds);
2089    FD_SET(sock_, &rfds);
2090    // Wait up to five seconds.
2091    tv.tv_sec = 5;
2092    tv.tv_usec = 0;
2093
2094    retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2095
2096    return (retval > 0);
2097}
2098
2099// Wait for a connection from a network traffic generation thread.
2100bool NetworkListenThread::GetConnection(int *pnewsock) {
2101  struct sockaddr_in sa;
2102  socklen_t size = sizeof(struct sockaddr_in);
2103
2104  int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2105  if (newsock < 0)  {
2106    logprintf(0, "Process Error: Did not receive connection\n");
2107    pages_copied_ = 0;
2108    status_ = false;
2109    return false;
2110  }
2111  *pnewsock = newsock;
2112  return true;
2113}
2114
// Send a page, return false if a page was not sent.
// Loops because send() may accept fewer bytes than requested; a short
// or failed send during shutdown is not reported as an error.
bool NetworkThread::SendPage(int sock, struct page_entry *src) {
  int page_length = sat_->page_length();
  char *address = static_cast<char*>(src->addr);

  // Send our data over the network.
  int size = page_length;
  while (size) {
    int transferred = send(sock, address + (page_length - size), size, 0);
    if ((transferred == 0) || (transferred == -1)) {
      if (!IsNetworkStopSet()) {
        // NOTE(review): errno is read after the IsNetworkStopSet() call;
        // if that call makes a syscall it may clobber send()'s errno —
        // confirm, or capture errno immediately after send().
        char buf[256] = "";
        sat_strerror(errno, buf, sizeof(buf));
        logprintf(0, "Process Error: Thread %d, "
                     "Network write failed, bailing. (%s)\n",
                  thread_num_, buf);
        status_ = false;
      }
      return false;
    }
    size = size - transferred;
  }
  return true;
}
2139
// Receive a page. Return false if a page was not received.
// Loops because recv() may deliver fewer bytes than requested. A clean
// peer close (recv == 0, errno == 0) during shutdown is logged but not
// counted as a failure, since the two ends do not stop in lockstep.
bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
  int page_length = sat_->page_length();
  char *address = static_cast<char*>(dst->addr);

  // Maybe we will get our data back again, maybe not.
  int size = page_length;
  while (size) {
    int transferred = recv(sock, address + (page_length - size), size, 0);
    if ((transferred == 0) || (transferred == -1)) {
      // Typically network slave thread should exit as network master
      // thread stops sending data.
      if (IsNetworkStopSet()) {
        int err = errno;
        if (transferred == 0 && err == 0) {
          // Two system setups will not sync exactly,
          // allow early exit, but log it.
          logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
        } else {
          char buf[256] = "";
          sat_strerror(err, buf, sizeof(buf));
          // Print why we failed.
          logprintf(0, "Process Error: Thread %d, "
                       "Network read failed, bailing (%s).\n",
                    thread_num_, buf);
          status_ = false;
          // Print arguments and results.
          // NOTE(review): the address argument is a pointer printed with
          // %x — truncated on 64-bit platforms; %p would be correct.
          logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
                    sock, address + (page_length - size),
                    size, transferred, err);
          if ((transferred == 0) &&
              (page_length - size < 512) &&
              (page_length - size > 0)) {
            // Print null terminated data received, to see who's been
            // sending us suspicious unwanted data.
            address[page_length - size] = 0;
            logprintf(0, "Log: received  %d bytes: '%s'\n",
                      page_length - size, address);
          }
        }
      }
      return false;
    }
    size = size - transferred;
  }
  return true;
}
2187
// Network IO work loop. Execute until marked done.
// Return true if the thread ran as expected.
// Each iteration sends a valid page to the remote slave thread and
// receives the echoed copy back into an empty page, verifying both.
bool NetworkThread::Work() {
  logprintf(9, "Log: Starting network thread %d, ip %s\n",
            thread_num_,
            ipaddr_);

  // Make a socket.
  int sock = 0;
  if (!CreateSocket(&sock))
    return false;

  // Network IO loop requires network slave thread to have already initialized.
  // We will sleep here for awhile to ensure that the slave thread will be
  // listening by the time we connect.
  // Sleep for 15 seconds.
  sat_sleep(15);
  logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
            thread_num_,
            ipaddr_);


  // Connect to a slave thread.
  if (!Connect(sock))
    return false;

  // Loop until done.
  bool result = true;
  int strict = sat_->strict();
  int64 loops = 0;
  while (IsReadyToRun()) {
    struct page_entry src;
    struct page_entry dst;
    result = result && sat_->GetValid(&src);
    result = result && sat_->GetEmpty(&dst);
    if (!result) {
      logprintf(0, "Process Error: net_thread failed to pop pages, "
                "bailing\n");
      break;
    }

    // Check data correctness.
    if (strict)
      CrcCheckPage(&src);

    // Do the network write.
    if (!(result = result && SendPage(sock, &src)))
      break;

    // Update pattern reference to reflect new contents.
    dst.pattern = src.pattern;

    // Do the network read.
    if (!(result = result && ReceivePage(sock, &dst)))
      break;

    // Ensure that the transfer ended up with correct data.
    if (strict)
      CrcCheckPage(&dst);

    // Return all of our pages to the queue.
    // The pages trade roles: the received copy (dst) is now the valid
    // data, and the sent page (src) is free to be refilled.
    result = result && sat_->PutValid(&dst);
    result = result && sat_->PutEmpty(&src);
    if (!result) {
      logprintf(0, "Process Error: net_thread failed to push pages, "
                "bailing\n");
      break;
    }
    loops++;
  }

  pages_copied_ = loops;
  status_ = result;

  // Clean up.
  CloseSocket(sock);

  logprintf(9, "Log: Completed %d: network thread status %d, "
               "%d pages copied\n",
            thread_num_, status_, pages_copied_);
  return result;
}
2270
2271// Spawn slave threads for incoming connections.
2272bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2273  logprintf(12, "Log: Listen thread spawning slave\n");
2274
2275  // Spawn slave thread, to reflect network traffic back to sender.
2276  ChildWorker *child_worker = new ChildWorker;
2277  child_worker->thread.SetSock(newsock);
2278  child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2279                                  &child_worker->status);
2280  child_worker->status.Initialize();
2281  child_worker->thread.SpawnThread();
2282  child_workers_.push_back(child_worker);
2283
2284  return true;
2285}
2286
2287// Reap slave threads.
2288bool NetworkListenThread::ReapSlaves() {
2289  bool result = true;
2290  // Gather status and reap threads.
2291  logprintf(12, "Log: Joining all outstanding threads\n");
2292
2293  for (size_t i = 0; i < child_workers_.size(); i++) {
2294    NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2295    logprintf(12, "Log: Joining slave thread %d\n", i);
2296    child_thread.JoinThread();
2297    if (child_thread.GetStatus() != 1) {
2298      logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2299                child_thread.GetStatus());
2300      result = false;
2301    }
2302    errorcount_ += child_thread.GetErrorCount();
2303    logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2304              child_thread.GetErrorCount());
2305    pages_copied_ += child_thread.GetPageCount();
2306  }
2307
2308  return result;
2309}
2310
// Network listener IO work loop. Execute until marked done.
// Return false on fatal software error.
// Accepts incoming connections and spawns one reflector slave thread
// per connection, then joins and destroys them all at shutdown.
bool NetworkListenThread::Work() {
  logprintf(9, "Log: Starting network listen thread %d\n",
            thread_num_);

  // Make a socket.
  sock_ = 0;
  if (!CreateSocket(&sock_)) {
    status_ = false;
    return false;
  }
  logprintf(9, "Log: Listen thread created sock\n");

  // Allows incoming connections to be queued up by socket library.
  int newsock = 0;
  Listen();
  logprintf(12, "Log: Listen thread waiting for incoming connections\n");

  // Wait on incoming connections, and spawn worker threads for them.
  int threadcount = 0;
  while (IsReadyToRun()) {
    // Poll for connections that we can accept().
    // Wait() times out after 5s so shutdown is noticed promptly.
    if (Wait()) {
      // Accept those connections.
      logprintf(12, "Log: Listen thread found incoming connection\n");
      if (GetConnection(&newsock)) {
        SpawnSlave(newsock, threadcount);
        threadcount++;
      }
    }
  }

  // Gather status and join spawned threads.
  ReapSlaves();

  // Delete the child workers.
  for (ChildVector::iterator it = child_workers_.begin();
       it != child_workers_.end(); ++it) {
    (*it)->status.Destroy();
    delete *it;
  }
  child_workers_.clear();

  CloseSocket(sock_);

  status_ = true;
  logprintf(9,
            "Log: Completed %d: network listen thread status %d, "
            "%d pages copied\n",
            thread_num_, status_, pages_copied_);
  return true;
}
2364
// Set network reflector socket struct.
// Stores the accepted connection's descriptor for use by Work().
void NetworkSlaveThread::SetSock(int sock) {
  sock_ = sock;
}
2369
2370// Network reflector IO work loop. Execute until marked done.
2371// Return false on fatal software error.
2372bool NetworkSlaveThread::Work() {
2373  logprintf(9, "Log: Starting network slave thread %d\n",
2374            thread_num_);
2375
2376  // Verify that we have a socket.
2377  int sock = sock_;
2378  if (!sock) {
2379    status_ = false;
2380    return false;
2381  }
2382
2383  // Loop until done.
2384  int64 loops = 0;
2385  // Init a local buffer for storing data.
2386  void *local_page = NULL;
2387#ifdef HAVE_POSIX_MEMALIGN
2388  int result = posix_memalign(&local_page, 512, sat_->page_length());
2389#else
2390  local_page = memalign(512, sat_->page_length());
2391  int result = (local_page == 0);
2392#endif
2393  if (result) {
2394    logprintf(0, "Process Error: net slave posix_memalign "
2395                 "returned %d (fail)\n",
2396              result);
2397    status_ = false;
2398    return false;
2399  }
2400
2401  struct page_entry page;
2402  page.addr = local_page;
2403
2404  // This thread will continue to run as long as the thread on the other end of
2405  // the socket is still sending and receiving data.
2406  while (1) {
2407    // Do the network read.
2408    if (!ReceivePage(sock, &page))
2409      break;
2410
2411    // Do the network write.
2412    if (!SendPage(sock, &page))
2413      break;
2414
2415    loops++;
2416  }
2417
2418  pages_copied_ = loops;
2419  // No results provided from this type of thread.
2420  status_ = true;
2421
2422  // Clean up.
2423  CloseSocket(sock);
2424
2425  logprintf(9,
2426            "Log: Completed %d: network slave thread status %d, "
2427            "%d pages copied\n",
2428            thread_num_, status_, pages_copied_);
2429  return true;
2430}
2431
2432// Thread work loop. Execute until marked finished.
2433bool ErrorPollThread::Work() {
2434  logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2435
2436  // This calls a generic error polling function in the Os abstraction layer.
2437  do {
2438    errorcount_ += os_->ErrorPoll();
2439    os_->ErrorWait();
2440  } while (IsReadyToRun());
2441
2442  logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2443            thread_num_, errorcount_);
2444  status_ = true;
2445  return true;
2446}
2447
2448// Worker thread to heat up CPU.
2449// This thread does not evaluate pass/fail or software error.
2450bool CpuStressThread::Work() {
2451  logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2452
2453  do {
2454    // Run ludloff's platform/CPU-specific assembly workload.
2455    os_->CpuStressWorkload();
2456    YieldSelf();
2457  } while (IsReadyToRun());
2458
2459  logprintf(9, "Log: Finished CPU stress thread %d:\n",
2460            thread_num_);
2461  status_ = true;
2462  return true;
2463}
2464
2465CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2466                                                 int cacheline_count,
2467                                                 int thread_num,
2468                                                 int inc_count) {
2469  cc_cacheline_data_ = data;
2470  cc_cacheline_count_ = cacheline_count;
2471  cc_thread_num_ = thread_num;
2472  cc_inc_count_ = inc_count;
2473}
2474
// Worker thread to test the cache coherency of the CPUs.
// Return false on fatal sw error.
// Each thread owns one slot (cc_thread_num_) in every cacheline struct;
// it scatters cc_inc_count_ increments randomly across the cachelines,
// then sums and resets its slots, expecting the total to match exactly.
// A mismatch indicates lost updates, i.e. a coherency failure.
bool CpuCacheCoherencyThread::Work() {
  logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
            cc_thread_num_);
  uint64 time_start, time_end;
  struct timeval tv;

  // Per-thread RNG seed (only used when rand_r is available).
  unsigned int seed = static_cast<unsigned int>(gettid());
  gettimeofday(&tv, NULL);  // Get the timestamp before increments.
  time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;

  uint64 total_inc = 0;  // Total increments done by the thread.
  while (IsReadyToRun()) {
    for (int i = 0; i < cc_inc_count_; i++) {
      // Choose a data structure at random and increment the member
      // at the offset that belongs to this thread (the offset is the
      // same as the thread number).
#ifdef HAVE_RAND_R
      int r = rand_r(&seed);
#else
      int r = rand();
#endif
      // Scale the raw random value into [0, cc_cacheline_count_).
      r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
      // Increment the member of the randomly selected structure.
      (cc_cacheline_data_[r].num[cc_thread_num_])++;
    }

    total_inc += cc_inc_count_;

    // Calculate if the local counter matches with the global value
    // in all the cache line structures for this particular thread.
    int cc_global_num = 0;
    for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
      cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
      // Reset the cacheline member's value for the next run.
      cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
    }
    // Force a miscompare when fault injection is requested.
    if (sat_->error_injection())
      cc_global_num = -1;

    if (cc_global_num != cc_inc_count_) {
      errorcount_++;
      logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
                cc_global_num, cc_inc_count_);
    }
  }
  gettimeofday(&tv, NULL);  // Get the timestamp at the end.
  time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;

  uint64 us_elapsed = time_end - time_start;
  // inc_rate is the no. of increments per second.
  double inc_rate = total_inc * 1e6 / us_elapsed;

  logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
            " Increments=%llu, Increments/sec = %.6lf\n",
            cc_thread_num_, us_elapsed, total_inc, inc_rate);
  logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
            cc_thread_num_);
  status_ = true;
  return true;
}
2537
// Initialize disk test parameters to their defaults; they can be
// overridden later via SetParameters().
DiskThread::DiskThread(DiskBlockTable *block_table) {
  read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
  write_block_size_ = kSectorSize;  // this assumes read and write block size
                                    // are the same
  segment_size_ = -1;               // use the entire disk as one segment
  cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
  // Use a queue such that 3/2 times as much data as the cache can hold
  // is written before it is read so that there is little chance the read
  // data is in the cache.
  queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
  blocks_per_segment_ = 32;

  read_threshold_ = 100000;         // 100ms is a reasonable limit for
  write_threshold_ = 100000;        // reading/writing a sector

  read_timeout_ = 5000000;          // 5 seconds should be long enough for a
  write_timeout_ = 5000000;         // timeout for reading/writing

  device_sectors_ = 0;
  non_destructive_ = 0;

#ifdef HAVE_LIBAIO_H
  aio_ctx_ = 0;
#endif
  block_table_ = block_table;
  update_block_table_ = 1;

  // Transfer buffer; allocated lazily and released in the destructor.
  block_buffer_ = NULL;

  blocks_written_ = 0;
  blocks_read_ = 0;
}
2570
2571DiskThread::~DiskThread() {
2572  if (block_buffer_)
2573    free(block_buffer_);
2574}
2575
// Set filename for device file (in /dev).
// Only records the name; the device is opened later by OpenDevice().
void DiskThread::SetDevice(const char *device_name) {
  device_name_ = device_name;
}
2580
// Set various parameters that control the behaviour of the test.
// -1 is used as a sentinel value on each parameter (except non_destructive)
// to indicate that the parameter not be set.
// Returns false if any supplied value violates the alignment or
// positivity constraints checked below. On success, the derived values
// (queue size and block table geometry) are recomputed.
bool DiskThread::SetParameters(int read_block_size,
                               int write_block_size,
                               int64 segment_size,
                               int64 cache_size,
                               int blocks_per_segment,
                               int64 read_threshold,
                               int64 write_threshold,
                               int non_destructive) {
  if (read_block_size != -1) {
    // Blocks must be aligned to the disk's sector size.
    if (read_block_size % kSectorSize != 0) {
      logprintf(0, "Process Error: Block size must be a multiple of %d "
                "(thread %d).\n", kSectorSize, thread_num_);
      return false;
    }

    read_block_size_ = read_block_size;
  }

  if (write_block_size != -1) {
    // Write blocks must be aligned to the disk's sector size and to the
    // block size.
    if (write_block_size % kSectorSize != 0) {
      logprintf(0, "Process Error: Write block size must be a multiple "
                "of %d (thread %d).\n", kSectorSize, thread_num_);
      return false;
    }
    if (write_block_size % read_block_size_ != 0) {
      logprintf(0, "Process Error: Write block size must be a multiple "
                "of the read block size, which is %d (thread %d).\n",
                read_block_size_, thread_num_);
      return false;
    }

    write_block_size_ = write_block_size;

  } else {
    // Make sure write_block_size_ is still valid relative to a possibly
    // updated read_block_size_.
    if (read_block_size_ > write_block_size_) {
      logprintf(5, "Log: Assuming write block size equal to read block size, "
                "which is %d (thread %d).\n", read_block_size_,
                thread_num_);
      write_block_size_ = read_block_size_;
    } else {
      if (write_block_size_ % read_block_size_ != 0) {
        logprintf(0, "Process Error: Write block size (defined as %d) must "
                  "be a multiple of the read block size, which is %d "
                  "(thread %d).\n", write_block_size_, read_block_size_,
                  thread_num_);
        return false;
      }
    }
  }

  if (cache_size != -1) {
    cache_size_ = cache_size;
  }

  if (blocks_per_segment != -1) {
    if (blocks_per_segment <= 0) {
      logprintf(0, "Process Error: Blocks per segment must be greater than "
                   "zero.\n (thread %d)", thread_num_);
      return false;
    }

    blocks_per_segment_ = blocks_per_segment;
  }

  if (read_threshold != -1) {
    if (read_threshold <= 0) {
      logprintf(0, "Process Error: Read threshold must be greater than "
                   "zero (thread %d).\n", thread_num_);
      return false;
    }

    read_threshold_ = read_threshold;
  }

  if (write_threshold != -1) {
    if (write_threshold <= 0) {
      logprintf(0, "Process Error: Write threshold must be greater than "
                   "zero (thread %d).\n", thread_num_);
      return false;
    }

    write_threshold_ = write_threshold;
  }

  if (segment_size != -1) {
    // Segments must be aligned to the disk's sector size.
    if (segment_size % kSectorSize != 0) {
      logprintf(0, "Process Error: Segment size must be a multiple of %d"
                " (thread %d).\n", kSectorSize, thread_num_);
      return false;
    }

    // Stored internally in sectors, not bytes.
    segment_size_ = segment_size / kSectorSize;
  }

  non_destructive_ = non_destructive;

  // Having a queue of 150% of blocks that will fit in the disk's cache
  // should be enough to force out the oldest block before it is read and hence,
  // making sure the data comes form the disk and not the cache.
  queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
  // Updating DiskBlockTable parameters
  if (update_block_table_) {
    block_table_->SetParameters(kSectorSize, write_block_size_,
                                device_sectors_, segment_size_,
                                device_name_);
  }
  return true;
}
2697
// Open a device, return false on failure.
// Tries O_DIRECT first to bypass the page cache; if the kernel or
// filesystem rejects that (EINVAL), falls back to a buffered open and
// enables explicit page-cache flushing via the OS layer instead.
bool DiskThread::OpenDevice(int *pfile) {
  bool no_O_DIRECT = false;
  int flags = O_RDWR | O_SYNC | O_LARGEFILE;
  int fd = open(device_name_.c_str(), flags | O_DIRECT, 0);
  if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
    no_O_DIRECT = true;
    fd = open(device_name_.c_str(), flags, 0); // Try without O_DIRECT
  }
  if (fd < 0) {
    logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
              device_name_.c_str(), thread_num_);
    return false;
  }
  if (no_O_DIRECT)
    os_->ActivateFlushPageCache();
  *pfile = fd;

  return GetDiskSize(fd);
}
2718
// Retrieves the size (in bytes) of the disk/file.
// Return false on failure.
// Sets device_sectors_ and (if enabled) pushes the new geometry into
// the shared block table.
bool DiskThread::GetDiskSize(int fd) {
  struct stat device_stat;
  if (fstat(fd, &device_stat) == -1) {
    logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
              device_name_.c_str(), thread_num_);
    return false;
  }

  // For a block device, an ioctl is needed to get the size since the size
  // of the device file (i.e. /dev/sdb) is 0.
  if (S_ISBLK(device_stat.st_mode)) {
    uint64 block_size = 0;

    if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
      logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
                device_name_.c_str(), thread_num_);
      return false;
    }

    // Zero size indicates nonworking device..
    if (block_size == 0) {
      os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
      ++errorcount_;
      status_ = true;  // Avoid a procedural error.
      return false;
    }

    device_sectors_ = block_size / kSectorSize;

  } else if (S_ISREG(device_stat.st_mode)) {
    // Regular file: st_size is the real size.
    device_sectors_ = device_stat.st_size / kSectorSize;

  } else {
    logprintf(0, "Process Error: %s is not a regular file or block "
              "device (thread %d).\n", device_name_.c_str(),
              thread_num_);
    return false;
  }

  logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
            device_sectors_, device_name_.c_str(), thread_num_);

  if (update_block_table_) {
    block_table_->SetParameters(kSectorSize, write_block_size_,
                                device_sectors_, segment_size_,
                                device_name_);
  }

  return true;
}
2771
// Close the device file descriptor.
// Always reports success; any error from close() is ignored.
bool DiskThread::CloseDevice(int fd) {
  close(fd);
  return true;
}
2776
2777// Return the time in microseconds.
2778int64 DiskThread::GetTime() {
2779  struct timeval tv;
2780  gettimeofday(&tv, NULL);
2781  return tv.tv_sec * 1000000 + tv.tv_usec;
2782}
2783
2784// Do randomized reads and (possibly) writes on a device.
2785// Return false on fatal SW error, true on SW success,
2786// regardless of whether HW failed.
2787bool DiskThread::DoWork(int fd) {
2788  int64 block_num = 0;
2789  int64 num_segments;
2790
2791  if (segment_size_ == -1) {
2792    num_segments = 1;
2793  } else {
2794    num_segments = device_sectors_ / segment_size_;
2795    if (device_sectors_ % segment_size_ != 0)
2796      num_segments++;
2797  }
2798
2799  // Disk size should be at least 3x cache size.  See comment later for
2800  // details.
2801  sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2802
2803  // This disk test works by writing blocks with a certain pattern to
2804  // disk, then reading them back and verifying it against the pattern
2805  // at a later time.  A failure happens when either the block cannot
2806  // be written/read or when the read block is different than what was
2807  // written.  If a block takes too long to write/read, then a warning
2808  // is given instead of an error since taking too long is not
2809  // necessarily an error.
2810  //
2811  // To prevent the read blocks from coming from the disk cache,
2812  // enough blocks are written before read such that a block would
2813  // be ejected from the disk cache by the time it is read.
2814  //
2815  // TODO(amistry): Implement some sort of read/write throttling.  The
2816  //                flood of asynchronous I/O requests when a drive is
2817  //                unplugged is causing the application and kernel to
2818  //                become unresponsive.
2819
2820  while (IsReadyToRun()) {
2821    // Write blocks to disk.
2822    logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2823              non_destructive_ ? "(disabled) " : "",
2824              device_name_.c_str(), thread_num_);
2825    while (IsReadyToRunNoPause() &&
2826           in_flight_sectors_.size() <
2827               static_cast<size_t>(queue_size_ + 1)) {
2828      // Confine testing to a particular segment of the disk.
2829      int64 segment = (block_num / blocks_per_segment_) % num_segments;
2830      if (!non_destructive_ &&
2831          (block_num % blocks_per_segment_ == 0)) {
2832        logprintf(20, "Log: Starting to write segment %lld out of "
2833                  "%lld on disk %s (thread %d).\n",
2834                  segment, num_segments, device_name_.c_str(),
2835                  thread_num_);
2836      }
2837      block_num++;
2838
2839      BlockData *block = block_table_->GetUnusedBlock(segment);
2840
2841      // If an unused sequence of sectors could not be found, skip to the
2842      // next block to process.  Soon, a new segment will come and new
2843      // sectors will be able to be allocated.  This effectively puts a
2844      // minumim on the disk size at 3x the stated cache size, or 48MiB
2845      // if a cache size is not given (since the cache is set as 16MiB
2846      // by default).  Given that todays caches are at the low MiB range
2847      // and drive sizes at the mid GB, this shouldn't pose a problem.
2848      // The 3x minimum comes from the following:
2849      //   1. In order to allocate 'y' blocks from a segment, the
2850      //      segment must contain at least 2y blocks or else an
2851      //      allocation may not succeed.
2852      //   2. Assume the entire disk is one segment.
2853      //   3. A full write phase consists of writing blocks corresponding to
2854      //      3/2 cache size.
2855      //   4. Therefore, the one segment must have 2 * 3/2 * cache
2856      //      size worth of blocks = 3 * cache size worth of blocks
2857      //      to complete.
2858      // In non-destructive mode, don't write anything to disk.
2859      if (!non_destructive_) {
2860        if (!WriteBlockToDisk(fd, block)) {
2861          block_table_->RemoveBlock(block);
2862          return true;
2863        }
2864        blocks_written_++;
2865      }
2866
2867      // Block is either initialized by writing, or in nondestructive case,
2868      // initialized by being added into the datastructure for later reading.
2869      block->SetBlockAsInitialized();
2870
2871      in_flight_sectors_.push(block);
2872    }
2873    if (!os_->FlushPageCache()) // If O_DIRECT worked, this will be a NOP.
2874      return false;
2875
2876    // Verify blocks on disk.
2877    logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2878              device_name_.c_str(), thread_num_);
2879    while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2880      BlockData *block = in_flight_sectors_.front();
2881      in_flight_sectors_.pop();
2882      if (!ValidateBlockOnDisk(fd, block))
2883        return true;
2884      block_table_->RemoveBlock(block);
2885      blocks_read_++;
2886    }
2887  }
2888
2889  pages_copied_ = blocks_written_ + blocks_read_;
2890  return true;
2891}
2892
2893// Do an asynchronous disk I/O operation.
2894// Return false if the IO is not set up.
2895bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2896                            int64 offset, int64 timeout) {
2897#ifdef HAVE_LIBAIO_H
2898  // Use the Linux native asynchronous I/O interface for reading/writing.
2899  // A read/write consists of three basic steps:
2900  //    1. create an io context.
2901  //    2. prepare and submit an io request to the context
2902  //    3. wait for an event on the context.
2903
2904  struct {
2905    const int opcode;
2906    const char *op_str;
2907    const char *error_str;
2908  } operations[2] = {
2909    { IO_CMD_PREAD, "read", "disk-read-error" },
2910    { IO_CMD_PWRITE, "write", "disk-write-error" }
2911  };
2912
2913  struct iocb cb;
2914  memset(&cb, 0, sizeof(cb));
2915
2916  cb.aio_fildes = fd;
2917  cb.aio_lio_opcode = operations[op].opcode;
2918  cb.u.c.buf = buf;
2919  cb.u.c.nbytes = size;
2920  cb.u.c.offset = offset;
2921
2922  struct iocb *cbs[] = { &cb };
2923  if (io_submit(aio_ctx_, 1, cbs) != 1) {
2924    int error = errno;
2925    char buf[256];
2926    sat_strerror(error, buf, sizeof(buf));
2927    logprintf(0, "Process Error: Unable to submit async %s "
2928                 "on disk %s (thread %d). Error %d, %s\n",
2929              operations[op].op_str, device_name_.c_str(),
2930              thread_num_, error, buf);
2931    return false;
2932  }
2933
2934  struct io_event event;
2935  memset(&event, 0, sizeof(event));
2936  struct timespec tv;
2937  tv.tv_sec = timeout / 1000000;
2938  tv.tv_nsec = (timeout % 1000000) * 1000;
2939  if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2940    // A ctrl-c from the keyboard will cause io_getevents to fail with an
2941    // EINTR error code.  This is not an error and so don't treat it as such,
2942    // but still log it.
2943    int error = errno;
2944    if (error == EINTR) {
2945      logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2946                operations[op].op_str, device_name_.c_str(),
2947                thread_num_);
2948    } else {
2949      os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2950      errorcount_ += 1;
2951      logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2952                   "starting at %lld on disk %s (thread %d).\n",
2953                operations[op].op_str, offset / kSectorSize,
2954                device_name_.c_str(), thread_num_);
2955    }
2956
2957    // Don't bother checking return codes since io_cancel seems to always fail.
2958    // Since io_cancel is always failing, destroying and recreating an I/O
2959    // context is a workaround for canceling an in-progress I/O operation.
2960    // TODO(amistry): Find out why io_cancel isn't working and make it work.
2961    io_cancel(aio_ctx_, &cb, &event);
2962    io_destroy(aio_ctx_);
2963    aio_ctx_ = 0;
2964    if (io_setup(5, &aio_ctx_)) {
2965      int error = errno;
2966      char buf[256];
2967      sat_strerror(error, buf, sizeof(buf));
2968      logprintf(0, "Process Error: Unable to create aio context on disk %s"
2969                " (thread %d) Error %d, %s\n",
2970                device_name_.c_str(), thread_num_, error, buf);
2971    }
2972
2973    return false;
2974  }
2975
2976  // event.res contains the number of bytes written/read or
2977  // error if < 0, I think.
2978  if (event.res != static_cast<uint64>(size)) {
2979    errorcount_++;
2980    os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2981
2982    if (event.res < 0) {
2983      switch (event.res) {
2984        case -EIO:
2985          logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2986                       "sectors starting at %lld on disk %s (thread %d).\n",
2987                    operations[op].op_str, offset / kSectorSize,
2988                    device_name_.c_str(), thread_num_);
2989          break;
2990        default:
2991          logprintf(0, "Hardware Error: Unknown error while doing %s to "
2992                       "sectors starting at %lld on disk %s (thread %d).\n",
2993                    operations[op].op_str, offset / kSectorSize,
2994                    device_name_.c_str(), thread_num_);
2995      }
2996    } else {
2997      logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2998                   "%lld on disk %s (thread %d).\n",
2999                operations[op].op_str, offset / kSectorSize,
3000                device_name_.c_str(), thread_num_);
3001    }
3002    return false;
3003  }
3004
3005  return true;
3006#else // !HAVE_LIBAIO_H
3007  return false;
3008#endif
3009}
3010
3011// Write a block to disk.
3012// Return false if the block is not written.
3013bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
3014  memset(block_buffer_, 0, block->GetSize());
3015
3016  // Fill block buffer with a pattern
3017  struct page_entry pe;
3018  if (!sat_->GetValid(&pe)) {
3019    // Even though a valid page could not be obatined, it is not an error
3020    // since we can always fill in a pattern directly, albeit slower.
3021    unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
3022    block->SetPattern(patternlist_->GetRandomPattern());
3023
3024    logprintf(11, "Log: Warning, using pattern fill fallback in "
3025                  "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
3026              device_name_.c_str(), thread_num_);
3027
3028    for (int i = 0; i < block->GetSize()/wordsize_; i++) {
3029      memblock[i] = block->GetPattern()->pattern(i);
3030    }
3031  } else {
3032    memcpy(block_buffer_, pe.addr, block->GetSize());
3033    block->SetPattern(pe.pattern);
3034    sat_->PutValid(&pe);
3035  }
3036
3037  logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
3038            " (thread %d).\n",
3039            block->GetSize()/kSectorSize, block->GetAddress(),
3040            device_name_.c_str(), thread_num_);
3041
3042  int64 start_time = GetTime();
3043
3044  if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
3045                   block->GetAddress() * kSectorSize, write_timeout_)) {
3046    return false;
3047  }
3048
3049  int64 end_time = GetTime();
3050  logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
3051            end_time - start_time, thread_num_);
3052  if (end_time - start_time > write_threshold_) {
3053    logprintf(5, "Log: Write took %lld us which is longer than threshold "
3054                 "%lld us on disk %s (thread %d).\n",
3055              end_time - start_time, write_threshold_, device_name_.c_str(),
3056              thread_num_);
3057  }
3058
3059  return true;
3060}
3061
// Verify a block on disk.
// Return true if the block was read, also increment errorcount
// if the block had data errors or performance problems.
bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
  // The written block is re-read in read_block_size_ chunks; 'blocks'
  // counts how many such chunks remain.
  int64 blocks = block->GetSize() / read_block_size_;
  int64 bytes_read = 0;    // Bytes verified so far within this block.
  int64 current_blocks;    // Chunk count chosen for the current read.
  int64 current_bytes;     // Byte size of the current read.
  uint64 address = block->GetAddress();  // Start sector of the block.

  logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
            "(thread %d).\n",
            address, device_name_.c_str(), thread_num_);

  // Read block from disk and time the read.  If it takes longer than the
  // threshold, complain.
  // NOTE(review): AsyncDiskIO() below is passed an absolute offset for
  // every read, so this lseek64 looks redundant -- confirm before removing.
  if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
    logprintf(0, "Process Error: Unable to seek to sector %lld in "
              "DiskThread::ValidateSectorsOnDisk on disk %s "
              "(thread %d).\n", address, device_name_.c_str(), thread_num_);
    return false;
  }
  int64 start_time = GetTime();

  // Split a large write-sized block into small read-sized blocks and
  // read them in groups of randomly-sized multiples of read block size.
  // This assures all data written on disk by this particular block
  // will be tested using a random reading pattern.
  while (blocks != 0) {
    // Test all read blocks in a written block.
    // Pick at least one and at most all remaining chunks for this read.
    current_blocks = (random() % blocks) + 1;
    current_bytes = current_blocks * read_block_size_;

    memset(block_buffer_, 0, current_bytes);

    logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
              "disk %s (thread %d)\n",
              current_bytes / kSectorSize,
              (address * kSectorSize + bytes_read) / kSectorSize,
              device_name_.c_str(), thread_num_);

    // NOTE(review): write_timeout_ is used here for a read operation; if a
    // separate read timeout member exists it should probably be used.
    if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
                     address * kSectorSize + bytes_read,
                     write_timeout_)) {
      return false;
    }

    // start_time is captured once before the loop, so this logs the
    // cumulative time over all chunks read so far, not just the last read.
    int64 end_time = GetTime();
    logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
              end_time - start_time, thread_num_);
    if (end_time - start_time > read_threshold_) {
      logprintf(5, "Log: Read took %lld us which is longer than threshold "
                "%lld us on disk %s (thread %d).\n",
                end_time - start_time, read_threshold_,
                device_name_.c_str(), thread_num_);
    }

    // In non-destructive mode, don't compare the block to the pattern since
    // the block was never written to disk in the first place.
    if (!non_destructive_) {
      if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
                      0, bytes_read)) {
        os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
        errorcount_ += 1;
        logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
                  "sector %lld in DiskThread::ValidateSectorsOnDisk on "
                  "disk %s (thread %d).\n",
                  address, device_name_.c_str(), thread_num_);
      }
    }

    bytes_read += current_blocks * read_block_size_;
    blocks -= current_blocks;
  }

  return true;
}
3139
3140// Direct device access thread.
3141// Return false on software error.
3142bool DiskThread::Work() {
3143  int fd;
3144
3145  logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3146            thread_num_, device_name_.c_str());
3147
3148  srandom(time(NULL));
3149
3150  if (!OpenDevice(&fd)) {
3151    status_ = false;
3152    return false;
3153  }
3154
3155  // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3156  // when using direct IO.
3157#ifdef HAVE_POSIX_MEMALIGN
3158  int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3159                              sat_->page_length());
3160#else
3161  block_buffer_ = memalign(kBufferAlignment, sat_->page_length());
3162  int memalign_result = (block_buffer_ == 0);
3163#endif
3164  if (memalign_result) {
3165    CloseDevice(fd);
3166    logprintf(0, "Process Error: Unable to allocate memory for buffers "
3167                 "for disk %s (thread %d) posix memalign returned %d.\n",
3168              device_name_.c_str(), thread_num_, memalign_result);
3169    status_ = false;
3170    return false;
3171  }
3172
3173#ifdef HAVE_LIBAIO_H
3174  if (io_setup(5, &aio_ctx_)) {
3175    CloseDevice(fd);
3176    logprintf(0, "Process Error: Unable to create aio context for disk %s"
3177              " (thread %d).\n",
3178              device_name_.c_str(), thread_num_);
3179    status_ = false;
3180    return false;
3181  }
3182#endif
3183
3184  bool result = DoWork(fd);
3185
3186  status_ = result;
3187
3188#ifdef HAVE_LIBAIO_H
3189  io_destroy(aio_ctx_);
3190#endif
3191  CloseDevice(fd);
3192
3193  logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3194               "%d pages copied\n",
3195            thread_num_, device_name_.c_str(), status_, pages_copied_);
3196  return result;
3197}
3198
// Construct a random disk thread that validates already-written blocks.
RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
    : DiskThread(block_table) {
  // This thread only reads existing blocks (see RandomDiskThread::DoWork),
  // so block-table updating is presumably disabled here -- the flag is
  // declared in worker.h; confirm its exact meaning there.
  update_block_table_ = 0;
}
3203
RandomDiskThread::~RandomDiskThread() {
  // No per-instance resources to release beyond those of DiskThread.
}
3206
3207// Workload for random disk thread.
3208bool RandomDiskThread::DoWork(int fd) {
3209  logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3210            device_name_.c_str(), thread_num_);
3211  while (IsReadyToRun()) {
3212    BlockData *block = block_table_->GetRandomBlock();
3213    if (block == NULL) {
3214      logprintf(12, "Log: No block available for device %s (thread %d).\n",
3215                device_name_.c_str(), thread_num_);
3216    } else {
3217      ValidateBlockOnDisk(fd, block);
3218      block_table_->ReleaseBlock(block);
3219      blocks_read_++;
3220    }
3221  }
3222  pages_copied_ = blocks_read_;
3223  return true;
3224}
3225
3226MemoryRegionThread::MemoryRegionThread() {
3227  error_injection_ = false;
3228  pages_ = NULL;
3229}
3230
3231MemoryRegionThread::~MemoryRegionThread() {
3232  if (pages_ != NULL)
3233    delete pages_;
3234}
3235
3236// Set a region of memory or MMIO to be tested.
3237// Return false if region could not be mapped.
3238bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3239  int plength = sat_->page_length();
3240  int npages = size / plength;
3241  if (size % plength) {
3242    logprintf(0, "Process Error: region size is not a multiple of SAT "
3243              "page length\n");
3244    return false;
3245  } else {
3246    if (pages_ != NULL)
3247      delete pages_;
3248    pages_ = new PageEntryQueue(npages);
3249    char *base_addr = reinterpret_cast<char*>(region);
3250    region_ = base_addr;
3251    for (int i = 0; i < npages; i++) {
3252      struct page_entry pe;
3253      init_pe(&pe);
3254      pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3255      pe.offset = i * plength;
3256
3257      pages_->Push(&pe);
3258    }
3259    return true;
3260  }
3261}
3262
3263// More detailed error printout for hardware errors in memory or MMIO
3264// regions.
3265void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3266                                      int priority,
3267                                      const char *message) {
3268  uint32 buffer_offset;
3269  if (phase_ == kPhaseCopy) {
3270    // If the error occurred on the Copy Phase, it means that
3271    // the source data (i.e., the main memory) is wrong. so
3272    // just pass it to the original ProcessError to call a
3273    // bad-dimm error
3274    WorkerThread::ProcessError(error, priority, message);
3275  } else if (phase_ == kPhaseCheck) {
3276    // A error on the Check Phase means that the memory region tested
3277    // has an error. Gathering more information and then reporting
3278    // the error.
3279    // Determine if this is a write or read error.
3280    os_->Flush(error->vaddr);
3281    error->reread = *(error->vaddr);
3282    char *good = reinterpret_cast<char*>(&(error->expected));
3283    char *bad = reinterpret_cast<char*>(&(error->actual));
3284    sat_assert(error->expected != error->actual);
3285    unsigned int offset = 0;
3286    for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3287      if (good[offset] != bad[offset])
3288        break;
3289    }
3290
3291    error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3292
3293    buffer_offset = error->vbyteaddr - region_;
3294
3295    // Find physical address if possible.
3296    error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3297    logprintf(priority,
3298              "%s: miscompare on %s, CRC check at %p(0x%llx), "
3299              "offset %llx: read:0x%016llx, reread:0x%016llx "
3300              "expected:0x%016llx\n",
3301              message,
3302              identifier_.c_str(),
3303              error->vaddr,
3304              error->paddr,
3305              buffer_offset,
3306              error->actual,
3307              error->reread,
3308              error->expected);
3309  } else {
3310    logprintf(0, "Process Error: memory region thread raised an "
3311              "unexpected error.");
3312  }
3313}
3314
// Workload for testing memory or MMIO regions.
// Each loop copies a SAT page into the region (CRC-checked), verifies the
// region page, then returns both pages to their queues.  On loops 1 and 2
// a word is deliberately corrupted when error injection is enabled, to
// prove the copy/check phases catch miscompares.
// Return false on software error.
bool MemoryRegionThread::Work() {
  struct page_entry source_pe;     // Page borrowed from SAT (source data).
  struct page_entry memregion_pe;  // Page inside the region under test.
  bool result = true;
  int64 loops = 0;
  // Recognizable value written over real data during error injection.
  const uint64 error_constant = 0x00ba00000000ba00LL;

  // For error injection.
  int64 *addr = 0x0;  // Location of the injected error, for restoration.
  int offset = 0;
  int64 data = 0;     // Original word saved so it can be put back.

  logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);

  while (IsReadyToRun()) {
    // Getting pages from SAT and queue.
    phase_ = kPhaseNoPhase;
    result = result && sat_->GetValid(&source_pe);
    if (!result) {
      logprintf(0, "Process Error: memory region thread failed to pop "
                "pages from SAT, bailing\n");
      break;
    }

    result = result && pages_->PopRandom(&memregion_pe);
    if (!result) {
      logprintf(0, "Process Error: memory region thread failed to pop "
                "pages from queue, bailing\n");
      break;
    }

    // Error injection for CRC copy.  Corrupt the source page on loop 1 so
    // the copy phase should detect the miscompare.
    if ((sat_->error_injection() || error_injection_) && loops == 1) {
      addr = reinterpret_cast<int64*>(source_pe.addr);
      offset = random() % (sat_->page_length() / wordsize_);
      data = addr[offset];
      addr[offset] = error_constant;
    }

    // Copying SAT page into memory region.
    phase_ = kPhaseCopy;
    CrcCopyPage(&memregion_pe, &source_pe);
    memregion_pe.pattern = source_pe.pattern;

    // Error injection for CRC Check.  Corrupt the region page on loop 2 so
    // the check phase should detect the miscompare.
    if ((sat_->error_injection() || error_injection_) && loops == 2) {
      addr = reinterpret_cast<int64*>(memregion_pe.addr);
      offset = random() % (sat_->page_length() / wordsize_);
      data = addr[offset];
      addr[offset] = error_constant;
    }

    // Checking page content in memory region.
    phase_ = kPhaseCheck;
    CrcCheckPage(&memregion_pe);

    phase_ = kPhaseNoPhase;
    // Storing pages on their proper queues.
    result = result && sat_->PutValid(&source_pe);
    if (!result) {
      logprintf(0, "Process Error: memory region thread failed to push "
                "pages into SAT, bailing\n");
      break;
    }
    result = result && pages_->Push(&memregion_pe);
    if (!result) {
      logprintf(0, "Process Error: memory region thread failed to push "
                "pages into queue, bailing\n");
      break;
    }

    // Restore the word overwritten by error injection so later loops see
    // clean data.  Note this happens after the page was pushed back.
    if ((sat_->error_injection() || error_injection_) &&
        loops >= 1 && loops <= 2) {
      addr[offset] = data;
    }

    loops++;
    YieldSelf();
  }

  pages_copied_ = loops;
  status_ = result;
  logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
            "pages checked\n", thread_num_, status_, pages_copied_);
  return result;
}
3403