Skip to content
Snippets Groups Projects
Commit a834d249 authored by Shane Hutchins's avatar Shane Hutchins
Browse files

Merge branch 'vl/fastapi-swagger-mdw' into 'master'

Add CSP security header for swagger page

See merge request !558
parents bd27100f ef99ea19
No related branches found
No related tags found
1 merge request!558Add CSP security header for swagger page
Pipeline #307116 passed
#!/usr/bin/env python3
# OSDU Policy Service
from fastapi import FastAPI, Request, Depends
from fastapi import FastAPI, Request, Depends, Response
# from fastapi.responses import RedirectResponse, HTMLResponse
# from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm, HTTPBearer
......@@ -38,8 +38,11 @@ from auth import auth
# from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
import correlation
import time
from collections import OrderedDict
from typing import Union
# import pdb; pdb.set_trace()
# TranslateItem.update_forward_refs()
......@@ -129,6 +132,79 @@ middleware = [
)
]
CSP: dict[str, Union[str, list[str]]] = {
"default-src": "'self'",
"img-src": [
"*",
# For SWAGGER UI
"data:",
],
"connect-src": "'self'",
"script-src": "'self'",
"style-src": ["'self'", "'unsafe-inline'"],
"script-src-elem": [
# For SWAGGER UI
"cdn.jsdelivr.net",
"'sha256-a5di3Fi8kPzq+7FX0kam9jjaIKEDTPWnO01nHt9p7SM='"
],
"style-src-elem": [
# For SWAGGER UI
"cdn.jsdelivr.net",
],
}
def parse_policy(policy: dict[str, Union[Union[str, list[str]]], str]) -> str:
"""Parse a given policy dict to string."""
if isinstance(policy, str):
# parse the string into a policy dict
policy_string = policy
policy = OrderedDict()
for policy_part in policy_string.split(";"):
policy_parts = policy_part.strip().split(" ")
policy[policy_parts[0]] = " ".join(policy_parts[1:])
policies = []
for section, content in policy.items():
if not isinstance(content, str):
content = " ".join(content)
policy_part = f"{section} {content}"
policies.append(policy_part)
parsed_policy = "; ".join(policies)
return parsed_policy
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""Add security headers to all responses."""
def __init__(self, app: FastAPI, csp: bool = True) -> None:
"""Init SecurityHeadersMiddleware.
:param app: FastAPI instance
:param no_csp: If no CSP should be used;
defaults to :py:obj:`False`
"""
super().__init__(app)
self.csp = csp
async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
"""Dispatch of the middleware.
:param request: Incoming request
:param call_next: Function to process the request
:return: Return response coming from from processed request
"""
headers = {
"Content-Security-Policy": "" if not self.csp else parse_policy(CSP),
}
response = await call_next(request)
response.headers.update(headers)
return response
"""
def policy_context_dependency(correlation_id: str = Header(None), user_agent: str = Header(None)):
# When used a Depends(), this fucntion get the `Correlation-ID` header,
......@@ -165,6 +241,8 @@ app = FastAPI(
openapi_tags=tags_metadata,
)
app.add_middleware(SecurityHeadersMiddleware, csp=True)
# @app.on_event("startup")
# async def startup_event():
# logger = logging.getLogger("uvicorn.access")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment