diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaService.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaService.java index 7e512ae7af32a70650372cfd2a2e48bed6093e27..1562f19565dd17eda89b090ea1aafd08251ecc49 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaService.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaService.java @@ -20,10 +20,13 @@ import org.opengroup.osdu.core.common.model.indexer.IndexSchema; import org.opengroup.osdu.core.common.model.indexer.OperationType; import java.io.IOException; +import java.util.List; import java.util.Map; public interface IndexSchemaService { + IndexSchema getIndexerInputSchema(String kind, List<String> errors) throws AppException; + IndexSchema getIndexerInputSchema(String kind, boolean invalidateCached) throws AppException; void processSchemaMessages(Map<String, OperationType> schemaMsgs) throws IOException; diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaServiceImpl.java index fb0469356a95978325aa45d9f44b3dd186b308fa..b3d251883e9645f8e325a4cd494fe59eb2738774 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexSchemaServiceImpl.java @@ -16,12 +16,6 @@ package org.opengroup.osdu.indexer.service; import com.google.common.base.Strings; import com.google.gson.Gson; -import java.io.IOException; -import java.io.UnsupportedEncodingException; -import java.net.URISyntaxException; -import java.util.HashMap; -import java.util.Map; -import javax.inject.Inject; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; @@ -38,18 +32,23 @@ import org.opengroup.osdu.core.common.model.storage.SchemaItem; import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; import org.opengroup.osdu.core.common.search.IndicesService; import org.opengroup.osdu.indexer.provider.interfaces.ISchemaCache; +import org.opengroup.osdu.indexer.schema.converter.exeption.SchemaProcessingException; import org.opengroup.osdu.indexer.util.ElasticClientHandler; import org.opengroup.osdu.indexer.util.TypeMapper; import org.springframework.stereotype.Service; +import javax.inject.Inject; +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Service public class IndexSchemaServiceImpl implements IndexSchemaService { private static final String FLATTENED_SCHEMA = "_flattened"; - private static final String WELLBORE_MARKER_SET = "WellboreMarkerSet"; - private static final String MARKERS = "Markers"; - private static final String WELL_LOG = "WellLog"; - private static final String CURVES = "Curves"; private final Gson gson = new Gson(); @@ -122,6 +121,21 @@ public class IndexSchemaServiceImpl implements IndexSchemaService { } } + @Override + public IndexSchema getIndexerInputSchema(String kind, List<String> errors) throws AppException { + try { + return getIndexerInputSchema(kind, false); + } catch (SchemaProcessingException ex) { + log.error(ex.getMessage(), ex); + errors.add(ex.getMessage()); + } catch (RuntimeException ex) { + String msg = String.format("Failed to get the schema from the Schema service, kind: %s | message: %s", kind, ex.getMessage()); + log.error(msg, ex); + errors.add(msg); + } + return this.getEmptySchema(kind); + } + @Override public IndexSchema getIndexerInputSchema(String kind, boolean invalidateCached) throws AppException { @@ -135,8 +149,7 @@ public class IndexSchemaServiceImpl implements IndexSchemaService { // get from storage schema = getSchema(kind); if (Strings.isNullOrEmpty(schema)) { - Schema basicSchema = Schema.builder().kind(kind).build(); - return normalizeSchema(gson.toJson(basicSchema)); + return this.getEmptySchema(kind); } else { // cache the schema this.schemaCache.put(kind, schema); @@ -151,18 +164,22 @@ public class IndexSchemaServiceImpl implements IndexSchemaService { // search flattened schema in memcache String flattenedSchema = (String) this.schemaCache.get(kind + FLATTENED_SCHEMA); if (Strings.isNullOrEmpty(flattenedSchema)) { - Schema basicSchema = Schema.builder().kind(kind).build(); - return normalizeSchema(gson.toJson(basicSchema)); + return this.getEmptySchema(kind); } return this.gson.fromJson(flattenedSchema, IndexSchema.class); } } catch (AppException e) { throw e; } catch (Exception e) { - throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Schema parse/read error", "Error while reading schema via storage service.", e); + throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Schema parse/read error", "Error reading schema.", e); } } + private IndexSchema getEmptySchema(String kind) { + Schema basicSchema = Schema.builder().kind(kind).build(); + return normalizeSchema(gson.toJson(basicSchema)); + } + private String getSchema(String kind) throws URISyntaxException, UnsupportedEncodingException { return this.schemaProvider.getSchema(kind); } @@ -209,7 +226,7 @@ public class IndexSchemaServiceImpl implements IndexSchemaService { for (SchemaItem schemaItem : schemaObj.getSchema()) { String dataType = schemaItem.getKind(); Object elasticDataType = TypeMapper.getIndexerType(dataType, ElasticType.TEXT.getValue()); - if(schemaItem.getProperties() != null){ + if (schemaItem.getProperties() != null) { HashMap<String, Object> propertiesMap = normalizeInnerProperties(schemaItem); elasticDataType = TypeMapper.getObjectsArrayMapping(dataType, propertiesMap); } @@ -244,12 +261,12 @@ public class IndexSchemaServiceImpl implements IndexSchemaService { HashMap<String, Object> propertiesMap = new HashMap<>(); for (SchemaItem propertiesItem : schemaItem.getProperties()) { String propertiesItemKind = propertiesItem.getKind(); - Object propertiesElasticType = TypeMapper.getIndexerType(propertiesItemKind,ElasticType.TEXT.getValue()); - if(propertiesItem.getProperties() != null){ + Object propertiesElasticType = TypeMapper.getIndexerType(propertiesItemKind, ElasticType.TEXT.getValue()); + if (propertiesItem.getProperties() != null) { HashMap<String, Object> innerProperties = normalizeInnerProperties(propertiesItem); propertiesElasticType = TypeMapper.getObjectsArrayMapping(propertiesItemKind, innerProperties); } - propertiesMap.put(propertiesItem.getPath(),propertiesElasticType); + propertiesMap.put(propertiesItem.getPath(), propertiesElasticType); } return propertiesMap; } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java index af1e5aea27f0b3846c68ba76a0514fa2ae714c22..60d299bdff5dd5e450616814b907533a429d0a4b 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/IndexerServiceImpl.java @@ -15,7 +15,6 @@ package org.opengroup.osdu.indexer.service; import com.google.gson.Gson; - import com.google.gson.GsonBuilder; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchStatusException; @@ -30,33 +29,34 @@ import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.rest.RestStatus; -import org.opengroup.osdu.core.common.model.entitlements.Acl; import org.opengroup.osdu.core.common.Constants; -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.opengroup.osdu.core.common.model.http.AppException; -import org.opengroup.osdu.core.common.model.indexer.*; import org.opengroup.osdu.core.common.logging.JaxRsDpsLog; -import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; -import org.opengroup.osdu.indexer.logging.AuditLogger; -import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; +import org.opengroup.osdu.core.common.model.entitlements.Acl; +import org.opengroup.osdu.core.common.model.http.AppException; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.http.RequestStatus; +import org.opengroup.osdu.core.common.model.indexer.*; import org.opengroup.osdu.core.common.model.search.RecordChangedMessages; import org.opengroup.osdu.core.common.model.search.RecordMetaAttribute; import org.opengroup.osdu.core.common.provider.interfaces.IRequestInfo; +import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; import org.opengroup.osdu.core.common.search.IndicesService; +import org.opengroup.osdu.indexer.logging.AuditLogger; +import org.opengroup.osdu.indexer.provider.interfaces.IPublisher; import org.opengroup.osdu.indexer.util.ElasticClientHandler; -import org.opengroup.osdu.core.common.search.ElasticIndexNameResolver; -import org.apache.commons.beanutils.PropertyUtils; -import org.apache.commons.beanutils.NestedNullException; +import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import javax.inject.Inject; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; import java.util.function.Consumer; -import java.util.logging.Level; import java.util.stream.Collectors; @Service @@ -221,8 +221,11 @@ public class IndexerServiceImpl implements IndexerService { for (Map.Entry<String, Map<String, OperationType>> entry : upsertRecordMap.entrySet()) { String kind = entry.getKey(); - IndexSchema schemaObj = this.schemaService.getIndexerInputSchema(kind, false); - if (schemaObj.isDataSchemaMissing()) { + List<String> errors = new ArrayList<>(); + IndexSchema schemaObj = this.schemaService.getIndexerInputSchema(kind, errors); + if (!errors.isEmpty()) { + this.jobStatus.addOrUpdateRecordStatus(entry.getValue().keySet(), IndexingStatus.WARN, RequestStatus.INVALID_RECORD, String.join("|", errors), String.format("error | kind: %s", kind)); + } else if (schemaObj.isDataSchemaMissing()) { this.jobStatus.addOrUpdateRecordStatus(entry.getValue().keySet(), IndexingStatus.WARN, HttpStatus.SC_NOT_FOUND, "schema not found", String.format("schema not found | kind: %s", kind)); } diff --git a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/SchemaProviderImpl.java b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/SchemaProviderImpl.java index 6f2bf52cca0a5addf8c3c59a297cb548cd9171c6..164cf5fa4fac298e761ea2195246c591067c76b8 100644 --- a/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/SchemaProviderImpl.java +++ b/indexer-core/src/main/java/org/opengroup/osdu/indexer/service/SchemaProviderImpl.java @@ -58,18 +58,7 @@ public class SchemaProviderImpl implements SchemaService { @Override public String getSchema(String kind) throws URISyntaxException, UnsupportedEncodingException { - String schemaServiceSchema; - - try { - schemaServiceSchema = getFromSchemaService(kind); - } catch (SchemaProcessingException ex) { - log.error(ex.getMessage(), ex); - return null; - } catch (RuntimeException ex) { - log.error(String.format("Failed to get the schema from the Schema service, kind: %s | message: %s", kind, ex.getMessage()), ex); - return null; - } - + String schemaServiceSchema = getFromSchemaService(kind); return Objects.nonNull(schemaServiceSchema) ? schemaServiceSchema : getFromStorageService(kind); }