Skip to content

Commit 6809c75

Browse files
committed
feat(wip)!: Unifying the pool within the database connection type
1 parent a22abb9 commit 6809c75

3 files changed

Lines changed: 44 additions & 120 deletions

File tree

canyon_core/src/canyon.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,6 @@ mod __impl {
225225
use std::error::Error;
226226
use std::path::PathBuf;
227227
use walkdir::WalkDir;
228-
use crate::connection::pool::CanyonConnection;
229228

230229
// Internal helper to locate the config file
231230
pub(crate) fn find_config_path() -> Result<PathBuf, std::io::Error> {
@@ -251,8 +250,8 @@ mod __impl {
251250

252251
pub(crate) async fn process_new_conn_by_datasource<'a>(
253252
ds: &DatasourceConfig,
254-
connections: &mut HashMap<&str, CanyonConnection<'a>>,
255-
default: &mut Option<CanyonConnection<'a>>,
253+
connections: &mut HashMap<&str, DatabaseConnection>,
254+
default: &mut Option<DatabaseConnection>,
256255
default_db_type: &mut Option<DatabaseType>,
257256
) -> Result<(), Box<dyn Error + Send + Sync>> {
258257
if default.is_none() {

canyon_core/src/connection/db_connector.rs

Lines changed: 42 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,22 @@ use crate::connection::clients::mssql::SqlServerConnection;
44
use crate::connection::clients::mysql::MysqlConnection;
55
#[cfg(feature = "postgres")]
66
use crate::connection::clients::postgresql::PostgreSqlConnection;
7+
78
use crate::connection::database_type::DatabaseType;
89
use crate::connection::datasources::DatasourceConfig;
910
use std::error::Error;
11+
use std::sync::Arc;
12+
use bb8::PooledConnection;
13+
use bb8_postgres::PostgresConnectionManager;
14+
use tokio_postgres::NoTls;
15+
use mysql_async::{self, Pool as MySqlPool};
16+
use bb8_tiberius::ConnectionManager as TiberiusConnectionManager;
17+
18+
type PgManager = PostgresConnectionManager<NoTls>;
19+
pub(crate) type PgPooled<'a> = PooledConnection<'a, PgManager>;
20+
21+
type MsManager = TiberiusConnectionManager;
22+
pub(crate) type MsPooled<'a> = PooledConnection<'a, MsManager>;
1023

1124
/// The Canyon database connection handler. When the client's program
1225
/// starts, Canyon gets the information about the desired datasources,
@@ -15,11 +28,11 @@ use std::error::Error;
1528
pub enum DatabaseConnection {
1629
// NOTE: is this a Datasource instead of a connection?
1730
#[cfg(feature = "postgres")]
18-
Postgres(PostgreSqlConnection), // NOTE: *Connection means *Client?
31+
Postgres(Arc<bb8::Pool<PgManager>>),
1932
#[cfg(feature = "mssql")]
20-
SqlServer(SqlServerConnection),
33+
SqlServer(Arc<bb8::Pool<MsManager>>),
2134
#[cfg(feature = "mysql")]
22-
MySQL(MysqlConnection),
35+
MySQL(mysql_async::Pool),
2336
}
2437

2538
unsafe impl Send for DatabaseConnection {}
@@ -28,21 +41,21 @@ unsafe impl Sync for DatabaseConnection {}
2841
impl DatabaseConnection {
2942
pub async fn new(
3043
datasource: &DatasourceConfig,
31-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
44+
) -> Result<Self, Box<dyn Error + Send + Sync>> {
3245
// Add connection pooling at the client level for better performance
3346
match datasource.get_db_type() {
3447
#[cfg(feature = "postgres")]
3548
DatabaseType::PostgreSql => {
36-
connection_helpers::create_postgres_connection(datasource).await
49+
Ok(Self::Postgres(Arc::from(connection_helpers::create_postgres_connection(datasource).await?)))
3750
}
3851

3952
#[cfg(feature = "mssql")]
4053
DatabaseType::SqlServer => {
41-
connection_helpers::create_sqlserver_connection(datasource).await
54+
Ok(Self::SqlServer(Arc::from(connection_helpers::create_sqlserver_connection(datasource).await?)))
4255
}
4356

4457
#[cfg(feature = "mysql")]
45-
DatabaseType::MySQL => connection_helpers::create_mysql_connection(datasource).await,
58+
DatabaseType::MySQL => Ok(Self::MySQL(connection_helpers::create_mysql_connection(datasource).await?)),
4659
}
4760
}
4861

@@ -57,6 +70,7 @@ impl DatabaseConnection {
5770
}
5871
}
5972

73+
/*
6074
#[cfg(feature = "postgres")]
6175
pub fn postgres_connection(&self) -> &PostgreSqlConnection {
6276
match self {
@@ -83,15 +97,17 @@ impl DatabaseConnection {
8397
_ => panic!(),
8498
}
8599
}
100+
*/
86101
}
87102

88103
mod connection_helpers {
104+
use bb8::Pool;
89105
use super::*;
90106

91107
#[cfg(feature = "postgres")]
92-
pub async fn create_postgres_connection(
108+
pub(crate) async fn create_postgres_connection(
93109
datasource: &DatasourceConfig,
94-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
110+
) -> Result<Pool<PgManager>, Box<dyn Error + Send + Sync>> {
95111
let (user, password) = auth::extract_postgres_auth(&datasource.auth)?;
96112

97113
// Use optimized connection settings
@@ -118,15 +134,17 @@ mod connection_helpers {
118134
}
119135
});
120136

121-
Ok(DatabaseConnection::Postgres(PostgreSqlConnection {
122-
client,
123-
}))
137+
let manager = PostgresConnectionManager::new(config, NoTls);
138+
bb8::Pool::builder()
139+
.max_size(10u32)
140+
.build(manager).await
141+
.map_err(|err| Box::new(err) as Box<dyn Error + Send + Sync>)
124142
}
125143

126144
#[cfg(feature = "mssql")]
127-
pub async fn create_sqlserver_connection(
145+
pub(crate) async fn create_sqlserver_connection(
128146
datasource: &DatasourceConfig,
129-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
147+
) -> Result<Pool<bb8_tiberius::ConnectionManager>, Box<dyn Error + Send + Sync>> {
130148
use async_std::net::TcpStream;
131149
let mut tiberius_config = tiberius::Config::new();
132150

@@ -144,31 +162,28 @@ mod connection_helpers {
144162
let tcp = TcpStream::connect(tiberius_config.get_addr()).await?;
145163
tcp.set_nodelay(true)?;
146164

147-
let client = tiberius::Client::connect(tiberius_config, tcp).await?;
148-
149-
Ok(DatabaseConnection::SqlServer(SqlServerConnection {
150-
client: Box::leak(Box::new(client)),
151-
}))
165+
let manager = TiberiusConnectionManager::new(tiberius_config);
166+
bb8::Pool::builder().max_size(10u32).build(manager).await
167+
.map_err(|err| Box::new(err) as Box<dyn Error + Send + Sync>)
152168
}
153169

154170
#[cfg(feature = "mysql")]
155-
pub async fn create_mysql_connection(
171+
pub(crate) async fn create_mysql_connection(
156172
datasource: &DatasourceConfig,
157-
) -> Result<DatabaseConnection, Box<dyn Error + Send + Sync>> {
173+
) -> Result<MySqlPool, Box<dyn Error + Send + Sync>> {
158174
use mysql_async::Pool;
159175

160176
let (user, password) = auth::extract_mysql_auth(&datasource.auth)?;
161177
let url = connection_string(user, password, datasource);
162178

163179
// Use optimized pool settings for better performance
164-
let _pool_constraints = mysql_async::PoolConstraints::new(2, 10)
180+
let pool_constraints = mysql_async::PoolConstraints::new(2, 10)
165181
.ok_or_else(|| "Failure launching the MySQL pool")?;
166182

167-
let mysql_connection = Pool::from_url(url)?;
168-
169-
Ok(DatabaseConnection::MySQL(MysqlConnection {
170-
client: mysql_connection,
171-
}))
183+
let mysql_opts = mysql_async::Opts::from_url(&url)?;
184+
let mysql_opts_builder = mysql_async::OptsBuilder::from_opts(mysql_opts)
185+
.pool_opts(mysql_async::PoolOpts::default().with_constraints(pool_constraints));
186+
Ok(MySqlPool::new(mysql_opts_builder))
172187
}
173188

174189
// #[cfg(any(feature = "postgres", feature = "mysql"))]

canyon_core/src/connection/pool.rs

Lines changed: 0 additions & 90 deletions
This file was deleted.

0 commit comments

Comments
 (0)