Skip to content

Instantly share code, notes, and snippets.

@JonathanLalou
Created April 16, 2025 19:06
Show Gist options
  • Save JonathanLalou/297eca568e5cf76457b1e03eba3c9045 to your computer and use it in GitHub Desktop.
Save JonathanLalou/297eca568e5cf76457b1e03eba3c9045 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
# @author Jonathan Lalou
import feedparser
import argparse
from datetime import datetime, timedelta
from ebooklib import epub
import re
from bs4 import BeautifulSoup
import logging
# Root logger setup: INFO and above, one timestamped line per record.
logging.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
def clean_html(html_content):
    """Return *html_content* without scripts, stylesheets or inline styles.

    The markup is parsed with BeautifulSoup's ``html.parser``; every
    ``<script>`` and ``<style>`` element is removed, the ``style`` attribute
    is dropped from all remaining tags, and the tree is serialized back to a
    string.
    """
    document = BeautifulSoup(html_content, 'html.parser')

    # Drop <script>/<style> elements together with their contents.
    for unwanted in document.find_all(["script", "style"]):
        unwanted.decompose()

    # Strip the inline ``style`` attribute wherever it appears.
    for element in document.find_all(True):
        element.attrs.pop('style', None)

    return str(document)
def get_next_feed_page(current_feed, feed_url):
    """Return the URL of the next page of a paginated feed, or None.

    The following strategies are tried in order; the first hit wins:

    1. a non-empty ``next_page`` attribute on the parsed feed;
    2. a feed-level link whose ``rel`` is ``"next"`` and that has an ``href``;
    3. if the feed exposes ``lastBuildDate``: *feed_url* with a
       ``before=YYYY-MM-DD`` query parameter built from the published date
       of the last entry on the current page;
    4. a URL following "next page:", "older posts:" or "page N:" in the
       feed description (case-insensitive).

    Args:
        current_feed: Parsed feed object exposing ``feed`` and, optionally,
            ``entries``, ``next_page`` and ``href`` attributes.
        feed_url: Base URL of the feed, used to build date-based page URLs.

    Returns:
        The next page URL as a string, or None when no strategy applies.
    """
    # Method 1: next_page link in feed
    next_page = getattr(current_feed, 'next_page', None)
    if next_page:
        logging.info(f"Found next_page link: {next_page}")
        return next_page

    feed_info = current_feed.feed

    # Method 2: Atom-style pagination
    for link in getattr(feed_info, 'links', None) or []:
        if link.get('rel') != 'next':
            continue
        href = link.get('href')
        if href:  # a rel="next" link without a target is useless
            logging.info(f"Found Atom-style next link: {href}")
            return href

    # Method 3: RSS 2.0 pagination (using lastBuildDate)
    entries = getattr(current_feed, 'entries', None) or []
    if hasattr(feed_info, 'lastBuildDate') and entries:
        # published_parsed may be absent or None when the date is unusable.
        published = getattr(entries[-1], 'published_parsed', None)
        if published:
            before = datetime(*published[:6]).strftime('%Y-%m-%d')
            separator = '&' if '?' in feed_url else '?'
            next_url = f"{feed_url}{separator}before={before}"
            # If the page we are looking at was itself fetched from this
            # exact URL, the parameter is not moving us forward; returning
            # it again would make the caller fetch the same page forever.
            if next_url != getattr(current_feed, 'href', None):
                logging.info(f"Constructed date-based next URL: {next_url}")
                return next_url

    # Method 4: Check for pagination in feed description
    description = getattr(feed_info, 'description', None)
    if description:
        next_page_patterns = (
            r'next page: (https?://\S+)',
            r'older posts: (https?://\S+)',
            r'page \d+: (https?://\S+)',
        )
        for pattern in next_page_patterns:
            match = re.search(pattern, description, re.IGNORECASE)
            if match:
                next_url = match.group(1)
                logging.info(f"Found next page URL in description: {next_url}")
                return next_url

    return None
