1 // Copyright 2006 Google Inc. All Rights Reserved.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // worker.h : worker thread interface
17 // This file contains the Worker Thread class interface
18 // for the SAT test. Worker Threads implement a repetitive
19 // task used to test or stress the system.
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
27 #include <sys/types.h>
38 // This file must work with autoconf on its public version,
39 // so these includes are correct.
40 #include "disk_blocks.h"
45 // Global data structure shared by the Cache Coherency Worker Threads.
46 struct cc_cacheline_data {
51 // (Other workflows may be possible, see function comments for details.)
52 // - Control thread creates object.
53 // - Control thread calls AddWorkers(1) for each worker thread.
54 // - Control thread calls Initialize().
55 // - Control thread launches worker threads.
56 // - Every worker thread frequently calls ContinueRunning().
57 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
58 // then calls ResumeWorkers().
59 // - Some worker threads may exit early, before StopWorkers() is called. They
60 // call RemoveSelf() after their last call to ContinueRunning().
61 // - Control thread eventually calls StopWorkers().
62 // - Worker threads exit.
63 // - Control thread joins worker threads.
64 // - Control thread calls Destroy().
65 // - Control thread destroys object.
68 // - ContinueRunning() may be called concurrently by different workers, but not
69 // by a single worker.
70 // - No other methods may ever be called concurrently, with themselves or
72 // - This object may be used by multiple threads only between Initialize() and
75 // TODO(matthewb): Move this class and its unittest to their own files.
78 //--------------------------------
79 // Methods for the control thread.
80 //--------------------------------
82 WorkerStatus() : num_workers_(0), status_(RUN) {}
84 // Called by the control thread to increase the worker count. Must be called
85 // before Initialize(). The worker count is 0 upon object initialization.
86 void AddWorkers(int num_new_workers) {
87 // No need to lock num_workers_mutex_ because this is before Initialize().
88 num_workers_ += num_new_workers;
91 // Called by the control thread. May not be called multiple times. If
92 // called, Destroy() must be called before destruction.
95 // Called by the control thread after joining all worker threads. Must be
96 // called iff Initialize() was called. No methods may be called after calling
100 // Called by the control thread to tell the workers to pause. Does not return
101 // until all workers have called ContinueRunning() or RemoveSelf(). May only
102 // be called between Initialize() and StopWorkers(). Must not be called
103 // multiple times without ResumeWorkers() having been called in between.
106 // Called by the control thread to tell the workers to resume from a pause.
107 // May only be called between Initialize() and StopWorkers(). May only be called
108 // directly after PauseWorkers().
109 void ResumeWorkers();
111 // Called by the control thread to tell the workers to stop. May only be
112 // called between Initialize() and Destroy(). May only be called once.
115 //--------------------------------
116 // Methods for the worker threads.
117 //--------------------------------
119 // Called by worker threads to decrease the worker count by one. May only be
120 // called between Initialize() and Destroy(). May wait for ResumeWorkers()
121 // when called after PauseWorkers().
124 // Called by worker threads between Initialize() and Destroy(). May be called
125 // any number of times. Return value is whether or not the worker should
126 // continue running. When called after PauseWorkers(), does not return until
127 // ResumeWorkers() or StopWorkers() has been called. Number of distinct
128 // calling threads must match the worker count (see AddWorkers() and
130 bool ContinueRunning();
132 // TODO(matthewb): Is this functionality really necessary? Remove it if not.
134 // This is a hack! It's like ContinueRunning(), except it won't pause. If
135 // any worker threads use this exclusively in place of ContinueRunning() then
136 // PauseWorkers() should never be used!
137 bool ContinueRunningNoPause();
140 enum Status { RUN, PAUSE, STOP };
142 void WaitOnPauseBarrier() {
143 #ifdef HAVE_PTHREAD_BARRIERS
144 int error = pthread_barrier_wait(&pause_barrier_);
145 if (error != PTHREAD_BARRIER_SERIAL_THREAD)
146 sat_assert(error == 0);
150 void AcquireNumWorkersLock() {
151 sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
154 void ReleaseNumWorkersLock() {
155 sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
158 void AcquireStatusReadLock() {
159 sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
162 void AcquireStatusWriteLock() {
163 sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
166 void ReleaseStatusLock() {
167 sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
171 AcquireStatusReadLock();
172 Status status = status_;
177 // Returns the previous status.
178 Status SetStatus(Status status) {
179 AcquireStatusWriteLock();
180 Status prev_status = status_;
186 pthread_mutex_t num_workers_mutex_;
189 pthread_rwlock_t status_rwlock_;
192 #ifdef HAVE_PTHREAD_BARRIERS
193 // Guaranteed to not be in use when (status_ != PAUSE).
194 pthread_barrier_t pause_barrier_;
197 DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
201 // This is a base class for worker threads.
202 // Each thread repeats a specific
203 // task on various blocks of memory.
206 // Enum to mark a thread as low/med/high priority.
213 virtual ~WorkerThread();
215 // Initialize values and thread ID number.
216 virtual void InitThread(int thread_num_init,
218 class OsLayer *os_init,
219 class PatternList *patternlist_init,
220 WorkerStatus *worker_status);
222 // This function is DEPRECATED; it only stores the value in priority_.
223 void SetPriority(Priority priority) { priority_ = priority; }
224 // Spawn the worker thread, by running Work().
226 // Only for ThreadSpawnerGeneric().
230 // Wait for the thread to complete its cleanup.
231 virtual bool JoinThread();
232 // Kill worker thread with SIGINT.
233 virtual bool KillThread();
235 // This is the task function that the thread executes.
236 // This is implemented per subclass.
239 // Starts per-WorkerThread timer.
240 void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
241 // Returns microseconds elapsed since StartThreadTimer() without recording them; the seconds delta is widened to int64 before scaling so a 32-bit time_t cannot overflow.
242 int64 ReadThreadTimer() {
243 struct timeval end_time_;
244 gettimeofday(&end_time_, NULL);
245 return static_cast<int64>(end_time_.tv_sec - start_time_.tv_sec) * 1000000 +
246 (end_time_.tv_usec - start_time_.tv_usec);
248 // Stops per-WorkerThread timer and records thread run duration.
249 // Calling Start/StopThreadTimer() repeatedly is cumulative, i.e. the timer
250 // is effectively paused and restarted, so runduration_usec_ keeps growing.
251 void StopThreadTimer() {
252 runduration_usec_ += ReadThreadTimer();
255 // Access member variables.
256 bool GetStatus() {return status_;}
257 int64 GetErrorCount() {return errorcount_;}
258 int64 GetPageCount() {return pages_copied_;}
259 int64 GetRunDurationUSec() {return runduration_usec_;}
261 // Returns bandwidth defined as pages_copied / thread_run_durations.
262 virtual float GetCopiedData();
263 // Calculate worker thread specific copied data.
264 virtual float GetMemoryCopiedData() {return 0;}
265 virtual float GetDeviceCopiedData() {return 0;}
266 // Calculate worker thread specific bandwidth.
267 virtual float GetMemoryBandwidth()
268 {return GetMemoryCopiedData() / (
269 runduration_usec_ * 1.0 / 1000000);}
270 virtual float GetDeviceBandwidth()
271 {return GetDeviceCopiedData() / (
272 runduration_usec_ * 1.0 / 1000000);}
274 void set_cpu_mask(cpu_set_t *mask) {
275 memcpy(&cpu_mask_, mask, sizeof(*mask));
278 void set_cpu_mask_to_cpu(int cpu_num) {
279 cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
282 void set_tag(int32 tag) {tag_ = tag;}
284 // Returns CPU mask, where each bit represents a logical cpu.
285 bool AvailableCpus(cpu_set_t *cpuset);
286 // Returns CPU mask of CPUs this thread is bound to.
287 bool CurrentCpus(cpu_set_t *cpuset);
288 // Returns the mask obtained from CurrentCpus(), formatted as a string.
289 string CurrentCpusFormat() {
290 cpu_set_t current_cpus;
291 CurrentCpus(&current_cpus);
292 return cpuset_format(&current_cpus);
295 int ThreadID() {return thread_num_;}
297 // Bind worker thread to specified CPU(s)
298 bool BindToCpus(const cpu_set_t *cpuset);
301 // This function dictates whether the main work loop
302 // continues, waits, or terminates.
303 // All work loops should be of the form:
306 // } while (IsReadyToRun());
307 virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
308 // TODO(matthewb): Is this function really necessary? Remove it if not.
310 // Like IsReadyToRun(), except it won't pause.
311 virtual bool IsReadyToRunNoPause() {
312 return worker_status_->ContinueRunningNoPause();
315 // These are functions used by the various work loops.
316 // Pretty print and log a data miscompare.
317 virtual void ProcessError(struct ErrorRecord *er,
319 const char *message);
321 // Compare a region of memory with a known data pattern, and report errors.
322 virtual int CheckRegion(void *addr,
326 int64 patternoffset);
328 // Fast compare a block of memory.
329 virtual int CrcCheckPage(struct page_entry *srcpe);
331 // Fast copy a block of memory, while verifying correctness.
332 virtual int CrcCopyPage(struct page_entry *dstpe,
333 struct page_entry *srcpe);
335 // Fast copy a block of memory, while verifying correctness, and heating CPU.
336 virtual int CrcWarmCopyPage(struct page_entry *dstpe,
337 struct page_entry *srcpe);
339 // Fill a page with its specified pattern.
340 virtual bool FillPage(struct page_entry *pe);
342 // Copy with address tagging.
343 virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
345 unsigned int size_in_bytes,
346 AdlerChecksum *checksum,
347 struct page_entry *pe);
348 // SSE copy with address tagging.
349 virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
351 unsigned int size_in_bytes,
352 AdlerChecksum *checksum,
353 struct page_entry *pe);
354 // Crc data with address tagging.
355 virtual bool AdlerAddrCrcC(uint64 *srcmem64,
356 unsigned int size_in_bytes,
357 AdlerChecksum *checksum,
358 struct page_entry *pe);
359 // Setup tagging on an existing page.
360 virtual bool TagAddrC(uint64 *memwords,
361 unsigned int size_in_bytes);
362 // Report a mistagged cacheline.
363 virtual bool ReportTagError(uint64 *mem64,
366 // Print out the error record of the tag mismatch.
367 virtual void ProcessTagError(struct ErrorRecord *error,
369 const char *message);
371 // A worker thread can yield itself to give up CPU until it's scheduled again
375 // General state variables that all subclasses need.
376 int thread_num_; // Thread ID.
377 volatile bool status_; // Error status.
378 volatile int64 pages_copied_; // Recorded for memory bandwidth calc.
379 volatile int64 errorcount_; // Miscompares seen by this thread.
381 cpu_set_t cpu_mask_; // Cores this thread is allowed to run on.
382 volatile uint32 tag_; // Tag hint for memory this thread can use.
384 bool tag_mode_; // Tag cachelines with vaddr.
386 // Thread timing variables.
387 struct timeval start_time_; // Worker thread start time.
388 volatile int64 runduration_usec_; // Worker run duration in u-seconds.
390 // Function passed to pthread_create.
391 void *(*thread_spawner_)(void *args);
392 pthread_t thread_; // Pthread thread ID.
393 Priority priority_; // Worker thread priority.
394 class Sat *sat_; // Reference to parent stest object.
395 class OsLayer *os_; // Os abstraction: put hacks here.
396 class PatternList *patternlist_; // Reference to data patterns.
398 // Work around style guide ban on sizeof(int).
399 static const uint64 iamint_ = 0;
400 static const int wordsize_ = sizeof(iamint_);
403 WorkerStatus *worker_status_;
405 DISALLOW_COPY_AND_ASSIGN(WorkerThread);
408 // Worker thread to perform File IO.
409 class FileThread : public WorkerThread {
412 // Set filename to use for file IO.
413 virtual void SetFile(const char *filename_init);
416 // Calculate worker thread specific bandwidth.
417 virtual float GetDeviceCopiedData()
418 {return GetCopiedData()*2;}
419 virtual float GetMemoryCopiedData();
422 // Record of where these pages were sourced from, and what
423 // potentially broken components they passed through.
425 struct Pattern *pattern; // This is the data it should contain.
426 void *src; // This is the memory location the data was sourced from.
427 void *dst; // This is where it ended up.
430 // These are functions used by the various work loops.
431 // Pretty print and log a data miscompare. Disks require
432 // slightly different error handling.
433 virtual void ProcessError(struct ErrorRecord *er,
435 const char *message);
437 virtual bool OpenFile(int *pfile);
438 virtual bool CloseFile(int fd);
440 // Read and write whole file to disk.
441 virtual bool WritePages(int fd);
442 virtual bool ReadPages(int fd);
444 // Read and write pages to disk.
445 virtual bool WritePageToFile(int fd, struct page_entry *src);
446 virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
448 // Sector tagging support.
449 virtual bool SectorTagPage(struct page_entry *src, int block);
450 virtual bool SectorValidatePage(const struct PageRec &page,
451 struct page_entry *dst,
454 // Get memory for an incoming data transfer.
455 virtual bool PagePrepare();
456 // Remove memory allocated for data transfer.
457 virtual bool PageTeardown();
459 // Get memory for an incoming data transfer.
460 virtual bool GetEmptyPage(struct page_entry *dst);
461 // Get memory for an outgoing data transfer.
462 virtual bool GetValidPage(struct page_entry *dst);
463 // Throw out a used empty page.
464 virtual bool PutEmptyPage(struct page_entry *src);
465 // Throw out a used, filled page.
466 virtual bool PutValidPage(struct page_entry *src);
469 struct PageRec *page_recs_; // Array of page records.
470 int crc_page_; // Page currently being CRC checked.
471 string filename_; // Name of file to access.
472 string devicename_; // Name of device file is on.
474 bool page_io_; // Use page pool for IO.
475 void *local_page_; // malloc'd page for non-pool IO.
476 int pass_; // Number of writes to the file so far.
478 // Tag to detect file corruption.
480 volatile uint8 magic;
481 volatile uint8 block;
482 volatile uint8 sector;
487 DISALLOW_COPY_AND_ASSIGN(FileThread);
491 // Worker thread to perform Network IO.
492 class NetworkThread : public WorkerThread {
495 // Set hostname to use for net IO.
496 virtual void SetIP(const char *ipaddr_init);
499 // Calculate worker thread specific bandwidth.
500 virtual float GetDeviceCopiedData()
501 {return GetCopiedData()*2;}
504 // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
505 virtual bool IsNetworkStopSet();
506 virtual bool CreateSocket(int *psocket);
507 virtual bool CloseSocket(int sock);
508 virtual bool Connect(int sock);
509 virtual bool SendPage(int sock, struct page_entry *src);
510 virtual bool ReceivePage(int sock, struct page_entry *dst);
515 DISALLOW_COPY_AND_ASSIGN(NetworkThread);
518 // Worker thread to reflect Network IO.
519 class NetworkSlaveThread : public NetworkThread {
521 NetworkSlaveThread();
522 // Set socket for IO.
523 virtual void SetSock(int sock);
527 virtual bool IsNetworkStopSet();
530 DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
533 // Worker thread to detect incoming Network IO.
534 class NetworkListenThread : public NetworkThread {
536 NetworkListenThread();
540 virtual bool Listen();
542 virtual bool GetConnection(int *pnewsock);
543 virtual bool SpawnSlave(int newsock, int threadid);
544 virtual bool ReapSlaves();
546 // For serviced incoming connections.
549 NetworkSlaveThread thread;
551 typedef vector<ChildWorker*> ChildVector;
552 ChildVector child_workers_;
554 DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
557 // Worker thread to perform Memory Copy.
558 class CopyThread : public WorkerThread {
562 // Calculate worker thread specific bandwidth.
563 virtual float GetMemoryCopiedData()
564 {return GetCopiedData()*2;}
567 DISALLOW_COPY_AND_ASSIGN(CopyThread);
570 // Worker thread to perform Memory Invert.
571 class InvertThread : public WorkerThread {
575 // Calculate worker thread specific bandwidth.
576 virtual float GetMemoryCopiedData()
577 {return GetCopiedData()*4;}
580 virtual int InvertPageUp(struct page_entry *srcpe);
581 virtual int InvertPageDown(struct page_entry *srcpe);
582 DISALLOW_COPY_AND_ASSIGN(InvertThread);
585 // Worker thread to fill blank pages on startup.
586 class FillThread : public WorkerThread {
589 // Set how many pages this thread should fill before exiting.
590 virtual void SetFillPages(int64 num_pages_to_fill_init);
594 // Fill a page with the data pattern in pe->pattern.
595 virtual bool FillPageRandom(struct page_entry *pe);
596 int64 num_pages_to_fill_;
597 DISALLOW_COPY_AND_ASSIGN(FillThread);
600 // Worker thread to verify page data matches pattern data.
601 // Thread will check and replace pages until "done" flag is set,
602 // then it will check and discard pages until no more remain.
603 class CheckThread : public WorkerThread {
607 // Calculate worker thread specific bandwidth.
608 virtual float GetMemoryCopiedData()
609 {return GetCopiedData();}
612 DISALLOW_COPY_AND_ASSIGN(CheckThread);
616 // Worker thread to poll for system error messages.
617 // Thread will check for messages until "done" flag is set.
618 class ErrorPollThread : public WorkerThread {
624 DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
627 // Computation intensive worker thread to stress CPU.
628 class CpuStressThread : public WorkerThread {
634 DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
637 // Worker thread that tests the correctness of the
638 // CPU Cache Coherency Protocol.
639 class CpuCacheCoherencyThread : public WorkerThread {
641 CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
642 int cc_cacheline_count_,
648 cc_cacheline_data *cc_cacheline_data_; // Data structure for each cacheline.
649 int cc_local_num_; // Local counter for each thread.
650 int cc_cacheline_count_; // Number of cache lines to operate on.
651 int cc_thread_num_; // The integer id of the thread which is
652 // used as an index into the integer array
653 // of the cacheline datastructure.
654 int cc_inc_count_; // Number of times to increment the counter.
657 DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
660 // Worker thread to perform disk test.
661 class DiskThread : public WorkerThread {
663 explicit DiskThread(DiskBlockTable *block_table);
664 virtual ~DiskThread();
665 // Returns data moved by this thread (bytes written plus bytes read) in units of kMegabyte; divides in floating point so partial megabytes are not truncated if kMegabyte is integral.
666 virtual float GetDeviceCopiedData() {
667 return static_cast<double>(blocks_written_ * write_block_size_ +
668 blocks_read_ * read_block_size_) / kMegabyte;}
670 // Set filename for device file (in /dev).
671 virtual void SetDevice(const char *device_name);
672 // Set various parameters that control the behaviour of the test.
673 virtual bool SetParameters(int read_block_size,
674 int write_block_size,
677 int blocks_per_segment,
678 int64 read_threshold,
679 int64 write_threshold,
680 int non_destructive);
684 virtual float GetMemoryCopiedData() {return 0;}
687 static const int kSectorSize = 512; // Size of sector on disk.
688 static const int kBufferAlignment = 512; // Buffer alignment required by the
690 static const int kBlockRetry = 100; // Number of retries to allocate
698 virtual bool OpenDevice(int *pfile);
699 virtual bool CloseDevice(int fd);
701 // Retrieves the size (in bytes) of the disk/file.
702 virtual bool GetDiskSize(int fd);
704 // Retrieves the current time in microseconds.
705 virtual int64 GetTime();
707 // Do an asynchronous disk I/O operation.
708 virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
709 int64 offset, int64 timeout);
711 // Write a block to disk.
712 virtual bool WriteBlockToDisk(int fd, BlockData *block);
714 // Verify a block on disk.
715 virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
718 virtual bool DoWork(int fd);
720 int read_block_size_; // Size of blocks read from disk, in bytes.
721 int write_block_size_; // Size of blocks written to disk, in bytes.
722 int64 blocks_read_; // Number of blocks read in work loop.
723 int64 blocks_written_; // Number of blocks written in work loop.
724 int64 segment_size_; // Size of disk segments (in bytes) that the disk
725 // will be split into where testing can be
726 // confined to a particular segment.
727 // Allows for control of how evenly the disk will
728 // be tested. Smaller segments imply more even
729 // testing (less random).
730 int blocks_per_segment_; // Number of blocks that will be tested per
732 int cache_size_; // Size of disk cache, in bytes.
733 int queue_size_; // Length of in-flight-blocks queue, in blocks.
734 int non_destructive_; // Use non-destructive mode or not.
735 int update_block_table_; // If true, assume this is the thread
736 // responsible for writing the data in the disk
737 // for this block device and, therefore,
738 // update the block table. If false, just use
739 // the block table to get data.
741 // read/write times threshold for reporting a problem
742 int64 read_threshold_; // Maximum time a read should take (in us) before
743 // a warning is given.
744 int64 write_threshold_; // Maximum time a write should take (in us) before
745 // a warning is given.
746 int64 read_timeout_; // Maximum time a read can take before a timeout
747 // and the aborting of the read operation.
748 int64 write_timeout_; // Maximum time a write can take before a timeout
749 // and the aborting of the write operation.
751 string device_name_; // Name of device file to access.
752 int64 device_sectors_; // Number of sectors on the device.
754 std::queue<BlockData*> in_flight_sectors_; // Queue of sectors written but
756 void *block_buffer_; // Pointer to aligned block buffer.
759 io_context_t aio_ctx_; // Asynchronous I/O context for Linux native AIO.
762 DiskBlockTable *block_table_; // Disk Block Table, shared by all disk
763 // threads that read / write at the same
766 DISALLOW_COPY_AND_ASSIGN(DiskThread);
769 class RandomDiskThread : public DiskThread {
771 explicit RandomDiskThread(DiskBlockTable *block_table);
772 virtual ~RandomDiskThread();
774 virtual bool DoWork(int fd);
776 DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
779 // Worker thread to perform checks in a specific memory region.
780 class MemoryRegionThread : public WorkerThread {
782 MemoryRegionThread();
783 ~MemoryRegionThread();
785 void ProcessError(struct ErrorRecord *error, int priority,
786 const char *message);
787 bool SetRegion(void *region, int64 size);
788 // Calculate worker thread specific bandwidth.
789 virtual float GetMemoryCopiedData()
790 {return GetCopiedData();}
791 virtual float GetDeviceCopiedData()
792 {return GetCopiedData() * 2;}
793 void SetIdentifier(string identifier) {
794 identifier_ = identifier;
798 // Page queue for this particular memory region.
800 PageEntryQueue *pages_;
801 bool error_injection_;
804 static const int kPhaseNoPhase = 0;
805 static const int kPhaseCopy = 1;
806 static const int kPhaseCheck = 2;
809 DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
812 #endif // STRESSAPPTEST_WORKER_H_