A basic web-scrape script designed to get summary information on all resources on a site; includes a report of bad links.
#!/usr/bin/env python
# Script to scrape all links from a site, compile counts of each link and the
# status codes of access, and output the results as a CSV
#
# There's no reason this couldn't be pulled into an OOP paradigm, but I left it
# functionalized because that can be easier for multitasking.
#
# Requirements:
#   requests, bs4
#   python3
#
# Usage:
#   python scrape.py [base_url]
#
#   base_url - top level domain, with or without the http(s) string.
#   This will attempt to crawl all pages, so if a sub-portion of the site links
#   to /, any linked pages will be crawled anyway.
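#
# Example (hypothetical host and output filename, not part of the original usage notes):
#   python scrape.py example.org -o example_report.csv -v
# This writes one CSV row per crawled resource (status code, content type,
# size, link count, source url, timestamp) and prints any problematic urls
# to stdout when the crawl finishes.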
import argparse
from collections import OrderedDict
import csv
from datetime import datetime
import sys
import urllib.parse

from bs4 import BeautifulSoup, SoupStrainer
import requests
from requests.exceptions import ConnectionError


def recurse_scrape(base_url, url, session, results, verbose=False, source_url=None):
    """Scrape and recurse all urls found on a website

    Args:
        base_url (str) - base url to begin scrape recursion from
        url (str) - url for the current iteration to scrape for further links
        session (obj) - common requests Session object
        results (dict) - common dict for all results, keyed on url
        verbose (bool) - if set, print each url as it is scraped
        source_url (str) - url of the page where this url was first found
    """
    # break on mailto
    if url.startswith('mailto'):
        return
    # bail out after a set number for testing the script
    # if len(results.keys()) > 10:
    #     return
    # scan results list and if this is a duplicate, don't request, just note
    # that there is a link to the same resource
    if url in results.keys():
        results[url]['links'] += 1
        # NOTE: skipping anchor difference here;
        # add logic to strip out when storing if we care
        return
    # prevent recursion to another site
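    # (urlsplit(base_url)[1] is the network location, e.g.
    #  urllib.parse.urlsplit('https://example.org/about')[1] == 'example.org',
    #  so urls that do not contain the base domain are skipped)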
    if urllib.parse.urlsplit(base_url)[1] not in url:
        return
    # get the result, including a redirect, but don't follow it; we want to
    # log it separately and run it through the recursive scrape
    try:
        if verbose:
            sys.stdout.write('Scraping: %s\n' % url)
        response = session.get(url, allow_redirects=False)
    except ConnectionError:
        # handle connection errors and log as such, in case no status code is ever returned
        results[url] = {
            'url': url,
            'date': datetime.utcnow(),  # TODO: format this the same way as the requests date field
            'status code': 'CONN-ERR',
            'content type': '',
            'last modified': '',
            'size': '',
            'links': 1,
            'timestamp': datetime.utcnow().isoformat()
        }
        return
    results[url] = {
        'url': url,
        'date': response.headers.get('Date', ''),
        'status code': response.status_code,
        'content type': response.headers.get('Content-Type', ''),
        'last modified': response.headers.get('Last-Modified', ''),
        'contentlength': response.headers.get('Content-Length', ''),
        'size': len(response.content),
        'links': 1,
        # only documents the first place we found it
        'source url': source_url or '',
        # timestamp in isoformat so we can filter on it more easily
        'timestamp': datetime.utcnow().isoformat()
    }
    # allow recursion to follow redirects, even on off-site links
    if response.status_code in [302, 301]:
        recurse_scrape(
            base_url,
            urllib.parse.urljoin(base_url, response.headers['Location']),
            session,
            results,
            verbose=verbose,
            # current url becomes the source url on recursion
            source_url=url
        )
    # any other codes in the 300-500s are treated as dead-ends, since they
    # indicate an error; otherwise, if the status is OK and the content is html,
    # grab the page, parse any and all links, and recurse on them
    if response.status_code == requests.codes.ok \
            and response.headers.get('Content-Type', '').startswith('text/html'):
        # only_a_tags = SoupStrainer("a")
        page = BeautifulSoup(response.content, features='html.parser')
        links = []
        # find all links and included content: header link, image, script, a
        for link in page.find_all('link'):
            if link.has_attr('href'):
                links.append(link['href'])
        for img in page.find_all('img'):
            if img.has_attr('src'):
                links.append(img['src'])
        for link in page.find_all('a'):
            if link.has_attr('href'):
                links.append(link['href'])
        for script in page.find_all('script'):
            if script.has_attr('src'):
                links.append(script['src'])
        for link_url in links:
            # ignore anchor links
            if '#' in link_url:
                link_url = link_url.split('#')[0]
            # links starting with http need no modification
            if not link_url.startswith('http'):
                # if link is relative
                if not link_url.startswith('/'):
                    # make relative to current url
                    link_url = urllib.parse.urljoin(url, link_url)
                else:
                    # make relative to base url
                    link_url = urllib.parse.urljoin(base_url, link_url)
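            # for reference (standard urljoin behavior):
            #   urljoin('https://example.org/a/b.html', 'c.html') -> 'https://example.org/a/c.html'
            #   urljoin('https://example.org', '/about/') -> 'https://example.org/about/'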
            recurse_scrape(
                base_url,
                link_url,
                session,
                results,
                verbose=verbose,
                source_url=url
            )


def create_parser():
    """Define an argparse instance"""
    parser = argparse.ArgumentParser(
        description='Utility script for scraping and analyzing a CDH website'
    )
    parser.add_argument('base_url', action='store', type=str,
                        help='The base url to begin scraping from')
    parser.add_argument('--output', '-o', action='store', type=str,
                        default='output.csv', help='name of output CSV file')
    parser.add_argument('--verbose', '-v', action='store_true')
    return parser


def main():
    # create parser and get args
    parser = create_parser()
    args = parser.parse_args()
    # build a common session for the scraper and set session headers
    session = requests.Session()
    session.headers = {
        'User-Agent': 'cdh-scraper/0.1'
    }
    results = OrderedDict()
    base_url = args.base_url
    if not base_url.startswith('http'):
        # regrettably this ends up being neater than using urllib
        # in terms of code readability
        base_url = 'http://%s' % base_url.strip('/')
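        # e.g. a bare 'example.org/' becomes 'http://example.org'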
    # detect a site-wide upgrade to https:// or a redirect
    response = session.get(base_url, allow_redirects=False)
    if response.status_code == 301:
        sys.stdout.write('Detected an upgrade to https...\n')
        base_url = response.headers['Location']
    # begin the recursion
    recurse_scrape(base_url, base_url, session, results, verbose=args.verbose)
    with open(args.output, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=results[base_url].keys())
        writer.writeheader()
        for row in results.values():
            writer.writerow(row)
    sys.stdout.write('Problematic urls:\n')
    sys.stdout.write('-----------------\n')
    for result in results.values():
        if result['status code'] not in [301, 302, 200, 'CONN-ERR']:
            sys.stdout.write('%(url)s\t%(status code)s\t%(source url)s\n' % result)


if __name__ == '__main__':
    main()
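
If you only need the bad-link report from a previous run, the output CSV can be filtered after the fact. A minimal sketch using just the standard library, assuming the default output filename (output.csv) and the same set of status codes the script already treats as non-problematic:

import csv

with open('output.csv', newline='') as csvfile:
    for row in csv.DictReader(csvfile):
        # status codes are stored as strings in the CSV
        if row['status code'] not in ('200', '301', '302', 'CONN-ERR'):
            print(row['url'], row['status code'], row['source url'], sep='\t')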