Separate out records changed message for collaboration context
Enhancing the implementation of this ADR by completely separating out message processing mechanism for new records changed topic. We've introduced a new model to support the new records changed messages and refactored the shared code so that CSPs that have not implemented the collaboration provider interfaces will keep using the original method (we reverted this back to not include the collaboration context) until they adopt it
The motivation for doing this is to be able to retrieve 'version' and 'modifiedBy' information for message listeners, so we have made a hard separation with a new model and a new publish message interface.
Existing behavior of the system should not be affected with this change.
Merge request reports
Activity
64 60 .serviceBusTopicName(serviceBusConfig.getServiceBusTopic()) 65 61 .build(); 66 62 67 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext); 63 messagePublisher.publishMessage(headers, publisherInfo, Optional.empty()); 68 64 } 69 65 } 70 66 71 67 72 public void publishMessageToRecordsTopicV2(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, PubSubInfo... messages) { 64 60 .serviceBusTopicName(serviceBusConfig.getServiceBusTopic()) 65 61 .build(); 66 62 67 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext); 63 messagePublisher.publishMessage(headers, publisherInfo, Optional.empty()); 68 64 } 69 65 } 70 66 71 67 72 public void publishMessageToRecordsTopicV2(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, PubSubInfo... messages) { 68 public void publishMessage(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, RecordChangedV2... messages) { 73 69 // The batch size is same for both Event grid and Service bus. 74 70 final int BATCH_SIZE = Integer.parseInt(publisherConfig.getPubSubBatchSize()); 75 71 for (int i = 0; i < messages.length; i += BATCH_SIZE) { 76 72 String messageId = String.format("%s-%d",headers.getCorrelationId(), i); 50 52 private LegalTagCache legalTagCache; 51 53 @Autowired 52 54 private MessageBusImpl pubSubclient; 55 @Autowired 56 private IFeatureFlag iCollaborationFeatureFlag; 81 88 for (RecordMetadata recordMetadata : recordsMetadata) { 82 89 recordIds.add(recordMetadata.getId()); 83 90 } 84 this.pubSubclient.publishMessage(Optional.empty(), headers, pubsubInfos); 91 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) { 92 this.pubSubclient.publishMessage(Optional.empty(), headers, recordsChangedV2s); 93 } 94 if (!collaborationContext.isPresent()) { 95 this.pubSubclient.publishMessage(headers, pubsubInfos); 96 } This was a bit confusing for me to read. Personally, I'd prefer
if(!context) { publishToV1(); if(ffEnabled()) { publishToV2(); } } else { publishToV2(); }
Edited by Alok JoshiThank you for the suggestion. My thought process was that we should always publish it to V2 regardless of the context and keep the original functionality untouched (which is publish to V1) if the context is not provided. This way we are not creating a dependency between the feature flag and context and may also be easier to cleanup the feature flag later. Please let me know if the nested conditioning is still better to understand, I will be happy to update it.
Edited by Larissa Pereira
81 88 for (RecordMetadata recordMetadata : recordsMetadata) { 82 89 recordIds.add(recordMetadata.getId()); 83 90 } 84 this.pubSubclient.publishMessage(Optional.empty(), headers, pubsubInfos); 91 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) { 92 this.pubSubclient.publishMessage(Optional.empty(), headers, recordsChangedV2s); 93 } 94 if (!collaborationContext.isPresent()) { 79 recordChangedV2[i] = RecordChangedV2.builder().id(recordMetadata.getId()).version(recordMetadata.getLatestVersion()).kind(recordMetadata.getKind()).op(OperationType.update).recordBlocks(processing.getRecordBlocks()).build(); 73 80 if (!Strings.isNullOrEmpty(processing.getRecordMetadata().getPreviousVersionKind())) { 74 81 pubsubInfo[i].setPreviousVersionKind(processing.getRecordMetadata().getPreviousVersionKind()); 82 recordChangedV2[i].setPreviousVersionKind(processing.getRecordMetadata().getPreviousVersionKind()); 75 83 } 76 84 } 77 85 } 78 86 79 87 this.commitBatch(recordsProcessing, recordsMetadata, collaborationContext); 80 this.pubSubClient.publishMessage(collaborationContext, this.headers, pubsubInfo); 88 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) { 89 this.pubSubClient.publishMessage(collaborationContext, this.headers, recordChangedV2); 90 } 91 if (!collaborationContext.isPresent()) { 92 this.pubSubClient.publishMessage(this.headers, pubsubInfo); 93 } 85 88 86 89 @Autowired 87 90 private StorageAuditLogger auditLogger; 91 @Autowired 92 private IFeatureFlag iCollaborationFeatureFlag; 93 private static final String COLLABORATIONS_FEATURE_NAME="collaborations-enabled"; maybe we can define this in a common file so there is less duplication of declaration
https://dev.azure.com/slb-digital-us/osdu/_git/collaborations?path=/collaborations-core/src/main/java/org/opengroup/osdu/collaborations/utils/StringConstants.java something like this
Would you recommend creating a StringConstants file for storage? There was a similar discussion for notification service where we agreed, we could add this constant to the core-common Constants file since it is being used by multiple services. We only let it go for now to avoid another core-common MR. Please let me know your thoughts ?
yeah just create one in Storage for now, I don't think its a big deal if multiple services end up have a similar config for constants
Edited by Alok Joshi
assigned to @ajoshi19
added 1 commit
- 171e56a2 - add modifiedBy property to the RecordChangedV2 model
77 .messageId(messageId) 78 .build(); 63 PublisherInfo publisherInfo = getPublisherInfo(); 64 publisherInfo.setBatch(batch); 65 publisherInfo.setMessageId(messageId); 79 66 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext); 80 67 } 81 68 } 69 70 private PublisherInfo getPublisherInfo() { 71 return PublisherInfo.builder() 72 .eventGridTopicName(eventGridConfig.getEventGridTopic()) 73 .eventGridEventSubject(eventGridConfig.getEventSubject()) 74 .eventGridEventType(eventGridConfig.getEventType()) 75 .eventGridEventDataVersion(eventGridConfig.getEventDataVersion()) 76 .serviceBusTopicName(serviceBusConfig.getServiceBusRecordsEventTopic()) - Resolved by Yurii Kondakov
- Resolved by Yurii Kondakov
requested review from @kelham
added Azure Common Code labels
added MREnhancement label
changed milestone to %M16 - Release 0.19