Files
nixos-servers/scripts/create-host/manipulators.py
Torjus Håkestad 86249c466b
Some checks failed
Run nix flake check / flake-check (push) Failing after 21m31s
Run nix flake check / flake-check (pull_request) Failing after 15m17s
create-host: add delete feature
2026-02-03 12:11:41 +01:00

313 lines
10 KiB
Python

"""Text manipulation for flake.nix and Terraform files."""
import re
from pathlib import Path
from typing import Tuple
from models import HostConfig
def remove_from_flake_nix(hostname: str, repo_root: Path) -> bool:
    """
    Delete a host's nixosConfigurations entry from flake.nix.

    The file is rewritten only when the entry's whole block (from its
    "<hostname> = nixpkgs.lib.nixosSystem {" line through the closing
    "};" line) could be matched and removed.

    Args:
        hostname: Hostname whose entry should be deleted
        repo_root: Path to repository root

    Returns:
        True if the entry was found and removed, False otherwise
    """
    target = repo_root / "flake.nix"
    original_text = target.read_text()
    escaped = re.escape(hostname)

    # NOTE(review): both patterns match the entry's indentation literally
    # (the spaces right after "^") -- confirm they agree with how
    # flake.nix is actually indented.
    header_re = re.compile(
        rf"^ {escaped} = nixpkgs\.lib\.nixosSystem",
        re.MULTILINE,
    )
    if header_re.search(original_text) is None:
        return False

    # Lazy match so the block ends at the first closing "};" line.
    block_re = re.compile(
        rf"^ {escaped} = nixpkgs\.lib\.nixosSystem \{{.*?^ \}};\n",
        re.MULTILINE | re.DOTALL,
    )
    stripped_text, removed = block_re.subn("", original_text)
    if not removed:
        # Header was present but the block shape was not recognised;
        # leave the file untouched.
        return False

    target.write_text(stripped_text)
    return True
def remove_from_terraform_vms(hostname: str, repo_root: Path) -> bool:
    """
    Delete a VM entry from the locals.vms map in terraform/vms.tf.

    The file is rewritten only when the entry's whole block (from its
    '"<hostname>" = {' line through the first following line that holds
    just a closing brace) could be matched and removed.

    Args:
        hostname: Hostname whose VM entry should be deleted
        repo_root: Path to repository root

    Returns:
        True if the entry was found and removed, False otherwise
    """
    vms_file = repo_root / "terraform" / "vms.tf"
    original_text = vms_file.read_text()
    quoted_key = rf'"{re.escape(hostname)}"'

    header_re = re.compile(rf"^\s+{quoted_key} = \{{", re.MULTILINE)
    if header_re.search(original_text) is None:
        return False

    # Lazy match: the block ends at the first indented "}" line after
    # the header.
    block_re = re.compile(
        rf"^\s+{quoted_key} = \{{.*?^\s+\}}\n",
        re.MULTILINE | re.DOTALL,
    )
    stripped_text, removed = block_re.subn("", original_text)
    if not removed:
        return False

    vms_file.write_text(stripped_text)
    return True
def remove_from_vault_terraform(hostname: str, repo_root: Path) -> bool:
    """
    Remove host policy from terraform/vault/hosts-generated.tf.

    The entry is expected to look like '"<hostname>" = { paths = [ ... ] }'
    (possibly spread over several lines). When the entry occupies its own
    lines, exactly those lines are deleted so the neighbouring lines keep
    their own line breaks.

    Args:
        hostname: Hostname to remove
        repo_root: Path to repository root

    Returns:
        True if found and removed, False if not found (including when the
        file does not exist)
    """
    vault_tf_path = repo_root / "terraform" / "vault" / "hosts-generated.tf"
    if not vault_tf_path.exists():
        return False

    content = vault_tf_path.read_text()

    # Cheap pre-check: the quoted hostname must appear somewhere.
    if f'"{hostname}"' not in content:
        return False

    entry_body = rf'"{re.escape(hostname)}" = \{{\s*paths = \[.*?\]\s*\}}'

    # Anchor to whole lines. Consuming the whitespace (and therefore the
    # newline) in front of the entry as well as the newline after it would
    # glue the preceding line onto the following one, e.g. "    }  }".
    whole_lines_pattern = rf"^[ \t]*{entry_body}[ \t]*\n?"
    new_content, count = re.subn(
        whole_lines_pattern,
        "",
        content,
        flags=re.MULTILINE | re.DOTALL,
    )

    if count == 0:
        # The entry shares a line with other text: remove only the entry
        # and its leading blanks, leaving the line breaks alone.
        new_content, count = re.subn(
            rf"[ \t]*{entry_body}",
            "",
            content,
            flags=re.DOTALL,
        )

    if count == 0:
        return False

    vault_tf_path.write_text(new_content)
    return True
