use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::LinkedList;
use crate::util::FastRand;
use std::cell::RefCell;
use std::time::Duration;
pub(super) struct Worker {
shared: Arc<Shared>,
index: usize,
core: AtomicCell<Core>,
}
struct Core {
tick: u8,
lifo_slot: Option<Notified>,
run_queue: queue::Local<Arc<Worker>>,
is_searching: bool,
is_shutdown: bool,
tasks: LinkedList<Task>,
park: Option<Parker>,
rand: FastRand,
}
pub(super) struct Shared {
remotes: Box<[Remote]>,
inject: queue::Inject<Arc<Worker>>,
idle: Idle,
shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
}
struct Remote {
steal: queue::Steal<Arc<Worker>>,
pending_drop: task::TransferStack<Arc<Worker>>,
unpark: Unparker,
}
struct Context {
worker: Arc<Worker>,
core: RefCell<Option<Box<Core>>>,
}
pub(crate) struct Launch(Vec<Arc<Worker>>);
type RunResult = Result<Box<Core>, ()>;
type Task = task::Task<Arc<Worker>>;
type Notified = task::Notified<Arc<Worker>>;
scoped_thread_local!(static CURRENT: Context);
pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
let mut cores = vec![];
let mut remotes = vec![];
for _ in 0..size {
let (steal, run_queue) = queue::local();
let park = park.clone();
let unpark = park.unpark();
cores.push(Box::new(Core {
tick: 0,
lifo_slot: None,
run_queue,
is_searching: false,
is_shutdown: false,
tasks: LinkedList::new(),
park: Some(park),
rand: FastRand::new(seed()),
}));
remotes.push(Remote {
steal,
pending_drop: task::TransferStack::new(),
unpark,
});
}
let shared = Arc::new(Shared {
remotes: remotes.into_boxed_slice(),
inject: queue::Inject::new(),
idle: Idle::new(size),
shutdown_workers: Mutex::new(vec![]),
});
let mut launch = Launch(vec![]);
for (index, core) in cores.drain(..).enumerate() {
launch.0.push(Arc::new(Worker {
shared: shared.clone(),
index,
core: AtomicCell::new(Some(core)),
}));
}
(shared, launch)
}
cfg_blocking! {
pub(crate) fn block_in_place<F, R>(f: F) -> R
where
F: FnOnce() -> R,
{
struct Reset;
impl Drop for Reset {
fn drop(&mut self) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
let core = cx.worker.core.take();
*cx.core.borrow_mut() = core;
}
});
}
}
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("can call blocking only when running in a spawned task");
let core = match cx.core.borrow_mut().take() {
Some(core) => {
crate::coop::stop();
core
},
None => return,
};
assert!(core.park.is_some());
cx.worker.core.set(core);
let worker = cx.worker.clone();
runtime::spawn_blocking(move || run(worker));
});
let _reset = Reset;
f()
}
}
const GLOBAL_POLL_INTERVAL: u8 = 61;
impl Launch {
pub(crate) fn launch(mut self) {
for worker in self.0.drain(..) {
runtime::spawn_blocking(move || run(worker));
}
}
}
fn run(worker: Arc<Worker>) {
let core = match worker.core.take() {
Some(core) => core,
None => return,
};
let cx = Context {
worker,
core: RefCell::new(None),
};
let _enter = crate::runtime::enter();
CURRENT.set(&cx, || {
assert!(cx.run(core).is_err());
});
}
impl Context {
fn run(&self, mut core: Box<Core>) -> RunResult {
while !core.is_shutdown {
core.tick();
core = self.maintenance(core);
if let Some(task) = core.next_task(&self.worker) {
core = self.run_task(task, core)?;
continue;
}
if let Some(task) = core.steal_work(&self.worker) {
core = self.run_task(task, core)?;
} else {
core = self.park(core);
}
}
self.worker.shared.shutdown(core, self.worker.clone());
Err(())
}
fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
core.transition_from_searching(&self.worker);
*self.core.borrow_mut() = Some(core);
crate::coop::budget(|| {
task.run();
loop {
let mut core = match self.core.borrow_mut().take() {
Some(core) => core,
None => return Err(()),
};
let task = match core.lifo_slot.take() {
Some(task) => task,
None => return Ok(core),
};
if crate::coop::has_budget_remaining() {
*self.core.borrow_mut() = Some(core);
task.run();
} else {
core.run_queue.push_back(task, self.worker.inject());
return Ok(core);
}
}
})
}
fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
if core.tick % GLOBAL_POLL_INTERVAL == 0 {
core = self.park_timeout(core, Some(Duration::from_millis(0)));
core.maintenance(&self.worker);
}
core
}
fn park(&self, mut core: Box<Core>) -> Box<Core> {
core.transition_to_parked(&self.worker);
while !core.is_shutdown {
core = self.park_timeout(core, None);
core.maintenance(&self.worker);
if core.transition_from_parked(&self.worker) {
return core;
}
}
core
}
fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
let mut park = core.park.take().expect("park missing");
*self.core.borrow_mut() = Some(core);
if let Some(timeout) = duration {
park.park_timeout(timeout).expect("park failed");
} else {
park.park().expect("park failed");
}
core = self.core.borrow_mut().take().expect("core missing");
core.park = Some(park);
if core.run_queue.is_stealable() {
self.worker.shared.notify_parked();
}
core
}
}
impl Core {
fn tick(&mut self) {
self.tick = self.tick.wrapping_add(1);
}
fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
if self.tick % GLOBAL_POLL_INTERVAL == 0 {
worker.inject().pop().or_else(|| self.next_local_task())
} else {
self.next_local_task().or_else(|| worker.inject().pop())
}
}
fn next_local_task(&mut self) -> Option<Notified> {
self.lifo_slot.take().or_else(|| self.run_queue.pop())
}
fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
if !self.transition_to_searching(worker) {
return None;
}
let num = worker.shared.remotes.len();
let start = self.rand.fastrand_n(num as u32) as usize;
for i in 0..num {
let i = (start + i) % num;
if i == worker.index {
continue;
}
let target = &worker.shared.remotes[i];
if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
return Some(task);
}
}
worker.shared.inject.pop()
}
fn transition_to_searching(&mut self, worker: &Worker) -> bool {
if !self.is_searching {
self.is_searching = worker.shared.idle.transition_worker_to_searching();
}
self.is_searching
}
fn transition_from_searching(&mut self, worker: &Worker) {
if !self.is_searching {
return;
}
self.is_searching = false;
worker.shared.transition_worker_from_searching();
}
fn transition_to_parked(&mut self, worker: &Worker) {
let is_last_searcher = worker
.shared
.idle
.transition_worker_to_parked(worker.index, self.is_searching);
self.is_searching = false;
if is_last_searcher {
worker.shared.notify_if_work_pending();
}
}
fn transition_from_parked(&mut self, worker: &Worker) -> bool {
if self.lifo_slot.is_some() {
worker.shared.idle.unpark_worker_by_id(worker.index);
self.is_searching = true;
return true;
}
if worker.shared.idle.is_parked(worker.index) {
return false;
}
self.is_searching = true;
true
}
fn maintenance(&mut self, worker: &Worker) {
self.drain_pending_drop(worker);
if !self.is_shutdown {
self.is_shutdown = worker.inject().is_closed();
}
}
fn shutdown(&mut self, worker: &Worker) {
let mut park = self.park.take().expect("park missing");
for header in self.tasks.iter() {
header.shutdown();
}
loop {
self.drain_pending_drop(worker);
if self.tasks.is_empty() {
break;
}
park.park().expect("park failed");
}
while let Some(_) = self.next_local_task() {}
}
fn drain_pending_drop(&mut self, worker: &Worker) {
use std::mem::ManuallyDrop;
for task in worker.remote().pending_drop.drain() {
let task = ManuallyDrop::new(task);
unsafe {
self.tasks.remove(task.header().into());
}
}
}
}
impl Worker {
fn inject(&self) -> &queue::Inject<Arc<Worker>> {
&self.shared.inject
}
fn remote(&self) -> &Remote {
&self.shared.remotes[self.index]
}
fn eq(&self, other: &Worker) -> bool {
self.shared.ptr_eq(&other.shared) && self.index == other.index
}
}
impl task::Schedule for Arc<Worker> {
fn bind(task: Task) -> Arc<Worker> {
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
cx.core
.borrow_mut()
.as_mut()
.expect("scheduler core missing")
.tasks
.push_front(task);
cx.worker.clone()
})
}
fn release(&self, task: &Task) -> Option<Task> {
use std::ptr::NonNull;
CURRENT.with(|maybe_cx| {
let cx = maybe_cx.expect("scheduler context missing");
if self.eq(&cx.worker) {
let mut maybe_core = cx.core.borrow_mut();
if let Some(core) = &mut *maybe_core {
unsafe {
let ptr = NonNull::from(task.header());
return core.tasks.remove(ptr);
}
}
}
let task = unsafe { Task::from_raw(task.header().into()) };
self.remote().pending_drop.push(task);
if cx.core.borrow().is_some() {
return None;
}
if self.inject().is_closed() {
self.remote().unpark.unpark();
}
None
})
}
fn schedule(&self, task: Notified) {
self.shared.schedule(task, false);
}
fn yield_now(&self, task: Notified) {
self.shared.schedule(task, true);
}
}
impl Shared {
pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
CURRENT.with(|maybe_cx| {
if let Some(cx) = maybe_cx {
if self.ptr_eq(&cx.worker.shared) {
if let Some(core) = cx.core.borrow_mut().as_mut() {
self.schedule_local(core, task, is_yield);
return;
}
}
}
self.inject.push(task);
self.notify_parked();
});
}
fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
let should_notify = if is_yield {
core.run_queue.push_back(task, &self.inject);
true
} else {
let prev = core.lifo_slot.take();
let ret = prev.is_some();
if let Some(prev) = prev {
core.run_queue.push_back(prev, &self.inject);
}
core.lifo_slot = Some(task);
ret
};
if should_notify && core.park.is_some() {
self.notify_parked();
}
}
pub(super) fn close(&self) {
if self.inject.close() {
self.notify_all();
}
}
fn notify_parked(&self) {
if let Some(index) = self.idle.worker_to_notify() {
self.remotes[index].unpark.unpark();
}
}
fn notify_all(&self) {
for remote in &self.remotes[..] {
remote.unpark.unpark();
}
}
fn notify_if_work_pending(&self) {
for remote in &self.remotes[..] {
if !remote.steal.is_empty() {
self.notify_parked();
return;
}
}
if !self.inject.is_empty() {
self.notify_parked();
}
}
fn transition_worker_from_searching(&self) {
if self.idle.transition_worker_from_searching() {
self.notify_parked();
}
}
fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
let mut workers = self.shutdown_workers.lock().unwrap();
workers.push((core, worker));
if workers.len() != self.remotes.len() {
return;
}
for (mut core, worker) in workers.drain(..) {
core.shutdown(&worker);
}
while let Some(_) = self.inject.pop() {}
}
fn ptr_eq(&self, other: &Shared) -> bool {
self as *const _ == other as *const _
}
}