Skip to content

Commit 40b8a7d

Browse files
committed
Fixed BeatAML OOM kill. Cleaned up and added retries to pubchem_retrieval.
1 parent cab4a86 commit 40b8a7d

2 files changed

Lines changed: 124 additions & 56 deletions

File tree

build/beatAML/GetBeatAML.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -467,10 +467,10 @@ def map_and_combine(df, data_type, entrez_map_file, improve_map_file, map_file=N
467467
how='left')
468468
mapped_df.insert(0, 'improve_sample_id', mapped_df.pop('improve_sample_id'))
469469

470-
print(mapped_df.to_string())
471-
mapped_df['improve_sample_id'] = mapped_df['improve_sample_id'].astype(int)
472-
mapped_df['entrez_id'] = mapped_df['entrez_id'].fillna(0)
473-
mapped_df['entrez_id'] = mapped_df['entrez_id'].astype(int)
470+
# Replace NaNs, round values, and convert to integers for specified columns
471+
columns_to_convert = ['improve_sample_id', 'entrez_id']
472+
mapped_df[columns_to_convert] = mapped_df[columns_to_convert].fillna(0).round().astype('int32')
473+
474474
mapped_df['source'] = 'synapse'
475475
mapped_df['study'] = 'BeatAML'
476476
mapped_df =mapped_df.drop_duplicates()

build/utils/pubchem_retrieval.py

Lines changed: 120 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import signal
88
import sys
99

10+
# Global variables
1011
request_counter = 0
1112
last_request_time = time.time()
1213
lock = threading.Lock()
@@ -16,7 +17,22 @@
1617
existing_structures = dict()
1718
existing_pubchemids = set()
1819

19-
def fetch_url(url):
20+
21+
def fetch_url(url, retries=3, backoff_factor=1):
22+
"""
23+
Fetches a URL with retry mechanism and backoff.
24+
25+
Parameters:
26+
- url (str): The URL to fetch.
27+
- retries (int): Number of retry attempts.
28+
- backoff_factor (float): Factor to calculate backoff time.
29+
30+
Returns:
31+
- dict: JSON response if successful.
32+
33+
Raises:
34+
- Exception: If all retry attempts fail.
35+
"""
2036
global last_request_time, lock, request_counter
2137
with lock:
2238
current_time = time.time()
@@ -35,19 +51,35 @@ def fetch_url(url):
3551

3652
request_counter += 1
3753

38-
# Proceed with the request
39-
response = requests.get(url)
40-
if response.status_code == 200:
41-
return response.json()
42-
else:
43-
raise Exception(f"Failed to fetch {url}")
54+
for attempt in range(retries + 1): # Total attempts = retries + 1
55+
try:
56+
response = requests.get(url, timeout=10)
57+
if response.status_code == 200:
58+
return response.json()
59+
else:
60+
raise Exception(f"Failed to fetch {url}, Status Code: {response.status_code}")
61+
except Exception as exc:
62+
if attempt < retries:
63+
wait = backoff_factor * (2 ** attempt)
64+
print(f"Attempt {attempt + 1} for URL {url} failed with error: {exc}. Retrying in {wait} seconds...")
65+
time.sleep(wait)
66+
else:
67+
print(f"All {retries + 1} attempts failed for URL {url}.")
68+
raise
69+
70+
71+
def retrieve_drug_info(compound, ignore_chems, isname=True):
72+
"""
73+
Retrieves information for a given compound from PubChem.
74+
75+
Parameters:
76+
- compound (str or int): Name or CID of the compound.
77+
- ignore_chems (str): File path to log ignored compounds.
78+
- isname (bool): True if the compound is a name, False if it's a CID.
4479
45-
def retrieve_drug_info(compound,ignore_chems,isname=True):
46-
'''
47-
compound_name: name of compound or CID
48-
ignore_chems: list of chemicals to ignore
49-
isname: true if compound is name, false if is CID
50-
'''
80+
Returns:
81+
- list: List of dictionaries containing drug information, or None if unsuccessful.
82+
"""
5183
global improve_drug_id, existing_synonyms, existing_structures
5284
if pd.isna(compound):
5385
return None
@@ -62,7 +94,7 @@ def retrieve_drug_info(compound,ignore_chems,isname=True):
6294
"properties": f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/CID/{compound}/property/CanonicalSMILES,InChIKey,MolecularFormula,MolecularWeight/JSON",
6395
"synonyms": f"https://pubchem.ncbi.nlm.nih.gov/rest/pug/compound/CID/{compound}/synonyms/JSON"
6496
}
65-
97+
6698
with ThreadPoolExecutor(max_workers=4) as executor:
6799
future_to_url = {executor.submit(fetch_url, url): key for key, url in urls.items()}
68100
results = {}
@@ -74,7 +106,7 @@ def retrieve_drug_info(compound,ignore_chems,isname=True):
74106
results[key] = data
75107
except Exception as exc:
76108
print(f'{compound} generated an exception: {exc}')
77-
with open(ignore_chems,"a") as f:
109+
with open(ignore_chems, "a") as f:
78110
f.write(f"{compound}\n")
79111
return None
80112

@@ -89,119 +121,155 @@ def retrieve_drug_info(compound,ignore_chems,isname=True):
89121
else:
90122
sl = synonyms_list
91123
for synonym in sl:
92-
# print(synonym)
93124
synonym_lower = str(synonym).lower()
94-
# if synonym_lower in existing_synonyms: ### THIS IS CAUSING THE LOOP TO END BEFORE IT GETS TO THE COMPOUND NAME
95-
# return None
96125
if synonym_lower not in existing_synonyms:
97-
new_syns.add(str(synonym).lower())
98-
if len(new_syns) == 0: #JUST BE SURE WE HAVE NO NEW SYNONYMS BEFORE RETURNING
126+
new_syns.add(synonym_lower)
127+
if len(new_syns) == 0: # Ensure there are new synonyms before proceeding
99128
return None
100-
for synonym in new_syns:#synonyms_list + [compound]: ##NOW JUST ADD THOSE
129+
for synonym in new_syns:
101130
synonym_lower = str(synonym).lower()
102131
existing_synonyms.add(synonym_lower)
103-
104132

105-
###now check for structure
133+
# Check for structure
106134
if properties['CanonicalSMILES'] in existing_structures.keys():
107-
print('found structure for '+str(compound))
135+
print(f'Found structure for {compound}')
108136
SMI_assignment = existing_structures[properties['CanonicalSMILES']]
109137
else:
110138
improve_drug_id += 1
111139
SMI_assignment = f"SMI_{improve_drug_id}"
112140
existing_structures[properties['CanonicalSMILES']] = SMI_assignment
113-
141+
114142
#print(new_syns)
115143
data_for_tsv = [{
116144
'improve_drug_id': SMI_assignment,
117145
'name': str(synonym).lower(),
118146
**properties
119-
} for synonym in new_syns]##synonyms_list]
147+
} for synonym in new_syns]
120148

121149
return data_for_tsv
122150
else:
123151
return None
124152

125-
def fetch_data_for_batch(batch,ignore_chems,isname):
153+
154+
def fetch_data_for_batch(batch, ignore_chems, isname):
155+
"""
156+
Fetches drug information for a batch of compounds.
157+
158+
Parameters:
159+
- batch (list): List of compound names or CIDs.
160+
- ignore_chems (str): File path to log ignored compounds.
161+
- isname (bool): True if compounds are names, False if they're CIDs.
162+
163+
Returns:
164+
- list: Combined list of drug information for the batch.
165+
"""
126166
all_data = []
127167
for compound_name in batch:
128-
data = retrieve_drug_info(compound_name,ignore_chems,isname)
168+
data = retrieve_drug_info(compound_name, ignore_chems, isname)
129169
if data:
130170
all_data.extend(data)
131171
return all_data
132172

173+
133174
def read_existing_data(output_filename):
134-
global improve_drug_id,existing_synonyms,existing_structures
175+
"""
176+
Reads existing data from the output file to prevent duplication.
177+
178+
Parameters:
179+
- output_filename (str): File path to the output file.
180+
181+
Returns:
182+
- None
183+
"""
184+
global improve_drug_id, existing_synonyms, existing_structures
135185
try:
136186
df = pd.read_csv(output_filename, sep='\t', quoting=3)
137187
existing_synonyms = set([str(a).lower() for a in set(df.chem_name)])
138-
existing_pubchemids = set([str(a) for a in df['pubchem_id']])
188+
existing_pubchemids = set([str(a) for a in df['pubchem_id']])
139189
max_id = df['improve_drug_id'].str.extract(r'SMI_(\d+)').astype(float).max()
140190
improve_drug_id = int(max_id[0]) + 1 if pd.notna(max_id[0]) else 1
141-
existing_structures = {row['canSMILES']:row['improve_drug_id'] for index,row in df.iterrows()}
142-
print('Read in '+str(len(existing_synonyms))+' drug names and '+str(len(existing_pubchemids))+' pubchem ids')
191+
existing_structures = {row['canSMILES']: row['improve_drug_id'] for _, row in df.iterrows()}
192+
print(f'Read in {len(existing_synonyms)} drug names and {len(existing_pubchemids)} pubchem IDs')
143193
except FileNotFoundError:
144194
return {}
145195

146196

147197
def timeout_handler(signum, frame):
198+
"""
199+
Handles timeouts by setting the global `should_continue` flag to False.
200+
"""
148201
global should_continue
149202
print("Time limit reached, exiting gracefully...")
150203
should_continue = False
151204

152-
# Call this function from other scripts.
153-
def update_dataframe_and_write_tsv(unique_names, output_filename="drugs.tsv",ignore_chems="ignore_chems.txt", batch_size=1,isname=True,time_limit=5*60*60):
205+
206+
def update_dataframe_and_write_tsv(unique_names, output_filename="drugs.tsv", ignore_chems="ignore_chems.txt",
207+
batch_size=1, isname=True, time_limit=48 * 60 * 60):
208+
"""
209+
Updates the data frame with drug information and writes it to a TSV file.
210+
211+
Parameters:
212+
- unique_names (iterable): List of unique compound names or CIDs.
213+
- output_filename (str): File path to the output TSV file.
214+
- ignore_chems (str): File path to log ignored compounds.
215+
- batch_size (int): Number of compounds to process in each batch.
216+
- isname (bool): True if unique_names are names, False if they're CIDs.
217+
- time_limit (int): Time limit for the script in seconds. This is a remnant of the GitHub Action CI.
218+
219+
Returns:
220+
- None
221+
"""
154222
global should_continue, existing_synonyms, existing_pubchemids
155-
#time_limit=5*60*60 # 5 hours
156223
signal.signal(signal.SIGALRM, timeout_handler)
157224
signal.alarm(time_limit)
158-
print('starting with '+str(len(unique_names))+' drug names/ids')
225+
print(f'Starting with {len(unique_names)} unique drug names/IDs')
226+
159227
try:
160-
print('reading in '+output_filename)
228+
print(f'Reading existing data from {output_filename}')
161229
read_existing_data(output_filename)
162230
if isname:
163231
unique_names = set([str(name).lower() for name in unique_names if not pd.isna(name)])
164232
unique_names = set(unique_names) - set(existing_synonyms)
165-
print('looking at '+str(len(unique_names))+' names')
233+
print(f'Looking at {len(unique_names)} names')
166234
else:
167235
unique_names = set([str(name) for name in unique_names if not pd.isna(name)])
168236
unique_names = set(unique_names) - set(existing_pubchemids)
169-
print('looking at '+str(len(unique_names))+' ids')
237+
print(f'Looking at {len(unique_names)} IDs')
170238
ignore_chem_set = set()
171239
if os.path.exists(ignore_chems):
172240
with open(ignore_chems, 'r') as file:
173241
for line in file:
174242
ignore_chem_set.add(line.strip())
175243
unique_names = list(set(unique_names) - ignore_chem_set)
176-
177-
print(f"{len(unique_names)} Drugs to search")#: {(unique_names)}")
244+
245+
print(f"{len(unique_names)} Drugs to search")
178246
for i in range(0, len(unique_names), batch_size):
179247
if not should_continue:
180248
break
181249
if unique_names[i] in existing_synonyms or unique_names[i] in existing_pubchemids:
182250
continue
183-
184-
batch = unique_names[i:i+batch_size]
185-
data = fetch_data_for_batch(batch,ignore_chems,isname)
251+
252+
batch = unique_names[i:i + batch_size]
253+
data = fetch_data_for_batch(batch, ignore_chems, isname)
186254
if data:
187255
file_exists = os.path.isfile(output_filename)
188-
mode = 'a' if file_exists else 'w'
256+
mode = 'a' if file_exists else 'w'
189257
with open(output_filename, mode) as f:
190258
if not file_exists:
191259
f.write("improve_drug_id\tchem_name\tpubchem_id\tcanSMILES\tInChIKey\tformula\tweight\n")
192260
for entry in data:
193-
f.write(f"{entry['improve_drug_id']}\t{entry['name']}\t{entry.get('CID', '')}\t{entry['CanonicalSMILES']}\t{entry['InChIKey']}\t{entry['MolecularFormula']}\t{entry['MolecularWeight']}\n")
194-
195-
with open(ignore_chems,"a") as ig_f:
261+
f.write(f"{entry['improve_drug_id']}\t{entry['name']}\t{entry.get('CID', '')}\t"
262+
f"{entry['CanonicalSMILES']}\t{entry['InChIKey']}\t"
263+
f"{entry['MolecularFormula']}\t{entry['MolecularWeight']}\n")
264+
265+
with open(ignore_chems, "a") as ig_f:
196266
for entry in data:
197267
if isname:
198268
ig_f.write(f"{entry['name']}\n")
199269
else:
200-
ig_f.write(f"{entry.get('CID','')}\n")
201-
202-
270+
ig_f.write(f"{entry.get('CID', '')}\n")
271+
203272
except Exception as e:
204273
print(f"An unexpected error occurred: {e}")
205274
finally:
206-
# Cancel the alarm
207275
signal.alarm(0)

0 commit comments

Comments
 (0)