Skip to content
Snippets Groups Projects
Commit 1db93652 authored by MIchael Nguyen's avatar MIchael Nguyen
Browse files

indexer impl.

parent ecda41ee
No related branches found
No related tags found
1 merge request!6Trusted ibm
Showing
with 155 additions and 121 deletions
......@@ -23,10 +23,15 @@
<artifactId>aws-java-sdk-core</artifactId>
<version>1.11.651</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-request-signing-apache-interceptor</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-osdu-util</artifactId>
<version>0.0.2</version>
<version>0.0.5</version>
</dependency>
<dependency>
<groupId>com.github.derjust</groupId>
......@@ -113,7 +118,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>indexer-search-core-lib</artifactId>
<version>1.0.9</version>
<version>1.0.8</version>
<scope>compile</scope>
</dependency>
<dependency>
......
package org.opengroup.osdu.indexer.aws.cache;
import org.opengroup.osdu.is.core.provider.interfaces.cache.IIndexCache;
import org.springframework.stereotype.Component;
@Component
public class IndexCache implements IIndexCache<String, Boolean> {
@Override
public void put(String s, Boolean o) {
}
@Override
public Boolean get(String s) {
return null;
}
@Override
public void delete(String s) {
}
@Override
public void clearAll() {
}
}
package org.opengroup.osdu.indexer.aws.di;
import org.opengroup.osdu.core.logging.DpsLog;
import org.opengroup.osdu.core.logging.Log;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.stereotype.Component;
@Component
public class DpsLogFactory extends AbstractFactoryBean<DpsLog> {
@Override
protected DpsLog createInstance() throws Exception {
return new Log();
}
@Override
public Class<?> getObjectType() {
return DpsLog.class;
}
}
\ No newline at end of file
package org.opengroup.osdu.indexer.aws.publish;
import com.amazonaws.osdu.util.sns.PublishRequestBuilder;
import com.amazonaws.osdu.util.sns.SNSBuilder;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishRequest;
import com.google.gson.Gson;
import com.amazonaws.services.sns.AmazonSNS;
import org.opengroup.osdu.core.api.DpsHeaders;
import org.opengroup.osdu.indexer.model.RecordStatus;
import org.opengroup.osdu.indexer.provider.interfaces.IPublisher;
import org.opengroup.osdu.indexer.util.JobStatus;
import org.springframework.beans.factory.annotation.Value;
......@@ -35,6 +37,7 @@ public class PublisherImpl implements IPublisher {
{
String json = new Gson().toJson(indexerBatchStatus.getStatusesList());
// attributes
Map<String, MessageAttributeValue> messageAttributes = new HashMap<>();
messageAttributes.put(DpsHeaders.ACCOUNT_ID, new MessageAttributeValue()
......@@ -48,8 +51,7 @@ public class PublisherImpl implements IPublisher {
.withDataType("String")
.withStringValue(headers.getCorrelationId()));
PublishRequest publishRequest = new PublishRequest(amazonSNSTopic, json)
.withMessageAttributes(messageAttributes);
PublishRequest publishRequest = new PublishRequestBuilder().generatePublishRequest("data", indexerBatchStatus.getStatusesList(), messageAttributes, amazonSNSTopic);
snsClient.publish(publishRequest);
}
......
package org.opengroup.osdu.indexer.aws.service;
import com.amazonaws.osdu.util.dynamodb.DynamoDBQueryHelper;
import com.google.gson.Gson;
import org.checkerframework.checker.units.qual.A;
import org.opengroup.osdu.indexer.aws.model.RecordMetadataDoc;
import org.opengroup.osdu.indexer.aws.model.SchemaDoc;
import org.opengroup.osdu.indexer.aws.util.dynamodb.record.DynamoDBRecord;
import org.opengroup.osdu.indexer.aws.util.dynamodb.schema.DynamoDBSchema;
import org.opengroup.osdu.indexer.aws.util.s3.S3RecordClient;
import org.opengroup.osdu.indexer.model.*;
import org.opengroup.osdu.indexer.service.StorageService;
import org.opengroup.osdu.is.core.util.AppException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
import java.io.Console;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
@Primary
......@@ -31,17 +23,11 @@ import java.util.stream.Collectors;
public class AwsStorageServiceImpl implements StorageService {
@Inject
DynamoDBSchema dynamoDBSchema;
@Inject
DynamoDBRecord dynamoDBRecord;
private DynamoDBQueryHelper queryHelper;
@Inject
S3RecordClient s3Client;
// private String getKeyNameForLatestVersion( recordMetadata){
// return recordMetadata.getKind() + "/" + recordMetadata.getId() + "/" + recordMetadata.getLatestVersion();
// }
@Override
public org.opengroup.osdu.indexer.model.Records getStorageRecords(List<String> ids) throws AppException, URISyntaxException {
Records records = new Records();
......@@ -50,14 +36,12 @@ public class AwsStorageServiceImpl implements StorageService {
List<ConversionStatus> statuses = new ArrayList<>();
Gson gson = new Gson();
for (String id: ids) {
Optional<RecordMetadataDoc> doc = dynamoDBRecord.findById(id);
if (!doc.isPresent()){
RecordMetadataDoc doc = queryHelper.loadByPrimaryKey(RecordMetadataDoc.class, id);
if (doc == null) {
notFound.add(id);
}
String record = s3Client.getRecord(doc.get().getMetadata());
Records.Entity entity = gson.fromJson(record, Records.Entity.class);
statuses.add(ConversionStatus.builder().status(doc.get().getStatus()).build());
Records.Entity entity = createRecord(doc, gson);
statuses.add(ConversionStatus.builder().status(doc.getStatus()).build());
entities.add(entity);
}
records.setNotFound(notFound);
......@@ -66,6 +50,20 @@ public class AwsStorageServiceImpl implements StorageService {
return records;
}
private Records.Entity createRecord(RecordMetadataDoc recordMetadataDoc, Gson gson) {
String record = s3Client.getRecord(recordMetadataDoc.getMetadata());
Records.Entity entity = gson.fromJson(record, Records.Entity.class);
entity.setId(recordMetadataDoc.getId());
entity.setAcl(recordMetadataDoc.getMetadata().getAcl());
entity.setAncestry(recordMetadataDoc.getMetadata().getAncestry());
entity.setKind(recordMetadataDoc.getKind());
Legal legal = new Legal();
legal.setLegaltags(recordMetadataDoc.getLegaltags().toArray(new String[recordMetadataDoc.getLegaltags().size()]));
entity.setLegal(legal);
entity.setVersion(recordMetadataDoc.getMetadata().getLatestVersion());
return entity;
}
@Override
public RecordQueryResponse getRecordsByKind(RecordReindexRequest request) throws URISyntaxException {
......@@ -74,15 +72,13 @@ public class AwsStorageServiceImpl implements StorageService {
@Override
public String getStorageSchema(String kind) throws URISyntaxException, UnsupportedEncodingException {
Optional<SchemaDoc> sd = dynamoDBSchema.findById(kind);
if (!sd.isPresent()) {
return null;
}
SchemaDoc sd = queryHelper.loadByPrimaryKey(SchemaDoc.class, kind);
if (sd == null) return null;
// Create a Schema object and assign the values retrieved from DynamoDB
Schema newSchema = new Schema();
newSchema.setKind(kind);
List<Schema.Mapping> mappings;
mappings = sd.get().getSchemaItems().stream().map(schemaItem -> Schema.Mapping.builder().kind(schemaItem.getKind()).path(schemaItem.getPath()).build()).collect(Collectors.toList());
mappings = sd.getSchemaItems().stream().map(schemaItem -> Schema.Mapping.builder().kind(schemaItem.getKind()).path(schemaItem.getPath()).build()).collect(Collectors.toList());
newSchema.setSchema(mappings);
return new Gson().toJson(newSchema);
}
......
package org.opengroup.osdu.indexer.aws.service;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.http.AWSRequestSigningApacheInterceptor;
import com.amazonaws.osdu.util.iam.IAMConfig;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequestInterceptor;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.opengroup.osdu.is.core.model.ClusterSettings;
import org.opengroup.osdu.is.core.util.ElasticClientHandler;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import javax.inject.Inject;
@Primary
@Component
public class ElasticClientHandlerAws extends ElasticClientHandler {
@Value("${aws.es.serviceName}")
private String serviceName;
@Value("${aws.region}")
private String region;
@Inject
private AwsElasticSettingServiceImpl elasticSettingService;
public ElasticClientHandlerAws() {
}
@Override
public RestHighLevelClient createRestClient() {
ClusterSettings clusterSettings = elasticSettingService.getElasticClusterInformation();
return esclient(clusterSettings.getHost());
}
private RestHighLevelClient esclient(String host) {
AWSCredentialsProvider credentials = new IAMConfig().amazonAWSCredentials();
AWS4Signer signer = new AWS4Signer();
signer.setServiceName(serviceName);
signer.setRegionName(region);
HttpRequestInterceptor interceptor = new AWSRequestSigningApacheInterceptor(serviceName, signer, credentials);
return new RestHighLevelClient(RestClient.builder(HttpHost.create(host)).setHttpClientConfigCallback(configCallBack -> configCallBack.addInterceptorLast(interceptor)));
}
}
package org.opengroup.osdu.indexer.aws.util;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.stereotype.Component;
@Component
@ComponentScan({"com.amazonaws.osdu.util"})
public class ComponentConfiguration {
}
......@@ -21,6 +21,7 @@ import org.opengroup.osdu.is.core.model.SlbHeaders;
import org.opengroup.osdu.is.core.provider.interfaces.util.IHeadersInfo;
import org.opengroup.osdu.is.core.util.AppException;
import org.opengroup.osdu.is.core.util.Preconditions;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
......@@ -29,6 +30,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import javax.inject.Inject;
@Primary
@Log
@Component
@RequestScope
......
package org.opengroup.osdu.indexer.aws.util.dynamodb;
import com.amazonaws.osdu.util.dynamodb.DynamoDBBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.datamodeling.DynamoDBMapper;
import org.socialsignin.spring.data.dynamodb.repository.config.EnableDynamoDBRepositories;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
@EnableDynamoDBRepositories
(basePackages = "org.opengroup.osdu.indexer.aws")
public class DynamoDBConfig {
@Value("${aws.dynamodb.endpoint}")
private String amazonDynamoDBEndpoint;
@Value("${aws.dynamodb.region}")
private String amazonDynamoDBRegion;
@Value("${aws.dynamodb.table.prefix}")
private String tablePrefix;
@Bean
public AmazonDynamoDB amazonDynamoDB() {
// Generate the DynamoDB client
return DynamoDBBuilder.generateDynamoDBClient(amazonDynamoDBEndpoint, amazonDynamoDBRegion);
}
@Bean
@Primary
public DynamoDBMapper DynamoDBMapper() {
return DynamoDBBuilder.generateDynamoDBMapper(amazonDynamoDB(), tablePrefix);
}
}
package org.opengroup.osdu.indexer.aws.util.dynamodb.record;
import com.amazonaws.osdu.util.dynamodb.DynamoDBRepository;
import org.opengroup.osdu.indexer.aws.model.RecordMetadataDoc;
import org.socialsignin.spring.data.dynamodb.repository.EnableScan;
import org.socialsignin.spring.data.dynamodb.repository.EnableScanCount;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import java.util.List;
@EnableScan
@EnableScanCount
public interface DynamoDBRecord extends DynamoDBRepository<RecordMetadataDoc, String> {
// Currently unsupported by the spring-data-dynamodb library, see: https://github.com/derjust/spring-data-dynamodb/issues/114
// No release timeline at this time
List<RecordMetadataDoc> findByKindAndStatus(String kind, String status);
Page<RecordMetadataDoc> findByKindAndStatus(String kind, String status, Pageable pageable);
List<RecordMetadataDoc> findByUser(String user);
}
// Copyright © Amazon Web Services
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.aws.util.dynamodb.schema;
import com.amazonaws.osdu.util.dynamodb.DynamoDBRepository;
import org.opengroup.osdu.indexer.aws.model.SchemaDoc;
import org.socialsignin.spring.data.dynamodb.repository.EnableScan;
import org.socialsignin.spring.data.dynamodb.repository.EnableScanCount;
@EnableScan
@EnableScanCount
public interface DynamoDBSchema extends DynamoDBRepository<SchemaDoc, String> {}
......@@ -17,8 +17,8 @@ aws.s3.endpoint=s3.us-east-1.amazonaws.com
## AWS SNS configuration
aws.sns.region=us-east-1
aws.sns.arn=arn:aws:sns:us-east-1:888733619319:dev-osdu-messages
aws.sns.topic-name=dev-osdu-messages
aws.sns.arn=arn:aws:sns:us-east-1:888733619319:dev-osdu-indexer-messages
aws.sns.topic-name=dev-osdu-indexer-messages
#Spring Configuration
spring.security.user.name=opendes@byoc.local
......@@ -31,13 +31,16 @@ aws.dynamodb.table.prefix=dev-
aws.dynamodb.region=us-east-1
aws.dynamodb.endpoint=dynamodb.us-east-1.amazonaws.com
## AWS ES configuration
#aws.es.host=https://search-osdu-indexer-nj62ktooaiug2mjqzguzx5utr4.us-east-1.es.amazonaws.com
#aws.es.port=443
# AWS ES configuration
aws.es.host=https://search-dev-osdu-indexer-i5bpf2gv4iv6ha2xi7rook2rga.us-east-1.es.amazonaws.com
aws.es.port=-1
aws.es.userNameAndPassword=notused
aws.region=us-east-1
aws.es.serviceName=es
aws.es.host=e31703693e7442868515630d93038802.us-east-1.aws.found.io
aws.es.port=9243
aws.es.userNameAndPassword=elasticTmPLj2jfDNxeklXcXJPl34Ca
#aws.es.host=fc7169d00c9e437993904a989102d662.us-east-1.aws.found.io
#aws.es.port=9243
#aws.es.userNameAndPassword=elastic:YcfDGHnc2SpKmRfwTb6PBQcK
GAE_SERVICE=indexer
......
......@@ -240,7 +240,7 @@ public class IndexerServiceImpl implements IndexerService {
}
}
jaxRsDpsLog.info(String.format("valid upsert records: %s | can be indexed: %s", storageValidRecords.size(), indexerPayload.size()));
jaxRsDpsLog.info(String.format("valid upsert records: %s | can be indexed: %s", storageValidRecords.size(), indexerPayload.size()));
// this should only happen if storage service returned WRONG records with kind for all the records in the messages
if (indexerPayload.isEmpty()) {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment