Skip to content
Snippets Groups Projects
Commit 3b031962 authored by MANISH KUMAR's avatar MANISH KUMAR
Browse files

Merge branch 'users/amaverma/reindexServiceBusHandlingAzure' into 'master'

adding IndexerQueueTaskBuilder for azure

See merge request !97
parents 4306003a 01d15b32
No related branches found
No related tags found
1 merge request!97adding IndexerQueueTaskBuilder for azure
Pipeline #29303 passed
......@@ -100,7 +100,7 @@ spec:
- name: STORAGE_QUERY_RECORD_FOR_CONVERSION_HOST
value: http://storage/api/storage/v2/query/records:batch
- name: STORAGE_QUERY_RECORD_HOST
value: http://storage/api/storage/v2/records
value: http://storage/api/storage/v2/query/records
- name: partition_service_endpoint
value: http://partition/api/partition/v1
- name: azure_istioauth_enabled
......
......@@ -161,4 +161,4 @@ spec:
- name: STORAGE_QUERY_RECORD_FOR_CONVERSION_HOST
value: http://storage/api/storage/v2/query/records:batch
- name: STORAGE_QUERY_RECORD_HOST
value: http://storage/api/storage/v2/records
value: http://storage/api/storage/v2/query/records
// Copyright © Azure
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.indexer.azure.util;
import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import com.microsoft.azure.servicebus.Message;
import lombok.extern.java.Log;
import org.opengroup.osdu.azure.servicebus.ITopicClientFactory;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.indexer.RecordInfo;
import org.opengroup.osdu.core.common.model.search.RecordChangedMessages;
import org.opengroup.osdu.indexer.util.IndexerQueueTaskBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.web.context.annotation.RequestScope;
import javax.inject.Inject;
import javax.inject.Named;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;
@Log
@Component
@RequestScope
@Primary
public class IndexerQueueTaskBuilderAzure extends IndexerQueueTaskBuilder {
@Autowired
private ITopicClientFactory topicClientFactory;
@Inject
private JaxRsDpsLog logger;
@Inject
@Named("SERVICE_BUS_TOPIC")
private String serviceBusTopic;
@Override
public void createWorkerTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
}
@Override
public void createReIndexTask(String payload, Long countdownMillis, DpsHeaders headers) {
createTask(payload, headers);
}
private void createTask(String payload, DpsHeaders headers) {
Gson gson = new Gson();
RecordChangedMessages receivedPayload = gson.fromJson(payload, RecordChangedMessages.class);
Message message = new Message();
Map<String, Object> properties = new HashMap<>();
// properties
properties.put(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
properties.put(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
headers.addCorrelationIdIfMissing();
properties.put(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
message.setProperties(properties);
// data
List<RecordInfo> recordInfos = parseRecordsAsJSON(receivedPayload.getData());
// add all to body {"message": {"data":[], "id":...}}
JsonObject jo = new JsonObject();
jo.add("data", gson.toJsonTree(recordInfos));
jo.addProperty(DpsHeaders.ACCOUNT_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.DATA_PARTITION_ID, headers.getPartitionIdWithFallbackToAccountId());
jo.addProperty(DpsHeaders.CORRELATION_ID, headers.getCorrelationId());
JsonObject jomsg = new JsonObject();
jomsg.add("message", jo);
message.setBody(jomsg.toString().getBytes(StandardCharsets.UTF_8));
message.setContentType("application/json");
try {
logger.info("Indexer publishes message to Service Bus " + headers.getCorrelationId());
topicClientFactory.getClient(headers.getPartitionId(), serviceBusTopic).send(message);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
private List<RecordInfo> parseRecordsAsJSON(String inputPayload) {
Gson gson = new Gson();
Type type = new TypeToken<List<RecordInfo>>(){}.getType();
List<RecordInfo> recordInfoList = gson.fromJson(inputPayload, type);
return recordInfoList;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment