1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use crate::loom::sync::Arc;
use crate::runtime::thread_pool::queue::{local, Cluster, Inject};
use crate::task::Task;

use std::cell::Cell;
use std::fmt;

/// Per-worker handle onto the thread pool's task queues.
///
/// As used in this file, the `Cluster` exposes an indexable set of `local`
/// queues plus one `global` queue. A `Worker` pairs that cluster with the
/// index of its own local queue and a single-task `next` slot that is
/// consulted before the local queue when popping.
pub(crate) struct Worker<T: 'static> {
    /// Reference-counted handle to the cluster holding the local queues and
    /// the global queue.
    cluster: Arc<Cluster<T>>,
    /// Position of this worker's own queue within `cluster.local`.
    index: u16,
    /// Task to pop next. Filled by `push` and drained by `local_pop` before
    /// the local queue is looked at.
    next: Cell<Option<Task<T>>>,
}

impl<T: 'static> Worker<T> {
    /// Creates the worker handle for the local queue at `index` in `cluster`.
    ///
    /// The `next` slot starts out empty.
    ///
    /// # Panics
    ///
    /// Panics if `index` does not fit in a `u16`. The index is stored as a
    /// `u16`; a silently truncated value would make this worker address a
    /// different worker's local queue, which would break the
    /// one-thread-per-local-queue assumption that the `unsafe` queue calls
    /// in this type rely on.
    pub(super) fn new(cluster: Arc<Cluster<T>>, index: usize) -> Worker<T> {
        assert!(
            index <= usize::from(u16::MAX),
            "worker index {} does not fit in u16",
            index
        );

        Worker {
            cluster,
            // Lossless: the range was checked just above.
            index: index as u16,
            next: Cell::new(None),
        }
    }

    pub(crate) fn injector(&self) -> Inject<T> {
        Inject::new(self.cluster.clone())
    }

    /// Reports whether the cluster's global queue has been closed.
    pub(crate) fn is_closed(&self) -> bool {
        let global = &self.cluster.global;
        global.is_closed()
    }

    /// Stages `task` in the `next` slot, so it is the first task handed out
    /// by the next local pop.
    ///
    /// If the slot already held a task, that older task is first moved to
    /// this worker's local queue. The global queue is passed along to the
    /// local queue's `push`; per the original documentation, the task ends
    /// up there when the local queue is full.
    ///
    /// # Return
    ///
    /// Returns `true` if an older task was displaced out of the slot and
    /// queued, i.e. there is now work that another worker can steal.
    /// Returns `false` if the slot was empty: nothing was queued and `task`
    /// sits only in this worker's `next` slot, which `steal` never looks at.
    pub(crate) fn push(&self, task: Task<T>) -> bool {
        let displaced = self.next.take();

        let stealable = match displaced {
            Some(older) => {
                // safety: we guarantee that only one thread pushes to this
                // local queue at a time.
                unsafe {
                    self.local().push(older, &self.cluster.global);
                }
                true
            }
            None => false,
        };

        // The slot is empty at this point (it was taken above), so this
        // never drops a task.
        self.next.set(Some(task));

        stealable
    }

    /// Pushes `task` directly onto this worker's local queue, bypassing the
    /// `next` slot.
    ///
    /// Unlike `push`, the task is not staged to be popped first.
    /// NOTE(review): judging by the name this is meant for tasks that
    /// yielded — confirm against the callers.
    pub(crate) fn push_yield(&self, task: Task<T>) {
        let global = &self.cluster.global;

        // safety: same requirement as in `push` — only one thread pushes to
        // this local queue at a time.
        unsafe { self.local().push(task, global) }
    }

    /// Pops a task, preferring this worker's own work (the `next` slot, then
    /// the local queue) and falling back to the global queue only when that
    /// yields nothing.
    pub(crate) fn pop_local_first(&self) -> Option<Task<T>> {
        match self.local_pop() {
            None => self.cluster.global.pop(),
            found => found,
        }
    }

    /// Pops a task, preferring the global queue and falling back to this
    /// worker's own work (the `next` slot, then the local queue) only when
    /// the global queue yields nothing.
    pub(crate) fn pop_global_first(&self) -> Option<Task<T>> {
        match self.cluster.global.pop() {
            None => self.local_pop(),
            found => found,
        }
    }

    /// Tries to steal work from the other workers' local queues.
    ///
    /// Every local queue in the cluster except this worker's own is visited
    /// at most once, beginning at `start` (taken modulo the number of queues)
    /// and wrapping around. The search stops at the first queue whose `steal`
    /// call produces a task.
    ///
    /// Returns the task produced by that call, or `None` if no visited queue
    /// produced one.
    pub(crate) fn steal(&self, start: usize) -> Option<Task<T>> {
        let queues = &self.cluster.local;
        let count = queues.len();
        let own = self.index as usize;

        (0..count)
            .map(|offset| (start + offset) % count)
            // Never steal from ourselves.
            .filter(|&victim| victim != own)
            .find_map(|victim| {
                // safety: we own the dst queue
                unsafe { queues[victim].steal(self.local()) }
            })
    }

    /// An approximation of whether or not the queues are empty.
    ///
    /// Returns `true` when every local queue in the cluster and the global
    /// queue report empty. The global queue is only consulted if all local
    /// queues were empty. This worker's `next` slot is not inspected.
    pub(crate) fn is_empty(&self) -> bool {
        let locals_empty = self.cluster.local[..]
            .iter()
            .all(|queue| queue.is_empty());

        locals_empty && self.cluster.global.is_empty()
    }

    /// Pops from this worker's own storage: the `next` slot first, and the
    /// local queue only if the slot was empty.
    fn local_pop(&self) -> Option<Task<T>> {
        self.next.take().or_else(|| {
            // safety: we guarantee that only one thread pops from this local
            // queue at a time.
            unsafe { self.local().pop() }
        })
    }

    /// Returns this worker's own local queue, i.e. the entry of
    /// `cluster.local` at this worker's index.
    fn local(&self) -> &local::Queue<T> {
        let slot = usize::from(self.index);
        &self.cluster.local[slot]
    }
}

/// Debug output for a worker: the cluster is elided as `"..."`, and the
/// `next` slot is not shown; only the worker's index is printed.
impl<T: 'static> fmt::Debug for Worker<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut out = f.debug_struct("queue::Worker");
        out.field("cluster", &"...");
        out.field("index", &self.index);
        out.finish()
    }
}