Commit c1914ff5 authored by Hema Vishnu Pola [Microsoft]'s avatar Hema Vishnu Pola [Microsoft]
Browse files

Merge branch 'haaggarw/PerfTestingConfig' into 'master'

[Azure][Core] - Configuration & Logging Changes

See merge request !32
parents 60ce9f1f 44eb484e
Pipeline #26806 passed with stages
in 33 minutes and 8 seconds
......@@ -3,14 +3,19 @@
# Specify the Gitlab branch being used for image creation
# ie: community.opengroup.org:5555/osdu/platform/system/schema/{{ .Values.global.branch }}/schema:latest
#
global:
# Service(s) Replica Count
replicaCount: 1
azure:
servicebusSubscription: eg_sb_wkssubscription
servicebusTopic: recordstopiceg
storageContainer: osdu-wks-mappings
defaultTenant: #{DEFAULT_TENANT}#
image:
branch: #{ENVIRONMENT_NAME}#
tag: #{Build.SourceVersion}#
repository: #{container-registry}#.azurecr.io
default_tenant: #{DEFAULT_TENANT}#
......@@ -18,6 +18,7 @@ metadata:
name: {{ .Release.Name }}
namespace: osdu
spec:
replicas: {{ .Values.global.replicaCount }}
selector:
matchLabels:
app: {{ .Chart.Name }}
......@@ -38,6 +39,13 @@ spec:
- name: {{ .Chart.Name }}
image: {{ .Values.image.repository }}/{{ .Chart.Name }}-{{ .Values.image.branch }}:{{ .Values.image.tag | default .Chart.AppVersion }}
imagePullPolicy: Always
resources:
requests:
cpu: "100m"
memory: "2Gi"
limits:
cpu: "1000m"
memory: "2Gi"
volumeMounts:
- name: azure-keyvault
mountPath: "/mnt/azure-keyvault"
......@@ -99,10 +107,10 @@ spec:
- name: partition_service_endpoint
value: "http://partition/api/partition/v1"
- name: default_tenant
value: {{ .Values.default_tenant }}
value: {{ .Values.azure.defaultTenant }}
- name: max_concurrent_calls
value: "32"
- name: executor_n_threads
value: "32"
- name: max_lock_renew_duration_seconds
value: "600"
value: "2000"
\ No newline at end of file
# Copyright © Microsoft Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: keda.k8s.io/v1alpha1
kind: ScaledObject
metadata:
name: {{ .Chart.Name }}
namespace: osdu
labels:
deploymentName: {{ .Chart.Name }}
spec:
scaleTargetRef:
deploymentName: {{ .Chart.Name }}
triggers:
- type: azure-servicebus
metadata:
type: serviceBusTrigger
direction: in
name: message
subscriptionName: {{ .Values.azure.servicebusSubscription }}
topicName: {{ .Values.azure.servicebusTopic }}
connection: SERVICE_BUS
......@@ -12,12 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
global:
# Service(s) Replica Count
replicaCount: 1
azure:
servicebusSubscription: eg_sb_wkssubscription
servicebusTopic: recordstopiceg
storageContainer: osdu-wks-mappings
defaultTenant: opendes
image:
branch: master
tag: latest
repository: community.opengroup.org:5555/osdu/platform/data-flow/enrichment/wks
default_tenant: opendes
\ No newline at end of file
......@@ -55,8 +55,8 @@ stages:
parameters:
mavenGoal: 'package'
mavenPublishJUnitResults: true
serviceCoreMavenOptions: '-P wks-core --settings .mvn/community-maven-ado.settings.xml'
mavenOptions: '-P wks-azure --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
serviceCoreMavenOptions: '-pl wks-core --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
mavenOptions: '-pl provider/wks-azure --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
copyFileContents: |
pom.xml
provider/wks-azure/maven/settings.xml
......@@ -78,7 +78,7 @@ stages:
chartPath: ${{ variables.chartPath }}
valuesFile: ${{ variables.valuesFile }}
testCoreMavenPomFile: 'testing/wks-test-core/pom.xml'
testCoreMavenOptions: '--settings $(System.DefaultWorkingDirectory)/drop/.mvn/community-maven-ado.settings.xml -DskipTests -DskipITs'
testCoreMavenOptions: '--settings $(System.DefaultWorkingDirectory)/drop/.mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER) -DskipTests -DskipITs'
integrationTestMavenGoal: 'verify'
skipDeploy: ${{ variables.SKIP_DEPLOY }}
skipTest: ${{ variables.SKIP_TESTS }}
......
......@@ -55,8 +55,8 @@ stages:
parameters:
mavenGoal: 'package'
mavenPublishJUnitResults: true
serviceCoreMavenOptions: '-P wks-core --settings .mvn/community-maven-ado.settings.xml'
mavenOptions: '-P wks-azure --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
serviceCoreMavenOptions: '-pl wks-core --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
mavenOptions: '-pl provider/wks-azure --settings .mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER)'
copyFileContents: |
pom.xml
provider/wks-azure/maven/settings.xml
......@@ -78,7 +78,7 @@ stages:
chartPath: ${{ variables.chartPath }}
valuesFile: ${{ variables.valuesFile }}
testCoreMavenPomFile: 'testing/wks-test-core/pom.xml'
testCoreMavenOptions: '--settings $(System.DefaultWorkingDirectory)/drop/.mvn/community-maven-ado.settings.xml -DskipTests -DskipITs'
testCoreMavenOptions: '--settings $(System.DefaultWorkingDirectory)/drop/.mvn/community-maven-ado.settings.xml -Dmaven.repo.local=$(MAVEN_CACHE_FOLDER) -DskipTests -DskipITs'
integrationTestMavenGoal: 'verify'
skipDeploy: ${{ variables.SKIP_DEPLOY }}
skipTest: ${{ variables.SKIP_TESTS }}
......
File suppressed by a .gitattributes entry or the file's encoding is unsupported.
package org.opengroup.osdu.wks.provider.azure;
import org.opengroup.osdu.wks.config.ThreadScopeBeanFactoryPostProcessor;
import org.opengroup.osdu.wks.provider.interfaces.SubscriptionManager;
import org.opengroup.osdu.wks.service.SchemaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.web.client.RestTemplate;
@SpringBootApplication
@ComponentScan(value = {
"org.opengroup.osdu"
})
public class WksServiceApplication {
private final static Logger LOGGER = LoggerFactory.getLogger(WksServiceApplication.class);
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(org.opengroup.osdu.wks.WksServiceApplication.class, args);
SubscriptionManager subscriptionManager = context.getBean(SubscriptionManager.class);
subscriptionManager.subscribeRecordsChangeEvent();
}
@Bean("restTemplate")
public RestTemplate restTemplate() {
return new RestTemplate();
}
@Bean
public static BeanFactoryPostProcessor beanFactoryPostProcessor() {
return new ThreadScopeBeanFactoryPostProcessor();
}
}
......@@ -39,7 +39,7 @@ public class JwtTokenGenerator implements UserCredential {
}
catch (UnsupportedEncodingException e) {
LOGGER.error(e.getMessage(), e);
LOGGER.warn(e.getMessage(), e);
throw new ApplicationException(e.getMessage(), e.getCause());
}
......
......@@ -38,6 +38,8 @@ public class ProcessWKSTransform {
private ThreadDpsHeaders dpsHeaders;
public void initiateWksTransformation(IMessage message) {
boolean transformationSucceeded = false;
try {
String messageBody = new String(message.getMessageBody().getBinaryData().get(0), UTF_8);
......@@ -56,23 +58,28 @@ public class ProcessWKSTransform {
String dataPayload = messageData.getAsJsonObject().get(DATA).toString();
RawRecordDetails[] rawRecordDetails = getRecordsChangeData(dataPayload);
LOGGER.info("Transformation started for message with id: " + message.getMessageId());
LOGGER.info("Transformation started for message with id: {} containing {} Records", message.getMessageId(), rawRecordDetails.length);
wKSService.transform(rawRecordDetails, dataPartitionId, correlationId);
LOGGER.info("Transformation completed for message with id: " + message.getMessageId());
LOGGER.info("Transformation completed for message with id: {}", message.getMessageId());
transformationSucceeded = true;
} catch (BadRequestException e) {
LOGGER.error("Bad Request Reason: {}, pubsub message id: {}", e.getErrorMsg(),
LOGGER.warn("Bad Request Reason: {}, pubsub message id: {}", e.getErrorMsg(),
message.getMessageId(), e);
} catch (ApplicationException e) {
LOGGER.error("Application Error Reason: {}, pubsub message id: {}", e.getErrorMsg(),
LOGGER.warn("Application Error Reason: {}, pubsub message id: {}", e.getErrorMsg(),
message.getMessageId(), e);
} catch (NullPointerException e) {
LOGGER.error("Invalid format for message with id: {}", message.getMessageId(), e);
LOGGER.warn("Invalid format for message with id: {}", message.getMessageId(), e);
}
catch (Exception e) {
LOGGER.error("Exception encountered processing the message with id: {}", message.getMessageId(), e);
LOGGER.warn("Exception encountered processing the message with id: {}", message.getMessageId(), e);
}
if(!transformationSucceeded) {
LOGGER.info("Transformation failed for message with id: {}", message.getMessageId());
}
ThreadScopeContextHolder.getContext().clear();
......
......@@ -57,7 +57,11 @@ public class SubscriptionManagerImpl implements SubscriptionManager {
MessageHandler messageHandler = new MessageHandler(subscriptionClient, processWKSTransform);
subscriptionClient.registerMessageHandler(
messageHandler,
new MessageHandlerOptions(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxConcurrentCalls()), false, Duration.ofSeconds(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxLockRenewDurationInSeconds()))),
new MessageHandlerOptions(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxConcurrentCalls()),
false,
Duration.ofSeconds(Integer.parseUnsignedInt(azureBootstrapConfig.getMaxLockRenewDurationInSeconds())),
Duration.ofSeconds(1)
),
executorService);
} catch (InterruptedException | ServiceBusException e) {
......
......@@ -38,7 +38,7 @@ public class StatusStoreServiceImpl implements StatusStoreService {
cosmosStore.upsertItem(requestIdentity.getDataPartitionId(), cosmosContainerConfig.getDatabase(), cosmosContainerConfig.getRelationshipStatusContainer(), doc.getId(), doc);
}
catch (AppException exception) {
LOGGER.error("Status update failed for Record ID: {} and Mapping ID: {} with exception {}", relationshipStatus.getRawRecordId(), relationshipStatus.getMappingId(), exception.getError().getMessage(), exception);
LOGGER.warn("Status update failed for Record ID: {} and Mapping ID: {} with exception {}", relationshipStatus.getRawRecordId(), relationshipStatus.getMappingId(), exception.getError().getMessage(), exception);
}
}
}
......
......@@ -81,7 +81,7 @@ public class MappingStoreImpl implements MappingStore {
mappingsList.add(mapper.readValue(content, MappingsModel.class));
} catch (Exception e) {
LOGGER.error("Error while processing mappings from blob store {}", e.getMessage(), e);
LOGGER.warn("Error while processing mappings from blob store {}", e.getMessage(), e);
}
}
return mappingsList;
......
......@@ -53,13 +53,19 @@ spring.application.name=wks-azure
shared_tenant=${default_tenant}
# Storage service
STORAGE_API=${storage_service_endpoint}
osdu.host.storageServiceUrl=${storage_service_endpoint}
osdu.restclient.storageRetryIntervalInSeconds=1
osdu.restclient.storageRetryCounter=3
# Search service
SEARCH_API=${search_service_endpoint}
osdu.host.searchServiceUrl=${search_service_endpoint}
osdu.restclient.searchRetryIntervalInSeconds=1
osdu.restclient.searchRetryCounter=3
#Schema service
osdu.host.schemaServiceUrl=${schema_service_endpoint}
osdu.restclient.schemaRetryIntervalInSeconds=1
osdu.restclient.schemaRetryCounter=3
# Partition service
PARTITION_API=${partition_service_endpoint}
......
......@@ -80,7 +80,7 @@ public class ProcessWKSTransformTest {
processWKSTransform.initiateWksTransformation(message);
verify(message, times(1)).getMessageBody();
verify(message, times(1)).getMessageId();
verify(message, times(2)).getMessageId();
}
@Test
......@@ -92,7 +92,7 @@ public class ProcessWKSTransformTest {
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(1)).getMessageBody();
verify(message, times(1)).getMessageId();
verify(message, times(2)).getMessageId();
verify(message, times(1)).setMessageId(messageId);
}
......@@ -107,7 +107,7 @@ public class ProcessWKSTransformTest {
verify(mdcContextMap, times(1)).getContextMap(correlationId, dataPartitionId);
verify(message, times(1)).getMessageBody();
verify(message, times(2)).getMessageId();
verify(message, times(3)).getMessageId();
verify(message, times(1)).setMessageId(messageId);
}
......
......@@ -19,7 +19,7 @@ import org.springframework.web.client.RestTemplate;
})
public class WksServiceApplication {
private final static Logger LOGGER = LoggerFactory.getLogger(SchemaService.class);
private final static Logger LOGGER = LoggerFactory.getLogger(WksServiceApplication.class);
public static void main(String[] args) {
ApplicationContext context = SpringApplication.run(WksServiceApplication.class, args);
try {
......
......@@ -15,6 +15,7 @@ public class Constants {
public static final String NO_MAPPING_FOR_KIND = "Mapping not found for {}";
public static final String NO_MAPPING_FOR_ANY_RECORD = "Mapping not found for any record";
public static final String ERROR_WHILE_GETTING_RAW_RECORD = "Error retrieving raw record.";
public static final String TRANSFORMATION_STARTED = "Transformation started for raw record with id: {} with {} mapping files";
public static final String TRANFORMATION_SUCCESSFUL = "Transformed successfully raw record id: {}, wks record id: {} and wks schema kind: {}";
public static final String TRANFORMATION_ALREADY_DONE = "Transformation already done for {} out of {} records";
public static final String RAW_RECORD_NOT_PRESENT = "Raw record not present";
......@@ -24,6 +25,7 @@ public class Constants {
public static final String LAST_MODIFIED_DATE = "last_modified_date";
public static final String DASH = "-";
public static final String POINT = ".";
public static final String Slash = "/";
public static final String MISSING_RAW_RECORD_DETAILS = "Missing raw records details";
public static final String DATA_PARTITION_ID_NEEDED = "Data Partition Id is needed";
public static final String CORRELATION_ID_NEEDED = "Correlation Id is needed";
......
package org.opengroup.osdu.wks.constants;
public class SearchConstants {
public static final String POST_SEARCH_ENDPOINT = "/query";
}
package org.opengroup.osdu.wks.constants;
public class StorageConstants {
public static final String PUT_STORAGE_ENDPOINT = "/records";
public static final String GET_STORAGE_ENDPOINT = "/records";
}
package org.opengroup.osdu.wks.service;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties("osdu.restclient")
@Getter
@Setter
public class RestClientConfig {
@Value("#{new Integer('${searchRetryIntervalInSeconds:60}')}")
private Integer searchRetryIntervalInSeconds;
@Value("#{new Integer('${storageRetryIntervalInSeconds:60}')}")
private Integer storageRetryIntervalInSeconds;
@Value("#{new Integer('${schemaRetryIntervalInSeconds:60}')}")
private Integer schemaRetryIntervalInSeconds;
@Value("#{new Integer('${searchRetryCounter:3}')}")
private Integer searchRetryCounter;
@Value("#{new Integer('${storageRetryCounter:3}')}")
private Integer storageRetryCounter;
@Value("#{new Integer('${schemaRetryCounter:3}')}")
private Integer schemaRetryCounter;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment