@@ -14,10 +14,11 @@ use retry::{retry_function, RetryError};
1414use tokio:: time:: timeout;
1515use types:: batch_state:: BatchState ;
1616use types:: user_state:: UserState ;
17-
17+ use boring :: ssl :: { SslMethod , SslAcceptor , SslStream , SslFiletype } ;
1818use std:: collections:: HashMap ;
1919use std:: env;
2020use std:: net:: SocketAddr ;
21+ use std:: path:: PathBuf ;
2122use std:: sync:: Arc ;
2223use std:: time:: Duration ;
2324
@@ -260,7 +261,14 @@ impl Batcher {
260261 }
261262 }
262263
263- pub async fn listen_connections ( self : Arc < Self > , address : & str ) -> Result < ( ) , BatcherError > {
264+ pub async fn listen_connections ( self : Arc < Self > , address : & str , cert : PathBuf , key : PathBuf ) -> Result < ( ) , BatcherError > {
265+ let mut acceptor;
266+ let mut acceptor_builder = SslAcceptor :: mozilla_intermediate_v5 ( SslMethod :: tls ( ) ) . unwrap ( ) ;
267+ acceptor_builder. set_private_key_file ( key, SslFiletype :: PEM ) . unwrap ( ) ;
268+ acceptor_builder. set_certificate_chain_file ( cert) . unwrap ( ) ;
269+ acceptor_builder. check_private_key ( ) . unwrap ( ) ;
270+ acceptor = Arc :: new ( acceptor_builder. build ( ) ) ;
271+
264272 // Create the event loop and TCP listener we'll accept connections on.
265273 let listener = TcpListener :: bind ( address)
266274 . await
@@ -272,7 +280,7 @@ impl Batcher {
272280 Ok ( ( stream, addr) ) => {
273281 let batcher = self . clone ( ) ;
274282 // Let's spawn the handling of each connection in a separate task.
275- tokio:: spawn ( batcher. handle_connection ( stream, addr) ) ;
283+ tokio:: spawn ( batcher. handle_connection ( stream, addr, acceptor . clone ( ) ) ) ;
276284 }
277285 Err ( e) => {
278286 self . metrics . user_error ( & [ "connection_accept_error" , "" ] ) ;
@@ -366,11 +374,14 @@ impl Batcher {
366374 self : Arc < Self > ,
367375 raw_stream : TcpStream ,
368376 addr : SocketAddr ,
377+ acceptor : Arc < SslAcceptor > ,
369378 ) -> Result < ( ) , BatcherError > {
370379 info ! ( "Incoming TCP connection from: {}" , addr) ;
371380 self . metrics . open_connections . inc ( ) ;
372-
373- let ws_stream_future = tokio_tungstenite:: accept_async ( raw_stream) ;
381+ let tls_stream = tokio_boring:: accept ( & acceptor, raw_stream)
382+ . await
383+ . map_err ( |e | BatcherError :: TlsError ( e. to_string ( ) ) ) ?;
384+ let ws_stream_future = tokio_tungstenite:: accept_async ( tls_stream) ;
374385 let ws_stream =
375386 match timeout ( Duration :: from_secs ( CONNECTION_TIMEOUT ) , ws_stream_future) . await {
376387 Ok ( Ok ( stream) ) => stream,
0 commit comments