wdms_app.py 10.5 KB
Newer Older
ethiraj krishnamanaidu's avatar
ethiraj krishnamanaidu committed
1
2
3
4
5
6
7
8
9
10
11
12
13
# Copyright 2021 Schlumberger
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
Kin Jin Ng's avatar
Kin Jin Ng committed
14
import sys
15
16
17
from os import getpid
import asyncio
from time import sleep
ethiraj krishnamanaidu's avatar
ethiraj krishnamanaidu committed
18

19
20
21
from fastapi import FastAPI, Depends
from fastapi.openapi.utils import get_openapi

Luc Yriarte's avatar
Luc Yriarte committed
22
from app import __version__, __build_number__, __app_name__
23
from app import bulk_utils
24
25
from app.auth.auth import require_opendes_authorized_user
from app.conf import Config, check_environment
Luc Yriarte's avatar
Luc Yriarte committed
26
from app.errors.exception_handlers import add_exception_handlers
Kin Jin Ng's avatar
Kin Jin Ng committed
27
from app.extensions import discoverer
28

Luc Yriarte's avatar
Luc Yriarte committed
29
30
31
32
33
from app.helper import traces, logger
from app.injector.app_injector import AppInjector
from app.injector.main_injector import MainInjector
from app.middleware import CreateBasicContextMiddleware, TracingMiddleware
from app.middleware.basic_context_middleware import require_data_partition_id
Luc Yriarte's avatar
Luc Yriarte committed
34
from app.routers import probes, about, sessions
Luc Yriarte's avatar
Luc Yriarte committed
35
36
37
38
39
40
41
from app.routers.ddms_v2 import (
    ddms_v2,
    wellbore_ddms_v2,
    logset_ddms_v2,
    marker_ddms_v2,
    log_ddms_v2,
    well_ddms_v2
42
)
43
44
45
46
47
48
from app.routers.ddms_v3 import (
    wellbore_ddms_v3,
    well_ddms_v3,
    welllog_ddms_v3,
    wellbore_trajectory_ddms_v3,
    markerset_ddms_v3)
Luc Yriarte's avatar
Luc Yriarte committed
49
50
51
from app.routers.trajectory import trajectory_ddms_v2
from app.routers.dipset import dipset_ddms_v2, dip_ddms_v2
from app.routers.logrecognition import log_recognition
52
from app.routers.search import search, fast_search, search_v3, fast_search_v3
Luc Yriarte's avatar
Luc Yriarte committed
53
from app.clients import StorageRecordServiceClient, SearchServiceClient
54
55
56
57
58
59
from app.utils import (
    get_http_client_session,
    OpenApiHandler,
    get_wdms_temp_dir,
    get_pool_executor,
    POOL_EXECUTOR_MAX_WORKER)
60

Luc Yriarte's avatar
Luc Yriarte committed
61
base_app = FastAPI()
62

63
# The sub application which contains all the routers
64
65
66
67
68
69
70
wdms_app = FastAPI(title=__app_name__,
                   description='build ' + __build_number__,
                   version=__version__,
                   )

app_injector = AppInjector()

Luc Yriarte's avatar
Luc Yriarte committed
71
72
base_app.mount(Config.openapi_prefix.value, wdms_app)

73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

def custom_openapi(*args, **kwargs):
    if wdms_app.openapi_schema:
        return wdms_app.openapi_schema
    openapi_schema = get_openapi(
        title=wdms_app.title,
        version=wdms_app.version,
        description=wdms_app.description,
        routes=wdms_app.routes,
        servers=wdms_app.servers
    )

    routes_in_schemas = [route for route in wdms_app.routes if getattr(route, 'include_in_schema', True)]
    OpenApiHandler(openapi_schema, [getattr(route, 'operation_id', None) for route in routes_in_schemas])

    wdms_app.openapi_schema = openapi_schema
    return wdms_app.openapi_schema


wdms_app.openapi = custom_openapi


def hide_router_modules(modules):
    for mod in modules:
        for rte in mod.router.routes:
            rte.include_in_schema = False


101
102
103
104
105
106
def executor_startup_task():
    """ This is a dummy task used to startup executors"""
    print(f"process {getpid()} started")
    sleep(0.2)  # to keep executor "busy"


Luc Yriarte's avatar
Luc Yriarte committed
107
@base_app.on_event("startup")
108
async def startup_event():
109
110
111
    service_name = Config.service_name.value

    logger.init_logger(service_name=service_name)
112
113
114
    check_environment(Config)
    print('using temporary directory:', get_wdms_temp_dir())
    MainInjector().configure(app_injector)
115
    wdms_app.trace_exporter = traces.create_exporter(service_name=service_name)
116

117
118
119
120
121
122
123
124
125
    # init executor pool
    logger.get_logger().info("Startup process pool executor")

    # force to adjust process count now instead of on first demand
    pool = get_pool_executor()
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(pool, executor_startup_task) for _ in range(POOL_EXECUTOR_MAX_WORKER)]
    await asyncio.gather(*futures)

