Skip to content

Commit fdd012f

Browse files
committed
Merge remote-tracking branch 'upstream/bugfix/pubmed-citations' into bugfix/pubmed-citations
2 parents 756c52f + 4ea4880 commit fdd012f

4 files changed

Lines changed: 136 additions & 30 deletions

File tree

server/preprocessing/other-scripts/run_metrics.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ if (!is.null(params$lang_id)) {
5050
if (!is.null(params$metrics_sources)) {
5151
metrics_sources <- params$metrics_sources
5252
} else {
53-
metrics_sources <- 'all'
53+
metrics_sources <- c("altmetric", "crossref")
5454
}
5555

5656
source('metrics.R')

server/workers/common/common/utils.py

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
1+
import re
12
import os
23
import json
34
import time
45
import uuid
5-
from dateutil.parser import parse
6-
from datetime import timedelta
7-
import re
86
import redis
9-
import pandas as pd
107
import pathlib
8+
import numpy as np
9+
import pandas as pd
10+
from datetime import timedelta
11+
from dateutil.parser import parse
1112

1213

1314
redis_config = {
@@ -119,3 +120,91 @@ def get_nested_value(data, keys, default=None):
119120
if data is None:
120121
return default
121122
return data
123+
124+
125+
def push_metadata_to_queue(redis_store, params, metadata):
    """
    Queue metadata for enrichment on the Redis "metrics" list.

    :param redis_store: Redis client used to push the task.
    :param params: Request parameters. ``metrics_sources`` is overwritten
        with ``["crossref"]`` before serialization — the caller's dict is
        mutated in place.
    :param metadata: DataFrame with the default metadata to enrich.
    :return: request_id under which the result can later be retrieved.
    """
    request_id = str(uuid.uuid4())

    # NOTE(review): unconditionally forces the crossref source, even if the
    # caller supplied its own metrics_sources — confirm this is intended.
    params["metrics_sources"] = ["crossref"]

    task = {
        "id": request_id,
        "params": params,
        "metadata": metadata.to_json(orient="records"),
    }
    redis_store.rpush("metrics", json.dumps(task))

    return request_id
145+
146+
147+
def fetch_enriched_metadata(redis_store, request_id, timeout=600):
    """
    Wait for and return the enriched metadata for a previously queued request.

    :param redis_store: Redis client to read the result from.
    :param request_id: Unique identifier of the enrichment request.
    :param timeout: Maximum wait time in seconds (default 600).
    :return: DataFrame built from the ``input_data`` field of the result.
    """
    response = get_key(redis_store, request_id, timeout)
    return pd.DataFrame(response["input_data"])
158+
159+
160+
def ensure_required_columns(metadata: pd.DataFrame) -> pd.DataFrame:
    """
    Ensure every metrics column exists, adding missing ones filled with NaN.

    Mutates and returns the same DataFrame so callers can use either style.

    :param metadata: DataFrame with metadata.
    :return: The DataFrame with all required metrics columns present.
    """
    # Citation/altmetric columns downstream consumers expect to find.
    REQUIRED_METADATA_COLUMNS = [
        "citation_count",
        "cited_by_wikipedia_count",
        "cited_by_msm_count",
        "cited_by_policies_count",
        "cited_by_patents_count",
        "cited_by_accounts_count",
        "cited_by_fbwalls_count",
        "cited_by_feeds_count",
        "cited_by_gplus_count",
        "cited_by_rdts_count",
        "cited_by_qna_count",
        "cited_by_tweeters_count",
        "cited_by_videos_count",
    ]

    for column in REQUIRED_METADATA_COLUMNS:
        if column not in metadata.columns:
            # np.nan (lowercase): the np.NaN alias was removed in NumPy 2.0
            # and raises AttributeError on modern installs.
            metadata[column] = np.nan

    return metadata
188+
189+
190+
def enrich_metadata(redis_store, params, metadata: pd.DataFrame) -> pd.DataFrame:
    """
    Enrich metadata with citation information via the Redis metrics worker.

    :param redis_store: Redis store object.
    :param params: Request parameters.
    :param metadata: DataFrame with the default metadata.
    :return: Enriched DataFrame containing every required metrics column.
    """
    # Queue the enrichment task; the worker will publish the result under
    # the returned request_id.
    request_id = push_metadata_to_queue(redis_store, params, metadata)

    # Block until the worker responds with the enriched records.
    enriched = fetch_enriched_metadata(redis_store, request_id)

    # Normalize the column set so downstream code can rely on all
    # metrics columns being present (missing ones become NaN).
    return ensure_required_columns(enriched)

server/workers/metrics/src/metrics.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@ def next_item(self):
3434

3535
@error_logging_aspect(log_level=logging.ERROR)
3636
def execute_search(self, params: dict, metadata: str) -> dict:
37+
self.logger.debug(f"execute_search function running in metrics.py")
38+
3739
command = [
38-
self.command,
39-
self.runner,
40-
self.wd,
41-
params.get('q'),
40+
self.command,
41+
self.runner,
42+
self.wd,
43+
params.get('q'),
4244
params.get('service')
4345
]
4446

45-
self.logger.debug(f"Executing command: {command}")
46-
4747
data = {
4848
"params": params,
4949
"metadata": metadata
@@ -59,11 +59,19 @@ def execute_search(self, params: dict, metadata: str) -> dict:
5959
)
6060
stdout, stderr = proc.communicate(json.dumps(data))
6161

62-
self.logger.debug(f"Stdout: {stdout}")
62+
# TODO: Remove after development
63+
self.logger.debug(f"Raw stdout: {stdout}")
64+
self.logger.debug(f"Raw stderr: {stderr}")
6365

6466
output = [line for line in stdout.split('\n') if line]
6567
errors = [line for line in stderr.split('\n') if line]
6668

69+
# TODO: Remove after development
70+
if not output:
71+
raise ValueError("No output received from the subprocess")
72+
if len(output) < 2:
73+
raise ValueError(f"Unexpected output format: {output}")
74+
6775
if not output:
6876
raise ValueError("No output received from the subprocess")
6977

server/workers/pubmed/src/pubmed.py

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import pandas as pd
44
import logging
55
from common.r_wrapper import RWrapper
6+
from common.utils import enrich_metadata
67

78
formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s',
89
datefmt='%Y-%m-%d %H:%M:%S')
@@ -19,35 +20,43 @@ def next_item(self):
1920
endpoint = msg.get('endpoint')
2021
return k, params, endpoint
2122

23+
2224
def execute_search(self, params):
    """
    Run the R pubmed search, enrich the results with citation metrics,
    and return the combined payload.

    :param params: Request parameters; ``q`` and ``service`` are passed to
        the R subprocess on the command line, the full dict on stdin.
    :return: ``{"input_data": {"metadata": ..., "text": ...}, "params": params}``
        on success; if the R script reports an error, its error dict is
        returned unchanged.
    :raises Exception: re-raises any failure after logging it.
    """
    q = params.get('q')
    service = params.get('service')
    data = {"params": params}
    cmd = [self.command, self.runner, self.wd, q, service]

    # Pre-initialize so the except-block logging cannot raise NameError
    # when the failure happens before stderr was parsed (e.g. Popen fails).
    error = []
    try:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                encoding='utf-8')
        stdout, stderr = proc.communicate(json.dumps(data))
        output = [o for o in stdout.split('\n') if o]
        error = [o for o in stderr.split('\n') if o]

        # The R script emits metadata and text as the last two stdout lines.
        raw_metadata = json.loads(output[-2])
        raw_text = json.loads(output[-1])

        # The R side signals failure with a {"status": "error", ...} dict.
        if isinstance(raw_metadata, dict) and raw_metadata.get('status') == "error":
            return raw_metadata

        metadata = pd.DataFrame(raw_metadata)

        metadata = enrich_metadata(self.redis_store, params, metadata)
        # TODO(review): debug-only loop — remove after development.
        for index, row in metadata.iterrows():
            self.logger.debug(f"Title: {row['title']}, DOI: {row['doi']}, Citations: {row.get('citation_count', 'N/A')}")

        text = pd.DataFrame(raw_text)

        input_data = {
            "metadata": metadata.to_json(orient='records'),
            "text": text.to_json(orient='records')
        }

        return {"input_data": input_data, "params": params}

    except Exception as e:
        self.logger.error(f"Error in execute_search: {e}")
        self.logger.error(error)
        raise
5362

0 commit comments

Comments
 (0)