1 // Copyright 2006 Google Inc. All Rights Reserved.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // worker.cc : individual tasks that can be run in combination to
30 #include <sys/select.h>
32 #include <sys/types.h>
33 #include <sys/times.h>
35 // These defines are necessary, though they are on by default.
37 // #define __USE_LARGEFILE64
39 #include <sys/socket.h>
41 #include <arpa/inet.h>
42 #include <linux/unistd.h> // for gettid
43 // For size of block device
44 #include <sys/ioctl.h>
46 // For asynchronous I/O
47 #include <linux/aio_abi.h>
49 #include <sys/syscall.h>
54 // This file must work with autoconf on its public version,
55 // so these includes are correct.
56 #include "error_diag.h" // NOLINT
57 #include "os.h" // NOLINT
58 #include "pattern.h" // NOLINT
59 #include "queue.h" // NOLINT
60 #include "sat.h" // NOLINT
61 #include "sattypes.h" // NOLINT
62 #include "worker.h" // NOLINT
65 // Some distributions' libc headers do not declare gettid(); define the
65 // syscall number ourselves so the gettid() macro below always works.
66 #if !defined(__NR_gettid)
67 # define __NR_gettid 224
70 #define gettid() syscall(__NR_gettid)
71 #if !defined(CPU_SETSIZE)
72 _syscall3(int, sched_getaffinity, pid_t, pid,
73 unsigned int, len, cpu_set_t*, mask)
74 _syscall3(int, sched_setaffinity, pid_t, pid,
75 unsigned int, len, cpu_set_t*, mask)
78 // Linux aio syscalls.
79 #if !defined(__NR_io_setup)
80 #define __NR_io_setup 206
81 #define __NR_io_destroy 207
82 #define __NR_io_getevents 208
83 #define __NR_io_submit 209
84 #define __NR_io_cancel 210
87 #define io_setup(nr_events, ctxp) \
88 syscall(__NR_io_setup, (nr_events), (ctxp))
89 #define io_submit(ctx_id, nr, iocbpp) \
90 syscall(__NR_io_submit, (ctx_id), (nr), (iocbpp))
91 #define io_getevents(ctx_id, io_getevents, nr, events, timeout) \
92 syscall(__NR_io_getevents, (ctx_id), (io_getevents), (nr), (events), \
94 #define io_cancel(ctx_id, iocb, result) \
95 syscall(__NR_io_cancel, (ctx_id), (iocb), (result))
96 #define io_destroy(ctx) \
97 syscall(__NR_io_destroy, (ctx))
100 // Get HW core ID from cpuid instruction.
101 inline int apicid(void) {
103 #ifdef STRESSAPPTEST_CPU_PPC
106 __asm __volatile("cpuid" : "=b" (cpu) : "a" (1) : "cx", "dx");
111 // Work around the sad fact that there are two (gnu, xsi) incompatible
112 // versions of strerror_r floating around google. Awesome.
113 bool sat_strerror(int err, char *buf, int len) {
115 char *errmsg = reinterpret_cast<char*>(strerror_r(err, buf, len));
116 int retval = reinterpret_cast<int64>(errmsg);
122 strncpy(buf, errmsg, len);
129 inline uint64 addr_to_tag(void *address) {
130 return reinterpret_cast<uint64>(address);
134 #if !defined(O_DIRECT)
135 // Sometimes this isn't available.
136 // Disregard if it's not defined.
140 // A struct to hold captured errors, for later reporting.
142 uint64 actual; // This is the actual value read.
143 uint64 reread; // This is the actual value, reread.
144 uint64 expected; // This is what it should have been.
145 uint64 *vaddr; // This is where it was (or wasn't).
146 char *vbyteaddr; // This is byte specific where the data was (or wasn't).
147 uint64 paddr; // This is the bus address, if available.
148 uint64 *tagvaddr; // This holds the tag value if this data was tagged.
149 uint64 tagpaddr; // This holds the physical address corresponding to the tag.
152 // This is a helper function to create new threads with pthreads.
153 static void *ThreadSpawnerGeneric(void *ptr) {
154 WorkerThread *worker = static_cast<WorkerThread*>(ptr);
155 worker->StartRoutine();
160 void WorkerStatus::Initialize() {
161 sat_assert(0 == pthread_mutex_init(&num_workers_mutex_, NULL));
162 sat_assert(0 == pthread_rwlock_init(&status_rwlock_, NULL));
163 sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL,
167 void WorkerStatus::Destroy() {
168 sat_assert(0 == pthread_mutex_destroy(&num_workers_mutex_));
169 sat_assert(0 == pthread_rwlock_destroy(&status_rwlock_));
170 sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
173 void WorkerStatus::PauseWorkers() {
174 if (SetStatus(PAUSE) != PAUSE)
175 WaitOnPauseBarrier();
178 void WorkerStatus::ResumeWorkers() {
179 if (SetStatus(RUN) == PAUSE)
180 WaitOnPauseBarrier();
183 void WorkerStatus::StopWorkers() {
184 if (SetStatus(STOP) == PAUSE)
185 WaitOnPauseBarrier();
188 bool WorkerStatus::ContinueRunning() {
189 // This loop is an optimization. We use it to immediately re-check the status
190 // after resuming from a pause, instead of returning and waiting for the next
191 // call to this function.
193 switch (GetStatus()) {
197 // Wait for the other workers to call this function so that
198 // PauseWorkers() can return.
199 WaitOnPauseBarrier();
200 // Wait for ResumeWorkers() to be called.
201 WaitOnPauseBarrier();
209 bool WorkerStatus::ContinueRunningNoPause() {
210 return (GetStatus() != STOP);
213 void WorkerStatus::RemoveSelf() {
214 // Acquire a read lock on status_rwlock_ while (status_ != PAUSE).
216 AcquireStatusReadLock();
217 if (status_ != PAUSE)
219 // We need to obey PauseWorkers() just like ContinueRunning() would, so that
220 // the other threads won't wait on pause_barrier_ forever.
222 // Wait for the other workers to call this function so that PauseWorkers()
224 WaitOnPauseBarrier();
225 // Wait for ResumeWorkers() to be called.
226 WaitOnPauseBarrier();
229 // This lock would be unnecessary if we held a write lock instead of a read
230 // lock on status_rwlock_, but that would also force all threads calling
231 // ContinueRunning() to wait on this one. Using a separate lock avoids that.
232 AcquireNumWorkersLock();
233 // Decrement num_workers_ and reinitialize pause_barrier_, which we know isn't
234 // in use because (status != PAUSE).
235 sat_assert(0 == pthread_barrier_destroy(&pause_barrier_));
236 sat_assert(0 == pthread_barrier_init(&pause_barrier_, NULL, num_workers_));
238 ReleaseNumWorkersLock();
240 // Release status_rwlock_.
245 // Parent thread class.
246 WorkerThread::WorkerThread() {
250 runduration_usec_ = 0;
252 worker_status_ = NULL;
253 thread_spawner_ = &ThreadSpawnerGeneric;
257 WorkerThread::~WorkerThread() {}
259 // Constructors. Just init some default values.
260 FillThread::FillThread() {
261 num_pages_to_fill_ = 0;
264 // Initialize file name to empty.
265 FileThread::FileThread() {
273 // If file thread used bounce buffer in memory, account for the extra
274 // copy for memory bandwidth calculation.
275 float FileThread::GetMemoryCopiedData() {
276 if (!os_->normal_mem())
277 return GetCopiedData();
282 // Initialize target hostname to be invalid.
283 NetworkThread::NetworkThread() {
284 snprintf(ipaddr_, sizeof(ipaddr_), "Unknown");
289 NetworkSlaveThread::NetworkSlaveThread() {
293 NetworkListenThread::NetworkListenThread() {
296 // Init member variables.
297 void WorkerThread::InitThread(int thread_num_init,
299 class OsLayer *os_init,
300 class PatternList *patternlist_init,
301 WorkerStatus *worker_status) {
302 sat_assert(worker_status);
303 worker_status->AddWorkers(1);
305 thread_num_ = thread_num_init;
308 patternlist_ = patternlist_init;
309 worker_status_ = worker_status;
311 cpu_mask_ = AvailableCpus();
314 tag_mode_ = sat_->tag_mode();
318 // Use pthreads to prioritize a system thread.
319 bool WorkerThread::InitPriority() {
320 // This doesn't affect performance that much, and may not be too safe.
322 bool ret = BindToCpus(cpu_mask_);
324 logprintf(11, "Log: Bind to %x failed.\n", cpu_mask_);
326 logprintf(11, "Log: Thread %d running on apic ID %d mask %x (%x).\n",
327 thread_num_, apicid(), CurrentCpus(), cpu_mask_);
329 if (priority_ == High) {
331 param.sched_priority = 1;
332 // Set the priority; others are unchanged.
333 logprintf(0, "Log: Changing priority to SCHED_FIFO %d\n",
334 param.sched_priority);
335 if (sched_setscheduler(0, SCHED_FIFO, ¶m)) {
337 sat_strerror(errno, buf, sizeof(buf));
338 logprintf(0, "Process Error: sched_setscheduler "
339 "failed - error %d %s\n",
347 // Use pthreads to create a system thread.
348 int WorkerThread::SpawnThread() {
349 // Create the new thread.
350 int result = pthread_create(&thread_, NULL, thread_spawner_, this);
353 sat_strerror(result, buf, sizeof(buf));
354 logprintf(0, "Process Error: pthread_create "
355 "failed - error %d %s\n", result,
361 // 0 is pthreads success.
365 // Kill the worker thread with SIGINT.
366 int WorkerThread::KillThread() {
367 pthread_kill(thread_, SIGINT);
371 // Block until thread has exited.
372 int WorkerThread::JoinThread() {
373 int result = pthread_join(thread_, NULL);
376 logprintf(0, "Process Error: pthread_join failed - error %d\n", result);
380 // 0 is pthreads success.
385 void WorkerThread::StartRoutine() {
390 worker_status_->RemoveSelf();
394 // Thread work loop. Execute until marked finished.
395 int WorkerThread::Work() {
397 logprintf(9, "Log: ...\n");
398 // Sleep for 1 second.
400 } while (IsReadyToRun());
406 // Returns CPU mask of CPUs available to this process,
407 // Conceptually, each bit represents a logical CPU, ie:
408 // mask = 3 (11b): cpu0, 1
409 // mask = 13 (1101b): cpu0, 2, 3
410 uint32 WorkerThread::AvailableCpus() {
412 CPU_ZERO(&curr_cpus);
413 sched_getaffinity(getppid(), sizeof(curr_cpus), &curr_cpus);
414 return cpuset_to_uint32(&curr_cpus);
418 // Returns CPU mask of CPUs this thread is bound to,
419 // Conceptually, each bit represents a logical CPU, ie:
420 // mask = 3 (11b): cpu0, 1
421 // mask = 13 (1101b): cpu0, 2, 3
422 uint32 WorkerThread::CurrentCpus() {
424 CPU_ZERO(&curr_cpus);
425 sched_getaffinity(0, sizeof(curr_cpus), &curr_cpus);
426 return cpuset_to_uint32(&curr_cpus);
430 // Bind worker thread to specified CPU(s)
432 // thread_mask: cpu_set_t representing CPUs, ie
433 // mask = 1 (01b): cpu0
434 // mask = 3 (11b): cpu0, 1
435 // mask = 13 (1101b): cpu0, 2, 3
437 // Returns true on success, false otherwise.
438 bool WorkerThread::BindToCpus(uint32 thread_mask) {
439 uint32 process_mask = AvailableCpus();
440 if (thread_mask == process_mask)
443 logprintf(11, "Log: available CPU mask - %x\n", process_mask);
444 if ((thread_mask | process_mask) != process_mask) {
445 // Invalid cpu_mask, ie cpu not allocated to this process or doesn't exist.
446 logprintf(0, "Log: requested CPUs %x not a subset of available %x\n",
447 thread_mask, process_mask);
451 cpuset_from_uint32(thread_mask, &cpuset);
452 return (sched_setaffinity(gettid(), sizeof(cpuset), &cpuset) == 0);
456 // A worker thread can yield itself to give up CPU until it's scheduled again.
457 // Returns true on success, false on error.
458 bool WorkerThread::YieldSelf() {
459 return (sched_yield() == 0);
463 // Fill this page with its pattern.
464 bool WorkerThread::FillPage(struct page_entry *pe) {
465 // Error check arguments.
467 logprintf(0, "Process Error: Fill Page entry null\n");
471 // Mask is the bitmask of indexes used by the pattern.
472 // It is the pattern size -1. Size is always a power of 2.
473 uint64 *memwords = static_cast<uint64*>(pe->addr);
474 int length = sat_->page_length();
477 // Select tag or data as appropriate.
478 for (int i = 0; i < length / wordsize_; i++) {
481 if ((i & 0x7) == 0) {
482 data.l64 = addr_to_tag(&memwords[i]);
484 data.l32.l = pe->pattern->pattern(i << 1);
485 data.l32.h = pe->pattern->pattern((i << 1) + 1);
487 memwords[i] = data.l64;
490 // Just fill in untagged data directly.
491 for (int i = 0; i < length / wordsize_; i++) {
494 data.l32.l = pe->pattern->pattern(i << 1);
495 data.l32.h = pe->pattern->pattern((i << 1) + 1);
496 memwords[i] = data.l64;
504 // Tell the thread how many pages to fill.
505 void FillThread::SetFillPages(int64 num_pages_to_fill_init) {
506 num_pages_to_fill_ = num_pages_to_fill_init;
509 // Fill this page with a random pattern.
510 bool FillThread::FillPageRandom(struct page_entry *pe) {
511 // Error check arguments.
513 logprintf(0, "Process Error: Fill Page entry null\n");
516 if ((patternlist_ == 0) || (patternlist_->Size() == 0)) {
517 logprintf(0, "Process Error: No data patterns available\n");
521 // Choose a random pattern for this block.
522 pe->pattern = patternlist_->GetRandomPattern();
523 if (pe->pattern == 0) {
524 logprintf(0, "Process Error: Null data pattern\n");
528 // Actually fill the page.
533 // Memory fill work loop. Execute until alloted pages filled.
534 int FillThread::Work() {
537 logprintf(9, "Log: Starting fill thread %d\n", thread_num_);
539 // We want to fill num_pages_to_fill pages, and
540 // stop when we've filled that many.
541 // We also want to capture early break
542 struct page_entry pe;
544 while (IsReadyToRun() && (loops < num_pages_to_fill_)) {
545 result &= sat_->GetEmpty(&pe);
547 logprintf(0, "Process Error: fill_thread failed to pop pages, "
552 // Fill the page with pattern
553 result &= FillPageRandom(&pe);
556 // Put the page back on the queue.
557 result &= sat_->PutValid(&pe);
559 logprintf(0, "Process Error: fill_thread failed to push pages, "
566 // Fill in thread status.
567 pages_copied_ = loops;
569 logprintf(9, "Log: Completed %d: Fill thread. Status %d, %d pages filled\n",
570 thread_num_, status_, pages_copied_);
575 // Print error information about a data miscompare.
576 void WorkerThread::ProcessError(struct ErrorRecord *error,
578 const char *message) {
579 char dimm_string[256] = "";
581 int apic_id = apicid();
582 uint32 cpumask = CurrentCpus();
584 // Determine if this is a write or read error.
585 os_->Flush(error->vaddr);
586 error->reread = *(error->vaddr);
588 char *good = reinterpret_cast<char*>(&(error->expected));
589 char *bad = reinterpret_cast<char*>(&(error->actual));
591 sat_assert(error->expected != error->actual);
592 unsigned int offset = 0;
593 for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
594 if (good[offset] != bad[offset])
598 error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
600 // Find physical address if possible.
601 error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
603 // Pretty print DIMM mapping if available.
604 os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
606 // Report parseable error.
608 // Run miscompare error through diagnoser for logging and reporting.
609 os_->error_diagnoser_->AddMiscompareError(dimm_string,
610 reinterpret_cast<uint64>
614 "%s: miscompare on CPU %d(%x) at %p(0x%llx:%s): "
615 "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
628 // Overwrite incorrect data with correct data to prevent
629 // future miscompares when this data is reused.
630 *(error->vaddr) = error->expected;
631 os_->Flush(error->vaddr);
636 // Print error information about a data miscompare.
637 void FileThread::ProcessError(struct ErrorRecord *error,
639 const char *message) {
640 char dimm_string[256] = "";
642 // Determine if this is a write or read error.
643 os_->Flush(error->vaddr);
644 error->reread = *(error->vaddr);
646 char *good = reinterpret_cast<char*>(&(error->expected));
647 char *bad = reinterpret_cast<char*>(&(error->actual));
649 sat_assert(error->expected != error->actual);
650 unsigned int offset = 0;
651 for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
652 if (good[offset] != bad[offset])
656 error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
658 // Find physical address if possible.
659 error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
661 // Pretty print DIMM mapping if available.
662 os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
664 // If crc_page_ is valid, ie checking content read back from file,
665 // track src/dst memory addresses. Otherwise catagorize as general
666 // mememory miscompare for CRC checking everywhere else.
667 if (crc_page_ != -1) {
668 int miscompare_byteoffset = static_cast<char*>(error->vbyteaddr) -
669 static_cast<char*>(page_recs_[crc_page_].dst);
670 os_->error_diagnoser_->AddHDDMiscompareError(devicename_,
672 miscompare_byteoffset,
673 page_recs_[crc_page_].src,
674 page_recs_[crc_page_].dst);
676 os_->error_diagnoser_->AddMiscompareError(dimm_string,
677 reinterpret_cast<uint64>
682 "%s: miscompare on %s at %p(0x%llx:%s): read:0x%016llx, "
683 "reread:0x%016llx expected:0x%016llx\n",
693 // Overwrite incorrect data with correct data to prevent
694 // future miscompares when this data is reused.
695 *(error->vaddr) = error->expected;
696 os_->Flush(error->vaddr);
700 // Do a word by word result check of a region.
701 // Print errors on mismatches.
702 int WorkerThread::CheckRegion(void *addr,
703 class Pattern *pattern,
706 int64 pattern_offset) {
707 uint64 *memblock = static_cast<uint64*>(addr);
708 const int kErrorLimit = 128;
710 int overflowerrors = 0; // Count of overflowed errors.
711 bool page_error = false;
712 string errormessage("Hardware Error");
714 recorded[kErrorLimit]; // Queued errors for later printing.
716 // For each word in the data region.
717 for (int i = 0; i < length / wordsize_; i++) {
718 uint64 actual = memblock[i];
721 // Determine the value that should be there.
723 int index = 2 * i + pattern_offset;
724 data.l32.l = pattern->pattern(index);
725 data.l32.h = pattern->pattern(index + 1);
727 // Check tags if necessary.
728 if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
729 expected = addr_to_tag(&memblock[i]);
733 // If the value is incorrect, save an error record for later printing.
734 if (actual != expected) {
735 if (errors < kErrorLimit) {
736 recorded[errors].actual = actual;
737 recorded[errors].expected = expected;
738 recorded[errors].vaddr = &memblock[i];
742 // If we have overflowed the error queue, just print the errors now.
743 logprintf(10, "Log: Error record overflow, too many miscompares!\n");
744 errormessage = "Page Error";
750 // Find if this is a whole block corruption.
751 if (page_error && !tag_mode_) {
752 int patsize = patternlist_->Size();
753 for (int pat = 0; pat < patsize; pat++) {
754 class Pattern *altpattern = patternlist_->GetPattern(pat);
757 const int kGoodAgain = 2;
758 const int kNoMatch = 3;
760 unsigned int badstart = 0;
761 unsigned int badend = 0;
763 // Don't match against ourself!
764 if (pattern == altpattern)
767 for (int i = 0; i < length / wordsize_; i++) {
768 uint64 actual = memblock[i];
772 // Determine the value that should be there.
773 int index = 2 * i + pattern_offset;
775 expected.l32.l = pattern->pattern(index);
776 expected.l32.h = pattern->pattern(index + 1);
778 possible.l32.l = pattern->pattern(index);
779 possible.l32.h = pattern->pattern(index + 1);
781 if (state == kGood) {
782 if (actual == expected.l64) {
784 } else if (actual == possible.l64) {
793 } else if (state == kBad) {
794 if (actual == possible.l64) {
797 } else if (actual == expected.l64) {
804 } else if (state == kGoodAgain) {
805 if (actual == expected.l64) {
814 if ((state == kGoodAgain) || (state == kBad)) {
815 unsigned int blockerrors = badend - badstart + 1;
816 errormessage = "Block Error";
817 ProcessError(&recorded[0], 0, errormessage.c_str());
818 logprintf(0, "Block Error: (%p) pattern %s instead of %s, "
819 "%d bytes from offset 0x%x to 0x%x\n",
821 altpattern->name(), pattern->name(),
822 blockerrors * wordsize_,
823 offset + badstart * wordsize_,
824 offset + badend * wordsize_);
825 errorcount_ += blockerrors;
832 // Process error queue after all errors have been recorded.
833 for (int err = 0; err < errors; err++) {
835 if (errorcount_ + err < 30)
836 priority = 0; // Bump up the priority for the first few errors.
837 ProcessError(&recorded[err], priority, errormessage.c_str());
841 // For each word in the data region.
842 int error_recount = 0;
843 for (int i = 0; i < length / wordsize_; i++) {
844 uint64 actual = memblock[i];
847 // Determine the value that should be there.
848 int index = 2 * i + pattern_offset;
850 data.l32.l = pattern->pattern(index);
851 data.l32.h = pattern->pattern(index + 1);
854 // Check tags if necessary.
855 if (tag_mode_ && ((reinterpret_cast<uint64>(&memblock[i]) & 0x3f) == 0)) {
856 expected = addr_to_tag(&memblock[i]);
859 // If the value is incorrect, save an error record for later printing.
860 if (actual != expected) {
861 if (error_recount < kErrorLimit) {
862 // We already reported these.
865 // If we have overflowed the error queue, print the errors now.
866 struct ErrorRecord er;
868 er.expected = expected;
869 er.vaddr = &memblock[i];
871 // Do the error printout. This will take a long time and
872 // likely change the machine state.
873 ProcessError(&er, 12, errormessage.c_str());
880 // Keep track of observed errors.
881 errorcount_ += errors + overflowerrors;
882 return errors + overflowerrors;
885 float WorkerThread::GetCopiedData() {
886 return pages_copied_ * sat_->page_length() / kMegabyte;
889 // Calculate the CRC of a region.
890 // Result check if the CRC mismatches.
891 int WorkerThread::CrcCheckPage(struct page_entry *srcpe) {
892 const int blocksize = 4096;
893 const int blockwords = blocksize / wordsize_;
896 const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
897 uint64 *memblock = static_cast<uint64*>(srcpe->addr);
898 int blocks = sat_->page_length() / blocksize;
899 for (int currentblock = 0; currentblock < blocks; currentblock++) {
900 uint64 *memslice = memblock + currentblock * blockwords;
904 AdlerAddrCrcC(memslice, blocksize, &crc, srcpe);
906 CalculateAdlerChecksum(memslice, blocksize, &crc);
909 // If the CRC does not match, we'd better look closer.
910 if (!crc.Equals(*expectedcrc)) {
911 logprintf(11, "Log: CrcCheckPage Falling through to slow compare, "
912 "CRC mismatch %s != %s\n",
913 crc.ToHexString().c_str(),
914 expectedcrc->ToHexString().c_str());
915 int errorcount = CheckRegion(memslice,
918 currentblock * blocksize, 0);
919 if (errorcount == 0) {
920 logprintf(0, "Log: CrcCheckPage CRC mismatch %s != %s, "
921 "but no miscompares found.\n",
922 crc.ToHexString().c_str(),
923 expectedcrc->ToHexString().c_str());
925 errors += errorcount;
929 // For odd length transfers, we should never hit this.
930 int leftovers = sat_->page_length() % blocksize;
932 uint64 *memslice = memblock + blocks * blockwords;
933 errors += CheckRegion(memslice,
936 blocks * blocksize, 0);
942 // Print error information about a data miscompare.
943 void WorkerThread::ProcessTagError(struct ErrorRecord *error,
945 const char *message) {
946 char dimm_string[256] = "";
947 char tag_dimm_string[256] = "";
948 bool read_error = false;
950 int apic_id = apicid();
951 uint32 cpumask = CurrentCpus();
953 // Determine if this is a write or read error.
954 os_->Flush(error->vaddr);
955 error->reread = *(error->vaddr);
957 // Distinguish read and write errors.
958 if (error->actual != error->reread) {
962 sat_assert(error->expected != error->actual);
964 error->vbyteaddr = reinterpret_cast<char*>(error->vaddr);
966 // Find physical address if possible.
967 error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
968 error->tagpaddr = os_->VirtualToPhysical(error->tagvaddr);
970 // Pretty print DIMM mapping if available.
971 os_->FindDimm(error->paddr, dimm_string, sizeof(dimm_string));
972 // Pretty print DIMM mapping if available.
973 os_->FindDimm(error->tagpaddr, tag_dimm_string, sizeof(tag_dimm_string));
975 // Report parseable error.
978 "%s: Tag from %p(0x%llx:%s) (%s) miscompare on CPU %d(%x) at "
980 "read:0x%016llx, reread:0x%016llx expected:0x%016llx\n",
982 error->tagvaddr, error->tagpaddr,
984 read_error?"read error":"write error",
997 // Overwrite incorrect data with correct data to prevent
998 // future miscompares when this data is reused.
999 *(error->vaddr) = error->expected;
1000 os_->Flush(error->vaddr);
1004 // Print out and log a tag error.
1005 bool WorkerThread::ReportTagError(
1009 struct ErrorRecord er;
1015 // Generate vaddr from tag.
1016 er.tagvaddr = reinterpret_cast<uint64*>(actual);
1018 ProcessTagError(&er, 0, "Hardware Error");
1022 // C implementation of Adler memory copy, with memory tagging.
1023 bool WorkerThread::AdlerAddrMemcpyC(uint64 *dstmem64,
1025 unsigned int size_in_bytes,
1026 AdlerChecksum *checksum,
1027 struct page_entry *pe) {
1028 // Use this data wrapper to access memory with 64bit read/write.
1031 unsigned int count = size_in_bytes / sizeof(data);
1033 if (count > ((1U) << 19)) {
1034 // Size is too large, must be strictly less than 512 KB.
1043 class Pattern *pattern = pe->pattern;
1047 // Process 64 bits at a time.
1048 if ((i & 0x7) == 0) {
1049 data.l64 = srcmem64[i];
1050 dstdata.l64 = dstmem64[i];
1051 uint64 src_tag = addr_to_tag(&srcmem64[i]);
1052 uint64 dst_tag = addr_to_tag(&dstmem64[i]);
1053 // Detect if tags have been corrupted.
1054 if (data.l64 != src_tag)
1055 ReportTagError(&srcmem64[i], data.l64, src_tag);
1056 if (dstdata.l64 != dst_tag)
1057 ReportTagError(&dstmem64[i], dstdata.l64, dst_tag);
1059 data.l32.l = pattern->pattern(i << 1);
1060 data.l32.h = pattern->pattern((i << 1) + 1);
1061 a1 = a1 + data.l32.l;
1063 a1 = a1 + data.l32.h;
1067 dstmem64[i] = data.l64;
1070 data.l64 = srcmem64[i];
1071 a1 = a1 + data.l32.l;
1073 a1 = a1 + data.l32.h;
1075 dstmem64[i] = data.l64;
1079 data.l64 = srcmem64[i];
1080 a2 = a2 + data.l32.l;
1082 a2 = a2 + data.l32.h;
1084 dstmem64[i] = data.l64;
1087 checksum->Set(a1, a2, b1, b2);
1092 // C implementation of Adler memory crc.
1093 bool WorkerThread::AdlerAddrCrcC(uint64 *srcmem64,
1094 unsigned int size_in_bytes,
1095 AdlerChecksum *checksum,
1096 struct page_entry *pe) {
1097 // Use this data wrapper to access memory with 64bit read/write.
1099 unsigned int count = size_in_bytes / sizeof(data);
1101 if (count > ((1U) << 19)) {
1102 // Size is too large, must be strictly less than 512 KB.
1111 class Pattern *pattern = pe->pattern;
1115 // Process 64 bits at a time.
1116 if ((i & 0x7) == 0) {
1117 data.l64 = srcmem64[i];
1118 uint64 src_tag = addr_to_tag(&srcmem64[i]);
1119 // Check that tags match expected.
1120 if (data.l64 != src_tag)
1121 ReportTagError(&srcmem64[i], data.l64, src_tag);
1124 data.l32.l = pattern->pattern(i << 1);
1125 data.l32.h = pattern->pattern((i << 1) + 1);
1126 a1 = a1 + data.l32.l;
1128 a1 = a1 + data.l32.h;
1133 data.l64 = srcmem64[i];
1134 a1 = a1 + data.l32.l;
1136 a1 = a1 + data.l32.h;
1141 data.l64 = srcmem64[i];
1142 a2 = a2 + data.l32.l;
1144 a2 = a2 + data.l32.h;
1148 checksum->Set(a1, a2, b1, b2);
1152 // Copy a block of memory quickly, while keeping a CRC of the data.
1153 // Result check if the CRC mismatches.
1154 int WorkerThread::CrcCopyPage(struct page_entry *dstpe,
1155 struct page_entry *srcpe) {
1157 const int blocksize = 4096;
1158 const int blockwords = blocksize / wordsize_;
1159 int blocks = sat_->page_length() / blocksize;
1161 // Base addresses for memory copy
1162 uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1163 uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1164 // Remember the expected CRC
1165 const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1167 for (int currentblock = 0; currentblock < blocks; currentblock++) {
1168 uint64 *targetmem = targetmembase + currentblock * blockwords;
1169 uint64 *sourcemem = sourcemembase + currentblock * blockwords;
1173 AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1175 AdlerMemcpyC(targetmem, sourcemem, blocksize, &crc);
1178 // Investigate miscompares.
1179 if (!crc.Equals(*expectedcrc)) {
1180 logprintf(11, "Log: CrcCopyPage Falling through to slow compare, "
1181 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1182 expectedcrc->ToHexString().c_str());
1183 int errorcount = CheckRegion(sourcemem,
1186 currentblock * blocksize, 0);
1187 if (errorcount == 0) {
1188 logprintf(0, "Log: CrcCopyPage CRC mismatch %s != %s, "
1189 "but no miscompares found. Retrying with fresh data.\n",
1190 crc.ToHexString().c_str(),
1191 expectedcrc->ToHexString().c_str());
1193 // Copy the data originally read from this region back again.
1194 // This data should have any corruption read originally while
1195 // calculating the CRC.
1196 memcpy(sourcemem, targetmem, blocksize);
1197 errorcount = CheckRegion(sourcemem,
1200 currentblock * blocksize, 0);
1201 if (errorcount == 0) {
1202 logprintf(0, "Process Error: CrcCopyPage CRC mismatch %s != %s, "
1203 "but no miscompares found on second pass.\n",
1204 crc.ToHexString().c_str(),
1205 expectedcrc->ToHexString().c_str());
1209 errors += errorcount;
1213 // For odd length transfers, we should never hit this.
1214 int leftovers = sat_->page_length() % blocksize;
1216 uint64 *targetmem = targetmembase + blocks * blockwords;
1217 uint64 *sourcemem = sourcemembase + blocks * blockwords;
1219 errors += CheckRegion(sourcemem,
1222 blocks * blocksize, 0);
1223 int leftoverwords = leftovers / wordsize_;
1224 for (int i = 0; i < leftoverwords; i++) {
1225 targetmem[i] = sourcemem[i];
1229 // Update pattern reference to reflect new contents.
1230 dstpe->pattern = srcpe->pattern;
1232 // Clean clean clean the errors away.
1234 // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1235 // cause bad data to be propogated across the page.
1243 // Invert a block of memory quickly, traversing downwards.
1244 int InvertThread::InvertPageDown(struct page_entry *srcpe) {
1245 const int blocksize = 4096;
1246 const int blockwords = blocksize / wordsize_;
1247 int blocks = sat_->page_length() / blocksize;
1249 // Base addresses for memory copy
1250 unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1252 for (int currentblock = blocks-1; currentblock >= 0; currentblock--) {
1253 unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1254 for (int i = blockwords - 32; i >= 0; i -= 32) {
1255 for (int index = i + 31; index >= i; --index) {
1256 unsigned int actual = sourcemem[index];
1257 sourcemem[index] = ~actual;
1259 OsLayer::FastFlush(&sourcemem[i]);
1266 // Invert a block of memory, traversing upwards.
1267 int InvertThread::InvertPageUp(struct page_entry *srcpe) {
1268 const int blocksize = 4096;
1269 const int blockwords = blocksize / wordsize_;
1270 int blocks = sat_->page_length() / blocksize;
1272 // Base addresses for memory copy
1273 unsigned int *sourcemembase = static_cast<unsigned int *>(srcpe->addr);
1275 for (int currentblock = 0; currentblock < blocks; currentblock++) {
1276 unsigned int *sourcemem = sourcemembase + currentblock * blockwords;
1277 for (int i = 0; i < blockwords; i += 32) {
1278 for (int index = i; index <= i + 31; ++index) {
1279 unsigned int actual = sourcemem[index];
1280 sourcemem[index] = ~actual;
1282 OsLayer::FastFlush(&sourcemem[i]);
1288 // Copy a block of memory quickly, while keeping a CRC of the data.
1289 // Result check if the CRC mismatches. Warm the CPU while running
// Copies srcpe -> dstpe 4KB at a time, accumulating an Adler checksum of the
// data as it streams through; on a CRC mismatch it falls back to a slow
// word-by-word CheckRegion to locate and report the exact miscompare.
// NOTE(review): several lines (loop closers, crc declaration) are elided
// from this listing.
1290 int WorkerThread::CrcWarmCopyPage(struct page_entry *dstpe,
1291 struct page_entry *srcpe) {
1293 const int blocksize = 4096;
1294 const int blockwords = blocksize / wordsize_;
1295 int blocks = sat_->page_length() / blocksize;
1297 // Base addresses for memory copy
1298 uint64 *targetmembase = static_cast<uint64*>(dstpe->addr);
1299 uint64 *sourcemembase = static_cast<uint64*>(srcpe->addr);
1300 // Remember the expected CRC
1301 const AdlerChecksum *expectedcrc = srcpe->pattern->crc();
1303 for (int currentblock = 0; currentblock < blocks; currentblock++) {
1304 uint64 *targetmem = targetmembase + currentblock * blockwords;
1305 uint64 *sourcemem = sourcemembase + currentblock * blockwords;
// Plain C checksum-copy vs. platform "warm" (CPU-stressing) copy —
// selection between the two appears to happen on elided lines.
1309 AdlerAddrMemcpyC(targetmem, sourcemem, blocksize, &crc, srcpe);
1311 os_->AdlerMemcpyWarm(targetmem, sourcemem, blocksize, &crc);
1314 // Investigate miscompares.
1315 if (!crc.Equals(*expectedcrc)) {
1316 logprintf(11, "Log: CrcWarmCopyPage Falling through to slow compare, "
1317 "CRC mismatch %s != %s\n", crc.ToHexString().c_str(),
1318 expectedcrc->ToHexString().c_str());
1319 int errorcount = CheckRegion(sourcemem,
1322 currentblock * blocksize, 0);
1323 if (errorcount == 0) {
// CRC said "bad" but the slow compare found nothing: the corruption was
// probably transient on the read path. Re-read and check once more.
1324 logprintf(0, "Log: CrcWarmCopyPage CRC mismatch %s != %s, "
1325 "but no miscompares found. Retrying with fresh data.\n",
1326 crc.ToHexString().c_str(),
1327 expectedcrc->ToHexString().c_str());
1329 // Copy the data originally read from this region back again.
1330 // This data should have any corruption read originally while
1331 // calculating the CRC.
1332 memcpy(sourcemem, targetmem, blocksize);
1333 errorcount = CheckRegion(sourcemem,
1336 currentblock * blocksize, 0);
1337 if (errorcount == 0) {
1338 logprintf(0, "Process Error: CrcWarmCopyPage CRC mismatch %s "
1339 "!= %s, but no miscompares found on second pass.\n",
1340 crc.ToHexString().c_str(),
1341 expectedcrc->ToHexString().c_str());
1345 errors += errorcount;
1349 // For odd length transfers, we should never hit this.
1350 int leftovers = sat_->page_length() % blocksize;
1352 uint64 *targetmem = targetmembase + blocks * blockwords;
1353 uint64 *sourcemem = sourcemembase + blocks * blockwords;
1355 errors += CheckRegion(sourcemem,
1358 blocks * blocksize, 0);
1359 int leftoverwords = leftovers / wordsize_;
1360 for (int i = 0; i < leftoverwords; i++) {
1361 targetmem[i] = sourcemem[i];
1365 // Update pattern reference to reflect new contents.
1366 dstpe->pattern = srcpe->pattern;
1368 // Clean clean clean the errors away.
1370 // TODO(nsanders): Maybe we should patch rather than fill? Filling may
1371 // cause bad data to be propagated across the page.
1379 // Memory check work loop. Execute until done, then exhaust pages.
// Repeatedly pops a valid (pattern-filled) page, CRC-checks it (check call is
// on elided lines), and returns it to the valid queue while the test is
// running, or to the empty queue during drain/shutdown.
1380 int CheckThread::Work() {
1381 struct page_entry pe;
1385 logprintf(9, "Log: Starting Check thread %d\n", thread_num_);
1387 // We want to check all the pages, and
1388 // stop when there aren't any left.
1390 result &= sat_->GetValid(&pe);
// A pop failure is only an error if we weren't asked to stop.
1392 if (IsReadyToRunNoPause())
1393 logprintf(0, "Process Error: check_thread failed to pop pages, "
1400 // Do the result check.
1403 // Push pages back on the valid queue if we are still going,
1404 // throw them out otherwise.
1405 if (IsReadyToRunNoPause())
1406 result &= sat_->PutValid(&pe);
1408 result &= sat_->PutEmpty(&pe);
1410 logprintf(0, "Process Error: check_thread failed to push pages, "
1417 pages_copied_ = loops;
1419 logprintf(9, "Log: Completed %d: Check thread. Status %d, %d pages checked\n",
1420 thread_num_, status_, pages_copied_);
1425 // Memory copy work loop. Execute until marked done.
// Pops a valid source page and an empty destination page (both restricted to
// this thread's memory tag), copies src -> dst — with CRC verification in
// strict mode — then pushes dst as valid and src as empty.
1426 int CopyThread::Work() {
1427 struct page_entry src;
1428 struct page_entry dst;
1432 logprintf(9, "Log: Starting copy thread %d: cpu %x, mem %x\n",
1433 thread_num_, cpu_mask_, tag_);
1435 while (IsReadyToRun()) {
1436 // Pop the needed pages.
1437 result &= sat_->GetValid(&src, tag_);
1438 result &= sat_->GetEmpty(&dst, tag_);
1440 logprintf(0, "Process Error: copy_thread failed to pop pages, "
1445 // Force errors for unittests.
1446 if (sat_->error_injection()) {
// Deliberately corrupt one random byte of the source page.
1448 char *addr = reinterpret_cast<char*>(src.addr);
1449 int offset = random() % sat_->page_length();
1450 addr[offset] = 0xba;
1454 // We can use memcpy, or CRC check while we copy.
// Condition selecting the "warm" variant is on elided lines.
1456 CrcWarmCopyPage(&dst, &src);
1457 } else if (sat_->strict()) {
1458 CrcCopyPage(&dst, &src);
1460 memcpy(dst.addr, src.addr, sat_->page_length());
1461 dst.pattern = src.pattern;
1464 result &= sat_->PutValid(&dst);
1465 result &= sat_->PutEmpty(&src);
1467 // Copy worker-threads yield themselves at the end of each copy loop,
1468 // to avoid threads from preempting each other in the middle of the inner
1469 // copy-loop. Cooperations between Copy worker-threads results in less
1470 // unnecessary cache thrashing (which happens when context-switching in the
1471 // middle of the inner copy-loop).
1475 logprintf(0, "Process Error: copy_thread failed to push pages, "
1482 pages_copied_ = loops;
1484 logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1485 thread_num_, status_, pages_copied_);
1491 // Memory invert work loop. Execute until marked done.
// Pops a valid page, inverts it in place (down/up passes; the full sequence
// of Invert calls is partially elided here) and returns it to the valid
// queue. Each up/down pair restores the original pattern.
1492 int InvertThread::Work() {
1493 struct page_entry src;
1497 logprintf(9, "Log: Starting invert thread %d\n", thread_num_);
1499 while (IsReadyToRun()) {
1500 // Pop the needed pages.
1501 result &= sat_->GetValid(&src);
1503 logprintf(0, "Process Error: invert_thread failed to pop pages, "
1511 // For the same reason CopyThread yields itself (see YieldSelf comment
1512 // in CopyThread::Work(), InvertThread yields itself after each invert
1513 // operation to improve cooperation between different worker threads
1514 // stressing the memory/cache.
1517 InvertPageDown(&src);
1519 InvertPageDown(&src);
1527 result &= sat_->PutValid(&src);
1529 logprintf(0, "Process Error: invert_thread failed to push pages, "
1536 pages_copied_ = loops * 2;
// NOTE(review): this completion message says "Copy thread" but this is the
// invert thread — runtime string left untouched here, worth fixing upstream.
1538 logprintf(9, "Log: Completed %d: Copy thread. Status %d, %d pages copied\n",
1539 thread_num_, status_, pages_copied_);
1544 // Set file name to use for File IO.
// Also resolves (via the OS layer) which block device backs the file, so
// errors can be reported against the device rather than the path.
1545 void FileThread::SetFile(const char *filename_init) {
1546 filename_ = filename_init;
1547 devicename_ = os_->FindFileDevice(filename_);
1550 // Open the file for access.
// Opens (creating if needed) with O_SYNC | O_DIRECT so writes bypass the
// page cache and actually exercise the disk. Returns the fd via *pfile.
1551 bool FileThread::OpenFile(int *pfile) {
1552 int fd = open(filename_.c_str(),
1553 O_RDWR | O_CREAT | O_SYNC | O_DIRECT,
1556 logprintf(0, "Process Error: Failed to create file %s!!\n",
// Close the data file; body elided from this listing.
1567 bool FileThread::CloseFile(int fd) {
1572 // Check sector tagging.
// Stamps the first bytes of every 512-byte sector in the page with a
// per-thread magic byte plus block/sector/pass numbers, so a later read can
// detect sectors that were dropped, misplaced, or stale.
1573 bool FileThread::SectorTagPage(struct page_entry *src, int block) {
1574 int page_length = sat_->page_length();
1575 struct FileThread::SectorTag *tag =
1576 (struct FileThread::SectorTag *)(src->addr);
// Magic is derived from thread number so cross-thread mixups are visible.
1579 unsigned char magic = ((0xba + thread_num_) & 0xff);
1580 for (int sec = 0; sec < page_length / 512; sec++) {
1581 tag[sec].magic = magic;
1582 tag[sec].block = block & 0xff;
1583 tag[sec].sector = sec & 0xff;
1584 tag[sec].pass = pass_ & 0xff;
// Write one page of data to the file at the current offset.
// A short or failed write is reported as a block (device) error.
1589 bool FileThread::WritePageToFile(int fd, struct page_entry *src) {
1590 int page_length = sat_->page_length();
1591 // Fill the file with our data.
1592 int64 size = write(fd, src->addr, page_length);
1594 if (size != page_length) {
1595 os_->ErrorReport(devicename_.c_str(), "write-error", 1);
1597 logprintf(0, "Block Error: file_thread failed to write, "
1604 // Write the data to the file.
// Streams disk_pages() pattern pages into the file from offset 0, recording
// each page's pattern/address in page_recs_ so ReadPages can verify them.
1605 bool FileThread::WritePages(int fd) {
1606 int strict = sat_->strict();
1608 // Start fresh at beginning of file for each batch of pages.
1609 lseek(fd, 0, SEEK_SET);
1610 for (int i = 0; i < sat_->disk_pages(); i++) {
1611 struct page_entry src;
1612 if (!GetValidPage(&src))
1614 // Save expected pattern.
1615 page_recs_[i].pattern = src.pattern;
1616 page_recs_[i].src = src.addr;
1618 // Check data correctness.
// Sector tags overwrite the first bytes of each sector with tracking info.
1622 SectorTagPage(&src, i);
1624 bool result = WritePageToFile(fd, &src);
1626 if (!PutEmptyPage(&src))
1635 // Copy data from file into memory block.
// Reads one page from the current file offset into dst; a short or failed
// read is reported as a block (device) error.
1636 bool FileThread::ReadPageFromFile(int fd, struct page_entry *dst) {
1637 int page_length = sat_->page_length();
1639 // Do the actual read.
1640 int64 size = read(fd, dst->addr, page_length);
1641 if (size != page_length) {
1642 os_->ErrorReport(devicename_.c_str(), "read-error", 1);
1643 logprintf(0, "Block Error: file_thread failed to read, "
1651 // Check sector tagging.
// Verifies the per-sector tags written by SectorTagPage, reports any
// mismatches through the error diagnoser, restores the expected pattern
// word over each tag, and patches up whole pages if bad sectors were found.
1652 bool FileThread::SectorValidatePage(const struct PageRec &page,
1653 struct page_entry *dst, int block) {
// Call counter used for error injection (presumably to corrupt only some
// calls — the gating logic sits on elided lines).
1655 static int calls = 0;
1658 // Do sector tag compare.
1659 int firstsector = -1;
1660 int lastsector = -1;
1661 bool badsector = false;
1662 int page_length = sat_->page_length();
1664 // Cast data block into an array of tagged sectors.
1665 struct FileThread::SectorTag *tag =
1666 (struct FileThread::SectorTag *)(dst->addr);
// The tag struct must exactly cover one 512-byte sector.
1668 sat_assert(sizeof(*tag) == 512);
1671 if (sat_->error_injection()) {
1673 for (int badsec = 8; badsec < 17; badsec++)
1674 tag[badsec].pass = 27;
1677 (static_cast<int32*>(dst->addr))[27] = 0xbadda7a;
1681 // Check each sector for the correct tag we added earlier,
1682 // then revert the tag to the normal data pattern.
1683 unsigned char magic = ((0xba + thread_num_) & 0xff);
1684 for (int sec = 0; sec < page_length / 512; sec++) {
1686 if ((tag[sec].magic != magic) ||
1687 (tag[sec].block != (block & 0xff)) ||
1688 (tag[sec].sector != (sec & 0xff)) ||
1689 (tag[sec].pass != (pass_ & 0xff))) {
1690 // Offset calculation for tag location.
// Pinpoint which tag field mismatched so the byte offset is exact.
1691 int offset = sec * sizeof(SectorTag);
1692 if (tag[sec].block != (block & 0xff))
1693 offset += 1 * sizeof(uint8);
1694 else if (tag[sec].sector != (sec & 0xff))
1695 offset += 2 * sizeof(uint8);
1696 else if (tag[sec].pass != (pass_ & 0xff))
1697 offset += 3 * sizeof(uint8);
1699 // Run sector tag error through diagnoser for logging and reporting.
1701 os_->error_diagnoser_->AddHDDSectorTagError(devicename_, tag[sec].block,
1704 page.src, page.dst);
1706 logprintf(5, "Sector Error: Sector tag @ 0x%x, pass %d/%d. "
1707 "sec %x/%x, block %d/%d, magic %x/%x, File: %s \n",
1708 block * page_length + 512 * sec,
1709 (pass_ & 0xff), (unsigned int)tag[sec].pass,
1710 sec, (unsigned int)tag[sec].sector,
1711 block, (unsigned int)tag[sec].block,
1712 magic, (unsigned int)tag[sec].magic,
1715 // Keep track of first and last bad sector.
1716 if (firstsector == -1)
1717 firstsector = (block * page_length / 512) + sec;
1718 lastsector = (block * page_length / 512) + sec;
1721 // Patch tag back to proper pattern.
1722 unsigned int *addr = (unsigned int *)(&tag[sec]);
1723 *addr = dst->pattern->pattern(512 * sec / sizeof(*addr));
1726 // If we found sector errors:
1727 if (badsector == true) {
1728 logprintf(5, "Log: file sector miscompare at offset %x-%x. File: %s\n",
1730 ((lastsector + 1) * 512) - 1,
1733 // Either exit immediately, or patch the data up and continue.
1734 if (sat_->stop_on_error()) {
1737 // Patch up bad pages.
// Refill every page touched by the bad sector range with its pattern.
1738 for (int block = (firstsector * 512) / page_length;
1739 block <= (lastsector * 512) / page_length;
1741 unsigned int *memblock = static_cast<unsigned int *>(dst->addr);
1742 int length = page_length / wordsize_;
1743 for (int i = 0; i < length; i++) {
1744 memblock[i] = dst->pattern->pattern(i);
1754 // Get memory for an incoming data transfer..
// Direct IO can only target "normal" memory; if SAT pages are not normal
// memory, allocate a 512-byte-aligned bounce buffer (alignment required by
// O_DIRECT) for file transfers instead.
1755 bool FileThread::PagePrepare() {
1756 // We can only do direct IO to SAT pages if it is normal mem.
1757 page_io_ = os_->normal_mem();
1759 // Init a local buffer if we need it.
1761 local_page_ = static_cast<void*>(memalign(512, sat_->page_length()));
1763 logprintf(0, "Process Error: disk thread memalign returned 0\n");
1772 // Remove memory allocated for data transfer.
// Counterpart of PagePrepare: releases the bounce buffer, if one was made.
1773 bool FileThread::PageTeardown() {
1774 // Free a local buffer if we need to.
1783 // Get memory for an incoming data transfer..
// Pops an empty page from SAT; when direct-IO-to-SAT-pages is unavailable,
// redirects the page's address to the local bounce buffer instead.
1784 bool FileThread::GetEmptyPage(struct page_entry *dst) {
1786 if (!sat_->GetEmpty(dst))
1789 dst->addr = local_page_;
1796 // Get memory for an outgoing data transfer..
// In bounce-buffer mode, CRC-copies the popped page into the local buffer
// and immediately returns the original page to the valid queue.
1797 bool FileThread::GetValidPage(struct page_entry *src) {
1798 struct page_entry tmp;
1799 if (!sat_->GetValid(&tmp))
1805 src->addr = local_page_;
1807 CrcCopyPage(src, &tmp);
1808 if (!sat_->PutValid(&tmp))
1815 // Throw out a used empty page.
// In bounce-buffer mode the page entry is returned without its data
// (handling of the buffered address is on elided lines).
1816 bool FileThread::PutEmptyPage(struct page_entry *src) {
1818 if (!sat_->PutEmpty(src))
1824 // Throw out a used, filled page.
// Returns a data-carrying page to the valid queue after a file transfer.
1825 bool FileThread::PutValidPage(struct page_entry *src) {
1827 if (!sat_->PutValid(src))
1835 // Copy data from file into memory blocks.
// Reads back all pages written by WritePages (from file offset 0), validates
// sector tags, CRC-checks each page against the pattern recorded in
// page_recs_, and reports any miscompares against the backing device.
1836 bool FileThread::ReadPages(int fd) {
1837 int page_length = sat_->page_length();
1838 int strict = sat_->strict();
1842 // Read our data back out of the file, into its new location.
1843 lseek(fd, 0, SEEK_SET);
1844 for (int i = 0; i < sat_->disk_pages(); i++) {
1845 struct page_entry dst;
1846 if (!GetEmptyPage(&dst))
1848 // Retrieve expected pattern.
1849 dst.pattern = page_recs_[i].pattern;
1850 // Update page record.
1851 page_recs_[i].dst = dst.addr;
1853 // Read from the file into destination page.
1854 if (!ReadPageFromFile(fd, &dst)) {
// Validate and strip the per-sector tags before the pattern CRC check.
1859 SectorValidatePage(page_recs_[i], &dst, i);
1861 // Ensure that the transfer ended up with correct data.
1863 // Record page index currently CRC checked.
1865 int errors = CrcCheckPage(&dst);
1867 logprintf(5, "Log: file miscompare at block %d, "
1868 "offset %x-%x. File: %s\n",
1869 i, i * page_length, ((i + 1) * page_length) - 1,
1874 errorcount_ += errors;
1876 if (!PutValidPage(&dst))
1883 // File IO work loop. Execute until marked done.
// Top-level loop: open the data file, allocate per-page records, then
// alternate full-file write and read/verify passes until stopped.
1884 int FileThread::Work() {
1889 logprintf(9, "Log: Starting file thread %d, file %s, device %s\n",
1892 devicename_.c_str());
1897 // Open the data IO file.
1904 // Load patterns into page records.
// NOTE(review): page_recs_ and the Pattern objects allocated here appear to
// be leaked in this listing — their teardown is not visible; confirm against
// the full source.
1905 page_recs_ = new struct PageRec[sat_->disk_pages()];
1906 for (int i = 0; i < sat_->disk_pages(); i++) {
1907 page_recs_[i].pattern = new struct Pattern();
1911 while (IsReadyToRun()) {
1912 // Do the file write.
1913 if (!(fileresult &= WritePages(fd)))
1916 // Do the file read.
1917 if (!(fileresult &= ReadPages(fd)))
1924 pages_copied_ = loops * sat_->disk_pages();
1931 logprintf(9, "Log: Completed %d: file thread status %d, %d pages copied\n",
1932 thread_num_, status_, pages_copied_);
// Network threads stop as soon as the stop flag is set, without pausing.
1936 bool NetworkThread::IsNetworkStopSet() {
1937 return !IsReadyToRunNoPause();
1940 bool NetworkSlaveThread::IsNetworkStopSet() {
1941 // This thread has no completion status.
1942 // It finishes whenever there is no more data to be
1947 // Set ip name to use for Network IO.
// Copies up to 256 bytes of the peer address string into ipaddr_.
// NOTE(review): strncpy does not guarantee NUL termination if the source is
// 256+ bytes — confirm ipaddr_'s size/termination in the full source.
1948 void NetworkThread::SetIP(const char *ipaddr_init) {
1949 strncpy(ipaddr_, ipaddr_init, 256);
1953 // Return 0 on error.
1954 bool NetworkThread::CreateSocket(int *psocket) {
1955 int sock = socket(AF_INET, SOCK_STREAM, 0);
1957 logprintf(0, "Process Error: Cannot open socket\n");
1966 // Close the socket.
1967 bool NetworkThread::CloseSocket(int sock) {
1972 // Initiate the tcp connection.
1973 bool NetworkThread::Connect(int sock) {
1974 struct sockaddr_in dest_addr;
1975 dest_addr.sin_family = AF_INET;
1976 dest_addr.sin_port = htons(kNetworkPort);
1977 memset(&(dest_addr.sin_zero), '\0', sizeof(dest_addr.sin_zero));
1979 // Translate dot notation to u32.
1980 if (inet_aton(ipaddr_, &dest_addr.sin_addr) == 0) {
1981 logprintf(0, "Process Error: Cannot resolve %s\n", ipaddr_);
1987 if (-1 == connect(sock, reinterpret_cast<struct sockaddr *>(&dest_addr),
1988 sizeof(struct sockaddr))) {
1989 logprintf(0, "Process Error: Cannot connect %s\n", ipaddr_);
1997 // Initiate the tcp connection.
1998 bool NetworkListenThread::Listen() {
1999 struct sockaddr_in sa;
2001 memset(&(sa.sin_zero), '\0', sizeof(sa.sin_zero));
2003 sa.sin_family = AF_INET;
2004 sa.sin_addr.s_addr = INADDR_ANY;
2005 sa.sin_port = htons(kNetworkPort);
2007 if (-1 == bind(sock_, (struct sockaddr*)&sa, sizeof(struct sockaddr))) {
2009 sat_strerror(errno, buf, sizeof(buf));
2010 logprintf(0, "Process Error: Cannot bind socket: %s\n", buf);
2019 // Wait for a connection from a network traffic generation thread.
2020 bool NetworkListenThread::Wait() {
2025 // Watch sock_ to see when it has input.
2027 FD_SET(sock_, &rfds);
2028 // Wait up to five seconds.
2032 retval = select(sock_ + 1, &rfds, NULL, NULL, &tv);
2034 return (retval > 0);
2037 // Wait for a connection from a network traffic generation thread.
2038 bool NetworkListenThread::GetConnection(int *pnewsock) {
2039 struct sockaddr_in sa;
2040 socklen_t size = sizeof(struct sockaddr_in);
2042 int newsock = accept(sock_, reinterpret_cast<struct sockaddr *>(&sa), &size);
2044 logprintf(0, "Process Error: Did not receive connection\n");
2049 *pnewsock = newsock;
// Send one full page over the socket, looping on partial send() results
// until all page_length bytes have gone out. A send failure is only an
// error if a network stop has not been requested.
2053 bool NetworkThread::SendPage(int sock, struct page_entry *src) {
2054 int page_length = sat_->page_length();
2055 char *address = static_cast<char*>(src->addr);
2057 // Send our data over the network.
2058 int size = page_length;
// size counts bytes still to send; (page_length - size) is the offset of
// the next unsent byte.
2060 int transferred = send(sock, address + (page_length - size), size, 0);
2061 if ((transferred == 0) || (transferred == -1)) {
2062 if (!IsNetworkStopSet()) {
2064 sat_strerror(errno, buf, sizeof(buf));
2065 logprintf(0, "Process Error: Thread %d, "
2066 "Network write failed, bailing. (%s)\n",
2071 size = size - transferred;
// Receive one full page from the socket, looping on partial recv() results.
// A clean EOF (recv == 0, errno == 0) during shutdown is tolerated — the two
// ends of the connection do not stop in lockstep.
2077 bool NetworkThread::ReceivePage(int sock, struct page_entry *dst) {
2078 int page_length = sat_->page_length();
2079 char *address = static_cast<char*>(dst->addr);
2081 // Maybe we will get our data back again, maybe not.
2082 int size = page_length;
2084 int transferred = recv(sock, address + (page_length - size), size, 0);
2085 if ((transferred == 0) || (transferred == -1)) {
2086 // Typically network slave thread should exit as network master
2087 // thread stops sending data.
2088 if (IsNetworkStopSet()) {
2090 if (transferred == 0 && err == 0) {
2091 // Two system setups will not sync exactly,
2092 // allow early exit, but log it.
2093 logprintf(0, "Log: Net thread did not recieve any data, exitting.\n");
2096 sat_strerror(err, buf, sizeof(buf));
2097 // Print why we failed.
2098 logprintf(0, "Process Error: Thread %d, "
2099 "Network read failed, bailing (%s).\n",
2101 // Print arguments and results.
2102 logprintf(0, "Log: recv(%d, address %x, size %x, 0) == %x, err %d\n",
2103 sock, address + (page_length - size),
2104 size, transferred, err);
// If we got a small partial page before the failure, dump it: it may
// identify an unexpected sender.
2105 if ((transferred == 0) &&
2106 (page_length - size < 512) &&
2107 (page_length - size > 0)) {
2108 // Print null terminated data received, to see who's been
2109 // sending us suspicious unwanted data.
2110 address[page_length - size] = 0;
2111 logprintf(0, "Log: received %d bytes: '%s'\n",
2112 page_length - size, address);
2118 size = size - transferred;
2124 // Network IO work loop. Execute until marked done.
2125 int NetworkThread::Work() {
2126 logprintf(9, "Log: Starting network thread %d, ip %s\n",
2132 if (!CreateSocket(&sock))
2135 // Network IO loop requires network slave thread to have already initialized.
2136 // We will sleep here for awhile to ensure that the slave thread will be
2137 // listening by the time we connect.
2138 // Sleep for 15 seconds.
2140 logprintf(9, "Log: Starting execution of network thread %d, ip %s\n",
2145 // Connect to a slave thread.
2151 int strict = sat_->strict();
2153 while (IsReadyToRun()) {
2154 struct page_entry src;
2155 struct page_entry dst;
2156 result &= sat_->GetValid(&src);
2157 result &= sat_->GetEmpty(&dst);
2159 logprintf(0, "Process Error: net_thread failed to pop pages, "
2164 // Check data correctness.
2168 // Do the network write.
2169 if (!(result &= SendPage(sock, &src)))
2172 // Update pattern reference to reflect new contents.
2173 dst.pattern = src.pattern;
2175 // Do the network read.
2176 if (!(result &= ReceivePage(sock, &dst)))
2179 // Ensure that the transfer ended up with correct data.
2183 // Return all of our pages to the queue.
2184 result &= sat_->PutValid(&dst);
2185 result &= sat_->PutEmpty(&src);
2187 logprintf(0, "Process Error: net_thread failed to push pages, "
2194 pages_copied_ = loops;
2200 logprintf(9, "Log: Completed %d: network thread status %d, "
2201 "%d pages copied\n",
2202 thread_num_, status_, pages_copied_);
2206 // Spawn slave threads for incoming connections.
2207 bool NetworkListenThread::SpawnSlave(int newsock, int threadid) {
2208 logprintf(12, "Log: Listen thread spawning slave\n");
2210 // Spawn slave thread, to reflect network traffic back to sender.
2211 ChildWorker *child_worker = new ChildWorker;
2212 child_worker->thread.SetSock(newsock);
2213 child_worker->thread.InitThread(threadid, sat_, os_, patternlist_,
2214 &child_worker->status);
2215 child_worker->status.Initialize();
2216 child_worker->thread.SpawnThread();
2217 child_workers_.push_back(child_worker);
2222 // Reap slave threads.
2223 bool NetworkListenThread::ReapSlaves() {
2225 // Gather status and reap threads.
2226 logprintf(12, "Log: Joining all outstanding threads\n");
2228 for (int i = 0; i < child_workers_.size(); i++) {
2229 NetworkSlaveThread& child_thread = child_workers_[i]->thread;
2230 logprintf(12, "Log: Joining slave thread %d\n", i);
2231 child_thread.JoinThread();
2232 if (child_thread.GetStatus() != 1) {
2233 logprintf(0, "Process Error: Slave Thread %d failed with status %d\n", i,
2234 child_thread.GetStatus());
2237 errorcount_ += child_thread.GetErrorCount();
2238 logprintf(9, "Log: Slave Thread %d found %lld miscompares\n", i,
2239 child_thread.GetErrorCount());
2240 pages_copied_ += child_thread.GetPageCount();
2246 // Network listener IO work loop. Execute until marked done.
2247 int NetworkListenThread::Work() {
2249 logprintf(9, "Log: Starting network listen thread %d\n",
2254 if (!CreateSocket(&sock_))
2256 logprintf(9, "Log: Listen thread created sock\n");
2258 // Allows incoming connections to be queued up by socket library.
2261 logprintf(12, "Log: Listen thread waiting for incoming connections\n");
2263 // Wait on incoming connections, and spawn worker threads for them.
2264 int threadcount = 0;
2265 while (IsReadyToRun()) {
2266 // Poll for connections that we can accept().
2268 // Accept those connections.
2269 logprintf(12, "Log: Listen thread found incoming connection\n");
2270 if (GetConnection(&newsock)) {
2271 SpawnSlave(newsock, threadcount);
2277 // Gather status and join spawned threads.
2280 // Delete the child workers.
2281 for (ChildVector::iterator it = child_workers_.begin();
2282 it != child_workers_.end(); ++it) {
2283 (*it)->status.Destroy();
2286 child_workers_.clear();
2292 "Log: Completed %d: network listen thread status %d, "
2293 "%d pages copied\n",
2294 thread_num_, status_, pages_copied_);
2298 // Set network reflector socket struct.
2299 void NetworkSlaveThread::SetSock(int sock) {
2303 // Network reflector IO work loop. Execute until marked done.
2304 int NetworkSlaveThread::Work() {
2305 logprintf(9, "Log: Starting network slave thread %d\n",
2308 // Verify that we have a socket.
2316 // Init a local buffer for storing data.
2317 void *local_page = static_cast<void*>(memalign(512, sat_->page_length()));
2319 logprintf(0, "Process Error: Net Slave thread memalign returned 0\n");
2324 struct page_entry page;
2325 page.addr = local_page;
2327 // This thread will continue to run as long as the thread on the other end of
2328 // the socket is still sending and receiving data.
2330 // Do the network read.
2331 if (!ReceivePage(sock, &page))
2334 // Do the network write.
2335 if (!SendPage(sock, &page))
2341 pages_copied_ = loops;
2348 "Log: Completed %d: network slave thread status %d, "
2349 "%d pages copied\n",
2350 thread_num_, status_, pages_copied_);
2354 // Thread work loop. Execute until marked finished.
2355 int ErrorPollThread::Work() {
2356 logprintf(9, "Log: Starting system error poll thread %d\n", thread_num_);
2358 // This calls a generic error polling function in the Os abstraction layer.
2360 errorcount_ += os_->ErrorPoll();
2362 } while (IsReadyToRun());
2364 logprintf(9, "Log: Finished system error poll thread %d: %d errors\n",
2365 thread_num_, errorcount_);
2370 // Worker thread to heat up CPU.
2371 int CpuStressThread::Work() {
2372 logprintf(9, "Log: Starting CPU stress thread %d\n", thread_num_);
2375 // Run ludloff's platform/CPU-specific assembly workload.
2376 os_->CpuStressWorkload();
2378 } while (IsReadyToRun());
2380 logprintf(9, "Log: Finished CPU stress thread %d:\n",
// Constructor: record the shared cacheline array, its length, this thread's
// slot within each cacheline struct, and how many increments to perform per
// verification round.
2386 CpuCacheCoherencyThread::CpuCacheCoherencyThread(cc_cacheline_data *data,
2387 int cacheline_count,
2390 cc_cacheline_data_ = data;
2391 cc_cacheline_count_ = cacheline_count;
2392 cc_thread_num_ = thread_num;
2393 cc_inc_count_ = inc_count;
2396 // Worked thread to test the cache coherency of the CPUs
2397 int CpuCacheCoherencyThread::Work() {
2398 logprintf(9, "Log: Starting the Cache Coherency thread %d\n",
2400 uint64 time_start, time_end;
2403 unsigned int seed = static_cast<unsigned int>(gettid());
2404 gettimeofday(&tv, NULL); // Get the timestamp before increments.
2405 time_start = tv.tv_sec * 1000000ULL + tv.tv_usec;
2407 uint64 total_inc = 0; // Total increments done by the thread.
2408 while (IsReadyToRun()) {
2409 for (int i = 0; i < cc_inc_count_; i++) {
2410 // Choose a datastructure in random and increment the appropriate
2411 // member in that according to the offset (which is the same as the
2413 int r = rand_r(&seed);
2414 r = cc_cacheline_count_ * (r / (RAND_MAX + 1.0));
2415 // Increment the member of the randomely selected structure.
2416 (cc_cacheline_data_[r].num[cc_thread_num_])++;
2419 total_inc += cc_inc_count_;
2421 // Calculate if the local counter matches with the global value
2422 // in all the cache line structures for this particular thread.
2423 int cc_global_num = 0;
2424 for (int cline_num = 0; cline_num < cc_cacheline_count_; cline_num++) {
2425 cc_global_num += cc_cacheline_data_[cline_num].num[cc_thread_num_];
2426 // Reset the cachline member's value for the next run.
2427 cc_cacheline_data_[cline_num].num[cc_thread_num_] = 0;
2429 if (sat_->error_injection())
2432 if (cc_global_num != cc_inc_count_) {
2434 logprintf(0, "Hardware Error: global(%d) and local(%d) do not match\n",
2435 cc_global_num, cc_inc_count_);
2438 gettimeofday(&tv, NULL); // Get the timestamp at the end.
2439 time_end = tv.tv_sec * 1000000ULL + tv.tv_usec;
2441 uint64 us_elapsed = time_end - time_start;
2442 // inc_rate is the no. of increments per second.
2443 double inc_rate = total_inc * 1e6 / us_elapsed;
2445 logprintf(4, "Stats: CC Thread(%d): Time=%llu us,"
2446 " Increments=%llu, Increments/sec = %.6lf\n",
2447 cc_thread_num_, us_elapsed, total_inc, inc_rate);
2448 logprintf(9, "Log: Finished CPU Cache Coherency thread %d:\n",
// Constructor: establish conservative defaults for the disk test — sector-
// sized transfers, whole-disk segment, 16MiB assumed drive cache, and
// latency thresholds/timeouts for flagging slow or hung operations.
2454 DiskThread::DiskThread(DiskBlockTable *block_table) {
2455 read_block_size_ = kSectorSize; // default 1 sector (512 bytes)
2456 write_block_size_ = kSectorSize; // this assumes read and write block size
2458 segment_size_ = -1; // use the entire disk as one segment
2459 cache_size_ = 16 * 1024 * 1024; // assume 16MiB cache by default
2460 // Use a queue such that 3/2 times as much data as the cache can hold
2461 // is written before it is read so that there is little chance the read
2462 // data is in the cache.
2463 queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2464 blocks_per_segment_ = 32;
2466 read_threshold_ = 100000; // 100ms is a reasonable limit for
2467 write_threshold_ = 100000; // reading/writing a sector
2469 read_timeout_ = 5000000; // 5 seconds should be long enough for a
2470 write_timeout_ = 5000000; // timeout for reading/writing
2472 device_sectors_ = 0;
2473 non_destructive_ = 0;
2476 block_table_ = block_table;
2477 update_block_table_ = 1;
// Destructor; body elided from this listing.
2480 DiskThread::~DiskThread() {
2483 // Set filename for device file (in /dev).
2484 void DiskThread::SetDevice(const char *device_name) {
2485 device_name_ = device_name;
2488 // Set various parameters that control the behaviour of the test.
2489 // -1 is used as a sentinel value on each parameter (except non_destructive)
2490 // to indicate that the parameter not be set.
// Validates each supplied parameter (sector alignment, positivity,
// read/write block-size divisibility), recomputes the in-flight queue depth
// from the cache size, and pushes the new geometry into the block table.
2491 bool DiskThread::SetParameters(int read_block_size,
2492 int write_block_size,
2495 int blocks_per_segment,
2496 int64 read_threshold,
2497 int64 write_threshold,
2498 int non_destructive) {
2499 if (read_block_size != -1) {
2500 // Blocks must be aligned to the disk's sector size.
2501 if (read_block_size % kSectorSize != 0) {
2502 logprintf(0, "Process Error: Block size must be a multiple of %d "
2503 "(thread %d).\n", kSectorSize, thread_num_);
2507 read_block_size_ = read_block_size;
2510 if (write_block_size != -1) {
2511 // Write blocks must be aligned to the disk's sector size and to the
2513 if (write_block_size % kSectorSize != 0) {
2514 logprintf(0, "Process Error: Write block size must be a multiple "
2515 "of %d (thread %d).\n", kSectorSize, thread_num_);
2518 if (write_block_size % read_block_size_ != 0) {
2519 logprintf(0, "Process Error: Write block size must be a multiple "
2520 "of the read block size, which is %d (thread %d).\n",
2521 read_block_size_, thread_num_);
2525 write_block_size_ = write_block_size;
2528 // Make sure write_block_size_ is still valid.
// If only the read size was changed and now exceeds the write size, grow
// the write size to match rather than failing.
2529 if (read_block_size_ > write_block_size_) {
2530 logprintf(5, "Log: Assuming write block size equal to read block size, "
2531 "which is %d (thread %d).\n", read_block_size_,
2533 write_block_size_ = read_block_size_;
2535 if (write_block_size_ % read_block_size_ != 0) {
2536 logprintf(0, "Process Error: Write block size (defined as %d) must "
2537 "be a multiple of the read block size, which is %d "
2538 "(thread %d).\n", write_block_size_, read_block_size_,
2545 if (cache_size != -1) {
2546 cache_size_ = cache_size;
2549 if (blocks_per_segment != -1) {
2550 if (blocks_per_segment <= 0) {
2551 logprintf(0, "Process Error: Blocks per segment must be greater than "
2552 "zero.\n (thread %d)", thread_num_);
2556 blocks_per_segment_ = blocks_per_segment;
2559 if (read_threshold != -1) {
2560 if (read_threshold <= 0) {
2561 logprintf(0, "Process Error: Read threshold must be greater than "
2562 "zero (thread %d).\n", thread_num_);
2566 read_threshold_ = read_threshold;
2569 if (write_threshold != -1) {
2570 if (write_threshold <= 0) {
2571 logprintf(0, "Process Error: Write threshold must be greater than "
2572 "zero (thread %d).\n", thread_num_);
2576 write_threshold_ = write_threshold;
2579 if (segment_size != -1) {
2580 // Segments must be aligned to the disk's sector size.
2581 if (segment_size % kSectorSize != 0) {
2582 logprintf(0, "Process Error: Segment size must be a multiple of %d"
2583 " (thread %d).\n", kSectorSize, thread_num_);
// Stored internally in sectors, not bytes.
2587 segment_size_ = segment_size / kSectorSize;
2590 non_destructive_ = non_destructive;
2592 // Having a queue of 150% of blocks that will fit in the disk's cache
2593 // should be enough to force out the oldest block before it is read and hence,
2594 // making sure the data comes from the disk and not the cache.
2595 queue_size_ = ((cache_size_ / write_block_size_) * 3) / 2;
2596 // Updating DiskBlockTable parameters
2597 if (update_block_table_) {
2598 block_table_->SetParameters(kSectorSize, write_block_size_,
2599 device_sectors_, segment_size_,
// Open the device under test with O_SYNC | O_DIRECT (cache-bypassing IO)
// and determine its size. Returns the fd via *pfile.
2605 bool DiskThread::OpenDevice(int *pfile) {
2606 int fd = open(device_name_.c_str(),
2607 O_RDWR | O_SYNC | O_DIRECT | O_LARGEFILE,
2610 logprintf(0, "Process Error: Failed to open device %s (thread %d)!!\n",
2611 device_name_.c_str(), thread_num_);
2616 return GetDiskSize(fd);
2619 // Retrieves the size (in bytes) of the disk/file.
2620 bool DiskThread::GetDiskSize(int fd) {
2621 struct stat device_stat;
2622 if (fstat(fd, &device_stat) == -1) {
2623 logprintf(0, "Process Error: Unable to fstat disk %s (thread %d).\n",
2624 device_name_.c_str(), thread_num_);
2628 // For a block device, an ioctl is needed to get the size since the size
2629 // of the device file (i.e. /dev/sdb) is 0.
2630 if (S_ISBLK(device_stat.st_mode)) {
2631 uint64 block_size = 0;
2633 if (ioctl(fd, BLKGETSIZE64, &block_size) == -1) {
2634 logprintf(0, "Process Error: Unable to ioctl disk %s (thread %d).\n",
2635 device_name_.c_str(), thread_num_);
2639 // If an Elephant is initialized with status DEAD its size will be zero.
2640 if (block_size == 0) {
2641 os_->ErrorReport(device_name_.c_str(), "device-size-zero", 1);
2643 status_ = 1; // Avoid a procedural error.
2647 device_sectors_ = block_size / kSectorSize;
2649 } else if (S_ISREG(device_stat.st_mode)) {
2650 device_sectors_ = device_stat.st_size / kSectorSize;
2653 logprintf(0, "Process Error: %s is not a regular file or block "
2654 "device (thread %d).\n", device_name_.c_str(),
2659 logprintf(12, "Log: Device sectors: %lld on disk %s (thread %d).\n",
2660 device_sectors_, device_name_.c_str(), thread_num_);
2662 if (update_block_table_) {
2663 block_table_->SetParameters(kSectorSize, write_block_size_,
2664 device_sectors_, segment_size_,
// Close the device under test; body elided from this listing.
2671 bool DiskThread::CloseDevice(int fd) {
2676 // Return the time in microseconds.
2677 int64 DiskThread::GetTime() {
2679 gettimeofday(&tv, NULL);
2680 return tv.tv_sec * 1000000 + tv.tv_usec;
// Main work loop for the sequential disk test: alternately writes a window
// of patterned blocks to disk and then reads them back for verification.
// fd is the already-opened device descriptor. Updates blocks_written_,
// blocks_read_ (in the helpers) and pages_copied_.
// NOTE(review): the embedded line numbering has gaps throughout (e.g.
// 2685->2689, 2756->2761), so some declarations (num_segments), braces,
// and error-path statements are elided from this excerpt.
2683 bool DiskThread::DoWork(int fd) {
2684 int64 block_num = 0;
2685 blocks_written_ = 0;
// segment_size_ == -1 means "no segmenting": treat the whole device as
// one segment (the assignment branch is in the elided lines).
2689 if (segment_size_ == -1) {
2692 num_segments = device_sectors_ / segment_size_;
// Round up so a trailing partial segment is still covered.
2693 if (device_sectors_ % segment_size_ != 0)
2697 // Disk size should be at least 3x cache size. See comment later for
2699 sat_assert(device_sectors_ * kSectorSize > 3 * cache_size_);
2701 // This disk test works by writing blocks with a certain pattern to
2702 // disk, then reading them back and verifying it against the pattern
2703 // at a later time. A failure happens when either the block cannot
2704 // be written/read or when the read block is different than what was
2705 // written. If a block takes too long to write/read, then a warning
2706 // is given instead of an error since taking too long is not
2707 // necessarily an error.
2709 // To prevent the read blocks from coming from the disk cache,
2710 // enough blocks are written before read such that a block would
2711 // be ejected from the disk cache by the time it is read.
2713 // TODO(amistry): Implement some sort of read/write throttling. The
2714 // flood of asynchronous I/O requests when a drive is
2715 // unplugged is causing the application and kernel to
2716 // become unresponsive.
2718 while (IsReadyToRun()) {
2719 // Write blocks to disk.
2720 logprintf(16, "Write phase for disk %s (thread %d).\n",
2721 device_name_.c_str(), thread_num_);
// Keep queue_size_ + 1 blocks in flight before switching to the read
// phase, so reads land after the disk cache has evicted them.
2722 while (IsReadyToRunNoPause() &&
2723 in_flight_sectors_.size() < queue_size_ + 1) {
2724 // Confine testing to a particular segment of the disk.
2725 int64 segment = (block_num / blocks_per_segment_) % num_segments;
2726 if (block_num % blocks_per_segment_ == 0) {
2727 logprintf(20, "Log: Starting to write segment %lld out of "
2728 "%lld on disk %s (thread %d).\n",
2729 segment, num_segments, device_name_.c_str(),
// Reserve an unused run of sectors within the current segment.
2734 BlockData *block = block_table_->GetUnusedBlock(segment);
2736 // If an unused sequence of sectors could not be found, skip to the
2737 // next block to process. Soon, a new segment will come and new
2738 // sectors will be able to be allocated. This effectively puts a
2739 // minumim on the disk size at 3x the stated cache size, or 48MiB
2740 // if a cache size is not given (since the cache is set as 16MiB
2741 // by default). Given that todays caches are at the low MiB range
2742 // and drive sizes at the mid GB, this shouldn't pose a problem.
2743 // The 3x minimum comes from the following:
2744 // 1. In order to allocate 'y' blocks from a segment, the
2745 // segment must contain at least 2y blocks or else an
2746 // allocation may not succeed.
2747 // 2. Assume the entire disk is one segment.
2748 // 3. A full write phase consists of writing blocks corresponding to
2750 // 4. Therefore, the one segment must have 2 * 3/2 * cache
2751 // size worth of blocks = 3 * cache size worth of blocks
2753 // In non-destructive mode, don't write anything to disk.
2754 if (!non_destructive_) {
2755 if (!WriteBlockToDisk(fd, block)) {
// Failed write: release the reservation and bail (the surrounding
// error statements are in the elided lines).
2756 block_table_->RemoveBlock(block);
2761 block->SetBlockAsInitialized();
// Queue the block for verification in the read phase below.
2764 in_flight_sectors_.push(block);
2767 // Verify blocks on disk.
2768 logprintf(20, "Read phase for disk %s (thread %d).\n",
2769 device_name_.c_str(), thread_num_);
// Drain the in-flight queue in FIFO order, validating each block.
2770 while (IsReadyToRunNoPause() && !in_flight_sectors_.empty()) {
2771 BlockData *block = in_flight_sectors_.front();
2772 in_flight_sectors_.pop();
2773 ValidateBlockOnDisk(fd, block);
2774 block_table_->RemoveBlock(block);
// Report total I/O work as "pages" for the SAT summary.
2779 pages_copied_ = blocks_written_ + blocks_read_;
2783 // Do an asynchronous disk I/O operation.
// op selects read or write (ASYNC_IO_READ / ASYNC_IO_WRITE), buf/size give
// the transfer buffer, offset is the byte offset on disk, and timeout is
// the wait limit in microseconds. Returns false on submit failure, timeout,
// or short/failed transfer.
// NOTE(review): gaps in the embedded numbering elide the declarations of
// `cb` (struct iocb), the `operations[]` table header, `tv` (timespec),
// and the various return statements from this excerpt.
2784 bool DiskThread::AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
2785 int64 offset, int64 timeout) {
2786 // Use the Linux native asynchronous I/O interface for reading/writing.
2787 // A read/write consists of three basic steps:
2788 // 1. create an io context.
2789 // 2. prepare and submit an io request to the context
2790 // 3. wait for an event on the context.
// Per-op metadata: aio opcode, human-readable name, and the error tag
// reported through os_->ErrorReport.
2795 const char *error_str;
2797 { IOCB_CMD_PREAD, "read", "disk-read-error" },
2798 { IOCB_CMD_PWRITE, "write", "disk-write-error" }
// Build the control block for this single request.
2802 memset(&cb, 0, sizeof(cb));
2805 cb.aio_lio_opcode = operations[op].opcode;
// The aio ABI carries the buffer pointer as a 64-bit integer.
2806 cb.aio_buf = (__u64)buf;
2807 cb.aio_nbytes = size;
2808 cb.aio_offset = offset;
2810 struct iocb *cbs[] = { &cb };
// io_submit returns the number of requests accepted; anything but 1 is
// a submission failure here.
2811 if (io_submit(aio_ctx_, 1, cbs) != 1) {
2812 logprintf(0, "Process Error: Unable to submit async %s "
2813 "on disk %s (thread %d).\n",
2814 operations[op].op_str, device_name_.c_str(),
2819 struct io_event event;
2820 memset(&event, 0, sizeof(event));
// Convert the microsecond timeout into the timespec io_getevents wants.
2822 tv.tv_sec = timeout / 1000000;
2823 tv.tv_nsec = (timeout % 1000000) * 1000;
// Wait for exactly one completion event, up to the timeout.
2824 if (io_getevents(aio_ctx_, 1, 1, &event, &tv) != 1) {
2825 // A ctrl-c from the keyboard will cause io_getevents to fail with an
2826 // EINTR error code. This is not an error and so don't treat it as such,
2827 // but still log it.
2828 if (errno == EINTR) {
2829 logprintf(5, "Log: %s interrupted on disk %s (thread %d).\n",
2830 operations[op].op_str, device_name_.c_str(),
// Genuine timeout: count it as a hardware error against this device.
2833 os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2835 logprintf(0, "Hardware Error: Timeout doing async %s to sectors "
2836 "starting at %lld on disk %s (thread %d).\n",
2837 operations[op].op_str, offset / kSectorSize,
2838 device_name_.c_str(), thread_num_);
2841 // Don't bother checking return codes since io_cancel seems to always fail.
2842 // Since io_cancel is always failing, destroying and recreating an I/O
2843 // context is a workaround for canceling an in-progress I/O operation.
2844 // TODO(amistry): Find out why io_cancel isn't working and make it work.
2845 io_cancel(aio_ctx_, &cb, &event);
2846 io_destroy(aio_ctx_);
// Recreate the context so subsequent I/O on this thread still works;
// 5 matches the queue depth used when the context was first created.
2848 if (io_setup(5, &aio_ctx_)) {
2849 logprintf(0, "Process Error: Unable to create aio context on disk %s"
2851 device_name_.c_str(), thread_num_);
2857 // event.res contains the number of bytes written/read or
2858 // error if < 0, I think.
// Short or failed transfer: classify by the (negative) errno-like value.
2859 if (event.res != size) {
2861 os_->ErrorReport(device_name_.c_str(), operations[op].error_str, 1);
2863 if (event.res < 0) {
2864 switch (event.res) {
// Case labels (e.g. -EIO) fall in the elided lines between 2864/2866.
2866 logprintf(0, "Hardware Error: Low-level I/O error while doing %s to "
2867 "sectors starting at %lld on disk %s (thread %d).\n",
2868 operations[op].op_str, offset / kSectorSize,
2869 device_name_.c_str(), thread_num_);
2872 logprintf(0, "Hardware Error: Unknown error while doing %s to "
2873 "sectors starting at %lld on disk %s (thread %d).\n",
2874 operations[op].op_str, offset / kSectorSize,
2875 device_name_.c_str(), thread_num_);
// event.res >= 0 but != size: partial transfer.
2878 logprintf(0, "Hardware Error: Unable to %s to sectors starting at "
2879 "%lld on disk %s (thread %d).\n",
2880 operations[op].op_str, offset / kSectorSize,
2881 device_name_.c_str(), thread_num_);
2889 // Write a block to disk.
// Fills block_buffer_ with a pattern (preferring a valid SAT page, falling
// back to direct pattern fill), records the pattern on `block`, then issues
// an asynchronous write of the block's sectors. Warns if the write exceeds
// write_threshold_ microseconds.
// NOTE(review): gaps in the embedded numbering elide some braces/returns
// (e.g. 2906->2909, 2922->2926) from this excerpt.
2890 bool DiskThread::WriteBlockToDisk(int fd, BlockData *block) {
2891 memset(block_buffer_, 0, block->GetSize());
2893 // Fill block buffer with a pattern
2894 struct page_entry pe;
2895 if (!sat_->GetValid(&pe)) {
2896 // Even though a valid page could not be obatined, it is not an error
2897 // since we can always fill in a pattern directly, albeit slower.
2898 unsigned int *memblock = static_cast<unsigned int *>(block_buffer_);
2899 block->SetPattern(patternlist_->GetRandomPattern());
2901 logprintf(11, "Log: Warning, using pattern fill fallback in "
2902 "DiskThread::WriteBlockToDisk on disk %s (thread %d).\n",
2903 device_name_.c_str(), thread_num_);
// Word-by-word pattern fill; wordsize_ is the per-element byte width.
2905 for (int i = 0; i < block->GetSize()/wordsize_; i++) {
2906 memblock[i] = block->GetPattern()->pattern(i);
// Fast path: copy an already-patterned SAT page and return it to SAT.
2909 memcpy(block_buffer_, pe.addr, block->GetSize());
2910 block->SetPattern(pe.pattern);
2911 sat_->PutValid(&pe);
2914 logprintf(12, "Log: Writing %lld sectors starting at %lld on disk %s"
2916 block->GetSize()/kSectorSize, block->GetAddress(),
2917 device_name_.c_str(), thread_num_);
// Time the write so slow-but-successful I/O can be flagged below.
2919 int64 start_time = GetTime();
// Block addresses are in sectors; the aio interface wants a byte offset.
2921 if (!AsyncDiskIO(ASYNC_IO_WRITE, fd, block_buffer_, block->GetSize(),
2922 block->GetAddress() * kSectorSize, write_timeout_)) {
2926 int64 end_time = GetTime();
2927 logprintf(12, "Log: Writing time: %lld us (thread %d).\n",
2928 end_time - start_time, thread_num_);
// Slow writes are logged as warnings, not errors (see DoWork comment).
2929 if (end_time - start_time > write_threshold_) {
2930 logprintf(5, "Log: Write took %lld us which is longer than threshold "
2931 "%lld us on disk %s (thread %d).\n",
2932 end_time - start_time, write_threshold_, device_name_.c_str(),
2939 // Verify a block on disk.
// Reads the block back in randomly-sized chunks of read_block_size_ and
// (unless non_destructive_) compares each chunk against the pattern the
// block was written with. Reports pattern mismatches as hardware errors.
// NOTE(review): gaps in the embedded numbering elide some statements
// (e.g. the blocks_read_ update around 2981-2984 and closing braces).
2940 bool DiskThread::ValidateBlockOnDisk(int fd, BlockData *block) {
// Number of read-sized chunks covering one write-sized block.
2941 int64 blocks = block->GetSize() / read_block_size_;
2942 int64 bytes_read = 0;
2943 int64 current_blocks;
2944 int64 current_bytes;
2945 uint64 address = block->GetAddress();
2947 logprintf(20, "Log: Reading sectors starting at %lld on disk %s "
2949 address, device_name_.c_str(), thread_num_);
2951 // Read block from disk and time the read. If it takes longer than the
2952 // threshold, complain.
2953 if (lseek(fd, address * kSectorSize, SEEK_SET) == -1) {
2954 logprintf(0, "Process Error: Unable to seek to sector %lld in "
2955 "DiskThread::ValidateSectorsOnDisk on disk %s "
2956 "(thread %d).\n", address, device_name_.c_str(), thread_num_);
2959 int64 start_time = GetTime();
2961 // Split a large write-sized block into small read-sized blocks and
2962 // read them in groups of randomly-sized multiples of read block size.
2963 // This assures all data written on disk by this particular block
2964 // will be tested using a random reading pattern.
2966 while (blocks != 0) {
2967 // Test all read blocks in a written block.
// Pick 1..blocks chunks for this read; random() seeded in Work().
2968 current_blocks = (random() % blocks) + 1;
2969 current_bytes = current_blocks * read_block_size_;
// Clear the buffer so stale data can't mask a failed read.
2971 memset(block_buffer_, 0, current_bytes);
2973 logprintf(20, "Log: Reading %lld sectors starting at sector %lld on "
2974 "disk %s (thread %d)\n",
2975 current_bytes / kSectorSize,
2976 (address * kSectorSize + bytes_read) / kSectorSize,
2977 device_name_.c_str(), thread_num_);
2979 if (!AsyncDiskIO(ASYNC_IO_READ, fd, block_buffer_, current_bytes,
2980 address * kSectorSize + bytes_read,
2985 int64 end_time = GetTime();
2986 logprintf(20, "Log: Reading time: %lld us (thread %d).\n",
2987 end_time - start_time, thread_num_);
// Slow reads are warnings, mirroring the write-threshold handling.
2988 if (end_time - start_time > read_threshold_) {
2989 logprintf(5, "Log: Read took %lld us which is longer than threshold "
2990 "%lld us on disk %s (thread %d).\n",
2991 end_time - start_time, read_threshold_,
2992 device_name_.c_str(), thread_num_);
2995 // In non-destructive mode, don't compare the block to the pattern since
2996 // the block was never written to disk in the first place.
2997 if (!non_destructive_) {
2998 if (CheckRegion(block_buffer_, block->GetPattern(), current_bytes,
3000 os_->ErrorReport(device_name_.c_str(), "disk-pattern-error", 1);
3002 logprintf(0, "Hardware Error: Pattern mismatch in block starting at "
3003 "sector %lld in DiskThread::ValidateSectorsOnDisk on "
3004 "disk %s (thread %d).\n",
3005 address, device_name_.c_str(), thread_num_);
// Advance past the chunk just verified.
3009 bytes_read += current_blocks * read_block_size_;
3010 blocks -= current_blocks;
// Thread entry point for the disk test: opens the device, allocates an
// aligned I/O buffer, creates the aio context, runs DoWork(), then tears
// everything down and logs the summary.
// NOTE(review): gaps in the embedded numbering elide the `int fd`
// declaration, error returns, the DoWork() call, and CloseDevice() from
// this excerpt.
3016 int DiskThread::Work() {
3019 logprintf(9, "Log: Starting disk thread %d, disk %s\n",
3020 thread_num_, device_name_.c_str());
// Seed the RNG used for random-sized reads in ValidateBlockOnDisk.
3022 srandom(time(NULL));
3024 if (!OpenDevice(&fd)) {
3028 // Allocate a block buffer aligned to 512 bytes since the kernel requires it
3029 // when using direst IO.
3030 block_buffer_ = memalign(kBufferAlignment, write_block_size_);
3031 if (block_buffer_ == NULL) {
3033 logprintf(0, "Process Error: Unable to allocate memory for buffers "
3034 "for disk %s (thread %d).\n",
3035 device_name_.c_str(), thread_num_);
// Create the aio context (queue depth 5, matching the recreate path in
// AsyncDiskIO).
3039 if (io_setup(5, &aio_ctx_)) {
3040 logprintf(0, "Process Error: Unable to create aio context for disk %s"
3042 device_name_.c_str(), thread_num_);
// Cleanup: destroy the aio context and free the memalign'd buffer.
3050 io_destroy(aio_ctx_);
3052 free(block_buffer_);
3054 logprintf(9, "Log: Completed %d (disk %s): disk thread status %d, "
3055 "%d pages copied\n",
3056 thread_num_, device_name_.c_str(), status_, pages_copied_);
// Constructor: a RandomDiskThread shares the DiskThread machinery but
// reads existing blocks at random instead of writing its own, so it must
// not reconfigure the shared block table (update_block_table_ = 0 keeps
// GetDiskSize from calling SetParameters).
3060 RandomDiskThread::RandomDiskThread(DiskBlockTable *block_table)
3061 : DiskThread(block_table) {
3062 update_block_table_ = 0;
// Destructor: nothing beyond the base-class cleanup.
3065 RandomDiskThread::~RandomDiskThread() {
// Random-read work loop: repeatedly picks a random initialized block from
// the shared table and validates it in place. Overrides the sequential
// write/read cycle of DiskThread::DoWork.
// NOTE(review): gaps in the numbering (3077->3079) elide the `continue`
// (or equivalent) after the no-block log and some braces/returns.
3068 bool RandomDiskThread::DoWork(int fd) {
3070 blocks_written_ = 0;
3071 logprintf(11, "Random phase for disk %s (thread %d).\n",
3072 device_name_.c_str(), thread_num_);
3073 while (IsReadyToRun()) {
3074 BlockData *block = block_table_->GetRandomBlock();
// No initialized block available yet — the writer thread may not have
// produced any; log and try again.
3075 if (block == NULL) {
3076 logprintf(12, "No block available for device %s (thread %d).\n",
3077 device_name_.c_str(), thread_num_);
3079 ValidateBlockOnDisk(fd, block);
// Release (not Remove): the block stays in the table for other readers.
3080 block_table_->ReleaseBlock(block);
// This thread only reads, so the page count is just blocks_read_.
3084 pages_copied_ = blocks_read_;
// Constructor: error injection is off by default; SetRegion() must be
// called before Work() to provide the region and build the page queue.
// NOTE(review): further member initialization (original lines 3090+) is
// elided from this excerpt.
3088 MemoryRegionThread::MemoryRegionThread() {
3089 error_injection_ = false;
// Destructor.
// NOTE(review): the body (presumably deleting pages_) is elided from this
// excerpt.
3093 MemoryRegionThread::~MemoryRegionThread() {
// Registers the memory region to test. The region is carved into
// SAT-page-sized entries that are pushed onto a private PageEntryQueue;
// size must be an exact multiple of sat_->page_length().
// NOTE(review): gaps in the numbering (3102->3108, 3115->3123) elide the
// failure return, the queue Push call inside the loop, and the closing
// brace from this excerpt.
3098 bool MemoryRegionThread::SetRegion(void *region, int64 size) {
3099 int plength = sat_->page_length();
3100 int npages = size / plength;
// Reject regions that don't divide evenly into SAT pages.
3101 if (size % plength) {
3102 logprintf(0, "Process Error: region size is not a multiple of SAT "
3108 pages_ = new PageEntryQueue(npages);
3109 char *base_addr = reinterpret_cast<char*>(region);
// region_ is kept so ProcessError can compute buffer offsets later.
3110 region_ = base_addr;
// Build one page_entry per page, recording both address and offset.
3111 for (int i = 0; i < npages; i++) {
3112 struct page_entry pe;
3114 pe.addr = reinterpret_cast<void*>(base_addr + i * plength);
3115 pe.offset = i * plength;
// Error handler that distinguishes which phase the miscompare occurred in:
// copy-phase errors implicate main memory (delegate to the base handler),
// check-phase errors implicate the region under test (gather the failing
// byte offset and physical address, then log details).
// NOTE(review): gaps in the numbering (3145->3149, 3158->3160, 3160->3168)
// elide the break statement, part of the logprintf argument list, and
// braces from this excerpt.
3123 void MemoryRegionThread::ProcessError(struct ErrorRecord *error,
3125 const char *message) {
3126 uint32 buffer_offset;
3127 if (phase_ == kPhaseCopy) {
3128 // If the error occurred on the Copy Phase, it means that
3129 // the source data (i.e., the main memory) is wrong. so
3130 // just pass it to the original ProcessError to call a
3132 WorkerThread::ProcessError(error, priority, message);
3133 } else if (phase_ == kPhaseCheck) {
3134 // A error on the Check Phase means that the memory region tested
3135 // has an error. Gathering more information and then reporting
3137 // Determine if this is a write or read error.
// Flush the cache line and re-read the word to see if the bad value
// sticks (distinguishes transient read errors from stored bad data).
3138 os_->Flush(error->vaddr);
3139 error->reread = *(error->vaddr);
3140 char *good = reinterpret_cast<char*>(&(error->expected));
3141 char *bad = reinterpret_cast<char*>(&(error->actual));
3142 sat_assert(error->expected != error->actual);
// Scan byte-by-byte to find the first differing byte within the word.
3143 unsigned int offset = 0;
3144 for (offset = 0; offset < (sizeof(error->expected) - 1); offset++) {
3145 if (good[offset] != bad[offset])
3149 error->vbyteaddr = reinterpret_cast<char*>(error->vaddr) + offset;
// Offset of the failing byte relative to the start of the region.
3151 buffer_offset = error->vbyteaddr - region_;
3153 // Find physical address if possible.
3154 error->paddr = os_->VirtualToPhysical(error->vbyteaddr);
3156 "%s: miscompare on %s, CRC check at %p(0x%llx), "
3157 "offset %llx: read:0x%016llx, reread:0x%016llx "
3158 "expected:0x%016llx\n",
3160 identifier_.c_str(),
// Any other phase value indicates a logic bug in this thread.
3168 logprintf(0, "Process Error: memory region thread raised an "
3169 "unexpected error.");
3173 int MemoryRegionThread::Work() {
3174 struct page_entry source_pe;
3175 struct page_entry memregion_pe;
3178 const uint64 error_constant = 0x00ba00000000ba00LL;
3180 // For error injection.
3185 logprintf(9, "Log: Starting Memory Region thread %d\n", thread_num_);
3187 while (IsReadyToRun()) {
3188 // Getting pages from SAT and queue.
3189 phase_ = kPhaseNoPhase;
3190 result &= sat_->GetValid(&source_pe);
3192 logprintf(0, "Process Error: memory region thread failed to pop "
3193 "pages from SAT, bailing\n");
3197 result &= pages_->PopRandom(&memregion_pe);
3199 logprintf(0, "Process Error: memory region thread failed to pop "
3200 "pages from queue, bailing\n");
3204 // Error injection for CRC copy.
3205 if ((sat_->error_injection() || error_injection_) && loops == 1) {
3206 addr = reinterpret_cast<int64*>(source_pe.addr);
3207 offset = random() % (sat_->page_length() / wordsize_);
3208 data = addr[offset];
3209 addr[offset] = error_constant;
3212 // Copying SAT page into memory region.
3213 phase_ = kPhaseCopy;
3214 CrcCopyPage(&memregion_pe, &source_pe);
3215 memregion_pe.pattern = source_pe.pattern;
3217 // Error injection for CRC Check.
3218 if ((sat_->error_injection() || error_injection_) && loops == 2) {
3219 addr = reinterpret_cast<int64*>(memregion_pe.addr);
3220 offset = random() % (sat_->page_length() / wordsize_);
3221 data = addr[offset];
3222 addr[offset] = error_constant;
3225 // Checking page content in memory region.
3226 phase_ = kPhaseCheck;
3227 CrcCheckPage(&memregion_pe);
3229 phase_ = kPhaseNoPhase;
3230 // Storing pages on their proper queues.
3231 result &= sat_->PutValid(&source_pe);
3233 logprintf(0, "Process Error: memory region thread failed to push "
3234 "pages into SAT, bailing\n");
3237 result &= pages_->Push(&memregion_pe);
3239 logprintf(0, "Process Error: memory region thread failed to push "
3240 "pages into queue, bailing\n");
3244 if ((sat_->error_injection() || error_injection_) &&
3245 loops >= 1 && loops <= 2) {
3246 addr[offset] = data;
3253 pages_copied_ = loops;
3255 logprintf(9, "Log: Completed %d: Memory Region thread. Status %d, %d "
3256 "pages checked\n", thread_num_, status_, pages_copied_);