From 169c0e060f9d644d0db5c8aa79505ba037817731 Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Wed, 22 May 2019 09:05:35 +0200 Subject: [PATCH] src/backup/chunk_stream.rs: optimize FixedChunkStream (use BytesMut) --- src/backup/chunk_stream.rs | 57 ++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 30 deletions(-) diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index 31d6ea94..a3272408 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -73,27 +73,40 @@ impl , Error=Error>> Stream for ChunkStream { } } +use bytes::BytesMut; + /// Split input stream into fixed sized chunks -pub struct FixedChunkStream, Error=Error>> { +pub struct FixedChunkStream { input: S, chunk_size: usize, - buffer: Option>, + buffer: BytesMut, } -impl , Error=Error>> FixedChunkStream { +impl FixedChunkStream { pub fn new(input: S, chunk_size: usize) -> Self { - Self { input, chunk_size, buffer: None } + Self { input, chunk_size, buffer: BytesMut::new() } } } -impl , Error=Error>> Stream for FixedChunkStream { +impl Stream for FixedChunkStream + where S: Stream, + S::Item: AsRef<[u8]>, +{ - type Item = Vec; - type Error = Error; + type Item = BytesMut; + type Error = S::Error; - fn poll(&mut self) -> Poll>, Error> { + fn poll(&mut self) -> Poll, S::Error> { loop { + + if self.buffer.len() == self.chunk_size { + return Ok(Async::Ready(Some(self.buffer.take()))); + } else if self.buffer.len() > self.chunk_size { + let result = self.buffer.split_to(self.chunk_size); + return Ok(Async::Ready(Some(result))); + } + match self.input.poll() { Err(err) => { return Err(err); @@ -103,30 +116,14 @@ impl , Error=Error>> Stream for FixedChunkStream { } Ok(Async::Ready(None)) => { // last chunk can have any size - return Ok(Async::Ready(self.buffer.take())); + if self.buffer.len() > 0 { + return Ok(Async::Ready(Some(self.buffer.take()))); + } else { + return Ok(Async::Ready(None)); + } } Ok(Async::Ready(Some(data))) => { - let buffer = self.buffer.get_or_insert_with(|| Vec::with_capacity(1024*1024)); - let need = self.chunk_size - buffer.len(); - - if need > data.len() { - buffer.extend(data); - // continue poll - } else if need == data.len() { - buffer.extend(data); - return Ok(Async::Ready(self.buffer.take())); - } else if need < data.len() { - let (left, right) = data.split_at(need); - buffer.extend(left); - - let result = self.buffer.take(); - - self.buffer = Some(Vec::from(right)); - - return Ok(Async::Ready(result)); - } else { - unreachable!(); - } + self.buffer.extend_from_slice(data.as_ref()); } } }