Skip to content
Snippets Groups Projects
Commit 5875a7d7 authored by adubey8's avatar adubey8
Browse files

PublishImpl initial code

parent bc0fea4e
No related branches found
No related tags found
1 merge request!6Trusted ibm
// Copyright 2017-2019, Schlumberger
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opendes.indexer.azure.publish;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.reflect.TypeToken;
import com.microsoft.azure.servicebus.IMessage;
import com.microsoft.azure.servicebus.TopicClient;
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
import org.elasticsearch.common.Strings;
import org.opendes.client.api.DpsHeaders;
import org.opendes.client.multitenancy.ITenantFactory;
import org.opendes.core.model.DeploymentEnvironment;
import org.opendes.core.model.RecordChangedMessages;
import org.opendes.core.util.Config;
import org.opendes.indexer.model.RecordStatus;
import org.opendes.indexer.publish.IPublisher;
import org.opendes.indexer.util.JobStatus;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class PublisherImpl implements IPublisher {
private static final Map<String, Publisher> PUBSUB_CLIENTS = new HashMap<>();
private static final String TOPIC_ID = "indexing-progress";
private String TOPIC_NAME = "recordstopic";
private String CONNECTION_STRING = "Endpoint=sb://pliuopendes.servicebus.windows.net/;\" +\n" +
" \"SharedAccessKeyName=RootManageSharedAccessKey;\" +\n" +
" \"SharedAccessKey=km8Nscc0gf299Ck6npmM3D14VU5Tx1lJYRdlHcExIvY=";
@Autowired
private ITenantFactory tenantStorageFactory;
@Override
public void publishStatusChangedTagsToTopic(DpsHeaders headers, JobStatus indexerBatchStatus) throws Exception {
if (Config.getDeploymentEnvironment() == DeploymentEnvironment.LOCAL) ;
String tenant = headers.getPartitionId();
if (Strings.isNullOrEmpty(tenant))
tenant = headers.getAccountId();
TopicClient publisher = new TopicClient(new ConnectionStringBuilder(CONNECTION_STRING, TOPIC_NAME));
RecordChangedMessages recordChangedMessages = getRecordChangedMessage(headers, indexerBatchStatus);
publisher.send((IMessage) recordChangedMessages);
}
private RecordChangedMessages getRecordChangedMessage(DpsHeaders headers, JobStatus indexerBatchStatus) {
Gson gson = new GsonBuilder().create();
Map<String, String> attributesMap = new HashMap<>();
Type listType = new TypeToken<List<RecordStatus>>() {
}.getType();
JsonElement statusChangedTagsJson = gson.toJsonTree(indexerBatchStatus.getStatusesList(), listType);
String statusChangedTagsData = (statusChangedTagsJson.toString());
String tenant = headers.getPartitionId();
// This code it to provide backward compatibility to slb-account-id
if (!Strings.isNullOrEmpty(tenant)) {
attributesMap.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionId());
} else {
attributesMap.put(DpsHeaders.ACCOUNT_ID, headers.getAccountId());
}
attributesMap.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
RecordChangedMessages recordChangedMessages = new RecordChangedMessages();
// statusChangedTagsData is not ByteString but String
recordChangedMessages.setData(statusChangedTagsData);
recordChangedMessages.setAttributes(attributesMap);
return recordChangedMessages;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment