Commit ed4edfb0 authored by Dmitry Kniazev's avatar Dmitry Kniazev
Browse files

Merge branch 'definition/create_stream' into 'develop'

Continue implementing the stream registration

See merge request !4
parents cf3a443d 2be21151
Pipeline #69405 failed with stages
in 3 minutes and 14 seconds
......@@ -31,3 +31,4 @@ application-local.properties
### Python environment
.venv
__pycache__
\ No newline at end of file
......@@ -48,7 +48,7 @@ To build the executable jar file.
``` bash
mvn clean package
```
Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree.
Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. The generation process is controlled by the maven plugin and is configured in the pom.xml (see [configuration options reference](https://openapi-generator.tech/docs/generators/spring))
To run the executable as a spring boot application
......
openapi: 3.0.0
info:
version: 1.0.0
title: OSDU Streaming Service
title: OSDU Streaming Services
license:
name: Apache 2.0
url: "http://www.apache.org/licenses/LICENSE-2.0"
servers:
- url: "https://localhost:8080/api/streaming/v1"
- url: "http://localhost:8080/api/streaming/v1"
description: "Local development server"
tags:
- name: stream-admin-api
description: Stream Admin API
......@@ -62,7 +63,7 @@ paths:
content:
application/json:
schema:
$ref: "#/components/schemas/StreamDataset"
$ref: "#/components/schemas/StreamRecord"
responses:
"201":
description: Created
......@@ -311,6 +312,8 @@ components:
$ref: "#/components/schemas/Acl"
legal:
$ref: "#/components/schemas/Legal"
tags:
$ref: "#/components/schemas/Tags"
meta:
type: array
items:
......@@ -341,10 +344,12 @@ components:
- source
- processor
- sink
description: 'Type of the stream. Enumeration: "SOURCE", "PROCESSOR", "SINK"'
description: 'Type of the stream. Enumeration: "source", "processor", "sink"'
example: "processor"
StreamDefinition:
$ref: "#/components/schemas/StreamDefinition"
ExtensionProperties:
type: object
StreamDefinition:
type: object
required:
......@@ -360,7 +365,7 @@ components:
items:
type: string
example: "opendes_4dc4e8ec354e4953b6968fcb1d9d9f38_work-product-component--WellLog_1.0.0"
SubscribeIDs:
SubscribeIds:
type: array
items:
type: string
......@@ -424,6 +429,10 @@ components:
enum:
- compliant
- incompliant
Tags:
type: object
additionalProperties:
type: object
Map:
type: object
additionalProperties:
......
......@@ -53,6 +53,11 @@
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
......@@ -132,6 +137,7 @@
<configOptions>
<delegatePattern>false</delegatePattern>
<interfaceOnly>true</interfaceOnly>
<useBeanValidation>false</useBeanValidation>
</configOptions>
</configuration>
</execution>
......
Jinja2==3.0.1
MarkupSafe==2.0.1
pkg-resources==0.0.0
python-dotenv==0.19.0
from http import HTTPStatus
import http.client
import json
import argparse
import os
from dotenv import load_dotenv
from jinja2 import Environment, FileSystemLoader, select_autoescape
env = Environment(
loader=FileSystemLoader(searchpath="./templates"),
autoescape=select_autoescape())
def refresh_token():
......@@ -43,7 +48,31 @@ def get_stream(stream_id):
conn.request(
"GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers)
res = conn.getresponse()
jsonString = res.read()
jsonString = res.read().decode()
print(jsonString)
def create_new_stream(stream_id, template_name):
# prepare the request body from template
template = env.get_template(f"{template_name}.json")
# prepare request headers
token = os.getenv('OSDU_API_TOKEN')
auth = f"Bearer {token}"
payload = template.render(record_id=stream_id)
headers = {'Content-Type': 'application/json',
'data-partition-id': 'opendes', 'Authorization': auth}
# make POST request to create a new stream
conn = http.client.HTTPConnection("localhost:8080")
conn.request(
"POST", f"/api/streaming/v1/stream", payload, headers)
res = conn.getresponse()
jsonString = res.read().decode()
if res.status == HTTPStatus.CREATED:
print("Stream successfully created - see full response below:")
print(jsonString)
else:
print(
f"Received the following status from server: {res.status}. See full response below:")
print(jsonString)
......@@ -56,11 +85,14 @@ parser.add_argument('command', type=str, choices=['refresh', 'get', 'post'],
help='command to execute')
parser.add_argument('--record-id', type=str, help='id of the stream record, e.g. opendes:dataset--Stream.Generic:12345',
default='opendes:dataset--Stream.Generic:12345')
parser.add_argument('--template', type=str, help='template for the stream record, e.g. source, processor, sink or your custom',
default='processor')
# main program here
args = parser.parse_args()
print(
f"Running helper [{args.helper}] with the following command [{args.command}]")
print(f"{args}")
print(f"{args._get_args}")
if args.helper == "token":
if args.command == "refresh":
......@@ -69,10 +101,9 @@ if args.helper == "token":
print(f"No such command for helper [{args.helper}]")
elif args.helper == "stream":
if args.command == "get":
if args.record_id:
get_stream(args.record_id)
else:
print(f"RECORD_ID cannot be empty with for the [{args.helper}]")
elif args.command == "post":
create_new_stream(args.record_id, args.template)
else:
print(f"No such command for helper [{args.helper}]")
else:
......
{
"id": "{{ record_id }}",
"kind": "opendes:wks:dataset--Stream.Generic:1.0.0",
"acl": {
"owners": [
"data.default.owners@opendes.contoso.com"
],
"viewers": [
"data.default.viewers@opendes.contoso.com"
]
},
"legal": {
"legaltags": [
"opendes-public-usa-dataset-epam"
],
"otherRelevantDataCountries": [
"US"
],
"status": "compliant"
},
"tags": {
"env": "dev-azure",
"auto-cleanup": "yes"
},
"ancestry": {
"parents": []
},
"meta": [],
"data": {
"ResourceSecurityClassification": "opendes:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Source": "Contoso Inc.",
"Name": "Test processor stream",
"Description": "This stream is simulating the Kafka App processor that filters data and routes to myapp topic",
"DatasetProperties": {
"StreamType": "processor",
"StreamDefinition": {
"SubscribeIDs": [
"opendes:work-product-component--WellLog:be54a691c0384182944d71c6b2b6f699"
],
"SourceBindings": [
"opendes_wks_work-product-component--WellLog_1.0.0"
],
"SinkBindings": [
"opendes_myapp_work-product-component--WellLog_1.0.0"
]
}
}
}
}
\ No newline at end of file
{
"id": "{{ record_id }}",
"kind": "opendes:wks:dataset--Stream.Generic:1.0.0",
"acl": {
"owners": [
"data.default.owners@opendes.contoso.com"
],
"viewers": [
"data.default.viewers@opendes.contoso.com"
]
},
"legal": {
"legaltags": [
"opendes-public-usa-dataset-epam"
],
"otherRelevantDataCountries": [
"US"
],
"status": "compliant"
},
"tags": {
"env": "dev-azure",
"auto-cleanup": "yes"
},
"ancestry": {
"parents": []
},
"meta": [],
"data": {
"ResourceSecurityClassification": "opendes:reference-data--ResourceSecurityClassification:RESTRICTED:",
"Source": "Contoso Inc.",
"Name": "Test sink stream",
"Description": "This stream is simulating the producer application that streams data into OSDU platform",
"DatasetProperties": {
"StreamType": "source",
"StreamDefinition": {
"SubscribeIDs": [
"opendes:work-product-component--WellLog:be54a691c0384182944d71c6b2b6f699"
],
"SourceBindings": [
"wss://remote-etp.server.com"
],
"SinkBindings": [
"opendes_wks_work-product-component--WellLog_1.0.0"
]
}
}
}
}
\ No newline at end of file
......@@ -15,7 +15,6 @@
package org.opengroup.osdu.streaming.api;
import org.opengroup.osdu.streaming.StreamApi;
import org.opengroup.osdu.streaming.model.StreamDataset;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.opengroup.osdu.streaming.service.StreamingAdminService;
import org.slf4j.Logger;
......@@ -48,10 +47,10 @@ public class StreamingAdminControllerImpl implements StreamApi {
private StreamingAdminService streamingAdminService;
@Override
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamDataset streamDataset) {
logger.debug("Creating a new stream: " + streamDataset.getName());
public ResponseEntity<String> createNewStream(String dataPartitionId, StreamRecord streamRecord) {
logger.debug("Creating a new stream: " + streamRecord.getData().getName());
String id = null;
id = this.streamingAdminService.createNewStream(streamDataset);
id = this.streamingAdminService.createNewStream(streamRecord);
if (id != null) {
return new ResponseEntity<String>(id, HttpStatus.CREATED);
} else {
......
......@@ -14,13 +14,12 @@
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamDataset;
import org.opengroup.osdu.streaming.model.StreamRecord;
public interface StreamingAdminService {
StreamRecord getStream(String streamRecordId);
String createNewStream(StreamDataset streamDataset);
String createNewStream(StreamRecord streamRecord);
}
......@@ -24,7 +24,6 @@ import org.opengroup.osdu.core.common.model.storage.StorageException;
import org.opengroup.osdu.core.common.model.storage.UpsertRecords;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.IStorageService;
import org.opengroup.osdu.streaming.model.StreamDataset;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -90,22 +89,30 @@ public class StreamingAdminServiceImpl implements StreamingAdminService {
// easier to expect all of these to be provided by the client, so that we can
// pass it to the storage svc without any changes...
@Override
public String createNewStream(StreamDataset streamDataset) {
public String createNewStream(StreamRecord streamRecord) {
UpsertRecords upRec = null;
try {
// get the instance of StorageService
logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString());
IStorageService storageService = this.storageFactory.create(headers);
// convert StreamRecord to Record
Record rec = new Record();
rec.setId("opendes:dataset--Stream.Kafka:12345");
rec.setKind("opendes:wks:dataset--Stream.Kafka:1.0.0");
logger.debug("Attempt to create a new record with:\nRecordId: " + rec.getId() + "\nRecordKind: "
+ rec.getKind());
String jsonRepr = new String();
jsonRepr = objectMapper.writeValueAsString(streamRecord);
logger.debug("Extracted record: \n" + jsonRepr);
rec = objectMapper.readValue(jsonRepr, Record.class);
// try inserting a new record to storage
upRec = storageService.upsertRecord(rec);
} catch (StorageException e) {
logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse());
} catch (JsonProcessingException e) {
logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString());
}
if (upRec != null && upRec.getRecordCount() > 0) {
return upRec.getRecordIds().get(0);
} else
......
package com.example.demo;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.boot.test.web.client.TestRestTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.junit4.SpringRunner;
import static org.assertj.core.api.Assertions.assertThat;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class DemoApplicationTests {
@Test
public void contextLoads() {
}
@Autowired
private TestRestTemplate restTemplate;
@Test
public void homeResponse() {
String body = this.restTemplate.getForObject("/", String.class);
assertThat(body).isEqualTo("Spring is here!");
}
}
package org.opengroup.osdu.streaming.api;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.web.reactive.server.WebTestClient;
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class StreamingAdminControllerTest {
final String uri = "/stream/";
final String stream_id = "opendes:dataset--Stream.Generic:12345";
// TODO add support for requests with teh valid tokens
@Autowired
private WebTestClient webTestClient;
// should fail since no required headers
@Test
void getStreamById_expectHttp400() {
webTestClient.get().uri(uri + stream_id).exchange().expectStatus().isBadRequest();
}
// should fail since no JWT token
@Test
void getStreamById_expectHttp401() {
webTestClient.get().uri(uri + stream_id).header("data-partition-id", "opendes").exchange().expectStatus()
.isUnauthorized();
}
// should fail since JWT token is invalid
@Test
void getStreamById_expectHttp403() {
webTestClient.get().uri(uri + stream_id).header("data-partition-id", "opendes")
.header("Authorization", "Bearer eyXYZ").exchange().expectStatus().isForbidden();
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment