1 // Copyright 2006 Google Inc. All Rights Reserved.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // worker.h : worker thread interface
17 // This file contains the Worker Thread class interface
18 // for the SAT test. Worker Threads implement a repetitive
19 // task used to test or stress the system.
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
27 #include <sys/types.h>
36 // This file must work with autoconf on its public version,
37 // so these includes are correct.
38 #include "disk_blocks.h"
43 // Global data structure shared by the Cache Coherency Worker Threads.
44 struct cc_cacheline_data {
49 // (Other workflows may be possible, see function comments for details.)
50 // - Control thread creates object.
51 // - Control thread calls AddWorkers(1) for each worker thread.
52 // - Control thread calls Initialize().
53 // - Control thread launches worker threads.
54 // - Every worker thread frequently calls ContinueRunning().
55 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
56 // then calls ResumeWorkers().
57 // - Some worker threads may exit early, before StopWorkers() is called. They
58 // call RemoveSelf() after their last call to ContinueRunning().
59 // - Control thread eventually calls StopWorkers().
60 // - Worker threads exit.
61 // - Control thread joins worker threads.
62 // - Control thread calls Destroy().
63 // - Control thread destroys object.
66 // - ContinueRunning() may be called concurrently by different workers, but not
67 // by a single worker.
68 // - No other methods may ever be called concurrently, with themselves or
70 // - This object may be used by multiple threads only between Initialize() and
73 // TODO(matthewb): Move this class and its unittest to their own files.
76 //--------------------------------
77 // Methods for the control thread.
78 //--------------------------------
80 WorkerStatus() : num_workers_(0), status_(RUN) {}
82 // Called by the control thread to increase the worker count. Must be called
83 // before Initialize(). The worker count is 0 upon object initialization.
84 void AddWorkers(int num_new_workers) {
85 // No need to lock num_workers_mutex_ because this is before Initialize().
86 num_workers_ += num_new_workers;
89 // Called by the control thread. May not be called multiple times. If
90 // called, Destroy() must be called before destruction.
93 // Called by the control thread after joining all worker threads. Must be
94 // called iff Initialize() was called. No methods may be called after calling
98 // Called by the control thread to tell the workers to pause. Does not return
99 // until all workers have called ContinueRunning() or RemoveSelf(). May only
100 // be called between Initialize() and StopWorkers(). Must not be called multiple
101 // times without ResumeWorkers() having been called in between.
104 // Called by the control thread to tell the workers to resume from a pause.
105 // May only be called between Initialize() and StopWorkers(). May only be
106 // called directly after PauseWorkers().
107 void ResumeWorkers();
109 // Called by the control thread to tell the workers to stop. May only be
110 // called between Initialize() and Destroy(). May only be called once.
113 //--------------------------------
114 // Methods for the worker threads.
115 //--------------------------------
117 // Called by worker threads to decrease the worker count by one. May only be
118 // called between Initialize() and Destroy(). May wait for ResumeWorkers()
119 // when called after PauseWorkers().
122 // Called by worker threads between Initialize() and Destroy(). May be called
123 // any number of times. Return value is whether or not the worker should
124 // continue running. When called after PauseWorkers(), does not return until
125 // ResumeWorkers() or StopWorkers() has been called. Number of distinct
126 // calling threads must match the worker count (see AddWorkers() and
128 bool ContinueRunning();
130 // TODO(matthewb): Is this functionality really necessary? Remove it if not.
132 // This is a hack! It's like ContinueRunning(), except it won't pause. If
133 // any worker threads use this exclusively in place of ContinueRunning() then
134 // PauseWorkers() should never be used!
135 bool ContinueRunningNoPause();
138 enum Status { RUN, PAUSE, STOP };
140 void WaitOnPauseBarrier() {
141 int error = pthread_barrier_wait(&pause_barrier_);
142 if (error != PTHREAD_BARRIER_SERIAL_THREAD)
143 sat_assert(error == 0);
146 void AcquireNumWorkersLock() {
147 sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
150 void ReleaseNumWorkersLock() {
151 sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
154 void AcquireStatusReadLock() {
155 sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
158 void AcquireStatusWriteLock() {
159 sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
162 void ReleaseStatusLock() {
163 sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
167 AcquireStatusReadLock();
168 Status status = status_;
173 // Returns the previous status.
174 Status SetStatus(Status status) {
175 AcquireStatusWriteLock();
176 Status prev_status = status_;
182 pthread_mutex_t num_workers_mutex_;
185 pthread_rwlock_t status_rwlock_;
188 // Guaranteed to not be in use when (status_ != PAUSE).
189 pthread_barrier_t pause_barrier_;
191 DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
195 // This is a base class for worker threads.
196 // Each thread repeats a specific
197 // task on various blocks of memory.
200 // Enum to mark a thread as low/med/high priority.
207 virtual ~WorkerThread();
209 // Initialize values and thread ID number.
210 virtual void InitThread(int thread_num_init,
212 class OsLayer *os_init,
213 class PatternList *patternlist_init,
214 WorkerStatus *worker_status);
216 // This function is DEPRECATED, it does nothing.
217 void SetPriority(Priority priority) { priority_ = priority; }
218 // Spawn the worker thread, by running Work().
220 // Only for ThreadSpawnerGeneric().
224 // Wait for the thread to complete its cleanup.
225 virtual bool JoinThread();
226 // Kill worker thread with SIGINT.
227 virtual bool KillThread();
229 // This is the task function that the thread executes.
230 // This is implemented per subclass.
233 // Starts per-WorkerThread timer.
234 void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
235 // Reads current timer value and returns run duration without recording it.
236 int64 ReadThreadTimer() {
237 struct timeval end_time_;
238 gettimeofday(&end_time_, NULL);
239 return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
240 (end_time_.tv_usec - start_time_.tv_usec);
242 // Stops per-WorkerThread timer and records thread run duration.
243 // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
244 // is effectively paused and restarted, so runduration_usec accumulates on.
245 void StopThreadTimer() {
246 runduration_usec_ += ReadThreadTimer();
249 // Acccess member variables.
250 bool GetStatus() {return status_;}
251 int64 GetErrorCount() {return errorcount_;}
252 int64 GetPageCount() {return pages_copied_;}
253 int64 GetRunDurationUSec() {return runduration_usec_;}
255 // Returns bandwidth defined as pages_copied / thread_run_durations.
256 virtual float GetCopiedData();
257 // Calculate worker thread specific copied data.
258 virtual float GetMemoryCopiedData() {return 0;}
259 virtual float GetDeviceCopiedData() {return 0;}
260 // Calculate worker thread specific bandwidth.
261 virtual float GetMemoryBandwidth()
262 {return GetMemoryCopiedData() / (
263 runduration_usec_ * 1.0 / 1000000);}
264 virtual float GetDeviceBandwidth()
265 {return GetDeviceCopiedData() / (
266 runduration_usec_ * 1.0 / 1000000);}
268 void set_cpu_mask(cpu_set_t *mask) {
269 memcpy(&cpu_mask_, mask, sizeof(*mask));
272 void set_cpu_mask_to_cpu(int cpu_num) {
273 cpuset_set_ab(&cpu_mask_, cpu_num, cpu_num + 1);
276 void set_tag(int32 tag) {tag_ = tag;}
278 // Returns CPU mask, where each bit represents a logical cpu.
279 bool AvailableCpus(cpu_set_t *cpuset);
280 // Returns CPU mask of CPUs this thread is bound to.
281 bool CurrentCpus(cpu_set_t *cpuset);
282 // Returns Current Cpus mask as string.
283 string CurrentCpusFormat() {
284 cpu_set_t current_cpus;
285 CurrentCpus(¤t_cpus);
286 return cpuset_format(¤t_cpus);
289 int ThreadID() {return thread_num_;}
291 // Bind worker thread to specified CPU(s)
292 bool BindToCpus(const cpu_set_t *cpuset);
295 // This function dictates whether the main work loop
296 // continues, waits, or terminates.
297 // All work loops should be of the form:
300 // } while (IsReadyToRun());
301 virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
302 // TODO(matthewb): Is this function really necessary? Remove it if not.
304 // Like IsReadyToRun(), except it won't pause.
305 virtual bool IsReadyToRunNoPause() {
306 return worker_status_->ContinueRunningNoPause();
309 // These are functions used by the various work loops.
310 // Pretty print and log a data miscompare.
311 virtual void ProcessError(struct ErrorRecord *er,
313 const char *message);
315 // Compare a region of memory with a known data pattern, and report errors.
316 virtual int CheckRegion(void *addr,
320 int64 patternoffset);
322 // Fast compare a block of memory.
323 virtual int CrcCheckPage(struct page_entry *srcpe);
325 // Fast copy a block of memory, while verifying correctness.
326 virtual int CrcCopyPage(struct page_entry *dstpe,
327 struct page_entry *srcpe);
329 // Fast copy a block of memory, while verifying correctness, and heating CPU.
330 virtual int CrcWarmCopyPage(struct page_entry *dstpe,
331 struct page_entry *srcpe);
333 // Fill a page with its specified pattern.
334 virtual bool FillPage(struct page_entry *pe);
336 // Copy with address tagging.
337 virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
339 unsigned int size_in_bytes,
340 AdlerChecksum *checksum,
341 struct page_entry *pe);
342 // SSE copy with address tagging.
343 virtual bool AdlerAddrMemcpyWarm(uint64 *dstmem64,
345 unsigned int size_in_bytes,
346 AdlerChecksum *checksum,
347 struct page_entry *pe);
348 // Crc data with address tagging.
349 virtual bool AdlerAddrCrcC(uint64 *srcmem64,
350 unsigned int size_in_bytes,
351 AdlerChecksum *checksum,
352 struct page_entry *pe);
353 // Setup tagging on an existing page.
354 virtual bool TagAddrC(uint64 *memwords,
355 unsigned int size_in_bytes);
356 // Report a mistagged cacheline.
357 virtual bool ReportTagError(uint64 *mem64,
360 // Print out the error record of the tag mismatch.
361 virtual void ProcessTagError(struct ErrorRecord *error,
363 const char *message);
365 // A worker thread can yield itself to give up CPU until it's scheduled again
369 // General state variables that all subclasses need.
370 int thread_num_; // Thread ID.
371 volatile bool status_; // Error status.
372 volatile int64 pages_copied_; // Recorded for memory bandwidth calc.
373 volatile int64 errorcount_; // Miscompares seen by this thread.
375 cpu_set_t cpu_mask_; // Cores this thread is allowed to run on.
376 volatile uint32 tag_; // Tag hint for memory this thread can use.
378 bool tag_mode_; // Tag cachelines with vaddr.
380 // Thread timing variables.
381 struct timeval start_time_; // Worker thread start time.
382 volatile int64 runduration_usec_; // Worker run duration in u-seconds.
384 // Function passed to pthread_create.
385 void *(*thread_spawner_)(void *args);
386 pthread_t thread_; // Pthread thread ID.
387 Priority priority_; // Worker thread priority.
388 class Sat *sat_; // Reference to parent stest object.
389 class OsLayer *os_; // Os abstraction: put hacks here.
390 class PatternList *patternlist_; // Reference to data patterns.
392 // Work around style guide ban on sizeof(int).
393 static const uint64 iamint_ = 0;
394 static const int wordsize_ = sizeof(iamint_);
397 WorkerStatus *worker_status_;
399 DISALLOW_COPY_AND_ASSIGN(WorkerThread);
402 // Worker thread to perform File IO.
403 class FileThread : public WorkerThread {
406 // Set filename to use for file IO.
407 virtual void SetFile(const char *filename_init);
410 // Calculate worker thread specific bandwidth.
411 virtual float GetDeviceCopiedData()
412 {return GetCopiedData()*2;}
413 virtual float GetMemoryCopiedData();
416 // Record of where these pages were sourced from, and what
417 // potentially broken components they passed through.
419 struct Pattern *pattern; // This is the data it should contain.
420 void *src; // This is the memory location the data was sourced from.
421 void *dst; // This is where it ended up.
424 // These are functions used by the various work loops.
425 // Pretty print and log a data miscompare. Disks require
426 // slightly different error handling.
427 virtual void ProcessError(struct ErrorRecord *er,
429 const char *message);
431 virtual bool OpenFile(int *pfile);
432 virtual bool CloseFile(int fd);
434 // Read and write whole file to disk.
435 virtual bool WritePages(int fd);
436 virtual bool ReadPages(int fd);
438 // Read and write pages to disk.
439 virtual bool WritePageToFile(int fd, struct page_entry *src);
440 virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
442 // Sector tagging support.
443 virtual bool SectorTagPage(struct page_entry *src, int block);
444 virtual bool SectorValidatePage(const struct PageRec &page,
445 struct page_entry *dst,
448 // Get memory for an incoming data transfer.
449 virtual bool PagePrepare();
450 // Remove memory allocated for data transfer.
451 virtual bool PageTeardown();
453 // Get memory for an incoming data transfer.
454 virtual bool GetEmptyPage(struct page_entry *dst);
455 // Get memory for an outgoing data transfer.
456 virtual bool GetValidPage(struct page_entry *dst);
457 // Throw out a used empty page.
458 virtual bool PutEmptyPage(struct page_entry *src);
459 // Throw out a used, filled page.
460 virtual bool PutValidPage(struct page_entry *src);
463 struct PageRec *page_recs_; // Array of page records.
464 int crc_page_; // Page currently being CRC checked.
465 string filename_; // Name of file to access.
466 string devicename_; // Name of device file is on.
468 bool page_io_; // Use page pool for IO.
469 void *local_page_; // malloc'd page for non-pool IO.
470 int pass_; // Number of writes to the file so far.
472 // Tag to detect file corruption.
474 volatile uint8 magic;
475 volatile uint8 block;
476 volatile uint8 sector;
481 DISALLOW_COPY_AND_ASSIGN(FileThread);
485 // Worker thread to perform Network IO.
486 class NetworkThread : public WorkerThread {
489 // Set hostname to use for net IO.
490 virtual void SetIP(const char *ipaddr_init);
493 // Calculate worker thread specific bandwidth.
494 virtual float GetDeviceCopiedData()
495 {return GetCopiedData()*2;}
498 // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
499 virtual bool IsNetworkStopSet();
500 virtual bool CreateSocket(int *psocket);
501 virtual bool CloseSocket(int sock);
502 virtual bool Connect(int sock);
503 virtual bool SendPage(int sock, struct page_entry *src);
504 virtual bool ReceivePage(int sock, struct page_entry *dst);
509 DISALLOW_COPY_AND_ASSIGN(NetworkThread);
512 // Worker thread to reflect Network IO.
513 class NetworkSlaveThread : public NetworkThread {
515 NetworkSlaveThread();
516 // Set socket for IO.
517 virtual void SetSock(int sock);
521 virtual bool IsNetworkStopSet();
524 DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
527 // Worker thread to detect incoming Network IO.
528 class NetworkListenThread : public NetworkThread {
530 NetworkListenThread();
534 virtual bool Listen();
536 virtual bool GetConnection(int *pnewsock);
537 virtual bool SpawnSlave(int newsock, int threadid);
538 virtual bool ReapSlaves();
540 // For serviced incoming connections.
543 NetworkSlaveThread thread;
545 typedef vector<ChildWorker*> ChildVector;
546 ChildVector child_workers_;
548 DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
551 // Worker thread to perform Memory Copy.
552 class CopyThread : public WorkerThread {
556 // Calculate worker thread specific bandwidth.
557 virtual float GetMemoryCopiedData()
558 {return GetCopiedData()*2;}
561 DISALLOW_COPY_AND_ASSIGN(CopyThread);
564 // Worker thread to perform Memory Invert.
565 class InvertThread : public WorkerThread {
569 // Calculate worker thread specific bandwidth.
570 virtual float GetMemoryCopiedData()
571 {return GetCopiedData()*4;}
574 virtual int InvertPageUp(struct page_entry *srcpe);
575 virtual int InvertPageDown(struct page_entry *srcpe);
576 DISALLOW_COPY_AND_ASSIGN(InvertThread);
579 // Worker thread to fill blank pages on startup.
580 class FillThread : public WorkerThread {
583 // Set how many pages this thread should fill before exiting.
584 virtual void SetFillPages(int64 num_pages_to_fill_init);
588 // Fill a page with the data pattern in pe->pattern.
589 virtual bool FillPageRandom(struct page_entry *pe);
590 int64 num_pages_to_fill_;
591 DISALLOW_COPY_AND_ASSIGN(FillThread);
594 // Worker thread to verify page data matches pattern data.
595 // Thread will check and replace pages until "done" flag is set,
596 // then it will check and discard pages until no more remain.
597 class CheckThread : public WorkerThread {
601 // Calculate worker thread specific bandwidth.
602 virtual float GetMemoryCopiedData()
603 {return GetCopiedData();}
606 DISALLOW_COPY_AND_ASSIGN(CheckThread);
610 // Worker thread to poll for system error messages.
611 // Thread will check for messages until "done" flag is set.
612 class ErrorPollThread : public WorkerThread {
618 DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
621 // Computation intensive worker thread to stress CPU.
622 class CpuStressThread : public WorkerThread {
628 DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
631 // Worker thread that tests the correctness of the
632 // CPU Cache Coherency Protocol.
633 class CpuCacheCoherencyThread : public WorkerThread {
635 CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
636 int cc_cacheline_count_,
642 cc_cacheline_data *cc_cacheline_data_; // Data structure for each cacheline.
643 int cc_local_num_; // Local counter for each thread.
644 int cc_cacheline_count_; // Number of cache lines to operate on.
645 int cc_thread_num_; // The integer id of the thread which is
646 // used as an index into the integer array
647 // of the cacheline datastructure.
648 int cc_inc_count_; // Number of times to increment the counter.
651 DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
654 // Worker thread to perform disk test.
655 class DiskThread : public WorkerThread {
657 explicit DiskThread(DiskBlockTable *block_table);
658 virtual ~DiskThread();
659 // Calculate disk thread specific bandwidth.
660 virtual float GetDeviceCopiedData() {
661 return (blocks_written_ * write_block_size_ +
662 blocks_read_ * read_block_size_) / kMegabyte;}
664 // Set filename for device file (in /dev).
665 virtual void SetDevice(const char *device_name);
666 // Set various parameters that control the behaviour of the test.
667 virtual bool SetParameters(int read_block_size,
668 int write_block_size,
671 int blocks_per_segment,
672 int64 read_threshold,
673 int64 write_threshold,
674 int non_destructive);
678 virtual float GetMemoryCopiedData() {return 0;}
681 static const int kSectorSize = 512; // Size of sector on disk.
682 static const int kBufferAlignment = 512; // Buffer alignment required by the
684 static const int kBlockRetry = 100; // Number of retries to allocate
692 virtual bool OpenDevice(int *pfile);
693 virtual bool CloseDevice(int fd);
695 // Retrieves the size (in bytes) of the disk/file.
696 virtual bool GetDiskSize(int fd);
698 // Retrieves the current time in microseconds.
699 virtual int64 GetTime();
701 // Do an asynchronous disk I/O operation.
702 virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
703 int64 offset, int64 timeout);
705 // Write a block to disk.
706 virtual bool WriteBlockToDisk(int fd, BlockData *block);
708 // Verify a block on disk.
709 virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
712 virtual bool DoWork(int fd);
714 int read_block_size_; // Size of blocks read from disk, in bytes.
715 int write_block_size_; // Size of blocks written to disk, in bytes.
716 int64 blocks_read_; // Number of blocks read in work loop.
717 int64 blocks_written_; // Number of blocks written in work loop.
718 int64 segment_size_; // Size of disk segments (in bytes) that the disk
719 // will be split into where testing can be
720 // confined to a particular segment.
721 // Allows for control of how evenly the disk will
722 // be tested. Smaller segments imply more even
723 // testing (less random).
724 int blocks_per_segment_; // Number of blocks that will be tested per
726 int cache_size_; // Size of disk cache, in bytes.
727 int queue_size_; // Length of in-flight-blocks queue, in blocks.
728 int non_destructive_; // Use non-destructive mode or not.
729 int update_block_table_; // If true, assume this is the thread
730 // responsible for writing the data in the disk
731 // for this block device and, therefore,
732 // update the block table. If false, just use
733 // the block table to get data.
735 // read/write times threshold for reporting a problem
736 int64 read_threshold_; // Maximum time a read should take (in us) before
737 // a warning is given.
738 int64 write_threshold_; // Maximum time a write should take (in us) before
739 // a warning is given.
740 int64 read_timeout_; // Maximum time a read can take before a timeout
741 // and the aborting of the read operation.
742 int64 write_timeout_; // Maximum time a write can take before a timeout
743 // and the aborting of the write operation.
745 string device_name_; // Name of device file to access.
746 int64 device_sectors_; // Number of sectors on the device.
748 std::queue<BlockData*> in_flight_sectors_; // Queue of sectors written but
750 void *block_buffer_; // Pointer to aligned block buffer.
752 io_context_t aio_ctx_; // Asynchronous I/O context for Linux native AIO.
754 DiskBlockTable *block_table_; // Disk Block Table, shared by all disk
755 // threads that read / write at the same
758 DISALLOW_COPY_AND_ASSIGN(DiskThread);
761 class RandomDiskThread : public DiskThread {
763 explicit RandomDiskThread(DiskBlockTable *block_table);
764 virtual ~RandomDiskThread();
766 virtual bool DoWork(int fd);
768 DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
771 // Worker thread to perform checks in a specific memory region.
772 class MemoryRegionThread : public WorkerThread {
774 MemoryRegionThread();
775 ~MemoryRegionThread();
777 void ProcessError(struct ErrorRecord *error, int priority,
778 const char *message);
779 bool SetRegion(void *region, int64 size);
780 // Calculate worker thread specific bandwidth.
781 virtual float GetMemoryCopiedData()
782 {return GetCopiedData();}
783 virtual float GetDeviceCopiedData()
784 {return GetCopiedData() * 2;}
785 void SetIdentifier(string identifier) {
786 identifier_ = identifier;
790 // Page queue for this particular memory region.
792 PageEntryQueue *pages_;
793 bool error_injection_;
796 static const int kPhaseNoPhase = 0;
797 static const int kPhaseCopy = 1;
798 static const int kPhaseCheck = 2;
801 DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
804 #endif // STRESSAPPTEST_WORKER_H_