chiark / gitweb /
Update to stressapptest 1.0.4
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #include <libaio.h>
48
49 #include <sys/syscall.h>
50
51 #include <set>
52 #include <string>
53
54 // This file must work with autoconf on its public version,
55 // so these includes are correct.
56 #include "error_diag.h"  // NOLINT
57 #include "os.h"          // NOLINT
58 #include "pattern.h"     // NOLINT
59 #include "queue.h"       // NOLINT
60 #include "sat.h"         // NOLINT
61 #include "sattypes.h"    // NOLINT
62 #include "worker.h"      // NOLINT
63
64 // Syscalls
65 // Why ubuntu, do you hate gettid so bad?
66 #if !defined(__NR_gettid)
67   #define __NR_gettid             224
68 #endif
69
70 #define gettid() syscall(__NR_gettid)
71 #if !defined(CPU_SETSIZE)
72 _syscall3(int, sched_getaffinity, pid_t, pid,
73           unsigned int, len, cpu_set_t*, mask)
74 _syscall3(int, sched_setaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 #endif
77
78 // Linux aio syscalls.
79 #if !defined(__NR_io_setup)
80 #error "No aio headers inculded, please install libaio."
81 #endif
82
83 namespace {
84   // Get HW core ID from cpuid instruction.
85   inline int apicid(void) {
86     int cpu;
87 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
88     __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
89 #elif defined(STRESSAPPTEST_CPU_ARMV7A)
90   #warning "Unsupported CPU type ARMV7A: unable to determine core ID."
91     cpu = 0;
92 #else
93   #warning "Unsupported CPU type: unable to determine core ID."
94     cpu = 0;
95 #endif
96     return (cpu >> 24);
97   }
98
99   // Work around the sad fact that there are two (gnu, xsi) incompatible
100   // versions of strerror_r floating around google. Awesome.
101   bool sat_strerror(int err, char *buf, int len) {
102     buf[0] = 0;
103     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
104     int retval = reinterpret_cast<int64>(errmsg);
105     if (retval == 0)
106       return true;
107     if (retval == -1)
108       return false;
109     if (errmsg != buf) {
110       strncpy(buf, errmsg, len);
111       buf[len - 1] = 0;
112     }
113     return true;
114   }
115
116
117   inline uint64 addr_to_tag(void *address) {
118     return reinterpret_cast<uint64>(address);
119   }
120 }
121
122 #if !defined(O_DIRECT)
123 // Sometimes this isn't available.
124 // Disregard if it's not defined.
125   #define O_DIRECT            0
126 #endif
127
128 // A struct to hold captured errors, for later reporting.
129 struct ErrorRecord {
130   uint64 actual;  // This is the actual value read.
131   uint64 reread;  // This is the actual value, reread.
132   uint64 expected;  // This is what it should have been.
133   uint64 *vaddr;  // This is where it was (or wasn't).
134   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
135   uint64 paddr;  // This is the bus address, if available.
136   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
137   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
138 };
139
140 // This is a helper function to create new threads with pthreads.
141 static void *ThreadSpawnerGeneric(void *ptr) {
142   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
143   worker->StartRoutine();
144   return NULL;
145 }
146
147 void WorkerStatus::Initialize() {
148   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
149   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
150   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
151                                        num_workers_ + 1));
152 }
153
154 void WorkerStatus::Destroy() {
155   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
156   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
157   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
158 }
159
160 void WorkerStatus::PauseWorkers() {
161   if (SetStatus(PAUSE) != PAUSE)
162     WaitOnPauseBarrier();
163 }
164
165 void WorkerStatus::ResumeWorkers() {
166   if (SetStatus(RUN) == PAUSE)
167     WaitOnPauseBarrier();
168 }
169
170 void WorkerStatus::StopWorkers() {
171   if (SetStatus(STOP) == PAUSE)
172     WaitOnPauseBarrier();
173 }
174
175 bool WorkerStatus::ContinueRunning() {
176   // This loop is an optimization.  We use it to immediately re-check the status
177   // after resuming from a pause, instead of returning and waiting for the next
178   // call to this function.
179   for (;;) {
180     switch (GetStatus()) {
181       case RUN:
182         return true;
183       case PAUSE:
184         // Wait for the other workers to call this function so that
185         // PauseWorkers() can return.
186         WaitOnPauseBarrier();
187         // Wait for ResumeWorkers() to be called.
188         WaitOnPauseBarrier();
189         break;
190       case STOP:
191         return false;
192     }
193   }
194 }
195
196 bool WorkerStatus::ContinueRunningNoPause() {
197   return (GetStatus() != STOP);
198 }
199
200 void WorkerStatus::RemoveSelf() {
201   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
202   for (;;) {
203     AcquireStatusReadLock();
204     if (status_ != PAUSE)
205       break;
206     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
207     // the other threads won't wait on pause_barrier_ forever.
208     ReleaseStatusLock();
209     // Wait for the other workers to call this function so that PauseWorkers()
210     // can return.
211     WaitOnPauseBarrier();
212     // Wait for ResumeWorkers() to be called.
213     WaitOnPauseBarrier();
214   }
215
216   // This lock would be unnecessary if we held a write lock instead of a read
217   // lock on status_rwlock_, but that would also force all threads calling
218   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
219   AcquireNumWorkersLock();
220   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
221   // in use because (status != PAUSE).
222   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
223   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
224   --num_workers_;
225   ReleaseNumWorkersLock();
226
227   // Release status_rwlock_.
228   ReleaseStatusLock();
229 }
230
231
232 // Parent thread class.
233 WorkerThread::WorkerThread() {
234   status_ = false;
235   pages_copied_ = 0;
236   errorcount_ = 0;
237   runduration_usec_ = 1;
238   priority_ = Normal;
239   worker_status_ = NULL;
240   thread_spawner_ = &ThreadSpawnerGeneric;
241   tag_mode_ = false;
242 }
243
244 WorkerThread::~WorkerThread() {}
245
246 // Constructors. Just init some default values.
247 FillThread::FillThread() {
248   num_pages_to_fill_ = 0;
249 }
250
251 // Initialize file name to empty.
252 FileThread::FileThread() {
253   filename_ = "";
254   devicename_ = "";
255   pass_ = 0;
256   page_io_ = true;
257   crc_page_ = -1;
258   local_page_ = NULL;
259 }
260
261 // If file thread used bounce buffer in memory, account for the extra
262 // copy for memory bandwidth calculation.
263 float FileThread::GetMemoryCopiedData() {
264   if (!os_->normal_mem())
265     return GetCopiedData();
266   else
267     return 0;
268 }
269
270 // Initialize target hostname to be invalid.
271 NetworkThread::NetworkThread() {
272   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
273   sock_ = 0;
274 }
275
276 // Initialize?
277 NetworkSlaveThread::NetworkSlaveThread() {
278 }
279
280 // Initialize?
281 NetworkListenThread::NetworkListenThread() {
282 }
283
284 // Init member variables.
285 void WorkerThread::InitThread(int thread_num_init,
286                               class Sat *sat_init,
287                               class OsLayer *os_init,
288                               class PatternList *patternlist_init,
289                               WorkerStatus *worker_status) {
290   sat_assert(worker_status);
291   worker_status->AddWorkers(1);
292
293   thread_num_ = thread_num_init;
294   sat_ = sat_init;
295   os_ = os_init;
296   patternlist_ = patternlist_init;
297   worker_status_ = worker_status;
298
299   AvailableCpus(&cpu_mask_);
300   tag_ = 0xffffffff;
301
302   tag_mode_ = sat_->tag_mode();
303 }
304
305
306 // Use pthreads to prioritize a system thread.
307 bool WorkerThread::InitPriority() {
308   // This doesn't affect performance that much, and may not be too safe.
309
310   bool ret = BindToCpus(&cpu_mask_);
311   if (!ret)
312     logprintf(11, "Log: Bind to %s failed.\n",
313               cpuset_format(&cpu_mask_).c_str());
314
315   logprintf(11, "Log: Thread %d running on apic ID %d mask %s (%s).\n",
316             thread_num_, apicid(),
317             CurrentCpusFormat().c_str(),
318             cpuset_format(&cpu_mask_).c_str());
319 #if 0
320   if (priority_ == High) {
321     sched_param param;
322     param.sched_priority = 1;
323     // Set the priority; others are unchanged.
324     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
325               param.sched_priority);
326     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
327       char buf[256];
328       sat_strerror(errno, buf, sizeof(buf));
329       logprintf(0, "Process Error: sched_setscheduler "
330                    "failed - error %d %s\n",
331                 errno, buf);
332     }
333   }
334 #endif
335   return true;
336 }
337
338 // Use pthreads to create a system thread.
339 int WorkerThread::SpawnThread() {
340   // Create the new thread.
341   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
342   if (result) {
343     char buf[256];
344     sat_strerror(result, buf, sizeof(buf));
345     logprintf(0, "Process Error: pthread_create "
346                   "failed - error %d %s\n", result,
347               buf);
348     status_ = false;
349     return false;
350   }
351
352   // 0 is pthreads success.
353   return true;
354 }
355
356 // Kill the worker thread with SIGINT.
357 bool WorkerThread::KillThread() {
358   return (pthread_kill(thread_, SIGINT) == 0);
359 }
360
361 // Block until thread has exited.
362 bool WorkerThread::JoinThread() {
363   int result = pthread_join(thread_, NULL);
364
365   if (result) {
366     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
367     status_ = false;
368   }
369
370   // 0 is pthreads success.
371   return (!result);
372 }
373
374
375 void WorkerThread::StartRoutine() {
376   InitPriority();
377   StartThreadTimer();
378   Work();
379   StopThreadTimer();
380   worker_status_->RemoveSelf();
381 }
382
383
384 // Thread work loop. Execute until marked finished.
385 bool WorkerThread::Work() {
386   do {
387     logprintf(9, "Log: ...\n");
388     // Sleep for 1 second.
389     sat_sleep(1);
390   } while (IsReadyToRun());
391
392   return false;
393 }
394
395
396 // Returns CPU mask of CPUs available to this process,
397 // Conceptually, each bit represents a logical CPU, ie:
398 //   mask = 3  (11b):   cpu0, 1
399 //   mask = 13 (1101b): cpu0, 2, 3
400 bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
401   CPU_ZERO(cpuset);
402   return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
403 }
404
405
406 // Returns CPU mask of CPUs this thread is bound to,
407 // Conceptually, each bit represents a logical CPU, ie:
408 //   mask = 3  (11b):   cpu0, 1
409 //   mask = 13 (1101b): cpu0, 2, 3
410 bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
411   CPU_ZERO(cpuset);
412   return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
413 }
414
415
416 // Bind worker thread to specified CPU(s)
417 //   Args:
418 //     thread_mask: cpu_set_t representing CPUs, ie
419 //                  mask = 1  (01b):   cpu0
420 //                  mask = 3  (11b):   cpu0, 1
421 //                  mask = 13 (1101b): cpu0, 2, 3
422 //
423 //   Returns true on success, false otherwise.
424 bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
425   cpu_set_t process_mask;
426   AvailableCpus(&process_mask);
427   if (cpuset_isequal(thread_mask, &process_mask))
428     return true;
429
430   logprintf(11, "Log: available CPU mask - %s\n",
431             cpuset_format(&process_mask).c_str());
432   if (!cpuset_issubset(thread_mask, &process_mask)) {
433     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
434     logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
435               cpuset_format(thread_mask).c_str(),
436               cpuset_format(&process_mask).c_str());
437     return false;
438   }
439   return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
440 }
441
442
443 // A worker thread can yield itself to give up CPU until it's scheduled again.
444 //   Returns true on success, false on error.
445 bool WorkerThread::YieldSelf() {
446   return (sched_yield() == 0);
447 }
448
449
450 // Fill this page with its pattern.
451 bool WorkerThread::FillPage(struct page_entry *pe) {
452   // Error check arguments.
453   if (pe == 0) {
454     logprintf(0, "Process Error: Fill Page entry null\n");
455     return 0;
456   }
457
458   // Mask is the bitmask of indexes used by the pattern.
459   // It is the pattern size -1. Size is always a power of 2.
460   uint64 *memwords = static_cast<uint64*>(pe->addr);
461   int length = sat_->page_length();
462
463   if (tag_mode_) {
464     // Select tag or data as appropriate.
465     for (int i = 0; i < length / wordsize_; i++) {
466       datacast_t data;
467
468       if ((i & 0x7) == 0) {
469         data.l64 = addr_to_tag(&memwords[i]);
470       } else {
471         data.l32.l = pe->pattern->pattern(i << 1);
472         data.l32.h = pe->pattern->pattern((i << 1) + 1);
473       }
474       memwords[i] = data.l64;
475     }
476   } else {
477     // Just fill in untagged data directly.
478     for (int i = 0; i < length / wordsize_; i++) {
479       datacast_t data;
480
481       data.l32.l = pe->pattern->pattern(i << 1);
482       data.l32.h = pe->pattern->pattern((i << 1) + 1);
483       memwords[i] = data.l64;
484     }
485   }
486
487   return 1;
488 }
489
490
491 // Tell the thread how many pages to fill.
492 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
493   num_pages_to_fill_ = num_pages_to_fill_init;
494 }
495
496 // Fill this page with a random pattern.
497 bool FillThread::FillPageRandom(struct page_entry *pe) {
498   // Error check arguments.
499   if (pe == 0) {
500     logprintf(0, "Process Error: Fill Page entry null\n");
501     return 0;
502   }
503   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
504     logprintf(0, "Process Error: No data patterns available\n");
505     return 0;
506   }
507
508   // Choose a random pattern for this block.
509   pe->pattern = patternlist_->GetRandomPattern();
510   if (pe->pattern == 0) {
511     logprintf(0, "Process Error: Null data pattern\n");
512     return 0;
513   }
514
515   // Actually fill the page.
516   return FillPage(pe);
517 }
518
519
520 // Memory fill work loop. Execute until alloted pages filled.
521 bool FillThread::Work() {
522   bool result = true;
523
524   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
525
526   // We want to fill num_pages_to_fill pages, and
527   // stop when we've filled that many.
528   // We also want to capture early break
529   struct page_entry pe;
530   int64 loops = 0;
531   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
532     result = result && sat_->GetEmpty(&pe);
533     if (!result) {
534       logprintf(0, "Process Error: fill_thread failed to pop pages, "
535                 "bailing\n");
536       break;
537     }
538
539     // Fill the page with pattern
540     result = result && FillPageRandom(&pe);
541     if (!result) break;
542
543     // Put the page back on the queue.
544     result = result && sat_->PutValid(&pe);
545     if (!result) {
546       logprintf(0, "Process Error: fill_thread failed to push pages, "
547                 "bailing\n");
548       break;
549     }
550     loops++;
551   }
552
553   // Fill in thread status.
554   pages_copied_ = loops;
555   status_ = result;
556   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
557             thread_num_, status_, pages_copied_);
558   return result;
559 }
560
561
562 // Print error information about a data miscompare.
563 void WorkerThread::ProcessError(struct ErrorRecord *error,
564                                 int priority,
565                                 const char *message) {
566   char dimm_string[256] = "";
567
568   int apic_id = apicid();
569
570   // Determine if this is a write or read error.
571   os_->Flush(error->vaddr);
572   error->reread = *(error->vaddr);
573
574   char *good = reinterpret_cast<char*>(&(error->expected));
575   char *bad = reinterpret_cast<char*>(&(error->actual));
576
577   sat_assert(error->expected != error->actual);
578   unsigned int offset = 0;
579   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
580     if (good[offset] != bad[offset])
581       break;
582   }
583
584   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
585
586   // Find physical address if possible.
587   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
588
589   // Pretty print DIMM mapping if available.
590   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
591
592   // Report parseable error.
593   if (priority < 5) {
594     // Run miscompare error through diagnoser for logging and reporting.
595     os_->error_diagnoser_->AddMiscompareError(dimm_string,
596                                               reinterpret_cast<uint64>
597                                               (error->vaddr), 1);
598
599     logprintf(priority,
600               "%s: miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
601               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
602               message,
603               apic_id,
604               CurrentCpusFormat().c_str(),
605               error->vaddr,
606               error->paddr,
607               dimm_string,
608               error->actual,
609               error->reread,
610               error->expected);
611   }
612
613
614   // Overwrite incorrect data with correct data to prevent
615   // future miscompares when this data is reused.
616   *(error->vaddr) = error->expected;
617   os_->Flush(error->vaddr);
618 }
619
620
621
622 // Print error information about a data miscompare.
623 void FileThread::ProcessError(struct ErrorRecord *error,
624                               int priority,
625                               const char *message) {
626   char dimm_string[256] = "";
627
628   // Determine if this is a write or read error.
629   os_->Flush(error->vaddr);
630   error->reread = *(error->vaddr);
631
632   char *good = reinterpret_cast<char*>(&(error->expected));
633   char *bad = reinterpret_cast<char*>(&(error->actual));
634
635   sat_assert(error->expected != error->actual);
636   unsigned int offset = 0;
637   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
638     if (good[offset] != bad[offset])
639       break;
640   }
641
642   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
643
644   // Find physical address if possible.
645   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
646
647   // Pretty print DIMM mapping if available.
648   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
649
650   // If crc_page_ is valid, ie checking content read back from file,
651   // track src/dst memory addresses. Otherwise catagorize as general
652   // mememory miscompare for CRC checking everywhere else.
653   if (crc_page_ != -1) {
654     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
655                                 static_cast<char*>(page_recs_[crc_page_].dst);
656     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
657                                                  crc_page_,
658                                                  miscompare_byteoffset,
659                                                  page_recs_[crc_page_].src,
660                                                  page_recs_[crc_page_].dst);
661   } else {
662     os_->error_diagnoser_->AddMiscompareError(dimm_string,
663                                               reinterpret_cast<uint64>
664                                               (error->vaddr), 1);
665   }
666
667   logprintf(priority,
668             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
669             "reread:0x%016llx expected:0x%016llx\n",
670             message,
671             devicename_.c_str(),
672             error->vaddr,
673             error->paddr,
674             dimm_string,
675             error->actual,
676             error->reread,
677             error->expected);
678
679   // Overwrite incorrect data with correct data to prevent
680   // future miscompares when this data is reused.
681   *(error->vaddr) = error->expected;
682   os_->Flush(error->vaddr);
683 }
684
685
686 // Do a word by word result check of a region.
687 // Print errors on mismatches.
688 int WorkerThread::CheckRegion(void *addr,
689                               class Pattern *pattern,
690                               int64 length,
691                               int offset,
692                               int64 pattern_offset) {
693   uint64 *memblock = static_cast<uint64*>(addr);
694   const int kErrorLimit = 128;
695   int errors = 0;
696   int overflowerrors = 0;  // Count of overflowed errors.
697   bool page_error = false;
698   string errormessage("Hardware Error");
699   struct ErrorRecord
700     recorded[kErrorLimit];  // Queued errors for later printing.
701
702   // For each word in the data region.
703   for (int i = 0; i < length / wordsize_; i++) {
704     uint64 actual = memblock[i];
705     uint64 expected;
706
707     // Determine the value that should be there.
708     datacast_t data;
709     int index = 2 * i + pattern_offset;
710     data.l32.l = pattern->pattern(index);
711     data.l32.h = pattern->pattern(index + 1);
712     expected = data.l64;
713     // Check tags if necessary.
714     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
715       expected = addr_to_tag(&memblock[i]);
716     }
717
718
719     // If the value is incorrect, save an error record for later printing.
720     if (actual != expected) {
721       if (errors < kErrorLimit) {
722         recorded[errors].actual = actual;
723         recorded[errors].expected = expected;
724         recorded[errors].vaddr = &memblock[i];
725         errors++;
726       } else {
727         page_error = true;
728         // If we have overflowed the error queue, just print the errors now.
729         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
730         errormessage = "Page Error";
731         break;
732       }
733     }
734   }
735
736   // Find if this is a whole block corruption.
737   if (page_error && !tag_mode_) {
738     int patsize = patternlist_->Size();
739     for (int pat = 0; pat < patsize; pat++) {
740       class Pattern *altpattern = patternlist_->GetPattern(pat);
741       const int kGood = 0;
742       const int kBad = 1;
743       const int kGoodAgain = 2;
744       const int kNoMatch = 3;
745       int state = kGood;
746       unsigned int badstart = 0;
747       unsigned int badend = 0;
748
749       // Don't match against ourself!
750       if (pattern == altpattern)
751         continue;
752
753       for (int i = 0; i < length / wordsize_; i++) {
754         uint64 actual = memblock[i];
755         datacast_t expected;
756         datacast_t possible;
757
758         // Determine the value that should be there.
759         int index = 2 * i + pattern_offset;
760
761         expected.l32.l = pattern->pattern(index);
762         expected.l32.h = pattern->pattern(index + 1);
763
764         possible.l32.l = pattern->pattern(index);
765         possible.l32.h = pattern->pattern(index + 1);
766
767         if (state == kGood) {
768           if (actual == expected.l64) {
769             continue;
770           } else if (actual == possible.l64) {
771             badstart = i;
772             badend = i;
773             state = kBad;
774             continue;
775           } else {
776             state = kNoMatch;
777             break;
778           }
779         } else if (state == kBad) {
780           if (actual == possible.l64) {
781             badend = i;
782             continue;
783           } else if (actual == expected.l64) {
784             state = kGoodAgain;
785             continue;
786           } else {
787             state = kNoMatch;
788             break;
789           }
790         } else if (state == kGoodAgain) {
791           if (actual == expected.l64) {
792             continue;
793           } else {
794             state = kNoMatch;
795             break;
796           }
797         }
798       }
799
800       if ((state == kGoodAgain) || (state == kBad)) {
801         unsigned int blockerrors = badend - badstart + 1;
802         errormessage = "Block Error";
803         ProcessError(&recorded[0], 0, errormessage.c_str());
804         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
805                   "%d bytes from offset 0x%x to 0x%x\n",
806                   &memblock[badstart],
807                   altpattern->name(), pattern->name(),
808                   blockerrors * wordsize_,
809                   offset + badstart * wordsize_,
810                   offset + badend * wordsize_);
811         errorcount_ += blockerrors;
812         return blockerrors;
813       }
814     }
815   }
816
817
818   // Process error queue after all errors have been recorded.
819   for (int err = 0; err < errors; err++) {
820     int priority = 5;
821     if (errorcount_ + err < 30)
822       priority = 0;  // Bump up the priority for the first few errors.
823     ProcessError(&recorded[err], priority, errormessage.c_str());
824   }
825
826   if (page_error) {
827     // For each word in the data region.
828     int error_recount = 0;
829     for (int i = 0; i < length / wordsize_; i++) {
830       uint64 actual = memblock[i];
831       uint64 expected;
832       datacast_t data;
833       // Determine the value that should be there.
834       int index = 2 * i + pattern_offset;
835
836       data.l32.l = pattern->pattern(index);
837       data.l32.h = pattern->pattern(index + 1);
838       expected = data.l64;
839
840       // Check tags if necessary.
841       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
842         expected = addr_to_tag(&memblock[i]);
843       }
844
845       // If the value is incorrect, save an error record for later printing.
846       if (actual != expected) {
847         if (error_recount < kErrorLimit) {
848           // We already reported these.
849           error_recount++;
850         } else {
851           // If we have overflowed the error queue, print the errors now.
852           struct ErrorRecord er;
853           er.actual = actual;
854           er.expected = expected;
855           er.vaddr = &memblock[i];
856
857           // Do the error printout. This will take a long time and
858           // likely change the machine state.
859           ProcessError(&er, 12, errormessage.c_str());
860           overflowerrors++;
861         }
862       }
863     }
864   }
865
866   // Keep track of observed errors.
867   errorcount_ += errors + overflowerrors;
868   return errors + overflowerrors;
869 }
870
871 float WorkerThread::GetCopiedData() {
872   return pages_copied_ * sat_->page_length() / kMegabyte;
873 }
874
875 // Calculate the CRC of a region.
876 // Result check if the CRC mismatches.
877 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
878   const int blocksize = 4096;
879   const int blockwords = blocksize / wordsize_;
880   int errors = 0;
881
882   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
883   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
884   int blocks = sat_->page_length() / blocksize;
885   for (int currentblock = 0; currentblock < blocks; currentblock++) {
886     uint64 *memslice = memblock + currentblock * blockwords;
887
888     AdlerChecksum crc;
889     if (tag_mode_) {
890       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
891     } else {
892       CalculateAdlerChecksum(memslice, blocksize, &crc);
893     }
894
895     // If the CRC does not match, we'd better look closer.
896     if (!crc.Equals(*expectedcrc)) {
897       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
898                 "CRC mismatch %s != %s\n",
899                 crc.ToHexString().c_str(),
900                 expectedcrc->ToHexString().c_str());
901       int errorcount = CheckRegion(memslice,
902                                    srcpe->pattern,
903                                    blocksize,
904                                    currentblock * blocksize, 0);
905       if (errorcount == 0) {
906         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
907                      "but no miscompares found.\n",
908                   crc.ToHexString().c_str(),
909                   expectedcrc->ToHexString().c_str());
910       }
911       errors += errorcount;
912     }
913   }
914
915   // For odd length transfers, we should never hit this.
916   int leftovers = sat_->page_length() % blocksize;
917   if (leftovers) {
918     uint64 *memslice = memblock + blocks * blockwords;
919     errors += CheckRegion(memslice,
920                           srcpe->pattern,
921                           leftovers,
922                           blocks * blocksize, 0);
923   }
924   return errors;
925 }
926
927
928 // Print error information about a data miscompare.
929 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
930                                    int priority,
931                                    const char *message) {
932   char dimm_string[256] = "";
933   char tag_dimm_string[256] = "";
934   bool read_error = false;
935
936   int apic_id = apicid();
937
938   // Determine if this is a write or read error.
939   os_->Flush(error->vaddr);
940   error->reread = *(error->vaddr);
941
942   // Distinguish read and write errors.
943   if (error->actual != error->reread) {
944     read_error = true;
945   }
946
947   sat_assert(error->expected != error->actual);
948
949   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
950
951   // Find physical address if possible.
952   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
953   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
954
955   // Pretty print DIMM mapping if available.
956   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
957   // Pretty print DIMM mapping if available.
958   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
959
960   // Report parseable error.
961   if (priority < 5) {
962     logprintf(priority,
963               "%s: Tag from %p(0x%llx:%s) (%s) "
964               "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
965               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
966               message,
967               error->tagvaddr, error->tagpaddr,
968               tag_dimm_string,
969               read_error ? "read error" : "write error",
970               apic_id,
971               CurrentCpusFormat().c_str(),
972               error->vaddr,
973               error->paddr,
974               dimm_string,
975               error->actual,
976               error->reread,
977               error->expected);
978   }
979
980   errorcount_ += 1;
981
982   // Overwrite incorrect data with correct data to prevent
983   // future miscompares when this data is reused.
984   *(error->vaddr) = error->expected;
985   os_->Flush(error->vaddr);
986 }
987
988
989 // Print out and log a tag error.
990 bool WorkerThread::ReportTagError(
991     uint64 *mem64,
992     uint64 actual,
993     uint64 tag) {
994   struct ErrorRecord er;
995   er.actual = actual;
996
997   er.expected = tag;
998   er.vaddr = mem64;
999
1000   // Generate vaddr from tag.
1001   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1002
1003   ProcessTagError(&er, 0, "Hardware Error");
1004   return true;
1005 }
1006
1007 // C implementation of Adler memory copy, with memory tagging.
1008 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1009                                     uint64 *srcmem64,
1010                                     unsigned int size_in_bytes,
1011                                     AdlerChecksum *checksum,
1012                                     struct page_entry *pe) {
1013   // Use this data wrapper to access memory with 64bit read/write.
1014   datacast_t data;
1015   datacast_t dstdata;
1016   unsigned int count = size_in_bytes / sizeof(data);
1017
1018   if (count > ((1U) << 19)) {
1019     // Size is too large, must be strictly less than 512 KB.
1020     return false;
1021   }
1022
1023   uint64 a1 = 1;
1024   uint64 a2 = 1;
1025   uint64 b1 = 0;
1026   uint64 b2 = 0;
1027
1028   class Pattern *pattern = pe->pattern;
1029
1030   unsigned int i = 0;
1031   while (i < count) {
1032     // Process 64 bits at a time.
1033     if ((i & 0x7) == 0) {
1034       data.l64 = srcmem64[i];
1035       dstdata.l64 = dstmem64[i];
1036       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1037       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1038       // Detect if tags have been corrupted.
1039       if (data.l64 != src_tag)
1040         ReportTagError(&srcmem64[i], data.l64, src_tag);
1041       if (dstdata.l64 != dst_tag)
1042         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1043
1044       data.l32.l = pattern->pattern(i << 1);
1045       data.l32.h = pattern->pattern((i << 1) + 1);
1046       a1 = a1 + data.l32.l;
1047       b1 = b1 + a1;
1048       a1 = a1 + data.l32.h;
1049       b1 = b1 + a1;
1050
1051       data.l64  = dst_tag;
1052       dstmem64[i] = data.l64;
1053
1054     } else {
1055       data.l64 = srcmem64[i];
1056       a1 = a1 + data.l32.l;
1057       b1 = b1 + a1;
1058       a1 = a1 + data.l32.h;
1059       b1 = b1 + a1;
1060       dstmem64[i] = data.l64;
1061     }
1062     i++;
1063
1064     data.l64 = srcmem64[i];
1065     a2 = a2 + data.l32.l;
1066     b2 = b2 + a2;
1067     a2 = a2 + data.l32.h;
1068     b2 = b2 + a2;
1069     dstmem64[i] = data.l64;
1070     i++;
1071   }
1072   checksum->Set(a1, a2, b1, b2);
1073   return true;
1074 }
1075
1076 // x86_64 SSE2 assembly implementation of Adler memory copy, with address
1077 // tagging added as a second step. This is useful for debugging failures
1078 // that only occur when SSE / nontemporal writes are used.
1079 bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1080                                        uint64 *srcmem64,
1081                                        unsigned int size_in_bytes,
1082                                        AdlerChecksum *checksum,
1083                                        struct page_entry *pe) {
1084   // Do ASM copy, ignore checksum.
1085   AdlerChecksum ignored_checksum;
1086   os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1087
1088   // Force cache flush.
1089   int length = size_in_bytes / sizeof(*dstmem64);
1090   for (int i = 0; i < length; i += sizeof(*dstmem64)) {
1091     os_->FastFlush(dstmem64 + i);
1092     os_->FastFlush(srcmem64 + i);
1093   }
1094   // Check results.
1095   AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1096   // Patch up address tags.
1097   TagAddrC(dstmem64, size_in_bytes);
1098   return true;
1099 }
1100
1101 // Retag pages..
1102 bool WorkerThread::TagAddrC(uint64 *memwords,
1103                             unsigned int size_in_bytes) {
1104   // Mask is the bitmask of indexes used by the pattern.
1105   // It is the pattern size -1. Size is always a power of 2.
1106
1107   // Select tag or data as appropriate.
1108   int length = size_in_bytes / wordsize_;
1109   for (int i = 0; i < length; i += 8) {
1110     datacast_t data;
1111     data.l64 = addr_to_tag(&memwords[i]);
1112     memwords[i] = data.l64;
1113   }
1114   return true;
1115 }
1116
1117 // C implementation of Adler memory crc.
1118 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1119                                  unsigned int size_in_bytes,
1120                                  AdlerChecksum *checksum,
1121                                  struct page_entry *pe) {
1122   // Use this data wrapper to access memory with 64bit read/write.
1123   datacast_t data;
1124   unsigned int count = size_in_bytes / sizeof(data);
1125
1126   if (count > ((1U) << 19)) {
1127     // Size is too large, must be strictly less than 512 KB.
1128     return false;
1129   }
1130
1131   uint64 a1 = 1;
1132   uint64 a2 = 1;
1133   uint64 b1 = 0;
1134   uint64 b2 = 0;
1135
1136   class Pattern *pattern = pe->pattern;
1137
1138   unsigned int i = 0;
1139   while (i < count) {
1140     // Process 64 bits at a time.
1141     if ((i & 0x7) == 0) {
1142       data.l64 = srcmem64[i];
1143       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1144       // Check that tags match expected.
1145       if (data.l64 != src_tag)
1146         ReportTagError(&srcmem64[i], data.l64, src_tag);
1147
1148       data.l32.l = pattern->pattern(i << 1);
1149       data.l32.h = pattern->pattern((i << 1) + 1);
1150       a1 = a1 + data.l32.l;
1151       b1 = b1 + a1;
1152       a1 = a1 + data.l32.h;
1153       b1 = b1 + a1;
1154     } else {
1155       data.l64 = srcmem64[i];
1156       a1 = a1 + data.l32.l;
1157       b1 = b1 + a1;
1158       a1 = a1 + data.l32.h;
1159       b1 = b1 + a1;
1160     }
1161     i++;
1162
1163     data.l64 = srcmem64[i];
1164     a2 = a2 + data.l32.l;
1165     b2 = b2 + a2;
1166     a2 = a2 + data.l32.h;
1167     b2 = b2 + a2;
1168     i++;
1169   }
1170   checksum->Set(a1, a2, b1, b2);
1171   return true;
1172 }
1173
1174 // Copy a block of memory quickly, while keeping a CRC of the data.
1175 // Result check if the CRC mismatches.
1176 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1177                               struct page_entry *srcpe) {
1178   int errors = 0;
1179   const int blocksize = 4096;
1180   const int blockwords = blocksize / wordsize_;
1181   int blocks = sat_->page_length() / blocksize;
1182
1183   // Base addresses for memory copy
1184   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1185   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1186   // Remember the expected CRC
1187   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1188
1189   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1190     uint64 *targetmem = targetmembase + currentblock * blockwords;
1191     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1192
1193     AdlerChecksum crc;
1194     if (tag_mode_) {
1195       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1196     } else {
1197       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1198     }
1199
1200     // Investigate miscompares.
1201     if (!crc.Equals(*expectedcrc)) {
1202       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1203                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1204                 expectedcrc->ToHexString().c_str());
1205       int errorcount = CheckRegion(sourcemem,
1206                                    srcpe->pattern,
1207                                    blocksize,
1208                                    currentblock * blocksize, 0);
1209       if (errorcount == 0) {
1210         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1211                      "but no miscompares found. Retrying with fresh data.\n",
1212                   crc.ToHexString().c_str(),
1213                   expectedcrc->ToHexString().c_str());
1214         if (!tag_mode_) {
1215           // Copy the data originally read from this region back again.
1216           // This data should have any corruption read originally while
1217           // calculating the CRC.
1218           memcpy(sourcemem, targetmem, blocksize);
1219           errorcount = CheckRegion(sourcemem,
1220                                    srcpe->pattern,
1221                                    blocksize,
1222                                    currentblock * blocksize, 0);
1223           if (errorcount == 0) {
1224             int apic_id = apicid();
1225             logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1226                          "CRC mismatch %s != %s, "
1227                          "but no miscompares found on second pass.\n",
1228                       apic_id, CurrentCpusFormat().c_str(),
1229                       crc.ToHexString().c_str(),
1230                       expectedcrc->ToHexString().c_str());
1231             struct ErrorRecord er;
1232             er.actual = sourcemem[0];
1233             er.expected = 0x0;
1234             er.vaddr = sourcemem;
1235             ProcessError(&er, 0, "Hardware Error");
1236           }
1237         }
1238       }
1239       errors += errorcount;
1240     }
1241   }
1242
1243   // For odd length transfers, we should never hit this.
1244   int leftovers = sat_->page_length() % blocksize;
1245   if (leftovers) {
1246     uint64 *targetmem = targetmembase + blocks * blockwords;
1247     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1248
1249     errors += CheckRegion(sourcemem,
1250                           srcpe->pattern,
1251                           leftovers,
1252                           blocks * blocksize, 0);
1253     int leftoverwords = leftovers / wordsize_;
1254     for (int i = 0; i < leftoverwords; i++) {
1255       targetmem[i] = sourcemem[i];
1256     }
1257   }
1258
1259   // Update pattern reference to reflect new contents.
1260   dstpe->pattern = srcpe->pattern;
1261
1262   // Clean clean clean the errors away.
1263   if (errors) {
1264     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1265     // cause bad data to be propogated across the page.
1266     FillPage(dstpe);
1267   }
1268   return errors;
1269 }
1270
1271
1272
1273 // Invert a block of memory quickly, traversing downwards.
1274 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1275   const int blocksize = 4096;
1276   const int blockwords = blocksize / wordsize_;
1277   int blocks = sat_->page_length() / blocksize;
1278
1279   // Base addresses for memory copy
1280   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1281
1282   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1283     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1284     for (int i = blockwords - 32; i >= 0; i -= 32) {
1285       for (int index = i + 31; index >= i; --index) {
1286         unsigned int actual = sourcemem[index];
1287         sourcemem[index] = ~actual;
1288       }
1289       OsLayer::FastFlush(&sourcemem[i]);
1290     }
1291   }
1292
1293   return 0;
1294 }
1295
1296 // Invert a block of memory, traversing upwards.
1297 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1298   const int blocksize = 4096;
1299   const int blockwords = blocksize / wordsize_;
1300   int blocks = sat_->page_length() / blocksize;
1301
1302   // Base addresses for memory copy
1303   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1304
1305   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1306     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1307     for (int i = 0; i < blockwords; i += 32) {
1308       for (int index = i; index <= i + 31; ++index) {
1309         unsigned int actual = sourcemem[index];
1310         sourcemem[index] = ~actual;
1311       }
1312       OsLayer::FastFlush(&sourcemem[i]);
1313     }
1314   }
1315   return 0;
1316 }
1317
1318 // Copy a block of memory quickly, while keeping a CRC of the data.
1319 // Result check if the CRC mismatches. Warm the CPU while running
1320 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1321                                   struct page_entry *srcpe) {
1322   int errors = 0;
1323   const int blocksize = 4096;
1324   const int blockwords = blocksize / wordsize_;
1325   int blocks = sat_->page_length() / blocksize;
1326
1327   // Base addresses for memory copy
1328   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1329   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1330   // Remember the expected CRC
1331   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1332
1333   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1334     uint64 *targetmem = targetmembase + currentblock * blockwords;
1335     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1336
1337     AdlerChecksum crc;
1338     if (tag_mode_) {
1339       AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1340     } else {
1341       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1342     }
1343
1344     // Investigate miscompares.
1345     if (!crc.Equals(*expectedcrc)) {
1346       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1347                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1348                 expectedcrc->ToHexString().c_str());
1349       int errorcount = CheckRegion(sourcemem,
1350                                    srcpe->pattern,
1351                                    blocksize,
1352                                    currentblock * blocksize, 0);
1353       if (errorcount == 0) {
1354         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1355                      "but no miscompares found. Retrying with fresh data.\n",
1356                   crc.ToHexString().c_str(),
1357                   expectedcrc->ToHexString().c_str());
1358         if (!tag_mode_) {
1359           // Copy the data originally read from this region back again.
1360           // This data should have any corruption read originally while
1361           // calculating the CRC.
1362           memcpy(sourcemem, targetmem, blocksize);
1363           errorcount = CheckRegion(sourcemem,
1364                                    srcpe->pattern,
1365                                    blocksize,
1366                                    currentblock * blocksize, 0);
1367           if (errorcount == 0) {
1368             int apic_id = apicid();
1369             logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1370                          "CRC mismatch %s != %s, "
1371                          "but no miscompares found on second pass.\n",
1372                       apic_id, CurrentCpusFormat().c_str(),
1373                       crc.ToHexString().c_str(),
1374                       expectedcrc->ToHexString().c_str());
1375             struct ErrorRecord er;
1376             er.actual = sourcemem[0];
1377             er.expected = 0x0;
1378             er.vaddr = sourcemem;
1379             ProcessError(&er, 0, "Hardware Error");
1380           }
1381         }
1382       }
1383       errors += errorcount;
1384     }
1385   }
1386
1387   // For odd length transfers, we should never hit this.
1388   int leftovers = sat_->page_length() % blocksize;
1389   if (leftovers) {
1390     uint64 *targetmem = targetmembase + blocks * blockwords;
1391     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1392
1393     errors += CheckRegion(sourcemem,
1394                           srcpe->pattern,
1395                           leftovers,
1396                           blocks * blocksize, 0);
1397     int leftoverwords = leftovers / wordsize_;
1398     for (int i = 0; i < leftoverwords; i++) {
1399       targetmem[i] = sourcemem[i];
1400     }
1401   }
1402
1403   // Update pattern reference to reflect new contents.
1404   dstpe->pattern = srcpe->pattern;
1405
1406   // Clean clean clean the errors away.
1407   if (errors) {
1408     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1409     // cause bad data to be propogated across the page.
1410     FillPage(dstpe);
1411   }
1412   return errors;
1413 }
1414
1415
1416
1417 // Memory check work loop. Execute until done, then exhaust pages.
1418 bool CheckThread::Work() {
1419   struct page_entry pe;
1420   bool result = true;
1421   int64 loops = 0;
1422
1423   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1424
1425   // We want to check all the pages, and
1426   // stop when there aren't any left.
1427   while (true) {
1428     result = result && sat_->GetValid(&pe);
1429     if (!result) {
1430       if (IsReadyToRunNoPause())
1431         logprintf(0, "Process Error: check_thread failed to pop pages, "
1432                   "bailing\n");
1433       else
1434         result = true;
1435       break;
1436     }
1437
1438     // Do the result check.
1439     CrcCheckPage(&pe);
1440
1441     // Push pages back on the valid queue if we are still going,
1442     // throw them out otherwise.
1443     if (IsReadyToRunNoPause())
1444       result = result && sat_->PutValid(&pe);
1445     else
1446       result = result && sat_->PutEmpty(&pe);
1447     if (!result) {
1448       logprintf(0, "Process Error: check_thread failed to push pages, "
1449                 "bailing\n");
1450       break;
1451     }
1452     loops++;
1453   }
1454
1455   pages_copied_ = loops;
1456   status_ = result;
1457   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1458             thread_num_, status_, pages_copied_);
1459   return result;
1460 }
1461
1462
1463 // Memory copy work loop. Execute until marked done.
1464 bool CopyThread::Work() {
1465   struct page_entry src;
1466   struct page_entry dst;
1467   bool result = true;
1468   int64 loops = 0;
1469
1470   logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1471             thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1472
1473   while (IsReadyToRun()) {
1474     // Pop the needed pages.
1475     result = result && sat_->GetValid(&src, tag_);
1476     result = result && sat_->GetEmpty(&dst, tag_);
1477     if (!result) {
1478       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1479                 "bailing\n");
1480       break;
1481     }
1482
1483     // Force errors for unittests.
1484     if (sat_->error_injection()) {
1485       if (loops == 8) {
1486         char *addr = reinterpret_cast<char*>(src.addr);
1487         int offset = random() % sat_->page_length();
1488         addr[offset] = 0xba;
1489       }
1490     }
1491
1492     // We can use memcpy, or CRC check while we copy.
1493     if (sat_->warm()) {
1494       CrcWarmCopyPage(&dst, &src);
1495     } else if (sat_->strict()) {
1496       CrcCopyPage(&dst, &src);
1497     } else {
1498       memcpy(dst.addr, src.addr, sat_->page_length());
1499       dst.pattern = src.pattern;
1500     }
1501
1502     result = result && sat_->PutValid(&dst);
1503     result = result && sat_->PutEmpty(&src);
1504
1505     // Copy worker-threads yield themselves at the end of each copy loop,
1506     // to avoid threads from preempting each other in the middle of the inner
1507     // copy-loop. Cooperations between Copy worker-threads results in less
1508     // unnecessary cache thrashing (which happens when context-switching in the
1509     // middle of the inner copy-loop).
1510     YieldSelf();
1511
1512     if (!result) {
1513       logprintf(0, "Process Error: copy_thread failed to push pages, "
1514                 "bailing\n");
1515       break;
1516     }
1517     loops++;
1518   }
1519
1520   pages_copied_ = loops;
1521   status_ = result;
1522   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1523             thread_num_, status_, pages_copied_);
1524   return result;
1525 }
1526
1527 // Memory invert work loop. Execute until marked done.
1528 bool InvertThread::Work() {
1529   struct page_entry src;
1530   bool result = true;
1531   int64 loops = 0;
1532
1533   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1534
1535   while (IsReadyToRun()) {
1536     // Pop the needed pages.
1537     result = result && sat_->GetValid(&src);
1538     if (!result) {
1539       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1540                 "bailing\n");
1541       break;
1542     }
1543
1544     if (sat_->strict())
1545       CrcCheckPage(&src);
1546
1547     // For the same reason CopyThread yields itself (see YieldSelf comment
1548     // in CopyThread::Work(), InvertThread yields itself after each invert
1549     // operation to improve cooperation between different worker threads
1550     // stressing the memory/cache.
1551     InvertPageUp(&src);
1552     YieldSelf();
1553     InvertPageDown(&src);
1554     YieldSelf();
1555     InvertPageDown(&src);
1556     YieldSelf();
1557     InvertPageUp(&src);
1558     YieldSelf();
1559
1560     if (sat_->strict())
1561       CrcCheckPage(&src);
1562
1563     result = result && sat_->PutValid(&src);
1564     if (!result) {
1565       logprintf(0, "Process Error: invert_thread failed to push pages, "
1566                 "bailing\n");
1567       break;
1568     }
1569     loops++;
1570   }
1571
1572   pages_copied_ = loops * 2;
1573   status_ = result;
1574   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1575             thread_num_, status_, pages_copied_);
1576   return result;
1577 }
1578
1579
1580 // Set file name to use for File IO.
1581 void FileThread::SetFile(const char *filename_init) {
1582   filename_ = filename_init;
1583   devicename_ = os_->FindFileDevice(filename_);
1584 }
1585
1586 // Open the file for access.
1587 bool FileThread::OpenFile(int *pfile) {
1588   int fd = open(filename_.c_str(),
1589                 O_RDWR | O_CREAT | O_SYNC | O_DIRECT,
1590                 0644);
1591   if (fd < 0) {
1592     logprintf(0, "Process Error: Failed to create file %s!!\n",
1593               filename_.c_str());
1594     pages_copied_ = 0;
1595     return false;
1596   }
1597   *pfile = fd;
1598   return true;
1599 }
1600
1601 // Close the file.
1602 bool FileThread::CloseFile(int fd) {
1603   close(fd);
1604   return true;
1605 }
1606
1607 // Check sector tagging.
1608 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1609   int page_length = sat_->page_length();
1610   struct FileThread::SectorTag *tag =
1611     (struct FileThread::SectorTag *)(src->addr);
1612
1613   // Tag each sector.
1614   unsigned char magic = ((0xba + thread_num_) & 0xff);
1615   for (int sec = 0; sec < page_length / 512; sec++) {
1616     tag[sec].magic = magic;
1617     tag[sec].block = block & 0xff;
1618     tag[sec].sector = sec & 0xff;
1619     tag[sec].pass = pass_ & 0xff;
1620   }
1621   return true;
1622 }
1623
1624 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1625   int page_length = sat_->page_length();
1626   // Fill the file with our data.
1627   int64 size = write(fd, src->addr, page_length);
1628
1629   if (size != page_length) {
1630     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1631     errorcount_++;
1632     logprintf(0, "Block Error: file_thread failed to write, "
1633               "bailing\n");
1634     return false;
1635   }
1636   return true;
1637 }
1638
1639 // Write the data to the file.
1640 bool FileThread::WritePages(int fd) {
1641   int strict = sat_->strict();
1642
1643   // Start fresh at beginning of file for each batch of pages.
1644   lseek64(fd, 0, SEEK_SET);
1645   for (int i = 0; i < sat_->disk_pages(); i++) {
1646     struct page_entry src;
1647     if (!GetValidPage(&src))
1648       return false;
1649     // Save expected pattern.
1650     page_recs_[i].pattern = src.pattern;
1651     page_recs_[i].src = src.addr;
1652
1653     // Check data correctness.
1654     if (strict)
1655       CrcCheckPage(&src);
1656
1657     SectorTagPage(&src, i);
1658
1659     bool result = WritePageToFile(fd, &src);
1660
1661     if (!PutEmptyPage(&src))
1662       return false;
1663
1664     if (!result)
1665       return false;
1666   }
1667   return true;
1668 }
1669
1670 // Copy data from file into memory block.
1671 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1672   int page_length = sat_->page_length();
1673
1674   // Do the actual read.
1675   int64 size = read(fd, dst->addr, page_length);
1676   if (size != page_length) {
1677     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1678     logprintf(0, "Block Error: file_thread failed to read, "
1679               "bailing\n");
1680     errorcount_++;
1681     return false;
1682   }
1683   return true;
1684 }
1685
1686 // Check sector tagging.
1687 bool FileThread::SectorValidatePage(const struct PageRec &page,
1688                                     struct page_entry *dst, int block) {
1689   // Error injection.
1690   static int calls = 0;
1691   calls++;
1692
1693   // Do sector tag compare.
1694   int firstsector = -1;
1695   int lastsector = -1;
1696   bool badsector = false;
1697   int page_length = sat_->page_length();
1698
1699   // Cast data block into an array of tagged sectors.
1700   struct FileThread::SectorTag *tag =
1701   (struct FileThread::SectorTag *)(dst->addr);
1702
1703   sat_assert(sizeof(*tag) == 512);
1704
1705   // Error injection.
1706   if (sat_->error_injection()) {
1707     if (calls == 2) {
1708       for (int badsec = 8; badsec < 17; badsec++)
1709         tag[badsec].pass = 27;
1710     }
1711     if (calls == 18) {
1712       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1713     }
1714   }
1715
1716   // Check each sector for the correct tag we added earlier,
1717   // then revert the tag to the to normal data pattern.
1718   unsigned char magic = ((0xba + thread_num_) & 0xff);
1719   for (int sec = 0; sec < page_length / 512; sec++) {
1720     // Check magic tag.
1721     if ((tag[sec].magic != magic) ||
1722         (tag[sec].block != (block & 0xff)) ||
1723         (tag[sec].sector != (sec & 0xff)) ||
1724         (tag[sec].pass != (pass_ & 0xff))) {
1725       // Offset calculation for tag location.
1726       int offset = sec * sizeof(SectorTag);
1727       if (tag[sec].block != (block & 0xff))
1728         offset += 1 * sizeof(uint8);
1729       else if (tag[sec].sector != (sec & 0xff))
1730         offset += 2 * sizeof(uint8);
1731       else if (tag[sec].pass != (pass_ & 0xff))
1732         offset += 3 * sizeof(uint8);
1733
1734       // Run sector tag error through diagnoser for logging and reporting.
1735       errorcount_ += 1;
1736       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1737                                                   offset,
1738                                                   tag[sec].sector,
1739                                                   page.src, page.dst);
1740
1741       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1742                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1743                 block * page_length + 512 * sec,
1744                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1745                 sec, (unsigned int)tag[sec].sector,
1746                 block, (unsigned int)tag[sec].block,
1747                 magic, (unsigned int)tag[sec].magic,
1748                 filename_.c_str());
1749
1750       // Keep track of first and last bad sector.
1751       if (firstsector == -1)
1752         firstsector = (block * page_length / 512) + sec;
1753       lastsector = (block * page_length / 512) + sec;
1754       badsector = true;
1755     }
1756     // Patch tag back to proper pattern.
1757     unsigned int *addr = (unsigned int *)(&tag[sec]);
1758     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1759   }
1760
1761   // If we found sector errors:
1762   if (badsector == true) {
1763     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1764               firstsector * 512,
1765               ((lastsector + 1) * 512) - 1,
1766               filename_.c_str());
1767
1768     // Either exit immediately, or patch the data up and continue.
1769     if (sat_->stop_on_error()) {
1770       exit(1);
1771     } else {
1772       // Patch up bad pages.
1773       for (int block = (firstsector * 512) / page_length;
1774           block <= (lastsector * 512) / page_length;
1775           block++) {
1776         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1777         int length = page_length / wordsize_;
1778         for (int i = 0; i < length; i++) {
1779           memblock[i] = dst->pattern->pattern(i);
1780         }
1781       }
1782     }
1783   }
1784   return true;
1785 }
1786
1787 // Get memory for an incoming data transfer..
1788 bool FileThread::PagePrepare() {
1789   // We can only do direct IO to SAT pages if it is normal mem.
1790   page_io_ = os_->normal_mem();
1791
1792   // Init a local buffer if we need it.
1793   if (!page_io_) {
1794     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1795     if (result) {
1796       logprintf(0, "Process Error: disk thread posix_memalign "
1797                    "returned %d (fail)\n",
1798                 result);
1799       status_ = false;
1800       return false;
1801     }
1802   }
1803   return true;
1804 }
1805
1806
1807 // Remove memory allocated for data transfer.
1808 bool FileThread::PageTeardown() {
1809   // Free a local buffer if we need to.
1810   if (!page_io_) {
1811     free(local_page_);
1812   }
1813   return true;
1814 }
1815
1816
1817
1818 // Get memory for an incoming data transfer..
1819 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1820   if (page_io_) {
1821     if (!sat_->GetEmpty(dst))
1822       return false;
1823   } else {
1824     dst->addr = local_page_;
1825     dst->offset = 0;
1826     dst->pattern = 0;
1827   }
1828   return true;
1829 }
1830
1831 // Get memory for an outgoing data transfer..
1832 bool FileThread::GetValidPage(struct page_entry *src) {
1833   struct page_entry tmp;
1834   if (!sat_->GetValid(&tmp))
1835     return false;
1836   if (page_io_) {
1837     *src = tmp;
1838     return true;
1839   } else {
1840     src->addr = local_page_;
1841     src->offset = 0;
1842     CrcCopyPage(src, &tmp);
1843     if (!sat_->PutValid(&tmp))
1844       return false;
1845   }
1846   return true;
1847 }
1848
1849
1850 // Throw out a used empty page.
1851 bool FileThread::PutEmptyPage(struct page_entry *src) {
1852   if (page_io_) {
1853     if (!sat_->PutEmpty(src))
1854       return false;
1855   }
1856   return true;
1857 }
1858
1859 // Throw out a used, filled page.
1860 bool FileThread::PutValidPage(struct page_entry *src) {
1861   if (page_io_) {
1862     if (!sat_->PutValid(src))
1863       return false;
1864   }
1865   return true;
1866 }
1867
1868 // Copy data from file into memory blocks.
1869 bool FileThread::ReadPages(int fd) {
1870   int page_length = sat_->page_length();
1871   int strict = sat_->strict();
1872   bool result = true;
1873
1874   // Read our data back out of the file, into it's new location.
1875   lseek64(fd, 0, SEEK_SET);
1876   for (int i = 0; i < sat_->disk_pages(); i++) {
1877     struct page_entry dst;
1878     if (!GetEmptyPage(&dst))
1879       return false;
1880     // Retrieve expected pattern.
1881     dst.pattern = page_recs_[i].pattern;
1882     // Update page recordpage record.
1883     page_recs_[i].dst = dst.addr;
1884
1885     // Read from the file into destination page.
1886     if (!ReadPageFromFile(fd, &dst)) {
1887         PutEmptyPage(&dst);
1888         return false;
1889     }
1890
1891     SectorValidatePage(page_recs_[i], &dst, i);
1892
1893     // Ensure that the transfer ended up with correct data.
1894     if (strict) {
1895       // Record page index currently CRC checked.
1896       crc_page_ = i;
1897       int errors = CrcCheckPage(&dst);
1898       if (errors) {
1899         logprintf(5, "Log: file miscompare at block %d, "
1900                   "offset %x-%x. File: %s\n",
1901                   i, i * page_length, ((i + 1) * page_length) - 1,
1902                   filename_.c_str());
1903         result = false;
1904       }
1905       crc_page_ = -1;
1906       errorcount_ += errors;
1907     }
1908     if (!PutValidPage(&dst))
1909       return false;
1910   }
1911   return result;
1912 }
1913
1914 // File IO work loop. Execute until marked done.
1915 bool FileThread::Work() {
1916   bool result = true;
1917   int64 loops = 0;
1918
1919   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1920             thread_num_,
1921             filename_.c_str(),
1922             devicename_.c_str());
1923
1924   if (!PagePrepare()) {
1925     status_ = false;
1926     return false;
1927   }
1928
1929   // Open the data IO file.
1930   int fd = 0;
1931   if (!OpenFile(&fd)) {
1932     status_ = false;
1933     return false;
1934   }
1935
1936   pass_ = 0;
1937
1938   // Load patterns into page records.
1939   page_recs_ = new struct PageRec[sat_->disk_pages()];
1940   for (int i = 0; i < sat_->disk_pages(); i++) {
1941     page_recs_[i].pattern = new struct Pattern();
1942   }
1943
1944   // Loop until done.
1945   while (IsReadyToRun()) {
1946     // Do the file write.
1947     if (!(result = result && WritePages(fd)))
1948       break;
1949
1950     // Do the file read.
1951     if (!(result = result && ReadPages(fd)))
1952       break;
1953
1954     loops++;
1955     pass_ = loops;
1956   }
1957
1958   pages_copied_ = loops * sat_->disk_pages();
1959
1960   // Clean up.
1961   CloseFile(fd);
1962   PageTeardown();
1963
1964   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1965             thread_num_, status_, pages_copied_);
1966   // Failure to read from device indicates hardware,
1967   // rather than procedural SW error.
1968   status_ = true;
1969   return true;
1970 }
1971
1972 bool NetworkThread::IsNetworkStopSet() {
1973   return !IsReadyToRunNoPause();
1974 }
1975
1976 bool NetworkSlaveThread::IsNetworkStopSet() {
1977   // This thread has no completion status.
1978   // It finishes whever there is no more data to be
1979   // passed back.
1980   return true;
1981 }
1982
1983 // Set ip name to use for Network IO.
1984 void NetworkThread::SetIP(const char *ipaddr_init) {
1985   strncpy(ipaddr_, ipaddr_init, 256);
1986 }
1987
1988 // Create a socket.
1989 // Return 0 on error.
1990 bool NetworkThread::CreateSocket(int *psocket) {
1991   int sock = socket(AF_INET, SOCK_STREAM, 0);
1992   if (sock == -1) {
1993     logprintf(0, "Process Error: Cannot open socket\n");
1994     pages_copied_ = 0;
1995     status_ = false;
1996     return false;
1997   }
1998   *psocket = sock;
1999   return true;
2000 }
2001
2002 // Close the socket.
2003 bool NetworkThread::CloseSocket(int sock) {
2004   close(sock);
2005   return true;
2006 }
2007
2008 // Initiate the tcp connection.
2009 bool NetworkThread::Connect(int sock) {
2010   struct sockaddr_in dest_addr;
2011   dest_addr.sin_family = AF_INET;
2012   dest_addr.sin_port = htons(kNetworkPort);
2013   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2014
2015   // Translate dot notation to u32.
2016   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2017     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2018     pages_copied_ = 0;
2019     status_ = false;
2020     return false;
2021   }
2022
2023   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2024                     sizeof(struct sockaddr))) {
2025     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2026     pages_copied_ = 0;
2027     status_ = false;
2028     return false;
2029   }
2030   return true;
2031 }
2032
2033 // Initiate the tcp connection.
2034 bool NetworkListenThread::Listen() {
2035   struct sockaddr_in sa;
2036
2037   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2038
2039   sa.sin_family = AF_INET;
2040   sa.sin_addr.s_addr = INADDR_ANY;
2041   sa.sin_port = htons(kNetworkPort);
2042
2043   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2044     char buf[256];
2045     sat_strerror(errno, buf, sizeof(buf));
2046     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2047     pages_copied_ = 0;
2048     status_ = false;
2049     return false;
2050   }
2051   listen(sock_, 3);
2052   return true;
2053 }
2054
2055 // Wait for a connection from a network traffic generation thread.
2056 bool NetworkListenThread::Wait() {
2057     fd_set rfds;
2058     struct timeval tv;
2059     int retval;
2060
2061     // Watch sock_ to see when it has input.
2062     FD_ZERO(&rfds);
2063     FD_SET(sock_, &rfds);
2064     // Wait up to five seconds.
2065     tv.tv_sec = 5;
2066     tv.tv_usec = 0;
2067
2068     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2069
2070     return (retval > 0);
2071 }
2072
2073 // Wait for a connection from a network traffic generation thread.
2074 bool NetworkListenThread::GetConnection(int *pnewsock) {
2075   struct sockaddr_in sa;
2076   socklen_t size = sizeof(struct sockaddr_in);
2077
2078   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2079   if (newsock < 0)  {
2080     logprintf(0, "Process Error: Did not receive connection\n");
2081     pages_copied_ = 0;
2082     status_ = false;
2083     return false;
2084   }
2085   *pnewsock = newsock;
2086   return true;
2087 }
2088
2089 // Send a page, return false if a page was not sent.
2090 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2091   int page_length = sat_->page_length();
2092   char *address = static_cast<char*>(src->addr);
2093
2094   // Send our data over the network.
2095   int size = page_length;
2096   while (size) {
2097     int transferred = send(sock, address + (page_length - size), size, 0);
2098     if ((transferred == 0) || (transferred == -1)) {
2099       if (!IsNetworkStopSet()) {
2100         char buf[256] = "";
2101         sat_strerror(errno, buf, sizeof(buf));
2102         logprintf(0, "Process Error: Thread %d, "
2103                      "Network write failed, bailing. (%s)\n",
2104                   thread_num_, buf);
2105         status_ = false;
2106       }
2107       return false;
2108     }
2109     size = size - transferred;
2110   }
2111   return true;
2112 }
2113
2114 // Receive a page. Return false if a page was not received.
2115 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2116   int page_length = sat_->page_length();
2117   char *address = static_cast<char*>(dst->addr);
2118
2119   // Maybe we will get our data back again, maybe not.
2120   int size = page_length;
2121   while (size) {
2122     int transferred = recv(sock, address + (page_length - size), size, 0);
2123     if ((transferred == 0) || (transferred == -1)) {
2124       // Typically network slave thread should exit as network master
2125       // thread stops sending data.
2126       if (IsNetworkStopSet()) {
2127         int err = errno;
2128         if (transferred == 0 && err == 0) {
2129           // Two system setups will not sync exactly,
2130           // allow early exit, but log it.
2131           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2132         } else {
2133           char buf[256] = "";
2134           sat_strerror(err, buf, sizeof(buf));
2135           // Print why we failed.
2136           logprintf(0, "Process Error: Thread %d, "
2137                        "Network read failed, bailing (%s).\n",
2138                     thread_num_, buf);
2139           status_ = false;
2140           // Print arguments and results.
2141           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2142                     sock, address + (page_length - size),
2143                     size, transferred, err);
2144           if ((transferred == 0) &&
2145               (page_length - size < 512) &&
2146               (page_length - size > 0)) {
2147             // Print null terminated data received, to see who's been
2148             // sending us supicious unwanted data.
2149             address[page_length - size] = 0;
2150             logprintf(0, "Log: received  %d bytes: '%s'\n",
2151                       page_length - size, address);
2152           }
2153         }
2154       }
2155       return false;
2156     }
2157     size = size - transferred;
2158   }
2159   return true;
2160 }
2161
2162 // Network IO work loop. Execute until marked done.
2163 // Return true if the thread ran as expected.
2164 bool NetworkThread::Work() {
2165   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2166             thread_num_,
2167             ipaddr_);
2168
2169   // Make a socket.
2170   int sock = 0;
2171   if (!CreateSocket(&sock))
2172     return false;
2173
2174   // Network IO loop requires network slave thread to have already initialized.
2175   // We will sleep here for awhile to ensure that the slave thread will be
2176   // listening by the time we connect.
2177   // Sleep for 15 seconds.
2178   sat_sleep(15);
2179   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2180             thread_num_,
2181             ipaddr_);
2182
2183
2184   // Connect to a slave thread.
2185   if (!Connect(sock))
2186     return false;
2187
2188   // Loop until done.
2189   bool result = true;
2190   int strict = sat_->strict();
2191   int64 loops = 0;
2192   while (IsReadyToRun()) {
2193     struct page_entry src;
2194     struct page_entry dst;
2195     result = result && sat_->GetValid(&src);
2196     result = result && sat_->GetEmpty(&dst);
2197     if (!result) {
2198       logprintf(0, "Process Error: net_thread failed to pop pages, "
2199                 "bailing\n");
2200       break;
2201     }
2202
2203     // Check data correctness.
2204     if (strict)
2205       CrcCheckPage(&src);
2206
2207     // Do the network write.
2208     if (!(result = result && SendPage(sock, &src)))
2209       break;
2210
2211     // Update pattern reference to reflect new contents.
2212     dst.pattern = src.pattern;
2213
2214     // Do the network read.
2215     if (!(result = result && ReceivePage(sock, &dst)))
2216       break;
2217
2218     // Ensure that the transfer ended up with correct data.
2219     if (strict)
2220       CrcCheckPage(&dst);
2221
2222     // Return all of our pages to the queue.
2223     result = result && sat_->PutValid(&dst);
2224     result = result && sat_->PutEmpty(&src);
2225     if (!result) {
2226       logprintf(0, "Process Error: net_thread failed to push pages, "
2227                 "bailing\n");
2228       break;
2229     }
2230     loops++;
2231   }
2232
2233   pages_copied_ = loops;
2234   status_ = result;
2235
2236   // Clean up.
2237   CloseSocket(sock);
2238
2239   logprintf(9, "Log: Completed %d: network thread status %d, "
2240                "%d pages copied\n",
2241             thread_num_, status_, pages_copied_);
2242   return result;
2243 }
2244
2245 // Spawn slave threads for incoming connections.
2246 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2247   logprintf(12, "Log: Listen thread spawning slave\n");
2248
2249   // Spawn slave thread, to reflect network traffic back to sender.
2250   ChildWorker *child_worker = new ChildWorker;
2251   child_worker->thread.SetSock(newsock);
2252   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2253                                   &child_worker->status);
2254   child_worker->status.Initialize();
2255   child_worker->thread.SpawnThread();
2256   child_workers_.push_back(child_worker);
2257
2258   return true;
2259 }
2260
2261 // Reap slave threads.
2262 bool NetworkListenThread::ReapSlaves() {
2263   bool result = true;
2264   // Gather status and reap threads.
2265   logprintf(12, "Log: Joining all outstanding threads\n");
2266
2267   for (size_t i = 0; i < child_workers_.size(); i++) {
2268     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2269     logprintf(12, "Log: Joining slave thread %d\n", i);
2270     child_thread.JoinThread();
2271     if (child_thread.GetStatus() != 1) {
2272       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2273                 child_thread.GetStatus());
2274       result = false;
2275     }
2276     errorcount_ += child_thread.GetErrorCount();
2277     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2278               child_thread.GetErrorCount());
2279     pages_copied_ += child_thread.GetPageCount();
2280   }
2281
2282   return result;
2283 }
2284
2285 // Network listener IO work loop. Execute until marked done.
2286 // Return false on fatal software error.
2287 bool NetworkListenThread::Work() {
2288   logprintf(9, "Log: Starting network listen thread %d\n",
2289             thread_num_);
2290
2291   // Make a socket.
2292   sock_ = 0;
2293   if (!CreateSocket(&sock_)) {
2294     status_ = false;
2295     return false;
2296   }
2297   logprintf(9, "Log: Listen thread created sock\n");
2298
2299   // Allows incoming connections to be queued up by socket library.
2300   int newsock = 0;
2301   Listen();
2302   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2303
2304   // Wait on incoming connections, and spawn worker threads for them.
2305   int threadcount = 0;
2306   while (IsReadyToRun()) {
2307     // Poll for connections that we can accept().
2308     if (Wait()) {
2309       // Accept those connections.
2310       logprintf(12, "Log: Listen thread found incoming connection\n");
2311       if (GetConnection(&newsock)) {
2312         SpawnSlave(newsock, threadcount);
2313         threadcount++;
2314       }
2315     }
2316   }
2317
2318   // Gather status and join spawned threads.
2319   ReapSlaves();
2320
2321   // Delete the child workers.
2322   for (ChildVector::iterator it = child_workers_.begin();
2323        it != child_workers_.end(); ++it) {
2324     (*it)->status.Destroy();
2325     delete *it;
2326   }
2327   child_workers_.clear();
2328
2329   CloseSocket(sock_);
2330
2331   status_ = true;
2332   logprintf(9,
2333             "Log: Completed %d: network listen thread status %d, "
2334             "%d pages copied\n",
2335             thread_num_, status_, pages_copied_);
2336   return true;
2337 }
2338
2339 // Set network reflector socket struct.
2340 void NetworkSlaveThread::SetSock(int sock) {
2341   sock_ = sock;
2342 }
2343
2344 // Network reflector IO work loop. Execute until marked done.
2345 // Return false on fatal software error.
2346 bool NetworkSlaveThread::Work() {
2347   logprintf(9, "Log: Starting network slave thread %d\n",
2348             thread_num_);
2349
2350   // Verify that we have a socket.
2351   int sock = sock_;
2352   if (!sock) {
2353     status_ = false;
2354     return false;
2355   }
2356
2357   // Loop until done.
2358   int64 loops = 0;
2359   // Init a local buffer for storing data.
2360   void *local_page = NULL;
2361   int result = posix_memalign(&local_page, 512, sat_->page_length());
2362   if (result) {
2363     logprintf(0, "Process Error: net slave posix_memalign "
2364                  "returned %d (fail)\n",
2365               result);
2366     status_ = false;
2367     return false;
2368   }
2369
2370   struct page_entry page;
2371   page.addr = local_page;
2372
2373   // This thread will continue to run as long as the thread on the other end of
2374   // the socket is still sending and receiving data.
2375   while (1) {
2376     // Do the network read.
2377     if (!ReceivePage(sock, &page))
2378       break;
2379
2380     // Do the network write.
2381     if (!SendPage(sock, &page))
2382       break;
2383
2384     loops++;
2385   }
2386
2387   pages_copied_ = loops;
2388   // No results provided from this type of thread.
2389   status_ = true;
2390
2391   // Clean up.
2392   CloseSocket(sock);
2393
2394   logprintf(9,
2395             "Log: Completed %d: network slave thread status %d, "
2396             "%d pages copied\n",
2397             thread_num_, status_, pages_copied_);
2398   return true;
2399 }
2400
2401 // Thread work loop. Execute until marked finished.
2402 bool ErrorPollThread::Work() {
2403   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2404
2405   // This calls a generic error polling function in the Os abstraction layer.
2406   do {
2407     errorcount_ += os_->ErrorPoll();
2408     os_->ErrorWait();
2409   } while (IsReadyToRun());
2410
2411   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2412             thread_num_, errorcount_);
2413   status_ = true;
2414   return true;
2415 }
2416
2417 // Worker thread to heat up CPU.
2418 // This thread does not evaluate pass/fail or software error.
2419 bool CpuStressThread::Work() {
2420   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2421
2422   do {
2423     // Run ludloff's platform/CPU-specific assembly workload.
2424     os_->CpuStressWorkload();
2425     YieldSelf();
2426   } while (IsReadyToRun());
2427
2428   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2429             thread_num_);
2430   status_ = true;
2431   return true;
2432 }
2433
2434 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2435                                                  int cacheline_count,
2436                                                  int thread_num,
2437                                                  int inc_count) {
2438   cc_cacheline_data_ = data;
2439   cc_cacheline_count_ = cacheline_count;
2440   cc_thread_num_ = thread_num;
2441   cc_inc_count_ = inc_count;
2442 }
2443
2444 // Worked thread to test the cache coherency of the CPUs
2445 // Return false on fatal sw error.
2446 bool CpuCacheCoherencyThread::Work() {
2447   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2448             cc_thread_num_);
2449   uint64 time_start, time_end;
2450   struct timeval tv;
2451
2452   unsigned int seed = static_cast<unsigned int>(gettid());
2453   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2454   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2455
2456   uint64 total_inc = 0;  // Total increments done by the thread.
2457   while (IsReadyToRun()) {
2458     for (int i = 0; i < cc_inc_count_; i++) {
2459       // Choose a datastructure in random and increment the appropriate
2460       // member in that according to the offset (which is the same as the
2461       // thread number.
2462       int r = rand_r(&seed);
2463       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2464       // Increment the member of the randomely selected structure.
2465       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2466     }
2467
2468     total_inc += cc_inc_count_;
2469
2470     // Calculate if the local counter matches with the global value
2471     // in all the cache line structures for this particular thread.
2472     int cc_global_num = 0;
2473     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2474       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2475       // Reset the cachline member's value for the next run.
2476       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2477     }
2478     if (sat_->error_injection())
2479       cc_global_num = -1;
2480
2481     if (cc_global_num != cc_inc_count_) {
2482       errorcount_++;
2483       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2484                 cc_global_num, cc_inc_count_);
2485     }
2486   }
2487   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2488   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2489
2490   uint64 us_elapsed = time_end - time_start;
2491   // inc_rate is the no. of increments per second.
2492   double inc_rate = total_inc * 1e6 / us_elapsed;
2493
2494   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2495             " Increments=%llu, Increments/sec = %.6lf\n",
2496             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2497   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2498             cc_thread_num_);
2499   status_ = true;
2500   return true;
2501 }
2502
2503 DiskThread::DiskThread(DiskBlockTable *block_table) {
2504   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2505   write_block_size_ = kSectorSize;  // this assumes read and write block size
2506                                     // are the same
2507   segment_size_ = -1;               // use the entire disk as one segment
2508   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2509   // Use a queue such that 3/2 times as much data as the cache can hold
2510   // is written before it is read so that there is little chance the read
2511   // data is in the cache.
2512   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2513   blocks_per_segment_ = 32;
2514
2515   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2516   write_threshold_ = 100000;        // reading/writing a sector
2517
2518   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2519   write_timeout_ = 5000000;         // timout for reading/writing
2520
2521   device_sectors_ = 0;
2522   non_destructive_ = 0;
2523
2524   aio_ctx_ = 0;
2525   block_table_ = block_table;
2526   update_block_table_ = 1;
2527
2528   block_buffer_ = NULL;
2529
2530   blocks_written_ = 0;
2531   blocks_read_ = 0;
2532 }
2533
2534 DiskThread::~DiskThread() {
2535   if (block_buffer_)
2536     free(block_buffer_);
2537 }
2538
2539 // Set filename for device file (in /dev).
2540 void DiskThread::SetDevice(const char *device_name) {
2541   device_name_ = device_name;
2542 }
2543
2544 // Set various parameters that control the behaviour of the test.
2545 // -1 is used as a sentinel value on each parameter (except non_destructive)
2546 // to indicate that the parameter not be set.
2547 bool DiskThread::SetParameters(int read_block_size,
2548                                int write_block_size,
2549                                int64 segment_size,
2550                                int64 cache_size,
2551                                int blocks_per_segment,
2552                                int64 read_threshold,
2553                                int64 write_threshold,
2554                                int non_destructive) {
2555   if (read_block_size != -1) {
2556     // Blocks must be aligned to the disk's sector size.
2557     if (read_block_size % kSectorSize != 0) {
2558       logprintf(0, "Process Error: Block size must be a multiple of %d "
2559                 "(thread %d).\n", kSectorSize, thread_num_);
2560       return false;
2561     }
2562
2563     read_block_size_ = read_block_size;
2564   }
2565
2566   if (write_block_size != -1) {
2567     // Write blocks must be aligned to the disk's sector size and to the
2568     // block size.
2569     if (write_block_size % kSectorSize != 0) {
2570       logprintf(0, "Process Error: Write block size must be a multiple "
2571                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2572       return false;
2573     }
2574     if (write_block_size % read_block_size_ != 0) {
2575       logprintf(0, "Process Error: Write block size must be a multiple "
2576                 "of the read block size, which is %d (thread %d).\n",
2577                 read_block_size_, thread_num_);
2578       return false;
2579     }
2580
2581     write_block_size_ = write_block_size;
2582
2583   } else {
2584     // Make sure write_block_size_ is still valid.
2585     if (read_block_size_ > write_block_size_) {
2586       logprintf(5, "Log: Assuming write block size equal to read block size, "
2587                 "which is %d (thread %d).\n", read_block_size_,
2588                 thread_num_);
2589       write_block_size_ = read_block_size_;
2590     } else {
2591       if (write_block_size_ % read_block_size_ != 0) {
2592         logprintf(0, "Process Error: Write block size (defined as %d) must "
2593                   "be a multiple of the read block size, which is %d "
2594                   "(thread %d).\n", write_block_size_, read_block_size_,
2595                   thread_num_);
2596         return false;
2597       }
2598     }
2599   }
2600
2601   if (cache_size != -1) {
2602     cache_size_ = cache_size;
2603   }
2604
2605   if (blocks_per_segment != -1) {
2606     if (blocks_per_segment <= 0) {
2607       logprintf(0, "Process Error: Blocks per segment must be greater than "
2608                    "zero.\n (thread %d)", thread_num_);
2609       return false;
2610     }
2611
2612     blocks_per_segment_ = blocks_per_segment;
2613   }
2614
2615   if (read_threshold != -1) {
2616     if (read_threshold <= 0) {
2617       logprintf(0, "Process Error: Read threshold must be greater than "
2618                    "zero (thread %d).\n", thread_num_);
2619       return false;
2620     }
2621
2622     read_threshold_ = read_threshold;
2623   }
2624
2625   if (write_threshold != -1) {
2626     if (write_threshold <= 0) {
2627       logprintf(0, "Process Error: Write threshold must be greater than "
2628                    "zero (thread %d).\n", thread_num_);
2629       return false;
2630     }
2631
2632     write_threshold_ = write_threshold;
2633   }
2634
2635   if (segment_size != -1) {
2636     // Segments must be aligned to the disk's sector size.
2637     if (segment_size % kSectorSize != 0) {
2638       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2639                 " (thread %d).\n", kSectorSize, thread_num_);
2640       return false;
2641     }
2642
2643     segment_size_ = segment_size / kSectorSize;
2644   }
2645
2646   non_destructive_ = non_destructive;
2647
2648   // Having a queue of 150% of blocks that will fit in the disk's cache
2649   // should be enough to force out the oldest block before it is read and hence,
2650   // making sure the data comes form the disk and not the cache.
2651   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2652   // Updating DiskBlockTable parameters
2653   if (update_block_table_) {
2654     block_table_->SetParameters(kSectorSize, write_block_size_,
2655                                 device_sectors_, segment_size_,
2656                                 device_name_);
2657   }
2658   return true;
2659 }
2660
2661 // Open a device, return false on failure.
2662 bool DiskThread::OpenDevice(int *pfile) {
2663   int fd = open(device_name_.c_str(),
2664                 O_RDWR | O_SYNC | O_DIRECT | O_LARGEFILE,
2665                 0);
2666   if (fd < 0) {
2667     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2668               device_name_.c_str(), thread_num_);
2669     return false;
2670   }
2671   *pfile = fd;
2672
2673   return GetDiskSize(fd);
2674 }
2675
2676 // Retrieves the size (in bytes) of the disk/file.
2677 // Return false on failure.
2678 bool DiskThread::GetDiskSize(int fd) {
2679   struct stat device_stat;
2680   if (fstat(fd, &device_stat) == -1) {
2681     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2682               device_name_.c_str(), thread_num_);
2683     return false;
2684   }
2685
2686   // For a block device, an ioctl is needed to get the size since the size
2687   // of the device file (i.e. /dev/sdb) is 0.
2688   if (S_ISBLK(device_stat.st_mode)) {
2689     uint64 block_size = 0;
2690
2691     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2692       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2693                 device_name_.c_str(), thread_num_);
2694       return false;
2695     }
2696
2697     // Zero size indicates nonworking device..
2698     if (block_size == 0) {
2699       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2700       ++errorcount_;
2701       status_ = true;  // Avoid a procedural error.
2702       return false;
2703     }
2704
2705     device_sectors_ = block_size / kSectorSize;
2706
2707   } else if (S_ISREG(device_stat.st_mode)) {
2708     device_sectors_ = device_stat.st_size / kSectorSize;
2709
2710   } else {
2711     logprintf(0, "Process Error: %s is not a regular file or block "
2712               "device (thread %d).\n", device_name_.c_str(),
2713               thread_num_);
2714     return false;
2715   }
2716
2717   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2718             device_sectors_, device_name_.c_str(), thread_num_);
2719
2720   if (update_block_table_) {
2721     block_table_->SetParameters(kSectorSize, write_block_size_,
2722                                 device_sectors_, segment_size_,
2723                                 device_name_);
2724   }
2725
2726   return true;
2727 }
2728
2729 bool DiskThread::CloseDevice(int fd) {
2730   close(fd);
2731   return true;
2732 }
2733
2734 // Return the time in microseconds.
2735 int64 DiskThread::GetTime() {
2736   struct timeval tv;
2737   gettimeofday(&tv, NULL);
2738   return tv.tv_sec * 1000000 + tv.tv_usec;
2739 }
2740
2741 // Do randomized reads and (possibly) writes on a device.
2742 // Return false on fatal SW error, true on SW success,
2743 // regardless of whether HW failed.
2744 bool DiskThread::DoWork(int fd) {
2745   int64 block_num = 0;
2746   int64 num_segments;
2747
2748   if (segment_size_ == -1) {
2749     num_segments = 1;
2750   } else {
2751     num_segments = device_sectors_ / segment_size_;
2752     if (device_sectors_ % segment_size_ != 0)
2753       num_segments++;
2754   }
2755
2756   // Disk size should be at least 3x cache size.  See comment later for
2757   // details.
2758   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2759
2760   // This disk test works by writing blocks with a certain pattern to
2761   // disk, then reading them back and verifying it against the pattern
2762   // at a later time.  A failure happens when either the block cannot
2763   // be written/read or when the read block is different than what was
2764   // written.  If a block takes too long to write/read, then a warning
2765   // is given instead of an error since taking too long is not
2766   // necessarily an error.
2767   //
2768   // To prevent the read blocks from coming from the disk cache,
2769   // enough blocks are written before read such that a block would
2770   // be ejected from the disk cache by the time it is read.
2771   //
2772   // TODO(amistry): Implement some sort of read/write throttling.  The
2773   //                flood of asynchronous I/O requests when a drive is
2774   //                unplugged is causing the application and kernel to
2775   //                become unresponsive.
2776
2777   while (IsReadyToRun()) {
2778     // Write blocks to disk.
2779     logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2780               non_destructive_ ? "(disabled) " : "",
2781               device_name_.c_str(), thread_num_);
2782     while (IsReadyToRunNoPause() &&
2783            in_flight_sectors_.size() <
2784                static_cast<size_t>(queue_size_ + 1)) {
2785       // Confine testing to a particular segment of the disk.
2786       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2787       if (!non_destructive_ &&
2788           (block_num % blocks_per_segment_ == 0)) {
2789         logprintf(20, "Log: Starting to write segment %lld out of "
2790                   "%lld on disk %s (thread %d).\n",
2791                   segment, num_segments, device_name_.c_str(),
2792                   thread_num_);
2793       }
2794       block_num++;
2795
2796       BlockData *block = block_table_->GetUnusedBlock(segment);
2797
2798       // If an unused sequence of sectors could not be found, skip to the
2799       // next block to process.  Soon, a new segment will come and new
2800       // sectors will be able to be allocated.  This effectively puts a
2801       // minumim on the disk size at 3x the stated cache size, or 48MiB
2802       // if a cache size is not given (since the cache is set as 16MiB
2803       // by default).  Given that todays caches are at the low MiB range
2804       // and drive sizes at the mid GB, this shouldn't pose a problem.
2805       // The 3x minimum comes from the following:
2806       //   1. In order to allocate 'y' blocks from a segment, the
2807       //      segment must contain at least 2y blocks or else an
2808       //      allocation may not succeed.
2809       //   2. Assume the entire disk is one segment.
2810       //   3. A full write phase consists of writing blocks corresponding to
2811       //      3/2 cache size.
2812       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2813       //      size worth of blocks = 3 * cache size worth of blocks
2814       //      to complete.
2815       // In non-destructive mode, don't write anything to disk.
2816       if (!non_destructive_) {
2817         if (!WriteBlockToDisk(fd, block)) {
2818           block_table_->RemoveBlock(block);
2819           return true;
2820         }
2821         blocks_written_++;
2822       }
2823
2824       // Block is either initialized by writing, or in nondestructive case,
2825       // initialized by being added into the datastructure for later reading.
2826       block->SetBlockAsInitialized();
2827
2828       in_flight_sectors_.push(block);
2829     }
2830
2831     // Verify blocks on disk.
2832     logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2833               device_name_.c_str(), thread_num_);
2834     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2835       BlockData *block = in_flight_sectors_.front();
2836       in_flight_sectors_.pop();
2837       if (!ValidateBlockOnDisk(fd, block))
2838         return true;
2839       block_table_->RemoveBlock(block);
2840       blocks_read_++;
2841     }
2842   }
2843
2844   pages_copied_ = blocks_written_ + blocks_read_;
2845   return true;
2846 }
2847
2848 // Do an asynchronous disk I/O operation.
2849 // Return false if the IO is not set up.
2850 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2851                             int64 offset, int64 timeout) {
2852   // Use the Linux native asynchronous I/O interface for reading/writing.
2853   // A read/write consists of three basic steps:
2854   //    1. create an io context.
2855   //    2. prepare and submit an io request to the context
2856   //    3. wait for an event on the context.
2857
2858   struct {
2859     const int opcode;
2860     const char *op_str;
2861     const char *error_str;
2862   } operations[2] = {
2863     { IO_CMD_PREAD, "read", "disk-read-error" },
2864     { IO_CMD_PWRITE, "write", "disk-write-error" }
2865   };
2866
2867   struct iocb cb;
2868   memset(&cb, 0, sizeof(cb));
2869
2870   cb.aio_fildes = fd;
2871   cb.aio_lio_opcode = operations[op].opcode;
2872   cb.u.c.buf = buf;
2873   cb.u.c.nbytes = size;
2874   cb.u.c.offset = offset;
2875
2876   struct iocb *cbs[] = { &cb };
2877   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2878     int error = errno;
2879     char buf[256];
2880     sat_strerror(error, buf, sizeof(buf));
2881     logprintf(0, "Process Error: Unable to submit async %s "
2882                  "on disk %s (thread %d). Error %d, %s\n",
2883               operations[op].op_str, device_name_.c_str(),
2884               thread_num_, error, buf);
2885     return false;
2886   }
2887
2888   struct io_event event;
2889   memset(&event, 0, sizeof(event));
2890   struct timespec tv;
2891   tv.tv_sec = timeout / 1000000;
2892   tv.tv_nsec = (timeout % 1000000) * 1000;
2893   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2894     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2895     // EINTR error code.  This is not an error and so don't treat it as such,
2896     // but still log it.
2897     int error = errno;
2898     if (error == EINTR) {
2899       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2900                 operations[op].op_str, device_name_.c_str(),
2901                 thread_num_);
2902     } else {
2903       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2904       errorcount_ += 1;
2905       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2906                    "starting at %lld on disk %s (thread %d).\n",
2907                 operations[op].op_str, offset / kSectorSize,
2908                 device_name_.c_str(), thread_num_);
2909     }
2910
2911     // Don't bother checking return codes since io_cancel seems to always fail.
2912     // Since io_cancel is always failing, destroying and recreating an I/O
2913     // context is a workaround for canceling an in-progress I/O operation.
2914     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2915     io_cancel(aio_ctx_, &cb, &event);
2916     io_destroy(aio_ctx_);
2917     aio_ctx_ = 0;
2918     if (io_setup(5, &aio_ctx_)) {
2919       int error = errno;
2920       char buf[256];
2921       sat_strerror(error, buf, sizeof(buf));
2922       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2923                 " (thread %d) Error %d, %s\n",
2924                 device_name_.c_str(), thread_num_, error, buf);
2925     }
2926
2927     return false;
2928   }
2929
2930   // event.res contains the number of bytes written/read or
2931   // error if < 0, I think.
2932   if (event.res != static_cast<uint64>(size)) {
2933     errorcount_++;
2934     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2935
2936     if (event.res < 0) {
2937       switch (event.res) {
2938         case -EIO:
2939           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2940                        "sectors starting at %lld on disk %s (thread %d).\n",
2941                     operations[op].op_str, offset / kSectorSize,
2942                     device_name_.c_str(), thread_num_);
2943           break;
2944         default:
2945           logprintf(0, "Hardware Error: Unknown error while doing %s to "
2946                        "sectors starting at %lld on disk %s (thread %d).\n",
2947                     operations[op].op_str, offset / kSectorSize,
2948                     device_name_.c_str(), thread_num_);
2949       }
2950     } else {
2951       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2952                    "%lld on disk %s (thread %d).\n",
2953                 operations[op].op_str, offset / kSectorSize,
2954                 device_name_.c_str(), thread_num_);
2955     }
2956     return false;
2957   }
2958
2959   return true;
2960 }
2961
2962 // Write a block to disk.
2963 // Return false if the block is not written.
2964 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
2965   memset(block_buffer_, 0, block->GetSize());
2966
2967   // Fill block buffer with a pattern
2968   struct page_entry pe;
2969   if (!sat_->GetValid(&pe)) {
2970     // Even though a valid page could not be obatined, it is not an error
2971     // since we can always fill in a pattern directly, albeit slower.
2972     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
2973     block->SetPattern(patternlist_->GetRandomPattern());
2974
2975     logprintf(11, "Log: Warning, using pattern fill fallback in "
2976                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
2977               device_name_.c_str(), thread_num_);
2978
2979     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
2980       memblock[i] = block->GetPattern()->pattern(i);
2981     }
2982   } else {
2983     memcpy(block_buffer_, pe.addr, block->GetSize());
2984     block->SetPattern(pe.pattern);
2985     sat_->PutValid(&pe);
2986   }
2987
2988   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
2989             " (thread %d).\n",
2990             block->GetSize()/kSectorSize, block->GetAddress(),
2991             device_name_.c_str(), thread_num_);
2992
2993   int64 start_time = GetTime();
2994
2995   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
2996                    block->GetAddress() * kSectorSize, write_timeout_)) {
2997     return false;
2998   }
2999
3000   int64 end_time = GetTime();
3001   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
3002             end_time - start_time, thread_num_);
3003   if (end_time - start_time > write_threshold_) {
3004     logprintf(5, "Log: Write took %lld us which is longer than threshold "
3005                  "%lld us on disk %s (thread %d).\n",
3006               end_time - start_time, write_threshold_, device_name_.c_str(),
3007               thread_num_);
3008   }
3009
3010   return true;
3011 }
3012
3013 // Verify a block on disk.
3014 // Return true if the block was read, also increment errorcount
3015 // if the block had data errors or performance problems.
3016 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
3017   int64 blocks = block->GetSize() / read_block_size_;
3018   int64 bytes_read = 0;
3019   int64 current_blocks;
3020   int64 current_bytes;
3021   uint64 address = block->GetAddress();
3022
3023   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
3024             "(thread %d).\n",
3025             address, device_name_.c_str(), thread_num_);
3026
3027   // Read block from disk and time the read.  If it takes longer than the
3028   // threshold, complain.
3029   if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
3030     logprintf(0, "Process Error: Unable to seek to sector %lld in "
3031               "DiskThread::ValidateSectorsOnDisk on disk %s "
3032               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
3033     return false;
3034   }
3035   int64 start_time = GetTime();
3036
3037   // Split a large write-sized block into small read-sized blocks and
3038   // read them in groups of randomly-sized multiples of read block size.
3039   // This assures all data written on disk by this particular block
3040   // will be tested using a random reading pattern.
3041   while (blocks != 0) {
3042     // Test all read blocks in a written block.
3043     current_blocks = (random() % blocks) + 1;
3044     current_bytes = current_blocks * read_block_size_;
3045
3046     memset(block_buffer_, 0, current_bytes);
3047
3048     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
3049               "disk %s (thread %d)\n",
3050               current_bytes / kSectorSize,
3051               (address * kSectorSize + bytes_read) / kSectorSize,
3052               device_name_.c_str(), thread_num_);
3053
3054     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
3055                      address * kSectorSize + bytes_read,
3056                      write_timeout_)) {
3057       return false;
3058     }
3059
3060     int64 end_time = GetTime();
3061     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3062               end_time - start_time, thread_num_);
3063     if (end_time - start_time > read_threshold_) {
3064       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3065                 "%lld us on disk %s (thread %d).\n",
3066                 end_time - start_time, read_threshold_,
3067                 device_name_.c_str(), thread_num_);
3068     }
3069
3070     // In non-destructive mode, don't compare the block to the pattern since
3071     // the block was never written to disk in the first place.
3072     if (!non_destructive_) {
3073       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3074                       0, bytes_read)) {
3075         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3076         errorcount_ += 1;
3077         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3078                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3079                   "disk %s (thread %d).\n",
3080                   address, device_name_.c_str(), thread_num_);
3081       }
3082     }
3083
3084     bytes_read += current_blocks * read_block_size_;
3085     blocks -= current_blocks;
3086   }
3087
3088   return true;
3089 }
3090
3091 // Direct device access thread.
3092 // Return false on software error.
3093 bool DiskThread::Work() {
3094   int fd;
3095
3096   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3097             thread_num_, device_name_.c_str());
3098
3099   srandom(time(NULL));
3100
3101   if (!OpenDevice(&fd)) {
3102     status_ = false;
3103     return false;
3104   }
3105
3106   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3107   // when using direst IO.
3108   int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3109                               sat_->page_length());
3110   if (memalign_result) {
3111     CloseDevice(fd);
3112     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3113                  "for disk %s (thread %d) posix memalign returned %d.\n",
3114               device_name_.c_str(), thread_num_, memalign_result);
3115     status_ = false;
3116     return false;
3117   }
3118
3119   if (io_setup(5, &aio_ctx_)) {
3120     CloseDevice(fd);
3121     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3122               " (thread %d).\n",
3123               device_name_.c_str(), thread_num_);
3124     status_ = false;
3125     return false;
3126   }
3127
3128   bool result = DoWork(fd);
3129
3130   status_ = result;
3131
3132   io_destroy(aio_ctx_);
3133   CloseDevice(fd);
3134
3135   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3136                "%d pages copied\n",
3137             thread_num_, device_name_.c_str(), status_, pages_copied_);
3138   return result;
3139 }
3140
3141 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3142     : DiskThread(block_table) {
3143   update_block_table_ = 0;
3144 }
3145
3146 RandomDiskThread::~RandomDiskThread() {
3147 }
3148
3149 // Workload for random disk thread.
3150 bool RandomDiskThread::DoWork(int fd) {
3151   logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3152             device_name_.c_str(), thread_num_);
3153   while (IsReadyToRun()) {
3154     BlockData *block = block_table_->GetRandomBlock();
3155     if (block == NULL) {
3156       logprintf(12, "Log: No block available for device %s (thread %d).\n",
3157                 device_name_.c_str(), thread_num_);
3158     } else {
3159       ValidateBlockOnDisk(fd, block);
3160       block_table_->ReleaseBlock(block);
3161       blocks_read_++;
3162     }
3163   }
3164   pages_copied_ = blocks_read_;
3165   return true;
3166 }
3167
3168 MemoryRegionThread::MemoryRegionThread() {
3169   error_injection_ = false;
3170   pages_ = NULL;
3171 }
3172
3173 MemoryRegionThread::~MemoryRegionThread() {
3174   if (pages_ != NULL)
3175     delete pages_;
3176 }
3177
3178 // Set a region of memory or MMIO to be tested.
3179 // Return false if region could not be mapped.
3180 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3181   int plength = sat_->page_length();
3182   int npages = size / plength;
3183   if (size % plength) {
3184     logprintf(0, "Process Error: region size is not a multiple of SAT "
3185               "page length\n");
3186     return false;
3187   } else {
3188     if (pages_ != NULL)
3189       delete pages_;
3190     pages_ = new PageEntryQueue(npages);
3191     char *base_addr = reinterpret_cast<char*>(region);
3192     region_ = base_addr;
3193     for (int i = 0; i < npages; i++) {
3194       struct page_entry pe;
3195       init_pe(&pe);
3196       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3197       pe.offset = i * plength;
3198
3199       pages_->Push(&pe);
3200     }
3201     return true;
3202   }
3203 }
3204
3205 // More detailed error printout for hardware errors in memory or MMIO
3206 // regions.
3207 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3208                                       int priority,
3209                                       const char *message) {
3210   uint32 buffer_offset;
3211   if (phase_ == kPhaseCopy) {
3212     // If the error occurred on the Copy Phase, it means that
3213     // the source data (i.e., the main memory) is wrong. so
3214     // just pass it to the original ProcessError to call a
3215     // bad-dimm error
3216     WorkerThread::ProcessError(error, priority, message);
3217   } else if (phase_ == kPhaseCheck) {
3218     // A error on the Check Phase means that the memory region tested
3219     // has an error. Gathering more information and then reporting
3220     // the error.
3221     // Determine if this is a write or read error.
3222     os_->Flush(error->vaddr);
3223     error->reread = *(error->vaddr);
3224     char *good = reinterpret_cast<char*>(&(error->expected));
3225     char *bad = reinterpret_cast<char*>(&(error->actual));
3226     sat_assert(error->expected != error->actual);
3227     unsigned int offset = 0;
3228     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3229       if (good[offset] != bad[offset])
3230         break;
3231     }
3232
3233     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3234
3235     buffer_offset = error->vbyteaddr - region_;
3236
3237     // Find physical address if possible.
3238     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3239     logprintf(priority,
3240               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3241               "offset %llx: read:0x%016llx, reread:0x%016llx "
3242               "expected:0x%016llx\n",
3243               message,
3244               identifier_.c_str(),
3245               error->vaddr,
3246               error->paddr,
3247               buffer_offset,
3248               error->actual,
3249               error->reread,
3250               error->expected);
3251   } else {
3252     logprintf(0, "Process Error: memory region thread raised an "
3253               "unexpected error.");
3254   }
3255 }
3256
3257 // Workload for testion memory or MMIO regions.
3258 // Return false on software error.
3259 bool MemoryRegionThread::Work() {
3260   struct page_entry source_pe;
3261   struct page_entry memregion_pe;
3262   bool result = true;
3263   int64 loops = 0;
3264   const uint64 error_constant = 0x00ba00000000ba00LL;
3265
3266   // For error injection.
3267   int64 *addr = 0x0;
3268   int offset = 0;
3269   int64 data = 0;
3270
3271   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3272
3273   while (IsReadyToRun()) {
3274     // Getting pages from SAT and queue.
3275     phase_ = kPhaseNoPhase;
3276     result = result && sat_->GetValid(&source_pe);
3277     if (!result) {
3278       logprintf(0, "Process Error: memory region thread failed to pop "
3279                 "pages from SAT, bailing\n");
3280       break;
3281     }
3282
3283     result = result && pages_->PopRandom(&memregion_pe);
3284     if (!result) {
3285       logprintf(0, "Process Error: memory region thread failed to pop "
3286                 "pages from queue, bailing\n");
3287       break;
3288     }
3289
3290     // Error injection for CRC copy.
3291     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3292       addr = reinterpret_cast<int64*>(source_pe.addr);
3293       offset = random() % (sat_->page_length() / wordsize_);
3294       data = addr[offset];
3295       addr[offset] = error_constant;
3296     }
3297
3298     // Copying SAT page into memory region.
3299     phase_ = kPhaseCopy;
3300     CrcCopyPage(&memregion_pe, &source_pe);
3301     memregion_pe.pattern = source_pe.pattern;
3302
3303     // Error injection for CRC Check.
3304     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3305       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3306       offset = random() % (sat_->page_length() / wordsize_);
3307       data = addr[offset];
3308       addr[offset] = error_constant;
3309     }
3310
3311     // Checking page content in memory region.
3312     phase_ = kPhaseCheck;
3313     CrcCheckPage(&memregion_pe);
3314
3315     phase_ = kPhaseNoPhase;
3316     // Storing pages on their proper queues.
3317     result = result && sat_->PutValid(&source_pe);
3318     if (!result) {
3319       logprintf(0, "Process Error: memory region thread failed to push "
3320                 "pages into SAT, bailing\n");
3321       break;
3322     }
3323     result = result && pages_->Push(&memregion_pe);
3324     if (!result) {
3325       logprintf(0, "Process Error: memory region thread failed to push "
3326                 "pages into queue, bailing\n");
3327       break;
3328     }
3329
3330     if ((sat_->error_injection() || error_injection_) &&
3331         loops >= 1 && loops <= 2) {
3332       addr[offset] = data;
3333     }
3334
3335     loops++;
3336     YieldSelf();
3337   }
3338
3339   pages_copied_ = loops;
3340   status_ = result;
3341   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3342             "pages checked\n", thread_num_, status_, pages_copied_);
3343   return result;
3344 }