From 0b561b39e31cce533e71f161083edc24d52a6258 Mon Sep 17 00:00:00 2001 From: Yashraj Shukla Date: Sat, 6 Jun 2026 13:00:54 +0000 Subject: [PATCH] feat(go-adk): add compaction support to Go ADK runtime Implements post-invocation event history compaction for the Go ADK runtime. The upstream Go ADK v1.1.0 does not ship this so it is implemented within kagent via a new compaction package Closes #1783 Signed-off-by: Yashraj Shukla --- go/adk/cmd/main.go | 8 +- go/adk/pkg/a2a/executor.go | 15 + go/adk/pkg/compaction/compaction.go | 317 ++++++++++++++++++ go/adk/pkg/compaction/compaction_test.go | 143 ++++++++ go/adk/pkg/runner/adapter.go | 18 +- go/adk/pkg/session/local_session.go | 12 + .../controller/reconciler/reconciler.go | 5 +- 7 files changed, 507 insertions(+), 11 deletions(-) create mode 100644 go/adk/pkg/compaction/compaction.go create mode 100644 go/adk/pkg/compaction/compaction_test.go diff --git a/go/adk/cmd/main.go b/go/adk/cmd/main.go index ad0d28d804..2d4f79f13d 100644 --- a/go/adk/cmd/main.go +++ b/go/adk/cmd/main.go @@ -173,11 +173,16 @@ func main() { logger.Info("Memory service enabled", "appName", appName) } - runnerConfig, subagentSessionIDs, err := runnerpkg.CreateRunnerConfig(ctx, agentConfig, sessionService, appName, memoryService) + runnerConfig, subagentSessionIDs, compactionCfg, err := runnerpkg.CreateRunnerConfig(ctx, agentConfig, sessionService, appName, memoryService) if err != nil { logger.Error(err, "Failed to create Google ADK Runner config") os.Exit(1) } + if compactionCfg != nil { + logger.Info("Compaction enabled", + "interval", compactionCfg.CompactionInterval, + "overlapSize", compactionCfg.OverlapSize) + } stream := agentConfig.GetStream() executor := a2a.NewKAgentExecutor(a2a.KAgentExecutorConfig{ @@ -187,6 +192,7 @@ func main() { Stream: stream, AppName: appName, Logger: logger, + CompactionConfig: compactionCfg, }) // Build the agent card. diff --git a/go/adk/pkg/a2a/executor.go b/go/adk/pkg/a2a/executor.go index 8ae11d3d69..87d5b1dc43 100644 --- a/go/adk/pkg/a2a/executor.go +++ b/go/adk/pkg/a2a/executor.go @@ -12,6 +12,7 @@ import ( "github.com/a2aproject/a2a-go/a2asrv/eventqueue" "github.com/go-logr/logr" "github.com/kagent-dev/kagent/go/adk/pkg/auth" + "github.com/kagent-dev/kagent/go/adk/pkg/compaction" "github.com/kagent-dev/kagent/go/adk/pkg/models" "github.com/kagent-dev/kagent/go/adk/pkg/session" "github.com/kagent-dev/kagent/go/adk/pkg/skills" @@ -37,6 +38,7 @@ type KAgentExecutorConfig struct { AppName string SkillsDirectory string Logger logr.Logger + CompactionConfig *compaction.Config } // KAgentExecutor implements a2asrv.AgentExecutor @@ -48,6 +50,7 @@ type KAgentExecutor struct { appName string skillsDirectory string logger logr.Logger + compactor *compaction.Compactor } var _ a2asrv.AgentExecutor = (*KAgentExecutor)(nil) @@ -69,6 +72,7 @@ func NewKAgentExecutor(cfg KAgentExecutorConfig) *KAgentExecutor { appName: cfg.AppName, skillsDirectory: skillsDir, logger: cfg.Logger.WithName("kagent-executor"), + compactor: compaction.New(cfg.CompactionConfig, cfg.Logger), } } @@ -355,6 +359,17 @@ func (e *KAgentExecutor) Execute(ctx context.Context, reqCtx *a2asrv.RequestCont } } + // 10b. Post-invocation compaction (no-op when not configured). + // TODO: avoid the extra GetSession call by threading the session object through Execute. + if e.compactor != nil && e.sessionService != nil { + liveSess, sessErr := e.sessionService.GetSession(ctx, e.appName, userID, sessionID) + if sessErr == nil && liveSess != nil { + if compactErr := e.compactor.MaybeCompact(ctx, liveSess, e.sessionService, 0); compactErr != nil { + e.logger.Error(compactErr, "Post-invocation compaction failed (non-fatal)") + } + } + } + // 11. Emit final event. finalMeta := maps.Clone(baseMeta) if invocationID != "" { diff --git a/go/adk/pkg/compaction/compaction.go b/go/adk/pkg/compaction/compaction.go new file mode 100644 index 0000000000..6d3ba5b37f --- /dev/null +++ b/go/adk/pkg/compaction/compaction.go @@ -0,0 +1,317 @@ +package compaction + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/go-logr/logr" + "github.com/google/uuid" + "google.golang.org/adk/model" + + adkapiconfig "github.com/kagent-dev/kagent/go/api/adk" + adksession "google.golang.org/adk/session" + "google.golang.org/genai" +) + +const ( + summaryEventAuthor = "compaction_summarizer" + defaultCompactionInterval = 5 + defaultOverlapSize = 2 + // noInvocationSentinel is a sentinel for events with no InvocationID. + // The NUL prefix makes collision with real IDs practically impossible. + noInvocationSentinel = "\x00no_invocation" + defaultSummaryPrompt = "You are a conversation compactor. Summarise the following agent conversation history concisely, preserving all key facts, decisions, tool calls, and outcomes. The summary will replace these events in the agent context window.\n\nConversation history:\n{{events}}\n\nProvide a concise summary:" +) + +// Config holds compaction settings for an agent. +type Config struct { + CompactionInterval int + OverlapSize int + TokenThreshold int + EventRetentionSize int + SummarizerLLM model.LLM + PromptTemplate string +} + +// Compactor performs post-invocation event history compaction on a session. +// A nil Compactor is valid; all methods are no-ops. +type Compactor struct { + cfg *Config + logger logr.Logger +} + +// New returns a Compactor for cfg, or nil when cfg is nil. +func New(cfg *Config, logger logr.Logger) *Compactor { + if cfg == nil { + return nil + } + return &Compactor{cfg: cfg, logger: logger.WithName("compaction")} +} + +// MaybeCompact checks whether compaction should run after the latest invocation +// and performs it if so. Safe to call after every agent run. +func (c *Compactor) MaybeCompact( + ctx context.Context, + sess adksession.Session, + sessionSvc adksession.Service, + lastTokens int, +) error { + if c == nil { + return nil + } + + log := c.logger.WithValues("sessionID", sess.ID()) + + allEvents := collectEvents(sess) + invocations := groupByInvocation(allEvents) + + trigger := false + + if len(invocations) >= c.cfg.CompactionInterval { + trigger = true + log.V(1).Info("Compaction triggered by interval", + "invocations", len(invocations), + "threshold", c.cfg.CompactionInterval) + } + + if !trigger && c.cfg.TokenThreshold > 0 { + tokens := lastTokens + if tokens == 0 { + tokens = estimateTokens(allEvents) + } + if tokens >= c.cfg.TokenThreshold { + trigger = true + log.V(1).Info("Compaction triggered by token threshold", + "tokens", tokens, + "threshold", c.cfg.TokenThreshold) + } + } + + if !trigger { + return nil + } + + return c.compact(ctx, sess, sessionSvc, invocations, log) +} + +func (c *Compactor) compact( + ctx context.Context, + sess adksession.Session, + sessionSvc adksession.Service, + invocations []invocationGroup, + log logr.Logger, +) error { + keepCount := c.cfg.OverlapSize + if keepCount >= len(invocations) { + return nil + } + + toCompact := invocations[:len(invocations)-keepCount] + if len(toCompact) == 0 { + return nil + } + + var compactEvents []*adksession.Event + for _, inv := range toCompact { + compactEvents = append(compactEvents, inv.events...) + } + + log.Info("Compacting events", + "compactCount", len(compactEvents), + "keepInvocations", keepCount) + + summaryText, err := c.summarise(ctx, compactEvents) + if err != nil { + log.Error(err, "Failed to summarise events; skipping compaction") + return nil + } + + summaryEvent := buildSummaryEvent(summaryText) + + if err := sessionSvc.AppendEvent(ctx, sess, summaryEvent); err != nil { + return fmt.Errorf("compaction: failed to persist summary event: %w", err) + } + + var keepEvents []*adksession.Event + for _, inv := range invocations[len(invocations)-keepCount:] { + keepEvents = append(keepEvents, inv.events...) + } + // Apply EventRetentionSize cap if configured. + if c.cfg.EventRetentionSize > 0 && len(keepEvents) > c.cfg.EventRetentionSize { + keepEvents = keepEvents[len(keepEvents)-c.cfg.EventRetentionSize:] + } + replaceSessionEvents(sess, summaryEvent, keepEvents) + + log.Info("Compaction complete", + "summaryLen", len(summaryText), + "keptInvocations", keepCount) + + return nil +} + +func (c *Compactor) summarise(ctx context.Context, events []*adksession.Event) (string, error) { + text := serializeEvents(events) + + if c.cfg.SummarizerLLM == nil { + return "[Compacted history]\n" + text, nil + } + + prompt := strings.ReplaceAll(c.cfg.PromptTemplate, "{{events}}", text) + + req := &model.LLMRequest{ + Contents: []*genai.Content{ + {Role: "user", Parts: []*genai.Part{{Text: prompt}}}, + }, + } + + var parts []string + for resp, err := range c.cfg.SummarizerLLM.GenerateContent(ctx, req, false) { + if err != nil { + return "", fmt.Errorf("summarizer LLM error: %w", err) + } + if resp != nil && resp.Content != nil { + for _, p := range resp.Content.Parts { + if p != nil && p.Text != "" { + parts = append(parts, p.Text) + } + } + } + } + + if len(parts) == 0 { + return "", fmt.Errorf("summarizer returned empty response") + } + return strings.Join(parts, ""), nil +} + +type invocationGroup struct { + invocationID string + events []*adksession.Event +} + +func groupByInvocation(events []*adksession.Event) []invocationGroup { + var groups []invocationGroup + index := map[string]int{} + + for _, e := range events { + id := e.InvocationID + if id == "" { + id = noInvocationSentinel + } + if i, ok := index[id]; ok { + groups[i].events = append(groups[i].events, e) + } else { + index[id] = len(groups) + groups = append(groups, invocationGroup{ + invocationID: id, + events: []*adksession.Event{e}, + }) + } + } + return groups +} + +func collectEvents(sess adksession.Session) []*adksession.Event { + var out []*adksession.Event + for e := range sess.Events().All() { + out = append(out, e) + } + return out +} + +func serializeEvents(events []*adksession.Event) string { + var sb strings.Builder + for _, e := range events { + if e.Content == nil { + continue + } + role := e.Author + if role == "" { + role = e.Content.Role + } + for _, p := range e.Content.Parts { + if p == nil { + continue + } + switch { + case p.Text != "": + fmt.Fprintf(&sb, "[%s]: %s\n", role, p.Text) + case p.FunctionCall != nil: + fmt.Fprintf(&sb, "[%s] called tool %q\n", role, p.FunctionCall.Name) + case p.FunctionResponse != nil: + fmt.Fprintf(&sb, "[tool %s response]\n", p.FunctionResponse.Name) + } + } + } + return sb.String() +} + +func buildSummaryEvent(summaryText string) *adksession.Event { + e := &adksession.Event{ + ID: uuid.NewString(), + InvocationID: "compaction_" + uuid.NewString(), + Timestamp: time.Now(), + Author: summaryEventAuthor, + } + e.LLMResponse = model.LLMResponse{ + Content: &genai.Content{ + Role: "model", + Parts: []*genai.Part{{Text: summaryText}}, + }, + } + return e +} + +func estimateTokens(events []*adksession.Event) int { + return len(serializeEvents(events)) / 4 +} + +// replaceSessionEvents rewrites the in-memory event list when the session +// supports it. If not, the summary was still persisted via AppendEvent and +// will be visible on the next fresh Get from the backend. +func replaceSessionEvents(sess adksession.Session, summary *adksession.Event, keep []*adksession.Event) { + type replacer interface { + ReplaceEvents([]*adksession.Event) + } + if r, ok := sess.(replacer); ok { + newEvents := make([]*adksession.Event, 0, 1+len(keep)) + newEvents = append(newEvents, summary) + newEvents = append(newEvents, keep...) + r.ReplaceEvents(newEvents) + } +} + +// FromAgentConfig builds a Config from the kagent AgentConfig. +// Returns nil when compaction is not configured. +func FromAgentConfig(agentCfg *adkapiconfig.AgentConfig) (*Config, error) { + if agentCfg == nil || agentCfg.ContextConfig == nil || agentCfg.ContextConfig.Compaction == nil { + return nil, nil + } + comp := agentCfg.ContextConfig.Compaction + + cfg := &Config{ + CompactionInterval: defaultCompactionInterval, + OverlapSize: defaultOverlapSize, + PromptTemplate: defaultSummaryPrompt, + } + + if comp.CompactionInterval != nil && *comp.CompactionInterval > 0 { + cfg.CompactionInterval = *comp.CompactionInterval + } + if comp.OverlapSize != nil && *comp.OverlapSize >= 0 { + cfg.OverlapSize = *comp.OverlapSize + } + if comp.TokenThreshold != nil { + cfg.TokenThreshold = *comp.TokenThreshold + } + if comp.EventRetentionSize != nil { + cfg.EventRetentionSize = *comp.EventRetentionSize + } + if comp.PromptTemplate != "" { + cfg.PromptTemplate = comp.PromptTemplate + } + + return cfg, nil +} diff --git a/go/adk/pkg/compaction/compaction_test.go b/go/adk/pkg/compaction/compaction_test.go new file mode 100644 index 0000000000..2c23d43c22 --- /dev/null +++ b/go/adk/pkg/compaction/compaction_test.go @@ -0,0 +1,143 @@ +package compaction + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + "github.com/google/uuid" + "google.golang.org/adk/model" + adksession "google.golang.org/adk/session" + "google.golang.org/genai" +) + +func makeEvent(invID, author, text string) *adksession.Event { + e := &adksession.Event{ + ID: uuid.NewString(), + InvocationID: invID, + Author: author, + Timestamp: time.Now(), + } + e.LLMResponse = model.LLMResponse{ + Content: &genai.Content{ + Role: "model", + Parts: []*genai.Part{{Text: text}}, + }, + } + return e +} + +func TestNew_NilConfig(t *testing.T) { + c := New(nil, logr.Discard()) + if c != nil { + t.Fatal("expected nil compactor for nil config") + } +} + +func TestNew_WithConfig(t *testing.T) { + cfg := &Config{CompactionInterval: 5, OverlapSize: 2} + c := New(cfg, logr.Discard()) + if c == nil { + t.Fatal("expected non-nil compactor") + } +} + +func TestGroupByInvocation_Basic(t *testing.T) { + events := []*adksession.Event{ + makeEvent("inv1", "user", "hello"), + makeEvent("inv1", "agent", "hi"), + makeEvent("inv2", "user", "how are you"), + makeEvent("inv2", "agent", "good"), + makeEvent("inv3", "user", "bye"), + } + groups := groupByInvocation(events) + if len(groups) != 3 { + t.Fatalf("expected 3 groups, got %d", len(groups)) + } + if groups[0].invocationID != "inv1" { + t.Errorf("expected inv1, got %s", groups[0].invocationID) + } + if len(groups[0].events) != 2 { + t.Errorf("expected 2 events in inv1, got %d", len(groups[0].events)) + } +} + +func TestGroupByInvocation_EmptyID(t *testing.T) { + events := []*adksession.Event{ + makeEvent("", "user", "hello"), + makeEvent("", "agent", "hi"), + } + groups := groupByInvocation(events) + if len(groups) != 1 { + t.Fatalf("expected 1 group for empty IDs, got %d", len(groups)) + } +} + +func TestSerializeEvents(t *testing.T) { + events := []*adksession.Event{ + makeEvent("inv1", "user", "hello world"), + } + text := serializeEvents(events) + if text == "" { + t.Fatal("expected non-empty serialized text") + } + if len(text) == 0 { + t.Fatal("serialized text should not be empty") + } +} + +func TestEstimateTokens(t *testing.T) { + events := []*adksession.Event{ + makeEvent("inv1", "user", "hello world"), + } + tokens := estimateTokens(events) + if tokens <= 0 { + t.Fatal("expected positive token estimate") + } +} + +func TestBuildSummaryEvent(t *testing.T) { + e := buildSummaryEvent("this is a summary") + if e == nil { + t.Fatal("expected non-nil summary event") + } + if e.Author != summaryEventAuthor { + t.Errorf("expected author %s, got %s", summaryEventAuthor, e.Author) + } + if e.Content == nil { + t.Fatal("expected non-nil content") + } + if len(e.Content.Parts) == 0 || e.Content.Parts[0].Text != "this is a summary" { + t.Error("unexpected summary content") + } +} + +func TestMaybeCompact_NilCompactor(t *testing.T) { + var c *Compactor + err := c.MaybeCompact(context.TODO(), nil, nil, 0) + if err != nil { + t.Fatalf("expected nil error from nil compactor, got %v", err) + } +} + +func TestCompact_BelowThreshold(t *testing.T) { + cfg := &Config{CompactionInterval: 5, OverlapSize: 2} + c := New(cfg, logr.Discard()) + + // Only 2 invocations, threshold is 5 - compact should be a no-op. + events := []*adksession.Event{ + makeEvent("inv1", "user", "hello"), + makeEvent("inv2", "user", "world"), + } + invocations := groupByInvocation(events) + if len(invocations) >= c.cfg.CompactionInterval { + t.Fatal("test setup wrong: should be below threshold") + } + + // compact returns nil without touching session when keepCount >= len(invocations). + err := c.compact(context.TODO(), nil, nil, invocations, logr.Discard()) + if err != nil { + t.Fatalf("expected nil error below threshold, got %v", err) + } +} diff --git a/go/adk/pkg/runner/adapter.go b/go/adk/pkg/runner/adapter.go index 0441f778c0..8fff174b44 100644 --- a/go/adk/pkg/runner/adapter.go +++ b/go/adk/pkg/runner/adapter.go @@ -8,6 +8,7 @@ import ( "github.com/go-logr/logr" "github.com/kagent-dev/kagent/go/adk/pkg/agent" + "github.com/kagent-dev/kagent/go/adk/pkg/compaction" kagentmemory "github.com/kagent-dev/kagent/go/adk/pkg/memory" "github.com/kagent-dev/kagent/go/adk/pkg/session" "github.com/kagent-dev/kagent/go/adk/pkg/sts" @@ -34,26 +35,26 @@ func CreateRunnerConfig( sessionService *session.KAgentSessionService, appName string, memoryService *kagentmemory.KagentMemoryService, -) (runner.Config, map[string]string, error) { +) (runner.Config, map[string]string, *compaction.Config, error) { log := logr.FromContextOrDiscard(ctx) var extraTools []adktool.Tool if memoryService != nil { saveTool, err := kagentmemory.NewSaveMemoryTool(memoryService) if err != nil { - return runner.Config{}, nil, fmt.Errorf("failed to create save_memory tool: %w", err) + return runner.Config{}, nil, nil, fmt.Errorf("failed to create save_memory tool: %w", err) } extraTools = append(extraTools, saveTool) } stsPlugin, err := buildTokenPropagationPlugin(ctx, log) if err != nil { - return runner.Config{}, nil, err + return runner.Config{}, nil, nil, err } adkAgent, subagentSessionIDs, err := agent.CreateGoogleADKAgentWithSubagentSessionIDs(ctx, agentConfig, agentNameFromAppName(appName), stsPlugin, extraTools...) if err != nil { - return runner.Config{}, nil, fmt.Errorf("failed to create agent: %w", err) + return runner.Config{}, nil, nil, fmt.Errorf("failed to create agent: %w", err) } var adkSessionService adksession.Service @@ -76,7 +77,7 @@ func CreateRunnerConfig( if stsPlugin != nil { p, err := stsPlugin.ADKPlugin() if err != nil { - return runner.Config{}, nil, fmt.Errorf("failed to create STS ADK plugin: %w", err) + return runner.Config{}, nil, nil, fmt.Errorf("failed to create STS ADK plugin: %w", err) } if p != nil { adkPlugins = append(adkPlugins, p) @@ -93,7 +94,12 @@ func CreateRunnerConfig( }, } - return cfg, subagentSessionIDs, nil + compactionCfg, err := compaction.FromAgentConfig(agentConfig) + if err != nil { + return runner.Config{}, nil, nil, fmt.Errorf("failed to build compaction config: %w", err) + } + + return cfg, subagentSessionIDs, compactionCfg, nil } func buildTokenPropagationPlugin(ctx context.Context, log logr.Logger) (*sts.TokenPropagationPlugin, error) { diff --git a/go/adk/pkg/session/local_session.go b/go/adk/pkg/session/local_session.go index a2f43d190c..1ea2f60ab3 100644 --- a/go/adk/pkg/session/local_session.go +++ b/go/adk/pkg/session/local_session.go @@ -63,6 +63,18 @@ func (s *localSession) appendEvent(event *adksession.Event) error { return nil } +// ReplaceEvents atomically replaces the entire event slice. +// Called by the compactor after it has persisted a summary event and wants to +// drop the compacted events from the in-memory cache. +// A defensive copy is taken so callers cannot mutate the session's event list. +func (s *localSession) ReplaceEvents(events []*adksession.Event) { + s.mu.Lock() + defer s.mu.Unlock() + snap := make([]*adksession.Event, len(events)) + copy(snap, events) + s.events = snap +} + // events implements adksession.Events. type events []*adksession.Event diff --git a/go/core/internal/controller/reconciler/reconciler.go b/go/core/internal/controller/reconciler/reconciler.go index 821f95cd74..edeac97e34 100644 --- a/go/core/internal/controller/reconciler/reconciler.go +++ b/go/core/internal/controller/reconciler/reconciler.go @@ -895,10 +895,7 @@ func (a *kagentReconciler) validateRuntimeFeatures(agent v1alpha2.AgentObject) s } // Memory: ✅ Supported in Go as of PR #1444 - // Context compression: Not yet implemented in Go runtime - if decl.Context != nil && decl.Context.Compaction != nil { - unsupported = append(unsupported, "context compression/compaction (not implemented in Go runtime)") - } + // Context compression: supported in Go runtime via the compaction package. if len(unsupported) == 0 { return ""