Files
nixos-servers/scripts/create-host/vault_helper.py
Torjus Håkestad 2b4dc424cc
All checks were successful
Run nix flake check / flake-check (push) Successful in 2m20s
Run nix flake check / flake-check (pull_request) Successful in 2m20s
vault: implement bootstrap integration
2026-02-03 00:54:31 +01:00

179 lines
5.6 KiB
Python

"""Helper functions for Vault/OpenBao API interactions."""
import os
import subprocess
from pathlib import Path
from typing import Optional
import hvac
import typer
def get_vault_client(vault_addr: Optional[str] = None, vault_token: Optional[str] = None) -> hvac.Client:
    """
    Get an authenticated Vault client instance.

    Args:
        vault_addr: Vault server address (defaults to BAO_ADDR env var or hardcoded default)
        vault_token: Vault token (defaults to BAO_TOKEN env var or prompts user)

    Returns:
        Configured hvac.Client instance

    Raises:
        typer.Exit: If unable to create client or authenticate
    """
    # Get Vault address
    if vault_addr is None:
        vault_addr = os.getenv("BAO_ADDR", "https://vault01.home.2rjus.net:8200")

    # Get Vault token: explicit argument, then environment, then interactive prompt
    if vault_token is None:
        vault_token = os.getenv("BAO_TOKEN")
    if not vault_token:
        typer.echo("\n⚠️ Vault token required. Set BAO_TOKEN environment variable or enter it below.")
        vault_token = typer.prompt("Vault token (BAO_TOKEN)", hide_input=True)

    # Only the calls that actually talk to the server live inside the try.
    # typer.Exit is itself an Exception subclass, so raising it in here would be
    # caught by the handler below and produce a second, misleading
    # "Error connecting to Vault" message.
    try:
        # NOTE(review): verify=False disables TLS certificate verification for
        # every request made with this client. Kept as-is for compatibility;
        # consider pointing `verify` at the internal CA bundle instead.
        client = hvac.Client(url=vault_addr, token=vault_token, verify=False)
        authenticated = client.is_authenticated()
    except Exception as e:
        typer.echo(f"\n❌ Error connecting to Vault: {e}", err=True)
        raise typer.Exit(code=1) from e

    if not authenticated:
        typer.echo(f"\n❌ Failed to authenticate to Vault at {vault_addr}", err=True)
        typer.echo("Check your BAO_TOKEN and ensure Vault is accessible", err=True)
        raise typer.Exit(code=1)

    return client
def generate_wrapped_token(hostname: str, repo_root: Path) -> str:
    """
    Generate a wrapped token containing AppRole credentials for a host.

    This function:
    1. Applies Terraform to ensure the AppRole exists
    2. Reads the role_id for the host
    3. Generates a secret_id
    4. Wraps both credentials in a cubbyhole token (24h TTL, single-use)

    Args:
        hostname: The host to generate credentials for
        repo_root: Path to repository root (for running terraform)

    Returns:
        Wrapped token string (hvs.CAES...)

    Raises:
        typer.Exit: If Terraform fails or Vault operations fail
    """
    from rich.console import Console
    from rich.markup import escape

    console = Console()

    # Get Vault client
    client = get_vault_client()

    # First, apply Terraform to ensure AppRole exists
    console.print("\n[bold blue]Applying Vault Terraform configuration...[/bold blue]")
    terraform_dir = repo_root / "terraform" / "vault"

    # subprocess raises FileNotFoundError for a missing cwd as well as for a
    # missing executable; check the directory first so the "tofu not found"
    # message below is only shown when it is actually true.
    if not terraform_dir.is_dir():
        console.print("[red]❌ Error: Terraform directory not found:[/red]")
        console.print(str(terraform_dir), markup=False)
        raise typer.Exit(code=1)

    try:
        result = subprocess.run(
            ["tofu", "apply", "-auto-approve"],
            cwd=terraform_dir,
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        console.print("[red]❌ Error: 'tofu' command not found[/red]")
        console.print("Ensure OpenTofu is installed and in PATH")
        raise typer.Exit(code=1)

    if result.returncode != 0:
        console.print("[red]❌ Terraform apply failed:[/red]")
        # Tool output is arbitrary text and may contain square brackets; print
        # it verbatim instead of letting Rich interpret it as console markup.
        console.print(result.stderr, markup=False)
        raise typer.Exit(code=1)
    console.print("[green]✓[/green] Terraform applied successfully")

    # Read role_id
    try:
        console.print(f"[bold blue]Reading AppRole credentials for {hostname}...[/bold blue]")
        role_id_response = client.read(f"auth/approle/role/{hostname}/role-id")
        # hvac's Client.read returns None instead of raising when the path is
        # not found; turn that into an explicit, readable error.
        if role_id_response is None:
            raise LookupError(f"no data at auth/approle/role/{hostname}/role-id")
        role_id = role_id_response["data"]["role_id"]
        console.print("[green]✓[/green] Retrieved role_id")
    except Exception as e:
        # escape() keeps brackets in the exception text from being parsed as markup
        console.print(f"[red]❌ Failed to read role_id for {hostname}:[/red] {escape(str(e))}")
        console.print(f"\nEnsure the AppRole '{hostname}' exists in Vault")
        raise typer.Exit(code=1)

    # Generate secret_id
    try:
        secret_id_response = client.write(f"auth/approle/role/{hostname}/secret-id")
        secret_id = secret_id_response["data"]["secret_id"]
        console.print("[green]✓[/green] Generated secret_id")
    except Exception as e:
        console.print(f"[red]❌ Failed to generate secret_id:[/red] {escape(str(e))}")
        raise typer.Exit(code=1)

    # Wrap the credentials in a cubbyhole token
    try:
        console.print("[bold blue]Creating wrapped token (24h TTL, single-use)...[/bold blue]")
        # Use the response wrapping feature to wrap our credentials.
        # This creates a temporary token that can only be used once to retrieve
        # the actual credentials. wrap_ttl is the wrapping lifetime; the
        # remaining keyword arguments are the payload being wrapped.
        wrap_response = client.write(
            "sys/wrapping/wrap",
            wrap_ttl="24h",
            role_id=role_id,
            secret_id=secret_id,
        )
        wrapped_token = wrap_response["wrap_info"]["token"]
        # Only a short prefix is echoed so the full token does not end up in scrollback
        console.print(f"[green]✓[/green] Created wrapped token: {wrapped_token[:20]}...")
        console.print("[yellow]⚠️[/yellow] Token expires in 24 hours")
        console.print("[yellow]⚠️[/yellow] Token can only be used once")
        return wrapped_token
    except Exception as e:
        console.print(f"[red]❌ Failed to create wrapped token:[/red] {escape(str(e))}")
        raise typer.Exit(code=1)
def verify_vault_setup(hostname: str) -> bool:
    """
    Verify that Vault is properly configured for a host.

    Checks:
    - Vault is accessible
    - AppRole exists for the hostname
    - Can read role_id

    Args:
        hostname: The host to verify

    Returns:
        True if everything is configured correctly, False otherwise
    """
    try:
        client = get_vault_client()
        response = client.read(f"auth/approle/role/{hostname}/role-id")
        # hvac's Client.read returns None instead of raising when the path is
        # not found, so a call that merely "did not raise" is not proof the
        # AppRole exists; require an actual role_id in the response.
        return bool(response and response["data"]["role_id"])
    except Exception:
        # Deliberate best-effort check: any failure (connection, authentication,
        # unexpected response shape) simply means "not set up".
        return False