Commit c921d84f authored by Luc Yriarte

SLB code push 2

parent 36b786b7
@@ -22,10 +22,14 @@ python-packages
# Unit test / coverage reports
.pytest_cache/
htmlcov
.coverage*
coverage.xml
unit_tests_report.xml
# Environments
.env
.venv*
env/
venv*/
@@ -35,5 +39,5 @@ venv.bak/
.envs/
secrets/
.DS_Store
**/.DS_Store
@@ -2,33 +2,31 @@
Wellbore Data Management Services (WDMS) Open Subsurface Data Universe (OSDU) is one of several backend services that comprise Schlumberger's Exploration and Production (E&P) software ecosystem. It is a single, containerized service written in Python that provides an API for wellbore-related data.
[[_TOC_]]
## Install Software and Packages
1. Clone the os-wellbore-ddms repository
2. Download [Python](https://www.python.org/downloads/) >=3.7
3. Ensure pip, Python's pre-installed package manager and installer, is upgraded to the latest version.
```bash
# Windows
python -m pip install --upgrade pip
python -m pip --version
```

```bash
# macOS and Linux
python3 -m pip install --upgrade pip
python3 -m pip --version
```
4. Using pip, download [FastAPI](https://fastapi.tiangolo.com/), the main framework used to build the service APIs. To install fastapi and uvicorn (to serve the app), run the following command:

```bash
pip install fastapi[all]
```
5. [venv](https://docs.python.org/3/library/venv.html) allows you to manage separate package installations for different projects. It essentially lets you create a "virtual", isolated Python installation and install packages into that virtual environment. venv is already included in the Python standard library and requires no additional installation. A minimal example is shown below.
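For instance, a typical workflow (macOS/Linux shown; the same commands are detailed step by step in the "Run the service locally" section) looks like this:

```bash
# Create a virtual environment named "env" and activate it
python3 -m venv env
source env/bin/activate
```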
### FastAPI Dependencies
@@ -53,6 +51,9 @@ Linux and macOS:
- Implementation of blob storage on GCP
- osdu-core-python-gcp
- Implementation of blob storage and partition service on Azure
- osdu-core-python-azure
- Storage, search and entitlements
- osdu-python-clients
@@ -60,56 +61,48 @@ Linux and macOS:
### Run the service locally
1. Create a virtual environment in the wellbore project directory. This will create a folder inside of the wellbore project directory. For example: ~/os-wellbore-ddms/nameofvirtualenv

```bash
# Windows
python -m venv env
```

```bash
# macOS/Linux
python3 -m venv env
```
2. Activate the virtual environment
```bash
# Windows
source env/Scripts/activate
```

```bash
# macOS/Linux
source env/bin/activate
```
3. Create a pip.ini (Windows) or pip.conf (macOS and Linux) file inside the `env` directory. This lets you set a global index URL so that pip can download the packages/libraries needed from the AzDO artifacts. There are several ways to add this extra index URL:

   - It is also possible to use the [--extra-index-url](https://pip.pypa.io/en/stable/reference/pip_install/#install-extra-index-url) parameter to specify it inline on the pip install command. A sketch of the config-file approach follows this step.
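   As an illustration only, the config-file approach could look like the following sketch; the feed URL is a placeholder and must be replaced with your actual AzDO artifacts index:

```bash
# Hypothetical sketch: write an extra index URL into the virtual environment's pip config.
# On Windows, write the same content to env/pip.ini instead; the feed URL below is a placeholder.
cat > env/pip.conf <<'EOF'
[global]
extra-index-url = https://pkgs.dev.azure.com/<organization>/_packaging/<feed>/pypi/simple/
EOF
```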
4. Install dependencies
```bash
pip install -r requirements.txt
```
5. Run the service
```bash
# Run the service which will default to http://127.0.0.1:8080
python main.py

# Run on specific host, port and enforce dev mode
python main.py --host MY_HOST --port MY_PORT --dev_mode 1
```
If host is `127.0.0.1` or `localhost`, the dev_mode is automatically set to True.
The only significant change when dev_mode is on is that configuration errors at startup are logged but don't prevent the service from running, and some implementations can be overridden.
The hosts for the entitlements, search and storage services have to be provided as environment variables, or on the command line.
@@ -121,65 +114,145 @@ python main.py -e SERVICE_HOST_ENTITLEMENTS https://api.example.com/entitlements
1. Generate a bearer token, as all APIs but `/about` require authentication.
   - Navigate to `http://127.0.0.1:8080/token` and follow the steps to generate a bearer token.
   - Navigate to `http://127.0.0.1:8080/docs`. Click `Authorize` and enter your token. That will allow for authenticated requests.
2. Choose storage option
Even if the service runs locally, it still relies on the OSDU data ecosystem storage service `os-storage-dot-opendes.appspot.com/api/storage` to store documents and on the Google blob store to store binary data (`bulk data`). It is possible to override this and use your local file system instead by setting the following environment variables:

- `USE_INTERNAL_STORAGE_SERVICE_WITH_PATH` to store on a local folder instead of the OSDU ecosystem storage service.
- `USE_LOCALFS_BLOB_STORAGE_WITH_PATH` to store on a local folder instead of Google blob storage.
```bash
# Create temp storage folders
mkdir tmpstorage
mkdir tmpblob

# Set your repo path
path="C:/source"

python main.py -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH $path/os-wellbore-ddms/tmpstorage -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH $path/os-wellbore-ddms/tmpblob
```
3. Choose Cloud Provider
- The code can be run by specifying environment variables and setting the cloud provider. The accepted values are `gcp`, `az` or `local` (a minimal `local` sketch is shown below). When a cloud provider is passed as an environment variable, certain additional environment variables become mandatory.
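As an illustration only, a `local` run might look like the sketch below; depending on your setup, the service-host or local-filesystem variables described elsewhere in this README may still be required:

```bash
# Hypothetical sketch: run with the local provider and local-folder storage
python main.py -e CLOUD_PROVIDER local \
               -e USE_INTERNAL_STORAGE_SERVICE_WITH_PATH ./tmpstorage \
               -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH ./tmpblob
```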
### Setting the Cloud Provider Environment Variables
- The following environment variables are required when the cloud provider is set to GCP:
- OS_WELLBORE_DDMS_DATA_PROJECT_ID: GCP Data Tenant ID
- OS_WELLBORE_DDMS_DATA_PROJECT_CREDENTIALS: path to the key file of the SA to access the data tenant
- SERVICE_HOST_ENTITLEMENTS: The Entitlements Service host
- SERVICE_HOST_SEARCH: The Search Service host
- SERVICE_HOST_STORAGE: The Storage Service host
```bash
python main.py -e CLOUD_PROVIDER gcp \
-e OS_WELLBORE_DDMS_DATA_PROJECT_ID projectid \
-e OS_WELLBORE_DDMS_DATA_PROJECT_CREDENTIALS pathtokeyfile \
-e SERVICE_HOST_ENTITLEMENTS entitlement_host \
-e SERVICE_HOST_SEARCH search_host \
-e SERVICE_HOST_STORAGE storage_host
```
- The following environment variables are required when the cloud provider is set to Azure:
- AZ_AI_INSTRUMENTATION_KEY: Azure Application Insights instrumentation key
- SERVICE_HOST_ENTITLEMENTS: The Entitlements Service host
- SERVICE_HOST_SEARCH: The Search Service host
- SERVICE_HOST_STORAGE: The Storage Service host
- SERVICE_HOST_PARTITION: The Partition Service internal host
- KEYVAULT_URL: The Key Vault url (needed by the Partition Service)
- USE_PARTITION_SERVICE: `enabled` when Partition Service is available in the environment. Needs to be `disabled` for `dev` or to run locally.
```bash
python main.py -e CLOUD_PROVIDER az \
-e AZ_AI_INSTRUMENTATION_KEY instrumentationkey \
-e SERVICE_HOST_ENTITLEMENTS entitlement_host \
-e SERVICE_HOST_SEARCH search_host \
-e SERVICE_HOST_STORAGE storage_host \
-e SERVICE_HOST_PARTITION partition_host \
-e KEYVAULT_URL keyvault_url \
-e USE_PARTITION_SERVICE disabled
```
Note: If you're running locally, you may need to provide environment variables in your IDE. A sample `.env` file is sketched below.
By default, all Core Services endpoint values are set to `None` in `app/conf.py`; you can update the `.env` file with the core services endpoints that match your cloud provider.
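An illustrative `.env` sketch, assuming the Azure variables listed above (every value is a placeholder to be replaced):

```bash
# Illustrative .env sketch only - all values below are placeholders
CLOUD_PROVIDER=az
AZ_AI_INSTRUMENTATION_KEY=<instrumentation_key>
SERVICE_HOST_ENTITLEMENTS=<entitlements_host>
SERVICE_HOST_SEARCH=<search_host>
SERVICE_HOST_STORAGE=<storage_host>
SERVICE_HOST_PARTITION=<partition_host>
KEYVAULT_URL=<keyvault_url>
USE_PARTITION_SERVICE=disabled
```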
### Create a log record
To create a `log` record, below are payload samples for the PUT `/ddms/v2/logs` API. The response will contain an id you can use on the `/ddms/v2/logs/{logid}/data` endpoint to create some bulk data. An illustrative curl sketch follows the samples.
- GCP
```json
[{
"data": {
"log": {
"family": "Gamma Ray",
"familyType": "Gamma Ray",
"format": "float64",
"mnemonic": "GR",
"name": "GAMM",
"unitKey": "gAPI"
}
},
"kind": "opendes:osdu:log:1.0.5",
"namespace": "opendes:osdu",
"legal": {
"legaltags": [
"opendes-public-usa-dataset-1"
],
"otherRelevantDataCountries": [
"US"
],
"status": "compliant"
},
"acl": {
"viewers": [
"data.default.viewers@opendes.p4d.cloud.slb-ds.com"
],
"owners": [
"data.default.owners@opendes.p4d.cloud.slb-ds.com"
]
},
"type": "log"
}
]
```
- MVP
```json
[
{
"acl": {
"owners": [
"data.default.owners@opendes.contoso.com"
],
"viewers": [
"data.default.viewers@opendes.contoso.com"
]
},
"data": {
"name": "wdms_e2e_log"
},
"kind": "opendes:wks:log:1.0.5",
"legal": {
"legaltags": [
"opendes-storage-1603197111615"
],
"otherRelevantDataCountries": [
"US",
"FR"
]
}
}
]
```
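As an illustration only (host, port and token are placeholders; adjust to your deployment), submitting one of the payloads above could look like:

```bash
# Hypothetical sketch: create a log record from a payload saved as log.json
# TOKEN is a bearer token generated from the /token page described above
curl -X PUT "http://127.0.0.1:8080/ddms/v2/logs" \
     -H "Authorization: Bearer $TOKEN" \
     -H "Content-Type: application/json" \
     -d @log.json
```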
### Run with Uvicorn
@@ -213,13 +286,13 @@ docker build -t=$IMAGE_TAG --rm . -f ./build/dockerfile --build-arg PIP_EXTRA_UR
1. Run the image
Replace the LOCAL_PORT value with a local port

```bash
LOCAL_PORT=<local_port>

docker run -d -p $LOCAL_PORT:8080 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_BLOB_STORAGE_WITH_PATH=1 $IMAGE_TAG
```
2. Access app on `http://localhost:LOCAL_PORT/docs`
@@ -227,9 +300,9 @@ docker run -d -p $LOCAL_PORT:8097 -e OS_WELLBORE_DDMS_DEV_MODE=1 -e USE_LOCALFS_
4. Logs can be checked by running
```bash
docker logs CONTAINER_ID
```
### Run Unit Tests Locally
@@ -244,16 +317,11 @@ Coverage reports can be viewed after the command is run. The HMTL reports are sa
### Port Forward from Kubernetes
1. List the pods: `kubectl get pods`
2. Port forward: `kubectl port-forward pods/POD_NAME LOCAL_PORT:8080`
3. Access it on `http://localhost:LOCAL_PORT/docs`

### Tracing
OpenCensus libraries are used to record incoming request metrics (execution time, result code, etc.).
At the moment, 100% of the requests are saved.
from .bulk_id import BulkId
from .dataframe_persistence import create_and_store_dataframe, get_dataframe
from .dataframe_serializer import DataframeSerializer
from .json_orient import JSONOrient
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant
@@ -7,6 +7,7 @@ class BlobBulk:
"""
represents a bulk bloblified, which means serialized in some way. data is expected to be an io.IOBase
"""
id: str
""" identifier """
data: Any = None
......
import asyncio
import uuid
from asyncio import iscoroutinefunction
from contextlib import asynccontextmanager
from io import BytesIO
from os import path, remove
from typing import (
    Any,
    Callable,
    Coroutine,
    Dict,
    NamedTuple,
    Optional,
    Tuple,
    Union,
)

import pandas as pd
import pyarrow as pa
import pyarrow.feather as feather
import pyarrow.parquet as pq

from app.utils import get_pool_executor, get_wdms_temp_dir

from .blob_bulk import BlobBulk
from .mime_types import MimeType, MimeTypes
# Here are functions to (de)serialize bulk data only; no knowledge at all regarding the domain models, only raw data here.
@@ -27,10 +38,17 @@ from app.utils import get_wdms_temp_dir
# - better proc fork and arg serialization
def export_to_parquet(
    path_like: str, dataframe: pd.DataFrame
) -> Tuple[str, Dict[str, str]]:
    # parquet v2 has less restrictions concerning format (for example number as column name)
    pq.write_table(
        pa.Table.from_pandas(dataframe, preserve_index=True),
        path_like,
        version="2.0",
        compression="snappy",
    )
    return path_like, {"content_type": MimeTypes.PARQUET.type}
def load_from_parquet(data) -> pd.DataFrame:
@@ -40,9 +58,15 @@ def load_from_parquet(data) -> pd.DataFrame:
    return pq.read_table(data).to_pandas()
def export_to_feather(
    filename: str, dataframe: pd.DataFrame
) -> Tuple[str, Dict[str, str]]:
    feather.write_feather(
        pa.Table.from_pandas(dataframe, preserve_index=True),
        filename,
        compression="lz4",
    )
    return filename, {"content_type": MimeTypes.FEATHER.type}
def load_from_feather(data) -> pd.DataFrame:
@@ -55,8 +79,7 @@ def load_from_feather(data) -> pd.DataFrame:
class BlobFileExporter(NamedTuple):
    mime_type: MimeType
    writer_fn: Union[
        Callable[[str, pd.DataFrame], Any], Coroutine[str, pd.DataFrame, Any]
    ]

    def match(self, str_value: str) -> bool:
@@ -73,14 +96,13 @@ class BlobFileExporters:
            return BlobFileExporters.PARQUET
        if BlobFileExporters.FEATHER.match(value):
            return BlobFileExporters.FEATHER
        raise KeyError("unknown file type " + value)
class BlobFileImporter(NamedTuple):
    mime_type: MimeType
    reader_fn: Union[
        Callable[[str, pd.DataFrame], Any], Coroutine[str, pd.DataFrame, Any]
    ]

    def match(self, str_value: str) -> bool:
@@ -115,7 +137,7 @@ async def _run_export_to_file_in_executor(filename: str,
def get_default_exporter_executor():
    return get_pool_executor()
@asynccontextmanager
......
import uuid
from typing import Optional


class BulkId:
    @staticmethod
    def new_bulk_id() -> str:
        return str(uuid.uuid4())

    @classmethod
    def bulk_urn_encode(cls, bulk_id: str) -> str:
        return uuid.UUID(bulk_id).urn

    @classmethod
    def bulk_urn_decode(cls, urn: str) -> Optional[str]:
        return str(uuid.UUID(urn))
import io
import pandas as pd
from osdu.core.api.storage.blob_storage_base import BlobStorageBase
from app.utils import Context
from .blob_storage import (
    BlobBulk,
    BlobFileExporters,
    create_and_write_blob,
    read_blob,
)
from .bulk_id import BulkId
from .mime_types import MimeTypes
from .tenant_provider import resolve_tenant


async def create_and_store_dataframe(ctx: Context, df: pd.DataFrame) -> str:
    """Store bulk on a blob storage"""
    new_bulk_id = BulkId.new_bulk_id()
    tenant = await resolve_tenant(ctx.partition_id)
    async with create_and_write_blob(
        df, file_exporter=BlobFileExporters.PARQUET, blob_id=new_bulk_id
    ) as bulkblob:
        storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
        await storage.upload(
            tenant,
            bulkblob.id,
            bulkblob.data,
            content_type=bulkblob.content_type,
            metadata=bulkblob.metadata,
        )
        return bulkblob.id


async def get_dataframe(ctx: Context, bulk_id: str) -> pd.DataFrame:
    """ fetch bulk from a blob storage, provide column major """
    tenant = await resolve_tenant(ctx.partition_id)
    storage: BlobStorageBase = await ctx.app_injector.get(BlobStorageBase)
    bytes_data = await storage.download(tenant, bulk_id)
    # for now use fix parquet format saving one call
    # meta_data = await storage.download_metadata(tenant.project_id, tenant.bucket_name, bulk_id)
    # content_type = meta_data.metadata["content_type"]
    blob = BlobBulk(
        id=bulk_id,
        data=io.BytesIO(bytes_data),
        content_type=MimeTypes.PARQUET.type,
    )
    data_frame = await read_blob(blob)
    return data_frame
import json
from enum import Enum
from io import BytesIO
from pathlib import Path
from typing import Union, AnyStr, IO, Optional, List

import numpy as np
import pandas as pd

from app.storage.mime_types import MimeTypes