chiark / gitweb /
08b5a4e291c38bb00e503ed865e481af4888260a
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <pthread.h>
20 #include <sched.h>
21 #include <signal.h>
22 #include <stdlib.h>
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <string.h>
26 #include <time.h>
27 #include <unistd.h>
28
29 #include <sys/select.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/times.h>
33
34 // These are necessary, but on by default
35 // #define __USE_GNU
36 // #define __USE_LARGEFILE64
37 #include <fcntl.h>
38 #include <sys/socket.h>
39 #include <netdb.h>
40 #include <arpa/inet.h>
41 #include <linux/unistd.h>  // for gettid
42
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #include <linux/aio_abi.h>
48
49 #include <sys/syscall.h>
50
51 #include <set>
52 #include <string>
53
54 // This file must work with autoconf on its public version,
55 // so these includes are correct.
56 #include "error_diag.h"  // NOLINT
57 #include "os.h"          // NOLINT
58 #include "pattern.h"     // NOLINT
59 #include "queue.h"       // NOLINT
60 #include "sat.h"         // NOLINT
61 #include "sattypes.h"    // NOLINT
62 #include "worker.h"      // NOLINT
63
64 // Syscalls
65 // Why ubuntu, do you hate gettid so bad?
66 #if !defined(__NR_gettid)
67   #define __NR_gettid             224
68 #endif
69
70 #define gettid() syscall(__NR_gettid)
71 #if !defined(CPU_SETSIZE)
72 _syscall3(int, sched_getaffinity, pid_t, pid,
73           unsigned int, len, cpu_set_t*, mask)
74 _syscall3(int, sched_setaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 #endif
77
78 // Linux aio syscalls.
79 #if !defined(__NR_io_setup)
80 #define __NR_io_setup   206
81 #define __NR_io_destroy 207
82 #define __NR_io_getevents       208
83 #define __NR_io_submit  209
84 #define __NR_io_cancel  210
85 #endif
86
87 #define io_setup(nr_events, ctxp) \
88   syscall(__NR_io_setup, (nr_events), (ctxp))
89 #define io_submit(ctx_id, nr, iocbpp) \
90   syscall(__NR_io_submit, (ctx_id), (nr), (iocbpp))
91 #define io_getevents(ctx_id, io_getevents, nr, events, timeout) \
92   syscall(__NR_io_getevents, (ctx_id), (io_getevents), (nr), (events), \
93     (timeout))
94 #define io_cancel(ctx_id, iocb, result) \
95   syscall(__NR_io_cancel, (ctx_id), (iocb), (result))
96 #define io_destroy(ctx) \
97   syscall(__NR_io_destroy, (ctx))
98
99 namespace {
100   // Get HW core ID from cpuid instruction.
101   inline int apicid(void) {
102     int cpu;
103 #if defined(STRESSAPPTEST_CPU_X86_64) || defined(STRESSAPPTEST_CPU_I686)
104     __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
105 #else
106   #warning "Unsupported CPU type: unable to determine core ID."
107     cpu = 0;
108 #endif
109     return (cpu >> 24);
110   }
111
112   // Work around the sad fact that there are two (gnu, xsi) incompatible
113   // versions of strerror_r floating around google. Awesome.
114   bool sat_strerror(int err, char *buf, int len) {
115     buf[0] = 0;
116     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
117     int retval = reinterpret_cast<int64>(errmsg);
118     if (retval == 0)
119       return true;
120     if (retval == -1)
121       return false;
122     if (errmsg != buf) {
123       strncpy(buf, errmsg, len);
124       buf[len - 1] = 0;
125     }
126     return true;
127   }
128
129
130   inline uint64 addr_to_tag(void *address) {
131     return reinterpret_cast<uint64>(address);
132   }
133 }
134
135 #if !defined(O_DIRECT)
136 // Sometimes this isn't available.
137 // Disregard if it's not defined.
138   #define O_DIRECT            0
139 #endif
140
141 // A struct to hold captured errors, for later reporting.
142 struct ErrorRecord {
143   uint64 actual;  // This is the actual value read.
144   uint64 reread;  // This is the actual value, reread.
145   uint64 expected;  // This is what it should have been.
146   uint64 *vaddr;  // This is where it was (or wasn't).
147   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
148   uint64 paddr;  // This is the bus address, if available.
149   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
150   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
151 };
152
153 // This is a helper function to create new threads with pthreads.
154 static void *ThreadSpawnerGeneric(void *ptr) {
155   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
156   worker->StartRoutine();
157   return NULL;
158 }
159
160
161 void WorkerStatus::Initialize() {
162   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
163   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
164   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
165                                        num_workers_ + 1));
166 }
167
168 void WorkerStatus::Destroy() {
169   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
170   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
171   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
172 }
173
174 void WorkerStatus::PauseWorkers() {
175   if (SetStatus(PAUSE) != PAUSE)
176     WaitOnPauseBarrier();
177 }
178
179 void WorkerStatus::ResumeWorkers() {
180   if (SetStatus(RUN) == PAUSE)
181     WaitOnPauseBarrier();
182 }
183
184 void WorkerStatus::StopWorkers() {
185   if (SetStatus(STOP) == PAUSE)
186     WaitOnPauseBarrier();
187 }
188
189 bool WorkerStatus::ContinueRunning() {
190   // This loop is an optimization.  We use it to immediately re-check the status
191   // after resuming from a pause, instead of returning and waiting for the next
192   // call to this function.
193   for (;;) {
194     switch (GetStatus()) {
195       case RUN:
196         return true;
197       case PAUSE:
198         // Wait for the other workers to call this function so that
199         // PauseWorkers() can return.
200         WaitOnPauseBarrier();
201         // Wait for ResumeWorkers() to be called.
202         WaitOnPauseBarrier();
203         break;
204       case STOP:
205         return false;
206     }
207   }
208 }
209
210 bool WorkerStatus::ContinueRunningNoPause() {
211   return (GetStatus() != STOP);
212 }
213
214 void WorkerStatus::RemoveSelf() {
215   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
216   for (;;) {
217     AcquireStatusReadLock();
218     if (status_ != PAUSE)
219       break;
220     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
221     // the other threads won't wait on pause_barrier_ forever.
222     ReleaseStatusLock();
223     // Wait for the other workers to call this function so that PauseWorkers()
224     // can return.
225     WaitOnPauseBarrier();
226     // Wait for ResumeWorkers() to be called.
227     WaitOnPauseBarrier();
228   }
229
230   // This lock would be unnecessary if we held a write lock instead of a read
231   // lock on status_rwlock_, but that would also force all threads calling
232   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
233   AcquireNumWorkersLock();
234   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
235   // in use because (status != PAUSE).
236   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
237   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
238   --num_workers_;
239   ReleaseNumWorkersLock();
240
241   // Release status_rwlock_.
242   ReleaseStatusLock();
243 }
244
245
246 // Parent thread class.
247 WorkerThread::WorkerThread() {
248   status_ = 0;
249   pages_copied_ = 0;
250   errorcount_ = 0;
251   runduration_usec_ = 0;
252   priority_ = Normal;
253   worker_status_ = NULL;
254   thread_spawner_ = &ThreadSpawnerGeneric;
255   tag_mode_ = false;
256 }
257
258 WorkerThread::~WorkerThread() {}
259
260 // Constructors. Just init some default values.
261 FillThread::FillThread() {
262   num_pages_to_fill_ = 0;
263 }
264
265 // Initialize file name to empty.
266 FileThread::FileThread() {
267   filename_ = "";
268   devicename_ = "";
269   pass_ = 0;
270   page_io_ = true;
271   crc_page_ = -1;
272   local_page_ = NULL;
273 }
274
275 // If file thread used bounce buffer in memory, account for the extra
276 // copy for memory bandwidth calculation.
277 float FileThread::GetMemoryCopiedData() {
278   if (!os_->normal_mem())
279     return GetCopiedData();
280   else
281     return 0;
282 }
283
284 // Initialize target hostname to be invalid.
285 NetworkThread::NetworkThread() {
286   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
287   sock_ = 0;
288 }
289
290 // Initialize?
291 NetworkSlaveThread::NetworkSlaveThread() {
292 }
293
294 // Initialize?
295 NetworkListenThread::NetworkListenThread() {
296 }
297
298 // Init member variables.
299 void WorkerThread::InitThread(int thread_num_init,
300                               class Sat *sat_init,
301                               class OsLayer *os_init,
302                               class PatternList *patternlist_init,
303                               WorkerStatus *worker_status) {
304   sat_assert(worker_status);
305   worker_status->AddWorkers(1);
306
307   thread_num_ = thread_num_init;
308   sat_ = sat_init;
309   os_ = os_init;
310   patternlist_ = patternlist_init;
311   worker_status_ = worker_status;
312
313   cpu_mask_ = AvailableCpus();
314   tag_ = 0xffffffff;
315
316   tag_mode_ = sat_->tag_mode();
317 }
318
319
320 // Use pthreads to prioritize a system thread.
321 bool WorkerThread::InitPriority() {
322   // This doesn't affect performance that much, and may not be too safe.
323
324   bool ret = BindToCpus(cpu_mask_);
325   if (!ret)
326     logprintf(11, "Log: Bind to %x failed.\n", cpu_mask_);
327
328   logprintf(11, "Log: Thread %d running on apic ID %d mask %x (%x).\n",
329             thread_num_, apicid(), CurrentCpus(), cpu_mask_);
330 #if 0
331   if (priority_ == High) {
332     sched_param param;
333     param.sched_priority = 1;
334     // Set the priority; others are unchanged.
335     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
336               param.sched_priority);
337     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
338       char buf[256];
339       sat_strerror(errno, buf, sizeof(buf));
340       logprintf(0, "Process Error: sched_setscheduler "
341                    "failed - error %d %s\n",
342                 errno, buf);
343     }
344   }
345 #endif
346   return true;
347 }
348
349 // Use pthreads to create a system thread.
350 int WorkerThread::SpawnThread() {
351   // Create the new thread.
352   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
353   if (result) {
354     char buf[256];
355     sat_strerror(result, buf, sizeof(buf));
356     logprintf(0, "Process Error: pthread_create "
357                   "failed - error %d %s\n", result,
358               buf);
359     status_ += 1;
360     return false;
361   }
362
363   // 0 is pthreads success.
364   return true;
365 }
366
367 // Kill the worker thread with SIGINT.
368 int WorkerThread::KillThread() {
369   pthread_kill(thread_, SIGINT);
370   return 0;
371 }
372
373 // Block until thread has exited.
374 int WorkerThread::JoinThread() {
375   int result = pthread_join(thread_, NULL);
376
377   if (result) {
378     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
379     status_ = 0;
380   }
381
382   // 0 is pthreads success.
383   return (!result);
384 }
385
386
387 void WorkerThread::StartRoutine() {
388   InitPriority();
389   StartThreadTimer();
390   Work();
391   StopThreadTimer();
392   worker_status_->RemoveSelf();
393 }
394
395
396 // Thread work loop. Execute until marked finished.
397 int WorkerThread::Work() {
398   do {
399     logprintf(9, "Log: ...\n");
400     // Sleep for 1 second.
401     sat_sleep(1);
402   } while (IsReadyToRun());
403
404   return 0;
405 }
406
407
408 // Returns CPU mask of CPUs available to this process,
409 // Conceptually, each bit represents a logical CPU, ie:
410 //   mask = 3  (11b):   cpu0, 1
411 //   mask = 13 (1101b): cpu0, 2, 3
412 uint32 WorkerThread::AvailableCpus() {
413   cpu_set_t curr_cpus;
414   CPU_ZERO(&curr_cpus);
415   sched_getaffinity(getppid(), sizeof(curr_cpus), &curr_cpus);
416   return cpuset_to_uint32(&curr_cpus);
417 }
418
419
420 // Returns CPU mask of CPUs this thread is bound to,
421 // Conceptually, each bit represents a logical CPU, ie:
422 //   mask = 3  (11b):   cpu0, 1
423 //   mask = 13 (1101b): cpu0, 2, 3
424 uint32 WorkerThread::CurrentCpus() {
425   cpu_set_t curr_cpus;
426   CPU_ZERO(&curr_cpus);
427   sched_getaffinity(0, sizeof(curr_cpus), &curr_cpus);
428   return cpuset_to_uint32(&curr_cpus);
429 }
430
431
432 // Bind worker thread to specified CPU(s)
433 //   Args:
434 //     thread_mask: cpu_set_t representing CPUs, ie
435 //                  mask = 1  (01b):   cpu0
436 //                  mask = 3  (11b):   cpu0, 1
437 //                  mask = 13 (1101b): cpu0, 2, 3
438 //
439 //   Returns true on success, false otherwise.
440 bool WorkerThread::BindToCpus(uint32 thread_mask) {
441   uint32 process_mask = AvailableCpus();
442   if (thread_mask == process_mask)
443     return true;
444
445   logprintf(11, "Log: available CPU mask - %x\n", process_mask);
446   if ((thread_mask | process_mask) != process_mask) {
447     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
448     logprintf(0, "Log: requested CPUs %x not a subset of available %x\n",
449               thread_mask, process_mask);
450     return false;
451   }
452   cpu_set_t cpuset;
453   cpuset_from_uint32(thread_mask, &cpuset);
454   return (sched_setaffinity(gettid(), sizeof(cpuset), &cpuset) == 0);
455 }
456
457
458 // A worker thread can yield itself to give up CPU until it's scheduled again.
459 //   Returns true on success, false on error.
460 bool WorkerThread::YieldSelf() {
461   return (sched_yield() == 0);
462 }
463
464
465 // Fill this page with its pattern.
466 bool WorkerThread::FillPage(struct page_entry *pe) {
467   // Error check arguments.
468   if (pe == 0) {
469     logprintf(0, "Process Error: Fill Page entry null\n");
470     return 0;
471   }
472
473   // Mask is the bitmask of indexes used by the pattern.
474   // It is the pattern size -1. Size is always a power of 2.
475   uint64 *memwords = static_cast<uint64*>(pe->addr);
476   int length = sat_->page_length();
477
478   if (tag_mode_) {
479     // Select tag or data as appropriate.
480     for (int i = 0; i < length / wordsize_; i++) {
481       datacast_t data;
482
483       if ((i & 0x7) == 0) {
484         data.l64 = addr_to_tag(&memwords[i]);
485       } else {
486         data.l32.l = pe->pattern->pattern(i << 1);
487         data.l32.h = pe->pattern->pattern((i << 1) + 1);
488       }
489       memwords[i] = data.l64;
490     }
491   } else {
492     // Just fill in untagged data directly.
493     for (int i = 0; i < length / wordsize_; i++) {
494       datacast_t data;
495
496       data.l32.l = pe->pattern->pattern(i << 1);
497       data.l32.h = pe->pattern->pattern((i << 1) + 1);
498       memwords[i] = data.l64;
499     }
500   }
501
502   return 1;
503 }
504
505
506 // Tell the thread how many pages to fill.
507 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
508   num_pages_to_fill_ = num_pages_to_fill_init;
509 }
510
511 // Fill this page with a random pattern.
512 bool FillThread::FillPageRandom(struct page_entry *pe) {
513   // Error check arguments.
514   if (pe == 0) {
515     logprintf(0, "Process Error: Fill Page entry null\n");
516     return 0;
517   }
518   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
519     logprintf(0, "Process Error: No data patterns available\n");
520     return 0;
521   }
522
523   // Choose a random pattern for this block.
524   pe->pattern = patternlist_->GetRandomPattern();
525   if (pe->pattern == 0) {
526     logprintf(0, "Process Error: Null data pattern\n");
527     return 0;
528   }
529
530   // Actually fill the page.
531   return FillPage(pe);
532 }
533
534
535 // Memory fill work loop. Execute until alloted pages filled.
536 int FillThread::Work() {
537   int result = 1;
538
539   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
540
541   // We want to fill num_pages_to_fill pages, and
542   // stop when we've filled that many.
543   // We also want to capture early break
544   struct page_entry pe;
545   int64 loops = 0;
546   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
547     result &= sat_->GetEmpty(&pe);
548     if (!result) {
549       logprintf(0, "Process Error: fill_thread failed to pop pages, "
550                 "bailing\n");
551       break;
552     }
553
554     // Fill the page with pattern
555     result &= FillPageRandom(&pe);
556     if (!result) break;
557
558     // Put the page back on the queue.
559     result &= sat_->PutValid(&pe);
560     if (!result) {
561       logprintf(0, "Process Error: fill_thread failed to push pages, "
562                 "bailing\n");
563       break;
564     }
565     loops++;
566   }
567
568   // Fill in thread status.
569   pages_copied_ = loops;
570   status_ = result;
571   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
572             thread_num_, status_, pages_copied_);
573   return 0;
574 }
575
576
577 // Print error information about a data miscompare.
578 void WorkerThread::ProcessError(struct ErrorRecord *error,
579                                 int priority,
580                                 const char *message) {
581   char dimm_string[256] = "";
582
583   int apic_id = apicid();
584   uint32 cpumask = CurrentCpus();
585
586   // Determine if this is a write or read error.
587   os_->Flush(error->vaddr);
588   error->reread = *(error->vaddr);
589
590   char *good = reinterpret_cast<char*>(&(error->expected));
591   char *bad = reinterpret_cast<char*>(&(error->actual));
592
593   sat_assert(error->expected != error->actual);
594   unsigned int offset = 0;
595   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
596     if (good[offset] != bad[offset])
597       break;
598   }
599
600   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
601
602   // Find physical address if possible.
603   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
604
605   // Pretty print DIMM mapping if available.
606   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
607
608   // Report parseable error.
609   if (priority < 5) {
610     // Run miscompare error through diagnoser for logging and reporting.
611     os_->error_diagnoser_->AddMiscompareError(dimm_string,
612                                               reinterpret_cast<uint64>
613                                               (error->vaddr), 1);
614
615     logprintf(priority,
616               "%s: miscompare on CPU %d(0x%x) at %p(0x%llx:%s): "
617               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
618               message,
619               apic_id,
620               cpumask,
621               error->vaddr,
622               error->paddr,
623               dimm_string,
624               error->actual,
625               error->reread,
626               error->expected);
627   }
628
629
630   // Overwrite incorrect data with correct data to prevent
631   // future miscompares when this data is reused.
632   *(error->vaddr) = error->expected;
633   os_->Flush(error->vaddr);
634 }
635
636
637
638 // Print error information about a data miscompare.
639 void FileThread::ProcessError(struct ErrorRecord *error,
640                               int priority,
641                               const char *message) {
642   char dimm_string[256] = "";
643
644   // Determine if this is a write or read error.
645   os_->Flush(error->vaddr);
646   error->reread = *(error->vaddr);
647
648   char *good = reinterpret_cast<char*>(&(error->expected));
649   char *bad = reinterpret_cast<char*>(&(error->actual));
650
651   sat_assert(error->expected != error->actual);
652   unsigned int offset = 0;
653   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
654     if (good[offset] != bad[offset])
655       break;
656   }
657
658   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
659
660   // Find physical address if possible.
661   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
662
663   // Pretty print DIMM mapping if available.
664   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
665
666   // If crc_page_ is valid, ie checking content read back from file,
667   // track src/dst memory addresses. Otherwise catagorize as general
668   // mememory miscompare for CRC checking everywhere else.
669   if (crc_page_ != -1) {
670     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
671                                 static_cast<char*>(page_recs_[crc_page_].dst);
672     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
673                                                  crc_page_,
674                                                  miscompare_byteoffset,
675                                                  page_recs_[crc_page_].src,
676                                                  page_recs_[crc_page_].dst);
677   } else {
678     os_->error_diagnoser_->AddMiscompareError(dimm_string,
679                                               reinterpret_cast<uint64>
680                                               (error->vaddr), 1);
681   }
682
683   logprintf(priority,
684             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
685             "reread:0x%016llx expected:0x%016llx\n",
686             message,
687             devicename_.c_str(),
688             error->vaddr,
689             error->paddr,
690             dimm_string,
691             error->actual,
692             error->reread,
693             error->expected);
694
695   // Overwrite incorrect data with correct data to prevent
696   // future miscompares when this data is reused.
697   *(error->vaddr) = error->expected;
698   os_->Flush(error->vaddr);
699 }
700
701
702 // Do a word by word result check of a region.
703 // Print errors on mismatches.
704 int WorkerThread::CheckRegion(void *addr,
705                               class Pattern *pattern,
706                               int64 length,
707                               int offset,
708                               int64 pattern_offset) {
709   uint64 *memblock = static_cast<uint64*>(addr);
710   const int kErrorLimit = 128;
711   int errors = 0;
712   int overflowerrors = 0;  // Count of overflowed errors.
713   bool page_error = false;
714   string errormessage("Hardware Error");
715   struct ErrorRecord
716     recorded[kErrorLimit];  // Queued errors for later printing.
717
718   // For each word in the data region.
719   for (int i = 0; i < length / wordsize_; i++) {
720     uint64 actual = memblock[i];
721     uint64 expected;
722
723     // Determine the value that should be there.
724     datacast_t data;
725     int index = 2 * i + pattern_offset;
726     data.l32.l = pattern->pattern(index);
727     data.l32.h = pattern->pattern(index + 1);
728     expected = data.l64;
729     // Check tags if necessary.
730     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
731       expected = addr_to_tag(&memblock[i]);
732     }
733
734
735     // If the value is incorrect, save an error record for later printing.
736     if (actual != expected) {
737       if (errors < kErrorLimit) {
738         recorded[errors].actual = actual;
739         recorded[errors].expected = expected;
740         recorded[errors].vaddr = &memblock[i];
741         errors++;
742       } else {
743         page_error = true;
744         // If we have overflowed the error queue, just print the errors now.
745         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
746         errormessage = "Page Error";
747         break;
748       }
749     }
750   }
751
752   // Find if this is a whole block corruption.
753   if (page_error && !tag_mode_) {
754     int patsize = patternlist_->Size();
755     for (int pat = 0; pat < patsize; pat++) {
756       class Pattern *altpattern = patternlist_->GetPattern(pat);
757       const int kGood = 0;
758       const int kBad = 1;
759       const int kGoodAgain = 2;
760       const int kNoMatch = 3;
761       int state = kGood;
762       unsigned int badstart = 0;
763       unsigned int badend = 0;
764
765       // Don't match against ourself!
766       if (pattern == altpattern)
767         continue;
768
769       for (int i = 0; i < length / wordsize_; i++) {
770         uint64 actual = memblock[i];
771         datacast_t expected;
772         datacast_t possible;
773
774         // Determine the value that should be there.
775         int index = 2 * i + pattern_offset;
776
777         expected.l32.l = pattern->pattern(index);
778         expected.l32.h = pattern->pattern(index + 1);
779
780         possible.l32.l = pattern->pattern(index);
781         possible.l32.h = pattern->pattern(index + 1);
782
783         if (state == kGood) {
784           if (actual == expected.l64) {
785             continue;
786           } else if (actual == possible.l64) {
787             badstart = i;
788             badend = i;
789             state = kBad;
790             continue;
791           } else {
792             state = kNoMatch;
793             break;
794           }
795         } else if (state == kBad) {
796           if (actual == possible.l64) {
797             badend = i;
798             continue;
799           } else if (actual == expected.l64) {
800             state = kGoodAgain;
801             continue;
802           } else {
803             state = kNoMatch;
804             break;
805           }
806         } else if (state == kGoodAgain) {
807           if (actual == expected.l64) {
808             continue;
809           } else {
810             state = kNoMatch;
811             break;
812           }
813         }
814       }
815
816       if ((state == kGoodAgain) || (state == kBad)) {
817         unsigned int blockerrors = badend - badstart + 1;
818         errormessage = "Block Error";
819         ProcessError(&recorded[0], 0, errormessage.c_str());
820         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
821                   "%d bytes from offset 0x%x to 0x%x\n",
822                   &memblock[badstart],
823                   altpattern->name(), pattern->name(),
824                   blockerrors * wordsize_,
825                   offset + badstart * wordsize_,
826                   offset + badend * wordsize_);
827         errorcount_ += blockerrors;
828         return blockerrors;
829       }
830     }
831   }
832
833
834   // Process error queue after all errors have been recorded.
835   for (int err = 0; err < errors; err++) {
836     int priority = 5;
837     if (errorcount_ + err < 30)
838       priority = 0;  // Bump up the priority for the first few errors.
839     ProcessError(&recorded[err], priority, errormessage.c_str());
840   }
841
842   if (page_error) {
843     // For each word in the data region.
844     int error_recount = 0;
845     for (int i = 0; i < length / wordsize_; i++) {
846       uint64 actual = memblock[i];
847       uint64 expected;
848       datacast_t data;
849       // Determine the value that should be there.
850       int index = 2 * i + pattern_offset;
851
852       data.l32.l = pattern->pattern(index);
853       data.l32.h = pattern->pattern(index + 1);
854       expected = data.l64;
855
856       // Check tags if necessary.
857       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
858         expected = addr_to_tag(&memblock[i]);
859       }
860
861       // If the value is incorrect, save an error record for later printing.
862       if (actual != expected) {
863         if (error_recount < kErrorLimit) {
864           // We already reported these.
865           error_recount++;
866         } else {
867           // If we have overflowed the error queue, print the errors now.
868           struct ErrorRecord er;
869           er.actual = actual;
870           er.expected = expected;
871           er.vaddr = &memblock[i];
872
873           // Do the error printout. This will take a long time and
874           // likely change the machine state.
875           ProcessError(&er, 12, errormessage.c_str());
876           overflowerrors++;
877         }
878       }
879     }
880   }
881
882   // Keep track of observed errors.
883   errorcount_ += errors + overflowerrors;
884   return errors + overflowerrors;
885 }
886
887 float WorkerThread::GetCopiedData() {
888   return pages_copied_ * sat_->page_length() / kMegabyte;
889 }
890
891 // Calculate the CRC of a region.
892 // Result check if the CRC mismatches.
893 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
894   const int blocksize = 4096;
895   const int blockwords = blocksize / wordsize_;
896   int errors = 0;
897
898   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
899   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
900   int blocks = sat_->page_length() / blocksize;
901   for (int currentblock = 0; currentblock < blocks; currentblock++) {
902     uint64 *memslice = memblock + currentblock * blockwords;
903
904     AdlerChecksum crc;
905     if (tag_mode_) {
906       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
907     } else {
908       CalculateAdlerChecksum(memslice, blocksize, &crc);
909     }
910
911     // If the CRC does not match, we'd better look closer.
912     if (!crc.Equals(*expectedcrc)) {
913       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
914                 "CRC mismatch %s != %s\n",
915                 crc.ToHexString().c_str(),
916                 expectedcrc->ToHexString().c_str());
917       int errorcount = CheckRegion(memslice,
918                                    srcpe->pattern,
919                                    blocksize,
920                                    currentblock * blocksize, 0);
921       if (errorcount == 0) {
922         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
923                      "but no miscompares found.\n",
924                   crc.ToHexString().c_str(),
925                   expectedcrc->ToHexString().c_str());
926       }
927       errors += errorcount;
928     }
929   }
930
931   // For odd length transfers, we should never hit this.
932   int leftovers = sat_->page_length() % blocksize;
933   if (leftovers) {
934     uint64 *memslice = memblock + blocks * blockwords;
935     errors += CheckRegion(memslice,
936                           srcpe->pattern,
937                           leftovers,
938                           blocks * blocksize, 0);
939   }
940   return errors;
941 }
942
943
944 // Print error information about a data miscompare.
945 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
946                                    int priority,
947                                    const char *message) {
948   char dimm_string[256] = "";
949   char tag_dimm_string[256] = "";
950   bool read_error = false;
951
952   int apic_id = apicid();
953   uint32 cpumask = CurrentCpus();
954
955   // Determine if this is a write or read error.
956   os_->Flush(error->vaddr);
957   error->reread = *(error->vaddr);
958
959   // Distinguish read and write errors.
960   if (error->actual != error->reread) {
961     read_error = true;
962   }
963
964   sat_assert(error->expected != error->actual);
965
966   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
967
968   // Find physical address if possible.
969   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
970   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
971
972   // Pretty print DIMM mapping if available.
973   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
974   // Pretty print DIMM mapping if available.
975   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
976
977   // Report parseable error.
978   if (priority < 5) {
979     logprintf(priority,
980               "%s: Tag from %p(0x%llx:%s) (%s) "
981               "miscompare on CPU %d(0x%x) at %p(0x%llx:%s): "
982               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
983               message,
984               error->tagvaddr, error->tagpaddr,
985               tag_dimm_string,
986               read_error ? "read error" : "write error",
987               apic_id,
988               cpumask,
989               error->vaddr,
990               error->paddr,
991               dimm_string,
992               error->actual,
993               error->reread,
994               error->expected);
995   }
996
997   errorcount_ += 1;
998
999   // Overwrite incorrect data with correct data to prevent
1000   // future miscompares when this data is reused.
1001   *(error->vaddr) = error->expected;
1002   os_->Flush(error->vaddr);
1003 }
1004
1005
1006 // Print out and log a tag error.
1007 bool WorkerThread::ReportTagError(
1008     uint64 *mem64,
1009     uint64 actual,
1010     uint64 tag) {
1011   struct ErrorRecord er;
1012   er.actual = actual;
1013
1014   er.expected = tag;
1015   er.vaddr = mem64;
1016
1017   // Generate vaddr from tag.
1018   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1019
1020   ProcessTagError(&er, 0, "Hardware Error");
1021   return true;
1022 }
1023
1024 // C implementation of Adler memory copy, with memory tagging.
1025 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1026                                     uint64 *srcmem64,
1027                                     unsigned int size_in_bytes,
1028                                     AdlerChecksum *checksum,
1029                                     struct page_entry *pe) {
1030   // Use this data wrapper to access memory with 64bit read/write.
1031   datacast_t data;
1032   datacast_t dstdata;
1033   unsigned int count = size_in_bytes / sizeof(data);
1034
1035   if (count > ((1U) << 19)) {
1036     // Size is too large, must be strictly less than 512 KB.
1037     return false;
1038   }
1039
1040   uint64 a1 = 1;
1041   uint64 a2 = 1;
1042   uint64 b1 = 0;
1043   uint64 b2 = 0;
1044
1045   class Pattern *pattern = pe->pattern;
1046
1047   unsigned int i = 0;
1048   while (i < count) {
1049     // Process 64 bits at a time.
1050     if ((i & 0x7) == 0) {
1051       data.l64 = srcmem64[i];
1052       dstdata.l64 = dstmem64[i];
1053       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1054       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1055       // Detect if tags have been corrupted.
1056       if (data.l64 != src_tag)
1057         ReportTagError(&srcmem64[i], data.l64, src_tag);
1058       if (dstdata.l64 != dst_tag)
1059         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1060
1061       data.l32.l = pattern->pattern(i << 1);
1062       data.l32.h = pattern->pattern((i << 1) + 1);
1063       a1 = a1 + data.l32.l;
1064       b1 = b1 + a1;
1065       a1 = a1 + data.l32.h;
1066       b1 = b1 + a1;
1067
1068       data.l64  = dst_tag;
1069       dstmem64[i] = data.l64;
1070
1071     } else {
1072       data.l64 = srcmem64[i];
1073       a1 = a1 + data.l32.l;
1074       b1 = b1 + a1;
1075       a1 = a1 + data.l32.h;
1076       b1 = b1 + a1;
1077       dstmem64[i] = data.l64;
1078     }
1079     i++;
1080
1081     data.l64 = srcmem64[i];
1082     a2 = a2 + data.l32.l;
1083     b2 = b2 + a2;
1084     a2 = a2 + data.l32.h;
1085     b2 = b2 + a2;
1086     dstmem64[i] = data.l64;
1087     i++;
1088   }
1089   checksum->Set(a1, a2, b1, b2);
1090   return true;
1091 }
1092
1093
1094 // C implementation of Adler memory crc.
1095 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1096                                  unsigned int size_in_bytes,
1097                                  AdlerChecksum *checksum,
1098                                  struct page_entry *pe) {
1099   // Use this data wrapper to access memory with 64bit read/write.
1100   datacast_t data;
1101   unsigned int count = size_in_bytes / sizeof(data);
1102
1103   if (count > ((1U) << 19)) {
1104     // Size is too large, must be strictly less than 512 KB.
1105     return false;
1106   }
1107
1108   uint64 a1 = 1;
1109   uint64 a2 = 1;
1110   uint64 b1 = 0;
1111   uint64 b2 = 0;
1112
1113   class Pattern *pattern = pe->pattern;
1114
1115   unsigned int i = 0;
1116   while (i < count) {
1117     // Process 64 bits at a time.
1118     if ((i & 0x7) == 0) {
1119       data.l64 = srcmem64[i];
1120       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1121       // Check that tags match expected.
1122       if (data.l64 != src_tag)
1123         ReportTagError(&srcmem64[i], data.l64, src_tag);
1124
1125
1126       data.l32.l = pattern->pattern(i << 1);
1127       data.l32.h = pattern->pattern((i << 1) + 1);
1128       a1 = a1 + data.l32.l;
1129       b1 = b1 + a1;
1130       a1 = a1 + data.l32.h;
1131       b1 = b1 + a1;
1132
1133
1134     } else {
1135       data.l64 = srcmem64[i];
1136       a1 = a1 + data.l32.l;
1137       b1 = b1 + a1;
1138       a1 = a1 + data.l32.h;
1139       b1 = b1 + a1;
1140     }
1141     i++;
1142
1143     data.l64 = srcmem64[i];
1144     a2 = a2 + data.l32.l;
1145     b2 = b2 + a2;
1146     a2 = a2 + data.l32.h;
1147     b2 = b2 + a2;
1148     i++;
1149   }
1150   checksum->Set(a1, a2, b1, b2);
1151   return true;
1152 }
1153
1154 // Copy a block of memory quickly, while keeping a CRC of the data.
1155 // Result check if the CRC mismatches.
1156 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1157                               struct page_entry *srcpe) {
1158   int errors = 0;
1159   const int blocksize = 4096;
1160   const int blockwords = blocksize / wordsize_;
1161   int blocks = sat_->page_length() / blocksize;
1162
1163   // Base addresses for memory copy
1164   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1165   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1166   // Remember the expected CRC
1167   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1168
1169   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1170     uint64 *targetmem = targetmembase + currentblock * blockwords;
1171     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1172
1173     AdlerChecksum crc;
1174     if (tag_mode_) {
1175       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1176     } else {
1177       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1178     }
1179
1180     // Investigate miscompares.
1181     if (!crc.Equals(*expectedcrc)) {
1182       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1183                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1184                 expectedcrc->ToHexString().c_str());
1185       int errorcount = CheckRegion(sourcemem,
1186                                    srcpe->pattern,
1187                                    blocksize,
1188                                    currentblock * blocksize, 0);
1189       if (errorcount == 0) {
1190         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1191                      "but no miscompares found. Retrying with fresh data.\n",
1192                   crc.ToHexString().c_str(),
1193                   expectedcrc->ToHexString().c_str());
1194         if (!tag_mode_) {
1195           // Copy the data originally read from this region back again.
1196           // This data should have any corruption read originally while
1197           // calculating the CRC.
1198           memcpy(sourcemem, targetmem, blocksize);
1199           errorcount = CheckRegion(sourcemem,
1200                                    srcpe->pattern,
1201                                    blocksize,
1202                                    currentblock * blocksize, 0);
1203           if (errorcount == 0) {
1204             int apic_id = apicid();
1205             uint32 cpumask = CurrentCpus();
1206             logprintf(0, "Process Error: CPU %d(0x%x) CrcCopyPage "
1207                          "CRC mismatch %s != %s, "
1208                          "but no miscompares found on second pass.\n",
1209                       apic_id, cpumask,
1210                       crc.ToHexString().c_str(),
1211                       expectedcrc->ToHexString().c_str());
1212             struct ErrorRecord er;
1213             er.actual = sourcemem[0];
1214             er.expected = 0x0;
1215             er.vaddr = sourcemem;
1216             ProcessError(&er, 0, "Hardware Error");
1217           }
1218         }
1219       }
1220       errors += errorcount;
1221     }
1222   }
1223
1224   // For odd length transfers, we should never hit this.
1225   int leftovers = sat_->page_length() % blocksize;
1226   if (leftovers) {
1227     uint64 *targetmem = targetmembase + blocks * blockwords;
1228     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1229
1230     errors += CheckRegion(sourcemem,
1231                           srcpe->pattern,
1232                           leftovers,
1233                           blocks * blocksize, 0);
1234     int leftoverwords = leftovers / wordsize_;
1235     for (int i = 0; i < leftoverwords; i++) {
1236       targetmem[i] = sourcemem[i];
1237     }
1238   }
1239
1240   // Update pattern reference to reflect new contents.
1241   dstpe->pattern = srcpe->pattern;
1242
1243   // Clean clean clean the errors away.
1244   if (errors) {
1245     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1246     // cause bad data to be propogated across the page.
1247     FillPage(dstpe);
1248   }
1249   return errors;
1250 }
1251
1252
1253
1254 // Invert a block of memory quickly, traversing downwards.
1255 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1256   const int blocksize = 4096;
1257   const int blockwords = blocksize / wordsize_;
1258   int blocks = sat_->page_length() / blocksize;
1259
1260   // Base addresses for memory copy
1261   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1262
1263   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1264     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1265     for (int i = blockwords - 32; i >= 0; i -= 32) {
1266       for (int index = i + 31; index >= i; --index) {
1267         unsigned int actual = sourcemem[index];
1268         sourcemem[index] = ~actual;
1269       }
1270       OsLayer::FastFlush(&sourcemem[i]);
1271     }
1272   }
1273
1274   return 0;
1275 }
1276
1277 // Invert a block of memory, traversing upwards.
1278 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1279   const int blocksize = 4096;
1280   const int blockwords = blocksize / wordsize_;
1281   int blocks = sat_->page_length() / blocksize;
1282
1283   // Base addresses for memory copy
1284   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1285
1286   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1287     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1288     for (int i = 0; i < blockwords; i += 32) {
1289       for (int index = i; index <= i + 31; ++index) {
1290         unsigned int actual = sourcemem[index];
1291         sourcemem[index] = ~actual;
1292       }
1293       OsLayer::FastFlush(&sourcemem[i]);
1294     }
1295   }
1296   return 0;
1297 }
1298
1299 // Copy a block of memory quickly, while keeping a CRC of the data.
1300 // Result check if the CRC mismatches. Warm the CPU while running
1301 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1302                                   struct page_entry *srcpe) {
1303   int errors = 0;
1304   const int blocksize = 4096;
1305   const int blockwords = blocksize / wordsize_;
1306   int blocks = sat_->page_length() / blocksize;
1307
1308   // Base addresses for memory copy
1309   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1310   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1311   // Remember the expected CRC
1312   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1313
1314   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1315     uint64 *targetmem = targetmembase + currentblock * blockwords;
1316     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1317
1318     AdlerChecksum crc;
1319     if (tag_mode_) {
1320       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1321     } else {
1322       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1323     }
1324
1325     // Investigate miscompares.
1326     if (!crc.Equals(*expectedcrc)) {
1327       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1328                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1329                 expectedcrc->ToHexString().c_str());
1330       int errorcount = CheckRegion(sourcemem,
1331                                    srcpe->pattern,
1332                                    blocksize,
1333                                    currentblock * blocksize, 0);
1334       if (errorcount == 0) {
1335         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1336                      "but no miscompares found. Retrying with fresh data.\n",
1337                   crc.ToHexString().c_str(),
1338                   expectedcrc->ToHexString().c_str());
1339         if (!tag_mode_) {
1340           // Copy the data originally read from this region back again.
1341           // This data should have any corruption read originally while
1342           // calculating the CRC.
1343           memcpy(sourcemem, targetmem, blocksize);
1344           errorcount = CheckRegion(sourcemem,
1345                                    srcpe->pattern,
1346                                    blocksize,
1347                                    currentblock * blocksize, 0);
1348           if (errorcount == 0) {
1349             logprintf(0, "Process Error: CrcWarmCopyPage CRC mismatch %s "
1350                          "!= %s, but no miscompares found on second pass.\n",
1351                       crc.ToHexString().c_str(),
1352                       expectedcrc->ToHexString().c_str());
1353           }
1354         }
1355       }
1356       errors += errorcount;
1357     }
1358   }
1359
1360   // For odd length transfers, we should never hit this.
1361   int leftovers = sat_->page_length() % blocksize;
1362   if (leftovers) {
1363     uint64 *targetmem = targetmembase + blocks * blockwords;
1364     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1365
1366     errors += CheckRegion(sourcemem,
1367                           srcpe->pattern,
1368                           leftovers,
1369                           blocks * blocksize, 0);
1370     int leftoverwords = leftovers / wordsize_;
1371     for (int i = 0; i < leftoverwords; i++) {
1372       targetmem[i] = sourcemem[i];
1373     }
1374   }
1375
1376   // Update pattern reference to reflect new contents.
1377   dstpe->pattern = srcpe->pattern;
1378
1379   // Clean clean clean the errors away.
1380   if (errors) {
1381     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1382     // cause bad data to be propogated across the page.
1383     FillPage(dstpe);
1384   }
1385   return errors;
1386 }
1387
1388
1389
1390 // Memory check work loop. Execute until done, then exhaust pages.
1391 int CheckThread::Work() {
1392   struct page_entry pe;
1393   int result = 1;
1394   int64 loops = 0;
1395
1396   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1397
1398   // We want to check all the pages, and
1399   // stop when there aren't any left.
1400   while (1) {
1401     result &= sat_->GetValid(&pe);
1402     if (!result) {
1403       if (IsReadyToRunNoPause())
1404         logprintf(0, "Process Error: check_thread failed to pop pages, "
1405                   "bailing\n");
1406       else
1407         result = 1;
1408       break;
1409     }
1410
1411     // Do the result check.
1412     CrcCheckPage(&pe);
1413
1414     // Push pages back on the valid queue if we are still going,
1415     // throw them out otherwise.
1416     if (IsReadyToRunNoPause())
1417       result &= sat_->PutValid(&pe);
1418     else
1419       result &= sat_->PutEmpty(&pe);
1420     if (!result) {
1421       logprintf(0, "Process Error: check_thread failed to push pages, "
1422                 "bailing\n");
1423       break;
1424     }
1425     loops++;
1426   }
1427
1428   pages_copied_ = loops;
1429   status_ = result;
1430   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1431             thread_num_, status_, pages_copied_);
1432   return 1;
1433 }
1434
1435
1436 // Memory copy work loop. Execute until marked done.
1437 int CopyThread::Work() {
1438   struct page_entry src;
1439   struct page_entry dst;
1440   int result = 1;
1441   int64 loops = 0;
1442
1443   logprintf(9, "Log: Starting copy thread %d: cpu %x, mem %x\n",
1444             thread_num_, cpu_mask_, tag_);
1445
1446   while (IsReadyToRun()) {
1447     // Pop the needed pages.
1448     result &= sat_->GetValid(&src, tag_);
1449     result &= sat_->GetEmpty(&dst, tag_);
1450     if (!result) {
1451       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1452                 "bailing\n");
1453       break;
1454     }
1455
1456     // Force errors for unittests.
1457     if (sat_->error_injection()) {
1458       if (loops == 8) {
1459         char *addr = reinterpret_cast<char*>(src.addr);
1460         int offset = random() % sat_->page_length();
1461         addr[offset] = 0xba;
1462       }
1463     }
1464
1465     // We can use memcpy, or CRC check while we copy.
1466     if (sat_->warm()) {
1467       CrcWarmCopyPage(&dst, &src);
1468     } else if (sat_->strict()) {
1469       CrcCopyPage(&dst, &src);
1470     } else {
1471       memcpy(dst.addr, src.addr, sat_->page_length());
1472       dst.pattern = src.pattern;
1473     }
1474
1475     result &= sat_->PutValid(&dst);
1476     result &= sat_->PutEmpty(&src);
1477
1478     // Copy worker-threads yield themselves at the end of each copy loop,
1479     // to avoid threads from preempting each other in the middle of the inner
1480     // copy-loop. Cooperations between Copy worker-threads results in less
1481     // unnecessary cache thrashing (which happens when context-switching in the
1482     // middle of the inner copy-loop).
1483     YieldSelf();
1484
1485     if (!result) {
1486       logprintf(0, "Process Error: copy_thread failed to push pages, "
1487                 "bailing\n");
1488       break;
1489     }
1490     loops++;
1491   }
1492
1493   pages_copied_ = loops;
1494   status_ = result;
1495   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1496             thread_num_, status_, pages_copied_);
1497   return 1;
1498 }
1499
1500 // Memory invert work loop. Execute until marked done.
1501 int InvertThread::Work() {
1502   struct page_entry src;
1503   int result = 1;
1504   int64 loops = 0;
1505
1506   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1507
1508   while (IsReadyToRun()) {
1509     // Pop the needed pages.
1510     result &= sat_->GetValid(&src);
1511     if (!result) {
1512       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1513                 "bailing\n");
1514       break;
1515     }
1516
1517     if (sat_->strict())
1518       CrcCheckPage(&src);
1519
1520     // For the same reason CopyThread yields itself (see YieldSelf comment
1521     // in CopyThread::Work(), InvertThread yields itself after each invert
1522     // operation to improve cooperation between different worker threads
1523     // stressing the memory/cache.
1524     InvertPageUp(&src);
1525     YieldSelf();
1526     InvertPageDown(&src);
1527     YieldSelf();
1528     InvertPageDown(&src);
1529     YieldSelf();
1530     InvertPageUp(&src);
1531     YieldSelf();
1532
1533     if (sat_->strict())
1534       CrcCheckPage(&src);
1535
1536     result &= sat_->PutValid(&src);
1537     if (!result) {
1538       logprintf(0, "Process Error: invert_thread failed to push pages, "
1539                 "bailing\n");
1540       break;
1541     }
1542     loops++;
1543   }
1544
1545   pages_copied_ = loops * 2;
1546   status_ = result;
1547   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1548             thread_num_, status_, pages_copied_);
1549   return 1;
1550 }
1551
1552
1553 // Set file name to use for File IO.
1554 void FileThread::SetFile(const char *filename_init) {
1555   filename_ = filename_init;
1556   devicename_ = os_->FindFileDevice(filename_);
1557 }
1558
1559 // Open the file for access.
1560 bool FileThread::OpenFile(int *pfile) {
1561   int fd = open(filename_.c_str(),
1562                 O_RDWR | O_CREAT | O_SYNC | O_DIRECT,
1563                 0644);
1564   if (fd < 0) {
1565     logprintf(0, "Process Error: Failed to create file %s!!\n",
1566               filename_.c_str());
1567     pages_copied_ = 0;
1568     status_ = 0;
1569     return 0;
1570   }
1571   *pfile = fd;
1572   return 1;
1573 }
1574
1575 // Close the file.
1576 bool FileThread::CloseFile(int fd) {
1577   close(fd);
1578   return 1;
1579 }
1580
1581 // Check sector tagging.
1582 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1583   int page_length = sat_->page_length();
1584   struct FileThread::SectorTag *tag =
1585     (struct FileThread::SectorTag *)(src->addr);
1586
1587   // Tag each sector.
1588   unsigned char magic = ((0xba + thread_num_) & 0xff);
1589   for (int sec = 0; sec < page_length / 512; sec++) {
1590     tag[sec].magic = magic;
1591     tag[sec].block = block & 0xff;
1592     tag[sec].sector = sec & 0xff;
1593     tag[sec].pass = pass_ & 0xff;
1594   }
1595   return true;
1596 }
1597
1598 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1599   int page_length = sat_->page_length();
1600   // Fill the file with our data.
1601   int64 size = write(fd, src->addr, page_length);
1602
1603   if (size != page_length) {
1604     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1605     errorcount_++;
1606     logprintf(0, "Block Error: file_thread failed to write, "
1607               "bailing\n");
1608     return false;
1609   }
1610   return true;
1611 }
1612
1613 // Write the data to the file.
1614 bool FileThread::WritePages(int fd) {
1615   int strict = sat_->strict();
1616
1617   // Start fresh at beginning of file for each batch of pages.
1618   lseek(fd, 0, SEEK_SET);
1619   for (int i = 0; i < sat_->disk_pages(); i++) {
1620     struct page_entry src;
1621     if (!GetValidPage(&src))
1622       return false;
1623     // Save expected pattern.
1624     page_recs_[i].pattern = src.pattern;
1625     page_recs_[i].src = src.addr;
1626
1627     // Check data correctness.
1628     if (strict)
1629       CrcCheckPage(&src);
1630
1631     SectorTagPage(&src, i);
1632
1633     bool result = WritePageToFile(fd, &src);
1634
1635     if (!PutEmptyPage(&src))
1636       return false;
1637
1638     if (!result)
1639       return false;
1640   }
1641   return true;
1642 }
1643
1644 // Copy data from file into memory block.
1645 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1646   int page_length = sat_->page_length();
1647
1648   // Do the actual read.
1649   int64 size = read(fd, dst->addr, page_length);
1650   if (size != page_length) {
1651     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1652     logprintf(0, "Block Error: file_thread failed to read, "
1653               "bailing\n");
1654     errorcount_++;
1655     return false;
1656   }
1657   return true;
1658 }
1659
1660 // Check sector tagging.
1661 bool FileThread::SectorValidatePage(const struct PageRec &page,
1662                                     struct page_entry *dst, int block) {
1663   // Error injection.
1664   static int calls = 0;
1665   calls++;
1666
1667   // Do sector tag compare.
1668   int firstsector = -1;
1669   int lastsector = -1;
1670   bool badsector = false;
1671   int page_length = sat_->page_length();
1672
1673   // Cast data block into an array of tagged sectors.
1674   struct FileThread::SectorTag *tag =
1675   (struct FileThread::SectorTag *)(dst->addr);
1676
1677   sat_assert(sizeof(*tag) == 512);
1678
1679   // Error injection.
1680   if (sat_->error_injection()) {
1681     if (calls == 2) {
1682       for (int badsec = 8; badsec < 17; badsec++)
1683         tag[badsec].pass = 27;
1684     }
1685     if (calls == 18) {
1686       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1687     }
1688   }
1689
1690   // Check each sector for the correct tag we added earlier,
1691   // then revert the tag to the to normal data pattern.
1692   unsigned char magic = ((0xba + thread_num_) & 0xff);
1693   for (int sec = 0; sec < page_length / 512; sec++) {
1694     // Check magic tag.
1695     if ((tag[sec].magic != magic) ||
1696         (tag[sec].block != (block & 0xff)) ||
1697         (tag[sec].sector != (sec & 0xff)) ||
1698         (tag[sec].pass != (pass_ & 0xff))) {
1699       // Offset calculation for tag location.
1700       int offset = sec * sizeof(SectorTag);
1701       if (tag[sec].block != (block & 0xff))
1702         offset += 1 * sizeof(uint8);
1703       else if (tag[sec].sector != (sec & 0xff))
1704         offset += 2 * sizeof(uint8);
1705       else if (tag[sec].pass != (pass_ & 0xff))
1706         offset += 3 * sizeof(uint8);
1707
1708       // Run sector tag error through diagnoser for logging and reporting.
1709       errorcount_ += 1;
1710       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1711                                                   offset,
1712                                                   tag[sec].sector,
1713                                                   page.src, page.dst);
1714
1715       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1716                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1717                 block * page_length + 512 * sec,
1718                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1719                 sec, (unsigned int)tag[sec].sector,
1720                 block, (unsigned int)tag[sec].block,
1721                 magic, (unsigned int)tag[sec].magic,
1722                 filename_.c_str());
1723
1724       // Keep track of first and last bad sector.
1725       if (firstsector == -1)
1726         firstsector = (block * page_length / 512) + sec;
1727       lastsector = (block * page_length / 512) + sec;
1728       badsector = true;
1729     }
1730     // Patch tag back to proper pattern.
1731     unsigned int *addr = (unsigned int *)(&tag[sec]);
1732     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1733   }
1734
1735   // If we found sector errors:
1736   if (badsector == true) {
1737     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1738               firstsector * 512,
1739               ((lastsector + 1) * 512) - 1,
1740               filename_.c_str());
1741
1742     // Either exit immediately, or patch the data up and continue.
1743     if (sat_->stop_on_error()) {
1744       exit(1);
1745     } else {
1746       // Patch up bad pages.
1747       for (int block = (firstsector * 512) / page_length;
1748           block <= (lastsector * 512) / page_length;
1749           block++) {
1750         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1751         int length = page_length / wordsize_;
1752         for (int i = 0; i < length; i++) {
1753           memblock[i] = dst->pattern->pattern(i);
1754         }
1755       }
1756     }
1757   }
1758   return true;
1759 }
1760
1761 // Get memory for an incoming data transfer..
1762 bool FileThread::PagePrepare() {
1763   // We can only do direct IO to SAT pages if it is normal mem.
1764   page_io_ = os_->normal_mem();
1765
1766   // Init a local buffer if we need it.
1767   if (!page_io_) {
1768     int result = posix_memalign(&local_page_, 512, sat_->page_length());
1769     if (result) {
1770       logprintf(0, "Process Error: disk thread posix_memalign "
1771                    "returned %d (fail)\n",
1772                 result);
1773       status_ += 1;
1774       return false;
1775     }
1776   }
1777   return true;
1778 }
1779
1780
1781 // Remove memory allocated for data transfer.
1782 bool FileThread::PageTeardown() {
1783   // Free a local buffer if we need to.
1784   if (!page_io_) {
1785     free(local_page_);
1786   }
1787   return true;
1788 }
1789
1790
1791
1792 // Get memory for an incoming data transfer..
1793 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1794   if (page_io_) {
1795     if (!sat_->GetEmpty(dst))
1796       return false;
1797   } else {
1798     dst->addr = local_page_;
1799     dst->offset = 0;
1800     dst->pattern = 0;
1801   }
1802   return true;
1803 }
1804
1805 // Get memory for an outgoing data transfer..
1806 bool FileThread::GetValidPage(struct page_entry *src) {
1807   struct page_entry tmp;
1808   if (!sat_->GetValid(&tmp))
1809     return false;
1810   if (page_io_) {
1811     *src = tmp;
1812     return true;
1813   } else {
1814     src->addr = local_page_;
1815     src->offset = 0;
1816     CrcCopyPage(src, &tmp);
1817     if (!sat_->PutValid(&tmp))
1818       return false;
1819   }
1820   return true;
1821 }
1822
1823
1824 // Throw out a used empty page.
1825 bool FileThread::PutEmptyPage(struct page_entry *src) {
1826   if (page_io_) {
1827     if (!sat_->PutEmpty(src))
1828       return false;
1829   }
1830   return true;
1831 }
1832
1833 // Throw out a used, filled page.
1834 bool FileThread::PutValidPage(struct page_entry *src) {
1835   if (page_io_) {
1836     if (!sat_->PutValid(src))
1837       return false;
1838   }
1839   return true;
1840 }
1841
1842
1843
1844 // Copy data from file into memory blocks.
1845 bool FileThread::ReadPages(int fd) {
1846   int page_length = sat_->page_length();
1847   int strict = sat_->strict();
1848   int result = 1;
1849
1850
1851   // Read our data back out of the file, into it's new location.
1852   lseek(fd, 0, SEEK_SET);
1853   for (int i = 0; i < sat_->disk_pages(); i++) {
1854     struct page_entry dst;
1855     if (!GetEmptyPage(&dst))
1856       return false;
1857     // Retrieve expected pattern.
1858     dst.pattern = page_recs_[i].pattern;
1859     // Update page recordpage record.
1860     page_recs_[i].dst = dst.addr;
1861
1862     // Read from the file into destination page.
1863     if (!ReadPageFromFile(fd, &dst)) {
1864         PutEmptyPage(&dst);
1865         return false;
1866     }
1867
1868     SectorValidatePage(page_recs_[i], &dst, i);
1869
1870     // Ensure that the transfer ended up with correct data.
1871     if (strict) {
1872       // Record page index currently CRC checked.
1873       crc_page_ = i;
1874       int errors = CrcCheckPage(&dst);
1875       if (errors) {
1876         logprintf(5, "Log: file miscompare at block %d, "
1877                   "offset %x-%x. File: %s\n",
1878                   i, i * page_length, ((i + 1) * page_length) - 1,
1879                   filename_.c_str());
1880         result = false;
1881       }
1882       crc_page_ = -1;
1883       errorcount_ += errors;
1884     }
1885     if (!PutValidPage(&dst))
1886       return false;
1887   }
1888   return result;
1889 }
1890
1891
1892 // File IO work loop. Execute until marked done.
1893 int FileThread::Work() {
1894   int result = 1;
1895   int fileresult = 1;
1896   int64 loops = 0;
1897
1898   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1899             thread_num_,
1900             filename_.c_str(),
1901             devicename_.c_str());
1902
1903   if (!PagePrepare())
1904     return 0;
1905
1906   // Open the data IO file.
1907   int fd = 0;
1908   if (!OpenFile(&fd))
1909     return 0;
1910
1911   pass_ = 0;
1912
1913   // Load patterns into page records.
1914   page_recs_ = new struct PageRec[sat_->disk_pages()];
1915   for (int i = 0; i < sat_->disk_pages(); i++) {
1916     page_recs_[i].pattern = new struct Pattern();
1917   }
1918
1919   // Loop until done.
1920   while (IsReadyToRun()) {
1921     // Do the file write.
1922     if (!(fileresult &= WritePages(fd)))
1923       break;
1924
1925     // Do the file read.
1926     if (!(fileresult &= ReadPages(fd)))
1927       break;
1928
1929     loops++;
1930     pass_ = loops;
1931   }
1932
1933   pages_copied_ = loops * sat_->disk_pages();
1934   status_ = result;
1935
1936   // Clean up.
1937   CloseFile(fd);
1938   PageTeardown();
1939
1940   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1941             thread_num_, status_, pages_copied_);
1942   return 1;
1943 }
1944
1945 bool NetworkThread::IsNetworkStopSet() {
1946   return !IsReadyToRunNoPause();
1947 }
1948
1949 bool NetworkSlaveThread::IsNetworkStopSet() {
1950   // This thread has no completion status.
1951   // It finishes whever there is no more data to be
1952   // passed back.
1953   return true;
1954 }
1955
1956 // Set ip name to use for Network IO.
1957 void NetworkThread::SetIP(const char *ipaddr_init) {
1958   strncpy(ipaddr_, ipaddr_init, 256);
1959 }
1960
1961 // Create a socket.
1962 // Return 0 on error.
1963 bool NetworkThread::CreateSocket(int *psocket) {
1964   int sock = socket(AF_INET, SOCK_STREAM, 0);
1965   if (sock == -1) {
1966     logprintf(0, "Process Error: Cannot open socket\n");
1967     pages_copied_ = 0;
1968     status_ = 0;
1969     return false;
1970   }
1971   *psocket = sock;
1972   return true;
1973 }
1974
1975 // Close the socket.
1976 bool NetworkThread::CloseSocket(int sock) {
1977   close(sock);
1978   return true;
1979 }
1980
1981 // Initiate the tcp connection.
1982 bool NetworkThread::Connect(int sock) {
1983   struct sockaddr_in dest_addr;
1984   dest_addr.sin_family = AF_INET;
1985   dest_addr.sin_port = htons(kNetworkPort);
1986   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
1987
1988   // Translate dot notation to u32.
1989   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
1990     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
1991     pages_copied_ = 0;
1992     status_ = 0;
1993     return false;
1994   }
1995
1996   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
1997                     sizeof(struct sockaddr))) {
1998     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
1999     pages_copied_ = 0;
2000     status_ = 0;
2001     return false;
2002   }
2003   return true;
2004 }
2005
2006 // Initiate the tcp connection.
2007 bool NetworkListenThread::Listen() {
2008   struct sockaddr_in sa;
2009
2010   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2011
2012   sa.sin_family = AF_INET;
2013   sa.sin_addr.s_addr = INADDR_ANY;
2014   sa.sin_port = htons(kNetworkPort);
2015
2016   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2017     char buf[256];
2018     sat_strerror(errno, buf, sizeof(buf));
2019     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2020     pages_copied_ = 0;
2021     status_ = 0;
2022     return false;
2023   }
2024   listen(sock_, 3);
2025   return true;
2026 }
2027
2028 // Wait for a connection from a network traffic generation thread.
2029 bool NetworkListenThread::Wait() {
2030     fd_set rfds;
2031     struct timeval tv;
2032     int retval;
2033
2034     // Watch sock_ to see when it has input.
2035     FD_ZERO(&rfds);
2036     FD_SET(sock_, &rfds);
2037     // Wait up to five seconds.
2038     tv.tv_sec = 5;
2039     tv.tv_usec = 0;
2040
2041     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2042
2043     return (retval > 0);
2044 }
2045
2046 // Wait for a connection from a network traffic generation thread.
2047 bool NetworkListenThread::GetConnection(int *pnewsock) {
2048   struct sockaddr_in sa;
2049   socklen_t size = sizeof(struct sockaddr_in);
2050
2051   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2052   if (newsock < 0)  {
2053     logprintf(0, "Process Error: Did not receive connection\n");
2054     pages_copied_ = 0;
2055     status_ = 0;
2056     return false;
2057   }
2058   *pnewsock = newsock;
2059   return true;
2060 }
2061
2062 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2063   int page_length = sat_->page_length();
2064   char *address = static_cast<char*>(src->addr);
2065
2066   // Send our data over the network.
2067   int size = page_length;
2068   while (size) {
2069     int transferred = send(sock, address + (page_length - size), size, 0);
2070     if ((transferred == 0) || (transferred == -1)) {
2071       if (!IsNetworkStopSet()) {
2072         char buf[256] = "";
2073         sat_strerror(errno, buf, sizeof(buf));
2074         logprintf(0, "Process Error: Thread %d, "
2075                      "Network write failed, bailing. (%s)\n",
2076                   thread_num_, buf);
2077       }
2078       return false;
2079     }
2080     size = size - transferred;
2081   }
2082   return true;
2083 }
2084
2085
2086 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2087   int page_length = sat_->page_length();
2088   char *address = static_cast<char*>(dst->addr);
2089
2090   // Maybe we will get our data back again, maybe not.
2091   int size = page_length;
2092   while (size) {
2093     int transferred = recv(sock, address + (page_length - size), size, 0);
2094     if ((transferred == 0) || (transferred == -1)) {
2095       // Typically network slave thread should exit as network master
2096       // thread stops sending data.
2097       if (IsNetworkStopSet()) {
2098         int err = errno;
2099         if (transferred == 0 && err == 0) {
2100           // Two system setups will not sync exactly,
2101           // allow early exit, but log it.
2102           logprintf(0, "Log: Net thread did not receive any data, exiting.\n");
2103         } else {
2104           char buf[256] = "";
2105           sat_strerror(err, buf, sizeof(buf));
2106           // Print why we failed.
2107           logprintf(0, "Process Error: Thread %d, "
2108                        "Network read failed, bailing (%s).\n",
2109                     thread_num_, buf);
2110           // Print arguments and results.
2111           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2112                     sock, address + (page_length - size),
2113                     size, transferred, err);
2114           if ((transferred == 0) &&
2115               (page_length - size < 512) &&
2116               (page_length - size > 0)) {
2117             // Print null terminated data received, to see who's been
2118             // sending us supicious unwanted data.
2119             address[page_length - size] = 0;
2120             logprintf(0, "Log: received  %d bytes: '%s'\n",
2121                       page_length - size, address);
2122           }
2123         }
2124       }
2125       return false;
2126     }
2127     size = size - transferred;
2128   }
2129   return true;
2130 }
2131
2132
2133 // Network IO work loop. Execute until marked done.
2134 int NetworkThread::Work() {
2135   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2136             thread_num_,
2137             ipaddr_);
2138
2139   // Make a socket.
2140   int sock = 0;
2141   if (!CreateSocket(&sock))
2142     return 0;
2143
2144   // Network IO loop requires network slave thread to have already initialized.
2145   // We will sleep here for awhile to ensure that the slave thread will be
2146   // listening by the time we connect.
2147   // Sleep for 15 seconds.
2148   sat_sleep(15);
2149   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2150             thread_num_,
2151             ipaddr_);
2152
2153
2154   // Connect to a slave thread.
2155   if (!Connect(sock))
2156     return 0;
2157
2158   // Loop until done.
2159   int result = 1;
2160   int strict = sat_->strict();
2161   int64 loops = 0;
2162   while (IsReadyToRun()) {
2163     struct page_entry src;
2164     struct page_entry dst;
2165     result &= sat_->GetValid(&src);
2166     result &= sat_->GetEmpty(&dst);
2167     if (!result) {
2168       logprintf(0, "Process Error: net_thread failed to pop pages, "
2169                 "bailing\n");
2170       break;
2171     }
2172
2173     // Check data correctness.
2174     if (strict)
2175       CrcCheckPage(&src);
2176
2177     // Do the network write.
2178     if (!(result &= SendPage(sock, &src)))
2179       break;
2180
2181     // Update pattern reference to reflect new contents.
2182     dst.pattern = src.pattern;
2183
2184     // Do the network read.
2185     if (!(result &= ReceivePage(sock, &dst)))
2186       break;
2187
2188     // Ensure that the transfer ended up with correct data.
2189     if (strict)
2190       CrcCheckPage(&dst);
2191
2192     // Return all of our pages to the queue.
2193     result &= sat_->PutValid(&dst);
2194     result &= sat_->PutEmpty(&src);
2195     if (!result) {
2196       logprintf(0, "Process Error: net_thread failed to push pages, "
2197                 "bailing\n");
2198       break;
2199     }
2200     loops++;
2201   }
2202
2203   pages_copied_ = loops;
2204   status_ = result;
2205
2206   // Clean up.
2207   CloseSocket(sock);
2208
2209   logprintf(9, "Log: Completed %d: network thread status %d, "
2210                "%d pages copied\n",
2211             thread_num_, status_, pages_copied_);
2212   return 1;
2213 }
2214
2215 // Spawn slave threads for incoming connections.
2216 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2217   logprintf(12, "Log: Listen thread spawning slave\n");
2218
2219   // Spawn slave thread, to reflect network traffic back to sender.
2220   ChildWorker *child_worker = new ChildWorker;
2221   child_worker->thread.SetSock(newsock);
2222   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2223                                   &child_worker->status);
2224   child_worker->status.Initialize();
2225   child_worker->thread.SpawnThread();
2226   child_workers_.push_back(child_worker);
2227
2228   return true;
2229 }
2230
2231 // Reap slave threads.
2232 bool NetworkListenThread::ReapSlaves() {
2233   bool result = true;
2234   // Gather status and reap threads.
2235   logprintf(12, "Log: Joining all outstanding threads\n");
2236
2237   for (int i = 0; i < child_workers_.size(); i++) {
2238     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2239     logprintf(12, "Log: Joining slave thread %d\n", i);
2240     child_thread.JoinThread();
2241     if (child_thread.GetStatus() != 1) {
2242       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2243                 child_thread.GetStatus());
2244       result = false;
2245     }
2246     errorcount_ += child_thread.GetErrorCount();
2247     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2248               child_thread.GetErrorCount());
2249     pages_copied_ += child_thread.GetPageCount();
2250   }
2251
2252   return result;
2253 }
2254
2255 // Network listener IO work loop. Execute until marked done.
2256 int NetworkListenThread::Work() {
2257   int result = 1;
2258   logprintf(9, "Log: Starting network listen thread %d\n",
2259             thread_num_);
2260
2261   // Make a socket.
2262   sock_ = 0;
2263   if (!CreateSocket(&sock_))
2264     return 0;
2265   logprintf(9, "Log: Listen thread created sock\n");
2266
2267   // Allows incoming connections to be queued up by socket library.
2268   int newsock = 0;
2269   Listen();
2270   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2271
2272   // Wait on incoming connections, and spawn worker threads for them.
2273   int threadcount = 0;
2274   while (IsReadyToRun()) {
2275     // Poll for connections that we can accept().
2276     if (Wait()) {
2277       // Accept those connections.
2278       logprintf(12, "Log: Listen thread found incoming connection\n");
2279       if (GetConnection(&newsock)) {
2280         SpawnSlave(newsock, threadcount);
2281         threadcount++;
2282       }
2283     }
2284   }
2285
2286   // Gather status and join spawned threads.
2287   ReapSlaves();
2288
2289   // Delete the child workers.
2290   for (ChildVector::iterator it = child_workers_.begin();
2291        it != child_workers_.end(); ++it) {
2292     (*it)->status.Destroy();
2293     delete *it;
2294   }
2295   child_workers_.clear();
2296
2297   CloseSocket(sock_);
2298
2299   status_ = result;
2300   logprintf(9,
2301             "Log: Completed %d: network listen thread status %d, "
2302             "%d pages copied\n",
2303             thread_num_, status_, pages_copied_);
2304   return 1;
2305 }
2306
2307 // Set network reflector socket struct.
2308 void NetworkSlaveThread::SetSock(int sock) {
2309   sock_ = sock;
2310 }
2311
2312 // Network reflector IO work loop. Execute until marked done.
2313 int NetworkSlaveThread::Work() {
2314   logprintf(9, "Log: Starting network slave thread %d\n",
2315             thread_num_);
2316
2317   // Verify that we have a socket.
2318   int sock = sock_;
2319   if (!sock)
2320     return 0;
2321
2322   // Loop until done.
2323   int64 loops = 0;
2324   // Init a local buffer for storing data.
2325   void *local_page = NULL;
2326   int result = posix_memalign(&local_page, 512, sat_->page_length());
2327   if (result) {
2328     logprintf(0, "Process Error: net slave posix_memalign "
2329                  "returned %d (fail)\n",
2330               result);
2331     status_ += 1;
2332     return false;
2333   }
2334
2335   struct page_entry page;
2336   page.addr = local_page;
2337
2338   // This thread will continue to run as long as the thread on the other end of
2339   // the socket is still sending and receiving data.
2340   while (1) {
2341     // Do the network read.
2342     if (!ReceivePage(sock, &page))
2343       break;
2344
2345     // Do the network write.
2346     if (!SendPage(sock, &page))
2347       break;
2348
2349     loops++;
2350   }
2351
2352   pages_copied_ = loops;
2353   // No results provided from this type of thread.
2354   status_ = 1;
2355
2356   // Clean up.
2357   CloseSocket(sock);
2358
2359   logprintf(9,
2360             "Log: Completed %d: network slave thread status %d, "
2361             "%d pages copied\n",
2362             thread_num_, status_, pages_copied_);
2363   return status_;
2364 }
2365
2366 // Thread work loop. Execute until marked finished.
2367 int ErrorPollThread::Work() {
2368   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2369
2370   // This calls a generic error polling function in the Os abstraction layer.
2371   do {
2372     errorcount_ += os_->ErrorPoll();
2373     os_->ErrorWait();
2374   } while (IsReadyToRun());
2375
2376   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2377             thread_num_, errorcount_);
2378   status_ = 1;
2379   return 1;
2380 }
2381
2382 // Worker thread to heat up CPU.
2383 int CpuStressThread::Work() {
2384   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2385
2386   do {
2387     // Run ludloff's platform/CPU-specific assembly workload.
2388     os_->CpuStressWorkload();
2389     YieldSelf();
2390   } while (IsReadyToRun());
2391
2392   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2393             thread_num_);
2394   status_ = 1;
2395   return 1;
2396 }
2397
2398 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2399                                                  int cacheline_count,
2400                                                  int thread_num,
2401                                                  int inc_count) {
2402   cc_cacheline_data_ = data;
2403   cc_cacheline_count_ = cacheline_count;
2404   cc_thread_num_ = thread_num;
2405   cc_inc_count_ = inc_count;
2406 }
2407
2408 // Worked thread to test the cache coherency of the CPUs
2409 int CpuCacheCoherencyThread::Work() {
2410   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2411             cc_thread_num_);
2412   uint64 time_start, time_end;
2413   struct timeval tv;
2414
2415   unsigned int seed = static_cast<unsigned int>(gettid());
2416   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2417   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2418
2419   uint64 total_inc = 0;  // Total increments done by the thread.
2420   while (IsReadyToRun()) {
2421     for (int i = 0; i < cc_inc_count_; i++) {
2422       // Choose a datastructure in random and increment the appropriate
2423       // member in that according to the offset (which is the same as the
2424       // thread number.
2425       int r = rand_r(&seed);
2426       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2427       // Increment the member of the randomely selected structure.
2428       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2429     }
2430
2431     total_inc += cc_inc_count_;
2432
2433     // Calculate if the local counter matches with the global value
2434     // in all the cache line structures for this particular thread.
2435     int cc_global_num = 0;
2436     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2437       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2438       // Reset the cachline member's value for the next run.
2439       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2440     }
2441     if (sat_->error_injection())
2442       cc_global_num = -1;
2443
2444     if (cc_global_num != cc_inc_count_) {
2445       errorcount_++;
2446       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2447                 cc_global_num, cc_inc_count_);
2448     }
2449   }
2450   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2451   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2452
2453   uint64 us_elapsed = time_end - time_start;
2454   // inc_rate is the no. of increments per second.
2455   double inc_rate = total_inc * 1e6 / us_elapsed;
2456
2457   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2458             " Increments=%llu, Increments/sec = %.6lf\n",
2459             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2460   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2461             cc_thread_num_);
2462   status_ = 1;
2463   return 1;
2464 }
2465
2466 DiskThread::DiskThread(DiskBlockTable *block_table) {
2467   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2468   write_block_size_ = kSectorSize;  // this assumes read and write block size
2469                                     // are the same
2470   segment_size_ = -1;               // use the entire disk as one segment
2471   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2472   // Use a queue such that 3/2 times as much data as the cache can hold
2473   // is written before it is read so that there is little chance the read
2474   // data is in the cache.
2475   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2476   blocks_per_segment_ = 32;
2477
2478   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2479   write_threshold_ = 100000;        // reading/writing a sector
2480
2481   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2482   write_timeout_ = 5000000;         // timout for reading/writing
2483
2484   device_sectors_ = 0;
2485   non_destructive_ = 0;
2486
2487   aio_ctx_ = 0;
2488   block_table_ = block_table;
2489   update_block_table_ = 1;
2490
2491   block_buffer_ = NULL;
2492 }
2493
2494 DiskThread::~DiskThread() {
2495 }
2496
2497 // Set filename for device file (in /dev).
2498 void DiskThread::SetDevice(const char *device_name) {
2499   device_name_ = device_name;
2500 }
2501
2502 // Set various parameters that control the behaviour of the test.
2503 // -1 is used as a sentinel value on each parameter (except non_destructive)
2504 // to indicate that the parameter not be set.
2505 bool DiskThread::SetParameters(int read_block_size,
2506                                int write_block_size,
2507                                int64 segment_size,
2508                                int64 cache_size,
2509                                int blocks_per_segment,
2510                                int64 read_threshold,
2511                                int64 write_threshold,
2512                                int non_destructive) {
2513   if (read_block_size != -1) {
2514     // Blocks must be aligned to the disk's sector size.
2515     if (read_block_size % kSectorSize != 0) {
2516       logprintf(0, "Process Error: Block size must be a multiple of %d "
2517                 "(thread %d).\n", kSectorSize, thread_num_);
2518       return false;
2519     }
2520
2521     read_block_size_ = read_block_size;
2522   }
2523
2524   if (write_block_size != -1) {
2525     // Write blocks must be aligned to the disk's sector size and to the
2526     // block size.
2527     if (write_block_size % kSectorSize != 0) {
2528       logprintf(0, "Process Error: Write block size must be a multiple "
2529                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2530       return false;
2531     }
2532     if (write_block_size % read_block_size_ != 0) {
2533       logprintf(0, "Process Error: Write block size must be a multiple "
2534                 "of the read block size, which is %d (thread %d).\n",
2535                 read_block_size_, thread_num_);
2536       return false;
2537     }
2538
2539     write_block_size_ = write_block_size;
2540
2541   } else {
2542     // Make sure write_block_size_ is still valid.
2543     if (read_block_size_ > write_block_size_) {
2544       logprintf(5, "Log: Assuming write block size equal to read block size, "
2545                 "which is %d (thread %d).\n", read_block_size_,
2546                 thread_num_);
2547       write_block_size_ = read_block_size_;
2548     } else {
2549       if (write_block_size_ % read_block_size_ != 0) {
2550         logprintf(0, "Process Error: Write block size (defined as %d) must "
2551                   "be a multiple of the read block size, which is %d "
2552                   "(thread %d).\n", write_block_size_, read_block_size_,
2553                   thread_num_);
2554         return false;
2555       }
2556     }
2557   }
2558
2559   if (cache_size != -1) {
2560     cache_size_ = cache_size;
2561   }
2562
2563   if (blocks_per_segment != -1) {
2564     if (blocks_per_segment <= 0) {
2565       logprintf(0, "Process Error: Blocks per segment must be greater than "
2566                    "zero.\n (thread %d)", thread_num_);
2567       return false;
2568     }
2569
2570     blocks_per_segment_ = blocks_per_segment;
2571   }
2572
2573   if (read_threshold != -1) {
2574     if (read_threshold <= 0) {
2575       logprintf(0, "Process Error: Read threshold must be greater than "
2576                    "zero (thread %d).\n", thread_num_);
2577       return false;
2578     }
2579
2580     read_threshold_ = read_threshold;
2581   }
2582
2583   if (write_threshold != -1) {
2584     if (write_threshold <= 0) {
2585       logprintf(0, "Process Error: Write threshold must be greater than "
2586                    "zero (thread %d).\n", thread_num_);
2587       return false;
2588     }
2589
2590     write_threshold_ = write_threshold;
2591   }
2592
2593   if (segment_size != -1) {
2594     // Segments must be aligned to the disk's sector size.
2595     if (segment_size % kSectorSize != 0) {
2596       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2597                 " (thread %d).\n", kSectorSize, thread_num_);
2598       return false;
2599     }
2600
2601     segment_size_ = segment_size / kSectorSize;
2602   }
2603
2604   non_destructive_ = non_destructive;
2605
2606   // Having a queue of 150% of blocks that will fit in the disk's cache
2607   // should be enough to force out the oldest block before it is read and hence,
2608   // making sure the data comes form the disk and not the cache.
2609   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2610   // Updating DiskBlockTable parameters
2611   if (update_block_table_) {
2612     block_table_->SetParameters(kSectorSize, write_block_size_,
2613                                 device_sectors_, segment_size_,
2614                                 device_name_);
2615   }
2616   return true;
2617 }
2618
2619 bool DiskThread::OpenDevice(int *pfile) {
2620   int fd = open(device_name_.c_str(),
2621                 O_RDWR | O_SYNC | O_DIRECT | O_LARGEFILE,
2622                 0);
2623   if (fd < 0) {
2624     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2625               device_name_.c_str(), thread_num_);
2626     return false;
2627   }
2628   *pfile = fd;
2629
2630   return GetDiskSize(fd);
2631 }
2632
2633 // Retrieves the size (in bytes) of the disk/file.
2634 bool DiskThread::GetDiskSize(int fd) {
2635   struct stat device_stat;
2636   if (fstat(fd, &device_stat) == -1) {
2637     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2638               device_name_.c_str(), thread_num_);
2639     return false;
2640   }
2641
2642   // For a block device, an ioctl is needed to get the size since the size
2643   // of the device file (i.e. /dev/sdb) is 0.
2644   if (S_ISBLK(device_stat.st_mode)) {
2645     uint64 block_size = 0;
2646
2647     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2648       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2649                 device_name_.c_str(), thread_num_);
2650       return false;
2651     }
2652
2653     // If an Elephant is initialized with status DEAD its size will be zero.
2654     if (block_size == 0) {
2655       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2656       ++errorcount_;
2657       status_ = 1;  // Avoid a procedural error.
2658       return false;
2659     }
2660
2661     device_sectors_ = block_size / kSectorSize;
2662
2663   } else if (S_ISREG(device_stat.st_mode)) {
2664     device_sectors_ = device_stat.st_size / kSectorSize;
2665
2666   } else {
2667     logprintf(0, "Process Error: %s is not a regular file or block "
2668               "device (thread %d).\n", device_name_.c_str(),
2669               thread_num_);
2670     return false;
2671   }
2672
2673   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2674             device_sectors_, device_name_.c_str(), thread_num_);
2675
2676   if (update_block_table_) {
2677     block_table_->SetParameters(kSectorSize, write_block_size_,
2678                                 device_sectors_, segment_size_,
2679                                 device_name_);
2680   }
2681
2682   return true;
2683 }
2684
2685 bool DiskThread::CloseDevice(int fd) {
2686   close(fd);
2687   return true;
2688 }
2689
2690 // Return the time in microseconds.
2691 int64 DiskThread::GetTime() {
2692   struct timeval tv;
2693   gettimeofday(&tv, NULL);
2694   return tv.tv_sec * 1000000 + tv.tv_usec;
2695 }
2696
2697 bool DiskThread::DoWork(int fd) {
2698   int64 block_num = 0;
2699   blocks_written_ = 0;
2700   blocks_read_ = 0;
2701   int64 num_segments;
2702
2703   if (segment_size_ == -1) {
2704     num_segments = 1;
2705   } else {
2706     num_segments = device_sectors_ / segment_size_;
2707     if (device_sectors_ % segment_size_ != 0)
2708       num_segments++;
2709   }
2710
2711   // Disk size should be at least 3x cache size.  See comment later for
2712   // details.
2713   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2714
2715   // This disk test works by writing blocks with a certain pattern to
2716   // disk, then reading them back and verifying it against the pattern
2717   // at a later time.  A failure happens when either the block cannot
2718   // be written/read or when the read block is different than what was
2719   // written.  If a block takes too long to write/read, then a warning
2720   // is given instead of an error since taking too long is not
2721   // necessarily an error.
2722   //
2723   // To prevent the read blocks from coming from the disk cache,
2724   // enough blocks are written before read such that a block would
2725   // be ejected from the disk cache by the time it is read.
2726   //
2727   // TODO(amistry): Implement some sort of read/write throttling.  The
2728   //                flood of asynchronous I/O requests when a drive is
2729   //                unplugged is causing the application and kernel to
2730   //                become unresponsive.
2731
2732   while (IsReadyToRun()) {
2733     // Write blocks to disk.
2734     logprintf(16, "Write phase for disk %s (thread %d).\n",
2735               device_name_.c_str(), thread_num_);
2736     while (IsReadyToRunNoPause() &&
2737            in_flight_sectors_.size() < queue_size_ + 1) {
2738       // Confine testing to a particular segment of the disk.
2739       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2740       if (block_num % blocks_per_segment_ == 0) {
2741         logprintf(20, "Log: Starting to write segment %lld out of "
2742                   "%lld on disk %s (thread %d).\n",
2743                   segment, num_segments, device_name_.c_str(),
2744                   thread_num_);
2745       }
2746       block_num++;
2747
2748       BlockData *block = block_table_->GetUnusedBlock(segment);
2749
2750       // If an unused sequence of sectors could not be found, skip to the
2751       // next block to process.  Soon, a new segment will come and new
2752       // sectors will be able to be allocated.  This effectively puts a
2753       // minumim on the disk size at 3x the stated cache size, or 48MiB
2754       // if a cache size is not given (since the cache is set as 16MiB
2755       // by default).  Given that todays caches are at the low MiB range
2756       // and drive sizes at the mid GB, this shouldn't pose a problem.
2757       // The 3x minimum comes from the following:
2758       //   1. In order to allocate 'y' blocks from a segment, the
2759       //      segment must contain at least 2y blocks or else an
2760       //      allocation may not succeed.
2761       //   2. Assume the entire disk is one segment.
2762       //   3. A full write phase consists of writing blocks corresponding to
2763       //      3/2 cache size.
2764       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2765       //      size worth of blocks = 3 * cache size worth of blocks
2766       //      to complete.
2767       // In non-destructive mode, don't write anything to disk.
2768       if (!non_destructive_) {
2769         if (!WriteBlockToDisk(fd, block)) {
2770           block_table_->RemoveBlock(block);
2771           continue;
2772         }
2773       }
2774
2775       block->SetBlockAsInitialized();
2776
2777       blocks_written_++;
2778       in_flight_sectors_.push(block);
2779     }
2780
2781     // Verify blocks on disk.
2782     logprintf(20, "Read phase for disk %s (thread %d).\n",
2783               device_name_.c_str(), thread_num_);
2784     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2785       BlockData *block = in_flight_sectors_.front();
2786       in_flight_sectors_.pop();
2787       ValidateBlockOnDisk(fd, block);
2788       block_table_->RemoveBlock(block);
2789       blocks_read_++;
2790     }
2791   }
2792
2793   pages_copied_ = blocks_written_ + blocks_read_;
2794   return true;
2795 }
2796
2797 // Do an asynchronous disk I/O operation.
2798 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2799                             int64 offset, int64 timeout) {
2800   // Use the Linux native asynchronous I/O interface for reading/writing.
2801   // A read/write consists of three basic steps:
2802   //    1. create an io context.
2803   //    2. prepare and submit an io request to the context
2804   //    3. wait for an event on the context.
2805
2806   struct {
2807     const int opcode;
2808     const char *op_str;
2809     const char *error_str;
2810   } operations[2] = {
2811     { IOCB_CMD_PREAD, "read", "disk-read-error" },
2812     { IOCB_CMD_PWRITE, "write", "disk-write-error" }
2813   };
2814
2815   struct iocb cb;
2816   memset(&cb, 0, sizeof(cb));
2817
2818   cb.aio_fildes = fd;
2819   cb.aio_lio_opcode = operations[op].opcode;
2820   cb.aio_buf = (__u64)buf;
2821   cb.aio_nbytes = size;
2822   cb.aio_offset = offset;
2823
2824   struct iocb *cbs[] = { &cb };
2825   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2826     logprintf(0, "Process Error: Unable to submit async %s "
2827                  "on disk %s (thread %d).\n",
2828               operations[op].op_str, device_name_.c_str(),
2829               thread_num_);
2830     return false;
2831   }
2832
2833   struct io_event event;
2834   memset(&event, 0, sizeof(event));
2835   struct timespec tv;
2836   tv.tv_sec = timeout / 1000000;
2837   tv.tv_nsec = (timeout % 1000000) * 1000;
2838   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2839     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2840     // EINTR error code.  This is not an error and so don't treat it as such,
2841     // but still log it.
2842     if (errno == EINTR) {
2843       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2844                 operations[op].op_str, device_name_.c_str(),
2845                 thread_num_);
2846     } else {
2847       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2848       errorcount_ += 1;
2849       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2850                    "starting at %lld on disk %s (thread %d).\n",
2851                 operations[op].op_str, offset / kSectorSize,
2852                 device_name_.c_str(), thread_num_);
2853     }
2854
2855     // Don't bother checking return codes since io_cancel seems to always fail.
2856     // Since io_cancel is always failing, destroying and recreating an I/O
2857     // context is a workaround for canceling an in-progress I/O operation.
2858     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2859     io_cancel(aio_ctx_, &cb, &event);
2860     io_destroy(aio_ctx_);
2861     aio_ctx_ = 0;
2862     if (io_setup(5, &aio_ctx_)) {
2863       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2864                 " (thread %d).\n",
2865                 device_name_.c_str(), thread_num_);
2866     }
2867
2868     return false;
2869   }
2870
2871   // event.res contains the number of bytes written/read or
2872   // error if < 0, I think.
2873   if (event.res != size) {
2874     errorcount_++;
2875     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2876
2877     if (event.res < 0) {
2878       switch (event.res) {
2879         case -EIO:
2880           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2881                        "sectors starting at %lld on disk %s (thread %d).\n",
2882                     operations[op].op_str, offset / kSectorSize,
2883                     device_name_.c_str(), thread_num_);
2884           break;
2885         default:
2886           logprintf(0, "Hardware Error: Unknown error while doing %s to "
2887                        "sectors starting at %lld on disk %s (thread %d).\n",
2888                     operations[op].op_str, offset / kSectorSize,
2889                     device_name_.c_str(), thread_num_);
2890       }
2891     } else {
2892       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2893                    "%lld on disk %s (thread %d).\n",
2894                 operations[op].op_str, offset / kSectorSize,
2895                 device_name_.c_str(), thread_num_);
2896     }
2897     return false;
2898   }
2899
2900   return true;
2901 }
2902
2903 // Write a block to disk.
2904 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
2905   memset(block_buffer_, 0, block->GetSize());
2906
2907   // Fill block buffer with a pattern
2908   struct page_entry pe;
2909   if (!sat_->GetValid(&pe)) {
2910     // Even though a valid page could not be obatined, it is not an error
2911     // since we can always fill in a pattern directly, albeit slower.
2912     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
2913     block->SetPattern(patternlist_->GetRandomPattern());
2914
2915     logprintf(11, "Log: Warning, using pattern fill fallback in "
2916                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
2917               device_name_.c_str(), thread_num_);
2918
2919     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
2920       memblock[i] = block->GetPattern()->pattern(i);
2921     }
2922   } else {
2923     memcpy(block_buffer_, pe.addr, block->GetSize());
2924     block->SetPattern(pe.pattern);
2925     sat_->PutValid(&pe);
2926   }
2927
2928   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
2929             " (thread %d).\n",
2930             block->GetSize()/kSectorSize, block->GetAddress(),
2931             device_name_.c_str(), thread_num_);
2932
2933   int64 start_time = GetTime();
2934
2935   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
2936                    block->GetAddress() * kSectorSize, write_timeout_)) {
2937     return false;
2938   }
2939
2940   int64 end_time = GetTime();
2941   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
2942             end_time - start_time, thread_num_);
2943   if (end_time - start_time > write_threshold_) {
2944     logprintf(5, "Log: Write took %lld us which is longer than threshold "
2945                  "%lld us on disk %s (thread %d).\n",
2946               end_time - start_time, write_threshold_, device_name_.c_str(),
2947               thread_num_);
2948   }
2949
2950   return true;
2951 }
2952
2953 // Verify a block on disk.
2954 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
2955   int64 blocks = block->GetSize() / read_block_size_;
2956   int64 bytes_read = 0;
2957   int64 current_blocks;
2958   int64 current_bytes;
2959   uint64 address = block->GetAddress();
2960
2961   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
2962             "(thread %d).\n",
2963             address, device_name_.c_str(), thread_num_);
2964
2965   // Read block from disk and time the read.  If it takes longer than the
2966   // threshold, complain.
2967   if (lseek(fd, address * kSectorSize, SEEK_SET) == -1) {
2968     logprintf(0, "Process Error: Unable to seek to sector %lld in "
2969               "DiskThread::ValidateSectorsOnDisk on disk %s "
2970               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
2971     return false;
2972   }
2973   int64 start_time = GetTime();
2974
2975   // Split a large write-sized block into small read-sized blocks and
2976   // read them in groups of randomly-sized multiples of read block size.
2977   // This assures all data written on disk by this particular block
2978   // will be tested using a random reading pattern.
2979
2980   while (blocks != 0) {
2981     // Test all read blocks in a written block.
2982     current_blocks = (random() % blocks) + 1;
2983     current_bytes = current_blocks * read_block_size_;
2984
2985     memset(block_buffer_, 0, current_bytes);
2986
2987     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
2988               "disk %s (thread %d)\n",
2989               current_bytes / kSectorSize,
2990               (address * kSectorSize + bytes_read) / kSectorSize,
2991               device_name_.c_str(), thread_num_);
2992
2993     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
2994                      address * kSectorSize + bytes_read,
2995                      write_timeout_)) {
2996       return false;
2997     }
2998
2999     int64 end_time = GetTime();
3000     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
3001               end_time - start_time, thread_num_);
3002     if (end_time - start_time > read_threshold_) {
3003       logprintf(5, "Log: Read took %lld us which is longer than threshold "
3004                 "%lld us on disk %s (thread %d).\n",
3005                 end_time - start_time, read_threshold_,
3006                 device_name_.c_str(), thread_num_);
3007     }
3008
3009     // In non-destructive mode, don't compare the block to the pattern since
3010     // the block was never written to disk in the first place.
3011     if (!non_destructive_) {
3012       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3013                       0, bytes_read)) {
3014         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3015         errorcount_ += 1;
3016         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3017                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3018                   "disk %s (thread %d).\n",
3019                   address, device_name_.c_str(), thread_num_);
3020       }
3021     }
3022
3023     bytes_read += current_blocks * read_block_size_;
3024     blocks -= current_blocks;
3025   }
3026
3027   return true;
3028 }
3029
3030 int DiskThread::Work() {
3031   int fd;
3032
3033   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3034             thread_num_, device_name_.c_str());
3035
3036   srandom(time(NULL));
3037
3038   if (!OpenDevice(&fd)) {
3039     return 0;
3040   }
3041
3042   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3043   // when using direst IO.
3044
3045   int result = posix_memalign(&block_buffer_, kBufferAlignment,
3046                               sat_->page_length());
3047   if (result) {
3048     CloseDevice(fd);
3049     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3050                  "for disk %s (thread %d) posix memalign returned %d.\n",
3051               device_name_.c_str(), thread_num_, result);
3052     status_ += 1;
3053     return false;
3054   }
3055
3056   if (io_setup(5, &aio_ctx_)) {
3057     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3058               " (thread %d).\n",
3059               device_name_.c_str(), thread_num_);
3060     return 0;
3061   }
3062
3063   DoWork(fd);
3064
3065   status_ = 1;
3066
3067   io_destroy(aio_ctx_);
3068   CloseDevice(fd);
3069   free(block_buffer_);
3070
3071   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3072                "%d pages copied\n",
3073             thread_num_, device_name_.c_str(), status_, pages_copied_);
3074   return 1;
3075 }
3076
3077 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3078     : DiskThread(block_table) {
3079   update_block_table_ = 0;
3080 }
3081
3082 RandomDiskThread::~RandomDiskThread() {
3083 }
3084
3085 bool RandomDiskThread::DoWork(int fd) {
3086   blocks_read_ = 0;
3087   blocks_written_ = 0;
3088   logprintf(11, "Random phase for disk %s (thread %d).\n",
3089             device_name_.c_str(), thread_num_);
3090   while (IsReadyToRun()) {
3091     BlockData *block = block_table_->GetRandomBlock();
3092     if (block == NULL) {
3093       logprintf(12, "No block available for device %s (thread %d).\n",
3094                 device_name_.c_str(), thread_num_);
3095     } else {
3096       ValidateBlockOnDisk(fd, block);
3097       block_table_->ReleaseBlock(block);
3098       blocks_read_++;
3099     }
3100   }
3101   pages_copied_ = blocks_read_;
3102   return true;
3103 }
3104
3105 MemoryRegionThread::MemoryRegionThread() {
3106   error_injection_ = false;
3107   pages_ = NULL;
3108 }
3109
3110 MemoryRegionThread::~MemoryRegionThread() {
3111   if (pages_ != NULL)
3112     delete pages_;
3113 }
3114
3115 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3116   int plength = sat_->page_length();
3117   int npages = size / plength;
3118   if (size % plength) {
3119     logprintf(0, "Process Error: region size is not a multiple of SAT "
3120               "page length\n");
3121     return false;
3122   } else {
3123     if (pages_ != NULL)
3124       delete pages_;
3125     pages_ = new PageEntryQueue(npages);
3126     char *base_addr = reinterpret_cast<char*>(region);
3127     region_ = base_addr;
3128     for (int i = 0; i < npages; i++) {
3129       struct page_entry pe;
3130       init_pe(&pe);
3131       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3132       pe.offset = i * plength;
3133
3134       pages_->Push(&pe);
3135     }
3136     return true;
3137   }
3138 }
3139
3140 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3141                                       int priority,
3142                                       const char *message) {
3143   uint32 buffer_offset;
3144   if (phase_ == kPhaseCopy) {
3145     // If the error occurred on the Copy Phase, it means that
3146     // the source data (i.e., the main memory) is wrong. so
3147     // just pass it to the original ProcessError to call a
3148     // bad-dimm error
3149     WorkerThread::ProcessError(error, priority, message);
3150   } else if (phase_ == kPhaseCheck) {
3151     // A error on the Check Phase means that the memory region tested
3152     // has an error. Gathering more information and then reporting
3153     // the error.
3154     // Determine if this is a write or read error.
3155     os_->Flush(error->vaddr);
3156     error->reread = *(error->vaddr);
3157     char *good = reinterpret_cast<char*>(&(error->expected));
3158     char *bad = reinterpret_cast<char*>(&(error->actual));
3159     sat_assert(error->expected != error->actual);
3160     unsigned int offset = 0;
3161     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3162       if (good[offset] != bad[offset])
3163         break;
3164     }
3165
3166     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3167
3168     buffer_offset = error->vbyteaddr - region_;
3169
3170     // Find physical address if possible.
3171     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3172     logprintf(priority,
3173               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3174               "offset %llx: read:0x%016llx, reread:0x%016llx "
3175               "expected:0x%016llx\n",
3176               message,
3177               identifier_.c_str(),
3178               error->vaddr,
3179               error->paddr,
3180               buffer_offset,
3181               error->actual,
3182               error->reread,
3183               error->expected);
3184   } else {
3185     logprintf(0, "Process Error: memory region thread raised an "
3186               "unexpected error.");
3187   }
3188 }
3189
3190 int MemoryRegionThread::Work() {
3191   struct page_entry source_pe;
3192   struct page_entry memregion_pe;
3193   int result = 1;
3194   int64 loops = 0;
3195   const uint64 error_constant = 0x00ba00000000ba00LL;
3196
3197   // For error injection.
3198   int64 *addr = 0x0;
3199   int offset = 0;
3200   int64 data = 0;
3201
3202   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3203
3204   while (IsReadyToRun()) {
3205     // Getting pages from SAT and queue.
3206     phase_ = kPhaseNoPhase;
3207     result &= sat_->GetValid(&source_pe);
3208     if (!result) {
3209       logprintf(0, "Process Error: memory region thread failed to pop "
3210                 "pages from SAT, bailing\n");
3211       break;
3212     }
3213
3214     result &= pages_->PopRandom(&memregion_pe);
3215     if (!result) {
3216       logprintf(0, "Process Error: memory region thread failed to pop "
3217                 "pages from queue, bailing\n");
3218       break;
3219     }
3220
3221     // Error injection for CRC copy.
3222     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3223       addr = reinterpret_cast<int64*>(source_pe.addr);
3224       offset = random() % (sat_->page_length() / wordsize_);
3225       data = addr[offset];
3226       addr[offset] = error_constant;
3227     }
3228
3229     // Copying SAT page into memory region.
3230     phase_ = kPhaseCopy;
3231     CrcCopyPage(&memregion_pe, &source_pe);
3232     memregion_pe.pattern = source_pe.pattern;
3233
3234     // Error injection for CRC Check.
3235     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3236       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3237       offset = random() % (sat_->page_length() / wordsize_);
3238       data = addr[offset];
3239       addr[offset] = error_constant;
3240     }
3241
3242     // Checking page content in memory region.
3243     phase_ = kPhaseCheck;
3244     CrcCheckPage(&memregion_pe);
3245
3246     phase_ = kPhaseNoPhase;
3247     // Storing pages on their proper queues.
3248     result &= sat_->PutValid(&source_pe);
3249     if (!result) {
3250       logprintf(0, "Process Error: memory region thread failed to push "
3251                 "pages into SAT, bailing\n");
3252       break;
3253     }
3254     result &= pages_->Push(&memregion_pe);
3255     if (!result) {
3256       logprintf(0, "Process Error: memory region thread failed to push "
3257                 "pages into queue, bailing\n");
3258       break;
3259     }
3260
3261     if ((sat_->error_injection() || error_injection_) &&
3262         loops >= 1 && loops <= 2) {
3263       addr[offset] = data;
3264     }
3265
3266     loops++;
3267     YieldSelf();
3268   }
3269
3270   pages_copied_ = loops;
3271   status_ = result;
3272   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3273             "pages checked\n", thread_num_, status_, pages_copied_);
3274   return 1;
3275 }