def check_entries_exist(hostname: str, repo_root: Path) -> Tuple[bool, bool, bool]:
    """
    Report where a hostname currently has entries.

    flake.nix and terraform/vms.tf are always read;
    terraform/vault/hosts-generated.tf is optional and counts as
    "no entry" when the file is absent.

    Args:
        hostname: Hostname to look for
        repo_root: Path to repository root

    Returns:
        Tuple of (flake_exists, terraform_vms_exists, vault_exists)
    """
    escaped = re.escape(hostname)

    # flake.nix: look for the entry's opening line.
    flake_text = (repo_root / "flake.nix").read_text()
    in_flake = (
        re.search(
            rf"^ {escaped} = nixpkgs\.lib\.nixosSystem",
            flake_text,
            re.MULTILINE,
        )
        is not None
    )

    # terraform/vms.tf: look for the quoted map key opening a block.
    vms_text = (repo_root / "terraform" / "vms.tf").read_text()
    in_vms = (
        re.search(rf'^\s+"{escaped}" = \{{', vms_text, re.MULTILINE)
        is not None
    )

    # Vault policies: a plain substring test on the quoted hostname.
    vault_file = repo_root / "terraform" / "vault" / "hosts-generated.tf"
    in_vault = vault_file.exists() and f'"{hostname}"' in vault_file.read_text()

    return (in_flake, in_vms, in_vault)
def update_flake_nix(config: HostConfig, repo_root: Path, force: bool = False) -> None:
    """
    Insert or replace a host's nixosConfigurations entry in flake.nix.

    An existing entry is replaced only when it is present AND force is
    True; in every other case a new entry is inserted just before the
    closing "};" line that precedes "packages = forAllSystems".

    Args:
        config: Host configuration (config.hostname is the only field used)
        repo_root: Path to repository root
        force: If True, replace existing entry; if False, insert new entry

    Raises:
        ValueError: If the entry to replace, or the insertion point, cannot
            be matched in flake.nix
    """
    flake_file = repo_root / "flake.nix"
    flake_text = flake_file.read_text()

    # NOTE(review): the leading whitespace inside this template and the
    # literal spaces in the patterns below must agree with flake.nix's
    # real indentation (the replace pattern expects a space before the
    # closing "};") -- confirm against the file.
    entry_text = f""" {config.hostname} = nixpkgs.lib.nixosSystem {{
inherit system;
specialArgs = {{
inherit inputs self sops-nix;
}};
modules = [
(
{{ config, pkgs, ... }}:
{{
nixpkgs.overlays = commonOverlays;
}}
)
./hosts/{config.hostname}
sops-nix.nixosModules.sops
];
}};
"""

    escaped = re.escape(config.hostname)
    already_present = re.search(
        rf"^ {escaped} = nixpkgs\.lib\.nixosSystem",
        flake_text,
        re.MULTILINE,
    )

    if force and already_present:
        # Swap the whole block, from the entry's opening line through the
        # first closing "};" line.
        updated_text, hits = re.subn(
            rf"^ {escaped} = nixpkgs\.lib\.nixosSystem \{{.*?^ \}};\n",
            entry_text,
            flake_text,
            flags=re.MULTILINE | re.DOTALL,
        )
        if not hits:
            raise ValueError(f"Could not find existing entry for {config.hostname} in flake.nix")
        flake_file.write_text(updated_text)
        return

    # Insert ahead of the "};" that is directly followed by the
    # "packages = forAllSystems" line; both captured pieces are put back.
    updated_text, hits = re.subn(
        r"( \};)\n( packages = forAllSystems)",
        rf"{entry_text}\g<1>\n\g<2>",
        flake_text,
    )
    if not hits:
        raise ValueError(
            "Could not find insertion point in flake.nix. "
            "Looking for pattern: ' };\\n packages = forAllSystems'"
        )
    flake_file.write_text(updated_text)
