use std::{ collections::HashMap, path::{Path, PathBuf}, }; use anyhow::{bail, Context, Result}; use futures::{Stream, StreamExt, TryStreamExt}; use log::info; use tokio::{ fs::{self, DirEntry}, io, sync::mpsc::{self, Receiver, Sender}, task::JoinHandle, }; use tokio_stream::wrappers::{ReadDirStream, ReceiverStream}; pub struct MappingGenerator { current_mappings: HashMap, paths_to_process: Vec, } pub struct MappingGeneratorHelper { tx: Sender<(PathBuf, oneshotSender)>, handle: JoinHandle<()>, done: Vec, } impl MappingGeneratorHelper { pub fn new() -> Self { let (rx, tx) = mpsc::channel(100); let handle = tokio::spawn(async move { while let Some(dir) = rx.recv().await { info!("processing '{}'..", dir.display()); get_dir(dir); } }); Self { tx, handle } } pub async fn process(&self, dir: PathBuf) -> Result<()> { let (tx, rx) = self.tx.send(dir).await?; Ok(()) } } impl MappingGenerator { pub async fn new(directories_to_scan: Vec, depth: usize) -> Result { let cleaned_directories: Vec = directories_to_scan .into_iter() .map(check_dir) .collect::>()?; let helper = MappingGeneratorHelper::new(); cleaned_directories .into_iter() .for_each(|dir| helper.process(dir)); info!( "Will process:\n {}", all_directories .iter() .map(|pat| pat.display().to_string()) .collect::>() .join("\n ") ); Ok(Self { current_mappings: HashMap::new(), paths_to_process: all_directories, }) } } fn check_dir(dir: PathBuf) -> Result { match dir.metadata() { Ok(_) => Ok(dir), Err(e) => bail!( "'{}' is not a valid path; Error was: '{}'", dir.display(), e ), } } pub async fn get_dir(dir: PathBuf, current_depth: usize, max_depth: usize) -> Result> { let (tx, rx) = mpsc::channel(100); let handle = tokio::spawn(async move { get_dir_recursive(dir, current_depth, max_depth, tx) }); let out = ReceiverStream::new(rx).collect::>().await; handle.await?; Ok(out) } async fn get_dir_recursive( dir: PathBuf, current_depth: usize, max_depth: usize, tx: Sender, ) -> Result<()> { if dir.is_dir() && current_depth != max_depth { tx.send(dir).await?; match fs::read_dir(&dir).await { Ok(directories) => { let mut handles: Vec>> = vec![]; while let Some(entry) = directories .next_entry() .await .with_context(|| format!("Failed to read directory: '{}'", dir.display()))? { let tx_new = tx.clone(); handles.push(tokio::spawn(async move { get_dir_recursive(entry.path(), current_depth + 1, max_depth, tx_new) .await .with_context(|| { format!("Failed to get child directories to '{}'", dir.display()) })?; Ok(()) })); } let out: Vec<_> = tokio_stream::iter(handles) .then(|handle| async move { handle.await }) .collect() .await; // I have no idea what happened here to the type system for i in out { i?? } Ok(()) } Err(e) => { bail!( "Unable to read directory {}, skipping; error: {}", dir.display(), e ); } } } else { return Ok(()); } } #[cfg(test)] mod test { use std::path::PathBuf; use super::get_dir; #[test] fn test_get_dir() { let dirs = get_dir(PathBuf::from("~/repos")); let expected_dirs = vec![PathBuf::from("~/repos/rust")]; assert_eq!(dirs, expected_dirs); } }