Skip to content
Snippets Groups Projects
Commit f8869a0d authored by Rostislav Dublin (EPAM)'s avatar Rostislav Dublin (EPAM)
Browse files

OQM stuff added to register Subscribers on all Pull Subscriptions. OQM...

OQM stuff added to register Subscribers on all Pull Subscriptions. OQM successfully receives events from subscriptions and conduct to finite clients webhook endpoints
parent 86659ccd
No related branches found
No related tags found
1 merge request!144(GONRG-3831) GCP Notification: OQM mapper
Pipeline #77652 failed
Showing
with 687 additions and 8 deletions
......@@ -25,7 +25,7 @@
<java.version>8</java.version>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
<os-core-common.version>0.11.0</os-core-common.version>
<os-core-common.version>0.12.0</os-core-common.version>
</properties>
<licenses>
......
......@@ -37,6 +37,17 @@
</properties>
<dependencies>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>osm</artifactId>
<version>0.13.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>oqm</artifactId>
<version>0.13.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>os-core-common</artifactId>
......@@ -44,7 +55,7 @@
<dependency>
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-gcp</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</dependency>
<dependency>
......
package org.opengroup.osdu.notification.provider.gcp.mappers.oqm;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.ClientParameters;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.partition.*;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
import org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolution;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.core.gcp.osm.translate.TranslatorRuntimeException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
/**
* For RabbitMQ. Tenant Based OQM destination resolver
*
* @author Rostislav_Dublin
* @since 09.11.2021
*/
@Component
@Scope(SCOPE_SINGLETON)
@ConditionalOnProperty(name = "oqmDriver", havingValue = "rabbitmq")
@RequiredArgsConstructor
@Slf4j
public class MqTenantOqmDestinationResolver implements org.opengroup.osdu.core.gcp.oqm.driver.rabbitmq.MqOqmDestinationResolver {
private final static String PARTITION_PROPERTIES_AMQP_PREFIX = "register.oqm.rabbitmq.amqp.";
private final static String PARTITION_PROPERTY_AMQP_HOST = PARTITION_PROPERTIES_AMQP_PREFIX.concat("host");
private final static String PARTITION_PROPERTY_AMQP_PORT = PARTITION_PROPERTIES_AMQP_PREFIX.concat("port");
private final static String PARTITION_PROPERTY_AMQP_PATH = PARTITION_PROPERTIES_AMQP_PREFIX.concat("path");
private final static String PARTITION_PROPERTY_AMQP_USERNAME = PARTITION_PROPERTIES_AMQP_PREFIX.concat("username");
private final static String PARTITION_PROPERTY_AMQP_PASSWORD = PARTITION_PROPERTIES_AMQP_PREFIX.concat("password");
private final static String PARTITION_PROPERTIES_ADMIN_PREFIX = "register.oqm.rabbitmq.admin.";
private final static String PARTITION_PROPERTY_ADMIN_SCHEMA = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("schema");
private final static String PARTITION_PROPERTY_ADMIN_HOST = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("host");
private final static String PARTITION_PROPERTY_ADMIN_PORT = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("port");
private final static String PARTITION_PROPERTY_ADMIN_PATH = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("path");
private final static String PARTITION_PROPERTY_ADMIN_USERNAME = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("username");
private final static String PARTITION_PROPERTY_ADMIN_PASSWORD = PARTITION_PROPERTIES_ADMIN_PREFIX.concat("password");
private final DpsHeaders dpsHeaders;
private final IPartitionFactory partitionFactory;
private Map<OqmDestination, ConnectionFactory> amqpConnectionFactoryCache = new HashMap<>();
private Map<OqmDestination, Client> httpClientCache = new HashMap<>();
@Override
public MqOqmDestinationResolution resolve(OqmDestination destination) {
IPartitionProvider partitionProvider = partitionFactory.create(dpsHeaders);
PartitionInfo partitionInfo;
try {
partitionInfo = partitionProvider.get(destination.getPartitionId());
} catch (PartitionException e) {
throw new TranslatorRuntimeException(e, "Partition '{}' destination resolution issue", destination.getPartitionId());
}
Map<String, Property> partitionProperties = partitionInfo.getProperties();
String amqpHost = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_HOST)).map(Property::getValue).map(Object::toString).orElse(null);
String amqpPort = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PORT)).map(Property::getValue).map(Object::toString).orElse(null);
String amqpPath = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PATH)).map(Property::getValue).map(Object::toString).orElse(null);
String amqpUser = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_USERNAME)).map(Property::getValue).map(Object::toString).orElse(null);
String amqpPass = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_AMQP_PASSWORD)).map(Property::getValue).map(Object::toString).orElse(null);
String adminSchm = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_SCHEMA)).map(Property::getValue).map(Object::toString).orElse(null);
String adminHost = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_HOST)).map(Property::getValue).map(Object::toString).orElse(null);
String adminPort = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PORT)).map(Property::getValue).map(Object::toString).orElse(null);
String adminPath = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PATH)).map(Property::getValue).map(Object::toString).orElse(null);
String adminUser = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_USERNAME)).map(Property::getValue).map(Object::toString).orElse(null);
String adminPass = Optional.ofNullable(partitionProperties.get(PARTITION_PROPERTY_ADMIN_PASSWORD)).map(Property::getValue).map(Object::toString).orElse(null);
if (amqpHost == null || amqpPort == null || amqpPath == null || amqpUser == null || amqpPass == null) {
throw new TranslatorRuntimeException(null, "Partition '{}' RabbitMQ OQM destination resolution AMQP part configuration issue", destination.getPartitionId());
}
if (adminSchm == null || adminHost == null || adminPort == null || adminPath == null || adminUser == null || adminPass == null) {
throw new TranslatorRuntimeException(null, "Partition '{}' RabbitMQ OQM destination resolution ADMIN part configuration issue", destination.getPartitionId());
}
//noinspection SwitchStatementWithTooFewBranches
switch (destination.getPartitionId()) {
default:
String virtualHost = "/";
ConnectionFactory amqpFactory = amqpConnectionFactoryCache.get(destination);
if (amqpFactory == null) {
URI amqpUri;
try {
amqpUri = new URI("amqp", amqpUser + ":" + amqpPass, amqpHost, Integer.parseInt(amqpPort), amqpPath, null, null);
amqpFactory = new ConnectionFactory();
amqpFactory.setUri(amqpUri);
amqpConnectionFactoryCache.put(destination, amqpFactory);
} catch (URISyntaxException | NoSuchAlgorithmException | KeyManagementException e) {
throw new OqmDriverRuntimeException("RabbitMQ amqp URI and ConnectionFactory", e);
}
}
Client httpClient = httpClientCache.get(destination);
if (httpClient == null) {
try {
URI httpUrl = new URI(adminSchm, null, adminHost, Integer.parseInt(adminPort), adminPath, null, null);
ClientParameters clientParameters = new ClientParameters().url(httpUrl.toURL())
.username("guest").password("guest");
httpClient = new Client(clientParameters);
httpClientCache.put(destination, httpClient);
} catch (URISyntaxException | MalformedURLException e) {
throw new OqmDriverRuntimeException("RabbitMQ http(api) URI and Client", e);
}
}
return MqOqmDestinationResolution.builder()
.amqpFactory(amqpFactory)
.adminClient(httpClient)
.virtualHost(virtualHost)
.build();
}
}
@PreDestroy
public void shutdown() {
log.info("On pre-destroy.");
}
}
package org.opengroup.osdu.notification.provider.gcp.mappers.oqm;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriverRuntimeException;
import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolution;
import org.opengroup.osdu.core.gcp.oqm.driver.pubsub.PsOqmDestinationResolver;
import org.opengroup.osdu.core.gcp.oqm.model.OqmDestination;
import org.opengroup.osdu.notification.provider.gcp.util.GcpAppServiceConfig;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
/**
* For GCP PubSub. Tenant Based OQM destination resolver
*
* @author Rostislav_Dublin
* @since 09.11.2021
*/
@Component
@Scope(SCOPE_SINGLETON)
@ConditionalOnProperty(name = "oqmDriver", havingValue = "pubsub")
@Slf4j
@RequiredArgsConstructor
public class PsTenantOqmDestinationResolver implements PsOqmDestinationResolver {
private Map<OqmDestination, TopicAdminClient> topicClientCache = new HashMap<>();
private Map<OqmDestination, SubscriptionAdminClient> subscriptionClientCache = new HashMap<>();
private final ITenantFactory tenantInfoFactory;
private final GcpAppServiceConfig config;
@Override
public PsOqmDestinationResolution resolve(OqmDestination destination) {
TenantInfo ti = tenantInfoFactory.getTenantInfo(destination.getPartitionId());
String partitionId = destination.getPartitionId();
//noinspection SwitchStatementWithTooFewBranches
switch (partitionId) {
default:
String servicesProjectId = config.getGoogleCloudProject();
String dataProjectId = ti.getProjectId();
TopicAdminClient tac = topicClientCache.get(destination);
if (tac == null) {
try {
TopicAdminSettings tas = TopicAdminSettings.newBuilder().build();
tac = TopicAdminClient.create(tas);
topicClientCache.put(destination, tac);
} catch (IOException e) {
throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve TopicAdminClient", e);
}
}
SubscriptionAdminClient sac = subscriptionClientCache.get(destination);
if (sac == null) {
try {
sac = SubscriptionAdminClient.create();
subscriptionClientCache.put(destination, sac);
} catch (IOException e) {
throw new OqmDriverRuntimeException("PsOqmDestinationResolution#resolve SubscriptionAdminClient", e);
}
}
return PsOqmDestinationResolution.builder()
.servicesProjectId(servicesProjectId)
.dataProjectId(dataProjectId)
.topicAdminClient(tac)
.subscriptionAdminClient(sac)
.build();
}
}
@PreDestroy
public void shutdown() {
log.info("On pre-destroy. {} topic client(s) & {} subscription clients to shutdown",
topicClientCache.size(), subscriptionClientCache.size());
for (TopicAdminClient tac : topicClientCache.values()) {
tac.shutdown();
}
for (SubscriptionAdminClient sac : subscriptionClientCache.values()) {
sac.shutdown();
}
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.opengroup.osdu.core.gcp.oqm.driver.OqmDriver;
import org.opengroup.osdu.core.gcp.oqm.model.*;
import org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc.OqmNotificationHandler;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
@Slf4j
@Component
@Scope(SCOPE_SINGLETON)
@ConditionalOnProperty(name = "oqmDriver")
@RequiredArgsConstructor
public class OqmSubscriberManager {
private final String ACKNOWLEDGE = "message acknowledged by client";
private final String NOT_ACKNOWLEDGE = "message not acknowledged by client";
//TODO should be externalized to application.properties
private static final List<OqmTopic> INTERESTED_TOPICS =
Stream.of("records-changed", "schema-changed", "status-changed", "legaltags_changed")
.map(topicName -> OqmTopic.builder().name(topicName).build()).collect(Collectors.toList());
private static final String INTERESTED_SUBSCRIPTIONS_PREFIX = "de-";
private static final OqmSubscriptionQuery INTERESTED_SUBSCRIPTIONS_QUERY = OqmSubscriptionQuery.builder()
.forAnyOfTopics(INTERESTED_TOPICS).namePrefix(INTERESTED_SUBSCRIPTIONS_PREFIX)
.subscriberable(true).build();
private final ITenantFactory tenantInfoFactory;
private final OqmDriver driver;
private final OqmNotificationHandler notificationHandler;
@PostConstruct
void postConstruct() {
log.info("OqmSubscriberManager bean constructed. Provisioning STARTED");
//Get all Tenant infos
for (TenantInfo tenantInfo : tenantInfoFactory.listTenantInfo()) {
log.info("* OqmSubscriberManager on provisioning tenant {}:", tenantInfo.getDataPartitionId());
//For every Tenant Destination get "subscriberable" Subscriptions
for (OqmSubscription subscription : getSubscriberableSubscriptions(tenantInfo)) {
log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}:", tenantInfo.getDataPartitionId(), subscription.getName());
//Register a Subscriber on every subscription
OqmDestination destination = getDestination(tenantInfo);
OqmMessageReceiver receiver = (oqmMessage, oqmAckReplier) -> {
String pubsubMessage = oqmMessage.getData();
String notificationId = subscription.getName();
Map<String, String> headerAttributes = oqmMessage.getAttributes();
HttpResponse response;
boolean ackedNacked = false;
try {
response = notificationHandler.notifySubscriber(notificationId, pubsubMessage, headerAttributes);
if (!response.isSuccessCode()) {
log.error(NOT_ACKNOWLEDGE + response.getBody());
} else {
log.debug(ACKNOWLEDGE);
oqmAckReplier.ack();
}
ackedNacked = true;
} catch (Exception e) {
log.debug(NOT_ACKNOWLEDGE, e);
}
if (!ackedNacked) oqmAckReplier.nack();
};
OqmSubscriber subscriber = OqmSubscriber.builder().subscription(subscription).messageReceiver(receiver).build();
driver.subscribe(subscriber, destination);
log.info("* * OqmSubscriberManager on provisioning for tenant {}, subscription {}: Subscriber REGISTERED.", tenantInfo.getDataPartitionId(), subscription.getName());
}
log.info("OqmSubscriberManager bean constructed. Provisioning COMPLETED");
}
}
public List<OqmSubscription> getSubscriberableSubscriptions(TenantInfo tenantInfo) {
return driver.listSubscriptions(null, INTERESTED_SUBSCRIPTIONS_QUERY, getDestination(tenantInfo));
}
private OqmDestination getDestination(TenantInfo tenantInfo) {
return OqmDestination.builder().partitionId(tenantInfo.getDataPartitionId()).build();
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
import org.opengroup.osdu.core.common.http.HttpClient;
import org.opengroup.osdu.core.common.http.HttpRequest;
import org.opengroup.osdu.core.common.http.HttpResponse;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Secret;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.notification.auth.factory.AuthFactory;
import org.opengroup.osdu.notification.auth.interfaces.SecretAuth;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
public class OqmNotificationHandler {
private final static Logger LOGGER = LoggerFactory.getLogger(OqmNotificationHandler.class);
@Autowired
private HttpClient httpClient;
@Autowired
private OqmSubscriptionHandler subscriptionHandler;
@Autowired
private AuthFactory authFactory;
@Value("${app.waitingTime:30000}")
private int WAITING_TIME;
public HttpResponse notifySubscriber(String notificationId, String pubsubMessage, Map<String, String> headerAttributes) throws Exception {
Subscription subscription = subscriptionHandler.getSubscriptionFromCache(notificationId, headerAttributes);
Secret secret = subscription.getSecret();
String endpoint = subscription.getPushEndpoint();
String secretType = secret.getSecretType();
String pushUrl;
// Authentication Secret
SecretAuth secretAuth = authFactory.getSecretAuth(secretType);
secretAuth.setSecret(secret);
pushUrl = secretAuth.getPushUrl(endpoint);
Map<String, String> requestHeader = secretAuth.getRequestHeaders();
requestHeader.put(DpsHeaders.CONTENT_TYPE, "application/json");
requestHeader.put(DpsHeaders.CORRELATION_ID, headerAttributes.get(DpsHeaders.CORRELATION_ID));
requestHeader.put(DpsHeaders.DATA_PARTITION_ID, headerAttributes.get(DpsHeaders.DATA_PARTITION_ID));
HttpRequest request = HttpRequest.post().url(pushUrl).headers(requestHeader).body(pubsubMessage).connectionTimeout(WAITING_TIME).build();
HttpResponse response = httpClient.send(request);
LOGGER.debug("Sending out notification to endpoint: " + endpoint);
return response;
}
}
package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
import lombok.extern.slf4j.Slf4j;
import org.opengroup.osdu.core.common.cryptographic.SignatureService;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import org.springframework.context.annotation.ScopedProxyMode;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import static org.springframework.beans.factory.config.BeanDefinition.SCOPE_SINGLETON;
/**
* @author Rostislav_Dublin
* @since 17.11.2021
*/
@Component
@Scope(SCOPE_SINGLETON)
@Primary
@Slf4j
public class OqmSignatureService extends SignatureService {
@PostConstruct
void postConstruct() {
log.info("OqmSignatureService bean constructed.");
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
import org.opengroup.osdu.core.common.cache.ICache;
import org.opengroup.osdu.core.common.cache.MultiTenantCache;
import org.opengroup.osdu.core.common.cache.VmCache;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.tenant.TenantInfo;
import org.opengroup.osdu.core.common.provider.interfaces.ITenantFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.util.Map;
@Component
public class OqmSubscriptionCacheFactory {
@Autowired
private ITenantFactory tenantFactory;
private MultiTenantCache<String> caches;
public OqmSubscriptionCacheFactory(@Value("${app.expireTime}") int expireTime, @Value("${app.maxCacheSize}") int maxCacheSize) {
this.caches = new MultiTenantCache<>(new VmCache<>(expireTime, maxCacheSize));
}
public void put(String key, String val, Map<String, String> headerAttributes) {
this.partitionCache(headerAttributes).put(key, val);
}
public String get(String key, Map<String, String> headerAttributes) {
return this.partitionCache(headerAttributes).get(key);
}
public void delete(String key, Map<String, String> headerAttributes) {
this.partitionCache(headerAttributes).delete(key);
}
public void clearAll(Map<String, String> headerAttributes) {
this.partitionCache(headerAttributes).clearAll();
}
private ICache<String, String> partitionCache(Map<String, String> headerAttributes) {
String tenantId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID);
TenantInfo tenantInfo = this.tenantFactory.getTenantInfo(tenantId);
if (tenantInfo == null) {
throw AppException.createUnauthorized(String.format("could not retrieve tenant info for data partition id: %s", tenantId));
}
return this.caches.get(String.format("%s:subscription", tenantInfo.getDataPartitionId()));
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.pubsub.tempForPoc;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.core.common.logging.JaxRsDpsLog;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.notification.Subscription;
import org.opengroup.osdu.core.common.notification.ISubscriptionFactory;
import org.opengroup.osdu.core.common.notification.ISubscriptionService;
import org.opengroup.osdu.core.common.notification.SubscriptionException;
import org.opengroup.osdu.core.common.util.IServiceAccountJwtClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Component
public class OqmSubscriptionHandler {
@Autowired
private ISubscriptionFactory subscriptionFactory;
@Autowired
private OqmSubscriptionCacheFactory subscriptionCacheFactory;
@Autowired
private JaxRsDpsLog log;
@Autowired
private IServiceAccountJwtClient serviceAccountJwtClient;
private static final Gson gson = new Gson();
private ObjectMapper objectMapper;
public Subscription getSubscriptionFromCache(String notificationId, Map<String, String> headerAttributes) {
String subscriptionString = subscriptionCacheFactory.get(notificationId, headerAttributes);
try {
if (Strings.isNullOrEmpty(subscriptionString))
subscriptionString = querySubscriptionAndUpdateCache(notificationId, headerAttributes);
ObjectMapper objectMapper = this.getObjectMapper();
return objectMapper.readValue(subscriptionString, Subscription.class);
} catch (IOException e) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error Parsing subscription String to object", "Unexpected error in pushing message", e);
} catch (SubscriptionException se) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Error query subscription from registration", "Unexpected error in pushing message", se);
}
}
private String querySubscriptionAndUpdateCache(String notificationId, Map<String, String> headerAttributes) throws AppException, SubscriptionException {
DpsHeaders headers = getDpsHeaders(headerAttributes);
ISubscriptionService service = subscriptionFactory.create(headers);
List<Subscription> subscriptionList = service.query(notificationId);
if (subscriptionList == null || subscriptionList.size() == 0) {
throw new AppException(HttpStatus.SC_NOT_FOUND, "Not found subscription for notificationId:" + notificationId, "Subscription not found");
}
Subscription subscription = subscriptionList.get(0);
String jsonSubscription = gson.toJson(subscription);
this.subscriptionCacheFactory.put(subscription.getNotificationId(), jsonSubscription, headerAttributes);
return jsonSubscription;
}
private DpsHeaders getDpsHeaders(Map<String, String> headerAttributes) {
Map<String, String> attributes = new HashMap<>(headerAttributes);
//extract headers from pubsub message
String dataPartitionId = headerAttributes.get(DpsHeaders.DATA_PARTITION_ID);
String authToken = this.serviceAccountJwtClient.getIdToken(dataPartitionId);
attributes.put(DpsHeaders.AUTHORIZATION, authToken);
return DpsHeaders.createFromMap(attributes);
}
//unit test purpose
protected ObjectMapper getObjectMapper() {
if (this.objectMapper == null) {
this.objectMapper = new ObjectMapper();
}
return this.objectMapper;
}
//unit test purpose
void setObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}
}
/*
* Copyright 2017-2020, Schlumberger
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opengroup.osdu.notification.provider.gcp.util;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
@Component
@Primary
public class GcpAppServiceConfig {
@Value("${GOOGLE_CLOUD_PROJECT}")
private String googleCloudProject;
public String getGoogleCloudProject() {
return googleCloudProject;
}
}
\ No newline at end of file
......@@ -18,7 +18,7 @@ package org.opengroup.osdu.notification.provider.gcp.util;
import lombok.SneakyThrows;
import org.apache.http.impl.client.CloseableHttpClient;
import org.opengroup.osdu.core.gcp.GoogleIdToken.IGoogleIdTokenFactory;
import org.opengroup.osdu.core.gcp.googleidtoken.IGoogleIdTokenFactory;
import org.opengroup.osdu.notification.provider.interfaces.IGoogleServiceAccount;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......
......@@ -19,8 +19,7 @@ package org.opengroup.osdu.notification.provider.gcp.util;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson.JacksonFactory;
import com.google.api.client.json.gson.GsonFactory;
import com.google.api.services.iam.v1.Iam;
import com.google.api.services.iam.v1.IamScopes;
import com.google.api.services.iam.v1.model.SignJwtRequest;
......@@ -59,7 +58,7 @@ public class ServiceAccountJwtGcpClientImpl implements IServiceAccountJwtClient
private AppProperties config;
private static final String JWT_AUDIENCE = "https://www.googleapis.com/oauth2/v4/token";
private static final String SERVICE_ACCOUNT_NAME_FORMAT = "projects/%s/serviceAccounts/%s";
private static final JsonFactory JSON_FACTORY = new JacksonFactory();
private static final GsonFactory JSON_FACTORY = new GsonFactory();
static final String INVALID_INPUT = "Invalid inputs provided to getIdToken function";
static final String INVALID_DATA_PARTITION = "Invalid data partition id";
......
......@@ -3,7 +3,7 @@
<include resource="org/springframework/boot/logging/logback/defaults.xml"/>
<property resource="application.properties" />
<logger name="org.opengroup.osdu" level="${LOG_LEVEL}"/>
<springProfile name="local">
<springProfile name="!local">
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%yellow([%thread]) %highlight(| %-5level |) %green(%d) %cyan(| %logger{15} |) %highlight(%msg) %n</pattern>
......@@ -15,7 +15,7 @@
</root>
</springProfile>
<springProfile name="!local">
<springProfile name="off">
<appender name="stdout" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.core.encoder.LayoutWrappingEncoder">
<layout class="ch.qos.logback.contrib.json.classic.JsonLayout">
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment