Skip to content

Commit 4b58624

Browse files
authored
Merge pull request #81 from epics-base/cas_nameserver
CA server (and thus name server demo) now support searches via TCP
2 parents bca12fd + 3733444 commit 4b58624

8 files changed

Lines changed: 215 additions & 43 deletions

File tree

src/core/com/cosylab/epics/caj/cas/handlers/SearchResponse.java

Lines changed: 54 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.logging.Level;
2323

2424
import com.cosylab.epics.caj.cas.CAJServerContext;
25+
import com.cosylab.epics.caj.cas.CASTransport;
2526
import com.cosylab.epics.caj.cas.requests.SearchFailedRequest;
2627
import com.cosylab.epics.caj.cas.requests.SearchRequest;
2728
import com.cosylab.epics.caj.impl.CAConstants;
@@ -49,20 +50,37 @@ protected void internalHandleResponse(
4950
Transport transport,
5051
ByteBuffer[] response) {
5152

52-
ByteBuffer headerBuffer = response[0];
53-
final int start = headerBuffer.position();
53+
// For a UDP client, BroadcastTransport.processRead will call this with response=[buffer].
54+
// That one buffer holds version info, search header and search payload, positioned at start of payload:
55+
//
56+
// 00 00 00 00 00 01 00 0D 00 00 00 01 00 00 00 00 .... .... .... ....
57+
// 00 06 00 08 00 05 00 0D 00 00 00 01 00 00 00 01 .... .... .... ....
58+
// 54 45 53 54 00 00 00 00 TEST ....
59+
// ^^
60+
//
61+
// For a TCP client, CASTransport.processRead will call this with a response=[headerBuffer, payloadBuffer].
62+
// headerBuffer is positioned at the end, and a payloadBuffer positioned at the start:
63+
//
64+
// 00 06 00 08 00 05 00 0D 00 00 00 01 00 00 00 01 .... .... .... ....
65+
// ^^
66+
// 54 45 53 54 00 00 00 00 TEST ....
67+
// ^^
68+
// We need the payload, which holds the zero-padded channel name
69+
ByteBuffer buffer = response.length == 2 ? response[1] : response[0];
70+
71+
final int start = buffer.position();
5472
final int bufferEnd = start + payloadSize;
5573

5674
// to support multiple messages in one UDP packet
57-
headerBuffer.position(bufferEnd);
75+
buffer.position(bufferEnd);
5876

5977
// check channel name size
6078
if (payloadSize <= 1) {
6179
context.getLogger().fine("Empty channel name search request from: " + responseFrom);
6280
return;
6381
}
6482

65-
String channelName = extractString(headerBuffer, start, payloadSize, false);
83+
String channelName = extractString(buffer, start, payloadSize, false);
6684

6785
// empty name check
6886
if (channelName.length() == 0) {
@@ -92,8 +110,9 @@ protected void internalHandleResponse(
92110
if (completion == ProcessVariableExistanceCompletion.EXISTS_HERE ||
93111
completion == ProcessVariableExistanceCompletion.DOES_NOT_EXIST_HERE ||
94112
completion.doesExistElsewhere())
95-
{
96-
searchResponse(responseFrom, dataType, dataCount, parameter1, completion);
113+
{ // Reply via TCP?
114+
CASTransport tcp = transport instanceof CASTransport ? (CASTransport) transport : null;
115+
searchResponse(responseFrom, tcp, dataType, dataCount, parameter1, completion);
97116
}
98117
// in case of ProcessVariableExistanceCompletion.ASYNC_COMPLETION callback will call searchResponse method
99118
// in case of null, do nothing
@@ -102,12 +121,13 @@ protected void internalHandleResponse(
102121
/**
103122
* Respond to search response.
104123
* @param responseFrom
124+
* @param tcp Use TCP CASTransport?
105125
* @param dataType
106126
* @param dataCount
107127
* @param cid
108128
* @param completion
109129
*/
110-
private void searchResponse(InetSocketAddress responseFrom, short dataType, int dataCount, int cid, ProcessVariableExistanceCompletion completion) {
130+
private void searchResponse(InetSocketAddress responseFrom, CASTransport tcp, short dataType, int dataCount, int cid, ProcessVariableExistanceCompletion completion) {
111131

112132
//
113133
// ... respond
@@ -118,20 +138,40 @@ private void searchResponse(InetSocketAddress responseFrom, short dataType, int
118138
try
119139
{
120140
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
121-
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), (short)dataCount, cid);
122-
context.getBroadcastTransport().send(searchRequest, responseFrom);
141+
// UDP includes payload (version) in reply, TCP has no payload
142+
if (tcp == null)
143+
{
144+
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, true, (short)dataCount, cid);
145+
context.getLogger().log(Level.FINE, "UDP EXISTS_HERE search reply");
146+
context.getBroadcastTransport().send(searchRequest, responseFrom);
147+
}
148+
else
149+
{
150+
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), null, false, (short)dataCount, cid);
151+
context.getLogger().log(Level.FINE, "TCP EXISTS_HERE search reply");
152+
tcp.send(searchRequest.getRequestMessage());
153+
}
123154
} catch (Throwable th) {
124155
context.getLogger().log(Level.WARNING, "Failed to send back search response to: " + responseFrom, th);
125156
}
126157
}
127158
else if (completion.doesExistElsewhere())
128159
{
129-
// send back
160+
// Same comments as for EXISTS_HERE
130161
try
131162
{
132-
// TODO prepend version header (if context.getLastReceivedSequenceNumber() != 0)
133-
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), (short)dataCount, cid);
134-
context.getBroadcastTransport().send(searchRequest, responseFrom);
163+
if (tcp == null)
164+
{
165+
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), true, (short)dataCount, cid);
166+
context.getLogger().log(Level.FINE, "UDP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
167+
context.getBroadcastTransport().send(searchRequest, responseFrom);
168+
}
169+
else
170+
{
171+
SearchRequest searchRequest = new SearchRequest(context.getBroadcastTransport(), completion.getOtherAddress(), false, (short)dataCount, cid);
172+
context.getLogger().log(Level.FINE, "TCP EXISTS_ELSEWHERE search reply: " + completion.getOtherAddress());
173+
tcp.send(searchRequest.getRequestMessage());
174+
}
135175
} catch (Throwable th) {
136176
context.getLogger().log(Level.WARNING, "Failed to send back search response to: " + responseFrom, th);
137177
}
@@ -197,7 +237,7 @@ public ProcessVariableExistanceCallbackImpl(InetSocketAddress responseFrom,
197237
* @see gov.aps.jca.cas.ProcessVariableExistanceCallback#processVariableExistanceTestCompleted(gov.aps.jca.cas.ProcessVariableExistanceCompletion)
198238
*/
199239
public void processVariableExistanceTestCompleted(ProcessVariableExistanceCompletion completion) {
200-
searchResponse(responseFrom, dataType, dataCount, cid, completion);
240+
searchResponse(responseFrom, null /* not TCP */, dataType, dataCount, cid, completion);
201241
}
202242

203243
/* (non-Javadoc)

src/core/com/cosylab/epics/caj/cas/handlers/VersionResponse.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,15 @@
1414

1515
package com.cosylab.epics.caj.cas.handlers;
1616

17+
import java.util.logging.Level;
18+
import java.util.logging.Logger;
1719
import java.net.InetSocketAddress;
1820
import java.nio.ByteBuffer;
1921

2022
import com.cosylab.epics.caj.cas.CAJServerContext;
2123
import com.cosylab.epics.caj.cas.CASTransport;
2224
import com.cosylab.epics.caj.impl.Transport;
25+
import com.cosylab.epics.caj.impl.CAConstants;
2326

2427
/**
2528
* Version (request) handler.
@@ -60,6 +63,26 @@ protected void internalHandleResponse(
6063
{
6164
// TCP only
6265
((CASTransport)transport).setPriority(dataType);
66+
67+
// By responding to client with version info we indicate support for TCP name search
68+
// https://docs.epics-controls.org/en/latest/internal/ca_protocol.html#tcp-search
69+
// "CA_PROTO_SEARCH messages MUST NOT be sent on a Circuit unless a CA_PROTO_VERSION message has been received indicating >= CA_V412."
70+
71+
ByteBuffer my_response = ByteBuffer.allocate(16);
72+
my_response.putShort((short)0); // Command: Version
73+
my_response.putShort((short)0); // Payload Size: nothing
74+
my_response.putShort((short)0); // Priority
75+
my_response.putShort(CAConstants.CA_MINOR_PROTOCOL_REVISION);
76+
my_response.putInt(0); // Reserved parameter 1
77+
my_response.putInt(0); // Reserved parameter 2
78+
try
79+
{
80+
((CASTransport)transport).send(my_response);
81+
}
82+
catch (Exception ex)
83+
{
84+
Logger.global.log(Level.WARNING, "Server cannot send version response", ex);
85+
}
6386
}
6487
}
6588

src/core/com/cosylab/epics/caj/cas/requests/SearchRequest.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,14 @@ public class SearchRequest extends AbstractCARequest {
3636

3737
private static final Logger logger = Logger.getLogger(SearchRequest.class.getName());
3838

39-
/**
40-
* @param transport Transport to use
41-
* @param clientMinorVersion
42-
* @param cid Channel ID
43-
*/
44-
public SearchRequest(Transport transport, short clientMinorVersion, int cid)
45-
{
46-
this(transport, null, clientMinorVersion, cid);
47-
}
48-
4939
/**
5040
* @param transport Transport to use
5141
* @param other_address Optional other address to report, null to use this transport
42+
* @param with_version Include minor version in payload (for UDP), or no payload (for TCP)?
5243
* @param clientMinorVersion
5344
* @param cid Channel ID
5445
*/
55-
public SearchRequest(Transport transport, InetSocketAddress other_address, short clientMinorVersion, int cid) {
46+
public SearchRequest(Transport transport, InetSocketAddress other_address, boolean with_version, short clientMinorVersion, int cid) {
5647
super(transport);
5748

5849
// add minor version payload (aligned by 8)
@@ -72,17 +63,20 @@ public SearchRequest(Transport transport, InetSocketAddress other_address, short
7263
: other_address.getAddress();
7364
if (serverAddress != null && !serverAddress.isAnyLocalAddress())
7465
serverIP = InetAddressUtil.ipv4AddressToInt(serverAddress);
75-
logger.log(Level.FINE, "Replying to search with " + serverAddress + ":" + port);
7666
}
7767

7868
requestMessage = insertCAHeader(transport, requestMessage,
79-
(short)6, (short)8, (short)port, 0,
69+
(short)6, (short)(with_version ? 8 : 0), (short)port, 0,
8070
serverIP, cid);
81-
82-
requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
83-
// clear rest of the message (security)
84-
requestMessage.putShort((short)0);
85-
requestMessage.putInt(0);
71+
if (with_version)
72+
{
73+
requestMessage.putShort(CAConstants.CAS_MINOR_PROTOCOL_REVISION);
74+
// clear rest of the message (security)
75+
requestMessage.putShort((short)0);
76+
requestMessage.putInt(0);
77+
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", minor " + CAConstants.CAS_MINOR_PROTOCOL_REVISION);
78+
}
79+
logger.log(Level.FINE, "Replying to search with " + serverIP + ":" + port + ", no payload");
8680
}
8781

8882
/**

src/core/com/cosylab/epics/caj/impl/BroadcastTransport.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,9 @@ protected void processWrite() {
237237
*/
238238
protected void send(ByteBuffer buffer)
239239
{
240+
// TCP-only search?
241+
if (broadcastAddresses == null)
242+
return;
240243
for (int i = 0; i < broadcastAddresses.length; i++)
241244
{
242245
try

src/core/com/cosylab/epics/caj/impl/handlers/SearchResponse.java

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.cosylab.epics.caj.impl.CATransport;
2626
import com.cosylab.epics.caj.impl.Transport;
2727
import com.cosylab.epics.caj.util.InetAddressUtil;
28+
import com.cosylab.epics.caj.util.HexDump;
2829

2930
/**
3031
* @author <a href="mailto:matej.sekoranjaATcosylab.com">Matej Sekoranja</a>
@@ -53,21 +54,69 @@ protected void internalHandleResponse(
5354
//
5455

5556
short minorVersion = CAConstants.CA_UNKNOWN_MINOR_PROTOCOL_REVISION;
57+
58+
// Search response via UDP: Only response[0]
59+
// Payload size 8, with only a 'short minor' in payload
60+
//
61+
// Hexdump [response[0] @ 16] size = 24
62+
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
63+
// 00 0B 00 00 00 00 00 00
64+
// or
65+
// Hexdump [response[0] @ 32] size = 40
66+
// 00 00 00 00 00 01 00 0D 00 00 00 01 00 00 00 00 .... .... .... ....
67+
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
68+
// 00 0D 00 00 00 00 00 00 .... ....
69+
70+
// Search response via TCP: Two buffers
71+
// No payload, second buffer is empty
72+
//
73+
// Hexdump [response[0] @ 16] size = 16
74+
// 00 06 00 00 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
75+
// Hexdump [response[1] @ 0] size = 0
76+
77+
// Earlier CAJ server server added the 8 byte payload to TCP response,
78+
// which caused client to crash.
79+
// Now handle that as well in client.
80+
// Hexdump [response[0] @ 16] size = 16
81+
// 00 06 00 08 13 C8 00 00 FF FF FF FF 00 00 00 01 .... .... .... ....
82+
// Hexdump [response[1] @ 0] size = 8
83+
// 00 0B 00 00 00 00 00 00
84+
85+
// System.out.println("Client received search response");
86+
// System.out.println(response[0]);
87+
// if (response.length > 1)
88+
// System.out.println(response[1]);
89+
//
90+
// byte[] data = new byte[response[0].limit()];
91+
// for (int i=0; i<response[0].limit(); ++i)
92+
// data[i] = response[0].get(i);
93+
// HexDump.hexDump("response[0] @ " + response[0].position(), data, 0, response[0].limit());
94+
// if (response.length > 1)
95+
// {
96+
// data = new byte[response[1].limit()];
97+
// for (int i=0; i<response[1].limit(); ++i)
98+
// data[i] = response[1].get(i);
99+
// HexDump.hexDump("response[1] @ " + response[1].position(), data, 0, response[1].limit());
100+
// }
56101

57102
// Starting with CA V4.1 the minor version number is
58103
// appended to the end of each search reply.
59104
int payloadStart = response[0].position();
60105
if (payloadSize >= 2 /* short size = 2 bytes */)
61106
{
62-
// UDP response (all in buffer 0)
63-
minorVersion = response[0].getShort();
107+
// UDP response (all in buffer 0), or TCP (payload in buffer 1)?
108+
if (response.length == 1)
109+
minorVersion = response[0].getShort();
110+
else
111+
minorVersion = response[1].getShort();
64112
} else if(transport instanceof CATransport) {
65113
// for TCP transport use already provided version
66114
minorVersion = transport.getMinorRevision();
67115
}
68116

69117
// read rest of the playload (needed for UDP)
70-
response[0].position(payloadStart + payloadSize);
118+
if (response.length == 1)
119+
response[0].position(payloadStart + payloadSize);
71120

72121
// signed short conversion -> signed int
73122
int port = dataType & 0xFFFF;

test/com/cosylab/epics/caj/cas/test/CANameServer.java

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,18 +32,27 @@
3232
*
3333
* Example usage:
3434
*
35-
* In one terminal,
36-
* export EPICS_CA_SERVER_PORT=9876
37-
* then run an IOC database with a record named "ramp".
35+
* In one terminal, run an IOC database with a record named "ramp"
36+
* under a non-default UDP and TCP port:
37+
* export EPICS_CA_SERVER_PORT=9876
38+
* softIoc -d test/resources/ramp.db
3839
*
39-
* In another terminal,
40-
* caget ramp
41-
* will not be able to connect.
40+
* In another terminal, check that
41+
* caget ramp
42+
* will NOT be able to connect because it searches by default
43+
* via UDP port 5064, while the IOC runs on 9876 (UDP and TCP).
4244
*
4345
* Now run this CANameServer on the same host,
46+
* java -cp target/classes:target/test-classes -DCAJ_DEBUG=true com.cosylab.epics.caj.cas.test.CANameServer
4447
* and try `caget ramp` again.
45-
* It will reach the name server, which replies with 127.0.0.1, port 9876
46-
* to the seach request and client can then reach the IOC.
48+
* The client will reach the name server via UDP 5064.
49+
* The name server replies with 127.0.0.1, port 9876
50+
* to the search request and client can then reach the IOC.
51+
*
52+
* The CANameServer also supports searches via TCP:
53+
* export EPICS_CA_NAME_SERVERS=127.0.0.1
54+
* export EPICS_CA_AUTO_ADDR_LIST=no
55+
* caget ramp
4756
*/
4857
public class CANameServer
4958
{

0 commit comments

Comments
 (0)