def get_feed_type(feed):
    """Classify a parsed feed as ``'rss'`` or ``'atom'``.

    The decision is based on ``feed.version`` when it is set: any value
    starting with ``rss`` yields ``'rss'`` and any value starting with
    ``atom`` (``atom10``, ``atom03``, ...) yields ``'atom'``. Otherwise a
    feed-level link with ``rel="self"`` is taken as a sign of Atom. When
    nothing matches, ``'rss'`` is returned as the default.

    Args:
        feed: Parsed feed object exposing ``feed`` and optionally ``version``.

    Returns:
        Either ``'rss'`` or ``'atom'``.
    """
    # version may be missing, empty or None for unrecognised feeds.
    version = getattr(feed, 'version', '') or ''
    if version.startswith('rss'):
        return 'rss'
    if version.startswith('atom'):
        return 'atom'

    # Try to detect by checking for Atom-specific elements
    links = getattr(feed.feed, 'links', None) or []
    if any(link.get('rel') == 'self' for link in links):
        return 'atom'

    # Default to RSS if no clear indicators
    return 'rss'
def get_entry_content(entry, feed_type):
    """Return the HTML body of a feed entry, or ``''`` if it has none.

    The first item of ``entry.content`` is preferred. When that is missing
    or empty, the function falls back to ``entry.summary`` for Atom feeds
    and ``entry.description`` for everything else.

    Args:
        entry: Parsed entry object (attribute access is used).
        feed_type: ``'atom'`` selects the Atom fallback; any other value
            selects the RSS fallback.

    Returns:
        The entry content as a string (possibly empty).
    """
    contents = getattr(entry, 'content', None)
    if contents:
        value = getattr(contents[0], 'value', '')
        if value:
            return value

    # An absent or empty content element must not hide a usable
    # summary/description.
    fallback_attr = 'summary' if feed_type == 'atom' else 'description'
    return getattr(entry, fallback_attr, '') or ''
def get_entry_date(entry, feed_type):
    """Return the publication date of a feed entry as a naive datetime.

    ``entry.published_parsed`` is used when available. For Atom feeds
    ``entry.updated_parsed`` is tried next. A date attribute that is missing
    or None is skipped. If no usable date is found, the current local time
    (``datetime.now()``) is returned.

    Args:
        entry: Parsed entry object (attribute access is used).
        feed_type: ``'atom'`` enables the ``updated_parsed`` fallback.

    Returns:
        A ``datetime`` built from the first six fields of the parsed date.
    """
    if feed_type == 'atom':
        # Atom format uses published or updated
        candidates = ('published_parsed', 'updated_parsed')
    else:
        # RSS 2.0 format uses pubDate
        candidates = ('published_parsed',)

    for attr in candidates:
        parsed = getattr(entry, attr, None)
        # The attribute can exist yet be None when the date was unparseable.
        if parsed:
            return datetime(*parsed[:6])

    return datetime.now()
def get_feed_metadata(feed, feed_type):
    """Collect book-level metadata from a parsed feed.

    The returned dict always contains the keys ``title``, ``description``,
    ``language``, ``author``, ``publisher``, ``rights`` and ``updated``.
    For non-Atom feeds the legacy keys ``copyright`` and ``lastBuildDate``
    are included as well; their values are also used as a fallback for
    ``rights`` and ``updated`` so that consumers reading the common keys
    see the RSS values.

    Args:
        feed: Parsed feed object whose ``feed`` attribute supports ``.get``.
        feed_type: ``'atom'`` for Atom feeds; anything else is treated as RSS.

    Returns:
        A dict of metadata strings with the keys described above.
    """
    info = feed.feed
    metadata = {
        'title': info.get('title', ''),
        'description': '',
        'language': info.get('language', 'en'),
        'author': info.get('author', 'Unknown'),
        'publisher': info.get('publisher', ''),
        'rights': '',
        'updated': '',
    }

    if feed_type == 'atom':
        # Atom format metadata
        metadata['description'] = info.get('subtitle', '')
        metadata['rights'] = info.get('rights', '')
        metadata['updated'] = info.get('updated', '')
    else:
        # RSS 2.0 format metadata
        legacy_copyright = info.get('copyright', '')
        legacy_build_date = info.get('lastBuildDate', '')
        metadata['description'] = info.get('description', '')
        # Fill the common keys too; they were previously left empty for RSS.
        metadata['rights'] = info.get('rights', '') or legacy_copyright
        metadata['updated'] = info.get('updated', '') or legacy_build_date
        # Kept for backward compatibility with callers of the RSS-only keys.
        metadata['copyright'] = legacy_copyright
        metadata['lastBuildDate'] = legacy_build_date

    return metadata
def create_ebook(feed_url, start_date, end_date, output_file):
    """Create an EPUB from the feed entries published within a date range.

    The feed is parsed with ``feedparser``, its pages are followed through
    ``get_next_feed_page``, and every entry whose date (as returned by
    ``get_entry_date``) lies between *start_date* and *end_date* inclusive
    becomes one chapter. Paging stops when a page yields nothing new, when
    ten consecutive entries were out of range, when no in-range entry has
    been seen by the end of a page, when a page URL repeats, or when no
    further page exists.

    Args:
        feed_url: Location of the feed, passed to ``feedparser.parse``.
        start_date: Lower bound, compared with ``<`` against entry dates.
        end_date: Upper bound, compared with ``>`` against entry dates.
        output_file: Path of the EPUB file to write.

    Returns:
        True if the EPUB was written; False if the feed could not be parsed
        or no entry fell within the date range.
    """
    from html import escape  # only needed here, so imported locally

    logging.info(f"Starting ebook creation from feed: {feed_url}")
    logging.info(f"Date range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")

    # Parse the RSS feed
    feed = feedparser.parse(feed_url)
    if feed.bozo:
        bozo_exception = getattr(feed, 'bozo_exception', None)
        if not feed.entries:
            logging.error(f"Error parsing feed: {bozo_exception}")
            return False
        # The bozo flag also covers recoverable problems; if entries were
        # extracted anyway, keep going instead of discarding them.
        logging.warning(f"Feed is not well-formed, continuing anyway: {bozo_exception}")

    # Determine feed type
    feed_type = get_feed_type(feed)
    logging.info(f"Detected feed type: {feed_type}")
    logging.info(f"Successfully parsed feed: {feed.feed.get('title', 'Unknown Feed')}")

    # Create a new EPUB book
    book = epub.EpubBook()

    # Extract metadata based on feed type
    metadata = get_feed_metadata(feed, feed_type)
    logging.info(f"Setting metadata for ebook: {metadata['title']}")

    # Set basic metadata
    book.set_identifier(feed_url)  # Use feed URL as unique identifier
    book.set_title(metadata['title'] or feed_url)  # never leave the title empty
    book.set_language(metadata['language'])
    book.add_author(metadata['author'])

    # Add additional metadata if available
    if metadata['publisher']:
        book.add_metadata('DC', 'publisher', metadata['publisher'])
    if metadata['rights']:
        book.add_metadata('DC', 'rights', metadata['rights'])
    if metadata['updated']:
        book.add_metadata('DC', 'date', metadata['updated'])

    # Single description element: feed description (if any) + date range.
    date_range_desc = f"Content from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
    if metadata['description']:
        description = f"{metadata['description']}\n\n{date_range_desc}"
    else:
        description = date_range_desc
    book.add_metadata('DC', 'description', description)

    # Create table of contents
    chapters = []
    toc = []

    # Process entries within date range
    entries_processed = 0
    entries_in_range = 0
    consecutive_out_of_range = 0
    current_page = 1
    processed_urls = set()  # Track processed entry ids to avoid duplicates
    visited_pages = {feed_url}  # Track fetched page URLs to avoid cycles

    logging.info("Starting to process feed entries...")
    while True:
        logging.info(f"Processing page {current_page} with {len(feed.entries)} entries")
        new_entries_on_page = 0

        # Every page is scanned from its first entry; duplicates across
        # pages are filtered through processed_urls below.
        for entry in feed.entries:
            entries_processed += 1

            # Skip if we've already processed this entry. Entries without
            # any id/link cannot be told apart, so they are never deduped.
            entry_id = entry.get('id') or entry.get('link') or ''
            if entry_id:
                if entry_id in processed_urls:
                    logging.debug(f"Skipping duplicate entry: {entry_id}")
                    continue
                processed_urls.add(entry_id)
            new_entries_on_page += 1

            # Get entry date based on feed type
            entry_date = get_entry_date(entry, feed_type)
            if entry_date < start_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (before start date)")
                continue
            if entry_date > end_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (after end date)")
                continue

            consecutive_out_of_range = 0
            entries_in_range += 1

            # Create chapter
            title = entry.get('title') or 'Untitled'
            logging.info(f"Adding chapter: {title} ({entry_date.strftime('%Y-%m-%d')})")

            # Get and clean the content based on feed type
            cleaned_content = clean_html(get_entry_content(entry, feed_type))

            chapter = epub.EpubHtml(
                title=title,
                file_name=f'chapter_{len(chapters)}.xhtml',
                # The title is plain text from an external feed: escape it
                # so characters such as & or < cannot break the markup.
                content=f'<h1>{escape(title)}</h1>{cleaned_content}',
            )

            # Add chapter to book
            book.add_item(chapter)
            chapters.append(chapter)
            toc.append(epub.Link(chapter.file_name, title, chapter.id))

        # If we have no entries in range or we've seen too many consecutive
        # out-of-range entries, stop
        if entries_in_range == 0:
            logging.warning("No entries found within the specified date range")
            break
        if consecutive_out_of_range >= 10:
            logging.info(f"Stopping after {consecutive_out_of_range} consecutive out-of-range entries")
            break
        if new_entries_on_page == 0:
            logging.info("Page contained no new entries, stopping")
            break

        # Try to get more entries if available
        next_page_url = get_next_feed_page(feed, feed_url)
        if not next_page_url:
            logging.info("No more pages available")
            break
        if next_page_url in visited_pages:
            logging.info(f"Next page was already fetched, stopping: {next_page_url}")
            break
        visited_pages.add(next_page_url)

        current_page += 1
        logging.info(f"Fetching next page: {next_page_url}")
        feed = feedparser.parse(next_page_url)
        if not feed.entries:
            logging.info("No more entries available")
            break

    if entries_in_range == 0:
        logging.error("No entries found within the specified date range")
        return False

    logging.info(f"Processed {entries_processed} total entries, {entries_in_range} within date range")

    # Add table of contents
    book.toc = toc

    # Add navigation files
    book.add_item(epub.EpubNcx())
    book.add_item(epub.EpubNav())

    # Define CSS style
    style = '''
@namespace epub "http://www.idpf.org/2007/ops";
body {
font-family: Cambria, Liberation Serif, serif;
}
h1 {
text-align: left;
text-transform: uppercase;
font-weight: 200;
}
'''

    # Add CSS file
    nav_css = epub.EpubItem(
        uid="style_nav",
        file_name="style/nav.css",
        media_type="text/css",
        content=style,
    )
    book.add_item(nav_css)

    # Create spine
    book.spine = ['nav'] + chapters

    # Write the EPUB file
    logging.info(f"Writing EPUB file: {output_file}")
    epub.write_epub(output_file, book, {})
    logging.info("EPUB file created successfully")
    return True
def _parse_cli_date(value):
    """Convert a YYYY-MM-DD command-line value to a datetime for argparse."""
    try:
        return datetime.strptime(value, '%Y-%m-%d')
    except ValueError:
        raise argparse.ArgumentTypeError(
            f"invalid date {value!r}, expected YYYY-MM-DD"
        ) from None


def main(argv=None):
    """Command-line entry point: convert a feed to an EPUB file.

    Args:
        argv: Argument list to parse; None (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        0 when the ebook was created, 1 otherwise. Invalid arguments make
        argparse exit with status 2.
    """
    parser = argparse.ArgumentParser(description='Convert RSS feed to EPUB ebook')
    parser.add_argument('feed_url', help='URL of the RSS feed')
    # String defaults are run through ``type`` by argparse, so both the
    # defaults and user-supplied values end up as datetime objects.
    parser.add_argument('--start-date', help='Start date (YYYY-MM-DD)',
                        type=_parse_cli_date,
                        default=(datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d'))
    parser.add_argument('--end-date', help='End date (YYYY-MM-DD)',
                        type=_parse_cli_date,
                        default=datetime.now().strftime('%Y-%m-%d'))
    parser.add_argument('--output', help='Output EPUB file name',
                        default='rss_feed.epub')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = parser.parse_args(argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    start_date = args.start_date
    if start_date > args.end_date:
        parser.error('--start-date must not be later than --end-date')

    # The end date is parsed as midnight; extend it to the last microsecond
    # of that day so entries published during the end day are included.
    end_date = args.end_date + timedelta(days=1) - timedelta(microseconds=1)

    # Create ebook
    if create_ebook(args.feed_url, start_date, end_date, args.output):
        logging.info(f"Successfully created ebook: {args.output}")
        return 0

    logging.error("Failed to create ebook")
    return 1


if __name__ == '__main__':
    # Propagate the result as the process exit status.
    raise SystemExit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment