Skip to content

Instantly share code, notes, and snippets.

@rightson
Last active June 13, 2025 02:04
Show Gist options
  • Save rightson/8f4d038c87f3aaaa311c817e63768473 to your computer and use it in GitHub Desktop.
Save rightson/8f4d038c87f3aaaa311c817e63768473 to your computer and use it in GitHub Desktop.
Action Config Manager
import os
import re
import logging
from typing import List, Dict, Any, Tuple, Set, Optional
from collections import defaultdict
# Set up logger
logger = logging.getLogger(__name__)
def has_uncommented_action_table_created(tcl_content: str) -> bool:
    """Report whether any non-comment line initialises ``action_table``.

    A line qualifies when, after stripping surrounding whitespace, it does
    not start with ``#`` and contains ``set action_table [dict`` (any amount
    of whitespace between the words, case-insensitive).
    """
    init_pattern = r'set\s+action_table\s+\[dict'
    candidates = (raw_line.strip() for raw_line in tcl_content.split('\n'))
    return any(
        re.search(init_pattern, text, re.IGNORECASE) is not None
        for text in candidates
        if not text.startswith('#')
    )
def extract_dict_entries(tcl_content: str) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Tuple[int, int]]]:
    """
    Extract all ``dict append action_table <name> { ... }`` entries from TCL content.

    The variable name is accepted with or without a leading ``$``; the ``$``
    is never part of the returned key.  Inside a block, a line that starts
    with ``key {value}`` contributes one field.  Entries written entirely on
    the header line, and a field that shares its line with the closing brace,
    are handled as well.

    Limitations: a value ends at the first ``}``, so values containing nested
    braces or spanning several lines are not captured faithfully.

    Args:
        tcl_content: Full text of the TCL file.

    Returns:
        ``(entries, positions)`` where ``entries`` maps variable name to its
        parsed fields and ``positions`` maps variable name to the 0-based
        ``(first_line, last_line)`` indices of its block.  When a variable
        appears more than once, the last block wins.
    """
    header_re = re.compile(r'dict\s+append\s+action_table\s+\$?(\w+)\s*\{', re.IGNORECASE)
    pair_re = re.compile(r'(\w+)\s*\{([^}]*)\}')

    entries: Dict[str, Dict[str, Any]] = {}
    positions: Dict[str, Tuple[int, int]] = {}
    lines = tcl_content.split('\n')
    i = 0
    while i < len(lines):
        stripped = lines[i].strip()
        header = header_re.match(stripped)
        if header is None:
            i += 1
            continue

        variable = header.group(1)  # Variable name without $ symbol
        start_line = i
        entry_data: Dict[str, Any] = {}

        # Anything after the opening brace on the header line belongs to the
        # block too: count its braces and pick up inline fields so that a
        # single-line entry does not swallow the lines that follow it.
        remainder = stripped[header.end():]
        brace_count = 1 + remainder.count('{') - remainder.count('}')
        for pair in pair_re.finditer(remainder):
            entry_data[pair.group(1)] = pair.group(2)

        i += 1
        while i < len(lines) and brace_count > 0:
            current_line = lines[i].strip()
            brace_count += current_line.count('{') - current_line.count('}')
            # The line was inside the block when we reached it, so a leading
            # "key {value}" counts even if the same line also closes the block.
            pair = pair_re.match(current_line)
            if pair:
                entry_data[pair.group(1)] = pair.group(2)
            i += 1

        entries[variable] = entry_data
        positions[variable] = (start_line, i - 1)
        logger.debug(f"Extracted entry '{variable}': {entry_data}")

    logger.debug(f"Total extracted entries: {len(entries)}")
    return entries, positions
def detect_duplicates(existing_entries: Dict[str, Dict[str, Any]]) -> Dict[Tuple[str, str], List[str]]:
    """
    Group variables by their (milestone, event) pair and report collisions.

    Entries whose milestone or event is missing/empty are ignored.
    Returns: mapping of (milestone, event) -> variable names, restricted to
    pairs that are used by more than one variable.
    """
    grouped: Dict[Tuple[str, str], List[str]] = {}
    for name, fields in existing_entries.items():
        identity = (fields.get('milestone', ''), fields.get('event', ''))
        if all(identity):
            grouped.setdefault(identity, []).append(name)

    # Keep only the pairs shared by several variables
    duplicates = {}
    for identity, names in grouped.items():
        if len(names) > 1:
            duplicates[identity] = names

    if not duplicates:
        return duplicates

    logger.warning(f"Detected {len(duplicates)} duplicate milestone/event combinations:")
    for (milestone, event), variables in duplicates.items():
        logger.warning(f" {milestone}/{event}: variables {variables}")
    return duplicates
def resolve_duplicates(existing_entries: Dict[str, Dict[str, Any]],
                       duplicates: Dict[Tuple[str, str], List[str]],
                       existing_keys: Set[str]) -> Tuple[Dict[str, str], Set[str]]:
    """
    Resolve duplicates by keeping the first variable of each group and renaming the rest.

    Args:
        existing_entries: Parsed entries; accepted for interface compatibility
            and not consulted here.
        duplicates: Mapping of (milestone, event) to the variables sharing it.
        existing_keys: Names already in use.  Every generated name is added to
            this set in place so later names cannot collide with it.

    Returns:
        (variable_rename_map, variables_to_remove).  The rename map goes from
        old variable name to new name.  Nothing is ever scheduled for removal,
        so the second element is always an empty set.
    """
    variable_rename_map: Dict[str, str] = {}
    variables_to_remove: Set[str] = set()
    for (milestone, event), duplicate_variables in duplicates.items():
        # Defensive: an empty group has nothing to keep or rename.
        if not duplicate_variables:
            continue
        logger.info(f"Resolving duplicates for {milestone}/{event}: {duplicate_variables}")
        # Keep the first variable, rename the others
        keep_variable = duplicate_variables[0]
        logger.info(f" Keeping variable: {keep_variable}")
        for dup_var in duplicate_variables[1:]:
            # New names are derived from the kept variable's name
            new_name = generate_unique_key(existing_keys, f"{keep_variable}_dup")
            existing_keys.add(new_name)
            variable_rename_map[dup_var] = new_name
            logger.info(f" Renaming duplicate variable: {dup_var} -> {new_name}")
    return variable_rename_map, variables_to_remove
def find_matching_entry(existing_entries: Dict[str, Dict[str, Any]], milestone: str, event: str) -> Optional[str]:
    """Return the first variable whose milestone and event both equal the given values, else None."""
    for name in existing_entries:
        fields = existing_entries[name]
        # A missing field is treated as the empty string
        if fields.get('milestone', '') != milestone:
            continue
        if fields.get('event', '') != event:
            continue
        logger.debug(f"Found matching entry '{name}' for {milestone}/{event}")
        return name
    logger.debug(f"No matching entry found for {milestone}/{event}")
    return None
def generate_unique_key(existing_keys: Set[str], base_name: str = "tcl_mgr_entry") -> str:
    """
    Return the first name of the form ``<base_name>_<n>`` (n = 1, 2, 3, ...)
    that is not present in ``existing_keys``.

    The default base name is a distinctive prefix intended to stay clear of
    externally managed variable names.  ``existing_keys`` is not modified.
    """
    suffix = 1
    candidate = f"{base_name}_{suffix}"
    while candidate in existing_keys:
        suffix += 1
        candidate = f"{base_name}_{suffix}"
    logger.debug(f"Generated unique key: {candidate}")
    return candidate
def format_dict_entry(variable: str, event_data: Dict[str, Any]) -> List[str]:
    """Render one entry as the lines of a ``dict append action_table`` block (no $ prefix on the name)."""
    # Known fields are emitted first, in this fixed order
    field_order = ['milestone', 'event', 'design_type', 'status', 'start_time', 'deadline', 'time_left', 'fdi_checker', 'action']
    known_fields = [name for name in field_order if name in event_data]
    # Any remaining fields follow in the order the dict yields them
    extra_fields = [name for name in event_data if name not in field_order]

    body = [f" {name:<12} {{{event_data[name]}}}" for name in known_fields + extra_fields]
    lines = [f"dict append action_table {variable} {{", *body, "}"]
    logger.debug(f"Formatted entry for variable '{variable}': {len(lines)} lines")
    return lines
def is_different(existing_event: Dict[str, Any], new_event: Dict[str, Any]) -> bool:
    """Tell whether two events differ in any field other than milestone and event."""
    identity_fields = ('milestone', 'event')

    def payload(record: Dict[str, Any]) -> Dict[str, Any]:
        # Everything except the two identifying fields
        return {name: value for name, value in record.items() if name not in identity_fields}

    existing_copy = payload(existing_event)
    new_copy = payload(new_event)
    if existing_copy == new_copy:
        return False

    logger.debug(f"Detected differences between existing and new event data")
    logger.debug(f"Existing: {existing_copy}")
    logger.debug(f"New: {new_copy}")
    return True
def update_action_config_core(events_data: List[Dict[str, Any]], action_config_file: str, verbose: bool = True) -> Dict[str, Any]:
    """Core logic: update action_config.tcl, detecting and resolving duplicate entries.

    Steps:
      1. Load the file.  If it is missing, empty, or has no uncommented
         ``set action_table [dict ...`` line, start from the built-in template
         instead (existing content is then not carried over).
      2. Parse the ``dict append action_table`` entries and rename variables
         that share a milestone/event pair with an earlier variable.
      3. For every incoming event with a non-empty milestone and event:
         update the matching entry when its other fields differ, or queue a
         new entry.  An event whose milestone/event pair was already queued as
         new in this call reuses that queued entry (latest data wins) instead
         of being appended a second time.
      4. If anything changed, rewrite the file: non-entry lines are kept
         verbatim, known entries are re-rendered through ``format_dict_entry``
         and new entries are appended at the end.

    Args:
        events_data: Event dictionaries; each needs 'milestone' and 'event'.
        action_config_file: Path of the TCL file to read and (re)write.
        verbose: Accepted for interface compatibility; not used by this function.

    Returns:
        Result dictionary with keys 'success', 'message', 'updates_made',
        'entries_updated', 'entries_added', 'duplicates_fixed' and
        'duplicate_details'.

    Raises:
        OSError: if the file cannot be read or written (propagated from ``open``).
    """
    header_re = re.compile(r'dict\s+append\s+action_table\s+\$?(\w+)\s*\{', re.IGNORECASE)

    def block_end(all_lines: List[str], header_index: int) -> int:
        """Return the index of the first line after the block opened at header_index."""
        header_line = all_lines[header_index]
        # Take the depth from the header line itself so that an entry written
        # on a single line ends there instead of swallowing what follows.
        depth = header_line.count('{') - header_line.count('}')
        index = header_index + 1
        while index < len(all_lines) and depth > 0:
            depth += all_lines[index].count('{') - all_lines[index].count('}')
            index += 1
        return index

    logger.info(f"Starting action_config.tcl update for file: {action_config_file}")
    logger.debug(f"Input events_data: {len(events_data)} events")

    # Initialize template
    template = """if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
"""

    # Read or create file
    if os.path.exists(action_config_file):
        with open(action_config_file, "r", encoding='utf-8') as f:
            tcl_content = f.read()
        if not tcl_content.strip() or not has_uncommented_action_table_created(tcl_content):
            logger.info("Using template to initialize action_config.tcl as file is empty or missing initialization")
            tcl_content = template
        else:
            logger.info("Found existing action_config.tcl with proper initialization")
    else:
        logger.info("action_config.tcl does not exist, creating with template")
        tcl_content = template

    # Parse existing entries
    existing_entries, _entry_positions = extract_dict_entries(tcl_content)
    existing_keys = set(existing_entries.keys())
    logger.debug(f"Existing keys: {existing_keys}")

    # Detect and resolve duplicates
    duplicates = detect_duplicates(existing_entries)
    duplicate_fixes_made = len(duplicates) > 0
    variable_rename_map: Dict[str, str] = {}
    if duplicate_fixes_made:
        logger.info(f"Found {len(duplicates)} duplicate milestone/event combinations, resolving...")
        variable_rename_map, _ = resolve_duplicates(existing_entries, duplicates, existing_keys)
        # Update existing_keys to reflect renames
        for old_var, new_var in variable_rename_map.items():
            existing_keys.discard(old_var)
            existing_keys.add(new_var)

    updates_made = False
    entries_to_update: Dict[str, Dict[str, Any]] = {}
    new_entries: List[Tuple[str, Dict[str, Any]]] = []
    # (milestone, event) -> index in new_entries, so a repeated new event does
    # not create the very duplicates this tool exists to clean up.
    pending_new: Dict[Tuple[str, str], int] = {}

    # Process each event
    for event in events_data:
        milestone = event.get('milestone', '')
        event_name = event.get('event', '')
        if not milestone or not event_name:
            logger.info(f"Skipping event with missing milestone or event: {event}")
            continue
        logger.debug(f"Processing event: {milestone}/{event_name}")

        matching_key = find_matching_entry(existing_entries, milestone, event_name)
        if matching_key:
            # Check if this key was renamed due to duplicate resolution
            actual_key = variable_rename_map.get(matching_key, matching_key)
            # Found matching entry, check if update is needed
            if is_different(existing_entries[matching_key], event):
                entries_to_update[actual_key] = event
                updates_made = True
                logger.info(f"Field changes detected for {milestone}/{event_name} (key: {actual_key})")
            else:
                logger.debug(f"No changes needed for {milestone}/{event_name} (key: {actual_key})")
            continue

        # str() keeps the lookup key hashable whatever the caller passed in
        identity = (str(milestone), str(event_name))
        if identity in pending_new:
            slot = pending_new[identity]
            queued_key = new_entries[slot][0]
            new_entries[slot] = (queued_key, event)
            logger.info(f"Repeated new event {milestone}/{event_name}: reusing key {queued_key} with the latest data")
            continue

        # Add new entry
        new_key = generate_unique_key(existing_keys)
        existing_keys.add(new_key)
        pending_new[identity] = len(new_entries)
        new_entries.append((new_key, event))
        updates_made = True
        logger.info(f"New entry to be added: {milestone}/{event_name} (key: {new_key})")

    logger.debug(f"Summary - Entries to update: {len(entries_to_update)}, New entries: {len(new_entries)}")

    # If there are updates or duplicate fixes, regenerate file content
    if updates_made or duplicate_fixes_made:
        logger.info("Applying updates to action_config.tcl")
        lines = tcl_content.split('\n')
        result_lines: List[str] = []
        processed_variables: Set[str] = set()
        i = 0
        while i < len(lines):
            line = lines[i]
            # Check if this is a dict append line
            match = header_re.match(line.strip())
            if match is None:
                result_lines.append(line)
                i += 1
                continue

            variable = match.group(1)  # Variable name without $ symbol
            # Handle renamed variables
            actual_variable = variable_rename_map.get(variable, variable)
            next_index = block_end(lines, i)

            if actual_variable in processed_variables:
                # A block for this variable was already emitted: drop this one
                logger.debug(f"Skipping duplicate variable: {variable}")
            else:
                processed_variables.add(actual_variable)
                if actual_variable in entries_to_update:
                    # Replace entire dict append block with updated data
                    result_lines.extend(format_dict_entry(actual_variable, entries_to_update[actual_variable]))
                    logger.debug(f"Updated dict append block for variable: {actual_variable}")
                elif variable not in existing_entries:
                    # The parser did not index this block; keep its text as is
                    # rather than failing or losing it.
                    result_lines.extend(lines[i:next_index])
                    logger.debug(f"Copied unparsed dict append block verbatim: {variable}")
                elif variable in variable_rename_map:
                    # This is a renamed duplicate - use original data but with new name
                    result_lines.extend(format_dict_entry(actual_variable, existing_entries[variable]))
                    logger.debug(f"Renamed dict append block: {variable} -> {actual_variable}")
                else:
                    # Unchanged entry: re-rendered from its parsed fields
                    result_lines.extend(format_dict_entry(actual_variable, existing_entries[variable]))
                    logger.debug(f"Preserved dict append block for variable: {actual_variable}")

            # Skip original dict append block
            i = next_index

        # Add new entries at the end of file
        for new_key, event in new_entries:
            result_lines.append("")
            result_lines.append(f"# New entry added by adaptive tool: {event.get('milestone', '')}/{event.get('event', '')}")
            result_lines.extend(format_dict_entry(new_key, event))
            logger.debug(f"Added new dict append block for key: {new_key}")

        # Write to file
        with open(action_config_file, "w", encoding='utf-8') as f:
            f.write('\n'.join(result_lines))
        logger.info(f"Successfully updated action_config.tcl: {action_config_file}")
        logger.info(f"Updated {len(entries_to_update)} existing entries, added {len(new_entries)} new entries")
        if duplicate_fixes_made:
            logger.info(f"Fixed {len(variable_rename_map)} duplicate variable names")
    else:
        logger.info("No updates needed for action_config.tcl")

    return {
        'success': True,
        'message': 'Update completed successfully',
        'updates_made': updates_made or duplicate_fixes_made,
        'entries_updated': len(entries_to_update),
        'entries_added': len(new_entries),
        'duplicates_fixed': len(variable_rename_map),
        'duplicate_details': variable_rename_map
    }
def update_action_config_from_data(events_data: List[Dict[str, Any]],
                                   action_config_file: str,
                                   timeout: int = 10,
                                   verbose: bool = True) -> Dict[str, Any]:
    """
    Main function to update action_config file from event data.

    Thin, never-raising wrapper around ``update_action_config_core``: any
    exception is logged and converted into a failure result.

    Args:
        events_data: List of event data
        action_config_file: TCL configuration file path
        timeout: Timeout setting (reserved parameter, currently unused)
        verbose: Whether to output detailed information (forwarded to the core)

    Returns:
        Update result dictionary.  On failure 'success' is False, 'message'
        is ``'Update failed: <error>'`` and all counters are zero.
    """
    try:
        logger.info(f"Starting update_action_config_from_data with {len(events_data)} events")
        return update_action_config_core(events_data, action_config_file, verbose)
    except Exception as e:
        # Top-level boundary: callers get a result dict instead of an
        # exception, so keep the traceback in the log for diagnosis.
        error_msg = f'Update failed: {str(e)}'
        logger.exception(error_msg)
        return {
            'success': False,
            'message': error_msg,
            'updates_made': False,
            'entries_updated': 0,
            'entries_added': 0,
            'duplicates_fixed': 0,
            'duplicate_details': {}
        }
# test_action_config_complete.tcl
# Complete action config file for testing
# This file contains initialization and existing entries with mixed variable naming

# Reset any previous action_table (array or plain variable) before creating it
if { [info exist action_table ] } {
    if { [array exist action_table] } {
        array unset action_table
    } else {
        unset action_table
    }
}
set action_table [dict create]

# First existing entry with $ prefix (old style)
# NOTE(review): $legacy_entry_1 is not set anywhere in this file; presumably
# the fixture is only parsed as text and never sourced by Tcl - confirm.
dict append action_table $legacy_entry_1 {
    milestone {milestone1}
    event {event1}
    design_type {old_design1 TOP PARTITION}
    status {old_status}
    start_time {2025-06-10T00:00:00}
    deadline {2025-06-28}
    time_left {old_time_left}
    fdi_checker {/old/path/checker1}
    action {old_action1}
}

# Second existing entry without $ prefix (new style)
dict append action_table modern_entry_xyz {
    milestone {milestone_existing}
    event {event_existing}
    design_type {existing_design BLOCK}
    status {completed}
    start_time {2025-06-05T00:00:00}
    deadline {2025-06-25}
    time_left {0 days}
    fdi_checker {/path/to/existing/checker}
    action {existing_action}
}

# Third entry with external naming convention
dict append action_table user_defined_var_12345 {
    milestone {preserve_milestone}
    event {preserve_event}
    design_type {PRESERVE DESIGN}
    status {preserve_status}
    start_time {2025-06-01T00:00:00}
    deadline {2025-06-20}
    time_left {preserve_time}
    fdi_checker {/preserve/path}
    action {preserve_action}
}

# Some additional TCL code to ensure it's preserved
puts "This is additional TCL code that should be preserved"
set some_variable "test_value"
# End of file comment
def test_large_dataset(self, temp_dir, setup_logging):
    """Test with large number of events"""
    # NOTE(review): this takes `self` and pytest fixtures, so it presumably
    # belongs inside the TestTclManager class - confirm where it should live.
    config_file = os.path.join(temp_dir, 'large_dataset_config.tcl')

    # Generate large dataset: 100 events with distinct milestone/event pairs
    large_events = [
        {
            'milestone': f'milestone_{i}',
            'event': f'event_{i}',
            'design_type': f'design_{i} PARTITION',
            'status': 'active',
            'start_time': '2025-06-11T00:25:06',
            'deadline': '2025-06-29',
            'time_left': f'{i} days',
            'fdi_checker': f'/path/to/checker_{i}',
            'action': f'action_{i}'
        }
        for i in range(100)
    ]

    result = tcl_manager.update_action_config_core(large_events, config_file)
    assert result['success'] is True
    assert result['updates_made'] is True
    assert result['entries_added'] == 100

    # Verify file content
    with open(config_file, 'r') as f:
        content = f.read()
    assert content.count('dict append action_table') == 100
    assert 'milestone_0' in content
    assert 'milestone_99' in content
    # Verify no $ prefixes in generated entries
    assert '$tcl_mgr_entry_' not in content
    assert 'tcl_mgr_entry_' in content


