Created
February 1, 2025 18:54
-
-
Save WalBeh/89d246bd704d18185137f6d8196592e2 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# /// script | |
# requires-python = ">=3.12" | |
# dependencies = [ | |
# "aiodns", | |
# "argparse", | |
# "asyncio", | |
# "loguru", | |
# ] | |
# /// | |
import asyncio | |
import aiodns | |
import argparse | |
import datetime | |
from loguru import logger | |
from collections import defaultdict | |
import sys | |
# Log line layout: green timestamp, level name left-aligned in an 8-character
# column, then the message in cyan (markup tags are interpreted by loguru).
_stderr_format = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{message}</cyan>"

# Drop the handlers registered so far, then send everything to stderr using
# the layout above.
logger.remove()
logger.add(sys.stderr, format=_stderr_format)
class ContinuousDNSResolver:
    """Poll several DNS servers until each one resolves a domain.

    One coroutine per nameserver repeats the query until it succeeds. The
    time of the first success and the number of attempts are recorded so
    that ``print_report`` can summarise them afterwards.

    Attributes:
        nameservers: nameserver addresses, in the order they were given.
        resolution_times: nameserver -> ``datetime`` of its first success.
        attempt_counts: nameserver -> number of queries issued so far.
        results: tuple of resolved values -> nameservers that returned it.
    """

    # Attributes probed (in order) to pull a printable value out of a DNS
    # answer record. "host" comes first so A/AAAA answers behave as before;
    # the others cover record types whose answers carry no ``host`` field.
    _VALUE_ATTRIBUTES = ("host", "cname", "name", "text", "nsname")

    def __init__(self, nameservers):
        """Store the nameserver list and initialise per-server bookkeeping."""
        self.nameservers = nameservers
        self.resolution_times = {}
        self.attempt_counts = {ns: 0 for ns in nameservers}  # Track attempt counts
        self.results = defaultdict(list)

    @classmethod
    def _record_values(cls, result):
        """Return one printable value per answer record in *result*.

        Some record types yield a single object instead of a list, and not
        every record has a ``host`` attribute. Reading ``.host`` blindly made
        such queries fail after a successful lookup and retry forever.
        """
        records = result if isinstance(result, (list, tuple)) else [result]
        values = []
        for record in records:
            for attr in cls._VALUE_ATTRIBUTES:
                value = getattr(record, attr, None)
                if value is not None:
                    values.append(value)
                    break
            else:
                # Unknown record shape: fall back to its string form rather
                # than failing the whole lookup.
                values.append(str(record))
        return values

    async def monitor_nameserver(self, nameserver, domain, record_type='A'):
        """Query *nameserver* for *domain* until it answers.

        Args:
            nameserver: address of the DNS server to query.
            domain: name to resolve.
            record_type: DNS record type passed to the resolver.

        DNS errors are retried after 1 second, any other error after
        5 seconds. The method returns once the nameserver has resolved.
        """
        resolver = aiodns.DNSResolver(nameservers=[nameserver])
        while nameserver not in self.resolution_times:
            # ``get`` keeps this usable for a nameserver that was not part of
            # the list given to the constructor.
            attempt = self.attempt_counts.get(nameserver, 0) + 1
            self.attempt_counts[nameserver] = attempt  # Increment attempt counter
            try:
                # NOTE(review): newer aiodns releases appear to deprecate
                # ``query()`` in favour of ``query_dns()`` - confirm against
                # the installed version before upgrading.
                result = await resolver.query(domain, record_type)
            except aiodns.error.DNSError as e:
                logger.warning(f"β οΈ {nameserver} failed (attempt {self.attempt_counts[nameserver]}): {e.__class__.__name__}")
                await asyncio.sleep(1)
                continue
            except Exception as e:
                logger.error(f"π₯ {nameserver} error (attempt {self.attempt_counts[nameserver]}): {str(e)}")
                await asyncio.sleep(5)
                continue

            ips = self._record_values(result)
            logger.success(f"β {nameserver} resolved {domain} to {ips} (attempt {self.attempt_counts[nameserver]})")
            # Re-check after the await: if the same nameserver is listed
            # twice, another task may have recorded it in the meantime.
            if nameserver not in self.resolution_times:
                self.resolution_times[nameserver] = datetime.datetime.now()
                self.results[tuple(ips)].append(nameserver)
            return

    async def start_monitoring(self, domain, record_type='A'):
        """Run one monitoring task per nameserver and wait for all of them."""
        tasks = [
            asyncio.create_task(self.monitor_nameserver(ns, domain, record_type))
            for ns in self.nameservers
        ]
        await asyncio.gather(*tasks)

    def print_report(self):
        """Log, per nameserver, when it resolved and after how many attempts."""
        logger.info("\nπ Final Resolution Report:")
        for nameserver in self.nameservers:
            attempts = self.attempt_counts[nameserver]
            resolved_at = self.resolution_times.get(nameserver)
            if resolved_at is not None:
                # Trim the microsecond field to milliseconds.
                timestamp = resolved_at.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
                logger.info(f" {nameserver:<15} resolved at {timestamp} (after {attempts} attempts)")
            else:
                logger.error(f" {nameserver:<15} never resolved (attempted {attempts} times)")
async def main(domain):
    """Monitor *domain* against a fixed set of resolvers and log a report.

    Args:
        domain: name to resolve on every nameserver in the list below.

    The report is emitted from a ``finally`` clause. Monitoring only ends
    normally once every nameserver has answered, so without it an
    interrupted run (e.g. Ctrl-C cancelling this coroutine) would discard
    everything gathered so far and never list unresolved nameservers.
    """
    nameservers = [
        "8.8.8.8",  # Google
        "1.1.1.1",  # Cloudflare
        "9.9.9.9",  # Quad9
        "208.67.222.222",  # OpenDNS
        "8.26.56.26",  # Comodo
        "100.100.100.100",  # tailscale
        "192.168.1.1",  # inet router
        "205.251.195.102"  # AWS
    ]
    logger.info(f"π Starting continuous DNS monitoring for {domain}")
    resolver = ContinuousDNSResolver(nameservers)
    try:
        await resolver.start_monitoring(domain)
    finally:
        # Runs on normal completion and on cancellation alike.
        resolver.print_report()
if __name__ == "__main__":
    # Command-line entry point: the domain to watch is the only option and
    # it is mandatory.
    cli = argparse.ArgumentParser(
        description='Continuous DNS resolution monitor',
    )
    cli.add_argument(
        '-d',
        '--domain',
        required=True,
        help='Domain to monitor',
    )
    options = cli.parse_args()

    try:
        asyncio.run(main(options.domain))
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop a run; log it instead of
        # showing a traceback.
        logger.info("\nπ Monitoring stopped by user")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
uv run check-dns-resolvers.py --domain red-taun-....net
--- Note: the NS (nameserver) configured in the script is the authoritative (AUTH) server for the eks1-dev zone!