From 91320f0879a2a787ac32abbb94d4b4d8ee3a5d4f Mon Sep 17 00:00:00 2001 From: Dietmar Maurer Date: Thu, 23 May 2019 09:42:37 +0200 Subject: [PATCH] src/client/http_client.rs: use ChunkInfo streams This will make out of order uploads possible... --- src/client/http_client.rs | 37 +++++++++++++++++++++++++------------ 1 file changed, 25 insertions(+), 12 deletions(-) diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 7855db5e..e9a39582 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -405,6 +405,12 @@ pub struct BackupClient { h2: H2Client, } +struct ChunkInfo { + digest: [u8; 32], + data: bytes::BytesMut, + offset: u64, +} + impl BackupClient { pub fn new(h2: h2::client::SendRequest) -> Self { @@ -435,6 +441,15 @@ impl BackupClient { let known_chunks = Arc::new(Mutex::new(HashSet::new())); + let mut stream_len = 0u64; + + let stream = stream. + map(move |data| { + let digest = openssl::sha::sha256(&data); + stream_len += data.len() as u64; + ChunkInfo { data, digest, offset: stream_len } + }); + let h2 = self.h2.clone(); let h2_2 = self.h2.clone(); let h2_3 = self.h2.clone(); @@ -532,7 +547,7 @@ impl BackupClient { fn upload_stream( h2: H2Client, wid: u64, - stream: impl Stream, + stream: impl Stream, known_chunks: Arc>>, ) -> impl Future { @@ -547,33 +562,31 @@ impl BackupClient { let start_time = std::time::Instant::now(); stream - .for_each(move |data| { + .for_each(move |chunk_info| { let h2 = h2.clone(); repeat.fetch_add(1, Ordering::SeqCst); - stream_len.fetch_add(data.len(), Ordering::SeqCst); + stream_len.fetch_add(chunk_info.data.len(), Ordering::SeqCst); let upload_queue = upload_queue.clone(); - let digest = openssl::sha::sha256(&data); - let mut known_chunks = known_chunks.lock().unwrap(); - let chunk_is_known = known_chunks.contains(&digest); + let chunk_is_known = known_chunks.contains(&chunk_info.digest); let upload_data; let request; if chunk_is_known { - println!("append existing chunk ({} bytes)", data.len()); - let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); + println!("append existing chunk ({} bytes)", chunk_info.data.len()); + let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&chunk_info.digest) }); request = H2Client::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); upload_data = None; } else { - println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len()); - known_chunks.insert(digest); - let param = json!({ "wid": wid, "size" : data.len() }); + println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&chunk_info.digest), chunk_info.data.len()); + known_chunks.insert(chunk_info.digest); + let param = json!({ "wid": wid, "size" : chunk_info.data.len() }); request = H2Client::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); - upload_data = Some(bytes::Bytes::from(data)); + upload_data = Some(chunk_info.data.freeze()); } h2.send_request(request, upload_data)