From 5875a7d7fc781b9e67a9d578797daba379a4e0f8 Mon Sep 17 00:00:00 2001 From: adubey8 <adubey8@slb.com> Date: Tue, 24 Sep 2019 13:57:52 -0700 Subject: [PATCH] PublishImpl initial code --- .../indexer/azure/publish/PublisherImpl.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 indexer-service-azure/src/main/java/org/opendes/indexer/azure/publish/PublisherImpl.java diff --git a/indexer-service-azure/src/main/java/org/opendes/indexer/azure/publish/PublisherImpl.java b/indexer-service-azure/src/main/java/org/opendes/indexer/azure/publish/PublisherImpl.java new file mode 100644 index 000000000..d02b69859 --- /dev/null +++ b/indexer-service-azure/src/main/java/org/opendes/indexer/azure/publish/PublisherImpl.java @@ -0,0 +1,96 @@ +// Copyright 2017-2019, Schlumberger +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.opendes.indexer.azure.publish; + + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonElement; +import com.google.gson.reflect.TypeToken; +import com.microsoft.azure.servicebus.IMessage; +import com.microsoft.azure.servicebus.TopicClient; +import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder; +import org.elasticsearch.common.Strings; +import org.opendes.client.api.DpsHeaders; +import org.opendes.client.multitenancy.ITenantFactory; +import org.opendes.core.model.DeploymentEnvironment; +import org.opendes.core.model.RecordChangedMessages; +import org.opendes.core.util.Config; +import org.opendes.indexer.model.RecordStatus; +import org.opendes.indexer.publish.IPublisher; +import org.opendes.indexer.util.JobStatus; +import org.reactivestreams.Publisher; +import org.springframework.beans.factory.annotation.Autowired; + +import java.lang.reflect.Type; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class PublisherImpl implements IPublisher { + + private static final Map<String, Publisher> PUBSUB_CLIENTS = new HashMap<>(); + + private static final String TOPIC_ID = "indexing-progress"; + private String TOPIC_NAME = "recordstopic"; + private String CONNECTION_STRING = "Endpoint=sb://pliuopendes.servicebus.windows.net/;\" +\n" + + " \"SharedAccessKeyName=RootManageSharedAccessKey;\" +\n" + + " \"SharedAccessKey=km8Nscc0gf299Ck6npmM3D14VU5Tx1lJYRdlHcExIvY="; + + + @Autowired + private ITenantFactory tenantStorageFactory; + + @Override + public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) throws Exception { + + if (Config.getDeploymentEnvironment() == DeploymentEnvironment.LOCAL) ; + + String tenant = headers.getPartitionId(); + if (Strings.isNullOrEmpty(tenant)) + tenant = headers.getAccountId(); + + TopicClient publisher = new TopicClient(new ConnectionStringBuilder(CONNECTION_STRING, TOPIC_NAME)); + RecordChangedMessages recordChangedMessages = getRecordChangedMessage(headers, indexerBatchStatus); + publisher.send((IMessage) recordChangedMessages); + + } + + private RecordChangedMessages getRecordChangedMessage(DpsHeaders headers, JobStatus indexerBatchStatus) { + Gson gson = new GsonBuilder().create(); + Map<String, String> attributesMap = new HashMap<>(); + Type listType = new TypeToken<List<RecordStatus>>() { + }.getType(); + JsonElement statusChangedTagsJson = gson.toJsonTree(indexerBatchStatus.getStatusesList(), listType); + String statusChangedTagsData = (statusChangedTagsJson.toString()); + + String tenant = headers.getPartitionId(); + // This code it to provide backward compatibility to slb-account-id + if (!Strings.isNullOrEmpty(tenant)) { + attributesMap.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionId()); + } else { + attributesMap.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId()); + } + attributesMap.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId()); + + + RecordChangedMessages recordChangedMessages = new RecordChangedMessages(); + // statusChangedTagsData is not ByteString but String + recordChangedMessages.setData(statusChangedTagsData); + recordChangedMessages.setAttributes(attributesMap); + + return recordChangedMessages; + } +} -- GitLab