Skip to content

Commit e7b1dff

Browse files
authored
Merge pull request #22 from napakalas/issue-#16
Add pg_import.py utility to import map-specific connectivity into competency database
2 parents 567ddc2 + d094a8c commit e7b1dff

1 file changed

Lines changed: 324 additions & 0 deletions

File tree

tools/pg_import.py

Lines changed: 324 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,324 @@
1+
#===============================================================================
2+
#
3+
# Flatmap viewer and annotation tools
4+
#
5+
# Copyright (c) 2019-21 David Brooks
6+
#
7+
# Licensed under the Apache License, Version 2.0 (the "License");
8+
# you may not use this file except in compliance with the License.
9+
# You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing, software
14+
# distributed under the License is distributed on an "AS IS" BASIS,
15+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
# See the License for the specific language governing permissions and
17+
# limitations under the License.
18+
#
19+
#===============================================================================
20+
21+
import json
22+
import logging
23+
import os
24+
from typing import Any, Optional
25+
from tqdm import tqdm
26+
import pathlib
27+
28+
#===============================================================================
29+
30+
import psycopg as pg
31+
from landez.sources import MBTilesReader
32+
33+
#===============================================================================
34+
35+
from mapknowledge import KnowledgeStore, NERVE_TYPE
36+
37+
#===============================================================================
38+
39+
# Name of the PostgreSQL database the import connects to.
PG_DATABASE = 'map-knowledge'

# File name of the knowledge store opened when reading a flatmap's knowledge.
DEFAULT_STORE = 'knowledgebase.db'

# Connection and location settings come from the environment; only the
# host has a fallback value.
KNOWLEDGE_USER = os.getenv('KNOWLEDGE_USER')
KNOWLEDGE_HOST = os.getenv('KNOWLEDGE_HOST', 'localhost:5432')
FLATMAP_ROOT = os.getenv('FLATMAP_ROOT')
46+
47+
#===============================================================================
48+
49+
def clean_source(source: str) -> str:
    """Return ``source`` without a trailing ``-npo`` suffix.

    Only one suffix is removed, and only when it is at the very end of the
    string; any other value is returned unchanged.
    """
    return source.removesuffix('-npo')
53+
54+
#===============================================================================
55+
56+
# A single knowledge record: a mapping from string keys to arbitrary values.
type KnowledgeDict = dict[str, Any]
57+
58+
class KnowledgeList:
    """Knowledge records together with the source they belong to.

    The source identifier is normalised with ``clean_source()`` when the
    object is created.
    """

    def __init__(self, source: str, knowledge: Optional[list[KnowledgeDict]]=None):
        """Create a list for ``source``.

        :param source: the source identifier; a trailing ``-npo`` is removed
        :param knowledge: initial records. The given list is used as-is (it
                          is not copied); a new empty list is created when
                          ``None`` is passed.
        """
        self.__source = clean_source(source)
        self.__knowledge: list[KnowledgeDict] = [] if knowledge is None else knowledge

    @property
    def source(self):
        """The cleaned source identifier."""
        return self.__source

    @property
    def knowledge(self):
        """The underlying list of knowledge records."""
        return self.__knowledge

    def append(self, knowledge: KnowledgeDict):
        """Add a single record to the end of the list."""
        self.__knowledge.append(knowledge)
76+
77+
#===============================================================================
78+
79+
# Identifiers that ``setup_anatomical_types()`` registers in the
# ``anatomical_types`` table: the node phenotype predicates...
NODE_PHENOTYPES = [
    'ilxtr:hasSomaLocatedIn',
    'ilxtr:hasAxonPresynapticElementIn',
    'ilxtr:hasAxonSensorySubcellularElementIn',
    'ilxtr:hasAxonLeadingToSensorySubcellularElementIn',
    'ilxtr:hasAxonLocatedIn',
    'ilxtr:hasDendriteLocatedIn',
]

# ...and the remaining node types.
NODE_TYPES = [NERVE_TYPE]
90+
91+
def setup_anatomical_types(cursor):
#==================================
    """Refresh the ``anatomical_types`` table.

    Types that no row of ``path_node_types`` refers to are deleted, then
    every identifier in ``NODE_PHENOTYPES`` and ``NODE_TYPES`` is inserted,
    using the identifier itself as its label. Identifiers already in the
    table are left untouched.

    :param cursor: an open database cursor
    """
    cursor.execute('DELETE FROM anatomical_types at WHERE NOT EXISTS (SELECT 1 FROM path_node_types pt WHERE at.type_id = pt.type_id)')

    # (type_id, label) pairs -- the label is simply the identifier
    type_rows = [(type_id, type_id) for type_id in NODE_PHENOTYPES + NODE_TYPES]
    cursor.executemany('INSERT INTO anatomical_types (type_id, label) VALUES (%s, %s) ON CONFLICT DO NOTHING',
                       type_rows)
