diff --git a/src/api2/backup.rs b/src/api2/backup.rs
index e60c2078..45dbdec4 100644
--- a/src/api2/backup.rs
+++ b/src/api2/backup.rs
@@ -10,7 +10,7 @@ use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironm
 use proxmox::api::router::SubdirMap;
 use proxmox::api::schema::*;
 
-use crate::tools::{self, WrappedReaderStream};
+use crate::tools;
 use crate::server::{WorkerTask, H2Service};
 use crate::backup::*;
 use crate::api2::types::*;
@@ -199,7 +199,6 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
     ),
     (
         "dynamic_index", &Router::new()
-            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
             .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
             .put(&API_METHOD_DYNAMIC_APPEND)
     ),
@@ -222,10 +221,13 @@ pub const BACKUP_API_SUBDIRS: SubdirMap = &[
     ),
     (
         "fixed_index", &Router::new()
-            .download(&API_METHOD_FIXED_CHUNK_INDEX)
             .post(&API_METHOD_CREATE_FIXED_INDEX)
             .put(&API_METHOD_FIXED_APPEND)
     ),
+    (
+        "previous", &Router::new()
+            .download(&API_METHOD_DOWNLOAD_PREVIOUS)
+    ),
     (
         "speedtest", &Router::new()
             .upload(&API_METHOD_UPLOAD_SPEEDTEST)
@@ -610,20 +612,17 @@ fn finish_backup (
 }
 
 #[sortable]
-pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
-    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
+pub const API_METHOD_DOWNLOAD_PREVIOUS: ApiMethod = ApiMethod::new(
+    &ApiHandler::AsyncHttp(&download_previous),
     &ObjectSchema::new(
-        r###"
-Download the dynamic chunk index from the previous backup.
-Simply returns an empty list if this is the first backup.
-"### ,
+        "Download archive from previous backup.",
         &sorted!([
             ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
         ]),
     )
 );
 
-fn dynamic_chunk_index(
+fn download_previous(
     _parts: Parts,
     _req_body: Body,
     param: Value,
@@ -636,130 +635,16 @@ fn dynamic_chunk_index(
 
         let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
 
-        if !archive_name.ends_with(".didx") {
-            bail!("wrong archive extension: '{}'", archive_name);
-        }
-
-        let empty_response = {
-            Response::builder()
-                .status(StatusCode::OK)
-                .body(Body::empty())?
-        };
-
         let last_backup = match &env.last_backup {
             Some(info) => info,
-            None => return Ok(empty_response),
+            None => bail!("no previous backup"),
         };
 
-        let mut path = last_backup.backup_dir.relative_path();
+        env.log(format!("download '{}' from previous backup.", archive_name));
+
+        let mut path = env.datastore.snapshot_path(&last_backup.backup_dir);
         path.push(&archive_name);
 
-        let index = match env.datastore.open_dynamic_reader(path) {
-            Ok(index) => index,
-            Err(_) => {
-                env.log(format!("there is no last backup for archive '{}'", archive_name));
-                return Ok(empty_response);
-            }
-        };
-
-        env.log(format!("download last backup index for archive '{}'", archive_name));
-
-        let count = index.index_count();
-        for pos in 0..count {
-            let info = index.chunk_info(pos)?;
-            let size = info.size() as u32;
-            env.register_chunk(info.digest, size)?;
-        }
-
-        let reader = DigestListEncoder::new(Box::new(index));
-
-        let stream = WrappedReaderStream::new(reader);
-
-        // fixme: set size, content type?
-        let response = http::Response::builder()
-            .status(200)
-            .body(Body::wrap_stream(stream))?;
-
-        Ok(response)
-    }.boxed()
-}
-
-#[sortable]
-pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
-    &ApiHandler::AsyncHttp(&fixed_chunk_index),
-    &ObjectSchema::new(
-        r###"
-Download the fixed chunk index from the previous backup.
-Simply returns an empty list if this is the first backup.
-"### ,
-        &sorted!([
-            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
-        ]),
-    )
-);
-
-fn fixed_chunk_index(
-    _parts: Parts,
-    _req_body: Body,
-    param: Value,
-    _info: &ApiMethod,
-    rpcenv: Box<dyn RpcEnvironment>,
-) -> ApiResponseFuture {
-
-    async move {
-        let env: &BackupEnvironment = rpcenv.as_ref();
-
-        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
-
-        if !archive_name.ends_with(".fidx") {
-            bail!("wrong archive extension: '{}'", archive_name);
-        }
-
-        let empty_response = {
-            Response::builder()
-                .status(StatusCode::OK)
-                .body(Body::empty())?
-        };
-
-        let last_backup = match &env.last_backup {
-            Some(info) => info,
-            None => return Ok(empty_response),
-        };
-
-        let mut path = last_backup.backup_dir.relative_path();
-        path.push(&archive_name);
-
-        let index = match env.datastore.open_fixed_reader(path) {
-            Ok(index) => index,
-            Err(_) => {
-                env.log(format!("there is no last backup for archive '{}'", archive_name));
-                return Ok(empty_response);
-            }
-        };
-
-        env.log(format!("download last backup index for archive '{}'", archive_name));
-
-        let count = index.index_count();
-        let image_size = index.index_bytes();
-        for pos in 0..count {
-            let digest = index.index_digest(pos).unwrap();
-            // Note: last chunk can be smaller
-            let start = (pos*index.chunk_size) as u64;
-            let mut end = start + index.chunk_size as u64;
-            if end > image_size { end = image_size; }
-            let size = (end - start) as u32;
-            env.register_chunk(*digest, size)?;
-        }
-
-        let reader = DigestListEncoder::new(Box::new(index));
-
-        let stream = WrappedReaderStream::new(reader);
-
-        // fixme: set size, content type?
-        let response = http::Response::builder()
-            .status(200)
-            .body(Body::wrap_stream(stream))?;
-
-        Ok(response)
+        crate::api2::helpers::create_download_response(path).await
     }.boxed()
 }
diff --git a/src/backup/index.rs b/src/backup/index.rs
index 5f37717c..2f362421 100644
--- a/src/backup/index.rs
+++ b/src/backup/index.rs
@@ -1,11 +1,5 @@
 use std::collections::HashMap;
 use std::ops::Range;
-use std::pin::Pin;
-use std::task::{Context, Poll};
-
-use bytes::{Bytes, BytesMut};
-use anyhow::{format_err, Error};
-use futures::*;
 
 pub struct ChunkReadInfo {
     pub range: Range<u64>,
@@ -59,111 +53,3 @@ pub trait IndexFile {
         map
     }
 }
-
-/// Encode digest list from an `IndexFile` into a binary stream
-///
-/// The reader simply returns a birary stream of 32 byte digest values.
-pub struct DigestListEncoder {
-    index: Box<dyn IndexFile + Send + Sync>,
-    pos: usize,
-    count: usize,
-}
-
-impl DigestListEncoder {
-
-    pub fn new(index: Box<dyn IndexFile + Send + Sync>) -> Self {
-        let count = index.index_count();
-        Self { index, pos: 0, count }
-    }
-}
-
-impl std::io::Read for DigestListEncoder {
-    fn read(&mut self, buf: &mut [u8]) -> Result<usize, std::io::Error> {
-        if buf.len() < 32 {
-            panic!("read buffer too small");
-        }
-
-        if self.pos < self.count {
-            let mut written = 0;
-            loop {
-                let digest = self.index.index_digest(self.pos).unwrap();
-                buf[written..(written + 32)].copy_from_slice(digest);
-                self.pos += 1;
-                written += 32;
-                if self.pos >= self.count {
-                    break;
-                }
-                if (written + 32) >= buf.len() {
-                    break;
-                }
-            }
-            Ok(written)
-        } else {
-            Ok(0)
-        }
-    }
-}
-
-/// Decodes a Stream<Item=Bytes> into Stream<Item=[u8;32]>
-///
-/// The reader simply returns a birary stream of 32 byte digest values.
-
-pub struct DigestListDecoder<S> {
-    input: S,
-    buffer: BytesMut,
-}
-
-impl<S> DigestListDecoder<S> {
-    pub fn new(input: S) -> Self {
-        Self { input, buffer: BytesMut::new() }
-    }
-}
-
-impl<S> Unpin for DigestListDecoder<S> {}
-
-impl<S, E> Stream for DigestListDecoder<S>
-where
-    S: Stream<Item = Result<Bytes, E>>,
-    E: Into<Error>,
-{
-    type Item = Result<[u8; 32], Error>;
-
-    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
-        let this = self.get_mut();
-
-        loop {
-            if this.buffer.len() >= 32 {
-                let left = this.buffer.split_to(32);
-
-                let mut digest = std::mem::MaybeUninit::<[u8; 32]>::uninit();
-                unsafe {
-                    (*digest.as_mut_ptr()).copy_from_slice(&left[..]);
-                    return Poll::Ready(Some(Ok(digest.assume_init())));
-                }
-            }
-
-            match Pin::new(&mut this.input).poll_next(cx) {
-                Poll::Pending => {
-                    return Poll::Pending;
-                }
-                Poll::Ready(Some(Err(err))) => {
-                    return Poll::Ready(Some(Err(err.into())));
-                }
-                Poll::Ready(Some(Ok(data))) => {
-                    this.buffer.extend_from_slice(&data);
-                    // continue
-                }
-                Poll::Ready(None) => {
-                    let rest = this.buffer.len();
-                    if rest == 0 {
-                        return Poll::Ready(None);
-                    }
-                    return Poll::Ready(Some(Err(format_err!(
-                        "got small digest ({} != 32).",
-                        rest,
-                    ))));
-                }
-            }
-        }
-    }
-}
diff --git a/src/bin/proxmox-backup-client.rs b/src/bin/proxmox-backup-client.rs
index ddb35473..56ab9e11 100644
--- a/src/bin/proxmox-backup-client.rs
+++ b/src/bin/proxmox-backup-client.rs
@@ -261,16 +261,15 @@ async fn api_datastore_latest_snapshot(
     Ok((group.backup_type().to_owned(), group.backup_id().to_owned(), backup_time))
 }
 
-
 async fn backup_directory<P: AsRef<Path>>(
     client: &BackupWriter,
+    previous_manifest: Option<Arc<BackupManifest>>,
     dir_path: P,
     archive_name: &str,
     chunk_size: Option<usize>,
     device_set: Option<HashSet<u64>>,
     verbose: bool,
     skip_lost_and_found: bool,
-    crypt_config: Option<Arc<CryptConfig>>,
     catalog: Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
     exclude_pattern: Vec<MatchEntry>,
     entries_max: usize,
@@ -300,7 +299,7 @@ async fn backup_directory<P: AsRef<Path>>(
     });
 
     let stats = client
-        .upload_stream(archive_name, stream, "dynamic", None, crypt_config)
+        .upload_stream(previous_manifest, archive_name, stream, "dynamic", None)
         .await?;
 
     Ok(stats)
@@ -308,12 +307,12 @@ async fn backup_directory<P: AsRef<Path>>(
 
 async fn backup_image<P: AsRef<Path>>(
     client: &BackupWriter,
+    previous_manifest: Option<Arc<BackupManifest>>,
     image_path: P,
     archive_name: &str,
     image_size: u64,
     chunk_size: Option<usize>,
     _verbose: bool,
-    crypt_config: Option<Arc<CryptConfig>>,
 ) -> Result<BackupStats, Error> {
 
     let path = image_path.as_ref().to_owned();
@@ -326,7 +325,7 @@ async fn backup_image<P: AsRef<Path>>(
     let stream = FixedChunkStream::new(stream, chunk_size.unwrap_or(4*1024*1024));
 
     let stats = client
-        .upload_stream(archive_name, stream, "fixed", Some(image_size), crypt_config)
+        .upload_stream(previous_manifest, archive_name, stream, "fixed", Some(image_size))
         .await?;
 
     Ok(stats)
@@ -709,8 +708,7 @@ async fn start_garbage_collection(param: Value) -> Result<Value, Error> {
 }
 
 fn spawn_catalog_upload(
-    client: Arc<BackupWriter>,
-    crypt_config: Option<Arc<CryptConfig>>,
+    client: Arc<BackupWriter>
 ) -> Result<
         (
             Arc<Mutex<CatalogWriter<crate::tools::StdChannelWriter>>>,
@@ -728,7 +726,7 @@ fn spawn_catalog_upload(
 
     tokio::spawn(async move {
        let catalog_upload_result = client
-            .upload_stream(CATALOG_NAME, catalog_chunk_stream, "dynamic", None, crypt_config)
+            .upload_stream(None, CATALOG_NAME, catalog_chunk_stream, "dynamic", None)
            .await;
 
        if let Err(ref err) = catalog_upload_result {
@@ -959,6 +957,7 @@ async fn create_backup(
 
     let client = BackupWriter::start(
         client,
+        crypt_config.clone(),
         repo.store(),
         backup_type,
         &backup_id,
@@ -966,6 +965,12 @@ async fn create_backup(
         verbose,
     ).await?;
 
+    let previous_manifest = if let Ok(previous_manifest) = client.download_previous_manifest().await {
+        Some(Arc::new(previous_manifest))
+    } else {
+        None
+    };
+
     let snapshot = BackupDir::new(backup_type, backup_id, backup_time.timestamp());
     let mut manifest = BackupManifest::new(snapshot);
 
@@ -977,21 +982,21 @@ async fn create_backup(
             BackupSpecificationType::CONFIG => {
                 println!("Upload config file '{}' to '{:?}' as {}", filename, repo, target);
                 let stats = client
-                    .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+                    .upload_blob_from_file(&filename, &target, true, Some(true))
                     .await?;
                 manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
             }
             BackupSpecificationType::LOGFILE => { // fixme: remove - not needed anymore ?
                 println!("Upload log file '{}' to '{:?}' as {}", filename, repo, target);
                 let stats = client
-                    .upload_blob_from_file(&filename, &target, crypt_config.clone(), true)
+                    .upload_blob_from_file(&filename, &target, true, Some(true))
                     .await?;
                 manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
             }
             BackupSpecificationType::PXAR => {
                 // start catalog upload on first use
                 if catalog.is_none() {
-                    let (cat, res) = spawn_catalog_upload(client.clone(), crypt_config.clone())?;
+                    let (cat, res) = spawn_catalog_upload(client.clone())?;
                     catalog = Some(cat);
                     catalog_result_tx = Some(res);
                 }
@@ -1001,13 +1006,13 @@ async fn create_backup(
                 catalog.lock().unwrap().start_directory(std::ffi::CString::new(target.as_str())?.as_c_str())?;
                 let stats = backup_directory(
                     &client,
+                    previous_manifest.clone(),
                     &filename,
                     &target,
                     chunk_size_opt,
                     devices.clone(),
                     verbose,
                     skip_lost_and_found,
-                    crypt_config.clone(),
                     catalog.clone(),
                     pattern_list.clone(),
                     entries_max as usize,
@@ -1019,12 +1024,12 @@ async fn create_backup(
                 println!("Upload image '{}' to '{:?}' as {}", filename, repo, target);
                 let stats = backup_image(
                     &client,
-                    &filename,
+                    previous_manifest.clone(),
+                    &filename,
                     &target,
                     size,
                     chunk_size_opt,
                     verbose,
-                    crypt_config.clone(),
                 ).await?;
                 manifest.add_file(target, stats.size, stats.csum, is_encrypted)?;
             }
@@ -1051,7 +1056,7 @@ async fn create_backup(
         let target = "rsa-encrypted.key";
         println!("Upload RSA encoded key to '{:?}' as {}", repo, target);
         let stats = client
-            .upload_blob_from_data(rsa_encrypted_key, target, None, false, false)
+            .upload_blob_from_data(rsa_encrypted_key, target, false, None)
            .await?;
        manifest.add_file(format!("{}.blob", target), stats.size, stats.csum, is_encrypted)?;
 
@@ -1071,7 +1076,7 @@ async fn create_backup(
     println!("Upload index.json to '{:?}'", repo);
     let manifest = serde_json::to_string_pretty(&manifest)?.into();
     client
-        .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, crypt_config.clone(), true, true)
+        .upload_blob_from_data(manifest, MANIFEST_BLOB_NAME, true, Some(true))
         .await?;
 
     client.finish().await?;
diff --git a/src/client/backup_writer.rs b/src/client/backup_writer.rs
index 9df1035b..769f3e94 100644
--- a/src/client/backup_writer.rs
+++ b/src/client/backup_writer.rs
@@ -1,4 +1,5 @@
 use std::collections::HashSet;
+use std::os::unix::fs::OpenOptionsExt;
 use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, Mutex};
 
@@ -22,6 +23,7 @@ pub struct BackupWriter {
     h2: H2Client,
     abort: AbortHandle,
     verbose: bool,
+    crypt_config: Option<Arc<CryptConfig>>,
 }
 
 impl Drop for BackupWriter {
@@ -38,12 +40,13 @@ pub struct BackupStats {
 
 impl BackupWriter {
 
-    fn new(h2: H2Client, abort: AbortHandle, verbose: bool) -> Arc<Self> {
-        Arc::new(Self { h2, abort, verbose })
+    fn new(h2: H2Client, abort: AbortHandle, crypt_config: Option<Arc<CryptConfig>>, verbose: bool) -> Arc<Self> {
+        Arc::new(Self { h2, abort, crypt_config, verbose })
     }
 
     pub async fn start(
         client: HttpClient,
+        crypt_config: Option<Arc<CryptConfig>>,
         datastore: &str,
         backup_type: &str,
         backup_id: &str,
@@ -64,7 +67,7 @@ impl BackupWriter {
 
         let (h2, abort) = client.start_h2_connection(req, String::from(PROXMOX_BACKUP_PROTOCOL_ID_V1!())).await?;
 
-        Ok(BackupWriter::new(h2, abort, debug))
+        Ok(BackupWriter::new(h2, abort, crypt_config, debug))
     }
 
     pub async fn get(
@@ -159,16 +162,19 @@ impl BackupWriter {
         &self,
         data: Vec<u8>,
         file_name: &str,
-        crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
-        sign_only: bool,
+        crypt_or_sign: Option<bool>,
     ) -> Result<BackupStats, Error> {
 
-        let blob = if let Some(ref crypt_config) = crypt_config {
-            if sign_only {
-                DataBlob::create_signed(&data, crypt_config, compress)?
+        let blob = if let Some(ref crypt_config) = self.crypt_config {
+            if let Some(encrypt) = crypt_or_sign {
+                if encrypt {
+                    DataBlob::encode(&data, Some(crypt_config), compress)?
+                } else {
+                    DataBlob::create_signed(&data, crypt_config, compress)?
+                }
             } else {
-                DataBlob::encode(&data, Some(crypt_config), compress)?
+                DataBlob::encode(&data, None, compress)?
             }
         } else {
             DataBlob::encode(&data, None, compress)?
@@ -187,8 +193,8 @@ impl BackupWriter {
         &self,
         src_path: P,
         file_name: &str,
-        crypt_config: Option<Arc<CryptConfig>>,
         compress: bool,
+        crypt_or_sign: Option<bool>,
     ) -> Result<BackupStats, Error> {
 
         let src_path = src_path.as_ref();
@@ -203,25 +209,16 @@ impl BackupWriter {
             .await
             .map_err(|err| format_err!("unable to read file {:?} - {}", src_path, err))?;
 
-        let blob = DataBlob::encode(&contents, crypt_config.as_ref().map(AsRef::as_ref), compress)?;
-        let raw_data = blob.into_inner();
-        let size = raw_data.len() as u64;
-        let csum = openssl::sha::sha256(&raw_data);
-        let param = json!({
-            "encoded-size": size,
-            "file-name": file_name,
-        });
-        self.h2.upload("POST", "blob", Some(param), "application/octet-stream", raw_data).await?;
-        Ok(BackupStats { size, csum })
+        self.upload_blob_from_data(contents, file_name, compress, crypt_or_sign).await
     }
 
     pub async fn upload_stream(
         &self,
+        previous_manifest: Option<Arc<BackupManifest>>,
         archive_name: &str,
         stream: impl Stream<Item = Result<bytes::BytesMut, Error>>,
         prefix: &str,
         fixed_size: Option<u64>,
-        crypt_config: Option<Arc<CryptConfig>>,
     ) -> Result<BackupStats, Error> {
 
         let known_chunks = Arc::new(Mutex::new(HashSet::new()));
@@ -233,7 +230,18 @@ impl BackupWriter {
 
         let index_path = format!("{}_index", prefix);
         let close_path = format!("{}_close", prefix);
 
-        self.download_chunk_list(&index_path, archive_name, known_chunks.clone()).await?;
+        if let Some(manifest) = previous_manifest {
+            // try, but ignore errors
+            match archive_type(archive_name) {
+                Ok(ArchiveType::FixedIndex) => {
+                    let _ = self.download_previous_fixed_index(archive_name, &manifest, known_chunks.clone()).await;
+                }
+                Ok(ArchiveType::DynamicIndex) => {
+                    let _ = self.download_previous_dynamic_index(archive_name, &manifest, known_chunks.clone()).await;
+                }
+                _ => { /* do nothing */ }
+            }
+        }
 
         let wid = self.h2.post(&index_path, Some(param)).await?.as_u64().unwrap();
@@ -244,7 +252,7 @@ impl BackupWriter {
                 stream,
                 &prefix,
                 known_chunks.clone(),
-                crypt_config,
+                self.crypt_config.clone(),
                 self.verbose,
             )
             .await?;
@@ -374,41 +382,93 @@ impl BackupWriter {
         (verify_queue_tx, verify_result_rx)
     }
 
-    pub async fn download_chunk_list(
+    pub async fn download_previous_fixed_index(
         &self,
-        path: &str,
         archive_name: &str,
+        manifest: &BackupManifest,
         known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
-    ) -> Result<(), Error> {
+    ) -> Result<FixedIndexReader, Error> {
+
+        let mut tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .read(true)
+            .custom_flags(libc::O_TMPFILE)
+            .open("/tmp")?;
 
         let param = json!({ "archive-name": archive_name });
-        let request = H2Client::request_builder("localhost", "GET", path, Some(param), None).unwrap();
+        self.h2.download("previous", Some(param), &mut tmpfile).await?;
 
-        let h2request = self.h2.send_request(request, None).await?;
-        let resp = h2request.await?;
+        let index = FixedIndexReader::new(tmpfile)
+            .map_err(|err| format_err!("unable to read fixed index '{}' - {}", archive_name, err))?;
+        // Note: do not use values stored in index (not trusted) - instead, compute them again
+        let (csum, size) = index.compute_csum();
+        manifest.verify_file(archive_name, &csum, size)?;
 
-        let status = resp.status();
-
-        if !status.is_success() {
-            H2Client::h2api_response(resp).await?; // raise error
-            unreachable!();
-        }
-
-        let mut body = resp.into_body();
-        let mut flow_control = body.flow_control().clone();
-
-        let mut stream = DigestListDecoder::new(body.map_err(Error::from));
-
-        while let Some(chunk) = stream.try_next().await? {
-            let _ = flow_control.release_capacity(chunk.len());
-            known_chunks.lock().unwrap().insert(chunk);
+        // add index chunks to known chunks
+        let mut known_chunks = known_chunks.lock().unwrap();
+        for i in 0..index.index_count() {
+            known_chunks.insert(*index.index_digest(i).unwrap());
        }
 
        if self.verbose {
-            println!("{}: known chunks list length is {}", archive_name, known_chunks.lock().unwrap().len());
+            println!("{}: known chunks list length is {}", archive_name, index.index_count());
        }
 
-        Ok(())
+        Ok(index)
+    }
+
+    pub async fn download_previous_dynamic_index(
+        &self,
+        archive_name: &str,
+        manifest: &BackupManifest,
+        known_chunks: Arc<Mutex<HashSet<[u8;32]>>>,
+    ) -> Result<DynamicIndexReader, Error> {
+
+        let mut tmpfile = std::fs::OpenOptions::new()
+            .write(true)
+            .read(true)
+            .custom_flags(libc::O_TMPFILE)
+            .open("/tmp")?;
+
+        let param = json!({ "archive-name": archive_name });
+        self.h2.download("previous", Some(param), &mut tmpfile).await?;
+
+        let index = DynamicIndexReader::new(tmpfile)
+            .map_err(|err| format_err!("unable to read dynamic index '{}' - {}", archive_name, err))?;
+        // Note: do not use values stored in index (not trusted) - instead, compute them again
+        let (csum, size) = index.compute_csum();
+        manifest.verify_file(archive_name, &csum, size)?;
+
+        // add index chunks to known chunks
+        let mut known_chunks = known_chunks.lock().unwrap();
+        for i in 0..index.index_count() {
+            known_chunks.insert(*index.index_digest(i).unwrap());
+        }
+
+        if self.verbose {
+            println!("{}: known chunks list length is {}", archive_name, index.index_count());
+        }
+
+        Ok(index)
+    }
+
+    /// Download backup manifest (index.json) of last backup
+    pub async fn download_previous_manifest(&self) -> Result<BackupManifest, Error> {
+
+        use std::convert::TryFrom;
+
+        let mut raw_data = Vec::with_capacity(64 * 1024);
+
+        let param = json!({ "archive-name": MANIFEST_BLOB_NAME });
+        self.h2.download("previous", Some(param), &mut raw_data).await?;
+
+        let blob = DataBlob::from_raw(raw_data)?;
+        blob.verify_crc()?;
+        let data = blob.decode(self.crypt_config.as_ref().map(Arc::as_ref))?;
+        let json: Value = serde_json::from_slice(&data[..])?;
+        let manifest = BackupManifest::try_from(json)?;
+
+        Ok(manifest)
     }
 
     fn upload_chunk_info_stream(
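
Notes (not part of the patch):

A minimal sketch of the client-side flow the new "previous" endpoint enables, using only the API introduced above. It assumes an already-connected HttpClient (`client`), an optional `crypt_config`, and an existing chunk `stream`; the trailing BackupWriter::start() arguments and the archive name are examples and assumed unchanged by this patch.

// Illustrative sketch only.
let writer = BackupWriter::start(
    client,
    crypt_config.clone(),   // new: the writer now owns the crypt config
    repo.store(),
    backup_type,
    &backup_id,
    backup_time,            // assumed: trailing arguments as before this patch
    verbose,
).await?;

// First backup of this group? Then there simply is no previous manifest.
let previous_manifest = match writer.download_previous_manifest().await {
    Ok(manifest) => Some(Arc::new(manifest)),
    Err(_) => None,
};

// upload_stream() seeds its known-chunk set from the matching index of the
// previous snapshot (fetched via the new "previous" endpoint), so unchanged
// chunks are registered instead of re-uploaded.
let stats = writer
    .upload_stream(previous_manifest.clone(), "root.pxar.didx", stream, "dynamic", None)
    .await?;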
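How the reworked upload_blob_from_data() interprets its new Option<bool> argument, read directly from the implementation above (upload_blob_from_file() forwards the same flag); `writer` and `data` (a Vec<u8>) are assumed to exist:

// Only relevant when the writer was started with a crypt config;
// without one every blob is stored plain regardless of this flag.
writer.upload_blob_from_data(data.clone(), "notes.blob", true, None).await?;        // compressed, neither signed nor encrypted
writer.upload_blob_from_data(data.clone(), "notes.blob", true, Some(false)).await?; // signed only
writer.upload_blob_from_data(data,         "notes.blob", true, Some(true)).await?;  // encrypted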
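Why std::os::unix::fs::OpenOptionsExt is now imported: the previous index is downloaded into an anonymous temporary file, so nothing has to be cleaned up if manifest verification fails. Roughly:

use std::os::unix::fs::OpenOptionsExt;

// O_TMPFILE creates an unlinked file below /tmp; it disappears as soon as the
// handle (and any index reader built on top of it) is dropped.
let tmpfile = std::fs::OpenOptions::new()
    .write(true)
    .read(true)
    .custom_flags(libc::O_TMPFILE)
    .open("/tmp")?;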