Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions python/antigravity/harness_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import asyncio
import importlib.util
import logging
import os
import sys
import grpc
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
Expand Down Expand Up @@ -94,6 +95,37 @@ def hydrate_ax_history_to_steps(historical_messages) -> list[Step]:
steps.append(step)
return steps

def _has_credentials(config: AgentConfig | None) -> bool:
"""Checks if Gemini credentials are set either in env or config."""
# Check environment variables
has_api_key = os.environ.get("GEMINI_API_KEY") or os.environ.get("GOOGLE_API_KEY")
use_vertex = (
os.environ.get("GOOGLE_GENAI_USE_VERTEXAI", "").lower() in ("true", "1") or
os.environ.get("GOOGLE_GENAI_USE_ENTERPRISE", "").lower() in ("true", "1")
)
if has_api_key or use_vertex:
return True

# Check configuration
if config:
# Check nested gemini_config
gemini_config = getattr(config, "gemini_config", None)
if gemini_config:
# 1. Direct configuration
if getattr(gemini_config, "api_key", None) or getattr(gemini_config, "vertex", False):
return True
# 2. Per-model configuration
models = getattr(gemini_config, "models", None)
default_model = getattr(models, "default", None) if models else None
if default_model and getattr(default_model, "api_key", None):
return True

# Check top-level config shorthands
if getattr(config, "api_key", None) or getattr(config, "vertex", False):
return True

return False

class AntigravityHarnessServiceServicer(ax_pb2_grpc.HarnessServiceServicer):
"""Implements the ax.HarnessService protocol over gRPC."""

Expand Down Expand Up @@ -138,6 +170,20 @@ async def _run_turn(self, request):
)
return

# Check credentials
if not _has_credentials(loaded_config):
yield ax_pb2.HarnessResponse(
conversation_id=request.conversation_id,
end=ax_pb2.HarnessEnd(
state=ax_pb2.STATE_FAILED,
error_message=(
"No Gemini credentials configured. Please set the GEMINI_API_KEY environment variable "
"(AI Studio) or GOOGLE_GENAI_USE_VERTEXAI=True (Vertex AI) before starting the harness server."
)
)
)
return

try:
async with Agent(loaded_config) as agent:
conversation = agent.conversation
Expand Down
120 changes: 120 additions & 0 deletions python/antigravity/harness_server_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

@pytest.fixture
def mock_config(monkeypatch):
monkeypatch.setenv("GEMINI_API_KEY", "mock-api-key")
cfg = LocalAgentConfig(system_instructions="Test instructions")
import python.antigravity.harness_server as hs
hs.loaded_config = cfg
Expand Down Expand Up @@ -115,3 +116,122 @@ async def _run():
await server.stop(0)

asyncio.run(_run())


def test_grpc_connect_missing_credentials(mock_config, monkeypatch):
monkeypatch.delenv("GEMINI_API_KEY", raising=False)
monkeypatch.delenv("GOOGLE_API_KEY", raising=False)
monkeypatch.delenv("GOOGLE_GENAI_USE_VERTEXAI", raising=False)
monkeypatch.delenv("GOOGLE_GENAI_USE_ENTERPRISE", raising=False)

async def _run():
server = grpc.aio.server()
servicer = AntigravityHarnessServiceServicer()
ax_pb2_grpc.add_HarnessServiceServicer_to_server(servicer, server)
port = server.add_insecure_port("localhost:0")
await server.start()

addr = f"localhost:{port}"
async with grpc.aio.insecure_channel(addr) as channel:
stub = ax_pb2_grpc.HarnessServiceStub(channel)

start_payload = ax_pb2.HarnessStart(
messages=[
ax_pb2.Message(role="user", content=content_pb2.Content(text=content_pb2.TextContent(text="Hi")))
]
)
req = ax_pb2.HarnessRequest(
conversation_id="conv-test-credentials",
harness_id="antigravity",
start=start_payload
)

async def request_iter():
yield req

responses = []
async for resp in stub.Connect(request_iter()):
responses.append(resp)

assert len(responses) == 1
assert responses[0].WhichOneof('type') == 'end'
assert responses[0].end.state == ax_pb2.STATE_FAILED
assert "No Gemini credentials configured" in responses[0].end.error_message
assert "GEMINI_API_KEY" in responses[0].end.error_message

await server.stop(0)

asyncio.run(_run())


def test_grpc_connect_programmatic_credentials(monkeypatch):
monkeypatch.delenv("GEMINI_API_KEY", raising=False)
monkeypatch.delenv("GOOGLE_API_KEY", raising=False)
monkeypatch.delenv("GOOGLE_GENAI_USE_VERTEXAI", raising=False)
monkeypatch.delenv("GOOGLE_GENAI_USE_ENTERPRISE", raising=False)

# Config with API key programmatically set
cfg = LocalAgentConfig(system_instructions="Test instructions", api_key="mock-config-api-key")
import python.antigravity.harness_server as hs
hs.loaded_config = cfg

async def _run():
server = grpc.aio.server()
servicer = AntigravityHarnessServiceServicer()
ax_pb2_grpc.add_HarnessServiceServicer_to_server(servicer, server)
port = server.add_insecure_port("localhost:0")
await server.start()

addr = f"localhost:{port}"
async with grpc.aio.insecure_channel(addr) as channel:
stub = ax_pb2_grpc.HarnessServiceStub(channel)

# Mock Agent so we can test programmatic config logic passes
class MockConversation:
def __init__(self):
self._steps = []
async def chat(self, text):
class MockResponse:
def __init__(self):
self.chunks = self._chunk_generator()
async def _chunk_generator(self):
from google.antigravity.types import Text
yield Text(text="Passed check", step_index=0)
return MockResponse()

class MockAgent:
def __init__(self, config):
self.conversation = MockConversation()
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
pass
monkeypatch.setattr("python.antigravity.harness_server.Agent", MockAgent)

start_payload = ax_pb2.HarnessStart(
messages=[
ax_pb2.Message(role="user", content=content_pb2.Content(text=content_pb2.TextContent(text="Hi")))
]
)
req = ax_pb2.HarnessRequest(
conversation_id="conv-test-prog",
harness_id="antigravity",
start=start_payload
)

async def request_iter():
yield req

responses = []
async for resp in stub.Connect(request_iter()):
responses.append(resp)

assert len(responses) == 2 # Text + End
assert responses[0].outputs.messages[0].content.text.text == "Passed check"
assert responses[1].end.state == ax_pb2.STATE_COMPLETED

await server.stop(0)

asyncio.run(_run())


Loading