@@ -94,108 +94,121 @@ async fn main() -> anyhow::Result<(), Box<dyn std::error::Error + Send + Sync>>
9494 let arguments = Arguments :: parse ( ) ;
9595
9696 loop {
97- let db_connection =
98- connect_database ( arguments. state_directory . as_path ( ) . join ( "snarf.sqlite" ) ) ?;
99- let server_state = match load_server_state ( & db_connection) ? {
100- Some ( server_state) => ServerState :: try_from ( server_state) ?,
101- None => {
102- let default_state = ServerState :: default ( ) ;
103- store_server_state ( & db_connection, & default_state. to_owned ( ) . into ( ) ) ?;
104- default_state
105- }
97+ if let ServerTransition :: Shutdown = start_server ( & arguments) . await ? {
98+ break ;
10699 } ;
100+ }
101+
102+ Ok ( ( ) )
103+ }
107104
108- let ( command_sender, mut command_receiver) = mpsc:: channel :: < ServerCommand > ( 8 ) ;
109- let do_shutdown = Arc :: new ( std:: sync:: Mutex :: new ( false ) ) ;
105+ /// The new state of the main loop.
106+ enum ServerTransition {
107+ Shutdown ,
108+ Restart ,
109+ }
110110
111- let do_shutdown_copy = do_shutdown. clone ( ) ;
112- let mut server_state_clone = server_state. clone ( ) ;
113- let shutdown = async move {
114- info ! ( "Press Cltr-C for graceful shutdown." ) ;
115- tokio:: select! {
116- _= tokio:: signal:: ctrl_c( ) => {
117- * do_shutdown_copy. lock( ) . unwrap( ) = true ;
118- }
119- action = command_receiver. recv( ) => {
120- match action {
121- Some ( ServerCommand :: MarkInitialized ) => {
122- server_state_clone. initialize( ) ;
123- store_server_state( & db_connection, & server_state_clone. into( ) ) . expect( "Updating the server state" ) ;
124- * do_shutdown_copy. lock( ) . unwrap( ) = false ;
125- } ,
126- Some ( ServerCommand :: Shutdown ) => {
127- * do_shutdown_copy. lock( ) . unwrap( ) = false ;
128- } ,
129- None => { }
130- }
111+ async fn start_server (
112+ arguments : & Arguments ,
113+ ) -> anyhow:: Result < ServerTransition , Box < dyn std:: error:: Error + Send + Sync > > {
114+ let db_connection = connect_database ( arguments. state_directory . as_path ( ) . join ( "snarf.sqlite" ) ) ?;
115+ let server_state = match load_server_state ( & db_connection) ? {
116+ Some ( server_state) => ServerState :: try_from ( server_state) ?,
117+ None => {
118+ let default_state = ServerState :: default ( ) ;
119+ store_server_state ( & db_connection, & default_state. to_owned ( ) . into ( ) ) ?;
120+ default_state
121+ }
122+ } ;
123+
124+ let ( command_sender, mut command_receiver) = mpsc:: channel :: < ServerCommand > ( 8 ) ;
125+ let do_shutdown = Arc :: new ( std:: sync:: Mutex :: new ( false ) ) ;
126+
127+ let do_shutdown_copy = do_shutdown. clone ( ) ;
128+ let mut server_state_clone = server_state. clone ( ) ;
129+ let shutdown = async move {
130+ info ! ( "Press Cltr-C for graceful shutdown." ) ;
131+ tokio:: select! {
132+ _= tokio:: signal:: ctrl_c( ) => {
133+ * do_shutdown_copy. lock( ) . unwrap( ) = true ;
134+ }
135+ action = command_receiver. recv( ) => {
136+ match action {
137+ Some ( ServerCommand :: MarkInitialized ) => {
138+ server_state_clone. initialize( ) ;
139+ store_server_state( & db_connection, & server_state_clone. into( ) ) . expect( "Updating the server state" ) ;
140+ * do_shutdown_copy. lock( ) . unwrap( ) = false ;
141+ } ,
142+ Some ( ServerCommand :: Shutdown ) => {
143+ * do_shutdown_copy. lock( ) . unwrap( ) = false ;
144+ } ,
145+ None => { }
131146 }
132147 }
133- } ;
134-
135- let ( blob_service, directory_service, path_info_service, nar_calculation_service) =
136- snix_store:: utils:: construct_services ( arguments. service_addrs . clone ( ) ) . await ?;
137-
138- // The signing_path_info service will sign only while serving new path_infos.
139- let signing_path_info_service = Arc :: new ( LazySigningPathInfoService :: new (
140- path_info_service. clone ( ) ,
141- server_state. cache_key ( ) ,
142- ) ) ;
143-
144- // The management channels are used to fill the cache and potentially to configure
145- // it, authenticated.
146- let management_routes = snarf:: server:: server_routes (
147- & command_sender,
148- & server_state,
149- blob_service. clone ( ) ,
150- directory_service. clone ( ) ,
151- signing_path_info_service. clone ( ) ,
152- nar_calculation_service,
153- ) ;
154-
155- // The nar-bridge serves the actual cache data, unauthenticated.
156- let nar_bridge_state = nar_bridge:: AppState :: new (
157- blob_service. clone ( ) ,
158- directory_service. clone ( ) ,
159- signing_path_info_service,
160- std:: num:: NonZero :: new ( 64usize ) . unwrap ( ) ,
161- ) ;
162-
163- // HTTP
164- let app = nar_bridge:: gen_router ( 30 )
165- . with_state ( nar_bridge_state)
166- . merge ( management_routes. into_axum_router ( ) ) ;
167-
168- let listen_address = & arguments
169- . listen_args
170- . listen_address
171- . clone ( )
172- . unwrap_or_else ( || {
173- "[::]:9000"
174- . parse ( )
175- . expect ( "invalid fallback listen address" )
176- } ) ;
177-
178- let listener = tokio_listener:: Listener :: bind (
179- listen_address,
180- & Default :: default ( ) ,
181- & arguments. listen_args . listener_options ,
182- )
183- . await ;
184-
185- info ! ( listen_address=%listen_address, "starting daemon" ) ;
186-
187- let serve = tokio_listener:: axum07:: serve (
188- listener. unwrap ( ) ,
189- app. into_make_service_with_connect_info :: < tokio_listener:: SomeSocketAddrClonable > ( ) ,
190- )
191- . with_graceful_shutdown ( shutdown) ;
192-
193- serve. await ?;
194-
195- if * do_shutdown. lock ( ) . unwrap ( ) {
196- break ;
197148 }
149+ } ;
150+
151+ let ( blob_service, directory_service, path_info_service, nar_calculation_service) =
152+ snix_store:: utils:: construct_services ( arguments. service_addrs . clone ( ) ) . await ?;
153+
154+ // The signing_path_info service will sign only while serving new path_infos.
155+ let signing_path_info_service = Arc :: new ( LazySigningPathInfoService :: new (
156+ path_info_service. clone ( ) ,
157+ server_state. cache_key ( ) ,
158+ ) ) ;
159+
160+ // The management channels are used to fill the cache and potentially to configure
161+ // it, authenticated.
162+ let management_routes = snarf:: server:: server_routes (
163+ & command_sender,
164+ & server_state,
165+ blob_service. clone ( ) ,
166+ directory_service. clone ( ) ,
167+ signing_path_info_service. clone ( ) ,
168+ nar_calculation_service,
169+ ) ;
170+
171+ // The nar-bridge serves the actual cache data, unauthenticated.
172+ let nar_bridge_state = nar_bridge:: AppState :: new (
173+ blob_service. clone ( ) ,
174+ directory_service. clone ( ) ,
175+ signing_path_info_service,
176+ std:: num:: NonZero :: new ( 64usize ) . unwrap ( ) ,
177+ ) ;
178+
179+ // HTTP
180+ let app = nar_bridge:: gen_router ( 30 )
181+ . with_state ( nar_bridge_state)
182+ . merge ( management_routes. into_axum_router ( ) ) ;
183+
184+ let listen_address = & arguments
185+ . listen_args
186+ . listen_address
187+ . clone ( )
188+ . unwrap_or_else ( || {
189+ "[::]:9000"
190+ . parse ( )
191+ . expect ( "invalid fallback listen address" )
192+ } ) ;
193+
194+ let listener = tokio_listener:: Listener :: bind (
195+ listen_address,
196+ & Default :: default ( ) ,
197+ & arguments. listen_args . listener_options ,
198+ )
199+ . await ;
200+
201+ info ! ( listen_address=%listen_address, "starting daemon" ) ;
202+
203+ tokio_listener:: axum07:: serve (
204+ listener. unwrap ( ) ,
205+ app. into_make_service_with_connect_info :: < tokio_listener:: SomeSocketAddrClonable > ( ) ,
206+ )
207+ . with_graceful_shutdown ( shutdown)
208+ . await ?;
209+
210+ if * do_shutdown. lock ( ) . unwrap ( ) {
211+ return Ok ( ServerTransition :: Shutdown ) ;
198212 }
199-
200- Ok ( ( ) )
213+ Ok ( ServerTransition :: Restart )
201214}
0 commit comments