Commit a9e59442 authored by Stephen Nimmo's avatar Stephen Nimmo
Browse files

Merge branch '1-implement-topic-creation' into 'master'

Continue implementing the stream registration. It now works with the storage...

Closes #1

See merge request !8
parents 827da03b 4ae0321a
Pipeline #72145 failed with stages
in 3 minutes and 11 seconds
OSDU_AUTH_REFRESH_TOKEN=<need to login and obtain refresh token in the first place>
# change the lines below depending on the cloud provider
OSDU_AUTH_SERVER=login.microsoftonline.com
OSDU_AUTH_TOKEN_URL=/<tenant>/oauth2/v2.0/token
\ No newline at end of file
/target/
!.mvn/wrapper/maven-wrapper.jar
.env
application-local.properties
### STS ###
.apt_generated
......@@ -23,3 +25,10 @@
/dist/
/nbdist/
/.nb-gradle/
### VSCode
.vscode
### Python environment
.venv
__pycache__
\ No newline at end of file
### Java Spring template project
# Streaming Administration Service
This project is based on a GitLab [Project Template](https://docs.gitlab.com/ee/gitlab-basics/create-project.html).
## Summary
This service provides APIs to register, start and stop streams in OSDU data platform. It currently allows registration and manipulation of 3 types of streams:
Improvements can be proposed in the [original project](https://gitlab.com/gitlab-org/project-templates/spring).
* **source** - typically a Kafka producer that connects to external system in order to receive streaming data and ingest it into OSDU data platform (e.g. WITSML/ETP producer).
* **processor** - typically a Kafka Streams application that reads streaming data ingested into OSDU data platform, process it and route it to prepared topics for downstream applications consumption (e.g. aggregate or filter ingested data).
* **sink** - typically a Kafka consumer application that reads streaming data from a prepared topic and consumes it (e.g. visualize or persist in a database).
### CI/CD with Auto DevOps
## More details
This template is compatible with [Auto DevOps](https://docs.gitlab.com/ee/topics/autodevops/).
### API
[OpenAPI Specification](/docs/api/streaming.openapi.yaml)
If Auto DevOps is not already enabled for this project, you can [turn it on](https://docs.gitlab.com/ee/topics/autodevops/#enabling-auto-devops) in the project settings.
\ No newline at end of file
### Schema
[Stream Dataset schemas](/docs/data/schemas)
[Stream.Generic.1.0.0.json - record example](/docs/data/examples/Stream.Generic.1.0.0.json)
## Configure
Before you can run and test the service locally there are a few things that need to be configured:
1. Verify that the _local_ profile is configured as active for spring boot. Check that [application.properties](/src/main/resources/application.properties) has the following setting:
```
# Profile
spring.profiles.active=local
```
2. Create a new file in the same folder and name it _application-local.properties_ with the following settings:
```
# OSDU
osdu.storage.api=https://<service-host>/api/storage/v2
```
>Note: Do not check in your _application-local.properties_ files (or any other sensitive information) to gitlab!
>Note: replace _service-host_ with the proper value of the storage service that you are working with.
>Note: Stream.Generic.1.0.0 schema must be registered with the Schema service before you can start working with Streams
>> TODO: Implement schema registration helper!
## Build and Run
Once you are in the project root directory. Execute the following commands:
To build the executable jar file.
``` bash
mvn clean package
```
Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. The generation process is controlled by the maven plugin and is configured in the pom.xml (see [configuration options reference](https://openapi-generator.tech/docs/generators/spring))
To run the executable as a spring boot application
``` bash
mvn spring-boot:run
```
## Test
When you launch Spring Boot application it typically provides a few lines of the start up information. Make sure the following lines are there:
```
2021-09-30 00:06:31.339 DEBUG 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : Running with Spring Boot v2.5.5, Spring v5.3.10
2021-09-30 00:06:31.339 INFO 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : The following profiles are active: local
```
This indicates that your _local_ profile is activated.
```
2021-09-30 00:06:41.227 INFO 2067 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '/api/streaming/v1'
```
This indicates that your Tomcat has started and is available on the following link:
http://localhost:8080/api/streaming/v1
The embedded Swagger API documentation is also available on the following link:
http://localhost:8080/api/streaming/v1/swagger-ui/
Now you can use either Postman or [stream.helpers](/scripts) to test the API.
\ No newline at end of file
version: '3.8'
services:
zookeeper:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c", "bin/zookeeper-server-start.sh config/zookeeper.properties"
]
ports:
- "2181:2181"
environment:
LOG_DIR: /tmp/logs
kafka:
image: strimzi/kafka:0.20.0-kafka-2.6.0
command: [
"sh", "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}"
]
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
LOG_DIR: "/tmp/logs"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
\ No newline at end of file
openapi: 3.0.0
info:
version: 1.0.0
title: OSDU Streaming Services
license:
name: Apache 2.0
url: "http://www.apache.org/licenses/LICENSE-2.0"
servers:
- url: "http://localhost:8080/api/streaming/v1"
description: "Local development server"
tags:
- name: stream-admin-api
description: Stream Admin API
paths:
/streams:
get:
summary: "List all registered streams"
operationId: listAllStreams
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
responses:
"200":
description: OK
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/StreamRecord"
"401":
description: Unauthorized
"403":
description: Forbidden
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stream:
post:
summary: "Register a new stream"
operationId: createNewStream
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/StreamRecord"
responses:
"201":
description: Created
content:
application/json:
schema:
type: string
description: "ID of the stream created"
"401":
description: Unauthorized
"403":
description: Forbidden
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stream/{id}:
get:
summary: "Get stream definition by Id"
operationId: getStreamById
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
- name: id
in: path
description: "Stream ID"
required: true
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
schema:
type: string
responses:
"200":
description: OK
content:
application/json:
schema:
$ref: "#/components/schemas/StreamRecord"
"401":
description: Unauthorized
"403":
description: Forbidden
"404":
description: Not found
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
delete:
summary: "Delete stream definition by Id and decommision all associated compute resources"
operationId: deleteStreamById
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
- name: id
in: path
description: "Stream ID"
required: true
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
schema:
type: string
responses:
"204":
description: Stream deleted
"401":
description: Unauthorized
"403":
description: Forbidden
"404":
description: Not found
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stream/{id}/start:
get:
summary: "Start live streaming for the specified stream Id"
operationId: startStreamById
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
- name: id
in: path
description: "Stream ID"
required: true
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
schema:
type: string
responses:
"200":
description: Start command performed successfully
content:
application/json:
schema:
$ref: "#/components/schemas/StreamStatus"
"401":
description: Unauthorized
"403":
description: Forbidden
"404":
description: Not found
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stream/{id}/stop:
get:
summary: "Stop live streaming for the specified stream Id"
operationId: stopStreamById
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
- name: id
in: path
description: "Stream ID"
required: true
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
schema:
type: string
responses:
"200":
description: Stop command performed successfully
content:
application/json:
schema:
$ref: "#/components/schemas/StreamStatus"
"401":
description: Unauthorized
"403":
description: Forbidden
"404":
description: Not found
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
/stream/{id}/info:
get:
summary: "Get the current status of the stream by Id"
operationId: getStreamStatusById
tags:
- stream-admin-api
parameters:
- name: data-partition-id
in: header
description: "tenant"
required: true
example: "opendes"
schema:
type: string
- name: id
in: path
description: "Stream ID"
required: true
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
schema:
type: string
responses:
"200":
description: Info command performed successfully
content:
application/json:
schema:
$ref: "#/components/schemas/StreamStatus"
"401":
description: Unauthorized
"403":
description: Forbidden
"404":
description: Not found
default:
description: Unexpected error
content:
application/json:
schema:
$ref: "#/components/schemas/Error"
components:
schemas:
StreamRecord:
type: object
required:
- id
- kind
- data
- acl
- legal
- type
- namespace
properties:
id:
type: string
pattern: '^[\\w\\-\\.]+:dataset\\-\\-Stream.Generic:[\\w\\-\\.\\:\\%]+'
example: "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38"
description: "Unique identifier in whole Data Ecosystem. When not provided, Data Ecosystem will create and assign an id to the record."
kind:
type: string
example: "osdu:wks:dataset--Stream.Generic:1.0.0"
description: "Record kind for which the schema information is applied to."
data:
$ref: "#/components/schemas/StreamDataset"
namespace:
type: string
example: "opendes:osdu"
version:
type: integer
format: int64
example: 1605804087572060
acl:
$ref: "#/components/schemas/Acl"
legal:
$ref: "#/components/schemas/Legal"
tags:
$ref: "#/components/schemas/Tags"
meta:
type: array
items:
$ref: "#/components/schemas/Map"
ancestry:
$ref: "#/components/schemas/RecordAncestry"
StreamDataset:
type: object
properties:
ResourceSecurityClassification:
type: string
example: "opendes:reference-data--ResourceSecurityClassification:RESTRICTED:"
Source:
type: string
example: "Contoso Inc."
Name:
type: string
example: "Example Well Log Kafka Stream"
Description:
type: string
example: "This stream follows data for one Well Log and routes it from source to sink with no additional filtering"
DatasetProperties:
type: object
properties:
StreamType:
type: string
enum:
- source
- processor
- sink
description: 'Type of the stream. Enumeration: "source", "processor", "sink"'
example: "processor"
StreamDefinition:
$ref: "#/components/schemas/StreamDefinition"
ExtensionProperties:
type: object
StreamDefinition:
type: object
required:
- SubscribeIDs
properties:
SourceBindings:
type: array
items:
type: string
example: "opendes_wks_work-product-component--WellLog_1.0.0"
SinkBindings:
type: array
items:
type: string
example: "opendes_4dc4e8ec354e4953b6968fcb1d9d9f38_work-product-component--WellLog_1.0.0"
SubscribeIds:
type: array
items:
type: string
example: "opendes:work-product-component--WellLog:be54a691c0384182944d71c6b2b6f699"
Filter:
type: string
example: "TBD"
Aggregate:
type: string
example: "TBD"
StreamStatus:
type: object
properties:
Command:
type: string
enum:
- start
- stop
- info
Workers:
type: array
items:
type: object
properties:
id:
type: integer
state:
type: string
enum:
- running
- stopped
- failed
Acl:
type: object
properties:
owners:
type: array
items:
type: string
example: "data.stream-4dc4e8ec354e4953b6968fcb1d9d9f38.producer@opendes.contoso.com"
viewers:
type: array
items:
type: string
example: "data.stream-4dc4e8ec354e4953b6968fcb1d9d9f38.consumer@opendes.contoso.com"
Legal:
type: object
properties:
legaltags:
type: array
items:
type: string
example: "opendes-public-usa-dataset"
otherRelevantDataCountries:
type: array
items:
type: string
example: "US"
status:
type: string
enum:
- compliant
- incompliant
Tags:
type: object
additionalProperties:
type: object
Map:
type: object
additionalProperties:
type: object
RecordAncestry:
type: object
properties:
parents:
type: array
items:
type: string
Error:
type: object
required:
- code
- message
properties:
code:
type: integer
format: int32
message:
type: string
securitySchemes:
JWT:
type: http
scheme: bearer
bearerFormat: JWT
security:
- JWT:
- global
{
"id": "opendes:dataset--Stream.Generic:4dc4e8ec354e4953b6968fcb1d9d9f38",
"kind": "osdu:wks:dataset--Stream.Generic:1.0.0",
"acl": {
"owners": [
"data.stream-4dc4e8ec354e4953b6968fcb1d9d9f38.producers@opendes.contoso.com"
],
"viewers": [