Skip to content

Commit b84a4e5

Browse files
feat: refactor event processing / auth middleware
1 parent 5b48ea4 commit b84a4e5

17 files changed

Lines changed: 152 additions & 182 deletions

File tree

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ axum-test={version="20"}
9999
[features]
100100
default=["geoip"]
101101
geoip=["dep:maxminddb"]
102-
_enable_seeding=[]
102+
__dev=[]
103103

104104
[profile.dev]
105105
opt-level=1

data/licenses-npm.json

Lines changed: 1 addition & 1 deletion
Large diffs are not rendered by default.

src/app/core/events.rs

Lines changed: 35 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
use std::sync::Arc;
22

3-
use anyhow::{Result, bail};
3+
use anyhow::{Context, Result};
44
use arc_swap::ArcSwap;
55
use chrono::{DateTime, Utc};
66
use rand::distr::{SampleString, StandardUniform};
7-
use std::sync::mpsc::Receiver;
7+
use tokio::sync::mpsc::Receiver;
88

99
use crate::app::models::{Event, event_params};
10-
use crate::app::{DuckDBPool, EVENT_BATCH_INTERVAL, SqlitePool};
10+
use crate::app::{DuckDBPool, SqlitePool};
1111

1212
#[derive(Clone)]
1313
pub struct LiwanEvents {
@@ -48,49 +48,50 @@ impl LiwanEvents {
4848
/// Append events in batch
4949
pub fn append(&self, events: impl Iterator<Item = Event>) -> Result<()> {
5050
let conn = self.duckdb.get()?;
51-
let mut appender = conn.appender("events")?;
51+
let mut appender = conn.appender("events").context("Failed to get DuckDB appender")?;
5252
let mut first_event_time = Utc::now();
5353
for event in events {
54-
appender.append_row(event_params![event])?;
55-
if first_event_time > event.created_at {
54+
if event.created_at < first_event_time {
5655
first_event_time = event.created_at;
5756
}
57+
appender.append_row(event_params![event]).context("Failed to append event to DuckDB")?;
5858
}
59-
appender.flush()?;
60-
update_event_times(&conn, first_event_time)?;
59+
appender.flush().context("Failed to flush events to DuckDB")?;
60+
update_event_times(&conn, first_event_time).context("Failed to update event times in DuckDB")?;
6161
Ok(())
6262
}
6363

6464
/// Start processing events from the given channel. Blocks until the channel is closed.
65-
pub fn process(&self, events: Receiver<Event>) -> Result<()> {
66-
let conn = self.duckdb.get()?;
65+
pub async fn process(&self, mut events: Receiver<Event>) -> Result<()> {
66+
let mut buffer = Vec::with_capacity(1024);
6767

6868
loop {
69-
match events.recv() {
70-
Ok(event) => {
71-
let mut appender = conn.appender("events")?;
72-
let mut first_event_time = event.created_at;
73-
appender.append_row(event_params![event])?;
74-
75-
// Non-blockingly drain the remaining events in the queue if there are any
76-
let mut count = 1;
77-
for event in events.try_iter() {
78-
appender.append_row(event_params![event])?;
79-
count += 1;
80-
81-
if first_event_time > event.created_at {
82-
first_event_time = event.created_at;
83-
}
84-
}
85-
86-
appender.flush()?;
87-
update_event_times(&conn, first_event_time)?;
88-
tracing::debug!("Processed {} events", count);
89-
90-
// Sleep to allow more events to be received before the next batch
91-
std::thread::sleep(EVENT_BATCH_INTERVAL);
69+
let count = events.recv_many(&mut buffer, 1024).await;
70+
71+
if count == 0 {
72+
tracing::info!("Event channel closed, stopping event processing");
73+
break Ok(());
74+
}
75+
76+
let first_event_time = buffer.first().map(|e| e.created_at).unwrap_or_else(Utc::now);
77+
let events = std::mem::take(&mut buffer).into_iter();
78+
79+
let conn = self.duckdb.clone();
80+
let res = tokio::task::spawn_blocking(move || {
81+
let conn = conn.get().context("Failed to get DuckDB connection")?;
82+
let mut appender = conn.appender("events").context("Failed to get DuckDB appender")?;
83+
for event in events {
84+
appender.append_row(event_params![event]).context("Failed to append event to DuckDB")?;
9285
}
93-
Err(_) => bail!("event channel closed"),
86+
appender.flush().context("Failed to flush events to DuckDB")?;
87+
update_event_times(&conn, first_event_time)?;
88+
anyhow::Ok(())
89+
})
90+
.await?;
91+
92+
match res {
93+
Err(err) => tracing::error!("Event processing task panicked: {:?}", err),
94+
_ => tracing::debug!("Processed {} events", count),
9495
}
9596
}
9697
}

src/app/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ mod embedded {
3838
pub(super) mod events { refinery::embed_migrations!("src/migrations/events"); }
3939
}
4040

41-
const EVENT_BATCH_INTERVAL: std::time::Duration = std::time::Duration::from_secs(5);
42-
4341
impl Liwan {
4442
pub fn try_new(config: Config) -> Result<Arc<Self>> {
4543
tracing::debug!("Initializing app");
@@ -114,7 +112,7 @@ impl Liwan {
114112
}
115113
}
116114

117-
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
115+
#[cfg(any(debug_assertions, test, feature = "__dev"))]
118116
impl Liwan {
119117
pub fn seed_database(&self, count_per_entity: usize) -> Result<()> {
120118
use chrono::{Days, Utc};

src/app/models.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ pub struct Project {
3232
pub id: String,
3333
pub display_name: String,
3434
pub public: bool,
35-
pub secret: Option<String>, // enable public access with password protection
35+
pub secret: Option<String>, // currently unused
3636
}
3737

3838
#[derive(Debug, Clone)]

src/cli.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,11 @@ pub enum Command {
3131
UpdatePassword(UpdatePassword),
3232
AddUser(AddUser),
3333
Users(ListUsers),
34-
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
34+
#[cfg(any(debug_assertions, test, feature = "__dev"))]
3535
SeedDatabase(SeedDatabase),
3636
}
3737

38-
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
38+
#[cfg(any(debug_assertions, test, feature = "__dev"))]
3939
#[derive(FromArgs)]
4040
#[argh(subcommand, name = "seed-database")]
4141
/// Seed the database with some test data
@@ -129,10 +129,10 @@ pub fn handle_command(mut config: Config, cmd: Command) -> Result<()> {
129129
std::fs::write(&output, DEFAULT_CONFIG)?;
130130
println!("Configuration file written to liwan.config.toml");
131131
}
132-
#[cfg(any(debug_assertions, test, feature = "_enable_seeding"))]
132+
#[cfg(any(debug_assertions, test, feature = "__dev"))]
133133
Command::SeedDatabase(_) => {
134134
let app = Liwan::try_new(config)?;
135-
app.seed_database(1_000_000)?;
135+
app.seed_database(10_000_000)?;
136136
println!("Database seeded with test data");
137137
}
138138
}

src/main.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ async fn main() -> Result<()> {
1616
setup_logger(args.log_level)?;
1717

1818
let config = Config::load(args.config)?;
19-
let (s, r) = std::sync::mpsc::channel::<Event>();
19+
let (s, r) = tokio::sync::mpsc::channel::<Event>(1024 * 10);
2020

2121
if let Some(cmd) = args.cmd {
2222
return cli::handle_command(config, cmd);
@@ -30,7 +30,7 @@ async fn main() -> Result<()> {
3030
biased;
3131
_ = liwan::utils::signals::shutdown() => app_copy.shutdown(),
3232
res = web::start_webserver(app.clone(), s) => res,
33-
res = tokio::task::spawn_blocking(move || app.clone().events.process(r)) => res?
33+
res = tokio::task::spawn(async move { app.events.process(r).await }) => res?,
3434
}
3535
}
3636

src/utils/validate.rs

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use crate::{
2-
app::models::{Project, UserRole},
3-
web::SessionUser,
4-
};
5-
1+
use crate::app::models::{Project, User, UserRole};
62
pub const MAX_DATAPOINTS: u32 = 2000;
73

84
pub fn is_valid_id(id: &str) -> bool {
@@ -13,8 +9,8 @@ pub fn is_valid_username(name: &str) -> bool {
139
name.chars().all(|c| c.is_alphanumeric() || c == '-' || c == '_') && name.len() <= 64 && name.len() >= 3
1410
}
1511

16-
pub fn can_access_project(project: &Project, user: Option<&SessionUser>) -> bool {
17-
project.public || user.is_some_and(|u| u.0.role == UserRole::Admin || u.0.projects.contains(&project.id))
12+
pub fn can_access_project(project: &Project, user: Option<&User>) -> bool {
13+
project.public || user.is_some_and(|u| u.role == UserRole::Admin || u.projects.contains(&project.id))
1814
}
1915

2016
#[cfg(test)]
@@ -32,11 +28,7 @@ mod tests {
3228
};
3329
assert!(can_access_project(&project, None), "Public project should be accessible without a user.");
3430

35-
let user = SessionUser(User {
36-
username: "test".to_string(),
37-
role: UserRole::User,
38-
projects: vec!["other".to_string()],
39-
});
31+
let user = User { username: "test".to_string(), role: UserRole::User, projects: vec!["other".to_string()] };
4032
assert!(can_access_project(&project, Some(&user)), "Public project should be accessible with any user.");
4133

4234
let project = Project {
@@ -45,7 +37,7 @@ mod tests {
4537
secret: None,
4638
public: false,
4739
};
48-
let admin_user = SessionUser(User { username: "admin".to_string(), role: UserRole::Admin, projects: vec![] });
40+
let admin_user = User { username: "admin".to_string(), role: UserRole::Admin, projects: vec![] };
4941
assert!(can_access_project(&project, Some(&admin_user)), "Admin should have access to any project.");
5042

5143
let project = Project {

src/web/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@ pub mod webext;
44

55
use std::net::{SocketAddr, ToSocketAddrs};
66
use std::ops::Deref;
7-
use std::sync::{Arc, mpsc::Sender};
7+
use std::sync::Arc;
88

99
use anyhow::{Context, Result};
1010
use axum::handler::{Handler, HandlerWithoutStateExt};
1111
use rust_embed::RustEmbed;
1212

1313
use aide::{axum::ApiRouter, openapi};
1414
use http::{HeaderValue, Method, header};
15+
use tokio::sync::mpsc::Sender;
1516
use tower_http::{
1617
compression::CompressionLayer,
1718
cors::{Any, CorsLayer},
@@ -21,7 +22,7 @@ use tower_http::{
2122
use crate::app::{Liwan, models::Event};
2223
use crate::web::webext::serve;
2324

24-
pub use session::{MaybeExtract, SessionId, SessionUser};
25+
pub use session::MaybeSessionId;
2526
use webext::StaticFile;
2627

2728
#[derive(RustEmbed, Clone)]

src/web/routes/admin.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,8 @@ use crate::{
1414
app::models::{Entity, Project, UserRole},
1515
utils::validate::can_access_project,
1616
web::{
17-
MaybeExtract, RouterState, SessionUser,
17+
RouterState,
18+
session::{Auth, MaybeAuth},
1819
webext::{ApiResult, AxumErrExt, empty_response, http_bail},
1920
},
2021
};
@@ -152,7 +153,7 @@ struct EntitiesResponse {
152153

153154
async fn get_users(
154155
app: State<RouterState>,
155-
SessionUser(user): SessionUser,
156+
Auth(user): Auth,
156157
) -> ApiResult<UseApi<impl IntoApiResponse, Json<UsersResponse>>> {
157158
if user.role != UserRole::Admin {
158159
http_bail!(StatusCode::FORBIDDEN, "Forbidden")
@@ -172,7 +173,7 @@ async fn get_users(
172173
async fn update_user(
173174
app: State<RouterState>,
174175
Path(username): Path<String>,
175-
SessionUser(session_user): SessionUser,
176+
Auth(session_user): Auth,
176177
user: Json<UpdateUserRequest>,
177178
) -> ApiResult<impl IntoApiResponse> {
178179
if session_user.role != UserRole::Admin {
@@ -193,7 +194,7 @@ async fn update_user(
193194
async fn update_user_password(
194195
app: State<RouterState>,
195196
Path(username): Path<String>,
196-
SessionUser(session_user): SessionUser,
197+
Auth(session_user): Auth,
197198
password: Json<UpdatePasswordRequest>,
198199
) -> ApiResult<impl IntoApiResponse> {
199200
if session_user.role != UserRole::Admin || username != session_user.username {
@@ -210,7 +211,7 @@ async fn update_user_password(
210211
async fn remove_user(
211212
app: State<RouterState>,
212213
Path(username): Path<String>,
213-
SessionUser(session_user): SessionUser,
214+
Auth(session_user): Auth,
214215
) -> ApiResult<impl IntoApiResponse> {
215216
if session_user.role != UserRole::Admin {
216217
http_bail!(StatusCode::FORBIDDEN, "Forbidden")
@@ -227,7 +228,7 @@ async fn remove_user(
227228

228229
async fn create_user(
229230
app: State<RouterState>,
230-
SessionUser(session_user): SessionUser,
231+
Auth(session_user): Auth,
231232
user: Json<CreateUserRequest>,
232233
) -> ApiResult<impl IntoApiResponse> {
233234
if session_user.role != UserRole::Admin {
@@ -246,7 +247,7 @@ async fn create_user(
246247
async fn project_create_handler(
247248
app: State<RouterState>,
248249
Path(project_id): Path<String>,
249-
SessionUser(user): SessionUser,
250+
Auth(user): Auth,
250251
Json(project): Json<CreateProjectRequest>,
251252
) -> ApiResult<impl IntoApiResponse> {
252253
if user.role != UserRole::Admin {
@@ -271,7 +272,7 @@ async fn project_create_handler(
271272
async fn project_update_handler(
272273
app: State<RouterState>,
273274
Path(project_id): Path<String>,
274-
SessionUser(user): SessionUser,
275+
Auth(user): Auth,
275276
Json(req): Json<UpdateProjectRequest>,
276277
) -> ApiResult<impl IntoApiResponse> {
277278
if user.role != UserRole::Admin {
@@ -300,7 +301,7 @@ async fn project_update_handler(
300301

301302
async fn projects_handler(
302303
app: State<RouterState>,
303-
MaybeExtract(user): MaybeExtract<SessionUser>,
304+
MaybeAuth(user): MaybeAuth,
304305
) -> ApiResult<UseApi<impl IntoApiResponse, Json<ProjectsResponse>>> {
305306
let projects = app.projects.all().http_err("Failed to get projects", StatusCode::INTERNAL_SERVER_ERROR)?;
306307
let projects: Vec<Project> = projects.into_iter().filter(|p| can_access_project(p, user.as_ref())).collect();
@@ -326,7 +327,7 @@ async fn projects_handler(
326327

327328
async fn project_handler(
328329
app: State<RouterState>,
329-
MaybeExtract(user): MaybeExtract<SessionUser>,
330+
MaybeAuth(user): MaybeAuth,
330331
Path(project_id): Path<String>,
331332
) -> ApiResult<UseApi<impl IntoApiResponse, Json<ProjectResponse>>> {
332333
let project = app.projects.get(&project_id).http_status(StatusCode::NOT_FOUND)?;
@@ -353,7 +354,7 @@ async fn project_handler(
353354
async fn project_delete_handler(
354355
app: State<RouterState>,
355356
Path(project_id): Path<String>,
356-
SessionUser(user): SessionUser,
357+
Auth(user): Auth,
357358
) -> ApiResult<impl IntoApiResponse> {
358359
let project = app.projects.get(&project_id).http_status(StatusCode::NOT_FOUND)?;
359360
if user.role != UserRole::Admin {
@@ -366,7 +367,7 @@ async fn project_delete_handler(
366367

367368
async fn entities_handler(
368369
app: State<RouterState>,
369-
SessionUser(user): SessionUser,
370+
Auth(user): Auth,
370371
) -> ApiResult<UseApi<impl IntoApiResponse, Json<EntitiesResponse>>> {
371372
if user.role != UserRole::Admin {
372373
http_bail!(StatusCode::FORBIDDEN, "Forbidden")
@@ -398,7 +399,7 @@ async fn entities_handler(
398399

399400
async fn entity_create_handler(
400401
app: State<RouterState>,
401-
SessionUser(user): SessionUser,
402+
Auth(user): Auth,
402403
Json(entity): Json<CreateEntityRequest>,
403404
) -> ApiResult<Json<EntityResponse>> {
404405
if user.role != UserRole::Admin {
@@ -418,7 +419,7 @@ async fn entity_create_handler(
418419
async fn entity_update_handler(
419420
app: State<RouterState>,
420421
Path(entity_id): Path<String>,
421-
SessionUser(user): SessionUser,
422+
Auth(user): Auth,
422423
Json(entity): Json<UpdateEntityRequest>,
423424
) -> ApiResult<impl IntoApiResponse> {
424425
if user.role != UserRole::Admin {
@@ -443,7 +444,7 @@ async fn entity_update_handler(
443444
async fn entity_delete_handler(
444445
app: State<RouterState>,
445446
Path(entity_id): Path<String>,
446-
SessionUser(user): SessionUser,
447+
Auth(user): Auth,
447448
) -> ApiResult<impl IntoApiResponse> {
448449
if user.role != UserRole::Admin {
449450
http_bail!(StatusCode::FORBIDDEN, "Forbidden")

0 commit comments

Comments
 (0)