1+ import re
12import os
23import json
34import time
45import uuid
5- from dateutil .parser import parse
6- from datetime import timedelta
7- import re
86import redis
9- import pandas as pd
107import pathlib
8+ import numpy as np
9+ import pandas as pd
10+ from datetime import timedelta
11+ from dateutil .parser import parse
12+ from typing import Dict , List , Union
13+ from typing_extensions import Literal
1114
1215
1316redis_config = {
@@ -29,7 +32,7 @@ def get_key(store, key, timeout=180):
2932 result = {
3033 "k" : key ,
3134 "status" : "error" ,
32- "error" : "timeout"
35+ "error" : "timeout"
3336 }
3437 while tries <= max_tries :
3538 res = store .get (key + "_output" )
@@ -119,3 +122,149 @@ def get_nested_value(data, keys, default=None):
119122 if data is None :
120123 return default
121124 return data
125+
126+
def push_metadata_to_queue(
    redis_store: redis.Redis,
    params: Dict[str, Union[str, List[str]]],
    metadata: pd.DataFrame,
    source_list: List[str]
) -> str:
    """
    Send metadata for processing into the Redis queue and return the request_id.

    The caller's ``params`` dict is left untouched: the list of sources is
    added to a shallow copy that is used only for the queued payload.

    :param redis_store: Object of the Redis store.
    :param params: Request params.
    :param metadata: DataFrame with default metadata.
    :param source_list: define from which service additional metadata will be received (available values: "crossref", "altmetric").
    :return: request_id for the receiving of the request result.
    :raises ValueError: if source_list contains an unsupported source.
    """
    # Checks that valid values are specified in the source array
    check_metadata_enrichment_source(source_list)

    # Creates a new unique request identifier that will then be used to retrieve the result
    request_id = str(uuid.uuid4())

    # Specifies from which sources to obtain information.
    # A copy is used so the caller's dict is not modified as a side effect.
    payload_params = {**params, "metrics_sources": source_list}

    # Payload object creation; the DataFrame is embedded as a JSON string of records
    task_data = json.dumps({
        "id": request_id,
        "params": payload_params,
        "metadata": metadata.to_json(orient="records"),
    })

    # Pushing request to the "metrics" list in Redis and returning request id
    redis_store.rpush("metrics", task_data)
    return request_id
161+
162+
def check_metadata_enrichment_source(source_list: List[str]) -> None:
    """
    Validate the names of the metadata enrichment sources.

    Every entry must be either "crossref" or "altmetric"; an empty list is accepted.

    :param source_list: List of sources from where metadata will be enriched.
    :return: None.
    :raises ValueError: if any entry is not a supported source.
    """
    supported_sources = ("crossref", "altmetric")
    for candidate in source_list:
        # Stop at the first unsupported entry
        if candidate not in supported_sources:
            raise ValueError("Source list must contain only 'crossref' or 'altmetric'")
172+
173+
def fetch_enriched_metadata(redis_store: redis.Redis, request_id: str, timeout: int = 600) -> pd.DataFrame:
    """
    Get enriched metadata from Redis.

    :param redis_store: Object of the Redis store.
    :param request_id: Unique identifier of the request.
    :param timeout: Results waiting time (default - 600 seconds).
    :return: Enriched DataFrame with metadata.
    :raises KeyError: if the result obtained for the request has no "input_data" entry.
    """
    # Getting result of metadata enrichment from Redis
    result = get_key(redis_store, request_id, timeout)

    # NOTE(review): get_key starts from a placeholder with "status": "error" and
    # "error": "timeout" and no "input_data"; presumably that placeholder is what
    # comes back when no result arrives in time - confirm against get_key.
    try:
        records = result["input_data"]
    except KeyError:
        # Same exception type as before, but with enough context to diagnose the failure
        raise KeyError(
            f"Result for request {request_id!r} has no 'input_data' "
            f"(status={result.get('status')!r}, error={result.get('error')!r})"
        ) from None

    return pd.DataFrame(records)
186+
187+
def get_metadata_columns_for_source(source_list: List[str]) -> List[str]:
    """
    Build the list of metadata columns expected from the given sources.

    Columns are always listed with "crossref" columns first and "altmetric"
    columns second, independent of the order of source_list.

    :param source_list: List of sources from where metadata received.
    :return: array with required metadata columns.
    :raises ValueError: if source_list contains an unsupported source.
    """
    # Reject unsupported source names before building anything
    check_metadata_enrichment_source(source_list)

    crossref_columns = ["citation_count"]
    altmetric_columns = [
        "cited_by_wikipedia_count",
        "cited_by_msm_count",
        "cited_by_policies_count",
        "cited_by_patents_count",
        "cited_by_accounts_count",
        "cited_by_fbwalls_count",
        "cited_by_feeds_count",
        "cited_by_gplus_count",
        "cited_by_rdts_count",
        "cited_by_qna_count",
        "cited_by_tweeters_count",
        "cited_by_videos_count",
    ]

    # Walk the sources in a fixed order and collect the columns of the requested ones
    columns: List[str] = []
    for source_name, source_columns in (
        ("crossref", crossref_columns),
        ("altmetric", altmetric_columns),
    ):
        if source_name in source_list:
            columns += source_columns

    return columns
221+
222+
def ensure_required_columns(metadata: pd.DataFrame, source_list: List[str]) -> pd.DataFrame:
    """
    Make sure all columns required for the given sources exist, adding missing ones filled with NaN.

    Missing columns are added to the passed DataFrame in place; the same
    object is returned.

    :param metadata: DataFrame with metadata.
    :param source_list: List of sources from where metadata received.
    :return: Updated DataFrame.
    :raises ValueError: if source_list contains an unsupported source.
    """
    # Checks that valid values are specified in the source array
    check_metadata_enrichment_source(source_list)

    # Gets metadata columns that must be received from source(-s)
    columns = get_metadata_columns_for_source(source_list)
    for column in columns:
        if column not in metadata.columns:
            # np.nan instead of np.NaN: the upper-case alias was removed in NumPy 2.0
            metadata[column] = np.nan

    return metadata
241+
242+
def enrich_metadata(
    redis: redis.Redis,
    params: Dict[str, Union[str, List[str]]],
    metadata: pd.DataFrame,
    source_list: List[str],
    timeout: int = 600,
) -> pd.DataFrame:
    """
    Enrich metadata with the columns provided by the requested sources.

    The metadata is pushed to the Redis queue, the result is awaited, and
    any required column missing from the result is added with NaN values.

    :param redis: store object of Redis.
    :param params: params of the request.
    :param metadata: DataFrame with default metadata.
    :param source_list: define from which service additional metadata will be received (available values: "crossref", "altmetric").
    :param timeout: Results waiting time in seconds (default - 600).
    :return: Enriched DataFrame with metadata.
    :raises ValueError: if source_list contains an unsupported source.
    """
    # Checks that valid values are specified in the source array
    check_metadata_enrichment_source(source_list)

    # Creates a request to metrics for metadata enrichment
    # and returns request_id for receiving the result later
    request_id = push_metadata_to_queue(redis, params, metadata, source_list)

    # Getting the result after metadata enrichment at metrics
    enriched_metadata = fetch_enriched_metadata(redis, request_id, timeout)

    # Checks that all necessary columns are available or adding them with NaN value
    return ensure_required_columns(enriched_metadata, source_list)
0 commit comments