1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
use std::sync::Arc;
use notifier::Notifier;
use ref_count::RefCount;
use std::fmt::Debug;
use crate::Context;
use self::{notifier::NotificationGuard, ref_count::TryDecrement};
pub mod mpmc_circular_buffer;
pub mod notifier;
mod oneshot_cell;
mod ref_count;
mod state_cell;
pub(crate) mod transfer;
pub(crate) fn shared<E>(extension: E) -> (SenderShared<E>, ReceiverShared<E>) {
let inner = Arc::new(Shared::new(extension));
let sender = SenderShared {
inner: inner.clone(),
};
let receiver = ReceiverShared { inner };
(sender, receiver)
}
#[derive(Debug)]
pub struct Shared<E> {
sender_notify: Notifier,
sender_count: RefCount,
receiver_notify: Notifier,
receiver_count: RefCount,
pub(crate) extension: E,
}
impl<E> Shared<E> {
pub fn new(extension: E) -> Self {
Self {
sender_notify: Notifier::new(),
sender_count: RefCount::new(1),
receiver_notify: Notifier::new(),
receiver_count: RefCount::new(1),
extension,
}
}
}
pub struct SenderShared<E> {
inner: Arc<Shared<E>>,
}
impl<E> SenderShared<E> {
pub fn extension(&self) -> &E {
&self.inner.extension
}
pub fn notify_receivers(&self) {
self.inner.receiver_notify.notify();
}
pub fn notify_self(&self) {
self.inner.sender_notify.notify();
}
pub fn subscribe_recv(&self, cx: &Context<'_>) {
self.inner.sender_notify.subscribe(cx);
}
pub fn recv_guard(&self) -> NotificationGuard {
self.inner.sender_notify.guard()
}
pub fn is_alive(&self) -> bool {
self.inner.receiver_count.is_alive()
}
pub fn clone_receiver(&self) -> ReceiverShared<E> {
self.inner.receiver_count.increment();
ReceiverShared {
inner: self.inner.clone(),
}
}
pub fn is_closed(&self) -> bool {
!self.is_alive()
}
}
impl<E> Debug for SenderShared<E>
where
E: Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.inner.fmt(f)
}
}
impl<E> Clone for SenderShared<E> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
inner.sender_count.increment();
Self { inner }
}
}
impl<E> Drop for SenderShared<E> {
fn drop(&mut self) {
match self.inner.sender_count.decrement() {
TryDecrement::Alive(_) => {}
TryDecrement::Dead => {
self.notify_receivers();
}
}
}
}
pub struct ReceiverShared<E> {
pub(crate) inner: Arc<Shared<E>>,
}
impl<E> ReceiverShared<E> {
pub fn extension(&self) -> &E {
&self.inner.extension
}
pub fn notify_senders(&self) {
self.inner.sender_notify.notify();
}
pub fn subscribe_send(&self, cx: &Context<'_>) {
self.inner.receiver_notify.subscribe(cx);
}
pub fn send_guard(&self) -> NotificationGuard {
self.inner.receiver_notify.guard()
}
pub fn is_alive(&self) -> bool {
self.inner.sender_count.is_alive()
}
pub fn is_closed(&self) -> bool {
!self.is_alive()
}
}
impl<E> Clone for ReceiverShared<E> {
fn clone(&self) -> Self {
let inner = self.inner.clone();
inner.receiver_count.increment();
Self { inner }
}
}
impl<E> Drop for ReceiverShared<E> {
fn drop(&mut self) {
match self.inner.receiver_count.decrement() {
TryDecrement::Alive(_) => {}
TryDecrement::Dead => {
self.notify_senders();
}
}
}
}