chiark / gitweb /
c568064175937009b899802b555c9e1b1ccdc492
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #include <libaio.h>
48
49 #include <sys/syscall.h>
50
51 #include <set>
52 #include <string>
53
54 // This file must work with autoconf on its public version,
55 // so these includes are correct.
56 #include "error_diag.h"  // NOLINT
57 #include "os.h"          // NOLINT
58 #include "pattern.h"     // NOLINT
59 #include "queue.h"       // NOLINT
60 #include "sat.h"         // NOLINT
61 #include "sattypes.h"    // NOLINT
62 #include "worker.h"      // NOLINT
63
64 // Syscalls
65 // Why ubuntu, do you hate gettid so bad?
66 #if !defined(__NR_gettid)
67   #define __NR_gettid             224
68 #endif
69
70 #define gettid() syscall(__NR_gettid)
71 #if !defined(CPU_SETSIZE)
72 _syscall3(int, sched_getaffinity, pid_t, pid,
73           unsigned int, len, cpu_set_t*, mask)
74 _syscall3(int, sched_setaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 #endif
77
78 // Linux aio syscalls.
79 #if !defined(__NR_io_setup)
80 #error "No aio headers inculded, please install libaio."
81 #endif
82
83 namespace {
84   // Get HW core ID from cpuid instruction.
85   inline int apicid(void) {
86     int cpu;
87 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
88     __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
89 #else
90   #warning "Unsupported CPU type: unable to determine core ID."
91     cpu = 0;
92 #endif
93     return (cpu >> 24);
94   }
95
96   // Work around the sad fact that there are two (gnu, xsi) incompatible
97   // versions of strerror_r floating around google. Awesome.
98   bool sat_strerror(int err, char *buf, int len) {
99     buf[0] = 0;
100     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
101     int retval = reinterpret_cast<int64>(errmsg);
102     if (retval == 0)
103       return true;
104     if (retval == -1)
105       return false;
106     if (errmsg != buf) {
107       strncpy(buf, errmsg, len);
108       buf[len - 1] = 0;
109     }
110     return true;
111   }
112
113
114   inline uint64 addr_to_tag(void *address) {
115     return reinterpret_cast<uint64>(address);
116   }
117 }
118
119 #if !defined(O_DIRECT)
120 // Sometimes this isn't available.
121 // Disregard if it's not defined.
122   #define O_DIRECT            0
123 #endif
124
125 // A struct to hold captured errors, for later reporting.
126 struct ErrorRecord {
127   uint64 actual;  // This is the actual value read.
128   uint64 reread;  // This is the actual value, reread.
129   uint64 expected;  // This is what it should have been.
130   uint64 *vaddr;  // This is where it was (or wasn't).
131   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
132   uint64 paddr;  // This is the bus address, if available.
133   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
134   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
135 };
136
137 // This is a helper function to create new threads with pthreads.
138 static void *ThreadSpawnerGeneric(void *ptr) {
139   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
140   worker->StartRoutine();
141   return NULL;
142 }
143
144 void WorkerStatus::Initialize() {
145   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
146   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
147   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
148                                        num_workers_ + 1));
149 }
150
151 void WorkerStatus::Destroy() {
152   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
153   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
154   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
155 }
156
157 void WorkerStatus::PauseWorkers() {
158   if (SetStatus(PAUSE) != PAUSE)
159     WaitOnPauseBarrier();
160 }
161
162 void WorkerStatus::ResumeWorkers() {
163   if (SetStatus(RUN) == PAUSE)
164     WaitOnPauseBarrier();
165 }
166
167 void WorkerStatus::StopWorkers() {
168   if (SetStatus(STOP) == PAUSE)
169     WaitOnPauseBarrier();
170 }
171
172 bool WorkerStatus::ContinueRunning() {
173   // This loop is an optimization.  We use it to immediately re-check the status
174   // after resuming from a pause, instead of returning and waiting for the next
175   // call to this function.
176   for (;;) {
177     switch (GetStatus()) {
178       case RUN:
179         return true;
180       case PAUSE:
181         // Wait for the other workers to call this function so that
182         // PauseWorkers() can return.
183         WaitOnPauseBarrier();
184         // Wait for ResumeWorkers() to be called.
185         WaitOnPauseBarrier();
186         break;
187       case STOP:
188         return false;
189     }
190   }
191 }
192
193 bool WorkerStatus::ContinueRunningNoPause() {
194   return (GetStatus() != STOP);
195 }
196
197 void WorkerStatus::RemoveSelf() {
198   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
199   for (;;) {
200     AcquireStatusReadLock();
201     if (status_ != PAUSE)
202       break;
203     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
204     // the other threads won't wait on pause_barrier_ forever.
205     ReleaseStatusLock();
206     // Wait for the other workers to call this function so that PauseWorkers()
207     // can return.
208     WaitOnPauseBarrier();
209     // Wait for ResumeWorkers() to be called.
210     WaitOnPauseBarrier();
211   }
212
213   // This lock would be unnecessary if we held a write lock instead of a read
214   // lock on status_rwlock_, but that would also force all threads calling
215   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
216   AcquireNumWorkersLock();
217   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
218   // in use because (status != PAUSE).
219   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
220   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
221   --num_workers_;
222   ReleaseNumWorkersLock();
223
224   // Release status_rwlock_.
225   ReleaseStatusLock();
226 }
227
228
229 // Parent thread class.
230 WorkerThread::WorkerThread() {
231   status_ = false;
232   pages_copied_ = 0;
233   errorcount_ = 0;
234   runduration_usec_ = 1;
235   priority_ = Normal;
236   worker_status_ = NULL;
237   thread_spawner_ = &ThreadSpawnerGeneric;
238   tag_mode_ = false;
239 }
240
241 WorkerThread::~WorkerThread() {}
242
243 // Constructors. Just init some default values.
244 FillThread::FillThread() {
245   num_pages_to_fill_ = 0;
246 }
247
248 // Initialize file name to empty.
249 FileThread::FileThread() {
250   filename_ = "";
251   devicename_ = "";
252   pass_ = 0;
253   page_io_ = true;
254   crc_page_ = -1;
255   local_page_ = NULL;
256 }
257
258 // If file thread used bounce buffer in memory, account for the extra
259 // copy for memory bandwidth calculation.
260 float FileThread::GetMemoryCopiedData() {
261   if (!os_->normal_mem())
262     return GetCopiedData();
263   else
264     return 0;
265 }
266
267 // Initialize target hostname to be invalid.
268 NetworkThread::NetworkThread() {
269   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
270   sock_ = 0;
271 }
272
273 // Initialize?
274 NetworkSlaveThread::NetworkSlaveThread() {
275 }
276
277 // Initialize?
278 NetworkListenThread::NetworkListenThread() {
279 }
280
281 // Init member variables.
282 void WorkerThread::InitThread(int thread_num_init,
283                               class Sat *sat_init,
284                               class OsLayer *os_init,
285                               class PatternList *patternlist_init,
286                               WorkerStatus *worker_status) {
287   sat_assert(worker_status);
288   worker_status->AddWorkers(1);
289
290   thread_num_ = thread_num_init;
291   sat_ = sat_init;
292   os_ = os_init;
293   patternlist_ = patternlist_init;
294   worker_status_ = worker_status;
295
296   AvailableCpus(&cpu_mask_);
297   tag_ = 0xffffffff;
298
299   tag_mode_ = sat_->tag_mode();
300 }
301
302
303 // Use pthreads to prioritize a system thread.
304 bool WorkerThread::InitPriority() {
305   // This doesn't affect performance that much, and may not be too safe.
306
307   bool ret = BindToCpus(&cpu_mask_);
308   if (!ret)
309     logprintf(11, "Log: Bind to %s failed.\n",
310               cpuset_format(&cpu_mask_).c_str());
311
312   logprintf(11, "Log: Thread %d running on apic ID %d mask %s (%s).\n",
313             thread_num_, apicid(),
314             CurrentCpusFormat().c_str(),
315             cpuset_format(&cpu_mask_).c_str());
316 #if 0
317   if (priority_ == High) {
318     sched_param param;
319     param.sched_priority = 1;
320     // Set the priority; others are unchanged.
321     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
322               param.sched_priority);
323     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
324       char buf[256];
325       sat_strerror(errno, buf, sizeof(buf));
326       logprintf(0, "Process Error: sched_setscheduler "
327                    "failed - error %d %s\n",
328                 errno, buf);
329     }
330   }
331 #endif
332   return true;
333 }
334
335 // Use pthreads to create a system thread.
336 int WorkerThread::SpawnThread() {
337   // Create the new thread.
338   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
339   if (result) {
340     char buf[256];
341     sat_strerror(result, buf, sizeof(buf));
342     logprintf(0, "Process Error: pthread_create "
343                   "failed - error %d %s\n", result,
344               buf);
345     status_ = false;
346     return false;
347   }
348
349   // 0 is pthreads success.
350   return true;
351 }
352
353 // Kill the worker thread with SIGINT.
354 bool WorkerThread::KillThread() {
355   return (pthread_kill(thread_, SIGINT) == 0);
356 }
357
358 // Block until thread has exited.
359 bool WorkerThread::JoinThread() {
360   int result = pthread_join(thread_, NULL);
361
362   if (result) {
363     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
364     status_ = false;
365   }
366
367   // 0 is pthreads success.
368   return (!result);
369 }
370
371
372 void WorkerThread::StartRoutine() {
373   InitPriority();
374   StartThreadTimer();
375   Work();
376   StopThreadTimer();
377   worker_status_->RemoveSelf();
378 }
379
380
381 // Thread work loop. Execute until marked finished.
382 bool WorkerThread::Work() {
383   do {
384     logprintf(9, "Log: ...\n");
385     // Sleep for 1 second.
386     sat_sleep(1);
387   } while (IsReadyToRun());
388
389   return false;
390 }
391
392
393 // Returns CPU mask of CPUs available to this process,
394 // Conceptually, each bit represents a logical CPU, ie:
395 //   mask = 3  (11b):   cpu0, 1
396 //   mask = 13 (1101b): cpu0, 2, 3
397 bool WorkerThread::AvailableCpus(cpu_set_t *cpuset) {
398   CPU_ZERO(cpuset);
399   return sched_getaffinity(getppid(), sizeof(*cpuset), cpuset) == 0;
400 }
401
402
403 // Returns CPU mask of CPUs this thread is bound to,
404 // Conceptually, each bit represents a logical CPU, ie:
405 //   mask = 3  (11b):   cpu0, 1
406 //   mask = 13 (1101b): cpu0, 2, 3
407 bool WorkerThread::CurrentCpus(cpu_set_t *cpuset) {
408   CPU_ZERO(cpuset);
409   return sched_getaffinity(0, sizeof(*cpuset), cpuset) == 0;
410 }
411
412
413 // Bind worker thread to specified CPU(s)
414 //   Args:
415 //     thread_mask: cpu_set_t representing CPUs, ie
416 //                  mask = 1  (01b):   cpu0
417 //                  mask = 3  (11b):   cpu0, 1
418 //                  mask = 13 (1101b): cpu0, 2, 3
419 //
420 //   Returns true on success, false otherwise.
421 bool WorkerThread::BindToCpus(const cpu_set_t *thread_mask) {
422   cpu_set_t process_mask;
423   AvailableCpus(&process_mask);
424   if (cpuset_isequal(thread_mask, &process_mask))
425     return true;
426
427   logprintf(11, "Log: available CPU mask - %s\n",
428             cpuset_format(&process_mask).c_str());
429   if (!cpuset_issubset(thread_mask, &process_mask)) {
430     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
431     logprintf(0, "Log: requested CPUs %s not a subset of available %s\n",
432               cpuset_format(thread_mask).c_str(),
433               cpuset_format(&process_mask).c_str());
434     return false;
435   }
436   return (sched_setaffinity(gettid(), sizeof(*thread_mask), thread_mask) == 0);
437 }
438
439
440 // A worker thread can yield itself to give up CPU until it's scheduled again.
441 //   Returns true on success, false on error.
442 bool WorkerThread::YieldSelf() {
443   return (sched_yield() == 0);
444 }
445
446
447 // Fill this page with its pattern.
448 bool WorkerThread::FillPage(struct page_entry *pe) {
449   // Error check arguments.
450   if (pe == 0) {
451     logprintf(0, "Process Error: Fill Page entry null\n");
452     return 0;
453   }
454
455   // Mask is the bitmask of indexes used by the pattern.
456   // It is the pattern size -1. Size is always a power of 2.
457   uint64 *memwords = static_cast<uint64*>(pe->addr);
458   int length = sat_->page_length();
459
460   if (tag_mode_) {
461     // Select tag or data as appropriate.
462     for (int i = 0; i < length / wordsize_; i++) {
463       datacast_t data;
464
465       if ((i & 0x7) == 0) {
466         data.l64 = addr_to_tag(&memwords[i]);
467       } else {
468         data.l32.l = pe->pattern->pattern(i << 1);
469         data.l32.h = pe->pattern->pattern((i << 1) + 1);
470       }
471       memwords[i] = data.l64;
472     }
473   } else {
474     // Just fill in untagged data directly.
475     for (int i = 0; i < length / wordsize_; i++) {
476       datacast_t data;
477
478       data.l32.l = pe->pattern->pattern(i << 1);
479       data.l32.h = pe->pattern->pattern((i << 1) + 1);
480       memwords[i] = data.l64;
481     }
482   }
483
484   return 1;
485 }
486
487
488 // Tell the thread how many pages to fill.
489 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
490   num_pages_to_fill_ = num_pages_to_fill_init;
491 }
492
493 // Fill this page with a random pattern.
494 bool FillThread::FillPageRandom(struct page_entry *pe) {
495   // Error check arguments.
496   if (pe == 0) {
497     logprintf(0, "Process Error: Fill Page entry null\n");
498     return 0;
499   }
500   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
501     logprintf(0, "Process Error: No data patterns available\n");
502     return 0;
503   }
504
505   // Choose a random pattern for this block.
506   pe->pattern = patternlist_->GetRandomPattern();
507   if (pe->pattern == 0) {
508     logprintf(0, "Process Error: Null data pattern\n");
509     return 0;
510   }
511
512   // Actually fill the page.
513   return FillPage(pe);
514 }
515
516
517 // Memory fill work loop. Execute until alloted pages filled.
518 bool FillThread::Work() {
519   bool result = true;
520
521   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
522
523   // We want to fill num_pages_to_fill pages, and
524   // stop when we've filled that many.
525   // We also want to capture early break
526   struct page_entry pe;
527   int64 loops = 0;
528   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
529     result = result && sat_->GetEmpty(&pe);
530     if (!result) {
531       logprintf(0, "Process Error: fill_thread failed to pop pages, "
532                 "bailing\n");
533       break;
534     }
535
536     // Fill the page with pattern
537     result = result && FillPageRandom(&pe);
538     if (!result) break;
539
540     // Put the page back on the queue.
541     result = result && sat_->PutValid(&pe);
542     if (!result) {
543       logprintf(0, "Process Error: fill_thread failed to push pages, "
544                 "bailing\n");
545       break;
546     }
547     loops++;
548   }
549
550   // Fill in thread status.
551   pages_copied_ = loops;
552   status_ = result;
553   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
554             thread_num_, status_, pages_copied_);
555   return result;
556 }
557
558
559 // Print error information about a data miscompare.
560 void WorkerThread::ProcessError(struct ErrorRecord *error,
561                                 int priority,
562                                 const char *message) {
563   char dimm_string[256] = "";
564
565   int apic_id = apicid();
566
567   // Determine if this is a write or read error.
568   os_->Flush(error->vaddr);
569   error->reread = *(error->vaddr);
570
571   char *good = reinterpret_cast<char*>(&(error->expected));
572   char *bad = reinterpret_cast<char*>(&(error->actual));
573
574   sat_assert(error->expected != error->actual);
575   unsigned int offset = 0;
576   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
577     if (good[offset] != bad[offset])
578       break;
579   }
580
581   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
582
583   // Find physical address if possible.
584   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
585
586   // Pretty print DIMM mapping if available.
587   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
588
589   // Report parseable error.
590   if (priority < 5) {
591     // Run miscompare error through diagnoser for logging and reporting.
592     os_->error_diagnoser_->AddMiscompareError(dimm_string,
593                                               reinterpret_cast<uint64>
594                                               (error->vaddr), 1);
595
596     logprintf(priority,
597               "%s: miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
598               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
599               message,
600               apic_id,
601               CurrentCpusFormat().c_str(),
602               error->vaddr,
603               error->paddr,
604               dimm_string,
605               error->actual,
606               error->reread,
607               error->expected);
608   }
609
610
611   // Overwrite incorrect data with correct data to prevent
612   // future miscompares when this data is reused.
613   *(error->vaddr) = error->expected;
614   os_->Flush(error->vaddr);
615 }
616
617
618
619 // Print error information about a data miscompare.
620 void FileThread::ProcessError(struct ErrorRecord *error,
621                               int priority,
622                               const char *message) {
623   char dimm_string[256] = "";
624
625   // Determine if this is a write or read error.
626   os_->Flush(error->vaddr);
627   error->reread = *(error->vaddr);
628
629   char *good = reinterpret_cast<char*>(&(error->expected));
630   char *bad = reinterpret_cast<char*>(&(error->actual));
631
632   sat_assert(error->expected != error->actual);
633   unsigned int offset = 0;
634   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
635     if (good[offset] != bad[offset])
636       break;
637   }
638
639   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
640
641   // Find physical address if possible.
642   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
643
644   // Pretty print DIMM mapping if available.
645   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
646
647   // If crc_page_ is valid, ie checking content read back from file,
648   // track src/dst memory addresses. Otherwise catagorize as general
649   // mememory miscompare for CRC checking everywhere else.
650   if (crc_page_ != -1) {
651     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
652                                 static_cast<char*>(page_recs_[crc_page_].dst);
653     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
654                                                  crc_page_,
655                                                  miscompare_byteoffset,
656                                                  page_recs_[crc_page_].src,
657                                                  page_recs_[crc_page_].dst);
658   } else {
659     os_->error_diagnoser_->AddMiscompareError(dimm_string,
660                                               reinterpret_cast<uint64>
661                                               (error->vaddr), 1);
662   }
663
664   logprintf(priority,
665             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
666             "reread:0x%016llx expected:0x%016llx\n",
667             message,
668             devicename_.c_str(),
669             error->vaddr,
670             error->paddr,
671             dimm_string,
672             error->actual,
673             error->reread,
674             error->expected);
675
676   // Overwrite incorrect data with correct data to prevent
677   // future miscompares when this data is reused.
678   *(error->vaddr) = error->expected;
679   os_->Flush(error->vaddr);
680 }
681
682
683 // Do a word by word result check of a region.
684 // Print errors on mismatches.
685 int WorkerThread::CheckRegion(void *addr,
686                               class Pattern *pattern,
687                               int64 length,
688                               int offset,
689                               int64 pattern_offset) {
690   uint64 *memblock = static_cast<uint64*>(addr);
691   const int kErrorLimit = 128;
692   int errors = 0;
693   int overflowerrors = 0;  // Count of overflowed errors.
694   bool page_error = false;
695   string errormessage("Hardware Error");
696   struct ErrorRecord
697     recorded[kErrorLimit];  // Queued errors for later printing.
698
699   // For each word in the data region.
700   for (int i = 0; i < length / wordsize_; i++) {
701     uint64 actual = memblock[i];
702     uint64 expected;
703
704     // Determine the value that should be there.
705     datacast_t data;
706     int index = 2 * i + pattern_offset;
707     data.l32.l = pattern->pattern(index);
708     data.l32.h = pattern->pattern(index + 1);
709     expected = data.l64;
710     // Check tags if necessary.
711     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
712       expected = addr_to_tag(&memblock[i]);
713     }
714
715
716     // If the value is incorrect, save an error record for later printing.
717     if (actual != expected) {
718       if (errors < kErrorLimit) {
719         recorded[errors].actual = actual;
720         recorded[errors].expected = expected;
721         recorded[errors].vaddr = &memblock[i];
722         errors++;
723       } else {
724         page_error = true;
725         // If we have overflowed the error queue, just print the errors now.
726         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
727         errormessage = "Page Error";
728         break;
729       }
730     }
731   }
732
733   // Find if this is a whole block corruption.
734   if (page_error && !tag_mode_) {
735     int patsize = patternlist_->Size();
736     for (int pat = 0; pat < patsize; pat++) {
737       class Pattern *altpattern = patternlist_->GetPattern(pat);
738       const int kGood = 0;
739       const int kBad = 1;
740       const int kGoodAgain = 2;
741       const int kNoMatch = 3;
742       int state = kGood;
743       unsigned int badstart = 0;
744       unsigned int badend = 0;
745
746       // Don't match against ourself!
747       if (pattern == altpattern)
748         continue;
749
750       for (int i = 0; i < length / wordsize_; i++) {
751         uint64 actual = memblock[i];
752         datacast_t expected;
753         datacast_t possible;
754
755         // Determine the value that should be there.
756         int index = 2 * i + pattern_offset;
757
758         expected.l32.l = pattern->pattern(index);
759         expected.l32.h = pattern->pattern(index + 1);
760
761         possible.l32.l = pattern->pattern(index);
762         possible.l32.h = pattern->pattern(index + 1);
763
764         if (state == kGood) {
765           if (actual == expected.l64) {
766             continue;
767           } else if (actual == possible.l64) {
768             badstart = i;
769             badend = i;
770             state = kBad;
771             continue;
772           } else {
773             state = kNoMatch;
774             break;
775           }
776         } else if (state == kBad) {
777           if (actual == possible.l64) {
778             badend = i;
779             continue;
780           } else if (actual == expected.l64) {
781             state = kGoodAgain;
782             continue;
783           } else {
784             state = kNoMatch;
785             break;
786           }
787         } else if (state == kGoodAgain) {
788           if (actual == expected.l64) {
789             continue;
790           } else {
791             state = kNoMatch;
792             break;
793           }
794         }
795       }
796
797       if ((state == kGoodAgain) || (state == kBad)) {
798         unsigned int blockerrors = badend - badstart + 1;
799         errormessage = "Block Error";
800         ProcessError(&recorded[0], 0, errormessage.c_str());
801         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
802                   "%d bytes from offset 0x%x to 0x%x\n",
803                   &memblock[badstart],
804                   altpattern->name(), pattern->name(),
805                   blockerrors * wordsize_,
806                   offset + badstart * wordsize_,
807                   offset + badend * wordsize_);
808         errorcount_ += blockerrors;
809         return blockerrors;
810       }
811     }
812   }
813
814
815   // Process error queue after all errors have been recorded.
816   for (int err = 0; err < errors; err++) {
817     int priority = 5;
818     if (errorcount_ + err < 30)
819       priority = 0;  // Bump up the priority for the first few errors.
820     ProcessError(&recorded[err], priority, errormessage.c_str());
821   }
822
823   if (page_error) {
824     // For each word in the data region.
825     int error_recount = 0;
826     for (int i = 0; i < length / wordsize_; i++) {
827       uint64 actual = memblock[i];
828       uint64 expected;
829       datacast_t data;
830       // Determine the value that should be there.
831       int index = 2 * i + pattern_offset;
832
833       data.l32.l = pattern->pattern(index);
834       data.l32.h = pattern->pattern(index + 1);
835       expected = data.l64;
836
837       // Check tags if necessary.
838       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
839         expected = addr_to_tag(&memblock[i]);
840       }
841
842       // If the value is incorrect, save an error record for later printing.
843       if (actual != expected) {
844         if (error_recount < kErrorLimit) {
845           // We already reported these.
846           error_recount++;
847         } else {
848           // If we have overflowed the error queue, print the errors now.
849           struct ErrorRecord er;
850           er.actual = actual;
851           er.expected = expected;
852           er.vaddr = &memblock[i];
853
854           // Do the error printout. This will take a long time and
855           // likely change the machine state.
856           ProcessError(&er, 12, errormessage.c_str());
857           overflowerrors++;
858         }
859       }
860     }
861   }
862
863   // Keep track of observed errors.
864   errorcount_ += errors + overflowerrors;
865   return errors + overflowerrors;
866 }
867
868 float WorkerThread::GetCopiedData() {
869   return pages_copied_ * sat_->page_length() / kMegabyte;
870 }
871
872 // Calculate the CRC of a region.
873 // Result check if the CRC mismatches.
874 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
875   const int blocksize = 4096;
876   const int blockwords = blocksize / wordsize_;
877   int errors = 0;
878
879   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
880   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
881   int blocks = sat_->page_length() / blocksize;
882   for (int currentblock = 0; currentblock < blocks; currentblock++) {
883     uint64 *memslice = memblock + currentblock * blockwords;
884
885     AdlerChecksum crc;
886     if (tag_mode_) {
887       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
888     } else {
889       CalculateAdlerChecksum(memslice, blocksize, &crc);
890     }
891
892     // If the CRC does not match, we'd better look closer.
893     if (!crc.Equals(*expectedcrc)) {
894       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
895                 "CRC mismatch %s != %s\n",
896                 crc.ToHexString().c_str(),
897                 expectedcrc->ToHexString().c_str());
898       int errorcount = CheckRegion(memslice,
899                                    srcpe->pattern,
900                                    blocksize,
901                                    currentblock * blocksize, 0);
902       if (errorcount == 0) {
903         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
904                      "but no miscompares found.\n",
905                   crc.ToHexString().c_str(),
906                   expectedcrc->ToHexString().c_str());
907       }
908       errors += errorcount;
909     }
910   }
911
912   // For odd length transfers, we should never hit this.
913   int leftovers = sat_->page_length() % blocksize;
914   if (leftovers) {
915     uint64 *memslice = memblock + blocks * blockwords;
916     errors += CheckRegion(memslice,
917                           srcpe->pattern,
918                           leftovers,
919                           blocks * blocksize, 0);
920   }
921   return errors;
922 }
923
924
925 // Print error information about a data miscompare.
926 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
927                                    int priority,
928                                    const char *message) {
929   char dimm_string[256] = "";
930   char tag_dimm_string[256] = "";
931   bool read_error = false;
932
933   int apic_id = apicid();
934
935   // Determine if this is a write or read error.
936   os_->Flush(error->vaddr);
937   error->reread = *(error->vaddr);
938
939   // Distinguish read and write errors.
940   if (error->actual != error->reread) {
941     read_error = true;
942   }
943
944   sat_assert(error->expected != error->actual);
945
946   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
947
948   // Find physical address if possible.
949   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
950   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
951
952   // Pretty print DIMM mapping if available.
953   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
954   // Pretty print DIMM mapping if available.
955   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
956
957   // Report parseable error.
958   if (priority < 5) {
959     logprintf(priority,
960               "%s: Tag from %p(0x%llx:%s) (%s) "
961               "miscompare on CPU %d(0x%s) at %p(0x%llx:%s): "
962               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
963               message,
964               error->tagvaddr, error->tagpaddr,
965               tag_dimm_string,
966               read_error ? "read error" : "write error",
967               apic_id,
968               CurrentCpusFormat().c_str(),
969               error->vaddr,
970               error->paddr,
971               dimm_string,
972               error->actual,
973               error->reread,
974               error->expected);
975   }
976
977   errorcount_ += 1;
978
979   // Overwrite incorrect data with correct data to prevent
980   // future miscompares when this data is reused.
981   *(error->vaddr) = error->expected;
982   os_->Flush(error->vaddr);
983 }
984
985
986 // Print out and log a tag error.
987 bool WorkerThread::ReportTagError(
988     uint64 *mem64,
989     uint64 actual,
990     uint64 tag) {
991   struct ErrorRecord er;
992   er.actual = actual;
993
994   er.expected = tag;
995   er.vaddr = mem64;
996
997   // Generate vaddr from tag.
998   er.tagvaddr = reinterpret_cast<uint64*>(actual);
999
1000   ProcessTagError(&er, 0, "Hardware Error");
1001   return true;
1002 }
1003
1004 // C implementation of Adler memory copy, with memory tagging.
1005 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1006                                     uint64 *srcmem64,
1007                                     unsigned int size_in_bytes,
1008                                     AdlerChecksum *checksum,
1009                                     struct page_entry *pe) {
1010   // Use this data wrapper to access memory with 64bit read/write.
1011   datacast_t data;
1012   datacast_t dstdata;
1013   unsigned int count = size_in_bytes / sizeof(data);
1014
1015   if (count > ((1U) << 19)) {
1016     // Size is too large, must be strictly less than 512 KB.
1017     return false;
1018   }
1019
1020   uint64 a1 = 1;
1021   uint64 a2 = 1;
1022   uint64 b1 = 0;
1023   uint64 b2 = 0;
1024
1025   class Pattern *pattern = pe->pattern;
1026
1027   unsigned int i = 0;
1028   while (i < count) {
1029     // Process 64 bits at a time.
1030     if ((i & 0x7) == 0) {
1031       data.l64 = srcmem64[i];
1032       dstdata.l64 = dstmem64[i];
1033       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1034       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1035       // Detect if tags have been corrupted.
1036       if (data.l64 != src_tag)
1037         ReportTagError(&srcmem64[i], data.l64, src_tag);
1038       if (dstdata.l64 != dst_tag)
1039         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1040
1041       data.l32.l = pattern->pattern(i << 1);
1042       data.l32.h = pattern->pattern((i << 1) + 1);
1043       a1 = a1 + data.l32.l;
1044       b1 = b1 + a1;
1045       a1 = a1 + data.l32.h;
1046       b1 = b1 + a1;
1047
1048       data.l64  = dst_tag;
1049       dstmem64[i] = data.l64;
1050
1051     } else {
1052       data.l64 = srcmem64[i];
1053       a1 = a1 + data.l32.l;
1054       b1 = b1 + a1;
1055       a1 = a1 + data.l32.h;
1056       b1 = b1 + a1;
1057       dstmem64[i] = data.l64;
1058     }
1059     i++;
1060
1061     data.l64 = srcmem64[i];
1062     a2 = a2 + data.l32.l;
1063     b2 = b2 + a2;
1064     a2 = a2 + data.l32.h;
1065     b2 = b2 + a2;
1066     dstmem64[i] = data.l64;
1067     i++;
1068   }
1069   checksum->Set(a1, a2, b1, b2);
1070   return true;
1071 }
1072
1073 // x86_64 SSE2 assembly implementation of Adler memory copy, with address
1074 // tagging added as a second step. This is useful for debugging failures
1075 // that only occur when SSE / nontemporal writes are used.
1076 bool WorkerThread::AdlerAddrMemcpyWarm(uint64 *dstmem64,
1077                                        uint64 *srcmem64,
1078                                        unsigned int size_in_bytes,
1079                                        AdlerChecksum *checksum,
1080                                        struct page_entry *pe) {
1081   // Do ASM copy, ignore checksum.
1082   AdlerChecksum ignored_checksum;
1083   os_->AdlerMemcpyWarm(dstmem64, srcmem64, size_in_bytes, &ignored_checksum);
1084
1085   // Force cache flush.
1086   int length = size_in_bytes / sizeof(*dstmem64);
1087   for (int i = 0; i < length; i += sizeof(*dstmem64)) {
1088     os_->FastFlush(dstmem64 + i);
1089     os_->FastFlush(srcmem64 + i);
1090   }
1091   // Check results.
1092   AdlerAddrCrcC(srcmem64, size_in_bytes, checksum, pe);
1093   // Patch up address tags.
1094   TagAddrC(dstmem64, size_in_bytes);
1095   return true;
1096 }
1097
1098 // Retag pages..
1099 bool WorkerThread::TagAddrC(uint64 *memwords,
1100                             unsigned int size_in_bytes) {
1101   // Mask is the bitmask of indexes used by the pattern.
1102   // It is the pattern size -1. Size is always a power of 2.
1103
1104   // Select tag or data as appropriate.
1105   int length = size_in_bytes / wordsize_;
1106   for (int i = 0; i < length; i += 8) {
1107     datacast_t data;
1108     data.l64 = addr_to_tag(&memwords[i]);
1109     memwords[i] = data.l64;
1110   }
1111   return true;
1112 }
1113
1114 // C implementation of Adler memory crc.
1115 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1116                                  unsigned int size_in_bytes,
1117                                  AdlerChecksum *checksum,
1118                                  struct page_entry *pe) {
1119   // Use this data wrapper to access memory with 64bit read/write.
1120   datacast_t data;
1121   unsigned int count = size_in_bytes / sizeof(data);
1122
1123   if (count > ((1U) << 19)) {
1124     // Size is too large, must be strictly less than 512 KB.
1125     return false;
1126   }
1127
1128   uint64 a1 = 1;
1129   uint64 a2 = 1;
1130   uint64 b1 = 0;
1131   uint64 b2 = 0;
1132
1133   class Pattern *pattern = pe->pattern;
1134
1135   unsigned int i = 0;
1136   while (i < count) {
1137     // Process 64 bits at a time.
1138     if ((i & 0x7) == 0) {
1139       data.l64 = srcmem64[i];
1140       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1141       // Check that tags match expected.
1142       if (data.l64 != src_tag)
1143         ReportTagError(&srcmem64[i], data.l64, src_tag);
1144
1145       data.l32.l = pattern->pattern(i << 1);
1146       data.l32.h = pattern->pattern((i << 1) + 1);
1147       a1 = a1 + data.l32.l;
1148       b1 = b1 + a1;
1149       a1 = a1 + data.l32.h;
1150       b1 = b1 + a1;
1151     } else {
1152       data.l64 = srcmem64[i];
1153       a1 = a1 + data.l32.l;
1154       b1 = b1 + a1;
1155       a1 = a1 + data.l32.h;
1156       b1 = b1 + a1;
1157     }
1158     i++;
1159
1160     data.l64 = srcmem64[i];
1161     a2 = a2 + data.l32.l;
1162     b2 = b2 + a2;
1163     a2 = a2 + data.l32.h;
1164     b2 = b2 + a2;
1165     i++;
1166   }
1167   checksum->Set(a1, a2, b1, b2);
1168   return true;
1169 }
1170
1171 // Copy a block of memory quickly, while keeping a CRC of the data.
1172 // Result check if the CRC mismatches.
1173 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1174                               struct page_entry *srcpe) {
1175   int errors = 0;
1176   const int blocksize = 4096;
1177   const int blockwords = blocksize / wordsize_;
1178   int blocks = sat_->page_length() / blocksize;
1179
1180   // Base addresses for memory copy
1181   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1182   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1183   // Remember the expected CRC
1184   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1185
1186   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1187     uint64 *targetmem = targetmembase + currentblock * blockwords;
1188     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1189
1190     AdlerChecksum crc;
1191     if (tag_mode_) {
1192       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1193     } else {
1194       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1195     }
1196
1197     // Investigate miscompares.
1198     if (!crc.Equals(*expectedcrc)) {
1199       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1200                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1201                 expectedcrc->ToHexString().c_str());
1202       int errorcount = CheckRegion(sourcemem,
1203                                    srcpe->pattern,
1204                                    blocksize,
1205                                    currentblock * blocksize, 0);
1206       if (errorcount == 0) {
1207         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1208                      "but no miscompares found. Retrying with fresh data.\n",
1209                   crc.ToHexString().c_str(),
1210                   expectedcrc->ToHexString().c_str());
1211         if (!tag_mode_) {
1212           // Copy the data originally read from this region back again.
1213           // This data should have any corruption read originally while
1214           // calculating the CRC.
1215           memcpy(sourcemem, targetmem, blocksize);
1216           errorcount = CheckRegion(sourcemem,
1217                                    srcpe->pattern,
1218                                    blocksize,
1219                                    currentblock * blocksize, 0);
1220           if (errorcount == 0) {
1221             int apic_id = apicid();
1222             logprintf(0, "Process Error: CPU %d(0x%s) CrcCopyPage "
1223                          "CRC mismatch %s != %s, "
1224                          "but no miscompares found on second pass.\n",
1225                       apic_id, CurrentCpusFormat().c_str(),
1226                       crc.ToHexString().c_str(),
1227                       expectedcrc->ToHexString().c_str());
1228             struct ErrorRecord er;
1229             er.actual = sourcemem[0];
1230             er.expected = 0x0;
1231             er.vaddr = sourcemem;
1232             ProcessError(&er, 0, "Hardware Error");
1233           }
1234         }
1235       }
1236       errors += errorcount;
1237     }
1238   }
1239
1240   // For odd length transfers, we should never hit this.
1241   int leftovers = sat_->page_length() % blocksize;
1242   if (leftovers) {
1243     uint64 *targetmem = targetmembase + blocks * blockwords;
1244     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1245
1246     errors += CheckRegion(sourcemem,
1247                           srcpe->pattern,
1248                           leftovers,
1249                           blocks * blocksize, 0);
1250     int leftoverwords = leftovers / wordsize_;
1251     for (int i = 0; i < leftoverwords; i++) {
1252       targetmem[i] = sourcemem[i];
1253     }
1254   }
1255
1256   // Update pattern reference to reflect new contents.
1257   dstpe->pattern = srcpe->pattern;
1258
1259   // Clean clean clean the errors away.
1260   if (errors) {
1261     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1262     // cause bad data to be propogated across the page.
1263     FillPage(dstpe);
1264   }
1265   return errors;
1266 }
1267
1268
1269
1270 // Invert a block of memory quickly, traversing downwards.
1271 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1272   const int blocksize = 4096;
1273   const int blockwords = blocksize / wordsize_;
1274   int blocks = sat_->page_length() / blocksize;
1275
1276   // Base addresses for memory copy
1277   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1278
1279   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1280     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1281     for (int i = blockwords - 32; i >= 0; i -= 32) {
1282       for (int index = i + 31; index >= i; --index) {
1283         unsigned int actual = sourcemem[index];
1284         sourcemem[index] = ~actual;
1285       }
1286       OsLayer::FastFlush(&sourcemem[i]);
1287     }
1288   }
1289
1290   return 0;
1291 }
1292
1293 // Invert a block of memory, traversing upwards.
1294 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1295   const int blocksize = 4096;
1296   const int blockwords = blocksize / wordsize_;
1297   int blocks = sat_->page_length() / blocksize;
1298
1299   // Base addresses for memory copy
1300   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1301
1302   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1303     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1304     for (int i = 0; i < blockwords; i += 32) {
1305       for (int index = i; index <= i + 31; ++index) {
1306         unsigned int actual = sourcemem[index];
1307         sourcemem[index] = ~actual;
1308       }
1309       OsLayer::FastFlush(&sourcemem[i]);
1310     }
1311   }
1312   return 0;
1313 }
1314
1315 // Copy a block of memory quickly, while keeping a CRC of the data.
1316 // Result check if the CRC mismatches. Warm the CPU while running
1317 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1318                                   struct page_entry *srcpe) {
1319   int errors = 0;
1320   const int blocksize = 4096;
1321   const int blockwords = blocksize / wordsize_;
1322   int blocks = sat_->page_length() / blocksize;
1323
1324   // Base addresses for memory copy
1325   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1326   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1327   // Remember the expected CRC
1328   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1329
1330   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1331     uint64 *targetmem = targetmembase + currentblock * blockwords;
1332     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1333
1334     AdlerChecksum crc;
1335     if (tag_mode_) {
1336       AdlerAddrMemcpyWarm(targetmem, sourcemem, blocksize, &crc, srcpe);
1337     } else {
1338       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1339     }
1340
1341     // Investigate miscompares.
1342     if (!crc.Equals(*expectedcrc)) {
1343       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1344                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1345                 expectedcrc->ToHexString().c_str());
1346       int errorcount = CheckRegion(sourcemem,
1347                                    srcpe->pattern,
1348                                    blocksize,
1349                                    currentblock * blocksize, 0);
1350       if (errorcount == 0) {
1351         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1352                      "but no miscompares found. Retrying with fresh data.\n",
1353                   crc.ToHexString().c_str(),
1354                   expectedcrc->ToHexString().c_str());
1355         if (!tag_mode_) {
1356           // Copy the data originally read from this region back again.
1357           // This data should have any corruption read originally while
1358           // calculating the CRC.
1359           memcpy(sourcemem, targetmem, blocksize);
1360           errorcount = CheckRegion(sourcemem,
1361                                    srcpe->pattern,
1362                                    blocksize,
1363                                    currentblock * blocksize, 0);
1364           if (errorcount == 0) {
1365             int apic_id = apicid();
1366             logprintf(0, "Process Error: CPU %d(0x%s) CrciWarmCopyPage "
1367                          "CRC mismatch %s != %s, "
1368                          "but no miscompares found on second pass.\n",
1369                       apic_id, CurrentCpusFormat().c_str(),
1370                       crc.ToHexString().c_str(),
1371                       expectedcrc->ToHexString().c_str());
1372             struct ErrorRecord er;
1373             er.actual = sourcemem[0];
1374             er.expected = 0x0;
1375             er.vaddr = sourcemem;
1376             ProcessError(&er, 0, "Hardware Error");
1377           }
1378         }
1379       }
1380       errors += errorcount;
1381     }
1382   }
1383
1384   // For odd length transfers, we should never hit this.
1385   int leftovers = sat_->page_length() % blocksize;
1386   if (leftovers) {
1387     uint64 *targetmem = targetmembase + blocks * blockwords;
1388     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1389
1390     errors += CheckRegion(sourcemem,
1391                           srcpe->pattern,
1392                           leftovers,
1393                           blocks * blocksize, 0);
1394     int leftoverwords = leftovers / wordsize_;
1395     for (int i = 0; i < leftoverwords; i++) {
1396       targetmem[i] = sourcemem[i];
1397     }
1398   }
1399
1400   // Update pattern reference to reflect new contents.
1401   dstpe->pattern = srcpe->pattern;
1402
1403   // Clean clean clean the errors away.
1404   if (errors) {
1405     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1406     // cause bad data to be propogated across the page.
1407     FillPage(dstpe);
1408   }
1409   return errors;
1410 }
1411
1412
1413
1414 // Memory check work loop. Execute until done, then exhaust pages.
1415 bool CheckThread::Work() {
1416   struct page_entry pe;
1417   bool result = true;
1418   int64 loops = 0;
1419
1420   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1421
1422   // We want to check all the pages, and
1423   // stop when there aren't any left.
1424   while (true) {
1425     result = result && sat_->GetValid(&pe);
1426     if (!result) {
1427       if (IsReadyToRunNoPause())
1428         logprintf(0, "Process Error: check_thread failed to pop pages, "
1429                   "bailing\n");
1430       else
1431         result = true;
1432       break;
1433     }
1434
1435     // Do the result check.
1436     CrcCheckPage(&pe);
1437
1438     // Push pages back on the valid queue if we are still going,
1439     // throw them out otherwise.
1440     if (IsReadyToRunNoPause())
1441       result = result && sat_->PutValid(&pe);
1442     else
1443       result = result && sat_->PutEmpty(&pe);
1444     if (!result) {
1445       logprintf(0, "Process Error: check_thread failed to push pages, "
1446                 "bailing\n");
1447       break;
1448     }
1449     loops++;
1450   }
1451
1452   pages_copied_ = loops;
1453   status_ = result;
1454   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1455             thread_num_, status_, pages_copied_);
1456   return result;
1457 }
1458
1459
1460 // Memory copy work loop. Execute until marked done.
1461 bool CopyThread::Work() {
1462   struct page_entry src;
1463   struct page_entry dst;
1464   bool result = true;
1465   int64 loops = 0;
1466
1467   logprintf(9, "Log: Starting copy thread %d: cpu %s, mem %x\n",
1468             thread_num_, cpuset_format(&cpu_mask_).c_str(), tag_);
1469
1470   while (IsReadyToRun()) {
1471     // Pop the needed pages.
1472     result = result && sat_->GetValid(&src, tag_);
1473     result = result && sat_->GetEmpty(&dst, tag_);
1474     if (!result) {
1475       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1476                 "bailing\n");
1477       break;
1478     }
1479
1480     // Force errors for unittests.
1481     if (sat_->error_injection()) {
1482       if (loops == 8) {
1483         char *addr = reinterpret_cast<char*>(src.addr);
1484         int offset = random() % sat_->page_length();
1485         addr[offset] = 0xba;
1486       }
1487     }
1488
1489     // We can use memcpy, or CRC check while we copy.
1490     if (sat_->warm()) {
1491       CrcWarmCopyPage(&dst, &src);
1492     } else if (sat_->strict()) {
1493       CrcCopyPage(&dst, &src);
1494     } else {
1495       memcpy(dst.addr, src.addr, sat_->page_length());
1496       dst.pattern = src.pattern;
1497     }
1498
1499     result = result && sat_->PutValid(&dst);
1500     result = result && sat_->PutEmpty(&src);
1501
1502     // Copy worker-threads yield themselves at the end of each copy loop,
1503     // to avoid threads from preempting each other in the middle of the inner
1504     // copy-loop. Cooperations between Copy worker-threads results in less
1505     // unnecessary cache thrashing (which happens when context-switching in the
1506     // middle of the inner copy-loop).
1507     YieldSelf();
1508
1509     if (!result) {
1510       logprintf(0, "Process Error: copy_thread failed to push pages, "
1511                 "bailing\n");
1512       break;
1513     }
1514     loops++;
1515   }
1516
1517   pages_copied_ = loops;
1518   status_ = result;
1519   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1520             thread_num_, status_, pages_copied_);
1521   return result;
1522 }
1523
1524 // Memory invert work loop. Execute until marked done.
1525 bool InvertThread::Work() {
1526   struct page_entry src;
1527   bool result = true;
1528   int64 loops = 0;
1529
1530   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1531
1532   while (IsReadyToRun()) {
1533     // Pop the needed pages.
1534     result = result && sat_->GetValid(&src);
1535     if (!result) {
1536       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1537                 "bailing\n");
1538       break;
1539     }
1540
1541     if (sat_->strict())
1542       CrcCheckPage(&src);
1543
1544     // For the same reason CopyThread yields itself (see YieldSelf comment
1545     // in CopyThread::Work(), InvertThread yields itself after each invert
1546     // operation to improve cooperation between different worker threads
1547     // stressing the memory/cache.
1548     InvertPageUp(&src);
1549     YieldSelf();
1550     InvertPageDown(&src);
1551     YieldSelf();
1552     InvertPageDown(&src);
1553     YieldSelf();
1554     InvertPageUp(&src);
1555     YieldSelf();
1556
1557     if (sat_->strict())
1558       CrcCheckPage(&src);
1559
1560     result = result && sat_->PutValid(&src);
1561     if (!result) {
1562       logprintf(0, "Process Error: invert_thread failed to push pages, "
1563                 "bailing\n");
1564       break;
1565     }
1566     loops++;
1567   }
1568
1569   pages_copied_ = loops * 2;
1570   status_ = result;
1571   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1572             thread_num_, status_, pages_copied_);
1573   return result;
1574 }
1575
1576
1577 // Set file name to use for File IO.
1578 void FileThread::SetFile(const char *filename_init) {
1579   filename_ = filename_init;
1580   devicename_ = os_->FindFileDevice(filename_);
1581 }
1582
1583 // Open the file for access.
1584 bool FileThread::OpenFile(int *pfile) {
1585   int fd = open(filename_.c_str(),
1586                 O_RDWR | O_CREAT | O_SYNC | O_DIRECT,
1587                 0644);
1588   if (fd < 0) {
1589     logprintf(0, "Process Error: Failed to create file %s!!\n",
1590               filename_.c_str());
1591     pages_copied_ = 0;
1592     return false;
1593   }
1594   *pfile = fd;
1595   return true;
1596 }
1597
1598 // Close the file.
1599 bool FileThread::CloseFile(int fd) {
1600   close(fd);
1601   return true;
1602 }
1603
1604 // Check sector tagging.
1605 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1606   int page_length = sat_->page_length();
1607   struct FileThread::SectorTag *tag =
1608     (struct FileThread::SectorTag *)(src->addr);
1609
1610   // Tag each sector.
1611   unsigned char magic = ((0xba + thread_num_) & 0xff);
1612   for (int sec = 0; sec < page_length / 512; sec++) {
1613     tag[sec].magic = magic;
1614     tag[sec].block = block & 0xff;
1615     tag[sec].sector = sec & 0xff;
1616     tag[sec].pass = pass_ & 0xff;
1617   }
1618   return true;
1619 }
1620
1621 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1622   int page_length = sat_->page_length();
1623   // Fill the file with our data.
1624   int64 size = write(fd, src->addr, page_length);
1625
1626   if (size != page_length) {
1627     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1628     errorcount_++;
1629     logprintf(0, "Block Error: file_thread failed to write, "
1630               "bailing\n");
1631     return false;
1632   }
1633   return true;
1634 }
1635
1636 // Write the data to the file.
1637 bool FileThread::WritePages(int fd) {
1638   int strict = sat_->strict();
1639
1640   // Start fresh at beginning of file for each batch of pages.
1641   lseek64(fd, 0, SEEK_SET);
1642   for (int i = 0; i < sat_->disk_pages(); i++) {
1643     struct page_entry src;
1644     if (!GetValidPage(&src))
1645       return false;
1646     // Save expected pattern.
1647     page_recs_[i].pattern = src.pattern;
1648     page_recs_[i].src = src.addr;
1649
1650     // Check data correctness.
1651     if (strict)
1652       CrcCheckPage(&src);
1653
1654     SectorTagPage(&src, i);
1655
1656     bool result = WritePageToFile(fd, &src);
1657
1658     if (!PutEmptyPage(&src))
1659       return false;
1660
1661     if (!result)
1662       return false;
1663   }
1664   return true;
1665 }
1666
1667 // Copy data from file into memory block.
1668 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1669   int page_length = sat_->page_length();
1670
1671   // Do the actual read.
1672   int64 size = read(fd, dst->addr, page_length);
1673   if (size != page_length) {
1674     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1675     logprintf(0, "Block Error: file_thread failed to read, "
1676               "bailing\n");
1677     errorcount_++;
1678     return false;
1679   }
1680   return true;
1681 }
1682
1683 // Check sector tagging.
1684 bool FileThread::SectorValidatePage(const struct PageRec &page,
1685                                     struct page_entry *dst, int block) {
1686   // Error injection.
1687   static int calls = 0;
1688   calls++;
1689
1690   // Do sector tag compare.
1691   int firstsector = -1;
1692   int lastsector = -1;
1693   bool badsector = false;
1694   int page_length = sat_->page_length();
1695
1696   // Cast data block into an array of tagged sectors.
1697   struct FileThread::SectorTag *tag =
1698   (struct FileThread::SectorTag *)(dst->addr);
1699
1700   sat_assert(sizeof(*tag) == 512);
1701
1702   // Error injection.
1703   if (sat_->error_injection()) {
1704     if (calls == 2) {
1705       for (int badsec = 8; badsec < 17; badsec++)
1706         tag[badsec].pass = 27;
1707     }
1708     if (calls == 18) {
1709       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1710     }
1711   }
1712
1713   // Check each sector for the correct tag we added earlier,
1714   // then revert the tag to the to normal data pattern.
1715   unsigned char magic = ((0xba + thread_num_) & 0xff);
1716   for (int sec = 0; sec < page_length / 512; sec++) {
1717     // Check magic tag.
1718     if ((tag[sec].magic != magic) ||
1719         (tag[sec].block != (block & 0xff)) ||
1720         (tag[sec].sector != (sec & 0xff)) ||
1721         (tag[sec].pass != (pass_ & 0xff))) {
1722       // Offset calculation for tag location.
1723       int offset = sec * sizeof(SectorTag);
1724       if (tag[sec].block != (block & 0xff))
1725         offset += 1 * sizeof(uint8);
1726       else if (tag[sec].sector != (sec & 0xff))
1727         offset += 2 * sizeof(uint8);
1728       else if (tag[sec].pass != (pass_ & 0xff))
1729         offset += 3 * sizeof(uint8);
1730
1731       // Run sector tag error through diagnoser for logging and reporting.
1732       errorcount_ += 1;
1733       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1734                                                   offset,
1735                                                   tag[sec].sector,
1736                                                   page.src, page.dst);
1737
1738       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1739                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1740                 block * page_length + 512 * sec,
1741                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1742                 sec, (unsigned int)tag[sec].sector,
1743                 block, (unsigned int)tag[sec].block,
1744                 magic, (unsigned int)tag[sec].magic,
1745                 filename_.c_str());
1746
1747       // Keep track of first and last bad sector.
1748       if (firstsector == -1)
1749         firstsector = (block * page_length / 512) + sec;
1750       lastsector = (block * page_length / 512) + sec;
1751       badsector = true;
1752     }
1753     // Patch tag back to proper pattern.
1754     unsigned int *addr = (unsigned int *)(&tag[sec]);
1755     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1756   }
1757
1758   // If we found sector errors:
1759   if (badsector == true) {
1760     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1761               firstsector * 512,
1762               ((lastsector + 1) * 512) - 1,
1763               filename_.c_str());
1764
1765     // Either exit immediately, or patch the data up and continue.
1766     if (sat_->stop_on_error()) {
1767       exit(1);
1768     } else {
1769       // Patch up bad pages.
1770       for (int block = (firstsector * 512) / page_length;
1771           block <= (lastsector * 512) / page_length;
1772           block++) {
1773         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1774         int length = page_length / wordsize_;
1775         for (int i = 0; i < length; i++) {
1776           memblock[i] = dst->pattern->pattern(i);
1777         }
1778       }
1779     }
1780   }
1781   return true;
1782 }
1783
1784 // Get memory for an incoming data transfer..
1785 bool FileThread::PagePrepare() {
1786   // We can only do direct IO to SAT pages if it is normal mem.
1787   page_io_ = os_->normal_mem();
1788
1789   // Init a local buffer if we need it.
1790   if (!page_io_) {
1791     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1792     if (result) {
1793       logprintf(0, "Process Error: disk thread posix_memalign "
1794                    "returned %d (fail)\n",
1795                 result);
1796       status_ = false;
1797       return false;
1798     }
1799   }
1800   return true;
1801 }
1802
1803
1804 // Remove memory allocated for data transfer.
1805 bool FileThread::PageTeardown() {
1806   // Free a local buffer if we need to.
1807   if (!page_io_) {
1808     free(local_page_);
1809   }
1810   return true;
1811 }
1812
1813
1814
1815 // Get memory for an incoming data transfer..
1816 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1817   if (page_io_) {
1818     if (!sat_->GetEmpty(dst))
1819       return false;
1820   } else {
1821     dst->addr = local_page_;
1822     dst->offset = 0;
1823     dst->pattern = 0;
1824   }
1825   return true;
1826 }
1827
1828 // Get memory for an outgoing data transfer..
1829 bool FileThread::GetValidPage(struct page_entry *src) {
1830   struct page_entry tmp;
1831   if (!sat_->GetValid(&tmp))
1832     return false;
1833   if (page_io_) {
1834     *src = tmp;
1835     return true;
1836   } else {
1837     src->addr = local_page_;
1838     src->offset = 0;
1839     CrcCopyPage(src, &tmp);
1840     if (!sat_->PutValid(&tmp))
1841       return false;
1842   }
1843   return true;
1844 }
1845
1846
1847 // Throw out a used empty page.
1848 bool FileThread::PutEmptyPage(struct page_entry *src) {
1849   if (page_io_) {
1850     if (!sat_->PutEmpty(src))
1851       return false;
1852   }
1853   return true;
1854 }
1855
1856 // Throw out a used, filled page.
1857 bool FileThread::PutValidPage(struct page_entry *src) {
1858   if (page_io_) {
1859     if (!sat_->PutValid(src))
1860       return false;
1861   }
1862   return true;
1863 }
1864
1865 // Copy data from file into memory blocks.
1866 bool FileThread::ReadPages(int fd) {
1867   int page_length = sat_->page_length();
1868   int strict = sat_->strict();
1869   bool result = true;
1870
1871   // Read our data back out of the file, into it's new location.
1872   lseek64(fd, 0, SEEK_SET);
1873   for (int i = 0; i < sat_->disk_pages(); i++) {
1874     struct page_entry dst;
1875     if (!GetEmptyPage(&dst))
1876       return false;
1877     // Retrieve expected pattern.
1878     dst.pattern = page_recs_[i].pattern;
1879     // Update page recordpage record.
1880     page_recs_[i].dst = dst.addr;
1881
1882     // Read from the file into destination page.
1883     if (!ReadPageFromFile(fd, &dst)) {
1884         PutEmptyPage(&dst);
1885         return false;
1886     }
1887
1888     SectorValidatePage(page_recs_[i], &dst, i);
1889
1890     // Ensure that the transfer ended up with correct data.
1891     if (strict) {
1892       // Record page index currently CRC checked.
1893       crc_page_ = i;
1894       int errors = CrcCheckPage(&dst);
1895       if (errors) {
1896         logprintf(5, "Log: file miscompare at block %d, "
1897                   "offset %x-%x. File: %s\n",
1898                   i, i * page_length, ((i + 1) * page_length) - 1,
1899                   filename_.c_str());
1900         result = false;
1901       }
1902       crc_page_ = -1;
1903       errorcount_ += errors;
1904     }
1905     if (!PutValidPage(&dst))
1906       return false;
1907   }
1908   return result;
1909 }
1910
1911 // File IO work loop. Execute until marked done.
1912 bool FileThread::Work() {
1913   bool result = true;
1914   int64 loops = 0;
1915
1916   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1917             thread_num_,
1918             filename_.c_str(),
1919             devicename_.c_str());
1920
1921   if (!PagePrepare()) {
1922     status_ = false;
1923     return false;
1924   }
1925
1926   // Open the data IO file.
1927   int fd = 0;
1928   if (!OpenFile(&fd)) {
1929     status_ = false;
1930     return false;
1931   }
1932
1933   pass_ = 0;
1934
1935   // Load patterns into page records.
1936   page_recs_ = new struct PageRec[sat_->disk_pages()];
1937   for (int i = 0; i < sat_->disk_pages(); i++) {
1938     page_recs_[i].pattern = new struct Pattern();
1939   }
1940
1941   // Loop until done.
1942   while (IsReadyToRun()) {
1943     // Do the file write.
1944     if (!(result = result && WritePages(fd)))
1945       break;
1946
1947     // Do the file read.
1948     if (!(result = result && ReadPages(fd)))
1949       break;
1950
1951     loops++;
1952     pass_ = loops;
1953   }
1954
1955   pages_copied_ = loops * sat_->disk_pages();
1956   status_ = result;
1957
1958   // Clean up.
1959   CloseFile(fd);
1960   PageTeardown();
1961
1962   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1963             thread_num_, status_, pages_copied_);
1964   return result;
1965 }
1966
1967 bool NetworkThread::IsNetworkStopSet() {
1968   return !IsReadyToRunNoPause();
1969 }
1970
1971 bool NetworkSlaveThread::IsNetworkStopSet() {
1972   // This thread has no completion status.
1973   // It finishes whever there is no more data to be
1974   // passed back.
1975   return true;
1976 }
1977
1978 // Set ip name to use for Network IO.
1979 void NetworkThread::SetIP(const char *ipaddr_init) {
1980   strncpy(ipaddr_, ipaddr_init, 256);
1981 }
1982
1983 // Create a socket.
1984 // Return 0 on error.
1985 bool NetworkThread::CreateSocket(int *psocket) {
1986   int sock = socket(AF_INET, SOCK_STREAM, 0);
1987   if (sock == -1) {
1988     logprintf(0, "Process Error: Cannot open socket\n");
1989     pages_copied_ = 0;
1990     status_ = false;
1991     return false;
1992   }
1993   *psocket = sock;
1994   return true;
1995 }
1996
1997 // Close the socket.
1998 bool NetworkThread::CloseSocket(int sock) {
1999   close(sock);
2000   return true;
2001 }
2002
2003 // Initiate the tcp connection.
2004 bool NetworkThread::Connect(int sock) {
2005   struct sockaddr_in dest_addr;
2006   dest_addr.sin_family = AF_INET;
2007   dest_addr.sin_port = htons(kNetworkPort);
2008   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
2009
2010   // Translate dot notation to u32.
2011   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
2012     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
2013     pages_copied_ = 0;
2014     status_ = false;
2015     return false;
2016   }
2017
2018   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
2019                     sizeof(struct sockaddr))) {
2020     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
2021     pages_copied_ = 0;
2022     status_ = false;
2023     return false;
2024   }
2025   return true;
2026 }
2027
2028 // Initiate the tcp connection.
2029 bool NetworkListenThread::Listen() {
2030   struct sockaddr_in sa;
2031
2032   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2033
2034   sa.sin_family = AF_INET;
2035   sa.sin_addr.s_addr = INADDR_ANY;
2036   sa.sin_port = htons(kNetworkPort);
2037
2038   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2039     char buf[256];
2040     sat_strerror(errno, buf, sizeof(buf));
2041     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2042     pages_copied_ = 0;
2043     status_ = false;
2044     return false;
2045   }
2046   listen(sock_, 3);
2047   return true;
2048 }
2049
2050 // Wait for a connection from a network traffic generation thread.
2051 bool NetworkListenThread::Wait() {
2052     fd_set rfds;
2053     struct timeval tv;
2054     int retval;
2055
2056     // Watch sock_ to see when it has input.
2057     FD_ZERO(&rfds);
2058     FD_SET(sock_, &rfds);
2059     // Wait up to five seconds.
2060     tv.tv_sec = 5;
2061     tv.tv_usec = 0;
2062
2063     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2064
2065     return (retval > 0);
2066 }
2067
2068 // Wait for a connection from a network traffic generation thread.
2069 bool NetworkListenThread::GetConnection(int *pnewsock) {
2070   struct sockaddr_in sa;
2071   socklen_t size = sizeof(struct sockaddr_in);
2072
2073   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2074   if (newsock < 0)  {
2075     logprintf(0, "Process Error: Did not receive connection\n");
2076     pages_copied_ = 0;
2077     status_ = false;
2078     return false;
2079   }
2080   *pnewsock = newsock;
2081   return true;
2082 }
2083
2084 // Send a page, return false if a page was not sent.
2085 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2086   int page_length = sat_->page_length();
2087   char *address = static_cast<char*>(src->addr);
2088
2089   // Send our data over the network.
2090   int size = page_length;
2091   while (size) {
2092     int transferred = send(sock, address + (page_length - size), size, 0);
2093     if ((transferred == 0) || (transferred == -1)) {
2094       if (!IsNetworkStopSet()) {
2095         char buf[256] = "";
2096         sat_strerror(errno, buf, sizeof(buf));
2097         logprintf(0, "Process Error: Thread %d, "
2098                      "Network write failed, bailing. (%s)\n",
2099                   thread_num_, buf);
2100         status_ = false;
2101       }
2102       return false;
2103     }
2104     size = size - transferred;
2105   }
2106   return true;
2107 }
2108
2109 // Receive a page. Return false if a page was not received.
2110 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2111   int page_length = sat_->page_length();
2112   char *address = static_cast<char*>(dst->addr);
2113
2114   // Maybe we will get our data back again, maybe not.
2115   int size = page_length;
2116   while (size) {
2117     int transferred = recv(sock, address + (page_length - size), size, 0);
2118     if ((transferred == 0) || (transferred == -1)) {
2119       // Typically network slave thread should exit as network master
2120       // thread stops sending data.
2121       if (IsNetworkStopSet()) {
2122         int err = errno;
2123         if (transferred == 0 && err == 0) {
2124           // Two system setups will not sync exactly,
2125           // allow early exit, but log it.
2126           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2127         } else {
2128           char buf[256] = "";
2129           sat_strerror(err, buf, sizeof(buf));
2130           // Print why we failed.
2131           logprintf(0, "Process Error: Thread %d, "
2132                        "Network read failed, bailing (%s).\n",
2133                     thread_num_, buf);
2134           status_ = false;
2135           // Print arguments and results.
2136           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2137                     sock, address + (page_length - size),
2138                     size, transferred, err);
2139           if ((transferred == 0) &&
2140               (page_length - size < 512) &&
2141               (page_length - size > 0)) {
2142             // Print null terminated data received, to see who's been
2143             // sending us supicious unwanted data.
2144             address[page_length - size] = 0;
2145             logprintf(0, "Log: received  %d bytes: '%s'\n",
2146                       page_length - size, address);
2147           }
2148         }
2149       }
2150       return false;
2151     }
2152     size = size - transferred;
2153   }
2154   return true;
2155 }
2156
2157 // Network IO work loop. Execute until marked done.
2158 // Return true if the thread ran as expected.
2159 bool NetworkThread::Work() {
2160   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2161             thread_num_,
2162             ipaddr_);
2163
2164   // Make a socket.
2165   int sock = 0;
2166   if (!CreateSocket(&sock))
2167     return false;
2168
2169   // Network IO loop requires network slave thread to have already initialized.
2170   // We will sleep here for awhile to ensure that the slave thread will be
2171   // listening by the time we connect.
2172   // Sleep for 15 seconds.
2173   sat_sleep(15);
2174   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2175             thread_num_,
2176             ipaddr_);
2177
2178
2179   // Connect to a slave thread.
2180   if (!Connect(sock))
2181     return false;
2182
2183   // Loop until done.
2184   bool result = true;
2185   int strict = sat_->strict();
2186   int64 loops = 0;
2187   while (IsReadyToRun()) {
2188     struct page_entry src;
2189     struct page_entry dst;
2190     result = result && sat_->GetValid(&src);
2191     result = result && sat_->GetEmpty(&dst);
2192     if (!result) {
2193       logprintf(0, "Process Error: net_thread failed to pop pages, "
2194                 "bailing\n");
2195       break;
2196     }
2197
2198     // Check data correctness.
2199     if (strict)
2200       CrcCheckPage(&src);
2201
2202     // Do the network write.
2203     if (!(result = result && SendPage(sock, &src)))
2204       break;
2205
2206     // Update pattern reference to reflect new contents.
2207     dst.pattern = src.pattern;
2208
2209     // Do the network read.
2210     if (!(result = result && ReceivePage(sock, &dst)))
2211       break;
2212
2213     // Ensure that the transfer ended up with correct data.
2214     if (strict)
2215       CrcCheckPage(&dst);
2216
2217     // Return all of our pages to the queue.
2218     result = result && sat_->PutValid(&dst);
2219     result = result && sat_->PutEmpty(&src);
2220     if (!result) {
2221       logprintf(0, "Process Error: net_thread failed to push pages, "
2222                 "bailing\n");
2223       break;
2224     }
2225     loops++;
2226   }
2227
2228   pages_copied_ = loops;
2229   status_ = result;
2230
2231   // Clean up.
2232   CloseSocket(sock);
2233
2234   logprintf(9, "Log: Completed %d: network thread status %d, "
2235                "%d pages copied\n",
2236             thread_num_, status_, pages_copied_);
2237   return result;
2238 }
2239
2240 // Spawn slave threads for incoming connections.
2241 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2242   logprintf(12, "Log: Listen thread spawning slave\n");
2243
2244   // Spawn slave thread, to reflect network traffic back to sender.
2245   ChildWorker *child_worker = new ChildWorker;
2246   child_worker->thread.SetSock(newsock);
2247   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2248                                   &child_worker->status);
2249   child_worker->status.Initialize();
2250   child_worker->thread.SpawnThread();
2251   child_workers_.push_back(child_worker);
2252
2253   return true;
2254 }
2255
2256 // Reap slave threads.
2257 bool NetworkListenThread::ReapSlaves() {
2258   bool result = true;
2259   // Gather status and reap threads.
2260   logprintf(12, "Log: Joining all outstanding threads\n");
2261
2262   for (int i = 0; i < child_workers_.size(); i++) {
2263     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2264     logprintf(12, "Log: Joining slave thread %d\n", i);
2265     child_thread.JoinThread();
2266     if (child_thread.GetStatus() != 1) {
2267       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2268                 child_thread.GetStatus());
2269       result = false;
2270     }
2271     errorcount_ += child_thread.GetErrorCount();
2272     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2273               child_thread.GetErrorCount());
2274     pages_copied_ += child_thread.GetPageCount();
2275   }
2276
2277   return result;
2278 }
2279
2280 // Network listener IO work loop. Execute until marked done.
2281 // Return false on fatal software error.
2282 bool NetworkListenThread::Work() {
2283   logprintf(9, "Log: Starting network listen thread %d\n",
2284             thread_num_);
2285
2286   // Make a socket.
2287   sock_ = 0;
2288   if (!CreateSocket(&sock_)) {
2289     status_ = false;
2290     return false;
2291   }
2292   logprintf(9, "Log: Listen thread created sock\n");
2293
2294   // Allows incoming connections to be queued up by socket library.
2295   int newsock = 0;
2296   Listen();
2297   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2298
2299   // Wait on incoming connections, and spawn worker threads for them.
2300   int threadcount = 0;
2301   while (IsReadyToRun()) {
2302     // Poll for connections that we can accept().
2303     if (Wait()) {
2304       // Accept those connections.
2305       logprintf(12, "Log: Listen thread found incoming connection\n");
2306       if (GetConnection(&newsock)) {
2307         SpawnSlave(newsock, threadcount);
2308         threadcount++;
2309       }
2310     }
2311   }
2312
2313   // Gather status and join spawned threads.
2314   ReapSlaves();
2315
2316   // Delete the child workers.
2317   for (ChildVector::iterator it = child_workers_.begin();
2318        it != child_workers_.end(); ++it) {
2319     (*it)->status.Destroy();
2320     delete *it;
2321   }
2322   child_workers_.clear();
2323
2324   CloseSocket(sock_);
2325
2326   status_ = true;
2327   logprintf(9,
2328             "Log: Completed %d: network listen thread status %d, "
2329             "%d pages copied\n",
2330             thread_num_, status_, pages_copied_);
2331   return true;
2332 }
2333
2334 // Set network reflector socket struct.
2335 void NetworkSlaveThread::SetSock(int sock) {
2336   sock_ = sock;
2337 }
2338
2339 // Network reflector IO work loop. Execute until marked done.
2340 // Return false on fatal software error.
2341 bool NetworkSlaveThread::Work() {
2342   logprintf(9, "Log: Starting network slave thread %d\n",
2343             thread_num_);
2344
2345   // Verify that we have a socket.
2346   int sock = sock_;
2347   if (!sock) {
2348     status_ = false;
2349     return false;
2350   }
2351
2352   // Loop until done.
2353   int64 loops = 0;
2354   // Init a local buffer for storing data.
2355   void *local_page = NULL;
2356   int result = posix_memalign(&local_page, 512, sat_->page_length());
2357   if (result) {
2358     logprintf(0, "Process Error: net slave posix_memalign "
2359                  "returned %d (fail)\n",
2360               result);
2361     status_ = false;
2362     return false;
2363   }
2364
2365   struct page_entry page;
2366   page.addr = local_page;
2367
2368   // This thread will continue to run as long as the thread on the other end of
2369   // the socket is still sending and receiving data.
2370   while (1) {
2371     // Do the network read.
2372     if (!ReceivePage(sock, &page))
2373       break;
2374
2375     // Do the network write.
2376     if (!SendPage(sock, &page))
2377       break;
2378
2379     loops++;
2380   }
2381
2382   pages_copied_ = loops;
2383   // No results provided from this type of thread.
2384   status_ = true;
2385
2386   // Clean up.
2387   CloseSocket(sock);
2388
2389   logprintf(9,
2390             "Log: Completed %d: network slave thread status %d, "
2391             "%d pages copied\n",
2392             thread_num_, status_, pages_copied_);
2393   return true;
2394 }
2395
2396 // Thread work loop. Execute until marked finished.
2397 bool ErrorPollThread::Work() {
2398   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2399
2400   // This calls a generic error polling function in the Os abstraction layer.
2401   do {
2402     errorcount_ += os_->ErrorPoll();
2403     os_->ErrorWait();
2404   } while (IsReadyToRun());
2405
2406   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2407             thread_num_, errorcount_);
2408   status_ = true;
2409   return true;
2410 }
2411
2412 // Worker thread to heat up CPU.
2413 // This thread does not evaluate pass/fail or software error.
2414 bool CpuStressThread::Work() {
2415   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2416
2417   do {
2418     // Run ludloff's platform/CPU-specific assembly workload.
2419     os_->CpuStressWorkload();
2420     YieldSelf();
2421   } while (IsReadyToRun());
2422
2423   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2424             thread_num_);
2425   status_ = true;
2426   return true;
2427 }
2428
2429 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2430                                                  int cacheline_count,
2431                                                  int thread_num,
2432                                                  int inc_count) {
2433   cc_cacheline_data_ = data;
2434   cc_cacheline_count_ = cacheline_count;
2435   cc_thread_num_ = thread_num;
2436   cc_inc_count_ = inc_count;
2437 }
2438
2439 // Worked thread to test the cache coherency of the CPUs
2440 // Return false on fatal sw error.
2441 bool CpuCacheCoherencyThread::Work() {
2442   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2443             cc_thread_num_);
2444   uint64 time_start, time_end;
2445   struct timeval tv;
2446
2447   unsigned int seed = static_cast<unsigned int>(gettid());
2448   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2449   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2450
2451   uint64 total_inc = 0;  // Total increments done by the thread.
2452   while (IsReadyToRun()) {
2453     for (int i = 0; i < cc_inc_count_; i++) {
2454       // Choose a datastructure in random and increment the appropriate
2455       // member in that according to the offset (which is the same as the
2456       // thread number.
2457       int r = rand_r(&seed);
2458       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2459       // Increment the member of the randomely selected structure.
2460       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2461     }
2462
2463     total_inc += cc_inc_count_;
2464
2465     // Calculate if the local counter matches with the global value
2466     // in all the cache line structures for this particular thread.
2467     int cc_global_num = 0;
2468     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2469       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2470       // Reset the cachline member's value for the next run.
2471       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2472     }
2473     if (sat_->error_injection())
2474       cc_global_num = -1;
2475
2476     if (cc_global_num != cc_inc_count_) {
2477       errorcount_++;
2478       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2479                 cc_global_num, cc_inc_count_);
2480     }
2481   }
2482   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2483   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2484
2485   uint64 us_elapsed = time_end - time_start;
2486   // inc_rate is the no. of increments per second.
2487   double inc_rate = total_inc * 1e6 / us_elapsed;
2488
2489   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2490             " Increments=%llu, Increments/sec = %.6lf\n",
2491             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2492   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2493             cc_thread_num_);
2494   status_ = true;
2495   return true;
2496 }
2497
2498 DiskThread::DiskThread(DiskBlockTable *block_table) {
2499   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2500   write_block_size_ = kSectorSize;  // this assumes read and write block size
2501                                     // are the same
2502   segment_size_ = -1;               // use the entire disk as one segment
2503   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2504   // Use a queue such that 3/2 times as much data as the cache can hold
2505   // is written before it is read so that there is little chance the read
2506   // data is in the cache.
2507   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2508   blocks_per_segment_ = 32;
2509
2510   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2511   write_threshold_ = 100000;        // reading/writing a sector
2512
2513   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2514   write_timeout_ = 5000000;         // timout for reading/writing
2515
2516   device_sectors_ = 0;
2517   non_destructive_ = 0;
2518
2519   aio_ctx_ = 0;
2520   block_table_ = block_table;
2521   update_block_table_ = 1;
2522
2523   block_buffer_ = NULL;
2524
2525   blocks_written_ = 0;
2526   blocks_read_ = 0;
2527 }
2528
2529 DiskThread::~DiskThread() {
2530   if (block_buffer_)
2531     free(block_buffer_);
2532 }
2533
2534 // Set filename for device file (in /dev).
2535 void DiskThread::SetDevice(const char *device_name) {
2536   device_name_ = device_name;
2537 }
2538
2539 // Set various parameters that control the behaviour of the test.
2540 // -1 is used as a sentinel value on each parameter (except non_destructive)
2541 // to indicate that the parameter not be set.
2542 bool DiskThread::SetParameters(int read_block_size,
2543                                int write_block_size,
2544                                int64 segment_size,
2545                                int64 cache_size,
2546                                int blocks_per_segment,
2547                                int64 read_threshold,
2548                                int64 write_threshold,
2549                                int non_destructive) {
2550   if (read_block_size != -1) {
2551     // Blocks must be aligned to the disk's sector size.
2552     if (read_block_size % kSectorSize != 0) {
2553       logprintf(0, "Process Error: Block size must be a multiple of %d "
2554                 "(thread %d).\n", kSectorSize, thread_num_);
2555       return false;
2556     }
2557
2558     read_block_size_ = read_block_size;
2559   }
2560
2561   if (write_block_size != -1) {
2562     // Write blocks must be aligned to the disk's sector size and to the
2563     // block size.
2564     if (write_block_size % kSectorSize != 0) {
2565       logprintf(0, "Process Error: Write block size must be a multiple "
2566                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2567       return false;
2568     }
2569     if (write_block_size % read_block_size_ != 0) {
2570       logprintf(0, "Process Error: Write block size must be a multiple "
2571                 "of the read block size, which is %d (thread %d).\n",
2572                 read_block_size_, thread_num_);
2573       return false;
2574     }
2575
2576     write_block_size_ = write_block_size;
2577
2578   } else {
2579     // Make sure write_block_size_ is still valid.
2580     if (read_block_size_ > write_block_size_) {
2581       logprintf(5, "Log: Assuming write block size equal to read block size, "
2582                 "which is %d (thread %d).\n", read_block_size_,
2583                 thread_num_);
2584       write_block_size_ = read_block_size_;
2585     } else {
2586       if (write_block_size_ % read_block_size_ != 0) {
2587         logprintf(0, "Process Error: Write block size (defined as %d) must "
2588                   "be a multiple of the read block size, which is %d "
2589                   "(thread %d).\n", write_block_size_, read_block_size_,
2590                   thread_num_);
2591         return false;
2592       }
2593     }
2594   }
2595
2596   if (cache_size != -1) {
2597     cache_size_ = cache_size;
2598   }
2599
2600   if (blocks_per_segment != -1) {
2601     if (blocks_per_segment <= 0) {
2602       logprintf(0, "Process Error: Blocks per segment must be greater than "
2603                    "zero.\n (thread %d)", thread_num_);
2604       return false;
2605     }
2606
2607     blocks_per_segment_ = blocks_per_segment;
2608   }
2609
2610   if (read_threshold != -1) {
2611     if (read_threshold <= 0) {
2612       logprintf(0, "Process Error: Read threshold must be greater than "
2613                    "zero (thread %d).\n", thread_num_);
2614       return false;
2615     }
2616
2617     read_threshold_ = read_threshold;
2618   }
2619
2620   if (write_threshold != -1) {
2621     if (write_threshold <= 0) {
2622       logprintf(0, "Process Error: Write threshold must be greater than "
2623                    "zero (thread %d).\n", thread_num_);
2624       return false;
2625     }
2626
2627     write_threshold_ = write_threshold;
2628   }
2629
2630   if (segment_size != -1) {
2631     // Segments must be aligned to the disk's sector size.
2632     if (segment_size % kSectorSize != 0) {
2633       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2634                 " (thread %d).\n", kSectorSize, thread_num_);
2635       return false;
2636     }
2637
2638     segment_size_ = segment_size / kSectorSize;
2639   }
2640
2641   non_destructive_ = non_destructive;
2642
2643   // Having a queue of 150% of blocks that will fit in the disk's cache
2644   // should be enough to force out the oldest block before it is read and hence,
2645   // making sure the data comes form the disk and not the cache.
2646   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2647   // Updating DiskBlockTable parameters
2648   if (update_block_table_) {
2649     block_table_->SetParameters(kSectorSize, write_block_size_,
2650                                 device_sectors_, segment_size_,
2651                                 device_name_);
2652   }
2653   return true;
2654 }
2655
2656 // Open a device, return false on failure.
2657 bool DiskThread::OpenDevice(int *pfile) {
2658   int fd = open(device_name_.c_str(),
2659                 O_RDWR | O_SYNC | O_DIRECT | O_LARGEFILE,
2660                 0);
2661   if (fd < 0) {
2662     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2663               device_name_.c_str(), thread_num_);
2664     return false;
2665   }
2666   *pfile = fd;
2667
2668   return GetDiskSize(fd);
2669 }
2670
2671 // Retrieves the size (in bytes) of the disk/file.
2672 // Return false on failure.
2673 bool DiskThread::GetDiskSize(int fd) {
2674   struct stat device_stat;
2675   if (fstat(fd, &device_stat) == -1) {
2676     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2677               device_name_.c_str(), thread_num_);
2678     return false;
2679   }
2680
2681   // For a block device, an ioctl is needed to get the size since the size
2682   // of the device file (i.e. /dev/sdb) is 0.
2683   if (S_ISBLK(device_stat.st_mode)) {
2684     uint64 block_size = 0;
2685
2686     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2687       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2688                 device_name_.c_str(), thread_num_);
2689       return false;
2690     }
2691
2692     // If an Elephant is initialized with status DEAD its size will be zero.
2693     if (block_size == 0) {
2694       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2695       ++errorcount_;
2696       status_ = true;  // Avoid a procedural error.
2697       return false;
2698     }
2699
2700     device_sectors_ = block_size / kSectorSize;
2701
2702   } else if (S_ISREG(device_stat.st_mode)) {
2703     device_sectors_ = device_stat.st_size / kSectorSize;
2704
2705   } else {
2706     logprintf(0, "Process Error: %s is not a regular file or block "
2707               "device (thread %d).\n", device_name_.c_str(),
2708               thread_num_);
2709     return false;
2710   }
2711
2712   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2713             device_sectors_, device_name_.c_str(), thread_num_);
2714
2715   if (update_block_table_) {
2716     block_table_->SetParameters(kSectorSize, write_block_size_,
2717                                 device_sectors_, segment_size_,
2718                                 device_name_);
2719   }
2720
2721   return true;
2722 }
2723
2724 bool DiskThread::CloseDevice(int fd) {
2725   close(fd);
2726   return true;
2727 }
2728
2729 // Return the time in microseconds.
2730 int64 DiskThread::GetTime() {
2731   struct timeval tv;
2732   gettimeofday(&tv, NULL);
2733   return tv.tv_sec * 1000000 + tv.tv_usec;
2734 }
2735
2736 // Do randomized reads and (possibly) writes on a device.
2737 // Return false on fatal error, either SW or HW.
2738 bool DiskThread::DoWork(int fd) {
2739   int64 block_num = 0;
2740   int64 num_segments;
2741   bool result = true;
2742
2743   if (segment_size_ == -1) {
2744     num_segments = 1;
2745   } else {
2746     num_segments = device_sectors_ / segment_size_;
2747     if (device_sectors_ % segment_size_ != 0)
2748       num_segments++;
2749   }
2750
2751   // Disk size should be at least 3x cache size.  See comment later for
2752   // details.
2753   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2754
2755   // This disk test works by writing blocks with a certain pattern to
2756   // disk, then reading them back and verifying it against the pattern
2757   // at a later time.  A failure happens when either the block cannot
2758   // be written/read or when the read block is different than what was
2759   // written.  If a block takes too long to write/read, then a warning
2760   // is given instead of an error since taking too long is not
2761   // necessarily an error.
2762   //
2763   // To prevent the read blocks from coming from the disk cache,
2764   // enough blocks are written before read such that a block would
2765   // be ejected from the disk cache by the time it is read.
2766   //
2767   // TODO(amistry): Implement some sort of read/write throttling.  The
2768   //                flood of asynchronous I/O requests when a drive is
2769   //                unplugged is causing the application and kernel to
2770   //                become unresponsive.
2771
2772   while (IsReadyToRun()) {
2773     // Write blocks to disk.
2774     logprintf(16, "Log: Write phase %sfor disk %s (thread %d).\n",
2775               non_destructive_ ? "(disabled) " : "",
2776               device_name_.c_str(), thread_num_);
2777     while (IsReadyToRunNoPause() &&
2778            in_flight_sectors_.size() < queue_size_ + 1) {
2779       // Confine testing to a particular segment of the disk.
2780       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2781       if (!non_destructive_ &&
2782           (block_num % blocks_per_segment_ == 0)) {
2783         logprintf(20, "Log: Starting to write segment %lld out of "
2784                   "%lld on disk %s (thread %d).\n",
2785                   segment, num_segments, device_name_.c_str(),
2786                   thread_num_);
2787       }
2788       block_num++;
2789
2790       BlockData *block = block_table_->GetUnusedBlock(segment);
2791
2792       // If an unused sequence of sectors could not be found, skip to the
2793       // next block to process.  Soon, a new segment will come and new
2794       // sectors will be able to be allocated.  This effectively puts a
2795       // minumim on the disk size at 3x the stated cache size, or 48MiB
2796       // if a cache size is not given (since the cache is set as 16MiB
2797       // by default).  Given that todays caches are at the low MiB range
2798       // and drive sizes at the mid GB, this shouldn't pose a problem.
2799       // The 3x minimum comes from the following:
2800       //   1. In order to allocate 'y' blocks from a segment, the
2801       //      segment must contain at least 2y blocks or else an
2802       //      allocation may not succeed.
2803       //   2. Assume the entire disk is one segment.
2804       //   3. A full write phase consists of writing blocks corresponding to
2805       //      3/2 cache size.
2806       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2807       //      size worth of blocks = 3 * cache size worth of blocks
2808       //      to complete.
2809       // In non-destructive mode, don't write anything to disk.
2810       if (!non_destructive_) {
2811         if (!WriteBlockToDisk(fd, block)) {
2812           block_table_->RemoveBlock(block);
2813           return false;
2814         }
2815         blocks_written_++;
2816       }
2817
2818       // Block is either initialized by writing, or in nondestructive case,
2819       // initialized by being added into the datastructure for later reading.
2820       block->SetBlockAsInitialized();
2821
2822       in_flight_sectors_.push(block);
2823     }
2824
2825     // Verify blocks on disk.
2826     logprintf(20, "Log: Read phase for disk %s (thread %d).\n",
2827               device_name_.c_str(), thread_num_);
2828     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2829       BlockData *block = in_flight_sectors_.front();
2830       in_flight_sectors_.pop();
2831       if (!ValidateBlockOnDisk(fd, block))
2832         return false;
2833       block_table_->RemoveBlock(block);
2834       blocks_read_++;
2835     }
2836   }
2837
2838   pages_copied_ = blocks_written_ + blocks_read_;
2839   return result;
2840 }
2841
2842 // Do an asynchronous disk I/O operation.
2843 // Return false if the IO is not set up.
2844 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2845                             int64 offset, int64 timeout) {
2846   // Use the Linux native asynchronous I/O interface for reading/writing.
2847   // A read/write consists of three basic steps:
2848   //    1. create an io context.
2849   //    2. prepare and submit an io request to the context
2850   //    3. wait for an event on the context.
2851
2852   struct {
2853     const int opcode;
2854     const char *op_str;
2855     const char *error_str;
2856   } operations[2] = {
2857     { IO_CMD_PREAD, "read", "disk-read-error" },
2858     { IO_CMD_PWRITE, "write", "disk-write-error" }
2859   };
2860
2861   struct iocb cb;
2862   memset(&cb, 0, sizeof(cb));
2863
2864   cb.aio_fildes = fd;
2865   cb.aio_lio_opcode = operations[op].opcode;
2866   cb.u.c.buf = buf;
2867   cb.u.c.nbytes = size;
2868   cb.u.c.offset = offset;
2869
2870   struct iocb *cbs[] = { &cb };
2871   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2872     int error = errno;
2873     char buf[256];
2874     sat_strerror(error, buf, sizeof(buf));
2875     logprintf(0, "Process Error: Unable to submit async %s "
2876                  "on disk %s (thread %d). Error %d, %s\n",
2877               operations[op].op_str, device_name_.c_str(),
2878               thread_num_, error, buf);
2879     return false;
2880   }
2881
2882   struct io_event event;
2883   memset(&event, 0, sizeof(event));
2884   struct timespec tv;
2885   tv.tv_sec = timeout / 1000000;
2886   tv.tv_nsec = (timeout % 1000000) * 1000;
2887   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2888     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2889     // EINTR error code.  This is not an error and so don't treat it as such,
2890     // but still log it.
2891     int error = errno;
2892     if (error == EINTR) {
2893       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2894                 operations[op].op_str, device_name_.c_str(),
2895                 thread_num_);
2896     } else {
2897       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2898       errorcount_ += 1;
2899       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2900                    "starting at %lld on disk %s (thread %d).\n",
2901                 operations[op].op_str, offset / kSectorSize,
2902                 device_name_.c_str(), thread_num_);
2903     }
2904
2905     // Don't bother checking return codes since io_cancel seems to always fail.
2906     // Since io_cancel is always failing, destroying and recreating an I/O
2907     // context is a workaround for canceling an in-progress I/O operation.
2908     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2909     io_cancel(aio_ctx_, &cb, &event);
2910     io_destroy(aio_ctx_);
2911     aio_ctx_ = 0;
2912     if (io_setup(5, &aio_ctx_)) {
2913       int error = errno;
2914       char buf[256];
2915       sat_strerror(error, buf, sizeof(buf));
2916       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2917                 " (thread %d) Error %d, %s\n",
2918                 device_name_.c_str(), thread_num_, error, buf);
2919     }
2920
2921     return false;
2922   }
2923
2924   // event.res contains the number of bytes written/read or
2925   // error if < 0, I think.
2926   if (event.res != size) {
2927     errorcount_++;
2928     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2929
2930     if (event.res < 0) {
2931       switch (event.res) {
2932         case -EIO:
2933           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2934                        "sectors starting at %lld on disk %s (thread %d).\n",
2935                     operations[op].op_str, offset / kSectorSize,
2936                     device_name_.c_str(), thread_num_);
2937           break;
2938         default:
2939           logprintf(0, "Hardware Error: Unknown error while doing %s to "
2940                        "sectors starting at %lld on disk %s (thread %d).\n",
2941                     operations[op].op_str, offset / kSectorSize,
2942                     device_name_.c_str(), thread_num_);
2943       }
2944     } else {
2945       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2946                    "%lld on disk %s (thread %d).\n",
2947                 operations[op].op_str, offset / kSectorSize,
2948                 device_name_.c_str(), thread_num_);
2949     }
2950     return false;
2951   }
2952
2953   return true;
2954 }
2955
2956 // Write a block to disk.
2957 // Return false if the block is not written.
2958 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
2959   memset(block_buffer_, 0, block->GetSize());
2960
2961   // Fill block buffer with a pattern
2962   struct page_entry pe;
2963   if (!sat_->GetValid(&pe)) {
2964     // Even though a valid page could not be obatined, it is not an error
2965     // since we can always fill in a pattern directly, albeit slower.
2966     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
2967     block->SetPattern(patternlist_->GetRandomPattern());
2968
2969     logprintf(11, "Log: Warning, using pattern fill fallback in "
2970                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
2971               device_name_.c_str(), thread_num_);
2972
2973     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
2974       memblock[i] = block->GetPattern()->pattern(i);
2975     }
2976   } else {
2977     memcpy(block_buffer_, pe.addr, block->GetSize());
2978     block->SetPattern(pe.pattern);
2979     sat_->PutValid(&pe);
2980   }
2981
2982   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
2983             " (thread %d).\n",
2984             block->GetSize()/kSectorSize, block->GetAddress(),
2985             device_name_.c_str(), thread_num_);
2986
2987   int64 start_time = GetTime();
2988
2989   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
2990                    block->GetAddress() * kSectorSize, write_timeout_)) {
2991     return false;
2992   }
2993
2994   int64 end_time = GetTime();
2995   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
2996             end_time - start_time, thread_num_);
2997   if (end_time - start_time > write_threshold_) {
2998     logprintf(5, "Log: Write took %lld us which is longer than threshold "
2999                  "%lld us on disk %s (thread %d).\n",
3000               end_time - start_time, write_threshold_, device_name_.c_str(),
3001               thread_num_);
3002   }
3003
3004   return true;
3005 }
3006
3007 // Verify a block on disk.
3008 // Return true if the block was read, also increment errorcount
3009 // if the block had data errors or performance problems.
3010 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
3011   int64 blocks = block->GetSize() / read_block_size_;
3012   int64 bytes_read = 0;
3013   int64 current_blocks;
3014   int64 current_bytes;
3015   uint64 address = block->GetAddress();
3016
3017   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
3018             "(thread %d).\n",
3019             address, device_name_.c_str(), thread_num_);
3020
3021   // Read block from disk and time the read.  If it takes longer than the
3022   // threshold, complain.
3023   if (lseek64(fd, address * kSectorSize, SEEK_SET) == -1) {
3024     logprintf(0, "Process Error: Unable to seek to sector %lld in "
3025               "DiskThread::ValidateSectorsOnDisk on disk %s "
3026               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
3027     return false;
3028   }
3029   int64 start_time = GetTime();
3030
3031   // Split a large write-sized block into small read-sized blocks and
3032   // read them in groups of randomly-sized multiples of read block size.
3033   // This assures all data written on disk by this particular block
3034   // will be tested using a random reading pattern.
3035   while (blocks != 0) {
3036     // Test all read blocks in a written block.
3037     current_blocks = (random() % blocks) + 1;
3038     current_bytes = current_blocks * read_block_size_;
3039
3040     memset(block_buffer_, 0, current_bytes);
3041
3042     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
3043               "disk %s (thread %d)\n",
3044               current_bytes / kSectorSize,
3045               (address * kSectorSize + bytes_read) / kSectorSize,
3046               device_name_.c_str(), thread_num_);
3047
3048     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
3049                      address * kSectorSize + bytes_read,
3050                      write_timeout_)) {
3051       return false;
3052     }
3053
3054     int64 end_time = GetTime();
3055     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3056               end_time - start_time, thread_num_);
3057     if (end_time - start_time > read_threshold_) {
3058       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3059                 "%lld us on disk %s (thread %d).\n",
3060                 end_time - start_time, read_threshold_,
3061                 device_name_.c_str(), thread_num_);
3062     }
3063
3064     // In non-destructive mode, don't compare the block to the pattern since
3065     // the block was never written to disk in the first place.
3066     if (!non_destructive_) {
3067       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3068                       0, bytes_read)) {
3069         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3070         errorcount_ += 1;
3071         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3072                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3073                   "disk %s (thread %d).\n",
3074                   address, device_name_.c_str(), thread_num_);
3075       }
3076     }
3077
3078     bytes_read += current_blocks * read_block_size_;
3079     blocks -= current_blocks;
3080   }
3081
3082   return true;
3083 }
3084
3085 // Direct device access thread.
3086 // Return false on software error.
3087 bool DiskThread::Work() {
3088   int fd;
3089
3090   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3091             thread_num_, device_name_.c_str());
3092
3093   srandom(time(NULL));
3094
3095   if (!OpenDevice(&fd)) {
3096     status_ = false;
3097     return false;
3098   }
3099
3100   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3101   // when using direst IO.
3102   int memalign_result = posix_memalign(&block_buffer_, kBufferAlignment,
3103                               sat_->page_length());
3104   if (memalign_result) {
3105     CloseDevice(fd);
3106     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3107                  "for disk %s (thread %d) posix memalign returned %d.\n",
3108               device_name_.c_str(), thread_num_, memalign_result);
3109     status_ = false;
3110     return false;
3111   }
3112
3113   if (io_setup(5, &aio_ctx_)) {
3114     CloseDevice(fd);
3115     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3116               " (thread %d).\n",
3117               device_name_.c_str(), thread_num_);
3118     status_ = false;
3119     return false;
3120   }
3121
3122   bool result = DoWork(fd);
3123
3124   status_ = result;
3125
3126   io_destroy(aio_ctx_);
3127   CloseDevice(fd);
3128
3129   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3130                "%d pages copied\n",
3131             thread_num_, device_name_.c_str(), status_, pages_copied_);
3132   return result;
3133 }
3134
3135 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3136     : DiskThread(block_table) {
3137   update_block_table_ = 0;
3138 }
3139
3140 RandomDiskThread::~RandomDiskThread() {
3141 }
3142
3143 // Workload for random disk thread.
3144 bool RandomDiskThread::DoWork(int fd) {
3145   logprintf(11, "Log: Random phase for disk %s (thread %d).\n",
3146             device_name_.c_str(), thread_num_);
3147   while (IsReadyToRun()) {
3148     BlockData *block = block_table_->GetRandomBlock();
3149     if (block == NULL) {
3150       logprintf(12, "Log: No block available for device %s (thread %d).\n",
3151                 device_name_.c_str(), thread_num_);
3152     } else {
3153       ValidateBlockOnDisk(fd, block);
3154       block_table_->ReleaseBlock(block);
3155       blocks_read_++;
3156     }
3157   }
3158   pages_copied_ = blocks_read_;
3159   return true;
3160 }
3161
3162 MemoryRegionThread::MemoryRegionThread() {
3163   error_injection_ = false;
3164   pages_ = NULL;
3165 }
3166
3167 MemoryRegionThread::~MemoryRegionThread() {
3168   if (pages_ != NULL)
3169     delete pages_;
3170 }
3171
3172 // Set a region of memory or MMIO to be tested.
3173 // Return false if region could not be mapped.
3174 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3175   int plength = sat_->page_length();
3176   int npages = size / plength;
3177   if (size % plength) {
3178     logprintf(0, "Process Error: region size is not a multiple of SAT "
3179               "page length\n");
3180     return false;
3181   } else {
3182     if (pages_ != NULL)
3183       delete pages_;
3184     pages_ = new PageEntryQueue(npages);
3185     char *base_addr = reinterpret_cast<char*>(region);
3186     region_ = base_addr;
3187     for (int i = 0; i < npages; i++) {
3188       struct page_entry pe;
3189       init_pe(&pe);
3190       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3191       pe.offset = i * plength;
3192
3193       pages_->Push(&pe);
3194     }
3195     return true;
3196   }
3197 }
3198
3199 // More detailed error printout for hardware errors in memory or MMIO
3200 // regions.
3201 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3202                                       int priority,
3203                                       const char *message) {
3204   uint32 buffer_offset;
3205   if (phase_ == kPhaseCopy) {
3206     // If the error occurred on the Copy Phase, it means that
3207     // the source data (i.e., the main memory) is wrong. so
3208     // just pass it to the original ProcessError to call a
3209     // bad-dimm error
3210     WorkerThread::ProcessError(error, priority, message);
3211   } else if (phase_ == kPhaseCheck) {
3212     // A error on the Check Phase means that the memory region tested
3213     // has an error. Gathering more information and then reporting
3214     // the error.
3215     // Determine if this is a write or read error.
3216     os_->Flush(error->vaddr);
3217     error->reread = *(error->vaddr);
3218     char *good = reinterpret_cast<char*>(&(error->expected));
3219     char *bad = reinterpret_cast<char*>(&(error->actual));
3220     sat_assert(error->expected != error->actual);
3221     unsigned int offset = 0;
3222     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3223       if (good[offset] != bad[offset])
3224         break;
3225     }
3226
3227     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3228
3229     buffer_offset = error->vbyteaddr - region_;
3230
3231     // Find physical address if possible.
3232     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3233     logprintf(priority,
3234               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3235               "offset %llx: read:0x%016llx, reread:0x%016llx "
3236               "expected:0x%016llx\n",
3237               message,
3238               identifier_.c_str(),
3239               error->vaddr,
3240               error->paddr,
3241               buffer_offset,
3242               error->actual,
3243               error->reread,
3244               error->expected);
3245   } else {
3246     logprintf(0, "Process Error: memory region thread raised an "
3247               "unexpected error.");
3248   }
3249 }
3250
3251 // Workload for testion memory or MMIO regions.
3252 // Return false on software error.
3253 bool MemoryRegionThread::Work() {
3254   struct page_entry source_pe;
3255   struct page_entry memregion_pe;
3256   bool result = true;
3257   int64 loops = 0;
3258   const uint64 error_constant = 0x00ba00000000ba00LL;
3259
3260   // For error injection.
3261   int64 *addr = 0x0;
3262   int offset = 0;
3263   int64 data = 0;
3264
3265   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3266
3267   while (IsReadyToRun()) {
3268     // Getting pages from SAT and queue.
3269     phase_ = kPhaseNoPhase;
3270     result = result && sat_->GetValid(&source_pe);
3271     if (!result) {
3272       logprintf(0, "Process Error: memory region thread failed to pop "
3273                 "pages from SAT, bailing\n");
3274       break;
3275     }
3276
3277     result = result && pages_->PopRandom(&memregion_pe);
3278     if (!result) {
3279       logprintf(0, "Process Error: memory region thread failed to pop "
3280                 "pages from queue, bailing\n");
3281       break;
3282     }
3283
3284     // Error injection for CRC copy.
3285     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3286       addr = reinterpret_cast<int64*>(source_pe.addr);
3287       offset = random() % (sat_->page_length() / wordsize_);
3288       data = addr[offset];
3289       addr[offset] = error_constant;
3290     }
3291
3292     // Copying SAT page into memory region.
3293     phase_ = kPhaseCopy;
3294     CrcCopyPage(&memregion_pe, &source_pe);
3295     memregion_pe.pattern = source_pe.pattern;
3296
3297     // Error injection for CRC Check.
3298     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3299       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3300       offset = random() % (sat_->page_length() / wordsize_);
3301       data = addr[offset];
3302       addr[offset] = error_constant;
3303     }
3304
3305     // Checking page content in memory region.
3306     phase_ = kPhaseCheck;
3307     CrcCheckPage(&memregion_pe);
3308
3309     phase_ = kPhaseNoPhase;
3310     // Storing pages on their proper queues.
3311     result = result && sat_->PutValid(&source_pe);
3312     if (!result) {
3313       logprintf(0, "Process Error: memory region thread failed to push "
3314                 "pages into SAT, bailing\n");
3315       break;
3316     }
3317     result = result && pages_->Push(&memregion_pe);
3318     if (!result) {
3319       logprintf(0, "Process Error: memory region thread failed to push "
3320                 "pages into queue, bailing\n");
3321       break;
3322     }
3323
3324     if ((sat_->error_injection() || error_injection_) &&
3325         loops >= 1 && loops <= 2) {
3326       addr[offset] = data;
3327     }
3328
3329     loops++;
3330     YieldSelf();
3331   }
3332
3333   pages_copied_ = loops;
3334   status_ = result;
3335   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3336             "pages checked\n", thread_num_, status_, pages_copied_);
3337   return result;
3338 }