"""Text manipulation for flake.nix and Terraform files.""" import re from pathlib import Path from typing import Tuple from models import HostConfig def remove_from_flake_nix(hostname: str, repo_root: Path) -> bool: """ Remove host entry from flake.nix nixosConfigurations. Args: hostname: Hostname to remove repo_root: Path to repository root Returns: True if found and removed, False if not found """ flake_path = repo_root / "flake.nix" content = flake_path.read_text() # Check if hostname exists hostname_pattern = rf"^ {re.escape(hostname)} = nixpkgs\.lib\.nixosSystem" if not re.search(hostname_pattern, content, re.MULTILINE): return False # Match the entire block from "hostname = " to "};" replace_pattern = rf"^ {re.escape(hostname)} = nixpkgs\.lib\.nixosSystem \{{.*?^ \}};\n" new_content, count = re.subn(replace_pattern, "", content, flags=re.MULTILINE | re.DOTALL) if count == 0: return False flake_path.write_text(new_content) return True def remove_from_terraform_vms(hostname: str, repo_root: Path) -> bool: """ Remove VM entry from terraform/vms.tf locals.vms map. Args: hostname: Hostname to remove repo_root: Path to repository root Returns: True if found and removed, False if not found """ terraform_path = repo_root / "terraform" / "vms.tf" content = terraform_path.read_text() # Check if hostname exists hostname_pattern = rf'^\s+"{re.escape(hostname)}" = \{{' if not re.search(hostname_pattern, content, re.MULTILINE): return False # Match the entire block from "hostname" = { to } replace_pattern = rf'^\s+"{re.escape(hostname)}" = \{{.*?^\s+\}}\n' new_content, count = re.subn(replace_pattern, "", content, flags=re.MULTILINE | re.DOTALL) if count == 0: return False terraform_path.write_text(new_content) return True def remove_from_vault_terraform(hostname: str, repo_root: Path) -> bool: """ Remove host policy from terraform/vault/hosts-generated.tf. Args: hostname: Hostname to remove repo_root: Path to repository root Returns: True if found and removed, False if not found """ vault_tf_path = repo_root / "terraform" / "vault" / "hosts-generated.tf" if not vault_tf_path.exists(): return False content = vault_tf_path.read_text() # Check if hostname exists in the policies if f'"{hostname}"' not in content: return False # Match the host entry block within generated_host_policies # Pattern matches: "hostname" = { ... } with possible trailing newlines replace_pattern = rf'\s*"{re.escape(hostname)}" = \{{\s*paths = \[.*?\]\s*\}}\n?' new_content, count = re.subn(replace_pattern, "", content, flags=re.DOTALL) if count == 0: return False vault_tf_path.write_text(new_content) return True def check_entries_exist(hostname: str, repo_root: Path) -> Tuple[bool, bool, bool]: """ Check which entries exist for a hostname. Args: hostname: Hostname to check repo_root: Path to repository root Returns: Tuple of (flake_exists, terraform_vms_exists, vault_exists) """ # Check flake.nix flake_path = repo_root / "flake.nix" flake_content = flake_path.read_text() flake_pattern = rf"^ {re.escape(hostname)} = nixpkgs\.lib\.nixosSystem" flake_exists = bool(re.search(flake_pattern, flake_content, re.MULTILINE)) # Check terraform/vms.tf terraform_path = repo_root / "terraform" / "vms.tf" terraform_content = terraform_path.read_text() terraform_pattern = rf'^\s+"{re.escape(hostname)}" = \{{' terraform_exists = bool(re.search(terraform_pattern, terraform_content, re.MULTILINE)) # Check terraform/vault/hosts-generated.tf vault_tf_path = repo_root / "terraform" / "vault" / "hosts-generated.tf" vault_exists = False if vault_tf_path.exists(): vault_content = vault_tf_path.read_text() vault_exists = f'"{hostname}"' in vault_content return (flake_exists, terraform_exists, vault_exists) def update_flake_nix(config: HostConfig, repo_root: Path, force: bool = False) -> None: """ Add or update host entry in flake.nix nixosConfigurations. Args: config: Host configuration repo_root: Path to repository root force: If True, replace existing entry; if False, insert new entry """ flake_path = repo_root / "flake.nix" content = flake_path.read_text() # Create new entry new_entry = f""" {config.hostname} = nixpkgs.lib.nixosSystem {{ inherit system; specialArgs = {{ inherit inputs self sops-nix; }}; modules = [ ( {{ config, pkgs, ... }}: {{ nixpkgs.overlays = commonOverlays; }} ) ./hosts/{config.hostname} sops-nix.nixosModules.sops ]; }}; """ # Check if hostname already exists hostname_pattern = rf"^ {re.escape(config.hostname)} = nixpkgs\.lib\.nixosSystem" existing_match = re.search(hostname_pattern, content, re.MULTILINE) if existing_match and force: # Replace existing entry # Match the entire block from "hostname = " to "};" replace_pattern = rf"^ {re.escape(config.hostname)} = nixpkgs\.lib\.nixosSystem \{{.*?^ \}};\n" new_content, count = re.subn(replace_pattern, new_entry, content, flags=re.MULTILINE | re.DOTALL) if count == 0: raise ValueError(f"Could not find existing entry for {config.hostname} in flake.nix") else: # Insert new entry before closing brace of nixosConfigurations # Pattern: " };\n packages = forAllSystems" pattern = r"( \};)\n( packages = forAllSystems)" replacement = rf"{new_entry}\g<1>\n\g<2>" new_content, count = re.subn(pattern, replacement, content) if count == 0: raise ValueError( "Could not find insertion point in flake.nix. " "Looking for pattern: ' };\\n packages = forAllSystems'" ) flake_path.write_text(new_content) def update_terraform_vms(config: HostConfig, repo_root: Path, force: bool = False) -> None: """ Add or update VM entry in terraform/vms.tf locals.vms map. Args: config: Host configuration repo_root: Path to repository root force: If True, replace existing entry; if False, insert new entry """ terraform_path = repo_root / "terraform" / "vms.tf" content = terraform_path.read_text() # Create new entry based on whether we have static IP or DHCP if config.is_static_ip: new_entry = f''' "{config.hostname}" = {{ ip = "{config.ip}" cpu_cores = {config.cpu} memory = {config.memory} disk_size = "{config.disk}" }} ''' else: new_entry = f''' "{config.hostname}" = {{ cpu_cores = {config.cpu} memory = {config.memory} disk_size = "{config.disk}" }} ''' # Check if hostname already exists hostname_pattern = rf'^\s+"{re.escape(config.hostname)}" = \{{' existing_match = re.search(hostname_pattern, content, re.MULTILINE) if existing_match and force: # Replace existing entry # Match the entire block from "hostname" = { to } replace_pattern = rf'^\s+"{re.escape(config.hostname)}" = \{{.*?^\s+\}}\n' new_content, count = re.subn(replace_pattern, new_entry, content, flags=re.MULTILINE | re.DOTALL) if count == 0: raise ValueError(f"Could not find existing entry for {config.hostname} in terraform/vms.tf") else: # Insert new entry before closing brace # Pattern: " }\n\n # Compute VM configurations" pattern = r"( \})\n\n( # Compute VM configurations)" replacement = rf"{new_entry}\g<1>\n\n\g<2>" new_content, count = re.subn(pattern, replacement, content) if count == 0: raise ValueError( "Could not find insertion point in terraform/vms.tf. " "Looking for pattern: ' }\\n\\n # Compute VM configurations'" ) terraform_path.write_text(new_content) def add_wrapped_token_to_vm(hostname: str, wrapped_token: str, repo_root: Path) -> None: """ Add or update the vault_wrapped_token field in an existing VM entry. Args: hostname: Hostname of the VM wrapped_token: The wrapped token to add repo_root: Path to repository root """ terraform_path = repo_root / "terraform" / "vms.tf" content = terraform_path.read_text() # Find the VM entry hostname_pattern = rf'^\s+"{re.escape(hostname)}" = \{{' match = re.search(hostname_pattern, content, re.MULTILINE) if not match: raise ValueError(f"Could not find VM entry for {hostname} in terraform/vms.tf") # Find the full VM block block_pattern = rf'(^\s+"{re.escape(hostname)}" = \{{)(.*?)(^\s+\}})' block_match = re.search(block_pattern, content, re.MULTILINE | re.DOTALL) if not block_match: raise ValueError(f"Could not parse VM block for {hostname}") block_start = block_match.group(1) block_content = block_match.group(2) block_end = block_match.group(3) # Check if vault_wrapped_token already exists if "vault_wrapped_token" in block_content: # Update existing token block_content = re.sub( r'vault_wrapped_token\s*=\s*"[^"]*"', f'vault_wrapped_token = "{wrapped_token}"', block_content ) else: # Add new token field (add before closing brace) # Find the last field and add after it block_content = block_content.rstrip() if block_content and not block_content.endswith("\n"): block_content += "\n" block_content += f' vault_wrapped_token = "{wrapped_token}"\n' # Reconstruct the block new_block = block_start + block_content + block_end # Replace in content new_content = re.sub( rf'^\s+"{re.escape(hostname)}" = \{{.*?^\s+\}}', new_block, content, flags=re.MULTILINE | re.DOTALL ) terraform_path.write_text(new_content)