Created
August 7, 2025 16:28
-
-
Save HackingLZ/29a92e6a9e21c24f2e7ebfe20469d8a9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
M365 OSINT Reconnaissance Tool

Based on techniques from: https://dstreefkerk.github.io/2025-07-m365-email-osint-after-lockdown/

This script performs modern M365/Azure AD reconnaissance after Microsoft's lockdown of traditional
enumeration methods. It uses multiple validation techniques to discover organizational information
and attempts to infer MOERA domains.
"""
import argparse
import contextlib
import io
import json
import re
import sys
import time
import unicodedata
import xml.etree.ElementTree as ET
from typing import Dict, List, Optional, Tuple
from urllib.parse import quote

import dns.exception
import dns.resolver
import requests
class M365Recon:
    """Gather public Microsoft 365 / Azure AD signals for a single domain.

    Combines the GetUserRealm and OIDC discovery endpoints with DNS lookups
    to report tenant details, M365 usage indicators and candidate MOERA
    (``*.onmicrosoft.com``) domains.
    """

    # Seconds to wait for each HTTP request before giving up.
    _HTTP_TIMEOUT = 10

    # Generic company words dropped from an organization name before
    # MOERA candidates are derived from it.
    _CORPORATE_SUFFIXES = frozenset({
        'corporation', 'corp', 'company', 'co', 'limited', 'ltd', 'llc',
        'inc', 'incorporated', 'pty', 'group', 'holdings', 'international',
        'global', 'worldwide', 'enterprises', 'solutions', 'services',
    })

    def __init__(self):
        """Create the shared HTTP session with a browser-like User-Agent."""
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    @staticmethod
    def _resolve(name: str, record_type: str) -> list:
        """Return the DNS answers for *name*, or ``[]`` when the lookup fails.

        Only DNS/network failures are swallowed, so KeyboardInterrupt and
        SystemExit still propagate to the caller.
        """
        try:
            return list(dns.resolver.resolve(name, record_type))
        except (dns.exception.DNSException, OSError):
            return []

    def _cname_targets(self, name: str) -> List[str]:
        """Return the lower-cased CNAME targets of *name* (DNS is case-insensitive)."""
        return [str(record).lower() for record in self._resolve(name, 'CNAME')]

    def get_user_realm_info(self, domain: str) -> Dict:
        """
        Query the GetUserRealm endpoint to extract organization information.

        Returns a dict with 'domain', 'organization_name', 'namespace_type',
        'is_federated', 'auth_url' and 'tenant_exists'. When the request or
        the XML parsing fails, only 'domain' and 'tenant_exists' (False) are
        returned and the error is reported on stderr.
        """
        url = "https://login.microsoftonline.com/getuserrealm.srf"
        # Let requests build the query string so the login value is URL-encoded.
        params = {'login': f'admin@{domain}', 'xml': '1'}
        try:
            response = self.session.get(url, params=params, timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
            root = ET.fromstring(response.text)
        except requests.exceptions.RequestException as e:
            print(f"[!] Error querying GetUserRealm for {domain}: {e}", file=sys.stderr)
            return {'domain': domain, 'tenant_exists': False}
        except ET.ParseError as e:
            print(f"[!] Error parsing XML response for {domain}: {e}", file=sys.stderr)
            return {'domain': domain, 'tenant_exists': False}

        namespace_type = root.findtext('.//NameSpaceType') or None
        is_federated = namespace_type == 'Federated'
        return {
            'domain': domain,
            'organization_name': root.findtext('.//FederationBrandName') or None,
            'namespace_type': namespace_type,
            'is_federated': is_federated,
            'auth_url': (root.findtext('.//AuthURL') or None) if is_federated else None,
            # The endpoint answers successfully for any domain; a missing or
            # "Unknown" namespace type means no tenant claims this domain.
            'tenant_exists': namespace_type not in (None, 'Unknown'),
        }

    def get_oidc_metadata(self, domain: str) -> Dict:
        """
        Query the OIDC metadata endpoint to extract the tenant ID.

        Returns a dict with 'domain', 'tenant_id', 'issuer' and
        'oidc_available'; on any request or decoding failure only 'domain'
        and 'oidc_available' (False) are returned.
        """
        url = (
            "https://login.microsoftonline.com/"
            f"{quote(domain, safe='')}/v2.0/.well-known/openid-configuration"
        )
        try:
            response = self.session.get(url, timeout=self._HTTP_TIMEOUT)
            response.raise_for_status()
            metadata = response.json()
        except (requests.exceptions.RequestException, ValueError):
            # ValueError covers every JSON decoding error flavour that
            # different requests versions raise from Response.json().
            return {'domain': domain, 'oidc_available': False}

        issuer = metadata.get('issuer', '') if isinstance(metadata, dict) else ''
        # Extract tenant ID from issuer URL
        tenant_id_match = re.search(r'https://login\.microsoftonline\.com/([^/]+)/v2\.0', issuer)
        return {
            'domain': domain,
            'tenant_id': tenant_id_match.group(1) if tenant_id_match else None,
            'issuer': issuer,
            'oidc_available': True,
        }

    def check_eop_smart_host(self, domain: str) -> bool:
        """
        Check if the domain has an Exchange Online Protection smart host.

        The host name is the domain with dots replaced by dashes under
        ``mail.protection.outlook.com``; True means it resolves to an A record.
        """
        smart_host = domain.replace('.', '-') + '.mail.protection.outlook.com'
        return bool(self._resolve(smart_host, 'A'))

    def check_m365_dns_indicators(self, domain: str) -> Dict:
        """
        Check various DNS indicators for M365 usage.

        Returns a dict with boolean flags 'msoid', 'txt_verification',
        'autodiscover', 'enterprise_reg' and 'dkim_selectors', the list
        'mx_records' and the string (or None) 'spf_record'. Failed lookups
        leave the corresponding entry at its default.
        """
        indicators = {
            'msoid': False,
            'txt_verification': False,
            'autodiscover': False,
            'enterprise_reg': False,
            'dkim_selectors': False,
            'mx_records': [],
            'spf_record': None,
        }

        # Check MSOID
        indicators['msoid'] = any(
            'clientconfig.microsoftonline' in target
            for target in self._cname_targets(f'msoid.{domain}')
        )

        # Check TXT records for MS verification and SPF
        for record in self._resolve(domain, 'TXT'):
            # A TXT record may be split into several character-strings;
            # join them so long SPF records are not cut or left with quotes.
            text = b''.join(record.strings).decode('utf-8', errors='replace')
            lowered = text.lower()
            if lowered.startswith('ms=ms'):
                indicators['txt_verification'] = True
            elif lowered.startswith('v=spf1'):
                indicators['spf_record'] = text

        # Check Autodiscover
        indicators['autodiscover'] = (
            'autodiscover.outlook.com.' in self._cname_targets(f'autodiscover.{domain}')
        )

        # Check Enterprise Registration
        indicators['enterprise_reg'] = (
            'enterpriseregistration.windows.net.'
            in self._cname_targets(f'enterpriseregistration.{domain}')
        )

        # Check DKIM selectors: any CNAME on either selector counts.
        # NOTE(review): the CNAME target is not inspected, so another mail
        # provider using the same selector names would also set this flag.
        indicators['dkim_selectors'] = any(
            self._resolve(f'{selector}._domainkey.{domain}', 'CNAME')
            for selector in ('selector1', 'selector2')
        )

        # Check MX records
        indicators['mx_records'] = [
            str(record.exchange) for record in self._resolve(domain, 'MX')
        ]
        return indicators

    def generate_moera_candidates(self, org_name: str, domain: str) -> List[Tuple[str, str, int]]:
        """
        Generate MOERA domain candidates based on organization name and domain.

        Returns a list of (candidate_domain, generation_method,
        confidence_score) tuples, de-duplicated (keeping the highest score
        per candidate) and ordered from highest to lowest confidence.
        An empty organization name yields an empty list.
        """
        if not org_name:
            return []

        # Decompose accented letters so "é" becomes "e" + combining mark;
        # the mark is then removed by the filter below instead of the letter.
        decomposed = unicodedata.normalize('NFKD', org_name)
        normalized = re.sub(r'[^a-zA-Z0-9\s]', '', decomposed).lower()
        words = normalized.split()

        # Remove common corporate suffixes, unless that would remove everything.
        filtered_words = [w for w in words if w not in self._CORPORATE_SUFFIXES] or words

        labels = []
        if len(filtered_words) == 1:
            # Method 1: Single word (highest confidence)
            labels.append((filtered_words[0], "single_word", 90))
        elif len(filtered_words) > 1:
            # Method 2: Concatenated words (high confidence)
            labels.append((''.join(filtered_words), "concatenated", 80))
            # Method 3: Acronym from organization name (medium confidence)
            labels.append((''.join(w[0] for w in filtered_words), "acronym", 60))

        # Method 4: Domain-based candidate (medium confidence). Lower-cased so
        # it de-duplicates against the name-based ones, and reduced to letters
        # and digits because onmicrosoft.com prefixes do not contain hyphens.
        domain_label = re.sub(r'[^a-z0-9]', '', domain.split('.')[0].lower())
        if domain_label:
            labels.append((domain_label, "domain_based", 70))

        # Method 5: First word only (lower confidence)
        if len(filtered_words) > 1:
            labels.append((filtered_words[0], "first_word", 50))

        # Remove duplicates while preserving highest confidence scores
        best = {}
        for label, method, confidence in labels:
            candidate = f"{label}.onmicrosoft.com"
            if candidate not in best or confidence > best[candidate][1]:
                best[candidate] = (method, confidence)

        ranked = [(name, method, confidence) for name, (method, confidence) in best.items()]
        ranked.sort(key=lambda item: item[2], reverse=True)
        return ranked

    def validate_moera_domain(self, candidate: str) -> bool:
        """
        Validate a MOERA domain by checking its MX records.

        True when at least one MX exchange is under
        ``mail.protection.outlook.com``; False otherwise or on lookup failure.
        """
        return any(
            'mail.protection.outlook.com' in str(record.exchange).lower()
            for record in self._resolve(candidate, 'MX')
        )

    def assess_m365_confidence(self, indicators: Dict, eop_smart_host: bool) -> str:
        """
        Assess M365 usage confidence based on various indicators.

        Returns "Yes", "Likely", "Possibly" or "No". Stronger signals are
        evaluated first so a weak one can never mask a stronger one.
        """
        if indicators.get('autodiscover') or indicators.get('dkim_selectors') or eop_smart_host:
            return "Yes"

        mx_on_outlook = any(
            'outlook.com' in str(mx).lower() for mx in indicators.get('mx_records') or []
        )
        if indicators.get('enterprise_reg') or mx_on_outlook:
            return "Likely"

        if indicators.get('msoid') or indicators.get('txt_verification'):
            return "Possibly"
        return "No"

    def analyze_domain(self, domain: str) -> Dict:
        """
        Perform comprehensive analysis of a domain.

        Runs every check in turn, printing progress to stdout, and returns
        a dict with keys 'domain', 'user_realm', 'oidc_metadata',
        'dns_indicators', 'eop_smart_host', 'm365_confidence',
        'moera_candidates' and 'validated_moera'.
        """
        print(f"\n[*] Analyzing domain: {domain}")
        results = {
            'domain': domain,
            'user_realm': {},
            'oidc_metadata': {},
            'dns_indicators': {},
            'eop_smart_host': False,
            'm365_confidence': 'No',
            'moera_candidates': [],
            'validated_moera': None,
        }

        # Get user realm information
        print("[*] Querying GetUserRealm...")
        results['user_realm'] = self.get_user_realm_info(domain)

        # Get OIDC metadata
        print("[*] Querying OIDC metadata...")
        results['oidc_metadata'] = self.get_oidc_metadata(domain)

        # Check DNS indicators
        print("[*] Checking DNS indicators...")
        results['dns_indicators'] = self.check_m365_dns_indicators(domain)

        # Check EOP smart host
        print("[*] Checking EOP smart host...")
        results['eop_smart_host'] = self.check_eop_smart_host(domain)

        # Assess M365 confidence
        results['m365_confidence'] = self.assess_m365_confidence(
            results['dns_indicators'],
            results['eop_smart_host'],
        )

        # Generate and validate MOERA candidates
        org_name = results['user_realm'].get('organization_name')
        if org_name:
            print("[*] Generating MOERA candidates...")
            checked = []
            for candidate, method, confidence in self.generate_moera_candidates(org_name, domain):
                print(f"[*] Validating MOERA candidate: {candidate}")
                is_valid = self.validate_moera_domain(candidate)
                checked.append((candidate, method, confidence, is_valid))
                best = results['validated_moera']
                if is_valid and (not best or confidence > best[2]):
                    results['validated_moera'] = (candidate, method, confidence, True)
                # Add small delay to avoid overwhelming DNS servers
                time.sleep(0.1)
            results['moera_candidates'] = checked
        return results

    def print_results(self, results: Dict):
        """
        Print formatted results.

        Expects the dict produced by analyze_domain. Stops after the
        header when no tenant was found for the domain.
        """
        domain = results['domain']
        print(f"\n{'='*60}")
        print(f"ANALYSIS RESULTS FOR: {domain}")
        print(f"{'='*60}")

        # Basic tenant information
        user_realm = results['user_realm']
        if not user_realm.get('tenant_exists'):
            print(f"\n[-] No Azure AD tenant found for {domain}")
            return
        print("\n[+] TENANT INFORMATION")
        # The key can be present with a None value, so fall back with `or`.
        print(f" Organization Name: {user_realm.get('organization_name') or 'N/A'}")
        print(f" Federated: {'Yes' if user_realm.get('is_federated') else 'No'}")
        if user_realm.get('auth_url'):
            print(f" Auth URL: {user_realm['auth_url']}")

        # OIDC metadata
        oidc = results['oidc_metadata']
        if oidc.get('oidc_available'):
            print("\n[+] OIDC METADATA")
            print(f" Tenant ID: {oidc.get('tenant_id') or 'N/A'}")
            print(f" Issuer: {oidc.get('issuer') or 'N/A'}")

        # M365 confidence assessment
        print("\n[+] M365 USAGE ASSESSMENT")
        print(f" Confidence Level: {results['m365_confidence']}")

        # DNS indicators
        indicators = results['dns_indicators']
        print("\n[+] DNS INDICATORS")
        print(f" MSOID Record: {'Yes' if indicators['msoid'] else 'No'}")
        print(f" MS TXT Verification: {'Yes' if indicators['txt_verification'] else 'No'}")
        print(f" Autodiscover: {'Yes' if indicators['autodiscover'] else 'No'}")
        print(f" Enterprise Registration: {'Yes' if indicators['enterprise_reg'] else 'No'}")
        print(f" DKIM Selectors: {'Yes' if indicators['dkim_selectors'] else 'No'}")
        print(f" EOP Smart Host: {'Yes' if results['eop_smart_host'] else 'No'}")
        if indicators['mx_records']:
            print(" MX Records:")
            for mx in indicators['mx_records']:
                print(f" - {mx}")
        if indicators['spf_record']:
            print(f" SPF Record: {indicators['spf_record']}")

        # MOERA domain candidates
        if not results['moera_candidates']:
            print("\n[-] No MOERA candidates generated (organization name required)")
            return
        print("\n[+] MOERA DOMAIN ANALYSIS")
        if results['validated_moera']:
            candidate, method, confidence, valid = results['validated_moera']
            print(f" Best Match: {candidate} (Confidence: {confidence}%, Method: {method})")
        print(" All Candidates:")
        for candidate, method, confidence, valid in results['moera_candidates']:
            status = "✓ VALID" if valid else "✗ Invalid"
            inference = " (Inferred)" if confidence < 80 else ""
            print(f" {candidate} - {status} ({confidence}% confidence, {method}){inference}")
def main():
    """Command-line entry point: parse arguments, analyze the domain, report.

    Exits with status 1 when the domain is malformed, the run is
    interrupted, or the analysis raises. Diagnostics go to stderr.
    """
    parser = argparse.ArgumentParser(
        description='M365 OSINT Reconnaissance Tool',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
python3 m365_recon.py microsoft.com
python3 m365_recon.py contoso.com --output results.json
Based on techniques from:
https://dstreefkerk.github.io/2025-07-m365-email-osint-after-lockdown/
"""
    )
    parser.add_argument('domain', help='Target domain to analyze')
    parser.add_argument('--output', '-o', help='Output results to JSON file')
    parser.add_argument('--quiet', '-q', action='store_true', help='Suppress progress messages')
    args = parser.parse_args()

    # DNS names are case-insensitive; normalise once so every later
    # comparison and generated host name uses the same spelling.
    domain = args.domain.strip().lower()

    # Validate domain format
    domain_pattern = re.compile(
        r'^[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(\.[a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*$'
    )
    # 253 characters is the longest name DNS can carry in text form.
    if len(domain) > 253 or not domain_pattern.fullmatch(domain):
        print(f"[!] Invalid domain format: {args.domain}", file=sys.stderr)
        sys.exit(1)

    # Initialize reconnaissance tool
    recon = M365Recon()
    try:
        # Analyze the domain. --quiet hides the progress messages that the
        # analysis prints to stdout; the results themselves are still shown.
        if args.quiet:
            with contextlib.redirect_stdout(io.StringIO()):
                results = recon.analyze_domain(domain)
        else:
            results = recon.analyze_domain(domain)

        # Print results
        recon.print_results(results)

        # Save to file if requested
        if args.output:
            # Explicit UTF-8 so non-ASCII organization names survive on
            # platforms whose default encoding is not UTF-8.
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(results, f, indent=2, ensure_ascii=False)
            print(f"\n[+] Results saved to: {args.output}")
    except KeyboardInterrupt:
        print("\n[!] Analysis interrupted by user", file=sys.stderr)
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report any unexpected failure and exit non-zero.
        print(f"\n[!] Error during analysis: {e}", file=sys.stderr)
        sys.exit(1)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment