chiark / gitweb /
Initial commit
[stressapptest] / src / worker.h
1 // Copyright 2006 Google Inc. All Rights Reserved.
2
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6
7 //      http://www.apache.org/licenses/LICENSE-2.0
8
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // worker.h : worker thread interface
16
17 // This file contains the Worker Thread class interface
18 // for the SAT test. Worker Threads implement a repetative
19 // task used to test or stress the system.
20
21 #ifndef STRESSAPPTEST_WORKER_H_
22 #define STRESSAPPTEST_WORKER_H_
23
24 #include <pthread.h>
25
26 #include <sys/time.h>
27 #include <sys/types.h>
28
29 #include <linux/aio_abi.h>
30
31 #include <queue>
32 #include <set>
33 #include <string>
34 #include <vector>
35
36 // This file must work with autoconf on its public version,
37 // so these includes are correct.
38 #include "disk_blocks.h"
39 #include "queue.h"
40 #include "sattypes.h"
41
42
43 // Global Datastruture shared by the Cache Coherency Worker Threads.
44 struct cc_cacheline_data {
45   int *num;
46 };
47
48 // Typical usage:
49 // (Other workflows may be possible, see function comments for details.)
50 // - Control thread creates object.
51 // - Control thread calls AddWorkers(1) for each worker thread.
52 // - Control thread calls Initialize().
53 // - Control thread launches worker threads.
54 // - Every worker thread frequently calls ContinueRunning().
55 // - Control thread periodically calls PauseWorkers(), effectively sleeps, and
56 //     then calls ResumeWorkers().
57 // - Some worker threads may exit early, before StopWorkers() is called.  They
58 //     call RemoveSelf() after their last call to ContinueRunning().
59 // - Control thread eventually calls StopWorkers().
60 // - Worker threads exit.
61 // - Control thread joins worker threads.
62 // - Control thread calls Destroy().
63 // - Control thread destroys object.
64 //
65 // Threadsafety:
66 // - ContinueRunning() may be called concurrently by different workers, but not
67 //     by a single worker.
68 // - No other methods may ever be called concurrently, with themselves or
69 //     eachother.
70 // - This object may be used by multiple threads only between Initialize() and
71 //     Destroy().
72 //
73 // TODO(matthewb): Move this class and its unittest to their own files.
74 class WorkerStatus {
75  public:
76   //--------------------------------
77   // Methods for the control thread.
78   //--------------------------------
79
80   WorkerStatus() : num_workers_(0), status_(RUN) {}
81
82   // Called by the control thread to increase the worker count.  Must be called
83   // before Initialize().  The worker count is 0 upon object initialization.
84   void AddWorkers(int num_new_workers) {
85     // No need to lock num_workers_mutex_ because this is before Initialize().
86     num_workers_ += num_new_workers;
87   }
88
89   // Called by the control thread.  May not be called multiple times.  If
90   // called, Destroy() must be called before destruction.
91   void Initialize();
92
93   // Called by the control thread after joining all worker threads.  Must be
94   // called iff Initialize() was called.  No methods may be called after calling
95   // this.
96   void Destroy();
97
98   // Called by the control thread to tell the workers to pause.  Does not return
99   // until all workers have called ContinueRunning() or RemoveSelf().  May only
100   // be called between Initialize() and Stop().  Must not be called multiple
101   // times without ResumeWorkers() having been called inbetween.
102   void PauseWorkers();
103
104   // Called by the control thread to tell the workers to resume from a pause.
105   // May only be called between Initialize() and Stop().  May only be called
106   // directly after PauseWorkers().
107   void ResumeWorkers();
108
109   // Called by the control thread to tell the workers to stop.  May only be
110   // called between Initialize() and Destroy().  May only be called once.
111   void StopWorkers();
112
113   //--------------------------------
114   // Methods for the worker threads.
115   //--------------------------------
116
117   // Called by worker threads to decrease the worker count by one.  May only be
118   // called between Initialize() and Destroy().  May wait for ResumeWorkers()
119   // when called after PauseWorkers().
120   void RemoveSelf();
121
122   // Called by worker threads between Initialize() and Destroy().  May be called
123   // any number of times.  Return value is whether or not the worker should
124   // continue running.  When called after PauseWorkers(), does not return until
125   // ResumeWorkers() or StopWorkers() has been called.  Number of distinct
126   // calling threads must match the worker count (see AddWorkers() and
127   // RemoveSelf()).
128   bool ContinueRunning();
129
130   // TODO(matthewb): Is this functionality really necessary?  Remove it if not.
131   //
132   // This is a hack!  It's like ContinueRunning(), except it won't pause.  If
133   // any worker threads use this exclusively in place of ContinueRunning() then
134   // PauseWorkers() should never be used!
135   bool ContinueRunningNoPause();
136
137  private:
138   enum Status { RUN, PAUSE, STOP };
139
140   void WaitOnPauseBarrier() {
141     int error = pthread_barrier_wait(&pause_barrier_);
142     if (error != PTHREAD_BARRIER_SERIAL_THREAD)
143       sat_assert(error == 0);
144   }
145
146   void AcquireNumWorkersLock() {
147     sat_assert(0 == pthread_mutex_lock(&num_workers_mutex_));
148   }
149
150   void ReleaseNumWorkersLock() {
151     sat_assert(0 == pthread_mutex_unlock(&num_workers_mutex_));
152   }
153
154   void AcquireStatusReadLock() {
155     sat_assert(0 == pthread_rwlock_rdlock(&status_rwlock_));
156   }
157
158   void AcquireStatusWriteLock() {
159     sat_assert(0 == pthread_rwlock_wrlock(&status_rwlock_));
160   }
161
162   void ReleaseStatusLock() {
163     sat_assert(0 == pthread_rwlock_unlock(&status_rwlock_));
164   }
165
166   Status GetStatus() {
167     AcquireStatusReadLock();
168     Status status = status_;
169     ReleaseStatusLock();
170     return status;
171   }
172
173   // Returns the previous status.
174   Status SetStatus(Status status) {
175     AcquireStatusWriteLock();
176     Status prev_status = status_;
177     status_ = status;
178     ReleaseStatusLock();
179     return prev_status;
180   }
181
182   pthread_mutex_t num_workers_mutex_;
183   int num_workers_;
184
185   pthread_rwlock_t status_rwlock_;
186   Status status_;
187
188   // Guaranteed to not be in use when (status_ != PAUSE).
189   pthread_barrier_t pause_barrier_;
190
191   DISALLOW_COPY_AND_ASSIGN(WorkerStatus);
192 };
193
194
195 // This is a base class for worker threads.
196 // Each thread repeats a specific
197 // task on various blocks of memory.
198 class WorkerThread {
199  public:
200   // Enum to mark a thread as low/med/high priority.
201   enum Priority {
202     Low,
203     Normal,
204     High,
205   };
206   WorkerThread();
207   virtual ~WorkerThread();
208
209   // Initialize values and thread ID number.
210   void InitThread(int thread_num_init,
211                   class Sat *sat_init,
212                   class OsLayer *os_init,
213                   class PatternList *patternlist_init,
214                   WorkerStatus *worker_status);
215
216   // This function is DEPRECATED, it does nothing.
217   void SetPriority(Priority priority) { priority_ = priority; }
218   // Spawn the worker thread, by running Work().
219   int SpawnThread();
220   // Only for ThreadSpawnerGeneric().
221   void StartRoutine();
222   bool InitPriority();
223
224   // Wait for the thread to complete its cleanup.
225   virtual int JoinThread();
226   // Kill worker thread with SIGINT.
227   virtual int KillThread();
228
229   // This is the task function that the thread executes.
230   // This is implemented per subclass.
231   virtual int Work();
232
233   // Starts per-WorkerThread timer.
234   void StartThreadTimer() {gettimeofday(&start_time_, NULL);}
235   // Reads current timer value and returns run duration without recording it.
236   int64 ReadThreadTimer() {
237     struct timeval end_time_;
238     gettimeofday(&end_time_, NULL);
239     return (end_time_.tv_sec - start_time_.tv_sec)*1000000 +
240       (end_time_.tv_usec - start_time_.tv_usec);
241   }
242   // Stops per-WorkerThread timer and records thread run duration.
243   // Start/Stop ThreadTimer repetitively has cumulative effect, ie the timer
244   // is effectively paused and restarted, so runduration_usec accumulates on.
245   void StopThreadTimer() {
246     runduration_usec_ += ReadThreadTimer();
247   }
248
249   // Acccess member variables.
250   int GetStatus() {return status_;}
251   int64 GetErrorCount() {return errorcount_;}
252   int64 GetPageCount() {return pages_copied_;}
253   int64 GetRunDurationUSec() {return runduration_usec_;}
254
255   // Returns bandwidth defined as pages_copied / thread_run_durations.
256   float GetCopiedData();
257   // Calculate worker thread specific copied data.
258   virtual float GetMemoryCopiedData() {return 0;}
259   virtual float GetDeviceCopiedData() {return 0;}
260   // Calculate worker thread specific bandwidth.
261   virtual float GetMemoryBandwidth()
262     {return GetMemoryCopiedData() / (
263         runduration_usec_ * 1.0 / 1000000);}
264   virtual float GetDeviceBandwidth()
265     {return GetDeviceCopiedData() / (
266         runduration_usec_ * 1.0 / 1000000);}
267
268   void set_cpu_mask(int32 mask) {cpu_mask_ = mask;}
269   void set_tag(int32 tag) {tag_ = tag;}
270
271   // Returns CPU mask, where each bit represents a logical cpu.
272   uint32 AvailableCpus();
273   // Returns CPU mask of CPUs this thread is bound to,
274   uint32 CurrentCpus();
275
276   int ThreadID() {return thread_num_;}
277
278   // Bind worker thread to specified CPU(s)
279   bool BindToCpus(uint32 thread_mask);
280
281  protected:
282   // This function dictates whether the main work loop
283   // continues, waits, or terminates.
284   // All work loops should be of the form:
285   //   do {
286   //     // work.
287   //   } while (IsReadyToRun());
288   virtual bool IsReadyToRun() { return worker_status_->ContinueRunning(); }
289   // TODO(matthewb): Is this function really necessary? Remove it if not.
290   //
291   // Like IsReadyToRun(), except it won't pause.
292   virtual bool IsReadyToRunNoPause() {
293     return worker_status_->ContinueRunningNoPause();
294   }
295
296   // These are functions used by the various work loops.
297   // Pretty print and log a data miscompare.
298   virtual void ProcessError(struct ErrorRecord *er,
299                             int priority,
300                             const char *message);
301
302   // Compare a region of memory with a known data patter, and report errors.
303   virtual int CheckRegion(void *addr,
304                           class Pattern *pat,
305                           int64 length,
306                           int offset,
307                           int64 patternoffset);
308
309   // Fast compare a block of memory.
310   virtual int CrcCheckPage(struct page_entry *srcpe);
311
312   // Fast copy a block of memory, while verifying correctness.
313   virtual int CrcCopyPage(struct page_entry *dstpe,
314                           struct page_entry *srcpe);
315
316   // Fast copy a block of memory, while verifying correctness, and heating CPU.
317   virtual int CrcWarmCopyPage(struct page_entry *dstpe,
318                               struct page_entry *srcpe);
319
320   // Fill a page with its specified pattern.
321   virtual bool FillPage(struct page_entry *pe);
322
323   // Copy with address tagging.
324   virtual bool AdlerAddrMemcpyC(uint64 *dstmem64,
325                                 uint64 *srcmem64,
326                                 unsigned int size_in_bytes,
327                                 AdlerChecksum *checksum,
328                                 struct page_entry *pe);
329   // Crc data with address tagging.
330   virtual bool AdlerAddrCrcC(uint64 *srcmem64,
331                              unsigned int size_in_bytes,
332                              AdlerChecksum *checksum,
333                              struct page_entry *pe);
334   // Report a mistagged cacheline.
335   bool ReportTagError(uint64 *mem64,
336                       uint64 actual,
337                       uint64 tag);
338   // Print out the error record of the tag mismatch.
339   void ProcessTagError(struct ErrorRecord *error,
340                        int priority,
341                        const char *message);
342
343   // A worker thread can yield itself to give up CPU until it's scheduled again
344   bool YieldSelf();
345
346  protected:
347   // General state variables that all subclasses need.
348   int thread_num_;                  // Thread ID.
349   volatile int status_;             // Error status.
350   volatile int64 pages_copied_;     // Recorded for memory bandwidth calc.
351   volatile int64 errorcount_;       // Miscompares seen by this thread.
352
353   volatile uint32 cpu_mask_;        // Cores this thread is allowed to run on.
354   volatile uint32 tag_;             // Tag hint for memory this thread can use.
355
356   bool tag_mode_;                   // Tag cachelines with vaddr.
357
358   // Thread timing variables.
359   struct timeval start_time_;        // Worker thread start time.
360   volatile int64 runduration_usec_;  // Worker run duration in u-seconds.
361
362   // Function passed to pthread_create.
363   void *(*thread_spawner_)(void *args);
364   pthread_t thread_;                // Pthread thread ID.
365   Priority priority_;               // Worker thread priority.
366   class Sat *sat_;                  // Reference to parent stest object.
367   class OsLayer *os_;               // Os abstraction: put hacks here.
368   class PatternList *patternlist_;  // Reference to data patterns.
369
370   // Work around style guide ban on sizeof(int).
371   static const uint64 iamint_ = 0;
372   static const int wordsize_ = sizeof(iamint_);
373
374  private:
375   WorkerStatus *worker_status_;
376
377   DISALLOW_COPY_AND_ASSIGN(WorkerThread);
378 };
379
380 // Worker thread to perform File IO.
381 class FileThread : public WorkerThread {
382  public:
383   FileThread();
384   // Set filename to use for file IO.
385   virtual void SetFile(const char *filename_init);
386   virtual int Work();
387
388   // Calculate worker thread specific bandwidth.
389   virtual float GetDeviceCopiedData()
390     {return GetCopiedData()*2;}
391   virtual float GetMemoryCopiedData();
392
393  protected:
394   // Record of where these pages were sourced from, and what
395   // potentially broken components they passed through.
396   struct PageRec {
397      struct Pattern *pattern;  // This is the data it should contain.
398      void *src;  // This is the memory location the data was sourced from.
399      void *dst;  // This is where it ended up.
400   };
401
402   // These are functions used by the various work loops.
403   // Pretty print and log a data miscompare. Disks require
404   // slightly different error handling.
405   virtual void ProcessError(struct ErrorRecord *er,
406                             int priority,
407                             const char *message);
408
409   virtual bool OpenFile(int *pfile);
410   virtual bool CloseFile(int fd);
411
412   // Read and write whole file to disk.
413   virtual bool WritePages(int fd);
414   virtual bool ReadPages(int fd);
415
416   // Read and write pages to disk.
417   virtual bool WritePageToFile(int fd, struct page_entry *src);
418   virtual bool ReadPageFromFile(int fd, struct page_entry *dst);
419
420   // Sector tagging support.
421   virtual bool SectorTagPage(struct page_entry *src, int block);
422   virtual bool SectorValidatePage(const struct PageRec &page,
423                                   struct page_entry *dst,
424                                   int block);
425
426   // Get memory for an incoming data transfer..
427   virtual bool PagePrepare();
428   // Remove memory allocated for data transfer.
429   virtual bool PageTeardown();
430
431   // Get memory for an incoming data transfer..
432   virtual bool GetEmptyPage(struct page_entry *dst);
433   // Get memory for an outgoing data transfer..
434   virtual bool GetValidPage(struct page_entry *dst);
435   // Throw out a used empty page.
436   virtual bool PutEmptyPage(struct page_entry *src);
437   // Throw out a used, filled page.
438   virtual bool PutValidPage(struct page_entry *src);
439
440
441   struct PageRec *page_recs_;          // Array of page records.
442   int crc_page_;                        // Page currently being CRC checked.
443   string filename_;                     // Name of file to access.
444   string devicename_;                   // Name of device file is on.
445
446   bool page_io_;                        // Use page pool for IO.
447   void *local_page_;                   // malloc'd page fon non-pool IO.
448   int pass_;                            // Number of writes to the file so far.
449
450   // Tag to detect file corruption.
451   struct SectorTag {
452     volatile uint8 magic;
453     volatile uint8 block;
454     volatile uint8 sector;
455     volatile uint8 pass;
456     char pad[512-4];
457   };
458
459   DISALLOW_COPY_AND_ASSIGN(FileThread);
460 };
461
462
463 // Worker thread to perform Network IO.
464 class NetworkThread : public WorkerThread {
465  public:
466   NetworkThread();
467   // Set hostname to use for net IO.
468   virtual void SetIP(const char *ipaddr_init);
469   virtual int Work();
470
471   // Calculate worker thread specific bandwidth.
472   virtual float GetDeviceCopiedData()
473     {return GetCopiedData()*2;}
474
475  protected:
476   // IsReadyToRunNoPause() wrapper, for NetworkSlaveThread to override.
477   virtual bool IsNetworkStopSet();
478   virtual bool CreateSocket(int *psocket);
479   virtual bool CloseSocket(int sock);
480   virtual bool Connect(int sock);
481   virtual bool SendPage(int sock, struct page_entry *src);
482   virtual bool ReceivePage(int sock, struct page_entry *dst);
483   char ipaddr_[256];
484   int sock_;
485
486  private:
487   DISALLOW_COPY_AND_ASSIGN(NetworkThread);
488 };
489
490 // Worker thread to reflect Network IO.
491 class NetworkSlaveThread : public NetworkThread {
492  public:
493   NetworkSlaveThread();
494   // Set socket for IO.
495   virtual void SetSock(int sock);
496   virtual int Work();
497
498  protected:
499   virtual bool IsNetworkStopSet();
500
501  private:
502   DISALLOW_COPY_AND_ASSIGN(NetworkSlaveThread);
503 };
504
505 // Worker thread to detect incoming Network IO.
506 class NetworkListenThread : public NetworkThread {
507  public:
508   NetworkListenThread();
509   virtual int Work();
510
511  private:
512   virtual bool Listen();
513   virtual bool Wait();
514   virtual bool GetConnection(int *pnewsock);
515   virtual bool SpawnSlave(int newsock, int threadid);
516   virtual bool ReapSlaves();
517
518   // For serviced incoming connections.
519   struct ChildWorker {
520     WorkerStatus status;
521     NetworkSlaveThread thread;
522   };
523   typedef vector<ChildWorker*> ChildVector;
524   ChildVector child_workers_;
525
526   DISALLOW_COPY_AND_ASSIGN(NetworkListenThread);
527 };
528
529 // Worker thread to perform Memory Copy.
530 class CopyThread : public WorkerThread {
531  public:
532   CopyThread() {}
533   virtual int Work();
534   // Calculate worker thread specific bandwidth.
535   virtual float GetMemoryCopiedData()
536     {return GetCopiedData()*2;}
537
538  private:
539   DISALLOW_COPY_AND_ASSIGN(CopyThread);
540 };
541
542 // Worker thread to perform Memory Invert.
543 class InvertThread : public WorkerThread {
544  public:
545   InvertThread() {}
546   virtual int Work();
547   // Calculate worker thread specific bandwidth.
548   virtual float GetMemoryCopiedData()
549     {return GetCopiedData()*4;}
550
551  private:
552   virtual int InvertPageUp(struct page_entry *srcpe);
553   virtual int InvertPageDown(struct page_entry *srcpe);
554   DISALLOW_COPY_AND_ASSIGN(InvertThread);
555 };
556
557 // Worker thread to fill blank pages on startup.
558 class FillThread : public WorkerThread {
559  public:
560   FillThread();
561   // Set how many pages this thread should fill before exiting.
562   virtual void SetFillPages(int64 num_pages_to_fill_init);
563   virtual int Work();
564
565  private:
566   // Fill a page with the data pattern in pe->pattern.
567   virtual bool FillPageRandom(struct page_entry *pe);
568   int64 num_pages_to_fill_;
569   DISALLOW_COPY_AND_ASSIGN(FillThread);
570 };
571
572 // Worker thread to verify page data matches pattern data.
573 // Thread will check and replace pages until "done" flag is set,
574 // then it will check and discard pages until no more remain.
575 class CheckThread : public WorkerThread {
576  public:
577   CheckThread() {}
578   virtual int Work();
579   // Calculate worker thread specific bandwidth.
580   virtual float GetMemoryCopiedData()
581     {return GetCopiedData();}
582
583  private:
584   DISALLOW_COPY_AND_ASSIGN(CheckThread);
585 };
586
587
588 // Worker thread to poll for system error messages.
589 // Thread will check for messages until "done" flag is set.
590 class ErrorPollThread : public WorkerThread {
591  public:
592   ErrorPollThread() {}
593   virtual int Work();
594
595  private:
596   DISALLOW_COPY_AND_ASSIGN(ErrorPollThread);
597 };
598
599 // Computation intensive worker thread to stress CPU.
600 class CpuStressThread : public WorkerThread {
601  public:
602   CpuStressThread() {}
603   virtual int Work();
604
605  private:
606   DISALLOW_COPY_AND_ASSIGN(CpuStressThread);
607 };
608
609 // Worker thread that tests the correctness of the
610 // CPU Cache Coherency Protocol.
611 class CpuCacheCoherencyThread : public WorkerThread {
612  public:
613   CpuCacheCoherencyThread(cc_cacheline_data *cc_data,
614                           int cc_cacheline_count_,
615                           int cc_thread_num_,
616                           int cc_inc_count_);
617   virtual int Work();
618
619  protected:
620   cc_cacheline_data *cc_cacheline_data_;  // Datstructure for each cacheline.
621   int cc_local_num_;        // Local counter for each thread.
622   int cc_cacheline_count_;  // Number of cache lines to operate on.
623   int cc_thread_num_;       // The integer id of the thread which is
624                             // used as an index into the integer array
625                             // of the cacheline datastructure.
626   int cc_inc_count_;        // Number of times to increment the counter.
627
628  private:
629   DISALLOW_COPY_AND_ASSIGN(CpuCacheCoherencyThread);
630 };
631
632 // Worker thread to perform disk test.
633 class DiskThread : public WorkerThread {
634  public:
635   explicit DiskThread(DiskBlockTable *block_table);
636   virtual ~DiskThread();
637   // Calculate disk thread specific bandwidth.
638   virtual float GetDeviceCopiedData() {
639     return (blocks_written_ * write_block_size_ +
640             blocks_read_ * read_block_size_) / kMegabyte;}
641
642   // Set filename for device file (in /dev).
643   virtual void SetDevice(const char *device_name);
644   // Set various parameters that control the behaviour of the test.
645   virtual bool SetParameters(int read_block_size,
646                              int write_block_size,
647                              int64 segment_size,
648                              int64 cache_size,
649                              int blocks_per_segment,
650                              int64 read_threshold,
651                              int64 write_threshold,
652                              int non_destructive);
653
654   virtual int Work();
655
656   virtual float GetMemoryCopiedData() {return 0;}
657
658  protected:
659   static const int kSectorSize = 512;       // Size of sector on disk.
660   static const int kBufferAlignment = 512;  // Buffer alignment required by the
661                                             // kernel.
662   static const int kBlockRetry = 100;       // Number of retries to allocate
663                                             // sectors.
664
665   enum IoOp {
666     ASYNC_IO_READ   = 0,
667     ASYNC_IO_WRITE  = 1
668   };
669
670   virtual bool OpenDevice(int *pfile);
671   virtual bool CloseDevice(int fd);
672
673   // Retrieves the size (in bytes) of the disk/file.
674   virtual bool GetDiskSize(int fd);
675
676   // Retrieves the current time in microseconds.
677   virtual int64 GetTime();
678
679   // Do an asynchronous disk I/O operation.
680   virtual bool AsyncDiskIO(IoOp op, int fd, void *buf, int64 size,
681                            int64 offset, int64 timeout);
682
683   // Write a block to disk.
684   virtual bool WriteBlockToDisk(int fd, BlockData *block);
685
686   // Verify a block on disk.
687   virtual bool ValidateBlockOnDisk(int fd, BlockData *block);
688
689   // Main work loop.
690   virtual bool DoWork(int fd);
691
692   int read_block_size_;       // Size of blocks read from disk, in bytes.
693   int write_block_size_;      // Size of blocks written to disk, in bytes.
694   int64 blocks_read_;         // Number of blocks read in work loop.
695   int64 blocks_written_;      // Number of blocks written in work loop.
696   int64 segment_size_;        // Size of disk segments (in bytes) that the disk
697                               // will be split into where testing can be
698                               // confined to a particular segment.
699                               // Allows for control of how evenly the disk will
700                               // be tested.  Smaller segments imply more even
701                               // testing (less random).
702   int blocks_per_segment_;    // Number of blocks that will be tested per
703                               // segment.
704   int cache_size_;            // Size of disk cache, in bytes.
705   int queue_size_;            // Length of in-flight-blocks queue, in blocks.
706   int non_destructive_;       // Use non-destructive mode or not.
707   int update_block_table_;    // If true, assume this is the thread
708                               // responsible for writing the data in the disk
709                               // for this block device and, therefore,
710                               // update the block table. If false, just use
711                               // the block table to get data.
712
713   // read/write times threshold for reporting a problem
714   int64 read_threshold_;      // Maximum time a read should take (in us) before
715                               // a warning is given.
716   int64 write_threshold_;     // Maximum time a write should take (in us) before
717                               // a warning is given.
718   int64 read_timeout_;        // Maximum time a read can take before a timeout
719                               // and the aborting of the read operation.
720   int64 write_timeout_;       // Maximum time a write can take before a timeout
721                               // and the aborting of the write operation.
722
723   string device_name_;        // Name of device file to access.
724   int64 device_sectors_;      // Number of sectors on the device.
725
726   std::queue<BlockData*> in_flight_sectors_;   // Queue of sectors written but
727                                                 // not verified.
728   void *block_buffer_;        // Pointer to aligned block buffer.
729
730   aio_context_t aio_ctx_;     // Asynchronous I/O context for Linux native AIO.
731
732   DiskBlockTable *block_table_;  // Disk Block Table, shared by all disk
733                                  // threads that read / write at the same
734                                  // device
735
736   DISALLOW_COPY_AND_ASSIGN(DiskThread);
737 };
738
739 class RandomDiskThread : public DiskThread {
740  public:
741   explicit RandomDiskThread(DiskBlockTable *block_table);
742   virtual ~RandomDiskThread();
743   // Main work loop.
744   virtual bool DoWork(int fd);
745  protected:
746   DISALLOW_COPY_AND_ASSIGN(RandomDiskThread);
747 };
748
749 // Worker thread to perform checks in a specific memory region.
750 class MemoryRegionThread : public WorkerThread {
751  public:
752   MemoryRegionThread();
753   ~MemoryRegionThread();
754   virtual int Work();
755   void ProcessError(struct ErrorRecord *error, int priority,
756                     const char *message);
757   bool SetRegion(void *region, int64 size);
758   // Calculate worker thread specific bandwidth.
759   virtual float GetMemoryCopiedData()
760     {return GetCopiedData();}
761   virtual float GetDeviceCopiedData()
762     {return GetCopiedData() * 2;}
763   void SetIdentifier(string identifier) {
764     identifier_ = identifier;
765   }
766
767  protected:
768   // Page queue for this particular memory region.
769   char *region_;
770   PageEntryQueue *pages_;
771   bool error_injection_;
772   int phase_;
773   string identifier_;
774   static const int kPhaseNoPhase = 0;
775   static const int kPhaseCopy = 1;
776   static const int kPhaseCheck = 2;
777
778  private:
779   DISALLOW_COPY_AND_ASSIGN(MemoryRegionThread);
780 };
781
782 #endif  // STRESSAPPTEST_WORKER_H_