Created
November 8, 2024 21:45
-
-
Save ntfargo/a7ed61d615b645cd0581a0b3fe8966f1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
import os
import re
import signal
import sys
import time
from datetime import datetime

import requests
from bs4 import BeautifulSoup
class WebKitBugzillaChecker:
    """Scan WebKit commits on GitHub for references to restricted Bugzilla bugs.

    Commit messages are fetched page by page from the GitHub commits API.
    Every bug ID referenced in a message is looked up on bugs.webkit.org, and
    bugs whose page shows a "You are not authorized to access bug" error are
    appended to ``unauthorized_bugs.txt``.

    Progress (seen bugs, seen commits and the current page) is persisted to
    ``checker_state.json`` so that an interrupted scan can be resumed.
    """

    # Seconds to wait for any single HTTP request before giving up on it.
    REQUEST_TIMEOUT = 30
    # Consecutive failures tolerated for one commits page before the scan stops.
    MAX_FETCH_RETRIES = 3
    # Base delay between retries; multiplied by the number of failures so far.
    RETRY_BACKOFF_SECONDS = 5

    # Compiled once at class creation. ``\b`` keeps words such as "debug 12"
    # from being mistaken for a bug reference.
    _BUG_ID_PATTERNS = (
        re.compile(r'https?://bugs\.webkit\.org/show_bug\.cgi\?id=(\d+)'),
        re.compile(r'\b(?:bug|bugzilla) (\d+)', re.IGNORECASE),
    )

    def __init__(self, github_token):
        """Set up API headers, restore saved state and install signal handlers.

        Args:
            github_token: GitHub API token sent in the ``Authorization`` header.
        """
        self.github_token = github_token
        self.headers = {
            'Authorization': f'token {github_token}',
            'Accept': 'application/vnd.github.v3+json'
        }
        self.state_file = 'checker_state.json'
        self.unauthorized_bugs_file = 'unauthorized_bugs.txt'
        self.load_state()
        self.setup_signal_handlers()

    def setup_signal_handlers(self) -> None:
        """Route SIGINT and SIGTERM to :meth:`handle_interrupt`."""
        signal.signal(signal.SIGINT, self.handle_interrupt)
        signal.signal(signal.SIGTERM, self.handle_interrupt)

    def handle_interrupt(self, signum, frame):
        """Signal handler: persist the current state, then exit with status 0."""
        print("\nReceived interrupt signal. Saving state before exiting...")
        self.save_state()
        print(f"State saved. Last processed page: {self.current_page}")
        print(f"To resume, start the program with --start-page {self.current_page}")
        sys.exit(0)

    def _reset_state(self) -> None:
        """Initialise empty in-memory state, starting at page 1."""
        self.seen_bugs = set()
        self.seen_commits = set()
        self.current_page = 1

    def load_state(self) -> None:
        """Restore progress from ``self.state_file``.

        Falls back to a fresh state when the file is missing, or when it is
        unreadable as a JSON object (for example after a partial write).
        """
        try:
            with open(self.state_file, 'r', encoding='utf-8') as f:
                state = json.load(f)
        except FileNotFoundError:
            self._reset_state()
            print("No previous state found. Starting fresh.")
            return
        except json.JSONDecodeError as e:
            self._reset_state()
            print(f"State file {self.state_file} is corrupt ({e}). Starting fresh.")
            return

        if not isinstance(state, dict):
            self._reset_state()
            print(f"State file {self.state_file} has an unexpected format. Starting fresh.")
            return

        self.seen_bugs = set(state.get('seen_bugs', []))
        self.seen_commits = set(state.get('seen_commits', []))
        self.current_page = state.get('current_page', 1)
        print(f"Loaded state: {len(self.seen_bugs)} bugs, {len(self.seen_commits)} commits")
        print(f"Last page processed: {self.current_page - 1}")

    def save_state(self) -> None:
        """Write the current progress to ``self.state_file`` atomically."""
        state = {
            'seen_bugs': sorted(self.seen_bugs),
            'seen_commits': sorted(self.seen_commits),
            'current_page': self.current_page,
            'last_updated': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        }
        # Write to a sibling file and swap it in, so an interruption in the
        # middle of the write never leaves a truncated state file behind.
        tmp_path = f"{self.state_file}.tmp"
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, self.state_file)

    def _save_unauthorized_bug(self, bug_id, commit_url, error_msg) -> None:
        """Append one restricted-bug record to ``self.unauthorized_bugs_file``."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        with open(self.unauthorized_bugs_file, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] Bug {bug_id} | Commit: {commit_url}\n")
            f.write(f"Error Message: {error_msg}\n")
            f.write("-" * 80 + "\n")

    def extract_bugzilla_ids(self, commit_message) -> set:
        """Return the set of bug IDs (as strings) referenced in a commit message.

        Recognised forms are a ``bugs.webkit.org/show_bug.cgi?id=N`` URL and
        the words "bug N" / "bugzilla N" in any letter case.
        """
        bug_ids = set()
        for pattern in self._BUG_ID_PATTERNS:
            bug_ids.update(match.group(1) for match in pattern.finditer(commit_message))
        return bug_ids

    def check_bugzilla_access(self, bug_id):
        """Check whether a bug's Bugzilla page reports it as access-restricted.

        Returns:
            ``(True, message)`` when the page contains the "not authorized"
            error box, otherwise ``(False, None)``. A failed request is
            reported on stdout and also yields ``(False, None)``.
        """
        url = f'https://bugs.webkit.org/show_bug.cgi?id={bug_id}'
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Error checking Bugzilla bug {bug_id}: {e}")
            return False, None

        soup = BeautifulSoup(response.text, 'html.parser')
        error_div = soup.find('div', {'id': 'error_msg', 'class': 'throw_error'})
        if error_div and "You are not authorized to access bug" in error_div.text:
            return True, error_div.text.strip()
        return False, None

    def process_commit(self, commit_url, commit_message) -> None:
        """Check every not-yet-seen bug referenced by one commit.

        Commits already in ``self.seen_commits`` are skipped. State is saved
        each time the number of seen commits reaches a multiple of 10.
        """
        if commit_url in self.seen_commits:
            print(f"Skipping already processed commit: {commit_url}")
            return
        self.seen_commits.add(commit_url)

        for bug_id in self.extract_bugzilla_ids(commit_message):
            if bug_id in self.seen_bugs:
                continue
            print(f"Checking bug {bug_id}...")
            self.seen_bugs.add(bug_id)
            is_unauthorized, error_msg = self.check_bugzilla_access(bug_id)
            if is_unauthorized:
                print(f"Found unauthorized bug: {bug_id}")
                print(f"Error message: {error_msg}")
                self._save_unauthorized_bug(bug_id, commit_url, error_msg)
            # Pause between Bugzilla lookups to avoid hammering the server.
            time.sleep(1)

        # Save state periodically
        if len(self.seen_commits) % 10 == 0:
            self.save_state()

    def scan_webkit_commits(self, start_page=None, end_page=None) -> None:
        """Walk the WebKit commit history page by page and process each commit.

        Args:
            start_page: Page to start from; defaults to the page restored
                from saved state.
            end_page: Last page to process (inclusive); defaults to ten pages
                past the starting page.

        A page that cannot be fetched is retried with a growing delay. After
        ``MAX_FETCH_RETRIES`` consecutive failures the scan stops, leaving the
        saved state pointing at the failing page so a later run can resume.
        """
        if start_page is not None:
            self.current_page = start_page
        if end_page is None:
            end_page = self.current_page + 10  # Default: ten pages past the start
        base_url = "https://api.github.com/repos/WebKit/WebKit/commits"
        print(f"Starting scan from page {self.current_page} to page {end_page}")

        failures = 0
        aborted = False
        while self.current_page <= end_page:
            print(f"\nProcessing page {self.current_page}...")
            params = {'page': self.current_page, 'per_page': 100}
            try:
                response = requests.get(
                    base_url,
                    headers=self.headers,
                    params=params,
                    timeout=self.REQUEST_TIMEOUT,
                )
                response.raise_for_status()
                commits = response.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError covers a non-JSON body on older requests versions.
                failures += 1
                print(f"Error fetching commits page {self.current_page}: {e}")
                if failures >= self.MAX_FETCH_RETRIES:
                    print(f"Giving up on page {self.current_page} after {failures} failed attempts.")
                    aborted = True
                    break
                time.sleep(self.RETRY_BACKOFF_SECONDS * failures)
                continue

            failures = 0
            if not commits:
                print("No more commits found.")
                break
            for commit in commits:
                self.process_commit(commit['html_url'], commit['commit']['message'])
            self.current_page += 1
            self.save_state()

        if aborted:
            print(f"\nScan stopped early. To resume, start the program with --start-page {self.current_page}")
        else:
            print("\nScanning complete!")
        print(f"Processed {len(self.seen_commits)} commits and {len(self.seen_bugs)} bugs in total.")
        self.save_state()
def main():
    """Command-line entry point: parse the options and run one commit scan.

    Options:
        --github-token  GitHub API token (required).
        --start-page    Page number to start from (optional integer).
        --end-page      Page number to end at (optional integer).
    """
    import argparse

    cli = argparse.ArgumentParser(description='WebKit Bugzilla Access Checker')
    cli.add_argument('--github-token', required=True, help='GitHub API token')
    # The two page bounds share the same shape: optional integers.
    page_flags = (
        ('--start-page', 'Page number to start from'),
        ('--end-page', 'Page number to end at'),
    )
    for flag, description in page_flags:
        cli.add_argument(flag, type=int, help=description)
    options = cli.parse_args()

    scanner = WebKitBugzillaChecker(options.github_token)
    scanner.scan_webkit_commits(options.start_page, options.end_page)


# Run the scan only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment