1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
//! Tools for generating a stream of structured events, similar to C tor's `ControlPort`.

// @@ begin lint list maintained by maint/add_warning @@
#![deny(missing_docs)]
#![warn(noop_method_call)]
#![deny(unreachable_pub)]
#![warn(clippy::all)]
#![deny(clippy::await_holding_lock)]
#![deny(clippy::cargo_common_metadata)]
#![deny(clippy::cast_lossless)]
#![deny(clippy::checked_conversions)]
#![warn(clippy::cognitive_complexity)]
#![deny(clippy::debug_assert_with_mut_call)]
#![deny(clippy::exhaustive_enums)]
#![deny(clippy::exhaustive_structs)]
#![deny(clippy::expl_impl_clone_on_copy)]
#![deny(clippy::fallible_impl_from)]
#![deny(clippy::implicit_clone)]
#![deny(clippy::large_stack_arrays)]
#![warn(clippy::manual_ok_or)]
#![deny(clippy::missing_docs_in_private_items)]
#![deny(clippy::missing_panics_doc)]
#![warn(clippy::needless_borrow)]
#![warn(clippy::needless_pass_by_value)]
#![warn(clippy::option_option)]
#![warn(clippy::rc_buffer)]
#![deny(clippy::ref_option_ref)]
#![warn(clippy::semicolon_if_nothing_returned)]
#![warn(clippy::trait_duplication_in_bounds)]
#![deny(clippy::unnecessary_wraps)]
#![warn(clippy::unseparated_literal_suffix)]
#![deny(clippy::unwrap_used)]
#![allow(clippy::let_unit_value)] // This can reasonably be done for explicitness
//! <!-- @@ end lint list maintained by maint/add_warning @@ -->

pub mod events;

use crate::events::{TorEvent, TorEventKind};
use async_broadcast::{InactiveReceiver, Receiver, Sender, TrySendError};
use futures::channel::mpsc;
use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
use futures::future::Either;
use futures::StreamExt;
use once_cell::sync::OnceCell;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use thiserror::Error;
use tracing::{error, warn};

/// Pointer to an `UnboundedSender`, used to send events into the `EventReactor`.
static EVENT_SENDER: OnceCell<UnboundedSender<TorEvent>> = OnceCell::new();
/// An inactive receiver for the currently active broadcast channel, if there is one.
static CURRENT_RECEIVER: OnceCell<InactiveReceiver<TorEvent>> = OnceCell::new();
/// The number of `TorEventKind`s there are.
const EVENT_KIND_COUNT: usize = 1;
/// An array containing one `AtomicUsize` for each `TorEventKind`, used to track subscriptions.
///
/// When a `TorEventReceiver` subscribes to a `TorEventKind`, it uses its `usize` value to index
/// into this array and increment the associated `AtomicUsize` (and decrements it to unsubscribe).
/// This lets event emitters check whether there are any subscribers, and avoid emitting events
/// if there aren't.
static EVENT_SUBSCRIBERS: [AtomicUsize; EVENT_KIND_COUNT] = [AtomicUsize::new(0); EVENT_KIND_COUNT];

/// The size of the internal broadcast channel used to implement event subscription.
pub static BROADCAST_CAPACITY: usize = 512;

/// A reactor used to forward events to make the event reporting system work.
///
/// # Note
///
/// Currently, this type is a singleton; there is one event reporting system used for the entire
/// program. This is not stable, and may change in future.
pub struct EventReactor {
    /// A receiver that the reactor uses to learn about incoming events.
    ///
    /// This is unbounded so that event publication doesn't have to be async.
    receiver: UnboundedReceiver<TorEvent>,
    /// A sender that the reactor uses to publish events.
    ///
    /// Events are only sent here if at least one subscriber currently wants them.
    broadcast: Sender<TorEvent>,
}

impl EventReactor {
    /// Initialize the event reporting system, returning a reactor that must be run for it to work,
    /// and a `TorEventReceiver` that can be used to extract events from the system. If the system
    /// has already been initialized, returns `None` instead of a reactor.
    ///
    /// # Warnings
    ///
    /// The returned reactor *must* be run with `EventReactor::run`, in a background async task.
    /// If it is not, the event system might consume unbounded amounts of memory.
    pub fn new() -> Option<Self> {
        let (tx, rx) = mpsc::unbounded();
        if EVENT_SENDER.set(tx).is_ok() {
            let (btx, brx) = async_broadcast::broadcast(BROADCAST_CAPACITY);
            CURRENT_RECEIVER
                .set(brx.deactivate())
                .expect("CURRENT_RECEIVER can't be set if EVENT_SENDER is unset!");
            Some(Self {
                receiver: rx,
                broadcast: btx,
            })
        } else {
            None
        }
    }
    /// Get a `TorEventReceiver` to receive events from, assuming an `EventReactor` is already
    /// running somewhere. (If it isn't, returns `None`.)
    ///
    /// As noted in the type-level documentation, this function might not always work this way.
    pub fn receiver() -> Option<TorEventReceiver> {
        CURRENT_RECEIVER
            .get()
            .map(|rx| TorEventReceiver::wrap(rx.clone()))
    }
    /// Run the event forwarding reactor.
    ///
    /// You *must* call this function once a reactor is created.
    pub async fn run(mut self) {
        while let Some(event) = self.receiver.next().await {
            match self.broadcast.try_broadcast(event) {
                Ok(_) => {}
                Err(TrySendError::Closed(_)) => break,
                Err(TrySendError::Full(event)) => {
                    // If the channel is full, do a blocking broadcast to wait for it to be
                    // not full, and log a warning about receivers lagging behind.
                    warn!("TorEventReceivers aren't receiving events fast enough!");
                    if self.broadcast.broadcast(event).await.is_err() {
                        break;
                    }
                }
                Err(TrySendError::Inactive(_)) => {
                    // no active receivers, so just drop the event on the floor.
                }
            }
        }
        // It shouldn't be possible to get here, since we have globals keeping the channels
        // open. Still, if we somehow do, log an error about it.
        error!("event reactor shutting down; this shouldn't ever happen");
    }
}

/// An error encountered when trying to receive a `TorEvent`.
#[derive(Clone, Debug, Error)]
#[non_exhaustive]
pub enum ReceiverError {
    /// The receiver isn't subscribed to anything, so wouldn't ever return any events.
    #[error("no event subscriptions")]
    NoSubscriptions,
    /// The internal broadcast channel was closed, which shouldn't ever happen.
    #[error("internal event broadcast channel closed")]
    ChannelClosed,
}

/// A receiver for `TorEvent`s emitted by other users of this crate.
///
/// To use this type, first subscribe to some kinds of event by calling
/// `TorEventReceiver::subscribe`. Then, consume events using the implementation of
/// `futures::stream::Stream`.
///
/// # Warning
///
/// Once interest in events has been signalled with `subscribe`, events must be continuously
/// read from the receiver in order to avoid excessive memory consumption.
#[derive(Clone, Debug)]
pub struct TorEventReceiver {
    /// If no events have been subscribed to yet, this is an `InactiveReceiver`; otherwise,
    /// it's a `Receiver`.
    inner: Either<Receiver<TorEvent>, InactiveReceiver<TorEvent>>,
    /// Whether we're subscribed to each event kind (if `subscribed[kind]` is true, we're
    /// subscribed to `kind`).
    subscribed: [bool; EVENT_KIND_COUNT],
}

impl futures::stream::Stream for TorEventReceiver {
    type Item = TorEvent;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        match this.inner {
            Either::Left(ref mut active) => loop {
                match Pin::new(&mut *active).poll_next(cx) {
                    Poll::Ready(Some(e)) => {
                        if this.subscribed[e.kind() as usize] {
                            return Poll::Ready(Some(e));
                        }
                        // loop, since we weren't subscribed to that event
                    }
                    x => return x,
                }
            },
            Either::Right(_) => {
                warn!("TorEventReceiver::poll_next() called without subscriptions!");
                Poll::Ready(None)
            }
        }
    }
}

impl TorEventReceiver {
    /// Create a `TorEventReceiver` from an `InactiveReceiver` handle.
    pub(crate) fn wrap(rx: InactiveReceiver<TorEvent>) -> Self {
        Self {
            inner: Either::Right(rx),
            subscribed: [false; EVENT_KIND_COUNT],
        }
    }
    /// Subscribe to a given kind of `TorEvent`.
    ///
    /// After calling this function, `TorEventReceiver::recv` will emit events of that kind.
    /// This function is idempotent (subscribing twice has the same effect as doing so once).
    pub fn subscribe(&mut self, kind: TorEventKind) {
        if !self.subscribed[kind as usize] {
            EVENT_SUBSCRIBERS[kind as usize].fetch_add(1, Ordering::SeqCst);
            self.subscribed[kind as usize] = true;
        }
        // FIXME(eta): cloning is ungood, but hard to avoid
        if let Either::Right(inactive) = self.inner.clone() {
            self.inner = Either::Left(inactive.activate());
        }
    }
    /// Unsubscribe from a given kind of `TorEvent`.
    ///
    /// After calling this function, `TorEventReceiver::recv` will no longer emit events of that
    /// kind.
    /// This function is idempotent (unsubscribing twice has the same effect as doing so once).
    pub fn unsubscribe(&mut self, kind: TorEventKind) {
        if self.subscribed[kind as usize] {
            EVENT_SUBSCRIBERS[kind as usize].fetch_sub(1, Ordering::SeqCst);
            self.subscribed[kind as usize] = false;
        }
        // If we're now not subscribed to anything, deactivate our channel.
        if self.subscribed.iter().all(|x| !*x) {
            // FIXME(eta): cloning is ungood, but hard to avoid
            if let Either::Left(active) = self.inner.clone() {
                self.inner = Either::Right(active.deactivate());
            }
        }
    }
}

impl Drop for TorEventReceiver {
    fn drop(&mut self) {
        for (i, subscribed) in self.subscribed.iter().enumerate() {
            // FIXME(eta): duplicates logic from Self::unsubscribe, because it's not possible
            //             to go from a `usize` to a `TorEventKind`
            if *subscribed {
                EVENT_SUBSCRIBERS[i].fetch_sub(1, Ordering::SeqCst);
            }
        }
    }
}

/// Returns a boolean indicating whether the event `kind` has any subscribers (as in,
/// whether `TorEventReceiver::subscribe` has been called with that event kind).
///
/// This is useful to avoid doing work to generate events that might be computationally expensive
/// to generate.
pub fn event_has_subscribers(kind: TorEventKind) -> bool {
    EVENT_SUBSCRIBERS[kind as usize].load(Ordering::SeqCst) > 0
}

/// Broadcast the given `TorEvent` to any interested subscribers.
///
/// As an optimization, does nothing if the event has no subscribers (`event_has_subscribers`
/// returns false). (also does nothing if the event subsystem hasn't been initialized yet)
///
/// This function isn't intended for use outside Arti crates (as in, library consumers of Arti
/// shouldn't broadcast events!).
pub fn broadcast(event: TorEvent) {
    if !event_has_subscribers(event.kind()) {
        return;
    }
    if let Some(sender) = EVENT_SENDER.get() {
        // If this fails, there isn't much we can really do about it!
        let _ = sender.unbounded_send(event);
    }
}

#[cfg(test)]
mod test {
    #![allow(clippy::unwrap_used)]
    use crate::{
        broadcast, event_has_subscribers, EventReactor, StreamExt, TorEvent, TorEventKind,
    };
    use once_cell::sync::OnceCell;
    use std::sync::{Mutex, MutexGuard};
    use std::time::Duration;
    use tokio::runtime::Runtime;

    // HACK(eta): these tests need to run effectively singlethreaded, since they mutate global
    //            state. They *also* need to share the same tokio runtime, which the
    //            #[tokio::test] thing doesn't do (it makes a new runtime per test), because of
    //            the need to have a background singleton EventReactor.
    //
    //            To hack around this, we just have a global runtime protected by a mutex!
    static TEST_MUTEX: OnceCell<Mutex<Runtime>> = OnceCell::new();

    /// Locks the mutex, and makes sure the event reactor is initialized.
    fn test_setup() -> MutexGuard<'static, Runtime> {
        let mutex = TEST_MUTEX.get_or_init(|| Mutex::new(Runtime::new().unwrap()));
        let runtime = mutex
            .lock()
            .expect("mutex poisoned, probably by other failing tests");
        if let Some(reactor) = EventReactor::new() {
            runtime.handle().spawn(reactor.run());
        }
        runtime
    }

    #[test]
    fn subscriptions() {
        let rt = test_setup();

        rt.block_on(async move {
            // shouldn't have any subscribers at the start
            assert!(!event_has_subscribers(TorEventKind::Empty));

            let mut rx = EventReactor::receiver().unwrap();
            // creating a receiver shouldn't result in any subscriptions
            assert!(!event_has_subscribers(TorEventKind::Empty));

            rx.subscribe(TorEventKind::Empty);
            // subscription should work
            assert!(event_has_subscribers(TorEventKind::Empty));

            rx.unsubscribe(TorEventKind::Empty);
            // unsubscribing should work
            assert!(!event_has_subscribers(TorEventKind::Empty));

            // subscription should be idempotent
            rx.subscribe(TorEventKind::Empty);
            rx.subscribe(TorEventKind::Empty);
            rx.subscribe(TorEventKind::Empty);
            assert!(event_has_subscribers(TorEventKind::Empty));

            rx.unsubscribe(TorEventKind::Empty);
            assert!(!event_has_subscribers(TorEventKind::Empty));

            rx.subscribe(TorEventKind::Empty);
            assert!(event_has_subscribers(TorEventKind::Empty));

            std::mem::drop(rx);
            // dropping the receiver should auto-unsubscribe
            assert!(!event_has_subscribers(TorEventKind::Empty));
        });
    }

    #[test]
    fn empty_recv() {
        let rt = test_setup();

        rt.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            // attempting to read from a receiver with no subscriptions should return None
            let result = rx.next().await;
            assert!(result.is_none());
        });
    }

    #[test]
    fn receives_events() {
        let rt = test_setup();

        rt.block_on(async move {
            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);
            // HACK(eta): give the event reactor time to run
            tokio::time::sleep(Duration::from_millis(100)).await;
            broadcast(TorEvent::Empty);

            let result = rx.next().await;
            assert_eq!(result, Some(TorEvent::Empty));
        });
    }

    #[test]
    fn does_not_send_to_no_subscribers() {
        let rt = test_setup();

        rt.block_on(async move {
            // this event should just get dropped on the floor, because no subscribers exist
            broadcast(TorEvent::Empty);

            let mut rx = EventReactor::receiver().unwrap();
            rx.subscribe(TorEventKind::Empty);

            // this shouldn't have an event to receive now
            let result = tokio::time::timeout(Duration::from_millis(100), rx.next()).await;
            assert!(result.is_err());
        });
    }
}