|
25 | 25 | import org.zstack.header.core.FutureReturnValueCompletion; |
26 | 26 | import org.zstack.header.core.NoErrorCompletion; |
27 | 27 | import org.zstack.header.core.NopeNoErrorCompletion; |
| 28 | +import org.zstack.header.core.cloudbus.CloudBusExtensionPoint; |
28 | 29 | import org.zstack.header.errorcode.ErrorCode; |
29 | 30 | import org.zstack.header.errorcode.OperationFailureException; |
30 | 31 | import org.zstack.header.errorcode.SysErrors; |
@@ -86,6 +87,7 @@ public class CloudBusImpl3 implements CloudBus, CloudBusIN { |
86 | 87 | private List<Service> services = new ArrayList<>(); |
87 | 88 | private Map<Class, List<ReplyMessagePreSendingExtensionPoint>> replyMessageMarshaller = new ConcurrentHashMap<Class, List<ReplyMessagePreSendingExtensionPoint>>(); |
88 | 89 | private List<RestAPIExtensionPoint> apiExts = new ArrayList<>(); |
| 90 | + private List<CloudBusExtensionPoint> msgExts = new ArrayList<>(); |
89 | 91 |
|
90 | 92 | private Map<Class, List<BeforeDeliveryMessageInterceptor>> beforeDeliveryMessageInterceptors = new HashMap<Class, List<BeforeDeliveryMessageInterceptor>>(); |
91 | 93 | private Map<Class, List<BeforeSendMessageInterceptor>> beforeSendMessageInterceptors = new HashMap<Class, List<BeforeSendMessageInterceptor>>(); |
@@ -328,6 +330,7 @@ public void timeout() { |
328 | 330 | }; |
329 | 331 |
|
330 | 332 | envelopes.put(msg.getId(), e); |
| 333 | + msgExts.forEach(m -> m.afterAddEnvelopes(msg.getId())); |
331 | 334 | send(msg, false); |
332 | 335 | } |
333 | 336 |
|
@@ -1056,6 +1059,7 @@ public void installBeforePublishEventInterceptor(BeforePublishEventInterceptor i |
1056 | 1059 | private void populateExtension() { |
1057 | 1060 | services = pluginRgty.getExtensionList(Service.class); |
1058 | 1061 | apiExts = pluginRgty.getExtensionList(RestAPIExtensionPoint.class); |
| 1062 | + msgExts = pluginRgty.getExtensionList(CloudBusExtensionPoint.class); |
1059 | 1063 | services.forEach(serv->{ |
1060 | 1064 | assert serv.getId() != null : String.format("service id can not be null[%s]", serv.getClass().getName()); |
1061 | 1065 | registerService(serv); |
@@ -1201,4 +1205,9 @@ public void handleHttpRequest(HttpEntity<String> e, HttpServletResponse rsp) { |
1201 | 1205 | logger.warn(String.format("unable to deliver a message received from HTTP. HTTP body: %s", e.getBody()), t); |
1202 | 1206 | } |
1203 | 1207 | } |
| 1208 | + |
| 1209 | + @Override |
| 1210 | + public int getEnvelopeSize() { |
| 1211 | + return envelopes.size(); |
| 1212 | + } |
1204 | 1213 | } |
0 commit comments