From 2f831baec04297861b602096c7f9be9de3160d46 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Thu, 5 Sep 2019 12:55:22 +0200
Subject: [PATCH] src/client/http_client.rs - BackupClient: use async

---
 src/client/http_client.rs | 145 +++++++++++++++++---------------------
 1 file changed, 63 insertions(+), 82 deletions(-)

diff --git a/src/client/http_client.rs b/src/client/http_client.rs
index b76939be..d620a7de 100644
--- a/src/client/http_client.rs
+++ b/src/client/http_client.rs
@@ -613,102 +613,91 @@ impl BackupClient {
         self.canceller.cancel();
     }
 
-    pub fn upload_blob<R: std::io::Read>(
+    pub async fn upload_blob<R: std::io::Read>(
         &self,
         mut reader: R,
         file_name: &str,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
+    ) -> Result<BackupStats, Error> {
+        let mut raw_data = Vec::new();
+        // fixme: avoid loading into memory
+        reader.read_to_end(&mut raw_data)?;
 
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
-
-        async move {
-            let mut raw_data = Vec::new();
-            // fixme: avoid loading into memory
-            reader.read_to_end(&mut raw_data)?;
-
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-            let size = raw_data.len() as u64; // fixme: should be decoded size instead??
-            let _value = h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+        let size = raw_data.len() as u64; // fixme: should be decoded size instead??
+        let _value = self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_blob_from_data(
+    pub async fn upload_blob_from_data(
         &self,
         data: Vec<u8>,
         file_name: &str,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
         sign_only: bool,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
+    ) -> Result<BackupStats, Error> {
 
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
         let size = data.len() as u64;
 
-        async move {
-            let blob = if let Some(crypt_config) = crypt_config {
-                if sign_only {
-                    DataBlob::create_signed(&data, crypt_config, compress)?
-                } else {
-                    DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
-                }
+        let blob = if let Some(crypt_config) = crypt_config {
+            if sign_only {
+                DataBlob::create_signed(&data, crypt_config, compress)?
             } else {
-                DataBlob::encode(&data, None, compress)?
-            };
+                DataBlob::encode(&data, Some(crypt_config.clone()), compress)?
+            }
+        } else {
+            DataBlob::encode(&data, None, compress)?
+        };
 
-            let raw_data = blob.into_inner();
+        let raw_data = blob.into_inner();
 
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
-            let _value = h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({"encoded-size": raw_data.len(), "file-name": file_name });
+        let _value = self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_blob_from_file<P: AsRef<std::path::Path>>(
+    pub async fn upload_blob_from_file<P: AsRef<std::path::Path>>(
         &self,
         src_path: P,
         file_name: &str,
         crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
+    ) -> Result<BackupStats, Error> {
 
-        let h2 = self.h2.clone();
-        let file_name = file_name.to_owned();
-        let src_path = src_path.as_ref().to_owned();
+        let src_path = src_path.as_ref();
 
-        async move {
-            let mut file = tokio::fs::File::open(src_path.clone())
-                .await
-                .map_err(move |err| format_err!("unable to open file {:?} - {}", src_path, err))?;
+        let mut file = tokio::fs::File::open(src_path.clone())
+            .await
+            .map_err(|err| format_err!("unable to open file {:?} - {}", src_path, err))?;
 
-            let mut contents = Vec::new();
-            file.read_to_end(&mut contents).await.map_err(Error::from)?;
+        let mut contents = Vec::new();
 
-            let size: u64 = contents.len() as u64;
-            let blob = DataBlob::encode(&contents, crypt_config, compress)?;
-            let raw_data = blob.into_inner();
-            let csum = openssl::sha::sha256(&raw_data);
-            let param = json!({
-                "encoded-size": raw_data.len(),
-                "file-name": file_name,
-            });
-            h2.upload("blob", Some(param), raw_data).await?;
-            Ok(BackupStats { size, csum })
-        }
+        file.read_to_end(&mut contents)
+            .await
+            .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
+
+        let size: u64 = contents.len() as u64;
+        let blob = DataBlob::encode(&contents, crypt_config, compress)?;
+        let raw_data = blob.into_inner();
+        let csum = openssl::sha::sha256(&raw_data);
+        let param = json!({
+            "encoded-size": raw_data.len(),
+            "file-name": file_name,
+        });
+        self.h2.upload("blob", Some(param), raw_data).await?;
+        Ok(BackupStats { size, csum })
     }
 
-    pub fn upload_stream(
+    pub async fn upload_stream(
         &self,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         prefix: &str,
         fixed_size: Option<u64>,
         crypt_config: Option<Arc<CryptConfig>>,
-    ) -> impl Future<Output = Result<BackupStats, Error>> {
+    ) -> Result<BackupStats, Error> {
 
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
         let mut param = json!({ "archive-name": archive_name });
@@ -719,20 +708,13 @@ impl BackupClient {
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        let prefix = prefix.to_owned();
+        Self::download_chunk_list(self.h2.clone(), &index_path, archive_name, known_chunks.clone()).await?;
 
-        let h2 = self.h2.clone();
+        let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
 
-        let download_future =
-            Self::download_chunk_list(h2.clone(), &index_path, archive_name, known_chunks.clone());
-
-        async move {
-            download_future.await?;
-
-            let wid = h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
-
-            let (chunk_count, size, _speed, csum) = Self::upload_chunk_info_stream(
-                h2.clone(),
+        let (chunk_count, size, _speed, csum) =
+            Self::upload_chunk_info_stream(
+                self.h2.clone(),
                 wid,
                 stream,
                 &prefix,
@@ -741,17 +723,16 @@ impl BackupClient {
             )
             .await?;
 
-            let param = json!({
-                "wid": wid ,
-                "chunk-count": chunk_count,
-                "size": size,
-            });
-            let _value = h2.post(&close_path, Some(param)).await?;
-            Ok(BackupStats {
-                size: size as u64,
-                csum,
-            })
-        }
+        let param = json!({
+            "wid": wid ,
+            "chunk-count": chunk_count,
+            "size": size,
+        });
+        let _value = self.h2.post(&close_path, Some(param)).await?;
+        Ok(BackupStats {
+            size: size as u64,
+            csum,
+        })
     }
 
     fn response_queue() -> (
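
Usage note: because these methods are now `async fn`s that borrow `&self` and
their `&str` arguments for the lifetime of the returned future, the
`h2.clone()` / `to_owned()` / `async move` boilerplate disappears and call
sites simply await the result. A minimal sketch of the new calling convention
(not part of the patch; it assumes an already-connected `BackupClient` named
`client` inside an async function returning `Result<(), Error>`, and the data
and file name are purely illustrative):

    // Hypothetical call site: compressed (compress = true), full encoding
    // rather than sign-only (sign_only = false), no encryption
    // (crypt_config = None).
    let data = b"example payload".to_vec();
    let stats = client
        .upload_blob_from_data(data, "demo.blob", None, true, false)
        .await?;
    // Per the code above, BackupStats carries the unencoded byte count in
    // `size` and the sha256 of the encoded blob in `csum`.
    println!("uploaded {} bytes", stats.size);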