Commit 8162c8e9 authored by ethiraj krishnamanaidu's avatar ethiraj krishnamanaidu
Browse files

OpenDES GA code merge

parent 689520c6
Pipeline #2510 failed with stages
in 23 minutes and 6 seconds
......@@ -153,7 +153,7 @@
<dependency>
<groupId>com.fasterxml.jackson.jaxrs</groupId>
<artifactId>jackson-jaxrs-json-provider</artifactId>
<version>2.9.9</version>
<version>2.9.10</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
......@@ -163,7 +163,7 @@
<dependency>
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<version>1.9.3</version>
<version>1.9.4</version>
</dependency>
<dependency>
......@@ -261,7 +261,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.13</version>
<version>0.0.18</version>
</dependency>
</dependencies>
......
......@@ -57,7 +57,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.13</version>
<version>0.0.18</version>
</dependency>
<!-- AWS-managed packages -->
......
......@@ -28,6 +28,7 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
......@@ -283,6 +284,7 @@ abstract class QueryBase {
SearchResponse makeSearchRequest(Query searchRequest, RestHighLevelClient client) {
Long startTime = 0L;
SearchRequest elasticSearchRequest = null;
SearchResponse searchResponse = null;
try {
if (searchRequest.getSpatialFilter() != null) {
......@@ -291,7 +293,7 @@ abstract class QueryBase {
elasticSearchRequest = createElasticRequest(searchRequest);
startTime = System.currentTimeMillis();
SearchResponse searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
return searchResponse;
} catch (ElasticsearchStatusException e) {
switch (e.status()) {
......@@ -315,6 +317,7 @@ abstract class QueryBase {
Long latency = System.currentTimeMillis() - startTime;
String request = elasticSearchRequest != null ? elasticSearchRequest.source().toString() : searchRequest.toString();
this.log.info(String.format("elastic latency: %s | elastic request-payload: %s", latency, request));
this.auditLog(searchRequest, searchResponse);
}
}
......@@ -326,4 +329,16 @@ abstract class QueryBase {
}
abstract SearchRequest createElasticRequest(Query request) throws AppException, IOException;
abstract void querySuccessAuditLogger(Query request);
abstract void queryFailedAuditLogger(Query request);
private void auditLog(Query searchRequest, SearchResponse searchResponse) {
if (searchResponse != null && searchResponse.status() == RestStatus.OK) {
this.querySuccessAuditLogger(searchRequest);
return;
}
this.queryFailedAuditLogger(searchRequest);
}
}
\ No newline at end of file
......@@ -14,6 +14,7 @@
package org.opengroup.osdu.search.provider.aws.provider.impl;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
......@@ -52,7 +53,6 @@ public class QueryServiceAwsImpl extends QueryBase implements IQueryService {
QueryResponse queryResponse = this.executeQuery(searchRequest, client);
List<String> resources = new ArrayList<>();
resources.add(searchRequest.toString());
this.auditLogger.queryIndex(resources);
return queryResponse;
}
}
......@@ -64,7 +64,6 @@ public class QueryServiceAwsImpl extends QueryBase implements IQueryService {
QueryResponse queryResponse = executeQuery(searchRequest, client);
List<String> resources = new ArrayList<>();
resources.add(searchRequest.toString());
auditLogger.queryIndex(resources);
return queryResponse;
}
}
......@@ -74,11 +73,13 @@ public class QueryServiceAwsImpl extends QueryBase implements IQueryService {
List<Map<String, Object>> results = this.getHitsFromSearchResponse(searchResponse);
List<AggregationResponse> aggregations = getAggregationFromSearchResponse(searchResponse);
QueryResponse queryResponse = QueryResponse.getEmptyResponse();
queryResponse.setTotalCount(searchResponse.getHits().getTotalHits());
if (results != null) {
return QueryResponse.builder().results(results).aggregations(aggregations).totalCount(searchResponse.getHits().getTotalHits()).build();
} else {
return QueryResponse.getEmptyResponse();
queryResponse.setAggregations(aggregations);
queryResponse.setResults(results);
}
return queryResponse;
}
@Override
......@@ -103,4 +104,14 @@ public class QueryServiceAwsImpl extends QueryBase implements IQueryService {
return elasticSearchRequest;
}
@Override
void querySuccessAuditLogger(Query request) {
this.auditLogger.queryIndexSuccess(Lists.newArrayList(request.toString()));
}
@Override
void queryFailedAuditLogger(Query request) {
this.auditLogger.queryIndexFailed(Lists.newArrayList(request.toString()));
}
}
\ No newline at end of file
......@@ -14,6 +14,7 @@
package org.opengroup.osdu.search.provider.aws.provider.impl;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
......@@ -65,7 +66,7 @@ public class ScrollQueryServiceAwsImpl extends QueryBase implements IScrollQuery
@Override
public CursorQueryResponse queryIndex(CursorQueryRequest searchRequest) throws Exception {
CursorQueryResponse queryResponse = null;
CursorQueryResponse queryResponse = CursorQueryResponse.getEmptyResponse();
try (RestHighLevelClient client = this.elasticClientHandler.createRestClient()) {
if (StringUtils.isEmpty(searchRequest.getCursor())) {
......@@ -83,14 +84,14 @@ public class ScrollQueryServiceAwsImpl extends QueryBase implements IScrollQuery
SearchResponse searchScrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
List<Map<String, Object>> results = getHitsFromSearchResponse(searchScrollResponse);
queryResponse.setTotalCount(searchScrollResponse.getHits().getTotalHits());
if (results != null) {
queryResponse = CursorQueryResponse.builder()
.cursor(this.refreshCursorCache(searchScrollResponse.getScrollId(), this.dpsHeaders.getUserEmail()))
.results(results)
.totalCount(searchScrollResponse.getHits().getTotalHits()).build();
queryResponse.setResults(results);
queryResponse.setCursor(this.refreshCursorCache(searchScrollResponse.getScrollId(), dpsHeaders.getUserEmail()));
List<String> resources = new ArrayList<>();
resources.add(searchRequest.toString());
this.auditLogger.queryIndexWithCursor(resources);
this.querySuccessAuditLogger(searchRequest);
}
} else {
throw new AppException(HttpServletResponse.SC_BAD_REQUEST, "Can't find the given cursor", "The given cursor is invalid or expired");
......@@ -109,7 +110,6 @@ public class ScrollQueryServiceAwsImpl extends QueryBase implements IScrollQuery
CursorQueryResponse queryResponse = this.executeCursorQuery(searchRequest, client);
List<String> resources = new ArrayList<>();
resources.add(searchRequest.toString());
this.auditLogger.queryIndexWithCursor(resources);
return queryResponse;
}
......@@ -124,7 +124,7 @@ public class ScrollQueryServiceAwsImpl extends QueryBase implements IScrollQuery
.totalCount(searchResponse.getHits().getTotalHits())
.build();
}
return null;
return CursorQueryResponse.getEmptyResponse();
}
@Override
......@@ -151,4 +151,14 @@ public class ScrollQueryServiceAwsImpl extends QueryBase implements IScrollQuery
}
return null;
}
@Override
void querySuccessAuditLogger(Query request) {
this.auditLogger.queryIndexWithCursorSuccess(Lists.newArrayList(request.toString()));
}
@Override
void queryFailedAuditLogger(Query request) {
this.auditLogger.queryIndexWithCursorFailed(Lists.newArrayList(request.toString()));
}
}
......@@ -53,7 +53,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.13</version>
<version>0.0.18</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
......
......@@ -16,44 +16,27 @@ package org.opengroup.osdu.search.provider.azure.provider.impl;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.multitenancy.ITenantInfoService;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.search.provider.interfaces.ICrossTenantInfoService;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.inject.Inject;
import java.util.LinkedList;
import java.util.List;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
@Component
public class CrossTenantInfoServiceImpl implements ITenantInfoService, ICrossTenantInfoService {
@Inject
private ITenantFactory tenantFactory;
/**
* TODO: Fix this: Due to two instances of DpsHeaders (one from corelib and another from os-indexer-search-lib, spring unable to inject the right one. So, instantiate using ConfigModule.
*/
@Inject
private DpsHeaders dpsHeaders;
@PostConstruct
public void init(){
dpsHeaders = new DpsHeaders();
}
@Override
public TenantInfo getTenantInfo() {
String primaryAccountId= TenantInfo.COMMON;
if(dpsHeaders != null){
if(dpsHeaders.getAccountId() != null)
primaryAccountId = dpsHeaders.getAccountId();
else if(dpsHeaders.getPartitionId() != null)
primaryAccountId = dpsHeaders.getPartitionId();
}
String primaryAccountId = this.dpsHeaders.getPartitionIdWithFallbackToAccountId();
TenantInfo tenantInfo = this.tenantFactory.getTenantInfo(primaryAccountId);
if (tenantInfo == null) {
throw AppException.createUnauthorized(String.format("could not retrieve tenant info for data partition id: %s", primaryAccountId));
......
......@@ -30,6 +30,7 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
......@@ -286,6 +287,7 @@ abstract class QueryBase {
SearchResponse makeSearchRequest(Query searchRequest, RestHighLevelClient client) {
Long startTime = 0L;
SearchRequest elasticSearchRequest = null;
SearchResponse searchResponse = null;
try {
if (searchRequest.getSpatialFilter() != null) {
......@@ -294,7 +296,7 @@ abstract class QueryBase {
elasticSearchRequest = createElasticRequest(searchRequest);
startTime = System.currentTimeMillis();
SearchResponse searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
return searchResponse;
} catch (ElasticsearchStatusException e) {
switch (e.status()) {
......@@ -318,6 +320,7 @@ abstract class QueryBase {
Long latency = System.currentTimeMillis() - startTime;
String request = elasticSearchRequest != null ? elasticSearchRequest.source().toString() : searchRequest.toString();
this.log.info(String.format("elastic latency: %s | elastic request-payload: %s", latency, request));
this.auditLog(searchRequest, searchResponse);
}
}
......@@ -329,4 +332,16 @@ abstract class QueryBase {
}
abstract SearchRequest createElasticRequest(Query request) throws AppException, IOException;
abstract void querySuccessAuditLogger(Query request);
abstract void queryFailedAuditLogger(Query request);
private void auditLog(Query searchRequest, SearchResponse searchResponse) {
if (searchResponse != null && searchResponse.status() == RestStatus.OK) {
this.querySuccessAuditLogger(searchRequest);
return;
}
this.queryFailedAuditLogger(searchRequest);
}
}
\ No newline at end of file
......@@ -49,7 +49,6 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
public QueryResponse queryIndex(QueryRequest searchRequest) throws IOException {
try (RestHighLevelClient client = this.elasticClientHandler.createRestClient()) {
QueryResponse queryResponse = this.executeQuery(searchRequest, client);
this.auditLogger.queryIndex(Lists.newArrayList(searchRequest.toString()));
return queryResponse;
}
}
......@@ -59,7 +58,6 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
public QueryResponse queryIndex(QueryRequest searchRequest, ClusterSettings clusterSettings) throws Exception {
try (RestHighLevelClient client = elasticClientHandler.createRestClient(clusterSettings)) {
QueryResponse queryResponse = executeQuery(searchRequest, client);
auditLogger.queryIndex(Lists.newArrayList(searchRequest.toString()));
return queryResponse;
}
}
......@@ -69,11 +67,13 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
List<Map<String, Object>> results = this.getHitsFromSearchResponse(searchResponse);
List<AggregationResponse> aggregations = getAggregationFromSearchResponse(searchResponse);
QueryResponse queryResponse = QueryResponse.getEmptyResponse();
queryResponse.setTotalCount(searchResponse.getHits().getTotalHits());
if (results != null) {
return QueryResponse.builder().results(results).aggregations(aggregations).totalCount(searchResponse.getHits().getTotalHits()).build();
} else {
return QueryResponse.getEmptyResponse();
queryResponse.setAggregations(aggregations);
queryResponse.setResults(results);
}
return queryResponse;
}
@Override
......@@ -99,4 +99,14 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
return elasticSearchRequest;
}
@Override
void querySuccessAuditLogger(Query request) {
this.auditLogger.queryIndexSuccess(Lists.newArrayList(request.toString()));
}
@Override
void queryFailedAuditLogger(Query request) {
this.auditLogger.queryIndexFailed(Lists.newArrayList(request.toString()));
}
}
\ No newline at end of file
......@@ -24,6 +24,7 @@ import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.search.CursorQueryRequest;
import org.opengroup.osdu.core.common.model.search.CursorQueryResponse;
......@@ -65,7 +66,7 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
@Override
public CursorQueryResponse queryIndex(CursorQueryRequest searchRequest) throws Exception {
CursorQueryResponse queryResponse = null;
CursorQueryResponse queryResponse = CursorQueryResponse.getEmptyResponse();
try (RestHighLevelClient client = this.elasticClientHandler.createRestClient()) {
if (Strings.isNullOrEmpty(searchRequest.getCursor())) {
......@@ -84,12 +85,12 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
SearchResponse searchScrollResponse = client.scroll(scrollRequest, RequestOptions.DEFAULT);
List<Map<String, Object>> results = getHitsFromSearchResponse(searchScrollResponse);
queryResponse.setTotalCount(searchScrollResponse.getHits().getTotalHits());
if (results != null) {
queryResponse = CursorQueryResponse.builder()
.cursor(this.refreshCursorCache(searchScrollResponse.getScrollId(), this.dpsHeaders.getUserEmail()))
.results(results)
.totalCount(searchScrollResponse.getHits().getTotalHits()).build();
this.auditLogger.queryIndexWithCursor(Lists.newArrayList(searchRequest.toString()));
queryResponse.setResults(results);
queryResponse.setCursor(this.refreshCursorCache(searchScrollResponse.getScrollId(), dpsHeaders.getUserEmail()));
this.querySuccessAuditLogger(searchRequest);
}
} else {
throw new AppException(HttpServletResponse.SC_BAD_REQUEST, "Can't find the given cursor", "The given cursor is invalid or expired");
......@@ -106,7 +107,6 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
private CursorQueryResponse initCursorQuery(CursorQueryRequest searchRequest, RestHighLevelClient client) {
CursorQueryResponse queryResponse = this.executeCursorQuery(searchRequest, client);
this.auditLogger.queryIndexWithCursor(Lists.newArrayList(searchRequest.toString()));
return queryResponse;
}
......@@ -121,7 +121,7 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
.totalCount(searchResponse.getHits().getTotalHits())
.build();
}
return null;
return CursorQueryResponse.getEmptyResponse();
}
@Override
......@@ -133,6 +133,12 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
// build query
SearchSourceBuilder sourceBuilder = this.createSearchSourceBuilder(request);
// Optimize Scroll request if users wants to iterate over all documents regardless of order
if (request.getSort() == null) {
sourceBuilder.sort(SortBuilders.scoreSort());
sourceBuilder.sort(SortBuilders.fieldSort("_doc"));
}
elasticSearchRequest.source(sourceBuilder);
elasticSearchRequest.scroll(new Scroll(SEARCH_SCROLL_TIMEOUT));
......@@ -148,4 +154,14 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
}
return null;
}
@Override
void querySuccessAuditLogger(Query request) {
this.auditLogger.queryIndexWithCursorSuccess(Lists.newArrayList(request.toString()));
}
@Override
void queryFailedAuditLogger(Query request) {
this.auditLogger.queryIndexWithCursorFailed(Lists.newArrayList(request.toString()));
}
}
......@@ -49,7 +49,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
<version>0.0.13</version>
<version>0.0.18</version>
</dependency>
<dependency>
......
......@@ -27,6 +27,7 @@ import org.elasticsearch.common.text.Text;
import org.elasticsearch.common.unit.DistanceUnit;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
......@@ -239,11 +240,12 @@ abstract class QueryBase {
SearchResponse makeSearchRequest(Query searchRequest, RestHighLevelClient client) {
Long startTime = 0L;
SearchRequest elasticSearchRequest = null;
SearchResponse searchResponse = null;
try {
elasticSearchRequest = createElasticRequest(searchRequest);
startTime = System.currentTimeMillis();
SearchResponse searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
searchResponse = client.search(elasticSearchRequest, RequestOptions.DEFAULT);
return searchResponse;
} catch (ElasticsearchStatusException e) {
switch (e.status()) {
......@@ -267,8 +269,22 @@ abstract class QueryBase {
Long latency = System.currentTimeMillis() - startTime;
String request = elasticSearchRequest != null ? elasticSearchRequest.source().toString() : searchRequest.toString();
this.log.info(String.format("elastic latency: %s | elastic request-payload: %s", latency, request));
this.auditLog(searchRequest, searchResponse);
}
}
abstract SearchRequest createElasticRequest(Query request) throws AppException;
abstract void querySuccessAuditLogger(Query request);
abstract void queryFailedAuditLogger(Query request);
private void auditLog(Query searchRequest, SearchResponse searchResponse) {
if (searchResponse != null && searchResponse.status() == RestStatus.OK) {
this.querySuccessAuditLogger(searchRequest);
return;
}
this.queryFailedAuditLogger(searchRequest);
}
}
\ No newline at end of file
......@@ -58,7 +58,6 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
public QueryResponse queryIndex(QueryRequest searchRequest) throws IOException {
try (RestHighLevelClient client = this.elasticClientHandler.createRestClient()) {
QueryResponse queryResponse = this.executeQuery(searchRequest, client);
this.auditLogger.queryIndex(Lists.newArrayList(searchRequest.toString()));
return queryResponse;
}
}
......@@ -68,7 +67,6 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
public QueryResponse queryIndex(QueryRequest searchRequest, ClusterSettings clusterSettings) throws Exception {
try (RestHighLevelClient client = elasticClientHandler.createRestClient(clusterSettings)) {
QueryResponse queryResponse = executeQuery(searchRequest, client);
auditLogger.queryIndex(Lists.newArrayList(searchRequest.toString()));
return queryResponse;
}
}
......@@ -79,11 +77,13 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
List<Map<String, Object>> results = this.getHitsFromSearchResponse(searchResponse);
List<AggregationResponse> aggregations = getAggregationFromSearchResponse(searchResponse);
QueryResponse queryResponse = QueryResponse.getEmptyResponse();
queryResponse.setTotalCount(searchResponse.getHits().getTotalHits());
if (results != null) {
return QueryResponse.builder().results(results).aggregations(aggregations).totalCount(searchResponse.getHits().getTotalHits()).build();
} else {
return QueryResponse.getEmptyResponse();
queryResponse.setAggregations(aggregations);
queryResponse.setResults(results);
}
return queryResponse;
}
@Override
......@@ -111,4 +111,15 @@ public class QueryServiceImpl extends QueryBase implements IQueryService {
return elasticSearchRequest;
}
@Override
void querySuccessAuditLogger(Query request) {
this.auditLogger.queryIndexSuccess(Lists.newArrayList(request.toString()));
}
@Override
void queryFailedAuditLogger(Query request) {
this.auditLogger.queryIndexFailed(Lists.newArrayList(request.toString()));
}
}
\ No newline at end of file
......@@ -25,6 +25,7 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.Scroll;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.search.util.ElasticClientHandler;
import org.opengroup.osdu.search.cache.CursorCache;
......@@ -71,7 +72,7 @@ public class ScrollQueryServiceImpl extends QueryBase implements IScrollQuerySer
@Override