Skip to content
Snippets Groups Projects
Commit 4c60e0e1 authored by Asbjørn L. Johansen's avatar Asbjørn L. Johansen
Browse files

Default behaviour is now to verify certificates unless an environment variable is set

parent 15223583
No related branches found
No related tags found
1 merge request!161Fix/ssl certificate validation
Pipeline #304437 passed
......@@ -115,7 +115,8 @@ You can configure this via the `GC_ID_TOKEN` environment variable. Here's how to
### 0.28
From this release verification of SSL certificates is enabled by default. If your environment is using self-signed certificates or a private CA this can be disabled for specific hosts or IP ranges.
From this release verification of SSL certificates is enabled by default. If your environment is using self-signed certificates or a private CA this can be disabled by setting the environment variable
`OSDU_API_DISABLE_SSL_VERIFICAITON` to **iknowthemaninthemiddle**.
### 0.17
......
......@@ -15,12 +15,10 @@
# limitations under the License.
import configparser
import importlib
from ipaddress import ip_address, ip_network
import logging
import os
import requests
from typing import Any, Optional
from urllib3.util import parse_url
import tenacity
......@@ -45,14 +43,15 @@ class BaseClient:
data_partition_id: Optional[str] = None,
token_refresher: Optional[TokenRefresher] = None,
logger: Optional[Any] = None,
user_id: Optional[str] = None,
disable_ssl_verify_hosts: Optional[list[str]] = None
user_id: Optional[str] = None
):
"""
Base client gets initialized with configuration values and a bearer token
based on provider-specific logic
"""
self._parse_config(config_manager, provider, data_partition_id, disable_ssl_verify_hosts)
self._ssl_verify = True
self._parse_config(config_manager, provider, data_partition_id)
self.user_id = user_id
self.unauth_retries = 0
......@@ -67,8 +66,7 @@ class BaseClient:
self,
config_manager: Optional[BaseConfigManager] = None,
provider: Optional[str] = None,
data_partition_id: Optional[str] = None,
disable_ssl_verify_hosts: Optional[list[str]] = None
data_partition_id: Optional[str] = None
):
"""
Parse config.
......@@ -76,6 +74,9 @@ class BaseClient:
:param config_manager: ConfigManager to get configs, defaults to None
:type config_manager: BaseConfigManager, optional
"""
if os.getenv('OSDU_API_DISABLE_SSL_VERIFICAITON') == 'iknowthemaninthemiddle':
self._ssl_verify = False
self.config_manager = config_manager or DefaultConfigManager()
# self.use_service_principal is used by AWS only, so other CSPs can set this attribute to False in any way.
......@@ -89,18 +90,6 @@ class BaseClient:
except configparser.Error:
self.use_service_principal = False
try:
disable_ssl_host_str = self.config_manager.get('client', 'disable_ssl_verification')
disable_ssl_host_list = disable_ssl_host_str.split('-')
if disable_ssl_verify_hosts is None:
self.disable_ssl_verification_hosts = disable_ssl_host_list
else:
self.disable_ssl_verification_hosts = disable_ssl_verify_hosts + disable_ssl_host_list
except configparser.Error:
self.disable_ssl_verification_hosts = disable_ssl_verify_hosts
if data_partition_id is None:
self.data_partition_id = self.config_manager.get(
'environment', 'data_partition_id')
......@@ -135,20 +124,18 @@ class BaseClient:
:return: response from OSDU service
:rtype: requests.Response
"""
print(self._ssl_verify(url))
if method == HttpMethod.GET:
response = requests.get(url=url, params=params, headers=headers,
verify=self._ssl_verify(url))
verify=self._ssl_verify)
elif method == HttpMethod.DELETE:
response = requests.delete(url=url, params=params, headers=headers,
verify=self._ssl_verify(url))
verify=self._ssl_verify)
elif method == HttpMethod.POST:
response = requests.post(url=url, params=params, data=data, headers=headers,
verify=self._ssl_verify(url))
verify=self._ssl_verify)
elif method == HttpMethod.PUT:
response = requests.put(url=url, params=params, data=data, headers=headers,
verify=self._ssl_verify(url))
verify=self._ssl_verify)
return response
def _send_request_with_principle_token(
......@@ -232,37 +219,6 @@ class BaseClient:
) -> requests.Response:
return self._send_request(method, url, data, headers, params)
def _ssl_verify(self, url: str) -> bool:
"""
Check if SSL certificate verification is disabled for a given URL.
:param url: URL to check
:type url: str
"""
if not self.disable_ssl_verification_hosts:
return True
parsed_url = parse_url(url)
hostname = parsed_url.hostname
try:
hostip = ip_address(hostname)
except ValueError:
hostip = None
for host in self.disable_ssl_verification_hosts:
if hostip:
try:
disable = hostip in ip_network(host)
except ValueError:
continue
else:
disable = host == hostname or (host.startswith('*') and hostname.endswith(host.lstrip('*')))
if disable: return False
return True
def make_request(
self,
method: HttpMethod,
......
......@@ -34,7 +34,4 @@ aws_oauth_custom_scope_ssm_path=/osdu/blah/oauth-custom-scope
client_id_ssm_path=/osdu/blah/client-credentials-client-id
client_secret_name=/osdu/blah/client_credentials_secret
client_secret_dict_key=client_credentials_client_secret
region_name=blah
[client]
disable_ssl_verification=trusted-10.0.0.0/24-*.trusted.domain-10.0.1.4
\ No newline at end of file
region_name=blah
\ No newline at end of file
......@@ -199,52 +199,3 @@ class TestBaseClient(unittest.TestCase):
token_refresher.refresh_token.assert_called_once()
self.assertEqual(client._send_request.call_count, 2)
self.assertEqual(response.status_code, http_status_code)
def test_ssl_verification_disabled_for_trusted_hosts(self):
target_hosts = ["10.0.0.5", "trusted", "service.trusted.domain", "10.0.1.4"]
for target_host in target_hosts:
with self.subTest(target_host=target_host):
# Arrange
config_manager = DefaultConfigManager(os.getcwd() + '/osdu_api/test/osdu_api.ini')
config_manager._parsed_config._sections["environment"]["use_service_principal"] = "False"
client = BaseClient(config_manager=config_manager, data_partition_id="opendes")
# Act
verify_ssl = client._ssl_verify(f'https://{target_host}')
# Assert
self.assertFalse(verify_ssl)
def test_ssl_verification_not_disabled_untrusted_hosts(self):
target_hosts = ["1.1.1.1", "untrusted", "service.untrusted.domain"]
for target_host in target_hosts:
with self.subTest(target_host=target_host):
# Arrange
config_manager = DefaultConfigManager(os.getcwd() + '/osdu_api/test/osdu_api.ini')
config_manager._parsed_config._sections["environment"]["use_service_principal"] = "False"
client = BaseClient(config_manager=config_manager, data_partition_id="opendes")
# Act
verify_ssl = client._ssl_verify(f'https://{target_host}')
# Assert
self.assertTrue(verify_ssl)
def test_ssl_verification_not_disabled_without_config(self):
target_hosts = ["10.0.0.5", "trusted", "service.trusted.domain", "10.0.1.4"]
for target_host in target_hosts:
with self.subTest(target_host=target_host):
# Arrange
config_manager = DefaultConfigManager(os.getcwd() + '/osdu_api/test/osdu_api.ini')
config_manager._parsed_config._sections["environment"]["use_service_principal"] = "False"
config_manager._parsed_config.remove_option("client", "disable_ssl_verification")
client = BaseClient(config_manager=config_manager, data_partition_id="opendes")
# Act
verify_ssl = client._ssl_verify(f'https://{target_host}')
# Assert
self.assertTrue(verify_ssl)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment