Skip to content

Instantly share code, notes, and snippets.

@queencitycyber
Last active July 24, 2024 15:34
Show Gist options
  • Save queencitycyber/642b7da5de816d6ba7c2a315eafd9609 to your computer and use it in GitHub Desktop.
Save queencitycyber/642b7da5de816d6ba7c2a315eafd9609 to your computer and use it in GitHub Desktop.
postmessage scanning
import asyncio
import json
import os
import re
from typing import List, Dict
import aiohttp
import click
from bs4 import BeautifulSoup
from rich.console import Console
from rich.progress import Progress
from rich.table import Table
class PostMessageScanner:
def __init__(self, urls: List[str], output_format: str, poc: bool, output_dir: str):
self.urls = urls
self.output_format = output_format
self.poc = poc
self.output_dir = output_dir
self.results = []
self.console = Console()
self.poc_files = []
async def scan_url(self, url: str, session: aiohttp.ClientSession) -> Dict:
result = {"url": url, "vulnerabilities": []}
try:
async with session.get(url) as response:
html = await response.text()
soup = BeautifulSoup(html, 'html.parser')
# Check inline scripts
for script in soup.find_all('script'):
if script.string:
self.check_script_content(script.string, result)
# Check external scripts
tasks = []
for script in soup.find_all('script', src=True):
src = script['src']
if src.startswith('//'):
src = f'https:{src}'
elif src.startswith('/'):
src = f'{url}{src}'
elif not src.startswith('http'):
src = f'{url}/{src}'
tasks.append(self.fetch_and_check_script(src, session, result))
await asyncio.gather(*tasks)
except Exception as e:
result["vulnerabilities"].append(f"Error scanning {url}: {str(e)}")
return result
async def fetch_and_check_script(self, src: str, session: aiohttp.ClientSession, result: Dict):
try:
async with session.get(src) as response:
script_content = await response.text()
self.check_script_content(script_content, result)
except Exception as e:
result["vulnerabilities"].append(f"Error fetching script from {src}: {str(e)}")
def check_script_content(self, content: str, result: Dict):
vulnerabilities_found = []
if re.search(r'postMessage\s*\([^)]*,\s*[\'"]?\*[\'"]?\s*\)', content):
vulnerabilities_found.append("Found postMessage with wildcard (*) origin")
if re.search(r'addEventListener\s*\(\s*[\'"]message[\'"]', content):
if not re.search(r'event\.origin', content):
vulnerabilities_found.append("Found message event listener without origin check")
if vulnerabilities_found:
result["vulnerabilities"].extend(vulnerabilities_found)
if self.poc:
self.generate_poc(result["url"], vulnerabilities_found)
def generate_poc(self, url: str, vulnerabilities: List[str]):
poc_content = f"""
<!DOCTYPE html>
<html>
<head>
<title>PostMessage PoC for {url}</title>
</head>
<body>
<script>
// Sample PoC code
window.addEventListener("message", function(event) {{
// Replace '*' with a specific origin in real scenarios
if (event.origin !== "*") return;
console.log("Message received:", event.data);
}});
// Send a message to the target window
var targetWindow = window.open("{url}");
targetWindow.postMessage("PoC message", "*");
</script>
</body>
</html>
"""
try:
os.makedirs(self.output_dir, exist_ok=True)
poc_filename = os.path.join(self.output_dir, f"poc_{url.replace('/', '_')}.html")
with open(poc_filename, "w") as f:
f.write(poc_content)
self.poc_files.append(poc_filename)
except Exception as e:
self.console.print(f"[bold red]Error creating PoC file for {url}: {e}[/bold red]")
async def scan_all_urls(self):
async with aiohttp.ClientSession() as session:
with Progress() as progress:
task = progress.add_task("[cyan]Scanning URLs...", total=len(self.urls))
async def scan_with_progress(url):
result = await self.scan_url(url, session)
progress.update(task, advance=1)
return result
self.results = await asyncio.gather(*[scan_with_progress(url) for url in self.urls])
def output_results(self):
if self.output_format == 'json':
print(json.dumps(self.results, indent=2))
else:
table = Table(title="PostMessage Wildcard Origin Vulnerability Scan Results")
table.add_column("URL", style="cyan")
table.add_column("Vulnerabilities", style="magenta")
for result in self.results:
if result["vulnerabilities"]:
vulnerabilities = "\n".join(result["vulnerabilities"])
table.add_row(result["url"], vulnerabilities)
self.console.print(table)
if self.poc_files:
self.console.print(f"[bold green]PoCs created in: {', '.join(self.poc_files)}[/bold green]")
@click.command()
@click.option('--url', help='Single URL to scan')
@click.option('--file', type=click.File('r'), help='File containing list of URLs to scan')
@click.option('--output', type=click.Choice(['text', 'json']), default='text', help='Output format')
@click.option('--poc', is_flag=True, help='Generate PoC file if vulnerabilities are found')
@click.option('--output-dir', default='poc_output', help='Directory to save PoC files')
def main(url, file, output, poc, output_dir):
if url:
urls = [url]
elif file:
urls = [line.strip() for line in file if line.strip()]
else:
click.echo("Please provide either a URL or a file containing URLs.")
return
scanner = PostMessageScanner(urls, output, poc, output_dir)
asyncio.run(scanner.scan_all_urls())
scanner.output_results()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment