Last active
June 13, 2025 02:04
-
-
Save rightson/8f4d038c87f3aaaa311c817e63768473 to your computer and use it in GitHub Desktop.
Action Config Manager
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import re | |
import logging | |
from typing import List, Dict, Any, Tuple, Set, Optional | |
from collections import defaultdict | |
# Set up logger | |
logger = logging.getLogger(__name__) | |
def has_uncommented_action_table_created(tcl_content: str) -> bool:
    """Report whether a non-comment line initialises ``action_table``.

    Each line is stripped of surrounding whitespace first. Lines that then
    start with ``#`` are ignored; any other line qualifies when it contains
    ``set action_table [dict`` (case-insensitive, arbitrary whitespace
    between the words).

    Args:
        tcl_content: Full text of a TCL file.

    Returns:
        True if at least one qualifying line exists, otherwise False.
    """
    init_pattern = r'set\s+action_table\s+\[dict'
    trimmed_lines = (raw.strip() for raw in tcl_content.split('\n'))
    return any(
        re.search(init_pattern, text, re.IGNORECASE) is not None
        for text in trimmed_lines
        if not text.startswith('#')
    )
# Header of one entry: ``dict append action_table [$]name {``.
_ENTRY_HEADER = re.compile(r'dict\s+append\s+action_table\s+\$?(\w+)\s*\{', re.IGNORECASE)
# Start of one ``key {`` pair inside an entry body.
_PAIR_HEAD = re.compile(r'\s*(\w+)\s*\{')


def _read_braced_pairs(text: str) -> Dict[str, str]:
    """Collect consecutive ``key {value}`` pairs from the start of *text*.

    Braces inside a value are balanced, so ``action {run {x} now}`` yields
    ``run {x} now``. Scanning stops at the first position that is not the
    start of a pair, or at a value that is not closed within *text*.
    """
    pairs: Dict[str, str] = {}
    pos = 0
    while True:
        head = _PAIR_HEAD.match(text, pos)
        if head is None:
            break
        depth = 1
        cursor = head.end()
        while cursor < len(text) and depth > 0:
            char = text[cursor]
            if char == '{':
                depth += 1
            elif char == '}':
                depth -= 1
            cursor += 1
        if depth > 0:
            # Value continues on a later line; multi-line values are not parsed.
            break
        pairs[head.group(1)] = text[head.end():cursor - 1]
        pos = cursor
    return pairs


def extract_dict_entries(tcl_content: str) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Tuple[int, int]]]:
    """Extract every ``dict append action_table <name> { ... }`` entry.

    Both ``$name`` and ``name`` spellings are accepted; the returned name never
    carries the ``$``. An entry ends on the line where its braces balance,
    which may be the header line itself (single-line entry).

    Args:
        tcl_content: Full text of a TCL file.

    Returns:
        A tuple ``(entries, positions)`` where ``entries`` maps the variable
        name to its parsed ``{key: value}`` strings and ``positions`` maps the
        variable name to the 0-based, inclusive ``(first_line, last_line)``
        span of the entry. When a name occurs more than once, the last
        occurrence wins in both mappings.
    """
    entries: Dict[str, Dict[str, Any]] = {}
    positions: Dict[str, Tuple[int, int]] = {}
    lines = tcl_content.split('\n')
    i = 0
    while i < len(lines):
        stripped = lines[i].strip()
        header = _ENTRY_HEADER.match(stripped)
        if header is None:
            i += 1
            continue

        variable = header.group(1)  # Variable name without $ symbol
        start_line = i
        # Anything after the opening brace on the header line already belongs
        # to the entry body, so it takes part in brace counting and parsing.
        remainder = stripped[header.end():]
        depth = 1 + remainder.count('{') - remainder.count('}')
        entry_data: Dict[str, Any] = _read_braced_pairs(remainder)
        i += 1
        while i < len(lines) and depth > 0:
            body_line = lines[i].strip()
            depth += body_line.count('{') - body_line.count('}')
            # A bare closing brace yields no pairs; ``key {v} }`` still does.
            entry_data.update(_read_braced_pairs(body_line))
            i += 1

        entries[variable] = entry_data
        positions[variable] = (start_line, i - 1)
        logger.debug(f"Extracted entry '{variable}': {entry_data}")

    logger.debug(f"Total extracted entries: {len(entries)}")
    return entries, positions
def detect_duplicates(existing_entries: Dict[str, Dict[str, Any]]) -> Dict[Tuple[str, str], List[str]]:
    """Find milestone/event combinations that are used by several variables.

    Entries whose ``milestone`` or ``event`` is missing or empty are ignored.

    Args:
        existing_entries: Mapping of variable name to its parsed fields.

    Returns:
        Mapping of ``(milestone, event)`` to the variable names sharing it,
        restricted to combinations with more than one variable. Names keep
        the iteration order of ``existing_entries``.
    """
    grouped = defaultdict(list)
    for name, fields in existing_entries.items():
        combo = (fields.get('milestone', ''), fields.get('event', ''))
        if combo[0] and combo[1]:
            grouped[combo].append(name)

    # Only combinations shared by two or more variables are duplicates.
    duplicates = {}
    for combo, names in grouped.items():
        if len(names) > 1:
            duplicates[combo] = names

    if duplicates:
        logger.warning(f"Detected {len(duplicates)} duplicate milestone/event combinations:")
        for (ms, ev), names in duplicates.items():
            logger.warning(f" {ms}/{ev}: variables {names}")
    return duplicates
def resolve_duplicates(existing_entries: Dict[str, Dict[str, Any]],
                       duplicates: Dict[Tuple[str, str], List[str]],
                       existing_keys: Set[str]) -> Tuple[Dict[str, str], Set[str]]:
    """Assign stable ``<kept>_dup_<n>`` names to duplicate variables.

    For each duplicate group the first variable is kept as-is. Every other
    variable is renamed to a fresh ``<kept>_dup_<n>`` name, unless it already
    carries such a name. Without that check a previously renamed duplicate
    would be renamed again on every run (``_dup_1`` -> ``_dup_2`` -> ...).

    Args:
        existing_entries: Parsed entries (not read here; kept for interface
            compatibility).
        duplicates: Mapping of ``(milestone, event)`` to the variable names
            sharing that combination.
        existing_keys: Names already in use. Mutated in place: every newly
            generated name is added to it.

    Returns:
        ``(variable_rename_map, variables_to_remove)``. The rename map goes
        from old name to new name. ``variables_to_remove`` is always an empty
        set; nothing is removed by this function.
    """
    variable_rename_map: Dict[str, str] = {}
    variables_to_remove: Set[str] = set()

    for (milestone, event), duplicate_variables in duplicates.items():
        logger.info(f"Resolving duplicates for {milestone}/{event}: {duplicate_variables}")
        # Keep the first variable, rename the others
        keep_variable = duplicate_variables[0]
        logger.info(f" Keeping variable: {keep_variable}")

        dup_base = f"{keep_variable}_dup"
        already_resolved = re.compile(rf"{re.escape(dup_base)}_\d+")
        for dup_var in duplicate_variables[1:]:
            if already_resolved.fullmatch(dup_var):
                # Named by an earlier run; renaming again would only churn the file.
                logger.debug(f" Duplicate variable already resolved, keeping name: {dup_var}")
                continue
            new_name = generate_unique_key(existing_keys, dup_base)
            existing_keys.add(new_name)
            variable_rename_map[dup_var] = new_name
            logger.info(f" Renaming duplicate variable: {dup_var} -> {new_name}")

    return variable_rename_map, variables_to_remove
def find_matching_entry(existing_entries: Dict[str, Dict[str, Any]], milestone: str, event: str) -> Optional[str]:
    """Look up the first variable whose milestone and event both match.

    A missing ``milestone`` or ``event`` field is treated as the empty string.

    Args:
        existing_entries: Mapping of variable name to its parsed fields.
        milestone: Milestone value to look for.
        event: Event value to look for.

    Returns:
        The first matching variable name in iteration order, or None.
    """
    for name, fields in existing_entries.items():
        if fields.get('milestone', '') == milestone:
            if fields.get('event', '') == event:
                logger.debug(f"Found matching entry '{name}' for {milestone}/{event}")
                return name

    logger.debug(f"No matching entry found for {milestone}/{event}")
    return None
def generate_unique_key(existing_keys: Set[str], base_name: str = "tcl_mgr_entry") -> str:
    """Return the first ``<base_name>_<n>`` (n = 1, 2, ...) not yet in use.

    Args:
        existing_keys: Names that are already taken. Not modified.
        base_name: Prefix for the generated name.

    Returns:
        The generated name. The caller is responsible for adding it to
        ``existing_keys`` if further names will be generated.
    """
    suffix = 1
    candidate = f"{base_name}_{suffix}"
    while candidate in existing_keys:
        suffix += 1
        candidate = f"{base_name}_{suffix}"

    logger.debug(f"Generated unique key: {candidate}")
    return candidate
def format_dict_entry(variable: str, event_data: Dict[str, Any]) -> List[str]:
    """Render one entry as the lines of a TCL ``dict append`` block.

    The header uses the bare variable name (no ``$``). Known fields come first
    in a fixed order; any remaining fields follow in the order in which they
    appear in ``event_data``. Field names are left-aligned to 12 characters.

    Args:
        variable: Variable name to use in the header.
        event_data: Field name to value mapping.

    Returns:
        The header line, one line per field, and a closing ``}`` line.
    """
    field_order = ['milestone', 'event', 'design_type', 'status', 'start_time', 'deadline', 'time_left', 'fdi_checker', 'action']

    known_fields = [name for name in field_order if name in event_data]
    extra_fields = [name for name in event_data if name not in field_order]

    rendered = [f"dict append action_table {variable} {{"]
    rendered += [
        f" {name:<12} {{{event_data[name]}}}"
        for name in known_fields + extra_fields
    ]
    rendered.append("}")

    logger.debug(f"Formatted entry for variable '{variable}': {len(rendered)} lines")
    return rendered
def is_different(existing_event: Dict[str, Any], new_event: Dict[str, Any]) -> bool:
    """Tell whether two events differ in any field other than milestone/event.

    Values are compared by their ``str()`` form. Entries parsed from the TCL
    file always hold strings, and values are written to the file through
    string formatting, so comparing raw values would report a change forever
    for a non-string value such as ``5`` versus the stored ``'5'``.

    Args:
        existing_event: Fields of the entry as currently stored.
        new_event: Fields of the incoming event.

    Returns:
        True when the compared fields (names or stringified values) differ.
    """
    identity_fields = ('milestone', 'event')
    existing_copy = {k: str(v) for k, v in existing_event.items() if k not in identity_fields}
    new_copy = {k: str(v) for k, v in new_event.items() if k not in identity_fields}

    is_diff = existing_copy != new_copy
    if is_diff:
        logger.debug("Detected differences between existing and new event data")
        logger.debug(f"Existing: {existing_copy}")
        logger.debug(f"New: {new_copy}")
    return is_diff
# Header of one entry: ``dict append action_table [$]name {``.
_ACTION_ENTRY_HEADER = re.compile(r'dict\s+append\s+action_table\s+\$?(\w+)\s*\{', re.IGNORECASE)


def _end_of_entry_block(lines: List[str], header_index: int) -> int:
    """Return the index of the first line after the entry starting at *header_index*.

    Brace depth starts from the header line's own balance, so an entry that
    opens and closes on the header line spans exactly that one line.
    """
    header = lines[header_index]
    depth = header.count('{') - header.count('}')
    cursor = header_index + 1
    while cursor < len(lines) and depth > 0:
        depth += lines[cursor].count('{') - lines[cursor].count('}')
        cursor += 1
    return cursor


def update_action_config_core(events_data: List[Dict[str, Any]], action_config_file: str, verbose: bool = True) -> Dict[str, Any]:
    """Synchronise an action_config.tcl file with a list of events.

    Steps:
      1. Read the file, or start from the initialisation template when the
         file is missing, empty, or has no uncommented ``set action_table
         [dict ...`` line.
      2. Parse existing entries and rename duplicate milestone/event variables.
      3. For each event with a non-empty milestone and event: update the
         matching entry if its fields differ, otherwise add a new entry. If
         the same new milestone/event pair occurs several times in
         ``events_data``, a single entry is added and the last occurrence wins.
      4. If anything changed, rewrite the file. Lines outside entries are
         copied as-is, entries are regenerated through ``format_dict_entry``,
         and new entries are appended at the end.

    Args:
        events_data: Events to apply; each is a field-name to value mapping.
        action_config_file: Path of the TCL file to read and write.
        verbose: Accepted for interface compatibility; not used here.

    Returns:
        Dict with keys ``success``, ``message``, ``updates_made``,
        ``entries_updated``, ``entries_added``, ``duplicates_fixed`` and
        ``duplicate_details`` (old name to new name).

    Raises:
        OSError: If the file cannot be read or written.
    """
    logger.info(f"Starting action_config.tcl update for file: {action_config_file}")
    logger.debug(f"Input events_data: {len(events_data)} events")

    # Initialize template
    template = """if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
"""

    # Read or create file
    if os.path.exists(action_config_file):
        with open(action_config_file, "r", encoding='utf-8') as f:
            tcl_content = f.read()
        if not tcl_content.strip() or not has_uncommented_action_table_created(tcl_content):
            # NOTE(review): this discards everything already in the file,
            # including any entries, when the initialisation line is absent.
            # Confirm that replacing (rather than prepending) is intended.
            logger.info("Using template to initialize action_config.tcl as file is empty or missing initialization")
            tcl_content = template
        else:
            logger.info("Found existing action_config.tcl with proper initialization")
    else:
        logger.info("action_config.tcl does not exist, creating with template")
        tcl_content = template

    # Parse existing entries
    existing_entries, _ = extract_dict_entries(tcl_content)
    existing_keys = set(existing_entries.keys())
    logger.debug(f"Existing keys: {existing_keys}")

    # Detect and resolve duplicates
    duplicates = detect_duplicates(existing_entries)
    variable_rename_map: Dict[str, str] = {}
    if duplicates:
        logger.info(f"Found {len(duplicates)} duplicate milestone/event combinations, resolving...")
        variable_rename_map, _ = resolve_duplicates(existing_entries, duplicates, existing_keys)
        # Update existing_keys to reflect renames
        for old_var, new_var in variable_rename_map.items():
            existing_keys.discard(old_var)
            existing_keys.add(new_var)
    # Only an actual rename requires rewriting the file.
    duplicate_fixes_made = bool(variable_rename_map)

    updates_made = False
    entries_to_update: Dict[str, Dict[str, Any]] = {}
    new_entries: List[Tuple[str, Dict[str, Any]]] = []
    # (milestone, event) -> index into new_entries, so a pair repeated within
    # events_data does not create duplicate entries.
    pending_new: Dict[Tuple[str, str], int] = {}

    # Process each event
    for event in events_data:
        milestone = event.get('milestone', '')
        event_name = event.get('event', '')
        if not milestone or not event_name:
            logger.info(f"Skipping event with missing milestone or event: {event}")
            continue

        logger.debug(f"Processing event: {milestone}/{event_name}")
        matching_key = find_matching_entry(existing_entries, milestone, event_name)
        if matching_key:
            # Check if this key was renamed due to duplicate resolution
            actual_key = variable_rename_map.get(matching_key, matching_key)
            # Found matching entry, check if update is needed
            if is_different(existing_entries[matching_key], event):
                entries_to_update[actual_key] = event
                updates_made = True
                logger.info(f"Field changes detected for {milestone}/{event_name} (key: {actual_key})")
            else:
                logger.debug(f"No changes needed for {milestone}/{event_name} (key: {actual_key})")
        elif (milestone, event_name) in pending_new:
            # Same new pair seen earlier in this call: keep one entry, latest data wins.
            slot = pending_new[(milestone, event_name)]
            pending_key = new_entries[slot][0]
            new_entries[slot] = (pending_key, event)
            logger.info(f"Repeated input event {milestone}/{event_name}; latest data kept (key: {pending_key})")
        else:
            # Add new entry
            new_key = generate_unique_key(existing_keys)
            existing_keys.add(new_key)
            pending_new[(milestone, event_name)] = len(new_entries)
            new_entries.append((new_key, event))
            updates_made = True
            logger.info(f"New entry to be added: {milestone}/{event_name} (key: {new_key})")

    logger.debug(f"Summary - Entries to update: {len(entries_to_update)}, New entries: {len(new_entries)}")

    # If there are updates or duplicate fixes, regenerate file content
    if updates_made or duplicate_fixes_made:
        logger.info("Applying updates to action_config.tcl")
        lines = tcl_content.split('\n')
        result_lines: List[str] = []
        processed_variables: Set[str] = set()
        i = 0
        while i < len(lines):
            line = lines[i]
            match = _ACTION_ENTRY_HEADER.match(line.strip())
            if match is None:
                # Not an entry header: copy through untouched.
                result_lines.append(line)
                i += 1
                continue

            variable = match.group(1)  # Variable name without $ symbol
            # Handle renamed variables
            actual_variable = variable_rename_map.get(variable, variable)
            block_end = _end_of_entry_block(lines, i)

            if actual_variable in processed_variables:
                # Same variable already emitted: drop this repeated block.
                logger.debug(f"Skipping duplicate variable: {variable}")
            else:
                processed_variables.add(actual_variable)
                if actual_variable in entries_to_update:
                    # Replace entire dict append block with updated data
                    result_lines.extend(format_dict_entry(actual_variable, entries_to_update[actual_variable]))
                    logger.debug(f"Updated dict append block for variable: {actual_variable}")
                elif variable in existing_entries:
                    # Regenerate from parsed data, under the new name if renamed
                    result_lines.extend(format_dict_entry(actual_variable, existing_entries[variable]))
                    if variable in variable_rename_map:
                        logger.debug(f"Renamed dict append block: {variable} -> {actual_variable}")
                    else:
                        logger.debug(f"Preserved dict append block for variable: {actual_variable}")
                else:
                    # The parser did not record this entry; keep its text
                    # verbatim instead of failing or dropping it.
                    result_lines.extend(lines[i:block_end])
            i = block_end

        # Add new entries at the end of file
        for new_key, event in new_entries:
            result_lines.append("")
            result_lines.append(f"# New entry added by adaptive tool: {event.get('milestone', '')}/{event.get('event', '')}")
            result_lines.extend(format_dict_entry(new_key, event))
            logger.debug(f"Added new dict append block for key: {new_key}")

        # Write to file
        with open(action_config_file, "w", encoding='utf-8') as f:
            f.write('\n'.join(result_lines))

        logger.info(f"Successfully updated action_config.tcl: {action_config_file}")
        logger.info(f"Updated {len(entries_to_update)} existing entries, added {len(new_entries)} new entries")
        if duplicate_fixes_made:
            logger.info(f"Fixed {len(variable_rename_map)} duplicate variable names")
    else:
        logger.info("No updates needed for action_config.tcl")

    return {
        'success': True,
        'message': 'Update completed successfully',
        'updates_made': updates_made or duplicate_fixes_made,
        'entries_updated': len(entries_to_update),
        'entries_added': len(new_entries),
        'duplicates_fixed': len(variable_rename_map),
        'duplicate_details': variable_rename_map
    }
def update_action_config_from_data(events_data: List[Dict[str, Any]],
                                   action_config_file: str,
                                   timeout: int = 10,
                                   verbose: bool = True) -> Dict[str, Any]:
    """Update the action_config file from event data without raising.

    Wraps ``update_action_config_core`` and converts any exception into a
    failure result, so callers can rely on the returned dictionary alone.

    Args:
        events_data: List of event data
        action_config_file: TCL configuration file path
        timeout: Timeout setting (reserved parameter, not used)
        verbose: Whether to output detailed information

    Returns:
        The result dictionary of ``update_action_config_core`` on success.
        On failure, a dictionary with ``success`` False, ``message`` set to
        ``'Update failed: <error>'`` and all counters zeroed.
    """
    try:
        logger.info(f"Starting update_action_config_from_data with {len(events_data)} events")
        return update_action_config_core(events_data, action_config_file, verbose)
    except Exception as e:
        # Top-level boundary: report the failure to the caller, but keep the
        # traceback in the log so the cause remains diagnosable.
        error_msg = f'Update failed: {e}'
        logger.exception(error_msg)
        return {
            'success': False,
            'message': error_msg,
            'updates_made': False,
            'entries_updated': 0,
            'entries_added': 0,
            'duplicates_fixed': 0,
            'duplicate_details': {}
        }
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# test_action_config_complete.tcl
# Complete action config file for testing
# This file contains initialization and existing entries with mixed variable naming

# Reset action_table whether it currently exists as an array or a plain variable
if { [info exist action_table ] } {
    if { [array exist action_table] } {
        array unset action_table
    } else {
        unset action_table
    }
}
set action_table [dict create]

# First existing entry with $ prefix (old style)
dict append action_table $legacy_entry_1 {
    milestone {milestone1}
    event {event1}
    design_type {old_design1 TOP PARTITION}
    status {old_status}
    start_time {2025-06-10T00:00:00}
    deadline {2025-06-28}
    time_left {old_time_left}
    fdi_checker {/old/path/checker1}
    action {old_action1}
}

# Second existing entry without $ prefix (new style)
dict append action_table modern_entry_xyz {
    milestone {milestone_existing}
    event {event_existing}
    design_type {existing_design BLOCK}
    status {completed}
    start_time {2025-06-05T00:00:00}
    deadline {2025-06-25}
    time_left {0 days}
    fdi_checker {/path/to/existing/checker}
    action {existing_action}
}

# Third entry with external naming convention
dict append action_table user_defined_var_12345 {
    milestone {preserve_milestone}
    event {preserve_event}
    design_type {PRESERVE DESIGN}
    status {preserve_status}
    start_time {2025-06-01T00:00:00}
    deadline {2025-06-20}
    time_left {preserve_time}
    fdi_checker {/preserve/path}
    action {preserve_action}
}

# Some additional TCL code to ensure it's preserved
puts "This is additional TCL code that should be preserved"
set some_variable "test_value"
# End of file comment
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def test_large_dataset(self, temp_dir, setup_logging):
    """Test with large number of events"""
    config_file = os.path.join(temp_dir, 'large_dataset_config.tcl')

    # Generate large dataset: 100 events with distinct milestone/event pairs
    large_events = [
        {
            'milestone': f'milestone_{i}',
            'event': f'event_{i}',
            'design_type': f'design_{i} PARTITION',
            'status': 'active',
            'start_time': '2025-06-11T00:25:06',
            'deadline': '2025-06-29',
            'time_left': f'{i} days',
            'fdi_checker': f'/path/to/checker_{i}',
            'action': f'action_{i}'
        }
        for i in range(100)
    ]

    result = tcl_manager.update_action_config_core(large_events, config_file)
    assert result['success'] is True
    assert result['updates_made'] is True
    assert result['entries_added'] == 100

    # Verify file content
    with open(config_file, 'r') as f:
        content = f.read()
    assert content.count('dict append action_table') == 100
    assert 'milestone_0' in content
    assert 'milestone_99' in content
    # Verify no $ prefixes in generated entries
    assert '$tcl_mgr_entry_' not in content
    assert 'tcl_mgr_entry_' in content


import pytest
import os | |
import tempfile | |
import shutil | |
from unittest.mock import patch, MagicMock | |
import logging | |
# Import the module under test | |
import tcl_manager | |
class TestTclManager:
    """Unit and file-level tests for the ``tcl_manager`` module."""

    @pytest.fixture
    def temp_dir(self):
        """Create temporary directory for test files"""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir)

    @pytest.fixture
    def sample_events_data(self):
        """Sample event data for testing"""
        return [
            {
                'milestone': 'milestone1',
                'event': 'event1',
                'design_type': 'design1 TOP PARTITION',
                'status': 'active',
                'start_time': '2025-06-11T00:25:06',
                'deadline': '2025-06-29',
                'time_left': '18 days',
                'fdi_checker': '/path/to/checker1',
                'action': 'action1'
            },
            {
                'milestone': 'milestone2',
                'event': 'event2',
                'design_type': 'design2 SUB PARTITION',
                'status': 'pending',
                'start_time': '2025-06-12T00:25:06',
                'deadline': '2025-06-30',
                'time_left': '19 days',
                'fdi_checker': '/path/to/checker2',
                'action': 'action2'
            }
        ]

    @pytest.fixture
    def existing_tcl_content(self):
        """Sample existing TCL content with mixed variable naming"""
        return """# This is a comment
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
# Another comment - entry with $ prefix (old style)
dict append action_table $entry_1 {
milestone {milestone1}
event {event1}
design_type {old design TOP PARTITION}
status {old_status}
start_time {2025-06-10T00:25:06}
deadline {2025-06-28}
time_left {old_time}
fdi_checker {/old/path/checker}
action {old_action}
}
# More comments to preserve - entry without $ prefix (new style)
dict append action_table external_entry_xyz {
milestone {milestone3}
event {event3}
design_type {design3 BLOCK}
status {completed}
start_time {2025-06-05T00:25:06}
deadline {2025-06-25}
time_left {0 days}
fdi_checker {/path/to/checker3}
action {action3}
}
"""

    @pytest.fixture
    def empty_tcl_content(self):
        """Empty TCL content without initialization"""
        return "# Just a comment\n# Another comment\n"

    @pytest.fixture
    def setup_logging(self):
        """Setup logging for tests"""
        logging.basicConfig(level=logging.DEBUG)
        logger = logging.getLogger('tcl_manager')
        return logger

    def test_has_uncommented_action_table_created_positive(self):
        """Test detection of action_table creation"""
        content = """
# Comment line
set action_table [dict create]
"""
        result = tcl_manager.has_uncommented_action_table_created(content)
        assert result is True

    def test_has_uncommented_action_table_created_negative(self):
        """Test when action_table is not created or commented"""
        content = """
# set action_table [dict create]
# Another comment
"""
        result = tcl_manager.has_uncommented_action_table_created(content)
        assert result is False

    def test_extract_dict_entries(self, existing_tcl_content):
        """Test extraction of dictionary entries with mixed variable naming"""
        entries, positions = tcl_manager.extract_dict_entries(existing_tcl_content)
        assert len(entries) == 2
        assert 'entry_1' in entries  # Should extract without $ symbol
        assert 'external_entry_xyz' in entries  # Should handle external naming

        # Check entry_1 content
        entry_1 = entries['entry_1']
        assert entry_1['milestone'] == 'milestone1'
        assert entry_1['event'] == 'event1'
        assert entry_1['status'] == 'old_status'

        # Check external_entry_xyz content
        external_entry = entries['external_entry_xyz']
        assert external_entry['milestone'] == 'milestone3'
        assert external_entry['event'] == 'event3'
        assert external_entry['status'] == 'completed'

        # Check positions
        assert len(positions) == 2
        assert 'entry_1' in positions
        assert 'external_entry_xyz' in positions

    def test_find_matching_entry(self):
        """Test finding matching entries by milestone and event"""
        existing_entries = {
            'entry_1': {'milestone': 'milestone1', 'event': 'event1', 'status': 'active'},
            'entry_2': {'milestone': 'milestone2', 'event': 'event2', 'status': 'pending'}
        }
        # Test finding existing entry
        result = tcl_manager.find_matching_entry(existing_entries, 'milestone1', 'event1')
        assert result == 'entry_1'
        # Test not finding entry
        result = tcl_manager.find_matching_entry(existing_entries, 'milestone3', 'event3')
        assert result is None

    def test_generate_unique_key(self):
        """Test unique key generation with external variable names"""
        existing_keys = {'entry_1', 'external_var_abc', 'user_defined_xyz', 'tcl_mgr_entry_1'}
        new_key = tcl_manager.generate_unique_key(existing_keys)
        assert new_key == 'tcl_mgr_entry_2'  # Should use distinctive prefix
        assert new_key not in existing_keys

        # Test with custom base name
        new_key_custom = tcl_manager.generate_unique_key(existing_keys, "custom_base")
        assert new_key_custom == 'custom_base_1'
        assert new_key_custom not in existing_keys

    def test_format_dict_entry(self):
        """Test TCL dictionary entry formatting without $ prefix"""
        event_data = {
            'milestone': 'test_milestone',
            'event': 'test_event',
            'design_type': 'TEST DESIGN',
            'status': 'active'
        }
        lines = tcl_manager.format_dict_entry('test_var', event_data)
        assert lines[0] == 'dict append action_table test_var {'  # No $ prefix
        assert lines[-1] == '}'
        assert any('milestone' in line and 'test_milestone' in line for line in lines)
        assert any('event' in line and 'test_event' in line for line in lines)

    def test_is_different(self):
        """Test event difference detection"""
        existing_event = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'old_status',
            'design_type': 'old_design'
        }
        # Different event (status changed)
        new_event_different = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'new_status',
            'design_type': 'old_design'
        }
        # Same event
        new_event_same = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'old_status',
            'design_type': 'old_design'
        }
        assert tcl_manager.is_different(existing_event, new_event_different) is True
        assert tcl_manager.is_different(existing_event, new_event_same) is False

    def test_update_action_config_core_new_file(self, temp_dir, sample_events_data, setup_logging):
        """Test updating non-existent file (should create new file)"""
        config_file = os.path.join(temp_dir, 'new_action_config.tcl')
        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 2

        # Check file was created
        assert os.path.exists(config_file)

        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        assert 'set action_table [dict create]' in content
        assert 'milestone1' in content
        assert 'milestone2' in content

    def test_update_action_config_core_empty_file(self, temp_dir, sample_events_data, setup_logging):
        """Test updating empty file"""
        config_file = os.path.join(temp_dir, 'empty_action_config.tcl')
        # Create empty file
        with open(config_file, 'w') as f:
            f.write('')

        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_added'] == 2

        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        assert 'set action_table [dict create]' in content

    def test_update_action_config_core_existing_file_update(self, temp_dir, existing_tcl_content, sample_events_data, setup_logging):
        """Test updating existing file with matching entries"""
        config_file = os.path.join(temp_dir, 'existing_action_config.tcl')
        # Create file with existing content
        with open(config_file, 'w') as f:
            f.write(existing_tcl_content)

        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 1  # milestone1/event1 should be updated
        assert result['entries_added'] == 1  # milestone2/event2 should be added

        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        # Check that comments are preserved
        assert '# This is a comment' in content
        assert '# More comments to preserve' in content
        # Check that milestone1/event1 was updated
        assert 'design1 TOP PARTITION' in content  # New value
        assert 'old design TOP PARTITION' not in content  # Old value should be replaced
        # Check that milestone2/event2 was added
        assert 'milestone2' in content
        assert 'event2' in content

    def test_update_action_config_core_no_updates_needed(self, temp_dir, setup_logging):
        """Test when no updates are needed"""
        config_file = os.path.join(temp_dir, 'no_update_config.tcl')
        # Create file with matching content (without $ prefix)
        existing_content = """
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
dict append action_table tcl_mgr_entry_1 {
milestone {milestone1}
event {event1}
design_type {design1 TOP PARTITION}
status {active}
start_time {2025-06-11T00:25:06}
deadline {2025-06-29}
time_left {18 days}
fdi_checker {/path/to/checker1}
action {action1}
}
"""
        with open(config_file, 'w') as f:
            f.write(existing_content)

        events_data = [{
            'milestone': 'milestone1',
            'event': 'event1',
            'design_type': 'design1 TOP PARTITION',
            'status': 'active',
            'start_time': '2025-06-11T00:25:06',
            'deadline': '2025-06-29',
            'time_left': '18 days',
            'fdi_checker': '/path/to/checker1',
            'action': 'action1'
        }]
        result = tcl_manager.update_action_config_core(events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is False
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 0

    def test_update_action_config_core_invalid_events(self, temp_dir, setup_logging):
        """Test handling of invalid event data"""
        config_file = os.path.join(temp_dir, 'invalid_events_config.tcl')
        invalid_events = [
            {'milestone': '', 'event': 'event1'},  # Empty milestone
            {'milestone': 'milestone1', 'event': ''},  # Empty event
            {'milestone': 'milestone1'},  # Missing event
            {'event': 'event1'},  # Missing milestone
            {'milestone': 'valid_milestone', 'event': 'valid_event', 'status': 'active'}  # Valid event
        ]
        result = tcl_manager.update_action_config_core(invalid_events, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_added'] == 1  # Only the valid event should be added

    def test_update_action_config_from_data_success(self, temp_dir, sample_events_data):
        """Test main wrapper function success case"""
        config_file = os.path.join(temp_dir, 'wrapper_test_config.tcl')
        result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert os.path.exists(config_file)

    @patch('tcl_manager.update_action_config_core')
    def test_update_action_config_from_data_exception(self, mock_core, temp_dir, sample_events_data):
        """Test main wrapper function exception handling"""
        config_file = os.path.join(temp_dir, 'exception_test_config.tcl')
        # Make the core function raise an exception
        mock_core.side_effect = Exception("Test exception")

        result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
        assert result['success'] is False
        assert 'Test exception' in result['message']
        assert result['updates_made'] is False
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 0

    def test_file_permissions_readonly(self, temp_dir, sample_events_data, setup_logging):
        """Test handling of read-only file"""
        config_file = os.path.join(temp_dir, 'readonly_config.tcl')
        # Create file and make it read-only
        with open(config_file, 'w') as f:
            f.write('# Read-only file')
        os.chmod(config_file, 0o444)  # Read-only permissions

        try:
            # Permission bits are not enforced for every user/platform (e.g.
            # when running as root). Probe by opening for append, which does
            # not alter the content, and skip if the file is still writable.
            try:
                with open(config_file, 'a'):
                    still_writable = True
            except OSError:
                still_writable = False
            if still_writable:
                pytest.skip("read-only permission bits are not enforced in this environment")

            result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
            assert result['success'] is False
        finally:
            # Restore permissions for cleanup
            os.chmod(config_file, 0o644)

    def test_mixed_variable_formats(self, temp_dir, setup_logging):
        """Test handling of mixed variable naming formats (with and without $ prefix)"""
        config_file = os.path.join(temp_dir, 'mixed_format_config.tcl')
        # Create file with mixed formats
        mixed_content = """
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
# Old style with $ prefix
dict append action_table $old_style_var {
milestone {milestone_old}
event {event_old}
status {old_status}
}
# New style without $ prefix
dict append action_table new_style_var {
milestone {milestone_new}
event {event_new}
status {new_status}
}
# External variable name
dict append action_table user_defined_12345 {
milestone {milestone_external}
event {event_external}
status {external_status}
}
"""
        with open(config_file, 'w') as f:
            f.write(mixed_content)

        # Update with new events
        events_data = [
            {
                'milestone': 'milestone_old',
                'event': 'event_old',
                'status': 'updated_old_status',
                'design_type': 'UPDATED OLD DESIGN'
            },
            {
                'milestone': 'milestone_completely_new',
                'event': 'event_completely_new',
                'status': 'brand_new_status'
            }
        ]
        result = tcl_manager.update_action_config_core(events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 1  # milestone_old/event_old updated
        assert result['entries_added'] == 1  # milestone_completely_new/event_completely_new added

        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        # Verify old entry was updated
        assert 'updated_old_status' in content
        assert 'UPDATED OLD DESIGN' in content
        # Verify new entry was added with proper format (no $ prefix)
        assert 'milestone_completely_new' in content
        assert 'tcl_mgr_entry_' in content  # Should use our distinctive prefix
        # Verify external entry was preserved
        assert 'user_defined_12345' in content
        assert 'milestone_external' in content
if __name__ == '__main__':
    # Propagate pytest's exit code so that running this file as a script
    # reports failures to the shell instead of always exiting with 0.
    raise SystemExit(pytest.main([__file__, '-v']))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment