Skip to content

Commit fd22137

Browse files
committed
Add streaming get
1 parent 8d9e173 commit fd22137

2 files changed

Lines changed: 81 additions & 27 deletions

File tree

consulate/adapters.py

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,16 @@
44
"""
55
import json
66
import logging
7+
import socket
78

89
import requests
10+
import requests.exceptions
911
try:
1012
import requests_unixsocket
1113
except ImportError: # pragma: no cover
1214
requests_unixsocket = None
1315

14-
from consulate import api
15-
from consulate import utils
16+
from consulate import api, exceptions, utils
1617

1718
LOGGER = logging.getLogger(__name__)
1819

@@ -71,23 +72,51 @@ def delete(self, uri):
7172
return self._process_response(
7273
self.session.delete(uri, timeout=self.timeout))
7374

74-
def get(self, uri):
75+
def get(self, uri, timeout=None):
7576
"""Perform a HTTP get
7677
7778
:param src uri: The URL to send the DELETE to
79+
:param timeout: How long to wait on the response
80+
:type timeout: int or float or None
7881
:rtype: consulate.api.Response
7982
8083
"""
8184
LOGGER.debug("GET %s", uri)
82-
return self._process_response(
83-
self.session.get(uri, timeout=self.timeout))
85+
try:
86+
return self._process_response(
87+
self.session.get(uri, timeout=timeout or self.timeout))
88+
except (requests.exceptions.ConnectionError,
89+
OSError, socket.error) as err:
90+
raise exceptions.RequestError(str(err))
91+
92+
def get_stream(self, uri):
93+
"""Perform a HTTP get that returns the response as a stream.
94+
95+
:param src uri: The URL to send the DELETE to
96+
:rtype: iterator
97+
98+
"""
99+
LOGGER.debug("GET Stream from %s", uri)
100+
try:
101+
response = self.session.get(uri, stream=True)
102+
except (requests.exceptions.ConnectionError,
103+
OSError, socket.error) as err:
104+
raise exceptions.RequestError(str(err))
105+
if response.encoding is None:
106+
response.encoding = 'utf-8'
107+
if utils.response_ok(response):
108+
for line in response.iter_lines():
109+
if line:
110+
yield line.decode('utf-8')
84111

85112
@prepare_data
86-
def put(self, uri, data=None):
113+
def put(self, uri, data=None, timeout=None):
87114
"""Perform a HTTP put
88115
89116
:param src uri: The URL to send the DELETE to
90117
:param str data: The PUT data
118+
:param timeout: How long to wait on the response
119+
:type timeout: int or float or None
91120
:rtype: consulate.api.Response
92121
93122
"""
@@ -96,9 +125,14 @@ def put(self, uri, data=None):
96125
'Content-Type': CONTENT_FORM
97126
if utils.is_string(data) else CONTENT_JSON
98127
}
99-
return self._process_response(
100-
self.session.put(
101-
uri, data=data, headers=headers, timeout=self.timeout))
128+
try:
129+
return self._process_response(
130+
self.session.put(
131+
uri, data=data, headers=headers,
132+
timeout=timeout or self.timeout))
133+
except (requests.exceptions.ConnectionError,
134+
OSError, socket.error) as err:
135+
raise exceptions.RequestError(str(err))
102136

103137
@staticmethod
104138
def _process_response(response):
@@ -109,8 +143,12 @@ def _process_response(response):
109143
:rtype: consulate.api.Response
110144
111145
"""
112-
return api.Response(
113-
response.status_code, response.content, response.headers)
146+
try:
147+
return api.Response(
148+
response.status_code, response.content, response.headers)
149+
except (requests.exceptions.ConnectionError,
150+
OSError, socket.error) as err:
151+
raise exceptions.RequestError(str(err))
114152

115153

116154
class UnixSocketRequest(Request):

consulate/api/base.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@ def _build_uri(self, params, query_params=None):
5151
urlencode(query_params))
5252
return '{0}/{1}'.format(self._base_uri, path)
5353

54-
def _get(self, params, query_params=None, raise_on_404=False):
54+
def _get(self, params, query_params=None, raise_on_404=False,
55+
timeout=None):
5556
"""Perform a GET request
5657
5758
:param list params: List of path parts
5859
:param dict query_params: Build query parameters
60+
:param timeout: How long to wait on the request for
61+
:type timeout: int or float or None
5962
6063
"""
61-
response = self._adapter.get(self._build_uri(params, query_params))
62-
if response.status_code == 200:
64+
response = self._adapter.get(
65+
self._build_uri(params, query_params), timeout=timeout)
66+
if not response:
67+
print(response)
68+
if utils.response_ok(response, raise_on_404):
6369
return response.body
64-
elif response.status_code == 401:
65-
raise exceptions.ACLDisabled(response.body)
66-
elif response.status_code == 403:
67-
raise exceptions.Forbidden(response.body)
68-
elif response.status_code == 404 and raise_on_404:
69-
raise exceptions.NotFound(response.body)
7070
return []
7171

7272
def _get_list(self, params, query_params=None):
@@ -81,20 +81,36 @@ def _get_list(self, params, query_params=None):
8181
return [result]
8282
return result
8383

84+
def _get_stream(self, params, query_params=None):
85+
"""Return a list queried from Consul
86+
87+
:param list params: List of path parts
88+
:param dict query_params: Build query parameters
89+
:rtype: iterator
90+
91+
"""
92+
for line in self._adapter.get_stream(
93+
self._build_uri(params, query_params)):
94+
yield line
95+
8496
def _get_no_response_body(self, url_parts, query=None):
85-
return self._adapter.get(self._build_uri(url_parts,
86-
query)).status_code == 200
97+
return utils.response_ok(
98+
self._adapter.get(self._build_uri(url_parts, query)))
8799

88100
def _get_response_body(self, url_parts, query=None):
89-
return self._adapter.get(self._build_uri(url_parts, query)).body
101+
response = self._adapter.get(self._build_uri(url_parts, query))
102+
if utils.response_ok(response):
103+
return response.body
90104

91105
def _put_no_response_body(self, url_parts, query=None, payload=None):
92-
return self._adapter.put(self._build_uri(url_parts, query),
93-
payload).status_code == 200
106+
return utils.response_ok(
107+
self._adapter.put(self._build_uri(url_parts, query), payload))
94108

95109
def _put_response_body(self, url_parts, query=None, payload=None):
96-
return self._adapter.put(self._build_uri(url_parts, query),
97-
payload).body
110+
response = self._adapter.put(
111+
self._build_uri(url_parts, query), payload)
112+
if utils.response_ok(response):
113+
return response.body
98114

99115

100116
class Response(object):

0 commit comments

Comments
 (0)