1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
use crate::pending::{GuardStatus, RequestId};
use crate::GuardMgrInner;
#[cfg(test)]
use futures::channel::oneshot;
use futures::{channel::mpsc, stream::StreamExt};
use tor_proto::ClockSkew;
use std::sync::{Mutex, Weak};
#[derive(Debug)]
pub(crate) enum Msg {
Status(RequestId, GuardStatus, Option<ClockSkew>),
#[cfg(test)]
Ping(oneshot::Sender<()>),
}
pub(crate) async fn report_status_events(
runtime: impl tor_rtcompat::SleepProvider,
inner: Weak<Mutex<GuardMgrInner>>,
mut events: mpsc::UnboundedReceiver<Msg>,
) {
loop {
match events.next().await {
Some(Msg::Status(id, status, skew)) => {
if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
inner.handle_msg(id, status, skew, &runtime);
} else {
return;
}
}
#[cfg(test)]
Some(Msg::Ping(sender)) => {
let _ignore = sender.send(());
}
None => return,
}
}
}
pub(crate) async fn run_periodic<R: tor_rtcompat::SleepProvider>(
runtime: R,
inner: Weak<Mutex<GuardMgrInner>>,
) {
loop {
let delay = if let Some(inner) = inner.upgrade() {
let mut inner = inner.lock().expect("Poisoned lock");
let wallclock = runtime.wallclock();
let now = runtime.now();
inner.run_periodic_events(wallclock, now)
} else {
return;
};
runtime.sleep(delay).await;
}
}
pub(crate) async fn keep_netdir_updated<RT: tor_rtcompat::Runtime>(
runtime: RT,
inner: Weak<Mutex<GuardMgrInner>>,
netdir_provider: Weak<dyn tor_netdir::NetDirProvider>,
) {
use tor_netdir::DirEvent;
let mut event_stream = match netdir_provider.upgrade().map(|p| p.events()) {
Some(s) => s,
None => return,
};
while let Some(event) = event_stream.next().await {
match event {
DirEvent::NewConsensus | DirEvent::NewDescriptors => {
if let (Some(inner), Some(provider)) = (inner.upgrade(), netdir_provider.upgrade())
{
let mut inner = inner.lock().expect("Poisoned lock");
if let Some(netdir) = provider.latest_netdir() {
inner.update(runtime.wallclock(), Some(&netdir));
}
}
}
_ => {}
}
}
}