Skip to content

Commit daa4d84

Browse files
committed
feat(crawler)!: rewrite complete crawl system
1 parent 06436b4 commit daa4d84

2 files changed

Lines changed: 133 additions & 92 deletions

File tree

src/scanning/crawler.rs

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
use std::net::Ipv4Addr;
2+
use std::time::{Duration, Instant};
3+
use colored_text::Colorize;
4+
use futures::{stream, StreamExt};
5+
use crate::database::{parse_server, ServerHistory, ServerInfo};
6+
use crate::config::MainConfig;
7+
use crate::logger;
8+
use crate::logger::DefaultColor;
9+
use crate::manager::TaskManager;
10+
use crate::randomizer::IpGenerator;
11+
use crate::scanning::scanner::{scan, ScanConfig};
12+
use crate::scanning::utils::{format_time, prettier_ping_result, save_server};
13+
14+
pub async fn crawl(gen_config: IpGenerator) {
15+
let _ = TaskManager::spawn("Crawler", move |_cancel_token| async move {
16+
loop {
17+
let start_time = Instant::now();
18+
19+
let ports = vec![25565, 25566]; // Example for later implementation
20+
let targets: Vec<(Ipv4Addr, u16)> = gen_config.generate()
21+
.flat_map(|ip| {
22+
stream::iter(ports.clone().into_iter().map(move |port| (ip, port)))
23+
})
24+
.collect()
25+
.await;
26+
27+
let total_targets = targets.len();
28+
let mut found_batch: Vec<(ServerInfo, ServerHistory)> = Vec::new();
29+
30+
let mut total_found_count = 0;
31+
let mut processed_count = 0;
32+
33+
logger::info(format!(
34+
"Scanning {} targets...",
35+
targets.len().hex(DefaultColor::Highlight.hex())
36+
)).prefix("Crawler").send().await;
37+
38+
let main_cfg = MainConfig::get().expect("Config not loaded!");
39+
40+
let config = ScanConfig {
41+
ping_timeout: Duration::from_millis(main_cfg.general.ping_timeout),
42+
query_timeout: Duration::from_millis(main_cfg.general.query_timeout),
43+
join_timeout: Duration::from_millis(main_cfg.general.join_timeout),
44+
with_uuid: main_cfg.general.do_uuid_fetch,
45+
max_tasks: main_cfg.get_crawler_tasks(),
46+
..ScanConfig::default()
47+
};
48+
49+
// Core part: scanning
50+
let scan_stream = scan(targets, config);
51+
tokio::pin!(scan_stream);
52+
53+
// Scan stream
54+
while let Some(maybe_result) = scan_stream.next().await {
55+
processed_count += 1;
56+
57+
// Success
58+
if let Some(result) = maybe_result {
59+
let parsed = parse_server(result.ip, result.port, result.ping.clone(), result.query, result.join);
60+
found_batch.push(parsed);
61+
total_found_count += 1;
62+
63+
let mut output = String::new();
64+
output.push_str(
65+
&format!(
66+
"Found server: {}:{}\n",
67+
result.ip.to_string().hex(DefaultColor::Highlight.hex()),
68+
result.port.hex(DefaultColor::Highlight.hex())
69+
)
70+
);
71+
output.push_str(&prettier_ping_result(result.ping).await);
72+
logger::success(output).prefix("Crawler").send().await;
73+
}
74+
75+
// Database insert
76+
if processed_count % 10000 == 0 {
77+
let batch_to_insert = std::mem::take(&mut found_batch);
78+
save_server(&batch_to_insert).await;
79+
}
80+
81+
// Progress calc
82+
let elapsed = start_time.elapsed().as_secs_f64();
83+
let ips_per_second = processed_count as f64 / elapsed;
84+
85+
if ips_per_second > 0.0 {
86+
let remaining_ips = total_targets.saturating_sub(processed_count);
87+
let remaining_secs = remaining_ips as f64 / ips_per_second;
88+
let percent = format!("{:.2}", (processed_count as f64 / total_targets as f64) * 100.0);
89+
90+
if processed_count % 10000 == 0 || processed_count == total_targets {
91+
logger::info(format!(
92+
"Progress: {}/{} IPs ({}%) - ETA: {}",
93+
processed_count.hex(DefaultColor::Highlight.hex()),
94+
total_targets.hex(DefaultColor::Highlight.hex()),
95+
percent.hex(DefaultColor::Highlight.hex()),
96+
format_time(remaining_secs as u64)
97+
)).prefix("Crawler").send().await;
98+
}
99+
}
100+
}
101+
102+
if !found_batch.is_empty() {
103+
save_server(&found_batch).await;
104+
}
105+
106+
// Finished
107+
let elapsed_time = start_time.elapsed();
108+
109+
let pps = if elapsed_time.as_secs() > 0 {
110+
total_targets as f64 / elapsed_time.as_secs_f64()
111+
} else {
112+
0.0
113+
};
114+
let percent = format!("{:.2}", (processed_count as f64 / total_targets as f64) * 100.0);
115+
116+
logger::info(
117+
format!(
118+
"Crawl iteration finished in {}. Found {} servers from {} targets. That is {}% ({}{})",
119+
format_time(elapsed_time.as_secs()).hex(DefaultColor::Highlight.hex()),
120+
total_found_count.hex(DefaultColor::Highlight.hex()),
121+
total_targets.hex(DefaultColor::Highlight.hex()),
122+
percent.hex(DefaultColor::Highlight.hex()),
123+
pps.round().hex(DefaultColor::Highlight.hex()),
124+
"pps".hex(DefaultColor::DarkHighlight.hex())
125+
)
126+
).send().await;
127+
}
128+
}).await;
129+
}

