diff --git a/indexer-service-gcp/src/main/java/org/opendes/indexer/kms/KmsClient.java b/indexer-service-gcp/src/main/java/org/opendes/indexer/kms/KmsClient.java index f2c4304bff21eefeeced3a504457f6b968d6376d..4aaea42d99bf8b77d4ec499c60a8f674f274be21 100644 --- a/indexer-service-gcp/src/main/java/org/opendes/indexer/kms/KmsClient.java +++ b/indexer-service-gcp/src/main/java/org/opendes/indexer/kms/KmsClient.java @@ -26,7 +26,6 @@ import com.google.api.services.cloudkms.v1.model.DecryptResponse; import com.google.api.services.cloudkms.v1.model.EncryptRequest; import com.google.api.services.cloudkms.v1.model.EncryptResponse; import org.opendes.core.kms.IKmsClient; -import org.opendes.core.util.Config; import org.opendes.core.util.Preconditions; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @@ -37,6 +36,9 @@ import java.nio.charset.StandardCharsets; @Component public class KmsClient implements IKmsClient { + @Value("${GOOGLE_CLOUD_PROJECT}") + private String GOOGLE_CLOUD_PROJECT; + @Value("${KMS_KEY}") private String KMS_KEY; @@ -53,7 +55,7 @@ public class KmsClient implements IKmsClient { Preconditions.checkNotNullOrEmpty(textToBeEncrypted, "textToBeEncrypted cannot be null"); byte[] plaintext = textToBeEncrypted.getBytes(StandardCharsets.UTF_8); - String resourceName = String.format(KEY_NAME, Config.getGoogleCloudProjectId(), KEY_RING, KMS_KEY); + String resourceName = String.format(KEY_NAME, GOOGLE_CLOUD_PROJECT, KEY_RING, KMS_KEY); CloudKMS kms = createAuthorizedClient(); EncryptRequest request = new EncryptRequest().encodePlaintext(plaintext); EncryptResponse response = kms.projects().locations().keyRings().cryptoKeys() @@ -70,7 +72,7 @@ public class KmsClient implements IKmsClient { Preconditions.checkNotNullOrEmpty(textToBeDecrypted, "textToBeDecrypted cannot be null"); CloudKMS kms = createAuthorizedClient(); - String cryptoKeyName = String.format(KEY_NAME, Config.getGoogleCloudProjectId(), KEY_RING, KMS_KEY); + String cryptoKeyName = String.format(KEY_NAME, GOOGLE_CLOUD_PROJECT, KEY_RING, KMS_KEY); DecryptRequest request = new DecryptRequest().setCiphertext(textToBeDecrypted); DecryptResponse response = kms.projects().locations().keyRings().cryptoKeys() .decrypt(cryptoKeyName, request) diff --git a/indexer-service-gcp/src/main/java/org/opendes/indexer/persistence/ElasticRepositoryDatastore.java b/indexer-service-gcp/src/main/java/org/opendes/indexer/persistence/ElasticRepositoryDatastore.java index 6731588b01719990f434eb9c68da2f866f72d4bd..9e26e298cd36b5f8a8acdb5eb72cb0097ea974d5 100644 --- a/indexer-service-gcp/src/main/java/org/opendes/indexer/persistence/ElasticRepositoryDatastore.java +++ b/indexer-service-gcp/src/main/java/org/opendes/indexer/persistence/ElasticRepositoryDatastore.java @@ -23,10 +23,10 @@ import org.opendes.client.multitenancy.TenantInfo; import org.opendes.core.model.ClusterSettings; import org.opendes.core.persistence.ElasticRepository; import org.opendes.core.util.AppException; -import org.opendes.core.util.Config; import org.opendes.core.util.Preconditions; import org.opendes.indexer.kms.KmsClient; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component @@ -41,11 +41,17 @@ public class ElasticRepositoryDatastore implements ElasticRepository { @Autowired private DatastoreFactory datastoreFactory; + @Value("${ELASTIC_DATASTORE_KIND}") + private String ELASTIC_DATASTORE_KIND; + + @Value("${ELASTIC_DATASTORE_ID}") + private String ELASTIC_DATASTORE_ID; + @Override public ClusterSettings getElasticClusterSettings(TenantInfo tenantInfo) { Datastore googleDatastore = this.datastoreFactory.getDatastoreInstance(tenantInfo); - Key key = googleDatastore.newKeyFactory().setKind(Config.getElasticCredentialsDatastoreKind()).newKey(Config.getElasticCredentialsDatastoreId()); + Key key = googleDatastore.newKeyFactory().setKind(ELASTIC_DATASTORE_KIND).newKey(ELASTIC_DATASTORE_ID); Entity datastoreEntity = googleDatastore.get(key); if (datastoreEntity == null) { diff --git a/indexer-service-gcp/src/main/java/org/opendes/indexer/publish/PublisherImpl.java b/indexer-service-gcp/src/main/java/org/opendes/indexer/publish/PublisherImpl.java index 3c1ef58dec682b6f35e843176547892b74256732..1f7ca43ca8b78eb1e26c187084c6ea675d6b71e0 100644 --- a/indexer-service-gcp/src/main/java/org/opendes/indexer/publish/PublisherImpl.java +++ b/indexer-service-gcp/src/main/java/org/opendes/indexer/publish/PublisherImpl.java @@ -23,6 +23,7 @@ import com.google.gson.reflect.TypeToken; import com.google.protobuf.ByteString; import com.google.pubsub.v1.ProjectTopicName; import com.google.pubsub.v1.PubsubMessage; +import lombok.extern.java.Log; import org.apache.http.HttpStatus; import org.elasticsearch.common.Strings; import org.opendes.client.api.DpsHeaders; @@ -46,6 +47,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +@Log @Component public class PublisherImpl implements IPublisher { @@ -107,7 +109,7 @@ public class PublisherImpl implements IPublisher { } builder.putAttributes(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); - builder.putAttributes( AppEngineHeaders.CLOUD_TRACE_CONTEXT, headers.getHeaders().get(AppEngineHeaders.CLOUD_TRACE_CONTEXT)); +// builder.putAttributes( AppEngineHeaders.CLOUD_TRACE_CONTEXT, headers.getHeaders().get(AppEngineHeaders.CLOUD_TRACE_CONTEXT)); builder.setData(statusChangedTagsData); return builder.build(); diff --git a/indexer-service-gcp/src/main/java/org/opendes/indexer/util/HeadersInfoGcpImpl.java b/indexer-service-gcp/src/main/java/org/opendes/indexer/util/HeadersInfoGcpImpl.java index f862200c35ec745479e6bd91a19e9d6354e4befe..44a8bfe7398f76bcf7ac1e2a2b6ad5848d6c10b4 100644 --- a/indexer-service-gcp/src/main/java/org/opendes/indexer/util/HeadersInfoGcpImpl.java +++ b/indexer-service-gcp/src/main/java/org/opendes/indexer/util/HeadersInfoGcpImpl.java @@ -33,10 +33,6 @@ import java.util.stream.Collectors; @Component public class HeadersInfoGcpImpl implements IHeadersInfo { -// @Autowired -// private HttpHeaders httpHeaders; -// -// private DpsHeaders headersMap = null; @Autowired private DpsHeaders headersMap; @@ -64,10 +60,10 @@ public class HeadersInfoGcpImpl implements IHeadersInfo { @Override public DpsHeaders getHeaders() { if (headersMap == null) { - log.info("Headers Map DpsHeaders is null"); -// headersMap = this.getCoreServiceHeaders(httpHeaders.toSingleValueMap()); + log.warning("Headers Map DpsHeaders is null"); } - return headersMap; + DpsHeaders headers = this.getCoreServiceHeaders(headersMap.getHeaders()); + return headers; } @Override diff --git a/indexer-service-root/src/main/java/org/opendes/indexer/api/RecordIndexerApi.java b/indexer-service-root/src/main/java/org/opendes/indexer/api/RecordIndexerApi.java index 7bad1217534c99f9db8a1fc1bf4158e2979791ba..cfcd7bf5b1273935083bfbbb8b078438c0336e46 100644 --- a/indexer-service-root/src/main/java/org/opendes/indexer/api/RecordIndexerApi.java +++ b/indexer-service-root/src/main/java/org/opendes/indexer/api/RecordIndexerApi.java @@ -59,7 +59,6 @@ public class RecordIndexerApi { if (recordChangedMessages == null) { log.info("record change messages is null"); } - log.info("RCM DATA: " + recordChangedMessages.getData()); Type listType = new TypeToken<List<RecordInfo>>() {}.getType(); List<RecordInfo> recordInfos = new Gson().fromJson(recordChangedMessages.getData(), listType); diff --git a/indexer-service-root/src/main/java/org/opendes/indexer/service/IndexerServiceImpl.java b/indexer-service-root/src/main/java/org/opendes/indexer/service/IndexerServiceImpl.java index f39c1143c43253fd58742c1dd4b0e819ea311fbb..4caf645dd8f0fca3a9d171121f16e513f5482d9f 100644 --- a/indexer-service-root/src/main/java/org/opendes/indexer/service/IndexerServiceImpl.java +++ b/indexer-service-root/src/main/java/org/opendes/indexer/service/IndexerServiceImpl.java @@ -16,6 +16,7 @@ package org.opendes.indexer.service; import com.google.gson.Gson; +import lombok.extern.java.Log; import org.apache.http.HttpStatus; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.action.bulk.BulkItemResponse; @@ -61,6 +62,7 @@ import java.util.stream.Collectors; import static org.opendes.indexer.service.IAttributeParsingService.DATA_GEOJSON_TAG; import static org.opendes.indexer.service.IAttributeParsingService.RECORD_GEOJSON_TAG; +@Log @Service public class IndexerServiceImpl implements IndexerService { @@ -71,7 +73,7 @@ public class IndexerServiceImpl implements IndexerService { private final Gson gson = new Gson(); @Autowired - private JaxRsDpsLog log; + private JaxRsDpsLog jaxRsDpsLog; @Autowired private AuditLogger auditLogger; @Autowired @@ -92,12 +94,11 @@ public class IndexerServiceImpl implements IndexerService { private ElasticIndexNameResolver elasticIndexNameResolver; @Autowired private IAttributeParsingService attributeParsingServiceImpl; -// @Autowired -// private IRequestInfo requestInfo; + @Autowired + private IRequestInfo requestInfo; @Autowired private JobStatus jobStatus; - @Autowired private DpsHeaders headers; @Override @@ -109,8 +110,8 @@ public class IndexerServiceImpl implements IndexerService { String errorMessage = ""; List<String> retryRecordIds = new LinkedList<>(); - // get auth header -// this.headers = this.requestInfo.getHeaders(); + // get auth header with service account Authorization + this.headers = this.requestInfo.getHeaders(); // initialize status for all messages. this.jobStatus.initialize(recordInfos); @@ -242,7 +243,7 @@ public class IndexerServiceImpl implements IndexerService { } } - log.info(String.format("valid upsert records: %s | can be indexed: %s", storageValidRecords.size(), indexerPayload.size())); + jaxRsDpsLog.info(String.format("valid upsert records: %s | can be indexed: %s", storageValidRecords.size(), indexerPayload.size())); // this should only happen if storage service returned WRONG records with kind for all the records in the messages if (indexerPayload.isEmpty()) { @@ -275,10 +276,10 @@ public class IndexerServiceImpl implements IndexerService { } } catch (AppException e) { this.jobStatus.addOrUpdateRecordStatus(storageRecord.getId(), IndexingStatus.FAIL, HttpStatus.SC_INTERNAL_SERVER_ERROR, e.getMessage()); - log.warning(String.format("record-id: %s | %s", storageRecord.getId(), e.getMessage()), e); + jaxRsDpsLog.warning(String.format("record-id: %s | %s", storageRecord.getId(), e.getMessage()), e); } catch (Exception e) { this.jobStatus.addOrUpdateRecordStatus(storageRecord.getId(), IndexingStatus.FAIL, HttpStatus.SC_INTERNAL_SERVER_ERROR, String.format("error parsing records against schema, error-message: %s", e.getMessage())); - log.error(String.format("record-id: %s | error parsing records against schema, error-message: %s", storageRecord.getId(), e.getMessage()), e); + jaxRsDpsLog.error(String.format("record-id: %s | error parsing records against schema, error-message: %s", storageRecord.getId(), e.getMessage()), e); } try { @@ -301,7 +302,7 @@ public class IndexerServiceImpl implements IndexerService { document.setOperationType(idToOperationMap.get(storageRecord.getId())); } catch (Exception e) { this.jobStatus.addOrUpdateRecordStatus(storageRecord.getId(), IndexingStatus.FAIL, HttpStatus.SC_INTERNAL_SERVER_ERROR, String.format("error parsing meta data, error-message: %s", e.getMessage())); - log.error(String.format("record-id: %s | error parsing meta data, error-message: %s", storageRecord.getId(), e.getMessage()), e); + jaxRsDpsLog.error(String.format("record-id: %s | error parsing meta data, error-message: %s", storageRecord.getId(), e.getMessage()), e); } return document; } @@ -409,7 +410,7 @@ public class IndexerServiceImpl implements IndexerService { if ((record.getData() == null || record.getData().isEmpty()) && !record.skippedDataIndexing()) { // it will come here when schema is missing // TODO: rollback once we know what is causing the problem - log.warning(String.format("data not found for record: %s", record)); + jaxRsDpsLog.warning(String.format("data not found for record: %s", record)); } OperationType operation = record.getOperationType(); @@ -457,7 +458,7 @@ public class IndexerServiceImpl implements IndexerService { try { BulkResponse bulkResponse = restClient.bulk(bulkRequest, RequestOptions.DEFAULT); - log.info(String.format("records in bulk request: %s | acknowledged in response: %s", bulkRequest.numberOfActions(), bulkResponse.getItems().length)); + jaxRsDpsLog.info(String.format("records in bulk request: %s | acknowledged in response: %s", bulkRequest.numberOfActions(), bulkResponse.getItems().length)); // log failed bulk requests ArrayList<String> bulkFailures = new ArrayList<>(); @@ -473,7 +474,7 @@ public class IndexerServiceImpl implements IndexerService { this.jobStatus.addOrUpdateRecordStatus(bulkItemResponse.getId(), IndexingStatus.SUCCESS, HttpStatus.SC_OK, "Indexed Successfully"); } } - if (!bulkFailures.isEmpty()) this.log.warning(bulkFailures); + if (!bulkFailures.isEmpty()) this.jaxRsDpsLog.warning(bulkFailures); } catch (IOException e) { // throw explicit 504 for IOException throw new AppException(HttpStatus.SC_GATEWAY_TIMEOUT, "Elastic error", "Request cannot be completed in specified time.", e); @@ -525,14 +526,14 @@ public class IndexerServiceImpl implements IndexerService { } catch (NestedNullException ignored) { // property not found in record } catch (IllegalArgumentException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { - log.warning(String.format("record-id: %s | error fetching property: %s | error: %s", recordId, propertyKey, e.getMessage()), e); + jaxRsDpsLog.warning(String.format("record-id: %s | error fetching property: %s | error: %s", recordId, propertyKey, e.getMessage()), e); } return null; } private void retryAndEnqueueFailedRecords(List<RecordInfo> recordInfos, List<String> failuresRecordIds, RecordChangedMessages message) throws IOException { - log.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds)); + jaxRsDpsLog.info(String.format("queuing bulk failed records back to task-queue for retry | count: %s | records: %s", failuresRecordIds.size(), failuresRecordIds)); List<RecordInfo> retryRecordInfos = new LinkedList<>(); for (String recordId : failuresRecordIds) { for (RecordInfo origMessage : recordInfos) { diff --git a/indexer-service-root/src/main/java/org/opendes/indexer/util/JobStatus.java b/indexer-service-root/src/main/java/org/opendes/indexer/util/JobStatus.java index 623afe85fbbe40170efbee5759133549fdfb543d..b16b6a9fcb2b31b0964bd20a1c0395b964882464 100644 --- a/indexer-service-root/src/main/java/org/opendes/indexer/util/JobStatus.java +++ b/indexer-service-root/src/main/java/org/opendes/indexer/util/JobStatus.java @@ -21,14 +21,15 @@ import org.opendes.indexer.model.*; import lombok.Data; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import org.springframework.web.context.annotation.RequestScope; import java.time.Instant; import java.util.*; import java.util.stream.Collectors; -@Component -@Data @Log +@Data +@Component public class JobStatus { @Autowired @@ -40,7 +41,6 @@ public class JobStatus { public void initialize(List<RecordInfo> recordInfos) { - log.info(">>>>>>>> IN JOB STATUS INITIALIZE <<<<<<<<<"); if (recordInfos == null || recordInfos.isEmpty()) return; List<RecordStatus> statuses = recordInfos.stream().map(msg -> RecordStatus.builder() @@ -52,7 +52,6 @@ public class JobStatus { .build()).collect(Collectors.toList()); this.statusesList.addAll(statuses); - this.statusesList.forEach(s -> log.info("STATUS LIST: " + s.toString())); } public void addOrUpdateRecordStatus(Collection<String> ids, IndexingStatus status, int statusCode, String message, String debugInfo) {