1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//! Declare the lowest level of stream: a stream that operates on raw
//! cells.

use crate::circuit::{sendme, StreamTarget};
use crate::{Error, Result};
use tor_cell::relaycell::msg::RelayMsg;

use crate::circuit::sendme::StreamRecvWindow;
use futures::channel::mpsc;
use futures::stream::StreamExt;

/// The read part of a stream on a particular circuit.
///
/// Relay messages are pulled from `receiver` one at a time via `recv`.
/// After a read fails or yields an END message, the reader marks itself as
/// ended and every later read fails with `Error::NotConnected`.
#[derive(Debug)]
pub struct StreamReader {
    /// The underlying `StreamTarget` for this stream.
    ///
    /// The reader uses it to send SENDME messages and to report protocol
    /// errors.
    pub(crate) target: StreamTarget,
    /// Channel to receive stream messages from the reactor.
    ///
    /// If this channel yields no further item, the read is reported as an
    /// `Error::StreamProto`.
    pub(crate) receiver: mpsc::Receiver<RelayMsg>,
    /// Congestion control receive window for this stream.
    ///
    /// Having this here means we're only going to update it when the end consumer of this stream
    /// actually reads things, meaning we don't ask for more data until it's actually needed (as
    /// opposed to having the reactor assume we're always reading, and potentially overwhelm itself
    /// with having to buffer data).
    ///
    /// It is only touched for messages for which
    /// `sendme::msg_counts_towards_windows` returns true.
    pub(crate) recv_window: StreamRecvWindow,
    /// Whether or not this stream has ended.
    ///
    /// Set by `recv` when a read returns an error or an END message; once
    /// set, reads are refused without consulting `receiver`.
    pub(crate) ended: bool,
}

impl StreamReader {
    /// Try to read the next relay message from this stream.
    async fn recv_raw(&mut self) -> Result<RelayMsg> {
        if self.ended {
            // Prevent reading from streams after they've ended.
            return Err(Error::NotConnected);
        }
        let msg = self
            .receiver
            .next()
            .await
            // This probably means that the other side closed the
            // mpsc channel.  I'm not sure the error type is correct though?
            .ok_or_else(|| {
                Error::StreamProto("stream channel disappeared without END cell?".into())
            })?;

        if sendme::msg_counts_towards_windows(&msg) && self.recv_window.take()? {
            self.target.send_sendme()?;
            self.recv_window.put();
        }

        Ok(msg)
    }

    /// As recv_raw, but if there is an error or an end cell, note that this
    /// stream has ended.
    pub async fn recv(&mut self) -> Result<RelayMsg> {
        let val = self.recv_raw().await;
        match val {
            Err(_) | Ok(RelayMsg::End(_)) => {
                self.ended = true;
            }
            _ => {}
        }
        val
    }

    /// Shut down this stream.
    pub fn protocol_error(&mut self) {
        self.target.protocol_error();
    }
}