Commit cf3a443d authored by Dmitry Kniazev's avatar Dmitry Kniazev

Merge branch 'definition/create_stream' into 'develop'

Initial implementation of stream creation

See merge request !3
parents 95229da6 c4cf31ff
Pipeline #68949 failed with stages in 2 minutes and 51 seconds
OSDU_AUTH_REFRESH_TOKEN=<need to login and obtain refresh token in the first place>
# change the lines below depending on the cloud provider
OSDU_AUTH_SERVER=login.microsoftonline.com
OSDU_AUTH_TOKEN_URL=/<tenant>/oauth2/v2.0/token
/target/
!.mvn/wrapper/maven-wrapper.jar
.env
application-local.properties
### STS ###
.apt_generated
/.nb-gradle/
### VSCode
.vscode
### Python environment
.venv
# Streaming Administration Service
## Introduction
This service provides APIs to configure streams in the OSDU data platform.
## Summary
This service provides APIs to register, start, and stop streams in the OSDU data platform. It currently allows registration and manipulation of three types of streams:
* **source** - typically a Kafka producer that connects to an external system in order to receive streaming data and ingest it into the OSDU data platform (e.g. a WITSML/ETP producer).
* **processor** - typically a Kafka Streams application that reads streaming data ingested into the OSDU data platform, processes it, and routes it to prepared topics for consumption by downstream applications (e.g. aggregating or filtering ingested data).
* **sink** - typically a Kafka consumer application that reads streaming data from a prepared topic and consumes it (e.g. visualizes it or persists it in a database).
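To make the distinction concrete, a registered stream record might carry its type alongside its dataset metadata. The sketch below is purely illustrative; the key names are assumptions and are NOT taken from the actual Stream.Generic.1.0.0 schema:

```python
# Hypothetical shape of a stream registration payload; the keys below are
# illustrative only and do NOT reflect the real Stream.Generic.1.0.0 schema.
example_stream = {
    "kind": "osdu:wks:dataset--Stream.Generic:1.0.0",
    "data": {
        "streamType": "processor",  # one of: source | processor | sink
        "description": "Aggregates and filters ingested streaming data",
    },
}

# sanity check on the illustrative record
assert example_stream["data"]["streamType"] in {"source", "processor", "sink"}
```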
## More details
### API
[OpenAPI Specification](/docs/api/streaming.openapi.yaml)
### Schema
[Stream Dataset schemas](/docs/data/schemas)
[Stream.Generic.1.0.0.json - record example](/docs/data/examples/Stream.Generic.1.0.0.json)
## Configure
Before you can run and test the service locally there are a few things that need to be configured:
1. Verify that the _local_ profile is configured as active for spring boot. Check that [application.properties](/src/main/resources/application.properties) has the following setting:
```
# Profile
spring.profiles.active=local
```
2. Create a new file in the same folder and name it _application-local.properties_ with the following settings:
```
# OSDU
osdu.storage.api=https://<service-host>/api/storage/v2
```
>Note: Do not check in your _application-local.properties_ file (or any other sensitive information) to GitLab!
>Note: Replace _service-host_ with the proper value for the storage service that you are working with.
>Note: The Stream.Generic.1.0.0 schema must be registered with the Schema service before you can start working with Streams.
>> TODO: Implement schema registration helper!
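Until that helper exists, the registration request could be assembled along the following lines. This is a sketch only: the `/api/schema-service/v1/schema` path and the payload layout are assumptions to be verified against your deployment's Schema service documentation.

```python
import json

def build_schema_request(token: str, partition: str, schema: dict):
    """Build the pieces of a hypothetical Schema service registration call.

    The path below is an assumption; verify it against your deployment's
    Schema service API before using it.
    """
    path = "/api/schema-service/v1/schema"
    headers = {
        "Authorization": f"Bearer {token}",
        "data-partition-id": partition,
        "Content-Type": "application/json",
    }
    return path, headers, json.dumps(schema)

# Sending it would then look roughly like (not executed here):
#   conn = http.client.HTTPSConnection("<service-host>")
#   conn.request("POST", path, body, headers)
```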
## Build and Run
Once you are in the project root directory, execute the following commands.
To build the executable jar file:
``` bash
mvn clean package
```
Executing the above command also generates the controller interfaces based on the streaming API spec (OpenAPI format). The spec is part of the docs folder under the source tree.
To run the executable as a Spring Boot application:
``` bash
mvn spring-boot:run
```
## Test
When you launch the Spring Boot application, it prints a few lines of startup information. Make sure the following lines are there:
```
2021-09-30 00:06:31.339 DEBUG 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : Running with Spring Boot v2.5.5, Spring v5.3.10
2021-09-30 00:06:31.339 INFO 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : The following profiles are active: local
```
This indicates that your _local_ profile is activated.
```
2021-09-30 00:06:41.227 INFO 2067 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '/api/streaming/v1'
```
This indicates that Tomcat has started and the service is available on the following link:
http://localhost:8080/api/streaming/v1
The embedded Swagger API documentation is also available on the following link:
http://localhost:8080/api/streaming/v1/swagger-ui/
Now you can use either Postman or [stream.helpers](/scripts) to test the API.
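If you prefer a quick scripted check, the GET request that Postman or the stream helper performs can be sketched in plain Python (the record id below is the example value used by `stream.helpers`; `fetch_stream` assumes the service is running locally and `OSDU_API_TOKEN` is exported):

```python
import http.client
import os

def stream_path(record_id):
    """Build the GET path under the service context path shown above."""
    return f"/api/streaming/v1/stream/{record_id}"

def fetch_stream(record_id):
    """Call the locally running service and return (status, body)."""
    conn = http.client.HTTPConnection("localhost:8080")
    headers = {
        "data-partition-id": "opendes",
        "Authorization": f"Bearer {os.getenv('OSDU_API_TOKEN', '')}",
    }
    conn.request("GET", stream_path(record_id), headers=headers)
    resp = conn.getresponse()
    return resp.status, resp.read()
```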
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath/>
<!-- lookup parent from repository -->
</parent>
# STREAM.HELPERS
**stream.helpers** is a collection of Python functions, available from the CLI, that help automate certain tasks during development, such as:
* Refreshing access token for the environment
* Creating a new stream using POST request
* Fetching an existing stream using GET request
It is essentially an alternative to Postman, so feel free to use whatever works best for you. I created it because I want to use it in some of my future integration tests, and to share some of the environment variables I'm using to create the service itself. The script loads environment variables from the `.env` file and uses them to perform the other actions.
## Prepare Python environment
Install or upgrade to Python 3.7 (other Python versions should work as long as they are 3.7+):
```bash
sudo apt-get update
sudo apt install python3.7
sudo apt install python3.7-venv
```
The last command installs the additional package needed to use a virtual environment for our scripts. Now navigate to the root folder of your project (`/stream-admin-service`) and create the new Python environment:
```bash
python3 -m venv .venv/stream.helpers
```
Next, activate the environment:
```bash
source .venv/stream.helpers/bin/activate
```
You will now see that the environment is activated because your command line is prefixed with `stream.helpers`; any packages you install into this environment will NOT affect the rest of your system. You can deactivate it and go back to your OS's Python by running the following command:
```bash
deactivate
```
Once activated, navigate to the scripts folder and install all the dependencies for the scripts:
```bash
cd scripts/
pip install -r requirements.txt
```
## Explore arguments
Run the script to see if everything is ok and to display the options:
```bash
python stream.helpers.py -h
```
This should display the help info for the script.
## Create environment variables
Next, you need to create `.env` file in the project root folder. This file will contain the environment variables that will be loaded during the script startup. Navigate to the project root folder and create the `.env` file using the example file provided:
```bash
cd ..
cp .env.example .env
```
After that, open `.env` file in your favourite editor and update the values for the environment variables:
```txt
OSDU_AUTH_REFRESH_TOKEN=<need to login and obtain refresh token in the first place>
# change the lines below depending on the cloud provider
OSDU_AUTH_SERVER=login.microsoftonline.com
OSDU_AUTH_TOKEN_URL=/<tenant>/oauth2/v2.0/token
```
> Note: once you create your `.env` file, do NOT add it to the git repository (I've added it to `.gitignore`, so you probably won't anyway)
> TODO: the instructions need to be updated for AWS/Azure/GCP/IBM environments
## Run helpers
### Token helper
This helper uses the refresh token to obtain a new access token:
```bash
python stream.helpers.py token refresh
```
This will produce the access token string like below:
```bash
Response code: 200
export OSDU_API_TOKEN=eyJ0eXAiOiJKV1Q...
```
Copy and paste the export statement to your terminal and export it to your environment. Alternatively, add a new line to your `.env` file with the name and value of this variable.
> Note: the access token expires in about 30 minutes to an hour, so once you start getting HTTP 401 from the OSDU services, run the helper again and re-export the access token.
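Since the access token is a JWT, you can check roughly how long it has left by decoding its payload locally. This is inspection only, with no signature verification:

```python
import base64
import json
import time

def seconds_until_expiry(jwt):
    """Decode the JWT payload (without verifying the signature) and
    return the number of seconds remaining until the 'exp' claim."""
    payload_b64 = jwt.split(".")[1]
    # restore the base64 padding that JWTs strip
    payload_b64 += "=" * (-len(payload_b64) % 4)
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return int(payload["exp"] - time.time())
```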
### Stream helper
This helper makes HTTP requests to the streaming service and passes the required headers with them:
```bash
python stream.helpers.py stream get --record-id opendes:dataset--Stream.Generic:12345
```
This will execute the get stream request.
> Note: POST is not ready yet
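Until POST support lands, a `post_stream` helper would likely mirror the shape of `get_stream` in `stream.helpers.py`. The sketch below follows that structure; the `/api/streaming/v1/stream` POST path is an assumption inferred from the GET path, so check the OpenAPI spec before relying on it:

```python
import http.client
import json
import os

def stream_post_body(record):
    """Serialize the stream record for the request body."""
    return json.dumps(record)

def post_stream(record):
    """Sketch of a future POST helper, mirroring get_stream's structure.

    Assumes the service runs on localhost:8080 and OSDU_API_TOKEN is set;
    the POST path is an assumption, not confirmed by the current API.
    """
    conn = http.client.HTTPConnection("localhost:8080")
    headers = {
        "data-partition-id": "opendes",
        "Authorization": f"Bearer {os.getenv('OSDU_API_TOKEN', '')}",
        "Content-Type": "application/json",
    }
    conn.request("POST", "/api/streaming/v1/stream",
                 stream_post_body(record), headers)
    return conn.getresponse().status
```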
python-dotenv==0.19.0
import http.client
import json
import argparse
import os
from dotenv import load_dotenv
def refresh_token():
    # loading env variables - make sure to export them to your env
    # check .env.example file to see the list of variables needed
    refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN')
    auth_server = os.getenv('OSDU_AUTH_SERVER')
    auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL')
    conn = http.client.HTTPSConnection(f'{auth_server}')
    payload = f'grant_type=refresh_token&refresh_token={refresh_token}'
    headers = {
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    conn.request("POST", f"{auth_token_url}", payload, headers)
    # read response and parse out the access token
    # this is the JWT token we need to call OSDU APIs
    # it expires every 30 mins to an hour,
    # so we need to refresh it periodically
    response = conn.getresponse()
    print(f"Response code: {response.status}")
    if response.status == 200:
        data = json.loads(response.read())
        print(f"export OSDU_API_TOKEN={data['access_token']}")
    else:
        print(
            f"Failed refreshing the token: {response.getcode()} {response.msg}")


def get_stream(stream_id):
    token = os.getenv('OSDU_API_TOKEN')
    auth = f"Bearer {token}"
    conn = http.client.HTTPConnection("localhost:8080")
    payload = ''
    headers = {'data-partition-id': 'opendes', 'Authorization': auth}
    conn.request(
        "GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers)
    res = conn.getresponse()
    jsonString = res.read()
    print(jsonString)


load_dotenv()
parser = argparse.ArgumentParser(
    description='Little helpers to use for Streaming API testing. They can help you with things like refreshing access tokens, posting a stream record, getting a stream record, etc.')
parser.add_argument('helper', type=str, choices=[
                    "token", "stream"], help='helper to use')
parser.add_argument('command', type=str, choices=['refresh', 'get', 'post'],
                    help='command to execute')
parser.add_argument('--record-id', type=str, help='id of the stream record, e.g. opendes:dataset--Stream.Generic:12345',
                    default='opendes:dataset--Stream.Generic:12345')
args = parser.parse_args()
print(
    f"Running helper [{args.helper}] with the following command [{args.command}]")
print(f"{args}")
if args.helper == "token":
    if args.command == "refresh":
        refresh_token()
    else:
        print(f"No such command for helper [{args.helper}]")
elif args.helper == "stream":
    if args.command == "get":
        if args.record_id:
            get_stream(args.record_id)
        else:
            print(f"RECORD_ID cannot be empty for the [{args.helper}] helper")
    else:
        print(f"No such command for helper [{args.helper}]")
else:
    print(f"Helper [{args.helper}] is not implemented")
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Main Application Configuration
*
* @author Dmitry Kniazev
*/
@SpringBootApplication
public class StreamingAdminApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamingAdminApplication.class, args);
    }
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.api;
import org.opengroup.osdu.streaming.StreamApi;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.annotation.RequestScope;
/**
* Streams Administration Controller implements the auto-generated interface
* based on OpenAPI 3.0
*
* @author Dmitry Kniazev
*/
// TODO: Implement security filters for all methods
// TODO: Implement handling the underlying service exceptions and provide the
// proper response codes
@RestController
@RequestScope
public class StreamingAdminControllerImpl implements StreamApi {
package org.opengroup.osdu.streaming.di;
import java.util.HashMap;
import java.util.Map;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
public class DpsHeadersFactory {

    @Value("${osdu.headers.dataPartition}")
    private String default_partition;

    @Value("${osdu.headers.JWTtoken}")
    private String token;

    @Bean
    public DpsHeaders dpsHeaders() {
        Map<String, String> headers = new HashMap<>();
        headers.put(DpsHeaders.DATA_PARTITION_ID, default_partition);
        headers.put(DpsHeaders.AUTHORIZATION, token);
        return new DpsHeaders().createFromMap(headers);
    }
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.di;
import com.fasterxml.jackson.databind.DeserializationFeature;
import org.opengroup.osdu.core.common.http.json.HttpResponseBodyMapper;
import org.opengroup.osdu.core.common.storage.IStorageFactory;
import org.opengroup.osdu.core.common.storage.StorageAPIConfig;
import org.opengroup.osdu.core.common.storage.StorageFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.AbstractFactoryBean;
import org.springframework.stereotype.Component;
@Component
public class StorageClientFactory extends AbstractFactoryBean<IStorageFactory> {

    Logger logger = LoggerFactory.getLogger(StorageClientFactory.class);

    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false);

    private final HttpResponseBodyMapper bodyMapper = new HttpResponseBodyMapper(objectMapper);
    @Override
    protected IStorageFactory createInstance() throws Exception {
        // return new StorageFactory(StorageAPIConfig.Default(), bodyMapper);
        logger.debug("Creating instance of IStorageFactory... " + this.getClass().getName());
        return new StorageFactory(StorageAPIConfig.builder().rootUrl(STORAGE_API).build(), bodyMapper);
    }
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.di;
import javax.servlet.http.HttpServletRequest;
import org.opengroup.osdu.core.common.http.DpsHeaderFactory;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.annotation.RequestScope;
/**
* Web Configuration for Streaming Services
*
* @author Dmitry Kniazev
*/
@Configuration
public class StreamingBeansConfig {

    Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class);

    @Bean
    @RequestScope
    public DpsHeaders headers(HttpServletRequest request) {
        logger.debug("Getting a new instance of DpsHeaders");
        return new DpsHeaderFactory(request);
    }
}
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import org.opengroup.osdu.streaming.model.StreamDataset;
import org.opengroup.osdu.streaming.model.StreamRecord;
public interface StreamingAdminService {

    // StreamRecord createOrUpdateStream(Record record);

    StreamRecord getStream(String streamRecordId);
// Copyright © 2021 EPAM Systems
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.opengroup.osdu.streaming.service;
import javax.inject.Inject;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.opengroup.osdu.core.common.model.http.DpsHeaders;
import org.opengroup.osdu.core.common.model.storage.Record;
import org.opengroup.osdu.streaming.model.StreamDataset;
import org.opengroup.osdu.streaming.model.StreamRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.context.annotation.RequestScope;
/**
* Streaming Admin Service
*
* @author Dmitry Kniazev
*/
@Service
@RequestScope
public class StreamingAdminServiceImpl implements StreamingAdminService {

    Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class);

    // I'm not sure where the DpsHeaders instance is instantiated and injected from,
    // so I temporarily added my own factory to provide one with the hardcoded
    // headers information. This has to be clarified and fixed!

    // TODO: Turn FAIL_ON_UNKNOWN_PROPERTIES on and change the API interface to
    // handle all the fields properly, since this is now ignoring some necessary
    // fields (like ExtensionProperties and others)
    private final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @Autowired
    private DpsHeaders headers;

    @Autowired
    private IStorageFactory storageFactory;

    @Override
    public StreamRecord getStream(String streamRecordId) {
        StreamRecord sRec = null;
        String jsonRepr = new String();
        try {
            logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString());
            IStorageService storageService = this.storageFactory.create(headers);
            Record record = storageService.getRecord(streamRecordId);
            logger.info("Retrieved stream record: " + record);
            // this is a temporary implementation until I know how to marry Records with
            // StreamRecords
            // TODO: replace object marshalling/unmarshalling with the proper solution
            if (record != null) {
                jsonRepr = objectMapper.writeValueAsString(record);
                logger.debug("Retrieved stream record: " + jsonRepr);
                sRec = objectMapper.readValue(jsonRepr, StreamRecord.class);