aboutsummaryrefslogtreecommitdiff
path: root/src/lookup.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lookup.rs')
-rw-r--r--src/lookup.rs860
1 files changed, 860 insertions, 0 deletions
diff --git a/src/lookup.rs b/src/lookup.rs
new file mode 100644
index 0000000..f5b3177
--- /dev/null
+++ b/src/lookup.rs
@@ -0,0 +1,860 @@
+use crate::tlds::WhoisOverrides;
+use crate::types::{DomainResult, DomainStatus, ErrorKind};
+use futures::stream::{self, StreamExt};
+use std::collections::HashMap;
+use std::path::{Path, PathBuf};
+use std::sync::Arc;
+use std::time::Duration;
+
+#[cfg(feature = "builtin-whois")]
+use tokio::io::{AsyncReadExt, AsyncWriteExt};
+#[cfg(feature = "builtin-whois")]
+use tokio::net::TcpStream;
+
+// IANA RDAP bootstrap URL, set in Cargo.toml [package.metadata.hoardom]
+const RDAP_BOOTSTRAP_URL: &str = env!("HOARDOM_RDAP_BOOTSTRAP_URL");
+
+// TLD -> RDAP server map, grabbed once and reused
+pub struct RdapBootstrap {
+ tld_map: HashMap<String, String>,
+ raw_json: Option<String>,
+}
+
+impl RdapBootstrap {
+ pub async fn fetch(client: &reqwest::Client, verbose: bool) -> Result<Self, String> {
+ if verbose {
+ eprintln!("[verbose] Fetching RDAP bootstrap from {}", RDAP_BOOTSTRAP_URL);
+ }
+
+ let resp = client
+ .get(RDAP_BOOTSTRAP_URL)
+ .send()
+ .await
+ .map_err(|e| format!("Failed to fetch RDAP bootstrap: {}", e))?;
+
+ if !resp.status().is_success() {
+ return Err(format!("RDAP bootstrap returned HTTP {}", resp.status()));
+ }
+
+ let body = resp
+ .text()
+ .await
+ .map_err(|e| format!("Failed to read RDAP bootstrap body: {}", e))?;
+
+ let json: serde_json::Value = serde_json::from_str(&body)
+ .map_err(|e| format!("Failed to parse RDAP bootstrap JSON: {}", e))?;
+
+ let tld_map = Self::parse_bootstrap_json(&json);
+
+ if verbose {
+ eprintln!("[verbose] RDAP bootstrap loaded, {} TLDs mapped", tld_map.len());
+ }
+
+ Ok(Self { tld_map, raw_json: Some(body) })
+ }
+
+ pub fn load_cached(cache_path: &Path, verbose: bool) -> Result<Self, String> {
+ if verbose {
+ eprintln!("[verbose] Loading cached RDAP bootstrap from {}", cache_path.display());
+ }
+ let body = std::fs::read_to_string(cache_path)
+ .map_err(|e| format!("Could not read cached bootstrap: {}", e))?;
+ let json: serde_json::Value = serde_json::from_str(&body)
+ .map_err(|e| format!("Could not parse cached bootstrap: {}", e))?;
+ let tld_map = Self::parse_bootstrap_json(&json);
+ if verbose {
+ eprintln!("[verbose] Cached RDAP bootstrap loaded, {} TLDs mapped", tld_map.len());
+ }
+ Ok(Self { tld_map, raw_json: Some(body) })
+ }
+
+ pub fn save_cache(&self, cache_path: &Path) -> Result<(), String> {
+ if let Some(ref json) = self.raw_json {
+ if let Some(parent) = cache_path.parent() {
+ std::fs::create_dir_all(parent)
+ .map_err(|e| format!("Failed to create cache dir: {}", e))?;
+ }
+ std::fs::write(cache_path, json)
+ .map_err(|e| format!("Failed to write cache file: {}", e))?;
+ }
+ Ok(())
+ }
+
+ fn parse_bootstrap_json(json: &serde_json::Value) -> HashMap<String, String> {
+ let mut tld_map = HashMap::new();
+ // bootstrap format: { "services": [ [ ["tld1", "tld2"], ["https://rdap.server.example/"] ], ... ] }
+ if let Some(services) = json.get("services").and_then(|s| s.as_array()) {
+ for service in services {
+ if let Some(arr) = service.as_array() {
+ if arr.len() >= 2 {
+ let tlds = arr[0].as_array();
+ let urls = arr[1].as_array();
+ if let (Some(tlds), Some(urls)) = (tlds, urls) {
+ if let Some(base_url) = urls.first().and_then(|u| u.as_str()) {
+ let base = base_url.trim_end_matches('/').to_string();
+ for tld in tlds {
+ if let Some(t) = tld.as_str() {
+ tld_map.insert(t.to_lowercase(), base.clone());
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+ tld_map
+ }
+
+ pub fn get_server(&self, tld: &str) -> Option<&String> {
+ self.tld_map.get(&tld.to_lowercase())
+ }
+}
+
+pub async fn lookup_domain(
+ client: &reqwest::Client,
+ bootstrap: &RdapBootstrap,
+ whois_overrides: &WhoisOverrides,
+ name: &str,
+ tld: &str,
+ verbose: bool,
+) -> DomainResult {
+ let full = format!("{}.{}", name, tld);
+
+ let base_url = match bootstrap.get_server(tld) {
+ Some(url) => url.clone(),
+ None => {
+ // no RDAP server for this TLD, fall back to WHOIS
+ if verbose {
+ eprintln!("[verbose] No RDAP server for {}, falling back to WHOIS", tld);
+ }
+ return whois_lookup(whois_overrides, name, tld, verbose).await;
+ }
+ };
+
+ let url = format!("{}/domain/{}", base_url, full);
+
+ if verbose {
+ eprintln!("[verbose] Looking up: {}", url);
+ }
+
+ let resp = match client.get(&url).send().await {
+ Ok(r) => r,
+ Err(e) => {
+ if verbose {
+ eprintln!("[verbose] Request error for {}: {}", full, e);
+ }
+ let kind = if e.is_timeout() {
+ ErrorKind::Timeout
+ } else {
+ ErrorKind::Unknown
+ };
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind,
+ message: "unknown error".to_string(),
+ });
+ }
+ };
+
+ let status_code = resp.status();
+
+ if verbose {
+ eprintln!("[verbose] {} -> HTTP {}", full, status_code);
+ }
+
+ // 404 = not found in RDAP = domain is available (not registered)
+ if status_code == 404 {
+ return DomainResult::new(name, tld, DomainStatus::Available);
+ }
+
+ // 400 = probably invalid query
+ if status_code == 400 {
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::InvalidTld,
+ message: "invalid tld".to_string(),
+ });
+ }
+
+ // 429 = rate limited
+ if status_code == 429 {
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::RateLimit,
+ message: "rate limited".to_string(),
+ });
+ }
+
+ // 403 = forbidden (some registries block queries)
+ if status_code == 403 {
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Forbidden,
+ message: "forbidden".to_string(),
+ });
+ }
+
+ // anything else thats not success
+ if !status_code.is_success() {
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Unknown,
+ message: format!("HTTP {}", status_code),
+ });
+ }
+
+ // 200 = domain exists, try to parse expiry from RDAP json
+ let expiry = match resp.json::<serde_json::Value>().await {
+ Ok(json) => extract_expiry(&json),
+ Err(_) => None,
+ };
+
+ DomainResult::new(name, tld, DomainStatus::Registered { expiry })
+}
+
+fn extract_expiry(json: &serde_json::Value) -> Option<String> {
+ // RDAP stores events as an array, expiration is eventAction = "expiration"
+ if let Some(events) = json.get("events").and_then(|e| e.as_array()) {
+ for event in events {
+ if let Some(action) = event.get("eventAction").and_then(|a| a.as_str()) {
+ if action == "expiration" {
+ if let Some(date) = event.get("eventDate").and_then(|d| d.as_str()) {
+ // RDAP dates are ISO 8601, just grab the date part
+ return Some(date.chars().take(10).collect());
+ }
+ }
+ }
+ }
+ }
+ None
+}
+
+// ---- WHOIS fallback for TLDs not in RDAP bootstrap ----
+
+// -- No whois feature: just return an error --
+#[cfg(not(any(feature = "system-whois", feature = "builtin-whois")))]
+async fn whois_lookup(_whois_overrides: &WhoisOverrides, name: &str, tld: &str, _verbose: bool) -> DomainResult {
+ DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::InvalidTld,
+ message: "no RDAP server (whois disabled)".to_string(),
+ })
+}
+
+// -- System whois: shells out to the systems whois binary --
+#[cfg(feature = "system-whois")]
+async fn whois_lookup(_whois_overrides: &WhoisOverrides, name: &str, tld: &str, verbose: bool) -> DomainResult {
+ let full = format!("{}.{}", name, tld);
+ let whois_cmd = env!("HOARDOM_WHOIS_CMD");
+ let whois_flags = env!("HOARDOM_WHOIS_FLAGS");
+
+ if verbose {
+ if whois_flags.is_empty() {
+ eprintln!("[verbose] System WHOIS: {} {}", whois_cmd, full);
+ } else {
+ eprintln!("[verbose] System WHOIS: {} {} {}", whois_cmd, whois_flags, full);
+ }
+ }
+
+ let mut cmd = tokio::process::Command::new(whois_cmd);
+ // add flags if any are configured
+ if !whois_flags.is_empty() {
+ for flag in whois_flags.split_whitespace() {
+ cmd.arg(flag);
+ }
+ }
+ cmd.arg(&full);
+
+ let output = match tokio::time::timeout(
+ Duration::from_secs(15),
+ cmd.output(),
+ ).await {
+ Ok(Ok(out)) => out,
+ Ok(Err(e)) => {
+ if verbose {
+ eprintln!("[verbose] System whois error for {}: {}", full, e);
+ }
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Unknown,
+ message: format!("whois command failed: {}", e),
+ });
+ }
+ Err(_) => {
+ if verbose {
+ eprintln!("[verbose] System whois timeout for {}", full);
+ }
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Timeout,
+ message: "whois timeout".to_string(),
+ });
+ }
+ };
+
+ let response_str = String::from_utf8_lossy(&output.stdout);
+
+ if verbose {
+ eprintln!("[verbose] WHOIS response for {} ({} bytes)", full, output.stdout.len());
+ }
+
+ if !output.status.success() {
+ let stderr = String::from_utf8_lossy(&output.stderr);
+ if verbose {
+ eprintln!("[verbose] whois stderr: {}", stderr.trim());
+ }
+ // some whois commands exit non-zero for "not found" but still give useful stdout
+ if !response_str.is_empty() {
+ return parse_whois_response(name, tld, &response_str);
+ }
+ return DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Unknown,
+ message: "whois command returned error".to_string(),
+ });
+ }
+
+ parse_whois_response(name, tld, &response_str)
+}
+
+// -- Builtin whois: raw TCP to whois servers directly --
+
+/// try a whois server, returns the response string or errors out
+#[cfg(feature = "builtin-whois")]
+async fn try_whois_server(server: &str, domain: &str, verbose: bool) -> Result<String, &'static str> {
+ let addr = format!("{}:43", server);
+
+ let stream = match tokio::time::timeout(
+ Duration::from_secs(4),
+ TcpStream::connect(&addr),
+ ).await {
+ Ok(Ok(s)) => s,
+ Ok(Err(_)) => return Err("connect error"),
+ Err(_) => return Err("connect timeout"),
+ };
+
+ if verbose {
+ eprintln!("[verbose] WHOIS connected: {} -> {}", domain, server);
+ }
+
+ let (mut reader, mut writer) = stream.into_split();
+
+ let query = format!("{}\r\n", domain);
+ if writer.write_all(query.as_bytes()).await.is_err() {
+ return Err("write error");
+ }
+
+ let mut response = Vec::new();
+ match tokio::time::timeout(
+ Duration::from_secs(8),
+ reader.read_to_end(&mut response),
+ ).await {
+ Ok(Ok(_)) => {}
+ Ok(Err(_)) => return Err("read error"),
+ Err(_) => return Err("read timeout"),
+ }
+
+ Ok(String::from_utf8_lossy(&response).to_string())
+}
+
+/// candidate whois servers for a TLD based on common naming patterns
+#[cfg(feature = "builtin-whois")]
+fn whois_candidates(tld: &str) -> Vec<String> {
+ // most registries follow one of these patterns
+ vec![
+ format!("whois.nic.{}", tld),
+ format!("whois.{}", tld),
+ format!("{}.whois-servers.net", tld),
+ ]
+}
+
+#[cfg(feature = "builtin-whois")]
+async fn whois_lookup(whois_overrides: &WhoisOverrides, name: &str, tld: &str, verbose: bool) -> DomainResult {
+ let full = format!("{}.{}", name, tld);
+
+ // if Lists.toml has an explicit server ("tld:server"), use ONLY that one
+ if let Some(server) = whois_overrides.get_server(tld) {
+ if verbose {
+ eprintln!("[verbose] WHOIS (override): {} -> {}", full, server);
+ }
+ return match try_whois_server(server, &full, verbose).await {
+ Ok(resp) if !resp.is_empty() => parse_whois_response(name, tld, &resp),
+ Ok(_) => DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Unknown,
+ message: "empty whois response".to_string(),
+ }),
+ Err(e) => DomainResult::new(name, tld, DomainStatus::Error {
+ kind: if e.contains("timeout") { ErrorKind::Timeout } else { ErrorKind::Unknown },
+ message: format!("whois {}: {}", server, e),
+ }),
+ };
+ }
+
+ // no override: try common server patterns until one responds
+ let candidates = whois_candidates(tld);
+
+ if verbose {
+ eprintln!("[verbose] WHOIS probing {} candidates for .{}", candidates.len(), tld);
+ }
+
+ for server in &candidates {
+ match try_whois_server(server, &full, verbose).await {
+ Ok(resp) if !resp.is_empty() => {
+ return parse_whois_response(name, tld, &resp);
+ }
+ Ok(_) => {
+ if verbose {
+ eprintln!("[verbose] WHOIS {} returned empty for {}", server, full);
+ }
+ }
+ Err(e) => {
+ if verbose {
+ eprintln!("[verbose] WHOIS {} failed for {}: {}", server, full, e);
+ }
+ }
+ }
+ }
+
+ // nothing worked
+ DomainResult::new(name, tld, DomainStatus::Error {
+ kind: ErrorKind::Unknown,
+ message: "no whois server reachable".to_string(),
+ })
+}
+
+fn parse_whois_response(name: &str, tld: &str, response: &str) -> DomainResult {
+ let lower = response.to_lowercase();
+
+ // common "not found" / "available" patterns across registrars
+ let available_patterns = [
+ "no match for",
+ "not found",
+ "no entries found",
+ "no data found",
+ "status: free",
+ "status: available",
+ "is free",
+ "no object found",
+ "object not found",
+ "nothing found",
+ "domain not found",
+ "no information available",
+ "we do not have an entry",
+ ];
+
+ for pattern in &available_patterns {
+ if lower.contains(pattern) {
+ return DomainResult::new(name, tld, DomainStatus::Available);
+ }
+ }
+
+ // if we got a response and it wasnt "not found", domain is probably registered
+ // try to extract expiry date
+ let expiry = extract_whois_expiry(&lower);
+
+ DomainResult::new(name, tld, DomainStatus::Registered { expiry })
+}
+
+fn extract_whois_expiry(response: &str) -> Option<String> {
+ let expiry_patterns = [
+ "expiry date:",
+ "expiration date:",
+ "registry expiry date:",
+ "registrar registration expiration date:",
+ "paid-till:",
+ "expires:",
+ "expire:",
+ "renewal date:",
+ "expires on:",
+ ];
+
+ for line in response.lines() {
+ let trimmed = line.trim().to_lowercase();
+ for pattern in &expiry_patterns {
+ if trimmed.starts_with(pattern) {
+ let value = trimmed[pattern.len()..].trim();
+ // try to extract a date-looking thing (first 10 chars if it looks like YYYY-MM-DD)
+ if value.len() >= 10 {
+ let date_part: String = value.chars().take(10).collect();
+ // basic sanity check: contains digits and dashes
+ if date_part.contains('-') && date_part.chars().any(|c| c.is_ascii_digit()) {
+ return Some(date_part);
+ }
+ }
+ // maybe its in a different format, just return what we got
+ if !value.is_empty() {
+ return Some(value.to_string());
+ }
+ }
+ }
+ }
+ None
+}
+
+pub async fn lookup_with_retry(
+ client: &reqwest::Client,
+ bootstrap: &RdapBootstrap,
+ whois_overrides: &WhoisOverrides,
+ name: &str,
+ tld: &str,
+ retries: u32,
+ noretry: &[ErrorKind],
+ verbose: bool,
+) -> DomainResult {
+ let mut result = lookup_domain(client, bootstrap, whois_overrides, name, tld, verbose).await;
+
+ for attempt in 1..=retries {
+ if !result.is_error() {
+ break;
+ }
+ // skip retry if the error kind is in the noretry list
+ if let DomainStatus::Error { kind, .. } = &result.status {
+ if noretry.contains(kind) {
+ if verbose {
+ eprintln!("[verbose] Not retrying {}.{} (error kind in noretry list)", name, tld);
+ }
+ break;
+ }
+ }
+ if verbose {
+ eprintln!("[verbose] Retry {}/{} for {}.{}", attempt, retries, name, tld);
+ }
+ tokio::time::sleep(Duration::from_millis(500)).await;
+ result = lookup_domain(client, bootstrap, whois_overrides, name, tld, verbose).await;
+ }
+
+ result
+}
+
+pub fn build_client() -> reqwest::Client {
+ reqwest::Client::builder()
+ .timeout(Duration::from_secs(10))
+ .user_agent(format!("hoardom/{}", env!("CARGO_PKG_VERSION")))
+ .build()
+ .expect("Failed to create HTTP client")
+}
+
+pub async fn lookup_all(
+ name: &str,
+ tlds: &[&str],
+ delay_secs: f64,
+ retries: u32,
+ verbose: bool,
+ cache_path: Option<&Path>,
+ force_refresh: bool,
+ jobs: u8,
+ whois_overrides: &WhoisOverrides,
+ noretry: &[ErrorKind],
+ on_progress: impl Fn(usize, usize),
+) -> Vec<DomainResult> {
+ let client = build_client();
+
+ let bootstrap = match resolve_bootstrap(&client, cache_path, force_refresh, verbose).await {
+ Some(b) => b,
+ None => return Vec::new(),
+ };
+
+ let total = tlds.len();
+ let concurrent = (jobs as usize).max(1);
+
+ if concurrent <= 1 {
+ // sequential path (original behaviour)
+ let mut results = Vec::with_capacity(total);
+ let delay = Duration::from_secs_f64(delay_secs);
+ for (i, tld) in tlds.iter().enumerate() {
+ let result = lookup_with_retry(&client, &bootstrap, whois_overrides, name, tld, retries, noretry, verbose).await;
+ results.push(result);
+ on_progress(i + 1, total);
+ if delay_secs > 0.0 && i + 1 < total {
+ tokio::time::sleep(delay).await;
+ }
+ }
+ results
+ } else {
+ // concurrent path
+ let bootstrap = Arc::new(bootstrap);
+ let client = Arc::new(client);
+ let whois_overrides = Arc::new(whois_overrides.clone());
+ let noretry = Arc::new(noretry.to_vec());
+ let name_owned = name.to_string();
+
+ let mut stream = stream::iter(tlds.iter().enumerate())
+ .map(|(i, tld)| {
+ let client = Arc::clone(&client);
+ let bootstrap = Arc::clone(&bootstrap);
+ let whois_overrides = Arc::clone(&whois_overrides);
+ let noretry = Arc::clone(&noretry);
+ let name = name_owned.clone();
+ let tld = tld.to_string();
+ async move {
+ let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await;
+ (i, result)
+ }
+ })
+ .buffer_unordered(concurrent);
+
+ let mut results: Vec<(usize, DomainResult)> = Vec::with_capacity(total);
+ let mut done_count = 0usize;
+ while let Some(item) = stream.next().await {
+ results.push(item);
+ done_count += 1;
+ on_progress(done_count, total);
+ }
+
+ // sort by original order
+ results.sort_by_key(|(i, _)| *i);
+
+ results.into_iter().map(|(_, r)| r).collect()
+ }
+}
+
+pub async fn refresh_cache(cache_path: &Path, verbose: bool) -> Result<(), String> {
+ let client = build_client();
+ let bootstrap = RdapBootstrap::fetch(&client, verbose).await?;
+ bootstrap.save_cache(cache_path)?;
+ eprintln!("RDAP bootstrap cache refreshed ({} TLDs)", bootstrap.tld_map.len());
+ Ok(())
+}
+
+async fn resolve_bootstrap(
+ client: &reqwest::Client,
+ cache_path: Option<&Path>,
+ force_refresh: bool,
+ verbose: bool,
+) -> Option<RdapBootstrap> {
+ // try loading bootstrap from cache first (unless force refresh)
+ let cached = if !force_refresh {
+ if let Some(cp) = cache_path {
+ if cp.exists() {
+ match RdapBootstrap::load_cached(cp, verbose) {
+ Ok(b) => Some(b),
+ Err(e) => {
+ if verbose {
+ eprintln!("[verbose] Cache load failed: {}, fetching fresh", e);
+ }
+ None
+ }
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ }
+ } else {
+ if verbose {
+ eprintln!("[verbose] Force refresh requested, skipping cache");
+ }
+ None
+ };
+
+ match cached {
+ Some(b) => Some(b),
+ None => {
+ match RdapBootstrap::fetch(client, verbose).await {
+ Ok(b) => {
+ if let Some(cp) = cache_path {
+ if let Err(e) = b.save_cache(cp) {
+ if verbose {
+ eprintln!("[verbose] Failed to save cache: {}", e);
+ }
+ } else if verbose {
+ eprintln!("[verbose] RDAP bootstrap cached to {}", cp.display());
+ }
+ }
+ Some(b)
+ }
+ Err(e) => {
+ eprintln!("Error: {}", e);
+ eprintln!("Cannot perform lookups without RDAP bootstrap data.");
+ None
+ }
+ }
+ }
+ }
+}
+
+pub enum StreamMsg {
+ Result { result: DomainResult, sort_index: usize },
+ Progress { current: usize, total: usize },
+ Error(String),
+ Done,
+}
+
+pub struct LookupStream {
+ pub receiver: tokio::sync::mpsc::Receiver<StreamMsg>,
+ pub handle: tokio::task::JoinHandle<()>,
+}
+
+pub type LookupBatch = Vec<(String, Vec<String>)>;
+
+// spawns a bg task, sends results via channel so TUI gets em live
+pub fn lookup_streaming(
+ name: String,
+ tlds: Vec<String>,
+ delay_secs: f64,
+ retries: u32,
+ verbose: bool,
+ cache_path: Option<PathBuf>,
+ force_refresh: bool,
+ jobs: u8,
+ whois_overrides: WhoisOverrides,
+ noretry: Vec<ErrorKind>,
+) -> LookupStream {
+ let (tx, rx) = tokio::sync::mpsc::channel(64);
+
+ let handle = tokio::spawn(async move {
+ let client = build_client();
+
+ let bootstrap = match resolve_bootstrap(
+ &client,
+ cache_path.as_deref(),
+ force_refresh,
+ verbose,
+ ).await {
+ Some(b) => b,
+ None => {
+ let _ = tx.send(StreamMsg::Error("Failed to load RDAP bootstrap".to_string())).await;
+ let _ = tx.send(StreamMsg::Done).await;
+ return;
+ }
+ };
+
+ let total = tlds.len();
+ let concurrent = (jobs as usize).max(1);
+
+ if concurrent <= 1 {
+ let delay = Duration::from_secs_f64(delay_secs);
+ for (i, tld) in tlds.iter().enumerate() {
+ let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, tld, retries, &noretry, verbose).await;
+ let _ = tx.send(StreamMsg::Result { result, sort_index: i }).await;
+ let _ = tx.send(StreamMsg::Progress { current: i + 1, total }).await;
+ if delay_secs > 0.0 && i + 1 < total {
+ tokio::time::sleep(delay).await;
+ }
+ }
+ } else {
+ let bootstrap = Arc::new(bootstrap);
+ let client = Arc::new(client);
+ let whois_overrides = Arc::new(whois_overrides);
+ let noretry = Arc::new(noretry);
+ let tx2 = tx.clone();
+
+ let mut stream = stream::iter(tlds.into_iter().enumerate())
+ .map(|(idx, tld)| {
+ let client = Arc::clone(&client);
+ let bootstrap = Arc::clone(&bootstrap);
+ let whois_overrides = Arc::clone(&whois_overrides);
+ let noretry = Arc::clone(&noretry);
+ let name = name.clone();
+ async move {
+ let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await;
+ (idx, result)
+ }
+ })
+ .buffer_unordered(concurrent);
+
+ let mut done_count = 0usize;
+ while let Some((idx, result)) = stream.next().await {
+ done_count += 1;
+ let _ = tx2.send(StreamMsg::Result { result, sort_index: idx }).await;
+ let _ = tx2.send(StreamMsg::Progress { current: done_count, total }).await;
+ }
+ }
+
+ let _ = tx.send(StreamMsg::Done).await;
+ });
+
+ LookupStream {
+ receiver: rx,
+ handle,
+ }
+}
+
+pub fn lookup_many_streaming(
+ batches: LookupBatch,
+ delay_secs: f64,
+ retries: u32,
+ verbose: bool,
+ cache_path: Option<PathBuf>,
+ force_refresh: bool,
+ jobs: u8,
+ whois_overrides: WhoisOverrides,
+ noretry: Vec<ErrorKind>,
+) -> LookupStream {
+ if batches.len() == 1 {
+ let (name, tlds) = batches.into_iter().next().unwrap();
+ return lookup_streaming(name, tlds, delay_secs, retries, verbose, cache_path, force_refresh, jobs, whois_overrides, noretry);
+ }
+
+ let (tx, rx) = tokio::sync::mpsc::channel(64);
+
+ let handle = tokio::spawn(async move {
+ let client = build_client();
+
+ let bootstrap = match resolve_bootstrap(
+ &client,
+ cache_path.as_deref(),
+ force_refresh,
+ verbose,
+ ).await {
+ Some(b) => b,
+ None => {
+ let _ = tx.send(StreamMsg::Error("Failed to load RDAP bootstrap".to_string())).await;
+ let _ = tx.send(StreamMsg::Done).await;
+ return;
+ }
+ };
+
+ let total: usize = batches.iter().map(|(_, tlds)| tlds.len()).sum();
+ let concurrent = (jobs as usize).max(1);
+
+ if concurrent <= 1 {
+ let delay = Duration::from_secs_f64(delay_secs);
+ let mut current = 0usize;
+ let mut global_idx = 0usize;
+ for (name, tlds) in batches {
+ for tld in tlds {
+ let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await;
+ current += 1;
+ let _ = tx.send(StreamMsg::Result { result, sort_index: global_idx }).await;
+ let _ = tx.send(StreamMsg::Progress { current, total }).await;
+ if delay_secs > 0.0 && current < total {
+ tokio::time::sleep(delay).await;
+ }
+ global_idx += 1;
+ }
+ }
+ } else {
+ let bootstrap = Arc::new(bootstrap);
+ let client = Arc::new(client);
+ let whois_overrides = Arc::new(whois_overrides);
+ let noretry = Arc::new(noretry);
+ let tx2 = tx.clone();
+
+ // flatten all (name, tld) pairs with their global index
+ let pairs: Vec<(usize, String, String)> = batches
+ .into_iter()
+ .flat_map(|(name, tlds)| tlds.into_iter().map(move |tld| (name.clone(), tld)))
+ .enumerate()
+ .map(|(idx, (name, tld))| (idx, name, tld))
+ .collect();
+
+ let mut stream = stream::iter(pairs.into_iter())
+ .map(|(idx, name, tld)| {
+ let client = Arc::clone(&client);
+ let bootstrap = Arc::clone(&bootstrap);
+ let whois_overrides = Arc::clone(&whois_overrides);
+ let noretry = Arc::clone(&noretry);
+ async move {
+ let result = lookup_with_retry(&client, &bootstrap, &whois_overrides, &name, &tld, retries, &noretry, verbose).await;
+ (idx, result)
+ }
+ })
+ .buffer_unordered(concurrent);
+
+ let mut done_count = 0usize;
+ while let Some((idx, result)) = stream.next().await {
+ done_count += 1;
+ let _ = tx2.send(StreamMsg::Result { result, sort_index: idx }).await;
+ let _ = tx2.send(StreamMsg::Progress { current: done_count, total }).await;
+ }
+ }
+
+ let _ = tx.send(StreamMsg::Done).await;
+ });
+
+ LookupStream { receiver: rx, handle }
+}