|
import logging |
|
import sys |
|
import os |
|
from datetime import datetime |
|
import time |
|
from typing import Optional, Union, TextIO, Dict, Any |
|
import inspect |
|
from pathlib import Path |
|
import gzip |
|
import shutil |
|
from logging.handlers import RotatingFileHandler |
|
import functools |
|
import threading |
|
from enum import Enum |
|
from collections import defaultdict |
|
|
|
# colorama is an optional dependency: when it is missing the module still
# imports, records that fact in COLORAMA_AVAILABLE, and tells the user how
# to get colored console output.
try:
    import colorama

    # Activate colorama's ANSI handling before advertising color support.
    colorama.init()
    COLORAMA_AVAILABLE = True
except ImportError:
    COLORAMA_AVAILABLE = False
    print(
        "Warning: colorama not installed. "
        "Console output will not be colored. "
        "Install with: pip install colorama"
    )
|
|
|
class LogLevel(Enum):
    """Numeric values of the extra log levels used alongside the stdlib ones.

    Each value sits between two standard ``logging`` levels so that normal
    level filtering keeps working.
    """

    # Below logging.DEBUG (10).
    TRACE = 5
    # Between logging.DEBUG (10) and logging.INFO (20).
    PERFORMANCE = 15
    # Between logging.INFO (20) and logging.WARNING (30).
    SUCCESS = 25
|
|
|
class ColorFormatter(logging.Formatter):
    """Formatter that wraps the level name in ANSI color codes.

    The color table is populated only when colorama could be imported;
    otherwise it is empty and records are formatted without any coloring.
    """

    def __init__(self, *args, **kwargs):
        """Accept the same arguments as ``logging.Formatter``."""
        super().__init__(*args, **kwargs)

        # Define colors only if colorama is available
        if COLORAMA_AVAILABLE:
            self.COLORS = {
                'DEBUG': colorama.Fore.BLUE,
                'INFO': colorama.Fore.GREEN,
                'WARNING': colorama.Fore.YELLOW,
                'ERROR': colorama.Fore.RED,
                'CRITICAL': colorama.Fore.RED + colorama.Style.BRIGHT,
                'TRACE': colorama.Fore.MAGENTA,
                'PERFORMANCE': colorama.Fore.CYAN,
                'SUCCESS': colorama.Fore.GREEN + colorama.Style.BRIGHT,
            }
            self.RESET = colorama.Style.RESET_ALL
        else:
            self.COLORS = {}
            self.RESET = ''

    def format(self, record):
        """Return the formatted record with a colored level name.

        The record is shared by every handler attached to the logger, so the
        colored level name is applied only for the duration of this call and
        the plain name is restored afterwards. Without that, handlers that
        run later (for example a file handler) would write the escape codes
        too.

        NOTE: a width specifier such as ``%(levelname)-8s`` counts the escape
        codes as characters, so colored level names are not padded visually.
        """
        color = self.COLORS.get(record.levelname)
        if not color:
            # Unknown level, or coloring disabled: nothing to decorate.
            return super().format(record)

        plain_name = record.levelname
        record.levelname = f"{color}{plain_name}{self.RESET}"
        try:
            return super().format(record)
        finally:
            record.levelname = plain_name
|
|
|
class SafeColorFormatter(logging.Formatter):
    """Plain formatter used for console output when no coloring is applied."""

    def format(self, record):
        """Format ``record`` exactly as ``logging.Formatter`` does."""
        rendered = super().format(record)
        return rendered
