chiark / gitweb /
d24b5cd2c077b30881385871bd61d7bbced29e6b
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #ifdef HAVE_LIBAIO_H
48 #include <libaio.h>
49 #endif
50
51 #include <sys/syscall.h>
52
53 #include <set>
54 #include <string>
55
56 // This file must work with autoconf on its public version,
57 // so these includes are correct.
58 #include "error_diag.h"  // NOLINT
59 #include "os.h"          // NOLINT
60 #include "pattern.h"     // NOLINT
61 #include "queue.h"       // NOLINT
62 #include "sat.h"         // NOLINT
63 #include "sattypes.h"    // NOLINT
64 #include "worker.h"      // NOLINT
65
66 // Syscalls
67 // Why ubuntu, do you hate gettid so bad?
68 #if !defined(__NR_gettid)
69   #define __NR_gettid             224
70 #endif
71
72 #define gettid() syscall(__NR_gettid)
73 #if !defined(CPU_SETSIZE)
74 _syscall3(int, sched_getaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 _syscall3(int, sched_setaffinity, pid_t, pid,
77           unsigned int, len, cpu_set_t*, mask)
78 #endif
79
80 namespace {
81   // Get HW core ID from cpuid instruction.
82   inline int apicid(void) {
83     int cpu;
84 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
85     __asm__ __volatile__ (
86 # if defined(STRESSAPPTEST_CPU_I686) && defined(__PIC__)
87         "xchg %%ebx, %%esi;"
88         "cpuid;"
89         "xchg %%esi, %%ebx;"
90         : "=S" (cpu)
91 # else
92         "cpuid;"
93         : "=b" (cpu)
94 # endif
95         : "a" (1) : "cx", "dx");
96 #elif defined(STRESSAPPTEST_CPU_ARMV7A)
97   #warning "Unsupported CPU type ARMV7A: unable to determine core ID."
98     cpu = 0;
99 #else
100   #warning "Unsupported CPU type: unable to determine core ID."
101     cpu = 0;
102 #endif
103     return (cpu >> 24);
104   }
105
106   // Work around the sad fact that there are two (gnu, xsi) incompatible
107   // versions of strerror_r floating around google. Awesome.
108   bool sat_strerror(int err, char *buf, int len) {
109     buf[0] = 0;
110     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
111     int retval = reinterpret_cast<int64>(errmsg);
112     if (retval == 0)
113       return true;
114     if (retval == -1)
115       return false;
116     if (errmsg != buf) {
117       strncpy(buf, errmsg, len);
118       buf[len - 1] = 0;
119     }
120     return true;
121   }
122
123
124   inline uint64 addr_to_tag(void *address) {
125     return reinterpret_cast<uint64>(address);
126   }
127 }
128
129 #if !defined(O_DIRECT)
130 // Sometimes this isn't available.
131 // Disregard if it's not defined.
132   #define O_DIRECT            0
133 #endif
134
135 // A struct to hold captured errors, for later reporting.
136 struct ErrorRecord {
137   uint64 actual;  // This is the actual value read.
138   uint64 reread;  // This is the actual value, reread.
139   uint64 expected;  // This is what it should have been.
140   uint64 *vaddr;  // This is where it was (or wasn't).
141   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
142   uint64 paddr;  // This is the bus address, if available.
143   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
144   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
145 };
146
147 // This is a helper function to create new threads with pthreads.
148 static void *ThreadSpawnerGeneric(void *ptr) {
149   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
150   worker->StartRoutine();
151   return NULL;
152 }
153
154 void WorkerStatus::Initialize() {
155   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
156   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
157 #ifdef HAVE_PTHREAD_BARRIERS
158   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
159                                        num_workers_ + 1));
160 #endif
161 }
162
163 void WorkerStatus::Destroy() {
164   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
165   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
166 #ifdef HAVE_PTHREAD_BARRIERS
167   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
168 #endif
169 }
170
171 void WorkerStatus::PauseWorkers() {
172   if (SetStatus(PAUSE) != PAUSE)
173     WaitOnPauseBarrier();
174 }
175
176 void WorkerStatus::ResumeWorkers() {
177   if (SetStatus(RUN) == PAUSE)
178     WaitOnPauseBarrier();
179 }
180
181 void WorkerStatus::StopWorkers() {
182   if (SetStatus(STOP) == PAUSE)
183     WaitOnPauseBarrier();
184 }
185
186 bool WorkerStatus::ContinueRunning() {
187   // This loop is an optimization.  We use it to immediately re-check the status
188   // after resuming from a pause, instead of returning and waiting for the next
189   // call to this function.
190   for (;;) {
191     switch (GetStatus()) {
192       case RUN:
193         return true;
194       case PAUSE:
195         // Wait for the other workers to call this function so that
196         // PauseWorkers() can return.
197         WaitOnPauseBarrier();
198         // Wait for ResumeWorkers() to be called.
199         WaitOnPauseBarrier();
200         break;
201       case STOP:
202         return false;
203     }
204   }
205 }
206
207 bool WorkerStatus::ContinueRunningNoPause() {
208   return (GetStatus() != STOP);
209 }
210
211 void WorkerStatus::RemoveSelf() {
212   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
213   for (;;) {
214     AcquireStatusReadLock();
215     if (status_ != PAUSE)
216       break;
217     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
218     // the other threads won't wait on pause_barrier_ forever.
219     ReleaseStatusLock();
220     // Wait for the other workers to call this function so that PauseWorkers()
221     // can return.
222     WaitOnPauseBarrier();
223     // Wait for ResumeWorkers() to be called.
224     WaitOnPauseBarrier();
225   }
226
227   // This lock would be unnecessary if we held a write lock instead of a read
228   // lock on status_rwlock_, but that would also force all threads calling
229   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
230   AcquireNumWorkersLock();
231   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
232   // in use because (status != PAUSE).
233 #ifdef HAVE_PTHREAD_BARRIERS
234   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
235   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
236 #endif
237   --num_workers_;
238   ReleaseNumWorkersLock();
239
240   // Release status_rwlock_.
241   ReleaseStatusLock();
242 }
243
244
245 // Parent thread class.
246 WorkerThread::WorkerThread() {
247   status_ = false;
248   pages_copied_ = 0;
249   errorcount_ = 0;
250   runduration_usec_ = 1;
251   priority_ = Normal;
252   worker_status_ = NULL;
253   thread_spawner_ = &ThreadSpawnerGeneric;
254   tag_mode_ = false;
255 }
256
257 WorkerThread::~WorkerThread() {}
258
259 // Constructors. Just init some default values.
260 FillThread::FillThread() {
261   num_pages_to_fill_ = 0;
262 }
263
264 // Initialize file name to empty.
265 FileThread::FileThread() {
266   filename_ = "";
267   devicename_ = "";
268   pass_ = 0;
269   page_io_ = true;
270   crc_page_ = -1;
271   local_page_ = NULL;
272 }
273
274 // If file thread used bounce buffer in memory, account for the extra
275 // copy for memory bandwidth calculation.
276 float FileThread::GetMemoryCopiedData() {
277   if (!os_->normal_mem())
278     return GetCopiedData();
279   else
280     return 0;
281 }
282
283 // Initialize target hostname to be invalid.
284 NetworkThread::NetworkThread() {
285   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
286   sock_ = 0;
287 }
288
289 // Initialize?
290 NetworkSlaveThread::NetworkSlaveThread() {
291 }
292
293 // Initialize?
294 NetworkListenThread::NetworkListenThread() {
295 }
296
297 // Init member variables.
298 void WorkerThread::InitThread(int thread_num_init,
299                               class Sat *sat_init,
300                               class OsLayer *os_init,
301                               class PatternList *patternlist_init,
302                               WorkerStatus *worker_status) {
303   sat_assert(worker_status);
304   worker_status->AddWorkers(1);
305
306   thread_num_ = thread_num_init;
307   sat_ = sat_init;
308   os_ = os_init;
309   patternlist_ = patternlist_init;
310   worker_status_ = worker_status;
311
312   AvailableCpus(&cpu_mask_);
313   tag_ = 0xffffffff;
314
315   tag_mode_ = sat_->tag_mode();
316 }
317
318
319 // Use pthreads to prioritize a system thread.
320 bool WorkerThread::InitPriority() {
321   // This doesn't affect performance that much, and may not be too safe.
322
323   bool ret = BindToCpus(&cpu_mask_);
324   if (!ret)
325     logprintf(11, "Log: Bind to %s failed.\n",
326               cpuset_format(&cpu_mask_).c_str());
327
328   logprintf(11, "Log: Thread %d running on apic ID %d mask %s (%s).\n",
329             thread_num_, apicid(),
330             CurrentCpusFormat().c_str(),
331             cpuset_format(&cpu_mask_).c_str());
332 #if 0
333   if (priority_ == High) {
334     sched_param param;
335     param.sched_priority = 1;
336     // Set the priority; others are unchanged.
337     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
338               param.sched_priority);
339     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
340       char buf[256];
341       sat_strerror(errno, buf, sizeof(buf));
342       logprintf(0, "Process Error: sched_setscheduler "
343                    "failed - error %d %s\n",
344                 errno, buf);
345     }
346   }
347 #endif
348   return true;
349 }
350
351 // Use pthreads to create a system thread.
352 int WorkerThread::SpawnThread() {
353   // Create the new thread.
354   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
355   if (result) {
356     char buf[256];
357     sat_strerror(result, buf, sizeof(buf));
358     logprintf(0, "Process Error: pthread_create "
359                   "failed - error %d %s\n", result,
360               buf);
361     status_ = false;
362     return false;
363   }
364
365   // 0 is pthreads success.
366   return true;
367 }
368
369 // Kill the worker thread with SIGINT.
370 bool WorkerThread::KillThread() {
371   return (pthread_kill(thread_, SIGINT) == 0);
372 }
373
374 // Block until thread has exited.
375 bool WorkerThread::JoinThread() {
376   int result = pthread_join(thread_, NULL);
377
378   if (result) {
379     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
380     status_ = false;
381   }
382
383   // 0 is pthreads success.
384   return (!result);
385 }
386
387
388 void WorkerThread::StartRoutine() {
389   InitPriority();
390   StartThreadTimer();
391   Work();
392   StopThreadTimer();
393   worker_status_->RemoveSelf();
394 }
395
396
397 // Thread work loop. Execute until marked finished.
398 bool WorkerThread::Work() {
399   do {
400     logprintf(9, "Log: ...\n");
401     // Sleep for 1 second.
402     sat_sleep(1);
403   } while (IsReadyToRun());
404
405   return false;
406 }
407
408
409 // Returns CPU mask of CPUs available to this process,
410 // Conceptually, each bit represents a logical CPU, ie:
411 //   mask = 3  (11b):   cpu0, 1
412 //   mask = 13 (1101b): cpu0, 2, 3
413 bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
414   CPU_ZERO(cpuset);
415 #ifdef HAVE_SCHED_GETAFFINITY
416   return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
417 #else
418   return 0;
419 #endif
420 }
421
422
423 // Returns CPU mask of CPUs this thread is bound to,
424 // Conceptually, each bit represents a logical CPU, ie:
425 //   mask = 3  (11b):   cpu0, 1
426 //   mask = 13 (1101b): cpu0, 2, 3
427 bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
428   CPU_ZERO(cpuset);
429 #ifdef HAVE_SCHED_GETAFFINITY
430   return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
431 #else
432   return 0;
433 #endif
434 }
435
436
437 // Bind worker thread to specified CPU(s)
438 //   Args:
439 //     thread_mask: cpu_set_t representing CPUs, ie
440 //                  mask = 1  (01b):   cpu0
441 //                  mask = 3  (11b):   cpu0, 1
442 //                  mask = 13 (1101b): cpu0, 2, 3
443 //
444 //   Returns true on success, false otherwise.
445 bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
446   cpu_set_t process_mask;
447   AvailableCpus(&process_mask);
448   if (cpuset_isequal(thread_mask, &process_mask))
449     return true;
450
451   logprintf(11, "Log: available CPU mask - %s\n",
452             cpuset_format(&process_mask).c_str());
453   if (!cpuset_issubset(thread_mask, &process_mask)) {
454     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
455     logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
456               cpuset_format(thread_mask).c_str(),
457               cpuset_format(&process_mask).c_str());
458     return false;
459   }
460 #ifdef HAVE_SCHED_GETAFFINITY
461   return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
462 #else
463   return 0;
464 #endif
465 }
466
467
468 // A worker thread can yield itself to give up CPU until it's scheduled again.
469 //   Returns true on success, false on error.
470 bool WorkerThread::YieldSelf() {
471   return (sched_yield() == 0);
472 }
473
474
475 // Fill this page with its pattern.
476 bool WorkerThread::FillPage(struct page_entry *pe) {
477   // Error check arguments.
478   if (pe == 0) {
479     logprintf(0, "Process Error: Fill Page entry null\n");
480     return 0;
481   }
482
483   // Mask is the bitmask of indexes used by the pattern.
484   // It is the pattern size -1. Size is always a power of 2.
485   uint64 *memwords = static_cast<uint64*>(pe->addr);
486   int length = sat_->page_length();
487
488   if (tag_mode_) {
489     // Select tag or data as appropriate.
490     for (int i = 0; i < length / wordsize_; i++) {
491       datacast_t data;
492
493       if ((i & 0x7) == 0) {
494         data.l64 = addr_to_tag(&memwords[i]);
495       } else {
496         data.l32.l = pe->pattern->pattern(i << 1);
497         data.l32.h = pe->pattern->pattern((i << 1) + 1);
498       }
499       memwords[i] = data.l64;
500     }
501   } else {
502     // Just fill in untagged data directly.
503     for (int i = 0; i < length / wordsize_; i++) {
504       datacast_t data;
505
506       data.l32.l = pe->pattern->pattern(i << 1);
507       data.l32.h = pe->pattern->pattern((i << 1) + 1);
508       memwords[i] = data.l64;
509     }
510   }
511
512   return 1;
513 }
514
515
516 // Tell the thread how many pages to fill.
517 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
518   num_pages_to_fill_ = num_pages_to_fill_init;
519 }
520
521 // Fill this page with a random pattern.
522 bool FillThread::FillPageRandom(struct page_entry *pe) {
523   // Error check arguments.
524   if (pe == 0) {
525     logprintf(0, "Process Error: Fill Page entry null\n");
526     return 0;
527   }
528   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
529     logprintf(0, "Process Error: No data patterns available\n");
530     return 0;
531   }
532
533   // Choose a random pattern for this block.
534   pe->pattern = patternlist_->GetRandomPattern();
535   if (pe->pattern == 0) {
536     logprintf(0, "Process Error: Null data pattern\n");
537     return 0;
538   }
539
540   // Actually fill the page.
541   return FillPage(pe);
542 }
543
544
545 // Memory fill work loop. Execute until alloted pages filled.
546 bool FillThread::Work() {
547   bool result = true;
548
549   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
550
551   // We want to fill num_pages_to_fill pages, and
552   // stop when we've filled that many.
553   // We also want to capture early break
554   struct page_entry pe;
555   int64 loops = 0;
556   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
557     result = result && sat_->GetEmpty(&pe);
558     if (!result) {
559       logprintf(0, "Process Error: fill_thread failed to pop pages, "
560                 "bailing\n");
561       break;
562     }
563
564     // Fill the page with pattern
565     result = result && FillPageRandom(&pe);
566     if (!result) break;
567
568     // Put the page back on the queue.
569     result = result && sat_->PutValid(&pe);
570     if (!result) {
571       logprintf(0, "Process Error: fill_thread failed to push pages, "
572                 "bailing\n");
573       break;
574     }
575     loops++;
576   }
577
578   // Fill in thread status.
579   pages_copied_ = loops;
580   status_ = result;
581   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
582             thread_num_, status_, pages_copied_);
583   return result;
584 }
585
586
587 // Print error information about a data miscompare.
588 void WorkerThread::ProcessError(struct ErrorRecord *error,
589                                 int priority,
590                                 const char *message) {
591   char dimm_string[256] = "";
592
593   int apic_id = apicid();
594
595   // Determine if this is a write or read error.
596   os_->Flush(error->vaddr);
597   error->reread = *(error->vaddr);
598
599   char *good = reinterpret_cast<char*>(&(error->expected));
600   char *bad = reinterpret_cast<char*>(&(error->actual));
601
602   sat_assert(error->expected != error->actual);
603   unsigned int offset = 0;
604   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
605     if (good[offset] != bad[offset])
606       break;
607   }
608
609   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
610
611   // Find physical address if possible.
612   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
613
614   // Pretty print DIMM mapping if available.
615   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
616
617   // Report parseable error.
618   if (priority < 5) {
619     // Run miscompare error through diagnoser for logging and reporting.
620     os_->error_diagnoser_->AddMiscompareError(dimm_string,
621                                               reinterpret_cast<uint64>
622                                               (error->vaddr), 1);
623
624     logprintf(priority,
625               "%s: miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
626               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
627               message,
628               apic_id,
629               CurrentCpusFormat().c_str(),
630               error->vaddr,
631               error->paddr,
632               dimm_string,
633               error->actual,
634               error->reread,
635               error->expected);
636   }
637
638
639   // Overwrite incorrect data with correct data to prevent
640   // future miscompares when this data is reused.
641   *(error->vaddr) = error->expected;
642   os_->Flush(error->vaddr);
643 }
644
645
646
647 // Print error information about a data miscompare.
648 void FileThread::ProcessError(struct ErrorRecord *error,
649                               int priority,
650                               const char *message) {
651   char dimm_string[256] = "";
652
653   // Determine if this is a write or read error.
654   os_->Flush(error->vaddr);
655   error->reread = *(error->vaddr);
656
657   char *good = reinterpret_cast<char*>(&(error->expected));
658   char *bad = reinterpret_cast<char*>(&(error->actual));
659
660   sat_assert(error->expected != error->actual);
661   unsigned int offset = 0;
662   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
663     if (good[offset] != bad[offset])
664       break;
665   }
666
667   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
668
669   // Find physical address if possible.
670   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
671
672   // Pretty print DIMM mapping if available.
673   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
674
675   // If crc_page_ is valid, ie checking content read back from file,
676   // track src/dst memory addresses. Otherwise catagorize as general
677   // mememory miscompare for CRC checking everywhere else.
678   if (crc_page_ != -1) {
679     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
680                                 static_cast<char*>(page_recs_[crc_page_].dst);
681     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
682                                                  crc_page_,
683                                                  miscompare_byteoffset,
684                                                  page_recs_[crc_page_].src,
685                                                  page_recs_[crc_page_].dst);
686   } else {
687     os_->error_diagnoser_->AddMiscompareError(dimm_string,
688                                               reinterpret_cast<uint64>
689                                               (error->vaddr), 1);
690   }
691
692   logprintf(priority,
693             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
694             "reread:0x%016llx expected:0x%016llx\n",
695             message,
696             devicename_.c_str(),
697             error->vaddr,
698             error->paddr,
699             dimm_string,
700             error->actual,
701             error->reread,
702             error->expected);
703
704   // Overwrite incorrect data with correct data to prevent
705   // future miscompares when this data is reused.
706   *(error->vaddr) = error->expected;
707   os_->Flush(error->vaddr);
708 }
709
710
711 // Do a word by word result check of a region.
712 // Print errors on mismatches.
713 int WorkerThread::CheckRegion(void *addr,
714                               class Pattern *pattern,
715                               int64 length,
716                               int offset,
717                               int64 pattern_offset) {
718   uint64 *memblock = static_cast<uint64*>(addr);
719   const int kErrorLimit = 128;
720   int errors = 0;
721   int overflowerrors = 0;  // Count of overflowed errors.
722   bool page_error = false;
723   string errormessage("Hardware Error");
724   struct ErrorRecord
725     recorded[kErrorLimit];  // Queued errors for later printing.
726
727   // For each word in the data region.
728   for (int i = 0; i < length / wordsize_; i++) {
729     uint64 actual = memblock[i];
730     uint64 expected;
731
732     // Determine the value that should be there.
733     datacast_t data;
734     int index = 2 * i + pattern_offset;
735     data.l32.l = pattern->pattern(index);
736     data.l32.h = pattern->pattern(index + 1);
737     expected = data.l64;
738     // Check tags if necessary.
739     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
740       expected = addr_to_tag(&memblock[i]);
741     }
742
743
744     // If the value is incorrect, save an error record for later printing.
745     if (actual != expected) {
746       if (errors < kErrorLimit) {
747         recorded[errors].actual = actual;
748         recorded[errors].expected = expected;
749         recorded[errors].vaddr = &memblock[i];
750         errors++;
751       } else {
752         page_error = true;
753         // If we have overflowed the error queue, just print the errors now.
754         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
755         errormessage = "Page Error";
756         break;
757       }
758     }
759   }
760
761   // Find if this is a whole block corruption.
762   if (page_error && !tag_mode_) {
763     int patsize = patternlist_->Size();
764     for (int pat = 0; pat < patsize; pat++) {
765       class Pattern *altpattern = patternlist_->GetPattern(pat);
766       const int kGood = 0;
767       const int kBad = 1;
768       const int kGoodAgain = 2;
769       const int kNoMatch = 3;
770       int state = kGood;
771       unsigned int badstart = 0;
772       unsigned int badend = 0;
773
774       // Don't match against ourself!
775       if (pattern == altpattern)
776         continue;
777
778       for (int i = 0; i < length / wordsize_; i++) {
779         uint64 actual = memblock[i];
780         datacast_t expected;
781         datacast_t possible;
782
783         // Determine the value that should be there.
784         int index = 2 * i + pattern_offset;
785
786         expected.l32.l = pattern->pattern(index);
787         expected.l32.h = pattern->pattern(index + 1);
788
789         possible.l32.l = pattern->pattern(index);
790         possible.l32.h = pattern->pattern(index + 1);
791
792         if (state == kGood) {
793           if (actual == expected.l64) {
794             continue;
795           } else if (actual == possible.l64) {
796             badstart = i;
797             badend = i;
798             state = kBad;
799             continue;
800           } else {
801             state = kNoMatch;
802             break;
803           }
804         } else if (state == kBad) {
805           if (actual == possible.l64) {
806             badend = i;
807             continue;
808           } else if (actual == expected.l64) {
809             state = kGoodAgain;
810             continue;
811           } else {
812             state = kNoMatch;
813             break;
814           }
815         } else if (state == kGoodAgain) {
816           if (actual == expected.l64) {
817             continue;
818           } else {
819             state = kNoMatch;
820             break;
821           }
822         }
823       }
824
825       if ((state == kGoodAgain) || (state == kBad)) {
826         unsigned int blockerrors = badend - badstart + 1;
827         errormessage = "Block Error";
828         ProcessError(&recorded[0], 0, errormessage.c_str());
829         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
830                   "%d bytes from offset 0x%x to 0x%x\n",
831                   &memblock[badstart],
832                   altpattern->name(), pattern->name(),
833                   blockerrors * wordsize_,
834                   offset + badstart * wordsize_,
835                   offset + badend * wordsize_);
836         errorcount_ += blockerrors;
837         return blockerrors;
838       }
839     }
840   }
841
842
843   // Process error queue after all errors have been recorded.
844   for (int err = 0; err < errors; err++) {
845     int priority = 5;
846     if (errorcount_ + err < 30)
847       priority = 0;  // Bump up the priority for the first few errors.
848     ProcessError(&recorded[err], priority, errormessage.c_str());
849   }
850
851   if (page_error) {
852     // For each word in the data region.
853     int error_recount = 0;
854     for (int i = 0; i < length / wordsize_; i++) {
855       uint64 actual = memblock[i];
856       uint64 expected;
857       datacast_t data;
858       // Determine the value that should be there.
859       int index = 2 * i + pattern_offset;
860
861       data.l32.l = pattern->pattern(index);
862       data.l32.h = pattern->pattern(index + 1);
863       expected = data.l64;
864
865       // Check tags if necessary.
866       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
867         expected = addr_to_tag(&memblock[i]);
868       }
869
870       // If the value is incorrect, save an error record for later printing.
871       if (actual != expected) {
872         if (error_recount < kErrorLimit) {
873           // We already reported these.
874           error_recount++;
875         } else {
876           // If we have overflowed the error queue, print the errors now.
877           struct ErrorRecord er;
878           er.actual = actual;
879           er.expected = expected;
880           er.vaddr = &memblock[i];
881
882           // Do the error printout. This will take a long time and
883           // likely change the machine state.
884           ProcessError(&er, 12, errormessage.c_str());
885           overflowerrors++;
886         }
887       }
888     }
889   }
890
891   // Keep track of observed errors.
892   errorcount_ += errors + overflowerrors;
893   return errors + overflowerrors;
894 }
895
896 float WorkerThread::GetCopiedData() {
897   return pages_copied_ * sat_->page_length() / kMegabyte;
898 }
899
900 // Calculate the CRC of a region.
901 // Result check if the CRC mismatches.
902 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
903   const int blocksize = 4096;
904   const int blockwords = blocksize / wordsize_;
905   int errors = 0;
906
907   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
908   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
909   int blocks = sat_->page_length() / blocksize;
910   for (int currentblock = 0; currentblock < blocks; currentblock++) {
911     uint64 *memslice = memblock + currentblock * blockwords;
912
913     AdlerChecksum crc;
914     if (tag_mode_) {
915       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
916     } else {
917       CalculateAdlerChecksum(memslice, blocksize, &crc);
918     }
919
920     // If the CRC does not match, we'd better look closer.
921     if (!crc.Equals(*expectedcrc)) {
922       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
923                 "CRC mismatch %s != %s\n",
924                 crc.ToHexString().c_str(),
925                 expectedcrc->ToHexString().c_str());
926       int errorcount = CheckRegion(memslice,
927                                    srcpe->pattern,
928                                    blocksize,
929                                    currentblock * blocksize, 0);
930       if (errorcount == 0) {
931         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
932                      "but no miscompares found.\n",
933                   crc.ToHexString().c_str(),
934                   expectedcrc->ToHexString().c_str());
935       }
936       errors += errorcount;
937     }
938   }
939
940   // For odd length transfers, we should never hit this.
941   int leftovers = sat_->page_length() % blocksize;
942   if (leftovers) {
943     uint64 *memslice = memblock + blocks * blockwords;
944     errors += CheckRegion(memslice,
945                           srcpe->pattern,
946                           leftovers,
947                           blocks * blocksize, 0);
948   }
949   return errors;
950 }
951
952
953 // Print error information about a data miscompare.
954 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
955                                    int priority,
956                                    const char *message) {
957   char dimm_string[256] = "";
958   char tag_dimm_string[256] = "";
959   bool read_error = false;
960
961   int apic_id = apicid();
962
963   // Determine if this is a write or read error.
964   os_->Flush(error->vaddr);
965   error->reread = *(error->vaddr);
966
967   // Distinguish read and write errors.
968   if (error->actual != error->reread) {
969     read_error = true;
970   }
971
972   sat_assert(error->expected != error->actual);
973
974   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
975
976   // Find physical address if possible.
977   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
978   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
979
980   // Pretty print DIMM mapping if available.
981   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
982   // Pretty print DIMM mapping if available.
983   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
984
985   // Report parseable error.
986   if (priority < 5) {
987     logprintf(priority,
988               "%s: Tag from %p(0x%llx:%s) (%s) "
989               "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
990               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
991               message,
992               error->tagvaddr, error->tagpaddr,
993               tag_dimm_string,
994               read_error ? "read error" : "write error",
995               apic_id,
996               CurrentCpusFormat().c_str(),
997               error->vaddr,
998               error->paddr,
999               dimm_string,
1000               error->actual,
1001               error->reread,
1002               error->expected);
1003   }
1004
1005   errorcount_ += 1;
1006
1007   // Overwrite incorrect data with correct data to prevent
1008   // future miscompares when this data is reused.
1009   *(error->vaddr) = error->expected;
1010   os_->Flush(error->vaddr);
1011 }
1012
1013
1014 // Print out and log a tag error.
1015 bool WorkerThread::ReportTagError(
1016     uint64 *mem64,
1017     uint64 actual,
1018     uint64 tag) {
1019   struct ErrorRecord er;
1020   er.actual = actual;
1021
1022   er.expected = tag;
1023   er.vaddr = mem64;
1024
1025   // Generate vaddr from tag.
1026   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1027
1028   ProcessTagError(&er, 0, "Hardware Error");
1029   return true;
1030 }
1031
1032 // C implementation of Adler memory copy, with memory tagging.
1033 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1034                                     uint64 *srcmem64,
1035                                     unsigned int size_in_bytes,
1036                                     AdlerChecksum *checksum,
1037                                     struct page_entry *pe) {
1038   // Use this data wrapper to access memory with 64bit read/write.
1039   datacast_t data;
1040   datacast_t dstdata;
1041   unsigned int count = size_in_bytes / sizeof(data);
1042
1043   if (count > ((1U) << 19)) {
1044     // Size is too large, must be strictly less than 512 KB.
1045     return false;
1046   }
1047
1048   uint64 a1 = 1;
1049   uint64 a2 = 1;
1050   uint64 b1 = 0;
1051   uint64 b2 = 0;
1052
1053   class Pattern *pattern = pe->pattern;
1054
1055   unsigned int i = 0;
1056   while (i < count) {
1057     // Process 64 bits at a time.
1058     if ((i & 0x7) == 0) {
1059       data.l64 = srcmem64[i];
1060       dstdata.l64 = dstmem64[i];
1061       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1062       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1063       // Detect if tags have been corrupted.
1064       if (data.l64 != src_tag)
1065         ReportTagError(&srcmem64[i], data.l64, src_tag);
1066       if (dstdata.l64 != dst_tag)
1067         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1068
1069       data.l32.l = pattern->pattern(i << 1);
1070       data.l32.h = pattern->pattern((i << 1) + 1);
1071       a1 = a1 + data.l32.l;
1072       b1 = b1 + a1;
1073       a1 = a1 + data.l32.h;
1074       b1 = b1 + a1;
1075
1076       data.l64  = dst_tag;
1077       dstmem64[i] = data.l64;
1078
1079     } else {
1080       data.l64 = srcmem64[i];
1081       a1 = a1 + data.l32.l;
1082       b1 = b1 + a1;
1083       a1 = a1 + data.l32.h;
1084       b1 = b1 + a1;
1085       dstmem64[i] = data.l64;
1086     }
1087     i++;
1088
1089     data.l64 = srcmem64[i];
1090     a2 = a2 + data.l32.l;
1091     b2 = b2 + a2;
1092     a2 = a2 + data.l32.h;
1093     b2 = b2 + a2;
1094     dstmem64[i] = data.l64;
1095     i++;
1096   }
1097   checksum->Set(a1, a2, b1, b2);
1098   return true;
1099 }
1100
1101 // x86_64 SSE2 assembly implementation of Adler memory copy, with address
1102 // tagging added as a second step. This is useful for debugging failures
1103 // that only occur when SSE / nontemporal writes are used.
1104 bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1105                                        uint64 *srcmem64,
1106                                        unsigned int size_in_bytes,
1107                                        AdlerChecksum *checksum,
1108                                        struct page_entry *pe) {
1109   // Do ASM copy, ignore checksum.
1110   AdlerChecksum ignored_checksum;
1111   os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1112
1113   // Force cache flush.
1114   int length = size_in_bytes / sizeof(*dstmem64);
1115   for (int i = 0; i < length; i += sizeof(*dstmem64)) {
1116     os_->FastFlush(dstmem64 + i);
1117     os_->FastFlush(srcmem64 + i);
1118   }
1119   // Check results.
1120   AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1121   // Patch up address tags.
1122   TagAddrC(dstmem64, size_in_bytes);
1123   return true;
1124 }
1125
1126 // Retag pages..
1127 bool WorkerThread::TagAddrC(uint64 *memwords,
1128                             unsigned int size_in_bytes) {
1129   // Mask is the bitmask of indexes used by the pattern.
1130   // It is the pattern size -1. Size is always a power of 2.
1131
1132   // Select tag or data as appropriate.
1133   int length = size_in_bytes / wordsize_;
1134   for (int i = 0; i < length; i += 8) {
1135     datacast_t data;
1136     data.l64 = addr_to_tag(&memwords[i]);
1137     memwords[i] = data.l64;
1138   }
1139   return true;
1140 }
1141
1142 // C implementation of Adler memory crc.
1143 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1144                                  unsigned int size_in_bytes,
1145                                  AdlerChecksum *checksum,
1146                                  struct page_entry *pe) {
1147   // Use this data wrapper to access memory with 64bit read/write.
1148   datacast_t data;
1149   unsigned int count = size_in_bytes / sizeof(data);
1150
1151   if (count > ((1U) << 19)) {
1152     // Size is too large, must be strictly less than 512 KB.
1153     return false;
1154   }
1155
1156   uint64 a1 = 1;
1157   uint64 a2 = 1;
1158   uint64 b1 = 0;
1159   uint64 b2 = 0;
1160
1161   class Pattern *pattern = pe->pattern;
1162
1163   unsigned int i = 0;
1164   while (i < count) {
1165     // Process 64 bits at a time.
1166     if ((i & 0x7) == 0) {
1167       data.l64 = srcmem64[i];
1168       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1169       // Check that tags match expected.
1170       if (data.l64 != src_tag)
1171         ReportTagError(&srcmem64[i], data.l64, src_tag);
1172
1173       data.l32.l = pattern->pattern(i << 1);
1174       data.l32.h = pattern->pattern((i << 1) + 1);
1175       a1 = a1 + data.l32.l;
1176       b1 = b1 + a1;
1177       a1 = a1 + data.l32.h;
1178       b1 = b1 + a1;
1179     } else {
1180       data.l64 = srcmem64[i];
1181       a1 = a1 + data.l32.l;
1182       b1 = b1 + a1;
1183       a1 = a1 + data.l32.h;
1184       b1 = b1 + a1;
1185     }
1186     i++;
1187
1188     data.l64 = srcmem64[i];
1189     a2 = a2 + data.l32.l;
1190     b2 = b2 + a2;
1191     a2 = a2 + data.l32.h;
1192     b2 = b2 + a2;
1193     i++;
1194   }
1195   checksum->Set(a1, a2, b1, b2);
1196   return true;
1197 }
1198
1199 // Copy a block of memory quickly, while keeping a CRC of the data.
1200 // Result check if the CRC mismatches.
1201 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1202                               struct page_entry *srcpe) {
1203   int errors = 0;
1204   const int blocksize = 4096;
1205   const int blockwords = blocksize / wordsize_;
1206   int blocks = sat_->page_length() / blocksize;
1207
1208   // Base addresses for memory copy
1209   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1210   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1211   // Remember the expected CRC
1212   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1213
1214   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1215     uint64 *targetmem = targetmembase + currentblock * blockwords;
1216     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1217
1218     AdlerChecksum crc;
1219     if (tag_mode_) {
1220       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1221     } else {
1222       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1223     }
1224
1225     // Investigate miscompares.
1226     if (!crc.Equals(*expectedcrc)) {
1227       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1228                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1229                 expectedcrc->ToHexString().c_str());
1230       int errorcount = CheckRegion(sourcemem,
1231                                    srcpe->pattern,
1232                                    blocksize,
1233                                    currentblock * blocksize, 0);
1234       if (errorcount == 0) {
1235         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1236                      "but no miscompares found. Retrying with fresh data.\n",
1237                   crc.ToHexString().c_str(),
1238                   expectedcrc->ToHexString().c_str());
1239         if (!tag_mode_) {
1240           // Copy the data originally read from this region back again.
1241           // This data should have any corruption read originally while
1242           // calculating the CRC.
1243           memcpy(sourcemem, targetmem, blocksize);
1244           errorcount = CheckRegion(sourcemem,
1245                                    srcpe->pattern,
1246                                    blocksize,
1247                                    currentblock * blocksize, 0);
1248           if (errorcount == 0) {
1249             int apic_id = apicid();
1250             logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1251                          "CRC mismatch %s != %s, "
1252                          "but no miscompares found on second pass.\n",
1253                       apic_id, CurrentCpusFormat().c_str(),
1254                       crc.ToHexString().c_str(),
1255                       expectedcrc->ToHexString().c_str());
1256             struct ErrorRecord er;
1257             er.actual = sourcemem[0];
1258             er.expected = 0x0;
1259             er.vaddr = sourcemem;
1260             ProcessError(&er, 0, "Hardware Error");
1261           }
1262         }
1263       }
1264       errors += errorcount;
1265     }
1266   }
1267
1268   // For odd length transfers, we should never hit this.
1269   int leftovers = sat_->page_length() % blocksize;
1270   if (leftovers) {
1271     uint64 *targetmem = targetmembase + blocks * blockwords;
1272     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1273
1274     errors += CheckRegion(sourcemem,
1275                           srcpe->pattern,
1276                           leftovers,
1277                           blocks * blocksize, 0);
1278     int leftoverwords = leftovers / wordsize_;
1279     for (int i = 0; i < leftoverwords; i++) {
1280       targetmem[i] = sourcemem[i];
1281     }
1282   }
1283
1284   // Update pattern reference to reflect new contents.
1285   dstpe->pattern = srcpe->pattern;
1286
1287   // Clean clean clean the errors away.
1288   if (errors) {
1289     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1290     // cause bad data to be propogated across the page.
1291     FillPage(dstpe);
1292   }
1293   return errors;
1294 }
1295
1296
1297
1298 // Invert a block of memory quickly, traversing downwards.
1299 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1300   const int blocksize = 4096;
1301   const int blockwords = blocksize / wordsize_;
1302   int blocks = sat_->page_length() / blocksize;
1303
1304   // Base addresses for memory copy
1305   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1306
1307   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1308     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1309     for (int i = blockwords - 32; i >= 0; i -= 32) {
1310       for (int index = i + 31; index >= i; --index) {
1311         unsigned int actual = sourcemem[index];
1312         sourcemem[index] = ~actual;
1313       }
1314       OsLayer::FastFlush(&sourcemem[i]);
1315     }
1316   }
1317
1318   return 0;
1319 }
1320
1321 // Invert a block of memory, traversing upwards.
1322 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1323   const int blocksize = 4096;
1324   const int blockwords = blocksize / wordsize_;
1325   int blocks = sat_->page_length() / blocksize;
1326
1327   // Base addresses for memory copy
1328   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1329
1330   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1331     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1332     for (int i = 0; i < blockwords; i += 32) {
1333       for (int index = i; index <= i + 31; ++index) {
1334         unsigned int actual = sourcemem[index];
1335         sourcemem[index] = ~actual;
1336       }
1337       OsLayer::FastFlush(&sourcemem[i]);
1338     }
1339   }
1340   return 0;
1341 }
1342
1343 // Copy a block of memory quickly, while keeping a CRC of the data.
1344 // Result check if the CRC mismatches. Warm the CPU while running
1345 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1346                                   struct page_entry *srcpe) {
1347   int errors = 0;
1348   const int blocksize = 4096;
1349   const int blockwords = blocksize / wordsize_;
1350   int blocks = sat_->page_length() / blocksize;
1351
1352   // Base addresses for memory copy
1353   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1354   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1355   // Remember the expected CRC
1356   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1357
1358   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1359     uint64 *targetmem = targetmembase + currentblock * blockwords;
1360     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1361
1362     AdlerChecksum crc;
1363     if (tag_mode_) {
1364       AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1365     } else {
1366       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1367     }
1368
1369     // Investigate miscompares.
1370     if (!crc.Equals(*expectedcrc)) {
1371       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1372                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1373                 expectedcrc->ToHexString().c_str());
1374       int errorcount = CheckRegion(sourcemem,
1375                                    srcpe->pattern,
1376                                    blocksize,
1377                                    currentblock * blocksize, 0);
1378       if (errorcount == 0) {
1379         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1380                      "but no miscompares found. Retrying with fresh data.\n",
1381                   crc.ToHexString().c_str(),
1382                   expectedcrc->ToHexString().c_str());
1383         if (!tag_mode_) {
1384           // Copy the data originally read from this region back again.
1385           // This data should have any corruption read originally while
1386           // calculating the CRC.
1387           memcpy(sourcemem, targetmem, blocksize);
1388           errorcount = CheckRegion(sourcemem,
1389                                    srcpe->pattern,
1390                                    blocksize,
1391                                    currentblock * blocksize, 0);
1392           if (errorcount == 0) {
1393             int apic_id = apicid();
1394             logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1395                          "CRC mismatch %s != %s, "
1396                          "but no miscompares found on second pass.\n",
1397                       apic_id, CurrentCpusFormat().c_str(),
1398                       crc.ToHexString().c_str(),
1399                       expectedcrc->ToHexString().c_str());
1400             struct ErrorRecord er;
1401             er.actual = sourcemem[0];
1402             er.expected = 0x0;
1403             er.vaddr = sourcemem;
1404             ProcessError(&er, 0, "Hardware Error");
1405           }
1406         }
1407       }
1408       errors += errorcount;
1409     }
1410   }
1411
1412   // For odd length transfers, we should never hit this.
1413   int leftovers = sat_->page_length() % blocksize;
1414   if (leftovers) {
1415     uint64 *targetmem = targetmembase + blocks * blockwords;
1416     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1417
1418     errors += CheckRegion(sourcemem,
1419                           srcpe->pattern,
1420                           leftovers,
1421                           blocks * blocksize, 0);
1422     int leftoverwords = leftovers / wordsize_;
1423     for (int i = 0; i < leftoverwords; i++) {
1424       targetmem[i] = sourcemem[i];
1425     }
1426   }
1427
1428   // Update pattern reference to reflect new contents.
1429   dstpe->pattern = srcpe->pattern;
1430
1431   // Clean clean clean the errors away.
1432   if (errors) {
1433     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1434     // cause bad data to be propogated across the page.
1435     FillPage(dstpe);
1436   }
1437   return errors;
1438 }
1439
1440
1441
1442 // Memory check work loop. Execute until done, then exhaust pages.
1443 bool CheckThread::Work() {
1444   struct page_entry pe;
1445   bool result = true;
1446   int64 loops = 0;
1447
1448   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1449
1450   // We want to check all the pages, and
1451   // stop when there aren't any left.
1452   while (true) {
1453     result = result && sat_->GetValid(&pe);
1454     if (!result) {
1455       if (IsReadyToRunNoPause())
1456         logprintf(0, "Process Error: check_thread failed to pop pages, "
1457                   "bailing\n");
1458       else
1459         result = true;
1460       break;
1461     }
1462
1463     // Do the result check.
1464     CrcCheckPage(&pe);
1465
1466     // Push pages back on the valid queue if we are still going,
1467     // throw them out otherwise.
1468     if (IsReadyToRunNoPause())
1469       result = result && sat_->PutValid(&pe);
1470     else
1471       result = result && sat_->PutEmpty(&pe);
1472     if (!result) {
1473       logprintf(0, "Process Error: check_thread failed to push pages, "
1474                 "bailing\n");
1475       break;
1476     }
1477     loops++;
1478   }
1479
1480   pages_copied_ = loops;
1481   status_ = result;
1482   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1483             thread_num_, status_, pages_copied_);
1484   return result;
1485 }
1486
1487
1488 // Memory copy work loop. Execute until marked done.
1489 bool CopyThread::Work() {
1490   struct page_entry src;
1491   struct page_entry dst;
1492   bool result = true;
1493   int64 loops = 0;
1494
1495   logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1496             thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1497
1498   while (IsReadyToRun()) {
1499     // Pop the needed pages.
1500     result = result && sat_->GetValid(&src, tag_);
1501     result = result && sat_->GetEmpty(&dst, tag_);
1502     if (!result) {
1503       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1504                 "bailing\n");
1505       break;
1506     }
1507
1508     // Force errors for unittests.
1509     if (sat_->error_injection()) {
1510       if (loops == 8) {
1511         char *addr = reinterpret_cast<char*>(src.addr);
1512         int offset = random() % sat_->page_length();
1513         addr[offset] = 0xba;
1514       }
1515     }
1516
1517     // We can use memcpy, or CRC check while we copy.
1518     if (sat_->warm()) {
1519       CrcWarmCopyPage(&dst, &src);
1520     } else if (sat_->strict()) {
1521       CrcCopyPage(&dst, &src);
1522     } else {
1523       memcpy(dst.addr, src.addr, sat_->page_length());
1524       dst.pattern = src.pattern;
1525     }
1526
1527     result = result && sat_->PutValid(&dst);
1528     result = result && sat_->PutEmpty(&src);
1529
1530     // Copy worker-threads yield themselves at the end of each copy loop,
1531     // to avoid threads from preempting each other in the middle of the inner
1532     // copy-loop. Cooperations between Copy worker-threads results in less
1533     // unnecessary cache thrashing (which happens when context-switching in the
1534     // middle of the inner copy-loop).
1535     YieldSelf();
1536
1537     if (!result) {
1538       logprintf(0, "Process Error: copy_thread failed to push pages, "
1539                 "bailing\n");
1540       break;
1541     }
1542     loops++;
1543   }
1544
1545   pages_copied_ = loops;
1546   status_ = result;
1547   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1548             thread_num_, status_, pages_copied_);
1549   return result;
1550 }
1551
1552 // Memory invert work loop. Execute until marked done.
1553 bool InvertThread::Work() {
1554   struct page_entry src;
1555   bool result = true;
1556   int64 loops = 0;
1557
1558   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1559
1560   while (IsReadyToRun()) {
1561     // Pop the needed pages.
1562     result = result && sat_->GetValid(&src);
1563     if (!result) {
1564       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1565                 "bailing\n");
1566       break;
1567     }
1568
1569     if (sat_->strict())
1570       CrcCheckPage(&src);
1571
1572     // For the same reason CopyThread yields itself (see YieldSelf comment
1573     // in CopyThread::Work(), InvertThread yields itself after each invert
1574     // operation to improve cooperation between different worker threads
1575     // stressing the memory/cache.
1576     InvertPageUp(&src);
1577     YieldSelf();
1578     InvertPageDown(&src);
1579     YieldSelf();
1580     InvertPageDown(&src);
1581     YieldSelf();
1582     InvertPageUp(&src);
1583     YieldSelf();
1584
1585     if (sat_->strict())
1586       CrcCheckPage(&src);
1587
1588     result = result && sat_->PutValid(&src);
1589     if (!result) {
1590       logprintf(0, "Process Error: invert_thread failed to push pages, "
1591                 "bailing\n");
1592       break;
1593     }
1594     loops++;
1595   }
1596
1597   pages_copied_ = loops * 2;
1598   status_ = result;
1599   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1600             thread_num_, status_, pages_copied_);
1601   return result;
1602 }
1603
1604
1605 // Set file name to use for File IO.
1606 void FileThread::SetFile(const char *filename_init) {
1607   filename_ = filename_init;
1608   devicename_ = os_->FindFileDevice(filename_);
1609 }
1610
1611 // Open the file for access.
1612 bool FileThread::OpenFile(int *pfile) {
1613   bool no_O_DIRECT = false;
1614   int flags = O_RDWR | O_CREAT | O_SYNC;
1615   int fd = open(filename_.c_str(), flags | O_DIRECT, 0644);
1616   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
1617     no_O_DIRECT = true;
1618     fd = open(filename_.c_str(), flags, 0644); // Try without O_DIRECT
1619   }
1620   if (fd < 0) {
1621     logprintf(0, "Process Error: Failed to create file %s!!\n",
1622               filename_.c_str());
1623     pages_copied_ = 0;
1624     return false;
1625   }
1626   if (no_O_DIRECT)
1627     os_->ActivateFlushPageCache(); // Not using O_DIRECT fixed EINVAL
1628   *pfile = fd;
1629   return true;
1630 }
1631
1632 // Close the file.
1633 bool FileThread::CloseFile(int fd) {
1634   close(fd);
1635   return true;
1636 }
1637
1638 // Check sector tagging.
1639 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1640   int page_length = sat_->page_length();
1641   struct FileThread::SectorTag *tag =
1642     (struct FileThread::SectorTag *)(src->addr);
1643
1644   // Tag each sector.
1645   unsigned char magic = ((0xba + thread_num_) & 0xff);
1646   for (int sec = 0; sec < page_length / 512; sec++) {
1647     tag[sec].magic = magic;
1648     tag[sec].block = block & 0xff;
1649     tag[sec].sector = sec & 0xff;
1650     tag[sec].pass = pass_ & 0xff;
1651   }
1652   return true;
1653 }
1654
1655 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1656   int page_length = sat_->page_length();
1657   // Fill the file with our data.
1658   int64 size = write(fd, src->addr, page_length);
1659
1660   if (size != page_length) {
1661     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1662     errorcount_++;
1663     logprintf(0, "Block Error: file_thread failed to write, "
1664               "bailing\n");
1665     return false;
1666   }
1667   return true;
1668 }
1669
1670 // Write the data to the file.
1671 bool FileThread::WritePages(int fd) {
1672   int strict = sat_->strict();
1673
1674   // Start fresh at beginning of file for each batch of pages.
1675   lseek64(fd, 0, SEEK_SET);
1676   for (int i = 0; i < sat_->disk_pages(); i++) {
1677     struct page_entry src;
1678     if (!GetValidPage(&src))
1679       return false;
1680     // Save expected pattern.
1681     page_recs_[i].pattern = src.pattern;
1682     page_recs_[i].src = src.addr;
1683
1684     // Check data correctness.
1685     if (strict)
1686       CrcCheckPage(&src);
1687
1688     SectorTagPage(&src, i);
1689
1690     bool result = WritePageToFile(fd, &src);
1691
1692     if (!PutEmptyPage(&src))
1693       return false;
1694
1695     if (!result)
1696       return false;
1697   }
1698   return os_->FlushPageCache(); // If O_DIRECT worked, this will be a NOP.
1699 }
1700
1701 // Copy data from file into memory block.
1702 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1703   int page_length = sat_->page_length();
1704
1705   // Do the actual read.
1706   int64 size = read(fd, dst->addr, page_length);
1707   if (size != page_length) {
1708     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1709     logprintf(0, "Block Error: file_thread failed to read, "
1710               "bailing\n");
1711     errorcount_++;
1712     return false;
1713   }
1714   return true;
1715 }
1716
1717 // Check sector tagging.
1718 bool FileThread::SectorValidatePage(const struct PageRec &page,
1719                                     struct page_entry *dst, int block) {
1720   // Error injection.
1721   static int calls = 0;
1722   calls++;
1723
1724   // Do sector tag compare.
1725   int firstsector = -1;
1726   int lastsector = -1;
1727   bool badsector = false;
1728   int page_length = sat_->page_length();
1729
1730   // Cast data block into an array of tagged sectors.
1731   struct FileThread::SectorTag *tag =
1732   (struct FileThread::SectorTag *)(dst->addr);
1733
1734   sat_assert(sizeof(*tag) == 512);
1735
1736   // Error injection.
1737   if (sat_->error_injection()) {
1738     if (calls == 2) {
1739       for (int badsec = 8; badsec < 17; badsec++)
1740         tag[badsec].pass = 27;
1741     }
1742     if (calls == 18) {
1743       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1744     }
1745   }
1746
1747   // Check each sector for the correct tag we added earlier,
1748   // then revert the tag to the to normal data pattern.
1749   unsigned char magic = ((0xba + thread_num_) & 0xff);
1750   for (int sec = 0; sec < page_length / 512; sec++) {
1751     // Check magic tag.
1752     if ((tag[sec].magic != magic) ||
1753         (tag[sec].block != (block & 0xff)) ||
1754         (tag[sec].sector != (sec & 0xff)) ||
1755         (tag[sec].pass != (pass_ & 0xff))) {
1756       // Offset calculation for tag location.
1757       int offset = sec * sizeof(SectorTag);
1758       if (tag[sec].block != (block & 0xff))
1759         offset += 1 * sizeof(uint8);
1760       else if (tag[sec].sector != (sec & 0xff))
1761         offset += 2 * sizeof(uint8);
1762       else if (tag[sec].pass != (pass_ & 0xff))
1763         offset += 3 * sizeof(uint8);
1764
1765       // Run sector tag error through diagnoser for logging and reporting.
1766       errorcount_ += 1;
1767       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1768                                                   offset,
1769                                                   tag[sec].sector,
1770                                                   page.src, page.dst);
1771
1772       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1773                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1774                 block * page_length + 512 * sec,
1775                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1776                 sec, (unsigned int)tag[sec].sector,
1777                 block, (unsigned int)tag[sec].block,
1778                 magic, (unsigned int)tag[sec].magic,
1779                 filename_.c_str());
1780
1781       // Keep track of first and last bad sector.
1782       if (firstsector == -1)
1783         firstsector = (block * page_length / 512) + sec;
1784       lastsector = (block * page_length / 512) + sec;
1785       badsector = true;
1786     }
1787     // Patch tag back to proper pattern.
1788     unsigned int *addr = (unsigned int *)(&tag[sec]);
1789     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1790   }
1791
1792   // If we found sector errors:
1793   if (badsector == true) {
1794     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1795               firstsector * 512,
1796               ((lastsector + 1) * 512) - 1,
1797               filename_.c_str());
1798
1799     // Either exit immediately, or patch the data up and continue.
1800     if (sat_->stop_on_error()) {
1801       exit(1);
1802     } else {
1803       // Patch up bad pages.
1804       for (int block = (firstsector * 512) / page_length;
1805           block <= (lastsector * 512) / page_length;
1806           block++) {
1807         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1808         int length = page_length / wordsize_;
1809         for (int i = 0; i < length; i++) {
1810           memblock[i] = dst->pattern->pattern(i);
1811         }
1812       }
1813     }
1814   }
1815   return true;
1816 }
1817
1818 // Get memory for an incoming data transfer..
1819 bool FileThread::PagePrepare() {
1820   // We can only do direct IO to SAT pages if it is normal mem.
1821   page_io_ = os_->normal_mem();
1822
1823   // Init a local buffer if we need it.
1824   if (!page_io_) {
1825 #ifdef HAVE_POSIX_MEMALIGN
1826     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1827 #else
1828     local_page_ = memalign(512, sat_->page_length());
1829     int result = (local_page_ == 0);
1830 #endif
1831     if (result) {
1832       logprintf(0, "Process Error: disk thread posix_memalign "
1833                    "returned %d (fail)\n",
1834                 result);
1835       status_ = false;
1836       return false;
1837     }
1838   }
1839   return true;
1840 }
1841
1842
1843 // Remove memory allocated for data transfer.
1844 bool FileThread::PageTeardown() {
1845   // Free a local buffer if we need to.
1846   if (!page_io_) {
1847     free(local_page_);
1848   }
1849   return true;
1850 }
1851
1852
1853
1854 // Get memory for an incoming data transfer..
1855 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1856   if (page_io_) {
1857     if (!sat_->GetEmpty(dst))
1858       return false;
1859   } else {
1860     dst->addr = local_page_;
1861     dst->offset = 0;
1862     dst->pattern = 0;
1863   }
1864   return true;
1865 }
1866
1867 // Get memory for an outgoing data transfer..
1868 bool FileThread::GetValidPage(struct page_entry *src) {
1869   struct page_entry tmp;
1870   if (!sat_->GetValid(&tmp))
1871     return false;
1872   if (page_io_) {
1873     *src = tmp;
1874     return true;
1875   } else {
1876     src->addr = local_page_;
1877     src->offset = 0;
1878     CrcCopyPage(src, &tmp);
1879     if (!sat_->PutValid(&tmp))
1880       return false;
1881   }
1882   return true;
1883 }
1884
1885
1886 // Throw out a used empty page.
1887 bool FileThread::PutEmptyPage(struct page_entry *src) {
1888   if (page_io_) {
1889     if (!sat_->PutEmpty(src))
1890       return false;
1891   }
1892   return true;
1893 }
1894
1895 // Throw out a used, filled page.
1896 bool FileThread::PutValidPage(struct page_entry *src) {
1897   if (page_io_) {
1898     if (!sat_->PutValid(src))
1899       return false;
1900   }
1901   return true;
1902 }
1903
1904 // Copy data from file into memory blocks.
1905 bool FileThread::ReadPages(int fd) {
1906   int page_length = sat_->page_length();
1907   int strict = sat_->strict();
1908   bool result = true;
1909
1910   // Read our data back out of the file, into it's new location.
1911   lseek64(fd, 0, SEEK_SET);
1912   for (int i = 0; i < sat_->disk_pages(); i++) {
1913     struct page_entry dst;
1914     if (!GetEmptyPage(&dst))
1915       return false;
1916     // Retrieve expected pattern.
1917     dst.pattern = page_recs_[i].pattern;
1918     // Update page recordpage record.
1919     page_recs_[i].dst = dst.addr;
1920
1921     // Read from the file into destination page.
1922     if (!ReadPageFromFile(fd, &dst)) {
1923         PutEmptyPage(&dst);
1924         return false;
1925     }
1926
1927     SectorValidatePage(page_recs_[i], &dst, i);
1928
1929     // Ensure that the transfer ended up with correct data.
1930     if (strict) {
1931       // Record page index currently CRC checked.
1932       crc_page_ = i;
1933       int errors = CrcCheckPage(&dst);
1934       if (errors) {
1935         logprintf(5, "Log: file miscompare at block %d, "
1936                   "offset %x-%x. File: %s\n",
1937                   i, i * page_length, ((i + 1) * page_length) - 1,
1938                   filename_.c_str());
1939         result = false;
1940       }
1941       crc_page_ = -1;
1942       errorcount_ += errors;
1943     }
1944     if (!PutValidPage(&dst))
1945       return false;
1946   }
1947   return result;
1948 }
1949
1950 // File IO work loop. Execute until marked done.
1951 bool FileThread::Work() {
1952   bool result = true;
1953   int64 loops = 0;
1954
1955   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1956             thread_num_,
1957             filename_.c_str(),
1958             devicename_.c_str());
1959
1960   if (!PagePrepare()) {
1961     status_ = false;
1962     return false;
1963   }
1964
1965   // Open the data IO file.
1966   int fd = 0;
1967   if (!OpenFile(&fd)) {
1968     status_ = false;
1969     return false;
1970   }
1971
1972   pass_ = 0;
1973
1974   // Load patterns into page records.
1975   page_recs_ = new struct PageRec[sat_->disk_pages()];
1976   for (int i = 0; i < sat_->disk_pages(); i++) {
1977     page_recs_[i].pattern = new struct Pattern();
1978   }
1979
1980   // Loop until done.
1981   while (IsReadyToRun()) {
1982     // Do the file write.
1983     if (!(result = result && WritePages(fd)))
1984       break;
1985
1986     // Do the file read.
1987     if (!(result = result && ReadPages(fd)))
1988       break;
1989
1990     loops++;
1991     pass_ = loops;
1992   }
1993
1994   pages_copied_ = loops * sat_->disk_pages();
1995
1996   // Clean up.
1997   CloseFile(fd);
1998   PageTeardown();
1999
2000   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
2001             thread_num_, status_, pages_copied_);
2002   // Failure to read from device indicates hardware,
2003   // rather than procedural SW error.
2004   status_ = true;
2005   return true;
2006 }
2007
2008 bool NetworkThread::IsNetworkStopSet() {
2009   return !IsReadyToRunNoPause();
2010 }
2011
2012 bool NetworkSlaveThread::IsNetworkStopSet() {
2013   // This thread has no completion status.
2014   // It finishes whever there is no more data to be
2015   // passed back.
2016   return true;
2017 }
2018
2019 // Set ip name to use for Network IO.
2020 void NetworkThread::SetIP(const char *ipaddr_init) {
2021   strncpy(ipaddr_, ipaddr_init, 256);
2022 }
2023
2024 // Create a socket.
2025 // Return 0 on error.
2026 bool NetworkThread::CreateSocket(int *psocket) {
2027   int sock = socket(AF_INET, SOCK_STREAM, 0);
2028   if (sock == -1) {
2029     logprintf(0, "Process Error: Cannot open socket\n");
2030     pages_copied_ = 0;
2031     status_ = false;
2032     return false;
2033   }
2034   *psocket = sock;
2035   return true;
2036 }
2037
2038 // Close the socket.
2039 bool NetworkThread::CloseSocket(int sock) {
2040   close(sock);
2041   return true;
2042 }
2043
2044 // Initiate the tcp connection.
2045 bool NetworkThread::Connect(int sock) {
2046   struct sockaddr_in dest_addr;
2047   dest_addr.sin_family = AF_INET;
2048   dest_addr.sin_port = htons(kNetworkPort);
2049   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2050
2051   // Translate dot notation to u32.
2052   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2053     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2054     pages_copied_ = 0;
2055     status_ = false;
2056     return false;
2057   }
2058
2059   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2060                     sizeof(struct sockaddr))) {
2061     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2062     pages_copied_ = 0;
2063     status_ = false;
2064     return false;
2065   }
2066   return true;
2067 }
2068
2069 // Initiate the tcp connection.
2070 bool NetworkListenThread::Listen() {
2071   struct sockaddr_in sa;
2072
2073   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2074
2075   sa.sin_family = AF_INET;
2076   sa.sin_addr.s_addr = INADDR_ANY;
2077   sa.sin_port = htons(kNetworkPort);
2078
2079   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2080     char buf[256];
2081     sat_strerror(errno, buf, sizeof(buf));
2082     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2083     pages_copied_ = 0;
2084     status_ = false;
2085     return false;
2086   }
2087   listen(sock_, 3);
2088   return true;
2089 }
2090
2091 // Wait for a connection from a network traffic generation thread.
2092 bool NetworkListenThread::Wait() {
2093     fd_set rfds;
2094     struct timeval tv;
2095     int retval;
2096
2097     // Watch sock_ to see when it has input.
2098     FD_ZERO(&rfds);
2099     FD_SET(sock_, &rfds);
2100     // Wait up to five seconds.
2101     tv.tv_sec = 5;
2102     tv.tv_usec = 0;
2103
2104     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2105
2106     return (retval > 0);
2107 }
2108
2109 // Wait for a connection from a network traffic generation thread.
2110 bool NetworkListenThread::GetConnection(int *pnewsock) {
2111   struct sockaddr_in sa;
2112   socklen_t size = sizeof(struct sockaddr_in);
2113
2114   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2115   if (newsock < 0)  {
2116     logprintf(0, "Process Error: Did not receive connection\n");
2117     pages_copied_ = 0;
2118     status_ = false;
2119     return false;
2120   }
2121   *pnewsock = newsock;
2122   return true;
2123 }
2124
2125 // Send a page, return false if a page was not sent.
2126 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2127   int page_length = sat_->page_length();
2128   char *address = static_cast<char*>(src->addr);
2129
2130   // Send our data over the network.
2131   int size = page_length;
2132   while (size) {
2133     int transferred = send(sock, address + (page_length - size), size, 0);
2134     if ((transferred == 0) || (transferred == -1)) {
2135       if (!IsNetworkStopSet()) {
2136         char buf[256] = "";
2137         sat_strerror(errno, buf, sizeof(buf));
2138         logprintf(0, "Process Error: Thread %d, "
2139                      "Network write failed, bailing. (%s)\n",
2140                   thread_num_, buf);
2141         status_ = false;
2142       }
2143       return false;
2144     }
2145     size = size - transferred;
2146   }
2147   return true;
2148 }
2149
2150 // Receive a page. Return false if a page was not received.
2151 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2152   int page_length = sat_->page_length();
2153   char *address = static_cast<char*>(dst->addr);
2154
2155   // Maybe we will get our data back again, maybe not.
2156   int size = page_length;
2157   while (size) {
2158     int transferred = recv(sock, address + (page_length - size), size, 0);
2159     if ((transferred == 0) || (transferred == -1)) {
2160       // Typically network slave thread should exit as network master
2161       // thread stops sending data.
2162       if (IsNetworkStopSet()) {
2163         int err = errno;
2164         if (transferred == 0 && err == 0) {
2165           // Two system setups will not sync exactly,
2166           // allow early exit, but log it.
2167           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2168         } else {
2169           char buf[256] = "";
2170           sat_strerror(err, buf, sizeof(buf));
2171           // Print why we failed.
2172           logprintf(0, "Process Error: Thread %d, "
2173                        "Network read failed, bailing (%s).\n",
2174                     thread_num_, buf);
2175           status_ = false;
2176           // Print arguments and results.
2177           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2178                     sock, address + (page_length - size),
2179                     size, transferred, err);
2180           if ((transferred == 0) &&
2181               (page_length - size < 512) &&
2182               (page_length - size > 0)) {
2183             // Print null terminated data received, to see who's been
2184             // sending us supicious unwanted data.
2185             address[page_length - size] = 0;
2186             logprintf(0, "Log: received  %d bytes: '%s'\n",
2187                       page_length - size, address);
2188           }
2189         }
2190       }
2191       return false;
2192     }
2193     size = size - transferred;
2194   }
2195   return true;
2196 }
2197
2198 // Network IO work loop. Execute until marked done.
2199 // Return true if the thread ran as expected.
2200 bool NetworkThread::Work() {
2201   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2202             thread_num_,
2203             ipaddr_);
2204
2205   // Make a socket.
2206   int sock = 0;
2207   if (!CreateSocket(&sock))
2208     return false;
2209
2210   // Network IO loop requires network slave thread to have already initialized.
2211   // We will sleep here for awhile to ensure that the slave thread will be
2212   // listening by the time we connect.
2213   // Sleep for 15 seconds.
2214   sat_sleep(15);
2215   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2216             thread_num_,
2217             ipaddr_);
2218
2219
2220   // Connect to a slave thread.
2221   if (!Connect(sock))
2222     return false;
2223
2224   // Loop until done.
2225   bool result = true;
2226   int strict = sat_->strict();
2227   int64 loops = 0;
2228   while (IsReadyToRun()) {
2229     struct page_entry src;
2230     struct page_entry dst;
2231     result = result && sat_->GetValid(&src);
2232     result = result && sat_->GetEmpty(&dst);
2233     if (!result) {
2234       logprintf(0, "Process Error: net_thread failed to pop pages, "
2235                 "bailing\n");
2236       break;
2237     }
2238
2239     // Check data correctness.
2240     if (strict)
2241       CrcCheckPage(&src);
2242
2243     // Do the network write.
2244     if (!(result = result && SendPage(sock, &src)))
2245       break;
2246
2247     // Update pattern reference to reflect new contents.
2248     dst.pattern = src.pattern;
2249
2250     // Do the network read.
2251     if (!(result = result && ReceivePage(sock, &dst)))
2252       break;
2253
2254     // Ensure that the transfer ended up with correct data.
2255     if (strict)
2256       CrcCheckPage(&dst);
2257
2258     // Return all of our pages to the queue.
2259     result = result && sat_->PutValid(&dst);
2260     result = result && sat_->PutEmpty(&src);
2261     if (!result) {
2262       logprintf(0, "Process Error: net_thread failed to push pages, "
2263                 "bailing\n");
2264       break;
2265     }
2266     loops++;
2267   }
2268
2269   pages_copied_ = loops;
2270   status_ = result;
2271
2272   // Clean up.
2273   CloseSocket(sock);
2274
2275   logprintf(9, "Log: Completed %d: network thread status %d, "
2276                "%d pages copied\n",
2277             thread_num_, status_, pages_copied_);
2278   return result;
2279 }
2280
2281 // Spawn slave threads for incoming connections.
2282 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2283   logprintf(12, "Log: Listen thread spawning slave\n");
2284
2285   // Spawn slave thread, to reflect network traffic back to sender.
2286   ChildWorker *child_worker = new ChildWorker;
2287   child_worker->thread.SetSock(newsock);
2288   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2289                                   &child_worker->status);
2290   child_worker->status.Initialize();
2291   child_worker->thread.SpawnThread();
2292   child_workers_.push_back(child_worker);
2293
2294   return true;
2295 }
2296
2297 // Reap slave threads.
2298 bool NetworkListenThread::ReapSlaves() {
2299   bool result = true;
2300   // Gather status and reap threads.
2301   logprintf(12, "Log: Joining all outstanding threads\n");
2302
2303   for (size_t i = 0; i < child_workers_.size(); i++) {
2304     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2305     logprintf(12, "Log: Joining slave thread %d\n", i);
2306     child_thread.JoinThread();
2307     if (child_thread.GetStatus() != 1) {
2308       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2309                 child_thread.GetStatus());
2310       result = false;
2311     }
2312     errorcount_ += child_thread.GetErrorCount();
2313     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2314               child_thread.GetErrorCount());
2315     pages_copied_ += child_thread.GetPageCount();
2316   }
2317
2318   return result;
2319 }
2320
2321 // Network listener IO work loop. Execute until marked done.
2322 // Return false on fatal software error.
2323 bool NetworkListenThread::Work() {
2324   logprintf(9, "Log: Starting network listen thread %d\n",
2325             thread_num_);
2326
2327   // Make a socket.
2328   sock_ = 0;
2329   if (!CreateSocket(&sock_)) {
2330     status_ = false;
2331     return false;
2332   }
2333   logprintf(9, "Log: Listen thread created sock\n");
2334
2335   // Allows incoming connections to be queued up by socket library.
2336   int newsock = 0;
2337   Listen();
2338   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2339
2340   // Wait on incoming connections, and spawn worker threads for them.
2341   int threadcount = 0;
2342   while (IsReadyToRun()) {
2343     // Poll for connections that we can accept().
2344     if (Wait()) {
2345       // Accept those connections.
2346       logprintf(12, "Log: Listen thread found incoming connection\n");
2347       if (GetConnection(&newsock)) {
2348         SpawnSlave(newsock, threadcount);
2349         threadcount++;
2350       }
2351     }
2352   }
2353
2354   // Gather status and join spawned threads.
2355   ReapSlaves();
2356
2357   // Delete the child workers.
2358   for (ChildVector::iterator it = child_workers_.begin();
2359        it != child_workers_.end(); ++it) {
2360     (*it)->status.Destroy();
2361     delete *it;
2362   }
2363   child_workers_.clear();
2364
2365   CloseSocket(sock_);
2366
2367   status_ = true;
2368   logprintf(9,
2369             "Log: Completed %d: network listen thread status %d, "
2370             "%d pages copied\n",
2371             thread_num_, status_, pages_copied_);
2372   return true;
2373 }
2374
2375 // Set network reflector socket struct.
2376 void NetworkSlaveThread::SetSock(int sock) {
2377   sock_ = sock;
2378 }
2379
2380 // Network reflector IO work loop. Execute until marked done.
2381 // Return false on fatal software error.
2382 bool NetworkSlaveThread::Work() {
2383   logprintf(9, "Log: Starting network slave thread %d\n",
2384             thread_num_);
2385
2386   // Verify that we have a socket.
2387   int sock = sock_;
2388   if (!sock) {
2389     status_ = false;
2390     return false;
2391   }
2392
2393   // Loop until done.
2394   int64 loops = 0;
2395   // Init a local buffer for storing data.
2396   void *local_page = NULL;
2397 #ifdef HAVE_POSIX_MEMALIGN
2398   int result = posix_memalign(&local_page, 512, sat_->page_length());
2399 #else
2400   local_page = memalign(512, sat_->page_length());
2401   int result = (local_page == 0);
2402 #endif
2403   if (result) {
2404     logprintf(0, "Process Error: net slave posix_memalign "
2405                  "returned %d (fail)\n",
2406               result);
2407     status_ = false;
2408     return false;
2409   }
2410
2411   struct page_entry page;
2412   page.addr = local_page;
2413
2414   // This thread will continue to run as long as the thread on the other end of
2415   // the socket is still sending and receiving data.
2416   while (1) {
2417     // Do the network read.
2418     if (!ReceivePage(sock, &page))
2419       break;
2420
2421     // Do the network write.
2422     if (!SendPage(sock, &page))
2423       break;
2424
2425     loops++;
2426   }
2427
2428   pages_copied_ = loops;
2429   // No results provided from this type of thread.
2430   status_ = true;
2431
2432   // Clean up.
2433   CloseSocket(sock);
2434
2435   logprintf(9,
2436             "Log: Completed %d: network slave thread status %d, "
2437             "%d pages copied\n",
2438             thread_num_, status_, pages_copied_);
2439   return true;
2440 }
2441
2442 // Thread work loop. Execute until marked finished.
2443 bool ErrorPollThread::Work() {
2444   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2445
2446   // This calls a generic error polling function in the Os abstraction layer.
2447   do {
2448     errorcount_ += os_->ErrorPoll();
2449     os_->ErrorWait();
2450   } while (IsReadyToRun());
2451
2452   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2453             thread_num_, errorcount_);
2454   status_ = true;
2455   return true;
2456 }
2457
2458 // Worker thread to heat up CPU.
2459 // This thread does not evaluate pass/fail or software error.
2460 bool CpuStressThread::Work() {
2461   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2462
2463   do {
2464     // Run ludloff's platform/CPU-specific assembly workload.
2465     os_->CpuStressWorkload();
2466     YieldSelf();
2467   } while (IsReadyToRun());
2468
2469   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2470             thread_num_);
2471   status_ = true;
2472   return true;
2473 }
2474
2475 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2476                                                  int cacheline_count,
2477                                                  int thread_num,
2478                                                  int inc_count) {
2479   cc_cacheline_data_ = data;
2480   cc_cacheline_count_ = cacheline_count;
2481   cc_thread_num_ = thread_num;
2482   cc_inc_count_ = inc_count;
2483 }
2484
2485 // Worked thread to test the cache coherency of the CPUs
2486 // Return false on fatal sw error.
2487 bool CpuCacheCoherencyThread::Work() {
2488   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2489             cc_thread_num_);
2490   uint64 time_start, time_end;
2491   struct timeval tv;
2492
2493   unsigned int seed = static_cast<unsigned int>(gettid());
2494   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2495   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2496
2497   uint64 total_inc = 0;  // Total increments done by the thread.
2498   while (IsReadyToRun()) {
2499     for (int i = 0; i < cc_inc_count_; i++) {
2500       // Choose a datastructure in random and increment the appropriate
2501       // member in that according to the offset (which is the same as the
2502       // thread number.
2503 #ifdef HAVE_RAND_R
2504       int r = rand_r(&seed);
2505 #else
2506       int r = rand();
2507 #endif
2508       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2509       // Increment the member of the randomely selected structure.
2510       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2511     }
2512
2513     total_inc += cc_inc_count_;
2514
2515     // Calculate if the local counter matches with the global value
2516     // in all the cache line structures for this particular thread.
2517     int cc_global_num = 0;
2518     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2519       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2520       // Reset the cachline member's value for the next run.
2521       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2522     }
2523     if (sat_->error_injection())
2524       cc_global_num = -1;
2525
2526     if (cc_global_num != cc_inc_count_) {
2527       errorcount_++;
2528       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2529                 cc_global_num, cc_inc_count_);
2530     }
2531   }
2532   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2533   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2534
2535   uint64 us_elapsed = time_end - time_start;
2536   // inc_rate is the no. of increments per second.
2537   double inc_rate = total_inc * 1e6 / us_elapsed;
2538
2539   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2540             " Increments=%llu, Increments/sec = %.6lf\n",
2541             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2542   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2543             cc_thread_num_);
2544   status_ = true;
2545   return true;
2546 }
2547
2548 DiskThread::DiskThread(DiskBlockTable *block_table) {
2549   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2550   write_block_size_ = kSectorSize;  // this assumes read and write block size
2551                                     // are the same
2552   segment_size_ = -1;               // use the entire disk as one segment
2553   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2554   // Use a queue such that 3/2 times as much data as the cache can hold
2555   // is written before it is read so that there is little chance the read
2556   // data is in the cache.
2557   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2558   blocks_per_segment_ = 32;
2559
2560   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2561   write_threshold_ = 100000;        // reading/writing a sector
2562
2563   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2564   write_timeout_ = 5000000;         // timout for reading/writing
2565
2566   device_sectors_ = 0;
2567   non_destructive_ = 0;
2568
2569 #ifdef HAVE_LIBAIO_H
2570   aio_ctx_ = 0;
2571 #endif
2572   block_table_ = block_table;
2573   update_block_table_ = 1;
2574
2575   block_buffer_ = NULL;
2576
2577   blocks_written_ = 0;
2578   blocks_read_ = 0;
2579 }
2580
2581 DiskThread::~DiskThread() {
2582   if (block_buffer_)
2583     free(block_buffer_);
2584 }
2585
2586 // Set filename for device file (in /dev).
2587 void DiskThread::SetDevice(const char *device_name) {
2588   device_name_ = device_name;
2589 }
2590
2591 // Set various parameters that control the behaviour of the test.
2592 // -1 is used as a sentinel value on each parameter (except non_destructive)
2593 // to indicate that the parameter not be set.
2594 bool DiskThread::SetParameters(int read_block_size,
2595                                int write_block_size,
2596                                int64 segment_size,
2597                                int64 cache_size,
2598                                int blocks_per_segment,
2599                                int64 read_threshold,
2600                                int64 write_threshold,
2601                                int non_destructive) {
2602   if (read_block_size != -1) {
2603     // Blocks must be aligned to the disk's sector size.
2604     if (read_block_size % kSectorSize != 0) {
2605       logprintf(0, "Process Error: Block size must be a multiple of %d "
2606                 "(thread %d).\n", kSectorSize, thread_num_);
2607       return false;
2608     }
2609
2610     read_block_size_ = read_block_size;
2611   }
2612
2613   if (write_block_size != -1) {
2614     // Write blocks must be aligned to the disk's sector size and to the
2615     // block size.
2616     if (write_block_size % kSectorSize != 0) {
2617       logprintf(0, "Process Error: Write block size must be a multiple "
2618                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2619       return false;
2620     }
2621     if (write_block_size % read_block_size_ != 0) {
2622       logprintf(0, "Process Error: Write block size must be a multiple "
2623                 "of the read block size, which is %d (thread %d).\n",
2624                 read_block_size_, thread_num_);
2625       return false;
2626     }
2627
2628     write_block_size_ = write_block_size;
2629
2630   } else {
2631     // Make sure write_block_size_ is still valid.
2632     if (read_block_size_ > write_block_size_) {
2633       logprintf(5, "Log: Assuming write block size equal to read block size, "
2634                 "which is %d (thread %d).\n", read_block_size_,
2635                 thread_num_);
2636       write_block_size_ = read_block_size_;
2637     } else {
2638       if (write_block_size_ % read_block_size_ != 0) {
2639         logprintf(0, "Process Error: Write block size (defined as %d) must "
2640                   "be a multiple of the read block size, which is %d "
2641                   "(thread %d).\n", write_block_size_, read_block_size_,
2642                   thread_num_);
2643         return false;
2644       }
2645     }
2646   }
2647
2648   if (cache_size != -1) {
2649     cache_size_ = cache_size;
2650   }
2651
2652   if (blocks_per_segment != -1) {
2653     if (blocks_per_segment <= 0) {
2654       logprintf(0, "Process Error: Blocks per segment must be greater than "
2655                    "zero.\n (thread %d)", thread_num_);
2656       return false;
2657     }
2658
2659     blocks_per_segment_ = blocks_per_segment;
2660   }
2661
2662   if (read_threshold != -1) {
2663     if (read_threshold <= 0) {
2664       logprintf(0, "Process Error: Read threshold must be greater than "
2665                    "zero (thread %d).\n", thread_num_);
2666       return false;
2667     }
2668
2669     read_threshold_ = read_threshold;
2670   }
2671
2672   if (write_threshold != -1) {
2673     if (write_threshold <= 0) {
2674       logprintf(0, "Process Error: Write threshold must be greater than "
2675                    "zero (thread %d).\n", thread_num_);
2676       return false;
2677     }
2678
2679     write_threshold_ = write_threshold;
2680   }
2681
2682   if (segment_size != -1) {
2683     // Segments must be aligned to the disk's sector size.
2684     if (segment_size % kSectorSize != 0) {
2685       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2686                 " (thread %d).\n", kSectorSize, thread_num_);
2687       return false;
2688     }
2689
2690     segment_size_ = segment_size / kSectorSize;
2691   }
2692
2693   non_destructive_ = non_destructive;
2694
2695   // Having a queue of 150% of blocks that will fit in the disk's cache
2696   // should be enough to force out the oldest block before it is read and hence,
2697   // making sure the data comes form the disk and not the cache.
2698   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2699   // Updating DiskBlockTable parameters
2700   if (update_block_table_) {
2701     block_table_->SetParameters(kSectorSize, write_block_size_,
2702                                 device_sectors_, segment_size_,
2703                                 device_name_);
2704   }
2705   return true;
2706 }
2707
2708 // Open a device, return false on failure.
2709 bool DiskThread::OpenDevice(int *pfile) {
2710   bool no_O_DIRECT = false;
2711   int flags = O_RDWR | O_SYNC | O_LARGEFILE;
2712   int fd = open(device_name_.c_str(), flags | O_DIRECT, 0);
2713   if (O_DIRECT != 0 && fd < 0 && errno == EINVAL) {
2714     no_O_DIRECT = true;
2715     fd = open(device_name_.c_str(), flags, 0); // Try without O_DIRECT
2716   }
2717   if (fd < 0) {
2718     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2719               device_name_.c_str(), thread_num_);
2720     return false;
2721   }
2722   if (no_O_DIRECT)
2723     os_->ActivateFlushPageCache();
2724   *pfile = fd;
2725
2726   return GetDiskSize(fd);
2727 }
2728
2729 // Retrieves the size (in bytes) of the disk/file.
2730 // Return false on failure.
2731 bool DiskThread::GetDiskSize(int fd) {
2732   struct stat device_stat;
2733   if (fstat(fd, &device_stat) == -1) {
2734     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2735               device_name_.c_str(), thread_num_);
2736     return false;
2737   }
2738
2739   // For a block device, an ioctl is needed to get the size since the size
2740   // of the device file (i.e. /dev/sdb) is 0.
2741   if (S_ISBLK(device_stat.st_mode)) {
2742     uint64 block_size = 0;
2743
2744     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2745       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2746                 device_name_.c_str(), thread_num_);
2747       return false;
2748     }
2749
2750     // Zero size indicates nonworking device..
2751     if (block_size == 0) {
2752       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2753       ++errorcount_;
2754       status_ = true;  // Avoid a procedural error.
2755       return false;
2756     }
2757
2758     device_sectors_ = block_size / kSectorSize;
2759
2760   } else if (S_ISREG(device_stat.st_mode)) {
2761     device_sectors_ = device_stat.st_size / kSectorSize;
2762
2763   } else {
2764     logprintf(0, "Process Error: %s is not a regular file or block "
2765               "device (thread %d).\n", device_name_.c_str(),
2766               thread_num_);
2767     return false;
2768   }
2769
2770   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2771             device_sectors_, device_name_.c_str(), thread_num_);
2772
2773   if (update_block_table_) {
2774     block_table_->SetParameters(kSectorSize, write_block_size_,
2775                                 device_sectors_, segment_size_,
2776                                 device_name_);
2777   }
2778
2779   return true;
2780 }
2781
2782 bool DiskThread::CloseDevice(int fd) {
2783   close(fd);
2784   return true;
2785 }
2786
2787 // Return the time in microseconds.
2788 int64 DiskThread::GetTime() {
2789   struct timeval tv;
2790   gettimeofday(&tv, NULL);
2791   return tv.tv_sec * 1000000 + tv.tv_usec;
2792 }
2793
2794 // Do randomized reads and (possibly) writes on a device.
2795 // Return false on fatal SW error, true on SW success,
2796 // regardless of whether HW failed.
2797 bool DiskThread::DoWork(int fd) {
2798   int64 block_num = 0;
2799   int64 num_segments;
2800
2801   if (segment_size_ == -1) {
2802     num_segments = 1;
2803   } else {
2804     num_segments = device_sectors_ / segment_size_;
2805     if (device_sectors_ % segment_size_ != 0)
2806       num_segments++;
2807   }
2808
2809   // Disk size should be at least 3x cache size.  See comment later for
2810   // details.
2811   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2812
2813   // This disk test works by writing blocks with a certain pattern to
2814   // disk, then reading them back and verifying it against the pattern
2815   // at a later time.  A failure happens when either the block cannot
2816   // be written/read or when the read block is different than what was
2817   // written.  If a block takes too long to write/read, then a warning
2818   // is given instead of an error since taking too long is not
2819   // necessarily an error.
2820   //
2821   // To prevent the read blocks from coming from the disk cache,
2822   // enough blocks are written before read such that a block would
2823   // be ejected from the disk cache by the time it is read.
2824   //
2825   // TODO(amistry): Implement some sort of read/write throttling.  The
2826   //                flood of asynchronous I/O requests when a drive is
2827   //                unplugged is causing the application and kernel to
2828   //                become unresponsive.
2829
2830   while (IsReadyToRun()) {
2831     // Write blocks to disk.
2832     logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2833               non_destructive_ ? "(disabled) " : "",
2834               device_name_.c_str(), thread_num_);
2835     while (IsReadyToRunNoPause() &&
2836            in_flight_sectors_.size() <
2837                static_cast<size_t>(queue_size_ + 1)) {
2838       // Confine testing to a particular segment of the disk.
2839       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2840       if (!non_destructive_ &&
2841           (block_num % blocks_per_segment_ == 0)) {
2842         logprintf(20, "Log: Starting to write segment %lld out of "
2843                   "%lld on disk %s (thread %d).\n",
2844                   segment, num_segments, device_name_.c_str(),
2845                   thread_num_);
2846       }
2847       block_num++;
2848
2849       BlockData *block = block_table_->GetUnusedBlock(segment);
2850
2851       // If an unused sequence of sectors could not be found, skip to the
2852       // next block to process.  Soon, a new segment will come and new
2853       // sectors will be able to be allocated.  This effectively puts a
2854       // minumim on the disk size at 3x the stated cache size, or 48MiB
2855       // if a cache size is not given (since the cache is set as 16MiB
2856       // by default).  Given that todays caches are at the low MiB range
2857       // and drive sizes at the mid GB, this shouldn't pose a problem.
2858       // The 3x minimum comes from the following:
2859       //   1. In order to allocate 'y' blocks from a segment, the
2860       //      segment must contain at least 2y blocks or else an
2861       //      allocation may not succeed.
2862       //   2. Assume the entire disk is one segment.
2863       //   3. A full write phase consists of writing blocks corresponding to
2864       //      3/2 cache size.
2865       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2866       //      size worth of blocks = 3 * cache size worth of blocks
2867       //      to complete.
2868       // In non-destructive mode, don't write anything to disk.
2869       if (!non_destructive_) {
2870         if (!WriteBlockToDisk(fd, block)) {
2871           block_table_->RemoveBlock(block);
2872           return true;
2873         }
2874         blocks_written_++;
2875       }
2876
2877       // Block is either initialized by writing, or in nondestructive case,
2878       // initialized by being added into the datastructure for later reading.
2879       block->SetBlockAsInitialized();
2880
2881       in_flight_sectors_.push(block);
2882     }
2883     if (!os_->FlushPageCache()) // If O_DIRECT worked, this will be a NOP.
2884       return false;
2885
2886     // Verify blocks on disk.
2887     logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2888               device_name_.c_str(), thread_num_);
2889     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2890       BlockData *block = in_flight_sectors_.front();
2891       in_flight_sectors_.pop();
2892       if (!ValidateBlockOnDisk(fd, block))
2893         return true;
2894       block_table_->RemoveBlock(block);
2895       blocks_read_++;
2896     }
2897   }
2898
2899   pages_copied_ = blocks_written_ + blocks_read_;
2900   return true;
2901 }
2902
2903 // Do an asynchronous disk I/O operation.
2904 // Return false if the IO is not set up.
2905 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2906                             int64 offset, int64 timeout) {
2907 #ifdef HAVE_LIBAIO_H
2908   // Use the Linux native asynchronous I/O interface for reading/writing.
2909   // A read/write consists of three basic steps:
2910   //    1. create an io context.
2911   //    2. prepare and submit an io request to the context
2912   //    3. wait for an event on the context.
2913
2914   struct {
2915     const int opcode;
2916     const char *op_str;
2917     const char *error_str;
2918   } operations[2] = {
2919     { IO_CMD_PREAD, "read", "disk-read-error" },
2920     { IO_CMD_PWRITE, "write", "disk-write-error" }
2921   };
2922
2923   struct iocb cb;
2924   memset(&cb, 0, sizeof(cb));
2925
2926   cb.aio_fildes = fd;
2927   cb.aio_lio_opcode = operations[op].opcode;
2928   cb.u.c.buf = buf;
2929   cb.u.c.nbytes = size;
2930   cb.u.c.offset = offset;
2931
2932   struct iocb *cbs[] = { &cb };
2933   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2934     int error = errno;
2935     char buf[256];
2936     sat_strerror(error, buf, sizeof(buf));
2937     logprintf(0, "Process Error: Unable to submit async %s "
2938                  "on disk %s (thread %d). Error %d, %s\n",
2939               operations[op].op_str, device_name_.c_str(),
2940               thread_num_, error, buf);
2941     return false;
2942   }
2943
2944   struct io_event event;
2945   memset(&event, 0, sizeof(event));
2946   struct timespec tv;
2947   tv.tv_sec = timeout / 1000000;
2948   tv.tv_nsec = (timeout % 1000000) * 1000;
2949   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2950     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2951     // EINTR error code.  This is not an error and so don't treat it as such,
2952     // but still log it.
2953     int error = errno;
2954     if (error == EINTR) {
2955       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2956                 operations[op].op_str, device_name_.c_str(),
2957                 thread_num_);
2958     } else {
2959       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2960       errorcount_ += 1;
2961       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2962                    "starting at %lld on disk %s (thread %d).\n",
2963                 operations[op].op_str, offset / kSectorSize,
2964                 device_name_.c_str(), thread_num_);
2965     }
2966
2967     // Don't bother checking return codes since io_cancel seems to always fail.
2968     // Since io_cancel is always failing, destroying and recreating an I/O
2969     // context is a workaround for canceling an in-progress I/O operation.
2970     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2971     io_cancel(aio_ctx_, &cb, &event);
2972     io_destroy(aio_ctx_);
2973     aio_ctx_ = 0;
2974     if (io_setup(5, &aio_ctx_)) {
2975       int error = errno;
2976       char buf[256];
2977       sat_strerror(error, buf, sizeof(buf));
2978       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2979                 " (thread %d) Error %d, %s\n",
2980                 device_name_.c_str(), thread_num_, error, buf);
2981     }
2982
2983     return false;
2984   }
2985
2986   // event.res contains the number of bytes written/read or
2987   // error if < 0, I think.
2988   if (event.res != static_cast<uint64>(size)) {
2989     errorcount_++;
2990     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2991
2992     if (event.res < 0) {
2993       switch (event.res) {
2994         case -EIO:
2995           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2996                        "sectors starting at %lld on disk %s (thread %d).\n",
2997                     operations[op].op_str, offset / kSectorSize,
2998                     device_name_.c_str(), thread_num_);
2999           break;
3000         default:
3001           logprintf(0, "Hardware Error: Unknown error while doing %s to "
3002                        "sectors starting at %lld on disk %s (thread %d).\n",
3003                     operations[op].op_str, offset / kSectorSize,
3004                     device_name_.c_str(), thread_num_);
3005       }
3006     } else {
3007       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
3008                    "%lld on disk %s (thread %d).\n",
3009                 operations[op].op_str, offset / kSectorSize,
3010                 device_name_.c_str(), thread_num_);
3011     }
3012     return false;
3013   }
3014
3015   return true;
3016 #else // !HAVE_LIBAIO_H
3017   return false;
3018 #endif
3019 }
3020
3021 // Write a block to disk.
3022 // Return false if the block is not written.
3023 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
3024   memset(block_buffer_, 0, block->GetSize());
3025
3026   // Fill block buffer with a pattern
3027   struct page_entry pe;
3028   if (!sat_->GetValid(&pe)) {
3029     // Even though a valid page could not be obatined, it is not an error
3030     // since we can always fill in a pattern directly, albeit slower.
3031     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
3032     block->SetPattern(patternlist_->GetRandomPattern());
3033
3034     logprintf(11, "Log: Warning, using pattern fill fallback in "
3035                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
3036               device_name_.c_str(), thread_num_);
3037
3038     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
3039       memblock[i] = block->GetPattern()->pattern(i);
3040     }
3041   } else {
3042     memcpy(block_buffer_, pe.addr, block->GetSize());
3043     block->SetPattern(pe.pattern);
3044     sat_->PutValid(&pe);
3045   }
3046
3047   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
3048             " (thread %d).\n",
3049             block->GetSize()/kSectorSize, block->GetAddress(),
3050             device_name_.c_str(), thread_num_);
3051
3052   int64 start_time = GetTime();
3053
3054   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
3055                    block->GetAddress() * kSectorSize, write_timeout_)) {
3056     return false;
3057   }
3058
3059   int64 end_time = GetTime();
3060   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
3061             end_time - start_time, thread_num_);
3062   if (end_time - start_time > write_threshold_) {
3063     logprintf(5, "Log: Write took %lld us which is longer than threshold "
3064                  "%lld us on disk %s (thread %d).\n",
3065               end_time - start_time, write_threshold_, device_name_.c_str(),
3066               thread_num_);
3067   }
3068
3069   return true;
3070 }
3071
3072 // Verify a block on disk.
3073 // Return true if the block was read, also increment errorcount
3074 // if the block had data errors or performance problems.
3075 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
3076   int64 blocks = block->GetSize() / read_block_size_;
3077   int64 bytes_read = 0;
3078   int64 current_blocks;
3079   int64 current_bytes;
3080   uint64 address = block->GetAddress();
3081
3082   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
3083             "(thread %d).\n",
3084             address, device_name_.c_str(), thread_num_);
3085
3086   // Read block from disk and time the read.  If it takes longer than the
3087   // threshold, complain.
3088   if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
3089     logprintf(0, "Process Error: Unable to seek to sector %lld in "
3090               "DiskThread::ValidateSectorsOnDisk on disk %s "
3091               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
3092     return false;
3093   }
3094   int64 start_time = GetTime();
3095
3096   // Split a large write-sized block into small read-sized blocks and
3097   // read them in groups of randomly-sized multiples of read block size.
3098   // This assures all data written on disk by this particular block
3099   // will be tested using a random reading pattern.
3100   while (blocks != 0) {
3101     // Test all read blocks in a written block.
3102     current_blocks = (random() % blocks) + 1;
3103     current_bytes = current_blocks * read_block_size_;
3104
3105     memset(block_buffer_, 0, current_bytes);
3106
3107     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
3108               "disk %s (thread %d)\n",
3109               current_bytes / kSectorSize,
3110               (address * kSectorSize + bytes_read) / kSectorSize,
3111               device_name_.c_str(), thread_num_);
3112
3113     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
3114                      address * kSectorSize + bytes_read,
3115                      write_timeout_)) {
3116       return false;
3117     }
3118
3119     int64 end_time = GetTime();
3120     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3121               end_time - start_time, thread_num_);
3122     if (end_time - start_time > read_threshold_) {
3123       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3124                 "%lld us on disk %s (thread %d).\n",
3125                 end_time - start_time, read_threshold_,
3126                 device_name_.c_str(), thread_num_);
3127     }
3128
3129     // In non-destructive mode, don't compare the block to the pattern since
3130     // the block was never written to disk in the first place.
3131     if (!non_destructive_) {
3132       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3133                       0, bytes_read)) {
3134         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3135         errorcount_ += 1;
3136         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3137                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3138                   "disk %s (thread %d).\n",
3139                   address, device_name_.c_str(), thread_num_);
3140       }
3141     }
3142
3143     bytes_read += current_blocks * read_block_size_;
3144     blocks -= current_blocks;
3145   }
3146
3147   return true;
3148 }
3149
3150 // Direct device access thread.
3151 // Return false on software error.
3152 bool DiskThread::Work() {
3153   int fd;
3154
3155   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3156             thread_num_, device_name_.c_str());
3157
3158   srandom(time(NULL));
3159
3160   if (!OpenDevice(&fd)) {
3161     status_ = false;
3162     return false;
3163   }
3164
3165   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3166   // when using direct IO.
3167 #ifdef HAVE_POSIX_MEMALIGN
3168   int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3169                               sat_->page_length());
3170 #else
3171   block_buffer_ = memalign(kBufferAlignment, sat_->page_length());
3172   int memalign_result = (block_buffer_ == 0);
3173 #endif
3174   if (memalign_result) {
3175     CloseDevice(fd);
3176     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3177                  "for disk %s (thread %d) posix memalign returned %d.\n",
3178               device_name_.c_str(), thread_num_, memalign_result);
3179     status_ = false;
3180     return false;
3181   }
3182
3183 #ifdef HAVE_LIBAIO_H
3184   if (io_setup(5, &aio_ctx_)) {
3185     CloseDevice(fd);
3186     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3187               " (thread %d).\n",
3188               device_name_.c_str(), thread_num_);
3189     status_ = false;
3190     return false;
3191   }
3192 #endif
3193
3194   bool result = DoWork(fd);
3195
3196   status_ = result;
3197
3198 #ifdef HAVE_LIBAIO_H
3199   io_destroy(aio_ctx_);
3200 #endif
3201   CloseDevice(fd);
3202
3203   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3204                "%d pages copied\n",
3205             thread_num_, device_name_.c_str(), status_, pages_copied_);
3206   return result;
3207 }
3208
3209 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3210     : DiskThread(block_table) {
3211   update_block_table_ = 0;
3212 }
3213
3214 RandomDiskThread::~RandomDiskThread() {
3215 }
3216
3217 // Workload for random disk thread.
3218 bool RandomDiskThread::DoWork(int fd) {
3219   logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3220             device_name_.c_str(), thread_num_);
3221   while (IsReadyToRun()) {
3222     BlockData *block = block_table_->GetRandomBlock();
3223     if (block == NULL) {
3224       logprintf(12, "Log: No block available for device %s (thread %d).\n",
3225                 device_name_.c_str(), thread_num_);
3226     } else {
3227       ValidateBlockOnDisk(fd, block);
3228       block_table_->ReleaseBlock(block);
3229       blocks_read_++;
3230     }
3231   }
3232   pages_copied_ = blocks_read_;
3233   return true;
3234 }
3235
3236 MemoryRegionThread::MemoryRegionThread() {
3237   error_injection_ = false;
3238   pages_ = NULL;
3239 }
3240
3241 MemoryRegionThread::~MemoryRegionThread() {
3242   if (pages_ != NULL)
3243     delete pages_;
3244 }
3245
3246 // Set a region of memory or MMIO to be tested.
3247 // Return false if region could not be mapped.
3248 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3249   int plength = sat_->page_length();
3250   int npages = size / plength;
3251   if (size % plength) {
3252     logprintf(0, "Process Error: region size is not a multiple of SAT "
3253               "page length\n");
3254     return false;
3255   } else {
3256     if (pages_ != NULL)
3257       delete pages_;
3258     pages_ = new PageEntryQueue(npages);
3259     char *base_addr = reinterpret_cast<char*>(region);
3260     region_ = base_addr;
3261     for (int i = 0; i < npages; i++) {
3262       struct page_entry pe;
3263       init_pe(&pe);
3264       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3265       pe.offset = i * plength;
3266
3267       pages_->Push(&pe);
3268     }
3269     return true;
3270   }
3271 }
3272
3273 // More detailed error printout for hardware errors in memory or MMIO
3274 // regions.
3275 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3276                                       int priority,
3277                                       const char *message) {
3278   uint32 buffer_offset;
3279   if (phase_ == kPhaseCopy) {
3280     // If the error occurred on the Copy Phase, it means that
3281     // the source data (i.e., the main memory) is wrong. so
3282     // just pass it to the original ProcessError to call a
3283     // bad-dimm error
3284     WorkerThread::ProcessError(error, priority, message);
3285   } else if (phase_ == kPhaseCheck) {
3286     // A error on the Check Phase means that the memory region tested
3287     // has an error. Gathering more information and then reporting
3288     // the error.
3289     // Determine if this is a write or read error.
3290     os_->Flush(error->vaddr);
3291     error->reread = *(error->vaddr);
3292     char *good = reinterpret_cast<char*>(&(error->expected));
3293     char *bad = reinterpret_cast<char*>(&(error->actual));
3294     sat_assert(error->expected != error->actual);
3295     unsigned int offset = 0;
3296     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3297       if (good[offset] != bad[offset])
3298         break;
3299     }
3300
3301     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3302
3303     buffer_offset = error->vbyteaddr - region_;
3304
3305     // Find physical address if possible.
3306     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3307     logprintf(priority,
3308               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3309               "offset %llx: read:0x%016llx, reread:0x%016llx "
3310               "expected:0x%016llx\n",
3311               message,
3312               identifier_.c_str(),
3313               error->vaddr,
3314               error->paddr,
3315               buffer_offset,
3316               error->actual,
3317               error->reread,
3318               error->expected);
3319   } else {
3320     logprintf(0, "Process Error: memory region thread raised an "
3321               "unexpected error.");
3322   }
3323 }
3324
3325 // Workload for testion memory or MMIO regions.
3326 // Return false on software error.
3327 bool MemoryRegionThread::Work() {
3328   struct page_entry source_pe;
3329   struct page_entry memregion_pe;
3330   bool result = true;
3331   int64 loops = 0;
3332   const uint64 error_constant = 0x00ba00000000ba00LL;
3333
3334   // For error injection.
3335   int64 *addr = 0x0;
3336   int offset = 0;
3337   int64 data = 0;
3338
3339   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3340
3341   while (IsReadyToRun()) {
3342     // Getting pages from SAT and queue.
3343     phase_ = kPhaseNoPhase;
3344     result = result && sat_->GetValid(&source_pe);
3345     if (!result) {
3346       logprintf(0, "Process Error: memory region thread failed to pop "
3347                 "pages from SAT, bailing\n");
3348       break;
3349     }
3350
3351     result = result && pages_->PopRandom(&memregion_pe);
3352     if (!result) {
3353       logprintf(0, "Process Error: memory region thread failed to pop "
3354                 "pages from queue, bailing\n");
3355       break;
3356     }
3357
3358     // Error injection for CRC copy.
3359     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3360       addr = reinterpret_cast<int64*>(source_pe.addr);
3361       offset = random() % (sat_->page_length() / wordsize_);
3362       data = addr[offset];
3363       addr[offset] = error_constant;
3364     }
3365
3366     // Copying SAT page into memory region.
3367     phase_ = kPhaseCopy;
3368     CrcCopyPage(&memregion_pe, &source_pe);
3369     memregion_pe.pattern = source_pe.pattern;
3370
3371     // Error injection for CRC Check.
3372     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3373       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3374       offset = random() % (sat_->page_length() / wordsize_);
3375       data = addr[offset];
3376       addr[offset] = error_constant;
3377     }
3378
3379     // Checking page content in memory region.
3380     phase_ = kPhaseCheck;
3381     CrcCheckPage(&memregion_pe);
3382
3383     phase_ = kPhaseNoPhase;
3384     // Storing pages on their proper queues.
3385     result = result && sat_->PutValid(&source_pe);
3386     if (!result) {
3387       logprintf(0, "Process Error: memory region thread failed to push "
3388                 "pages into SAT, bailing\n");
3389       break;
3390     }
3391     result = result && pages_->Push(&memregion_pe);
3392     if (!result) {
3393       logprintf(0, "Process Error: memory region thread failed to push "
3394                 "pages into queue, bailing\n");
3395       break;
3396     }
3397
3398     if ((sat_->error_injection() || error_injection_) &&
3399         loops >= 1 && loops <= 2) {
3400       addr[offset] = data;
3401     }
3402
3403     loops++;
3404     YieldSelf();
3405   }
3406
3407   pages_copied_ = loops;
3408   status_ = result;
3409   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3410             "pages checked\n", thread_num_, status_, pages_copied_);
3411   return result;
3412 }