96+
97+
#===============================================================================
98+
99+
def delete_source_from_tables(cursor, source: str):
#==================================================
    """Delete every row belonging to ``source`` from the per-source tables.

    :param cursor: an open database cursor
    :param source: the source identifier whose rows are removed
    """
    # The tables are processed in exactly this order.
    # NOTE(review): the order presumably respects foreign-key dependencies
    # between the tables -- confirm against the schema before changing it.
    tables = (
        'path_taxons',
        'feature_evidence',
        'path_edges',
        'path_features',
        'path_node_features',
        'path_forward_connections',
        'path_node_types',
        'path_phenotypes',
        'path_properties',
        'path_nodes',
        'feature_types',
        'feature_terms',
    )
    for table in tables:
        # Table names are the fixed literals above; only the source
        # identifier is passed as a query parameter.
        cursor.execute(f'DELETE FROM {table} WHERE source_id=%s', (source, ))
113+
114+
def _copy_rows(cursor, statement, rows):
    """Stream ``rows`` to the database using one ``COPY ... FROM STDIN``."""
    with cursor.copy(statement) as copy:
        for row in rows:
            copy.write_row(row)


def _insert_path_connectivity(cursor, source, record, connectivity):
    """Insert all rows derived from a single path ``record``."""
    path_id = record['id']

    # Taxons (a default taxon is used when the record names none)
    taxons = record.get('taxons', ['NCBITaxon:40674'])
    cursor.executemany('INSERT INTO taxons (taxon_id) VALUES (%s) ON CONFLICT DO NOTHING',
                       [(taxon,) for taxon in taxons])

    # Path taxons
    _copy_rows(cursor, "COPY path_taxons (source_id, path_id, taxon_id) FROM STDIN",
               [(source, path_id, taxon) for taxon in taxons])

    # Evidence
    references = record.get('references', [])
    cursor.executemany('INSERT INTO evidence (evidence_id) VALUES (%s) ON CONFLICT DO NOTHING',
                       [(evidence_id,) for evidence_id in references])

    # Path evidence
    _copy_rows(cursor, "COPY feature_evidence (source_id, term_id, evidence_id) FROM STDIN",
               [(source, path_id, evidence_id) for evidence_id in references])

    # Each connectivity entry is a pair of nodes; a node is stored in the
    # database as its JSON encoding, so encode every edge exactly once.
    edges = [(json.dumps(node_0), json.dumps(node_1)) for (node_0, node_1) in connectivity]

    # Nodes
    node_ids = {node_id for edge in edges for node_id in edge}
    cursor.executemany('INSERT INTO path_nodes (source_id, path_id, node_id) VALUES (%s, %s, %s) ON CONFLICT DO NOTHING',
                       [(source, path_id, node_id) for node_id in node_ids])

    # Node features -- a decoded node is indexed as ``[feature, [features...]]``
    node_features = [(source, path_id, node_id, feature)
                     for (node_id, node) in [(node_id, json.loads(node_id)) for node_id in node_ids]
                     for feature in [node[0]] + node[1]]
    cursor.executemany('INSERT INTO path_node_features (source_id, path_id, node_id, feature_id) VALUES (%s, %s, %s, %s) ON CONFLICT DO NOTHING',
                       node_features)

    # Path edges
    _copy_rows(cursor, "COPY path_edges (source_id, path_id, node_0, node_1) FROM STDIN",
               [(source, path_id, node_0, node_1) for (node_0, node_1) in edges])

    # Path features (each feature once per path)
    path_features = {feature for (_, _, _, feature) in node_features}
    _copy_rows(cursor, "COPY path_features (source_id, path_id, feature_id) FROM STDIN",
               [(source, path_id, feature) for feature in path_features])

    # Forward connections
    _copy_rows(cursor, "COPY path_forward_connections (source_id, path_id, forward_path_id) FROM STDIN",
               [(source, path_id, forward_path)
                for forward_path in record.get('forward-connections', [])])

    # Path node types
    node_types = [(source, path_id, json.dumps(node), type_id)
                  for (type_id, typed_nodes) in record.get('node-phenotypes', {}).items()
                  for node in typed_nodes]
    node_types.extend((source, path_id, json.dumps(node), NERVE_TYPE)
                      for node in record.get('nerves', []))
    _copy_rows(cursor, "COPY path_node_types (source_id, path_id, node_id, type_id) FROM STDIN",
               node_types)

    # Path phenotypes
    _copy_rows(cursor, "COPY path_phenotypes (source_id, path_id, phenotype) FROM STDIN",
               [(source, path_id, phenotype) for phenotype in record.get('phenotypes', [])])

    # General path properties
    cursor.execute('INSERT INTO path_properties (source_id, path_id, biological_sex, alert, disconnected) VALUES (%s, %s, %s, %s, %s)',
                   (source, path_id, record.get('biologicalSex'), record.get('alert'), record.get('pathDisconnected')))


def update_connectivity(cursor, knowledge: KnowledgeList):
#=========================================================
    """Insert the connectivity of every path in ``knowledge``.

    A record is imported when its cleaned ``source`` equals
    ``knowledge.source`` and it has a ``connectivity`` entry. Other records
    are skipped but still advance the progress bar, whose total is the
    number of records.

    :param cursor: an open database cursor
    :param knowledge: the records to import
    """
    source = knowledge.source
    # As a context manager the progress bar is closed even when an insert
    # raises, so the terminal is not left with a half-drawn bar.
    with tqdm(total=len(knowledge.knowledge),
              unit='records', ncols=80,
              bar_format='{l_bar}{bar}| {n_fmt}/{total_fmt}') as progress_bar:
        for record in knowledge.knowledge:
            if (source == clean_source(record.get('source', ''))
                    and (connectivity := record.get('connectivity')) is not None):
                _insert_path_connectivity(cursor, source, record, connectivity)
            progress_bar.update(1)
198+
199+
def update_features(cursor, knowledge: KnowledgeList):
#=====================================================
    """Replace the feature terms (and their types) of ``knowledge.source``.

    Existing ``feature_terms`` rows of the source are deleted first. Every
    record whose cleaned ``source`` equals ``knowledge.source`` then yields
    one ``feature_terms`` row and, when it has a ``type``, one
    ``feature_types`` row.

    :param cursor: an open database cursor
    :param knowledge: the records to import
    """
    source = knowledge.source
    cursor.execute('DELETE FROM feature_terms WHERE source_id=%s', (source, ))

    records = [record for record in knowledge.knowledge
               if source == clean_source(record.get('source', ''))]

    # One COPY per table rather than two per record: all terms are written
    # first, then all types, so a term always exists before its type row.

    # Feature terms
    with cursor.copy("COPY feature_terms (source_id, term_id, label, description) FROM STDIN") as copy:
        for record in records:
            copy.write_row([source, record['id'], record.get('label'), record.get('long-label')])

    # Feature types
    with cursor.copy("COPY feature_types (source_id, term_id, type_id) FROM STDIN") as copy:
        for record in records:
            if (term_type := record.get('type')) is not None:
                copy.write_row([source, record['id'], term_type])
215+
216+
def update_knowledge_source(cursor, source):
#===========================================
    """Register ``source`` in ``knowledge_sources``.

    Nothing happens when the source is already present.

    :param cursor: an open database cursor
    :param source: the source identifier to register
    """
    statement = 'INSERT INTO knowledge_sources (source_id) VALUES (%s) ON CONFLICT DO NOTHING'
    parameters = (source,)
    cursor.execute(statement, parameters)
219+
220+
#===============================================================================
221+
222+
def pg_import(uuid):
#===================
    """Import the knowledge of the flatmap ``uuid`` into PostgreSQL.

    The knowledge is read before the database is opened. Inside one
    connection, the rows of the source are deleted, the anatomical types
    and the knowledge source are set up, and features and connectivity are
    inserted; the work is then committed.

    :param uuid: identifier of the flatmap to import
    """
    knowledge = map_knowledge(uuid)

    # The user part of the URL is omitted when ``KNOWLEDGE_USER`` is unset
    credentials = f'{KNOWLEDGE_USER}@' if KNOWLEDGE_USER else ''
    database_url = f'postgresql://{credentials}{KNOWLEDGE_HOST}/{PG_DATABASE}'

    with pg.connect(database_url) as db:
        with db.cursor() as cursor:
            source = knowledge.source
            delete_source_from_tables(cursor, source)
            setup_anatomical_types(cursor)
            update_knowledge_source(cursor, source)
            update_features(cursor, knowledge)
            update_connectivity(cursor, knowledge)
        db.commit()
234+
235+
#===============================================================================
236+
237+
def _metadata_json(reader, name: str) -> Any:
    """Return the decoded JSON stored under ``name`` in the MBTiles
    ``metadata`` table, or ``None`` when the row is missing or empty."""
    # ``name`` is always one of the literals used by ``map_knowledge()``,
    # never external input, so interpolating it into the statement is safe.
    row = reader._query(f"SELECT value FROM metadata WHERE name='{name}'").fetchone()
    if row is None or row[0] is None:
        return None
    return json.loads(row[0])


def map_knowledge(uuid) -> KnowledgeList:
#========================================
    """Collect the knowledge records of the flatmap ``uuid``.

    The flatmap's ``index.mbtiles`` is read from ``FLATMAP_ROOT/<uuid>/``.
    Each pathway that has connectivity becomes a path record, and each
    annotated feature whose ``models`` term is not already a path becomes a
    term record. Labels and other properties are looked up in the knowledge
    store.

    :param uuid: identifier of the flatmap
    :returns: a ``KnowledgeList`` whose source is ``uuid``
    :raises ValueError: if the ``FLATMAP_ROOT`` environment variable is unset
    :raises FileNotFoundError: if the flatmap has no ``index.mbtiles``
    :raises IOError: if the flatmap has no metadata or its UUID differs
                     from ``uuid``
    """
    if FLATMAP_ROOT is None:
        # Without this check ``pathlib.Path(None)`` fails with a TypeError
        # that doesn't say what is actually wrong.
        raise ValueError("The FLATMAP_ROOT environment variable is not set.")

    mbtiles = pathlib.Path(FLATMAP_ROOT) / uuid / 'index.mbtiles'
    if not mbtiles.exists():
        raise FileNotFoundError(f"MBTiles file not found at: {mbtiles}")

    store = KnowledgeStore(
        store_directory = FLATMAP_ROOT,
        knowledge_base = DEFAULT_STORE,
        read_only = False,
        use_sckan = False
    )
    try:
        reader = MBTilesReader(mbtiles)

        # Load metadata
        metadata = _metadata_json(reader, 'metadata')
        if metadata is None:
            raise IOError("Flatmap has no metadata.")
        if uuid != metadata.get('uuid'):
            raise IOError("Flatmap source UUID doesn't match the provided UUID.")

        sckan_release = metadata.get('connectivity', {}).get('npo', {}).get('release')

        # Load pathways (a flatmap without a ``pathways`` row has no paths)
        pathways = (_metadata_json(reader, 'pathways') or {}).get('paths', {})
        knowledge_terms = {}

        for path_id, path in pathways.items():
            if 'connectivity' not in path:
                continue

            db_knowledge = store.entity_knowledge(path_id, sckan_release)
            # Optional properties are read with ``get()``, as the functions
            # consuming these records do, so that one incomplete entry does
            # not abort the whole import with a KeyError.
            knowledge_terms[path_id] = {
                'id': path_id,
                'label': db_knowledge.get('label'),
                'long-label': db_knowledge.get('long-label'),
                'connectivity': path['connectivity'],
                # NOTE(review): a flatmap without a taxon yields the empty
                # string as its taxon identifier -- confirm this is intended.
                'taxons': [metadata.get('taxon', '')],
                'forward-connections': path.get('forward-connections', []),
                'node-phenotypes': path.get('node-phenotypes', {}),
                'nerves': path.get('node-nerves', []),
                'pathDisconnected': db_knowledge.get('pathDisconnected'),
                'phenotypes': db_knowledge.get('phenotypes', []),
                'source': uuid,
                'references': db_knowledge.get('references', []),
                # ``alert`` is inserted as a single column value, so its
                # absence is ``None`` and not an empty list.
                'alert': db_knowledge.get('alert'),
            }

        # Load annotations
        annotations = _metadata_json(reader, 'annotations') or {}

        for feature in annotations.values():
            model = feature.get('models')
            if not model or model in knowledge_terms:
                continue
            db_knowledge = store.entity_knowledge(model, sckan_release)
            term = {
                'id': model,
                'label': db_knowledge.get('label'),
                'source': uuid,
            }
            if 'type' in db_knowledge:
                term['type'] = db_knowledge['type']
            knowledge_terms[model] = term

        return KnowledgeList(uuid, list(knowledge_terms.values()))
    finally:
        # The store was opened for writing; release it whatever happens
        store.close()
302+
303+
#===============================================================================
304+
305+
def main():
    """Parse the command line and import the requested flatmap."""
    import argparse

    cli = argparse.ArgumentParser(description='Import Flatmap knowledge into a PostgreSQL knowledge store.')
    cli.add_argument('-q', '--quiet', action='store_true', help='Suppress INFO log messages')
    cli.add_argument('--uuid', required=True, help='Map UUID')
    options = cli.parse_args()

    # INFO logging is on unless ``--quiet`` is given
    if not options.quiet:
        logging.basicConfig(level=logging.INFO)

    pg_import(options.uuid)

#===============================================================================

if __name__ == '__main__':
#=========================
    main()
323+
324+
#===============================================================================

0 commit comments

Comments
 (0)