Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in / Register
Toggle navigation
Menu
Open sidebar
Open Subsurface Data Universe Software
Platform
Data Flow
Real Time
Streams
Stream Admin Service
Commits
360ffc3f
Commit
360ffc3f
authored
Oct 28, 2021
by
Stephen Nimmo
Browse files
Cleaned up resources after test, fixed no envmap bug.
parent
854d7aa9
Pipeline
#74688
failed with stage
in 1 minute and 26 seconds
Changes
4
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
src/main/java/org/opengroup/osdu/streaming/service/DeploymentAdminService.java
View file @
360ffc3f
...
...
@@ -16,4 +16,6 @@ public interface DeploymentAdminService {
void
deleteStreamDeployment
(
StreamRecord
streamRecord
);
void
deleteAllStreamDeployments
();
}
src/main/java/org/opengroup/osdu/streaming/service/DeploymentAdminServiceImpl.java
View file @
360ffc3f
...
...
@@ -7,6 +7,8 @@ import io.kubernetes.client.openapi.models.*;
import
io.kubernetes.client.util.PatchUtils
;
import
org.opengroup.osdu.streaming.exception.StreamAdminException
;
import
org.opengroup.osdu.streaming.model.StreamRecord
;
import
org.slf4j.Logger
;
import
org.slf4j.LoggerFactory
;
import
org.springframework.beans.factory.annotation.Value
;
import
org.springframework.stereotype.Service
;
import
org.springframework.web.context.annotation.RequestScope
;
...
...
@@ -20,13 +22,18 @@ import java.util.stream.Collectors;
@RequestScope
public
class
DeploymentAdminServiceImpl
implements
DeploymentAdminService
{
private
static
final
String
PATCH_REPLICAS
=
"[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]"
;
private
static
final
Logger
LOGGER
=
LoggerFactory
.
getLogger
(
DeploymentAdminServiceImpl
.
class
)
;
private
AppsV1Api
appsV1Api
;
private
String
namespace
;
private
static
final
String
PATCH_REPLICAS
=
"[{\"op\":\"replace\",\"path\":\"/spec/replicas\",\"value\":%s}]"
;
private
static
final
String
DEPLOYMENT_NAME_SUFFIX
=
"-deployment"
;
private
static
final
String
CONTAINER_NAME_SUFFIX
=
"-container"
;
private
static
final
String
SELECTOR_MATCH_LABEL_KEY
=
"run"
;
private
static
final
String
LABEL_TYPE_NAME
=
"osdu-streams-type"
;
private
static
final
String
LABEL_TYPE_VALUE
=
"osdu-streams-deployment"
;
private
static
final
String
LABEL_SELECTOR
=
String
.
format
(
"%s=%s"
,
LABEL_TYPE_NAME
,
LABEL_TYPE_VALUE
);
private
AppsV1Api
appsV1Api
;
private
String
namespace
;
public
DeploymentAdminServiceImpl
(
@Value
(
"${deployment.namespace}"
)
String
namespace
,
AppsV1Api
appsV1Api
)
{
this
.
namespace
=
namespace
;
...
...
@@ -59,6 +66,7 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
.
withNewMetadata
()
.
withName
(
deploymentName
)
.
withNamespace
(
this
.
namespace
)
.
withLabels
(
Collections
.
singletonMap
(
LABEL_TYPE_NAME
,
LABEL_TYPE_VALUE
))
.
endMetadata
()
.
withNewSpec
()
.
withReplicas
(
0
)
...
...
@@ -110,7 +118,7 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
}
private
V1Deployment
patchReplicas
(
String
deploymentName
,
int
replicas
)
{
String
patchString
=
String
.
format
(
PATCH_REPLICAS
,
1
);
String
patchString
=
String
.
format
(
PATCH_REPLICAS
,
replicas
);
try
{
return
PatchUtils
.
patch
(
V1Deployment
.
class
,
...
...
@@ -142,9 +150,17 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
}
}
public
void
deleteAllStreamDeployments
()
{
try
{
appsV1Api
.
deleteCollectionNamespacedDeployment
(
this
.
namespace
,
null
,
null
,
null
,
null
,
null
,
LABEL_SELECTOR
,
null
,
null
,
null
,
null
,
null
,
null
,
null
);
}
catch
(
ApiException
e
)
{
throw
new
StreamAdminException
(
e
);
}
}
private
Optional
<
V1Deployment
>
findExistingDeployment
(
String
deploymentName
)
{
try
{
V1DeploymentList
v1DeploymentList
=
appsV1Api
.
listNamespacedDeployment
(
this
.
namespace
,
null
,
null
,
null
,
null
,
null
,
null
,
null
,
null
,
null
,
null
);
V1DeploymentList
v1DeploymentList
=
appsV1Api
.
listNamespacedDeployment
(
this
.
namespace
,
null
,
null
,
null
,
null
,
LABEL_SELECTOR
,
null
,
null
,
null
,
null
,
null
);
return
v1DeploymentList
.
getItems
().
stream
().
filter
(
v1Deployment
->
v1Deployment
.
getMetadata
().
getName
().
equals
(
deploymentName
)).
findFirst
();
}
catch
(
ApiException
e
)
{
throw
new
StreamAdminException
(
e
);
...
...
@@ -197,7 +213,7 @@ public class DeploymentAdminServiceImpl implements DeploymentAdminService {
private
List
<
V1EnvVar
>
getEnvVar
(
StreamRecord
streamRecord
)
{
Map
<
String
,
String
>
envMap
=
streamRecord
.
getData
().
getDatasetProperties
().
getExtensionProperties
().
getStreamDeployment
().
getEnv
();
if
(
envMap
.
isEmpty
())
{
if
(
Objects
.
isNull
(
envMap
)
||
envMap
.
isEmpty
())
{
return
Collections
.
emptyList
();
}
return
envMap
.
entrySet
().
stream
().
map
(
entry
->
...
...
src/main/java/org/opengroup/osdu/streaming/service/StreamDeploymentStatus.java
View file @
360ffc3f
...
...
@@ -11,6 +11,7 @@ public class StreamDeploymentStatus {
private
String
deploymentName
;
private
String
namespace
;
//TODO This isn't really the status, but rather a reflection of the replicas count.
private
Status
status
;
private
LocalDateTime
dateTime
;
...
...
src/test/java/org/opengroup/osdu/streaming/service/DeploymentAdminServiceTest.java
View file @
360ffc3f
package
org.opengroup.osdu.streaming.service
;
import
org.junit.jupiter.api.AfterEach
;
import
org.junit.jupiter.api.Test
;
import
org.opengroup.osdu.streaming.model.*
;
import
org.springframework.beans.factory.annotation.Autowired
;
import
org.springframework.boot.test.context.SpringBootTest
;
import
java.util.Collections
;
import
java.util.Optional
;
import
java.util.UUID
;
import
java.util.*
;
import
static
org
.
assertj
.
core
.
api
.
Assertions
.
assertThat
;
...
...
@@ -17,6 +16,19 @@ public class DeploymentAdminServiceTest {
@Autowired
private
DeploymentAdminService
deploymentAdminService
;
@AfterEach
public
void
clean
()
{
deploymentAdminService
.
deleteAllStreamDeployments
();
}
@Test
public
void
createStreamDeploymentNoEnv
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamDeployment
(
false
);
Optional
<
StreamDeploymentStatus
>
streamDeploymentStatus
=
deploymentAdminService
.
findStreamDeploymentStatus
(
streamRecord
);
assertThat
(
streamDeploymentStatus
.
isPresent
()).
isTrue
();
assertThat
(
streamDeploymentStatus
.
get
().
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
STOPPED
);
}
@Test
public
void
getStreamDeploymentStatusNotDeployed
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamRecord
();
...
...
@@ -25,40 +37,52 @@ public class DeploymentAdminServiceTest {
}
@Test
public
void
createStreamDeploymentAndStartStreamDeploymentAndDelete
()
{
StreamRecord
streamRecord
=
null
;
try
{
streamRecord
=
this
.
createRandomStreamDeployment
();
StreamDeploymentStatus
streamDeploymentStatus
=
deploymentAdminService
.
startStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
RUNNING
);
}
finally
{
deploymentAdminService
.
deleteStreamDeployment
(
streamRecord
);
}
public
void
createAndStartStreamDeployment
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamDeployment
();
StreamDeploymentStatus
streamDeploymentStatus
=
deploymentAdminService
.
startStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
RUNNING
);
}
@Test
public
void
createStreamDeploymentAndStopStreamDeploymentAndDelete
()
{
StreamRecord
streamRecord
=
null
;
try
{
streamRecord
=
this
.
createRandomStreamDeployment
();
StreamDeploymentStatus
streamDeploymentStatus
=
deploymentAdminService
.
stopStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
STOPPED
);
}
finally
{
deploymentAdminService
.
deleteStreamDeployment
(
streamRecord
);
}
public
void
createAndStopStreamDeployment
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamDeployment
();
StreamDeploymentStatus
streamDeploymentStatus
=
deploymentAdminService
.
startStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
RUNNING
);
streamDeploymentStatus
=
deploymentAdminService
.
stopStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
STOPPED
);
}
@Test
public
void
createStreamDeploymentAndDelete
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamDeployment
();
Optional
<
StreamDeploymentStatus
>
streamDeploymentStatus
=
deploymentAdminService
.
findStreamDeploymentStatus
(
streamRecord
);
assertThat
(
streamDeploymentStatus
.
isPresent
()).
isTrue
();
assertThat
(
streamDeploymentStatus
.
get
().
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
STOPPED
);
this
.
deploymentAdminService
.
deleteStreamDeployment
(
streamRecord
);
streamDeploymentStatus
=
deploymentAdminService
.
findStreamDeploymentStatus
(
streamRecord
);
assertThat
(
streamDeploymentStatus
.
isPresent
()).
isFalse
();
}
private
StreamRecord
createRandomStreamDeployment
()
{
StreamRecord
streamRecord
=
this
.
createRandomStreamRecord
();
return
this
.
createRandomStreamDeployment
(
true
);
}
private
StreamRecord
createRandomStreamDeployment
(
boolean
addEnv
)
{
StreamRecord
streamRecord
=
this
.
createRandomStreamRecord
(
addEnv
);
StreamDeploymentStatus
streamDeploymentStatus
=
deploymentAdminService
.
createStreamDeployment
(
streamRecord
);
assertThat
(
streamDeploymentStatus
).
isNotNull
();
assertThat
(
streamDeploymentStatus
.
getStatus
()).
isEqualTo
(
StreamDeploymentStatus
.
Status
.
STOPPED
);
return
streamRecord
;
}
private
StreamRecord
createRandomStreamRecord
()
{
private
StreamRecord
createRandomStreamRecord
(){
return
this
.
createRandomStreamRecord
(
true
);
}
private
StreamRecord
createRandomStreamRecord
(
boolean
addEnv
)
{
StreamRecord
streamRecord
=
new
StreamRecord
();
streamRecord
.
setId
(
this
.
createRandomId
());
streamRecord
.
setKind
(
UUID
.
randomUUID
().
toString
());
...
...
@@ -67,8 +91,10 @@ public class DeploymentAdminServiceTest {
StreamDatasetDatasetProperties
streamDatasetDatasetProperties
=
new
StreamDatasetDatasetProperties
();
StreamDatasetDatasetPropertiesExtensionProperties
streamDatasetDatasetPropertiesExtensionProperties
=
new
StreamDatasetDatasetPropertiesExtensionProperties
();
StreamDatasetDatasetPropertiesExtensionPropertiesStreamDeployment
deployment
=
new
StreamDatasetDatasetPropertiesExtensionPropertiesStreamDeployment
();
deployment
.
setImage
(
"busybox"
);
deployment
.
setEnv
(
Collections
.
singletonMap
(
"ENV1"
,
"ENV1_VALUE"
));
deployment
.
setImage
(
"nginx:latest"
);
if
(
addEnv
)
{
deployment
.
setEnv
(
Collections
.
singletonMap
(
"ENV1"
,
"ENV1_VALUE"
));
}
streamDatasetDatasetPropertiesExtensionProperties
.
setStreamDeployment
(
deployment
);
streamDatasetDatasetProperties
.
setExtensionProperties
(
streamDatasetDatasetPropertiesExtensionProperties
);
streamDataset
.
setDatasetProperties
(
streamDatasetDatasetProperties
);
...
...
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment