Skip to content

Commit bfb2cd5

Browse files
committed
move logic for synchronous job execution, so it can be used outside of imagecollection as well
1 parent b5f3b47 commit bfb2cd5

2 files changed

Lines changed: 45 additions & 39 deletions

File tree

openeo/rest/imagecollectionclient.py

Lines changed: 2 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
from deprecated import deprecated
88
from shapely.geometry import Polygon, MultiPolygon, mapping
9-
from requests import ConnectionError
109

1110
from openeo.graphbuilder import GraphBuilder
1211
from openeo.imagecollection import ImageCollection, CollectionMetadata
@@ -982,45 +981,9 @@ def execute_batch(
982981
:param format_options: String Parameters for the job result format
983982
984983
"""
985-
# TODO: move this logic to RESTJob?
984+
from openeo.rest.job import RESTJob
986985
job = self.send_job(out_format,job_options=job_options, **format_options)
987-
job.start_job()
988-
989-
job_id = job.job_id
990-
job_info = None
991-
status = None
992-
poll_interval = min(5, max_poll_interval)
993-
start_time = time.time()
994-
while True:
995-
# TODO: also allow a hard time limit on this infinite poll loop?
996-
elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
997-
try:
998-
job_info = job.describe_job()
999-
except ConnectionError as e:
1000-
print("{t} Connection error while querying job status: {e}".format(t=elapsed, e=e))
1001-
time.sleep(connection_retry_interval)
1002-
continue
1003-
1004-
status = job_info.get("status", "N/A")
1005-
print("{t} Job {i}: {s} (progress {p})".format(
1006-
t=elapsed, i=job_id, s=status,
1007-
p='{p}%'.format(p=job_info["progress"]) if "progress" in job_info else "N/A"
1008-
))
1009-
if status not in ('submitted', 'queued', 'running'):
1010-
break
1011-
1012-
time.sleep(poll_interval)
1013-
poll_interval = min(1.25 * poll_interval, max_poll_interval)
1014-
1015-
elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
1016-
if status == 'finished':
1017-
job.download_results(outputfile)
1018-
else:
1019-
raise RuntimeError("Batch job {i} didn't finish properly. Status: {s} (after {t}).".format(
1020-
i=job_id, s=status, t=elapsed
1021-
))
1022-
1023-
return job_info
986+
return RESTJob.run_synchronous(job,outputfile,print=print, max_poll_interval=60, connection_retry_interval=30)
1024987

1025988
def send_job(self, out_format=None, job_options=None, **format_options) -> Job:
1026989
"""

openeo/rest/job.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
import typing
33
import urllib.request
44
from typing import List
5+
import datetime
6+
import time
7+
from requests import ConnectionError
58

69
from openeo.job import Job, JobResult
710
from openeo.rest import OpenEoClientException
@@ -112,3 +115,43 @@ def queue(self):
112115
def results(self) -> List[RESTJobResult]:
113116
""" Returns this job's results. """
114117
return [RESTJobResult(link['href']) for link in self.connection.job_results(self.job_id)['links']]
118+
119+
@classmethod
120+
def run_synchronous(cls, job, outputfile: str, print=print, max_poll_interval=60, connection_retry_interval=30):
121+
job.start_job()
122+
123+
job_id = job.job_id
124+
job_info = None
125+
status = None
126+
poll_interval = min(5, max_poll_interval)
127+
start_time = time.time()
128+
while True:
129+
# TODO: also allow a hard time limit on this infinite poll loop?
130+
elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
131+
try:
132+
job_info = job.describe_job()
133+
except ConnectionError as e:
134+
print("{t} Connection error while querying job status: {e}".format(t=elapsed, e=e))
135+
time.sleep(connection_retry_interval)
136+
continue
137+
138+
status = job_info.get("status", "N/A")
139+
print("{t} Job {i}: {s} (progress {p})".format(
140+
t=elapsed, i=job_id, s=status,
141+
p='{p}%'.format(p=job_info["progress"]) if "progress" in job_info else "N/A"
142+
))
143+
if status not in ('submitted', 'queued', 'running'):
144+
break
145+
146+
time.sleep(poll_interval)
147+
poll_interval = min(1.25 * poll_interval, max_poll_interval)
148+
149+
elapsed = str(datetime.timedelta(seconds=time.time() - start_time))
150+
if status == 'finished':
151+
job.download_results(outputfile)
152+
else:
153+
raise RuntimeError("Batch job {i} didn't finish properly. Status: {s} (after {t}).".format(
154+
i=job_id, s=status, t=elapsed
155+
))
156+
157+
return job_info

0 commit comments

Comments
 (0)