Luc Yriarte's avatar
Luc Yriarte committed
126
127
128
    if Config.alpha_feature_enabled.value:
        enable_alpha_feature()

Kin Jin Ng's avatar
Kin Jin Ng committed
129
130
    add_extension_routers()

131

Luc Yriarte's avatar
Luc Yriarte committed
132
@base_app.on_event('shutdown')
133
async def shutdown_event():
Luc Yriarte's avatar
Luc Yriarte committed
134
135
136
137
138
139
140
141
142
    # clients close
    storage_client = await app_injector.get(StorageRecordServiceClient)
    if storage_client is not None:
        await storage_client.api_client.close()

    search_client = await app_injector.get(SearchServiceClient)
    if search_client is not None:
        await storage_client.api_client.close()

143
144
    await get_http_client_session().close()

Kin Jin Ng's avatar
Kin Jin Ng committed
145

fabian serin's avatar
fabian serin committed
146
def update_operation_ids():
147
148
149
150
151
152
153
154
155
156
157
    # Ensure all operation_id are uniques
    from fastapi.routing import APIRoute
    for route in wdms_app.routes:
        if isinstance(route, APIRoute):
            if route.operation_id in bulk_utils.OPERATION_IDS.values():  # All route with possible duplicate
                new_operation_id = route.unique_id
                if route.operation_id in OpenApiHandler._handlers:
                    OpenApiHandler._handlers[new_operation_id] = OpenApiHandler._handlers.pop(route.operation_id)
                route.operation_id = route.unique_id


158
159
160
161
DDMS_V2_PATH = '/ddms/v2'
DDMS_V3_PATH = '/ddms/v3'
ALPHA_APIS_PREFIX = '/alpha'

162
163

wdms_app.include_router(probes.router)
164
wdms_app.include_router(about.router, prefix=DDMS_V2_PATH)
165
166
167
168
169
170
171
172
173

ddms_v2_routes_groups = [
    (ddms_v2, "Wellbore DDMS"),
    (well_ddms_v2, "Well"),
    (wellbore_ddms_v2, "Wellbore"),
    (logset_ddms_v2, "Logset"),
    (trajectory_ddms_v2, "Trajectory"),
    (marker_ddms_v2, "Marker"),
    (log_ddms_v2, "Log"),
Luc Yriarte's avatar
Luc Yriarte committed
174
175
    (dipset_ddms_v2, "Dipset"),
    (dip_ddms_v2, "Dips"),
176
177
178
]
for ddms_v2_routes_group in ddms_v2_routes_groups:
    wdms_app.include_router(ddms_v2_routes_group[0].router,
179
                            prefix=DDMS_V2_PATH,
180
181
182
                            tags=[ddms_v2_routes_group[1]],
                            dependencies=[
                                Depends(require_opendes_authorized_user, use_cache=False),
Luc Yriarte's avatar
Luc Yriarte committed
183
184
185
186
187
188
189
                                Depends(require_data_partition_id, use_cache=False)
                            ])

ddms_v3_routes_groups = [
    (wellbore_ddms_v3, "Wellbore"),
    (well_ddms_v3, "Well"),
    (welllog_ddms_v3, "WellLog"),
190
191
    (wellbore_trajectory_ddms_v3, "Trajectory"),
    (markerset_ddms_v3, "Marker"),
Luc Yriarte's avatar
Luc Yriarte committed
192
193
194
]
for ddms_v3_routes_group in ddms_v3_routes_groups:
    wdms_app.include_router(ddms_v3_routes_group[0].router,
195
                            prefix=DDMS_V3_PATH,
Luc Yriarte's avatar
Luc Yriarte committed
196
197
198
                            tags=[ddms_v3_routes_group[1]],
                            dependencies=[
                                Depends(require_opendes_authorized_user, use_cache=False),
199
200
201
                                Depends(require_data_partition_id, use_cache=False)
                            ])

Luc Yriarte's avatar
Luc Yriarte committed
202
wdms_app.include_router(search.router, prefix='/ddms', tags=['search'], dependencies=[
203
    Depends(require_data_partition_id, use_cache=False),
Luc Yriarte's avatar
Luc Yriarte committed
204
    Depends(require_opendes_authorized_user, use_cache=False)
205
])
Luc Yriarte's avatar
Luc Yriarte committed
206
wdms_app.include_router(fast_search.router, prefix='/ddms', tags=['fast-search'], dependencies=[
207
    Depends(require_data_partition_id, use_cache=False),
Luc Yriarte's avatar
Luc Yriarte committed
208
    Depends(require_opendes_authorized_user, use_cache=False)])
209

210
211
212
213
214
215
216
217
218
219
wdms_app.include_router(search_v3.router, prefix='/ddms/v3', tags=['search'], dependencies=[
    Depends(require_data_partition_id, use_cache=False),
    Depends(require_opendes_authorized_user, use_cache=False)
])

wdms_app.include_router(fast_search_v3.router, prefix='/ddms/v3', tags=['fast-search'], dependencies=[
    Depends(require_data_partition_id, use_cache=False),
    Depends(require_opendes_authorized_user, use_cache=False)
])

Luc Yriarte's avatar
Luc Yriarte committed
220
wdms_app.include_router(log_recognition.router, prefix='/log-recognition', tags=['log-recognition'], dependencies=[
221
    Depends(require_data_partition_id, use_cache=False),
Luc Yriarte's avatar
Luc Yriarte committed
222
    Depends(require_opendes_authorized_user, use_cache=False)])
223

Luc Yriarte's avatar
Luc Yriarte committed
224

225
226
227
228
dependencies = [Depends(require_data_partition_id, use_cache=False),
                Depends(require_opendes_authorized_user, use_cache=False)]


229
230
231
232
233
234
235
236
tags = ['ALPHA feature: bulk data chunking']

# welllog bulk v3 APIs
wdms_app.include_router(
    sessions.router,
    prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
    tags=tags, dependencies=dependencies)
wdms_app.include_router(
237
    bulk_utils.router_bulk,
238
239
240
241
242
243
244
245
246
    prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + welllog_ddms_v3.WELL_LOGS_API_BASE_PATH,
    tags=tags, dependencies=dependencies)

# wellbore trajectory bulk v3 APIs
wdms_app.include_router(
    sessions.router,
    prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
    tags=tags, dependencies=dependencies)
wdms_app.include_router(
247
    bulk_utils.router_bulk,
248
249
250
    prefix=ALPHA_APIS_PREFIX + DDMS_V3_PATH + wellbore_trajectory_ddms_v3.WELLBORE_TRAJECTORIES_API_BASE_PATH,
    tags=tags, dependencies=dependencies)

251
252
253
# log bulk v2 APIs
wdms_app.include_router(
    sessions.router,
254
    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
255
256
    tags=tags, dependencies=dependencies)
wdms_app.include_router(
257
258
    bulk_utils.router_bulk,
    prefix=ALPHA_APIS_PREFIX + DDMS_V2_PATH + log_ddms_v2.LOGS_API_BASE_PATH,
259
260
    tags=tags, dependencies=dependencies)

261
#The multiple instanciation of bulk_utils router create some duplicates operation_id
fabian serin's avatar
fabian serin committed
262
update_operation_ids()
263
264


265
# ------------- add alpha feature: ONLY MOUNTED IN DEV AND DA ENVs
Luc Yriarte's avatar
Luc Yriarte committed
266
267
268
269
def enable_alpha_feature():
    """ must be called to enable and activate alpha feature"""
    logger.get_logger().warning("Enabling alpha feature: chunking")

270
    # include alpha routers down below #
Luc Yriarte's avatar
Luc Yriarte committed
271
272


273
274
# order is last executed first
wdms_app.add_middleware(TracingMiddleware)
Luc Yriarte's avatar
Luc Yriarte committed
275
wdms_app.add_middleware(CreateBasicContextMiddleware, injector=app_injector)
276
277

# adding exception handling
Luc Yriarte's avatar
Luc Yriarte committed
278
add_exception_handlers(wdms_app)
Kin Jin Ng's avatar
Kin Jin Ng committed
279
280


Kin Jin Ng's avatar
Kin Jin Ng committed
281
282
283
284
285
# Load and add router extensions [alpha version]
def add_extension_routers():
    for router in discoverer.get_routers():
        add_extension_router(router)

Kin Jin Ng's avatar
Kin Jin Ng committed
286

Kin Jin Ng's avatar
Kin Jin Ng committed
287
288
289
def add_extension_router(router):
    log = logger.get_logger()
    name = router.prefix
Kin Jin Ng's avatar
Kin Jin Ng committed
290
    try:
Kin Jin Ng's avatar
Kin Jin Ng committed
291
292
293
294
        log.info(f'Adding router family `{name}`')
        wdms_app.include_router(router, dependencies=[Depends(require_data_partition_id, use_cache=False),
                                                      Depends(require_opendes_authorized_user, use_cache=False)])
        log.info(f'Done. `{name}` added')
Kin Jin Ng's avatar
Kin Jin Ng committed
295
    except ValueError as error:
Kin Jin Ng's avatar
Kin Jin Ng committed
296
        log.warning(f'Failed to add `{name}` router. {error}')
Kin Jin Ng's avatar
Kin Jin Ng committed
297
    except:
Kin Jin Ng's avatar
Kin Jin Ng committed
298
        log.warning(f'Failed to add `{name}` router. {sys.exc_info()[0]}')