diff --git a/cli/fix/move_unnecessary_dependencies.py b/cli/fix/move_unnecessary_dependencies.py index a8006803..d81454ea 100644 --- a/cli/fix/move_unnecessary_dependencies.py +++ b/cli/fix/move_unnecessary_dependencies.py @@ -3,7 +3,7 @@ """ Move unnecessary meta dependencies to guarded include_role/import_role -for better performance. +for better performance, while preserving YAML comments, quotes, and layout. Heuristic (matches tests/integration/test_unnecessary_role_dependencies.py): - A dependency is considered UNNECESSARY if: @@ -17,11 +17,11 @@ Action: - Remove such dependencies from roles//meta/main.yml. - Prepend a guarded include block to roles//tasks/01_core.yml (preferred) or roles//tasks/main.yml if 01_core.yml is absent. -- If multiple dependencies moved for a role, use a loop over include_role. +- If multiple dependencies are moved for a role, use a loop over include_role. Notes: - Creates .bak backups for modified YAML files. -- Uses PyYAML; formatting will be normalized. +- Requires ruamel.yaml to preserve comments/quotes everywhere. """ import argparse @@ -32,11 +32,29 @@ import shutil import sys from typing import Dict, Set, List, Tuple, Optional -import yaml +# --- Require ruamel.yaml for full round-trip preservation --- +try: + from ruamel.yaml import YAML + from ruamel.yaml.comments import CommentedMap, CommentedSeq + from ruamel.yaml.scalarstring import SingleQuotedScalarString + _HAVE_RUAMEL = True +except Exception: + _HAVE_RUAMEL = False +if not _HAVE_RUAMEL: + print("[ERR] ruamel.yaml is required to preserve comments/quotes. Install with: pip install ruamel.yaml", file=sys.stderr) + sys.exit(3) + +yaml_rt = YAML() +yaml_rt.preserve_quotes = True +yaml_rt.width = 10**9 # prevent line wrapping # ---------------- Utilities ---------------- +def _backup(path: str): + if os.path.exists(path): + shutil.copy2(path, path + ".bak") + def read_text(path: str) -> str: try: with open(path, "r", encoding="utf-8") as f: @@ -44,56 +62,45 @@ def read_text(path: str) -> str: except Exception: return "" - -def write_text(path: str, content: str): - with open(path, "w", encoding="utf-8") as f: - f.write(content) - - -def safe_load_yaml(path: str): +def load_yaml_rt(path: str): try: with open(path, "r", encoding="utf-8") as f: - data = yaml.safe_load(f) - return data if data is not None else {} + data = yaml_rt.load(f) + return data if data is not None else CommentedMap() except FileNotFoundError: - return {} + return CommentedMap() except Exception as e: print(f"[WARN] Failed to parse YAML: {path}: {e}", file=sys.stderr) - return {} + return CommentedMap() - -def safe_dump_yaml(data, path: str): - # backup first - if os.path.exists(path): - shutil.copy2(path, path + ".bak") +def dump_yaml_rt(data, path: str): + _backup(path) with open(path, "w", encoding="utf-8") as f: - yaml.safe_dump(data, f, sort_keys=False, allow_unicode=True) - + yaml_rt.dump(data, f) def roles_root(project_root: str) -> str: return os.path.join(project_root, "roles") - def iter_role_dirs(project_root: str) -> List[str]: root = roles_root(project_root) return [d for d in glob.glob(os.path.join(root, "*")) if os.path.isdir(d)] - def role_name_from_dir(role_dir: str) -> str: return os.path.basename(role_dir.rstrip(os.sep)) - def path_if_exists(*parts) -> Optional[str]: p = os.path.join(*parts) return p if os.path.exists(p) else None - def gather_yaml_files(base: str, patterns: List[str]) -> List[str]: files: List[str] = [] for pat in patterns: files.extend(glob.glob(os.path.join(base, pat), recursive=True)) return [f for f in files if os.path.isfile(f)] +def sq(v: str): + """Return a single-quoted scalar (ruamel) for consistent quoting.""" + return SingleQuotedScalarString(v) # ---------------- Providers: vars & handlers ---------------- @@ -109,7 +116,6 @@ def flatten_keys(data) -> Set[str]: out |= flatten_keys(item) return out - def collect_role_defined_vars(role_dir: str) -> Set[str]: """Vars a role 'provides': defaults/vars keys + set_fact keys in tasks.""" provided: Set[str] = set() @@ -117,13 +123,13 @@ def collect_role_defined_vars(role_dir: str) -> Set[str]: for rel in ("defaults/main.yml", "vars/main.yml"): p = path_if_exists(role_dir, rel) if p: - data = safe_load_yaml(p) + data = load_yaml_rt(p) provided |= flatten_keys(data) # set_fact keys task_files = gather_yaml_files(os.path.join(role_dir, "tasks"), ["**/*.yml", "*.yml"]) for tf in task_files: - data = safe_load_yaml(tf) + data = load_yaml_rt(tf) if isinstance(data, list): for task in data: if isinstance(task, dict) and "set_fact" in task and isinstance(task["set_fact"], dict): @@ -132,13 +138,12 @@ def collect_role_defined_vars(role_dir: str) -> Set[str]: noisy = {"when", "name", "vars", "tags", "register"} return {v for v in provided if isinstance(v, str) and v and v not in noisy} - def collect_role_handler_names(role_dir: str) -> Set[str]: """Handler names defined by a role (for notify detection).""" handler_file = path_if_exists(role_dir, "handlers/main.yml") if not handler_file: return set() - data = safe_load_yaml(handler_file) + data = load_yaml_rt(handler_file) names: Set[str] = set() if isinstance(data, list): for task in data: @@ -148,7 +153,6 @@ def collect_role_handler_names(role_dir: str) -> Set[str]: names.add(nm.strip()) return names - # ---------------- Consumers: usage scanning ---------------- def find_var_positions(text: str, varname: str) -> List[int]: @@ -161,7 +165,6 @@ def find_var_positions(text: str, varname: str) -> List[int]: positions.append(m.start()) return positions - def first_var_use_offset_in_text(text: str, provided_vars: Set[str]) -> Optional[int]: first: Optional[int] = None for v in provided_vars: @@ -170,7 +173,6 @@ def first_var_use_offset_in_text(text: str, provided_vars: Set[str]) -> Optional first = off return first - def first_include_offset_for_role(text: str, producer_role: str) -> Optional[int]: """ Find earliest include/import of a given role in this YAML text. @@ -187,7 +189,6 @@ def first_include_offset_for_role(text: str, producer_role: str) -> Optional[int m = pattern.search(text) return m.start() if m else None - def find_notify_offsets_for_handlers(text: str, handler_names: Set[str]) -> List[int]: """ Heuristic: for each handler name, find occurrences where 'notify' appears within @@ -205,12 +206,11 @@ def find_notify_offsets_for_handlers(text: str, handler_names: Set[str]) -> List offsets.append(start) return sorted(offsets) - def parse_meta_dependencies(role_dir: str) -> List[str]: meta = path_if_exists(role_dir, "meta/main.yml") if not meta: return [] - data = safe_load_yaml(meta) + data = load_yaml_rt(meta) dd = data.get("dependencies") deps: List[str] = [] if isinstance(dd, list): @@ -223,7 +223,6 @@ def parse_meta_dependencies(role_dir: str) -> List[str]: deps.append(str(item["name"])) return deps - # ---------------- Fix application ---------------- def sanitize_run_once_var(role_name: str) -> str: @@ -236,39 +235,59 @@ def sanitize_run_once_var(role_name: str) -> str: def build_include_block_yaml(consumer_role: str, moved_deps: List[str]) -> List[dict]: """ Build a guarded block that includes one or many roles. - We prepend this to tasks/01_core.yml or tasks/main.yml. + This block will be prepended to tasks/01_core.yml or tasks/main.yml. """ guard_var = sanitize_run_once_var(consumer_role) if len(moved_deps) == 1: - include_task = { - "name": f"Include moved dependency '{moved_deps[0]}'", - "include_role": {"name": moved_deps[0]}, - } + inner_tasks = [ + { + "name": f"Include moved dependency '{moved_deps[0]}'", + "include_role": {"name": moved_deps[0]}, + } + ] else: - include_task = { - "name": "Include moved dependencies", - "include_role": {"name": "{{ item }}"}, - "loop": moved_deps, - } + inner_tasks = [ + { + "name": "Include moved dependencies", + "include_role": {"name": "{{ item }}"}, + "loop": moved_deps, + } + ] - block = { - "block": [ - include_task, - {"set_fact": {guard_var: True}}, - ], - "when": f"{guard_var} is not defined", + # Always set the run_once fact at the end + inner_tasks.append({"set_fact": {guard_var: True}}) + + # Correct Ansible block structure + block_task = { "name": "Load former meta dependencies once", + "block": inner_tasks, + "when": f"{guard_var} is not defined", } - return [block] + return [block_task] -def prepend_tasks(tasks_path: str, new_tasks: List[dict], dry_run: bool): +def prepend_tasks(tasks_path: str, new_tasks, dry_run: bool): + """ + Prepend new_tasks (CommentedSeq) to an existing tasks YAML list while preserving comments. + If the file does not exist, create it with new_tasks. + """ if os.path.exists(tasks_path): - existing = safe_load_yaml(tasks_path) - if not isinstance(existing, list): - existing = [] if existing in (None, {}) else [existing] - combined = new_tasks + existing + existing = load_yaml_rt(tasks_path) + if isinstance(existing, list): + combined = CommentedSeq() + for item in new_tasks: + combined.append(item) + for item in existing: + combined.append(item) + elif isinstance(existing, dict): + # Rare case: tasks file with a single mapping; coerce to list + combined = CommentedSeq() + for item in new_tasks: + combined.append(item) + combined.append(existing) + else: + combined = new_tasks else: os.makedirs(os.path.dirname(tasks_path), exist_ok=True) combined = new_tasks @@ -276,51 +295,55 @@ def prepend_tasks(tasks_path: str, new_tasks: List[dict], dry_run: bool): if dry_run: print(f"[DRY-RUN] Would write {tasks_path} with {len(new_tasks)} prepended task(s).") return - safe_dump_yaml(combined, tasks_path) + + dump_yaml_rt(combined, tasks_path) print(f"[OK] Updated {tasks_path} (prepended {len(new_tasks)} task(s)).") - def update_meta_remove_deps(meta_path: str, remove: List[str], dry_run: bool): - data = safe_load_yaml(meta_path) - deps = data.get("dependencies") + """ + Remove entries from meta.dependencies while leaving the rest of the file intact. + Quotes, comments, key order, and line breaks are preserved. + Returns True if a change would be made (or was made when not in dry-run). + """ + if not os.path.exists(meta_path): + return False + + doc = load_yaml_rt(meta_path) + deps = doc.get("dependencies") if not isinstance(deps, list): return False - # Normalize all to strings (role/name forms) def dep_name(item): - if isinstance(item, str): - return item if isinstance(item, dict): return item.get("role") or item.get("name") - return None + return item - kept = [] - removed_any = False + keep = CommentedSeq() + removed = [] for item in deps: name = dep_name(item) if name in remove: - removed_any = True + removed.append(name) else: - kept.append(item) + keep.append(item) - if not removed_any: + if not removed: return False - if kept: - data["dependencies"] = kept + if keep: + doc["dependencies"] = keep else: - # remove key entirely - data.pop("dependencies", None) + if "dependencies" in doc: + del doc["dependencies"] if dry_run: - print(f"[DRY-RUN] Would rewrite {meta_path}; removed: {', '.join(remove)}") + print(f"[DRY-RUN] Would rewrite {meta_path}; removed: {', '.join(removed)}") return True - safe_dump_yaml(data, meta_path) - print(f"[OK] Rewrote {meta_path}; removed: {', '.join(remove)}") + dump_yaml_rt(doc, meta_path) + print(f"[OK] Rewrote {meta_path}; removed: {', '.join(removed)}") return True - def dependency_is_unnecessary(consumer_dir: str, consumer_name: str, producer_name: str, @@ -340,7 +363,6 @@ def dependency_is_unnecessary(consumer_dir: str, # 2) Tasks: any usage before include/import? If yes -> keep meta dep task_files = gather_yaml_files(os.path.join(consumer_dir, "tasks"), ["**/*.yml", "*.yml"]) - any_usage = False for p in task_files: text = read_text(p) if not text: @@ -350,19 +372,16 @@ def dependency_is_unnecessary(consumer_dir: str, notify_offs = find_notify_offsets_for_handlers(text, provider_handlers) if var_use_off is not None: - any_usage = True if include_off is None or include_off > var_use_off: return False # used before include for noff in notify_offs: - any_usage = True if include_off is None or include_off > noff: return False # notify before include # If we get here: no early use, and either no usage at all or usage after include return True - def process_role(role_dir: str, providers_index: Dict[str, Tuple[Set[str], Set[str]]], only_role: Optional[str], @@ -381,8 +400,10 @@ def process_role(role_dir: str, # Build provider vars/handlers accessors moved: List[str] = [] for producer in meta_deps: + # Only consider local roles we can analyze + producer_dir = path_if_exists(os.path.dirname(role_dir), producer) or path_if_exists(os.path.dirname(roles_root(os.path.dirname(role_dir))), "roles", producer) if producer not in providers_index: - # External/unknown role → skip + # Unknown/external role → skip (we cannot verify safety) continue pvars, phandlers = providers_index[producer] if dependency_is_unnecessary(role_dir, consumer_name, producer, pvars, phandlers): @@ -403,7 +424,6 @@ def process_role(role_dir: str, prepend_tasks(target_tasks, include_block, dry_run=dry_run) return True - def build_providers_index(all_roles: List[str]) -> Dict[str, Tuple[Set[str], Set[str]]]: """ Map role_name -> (provided_vars, handler_names) @@ -414,10 +434,9 @@ def build_providers_index(all_roles: List[str]) -> Dict[str, Tuple[Set[str], Set index[rn] = (collect_role_defined_vars(rd), collect_role_handler_names(rd)) return index - def main(): parser = argparse.ArgumentParser( - description="Move unnecessary meta dependencies to guarded include_role for performance." + description="Move unnecessary meta dependencies to guarded include_role for performance (preserve comments/quotes)." ) parser.add_argument( "--project-root", @@ -457,6 +476,5 @@ def main(): else: print("[OK] Finished moving unnecessary dependencies.") - if __name__ == "__main__": main()