Commit e6bad024 authored by Aalekh Jain's avatar Aalekh Jain
Browse files

Merge branch 'master' of...

Merge branch 'master' of https://community.opengroup.org/osdu/platform/system/lib/cloud/azure/os-core-lib-azure into containerCreationAndDeletion
parents add609d0 aa76045d
Pipeline #32318 failed with stage
in 10 seconds
......@@ -20,7 +20,7 @@
<groupId>org.opengroup.osdu</groupId>
<artifactId>core-lib-azure</artifactId>
<packaging>jar</packaging>
<version>0.0.63</version>
<version>0.8.0-SNAPSHOT</version>
<name>core-lib-azure</name>
<properties>
......@@ -334,6 +334,4 @@
</plugin>
</plugins>
</build>
</project>
......@@ -20,12 +20,17 @@ import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.models.BlobCopyInfo;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.CopyStatusType;
import com.azure.storage.blob.sas.BlobContainerSasPermission;
import com.azure.storage.blob.sas.BlobSasPermission;
import com.azure.storage.blob.sas.BlobServiceSasSignatureValues;
import com.azure.storage.blob.specialized.BlockBlobClient;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.opengroup.osdu.azure.logging.DependencyPayload;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.helpers.MessageFormatter;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
......@@ -34,7 +39,6 @@ import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
/**
* A simpler interface to interact with Azure blob storage.
......@@ -78,12 +82,15 @@ import java.util.Collections;
* </pre>
*/
public class BlobStore {
private static final String LOGGER_NAME = BlobStore.class.getName();
private IBlobServiceClientFactory blobServiceClientFactory;
private ILogger logger;
/**
* Constructor to create BlobStore.
* @param factory Factory that provides blob client.
*
* @param factory Factory that provides blob client.
* @param loggerInstance logger instance to be used for logging.
*/
public BlobStore(final IBlobServiceClientFactory factory, final ILogger loggerInstance) {
......@@ -91,8 +98,6 @@ public class BlobStore {
this.logger = loggerInstance;
}
private static final String LOG_PREFIX = "azure-core-lib";
/**
* @param filePath Path of file to be read.
* @param dataPartitionId Data partition id
......@@ -105,19 +110,28 @@ public class BlobStore {
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayOutputStream downloadStream = new ByteArrayOutputStream()) {
blockBlobClient.download(downloadStream);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("{}", MessageFormatter.format("Done reading from {}", filePath).getMessage());
return downloadStream.toString(StandardCharsets.UTF_8.name());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to read specified blob", ex);
} catch (UnsupportedEncodingException ex) {
throw handleBlobStoreException(400, String.format("Encoding was not correct for item with name=%s", filePath), ex);
statusCode = HttpStatus.SC_BAD_REQUEST;
throw handleBlobStoreException(400, MessageFormatter.format("Encoding was not correct for item with name={}", filePath).getMessage(), ex);
} catch (IOException ex) {
throw handleBlobStoreException(500, String.format("Malformed document for item with name=%s", filePath), ex);
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("READ_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
......@@ -133,14 +147,22 @@ public class BlobStore {
final String containerName) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try {
blockBlobClient.delete();
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("{}", MessageFormatter.format("Done deleting blob at {}", filePath).getMessage());
return true;
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
if (ex.getErrorCode().equals(BlobErrorCode.BLOB_NOT_FOUND)) {
throw handleBlobStoreException(404, "Specified blob was not found", ex);
}
throw handleBlobStoreException(500, "Failed to delete blob", ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("DELETE_FROM_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
......@@ -159,12 +181,22 @@ public class BlobStore {
int bytesSize = bytes.length;
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
int statusCode = HttpStatus.SC_OK;
try (ByteArrayInputStream dataStream = new ByteArrayInputStream(bytes)) {
blockBlobClient.upload(dataStream, bytesSize, true);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("{}", MessageFormatter.format("Done uploading file content to %s", filePath).getMessage());
} catch (BlobStorageException ex) {
statusCode = ex.getStatusCode();
throw handleBlobStoreException(500, "Failed to upload file content.", ex);
} catch (IOException ex) {
throw handleBlobStoreException(500, String.format("Malformed document for item with name=%s", filePath), ex);
statusCode = HttpStatus.SC_INTERNAL_SERVER_ERROR;
throw handleBlobStoreException(500, MessageFormatter.format("Malformed document for item with name={}", filePath).getMessage(), ex);
} finally {
final long timeTaken = System.currentTimeMillis() - start;
final String dependencyData = MessageFormatter.format("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
logDependency("WRITE_TO_STORAGE_CONTAINER", dependencyData, dependencyData, timeTaken, String.valueOf(statusCode), statusCode == HttpStatus.SC_OK);
}
}
......@@ -234,10 +266,11 @@ public class BlobStore {
/**
* Generates pre-signed url to a blob container.
*
* @param dataPartitionId data partition id
* @param containerName Name of the storage container
* @param expiryTime Time after which the token expires
* @param permissions permissions for the given container
* @param containerName Name of the storage container
* @param expiryTime Time after which the token expires
* @param permissions permissions for the given container
* @return Generates pre-signed url for a given container
*/
public String generatePreSignedURL(final String dataPartitionId, final String containerName, final OffsetDateTime expiryTime, final BlobContainerSasPermission permissions) {
......@@ -247,6 +280,7 @@ public class BlobStore {
/**
* Method is used to copy a file specified at Source URL to the provided destination.
*
* @param dataPartitionId Data partition id
* @param filePath Path of file (blob) to which the file has to be copied
* @param containerName Name of the storage container
......@@ -254,34 +288,49 @@ public class BlobStore {
* @return Blob Copy Final Result.
*/
public BlobCopyInfo copyFile(final String dataPartitionId, final String filePath, final String containerName,
final String sourceUrl) {
final String sourceUrl) {
BlobContainerClient blobContainerClient = getBlobContainerClient(dataPartitionId, containerName);
BlockBlobClient blockBlobClient = blobContainerClient.getBlobClient(filePath).getBlockBlobClient();
final long start = System.currentTimeMillis();
SyncPoller<BlobCopyInfo, Void> result = blockBlobClient.beginCopy(sourceUrl, Duration.ofSeconds(1));
return result.waitForCompletion().getValue();
BlobCopyInfo blobCopyInfo = result.waitForCompletion().getValue();
final long timeTaken = System.currentTimeMillis() - start;
final String target = MessageFormatter.arrayFormat("{}:{}/{}", new String[]{dataPartitionId, containerName, filePath}).getMessage();
CopyStatusType status = blobCopyInfo == null ? CopyStatusType.FAILED : blobCopyInfo.getCopyStatus();
logDependency("COPY_FILE", sourceUrl, target, timeTaken, status.toString(), status == CopyStatusType.SUCCESS);
return blobCopyInfo;
}
/**
*
* @param blockBlobClient Blob client
* @param expiryTime Time after which SAS Token expires
* @param permissions Permissions for the given blob
* @param expiryTime Time after which SAS Token expires
* @param permissions Permissions for the given blob
* @return Generates SAS Token.
*/
private String generateSASToken(final BlockBlobClient blockBlobClient, final OffsetDateTime expiryTime, final BlobSasPermission permissions) {
BlobServiceSasSignatureValues blobServiceSasSignatureValues = new BlobServiceSasSignatureValues(expiryTime, permissions);
return blockBlobClient.generateSas(blobServiceSasSignatureValues);
final long start = System.currentTimeMillis();
String sasToken = blockBlobClient.generateSas(blobServiceSasSignatureValues);
final long timeTaken = System.currentTimeMillis() - start;
logDependency("GENERATE_SAS_TOKEN", blockBlobClient.getBlobName(), blockBlobClient.getBlobUrl(), timeTaken, String.valueOf(HttpStatus.SC_OK), true);
return sasToken;
}
/**
* @param client Container client
* @param expiryTime Time after which SAS Token expires
* @param client Container client
* @param expiryTime Time after which SAS Token expires
* @param permissions Permissions for the given container
* @return Generates SAS Token.
*/
private String generateSASToken(final BlobContainerClient client, final OffsetDateTime expiryTime, final BlobContainerSasPermission permissions) {
BlobServiceSasSignatureValues blobServiceSasSignatureValues = new BlobServiceSasSignatureValues(expiryTime, permissions);
return client.generateSas(blobServiceSasSignatureValues);
final long start = System.currentTimeMillis();
String sasToken = client.generateSas(blobServiceSasSignatureValues);
final long timeTaken = System.currentTimeMillis() - start;
logDependency("GENERATE_SAS_TOKEN", client.getBlobContainerName(), client.getBlobContainerUrl(), timeTaken, String.valueOf(HttpStatus.SC_OK), true);
return sasToken;
}
/**
......@@ -309,7 +358,25 @@ public class BlobStore {
* @return Instance of AppException
*/
private AppException handleBlobStoreException(final int status, final String errorMessage, final Exception ex) {
logger.warning(LOG_PREFIX, errorMessage, Collections.<String, String>emptyMap());
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).warn(MessageFormatter.format("{}", errorMessage).getMessage(), ex);
return new AppException(status, errorMessage, ex.getMessage(), ex);
}
/**
* Log dependency.
*
* @param name the name of the command initiated with this dependency call
* @param data the command initiated by this dependency call
* @param target the target of this dependency call
* @param timeTakenInMs the request duration in milliseconds
* @param resultCode the result code of the call
* @param success indication of successful or unsuccessful call
*/
private void logDependency(final String name, final String data, final String target, final long timeTakenInMs, final String resultCode, final boolean success) {
DependencyPayload payload = new DependencyPayload(name, data, Duration.ofMillis(timeTakenInMs), resultCode, success);
payload.setType("BlobStore");
payload.setTarget(target);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).logDependency(payload);
}
}
......@@ -18,9 +18,12 @@ import com.microsoft.azure.eventgrid.EventGridClient;
import com.microsoft.azure.eventgrid.TopicCredentials;
import com.microsoft.azure.eventgrid.implementation.EventGridClientImpl;
import org.opengroup.osdu.azure.cache.EventGridTopicClientCache;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -29,20 +32,23 @@ import org.springframework.stereotype.Component;
*/
@Component
public class EventGridTopicClientFactoryImpl implements IEventGridTopicClientFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(EventGridTopicClientFactoryImpl.class.getName());
@Autowired
private PartitionServiceClient partitionService;
private PartitionServiceEventGridClient partitionService;
@Autowired
private EventGridTopicClientCache clientCache;
/**
*
* @param dataPartitionId Data partition id
* @param topicName Topic Name
* @param topicName Topic name
* @return EventGridClient
* @throws PartitionException partitionException
*/
@Override
public EventGridClient getClient(final String dataPartitionId, final TopicName topicName) {
public EventGridClient getClient(final String dataPartitionId, final String topicName) throws PartitionException {
Validators.checkNotNullAndNotEmpty(dataPartitionId, "dataPartitionId");
Validators.checkNotNull(topicName, "topicName");
......@@ -50,16 +56,15 @@ public class EventGridTopicClientFactoryImpl implements IEventGridTopicClientFac
if (this.clientCache.containsKey(cacheKey)) {
return this.clientCache.get(cacheKey);
}
EventGridTopicPartitionInfoAzure eventGridTopicPartitionInfoAzure =
this.partitionService.getEventGridTopicInPartition(dataPartitionId, topicName);
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
TopicCredentials topicCredentials = null;
if (topicName == TopicName.RECORDS_CHANGED) {
topicCredentials = new TopicCredentials(pi.getEventGridRecordsTopicAccessKey());
}
TopicCredentials topicCredentials =
new TopicCredentials(eventGridTopicPartitionInfoAzure.getTopicAccessKey());
EventGridClient eventGridClient = new EventGridClientImpl(topicCredentials);
this.clientCache.put(cacheKey, eventGridClient);
return eventGridClient;
}
}
}
\ No newline at end of file
......@@ -16,10 +16,14 @@ package org.opengroup.osdu.azure.eventgrid;
import com.microsoft.azure.eventgrid.EventGridClient;
import com.microsoft.azure.eventgrid.models.EventGridEvent;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import lombok.SneakyThrows;
import org.opengroup.osdu.azure.cosmosdb.CosmosStoreBulkOperations;
import org.opengroup.osdu.azure.partition.EventGridTopicPartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceEventGridClient;
import org.opengroup.osdu.core.common.logging.ILogger;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
......@@ -55,33 +59,32 @@ import java.util.List;
@Component
public class EventGridTopicStore {
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosStoreBulkOperations.class.getName());
@Autowired
private IEventGridTopicClientFactory eventGridTopicClientFactory;
@Autowired
private ILogger logger;
@Autowired
private PartitionServiceClient partitionService;
private PartitionServiceEventGridClient eventGridPartitionClient;
/**
* @param dataPartitionId Data partition id
* @param topicName Topic name
* @param eventsList List of Event Grid Events
*/
public void publishToEventGridTopic(final String dataPartitionId, final TopicName topicName, final List<EventGridEvent> eventsList) {
PartitionInfoAzure pi = this.partitionService.getPartition(dataPartitionId);
@SneakyThrows
public void publishToEventGridTopic(final String dataPartitionId, final String topicName, final List<EventGridEvent> eventsList) {
EventGridTopicPartitionInfoAzure eventGridTopicPartitionInfoAzure = this.eventGridPartitionClient.getEventGridTopicInPartition(dataPartitionId, topicName);
String endpoint = "";
if (topicName == TopicName.RECORDS_CHANGED) {
try {
endpoint = String.format("https://%s/", new URI(pi.getEventGridRecordsTopicEndpoint()).getHost());
} catch (URISyntaxException e) {
throw new AppException(500, "Invalid Event Grid endpoint URI", "PartitionInfo for eventgrid-recordstopic " + pi.getEventGridRecordsTopicEndpoint(), e);
}
String endpoint;
try {
endpoint = String.format("https://%s/", new URI(eventGridTopicPartitionInfoAzure.getTopicName()).getHost());
} catch (URISyntaxException e) {
throw new AppException(500, "Invalid Event Grid endpoint URI", "PartitionInfo for Event Grid Topic " + topicName, e);
}
EventGridClient eventGridClient = eventGridTopicClientFactory.getClient(dataPartitionId, topicName);
eventGridClient.publishEvents(endpoint, eventsList);
}
}
......@@ -15,6 +15,7 @@
package org.opengroup.osdu.azure.eventgrid;
import com.microsoft.azure.eventgrid.EventGridClient;
import org.opengroup.osdu.core.common.partition.PartitionException;
/**
* Interface for Event Grid Topic client factory to return appropriate
......@@ -23,9 +24,11 @@ import com.microsoft.azure.eventgrid.EventGridClient;
public interface IEventGridTopicClientFactory {
/**
*
* @param dataPartitionId Data partition id
* @param topicName Topic name
* @return EventGridClient
* @throws PartitionException partitionException
*/
EventGridClient getClient(String dataPartitionId, TopicName topicName);
EventGridClient getClient(String dataPartitionId, String topicName) throws PartitionException;
}
......@@ -28,8 +28,8 @@ public final class DependencyPayload {
/**
* Instantiate payload with specified values.
*
* @param dependencyData the name of the command initiated with this dependency call
* @param dependencyName the command initiated by this dependency call
* @param dependencyName the name of the command initiated with this dependency call
* @param dependencyData the command initiated by this dependency call
* @param dependencyDuration the request duration
* @param dependencyResultCode the result code of the call
* @param dependencySuccess indication of successful or unsuccessful call
......
......@@ -12,12 +12,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.eventgrid;
package org.opengroup.osdu.azure.partition;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* Topic Name Enum.
* The Topic Stores and client are controlled by the same.
* Azure event grid topic data partition variables.
*/
public enum TopicName {
RECORDS_CHANGED
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class EventGridTopicPartitionInfoAzure {
private String topicName;
private String topicAccessKey;
}
......@@ -86,8 +86,8 @@ public class PartitionServiceClient {
* @return PartitionServiceClient
*/
private IPartitionProvider getServiceClient() {
this.headers.put(DpsHeaders.AUTHORIZATION, "Bearer " + this.tokenService.getAuthorizationToken());
return this.partitionFactory.create(headers);
DpsHeaders newHeaders = DpsHeaders.createFromMap(headers.getHeaders());
newHeaders.put(DpsHeaders.AUTHORIZATION, "Bearer " + tokenService.getAuthorizationToken());
return partitionFactory.create(newHeaders);
}
}
\ No newline at end of file
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.azure.partition;
import com.azure.security.keyvault.secrets.SecretClient;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import org.apache.http.HttpStatus;
import org.opengroup.osdu.azure.KeyVaultFacade;
import org.opengroup.osdu.azure.util.AzureServicePrincipleTokenService;
import org.opengroup.osdu.common.Validators;
import org.opengroup.osdu.core.common.model.http.AppException;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.partition.IPartitionFactory;
import org.opengroup.osdu.core.common.partition.IPartitionProvider;
import org.opengroup.osdu.core.common.partition.PartitionException;
import org.opengroup.osdu.core.common.partition.PartitionInfo;
import org.opengroup.osdu.core.common.partition.Property;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
/**
* Partition service client for Event Grid implementation.
*/
@Service
@Lazy
public class PartitionServiceEventGridClient {
private static final String ACCESS_KEY_REGEX = "^eventgrid-([a-zA-Z0-9]*)topic-accesskey$";
private static final String TOPIC_NAME_REGEX = "^eventgrid-([a-zA-Z0-9]*)topic$";
private static final Logger LOGGER = LoggerFactory.getLogger(PartitionServiceEventGridClient.class.getName());
private final Gson gson = new Gson();
@Autowired
private IPartitionFactory partitionFactory;
@Autowired
private SecretClient secretClient;
@Autowired
private AzureServicePrincipleTokenService tokenService;
@Autowired
private DpsHeaders headers;
/**
* Get TopicInfo for a given topic.
*
* @param partitionId partitionId
* @param topicName topicName
* @return EventGridTopicPartitionInfoAzure
* @throws AppException exception from the configuration
* @throws PartitionException AppException Exception thrown by {@link IPartitionFactory}
*/
public EventGridTopicPartitionInfoAzure getEventGridTopicInPartition(final String partitionId, final String topicName) throws AppException, PartitionException {
Validators.checkNotNullAndNotEmpty(partitionId, "partitionId");
Validators.checkNotNullAndNotEmpty(topicName, "topicName");
Map<String, EventGridTopicPartitionInfoAzure> eventGridTopicPartitionInfoAzure = getAllEventGridTopicsInPartition(partitionId);
if (!eventGridTopicPartitionInfoAzure.containsKey(topicName)) {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Invalid EventGrid Partition configuration for the partition " + partitionId, "Please refer to wiki here <>");
}
return eventGridTopicPartitionInfoAzure.get(topicName);
}
/**
* Get partition info.
*
* @param partitionId Partition Id
* @return Partition info
* @throws AppException Exception thrown by {@link IPartitionFactory}
* @throws PartitionException Exception thrown by {@link IPartitionFactory}
*/
Map<String, EventGridTopicPartitionInfoAzure> getAllEventGridTopicsInPartition(final String partitionId) throws AppException, PartitionException {
PartitionInfo partitionInfo = getPartitionInfo(partitionId);
Map<String, Property> propertyMap = partitionInfo.getProperties();
Map<String, EventGridTopicPartitionInfoAzure> topics = new HashMap<>();
for (Map.Entry<String, Property> property : propertyMap.entrySet()) {
if (isEventGridProperty(property)) {
StringTokenizer stringTokenizer = new StringTokenizer(property.getKey(), "-");
if (stringTokenizer.countTokens() == 2) {
addEventGridTopicName(topics, property, stringTokenizer);
} else if (stringTokenizer.countTokens() == 3) {
addEventGridAccessKey(topics, property, stringTokenizer);
} else {
throw new AppException(HttpStatus.SC_INTERNAL_SERVER_ERROR, "Invalid EventGrid Partition configuration for the partition " + partitionId, "Please reconfigure the partition service");
}
}
}
return topics;
}
/**
* @param partitionId partitionId
* @return PartitionInfo
* @throws PartitionException Exception thrown by {@link IPartitionFactory}
*/