@@ -13,7 +13,9 @@ use anyhow::{Context as _, anyhow};
1313use cap_media_info:: VideoInfo ;
1414use cap_project:: { CursorEvents , StudioRecordingMeta } ;
1515use cap_timestamp:: { Timestamp , Timestamps } ;
16- use futures:: { FutureExt , StreamExt , channel:: mpsc, future:: OptionFuture } ;
16+ use futures:: {
17+ FutureExt , StreamExt , channel:: mpsc, future:: OptionFuture , stream:: FuturesUnordered ,
18+ } ;
1719use kameo:: { Actor as _, prelude:: * } ;
1820use relative_path:: RelativePathBuf ;
1921use std:: {
@@ -274,6 +276,33 @@ impl Pipeline {
274276 cursor : self . cursor ,
275277 } )
276278 }
279+
280+ fn spawn_watcher ( & self , completion_tx : watch:: Sender < Option < Result < ( ) , PipelineDoneError > > > ) {
281+ let mut futures = FuturesUnordered :: new ( ) ;
282+ futures. push ( self . screen . done_fut ( ) ) ;
283+
284+ if let Some ( ref microphone) = self . microphone {
285+ futures. push ( microphone. done_fut ( ) ) ;
286+ }
287+
288+ if let Some ( ref camera) = self . camera {
289+ futures. push ( camera. done_fut ( ) ) ;
290+ }
291+
292+ if let Some ( ref system_audio) = self . system_audio {
293+ futures. push ( system_audio. done_fut ( ) ) ;
294+ }
295+
296+ tokio:: spawn ( async move {
297+ while let Some ( res) = futures. next ( ) . await {
298+ if let Err ( err) = res {
299+ if completion_tx. borrow ( ) . is_none ( ) {
300+ let _ = completion_tx. send ( Some ( Err ( err) ) ) ;
301+ }
302+ }
303+ }
304+ } ) ;
305+ }
277306}
278307
279308struct CursorPipeline {
@@ -568,7 +597,7 @@ impl SegmentPipelineFactory {
568597 cursors : Cursors ,
569598 next_cursors_id : u32 ,
570599 ) -> anyhow:: Result < Pipeline > {
571- let result = create_segment_pipeline (
600+ let pipeline = create_segment_pipeline (
572601 & self . segments_dir ,
573602 & self . cursors_dir ,
574603 self . index ,
@@ -585,46 +614,12 @@ impl SegmentPipelineFactory {
585614
586615 self . index += 1 ;
587616
588- spawn_pipeline_watchers ( & result , & self . completion_tx ) ;
617+ pipeline . spawn_watcher ( self . completion_tx . clone ( ) ) ;
589618
590- Ok ( result )
619+ Ok ( pipeline )
591620 }
592621}
593622
594- fn spawn_pipeline_watchers (
595- pipeline : & Pipeline ,
596- completion_tx : & watch:: Sender < Option < Result < ( ) , PipelineDoneError > > > ,
597- ) {
598- for fut in collect_pipeline_done_futs ( pipeline) {
599- let tx = completion_tx. clone ( ) ;
600- tokio:: spawn ( async move {
601- if let Err ( err) = fut. await {
602- if tx. borrow ( ) . is_none ( ) {
603- let _ = tx. send ( Some ( Err ( err) ) ) ;
604- }
605- }
606- } ) ;
607- }
608- }
609-
610- fn collect_pipeline_done_futs ( pipeline : & Pipeline ) -> Vec < DoneFut > {
611- let mut futures = vec ! [ pipeline. screen. done_fut( ) ] ;
612-
613- if let Some ( ref microphone) = pipeline. microphone {
614- futures. push ( microphone. done_fut ( ) ) ;
615- }
616-
617- if let Some ( ref camera) = pipeline. camera {
618- futures. push ( camera. done_fut ( ) ) ;
619- }
620-
621- if let Some ( ref system_audio) = pipeline. system_audio {
622- futures. push ( system_audio. done_fut ( ) ) ;
623- }
624-
625- futures
626- }
627-
628623fn completion_rx_to_done_fut (
629624 mut rx : watch:: Receiver < Option < Result < ( ) , PipelineDoneError > > > ,
630625) -> DoneFut {
0 commit comments