src/tasks.rs

Lines changed: 4 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
use std::net::Ipv4Addr;
2+
use std::str::FromStr;
23
use std::sync::{Arc, OnceLock};
34
use std::time::Duration;
45
use colored_text::Colorize;
56
use futures::StreamExt;
67
use tokio::sync::Semaphore;
7-
use tokio::task::JoinSet;
88
use crate::logger;
99
use crate::logger::DefaultColor;
1010
use crate::manager::TaskManager;
1111
use crate::minecraft::join::execute_join_check;
1212
use crate::minecraft::ping::execute_ping;
1313
use crate::minecraft::query::execute_query;
14-
use crate::randomizer::{IpGenerator, IpType};
1514
use crate::scanning::utils::{prettier_ping_result, prettier_query_result};
1615

1716
static NETWORK_SEMAPHORE: OnceLock<Arc<Semaphore>> = OnceLock::new();
@@ -34,101 +33,14 @@ pub async fn init_networking(max_tasks: usize) {
3433
}
3534
}
3635

37-
pub async fn crawl(cidr: Option<(Ipv4Addr, u8)>, max_tasks: u32, ip_count: u32) {
38-
TaskManager::spawn("Crawler", move |cancel_token| async move {
39-
logger::info("Started crawler...".to_string())
40-
.prefix("Crawler")
41-
.send()
42-
.await;
43-
44-
let mut iteration = 1;
45-
46-
loop {
47-
if cancel_token.is_cancelled() { break; }
48-
49-
let mut builder = IpGenerator::builder()
50-
.ip_type(IpType::PublicOnly)
51-
.count(ip_count);
52-
53-
if let Some((ip, prefix)) = cidr {
54-
builder = builder.cidr(ip, prefix);
55-
logger::info(
56-
format!(
57-
"Crawling CIDR {}/{} (Run #{})",
58-
ip.hex(DefaultColor::Highlight.hex()),
59-
prefix.hex(DefaultColor::Highlight.hex()),
60-
iteration.hex(DefaultColor::Highlight.hex())
61-
)
62-
).prefix("Crawler").send().await;
63-
} else {
64-
logger::info(
65-
format!(
66-
"Crawling random IPs (Run #{})",
67-
iteration.hex(DefaultColor::Highlight.hex())
68-
)
69-
).prefix("Crawler").send().await;
70-
}
71-
72-
let mut ip_stream = builder.build().generate();
73-
let mut set = JoinSet::new();
74-
75-
while let Some(ip) = ip_stream.next().await {
76-
if cancel_token.is_cancelled() {
77-
break;
78-
}
79-
80-
while set.len() >= max_tasks as usize {
81-
set.join_next().await;
82-
}
83-
84-
let port = 25565;
85-
let c_token = cancel_token.clone();
86-
let ip_str = ip.to_string();
87-
88-
set.spawn(async move {
89-
if c_token.is_cancelled() { return; }
90-
91-
match execute_ping(ip_str.clone(), port, 767, Duration::from_secs(3)).await {
92-
Ok(result) => {
93-
let mut output = String::new();
94-
output.push_str(
95-
&format!(
96-
"Found server: {}:{}\n",
97-
ip_str.hex(DefaultColor::Highlight.hex()),
98-
port.hex(DefaultColor::Highlight.hex())
99-
)
100-
);
101-
output.push_str(&prettier_ping_result(result).await);
102-
logger::success(output).prefix("Crawler").send().await;
103-
}
104-
Err(_) => {
105-
}
106-
}
107-
});
108-
}
109-
110-
while let Some(_) = set.join_next().await {}
111-
112-
if cancel_token.is_cancelled() {
113-
logger::info("Shutting down crawler.".to_string())
114-
.prefix("Crawler").send().await;
115-
return;
116-
}
117-
118-
iteration += 1;
119-
tokio::time::sleep(Duration::from_secs(2)).await;
120-
}
121-
}).await;
122-
}
123-
12436
// Testing
12537
pub async fn run_ping(target: String) {
12638
TaskManager::spawn("Ping", move |_cancel_token| async move {
12739
logger::info(format!("Starting Ping for {}", target.clone().hex("#00BFFF")))
12840
.send().await;
12941

13042
let parts: Vec<&str> = target.split(':').collect();
131-
let ip = parts[0].to_string();
43+
let ip = Ipv4Addr::from_str(parts[0]).unwrap(); // TODO
13244
let port = parts.get(1).and_then(|p| p.parse::<u16>().ok()).unwrap_or(25565);
13345

13446
match execute_ping(ip, port, 767, Duration::from_secs(5)).await {
@@ -155,7 +67,7 @@ pub async fn run_query(target: String) {
15567
.prefix("Query").send().await;
15668

15769
let parts: Vec<&str> = target.split(':').collect();
158-
let ip = parts[0];
70+
let ip = Ipv4Addr::from_str(parts[0]).unwrap(); // TODO
15971
let port = parts.get(1).and_then(|p| p.parse::<u16>().ok()).unwrap_or(25565);
16072

16173
match execute_query(ip, port, Duration::from_secs(5), true).await {
@@ -181,7 +93,7 @@ pub async fn run_join(target: String) {
18193
.prefix("Join").send().await;
18294

18395
let parts = target.split(':').collect::<Vec<&str>>();
184-
let ip = parts[0].to_string();
96+
let ip = Ipv4Addr::from_str(parts[0]).unwrap(); // TODO
18597
let port = parts.get(1).and_then(|p| p.parse::<u16>().ok()).unwrap_or(25565);
18698

18799
let username = "ServerRawler";

0 commit comments

Comments
 (0)