diff --git a/cecli/coders/base_coder.py b/cecli/coders/base_coder.py index 6d774522bc7..431f0b5c8eb 100755 --- a/cecli/coders/base_coder.py +++ b/cecli/coders/base_coder.py @@ -419,6 +419,8 @@ def __init__( uuid: str = "", parent_uuid: str = "", ): + from cecli.helpers.agents.service import AgentService + # initialize from args.map_cache_dir self.coroutines = coroutines # Per-instance tool and server filtering dictionaries @@ -544,8 +546,10 @@ def __init__( self.show_diffs = show_diffs - # Initialize conversation system if enabled + # Initialize all registry sub systems + AgentService.get_instance(self) ConversationService.get_chunks(self).initialize_conversation_system() + ObservationService.get_instance(self) self.commands = commands or Commands(self.io, self, args=args) self.commands.coder = self @@ -3675,10 +3679,11 @@ def consolidate_chunks(self): func_err = None content_err = None - last_chunk = self.partial_response_chunks[len(self.partial_response_chunks) - 1] - if last_chunk: - if getattr(last_chunk, "usage", None): - response.usage = last_chunk.usage + if len(self.partial_response_chunks): + last_chunk = self.partial_response_chunks[len(self.partial_response_chunks) - 1] + if last_chunk: + if getattr(last_chunk, "usage", None): + response.usage = last_chunk.usage # Collect provider-specific fields from chunks to preserve them # We need to track both by ID (primary) and index (fallback) since @@ -4487,7 +4492,7 @@ async def handle_shell_commands(self, commands_str, group): if output: accumulated_output += f"Output from {command}\n{output}\n" - print(accumulated_output) + self.io.tool_output(accumulated_output) if accumulated_output.strip() and await self.io.confirm_ask( "Add command output to the chat?", allow_never=True diff --git a/cecli/commands/run.py b/cecli/commands/run.py index 5cd2ebc8e46..d5568f9e9d2 100644 --- a/cecli/commands/run.py +++ b/cecli/commands/run.py @@ -37,7 +37,7 @@ async def execute(cls, io, coder, args, **kwargs): ) if coder.args.tui: - print(combined_output) + coder.io.tool_warning(combined_output) else: # This print statement, for whatever reason, # allows the thread to properly yield control of the terminal diff --git a/cecli/helpers/agents/service.py b/cecli/helpers/agents/service.py index c3d303f4700..f62b2950540 100644 --- a/cecli/helpers/agents/service.py +++ b/cecli/helpers/agents/service.py @@ -571,11 +571,13 @@ def start_generate_task(self, info: SubAgentInfo, user_message: str) -> asyncio. Returns: The ``asyncio.Task`` wrapping ``generate()``. """ + from cecli.commands import SwitchCoderSignal async def _run_generate(): info.status = SubAgentStatus.RUNNING try: await info.coder.generate(user_message=user_message, preproc=True) + if info.status == SubAgentStatus.RUNNING: info.status = SubAgentStatus.FINISHED info.summary = info.summary or DEFAULT_SUMMARY_COMPLETED @@ -597,6 +599,8 @@ async def _run_generate(): ) await self._inject_sub_agent_result(info) raise + except SwitchCoderSignal: + raise # Cancel any previous generate task to prevent duplicate concurrent generates if info.generate_task is not None and not info.generate_task.done(): @@ -604,8 +608,15 @@ async def _run_generate(): task = asyncio.create_task(_run_generate()) info.generate_task = task + + def _raise_if_signal(exc): + if isinstance(exc, SwitchCoderSignal): + raise exc + # Suppress "Task exception was never retrieved" for fire-and-forget tasks - task.add_done_callback(lambda t: t.exception() if not t.cancelled() else None) + task.add_done_callback( + lambda t: _raise_if_signal(t.exception()) if not t.cancelled() else None + ) return task async def _inject_sub_agent_result(self, info: SubAgentInfo) -> None: diff --git a/cecli/helpers/hashline.py b/cecli/helpers/hashline.py index 35a3a868689..197930a9e25 100644 --- a/cecli/helpers/hashline.py +++ b/cecli/helpers/hashline.py @@ -1,6 +1,8 @@ import difflib import re +from cecli.helpers.grep_ast.parsers import filename_to_lang +from cecli.helpers.grep_ast.tsl import get_language, get_parser from cecli.helpers.hashpos.hashpos import HashPos HASHLINE_PREFIX_RE = HashPos.HASH_PREFIX_RE @@ -1287,9 +1289,302 @@ def sort_ranges(op): return (-start_idx, priority) +def _would_create_duplicate_content(source_lines, candidate_start, candidate_end, repl_lines): + """ + Check if applying the edit would create duplicate adjacent content at boundaries. + """ + if not repl_lines: + return False + + # Check start boundary: first replacement line matches line before edit range + if candidate_start > 0: + line_before = source_lines[candidate_start - 1].strip() + first_repl = repl_lines[0].strip() + # Ensure the line actually contains text to prevent eating blank lines + if line_before and line_before == first_repl: + return True + + # Check end boundary: last replacement line matches line after edit range + if candidate_end < len(source_lines) - 1: + line_after = source_lines[candidate_end + 1].strip() + last_repl = repl_lines[-1].strip() + if line_after and line_after == last_repl: + return True + + return False + + +def _fix_duplicate_content_boundaries(source_lines, resolved_ops): + """ + Expand edit boundaries when replacement content duplicates adjacent lines. + """ + for resolved in resolved_ops: + op = resolved["op"] + if op["operation"] not in {"replace", "insert", "delete"}: + continue + repl_text = op.get("text", "") or "" + repl_lines = repl_text.splitlines() + if not repl_lines: + continue + + start_idx = resolved["start_idx"] + end_idx = resolved["end_idx"] + + # Expand start backward if first replacement line duplicates line before + while start_idx > 0: + line_before = source_lines[start_idx - 1].strip() + first_repl = repl_lines[0].strip() + if line_before and line_before == first_repl: + start_idx -= 1 + else: + break + + # Expand end forward if last replacement line duplicates line after + while end_idx < len(source_lines) - 1: + line_after = source_lines[end_idx + 1].strip() + last_repl = repl_lines[-1].strip() + if line_after and line_after == last_repl: + end_idx += 1 + else: + break + + resolved["start_idx"] = start_idx + resolved["end_idx"] = end_idx + + return resolved_ops + + +def _apply_closure_safeguard( + original_content: str, + resolved_ops: list, + file_path: str = None, +) -> list: + """ + Use tree-sitter to heal edit boundaries by simulating edits and checking syntax. + + For each replace/delete operation, this simulates applying the edit with + the replacement text, parses the resulting code with tree-sitter, and + checks if the result is syntactically valid (no ERROR/MISSING nodes). + + If the edit produces invalid syntax (e.g., missing closing braces), + it progressively expands/contracts the start and end boundaries by a + step and re-tests until the resulting code parses correctly or the + maximum number of expansion steps is exhausted. + + This prevents the common LLM edit error of "eating" outer scope + closing braces, parentheses, or brackets by finding the nearest + syntactically valid edit boundary. + + Args: + original_content: Original source code (without hashlines) + resolved_ops: List of resolved operation dicts with start_idx/end_idx + file_path: File path to determine tree-sitter language + + Returns: + Modified resolved_ops with healed boundaries + """ + + def is_syntactically_valid(source_bytes: bytes, parser) -> bool: + try: + tree = parser.parse(source_bytes) + return not tree.root_node.has_error + except Exception: + return False + + def apply_edit( + source_lines: list[str], + start: int, + end: int, + replacement: list[str], + ) -> bytes: + """Applies the edit and returns the new source code as bytes.""" + new_lines = source_lines[:start] + replacement + source_lines[end + 1 :] + return "\n".join(new_lines).encode("utf-8") + + def count_closures(source_bytes: bytes) -> int: + """Counts the total number of brackets, braces, and parenthesis.""" + return sum(source_bytes.count(b) for b in b"{}[]()") + + def get_indentation(line: str) -> int: + """Helper to safely calculate the leading whitespace of a line.""" + return len(line) - len(line.lstrip(" \t")) + + if not resolved_ops or not file_path: + return resolved_ops + + # Determine language from file path + lang = filename_to_lang(file_path) + if not lang: + return resolved_ops + + # Set up tree-sitter parser + try: + language = get_language(lang) # noqa + parser = get_parser(lang) + except Exception: + # Can't determine language, skip safeguard + return resolved_ops + + source_lines = original_content.splitlines() + MAX_STEPS = 3 # Maximum expansion steps + + for resolved in resolved_ops: + op = resolved["op"] + if op["operation"] not in {"replace", "insert", "delete"}: + continue + + llm_start = resolved["start_idx"] + llm_end = resolved["end_idx"] + + # Clamp to valid bounds + if llm_start < 0 or llm_start >= len(source_lines): + continue + if llm_end < llm_start: + continue + + # Get replacement text + replacement_text = op.get("text", "") or "" + if op["operation"] == "delete": + replacement_text = "" + repl_lines = replacement_text.splitlines() + + # Clamp end to valid range + llm_end = min(llm_end, len(source_lines) - 1) + + # --- THE HEALING LOOP --- + # Try original bounds first (distance 0), then progressively expand + # outward in rounds. At each round d>=1, test exactly 4 candidates: + # 1. Both indices down by d lines (range shifts down) + # 2. Both indices up by d lines (range shifts up) + # 3. Start index down by d lines, end unchanged (partial expansion) + # 4. End index down by d lines, start unchanged (partial expansion) + # + # If multiple candidates are valid at a round, select using: + # 1. Longest resulting source code (preserve more code) + # 2. Partial expansions over full range shifts + # 3. Downward changes over upward changes + + found_valid = False + EFFECTIVE_STEPS = MAX_STEPS + + if llm_start == llm_end: + EFFECTIVE_STEPS = 1 + + for distance in range(EFFECTIVE_STEPS + 1): + if distance == 0: + round_candidates = [(0, 0)] + else: + round_candidates = [ + (-distance, 0), # Start down only (partial) + (+distance, 0), # Start up only (partial) + (0, +distance), # End down only (partial) + (0, -distance), # End up only (partial) + (-distance, +distance), # Both indices down + (+distance, -distance), # Both indices up + (+distance, +distance), # Expand outward (start up, end down) + (-distance, -distance), # Contract inward (start down, end up) + ] + + valid_at_round = [] + for start_shift, end_shift in round_candidates: + candidate_start = max(0, llm_start - start_shift) + candidate_end = min(len(source_lines) - 1, llm_end + end_shift) + + if candidate_end < candidate_start: + continue + + # Skip candidates that would create duplicate adjacent content at edit boundaries + if _would_create_duplicate_content( + source_lines, candidate_start, candidate_end, repl_lines + ): + continue + + test_source = apply_edit( + source_lines, + candidate_start, + candidate_end, + repl_lines, + ) + + if is_syntactically_valid(test_source, parser): + # Determine properties for tiebreaking + is_partial = (start_shift == 0) ^ (end_shift == 0) # XOR: exactly one is zero + is_downward = start_shift <= 0 # Negative/zero shift = moving down + is_both = start_shift == end_shift # Whole range expansion/contraction + closures = count_closures(test_source) + + # --- INDENTATION SCORING --- + indent_score = 0 + if repl_lines: + # Safely grab the first and last non-empty replacement lines + first_repl = next((line for line in repl_lines if line.strip()), "") + last_repl = next( + (line for line in reversed(repl_lines) if line.strip()), "" + ) + + # Check Start Boundary Match + if first_repl and candidate_start < len(source_lines): + source_start_line = source_lines[candidate_start] + if source_start_line.strip() and get_indentation( + first_repl + ) == get_indentation(source_start_line): + indent_score += 1 + + # Check End Boundary Match + if last_repl and candidate_end < len(source_lines): + source_end_line = source_lines[candidate_end] + if source_end_line.strip() and get_indentation( + last_repl + ) == get_indentation(source_end_line): + indent_score += 1 + + valid_at_round.append( + { + "start_idx": candidate_start, + "end_idx": candidate_end, + "source_len": len(test_source), + "is_partial": is_partial, + "is_downward": is_downward, + "is_both": is_both, + "closure_count": closures, + "indent_score": indent_score, + } + ) + + if valid_at_round: + # Sort using the new hierarchy: + # 1. Fewest total closures (minimize structural pollution) + # 2. Indentation Score (Descending: 2 is better than 0) + # 3. Longest source (preserve more file content) + # 4. Partial expansions over full range shifts + # 5. Downward changes over upward changes + valid_at_round.sort( + key=lambda r: ( + -r["indent_score"], # Descending: larger score is better (2 > 1 > 0) + -r["source_len"], # Descending: larger is better + r["closure_count"], # Ascending: smaller is better + not r["is_partial"], # Booleans: False comes before True + not r["is_downward"], # Booleans: False comes before True + r["is_both"], # Booleans: False comes before True + ) + ) + + best = valid_at_round[0] + resolved["start_idx"] = best["start_idx"] + resolved["end_idx"] = best["end_idx"] + found_valid = True + break + + if not found_valid: + pass + + return resolved_ops + + def apply_hashline_operations( original_content: str, operations: list, + file_path: str = None, ) -> tuple[str, list, list]: """ Apply multiple hashline operations sequentially. @@ -1326,6 +1621,13 @@ def apply_hashline_operations( if not normalized_operations: return original_content, [], failed_ops + # Convert insert operations without @000 marker to inclusive replace operations + for op in normalized_operations: + if op["operation"] == "insert": + start_hash_fragment, _, _ = parse_hashline(op["start_line_hash"]) + if start_hash_fragment != "@000" and start_hash_fragment != "000@": + op["operation"] = "replace" + op["end_line_hash"] = op["start_line_hash"] # Apply hashline to original content once # This converts content to hashed lines for line tracking @@ -1352,12 +1654,6 @@ def apply_hashline_operations( hashed_lines, start_hash_fragment, start_line_num_str ) - # if found_start is None: - # # Fall back to fragment matching if exact match fails - # found_start = find_hashline_by_fragment( - # hashed_lines, start_hash_fragment, start_line_num - # ) - if found_start is None: raise ContentHashError( f"Start line hash fragment '{start_hash_fragment}' not found in file" @@ -1425,8 +1721,14 @@ def apply_hashline_operations( resolved_ops = _merged_contained_ranges(resolved_ops) # Merge contiguous replace operations resolved_ops = _merge_replace_operations(resolved_ops) - # Apply content-aware range expansion/shifting for replace operations - # resolved_ops = _apply_range_shifting(hashed_lines, resolved_ops) + if file_path: + # Apply tree-sitter based closure safeguard to snap boundaries to AST nodes + resolved_ops = _apply_closure_safeguard(original_content, resolved_ops, file_path) + + # Fix edit boundaries where replacement content duplicates adjacent lines + source_lines = original_content.splitlines() + resolved_ops = _fix_duplicate_content_boundaries(source_lines, resolved_ops) + # Sort by start_idx descending to apply from bottom to top # When operations have same start_idx, apply in order: insert, replace, delete # This ensures correct behavior when multiple operations target the same line @@ -1488,8 +1790,22 @@ def apply_hashline_operations( # end_idx, replacement_lines = _apply_end_stitching( # hashed_lines, start_idx, end_idx, replacement_lines, max_overlap_check # ) + # Preserve empty lines after the edit range + # If there is an empty line immediately after the edit boundary, + # track it so it can be restored if consumed by the replacement + next_idx = end_idx + 1 + preserve_trailing_empty = ( + next_idx < len(hashed_lines) and hashed_lines[next_idx].strip() == "" + ) hashed_lines[start_idx : end_idx + 1] = replacement_lines + + # Ensure the trailing empty line is preserved after replacement + if preserve_trailing_empty: + check_idx = start_idx + len(replacement_lines) + if check_idx >= len(hashed_lines) or hashed_lines[check_idx].strip() != "": + hashed_lines.insert(check_idx, "\n") + else: # Empty text - replace with nothing (delete) hashed_lines[start_idx : end_idx + 1] = [] diff --git a/cecli/tools/edit_text.py b/cecli/tools/edit_text.py index 23b585807f3..3b1ae9e58f8 100644 --- a/cecli/tools/edit_text.py +++ b/cecli/tools/edit_text.py @@ -43,6 +43,8 @@ class Tool(BaseTool): "Use content ID ranges with the start_line and end_line parameters with format " "`content_id::` (the content id with the :: demarcator). For empty files, use `@000` as the " "content ID references. " + "Start and end values are inclusive: start and end content IDs both count as " + "part of the range to replace, insert at, or delete. " "Edits within a file must not be adjacent or overlapping." ), "parameters": { @@ -63,7 +65,7 @@ class Tool(BaseTool): "description": ( "The type of operation: 'replace' (replace range with" " text), 'delete' (remove range), or 'insert' (insert text" - " after start_line). Defaults to 'replace'." + " at start_line, replacing the start line)." ), }, "start_line": { @@ -250,6 +252,7 @@ def execute( new_content, successful_ops, failed_ops = apply_hashline_operations( original_content=original_content, operations=operations, + file_path=file_path_key, ) if new_content != original_content: diff --git a/cecli/tools/read_range.py b/cecli/tools/read_range.py index f9fafa4a3cb..809e0b6de97 100644 --- a/cecli/tools/read_range.py +++ b/cecli/tools/read_range.py @@ -460,7 +460,7 @@ def _is_valid_int(s): ) if cls._special_marker_count[abs_path] > 1: coder.abs_fnames.add(abs_path) - preview = f"Full contents of {rel_path} added to cotext." + preview = f"Full contents of {rel_path} will be added to context in future message." if abs_path in coder.abs_read_only_fnames: coder.abs_read_only_fnames.remove(abs_path) diff --git a/cecli/tui/app.py b/cecli/tui/app.py index 61acf5e3633..1a2d3822792 100644 --- a/cecli/tui/app.py +++ b/cecli/tui/app.py @@ -1167,8 +1167,16 @@ def _switch_to_container(self, uuid: str, suppress_input_enable: bool = False) - def create_sub_agent_container(self, uuid: str, name: str) -> None: """Create an OutputContainer for a sub-agent.""" + from cecli.helpers.agents.service import AgentService + if uuid in self._sub_agent_containers: + agent_service = AgentService.get_instance(self.worker.coder) + sub_agent_info = agent_service.sub_agents.get(uuid) + if sub_agent_info: + sub_agent_info.coder.show_announcements() + return + container = OutputContainer(id=f"output-{uuid}", classes="subagent-output") container.display = False # Hidden initially self._sub_agent_containers[uuid] = container @@ -1184,8 +1192,6 @@ def create_sub_agent_container(self, uuid: str, name: str) -> None: # Show announcements from the sub-agent's coder try: - from cecli.helpers.agents.service import AgentService - agent_service = AgentService.get_instance(self.worker.coder) sub_agent_info = agent_service.sub_agents.get(uuid) if sub_agent_info: diff --git a/cecli/tui/io.py b/cecli/tui/io.py index bdc47056a51..b17da16a2f4 100644 --- a/cecli/tui/io.py +++ b/cecli/tui/io.py @@ -134,7 +134,7 @@ def stream_print(self, *messages, **kwargs): """ # Pop coder_uuid from kwargs before passing to console coder_uuid = kwargs.pop("coder_uuid", None) - coder_uuid = kwargs.pop("type", None) + kwargs.pop("type", None) # Capture Rich rendering with forced ANSI output console = self._get_tui_console() diff --git a/cecli/tui/worker.py b/cecli/tui/worker.py index 5a19e1567a4..9da5439dcbf 100644 --- a/cecli/tui/worker.py +++ b/cecli/tui/worker.py @@ -10,6 +10,7 @@ from cecli.commands import SwitchCoderSignal from cecli.helpers.conversation import ConversationService, MessageTag +logger = logging.getLogger(__name__) # Suppress asyncio task destroyed warnings during shutdown logging.getLogger("asyncio").setLevel(logging.CRITICAL) @@ -46,6 +47,7 @@ def _run_thread(self): """Thread entry point - creates event loop and runs coder.""" self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) + self.loop.set_exception_handler(self.worker_loop_exception_handler) try: self.loop.run_until_complete(self._async_run()) @@ -96,46 +98,95 @@ async def _async_run(self): except KeyboardInterrupt: continue except SwitchCoderSignal as switch: - # Handle chat mode switches (e.g., /chat-mode architect) - try: - await self.coder.auto_save_session(force=True) - kwargs = dict(io=self.coder.io, from_coder=self.coder) - kwargs.update(switch.kwargs) - if "show_announcements" in kwargs: - del kwargs["show_announcements"] - kwargs["num_cache_warming_pings"] = 0 - kwargs["args"] = self.coder.args - # Skip summarization to avoid blocking LLM calls during mode switch - kwargs["summarize_from_coder"] = False - - new_coder = await Coder.create(**kwargs) - new_coder.args = self.coder.args - - for tag in [MessageTag.SYSTEM, MessageTag.EXAMPLES, MessageTag.STATIC]: - ConversationService.get_manager(new_coder).clear_tag(tag) - - if switch.kwargs.get("show_announcements") is False: - new_coder.suppress_announcements_for_next_prompt = True - - # Notify TUI of mode change - self.coder = new_coder - edit_format = getattr(self.coder, "edit_format", "code") or "code" - self.output_queue.put( - { - "type": "mode_change", - "mode": edit_format, - } - ) - except Exception as e: - self.output_queue.put( - {"type": "error", "message": f"Failed to switch mode: {e}"} - ) - break + await self._handle_switch_coder_signal(switch) # Continue the loop with the new coder except Exception as e: - self.output_queue.put({"type": "error", "message": str(e)}) + self.output_queue.put( + { + "type": "error", + "message": str(e), + "coder_uuid": self.coder.uuid, + } + ) break + async def _handle_switch_coder_signal(self, switch): + """Handle a SwitchCoderSignal, creating a new coder and notifying the TUI.""" + try: + from cecli.helpers.agents.service import AgentService + + # Determine the active coder — could be a sub-agent in the foreground + target_coder = self.coder + try: + agent_service = AgentService.get_instance(target_coder) + foreground = agent_service.foreground_coder + if foreground is not None: + target_coder = foreground + except Exception: + pass + + await target_coder.auto_save_session(force=True) + kwargs = dict(io=target_coder.io, from_coder=target_coder) + kwargs.update(switch.kwargs) + if "show_announcements" in kwargs: + del kwargs["show_announcements"] + kwargs["num_cache_warming_pings"] = 0 + kwargs["args"] = target_coder.args + # Skip summarization to avoid blocking LLM calls during mode switch + kwargs["summarize_from_coder"] = False + + new_coder = await Coder.create(**kwargs) + new_coder.args = target_coder.args + + for tag in [MessageTag.SYSTEM, MessageTag.EXAMPLES, MessageTag.STATIC]: + ConversationService.get_manager(new_coder).clear_tag(tag) + + if switch.kwargs.get("show_announcements") is False: + new_coder.suppress_announcements_for_next_prompt = True + + # Notify TUI of mode change + if target_coder == self.coder: + self.coder = new_coder + else: + new_coder.show_announcements() + + edit_format = getattr(target_coder, "edit_format", "code") or "code" + self.output_queue.put( + { + "type": "mode_change", + "mode": edit_format, + "coder_uuid": new_coder.uuid, + } + ) + except Exception as e: + self.output_queue.put( + { + "type": "error", + "message": f"Failed to switch mode: {e}", + "coder_uuid": target_coder, + } + ) + + def worker_loop_exception_handler(self, loop, context): + """ + This runs directly on the worker thread whenever an unhandled + exception occurs in a task or callback. + + Catches SwitchCoderSignal from fire-and-forget tasks and dispatches + them to the dedicated handler so mode switches work even when the + signal is raised outside the main coder.run() loop. + """ + exception = context.get("exception") + + if isinstance(exception, SwitchCoderSignal): + logger.info("Worker thread caught SwitchCoderSignal in global handler.") + # Schedule a coroutine to handle the switch logic on the loop + loop.create_task(self._handle_switch_coder_signal(exception)) + else: + # Always fall back to the default handler so you don't swallow + # normal bugs, tracebacks, or connection errors. + loop.default_exception_handler(context) + def interrupt(self): """Cancel the current output task on the active (foreground) coder. diff --git a/tests/basic/test_hashline_closure.py b/tests/basic/test_hashline_closure.py new file mode 100644 index 00000000000..174a24fb248 --- /dev/null +++ b/tests/basic/test_hashline_closure.py @@ -0,0 +1,423 @@ +"""Tests for closure safeguard and content boundary functions in hashline.""" + +from cecli.helpers.hashline import ( + _apply_closure_safeguard, + _fix_duplicate_content_boundaries, + _would_create_duplicate_content, +) + +# ============================================================================= +# Tests for _would_create_duplicate_content +# ============================================================================= + + +def test_would_create_duplicate_no_duplicate(): + """No duplicate content at boundaries — should return False.""" + source = ["line1", "line2", "line3", "line4", "line5"] + result = _would_create_duplicate_content(source, 1, 2, ["new_line", "other"]) + assert result is False + + +def test_would_create_duplicate_start_boundary(): + """First replacement line matches line before edit range — should return True.""" + source = ["keep_me", "old1", "old2", "after"] + # Replacement starts with "keep_me" which is the line at index 0 (start-1) + result = _would_create_duplicate_content(source, 1, 2, ["keep_me", "new_line"]) + assert result is True + + +def test_would_create_duplicate_end_boundary(): + """Last replacement line matches line after edit range — should return True.""" + source = ["before", "old1", "old2", "keep_me"] + # Replacement ends with "keep_me" which is the line at index 3 (end+1) + result = _would_create_duplicate_content(source, 1, 2, ["new1", "keep_me"]) + assert result is True + + +def test_would_create_duplicate_both_boundaries(): + """Both start and end boundaries have duplicate content.""" + source = ["start_dup", "old1", "old2", "end_dup"] + result = _would_create_duplicate_content(source, 1, 2, ["start_dup", "middle", "end_dup"]) + assert result is True + + +def test_would_create_duplicate_empty_repl_lines(): + """Empty replacement lines — should return False.""" + source = ["line1", "line2", "line3"] + result = _would_create_duplicate_content(source, 0, 1, []) + assert result is False + + +def test_would_create_duplicate_start_at_zero(): + """candidate_start is 0 (no line before) — should not check start.""" + source = ["first", "second", "third"] + # Even though repl[0] matches source[0], start is 0 so no check + result = _would_create_duplicate_content(source, 0, 1, ["first", "new"]) + assert result is False + + +def test_would_create_duplicate_end_at_end_of_file(): + """candidate_end is last line (no line after) — should not check end.""" + source = ["first", "second", "last"] + result = _would_create_duplicate_content(source, 0, 2, ["new", "last"]) + assert result is False + + +def test_would_create_duplicate_whitespace_insensitive(): + """Comparison uses rstrip(), so trailing whitespace differences are ignored.""" + source = ["spaced ", "old1", "old2", "end"] + # repl[0] has no trailing spaces but rstrip() matches + result = _would_create_duplicate_content(source, 1, 2, ["spaced", "new"]) # noqa + + +def test_would_create_duplicate_single_line_replace(): + """Single-line replace that duplicates both adjacent lines.""" + source = ["dup", "old", "dup"] + result = _would_create_duplicate_content(source, 1, 1, ["dup", "dup"]) + # repl[0] matches source[0], repl[-1] matches source[2] + assert result is True + + +def test_would_create_duplicate_delete_operation(): + """Delete operation with empty repl_lines — should return False.""" + source = ["a", "b", "c", "d"] + result = _would_create_duplicate_content(source, 1, 2, []) + assert result is False + + +# ============================================================================= +# Tests for _fix_duplicate_content_boundaries +# ============================================================================= + + +def _make_op(operation, text, start_idx, end_idx, index=0): + """Helper to create a resolved operation dict.""" + return { + "index": index, + "start_idx": start_idx, + "end_idx": end_idx, + "op": { + "operation": operation, + "text": text, + }, + } + + +def test_fix_boundaries_no_duplicate(): + """No duplicate content at boundaries — indices should remain unchanged.""" + source = ["a", "b", "c", "d", "e"] + ops = [_make_op("replace", "x\ny", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 2 + + +def test_fix_boundaries_start_expansion(): + """First replacement line duplicates line before — start_idx should decrement.""" + source = ["dup", "old1", "old2", "after"] + ops = [_make_op("replace", "dup\nnew", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 0 # Expanded backward to consume "dup" + assert result[0]["end_idx"] == 2 + + +def test_fix_boundaries_end_expansion(): + """Last replacement line duplicates line after — end_idx should increment.""" + source = ["before", "old1", "old2", "dup"] + ops = [_make_op("replace", "new\ndup", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 3 # Expanded forward to consume "dup" + + +def test_fix_boundaries_both_sides(): + """Both start and end have duplicates — both indices should adjust.""" + source = ["start_dup", "old1", "old2", "end_dup"] + ops = [_make_op("replace", "start_dup\nnew\nend_dup", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 0 # Expanded backward + assert result[0]["end_idx"] == 3 # Expanded forward + + +def test_fix_boundaries_start_at_zero(): + """start_idx is 0 — cannot expand backward, should stay 0.""" + source = ["dup", "old", "after"] + ops = [_make_op("replace", "dup\nnew", 0, 1)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 0 # Already at 0 + assert result[0]["end_idx"] == 1 + + +def test_fix_boundaries_end_at_last_line(): + """end_idx is last line — cannot expand forward, should stay.""" + source = ["before", "old", "dup"] + ops = [_make_op("replace", "new\ndup", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 2 # Already at last line + + +def test_fix_boundaries_skip_insert(): + """Insert operations should be skipped entirely.""" + source = ["a", "b", "c"] + ops = [_make_op("insert", "new_line", 1, 1)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 1 + + +def test_fix_boundaries_empty_replacement(): + """Empty replacement text — should skip (no repl_lines).""" + source = ["a", "b", "c"] + ops = [_make_op("delete", "", 1, 1)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 1 + + +def test_fix_boundaries_chained_duplicates_start(): + """Multiple consecutive duplicate lines at start — while loop should consume all.""" + source = ["dup", "dup", "dup", "old", "after"] + # Replacement starts with "dup" repeated — should consume all 3 "dup" lines + ops = [_make_op("replace", "dup\nnew", 3, 3)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 0 # All 3 "dup" lines consumed + assert result[0]["end_idx"] == 3 + + +def test_fix_boundaries_chained_duplicates_end(): + """Multiple consecutive duplicate lines at end — while loop should consume all.""" + source = ["before", "old", "dup", "dup", "dup"] + ops = [_make_op("replace", "new\ndup", 1, 1)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 4 # All 3 "dup" lines consumed + + +def test_fix_boundaries_multiple_ops(): + """Multiple operations — each should be processed independently.""" + source = ["dup1", "old1", "old2", "dup2", "old3", "old4"] + ops = [ + _make_op("replace", "dup1\nnew", 1, 2, index=0), # Should expand start to 0 + _make_op("replace", "new\ndup2", 3, 3, index=1), # Should expand end to 3 + ] + result = _fix_duplicate_content_boundaries(source, ops) + # First op: start expanded backward from 1 to 0 + assert result[0]["start_idx"] == 0 + assert result[0]["end_idx"] == 2 + # Second op: end expanded forward from 3 to 3 (dup2 at index 4... wait, no) + # Actually source[3]="dup2", repl[-1]="dup2" → end_idx goes from 3 to... + # dup2 is at index 3, source[4]="old3" is not "dup2", so end stays at 3 + assert result[1]["start_idx"] == 3 + assert result[1]["end_idx"] == 3 + + +def test_fix_boundaries_whitespace_insensitive(): + """Boundary fix uses rstrip() — should match despite trailing whitespace differences.""" + source = ["dup ", "old1", "old2", "after"] + ops = [_make_op("replace", "dup\nnew", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 0 + + +def test_fix_boundaries_partial_match_does_not_trigger(): + """Only exact (rstripped) match triggers expansion, not partial/substring.""" + source = ["different", "old1", "old2", "after"] + ops = [_make_op("replace", "diff\nnew", 1, 2)] + result = _fix_duplicate_content_boundaries(source, ops) + assert result[0]["start_idx"] == 1 # No change + assert result[0]["end_idx"] == 2 + + +# ============================================================================= +# Tests for _apply_closure_safeguard (tree-sitter boundary healing) +# ============================================================================= + + +def _make_closure_op(operation, text, start_idx, end_idx, index=0): + """Helper to create a resolved operation for _apply_closure_safeguard.""" + op = { + "operation": operation, + "text": text, + } + return { + "index": index, + "start_idx": start_idx, + "end_idx": end_idx, + "op": op, + } + + +def test_closure_safeguard_no_file_path(): + """Without file_path, ops should be returned unchanged.""" + ops = [_make_closure_op("replace", "pass", 0, 0)] + result = _apply_closure_safeguard("x = 1", ops, file_path=None) + assert result == ops + + +def test_closure_safeguard_unsupported_language(): + """File path with unsupported language — ops returned unchanged.""" + ops = [_make_closure_op("replace", "xyz", 0, 0)] + result = _apply_closure_safeguard("some content", ops, file_path="test.xyz") + assert result == ops + + +def test_closure_safeguard_empty_ops(): + """Empty ops list — returned as-is.""" + result = _apply_closure_safeguard("x = 1", [], file_path="test.py") + assert result == [] + + +def test_closure_safeguard_valid_code_no_change(): + """Edit that produces valid Python should keep original bounds.""" + source = """x = 1 +y = 2 +z = 3 +""" + # Replace all 3 lines with 2 valid lines + ops = [_make_closure_op("replace", "a = 10\nb = 20", 0, 2)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + assert result[0]["start_idx"] == 0 + assert result[0]["end_idx"] == 2 + + +def test_closure_safeguard_heals_outer_scope(): + """ + When an edit eats outer scope (e.g., removes opening brace but leaves + closing brace), the safeguard should expand boundaries to consume the stray brace. + """ + source = """def foo(): + return { + "key": "value" + } +""" + # Lines: 0="def foo():", 1=" return {", 2=' "key": "value"', + # 3=" }" + # Replacing lines 1-2 with ' return "value"' leaves a stray '}' on line 3, + # which tree-sitter finds as an ERROR. + # The safeguard should expand end_idx to include line 3. + ops = [_make_closure_op("replace", ' return "value"', 1, 2)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 3 # Expanded to consume stray closing brace + + +def test_closure_safeguard_simple_python_replace(): + """Simple replace that maintains valid Python syntax.""" + source = """x = 1 +y = 2 +z = 3 +""" + # Replace line 1 ("y = 2") with "y = 99" + ops = [_make_closure_op("replace", "y = 99", 1, 1)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 1 + + +def test_closure_safeguard_delete_valid(): + """Delete operation that maintains valid syntax.""" + source = """x = 1 +y = 2 +z = 3 +""" + # Delete line 1 ("y = 2") — result is "x = 1\nz = 3" which is valid + ops = [_make_closure_op("delete", None, 1, 1)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 1 + + +def test_closure_safeguard_heals_stray_except(): + """ + When an edit eats the 'try' header but leaves the 'except' block, + the safeguard should expand boundaries to consume the stray 'except'. + """ + source = """try: + pass +except: + pass +""" + # Lines: 0="try:", 1=" pass", 2="except:", 3=" pass" + # Replacing lines 0-1 with 'x = 1' leaves a stray 'except:' on line 2. + # The safeguard should expand the boundary to avoid the parse error. + ops = [_make_closure_op("replace", "x = 1", 0, 1)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + healed_start = result[0]["start_idx"] + healed_end = result[0]["end_idx"] + # Boundaries should have changed from original (0, 1) + assert (healed_start, healed_end) != (0, 1), "Expected safeguard to heal stray except error" + # Verify the healed result parses correctly via tree-sitter (what the safeguard uses) + from cecli.helpers.grep_ast.tsl import get_parser + + parser = get_parser("python") + lines = source.splitlines() + new_lines = lines[:healed_start] + ["x = 1"] + lines[healed_end + 1 :] + new_source = "\n".join(new_lines) + tree = parser.parse(new_source.encode("utf-8")) + assert ( + not tree.root_node.has_error + ), f"Healed source still has tree-sitter errors: {new_source!r}" + + +def test_closure_safeguard_skip_insert(): + """Insert operations should be skipped by the safeguard.""" + source = "x = 1\ny = 2\n" + ops = [_make_closure_op("insert", "z = 3", 1, 1)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + assert result == ops # Insert ops pass through unchanged + + +def test_closure_safeguard_with_outer_function_brace(): + """ + Test that the safeguard can expand an edit that would 'eat' an outer + scope's closing brace. + """ + source = """if True: + if False: + x = 1 +y = 2 +""" + lines = source.splitlines() # noqa + # Line 0: "if True:" + # Line 1: " if False:" + # Line 2: " x = 1" + # Line 3: "y = 2" + + # Replace lines 1-2 with something that would leave invalid indentation + # Let me test something that should work: replacing just the inner if + ops = [_make_closure_op("replace", " x = 99", 1, 2)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + # The replacement " x = 99" should be valid, keep original bounds + assert result[0]["start_idx"] == 1 + assert result[0]["end_idx"] == 2 + + +def test_closure_safeguard_heals_broken_dict(): + """ + A replace that removes the opening brace but keeps the closing brace + should have its boundaries expanded by the safeguard to include the + stray closing brace. + """ + source = """def foo(): + return { + "key": "value" + } +""" + # Replace lines 1-2 with ' return "value"' — this eats the opening + # brace but leaves the closing '}' on line 3, producing a parse error. + ops = [_make_closure_op("replace", ' return "value"', 1, 2)] + result = _apply_closure_safeguard(source, ops, file_path="test.py") + healed_start = result[0]["start_idx"] + healed_end = result[0]["end_idx"] + # Verify the healed result parses correctly via tree-sitter + from cecli.helpers.grep_ast.tsl import get_parser + + parser = get_parser("python") + lines = source.splitlines() + new_lines = lines[:healed_start] + [' return "value"'] + lines[healed_end + 1 :] + new_source = "\n".join(new_lines) + tree = parser.parse(new_source.encode("utf-8")) + assert ( + not tree.root_node.has_error + ), f"Healed source still has tree-sitter errors: {new_source!r}" diff --git a/tests/tools/test_insert_block.py b/tests/tools/test_insert_block.py index 9171a6cfee4..55598295b1b 100644 --- a/tests/tools/test_insert_block.py +++ b/tests/tools/test_insert_block.py @@ -99,8 +99,9 @@ def test_position_top_succeeds_with_no_patterns(coder_with_file): assert result.startswith("Successfully executed EditText.") lines = file_path.read_text().splitlines() - assert lines[0] == "first line" # Original first line remains first - assert lines[1] == "inserted line" # Inserted line comes after line 1 + # Inserted line replaces first line (inclusive bounds) assert lines[1] == "second line" + # Original second line shifts up + assert lines[0] == "inserted line" coder.io.tool_error.assert_not_called() @@ -222,7 +223,7 @@ def test_line_number_beyond_file_length_appends(coder_with_file): assert result.startswith("Successfully executed EditText.") content = file_path.read_text() - assert content == "first line\nsecond line\nappended line\n" + assert content == "first line\nappended line\n" coder.io.tool_error.assert_not_called() @@ -255,5 +256,5 @@ def test_line_number_beyond_file_length_appends_no_trailing_newline(coder_with_f content = file_path.read_text() # Current implementation joins with \n, but respects original trailing newline # Original doesn't have trailing newline, so result won't have one either - assert content == "first line\nsecond line\nappended line" + assert content == "first line\nappended line" coder.io.tool_error.assert_not_called()