Commit 3dfb4ed0 authored by Alexandre Vincent's avatar Alexandre Vincent
Browse files

Merge branch 'master' into avincent/pip-tools-dependency-management

Dask version upgrade 2021.6 -> 2021.7 replicated into requirements.in and compiled
requirements.txt
parents c562b48c f484d749
Pipeline #57143 passed with stages
in 15 minutes and 33 seconds
......@@ -25,7 +25,7 @@ from app.bulk_persistence import BulkId
from app.bulk_persistence.dask.errors import BulkNotFound, BulkNotProcessable
from app.bulk_persistence.dask.traces import wrap_trace_process
from app.bulk_persistence.dask.utils import (SessionFileMeta, by_pairs,
do_merge, set_index,
do_merge,
worker_capture_timing_handlers)
from app.helper.logger import get_logger
from app.helper.traces import with_trace
......@@ -106,7 +106,7 @@ class DaskBulkStorage:
dask_client = dask_client or await DaskClient.create()
if DaskBulkStorage.client is not dask_client: # executed only once per dask client
DaskBulkStorage.client = dask_client
if parameters.register_fsspec_implementation:
parameters.register_fsspec_implementation()
......@@ -149,10 +149,17 @@ class DaskBulkStorage:
"""Read a Parquet file into a Dask DataFrame
path : string or list
**kwargs: dict (of dicts) Passthrough key-word arguments for read backend.
read_parquet parameters:
chunksize='25M': if chunks are too small, we aggregate them until we reach chunksize
aggregate_files=True: because we are passing a list of paths when committing a session,
aggregate_files is needed when paths are different
"""
return self._submit_with_trace(dd.read_parquet, path,
engine='pyarrow-dataset',
storage_options=self._parameters.storage_options,
chunksize='25M',
aggregate_files=True,
**kwargs)
def _load_bulk(self, record_id: str, bulk_id: str) -> dd.DataFrame:
......@@ -303,14 +310,11 @@ class DaskBulkStorage:
@internal_bulk_exceptions
async def session_commit(self, session: Session, from_bulk_id: str = None) -> str:
dfs = [self._load(pf) for pf in self._get_next_files_list(session)]
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
if not dfs:
raise BulkNotProcessable("No data to commit")
if len(dfs) > 1: # set_index is not needed if no merge operations are done
dfs = self._map_with_trace(set_index, dfs)
if from_bulk_id:
dfs.insert(0, self._load_bulk(session.recordId, from_bulk_id))
while len(dfs) > 1:
dfs = [self._submit_with_trace(do_merge, a, b) for a, b in by_pairs(dfs)]
......
......@@ -20,6 +20,8 @@ from logging import INFO
from app.helper.logger import get_logger
from app.utils import capture_timings
import dask.dataframe as dd
def worker_make_log_captured_timing_handler(level=INFO):
"""log captured timing from the worker subprocess (no access to context)"""
......@@ -71,25 +73,25 @@ class SessionFileMeta:
@capture_timings("set_index", handlers=worker_capture_timing_handlers)
def set_index(ddf: dd.DataFrame):
    """Set the index of the dask DataFrame, but only if needed.

    Three cases:
      1. No '_idx' column yet: materialize the current index into a
         temporary '_idx' column and set it as a sorted index.
      2. '_idx' exists but divisions are unknown: re-set the index
         (sorted=True lets dask compute divisions without a shuffle).
      3. Index already known and set: return the frame unchanged.

    Returns the (possibly re-indexed) dask DataFrame.
    """
    if '_idx' not in ddf:
        # We need a temporary column because the index itself cannot be
        # assigned directly; keep the original index dtype.
        ddf['_idx'] = ddf.index
        ddf['_idx'] = ddf['_idx'].astype(ddf.index.dtype)
        # sorted=True: data is assumed already ordered, avoids a shuffle.
        return ddf.set_index('_idx', sorted=True)
    if not ddf.known_divisions:
        return ddf.set_index(ddf.index, sorted=True)
    return ddf
@capture_timings("do_merge", handlers=worker_capture_timing_handlers)
def do_merge(df1: dd.DataFrame, df2: dd.DataFrame):
    """Combine the two dask DataFrames. Updates df1 with df2 values on overlap.

    df2 may be None (odd element left over when merging by pairs), in which
    case df1 is returned as-is. Both frames are (re-)indexed via set_index
    before combining so the row-wise alignment is correct.
    """
    if df2 is None:
        return df1

    # Ensure both frames share a comparable, known-division index.
    df1 = set_index(df1)
    df2 = set_index(df2)

    if share_items(df1.columns, df2.columns):
        # Overlapping columns: df2 takes precedence, df1 fills the gaps.
        ddf = df2.combine_first(df1)
    else:
        # join seems faster when there are no columns in common
        ddf = df1.join(df2, how='outer')
    # NOTE(review): this change drops the previous column re-ordering
    # (`return ddf[sorted(ddf.columns)]`) — confirm no caller relies on
    # alphabetically sorted columns.
    return ddf
......@@ -140,7 +140,7 @@ steps:
appUrl=${{ parameters.baseUrl }}${{ parameters.basePath }}
echo "Testing App on ${appUrl}"
python tests/integration/gen_postman_env.py --token $(token) --base_url ${appUrl} --cloud_provider ${{ parameters.cloudProvider }} --acl_domain ${{ parameters.aclDomain }} --legal_tag ${{ parameters.legalTag }} --data_partition ${{ parameters.dataPartition }}
pip install virtualenv
......
......@@ -20,7 +20,7 @@ opencensus-ext-ocagent
opencensus-ext-logging
# for chunking feature
dask[distributed]==2021.6.2
dask[distributed]==2021.7.2
fsspec
python-ulid
......
......@@ -108,7 +108,7 @@ cryptography==3.4.7
# osdu-core-lib-python
# osdu-core-lib-python-gcp
# pyjwt
dask[distributed]==2021.6.2
dask[distributed]==2021.7.2
# via
# -r requirements.in
# distributed
......@@ -116,7 +116,7 @@ decorator==5.0.9
# via
# gcsfs
# jsonpath-ng
distributed==2021.06.2
distributed==2021.07.2
# via dask
fastapi==0.65.1
# via
......@@ -255,7 +255,9 @@ osdu-data-ecosystem-storage==1.2.0
osdu-log-recognition-lib==0.0.9
# via -r requirements.in
packaging==21.0
# via google-api-core
# via
# dask
# google-api-core
pandas==1.2.4
# via -r requirements.in
partd==1.2.0
......
{
"info": {
"_postman_id": "08ee8ed2-3963-442e-b118-b51f482225f2",
"name": "Core dependencies test",
"_postman_id": "41aaca62-e223-45e4-9773-1c09ce09e884",
"name": "Core dependencies test V3",
"schema": "https://schema.getpostman.com/json/collection/v2.1.0/collection.json"
},
"item": [
......@@ -15,13 +15,17 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.be.ok;",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
"",
" pm.expect(pm.response.json().groups).to.not.be.empty;",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(pm.response.json());",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
"",
"",
""
......@@ -70,132 +74,15 @@
"name": "Storage",
"item": [
{
"name": "storage - query all schemas",
"event": [
{
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
" pm.response.to.be.ok;",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
" pm.expect(pm.response.json()).to.not.be.empty;",
"});"
],
"type": "text/javascript"
}
}
],
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "data-partition-id",
"value": "{{data_partition}}",
"type": "text"
}
],
"url": {
"raw": "{{base_url}}/api/schema-service/v1/schema?source=wks&authority=slb",
"host": [
"{{base_url}}"
],
"path": [
"api",
"schema-service",
"v1",
"schema"
],
"query": [
{
"key": "source",
"value": "wks"
},
{
"key": "authority",
"value": "slb"
}
]
}
},
"response": []
},
{
"name": "storage - get schema from logSetKind",
"event": [
{
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
" pm.response.to.be.ok;",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
"",
" pm.expect(pm.response.json()).to.not.be.empty;",
"});",
""
],
"type": "text/javascript"
}
}
],
"request": {
"auth": {
"type": "bearer",
"bearer": [
{
"key": "token",
"value": "{{token}}",
"type": "string"
}
]
},
"method": "GET",
"header": [
{
"key": "data-partition-id",
"value": "{{data_partition}}",
"type": "text"
}
],
"url": {
"raw": "{{base_url}}/api/schema-service/v1/schema/{{logSetKind}}",
"host": [
"{{base_url}}"
],
"path": [
"api",
"schema-service",
"v1",
"schema",
"{{logSetKind}}"
]
}
},
"response": []
},
{
"name": "storage - create logset record Copy",
"name": "storage - create logset record",
"event": [
{
"listen": "test",
"script": {
"exec": [
"const resobj = pm.response.json();",
"",
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(201);",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
......@@ -203,12 +90,15 @@
" pm.expect(resobj.recordIds.length).to.eql(1);",
" if (resobj.skippedRecordIds)",
" pm.expect(resobj.skippedRecordIds.length).to.eql(0);",
"});",
"",
"// stored the record id for the following tests",
"let record_id = resobj.recordIds[0];",
"console.log(\"created-logset-record-id\", record_id)",
"pm.variables.set(\"created-logset-record-id\", record_id);",
" // stored the record id for the following tests",
" let record_id = resobj.recordIds[0];",
" console.log(\"created-logset-record-id\", record_id)",
" pm.variables.set(\"created-logset-record-id\", record_id);",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(resobj);",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -280,11 +170,18 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.be.ok;",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
"});"
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(pm.response.json());",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
"",
""
],
"type": "text/javascript"
}
......@@ -332,9 +229,14 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(204);",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(pm.response.json());",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -393,7 +295,8 @@
"exec": [
"const resobj = pm.response.json();",
"",
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(201);",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
......@@ -401,7 +304,11 @@
" pm.expect(resobj.recordIds.length).to.eql(1);",
" if (resobj.skippedRecordIds)",
" pm.expect(resobj.skippedRecordIds.length).to.eql(0);",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(resobj);",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -475,7 +382,8 @@
"exec": [
"const resobj = pm.response.json();",
"",
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(201);",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
......@@ -483,7 +391,11 @@
" pm.expect(resobj.recordIds.length).to.eql(1);",
" if (resobj.skippedRecordIds)",
" pm.expect(resobj.skippedRecordIds.length).to.eql(0);",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(resobj);",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -557,7 +469,8 @@
"exec": [
"const resobj = pm.response.json();",
"",
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(201);",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
......@@ -565,7 +478,11 @@
" pm.expect(resobj.recordIds.length).to.eql(1);",
" if (resobj.skippedRecordIds)",
" pm.expect(resobj.skippedRecordIds.length).to.eql(0);",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(resobj);",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -639,7 +556,8 @@
"exec": [
"const resobj = pm.response.json();",
"",
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"",
"try{",
" pm.response.to.have.status(201);",
" pm.response.to.be.withBody;",
" pm.response.to.be.json;",
......@@ -647,7 +565,11 @@
" pm.expect(resobj.recordIds.length).to.eql(1);",
" if (resobj.skippedRecordIds)",
" pm.expect(resobj.skippedRecordIds.length).to.eql(0);",
"});",
" pm.test(testName, () => true);",
"}catch(e){",
" console.log(resobj);",
" pm.test(testName, () => {throw new Error(e.message)});",
"}",
""
],
"type": "text/javascript"
......@@ -742,13 +664,18 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {\r",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"\r",
"try{\r",
" pm.response.to.be.ok;\r",
" pm.response.to.be.withBody;\r",
" pm.response.to.be.json;\r",
"\r",
" pm.expect(pm.response.json().results).to.not.be.empty;\r",
"});\r",
" pm.test(testName, () => true);\r",
"}catch(e){\r",
" console.log(pm.response.json());\r",
" pm.test(testName, () => {throw new Error(e.message)});\r",
"}\r",
""
],
"type": "text/javascript"
......@@ -822,13 +749,18 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {\r",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"\r",
"try{\r",
" pm.response.to.be.ok;\r",
" pm.response.to.be.withBody;\r",
" pm.response.to.be.json;\r",
"\r",
" pm.expect(pm.response.json().results).to.not.be.empty;\r",
"});\r",
" pm.test(testName, () => true);\r",
"}catch(e){\r",
" console.log(pm.response.json());\r",
" pm.test(testName, () => {throw new Error(e.message)});\r",
"}\r",
""
],
"type": "text/javascript"
......@@ -891,13 +823,18 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {\r",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"\r",
"try{\r",
" pm.response.to.be.ok;\r",
" pm.response.to.be.withBody;\r",
" pm.response.to.be.json;\r",
"\r",
" pm.expect(pm.response.json().results).to.not.be.empty;\r",
"});\r",
" pm.test(testName, () => true);\r",
"}catch(e){\r",
" console.log(pm.response.json());\r",
" pm.test(testName, () => {throw new Error(e.message)});\r",
"}\r",
""
],
"type": "text/javascript"
......@@ -965,13 +902,18 @@
"listen": "test",
"script": {
"exec": [
"pm.test(pm.info.requestName + \" [\" + pm.variables.get('osduBaseUrl') + \"]\", function () {\r",
"const testName = pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\"\r",
"try{\r",
" pm.response.to.be.ok;\r",
" pm.response.to.be.withBody;\r",
" pm.response.to.be.json;\r",
"\r",
" pm.expect(pm.response.json().results).to.not.be.empty;\r",
"});\r",
" pm.test(testName, () => true);\r",
"}catch(e){\r",
" console.log(pm.response.json());\r",
" pm.test(testName, () => {throw new Error(e.message)});\r",
"}\r",
""
],
"type": "text/javascript"
......@@ -1033,6 +975,145 @@
"response": []
}
]
},
{
"name": "Schemas",
"item": [
{
"name": "schemas - get schemas DDMS",
"event": [
{
"listen": "test",
"script": {
"exec": [
"//Unlike the other tests, this one is not based on a try/catch for better error management\r",
"//the main request ( {{base_url}}/api/schema-service/v1/schema?source={{sourceKind}}&scope=SHARED ) will be run once\r",
"//based on this request the script will complete the URL to check all schemas listed in a json file\r",
"var getRequest = pm.request;\r",
"var url_variables = pm.request.url.toString()+\"&authority={{authorityKindName}}&entityType={{entityTypeName}}\";\r",
"var schemasVersionsListJson = JSON.parse(pm.environment.get(\"schemas_versions_list_json\")); //set as environment variable and not collection variable\r",
"var authoritiesCount = Object.keys(schemasVersionsListJson).length\r",
"\r",
"pm.test(pm.info.requestName + \" [\" + pm.environment.get('base_url') + \"]\", function () {\r",
"\r",
" for (let i = 0; i < authoritiesCount; i++) {\r",
" var currentAuthority = Object.keys(schemasVersionsListJson)[i];\r",
"\r",
" pm.test(\"Test on authority :\"+currentAuthority, function () {\r",
" pm.variables.set(\"authorityKindName\", currentAuthority);\r",
" var entitiesCount = Object.keys(schemasVersionsListJson[currentAuthority]).length\r",
"\r",
" for (let j = 0; j < entitiesCount; j++) {\r",
" var entityTypeName = Object.keys(schemasVersionsListJson[currentAuthority])[j];\r",
" pm.variables.set(\"entityTypeName\", entityTypeName);\r",
" getRequest.url = pm.variables.replaceIn(url_variables);\r",
"\r",
" pm.sendRequest(getRequest, (error, response) => {\r",
" console.log(\"Request : \" + getRequest.url.toString());\r",
" console.log(error ? error : response.json());\r",
"\r",
" pm.expect(response).to.be.ok;\r",
" pm.expect(response).to.be.json;\r",
"\r",
" const responseJson = response.json();\r",