A basic web-scraping script designed to look for bad links on a particular site
#!/usr/bin/env python
# Script to scrape all links from a site, compile a count of links to each
# url and the status code returned for it, and output the results as a CSV.
#
# There's no reason this couldn't be pulled into an OOP design, but it is left
# as plain functions because that can be easier to work with.
#
# Requirements:
#   requests, bs4
#   python3
#
# Usage:
#   python scrape.py [base_url]
#
# base_url - top-level domain, with or without the http(s) prefix.
# The script attempts to crawl all pages, so even if only a sub-portion of the
# site links to /, any pages linked from there will still be crawled.
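#
# Example invocation (domain and output filename are hypothetical, shown only
# as a usage sketch):
#   python scrape.py example.com -o example_links.csv -v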
import argparse
import csv
from datetime import datetime
import sys
import urllib.parse

from bs4 import BeautifulSoup, SoupStrainer
import requests
from requests.exceptions import ConnectionError


def recurse_scrape(base_url, url, session, results, verbose=False):
    """Scrape a url and recurse on all urls found on it.

    Args:
        base_url (str) - base url the scrape recursion began from
        url (str) - url for the current iteration to scrape for further links
        session (obj) - common requests Session object
        results (list) - common list holding all results
        verbose (bool) - if True, print each url as it is scraped
    """
    # skip mailto links
    if url.startswith('mailto'):
        return
    # scan the results list; if this url is a duplicate, don't request it
    # again, just note that there is another link to the same resource
    for result in results:
        # different query strings count as different urls, but #anchors do not
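        # e.g. '/about#team' and '/about#history' count as one resource here,
        # while '/search?q=a' and '/search?q=b' count as two (paths shown are
        # illustrative only)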
        if result['url'].split('#')[0] == url.split('#')[0]:
            result['links'] += 1
            return
    # prevent recursion to another site
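    # the check below compares the network location of base_url against the
    # candidate url, e.g. urllib.parse.urlsplit('https://example.com/page')[1]
    # evaluates to 'example.com' (domain shown is illustrative only)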
    if urllib.parse.urlsplit(base_url)[1] not in url:
        return
    # get the result, including a redirect, but don't follow it automatically;
    # we want to log the redirect separately and run its target through the
    # recursive scrape
    try:
        if verbose:
            sys.stdout.write('Scraping: %s\n' % url)
        response = session.get(url, allow_redirects=False)
    except ConnectionError:
        # handle connection errors and log them as such, since no status code
        # is ever returned in that case
        results.append({
            'url': url,
            'date': datetime.utcnow(),  # TODO: format this the same way as the requests Date header
            'status_code': 'CONN-ERR',
            'content-type': '',
            'last-modified': '',
            'size': '',
            'links': 1
        })
        return
    results.append({
        'url': url,
        'date': response.headers.get('Date', ''),
        'status_code': response.status_code,
        'content-type': response.headers.get('Content-Type', ''),
        'last-modified': response.headers.get('Last-Modified', ''),
        'size': len(response.content),
        'links': 1
    })
    # recurse on the redirect target so that it is logged and scraped in turn
    if response.status_code in [301, 302]:
        recurse_scrape(
            base_url,
            urllib.parse.urljoin(base_url, response.headers['Location']),
            session,
            results,
            verbose=verbose,
        )
    # any other 300-500 codes are treated as dead ends, since they indicate an
    # error; otherwise, if the status is OK and the page is HTML, parse any
    # and all links and recurse on them
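    # note: SoupStrainer("a") below tells BeautifulSoup to parse only <a>
    # tags, which keeps the parse cheap since only hrefs are needed here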
    if (response.status_code == requests.codes.ok
            and response.headers.get('Content-Type', '').startswith('text/html')):
        only_a_tags = SoupStrainer("a")
        bs = BeautifulSoup(response.content, parse_only=only_a_tags, features='html.parser')
        for link in bs:
            if link.has_attr('href'):
                href = link['href']
                # treat as an absolute link and scrape it directly
                if href.startswith('http'):
                    recurse_scrape(
                        base_url,
                        href,
                        session,
                        results,
                        verbose=verbose,
                    )
                else:
                    # hrefs starting with / are resolved against the base url;
                    # other relative hrefs are resolved against the current url
                    scrape_url = base_url
                    if not href.startswith('/'):
                        scrape_url = url
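                    # urljoin resolves the relative href against scrape_url,
                    # e.g. urljoin('http://example.com/docs/', 'page.html')
                    # yields 'http://example.com/docs/page.html'
                    # (urls shown are illustrative only)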
                    recurse_scrape(
                        base_url,
                        urllib.parse.urljoin(scrape_url, href),
                        session,
                        results,
                        verbose=verbose
                    )


def create_parser():
    """Define an argparse instance for the script's command-line options"""
    parser = argparse.ArgumentParser(
        description='Utility script for scraping and analyzing a CDH website'
    )
    parser.add_argument('base_url', action='store', type=str,
                        help='The base url to begin scraping from')
    parser.add_argument('--output', '-o', action='store', type=str,
                        default='output.csv', help='name of the output CSV file')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='print each url as it is scraped')
    return parser


def main():
    # create the parser and get the args
    parser = create_parser()
    args = parser.parse_args()
    # build a common session for the scraper and set the session headers
    session = requests.Session()
    session.headers = {
        'User-Agent': 'cdh-scraper/0.1'
    }
    results = []
    base_url = args.base_url
    if not base_url.startswith('http'):
        # regrettably this ends up being neater than using urllib,
        # in terms of code readability
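        # e.g. a bare 'example.com/' becomes 'http://example.com'
        # (domain shown is illustrative only)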
        base_url = 'http://%s' % base_url.strip('/')
    # detect a site-wide upgrade to https:// or redirect
    response = session.get(base_url, allow_redirects=False)
    if response.status_code == 301:
        sys.stdout.write('Detected an upgrade to https...\n')
        base_url = response.headers['Location']
    # begin the recursion
    recurse_scrape(base_url, base_url, session, results, verbose=args.verbose)
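    # write the results out as a CSV; the columns follow the keys of each
    # result dict (url, date, status_code, content-type, last-modified,
    # size, links)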
    with open(args.output, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=results[0].keys())
        # write the header once, then one row per scraped url
        writer.writeheader()
        for row in results:
            writer.writerow(row)
    sys.stdout.write('Problematic urls:\n')
    sys.stdout.write('-----------------\n')
    for result in results:
        if result['status_code'] not in [301, 302, 200, 'CONN-ERR']:
            sys.stdout.write('%(url)s\t%(status_code)s\n' % result)


if __name__ == '__main__':
    main()