[pve-devel] [PATCH proxmox-offline-mirror 4/4] mirror: refactor fetch_binary/source_packages

Fabian Grünbichler f.gruenbichler at proxmox.com
Tue Oct 18 11:20:40 CEST 2022


Also pull some of the progress variables out into a struct (`MirrorProgress`).

Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
 src/mirror.rs | 520 ++++++++++++++++++++++++++++----------------------
 1 file changed, 287 insertions(+), 233 deletions(-)

diff --git a/src/mirror.rs b/src/mirror.rs
index 39b7f47..faaaa19 100644
--- a/src/mirror.rs
+++ b/src/mirror.rs
@@ -7,7 +7,7 @@ use std::{
 
 use anyhow::{bail, format_err, Error};
 use flate2::bufread::GzDecoder;
-use globset::{Glob, GlobSetBuilder};
+use globset::{Glob, GlobSet, GlobSetBuilder};
 use nix::libc;
 use proxmox_http::{client::sync::Client, HttpClient, HttpOptions};
 use proxmox_sys::fs::file_get_contents;
@@ -145,7 +145,7 @@ fn fetch_repo_file(
 /// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository.
 ///
 /// Verifies the contained/detached signature and stores all fetched files under `prefix`.
-/// 
+///
 /// Returns the verified raw release file data, or None if the "fetch" part itself fails.
 fn fetch_release(
     config: &ParsedMirrorConfig,
@@ -474,6 +474,259 @@ pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
     Ok(list)
 }
 
+struct MirrorProgress {
+    total: Progress, // release/index fetches, plus package files fetched outside dry-run
+    dry_run: Progress, // package files accounted for (but not fetched) in dry-run mode
+    skip_count: usize, // packages skipped by section or package-name glob filters
+    skip_bytes: usize, // summed size of those skipped packages
+    warnings: Vec<String>, // tolerated fetch failures, printed at the end of the run
+}
+
+fn convert_to_globset(config: &ParsedMirrorConfig) -> Result<Option<GlobSet>, Error> {
+    // no skip patterns configured -> nothing to match package names against
+    let patterns = match &config.skip.skip_packages {
+        Some(patterns) => patterns,
+        None => return Ok(None),
+    };
+    // keep config order: GlobSet::matches() indices are mapped back into skip_packages
+    let mut builder = GlobSetBuilder::new();
+    for pattern in patterns {
+        builder.add(Glob::new(pattern)?);
+    }
+    Ok(Some(builder.build()?))
+}
+
+fn fetch_binary_packages(
+    config: &ParsedMirrorConfig,
+    packages_indices: HashMap<&String, PackagesFile>,
+    dry_run: bool,
+    prefix: &Path,
+    progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+    let skipped_package_globs = convert_to_globset(config)?;
+    // per index: skip filtered packages, fetch (dry-run: only account for) the rest
+    for (basename, references) in packages_indices {
+        let total_files = references.files.len();
+        if total_files == 0 {
+            println!("\n{basename} - no files, skipping.");
+            continue;
+        }
+        println!("\n{basename} - {total_files} total file(s)");
+
+        // per-index counters, merged into `progress` once this index is done
+        let mut fetch_progress = Progress::new();
+        let mut skip_count = 0usize;
+        let mut skip_bytes = 0usize;
+        for package in references.files {
+            if let Some(sections) = &config.skip.skip_sections {
+                if sections.contains(&package.section) {
+                    println!(
+                        "\tskipping {} - {}b (section '{}')",
+                        package.package, package.size, package.section
+                    );
+                    skip_count += 1;
+                    skip_bytes += package.size;
+                    continue;
+                }
+            }
+            if let Some(skipped_package_globs) = &skipped_package_globs {
+                let matches = skipped_package_globs.matches(&package.package);
+                if !matches.is_empty() {
+                    // cannot panic: the globset is only built when skip_packages is set
+                    let globs = config.skip.skip_packages.as_ref().unwrap();
+                    let matches: Vec<&str> = matches.iter().map(|&i| globs[i].as_str()).collect();
+                    println!(
+                        "\tskipping {} - {}b (package glob(s): {})",
+                        package.package,
+                        package.size,
+                        matches.join(", ")
+                    );
+                    skip_count += 1;
+                    skip_bytes += package.size;
+                    continue;
+                }
+            }
+            let url = get_repo_url(&config.repository, &package.file);
+
+            if dry_run {
+                if config.pool.contains(&package.checksums) {
+                    fetch_progress.update(&FetchResult {
+                        data: vec![],
+                        fetched: 0,
+                    });
+                } else {
+                    println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
+                    fetch_progress.update(&FetchResult {
+                        data: vec![],
+                        fetched: package.size,
+                    });
+                }
+            } else {
+                let mut full_path = PathBuf::from(prefix);
+                full_path.push(&package.file);
+
+                match fetch_plain_file(
+                    config,
+                    &url,
+                    &full_path,
+                    package.size,
+                    &package.checksums,
+                    false,
+                    dry_run,
+                ) {
+                    Ok(res) => fetch_progress.update(&res),
+                    Err(err) if config.ignore_errors => {
+                        let msg = format!(
+                            "{}: failed to fetch package '{}' - {}",
+                            basename, package.file, err,
+                        );
+                        eprintln!("{msg}");
+                        progress.warnings.push(msg);
+                    }
+                    Err(err) => return Err(err),
+                }
+            }
+
+            if fetch_progress.file_count() % max(total_files / 100, 1) == 0 {
+                println!("\tProgress: {fetch_progress}");
+            }
+        }
+        println!("\tProgress: {fetch_progress}");
+        if dry_run {
+            progress.dry_run += fetch_progress;
+        } else {
+            progress.total += fetch_progress;
+        }
+        if skip_count > 0 {
+            progress.skip_count += skip_count;
+            progress.skip_bytes += skip_bytes;
+            println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+        }
+    }
+
+    Ok(())
+}
+
+fn fetch_source_packages(
+    config: &ParsedMirrorConfig,
+    source_packages_indices: HashMap<&String, SourcesFile>,
+    dry_run: bool,
+    prefix: &Path,
+    progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+    let skipped_package_globs = convert_to_globset(config)?;
+    // per index: skip filtered source packages, fetch (dry-run: only account for) the rest
+    for (basename, references) in source_packages_indices {
+        let total_source_packages = references.source_packages.len();
+        if total_source_packages == 0 {
+            println!("\n{basename} - no files, skipping.");
+            continue;
+        }
+        println!("\n{basename} - {total_source_packages} total source package(s)");
+        // progress below counts individual files, so pace its output by the file count
+        let total_files: usize = references.source_packages.iter().map(|p| p.files.len()).sum();
+        let mut fetch_progress = Progress::new();
+        let mut skip_count = 0usize;
+        let mut skip_bytes = 0usize;
+        for package in references.source_packages {
+            // skip by section (only packages that declare a section can match)
+            if let Some(sections) = &config.skip.skip_sections {
+                if let Some(section) = package.section.as_ref().filter(|s| sections.contains(*s)) {
+                    println!(
+                        "\tskipping {} - {}b (section '{}')",
+                        package.package,
+                        package.size(),
+                        section,
+                    );
+                    skip_count += 1;
+                    skip_bytes += package.size();
+                    continue;
+                }
+            }
+            // skip by package-name glob
+            if let Some(skipped_package_globs) = &skipped_package_globs {
+                let matches = skipped_package_globs.matches(&package.package);
+                if !matches.is_empty() {
+                    // cannot panic: the globset is only built when skip_packages is set
+                    let globs = config.skip.skip_packages.as_ref().unwrap();
+                    let matches: Vec<&str> = matches.iter().map(|&i| globs[i].as_str()).collect();
+                    println!(
+                        "\tskipping {} - {}b (package glob(s): {})",
+                        package.package,
+                        package.size(),
+                        matches.join(", ")
+                    );
+                    skip_count += 1;
+                    skip_bytes += package.size();
+                    continue;
+                }
+            }
+
+            // fetch (or, in dry-run, account for) every file of this source package
+            for file_reference in package.files.values() {
+                let path = format!("{}/{}", package.directory, file_reference.file);
+                let url = get_repo_url(&config.repository, &path);
+
+                if dry_run {
+                    if config.pool.contains(&file_reference.checksums) {
+                        fetch_progress.update(&FetchResult {
+                            data: vec![],
+                            fetched: 0,
+                        });
+                    } else {
+                        println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
+                        fetch_progress.update(&FetchResult {
+                            data: vec![],
+                            fetched: file_reference.size,
+                        });
+                    }
+                } else {
+                    let mut full_path = PathBuf::from(prefix);
+                    full_path.push(&path);
+
+                    match fetch_plain_file(
+                        config,
+                        &url,
+                        &full_path,
+                        file_reference.size,
+                        &file_reference.checksums,
+                        false,
+                        dry_run,
+                    ) {
+                        Ok(res) => fetch_progress.update(&res),
+                        Err(err) if config.ignore_errors => {
+                            let msg = format!(
+                                "{}: failed to fetch package '{}' - {}",
+                                basename, file_reference.file, err,
+                            );
+                            eprintln!("{msg}");
+                            progress.warnings.push(msg);
+                        }
+                        Err(err) => return Err(err),
+                    }
+                }
+
+                if fetch_progress.file_count() % max(total_files / 100, 1) == 0 {
+                    println!("\tProgress: {fetch_progress}");
+                }
+            }
+        }
+        println!("\tProgress: {fetch_progress}");
+        if dry_run {
+            progress.dry_run += fetch_progress;
+        } else {
+            progress.total += fetch_progress;
+        }
+        if skip_count > 0 {
+            progress.skip_count += skip_count;
+            progress.skip_bytes += skip_bytes;
+            println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+        }
+    }
+
+    Ok(())
+}
+
 /// Create a new snapshot of the remote repository, fetching and storing files as needed.
 ///
 /// Operates in three phases:
@@ -518,8 +771,13 @@ pub fn create_snapshot(
     let prefix = format!("{snapshot}.tmp");
     let prefix = Path::new(&prefix);
 
-    let mut total_progress = Progress::new();
-    let mut warnings = Vec::new();
+    let mut progress = MirrorProgress {
+        warnings: Vec::new(),
+        skip_count: 0,
+        skip_bytes: 0,
+        dry_run: Progress::new(),
+        total: Progress::new(),
+    };
 
     let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> {
         println!("Parsing {name}..");
@@ -534,14 +792,14 @@ pub fn create_snapshot(
     // we want both on-disk for compat reasons, if both are available
     let release = fetch_release(&config, prefix, true, dry_run)?
         .map(|res| {
-            total_progress.update(&res);
+            progress.total.update(&res);
             parse_release(res, "Release")
         })
         .transpose()?;
 
     let in_release = fetch_release(&config, prefix, false, dry_run)?
         .map(|res| {
-            total_progress.update(&res);
+            progress.total.update(&res);
             parse_release(res, "InRelease")
         })
         .transpose()?;
@@ -671,7 +929,7 @@ pub fn create_snapshot(
                             reference.file_type, reference.path
                         );
                         eprintln!("{msg}");
-                        warnings.push(msg);
+                        progress.warnings.push(msg);
                         failed_references.push(reference);
                         continue;
                     }
@@ -723,7 +981,7 @@ pub fn create_snapshot(
         println!("Total dsc size for component: {component_dsc_size}");
         packages_size += component_dsc_size;
 
-        total_progress += fetch_progress;
+        progress.total += fetch_progress;
     }
     println!("Total deb size: {packages_size}");
     if !failed_references.is_empty() {
@@ -733,244 +991,40 @@ pub fn create_snapshot(
         }
     }
 
-    let skipped_package_globs = if let Some(skipped_packages) = &config.skip.skip_packages {
-        let mut globs = GlobSetBuilder::new();
-        for glob in skipped_packages {
-            let glob = Glob::new(glob)?;
-            globs.add(glob);
-        }
-        let globs = globs.build()?;
-        Some(globs)
-    } else {
-        None
-    };
-
     println!("\nFetching packages..");
-    let mut dry_run_progress = Progress::new();
-    let mut total_skipped_count = 0usize;
-    let mut total_skipped_bytes = 0usize;
-    for (basename, references) in packages_indices {
-        let total_files = references.files.len();
-        if total_files == 0 {
-            println!("\n{basename} - no files, skipping.");
-            continue;
-        } else {
-            println!("\n{basename} - {total_files} total file(s)");
-        }
-
-        let mut fetch_progress = Progress::new();
-        let mut skipped_count = 0usize;
-        let mut skipped_bytes = 0usize;
-        for package in references.files {
-            if let Some(ref sections) = &config.skip.skip_sections {
-                if sections.iter().any(|section| package.section == *section) {
-                    println!(
-                        "\tskipping {} - {}b (section '{}')",
-                        package.package, package.size, package.section
-                    );
-                    skipped_count += 1;
-                    skipped_bytes += package.size;
-                    continue;
-                }
-            }
-            if let Some(skipped_package_globs) = &skipped_package_globs {
-                let matches = skipped_package_globs.matches(&package.package);
-                if !matches.is_empty() {
-                    // safety, skipped_package_globs is set based on this
-                    let globs = config.skip.skip_packages.as_ref().unwrap();
-                    let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
-                    println!(
-                        "\tskipping {} - {}b (package glob(s): {})",
-                        package.package,
-                        package.size,
-                        matches.join(", ")
-                    );
-                    skipped_count += 1;
-                    skipped_bytes += package.size;
-                    continue;
-                }
-            }
-            let url = get_repo_url(&config.repository, &package.file);
-
-            if dry_run {
-                if config.pool.contains(&package.checksums) {
-                    fetch_progress.update(&FetchResult {
-                        data: vec![],
-                        fetched: 0,
-                    });
-                } else {
-                    println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
-                    fetch_progress.update(&FetchResult {
-                        data: vec![],
-                        fetched: package.size,
-                    });
-                }
-            } else {
-                let mut full_path = PathBuf::from(prefix);
-                full_path.push(&package.file);
-
-                match fetch_plain_file(
-                    &config,
-                    &url,
-                    &full_path,
-                    package.size,
-                    &package.checksums,
-                    false,
-                    dry_run,
-                ) {
-                    Ok(res) => fetch_progress.update(&res),
-                    Err(err) if config.ignore_errors => {
-                        let msg = format!(
-                            "{}: failed to fetch package '{}' - {}",
-                            basename, package.file, err,
-                        );
-                        eprintln!("{msg}");
-                        warnings.push(msg);
-                    }
-                    Err(err) => return Err(err),
-                }
-            }
-
-            if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
-                println!("\tProgress: {fetch_progress}");
-            }
-        }
-        println!("\tProgress: {fetch_progress}");
-        if dry_run {
-            dry_run_progress += fetch_progress;
-        } else {
-            total_progress += fetch_progress;
-        }
-        if skipped_count > 0 {
-            total_skipped_count += skipped_count;
-            total_skipped_bytes += skipped_bytes;
-            println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
-        }
-    }
 
-    for (basename, references) in source_packages_indices {
-        let total_source_packages = references.source_packages.len();
-        if total_source_packages == 0 {
-            println!("\n{basename} - no files, skipping.");
-            continue;
-        } else {
-            println!("\n{basename} - {total_source_packages} total source package(s)");
-        }
+    fetch_binary_packages(&config, packages_indices, dry_run, prefix, &mut progress)?;
 
-        let mut fetch_progress = Progress::new();
-        let mut skipped_count = 0usize;
-        let mut skipped_bytes = 0usize;
-        for package in references.source_packages {
-            if let Some(ref sections) = &config.skip.skip_sections {
-                if sections
-                    .iter()
-                    .any(|section| package.section.as_ref() == Some(section))
-                {
-                    println!(
-                        "\tskipping {} - {}b (section '{}')",
-                        package.package,
-                        package.size(),
-                        package.section.as_ref().unwrap(),
-                    );
-                    skipped_count += 1;
-                    skipped_bytes += package.size();
-                    continue;
-                }
-            }
-            if let Some(skipped_package_globs) = &skipped_package_globs {
-                let matches = skipped_package_globs.matches(&package.package);
-                if !matches.is_empty() {
-                    // safety, skipped_package_globs is set based on this
-                    let globs = config.skip.skip_packages.as_ref().unwrap();
-                    let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
-                    println!(
-                        "\tskipping {} - {}b (package glob(s): {})",
-                        package.package,
-                        package.size(),
-                        matches.join(", ")
-                    );
-                    skipped_count += 1;
-                    skipped_bytes += package.size();
-                    continue;
-                }
-            }
-
-            for file_reference in package.files.values() {
-                let path = format!("{}/{}", package.directory, file_reference.file);
-                let url = get_repo_url(&config.repository, &path);
-
-                if dry_run {
-                    if config.pool.contains(&file_reference.checksums) {
-                        fetch_progress.update(&FetchResult {
-                            data: vec![],
-                            fetched: 0,
-                        });
-                    } else {
-                        println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
-                        fetch_progress.update(&FetchResult {
-                            data: vec![],
-                            fetched: file_reference.size,
-                        });
-                    }
-                } else {
-                    let mut full_path = PathBuf::from(prefix);
-                    full_path.push(&path);
-
-                    match fetch_plain_file(
-                        &config,
-                        &url,
-                        &full_path,
-                        file_reference.size,
-                        &file_reference.checksums,
-                        false,
-                        dry_run,
-                    ) {
-                        Ok(res) => fetch_progress.update(&res),
-                        Err(err) if config.ignore_errors => {
-                            let msg = format!(
-                                "{}: failed to fetch package '{}' - {}",
-                                basename, file_reference.file, err,
-                            );
-                            eprintln!("{msg}");
-                            warnings.push(msg);
-                        }
-                        Err(err) => return Err(err),
-                    }
-                }
-
-                if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 {
-                    println!("\tProgress: {fetch_progress}");
-                }
-            }
-        }
-        println!("\tProgress: {fetch_progress}");
-        if dry_run {
-            dry_run_progress += fetch_progress;
-        } else {
-            total_progress += fetch_progress;
-        }
-        if skipped_count > 0 {
-            total_skipped_count += skipped_count;
-            total_skipped_bytes += skipped_bytes;
-            println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
-        }
-    }
+    fetch_source_packages(
+        &config,
+        source_packages_indices,
+        dry_run,
+        prefix,
+        &mut progress,
+    )?;
 
     if dry_run {
-        println!("\nDry-run Stats (indices, downloaded but not persisted):\n{total_progress}");
-        println!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}");
+        println!(
+            "\nDry-run Stats (indices, downloaded but not persisted):\n{}",
+            progress.total
+        );
+        println!(
+            "\nDry-run stats (packages, new == missing):\n{}",
+            progress.dry_run
+        );
     } else {
-        println!("\nStats: {total_progress}");
+        println!("\nStats: {}", progress.total);
     }
     if total_count > 0 {
         println!(
-            "Skipped downloading {total_skipped_count} packages totalling {total_skipped_bytes}b"
+            "Skipped downloading {} packages totalling {}b",
+            progress.skip_count, progress.skip_bytes,
         );
     }
 
-    if !warnings.is_empty() {
+    if !progress.warnings.is_empty() {
         eprintln!("Warnings:");
-        for msg in warnings {
+        for msg in progress.warnings {
             eprintln!("- {msg}");
         }
     }
-- 
2.30.2






More information about the pve-devel mailing list