"""Helper functions for Vault/OpenBao API interactions.""" import os import subprocess from pathlib import Path from typing import Optional import hvac import typer def get_vault_client(vault_addr: Optional[str] = None, vault_token: Optional[str] = None) -> hvac.Client: """ Get a Vault client instance. Args: vault_addr: Vault server address (defaults to BAO_ADDR env var or hardcoded default) vault_token: Vault token (defaults to BAO_TOKEN env var or prompts user) Returns: Configured hvac.Client instance Raises: typer.Exit: If unable to create client or authenticate """ # Get Vault address if vault_addr is None: vault_addr = os.getenv("BAO_ADDR", "https://vault01.home.2rjus.net:8200") # Get Vault token if vault_token is None: vault_token = os.getenv("BAO_TOKEN") if not vault_token: typer.echo("\n⚠️ Vault token required. Set BAO_TOKEN environment variable or enter it below.") vault_token = typer.prompt("Vault token (BAO_TOKEN)", hide_input=True) # Create client try: client = hvac.Client(url=vault_addr, token=vault_token, verify=False) # Verify authentication if not client.is_authenticated(): typer.echo(f"\n❌ Failed to authenticate to Vault at {vault_addr}", err=True) typer.echo("Check your BAO_TOKEN and ensure Vault is accessible", err=True) raise typer.Exit(code=1) return client except Exception as e: typer.echo(f"\n❌ Error connecting to Vault: {e}", err=True) raise typer.Exit(code=1) def generate_wrapped_token(hostname: str, repo_root: Path) -> str: """ Generate a wrapped token containing AppRole credentials for a host. This function: 1. Applies Terraform to ensure the AppRole exists 2. Reads the role_id for the host 3. Generates a secret_id 4. Wraps both credentials in a cubbyhole token (24h TTL, single-use) Args: hostname: The host to generate credentials for repo_root: Path to repository root (for running terraform) Returns: Wrapped token string (hvs.CAES...) Raises: typer.Exit: If Terraform fails or Vault operations fail """ from rich.console import Console console = Console() # Get Vault client client = get_vault_client() # First, apply Terraform to ensure AppRole exists console.print(f"\n[bold blue]Applying Vault Terraform configuration...[/bold blue]") terraform_dir = repo_root / "terraform" / "vault" try: result = subprocess.run( ["tofu", "apply", "-auto-approve"], cwd=terraform_dir, capture_output=True, text=True, check=False, ) if result.returncode != 0: console.print(f"[red]❌ Terraform apply failed:[/red]") console.print(result.stderr) raise typer.Exit(code=1) console.print("[green]✓[/green] Terraform applied successfully") except FileNotFoundError: console.print(f"[red]❌ Error: 'tofu' command not found[/red]") console.print("Ensure OpenTofu is installed and in PATH") raise typer.Exit(code=1) # Read role_id try: console.print(f"[bold blue]Reading AppRole credentials for {hostname}...[/bold blue]") role_id_response = client.read(f"auth/approle/role/{hostname}/role-id") role_id = role_id_response["data"]["role_id"] console.print(f"[green]✓[/green] Retrieved role_id") except Exception as e: console.print(f"[red]❌ Failed to read role_id for {hostname}:[/red] {e}") console.print(f"\nEnsure the AppRole '{hostname}' exists in Vault") raise typer.Exit(code=1) # Generate secret_id try: secret_id_response = client.write(f"auth/approle/role/{hostname}/secret-id") secret_id = secret_id_response["data"]["secret_id"] console.print(f"[green]✓[/green] Generated secret_id") except Exception as e: console.print(f"[red]❌ Failed to generate secret_id:[/red] {e}") raise typer.Exit(code=1) # Wrap the credentials in a cubbyhole token try: console.print(f"[bold blue]Creating wrapped token (24h TTL, single-use)...[/bold blue]") # Use the response wrapping feature to wrap our credentials # This creates a temporary token that can only be used once to retrieve the actual credentials wrap_response = client.write( "sys/wrapping/wrap", wrap_ttl="24h", # The data we're wrapping role_id=role_id, secret_id=secret_id, ) wrapped_token = wrap_response["wrap_info"]["token"] console.print(f"[green]✓[/green] Created wrapped token: {wrapped_token[:20]}...") console.print(f"[yellow]⚠️[/yellow] Token expires in 24 hours") console.print(f"[yellow]⚠️[/yellow] Token can only be used once") return wrapped_token except Exception as e: console.print(f"[red]❌ Failed to create wrapped token:[/red] {e}") raise typer.Exit(code=1) def verify_vault_setup(hostname: str) -> bool: """ Verify that Vault is properly configured for a host. Checks: - Vault is accessible - AppRole exists for the hostname - Can read role_id Args: hostname: The host to verify Returns: True if everything is configured correctly, False otherwise """ try: client = get_vault_client() # Try to read the role_id client.read(f"auth/approle/role/{hostname}/role-id") return True except Exception: return False