diff options
Diffstat (limited to 'src/lookup.rs')
| -rw-r--r-- | src/lookup.rs | 860 |
1 files changed, 860 insertions, 0 deletions
diff --git a/src/lookup.rs b/src/lookup.rs new file mode 100644 index 0000000..f5b3177 --- /dev/null +++ b/src/lookup.rs @@ -0,0 +1,860 @@ +use crate::tlds::WhoisOverrides; +use crate::types::{DomainResult, DomainStatus, ErrorKind}; +use futures::stream::{self, StreamExt}; +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::Arc; +use std::time::Duration; + +#[cfg(feature = "builtin-whois")] +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +#[cfg(feature = "builtin-whois")] +use tokio::net::TcpStream; + +// IANA RDAP bootstrap URL, set in Cargo.toml [package.metadata.hoardom] +const RDAP_BOOTSTRAP_URL: &str = env!("HOARDOM_RDAP_BOOTSTRAP_URL"); + +// TLD -> RDAP server map, grabbed once and reused +pub struct RdapBootstrap { + tld_map: HashMap<String, String>, + raw_json: Option<String>, +} + +impl RdapBootstrap { + pub async fn fetch(client: &reqwest::Client, verbose: bool) -> Result<Self, String> { + if verbose { + eprintln!("[verbose] Fetching RDAP bootstrap from {}", RDAP_BOOTSTRAP_URL); + } + + let resp = client + .get(RDAP_BOOTSTRAP_URL) + .send() + .await + .map_err(|e| format!("Failed to fetch RDAP bootstrap: {}", e))?; + + if !resp.status().is_success() { + return Err(format!("RDAP bootstrap returned HTTP {}", resp.status())); + } + + let body = resp + .text() + .await + .map_err(|e| format!("Failed to read RDAP bootstrap body: {}", e))?; + + let json: serde_json::Value = serde_json::from_str(&body) + .map_err(|e| format!("Failed to parse RDAP bootstrap JSON: {}", e))?; + + let tld_map = Self::parse_bootstrap_json(&json); + + if verbose { + eprintln!("[verbose] RDAP bootstrap loaded, {} TLDs mapped", tld_map.len()); + } + + Ok(Self { tld_map, raw_json: Some(body) }) + } + + pub fn load_cached(cache_path: &Path, verbose: bool) -> Result<Self, String> { + if verbose { + eprintln!("[verbose] Loading cached RDAP bootstrap from {}", cache_path.display()); + } + let body = std::fs::read_to_string(cache_path) + .map_err(|e| format!("Could not read cached bootstrap: {}", e))?; + let json: serde_json::Value = serde_json::from_str(&body) + .map_err(|e| format!("Could not parse cached bootstrap: {}", e))?; + let tld_map = Self::parse_bootstrap_json(&json); + if verbose { + eprintln!("[verbose] Cached RDAP bootstrap loaded, {} TLDs mapped", tld_map.len()); + } + Ok(Self { tld_map, raw_json: Some(body) }) + } + + pub fn save_cache(&self, cache_path: &Path) -> Result<(), String> { + if let Some(ref json) = self.raw_json { + if let Some(parent) = cache_path.parent() { + std::fs::create_dir_all(parent) + .map_err(|e| format!("Failed to create cache dir: {}", e))?; + } + std::fs::write(cache_path, json) + .map_err(|e| format!("Failed to write cache file: {}", e))?; + } + Ok(()) + } + + fn parse_bootstrap_json(json: &serde_json::Value) -> HashMap<String, String> { + let mut tld_map = HashMap::new(); + // bootstrap format: { "services": [ [ ["tld1", "tld2"], ["https://rdap.server.example/"] ], ... ] } + if let Some(services) = json.get("services").and_then(|s| s.as_array()) { + for service in services { + if let Some(arr) = service.as_array() { + if arr.len() >= 2 { + let tlds = arr[0].as_array(); + let urls = arr[1].as_array(); + if let (Some(tlds), Some(urls)) = (tlds, urls) { + if let Some(base_url) = urls.first().and_then(|u| u.as_str()) { + let base = base_url.trim_end_matches('/').to_string(); + for tld in tlds { + if let Some(t) = tld.as_str() { + tld_map.insert(t.to_lowercase(), base.clone()); + } + } + } + } + } + } + } + } + tld_map + } + + pub fn get_server(&self, tld: &str) -> Option<&String> { + self.tld_map.get(&tld.to_lowercase()) + } +} + +pub async fn lookup_domain( + client: &reqwest::Client, + bootstrap: &RdapBootstrap, + whois_overrides: &WhoisOverrides, + name: &str, + tld: &str, + verbose: bool, +) -> DomainResult { + let full = format!("{}.{}", name, tld); + + let base_url = match bootstrap.get_server(tld) { + Some(url) => url.clone(), + None => { + // no RDAP server for this TLD, fall back to WHOIS + if verbose { + eprintln!("[verbose] No RDAP server for {}, falling back to WHOIS", tld); + } + return whois_lookup(whois_overrides, name, tld, verbose).await; + } + }; + + let url = format!("{}/domain/{}", base_url, full); + + if verbose { + eprintln!("[verbose] Looking up: {}", url); + } + + let resp = match client.get(&url).send().await { + Ok(r) => r, + Err(e) => { + if verbose { + eprintln!("[verbose] Request error for {}: {}", full, e); + } + let kind = if e.is_timeout() { + ErrorKind::Timeout + } else { + ErrorKind::Unknown + }; + return DomainResult::new(name, tld, DomainStatus::Error { + kind, + message: "unknown error".to_string(), + }); + } + }; + + let status_code = resp.status(); + + if verbose { + eprintln!("[verbose] {} -> HTTP {}", full, status_code); + } + + // 404 = not found in RDAP = domain is available (not registered) + if status_code == 404 { + return DomainResult::new(name, tld, DomainStatus::Available); + } + + // 400 = probably invalid query + if status_code == 400 { + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::InvalidTld, + message: "invalid tld".to_string(), + }); + } + + // 429 = rate limited + if status_code == 429 { + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::RateLimit, + message: "rate limited".to_string(), + }); + } + + // 403 = forbidden (some registries block queries) + if status_code == 403 { + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Forbidden, + message: "forbidden".to_string(), + }); + } + + // anything else thats not success + if !status_code.is_success() { + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Unknown, + message: format!("HTTP {}", status_code), + }); + } + + // 200 = domain exists, try to parse expiry from RDAP json + let expiry = match resp.json::<serde_json::Value>().await { + Ok(json) => extract_expiry(&json), + Err(_) => None, + }; + + DomainResult::new(name, tld, DomainStatus::Registered { expiry }) +} + +fn extract_expiry(json: &serde_json::Value) -> Option<String> { + // RDAP stores events as an array, expiration is eventAction = "expiration" + if let Some(events) = json.get("events").and_then(|e| e.as_array()) { + for event in events { + if let Some(action) = event.get("eventAction").and_then(|a| a.as_str()) { + if action == "expiration" { + if let Some(date) = event.get("eventDate").and_then(|d| d.as_str()) { + // RDAP dates are ISO 8601, just grab the date part + return Some(date.chars().take(10).collect()); + } + } + } + } + } + None +} + +// ---- WHOIS fallback for TLDs not in RDAP bootstrap ---- + +// -- No whois feature: just return an error -- +#[cfg(not(any(feature = "system-whois", feature = "builtin-whois")))] +async fn whois_lookup(_whois_overrides: &WhoisOverrides, name: &str, tld: &str, _verbose: bool) -> DomainResult { + DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::InvalidTld, + message: "no RDAP server (whois disabled)".to_string(), + }) +} + +// -- System whois: shells out to the systems whois binary -- +#[cfg(feature = "system-whois")] +async fn whois_lookup(_whois_overrides: &WhoisOverrides, name: &str, tld: &str, verbose: bool) -> DomainResult { + let full = format!("{}.{}", name, tld); + let whois_cmd = env!("HOARDOM_WHOIS_CMD"); + let whois_flags = env!("HOARDOM_WHOIS_FLAGS"); + + if verbose { + if whois_flags.is_empty() { + eprintln!("[verbose] System WHOIS: {} {}", whois_cmd, full); + } else { + eprintln!("[verbose] System WHOIS: {} {} {}", whois_cmd, whois_flags, full); + } + } + + let mut cmd = tokio::process::Command::new(whois_cmd); + // add flags if any are configured + if !whois_flags.is_empty() { + for flag in whois_flags.split_whitespace() { + cmd.arg(flag); + } + } + cmd.arg(&full); + + let output = match tokio::time::timeout( + Duration::from_secs(15), + cmd.output(), + ).await { + Ok(Ok(out)) => out, + Ok(Err(e)) => { + if verbose { + eprintln!("[verbose] System whois error for {}: {}", full, e); + } + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Unknown, + message: format!("whois command failed: {}", e), + }); + } + Err(_) => { + if verbose { + eprintln!("[verbose] System whois timeout for {}", full); + } + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Timeout, + message: "whois timeout".to_string(), + }); + } + }; + + let response_str = String::from_utf8_lossy(&output.stdout); + + if verbose { + eprintln!("[verbose] WHOIS response for {} ({} bytes)", full, output.stdout.len()); + } + + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + if verbose { + eprintln!("[verbose] whois stderr: {}", stderr.trim()); + } + // some whois commands exit non-zero for "not found" but still give useful stdout + if !response_str.is_empty() { + return parse_whois_response(name, tld, &response_str); + } + return DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Unknown, + message: "whois command returned error".to_string(), + }); + } + + parse_whois_response(name, tld, &response_str) +} + +// -- Builtin whois: raw TCP to whois servers directly -- + +/// try a whois server, returns the response string or errors out +#[cfg(feature = "builtin-whois")] +async fn try_whois_server(server: &str, domain: &str, verbose: bool) -> Result<String, &'static str> { + let addr = format!("{}:43", server); + + let stream = match tokio::time::timeout( + Duration::from_secs(4), + TcpStream::connect(&addr), + ).await { + Ok(Ok(s)) => s, + Ok(Err(_)) => return Err("connect error"), + Err(_) => return Err("connect timeout"), + }; + + if verbose { + eprintln!("[verbose] WHOIS connected: {} -> {}", domain, server); + } + + let (mut reader, mut writer) = stream.into_split(); + + let query = format!("{}\r\n", domain); + if writer.write_all(query.as_bytes()).await.is_err() { + return Err("write error"); + } + + let mut response = Vec::new(); + match tokio::time::timeout( + Duration::from_secs(8), + reader.read_to_end(&mut response), + ).await { + Ok(Ok(_)) => {} + Ok(Err(_)) => return Err("read error"), + Err(_) => return Err("read timeout"), + } + + Ok(String::from_utf8_lossy(&response).to_string()) +} + +/// candidate whois servers for a TLD based on common naming patterns +#[cfg(feature = "builtin-whois")] +fn whois_candidates(tld: &str) -> Vec<String> { + // most registries follow one of these patterns + vec![ + format!("whois.nic.{}", tld), + format!("whois.{}", tld), + format!("{}.whois-servers.net", tld), + ] +} + +#[cfg(feature = "builtin-whois")] +async fn whois_lookup(whois_overrides: &WhoisOverrides, name: &str, tld: &str, verbose: bool) -> DomainResult { + let full = format!("{}.{}", name, tld); + + // if Lists.toml has an explicit server ("tld:server"), use ONLY that one + if let Some(server) = whois_overrides.get_server(tld) { + if verbose { + eprintln!("[verbose] WHOIS (override): {} -> {}", full, server); + } + return match try_whois_server(server, &full, verbose).await { + Ok(resp) if !resp.is_empty() => parse_whois_response(name, tld, &resp), + Ok(_) => DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Unknown, + message: "empty whois response".to_string(), + }), + Err(e) => DomainResult::new(name, tld, DomainStatus::Error { + kind: if e.contains("timeout") { ErrorKind::Timeout } else { ErrorKind::Unknown }, + message: format!("whois {}: {}", server, e), + }), + }; + } + + // no override: try common server patterns until one responds + let candidates = whois_candidates(tld); + + if verbose { + eprintln!("[verbose] WHOIS probing {} candidates for .{}", candidates.len(), tld); + } + + for server in &candidates { + match try_whois_server(server, &full, verbose).await { + Ok(resp) if !resp.is_empty() => { + return parse_whois_response(name, tld, &resp); + } + Ok(_) => { + if verbose { + eprintln!("[verbose] WHOIS {} returned empty for {}", server, full); + } + } + Err(e) => { + if verbose { + eprintln!("[verbose] WHOIS {} failed for {}: {}", server, full, e); + } + } + } + } + + // nothing worked + DomainResult::new(name, tld, DomainStatus::Error { + kind: ErrorKind::Unknown, + message: "no whois server reachable".to_string(), + }) +} + +fn parse_whois_response(name: &str, tld: &str, response: &str) -> DomainResult { + let lower = response.to_lowercase(); + + // common "not found" / "available" patterns across registrars + let available_patterns = [ + "no match for", + "not found", + "no entries found", + "no data found", + "status: free", + "status: available", + "is free", + "no object found", + "object not found", + "nothing found", + "domain not found", + "no information available", + "we do not have an entry", + ]; + + for pattern in &available_patterns { + if lower.contains(pattern) { + return DomainResult::new(name, tld, DomainStatus::Available); + } + } + + // if we got a response and it wasnt "not found", domain is probably registered + // try to extract expiry date + let expiry = extract_whois_expiry(&lower); + + DomainResult::new(name, tld, DomainStatus::Registered { expiry }) +} + +fn extract_whois_expiry(response: &str) -> Option<String> { + let expiry_patterns = [ + "expiry date:", + "expiration date:", + "registry expiry date:", + "registrar registration expiration date:", + "paid-till:", + "expires:", + "expire:", + "renewal date:", + "expires on:", + ]; + + for line in response.lines() { + let trimmed = line.trim().to_lowercase(); + for pattern in &expiry_patterns { + if trimmed.starts_with(pattern) { + let value = trimmed[pattern.len()..].trim(); + // try to extract a date-looking thing (first 10 chars if it looks like YYYY-MM-DD) + if value.len() >= 10 { + let date_part: String = value.chars().take(10).collect(); + // basic sanity check: contains digits and dashes + if date_part.contains('-') && date_part.chars().any(|c| c.is_ascii_digit()) { + return Some(date_part); + } + } + // maybe its in a different format, just return what we got + if !value.is_empty() { + return Some(value.to_string()); + } + } + } + } + None +} + +pub async fn lookup_with_retry( + client: &reqwest::Client, + bootstrap: &RdapBootstrap, + whois_overrides: &WhoisOverrides, + name: &str, + tld: &str, + retries: u32, + noretry: &[ErrorKind], + verbose: bool, +) -> DomainResult { + let mut result = lookup_domain(client, bootstrap, whois_overrides, name, tld, verbose).await; + + for attempt in 1..=retries { + if !result.is_error() { + break; + } + // skip retry if the error kind is in the noretry list + if let DomainStatus::Error { kind, .. } = &result.status { + if noretry.contains(kind) { + if verbose { + eprintln!("[verbose] Not retrying {}.{} (error kind in noretry list)", name, tld); + } + break; + } + } + if verbose { + eprintln!("[verbose] Retry {}/{} for {}.{}", attempt, retries, name, tld); + } + tokio::time::sleep(Duration::from_millis(500)).await; + result = lookup_domain(client, bootstrap, whois_overrides, name, tld, verbose).await; + } + + result +} + +pub fn build_client() -> reqwest::Client { + reqwest::Client::builder() + .timeout(Duration::from_secs(10)) + .user_agent(format!("hoardom/{}", env!("CARGO_PKG_VERSION"))) + .build() + .expect("Failed to create HTTP client") +} + +pub async fn lookup_all( + name: &str, + tlds: &[&str], + delay_secs: f64, + retries: u32, + verbose: bool, + cache_path: Option<&Path>, + force_refresh: bool, + jobs: u8, + whois_overrides: &WhoisOverrides, + noretry: &[ErrorKind], + on_progress: impl Fn(usize, usize), +) -> Vec<DomainResult> { + let client = build_client(); + + let bootstrap = match resolve_bootstrap(&client, cache_path, force_refresh, verbose).await { + Some(b) => b, + None => return Vec::new(), + }; + + let total = tlds.len(); + let concurrent = (jobs as usize).max(1); + + if concurrent <= 1 { + // sequential path (original behaviour) + let mut results = Vec::with_capacity(total); + let delay = Duration::from_secs_f64(delay_secs); + for (i, tld) in tlds.iter().enumerate() { + let result = lookup_with_retry(&client, &bootstrap, whois_overrides, name, tld, retries, noretry, verbose).await; + results.push(result); + on_progress(i + 1, total); + if delay_secs > 0.0 && i + 1 < total { + tokio::time::sleep(delay).await; + } + } + results + } else { + // concurrent path + let bootstrap = Arc::new(bootstrap); + let client = Arc::new(client); + let whois_overrides = Arc::new(whois_overrides.clone()); + let noretry = Arc::new(noretry.to_vec()); + let name_owned = name.to_string(); + + let mut stream = stream::iter(tlds.iter().enumerate()) + .map(|(i, tld)| { + let client = Arc::clone(&client); + let bootstrap = Arc::clone(&bootstrap); + let whois_overrides = Arc::clone(&whois_overrides); + let noretry = Arc::clone(&noretry); + let name = name_owned.clone(); + let tld = tld.to_string(); + async move { + let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await; + (i, result) + } + }) + .buffer_unordered(concurrent); + + let mut results: Vec<(usize, DomainResult)> = Vec::with_capacity(total); + let mut done_count = 0usize; + while let Some(item) = stream.next().await { + results.push(item); + done_count += 1; + on_progress(done_count, total); + } + + // sort by original order + results.sort_by_key(|(i, _)| *i); + + results.into_iter().map(|(_, r)| r).collect() + } +} + +pub async fn refresh_cache(cache_path: &Path, verbose: bool) -> Result<(), String> { + let client = build_client(); + let bootstrap = RdapBootstrap::fetch(&client, verbose).await?; + bootstrap.save_cache(cache_path)?; + eprintln!("RDAP bootstrap cache refreshed ({} TLDs)", bootstrap.tld_map.len()); + Ok(()) +} + +async fn resolve_bootstrap( + client: &reqwest::Client, + cache_path: Option<&Path>, + force_refresh: bool, + verbose: bool, +) -> Option<RdapBootstrap> { + // try loading bootstrap from cache first (unless force refresh) + let cached = if !force_refresh { + if let Some(cp) = cache_path { + if cp.exists() { + match RdapBootstrap::load_cached(cp, verbose) { + Ok(b) => Some(b), + Err(e) => { + if verbose { + eprintln!("[verbose] Cache load failed: {}, fetching fresh", e); + } + None + } + } + } else { + None + } + } else { + None + } + } else { + if verbose { + eprintln!("[verbose] Force refresh requested, skipping cache"); + } + None + }; + + match cached { + Some(b) => Some(b), + None => { + match RdapBootstrap::fetch(client, verbose).await { + Ok(b) => { + if let Some(cp) = cache_path { + if let Err(e) = b.save_cache(cp) { + if verbose { + eprintln!("[verbose] Failed to save cache: {}", e); + } + } else if verbose { + eprintln!("[verbose] RDAP bootstrap cached to {}", cp.display()); + } + } + Some(b) + } + Err(e) => { + eprintln!("Error: {}", e); + eprintln!("Cannot perform lookups without RDAP bootstrap data."); + None + } + } + } + } +} + +pub enum StreamMsg { + Result { result: DomainResult, sort_index: usize }, + Progress { current: usize, total: usize }, + Error(String), + Done, +} + +pub struct LookupStream { + pub receiver: tokio::sync::mpsc::Receiver<StreamMsg>, + pub handle: tokio::task::JoinHandle<()>, +} + +pub type LookupBatch = Vec<(String, Vec<String>)>; + +// spawns a bg task, sends results via channel so TUI gets em live +pub fn lookup_streaming( + name: String, + tlds: Vec<String>, + delay_secs: f64, + retries: u32, + verbose: bool, + cache_path: Option<PathBuf>, + force_refresh: bool, + jobs: u8, + whois_overrides: WhoisOverrides, + noretry: Vec<ErrorKind>, +) -> LookupStream { + let (tx, rx) = tokio::sync::mpsc::channel(64); + + let handle = tokio::spawn(async move { + let client = build_client(); + + let bootstrap = match resolve_bootstrap( + &client, + cache_path.as_deref(), + force_refresh, + verbose, + ).await { + Some(b) => b, + None => { + let _ = tx.send(StreamMsg::Error("Failed to load RDAP bootstrap".to_string())).await; + let _ = tx.send(StreamMsg::Done).await; + return; + } + }; + + let total = tlds.len(); + let concurrent = (jobs as usize).max(1); + + if concurrent <= 1 { + let delay = Duration::from_secs_f64(delay_secs); + for (i, tld) in tlds.iter().enumerate() { + let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, tld, retries, &noretry, verbose).await; + let _ = tx.send(StreamMsg::Result { result, sort_index: i }).await; + let _ = tx.send(StreamMsg::Progress { current: i + 1, total }).await; + if delay_secs > 0.0 && i + 1 < total { + tokio::time::sleep(delay).await; + } + } + } else { + let bootstrap = Arc::new(bootstrap); + let client = Arc::new(client); + let whois_overrides = Arc::new(whois_overrides); + let noretry = Arc::new(noretry); + let tx2 = tx.clone(); + + let mut stream = stream::iter(tlds.into_iter().enumerate()) + .map(|(idx, tld)| { + let client = Arc::clone(&client); + let bootstrap = Arc::clone(&bootstrap); + let whois_overrides = Arc::clone(&whois_overrides); + let noretry = Arc::clone(&noretry); + let name = name.clone(); + async move { + let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await; + (idx, result) + } + }) + .buffer_unordered(concurrent); + + let mut done_count = 0usize; + while let Some((idx, result)) = stream.next().await { + done_count += 1; + let _ = tx2.send(StreamMsg::Result { result, sort_index: idx }).await; + let _ = tx2.send(StreamMsg::Progress { current: done_count, total }).await; + } + } + + let _ = tx.send(StreamMsg::Done).await; + }); + + LookupStream { + receiver: rx, + handle, + } +} + +pub fn lookup_many_streaming( + batches: LookupBatch, + delay_secs: f64, + retries: u32, + verbose: bool, + cache_path: Option<PathBuf>, + force_refresh: bool, + jobs: u8, + whois_overrides: WhoisOverrides, + noretry: Vec<ErrorKind>, +) -> LookupStream { + if batches.len() == 1 { + let (name, tlds) = batches.into_iter().next().unwrap(); + return lookup_streaming(name, tlds, delay_secs, retries, verbose, cache_path, force_refresh, jobs, whois_overrides, noretry); + } + + let (tx, rx) = tokio::sync::mpsc::channel(64); + + let handle = tokio::spawn(async move { + let client = build_client(); + + let bootstrap = match resolve_bootstrap( + &client, + cache_path.as_deref(), + force_refresh, + verbose, + ).await { + Some(b) => b, + None => { + let _ = tx.send(StreamMsg::Error("Failed to load RDAP bootstrap".to_string())).await; + let _ = tx.send(StreamMsg::Done).await; + return; + } + }; + + let total: usize = batches.iter().map(|(_, tlds)| tlds.len()).sum(); + let concurrent = (jobs as usize).max(1); + + if concurrent <= 1 { + let delay = Duration::from_secs_f64(delay_secs); + let mut current = 0usize; + let mut global_idx = 0usize; + for (name, tlds) in batches { + for tld in tlds { + let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await; + current += 1; + let _ = tx.send(StreamMsg::Result { result, sort_index: global_idx }).await; + let _ = tx.send(StreamMsg::Progress { current, total }).await; + if delay_secs > 0.0 && current < total { + tokio::time::sleep(delay).await; + } + global_idx += 1; + } + } + } else { + let bootstrap = Arc::new(bootstrap); + let client = Arc::new(client); + let whois_overrides = Arc::new(whois_overrides); + let noretry = Arc::new(noretry); + let tx2 = tx.clone(); + + // flatten all (name, tld) pairs with their global index + let pairs: Vec<(usize, String, String)> = batches + .into_iter() + .flat_map(|(name, tlds)| tlds.into_iter().map(move |tld| (name.clone(), tld))) + .enumerate() + .map(|(idx, (name, tld))| (idx, name, tld)) + .collect(); + + let mut stream = stream::iter(pairs.into_iter()) + .map(|(idx, name, tld)| { + let client = Arc::clone(&client); + let bootstrap = Arc::clone(&bootstrap); + let whois_overrides = Arc::clone(&whois_overrides); + let noretry = Arc::clone(&noretry); + async move { + let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await; + (idx, result) + } + }) + .buffer_unordered(concurrent); + + let mut done_count = 0usize; + while let Some((idx, result)) = stream.next().await { + done_count += 1; + let _ = tx2.send(StreamMsg::Result { result, sort_index: idx }).await; + let _ = tx2.send(StreamMsg::Progress { current: done_count, total }).await; + } + } + + let _ = tx.send(StreamMsg::Done).await; + }); + + LookupStream { receiver: rx, handle } +} |
