wdms_app.py 10.5 KB
Newer Older
ethiraj krishnamanaidu's avatar
ethiraj krishnamanaidu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Kin Jin Ng's avatar
Kin Jin Ng committed
14
import sys
15
16
17
from os import getpid
import asyncio
from time import sleep
ethiraj krishnamanaidu's avatar
ethiraj krishnamanaidu committed
18

19
20
21
from fastapi import FastAPI, Depends
from fastapi.openapi.utils import get_openapi

Luc Yriarte's avatar
Luc Yriarte committed
22
from app import __version__, __build_number__, __app_name__
23
24
from app.auth.auth import require_opendes_authorized_user
from app.conf import Config, check_environment
Luc Yriarte's avatar
Luc Yriarte committed
25
from app.errors.exception_handlers import add_exception_handlers
26
from app.modules import discoverer
27

Luc Yriarte's avatar
Luc Yriarte committed
28
29
30
31
32
from app.helper import traces, logger
from app.injector.app_injector import AppInjector
from app.injector.main_injector import MainInjector
from app.middleware import CreateBasicContextMiddleware, TracingMiddleware
from app.middleware.basic_context_middleware import require_data_partition_id
33
from app.routers import probes, about, sessions
Luc Yriarte's avatar
Luc Yriarte committed
34
35
36
37
38
39
40
from app.routers.ddms_v2 import (
    ddms_v2,
    wellbore_ddms_v2,
    logset_ddms_v2,
    marker_ddms_v2,
    log_ddms_v2,
    well_ddms_v2
41
)
42
43
44
45
46
47
from app.routers.ddms_v3 import (
    wellbore_ddms_v3,
    well_ddms_v3,
    welllog_ddms_v3,
    wellbore_trajectory_ddms_v3,
    markerset_ddms_v3)
48
from app.routers.bulk import bulk_routes
Luc Yriarte's avatar
Luc Yriarte committed
49
50
from app.routers.trajectory import trajectory_ddms_v2
from app.routers.dipset import dipset_ddms_v2, dip_ddms_v2
fabian serin's avatar
fabian serin committed
51
from app.routers.search import search, fast_search, search_v3, fast_search_v3, search_v3_alpha
Luc Yriarte's avatar
Luc Yriarte committed
52
from app.clients import StorageRecordServiceClient, SearchServiceClient
53
54
55
56
from app.utils import (
    get_http_client_session,
    OpenApiHandler,
    get_wdms_temp_dir,
Jeremie Hallal's avatar
Jeremie Hallal committed
57
58
    run_in_pool_executor,
    DaskClient,
59
    POOL_EXECUTOR_MAX_WORKER)
Jeremie Hallal's avatar
Jeremie Hallal committed
60
61
62
63
64
65
66
67
68
from app.routers.bulk.utils import (
    update_operation_ids,
    set_v3_input_dataframe_check,
    set_legacy_input_dataframe_check,
)
from app.routers.bulk.bulk_uri_dependencies import (
    set_osdu_bulk_id_access,
    set_log_bulk_id_access
)
69

Luc Yriarte's avatar
Luc Yriarte committed
70
# Outer application: the startup/shutdown hooks are registered on it and the
# sub application below is mounted into it.
base_app = FastAPI()

# The sub application which contains all the routers
wdms_app = FastAPI(
    title=__app_name__,
    description='build ' + __build_number__,
    version=__version__,
)

# Injector shared by the startup/shutdown hooks and the basic context middleware.
app_injector = AppInjector()

# Expose the sub application under the configured OpenAPI prefix.
base_app.mount(Config.openapi_prefix.value, wdms_app)

82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

def custom_openapi(*args, **kwargs):
    """Return the OpenAPI schema of ``wdms_app``, generating it on first use.

    The freshly generated schema is handed to ``OpenApiHandler`` together with
    the operation ids of the routes that are included in the schema, and is
    then cached on ``wdms_app.openapi_schema`` so that later calls return it
    directly. Positional and keyword arguments are accepted but ignored.
    """
    if not wdms_app.openapi_schema:
        schema = get_openapi(
            title=wdms_app.title,
            version=wdms_app.version,
            description=wdms_app.description,
            routes=wdms_app.routes,
            servers=wdms_app.servers,
        )

        # A route without an `include_in_schema` attribute counts as visible;
        # a visible route without an `operation_id` contributes None.
        visible_operation_ids = [
            getattr(route, 'operation_id', None)
            for route in wdms_app.routes
            if getattr(route, 'include_in_schema', True)
        ]
        OpenApiHandler(schema, visible_operation_ids)

        wdms_app.openapi_schema = schema
    return wdms_app.openapi_schema


# Route the sub application's `openapi()` calls through the function above.
wdms_app.openapi = custom_openapi


def hide_router_modules(modules):
    """Flag every route of the given router modules as excluded from the schema.

    :param modules: iterable of objects exposing a ``router`` attribute whose
        ``routes`` can be iterated; ``include_in_schema`` is set to False on
        each of those routes.
    """
    for module in modules:
        routes = module.router.routes
        for route in routes:
            setattr(route, 'include_in_schema', False)


110
111
112
113
114
115
def executor_startup_task():
    """Dummy job used to get the executors started.

    Prints the id of the process running it, then sleeps for a short while.
    """
    pid = getpid()
    print(f"process {pid} started")
    # stay "busy" for a moment so the executor keeps this worker occupied
    sleep(0.2)


Luc Yriarte's avatar
Luc Yriarte committed
116
# Strong references to the fire-and-forget tasks scheduled at startup. The event
# loop itself only keeps weak references to tasks, so a task nobody else holds
# may be garbage collected before it completes.
_startup_background_tasks = set()


def _spawn_startup_task(coroutine):
    """Schedule `coroutine` as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coroutine)
    _startup_background_tasks.add(task)
    task.add_done_callback(_startup_background_tasks.discard)
    return task


@base_app.on_event("startup")
async def startup_event():
    """Initialise the service when the base application starts.

    In order: initialises the logger, checks the environment, configures the
    application injector, creates the trace exporter, schedules the creation
    of the Dask client, schedules one dummy task per pool executor worker,
    enables the alpha features when configured, and finally adds the routers
    of the discovered modules.
    """
    service_name = Config.service_name.value

    logger.init_logger(service_name=service_name)
    check_environment(Config)
    print('using temporary directory:', get_wdms_temp_dir())
    MainInjector().configure(app_injector)
    wdms_app.trace_exporter = traces.create_exporter(service_name=service_name)

    # seems that the lock is not in the same event loop as requests
    # so we need to wait instead of just fire a task
    # NOTE(review): despite the remark above, the creation is only scheduled
    # here and not awaited - confirm which behaviour is intended.
    _spawn_startup_task(DaskClient.create())

    # init executor pool
    logger.get_logger().info("Startup process pool executor")

    # force to adjust process count now instead of on first demand
    for _ in range(POOL_EXECUTOR_MAX_WORKER):
        _spawn_startup_task(run_in_pool_executor(executor_startup_task))

    if Config.alpha_feature_enabled.value:
        enable_alpha_feature()

    add_modules_routers()
Kin Jin Ng's avatar
Kin Jin Ng committed
141

142

Luc Yriarte's avatar
Luc Yriarte committed
143
@base_app.on_event('shutdown')
async def shutdown_event():
    """Release the network resources when the base application shuts down.

    Closes the API client of the storage and of the search service clients
    obtained from the injector (when the injector returns one), then the
    shared HTTP client session and the Dask client.
    """
    # clients close
    storage_client = await app_injector.get(StorageRecordServiceClient)
    if storage_client is not None:
        await storage_client.api_client.close()

    search_client = await app_injector.get(SearchServiceClient)
    if search_client is not None:
        # Close the search client itself: closing the storage client here again
        # would leave the search client open and fail when no storage client exists.
        await search_client.api_client.close()

    await get_http_client_session().close()
    await DaskClient.close()
156

Kin Jin Ng's avatar
Kin Jin Ng committed
157

158
159
160
# URL prefixes under which the API families are registered.
DDMS_V2_PATH = '/ddms/v2'
DDMS_V3_PATH = '/ddms/v3'
ALPHA_APIS_PREFIX = '/alpha'

# Dependencies attached to the protected routers: a data partition id and an
# authorized user are required; both are declared with `use_cache=False`.
basic_dependencies = [
    Depends(require_data_partition_id, use_cache=False),
    Depends(require_opendes_authorized_user, use_cache=False),
]
165
166

# Probes and "about" routers are registered without the basic dependencies.
wdms_app.include_router(probes.router)
wdms_app.include_router(about.router, tags=["Wellbore DDMS"])

# hidden from swagger but maintained for backward compatibility with /ddms/v2 APIs
wdms_app.include_router(about.router, prefix=DDMS_V2_PATH, tags=["Wellbore DDMS"], include_in_schema=False)
wdms_app.include_router(ddms_v2.router, prefix=DDMS_V2_PATH, tags=["Wellbore DDMS"], include_in_schema=False)
172

173
174
175
176
177
178
179
# (router module, OpenAPI tag) pairs of the v2 APIs, in registration order.
ddms_v2_routes_groups = [
    (well_ddms_v2, "Well"),
    (wellbore_ddms_v2, "Wellbore"),
    (logset_ddms_v2, "Logset"),
    (trajectory_ddms_v2, "Trajectory"),
    (marker_ddms_v2, "Marker"),
    (log_ddms_v2, "Log"),
    (dipset_ddms_v2, "Dipset"),
    (dip_ddms_v2, "Dips"),
]

# Every v2 router is registered under the v2 prefix behind the basic dependencies.
for v2_api, tag in ddms_v2_routes_groups:
    wdms_app.include_router(v2_api.router,
                            prefix=DDMS_V2_PATH,
                            tags=[tag],
                            dependencies=basic_dependencies)
Luc Yriarte's avatar
Luc Yriarte committed
188
189
190
191
192

# (router module, OpenAPI tag) pairs of the v3 APIs, in registration order.
ddms_v3_routes_groups = [
    (wellbore_ddms_v3, "Wellbore"),
    (well_ddms_v3, "Well"),
    (welllog_ddms_v3, "WellLog"),
    (wellbore_trajectory_ddms_v3, "Trajectory v3"),
    (markerset_ddms_v3, "Marker"),
]

# Every v3 router is registered under the v3 prefix behind the basic dependencies.
for v3_api, tag in ddms_v3_routes_groups:
    wdms_app.include_router(v3_api.router,
                            prefix=DDMS_V3_PATH,
                            tags=[tag],
                            dependencies=basic_dependencies)
202

203
204
# Search routers registered directly under '/ddms'.
wdms_app.include_router(search.router, prefix='/ddms', tags=['search'], dependencies=basic_dependencies)
wdms_app.include_router(fast_search.router, prefix='/ddms', tags=['fast-search'], dependencies=basic_dependencies)

# Search routers of the v3 APIs.
wdms_app.include_router(search_v3.router, prefix=DDMS_V3_PATH, tags=['search v3'], dependencies=basic_dependencies)
wdms_app.include_router(fast_search_v3.router, prefix=DDMS_V3_PATH, tags=['fast-search v3'],
                        dependencies=basic_dependencies)

# Alpha variant of the v3 search, registered under the alpha prefix.
wdms_app.include_router(search_v3_alpha.router, prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH,
                        tags=['ALPHA feature: search v3'],
                        dependencies=basic_dependencies)

212

213
alpha_tags = ['ALPHA feature: bulk data chunking']

# Bulk v3 routes add the v3 dataframe check and the OSDU bulk id access to the
# basic dependencies.
v3_bulk_dependencies = [*basic_dependencies, Depends(set_v3_input_dataframe_check), Depends(set_osdu_bulk_id_access)]

# The bulk v3 routers are registered twice: once under the alpha prefix, tagged
# as alpha and left out of the schema, and once under the plain v3 prefix, where
# the empty tag list makes each registration fall back to its entity tag.
for bulk_prefix, bulk_tags, is_visible in [(ALPHA_APIS_PREFIX + DDMS_V3_PATH, alpha_tags, False),
                                           (DDMS_V3_PATH, [], True)
                                           ]:
    # welllog bulk v3 APIs
    wdms_app.include_router(
        sessions.router,
        prefix=bulk_prefix + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
        tags=bulk_tags if bulk_tags else ["WellLog"],
        dependencies=basic_dependencies,
        include_in_schema=is_visible)

    wdms_app.include_router(
        bulk_routes.router,
        prefix=bulk_prefix + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
        tags=bulk_tags if bulk_tags else ["WellLog"],
        dependencies=v3_bulk_dependencies,
        include_in_schema=is_visible)

    # wellbore trajectory bulk v3 APIs
    wdms_app.include_router(
        sessions.router,
        prefix=bulk_prefix + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
        tags=bulk_tags if bulk_tags else ["Trajectory v3"],
        dependencies=basic_dependencies,
        include_in_schema=is_visible)

    wdms_app.include_router(
        bulk_routes.router,
        prefix=bulk_prefix + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
        tags=bulk_tags if bulk_tags else ["Trajectory v3"],
        dependencies=v3_bulk_dependencies,
        include_in_schema=is_visible)
248

249
250
251
# log bulk v2 APIs, registered under the alpha prefix with the alpha tags
wdms_app.include_router(
    sessions.router,
    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
    tags=alpha_tags,
    dependencies=basic_dependencies)
wdms_app.include_router(
    bulk_routes.router,
    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
    tags=alpha_tags,
    # the legacy dataframe check and the log bulk id access complete the basic dependencies
    dependencies=[*basic_dependencies, Depends(set_legacy_input_dataframe_check), Depends(set_log_bulk_id_access)])

# The same bulk routers are included several times above, which creates
# duplicate operation ids; this call must therefore come after those inclusions.
update_operation_ids(wdms_app)
263

264

265
# ------------- add alpha feature: ONLY MOUNTED IN DEV AND DA ENVs
Luc Yriarte's avatar
Luc Yriarte committed
266
267
def enable_alpha_feature():
    """Enable and activate the alpha features.

    Currently a placeholder that does nothing: the routers of alpha features
    are meant to be included from here.
    """
    # logger.get_logger().warning("Enabling alpha feature")
    # include alpha routers down below #
    pass
Luc Yriarte's avatar
Luc Yriarte committed
271
272


273
274
# order is last executed first: the middleware added last is the first to run,
# so CreateBasicContextMiddleware runs ahead of TracingMiddleware
wdms_app.add_middleware(TracingMiddleware)
wdms_app.add_middleware(CreateBasicContextMiddleware, injector=app_injector)

# adding exception handling
add_exception_handlers(wdms_app)
Kin Jin Ng's avatar
Kin Jin Ng committed
279
280


Kin Jin Ng's avatar
Kin Jin Ng committed
281
282
283
284
def remove_modules_routers():
    """Reset the routers held by the module discoverer.

    Only delegates to ``discoverer.reset_routers()``; routes already included
    in ``wdms_app`` are not touched by this function itself.
    """
    discoverer.reset_routers()


285
# Load and add router modules
286
def add_modules_routers():
    """Add every router returned by ``discoverer.get_routers()`` to the application."""
    for router in discoverer.get_routers():
        add_modules_router(router)
Kin Jin Ng's avatar
Kin Jin Ng committed
289

Kin Jin Ng's avatar
Kin Jin Ng committed
290

291
def add_modules_router(router):
    """Include one module router in ``wdms_app`` behind the basic dependencies.

    Adding a router is best effort: a failure is logged as a warning and is
    not propagated to the caller.

    :param router: router to include; its ``prefix`` is used as its name in
        the log messages.
    """
    log = logger.get_logger()
    name = router.prefix
    try:
        log.info(f'Adding router family `{name}`')
        # same partition id / authorized user requirements as the built-in routers
        wdms_app.include_router(router, dependencies=basic_dependencies)
        log.info(f'Done. `{name}` added')
    except ValueError as error:
        log.warning(f'Failed to add `{name}` router. {error}')
    except Exception as error:
        # Only regular errors are swallowed; KeyboardInterrupt and SystemExit
        # are left to propagate.
        log.warning(f'Failed to add `{name}` router. {type(error)}')