1 // Copyright 2006 Google Inc. All Rights Reserved.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // worker.h : worker thread interface
17 // This file contains the Worker Thread class interface
// for the SAT test. Worker Threads implement a repetitive
19 // task used to test or stress the system.
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
27 #include <sys/types.h>
29 #include <linux/aio_abi.h>
36 // This file must work with autoconf on its public version,
37 // so these includes are correct.
38 #include "disk_blocks.h"
// Global data structure shared by the Cache Coherency Worker Threads.
44 struct cc_cacheline_data {
49 // (Other workflows may be possible, see function comments for details.)
50 // - Control thread creates object.
51 // - Control thread calls AddWorkers(1) for each worker thread.
52 // - Control thread calls Initialize().
53 // - Control thread launches worker threads.
54 // - Every worker thread frequently calls ContinueRunning().
55 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
56 // then calls ResumeWorkers().
57 // - Some worker threads may exit early, before StopWorkers() is called. They
58 // call RemoveSelf() after their last call to ContinueRunning().
59 // - Control thread eventually calls StopWorkers().
60 // - Worker threads exit.
61 // - Control thread joins worker threads.
62 // - Control thread calls Destroy().
63 // - Control thread destroys object.
66 // - ContinueRunning() may be called concurrently by different workers, but not
67 // by a single worker.
68 // - No other methods may ever be called concurrently, with themselves or
70 // - This object may be used by multiple threads only between Initialize() and
73 // TODO(matthewb): Move this class and its unittest to their own files.
76 //--------------------------------
77 // Methods for the control thread.
78 //--------------------------------
80 WorkerStatus() : num_workers_(0), status_(RUN) {}
82 // Called by the control thread to increase the worker count. Must be called
83 // before Initialize(). The worker count is 0 upon object initialization.
84 void AddWorkers(int num_new_workers) {
85 // No need to lock num_workers_mutex_ because this is before Initialize().
86 num_workers_ += num_new_workers;
89 // Called by the control thread. May not be called multiple times. If
90 // called, Destroy() must be called before destruction.
93 // Called by the control thread after joining all worker threads. Must be
94 // called iff Initialize() was called. No methods may be called after calling
// Called by the control thread to tell the workers to pause. Does not return
// until all workers have called ContinueRunning() or RemoveSelf(). May only
// be called between Initialize() and StopWorkers(). Must not be called
// multiple times without ResumeWorkers() having been called in between.
// Called by the control thread to tell the workers to resume from a pause.
// May only be called between Initialize() and StopWorkers(). May only be
// called directly after PauseWorkers().
107 void ResumeWorkers();
109 // Called by the control thread to tell the workers to stop. May only be
110 // called between Initialize() and Destroy(). May only be called once.
113 //--------------------------------
114 // Methods for the worker threads.
115 //--------------------------------
117 // Called by worker threads to decrease the worker count by one. May only be
118 // called between Initialize() and Destroy(). May wait for ResumeWorkers()
119 // when called after PauseWorkers().
122 // Called by worker threads between Initialize() and Destroy(). May be called
123 // any number of times. Return value is whether or not the worker should
124 // continue running. When called after PauseWorkers(), does not return until
125 // ResumeWorkers() or StopWorkers() has been called. Number of distinct
126 // calling threads must match the worker count (see AddWorkers() and
128 bool ContinueRunning();
130 // TODO(matthewb): Is this functionality really necessary? Remove it if not.
132 // This is a hack! It's like ContinueRunning(), except it won't pause. If
133 // any worker threads use this exclusively in place of ContinueRunning() then
134 // PauseWorkers() should never be used!
135 bool ContinueRunningNoPause();
// Possible values of the shared status_ member.
enum Status {
  RUN,
  PAUSE,
  STOP
};
140 void WaitOnPauseBarrier() {
141 int error = pthread_barrier_wait(&pause_barrier_);
142 if (error != PTHREAD_BARRIER_SERIAL_THREAD)
143 sat_assert(error == 0);
146 void AcquireNumWorkersLock() {
147 sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
150 void ReleaseNumWorkersLock() {
151 sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
154 void AcquireStatusReadLock() {
155 sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
158 void AcquireStatusWriteLock() {
159 sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
162 void ReleaseStatusLock() {
163 sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
167 AcquireStatusReadLock();
168 Status status = status_;
173 // Returns the previous status.
174 Status SetStatus(Status status) {
175 AcquireStatusWriteLock();
176 Status prev_status = status_;
182 pthread_mutex_t num_workers_mutex_;
185 pthread_rwlock_t status_rwlock_;
188 // Guaranteed to not be in use when (status_ != PAUSE).
189 pthread_barrier_t pause_barrier_;
191 DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
195 // This is a base class for worker threads.
196 // Each thread repeats a specific
197 // task on various blocks of memory.
200 // Enum to mark a thread as low/med/high priority.
207 virtual ~WorkerThread();
209 // Initialize values and thread ID number.
210 void InitThread(int thread_num_init,
212 class OsLayer *os_init,
213 class PatternList *patternlist_init,
214 WorkerStatus *worker_status);
216 // This function is DEPRECATED, it does nothing.
217 void SetPriority(Priority priority) { priority_ = priority; }
218 // Spawn the worker thread, by running Work().
220 // Only for ThreadSpawnerGeneric().
224 // Wait for the thread to complete its cleanup.
225 virtual int JoinThread();
226 // Kill worker thread with SIGINT.
227 virtual int KillThread();
229 // This is the task function that the thread executes.
230 // This is implemented per subclass.
233 // Starts per-WorkerThread timer.
234 void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
235 // Reads current timer value and returns run duration without recording it.
236 int64 ReadThreadTimer() {
237 struct timeval end_time_;
238 gettimeofday(&end_time_, NULL);
239 return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
240 (end_time_.tv_usec - start_time_.tv_usec);
242 // Stops per-WorkerThread timer and records thread run duration.
243 // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
244 // is effectively paused and restarted, so runduration_usec accumulates on.
245 void StopThreadTimer() {
246 runduration_usec_ += ReadThreadTimer();
249 // Acccess member variables.
250 int GetStatus() {return status_;}
251 int64 GetErrorCount() {return errorcount_;}
252 int64 GetPageCount() {return pages_copied_;}
253 int64 GetRunDurationUSec() {return runduration_usec_;}
255 // Returns bandwidth defined as pages_copied / thread_run_durations.
256 float GetCopiedData();
257 // Calculate worker thread specific copied data.
258 virtual float GetMemoryCopiedData() {return 0;}
259 virtual float GetDeviceCopiedData() {return 0;}
260 // Calculate worker thread specific bandwidth.
261 virtual float GetMemoryBandwidth()
262 {return GetMemoryCopiedData() / (
263 runduration_usec_ * 1.0 / 1000000);}
264 virtual float GetDeviceBandwidth()
265 {return GetDeviceCopiedData() / (
266 runduration_usec_ * 1.0 / 1000000);}
268 void set_cpu_mask(int32 mask) {cpu_mask_ = mask;}
269 void set_tag(int32 tag) {tag_ = tag;}
271 // Returns CPU mask, where each bit represents a logical cpu.
272 uint32 AvailableCpus();
273 // Returns CPU mask of CPUs this thread is bound to,
274 uint32 CurrentCpus();
276 int ThreadID() {return thread_num_;}
278 // Bind worker thread to specified CPU(s)
279 bool BindToCpus(uint32 thread_mask);
282 // This function dictates whether the main work loop
283 // continues, waits, or terminates.
284 // All work loops should be of the form:
287 // } while (IsReadyToRun());
288 virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
289 // TODO(matthewb): Is this function really necessary? Remove it if not.
291 // Like IsReadyToRun(), except it won't pause.
292 virtual bool IsReadyToRunNoPause() {
293 return worker_status_->ContinueRunningNoPause();
296 // These are functions used by the various work loops.
297 // Pretty print and log a data miscompare.
298 virtual void ProcessError(struct ErrorRecord *er,
300 const char *message);
// Compare a region of memory with a known data pattern, and report errors.
303 virtual int CheckRegion(void *addr,
307 int64 patternoffset);
309 // Fast compare a block of memory.
310 virtual int CrcCheckPage(struct page_entry *srcpe);
312 // Fast copy a block of memory, while verifying correctness.
313 virtual int CrcCopyPage(struct page_entry *dstpe,
314 struct page_entry *srcpe);
316 // Fast copy a block of memory, while verifying correctness, and heating CPU.
317 virtual int CrcWarmCopyPage(struct page_entry *dstpe,
318 struct page_entry *srcpe);
320 // Fill a page with its specified pattern.
321 virtual bool FillPage(struct page_entry *pe);
323 // Copy with address tagging.
324 virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
326 unsigned int size_in_bytes,
327 AdlerChecksum *checksum,
328 struct page_entry *pe);
329 // Crc data with address tagging.
330 virtual bool AdlerAddrCrcC(uint64 *srcmem64,
331 unsigned int size_in_bytes,
332 AdlerChecksum *checksum,
333 struct page_entry *pe);
334 // Report a mistagged cacheline.
335 bool ReportTagError(uint64 *mem64,
338 // Print out the error record of the tag mismatch.
339 void ProcessTagError(struct ErrorRecord *error,
341 const char *message);
343 // A worker thread can yield itself to give up CPU until it's scheduled again
// General state variables that all subclasses need.
// NOTE(review): `volatile` does not provide cross-thread synchronization or
// atomicity in C++; confirm these fields are only read after the owning
// thread has been joined.
int thread_num_;  // Thread ID.
volatile int status_;  // Error status.
volatile int64 pages_copied_;  // Recorded for memory bandwidth calc.
volatile int64 errorcount_;  // Miscompares seen by this thread.

volatile uint32 cpu_mask_;  // Cores this thread is allowed to run on.
volatile uint32 tag_;  // Tag hint for memory this thread can use.

bool tag_mode_;  // Tag cachelines with vaddr.

// Thread timing variables.
struct timeval start_time_;  // Start of the current timing interval,
                             // set by StartThreadTimer().
volatile int64 runduration_usec_;  // Accumulated run duration in
                                   // microseconds (StopThreadTimer()).

// Function passed to pthread_create.
void *(*thread_spawner_)(void *args);
pthread_t thread_;  // Pthread thread ID.
Priority priority_;  // Worker thread priority (SetPriority() is deprecated).
class Sat *sat_;  // Reference to parent Sat object.
class OsLayer *os_;  // Os abstraction: put hacks here.
class PatternList *patternlist_;  // Reference to data patterns.

// Work around style guide ban on sizeof(int). Note that iamint_ is a uint64,
// so wordsize_ is sizeof(uint64) (8 bytes), not sizeof(int).
static const uint64 iamint_ = 0;
static const int wordsize_ = sizeof(iamint_);

// Shared run/pause/stop state, queried by IsReadyToRun().
WorkerStatus *worker_status_;
377 DISALLOW_COPY_AND_ASSIGN(WorkerThread);
380 // Worker thread to perform File IO.
381 class FileThread : public WorkerThread {
384 // Set filename to use for file IO.
385 virtual void SetFile(const char *filename_init);
388 // Calculate worker thread specific bandwidth.
389 virtual float GetDeviceCopiedData()
390 {return GetCopiedData()*2;}
391 virtual float GetMemoryCopiedData();
394 // Record of where these pages were sourced from, and what
395 // potentially broken components they passed through.
397 struct Pattern *pattern; // This is the data it should contain.
398 void *src; // This is the memory location the data was sourced from.
399 void *dst; // This is where it ended up.
402 // These are functions used by the various work loops.
403 // Pretty print and log a data miscompare. Disks require
404 // slightly different error handling.
405 virtual void ProcessError(struct ErrorRecord *er,
407 const char *message);
409 virtual bool OpenFile(int *pfile);
410 virtual bool CloseFile(int fd);
412 // Read and write whole file to disk.
413 virtual bool WritePages(int fd);
414 virtual bool ReadPages(int fd);
416 // Read and write pages to disk.
417 virtual bool WritePageToFile(int fd, struct page_entry *src);
418 virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
420 // Sector tagging support.
421 virtual bool SectorTagPage(struct page_entry *src, int block);
422 virtual bool SectorValidatePage(const struct PageRec &page,
423 struct page_entry *dst,
// Get memory for an incoming data transfer.
427 virtual bool PagePrepare();
428 // Remove memory allocated for data transfer.
429 virtual bool PageTeardown();
// Get memory for an incoming data transfer.
432 virtual bool GetEmptyPage(struct page_entry *dst);
// Get memory for an outgoing data transfer.
434 virtual bool GetValidPage(struct page_entry *dst);
435 // Throw out a used empty page.
436 virtual bool PutEmptyPage(struct page_entry *src);
437 // Throw out a used, filled page.
438 virtual bool PutValidPage(struct page_entry *src);
struct PageRec *page_recs_;  // Array of page records.
int crc_page_;  // Page currently being CRC checked.
string filename_;  // Name of file to access.
string devicename_;  // Name of device file is on.

bool page_io_;  // Use page pool for IO.
void *local_page_;  // malloc'd page for non-pool IO.
int pass_;  // Number of writes to the file so far.
450 // Tag to detect file corruption.
452 volatile uint8 magic;
453 volatile uint8 block;
454 volatile uint8 sector;
459 DISALLOW_COPY_AND_ASSIGN(FileThread);
463 // Worker thread to perform Network IO.
464 class NetworkThread : public WorkerThread {
467 // Set hostname to use for net IO.
468 virtual void SetIP(const char *ipaddr_init);
471 // Calculate worker thread specific bandwidth.
472 virtual float GetDeviceCopiedData()
473 {return GetCopiedData()*2;}
476 // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
477 virtual bool IsNetworkStopSet();
478 virtual bool CreateSocket(int *psocket);
479 virtual bool CloseSocket(int sock);
480 virtual bool Connect(int sock);
481 virtual bool SendPage(int sock, struct page_entry *src);
482 virtual bool ReceivePage(int sock, struct page_entry *dst);
487 DISALLOW_COPY_AND_ASSIGN(NetworkThread);
490 // Worker thread to reflect Network IO.
491 class NetworkSlaveThread : public NetworkThread {
493 NetworkSlaveThread();
494 // Set socket for IO.
495 virtual void SetSock(int sock);
499 virtual bool IsNetworkStopSet();
502 DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
505 // Worker thread to detect incoming Network IO.
506 class NetworkListenThread : public NetworkThread {
508 NetworkListenThread();
512 virtual bool Listen();
514 virtual bool GetConnection(int *pnewsock);
515 virtual bool SpawnSlave(int newsock, int threadid);
516 virtual bool ReapSlaves();
518 // For serviced incoming connections.
521 NetworkSlaveThread thread;
523 typedef vector<ChildWorker*> ChildVector;
524 ChildVector child_workers_;
526 DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
529 // Worker thread to perform Memory Copy.
530 class CopyThread : public WorkerThread {
534 // Calculate worker thread specific bandwidth.
535 virtual float GetMemoryCopiedData()
536 {return GetCopiedData()*2;}
539 DISALLOW_COPY_AND_ASSIGN(CopyThread);
542 // Worker thread to perform Memory Invert.
543 class InvertThread : public WorkerThread {
547 // Calculate worker thread specific bandwidth.
548 virtual float GetMemoryCopiedData()
549 {return GetCopiedData()*4;}
552 virtual int InvertPageUp(struct page_entry *srcpe);
553 virtual int InvertPageDown(struct page_entry *srcpe);
554 DISALLOW_COPY_AND_ASSIGN(InvertThread);
557 // Worker thread to fill blank pages on startup.
558 class FillThread : public WorkerThread {
561 // Set how many pages this thread should fill before exiting.
562 virtual void SetFillPages(int64 num_pages_to_fill_init);
566 // Fill a page with the data pattern in pe->pattern.
567 virtual bool FillPageRandom(struct page_entry *pe);
568 int64 num_pages_to_fill_;
569 DISALLOW_COPY_AND_ASSIGN(FillThread);
572 // Worker thread to verify page data matches pattern data.
573 // Thread will check and replace pages until "done" flag is set,
574 // then it will check and discard pages until no more remain.
575 class CheckThread : public WorkerThread {
579 // Calculate worker thread specific bandwidth.
580 virtual float GetMemoryCopiedData()
581 {return GetCopiedData();}
584 DISALLOW_COPY_AND_ASSIGN(CheckThread);
588 // Worker thread to poll for system error messages.
589 // Thread will check for messages until "done" flag is set.
590 class ErrorPollThread : public WorkerThread {
596 DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
599 // Computation intensive worker thread to stress CPU.
600 class CpuStressThread : public WorkerThread {
606 DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
609 // Worker thread that tests the correctness of the
610 // CPU Cache Coherency Protocol.
611 class CpuCacheCoherencyThread : public WorkerThread {
613 CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
614 int cc_cacheline_count_,
cc_cacheline_data *cc_cacheline_data_;  // Data structure for each cacheline.
int cc_local_num_;  // Local counter for each thread.
int cc_cacheline_count_;  // Number of cache lines to operate on.
int cc_thread_num_;  // The integer id of the thread which is
                     // used as an index into the integer array
                     // of the cacheline data structure.
int cc_inc_count_;  // Number of times to increment the counter.
629 DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
632 // Worker thread to perform disk test.
633 class DiskThread : public WorkerThread {
635 explicit DiskThread(DiskBlockTable *block_table);
636 virtual ~DiskThread();
637 // Calculate disk thread specific bandwidth.
638 virtual float GetDeviceCopiedData() {
639 return (blocks_written_ * write_block_size_ +
640 blocks_read_ * read_block_size_) / kMegabyte;}
642 // Set filename for device file (in /dev).
643 virtual void SetDevice(const char *device_name);
644 // Set various parameters that control the behaviour of the test.
645 virtual bool SetParameters(int read_block_size,
646 int write_block_size,
649 int blocks_per_segment,
650 int64 read_threshold,
651 int64 write_threshold,
652 int non_destructive);
656 virtual float GetMemoryCopiedData() {return 0;}
659 static const int kSectorSize = 512; // Size of sector on disk.
660 static const int kBufferAlignment = 512; // Buffer alignment required by the
662 static const int kBlockRetry = 100; // Number of retries to allocate
670 virtual bool OpenDevice(int *pfile);
671 virtual bool CloseDevice(int fd);
673 // Retrieves the size (in bytes) of the disk/file.
674 virtual bool GetDiskSize(int fd);
676 // Retrieves the current time in microseconds.
677 virtual int64 GetTime();
679 // Do an asynchronous disk I/O operation.
680 virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
681 int64 offset, int64 timeout);
683 // Write a block to disk.
684 virtual bool WriteBlockToDisk(int fd, BlockData *block);
686 // Verify a block on disk.
687 virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
690 virtual bool DoWork(int fd);
692 int read_block_size_; // Size of blocks read from disk, in bytes.
693 int write_block_size_; // Size of blocks written to disk, in bytes.
694 int64 blocks_read_; // Number of blocks read in work loop.
695 int64 blocks_written_; // Number of blocks written in work loop.
696 int64 segment_size_; // Size of disk segments (in bytes) that the disk
697 // will be split into where testing can be
698 // confined to a particular segment.
699 // Allows for control of how evenly the disk will
700 // be tested. Smaller segments imply more even
701 // testing (less random).
702 int blocks_per_segment_; // Number of blocks that will be tested per
704 int cache_size_; // Size of disk cache, in bytes.
705 int queue_size_; // Length of in-flight-blocks queue, in blocks.
706 int non_destructive_; // Use non-destructive mode or not.
707 int update_block_table_; // If true, assume this is the thread
708 // responsible for writing the data in the disk
709 // for this block device and, therefore,
710 // update the block table. If false, just use
711 // the block table to get data.
713 // read/write times threshold for reporting a problem
714 int64 read_threshold_; // Maximum time a read should take (in us) before
715 // a warning is given.
716 int64 write_threshold_; // Maximum time a write should take (in us) before
717 // a warning is given.
718 int64 read_timeout_; // Maximum time a read can take before a timeout
719 // and the aborting of the read operation.
720 int64 write_timeout_; // Maximum time a write can take before a timeout
721 // and the aborting of the write operation.
723 string device_name_; // Name of device file to access.
724 int64 device_sectors_; // Number of sectors on the device.
726 std::queue<BlockData*> in_flight_sectors_; // Queue of sectors written but
728 void *block_buffer_; // Pointer to aligned block buffer.
730 aio_context_t aio_ctx_; // Asynchronous I/O context for Linux native AIO.
732 DiskBlockTable *block_table_; // Disk Block Table, shared by all disk
733 // threads that read / write at the same
736 DISALLOW_COPY_AND_ASSIGN(DiskThread);
739 class RandomDiskThread : public DiskThread {
741 explicit RandomDiskThread(DiskBlockTable *block_table);
742 virtual ~RandomDiskThread();
744 virtual bool DoWork(int fd);
746 DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
749 // Worker thread to perform checks in a specific memory region.
750 class MemoryRegionThread : public WorkerThread {
752 MemoryRegionThread();
753 ~MemoryRegionThread();
755 void ProcessError(struct ErrorRecord *error, int priority,
756 const char *message);
757 bool SetRegion(void *region, int64 size);
758 // Calculate worker thread specific bandwidth.
759 virtual float GetMemoryCopiedData()
760 {return GetCopiedData();}
761 virtual float GetDeviceCopiedData()
762 {return GetCopiedData() * 2;}
763 void SetIdentifier(string identifier) {
764 identifier_ = identifier;
768 // Page queue for this particular memory region.
770 PageEntryQueue *pages_;
771 bool error_injection_;
774 static const int kPhaseNoPhase = 0;
775 static const int kPhaseCopy = 1;
776 static const int kPhaseCheck = 2;
779 DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
782 #endif // STRESSAPPTEST_WORKER_H_