diff --git a/src/datasets/pipeline.rs b/src/datasets/pipeline.rs index 6086b04..443aa72 100644 --- a/src/datasets/pipeline.rs +++ b/src/datasets/pipeline.rs @@ -23,7 +23,8 @@ use crate::python_runner; use crate::runner_sse; use crate::source_language::{classify_runtime_extension, SourceLanguage}; use crate::sync::discovery::{ - discover_project_log_refs, ProjectLogRefDiscoveryResult, ProjectLogRefScope, + discover_project_log_refs, ProjectLogRefDiscoveryOptions, ProjectLogRefDiscoveryResult, + ProjectLogRefScope, }; use crate::sync::{ artifact_base_dir, artifact_spec_dir, create_jsonl_file_writer, epoch_seconds, read_json_file, @@ -280,10 +281,12 @@ pub async fn run(base: BaseArgs, args: PipelineArgs) -> Result<()> { &args.runner, &source_project.id, &source, - refs, - args.transform.max_concurrency(), - Some(&attachment_dir), - None, + TransformInput { + refs, + max_concurrency: args.transform.max_concurrency(), + attachment_dir: Some(&attachment_dir), + row_writer: None, + }, ) .await?; let row_count = transform_response.rows.len(); @@ -1216,10 +1219,12 @@ async fn transform_refs(base: &BaseArgs, args: PipelineTransformArgs) -> Result< &args.runner, &source_project.id, &source, - refs, - args.transform.max_concurrency(), - Some(&attachment_dir), - Some(&mut writer as &mut dyn Write), + TransformInput { + refs, + max_concurrency: args.transform.max_concurrency(), + attachment_dir: Some(&attachment_dir), + row_writer: Some(&mut writer as &mut dyn Write), + }, ) .await?; writer.flush().context("failed to flush transform output")?; @@ -1256,16 +1261,26 @@ async fn transform_refs(base: &BaseArgs, args: PipelineTransformArgs) -> Result< ) } +struct TransformInput<'a> { + refs: Vec, + max_concurrency: usize, + attachment_dir: Option<&'a Path>, + row_writer: Option<&'a mut dyn Write>, +} + async fn transform_source_refs( base: &BaseArgs, runner: &PipelineRunnerArgs, source_project_id: &str, source: &PipelineSourceInspect, - refs: Vec, - max_concurrency: usize, - attachment_dir: Option<&Path>, - mut row_writer: Option<&mut dyn Write>, + input: TransformInput<'_>, ) -> Result { + let TransformInput { + refs, + max_concurrency, + attachment_dir, + mut row_writer, + } = input; if let Some(attachment_dir) = attachment_dir { fs::create_dir_all(attachment_dir) .with_context(|| format!("failed to create {}", attachment_dir.display()))?; @@ -1849,11 +1864,13 @@ async fn discover_refs( let result = discover_project_log_refs( &client, &ctx, - &project.id, - filter.as_ref(), - project_log_ref_scope(scope), - limit, - options.page_size, + ProjectLogRefDiscoveryOptions { + project_id: &project.id, + filter: filter.as_ref(), + scope: project_log_ref_scope(scope), + target: limit, + page_size: options.page_size, + }, |reference| { write_jsonl_value(&mut writer, &reference.to_value())?; writer.flush().context("failed to flush discovery output")?; diff --git a/src/sync/discovery.rs b/src/sync/discovery.rs index 77581a7..8d32e6c 100644 --- a/src/sync/discovery.rs +++ b/src/sync/discovery.rs @@ -46,6 +46,14 @@ pub(crate) struct ProjectLogRefDiscoveryResult { pub(crate) pages: usize, } +pub(crate) struct ProjectLogRefDiscoveryOptions<'a> { + pub(crate) project_id: &'a str, + pub(crate) filter: Option<&'a Value>, + pub(crate) scope: ProjectLogRefScope, + pub(crate) target: usize, + pub(crate) page_size: usize, +} + #[derive(Debug, Deserialize)] struct DiscoveryBtqlResponse { data: Vec>, @@ -56,16 +64,19 @@ struct DiscoveryBtqlResponse { pub(crate) async fn discover_project_log_refs( client: &ApiClient, ctx: &LoginContext, - project_id: &str, - filter: Option<&Value>, - scope: ProjectLogRefScope, - target: usize, - page_size: usize, + options: ProjectLogRefDiscoveryOptions<'_>, mut on_ref: F, ) -> Result where F: FnMut(ProjectLogRef) -> Result<()>, { + let ProjectLogRefDiscoveryOptions { + project_id, + filter, + scope, + target, + page_size, + } = options; let mut seen = HashSet::new(); let mut trace_roots = Vec::new(); let mut trace_refs_by_root_span_id = HashMap::new(); @@ -94,19 +105,19 @@ where } ProjectLogRefScope::Trace => { let root_span_id = reference.root_span_id.clone(); - if !trace_refs_by_root_span_id.contains_key(&root_span_id) { - if trace_roots.len() >= target { - continue; + match trace_refs_by_root_span_id.entry(root_span_id) { + std::collections::hash_map::Entry::Vacant(entry) => { + if trace_roots.len() >= target { + continue; + } + trace_roots.push(entry.key().clone()); + entry.insert(reference); + } + std::collections::hash_map::Entry::Occupied(mut entry) => { + if better_trace_origin_ref(entry.get(), &reference) { + entry.insert(reference); + } } - trace_roots.push(root_span_id.clone()); - trace_refs_by_root_span_id.insert(root_span_id, reference); - continue; - } - let should_replace = trace_refs_by_root_span_id - .get(&root_span_id) - .is_none_or(|current| better_trace_origin_ref(current, &reference)); - if should_replace { - trace_refs_by_root_span_id.insert(root_span_id, reference); } } } diff --git a/src/topics/mod.rs b/src/topics/mod.rs index 945597e..b3222ba 100644 --- a/src/topics/mod.rs +++ b/src/topics/mod.rs @@ -196,7 +196,7 @@ enum TopicMapCommands { #[command(alias = "view")] Show(TopicMapViewArgs), /// Update a configured Topics topic map by name or function ID - Set(TopicMapSetArgs), + Set(Box), } #[derive(Debug, Clone, Args)]