chiark / gitweb /
6a00db2085338ccb464321e7b5eeb027c13e4da6
[stressapptest] / src / worker.cc
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.cc : individual tasks that can be run in combination to
16 // stress the system
17
18 #include <errno.h>
19 #include <malloc.h>
20 #include <pthread.h>
21 #include <sched.h>
22 #include <signal.h>
23 #include <stdlib.h>
24 #include <stdio.h>
25 #include <stdint.h>
26 #include <string.h>
27 #include <time.h>
28 #include <unistd.h>
29
30 #include <sys/select.h>
31 #include <sys/stat.h>
32 #include <sys/types.h>
33 #include <sys/times.h>
34
35 // These are necessary, but on by default
36 // #define __USE_GNU
37 // #define __USE_LARGEFILE64
38 #include <fcntl.h>
39 #include <sys/socket.h>
40 #include <netdb.h>
41 #include <arpa/inet.h>
42 #include <linux/unistd.h>  // for gettid
43 // For size of block device
44 #include <sys/ioctl.h>
45 #include <linux/fs.h>
46 // For asynchronous I/O
47 #include <linux/aio_abi.h>
48
49 #include <sys/syscall.h>
50
51 #include <set>
52 #include <string>
53
54 // This file must work with autoconf on its public version,
55 // so these includes are correct.
56 #include "error_diag.h"  // NOLINT
57 #include "os.h"          // NOLINT
58 #include "pattern.h"     // NOLINT
59 #include "queue.h"       // NOLINT
60 #include "sat.h"         // NOLINT
61 #include "sattypes.h"    // NOLINT
62 #include "worker.h"      // NOLINT
63
64 // Syscalls
65 // Why ubuntu, do you hate gettid so bad?
66 #if !defined(__NR_gettid)
67 #  define __NR_gettid             224
68 #endif
69
70 #define gettid() syscall(__NR_gettid)
71 #if !defined(CPU_SETSIZE)
72 _syscall3(int, sched_getaffinity, pid_t, pid,
73           unsigned int, len, cpu_set_t*, mask)
74 _syscall3(int, sched_setaffinity, pid_t, pid,
75           unsigned int, len, cpu_set_t*, mask)
76 #endif
77
78 // Linux aio syscalls.
79 #if !defined(__NR_io_setup)
80 #define __NR_io_setup   206
81 #define __NR_io_destroy 207
82 #define __NR_io_getevents       208
83 #define __NR_io_submit  209
84 #define __NR_io_cancel  210
85 #endif
86
87 #define io_setup(nr_events, ctxp) \
88   syscall(__NR_io_setup, (nr_events), (ctxp))
89 #define io_submit(ctx_id, nr, iocbpp) \
90   syscall(__NR_io_submit, (ctx_id), (nr), (iocbpp))
91 #define io_getevents(ctx_id, io_getevents, nr, events, timeout) \
92   syscall(__NR_io_getevents, (ctx_id), (io_getevents), (nr), (events), \
93     (timeout))
94 #define io_cancel(ctx_id, iocb, result) \
95   syscall(__NR_io_cancel, (ctx_id), (iocb), (result))
96 #define io_destroy(ctx) \
97   syscall(__NR_io_destroy, (ctx))
98
99 namespace {
100   // Get HW core ID from cpuid instruction.
101   inline int apicid(void) {
102     int cpu;
103 #ifdef STRESSAPPTEST_CPU_PPC
104     cpu = 0;
105 #else
106     __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
107 #endif
108     return (cpu >> 24);
109   }
110
111   // Work around the sad fact that there are two (gnu, xsi) incompatible
112   // versions of strerror_r floating around google. Awesome.
113   bool sat_strerror(int err, char *buf, int len) {
114     buf[0] = 0;
115     char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
116     int retval = reinterpret_cast<int64>(errmsg);
117     if (retval == 0)
118       return true;
119     if (retval == -1)
120       return false;
121     if (errmsg != buf) {
122       strncpy(buf, errmsg, len);
123       buf[len - 1] = 0;
124     }
125     return true;
126   }
127
128
129   inline uint64 addr_to_tag(void *address) {
130     return reinterpret_cast<uint64>(address);
131   }
132 }
133
134 #if !defined(O_DIRECT)
135 // Sometimes this isn't available.
136 // Disregard if it's not defined.
137   #define O_DIRECT            0
138 #endif
139
140 // A struct to hold captured errors, for later reporting.
141 struct ErrorRecord {
142   uint64 actual;  // This is the actual value read.
143   uint64 reread;  // This is the actual value, reread.
144   uint64 expected;  // This is what it should have been.
145   uint64 *vaddr;  // This is where it was (or wasn't).
146   char *vbyteaddr;  // This is byte specific where the data was (or wasn't).
147   uint64 paddr;  // This is the bus address, if available.
148   uint64 *tagvaddr;  // This holds the tag value if this data was tagged.
149   uint64 tagpaddr;  // This holds the physical address corresponding to the tag.
150 };
151
152 // This is a helper function to create new threads with pthreads.
153 static void *ThreadSpawnerGeneric(void *ptr) {
154   WorkerThread *worker = static_cast<WorkerThread*>(ptr);
155   worker->StartRoutine();
156   return NULL;
157 }
158
159
160 void WorkerStatus::Initialize() {
161   sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
162   sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
163   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
164                                        num_workers_ + 1));
165 }
166
167 void WorkerStatus::Destroy() {
168   sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
169   sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
170   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
171 }
172
173 void WorkerStatus::PauseWorkers() {
174   if (SetStatus(PAUSE) != PAUSE)
175     WaitOnPauseBarrier();
176 }
177
178 void WorkerStatus::ResumeWorkers() {
179   if (SetStatus(RUN) == PAUSE)
180     WaitOnPauseBarrier();
181 }
182
183 void WorkerStatus::StopWorkers() {
184   if (SetStatus(STOP) == PAUSE)
185     WaitOnPauseBarrier();
186 }
187
188 bool WorkerStatus::ContinueRunning() {
189   // This loop is an optimization.  We use it to immediately re-check the status
190   // after resuming from a pause, instead of returning and waiting for the next
191   // call to this function.
192   for (;;) {
193     switch (GetStatus()) {
194       case RUN:
195         return true;
196       case PAUSE:
197         // Wait for the other workers to call this function so that
198         // PauseWorkers() can return.
199         WaitOnPauseBarrier();
200         // Wait for ResumeWorkers() to be called.
201         WaitOnPauseBarrier();
202         break;
203       case STOP:
204         return false;
205     }
206   }
207 }
208
209 bool WorkerStatus::ContinueRunningNoPause() {
210   return (GetStatus() != STOP);
211 }
212
213 void WorkerStatus::RemoveSelf() {
214   // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
215   for (;;) {
216     AcquireStatusReadLock();
217     if (status_ != PAUSE)
218       break;
219     // We need to obey PauseWorkers() just like ContinueRunning() would, so that
220     // the other threads won't wait on pause_barrier_ forever.
221     ReleaseStatusLock();
222     // Wait for the other workers to call this function so that PauseWorkers()
223     // can return.
224     WaitOnPauseBarrier();
225     // Wait for ResumeWorkers() to be called.
226     WaitOnPauseBarrier();
227   }
228
229   // This lock would be unnecessary if we held a write lock instead of a read
230   // lock on status_rwlock_, but that would also force all threads calling
231   // ContinueRunning() to wait on this one.  Using a separate lock avoids that.
232   AcquireNumWorkersLock();
233   // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
234   // in use because (status != PAUSE).
235   sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
236   sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
237   --num_workers_;
238   ReleaseNumWorkersLock();
239
240   // Release status_rwlock_.
241   ReleaseStatusLock();
242 }
243
244
245 // Parent thread class.
246 WorkerThread::WorkerThread() {
247   status_ = 0;
248   pages_copied_ = 0;
249   errorcount_ = 0;
250   runduration_usec_ = 0;
251   priority_ = Normal;
252   worker_status_ = NULL;
253   thread_spawner_ = &ThreadSpawnerGeneric;
254   tag_mode_ = false;
255 }
256
257 WorkerThread::~WorkerThread() {}
258
259 // Constructors. Just init some default values.
260 FillThread::FillThread() {
261   num_pages_to_fill_ = 0;
262 }
263
264 // Initialize file name to empty.
265 FileThread::FileThread() {
266   filename_ = "";
267   devicename_ = "";
268   pass_ = 0;
269   page_io_ = true;
270   crc_page_ = -1;
271 }
272
273 // If file thread used bounce buffer in memory, account for the extra
274 // copy for memory bandwidth calculation.
275 float FileThread::GetMemoryCopiedData() {
276   if (!os_->normal_mem())
277     return GetCopiedData();
278   else
279     return 0;
280 }
281
282 // Initialize target hostname to be invalid.
283 NetworkThread::NetworkThread() {
284   snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
285   sock_ = 0;
286 }
287
288 // Initialize?
289 NetworkSlaveThread::NetworkSlaveThread() {
290 }
291
292 // Initialize?
293 NetworkListenThread::NetworkListenThread() {
294 }
295
296 // Init member variables.
297 void WorkerThread::InitThread(int thread_num_init,
298                               class Sat *sat_init,
299                               class OsLayer *os_init,
300                               class PatternList *patternlist_init,
301                               WorkerStatus *worker_status) {
302   sat_assert(worker_status);
303   worker_status->AddWorkers(1);
304
305   thread_num_ = thread_num_init;
306   sat_ = sat_init;
307   os_ = os_init;
308   patternlist_ = patternlist_init;
309   worker_status_ = worker_status;
310
311   cpu_mask_ = AvailableCpus();
312   tag_ = 0xffffffff;
313
314   tag_mode_ = sat_->tag_mode();
315 }
316
317
318 // Use pthreads to prioritize a system thread.
319 bool WorkerThread::InitPriority() {
320   // This doesn't affect performance that much, and may not be too safe.
321
322   bool ret = BindToCpus(cpu_mask_);
323   if (!ret)
324     logprintf(11, "Log: Bind to %x failed.\n", cpu_mask_);
325
326   logprintf(11, "Log: Thread %d running on apic ID %d mask %x (%x).\n",
327             thread_num_, apicid(), CurrentCpus(), cpu_mask_);
328 #if 0
329   if (priority_ == High) {
330     sched_param param;
331     param.sched_priority = 1;
332     // Set the priority; others are unchanged.
333     logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
334               param.sched_priority);
335     if (sched_setscheduler(0, SCHED_FIFO, &param)) {
336       char buf[256];
337       sat_strerror(errno, buf, sizeof(buf));
338       logprintf(0, "Process Error: sched_setscheduler "
339                    "failed - error %d %s\n",
340                 errno, buf);
341     }
342   }
343 #endif
344   return true;
345 }
346
347 // Use pthreads to create a system thread.
348 int WorkerThread::SpawnThread() {
349   // Create the new thread.
350   int result = pthread_create(&thread_, NULL, thread_spawner_, this);
351   if (result) {
352     char buf[256];
353     sat_strerror(result, buf, sizeof(buf));
354     logprintf(0, "Process Error: pthread_create "
355                   "failed - error %d %s\n", result,
356               buf);
357     status_ += 1;
358     return false;
359   }
360
361   // 0 is pthreads success.
362   return true;
363 }
364
365 // Kill the worker thread with SIGINT.
366 int WorkerThread::KillThread() {
367   pthread_kill(thread_, SIGINT);
368   return 0;
369 }
370
371 // Block until thread has exited.
372 int WorkerThread::JoinThread() {
373   int result = pthread_join(thread_, NULL);
374
375   if (result) {
376     logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
377     status_ = 0;
378   }
379
380   // 0 is pthreads success.
381   return (!result);
382 }
383
384
385 void WorkerThread::StartRoutine() {
386   InitPriority();
387   StartThreadTimer();
388   Work();
389   StopThreadTimer();
390   worker_status_->RemoveSelf();
391 }
392
393
394 // Thread work loop. Execute until marked finished.
395 int WorkerThread::Work() {
396   do {
397     logprintf(9, "Log: ...\n");
398     // Sleep for 1 second.
399     sat_sleep(1);
400   } while (IsReadyToRun());
401
402   return 0;
403 }
404
405
406 // Returns CPU mask of CPUs available to this process,
407 // Conceptually, each bit represents a logical CPU, ie:
408 //   mask = 3  (11b):   cpu0, 1
409 //   mask = 13 (1101b): cpu0, 2, 3
410 uint32 WorkerThread::AvailableCpus() {
411   cpu_set_t curr_cpus;
412   CPU_ZERO(&curr_cpus);
413   sched_getaffinity(getppid(), sizeof(curr_cpus), &curr_cpus);
414   return cpuset_to_uint32(&curr_cpus);
415 }
416
417
418 // Returns CPU mask of CPUs this thread is bound to,
419 // Conceptually, each bit represents a logical CPU, ie:
420 //   mask = 3  (11b):   cpu0, 1
421 //   mask = 13 (1101b): cpu0, 2, 3
422 uint32 WorkerThread::CurrentCpus() {
423   cpu_set_t curr_cpus;
424   CPU_ZERO(&curr_cpus);
425   sched_getaffinity(0, sizeof(curr_cpus), &curr_cpus);
426   return cpuset_to_uint32(&curr_cpus);
427 }
428
429
430 // Bind worker thread to specified CPU(s)
431 //   Args:
432 //     thread_mask: cpu_set_t representing CPUs, ie
433 //                  mask = 1  (01b):   cpu0
434 //                  mask = 3  (11b):   cpu0, 1
435 //                  mask = 13 (1101b): cpu0, 2, 3
436 //
437 //   Returns true on success, false otherwise.
438 bool WorkerThread::BindToCpus(uint32 thread_mask) {
439   uint32 process_mask = AvailableCpus();
440   if (thread_mask == process_mask)
441     return true;
442
443   logprintf(11, "Log: available CPU mask - %x\n", process_mask);
444   if ((thread_mask | process_mask) != process_mask) {
445     // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
446     logprintf(0, "Log: requested CPUs %x not a subset of available %x\n",
447               thread_mask, process_mask);
448     return false;
449   }
450   cpu_set_t cpuset;
451   cpuset_from_uint32(thread_mask, &cpuset);
452   return (sched_setaffinity(gettid(), sizeof(cpuset), &cpuset) == 0);
453 }
454
455
456 // A worker thread can yield itself to give up CPU until it's scheduled again.
457 //   Returns true on success, false on error.
458 bool WorkerThread::YieldSelf() {
459   return (sched_yield() == 0);
460 }
461
462
463 // Fill this page with its pattern.
464 bool WorkerThread::FillPage(struct page_entry *pe) {
465   // Error check arguments.
466   if (pe == 0) {
467     logprintf(0, "Process Error: Fill Page entry null\n");
468     return 0;
469   }
470
471   // Mask is the bitmask of indexes used by the pattern.
472   // It is the pattern size -1. Size is always a power of 2.
473   uint64 *memwords = static_cast<uint64*>(pe->addr);
474   int length = sat_->page_length();
475
476   if (tag_mode_) {
477     // Select tag or data as appropriate.
478     for (int i = 0; i < length / wordsize_; i++) {
479       datacast_t data;
480
481       if ((i & 0x7) == 0) {
482         data.l64 = addr_to_tag(&memwords[i]);
483       } else {
484         data.l32.l = pe->pattern->pattern(i << 1);
485         data.l32.h = pe->pattern->pattern((i << 1) + 1);
486       }
487       memwords[i] = data.l64;
488     }
489   } else {
490     // Just fill in untagged data directly.
491     for (int i = 0; i < length / wordsize_; i++) {
492       datacast_t data;
493
494       data.l32.l = pe->pattern->pattern(i << 1);
495       data.l32.h = pe->pattern->pattern((i << 1) + 1);
496       memwords[i] = data.l64;
497     }
498   }
499
500   return 1;
501 }
502
503
504 // Tell the thread how many pages to fill.
505 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
506   num_pages_to_fill_ = num_pages_to_fill_init;
507 }
508
509 // Fill this page with a random pattern.
510 bool FillThread::FillPageRandom(struct page_entry *pe) {
511   // Error check arguments.
512   if (pe == 0) {
513     logprintf(0, "Process Error: Fill Page entry null\n");
514     return 0;
515   }
516   if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
517     logprintf(0, "Process Error: No data patterns available\n");
518     return 0;
519   }
520
521   // Choose a random pattern for this block.
522   pe->pattern = patternlist_->GetRandomPattern();
523   if (pe->pattern == 0) {
524     logprintf(0, "Process Error: Null data pattern\n");
525     return 0;
526   }
527
528   // Actually fill the page.
529   return FillPage(pe);
530 }
531
532
533 // Memory fill work loop. Execute until alloted pages filled.
534 int FillThread::Work() {
535   int result = 1;
536
537   logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
538
539   // We want to fill num_pages_to_fill pages, and
540   // stop when we've filled that many.
541   // We also want to capture early break
542   struct page_entry pe;
543   int64 loops = 0;
544   while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
545     result &= sat_->GetEmpty(&pe);
546     if (!result) {
547       logprintf(0, "Process Error: fill_thread failed to pop pages, "
548                 "bailing\n");
549       break;
550     }
551
552     // Fill the page with pattern
553     result &= FillPageRandom(&pe);
554     if (!result) break;
555
556     // Put the page back on the queue.
557     result &= sat_->PutValid(&pe);
558     if (!result) {
559       logprintf(0, "Process Error: fill_thread failed to push pages, "
560                 "bailing\n");
561       break;
562     }
563     loops++;
564   }
565
566   // Fill in thread status.
567   pages_copied_ = loops;
568   status_ = result;
569   logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
570             thread_num_, status_, pages_copied_);
571   return 0;
572 }
573
574
575 // Print error information about a data miscompare.
576 void WorkerThread::ProcessError(struct ErrorRecord *error,
577                                 int priority,
578                                 const char *message) {
579   char dimm_string[256] = "";
580
581   int apic_id = apicid();
582   uint32 cpumask = CurrentCpus();
583
584   // Determine if this is a write or read error.
585   os_->Flush(error->vaddr);
586   error->reread = *(error->vaddr);
587
588   char *good = reinterpret_cast<char*>(&(error->expected));
589   char *bad = reinterpret_cast<char*>(&(error->actual));
590
591   sat_assert(error->expected != error->actual);
592   unsigned int offset = 0;
593   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
594     if (good[offset] != bad[offset])
595       break;
596   }
597
598   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
599
600   // Find physical address if possible.
601   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
602
603   // Pretty print DIMM mapping if available.
604   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
605
606   // Report parseable error.
607   if (priority < 5) {
608     // Run miscompare error through diagnoser for logging and reporting.
609     os_->error_diagnoser_->AddMiscompareError(dimm_string,
610                                               reinterpret_cast<uint64>
611                                               (error->vaddr), 1);
612
613     logprintf(priority,
614               "%s: miscompare on CPU %d(%x) at %p(0x%llx:%s): "
615               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
616               message,
617               apic_id,
618               cpumask,
619               error->vaddr,
620               error->paddr,
621               dimm_string,
622               error->actual,
623               error->reread,
624               error->expected);
625   }
626
627
628   // Overwrite incorrect data with correct data to prevent
629   // future miscompares when this data is reused.
630   *(error->vaddr) = error->expected;
631   os_->Flush(error->vaddr);
632 }
633
634
635
636 // Print error information about a data miscompare.
637 void FileThread::ProcessError(struct ErrorRecord *error,
638                               int priority,
639                               const char *message) {
640   char dimm_string[256] = "";
641
642   // Determine if this is a write or read error.
643   os_->Flush(error->vaddr);
644   error->reread = *(error->vaddr);
645
646   char *good = reinterpret_cast<char*>(&(error->expected));
647   char *bad = reinterpret_cast<char*>(&(error->actual));
648
649   sat_assert(error->expected != error->actual);
650   unsigned int offset = 0;
651   for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
652     if (good[offset] != bad[offset])
653       break;
654   }
655
656   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
657
658   // Find physical address if possible.
659   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
660
661   // Pretty print DIMM mapping if available.
662   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
663
664   // If crc_page_ is valid, ie checking content read back from file,
665   // track src/dst memory addresses. Otherwise catagorize as general
666   // mememory miscompare for CRC checking everywhere else.
667   if (crc_page_ != -1) {
668     int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
669                                 static_cast<char*>(page_recs_[crc_page_].dst);
670     os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
671                                                  crc_page_,
672                                                  miscompare_byteoffset,
673                                                  page_recs_[crc_page_].src,
674                                                  page_recs_[crc_page_].dst);
675   } else {
676     os_->error_diagnoser_->AddMiscompareError(dimm_string,
677                                               reinterpret_cast<uint64>
678                                               (error->vaddr), 1);
679   }
680
681   logprintf(priority,
682             "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
683             "reread:0x%016llx expected:0x%016llx\n",
684             message,
685             devicename_.c_str(),
686             error->vaddr,
687             error->paddr,
688             dimm_string,
689             error->actual,
690             error->reread,
691             error->expected);
692
693   // Overwrite incorrect data with correct data to prevent
694   // future miscompares when this data is reused.
695   *(error->vaddr) = error->expected;
696   os_->Flush(error->vaddr);
697 }
698
699
700 // Do a word by word result check of a region.
701 // Print errors on mismatches.
702 int WorkerThread::CheckRegion(void *addr,
703                               class Pattern *pattern,
704                               int64 length,
705                               int offset,
706                               int64 pattern_offset) {
707   uint64 *memblock = static_cast<uint64*>(addr);
708   const int kErrorLimit = 128;
709   int errors = 0;
710   int overflowerrors = 0;  // Count of overflowed errors.
711   bool page_error = false;
712   string errormessage("Hardware Error");
713   struct ErrorRecord
714     recorded[kErrorLimit];  // Queued errors for later printing.
715
716   // For each word in the data region.
717   for (int i = 0; i < length / wordsize_; i++) {
718     uint64 actual = memblock[i];
719     uint64 expected;
720
721     // Determine the value that should be there.
722     datacast_t data;
723     int index = 2 * i + pattern_offset;
724     data.l32.l = pattern->pattern(index);
725     data.l32.h = pattern->pattern(index + 1);
726     expected = data.l64;
727     // Check tags if necessary.
728     if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
729       expected = addr_to_tag(&memblock[i]);
730     }
731
732
733     // If the value is incorrect, save an error record for later printing.
734     if (actual != expected) {
735       if (errors < kErrorLimit) {
736         recorded[errors].actual = actual;
737         recorded[errors].expected = expected;
738         recorded[errors].vaddr = &memblock[i];
739         errors++;
740       } else {
741         page_error = true;
742         // If we have overflowed the error queue, just print the errors now.
743         logprintf(10, "Log: Error record overflow, too many miscompares!\n");
744         errormessage = "Page Error";
745         break;
746       }
747     }
748   }
749
750   // Find if this is a whole block corruption.
751   if (page_error && !tag_mode_) {
752     int patsize = patternlist_->Size();
753     for (int pat = 0; pat < patsize; pat++) {
754       class Pattern *altpattern = patternlist_->GetPattern(pat);
755       const int kGood = 0;
756       const int kBad = 1;
757       const int kGoodAgain = 2;
758       const int kNoMatch = 3;
759       int state = kGood;
760       unsigned int badstart = 0;
761       unsigned int badend = 0;
762
763       // Don't match against ourself!
764       if (pattern == altpattern)
765         continue;
766
767       for (int i = 0; i < length / wordsize_; i++) {
768         uint64 actual = memblock[i];
769         datacast_t expected;
770         datacast_t possible;
771
772         // Determine the value that should be there.
773         int index = 2 * i + pattern_offset;
774
775         expected.l32.l = pattern->pattern(index);
776         expected.l32.h = pattern->pattern(index + 1);
777
778         possible.l32.l = pattern->pattern(index);
779         possible.l32.h = pattern->pattern(index + 1);
780
781         if (state == kGood) {
782           if (actual == expected.l64) {
783             continue;
784           } else if (actual == possible.l64) {
785             badstart = i;
786             badend = i;
787             state = kBad;
788             continue;
789           } else {
790             state = kNoMatch;
791             break;
792           }
793         } else if (state == kBad) {
794           if (actual == possible.l64) {
795             badend = i;
796             continue;
797           } else if (actual == expected.l64) {
798             state = kGoodAgain;
799             continue;
800           } else {
801             state = kNoMatch;
802             break;
803           }
804         } else if (state == kGoodAgain) {
805           if (actual == expected.l64) {
806             continue;
807           } else {
808             state = kNoMatch;
809             break;
810           }
811         }
812       }
813
814       if ((state == kGoodAgain) || (state == kBad)) {
815         unsigned int blockerrors = badend - badstart + 1;
816         errormessage = "Block Error";
817         ProcessError(&recorded[0], 0, errormessage.c_str());
818         logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
819                   "%d bytes from offset 0x%x to 0x%x\n",
820                   &memblock[badstart],
821                   altpattern->name(), pattern->name(),
822                   blockerrors * wordsize_,
823                   offset + badstart * wordsize_,
824                   offset + badend * wordsize_);
825         errorcount_ += blockerrors;
826         return blockerrors;
827       }
828     }
829   }
830
831
832   // Process error queue after all errors have been recorded.
833   for (int err = 0; err < errors; err++) {
834     int priority = 5;
835     if (errorcount_ + err < 30)
836       priority = 0;  // Bump up the priority for the first few errors.
837     ProcessError(&recorded[err], priority, errormessage.c_str());
838   }
839
840   if (page_error) {
841     // For each word in the data region.
842     int error_recount = 0;
843     for (int i = 0; i < length / wordsize_; i++) {
844       uint64 actual = memblock[i];
845       uint64 expected;
846       datacast_t data;
847       // Determine the value that should be there.
848       int index = 2 * i + pattern_offset;
849
850       data.l32.l = pattern->pattern(index);
851       data.l32.h = pattern->pattern(index + 1);
852       expected = data.l64;
853
854       // Check tags if necessary.
855       if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
856         expected = addr_to_tag(&memblock[i]);
857       }
858
859       // If the value is incorrect, save an error record for later printing.
860       if (actual != expected) {
861         if (error_recount < kErrorLimit) {
862           // We already reported these.
863           error_recount++;
864         } else {
865           // If we have overflowed the error queue, print the errors now.
866           struct ErrorRecord er;
867           er.actual = actual;
868           er.expected = expected;
869           er.vaddr = &memblock[i];
870
871           // Do the error printout. This will take a long time and
872           // likely change the machine state.
873           ProcessError(&er, 12, errormessage.c_str());
874           overflowerrors++;
875         }
876       }
877     }
878   }
879
880   // Keep track of observed errors.
881   errorcount_ += errors + overflowerrors;
882   return errors + overflowerrors;
883 }
884
885 float WorkerThread::GetCopiedData() {
886   return pages_copied_ * sat_->page_length() / kMegabyte;
887 }
888
889 // Calculate the CRC of a region.
890 // Result check if the CRC mismatches.
891 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
892   const int blocksize = 4096;
893   const int blockwords = blocksize / wordsize_;
894   int errors = 0;
895
896   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
897   uint64 *memblock = static_cast<uint64*>(srcpe->addr);
898   int blocks = sat_->page_length() / blocksize;
899   for (int currentblock = 0; currentblock < blocks; currentblock++) {
900     uint64 *memslice = memblock + currentblock * blockwords;
901
902     AdlerChecksum crc;
903     if (tag_mode_) {
904       AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
905     } else {
906       CalculateAdlerChecksum(memslice, blocksize, &crc);
907     }
908
909     // If the CRC does not match, we'd better look closer.
910     if (!crc.Equals(*expectedcrc)) {
911       logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
912                 "CRC mismatch %s != %s\n",
913                 crc.ToHexString().c_str(),
914                 expectedcrc->ToHexString().c_str());
915       int errorcount = CheckRegion(memslice,
916                                    srcpe->pattern,
917                                    blocksize,
918                                    currentblock * blocksize, 0);
919       if (errorcount == 0) {
920         logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
921                      "but no miscompares found.\n",
922                   crc.ToHexString().c_str(),
923                   expectedcrc->ToHexString().c_str());
924       }
925       errors += errorcount;
926     }
927   }
928
929   // For odd length transfers, we should never hit this.
930   int leftovers = sat_->page_length() % blocksize;
931   if (leftovers) {
932     uint64 *memslice = memblock + blocks * blockwords;
933     errors += CheckRegion(memslice,
934                           srcpe->pattern,
935                           leftovers,
936                           blocks * blocksize, 0);
937   }
938   return errors;
939 }
940
941
942 // Print error information about a data miscompare.
943 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
944                                    int priority,
945                                    const char *message) {
946   char dimm_string[256] = "";
947   char tag_dimm_string[256] = "";
948   bool read_error = false;
949
950   int apic_id = apicid();
951   uint32 cpumask = CurrentCpus();
952
953   // Determine if this is a write or read error.
954   os_->Flush(error->vaddr);
955   error->reread = *(error->vaddr);
956
957   // Distinguish read and write errors.
958   if (error->actual != error->reread) {
959     read_error = true;
960   }
961
962   sat_assert(error->expected != error->actual);
963
964   error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
965
966   // Find physical address if possible.
967   error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
968   error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
969
970   // Pretty print DIMM mapping if available.
971   os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
972   // Pretty print DIMM mapping if available.
973   os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
974
975   // Report parseable error.
976   if (priority < 5) {
977     logprintf(priority,
978               "%s: Tag from %p(0x%llx:%s) (%s) miscompare on CPU %d(%x) at "
979               "%p(0x%llx:%s): "
980               "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
981               message,
982               error->tagvaddr, error->tagpaddr,
983               tag_dimm_string,
984               read_error?"read error":"write error",
985               apic_id,
986               cpumask,
987               error->vaddr,
988               error->paddr,
989               dimm_string,
990               error->actual,
991               error->reread,
992               error->expected);
993   }
994
995   errorcount_ += 1;
996
997   // Overwrite incorrect data with correct data to prevent
998   // future miscompares when this data is reused.
999   *(error->vaddr) = error->expected;
1000   os_->Flush(error->vaddr);
1001 }
1002
1003
1004 // Print out and log a tag error.
1005 bool WorkerThread::ReportTagError(
1006     uint64 *mem64,
1007     uint64 actual,
1008     uint64 tag) {
1009   struct ErrorRecord er;
1010   er.actual = actual;
1011
1012   er.expected = tag;
1013   er.vaddr = mem64;
1014
1015   // Generate vaddr from tag.
1016   er.tagvaddr = reinterpret_cast<uint64*>(actual);
1017
1018   ProcessTagError(&er, 0, "Hardware Error");
1019   return true;
1020 }
1021
1022 // C implementation of Adler memory copy, with memory tagging.
1023 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1024                                     uint64 *srcmem64,
1025                                     unsigned int size_in_bytes,
1026                                     AdlerChecksum *checksum,
1027                                     struct page_entry *pe) {
1028   // Use this data wrapper to access memory with 64bit read/write.
1029   datacast_t data;
1030   datacast_t dstdata;
1031   unsigned int count = size_in_bytes / sizeof(data);
1032
1033   if (count > ((1U) << 19)) {
1034     // Size is too large, must be strictly less than 512 KB.
1035     return false;
1036   }
1037
1038   uint64 a1 = 1;
1039   uint64 a2 = 1;
1040   uint64 b1 = 0;
1041   uint64 b2 = 0;
1042
1043   class Pattern *pattern = pe->pattern;
1044
1045   unsigned int i = 0;
1046   while (i < count) {
1047     // Process 64 bits at a time.
1048     if ((i & 0x7) == 0) {
1049       data.l64 = srcmem64[i];
1050       dstdata.l64 = dstmem64[i];
1051       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1052       uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1053       // Detect if tags have been corrupted.
1054       if (data.l64 != src_tag)
1055         ReportTagError(&srcmem64[i], data.l64, src_tag);
1056       if (dstdata.l64 != dst_tag)
1057         ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1058
1059       data.l32.l = pattern->pattern(i << 1);
1060       data.l32.h = pattern->pattern((i << 1) + 1);
1061       a1 = a1 + data.l32.l;
1062       b1 = b1 + a1;
1063       a1 = a1 + data.l32.h;
1064       b1 = b1 + a1;
1065
1066       data.l64  = dst_tag;
1067       dstmem64[i] = data.l64;
1068
1069     } else {
1070       data.l64 = srcmem64[i];
1071       a1 = a1 + data.l32.l;
1072       b1 = b1 + a1;
1073       a1 = a1 + data.l32.h;
1074       b1 = b1 + a1;
1075       dstmem64[i] = data.l64;
1076     }
1077     i++;
1078
1079     data.l64 = srcmem64[i];
1080     a2 = a2 + data.l32.l;
1081     b2 = b2 + a2;
1082     a2 = a2 + data.l32.h;
1083     b2 = b2 + a2;
1084     dstmem64[i] = data.l64;
1085     i++;
1086   }
1087   checksum->Set(a1, a2, b1, b2);
1088   return true;
1089 }
1090
1091
1092 // C implementation of Adler memory crc.
1093 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1094                                  unsigned int size_in_bytes,
1095                                  AdlerChecksum *checksum,
1096                                  struct page_entry *pe) {
1097   // Use this data wrapper to access memory with 64bit read/write.
1098   datacast_t data;
1099   unsigned int count = size_in_bytes / sizeof(data);
1100
1101   if (count > ((1U) << 19)) {
1102     // Size is too large, must be strictly less than 512 KB.
1103     return false;
1104   }
1105
1106   uint64 a1 = 1;
1107   uint64 a2 = 1;
1108   uint64 b1 = 0;
1109   uint64 b2 = 0;
1110
1111   class Pattern *pattern = pe->pattern;
1112
1113   unsigned int i = 0;
1114   while (i < count) {
1115     // Process 64 bits at a time.
1116     if ((i & 0x7) == 0) {
1117       data.l64 = srcmem64[i];
1118       uint64 src_tag = addr_to_tag(&srcmem64[i]);
1119       // Check that tags match expected.
1120       if (data.l64 != src_tag)
1121         ReportTagError(&srcmem64[i], data.l64, src_tag);
1122
1123
1124       data.l32.l = pattern->pattern(i << 1);
1125       data.l32.h = pattern->pattern((i << 1) + 1);
1126       a1 = a1 + data.l32.l;
1127       b1 = b1 + a1;
1128       a1 = a1 + data.l32.h;
1129       b1 = b1 + a1;
1130
1131
1132     } else {
1133       data.l64 = srcmem64[i];
1134       a1 = a1 + data.l32.l;
1135       b1 = b1 + a1;
1136       a1 = a1 + data.l32.h;
1137       b1 = b1 + a1;
1138     }
1139     i++;
1140
1141     data.l64 = srcmem64[i];
1142     a2 = a2 + data.l32.l;
1143     b2 = b2 + a2;
1144     a2 = a2 + data.l32.h;
1145     b2 = b2 + a2;
1146     i++;
1147   }
1148   checksum->Set(a1, a2, b1, b2);
1149   return true;
1150 }
1151
1152 // Copy a block of memory quickly, while keeping a CRC of the data.
1153 // Result check if the CRC mismatches.
1154 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1155                               struct page_entry *srcpe) {
1156   int errors = 0;
1157   const int blocksize = 4096;
1158   const int blockwords = blocksize / wordsize_;
1159   int blocks = sat_->page_length() / blocksize;
1160
1161   // Base addresses for memory copy
1162   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1163   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1164   // Remember the expected CRC
1165   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1166
1167   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1168     uint64 *targetmem = targetmembase + currentblock * blockwords;
1169     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1170
1171     AdlerChecksum crc;
1172     if (tag_mode_) {
1173       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1174     } else {
1175       AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1176     }
1177
1178     // Investigate miscompares.
1179     if (!crc.Equals(*expectedcrc)) {
1180       logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1181                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1182                 expectedcrc->ToHexString().c_str());
1183       int errorcount = CheckRegion(sourcemem,
1184                                    srcpe->pattern,
1185                                    blocksize,
1186                                    currentblock * blocksize, 0);
1187       if (errorcount == 0) {
1188         logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1189                      "but no miscompares found. Retrying with fresh data.\n",
1190                   crc.ToHexString().c_str(),
1191                   expectedcrc->ToHexString().c_str());
1192         if (!tag_mode_) {
1193           // Copy the data originally read from this region back again.
1194           // This data should have any corruption read originally while
1195           // calculating the CRC.
1196           memcpy(sourcemem, targetmem, blocksize);
1197           errorcount = CheckRegion(sourcemem,
1198                                    srcpe->pattern,
1199                                    blocksize,
1200                                    currentblock * blocksize, 0);
1201           if (errorcount == 0) {
1202             logprintf(0, "Process Error: CrcCopyPage CRC mismatch %s != %s, "
1203                          "but no miscompares found on second pass.\n",
1204                       crc.ToHexString().c_str(),
1205                       expectedcrc->ToHexString().c_str());
1206           }
1207         }
1208       }
1209       errors += errorcount;
1210     }
1211   }
1212
1213   // For odd length transfers, we should never hit this.
1214   int leftovers = sat_->page_length() % blocksize;
1215   if (leftovers) {
1216     uint64 *targetmem = targetmembase + blocks * blockwords;
1217     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1218
1219     errors += CheckRegion(sourcemem,
1220                           srcpe->pattern,
1221                           leftovers,
1222                           blocks * blocksize, 0);
1223     int leftoverwords = leftovers / wordsize_;
1224     for (int i = 0; i < leftoverwords; i++) {
1225       targetmem[i] = sourcemem[i];
1226     }
1227   }
1228
1229   // Update pattern reference to reflect new contents.
1230   dstpe->pattern = srcpe->pattern;
1231
1232   // Clean clean clean the errors away.
1233   if (errors) {
1234     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1235     // cause bad data to be propogated across the page.
1236     FillPage(dstpe);
1237   }
1238   return errors;
1239 }
1240
1241
1242
1243 // Invert a block of memory quickly, traversing downwards.
1244 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1245   const int blocksize = 4096;
1246   const int blockwords = blocksize / wordsize_;
1247   int blocks = sat_->page_length() / blocksize;
1248
1249   // Base addresses for memory copy
1250   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1251
1252   for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1253     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1254     for (int i = blockwords - 32; i >= 0; i -= 32) {
1255       for (int index = i + 31; index >= i; --index) {
1256         unsigned int actual = sourcemem[index];
1257         sourcemem[index] = ~actual;
1258       }
1259       OsLayer::FastFlush(&sourcemem[i]);
1260     }
1261   }
1262
1263   return 0;
1264 }
1265
1266 // Invert a block of memory, traversing upwards.
1267 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1268   const int blocksize = 4096;
1269   const int blockwords = blocksize / wordsize_;
1270   int blocks = sat_->page_length() / blocksize;
1271
1272   // Base addresses for memory copy
1273   unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1274
1275   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1276     unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1277     for (int i = 0; i < blockwords; i += 32) {
1278       for (int index = i; index <= i + 31; ++index) {
1279         unsigned int actual = sourcemem[index];
1280         sourcemem[index] = ~actual;
1281       }
1282       OsLayer::FastFlush(&sourcemem[i]);
1283     }
1284   }
1285   return 0;
1286 }
1287
1288 // Copy a block of memory quickly, while keeping a CRC of the data.
1289 // Result check if the CRC mismatches. Warm the CPU while running
1290 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1291                                   struct page_entry *srcpe) {
1292   int errors = 0;
1293   const int blocksize = 4096;
1294   const int blockwords = blocksize / wordsize_;
1295   int blocks = sat_->page_length() / blocksize;
1296
1297   // Base addresses for memory copy
1298   uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1299   uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1300   // Remember the expected CRC
1301   const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1302
1303   for (int currentblock = 0; currentblock < blocks; currentblock++) {
1304     uint64 *targetmem = targetmembase + currentblock * blockwords;
1305     uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1306
1307     AdlerChecksum crc;
1308     if (tag_mode_) {
1309       AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1310     } else {
1311       os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1312     }
1313
1314     // Investigate miscompares.
1315     if (!crc.Equals(*expectedcrc)) {
1316       logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1317                 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1318                 expectedcrc->ToHexString().c_str());
1319       int errorcount = CheckRegion(sourcemem,
1320                                    srcpe->pattern,
1321                                    blocksize,
1322                                    currentblock * blocksize, 0);
1323       if (errorcount == 0) {
1324         logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1325                      "but no miscompares found. Retrying with fresh data.\n",
1326                   crc.ToHexString().c_str(),
1327                   expectedcrc->ToHexString().c_str());
1328         if (!tag_mode_) {
1329           // Copy the data originally read from this region back again.
1330           // This data should have any corruption read originally while
1331           // calculating the CRC.
1332           memcpy(sourcemem, targetmem, blocksize);
1333           errorcount = CheckRegion(sourcemem,
1334                                    srcpe->pattern,
1335                                    blocksize,
1336                                    currentblock * blocksize, 0);
1337           if (errorcount == 0) {
1338             logprintf(0, "Process Error: CrcWarmCopyPage CRC mismatch %s "
1339                          "!= %s, but no miscompares found on second pass.\n",
1340                       crc.ToHexString().c_str(),
1341                       expectedcrc->ToHexString().c_str());
1342           }
1343         }
1344       }
1345       errors += errorcount;
1346     }
1347   }
1348
1349   // For odd length transfers, we should never hit this.
1350   int leftovers = sat_->page_length() % blocksize;
1351   if (leftovers) {
1352     uint64 *targetmem = targetmembase + blocks * blockwords;
1353     uint64 *sourcemem = sourcemembase + blocks * blockwords;
1354
1355     errors += CheckRegion(sourcemem,
1356                           srcpe->pattern,
1357                           leftovers,
1358                           blocks * blocksize, 0);
1359     int leftoverwords = leftovers / wordsize_;
1360     for (int i = 0; i < leftoverwords; i++) {
1361       targetmem[i] = sourcemem[i];
1362     }
1363   }
1364
1365   // Update pattern reference to reflect new contents.
1366   dstpe->pattern = srcpe->pattern;
1367
1368   // Clean clean clean the errors away.
1369   if (errors) {
1370     // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1371     // cause bad data to be propogated across the page.
1372     FillPage(dstpe);
1373   }
1374   return errors;
1375 }
1376
1377
1378
1379 // Memory check work loop. Execute until done, then exhaust pages.
1380 int CheckThread::Work() {
1381   struct page_entry pe;
1382   int result = 1;
1383   int64 loops = 0;
1384
1385   logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1386
1387   // We want to check all the pages, and
1388   // stop when there aren't any left.
1389   while (1) {
1390     result &= sat_->GetValid(&pe);
1391     if (!result) {
1392       if (IsReadyToRunNoPause())
1393         logprintf(0, "Process Error: check_thread failed to pop pages, "
1394                   "bailing\n");
1395       else
1396         result = 1;
1397       break;
1398     }
1399
1400     // Do the result check.
1401     CrcCheckPage(&pe);
1402
1403     // Push pages back on the valid queue if we are still going,
1404     // throw them out otherwise.
1405     if (IsReadyToRunNoPause())
1406       result &= sat_->PutValid(&pe);
1407     else
1408       result &= sat_->PutEmpty(&pe);
1409     if (!result) {
1410       logprintf(0, "Process Error: check_thread failed to push pages, "
1411                 "bailing\n");
1412       break;
1413     }
1414     loops++;
1415   }
1416
1417   pages_copied_ = loops;
1418   status_ = result;
1419   logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1420             thread_num_, status_, pages_copied_);
1421   return 1;
1422 }
1423
1424
1425 // Memory copy work loop. Execute until marked done.
1426 int CopyThread::Work() {
1427   struct page_entry src;
1428   struct page_entry dst;
1429   int result = 1;
1430   int64 loops = 0;
1431
1432   logprintf(9, "Log: Starting copy thread %d: cpu %x, mem %x\n",
1433             thread_num_, cpu_mask_, tag_);
1434
1435   while (IsReadyToRun()) {
1436     // Pop the needed pages.
1437     result &= sat_->GetValid(&src, tag_);
1438     result &= sat_->GetEmpty(&dst, tag_);
1439     if (!result) {
1440       logprintf(0, "Process Error: copy_thread failed to pop pages, "
1441                 "bailing\n");
1442       break;
1443     }
1444
1445     // Force errors for unittests.
1446     if (sat_->error_injection()) {
1447       if (loops == 8) {
1448         char *addr = reinterpret_cast<char*>(src.addr);
1449         int offset = random() % sat_->page_length();
1450         addr[offset] = 0xba;
1451       }
1452     }
1453
1454     // We can use memcpy, or CRC check while we copy.
1455     if (sat_->warm()) {
1456       CrcWarmCopyPage(&dst, &src);
1457     } else if (sat_->strict()) {
1458       CrcCopyPage(&dst, &src);
1459     } else {
1460       memcpy(dst.addr, src.addr, sat_->page_length());
1461       dst.pattern = src.pattern;
1462     }
1463
1464     result &= sat_->PutValid(&dst);
1465     result &= sat_->PutEmpty(&src);
1466
1467     // Copy worker-threads yield themselves at the end of each copy loop,
1468     // to avoid threads from preempting each other in the middle of the inner
1469     // copy-loop. Cooperations between Copy worker-threads results in less
1470     // unnecessary cache thrashing (which happens when context-switching in the
1471     // middle of the inner copy-loop).
1472     YieldSelf();
1473
1474     if (!result) {
1475       logprintf(0, "Process Error: copy_thread failed to push pages, "
1476                 "bailing\n");
1477       break;
1478     }
1479     loops++;
1480   }
1481
1482   pages_copied_ = loops;
1483   status_ = result;
1484   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1485             thread_num_, status_, pages_copied_);
1486   return 1;
1487 }
1488
1489
1490
1491 // Memory invert work loop. Execute until marked done.
1492 int InvertThread::Work() {
1493   struct page_entry src;
1494   int result = 1;
1495   int64 loops = 0;
1496
1497   logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1498
1499   while (IsReadyToRun()) {
1500     // Pop the needed pages.
1501     result &= sat_->GetValid(&src);
1502     if (!result) {
1503       logprintf(0, "Process Error: invert_thread failed to pop pages, "
1504                 "bailing\n");
1505       break;
1506     }
1507
1508     if (sat_->strict())
1509       CrcCheckPage(&src);
1510
1511     // For the same reason CopyThread yields itself (see YieldSelf comment
1512     // in CopyThread::Work(), InvertThread yields itself after each invert
1513     // operation to improve cooperation between different worker threads
1514     // stressing the memory/cache.
1515     InvertPageUp(&src);
1516     YieldSelf();
1517     InvertPageDown(&src);
1518     YieldSelf();
1519     InvertPageDown(&src);
1520     YieldSelf();
1521     InvertPageUp(&src);
1522     YieldSelf();
1523
1524     if (sat_->strict())
1525       CrcCheckPage(&src);
1526
1527     result &= sat_->PutValid(&src);
1528     if (!result) {
1529       logprintf(0, "Process Error: invert_thread failed to push pages, "
1530                 "bailing\n");
1531       break;
1532     }
1533     loops++;
1534   }
1535
1536   pages_copied_ = loops * 2;
1537   status_ = result;
1538   logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1539             thread_num_, status_, pages_copied_);
1540   return 1;
1541 }
1542
1543
1544 // Set file name to use for File IO.
1545 void FileThread::SetFile(const char *filename_init) {
1546   filename_ = filename_init;
1547   devicename_ = os_->FindFileDevice(filename_);
1548 }
1549
1550 // Open the file for access.
1551 bool FileThread::OpenFile(int *pfile) {
1552   int fd = open(filename_.c_str(),
1553                 O_RDWR | O_CREAT | O_SYNC | O_DIRECT,
1554                 0644);
1555   if (fd < 0) {
1556     logprintf(0, "Process Error: Failed to create file %s!!\n",
1557               filename_.c_str());
1558     pages_copied_ = 0;
1559     status_ = 0;
1560     return 0;
1561   }
1562   *pfile = fd;
1563   return 1;
1564 }
1565
1566 // Close the file.
1567 bool FileThread::CloseFile(int fd) {
1568   close(fd);
1569   return 1;
1570 }
1571
1572 // Check sector tagging.
1573 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1574   int page_length = sat_->page_length();
1575   struct FileThread::SectorTag *tag =
1576     (struct FileThread::SectorTag *)(src->addr);
1577
1578   // Tag each sector.
1579   unsigned char magic = ((0xba + thread_num_) & 0xff);
1580   for (int sec = 0; sec < page_length / 512; sec++) {
1581     tag[sec].magic = magic;
1582     tag[sec].block = block & 0xff;
1583     tag[sec].sector = sec & 0xff;
1584     tag[sec].pass = pass_ & 0xff;
1585   }
1586   return true;
1587 }
1588
1589 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1590   int page_length = sat_->page_length();
1591   // Fill the file with our data.
1592   int64 size = write(fd, src->addr, page_length);
1593
1594   if (size != page_length) {
1595     os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1596     errorcount_++;
1597     logprintf(0, "Block Error: file_thread failed to write, "
1598               "bailing\n");
1599     return false;
1600   }
1601   return true;
1602 }
1603
1604 // Write the data to the file.
1605 bool FileThread::WritePages(int fd) {
1606   int strict = sat_->strict();
1607
1608   // Start fresh at beginning of file for each batch of pages.
1609   lseek(fd, 0, SEEK_SET);
1610   for (int i = 0; i < sat_->disk_pages(); i++) {
1611     struct page_entry src;
1612     if (!GetValidPage(&src))
1613       return false;
1614     // Save expected pattern.
1615     page_recs_[i].pattern = src.pattern;
1616     page_recs_[i].src = src.addr;
1617
1618     // Check data correctness.
1619     if (strict)
1620       CrcCheckPage(&src);
1621
1622     SectorTagPage(&src, i);
1623
1624     bool result = WritePageToFile(fd, &src);
1625
1626     if (!PutEmptyPage(&src))
1627       return false;
1628
1629     if (!result)
1630       return false;
1631   }
1632   return true;
1633 }
1634
1635 // Copy data from file into memory block.
1636 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1637   int page_length = sat_->page_length();
1638
1639   // Do the actual read.
1640   int64 size = read(fd, dst->addr, page_length);
1641   if (size != page_length) {
1642     os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1643     logprintf(0, "Block Error: file_thread failed to read, "
1644               "bailing\n");
1645     errorcount_++;
1646     return false;
1647   }
1648   return true;
1649 }
1650
1651 // Check sector tagging.
1652 bool FileThread::SectorValidatePage(const struct PageRec &page,
1653                                     struct page_entry *dst, int block) {
1654   // Error injection.
1655   static int calls = 0;
1656   calls++;
1657
1658   // Do sector tag compare.
1659   int firstsector = -1;
1660   int lastsector = -1;
1661   bool badsector = false;
1662   int page_length = sat_->page_length();
1663
1664   // Cast data block into an array of tagged sectors.
1665   struct FileThread::SectorTag *tag =
1666   (struct FileThread::SectorTag *)(dst->addr);
1667
1668   sat_assert(sizeof(*tag) == 512);
1669
1670   // Error injection.
1671   if (sat_->error_injection()) {
1672     if (calls == 2) {
1673       for (int badsec = 8; badsec < 17; badsec++)
1674         tag[badsec].pass = 27;
1675     }
1676     if (calls == 18) {
1677       (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1678     }
1679   }
1680
1681   // Check each sector for the correct tag we added earlier,
1682   // then revert the tag to the to normal data pattern.
1683   unsigned char magic = ((0xba + thread_num_) & 0xff);
1684   for (int sec = 0; sec < page_length / 512; sec++) {
1685     // Check magic tag.
1686     if ((tag[sec].magic != magic) ||
1687         (tag[sec].block != (block & 0xff)) ||
1688         (tag[sec].sector != (sec & 0xff)) ||
1689         (tag[sec].pass != (pass_ & 0xff))) {
1690       // Offset calculation for tag location.
1691       int offset = sec * sizeof(SectorTag);
1692       if (tag[sec].block != (block & 0xff))
1693         offset += 1 * sizeof(uint8);
1694       else if (tag[sec].sector != (sec & 0xff))
1695         offset += 2 * sizeof(uint8);
1696       else if (tag[sec].pass != (pass_ & 0xff))
1697         offset += 3 * sizeof(uint8);
1698
1699       // Run sector tag error through diagnoser for logging and reporting.
1700       errorcount_ += 1;
1701       os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1702                                                   offset,
1703                                                   tag[sec].sector,
1704                                                   page.src, page.dst);
1705
1706       logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1707                 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1708                 block * page_length + 512 * sec,
1709                 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1710                 sec, (unsigned int)tag[sec].sector,
1711                 block, (unsigned int)tag[sec].block,
1712                 magic, (unsigned int)tag[sec].magic,
1713                 filename_.c_str());
1714
1715       // Keep track of first and last bad sector.
1716       if (firstsector == -1)
1717         firstsector = (block * page_length / 512) + sec;
1718       lastsector = (block * page_length / 512) + sec;
1719       badsector = true;
1720     }
1721     // Patch tag back to proper pattern.
1722     unsigned int *addr = (unsigned int *)(&tag[sec]);
1723     *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1724   }
1725
1726   // If we found sector errors:
1727   if (badsector == true) {
1728     logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1729               firstsector * 512,
1730               ((lastsector + 1) * 512) - 1,
1731               filename_.c_str());
1732
1733     // Either exit immediately, or patch the data up and continue.
1734     if (sat_->stop_on_error()) {
1735       exit(1);
1736     } else {
1737       // Patch up bad pages.
1738       for (int block = (firstsector * 512) / page_length;
1739           block <= (lastsector * 512) / page_length;
1740           block++) {
1741         unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1742         int length = page_length / wordsize_;
1743         for (int i = 0; i < length; i++) {
1744           memblock[i] = dst->pattern->pattern(i);
1745         }
1746       }
1747     }
1748   }
1749   return true;
1750 }
1751
1752
1753
1754 // Get memory for an incoming data transfer..
1755 bool FileThread::PagePrepare() {
1756   // We can only do direct IO to SAT pages if it is normal mem.
1757   page_io_ = os_->normal_mem();
1758
1759   // Init a local buffer if we need it.
1760   if (!page_io_) {
1761     local_page_ = static_cast<void*>(memalign(512, sat_->page_length()));
1762     if (!local_page_) {
1763       logprintf(0, "Process Error: disk thread memalign returned 0\n");
1764       status_ += 1;
1765       return false;
1766     }
1767   }
1768   return true;
1769 }
1770
1771
1772 // Remove memory allocated for data transfer.
1773 bool FileThread::PageTeardown() {
1774   // Free a local buffer if we need to.
1775   if (!page_io_) {
1776     free(local_page_);
1777   }
1778   return true;
1779 }
1780
1781
1782
1783 // Get memory for an incoming data transfer..
1784 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1785   if (page_io_) {
1786     if (!sat_->GetEmpty(dst))
1787       return false;
1788   } else {
1789     dst->addr = local_page_;
1790     dst->offset = 0;
1791     dst->pattern = 0;
1792   }
1793   return true;
1794 }
1795
1796 // Get memory for an outgoing data transfer..
1797 bool FileThread::GetValidPage(struct page_entry *src) {
1798   struct page_entry tmp;
1799   if (!sat_->GetValid(&tmp))
1800     return false;
1801   if (page_io_) {
1802     *src = tmp;
1803     return true;
1804   } else {
1805     src->addr = local_page_;
1806     src->offset = 0;
1807     CrcCopyPage(src, &tmp);
1808     if (!sat_->PutValid(&tmp))
1809       return false;
1810   }
1811   return true;
1812 }
1813
1814
1815 // Throw out a used empty page.
1816 bool FileThread::PutEmptyPage(struct page_entry *src) {
1817   if (page_io_) {
1818     if (!sat_->PutEmpty(src))
1819       return false;
1820   }
1821   return true;
1822 }
1823
1824 // Throw out a used, filled page.
1825 bool FileThread::PutValidPage(struct page_entry *src) {
1826   if (page_io_) {
1827     if (!sat_->PutValid(src))
1828       return false;
1829   }
1830   return true;
1831 }
1832
1833
1834
1835 // Copy data from file into memory blocks.
1836 bool FileThread::ReadPages(int fd) {
1837   int page_length = sat_->page_length();
1838   int strict = sat_->strict();
1839   int result = 1;
1840
1841
1842   // Read our data back out of the file, into it's new location.
1843   lseek(fd, 0, SEEK_SET);
1844   for (int i = 0; i < sat_->disk_pages(); i++) {
1845     struct page_entry dst;
1846     if (!GetEmptyPage(&dst))
1847       return false;
1848     // Retrieve expected pattern.
1849     dst.pattern = page_recs_[i].pattern;
1850     // Update page recordpage record.
1851     page_recs_[i].dst = dst.addr;
1852
1853     // Read from the file into destination page.
1854     if (!ReadPageFromFile(fd, &dst)) {
1855         PutEmptyPage(&dst);
1856         return false;
1857     }
1858
1859     SectorValidatePage(page_recs_[i], &dst, i);
1860
1861     // Ensure that the transfer ended up with correct data.
1862     if (strict) {
1863       // Record page index currently CRC checked.
1864       crc_page_ = i;
1865       int errors = CrcCheckPage(&dst);
1866       if (errors) {
1867         logprintf(5, "Log: file miscompare at block %d, "
1868                   "offset %x-%x. File: %s\n",
1869                   i, i * page_length, ((i + 1) * page_length) - 1,
1870                   filename_.c_str());
1871         result = false;
1872       }
1873       crc_page_ = -1;
1874       errorcount_ += errors;
1875     }
1876     if (!PutValidPage(&dst))
1877       return false;
1878   }
1879   return result;
1880 }
1881
1882
1883 // File IO work loop. Execute until marked done.
1884 int FileThread::Work() {
1885   int result = 1;
1886   int fileresult = 1;
1887   int64 loops = 0;
1888
1889   logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1890             thread_num_,
1891             filename_.c_str(),
1892             devicename_.c_str());
1893
1894   if (!PagePrepare())
1895     return 0;
1896
1897   // Open the data IO file.
1898   int fd = 0;
1899   if (!OpenFile(&fd))
1900     return 0;
1901
1902   pass_ = 0;
1903
1904   // Load patterns into page records.
1905   page_recs_ = new struct PageRec[sat_->disk_pages()];
1906   for (int i = 0; i < sat_->disk_pages(); i++) {
1907     page_recs_[i].pattern = new struct Pattern();
1908   }
1909
1910   // Loop until done.
1911   while (IsReadyToRun()) {
1912     // Do the file write.
1913     if (!(fileresult &= WritePages(fd)))
1914       break;
1915
1916     // Do the file read.
1917     if (!(fileresult &= ReadPages(fd)))
1918       break;
1919
1920     loops++;
1921     pass_ = loops;
1922   }
1923
1924   pages_copied_ = loops * sat_->disk_pages();
1925   status_ = result;
1926
1927   // Clean up.
1928   CloseFile(fd);
1929   PageTeardown();
1930
1931   logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1932             thread_num_, status_, pages_copied_);
1933   return 1;
1934 }
1935
1936 bool NetworkThread::IsNetworkStopSet() {
1937   return !IsReadyToRunNoPause();
1938 }
1939
1940 bool NetworkSlaveThread::IsNetworkStopSet() {
1941   // This thread has no completion status.
1942   // It finishes whever there is no more data to be
1943   // passed back.
1944   return true;
1945 }
1946
1947 // Set ip name to use for Network IO.
1948 void NetworkThread::SetIP(const char *ipaddr_init) {
1949   strncpy(ipaddr_, ipaddr_init, 256);
1950 }
1951
1952 // Create a socket.
1953 // Return 0 on error.
1954 bool NetworkThread::CreateSocket(int *psocket) {
1955   int sock = socket(AF_INET, SOCK_STREAM, 0);
1956   if (sock == -1) {
1957     logprintf(0, "Process Error: Cannot open socket\n");
1958     pages_copied_ = 0;
1959     status_ = 0;
1960     return false;
1961   }
1962   *psocket = sock;
1963   return true;
1964 }
1965
1966 // Close the socket.
1967 bool NetworkThread::CloseSocket(int sock) {
1968   close(sock);
1969   return true;
1970 }
1971
1972 // Initiate the tcp connection.
1973 bool NetworkThread::Connect(int sock) {
1974   struct sockaddr_in dest_addr;
1975   dest_addr.sin_family = AF_INET;
1976   dest_addr.sin_port = htons(kNetworkPort);
1977   memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
1978
1979   // Translate dot notation to u32.
1980   if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
1981     logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
1982     pages_copied_ = 0;
1983     status_ = 0;
1984     return false;
1985   }
1986
1987   if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
1988                     sizeof(struct sockaddr))) {
1989     logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
1990     pages_copied_ = 0;
1991     status_ = 0;
1992     return false;
1993   }
1994   return true;
1995 }
1996
1997 // Initiate the tcp connection.
1998 bool NetworkListenThread::Listen() {
1999   struct sockaddr_in sa;
2000
2001   memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2002
2003   sa.sin_family = AF_INET;
2004   sa.sin_addr.s_addr = INADDR_ANY;
2005   sa.sin_port = htons(kNetworkPort);
2006
2007   if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2008     char buf[256];
2009     sat_strerror(errno, buf, sizeof(buf));
2010     logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2011     pages_copied_ = 0;
2012     status_ = 0;
2013     return false;
2014   }
2015   listen(sock_, 3);
2016   return true;
2017 }
2018
2019 // Wait for a connection from a network traffic generation thread.
2020 bool NetworkListenThread::Wait() {
2021     fd_set rfds;
2022     struct timeval tv;
2023     int retval;
2024
2025     // Watch sock_ to see when it has input.
2026     FD_ZERO(&rfds);
2027     FD_SET(sock_, &rfds);
2028     // Wait up to five seconds.
2029     tv.tv_sec = 5;
2030     tv.tv_usec = 0;
2031
2032     retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2033
2034     return (retval > 0);
2035 }
2036
2037 // Wait for a connection from a network traffic generation thread.
2038 bool NetworkListenThread::GetConnection(int *pnewsock) {
2039   struct sockaddr_in sa;
2040   socklen_t size = sizeof(struct sockaddr_in);
2041
2042   int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2043   if (newsock < 0)  {
2044     logprintf(0, "Process Error: Did not receive connection\n");
2045     pages_copied_ = 0;
2046     status_ = 0;
2047     return false;
2048   }
2049   *pnewsock = newsock;
2050   return true;
2051 }
2052
2053 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2054   int page_length = sat_->page_length();
2055   char *address = static_cast<char*>(src->addr);
2056
2057   // Send our data over the network.
2058   int size = page_length;
2059   while (size) {
2060     int transferred = send(sock, address + (page_length - size), size, 0);
2061     if ((transferred == 0) || (transferred == -1)) {
2062       if (!IsNetworkStopSet()) {
2063         char buf[256] = "";
2064         sat_strerror(errno, buf, sizeof(buf));
2065         logprintf(0, "Process Error: Thread %d, "
2066                      "Network write failed, bailing. (%s)\n",
2067                   thread_num_, buf);
2068       }
2069       return false;
2070     }
2071     size = size - transferred;
2072   }
2073   return true;
2074 }
2075
2076
2077 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2078   int page_length = sat_->page_length();
2079   char *address = static_cast<char*>(dst->addr);
2080
2081   // Maybe we will get our data back again, maybe not.
2082   int size = page_length;
2083   while (size) {
2084     int transferred = recv(sock, address + (page_length - size), size, 0);
2085     if ((transferred == 0) || (transferred == -1)) {
2086       // Typically network slave thread should exit as network master
2087       // thread stops sending data.
2088       if (IsNetworkStopSet()) {
2089         int err = errno;
2090         if (transferred == 0 && err == 0) {
2091           // Two system setups will not sync exactly,
2092           // allow early exit, but log it.
2093           logprintf(0, "Log: Net thread did not recieve any data, exitting.\n");
2094         } else {
2095           char buf[256] = "";
2096           sat_strerror(err, buf, sizeof(buf));
2097           // Print why we failed.
2098           logprintf(0, "Process Error: Thread %d, "
2099                        "Network read failed, bailing (%s).\n",
2100                     thread_num_, buf);
2101           // Print arguments and results.
2102           logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2103                     sock, address + (page_length - size),
2104                     size, transferred, err);
2105           if ((transferred == 0) &&
2106               (page_length - size < 512) &&
2107               (page_length - size > 0)) {
2108             // Print null terminated data received, to see who's been
2109             // sending us supicious unwanted data.
2110             address[page_length - size] = 0;
2111             logprintf(0, "Log: received  %d bytes: '%s'\n",
2112                       page_length - size, address);
2113           }
2114         }
2115       }
2116       return false;
2117     }
2118     size = size - transferred;
2119   }
2120   return true;
2121 }
2122
2123
2124 // Network IO work loop. Execute until marked done.
2125 int NetworkThread::Work() {
2126   logprintf(9, "Log: Starting network thread %d, ip %s\n",
2127             thread_num_,
2128             ipaddr_);
2129
2130   // Make a socket.
2131   int sock = 0;
2132   if (!CreateSocket(&sock))
2133     return 0;
2134
2135   // Network IO loop requires network slave thread to have already initialized.
2136   // We will sleep here for awhile to ensure that the slave thread will be
2137   // listening by the time we connect.
2138   // Sleep for 15 seconds.
2139   sat_sleep(15);
2140   logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2141             thread_num_,
2142             ipaddr_);
2143
2144
2145   // Connect to a slave thread.
2146   if (!Connect(sock))
2147     return 0;
2148
2149   // Loop until done.
2150   int result = 1;
2151   int strict = sat_->strict();
2152   int64 loops = 0;
2153   while (IsReadyToRun()) {
2154     struct page_entry src;
2155     struct page_entry dst;
2156     result &= sat_->GetValid(&src);
2157     result &= sat_->GetEmpty(&dst);
2158     if (!result) {
2159       logprintf(0, "Process Error: net_thread failed to pop pages, "
2160                 "bailing\n");
2161       break;
2162     }
2163
2164     // Check data correctness.
2165     if (strict)
2166       CrcCheckPage(&src);
2167
2168     // Do the network write.
2169     if (!(result &= SendPage(sock, &src)))
2170       break;
2171
2172     // Update pattern reference to reflect new contents.
2173     dst.pattern = src.pattern;
2174
2175     // Do the network read.
2176     if (!(result &= ReceivePage(sock, &dst)))
2177       break;
2178
2179     // Ensure that the transfer ended up with correct data.
2180     if (strict)
2181       CrcCheckPage(&dst);
2182
2183     // Return all of our pages to the queue.
2184     result &= sat_->PutValid(&dst);
2185     result &= sat_->PutEmpty(&src);
2186     if (!result) {
2187       logprintf(0, "Process Error: net_thread failed to push pages, "
2188                 "bailing\n");
2189       break;
2190     }
2191     loops++;
2192   }
2193
2194   pages_copied_ = loops;
2195   status_ = result;
2196
2197   // Clean up.
2198   CloseSocket(sock);
2199
2200   logprintf(9, "Log: Completed %d: network thread status %d, "
2201                "%d pages copied\n",
2202             thread_num_, status_, pages_copied_);
2203   return 1;
2204 }
2205
2206 // Spawn slave threads for incoming connections.
2207 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2208   logprintf(12, "Log: Listen thread spawning slave\n");
2209
2210   // Spawn slave thread, to reflect network traffic back to sender.
2211   ChildWorker *child_worker = new ChildWorker;
2212   child_worker->thread.SetSock(newsock);
2213   child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2214                                   &child_worker->status);
2215   child_worker->status.Initialize();
2216   child_worker->thread.SpawnThread();
2217   child_workers_.push_back(child_worker);
2218
2219   return true;
2220 }
2221
2222 // Reap slave threads.
2223 bool NetworkListenThread::ReapSlaves() {
2224   bool result = true;
2225   // Gather status and reap threads.
2226   logprintf(12, "Log: Joining all outstanding threads\n");
2227
2228   for (int i = 0; i < child_workers_.size(); i++) {
2229     NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2230     logprintf(12, "Log: Joining slave thread %d\n", i);
2231     child_thread.JoinThread();
2232     if (child_thread.GetStatus() != 1) {
2233       logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2234                 child_thread.GetStatus());
2235       result = false;
2236     }
2237     errorcount_ += child_thread.GetErrorCount();
2238     logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2239               child_thread.GetErrorCount());
2240     pages_copied_ += child_thread.GetPageCount();
2241   }
2242
2243   return result;
2244 }
2245
2246 // Network listener IO work loop. Execute until marked done.
2247 int NetworkListenThread::Work() {
2248   int result = 1;
2249   logprintf(9, "Log: Starting network listen thread %d\n",
2250             thread_num_);
2251
2252   // Make a socket.
2253   sock_ = 0;
2254   if (!CreateSocket(&sock_))
2255     return 0;
2256   logprintf(9, "Log: Listen thread created sock\n");
2257
2258   // Allows incoming connections to be queued up by socket library.
2259   int newsock = 0;
2260   Listen();
2261   logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2262
2263   // Wait on incoming connections, and spawn worker threads for them.
2264   int threadcount = 0;
2265   while (IsReadyToRun()) {
2266     // Poll for connections that we can accept().
2267     if (Wait()) {
2268       // Accept those connections.
2269       logprintf(12, "Log: Listen thread found incoming connection\n");
2270       if (GetConnection(&newsock)) {
2271         SpawnSlave(newsock, threadcount);
2272         threadcount++;
2273       }
2274     }
2275   }
2276
2277   // Gather status and join spawned threads.
2278   ReapSlaves();
2279
2280   // Delete the child workers.
2281   for (ChildVector::iterator it = child_workers_.begin();
2282        it != child_workers_.end(); ++it) {
2283     (*it)->status.Destroy();
2284     delete *it;
2285   }
2286   child_workers_.clear();
2287
2288   CloseSocket(sock_);
2289
2290   status_ = result;
2291   logprintf(9,
2292             "Log: Completed %d: network listen thread status %d, "
2293             "%d pages copied\n",
2294             thread_num_, status_, pages_copied_);
2295   return 1;
2296 }
2297
2298 // Set network reflector socket struct.
2299 void NetworkSlaveThread::SetSock(int sock) {
2300   sock_ = sock;
2301 }
2302
2303 // Network reflector IO work loop. Execute until marked done.
2304 int NetworkSlaveThread::Work() {
2305   logprintf(9, "Log: Starting network slave thread %d\n",
2306             thread_num_);
2307
2308   // Verify that we have a socket.
2309   int sock = sock_;
2310   if (!sock)
2311     return 0;
2312
2313   // Loop until done.
2314   int result = 1;
2315   int64 loops = 0;
2316   // Init a local buffer for storing data.
2317   void *local_page = static_cast<void*>(memalign(512, sat_->page_length()));
2318   if (!local_page) {
2319     logprintf(0, "Process Error: Net Slave thread memalign returned 0\n");
2320     status_ += 1;
2321     return 0;
2322   }
2323
2324   struct page_entry page;
2325   page.addr = local_page;
2326
2327   // This thread will continue to run as long as the thread on the other end of
2328   // the socket is still sending and receiving data.
2329   while (1) {
2330     // Do the network read.
2331     if (!ReceivePage(sock, &page))
2332       break;
2333
2334     // Do the network write.
2335     if (!SendPage(sock, &page))
2336       break;
2337
2338     loops++;
2339   }
2340
2341   pages_copied_ = loops;
2342   status_ = result;
2343
2344   // Clean up.
2345   CloseSocket(sock);
2346
2347   logprintf(9,
2348             "Log: Completed %d: network slave thread status %d, "
2349             "%d pages copied\n",
2350             thread_num_, status_, pages_copied_);
2351   return result;
2352 }
2353
2354 // Thread work loop. Execute until marked finished.
2355 int ErrorPollThread::Work() {
2356   logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2357
2358   // This calls a generic error polling function in the Os abstraction layer.
2359   do {
2360     errorcount_ += os_->ErrorPoll();
2361     os_->ErrorWait();
2362   } while (IsReadyToRun());
2363
2364   logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2365             thread_num_, errorcount_);
2366   status_ = 1;
2367   return 1;
2368 }
2369
2370 // Worker thread to heat up CPU.
2371 int CpuStressThread::Work() {
2372   logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2373
2374   do {
2375     // Run ludloff's platform/CPU-specific assembly workload.
2376     os_->CpuStressWorkload();
2377     YieldSelf();
2378   } while (IsReadyToRun());
2379
2380   logprintf(9, "Log: Finished CPU stress thread %d:\n",
2381             thread_num_);
2382   status_ = 1;
2383   return 1;
2384 }
2385
2386 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2387                                                  int cacheline_count,
2388                                                  int thread_num,
2389                                                  int inc_count) {
2390   cc_cacheline_data_ = data;
2391   cc_cacheline_count_ = cacheline_count;
2392   cc_thread_num_ = thread_num;
2393   cc_inc_count_ = inc_count;
2394 }
2395
2396 // Worked thread to test the cache coherency of the CPUs
2397 int CpuCacheCoherencyThread::Work() {
2398   logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2399             cc_thread_num_);
2400   uint64 time_start, time_end;
2401   struct timeval tv;
2402
2403   unsigned int seed = static_cast<unsigned int>(gettid());
2404   gettimeofday(&tv, NULL);  // Get the timestamp before increments.
2405   time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2406
2407   uint64 total_inc = 0;  // Total increments done by the thread.
2408   while (IsReadyToRun()) {
2409     for (int i = 0; i < cc_inc_count_; i++) {
2410       // Choose a datastructure in random and increment the appropriate
2411       // member in that according to the offset (which is the same as the
2412       // thread number.
2413       int r = rand_r(&seed);
2414       r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2415       // Increment the member of the randomely selected structure.
2416       (cc_cacheline_data_[r].num[cc_thread_num_])++;
2417     }
2418
2419     total_inc += cc_inc_count_;
2420
2421     // Calculate if the local counter matches with the global value
2422     // in all the cache line structures for this particular thread.
2423     int cc_global_num = 0;
2424     for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2425       cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2426       // Reset the cachline member's value for the next run.
2427       cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2428     }
2429     if (sat_->error_injection())
2430       cc_global_num = -1;
2431
2432     if (cc_global_num != cc_inc_count_) {
2433       errorcount_++;
2434       logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2435                 cc_global_num, cc_inc_count_);
2436     }
2437   }
2438   gettimeofday(&tv, NULL);  // Get the timestamp at the end.
2439   time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2440
2441   uint64 us_elapsed = time_end - time_start;
2442   // inc_rate is the no. of increments per second.
2443   double inc_rate = total_inc * 1e6 / us_elapsed;
2444
2445   logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2446             " Increments=%llu, Increments/sec = %.6lf\n",
2447             cc_thread_num_, us_elapsed, total_inc, inc_rate);
2448   logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
2449             cc_thread_num_);
2450   status_ = 1;
2451   return 1;
2452 }
2453
2454 DiskThread::DiskThread(DiskBlockTable *block_table) {
2455   read_block_size_ = kSectorSize;   // default 1 sector (512 bytes)
2456   write_block_size_ = kSectorSize;  // this assumes read and write block size
2457                                     // are the same
2458   segment_size_ = -1;               // use the entire disk as one segment
2459   cache_size_ = 16 * 1024 * 1024;   // assume 16MiB cache by default
2460   // Use a queue such that 3/2 times as much data as the cache can hold
2461   // is written before it is read so that there is little chance the read
2462   // data is in the cache.
2463   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2464   blocks_per_segment_ = 32;
2465
2466   read_threshold_ = 100000;         // 100ms is a reasonable limit for
2467   write_threshold_ = 100000;        // reading/writing a sector
2468
2469   read_timeout_ = 5000000;          // 5 seconds should be long enough for a
2470   write_timeout_ = 5000000;         // timout for reading/writing
2471
2472   device_sectors_ = 0;
2473   non_destructive_ = 0;
2474
2475   aio_ctx_ = 0;
2476   block_table_ = block_table;
2477   update_block_table_ = 1;
2478 }
2479
2480 DiskThread::~DiskThread() {
2481 }
2482
2483 // Set filename for device file (in /dev).
2484 void DiskThread::SetDevice(const char *device_name) {
2485   device_name_ = device_name;
2486 }
2487
2488 // Set various parameters that control the behaviour of the test.
2489 // -1 is used as a sentinel value on each parameter (except non_destructive)
2490 // to indicate that the parameter not be set.
2491 bool DiskThread::SetParameters(int read_block_size,
2492                                int write_block_size,
2493                                int64 segment_size,
2494                                int64 cache_size,
2495                                int blocks_per_segment,
2496                                int64 read_threshold,
2497                                int64 write_threshold,
2498                                int non_destructive) {
2499   if (read_block_size != -1) {
2500     // Blocks must be aligned to the disk's sector size.
2501     if (read_block_size % kSectorSize != 0) {
2502       logprintf(0, "Process Error: Block size must be a multiple of %d "
2503                 "(thread %d).\n", kSectorSize, thread_num_);
2504       return false;
2505     }
2506
2507     read_block_size_ = read_block_size;
2508   }
2509
2510   if (write_block_size != -1) {
2511     // Write blocks must be aligned to the disk's sector size and to the
2512     // block size.
2513     if (write_block_size % kSectorSize != 0) {
2514       logprintf(0, "Process Error: Write block size must be a multiple "
2515                 "of %d (thread %d).\n", kSectorSize, thread_num_);
2516       return false;
2517     }
2518     if (write_block_size % read_block_size_ != 0) {
2519       logprintf(0, "Process Error: Write block size must be a multiple "
2520                 "of the read block size, which is %d (thread %d).\n",
2521                 read_block_size_, thread_num_);
2522       return false;
2523     }
2524
2525     write_block_size_ = write_block_size;
2526
2527   } else {
2528     // Make sure write_block_size_ is still valid.
2529     if (read_block_size_ > write_block_size_) {
2530       logprintf(5, "Log: Assuming write block size equal to read block size, "
2531                 "which is %d (thread %d).\n", read_block_size_,
2532                 thread_num_);
2533       write_block_size_ = read_block_size_;
2534     } else {
2535       if (write_block_size_ % read_block_size_ != 0) {
2536         logprintf(0, "Process Error: Write block size (defined as %d) must "
2537                   "be a multiple of the read block size, which is %d "
2538                   "(thread %d).\n", write_block_size_, read_block_size_,
2539                   thread_num_);
2540         return false;
2541       }
2542     }
2543   }
2544
2545   if (cache_size != -1) {
2546     cache_size_ = cache_size;
2547   }
2548
2549   if (blocks_per_segment != -1) {
2550     if (blocks_per_segment <= 0) {
2551       logprintf(0, "Process Error: Blocks per segment must be greater than "
2552                    "zero.\n (thread %d)", thread_num_);
2553       return false;
2554     }
2555
2556     blocks_per_segment_ = blocks_per_segment;
2557   }
2558
2559   if (read_threshold != -1) {
2560     if (read_threshold <= 0) {
2561       logprintf(0, "Process Error: Read threshold must be greater than "
2562                    "zero (thread %d).\n", thread_num_);
2563       return false;
2564     }
2565
2566     read_threshold_ = read_threshold;
2567   }
2568
2569   if (write_threshold != -1) {
2570     if (write_threshold <= 0) {
2571       logprintf(0, "Process Error: Write threshold must be greater than "
2572                    "zero (thread %d).\n", thread_num_);
2573       return false;
2574     }
2575
2576     write_threshold_ = write_threshold;
2577   }
2578
2579   if (segment_size != -1) {
2580     // Segments must be aligned to the disk's sector size.
2581     if (segment_size % kSectorSize != 0) {
2582       logprintf(0, "Process Error: Segment size must be a multiple of %d"
2583                 " (thread %d).\n", kSectorSize, thread_num_);
2584       return false;
2585     }
2586
2587     segment_size_ = segment_size / kSectorSize;
2588   }
2589
2590   non_destructive_ = non_destructive;
2591
2592   // Having a queue of 150% of blocks that will fit in the disk's cache
2593   // should be enough to force out the oldest block before it is read and hence,
2594   // making sure the data comes form the disk and not the cache.
2595   queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2596   // Updating DiskBlockTable parameters
2597   if (update_block_table_) {
2598     block_table_->SetParameters(kSectorSize, write_block_size_,
2599                                 device_sectors_, segment_size_,
2600                                 device_name_);
2601   }
2602   return true;
2603 }
2604
2605 bool DiskThread::OpenDevice(int *pfile) {
2606   int fd = open(device_name_.c_str(),
2607                 O_RDWR | O_SYNC | O_DIRECT | O_LARGEFILE,
2608                 0);
2609   if (fd < 0) {
2610     logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2611               device_name_.c_str(), thread_num_);
2612     return false;
2613   }
2614   *pfile = fd;
2615
2616   return GetDiskSize(fd);
2617 }
2618
2619 // Retrieves the size (in bytes) of the disk/file.
2620 bool DiskThread::GetDiskSize(int fd) {
2621   struct stat device_stat;
2622   if (fstat(fd, &device_stat) == -1) {
2623     logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2624               device_name_.c_str(), thread_num_);
2625     return false;
2626   }
2627
2628   // For a block device, an ioctl is needed to get the size since the size
2629   // of the device file (i.e. /dev/sdb) is 0.
2630   if (S_ISBLK(device_stat.st_mode)) {
2631     uint64 block_size = 0;
2632
2633     if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2634       logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2635                 device_name_.c_str(), thread_num_);
2636       return false;
2637     }
2638
2639     // If an Elephant is initialized with status DEAD its size will be zero.
2640     if (block_size == 0) {
2641       os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2642       ++errorcount_;
2643       status_ = 1;  // Avoid a procedural error.
2644       return false;
2645     }
2646
2647     device_sectors_ = block_size / kSectorSize;
2648
2649   } else if (S_ISREG(device_stat.st_mode)) {
2650     device_sectors_ = device_stat.st_size / kSectorSize;
2651
2652   } else {
2653     logprintf(0, "Process Error: %s is not a regular file or block "
2654               "device (thread %d).\n", device_name_.c_str(),
2655               thread_num_);
2656     return false;
2657   }
2658
2659   logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2660             device_sectors_, device_name_.c_str(), thread_num_);
2661
2662   if (update_block_table_) {
2663     block_table_->SetParameters(kSectorSize, write_block_size_,
2664                                 device_sectors_, segment_size_,
2665                                 device_name_);
2666   }
2667
2668   return true;
2669 }
2670
2671 bool DiskThread::CloseDevice(int fd) {
2672   close(fd);
2673   return true;
2674 }
2675
2676 // Return the time in microseconds.
2677 int64 DiskThread::GetTime() {
2678   struct timeval tv;
2679   gettimeofday(&tv, NULL);
2680   return tv.tv_sec * 1000000 + tv.tv_usec;
2681 }
2682
2683 bool DiskThread::DoWork(int fd) {
2684   int64 block_num = 0;
2685   blocks_written_ = 0;
2686   blocks_read_ = 0;
2687   int64 num_segments;
2688
2689   if (segment_size_ == -1) {
2690     num_segments = 1;
2691   } else {
2692     num_segments = device_sectors_ / segment_size_;
2693     if (device_sectors_ % segment_size_ != 0)
2694       num_segments++;
2695   }
2696
2697   // Disk size should be at least 3x cache size.  See comment later for
2698   // details.
2699   sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2700
2701   // This disk test works by writing blocks with a certain pattern to
2702   // disk, then reading them back and verifying it against the pattern
2703   // at a later time.  A failure happens when either the block cannot
2704   // be written/read or when the read block is different than what was
2705   // written.  If a block takes too long to write/read, then a warning
2706   // is given instead of an error since taking too long is not
2707   // necessarily an error.
2708   //
2709   // To prevent the read blocks from coming from the disk cache,
2710   // enough blocks are written before read such that a block would
2711   // be ejected from the disk cache by the time it is read.
2712   //
2713   // TODO(amistry): Implement some sort of read/write throttling.  The
2714   //                flood of asynchronous I/O requests when a drive is
2715   //                unplugged is causing the application and kernel to
2716   //                become unresponsive.
2717
2718   while (IsReadyToRun()) {
2719     // Write blocks to disk.
2720     logprintf(16, "Write phase for disk %s (thread %d).\n",
2721               device_name_.c_str(), thread_num_);
2722     while (IsReadyToRunNoPause() &&
2723            in_flight_sectors_.size() < queue_size_ + 1) {
2724       // Confine testing to a particular segment of the disk.
2725       int64 segment = (block_num / blocks_per_segment_) % num_segments;
2726       if (block_num % blocks_per_segment_ == 0) {
2727         logprintf(20, "Log: Starting to write segment %lld out of "
2728                   "%lld on disk %s (thread %d).\n",
2729                   segment, num_segments, device_name_.c_str(),
2730                   thread_num_);
2731       }
2732       block_num++;
2733
2734       BlockData *block = block_table_->GetUnusedBlock(segment);
2735
2736       // If an unused sequence of sectors could not be found, skip to the
2737       // next block to process.  Soon, a new segment will come and new
2738       // sectors will be able to be allocated.  This effectively puts a
2739       // minumim on the disk size at 3x the stated cache size, or 48MiB
2740       // if a cache size is not given (since the cache is set as 16MiB
2741       // by default).  Given that todays caches are at the low MiB range
2742       // and drive sizes at the mid GB, this shouldn't pose a problem.
2743       // The 3x minimum comes from the following:
2744       //   1. In order to allocate 'y' blocks from a segment, the
2745       //      segment must contain at least 2y blocks or else an
2746       //      allocation may not succeed.
2747       //   2. Assume the entire disk is one segment.
2748       //   3. A full write phase consists of writing blocks corresponding to
2749       //      3/2 cache size.
2750       //   4. Therefore, the one segment must have 2 * 3/2 * cache
2751       //      size worth of blocks = 3 * cache size worth of blocks
2752       //      to complete.
2753       // In non-destructive mode, don't write anything to disk.
2754       if (!non_destructive_) {
2755         if (!WriteBlockToDisk(fd, block)) {
2756           block_table_->RemoveBlock(block);
2757           continue;
2758         }
2759       }
2760
2761       block->SetBlockAsInitialized();
2762
2763       blocks_written_++;
2764       in_flight_sectors_.push(block);
2765     }
2766
2767     // Verify blocks on disk.
2768     logprintf(20, "Read phase for disk %s (thread %d).\n",
2769               device_name_.c_str(), thread_num_);
2770     while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2771       BlockData *block = in_flight_sectors_.front();
2772       in_flight_sectors_.pop();
2773       ValidateBlockOnDisk(fd, block);
2774       block_table_->RemoveBlock(block);
2775       blocks_read_++;
2776     }
2777   }
2778
2779   pages_copied_ = blocks_written_ + blocks_read_;
2780   return true;
2781 }
2782
2783 // Do an asynchronous disk I/O operation.
2784 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2785                             int64 offset, int64 timeout) {
2786   // Use the Linux native asynchronous I/O interface for reading/writing.
2787   // A read/write consists of three basic steps:
2788   //    1. create an io context.
2789   //    2. prepare and submit an io request to the context
2790   //    3. wait for an event on the context.
2791
2792   struct {
2793     const int opcode;
2794     const char *op_str;
2795     const char *error_str;
2796   } operations[2] = {
2797     { IOCB_CMD_PREAD, "read", "disk-read-error" },
2798     { IOCB_CMD_PWRITE, "write", "disk-write-error" }
2799   };
2800
2801   struct iocb cb;
2802   memset(&cb, 0, sizeof(cb));
2803
2804   cb.aio_fildes = fd;
2805   cb.aio_lio_opcode = operations[op].opcode;
2806   cb.aio_buf = (__u64)buf;
2807   cb.aio_nbytes = size;
2808   cb.aio_offset = offset;
2809
2810   struct iocb *cbs[] = { &cb };
2811   if (io_submit(aio_ctx_, 1, cbs) != 1) {
2812     logprintf(0, "Process Error: Unable to submit async %s "
2813                  "on disk %s (thread %d).\n",
2814               operations[op].op_str, device_name_.c_str(),
2815               thread_num_);
2816     return false;
2817   }
2818
2819   struct io_event event;
2820   memset(&event, 0, sizeof(event));
2821   struct timespec tv;
2822   tv.tv_sec = timeout / 1000000;
2823   tv.tv_nsec = (timeout % 1000000) * 1000;
2824   if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2825     // A ctrl-c from the keyboard will cause io_getevents to fail with an
2826     // EINTR error code.  This is not an error and so don't treat it as such,
2827     // but still log it.
2828     if (errno == EINTR) {
2829       logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2830                 operations[op].op_str, device_name_.c_str(),
2831                 thread_num_);
2832     } else {
2833       os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2834       errorcount_ += 1;
2835       logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2836                    "starting at %lld on disk %s (thread %d).\n",
2837                 operations[op].op_str, offset / kSectorSize,
2838                 device_name_.c_str(), thread_num_);
2839     }
2840
2841     // Don't bother checking return codes since io_cancel seems to always fail.
2842     // Since io_cancel is always failing, destroying and recreating an I/O
2843     // context is a workaround for canceling an in-progress I/O operation.
2844     // TODO(amistry): Find out why io_cancel isn't working and make it work.
2845     io_cancel(aio_ctx_, &cb, &event);
2846     io_destroy(aio_ctx_);
2847     aio_ctx_ = 0;
2848     if (io_setup(5, &aio_ctx_)) {
2849       logprintf(0, "Process Error: Unable to create aio context on disk %s"
2850                 " (thread %d).\n",
2851                 device_name_.c_str(), thread_num_);
2852     }
2853
2854     return false;
2855   }
2856
2857   // event.res contains the number of bytes written/read or
2858   // error if < 0, I think.
2859   if (event.res != size) {
2860     errorcount_++;
2861     os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2862
2863     if (event.res < 0) {
2864       switch (event.res) {
2865         case -EIO:
2866           logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2867                        "sectors starting at %lld on disk %s (thread %d).\n",
2868                     operations[op].op_str, offset / kSectorSize,
2869                     device_name_.c_str(), thread_num_);
2870           break;
2871         default:
2872           logprintf(0, "Hardware Error: Unknown error while doing %s to "
2873                        "sectors starting at %lld on disk %s (thread %d).\n",
2874                     operations[op].op_str, offset / kSectorSize,
2875                     device_name_.c_str(), thread_num_);
2876       }
2877     } else {
2878       logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2879                    "%lld on disk %s (thread %d).\n",
2880                 operations[op].op_str, offset / kSectorSize,
2881                 device_name_.c_str(), thread_num_);
2882     }
2883     return false;
2884   }
2885
2886   return true;
2887 }
2888
2889 // Write a block to disk.
2890 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
2891   memset(block_buffer_, 0, block->GetSize());
2892
2893   // Fill block buffer with a pattern
2894   struct page_entry pe;
2895   if (!sat_->GetValid(&pe)) {
2896     // Even though a valid page could not be obatined, it is not an error
2897     // since we can always fill in a pattern directly, albeit slower.
2898     unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
2899     block->SetPattern(patternlist_->GetRandomPattern());
2900
2901     logprintf(11, "Log: Warning, using pattern fill fallback in "
2902                   "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
2903               device_name_.c_str(), thread_num_);
2904
2905     for (int i = 0; i < block->GetSize()/wordsize_; i++) {
2906       memblock[i] = block->GetPattern()->pattern(i);
2907     }
2908   } else {
2909     memcpy(block_buffer_, pe.addr, block->GetSize());
2910     block->SetPattern(pe.pattern);
2911     sat_->PutValid(&pe);
2912   }
2913
2914   logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
2915             " (thread %d).\n",
2916             block->GetSize()/kSectorSize, block->GetAddress(),
2917             device_name_.c_str(), thread_num_);
2918
2919   int64 start_time = GetTime();
2920
2921   if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
2922                    block->GetAddress() * kSectorSize, write_timeout_)) {
2923     return false;
2924   }
2925
2926   int64 end_time = GetTime();
2927   logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
2928             end_time - start_time, thread_num_);
2929   if (end_time - start_time > write_threshold_) {
2930     logprintf(5, "Log: Write took %lld us which is longer than threshold "
2931                  "%lld us on disk %s (thread %d).\n",
2932               end_time - start_time, write_threshold_, device_name_.c_str(),
2933               thread_num_);
2934   }
2935
2936   return true;
2937 }
2938
2939 // Verify a block on disk.
2940 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
2941   int64 blocks = block->GetSize() / read_block_size_;
2942   int64 bytes_read = 0;
2943   int64 current_blocks;
2944   int64 current_bytes;
2945   uint64 address = block->GetAddress();
2946
2947   logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
2948             "(thread %d).\n",
2949             address, device_name_.c_str(), thread_num_);
2950
2951   // Read block from disk and time the read.  If it takes longer than the
2952   // threshold, complain.
2953   if (lseek(fd, address * kSectorSize, SEEK_SET) == -1) {
2954     logprintf(0, "Process Error: Unable to seek to sector %lld in "
2955               "DiskThread::ValidateSectorsOnDisk on disk %s "
2956               "(thread %d).\n", address, device_name_.c_str(), thread_num_);
2957     return false;
2958   }
2959   int64 start_time = GetTime();
2960
2961   // Split a large write-sized block into small read-sized blocks and
2962   // read them in groups of randomly-sized multiples of read block size.
2963   // This assures all data written on disk by this particular block
2964   // will be tested using a random reading pattern.
2965
2966   while (blocks != 0) {
2967     // Test all read blocks in a written block.
2968     current_blocks = (random() % blocks) + 1;
2969     current_bytes = current_blocks * read_block_size_;
2970
2971     memset(block_buffer_, 0, current_bytes);
2972
2973     logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
2974               "disk %s (thread %d)\n",
2975               current_bytes / kSectorSize,
2976               (address * kSectorSize + bytes_read) / kSectorSize,
2977               device_name_.c_str(), thread_num_);
2978
2979     if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
2980                      address * kSectorSize + bytes_read,
2981                      write_timeout_)) {
2982       return false;
2983     }
2984
2985     int64 end_time = GetTime();
2986     logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
2987               end_time - start_time, thread_num_);
2988     if (end_time - start_time > read_threshold_) {
2989       logprintf(5, "Log: Read took %lld us which is longer than threshold "
2990                 "%lld us on disk %s (thread %d).\n",
2991                 end_time - start_time, read_threshold_,
2992                 device_name_.c_str(), thread_num_);
2993     }
2994
2995     // In non-destructive mode, don't compare the block to the pattern since
2996     // the block was never written to disk in the first place.
2997     if (!non_destructive_) {
2998       if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
2999                       0, bytes_read)) {
3000         os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3001         errorcount_ += 1;
3002         logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3003                   "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3004                   "disk %s (thread %d).\n",
3005                   address, device_name_.c_str(), thread_num_);
3006       }
3007     }
3008
3009     bytes_read += current_blocks * read_block_size_;
3010     blocks -= current_blocks;
3011   }
3012
3013   return true;
3014 }
3015
3016 int DiskThread::Work() {
3017   int fd;
3018
3019   logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3020             thread_num_, device_name_.c_str());
3021
3022   srandom(time(NULL));
3023
3024   if (!OpenDevice(&fd)) {
3025     return 0;
3026   }
3027
3028   // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3029   // when using direst IO.
3030   block_buffer_ = memalign(kBufferAlignment, write_block_size_);
3031   if (block_buffer_ == NULL) {
3032     CloseDevice(fd);
3033     logprintf(0, "Process Error: Unable to allocate memory for buffers "
3034                  "for disk %s (thread %d).\n",
3035               device_name_.c_str(), thread_num_);
3036     return 0;
3037   }
3038
3039   if (io_setup(5, &aio_ctx_)) {
3040     logprintf(0, "Process Error: Unable to create aio context for disk %s"
3041               " (thread %d).\n",
3042               device_name_.c_str(), thread_num_);
3043     return 0;
3044   }
3045
3046   DoWork(fd);
3047
3048   status_ = 1;
3049
3050   io_destroy(aio_ctx_);
3051   CloseDevice(fd);
3052   free(block_buffer_);
3053
3054   logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3055                "%d pages copied\n",
3056             thread_num_, device_name_.c_str(), status_, pages_copied_);
3057   return 1;
3058 }
3059
3060 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3061     : DiskThread(block_table) {
3062   update_block_table_ = 0;
3063 }
3064
3065 RandomDiskThread::~RandomDiskThread() {
3066 }
3067
3068 bool RandomDiskThread::DoWork(int fd) {
3069   blocks_read_ = 0;
3070   blocks_written_ = 0;
3071   logprintf(11, "Random phase for disk %s (thread %d).\n",
3072             device_name_.c_str(), thread_num_);
3073   while (IsReadyToRun()) {
3074     BlockData *block = block_table_->GetRandomBlock();
3075     if (block == NULL) {
3076       logprintf(12, "No block available for device %s (thread %d).\n",
3077                 device_name_.c_str(), thread_num_);
3078     } else {
3079       ValidateBlockOnDisk(fd, block);
3080       block_table_->ReleaseBlock(block);
3081       blocks_read_++;
3082     }
3083   }
3084   pages_copied_ = blocks_read_;
3085   return true;
3086 }
3087
3088 MemoryRegionThread::MemoryRegionThread() {
3089   error_injection_ = false;
3090   pages_ = NULL;
3091 }
3092
3093 MemoryRegionThread::~MemoryRegionThread() {
3094   if (pages_ != NULL)
3095     delete pages_;
3096 }
3097
3098 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3099   int plength = sat_->page_length();
3100   int npages = size / plength;
3101   if (size % plength) {
3102     logprintf(0, "Process Error: region size is not a multiple of SAT "
3103               "page length\n");
3104     return false;
3105   } else {
3106     if (pages_ != NULL)
3107       delete pages_;
3108     pages_ = new PageEntryQueue(npages);
3109     char *base_addr = reinterpret_cast<char*>(region);
3110     region_ = base_addr;
3111     for (int i = 0; i < npages; i++) {
3112       struct page_entry pe;
3113       init_pe(&pe);
3114       pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3115       pe.offset = i * plength;
3116
3117       pages_->Push(&pe);
3118     }
3119     return true;
3120   }
3121 }
3122
3123 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3124                                       int priority,
3125                                       const char *message) {
3126   uint32 buffer_offset;
3127   if (phase_ == kPhaseCopy) {
3128     // If the error occurred on the Copy Phase, it means that
3129     // the source data (i.e., the main memory) is wrong. so
3130     // just pass it to the original ProcessError to call a
3131     // bad-dimm error
3132     WorkerThread::ProcessError(error, priority, message);
3133   } else if (phase_ == kPhaseCheck) {
3134     // A error on the Check Phase means that the memory region tested
3135     // has an error. Gathering more information and then reporting
3136     // the error.
3137     // Determine if this is a write or read error.
3138     os_->Flush(error->vaddr);
3139     error->reread = *(error->vaddr);
3140     char *good = reinterpret_cast<char*>(&(error->expected));
3141     char *bad = reinterpret_cast<char*>(&(error->actual));
3142     sat_assert(error->expected != error->actual);
3143     unsigned int offset = 0;
3144     for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3145       if (good[offset] != bad[offset])
3146         break;
3147     }
3148
3149     error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
3150
3151     buffer_offset = error->vbyteaddr - region_;
3152
3153     // Find physical address if possible.
3154     error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3155     logprintf(priority,
3156               "%s: miscompare on %s, CRC check at %p(0x%llx), "
3157               "offset %llx: read:0x%016llx, reread:0x%016llx "
3158               "expected:0x%016llx\n",
3159               message,
3160               identifier_.c_str(),
3161               error->vaddr,
3162               error->paddr,
3163               buffer_offset,
3164               error->actual,
3165               error->reread,
3166               error->expected);
3167   } else {
3168     logprintf(0, "Process Error: memory region thread raised an "
3169               "unexpected error.");
3170   }
3171 }
3172
3173 int MemoryRegionThread::Work() {
3174   struct page_entry source_pe;
3175   struct page_entry memregion_pe;
3176   int result = 1;
3177   int64 loops = 0;
3178   const uint64 error_constant = 0x00ba00000000ba00LL;
3179
3180   // For error injection.
3181   int64 *addr = 0x0;
3182   int offset = 0;
3183   int64 data = 0;
3184
3185   logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3186
3187   while (IsReadyToRun()) {
3188     // Getting pages from SAT and queue.
3189     phase_ = kPhaseNoPhase;
3190     result &= sat_->GetValid(&source_pe);
3191     if (!result) {
3192       logprintf(0, "Process Error: memory region thread failed to pop "
3193                 "pages from SAT, bailing\n");
3194       break;
3195     }
3196
3197     result &= pages_->PopRandom(&memregion_pe);
3198     if (!result) {
3199       logprintf(0, "Process Error: memory region thread failed to pop "
3200                 "pages from queue, bailing\n");
3201       break;
3202     }
3203
3204     // Error injection for CRC copy.
3205     if ((sat_->error_injection() || error_injection_) && loops == 1) {
3206       addr = reinterpret_cast<int64*>(source_pe.addr);
3207       offset = random() % (sat_->page_length() / wordsize_);
3208       data = addr[offset];
3209       addr[offset] = error_constant;
3210     }
3211
3212     // Copying SAT page into memory region.
3213     phase_ = kPhaseCopy;
3214     CrcCopyPage(&memregion_pe, &source_pe);
3215     memregion_pe.pattern = source_pe.pattern;
3216
3217     // Error injection for CRC Check.
3218     if ((sat_->error_injection() || error_injection_) && loops == 2) {
3219       addr = reinterpret_cast<int64*>(memregion_pe.addr);
3220       offset = random() % (sat_->page_length() / wordsize_);
3221       data = addr[offset];
3222       addr[offset] = error_constant;
3223     }
3224
3225     // Checking page content in memory region.
3226     phase_ = kPhaseCheck;
3227     CrcCheckPage(&memregion_pe);
3228
3229     phase_ = kPhaseNoPhase;
3230     // Storing pages on their proper queues.
3231     result &= sat_->PutValid(&source_pe);
3232     if (!result) {
3233       logprintf(0, "Process Error: memory region thread failed to push "
3234                 "pages into SAT, bailing\n");
3235       break;
3236     }
3237     result &= pages_->Push(&memregion_pe);
3238     if (!result) {
3239       logprintf(0, "Process Error: memory region thread failed to push "
3240                 "pages into queue, bailing\n");
3241       break;
3242     }
3243
3244     if ((sat_->error_injection() || error_injection_) &&
3245         loops >= 1 && loops <= 2) {
3246       addr[offset] = data;
3247     }
3248
3249     loops++;
3250     YieldSelf();
3251   }
3252
3253   pages_copied_ = loops;
3254   status_ = result;
3255   logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3256             "pages checked\n", thread_num_, status_, pages_copied_);
3257   return 1;
3258 }