Skip to content
GitLab
Menu
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
System
Lib
cloud
azure
OS Core Lib Azure
Commits
9fdce08d
Commit
9fdce08d
authored
Jul 29, 2020
by
harshit aggarwal
Browse files
Removing CosmosFacade
parent
b1542623
Pipeline
#4454
failed with stages
in 48 seconds
Changes
2
Pipelines
3
Hide whitespace changes
Inline
Side-by-side
src/main/java/org/opengroup/osdu/azure/CosmosFacade.java
deleted
100644 → 0
View file @
b1542623
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
org.opengroup.osdu.azure
;
import
com.azure.cosmos.CosmosClientException
;
import
com.azure.cosmos.CosmosContainer
;
import
com.azure.cosmos.CosmosItem
;
import
com.azure.cosmos.CosmosItemProperties
;
import
com.azure.cosmos.CosmosItemRequestOptions
;
import
com.azure.cosmos.FeedOptions
;
import
com.azure.cosmos.FeedResponse
;
import
com.azure.cosmos.NotFoundException
;
import
com.azure.cosmos.SqlQuerySpec
;
import
com.azure.cosmos.internal.AsyncDocumentClient
;
import
com.azure.cosmos.internal.Document
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
reactor.core.publisher.Flux
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Optional
;
import
java.util.logging.Level
;
import
java.util.logging.Logger
;
/**
* A simpler interface for interacting with CosmosDB.
* Usage Examples:
* <pre>
* {@code
* @Inject
* private CosmosContainer container;
*
* void findItemExample() {
* Optional<MyObject> myItem = CosmosFacade.findItem(container, "id", "partition-key", MyObject.class);
* myItem.isPresent(); // true if found, false otherwise
* }
*
* void findAllItemsExample() {
* List<MyObject> objects = CosmosFacade.findAllItems(container, MyObject.class);
* }
*
* void queryItemsExample() {
* SqlQuerySpec query = new SqlQuerySpec()
* .setQueryText("SELECT * FROM c WHERE c.isFoo = @isFoo")
* .setParameters(new SqlParameterList(new SqlParameter("@isFoo", true)));
* FeedOptions options = new FeedOptions().setEnableCrossPartitionQuery(true);
*
* List<MyObject> objects = CosmosFacade.queryItems(container, query, options, MyObject.class);
* }
* }
* </pre>
*/
public
final
class
CosmosFacade
{
private
static
final
Logger
LOGGER
=
Logger
.
getLogger
(
CosmosFacade
.
class
.
getName
());
/**
* Private constructor -- this class should never be instantiated.
*/
private
CosmosFacade
()
{
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
*/
public
static
void
deleteItem
(
final
CosmosContainer
cosmos
,
final
String
id
,
final
String
partitionKey
)
{
try
{
findItem
(
cosmos
,
id
,
partitionKey
).
delete
(
new
CosmosItemRequestOptions
(
partitionKey
));
}
catch
(
NotFoundException
e
)
{
String
errorMessage
=
"Item was unexpectedly not found"
;
LOGGER
.
log
(
Level
.
WARNING
,
errorMessage
,
e
);
throw
new
AppException
(
404
,
errorMessage
,
e
.
getMessage
(),
e
);
}
catch
(
CosmosClientException
e
)
{
String
errorMessage
=
"Unexpectedly failed to delete item from CosmosDB"
;
LOGGER
.
log
(
Level
.
WARNING
,
errorMessage
,
e
);
throw
new
AppException
(
500
,
errorMessage
,
e
.
getMessage
(),
e
);
}
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @param clazz Class to serialize results into
* @param <T> Type to return
* @return The item that was found based on the IDs provided
*/
public
static
<
T
>
Optional
<
T
>
findItem
(
final
CosmosContainer
cosmos
,
final
String
id
,
final
String
partitionKey
,
final
Class
<
T
>
clazz
)
{
try
{
T
item
=
findItem
(
cosmos
,
id
,
partitionKey
)
.
read
(
new
CosmosItemRequestOptions
(
partitionKey
))
.
getProperties
()
.
getObject
(
clazz
);
return
Optional
.
ofNullable
(
item
);
}
catch
(
NotFoundException
e
)
{
LOGGER
.
info
(
String
.
format
(
"Unable to find item with ID=%s and PK=%s"
,
id
,
partitionKey
));
return
Optional
.
empty
();
}
catch
(
IOException
e
)
{
LOGGER
.
warning
(
String
.
format
(
"Malformed document for item with ID=%s and PK=%s"
,
id
,
partitionKey
));
return
Optional
.
empty
();
}
catch
(
CosmosClientException
e
)
{
String
errorMessage
=
"Unexpectedly encountered error calling CosmosDB"
;
LOGGER
.
log
(
Level
.
WARNING
,
errorMessage
,
e
);
throw
new
AppException
(
500
,
errorMessage
,
e
.
getMessage
(),
e
);
}
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @return List of items found in container
*/
public
static
<
T
>
List
<
T
>
findAllItems
(
final
CosmosContainer
container
,
final
Class
<
T
>
clazz
)
{
FeedOptions
options
=
new
FeedOptions
().
setEnableCrossPartitionQuery
(
true
);
return
queryItems
(
container
,
new
SqlQuerySpec
(
"SELECT * FROM c"
),
options
,
clazz
);
}
/**
* @param container Container to query
* @param clazz Class type of response
* @param query {@link SqlQuerySpec} to execute
* @param options Query options
* @param <T> Type of response
* @return List of items found in container
*/
public
static
<
T
>
List
<
T
>
queryItems
(
final
CosmosContainer
container
,
final
SqlQuerySpec
query
,
final
FeedOptions
options
,
final
Class
<
T
>
clazz
)
{
ArrayList
<
T
>
results
=
new
ArrayList
<>();
Iterator
<
FeedResponse
<
CosmosItemProperties
>>
paginatedResponse
=
container
.
queryItems
(
query
,
options
);
while
(
paginatedResponse
.
hasNext
())
{
for
(
CosmosItemProperties
properties
:
paginatedResponse
.
next
().
getResults
())
{
try
{
results
.
add
(
properties
.
getObject
(
clazz
));
}
catch
(
IOException
e
)
{
String
errorMessage
=
String
.
format
(
"Malformed document for item with ID=%s"
,
properties
.
getId
());
LOGGER
.
log
(
Level
.
WARNING
,
errorMessage
,
e
);
throw
new
AppException
(
500
,
errorMessage
,
e
.
getMessage
(),
e
);
}
}
}
return
results
;
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @return List of items found on specific page in container
*/
public
static
<
T
>
List
<
T
>
findAllItems
(
final
AsyncDocumentClient
client
,
final
String
dbName
,
final
String
container
,
final
Class
<
T
>
clazz
,
final
short
pageSize
,
final
int
pageNum
)
{
return
queryItems
(
client
,
dbName
,
container
,
new
SqlQuerySpec
(
"SELECT * FROM c"
),
clazz
,
pageSize
,
pageNum
);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param pageNum Page number returned
* @return List of items found on specific page in container
*/
public
static
<
T
>
List
<
T
>
queryItems
(
final
AsyncDocumentClient
client
,
final
String
dbName
,
final
String
container
,
final
SqlQuerySpec
query
,
final
Class
<
T
>
clazz
,
final
short
pageSize
,
final
int
pageNum
)
{
String
continuationToken
=
null
;
int
currentPage
=
0
;
HashMap
<
String
,
List
<
T
>>
results
;
do
{
String
nextContinuationToken
=
""
;
results
=
returnItemsWithToken
(
client
,
dbName
,
container
,
query
,
clazz
,
pageSize
,
continuationToken
);
for
(
Map
.
Entry
<
String
,
List
<
T
>>
entry
:
results
.
entrySet
())
{
nextContinuationToken
=
entry
.
getKey
();
}
continuationToken
=
nextContinuationToken
;
currentPage
++;
}
while
(
currentPage
<
pageNum
&&
continuationToken
!=
null
);
return
results
.
get
(
continuationToken
);
}
/**
* @param client {@link AsyncDocumentClient} used to configure/execute requests against database service
* @param dbName Cosmos DB name
* @param container Container to query
* @param query {@link SqlQuerySpec} to execute
* @param clazz Class type of response
* @param <T> Type of response
* @param pageSize Number of items returned
* @param continuationToken Token used to continue the enumeration
* @return Continuation Token and list of documents in container
*/
private
static
<
T
>
HashMap
<
String
,
List
<
T
>>
returnItemsWithToken
(
final
AsyncDocumentClient
client
,
final
String
dbName
,
final
String
container
,
final
SqlQuerySpec
query
,
final
Class
<
T
>
clazz
,
final
short
pageSize
,
final
String
continuationToken
)
{
HashMap
<
String
,
List
<
T
>>
map
=
new
HashMap
<>();
List
<
T
>
items
=
new
ArrayList
<
T
>();
FeedOptions
feedOptions
=
new
FeedOptions
()
.
maxItemCount
((
int
)
pageSize
)
.
setEnableCrossPartitionQuery
(
true
)
.
requestContinuation
(
continuationToken
);
String
collectionLink
=
String
.
format
(
"/dbs/%s/colls/%s"
,
dbName
,
container
);
Flux
<
FeedResponse
<
Document
>>
queryFlux
=
client
.
queryDocuments
(
collectionLink
,
query
,
feedOptions
);
Iterator
<
FeedResponse
<
Document
>>
it
=
queryFlux
.
toIterable
().
iterator
();
FeedResponse
<
Document
>
page
=
it
.
next
();
List
<
Document
>
results
=
page
.
getResults
();
for
(
Document
doc
:
results
)
{
T
obj
=
doc
.
toObject
(
clazz
);
items
.
add
(
obj
);
}
map
.
put
(
page
.
getContinuationToken
(),
items
);
return
map
;
}
/**
* @param container Container to query
* @param item Data object to store
* @param <T> Type of response
*/
public
static
<
T
>
void
upsertItem
(
final
CosmosContainer
container
,
final
T
item
)
{
try
{
container
.
upsertItem
(
item
);
}
catch
(
CosmosClientException
e
)
{
String
errorMessage
=
"Unexpectedly failed to put item into CosmosDB"
;
LOGGER
.
log
(
Level
.
WARNING
,
errorMessage
,
e
);
throw
new
AppException
(
500
,
errorMessage
,
e
.
getMessage
(),
e
);
}
}
/**
* @param cosmos Container to query
* @param id ID of item
* @param partitionKey Partition key of item
* @return The item. It may not exist - the caller must check
*/
private
static
CosmosItem
findItem
(
final
CosmosContainer
cosmos
,
final
String
id
,
final
String
partitionKey
)
{
return
cosmos
.
getItem
(
id
,
partitionKey
);
}
}
src/test/java/org/opengroup/osdu/azure/CosmosFacadeTest.java
deleted
100644 → 0
View file @
b1542623
// Copyright © Microsoft Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package
org.opengroup.osdu.azure
;
import
com.azure.cosmos.*
;
import
com.azure.cosmos.internal.AsyncDocumentClient
;
import
com.azure.cosmos.internal.Document
;
import
org.junit.jupiter.api.BeforeEach
;
import
org.junit.jupiter.api.Test
;
import
org.junit.jupiter.api.extension.ExtendWith
;
import
org.mockito.ArgumentCaptor
;
import
org.mockito.Mock
;
import
org.mockito.junit.jupiter.MockitoExtension
;
import
org.opengroup.osdu.core.common.model.http.AppException
;
import
reactor.core.publisher.Flux
;
import
java.io.IOException
;
import
java.util.ArrayList
;
import
java.util.Collections
;
import
java.util.Iterator
;
import
java.util.List
;
import
static
org
.
junit
.
jupiter
.
api
.
Assertions
.*;
import
static
org
.
mockito
.
ArgumentMatchers
.
any
;
import
static
org
.
mockito
.
Mockito
.*;
@ExtendWith
(
MockitoExtension
.
class
)
class
CosmosFacadeTest
{
private
static
final
String
ID
=
"id"
;
private
static
final
String
PARTITION_KEY
=
"pk"
;
private
static
final
String
COSMOS_DB
=
"cosmosdb"
;
private
static
final
String
CONTAINER
=
"container"
;
private
static
final
String
COLLECTION_LINK
=
"/dbs/cosmosdb/colls/container"
;
@Mock
private
AsyncDocumentClient
documentClient
;
@Mock
private
CosmosContainer
container
;
@Mock
private
CosmosItem
cosmosItem
;
@Mock
private
CosmosItemProperties
cosmosItemProperties
;
@Mock
private
CosmosItemResponse
cosmosResponse
;
@Mock
private
Iterator
<
FeedResponse
<
CosmosItemProperties
>>
queryResponse
;
@BeforeEach
void
init
()
throws
CosmosClientException
{
// mock the common cosmos request/response pattern that most tests need. because
// not all tests will leverage these, we make the mocks lenient.
lenient
().
doReturn
(
cosmosItem
).
when
(
container
).
getItem
(
ID
,
PARTITION_KEY
);
lenient
().
doReturn
(
cosmosResponse
).
when
(
cosmosItem
).
read
(
any
());
lenient
().
doReturn
(
cosmosItemProperties
).
when
(
cosmosResponse
).
getProperties
();
}
@Test
void
delete_throws404_ifNotFound
()
throws
CosmosClientException
{
doThrow
(
NotFoundException
.
class
).
when
(
cosmosItem
).
delete
(
any
());
AppException
exception
=
assertThrows
(
AppException
.
class
,
()
->
{
CosmosFacade
.
deleteItem
(
container
,
ID
,
PARTITION_KEY
);
});
assertEquals
(
404
,
exception
.
getError
().
getCode
());
}
@Test
void
delete_throws500_ifUnknownError
()
throws
CosmosClientException
{
doThrow
(
CosmosClientException
.
class
).
when
(
cosmosItem
).
delete
(
any
());
AppException
exception
=
assertThrows
(
AppException
.
class
,
()
->
{
CosmosFacade
.
deleteItem
(
container
,
ID
,
PARTITION_KEY
);
});
assertEquals
(
500
,
exception
.
getError
().
getCode
());
}
@Test
void
findItem_returnsEmpty_ifNotFound
()
throws
CosmosClientException
{
doThrow
(
NotFoundException
.
class
).
when
(
cosmosItem
).
read
(
any
());
assertFalse
(
CosmosFacade
.
findItem
(
container
,
ID
,
PARTITION_KEY
,
String
.
class
).
isPresent
());
}
@Test
void
findItem_returnsEmpty_ifMalformedDocument
()
throws
IOException
{
doThrow
(
IOException
.
class
).
when
(
cosmosItemProperties
).
getObject
(
any
());
assertFalse
(
CosmosFacade
.
findItem
(
container
,
ID
,
PARTITION_KEY
,
String
.
class
).
isPresent
());
}
@Test
void
findItem_throws500_ifUnknownError
()
throws
CosmosClientException
{
doThrow
(
CosmosClientException
.
class
).
when
(
cosmosItem
).
read
(
any
());
AppException
exception
=
assertThrows
(
AppException
.
class
,
()
->
{
CosmosFacade
.
findItem
(
container
,
ID
,
PARTITION_KEY
,
String
.
class
);
});
assertEquals
(
500
,
exception
.
getError
().
getCode
());
}
@Test
void
upsertItem_throws500_ifUnknownError
()
throws
CosmosClientException
{
doThrow
(
CosmosClientException
.
class
).
when
(
container
).
upsertItem
(
any
());
AppException
exception
=
assertThrows
(
AppException
.
class
,
()
->
{
CosmosFacade
.
upsertItem
(
container
,
"some-data"
);
});
assertEquals
(
500
,
exception
.
getError
().
getCode
());
}
@Test
void
findAllItems_executesCorrectQuery
()
throws
IOException
{
mockQueryResponse
(
"s1"
);
CosmosFacade
.
findAllItems
(
container
,
String
.
class
);
ArgumentCaptor
<
SqlQuerySpec
>
query
=
ArgumentCaptor
.
forClass
(
SqlQuerySpec
.
class
);
ArgumentCaptor
<
FeedOptions
>
feedOptions
=
ArgumentCaptor
.
forClass
(
FeedOptions
.
class
);
verify
(
container
).
queryItems
(
query
.
capture
(),
feedOptions
.
capture
());
assertEquals
(
"SELECT * FROM c"
,
query
.
getValue
().
getQueryText
());
assertTrue
(
feedOptions
.
getValue
().
getEnableCrossPartitionQuery
());
}
@Test
void
findAllItems_pagesCorrectly
()
throws
IOException
{
mockQueryResponse
(
"s1"
,
"s2"
,
"s3"
);
List
<
String
>
results
=
CosmosFacade
.
findAllItems
(
container
,
String
.
class
);
assertEquals
(
3
,
results
.
size
());
assertTrue
(
results
.
contains
(
"s1"
));
assertTrue
(
results
.
contains
(
"s2"
));
assertTrue
(
results
.
contains
(
"s3"
));
}
@Test
void
findAllItems_byPageNumber
()
{
mockPaginatedQueryResponse
(
2
,
2
,
"s1"
,
"s2"
,
"s3"
,
"s4"
,
"s5"
);
List
<
String
>
results
=
CosmosFacade
.
findAllItems
(
documentClient
,
COSMOS_DB
,
CONTAINER
,
String
.
class
,
(
short
)
2
,
2
);
assertEquals
(
2
,
results
.
size
());
assertTrue
(
results
.
contains
(
"s3"
));
assertTrue
(
results
.
contains
(
"s4"
));
mockPaginatedQueryResponse
(
3
,
2
,
"T1"
,
"T2"
,
"T3"
,
"T4"
,
"T5"
);
results
=
CosmosFacade
.
findAllItems
(
documentClient
,
COSMOS_DB
,
CONTAINER
,
String
.
class
,
(
short
)
3
,
2
);
assertEquals
(
2
,
results
.
size
());
assertTrue
(
results
.
contains
(
"T4"
));
assertTrue
(
results
.
contains
(
"T5"
));
}
@Test
void
queryItems_byPageNumber
()
throws
IOException
{
mockPaginatedQueryResponse
(
3
,
1
,
"W1"
,
"W2"
,
"W3"
,
"W4"
,
"W5"
);
List
<
String
>
results
=
CosmosFacade
.
queryItems
(
documentClient
,
COSMOS_DB
,
CONTAINER
,
new
SqlQuerySpec
(
"SELECT * FROM c"
),
String
.
class
,
(
short
)
3
,
1
);
assertEquals
(
3
,
results
.
size
());
assertTrue
(
results
.
contains
(
"W1"
));
assertTrue
(
results
.
contains
(
"W2"
));
assertTrue
(
results
.
contains
(
"W3"
));
mockPaginatedQueryResponse
(
2
,
3
,
"Z1"
,
"Z2"
,
"Z3"
,
"Z4"
,
"Z5"
);
results
=
CosmosFacade
.
queryItems
(
documentClient
,
COSMOS_DB
,
CONTAINER
,
new
SqlQuerySpec
(
"SELECT * FROM c"
),
String
.
class
,
(
short
)
2
,
3
);
assertEquals
(
1
,
results
.
size
());
assertTrue
(
results
.
contains
(
"Z5"
));
}
private
void
mockQueryResponse
(
String
...
responses
)
throws
IOException
{
ArrayList
<
FeedResponse
<
CosmosItemProperties
>>
paginatedResponse
=
new
ArrayList
<>();
for
(
String
response
:
responses
)
{
@SuppressWarnings
(
"unchecked"
)
FeedResponse
<
CosmosItemProperties
>
pageResponse
=
(
FeedResponse
<
CosmosItemProperties
>)
mock
(
FeedResponse
.
class
);
CosmosItemProperties
properties
=
mock
(
CosmosItemProperties
.
class
);
doReturn
(
Collections
.
singletonList
(
properties
)).
when
(
pageResponse
).
getResults
();
doReturn
(
response
).
when
(
properties
).
getObject
(
any
());
paginatedResponse
.
add
(
pageResponse
);
}
doReturn
(
paginatedResponse
.
iterator
()).
when
(
container
).
queryItems
(
any
(
SqlQuerySpec
.
class
),
any
());
}
private
void
mockPaginatedQueryResponse
(
int
pageSize
,
int
pageNum
,
String
...
responses
)
{
List
<
Document
>
resp
=
new
ArrayList
<>();
FeedResponse
<
Document
>
pageResponse
=
(
FeedResponse
<
Document
>)
mock
(
FeedResponse
.
class
);
for
(
String
response
:
responses
)
{
Document
doc
=
mock
(
Document
.
class
);
resp
.
add
(
doc
);
lenient
().
doReturn
(
Collections
.
singletonList
(
doc
)).
when
(
pageResponse
).
getResults
();
lenient
().
doReturn
(
response
).
when
(
doc
).
toObject
(
any
());
}
when
(
pageResponse
.
getResults
()).
thenReturn
(
currentPage
(
resp
,
pageSize
,
pageNum
));
doReturn
(
Flux
.
just
(
pageResponse
))
.
when
(
documentClient
)
.
queryDocuments
(
eq
(
COLLECTION_LINK
),
any
((
SqlQuerySpec
.
class
)),
any
());
}
private
static
List
<
Document
>
currentPage
(
List
<
Document
>
dataList
,
int
pageSize
,
int
pageNum
)
{
List
<
Document
>
currentPageList
=
new
ArrayList
<>();
if
(
dataList
!=
null
&&
dataList
.
size
()
>
0
)
{
int
currIdx
=
(
pageNum
>
1
?
(
pageNum
-
1
)
*
pageSize
:
0
);
for
(
int
i
=
0
;
i
<
pageSize
&&
i
<
dataList
.
size
()
-
currIdx
;
i
++)
{
currentPageList
.
add
(
dataList
.
get
(
currIdx
+
i
));
}
}
return
currentPageList
;
}
}
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment