1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
use std::{marker::PhantomData, pin::Pin};

use crate::stream::{PollRecv, Stream};
use crate::Context;
use pin_project::pin_project;

/// A stream adapter that passes every item received from an inner stream
/// through a mapping function.
///
/// * `From` is the wrapped stream.
/// * `Map` is the function applied to each item of the wrapped stream.
/// * `Into` is the item type the adapter produces (the return type of `Map`).
#[pin_project]
pub struct MapStream<From, Map, Into> {
    /// The wrapped stream. Marked `#[pin]` so it can be polled through a
    /// pinned `MapStream`.
    #[pin]
    from: From,

    /// Function applied to each item yielded by `from`.
    map: Map,

    /// Marker that ties the otherwise unused `Into` parameter to the struct.
    ///
    /// `fn() -> Into` is used instead of a bare `Into` because the adapter
    /// only ever *produces* `Into` values and never stores one: this keeps
    /// the same variance while not making the adapter's `Send`/`Sync` status
    /// depend on `Into`.
    into: PhantomData<fn() -> Into>,
}

impl<From, Map, Into> MapStream<From, Map, Into>
where
    From: Stream,
    Map: Fn(From::Item) -> Into,
{
    pub fn new(from: From, map: Map) -> Self {
        Self {
            from,
            map,
            into: PhantomData,
        }
    }
}

impl<From, Map, Into> Stream for MapStream<From, Map, Into>
where
    From: Stream,
    Map: Fn(From::Item) -> Into,
{
    type Item = Into;

    /// Polls the wrapped stream once.
    ///
    /// A ready item is run through the mapping function and returned as
    /// `Ready`; `Pending` and `Closed` are forwarded unchanged.
    fn poll_recv(self: Pin<&mut Self>, cx: &mut Context<'_>) -> PollRecv<Self::Item> {
        let projected = self.project();
        let upstream = projected.from.poll_recv(cx);

        match upstream {
            // Nothing to transform: relay the wrapped stream's state as is.
            PollRecv::Pending => PollRecv::Pending,
            PollRecv::Closed => PollRecv::Closed,
            PollRecv::Ready(item) => {
                let mapped = (projected.map)(item);
                PollRecv::Ready(mapped)
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use std::convert::identity;
    use std::pin::Pin;

    use super::MapStream;
    use crate::stream::{PollRecv, Stream};
    use crate::test::stream::*;
    use crate::Context;

    /// Each source item comes out transformed, in order, and the stream
    /// reports `Closed` once the source has been drained.
    #[test]
    fn map() {
        let mut mapped = MapStream::new(from_iter(vec![1, 2, 3]), |i| i + 10);
        let mut cx = Context::empty();

        for expected in 11..=13 {
            let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
            assert_eq!(PollRecv::Ready(expected), polled);
        }

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Closed, polled);
    }

    /// A pending source is reported as pending by the adapter.
    #[test]
    fn forward_pending() {
        let mut mapped = MapStream::new(pending::<usize>(), identity);
        let mut cx = Context::empty();

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Pending, polled);
    }

    /// A closed source is reported as closed by the adapter.
    #[test]
    fn forward_closed() {
        let mut mapped = MapStream::new(closed::<usize>(), identity);
        let mut cx = Context::empty();

        let polled = Pin::new(&mut mapped).poll_recv(&mut cx);
        assert_eq!(PollRecv::Closed, polled);
    }
}