#!/usr/bin/env python3
# @author Jonathan Lalou
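"""Convert an RSS or Atom feed into an EPUB ebook.

Entries published within a given date range become one chapter each.
Third-party dependencies: feedparser, ebooklib, beautifulsoup4.
"""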
import argparse
import logging
import re
from datetime import datetime, timedelta

import feedparser
from bs4 import BeautifulSoup
from ebooklib import epub

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)


def clean_html(html_content):
    """Clean HTML content while preserving formatting."""
    soup = BeautifulSoup(html_content, 'html.parser')
    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()
    # Remove any inline styles
    for tag in soup.find_all(True):
        if 'style' in tag.attrs:
            del tag.attrs['style']
    # Return the cleaned HTML
    return str(soup)


def get_next_feed_page(current_feed, feed_url):
    """Get the next page of the feed using various pagination methods."""
    # Method 1: next_page link in the feed
    if hasattr(current_feed, 'next_page'):
        logging.info(f"Found next_page link: {current_feed.next_page}")
        return current_feed.next_page
    # Method 2: Atom-style pagination
    if hasattr(current_feed.feed, 'links'):
        for link in current_feed.feed.links:
            if link.get('rel') == 'next':
                logging.info(f"Found Atom-style next link: {link.href}")
                return link.href
    # Method 3: RSS 2.0 date-based pagination (walk back from the oldest entry on this page)
    if hasattr(current_feed.feed, 'lastBuildDate') and current_feed.entries:
        last_entry = current_feed.entries[-1]
        if hasattr(last_entry, 'published_parsed'):
            last_entry_date = datetime(*last_entry.published_parsed[:6])
            # Try to construct a next-page URL with a date parameter
            separator = '&' if '?' in feed_url else '?'
            next_url = f"{feed_url}{separator}before={last_entry_date.strftime('%Y-%m-%d')}"
            logging.info(f"Constructed date-based next URL: {next_url}")
            return next_url
    # Method 4: Check for pagination hints in the feed description
    if hasattr(current_feed.feed, 'description'):
        desc = current_feed.feed.description
        # Look for common pagination patterns in the description
        next_page_patterns = [
            r'next page: (https?://\S+)',
            r'older posts: (https?://\S+)',
            r'page \d+: (https?://\S+)'
        ]
        for pattern in next_page_patterns:
            match = re.search(pattern, desc, re.IGNORECASE)
            if match:
                next_url = match.group(1)
                logging.info(f"Found next page URL in description: {next_url}")
                return next_url
    return None


def get_feed_type(feed):
    """Determine whether the feed is RSS 2.0 or Atom format."""
    version = getattr(feed, 'version', '')
    if version.startswith('rss'):
        return 'rss'
    elif version == 'atom10':
        return 'atom'
    # Try to detect by checking for Atom-specific elements
    elif hasattr(feed.feed, 'links') and any(link.get('rel') == 'self' for link in feed.feed.links):
        return 'atom'
    # Default to RSS if there are no clear indicators
    return 'rss'


def get_entry_content(entry, feed_type):
    """Get the content of an entry based on feed type."""
    if feed_type == 'atom':
        # Atom format
        if hasattr(entry, 'content'):
            return entry.content[0].value if entry.content else ''
        elif hasattr(entry, 'summary'):
            return entry.summary
    else:
        # RSS 2.0 format
        if hasattr(entry, 'content'):
            return entry.content[0].value if entry.content else ''
        elif hasattr(entry, 'description'):
            return entry.description
    return ''


def get_entry_date(entry, feed_type):
    """Get the publication date of an entry based on feed type."""
    if feed_type == 'atom':
        # Atom format uses published or updated
        if hasattr(entry, 'published_parsed'):
            return datetime(*entry.published_parsed[:6])
        elif hasattr(entry, 'updated_parsed'):
            return datetime(*entry.updated_parsed[:6])
    else:
        # RSS 2.0 format uses pubDate
        if hasattr(entry, 'published_parsed'):
            return datetime(*entry.published_parsed[:6])
    return datetime.now()


def get_feed_metadata(feed, feed_type):
    """Extract metadata from the feed based on its type."""
    metadata = {
        'title': '',
        'description': '',
        'language': 'en',
        'author': 'Unknown',
        'publisher': '',
        'rights': '',
        'updated': ''
    }
    if feed_type == 'atom':
        # Atom format metadata
        metadata['title'] = feed.feed.get('title', '')
        metadata['description'] = feed.feed.get('subtitle', '')
        metadata['language'] = feed.feed.get('language', 'en')
        metadata['author'] = feed.feed.get('author', 'Unknown')
        metadata['rights'] = feed.feed.get('rights', '')
        metadata['updated'] = feed.feed.get('updated', '')
    else:
        # RSS 2.0 format metadata, mapped onto the same keys used downstream
        metadata['title'] = feed.feed.get('title', '')
        metadata['description'] = feed.feed.get('description', '')
        metadata['language'] = feed.feed.get('language', 'en')
        metadata['author'] = feed.feed.get('author', 'Unknown')
        metadata['rights'] = feed.feed.get('copyright', '')
        metadata['updated'] = feed.feed.get('lastBuildDate', '')
    return metadata


def create_ebook(feed_url, start_date, end_date, output_file):
    """Create an ebook from RSS feed entries within the specified date range."""
    logging.info(f"Starting ebook creation from feed: {feed_url}")
    logging.info(f"Date range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
    # Parse the RSS feed
    feed = feedparser.parse(feed_url)
    if feed.bozo:
        logging.error(f"Error parsing feed: {feed.bozo_exception}")
        return False
    # Determine feed type
    feed_type = get_feed_type(feed)
    logging.info(f"Detected feed type: {feed_type}")
    logging.info(f"Successfully parsed feed: {feed.feed.get('title', 'Unknown Feed')}")
    # Create a new EPUB book
    book = epub.EpubBook()
    # Extract metadata based on feed type
    metadata = get_feed_metadata(feed, feed_type)
    logging.info(f"Setting metadata for ebook: {metadata['title']}")
    # Set basic metadata
    book.set_identifier(feed_url)  # Use the feed URL as a unique identifier
    book.set_title(metadata['title'])
    book.set_language(metadata['language'])
    book.add_author(metadata['author'])
    # Add additional metadata if available
    if metadata['publisher']:
        book.add_metadata('DC', 'publisher', metadata['publisher'])
    if metadata['rights']:
        book.add_metadata('DC', 'rights', metadata['rights'])
    if metadata['updated']:
        book.add_metadata('DC', 'date', metadata['updated'])
    # Add a single description that includes the covered date range
    date_range_desc = f"Content from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
    description = f"{metadata['description']}\n\n{date_range_desc}" if metadata['description'] else date_range_desc
    book.add_metadata('DC', 'description', description)
    # Create table of contents
    chapters = []
    toc = []
    # Process entries within the date range
    entries_processed = 0
    entries_in_range = 0
    consecutive_out_of_range = 0
    current_page = 1
    processed_urls = set()  # Track processed URLs to avoid duplicates
    logging.info("Starting to process feed entries...")
    while True:
        logging.info(f"Processing page {current_page} with {len(feed.entries)} entries")
        # Process the current page of entries; the processed_urls set prevents
        # re-processing entries that repeat across pages
        for entry in feed.entries:
            entries_processed += 1
            # Skip if we've already processed this entry
            entry_id = entry.get('id', entry.get('link', ''))
            if entry_id in processed_urls:
                logging.debug(f"Skipping duplicate entry: {entry_id}")
                continue
            processed_urls.add(entry_id)
            # Get entry date based on feed type
            entry_date = get_entry_date(entry, feed_type)
            if entry_date < start_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (before start date)")
                continue
            elif entry_date > end_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (after end date)")
                continue
            else:
                consecutive_out_of_range = 0
                entries_in_range += 1
                # Create chapter
                title = entry.get('title', 'Untitled')
                logging.info(f"Adding chapter: {title} ({entry_date.strftime('%Y-%m-%d')})")
                # Get content based on feed type
                content = get_entry_content(entry, feed_type)
                # Clean the content
                cleaned_content = clean_html(content)
                # Create chapter
                chapter = epub.EpubHtml(
                    title=title,
                    file_name=f'chapter_{len(chapters)}.xhtml',
                    content=f'<h1>{title}</h1>{cleaned_content}'
                )
                # Add chapter to the book
                book.add_item(chapter)
                chapters.append(chapter)
                toc.append(epub.Link(chapter.file_name, title, chapter.id))
        # Stop if nothing on this page fell within the range, or if too many
        # consecutive out-of-range entries have been seen
        if entries_in_range == 0 or consecutive_out_of_range >= 10:
            if entries_in_range == 0:
                logging.warning("No entries found within the specified date range")
            else:
                logging.info(f"Stopping after {consecutive_out_of_range} consecutive out-of-range entries")
            break
        # Try to get more entries if available
        next_page_url = get_next_feed_page(feed, feed_url)
        if next_page_url:
            current_page += 1
            logging.info(f"Fetching next page: {next_page_url}")
            feed = feedparser.parse(next_page_url)
            if not feed.entries:
                logging.info("No more entries available")
                break
        else:
            logging.info("No more pages available")
            break
    if entries_in_range == 0:
        logging.error("No entries found within the specified date range")
        return False
    logging.info(f"Processed {entries_processed} total entries, {entries_in_range} within date range")
    # Add table of contents
    book.toc = toc
    # Add navigation files
    book.add_item(epub.EpubNcx())
    book.add_item(epub.EpubNav())
    # Define CSS style
    style = '''
    @namespace epub "http://www.idpf.org/2007/ops";
    body {
        font-family: Cambria, Liberation Serif, serif;
    }
    h1 {
        text-align: left;
        text-transform: uppercase;
        font-weight: 200;
    }
    '''
    # Add CSS file
    nav_css = epub.EpubItem(
        uid="style_nav",
        file_name="style/nav.css",
        media_type="text/css",
        content=style
    )
    book.add_item(nav_css)
    # Create the spine
    book.spine = ['nav'] + chapters
    # Write the EPUB file
    logging.info(f"Writing EPUB file: {output_file}")
    epub.write_epub(output_file, book, {})
    logging.info("EPUB file created successfully")
    return True


def main():
    parser = argparse.ArgumentParser(description='Convert an RSS feed to an EPUB ebook')
    parser.add_argument('feed_url', help='URL of the RSS feed')
    parser.add_argument('--start-date', help='Start date (YYYY-MM-DD)',
                        default=(datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d'))
    parser.add_argument('--end-date', help='End date (YYYY-MM-DD)',
                        default=datetime.now().strftime('%Y-%m-%d'))
    parser.add_argument('--output', help='Output EPUB file name',
                        default='rss_feed.epub')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    args = parser.parse_args()
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    # Parse dates; the end date is extended to the end of the day so it is inclusive
    start_date = datetime.strptime(args.start_date, '%Y-%m-%d')
    end_date = datetime.strptime(args.end_date, '%Y-%m-%d').replace(hour=23, minute=59, second=59)
    # Create the ebook
    if create_ebook(args.feed_url, start_date, end_date, args.output):
        logging.info(f"Successfully created ebook: {args.output}")
    else:
        logging.error("Failed to create ebook")


if __name__ == '__main__':
    main()
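# Example invocation (the script file name below is illustrative; use whatever
# name this file is saved under):
#   python rss_to_epub.py https://example.com/feed.xml \
#       --start-date 2024-01-01 --end-date 2024-06-30 --output blog.epub --debug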