Skip to content

Instantly share code, notes, and snippets.

@ntfargo
Created November 8, 2024 21:45
Show Gist options
  • Save ntfargo/a7ed61d615b645cd0581a0b3fe8966f1 to your computer and use it in GitHub Desktop.
Save ntfargo/a7ed61d615b645cd0581a0b3fe8966f1 to your computer and use it in GitHub Desktop.
import requests
import re
import json
import time
from datetime import datetime
from bs4 import BeautifulSoup
import signal
import sys
class WebKitBugzillaChecker:
    """Scan WebKit commits on GitHub for Bugzilla bugs that refuse anonymous access.

    Commit messages are fetched page by page from the GitHub commits API,
    Bugzilla bug ids are extracted from each message, and every new bug id is
    requested from bugs.webkit.org. Bugs whose page carries the
    "You are not authorized to access bug" error are appended to a text file.
    Progress (seen bugs, seen commits, current page) is persisted to a JSON
    state file so a scan can be resumed.
    """

    # Seconds to wait for a single HTTP response; without a timeout
    # ``requests.get`` can block forever on a stalled connection.
    REQUEST_TIMEOUT = 30
    # Consecutive failed fetches of the same commits page tolerated before the
    # scan stops instead of retrying forever.
    MAX_FETCH_RETRIES = 3
    # Base delay in seconds between retries, multiplied by the attempt number.
    RETRY_DELAY = 5

    # Compiled once; these are the textual forms in which a bug id is
    # recognised inside a commit message.
    _BUG_ID_PATTERNS = tuple(
        re.compile(pattern)
        for pattern in (
            r'https?://bugs\.webkit\.org/show_bug\.cgi\?id=(\d+)',
            r'bug (\d+)',
            r'Bug (\d+)',
            r'bugzilla (\d+)',
            r'Bugzilla (\d+)',
        )
    )

    def __init__(self, github_token):
        """Store the API token, restore saved progress and install signal handlers.

        Args:
            github_token: GitHub API token sent in the ``Authorization`` header.
        """
        self.github_token = github_token
        self.headers = {
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.state_file = 'checker_state.json'
        self.unauthorized_bugs_file = 'unauthorized_bugs.txt'
        self.load_state()
        self.setup_signal_handlers()

    def setup_signal_handlers(self):
        """Route SIGINT and SIGTERM to ``handle_interrupt``."""
        signal.signal(signal.SIGINT, self.handle_interrupt)
        signal.signal(signal.SIGTERM, self.handle_interrupt)

    def handle_interrupt(self, signum, frame):
        """Signal handler: persist progress, print resume instructions, exit with status 0."""
        print("\nReceived interrupt signal. Saving state before exiting...")
        self.save_state()
        print(f"State saved. Last processed page: {self.current_page}")
        print(f"To resume, start the program with --start-page {self.current_page}")
        sys.exit(0)

    def _reset_state(self):
        """Initialise empty progress: no bugs or commits seen, starting at page 1."""
        self.seen_bugs = set()
        self.seen_commits = set()
        self.current_page = 1

    def load_state(self):
        """Restore progress from ``self.state_file``.

        Falls back to empty progress when the file is missing, or when it
        cannot be decoded as a JSON object (for example after a truncated write).
        """
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except FileNotFoundError:
            self._reset_state()
            print("No previous state found. Starting fresh.")
            return
        except ValueError as e:
            # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            self._reset_state()
            print(f"State file {self.state_file} is unreadable ({e}). Starting fresh.")
            return

        if not isinstance(state, dict):
            self._reset_state()
            print(f"State file {self.state_file} has an unexpected format. Starting fresh.")
            return

        self.seen_bugs = set(state.get('seen_bugs', []))
        self.seen_commits = set(state.get('seen_commits', []))
        self.current_page = state.get('current_page', 1)
        print(f"Loaded state: {len(self.seen_bugs)} bugs, {len(self.seen_commits)} commits")
        print(f"Last page processed: {self.current_page - 1}")

    def save_state(self):
        """Write the seen bugs, seen commits, current page and a timestamp to ``self.state_file``."""
        state = {
            'seen_bugs': list(self.seen_bugs),
            'seen_commits': list(self.seen_commits),
            'current_page': self.current_page,
            'last_updated': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        with open(self.state_file, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)

    def _save_unauthorized_bug(self, bug_id, commit_url, error_msg):
        """Append one timestamped record about a restricted bug to ``self.unauthorized_bugs_file``."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(self.unauthorized_bugs_file, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] Bug {bug_id} | Commit: {commit_url}\n")
            f.write(f"Error Message: {error_msg}\n")
            f.write("-" * 80 + "\n")

    def extract_bugzilla_ids(self, commit_message):
        """Return the set of bug ids (as digit strings) referenced in ``commit_message``.

        Recognised forms are a bugs.webkit.org ``show_bug.cgi?id=N`` URL and
        the words ``bug``/``Bug``/``bugzilla``/``Bugzilla`` followed by a
        space and a number.
        """
        bug_ids = set()
        for pattern in self._BUG_ID_PATTERNS:
            bug_ids.update(match.group(1) for match in pattern.finditer(commit_message))
        return bug_ids

    def check_bugzilla_access(self, bug_id):
        """Fetch a bug page and report whether it shows the "not authorized" error.

        Args:
            bug_id: Bug number to look up on bugs.webkit.org.

        Returns:
            ``(True, message)`` when the page contains the error element with
            the "not authorized" text, otherwise ``(False, None)``. A failed
            request is logged and also reported as ``(False, None)``.
        """
        url = f'https://bugs.webkit.org/show_bug.cgi?id={bug_id}'
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            error_div = soup.find('div', {'id': 'error_msg', 'class': 'throw_error'})
            if error_div and "You are not authorized to access bug" in error_div.text:
                return True, error_div.text.strip()
            return False, None
        except requests.RequestException as e:
            print(f"Error checking Bugzilla bug {bug_id}: {e}")
            return False, None

    def process_commit(self, commit_url, commit_message):
        """Check every not-yet-seen bug referenced by one commit.

        Commits already in ``self.seen_commits`` are skipped. State is saved
        whenever the number of seen commits is a multiple of ten.

        Args:
            commit_url: URL identifying the commit; used as the dedup key.
            commit_message: Full commit message to search for bug ids.
        """
        if commit_url in self.seen_commits:
            print(f"Skipping already processed commit: {commit_url}")
            return

        self.seen_commits.add(commit_url)
        bug_ids = self.extract_bugzilla_ids(commit_message)

        for bug_id in bug_ids:
            if bug_id not in self.seen_bugs:
                print(f"Checking bug {bug_id}...")
                # NOTE: the bug is marked seen before the lookup, so a lookup
                # that fails with a network error is not retried later.
                self.seen_bugs.add(bug_id)
                is_unauthorized, error_msg = self.check_bugzilla_access(bug_id)
                if is_unauthorized:
                    print(f"Found unauthorized bug: {bug_id}")
                    print(f"Error message: {error_msg}")
                    self._save_unauthorized_bug(bug_id, commit_url, error_msg)
                # Pause between Bugzilla lookups.
                time.sleep(1)

        # Save state periodically
        if len(self.seen_commits) % 10 == 0:
            self.save_state()

    def scan_webkit_commits(self, start_page=None, end_page=None):
        """Walk pages of the WebKit commit list and process every commit.

        Args:
            start_page: Page to start from; defaults to the page restored
                from the state file.
            end_page: Last page to scan (inclusive); defaults to ten pages
                past the starting page.

        A page whose fetch fails is retried with an increasing delay. After
        ``MAX_FETCH_RETRIES`` consecutive failures the scan stops, with state
        saved, so it can be resumed later instead of looping forever.
        """
        if start_page is not None:
            self.current_page = start_page
        if end_page is None:
            # Default window: the current page plus the ten pages after it.
            end_page = self.current_page + 10

        base_url = "https://api.github.com/repos/WebKit/WebKit/commits"
        print(f"Starting scan from page {self.current_page} to page {end_page}")

        failures = 0
        aborted = False
        while self.current_page <= end_page:
            print(f"\nProcessing page {self.current_page}...")
            params = {'page': self.current_page, 'per_page': 100}
            try:
                response = requests.get(
                    base_url,
                    headers=self.headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                commits = response.json()
            except requests.RequestException as e:
                failures += 1
                print(f"Error fetching commits page {self.current_page}: {e}")
                if failures >= self.MAX_FETCH_RETRIES:
                    print(f"Giving up on page {self.current_page} after {failures} failed attempts.")
                    aborted = True
                    break
                # Back off before retrying the same page.
                time.sleep(self.RETRY_DELAY * failures)
                continue

            failures = 0
            if not commits:
                print("No more commits found.")
                break

            for commit in commits:
                commit_url = commit['html_url']
                commit_message = commit['commit']['message']
                self.process_commit(commit_url, commit_message)

            self.current_page += 1
            self.save_state()

        if aborted:
            print(f"\nScan stopped early. To resume, start the program with --start-page {self.current_page}")
        else:
            print("\nScanning complete!")
        print(f"Processed {len(self.seen_commits)} commits and {len(self.seen_bugs)} bugs in total.")
        self.save_state()
def main():
    """Command-line entry point: read the options and run one commit scan.

    Options:
        --github-token  GitHub API token (required).
        --start-page    Page number to start from (optional integer).
        --end-page      Page number to end at (optional integer).
    """
    import argparse

    cli = argparse.ArgumentParser(description='WebKit Bugzilla Access Checker')
    cli.add_argument('--github-token', required=True, help='GitHub API token')
    # The two page bounds share the same shape: optional integers.
    page_options = (
        ('--start-page', 'Page number to start from'),
        ('--end-page', 'Page number to end at'),
    )
    for flag, description in page_options:
        cli.add_argument(flag, type=int, help=description)
    options = cli.parse_args()

    scanner = WebKitBugzillaChecker(options.github_token)
    scanner.scan_webkit_commits(options.start_page, options.end_page)


# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment