From 32bef1e2d198bd9e71d63bddbc3b0747701bb7a9 Mon Sep 17 00:00:00 2001 From: Wolfgang Bumiller Date: Fri, 23 Aug 2019 13:41:30 +0200 Subject: [PATCH] src/backup/chunk_stream.rs: switch to async Signed-off-by: Wolfgang Bumiller --- src/backup/chunk_stream.rs | 123 ++++++++++++++++++------------------- 1 file changed, 61 insertions(+), 62 deletions(-) diff --git a/src/backup/chunk_stream.rs b/src/backup/chunk_stream.rs index d542a328..4d0ca6b6 100644 --- a/src/backup/chunk_stream.rs +++ b/src/backup/chunk_stream.rs @@ -1,125 +1,124 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + use bytes::BytesMut; use failure::*; -use futures::stream::Stream; -use futures::{Async, Poll}; +use futures::ready; +use futures::stream::{Stream, TryStream}; use super::Chunker; /// Split input stream into dynamic sized chunks -pub struct ChunkStream { +pub struct ChunkStream { input: S, chunker: Chunker, buffer: BytesMut, scan_pos: usize, } -impl ChunkStream { +impl ChunkStream { pub fn new(input: S, chunk_size: Option) -> Self { Self { input, chunker: Chunker::new(chunk_size.unwrap_or(4*1024*1024)), buffer: BytesMut::new(), scan_pos: 0} } } -impl Stream for ChunkStream - where S: Stream, - S::Item: AsRef<[u8]>, - S::Error: Into, +impl Unpin for ChunkStream {} + +impl Stream for ChunkStream +where + S: TryStream, + S::Ok: AsRef<[u8]>, + S::Error: Into, { - type Item = BytesMut; - type Error = Error; + type Item = Result; - fn poll(&mut self) -> Poll, Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let this = self.get_mut(); loop { + if this.scan_pos < this.buffer.len() { + let boundary = this.chunker.scan(&this.buffer[this.scan_pos..]); - if self.scan_pos < self.buffer.len() { - let boundary = self.chunker.scan(&self.buffer[self.scan_pos..]); - - let chunk_size = self.scan_pos + boundary; + let chunk_size = this.scan_pos + boundary; if boundary == 0 { - self.scan_pos = self.buffer.len(); + this.scan_pos = this.buffer.len(); // continue poll - } else if chunk_size <= self.buffer.len() { - let result = self.buffer.split_to(chunk_size); - self.scan_pos = 0; - return Ok(Async::Ready(Some(result))); + } else if chunk_size <= this.buffer.len() { + let result = this.buffer.split_to(chunk_size); + this.scan_pos = 0; + return Poll::Ready(Some(Ok(result))); } else { panic!("got unexpected chunk boundary from chunker"); } } - match self.input.poll() { - Err(err) => { - return Err(err.into()); + match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { + Some(Err(err)) => { + return Poll::Ready(Some(Err(err.into()))); } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Ok(Async::Ready(None)) => { - self.scan_pos = 0; - if self.buffer.len() > 0 { - return Ok(Async::Ready(Some(self.buffer.take()))); + None => { + this.scan_pos = 0; + if this.buffer.len() > 0 { + return Poll::Ready(Some(Ok(this.buffer.take()))); } else { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } } - Ok(Async::Ready(Some(data))) => { - self.buffer.extend_from_slice(data.as_ref()); - } + Some(Ok(data)) => { + this.buffer.extend_from_slice(data.as_ref()); + } } } } } /// Split input stream into fixed sized chunks -pub struct FixedChunkStream { +pub struct FixedChunkStream { input: S, chunk_size: usize, buffer: BytesMut, } -impl FixedChunkStream { - +impl FixedChunkStream { pub fn new(input: S, chunk_size: usize) -> Self { Self { input, chunk_size, buffer: BytesMut::new() } } } -impl Stream for FixedChunkStream - where S: Stream, - S::Item: AsRef<[u8]>, +impl Unpin for FixedChunkStream {} + +impl Stream for FixedChunkStream +where + S: TryStream, + S::Ok: AsRef<[u8]>, { + type Item = Result; - type Item = BytesMut; - type Error = S::Error; - - fn poll(&mut self) -> Poll, S::Error> { + fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + let this = self.get_mut(); loop { - - if self.buffer.len() == self.chunk_size { - return Ok(Async::Ready(Some(self.buffer.take()))); - } else if self.buffer.len() > self.chunk_size { - let result = self.buffer.split_to(self.chunk_size); - return Ok(Async::Ready(Some(result))); + if this.buffer.len() == this.chunk_size { + return Poll::Ready(Some(Ok(this.buffer.take()))); + } else if this.buffer.len() > this.chunk_size { + let result = this.buffer.split_to(this.chunk_size); + return Poll::Ready(Some(Ok(result))); } - match self.input.poll() { - Err(err) => { - return Err(err); + match ready!(Pin::new(&mut this.input).try_poll_next(cx)) { + Some(Err(err)) => { + return Poll::Ready(Some(Err(err))); } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Ok(Async::Ready(None)) => { + None => { // last chunk can have any size - if self.buffer.len() > 0 { - return Ok(Async::Ready(Some(self.buffer.take()))); + if this.buffer.len() > 0 { + return Poll::Ready(Some(Ok(this.buffer.take()))); } else { - return Ok(Async::Ready(None)); + return Poll::Ready(None); } } - Ok(Async::Ready(Some(data))) => { - self.buffer.extend_from_slice(data.as_ref()); + Some(Ok(data)) => { + this.buffer.extend_from_slice(data.as_ref()); } } }