Skip to content

Commit c76027c

Browse files
author
Joerg Huber
committed
Manage NamedQuery Consumers.
1 parent 4debd5a commit c76027c

3 files changed

Lines changed: 50 additions & 10 deletions

File tree

SIF3InfraREST/SIF3REST/src/main/java/sif3/infra/rest/consumer/ConsumerLoader.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public class ConsumerLoader
7171
private List<AbstractEventConsumer<?>> eventConsumers = new ArrayList<AbstractEventConsumer<?>>();
7272
private List<AbstractConsumer> crudConsumers = new ArrayList<AbstractConsumer>();
7373
private List<AbstractFunctionalServiceConsumer> fsServiceConsumers = new ArrayList<AbstractFunctionalServiceConsumer>();
74+
private List<AbstractNamedQueryConsumer> nqConsumers = new ArrayList<AbstractNamedQueryConsumer>();
7475

7576
private List<QueueReaderInfo<RemoteMessageQueueReader>> msgReaderServices = new ArrayList<QueueReaderInfo<RemoteMessageQueueReader>>();
7677

@@ -233,6 +234,12 @@ private void shutdownConsumer()
233234
consumer.finalise();
234235
}
235236

237+
for (AbstractNamedQueryConsumer consumer : nqConsumers)
238+
{
239+
logger.debug("Shutdown " + consumer.getClass().getSimpleName());
240+
consumer.finalise();
241+
}
242+
236243
if (getConsumerEnvironment().getEventsEnabled() || (getConsumerEnvironment().getDelayedEnabled()))
237244
{
238245
logger.debug("Shutdown Event Subscription Connector...");
@@ -307,6 +314,11 @@ private boolean initAsyncProcessor()
307314
addServices(allLocalQueueCRUDServices, consumer.getAllApprovedCRUDServices(), consumer.getLocalConsumerQueue());
308315
}
309316

317+
for (AbstractNamedQueryConsumer consumer : nqConsumers)
318+
{
319+
addServices(allLocalQueueCRUDServices, consumer.getAllApprovedCRUDServices(), consumer.getLocalConsumerQueue());
320+
}
321+
310322
for (AbstractEventConsumer<?> consumer : eventConsumers)
311323
{
312324
addServices(allLocalQueueCRUDServices, consumer.getAllApprovedCRUDServices(), consumer.getLocalConsumerQueue());
@@ -400,7 +412,7 @@ private QueueReaderInfo<RemoteMessageQueueReader> startRemoteMessageReaderThread
400412
return queueReaderInfo;
401413
}
402414

403-
private void initialiseConsumers(AdvancedProperties adapterProps)
415+
private void initialiseConsumers(AdvancedProperties adapterProps)
404416
{
405417
List<String> classList = adapterProps.getFromCommaSeparated("consumer.classes");
406418
String basePackageName = makePackageName(adapterProps.getPropertyAsString("consumer.basePackageName", ""));
@@ -417,6 +429,9 @@ private void initialiseConsumers(AdvancedProperties adapterProps)
417429
// Instantiate class.
418430
Object classObj = ct.newInstance();
419431

432+
// Init local queues. Must be called here as it is not part of the constructor.
433+
((BaseConsumer)classObj).initLocalConsumerQueues();
434+
420435
// Set properties and add it to correct structure
421436
if (classObj instanceof AbstractEventConsumer)
422437
{
@@ -433,9 +448,14 @@ else if (classObj instanceof AbstractFunctionalServiceConsumer)
433448
logger.debug("Added " + classObj.getClass().getSimpleName() + " to fsServiceConsumers list");
434449
fsServiceConsumers.add((AbstractFunctionalServiceConsumer) classObj);
435450
}
451+
else if (classObj instanceof AbstractNamedQueryConsumer)
452+
{
453+
logger.debug("Added " + classObj.getClass().getSimpleName() + " to nqConsumers list");
454+
nqConsumers.add((AbstractNamedQueryConsumer) classObj);
455+
}
436456
else
437457
{
438-
logger.error("Consumer class " + className + " doesn't extend AbstractConsumer, AbstractEventConsumer of AbstractFunctionalServiceConsumer. Cannot initialse the Consumer.");
458+
logger.error("Consumer class " + className + " doesn't extend AbstractConsumer, AbstractEventConsumer, AbstractFunctionalServiceConsumer or AbstractNamedQueryConsumer. Cannot initialse the Consumer.");
439459
}
440460
}
441461
catch (Exception ex)

SIF3InfraREST/SIF3REST/src/main/java/sif3/infra/rest/queue/LocalMessageConsumer.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,30 @@ private void processResponse(ResponseInfo responseInfo)
306306
{
307307
Object payloadObject = null;
308308

309-
if (responseInfo.getServiceType() == ServiceType.FUNCTIONAL)
309+
switch (responseInfo.getServiceType())
310310
{
311-
payloadObject = makeFunctionalServiceObject(responseInfo.getPayload(), responseInfo.getMediaType(), isPhaseObject, false);
312-
}
313-
else
314-
{
315-
payloadObject = makeDataModelObject(minimalConsumer, responseInfo.getPayload(), responseInfo.getMediaType());
311+
case OBJECT: // no code here as it is the same as the SERVICEPATH below!
312+
case SERVICEPATH:
313+
{
314+
payloadObject = makeDataModelObject(minimalConsumer, responseInfo.getPayload(), responseInfo.getMediaType());
315+
break;
316+
}
317+
case FUNCTIONAL:
318+
{
319+
payloadObject = makeFunctionalServiceObject(responseInfo.getPayload(), responseInfo.getMediaType(), isPhaseObject, false);
320+
break;
321+
}
322+
case XQUERYTEMPLATE:
323+
{
324+
// Payload must remain a string. So there is no action required for this service type!
325+
break;
326+
}
327+
default:
328+
{
329+
logger.error("Received a Query Response for a Servic Type ("+responseInfo.getServiceType()+") that is not yet supported with this framework. Ignore message:\n"+responseInfo);
330+
}
316331
}
317-
332+
318333
if (payloadObject instanceof ErrorDetails)
319334
{
320335
// Something is not good at all. We send the error to the consumer.
@@ -336,6 +351,11 @@ private void processResponse(ResponseInfo responseInfo)
336351
delayedConsumer.onServicePath(payloadObject, new QueryCriteria(responseInfo.getUrlService()), paging, delayedReceipt);
337352
break;
338353
}
354+
case XQUERYTEMPLATE:
355+
{
356+
delayedConsumer.onQuery(payloadObject, paging, delayedReceipt);
357+
break;
358+
}
339359
case FUNCTIONAL:
340360
{
341361
if (!isPhaseObject)

SIF3InfraREST/SIF3REST/src/main/java/sif3/infra/rest/queue/RemoteMessageQueueReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ private void processResponse(Response response)
339339
{
340340
logger.info("Received a DELAYED Response for which there is no consumer registered. Discard the following RESPONSE:\n" + response);
341341
}
342-
else // Create event object and send it to eventConsumer
342+
else // Create message object and send it to blocking queue
343343
{
344344
logger.debug(getReaderID()+": Attempts to push DELAYED Response to local queue...");
345345
localQueue.blockingPush(responseInfo);

0 commit comments

Comments
 (0)