Commit 9b2ce875 authored by Yannick's avatar Yannick
Browse files

Merge branch 'feature-backoff' into 'master'

add backoff on search and storage client

See merge request !67
parents a88b690b f371789b
Pipeline #35897 failed with stages
in 13 minutes and 9 seconds
......@@ -27,7 +27,7 @@ __all__ = ['SearchServiceClient',
from app.clients.clients_middleware import client_middleware
from app.clients.clients_middleware import client_middleware, backoff_middleware
SearchServiceClient = AsyncSearchApi
StorageRecordServiceClient = AsyncRecordsApi
......@@ -49,6 +49,7 @@ def make_search_client(host) -> SearchServiceClient:
max_keepalive_connections=Config.de_client_config_max_keepalive.value or None)
return odes_search.AsyncApis(search_client).search_api
......@@ -61,4 +62,5 @@ def make_storage_record_client(host) -> StorageRecordServiceClient:
max_keepalive_connections=Config.de_client_config_max_keepalive.value or None)
return odes_storage.AsyncApis(storage_client).records_api
import backoff
from app.conf import Config
from httpx import (
TimeoutException, # => ReadTimeout, WriteTimeout, ConnectTimeout, PoolTimeout
def backoff_policy(on_backoff_handlers=None):
return backoff.on_exception(backoff.expo,
(RemoteProtocolError, TimeoutException),
......@@ -17,6 +17,9 @@ from opencensus.trace.span import SpanKind
from app import conf
from app.utils import Context
from app.helper import utils, traces
from .backoff_policy import backoff_policy
from sys import exc_info
from traceback import format_exception
def _before_tracing_attributes(ctx, request):
......@@ -40,6 +43,19 @@ def _before_tracing_attributes(ctx, request):
def backoff_handler_log_it(details):
ctx = Context.current()
exception_type, raised_exec, tb = exc_info()
s_stack = format_exception(exception_type, raised_exec, tb)
ctx.logger.exception(f"Backoff callback, tries={details['tries']}: {raised_exec}. Stack = {s_stack}")
async def backoff_middleware(request, call_next):
return await call_next(request)
async def client_middleware(request, call_next):
ctx = Context.current()
......@@ -132,6 +132,21 @@ class ConfigurationContainer:
factory=lambda x: int(x))
de_client_backoff_max_tries: EnvVar = EnvVar(
description="""The maximum number of attempts to make before giving
up. Once exhausted, the exception will be allowed to escape.
The default value of None means their is no limit to the
number of tries.""",
factory=lambda x: int(x))
de_client_backoff_max_wait: EnvVar = EnvVar(
description="""The maximum wait in second between retry. """,
factory=lambda x: int(x))
build_details: EnvVar = EnvVar(
description='contains optional extra information of the build, format is the multiple "key=value" separated'
......@@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from app.bulk_persistence import create_and_store_dataframe
import pandas as pd
from app.bulk_persistence import get_dataframe
from opencensus.trace.span import SpanKind
from odes_storage.models import Record
from app.utils import Context
from app.bulk_persistence import create_and_store_dataframe
from app.bulk_persistence import get_dataframe
from app.utils import Context
from app.model.log_bulk import LogBulkHelper
......@@ -38,4 +39,6 @@ class Persistence:
async def write_bulk(cls, ctx: Context, dataframe) -> str:
return await create_and_store_dataframe(ctx, dataframe)
with ctx.tracer.span(name=f'write bulk') as span:
span.span_kind = SpanKind.CLIENT
return await create_and_store_dataframe(ctx, dataframe)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment