179 lines
5.6 KiB
Python
179 lines
5.6 KiB
Python
"""Helper functions for Vault/OpenBao API interactions."""
|
|
|
|
import os
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import hvac
|
|
import typer
|
|
|
|
|
|
def get_vault_client(vault_addr: Optional[str] = None, vault_token: Optional[str] = None) -> hvac.Client:
    """
    Get an authenticated Vault client instance.

    Args:
        vault_addr: Vault server address (defaults to BAO_ADDR env var or hardcoded default)
        vault_token: Vault token (defaults to BAO_TOKEN env var or prompts user)

    Returns:
        Configured hvac.Client instance whose token has been verified

    Raises:
        typer.Exit: If the server cannot be reached or the token is rejected
    """
    # Resolve the server address: explicit argument, then environment, then default.
    if vault_addr is None:
        vault_addr = os.getenv("BAO_ADDR", "https://vault01.home.2rjus.net:8200")

    # Resolve the token: explicit argument, then environment, then interactive prompt.
    if vault_token is None:
        vault_token = os.getenv("BAO_TOKEN")

    if not vault_token:
        typer.echo("\n⚠️ Vault token required. Set BAO_TOKEN environment variable or enter it below.")
        vault_token = typer.prompt("Vault token (BAO_TOKEN)", hide_input=True)

    # Only the calls that actually talk to the server live inside the try.
    # typer.Exit is itself an Exception, so raising it in here would be caught
    # by the handler below and reported a second time as a connection error.
    try:
        # NOTE(review): verify=False disables TLS certificate verification.
        # Presumably the server uses a private CA; consider passing the CA
        # bundle path instead so the connection cannot be intercepted.
        client = hvac.Client(url=vault_addr, token=vault_token, verify=False)
        authenticated = client.is_authenticated()
    except Exception as e:
        typer.echo(f"\n❌ Error connecting to Vault: {e}", err=True)
        raise typer.Exit(code=1) from e

    if not authenticated:
        typer.echo(f"\n❌ Failed to authenticate to Vault at {vault_addr}", err=True)
        typer.echo("Check your BAO_TOKEN and ensure Vault is accessible", err=True)
        raise typer.Exit(code=1)

    return client
|
|
|
|
|
|
def generate_wrapped_token(hostname: str, repo_root: Path) -> str:
    """
    Generate a wrapped token containing AppRole credentials for a host.

    This function:
    1. Applies Terraform to ensure the AppRole exists
    2. Reads the role_id for the host
    3. Generates a secret_id
    4. Wraps both credentials in a cubbyhole token (24h TTL, single-use)

    Args:
        hostname: The host to generate credentials for
        repo_root: Path to repository root (for running terraform)

    Returns:
        Wrapped token string (hvs.CAES...)

    Raises:
        typer.Exit: If Terraform fails or Vault operations fail
    """
    from rich.console import Console

    console = Console()

    # Get Vault client (exits on its own if authentication fails)
    client = get_vault_client()

    # Step 1: apply Terraform to ensure the AppRole exists
    console.print("\n[bold blue]Applying Vault Terraform configuration...[/bold blue]")
    terraform_dir = repo_root / "terraform" / "vault"

    # subprocess.run raises FileNotFoundError for a missing working directory
    # as well as for a missing executable. Check the directory up front so a
    # wrong repo_root is not misreported as "'tofu' command not found".
    if not terraform_dir.is_dir():
        console.print(f"[red]❌ Error: Terraform directory not found:[/red] {terraform_dir}")
        raise typer.Exit(code=1)

    try:
        result = subprocess.run(
            ["tofu", "apply", "-auto-approve"],
            cwd=terraform_dir,
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError as e:
        console.print("[red]❌ Error: 'tofu' command not found[/red]")
        console.print("Ensure OpenTofu is installed and in PATH")
        raise typer.Exit(code=1) from e

    if result.returncode != 0:
        console.print("[red]❌ Terraform apply failed:[/red]")
        console.print(result.stderr)
        raise typer.Exit(code=1)

    console.print("[green]✓[/green] Terraform applied successfully")

    # Step 2: read the role_id
    console.print(f"[bold blue]Reading AppRole credentials for {hostname}...[/bold blue]")
    try:
        role_id_response = client.read(f"auth/approle/role/{hostname}/role-id")
        # hvac's read() returns None instead of raising when the path does not
        # exist; surface that as a clear message rather than a TypeError.
        if role_id_response is None:
            raise LookupError(f"no AppRole named '{hostname}' was found")
        role_id = role_id_response["data"]["role_id"]
    except Exception as e:
        console.print(f"[red]❌ Failed to read role_id for {hostname}:[/red] {e}")
        console.print(f"\nEnsure the AppRole '{hostname}' exists in Vault")
        raise typer.Exit(code=1) from e

    console.print("[green]✓[/green] Retrieved role_id")

    # Step 3: generate a secret_id
    try:
        secret_id_response = client.write(f"auth/approle/role/{hostname}/secret-id")
        secret_id = secret_id_response["data"]["secret_id"]
    except Exception as e:
        console.print(f"[red]❌ Failed to generate secret_id:[/red] {e}")
        raise typer.Exit(code=1) from e

    console.print("[green]✓[/green] Generated secret_id")

    # Step 4: wrap the credentials in a cubbyhole token
    console.print("[bold blue]Creating wrapped token (24h TTL, single-use)...[/bold blue]")
    try:
        # Use the response wrapping feature to wrap our credentials.
        # This creates a temporary token that can only be used once to
        # retrieve the actual credentials.
        wrap_response = client.write(
            "sys/wrapping/wrap",
            wrap_ttl="24h",
            # The data we're wrapping
            role_id=role_id,
            secret_id=secret_id,
        )
        wrapped_token = wrap_response["wrap_info"]["token"]
    except Exception as e:
        console.print(f"[red]❌ Failed to create wrapped token:[/red] {e}")
        raise typer.Exit(code=1) from e

    # Only a prefix of the token is echoed; the full value is returned to the caller.
    console.print(f"[green]✓[/green] Created wrapped token: {wrapped_token[:20]}...")
    console.print("[yellow]⚠️[/yellow] Token expires in 24 hours")
    console.print("[yellow]⚠️[/yellow] Token can only be used once")

    return wrapped_token
|
|
|
|
|
|
def verify_vault_setup(hostname: str) -> bool:
    """
    Verify that Vault is properly configured for a host.

    Checks:
    - Vault is accessible
    - AppRole exists for the hostname
    - Can read role_id

    Args:
        hostname: The host to verify

    Returns:
        True if everything is configured correctly, False otherwise
    """
    try:
        client = get_vault_client()
        response = client.read(f"auth/approle/role/{hostname}/role-id")
    except Exception:
        # Deliberate best-effort: any connection, authentication or permission
        # failure (including the typer.Exit raised by get_vault_client) is
        # reported to the caller as "not configured" rather than propagated.
        return False

    # hvac's read() returns None instead of raising when the path does not
    # exist, so a call that did not raise is not proof that the AppRole is
    # there. Require an actual role_id in the response.
    data = response.get("data") if isinstance(response, dict) else None
    return isinstance(data, dict) and bool(data.get("role_id"))
|