From 5441708634d538b8f9313e25f09753b3d478b092 Mon Sep 17 00:00:00 2001
From: Dietmar Maurer <dietmar@proxmox.com>
Date: Fri, 25 Sep 2020 12:14:59 +0200
Subject: [PATCH] src/client/pull.rs: use new ParallelHandler

---
 src/client/pull.rs | 149 +++++++++------------------------------------
 1 file changed, 28 insertions(+), 121 deletions(-)

diff --git a/src/client/pull.rs b/src/client/pull.rs
index 0e8c2ace..e92d2622 100644
--- a/src/client/pull.rs
+++ b/src/client/pull.rs
@@ -7,10 +7,11 @@ use std::sync::{Arc, Mutex};
 use std::collections::{HashSet, HashMap};
 use std::io::{Seek, SeekFrom};
 use std::time::SystemTime;
+use std::sync::atomic::{AtomicUsize, Ordering};
 
 use proxmox::api::error::{StatusCode, HttpError};
 use crate::{
-    tools::compute_file_csum,
+    tools::{ParallelHandler, compute_file_csum},
     server::WorkerTask,
     backup::*,
     api2::types::*,
@@ -22,111 +23,6 @@ use crate::{
 // fixme: delete vanished groups
 // Todo: correctly lock backup groups
 
-fn chunk_writer_pipeline(
-    target: Arc<DataStore>,
-) -> (
-    std::sync::mpsc::SyncSender<(DataBlob, [u8;32], u64)>,
-    std::sync::mpsc::Receiver<Result<usize, Error>>,
-) {
-    let (writer_tx, writer_rx) = std::sync::mpsc::sync_channel(1);
-    let (result_tx, result_rx) = std::sync::mpsc::channel();
-    let (pipe1_tx, pipe1_rx) = std::sync::mpsc::sync_channel(1);
-    let (pipe2_tx, pipe2_rx) = std::sync::mpsc::sync_channel(5);
-
-    let pipe2_copy_tx = pipe2_tx.clone();
-    let result_tx_copy1 = result_tx.clone();
-    let result_tx_copy2 = result_tx.clone();
-
-    let thread1 = std::thread::Builder::new()
-        .name(String::from("sync chunk decompressor"))
-        .spawn(move|| {
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, digest, size): (DataBlob, [u8;32], u64) = match writer_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => return Ok(()),
-                    };
-
-                    if chunk.is_encrypted() {
-                        pipe2_copy_tx.send((chunk, digest)).unwrap();
-                        continue;
-                    }
-
-                    let data = chunk.decode(None, None)?;
-
-                    if data.len() as u64 != size {
-                        bail!("detected chunk with wrong length ({} != {})", size, data.len());
-                    }
-
-                    pipe1_tx.send((chunk, data, digest)).unwrap();
-                }
-            });
-            if let Err(err) = result {
-                result_tx_copy1.send(Err(err)).unwrap();
-            }
-        }).unwrap();
-
-    let thread2 = std::thread::Builder::new()
-        .name(String::from("sync chunk verifier"))
-        .spawn(move|| {
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, data, digest): (DataBlob, Vec<u8>, [u8;32]) = match pipe1_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => return Ok(()),
-                    };
-
-                    let data_digest = openssl::sha::sha256(&data);
-                    if digest != data_digest {
-                        bail!("detected chunk with wrong digest.");
-                    }
-                    pipe2_tx.send((chunk, digest)).unwrap();
-                }
-            });
-            if let Err(err) = result {
-                result_tx_copy2.send(Err(err)).unwrap();
-            }
-        }).unwrap();
-
-    std::thread::Builder::new()
-        .name(String::from("sync chunk writer"))
-        .spawn(move|| {
-            let mut bytes = 0;
-            let result: Result<(), Error> = proxmox::try_block!({
-                loop {
-                    let (chunk, digest): (DataBlob, [u8;32]) = match pipe2_rx.recv() {
-                        Ok(input) => input,
-                        Err(_err) => break,
-                    };
-                    bytes += chunk.raw_size() as usize;
-                    target.insert_chunk(&chunk, &digest)?;
-                }
-
-                if let Err(panic) = thread2.join() {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => bail!("verification thread failed: {}", panic_msg),
-                        Err(_) => bail!("verification thread failed"),
-                    }
-                }
-
-                if let Err(panic) = thread1.join() {
-                    match panic.downcast::<&str>() {
-                        Ok(panic_msg) => bail!("decompressor thread failed: {}", panic_msg),
-                        Err(_) => bail!("decompressor thread failed"),
-                    }
-                }
-
-                Ok(())
-            });
-            match result {
-                Ok(()) => result_tx.send(Ok(bytes)).unwrap(),
-                Err(err) => result_tx.send(Err(err)).unwrap(),
-            }
-        }).unwrap();
-
-    (writer_tx, result_rx)
-}
-
 async fn pull_index_chunks(
     worker: &WorkerTask,
     chunk_reader: RemoteChunkReader,
@@ -154,14 +50,28 @@ async fn pull_index_chunks(
         })
     );
 
-    let (write_channel, write_result_rx) = chunk_writer_pipeline(target.clone());
+    let target2 = target.clone();
+    let verify_pool = ParallelHandler::new(
+        "sync chunk writer", 4,
+        move |(chunk, digest, size): (DataBlob, [u8;32], u64)| {
+            // println!("verify and write {}", proxmox::tools::digest_to_hex(&digest));
+            chunk.verify_unencrypted(size as usize, &digest)?;
+            target2.insert_chunk(&chunk, &digest)?;
+            Ok(())
+        }
+    );
 
-    let result = stream
+    let verify_and_write_channel = verify_pool.channel();
+
+    let bytes = Arc::new(AtomicUsize::new(0));
+
+    stream
         .map(|info| {
             let target = Arc::clone(&target);
             let chunk_reader = chunk_reader.clone();
-            let write_channel = write_channel.clone();
+            let bytes = Arc::clone(&bytes);
+            let verify_and_write_channel = verify_and_write_channel.clone();
             Ok::<_, Error>(async move {
                 let chunk_exists = crate::tools::runtime::block_in_place(|| target.cond_touch_chunk(&info.digest, false))?;
                 if chunk_exists {
                     //worker.log(format!("chunk {} exists {}", pos, proxmox::tools::digest_to_hex(digest)));
                     return Ok::<_, Error>(());
                 }
                 //worker.log(format!("sync {} chunk {}", pos, proxmox::tools::digest_to_hex(digest)));
                 let chunk = chunk_reader.read_raw_chunk(&info.digest).await?;
+                let raw_size = chunk.raw_size() as usize;
 
                 // decode, verify and write in a separate threads to maximize throughput
-                crate::tools::runtime::block_in_place(|| write_channel.send((chunk, info.digest, info.size())))?;
+                crate::tools::runtime::block_in_place(|| verify_and_write_channel.send((chunk, info.digest, info.size())))?;
+
+                bytes.fetch_add(raw_size, Ordering::SeqCst);
 
                 Ok(())
             })
         })
         .try_buffer_unordered(20)
         .try_for_each(|_res| futures::future::ok(()))
-        .await;
+        .await?;
 
-    drop(write_channel);
+    drop(verify_and_write_channel);
 
-    // check errors from result channel first
-    let bytes = match write_result_rx.recv() {
-        Err(_) => bail!("result channel closed early."),
-        Ok(Ok(bytes)) => bytes,
-        Ok(Err(err)) => bail!("write chnunk failed - {}", err),
-    };
-
-    // then check result
-    result?;
+    verify_pool.complete()?;
 
     let elapsed = start_time.elapsed()?.as_secs_f64();
 
+    let bytes = bytes.load(Ordering::SeqCst);
+
     worker.log(format!("downloaded {} bytes ({} MiB/s)", bytes, (bytes as f64)/(1024.0*1024.0*elapsed)));
 
     Ok(())
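
Note on the ParallelHandler pattern used above: the patch only exercises three
parts of its API — ParallelHandler::new(name, threads, handler_fn), channel()
to obtain a cloneable sender, and complete() to flush the queue and collect
errors. As a rough mental model (this is an illustrative sketch written for
this note, not the code in src/tools/parallel_handler.rs; the PoolSketch type,
its String-based errors, and the demo main() are all invented here), the
helper behaves like a bounded channel feeding a fixed pool of named worker
threads:

use std::sync::mpsc::{sync_channel, SyncSender};
use std::sync::{Arc, Mutex};
use std::thread::JoinHandle;

struct PoolSketch<I: Send + 'static> {
    input: Option<SyncSender<I>>,
    handles: Vec<JoinHandle<Result<(), String>>>,
}

impl<I: Send + 'static> PoolSketch<I> {
    fn new<F>(name: &str, threads: usize, handler: F) -> Self
    where
        F: Fn(I) -> Result<(), String> + Send + Sync + 'static,
    {
        // Bounded queue, so fast producers block instead of buffering unboundedly.
        let (tx, rx) = sync_channel::<I>(threads);
        let rx = Arc::new(Mutex::new(rx));
        let handler = Arc::new(handler);
        let handles = (0..threads)
            .map(|i| {
                let rx = Arc::clone(&rx);
                let handler = Arc::clone(&handler);
                std::thread::Builder::new()
                    .name(format!("{} ({})", name, i))
                    .spawn(move || loop {
                        // Lock only to receive; the handler runs unlocked, in parallel.
                        let item = match rx.lock().unwrap().recv() {
                            Ok(item) => item,
                            Err(_) => return Ok(()), // all senders dropped: clean shutdown
                        };
                        handler(item)?;
                    })
                    .unwrap()
            })
            .collect();
        Self { input: Some(tx), handles }
    }

    // Like ParallelHandler::channel(): a cloneable sender feeding the pool.
    fn channel(&self) -> SyncSender<I> {
        self.input.as_ref().unwrap().clone()
    }

    // Like ParallelHandler::complete(): close the queue, join the workers,
    // and surface the first error or panic.
    fn complete(mut self) -> Result<(), String> {
        self.input.take(); // drop our sender so recv() starts returning Err
        for handle in self.handles.drain(..) {
            handle.join().map_err(|_| String::from("worker thread panicked"))??;
        }
        Ok(())
    }
}

fn main() {
    // Usage mirrors the patch: create the pool, clone the channel into the
    // producers, drop the clones, then complete() to reap errors.
    let pool = PoolSketch::new("sketch writer", 4, |n: u64| {
        println!("handled item {}", n);
        Ok(())
    });
    let tx = pool.channel();
    for n in 0..16 {
        tx.send(n).unwrap();
    }
    drop(tx);
    pool.complete().unwrap();
}

This shape explains the call order in pull_index_chunks(): each producer
future clones the sender, the clones are dropped once the stream finishes,
and complete() then drops the pool's own sender so every worker's recv()
unblocks, letting the threads join and report the first failure.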