diff --git a/src/client/http_client.rs b/src/client/http_client.rs index 1053e45a..46ddf758 100644 --- a/src/client/http_client.rs +++ b/src/client/http_client.rs @@ -5,6 +5,8 @@ use hyper::Body; use hyper::client::Client; use xdg::BaseDirectories; use chrono::Utc; +use std::collections::HashSet; +use std::sync::{Arc, Mutex}; use http::{Request, Response}; use http::header::HeaderValue; @@ -465,10 +467,50 @@ impl H2Client { (verify_queue_tx, verify_result_rx) } + pub fn download_chunk_list( + &self, + path: &str, + archive_name: &str, + known_chunks: Arc>>, + ) -> impl Future { + + let param = json!({ "archive-name": archive_name }); + let request = Self::request_builder("localhost", "GET", path, Some(param)).unwrap(); + + self.send_request(request, None) + .and_then(move |response| { + response + .map_err(Error::from) + .and_then(move |resp| { + let status = resp.status(); + if !status.is_success() { + bail!("download chunk list failed with status {}", status); + } + + let (_head, body) = resp.into_parts(); + + Ok(body) + }) + .and_then(move |mut body| { + + let mut release_capacity = body.release_capacity().clone(); + + crate::backup::DigestListDecoder::new(body.map_err(Error::from)) + .for_each(move |chunk| { + let _ = release_capacity.release_capacity(chunk.len()); + println!("GOT DOWNLOAD {}", tools::digest_to_hex(&chunk)); + known_chunks.lock().unwrap().insert(chunk); + Ok(()) + }) + }) + }) + } + pub fn upload_stream( &self, wid: u64, - stream: impl Stream, Error=Error>, + stream: impl Stream, + known_chunks: Arc>>, ) -> impl Future { let repeat = std::sync::Arc::new(AtomicUsize::new(0)); @@ -492,18 +534,20 @@ impl H2Client { let digest = openssl::sha::sha256(&data); - let chunk_is_known = false; + let mut known_chunks = known_chunks.lock().unwrap(); + let chunk_is_known = known_chunks.contains(&digest); let upload_data; let request; - println!("upload chunk ({} bytes)", data.len()); - if chunk_is_known { + println!("append existing chunk ({} bytes)", data.len()); let param = json!({ "wid": wid, "digest": tools::digest_to_hex(&digest) }); request = Self::request_builder("localhost", "PUT", "dynamic_index", Some(param)).unwrap(); upload_data = None; } else { + println!("upload new chunk {} ({} bytes)", tools::digest_to_hex(&digest), data.len()); + known_chunks.insert(digest); let param = json!({ "wid": wid, "size" : data.len() }); request = Self::request_builder("localhost", "POST", "dynamic_chunk", Some(param)).unwrap(); upload_data = Some(bytes::Bytes::from(data));