2323import sif3 .common .header .HeaderValues .ServiceType ;
2424import sif3 .common .interfaces .EventConsumer ;
2525import sif3 .common .model .EventMetadata ;
26- import sif3 .common .model .SIFContext ;
2726import sif3 .common .model .SIFEvent ;
28- import sif3 .common .model .SIFZone ;
2927import sif3 .common .model .ServiceInfo ;
3028import sif3 .common .model .ServiceRights ;
3129import sif3 .common .model .ServiceRights .AccessRight ;
4442 */
4543public abstract class AbstractEventConsumer <L > extends AbstractConsumer implements EventConsumer <L >, Runnable
4644{
47- // private LocalConsumerQueue localConsumerQueue = null;
48- // private ExecutorService service = null;
49-
5045 /**
5146 * This method is called when a consumer service has received an event. This class does implement the actual onEvent( method
5247 * from the event interface. It may do some additional work for house keeping purpose so the original onEvent() id processed
5348 * as part of this class but then this method is called so that the actual consumer can do its work as required.
5449 *
55- * @param sifEvent The event data that has been received and shall be processed by the consumer.
56- * @param zone The zone from which the event has been received.
57- * @param context The context for which the event is applicable for .
50+ * @param sifEvent The event data that has been received and shall be processed by the consumer. This parameter also holds
51+ * the zone and context in the limitToZoneCtxList property. It will always only hold one entry in that
52+ * list. So the zone can be retrieved with the following call: sifEvent.getLimitToZoneCtxList().get(0).getZone() .
5853 * @param metadata Additional metadata that is known for the event. Typical values include custom HTTP headers, sourceName etc.
5954 * @param msgReadID The ID of the SIF queue reader. It is informative only and is only of use where there are multiple concurrent
6055 * subscribers on a message queue.
6156 * @param consumerID The consumer ID that has been used to receive the event from the event queue. It is informative
6257 * only and is only of use where there are multiple event subscribers enabled.
6358 */
64- public abstract void processEvent (SIFEvent <L > sifEvent , SIFZone zone , SIFContext context , EventMetadata metadata , String msgReadID , String consumerID );
59+ public abstract void processEvent (SIFEvent <L > sifEvent , EventMetadata metadata , String msgReadID , String consumerID );
6560
6661 public AbstractEventConsumer ()
6762 {
6863 super ();
69- // Only create local queue if event processing is enabled.
70- // if (getConsumerEnvironment().getEventsEnabled() && getConsumerEnvironment().getEventsSupported())
71- // {
72- // createLocalConsumerQueue();
73- // }
7464 }
7565
7666 /*
7767 * (non-Javadoc)
7868 *
79- * @see sif3.common.interfaces.EventConsumer#onEvent(sif3.common.model.SIFEvent, sif3.common.model.SIFZone, sif3.common.model.SIFContext , java.lang.String)
69+ * @see sif3.common.interfaces.EventConsumer#onEvent(sif3.common.model.SIFEvent, sif3.common.model.EventMetadata, java.lang.String , java.lang.String)
8070 */
8171 @ Override
82- public void onEvent (SIFEvent <L > sifEvent , SIFZone zone , SIFContext context , EventMetadata metadata , String msgReadID , String consumerID )
72+ // public void onEvent(SIFEvent<L> sifEvent, SIFZone zone, SIFContext context, EventMetadata metadata, String msgReadID, String consumerID)
73+ public void onEvent (SIFEvent <L > sifEvent , EventMetadata metadata , String msgReadID , String consumerID )
8374 {
8475 // Right now all that is required is to call the abstract processEvent() method.
85- processEvent (sifEvent , zone , context , metadata , msgReadID , consumerID );
76+ // processEvent(sifEvent, zone, context, metadata, msgReadID, consumerID);
77+ processEvent (sifEvent , metadata , msgReadID , consumerID );
8678 }
8779
8880 /*
@@ -93,12 +85,6 @@ public void onEvent(SIFEvent<L> sifEvent, SIFZone zone, SIFContext context, Even
9385 @ Override
9486 public void finalise ()
9587 {
96- // logger.debug("Shut down Event Consumer Thread Pool for "+getConsumerName());
97- // if (service != null)
98- // {
99- // service.shutdown();
100- // }
101-
10288 // Call user defined finalise of the subscriber.
10389 super .finalise ();
10490 }
@@ -116,20 +102,6 @@ public final void run()
116102// logger.debug("============================\n Start "+getConsumerName()+" worker thread.\n======================================");
117103 }
118104
119- // /*------------------------------*/
120- // /* Some Getter & Setter methods */
121- // /*------------------------------*/
122- //
123- // public final LocalConsumerQueue getLocalConsumerQueue()
124- // {
125- // return localConsumerQueue;
126- // }
127- //
128- // public final void setLocalConsumerQueue(LocalConsumerQueue localConsumerQueue)
129- // {
130- // this.localConsumerQueue = localConsumerQueue;
131- // }
132-
133105 /*----------------------------*/
134106 /*-- Other required methods --*/
135107 /*----------------------------*/
@@ -161,52 +133,7 @@ protected final List<ServiceInfo> getEventServices()
161133 return filterEventServices (sif3Session .getServiceInfoForService (getMultiObjectClassInfo ().getObjectName (), ServiceType .OBJECT , AccessRight .SUBSCRIBE , ServiceRights .AccessType .APPROVED ));
162134 }
163135
164- // public final int getNumOfConsumerThreads()
165- // {
166- // return getServiceProperties().getPropertyAsInt("consumer.local.workerThread", getClass().getSimpleName(), 1);
167- // }
168-
169- /**
170- * Only creates the local consumer queue if it doesn't already exist. The queue (new or existing) is then returned.
171- *
172- * @return See desc.
173- */
174- // public final LocalConsumerQueue createLocalConsumerQueue()
175- // {
176- // if (getLocalConsumerQueue() == null)
177- // {
178- // // Create local queue with the capacity indicated with the consumer config
179- // logger.debug("Create Local Queue for "+getConsumerName());
180- //
181- // // Use the local queue as a trigger of threads rather than actual queueing of messages. Use 1 as the minimum
182- // setLocalConsumerQueue(new LocalConsumerQueue(1, getClass().getSimpleName() + "LocalQueue", getClass().getSimpleName()));
183- // startListenerThreads();
184- // }
185- // return getLocalConsumerQueue();
186- // }
187-
188136 /*---------------------*/
189137 /*-- Private Methods --*/
190138 /*---------------------*/
191-
192- /*
193- * Will initialise the threads and add them to the local consumer queue.
194- */
195- // private void startListenerThreads()
196- // {
197- // // Start up all consumers for this subscriber.
198- // int numThreads = getNumOfConsumerThreads();
199- // logger.debug("Start "+numThreads+" "+getConsumerName()+" threads.");
200- // logger.debug("Total number of threads before starting Local Queue for "+getConsumerName()+" "+Thread.activeCount());
201- // service = Executors.newFixedThreadPool(numThreads);
202- // for (int i = 0; i < numThreads; i++)
203- // {
204- // String consumerID = getConsumerName()+" "+(i+1);
205- // logger.debug("Start Consumer "+consumerID);
206- // LocalMessageConsumer consumer = new LocalMessageConsumer(getLocalConsumerQueue(), consumerID, this);
207- // service.execute(consumer);
208- // }
209- // logger.debug(numThreads+" "+getConsumerName()+" initilaised and started.");
210- // logger.debug("Total number of threads after starting Local Queue for "+getConsumerName()+" "+Thread.activeCount());
211- // }
212- }
139+ }
0 commit comments