Last active
July 24, 2024 15:34
-
-
Save queencitycyber/642b7da5de816d6ba7c2a315eafd9609 to your computer and use it in GitHub Desktop.
postmessage scanning
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import json | |
import os | |
import re | |
from typing import List, Dict | |
import aiohttp | |
import click | |
from bs4 import BeautifulSoup | |
from rich.console import Console | |
from rich.progress import Progress | |
from rich.table import Table | |
class PostMessageScanner: | |
def __init__(self, urls: List[str], output_format: str, poc: bool, output_dir: str): | |
self.urls = urls | |
self.output_format = output_format | |
self.poc = poc | |
self.output_dir = output_dir | |
self.results = [] | |
self.console = Console() | |
self.poc_files = [] | |
async def scan_url(self, url: str, session: aiohttp.ClientSession) -> Dict: | |
result = {"url": url, "vulnerabilities": []} | |
try: | |
async with session.get(url) as response: | |
html = await response.text() | |
soup = BeautifulSoup(html, 'html.parser') | |
# Check inline scripts | |
for script in soup.find_all('script'): | |
if script.string: | |
self.check_script_content(script.string, result) | |
# Check external scripts | |
tasks = [] | |
for script in soup.find_all('script', src=True): | |
src = script['src'] | |
if src.startswith('//'): | |
src = f'https:{src}' | |
elif src.startswith('/'): | |
src = f'{url}{src}' | |
elif not src.startswith('http'): | |
src = f'{url}/{src}' | |
tasks.append(self.fetch_and_check_script(src, session, result)) | |
await asyncio.gather(*tasks) | |
except Exception as e: | |
result["vulnerabilities"].append(f"Error scanning {url}: {str(e)}") | |
return result | |
async def fetch_and_check_script(self, src: str, session: aiohttp.ClientSession, result: Dict): | |
try: | |
async with session.get(src) as response: | |
script_content = await response.text() | |
self.check_script_content(script_content, result) | |
except Exception as e: | |
result["vulnerabilities"].append(f"Error fetching script from {src}: {str(e)}") | |
def check_script_content(self, content: str, result: Dict): | |
vulnerabilities_found = [] | |
if re.search(r'postMessage\s*\([^)]*,\s*[\'"]?\*[\'"]?\s*\)', content): | |
vulnerabilities_found.append("Found postMessage with wildcard (*) origin") | |
if re.search(r'addEventListener\s*\(\s*[\'"]message[\'"]', content): | |
if not re.search(r'event\.origin', content): | |
vulnerabilities_found.append("Found message event listener without origin check") | |
if vulnerabilities_found: | |
result["vulnerabilities"].extend(vulnerabilities_found) | |
if self.poc: | |
self.generate_poc(result["url"], vulnerabilities_found) | |
def generate_poc(self, url: str, vulnerabilities: List[str]): | |
poc_content = f""" | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>PostMessage PoC for {url}</title> | |
</head> | |
<body> | |
<script> | |
// Sample PoC code | |
window.addEventListener("message", function(event) {{ | |
// Replace '*' with a specific origin in real scenarios | |
if (event.origin !== "*") return; | |
console.log("Message received:", event.data); | |
}}); | |
// Send a message to the target window | |
var targetWindow = window.open("{url}"); | |
targetWindow.postMessage("PoC message", "*"); | |
</script> | |
</body> | |
</html> | |
""" | |
try: | |
os.makedirs(self.output_dir, exist_ok=True) | |
poc_filename = os.path.join(self.output_dir, f"poc_{url.replace('/', '_')}.html") | |
with open(poc_filename, "w") as f: | |
f.write(poc_content) | |
self.poc_files.append(poc_filename) | |
except Exception as e: | |
self.console.print(f"[bold red]Error creating PoC file for {url}: {e}[/bold red]") | |
async def scan_all_urls(self): | |
async with aiohttp.ClientSession() as session: | |
with Progress() as progress: | |
task = progress.add_task("[cyan]Scanning URLs...", total=len(self.urls)) | |
async def scan_with_progress(url): | |
result = await self.scan_url(url, session) | |
progress.update(task, advance=1) | |
return result | |
self.results = await asyncio.gather(*[scan_with_progress(url) for url in self.urls]) | |
def output_results(self): | |
if self.output_format == 'json': | |
print(json.dumps(self.results, indent=2)) | |
else: | |
table = Table(title="PostMessage Wildcard Origin Vulnerability Scan Results") | |
table.add_column("URL", style="cyan") | |
table.add_column("Vulnerabilities", style="magenta") | |
for result in self.results: | |
if result["vulnerabilities"]: | |
vulnerabilities = "\n".join(result["vulnerabilities"]) | |
table.add_row(result["url"], vulnerabilities) | |
self.console.print(table) | |
if self.poc_files: | |
self.console.print(f"[bold green]PoCs created in: {', '.join(self.poc_files)}[/bold green]") | |
@click.command() | |
@click.option('--url', help='Single URL to scan') | |
@click.option('--file', type=click.File('r'), help='File containing list of URLs to scan') | |
@click.option('--output', type=click.Choice(['text', 'json']), default='text', help='Output format') | |
@click.option('--poc', is_flag=True, help='Generate PoC file if vulnerabilities are found') | |
@click.option('--output-dir', default='poc_output', help='Directory to save PoC files') | |
def main(url, file, output, poc, output_dir): | |
if url: | |
urls = [url] | |
elif file: | |
urls = [line.strip() for line in file if line.strip()] | |
else: | |
click.echo("Please provide either a URL or a file containing URLs.") | |
return | |
scanner = PostMessageScanner(urls, output, poc, output_dir) | |
asyncio.run(scanner.scan_all_urls()) | |
scanner.output_results() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment