Skip to content

Commit b566ccb

Browse files
committed
basic channel access datasource
1 parent 036a429 commit b566ccb

5 files changed

Lines changed: 641 additions & 0 deletions

File tree

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,360 @@
1+
package org.epics.gpclient.datasource.ca;
2+
3+
import static org.epics.gpclient.datasource.ca.CADataSource.log;
4+
5+
import java.util.Arrays;
6+
import java.util.logging.Level;
7+
import java.util.regex.Pattern;
8+
9+
import org.epics.gpclient.datasource.MultiplexedChannelHandler;
10+
11+
import gov.aps.jca.CAException;
12+
import gov.aps.jca.Channel;
13+
import gov.aps.jca.Monitor;
14+
import gov.aps.jca.dbr.DBR;
15+
import gov.aps.jca.dbr.DBRType;
16+
import gov.aps.jca.dbr.DBR_CTRL_Double;
17+
import gov.aps.jca.dbr.DBR_LABELS_Enum;
18+
import gov.aps.jca.dbr.DBR_String;
19+
import gov.aps.jca.dbr.DBR_TIME_Byte;
20+
import gov.aps.jca.dbr.DBR_TIME_Double;
21+
import gov.aps.jca.dbr.DBR_TIME_Enum;
22+
import gov.aps.jca.dbr.DBR_TIME_Float;
23+
import gov.aps.jca.dbr.DBR_TIME_Int;
24+
import gov.aps.jca.dbr.DBR_TIME_Short;
25+
import gov.aps.jca.dbr.DBR_TIME_String;
26+
import gov.aps.jca.event.AccessRightsEvent;
27+
import gov.aps.jca.event.AccessRightsListener;
28+
import gov.aps.jca.event.ConnectionEvent;
29+
import gov.aps.jca.event.ConnectionListener;
30+
import gov.aps.jca.event.MonitorEvent;
31+
import gov.aps.jca.event.MonitorListener;
32+
33+
public class CAChannelHandler extends MultiplexedChannelHandler<CAConnectionPayload, CAMessagePayload> {
34+
35+
private static final int LARGE_ARRAY = 10000000;
36+
37+
private final CADataSource caDataSource;
38+
39+
private volatile Channel channel;
40+
private volatile boolean largeArray = false;
41+
private volatile boolean sentReadOnlyException = false;
42+
43+
private Monitor valueMonitor;
44+
private Monitor metadataMonitor;
45+
46+
public CAChannelHandler(String channelName, CADataSource caDataSource) {
47+
super(channelName);
48+
this.caDataSource = caDataSource;
49+
}
50+
51+
/**
52+
* The datasource this channel refers to.
53+
*
54+
* @return a ca data source
55+
*/
56+
public CADataSource getCADataSource() {
57+
return caDataSource;
58+
}
59+
60+
@Override
61+
protected void connect() {
62+
try {
63+
// Give the listener right away so that no event gets lost
64+
// If it's a large array, connect using lower priority
65+
if (largeArray) {
66+
channel = caDataSource.getContext().createChannel(getChannelName(), connectionListener,
67+
Channel.PRIORITY_MIN);
68+
} else {
69+
channel = caDataSource.getContext().createChannel(getChannelName(), connectionListener,
70+
(short) (Channel.PRIORITY_MIN + 1));
71+
}
72+
} catch (CAException ex) {
73+
throw new RuntimeException("JCA Connection failed", ex);
74+
}
75+
}
76+
77+
@Override
78+
protected void disconnect() {
79+
try {
80+
// Close the channel
81+
// Need to guard because the channel may be closed if the
82+
// context was already destroyed
83+
if (channel.getConnectionState() != Channel.ConnectionState.CLOSED) {
84+
channel.removeConnectionListener(connectionListener);
85+
channel.destroy();
86+
}
87+
} catch (CAException ex) {
88+
throw new RuntimeException("JCA Disconnect fail", ex);
89+
} finally {
90+
channel = null;
91+
processConnection(null);
92+
}
93+
}
94+
95+
@Override
96+
protected boolean isConnected(CAConnectionPayload connPayload) {
97+
return connPayload != null && connPayload.isChannelConnected();
98+
}
99+
100+
@Override
101+
protected boolean isWriteConnected(CAConnectionPayload connPayload) {
102+
return connPayload != null && connPayload.isWriteConnected();
103+
}
104+
105+
private final ConnectionListener connectionListener = new ConnectionListener() {
106+
107+
@Override
108+
public void connectionChanged(ConnectionEvent ev) {
109+
synchronized (CAChannelHandler.this) {
110+
try {
111+
if (log.isLoggable(Level.FINEST)) {
112+
log.log(Level.FINEST, "JCA connectionChanged for channel {0} event {1}",
113+
new Object[] { getChannelName(), ev });
114+
}
115+
116+
// Take the channel from the event so that there is no
117+
// synchronization problem
118+
Channel channel = (Channel) ev.getSource();
119+
120+
// Check whether the channel is large and was opened
121+
// as large. Reconnect if does not match
122+
if (ev.isConnected() && channel.getElementCount() >= LARGE_ARRAY && !largeArray) {
123+
disconnect();
124+
largeArray = true;
125+
connect();
126+
return;
127+
}
128+
129+
processConnection(new CAConnectionPayload(CAChannelHandler.this, channel, getConnectionPayload()));
130+
if (ev.isConnected()) {
131+
// If connected, no write access and exception was not sent, notify writers
132+
if (!channel.getWriteAccess() && !sentReadOnlyException) {
133+
reportExceptionToAllWriters(createReadOnlyException());
134+
sentReadOnlyException = true;
135+
}
136+
// Setup monitors on connection
137+
setup(channel);
138+
} else {
139+
resetMessage();
140+
// Next connection, resend the read only exception if that's the case
141+
sentReadOnlyException = false;
142+
}
143+
144+
channel.addAccessRightsListener(new AccessRightsListener() {
145+
146+
@Override
147+
public void accessRightsChanged(AccessRightsEvent ev) {
148+
if (log.isLoggable(Level.FINEST)) {
149+
log.log(Level.FINEST, "JCA accessRightsChanged for channel {0} event {1}",
150+
new Object[] { getChannelName(), ev });
151+
}
152+
processConnection(
153+
new CAConnectionPayload(CAChannelHandler.this, channel, getConnectionPayload()));
154+
if (!sentReadOnlyException && !channel.getWriteAccess()) {
155+
reportExceptionToAllWriters(createReadOnlyException());
156+
sentReadOnlyException = true;
157+
}
158+
}
159+
160+
});
161+
} catch (Exception ex) {
162+
reportExceptionToAllReadersAndWriters(ex);
163+
}
164+
}
165+
}
166+
};;
167+
168+
private void setup(Channel channel) throws CAException {
169+
DBRType metaType = metadataFor(channel);
170+
171+
// If metadata is needed, get it
172+
if (metaType != null) {
173+
DBR dbr = channel.get(metaType, 1);
174+
if (log.isLoggable(Level.FINEST)) {
175+
log.log(Level.FINEST, "JCA metadata getCompleted for channel {0} event {1}",
176+
new Object[] { getChannelName(), dbr });
177+
}
178+
processMessage(new CAMessagePayload(dbr, null));
179+
180+
}
181+
// At each (re)connect, we need to create a new monitor:
182+
// since the type could be changed, we would have a type mismatch
183+
// between the current type and the old type when the monitor was
184+
// created
185+
186+
// XXX: Ideally, we would destroy the monitor on reconnect,
187+
// but currently this does not work with CAJ (you get an
188+
// IllegalStateException because the transport is not there
189+
// anymore). So, for now, we destroy the monitor during the
190+
// the connection callback.
191+
192+
// XXX: Ideally, we should just close (clear) the monitor, but
193+
// this would cause one last event to reach the monitorListener.
194+
// So, we remove the monitorListener right before the clear.
195+
196+
if (valueMonitor != null) {
197+
valueMonitor.removeMonitorListener(monitorListener);
198+
valueMonitor.clear();
199+
valueMonitor = null;
200+
}
201+
202+
valueMonitor = channel.addMonitor(valueTypeFor(channel), countFor(channel), caDataSource.getMonitorMask(),
203+
monitorListener);
204+
// Remove current metadata monitor
205+
if (metadataMonitor != null) {
206+
metadataMonitor.removeMonitorListener(metadataListener);
207+
metadataMonitor.clear();
208+
metadataMonitor = null;
209+
}
210+
211+
// Setup metadata monitor if required
212+
if (caDataSource.isDbePropertySupported() && metaType != null) {
213+
metadataMonitor = channel.addMonitor(metaType, 1, Monitor.PROPERTY, metadataListener);
214+
}
215+
216+
// Flush the entire context (it's the best we can do)
217+
channel.getContext().flushIO();
218+
}
219+
220+
private final MonitorListener monitorListener = new MonitorListener() {
221+
222+
@Override
223+
public void monitorChanged(MonitorEvent event) {
224+
synchronized (CAChannelHandler.this) {
225+
if (log.isLoggable(Level.FINEST)) {
226+
log.log(Level.FINEST, "JCA value monitorChanged for channel {0} value {1}, event {2}",
227+
new Object[] { getChannelName(), toStringDBR(event.getDBR()), event });
228+
}
229+
230+
DBR metadata = null;
231+
if (getLastMessagePayload() != null) {
232+
metadata = getLastMessagePayload().getMetadata();
233+
}
234+
processMessage(new CAMessagePayload(metadata, event));
235+
}
236+
}
237+
};
238+
239+
private final MonitorListener metadataListener = new MonitorListener() {
240+
241+
@Override
242+
public void monitorChanged(MonitorEvent ev) {
243+
synchronized (CAChannelHandler.this) {
244+
if (log.isLoggable(Level.FINEST)) {
245+
log.log(Level.FINEST, "JCA metadata monitorChanged for channel {0} event {1}",
246+
new Object[] { getChannelName(), ev });
247+
}
248+
249+
// In case the metadata arrives after the monitor
250+
MonitorEvent event = null;
251+
if (getLastMessagePayload() != null) {
252+
event = getLastMessagePayload().getEvent();
253+
}
254+
processMessage(new CAMessagePayload(ev.getDBR(), event));
255+
}
256+
}
257+
};
258+
259+
protected int countFor(Channel channel) {
260+
if (channel.getElementCount() == 1)
261+
return 1;
262+
263+
if (caDataSource.isVarArraySupported())
264+
return 0;
265+
else
266+
return channel.getElementCount();
267+
}
268+
269+
protected DBRType metadataFor(Channel channel) {
270+
DBRType type = channel.getFieldType();
271+
272+
if (type.isBYTE() || type.isSHORT() || type.isINT() || type.isFLOAT() || type.isDOUBLE())
273+
return DBR_CTRL_Double.TYPE;
274+
275+
if (type.isENUM())
276+
return DBR_LABELS_Enum.TYPE;
277+
278+
return null;
279+
}
280+
281+
static Pattern rtypeStringPattern = Pattern.compile(".+\\.RTYP.*");
282+
283+
protected DBRType valueTypeFor(Channel channel) {
284+
DBRType type = channel.getFieldType();
285+
286+
if (type.isBYTE()) {
287+
return DBR_TIME_Byte.TYPE;
288+
} else if (type.isSHORT()) {
289+
return DBR_TIME_Short.TYPE;
290+
} else if (type.isINT()) {
291+
return DBR_TIME_Int.TYPE;
292+
} else if (type.isFLOAT()) {
293+
return DBR_TIME_Float.TYPE;
294+
} else if (type.isDOUBLE()) {
295+
return DBR_TIME_Double.TYPE;
296+
} else if (type.isENUM()) {
297+
return DBR_TIME_Enum.TYPE;
298+
} else if (type.isSTRING()) {
299+
if (caDataSource.isRtypValueOnly() && rtypeStringPattern.matcher(channel.getName()).matches()) {
300+
return DBR_String.TYPE;
301+
}
302+
return DBR_TIME_String.TYPE;
303+
}
304+
305+
throw new IllegalArgumentException("Unsupported type " + type);
306+
}
307+
308+
private Exception createReadOnlyException() {
309+
return new RuntimeException("'" + getChannelName() + "' is read-only");
310+
}
311+
312+
private String toStringDBR(DBR value) {
313+
StringBuilder builder = new StringBuilder();
314+
if (value == null) {
315+
return "null";
316+
}
317+
if (value.getValue() instanceof double[]) {
318+
builder.append(Arrays.toString((double[]) value.getValue()));
319+
} else if (value.getValue() instanceof short[]) {
320+
builder.append(Arrays.toString((short[]) value.getValue()));
321+
} else if (value.getValue() instanceof String[]) {
322+
builder.append(Arrays.toString((String[]) value.getValue()));
323+
} else {
324+
builder.append(value.getValue().toString());
325+
}
326+
return builder.toString();
327+
}
328+
329+
/**
330+
* Converts a String into byte array.
331+
*
332+
* @param text the string to be converted
333+
* @return byte array, always including '\0' termination
334+
*/
335+
static byte[] toBytes(final String text) {
336+
// TODO: it's unclear what encoding is used and how
337+
338+
// Write string as byte array WITH '\0' TERMINATION!
339+
final byte[] bytes = new byte[text.length() + 1];
340+
System.arraycopy(text.getBytes(), 0, bytes, 0, text.length());
341+
bytes[text.length()] = '\0';
342+
return bytes;
343+
}
344+
345+
/**
346+
* Converts a byte array into a String. It
347+
*
348+
* @param data the array to be converted
349+
* @return the string
350+
*/
351+
static String toString(byte[] data) {
352+
int index = 0;
353+
while (index < data.length && data[index] != '\0') {
354+
index++;
355+
}
356+
357+
return new String(data, 0, index);
358+
}
359+
360+
}

0 commit comments

Comments
 (0)