while let Ok(sz) = rsp.read(&mut buf) {
let read = &buf[..sz];
vec.extend_from_slice(read);
- vec = 'outer: loop {
- for line in vec.split_inclusive(|c| *c == 10) {
- if !line.ends_with(&[10]) {
- let mut newvec = Vec::new();
- newvec.extend_from_slice(line);
- break 'outer newvec;
- } else if line.starts_with(&[':' as u8]) {
- // Ignore lines starting with ':': in the
- // Mastodon streaming APIs those are
- // heartbeat events whose sole purpose is
- // to avoid the connection timing out. We
- // don't communicate them back to the main
- // thread.
- } else {
- let rsp = match std::str::from_utf8(&line) {
- Err(_) => StreamResponse::BadUTF8,
- Ok(d) => StreamResponse::Line(d.to_owned()),
- };
- receiver(StreamUpdate {
- id: id.clone(),
- response: rsp
- });
- }
+ let mut lines_iter = vec.split_inclusive(|c| *c == b'\n')
+ .peekable();
+ while let Some(line) = lines_iter.next_if(
+ |line| line.ends_with(b"\n"))
+ {
+ if line.starts_with(b":") {
+ // Ignore lines starting with ':': in the
+ // Mastodon streaming APIs those are heartbeat
+ // events whose sole purpose is to avoid the
+ // connection timing out. We don't communicate
+ // them back to the main thread.
+ } else {
+ let rsp = match std::str::from_utf8(&line) {
+ Err(_) => StreamResponse::BadUTF8,
+ Ok(d) => StreamResponse::Line(d.to_owned()),
+ };
+ receiver(StreamUpdate {
+ id: id.clone(),
+ response: rsp
+ });
}
- // If we didn't get a partial line inside the loop, then
- // we must have an empty buffer here
- break Vec::new();
+ }
+
+ // We've consumed all the complete lines in the
+ // vector. Reinitialise the vector to whatever is
+ // left.
+ vec = match lines_iter.next() {
+ Some(rest) => rest.into(),
+ None => Vec::new(),
};
}