Created
May 15, 2025 17:46
-
-
Save grapeot/7dd9a452f1706aa5b950885a45cff29e to your computer and use it in GitHub Desktop.
修仙小说物品提取
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ollama | |
import os | |
import glob | |
from multiprocessing import Pool, cpu_count | |
import logging | |
import time | |
import tqdm # Added for progress bar | |
# --- Configuration ---
SECTIONS_DIR = "sections"  # input directory: one .txt chapter file per section
OBJECTS_DIR = "objects"  # output directory: one extraction result per chapter
OLLAMA_MODEL = "qwen3:235b-a22b" # User-specified model
NUM_PROCESSES = 2 # User-specified number of processes
LOG_LEVEL = logging.INFO # logging.DEBUG for more verbose output
# --- Prompt Template ---
PROMPT_PREFIX = "" # Modified: Removed prefix as requested
# Chinese instruction appended after the chapter text. It asks the model to
# extract names of items that may have special effects from the novel excerpt
# (ignoring ordinary everyday objects), one item per line, optionally as
# "name: effect description", and to output an empty string when none appear.
# Runtime string — do not translate or reformat.
PROMPT_SUFFIX = """
阅读上面的小说,找出其中可能是有特殊功效的物品的词组。如果小说中有针对这个物品功效的说明,也一并列出。如果只是一般的日常用品可以忽略。
格式:如果没有对功用的相关说明,每行放一个物品名称;如果有相关说明,在物品名称后加入冒号和详细的功效说明。
比如:
幽冥鬼火:从幽冥之地产生的火焰,可以炼丹
如果没有出现特殊功效的物品名称,则输出空字符串。
"""
# --- Logging Setup ---
# processName in the format makes worker-pool log lines distinguishable.
logging.basicConfig(level=LOG_LEVEL, format='%(asctime)s - %(levelname)s - %(processName)s - %(message)s')
logger = logging.getLogger(__name__)
def ensure_dir_exists(directory_path):
    """Create *directory_path* (including parents) if it does not exist.

    Args:
        directory_path: Filesystem path of the directory to create.

    Raises:
        OSError: If the directory cannot be created (e.g. permissions).
    """
    try:
        # exist_ok=True removes the check-then-create race of the previous
        # os.path.exists() + os.makedirs() sequence (another process could
        # create the directory between the two calls).
        already_present = os.path.isdir(directory_path)
        os.makedirs(directory_path, exist_ok=True)
        if not already_present:
            logger.info(f"Created directory: {directory_path}")
    except OSError as e:
        logger.error(f"Error creating directory {directory_path}: {e}")
        raise
def construct_prompt(chapter_content):
    """Assemble the full prompt for one chapter.

    The chapter text is sandwiched between the configured prefix and
    suffix templates.
    """
    pieces = (PROMPT_PREFIX, chapter_content, PROMPT_SUFFIX)
    return "".join(pieces)
def call_ollama_api(prompt_text, model_name):
    """Send *prompt_text* to the Ollama chat endpoint using *model_name*.

    Transient failures are retried a fixed number of times with a pause in
    between. On a missing model (HTTP 404) or exhausted retries, a
    human-readable "Error: ..." string is returned instead of raising.
    """
    max_retries = 3
    retry_delay = 10  # seconds between retry attempts

    attempt = 0
    while attempt < max_retries:
        is_last_attempt = attempt == max_retries - 1
        try:
            logger.debug(f"Attempting to call Ollama API (attempt {attempt + 1}/{max_retries}) with model {model_name}.")
            # Requires a running Ollama server with the model available:
            #   ollama serve &
            #   ollama pull qwen3:235b-a22b
            chat_messages = [{'role': 'user', 'content': prompt_text}]
            response = ollama.chat(model=model_name, messages=chat_messages)
            logger.debug(f"Ollama API response received.")
            return response['message']['content']
        except ollama.ResponseError as e:
            logger.error(f"Ollama API ResponseError on attempt {attempt + 1}: {e}")
            if e.status_code == 404:  # model not pulled locally: no point retrying
                logger.error(f"Model '{model_name}' not found. Please ensure it's pulled: `ollama pull {model_name}`")
                return f"Error: Model '{model_name}' not found. Details: {e.error}"
            if is_last_attempt:
                logger.error(f"Max retries reached for Ollama API call.")
                return f"Error: Max retries reached. Last error: {e.error}"
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        except Exception as e:
            logger.error(f"An unexpected error occurred during Ollama API call on attempt {attempt + 1}: {e}")
            if is_last_attempt:
                logger.error(f"Max retries reached for Ollama API call due to unexpected error.")
                return f"Error: Max retries reached. Last unexpected error: {str(e)}"
            logger.info(f"Retrying in {retry_delay} seconds...")
            time.sleep(retry_delay)
        attempt += 1
    # Defensive fallback; the loop above always returns before reaching here.
    return "Error: Ollama API call failed after multiple retries."
def process_file(txt_file_path):
    """Run item extraction for a single chapter file.

    Reads the chapter from *txt_file_path*, queries Ollama with the
    extraction prompt, and writes the model output to a same-named file
    under OBJECTS_DIR. Existing outputs are skipped unless they contain a
    previous error message. Returns a status string describing the outcome.
    """
    try:
        base_name = os.path.basename(txt_file_path)
        output_file_path = os.path.join(OBJECTS_DIR, base_name)

        if os.path.exists(output_file_path):
            # A previous run may have stored an error message; only such
            # outputs are redone, valid results are left untouched.
            try:
                with open(output_file_path, 'r', encoding='utf-8') as f_out_check:
                    previous_output = f_out_check.read().strip()
                if not previous_output.startswith("Error:"):
                    logger.info(f"Output file {output_file_path} already exists. Skipping.")
                    return f"Skipped: {output_file_path} (already exists)"
                logger.info(f"Output file {output_file_path} contains an error. Reprocessing.")
            except Exception as e_read:
                logger.warning(f"Could not read existing output file {output_file_path} to check for errors, will reprocess: {e_read}")

        logger.info(f"Processing file: {txt_file_path}")
        try:
            with open(txt_file_path, 'r', encoding='utf-8') as f_in:
                chapter_content = f_in.read()
        except Exception as e:
            logger.error(f"Error reading file {txt_file_path}: {e}")
            return f"Error reading {txt_file_path}: {e}"

        if not chapter_content.strip():
            # Nothing to send to the model; leave a note in the output file.
            logger.warning(f"File {txt_file_path} is empty. Skipping Ollama call.")
            with open(output_file_path, 'w', encoding='utf-8') as f_out:
                f_out.write("Source file was empty.")
            return f"Processed (empty): {txt_file_path}"

        prompt = construct_prompt(chapter_content)
        logger.debug(f"Constructed prompt for {base_name}")
        ollama_result = call_ollama_api(prompt, OLLAMA_MODEL)

        try:
            with open(output_file_path, 'w', encoding='utf-8') as f_out:
                f_out.write(ollama_result)
            logger.info(f"Successfully processed and saved: {output_file_path}")
        except Exception as e:
            logger.error(f"Error writing output file {output_file_path}: {e}")
            return f"Error writing {output_file_path}: {e}"

        # call_ollama_api reports failures as "Error: ..." strings; the
        # output file is still written so the error is inspectable on disk.
        if ollama_result.startswith("Error:"):
            return f"Completed with error: {output_file_path} - Ollama Error: {ollama_result}"
        return f"Success: {output_file_path}"
    except Exception as e:
        logger.error(f"Unhandled exception while processing {txt_file_path}: {e}")
        # Best effort: record the failure in this chapter's output file.
        try:
            base_name = os.path.basename(txt_file_path)
            output_file_path = os.path.join(OBJECTS_DIR, base_name)
            with open(output_file_path, 'w', encoding='utf-8') as f_out_err:
                f_out_err.write(f"Error processing this chapter: {str(e)}")
        except Exception as e_write_err:
            logger.error(f"Could not even write error file for {txt_file_path}: {e_write_err}")
        return f"Failed (unhandled): {txt_file_path} - {e}"
def main():
    """Coordinate parallel item extraction over every chapter in SECTIONS_DIR."""
    logger.info("Starting chapter processing script.")
    logger.info(f"Using Ollama model: {OLLAMA_MODEL}")
    logger.info(f"Number of parallel processes: {NUM_PROCESSES}")

    try:
        ensure_dir_exists(SECTIONS_DIR)
        ensure_dir_exists(OBJECTS_DIR)
    except Exception as e:
        logger.error(f"Could not create necessary directories. Exiting. Error: {e}")
        return

    # Sort for a consistent processing order across runs.
    txt_files = sorted(glob.glob(os.path.join(SECTIONS_DIR, "*.txt")))
    if not txt_files:
        logger.warning(f"No .txt files found in {SECTIONS_DIR}. Exiting.")
        return
    logger.info(f"Found {len(txt_files)} .txt files to process in {SECTIONS_DIR}.")

    # Never spawn more workers than CPUs or files available.
    actual_num_processes = min(NUM_PROCESSES, cpu_count(), len(txt_files))
    if actual_num_processes < NUM_PROCESSES:
        logger.warning(f"Adjusted number of processes to {actual_num_processes} due to CPU count or file count.")

    start_time = time.time()
    with Pool(processes=actual_num_processes) as pool:
        # imap_unordered lets tqdm advance as each chapter finishes,
        # regardless of submission order.
        jobs = pool.imap_unordered(process_file, txt_files)
        results = list(tqdm.tqdm(jobs, total=len(txt_files), desc="Processing Chapters"))
    end_time = time.time()
    logger.info(f"Processing completed in {end_time - start_time:.2f} seconds.")

    success_count = 0
    skipped_count = 0
    error_count = 0
    for result in results:
        logger.info(f"File processing result: {result}")
        if result and result.startswith("Success:"):
            success_count += 1
        elif result and result.startswith("Skipped:"):
            skipped_count += 1
        else:
            # Covers "Error:", "Failed:", "Completed with error:" and None.
            error_count += 1
    logger.info(f"Summary: {success_count} succeeded, {skipped_count} skipped, {error_count} failed/errored.")
    logger.info("Script finished.")
if __name__ == "__main__":
    # Prerequisites before running:
    #   1. Ollama is installed and the server is running (`ollama serve`).
    #   2. The model is pulled (`ollama pull qwen3:235b-a22b`).
    #   3. The Python packages are installed (`pip install ollama tqdm`).
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment