From 9108fef736700501540d007d413b483376428cfd Mon Sep 17 00:00:00 2001 From: dmitry-kniazev Date: Wed, 29 Sep 2021 16:08:35 -0500 Subject: [PATCH 1/4] Fixed the DpsHeaders issue and added Python utils --- .env.example | 4 ++ .gitignore | 6 +- pom.xml | 5 +- scripts/dev.helpers.py | 66 +++++++++++++++++++ .../streaming/StreamingAdminApplication.java | 1 + .../osdu/streaming/di/DpsHeadersFactory.java | 28 -------- .../streaming/di/StorageClientFactory.java | 6 +- .../streaming/di/StreamingBeansConfig.java | 24 +++++++ .../service/StreamingAdminServiceImpl.java | 13 ++-- src/main/resources/application.properties | 17 +++-- 10 files changed, 128 insertions(+), 42 deletions(-) create mode 100644 .env.example create mode 100644 scripts/dev.helpers.py delete mode 100644 src/main/java/org/opengroup/osdu/streaming/di/DpsHeadersFactory.java create mode 100644 src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..6b767eb --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +OSDU_AUTH_REFRESH_TOKEN= +# change the lines below depending on the cloud provider +OSDU_AUTH_SERVER=login.microsoftonline.com +OSDU_AUTH_TOKEN_URL=//oauth2/v2.0/token \ No newline at end of file diff --git a/.gitignore b/.gitignore index 937086b..e655588 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /target/ !.mvn/wrapper/maven-wrapper.jar +.env ### STS ### .apt_generated @@ -25,4 +26,7 @@ /.nb-gradle/ ### VSCode -.vscode \ No newline at end of file +.vscode + +### Python environment +.venv \ No newline at end of file diff --git a/pom.xml b/pom.xml index be04b06..46f0fa2 100644 --- a/pom.xml +++ b/pom.xml @@ -1,13 +1,12 @@ - + 4.0.0 org.springframework.boot spring-boot-starter-parent 2.5.5 - + diff --git a/scripts/dev.helpers.py b/scripts/dev.helpers.py new file mode 100644 index 0000000..b937fde --- /dev/null +++ b/scripts/dev.helpers.py @@ -0,0 +1,66 @@ +import http.client +import json +import argparse +import os +from dotenv import load_dotenv + +def refresh_token(): + + # loading env variables - make sure to export them to your env + # check .env.example file to see the list of variables needed + refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN') + auth_server = os.getenv('OSDU_AUTH_SERVER') + auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL') + + conn = http.client.HTTPSConnection(f'{auth_server}') + payload = f'grant_type=refresh_token&refresh_token={refresh_token}' + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + conn.request("POST", f"{auth_token_url}", payload, headers) + + # read response and parse out the access token + # this is the JWT token we need to call OSDU APIs + # it expires every 30 mins to an hour + # so we need to refresh it preiodically + response = conn.getresponse() + print(f"Response code: {response.status}") + if response.status == 200: + data = json.loads(response.read()) + print(f"export OSDU_API_TOKEN={data['access_token']}") + else: + print(f"Failed refreshing the token: {response.getcode()} {response.msg}") + + +def get_stream(stream_id): + token = os.getenv('OSDU_API_TOKEN') + auth = f"Bearer {token}" + conn = http.client.HTTPConnection("localhost:8080") + payload = '' + headers = {'data-partition-id':'opendes', 'Authorization':auth} + conn.request("GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) + res = conn.getresponse() + jsonString = res.read() + print(jsonString) + + +load_dotenv() +parser = argparse.ArgumentParser(description='Helper parameters') +parser.add_argument('helper', type=str, help='Helper name: token|request') +parser.add_argument('command', type=str, help='Command: \ntoken: refresh\nrequest: get|post') + +args = parser.parse_args() +print(f"Running helper [{args.helper}] with the following command [{args.command}]") + +if args.helper == "token": + if args.command == "refresh": + refresh_token() + else: + print(f"No such command for helper [{args.helper}]") +elif args.helper == "request": + if args.command == "get": + get_stream(1) + else: + print(f"No such command for helper [{args.helper}]") +else: + print(f"Helper [{args.helper}] is not implemented") \ No newline at end of file diff --git a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java index f3df697..6abb63d 100644 --- a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java +++ b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java @@ -9,4 +9,5 @@ public class StreamingAdminApplication { public static void main(String[] args) { SpringApplication.run(StreamingAdminApplication.class, args); } + } diff --git a/src/main/java/org/opengroup/osdu/streaming/di/DpsHeadersFactory.java b/src/main/java/org/opengroup/osdu/streaming/di/DpsHeadersFactory.java deleted file mode 100644 index 27752ec..0000000 --- a/src/main/java/org/opengroup/osdu/streaming/di/DpsHeadersFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -package org.opengroup.osdu.streaming.di; - -import java.util.HashMap; -import java.util.Map; - -import org.opengroup.osdu.core.common.model.http.DpsHeaders; -import org.springframework.beans.factory.annotation.Value; -import org.springframework.context.annotation.Bean; -import org.springframework.stereotype.Component; - -@Component -public class DpsHeadersFactory { - - @Value("${osdu.headers.dataPartition}") - private String default_partition; - - @Value("${osdu.headers.JWTtoken}") - private String token; - - @Bean - public DpsHeaders dpsHeaders() { - Map headers = new HashMap<>(); - headers.put(DpsHeaders.DATA_PARTITION_ID, default_partition); - headers.put(DpsHeaders.AUTHORIZATION, token); - return new DpsHeaders().createFromMap(headers); - } - -} diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java index 91e81df..49bda39 100644 --- a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java +++ b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java @@ -7,6 +7,8 @@ import org.opengroup.osdu.core.common.http.json.HttpResponseBodyMapper; import org.opengroup.osdu.core.common.storage.IStorageFactory; import org.opengroup.osdu.core.common.storage.StorageAPIConfig; import org.opengroup.osdu.core.common.storage.StorageFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.beans.factory.config.AbstractFactoryBean; import org.springframework.stereotype.Component; @@ -14,6 +16,8 @@ import org.springframework.stereotype.Component; @Component public class StorageClientFactory extends AbstractFactoryBean { + Logger logger = LoggerFactory.getLogger(StorageClientFactory.class); + private final ObjectMapper objectMapper = new ObjectMapper() .configure(DeserializationFeature.FAIL_ON_IGNORED_PROPERTIES, false); private final HttpResponseBodyMapper bodyMapper = new HttpResponseBodyMapper(objectMapper); @@ -28,7 +32,7 @@ public class StorageClientFactory extends AbstractFactoryBean { @Override protected IStorageFactory createInstance() throws Exception { - // return new StorageFactory(StorageAPIConfig.Default(), bodyMapper); + logger.debug("Creating instance of IStorageFactory... " + this.getClass().getName()); return new StorageFactory(StorageAPIConfig.builder().rootUrl(STORAGE_API).build(), bodyMapper); } diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java new file mode 100644 index 0000000..136c0ee --- /dev/null +++ b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java @@ -0,0 +1,24 @@ +package org.opengroup.osdu.streaming.di; + +import javax.servlet.http.HttpServletRequest; + +import org.opengroup.osdu.core.common.http.DpsHeaderFactory; +import org.opengroup.osdu.core.common.model.http.DpsHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.context.annotation.RequestScope; + +@Configuration +public class StreamingBeansConfig { + + Logger logger = LoggerFactory.getLogger(StreamingBeansConfig.class); + + @Bean + @RequestScope + public DpsHeaders headers(HttpServletRequest request) { + logger.debug("Getting a new instance of DpsHeaders"); + return new DpsHeaderFactory(request); + } +} diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java index 9daa8d4..07b6963 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java @@ -1,7 +1,5 @@ package org.opengroup.osdu.streaming.service; -import javax.inject.Inject; - import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.storage.Record; import org.opengroup.osdu.core.common.model.storage.StorageException; @@ -12,9 +10,12 @@ import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.web.context.annotation.RequestScope; @Service +@RequestScope public class StreamingAdminServiceImpl implements StreamingAdminService { Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class); @@ -22,19 +23,21 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { // I'm not sure where the DpsHeaders instance is instantiated and injected from, // so I temporarily added my own factory to provide one with the hardcoded // headers information. This has to be clarified and fixed! - @Inject + @Autowired private DpsHeaders headers; - @Inject + @Autowired private IStorageFactory storageFactory; @Override public StreamRecord getStream(String streamRecordId) { try { + logger.debug("=====>>> Using headers bean: " + headers.toString()); + headers.put("test", "012345"); logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString()); IStorageService storageService = this.storageFactory.create(headers); Record record = storageService.getRecord(streamRecordId); - logger.info("Created stream record: " + record); + logger.info("Retrieved stream record: " + record); } catch (StorageException e) { logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); } diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index cc5f08b..2e25ae8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,9 +1,18 @@ +# Tomcat server.servlet.contextPath=/api/streaming/v1/ -osdu.storage.api=https:///api/storage/v2 +# OSDU +osdu.storage.api=https://.../api/storage/v2 +# Logging logging.level.org.opengroup.osdu.streaming=debug +# spring web logging level +logging.level.web=debug +# this turns on debugging the http requests +spring.mvc.log-request-details=true -# this is for testing only - need to properly handle DpsHeaders from the osdu common lib! -osdu.headers.dataPartition=opendes -osdu.headers.JWTtoken=Bearer ey... \ No newline at end of file +# these parameters below are no longer used since the proper handling of DpsHeaders is implemented; +# developers now supposed to provide these headers when calling the Streaming API and the service +# will pass them through to the downstream calls +# osdu.headers.dataPartition=opendes +# osdu.headers.JWTtoken=Bearer ey... \ No newline at end of file -- GitLab From 5ec1a7dbc49aae76ede85d0bae1288b04f063341 Mon Sep 17 00:00:00 2001 From: Dmitry Kniazev Date: Thu, 30 Sep 2021 00:21:06 -0500 Subject: [PATCH 2/4] Get stream initial implementation --- .gitignore | 1 + README.md | 75 ++++++++++++-- scripts/README.MD | 99 +++++++++++++++++++ scripts/dev.helpers.py | 66 ------------- scripts/requirements.txt | 1 + scripts/stream.helpers.py | 79 +++++++++++++++ .../streaming/StreamingAdminApplication.java | 20 ++++ .../api/StreamingAdminControllerImpl.java | 26 +++++ .../streaming/di/StorageClientFactory.java | 14 +++ .../streaming/di/StreamingBeansConfig.java | 20 ++++ .../service/StreamingAdminService.java | 15 ++- .../service/StreamingAdminServiceImpl.java | 58 +++++++++-- src/main/resources/application.properties | 17 +--- 13 files changed, 393 insertions(+), 98 deletions(-) create mode 100644 scripts/README.MD delete mode 100644 scripts/dev.helpers.py create mode 100644 scripts/requirements.txt create mode 100644 scripts/stream.helpers.py diff --git a/.gitignore b/.gitignore index e655588..c6625a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /target/ !.mvn/wrapper/maven-wrapper.jar .env +application-local.properties ### STS ### .apt_generated diff --git a/README.md b/README.md index e68dcee..94464cb 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,78 @@ -# Streaming Admin Service +# Streaming Administration Service -## Introduction -This service provides APIs to configure streams in OSDU data platform. +## Summary +This service provides APIs to register, start and stop streams in OSDU data platform. It currently allows registration and manipulation of 3 types of streams: -### Build and Run -#### Locally -Once you are in the project root directory. Execute the following commands +* **source** - typically a Kafka producer that connects to external system in order to receive streaming data and ingest it into OSDU data platform (e.g. WITSML/ETP producer). +* **processor** - typically a Kafka Streams application that reads streaming data ingested into OSDU data platform, process it and route it to prepared topics for downstream applications consumption (e.g. aggregate or filter ingested data). +* **sink** - typically a Kafka consumer application that reads streaming data from a prepared topic and consumes it (e.g. visualize or persist in a database). -To build the executable. +## More details + +### API +[OpenAPI Specification](/docs/api/streaming.openapi.yaml) + +### Schema + +[Stream Dataset schemas](/docs/data/schemas) + +[Stream.Generic.1.0.0.json - record example](/docs/data/examples/Stream.Generic.1.0.0.json) + +## Configure +Before you can run and test the service locally there are a few things that need to be configured: + +1. Verify that the _local_ profile is configured as active for spring boot. Check that [application.properties](/src/main/resources/application.properties) has the following setting: + +``` +# Profile +spring.profiles.active=local +``` +2. Create a new file in the same folder and name it _application-local.properties_ with the following settings: + +``` +# OSDU +osdu.storage.api=https:///api/storage/v2 +``` +>Note: Do not check in your _application-local.properties_ files (or any other sensitive information) to gitlab! + +>Note: replace _service-host_ with the proper value of the storage service that you are working with. + +>Note: Stream.Generic.1.0.0 schema must be registered with the Schema service before you can start working with Streams +>> TODO: Implement schema registration helper! + +## Build and Run +Once you are in the project root directory. Execute the following commands: + +To build the executable jar file. ``` bash mvn clean package ``` -Executing the above command generates the implementation based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. +Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. To run the executable as a spring boot application ``` bash mvn spring-boot:run -``` \ No newline at end of file +``` +## Test +When you launch Spring Boot application it typically provides a few lines of the start up information. Make sure the following lines are there: + +``` +2021-09-30 00:06:31.339 DEBUG 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : Running with Spring Boot v2.5.5, Spring v5.3.10 +2021-09-30 00:06:31.339 INFO 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : The following profiles are active: local +``` +This indicates that your _local_ profile is activated. + +``` +2021-09-30 00:06:41.227 INFO 2067 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '/api/streaming/v1' +``` +This indicates that your Tomcat has started and is available on the following link: + +http://localhost:8080/api/streaming/v1 + +The embedded Swagger API documentation is also available on the following link: + +http://localhost:8080/api/streaming/v1/swagger-ui/ + +Now you can use either Postman or [stream.helpers](/scripts) to test the API. \ No newline at end of file diff --git a/scripts/README.MD b/scripts/README.MD new file mode 100644 index 0000000..9c86dc1 --- /dev/null +++ b/scripts/README.MD @@ -0,0 +1,99 @@ +# STREAM.HELPERS + +**stream.helpers** is a collection of Python functions available in CLI that help automate certain things during the development, such us: + +* Refreshing access token for the environment +* Creating a new stream using POST request +* Fetching an existing stream using GET request + +It is essentially an alternative to Postman so feel free to use whatever works best for you. I created it because I wanted to use it in some of my future integration tests and also share some of the environment variables I'm using to create the service itself. The script loads environment variables from the `.env` file and uses them to perform other actions. + +## Prepare Python environment + +Install or upgrade to Python 3.7 (should work for other Python versions as long as it is 3.7+): + +```bash +sudo apt-get update +sudo apt install python3.7 +sudo apt install python3.7-venv +``` + +The last command installs the additional package to be able to use a virtual environment for our scripts. Now navigate to the root folder of your project (`/stream-admin-service`) and create the new Python environment: + +```bash +python3 -m venv .venv/dev.helpers +``` +Next, activate the environment: + +```bash +source .venv/dev.helpers/bin/activate +``` +You will now see the environment is activated as your command line will be prefixed with `dev.helpers` - any packages you install to this environment will NOT affect the rest of your system. You can deactivate it and go back to your OS's Python by running the following command: + +```bash +deactivate +``` + +Once activated, navigate to scripts folder and install all the dependencies for the scripts: + +```bash +cd scripts/ +pip install -r requirements.txt +``` +## Explore arguments + +Run the script to see if everything is ok and to display the options: + +```bash +python dev.helpers.py -h +``` +This should display the help info for the script. + +## Create environment variables + +Next, you need to create `.env` file in the project root folder. This file will contain the environment variables that will be loaded during the script startup. Navigate to the project root folder and create the `.env` file using the example file provided: + +```bash +cd .. +cp .env.example .env +``` + +After that, open `.env` file in your favourite editor and update the values for the environment variables: + +```txt +OSDU_AUTH_REFRESH_TOKEN= +# change the lines below depending on the cloud provider +OSDU_AUTH_SERVER=login.microsoftonline.com +OSDU_AUTH_TOKEN_URL=//oauth2/v2.0/token +``` +> Note: once you create your `.env` file - do NOT add it to git repository (I've added it to `.gitignore`, so you probably won't anyway) + +> TODO: the instructions need to be updated for AWS/Azure/GCP/IBM environments + +## Run helpers + +### Token helper +This helper is usng the refresh token to get the new access token: + +```bash +python dev.helpers.py token refresh +``` +This will produce the access token string like below: + +```bash +Response code: 200 +export OSDU_API_TOKEN=eyJ0eXAiOiJKV1Q... +``` +Copy and paste the export statement to your terminal and export it to your environment. Alternatively, add a new line to your `.env` file with the name and value for this variable. + +> Note: the access token will expire in about 30 mins to an hour, so once you start getting HTTP 401 from the OSDU services, then you need to run the helper again and re-export the access token. + +### Stream helper +This helper can make Http requests to the streaming service and pass the required headers with it: + +```bash +python dev.helpers.py stream get --record-id opendes:dataset--Stream.Generic:12345 +``` +Will execute the get stream request. + +> Note: POST is not ready yet \ No newline at end of file diff --git a/scripts/dev.helpers.py b/scripts/dev.helpers.py deleted file mode 100644 index b937fde..0000000 --- a/scripts/dev.helpers.py +++ /dev/null @@ -1,66 +0,0 @@ -import http.client -import json -import argparse -import os -from dotenv import load_dotenv - -def refresh_token(): - - # loading env variables - make sure to export them to your env - # check .env.example file to see the list of variables needed - refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN') - auth_server = os.getenv('OSDU_AUTH_SERVER') - auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL') - - conn = http.client.HTTPSConnection(f'{auth_server}') - payload = f'grant_type=refresh_token&refresh_token={refresh_token}' - headers = { - 'Content-Type': 'application/x-www-form-urlencoded' - } - conn.request("POST", f"{auth_token_url}", payload, headers) - - # read response and parse out the access token - # this is the JWT token we need to call OSDU APIs - # it expires every 30 mins to an hour - # so we need to refresh it preiodically - response = conn.getresponse() - print(f"Response code: {response.status}") - if response.status == 200: - data = json.loads(response.read()) - print(f"export OSDU_API_TOKEN={data['access_token']}") - else: - print(f"Failed refreshing the token: {response.getcode()} {response.msg}") - - -def get_stream(stream_id): - token = os.getenv('OSDU_API_TOKEN') - auth = f"Bearer {token}" - conn = http.client.HTTPConnection("localhost:8080") - payload = '' - headers = {'data-partition-id':'opendes', 'Authorization':auth} - conn.request("GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) - res = conn.getresponse() - jsonString = res.read() - print(jsonString) - - -load_dotenv() -parser = argparse.ArgumentParser(description='Helper parameters') -parser.add_argument('helper', type=str, help='Helper name: token|request') -parser.add_argument('command', type=str, help='Command: \ntoken: refresh\nrequest: get|post') - -args = parser.parse_args() -print(f"Running helper [{args.helper}] with the following command [{args.command}]") - -if args.helper == "token": - if args.command == "refresh": - refresh_token() - else: - print(f"No such command for helper [{args.helper}]") -elif args.helper == "request": - if args.command == "get": - get_stream(1) - else: - print(f"No such command for helper [{args.helper}]") -else: - print(f"Helper [{args.helper}] is not implemented") \ No newline at end of file diff --git a/scripts/requirements.txt b/scripts/requirements.txt new file mode 100644 index 0000000..b60793c --- /dev/null +++ b/scripts/requirements.txt @@ -0,0 +1 @@ +python-dotenv==0.19.0 diff --git a/scripts/stream.helpers.py b/scripts/stream.helpers.py new file mode 100644 index 0000000..6d6239b --- /dev/null +++ b/scripts/stream.helpers.py @@ -0,0 +1,79 @@ +import http.client +import json +import argparse +import os +from dotenv import load_dotenv + + +def refresh_token(): + + # loading env variables - make sure to export them to your env + # check .env.example file to see the list of variables needed + refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN') + auth_server = os.getenv('OSDU_AUTH_SERVER') + auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL') + + conn = http.client.HTTPSConnection(f'{auth_server}') + payload = f'grant_type=refresh_token&refresh_token={refresh_token}' + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + conn.request("POST", f"{auth_token_url}", payload, headers) + + # read response and parse out the access token + # this is the JWT token we need to call OSDU APIs + # it expires every 30 mins to an hour + # so we need to refresh it preiodically + response = conn.getresponse() + print(f"Response code: {response.status}") + if response.status == 200: + data = json.loads(response.read()) + print(f"export OSDU_API_TOKEN={data['access_token']}") + else: + print( + f"Failed refreshing the token: {response.getcode()} {response.msg}") + + +def get_stream(stream_id): + token = os.getenv('OSDU_API_TOKEN') + auth = f"Bearer {token}" + conn = http.client.HTTPConnection("localhost:8080") + payload = '' + headers = {'data-partition-id': 'opendes', 'Authorization': auth} + conn.request( + "GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) + res = conn.getresponse() + jsonString = res.read() + print(jsonString) + + +load_dotenv() +parser = argparse.ArgumentParser( + description='Little helpers to use for Streaming API testing. They can help you with things like refreshing access tokens, posting a stream record, getting a stream record, etc.') +parser.add_argument('helper', type=str, choices=[ + "token", "stream"], help='helper to use') +parser.add_argument('command', type=str, choices=['refresh', 'get', 'post'], + help='command to execute') +parser.add_argument('--record-id', type=str, help='id of the stream record, e.g. opendes:dataset--Stream.Generic:12345', + default='opendes:dataset--Stream.Generic:12345') + +args = parser.parse_args() +print( + f"Running helper [{args.helper}] with the following command [{args.command}]") +print(f"{args}") + +if args.helper == "token": + if args.command == "refresh": + refresh_token() + else: + print(f"No such command for helper [{args.helper}]") +elif args.helper == "stream": + if args.command == "get": + if args.record_id: + get_stream(args.record_id) + else: + print(f"RECORD_ID cannot be empty with for the [{args.helper}]") + else: + print(f"No such command for helper [{args.helper}]") +else: + print(f"Helper [{args.helper}] is not implemented") diff --git a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java index 6abb63d..f4ddc81 100644 --- a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java +++ b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java @@ -1,8 +1,28 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +/** + * Main Application Configuration + * + * @author Dmitry Kniazev + */ + @SpringBootApplication public class StreamingAdminApplication { diff --git a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java index cb4e572..30770af 100644 --- a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.api; import org.opengroup.osdu.streaming.StreamApi; @@ -12,6 +26,18 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.annotation.RequestScope; +/** + * Streams Administration Controller implements the auto-generated interface + * based on OpenAPI 3.0 + * + * @author Dmitry Kniazev + */ + +// TODO: Implement security filters for all methods + +// TODO: Implement handling the underlying service exceptions and prvoide the +// proper response codes + @RestController @RequestScope public class StreamingAdminControllerImpl implements StreamApi { diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java index 49bda39..6389f13 100644 --- a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java +++ b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.di; import com.fasterxml.jackson.databind.DeserializationFeature; diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java index 136c0ee..31320dd 100644 --- a/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java +++ b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.di; import javax.servlet.http.HttpServletRequest; @@ -10,6 +24,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.annotation.RequestScope; +/** + * Web Configuration for Streaming Services + * + * @author Dmitry Kniazev + */ + @Configuration public class StreamingBeansConfig { diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java index 1959c8f..bdb9ae5 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java @@ -1,10 +1,23 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.service; import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; public interface StreamingAdminService { - // StreamRecord createOrUpdateStream(Record record); StreamRecord getStream(String streamRecordId); diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java index 07b6963..0ec5c71 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java @@ -1,5 +1,23 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.service; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.storage.Record; import org.opengroup.osdu.core.common.model.storage.StorageException; @@ -14,15 +32,24 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.context.annotation.RequestScope; +/** + * Streaming Admin Service + * + * @author Dmitry Kniazev + */ + @Service @RequestScope public class StreamingAdminServiceImpl implements StreamingAdminService { Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class); - // I'm not sure where the DpsHeaders instance is instantiated and injected from, - // so I temporarily added my own factory to provide one with the hardcoded - // headers information. This has to be clarified and fixed! + // TODO: Turn FAIL_ON_UNKNOWN_PROPERTIES on and change the API interface to + // handle all the fields properly, since this is now ignoring some neccessary + // fields (like ExtensionProperties and others) + private final ObjectMapper objectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + @Autowired private DpsHeaders headers; @@ -31,20 +58,37 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { @Override public StreamRecord getStream(String streamRecordId) { + StreamRecord sRec = null; + String jsonRepr = new String(); try { - logger.debug("=====>>> Using headers bean: " + headers.toString()); - headers.put("test", "012345"); logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString()); IStorageService storageService = this.storageFactory.create(headers); Record record = storageService.getRecord(streamRecordId); - logger.info("Retrieved stream record: " + record); + + // this is a temporary implementation until I know how to marry Records with + // StreamRecords + // TODO: replace object marshalling/unmarshalling with the proper solution + if (record != null) { + jsonRepr = objectMapper.writeValueAsString(record); + logger.debug("Retrieved stream record: " + jsonRepr); + sRec = objectMapper.readValue(jsonRepr, StreamRecord.class); + } + // TODO: throw exceptions to the controller so that the proper HTTP codes can be + // assigned for the API responses } catch (StorageException e) { logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); + } catch (JsonProcessingException e) { + logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString()); } - return new StreamRecord(); + return sRec; } + // TODO: Change API interface to StreamRecord and implement this method. + // The problem is that we don't know how to assign all the required attributes + // for the OSDU records, like ACL, tags, meta, legal, etc. Therefore it might be + // easier to expect all of these to be provided by the client, so that we can + // pass it to the storage svc without any changes... @Override public String createNewStream(StreamDataset streamDataset) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2e25ae8..c9fe9f8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,18 +1,5 @@ # Tomcat server.servlet.contextPath=/api/streaming/v1/ -# OSDU -osdu.storage.api=https://.../api/storage/v2 - -# Logging -logging.level.org.opengroup.osdu.streaming=debug -# spring web logging level -logging.level.web=debug -# this turns on debugging the http requests -spring.mvc.log-request-details=true - -# these parameters below are no longer used since the proper handling of DpsHeaders is implemented; -# developers now supposed to provide these headers when calling the Streaming API and the service -# will pass them through to the downstream calls -# osdu.headers.dataPartition=opendes -# osdu.headers.JWTtoken=Bearer ey... \ No newline at end of file +# Profile +spring.profiles.active=local \ No newline at end of file -- GitLab From 8ab2cbee3a6eaba2afd0d7d19dc07f6dfe049ac5 Mon Sep 17 00:00:00 2001 From: Dmitry Kniazev Date: Thu, 30 Sep 2021 00:21:06 -0500 Subject: [PATCH 3/4] Get stream initial implementation --- .gitignore | 1 + README.md | 75 ++++++++++++-- scripts/README.MD | 99 +++++++++++++++++++ scripts/dev.helpers.py | 66 ------------- scripts/requirements.txt | 1 + scripts/stream.helpers.py | 79 +++++++++++++++ .../streaming/StreamingAdminApplication.java | 20 ++++ .../api/StreamingAdminControllerImpl.java | 26 +++++ .../streaming/di/StorageClientFactory.java | 14 +++ .../streaming/di/StreamingBeansConfig.java | 20 ++++ .../service/StreamingAdminService.java | 15 ++- .../service/StreamingAdminServiceImpl.java | 58 +++++++++-- src/main/resources/application.properties | 17 +--- 13 files changed, 393 insertions(+), 98 deletions(-) create mode 100644 scripts/README.MD delete mode 100644 scripts/dev.helpers.py create mode 100644 scripts/requirements.txt create mode 100644 scripts/stream.helpers.py diff --git a/.gitignore b/.gitignore index e655588..c6625a2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ /target/ !.mvn/wrapper/maven-wrapper.jar .env +application-local.properties ### STS ### .apt_generated diff --git a/README.md b/README.md index e68dcee..94464cb 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,78 @@ -# Streaming Admin Service +# Streaming Administration Service -## Introduction -This service provides APIs to configure streams in OSDU data platform. +## Summary +This service provides APIs to register, start and stop streams in OSDU data platform. It currently allows registration and manipulation of 3 types of streams: -### Build and Run -#### Locally -Once you are in the project root directory. Execute the following commands +* **source** - typically a Kafka producer that connects to external system in order to receive streaming data and ingest it into OSDU data platform (e.g. WITSML/ETP producer). +* **processor** - typically a Kafka Streams application that reads streaming data ingested into OSDU data platform, process it and route it to prepared topics for downstream applications consumption (e.g. aggregate or filter ingested data). +* **sink** - typically a Kafka consumer application that reads streaming data from a prepared topic and consumes it (e.g. visualize or persist in a database). -To build the executable. +## More details + +### API +[OpenAPI Specification](/docs/api/streaming.openapi.yaml) + +### Schema + +[Stream Dataset schemas](/docs/data/schemas) + +[Stream.Generic.1.0.0.json - record example](/docs/data/examples/Stream.Generic.1.0.0.json) + +## Configure +Before you can run and test the service locally there are a few things that need to be configured: + +1. Verify that the _local_ profile is configured as active for spring boot. Check that [application.properties](/src/main/resources/application.properties) has the following setting: + +``` +# Profile +spring.profiles.active=local +``` +2. Create a new file in the same folder and name it _application-local.properties_ with the following settings: + +``` +# OSDU +osdu.storage.api=https:///api/storage/v2 +``` +>Note: Do not check in your _application-local.properties_ files (or any other sensitive information) to gitlab! + +>Note: replace _service-host_ with the proper value of the storage service that you are working with. + +>Note: Stream.Generic.1.0.0 schema must be registered with the Schema service before you can start working with Streams +>> TODO: Implement schema registration helper! + +## Build and Run +Once you are in the project root directory. Execute the following commands: + +To build the executable jar file. ``` bash mvn clean package ``` -Executing the above command generates the implementation based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. +Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. To run the executable as a spring boot application ``` bash mvn spring-boot:run -``` \ No newline at end of file +``` +## Test +When you launch Spring Boot application it typically provides a few lines of the start up information. Make sure the following lines are there: + +``` +2021-09-30 00:06:31.339 DEBUG 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : Running with Spring Boot v2.5.5, Spring v5.3.10 +2021-09-30 00:06:31.339 INFO 2067 --- [ restartedMain] o.o.o.s.StreamingAdminApplication : The following profiles are active: local +``` +This indicates that your _local_ profile is activated. + +``` +2021-09-30 00:06:41.227 INFO 2067 --- [ restartedMain] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path '/api/streaming/v1' +``` +This indicates that your Tomcat has started and is available on the following link: + +http://localhost:8080/api/streaming/v1 + +The embedded Swagger API documentation is also available on the following link: + +http://localhost:8080/api/streaming/v1/swagger-ui/ + +Now you can use either Postman or [stream.helpers](/scripts) to test the API. \ No newline at end of file diff --git a/scripts/README.MD b/scripts/README.MD new file mode 100644 index 0000000..b3340de --- /dev/null +++ b/scripts/README.MD @@ -0,0 +1,99 @@ +# STREAM.HELPERS + +**stream.helpers** is a collection of Python functions available in CLI that help automate certain things during the development, such us: + +* Refreshing access token for the environment +* Creating a new stream using POST request +* Fetching an existing stream using GET request + +It is essentially an alternative to Postman so feel free to use whatever works best for you. I created it because I wanted to use it in some of my future integration tests and also share some of the environment variables I'm using to create the service itself. The script loads environment variables from the `.env` file and uses them to perform other actions. + +## Prepare Python environment + +Install or upgrade to Python 3.7 (should work for other Python versions as long as it is 3.7+): + +```bash +sudo apt-get update +sudo apt install python3.7 +sudo apt install python3.7-venv +``` + +The last command installs the additional package to be able to use a virtual environment for our scripts. Now navigate to the root folder of your project (`/stream-admin-service`) and create the new Python environment: + +```bash +python3 -m venv .venv/stream.helpers +``` +Next, activate the environment: + +```bash +source .venv/stream.helpers/bin/activate +``` +You will now see the environment is activated as your command line will be prefixed with `stream.helpers` - any packages you install to this environment will NOT affect the rest of your system. You can deactivate it and go back to your OS's Python by running the following command: + +```bash +deactivate +``` + +Once activated, navigate to scripts folder and install all the dependencies for the scripts: + +```bash +cd scripts/ +pip install -r requirements.txt +``` +## Explore arguments + +Run the script to see if everything is ok and to display the options: + +```bash +python stream.helpers.py -h +``` +This should display the help info for the script. + +## Create environment variables + +Next, you need to create `.env` file in the project root folder. This file will contain the environment variables that will be loaded during the script startup. Navigate to the project root folder and create the `.env` file using the example file provided: + +```bash +cd .. +cp .env.example .env +``` + +After that, open `.env` file in your favourite editor and update the values for the environment variables: + +```txt +OSDU_AUTH_REFRESH_TOKEN= +# change the lines below depending on the cloud provider +OSDU_AUTH_SERVER=login.microsoftonline.com +OSDU_AUTH_TOKEN_URL=//oauth2/v2.0/token +``` +> Note: once you create your `.env` file - do NOT add it to git repository (I've added it to `.gitignore`, so you probably won't anyway) + +> TODO: the instructions need to be updated for AWS/Azure/GCP/IBM environments + +## Run helpers + +### Token helper +This helper is usng the refresh token to get the new access token: + +```bash +python stream.helpers.py token refresh +``` +This will produce the access token string like below: + +```bash +Response code: 200 +export OSDU_API_TOKEN=eyJ0eXAiOiJKV1Q... +``` +Copy and paste the export statement to your terminal and export it to your environment. Alternatively, add a new line to your `.env` file with the name and value for this variable. + +> Note: the access token will expire in about 30 mins to an hour, so once you start getting HTTP 401 from the OSDU services, then you need to run the helper again and re-export the access token. + +### Stream helper +This helper can make Http requests to the streaming service and pass the required headers with it: + +```bash +python stream.helpers.py stream get --record-id opendes:dataset--Stream.Generic:12345 +``` +Will execute the get stream request. + +> Note: POST is not ready yet \ No newline at end of file diff --git a/scripts/dev.helpers.py b/scripts/dev.helpers.py deleted file mode 100644 index b937fde..0000000 --- a/scripts/dev.helpers.py +++ /dev/null @@ -1,66 +0,0 @@ -import http.client -import json -import argparse -import os -from dotenv import load_dotenv - -def refresh_token(): - - # loading env variables - make sure to export them to your env - # check .env.example file to see the list of variables needed - refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN') - auth_server = os.getenv('OSDU_AUTH_SERVER') - auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL') - - conn = http.client.HTTPSConnection(f'{auth_server}') - payload = f'grant_type=refresh_token&refresh_token={refresh_token}' - headers = { - 'Content-Type': 'application/x-www-form-urlencoded' - } - conn.request("POST", f"{auth_token_url}", payload, headers) - - # read response and parse out the access token - # this is the JWT token we need to call OSDU APIs - # it expires every 30 mins to an hour - # so we need to refresh it preiodically - response = conn.getresponse() - print(f"Response code: {response.status}") - if response.status == 200: - data = json.loads(response.read()) - print(f"export OSDU_API_TOKEN={data['access_token']}") - else: - print(f"Failed refreshing the token: {response.getcode()} {response.msg}") - - -def get_stream(stream_id): - token = os.getenv('OSDU_API_TOKEN') - auth = f"Bearer {token}" - conn = http.client.HTTPConnection("localhost:8080") - payload = '' - headers = {'data-partition-id':'opendes', 'Authorization':auth} - conn.request("GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) - res = conn.getresponse() - jsonString = res.read() - print(jsonString) - - -load_dotenv() -parser = argparse.ArgumentParser(description='Helper parameters') -parser.add_argument('helper', type=str, help='Helper name: token|request') -parser.add_argument('command', type=str, help='Command: \ntoken: refresh\nrequest: get|post') - -args = parser.parse_args() -print(f"Running helper [{args.helper}] with the following command [{args.command}]") - -if args.helper == "token": - if args.command == "refresh": - refresh_token() - else: - print(f"No such command for helper [{args.helper}]") -elif args.helper == "request": - if args.command == "get": - get_stream(1) - else: - print(f"No such command for helper [{args.helper}]") -else: - print(f"Helper [{args.helper}] is not implemented") \ No newline at end of file diff --git a/scripts/requirements.txt b/scripts/requirements.txt new file mode 100644 index 0000000..b60793c --- /dev/null +++ b/scripts/requirements.txt @@ -0,0 +1 @@ +python-dotenv==0.19.0 diff --git a/scripts/stream.helpers.py b/scripts/stream.helpers.py new file mode 100644 index 0000000..6d6239b --- /dev/null +++ b/scripts/stream.helpers.py @@ -0,0 +1,79 @@ +import http.client +import json +import argparse +import os +from dotenv import load_dotenv + + +def refresh_token(): + + # loading env variables - make sure to export them to your env + # check .env.example file to see the list of variables needed + refresh_token = os.getenv('OSDU_AUTH_REFRESH_TOKEN') + auth_server = os.getenv('OSDU_AUTH_SERVER') + auth_token_url = os.getenv('OSDU_AUTH_TOKEN_URL') + + conn = http.client.HTTPSConnection(f'{auth_server}') + payload = f'grant_type=refresh_token&refresh_token={refresh_token}' + headers = { + 'Content-Type': 'application/x-www-form-urlencoded' + } + conn.request("POST", f"{auth_token_url}", payload, headers) + + # read response and parse out the access token + # this is the JWT token we need to call OSDU APIs + # it expires every 30 mins to an hour + # so we need to refresh it preiodically + response = conn.getresponse() + print(f"Response code: {response.status}") + if response.status == 200: + data = json.loads(response.read()) + print(f"export OSDU_API_TOKEN={data['access_token']}") + else: + print( + f"Failed refreshing the token: {response.getcode()} {response.msg}") + + +def get_stream(stream_id): + token = os.getenv('OSDU_API_TOKEN') + auth = f"Bearer {token}" + conn = http.client.HTTPConnection("localhost:8080") + payload = '' + headers = {'data-partition-id': 'opendes', 'Authorization': auth} + conn.request( + "GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) + res = conn.getresponse() + jsonString = res.read() + print(jsonString) + + +load_dotenv() +parser = argparse.ArgumentParser( + description='Little helpers to use for Streaming API testing. They can help you with things like refreshing access tokens, posting a stream record, getting a stream record, etc.') +parser.add_argument('helper', type=str, choices=[ + "token", "stream"], help='helper to use') +parser.add_argument('command', type=str, choices=['refresh', 'get', 'post'], + help='command to execute') +parser.add_argument('--record-id', type=str, help='id of the stream record, e.g. opendes:dataset--Stream.Generic:12345', + default='opendes:dataset--Stream.Generic:12345') + +args = parser.parse_args() +print( + f"Running helper [{args.helper}] with the following command [{args.command}]") +print(f"{args}") + +if args.helper == "token": + if args.command == "refresh": + refresh_token() + else: + print(f"No such command for helper [{args.helper}]") +elif args.helper == "stream": + if args.command == "get": + if args.record_id: + get_stream(args.record_id) + else: + print(f"RECORD_ID cannot be empty with for the [{args.helper}]") + else: + print(f"No such command for helper [{args.helper}]") +else: + print(f"Helper [{args.helper}] is not implemented") diff --git a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java index 6abb63d..f4ddc81 100644 --- a/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java +++ b/src/main/java/org/opengroup/osdu/streaming/StreamingAdminApplication.java @@ -1,8 +1,28 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; +/** + * Main Application Configuration + * + * @author Dmitry Kniazev + */ + @SpringBootApplication public class StreamingAdminApplication { diff --git a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java index cb4e572..30770af 100644 --- a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.api; import org.opengroup.osdu.streaming.StreamApi; @@ -12,6 +26,18 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.annotation.RequestScope; +/** + * Streams Administration Controller implements the auto-generated interface + * based on OpenAPI 3.0 + * + * @author Dmitry Kniazev + */ + +// TODO: Implement security filters for all methods + +// TODO: Implement handling the underlying service exceptions and prvoide the +// proper response codes + @RestController @RequestScope public class StreamingAdminControllerImpl implements StreamApi { diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java index 49bda39..6389f13 100644 --- a/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java +++ b/src/main/java/org/opengroup/osdu/streaming/di/StorageClientFactory.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.di; import com.fasterxml.jackson.databind.DeserializationFeature; diff --git a/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java index 136c0ee..31320dd 100644 --- a/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java +++ b/src/main/java/org/opengroup/osdu/streaming/di/StreamingBeansConfig.java @@ -1,3 +1,17 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.di; import javax.servlet.http.HttpServletRequest; @@ -10,6 +24,12 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.context.annotation.RequestScope; +/** + * Web Configuration for Streaming Services + * + * @author Dmitry Kniazev + */ + @Configuration public class StreamingBeansConfig { diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java index 1959c8f..bdb9ae5 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java @@ -1,10 +1,23 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.service; import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; public interface StreamingAdminService { - // StreamRecord createOrUpdateStream(Record record); StreamRecord getStream(String streamRecordId); diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java index 07b6963..0ec5c71 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java @@ -1,5 +1,23 @@ +// Copyright © 2021 EPAM Systems +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package org.opengroup.osdu.streaming.service; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; + import org.opengroup.osdu.core.common.model.http.DpsHeaders; import org.opengroup.osdu.core.common.model.storage.Record; import org.opengroup.osdu.core.common.model.storage.StorageException; @@ -14,15 +32,24 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.web.context.annotation.RequestScope; +/** + * Streaming Admin Service + * + * @author Dmitry Kniazev + */ + @Service @RequestScope public class StreamingAdminServiceImpl implements StreamingAdminService { Logger logger = LoggerFactory.getLogger(StreamingAdminServiceImpl.class); - // I'm not sure where the DpsHeaders instance is instantiated and injected from, - // so I temporarily added my own factory to provide one with the hardcoded - // headers information. This has to be clarified and fixed! + // TODO: Turn FAIL_ON_UNKNOWN_PROPERTIES on and change the API interface to + // handle all the fields properly, since this is now ignoring some neccessary + // fields (like ExtensionProperties and others) + private final ObjectMapper objectMapper = new ObjectMapper() + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + @Autowired private DpsHeaders headers; @@ -31,20 +58,37 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { @Override public StreamRecord getStream(String streamRecordId) { + StreamRecord sRec = null; + String jsonRepr = new String(); try { - logger.debug("=====>>> Using headers bean: " + headers.toString()); - headers.put("test", "012345"); logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString()); IStorageService storageService = this.storageFactory.create(headers); Record record = storageService.getRecord(streamRecordId); - logger.info("Retrieved stream record: " + record); + + // this is a temporary implementation until I know how to marry Records with + // StreamRecords + // TODO: replace object marshalling/unmarshalling with the proper solution + if (record != null) { + jsonRepr = objectMapper.writeValueAsString(record); + logger.debug("Retrieved stream record: " + jsonRepr); + sRec = objectMapper.readValue(jsonRepr, StreamRecord.class); + } + // TODO: throw exceptions to the controller so that the proper HTTP codes can be + // assigned for the API responses } catch (StorageException e) { logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); + } catch (JsonProcessingException e) { + logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString()); } - return new StreamRecord(); + return sRec; } + // TODO: Change API interface to StreamRecord and implement this method. + // The problem is that we don't know how to assign all the required attributes + // for the OSDU records, like ACL, tags, meta, legal, etc. Therefore it might be + // easier to expect all of these to be provided by the client, so that we can + // pass it to the storage svc without any changes... @Override public String createNewStream(StreamDataset streamDataset) { diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties index 2e25ae8..c9fe9f8 100644 --- a/src/main/resources/application.properties +++ b/src/main/resources/application.properties @@ -1,18 +1,5 @@ # Tomcat server.servlet.contextPath=/api/streaming/v1/ -# OSDU -osdu.storage.api=https://.../api/storage/v2 - -# Logging -logging.level.org.opengroup.osdu.streaming=debug -# spring web logging level -logging.level.web=debug -# this turns on debugging the http requests -spring.mvc.log-request-details=true - -# these parameters below are no longer used since the proper handling of DpsHeaders is implemented; -# developers now supposed to provide these headers when calling the Streaming API and the service -# will pass them through to the downstream calls -# osdu.headers.dataPartition=opendes -# osdu.headers.JWTtoken=Bearer ey... \ No newline at end of file +# Profile +spring.profiles.active=local \ No newline at end of file -- GitLab From 573b9cbb285c96ff3039ef8855183ebeea22cbed Mon Sep 17 00:00:00 2001 From: Dmitry Kniazev Date: Thu, 30 Sep 2021 23:24:17 -0500 Subject: [PATCH 4/4] Creating and getting streams methods work with the storage service now --- .gitignore | 3 +- README.md | 2 +- docs/api/streaming.openapi.yaml | 19 +++++-- pom.xml | 9 +++- scripts/requirements.txt | 3 ++ scripts/stream.helpers.py | 43 +++++++++++++--- scripts/templates/processor.json | 49 +++++++++++++++++++ scripts/templates/source.json | 49 +++++++++++++++++++ .../api/StreamingAdminControllerImpl.java | 7 ++- .../service/StreamingAdminService.java | 3 +- .../service/StreamingAdminServiceImpl.java | 19 ++++--- .../example/demo/DemoApplicationTests.java | 28 ----------- .../api/StreamingAdminControllerTest.java | 40 +++++++++++++++ 13 files changed, 220 insertions(+), 54 deletions(-) create mode 100644 scripts/templates/processor.json create mode 100644 scripts/templates/source.json delete mode 100644 src/test/java/com/example/demo/DemoApplicationTests.java create mode 100644 src/test/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerTest.java diff --git a/.gitignore b/.gitignore index c6625a2..ce482eb 100644 --- a/.gitignore +++ b/.gitignore @@ -30,4 +30,5 @@ application-local.properties .vscode ### Python environment -.venv \ No newline at end of file +.venv +__pycache__ \ No newline at end of file diff --git a/README.md b/README.md index 94464cb..f488b79 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ To build the executable jar file. ``` bash mvn clean package ``` -Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. +Executing the above command generates the controllers interfaces based on the streaming api spec(open api format). The spec is part of the doc folder under the source tree. The generation process is controlled by the maven plugin and is configured in the pom.xml (see [configuration options reference](https://openapi-generator.tech/docs/generators/spring)) To run the executable as a spring boot application diff --git a/docs/api/streaming.openapi.yaml b/docs/api/streaming.openapi.yaml index 02a1a7b..9bec77e 100644 --- a/docs/api/streaming.openapi.yaml +++ b/docs/api/streaming.openapi.yaml @@ -1,12 +1,13 @@ openapi: 3.0.0 info: version: 1.0.0 - title: OSDU Streaming Service + title: OSDU Streaming Services license: name: Apache 2.0 url: "http://www.apache.org/licenses/LICENSE-2.0" servers: - - url: "https://localhost:8080/api/streaming/v1" + - url: "http://localhost:8080/api/streaming/v1" + description: "Local development server" tags: - name: stream-admin-api description: Stream Admin API @@ -62,7 +63,7 @@ paths: content: application/json: schema: - $ref: "#/components/schemas/StreamDataset" + $ref: "#/components/schemas/StreamRecord" responses: "201": description: Created @@ -311,6 +312,8 @@ components: $ref: "#/components/schemas/Acl" legal: $ref: "#/components/schemas/Legal" + tags: + $ref: "#/components/schemas/Tags" meta: type: array items: @@ -341,10 +344,12 @@ components: - source - processor - sink - description: 'Type of the stream. Enumeration: "SOURCE", "PROCESSOR", "SINK"' + description: 'Type of the stream. Enumeration: "source", "processor", "sink"' example: "processor" StreamDefinition: $ref: "#/components/schemas/StreamDefinition" + ExtensionProperties: + type: object StreamDefinition: type: object required: @@ -360,7 +365,7 @@ components: items: type: string example: "opendes_4dc4e8ec354e4953b6968fcb1d9d9f38_work-product-component--WellLog_1.0.0" - SubscribeIDs: + SubscribeIds: type: array items: type: string @@ -424,6 +429,10 @@ components: enum: - compliant - incompliant + Tags: + type: object + additionalProperties: + type: object Map: type: object additionalProperties: diff --git a/pom.xml b/pom.xml index 46f0fa2..5a0133f 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 @@ -53,6 +54,11 @@ spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-webflux + + org.springframework.boot spring-boot-starter-test @@ -132,6 +138,7 @@ false true + false diff --git a/scripts/requirements.txt b/scripts/requirements.txt index b60793c..c6bc647 100644 --- a/scripts/requirements.txt +++ b/scripts/requirements.txt @@ -1 +1,4 @@ +Jinja2==3.0.1 +MarkupSafe==2.0.1 +pkg-resources==0.0.0 python-dotenv==0.19.0 diff --git a/scripts/stream.helpers.py b/scripts/stream.helpers.py index 6d6239b..32e5585 100644 --- a/scripts/stream.helpers.py +++ b/scripts/stream.helpers.py @@ -1,8 +1,13 @@ +from http import HTTPStatus import http.client import json import argparse import os from dotenv import load_dotenv +from jinja2 import Environment, FileSystemLoader, select_autoescape +env = Environment( + loader=FileSystemLoader(searchpath="./templates"), + autoescape=select_autoescape()) def refresh_token(): @@ -43,10 +48,34 @@ def get_stream(stream_id): conn.request( "GET", f"/api/streaming/v1/stream/{stream_id}", payload, headers) res = conn.getresponse() - jsonString = res.read() + jsonString = res.read().decode() print(jsonString) +def create_new_stream(stream_id, template_name): + # prepare the request body from template + template = env.get_template(f"{template_name}.json") + # prepare request headers + token = os.getenv('OSDU_API_TOKEN') + auth = f"Bearer {token}" + payload = template.render(record_id=stream_id) + headers = {'Content-Type': 'application/json', + 'data-partition-id': 'opendes', 'Authorization': auth} + # make POST request to create a new stream + conn = http.client.HTTPConnection("localhost:8080") + conn.request( + "POST", f"/api/streaming/v1/stream", payload, headers) + res = conn.getresponse() + jsonString = res.read().decode() + if res.status == HTTPStatus.CREATED: + print("Stream successfully created - see full response below:") + print(jsonString) + else: + print( + f"Received the following status from server: {res.status}. See full response below:") + print(jsonString) + + load_dotenv() parser = argparse.ArgumentParser( description='Little helpers to use for Streaming API testing. They can help you with things like refreshing access tokens, posting a stream record, getting a stream record, etc.') @@ -56,11 +85,14 @@ parser.add_argument('command', type=str, choices=['refresh', 'get', 'post'], help='command to execute') parser.add_argument('--record-id', type=str, help='id of the stream record, e.g. opendes:dataset--Stream.Generic:12345', default='opendes:dataset--Stream.Generic:12345') +parser.add_argument('--template', type=str, help='template for the stream record, e.g. source, processor, sink or your custom', + default='processor') +# main program here args = parser.parse_args() print( f"Running helper [{args.helper}] with the following command [{args.command}]") -print(f"{args}") +print(f"{args._get_args}") if args.helper == "token": if args.command == "refresh": @@ -69,10 +101,9 @@ if args.helper == "token": print(f"No such command for helper [{args.helper}]") elif args.helper == "stream": if args.command == "get": - if args.record_id: - get_stream(args.record_id) - else: - print(f"RECORD_ID cannot be empty with for the [{args.helper}]") + get_stream(args.record_id) + elif args.command == "post": + create_new_stream(args.record_id, args.template) else: print(f"No such command for helper [{args.helper}]") else: diff --git a/scripts/templates/processor.json b/scripts/templates/processor.json new file mode 100644 index 0000000..675f713 --- /dev/null +++ b/scripts/templates/processor.json @@ -0,0 +1,49 @@ +{ + "id": "{{ record_id }}", + "kind": "opendes:wks:dataset--Stream.Generic:1.0.0", + "acl": { + "owners": [ + "data.default.owners@opendes.contoso.com" + ], + "viewers": [ + "data.default.viewers@opendes.contoso.com" + ] + }, + "legal": { + "legaltags": [ + "opendes-public-usa-dataset-epam" + ], + "otherRelevantDataCountries": [ + "US" + ], + "status": "compliant" + }, + "tags": { + "env": "dev-azure", + "auto-cleanup": "yes" + }, + "ancestry": { + "parents": [] + }, + "meta": [], + "data": { + "ResourceSecurityClassification": "opendes:reference-data--ResourceSecurityClassification:RESTRICTED:", + "Source": "Contoso Inc.", + "Name": "Test processor stream", + "Description": "This stream is simulating the Kafka App processor that filters data and routes to myapp topic", + "DatasetProperties": { + "StreamType": "processor", + "StreamDefinition": { + "SubscribeIDs": [ + "opendes:work-product-component--WellLog:be54a691c0384182944d71c6b2b6f699" + ], + "SourceBindings": [ + "opendes_wks_work-product-component--WellLog_1.0.0" + ], + "SinkBindings": [ + "opendes_myapp_work-product-component--WellLog_1.0.0" + ] + } + } + } +} \ No newline at end of file diff --git a/scripts/templates/source.json b/scripts/templates/source.json new file mode 100644 index 0000000..c369690 --- /dev/null +++ b/scripts/templates/source.json @@ -0,0 +1,49 @@ +{ + "id": "{{ record_id }}", + "kind": "opendes:wks:dataset--Stream.Generic:1.0.0", + "acl": { + "owners": [ + "data.default.owners@opendes.contoso.com" + ], + "viewers": [ + "data.default.viewers@opendes.contoso.com" + ] + }, + "legal": { + "legaltags": [ + "opendes-public-usa-dataset-epam" + ], + "otherRelevantDataCountries": [ + "US" + ], + "status": "compliant" + }, + "tags": { + "env": "dev-azure", + "auto-cleanup": "yes" + }, + "ancestry": { + "parents": [] + }, + "meta": [], + "data": { + "ResourceSecurityClassification": "opendes:reference-data--ResourceSecurityClassification:RESTRICTED:", + "Source": "Contoso Inc.", + "Name": "Test sink stream", + "Description": "This stream is simulating the producer application that streams data into OSDU platform", + "DatasetProperties": { + "StreamType": "source", + "StreamDefinition": { + "SubscribeIDs": [ + "opendes:work-product-component--WellLog:be54a691c0384182944d71c6b2b6f699" + ], + "SourceBindings": [ + "wss://remote-etp.server.com" + ], + "SinkBindings": [ + "opendes_wks_work-product-component--WellLog_1.0.0" + ] + } + } + } +} \ No newline at end of file diff --git a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java index 30770af..f037b0a 100644 --- a/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerImpl.java @@ -15,7 +15,6 @@ package org.opengroup.osdu.streaming.api; import org.opengroup.osdu.streaming.StreamApi; -import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; import org.opengroup.osdu.streaming.service.StreamingAdminService; import org.slf4j.Logger; @@ -48,10 +47,10 @@ public class StreamingAdminControllerImpl implements StreamApi { private StreamingAdminService streamingAdminService; @Override - public ResponseEntity createNewStream(String dataPartitionId, StreamDataset streamDataset) { - logger.debug("Creating a new stream: " + streamDataset.getName()); + public ResponseEntity createNewStream(String dataPartitionId, StreamRecord streamRecord) { + logger.debug("Creating a new stream: " + streamRecord.getData().getName()); String id = null; - id = this.streamingAdminService.createNewStream(streamDataset); + id = this.streamingAdminService.createNewStream(streamRecord); if (id != null) { return new ResponseEntity(id, HttpStatus.CREATED); } else { diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java index bdb9ae5..d2c2c05 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminService.java @@ -14,13 +14,12 @@ package org.opengroup.osdu.streaming.service; -import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; public interface StreamingAdminService { StreamRecord getStream(String streamRecordId); - String createNewStream(StreamDataset streamDataset); + String createNewStream(StreamRecord streamRecord); } diff --git a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java index 0ec5c71..bdd4615 100644 --- a/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java +++ b/src/main/java/org/opengroup/osdu/streaming/service/StreamingAdminServiceImpl.java @@ -24,7 +24,6 @@ import org.opengroup.osdu.core.common.model.storage.StorageException; import org.opengroup.osdu.core.common.model.storage.UpsertRecords; import org.opengroup.osdu.core.common.storage.IStorageFactory; import org.opengroup.osdu.core.common.storage.IStorageService; -import org.opengroup.osdu.streaming.model.StreamDataset; import org.opengroup.osdu.streaming.model.StreamRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -90,22 +89,30 @@ public class StreamingAdminServiceImpl implements StreamingAdminService { // easier to expect all of these to be provided by the client, so that we can // pass it to the storage svc without any changes... @Override - public String createNewStream(StreamDataset streamDataset) { + public String createNewStream(StreamRecord streamRecord) { UpsertRecords upRec = null; try { + // get the instance of StorageService logger.debug("Creating a storage service with headers: " + headers.getHeaders().toString()); IStorageService storageService = this.storageFactory.create(headers); + + // convert StreamRecord to Record Record rec = new Record(); - rec.setId("opendes:dataset--Stream.Kafka:12345"); - rec.setKind("opendes:wks:dataset--Stream.Kafka:1.0.0"); - logger.debug("Attempt to create a new record with:\nRecordId: " + rec.getId() + "\nRecordKind: " - + rec.getKind()); + String jsonRepr = new String(); + jsonRepr = objectMapper.writeValueAsString(streamRecord); + logger.debug("Extracted record: \n" + jsonRepr); + rec = objectMapper.readValue(jsonRepr, Record.class); + + // try inserting a new record to storage upRec = storageService.upsertRecord(rec); } catch (StorageException e) { logger.error("Got exception: " + e.getMessage() + "\nFull HTTP Response:" + e.getHttpResponse()); + } catch (JsonProcessingException e) { + logger.error("Got exception: " + e.getMessage() + "\nLocation" + e.getLocation().toString()); } + if (upRec != null && upRec.getRecordCount() > 0) { return upRec.getRecordIds().get(0); } else diff --git a/src/test/java/com/example/demo/DemoApplicationTests.java b/src/test/java/com/example/demo/DemoApplicationTests.java deleted file mode 100644 index 8595e84..0000000 --- a/src/test/java/com/example/demo/DemoApplicationTests.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.example.demo; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; -import org.springframework.boot.test.web.client.TestRestTemplate; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.test.context.junit4.SpringRunner; -import static org.assertj.core.api.Assertions.assertThat; - -@RunWith(SpringRunner.class) -@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) -public class DemoApplicationTests { - - @Test - public void contextLoads() { - } - - @Autowired - private TestRestTemplate restTemplate; - - @Test - public void homeResponse() { - String body = this.restTemplate.getForObject("/", String.class); - assertThat(body).isEqualTo("Spring is here!"); - } -} diff --git a/src/test/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerTest.java b/src/test/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerTest.java new file mode 100644 index 0000000..cd2b206 --- /dev/null +++ b/src/test/java/org/opengroup/osdu/streaming/api/StreamingAdminControllerTest.java @@ -0,0 +1,40 @@ +package org.opengroup.osdu.streaming.api; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.test.web.reactive.server.WebTestClient; + +@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) +public class StreamingAdminControllerTest { + + final String uri = "/stream/"; + final String stream_id = "opendes:dataset--Stream.Generic:12345"; + + // TODO add support for requests with teh valid tokens + + @Autowired + private WebTestClient webTestClient; + + // should fail since no required headers + @Test + void getStreamById_expectHttp400() { + webTestClient.get().uri(uri + stream_id).exchange().expectStatus().isBadRequest(); + } + + // should fail since no JWT token + @Test + void getStreamById_expectHttp401() { + webTestClient.get().uri(uri + stream_id).header("data-partition-id", "opendes").exchange().expectStatus() + .isUnauthorized(); + } + + // should fail since JWT token is invalid + @Test + void getStreamById_expectHttp403() { + webTestClient.get().uri(uri + stream_id).header("data-partition-id", "opendes") + .header("Authorization", "Bearer eyXYZ").exchange().expectStatus().isForbidden(); + } + +} -- GitLab