Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import org.a2aproject.sdk.extras.queuemanager.replicated.core.ReplicatedEventQueueItem;
import org.a2aproject.sdk.jsonrpc.common.json.JsonUtil;
import org.a2aproject.sdk.server.PublicAgentCard;
import org.a2aproject.sdk.server.events.EventQueue;
import org.a2aproject.sdk.server.events.QueueClosedEvent;
import org.a2aproject.sdk.server.events.QueueManager;
import org.a2aproject.sdk.spec.A2AClientException;
import org.a2aproject.sdk.spec.AgentCard;
import org.a2aproject.sdk.spec.Message;
Expand Down Expand Up @@ -73,9 +71,6 @@ public class KafkaReplicationIntegrationTest {
@Channel("replicated-events-out")
Emitter<String> testEmitter;

@Inject
QueueManager queueManager;

private Client streamingClient;
private Client nonStreamingClient;
private Client pollingClient;
Expand Down Expand Up @@ -342,19 +337,23 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {

// Set up streaming resubscription to listen for the QueueClosedEvent
CountDownLatch streamCompletedLatch = new CountDownLatch(1);
CountDownLatch firstEventLatch = new CountDownLatch(1);
AtomicBoolean streamCompleted = new AtomicBoolean(false);
AtomicBoolean streamErrored = new AtomicBoolean(false);
AtomicReference<Throwable> errorRef = new AtomicReference<>();

// Create consumer - we expect the stream to complete when QueueClosedEvent arrives
// Create consumer - signal when the first event arrives (proves HTTP connection is live),
// then ignore subsequent events while waiting for the stream to complete
BiConsumer<ClientEvent, AgentCard> consumer = (event, agentCard) -> {
// We might receive some events before the stream completes, that's fine
// The important thing is that the stream eventually completes
if (firstEventLatch.getCount() > 0) {
assertInstanceOf(TaskEvent.class, event, "First event on subscribe MUST be TaskEvent (A2A spec 3.1.6)");
}
firstEventLatch.countDown();
};
Comment thread
ehsavoie marked this conversation as resolved.

// Create error handler that captures completion
Consumer<Throwable> errorHandler = error -> {
if (error == null) {
if (error == null || isStreamClosedError(error)) {
// null error means stream completed normally
streamCompleted.set(true);
} else {
Expand All @@ -367,11 +366,13 @@ public void testQueueClosedEventTerminatesRemoteSubscribers() throws Exception {
// Subscribe to the task - this creates a streaming subscription
streamingClient.subscribeToTask(new TaskIdParams(taskId), List.of(consumer), errorHandler);
Comment thread
ehsavoie marked this conversation as resolved.

// Wait for the EventConsumer to start polling (replaces unreliable Thread.sleep)
// This ensures the consumer is ready to receive the QueueClosedEvent
EventQueue queue = queueManager.get(taskId);
assertNotNull(queue, "Queue should exist for task " + taskId);
queueManager.awaitQueuePollerStart(queue);
// Wait until the subscription delivers its first event (the initial task snapshot).
// This proves the HTTP/SSE connection is fully established from the client's perspective
// before we send the QueueClosedEvent. Using awaitQueuePollerStart(mainQueue) is not
// reliable here because the MainQueue's one-time latch was already fired by the first
// subscription (from pollingClient.sendMessage), so it returns immediately.
assertTrue(firstEventLatch.await(15, TimeUnit.SECONDS),
"Should receive initial task snapshot from subscription");

// Now manually send a QueueClosedEvent to Kafka to simulate queue closure on another node
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
Expand Down