[pbs-devel] [PATCH proxmox-backup v5 3/8] pbs-datastore: add active operations tracking

Dominik Csapak d.csapak at proxmox.com
Mon Jan 31 15:44:42 CET 2022


looks mostly fine (one comment inline)

but did you intentionally ignore thomas comment about
the pid reusing and saving/checking of the starttime?

On 1/24/22 14:44, Hannes Laimer wrote:
> Saves the currently active read/write operation counts in a file. The
> file is updated whenever a reference returned by lookup_datastore is
> dropped and whenever a reference is returned by lookup_datastore. The
> files are locked before every access, there is one file per datastore.
> 
> Signed-off-by: Hannes Laimer <h.laimer at proxmox.com>
> ---
> FIXUP: fix logic, previous version of this patch missed updates for
> PIDs that were not present in non empty list
> 
>   pbs-api-types/src/maintenance.rs   |   1 +
>   pbs-datastore/Cargo.toml           |   1 +
>   pbs-datastore/src/datastore.rs     | 100 ++++++++++++++++++++---------
>   pbs-datastore/src/lib.rs           |   4 ++
>   pbs-datastore/src/task_tracking.rs |  72 +++++++++++++++++++++
>   src/bin/proxmox-backup-api.rs      |   1 +
>   src/server/mod.rs                  |  16 ++++-
>   7 files changed, 163 insertions(+), 32 deletions(-)
>   create mode 100644 pbs-datastore/src/task_tracking.rs
> 
> diff --git a/pbs-api-types/src/maintenance.rs b/pbs-api-types/src/maintenance.rs
> index 4d3ccb3b..1b5753d2 100644
> --- a/pbs-api-types/src/maintenance.rs
> +++ b/pbs-api-types/src/maintenance.rs
> @@ -25,6 +25,7 @@ pub enum MaintenanceType {
>       Offline(String),
>   }
>   
> +#[derive(Copy ,Clone)]
>   /// Operation requirments, used when checking for maintenance mode.
>   pub enum Operation {
>       Read,
> diff --git a/pbs-datastore/Cargo.toml b/pbs-datastore/Cargo.toml
> index d8cefa00..0c703d44 100644
> --- a/pbs-datastore/Cargo.toml
> +++ b/pbs-datastore/Cargo.toml
> @@ -35,5 +35,6 @@ proxmox-uuid = "1"
>   proxmox-sys = "0.2"
>   
>   pbs-api-types = { path = "../pbs-api-types" }
> +pbs-buildcfg = { path = "../pbs-buildcfg" }
>   pbs-tools = { path = "../pbs-tools" }
>   pbs-config = { path = "../pbs-config" }
> diff --git a/pbs-datastore/src/datastore.rs b/pbs-datastore/src/datastore.rs
> index c65d4574..774dea80 100644
> --- a/pbs-datastore/src/datastore.rs
> +++ b/pbs-datastore/src/datastore.rs
> @@ -9,11 +9,12 @@ use std::time::Duration;
>   use anyhow::{bail, format_err, Error};
>   use lazy_static::lazy_static;
>   
> -use proxmox_sys::fs::{replace_file, file_read_optional_string, CreateOptions};
> +use proxmox_sys::fs::{replace_file, file_read_optional_string,
> +    CreateOptions, lock_dir_noblock, DirLockGuard,
> +};
>   use proxmox_sys::process_locker::ProcessLockSharedGuard;
>   use proxmox_sys::WorkerTaskContext;
>   use proxmox_sys::{task_log, task_warn};
> -use proxmox_sys::fs::{lock_dir_noblock, DirLockGuard};
>   
>   use pbs_api_types::{
>       UPID, DataStoreConfig, Authid, MaintenanceType, Operation, GarbageCollectionStatus, HumanByte
> @@ -31,9 +32,10 @@ use crate::manifest::{
>       ArchiveType, BackupManifest,
>       archive_type,
>   };
> +use crate::task_tracking::update_active_operations;
>   
>   lazy_static! {
> -    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStore>>> = Mutex::new(HashMap::new());
> +    static ref DATASTORE_MAP: Mutex<HashMap<String, Arc<DataStoreImpl>>> = Mutex::new(HashMap::new());
>   }
>   
>   /// checks if auth_id is owner, or, if owner is a token, if
> @@ -54,13 +56,42 @@ pub fn check_backup_owner(
>   ///
>   /// A Datastore can store severals backups, and provides the
>   /// management interface for backup.
> -pub struct DataStore {
> +pub struct DataStoreImpl {
>       chunk_store: Arc<ChunkStore>,
>       gc_mutex: Mutex<()>,
>       last_gc_status: Mutex<GarbageCollectionStatus>,
>       verify_new: bool,
>   }
>   
> +pub struct DataStore {
> +    inner: Arc<DataStoreImpl>,
> +    operation: Option<Operation>,
> +}
> +
> +impl Clone for DataStore {
> +    fn clone(&self) -> Self {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +        DataStore {
> +            inner: self.inner.clone(),
> +            operation: self.operation.clone(),
> +        }
> +    }
> +}
> +
> +impl Drop for DataStore {
> +    fn drop(&mut self) {
> +        if let Some(operation) = self.operation.clone() {
> +            if let Err(e) = update_active_operations(self.name(), operation, -1) {
> +                eprintln!("could not update active operations - {}", e);
> +            }
> +        }
> +    }
> +}
> +
>   impl DataStore {
>       pub fn lookup_datastore(
>           name: &str,
> @@ -75,6 +106,7 @@ impl DataStore {
>               | (Some(MaintenanceType::Offline(message)), Some(_)) => {
>                   bail!("Datastore '{}' is in maintenance mode: {}", name, message);
>               },
> +            (_, Some(operation)) => update_active_operations(name, operation, 1)?,
>               _ => {}
>           }
>   
> @@ -85,7 +117,10 @@ impl DataStore {
>               if datastore.chunk_store.base() == path &&
>                   datastore.verify_new == config.verify_new.unwrap_or(false)
>               {
> -                return Ok(datastore.clone());
> +                return Ok(Arc::new(Self {
> +                    inner: datastore.clone(),
> +                    operation,
> +                }))
>               }
>           }
>   
> @@ -94,7 +129,10 @@ impl DataStore {
>           let datastore = Arc::new(datastore);
>           map.insert(name.to_string(), datastore.clone());
>   
> -        Ok(datastore)
> +        Ok(Arc::new(Self {
> +            inner: datastore,
> +            operation,
> +        }))
>       }
>   
>       /// removes all datastores that are not configured anymore
> @@ -109,7 +147,7 @@ impl DataStore {
>           Ok(())
>       }
>   
> -    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<Self, Error> {
> +    fn open_with_path(store_name: &str, path: &Path, config: DataStoreConfig) -> Result<DataStoreImpl, Error> {
>           let chunk_store = ChunkStore::open(store_name, path)?;
>   
>           let mut gc_status_path = chunk_store.base_path();
> @@ -127,7 +165,7 @@ impl DataStore {
>               GarbageCollectionStatus::default()
>           };
>   
> -        Ok(Self {
> +        Ok(DataStoreImpl {
>               chunk_store: Arc::new(chunk_store),
>               gc_mutex: Mutex::new(()),
>               last_gc_status: Mutex::new(gc_status),
> @@ -141,19 +179,19 @@ impl DataStore {
>           impl Iterator<Item = (Result<proxmox_sys::fs::ReadDirEntry, Error>, usize, bool)>,
>           Error
>       > {
> -        self.chunk_store.get_chunk_iterator()
> +        self.inner.chunk_store.get_chunk_iterator()
>       }
>   
>       pub fn create_fixed_writer<P: AsRef<Path>>(&self, filename: P, size: usize, chunk_size: usize) -> Result<FixedIndexWriter, Error> {
>   
> -        let index = FixedIndexWriter::create(self.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
> +        let index = FixedIndexWriter::create(self.inner.chunk_store.clone(), filename.as_ref(), size, chunk_size)?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_fixed_reader<P: AsRef<Path>>(&self, filename: P) -> Result<FixedIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = FixedIndexReader::open(&full_path)?;
>   
> @@ -165,14 +203,14 @@ impl DataStore {
>       ) -> Result<DynamicIndexWriter, Error> {
>   
>           let index = DynamicIndexWriter::create(
> -            self.chunk_store.clone(), filename.as_ref())?;
> +            self.inner.chunk_store.clone(), filename.as_ref())?;
>   
>           Ok(index)
>       }
>   
>       pub fn open_dynamic_reader<P: AsRef<Path>>(&self, filename: P) -> Result<DynamicIndexReader, Error> {
>   
> -        let full_path =  self.chunk_store.relative_path(filename.as_ref());
> +        let full_path =  self.inner.chunk_store.relative_path(filename.as_ref());
>   
>           let index = DynamicIndexReader::open(&full_path)?;
>   
> @@ -222,11 +260,11 @@ impl DataStore {
>       }
>   
>       pub fn name(&self) -> &str {
> -        self.chunk_store.name()
> +        self.inner.chunk_store.name()
>       }
>   
>       pub fn base_path(&self) -> PathBuf {
> -        self.chunk_store.base_path()
> +        self.inner.chunk_store.base_path()
>       }
>   
>       /// Cleanup a backup directory
> @@ -529,7 +567,7 @@ impl DataStore {
>               worker.check_abort()?;
>               worker.fail_on_shutdown()?;
>               let digest = index.index_digest(pos).unwrap();
> -            if !self.chunk_store.cond_touch_chunk(digest, false)? {
> +            if !self.inner.chunk_store.cond_touch_chunk(digest, false)? {
>                   task_warn!(
>                       worker,
>                       "warning: unable to access non-existent chunk {}, required by {:?}",
> @@ -545,7 +583,7 @@ impl DataStore {
>                       let mut bad_path = PathBuf::new();
>                       bad_path.push(self.chunk_path(digest).0);
>                       bad_path.set_extension(bad_ext);
> -                    self.chunk_store.cond_touch_path(&bad_path, false)?;
> +                    self.inner.chunk_store.cond_touch_path(&bad_path, false)?;
>                   }
>               }
>           }
> @@ -625,24 +663,24 @@ impl DataStore {
>       }
>   
>       pub fn last_gc_status(&self) -> GarbageCollectionStatus {
> -        self.last_gc_status.lock().unwrap().clone()
> +        self.inner.last_gc_status.lock().unwrap().clone()
>       }
>   
>       pub fn garbage_collection_running(&self) -> bool {
> -        !matches!(self.gc_mutex.try_lock(), Ok(_))
> +        !matches!(self.inner.gc_mutex.try_lock(), Ok(_))
>       }
>   
>       pub fn garbage_collection(&self, worker: &dyn WorkerTaskContext, upid: &UPID) -> Result<(), Error> {
>   
> -        if let Ok(ref mut _mutex) = self.gc_mutex.try_lock() {
> +        if let Ok(ref mut _mutex) = self.inner.gc_mutex.try_lock() {
>   
>               // avoids that we run GC if an old daemon process has still a
>               // running backup writer, which is not save as we have no "oldest
>               // writer" information and thus no safe atime cutoff
> -            let _exclusive_lock =  self.chunk_store.try_exclusive_lock()?;
> +            let _exclusive_lock =  self.inner.chunk_store.try_exclusive_lock()?;
>   
>               let phase1_start_time = proxmox_time::epoch_i64();
> -            let oldest_writer = self.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
> +            let oldest_writer = self.inner.chunk_store.oldest_writer().unwrap_or(phase1_start_time);
>   
>               let mut gc_status = GarbageCollectionStatus::default();
>               gc_status.upid = Some(upid.to_string());
> @@ -652,7 +690,7 @@ impl DataStore {
>               self.mark_used_chunks(&mut gc_status, worker)?;
>   
>               task_log!(worker, "Start GC phase2 (sweep unused chunks)");
> -            self.chunk_store.sweep_unused_chunks(
> +            self.inner.chunk_store.sweep_unused_chunks(
>                   oldest_writer,
>                   phase1_start_time,
>                   &mut gc_status,
> @@ -729,7 +767,7 @@ impl DataStore {
>                   let _ = replace_file(path, serialized.as_bytes(), options, false);
>               }
>   
> -            *self.last_gc_status.lock().unwrap() = gc_status;
> +            *self.inner.last_gc_status.lock().unwrap() = gc_status;
>   
>           } else {
>               bail!("Start GC failed - (already running/locked)");
> @@ -739,15 +777,15 @@ impl DataStore {
>       }
>   
>       pub fn try_shared_chunk_store_lock(&self) -> Result<ProcessLockSharedGuard, Error> {
> -        self.chunk_store.try_shared_lock()
> +        self.inner.chunk_store.try_shared_lock()
>       }
>   
>       pub fn chunk_path(&self, digest:&[u8; 32]) -> (PathBuf, String) {
> -        self.chunk_store.chunk_path(digest)
> +        self.inner.chunk_store.chunk_path(digest)
>       }
>   
>       pub fn cond_touch_chunk(&self, digest: &[u8; 32], fail_if_not_exist: bool) -> Result<bool, Error> {
> -        self.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
> +        self.inner.chunk_store.cond_touch_chunk(digest, fail_if_not_exist)
>       }
>   
>       pub fn insert_chunk(
> @@ -755,7 +793,7 @@ impl DataStore {
>           chunk: &DataBlob,
>           digest: &[u8; 32],
>       ) -> Result<(bool, u64), Error> {
> -        self.chunk_store.insert_chunk(chunk, digest)
> +        self.inner.chunk_store.insert_chunk(chunk, digest)
>       }
>   
>       pub fn load_blob(&self, backup_dir: &BackupDir, filename: &str) -> Result<DataBlob, Error> {
> @@ -771,13 +809,13 @@ impl DataStore {
>   
>   
>       pub fn stat_chunk(&self, digest: &[u8; 32]) -> Result<std::fs::Metadata, Error> {
> -        let (chunk_path, _digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, _digest_str) = self.inner.chunk_store.chunk_path(digest);
>           std::fs::metadata(chunk_path).map_err(Error::from)
>       }
>   
>       pub fn load_chunk(&self, digest: &[u8; 32]) -> Result<DataBlob, Error> {
>   
> -        let (chunk_path, digest_str) = self.chunk_store.chunk_path(digest);
> +        let (chunk_path, digest_str) = self.inner.chunk_store.chunk_path(digest);
>   
>           proxmox_lang::try_block!({
>               let mut file = std::fs::File::open(&chunk_path)?;
> @@ -891,7 +929,7 @@ impl DataStore {
>       }
>   
>       pub fn verify_new(&self) -> bool {
> -        self.verify_new
> +        self.inner.verify_new
>       }
>   
>       /// returns a list of chunks sorted by their inode number on disk
> diff --git a/pbs-datastore/src/lib.rs b/pbs-datastore/src/lib.rs
> index d50a64a5..131040c6 100644
> --- a/pbs-datastore/src/lib.rs
> +++ b/pbs-datastore/src/lib.rs
> @@ -145,6 +145,9 @@
>   // Note: .pcat1 => Proxmox Catalog Format version 1
>   pub const CATALOG_NAME: &str = "catalog.pcat1.didx";
>   
> +/// Directory path where active operations counters are saved.
> +pub const ACTIVE_OPERATIONS_DIR: &str = concat!(pbs_buildcfg::PROXMOX_BACKUP_RUN_DIR_M!(), "/active-operations");
> +
>   #[macro_export]
>   macro_rules! PROXMOX_BACKUP_PROTOCOL_ID_V1 {
>       () => {
> @@ -179,6 +182,7 @@ pub mod paperkey;
>   pub mod prune;
>   pub mod read_chunk;
>   pub mod store_progress;
> +pub mod task_tracking;
>   
>   pub mod dynamic_index;
>   pub mod fixed_index;
> diff --git a/pbs-datastore/src/task_tracking.rs b/pbs-datastore/src/task_tracking.rs
> new file mode 100644
> index 00000000..608a3096
> --- /dev/null
> +++ b/pbs-datastore/src/task_tracking.rs
> @@ -0,0 +1,72 @@
> +use std::path::PathBuf;
> +use anyhow::Error;
> +use libc::pid_t;
> +
> +use proxmox_sys::fs::{CreateOptions, file_read_optional_string, open_file_locked, replace_file};
> +use proxmox_sys::linux::procfs;
> +use pbs_api_types::Operation;
> +use serde::{Deserialize, Serialize};
> +
> +#[derive(Deserialize, Serialize, Clone)]
> +struct TaskOperations {
> +    pid: u32,
> +    reading_operations: i64,
> +    writing_operations: i64,
> +}
> +
> +pub fn update_active_operations(name: &str, operation: Operation, count: i64) -> Result<(), Error> {
> +    let path = PathBuf::from(format!("{}/{}", crate::ACTIVE_OPERATIONS_DIR, name));
> +    let lock_path = PathBuf::from(format!("{}/{}.lock", crate::ACTIVE_OPERATIONS_DIR, name));
> +
> +    let user = pbs_config::backup_user()?;
> +    let options = CreateOptions::new()
> +        .group(user.gid)
> +        .owner(user.uid)
> +        .perm(nix::sys::stat::Mode::from_bits_truncate(0o660));
> +
> +    let timeout = std::time::Duration::new(10, 0);
> +    open_file_locked(&lock_path, timeout, true, options.clone())?;
> +
> +    let pid = std::process::id();
> +    let mut updated = false;
> +
> +    let mut updated_tasks = match file_read_optional_string(&path)? {
> +        Some(data) => {
> +            let mut active_tasks: Vec<TaskOperations> = serde_json::from_str::<Vec<TaskOperations>>(&data)?
> +                .iter()
> +                .filter(|task|
> +                    procfs::check_process_running(task.pid as pid_t).is_some())
> +                .cloned()
> +                .collect();
> +
> +            active_tasks.iter_mut()
> +                .for_each(|task| {
> +                    if task.pid == pid {
> +                        updated = true;
> +                        match operation {
> +                            Operation::Read => task.reading_operations += count,
> +                            Operation::Write => task.writing_operations += count,
> +                        }
> +                    }
> +                });
> +            active_tasks

would it not be possible to have that in the filter function too?
or by using maybe 'filter_map' ?

alternatively you could have a simple for loop which pushes
only the correct results into a vec (by using 'into_iter()')

in any case, i think we can avoid the double iteration here

> +        }
> +        None => Vec::new(),
> +    };
> +
> +    if !updated {
> +        updated_tasks.push(match operation {
> +            Operation::Read => TaskOperations {
> +                pid,
> +                reading_operations: 1,
> +                writing_operations: 0,
> +            },
> +            Operation::Write => TaskOperations {
> +                pid,
> +                reading_operations: 0,
> +                writing_operations: 1,
> +            }
> +        })
> +    }
> +    replace_file(&path, serde_json::to_string(&updated_tasks)?.as_bytes(), options, false)
> +}
> \ No newline at end of file
> diff --git a/src/bin/proxmox-backup-api.rs b/src/bin/proxmox-backup-api.rs
> index e6fc5f23..445994dc 100644
> --- a/src/bin/proxmox-backup-api.rs
> +++ b/src/bin/proxmox-backup-api.rs
> @@ -73,6 +73,7 @@ async fn run() -> Result<(), Error> {
>   
>       proxmox_backup::server::create_run_dir()?;
>       proxmox_backup::server::create_state_dir()?;
> +    proxmox_backup::server::create_active_operations_dir()?;
>       proxmox_backup::server::jobstate::create_jobstate_dir()?;
>       proxmox_backup::tape::create_tape_status_dir()?;
>       proxmox_backup::tape::create_drive_state_dir()?;
> diff --git a/src/server/mod.rs b/src/server/mod.rs
> index 2b54cdf0..5a9884e9 100644
> --- a/src/server/mod.rs
> +++ b/src/server/mod.rs
> @@ -4,7 +4,7 @@
>   //! services. We want async IO, so this is built on top of
>   //! tokio/hyper.
>   
> -use anyhow::Error;
> +use anyhow::{format_err, Error};
>   use serde_json::Value;
>   
>   use proxmox_sys::fs::{create_path, CreateOptions};
> @@ -71,3 +71,17 @@ pub fn create_state_dir() -> Result<(), Error> {
>       create_path(pbs_buildcfg::PROXMOX_BACKUP_STATE_DIR_M!(), None, Some(opts))?;
>       Ok(())
>   }
> +
> +/// Create active operations dir with correct permission.
> +pub fn create_active_operations_dir() -> Result<(), Error> {
> +    let backup_user = pbs_config::backup_user()?;
> +    let mode = nix::sys::stat::Mode::from_bits_truncate(0o0750);
> +    let options = CreateOptions::new()
> +        .perm(mode)
> +        .owner(backup_user.uid)
> +        .group(backup_user.gid);
> +
> +    create_path(pbs_datastore::ACTIVE_OPERATIONS_DIR, None, Some(options))
> +        .map_err(|err: Error| format_err!("unable to create active operations dir - {}", err))?;
> +    Ok(())
> +}






More information about the pbs-devel mailing list