diff --git a/pbs-api-types/src/datastore.rs b/pbs-api-types/src/datastore.rs index 9a549fb0..0ef4240b 100644 --- a/pbs-api-types/src/datastore.rs +++ b/pbs-api-types/src/datastore.rs @@ -1060,9 +1060,22 @@ pub struct DatastoreWithNamespace { impl fmt::Display for DatastoreWithNamespace { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { if self.ns.is_root() { - write!(f, "{}", self.store) + write!(f, "datastore {}, root namespace", self.store) } else { - write!(f, "{}/{}", self.store, self.ns) + write!(f, "datastore '{}', namespace '{}'", self.store, self.ns) + } + } +} + +impl DatastoreWithNamespace { + pub fn acl_path(&self) -> Vec<&str> { + let mut path: Vec<&str> = vec!["datastore", &self.store]; + + if self.ns.is_root() { + path + } else { + path.extend(self.ns.inner.iter().map(|comp| comp.as_str())); + path } } } diff --git a/pbs-api-types/src/jobs.rs b/pbs-api-types/src/jobs.rs index 87009b3a..368e60e3 100644 --- a/pbs-api-types/src/jobs.rs +++ b/pbs-api-types/src/jobs.rs @@ -9,15 +9,15 @@ use proxmox_schema::*; use crate::{ Authid, BackupNamespace, BackupType, RateLimitConfig, Userid, BACKUP_GROUP_SCHEMA, BACKUP_NAMESPACE_SCHEMA, DATASTORE_SCHEMA, DRIVE_NAME_SCHEMA, MEDIA_POOL_NAME_SCHEMA, - PROXMOX_SAFE_ID_FORMAT, REMOTE_ID_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA, + NS_MAX_DEPTH_SCHEMA, PROXMOX_SAFE_ID_FORMAT, REMOTE_ID_SCHEMA, SINGLE_LINE_COMMENT_SCHEMA, }; const_regex! { /// Regex for verification jobs 'DATASTORE:ACTUAL_JOB_ID' pub VERIFICATION_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):"); - /// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:ACTUAL_JOB_ID' - pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):"); + /// Regex for sync jobs 'REMOTE:REMOTE_DATASTORE:LOCAL_DATASTORE:(?:LOCAL_NS_ANCHOR:)ACTUAL_JOB_ID' + pub SYNC_JOB_WORKER_ID_REGEX = concat!(r"^(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(", PROXMOX_SAFE_ID_REGEX_STR!(), r"):(?:", BACKUP_NS_RE!(), r"):"); } pub const JOB_ID_SCHEMA: Schema = StringSchema::new("Job ID.") @@ -413,6 +413,10 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema = store: { schema: DATASTORE_SCHEMA, }, + ns: { + type: BackupNamespace, + optional: true, + }, "owner": { type: Authid, optional: true, @@ -423,10 +427,18 @@ pub const GROUP_FILTER_LIST_SCHEMA: Schema = "remote-store": { schema: DATASTORE_SCHEMA, }, + "remote-ns": { + type: BackupNamespace, + optional: true, + }, "remove-vanished": { schema: REMOVE_VANISHED_BACKUPS_SCHEMA, optional: true, }, + "max-depth": { + schema: NS_MAX_DEPTH_SCHEMA, + optional: true, + }, comment: { optional: true, schema: SINGLE_LINE_COMMENT_SCHEMA, @@ -452,11 +464,17 @@ pub struct SyncJobConfig { pub id: String, pub store: String, #[serde(skip_serializing_if = "Option::is_none")] + pub ns: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub owner: Option, pub remote: String, pub remote_store: String, #[serde(skip_serializing_if = "Option::is_none")] + pub remote_ns: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub remove_vanished: Option, + #[serde(default)] + pub max_depth: usize, #[serde(skip_serializing_if = "Option::is_none")] pub comment: Option, #[serde(skip_serializing_if = "Option::is_none")] diff --git a/src/api2/config/sync.rs b/src/api2/config/sync.rs index cb87314c..b02ae691 100644 --- a/src/api2/config/sync.rs +++ b/src/api2/config/sync.rs @@ -1,6 +1,8 @@ use ::serde::{Deserialize, Serialize}; 
use anyhow::{bail, Error}; use hex::FromHex; +use pbs_api_types::BackupNamespace; +use pbs_api_types::MAX_NAMESPACE_DEPTH; use serde_json::Value; use proxmox_router::{http_bail, Permission, Router, RpcEnvironment}; @@ -25,11 +27,21 @@ pub fn check_sync_job_read_access( return false; } + if let Some(ref ns) = job.ns { + let ns_privs = user_info.lookup_privs(auth_id, &["datastore", &job.store, &ns.to_string()]); + if ns_privs & PRIV_DATASTORE_AUDIT == 0 { + return false; + } + } + let remote_privs = user_info.lookup_privs(auth_id, &["remote", &job.remote]); remote_privs & PRIV_REMOTE_AUDIT != 0 } -// user can run the corresponding pull job +/// checks whether user can run the corresponding pull job +/// +/// namespace creation/deletion ACL and backup group ownership checks happen in the pull code directly. +/// remote side checks/filters remote datastore/namespace/group access. pub fn check_sync_job_modify_access( user_info: &CachedUserInfo, auth_id: &Authid, @@ -40,6 +52,13 @@ pub fn check_sync_job_modify_access( return false; } + if let Some(ref ns) = job.ns { + let ns_privs = user_info.lookup_privs(auth_id, &["datastore", &job.store, &ns.to_string()]); + if ns_privs & PRIV_DATASTORE_BACKUP == 0 { + return false; + } + } + if let Some(true) = job.remove_vanished { if datastore_privs & PRIV_DATASTORE_PRUNE == 0 { return false; @@ -198,6 +217,10 @@ pub enum DeletableProperty { rate_out, /// Delete the burst_out property. burst_out, + /// Delete the ns property, + ns, + /// Delete the remote_ns property, + remote_ns, } #[api( @@ -283,10 +306,28 @@ pub fn update_sync_job( DeletableProperty::burst_out => { data.limit.burst_out = None; } + DeletableProperty::ns => { + data.ns = None; + } + DeletableProperty::remote_ns => { + data.remote_ns = None; + } } } } + let check_max_depth = |ns: &BackupNamespace, depth| -> Result<(), Error> { + if ns.depth() + depth >= MAX_NAMESPACE_DEPTH { + bail!( + "namespace and recursion depth exceed limit: {} + {} >= {}", + ns.depth(), + depth, + MAX_NAMESPACE_DEPTH + ); + } + Ok(()) + }; + if let Some(comment) = update.comment { let comment = comment.trim().to_string(); if comment.is_empty() { @@ -299,12 +340,23 @@ pub fn update_sync_job( if let Some(store) = update.store { data.store = store; } + if let Some(ns) = update.ns { + check_max_depth(&ns, update.max_depth.unwrap_or(data.max_depth))?; + data.ns = Some(ns); + } if let Some(remote) = update.remote { data.remote = remote; } if let Some(remote_store) = update.remote_store { data.remote_store = remote_store; } + if let Some(remote_ns) = update.remote_ns { + check_max_depth( + &remote_ns, + update.max_depth.unwrap_or(data.max_depth), + )?; + data.remote_ns = Some(remote_ns); + } if let Some(owner) = update.owner { data.owner = Some(owner); } @@ -335,6 +387,15 @@ pub fn update_sync_job( if update.remove_vanished.is_some() { data.remove_vanished = update.remove_vanished; } + if let Some(max_depth) = update.max_depth { + if let Some(ref ns) = data.ns { + check_max_depth(ns, max_depth)?; + } + if let Some(ref ns) = data.remote_ns { + check_max_depth(ns, max_depth)?; + } + data.max_depth = max_depth; + } if !check_sync_job_modify_access(&user_info, &auth_id, &data) { bail!("permission check failed"); @@ -453,10 +514,13 @@ acl:1:/remote/remote1/remotestore1:write@pbs:RemoteSyncOperator id: "regular".to_string(), remote: "remote0".to_string(), remote_store: "remotestore1".to_string(), + remote_ns: None, store: "localstore0".to_string(), + ns: None, owner: Some(write_auth_id.clone()), comment: None, remove_vanished: 
None, + max_depth: 0, group_filter: None, schedule: None, limit: pbs_api_types::RateLimitConfig::default(), // no limit diff --git a/src/api2/node/tasks.rs b/src/api2/node/tasks.rs index 05fd4fe7..f045dcac 100644 --- a/src/api2/node/tasks.rs +++ b/src/api2/node/tasks.rs @@ -39,6 +39,7 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> let remote = captures.get(1); let remote_store = captures.get(2); let local_store = captures.get(3); + let local_ns = captures.get(4).map(|m| m.as_str()); if let (Some(remote), Some(remote_store), Some(local_store)) = (remote, remote_store, local_store) @@ -46,6 +47,7 @@ fn check_job_privs(auth_id: &Authid, user_info: &CachedUserInfo, upid: &UPID) -> return check_pull_privs( auth_id, local_store.as_str(), + local_ns, remote.as_str(), remote_store.as_str(), false, diff --git a/src/api2/pull.rs b/src/api2/pull.rs index e89f867c..7ff1a799 100644 --- a/src/api2/pull.rs +++ b/src/api2/pull.rs @@ -9,9 +9,9 @@ use proxmox_schema::api; use proxmox_sys::task_log; use pbs_api_types::{ - Authid, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, - GROUP_FILTER_LIST_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, PRIV_REMOTE_READ, - REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, + Authid, BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, + GROUP_FILTER_LIST_SCHEMA, NS_MAX_DEPTH_SCHEMA, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_PRUNE, + PRIV_REMOTE_READ, REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, }; use pbs_config::CachedUserInfo; use proxmox_rest_server::WorkerTask; @@ -22,13 +22,24 @@ use crate::server::pull::{pull_store, PullParameters}; pub fn check_pull_privs( auth_id: &Authid, store: &str, + ns: Option<&str>, remote: &str, remote_store: &str, delete: bool, ) -> Result<(), Error> { let user_info = CachedUserInfo::new()?; - user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_BACKUP, false)?; + let local_store_ns_acl_path = match ns { + Some(ns) => vec!["datastore", store, ns], + None => vec!["datastore", store], + }; + + user_info.check_privs( + auth_id, + &local_store_ns_acl_path, + PRIV_DATASTORE_BACKUP, + false, + )?; user_info.check_privs( auth_id, &["remote", remote, remote_store], @@ -37,7 +48,12 @@ pub fn check_pull_privs( )?; if delete { - user_info.check_privs(auth_id, &["datastore", store], PRIV_DATASTORE_PRUNE, false)?; + user_info.check_privs( + auth_id, + &local_store_ns_acl_path, + PRIV_DATASTORE_PRUNE, + false, + )?; } Ok(()) @@ -49,14 +65,17 @@ impl TryFrom<&SyncJobConfig> for PullParameters { fn try_from(sync_job: &SyncJobConfig) -> Result { PullParameters::new( &sync_job.store, + sync_job.ns.clone().unwrap_or_default(), &sync_job.remote, &sync_job.remote_store, + sync_job.remote_ns.clone().unwrap_or_default(), sync_job .owner .as_ref() .unwrap_or_else(|| Authid::root_auth_id()) .clone(), sync_job.remove_vanished, + sync_job.max_depth, sync_job.group_filter.clone(), sync_job.limit.clone(), ) @@ -71,10 +90,11 @@ pub fn do_sync_job( to_stdout: bool, ) -> Result { let job_id = format!( - "{}:{}:{}:{}", + "{}:{}:{}:{}:{}", sync_job.remote, sync_job.remote_store, sync_job.store, + sync_job.ns.clone().unwrap_or_default(), job.jobname() ); let worker_type = job.jobtype().to_string(); @@ -154,16 +174,28 @@ pub fn do_sync_job( store: { schema: DATASTORE_SCHEMA, }, + ns: { + type: BackupNamespace, + optional: true, + }, remote: { schema: REMOTE_ID_SCHEMA, }, "remote-store": { schema: DATASTORE_SCHEMA, }, + "remote-ns": { + type: BackupNamespace, + optional: 
true, + }, "remove-vanished": { schema: REMOVE_VANISHED_BACKUPS_SCHEMA, optional: true, }, + "max-depth": { + schema: NS_MAX_DEPTH_SCHEMA, + optional: true, + }, "group-filter": { schema: GROUP_FILTER_LIST_SCHEMA, optional: true, @@ -186,9 +218,12 @@ The delete flag additionally requires the Datastore.Prune privilege on '/datasto /// Sync store from other repository async fn pull( store: String, + ns: Option, remote: String, remote_store: String, + remote_ns: Option, remove_vanished: Option, + max_depth: Option, group_filter: Option>, limit: RateLimitConfig, _info: &ApiMethod, @@ -197,14 +232,32 @@ async fn pull( let auth_id: Authid = rpcenv.get_auth_id().unwrap().parse()?; let delete = remove_vanished.unwrap_or(false); - check_pull_privs(&auth_id, &store, &remote, &remote_store, delete)?; + let ns = ns.unwrap_or_default(); + let max_depth = max_depth.unwrap_or(0); + let ns_str = if ns.is_root() { + None + } else { + Some(ns.to_string()) + }; + + check_pull_privs( + &auth_id, + &store, + ns_str.as_deref(), + &remote, + &remote_store, + delete, + )?; let pull_params = PullParameters::new( &store, + ns, &remote, &remote_store, + remote_ns.unwrap_or_default(), auth_id.clone(), remove_vanished, + max_depth, group_filter, limit, )?; @@ -217,7 +270,13 @@ async fn pull( auth_id.to_string(), true, move |worker| async move { - task_log!(worker, "sync datastore '{}' start", store); + task_log!( + worker, + "pull datastore '{}' from '{}/{}'", + store, + remote, + remote_store, + ); let pull_future = pull_store(&worker, &client, &pull_params); let future = select! { @@ -227,7 +286,7 @@ async fn pull( let _ = future?; - task_log!(worker, "sync datastore '{}' end", store); + task_log!(worker, "pull datastore '{}' end", store); Ok(()) }, diff --git a/src/bin/proxmox-backup-manager.rs b/src/bin/proxmox-backup-manager.rs index 652da681..02f9a6b6 100644 --- a/src/bin/proxmox-backup-manager.rs +++ b/src/bin/proxmox-backup-manager.rs @@ -12,8 +12,9 @@ use proxmox_sys::fs::CreateOptions; use pbs_api_types::percent_encoding::percent_encode_component; use pbs_api_types::{ BackupNamespace, GroupFilter, RateLimitConfig, SyncJobConfig, DATASTORE_SCHEMA, - GROUP_FILTER_LIST_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, REMOTE_ID_SCHEMA, - REMOVE_VANISHED_BACKUPS_SCHEMA, UPID_SCHEMA, VERIFICATION_OUTDATED_AFTER_SCHEMA, + GROUP_FILTER_LIST_SCHEMA, IGNORE_VERIFIED_BACKUPS_SCHEMA, NS_MAX_DEPTH_SCHEMA, + REMOTE_ID_SCHEMA, REMOVE_VANISHED_BACKUPS_SCHEMA, UPID_SCHEMA, + VERIFICATION_OUTDATED_AFTER_SCHEMA, }; use pbs_client::{display_task_log, view_task_result}; use pbs_config::sync; @@ -234,19 +235,31 @@ fn task_mgmt_cli() -> CommandLineInterface { #[api( input: { properties: { - "local-store": { + "store": { schema: DATASTORE_SCHEMA, }, + "ns": { + type: BackupNamespace, + optional: true, + }, remote: { schema: REMOTE_ID_SCHEMA, }, "remote-store": { schema: DATASTORE_SCHEMA, }, + "remote-ns": { + type: BackupNamespace, + optional: true, + }, "remove-vanished": { schema: REMOVE_VANISHED_BACKUPS_SCHEMA, optional: true, }, + "max-depth": { + schema: NS_MAX_DEPTH_SCHEMA, + optional: true, + }, "group-filter": { schema: GROUP_FILTER_LIST_SCHEMA, optional: true, @@ -266,8 +279,11 @@ fn task_mgmt_cli() -> CommandLineInterface { async fn pull_datastore( remote: String, remote_store: String, - local_store: String, + remote_ns: Option, + store: String, + ns: Option, remove_vanished: Option, + max_depth: Option, group_filter: Option>, limit: RateLimitConfig, param: Value, @@ -277,12 +293,24 @@ async fn pull_datastore( let client = 
connect_to_localhost()?; let mut args = json!({ - "store": local_store, + "store": store, "remote": remote, "remote-store": remote_store, "limit": limit, }); + if remote_ns.is_some() { + args["remote-ns"] = json!(remote_ns); + } + + if ns.is_some() { + args["local-ns"] = json!(ns); + } + + if max_depth.is_some() { + args["max-depth"] = json!(max_depth); + } + if group_filter.is_some() { args["group-filter"] = json!(group_filter); } @@ -406,14 +434,13 @@ async fn run() -> Result<(), Error> { .insert( "pull", CliCommand::new(&API_METHOD_PULL_DATASTORE) - .arg_param(&["remote", "remote-store", "local-store"]) - .completion_cb( - "local-store", - pbs_config::datastore::complete_datastore_name, - ) + .arg_param(&["remote", "remote-store", "store"]) + .completion_cb("store", pbs_config::datastore::complete_datastore_name) + .completion_cb("ns", complete_sync_local_datastore_namespace) .completion_cb("remote", pbs_config::remote::complete_remote_name) .completion_cb("remote-store", complete_remote_datastore_name) - .completion_cb("group-filter", complete_remote_datastore_group_filter), + .completion_cb("group-filter", complete_remote_datastore_group_filter) + .completion_cb("remote-ns", complete_remote_datastore_namespace), ) .insert( "verify", @@ -507,6 +534,12 @@ fn get_remote_ns(param: &HashMap) -> Option { if let Some(ns_str) = param.get("remote-ns") { BackupNamespace::from_str(ns_str).ok() } else { + if let Some(id) = param.get("id") { + let job = get_sync_job(id).ok(); + if let Some(ref job) = job { + return job.remote_ns.clone(); + } + } None } } diff --git a/src/bin/proxmox_backup_manager/sync.rs b/src/bin/proxmox_backup_manager/sync.rs index 9ba7578a..1f5f6523 100644 --- a/src/bin/proxmox_backup_manager/sync.rs +++ b/src/bin/proxmox_backup_manager/sync.rs @@ -103,12 +103,14 @@ pub fn sync_job_commands() -> CommandLineInterface { .completion_cb("id", pbs_config::sync::complete_sync_job_id) .completion_cb("schedule", pbs_config::datastore::complete_calendar_event) .completion_cb("store", pbs_config::datastore::complete_datastore_name) + .completion_cb("ns", crate::complete_sync_local_datastore_namespace) .completion_cb("remote", pbs_config::remote::complete_remote_name) .completion_cb("remote-store", crate::complete_remote_datastore_name) .completion_cb( "group-filter", crate::complete_remote_datastore_group_filter, - ), + ) + .completion_cb("remote-ns", crate::complete_remote_datastore_namespace), ) .insert( "update", @@ -117,11 +119,13 @@ pub fn sync_job_commands() -> CommandLineInterface { .completion_cb("id", pbs_config::sync::complete_sync_job_id) .completion_cb("schedule", pbs_config::datastore::complete_calendar_event) .completion_cb("store", pbs_config::datastore::complete_datastore_name) + .completion_cb("ns", crate::complete_sync_local_datastore_namespace) .completion_cb("remote-store", crate::complete_remote_datastore_name) .completion_cb( "group-filter", crate::complete_remote_datastore_group_filter, - ), + ) + .completion_cb("remote-ns", crate::complete_remote_datastore_namespace), ) .insert( "remove", diff --git a/src/server/pull.rs b/src/server/pull.rs index 0ad9aac6..19b1df8d 100644 --- a/src/server/pull.rs +++ b/src/server/pull.rs @@ -1,5 +1,6 @@ //! 
Sync datastore from remote server +use std::cmp::min; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::io::{Seek, SeekFrom}; @@ -9,14 +10,16 @@ use std::time::SystemTime; use anyhow::{bail, format_err, Error}; use http::StatusCode; +use pbs_config::CachedUserInfo; use serde_json::json; use proxmox_router::HttpError; use proxmox_sys::task_log; use pbs_api_types::{ - Authid, BackupNamespace, GroupFilter, GroupListItem, Operation, RateLimitConfig, Remote, - SnapshotListItem, + Authid, BackupNamespace, DatastoreWithNamespace, GroupFilter, GroupListItem, NamespaceListItem, + Operation, RateLimitConfig, Remote, SnapshotListItem, MAX_NAMESPACE_DEPTH, + PRIV_DATASTORE_AUDIT, PRIV_DATASTORE_BACKUP, PRIV_DATASTORE_MODIFY, }; use pbs_client::{ @@ -43,10 +46,16 @@ pub struct PullParameters { source: BackupRepository, /// Local store that is pulled into store: Arc, + /// Remote namespace + remote_ns: BackupNamespace, + /// Local namespace (anchor) + ns: BackupNamespace, /// Owner of synced groups (needs to match local owner of pre-existing groups) owner: Authid, /// Whether to remove groups which exist locally, but not on the remote end remove_vanished: bool, + /// How many levels of sub-namespaces to pull (0 == no recursion) + max_depth: usize, /// Filters for reducing the pull scope group_filter: Option>, /// Rate limits for all transfers from `remote` @@ -60,15 +69,20 @@ impl PullParameters { /// [BackupRepository] with `remote_store`. pub fn new( store: &str, + ns: BackupNamespace, remote: &str, remote_store: &str, + remote_ns: BackupNamespace, owner: Authid, remove_vanished: Option, + max_depth: usize, group_filter: Option>, limit: RateLimitConfig, ) -> Result { let store = DataStore::lookup_datastore(store, Some(Operation::Write))?; + let max_depth = min(max_depth, MAX_NAMESPACE_DEPTH - remote_ns.depth()); + let (remote_config, _digest) = pbs_config::remote::config()?; let remote: Remote = remote_config.lookup("remote", remote)?; @@ -83,10 +97,13 @@ impl PullParameters { Ok(Self { remote, + remote_ns, + ns, source, store, owner, remove_vanished, + max_depth, group_filter, limit, }) @@ -96,6 +113,14 @@ impl PullParameters { pub async fn client(&self) -> Result { crate::api2::config::remote::remote_client(&self.remote, Some(self.limit.clone())).await } + + /// Returns DatastoreWithNamespace with namespace (or local namespace anchor). 
+ pub fn store_with_ns(&self, ns: BackupNamespace) -> DatastoreWithNamespace { + DatastoreWithNamespace { + store: self.store.name().to_string(), + ns, + } + } } async fn pull_index_chunks( @@ -240,14 +265,12 @@ async fn pull_single_archive( worker: &WorkerTask, reader: &BackupReader, chunk_reader: &mut RemoteChunkReader, - tgt_store: Arc, - snapshot: &pbs_api_types::BackupDir, + snapshot: &pbs_datastore::BackupDir, archive_info: &FileInfo, downloaded_chunks: Arc>>, ) -> Result<(), Error> { let archive_name = &archive_info.filename; - let mut path = tgt_store.base_path(); - path.push(snapshot.to_string()); + let mut path = snapshot.full_path(); path.push(archive_name); let mut tmp_path = path.clone(); @@ -274,7 +297,7 @@ async fn pull_single_archive( pull_index_chunks( worker, chunk_reader.clone(), - tgt_store.clone(), + snapshot.datastore().clone(), index, downloaded_chunks, ) @@ -290,7 +313,7 @@ async fn pull_single_archive( pull_index_chunks( worker, chunk_reader.clone(), - tgt_store.clone(), + snapshot.datastore().clone(), index, downloaded_chunks, ) @@ -347,18 +370,13 @@ async fn try_client_log_download( async fn pull_snapshot( worker: &WorkerTask, reader: Arc, - tgt_store: Arc, - snapshot: &pbs_api_types::BackupDir, + snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { - let snapshot_relative_path = snapshot.to_string(); - - let mut manifest_name = tgt_store.base_path(); - manifest_name.push(&snapshot_relative_path); + let mut manifest_name = snapshot.full_path(); manifest_name.push(MANIFEST_BLOB_NAME); - let mut client_log_name = tgt_store.base_path(); - client_log_name.push(&snapshot_relative_path); + let mut client_log_name = snapshot.full_path(); client_log_name.push(CLIENT_LOG_BLOB_NAME); let mut tmp_manifest_name = manifest_name.clone(); @@ -424,8 +442,7 @@ async fn pull_snapshot( let manifest = BackupManifest::try_from(tmp_manifest_blob)?; for item in manifest.files() { - let mut path = tgt_store.base_path(); - path.push(&snapshot_relative_path); + let mut path = snapshot.full_path(); path.push(&item.filename); if path.exists() { @@ -474,7 +491,6 @@ async fn pull_snapshot( worker, &reader, &mut chunk_reader, - tgt_store.clone(), snapshot, item, downloaded_chunks.clone(), @@ -491,38 +507,37 @@ async fn pull_snapshot( } // cleanup - remove stale files - tgt_store.cleanup_backup_dir(snapshot, &manifest)?; + snapshot + .datastore() + .cleanup_backup_dir(snapshot, &manifest)?; Ok(()) } -/// Pulls a `snapshot` into `tgt_store`, differentiating between new snapshots (removed on error) -/// and existing ones (kept even on error). +/// Pulls a `snapshot`, removing newly created ones on error, but keeping existing ones in any case. +/// +/// The `reader` is configured to read from the remote / source namespace, while the `snapshot` is +/// pointing to the local datastore and target namespace. 
async fn pull_snapshot_from( worker: &WorkerTask, reader: Arc, - tgt_store: Arc, - snapshot: &pbs_api_types::BackupDir, + snapshot: &pbs_datastore::BackupDir, downloaded_chunks: Arc>>, ) -> Result<(), Error> { - // FIXME: Namespace support requires source AND target namespace - let ns = BackupNamespace::root(); - let (_path, is_new, _snap_lock) = tgt_store.create_locked_backup_dir(&ns, snapshot)?; + let (_path, is_new, _snap_lock) = snapshot + .datastore() + .create_locked_backup_dir(snapshot.backup_ns(), snapshot.as_ref())?; let snapshot_path = snapshot.to_string(); if is_new { task_log!(worker, "sync snapshot {:?}", snapshot_path); - if let Err(err) = pull_snapshot( - worker, - reader, - tgt_store.clone(), - snapshot, - downloaded_chunks, - ) - .await - { - if let Err(cleanup_err) = tgt_store.remove_backup_dir(&ns, snapshot, true) { + if let Err(err) = pull_snapshot(worker, reader, snapshot, downloaded_chunks).await { + if let Err(cleanup_err) = snapshot.datastore().remove_backup_dir( + snapshot.backup_ns(), + snapshot.as_ref(), + true, + ) { task_log!(worker, "cleanup error - {}", cleanup_err); } return Err(err); @@ -530,14 +545,7 @@ async fn pull_snapshot_from( task_log!(worker, "sync snapshot {:?} done", snapshot_path); } else { task_log!(worker, "re-sync snapshot {:?}", snapshot_path); - pull_snapshot( - worker, - reader, - tgt_store.clone(), - snapshot, - downloaded_chunks, - ) - .await?; + pull_snapshot(worker, reader, snapshot, downloaded_chunks).await?; task_log!(worker, "re-sync snapshot {:?} done", snapshot_path); } @@ -590,13 +598,18 @@ impl std::fmt::Display for SkipInfo { /// Pulls a group according to `params`. /// /// Pulling a group consists of the following steps: -/// - Query the list of snapshots available for this group on the remote, sort by snapshot time +/// - Query the list of snapshots available for this group in the source namespace on the remote +/// - Sort by snapshot time /// - Get last snapshot timestamp on local datastore /// - Iterate over list of snapshots /// -- Recreate client/BackupReader /// -- pull snapshot, unless it's not finished yet or older than last local snapshot /// - (remove_vanished) list all local snapshots, remove those that don't exist on remote /// +/// Backwards-compat: if `source_ns` is [None], only the group type and ID will be sent to the +/// remote when querying snapshots. This allows us to interact with old remotes that don't have +/// namespace support yet. 
+/// /// Permission checks: /// - remote snapshot access is checked by remote (twice: query and opening the backup reader) /// - local group owner is already checked by pull_store @@ -605,21 +618,25 @@ async fn pull_group( client: &HttpClient, params: &PullParameters, group: &pbs_api_types::BackupGroup, + remote_ns: BackupNamespace, progress: &mut StoreProgress, ) -> Result<(), Error> { - // FIXME: Namespace support - let ns = BackupNamespace::root(); - let path = format!( "api2/json/admin/datastore/{}/snapshots", params.source.store() ); - let args = json!({ + let mut args = json!({ "backup-type": group.ty, "backup-id": group.id, }); + if !remote_ns.is_root() { + args["backup-ns"] = serde_json::to_value(&remote_ns)?; + } + + let target_ns = remote_ns.map_prefix(¶ms.remote_ns, ¶ms.ns)?; + let mut result = client.get(&path, Some(args)).await?; let mut list: Vec = serde_json::from_value(result["data"].take())?; @@ -629,7 +646,7 @@ async fn pull_group( let fingerprint = client.fingerprint(); - let last_sync = params.store.last_successful_backup(&ns, group)?; + let last_sync = params.store.last_successful_backup(&target_ns, group)?; let mut remote_snapshots = std::collections::HashSet::new(); @@ -684,20 +701,15 @@ async fn pull_group( new_client, None, params.source.store(), - &ns, + &remote_ns, &snapshot, true, ) .await?; - let result = pull_snapshot_from( - worker, - reader, - params.store.clone(), - &snapshot, - downloaded_chunks.clone(), - ) - .await; + let snapshot = params.store.backup_dir(target_ns.clone(), snapshot)?; + + let result = pull_snapshot_from(worker, reader, &snapshot, downloaded_chunks.clone()).await; progress.done_snapshots = pos as u64 + 1; task_log!(worker, "percentage done: {}", progress); @@ -706,7 +718,7 @@ async fn pull_group( } if params.remove_vanished { - let group = params.store.backup_group(ns.clone(), group.clone()); + let group = params.store.backup_group(target_ns.clone(), group.clone()); let local_list = group.list_backups()?; for info in local_list { let backup_time = info.backup_dir.backup_time(); @@ -728,7 +740,7 @@ async fn pull_group( ); params .store - .remove_backup_dir(&ns, info.backup_dir.as_ref(), false)?; + .remove_backup_dir(&target_ns, info.backup_dir.as_ref(), false)?; } } @@ -739,37 +751,290 @@ async fn pull_group( Ok(()) } +async fn query_namespaces( + client: &HttpClient, + params: &PullParameters, +) -> Result, Error> { + let path = format!( + "api2/json/admin/datastore/{}/namespace", + params.source.store() + ); + let data = json!({ + "max-depth": params.max_depth, + }); + let mut result = client + .get(&path, Some(data)) + .await + .map_err(|err| format_err!("Failed to retrieve namespaces from remote - {}", err))?; + let mut list: Vec = serde_json::from_value(result["data"].take())?; + + // parents first + list.sort_unstable_by(|a, b| a.ns.name_len().cmp(&b.ns.name_len())); + + Ok(list.iter().map(|item| item.ns.clone()).collect()) +} + +fn check_ns_privs( + store_with_ns: &DatastoreWithNamespace, + owner: &Authid, + privs: u64, +) -> Result<(), Error> { + let user_info = CachedUserInfo::new()?; + + // TODO re-sync with API, maybe find common place? 
+ + let user_privs = user_info.lookup_privs(owner, &store_with_ns.acl_path()); + + if (user_privs & privs) == 0 { + bail!("no permission to modify parent/datastore."); + } + Ok(()) +} + +fn check_and_create_ns( + params: &PullParameters, + store_with_ns: &DatastoreWithNamespace, +) -> Result { + let ns = &store_with_ns.ns; + let mut created = false; + + if !ns.is_root() && !params.store.namespace_path(&ns).exists() { + let mut parent = ns.clone(); + let name = parent.pop(); + + let parent = params.store_with_ns(parent); + + if let Err(err) = check_ns_privs(&parent, ¶ms.owner, PRIV_DATASTORE_MODIFY) { + bail!( + "Not allowed to create namespace {} - {}", + store_with_ns, + err, + ); + } + if let Some(name) = name { + if let Err(err) = params.store.create_namespace(&parent.ns, name) { + bail!( + "sync namespace {} failed - namespace creation failed: {}", + &store_with_ns, + err + ); + } + created = true; + } else { + bail!( + "sync namespace {} failed - namespace creation failed - couldn't determine parent namespace", + &store_with_ns, + ); + } + } + + // TODO re-sync with API, maybe find common place? + if let Err(err) = check_ns_privs(&store_with_ns, ¶ms.owner, PRIV_DATASTORE_BACKUP) { + bail!("sync namespace {} failed - {}", &store_with_ns, err); + } + + Ok(created) +} + +fn check_and_remove_ns(params: &PullParameters, local_ns: &BackupNamespace) -> Result { + let parent = local_ns.clone().parent(); + check_ns_privs( + ¶ms.store_with_ns(parent), + ¶ms.owner, + PRIV_DATASTORE_MODIFY, + )?; + params.store.remove_namespace_recursive(local_ns) +} + +fn check_and_remove_vanished_ns( + worker: &WorkerTask, + params: &PullParameters, + synced_ns: HashSet, +) -> Result { + let mut errors = false; + let user_info = CachedUserInfo::new()?; + + let mut local_ns_list: Vec = params + .store + .recursive_iter_backup_ns_ok(params.ns.clone(), Some(params.max_depth))? + .filter(|ns| { + let store_with_ns = params.store_with_ns(ns.clone()); + let user_privs = user_info.lookup_privs(¶ms.owner, &store_with_ns.acl_path()); + user_privs & (PRIV_DATASTORE_BACKUP | PRIV_DATASTORE_AUDIT) != 0 + }) + .collect(); + + // children first! + local_ns_list.sort_unstable_by_key(|b| std::cmp::Reverse(b.name_len())); + + for local_ns in local_ns_list { + if local_ns == params.ns { + continue; + } + + if synced_ns.contains(&local_ns) { + continue; + } + + if local_ns.is_root() { + continue; + } + match check_and_remove_ns(params, &local_ns) { + Ok(true) => task_log!(worker, "Removed namespace {}", local_ns), + Ok(false) => task_log!( + worker, + "Did not remove namespace {} - protected snapshots remain", + local_ns + ), + Err(err) => { + task_log!(worker, "Failed to remove namespace {} - {}", local_ns, err); + errors = true; + } + } + } + + Ok(errors) +} + /// Pulls a store according to `params`. /// /// Pulling a store consists of the following steps: -/// - Query list of groups on the remote +/// - Query list of namespaces on the remote +/// - Iterate list +/// -- create sub-NS if needed (and allowed) +/// -- attempt to pull each NS in turn +/// - (remove_vanished && max_depth > 0) remove sub-NS which are not or no longer available on the remote +/// +/// Backwards compat: if the remote namespace is `/` and recursion is disabled, no namespace is +/// passed to the remote at all to allow pulling from remotes which have no notion of namespaces. 
+/// +/// Permission checks: +/// - access to local datastore, namespace anchor and remote entry need to be checked at call site +/// - remote namespaces are filtered by remote +/// - creation and removal of sub-NS checked here +/// - access to sub-NS checked here +pub async fn pull_store( + worker: &WorkerTask, + client: &HttpClient, + params: &PullParameters, +) -> Result<(), Error> { + // explicit create shared lock to prevent GC on newly created chunks + let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; + + let namespaces = if params.remote_ns.is_root() && params.max_depth == 0 { + vec![params.remote_ns.clone()] // backwards compat - don't query remote namespaces! + } else { + query_namespaces(client, params).await? + }; + + let (mut groups, mut snapshots) = (0, 0); + let mut synced_ns = HashSet::with_capacity(namespaces.len()); + let mut errors = false; + + for namespace in namespaces { + let source_store_ns = DatastoreWithNamespace { + store: params.source.store().to_owned(), + ns: namespace.clone(), + }; + let target_ns = namespace.map_prefix(¶ms.remote_ns, ¶ms.ns)?; + let target_store_ns = params.store_with_ns(target_ns.clone()); + + task_log!(worker, "----"); + task_log!( + worker, + "Syncing {} into {}", + source_store_ns, + target_store_ns + ); + + synced_ns.insert(target_ns.clone()); + + match check_and_create_ns(params, &target_store_ns) { + Ok(true) => task_log!(worker, "Created namespace {}", target_ns), + Ok(false) => {} + Err(err) => { + task_log!( + worker, + "Cannot sync {} into {} - {}", + source_store_ns, + target_store_ns, + err, + ); + errors = true; + continue; + } + } + + match pull_ns(worker, client, params, namespace.clone(), target_ns).await { + Ok((ns_progress, ns_errors)) => { + errors |= ns_errors; + + if params.max_depth > 0 { + groups += ns_progress.done_groups; + snapshots += ns_progress.done_snapshots; + task_log!( + worker, + "Finished syncing namespace {}, current progress: {} groups, {} snapshots", + namespace, + groups, + snapshots, + ); + } + } + Err(err) => { + errors = true; + task_log!( + worker, + "Encountered errors while syncing namespace {} - {}", + namespace, + err, + ); + } + }; + } + + if params.remove_vanished { + errors |= check_and_remove_vanished_ns(worker, params, synced_ns)?; + } + + if errors { + bail!("sync failed with some errors."); + } + + Ok(()) +} + +/// Pulls a namespace according to `params`. +/// +/// Pulling a namespace consists of the following steps: +/// - Query list of groups on the remote (in `source_ns`) /// - Filter list according to configured group filters /// - Iterate list and attempt to pull each group in turn /// - (remove_vanished) remove groups with matching owner and matching the configured group filters which are /// not or no longer available on the remote /// /// Permission checks: -/// - access to local datastore and remote entry need to be checked at call site -/// - remote groups are filtered by remote +/// - remote namespaces are filtered by remote /// - owner check for vanished groups done here -pub async fn pull_store( +pub async fn pull_ns( worker: &WorkerTask, client: &HttpClient, params: &PullParameters, -) -> Result<(), Error> { - // FIXME: Namespace support requires source AND target namespace - let ns = BackupNamespace::root(); - let local_ns = BackupNamespace::root(); - - // explicit create shared lock to prevent GC on newly created chunks - let _shared_store_lock = params.store.try_shared_chunk_store_lock()?; - - // FIXME: Namespaces! 
AND: If we make this API call recurse down namespaces we need to do the - // same down in the `remove_vanished` case! + source_ns: BackupNamespace, + target_ns: BackupNamespace, +) -> Result<(StoreProgress, bool), Error> { let path = format!("api2/json/admin/datastore/{}/groups", params.source.store()); + let args = if !source_ns.is_root() { + Some(json!({ + "backup-ns": source_ns, + })) + } else { + None + }; + let mut result = client - .get(&path, None) + .get(&path, args) .await .map_err(|err| format_err!("Failed to retrieve backup groups from remote - {}", err))?; @@ -789,6 +1054,7 @@ pub async fn pull_store( filters.iter().any(|filter| group.matches(filter)) }; + // Get groups with target NS set let list: Vec = list.into_iter().map(|item| item.backup).collect(); let list = if let Some(ref group_filter) = ¶ms.group_filter { @@ -826,7 +1092,7 @@ pub async fn pull_store( let (owner, _lock_guard) = match params .store - .create_locked_backup_group(&ns, &group, ¶ms.owner) + .create_locked_backup_group(&target_ns, &group, ¶ms.owner) { Ok(result) => result, Err(err) => { @@ -852,7 +1118,16 @@ pub async fn pull_store( owner ); errors = true; // do not stop here, instead continue - } else if let Err(err) = pull_group(worker, client, params, &group, &mut progress).await { + } else if let Err(err) = pull_group( + worker, + client, + params, + &group, + source_ns.clone(), + &mut progress, + ) + .await + { task_log!(worker, "sync group {} failed - {}", &group, err,); errors = true; // do not stop here, instead continue } @@ -860,13 +1135,12 @@ pub async fn pull_store( if params.remove_vanished { let result: Result<(), Error> = proxmox_lang::try_block!({ - // FIXME: See above comment about namespaces & recursion - for local_group in params.store.iter_backup_groups(Default::default())? { + for local_group in params.store.iter_backup_groups(target_ns.clone())? { let local_group = local_group?; if new_groups.contains(local_group.as_ref()) { continue; } - let owner = params.store.get_owner(&local_ns, &local_group.group())?; + let owner = params.store.get_owner(&target_ns, local_group.group())?; if check_backup_owner(&owner, ¶ms.owner).is_err() { continue; } @@ -881,7 +1155,10 @@ pub async fn pull_store( local_group.backup_type(), local_group.backup_id() ); - match params.store.remove_backup_group(&ns, local_group.as_ref()) { + match params + .store + .remove_backup_group(&target_ns, local_group.as_ref()) + { Ok(true) => {} Ok(false) => { task_log!( @@ -904,9 +1181,5 @@ pub async fn pull_store( }; } - if errors { - bail!("sync failed with some errors."); - } - - Ok(()) + Ok((progress, errors)) }
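
The sketches below are editorial notes, not part of the patch; they use simplified stand-in types instead of the real pbs-api-types/pbs-datastore items. First, the ACL-path handling: DatastoreWithNamespace::acl_path() and the new checks in check_sync_job_read_access()/check_sync_job_modify_access() amount to extending ["datastore", store] with the namespace components and testing privilege bits against that path. A minimal sketch, assuming a toy privilege lookup:

// Simplified stand-ins; the real bit values live in pbs-api-types.
const PRIV_DATASTORE_AUDIT: u64 = 1 << 0;
const PRIV_DATASTORE_BACKUP: u64 = 1 << 1;

struct StoreWithNs {
    store: String,
    ns: Vec<String>, // namespace components, empty == root
}

impl StoreWithNs {
    // Mirrors acl_path(): ["datastore", <store>] plus one element per namespace component.
    fn acl_path(&self) -> Vec<&str> {
        let mut path = vec!["datastore", self.store.as_str()];
        path.extend(self.ns.iter().map(|c| c.as_str()));
        path
    }
}

// Stand-in for CachedUserInfo::lookup_privs(); grants only Audit here.
fn lookup_privs(_auth_id: &str, _acl_path: &[&str]) -> u64 {
    PRIV_DATASTORE_AUDIT
}

fn main() {
    let target = StoreWithNs {
        store: "tank".to_string(),
        ns: vec!["prod".to_string(), "vm".to_string()],
    };
    assert_eq!(target.acl_path(), ["datastore", "tank", "prod", "vm"]);

    // Same shape as check_sync_job_read_access(): the job is only visible if the
    // required bit is set on the namespace-scoped ACL path.
    let privs = lookup_privs("user@pbs", &target.acl_path());
    println!("audit allowed:  {}", privs & PRIV_DATASTORE_AUDIT != 0);
    println!("backup allowed: {}", privs & PRIV_DATASTORE_BACKUP != 0);
}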
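
Each remote namespace is mapped onto the local side by swapping the configured remote anchor for the local one (remote_ns.map_prefix(&params.remote_ns, &params.ns) in pull_store()/pull_group()). A rough re-implementation over plain component vectors, assuming map_prefix() fails when the namespace is not below the source prefix:

// Illustrative only: namespaces as component vectors instead of pbs_api_types::BackupNamespace.
fn map_prefix(
    ns: &[String],
    source_prefix: &[String],
    target_prefix: &[String],
) -> Result<Vec<String>, String> {
    // The namespace reported by the remote must live below the configured remote anchor.
    let suffix = ns
        .strip_prefix(source_prefix)
        .ok_or_else(|| format!("{:?} is not below source prefix {:?}", ns, source_prefix))?;

    // Re-anchor the remaining components below the local target namespace.
    let mut mapped = target_prefix.to_vec();
    mapped.extend_from_slice(suffix);
    Ok(mapped)
}

fn main() {
    let ns: Vec<String> = vec!["customers".to_string(), "acme".to_string(), "vm".to_string()];
    let remote_anchor: Vec<String> = vec!["customers".to_string()];
    let local_anchor: Vec<String> = vec!["synced".to_string()];

    // "customers/acme/vm" pulled with remote-ns=customers, ns=synced -> "synced/acme/vm"
    let target = map_prefix(&ns, &remote_anchor, &local_anchor).unwrap();
    assert_eq!(target, vec!["synced".to_string(), "acme".to_string(), "vm".to_string()]);
    println!("target namespace: {}", target.join("/"));
}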
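
Two depth guards appear above: update_sync_job() rejects configurations where anchor depth plus max-depth would exceed the namespace limit, and PullParameters::new() silently clamps the recursion depth against what is left below the remote namespace. A sketch of both, with a placeholder limit (the real value is pbs_api_types::MAX_NAMESPACE_DEPTH, not shown in this diff):

// MAX_DEPTH is a placeholder; the patch uses pbs_api_types::MAX_NAMESPACE_DEPTH.
const MAX_DEPTH: usize = 8;

// Same shape as the check_max_depth closure in update_sync_job().
fn check_max_depth(ns_depth: usize, max_depth: usize) -> Result<(), String> {
    if ns_depth + max_depth >= MAX_DEPTH {
        return Err(format!(
            "namespace and recursion depth exceed limit: {} + {} >= {}",
            ns_depth, max_depth, MAX_DEPTH
        ));
    }
    Ok(())
}

// PullParameters::new(): clamp recursion to what still fits below the remote namespace.
fn clamp_max_depth(remote_ns_depth: usize, requested: usize) -> usize {
    requested.min(MAX_DEPTH - remote_ns_depth)
}

fn main() {
    assert!(check_max_depth(2, 3).is_ok());
    assert!(check_max_depth(5, 4).is_err());
    assert_eq!(clamp_max_depth(6, 10), 2);
    println!("depth checks ok");
}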
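
Ordering matters when namespaces are created and removed: query_namespaces() sorts parents first so they can be created top-down, while check_and_remove_vanished_ns() sorts children first so removal proceeds bottom-up. Keyed here on path-string length as a stand-in for BackupNamespace::name_len():

fn main() {
    // Namespaces as "a/b/c" strings purely for illustration; "" is the root.
    let mut namespaces = vec!["a/b/c", "a", "a/b", "x", ""];

    // Creation order: parents first (query_namespaces sorts ascending by name length).
    namespaces.sort_unstable_by_key(|ns| ns.len());
    println!("create order: {:?}", namespaces);

    // Removal order: children first (check_and_remove_vanished_ns reverses the key).
    namespaces.sort_unstable_by_key(|ns| std::cmp::Reverse(ns.len()));
    println!("remove order: {:?}", namespaces);
}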
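
Backwards compatibility with pre-namespace remotes rests on two toggles: the "backup-ns" parameter is only sent when the source namespace is not the root (pull_group()/pull_ns()), and pull_store() skips the namespace listing entirely for the root namespace with max-depth 0. A sketch of the request-side logic (requires the serde_json crate; parameter names are taken from the hunks above, the string representation of namespaces is an assumption):

use serde_json::{json, Value};

fn snapshot_list_args(backup_type: &str, backup_id: &str, source_ns: &str) -> Value {
    let mut args = json!({
        "backup-type": backup_type,
        "backup-id": backup_id,
    });
    // Old servers don't know the parameter, so the root namespace is simply omitted.
    if !source_ns.is_empty() {
        args["backup-ns"] = json!(source_ns);
    }
    args
}

fn should_query_remote_namespaces(remote_ns_is_root: bool, max_depth: usize) -> bool {
    // pull_store(): for "/" with no recursion, skip the namespace listing entirely,
    // so pulling from remotes without namespace support keeps working.
    !(remote_ns_is_root && max_depth == 0)
}

fn main() {
    println!("{}", snapshot_list_args("vm", "100", ""));
    println!("{}", snapshot_list_args("vm", "100", "prod/web"));
    assert!(!should_query_remote_namespaces(true, 0));
    assert!(should_query_remote_namespaces(true, 2));
}

One apparent mismatch worth a second look: pull_datastore() in proxmox-backup-manager sends the local namespace as "local-ns", while the pull API endpoint above declares the parameter as "ns"; unless an alias exists outside this diff, the CLI key likely needs to be "ns" as well.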
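
Finally, check_and_create_ns() creates missing target namespaces on the fly: creating one requires Datastore.Modify on its parent, and syncing into it (new or pre-existing) additionally requires Datastore.Backup on the namespace itself. A much-simplified sketch of that flow, using a set of existing namespaces in place of the datastore and a fixed privilege lookup (error details and protected-snapshot handling are omitted):

use std::collections::HashSet;

const PRIV_DATASTORE_MODIFY: u64 = 1 << 2;
const PRIV_DATASTORE_BACKUP: u64 = 1 << 1;

// Toy lookup that grants everything; the real code consults CachedUserInfo per ACL path.
fn privs_on(_path: &[String]) -> u64 {
    PRIV_DATASTORE_MODIFY | PRIV_DATASTORE_BACKUP
}

// Returns Ok(true) if the namespace was created, Ok(false) if it already existed.
fn check_and_create_ns(existing: &mut HashSet<Vec<String>>, ns: &[String]) -> Result<bool, String> {
    let mut created = false;

    if !ns.is_empty() && !existing.contains(ns) {
        // Creating a namespace requires Datastore.Modify on its *parent*.
        let parent = &ns[..ns.len() - 1];
        if privs_on(parent) & PRIV_DATASTORE_MODIFY == 0 {
            return Err(format!("not allowed to create namespace {:?}", ns));
        }
        existing.insert(ns.to_vec());
        created = true;
    }

    // Syncing into the namespace needs Datastore.Backup on the namespace itself.
    if privs_on(ns) & PRIV_DATASTORE_BACKUP == 0 {
        return Err(format!("no backup access to namespace {:?}", ns));
    }

    Ok(created)
}

fn main() {
    let mut existing: HashSet<Vec<String>> = HashSet::new();
    existing.insert(vec![]); // root always exists
    let target = vec!["prod".to_string(), "web".to_string()];
    assert_eq!(check_and_create_ns(&mut existing, &target), Ok(true));
    assert_eq!(check_and_create_ns(&mut existing, &target), Ok(false));
}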