chiark / gitweb /
3a85f5c27b1886b3863ca22e033386a849ecc26b
[hippotat.git] / src / queue.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use crate::prelude::*;
6
7 // xxx are we using this at all ?
8 #[derive(Default,Clone)]
9 pub struct PacketQueue<D> {
10   queue: VecDeque<D>,
11   content: usize,
12 }
13
14 impl<D> PacketQueue<D> where D: AsRef<[u8]> {
15   pub fn push_back(&mut self, data: D) {
16     self.content += data.as_ref().len();
17     self.queue.push_back(data);
18   }
19
20   pub fn pop_front(&mut self) -> Option<D> {
21     let data = self.queue.pop_front()?;
22     self.content -= data.as_ref().len();
23     Some(data)
24   }
25
26   pub fn content_count(&self) -> usize { self.queue.len() }
27   pub fn content_len(&self) -> usize { self.content }
28   pub fn total_len(&self) -> usize {
29     self.content_count() + self.content_len()
30   }
31
32   pub fn is_empty(&self) -> bool { self.queue.is_empty() }
33   pub fn peek_front(&self) -> Option<&D> { self.queue.front() }
34 }
35
36 #[derive(Default,Clone)]
37 pub struct QueueBuf<E> {
38   content: usize,
39   eaten1: usize, // 0 <= eaten1 < queue.front()...len()
40   queue: VecDeque<E>,
41 }
42
43 #[derive(Default,Debug,Clone)]
44 pub struct FrameQueueBuf {
45   queue: QueueBuf<Cervine<'static, Box<[u8]>, [u8]>>,
46 }
47
48 impl<E> Debug for QueueBuf<E> where E: AsRef<[u8]> {
49   #[throws(fmt::Error)]
50   fn fmt(&self, f: &mut fmt::Formatter) {
51     write!(f, "Queue{{content={},eaten1={},queue=[",
52            self.content, self.eaten1)?;
53     for q in &self.queue { write!(f, "{},", q.as_ref().len())?; }
54     write!(f, "]}}")?;
55   }
56 }
57
58 impl<E> QueueBuf<E> where E: AsRef<[u8]> {
59   pub fn push<B: Into<E>>(&mut self, b: B) {
60     self.push_(b.into());
61   }
62   fn push_(&mut self, b: E) {
63     let l = b.as_ref().len();
64     self.queue.push_back(b);
65     self.content += l;
66   }
67   pub fn is_empty(&self) -> bool { self.content == 0 }
68   pub fn len(&self) -> usize { self.content }
69 }
70
71 impl FrameQueueBuf {
72   pub fn push_esc<B: Into<Box<[u8]>>>(&mut self, b: B) {
73     self.push_esc_(b.into());
74   }
75   fn push_esc_(&mut self, b: Box<[u8]>) {
76     self.queue.push_(Cervine::Owned(b));
77     self.queue.push_(Cervine::Borrowed(&SLIP_END_SLICE));
78   }
79   pub fn esc_push(&mut self, b: Box<[u8]>) {
80     self.queue.push_(Cervine::Borrowed(&SLIP_END_SLICE));
81     self.queue.push_(Cervine::Owned(b));
82   }
83   pub fn push_raw(&mut self, b: Box<[u8]>) {
84     self.queue.push_(Cervine::Owned(b));
85   }
86   pub fn is_empty(&self) -> bool { self.queue.is_empty() }
87   pub fn len(&self) -> usize { self.queue.len() }
88 }
89
90 impl<E> hyper::body::Buf for QueueBuf<E> where E: AsRef<[u8]> {
91   fn remaining(&self) -> usize { self.content }
92   fn chunk(&self) -> &[u8] {
93     let front = if let Some(f) = self.queue.front() { f } else { return &[] };
94     &front.as_ref()[ self.eaten1.. ]
95   }
96   fn advance(&mut self, cnt: usize) {
97     self.content -= cnt;
98     self.eaten1 += cnt;
99     loop {
100       if self.eaten1 == 0 { break }
101       let front = self.queue.front().unwrap();
102       if self.eaten1 < front.as_ref().len() { break; }
103       self.eaten1 -= front.as_ref().len();
104       self.queue.pop_front().unwrap();
105     }
106   }
107 }
108
109 impl hyper::body::Buf for FrameQueueBuf {
110   fn remaining(&self) -> usize { self.queue.remaining() }
111   fn chunk(&self) -> &[u8] { self.queue.chunk() }
112   fn advance(&mut self, cnt: usize) { self.queue.advance(cnt) }
113 }
114
115 pin_project!{
116   pub struct BufBody<B:Buf> {
117     body: Option<B>,
118   }
119 }
120 impl<B:Buf> BufBody<B> {
121   pub fn new(body: B) -> Self { Self { body: Some(body ) } }
122 }
123 impl BufBody<FrameQueueBuf> {
124   pub fn display<S:Display>(s: S) -> Self {
125     let s = s.to_string().into_bytes();
126     let mut buf: FrameQueueBuf = default();
127     buf.push_raw(s.into());
128     Self::new(buf)
129   }
130 }
131
132 impl<B:Buf> HttpBody for BufBody<B> {
133   type Error = Void;
134   type Data = B;
135   fn poll_data(self: Pin<&mut Self>, _: &mut std::task::Context<'_>)
136                -> Poll<Option<Result<B, Void>>> {
137     Poll::Ready(Ok(self.project().body.take()).transpose())
138   }
139   fn poll_trailers(self: Pin<&mut Self>, _: &mut std::task::Context<'_>)
140  -> Poll<Result<Option<hyper::HeaderMap<hyper::header::HeaderValue>>, Void>> {
141     Poll::Ready(Ok(None))
142   }
143 }