chiark / gitweb /
Add better ARM/Android support, support a wider variety of configure options.
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #ifdef HAVE_LIBAIO_H
48 #include <libaio.h>
49 #endif
50
51 #include <sys/syscall.h>
52
53 #include <set>
54 #include <string>
55
56 // This file must work with autoconf on its public version,
57 // so these includes are correct.
58 #include "error_diag.h"  // NOLINT
59 #include "os.h"          // NOLINT
60 #include "pattern.h"     // NOLINT
61 #include "queue.h"       // NOLINT
62 #include "sat.h"         // NOLINT
63 #include "sattypes.h"    // NOLINT
64 #include "worker.h"      // NOLINT
65
66 // Syscalls
67 // Why ubuntu, do you hate gettid so bad?
68 #if !defined(__NR_gettid)
69   #define __NR_gettid             224
70 #endif
71
72 #define gettid() syscall(__NR_gettid)
73 #if !defined(CPU_SETSIZE)
74 _syscall3(int, sched_getaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 _syscall3(int, sched_setaffinity, pid_t, pid,
77           unsigned int, len, cpu_set_t*, mask)
78 #endif
79
80 namespace {
81   // Get HW core ID from cpuid instruction.
82   inline int apicid(void) {
83     int cpu;
84 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
85     __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
86 #elif defined(STRESSAPPTEST_CPU_ARMV7A)
87   #warning "Unsupported CPU type ARMV7A: unable to determine core ID."
88     cpu = 0;
89 #else
90   #warning "Unsupported CPU type: unable to determine core ID."
91     cpu = 0;
92 #endif
93     return (cpu >> 24);
94   }
95
96   // Work around the sad fact that there are two (gnu, xsi) incompatible
97   // versions of strerror_r floating around google. Awesome.
98   bool sat_strerror(int err, char *buf, int len) {
99     buf[0] = 0;
100     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
101     int retval = reinterpret_cast<int64>(errmsg);
102     if (retval == 0)
103       return true;
104     if (retval == -1)
105       return false;
106     if (errmsg != buf) {
107       strncpy(buf, errmsg, len);
108       buf[len - 1] = 0;
109     }
110     return true;
111   }
112
113
114   inline uint64 addr_to_tag(void *address) {
115     return reinterpret_cast<uint64>(address);
116   }
117 }
118
119 #if !defined(O_DIRECT)
120 // Sometimes this isn't available.
121 // Disregard if it's not defined.
122   #define O_DIRECT            0
123 #endif
124
125 // A struct to hold captured errors, for later reporting.
126 struct ErrorRecord {
127   uint64 actual;  // This is the actual value read.
128   uint64 reread;  // This is the actual value, reread.
129   uint64 expected;  // This is what it should have been.
130   uint64 *vaddr;  // This is where it was (or wasn't).
131   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
132   uint64 paddr;  // This is the bus address, if available.
133   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
134   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
135 };
136
137 // This is a helper function to create new threads with pthreads.
138 static void *ThreadSpawnerGeneric(void *ptr) {
139   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
140   worker->StartRoutine();
141   return NULL;
142 }
143
144 void WorkerStatus::Initialize() {
145   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
146   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
147 #ifdef _POSIX_BARRIERS
148   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
149                                        num_workers_ + 1));
150 #endif
151 }
152
153 void WorkerStatus::Destroy() {
154   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
155   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
156 #ifdef _POSIX_BARRIERS
157   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
158 #endif
159 }
160
161 void WorkerStatus::PauseWorkers() {
162   if (SetStatus(PAUSE) != PAUSE)
163     WaitOnPauseBarrier();
164 }
165
166 void WorkerStatus::ResumeWorkers() {
167   if (SetStatus(RUN) == PAUSE)
168     WaitOnPauseBarrier();
169 }
170
171 void WorkerStatus::StopWorkers() {
172   if (SetStatus(STOP) == PAUSE)
173     WaitOnPauseBarrier();
174 }
175
176 bool WorkerStatus::ContinueRunning() {
177   // This loop is an optimization.  We use it to immediately re-check the status
178   // after resuming from a pause, instead of returning and waiting for the next
179   // call to this function.
180   for (;;) {
181     switch (GetStatus()) {
182       case RUN:
183         return true;
184       case PAUSE:
185         // Wait for the other workers to call this function so that
186         // PauseWorkers() can return.
187         WaitOnPauseBarrier();
188         // Wait for ResumeWorkers() to be called.
189         WaitOnPauseBarrier();
190         break;
191       case STOP:
192         return false;
193     }
194   }
195 }
196
197 bool WorkerStatus::ContinueRunningNoPause() {
198   return (GetStatus() != STOP);
199 }
200
201 void WorkerStatus::RemoveSelf() {
202   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
203   for (;;) {
204     AcquireStatusReadLock();
205     if (status_ != PAUSE)
206       break;
207     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
208     // the other threads won't wait on pause_barrier_ forever.
209     ReleaseStatusLock();
210     // Wait for the other workers to call this function so that PauseWorkers()
211     // can return.
212     WaitOnPauseBarrier();
213     // Wait for ResumeWorkers() to be called.
214     WaitOnPauseBarrier();
215   }
216
217   // This lock would be unnecessary if we held a write lock instead of a read
218   // lock on status_rwlock_, but that would also force all threads calling
219   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
220   AcquireNumWorkersLock();
221   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
222   // in use because (status != PAUSE).
223 #ifdef _POSIX_BARRIERS
224   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
225   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
226 #endif
227   --num_workers_;
228   ReleaseNumWorkersLock();
229
230   // Release status_rwlock_.
231   ReleaseStatusLock();
232 }
233
234
235 // Parent thread class.
236 WorkerThread::WorkerThread() {
237   status_ = false;
238   pages_copied_ = 0;
239   errorcount_ = 0;
240   runduration_usec_ = 1;
241   priority_ = Normal;
242   worker_status_ = NULL;
243   thread_spawner_ = &ThreadSpawnerGeneric;
244   tag_mode_ = false;
245 }
246
247 WorkerThread::~WorkerThread() {}
248
249 // Constructors. Just init some default values.
250 FillThread::FillThread() {
251   num_pages_to_fill_ = 0;
252 }
253
254 // Initialize file name to empty.
255 FileThread::FileThread() {
256   filename_ = "";
257   devicename_ = "";
258   pass_ = 0;
259   page_io_ = true;
260   crc_page_ = -1;
261   local_page_ = NULL;
262 }
263
264 // If file thread used bounce buffer in memory, account for the extra
265 // copy for memory bandwidth calculation.
266 float FileThread::GetMemoryCopiedData() {
267   if (!os_->normal_mem())
268     return GetCopiedData();
269   else
270     return 0;
271 }
272
273 // Initialize target hostname to be invalid.
274 NetworkThread::NetworkThread() {
275   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
276   sock_ = 0;
277 }
278
279 // Initialize?
280 NetworkSlaveThread::NetworkSlaveThread() {
281 }
282
283 // Initialize?
284 NetworkListenThread::NetworkListenThread() {
285 }
286
287 // Init member variables.
288 void WorkerThread::InitThread(int thread_num_init,
289                               class Sat *sat_init,
290                               class OsLayer *os_init,
291                               class PatternList *patternlist_init,
292                               WorkerStatus *worker_status) {
293   sat_assert(worker_status);
294   worker_status->AddWorkers(1);
295
296   thread_num_ = thread_num_init;
297   sat_ = sat_init;
298   os_ = os_init;
299   patternlist_ = patternlist_init;
300   worker_status_ = worker_status;
301
302   AvailableCpus(&cpu_mask_);
303   tag_ = 0xffffffff;
304
305   tag_mode_ = sat_->tag_mode();
306 }
307
308
309 // Use pthreads to prioritize a system thread.
310 bool WorkerThread::InitPriority() {
311   // This doesn't affect performance that much, and may not be too safe.
312
313   bool ret = BindToCpus(&cpu_mask_);
314   if (!ret)
315     logprintf(11, "Log: Bind to %s failed.\n",
316               cpuset_format(&cpu_mask_).c_str());
317
318   logprintf(11, "Log: Thread %d running on apic ID %d mask %s (%s).\n",
319             thread_num_, apicid(),
320             CurrentCpusFormat().c_str(),
321             cpuset_format(&cpu_mask_).c_str());
322 #if 0
323   if (priority_ == High) {
324     sched_param param;
325     param.sched_priority = 1;
326     // Set the priority; others are unchanged.
327     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
328               param.sched_priority);
329     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
330       char buf[256];
331       sat_strerror(errno, buf, sizeof(buf));
332       logprintf(0, "Process Error: sched_setscheduler "
333                    "failed - error %d %s\n",
334                 errno, buf);
335     }
336   }
337 #endif
338   return true;
339 }
340
341 // Use pthreads to create a system thread.
342 int WorkerThread::SpawnThread() {
343   // Create the new thread.
344   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
345   if (result) {
346     char buf[256];
347     sat_strerror(result, buf, sizeof(buf));
348     logprintf(0, "Process Error: pthread_create "
349                   "failed - error %d %s\n", result,
350               buf);
351     status_ = false;
352     return false;
353   }
354
355   // 0 is pthreads success.
356   return true;
357 }
358
359 // Kill the worker thread with SIGINT.
360 bool WorkerThread::KillThread() {
361   return (pthread_kill(thread_, SIGINT) == 0);
362 }
363
364 // Block until thread has exited.
365 bool WorkerThread::JoinThread() {
366   int result = pthread_join(thread_, NULL);
367
368   if (result) {
369     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
370     status_ = false;
371   }
372
373   // 0 is pthreads success.
374   return (!result);
375 }
376
377
378 void WorkerThread::StartRoutine() {
379   InitPriority();
380   StartThreadTimer();
381   Work();
382   StopThreadTimer();
383   worker_status_->RemoveSelf();
384 }
385
386
387 // Thread work loop. Execute until marked finished.
388 bool WorkerThread::Work() {
389   do {
390     logprintf(9, "Log: ...\n");
391     // Sleep for 1 second.
392     sat_sleep(1);
393   } while (IsReadyToRun());
394
395   return false;
396 }
397
398
399 // Returns CPU mask of CPUs available to this process,
400 // Conceptually, each bit represents a logical CPU, ie:
401 //   mask = 3  (11b):   cpu0, 1
402 //   mask = 13 (1101b): cpu0, 2, 3
403 bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
404   CPU_ZERO(cpuset);
405 #ifdef HAVE_SCHED_GETAFFINITY
406   return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
407 #else
408   return 0;
409 #endif
410 }
411
412
413 // Returns CPU mask of CPUs this thread is bound to,
414 // Conceptually, each bit represents a logical CPU, ie:
415 //   mask = 3  (11b):   cpu0, 1
416 //   mask = 13 (1101b): cpu0, 2, 3
417 bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
418   CPU_ZERO(cpuset);
419 #ifdef HAVE_SCHED_GETAFFINITY
420   return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
421 #else
422   return 0;
423 #endif
424 }
425
426
427 // Bind worker thread to specified CPU(s)
428 //   Args:
429 //     thread_mask: cpu_set_t representing CPUs, ie
430 //                  mask = 1  (01b):   cpu0
431 //                  mask = 3  (11b):   cpu0, 1
432 //                  mask = 13 (1101b): cpu0, 2, 3
433 //
434 //   Returns true on success, false otherwise.
435 bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
436   cpu_set_t process_mask;
437   AvailableCpus(&process_mask);
438   if (cpuset_isequal(thread_mask, &process_mask))
439     return true;
440
441   logprintf(11, "Log: available CPU mask - %s\n",
442             cpuset_format(&process_mask).c_str());
443   if (!cpuset_issubset(thread_mask, &process_mask)) {
444     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
445     logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
446               cpuset_format(thread_mask).c_str(),
447               cpuset_format(&process_mask).c_str());
448     return false;
449   }
450 #ifdef HAVE_SCHED_GETAFFINITY
451   return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
452 #else
453   return 0;
454 #endif
455 }
456
457
458 // A worker thread can yield itself to give up CPU until it's scheduled again.
459 //   Returns true on success, false on error.
460 bool WorkerThread::YieldSelf() {
461   return (sched_yield() == 0);
462 }
463
464
465 // Fill this page with its pattern.
466 bool WorkerThread::FillPage(struct page_entry *pe) {
467   // Error check arguments.
468   if (pe == 0) {
469     logprintf(0, "Process Error: Fill Page entry null\n");
470     return 0;
471   }
472
473   // Mask is the bitmask of indexes used by the pattern.
474   // It is the pattern size -1. Size is always a power of 2.
475   uint64 *memwords = static_cast<uint64*>(pe->addr);
476   int length = sat_->page_length();
477
478   if (tag_mode_) {
479     // Select tag or data as appropriate.
480     for (int i = 0; i < length / wordsize_; i++) {
481       datacast_t data;
482
483       if ((i & 0x7) == 0) {
484         data.l64 = addr_to_tag(&memwords[i]);
485       } else {
486         data.l32.l = pe->pattern->pattern(i << 1);
487         data.l32.h = pe->pattern->pattern((i << 1) + 1);
488       }
489       memwords[i] = data.l64;
490     }
491   } else {
492     // Just fill in untagged data directly.
493     for (int i = 0; i < length / wordsize_; i++) {
494       datacast_t data;
495
496       data.l32.l = pe->pattern->pattern(i << 1);
497       data.l32.h = pe->pattern->pattern((i << 1) + 1);
498       memwords[i] = data.l64;
499     }
500   }
501
502   return 1;
503 }
504
505
506 // Tell the thread how many pages to fill.
507 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
508   num_pages_to_fill_ = num_pages_to_fill_init;
509 }
510
511 // Fill this page with a random pattern.
512 bool FillThread::FillPageRandom(struct page_entry *pe) {
513   // Error check arguments.
514   if (pe == 0) {
515     logprintf(0, "Process Error: Fill Page entry null\n");
516     return 0;
517   }
518   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
519     logprintf(0, "Process Error: No data patterns available\n");
520     return 0;
521   }
522
523   // Choose a random pattern for this block.
524   pe->pattern = patternlist_->GetRandomPattern();
525   if (pe->pattern == 0) {
526     logprintf(0, "Process Error: Null data pattern\n");
527     return 0;
528   }
529
530   // Actually fill the page.
531   return FillPage(pe);
532 }
533
534
535 // Memory fill work loop. Execute until alloted pages filled.
536 bool FillThread::Work() {
537   bool result = true;
538
539   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
540
541   // We want to fill num_pages_to_fill pages, and
542   // stop when we've filled that many.
543   // We also want to capture early break
544   struct page_entry pe;
545   int64 loops = 0;
546   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
547     result = result && sat_->GetEmpty(&pe);
548     if (!result) {
549       logprintf(0, "Process Error: fill_thread failed to pop pages, "
550                 "bailing\n");
551       break;
552     }
553
554     // Fill the page with pattern
555     result = result && FillPageRandom(&pe);
556     if (!result) break;
557
558     // Put the page back on the queue.
559     result = result && sat_->PutValid(&pe);
560     if (!result) {
561       logprintf(0, "Process Error: fill_thread failed to push pages, "
562                 "bailing\n");
563       break;
564     }
565     loops++;
566   }
567
568   // Fill in thread status.
569   pages_copied_ = loops;
570   status_ = result;
571   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
572             thread_num_, status_, pages_copied_);
573   return result;
574 }
575
576
577 // Print error information about a data miscompare.
578 void WorkerThread::ProcessError(struct ErrorRecord *error,
579                                 int priority,
580                                 const char *message) {
581   char dimm_string[256] = "";
582
583   int apic_id = apicid();
584
585   // Determine if this is a write or read error.
586   os_->Flush(error->vaddr);
587   error->reread = *(error->vaddr);
588
589   char *good = reinterpret_cast<char*>(&(error->expected));
590   char *bad = reinterpret_cast<char*>(&(error->actual));
591
592   sat_assert(error->expected != error->actual);
593   unsigned int offset = 0;
594   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
595     if (good[offset] != bad[offset])
596       break;
597   }
598
599   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
600
601   // Find physical address if possible.
602   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
603
604   // Pretty print DIMM mapping if available.
605   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
606
607   // Report parseable error.
608   if (priority < 5) {
609     // Run miscompare error through diagnoser for logging and reporting.
610     os_->error_diagnoser_->AddMiscompareError(dimm_string,
611                                               reinterpret_cast<uint64>
612                                               (error->vaddr), 1);
613
614     logprintf(priority,
615               "%s: miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
616               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
617               message,
618               apic_id,
619               CurrentCpusFormat().c_str(),
620               error->vaddr,
621               error->paddr,
622               dimm_string,
623               error->actual,
624               error->reread,
625               error->expected);
626   }
627
628
629   // Overwrite incorrect data with correct data to prevent
630   // future miscompares when this data is reused.
631   *(error->vaddr) = error->expected;
632   os_->Flush(error->vaddr);
633 }
634
635
636
637 // Print error information about a data miscompare.
638 void FileThread::ProcessError(struct ErrorRecord *error,
639                               int priority,
640                               const char *message) {
641   char dimm_string[256] = "";
642
643   // Determine if this is a write or read error.
644   os_->Flush(error->vaddr);
645   error->reread = *(error->vaddr);
646
647   char *good = reinterpret_cast<char*>(&(error->expected));
648   char *bad = reinterpret_cast<char*>(&(error->actual));
649
650   sat_assert(error->expected != error->actual);
651   unsigned int offset = 0;
652   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
653     if (good[offset] != bad[offset])
654       break;
655   }
656
657   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
658
659   // Find physical address if possible.
660   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
661
662   // Pretty print DIMM mapping if available.
663   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
664
665   // If crc_page_ is valid, ie checking content read back from file,
666   // track src/dst memory addresses. Otherwise catagorize as general
667   // mememory miscompare for CRC checking everywhere else.
668   if (crc_page_ != -1) {
669     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
670                                 static_cast<char*>(page_recs_[crc_page_].dst);
671     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
672                                                  crc_page_,
673                                                  miscompare_byteoffset,
674                                                  page_recs_[crc_page_].src,
675                                                  page_recs_[crc_page_].dst);
676   } else {
677     os_->error_diagnoser_->AddMiscompareError(dimm_string,
678                                               reinterpret_cast<uint64>
679                                               (error->vaddr), 1);
680   }
681
682   logprintf(priority,
683             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
684             "reread:0x%016llx expected:0x%016llx\n",
685             message,
686             devicename_.c_str(),
687             error->vaddr,
688             error->paddr,
689             dimm_string,
690             error->actual,
691             error->reread,
692             error->expected);
693
694   // Overwrite incorrect data with correct data to prevent
695   // future miscompares when this data is reused.
696   *(error->vaddr) = error->expected;
697   os_->Flush(error->vaddr);
698 }
699
700
701 // Do a word by word result check of a region.
702 // Print errors on mismatches.
703 int WorkerThread::CheckRegion(void *addr,
704                               class Pattern *pattern,
705                               int64 length,
706                               int offset,
707                               int64 pattern_offset) {
708   uint64 *memblock = static_cast<uint64*>(addr);
709   const int kErrorLimit = 128;
710   int errors = 0;
711   int overflowerrors = 0;  // Count of overflowed errors.
712   bool page_error = false;
713   string errormessage("Hardware Error");
714   struct ErrorRecord
715     recorded[kErrorLimit];  // Queued errors for later printing.
716
717   // For each word in the data region.
718   for (int i = 0; i < length / wordsize_; i++) {
719     uint64 actual = memblock[i];
720     uint64 expected;
721
722     // Determine the value that should be there.
723     datacast_t data;
724     int index = 2 * i + pattern_offset;
725     data.l32.l = pattern->pattern(index);
726     data.l32.h = pattern->pattern(index + 1);
727     expected = data.l64;
728     // Check tags if necessary.
729     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
730       expected = addr_to_tag(&memblock[i]);
731     }
732
733
734     // If the value is incorrect, save an error record for later printing.
735     if (actual != expected) {
736       if (errors < kErrorLimit) {
737         recorded[errors].actual = actual;
738         recorded[errors].expected = expected;
739         recorded[errors].vaddr = &memblock[i];
740         errors++;
741       } else {
742         page_error = true;
743         // If we have overflowed the error queue, just print the errors now.
744         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
745         errormessage = "Page Error";
746         break;
747       }
748     }
749   }
750
751   // Find if this is a whole block corruption.
752   if (page_error && !tag_mode_) {
753     int patsize = patternlist_->Size();
754     for (int pat = 0; pat < patsize; pat++) {
755       class Pattern *altpattern = patternlist_->GetPattern(pat);
756       const int kGood = 0;
757       const int kBad = 1;
758       const int kGoodAgain = 2;
759       const int kNoMatch = 3;
760       int state = kGood;
761       unsigned int badstart = 0;
762       unsigned int badend = 0;
763
764       // Don't match against ourself!
765       if (pattern == altpattern)
766         continue;
767
768       for (int i = 0; i < length / wordsize_; i++) {
769         uint64 actual = memblock[i];
770         datacast_t expected;
771         datacast_t possible;
772
773         // Determine the value that should be there.
774         int index = 2 * i + pattern_offset;
775
776         expected.l32.l = pattern->pattern(index);
777         expected.l32.h = pattern->pattern(index + 1);
778
779         possible.l32.l = pattern->pattern(index);
780         possible.l32.h = pattern->pattern(index + 1);
781
782         if (state == kGood) {
783           if (actual == expected.l64) {
784             continue;
785           } else if (actual == possible.l64) {
786             badstart = i;
787             badend = i;
788             state = kBad;
789             continue;
790           } else {
791             state = kNoMatch;
792             break;
793           }
794         } else if (state == kBad) {
795           if (actual == possible.l64) {
796             badend = i;
797             continue;
798           } else if (actual == expected.l64) {
799             state = kGoodAgain;
800             continue;
801           } else {
802             state = kNoMatch;
803             break;
804           }
805         } else if (state == kGoodAgain) {
806           if (actual == expected.l64) {
807             continue;
808           } else {
809             state = kNoMatch;
810             break;
811           }
812         }
813       }
814
815       if ((state == kGoodAgain) || (state == kBad)) {
816         unsigned int blockerrors = badend - badstart + 1;
817         errormessage = "Block Error";
818         ProcessError(&recorded[0], 0, errormessage.c_str());
819         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
820                   "%d bytes from offset 0x%x to 0x%x\n",
821                   &memblock[badstart],
822                   altpattern->name(), pattern->name(),
823                   blockerrors * wordsize_,
824                   offset + badstart * wordsize_,
825                   offset + badend * wordsize_);
826         errorcount_ += blockerrors;
827         return blockerrors;
828       }
829     }
830   }
831
832
833   // Process error queue after all errors have been recorded.
834   for (int err = 0; err < errors; err++) {
835     int priority = 5;
836     if (errorcount_ + err < 30)
837       priority = 0;  // Bump up the priority for the first few errors.
838     ProcessError(&recorded[err], priority, errormessage.c_str());
839   }
840
841   if (page_error) {
842     // For each word in the data region.
843     int error_recount = 0;
844     for (int i = 0; i < length / wordsize_; i++) {
845       uint64 actual = memblock[i];
846       uint64 expected;
847       datacast_t data;
848       // Determine the value that should be there.
849       int index = 2 * i + pattern_offset;
850
851       data.l32.l = pattern->pattern(index);
852       data.l32.h = pattern->pattern(index + 1);
853       expected = data.l64;
854
855       // Check tags if necessary.
856       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
857         expected = addr_to_tag(&memblock[i]);
858       }
859
860       // If the value is incorrect, save an error record for later printing.
861       if (actual != expected) {
862         if (error_recount < kErrorLimit) {
863           // We already reported these.
864           error_recount++;
865         } else {
866           // If we have overflowed the error queue, print the errors now.
867           struct ErrorRecord er;
868           er.actual = actual;
869           er.expected = expected;
870           er.vaddr = &memblock[i];
871
872           // Do the error printout. This will take a long time and
873           // likely change the machine state.
874           ProcessError(&er, 12, errormessage.c_str());
875           overflowerrors++;
876         }
877       }
878     }
879   }
880
881   // Keep track of observed errors.
882   errorcount_ += errors + overflowerrors;
883   return errors + overflowerrors;
884 }
885
886 float WorkerThread::GetCopiedData() {
887   return pages_copied_ * sat_->page_length() / kMegabyte;
888 }
889
890 // Calculate the CRC of a region.
891 // Result check if the CRC mismatches.
892 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
893   const int blocksize = 4096;
894   const int blockwords = blocksize / wordsize_;
895   int errors = 0;
896
897   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
898   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
899   int blocks = sat_->page_length() / blocksize;
900   for (int currentblock = 0; currentblock < blocks; currentblock++) {
901     uint64 *memslice = memblock + currentblock * blockwords;
902
903     AdlerChecksum crc;
904     if (tag_mode_) {
905       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
906     } else {
907       CalculateAdlerChecksum(memslice, blocksize, &crc);
908     }
909
910     // If the CRC does not match, we'd better look closer.
911     if (!crc.Equals(*expectedcrc)) {
912       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
913                 "CRC mismatch %s != %s\n",
914                 crc.ToHexString().c_str(),
915                 expectedcrc->ToHexString().c_str());
916       int errorcount = CheckRegion(memslice,
917                                    srcpe->pattern,
918                                    blocksize,
919                                    currentblock * blocksize, 0);
920       if (errorcount == 0) {
921         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
922                      "but no miscompares found.\n",
923                   crc.ToHexString().c_str(),
924                   expectedcrc->ToHexString().c_str());
925       }
926       errors += errorcount;
927     }
928   }
929
930   // For odd length transfers, we should never hit this.
931   int leftovers = sat_->page_length() % blocksize;
932   if (leftovers) {
933     uint64 *memslice = memblock + blocks * blockwords;
934     errors += CheckRegion(memslice,
935                           srcpe->pattern,
936                           leftovers,
937                           blocks * blocksize, 0);
938   }
939   return errors;
940 }
941
942
943 // Print error information about a data miscompare.
944 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
945                                    int priority,
946                                    const char *message) {
947   char dimm_string[256] = "";
948   char tag_dimm_string[256] = "";
949   bool read_error = false;
950
951   int apic_id = apicid();
952
953   // Determine if this is a write or read error.
954   os_->Flush(error->vaddr);
955   error->reread = *(error->vaddr);
956
957   // Distinguish read and write errors.
958   if (error->actual != error->reread) {
959     read_error = true;
960   }
961
962   sat_assert(error->expected != error->actual);
963
964   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
965
966   // Find physical address if possible.
967   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
968   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
969
970   // Pretty print DIMM mapping if available.
971   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
972   // Pretty print DIMM mapping if available.
973   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
974
975   // Report parseable error.
976   if (priority < 5) {
977     logprintf(priority,
978               "%s: Tag from %p(0x%llx:%s) (%s) "
979               "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
980               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
981               message,
982               error->tagvaddr, error->tagpaddr,
983               tag_dimm_string,
984               read_error ? "read error" : "write error",
985               apic_id,
986               CurrentCpusFormat().c_str(),
987               error->vaddr,
988               error->paddr,
989               dimm_string,
990               error->actual,
991               error->reread,
992               error->expected);
993   }
994
995   errorcount_ += 1;
996
997   // Overwrite incorrect data with correct data to prevent
998   // future miscompares when this data is reused.
999   *(error->vaddr) = error->expected;
1000   os_->Flush(error->vaddr);
1001 }
1002
1003
1004 // Print out and log a tag error.
1005 bool WorkerThread::ReportTagError(
1006     uint64 *mem64,
1007     uint64 actual,
1008     uint64 tag) {
1009   struct ErrorRecord er;
1010   er.actual = actual;
1011
1012   er.expected = tag;
1013   er.vaddr = mem64;
1014
1015   // Generate vaddr from tag.
1016   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1017
1018   ProcessTagError(&er, 0, "Hardware Error");
1019   return true;
1020 }
1021
1022 // C implementation of Adler memory copy, with memory tagging.
1023 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1024                                     uint64 *srcmem64,
1025                                     unsigned int size_in_bytes,
1026                                     AdlerChecksum *checksum,
1027                                     struct page_entry *pe) {
1028   // Use this data wrapper to access memory with 64bit read/write.
1029   datacast_t data;
1030   datacast_t dstdata;
1031   unsigned int count = size_in_bytes / sizeof(data);
1032
1033   if (count > ((1U) << 19)) {
1034     // Size is too large, must be strictly less than 512 KB.
1035     return false;
1036   }
1037
1038   uint64 a1 = 1;
1039   uint64 a2 = 1;
1040   uint64 b1 = 0;
1041   uint64 b2 = 0;
1042
1043   class Pattern *pattern = pe->pattern;
1044
1045   unsigned int i = 0;
1046   while (i < count) {
1047     // Process 64 bits at a time.
1048     if ((i & 0x7) == 0) {
1049       data.l64 = srcmem64[i];
1050       dstdata.l64 = dstmem64[i];
1051       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1052       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1053       // Detect if tags have been corrupted.
1054       if (data.l64 != src_tag)
1055         ReportTagError(&srcmem64[i], data.l64, src_tag);
1056       if (dstdata.l64 != dst_tag)
1057         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1058
1059       data.l32.l = pattern->pattern(i << 1);
1060       data.l32.h = pattern->pattern((i << 1) + 1);
1061       a1 = a1 + data.l32.l;
1062       b1 = b1 + a1;
1063       a1 = a1 + data.l32.h;
1064       b1 = b1 + a1;
1065
1066       data.l64  = dst_tag;
1067       dstmem64[i] = data.l64;
1068
1069     } else {
1070       data.l64 = srcmem64[i];
1071       a1 = a1 + data.l32.l;
1072       b1 = b1 + a1;
1073       a1 = a1 + data.l32.h;
1074       b1 = b1 + a1;
1075       dstmem64[i] = data.l64;
1076     }
1077     i++;
1078
1079     data.l64 = srcmem64[i];
1080     a2 = a2 + data.l32.l;
1081     b2 = b2 + a2;
1082     a2 = a2 + data.l32.h;
1083     b2 = b2 + a2;
1084     dstmem64[i] = data.l64;
1085     i++;
1086   }
1087   checksum->Set(a1, a2, b1, b2);
1088   return true;
1089 }
1090
1091 // x86_64 SSE2 assembly implementation of Adler memory copy, with address
1092 // tagging added as a second step. This is useful for debugging failures
1093 // that only occur when SSE / nontemporal writes are used.
1094 bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1095                                        uint64 *srcmem64,
1096                                        unsigned int size_in_bytes,
1097                                        AdlerChecksum *checksum,
1098                                        struct page_entry *pe) {
1099   // Do ASM copy, ignore checksum.
1100   AdlerChecksum ignored_checksum;
1101   os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1102
1103   // Force cache flush.
1104   int length = size_in_bytes / sizeof(*dstmem64);
1105   for (int i = 0; i < length; i += sizeof(*dstmem64)) {
1106     os_->FastFlush(dstmem64 + i);
1107     os_->FastFlush(srcmem64 + i);
1108   }
1109   // Check results.
1110   AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1111   // Patch up address tags.
1112   TagAddrC(dstmem64, size_in_bytes);
1113   return true;
1114 }
1115
1116 // Retag pages..
1117 bool WorkerThread::TagAddrC(uint64 *memwords,
1118                             unsigned int size_in_bytes) {
1119   // Mask is the bitmask of indexes used by the pattern.
1120   // It is the pattern size -1. Size is always a power of 2.
1121
1122   // Select tag or data as appropriate.
1123   int length = size_in_bytes / wordsize_;
1124   for (int i = 0; i < length; i += 8) {
1125     datacast_t data;
1126     data.l64 = addr_to_tag(&memwords[i]);
1127     memwords[i] = data.l64;
1128   }
1129   return true;
1130 }
1131
1132 // C implementation of Adler memory crc.
1133 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1134                                  unsigned int size_in_bytes,
1135                                  AdlerChecksum *checksum,
1136                                  struct page_entry *pe) {
1137   // Use this data wrapper to access memory with 64bit read/write.
1138   datacast_t data;
1139   unsigned int count = size_in_bytes / sizeof(data);
1140
1141   if (count > ((1U) << 19)) {
1142     // Size is too large, must be strictly less than 512 KB.
1143     return false;
1144   }
1145
1146   uint64 a1 = 1;
1147   uint64 a2 = 1;
1148   uint64 b1 = 0;
1149   uint64 b2 = 0;
1150
1151   class Pattern *pattern = pe->pattern;
1152
1153   unsigned int i = 0;
1154   while (i < count) {
1155     // Process 64 bits at a time.
1156     if ((i & 0x7) == 0) {
1157       data.l64 = srcmem64[i];
1158       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1159       // Check that tags match expected.
1160       if (data.l64 != src_tag)
1161         ReportTagError(&srcmem64[i], data.l64, src_tag);
1162
1163       data.l32.l = pattern->pattern(i << 1);
1164       data.l32.h = pattern->pattern((i << 1) + 1);
1165       a1 = a1 + data.l32.l;
1166       b1 = b1 + a1;
1167       a1 = a1 + data.l32.h;
1168       b1 = b1 + a1;
1169     } else {
1170       data.l64 = srcmem64[i];
1171       a1 = a1 + data.l32.l;
1172       b1 = b1 + a1;
1173       a1 = a1 + data.l32.h;
1174       b1 = b1 + a1;
1175     }
1176     i++;
1177
1178     data.l64 = srcmem64[i];
1179     a2 = a2 + data.l32.l;
1180     b2 = b2 + a2;
1181     a2 = a2 + data.l32.h;
1182     b2 = b2 + a2;
1183     i++;
1184   }
1185   checksum->Set(a1, a2, b1, b2);
1186   return true;
1187 }
1188
1189 // Copy a block of memory quickly, while keeping a CRC of the data.
1190 // Result check if the CRC mismatches.
1191 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1192                               struct page_entry *srcpe) {
1193   int errors = 0;
1194   const int blocksize = 4096;
1195   const int blockwords = blocksize / wordsize_;
1196   int blocks = sat_->page_length() / blocksize;
1197
1198   // Base addresses for memory copy
1199   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1200   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1201   // Remember the expected CRC
1202   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1203
1204   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1205     uint64 *targetmem = targetmembase + currentblock * blockwords;
1206     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1207
1208     AdlerChecksum crc;
1209     if (tag_mode_) {
1210       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1211     } else {
1212       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1213     }
1214
1215     // Investigate miscompares.
1216     if (!crc.Equals(*expectedcrc)) {
1217       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1218                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1219                 expectedcrc->ToHexString().c_str());
1220       int errorcount = CheckRegion(sourcemem,
1221                                    srcpe->pattern,
1222                                    blocksize,
1223                                    currentblock * blocksize, 0);
1224       if (errorcount == 0) {
1225         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1226                      "but no miscompares found. Retrying with fresh data.\n",
1227                   crc.ToHexString().c_str(),
1228                   expectedcrc->ToHexString().c_str());
1229         if (!tag_mode_) {
1230           // Copy the data originally read from this region back again.
1231           // This data should have any corruption read originally while
1232           // calculating the CRC.
1233           memcpy(sourcemem, targetmem, blocksize);
1234           errorcount = CheckRegion(sourcemem,
1235                                    srcpe->pattern,
1236                                    blocksize,
1237                                    currentblock * blocksize, 0);
1238           if (errorcount == 0) {
1239             int apic_id = apicid();
1240             logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1241                          "CRC mismatch %s != %s, "
1242                          "but no miscompares found on second pass.\n",
1243                       apic_id, CurrentCpusFormat().c_str(),
1244                       crc.ToHexString().c_str(),
1245                       expectedcrc->ToHexString().c_str());
1246             struct ErrorRecord er;
1247             er.actual = sourcemem[0];
1248             er.expected = 0x0;
1249             er.vaddr = sourcemem;
1250             ProcessError(&er, 0, "Hardware Error");
1251           }
1252         }
1253       }
1254       errors += errorcount;
1255     }
1256   }
1257
1258   // For odd length transfers, we should never hit this.
1259   int leftovers = sat_->page_length() % blocksize;
1260   if (leftovers) {
1261     uint64 *targetmem = targetmembase + blocks * blockwords;
1262     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1263
1264     errors += CheckRegion(sourcemem,
1265                           srcpe->pattern,
1266                           leftovers,
1267                           blocks * blocksize, 0);
1268     int leftoverwords = leftovers / wordsize_;
1269     for (int i = 0; i < leftoverwords; i++) {
1270       targetmem[i] = sourcemem[i];
1271     }
1272   }
1273
1274   // Update pattern reference to reflect new contents.
1275   dstpe->pattern = srcpe->pattern;
1276
1277   // Clean clean clean the errors away.
1278   if (errors) {
1279     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1280     // cause bad data to be propogated across the page.
1281     FillPage(dstpe);
1282   }
1283   return errors;
1284 }
1285
1286
1287
1288 // Invert a block of memory quickly, traversing downwards.
1289 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1290   const int blocksize = 4096;
1291   const int blockwords = blocksize / wordsize_;
1292   int blocks = sat_->page_length() / blocksize;
1293
1294   // Base addresses for memory copy
1295   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1296
1297   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1298     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1299     for (int i = blockwords - 32; i >= 0; i -= 32) {
1300       for (int index = i + 31; index >= i; --index) {
1301         unsigned int actual = sourcemem[index];
1302         sourcemem[index] = ~actual;
1303       }
1304       OsLayer::FastFlush(&sourcemem[i]);
1305     }
1306   }
1307
1308   return 0;
1309 }
1310
1311 // Invert a block of memory, traversing upwards.
1312 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1313   const int blocksize = 4096;
1314   const int blockwords = blocksize / wordsize_;
1315   int blocks = sat_->page_length() / blocksize;
1316
1317   // Base addresses for memory copy
1318   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1319
1320   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1321     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1322     for (int i = 0; i < blockwords; i += 32) {
1323       for (int index = i; index <= i + 31; ++index) {
1324         unsigned int actual = sourcemem[index];
1325         sourcemem[index] = ~actual;
1326       }
1327       OsLayer::FastFlush(&sourcemem[i]);
1328     }
1329   }
1330   return 0;
1331 }
1332
1333 // Copy a block of memory quickly, while keeping a CRC of the data.
1334 // Result check if the CRC mismatches. Warm the CPU while running
1335 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1336                                   struct page_entry *srcpe) {
1337   int errors = 0;
1338   const int blocksize = 4096;
1339   const int blockwords = blocksize / wordsize_;
1340   int blocks = sat_->page_length() / blocksize;
1341
1342   // Base addresses for memory copy
1343   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1344   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1345   // Remember the expected CRC
1346   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1347
1348   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1349     uint64 *targetmem = targetmembase + currentblock * blockwords;
1350     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1351
1352     AdlerChecksum crc;
1353     if (tag_mode_) {
1354       AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1355     } else {
1356       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1357     }
1358
1359     // Investigate miscompares.
1360     if (!crc.Equals(*expectedcrc)) {
1361       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1362                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1363                 expectedcrc->ToHexString().c_str());
1364       int errorcount = CheckRegion(sourcemem,
1365                                    srcpe->pattern,
1366                                    blocksize,
1367                                    currentblock * blocksize, 0);
1368       if (errorcount == 0) {
1369         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1370                      "but no miscompares found. Retrying with fresh data.\n",
1371                   crc.ToHexString().c_str(),
1372                   expectedcrc->ToHexString().c_str());
1373         if (!tag_mode_) {
1374           // Copy the data originally read from this region back again.
1375           // This data should have any corruption read originally while
1376           // calculating the CRC.
1377           memcpy(sourcemem, targetmem, blocksize);
1378           errorcount = CheckRegion(sourcemem,
1379                                    srcpe->pattern,
1380                                    blocksize,
1381                                    currentblock * blocksize, 0);
1382           if (errorcount == 0) {
1383             int apic_id = apicid();
1384             logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1385                          "CRC mismatch %s != %s, "
1386                          "but no miscompares found on second pass.\n",
1387                       apic_id, CurrentCpusFormat().c_str(),
1388                       crc.ToHexString().c_str(),
1389                       expectedcrc->ToHexString().c_str());
1390             struct ErrorRecord er;
1391             er.actual = sourcemem[0];
1392             er.expected = 0x0;
1393             er.vaddr = sourcemem;
1394             ProcessError(&er, 0, "Hardware Error");
1395           }
1396         }
1397       }
1398       errors += errorcount;
1399     }
1400   }
1401
1402   // For odd length transfers, we should never hit this.
1403   int leftovers = sat_->page_length() % blocksize;
1404   if (leftovers) {
1405     uint64 *targetmem = targetmembase + blocks * blockwords;
1406     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1407
1408     errors += CheckRegion(sourcemem,
1409                           srcpe->pattern,
1410                           leftovers,
1411                           blocks * blocksize, 0);
1412     int leftoverwords = leftovers / wordsize_;
1413     for (int i = 0; i < leftoverwords; i++) {
1414       targetmem[i] = sourcemem[i];
1415     }
1416   }
1417
1418   // Update pattern reference to reflect new contents.
1419   dstpe->pattern = srcpe->pattern;
1420
1421   // Clean clean clean the errors away.
1422   if (errors) {
1423     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1424     // cause bad data to be propogated across the page.
1425     FillPage(dstpe);
1426   }
1427   return errors;
1428 }
1429
1430
1431
1432 // Memory check work loop. Execute until done, then exhaust pages.
1433 bool CheckThread::Work() {
1434   struct page_entry pe;
1435   bool result = true;
1436   int64 loops = 0;
1437
1438   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1439
1440   // We want to check all the pages, and
1441   // stop when there aren't any left.
1442   while (true) {
1443     result = result && sat_->GetValid(&pe);
1444     if (!result) {
1445       if (IsReadyToRunNoPause())
1446         logprintf(0, "Process Error: check_thread failed to pop pages, "
1447                   "bailing\n");
1448       else
1449         result = true;
1450       break;
1451     }
1452
1453     // Do the result check.
1454     CrcCheckPage(&pe);
1455
1456     // Push pages back on the valid queue if we are still going,
1457     // throw them out otherwise.
1458     if (IsReadyToRunNoPause())
1459       result = result && sat_->PutValid(&pe);
1460     else
1461       result = result && sat_->PutEmpty(&pe);
1462     if (!result) {
1463       logprintf(0, "Process Error: check_thread failed to push pages, "
1464                 "bailing\n");
1465       break;
1466     }
1467     loops++;
1468   }
1469
1470   pages_copied_ = loops;
1471   status_ = result;
1472   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1473             thread_num_, status_, pages_copied_);
1474   return result;
1475 }
1476
1477
1478 // Memory copy work loop. Execute until marked done.
1479 bool CopyThread::Work() {
1480   struct page_entry src;
1481   struct page_entry dst;
1482   bool result = true;
1483   int64 loops = 0;
1484
1485   logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1486             thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1487
1488   while (IsReadyToRun()) {
1489     // Pop the needed pages.
1490     result = result && sat_->GetValid(&src, tag_);
1491     result = result && sat_->GetEmpty(&dst, tag_);
1492     if (!result) {
1493       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1494                 "bailing\n");
1495       break;
1496     }
1497
1498     // Force errors for unittests.
1499     if (sat_->error_injection()) {
1500       if (loops == 8) {
1501         char *addr = reinterpret_cast<char*>(src.addr);
1502         int offset = random() % sat_->page_length();
1503         addr[offset] = 0xba;
1504       }
1505     }
1506
1507     // We can use memcpy, or CRC check while we copy.
1508     if (sat_->warm()) {
1509       CrcWarmCopyPage(&dst, &src);
1510     } else if (sat_->strict()) {
1511       CrcCopyPage(&dst, &src);
1512     } else {
1513       memcpy(dst.addr, src.addr, sat_->page_length());
1514       dst.pattern = src.pattern;
1515     }
1516
1517     result = result && sat_->PutValid(&dst);
1518     result = result && sat_->PutEmpty(&src);
1519
1520     // Copy worker-threads yield themselves at the end of each copy loop,
1521     // to avoid threads from preempting each other in the middle of the inner
1522     // copy-loop. Cooperations between Copy worker-threads results in less
1523     // unnecessary cache thrashing (which happens when context-switching in the
1524     // middle of the inner copy-loop).
1525     YieldSelf();
1526
1527     if (!result) {
1528       logprintf(0, "Process Error: copy_thread failed to push pages, "
1529                 "bailing\n");
1530       break;
1531     }
1532     loops++;
1533   }
1534
1535   pages_copied_ = loops;
1536   status_ = result;
1537   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1538             thread_num_, status_, pages_copied_);
1539   return result;
1540 }
1541
1542 // Memory invert work loop. Execute until marked done.
1543 bool InvertThread::Work() {
1544   struct page_entry src;
1545   bool result = true;
1546   int64 loops = 0;
1547
1548   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1549
1550   while (IsReadyToRun()) {
1551     // Pop the needed pages.
1552     result = result && sat_->GetValid(&src);
1553     if (!result) {
1554       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1555                 "bailing\n");
1556       break;
1557     }
1558
1559     if (sat_->strict())
1560       CrcCheckPage(&src);
1561
1562     // For the same reason CopyThread yields itself (see YieldSelf comment
1563     // in CopyThread::Work(), InvertThread yields itself after each invert
1564     // operation to improve cooperation between different worker threads
1565     // stressing the memory/cache.
1566     InvertPageUp(&src);
1567     YieldSelf();
1568     InvertPageDown(&src);
1569     YieldSelf();
1570     InvertPageDown(&src);
1571     YieldSelf();
1572     InvertPageUp(&src);
1573     YieldSelf();
1574
1575     if (sat_->strict())
1576       CrcCheckPage(&src);
1577
1578     result = result && sat_->PutValid(&src);
1579     if (!result) {
1580       logprintf(0, "Process Error: invert_thread failed to push pages, "
1581                 "bailing\n");
1582       break;
1583     }
1584     loops++;
1585   }
1586
1587   pages_copied_ = loops * 2;
1588   status_ = result;
1589   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1590             thread_num_, status_, pages_copied_);
1591   return result;
1592 }
1593
1594
1595 // Set file name to use for File IO.
1596 void FileThread::SetFile(const char *filename_init) {
1597   filename_ = filename_init;
1598   devicename_ = os_->FindFileDevice(filename_);
1599 }
1600
1601 // Open the file for access.
1602 bool FileThread::OpenFile(int *pfile) {
1603   bool no_O_DIRECT = false;
1604   int flags = O_RDWR | O_CREAT | O_SYNC;
1605   int fd = open(filename_.c_str(), flags | O_DIRECT, 0644);
1606   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
1607     no_O_DIRECT = true;
1608     fd = open(filename_.c_str(), flags, 0644); // Try without O_DIRECT
1609   }
1610   if (fd < 0) {
1611     logprintf(0, "Process Error: Failed to create file %s!!\n",
1612               filename_.c_str());
1613     pages_copied_ = 0;
1614     return false;
1615   }
1616   if (no_O_DIRECT)
1617     os_->ActivateFlushPageCache(); // Not using O_DIRECT fixed EINVAL
1618   *pfile = fd;
1619   return true;
1620 }
1621
1622 // Close the file.
1623 bool FileThread::CloseFile(int fd) {
1624   close(fd);
1625   return true;
1626 }
1627
1628 // Check sector tagging.
1629 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1630   int page_length = sat_->page_length();
1631   struct FileThread::SectorTag *tag =
1632     (struct FileThread::SectorTag *)(src->addr);
1633
1634   // Tag each sector.
1635   unsigned char magic = ((0xba + thread_num_) & 0xff);
1636   for (int sec = 0; sec < page_length / 512; sec++) {
1637     tag[sec].magic = magic;
1638     tag[sec].block = block & 0xff;
1639     tag[sec].sector = sec & 0xff;
1640     tag[sec].pass = pass_ & 0xff;
1641   }
1642   return true;
1643 }
1644
1645 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1646   int page_length = sat_->page_length();
1647   // Fill the file with our data.
1648   int64 size = write(fd, src->addr, page_length);
1649
1650   if (size != page_length) {
1651     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1652     errorcount_++;
1653     logprintf(0, "Block Error: file_thread failed to write, "
1654               "bailing\n");
1655     return false;
1656   }
1657   return true;
1658 }
1659
1660 // Write the data to the file.
1661 bool FileThread::WritePages(int fd) {
1662   int strict = sat_->strict();
1663
1664   // Start fresh at beginning of file for each batch of pages.
1665   lseek64(fd, 0, SEEK_SET);
1666   for (int i = 0; i < sat_->disk_pages(); i++) {
1667     struct page_entry src;
1668     if (!GetValidPage(&src))
1669       return false;
1670     // Save expected pattern.
1671     page_recs_[i].pattern = src.pattern;
1672     page_recs_[i].src = src.addr;
1673
1674     // Check data correctness.
1675     if (strict)
1676       CrcCheckPage(&src);
1677
1678     SectorTagPage(&src, i);
1679
1680     bool result = WritePageToFile(fd, &src);
1681
1682     if (!PutEmptyPage(&src))
1683       return false;
1684
1685     if (!result)
1686       return false;
1687   }
1688   return os_->FlushPageCache(); // If O_DIRECT worked, this will be a NOP.
1689 }
1690
1691 // Copy data from file into memory block.
1692 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1693   int page_length = sat_->page_length();
1694
1695   // Do the actual read.
1696   int64 size = read(fd, dst->addr, page_length);
1697   if (size != page_length) {
1698     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1699     logprintf(0, "Block Error: file_thread failed to read, "
1700               "bailing\n");
1701     errorcount_++;
1702     return false;
1703   }
1704   return true;
1705 }
1706
1707 // Check sector tagging.
1708 bool FileThread::SectorValidatePage(const struct PageRec &page,
1709                                     struct page_entry *dst, int block) {
1710   // Error injection.
1711   static int calls = 0;
1712   calls++;
1713
1714   // Do sector tag compare.
1715   int firstsector = -1;
1716   int lastsector = -1;
1717   bool badsector = false;
1718   int page_length = sat_->page_length();
1719
1720   // Cast data block into an array of tagged sectors.
1721   struct FileThread::SectorTag *tag =
1722   (struct FileThread::SectorTag *)(dst->addr);
1723
1724   sat_assert(sizeof(*tag) == 512);
1725
1726   // Error injection.
1727   if (sat_->error_injection()) {
1728     if (calls == 2) {
1729       for (int badsec = 8; badsec < 17; badsec++)
1730         tag[badsec].pass = 27;
1731     }
1732     if (calls == 18) {
1733       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1734     }
1735   }
1736
1737   // Check each sector for the correct tag we added earlier,
1738   // then revert the tag to the to normal data pattern.
1739   unsigned char magic = ((0xba + thread_num_) & 0xff);
1740   for (int sec = 0; sec < page_length / 512; sec++) {
1741     // Check magic tag.
1742     if ((tag[sec].magic != magic) ||
1743         (tag[sec].block != (block & 0xff)) ||
1744         (tag[sec].sector != (sec & 0xff)) ||
1745         (tag[sec].pass != (pass_ & 0xff))) {
1746       // Offset calculation for tag location.
1747       int offset = sec * sizeof(SectorTag);
1748       if (tag[sec].block != (block & 0xff))
1749         offset += 1 * sizeof(uint8);
1750       else if (tag[sec].sector != (sec & 0xff))
1751         offset += 2 * sizeof(uint8);
1752       else if (tag[sec].pass != (pass_ & 0xff))
1753         offset += 3 * sizeof(uint8);
1754
1755       // Run sector tag error through diagnoser for logging and reporting.
1756       errorcount_ += 1;
1757       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1758                                                   offset,
1759                                                   tag[sec].sector,
1760                                                   page.src, page.dst);
1761
1762       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1763                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1764                 block * page_length + 512 * sec,
1765                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1766                 sec, (unsigned int)tag[sec].sector,
1767                 block, (unsigned int)tag[sec].block,
1768                 magic, (unsigned int)tag[sec].magic,
1769                 filename_.c_str());
1770
1771       // Keep track of first and last bad sector.
1772       if (firstsector == -1)
1773         firstsector = (block * page_length / 512) + sec;
1774       lastsector = (block * page_length / 512) + sec;
1775       badsector = true;
1776     }
1777     // Patch tag back to proper pattern.
1778     unsigned int *addr = (unsigned int *)(&tag[sec]);
1779     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1780   }
1781
1782   // If we found sector errors:
1783   if (badsector == true) {
1784     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1785               firstsector * 512,
1786               ((lastsector + 1) * 512) - 1,
1787               filename_.c_str());
1788
1789     // Either exit immediately, or patch the data up and continue.
1790     if (sat_->stop_on_error()) {
1791       exit(1);
1792     } else {
1793       // Patch up bad pages.
1794       for (int block = (firstsector * 512) / page_length;
1795           block <= (lastsector * 512) / page_length;
1796           block++) {
1797         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1798         int length = page_length / wordsize_;
1799         for (int i = 0; i < length; i++) {
1800           memblock[i] = dst->pattern->pattern(i);
1801         }
1802       }
1803     }
1804   }
1805   return true;
1806 }
1807
1808 // Get memory for an incoming data transfer..
1809 bool FileThread::PagePrepare() {
1810   // We can only do direct IO to SAT pages if it is normal mem.
1811   page_io_ = os_->normal_mem();
1812
1813   // Init a local buffer if we need it.
1814   if (!page_io_) {
1815 #ifdef HAVE_POSIX_MEMALIGN
1816     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1817 #else
1818     local_page_ = memalign(512, sat_->page_length());
1819     int result = (local_page_ == 0);
1820 #endif
1821     if (result) {
1822       logprintf(0, "Process Error: disk thread posix_memalign "
1823                    "returned %d (fail)\n",
1824                 result);
1825       status_ = false;
1826       return false;
1827     }
1828   }
1829   return true;
1830 }
1831
1832
1833 // Remove memory allocated for data transfer.
1834 bool FileThread::PageTeardown() {
1835   // Free a local buffer if we need to.
1836   if (!page_io_) {
1837     free(local_page_);
1838   }
1839   return true;
1840 }
1841
1842
1843
1844 // Get memory for an incoming data transfer..
1845 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1846   if (page_io_) {
1847     if (!sat_->GetEmpty(dst))
1848       return false;
1849   } else {
1850     dst->addr = local_page_;
1851     dst->offset = 0;
1852     dst->pattern = 0;
1853   }
1854   return true;
1855 }
1856
1857 // Get memory for an outgoing data transfer..
1858 bool FileThread::GetValidPage(struct page_entry *src) {
1859   struct page_entry tmp;
1860   if (!sat_->GetValid(&tmp))
1861     return false;
1862   if (page_io_) {
1863     *src = tmp;
1864     return true;
1865   } else {
1866     src->addr = local_page_;
1867     src->offset = 0;
1868     CrcCopyPage(src, &tmp);
1869     if (!sat_->PutValid(&tmp))
1870       return false;
1871   }
1872   return true;
1873 }
1874
1875
1876 // Throw out a used empty page.
1877 bool FileThread::PutEmptyPage(struct page_entry *src) {
1878   if (page_io_) {
1879     if (!sat_->PutEmpty(src))
1880       return false;
1881   }
1882   return true;
1883 }
1884
1885 // Throw out a used, filled page.
1886 bool FileThread::PutValidPage(struct page_entry *src) {
1887   if (page_io_) {
1888     if (!sat_->PutValid(src))
1889       return false;
1890   }
1891   return true;
1892 }
1893
1894 // Copy data from file into memory blocks.
1895 bool FileThread::ReadPages(int fd) {
1896   int page_length = sat_->page_length();
1897   int strict = sat_->strict();
1898   bool result = true;
1899
1900   // Read our data back out of the file, into it's new location.
1901   lseek64(fd, 0, SEEK_SET);
1902   for (int i = 0; i < sat_->disk_pages(); i++) {
1903     struct page_entry dst;
1904     if (!GetEmptyPage(&dst))
1905       return false;
1906     // Retrieve expected pattern.
1907     dst.pattern = page_recs_[i].pattern;
1908     // Update page recordpage record.
1909     page_recs_[i].dst = dst.addr;
1910
1911     // Read from the file into destination page.
1912     if (!ReadPageFromFile(fd, &dst)) {
1913         PutEmptyPage(&dst);
1914         return false;
1915     }
1916
1917     SectorValidatePage(page_recs_[i], &dst, i);
1918
1919     // Ensure that the transfer ended up with correct data.
1920     if (strict) {
1921       // Record page index currently CRC checked.
1922       crc_page_ = i;
1923       int errors = CrcCheckPage(&dst);
1924       if (errors) {
1925         logprintf(5, "Log: file miscompare at block %d, "
1926                   "offset %x-%x. File: %s\n",
1927                   i, i * page_length, ((i + 1) * page_length) - 1,
1928                   filename_.c_str());
1929         result = false;
1930       }
1931       crc_page_ = -1;
1932       errorcount_ += errors;
1933     }
1934     if (!PutValidPage(&dst))
1935       return false;
1936   }
1937   return result;
1938 }
1939
1940 // File IO work loop. Execute until marked done.
1941 bool FileThread::Work() {
1942   bool result = true;
1943   int64 loops = 0;
1944
1945   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1946             thread_num_,
1947             filename_.c_str(),
1948             devicename_.c_str());
1949
1950   if (!PagePrepare()) {
1951     status_ = false;
1952     return false;
1953   }
1954
1955   // Open the data IO file.
1956   int fd = 0;
1957   if (!OpenFile(&fd)) {
1958     status_ = false;
1959     return false;
1960   }
1961
1962   pass_ = 0;
1963
1964   // Load patterns into page records.
1965   page_recs_ = new struct PageRec[sat_->disk_pages()];
1966   for (int i = 0; i < sat_->disk_pages(); i++) {
1967     page_recs_[i].pattern = new struct Pattern();
1968   }
1969
1970   // Loop until done.
1971   while (IsReadyToRun()) {
1972     // Do the file write.
1973     if (!(result = result && WritePages(fd)))
1974       break;
1975
1976     // Do the file read.
1977     if (!(result = result && ReadPages(fd)))
1978       break;
1979
1980     loops++;
1981     pass_ = loops;
1982   }
1983
1984   pages_copied_ = loops * sat_->disk_pages();
1985
1986   // Clean up.
1987   CloseFile(fd);
1988   PageTeardown();
1989
1990   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1991             thread_num_, status_, pages_copied_);
1992   // Failure to read from device indicates hardware,
1993   // rather than procedural SW error.
1994   status_ = true;
1995   return true;
1996 }
1997
1998 bool NetworkThread::IsNetworkStopSet() {
1999   return !IsReadyToRunNoPause();
2000 }
2001
2002 bool NetworkSlaveThread::IsNetworkStopSet() {
2003   // This thread has no completion status.
2004   // It finishes whever there is no more data to be
2005   // passed back.
2006   return true;
2007 }
2008
2009 // Set ip name to use for Network IO.
2010 void NetworkThread::SetIP(const char *ipaddr_init) {
2011   strncpy(ipaddr_, ipaddr_init, 256);
2012 }
2013
2014 // Create a socket.
2015 // Return 0 on error.
2016 bool NetworkThread::CreateSocket(int *psocket) {
2017   int sock = socket(AF_INET, SOCK_STREAM, 0);
2018   if (sock == -1) {
2019     logprintf(0, "Process Error: Cannot open socket\n");
2020     pages_copied_ = 0;
2021     status_ = false;
2022     return false;
2023   }
2024   *psocket = sock;
2025   return true;
2026 }
2027
2028 // Close the socket.
2029 bool NetworkThread::CloseSocket(int sock) {
2030   close(sock);
2031   return true;
2032 }
2033
2034 // Initiate the tcp connection.
2035 bool NetworkThread::Connect(int sock) {
2036   struct sockaddr_in dest_addr;
2037   dest_addr.sin_family = AF_INET;
2038   dest_addr.sin_port = htons(kNetworkPort);
2039   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2040
2041   // Translate dot notation to u32.
2042   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2043     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2044     pages_copied_ = 0;
2045     status_ = false;
2046     return false;
2047   }
2048
2049   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2050                     sizeof(struct sockaddr))) {
2051     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2052     pages_copied_ = 0;
2053     status_ = false;
2054     return false;
2055   }
2056   return true;
2057 }
2058
2059 // Initiate the tcp connection.
2060 bool NetworkListenThread::Listen() {
2061   struct sockaddr_in sa;
2062
2063   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2064
2065   sa.sin_family = AF_INET;
2066   sa.sin_addr.s_addr = INADDR_ANY;
2067   sa.sin_port = htons(kNetworkPort);
2068
2069   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2070     char buf[256];
2071     sat_strerror(errno, buf, sizeof(buf));
2072     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2073     pages_copied_ = 0;
2074     status_ = false;
2075     return false;
2076   }
2077   listen(sock_, 3);
2078   return true;
2079 }
2080
2081 // Wait for a connection from a network traffic generation thread.
2082 bool NetworkListenThread::Wait() {
2083     fd_set rfds;
2084     struct timeval tv;
2085     int retval;
2086
2087     // Watch sock_ to see when it has input.
2088     FD_ZERO(&rfds);
2089     FD_SET(sock_, &rfds);
2090     // Wait up to five seconds.
2091     tv.tv_sec = 5;
2092     tv.tv_usec = 0;
2093
2094     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2095
2096     return (retval > 0);
2097 }
2098
2099 // Wait for a connection from a network traffic generation thread.
2100 bool NetworkListenThread::GetConnection(int *pnewsock) {
2101   struct sockaddr_in sa;
2102   socklen_t size = sizeof(struct sockaddr_in);
2103
2104   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2105   if (newsock < 0)  {
2106     logprintf(0, "Process Error: Did not receive connection\n");
2107     pages_copied_ = 0;
2108     status_ = false;
2109     return false;
2110   }
2111   *pnewsock = newsock;
2112   return true;
2113 }
2114
2115 // Send a page, return false if a page was not sent.
2116 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2117   int page_length = sat_->page_length();
2118   char *address = static_cast<char*>(src->addr);
2119
2120   // Send our data over the network.
2121   int size = page_length;
2122   while (size) {
2123     int transferred = send(sock, address + (page_length - size), size, 0);
2124     if ((transferred == 0) || (transferred == -1)) {
2125       if (!IsNetworkStopSet()) {
2126         char buf[256] = "";
2127         sat_strerror(errno, buf, sizeof(buf));
2128         logprintf(0, "Process Error: Thread %d, "
2129                      "Network write failed, bailing. (%s)\n",
2130                   thread_num_, buf);
2131         status_ = false;
2132       }
2133       return false;
2134     }
2135     size = size - transferred;
2136   }
2137   return true;
2138 }
2139
2140 // Receive a page. Return false if a page was not received.
2141 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2142   int page_length = sat_->page_length();
2143   char *address = static_cast<char*>(dst->addr);
2144
2145   // Maybe we will get our data back again, maybe not.
2146   int size = page_length;
2147   while (size) {
2148     int transferred = recv(sock, address + (page_length - size), size, 0);
2149     if ((transferred == 0) || (transferred == -1)) {
2150       // Typically network slave thread should exit as network master
2151       // thread stops sending data.
2152       if (IsNetworkStopSet()) {
2153         int err = errno;
2154         if (transferred == 0 && err == 0) {
2155           // Two system setups will not sync exactly,
2156           // allow early exit, but log it.
2157           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2158         } else {
2159           char buf[256] = "";
2160           sat_strerror(err, buf, sizeof(buf));
2161           // Print why we failed.
2162           logprintf(0, "Process Error: Thread %d, "
2163                        "Network read failed, bailing (%s).\n",
2164                     thread_num_, buf);
2165           status_ = false;
2166           // Print arguments and results.
2167           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2168                     sock, address + (page_length - size),
2169                     size, transferred, err);
2170           if ((transferred == 0) &&
2171               (page_length - size < 512) &&
2172               (page_length - size > 0)) {
2173             // Print null terminated data received, to see who's been
2174             // sending us supicious unwanted data.
2175             address[page_length - size] = 0;
2176             logprintf(0, "Log: received  %d bytes: '%s'\n",
2177                       page_length - size, address);
2178           }
2179         }
2180       }
2181       return false;
2182     }
2183     size = size - transferred;
2184   }
2185   return true;
2186 }
2187
2188 // Network IO work loop. Execute until marked done.
2189 // Return true if the thread ran as expected.
2190 bool NetworkThread::Work() {
2191   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2192             thread_num_,
2193             ipaddr_);
2194
2195   // Make a socket.
2196   int sock = 0;
2197   if (!CreateSocket(&sock))
2198     return false;
2199
2200   // Network IO loop requires network slave thread to have already initialized.
2201   // We will sleep here for awhile to ensure that the slave thread will be
2202   // listening by the time we connect.
2203   // Sleep for 15 seconds.
2204   sat_sleep(15);
2205   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2206             thread_num_,
2207             ipaddr_);
2208
2209
2210   // Connect to a slave thread.
2211   if (!Connect(sock))
2212     return false;
2213
2214   // Loop until done.
2215   bool result = true;
2216   int strict = sat_->strict();
2217   int64 loops = 0;
2218   while (IsReadyToRun()) {
2219     struct page_entry src;
2220     struct page_entry dst;
2221     result = result && sat_->GetValid(&src);
2222     result = result && sat_->GetEmpty(&dst);
2223     if (!result) {
2224       logprintf(0, "Process Error: net_thread failed to pop pages, "
2225                 "bailing\n");
2226       break;
2227     }
2228
2229     // Check data correctness.
2230     if (strict)
2231       CrcCheckPage(&src);
2232
2233     // Do the network write.
2234     if (!(result = result && SendPage(sock, &src)))
2235       break;
2236
2237     // Update pattern reference to reflect new contents.
2238     dst.pattern = src.pattern;
2239
2240     // Do the network read.
2241     if (!(result = result && ReceivePage(sock, &dst)))
2242       break;
2243
2244     // Ensure that the transfer ended up with correct data.
2245     if (strict)
2246       CrcCheckPage(&dst);
2247
2248     // Return all of our pages to the queue.
2249     result = result && sat_->PutValid(&dst);
2250     result = result && sat_->PutEmpty(&src);
2251     if (!result) {
2252       logprintf(0, "Process Error: net_thread failed to push pages, "
2253                 "bailing\n");
2254       break;
2255     }
2256     loops++;
2257   }
2258
2259   pages_copied_ = loops;
2260   status_ = result;
2261
2262   // Clean up.
2263   CloseSocket(sock);
2264
2265   logprintf(9, "Log: Completed %d: network thread status %d, "
2266                "%d pages copied\n",
2267             thread_num_, status_, pages_copied_);
2268   return result;
2269 }
2270
2271 // Spawn slave threads for incoming connections.
2272 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2273   logprintf(12, "Log: Listen thread spawning slave\n");
2274
2275   // Spawn slave thread, to reflect network traffic back to sender.
2276   ChildWorker *child_worker = new ChildWorker;
2277   child_worker->thread.SetSock(newsock);
2278   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2279                                   &child_worker->status);
2280   child_worker->status.Initialize();
2281   child_worker->thread.SpawnThread();
2282   child_workers_.push_back(child_worker);
2283
2284   return true;
2285 }
2286
2287 // Reap slave threads.
2288 bool NetworkListenThread::ReapSlaves() {
2289   bool result = true;
2290   // Gather status and reap threads.
2291   logprintf(12, "Log: Joining all outstanding threads\n");
2292
2293   for (size_t i = 0; i < child_workers_.size(); i++) {
2294     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2295     logprintf(12, "Log: Joining slave thread %d\n", i);
2296     child_thread.JoinThread();
2297     if (child_thread.GetStatus() != 1) {
2298       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2299                 child_thread.GetStatus());
2300       result = false;
2301     }
2302     errorcount_ += child_thread.GetErrorCount();
2303     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2304               child_thread.GetErrorCount());
2305     pages_copied_ += child_thread.GetPageCount();
2306   }
2307
2308   return result;
2309 }
2310
2311 // Network listener IO work loop. Execute until marked done.
2312 // Return false on fatal software error.
2313 bool NetworkListenThread::Work() {
2314   logprintf(9, "Log: Starting network listen thread %d\n",
2315             thread_num_);
2316
2317   // Make a socket.
2318   sock_ = 0;
2319   if (!CreateSocket(&sock_)) {
2320     status_ = false;
2321     return false;
2322   }
2323   logprintf(9, "Log: Listen thread created sock\n");
2324
2325   // Allows incoming connections to be queued up by socket library.
2326   int newsock = 0;
2327   Listen();
2328   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2329
2330   // Wait on incoming connections, and spawn worker threads for them.
2331   int threadcount = 0;
2332   while (IsReadyToRun()) {
2333     // Poll for connections that we can accept().
2334     if (Wait()) {
2335       // Accept those connections.
2336       logprintf(12, "Log: Listen thread found incoming connection\n");
2337       if (GetConnection(&newsock)) {
2338         SpawnSlave(newsock, threadcount);
2339         threadcount++;
2340       }
2341     }
2342   }
2343
2344   // Gather status and join spawned threads.
2345   ReapSlaves();
2346
2347   // Delete the child workers.
2348   for (ChildVector::iterator it = child_workers_.begin();
2349        it != child_workers_.end(); ++it) {
2350     (*it)->status.Destroy();
2351     delete *it;
2352   }
2353   child_workers_.clear();
2354
2355   CloseSocket(sock_);
2356
2357   status_ = true;
2358   logprintf(9,
2359             "Log: Completed %d: network listen thread status %d, "
2360             "%d pages copied\n",
2361             thread_num_, status_, pages_copied_);
2362   return true;
2363 }
2364
2365 // Set network reflector socket struct.
2366 void NetworkSlaveThread::SetSock(int sock) {
2367   sock_ = sock;
2368 }
2369
2370 // Network reflector IO work loop. Execute until marked done.
2371 // Return false on fatal software error.
2372 bool NetworkSlaveThread::Work() {
2373   logprintf(9, "Log: Starting network slave thread %d\n",
2374             thread_num_);
2375
2376   // Verify that we have a socket.
2377   int sock = sock_;
2378   if (!sock) {
2379     status_ = false;
2380     return false;
2381   }
2382
2383   // Loop until done.
2384   int64 loops = 0;
2385   // Init a local buffer for storing data.
2386   void *local_page = NULL;
2387 #ifdef HAVE_POSIX_MEMALIGN
2388   int result = posix_memalign(&local_page, 512, sat_->page_length());
2389 #else
2390   local_page = memalign(512, sat_->page_length());
2391   int result = (local_page == 0);
2392 #endif
2393   if (result) {
2394     logprintf(0, "Process Error: net slave posix_memalign "
2395                  "returned %d (fail)\n",
2396               result);
2397     status_ = false;
2398     return false;
2399   }
2400
2401   struct page_entry page;
2402   page.addr = local_page;
2403
2404   // This thread will continue to run as long as the thread on the other end of
2405   // the socket is still sending and receiving data.
2406   while (1) {
2407     // Do the network read.
2408     if (!ReceivePage(sock, &page))
2409       break;
2410
2411     // Do the network write.
2412     if (!SendPage(sock, &page))
2413       break;
2414
2415     loops++;
2416   }
2417
2418   pages_copied_ = loops;
2419   // No results provided from this type of thread.
2420   status_ = true;
2421
2422   // Clean up.
2423   CloseSocket(sock);
2424
2425   logprintf(9,
2426             "Log: Completed %d: network slave thread status %d, "
2427             "%d pages copied\n",
2428             thread_num_, status_, pages_copied_);
2429   return true;
2430 }
2431
2432 // Thread work loop. Execute until marked finished.
2433 bool ErrorPollThread::Work() {
2434   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2435
2436   // This calls a generic error polling function in the Os abstraction layer.
2437   do {
2438     errorcount_ += os_->ErrorPoll();
2439     os_->ErrorWait();
2440   } while (IsReadyToRun());
2441
2442   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2443             thread_num_, errorcount_);
2444   status_ = true;
2445   return true;
2446 }
2447
2448 // Worker thread to heat up CPU.
2449 // This thread does not evaluate pass/fail or software error.
2450 bool CpuStressThread::Work() {
2451   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2452
2453   do {
2454     // Run ludloff's platform/CPU-specific assembly workload.
2455     os_->CpuStressWorkload();
2456     YieldSelf();
2457   } while (IsReadyToRun());
2458
2459   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2460             thread_num_);
2461   status_ = true;
2462   return true;
2463 }
2464
2465 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2466                                                  int cacheline_count,
2467                                                  int thread_num,
2468                                                  int inc_count) {
2469   cc_cacheline_data_ = data;
2470   cc_cacheline_count_ = cacheline_count;
2471   cc_thread_num_ = thread_num;
2472   cc_inc_count_ = inc_count;
2473 }
2474
2475 // Worked thread to test the cache coherency of the CPUs
2476 // Return false on fatal sw error.
2477 bool CpuCacheCoherencyThread::Work() {
2478   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2479             cc_thread_num_);
2480   uint64 time_start, time_end;
2481   struct timeval tv;
2482
2483   unsigned int seed = static_cast<unsigned int>(gettid());
2484   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2485   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2486
2487   uint64 total_inc = 0;  // Total increments done by the thread.
2488   while (IsReadyToRun()) {
2489     for (int i = 0; i < cc_inc_count_; i++) {
2490       // Choose a datastructure in random and increment the appropriate
2491       // member in that according to the offset (which is the same as the
2492       // thread number.
2493 #ifdef HAVE_RAND_R
2494       int r = rand_r(&seed);
2495 #else
2496       int r = rand();
2497 #endif
2498       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2499       // Increment the member of the randomely selected structure.
2500       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2501     }
2502
2503     total_inc += cc_inc_count_;
2504
2505     // Calculate if the local counter matches with the global value
2506     // in all the cache line structures for this particular thread.
2507     int cc_global_num = 0;
2508     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2509       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2510       // Reset the cachline member's value for the next run.
2511       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2512     }
2513     if (sat_->error_injection())
2514       cc_global_num = -1;
2515
2516     if (cc_global_num != cc_inc_count_) {
2517       errorcount_++;
2518       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2519                 cc_global_num, cc_inc_count_);
2520     }
2521   }
2522   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2523   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2524
2525   uint64 us_elapsed = time_end - time_start;
2526   // inc_rate is the no. of increments per second.
2527   double inc_rate = total_inc * 1e6 / us_elapsed;
2528
2529   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2530             " Increments=%llu, Increments/sec = %.6lf\n",
2531             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2532   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2533             cc_thread_num_);
2534   status_ = true;
2535   return true;
2536 }
2537
2538 DiskThread::DiskThread(DiskBlockTable *block_table) {
2539   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2540   write_block_size_ = kSectorSize;  // this assumes read and write block size
2541                                     // are the same
2542   segment_size_ = -1;               // use the entire disk as one segment
2543   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2544   // Use a queue such that 3/2 times as much data as the cache can hold
2545   // is written before it is read so that there is little chance the read
2546   // data is in the cache.
2547   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2548   blocks_per_segment_ = 32;
2549
2550   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2551   write_threshold_ = 100000;        // reading/writing a sector
2552
2553   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2554   write_timeout_ = 5000000;         // timout for reading/writing
2555
2556   device_sectors_ = 0;
2557   non_destructive_ = 0;
2558
2559 #ifdef HAVE_LIBAIO_H
2560   aio_ctx_ = 0;
2561 #endif
2562   block_table_ = block_table;
2563   update_block_table_ = 1;
2564
2565   block_buffer_ = NULL;
2566
2567   blocks_written_ = 0;
2568   blocks_read_ = 0;
2569 }
2570
2571 DiskThread::~DiskThread() {
2572   if (block_buffer_)
2573     free(block_buffer_);
2574 }
2575
2576 // Set filename for device file (in /dev).
2577 void DiskThread::SetDevice(const char *device_name) {
2578   device_name_ = device_name;
2579 }
2580
2581 // Set various parameters that control the behaviour of the test.
2582 // -1 is used as a sentinel value on each parameter (except non_destructive)
2583 // to indicate that the parameter not be set.
2584 bool DiskThread::SetParameters(int read_block_size,
2585                                int write_block_size,
2586                                int64 segment_size,
2587                                int64 cache_size,
2588                                int blocks_per_segment,
2589                                int64 read_threshold,
2590                                int64 write_threshold,
2591                                int non_destructive) {
2592   if (read_block_size != -1) {
2593     // Blocks must be aligned to the disk's sector size.
2594     if (read_block_size % kSectorSize != 0) {
2595       logprintf(0, "Process Error: Block size must be a multiple of %d "
2596                 "(thread %d).\n", kSectorSize, thread_num_);
2597       return false;
2598     }
2599
2600     read_block_size_ = read_block_size;
2601   }
2602
2603   if (write_block_size != -1) {
2604     // Write blocks must be aligned to the disk's sector size and to the
2605     // block size.
2606     if (write_block_size % kSectorSize != 0) {
2607       logprintf(0, "Process Error: Write block size must be a multiple "
2608                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2609       return false;
2610     }
2611     if (write_block_size % read_block_size_ != 0) {
2612       logprintf(0, "Process Error: Write block size must be a multiple "
2613                 "of the read block size, which is %d (thread %d).\n",
2614                 read_block_size_, thread_num_);
2615       return false;
2616     }
2617
2618     write_block_size_ = write_block_size;
2619
2620   } else {
2621     // Make sure write_block_size_ is still valid.
2622     if (read_block_size_ > write_block_size_) {
2623       logprintf(5, "Log: Assuming write block size equal to read block size, "
2624                 "which is %d (thread %d).\n", read_block_size_,
2625                 thread_num_);
2626       write_block_size_ = read_block_size_;
2627     } else {
2628       if (write_block_size_ % read_block_size_ != 0) {
2629         logprintf(0, "Process Error: Write block size (defined as %d) must "
2630                   "be a multiple of the read block size, which is %d "
2631                   "(thread %d).\n", write_block_size_, read_block_size_,
2632                   thread_num_);
2633         return false;
2634       }
2635     }
2636   }
2637
2638   if (cache_size != -1) {
2639     cache_size_ = cache_size;
2640   }
2641
2642   if (blocks_per_segment != -1) {
2643     if (blocks_per_segment <= 0) {
2644       logprintf(0, "Process Error: Blocks per segment must be greater than "
2645                    "zero.\n (thread %d)", thread_num_);
2646       return false;
2647     }
2648
2649     blocks_per_segment_ = blocks_per_segment;
2650   }
2651
2652   if (read_threshold != -1) {
2653     if (read_threshold <= 0) {
2654       logprintf(0, "Process Error: Read threshold must be greater than "
2655                    "zero (thread %d).\n", thread_num_);
2656       return false;
2657     }
2658
2659     read_threshold_ = read_threshold;
2660   }
2661
2662   if (write_threshold != -1) {
2663     if (write_threshold <= 0) {
2664       logprintf(0, "Process Error: Write threshold must be greater than "
2665                    "zero (thread %d).\n", thread_num_);
2666       return false;
2667     }
2668
2669     write_threshold_ = write_threshold;
2670   }
2671
2672   if (segment_size != -1) {
2673     // Segments must be aligned to the disk's sector size.
2674     if (segment_size % kSectorSize != 0) {
2675       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2676                 " (thread %d).\n", kSectorSize, thread_num_);
2677       return false;
2678     }
2679
2680     segment_size_ = segment_size / kSectorSize;
2681   }
2682
2683   non_destructive_ = non_destructive;
2684
2685   // Having a queue of 150% of blocks that will fit in the disk's cache
2686   // should be enough to force out the oldest block before it is read and hence,
2687   // making sure the data comes form the disk and not the cache.
2688   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2689   // Updating DiskBlockTable parameters
2690   if (update_block_table_) {
2691     block_table_->SetParameters(kSectorSize, write_block_size_,
2692                                 device_sectors_, segment_size_,
2693                                 device_name_);
2694   }
2695   return true;
2696 }
2697
2698 // Open a device, return false on failure.
2699 bool DiskThread::OpenDevice(int *pfile) {
2700   bool no_O_DIRECT = false;
2701   int flags = O_RDWR | O_SYNC | O_LARGEFILE;
2702   int fd = open(device_name_.c_str(), flags | O_DIRECT, 0);
2703   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
2704     no_O_DIRECT = true;
2705     fd = open(device_name_.c_str(), flags, 0); // Try without O_DIRECT
2706   }
2707   if (fd < 0) {
2708     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2709               device_name_.c_str(), thread_num_);
2710     return false;
2711   }
2712   if (no_O_DIRECT)
2713     os_->ActivateFlushPageCache();
2714   *pfile = fd;
2715
2716   return GetDiskSize(fd);
2717 }
2718
2719 // Retrieves the size (in bytes) of the disk/file.
2720 // Return false on failure.
2721 bool DiskThread::GetDiskSize(int fd) {
2722   struct stat device_stat;
2723   if (fstat(fd, &device_stat) == -1) {
2724     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2725               device_name_.c_str(), thread_num_);
2726     return false;
2727   }
2728
2729   // For a block device, an ioctl is needed to get the size since the size
2730   // of the device file (i.e. /dev/sdb) is 0.
2731   if (S_ISBLK(device_stat.st_mode)) {
2732     uint64 block_size = 0;
2733
2734     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2735       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2736                 device_name_.c_str(), thread_num_);
2737       return false;
2738     }
2739
2740     // Zero size indicates nonworking device..
2741     if (block_size == 0) {
2742       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2743       ++errorcount_;
2744       status_ = true;  // Avoid a procedural error.
2745       return false;
2746     }
2747
2748     device_sectors_ = block_size / kSectorSize;
2749
2750   } else if (S_ISREG(device_stat.st_mode)) {
2751     device_sectors_ = device_stat.st_size / kSectorSize;
2752
2753   } else {
2754     logprintf(0, "Process Error: %s is not a regular file or block "
2755               "device (thread %d).\n", device_name_.c_str(),
2756               thread_num_);
2757     return false;
2758   }
2759
2760   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2761             device_sectors_, device_name_.c_str(), thread_num_);
2762
2763   if (update_block_table_) {
2764     block_table_->SetParameters(kSectorSize, write_block_size_,
2765                                 device_sectors_, segment_size_,
2766                                 device_name_);
2767   }
2768
2769   return true;
2770 }
2771
2772 bool DiskThread::CloseDevice(int fd) {
2773   close(fd);
2774   return true;
2775 }
2776
2777 // Return the time in microseconds.
2778 int64 DiskThread::GetTime() {
2779   struct timeval tv;
2780   gettimeofday(&tv, NULL);
2781   return tv.tv_sec * 1000000 + tv.tv_usec;
2782 }
2783
2784 // Do randomized reads and (possibly) writes on a device.
2785 // Return false on fatal SW error, true on SW success,
2786 // regardless of whether HW failed.
2787 bool DiskThread::DoWork(int fd) {
2788   int64 block_num = 0;
2789   int64 num_segments;
2790
2791   if (segment_size_ == -1) {
2792     num_segments = 1;
2793   } else {
2794     num_segments = device_sectors_ / segment_size_;
2795     if (device_sectors_ % segment_size_ != 0)
2796       num_segments++;
2797   }
2798
2799   // Disk size should be at least 3x cache size.  See comment later for
2800   // details.
2801   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2802
2803   // This disk test works by writing blocks with a certain pattern to
2804   // disk, then reading them back and verifying it against the pattern
2805   // at a later time.  A failure happens when either the block cannot
2806   // be written/read or when the read block is different than what was
2807   // written.  If a block takes too long to write/read, then a warning
2808   // is given instead of an error since taking too long is not
2809   // necessarily an error.
2810   //
2811   // To prevent the read blocks from coming from the disk cache,
2812   // enough blocks are written before read such that a block would
2813   // be ejected from the disk cache by the time it is read.
2814   //
2815   // TODO(amistry): Implement some sort of read/write throttling.  The
2816   //                flood of asynchronous I/O requests when a drive is
2817   //                unplugged is causing the application and kernel to
2818   //                become unresponsive.
2819
2820   while (IsReadyToRun()) {
2821     // Write blocks to disk.
2822     logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2823               non_destructive_ ? "(disabled) " : "",
2824               device_name_.c_str(), thread_num_);
2825     while (IsReadyToRunNoPause() &&
2826            in_flight_sectors_.size() <
2827                static_cast<size_t>(queue_size_ + 1)) {
2828       // Confine testing to a particular segment of the disk.
2829       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2830       if (!non_destructive_ &&
2831           (block_num % blocks_per_segment_ == 0)) {
2832         logprintf(20, "Log: Starting to write segment %lld out of "
2833                   "%lld on disk %s (thread %d).\n",
2834                   segment, num_segments, device_name_.c_str(),
2835                   thread_num_);
2836       }
2837       block_num++;
2838
2839       BlockData *block = block_table_->GetUnusedBlock(segment);
2840
2841       // If an unused sequence of sectors could not be found, skip to the
2842       // next block to process.  Soon, a new segment will come and new
2843       // sectors will be able to be allocated.  This effectively puts a
2844       // minumim on the disk size at 3x the stated cache size, or 48MiB
2845       // if a cache size is not given (since the cache is set as 16MiB
2846       // by default).  Given that todays caches are at the low MiB range
2847       // and drive sizes at the mid GB, this shouldn't pose a problem.
2848       // The 3x minimum comes from the following:
2849       //   1. In order to allocate 'y' blocks from a segment, the
2850       //      segment must contain at least 2y blocks or else an
2851       //      allocation may not succeed.
2852       //   2. Assume the entire disk is one segment.
2853       //   3. A full write phase consists of writing blocks corresponding to
2854       //      3/2 cache size.
2855       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2856       //      size worth of blocks = 3 * cache size worth of blocks
2857       //      to complete.
2858       // In non-destructive mode, don't write anything to disk.
2859       if (!non_destructive_) {
2860         if (!WriteBlockToDisk(fd, block)) {
2861           block_table_->RemoveBlock(block);
2862           return true;
2863         }
2864         blocks_written_++;
2865       }
2866
2867       // Block is either initialized by writing, or in nondestructive case,
2868       // initialized by being added into the datastructure for later reading.
2869       block->SetBlockAsInitialized();
2870
2871       in_flight_sectors_.push(block);
2872     }
2873     if (!os_->FlushPageCache()) // If O_DIRECT worked, this will be a NOP.
2874       return false;
2875
2876     // Verify blocks on disk.
2877     logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2878               device_name_.c_str(), thread_num_);
2879     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2880       BlockData *block = in_flight_sectors_.front();
2881       in_flight_sectors_.pop();
2882       if (!ValidateBlockOnDisk(fd, block))
2883         return true;
2884       block_table_->RemoveBlock(block);
2885       blocks_read_++;
2886     }
2887   }
2888
2889   pages_copied_ = blocks_written_ + blocks_read_;
2890   return true;
2891 }
2892
2893 // Do an asynchronous disk I/O operation.
2894 // Return false if the IO is not set up.
2895 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2896                             int64 offset, int64 timeout) {
2897 #ifdef HAVE_LIBAIO_H
2898   // Use the Linux native asynchronous I/O interface for reading/writing.
2899   // A read/write consists of three basic steps:
2900   //    1. create an io context.
2901   //    2. prepare and submit an io request to the context
2902   //    3. wait for an event on the context.
2903
2904   struct {
2905     const int opcode;
2906     const char *op_str;
2907     const char *error_str;
2908   } operations[2] = {
2909     { IO_CMD_PREAD, "read", "disk-read-error" },
2910     { IO_CMD_PWRITE, "write", "disk-write-error" }
2911   };
2912
2913   struct iocb cb;
2914   memset(&cb, 0, sizeof(cb));
2915
2916   cb.aio_fildes = fd;
2917   cb.aio_lio_opcode = operations[op].opcode;
2918   cb.u.c.buf = buf;
2919   cb.u.c.nbytes = size;
2920   cb.u.c.offset = offset;
2921
2922   struct iocb *cbs[] = { &cb };
2923   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2924     int error = errno;
2925     char buf[256];
2926     sat_strerror(error, buf, sizeof(buf));
2927     logprintf(0, "Process Error: Unable to submit async %s "
2928                  "on disk %s (thread %d). Error %d, %s\n",
2929               operations[op].op_str, device_name_.c_str(),
2930               thread_num_, error, buf);
2931     return false;
2932   }
2933
2934   struct io_event event;
2935   memset(&event, 0, sizeof(event));
2936   struct timespec tv;
2937   tv.tv_sec = timeout / 1000000;
2938   tv.tv_nsec = (timeout % 1000000) * 1000;
2939   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2940     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2941     // EINTR error code.  This is not an error and so don't treat it as such,
2942     // but still log it.
2943     int error = errno;
2944     if (error == EINTR) {
2945       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2946                 operations[op].op_str, device_name_.c_str(),
2947                 thread_num_);
2948     } else {
2949       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2950       errorcount_ += 1;
2951       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2952                    "starting at %lld on disk %s (thread %d).\n",
2953                 operations[op].op_str, offset / kSectorSize,
2954                 device_name_.c_str(), thread_num_);
2955     }
2956
2957     // Don't bother checking return codes since io_cancel seems to always fail.
2958     // Since io_cancel is always failing, destroying and recreating an I/O
2959     // context is a workaround for canceling an in-progress I/O operation.
2960     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2961     io_cancel(aio_ctx_, &cb, &event);
2962     io_destroy(aio_ctx_);
2963     aio_ctx_ = 0;
2964     if (io_setup(5, &aio_ctx_)) {
2965       int error = errno;
2966       char buf[256];
2967       sat_strerror(error, buf, sizeof(buf));
2968       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2969                 " (thread %d) Error %d, %s\n",
2970                 device_name_.c_str(), thread_num_, error, buf);
2971     }
2972
2973     return false;
2974   }
2975
2976   // event.res contains the number of bytes written/read or
2977   // error if < 0, I think.
2978   if (event.res != static_cast<uint64>(size)) {
2979     errorcount_++;
2980     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2981
2982     if (event.res < 0) {
2983       switch (event.res) {
2984         case -EIO:
2985           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2986                        "sectors starting at %lld on disk %s (thread %d).\n",
2987                     operations[op].op_str, offset / kSectorSize,
2988                     device_name_.c_str(), thread_num_);
2989           break;
2990         default:
2991           logprintf(0, "Hardware Error: Unknown error while doing %s to "
2992                        "sectors starting at %lld on disk %s (thread %d).\n",
2993                     operations[op].op_str, offset / kSectorSize,
2994                     device_name_.c_str(), thread_num_);
2995       }
2996     } else {
2997       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2998                    "%lld on disk %s (thread %d).\n",
2999                 operations[op].op_str, offset / kSectorSize,
3000                 device_name_.c_str(), thread_num_);
3001     }
3002     return false;
3003   }
3004
3005   return true;
3006 #else // !HAVE_LIBAIO_H
3007   return false;
3008 #endif
3009 }
3010
3011 // Write a block to disk.
3012 // Return false if the block is not written.
3013 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
3014   memset(block_buffer_, 0, block->GetSize());
3015
3016   // Fill block buffer with a pattern
3017   struct page_entry pe;
3018   if (!sat_->GetValid(&pe)) {
3019     // Even though a valid page could not be obatined, it is not an error
3020     // since we can always fill in a pattern directly, albeit slower.
3021     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
3022     block->SetPattern(patternlist_->GetRandomPattern());
3023
3024     logprintf(11, "Log: Warning, using pattern fill fallback in "
3025                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
3026               device_name_.c_str(), thread_num_);
3027
3028     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
3029       memblock[i] = block->GetPattern()->pattern(i);
3030     }
3031   } else {
3032     memcpy(block_buffer_, pe.addr, block->GetSize());
3033     block->SetPattern(pe.pattern);
3034     sat_->PutValid(&pe);
3035   }
3036
3037   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
3038             " (thread %d).\n",
3039             block->GetSize()/kSectorSize, block->GetAddress(),
3040             device_name_.c_str(), thread_num_);
3041
3042   int64 start_time = GetTime();
3043
3044   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
3045                    block->GetAddress() * kSectorSize, write_timeout_)) {
3046     return false;
3047   }
3048
3049   int64 end_time = GetTime();
3050   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
3051             end_time - start_time, thread_num_);
3052   if (end_time - start_time > write_threshold_) {
3053     logprintf(5, "Log: Write took %lld us which is longer than threshold "
3054                  "%lld us on disk %s (thread %d).\n",
3055               end_time - start_time, write_threshold_, device_name_.c_str(),
3056               thread_num_);
3057   }
3058
3059   return true;
3060 }
3061
3062 // Verify a block on disk.
3063 // Return true if the block was read, also increment errorcount
3064 // if the block had data errors or performance problems.
3065 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
3066   int64 blocks = block->GetSize() / read_block_size_;
3067   int64 bytes_read = 0;
3068   int64 current_blocks;
3069   int64 current_bytes;
3070   uint64 address = block->GetAddress();
3071
3072   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
3073             "(thread %d).\n",
3074             address, device_name_.c_str(), thread_num_);
3075
3076   // Read block from disk and time the read.  If it takes longer than the
3077   // threshold, complain.
3078   if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
3079     logprintf(0, "Process Error: Unable to seek to sector %lld in "
3080               "DiskThread::ValidateSectorsOnDisk on disk %s "
3081               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
3082     return false;
3083   }
3084   int64 start_time = GetTime();
3085
3086   // Split a large write-sized block into small read-sized blocks and
3087   // read them in groups of randomly-sized multiples of read block size.
3088   // This assures all data written on disk by this particular block
3089   // will be tested using a random reading pattern.
3090   while (blocks != 0) {
3091     // Test all read blocks in a written block.
3092     current_blocks = (random() % blocks) + 1;
3093     current_bytes = current_blocks * read_block_size_;
3094
3095     memset(block_buffer_, 0, current_bytes);
3096
3097     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
3098               "disk %s (thread %d)\n",
3099               current_bytes / kSectorSize,
3100               (address * kSectorSize + bytes_read) / kSectorSize,
3101               device_name_.c_str(), thread_num_);
3102
3103     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
3104                      address * kSectorSize + bytes_read,
3105                      write_timeout_)) {
3106       return false;
3107     }
3108
3109     int64 end_time = GetTime();
3110     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3111               end_time - start_time, thread_num_);
3112     if (end_time - start_time > read_threshold_) {
3113       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3114                 "%lld us on disk %s (thread %d).\n",
3115                 end_time - start_time, read_threshold_,
3116                 device_name_.c_str(), thread_num_);
3117     }
3118
3119     // In non-destructive mode, don't compare the block to the pattern since
3120     // the block was never written to disk in the first place.
3121     if (!non_destructive_) {
3122       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3123                       0, bytes_read)) {
3124         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3125         errorcount_ += 1;
3126         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3127                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3128                   "disk %s (thread %d).\n",
3129                   address, device_name_.c_str(), thread_num_);
3130       }
3131     }
3132
3133     bytes_read += current_blocks * read_block_size_;
3134     blocks -= current_blocks;
3135   }
3136
3137   return true;
3138 }
3139
3140 // Direct device access thread.
3141 // Return false on software error.
3142 bool DiskThread::Work() {
3143   int fd;
3144
3145   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3146             thread_num_, device_name_.c_str());
3147
3148   srandom(time(NULL));
3149
3150   if (!OpenDevice(&fd)) {
3151     status_ = false;
3152     return false;
3153   }
3154
3155   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3156   // when using direct IO.
3157 #ifdef HAVE_POSIX_MEMALIGN
3158   int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3159                               sat_->page_length());
3160 #else
3161   block_buffer_ = memalign(kBufferAlignment, sat_->page_length());
3162   int memalign_result = (block_buffer_ == 0);
3163 #endif
3164   if (memalign_result) {
3165     CloseDevice(fd);
3166     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3167                  "for disk %s (thread %d) posix memalign returned %d.\n",
3168               device_name_.c_str(), thread_num_, memalign_result);
3169     status_ = false;
3170     return false;
3171   }
3172
3173 #ifdef HAVE_LIBAIO_H
3174   if (io_setup(5, &aio_ctx_)) {
3175     CloseDevice(fd);
3176     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3177               " (thread %d).\n",
3178               device_name_.c_str(), thread_num_);
3179     status_ = false;
3180     return false;
3181   }
3182 #endif
3183
3184   bool result = DoWork(fd);
3185
3186   status_ = result;
3187
3188 #ifdef HAVE_LIBAIO_H
3189   io_destroy(aio_ctx_);
3190 #endif
3191   CloseDevice(fd);
3192
3193   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3194                "%d pages copied\n",
3195             thread_num_, device_name_.c_str(), status_, pages_copied_);
3196   return result;
3197 }
3198
3199 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3200     : DiskThread(block_table) {
3201   update_block_table_ = 0;
3202 }
3203
3204 RandomDiskThread::~RandomDiskThread() {
3205 }
3206
3207 // Workload for random disk thread.
3208 bool RandomDiskThread::DoWork(int fd) {
3209   logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3210             device_name_.c_str(), thread_num_);
3211   while (IsReadyToRun()) {
3212     BlockData *block = block_table_->GetRandomBlock();
3213     if (block == NULL) {
3214       logprintf(12, "Log: No block available for device %s (thread %d).\n",
3215                 device_name_.c_str(), thread_num_);
3216     } else {
3217       ValidateBlockOnDisk(fd, block);
3218       block_table_->ReleaseBlock(block);
3219       blocks_read_++;
3220     }
3221   }
3222   pages_copied_ = blocks_read_;
3223   return true;
3224 }
3225
3226 MemoryRegionThread::MemoryRegionThread() {
3227   error_injection_ = false;
3228   pages_ = NULL;
3229 }
3230
3231 MemoryRegionThread::~MemoryRegionThread() {
3232   if (pages_ != NULL)
3233     delete pages_;
3234 }
3235
3236 // Set a region of memory or MMIO to be tested.
3237 // Return false if region could not be mapped.
3238 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3239   int plength = sat_->page_length();
3240   int npages = size / plength;
3241   if (size % plength) {
3242     logprintf(0, "Process Error: region size is not a multiple of SAT "
3243               "page length\n");
3244     return false;
3245   } else {
3246     if (pages_ != NULL)
3247       delete pages_;
3248     pages_ = new PageEntryQueue(npages);
3249     char *base_addr = reinterpret_cast<char*>(region);
3250     region_ = base_addr;
3251     for (int i = 0; i < npages; i++) {
3252       struct page_entry pe;
3253       init_pe(&pe);
3254       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3255       pe.offset = i * plength;
3256
3257       pages_->Push(&pe);
3258     }
3259     return true;
3260   }
3261 }
3262
3263 // More detailed error printout for hardware errors in memory or MMIO
3264 // regions.
3265 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3266                                       int priority,
3267                                       const char *message) {
3268   uint32 buffer_offset;
3269   if (phase_ == kPhaseCopy) {
3270     // If the error occurred on the Copy Phase, it means that
3271     // the source data (i.e., the main memory) is wrong. so
3272     // just pass it to the original ProcessError to call a
3273     // bad-dimm error
3274     WorkerThread::ProcessError(error, priority, message);
3275   } else if (phase_ == kPhaseCheck) {
3276     // A error on the Check Phase means that the memory region tested
3277     // has an error. Gathering more information and then reporting
3278     // the error.
3279     // Determine if this is a write or read error.
3280     os_->Flush(error->vaddr);
3281     error->reread = *(error->vaddr);
3282     char *good = reinterpret_cast<char*>(&(error->expected));
3283     char *bad = reinterpret_cast<char*>(&(error->actual));
3284     sat_assert(error->expected != error->actual);
3285     unsigned int offset = 0;
3286     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3287       if (good[offset] != bad[offset])
3288         break;
3289     }
3290
3291     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3292
3293     buffer_offset = error->vbyteaddr - region_;
3294
3295     // Find physical address if possible.
3296     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3297     logprintf(priority,
3298               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3299               "offset %llx: read:0x%016llx, reread:0x%016llx "
3300               "expected:0x%016llx\n",
3301               message,
3302               identifier_.c_str(),
3303               error->vaddr,
3304               error->paddr,
3305               buffer_offset,
3306               error->actual,
3307               error->reread,
3308               error->expected);
3309   } else {
3310     logprintf(0, "Process Error: memory region thread raised an "
3311               "unexpected error.");
3312   }
3313 }
3314
3315 // Workload for testion memory or MMIO regions.
3316 // Return false on software error.
3317 bool MemoryRegionThread::Work() {
3318   struct page_entry source_pe;
3319   struct page_entry memregion_pe;
3320   bool result = true;
3321   int64 loops = 0;
3322   const uint64 error_constant = 0x00ba00000000ba00LL;
3323
3324   // For error injection.
3325   int64 *addr = 0x0;
3326   int offset = 0;
3327   int64 data = 0;
3328
3329   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3330
3331   while (IsReadyToRun()) {
3332     // Getting pages from SAT and queue.
3333     phase_ = kPhaseNoPhase;
3334     result = result && sat_->GetValid(&source_pe);
3335     if (!result) {
3336       logprintf(0, "Process Error: memory region thread failed to pop "
3337                 "pages from SAT, bailing\n");
3338       break;
3339     }
3340
3341     result = result && pages_->PopRandom(&memregion_pe);
3342     if (!result) {
3343       logprintf(0, "Process Error: memory region thread failed to pop "
3344                 "pages from queue, bailing\n");
3345       break;
3346     }
3347
3348     // Error injection for CRC copy.
3349     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3350       addr = reinterpret_cast<int64*>(source_pe.addr);
3351       offset = random() % (sat_->page_length() / wordsize_);
3352       data = addr[offset];
3353       addr[offset] = error_constant;
3354     }
3355
3356     // Copying SAT page into memory region.
3357     phase_ = kPhaseCopy;
3358     CrcCopyPage(&memregion_pe, &source_pe);
3359     memregion_pe.pattern = source_pe.pattern;
3360
3361     // Error injection for CRC Check.
3362     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3363       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3364       offset = random() % (sat_->page_length() / wordsize_);
3365       data = addr[offset];
3366       addr[offset] = error_constant;
3367     }
3368
3369     // Checking page content in memory region.
3370     phase_ = kPhaseCheck;
3371     CrcCheckPage(&memregion_pe);
3372
3373     phase_ = kPhaseNoPhase;
3374     // Storing pages on their proper queues.
3375     result = result && sat_->PutValid(&source_pe);
3376     if (!result) {
3377       logprintf(0, "Process Error: memory region thread failed to push "
3378                 "pages into SAT, bailing\n");
3379       break;
3380     }
3381     result = result && pages_->Push(&memregion_pe);
3382     if (!result) {
3383       logprintf(0, "Process Error: memory region thread failed to push "
3384                 "pages into queue, bailing\n");
3385       break;
3386     }
3387
3388     if ((sat_->error_injection() || error_injection_) &&
3389         loops >= 1 && loops <= 2) {
3390       addr[offset] = data;
3391     }
3392
3393     loops++;
3394     YieldSelf();
3395   }
3396
3397   pages_copied_ = loops;
3398   status_ = result;
3399   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3400             "pages checked\n", thread_num_, status_, pages_copied_);
3401   return result;
3402 }