1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
use atomic::{Atomic, Ordering};

use crate::{stream::PollRecv, Context};

use super::{
    notifier::Notifier,
    oneshot_cell::{OneshotCell, TryRecvError},
};

#[derive(Copy, Clone)]
enum State {
    Alive,
    Dead,
}

pub struct Transfer<T: Sized> {
    sender: Atomic<State>,
    receiver: Atomic<State>,
    value: OneshotCell<T>,
    notify_rx: Notifier,
}

impl<T> Transfer<T> {
    pub fn new() -> Self {
        Self {
            sender: Atomic::new(State::Alive),
            receiver: Atomic::new(State::Alive),
            value: OneshotCell::new(),
            notify_rx: Notifier::new(),
        }
    }

    pub fn send(&self, value: T) -> Result<(), T> {
        if let State::Dead = self.receiver.load(Ordering::Acquire) {
            return Err(value);
        }

        self.value.send(value)?;
        self.notify_rx.notify();

        Ok(())
    }

    pub fn recv(&self, cx: &Context<'_>) -> PollRecv<T> {
        loop {
            let guard = self.notify_rx.guard();
            match self.value.try_recv() {
                Ok(value) => return PollRecv::Ready(value),
                Err(TryRecvError::Pending) => {
                    if let State::Dead = self.sender.load(Ordering::Acquire) {
                        return match self.value.try_recv() {
                            Ok(v) => PollRecv::Ready(v),
                            Err(TryRecvError::Pending) => PollRecv::Closed,
                            Err(TryRecvError::Closed) => PollRecv::Closed,
                        };
                    }

                    self.notify_rx.subscribe(cx);

                    if guard.is_expired() {
                        continue;
                    }

                    return PollRecv::Pending;
                }
                Err(TryRecvError::Closed) => return PollRecv::Closed,
            }
        }
    }

    pub fn sender_disconnect(&self) {
        self.sender.store(State::Dead, Ordering::Release);
        self.notify_rx.notify();
    }

    pub fn receiver_disconnect(&self) {
        self.receiver.store(State::Dead, Ordering::Release);
    }
}