Skip to content

Commit 2bb3fa1

Browse files
committed
tests for connectivity queries - always expect data
1 parent b42ff50 commit 2bb3fa1

4 files changed

Lines changed: 432 additions & 181 deletions
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
"""Tests for DownstreamClassConnectivity query.
2+
3+
Tests the query that finds downstream partner neuron classes for a given
4+
neuron class, using the pre-indexed downstream_connectivity_query Solr field.
5+
"""
6+
7+
import pytest
8+
import pandas as pd
9+
10+
from vfbquery.vfb_queries import (
11+
get_downstream_class_connectivity,
12+
DownstreamClassConnectivity_to_schema,
13+
)
14+
15+
# FBbt_00001482 = lineage NB3-2 primary interneuron — known to have
16+
# downstream_connectivity_query data in the vfb_json Solr core.
17+
TEST_CLASS = "FBbt_00001482"
18+
# A class that is unlikely to have downstream connectivity data.
19+
EMPTY_CLASS = "FBbt_00000001"
20+
21+
22+
class TestDownstreamClassConnectivityDict:
    """Dict-shaped output tests (``return_dataframe=False``)."""

    @pytest.mark.integration
    def test_returns_results(self):
        result = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=False, force_refresh=True
        )
        assert isinstance(result, dict)
        assert result["count"] > 0
        assert len(result["rows"]) > 0

    @pytest.mark.integration
    def test_row_has_expected_keys(self):
        result = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=False, limit=1, force_refresh=True
        )
        assert result["rows"], "Expected at least one row"
        first_row = result["rows"][0]
        expected_keys = {
            "id", "downstream_class", "total_n", "connected_n",
            "percent_connected", "pairwise_connections", "total_weight", "avg_weight",
        }
        # Set-comparison on the dict keys view is equivalent to issubset().
        assert expected_keys <= first_row.keys()

    @pytest.mark.integration
    def test_headers_present(self):
        result = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=False, limit=1, force_refresh=True
        )
        assert "headers" in result
        assert "downstream_class" in result["headers"]

    @pytest.mark.integration
    def test_limit_respected(self):
        result = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=False, limit=3, force_refresh=True
        )
        rows = result["rows"]
        assert len(rows) <= 3
        # "count" reports the full total, not the truncated row set.
        assert result["count"] >= len(rows)

    @pytest.mark.integration
    def test_empty_class_returns_zero(self):
        result = get_downstream_class_connectivity(
            EMPTY_CLASS, return_dataframe=False, force_refresh=True
        )
        assert result["count"] == 0
        assert result["rows"] == []
class TestDownstreamClassConnectivityDataFrame:
    """DataFrame-shaped output tests (``return_dataframe=True``)."""

    @pytest.mark.integration
    def test_returns_dataframe(self):
        df = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=True, force_refresh=True
        )
        assert isinstance(df, pd.DataFrame)
        assert not df.empty

    @pytest.mark.integration
    def test_dataframe_has_expected_columns(self):
        df = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=True, limit=1, force_refresh=True
        )
        expected_cols = {
            "id", "downstream_class", "total_n", "connected_n",
            "percent_connected", "pairwise_connections", "total_weight", "avg_weight",
        }
        assert expected_cols.issubset(set(df.columns))

    @pytest.mark.integration
    def test_limit_respected(self):
        df = get_downstream_class_connectivity(
            TEST_CLASS, return_dataframe=True, limit=5, force_refresh=True
        )
        assert len(df) <= 5

    @pytest.mark.integration
    def test_empty_class_returns_empty_dataframe(self):
        df = get_downstream_class_connectivity(
            EMPTY_CLASS, return_dataframe=True, force_refresh=True
        )
        assert isinstance(df, pd.DataFrame)
        assert df.empty
class TestDownstreamClassConnectivitySchema:
    """Schema-generation test (no network access required)."""

    def test_schema_generation(self):
        takes = {"short_form": TEST_CLASS}
        schema = DownstreamClassConnectivity_to_schema("test neuron class", takes)
        assert schema.query == "DownstreamClassConnectivity"
        assert schema.function == "get_downstream_class_connectivity"
        assert schema.preview == 5
        for column in ("downstream_class", "percent_connected"):
            assert column in schema.preview_columns
Lines changed: 105 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,89 +1,118 @@
1-
"""Tests for NeuronNeuronConnectivityQuery.

Exercises the query that finds neurons connected to a given neuron.
This implements the neuron_neuron_connectivity_query from the VFB XMI specification.
"""

import pytest
import pandas as pd

from vfbquery.vfb_queries import (
    get_neuron_neuron_connectivity,
    NeuronNeuronConnectivityQuery_to_schema,
)

# VFB_jrchk00s = LPC1 (FlyEM-HB:1775513344) — known to have connectivity data.
TEST_NEURON = "VFB_jrchk00s"
class TestNeuronNeuronConnectivityDict:
    """Dict-shaped output tests (``return_dataframe=False``)."""

    @pytest.mark.integration
    def test_returns_results(self):
        result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False
        )
        assert isinstance(result, dict)
        assert result["count"] > 0
        assert len(result["rows"]) > 0

    @pytest.mark.integration
    def test_row_has_expected_keys(self):
        result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False, limit=1
        )
        assert result["rows"], "Expected at least one row"
        first_row = result["rows"][0]
        # Set-comparison on the dict keys view is equivalent to issubset().
        assert {"id", "label", "outputs", "inputs", "tags"} <= first_row.keys()

    @pytest.mark.integration
    def test_headers_present(self):
        result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False, limit=1
        )
        assert "headers" in result
        for header in ("label", "outputs", "inputs"):
            assert header in result["headers"]

    @pytest.mark.integration
    def test_limit_respected(self):
        result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False, limit=3
        )
        rows = result["rows"]
        assert len(rows) <= 3
        # "count" reports the full total, not the truncated row set.
        assert result["count"] >= len(rows)

    @pytest.mark.integration
    def test_direction_upstream(self):
        all_result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False
        )
        up_result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False, direction='upstream'
        )
        # Upstream-only results exist and are a subset of the full set.
        assert up_result["count"] > 0
        assert up_result["count"] <= all_result["count"]

    @pytest.mark.integration
    def test_direction_downstream(self):
        all_result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False
        )
        down_result = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=False, direction='downstream'
        )
        # Downstream-only results exist and are a subset of the full set.
        assert down_result["count"] > 0
        assert down_result["count"] <= all_result["count"]
class TestNeuronNeuronConnectivityDataFrame:
    """DataFrame-shaped output tests (``return_dataframe=True``)."""

    @pytest.mark.integration
    def test_returns_dataframe(self):
        df = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=True
        )
        assert isinstance(df, pd.DataFrame)
        assert not df.empty

    @pytest.mark.integration
    def test_dataframe_has_expected_columns(self):
        df = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=True, limit=1
        )
        expected_cols = {"id", "label", "outputs", "inputs", "tags"}
        assert expected_cols.issubset(set(df.columns))

    @pytest.mark.integration
    def test_limit_respected(self):
        df = get_neuron_neuron_connectivity(
            TEST_NEURON, return_dataframe=True, limit=5
        )
        assert len(df) <= 5

109+
class TestNeuronNeuronConnectivitySchema:
    """Schema-generation test (no network access required)."""

    def test_schema_generation(self):
        schema = NeuronNeuronConnectivityQuery_to_schema(
            "LPC1", {"short_form": TEST_NEURON}
        )
        assert schema.query == "NeuronNeuronConnectivityQuery"
        assert schema.function == "get_neuron_neuron_connectivity"
        assert schema.label == "Neurons connected to LPC1"
        assert schema.preview == 5
        expected_columns = ["id", "label", "outputs", "inputs", "tags"]
        assert schema.preview_columns == expected_columns

0 commit comments

Comments
 (0)