Skip to content

Commit a22abb9

Browse files
committed
feat(wip)!: Initial implementation of a real connection pooling
1 parent 32da594 commit a22abb9

4 files changed

Lines changed: 99 additions & 422 deletions

File tree

canyon_core/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ toml = { workspace = true }
2727
serde = { workspace = true }
2828
walkdir = { workspace = true }
2929
cfg-if = "1.0.0"
30+
bb8-postgres = "0.9.0"
31+
bb8-tiberius = "0.16.0"
32+
bb8 = "0.9.0"
3033

3134
[features]
3235
postgres = ["tokio-postgres"]

canyon_core/src/canyon.rs

Lines changed: 11 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,14 @@ use crate::connection::conn_errors::DatasourceNotFound;
22
use crate::connection::database_type::DatabaseType;
33
use crate::connection::datasources::{CanyonSqlConfig, DatasourceConfig, Datasources};
44
use crate::connection::{
5-
CANYON_INSTANCE, db_connector, get_canyon_tokio_runtime, pool::get_pool_manager,
5+
CANYON_INSTANCE, db_connector, get_canyon_tokio_runtime,
66
};
77
use db_connector::DatabaseConnection;
88
use std::collections::HashMap;
99
use std::sync::Arc;
1010
use std::{error::Error, fs};
1111
use tokio::sync::Mutex;
12+
use crate::connection::pool::CanyonConnection;
1213

1314
pub type SharedConnection = Arc<Mutex<DatabaseConnection>>;
1415

@@ -53,14 +54,14 @@ pub type SharedConnection = Arc<Mutex<DatabaseConnection>>;
5354
/// - `find_datasource_by_name_or_default`: Finds a datasource by name or returns the default.
5455
/// - `get_connection`: Retrieves a read-only connection from the cache.
5556
/// - `get_mut_connection`: Retrieves a mutable connection from the cache.
56-
pub struct Canyon {
57+
pub struct Canyon<'a> {
5758
config: Datasources,
58-
connections: HashMap<&'static str, DatabaseConnection>,
59+
connections: HashMap<&'static str, CanyonConnection<'a>>,
5960
default_connection: Option<DatabaseConnection>,
6061
default_db_type: Option<DatabaseType>,
6162
}
6263

63-
impl Canyon {
64+
impl<'a> Canyon<'a> {
6465
/// Returns the global singleton instance of `Canyon`.
6566
///
6667
/// This function allows access to the singleton instance of the Canyon engine
@@ -114,8 +115,8 @@ impl Canyon {
114115
let config_content = fs::read_to_string(&path)?;
115116
let config: Datasources = toml::from_str::<CanyonSqlConfig>(&config_content)?.canyon_sql;
116117

117-
let mut connections: HashMap<&str, DatabaseConnection> = HashMap::new();
118-
let mut default_connection: Option<DatabaseConnection> = None;
118+
let mut connections: HashMap<&str, CanyonConnection<'a>> = HashMap::new();
119+
let mut default_connection: Option<CanyonConnection<'a>> = None;
119120
let mut default_db_type: Option<DatabaseType> = None;
120121

121122
for ds in config.datasources.iter() {
@@ -204,33 +205,6 @@ impl Canyon {
204205
Ok(conn)
205206
}
206207

207-
/// Gets a pooled connection for better performance
208-
/// This is an internal method that uses the connection pool
209-
pub async fn get_pooled_connection(
210-
&self,
211-
name: &str,
212-
) -> Result<crate::connection::pool::PooledConnection, DatasourceNotFound> {
213-
let pool_manager = get_pool_manager();
214-
let mut pool_manager_guard = pool_manager.lock().await;
215-
216-
// Find the datasource
217-
let datasource = self.find_datasource_by_name_or_default(name)?;
218-
219-
// Create pool if it doesn't exist
220-
if !pool_manager_guard.has_pool(name) {
221-
pool_manager_guard
222-
.create_pool(name, datasource)
223-
.await
224-
.map_err(|_| DatasourceNotFound::from(Some(name)))?;
225-
}
226-
227-
// Get pooled connection
228-
pool_manager_guard
229-
.get_connection(name)
230-
.await
231-
.map_err(|_| DatasourceNotFound::from(Some(name)))
232-
}
233-
234208
/// Gets a fast connection that automatically uses pooling when available
235209
/// This method provides the best performance by using connection pooling
236210
pub async fn get_fast_connection(
@@ -251,6 +225,7 @@ mod __impl {
251225
use std::error::Error;
252226
use std::path::PathBuf;
253227
use walkdir::WalkDir;
228+
use crate::connection::pool::CanyonConnection;
254229

255230
// Internal helper to locate the config file
256231
pub(crate) fn find_config_path() -> Result<PathBuf, std::io::Error> {
@@ -274,10 +249,10 @@ mod __impl {
274249
})
275250
}
276251

277-
pub(crate) async fn process_new_conn_by_datasource(
252+
pub(crate) async fn process_new_conn_by_datasource<'a>(
278253
ds: &DatasourceConfig,
279-
connections: &mut HashMap<&str, DatabaseConnection>,
280-
default: &mut Option<DatabaseConnection>,
254+
connections: &mut HashMap<&str, CanyonConnection<'a>>,
255+
default: &mut Option<CanyonConnection<'a>>,
281256
default_db_type: &mut Option<DatabaseType>,
282257
) -> Result<(), Box<dyn Error + Send + Sync>> {
283258
if default.is_none() {

canyon_core/src/connection/db_connector.rs

Lines changed: 2 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -46,29 +46,6 @@ impl DatabaseConnection {
4646
}
4747
}
4848

49-
/// Creates a connection with optimized settings for better performance
50-
pub async fn new_optimized(
51-
datasource: &DatasourceConfig,
52-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
53-
// Use optimized connection settings for better performance
54-
match datasource.get_db_type() {
55-
#[cfg(feature = "postgres")]
56-
DatabaseType::PostgreSql => {
57-
connection_helpers::create_postgres_connection_optimized(datasource).await
58-
}
59-
60-
#[cfg(feature = "mssql")]
61-
DatabaseType::SqlServer => {
62-
connection_helpers::create_sqlserver_connection_optimized(datasource).await
63-
}
64-
65-
#[cfg(feature = "mysql")]
66-
DatabaseType::MySQL => {
67-
connection_helpers::create_mysql_connection_optimized(datasource).await
68-
}
69-
}
70-
}
71-
7249
pub fn get_db_type(&self) -> DatabaseType {
7350
match self {
7451
#[cfg(feature = "postgres")]
@@ -116,28 +93,6 @@ mod connection_helpers {
11693
datasource: &DatasourceConfig,
11794
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
11895
let (user, password) = auth::extract_postgres_auth(&datasource.auth)?;
119-
let url = connection_string(user, password, datasource);
120-
121-
let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls).await?;
122-
123-
tokio::spawn(async move {
124-
if let Err(e) = connection.await {
125-
eprintln!(
126-
"An error occurred while trying to connect to the PostgreSQL database: {e}"
127-
);
128-
}
129-
});
130-
131-
Ok(DatabaseConnection::Postgres(PostgreSqlConnection {
132-
client,
133-
}))
134-
}
135-
136-
#[cfg(feature = "postgres")]
137-
pub async fn create_postgres_connection_optimized(
138-
datasource: &DatasourceConfig,
139-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
140-
let (user, password) = auth::extract_postgres_auth(&datasource.auth)?;
14196

14297
// Use optimized connection settings
14398
let mut config = tokio_postgres::Config::new();
@@ -196,62 +151,18 @@ mod connection_helpers {
196151
}))
197152
}
198153

199-
#[cfg(feature = "mssql")]
200-
pub async fn create_sqlserver_connection_optimized(
201-
datasource: &DatasourceConfig,
202-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
203-
use async_std::net::TcpStream;
204-
let mut tiberius_config = tiberius::Config::new();
205-
206-
tiberius_config.host(&datasource.properties.host);
207-
tiberius_config.port(datasource.properties.port.unwrap_or_default());
208-
tiberius_config.database(&datasource.properties.db_name);
209-
210-
let auth_config = auth::extract_mssql_auth(&datasource.auth)?;
211-
tiberius_config.authentication(auth_config);
212-
tiberius_config.trust_cert(); // TODO: this should be specifically set via user input
213-
tiberius_config.encryption(tiberius::EncryptionLevel::NotSupported); // TODO: user input
214-
215-
// Optimize connection settings for better performance
216-
// Note: Tiberius doesn't expose these settings directly
217-
// The optimization is handled at the TCP level
218-
219-
let tcp = TcpStream::connect(tiberius_config.get_addr()).await?;
220-
tcp.set_nodelay(true)?;
221-
222-
let client = tiberius::Client::connect(tiberius_config, tcp).await?;
223-
224-
Ok(DatabaseConnection::SqlServer(SqlServerConnection {
225-
client: Box::leak(Box::new(client)),
226-
}))
227-
}
228-
229154
#[cfg(feature = "mysql")]
230155
pub async fn create_mysql_connection(
231156
datasource: &DatasourceConfig,
232157
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
233158
use mysql_async::Pool;
234159

235-
let (user, password) = auth::extract_mysql_auth(&datasource.auth)?;
236-
let url = connection_string(user, password, datasource);
237-
let mysql_connection = Pool::from_url(url)?;
238-
239-
Ok(DatabaseConnection::MySQL(MysqlConnection {
240-
client: mysql_connection,
241-
}))
242-
}
243-
244-
#[cfg(feature = "mysql")]
245-
pub async fn create_mysql_connection_optimized(
246-
datasource: &DatasourceConfig,
247-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
248-
use mysql_async::Pool;
249-
250160
let (user, password) = auth::extract_mysql_auth(&datasource.auth)?;
251161
let url = connection_string(user, password, datasource);
252162

253163
// Use optimized pool settings for better performance
254-
let _pool_constraints = mysql_async::PoolConstraints::new(2, 10).unwrap();
164+
let _pool_constraints = mysql_async::PoolConstraints::new(2, 10)
165+
.ok_or_else(|| "Failure launching the MySQL pool")?;
255166

256167
let mysql_connection = Pool::from_url(url)?;
257168

0 commit comments

Comments
 (0)