From 725ae749a6d742b2831584cf1bbe28e425db4e0b Mon Sep 17 00:00:00 2001 From: bitpathfinder Date: Fri, 22 May 2026 16:33:16 +0200 Subject: [PATCH] Fix on_up() destroying healthy pool after replace-with-same-IP When a node is replaced with the same IP, the driver receives both TOPOLOGY_CHANGE NEW_NODE and STATUS_CHANGE UP events. The NEW_NODE handler runs first, replacing the old host and establishing a new pool. The STATUS_CHANGE UP handler fires later with a stale reference to the old host object. Because Host.__eq__/__hash__ are endpoint-based, the stale on_up() tears down the new host's pool, causing a brief window where queries fail with NoHostAvailable. Add guards in on_up(): 1. If the host has been replaced in metadata (different object, same endpoint, new host already up), skip processing entirely. 2. Per-session: if a session already has a healthy (non-shutdown) pool for this host, skip remove/rebuild for that session only. The rest of on_up() bookkeeping (reconnector cancel, query preparation, LBP and control connection notifications) still runs unconditionally. The full-skip guard (1) resets _currently_handling_node_up under host.lock before returning and deliberately does not call host.set_up() on the stale host object. The per-session guard (2) only skips remove/rebuild; resetting _currently_handling_node_up and calling host.set_up() (which resets the conviction policy) are left to the existing on_up() completion paths. Refs: SCYLLADB-833 --- cassandra/cluster.py | 18 ++++++ tests/unit/test_cluster.py | 112 +++++++++++++++++++++++++++++++++++++ 2 files changed, 130 insertions(+) diff --git a/cassandra/cluster.py b/cassandra/cluster.py index 5e7a68bc1c..81edc4f3af 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -1930,6 +1930,16 @@ def on_up(self, host): have_future = False futures = set() try: + # Guard against stale on_up destroying a healthy pool. + # Case 1: Host was replaced in metadata (different object, same endpoint). 
+ current_host = self.metadata.get_host(host.endpoint) + if current_host is not None and current_host is not host and current_host.is_up: + log.debug("Host %s has been replaced by %s which is already up; " + "skipping stale on_up handling", host, current_host) + with host.lock: + host._currently_handling_node_up = False + return + log.info("Host %s may be up; will prepare queries and open connection pool", host) reconnector = host.get_and_set_reconnection_handler(None) @@ -1942,6 +1952,11 @@ def on_up(self, host): log.debug("Done preparing all queries for host %s, ", host) for session in tuple(self.sessions): + pool = session._pools.get(host) + if pool and not pool.is_shutdown: + log.debug("Session %s already has a healthy pool for host %s; " + "skipping remove/rebuild", session, host) + continue session.remove_pool(host) log.debug("Signalling to load balancing policies that host %s is up", host) @@ -1955,6 +1970,9 @@ def on_up(self, host): futures_results = [] callback = partial(self._on_up_future_completed, host, futures, futures_results, futures_lock) for session in tuple(self.sessions): + pool = session._pools.get(host) + if pool and not pool.is_shutdown: + continue future = session.add_or_renew_pool(host, is_host_addition=False) if future is not None: have_future = True diff --git a/tests/unit/test_cluster.py b/tests/unit/test_cluster.py index a4f0ebc4d3..4044d4c0dd 100644 --- a/tests/unit/test_cluster.py +++ b/tests/unit/test_cluster.py @@ -719,3 +719,115 @@ def test_no_warning_adding_lbp_ep_to_cluster_with_contact_points(self): ) patched_logger.warning.assert_not_called() + + +class TestOnUpStaleHost(unittest.TestCase): + """ + Tests for on_up() guards after a replace-with-same-IP scenario. + + Verifies that: + - A stale host reference (replaced in metadata) triggers a full skip. + - Per-session healthy pools are preserved (no teardown/rebuild), while + the rest of on_up() bookkeeping (LBP, control connection) still runs. 
+ - Sessions missing a pool still get the normal rebuild path. + """ + + def _make_cluster(self, sessions=None): + """Create a minimal Cluster object without connecting.""" + from threading import Lock + cluster = object.__new__(Cluster) + cluster.is_shutdown = False + cluster.metadata = Mock() + cluster.sessions = sessions or set() + cluster.profile_manager = Mock() + cluster.control_connection = Mock() + cluster._listeners = set() + cluster._listener_lock = Lock() + return cluster + + def test_on_up_skips_when_host_replaced_in_metadata(self): + """ + If a NEW_NODE event already replaced the old host with a new one + (same endpoint, different host_id), on_up(old_host) should bail out. + """ + from cassandra.connection import DefaultEndPoint + endpoint = DefaultEndPoint('127.0.0.1') + + old_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4()) + old_host.is_up = False + old_host._currently_handling_node_up = False + + new_host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4()) + new_host.is_up = True + + cluster = self._make_cluster() + cluster.metadata.get_host = Mock(return_value=new_host) + + cluster.on_up(old_host) + + self.assertFalse(old_host._currently_handling_node_up) + + def test_on_up_skips_rebuild_when_session_has_healthy_pool(self): + """ + If on_add already created a healthy pool in a session, a subsequent + on_up should not tear it down and rebuild, but should still run + the rest of the on_up bookkeeping (LBP, control connection). 
+ """ + from cassandra.connection import DefaultEndPoint + endpoint = DefaultEndPoint('127.0.0.1') + + host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4()) + host.is_up = False + host._currently_handling_node_up = False + + mock_pool = Mock() + mock_pool.is_shutdown = False + mock_session = Mock() + mock_session._pools = {host: mock_pool} + + cluster = self._make_cluster(sessions={mock_session}) + cluster.metadata.get_host = Mock(return_value=host) + cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED) + + cluster.on_up(host) + + mock_session.remove_pool.assert_not_called() + mock_session.add_or_renew_pool.assert_not_called() + cluster.profile_manager.on_up.assert_called_once_with(host) + cluster.control_connection.on_up.assert_called_once_with(host) + self.assertTrue(host.is_up) + self.assertFalse(host._currently_handling_node_up) + + def test_on_up_only_rebuilds_sessions_missing_pool(self): + """ + If only some sessions have a pool, on_up should only tear down/rebuild + for sessions that lack a healthy pool. 
+ """ + from cassandra.connection import DefaultEndPoint + endpoint = DefaultEndPoint('127.0.0.1') + + host = Host(endpoint, conviction_policy_factory=Mock(), host_id=uuid.uuid4()) + host.is_up = False + host._currently_handling_node_up = False + + mock_pool = Mock() + mock_pool.is_shutdown = False + session_with_pool = Mock() + session_with_pool._pools = {host: mock_pool} + + session_without_pool = Mock() + session_without_pool._pools = {} + session_without_pool.add_or_renew_pool = Mock(return_value=None) + + cluster = self._make_cluster(sessions={session_with_pool, session_without_pool}) + cluster.metadata.get_host = Mock(return_value=host) + cluster.profile_manager.distance = Mock(return_value=HostDistance.IGNORED) + + cluster.on_up(host) + + # Session with healthy pool should NOT be torn down + session_with_pool.remove_pool.assert_not_called() + session_with_pool.add_or_renew_pool.assert_not_called() + # Session without pool should get remove + rebuild + session_without_pool.remove_pool.assert_called_once_with(host) + session_without_pool.add_or_renew_pool.assert_called_once_with(host, is_host_addition=False)