Commit 2ea1c929 authored by Morris Estepa

Merge branch 'master' into master-dev-merge

parents c58b3375 e4fb596c
Pipeline #103896 failed with stages in 18 minutes and 40 seconds
@@ -60,7 +60,10 @@ include:
file: "cloud-providers/ibm.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "cloud-providers/aws.yml"
file: "cloud-providers/aws-global.yml"
- project: "osdu/platform/ci-cd-pipelines"
file: "cloud-providers/aws-maven.yml"
- project: "osdu/platform/ci-cd-pipelines"
ref: "master"
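Taken together, this hunk swaps the shared AWS pipeline include from `cloud-providers/aws.yml` to `cloud-providers/aws-global.yml` and adds `cloud-providers/aws-maven.yml`. Assuming the surrounding entries are unchanged, the AWS-related part of the `include:` list in `.gitlab-ci.yml` would read roughly like this after the merge:

```yaml
include:
  - project: "osdu/platform/ci-cd-pipelines"
    file: "cloud-providers/ibm.yml"
  - project: "osdu/platform/ci-cd-pipelines"
    file: "cloud-providers/aws-global.yml"
  - project: "osdu/platform/ci-cd-pipelines"
    file: "cloud-providers/aws-maven.yml"
```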
@@ -37,5 +37,5 @@ spec:
"/api/workflow/webjars/*",
"/api/workflow/swagger-ui/*",
"*/v3/api-docs",
"/api/workflow/info"
"/api/workflow/v1/info"
]
apiVersion: v2
name: gcp-ingestion-workflow-configmap
name: gcp-workflow-configmap
description: A Helm chart for Kubernetes
# A chart can be either an 'application' or a 'library' chart.
{{- if .Values.conf.on_prem_enabled }}
apiVersion: v1
kind: Secret
metadata:
labels:
app: "{{ .Values.conf.app_name }}"
annotations:
rollme: {{ randAlphaNum 5 | quote }}
name: "{{ .Values.conf.secret_name }}"
namespace: "{{ .Release.Namespace }}"
type: Opaque
data:
OSDU_AIRFLOW_USERNAME: "{{ .Values.data.osdu_airflow_username }}"
OSDU_AIRFLOW_PASSWORD: "{{ .Values.data.osdu_airflow_password }}"
{{- end }}
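This template only renders when `conf.on_prem_enabled` is true, and it writes the Airflow credentials under the Secret's `data:` section, so the supplied values are expected to be base64-encoded already. A minimal, illustrative values fragment (placeholder credentials) might look like:

```yaml
# Illustrative values.yaml fragment for an on-prem install.
# The template renders these under the Secret's data: section as-is,
# so both credentials must already be base64-encoded.
conf:
  on_prem_enabled: true
  app_name: "workflow"
  secret_name: "workflow-secret"
data:
  osdu_airflow_username: "YWlyZmxvdy11c2Vy"   # base64("airflow-user")
  osdu_airflow_password: "Y2hhbmdlLW1l"       # base64("change-me")
```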
@@ -9,8 +9,8 @@ data:
LOG_LEVEL: "{{ .Values.data.log_level }}"
PARTITION_API: "{{ .Values.data.partition_api }}"
OSDU_ENTITLEMENTS_URL: "{{ .Values.data.osdu_entitlements_url }}"
OSDU_AIRFLOW_URL: "{{ .Values.data.gcp_airflow_url }}"
SPRING_PROFILES_ACTIVE: "{{ .Values.data.spring_profiles_active }}"
OSDU_AIRFLOW_URL: "{{ .Values.data.gcp_airflow_url }}"
{{- if not .Values.conf.on_prem_enabled }}
GOOGLE_AUDIENCES: "{{ .Values.data.google_audiences }}"
SHARED_TENANT_NAME: "{{ .Values.data.shared_tenant_name }}"
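Because the GCP-only block is guarded by `if not .Values.conf.on_prem_enabled`, an on-prem render would omit `GOOGLE_AUDIENCES` and `SHARED_TENANT_NAME` and keep only the common keys. A rough sketch of the resulting data section, with illustrative values:

```yaml
# Rough on-prem render (illustrative values); the GCP-only keys are skipped.
data:
  LOG_LEVEL: "INFO"
  PARTITION_API: "http://partition/api/partition/v1/"
  OSDU_ENTITLEMENTS_URL: "http://entitlements/api/entitlements/v2/"
  SPRING_PROFILES_ACTIVE: "anthos"            # example profile value
  OSDU_AIRFLOW_URL: "http://airflow:8080"     # example URL; empty by default
```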
@@ -3,15 +3,11 @@
# Declare variables to be passed into your templates.
data:
# common
log_level: "INFO"
partition_api: "http://partition/api/partition/v1/"
osdu_entitlements_url: "http://entitlements/api/entitlements/v2/"
gcp_airflow_url: ""
spring_profiles_active: "gcp"
# onprem
osdu_airflow_username: ""
osdu_airflow_password: ""
# gcp
google_audiences: ""
shared_tenant_name: ""
@@ -19,5 +15,4 @@ data:
conf:
configmap: "workflow-config"
app_name: "workflow"
secret_name: "workflow-secret"
on_prem_enabled: false
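In practice an on-prem deployment would override these defaults from a separate values file rather than editing the chart; a minimal sketch (file name and values are illustrative), applied with something like `helm upgrade --install workflow . -f onprem-values.yaml`:

```yaml
# onprem-values.yaml (illustrative): enable the on-prem path and supply Airflow settings.
conf:
  on_prem_enabled: true
data:
  gcp_airflow_url: "http://airflow:8080"      # example Airflow URL
  osdu_airflow_username: "YWlyZmxvdy11c2Vy"   # base64-encoded, consumed by the on-prem Secret template
  osdu_airflow_password: "Y2hhbmdlLW1l"       # base64-encoded placeholder
```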
@@ -27,6 +27,13 @@ spec:
{{- if .Values.conf.on_prem_enabled }}
- secretRef:
name: "{{ .Values.conf.secret_name }}"
# FIXME
env:
- name: OSDU_AIRFLOW_PASSWORD
valueFrom:
secretKeyRef:
name: "airflow"
key: airflow-password
{{- end }}
securityContext:
allowPrivilegeEscalation: false
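The `# FIXME` marks a hard-coded reference to a Secret named `airflow` for `OSDU_AIRFLOW_PASSWORD`, sitting next to the chart's own `secretRef`. One plausible cleanup would be to make that reference values-driven; a hypothetical sketch, where `conf.airflow_secret_name` and `conf.airflow_password_key` are assumed keys that do not exist in the chart today:

```yaml
# Hypothetical follow-up for the FIXME: parameterize the secret reference
# instead of hard-coding the "airflow" Secret and "airflow-password" key.
env:
  - name: OSDU_AIRFLOW_PASSWORD
    valueFrom:
      secretKeyRef:
        name: "{{ .Values.conf.airflow_secret_name }}"   # assumed values key
        key: "{{ .Values.conf.airflow_password_key }}"   # assumed values key
```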
@@ -14,5 +14,5 @@ data:
conf:
configmap: "workflow-config"
app_name: "workflow"
secret_name: "workflow-secret"
secret_name: "airflow-workflow-secret"
on_prem_enabled: false
@@ -493,12 +493,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/error'
/info:
/v1/info:
get:
tags:
- info
summary: "Version info"
description: "For deployment available public `/info` endpoint, \
description: "For deployment available public `/v1/info` endpoint, \
\ which provides build and git related information."
operationId: "Version info"
responses:
@@ -27,7 +27,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-workflow</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<packaging>pom</packaging>
<name>os-workflow</name>
@@ -19,12 +19,12 @@
<parent>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-workflow</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>workflow-aws</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>workflow-aws</name>
@@ -72,7 +72,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>workflow-core</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
</dependency>
<dependency>
@@ -20,7 +20,7 @@
<parent>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-workflow</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
@@ -111,7 +111,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>workflow-core</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<exclusions>
<exclusion>
<groupId>org.springframework.security</groupId>
package org.opengroup.osdu.workflow.provider.azure.config;
import org.opengroup.osdu.azure.publisherFacade.models.PubSubAttributesBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MessagePublisherConfig {
@Bean
public PubSubAttributesBuilder getPubSub() {
return PubSubAttributesBuilder.builder().build();
}
}
package org.opengroup.osdu.workflow.provider.azure.gsm;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.joda.time.DateTime;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.azure.publisherFacade.MessagePublisher;
import org.opengroup.osdu.azure.publisherFacade.PublisherInfo;
import org.opengroup.osdu.core.common.exception.CoreException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.status.Message;
@@ -13,73 +12,52 @@ import org.opengroup.osdu.core.common.status.IEventPublisher;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@Slf4j
@Service
@RequiredArgsConstructor
public class AzureEventGridPublisher implements IEventPublisher {
public class AzureMessagePublisher implements IEventPublisher {
private static final String STATUS_CHANGED = "status-changed";
private static final String EVENT_DATA_VERSION = "1.0";
@Setter
@Value("${azure.eventGrid.topicName}")
private String topicName;
private String eventGridTopic;
@Setter
@Value("${azure.eventGrid.enabled}")
private Boolean isEventGridEnabled;
@Value("${azure.serviceBus.topicName}")
private String serviceBusTopic;
private final EventGridTopicStore eventGridTopicStore;
private final MessagePublisher messagePublisher;
@Override
public void publish(Message[] messages, Map<String, String> attributesMap) throws CoreException {
validateEventGrid();
validateInput(messages, attributesMap);
Map<String, Object> message = createMessageMap(messages, attributesMap);
List<EventGridEvent> eventsList = createEventGridEventList(message);
String dataPartitionId = attributesMap.get(DpsHeaders.DATA_PARTITION_ID);
eventGridTopicStore.publishToEventGridTopic(dataPartitionId, topicName, eventsList);
String correlationId = attributesMap.get(DpsHeaders.CORRELATION_ID);
String logMsg = String.format(
"Event published successfully to topic='%s' with dataPartitionId='%s' and correlationId='%s'.",
topicName, dataPartitionId, correlationId
);
log.info(logMsg);
}
private List<EventGridEvent> createEventGridEventList(Map<String, Object> message) {
String messageId = UUID.randomUUID().toString();
EventGridEvent eventGridEvent = new EventGridEvent(messageId, STATUS_CHANGED, message, STATUS_CHANGED,
DateTime.now(), EVENT_DATA_VERSION);
return Collections.singletonList(eventGridEvent);
}
DpsHeaders dpsHeaders = new DpsHeaders();
dpsHeaders.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
dpsHeaders.put(DpsHeaders.CORRELATION_ID, correlationId);
private Map<String, Object> createMessageMap(Message[] messages, Map<String, String> attributesMap) {
String dataPartitionId = attributesMap.get(DpsHeaders.DATA_PARTITION_ID);
String correlationId = attributesMap.get(DpsHeaders.CORRELATION_ID);
Map<String, Object> message = new HashMap<>();
message.put("data", messages);
message.put(DpsHeaders.DATA_PARTITION_ID, dataPartitionId);
message.put(DpsHeaders.CORRELATION_ID, correlationId);
PublisherInfo publisherInfo = PublisherInfo.builder()
.batch(messages)
.eventGridTopicName(eventGridTopic)
.eventGridEventSubject(STATUS_CHANGED)
.eventGridEventType(STATUS_CHANGED)
.eventGridEventDataVersion(EVENT_DATA_VERSION)
.serviceBusTopicName(serviceBusTopic).build();
return message;
}
messagePublisher.publishMessage(dpsHeaders, publisherInfo);
private void validateEventGrid() throws CoreException {
if (!isEventGridEnabled) {
throw new CoreException("Event grid is not enabled");
}
String logMsg = String.format(
"Event published successfully to eventGridTopic='%s', ServiceBusTopic='%s' with dataPartitionId='%s' and correlationId='%s'.",
eventGridTopic, serviceBusTopic, dataPartitionId, correlationId
);
log.info(logMsg);
}
private void validateInput(Message[] messages, Map<String, String> attributesMap) throws CoreException {
@@ -12,17 +12,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
LOG_PREFIX=workflow
# Server Path Configuration
server.servlet.contextPath=/api/workflow/
# Istio Auth Config Toggle
azure.istio.auth.enabled=${azure_istioauth_enabled}
# Partition service
PARTITION_API=${partition_service_endpoint}
azure.activedirectory.app-resource-id=${aad_client_id}
# Azure AD configuration for OpenIDConnect
azure.activedirectory.session-stateless=true
azure.activedirectory.client-id=${aad_client_id}
azure.activedirectory.AppIdUri=api://${aad_client_id}
# Azure CosmosDB configuration
osdu.azure.cosmosdb.database=${cosmosdb_database}
osdu.azure.cosmosdb.ingestionStrategyCollection=IngestionStrategy
@@ -37,15 +42,19 @@ osdu.azure.system.cosmosPrimaryKeyName=system-cosmos-primary-key
osdu.azure.system.cosmosConnectionStringKeyName=system-cosmos-connection
osdu.azure.system.storageAccountNameKeyName=system-storage
osdu.azure.system.storageKeyKeyName=system-storage-key
# Azure fileshare configuration
osdu.azure.fileshare.shareName=airflowdags
osdu.azure.fileshare.shareNameV2=airflow2dags
osdu.azure.fileshare.dagsFolder=dags
osdu.azure.fileshare.customOperatorsFolder=plugins/operators
# Azure KeyVault configuration
azure.keyvault.url=${KEYVAULT_URI}
# Azure App Insights configuration
azure.application-insights.instrumentation-key=${appinsights_key}
# Airflow configuration
osdu.airflow.version2.enabled=${OSDU_AIRFLOW_VERSION2_ENABLED:false}
osdu.azure.airflow.url=${airflow_url}
@@ -54,22 +63,23 @@ osdu.azure.airflow.password=${airflow_password}
osdu.azure.airflow.dagRunAbstractionEnabled=false
osdu.azure.airflow.controllerDagId=_controller_dag
osdu.azure.airflow.isDPAirflowUsedForSystemDAG=${dp_airflow_for_system_dag}
# Logging
logging.transaction.enabled=true
logging.slf4jlogger.enabled=true
logging.mdccontext.enabled=true
# Entitlements config
osdu.entitlements.url=${entitlements_service_endpoint}
osdu.entitlements.appKey=${entitlements_service_api_key}
osdu.azure.partitionId=opendes
# Use this property to name your shared tenant
shared.tenant.name=system
# Dagcontent config
osdu.azure.airflow.ignoreDagContent=${ignore_dagContent:true}
osdu.azure.airflow.ignoreCustomOperatorContent=${ignore_customOperatorContent:true}
# Azure Event Grid Configuration
azure.eventGrid.enabled=${event_grid_enabled:true}
azure.eventGrid.topicName=${event_grid_topic:statuschangedtopic}
# Configuration for health checks
management.server.port=8081
management.health.azure-key-vault.enabled=false
@@ -92,3 +102,14 @@ osdu.azure.active-dag-runs.threshold=50000
spring.datasource.url=jdbc:postgresql://${postgres_name}.postgres.database.azure.com:${postgres_port}/${postgres_db}
spring.datasource.username=${postgres_username}@${postgres_name}
spring.datasource.password=${postgres_password}
# Azure Event Grid Configuration
azure.eventGrid.enabled=${event_grid_enabled_status:false}
azure.eventGrid.topicName=${event_grid_topic_status:statuschangedtopic}
# Azure Service Bus Configuration
azure.serviceBus.enabled=${service_bus_enabled_status:true}
azure.serviceBus.topicName=${service_bus_topic_status:statuschangedtopic}
azure.pubsub.publish=${azure_pubsub_publish:true}
azure.publisher.batchSize=50
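The service itself is configured through `application.properties`, but for readability the status-publishing toggles appended here can be read as the following YAML-shaped sketch (defaults mirror the `${...:default}` fallbacks above; this is a restatement, not an alternative config file):

```yaml
# Readability sketch of the appended status-publishing defaults (not an actual config file).
azure:
  eventGrid:
    enabled: false                  # event_grid_enabled_status defaults to false
    topicName: statuschangedtopic   # event_grid_topic_status default
  serviceBus:
    enabled: true                   # service_bus_enabled_status defaults to true
    topicName: statuschangedtopic   # service_bus_topic_status default
  pubsub:
    publish: true                   # azure_pubsub_publish defaults to true
  publisher:
    batchSize: 50
```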
@@ -6,7 +6,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.eventgrid.EventGridTopicStore;
import org.opengroup.osdu.azure.publisherFacade.MessagePublisher;
import org.opengroup.osdu.core.common.exception.CoreException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.status.Message;
@@ -16,36 +16,34 @@ import java.util.HashMap;
import java.util.Map;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.*;
/**
* Tests for {@link AzureEventGridPublisher}
* Tests for {@link AzureMessagePublisher}
*/
@ExtendWith(MockitoExtension.class)
class AzureEventGridPublisherTest {
class AzureMessagePublisherTest {
private final static String TOPIC_NAME = "status_topic";
private final static String CORRELATION_ID = "CORRELATION_ID";
private final static String DATA_PARTITION_ID = "DATA_PARTITION_ID";
@Mock
private EventGridTopicStore eventGridTopicStore;
private MessagePublisher messagePublisher;
private AzureEventGridPublisher publisher;
private AzureMessagePublisher publisher;
@BeforeEach
void setUp() {
publisher = new AzureEventGridPublisher(eventGridTopicStore);
publisher.setTopicName(TOPIC_NAME);
publisher.setIsEventGridEnabled(true);
publisher = new AzureMessagePublisher(messagePublisher);
publisher.setEventGridTopic(TOPIC_NAME);
publisher.setServiceBusTopic(TOPIC_NAME);
}
@Test
void shouldPublishGSMMessage() {
//given
doNothing().when(eventGridTopicStore).publishToEventGridTopic(any(), any(), any());
doNothing().when(messagePublisher).publishMessage(any(), any());
Message[] messages = new Message[]{new StatusDetails()};
Map<String, String> attributesMap = new HashMap<String, String>() {{
put(DpsHeaders.CORRELATION_ID, CORRELATION_ID);
@@ -56,28 +54,8 @@ class AzureEventGridPublisherTest {
publisher.publish(messages, attributesMap);
//then
verify(eventGridTopicStore, times(1))
.publishToEventGridTopic(any(), any(), any());
}
@Test
void shouldTrowCoreExceptionOnDisabledPublishing() {
//given
publisher.setIsEventGridEnabled(false);
Message[] messages = new Message[]{new StatusDetails()};
Map<String, String> attributesMap = new HashMap<String, String>() {{
put(DpsHeaders.CORRELATION_ID, CORRELATION_ID);
put(DpsHeaders.DATA_PARTITION_ID, DATA_PARTITION_ID);
}};
//when & then
Assertions.assertThrows(CoreException.class,
() -> publisher.publish(messages, attributesMap)
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(1))
.publishMessage(any(), any());
}
@Test
@@ -94,8 +72,8 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
@Test
@@ -113,8 +91,8 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
@Test
@@ -128,8 +106,8 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
@Test
@@ -144,8 +122,8 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
@Test
@@ -162,8 +140,8 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
@Test
@@ -180,7 +158,7 @@
);
//then
verify(eventGridTopicStore, times(0))
.publishToEventGridTopic(any(), any(), any());
verify(messagePublisher, times(0))
.publishMessage(any(), any());
}
}
@@ -70,3 +70,6 @@ spring.main.allow-bean-definition-overriding=true
# Dagcontent config
osdu.azure.airflow.ignoreDagContent=${ignore_dagContent}
# Enable message publisher
azure.pubsub.publish=true
@@ -15,6 +15,7 @@ Must have:
| `<POSTGRES_PASSWORD_ENV_VARIABLE_NAME>` | ex `POSTGRES_PASS_OSDU` | Postgres password env name; the variable name is not defined at the service level and is obtained through the Partition service. Each tenant can have its own env name value, and it must be present in the ENV of the Workflow service | yes | - |
| `<AMQP_PASSWORD_ENV_VARIABLE_NAME>` | ex `AMQP_PASS_OSDU` | AMQP password env name; the variable name is not defined at the service level and is obtained through the Partition service. Each tenant can have its own env name value, and it must be present in the ENV of the Workflow service | yes | - |
| `<AMQP_ADMIN_PASSWORD_ENV_VARIABLE_NAME>` | ex `AMQP_ADMIN_PASS_OSDU` | AMQP admin password env name; the variable name is not defined at the service level and is obtained through the Partition service. Each tenant can have its own env name value, and it must be present in the ENV of the Workflow service | yes | - |
| `SHARED_TENANT_NAME` | ex `osdu` | Shared account id | no | - |
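Since the actual variable names are delivered per tenant through the Partition service, the Workflow service container simply needs the referenced variables present in its environment. A hypothetical Kubernetes-style fragment, using the example names from the table (secret names and keys are placeholders):

```yaml
# Hypothetical deployment fragment; env names must match what the Partition service
# returns for the tenant, and the referenced Secrets are placeholders.
env:
  - name: POSTGRES_PASS_OSDU
    valueFrom:
      secretKeyRef:
        name: workflow-postgres-secret
        key: password
  - name: AMQP_PASS_OSDU
    valueFrom:
      secretKeyRef:
        name: workflow-rabbitmq-secret
        key: password
  - name: AMQP_ADMIN_PASS_OSDU
    valueFrom:
      secretKeyRef:
        name: workflow-rabbitmq-secret
        key: admin-password
```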
Defined in the default application property file, but possible to override:
@@ -101,8 +102,8 @@ curl -L -X PATCH 'http://partition.com/api/partition/v1/partitions/opendes' -H '
### Persistence layer
### Database structure for OSMDRIVER=postgres
```
DROP TABLE IF EXISTS <partitionId>.workflow_osm;
CREATE TABLE IF NOT EXISTS opendes.workflow
DROP TABLE IF EXISTS anthos.workflow_osm;
CREATE TABLE IF NOT EXISTS anthos.workflow_osm
(
id text COLLATE pg_catalog."default" NOT NULL,
pk bigint NOT NULL GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
@@ -110,12 +111,12 @@ CREATE TABLE IF NOT EXISTS opendes.workflow
CONSTRAINT workflow_id UNIQUE (id)
)
TABLESPACE pg_default;
ALTER TABLE opendes.workflow
ALTER TABLE anthos.workflow_osm
OWNER to postgres;
DROP TABLE IF EXISTS <partitionId>.workflow_run_osm;
CREATE TABLE IF NOT EXISTS <partitionId>.workflow_run
DROP TABLE IF EXISTS anthos.workflow_run_osm;
CREATE TABLE IF NOT EXISTS anthos.workflow_run_osm
(
id text COLLATE pg_catalog."default" NOT NULL,