import asyncio
import json
import multiprocessing
import sys

import aiohttp
import psutil

LABELS_FILE_PATH = './labels.json'
BASE_URL = 'https://api.ensdata.net'
EXTRA_OPTIONS = '?farcaster=true&expiry=true'  # additional query parameters are joined with '&'
RESULT_JSON_FILE_PATH = './ens_kv.json'
DEFAULT_BATCH_SIZE = 50
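# With these options, a full request URL looks like the line below (the label
# 'vitalik' is an illustrative placeholder, not part of the script):
#   https://api.ensdata.net/vitalik.eth?farcaster=true&expiry=true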


def get_recommended_batch_size(default=DEFAULT_BATCH_SIZE):
    """Pick a concurrency limit from the CPU count and available memory."""
    cpu_count = multiprocessing.cpu_count()

    mem = psutil.virtual_memory()
    available_mem_gb = mem.available / (2**30)

    # Rough assumption: each in-flight request costs about 10 MB (0.01 GB).
    assumed_mem_per_task = 0.01
    mem_based_limit = int(available_mem_gb / assumed_mem_per_task)

    # Scale with CPUs and memory, but never exceed twice the default.
    recommended = min(default * cpu_count, mem_based_limit, default * 2)

    print(f"Machine configuration: {cpu_count} CPUs, {available_mem_gb:.2f} GB available memory")
    print(f"Recommended concurrency limit: {recommended}")
    return recommended
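# Worked example on a hypothetical machine with 8 CPUs and 16 GB of free memory:
# min(50 * 8, int(16 / 0.01), 50 * 2) = min(400, 1600, 100) -> 100.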


def has_empty_characters(s):
    """Return True if the string contains any whitespace characters."""
    return any(char.isspace() for char in s)
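# e.g. has_empty_characters('foo bar') -> True; has_empty_characters('foobar') -> False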


async def fetch_label(session, label: str):
    """Fetch '<label>.eth' from the API; return (label, data) or (label, None)."""
    ens_domain_name = f'{label}.eth'
    url = f'{BASE_URL}/{ens_domain_name}{EXTRA_OPTIONS}'
    try:
        async with session.get(url, allow_redirects=True) as response:
            print(f"Processing label: '{ens_domain_name}'", end='\t')
            if response.ok:
                data = await response.json()
                print("✅")
                return label, data
            else:
                print(f"🥲 {response.status}")
                return label, None
    except Exception as e:
        print(f"Error processing {ens_domain_name}: {e}")
        return label, None
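# A single lookup can be exercised on its own, e.g. (illustrative label only):
#   async with aiohttp.ClientSession() as s:
#       label, data = await fetch_label(s, 'vitalik')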


def chunks(lst, batch_size):
    """Yield successive batch_size-sized chunks from lst."""
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]
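# e.g. list(chunks(['a', 'b', 'c', 'd', 'e'], 2)) -> [['a', 'b'], ['c', 'd'], ['e']]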


async def main_async(result_dict):
    # Load and filter labels from file
    try:
        with open(LABELS_FILE_PATH) as fd:
            all_labels = json.load(fd)
    except Exception as e:
        print(f"Error reading {LABELS_FILE_PATH}: {e}")
        # Hand back the (still empty) dict so write_results always receives a dict.
        return result_dict

    # Keep labels of 3-16 characters that contain no whitespace.
    filtered_labels = [
        label for label in all_labels
        if 3 <= len(label) <= 16 and not has_empty_characters(label)
    ]
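    # labels.json is assumed to be a flat JSON array of label strings, e.g.
    # ["vitalik", "nick", "web3 dev"] -- the last entry would be rejected by
    # the whitespace filter above. The sample names are illustrative only.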

    batch_size = get_recommended_batch_size()  # Adjust the batch size if needed
    number_of_domains = 0

    async with aiohttp.ClientSession() as session:
        try:
            for batch_index, label_batch in enumerate(chunks(filtered_labels, batch_size), start=1):
                print(f"\nProcessing batch {batch_index} with {len(label_batch)} labels, so far {number_of_domains} domains")
                tasks = [fetch_label(session, label) for label in label_batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    if isinstance(result, Exception):
                        print(f"Task resulted in exception: {result}")
                    else:
                        label, data = result
                        if data is not None:
                            result_dict[label] = data
                            number_of_domains += 1
        except KeyboardInterrupt:
            # Best-effort guard: Ctrl-C may surface here or propagate up to
            # asyncio.run(), where the __main__ block handles it.
            print("\nKeyboard interrupt detected. Stopping further processing.")

    return result_dict


def write_results(result_dict):
    try:
        with open(RESULT_JSON_FILE_PATH, 'w') as result_fd:
            json.dump(result_dict, result_fd, indent=2)
        print(f"\nWrote all entries to JSON file: {RESULT_JSON_FILE_PATH}")
    except Exception as e:
        print(f"Error writing to {RESULT_JSON_FILE_PATH}: {e}")


if __name__ == '__main__':
    result_dict = {}

    try:
        # main_async mutates result_dict in place, so partial results survive
        # even if asyncio.run() is interrupted before it returns.
        result_dict = asyncio.run(main_async(result_dict))
    except KeyboardInterrupt:
        print("\nKeyboard interrupt caught during asyncio.run execution.")
    finally:
        write_results(result_dict)
        sys.exit(0)

# Thanks to https://ensdata.net/ for setting up the endpoints 🙏
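# Run with: python <this_script>.py -- third-party dependencies: pip install aiohttp psutil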