[pbs-devel] [PATCH v6 proxmox-backup 47/65] fix #3174: client: pxar: enable caching and meta comparison
Dominik Csapak
d.csapak at proxmox.com
Wed May 22 16:45:22 CEST 2024
high-level comment:
this patch does many things and feels like it could be broken up more clearly
e.g. putting the catalog manipulation into add_directory could be its own patch
some other comments inline:
On 5/14/24 12:34, Christian Ebner wrote:
> When walking the file system tree, check for each entry if it is
> reusable, meaning that the metadata did not change and the payload
> chunks can be reindexed instead of reencoding the whole data.
>
> If the metadata matched, the range of the dynamic index entries for
> that file are looked up in the previous payload data index.
> Use the range and possible padding introduced by partial reuse of
> chunks to decide whether to reuse the dynamic entries and encode
> the file payloads as payload reference right away or cache the entry
> for now and keep looking ahead.
>
> If however a non-reusable (because changed) entry is encountered
> before the padding threshold is reached, the entries on the cache are
> flushed to the archive by reencoding them, resetting the cached state.
>
> Reusable chunk digests and size as well as reference offsets to the
> start of regular files payloads within the payload stream are injected
> into the backup stream by sending them to the chunker via a dedicated
> channel, forcing a chunk boundary and inserting the chunks.
>
> If the threshold value for reuse is reached, the chunks are injected
> in the payload stream and the references with the corresponding
> offsets encoded in the metadata stream.
>
> Since multiple files might be contained within a single chunk, it is
> assured that the deduplication of chunks is performed, by keeping back
> the last chunk, so following files might as well reuse that same
> chunk without double indexing it. It is assured that this chunk is
> injected in the stream also in case that the following lookups lead to
> a cache clear and reencoding.
>
> Directory boundaries are cached as well, and written as part of the
> encoding when flushing.
>
> Signed-off-by: Christian Ebner <c.ebner at proxmox.com>
> ---
> pbs-client/src/pxar/create.rs | 494 +++++++++++++++++++++++++++++++---
> 1 file changed, 460 insertions(+), 34 deletions(-)
>
> diff --git a/pbs-client/src/pxar/create.rs b/pbs-client/src/pxar/create.rs
> index 7e6402de5..b2932c973 100644
> --- a/pbs-client/src/pxar/create.rs
> +++ b/pbs-client/src/pxar/create.rs
> @@ -21,9 +21,10 @@ use pathpatterns::{MatchEntry, MatchFlag, MatchList, MatchType, PatternFlag};
> use proxmox_sys::error::SysError;
> use pxar::accessor::aio::{Accessor, Directory};
> use pxar::accessor::ReadAt;
> -use pxar::encoder::{LinkOffset, SeqWrite};
> +use pxar::encoder::{LinkOffset, PayloadOffset, SeqWrite};
> use pxar::{EntryKind, Metadata};
>
> +use proxmox_human_byte::HumanByte;
> use proxmox_io::vec;
> use proxmox_lang::c_str;
> use proxmox_sys::fs::{self, acl, xattr};
> @@ -33,10 +34,14 @@ use pbs_datastore::dynamic_index::DynamicIndexReader;
> use pbs_datastore::index::IndexFile;
>
> use crate::inject_reused_chunks::InjectChunks;
> +use crate::pxar::look_ahead_cache::{CacheEntry, CacheEntryData};
> use crate::pxar::metadata::errno_is_unsupported;
> use crate::pxar::tools::assert_single_path_component;
> use crate::pxar::Flags;
>
> +const CHUNK_PADDING_THRESHOLD: f64 = 0.1;
> +const MAX_CACHE_SIZE: usize = 512;
> +
> /// Pxar options for creating a pxar archive/stream
> #[derive(Default)]
> pub struct PxarCreateOptions {
> @@ -154,6 +159,11 @@ struct Archiver {
> skip_e2big_xattr: bool,
> forced_boundaries: Option<mpsc::Sender<InjectChunks>>,
> previous_payload_index: Option<DynamicIndexReader>,
> + cached_entries: Vec<CacheEntry>,
> + cached_hardlinks: HashSet<HardLinkInfo>,
> + cached_range: Range<u64>,
> + cached_last_chunk: Option<ReusableDynamicEntry>,
> + caching_enabled: bool,
could this be better if those were in an optional struct?
e.g.
cache: Option<PxarCreateCache>,
not sure if it makes the code easier to read or not...
> }
>
> type Encoder<'a, T> = pxar::encoder::aio::Encoder<'a, T>;
> @@ -213,6 +223,8 @@ where
> set.insert(stat.st_dev);
> }
>
> + let metadata_mode = options.previous_ref.is_some() && writers.payload_writer.is_some();
> +
> let mut encoder = Encoder::new(
> &mut writers.writer,
> &metadata,
> @@ -256,11 +268,23 @@ where
> skip_e2big_xattr: options.skip_e2big_xattr,
> forced_boundaries,
> previous_payload_index,
> + cached_entries: Vec::new(),
> + cached_range: Range::default(),
> + cached_last_chunk: None,
> + cached_hardlinks: HashSet::new(),
> + caching_enabled: false,
> };
>
> archiver
> .archive_dir_contents(&mut encoder, previous_metadata_accessor, source_dir, true)
> .await?;
> +
> + if metadata_mode {
> + archiver
> + .flush_cached_reusing_if_below_threshold(&mut encoder, false)
> + .await?;
> + }
> +
> encoder.finish().await?;
> encoder.close().await?;
>
> @@ -318,7 +342,10 @@ impl Archiver {
> for file_entry in file_list {
> let file_name = file_entry.name.to_bytes();
>
> - if is_root && file_name == b".pxarexclude-cli" {
> + if is_root
> + && file_name == b".pxarexclude-cli"
> + && previous_metadata_accessor.is_none()
> + {
IIUC we don't encode a '.pxarexclude-cli' when there is a previous ref?
does that really make sense?
> self.encode_pxarexclude_cli(encoder, &file_entry.name, old_patterns_count)
> .await?;
> continue;
> @@ -336,6 +363,7 @@ impl Archiver {
> .await
> .map_err(|err| self.wrap_err(err))?;
> }
> +
> self.path = old_path;
> self.entry_counter = entry_counter;
> self.patterns.truncate(old_patterns_count);
> @@ -618,8 +646,6 @@ impl Archiver {
> c_file_name: &CStr,
> stat: &FileStat,
> ) -> Result<(), Error> {
> - use pxar::format::mode;
> -
> let file_mode = stat.st_mode & libc::S_IFMT;
> let open_mode = if file_mode == libc::S_IFREG || file_mode == libc::S_IFDIR {
> OFlag::empty()
> @@ -657,6 +683,96 @@ impl Archiver {
> self.skip_e2big_xattr,
> )?;
>
> + if self.previous_payload_index.is_none() {
> + return self
> + .add_entry_to_archive(
> + encoder,
> + previous_metadata,
> + c_file_name,
> + stat,
> + fd,
> + &metadata,
> + None,
> + )
> + .await;
> + }
> +
> + // Avoid having to many open file handles in cached entries
> + if self.cached_entries.len() > MAX_CACHE_SIZE {
> + log::debug!("Max cache size reached, reuse cached entries");
> + self.flush_cached_reusing_if_below_threshold(encoder, true)
> + .await?;
> + }
> +
> + if metadata.is_regular_file() {
> + self.cache_or_flush_entries(
> + encoder,
> + previous_metadata,
> + c_file_name,
> + stat,
> + fd,
> + &metadata,
> + )
> + .await
> + } else if self.caching_enabled {
> + if stat.st_mode & libc::S_IFMT == libc::S_IFDIR {
> + let fd_clone = fd.try_clone()?;
> + let cache_entry = CacheEntry::DirEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + PayloadOffset::default(),
> + ));
> + self.cached_entries.push(cache_entry);
> +
> + let dir = Dir::from_fd(fd_clone.into_raw_fd())?;
> + self.add_directory(
> + encoder,
> + previous_metadata,
> + dir,
> + c_file_name,
> + &metadata,
> + stat,
> + )
> + .await?;
> + } else {
> + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata,
> + PayloadOffset::default(),
> + ));
> + self.cached_entries.push(cache_entry);
> + }
> + Ok(())
> + } else {
> + self.add_entry_to_archive(
> + encoder,
> + previous_metadata,
> + c_file_name,
> + stat,
> + fd,
> + &metadata,
> + None,
> + )
> + .await
> + }
a few questions here:
'caching_enabled' is IMHO a bit misleading as this is only used for non file entries?
personally i find the split handling of caching vs flushing here and in cache_or_flush_entries
a bit confusing:
hardlinks/files are handled there, while dirs and all other filetypes are handled here?
why ?
i'd rather have all types handled there, which would make the code here much more readable
and having the code for all types there should also not be that ugly
> + }
> +
> + async fn add_entry_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + previous_metadata: &mut Option<Directory<MetadataArchiveReader>>,
> + c_file_name: &CStr,
> + stat: &FileStat,
> + fd: OwnedFd,
> + metadata: &Metadata,
> + payload_offset: Option<PayloadOffset>,
> + ) -> Result<(), Error> {
> + use pxar::format::mode;
> +
> let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> match metadata.file_type() {
> mode::IFREG => {
> @@ -685,9 +801,14 @@ impl Archiver {
> .add_file(c_file_name, file_size, stat.st_mtime)?;
> }
>
> - let offset: LinkOffset = self
> - .add_regular_file(encoder, fd, file_name, &metadata, file_size)
> - .await?;
> + let offset: LinkOffset = if let Some(payload_offset) = payload_offset {
> + encoder
> + .add_payload_ref(metadata, file_name, file_size, payload_offset)
> + .await?
> + } else {
> + self.add_regular_file(encoder, fd, file_name, metadata, file_size)
> + .await?
> + };
>
> if stat.st_nlink > 1 {
> self.hardlinks
> @@ -698,59 +819,43 @@ impl Archiver {
> }
> mode::IFDIR => {
> let dir = Dir::from_fd(fd.into_raw_fd())?;
> -
> - if let Some(ref catalog) = self.catalog {
> - catalog.lock().unwrap().start_directory(c_file_name)?;
> - }
> - let result = self
> - .add_directory(
> - encoder,
> - previous_metadata,
> - dir,
> - c_file_name,
> - &metadata,
> - stat,
> - )
> - .await;
> - if let Some(ref catalog) = self.catalog {
> - catalog.lock().unwrap().end_directory()?;
> - }
> - result
> + self.add_directory(encoder, previous_metadata, dir, c_file_name, metadata, stat)
> + .await
> }
> mode::IFSOCK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_socket(c_file_name)?;
> }
>
> - Ok(encoder.add_socket(&metadata, file_name).await?)
> + Ok(encoder.add_socket(metadata, file_name).await?)
> }
> mode::IFIFO => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_fifo(c_file_name)?;
> }
>
> - Ok(encoder.add_fifo(&metadata, file_name).await?)
> + Ok(encoder.add_fifo(metadata, file_name).await?)
> }
> mode::IFLNK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_symlink(c_file_name)?;
> }
>
> - self.add_symlink(encoder, fd, file_name, &metadata).await
> + self.add_symlink(encoder, fd, file_name, metadata).await
> }
> mode::IFBLK => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_block_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, stat).await
> + self.add_device(encoder, file_name, metadata, stat).await
> }
> mode::IFCHR => {
> if let Some(ref catalog) = self.catalog {
> catalog.lock().unwrap().add_char_device(c_file_name)?;
> }
>
> - self.add_device(encoder, file_name, &metadata, stat).await
> + self.add_device(encoder, file_name, metadata, stat).await
> }
> other => bail!(
> "encountered unknown file type: 0x{:x} (0o{:o})",
> @@ -760,18 +865,329 @@ impl Archiver {
> }
> }
>
> + async fn cache_or_flush_entries<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
> + c_file_name: &CStr,
> + stat: &FileStat,
> + fd: OwnedFd,
> + metadata: &Metadata,
> + ) -> Result<(), Error> {
> + let file_name: &Path = OsStr::from_bytes(c_file_name.to_bytes()).as_ref();
> + let reusable = if let Some(accessor) = previous_metadata_accessor {
> + self.is_reusable_entry(accessor, file_name, metadata)
> + .await?
> + } else {
> + None
> + };
this part could be moved below the hardlink handling, no?
because here we define 'reusable' but only need it further below
> +
> + if stat.st_nlink > 1 {
> + let link_info = HardLinkInfo {
> + st_dev: stat.st_dev,
> + st_ino: stat.st_ino,
> + };
> + if self.cached_hardlinks.contains(&link_info) {
> + // This hardlink has been seen by the lookahead cache already, put it on the cache
> + // with a dummy offset and continue without lookup and chunk injection.
> + // On flushing or re-encoding, the logic there will store the actual hardlink with
> + // offset.
> + self.caching_enabled = true;
> + let cache_entry = CacheEntry::RegEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + PayloadOffset::default(),
> + ));
> + self.cached_entries.push(cache_entry);
> + return Ok(());
> + } else {
> + // mark this hardlink as seen by the lookahead cache
> + self.cached_hardlinks.insert(link_info);
> + }
> + }
> +
> + if let Some(payload_range) = reusable {
namely here.
also if we'd move the checking if previous_metadata_accessor 'is_some()'
into 'is_reusable_entry' we could inline the variable entirely,
making this function a bit shorter and easier to read
> + // check for range continuation in payload archive
> + if self.cached_range.end == 0 {
> + // initialize first range to start and end with start of new range
> + self.cached_range.start = payload_range.start;
> + self.cached_range.end = payload_range.start;
> + }
> +
> + if self.cached_range.end == payload_range.start {
> + self.cached_range.end = payload_range.end;
> + log::debug!(
> + "Cache range continuation, new range: {:?}",
> + self.cached_range
> + );
> + } else {
> + log::debug!("Cache range has hole, new range: {payload_range:?}");
> + self.flush_cached_reusing_if_below_threshold(encoder, true)
> + .await?;
> + // range has to be set after flushing of cached entries, which resets the range
> + self.cached_range = payload_range.clone();
> + }
> +
> + // offset relative to start of current range, does not include possible padding of
> + // actual chunks, which needs to be added before encoding the payload reference
> + let offset =
> + PayloadOffset::default().add(payload_range.start - self.cached_range.start);
> + log::debug!("Offset relative to range start: {offset:?}");
> +
> + self.caching_enabled = true;
> + self.cached_entries
> + .push(CacheEntry::RegEntry(CacheEntryData::new(
> + fd,
> + c_file_name.into(),
> + *stat,
> + metadata.clone(),
> + offset,
> + )));
> +
> + return Ok(());
> + }
> +
> + self.flush_cached_reencoding(encoder).await?;
> + self.add_entry_to_archive(
> + encoder,
> + previous_metadata_accessor,
> + c_file_name,
> + stat,
> + fd,
> + metadata,
> + None,
> + )
> + .await
> + }
> +
> + async fn flush_cached_reusing_if_below_threshold<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + keep_last_chunk: bool,
> + ) -> Result<(), Error> {
> + let mut prev_last_chunk = self.cached_last_chunk.take();
> +
> + if self.cached_range.is_empty() {
> + if let Some(prev) = prev_last_chunk {
> + // make sure to inject previous last
> + self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
> + }
> + // only non regular file entries (directories) in cache, allows to do regular encoding
> + self.encode_entries_to_archive(encoder, None).await?;
> + return Ok(());
> + }
> +
> + if let Some(ref ref_payload_index) = self.previous_payload_index {
> + let (mut indices, start_padding, end_padding) =
> + lookup_dynamic_entries(ref_payload_index, self.cached_range.clone())?;
> + let mut padding = start_padding + end_padding;
> + let total_size = (self.cached_range.end - self.cached_range.start) + padding;
> +
> + // take into account used bytes of kept back chunk for padding
> + if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk.as_mut()) {
> + if last.digest() == first.digest() {
> + // Update padding used for threshold calculation only
> + let used = last.size() - last.padding;
> + padding -= used;
> + }
> + }
> +
> + let ratio = padding as f64 / total_size as f64;
> +
> + if ratio > CHUNK_PADDING_THRESHOLD {
> + log::debug!(
> + "Padding ratio: {ratio} > {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> + HumanByte::from(padding),
> + HumanByte::from(total_size),
> + indices.len(),
> + );
> + // do not reuse chunks if introduced padding higher than threshold
> + // opt for re-encoding in that case
> + if let Some(prev) = prev_last_chunk {
> + // make sure to inject previous last
> + self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
this can be more directly written as `&[prev]`, no need to allocate a vector for it
> + }
> + self.encode_entries_to_archive(encoder, None).await?;
> + } else {
> + log::debug!(
> + "Padding ratio: {ratio} < {CHUNK_PADDING_THRESHOLD}, padding: {}, total {}, chunks: {}",
> + HumanByte::from(padding),
> + HumanByte::from(total_size),
> + indices.len(),
> + );
> +
> + // check for cases where kept back last is not equal first chunk because the range
> + // end aligned with a chunk boundary, and the chunks therefore needs to be injected
> + if let (Some(first), Some(last)) = (indices.first_mut(), prev_last_chunk) {
> + if last.digest() != first.digest() {
> + // make sure to inject previous last
> + self.inject_chunks_at_current_payload_position(
> + encoder,
> + vec![last].as_slice(),
same here
> + )?;
> + } else {
> + let used = last.size() - last.padding;
> + first.padding -= used;
> + }
> + }
> +
> + let base_offset = Some(encoder.payload_position()?.add(start_padding));
> + self.encode_entries_to_archive(encoder, base_offset).await?;
> +
> + if keep_last_chunk {
> + self.cached_last_chunk = indices.pop();
> + }
> +
> + self.inject_chunks_at_current_payload_position(encoder, indices.as_slice())?;
> + }
> +
> + // clear range while keeping end for possible continuation if entries have been flushed
> + // because the max cache size was reached
> + self.cached_range = self.cached_range.end..self.cached_range.end;
> + self.caching_enabled = false;
> +
> + Ok(())
> + } else {
> + bail!("cannot reuse chunks without previous index reader");
> + }
> + }
> +
> + // Clear the cache and reencode all cached entries
> + // Make sure to inject a possibly kept back chunk from a previous chunk continuation attempt
> + async fn flush_cached_reencoding<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + ) -> Result<(), Error> {
> + if let Some(prev) = self.cached_last_chunk.take() {
> + // make sure to inject previous last
> + self.inject_chunks_at_current_payload_position(encoder, vec![prev].as_slice())?;
and here too
> + }
> +
> + self.encode_entries_to_archive(encoder, None).await?;
> +
> + self.cached_range = self.cached_range.end..self.cached_range.end;
> + self.caching_enabled = false;
> + Ok(())
> + }
> +
> + // Take ownership of cached entries and encode them to the archive
> + // Encode with reused payload chunks when base offset is some, reencode otherwise
> + async fn encode_entries_to_archive<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + base_offset: Option<PayloadOffset>,
> + ) -> Result<(), Error> {
> + // take ownership of cached entries, leaving new empty cache behind
> + let entries = std::mem::take(&mut self.cached_entries);
> + log::debug!(
> + "Got {} cache entries to encode: reuse is {}",
> + entries.len(),
> + base_offset.is_some()
> + );
> +
> + for entry in entries {
> + match entry {
> + CacheEntry::RegEntry(CacheEntryData {
> + fd,
> + c_file_name,
> + stat,
> + metadata,
> + payload_offset,
> + }) => {
> + self.add_entry_to_archive(
> + encoder,
> + &mut None,
> + &c_file_name,
> + &stat,
> + fd,
> + &metadata,
> + base_offset.map(|base_offset| payload_offset.add(base_offset.raw())),
> + )
> + .await?
> + }
> + CacheEntry::DirEntry(CacheEntryData {
> + c_file_name,
> + metadata,
> + ..
> + }) => {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(&c_file_name)?;
> + }
> + let dir_name = OsStr::from_bytes(c_file_name.to_bytes());
> + encoder.create_directory(dir_name, &metadata).await?;
> + }
> + CacheEntry::DirEnd => {
> + encoder.finish().await?;
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + }
> + }
> + }
> +
> + Ok(())
> + }
> +
> + fn inject_chunks_at_current_payload_position<T: SeqWrite + Send>(
> + &mut self,
> + encoder: &mut Encoder<'_, T>,
> + reused_chunks: &[ReusableDynamicEntry],
> + ) -> Result<(), Error> {
> + let mut injection_boundary = encoder.payload_position()?;
> +
> + for chunks in reused_chunks.chunks(128) {
> + let mut chunk_list = Vec::with_capacity(128);
> + let mut size = PayloadOffset::default();
> +
> + for chunk in chunks.iter() {
> + log::debug!(
> + "Injecting chunk with {} padding (chunk size {})",
> + HumanByte::from(chunk.padding),
> + HumanByte::from(chunk.size()),
> + );
> + size = size.add(chunk.size());
> + chunk_list.push(chunk.clone());
> + }
> +
> + let inject_chunks = InjectChunks {
> + boundary: injection_boundary.raw(),
> + chunks: chunk_list,
> + size: size.raw() as usize,
> + };
> +
> + if let Some(sender) = self.forced_boundaries.as_mut() {
> + sender.send(inject_chunks)?;
> + } else {
> + bail!("missing injection queue");
> + };
> +
> + injection_boundary = injection_boundary.add(size.raw());
> + log::debug!("Advance payload position by: {size:?}");
> + encoder.advance(size)?;
> + }
> +
> + Ok(())
> + }
> +
> async fn add_directory<T: SeqWrite + Send>(
> &mut self,
> encoder: &mut Encoder<'_, T>,
> previous_metadata_accessor: &mut Option<Directory<MetadataArchiveReader>>,
> dir: Dir,
> - dir_name: &CStr,
> + c_dir_name: &CStr,
> metadata: &Metadata,
> stat: &FileStat,
> ) -> Result<(), Error> {
> - let dir_name = OsStr::from_bytes(dir_name.to_bytes());
> + let dir_name = OsStr::from_bytes(c_dir_name.to_bytes());
>
> - encoder.create_directory(dir_name, metadata).await?;
> + if !self.caching_enabled {
> + if let Some(ref catalog) = self.catalog {
> + catalog.lock().unwrap().start_directory(c_dir_name)?;
> + }
> + encoder.create_directory(dir_name, metadata).await?;
> + }
>
> let old_fs_magic = self.fs_magic;
> let old_fs_feature_flags = self.fs_feature_flags;
> @@ -811,7 +1227,17 @@ impl Archiver {
> self.fs_feature_flags = old_fs_feature_flags;
> self.current_st_dev = old_st_dev;
>
> - encoder.finish().await?;
> + if !self.caching_enabled {
> + encoder.finish().await?;
> + if let Some(ref catalog) = self.catalog {
> + if !self.caching_enabled {
> + catalog.lock().unwrap().end_directory()?;
> + }
> + }
> + } else {
> + self.cached_entries.push(CacheEntry::DirEnd);
> + }
> +
> result
> }
>
More information about the pbs-devel
mailing list