Skip to content

Commit ab83427

Browse files
committed
Buffer executor submit calls to allow interrupting the download process
1 parent 61ddebb commit ab83427

2 files changed

Lines changed: 47 additions & 10 deletions

File tree

core.py

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from collections import namedtuple
22
from urllib.parse import urljoin
3+
import concurrent.futures
34
import threading
45
import requests
56
import datetime
@@ -10,13 +11,51 @@
1011
import json
1112
import math
1213

# Calls a function and returns an object with the arguments used, its
# return value and an arbitrary value provided by 'store' or '_store'.
# In addition, a callable may be passed to either 'cancel' or '_cancel'
# that may override the return value if it evaluates to a truthy value,
# in which case, the original call will not be made. The callable must
# take no arguments. This is intended for use with concurrent.futures.
def wrap_call(function, *args, **kwargs):
    """Invoke *function* and return a Wrap namedtuple describing the call.

    The returned tuple carries ``result``, ``store``, ``args`` and
    ``kwargs``.  A caller-supplied marker passed as ``store`` (or
    ``_store``) is threaded through untouched so it can be recovered from
    a completed future.  A zero-argument callable passed as ``cancel``
    (or ``_cancel``) is consulted first: if it returns a truthy value,
    that value becomes ``result`` and *function* is never called.
    Intended for use with concurrent.futures.
    """
    # Both spellings are consumed from kwargs; the underscored one wins.
    store = kwargs.pop('_store', kwargs.pop('store', None))
    cancel = kwargs.pop('_cancel', kwargs.pop('cancel', lambda: None))
    Wrap = namedtuple('Wrap', ['result', 'store', 'args', 'kwargs'])
    outcome = cancel()
    if not outcome:
        # Not cancelled: perform the real call with the remaining kwargs.
        outcome = function(*args, **kwargs)
    return Wrap(outcome, store, args, kwargs)
25+
26+
class BufferedExecutor(object):
    """A ThreadPoolExecutor wrapper that defers submission of queued calls.

    Calls registered via submit() are buffered and only handed to the
    underlying executor in a sliding window of at most ``submit_size``
    while as_completed() is being consumed.  Because most of the work is
    never submitted up front, shutdown() can interrupt the process
    without waiting for the whole queue to drain.
    """

    def __init__(self, submit_size, *args, **kwargs):
        # Maximum number of futures kept in flight at any one time.
        self._submit_size = submit_size
        self._executor = concurrent.futures.ThreadPoolExecutor(*args, **kwargs)
        # Pending (fn, args, kwargs) tuples not yet given to the executor.
        self._buffer = list()
        self._shutdown = False

    def submit(self, fn, *args, **kwargs):
        """Queue a call; it is not actually submitted until as_completed() runs."""
        self._buffer.append((fn, args, kwargs))

    def __submit_from_buffer(self):
        # FIFO: hand the oldest queued call to the real executor.
        fn, args, kwargs = self._buffer.pop(0)
        return self._executor.submit(fn, *args, **kwargs)

    def as_completed(self):
        """Yield futures as they complete, refilling the window from the buffer.

        Stops refilling (and yielding) as soon as shutdown() is called.
        Fixes over the previous version: the initial window is clamped to
        the number of buffered tasks (no IndexError when fewer tasks than
        ``submit_size`` were queued), refills only happen while the buffer
        is non-empty, and futures still in flight after the buffer drains
        are yielded instead of being silently dropped.
        """
        window = min(self._submit_size, len(self._buffer))
        in_flight = [self.__submit_from_buffer() for _ in range(window)]
        while in_flight and not self._shutdown:
            done, _ = concurrent.futures.wait(
                in_flight, return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                in_flight.remove(future)
                # Refill the window only while buffered work remains.
                if self._buffer and not self._shutdown:
                    in_flight.append(self.__submit_from_buffer())
                yield future

    def shutdown(self, wait=True):
        """Stop submitting buffered work and shut down the underlying executor."""
        self._shutdown = True
        self._executor.shutdown(wait=wait)

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        # Returning False propagates any exception raised in the with-body.
        return False
2059

2160
class IndexServer(object):
2261
# Index metadata schema:

update.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22
from core import IndexServer, Task, ProgressReporter
33
from tqdm import tqdm
44
from urllib.parse import quote, urljoin
5-
from concurrent.futures import as_completed
6-
import concurrent.futures
75
import threading
86
import argparse
97
import urllib3
@@ -59,13 +57,13 @@ def perform_update(flashpoint, current, target, file_endpoint, reporter):
5957
reporter.logger.info('File from target index already in temp folder. Skipped: %s' % hash)
6058

6159
session = requests.Session()
62-
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
63-
tasks = list()
60+
with core.BufferedExecutor(32, max_workers=8) as executor:
6461
for hash in to_download:
6562
path = target['files'][hash][0]
6663
url = urljoin(file_endpoint, quote(path))
67-
tasks.append(executor.submit(core.wrap_call, download_file, session, url, os.path.join(tmp, hash), store=path))
68-
for future, report in reporter.task_it('Downloading new data...', as_completed(tasks), length=len(tasks), unit='file'):
64+
executor.submit(core.wrap_call, download_file, session, url, os.path.join(tmp, hash),
65+
store=path, cancel=reporter.is_stopped)
66+
for future, report in reporter.task_it('Downloading new data...', executor.as_completed(), length=len(to_download), unit='file'):
6967
report(os.path.basename(future.result().store))
7068

7169
reporter.task('Removing obsolete files...')

0 commit comments

Comments
 (0)