Skip to content

Instantly share code, notes, and snippets.

@itzmeanjan
Created February 22, 2025 07:08
Show Gist options
  • Save itzmeanjan/9fcd04f3f30e10b21cb9c66073d18c49 to your computer and use it in GitHub Desktop.
Save itzmeanjan/9fcd04f3f30e10b21cb9c66073d18c49 to your computer and use it in GitHub Desktop.
Populate Ethereum Name Service (ENS) JSON Database from List of Labels

How to use

  1. Download ENS labels JSON database from https://github.com/adraffy/ens-labels.git (retrieved at commit d1d85346).
  2. Download this gist and unzip. Then move the downloaded labels.json file inside the unzipped project directory.
  3. Inside unzipped project directory, create a Python virtual environment.
python -m venv .
source bin/activate
  4. Download dependencies using pip.
pip install -r requirements.txt
  5. Run the Python script to populate a JSON database ens_kv.json.
python populate.py # When you want to stop it, just interrupt by pressing Ctrl+C.
  6. Find the results written to the ens_kv.json file.

Note

While the JSON database ens_kv.json is being populated, every fetched key-value entry is kept in memory until the very end of program execution, so the memory usage of this program keeps increasing the longer it runs.

import asyncio
import json
import multiprocessing
import os
from urllib.parse import quote

import aiohttp
import psutil
# Input file: JSON array of ENS labels to look up.
LABELS_FILE_PATH = './labels.json'
# Root of the HTTP API queried for every '<label>.eth' name.
BASE_URL = 'https://api.ensdata.net'
# Query string appended to every request URL. Parameters after the first
# must be joined with '&'; a second '?' would be read as part of the first
# parameter's value and the 'expiry' option would never be sent.
EXTRA_OPTIONS = '?farcaster=true&expiry=true'
# Output file: JSON object mapping each label to the data fetched for it.
RESULT_JSON_FILE_PATH = './ens_kv.json'
# Baseline number of labels requested concurrently per batch.
DEFAULT_BATCH_SIZE = 50
def get_recommended_batch_size(default=DEFAULT_BATCH_SIZE):
    """Return how many labels should be fetched concurrently per batch.

    The recommendation is the smallest of three limits: ``default`` scaled
    by the CPU count, a memory-based limit that budgets 0.01 GB of currently
    available memory per task, and ``default * 2``. The result is never
    smaller than 1.

    Args:
        default: Baseline batch size the limits are derived from.

    Returns:
        The recommended batch size as a positive integer.
    """
    cpu_count = multiprocessing.cpu_count()
    available_mem_gb = psutil.virtual_memory().available / (2**30)
    assumed_mem_per_task = 0.01  # GB budgeted for one in-flight request
    mem_based_limit = int(available_mem_gb / assumed_mem_per_task)
    # Clamp to 1: with very little free memory the memory-based limit
    # truncates to 0, and a batch size of 0 cannot be used to slice the
    # label list into batches.
    recommended = max(1, min(default * cpu_count, mem_based_limit, default * 2))
    print(f"Machine configuration: {cpu_count} CPUs, {available_mem_gb:.2f} GB available memory")
    print(f"Recommended concurrency limit: {recommended}")
    return recommended
def has_empty_characters(s: str) -> bool:
    """Return True if *s* contains at least one whitespace character.

    "Empty" here means whitespace as reported by ``str.isspace`` (spaces,
    tabs, newlines, ...). An empty string contains none, so it yields False.
    """
    return any(char.isspace() for char in s)
async def fetch_label(session, label: str):
    """Fetch the API record for ``<label>.eth`` and report the outcome.

    Args:
        session: HTTP client session whose ``get`` is used as an async
            context manager (an ``aiohttp.ClientSession`` in this script).
        label: ENS label without the ``.eth`` suffix.

    Returns:
        A ``(label, data)`` tuple. ``data`` is the decoded JSON body when the
        response status is OK, and ``None`` when the status is not OK or the
        request/decoding raised an exception.
    """
    ens_domain_name = f'{label}.eth'
    # Percent-encode the name so characters such as '/', '?' or '#' inside a
    # label stay part of the path segment instead of altering the URL.
    url = f"{BASE_URL}/{quote(ens_domain_name, safe='')}{EXTRA_OPTIONS}"
    try:
        async with session.get(url, allow_redirects=True) as response:
            if not response.ok:
                print(f"Processing label: '{ens_domain_name}'\t🥲 {response.status}")
                return label, None
            data = await response.json()
    except Exception as e:
        # Deliberately broad: one failing label must not abort its batch.
        print(f"Error processing {ens_domain_name}: {e}")
        return label, None
    # Emit the whole progress line in one print, after the await, so lines
    # from concurrently running tasks do not interleave.
    print(f"Processing label: '{ens_domain_name}'\t✅")
    return label, data
def chunks(lst, batch_size):
    """Yield successive batch_size-sized chunks from lst.

    The last chunk is shorter when ``len(lst)`` is not a multiple of
    ``batch_size``; an empty ``lst`` yields nothing.

    Args:
        lst: Sliceable sequence to split.
        batch_size: Maximum length of each chunk; must be at least 1.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # A negative step would make range() empty and silently drop every item.
    if batch_size < 1:
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    for start in range(0, len(lst), batch_size):
        yield lst[start:start + batch_size]
async def main_async(result_dict):
    """Fetch data for every eligible label and collect it into result_dict.

    Labels are read from ``LABELS_FILE_PATH``, filtered to those that are
    3 to 16 characters long and contain no whitespace, and then requested in
    batches whose size comes from ``get_recommended_batch_size()``. All
    requests of one batch run concurrently; the next batch starts only after
    the current one has finished.

    Args:
        result_dict: Dictionary updated in place with ``label -> data`` for
            every label whose fetch returned data.

    Returns:
        The same ``result_dict`` object, including when the labels file
        cannot be read (in which case it is returned unchanged).
    """
    # Load and filter labels from file
    try:
        with open(LABELS_FILE_PATH, encoding='utf-8') as fd:
            all_labels = json.load(fd)
    except (OSError, ValueError) as e:
        # ValueError covers both malformed JSON and undecodable bytes.
        print(f"Error reading {LABELS_FILE_PATH}: {e}")
        # Hand back the dictionary rather than None so the caller still has
        # a valid object to write out.
        return result_dict

    filtered_labels = [
        label for label in all_labels
        if 3 <= len(label) <= 16 and not has_empty_characters(label)
    ]
    batch_size = get_recommended_batch_size()  # Adjust the batch size if needed
    number_of_domains = 0

    async with aiohttp.ClientSession() as session:
        try:
            for batch_index, label_batch in enumerate(chunks(filtered_labels, batch_size), start=1):
                print(f"\nProcessing batch {batch_index} with {len(label_batch)} labels, so far {number_of_domains} domains")
                tasks = [fetch_label(session, label) for label in label_batch]
                results = await asyncio.gather(*tasks, return_exceptions=True)
                for result in results:
                    # With return_exceptions=True a result may be any
                    # BaseException (e.g. CancelledError), none of which can
                    # be unpacked as a (label, data) pair.
                    if isinstance(result, BaseException):
                        print(f"Task resulted in exception: {result}")
                        continue
                    label, data = result
                    if data is not None:
                        result_dict[label] = data
                        number_of_domains += 1
        except KeyboardInterrupt:
            print("\nKeyboard interrupt detected. Stopping further processing.")
    return result_dict
def write_results(result_dict):
    """Serialise result_dict as indented JSON into RESULT_JSON_FILE_PATH.

    The JSON is first written to a sibling ``.tmp`` file and then moved over
    the target, so a failure while serialising or writing leaves any
    previously written results file untouched. Errors are reported on
    stdout instead of being raised.

    Args:
        result_dict: JSON-serialisable object to store.
    """
    tmp_path = f'{RESULT_JSON_FILE_PATH}.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as result_fd:
            json.dump(result_dict, result_fd, indent=2)
        # Swap the finished file into place only after a complete write.
        os.replace(tmp_path, RESULT_JSON_FILE_PATH)
        print(f"\nWrote all entries to JSON file: {RESULT_JSON_FILE_PATH}")
    except Exception as e:
        # Best-effort final step of the script: report instead of crashing.
        print(f"Error writing to {RESULT_JSON_FILE_PATH}: {e}")
if __name__ == '__main__':
    # Script entry point: fetch until finished or interrupted, then always
    # persist whatever has been collected so far.
    result_dict = {}
    try:
        # main_async fills result_dict in place. Its return value is not
        # assigned back so that an interrupted or failed run can never
        # replace the collected entries with something else.
        asyncio.run(main_async(result_dict))
    except KeyboardInterrupt:
        print("\nKeyboard interrupt caught during asyncio.run execution.")
    finally:
        # No exit(0) here: raising SystemExit from a finally block would
        # hide any unexpected exception behind a successful exit status.
        write_results(result_dict)
aiohttp==3.11.12
psutil==7.0.0
@itzmeanjan
Copy link
Author

Thanks to https://ensdata.net/ for setting up the endpoints 🙏 .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment