[pbs-devel] [PATCH v4 proxmox-backup 3/6] pbs-datastore: add active operations tracking
Dominik Csapak
d.csapak at proxmox.com
Fri Nov 19 16:21:04 CET 2021
comments inline
On 11/12/21 13:30, Hannes Laimer wrote:
> Saves the currently active read/write operation counts in a file. The
> file is updated whenever a reference returned by lookup_datastore is
> dropped and whenever a reference is returned by lookup_datastore. The
> files are locked before every access, there is one file per datastore.
> ---
> v3->v4: implement Clone for the DataStore wrapper struct. So clones are
> tracked correctly. On every update of the file, the lines of inactive
> processes are removed. A line looks like this:
> <pid> <read> <write>
> So, PID has <read>/<write> active read/write operations. This is needed
> because if we just had one number for read/write and some process would
> die, we are not able to find out how many operations were ended with
> it. Like this we only consider lines that have a PID that belongs to an
> active process.
>
> pbs-api-types/src/maintenance.rs | 1 +
> pbs-datastore/Cargo.toml | 1 +
> pbs-datastore/src/datastore.rs | 166 +++++++++++++++++++++++++------
> pbs-datastore/src/lib.rs | 3 +
> src/bin/proxmox-backup-api.rs | 1 +
> src/server/mod.rs | 16 ++-
> 6 files changed, 158 insertions(+), 30 deletions(-)
>
> diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
> index f816b279..98e3ec62 100644
> --- a/pbs-api-types/src/maintenance.rs
> +++ b/pbs-api-types/src/maintenance.rs
> @@ -25,6 +25,7 @@ pub enum MaintenanceType {
> Offline(String),
> }
>
> +#[derive(Copy ,Clone)]
> /// Operation requirments, used when checking for maintenance mode.
> pub enum Operation {
> Read,
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index 01c5ee00..2063c1bb 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -34,5 +34,6 @@ proxmox-time = "1"
> proxmox-uuid = "1"
>
> pbs-api-types = { path = "../pbs-api-types" }
> +pbs-buildcfg = { path = "../pbs-buildcfg" }
> pbs-tools = { path = "../pbs-tools" }
> pbs-config = { path = "../pbs-config" }
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index 064fd273..d8c2f3f5 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -9,6 +9,7 @@ use std::time::Duration;
> use anyhow::{bail, format_err, Error};
> use lazy_static::lazy_static;
>
> +use proxmox::sys::linux::procfs;
> use proxmox::tools::fs::{replace_file, file_read_optional_string, CreateOptions};
>
> use pbs_api_types::{UPID, DataStoreConfig, Authid, GarbageCollectionStatus, MaintenanceType, Operation};
> @@ -31,7 +32,7 @@ use crate::manifest::{
> };
>
> lazy_static! {
> - static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
> + static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
> }
>
> /// checks if auth_id is owner, or, if owner is a token, if
> @@ -52,13 +53,113 @@ pub fn check_backup_owner(
> ///
> /// A Datastore can store severals backups, and provides the
> /// management interface for backup.
> -pub struct DataStore {
> +pub struct DataStoreImpl {
> chunk_store: Arc<ChunkStore>,
> gc_mutex: Mutex<()>,
> last_gc_status: Mutex<GarbageCollectionStatus>,
> verify_new: bool,
> }
>
> +pub struct DataStore {
> + inner: Arc<DataStoreImpl>,
> + operation: Option<Operation>,
> +}
> +
> +impl Clone for DataStore {
> + fn clone(&self) -> Self {
> + if let Some(operation) = self.operation.clone() {
> + if let Err(e) = update_active_operations(self.name(), operation, -1) {
> + eprintln!("could not update active operations - {}", e);
> + }
> + }
> + DataStore {
> + inner: self.inner.clone(),
> + operation: self.operation.clone(),
> + }
> + }
> +}
> +
> +fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
> + let mut path = PathBuf::from(crate::ACTIVE_OPERATIONS_DIR);
> + let mut lock_path = PathBuf::from(crate::ACTIVE_OPERATIONS_DIR);
> + path.push(name);
> + lock_path.push(format!(".{}.lock", name));
> +
> + let user = pbs_config::backup_user()?;
> + let options = proxmox::tools::fs::CreateOptions::new()
> + .group(user.gid)
> + .owner(user.uid)
> + .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> + let timeout = std::time::Duration::new(10, 0);
> + proxmox::tools::fs::open_file_locked(&lock_path, timeout, true, options.clone())?;
> +
> + let pid = std::process::id() as i64;
> + proxmox::tools::fs::replace_file(
> + &path,
> + match proxmox::tools::fs::file_get_optional_contents(&path) {
You could do that outside of the replace_file call;
that removes a level of indentation.
> + Ok(Some(data)) => {
> + let mut updated = false;
> + let new_data = String::from_utf8(data)
We already have file_read_optional_string,
so there is no need to convert it ourselves.
That removes at least one level of indentation.
> + .unwrap_or(String::new())
> + .lines()
> + .into_iter()
> + .fold(String::new(), |xs, line| {
Any special reason to use fold, which allocates a new string on every iteration (format!()),
instead of simply using a loop and appending to a string?
> + let split = line.split(" ").collect::<Vec<&str>>();
> + match split[0].parse::<i32>() {
> + Ok(line_pid) if line_pid as i64 == pid && split.len() == 3 => {
> + let stat = (
> + split[1].parse::<i64>().unwrap_or(0),
> + split[2].parse::<i64>().unwrap_or(0),
> + );
> + updated = true;
> + match (operation, stat) {
> + (Operation::Write, (r, w)) => {
> + format!("{}\n{} {} {}", xs, pid, r, w + count)
> + }
> + (Operation::Read, (r, w)) => {
> + format!("{}\n{} {} {}", xs, pid, r + count, w)
> + }
> + }
> + }
> + Ok(line_pid)
> + if procfs::check_process_running(line_pid).is_some()
> + && split.len() == 3 =>
> + {
> + format!("{}\n{}", xs, line)
> + }
> + _ => xs,
IMHO, the line parsing/printing should be its own function; that
way one can more easily reason about the behaviour.
> + }
> + });
> + match operation {
> + Operation::Read if !updated => format!("{}\n{} {} {}", new_data, pid, 1, 0),
> + Operation::Write if !updated => format!("{}\n{} {} {}", new_data, pid, 0, 1),
> + _ => new_data,
> + }
> + }
> + _ => match operation {
> + Operation::Read => format!("{} {} {}", pid, 1, 0),
> + Operation::Write => format!("{} {} {}", pid, 0, 1),
> + },
> + }
> + .as_bytes(),
> + options,
> + false,
> + )?;
> +
> + Ok(())
> +}
> +
> +impl Drop for DataStore {
> + fn drop(&mut self) {
> + if let Some(operation) = self.operation.clone() {
> + if let Err(e) = update_active_operations(self.name(), operation, -1) {
> + eprintln!("could not update active operations - {}", e);
> + }
> + }
> + }
> +}
> +
> impl DataStore {
> pub fn lookup_datastore(
> name: &str,
> @@ -73,6 +174,7 @@ impl DataStore {
> | (Some(MaintenanceType::Offline(message)), Some(_)) => {
> bail!("Datastore '{}' is in maintenance mode: {}", name, message);
> },
> + (_, Some(operation)) => update_active_operations(name, operation, 1)?,
> _ => {}
> }
>
> @@ -83,7 +185,10 @@ impl DataStore {
> if datastore.chunk_store.base() == path &&
> datastore.verify_new == config.verify_new.unwrap_or(false)
> {
> - return Ok(datastore.clone());
> + return Ok(Arc::new(Self {
> + inner: datastore.clone(),
> + operation,
> + }))
> }
> }
>
> @@ -92,7 +197,10 @@ impl DataStore {
> let datastore = Arc::new(datastore);
> map.insert(name.to_string(), datastore.clone());
>
> - Ok(datastore)
> + Ok(Arc::new(Self {
> + inner: datastore,
> + operation,
> + }))
> }
>
> /// removes all datastores that are not configured anymore
> @@ -107,7 +215,7 @@ impl DataStore {
> Ok(())
> }
>
> - fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
> + fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
> let chunk_store = ChunkStore::open(store_name, path)?;
>
> let mut gc_status_path = chunk_store.base_path();
> @@ -125,7 +233,7 @@ impl DataStore {
> GarbageCollectionStatus::default()
> };
>
> - Ok(Self {
> + Ok(DataStoreImpl {
> chunk_store: Arc::new(chunk_store),
> gc_mutex: Mutex::new(()),
> last_gc_status: Mutex::new(gc_status),
> @@ -139,19 +247,19 @@ impl DataStore {
> impl Iterator<Item = (Result<pbs_tools::fs::ReadDirEntry, Error>, usize, bool)>,
> Error
> > {
> - self.chunk_store.get_chunk_iterator()
> + self.inner.chunk_store.get_chunk_iterator()
> }
>
> pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
>
> - let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
> + let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
>
> Ok(index)
> }
>
> pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
>
> - let full_path = self.chunk_store.relative_path(filename.as_ref());
> + let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
>
> let index = FixedIndexReader::open(&full_path)?;
>
> @@ -163,14 +271,14 @@ impl DataStore {
> ) -> Result<DynamicIndexWriter, Error> {
>
> let index = DynamicIndexWriter::create(
> - self.chunk_store.clone(), filename.as_ref())?;
> + self.inner.chunk_store.clone(), filename.as_ref())?;
>
> Ok(index)
> }
>
> pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
>
> - let full_path = self.chunk_store.relative_path(filename.as_ref());
> + let full_path = self.inner.chunk_store.relative_path(filename.as_ref());
>
> let index = DynamicIndexReader::open(&full_path)?;
>
> @@ -220,11 +328,11 @@ impl DataStore {
> }
>
> pub fn name(&self) -> &str {
> - self.chunk_store.name()
> + self.inner.chunk_store.name()
> }
>
> pub fn base_path(&self) -> PathBuf {
> - self.chunk_store.base_path()
> + self.inner.chunk_store.base_path()
> }
>
> /// Cleanup a backup directory
> @@ -530,7 +638,7 @@ impl DataStore {
> worker.check_abort()?;
> worker.fail_on_shutdown()?;
> let digest = index.index_digest(pos).unwrap();
> - if !self.chunk_store.cond_touch_chunk(digest, false)? {
> + if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
> task_warn!(
> worker,
> "warning: unable to access non-existent chunk {}, required by {:?}",
> @@ -546,7 +654,7 @@ impl DataStore {
> let mut bad_path = PathBuf::new();
> bad_path.push(self.chunk_path(digest).0);
> bad_path.set_extension(bad_ext);
> - self.chunk_store.cond_touch_path(&bad_path, false)?;
> + self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
> }
> }
> }
> @@ -626,24 +734,24 @@ impl DataStore {
> }
>
> pub fn last_gc_status(&self) -> GarbageCollectionStatus {
> - self.last_gc_status.lock().unwrap().clone()
> + self.inner.last_gc_status.lock().unwrap().clone()
> }
>
> pub fn garbage_collection_running(&self) -> bool {
> - !matches!(self.gc_mutex.try_lock(), Ok(_))
> + !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
> }
>
> pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
>
> - if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
> + if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
>
> // avoids that we run GC if an old daemon process has still a
> // running backup writer, which is not save as we have no "oldest
> // writer" information and thus no safe atime cutoff
> - let _exclusive_lock = self.chunk_store.try_exclusive_lock()?;
> + let _exclusive_lock = self.inner.chunk_store.try_exclusive_lock()?;
>
> let phase1_start_time = proxmox_time::epoch_i64();
> - let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
> + let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
>
> let mut gc_status = GarbageCollectionStatus::default();
> gc_status.upid = Some(upid.to_string());
> @@ -653,7 +761,7 @@ impl DataStore {
> self.mark_used_chunks(&mut gc_status, worker)?;
>
> task_log!(worker, "Start GC phase2 (sweep unused chunks)");
> - self.chunk_store.sweep_unused_chunks(
> + self.inner.chunk_store.sweep_unused_chunks(
> oldest_writer,
> phase1_start_time,
> &mut gc_status,
> @@ -730,7 +838,7 @@ impl DataStore {
> let _ = replace_file(path, serialized.as_bytes(), options, false);
> }
>
> - *self.last_gc_status.lock().unwrap() = gc_status;
> + *self.inner.last_gc_status.lock().unwrap() = gc_status;
>
> } else {
> bail!("Start GC failed - (already running/locked)");
> @@ -740,15 +848,15 @@ impl DataStore {
> }
>
> pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
> - self.chunk_store.try_shared_lock()
> + self.inner.chunk_store.try_shared_lock()
> }
>
> pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
> - self.chunk_store.chunk_path(digest)
> + self.inner.chunk_store.chunk_path(digest)
> }
>
> pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
> - self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
> + self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
> }
>
> pub fn insert_chunk(
> @@ -756,7 +864,7 @@ impl DataStore {
> chunk: &DataBlob,
> digest: &[u8; 32],
> ) -> Result<(bool, u64), Error> {
> - self.chunk_store.insert_chunk(chunk, digest)
> + self.inner.chunk_store.insert_chunk(chunk, digest)
> }
>
> pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
> @@ -772,13 +880,13 @@ impl DataStore {
>
>
> pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
> - let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
> + let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
> std::fs::metadata(chunk_path).map_err(Error::from)
> }
>
> pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
>
> - let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
> + let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
>
> proxmox_lang::try_block!({
> let mut file = std::fs::File::open(&chunk_path)?;
> @@ -892,7 +1000,7 @@ impl DataStore {
> }
>
> pub fn verify_new(&self) -> bool {
> - self.verify_new
> + self.inner.verify_new
> }
>
> /// returns a list of chunks sorted by their inode number on disk
> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
> index d50a64a5..159642fd 100644
> --- a/pbs-datastore/src/lib.rs
> +++ b/pbs-datastore/src/lib.rs
> @@ -145,6 +145,9 @@
> // Note: .pcat1 => Proxmox Catalog Format version 1
> pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
>
> +/// Directory path where active operations counters are saved.
> +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
> +
> #[macro_export]
> macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
> () => {
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index 9eb20269..aa60d316 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
>
> proxmox_backup::server::create_run_dir()?;
> proxmox_backup::server::create_state_dir()?;
> + proxmox_backup::server::create_active_operations_dir()?;
> proxmox_backup::server::jobstate::create_jobstate_dir()?;
> proxmox_backup::tape::create_tape_status_dir()?;
> proxmox_backup::tape::create_drive_state_dir()?;
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index deeb3398..ffb26f0e 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -4,7 +4,7 @@
> //! services. We want async IO, so this is built on top of
> //! tokio/hyper.
>
> -use anyhow::Error;
> +use anyhow::{format_err, Error};
> use serde_json::Value;
>
> use proxmox::tools::fs::{create_path, CreateOptions};
> @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
> create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
> Ok(())
> }
> +
> +/// Create active operations dir with correct permission.
> +pub fn create_active_operations_dir() -> Result<(), Error> {
> + let backup_user = pbs_config::backup_user()?;
> + let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
> + let options = CreateOptions::new()
> + .perm(mode)
> + .owner(backup_user.uid)
> + .group(backup_user.gid);
> +
> + create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
> + .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
> + Ok(())
> +}
>
More information about the pbs-devel
mailing list