Skip to content

Commit fb9b12a

Browse files
author
btgoodwin
committed
Renamed bulkio test to not conflict with other imports of REDHAWK's own bulkio module and corrected the test to reflect our API for BULKIO. Also discovered and fixed numerous bugs from a botched transition from protobuf to JSON formatted BULKIO pushes.
1 parent 6822ba4 commit fb9b12a

2 files changed

Lines changed: 66 additions & 92 deletions

File tree

rest/bulkio_handler.py

Lines changed: 50 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import logging
2222

2323
from bulkio.bulkioInterfaces import BULKIO__POA
24-
24+
from bulkio import sri
2525
from omniORB import CORBA
2626

2727
# third party imports
@@ -69,25 +69,25 @@ def open(self, *args):
6969
for p in obj.ports:
7070
if p.name == path[0]:
7171
if p._direction == 'Uses':
72-
data_type = p._using.name
7372
namespace = p._using.nameSpace
7473

7574
if namespace == 'BULKIO':
76-
self.port = p.ref
75+
self.port = p
7776
logging.debug("Found port %s", self.port)
7877

78+
data_type = p._using.name
7979
bulkio_poa = getattr(BULKIO__POA, data_type)
8080
logging.debug(bulkio_poa)
8181

8282
self.async_port = AsyncPort(bulkio_poa, self._pushSRI, self._pushPacket)
83-
self._portname = 'rest-python-%s' % id(self)
8483

8584
if len(path) == 3:
8685
self._connectionId = path[2][1:]
8786
else:
88-
self._connectionId = None
87+
self._connectionId = 'rest-python-%s' % id(self)
8988

90-
self.port.connectPort(self.async_port.getPort(), self._portname, self._connectionId)
89+
self.port.ref.connectPort(self.async_port.getPort(), self._connectionId)
90+
logging.info("Opened websocket to %s, %s", self.port, self._connectionId)
9191

9292
break
9393
else:
@@ -106,68 +106,55 @@ def open(self, *args):
106106
self.close()
107107

108108
def on_message(self, message):
109-
ctrl = json.loads(message)
110-
111-
if (ctrl.type == Control.MaxWidth):
112-
if 0 < ctrl.value:
113-
self._outputWidth = ctrl.value
114-
logging.info('Decimation requested to {0} samples'.format(ctrl.value))
115-
else:
116-
self._outputWidth = None
117-
logging.info('Decimation disabled')
118-
119-
elif (ctrl.type == Control.MaxPPS):
120-
logging.warning('Packets per second (PPS) not implemented yet.')
109+
try:
110+
ctrl = json.loads(message)
111+
112+
if (ctrl['type'] == Control.MaxWidth):
113+
if 0 < ctrl['value']:
114+
self._outputWidth = ctrl['value']
115+
logging.info('Decimation requested to {0} samples'.format(ctrl['value']))
116+
else:
117+
self._outputWidth = None
118+
logging.info('Decimation disabled')
119+
120+
elif (ctrl['type'] == Control.MaxPPS):
121+
logging.warning('Packets per second (PPS) not implemented yet.')
122+
except Exception as e:
123+
self.write_message(dict(error='SystemError', message=str(e)))
121124

122125
def on_close(self):
123126
logging.debug('Stream CLOSE')
124127
try:
125-
self.port.disconnectPort(self._portname, self._connectionId)
128+
self.port.ref.disconnectPort(self._connectionId)
126129
except CORBA.TRANSIENT:
127130
pass
128131
except Exception, e:
129-
logging.exception('Error disconnecting port %s' % self._portname)
130-
131-
def _pushSRI(self, SRI):
132-
newSri = dict(
133-
hversion=SRI.hversion,
134-
xstart=SRI.xstart,
135-
xdelta=SRI.xdelta,
136-
xunits=SRI.xunits,
137-
subsize=SRI.subsize,
138-
ystart=SRI.ystart,
139-
ydelta=SRI.ydelta,
140-
yunits=SRI.yunits,
141-
mode=SRI.mode,
142-
streamID=SRI.streamID,
143-
blocking=SRI.blocking,
144-
keywords=dict(((kw.id, kw.value.value()) for kw in SRI.keywords)))
145-
self._updateSRIFromDict(newSri)
132+
logging.exception('Error disconnecting port %s' % self._connectionId)
133+
134+
def _pushSRI(self, newSRI):
135+
origSRI, changed = self._getSRI(newSRI.streamID)
136+
if origSRI is not None:
137+
changed = sri.compareSRI(origSRI, newSRI)
138+
self._SRIs[newSRI.streamID] = (newSRI, changed)
146139

147140
def _getSRI(self, streamID):
148141
return self._SRIs.get(streamID, (None, True))
149142

150-
def _updateSRIFromDict(self, newSri):
151-
sri, changed = self._getSRI(newSri['streamID'])
152-
if sri is not None:
153-
changed = compareSRI(sri, newSri)
154-
self._SRIs[sri.streamID] = (newSri, changed)
155-
156143
def _pushPacket(self, data, ts, EOS, stream_id):
157144
data = numpy.array(data)
158145

159146
if None == self._outputWidth:
160147
self._outputWidth = data.size
161148

162149
# Get SRI and modify if necessary (from decimation)
163-
sri, changed = self._getSRI(stream_id)
164-
outSri = dict.copy(sri)
150+
SRI, changed = self._getSRI(stream_id)
151+
outSRI = copy_sri(SRI)
165152
if 0 < data.size and data.size != self._outputWidth:
166153
D, M = divmod(data.size, self._outputWidth)
167154
if 0 == M and 1 < D:
168155
# Mean decimate
169156
data = data.reshape(-1, D).mean(axis=1)
170-
outSri.xdelta = sri.xdelta * D
157+
outSRI.xdelta = SRI.xdelta * D
171158
changed = True
172159
else:
173160
# Restore...invalid setting.
@@ -179,12 +166,12 @@ def _pushPacket(self, data, ts, EOS, stream_id):
179166
# Tack on SRI, Package, Deliver.
180167
packet = dict(
181168
streamID = stream_id,
182-
T = ts,
169+
T = ts.__dict__,
183170
EOS = EOS,
184171
sriChanged = changed,
185-
SRI = outSri,
172+
SRI = outSRI.__dict__,
186173
type = self.port._using.name,
187-
dataBuffer = data
174+
dataBuffer = data.tolist()
188175
)
189176
self._ioloop.add_callback(self.write_message, packet)
190177

@@ -196,31 +183,18 @@ def write_message(self, *args, **ioargs):
196183
logging.debug('Received WebSocketClosedError. Ignoring')
197184
self.close()
198185

199-
# Imported from ossie.utils sb
200-
# TODO: Handle keywords JSON strings
201-
def compareSRI(a, b):
202-
if a.hversion != b.hversion:
203-
return False
204-
if a.xstart != b.xstart:
205-
return False
206-
if a.xdelta != b.xdelta:
207-
return False
208-
if a.xunits != b.xunits:
209-
return False
210-
if a.subsize != b.subsize:
211-
return False
212-
if a.ystart != b.ystart:
213-
return False
214-
if a.ydelta != b.ydelta:
215-
return False
216-
if a.yunits != b.yunits:
217-
return False
218-
if a.mode != b.mode:
219-
return False
220-
if a.streamID != b.streamID:
221-
return False
222-
if a.blocking != b.blocking:
223-
return False
224-
if a.keywords != b.keywords:
225-
return False
226-
return True
186+
def copy_sri(SRI):
187+
copied = sri.create()
188+
copied.hversion = SRI.hversion
189+
copied.xstart = SRI.xstart
190+
copied.xdelta = SRI.xdelta
191+
copied.xunits = SRI.xunits
192+
copied.subsize = SRI.subsize
193+
copied.ystart = SRI.ystart
194+
copied.ydelta = SRI.ydelta
195+
copied.yunits = SRI.yunits
196+
copied.mode = SRI.mode
197+
copied.streamID = SRI.streamID
198+
copied.blocking = SRI.blocking
199+
copied.keywords = SRI.keywords[:]
200+
return copied

tests/bulkio.py renamed to tests/bulkio_tests.py

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,6 @@
4646

4747
class BulkIOTests(JsonTests, AsyncHTTPTestCase, LogTrapTestCase):
4848

49-
# def setUp(self):
50-
# super(RESTfulTest, self).setUp()
51-
# rtl_app.RTLApp('REDHAWK_DEV').stop_survey()
52-
5349
def setUp(self):
5450
super(JsonTests, self).setUp()
5551
json, resp = self._json_request(
@@ -86,33 +82,37 @@ def test_bulkio_ws(self):
8682

8783
# NOTE: A timeout means the website took too long to respond
8884
# it could mean that bulkio port is not sending data
89-
cid = next((cp['id'] for cp in self.components if cp['name'] == 'SigGen'), None)
85+
cid = next((cp['id'] for cp in self.components if cp['name'] == Default.COMPONENT), None)
9086
if not cid:
91-
self.fail('Unable to find SigGen component')
87+
self.fail('Unable to find %s component' % (Default.COMPONENT))
9288

93-
url = self.get_url("%s/components/%s/ports/out/bulkio"%(Default.REST_BASE+self.base_url,cid)).replace('http','ws')
94-
conn1 = yield websocket.websocket_connect(url,
95-
io_loop=self.io_loop)
89+
url = self.get_url("%s/components/%s/ports/%s/bulkio"%(Default.REST_BASE+self.base_url,cid,Default.COMPONENT_USES_PORT)).replace('http','ws')
90+
conn1 = yield websocket.websocket_connect(url, io_loop=self.io_loop)
9691

9792
foundSRI = False
9893
for x in xrange(10):
9994
msg = yield conn1.read_message()
10095
try:
101-
data = json.loads(msg)
102-
logging.debug("Got SRI %s", data)
96+
packet = json.loads(msg)
97+
sri = packet.get('SRI', None)
98+
logging.debug("Got SRI %s", sri)
10399
foundSRI = True
104100
props = set(('hversion', 'xstart', 'xdelta', 'xunits',
105101
'subsize', 'ystart', 'ydelta', 'yunits', 'mode',
106102
'streamID', 'blocking', 'keywords'))
107-
missing = props.difference(data.keys())
103+
missing = props.difference(sri.keys())
108104
if missing:
109-
self.fail("Missing SRI properties %s" % missing)
105+
self.fail("Missing SRI properties %s" % missing)
106+
107+
buf = packet.get('dataBuffer', [])
108+
if not buf:
109+
self.fail("Data buffer was empty.")
110+
110111
except ValueError:
111112
data = dict(data=msg)
112113

113-
if data.get('error', None):
114-
self.fail('Recieved websocket error %s' % data)
115-
#conn1.protocol.close()
114+
if packet.get('error', None):
115+
self.fail('Recieved websocket error %s' % packet)
116116
conn1.close()
117117

118118
# wait a little bit to force close to take place in ioloop

0 commit comments

Comments
 (0)