|
17 | 17 |
|
18 | 18 | package systemic.sif3.demo.rest.consumer.functional; |
19 | 19 |
|
| 20 | +import java.io.BufferedWriter; |
| 21 | +import java.io.FileWriter; |
| 22 | +import java.util.Date; |
| 23 | +import java.util.HashMap; |
| 24 | + |
| 25 | +import javax.ws.rs.core.MediaType; |
| 26 | + |
20 | 27 | import org.slf4j.Logger; |
21 | 28 | import org.slf4j.LoggerFactory; |
22 | 29 |
|
| 30 | +import au.com.systemic.framework.utils.DateUtils; |
| 31 | +import sif3.common.model.EventMetadata; |
23 | 32 | import sif3.common.model.PagingInfo; |
| 33 | +import sif3.common.model.SIFContext; |
| 34 | +import sif3.common.model.SIFEvent; |
| 35 | +import sif3.common.model.SIFZone; |
24 | 36 | import sif3.common.model.delayed.DelayedResponseReceipt; |
25 | 37 | import sif3.common.model.job.PhaseInfo; |
26 | 38 | import sif3.common.utils.JAXBUtils; |
|
42 | 54 | public abstract class BaseFunctionalConsumer extends AbstractFunctionalServiceConsumer |
43 | 55 | { |
44 | 56 | protected final Logger logger = LoggerFactory.getLogger(getClass()); |
45 | | - |
| 57 | + |
| 58 | + private final static String RECORD_MARKER = "\n================================================================================================\n"; |
| 59 | + private HashMap<String, Integer> recordCounters = new HashMap<String, Integer>(); |
| 60 | + private static Boolean writePayload = null; |
| 61 | + |
46 | 62 | public BaseFunctionalConsumer() |
47 | 63 | { |
48 | 64 | super(); |
@@ -125,4 +141,81 @@ public void processDelayedError(ErrorDetails error, DelayedResponseReceipt recei |
125 | 141 | { |
126 | 142 | logger.debug("Received DELAYED ERROR Response:\n"+error+"\nDelayed Receipt Details:\n"+receipt); |
127 | 143 | } |
| 144 | + |
| 145 | + /* (non-Javadoc) |
| 146 | + * @see sif3.infra.rest.consumer.AbstractFunctionalServiceConsumer#processJobEvent(sif3.common.model.SIFEvent, sif3.common.model.SIFZone, sif3.common.model.SIFContext, sif3.common.model.EventMetadata, java.lang.String, java.lang.String) |
| 147 | + */ |
| 148 | + @Override |
| 149 | + public void processJobEvent(SIFEvent<JobCollectionType> sifEvent, SIFZone zone, SIFContext context, EventMetadata metadata, String msgReadID, String consumerID) |
| 150 | + { |
| 151 | + String consumerName = getPrettyName()+"(QueueID:"+msgReadID+"; ConsumerID: "+consumerID+")"; |
| 152 | + logger.debug(consumerName +" received an event from Zone = "+zone+", Context = "+context+" and Event Metadata = "+metadata); |
| 153 | + dumpJobEvent(sifEvent, zone, context, msgReadID, consumerID); |
| 154 | + } |
| 155 | + |
| 156 | + /*---------------------*/ |
| 157 | + /*-- Private Methods --*/ |
| 158 | + /*---------------------*/ |
| 159 | + |
| 160 | + private void dumpJobEvent(SIFEvent<JobCollectionType> sifEvent, SIFZone zone, SIFContext context, String msgReadID, String consumerID) |
| 161 | + { |
| 162 | +// logger.debug(consumerID +" write data to file..."); |
| 163 | + String filename = getFileNameOnly(consumerID); |
| 164 | + String fullFileName= getFullFileName(consumerID); |
| 165 | + Integer recordNum = recordCounters.get(filename); |
| 166 | + if (recordNum == null) |
| 167 | + { |
| 168 | + recordNum = 1; |
| 169 | + recordCounters.put(filename, recordNum); |
| 170 | + } |
| 171 | + String timestamp = DateUtils.getISO8601withSecFraction(new Date()); |
| 172 | + |
| 173 | + BufferedWriter out = null; |
| 174 | + try |
| 175 | + { |
| 176 | + FileWriter fstream = new FileWriter(fullFileName, true); |
| 177 | + out = new BufferedWriter(fstream); |
| 178 | + out.write(RECORD_MARKER); |
| 179 | + out.write("Record " + recordNum + " - processed by Thread ID = "+Thread.currentThread().getId()+"\n"+sifEvent.getListSize()+" "+sifEvent.getEventAction().name()+" Events from Queue Reader "+ msgReadID+"\nReceived at "+timestamp+" from Zone = "+zone.getId()+" and Context = "+context.getId()); |
| 180 | + out.write(RECORD_MARKER); |
| 181 | + |
| 182 | + if (writePayload == null) |
| 183 | + { |
| 184 | + writePayload = getServiceProperties().getPropertyAsBool("test.consumer.write.payload", true); |
| 185 | + } |
| 186 | + |
| 187 | + if (writePayload) |
| 188 | + { |
| 189 | + out.write(getMarshaller().marshal(sifEvent.getSIFObjectList(), MediaType.APPLICATION_XML_TYPE)); |
| 190 | + } |
| 191 | + recordNum++; |
| 192 | + recordCounters.put(filename, recordNum); |
| 193 | + } |
| 194 | + catch (Exception ex) |
| 195 | + { |
| 196 | + logger.error("Failed to write data to dump file with name:" + fullFileName, ex); |
| 197 | + } |
| 198 | + finally |
| 199 | + { |
| 200 | + if (out != null) |
| 201 | + { |
| 202 | + try |
| 203 | + { |
| 204 | + out.close(); |
| 205 | + } |
| 206 | + catch (Exception ex) {} // nothing we can do |
| 207 | + } |
| 208 | + } |
| 209 | + } |
| 210 | + |
| 211 | + private String getFullFileName(String consumerID) |
| 212 | + { |
| 213 | + return getServiceProperties().getPropertyAsString("test.tempDir.output","") + "/" + getFileNameOnly(consumerID); |
| 214 | + |
| 215 | + } |
| 216 | + |
| 217 | + private String getFileNameOnly(String consumerID) |
| 218 | + { |
| 219 | + return consumerID+".log"; |
| 220 | + } |
128 | 221 | } |
0 commit comments