diff --git a/docs/features/costs.md b/docs/features/costs.md index db732ec..8e2e704 100644 --- a/docs/features/costs.md +++ b/docs/features/costs.md @@ -1,46 +1,50 @@ # Cost Estimates -deployml integrates with [Infracost](https://github.com/infracost/infracost) to provide cost estimates before deploying your infrastructure, helping you manage cloud costs effectively in academic settings. - -## Overview - -Cost analysis runs automatically during deployment, showing monthly cost estimates for your entire stack, cost breakdowns by component, and warnings if costs exceed your configured threshold. The process analyzes your Terraform configuration before deployment, allowing you to adjust your configuration based on estimates. +deployml integrates with [Infracost](https://www.infracost.io) to show infrastructure costs. Both commands read your Terraform configuration — they price always-on resources like Cloud SQL accurately, but usage-based services (BigQuery, GCS, Cloud Run) show $0 since costs depend on actual usage. Check the GCP Billing Console for real usage charges. ## Setup -Install Infracost and register for a free API key using the instructions [here](https://www.infracost.io/docs/#quick-start). +```bash +brew install infracost +infracost auth login +``` -## Configuration +Run `deployml doctor` to confirm infracost is installed and authenticated. -Cost analysis is enabled by default. Configure it in your YAML file to enable or disable cost analysis, set a warning threshold in USD (default $100/month), and choose the currency for cost display. 
+## Commands -Here is an example of what this might look like: -```yaml -cost_analysis: - enabled: true # Enable/disable cost analysis (default: true) - warning_threshold: 50.0 # Warn if monthly cost exceeds this amount (default: 100.0) - currency: "USD" - bucket_amount: 200 # GB stored across GCS buckets - cloudsql_amount: 50 # GB of Cloud SQL storage +**Before deploying** — estimates cost from your config without touching any infrastructure: +```bash +deployml estimate ``` -## Typical Costs +**While deployed** — scans your actual deployed Terraform workspace: +```bash +deployml costs +``` -Here are estimated typical costs for several **GCP** services, but please do not simply believe these numbers without keeping track of costs yourself. +Both commands show a breakdown of which resources cost money and how much. -- Cloud Run services cost $10-30 per month depending on traffic. -- Cloud SQL PostgreSQL ranges from $7/month for small instances to $25+ for production. -- Google Cloud Storage costs approximately $0.020 per GB per month. -- BigQuery storage costs $0.020 per GB per month with query costs based on data scanned. -- Cloud VMs cost approximately $25 per month for medium instances. -- GKE clusters have no management fee, but you pay for VM instances and load balancers. Note that the GKE can get very expensive very quickly. +## Cost shown during deploy +`deployml deploy` automatically runs a cost estimate after `terraform plan` and shows it before the confirmation prompt: +``` + Deploy stack? Monthly cost: ~$34.55 USD [y/N]: +``` + +## Configuration + +```yaml +cost_analysis: + enabled: true # set to false to skip (default: true) + warning_threshold: 50.0 # warn if monthly cost exceeds this (default: 100.0) +``` +## Typical costs (Cloud Run stack) -## Cost Optimization +A standard MLflow + FastAPI + Grafana deployment runs around **$34/month**, almost entirely Cloud SQL. Cloud Run, BigQuery, and GCS scale to zero and cost nothing at idle. 
-Here are some tips to keep the costs low while you are learning: +## Keeping costs low -- Use SQLite instead of Cloud SQL whenever possible, particularly for development purposes and when your data is small. -- Enable auto-teardown to prevent forgotten deployments. -- Use Cloud Run for variable workloads to take advantage of scale-to-zero pricing. \ No newline at end of file +- Always `deployml destroy` when done — Cloud SQL bills continuously. +- Use `backend_store_uri: sqlite` instead of `postgresql` during development to eliminate Cloud SQL entirely. diff --git a/docs/tutorials/gcp-cloud-run.md b/docs/tutorials/gcp-cloud-run.md index 9bae181..57033d5 100644 --- a/docs/tutorials/gcp-cloud-run.md +++ b/docs/tutorials/gcp-cloud-run.md @@ -9,6 +9,11 @@ Make sure `deployml doctor` passes before starting. You will need: - `gcloud` CLI, authenticated (`gcloud auth login` and `gcloud auth application-default login`) - Docker (running) - Terraform +- Infracost (optional, for cost estimates): + ```bash + brew install infracost + infracost auth login + ``` ## 1. Create a GCP Project @@ -71,6 +76,16 @@ stack: - `model_serving` — deploys a FastAPI container that pulls the latest registered model from MLflow on startup - `model_monitoring` — deploys Grafana connected to the Postgres `metrics` database +## 3.5 Estimate Costs (Optional) + +Check what the stack will cost before committing to a 20-minute deploy: + +```bash +deployml estimate +``` + +No GCP credentials required, no infrastructure touched. A standard Cloud Run stack runs around **$34/month** — almost entirely Cloud SQL. See [Cost Estimates](../features/costs.md). + ## 4. 
Build Docker Images Build and push the service images to Artifact Registry: @@ -152,6 +167,14 @@ You should see `offline_features`, `predictions`, `ground_truth`, and `drift_met With the stack running, follow the [example walkthrough](example.md) to train a model, register it, serve predictions through FastAPI, and visualize drift metrics in Grafana. +## 8.5 Check Running Costs + +```bash +deployml costs +``` + +Shows what your deployed stack is currently costing. Cloud SQL is the main driver at ~$34/month — everything else scales to zero. + ## 9. Teardown When you are done, destroy all infrastructure to avoid ongoing charges: diff --git a/src/deployml/cli/cli.py b/src/deployml/cli/cli.py index fe9dc14..056cf9d 100644 --- a/src/deployml/cli/cli.py +++ b/src/deployml/cli/cli.py @@ -41,6 +41,7 @@ ) from deployml.utils.infracost import ( check_infracost_available, + check_infracost_authenticated, run_infracost_analysis, format_cost_for_confirmation, ) @@ -529,6 +530,13 @@ def doctor( # Infracost if infracost_installed: typer.secho("\n Infracost is installed", fg=typer.colors.GREEN) + if check_infracost_authenticated(): + typer.secho(" Infracost is authenticated", fg=typer.colors.GREEN) + else: + typer.secho( + " Infracost not authenticated — run: infracost auth login", + fg=typer.colors.YELLOW, + ) else: typer.secho( "\nWARNING: Infracost not installed (optional)", fg=typer.colors.YELLOW @@ -778,6 +786,212 @@ def terraform( output_dir = Path(output_dir) +@cli.command() +def estimate( + config_path: Path = typer.Option( + Path("config.yaml"), "--config-path", "-c", help="Path to YAML config file" + ), +): + """Estimate monthly infrastructure cost without deploying anything.""" + import tempfile + import shutil as _shutil + import hashlib as _hashlib + + if not check_infracost_available(): + typer.echo(" Infracost is not installed.") + typer.echo(" Install: https://www.infracost.io/docs/#quick-start") + raise typer.Exit(code=1) + + if not 
check_infracost_authenticated(): + typer.echo(" Infracost is not authenticated.") + typer.echo(" Run: infracost auth login") + typer.echo(" Or set: export INFRACOST_API_KEY=") + raise typer.Exit(code=1) + + if not config_path.exists(): + typer.echo(f" Config file not found: {config_path}") + raise typer.Exit(code=1) + + config = yaml.safe_load(config_path.read_text()) + cloud = config["provider"]["name"] + project_id = config["provider"]["project_id"] + region = config["provider"]["region"] + deployment_type = config["deployment"]["type"] + stack = config.get("stack", []) + workspace_name = config.get("name") or "development" + + if deployment_type == "gke": + typer.echo(" Cost estimation is not supported for GKE deployments.") + raise typer.Exit(code=1) + + teardown_config = config.get("teardown", {}) + teardown_enabled = teardown_config.get("enabled", False) + + # Auto-resolve image URIs so templates render with valid image paths + _TOOL_IMAGE_NAMES = { + "mlflow": "mlflow", + "feast": "feast", + "fastapi": "fastapi", + "grafana": "grafana-container", + "wandb": "wandb", + } + _ar_base = f"{region}-docker.pkg.dev/{project_id}/mlops-images" + for stage in stack: + for stage_name, tool in stage.items(): + tool_name = tool.get("name", "") + params = tool.setdefault("params", {}) + existing_image = params.get("image", "") + if not existing_image or existing_image.startswith("gcr.io/"): + image_name = _TOOL_IMAGE_NAMES.get(tool_name) + if image_name: + params["image"] = f"{_ar_base}/{image_name}:latest" + if stage_name == "workflow_orchestration" and tool_name == "cron": + for job in params.get("jobs", []): + if not job.get("image") or job.get("image", "").startswith("gcr.io/"): + job_name = job.get("service_name", "") + job["image"] = f"{_ar_base}/{job_name}:latest" + + # Build bucket_configs without GCS calls — estimate doesn't need live cloud state + bucket_configs = [] + for stage in stack: + for stage_name, tool in stage.items(): + if tool.get("params", 
{}).get("artifact_bucket"): + bucket_configs.append({ + "stage": stage_name, + "tool": tool["name"], + "bucket_name": tool["params"]["artifact_bucket"], + "create": tool["params"].get("create_artifact_bucket", True), + "exists": False, + }) + create_artifact_bucket = any(c["create"] for c in bucket_configs) + + name_material = f"{workspace_name}:{project_id}".encode("utf-8") + name_hash = _hashlib.sha1(name_material).hexdigest()[:6] + + warning_threshold = config.get("cost_analysis", {}).get("warning_threshold", 100.0) + + temp_dir = Path(tempfile.mkdtemp()) + modules_dir = temp_dir / "modules" + modules_dir.mkdir() + + try: + copy_modules_to_workspace( + modules_dir, + stack=stack, + deployment_type=deployment_type, + cloud=cloud, + teardown_enabled=teardown_enabled, + ) + + env = Environment(loader=FileSystemLoader(TEMPLATE_DIR)) + if deployment_type == "cloud_run": + if any(tool.get("name") == "wandb" for stage in stack for tool in stage.values()): + main_template = env.get_template(f"{cloud}/{deployment_type}/wandb_main.tf.j2") + elif any(tool.get("name") == "mlflow" for stage in stack for tool in stage.values()): + main_template = env.get_template(f"{cloud}/{deployment_type}/mlflow_main.tf.j2") + else: + main_template = env.get_template(f"{cloud}/{deployment_type}/main.tf.j2") + else: + main_template = env.get_template(f"{cloud}/{deployment_type}/main.tf.j2") + + var_template = env.get_template(f"{cloud}/{deployment_type}/variables.tf.j2") + tfvars_template = env.get_template(f"{cloud}/{deployment_type}/terraform.tfvars.j2") + + render_kwargs = dict( + cloud=cloud, + stack=stack, + deployment_type=deployment_type, + create_artifact_bucket=create_artifact_bucket, + bucket_configs=bucket_configs, + project_id=project_id, + stack_name=workspace_name, + name_hash=name_hash, + teardown_config=None, + teardown_cron_schedule="", + teardown_scheduled_timestamp=0, + ) + if deployment_type == "cloud_vm": + render_kwargs["region"] = region + render_kwargs["zone"] = 
config["provider"].get("zone", f"{region}-a") + + main_tf = main_template.render(**render_kwargs) + variables_tf = var_template.render( + stack=stack, + cloud=cloud, + project_id=project_id, + stack_name=workspace_name, + name_hash=name_hash, + ) + tfvars_content = tfvars_template.render( + project_id=project_id, + region=region, + zone=config["provider"].get("zone", f"{region}-a"), + stack=stack, + cloud=cloud, + create_artifact_bucket=create_artifact_bucket, + stack_name=workspace_name, + name_hash=name_hash, + ) + + (temp_dir / "main.tf").write_text(main_tf) + (temp_dir / "variables.tf").write_text(variables_tf) + (temp_dir / "terraform.tfvars").write_text(tfvars_content) + + typer.echo(f" Estimating cost for: {workspace_name}") + analysis = run_infracost_analysis(temp_dir, warning_threshold, show_resources=True) + + if analysis is None: + typer.secho(" Cost estimate unavailable.", fg=typer.colors.YELLOW) + raise typer.Exit(code=1) + + except typer.Exit: + raise + except Exception as e: + typer.echo(f" Estimate failed: {e}") + raise typer.Exit(code=1) + finally: + _shutil.rmtree(temp_dir, ignore_errors=True) + + +@cli.command() +def costs( + config_path: Path = typer.Option( + Path("config.yaml"), "--config-path", "-c", help="Path to YAML config file" + ), +): + """Show the monthly cost of your currently running deployment.""" + if not check_infracost_available(): + typer.echo(" Infracost is not installed.") + typer.echo(" Install: https://www.infracost.io/docs/#quick-start") + raise typer.Exit(code=1) + + if not check_infracost_authenticated(): + typer.echo(" Infracost is not authenticated.") + typer.echo(" Run: infracost auth login") + raise typer.Exit(code=1) + + if not config_path.exists(): + typer.echo(f" Config file not found: {config_path}") + raise typer.Exit(code=1) + + config = yaml.safe_load(config_path.read_text()) + workspace_name = config.get("name") or "development" + terraform_dir = Path.cwd() / ".deployml" / workspace_name / "terraform" + + if 
not terraform_dir.exists(): + typer.echo(f" No deployment found at {terraform_dir}") + typer.echo(" Run 'deployml deploy' to deploy, or 'deployml estimate' for a pre-deploy cost prediction.") + raise typer.Exit(code=1) + + warning_threshold = config.get("cost_analysis", {}).get("warning_threshold", 100.0) + typer.echo(f" Checking costs for running deployment: {workspace_name}") + analysis = run_infracost_analysis(terraform_dir, warning_threshold, show_resources=True) + + if analysis is None: + typer.secho(" Cost check unavailable.", fg=typer.colors.YELLOW) + raise typer.Exit(code=1) + + @cli.command() def deploy( config_path: Path = typer.Option( @@ -1210,50 +1424,7 @@ def deploy( cost_analysis = None if cost_enabled: - usage_file_path = cost_config.get("usage_file") - usage_file = Path(usage_file_path) if usage_file_path else None - - # If no explicit usage file provided, generate one from high-level YAML values - if usage_file is None: - try: - bucket_amount = cost_config.get("bucket_amount") - cloudsql_amount = cost_config.get( - "cloudSQL_amount" - ) or cost_config.get("cloudsql_amount") - bigquery_amount = cost_config.get( - "bigQuery_amount" - ) or cost_config.get("bigquery_amount") - - resource_type_default_usage = {} - # Map high-level amounts to Infracost resource defaults - if bucket_amount is not None: - resource_type_default_usage["google_storage_bucket"] = { - "storage_gb": float(bucket_amount) - } - if cloudsql_amount is not None: - resource_type_default_usage[ - "google_sql_database_instance" - ] = {"storage_gb": float(cloudsql_amount)} - if bigquery_amount is not None: - resource_type_default_usage["google_bigquery_table"] = { - "storage_gb": float(bigquery_amount) - } - - if resource_type_default_usage: - usage_yaml = { - "version": "0.1", - "resource_type_default_usage": resource_type_default_usage, - } - usage_file = DEPLOYML_TERRAFORM_DIR / "infracost-usage.yml" - with open(usage_file, "w") as f: - yaml.safe_dump(usage_yaml, f, sort_keys=False) 
- except Exception: - # If usage-file generation fails, continue without it - usage_file = None - - cost_analysis = run_infracost_analysis( - DEPLOYML_TERRAFORM_DIR, warning_threshold, usage_file=usage_file - ) + cost_analysis = run_infracost_analysis(DEPLOYML_TERRAFORM_DIR, warning_threshold) # Format confirmation message with cost information if cost_analysis: diff --git a/src/deployml/diagnostics/doctor.py b/src/deployml/diagnostics/doctor.py index 791ff97..9c36a38 100644 --- a/src/deployml/diagnostics/doctor.py +++ b/src/deployml/diagnostics/doctor.py @@ -87,7 +87,8 @@ def run_all_checks(self) -> List[CheckResult]: # Development tools self._check_git() self._check_infracost() - + self._check_infracost_authenticated() + # Permissions and access self._check_docker_permissions() self._check_cloud_authentication() @@ -388,7 +389,28 @@ def _check_infracost(self): message="Installed but version check failed", required=False )) - + + def _check_infracost_authenticated(self): + """Check if infracost is authenticated via credentials file or API key env var""" + if not shutil.which("infracost"): + return # _check_infracost already reported not-installed + from deployml.utils.infracost import check_infracost_authenticated + if check_infracost_authenticated(): + self._add_result(CheckResult( + name="Infracost Auth", + status=CheckStatus.PASS, + message="Infracost is authenticated", + required=False + )) + else: + self._add_result(CheckResult( + name="Infracost Auth", + status=CheckStatus.WARNING, + message="Infracost not authenticated — cost analysis will be skipped", + fix_command="infracost auth login OR export INFRACOST_API_KEY=", + required=False + )) + def _check_docker_permissions(self): """Check Docker permissions""" try: diff --git a/src/deployml/utils/helpers.py b/src/deployml/utils/helpers.py index 97e8e39..e005ae3 100644 --- a/src/deployml/utils/helpers.py +++ b/src/deployml/utils/helpers.py @@ -102,6 +102,16 @@ def copy_modules_to_workspace( # BigQuery is 
always included — provides the mlops dataset and tables used_modules.add("bigquery") + # cloud_sql_postgres is needed when mlflow or feast uses a postgresql backend. + # This mirrors the template's flags.needs_postgres logic so the module source + # reference in the rendered main.tf can always be resolved. + for stage in stack: + for stage_name, tool in stage.items(): + if tool.get("name") in ("mlflow", "feast"): + backend = tool.get("params", {}).get("backend_store_uri", "") + if backend.startswith("postgresql"): + used_modules.add("cloud_sql_postgres") + # Only copy the modules that are being used, and only the specific deployment type for module_path in MODULE_TEMPLATES_DIR.iterdir(): if module_path.is_dir() and module_path.name in used_modules: diff --git a/src/deployml/utils/infracost.py b/src/deployml/utils/infracost.py index 5418ef2..7bb675a 100644 --- a/src/deployml/utils/infracost.py +++ b/src/deployml/utils/infracost.py @@ -1,52 +1,50 @@ import json +import os import subprocess import typer from pathlib import Path -from typing import Dict, Optional, List -from dataclasses import dataclass - - -@dataclass -class CostComponent: - """Represents a single cost component for a resource""" - - name: str - unit: str - monthly_cost: float - hourly_cost: float - usage_based: bool = False - - -@dataclass -class ResourceCost: - """Represents cost information for a single resource""" - - name: str - resource_type: str - monthly_cost: float - hourly_cost: float - components: List[CostComponent] +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass, field + + +_RESOURCE_TYPE_LABELS = { + "google_sql_database_instance": "Cloud SQL", + "google_cloud_run_service": "Cloud Run", + "google_cloud_run_v2_service": "Cloud Run", + "google_storage_bucket": "GCS Bucket", + "google_bigquery_dataset": "BigQuery", + "google_bigquery_table": "BigQuery", + "google_compute_instance": "Compute Engine VM", + "google_container_cluster": "GKE Cluster", + 
"google_redis_instance": "Cloud Memorystore", + "google_pubsub_topic": "Pub/Sub", + "google_pubsub_subscription": "Pub/Sub", +} + + +def _resource_label(address: str) -> str: + """Extract a human-readable label from a terraform resource address.""" + # address looks like: module.cloud_sql_postgres.google_sql_database_instance.postgres + parts = address.split(".") + for part in parts: + if part in _RESOURCE_TYPE_LABELS: + return _RESOURCE_TYPE_LABELS[part] + if part.startswith("google_"): + return part # fall back to raw type name + return address @dataclass class CostAnalysis: - """Represents the complete cost analysis results""" - total_monthly_cost: float - total_hourly_cost: float currency: str - resources: List[ResourceCost] - detected_resources: int - supported_resources: int + resources: int + costed_resources: int + free_resources: int + resource_costs: List[Tuple[str, float]] = field(default_factory=list) def check_infracost_available() -> bool: - """ - Check if infracost CLI is available in the system PATH. - - Returns: - bool: True if infracost is available, False otherwise. - """ try: result = subprocess.run( ["infracost", "--version"], @@ -55,269 +53,203 @@ def check_infracost_available() -> bool: timeout=10, ) return result.returncode == 0 - except ( - subprocess.TimeoutExpired, - FileNotFoundError, - subprocess.SubprocessError, - ): + except (subprocess.TimeoutExpired, FileNotFoundError, subprocess.SubprocessError): return False -def run_infracost_breakdown(terraform_dir: Path, usage_file: Optional[Path] = None) -> Optional[Dict]: - """ - Run infracost breakdown analysis on the terraform directory. 
+def check_infracost_authenticated() -> bool: + if os.environ.get("INFRACOST_API_KEY"): + return True + # v2 on macOS stores token here; v1 / Linux uses ~/.config/infracost/credentials.yml + macos_token = Path.home() / "Library" / "Application Support" / "infracost" / "token.json" + linux_creds = Path.home() / ".config" / "infracost" / "credentials.yml" + return macos_token.exists() or linux_creds.exists() - Args: - terraform_dir: Path to the terraform directory - Returns: - Dict containing the infracost JSON output, or None if failed +def run_infracost_scan(terraform_dir: Path) -> Optional[Dict]: """ - if not check_infracost_available(): - return None + Run infracost v2 scan and return parsed JSON. + Uses `infracost scan --json`. Falls back to flag-before-subcommand + form if needed, since --json is a global flag in v2. + Does not require terraform init — infracost v2 parses HCL directly. + """ + cmds = [ + ["infracost", "scan", str(terraform_dir), "--json"], + ["infracost", "--json", "scan", str(terraform_dir)], + ] + for cmd in cmds: + try: + result = subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=120, + ) + if result.returncode == 0: + return json.loads(result.stdout) + if "unknown flag" not in result.stderr: + typer.echo(f"Infracost scan failed: {result.stderr}") + return None + # unknown flag → try next form + except subprocess.TimeoutExpired: + typer.echo("Infracost scan timed out") + return None + except json.JSONDecodeError: + typer.echo("Failed to parse infracost JSON output") + return None + except (FileNotFoundError, subprocess.SubprocessError) as e: + typer.echo(f"Infracost error: {e}") + return None + typer.echo("Infracost scan failed: could not find working --json flag form") + return None - try: - # Run infracost breakdown and capture JSON output - cmd = [ - "infracost", - "breakdown", - "--path", - str(terraform_dir), - "--format", - "json", - ] - if usage_file is not None: - cmd.extend(["--usage-file", str(usage_file)]) +def 
fetch_resource_costs() -> List[Tuple[str, float]]: + """ + Run `infracost inspect --group-by resource --json` against the last cached + scan and return a list of (label, monthly_cost) for non-zero cost resources. + """ + try: result = subprocess.run( - cmd, - cwd=terraform_dir, + ["infracost", "inspect", "--group-by", "resource", "--json"], capture_output=True, text=True, - timeout=60, + timeout=30, ) - - if result.returncode == 0: - return json.loads(result.stdout) - else: - typer.echo(f"⚠️ Infracost analysis failed: {result.stderr}") - return None - - except subprocess.TimeoutExpired: - typer.echo("⚠️ Infracost analysis timed out") - return None - except json.JSONDecodeError: - typer.echo("⚠️ Failed to parse infracost output") - return None - except Exception as e: - typer.echo(f"⚠️ Infracost error: {e}") - return None - - -def parse_infracost_data(data: Dict) -> Optional[CostAnalysis]: + if result.returncode != 0: + return [] + rows = json.loads(result.stdout) + costs = [] + seen_labels = set() + for row in rows: + cost = float(row.get("cost", 0) or 0) + if cost <= 0: + continue + address = row.get("columns", {}).get("resource", "") + label = _resource_label(address) + if label in seen_labels: + # Accumulate duplicate resource types (e.g. multiple Cloud Run services) + for i, (lbl, c) in enumerate(costs): + if lbl == label: + costs[i] = (lbl, c + cost) + break + else: + seen_labels.add(label) + costs.append((label, cost)) + return sorted(costs, key=lambda x: x[1], reverse=True) + except Exception: + return [] + + +def parse_infracost_scan_data(data: Dict) -> Optional[CostAnalysis]: """ - Parse infracost JSON data into structured cost analysis. - - Args: - data: Raw infracost JSON data + Parse v2 infracost JSON into a CostAnalysis. 
- Returns: - CostAnalysis object or None if parsing failed + Two schemas exist depending on how infracost is invoked: + - `infracost scan --json`: fields under a top-level "summary" key, + cost field is "total_monthly_cost" + - `infracost inspect --json`: fields at top level, cost field is "monthly_cost" """ try: - resources = [] - - for project in data.get("projects", []): - breakdown = project.get("breakdown", {}) - - for resource_data in breakdown.get("resources", []): - components = [] - - # Parse cost components - for comp_data in resource_data.get("costComponents", []): - monthly_cost = comp_data.get("monthlyCost") - hourly_cost = comp_data.get("hourlyCost") - - # Skip components without cost data - if monthly_cost is None and hourly_cost is None: - continue - - component = CostComponent( - name=comp_data.get("name", ""), - unit=comp_data.get("unit", ""), - monthly_cost=float(monthly_cost or 0), - hourly_cost=float(hourly_cost or 0), - usage_based=comp_data.get("usageBased", False), - ) - components.append(component) - - resource = ResourceCost( - name=resource_data.get("name", ""), - resource_type=resource_data.get("resourceType", ""), - monthly_cost=float(resource_data.get("monthlyCost", 0)), - hourly_cost=float(resource_data.get("hourlyCost", 0)), - components=components, - ) - resources.append(resource) - + if "summary" in data: + # infracost scan --json format + summary = data["summary"] + monthly_cost_str = summary.get("total_monthly_cost", "0") or "0" + return CostAnalysis( + total_monthly_cost=float(monthly_cost_str), + currency=data.get("currency", "USD"), + resources=int(summary.get("resources", 0)), + costed_resources=int(summary.get("costed_resources", 0)), + free_resources=int(summary.get("free_resources", 0)), + ) + # infracost inspect --json format + monthly_cost_str = data.get("monthly_cost", "0") or "0" return CostAnalysis( - total_monthly_cost=float(data.get("totalMonthlyCost", 0)), - total_hourly_cost=float(data.get("totalHourlyCost", 0)), 
+ total_monthly_cost=float(monthly_cost_str), currency=data.get("currency", "USD"), - resources=resources, - detected_resources=data.get("summary", {}).get( - "totalDetectedResources", 0 - ), - supported_resources=data.get("summary", {}).get( - "totalSupportedResources", 0 - ), + resources=int(data.get("resources", 0)), + costed_resources=int(data.get("costed_resources", 0)), + free_resources=int(data.get("free_resources", 0)), ) - - except Exception as e: - typer.echo(f"⚠️ Failed to parse cost data: {e}") + except (ValueError, TypeError) as e: + typer.echo(f"Failed to parse cost data: {e}") return None def display_cost_breakdown( - analysis: CostAnalysis, warning_threshold: float = 100.0 + analysis: CostAnalysis, + warning_threshold: float = 100.0, + show_resources: bool = False, ) -> None: - """ - Display a user-friendly cost breakdown. - - Args: - analysis: CostAnalysis object with cost data - warning_threshold: Monthly cost threshold for warnings (default: $100) - """ typer.echo("\n" + "=" * 60) - typer.secho("💰 COST ANALYSIS", fg=typer.colors.BRIGHT_CYAN, bold=True) + typer.secho("COST ANALYSIS", fg=typer.colors.BRIGHT_CYAN, bold=True) typer.echo("=" * 60) - # Overall costs monthly_cost = analysis.total_monthly_cost + color = typer.colors.BRIGHT_GREEN if monthly_cost < warning_threshold else typer.colors.BRIGHT_YELLOW typer.secho( f"Monthly Cost: ${monthly_cost:.2f} {analysis.currency}", - fg=( - typer.colors.BRIGHT_GREEN - if monthly_cost < warning_threshold - else typer.colors.BRIGHT_YELLOW - ), + fg=color, bold=True, ) typer.echo( - f"Hourly Cost: ${analysis.total_hourly_cost:.4f} {analysis.currency}" - ) - typer.echo( - f"Resources: {analysis.supported_resources} supported, {analysis.detected_resources} total" + f"Resources: {analysis.costed_resources} costed, " + f"{analysis.free_resources} free, {analysis.resources} total" ) - # Warning for high costs + if show_resources and analysis.resource_costs: + typer.echo("\nWhat costs money:") + for label, cost 
in analysis.resource_costs: + typer.secho(f" ${cost:.2f} {label}", fg=typer.colors.BRIGHT_BLUE) + typer.echo( + "\nNote: Cloud Run, BigQuery, and GCS show $0 at idle — they scale to\n" + "zero and charge only for actual usage." + ) + if monthly_cost > warning_threshold: typer.echo() typer.secho( - f"⚠️ WARNING: Monthly cost exceeds ${warning_threshold:.0f} threshold!", + f"WARNING: Monthly cost exceeds ${warning_threshold:.0f} threshold!", fg=typer.colors.BRIGHT_RED, bold=True, ) - - # Resource breakdown - if analysis.resources: - typer.echo("\n📋 Resource Breakdown:") - typer.echo("-" * 40) - - # Sort resources by monthly cost (highest first) - sorted_resources = sorted( - analysis.resources, key=lambda r: r.monthly_cost, reverse=True - ) - - for resource in sorted_resources: - if resource.monthly_cost > 0: - typer.echo(f"\n• {resource.name}") - typer.echo(f" Type: {resource.resource_type}") - typer.secho( - f" Monthly Cost: ${resource.monthly_cost:.2f}", - fg=typer.colors.BRIGHT_BLUE, - ) - - # Show top cost components - if resource.components: - for component in resource.components[ - :3 - ]: # Show top 3 components - if component.monthly_cost > 0: - usage_note = ( - " (usage-based)" - if component.usage_based - else "" - ) - typer.echo( - f" └─ {component.name}: ${component.monthly_cost:.2f}{usage_note}" - ) - - # Usage-based resources note - usage_based_resources = [ - r - for r in analysis.resources - if any(c.usage_based for c in r.components) - ] - - if usage_based_resources: - typer.echo("\n📊 Note: Some resources have usage-based pricing") - typer.echo(" Actual costs may vary based on usage patterns") - typer.echo() def format_cost_for_confirmation(monthly_cost: float, currency: str) -> str: - """ - Format cost information for the deployment confirmation prompt. 
-
-    Args:
-        monthly_cost: Monthly cost in the specified currency
-        currency: Currency code (e.g., 'USD')
-
-    Returns:
-        Formatted string for confirmation prompt
-    """
     if monthly_cost > 0:
-        return f"💰 Monthly cost: ~${monthly_cost:.2f} {currency}"
+        return f"Monthly cost: ~${monthly_cost:.2f} {currency}"
     else:
-        return "💰 Monthly cost: Variable (usage-based pricing)"
+        return "Monthly cost: Variable (usage-based pricing)"
 
 
 def run_infracost_analysis(
-    terraform_dir: Path, warning_threshold: float = 100.0, usage_file: Optional[Path] = None
+    terraform_dir: Path,
+    warning_threshold: float = 100.0,
+    show_resources: bool = False,
 ) -> Optional[CostAnalysis]:
-    """
-    Run complete infracost analysis workflow.
-
-    Args:
-        terraform_dir: Path to terraform directory
-        warning_threshold: Cost warning threshold
-
-    Returns:
-        CostAnalysis object or None if analysis failed
-    """
-    # Check if infracost is available
     if not check_infracost_available():
-        typer.echo(
-            "💡 Tip: Install infracost CLI for cost analysis before deployment"
-        )
+        typer.echo("Tip: Install infracost CLI for cost analysis before deployment")
         typer.echo(" Visit: https://www.infracost.io/docs/#quick-start")
         return None
 
-    typer.echo("💰 Running cost analysis...")
+    if not check_infracost_authenticated():
+        typer.echo("Tip: Authenticate infracost to enable cost analysis")
+        typer.echo(" Run: infracost auth login")
+        typer.echo(" Or set: export INFRACOST_API_KEY=")
+        return None
 
-    # Run infracost breakdown
-    raw_data = run_infracost_breakdown(terraform_dir, usage_file=usage_file)
-    if not raw_data:
+    typer.echo("Running cost analysis...")
+    raw_data = run_infracost_scan(terraform_dir)
+    if raw_data is None:
         return None
 
-    # Parse the data
-    analysis = parse_infracost_data(raw_data)
-    if not analysis:
+    analysis = parse_infracost_scan_data(raw_data)
+    if analysis is None:
         return None
 
-    # Display the breakdown
-    display_cost_breakdown(analysis, warning_threshold)
+    if show_resources:
+        analysis.resource_costs = fetch_resource_costs()
+    display_cost_breakdown(analysis, warning_threshold, show_resources=show_resources)
 
     return analysis
diff --git a/tests/test_infracost.py b/tests/test_infracost.py
new file mode 100644
index 0000000..26d9227
--- /dev/null
+++ b/tests/test_infracost.py
@@ -0,0 +1,169 @@
+"""Unit tests for src/deployml/utils/infracost.py (v2 rewrite).
+
+Covers:
+  1. check_infracost_available — OS-boundary mock (subprocess)
+  2. check_infracost_authenticated — file-system + env var mocks
+  3. parse_infracost_scan_data — pure parsing, no I/O
+  4. format_cost_for_confirmation — pure function
+"""
+
+import os
+from pathlib import Path
+from types import SimpleNamespace
+from unittest.mock import patch, patch as _patch
+
+from deployml.utils.infracost import (
+    CostAnalysis,
+    check_infracost_available,
+    check_infracost_authenticated,
+    format_cost_for_confirmation,
+    parse_infracost_scan_data,
+)
+
+
+# ---------------------------------------------------------------------------
+# check_infracost_available
+# ---------------------------------------------------------------------------
+
+def test_check_infracost_available_returns_true():
+    with patch("deployml.utils.infracost.subprocess.run") as mock_run:
+        mock_run.return_value = SimpleNamespace(returncode=0, stdout="infracost v2.4.2", stderr="")
+        assert check_infracost_available() is True
+
+
+def test_check_infracost_available_returns_false_when_not_installed():
+    with patch("deployml.utils.infracost.subprocess.run", side_effect=FileNotFoundError):
+        assert check_infracost_available() is False
+
+
+def test_check_infracost_available_returns_false_on_nonzero_returncode():
+    with patch("deployml.utils.infracost.subprocess.run") as mock_run:
+        mock_run.return_value = SimpleNamespace(returncode=1, stdout="", stderr="error")
+        assert check_infracost_available() is False
+
+
+# ---------------------------------------------------------------------------
+# check_infracost_authenticated
+# ---------------------------------------------------------------------------
+
+def test_check_infracost_authenticated_returns_true_with_credentials_file(tmp_path):
+    creds_file = tmp_path / ".config" / "infracost" / "credentials.yml"
+    creds_file.parent.mkdir(parents=True)
+    creds_file.write_text("api_key: test123\n")
+    with patch("deployml.utils.infracost.Path.home", return_value=tmp_path):
+        # INFRACOST_API_KEY must be absent so we exercise the file-path branch
+        env_without_key = {k: v for k, v in os.environ.items() if k != "INFRACOST_API_KEY"}
+        with patch.dict(os.environ, env_without_key, clear=True):
+            assert check_infracost_authenticated() is True
+
+
+def test_check_infracost_authenticated_returns_true_with_env_var(tmp_path):
+    with patch("deployml.utils.infracost.Path.home", return_value=tmp_path):
+        with patch.dict(os.environ, {"INFRACOST_API_KEY": "ic-test-key-abc"}):
+            assert check_infracost_authenticated() is True
+
+
+def test_check_infracost_authenticated_returns_false_when_neither(tmp_path):
+    # tmp_path has no credentials.yml and INFRACOST_API_KEY is cleared
+    env_without_key = {k: v for k, v in os.environ.items() if k != "INFRACOST_API_KEY"}
+    with patch.dict(os.environ, env_without_key, clear=True):
+        with patch("deployml.utils.infracost.Path.home", return_value=tmp_path):
+            assert check_infracost_authenticated() is False
+
+
+# ---------------------------------------------------------------------------
+# parse_infracost_scan_data (pure parsing, no subprocess)
+# ---------------------------------------------------------------------------
+
+# infracost inspect --json format (fields at top level)
+_INSPECT_JSON = {
+    "projects": 1,
+    "resources": 6,
+    "costed_resources": 1,
+    "free_resources": 5,
+    "monthly_cost": "26.4591683",
+    "currency": "USD",
+    "project_details": [],
+    "failing_policy_list": [],
+}
+
+# infracost scan --json format (fields nested under "summary")
+_SCAN_JSON = {
+    "currency": "USD",
+    "summary": {
+        "projects": 1,
+        "resources": 71,
+        "costed_resources": 10,
+        "free_resources": 61,
+        "total_monthly_cost": "34.55",
+    },
+    "projects": [],
+}
+
+
+def test_parse_infracost_scan_data_inspect_format_extracts_all_fields():
+    result = parse_infracost_scan_data(_INSPECT_JSON)
+    assert result is not None
+    assert abs(result.total_monthly_cost - 26.4591683) < 0.0001
+    assert result.currency == "USD"
+    assert result.resources == 6
+    assert result.costed_resources == 1
+    assert result.free_resources == 5
+
+
+def test_parse_infracost_scan_data_scan_format_extracts_all_fields():
+    # infracost scan --json wraps fields under "summary" with "total_monthly_cost"
+    result = parse_infracost_scan_data(_SCAN_JSON)
+    assert result is not None
+    assert abs(result.total_monthly_cost - 34.55) < 0.001
+    assert result.currency == "USD"
+    assert result.resources == 71
+    assert result.costed_resources == 10
+    assert result.free_resources == 61
+
+
+def test_parse_infracost_scan_data_handles_empty_dict():
+    result = parse_infracost_scan_data({})
+    assert result is not None
+    assert result.total_monthly_cost == 0.0
+    assert result.currency == "USD"
+    assert result.resources == 0
+    assert result.costed_resources == 0
+    assert result.free_resources == 0
+
+
+def test_parse_infracost_scan_data_handles_null_monthly_cost():
+    data = {**_INSPECT_JSON, "monthly_cost": None}
+    result = parse_infracost_scan_data(data)
+    assert result is not None
+    assert result.total_monthly_cost == 0.0
+
+
+def test_parse_infracost_scan_data_handles_invalid_cost_string():
+    # float("not-a-number") raises ValueError → function should return None
+    data = {**_INSPECT_JSON, "monthly_cost": "not-a-number"}
+    result = parse_infracost_scan_data(data)
+    assert result is None
+
+
+def test_parse_infracost_scan_data_handles_zero_cost():
+    data = {**_INSPECT_JSON, "monthly_cost": "0", "costed_resources": 0, "free_resources": 6}
+    result = parse_infracost_scan_data(data)
+    assert result is not None
+    assert result.total_monthly_cost == 0.0
+    assert result.costed_resources == 0
+
+
+# ---------------------------------------------------------------------------
+# format_cost_for_confirmation (pure function)
+# ---------------------------------------------------------------------------
+
+def test_format_cost_for_confirmation_nonzero_cost():
+    result = format_cost_for_confirmation(26.46, "USD")
+    assert "$26.46" in result
+    assert "USD" in result
+
+
+def test_format_cost_for_confirmation_zero_cost():
+    result = format_cost_for_confirmation(0.0, "USD")
+    assert "Variable" in result or "usage-based" in result.lower()
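
The tests above pin down the behaviour expected of `parse_infracost_scan_data` without showing its body. As a reading aid, here is a minimal sketch of a parser that would satisfy those assertions. It is an illustration, not the implementation in `src/deployml/utils/infracost.py`: the names `CostAnalysisSketch` and `parse_scan_data_sketch` are hypothetical, and the field handling is inferred only from the two JSON fixtures in the test file.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class CostAnalysisSketch:
    """Hypothetical stand-in for the real CostAnalysis class."""

    total_monthly_cost: float
    currency: str
    resources: int
    costed_resources: int
    free_resources: int


def parse_scan_data_sketch(data: Dict[str, Any]) -> Optional[CostAnalysisSketch]:
    """Parse either fixture shape from the tests (assumed, not the real code)."""
    # The scan-style fixture nests the counts under "summary";
    # the inspect-style fixture keeps them at the top level.
    summary = data.get("summary")
    source = summary if isinstance(summary, dict) else data

    # The cost key differs between the two shapes.
    raw_cost = source.get("total_monthly_cost")
    if raw_cost is None:
        raw_cost = source.get("monthly_cost")

    try:
        # A missing or null cost is treated as zero; a malformed one aborts.
        cost = float(raw_cost) if raw_cost is not None else 0.0
    except (TypeError, ValueError):
        return None

    return CostAnalysisSketch(
        total_monthly_cost=cost,
        currency=data.get("currency") or "USD",
        resources=int(source.get("resources") or 0),
        costed_resources=int(source.get("costed_resources") or 0),
        free_resources=int(source.get("free_resources") or 0),
    )
```

Returning `None` only for a malformed cost string, while mapping a missing or null cost to `0.0`, mirrors the distinction the tests draw between "no cost reported" and "unparseable output".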