Skip to content

Commit 12349a5

Browse files
committed
#23 adding write support for the channel access datasource
1 parent f63dee6 commit 12349a5

5 files changed

Lines changed: 168 additions & 14 deletions

File tree

gpclient/gpclient-ca/pom.xml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,16 @@
2121
<artifactId>gpclient-core</artifactId>
2222
<version>${project.version}</version>
2323
</dependency>
24+
<dependency>
25+
<groupId>org.epics</groupId>
26+
<artifactId>vtype</artifactId>
27+
<version>${project.version}</version>
28+
</dependency>
29+
<dependency>
30+
<groupId>org.epics</groupId>
31+
<artifactId>epics-util</artifactId>
32+
<version>${project.version}</version>
33+
</dependency>
2434
</dependencies>
2535
<build>
2636
<plugins>

gpclient/gpclient-ca/src/main/java/org/epics/gpclient/datasource/ca/CAChannelHandler.java

Lines changed: 114 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,26 @@
66

77
import static org.epics.gpclient.datasource.ca.CADataSource.log;
88

9+
import static org.epics.util.array.UnsafeUnwrapper.wrappedArray;
10+
import static org.epics.util.array.UnsafeUnwrapper.wrappedDoubleArray;
11+
912
import java.util.Arrays;
13+
import java.util.LinkedList;
1014
import java.util.logging.Level;
1115
import java.util.regex.Pattern;
1216

1317
import org.epics.gpclient.ReadCollector;
18+
import org.epics.gpclient.WriteCollector.WriteRequest;
1419
import org.epics.gpclient.datasource.MultiplexedChannelHandler;
1520
import org.epics.gpclient.datasource.ca.types.CATypeAdapter;
21+
import org.epics.util.array.CollectionNumbers;
22+
import org.epics.util.array.ListNumber;
23+
import org.epics.vtype.VByte;
24+
import org.epics.vtype.VDouble;
25+
import org.epics.vtype.VFloat;
26+
import org.epics.vtype.VInt;
27+
import org.epics.vtype.VLong;
28+
import org.epics.vtype.VShort;
1629

1730
import gov.aps.jca.CAException;
1831
import gov.aps.jca.Channel;
@@ -74,7 +87,8 @@ protected void connect() {
7487
channel = caDataSource.getContext().createChannel(getChannelName(), connectionListener, (short) (Channel.PRIORITY_MIN + 1));
7588
}
7689
} catch (CAException ex) {
77-
throw new RuntimeException("JCA Connection failed", ex);
90+
reportExceptionToAllReadersAndWriters(ex);
91+
log.log(Level.WARNING, "JCA Connection failed", ex);
7892
}
7993
}
8094

@@ -89,7 +103,8 @@ protected void disconnect() {
89103
channel.destroy();
90104
}
91105
} catch (CAException ex) {
92-
throw new RuntimeException("JCA Disconnect fail", ex);
106+
reportExceptionToAllReadersAndWriters(ex);
107+
log.log(Level.WARNING, "JCA Disconnect fail", ex);
93108
} finally {
94109
channel = null;
95110
processConnection(null);
@@ -265,6 +280,103 @@ public void monitorChanged(MonitorEvent ev) {
265280
}
266281
};
267282

283+
@Override
284+
protected void write(Object newValue) {
285+
// If it's a ListNumber, extract the array
286+
if (newValue instanceof ListNumber) {
287+
ListNumber data = (ListNumber) newValue;
288+
Object wrappedArray = wrappedArray(data);
289+
if (wrappedArray == null) {
290+
newValue = wrappedDoubleArray(data);
291+
} else {
292+
newValue = wrappedArray;
293+
}
294+
}
295+
try {
296+
if (newValue instanceof Double[]) {
297+
log.warning("You are writing a Double[] to channel " + getChannelName()
298+
+ ": use org.epics.util.array.ListDouble instead");
299+
final Double dbl[] = (Double[]) newValue;
300+
final double val[] = new double[dbl.length];
301+
for (int i = 0; i < val.length; ++i) {
302+
val[i] = dbl[i].doubleValue();
303+
}
304+
newValue = val;
305+
}
306+
if (newValue instanceof Integer[]) {
307+
log.warning("You are writing a Integer[] to channel " + getChannelName()
308+
+ ": use org.epics.util.array.ListInt instead");
309+
final Integer ival[] = (Integer[]) newValue;
310+
final int val[] = new int[ival.length];
311+
for (int i = 0; i < val.length; ++i) {
312+
val[i] = ival[i].intValue();
313+
}
314+
newValue = val;
315+
}
316+
317+
if (newValue instanceof String) {
318+
if (isLongString()) {
319+
channel.put(toBytes(newValue.toString()));
320+
} else {
321+
if (channel.getFieldType().isBYTE() && channel.getElementCount() > 1) {
322+
log.warning("You are writing the String " + newValue + " to BYTE channel " + getChannelName()
323+
+ ": use {\"longString\":true} for support");
324+
channel.put(toBytes(newValue.toString()));
325+
} else {
326+
channel.put(newValue.toString());
327+
}
328+
}
329+
} else if (newValue instanceof byte[]) {
330+
channel.put((byte[]) newValue);
331+
} else if (newValue instanceof short[]) {
332+
channel.put((short[]) newValue);
333+
} else if (newValue instanceof int[]) {
334+
channel.put((int[]) newValue);
335+
} else if (newValue instanceof float[]) {
336+
channel.put((float[]) newValue);
337+
} else if (newValue instanceof double[]) {
338+
channel.put((double[]) newValue);
339+
} else if (newValue instanceof VByte) {
340+
channel.put(((VByte) newValue).getValue());
341+
} else if (newValue instanceof VShort) {
342+
channel.put(((VShort) newValue).getValue());
343+
} else if (newValue instanceof VInt) {
344+
channel.put(((VInt)newValue).getValue());
345+
} else if (newValue instanceof VLong) {
346+
// XXX: Channel access does not support 64 bit integers
347+
// If fits 32 bits, use int. Use double otherwise
348+
long value64 = ((VLong) newValue).getValue();
349+
int value32 = (int) value64;
350+
if (value32 == value64) {
351+
channel.put(value32);
352+
} else {
353+
channel.put((double) value64);
354+
}
355+
} else if (newValue instanceof VFloat) {
356+
channel.put(((VFloat) newValue).getValue());
357+
} else if (newValue instanceof VDouble) {
358+
channel.put(((VDouble) newValue).getValue());
359+
} else {
360+
// callback.channelWritten(new Exception(new RuntimeException("Unsupported type
361+
// for CA: " + newValue.getClass())));
362+
return;
363+
}
364+
caDataSource.getContext().flushIO();
365+
} catch (Exception e) {
366+
throw new RuntimeException(e);
367+
}
368+
}
369+
370+
@Override
371+
protected void processWriteRequest(WriteRequest<?> request) {
372+
try {
373+
write(request.getValue());
374+
request.writeSuccessful();
375+
} catch (Exception ex) {
376+
request.writeFailed(ex);
377+
}
378+
}
379+
268380
protected int countFor(Channel channel) {
269381
if (channel.getElementCount() == 1)
270382
return 1;

gpclient/gpclient-ca/src/main/java/org/epics/gpclient/datasource/ca/CADataSource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ public boolean isHonorZeroPrecision() {
103103

104104
@Override
105105
protected ChannelHandler createChannel(String channelName) {
106+
log.log(Level.INFO, "CREATE channel " + channelName);
106107
return new CAChannelHandler(channelName, this);
107108
}
108109

gpclient/gpclient-ca/src/test/java/org/epics/gpclient/datasource/ca/CAChannelTest.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,24 @@
22

33
import java.time.Duration;
44
import java.util.List;
5+
import java.util.function.Function;
56
import java.util.logging.Logger;
67

78
import org.epics.gpclient.GPClientConfiguration;
89
import org.epics.gpclient.GPClientInstance;
10+
import org.epics.gpclient.PV;
911
import org.epics.gpclient.PVEvent;
1012
import org.epics.gpclient.PVEventRecorder;
1113
import org.epics.gpclient.PVReader;
14+
import org.epics.gpclient.PVWriter;
15+
import org.epics.gpclient.PVWriterListener;
1216
import org.epics.gpclient.ProbeCollector;
1317
import org.epics.gpclient.datasource.DataSource;
1418
import org.epics.gpclient.datasource.DataSourceProvider;
19+
import org.epics.vtype.Alarm;
20+
import org.epics.vtype.Display;
21+
import org.epics.vtype.Time;
22+
import org.epics.vtype.VDouble;
1523
import org.epics.vtype.VType;
1624
import org.junit.AfterClass;
1725
import org.junit.BeforeClass;
@@ -52,18 +60,42 @@ public static void teardown() {
5260
}
5361
}
5462

63+
@SuppressWarnings("rawtypes")
5564
@Test
5665
public void createSimpleChannel() throws InterruptedException {
5766
ProbeCollector probe = ProbeCollector.create();
5867
PVEventRecorder recorder = probe.getRecorder();
5968
PVReader<VType> pv = gpClient.read("ca://test_double_0").addListener(recorder).start();
60-
recorder.wait(500, recorder.forAConnectionEvent());
61-
recorder.wait(50, recorder.anEventOfType(PVEvent.Type.VALUE));
69+
recorder.wait(500, PVEventRecorder.forAConnectionEvent());
70+
recorder.wait(50, PVEventRecorder.anEventOfType(PVEvent.Type.VALUE));
6271
pv.close();
6372
Thread.sleep(1000);
64-
List<PVEvent> events = recorder.getEvents();
65-
events.stream().forEachOrdered(event -> {
66-
System.out.println(event.toString());
73+
}
74+
75+
@SuppressWarnings("rawtypes")
76+
@Test
77+
public void createSimpleWriteChannel() throws InterruptedException {
78+
ProbeCollector probe = ProbeCollector.create();
79+
PVEventRecorder recorder = probe.getRecorder();
80+
PV<VType, Object> pv = gpClient.readAndWrite("ca://test_double_0").addListener(recorder).start();
81+
recorder.wait(500, PVEventRecorder.forAConnectionEvent());
82+
recorder.wait(50, PVEventRecorder.anEventOfType(PVEvent.Type.VALUE));
83+
pv.write(VDouble.of(1.0, Alarm.none(), Time.now(), Display.none()));
84+
Thread.sleep(1000);
85+
pv.write(VDouble.of(2.0, Alarm.none(), Time.now(), Display.none()));
86+
Thread.sleep(1000);
87+
pv.write(VDouble.of(3.0, Alarm.none(), Time.now(), Display.none()));
88+
Thread.sleep(1000);
89+
pv.write(VDouble.of(4.0, Alarm.none(), Time.now(), Display.none()));
90+
Thread.sleep(1000);
91+
pv.write(VDouble.of(5.0, Alarm.none(), Time.now(), Display.none()));
92+
Thread.sleep(1000);
93+
pv.close();
94+
Thread.sleep(1000);
95+
recorder.hasReceived(events -> {
96+
return events.stream().filter(event -> {
97+
return event.isType(PVEvent.Type.WRITE_SUCCEEDED);
98+
}).count() == 5;
6799
});
68100
}
69101
}

gpclient/gpclient-ca/src/test/java/org/epics/gpclient/datasource/ca/InMemoryCAServer.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public static void closeServerInstance() {
4747

4848
private static final Random generator = new Random();
4949

50-
private static void initialize() {
50+
static synchronized void initialize() {
5151

5252
// Get the JCALibrary instance.
5353
JCALibrary jca = JCALibrary.getInstance();
@@ -72,7 +72,7 @@ private static void initialize() {
7272
initialized.set(true);
7373
}
7474

75-
private static void registerProcessVariables(DefaultServerImpl server) {
75+
static void registerProcessVariables(DefaultServerImpl server) {
7676
for (int i = 0; i < 10000; i++) {
7777
createDoubleProcessVariable("test_double_" + i, generator.doubles(-10, 10).findAny().getAsDouble(), server);
7878
createIntProcessVariable("test_int_" + i, generator.ints(-10, 10).findAny().getAsInt(), server);
@@ -81,7 +81,7 @@ private static void registerProcessVariables(DefaultServerImpl server) {
8181
}
8282
}
8383

84-
private static void createDoubleProcessVariable(String name, double value, DefaultServerImpl server) {
84+
static void createDoubleProcessVariable(String name, double value, DefaultServerImpl server) {
8585
// PV supporting all GR/CTRL info
8686
MemoryProcessVariable mpv = new MemoryProcessVariable(name, null, DBR_Double.TYPE, new double[] { value });
8787

@@ -103,7 +103,7 @@ private static void createDoubleProcessVariable(String name, double value, Defau
103103
server.registerProcessVaribale(mpv);
104104
}
105105

106-
private static void createIntProcessVariable(String name, int value, DefaultServerImpl server) {
106+
static void createIntProcessVariable(String name, int value, DefaultServerImpl server) {
107107
// PV supporting all GR/CTRL info
108108
MemoryProcessVariable mpv = new MemoryProcessVariable(name, null, DBR_Int.TYPE, new int[] { value });
109109

@@ -124,9 +124,8 @@ private static void createIntProcessVariable(String name, int value, DefaultServ
124124

125125
server.registerProcessVaribale(mpv);
126126
}
127-
128127

129-
private static void createFloatProcessVariable(String name, float value, DefaultServerImpl server) {
128+
static void createFloatProcessVariable(String name, float value, DefaultServerImpl server) {
130129
// PV supporting all GR/CTRL info
131130
MemoryProcessVariable mpv = new MemoryProcessVariable(name, null, DBR_Float.TYPE, new float[] { value });
132131

@@ -148,7 +147,7 @@ private static void createFloatProcessVariable(String name, float value, Default
148147
server.registerProcessVaribale(mpv);
149148
}
150149

151-
private static void createStringProcessVariable(String name, String value, DefaultServerImpl server) {
150+
static void createStringProcessVariable(String name, String value, DefaultServerImpl server) {
152151
// PV supporting all GR/CTRL info
153152
MemoryProcessVariable mpv = new MemoryProcessVariable(name, null, DBR_String.TYPE, new String[] { value });
154153
server.registerProcessVaribale(mpv);

0 commit comments

Comments
 (0)