Skip to content
Snippets Groups Projects

Separate out records changed message for collaboration context

Merged Alok Joshi requested to merge add-version-info-collab into master

Enhancing the implementation of this ADR by completely separating out message processing mechanism for new records changed topic. We've introduced a new model to support the new records changed messages and refactored the shared code so that CSPs that have not implemented the collaboration provider interfaces will keep using the original method (we reverted this back to not include the collaboration context) until they adopt it

The motivation for doing this is to be able to retrieve 'version' and 'modifiedBy' information for message listeners, so we have made a hard separation with a new model and a new publish message interface.

Existing behavior of the system should not be affected with this change.

Edited by Alok Joshi

Merge request reports

Merge request pipeline #165388 failed

Pipeline: Storage

#165389

    Merge request pipeline failed for ec82e00e

    Approval is optional

    Merged by Alok JoshiAlok Joshi 2 years ago (Feb 13, 2023 2:47pm UTC)

    Merge details

    Pipeline #166393 failed

    Pipeline failed for d6d39c93 on master

    4 environments impacted.

    Activity

    Filter activity
    • Approvals
    • Assignees & reviewers
    • Comments (from bots)
    • Comments (from users)
    • Commits & branches
    • Edits
    • Labels
    • Lock status
    • Mentions
    • Merge request status
    • Tracking
    64 60 .serviceBusTopicName(serviceBusConfig.getServiceBusTopic())
    65 61 .build();
    66 62
    67 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext);
    63 messagePublisher.publishMessage(headers, publisherInfo, Optional.empty());
    68 64 }
    69 65 }
    70 66
    71 67
    72 public void publishMessageToRecordsTopicV2(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, PubSubInfo... messages) {
    • Author Maintainer

      lets keep the order of input parameters similar to original method publishMessageToRecordsTopicV2(headers, messages, collaborationContext)

    • There is a requirement in Java that- "Vararg parameter (PubSubInfo... or RecordChangedV2...) need to be last in the parameter list". Hence I am unable to place another parameter after messages

    • Please register or sign in to reply
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 64 60 .serviceBusTopicName(serviceBusConfig.getServiceBusTopic())
    65 61 .build();
    66 62
    67 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext);
    63 messagePublisher.publishMessage(headers, publisherInfo, Optional.empty());
    68 64 }
    69 65 }
    70 66
    71 67
    72 public void publishMessageToRecordsTopicV2(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, PubSubInfo... messages) {
    68 public void publishMessage(Optional<CollaborationContext> collaborationContext, DpsHeaders headers, RecordChangedV2... messages) {
    73 69 // The batch size is same for both Event grid and Service bus.
    74 70 final int BATCH_SIZE = Integer.parseInt(publisherConfig.getPubSubBatchSize());
    75 71 for (int i = 0; i < messages.length; i += BATCH_SIZE) {
    76 72 String messageId = String.format("%s-%d",headers.getCorrelationId(), i);
    • Author Maintainer

      what is the significance of this messageId, I see that in the end we assign it as PublisherInfo messageId, but what does this addition do (original method doesn't do this).

      My particular concern is when the correlationId is empty; messageId won't remain unique in that case

    • Author Maintainer

      Looks like this is to generate unique message ID to avoid duplicate message processing at the listener, lets leave it as is for now and we can address this separately if it becomes an issue

    • Please register or sign in to reply
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 50 52 private LegalTagCache legalTagCache;
    51 53 @Autowired
    52 54 private MessageBusImpl pubSubclient;
    55 @Autowired
    56 private IFeatureFlag iCollaborationFeatureFlag;
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 81 88 for (RecordMetadata recordMetadata : recordsMetadata) {
    82 89 recordIds.add(recordMetadata.getId());
    83 90 }
    84 this.pubSubclient.publishMessage(Optional.empty(), headers, pubsubInfos);
    91 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) {
    92 this.pubSubclient.publishMessage(Optional.empty(), headers, recordsChangedV2s);
    93 }
    94 if (!collaborationContext.isPresent()) {
    95 this.pubSubclient.publishMessage(headers, pubsubInfos);
    96 }
    • Author Maintainer

      This was a bit confusing for me to read. Personally, I'd prefer

      if(!context) { publishToV1(); if(ffEnabled()) { publishToV2(); } } else { publishToV2(); }

      Edited by Alok Joshi
    • Thank you for the suggestion. My thought process was that we should always publish it to V2 regardless of the context and keep the original functionality untouched (which is publish to V1) if the context is not provided. This way we are not creating a dependency between the feature flag and context and may also be easier to cleanup the feature flag later. Please let me know if the nested conditioning is still better to understand, I will be happy to update it.

      Edited by Larissa Pereira
    • Author Maintainer

      Thats fine with me

    • Please register or sign in to reply
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 81 88 for (RecordMetadata recordMetadata : recordsMetadata) {
    82 89 recordIds.add(recordMetadata.getId());
    83 90 }
    84 this.pubSubclient.publishMessage(Optional.empty(), headers, pubsubInfos);
    91 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) {
    92 this.pubSubclient.publishMessage(Optional.empty(), headers, recordsChangedV2s);
    93 }
    94 if (!collaborationContext.isPresent()) {
    • Author Maintainer

      it looks like collaborationContext is always empty (as initialized), it is not getting updated anywhere?

    • I tried to maintain the functionality of this method when storage was made collaboration context aware and from line 84 it was set to empty so doesn't look like it was getting updated anywhere before. I will cleanup the variable since I am directly providing Optional.empty() here.

    • Please register or sign in to reply
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 79 recordChangedV2[i] = RecordChangedV2.builder().id(recordMetadata.getId()).version(recordMetadata.getLatestVersion()).kind(recordMetadata.getKind()).op(OperationType.update).recordBlocks(processing.getRecordBlocks()).build();
    73 80 if (!Strings.isNullOrEmpty(processing.getRecordMetadata().getPreviousVersionKind())) {
    74 81 pubsubInfo[i].setPreviousVersionKind(processing.getRecordMetadata().getPreviousVersionKind());
    82 recordChangedV2[i].setPreviousVersionKind(processing.getRecordMetadata().getPreviousVersionKind());
    75 83 }
    76 84 }
    77 85 }
    78 86
    79 87 this.commitBatch(recordsProcessing, recordsMetadata, collaborationContext);
    80 this.pubSubClient.publishMessage(collaborationContext, this.headers, pubsubInfo);
    88 if (iCollaborationFeatureFlag.isFeatureEnabled(COLLABORATIONS_FEATURE_NAME)) {
    89 this.pubSubClient.publishMessage(collaborationContext, this.headers, recordChangedV2);
    90 }
    91 if (!collaborationContext.isPresent()) {
    92 this.pubSubClient.publishMessage(this.headers, pubsubInfo);
    93 }
  • Alok Joshi
    Alok Joshi @ajoshi19 started a thread on commit 3821c5ed
  • 85 88
    86 89 @Autowired
    87 90 private StorageAuditLogger auditLogger;
    91 @Autowired
    92 private IFeatureFlag iCollaborationFeatureFlag;
    93 private static final String COLLABORATIONS_FEATURE_NAME="collaborations-enabled";
  • Yurii Kondakov
    Yurii Kondakov @ykondakov started a thread on commit b48c496d
  • 90 91 private StorageAuditLogger auditLogger;
    91 92 @Autowired
    92 93 private IFeatureFlag iCollaborationFeatureFlag;
  • assigned to @ajoshi19

  • Alok Joshi added 1 commit

    added 1 commit

    Compare with previous version

  • Yurii Kondakov added 1 commit

    added 1 commit

    • 171e56a2 - add modifiedBy property to the RecordChangedV2 model

    Compare with previous version

  • Yurii Kondakov
    Yurii Kondakov @ykondakov started a thread on commit 500f58b9
  • 77 .messageId(messageId)
    78 .build();
    63 PublisherInfo publisherInfo = getPublisherInfo();
    64 publisherInfo.setBatch(batch);
    65 publisherInfo.setMessageId(messageId);
    79 66 messagePublisher.publishMessage(headers, publisherInfo, collaborationContext);
    80 67 }
    81 68 }
    69
    70 private PublisherInfo getPublisherInfo() {
    71 return PublisherInfo.builder()
    72 .eventGridTopicName(eventGridConfig.getEventGridTopic())
    73 .eventGridEventSubject(eventGridConfig.getEventSubject())
    74 .eventGridEventType(eventGridConfig.getEventType())
    75 .eventGridEventDataVersion(eventGridConfig.getEventDataVersion())
    76 .serviceBusTopicName(serviceBusConfig.getServiceBusRecordsEventTopic())
  • Alok Joshi
  • Alok Joshi
  • Alok Joshi added 1 commit

    added 1 commit

    Compare with previous version

  • Alok Joshi added 1 commit

    added 1 commit

    • 49de6c62 - modify message properties in example

    Compare with previous version

  • Alok Joshi requested review from @kelham

    requested review from @kelham

  • changed milestone to %M16 - Release 0.19

  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Loading
  • Please register or sign in to reply
    Loading