[pve-devel] [PATCH proxmox-offline-mirror 4/4] mirror: refactor fetch_binary/source_packages
Fabian Grünbichler
f.gruenbichler at proxmox.com
Tue Oct 18 11:20:40 CEST 2022
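Refactor the fetching of binary and source packages into the separate helper functions `fetch_binary_packages` and `fetch_source_packages`, and pull out some of the progress variables into a struct (`MirrorProgress`).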
Signed-off-by: Fabian Grünbichler <f.gruenbichler at proxmox.com>
---
src/mirror.rs | 520 ++++++++++++++++++++++++++++----------------------
1 file changed, 287 insertions(+), 233 deletions(-)
diff --git a/src/mirror.rs b/src/mirror.rs
index 39b7f47..faaaa19 100644
--- a/src/mirror.rs
+++ b/src/mirror.rs
@@ -7,7 +7,7 @@ use std::{
use anyhow::{bail, format_err, Error};
use flate2::bufread::GzDecoder;
-use globset::{Glob, GlobSetBuilder};
+use globset::{Glob, GlobSet, GlobSetBuilder};
use nix::libc;
use proxmox_http::{client::sync::Client, HttpClient, HttpOptions};
use proxmox_sys::fs::file_get_contents;
@@ -145,7 +145,7 @@ fn fetch_repo_file(
/// Helper to fetch InRelease (`detached` == false) or Release/Release.gpg (`detached` == true) files from repository.
///
/// Verifies the contained/detached signature and stores all fetched files under `prefix`.
-///
+///
/// Returns the verified raw release file data, or None if the "fetch" part itself fails.
fn fetch_release(
config: &ParsedMirrorConfig,
@@ -474,6 +474,259 @@ pub fn list_snapshots(config: &MirrorConfig) -> Result<Vec<Snapshot>, Error> {
Ok(list)
}
+struct MirrorProgress {
+ warnings: Vec<String>,
+ dry_run: Progress,
+ total: Progress,
+ skip_count: usize,
+ skip_bytes: usize,
+}
+
+fn convert_to_globset(config: &ParsedMirrorConfig) -> Result<Option<GlobSet>, Error> {
+ Ok(if let Some(skipped_packages) = &config.skip.skip_packages {
+ let mut globs = GlobSetBuilder::new();
+ for glob in skipped_packages {
+ let glob = Glob::new(glob)?;
+ globs.add(glob);
+ }
+ let globs = globs.build()?;
+ Some(globs)
+ } else {
+ None
+ })
+}
+
+fn fetch_binary_packages(
+ config: &ParsedMirrorConfig,
+ packages_indices: HashMap<&String, PackagesFile>,
+ dry_run: bool,
+ prefix: &Path,
+ progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+ let skipped_package_globs = convert_to_globset(config)?;
+
+ for (basename, references) in packages_indices {
+ let total_files = references.files.len();
+ if total_files == 0 {
+ println!("\n{basename} - no files, skipping.");
+ continue;
+ } else {
+ println!("\n{basename} - {total_files} total file(s)");
+ }
+
+ let mut fetch_progress = Progress::new();
+ let mut skip_count = 0usize;
+ let mut skip_bytes = 0usize;
+ for package in references.files {
+ if let Some(ref sections) = &config.skip.skip_sections {
+ if sections.iter().any(|section| package.section == *section) {
+ println!(
+ "\tskipping {} - {}b (section '{}')",
+ package.package, package.size, package.section
+ );
+ skip_count += 1;
+ skip_bytes += package.size;
+ continue;
+ }
+ }
+ if let Some(skipped_package_globs) = &skipped_package_globs {
+ let matches = skipped_package_globs.matches(&package.package);
+ if !matches.is_empty() {
+ // safety, skipped_package_globs is set based on this
+ let globs = config.skip.skip_packages.as_ref().unwrap();
+ let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
+ println!(
+ "\tskipping {} - {}b (package glob(s): {})",
+ package.package,
+ package.size,
+ matches.join(", ")
+ );
+ skip_count += 1;
+ skip_bytes += package.size;
+ continue;
+ }
+ }
+ let url = get_repo_url(&config.repository, &package.file);
+
+ if dry_run {
+ if config.pool.contains(&package.checksums) {
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: 0,
+ });
+ } else {
+ println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: package.size,
+ });
+ }
+ } else {
+ let mut full_path = PathBuf::from(prefix);
+ full_path.push(&package.file);
+
+ match fetch_plain_file(
+ config,
+ &url,
+ &full_path,
+ package.size,
+ &package.checksums,
+ false,
+ dry_run,
+ ) {
+ Ok(res) => fetch_progress.update(&res),
+ Err(err) if config.ignore_errors => {
+ let msg = format!(
+ "{}: failed to fetch package '{}' - {}",
+ basename, package.file, err,
+ );
+ eprintln!("{msg}");
+ progress.warnings.push(msg);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+
+ if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
+ println!("\tProgress: {fetch_progress}");
+ }
+ }
+ println!("\tProgress: {fetch_progress}");
+ if dry_run {
+ progress.dry_run += fetch_progress;
+ } else {
+ progress.total += fetch_progress;
+ }
+ if skip_count > 0 {
+ progress.skip_count += skip_count;
+ progress.skip_bytes += skip_bytes;
+ println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+ }
+ }
+
+ Ok(())
+}
+
+fn fetch_source_packages(
+ config: &ParsedMirrorConfig,
+ source_packages_indices: HashMap<&String, SourcesFile>,
+ dry_run: bool,
+ prefix: &Path,
+ progress: &mut MirrorProgress,
+) -> Result<(), Error> {
+ let skipped_package_globs = convert_to_globset(config)?;
+
+ for (basename, references) in source_packages_indices {
+ let total_source_packages = references.source_packages.len();
+ if total_source_packages == 0 {
+ println!("\n{basename} - no files, skipping.");
+ continue;
+ } else {
+ println!("\n{basename} - {total_source_packages} total source package(s)");
+ }
+
+ let mut fetch_progress = Progress::new();
+ let mut skip_count = 0usize;
+ let mut skip_bytes = 0usize;
+ for package in references.source_packages {
+ if let Some(ref sections) = &config.skip.skip_sections {
+ if sections
+ .iter()
+ .any(|section| package.section.as_ref() == Some(section))
+ {
+ println!(
+ "\tskipping {} - {}b (section '{}')",
+ package.package,
+ package.size(),
+ package.section.as_ref().unwrap(),
+ );
+ skip_count += 1;
+ skip_bytes += package.size();
+ continue;
+ }
+ }
+ if let Some(skipped_package_globs) = &skipped_package_globs {
+ let matches = skipped_package_globs.matches(&package.package);
+ if !matches.is_empty() {
+ // safety, skipped_package_globs is set based on this
+ let globs = config.skip.skip_packages.as_ref().unwrap();
+ let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
+ println!(
+ "\tskipping {} - {}b (package glob(s): {})",
+ package.package,
+ package.size(),
+ matches.join(", ")
+ );
+ skip_count += 1;
+ skip_bytes += package.size();
+ continue;
+ }
+ }
+
+ for file_reference in package.files.values() {
+ let path = format!("{}/{}", package.directory, file_reference.file);
+ let url = get_repo_url(&config.repository, &path);
+
+ if dry_run {
+ if config.pool.contains(&file_reference.checksums) {
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: 0,
+ });
+ } else {
+ println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
+ fetch_progress.update(&FetchResult {
+ data: vec![],
+ fetched: file_reference.size,
+ });
+ }
+ } else {
+ let mut full_path = PathBuf::from(prefix);
+ full_path.push(&path);
+
+ match fetch_plain_file(
+ config,
+ &url,
+ &full_path,
+ file_reference.size,
+ &file_reference.checksums,
+ false,
+ dry_run,
+ ) {
+ Ok(res) => fetch_progress.update(&res),
+ Err(err) if config.ignore_errors => {
+ let msg = format!(
+ "{}: failed to fetch package '{}' - {}",
+ basename, file_reference.file, err,
+ );
+ eprintln!("{msg}");
+ progress.warnings.push(msg);
+ }
+ Err(err) => return Err(err),
+ }
+ }
+
+ if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 {
+ println!("\tProgress: {fetch_progress}");
+ }
+ }
+ }
+ println!("\tProgress: {fetch_progress}");
+ if dry_run {
+ progress.dry_run += fetch_progress;
+ } else {
+ progress.total += fetch_progress;
+ }
+ if skip_count > 0 {
+ progress.skip_count += skip_count;
+ progress.skip_bytes += skip_bytes;
+ println!("Skipped downloading {skip_count} packages totalling {skip_bytes}b");
+ }
+ }
+
+ Ok(())
+}
+
/// Create a new snapshot of the remote repository, fetching and storing files as needed.
///
/// Operates in three phases:
@@ -518,8 +771,13 @@ pub fn create_snapshot(
let prefix = format!("{snapshot}.tmp");
let prefix = Path::new(&prefix);
- let mut total_progress = Progress::new();
- let mut warnings = Vec::new();
+ let mut progress = MirrorProgress {
+ warnings: Vec::new(),
+ skip_count: 0,
+ skip_bytes: 0,
+ dry_run: Progress::new(),
+ total: Progress::new(),
+ };
let parse_release = |res: FetchResult, name: &str| -> Result<ReleaseFile, Error> {
println!("Parsing {name}..");
@@ -534,14 +792,14 @@ pub fn create_snapshot(
// we want both on-disk for compat reasons, if both are available
let release = fetch_release(&config, prefix, true, dry_run)?
.map(|res| {
- total_progress.update(&res);
+ progress.total.update(&res);
parse_release(res, "Release")
})
.transpose()?;
let in_release = fetch_release(&config, prefix, false, dry_run)?
.map(|res| {
- total_progress.update(&res);
+ progress.total.update(&res);
parse_release(res, "InRelease")
})
.transpose()?;
@@ -671,7 +929,7 @@ pub fn create_snapshot(
reference.file_type, reference.path
);
eprintln!("{msg}");
- warnings.push(msg);
+ progress.warnings.push(msg);
failed_references.push(reference);
continue;
}
@@ -723,7 +981,7 @@ pub fn create_snapshot(
println!("Total dsc size for component: {component_dsc_size}");
packages_size += component_dsc_size;
- total_progress += fetch_progress;
+ progress.total += fetch_progress;
}
println!("Total deb size: {packages_size}");
if !failed_references.is_empty() {
@@ -733,244 +991,40 @@ pub fn create_snapshot(
}
}
- let skipped_package_globs = if let Some(skipped_packages) = &config.skip.skip_packages {
- let mut globs = GlobSetBuilder::new();
- for glob in skipped_packages {
- let glob = Glob::new(glob)?;
- globs.add(glob);
- }
- let globs = globs.build()?;
- Some(globs)
- } else {
- None
- };
-
println!("\nFetching packages..");
- let mut dry_run_progress = Progress::new();
- let mut total_skipped_count = 0usize;
- let mut total_skipped_bytes = 0usize;
- for (basename, references) in packages_indices {
- let total_files = references.files.len();
- if total_files == 0 {
- println!("\n{basename} - no files, skipping.");
- continue;
- } else {
- println!("\n{basename} - {total_files} total file(s)");
- }
-
- let mut fetch_progress = Progress::new();
- let mut skipped_count = 0usize;
- let mut skipped_bytes = 0usize;
- for package in references.files {
- if let Some(ref sections) = &config.skip.skip_sections {
- if sections.iter().any(|section| package.section == *section) {
- println!(
- "\tskipping {} - {}b (section '{}')",
- package.package, package.size, package.section
- );
- skipped_count += 1;
- skipped_bytes += package.size;
- continue;
- }
- }
- if let Some(skipped_package_globs) = &skipped_package_globs {
- let matches = skipped_package_globs.matches(&package.package);
- if !matches.is_empty() {
- // safety, skipped_package_globs is set based on this
- let globs = config.skip.skip_packages.as_ref().unwrap();
- let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
- println!(
- "\tskipping {} - {}b (package glob(s): {})",
- package.package,
- package.size,
- matches.join(", ")
- );
- skipped_count += 1;
- skipped_bytes += package.size;
- continue;
- }
- }
- let url = get_repo_url(&config.repository, &package.file);
-
- if dry_run {
- if config.pool.contains(&package.checksums) {
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: 0,
- });
- } else {
- println!("\t(dry-run) GET missing '{url}' ({}b)", package.size);
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: package.size,
- });
- }
- } else {
- let mut full_path = PathBuf::from(prefix);
- full_path.push(&package.file);
-
- match fetch_plain_file(
- &config,
- &url,
- &full_path,
- package.size,
- &package.checksums,
- false,
- dry_run,
- ) {
- Ok(res) => fetch_progress.update(&res),
- Err(err) if config.ignore_errors => {
- let msg = format!(
- "{}: failed to fetch package '{}' - {}",
- basename, package.file, err,
- );
- eprintln!("{msg}");
- warnings.push(msg);
- }
- Err(err) => return Err(err),
- }
- }
-
- if fetch_progress.file_count() % (max(total_files / 100, 1)) == 0 {
- println!("\tProgress: {fetch_progress}");
- }
- }
- println!("\tProgress: {fetch_progress}");
- if dry_run {
- dry_run_progress += fetch_progress;
- } else {
- total_progress += fetch_progress;
- }
- if skipped_count > 0 {
- total_skipped_count += skipped_count;
- total_skipped_bytes += skipped_bytes;
- println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
- }
- }
- for (basename, references) in source_packages_indices {
- let total_source_packages = references.source_packages.len();
- if total_source_packages == 0 {
- println!("\n{basename} - no files, skipping.");
- continue;
- } else {
- println!("\n{basename} - {total_source_packages} total source package(s)");
- }
+ fetch_binary_packages(&config, packages_indices, dry_run, prefix, &mut progress)?;
- let mut fetch_progress = Progress::new();
- let mut skipped_count = 0usize;
- let mut skipped_bytes = 0usize;
- for package in references.source_packages {
- if let Some(ref sections) = &config.skip.skip_sections {
- if sections
- .iter()
- .any(|section| package.section.as_ref() == Some(section))
- {
- println!(
- "\tskipping {} - {}b (section '{}')",
- package.package,
- package.size(),
- package.section.as_ref().unwrap(),
- );
- skipped_count += 1;
- skipped_bytes += package.size();
- continue;
- }
- }
- if let Some(skipped_package_globs) = &skipped_package_globs {
- let matches = skipped_package_globs.matches(&package.package);
- if !matches.is_empty() {
- // safety, skipped_package_globs is set based on this
- let globs = config.skip.skip_packages.as_ref().unwrap();
- let matches: Vec<String> = matches.iter().map(|i| globs[*i].clone()).collect();
- println!(
- "\tskipping {} - {}b (package glob(s): {})",
- package.package,
- package.size(),
- matches.join(", ")
- );
- skipped_count += 1;
- skipped_bytes += package.size();
- continue;
- }
- }
-
- for file_reference in package.files.values() {
- let path = format!("{}/{}", package.directory, file_reference.file);
- let url = get_repo_url(&config.repository, &path);
-
- if dry_run {
- if config.pool.contains(&file_reference.checksums) {
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: 0,
- });
- } else {
- println!("\t(dry-run) GET missing '{url}' ({}b)", file_reference.size);
- fetch_progress.update(&FetchResult {
- data: vec![],
- fetched: file_reference.size,
- });
- }
- } else {
- let mut full_path = PathBuf::from(prefix);
- full_path.push(&path);
-
- match fetch_plain_file(
- &config,
- &url,
- &full_path,
- file_reference.size,
- &file_reference.checksums,
- false,
- dry_run,
- ) {
- Ok(res) => fetch_progress.update(&res),
- Err(err) if config.ignore_errors => {
- let msg = format!(
- "{}: failed to fetch package '{}' - {}",
- basename, file_reference.file, err,
- );
- eprintln!("{msg}");
- warnings.push(msg);
- }
- Err(err) => return Err(err),
- }
- }
-
- if fetch_progress.file_count() % (max(total_source_packages / 100, 1)) == 0 {
- println!("\tProgress: {fetch_progress}");
- }
- }
- }
- println!("\tProgress: {fetch_progress}");
- if dry_run {
- dry_run_progress += fetch_progress;
- } else {
- total_progress += fetch_progress;
- }
- if skipped_count > 0 {
- total_skipped_count += skipped_count;
- total_skipped_bytes += skipped_bytes;
- println!("Skipped downloading {skipped_count} packages totalling {skipped_bytes}b");
- }
- }
+ fetch_source_packages(
+ &config,
+ source_packages_indices,
+ dry_run,
+ prefix,
+ &mut progress,
+ )?;
if dry_run {
- println!("\nDry-run Stats (indices, downloaded but not persisted):\n{total_progress}");
- println!("\nDry-run stats (packages, new == missing):\n{dry_run_progress}");
+ println!(
+ "\nDry-run Stats (indices, downloaded but not persisted):\n{}",
+ progress.total
+ );
+ println!(
+ "\nDry-run stats (packages, new == missing):\n{}",
+ progress.dry_run
+ );
} else {
- println!("\nStats: {total_progress}");
+ println!("\nStats: {}", progress.total);
}
if total_count > 0 {
println!(
- "Skipped downloading {total_skipped_count} packages totalling {total_skipped_bytes}b"
+ "Skipped downloading {} packages totalling {}b",
+ progress.skip_count, progress.skip_bytes,
);
}
- if !warnings.is_empty() {
+ if !progress.warnings.is_empty() {
eprintln!("Warnings:");
- for msg in warnings {
+ for msg in progress.warnings {
eprintln!("- {msg}");
}
}
--
2.30.2
More information about the pve-devel mailing list