def update_terraform_vms(config: HostConfig, repo_root: Path, force: bool = False) -> None:
    """
    Insert or replace a VM entry in the locals.vms map of terraform/vms.tf.

    The entry carries cpu_cores, memory and disk_size, plus an ip field
    when config.is_static_ip is truthy. An existing entry is replaced only
    when it is present AND force is True; otherwise the entry is inserted
    before the closing brace that precedes the
    "# Compute VM configurations" comment.

    Args:
        config: Host configuration
        repo_root: Path to repository root
        force: If True, replace existing entry; if False, insert new entry

    Raises:
        ValueError: If the entry to replace, or the insertion point, cannot
            be matched in terraform/vms.tf
    """
    vms_file = repo_root / "terraform" / "vms.tf"
    vms_text = vms_file.read_text()

    # NOTE(review): the leading whitespace inside these templates is
    # emitted verbatim -- confirm it matches the indentation used in
    # vms.tf.
    if not config.is_static_ip:
        # DHCP host: no ip field.
        entry_text = f''' "{config.hostname}" = {{
cpu_cores = {config.cpu}
memory = {config.memory}
disk_size = "{config.disk}"
}}
'''
    else:
        entry_text = f''' "{config.hostname}" = {{
ip = "{config.ip}"
cpu_cores = {config.cpu}
memory = {config.memory}
disk_size = "{config.disk}"
}}
'''

    quoted_key = rf'"{re.escape(config.hostname)}"'
    already_present = re.search(rf"^\s+{quoted_key} = \{{", vms_text, re.MULTILINE)

    if force and already_present:
        # Swap the whole block, up to the first indented "}" line.
        updated_text, hits = re.subn(
            rf"^\s+{quoted_key} = \{{.*?^\s+\}}\n",
            entry_text,
            vms_text,
            flags=re.MULTILINE | re.DOTALL,
        )
        if not hits:
            raise ValueError(f"Could not find existing entry for {config.hostname} in terraform/vms.tf")
        vms_file.write_text(updated_text)
        return

    # Insert ahead of the "}" that is followed by a blank line and the
    # "# Compute VM configurations" comment; both captures are put back.
    updated_text, hits = re.subn(
        r"( \})\n\n( # Compute VM configurations)",
        rf"{entry_text}\g<1>\n\n\g<2>",
        vms_text,
    )
    if not hits:
        raise ValueError(
            "Could not find insertion point in terraform/vms.tf. "
            "Looking for pattern: ' }\\n\\n # Compute VM configurations'"
        )
    vms_file.write_text(updated_text)
def add_wrapped_token_to_vm(hostname: str, wrapped_token: str, repo_root: Path) -> None:
    """
    Add or update the vault_wrapped_token field in an existing VM entry.

    The VM block for hostname is located in terraform/vms.tf. If it already
    has a vault_wrapped_token assignment, the quoted value is replaced;
    otherwise a new assignment is appended as the last line of the block.
    Everything else in the file is written back unchanged.

    Args:
        hostname: Hostname of the VM
        wrapped_token: The wrapped token to add
        repo_root: Path to repository root

    Raises:
        ValueError: If no VM entry for hostname exists, or its block cannot
            be parsed
    """
    terraform_path = repo_root / "terraform" / "vms.tf"
    content = terraform_path.read_text()
    quoted_key = rf'"{re.escape(hostname)}"'

    # Find the VM entry
    if not re.search(rf"^\s+{quoted_key} = \{{", content, re.MULTILINE):
        raise ValueError(f"Could not find VM entry for {hostname} in terraform/vms.tf")

    # Find the full VM block: header line, body, closing-brace line.
    block_match = re.search(
        rf"(^\s+{quoted_key} = \{{)(.*?)(^\s+\}})",
        content,
        re.MULTILINE | re.DOTALL,
    )
    if not block_match:
        raise ValueError(f"Could not parse VM block for {hostname}")

    block_start, block_content, block_end = block_match.groups()
    token_assignment = f'vault_wrapped_token = "{wrapped_token}"'

    if "vault_wrapped_token" in block_content:
        # Update existing token. A callable replacement inserts the text
        # verbatim; a plain string would be treated as a template and any
        # backslash in the token would be interpreted.
        block_content = re.sub(
            r'vault_wrapped_token\s*=\s*"[^"]*"',
            lambda _match: token_assignment,
            block_content,
        )
    else:
        # Append as the last field. The newline is added unconditionally so
        # that even an empty block gets the field on its own line rather
        # than on the same line as the opening brace.
        # NOTE(review): the indentation in front of the field is a literal
        # -- confirm it matches the indentation used in vms.tf.
        block_content = block_content.rstrip() + "\n" + f" {token_assignment}\n"

    new_block = block_start + block_content + block_end

    # Splice by position instead of running a second regex substitution:
    # the rebuilt block contains arbitrary file text, and using it as a
    # replacement template would rewrite its backslashes (e.g. "\\" -> "\")
    # or fail on sequences such as "\d".
    new_content = (
        content[: block_match.start()] + new_block + content[block_match.end():]
    )
    terraform_path.write_text(new_content)