Commit fee409d2 authored by Ronak Sakhuja's avatar Ronak Sakhuja
Browse files

Removed BulkExecutor

parent 6162db0e
Pipeline #46356 failed with stage
in 2 minutes
......@@ -6,13 +6,11 @@ import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.DocumentCollection;
import com.microsoft.azure.documentdb.bulkexecutor.DocumentBulkExecutor;
import com.microsoft.azure.documentdb.RetryOptions;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.PostConstruct;
import org.opengroup.osdu.azure.di.CosmosBulkRetryConfiguration;
import org.opengroup.osdu.azure.partition.PartitionInfoAzure;
import org.opengroup.osdu.azure.partition.PartitionServiceClient;
import org.opengroup.osdu.common.Validators;
......@@ -42,9 +40,6 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
@Autowired
private int documentClientMaxPoolSize;
@Autowired
private CosmosBulkRetryConfiguration cosmosBulkRetryConfiguration;
@Autowired
private int bulkExecutorMaxRUs;
......@@ -98,7 +93,6 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
DocumentClient client = getDocumentClient(pi.getCosmosEndpoint(),
pi.getCosmosPrimaryKey());
RetryOptions retryOptions = cosmosBulkRetryConfiguration.getRetryOptions();
String collectionLink = String.format(unformattedCollectionLink, cosmosDBName, collectionName);
DocumentCollection collection = client.readCollection(collectionLink, null).getResource();
DocumentBulkExecutor executor = DocumentBulkExecutor.builder().from(
......@@ -107,7 +101,7 @@ public class CosmosBulkExecutorFactoryImpl implements ICosmosBulkExecutorFactory
collectionName,
collection.getPartitionKey(),
bulkExecutorMaxRUs
).withInitializationRetryOptions(retryOptions).build();
).build();
// Set client retry options to 0 because retries are handled by DocumentBulkExecutor class.
client.getConnectionPolicy().getRetryOptions().setMaxRetryAttemptsOnThrottledRequests(0);
......
/** Copyright © Microsoft Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**/
package org.opengroup.osdu.azure.di;
import com.microsoft.azure.documentdb.RetryOptions;
import lombok.Getter;
import lombok.Setter;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* Class to set configuration for CosmosBulkExecutor.
*/
@Configuration
@ConfigurationProperties("azure.cosmosbulk")
@Getter
@Setter
public class CosmosBulkRetryConfiguration {
public static final String LOGGER_NAME = CosmosBulkRetryConfiguration.class.getName();
private RetryOptions defaultRetryOptions = new RetryOptions();
private int maxRetryAttemptsOnThrottledRequests = defaultRetryOptions.getMaxRetryAttemptsOnThrottledRequests();
private int maxRetryWaitTimeInSeconds = defaultRetryOptions.getMaxRetryWaitTimeInSeconds();
private long retryWithInitialBackoffTimeInMilliseconds = defaultRetryOptions.getRetryWithInitialBackoffTime();
private int retryWithBackoffMultiplier = defaultRetryOptions.getRetryWithBackoffMultiplier();
/**
* Create RetryOptions object based on application.properties configuration.
* @return object of RetryOptions
*/
public RetryOptions getRetryOptions() {
RetryOptions retryOptions = new RetryOptions();
retryOptions.setMaxRetryAttemptsOnThrottledRequests(maxRetryAttemptsOnThrottledRequests);
retryOptions.setMaxRetryWaitTimeInSeconds(maxRetryWaitTimeInSeconds);
retryOptions.setRetryWithInitialBackoffTime(retryWithInitialBackoffTimeInMilliseconds);
retryOptions.setRetryWithBackoffMultiplier(retryWithBackoffMultiplier);
CoreLoggerFactory.getInstance().getLogger(LOGGER_NAME).info("Retry Options on CosmosBulkExecutorClient with maxRetryAttempts = {} , MaxRetryWaitTime = {} , retryWithInitialBackOffTime = {} , retryWithBackoffMultiplier = {}", retryOptions.getMaxRetryAttemptsOnThrottledRequests(), retryOptions.getMaxRetryWaitTimeInSeconds(), retryOptions.getRetryWithInitialBackoffTime(), retryOptions.getRetryWithBackoffMultiplier());
return retryOptions;
}
}
/** Copyright © Microsoft Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**/
package org.opengroup.osdu.azure.di;
import com.microsoft.azure.documentdb.RetryOptions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opengroup.osdu.azure.logging.CoreLogger;
import org.opengroup.osdu.azure.logging.CoreLoggerFactory;
import java.lang.reflect.Field;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.openMocks;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(MockitoExtension.class)
public class CosmosBulkRetryConfigurationTest {
@Mock
private CoreLoggerFactory coreLoggerFactory;
@Mock
private CoreLogger coreLogger;
@InjectMocks
private CosmosBulkRetryConfiguration cosmosBulkRetryConfiguration;
/**
* Workaround for inability to mock static methods like getInstance().
*
* @param mock CoreLoggerFactory mock instance
*/
private void mockSingleton(CoreLoggerFactory mock) {
try {
Field instance = CoreLoggerFactory.class.getDeclaredField("instance");
instance.setAccessible(true);
instance.set(null, mock);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
/**
* Reset workaround for inability to mock static methods like getInstance().
*/
private void resetSingleton() {
try {
Field instance = CoreLoggerFactory.class.getDeclaredField("instance");
instance.setAccessible(true);
instance.set(null, null);
instance.setAccessible(false);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@BeforeEach
void setUp() {
openMocks(this);
mockSingleton(coreLoggerFactory);
when(coreLoggerFactory.getLogger(anyString())).thenReturn(coreLogger);
}
@AfterEach
public void takeDown() {
resetSingleton();
}
private RetryOptions defaultRetryOptions = new RetryOptions();
@Test
public void should_set_max_retry_attempts() {
cosmosBulkRetryConfiguration.setMaxRetryAttemptsOnThrottledRequests(30);
RetryOptions retryOptions = cosmosBulkRetryConfiguration.getRetryOptions();
assertEquals(retryOptions.getMaxRetryAttemptsOnThrottledRequests(),30);
assertEquals(cosmosBulkRetryConfiguration.getMaxRetryWaitTimeInSeconds(),defaultRetryOptions.getMaxRetryWaitTimeInSeconds());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithInitialBackoffTimeInMilliseconds(),defaultRetryOptions.getRetryWithInitialBackoffTime());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithBackoffMultiplier(),defaultRetryOptions.getRetryWithBackoffMultiplier());
}
@Test
public void should_set_max_retry_wait_timeout() {
cosmosBulkRetryConfiguration.setMaxRetryWaitTimeInSeconds(45);
RetryOptions retryOptions = cosmosBulkRetryConfiguration.getRetryOptions();
assertEquals(retryOptions.getMaxRetryAttemptsOnThrottledRequests(),defaultRetryOptions.getMaxRetryAttemptsOnThrottledRequests());
assertEquals(cosmosBulkRetryConfiguration.getMaxRetryWaitTimeInSeconds(),45);
assertEquals(cosmosBulkRetryConfiguration.getRetryWithInitialBackoffTimeInMilliseconds(),defaultRetryOptions.getRetryWithInitialBackoffTime());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithBackoffMultiplier(),defaultRetryOptions.getRetryWithBackoffMultiplier());
}
@Test
public void should_set_retry_with_initial_backoff() {
cosmosBulkRetryConfiguration.setRetryWithInitialBackoffTimeInMilliseconds(50);
RetryOptions retryOptions = cosmosBulkRetryConfiguration.getRetryOptions();
assertEquals(retryOptions.getMaxRetryAttemptsOnThrottledRequests(),defaultRetryOptions.getMaxRetryAttemptsOnThrottledRequests());
assertEquals(cosmosBulkRetryConfiguration.getMaxRetryWaitTimeInSeconds(),defaultRetryOptions.getMaxRetryWaitTimeInSeconds());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithInitialBackoffTimeInMilliseconds(),50);
assertEquals(cosmosBulkRetryConfiguration.getRetryWithBackoffMultiplier(),defaultRetryOptions.getRetryWithBackoffMultiplier());
}
@Test
public void should_set_retry_with_backoff_multiplier() {
cosmosBulkRetryConfiguration.setRetryWithBackoffMultiplier(3);
RetryOptions retryOptions = cosmosBulkRetryConfiguration.getRetryOptions();
assertEquals(retryOptions.getMaxRetryAttemptsOnThrottledRequests(),defaultRetryOptions.getMaxRetryAttemptsOnThrottledRequests());
assertEquals(cosmosBulkRetryConfiguration.getMaxRetryWaitTimeInSeconds(),defaultRetryOptions.getMaxRetryWaitTimeInSeconds());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithInitialBackoffTimeInMilliseconds(),defaultRetryOptions.getRetryWithInitialBackoffTime());
assertEquals(cosmosBulkRetryConfiguration.getRetryWithBackoffMultiplier(),3);
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment