Skip to content

Commit 8037f81

Browse files
committed
Fixes to support integration testing with external KAFKA_URI (#2838)
1 parent 3479de9 commit 8037f81

3 files changed

Lines changed: 5 additions & 11 deletions

File tree

test/integration/test_admin_integration.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def test_create_describe_delete_acls(kafka_admin_client):
9393
def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
9494
"""Tests that describe config returns configs for broker
9595
"""
96-
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
96+
broker_id = kafka_admin_client._client.least_loaded_node()
9797
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
9898

9999
assert len(configs) == 1
@@ -121,7 +121,7 @@ def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_clie
121121
def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
122122
"""Tests that describe config returns configs for mixed resource types (topic + broker)
123123
"""
124-
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
124+
broker_id = kafka_admin_client._client.least_loaded_node()
125125
configs = kafka_admin_client.describe_configs([
126126
ConfigResource(ConfigResourceType.TOPIC, topic),
127127
ConfigResource(ConfigResourceType.BROKER, broker_id)])
@@ -146,14 +146,6 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
146146
kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
147147

148148

149-
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
150-
def test_describe_consumer_group_does_not_exist(kafka_admin_client):
151-
"""Tests that the describe consumer group call fails if the group coordinator is not available
152-
"""
153-
with pytest.raises(CoordinatorNotAvailableError):
154-
kafka_admin_client.describe_consumer_groups(['test'])
155-
156-
157149
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11')
158150
def test_describe_consumer_group_exists(kafka_admin_client, kafka_consumer_factory, topic):
159151
"""Tests that the describe consumer group call returns valid consumer group information

test/integration/test_sasl_integration.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import os
23
import uuid
34
import time
45

@@ -8,6 +9,7 @@
89
from kafka.protocol.metadata import MetadataRequest_v1
910
from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore
1011

12+
pytestmark = pytest.mark.skipif("KAFKA_URI" in os.environ, reason="Testing on external Kafka Broker")
1113

1214
@pytest.fixture(
1315
params=[

test/test_fetcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -929,7 +929,7 @@ def test_success_does_not_check_exception(self, client, mocker):
929929
def poll_side_effect(**kw):
930930
f = kw.get('future')
931931
if f is future1:
932-
# Succeeds but has retry partitions the bug was that code
932+
# Succeeds but has retry partitions -- the bug was that code
933933
# would fall through to check future.exception (which is None),
934934
# causing an AttributeError
935935
f.success(({tp0: offset0}, {tp1}))

0 commit comments

Comments
 (0)