Commit d131de43 authored by Riabokon Stanislav(EPAM)[GCP]'s avatar Riabokon Stanislav(EPAM)[GCP]
Browse files

Merge branch 'gcp-gsm' into 'master'

Implement Status Message event publishing for Workflow service (GONRG-2904)

See merge request !155
parents 1227093a 47d5a7e0
Pipeline #76400 failed with stages
in 4 minutes and 12 seconds
......@@ -805,8 +805,6 @@ The following software have components provided under the terms of this license:
- JUnit Platform Commons (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca)
- Logback Classic Module (from https://repo1.maven.org/maven2/ch/qos/logback/logback-classic)
- Logback Contrib :: JSON :: Classic (from https://repo1.maven.org/maven2/ch/qos/logback/contrib/logback-json-classic)
......@@ -819,6 +817,8 @@ The following software have components provided under the terms of this license:
- Microsoft Application Insights Log4j 2 Appender (from https://github.com/Microsoft/ApplicationInsights-Java)
- SnakeYAML (from http://www.snakeyaml.org)
- SnakeYAML (from http://www.snakeyaml.org)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
========================================================================
EPL-2.0
......@@ -837,9 +837,9 @@ The following software have components provided under the terms of this license:
- JUnit Platform Commons (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- Jakarta Annotations API (from https://projects.eclipse.org/projects/ee4j.ca)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
========================================================================
GPL-2.0-only
......@@ -1128,10 +1128,10 @@ The following software have components provided under the terms of this license:
- JUnit Platform Commons (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Platform Engine API (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- JUnit Vintage Engine (from https://junit.org/junit5/)
- Jakarta Activation API jar (from https://repo1.maven.org/maven2/jakarta/activation/jakarta.activation-api)
- Jakarta XML Binding API (from https://repo1.maven.org/maven2/jakarta/xml/bind/jakarta.xml.bind-api)
- Spongy Castle (from http://rtyley.github.io/spongycastle/)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
- org.junit.vintage:junit-vintage-engine (from http://junit.org/junit5/)
......@@ -34,6 +34,8 @@ In order to run the service locally, you will need to have the following environ
| `OSDU_AIRFLOW_USERNAME` | ex `******` | Username for access Apache Airflow | yes | - |
| `OSDU_AIRFLOW_PASSWORD` | ex `******` | Password for access Apache Airflow | yes | - |
| `PARTITION_API` | ex `http://localhost:8081/api/partition/v1` | Partition service endpoint | no | - |
| `STATUS_CHANGED_MESSAGING_ENABLED` | `true` OR `false` | Allows to configure message publishing about schemas changes to Pub/Sub | no | - |
| `STATUS_CHANGED_TOPIC_NAME` | ex `status-changed` | Allows to subscribe a specific Pub/Sub topic | no | - |
**Required to run integration tests**
......
/*
Copyright 2021 Google LLC
Copyright 2021 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.config;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@ConfigurationProperties(prefix = "gcp.status-changed")
@Getter
@Setter
@Configuration
public class EventMessagingPropertiesConfig {
private boolean messagingEnabled;
private String topicName;
}
/*
Copyright 2021 Google LLC
Copyright 2021 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.gsm;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PubsubMessage.Builder;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.exception.CoreException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.status.Message;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.status.IEventPublisher;
import org.opengroup.osdu.workflow.provider.gcp.config.EventMessagingPropertiesConfig;
import org.springframework.stereotype.Service;
import org.threeten.bp.Duration;
@Slf4j
@Service
@RequiredArgsConstructor
public class GcpEventPublisher implements IEventPublisher {
private final EventMessagingPropertiesConfig eventMessagingPropertiesConfig;
private Publisher publisher;
private final TenantInfo tenantInfo;
private static final RetrySettings RETRY_SETTINGS = RetrySettings.newBuilder()
.setTotalTimeout(Duration.ofSeconds(10))
.setInitialRetryDelay(Duration.ofMillis(5))
.setRetryDelayMultiplier(2)
.setMaxRetryDelay(Duration.ofSeconds(3))
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.setMaxRpcTimeout(Duration.ofSeconds(10))
.build();
@Override
public void publish(Message[] messages,
Map<String, String> attributesMap) throws CoreException {
if (eventMessagingPropertiesConfig.isMessagingEnabled()) {
validateInput(messages, attributesMap);
if (Objects.isNull(this.publisher)) {
try {
this.publisher = Publisher.newBuilder(
ProjectTopicName.newBuilder()
.setProject(this.tenantInfo.getProjectId())
.setTopic(this.eventMessagingPropertiesConfig.getTopicName()).build())
.setRetrySettings(RETRY_SETTINGS).build();
} catch (IOException e) {
log.error(e.getMessage(), e);
}
}
Map<String, Object> messageMap = createMessageMap(messages, attributesMap);
PubsubMessage pubSubMessage = createPubSubMessageList(messageMap);
publisher.publish(pubSubMessage);
}
}
private Map<String, Object> createMessageMap(Message[] messages,
Map<String, String> attributesMap) {
String dataPartitionId = attributesMap.get(DpsHeaders.DATA_PARTITION_ID);
String correlationId = attributesMap.get(DpsHeaders.CORRELATION_ID);
Map<String, Object> message = new HashMap<>();
message.put("data", messages);
message.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
message.put(DpsHeaders.CORRELATION_ID, correlationId);
message.put(DpsHeaders.ACCOUNT_ID, this.tenantInfo.getName());
return message;
}
private PubsubMessage createPubSubMessageList(Map<String, Object> message) {
Builder messageBuilder = PubsubMessage.newBuilder();
messageBuilder.putAttributes(DpsHeaders.ACCOUNT_ID,
String.valueOf(message.get(DpsHeaders.ACCOUNT_ID)));
messageBuilder.putAttributes(DpsHeaders.DATA_PARTITION_ID,
String.valueOf(message.get(DpsHeaders.DATA_PARTITION_ID)));
messageBuilder.putAttributes(DpsHeaders.CORRELATION_ID,
String.valueOf(message.get(DpsHeaders.CORRELATION_ID)));
Message[] messagesArray = (Message[]) message.get("data");
ByteString data = ByteString.copyFromUtf8(Arrays.toString(messagesArray));
messageBuilder.setData(data);
return messageBuilder.build();
}
private void validateInput(Message[] messages, Map<String, String> attributesMap) {
validateMsg(messages);
validateAttributesMap(attributesMap);
}
private void validateMsg(Message[] messages) {
if (Objects.isNull(messages) || messages.length == 0) {
log.warn("Nothing in message to publish");
}
}
private void validateAttributesMap(Map<String, String> attributesMap) {
if (Objects.isNull(attributesMap) || attributesMap.isEmpty()) {
throw new IllegalArgumentException(
"data-partition-id and correlation-id are required to publish status event");
} else if (attributesMap.get(DpsHeaders.DATA_PARTITION_ID) == null) {
throw new IllegalArgumentException("data-partition-id is required to publish status event");
} else if (attributesMap.get(DpsHeaders.CORRELATION_ID) == null) {
throw new IllegalArgumentException("correlation-id is required to publish status event");
}
}
}
......@@ -28,3 +28,10 @@ google.audiences=123.apps.googleusercontent.com
partition.api=http://localhost:8081/api/partition/v1
osdu.version.info.gitPropertiesPath=/git.properties
gcp.status-changed.messagingEnabled=${STATUS_CHANGED_MESSAGING_ENABLED}
gcp.status-changed.topicName=${STATUS_CHANGED_TOPIC_NAME}
STATUS_CHANGED_TOPIC_NAME=status-changed
STATUS_CHANGED_MESSAGING_ENABLED=false
/*
Copyright 2021 Google LLC
Copyright 2021 EPAM Systems, Inc
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package org.opengroup.osdu.workflow.provider.gcp.gsm;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.status.DatasetDetails;
import org.opengroup.osdu.core.common.model.status.Message;
import org.opengroup.osdu.core.common.model.status.StatusDetails;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.workflow.provider.gcp.config.EventMessagingPropertiesConfig;
@RunWith(MockitoJUnitRunner.class)
public class GcpEventPublisherTest {
private static final String TENANT_NAME = "tenantName";
private static final String DATA_PARTITION_VALUE = "partitionValue";
private static final String CORRELATION_VALUE = "correlationValue";
@Mock
private Publisher publisher;
@Mock
private TenantInfo tenantInfo;
@Mock
private EventMessagingPropertiesConfig eventMessagingPropertiesConfig;
@InjectMocks
private GcpEventPublisher gcpEventPublisher;
@Test
public void shouldNot_publishEventMessage_WhenFlagIsFalse() {
when(this.eventMessagingPropertiesConfig.isMessagingEnabled()).thenReturn(false);
Message[] messages = buildMessageArray();
Map<String, String> AttributesMap = buildAttributesMap();
this.gcpEventPublisher.publish(messages, AttributesMap);
verify(this.publisher, times(0)).publish(any());
}
@Test
public void should_publishEventMessage_WhenFlagIsTrue() {
when(this.eventMessagingPropertiesConfig.isMessagingEnabled()).thenReturn(true);
when(this.tenantInfo.getName()).thenReturn(TENANT_NAME);
Message[] messages = buildMessageArray();
Map<String, String> AttributesMap = buildAttributesMap();
this.gcpEventPublisher.publish(messages, AttributesMap);
verify(this.publisher, times(1)).publish(any(PubsubMessage.class));
}
private Message[] buildMessageArray() {
Message[] messageArray = new Message[2];
StatusDetails statusDetails = new StatusDetails();
statusDetails.setKind("testKind1");
messageArray[0] = statusDetails;
DatasetDetails datasetDetails = new DatasetDetails();
datasetDetails.setKind("testKin2");
messageArray[1] = datasetDetails;
return messageArray;
}
private Map<String, String> buildAttributesMap() {
Map<String, String> attributesMap = new HashMap<>();
attributesMap.put(DpsHeaders.DATA_PARTITION_ID, DATA_PARTITION_VALUE);
attributesMap.put(DpsHeaders.CORRELATION_ID, CORRELATION_VALUE);
return attributesMap;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment