1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
use atomic::Ordering;
use crossbeam_queue::SegQueue;
use std::{sync::atomic::AtomicUsize, task::Waker};

/// A generation counter paired with a queue of task wakers.
///
/// `notify` increments the generation and wakes every queued waker, `subscribe`
/// queues a waker, and `guard` snapshots the generation so a later notification
/// can be detected.
#[derive(Debug)]
pub struct Notifier {
    /// Counter incremented by one on every `notify` call; starts at zero in `new`.
    generation: AtomicUsize,
    /// Wakers pushed by `subscribe` and popped (then woken) by `notify`.
    wakers: SegQueue<Waker>,
}

impl Notifier {
    /// Creates a notifier whose generation starts at zero and whose waker queue is empty.
    pub fn new() -> Self {
        Self {
            generation: AtomicUsize::new(0),
            wakers: SegQueue::new(),
        }
    }

    /// Snapshots the current generation into a [`NotificationGuard`].
    ///
    /// The returned guard borrows this notifier's counter and reports itself as
    /// expired once [`Notifier::notify`] has changed the generation from the value
    /// captured here.
    pub fn guard(&self) -> NotificationGuard<'_> {
        // SeqCst instead of Relaxed: the snapshot must not drift past whatever the
        // caller inspects next, otherwise a notification landing in between could be
        // absorbed into the snapshot and never surface as an expiry.
        // NOTE(review): assumes callers take the guard *before* checking shared
        // state and call `is_expired` *after* `subscribe` — confirm against callers.
        let generation = self.generation.load(Ordering::SeqCst);

        NotificationGuard {
            generation,
            stored_generation: &self.generation,
        }
    }

    /// Advances the generation by one and wakes every waker currently queued.
    ///
    /// When the `debug` feature is enabled, the number of woken tasks is logged
    /// at info level if it is non-zero.
    pub fn notify(&self) {
        // Bump the generation before draining the queue, and do it with SeqCst so
        // the bump and the queue operations are observed in one consistent order:
        // a subscriber whose waker is queued too late to be popped below is then
        // expected to see the new generation through its guard instead.
        self.generation.fetch_add(1, Ordering::SeqCst);

        #[cfg(feature = "debug")]
        let mut woken = 0usize;

        while let Some(waker) = self.wakers.pop() {
            #[cfg(feature = "debug")]
            {
                woken += 1;
            }

            waker.wake();
        }

        #[cfg(feature = "debug")]
        if woken > 0 {
            log::info!("Woke {} tasks", woken);
        }
    }

    /// Queues a clone of the context's waker so the next [`Notifier::notify`] wakes it.
    ///
    /// Does nothing when `cx.waker()` yields `None`.
    pub fn subscribe(&self, cx: &crate::Context<'_>) {
        if let Some(waker) = cx.waker() {
            self.wakers.push(waker.clone());
        }
    }
}

impl Default for Notifier {
    /// Equivalent to [`Notifier::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// A snapshot of a generation counter, used to detect whether the counter has
/// changed since the snapshot was taken.
///
/// Created by [`Notifier::guard`]; borrows the notifier's counter for `'a`.
#[derive(Debug)]
pub struct NotificationGuard<'a> {
    /// Generation value captured when the guard was created.
    generation: usize,
    /// The live counter that the captured value is compared against.
    stored_generation: &'a AtomicUsize,
}

impl NotificationGuard<'_> {
    /// Returns `true` when the live counter no longer equals the captured generation.
    pub fn is_expired(&self) -> bool {
        // SeqCst instead of Relaxed: this check is typically the last step before a
        // task parks itself, so it must not be satisfied from a stale value while a
        // concurrent notifier has already bumped the counter and drained its wakers.
        // NOTE(review): relies on the counter being bumped with a matching strong
        // ordering on the notifying side — confirm.
        self.stored_generation.load(Ordering::SeqCst) != self.generation
    }
}