diff --git a/src/backup.rs b/src/backup.rs index 13b9a32f..5f958acf 100644 --- a/src/backup.rs +++ b/src/backup.rs @@ -102,6 +102,9 @@ //! //! Not sure if this is better. TODO +mod chunk_stream; +pub use chunk_stream::*; + mod chunk_stat; pub use chunk_stat::*; diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs new file mode 100644 index 00000000..39cadfe0 --- /dev/null +++ b/src/backup/chunk_stream.rs @@ -0,0 +1,73 @@ +use failure::*; + +use proxmox_protocol::Chunker; +use futures::{Async, Poll}; +use futures::stream::Stream; + +pub struct ChunkStream, Error=Error>> { + input: S, + chunker: Chunker, + buffer: Option>, + rest: Option>, +} + +impl , Error=Error>> ChunkStream { + + pub fn new(input: S) -> Self { + Self { input, chunker: Chunker::new(4 * 1024 * 1024), buffer: None, rest: None } + } +} + +impl , Error=Error>> Stream for ChunkStream { + + type Item = Vec; + type Error = Error; + + fn poll(&mut self) -> Poll>, Error> { + loop { + match self.input.poll() { + Err(err) => { + return Err(err); + } + Ok(Async::NotReady) => { + return Ok(Async::NotReady); + } + Ok(Async::Ready(None)) => { + let mut data = self.buffer.take().or_else(|| Some(vec![])).unwrap(); + if let Some(rest) = self.rest.take() { data.extend(rest); } + + if data.len() > 0 { + return Ok(Async::Ready(Some(data))); + } else { + return Ok(Async::Ready(None)); + } + } + Ok(Async::Ready(Some(mut data))) => { + + if let Some(rest) = self.rest.take() { data.extend(rest); } + + let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); + let boundary = self.chunker.scan(&data); + + if boundary == 0 { + buffer.extend(data); + // continue poll + } else if boundary == data.len() { + buffer.extend(data); + return Ok(Async::Ready(self.buffer.take())); + } else if boundary < data.len() { + let (left, right) = data.split_at(boundary); + buffer.extend(left); + + let rest = self.rest.get_or_insert_with(|| Vec::with_capacity(right.len())); + rest.extend(right); + + return Ok(Async::Ready(self.buffer.take())); + } else { + panic!("got unexpected chunk boundary from chunker"); + } + } + } + } + } +}