|
|
|
class CompressedRotatingFileHandler(RotatingFileHandler):
    """A handler that rotates files and compresses old logs.

    Rotated files are named ``<log>.<n>.<YYYYmmdd_HHMMSS>[_<serial>].gz``.
    Because of the timestamp, the numeric shifting done by
    ``RotatingFileHandler.doRollover`` never matches an existing backup, so
    this class enforces ``backupCount`` itself by pruning the oldest
    backups after every rotation.
    """

    def rotation_filename(self, default_name: str) -> str:
        """Generate the name of the rotated file.

        Args:
            default_name: Name proposed by the base handler.

        Returns:
            ``default_name`` followed by a second-resolution local timestamp.
        """
        return f"{default_name}.{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def rotate(self, source: str, dest: str) -> None:
        """Rotate and compress the old log file.

        ``source`` is gzip-compressed into ``<dest>.gz`` and then removed.
        If compression fails, the file is copied uncompressed instead. If
        another backup already uses the same timestamp, a ``_<serial>``
        suffix is added so the earlier backup is not overwritten.

        Args:
            source: Path of the live log file being rotated out.
            dest: Target path as produced by ``rotation_filename``.
        """
        if not os.path.exists(source):
            return

        target = self._unique_target(source, dest)
        archive = f"{target}.gz"
        try:
            with open(source, 'rb') as f_in, gzip.open(archive, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
        except Exception as e:
            # Rotation is best effort: keep the log uncompressed rather than
            # lose it, and do not leave a truncated archive behind.
            try:
                os.remove(archive)
            except OSError:
                pass
            shutil.copy2(source, target)
            os.remove(source)
            print(f"Warning: Log compression failed: {str(e)}")
        else:
            os.remove(source)

        self._prune_backups(source)

    @staticmethod
    def _parse_backup(name: str, prefix: str) -> Optional[tuple]:
        """Return ``(date, time, serial)`` for a backup file name, else None."""
        if not name.startswith(prefix):
            return None
        tail = name[len(prefix):]
        if tail.endswith('.gz'):
            tail = tail[:-3]
        index, _, stamp = tail.partition('.')
        fields = stamp.split('_')
        if not index.isdigit() or len(fields) not in (2, 3):
            return None
        if not all(field.isdigit() for field in fields):
            return None
        serial = int(fields[2]) if len(fields) == 3 else 0
        return fields[0], fields[1], serial

    def _list_backups(self, source: str) -> list:
        """Return ``(mtime_ns, key, path)`` for every backup of ``source``."""
        directory, base = os.path.split(source)
        prefix = f"{base}."
        found = []
        for name in os.listdir(directory or os.curdir):
            key = self._parse_backup(name, prefix)
            if key is None:
                continue
            path = os.path.join(directory, name)
            try:
                found.append((os.stat(path).st_mtime_ns, key, path))
            except OSError:
                # The file vanished between listing and stat; ignore it.
                continue
        return found

    def _unique_target(self, source: str, dest: str) -> str:
        """Return ``dest``, with a serial suffix if its timestamp is taken."""
        wanted = self._parse_backup(
            os.path.basename(dest), f"{os.path.basename(source)}."
        )
        if wanted is None:
            # Not a name produced by rotation_filename; use it unchanged.
            return dest
        taken = [
            key[2]
            for _, key, _ in self._list_backups(source)
            if key[:2] == wanted[:2]
        ]
        if not taken:
            return dest
        # Always go past the highest serial so names stay chronological even
        # after older backups with lower serials have been pruned.
        return f"{dest}_{max(taken) + 1}"

    def _prune_backups(self, source: str) -> None:
        """Delete the oldest backups so at most ``backupCount`` remain."""
        if self.backupCount <= 0:
            return
        # Oldest first: modification time, then timestamp/serial from the
        # name to break ties between files written in the same clock tick.
        ordered = sorted(self._list_backups(source))
        for _, _, stale in ordered[:-self.backupCount]:
            try:
                os.remove(stale)
            except OSError:
                # Already gone or not removable; pruning is best effort.
                pass
|
|
|
# ... [PerformanceMetrics class remains the same] ... |
|
|
|
class CustomLogger:
    """
    Enhanced logger with rotation, compression, optional colors,
    custom levels, and performance metrics.

    The class is a singleton: every ``CustomLogger()`` call returns the same
    object, and ``__init__`` configures it only on the first call.
    """

    _instance = None
    _initialized = False

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super(CustomLogger, cls).__new__(cls)
        return cls._instance

    def __init__(self):
        """Configure the shared instance; repeated calls are no-ops."""
        if self._initialized:
            return

        # Initialize logging
        self.logger = logging.getLogger('CustomLogger')
        self.logger.setLevel(logging.DEBUG)
        self._initialized = True

        # Initialize handlers
        self.file_handler = None
        self.console_handler = None

        # Initialize performance metrics
        self.metrics = PerformanceMetrics()

        # Register custom log levels
        logging.addLevelName(LogLevel.TRACE.value, 'TRACE')
        logging.addLevelName(LogLevel.PERFORMANCE.value, 'PERFORMANCE')
        logging.addLevelName(LogLevel.SUCCESS.value, 'SUCCESS')

        # Default formats
        self.console_format = (
            '%(asctime)s | %(levelname)-8s | %(message)s'
        )
        self.file_format = (
            '%(asctime)s | %(levelname)-8s | %(filename)s:%(lineno)d | '
            '%(funcName)s() | %(message)s'
        )

        # Initialize with console logging by default
        self.setup_console_logging()

    def setup_console_logging(self, format_string: Optional[str] = None,
                              level: int = logging.DEBUG,
                              use_colors: bool = True) -> None:
        """
        Setup console logging with optional colors

        Any console handler installed by an earlier call is replaced.

        Args:
            format_string: Custom format string for logs
            level: Logging level
            use_colors: Whether to use colors (if colorama is available)
        """
        if self.console_handler:
            self.logger.removeHandler(self.console_handler)
            # Release the replaced handler; StreamHandler.close() does not
            # close the underlying sys.stdout stream.
            self.console_handler.close()
            self.console_handler = None

        self.console_handler = logging.StreamHandler(sys.stdout)
        self.console_handler.setLevel(level)

        # Choose formatter based on color availability and preference
        if use_colors and COLORAMA_AVAILABLE:
            formatter = ColorFormatter(format_string or self.console_format)
        else:
            formatter = SafeColorFormatter(format_string or self.console_format)

        self.console_handler.setFormatter(formatter)
        self.logger.addHandler(self.console_handler)

    def setup_file_logging(self,
                           filename: Optional[Union[str, Path]] = None,
                           format_string: Optional[str] = None,
                           level: int = logging.DEBUG,
                           max_bytes: int = 1024 * 1024,  # 1MB
                           backup_count: int = 5,
                           compress_logs: bool = True) -> None:
        """
        Setup file logging with optional rotation and compression

        Any file handler installed by an earlier call is removed and closed.

        Args:
            filename: Log file path; defaults to a timestamped file under
                ``logs/`` relative to the current working directory
            format_string: Custom format string for logs
            level: Logging level
            max_bytes: Maximum size of log file before rotation
            backup_count: Number of backup files to keep
            compress_logs: Whether to compress rotated logs
        """
        if self.file_handler:
            self.logger.removeHandler(self.file_handler)
            # Close the replaced handler so its file descriptor is released.
            self.file_handler.close()
            self.file_handler = None

        if filename is None:
            filename = f"logs/app_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

        # Ensure the target directory exists. A bare file name has an empty
        # dirname, and os.makedirs('') would raise FileNotFoundError.
        log_dir = os.path.dirname(filename)
        if log_dir:
            os.makedirs(log_dir, exist_ok=True)

        # Choose handler based on compression preference
        handler_cls = (
            CompressedRotatingFileHandler if compress_logs else RotatingFileHandler
        )
        self.file_handler = handler_cls(
            filename,
            maxBytes=max_bytes,
            backupCount=backup_count,
        )

        self.file_handler.setLevel(level)
        formatter = logging.Formatter(format_string or self.file_format)
        self.file_handler.setFormatter(formatter)
        self.logger.addHandler(self.file_handler)
|
|
|
# ... [Rest of the CustomLogger class remains the same] ... |
|
|
|
# Module-wide logger instance shared by everything that imports this module.
logger = CustomLogger()

# Re-export the logger's bound methods so callers can import them directly
# from this module instead of going through the instance.
debug, info, warning, error, critical = (
    logger.debug,
    logger.info,
    logger.warning,
    logger.error,
    logger.critical,
)
trace, performance, success = (
    logger.trace,
    logger.performance,
    logger.success,
)
get_performance_metrics = logger.get_performance_metrics