import pytest
import os
import tempfile
import shutil
from unittest.mock import patch, MagicMock
import logging
# Import the module under test
import tcl_manager
class TestTclManager:
    """Tests for the tcl_manager module: parsing helpers, formatting and file updates."""

    @pytest.fixture
    def temp_dir(self):
        """Create temporary directory for test files"""
        temp_dir = tempfile.mkdtemp()
        yield temp_dir
        shutil.rmtree(temp_dir)

    @pytest.fixture
    def sample_events_data(self):
        """Sample event data for testing"""
        return [
            {
                'milestone': 'milestone1',
                'event': 'event1',
                'design_type': 'design1 TOP PARTITION',
                'status': 'active',
                'start_time': '2025-06-11T00:25:06',
                'deadline': '2025-06-29',
                'time_left': '18 days',
                'fdi_checker': '/path/to/checker1',
                'action': 'action1'
            },
            {
                'milestone': 'milestone2',
                'event': 'event2',
                'design_type': 'design2 SUB PARTITION',
                'status': 'pending',
                'start_time': '2025-06-12T00:25:06',
                'deadline': '2025-06-30',
                'time_left': '19 days',
                'fdi_checker': '/path/to/checker2',
                'action': 'action2'
            }
        ]

    @pytest.fixture
    def existing_tcl_content(self):
        """Sample existing TCL content with mixed variable naming"""
        return """# This is a comment
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
# Another comment - entry with $ prefix (old style)
dict append action_table $entry_1 {
milestone {milestone1}
event {event1}
design_type {old design TOP PARTITION}
status {old_status}
start_time {2025-06-10T00:25:06}
deadline {2025-06-28}
time_left {old_time}
fdi_checker {/old/path/checker}
action {old_action}
}
# More comments to preserve - entry without $ prefix (new style)
dict append action_table external_entry_xyz {
milestone {milestone3}
event {event3}
design_type {design3 BLOCK}
status {completed}
start_time {2025-06-05T00:25:06}
deadline {2025-06-25}
time_left {0 days}
fdi_checker {/path/to/checker3}
action {action3}
}
"""

    @pytest.fixture
    def empty_tcl_content(self):
        """Empty TCL content without initialization"""
        return "# Just a comment\n# Another comment\n"

    @pytest.fixture
    def setup_logging(self):
        """Setup logging for tests"""
        logging.basicConfig(level=logging.DEBUG)
        logger = logging.getLogger('tcl_manager')
        return logger

    def test_has_uncommented_action_table_created_positive(self):
        """Test detection of action_table creation"""
        content = """
# Comment line
set action_table [dict create]
"""
        result = tcl_manager.has_uncommented_action_table_created(content)
        assert result is True

    def test_has_uncommented_action_table_created_negative(self):
        """Test when action_table is not created or commented"""
        content = """
# set action_table [dict create]
# Another comment
"""
        result = tcl_manager.has_uncommented_action_table_created(content)
        assert result is False

    def test_extract_dict_entries(self, existing_tcl_content):
        """Test extraction of dictionary entries with mixed variable naming"""
        entries, positions = tcl_manager.extract_dict_entries(existing_tcl_content)
        assert len(entries) == 2
        assert 'entry_1' in entries  # Should extract without $ symbol
        assert 'external_entry_xyz' in entries  # Should handle external naming
        # Check entry_1 content
        entry_1 = entries['entry_1']
        assert entry_1['milestone'] == 'milestone1'
        assert entry_1['event'] == 'event1'
        assert entry_1['status'] == 'old_status'
        # Check external_entry_xyz content
        external_entry = entries['external_entry_xyz']
        assert external_entry['milestone'] == 'milestone3'
        assert external_entry['event'] == 'event3'
        assert external_entry['status'] == 'completed'
        # Check positions
        assert len(positions) == 2
        assert 'entry_1' in positions
        assert 'external_entry_xyz' in positions

    def test_find_matching_entry(self):
        """Test finding matching entries by milestone and event"""
        existing_entries = {
            'entry_1': {'milestone': 'milestone1', 'event': 'event1', 'status': 'active'},
            'entry_2': {'milestone': 'milestone2', 'event': 'event2', 'status': 'pending'}
        }
        # Test finding existing entry
        result = tcl_manager.find_matching_entry(existing_entries, 'milestone1', 'event1')
        assert result == 'entry_1'
        # Test not finding entry
        result = tcl_manager.find_matching_entry(existing_entries, 'milestone3', 'event3')
        assert result is None

    def test_generate_unique_key(self):
        """Test unique key generation with external variable names"""
        existing_keys = {'entry_1', 'external_var_abc', 'user_defined_xyz', 'tcl_mgr_entry_1'}
        new_key = tcl_manager.generate_unique_key(existing_keys)
        assert new_key == 'tcl_mgr_entry_2'  # Should use distinctive prefix
        assert new_key not in existing_keys
        # Test with custom base name
        new_key_custom = tcl_manager.generate_unique_key(existing_keys, "custom_base")
        assert new_key_custom == 'custom_base_1'
        assert new_key_custom not in existing_keys

    def test_format_dict_entry(self):
        """Test TCL dictionary entry formatting without $ prefix"""
        event_data = {
            'milestone': 'test_milestone',
            'event': 'test_event',
            'design_type': 'TEST DESIGN',
            'status': 'active'
        }
        lines = tcl_manager.format_dict_entry('test_var', event_data)
        assert lines[0] == 'dict append action_table test_var {'  # No $ prefix
        assert lines[-1] == '}'
        assert any('milestone' in line and 'test_milestone' in line for line in lines)
        assert any('event' in line and 'test_event' in line for line in lines)

    def test_is_different(self):
        """Test event difference detection"""
        existing_event = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'old_status',
            'design_type': 'old_design'
        }
        # Different event (status changed)
        new_event_different = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'new_status',
            'design_type': 'old_design'
        }
        # Same event
        new_event_same = {
            'milestone': 'milestone1',
            'event': 'event1',
            'status': 'old_status',
            'design_type': 'old_design'
        }
        assert tcl_manager.is_different(existing_event, new_event_different) is True
        assert tcl_manager.is_different(existing_event, new_event_same) is False

    def test_update_action_config_core_new_file(self, temp_dir, sample_events_data, setup_logging):
        """Test updating non-existent file (should create new file)"""
        config_file = os.path.join(temp_dir, 'new_action_config.tcl')
        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 2
        # Check file was created
        assert os.path.exists(config_file)
        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        assert 'set action_table [dict create]' in content
        assert 'milestone1' in content
        assert 'milestone2' in content

    def test_update_action_config_core_empty_file(self, temp_dir, sample_events_data, setup_logging):
        """Test updating empty file"""
        config_file = os.path.join(temp_dir, 'empty_action_config.tcl')
        # Create empty file
        with open(config_file, 'w') as f:
            f.write('')
        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_added'] == 2
        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        assert 'set action_table [dict create]' in content

    def test_update_action_config_core_existing_file_update(self, temp_dir, existing_tcl_content, sample_events_data, setup_logging):
        """Test updating existing file with matching entries"""
        config_file = os.path.join(temp_dir, 'existing_action_config.tcl')
        # Create file with existing content
        with open(config_file, 'w') as f:
            f.write(existing_tcl_content)
        result = tcl_manager.update_action_config_core(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 1  # milestone1/event1 should be updated
        assert result['entries_added'] == 1  # milestone2/event2 should be added
        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        # Check that comments are preserved
        assert '# This is a comment' in content
        assert '# More comments to preserve' in content
        # Check that milestone1/event1 was updated
        assert 'design1 TOP PARTITION' in content  # New value
        assert 'old design TOP PARTITION' not in content  # Old value should be replaced
        # Check that milestone2/event2 was added
        assert 'milestone2' in content
        assert 'event2' in content

    def test_update_action_config_core_no_updates_needed(self, temp_dir, setup_logging):
        """Test when no updates are needed"""
        config_file = os.path.join(temp_dir, 'no_update_config.tcl')
        # Create file with matching content (without $ prefix)
        existing_content = """
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
dict append action_table tcl_mgr_entry_1 {
milestone {milestone1}
event {event1}
design_type {design1 TOP PARTITION}
status {active}
start_time {2025-06-11T00:25:06}
deadline {2025-06-29}
time_left {18 days}
fdi_checker {/path/to/checker1}
action {action1}
}
"""
        with open(config_file, 'w') as f:
            f.write(existing_content)
        events_data = [{
            'milestone': 'milestone1',
            'event': 'event1',
            'design_type': 'design1 TOP PARTITION',
            'status': 'active',
            'start_time': '2025-06-11T00:25:06',
            'deadline': '2025-06-29',
            'time_left': '18 days',
            'fdi_checker': '/path/to/checker1',
            'action': 'action1'
        }]
        result = tcl_manager.update_action_config_core(events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is False
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 0

    def test_update_action_config_core_invalid_events(self, temp_dir, setup_logging):
        """Test handling of invalid event data"""
        config_file = os.path.join(temp_dir, 'invalid_events_config.tcl')
        invalid_events = [
            {'milestone': '', 'event': 'event1'},  # Empty milestone
            {'milestone': 'milestone1', 'event': ''},  # Empty event
            {'milestone': 'milestone1'},  # Missing event
            {'event': 'event1'},  # Missing milestone
            {'milestone': 'valid_milestone', 'event': 'valid_event', 'status': 'active'}  # Valid event
        ]
        result = tcl_manager.update_action_config_core(invalid_events, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_added'] == 1  # Only the valid event should be added

    def test_update_action_config_from_data_success(self, temp_dir, sample_events_data):
        """Test main wrapper function success case"""
        config_file = os.path.join(temp_dir, 'wrapper_test_config.tcl')
        result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert os.path.exists(config_file)

    @patch('tcl_manager.update_action_config_core')
    def test_update_action_config_from_data_exception(self, mock_core, temp_dir, sample_events_data):
        """Test main wrapper function exception handling"""
        config_file = os.path.join(temp_dir, 'exception_test_config.tcl')
        # Make the core function raise an exception
        mock_core.side_effect = Exception("Test exception")
        result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
        assert result['success'] is False
        assert 'Test exception' in result['message']
        assert result['updates_made'] is False
        assert result['entries_updated'] == 0
        assert result['entries_added'] == 0

    def test_file_permissions_readonly(self, temp_dir, sample_events_data, setup_logging):
        """Test handling of read-only file"""
        config_file = os.path.join(temp_dir, 'readonly_config.tcl')
        # Create file and make it read-only
        with open(config_file, 'w') as f:
            f.write('# Read-only file')
        os.chmod(config_file, 0o444)  # Read-only permissions
        try:
            # Mode bits are not enforced for every user (e.g. root in CI
            # containers); in that case the write succeeds and the assertion
            # below would fail for reasons unrelated to the code under test.
            if os.access(config_file, os.W_OK):
                pytest.skip("read-only permission bits are not enforced for this user/platform")
            result = tcl_manager.update_action_config_from_data(sample_events_data, config_file)
            assert result['success'] is False
        finally:
            # Restore permissions for cleanup
            os.chmod(config_file, 0o644)

    def test_mixed_variable_formats(self, temp_dir, setup_logging):
        """Test handling of mixed variable naming formats (with and without $ prefix)"""
        config_file = os.path.join(temp_dir, 'mixed_format_config.tcl')
        # Create file with mixed formats
        mixed_content = """
if { [info exist action_table ] } {
if { [array exist action_table] } {
array unset action_table
} else {
unset action_table
}
}
set action_table [dict create]
# Old style with $ prefix
dict append action_table $old_style_var {
milestone {milestone_old}
event {event_old}
status {old_status}
}
# New style without $ prefix
dict append action_table new_style_var {
milestone {milestone_new}
event {event_new}
status {new_status}
}
# External variable name
dict append action_table user_defined_12345 {
milestone {milestone_external}
event {event_external}
status {external_status}
}
"""
        with open(config_file, 'w') as f:
            f.write(mixed_content)
        # Update with new events
        events_data = [
            {
                'milestone': 'milestone_old',
                'event': 'event_old',
                'status': 'updated_old_status',
                'design_type': 'UPDATED OLD DESIGN'
            },
            {
                'milestone': 'milestone_completely_new',
                'event': 'event_completely_new',
                'status': 'brand_new_status'
            }
        ]
        result = tcl_manager.update_action_config_core(events_data, config_file)
        assert result['success'] is True
        assert result['updates_made'] is True
        assert result['entries_updated'] == 1  # milestone_old/event_old updated
        assert result['entries_added'] == 1  # milestone_completely_new/event_completely_new added
        # Check file content
        with open(config_file, 'r') as f:
            content = f.read()
        # Verify old entry was updated
        assert 'updated_old_status' in content
        assert 'UPDATED OLD DESIGN' in content
        # Verify new entry was added with proper format (no $ prefix)
        assert 'milestone_completely_new' in content
        assert 'tcl_mgr_entry_' in content  # Should use our distinctive prefix
        # Verify external entry was preserved
        assert 'user_defined_12345' in content
        assert 'milestone_external' in content
if __name__ == '__main__':
    # Propagate pytest's exit status so running this file as a script fails
    # (non-zero exit) when any test fails.
    raise SystemExit(pytest.main([__file__, '-v']))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment