ÿØÿà JFIF ÿÛ „ ( %"1"%)+...383,7(-.-
![]() Server : Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.20 System : Linux st2.domain.com 3.10.0-1127.10.1.el7.x86_64 #1 SMP Wed Jun 3 14:28:03 UTC 2020 x86_64 User : apache ( 48) PHP Version : 7.4.20 Disable Function : NONE Directory : /proc/self/cwd/usr/share/mysqlsh/oci_sdk/oci/auth/signers/ |
# coding: utf-8
# Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.

import json
import threading

import oci
from oci._vendor import requests

from .instance_principals_security_token_signer import InstancePrincipalsSecurityTokenSigner
from .security_token_signer import SecurityTokenSigner
from ..security_token_container import SecurityTokenContainer
from ..session_key_supplier import SessionKeySupplier
from .. import auth_utils

from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat


class ResourcePrincipalsFederationSigner(SecurityTokenSigner):
    """
    Signer that authenticates as a resource principal.

    It uses an instance principals signer to call a service endpoint for a Resource Principal Token (RPT)
    and a Service Principal Session Token (SPST), then exchanges those two tokens, together with a session
    public key, for a Resource Principal Session Token (RPST) that is used to sign requests.
    """

    def __init__(self, resource_principal_token_endpoint=None, resource_principal_session_token_endpoint=None, retry_strategy=None, log_requests=None):
        """
        :param resource_principal_token_endpoint: The endpoint that can provide the resource principal token.
            This is a service endpoint. Required.
        :param resource_principal_session_token_endpoint: The endpoint that can provide the resource principal
            session token. This will default to the auth federation endpoint if not provided.
        :param retry_strategy: The retry strategy used for the token calls. Defaults to
            ``oci.retry.DEFAULT_RETRY_STRATEGY`` when not provided.
        :param log_requests: When truthy, it is passed to the base client as the ``log_requests`` config value.

        :raises ValueError: if ``resource_principal_token_endpoint`` is not provided.
        :raises RuntimeError: if the metadata service returns an empty instance id.
        """
        if not resource_principal_token_endpoint:
            raise ValueError("resource_principal_token_endpoint must be provided")

        self._reset_signers_lock = threading.Lock()
        self.resource_principal_token_endpoint = resource_principal_token_endpoint

        self.instance_principal_signer = InstancePrincipalsSecurityTokenSigner()
        self.session_key_supplier = SessionKeySupplier()
        self.region = self.instance_principal_signer.initialize_and_return_region()
        self.tenancy_id = self.instance_principal_signer.tenancy_id

        self.resource_principal_session_token_endpoint = (
            resource_principal_session_token_endpoint or oci.regions.endpoint_for('auth', self.region)
        )
        self.retry_strategy = retry_strategy or oci.retry.DEFAULT_RETRY_STRATEGY

        # Get the instance id from the metadata service.
        self.INSTANCE_ID_ENDPOINT = '{}/instance/id'.format(self.instance_principal_signer.METADATA_URL_BASE)
        # Set the connect timeout to 10 seconds and the read timeout to 60 seconds.
        timeout = (10, 60)
        response = requests.get(self.INSTANCE_ID_ENDPOINT, timeout=timeout)
        # Fail fast on an HTTP error instead of using an error body as the instance id.
        response.raise_for_status()
        self.instance_id = response.text.strip().lower()
        if not self.instance_id:
            raise RuntimeError(
                "Unable to retrieve the instance id from {}".format(self.INSTANCE_ID_ENDPOINT))

        # Holders for the tokens needed.
        self.rpt = None
        self.spst = None

        # Set up the base_client for calls to the service to get the Resource Principal Token
        # and Service Principal Session Token.
        # The config is not needed when using the instance principals signer, but request logging could be enabled.
        config = {}
        if log_requests:
            config["log_requests"] = log_requests
        self.base_client = oci.BaseClient("",  # No service
                                          config,
                                          self.instance_principal_signer,
                                          {},  # No type mapping
                                          region_client=False,
                                          service_endpoint=self.resource_principal_token_endpoint)

        # Get the Resource Principal Session Token and use it to set up the signer
        self.rpst = self.get_security_token()
        super(ResourcePrincipalsFederationSigner, self).__init__(self.rpst, self.session_key_supplier.get_key_pair()['private'])

    def get_security_token(self):
        """
        Return the cached security token if one exists and ``valid_with_jitter()`` reports it as valid;
        otherwise refresh and return the new token.
        """
        # security_token only exists after the first successful refresh, hence the hasattr check.
        if hasattr(self, 'security_token') and self.security_token.valid_with_jitter():
            return self.security_token.security_token
        return self._refresh_security_token_inner()

    def refresh_security_token(self):
        """
        Unconditionally refresh the security token and return the new value.
        """
        return self._refresh_security_token_inner()

    def _refresh_security_token_inner(self):
        """
        Refresh the session key, the instance principal token and both resource principal tokens,
        store the resulting token container on ``self.security_token`` and return the token.

        The whole sequence runs while holding ``_reset_signers_lock``.
        """
        with self._reset_signers_lock:
            self.session_key_supplier.refresh()
            self.instance_principal_signer.refresh_security_token()

            # Get RPT blob, Service Principal Session Token from service, Steps A.1 and B.1
            self.rpt, self.spst = self._get_resource_principal_token_and_service_principal_session_token()

            # Get RPST token from identity, steps A.2 and B.2
            self.security_token = SecurityTokenContainer(self.session_key_supplier, self._get_resource_principal_session_token())

            return self.security_token.security_token

    def _get_resource_principal_token_and_service_principal_session_token(self):
        """
        Get the Resource Principal Token and the Service Principal Session Token
        This makes a call to the resource_principal_token_endpoint which is defined by the service.

        :return: A ``(resourcePrincipalToken, servicePrincipalSessionToken)`` tuple taken from the JSON response.
        """
        method = "get"
        resource_path = "/20180711/resourcePrincipalToken/{}".format(self.instance_id)
        # The same base client is shared by both token calls, so point it at the right endpoint first.
        self.base_client.endpoint = self.resource_principal_token_endpoint

        response = self.make_call(method, resource_path)
        parsed_response = json.loads(response.data.decode('UTF-8'))
        return parsed_response['resourcePrincipalToken'], parsed_response['servicePrincipalSessionToken']

    def _get_resource_principal_session_token(self):
        """
        Get the Resource Principal Session Token

        Posts the previously fetched ``self.rpt`` and ``self.spst`` along with the current session public key
        to the resource_principal_session_token_endpoint.

        :return: The value of ``token`` in the JSON response.
        """
        method = "post"
        resource_path = "/v1/resourcePrincipalSessionToken"
        # The same base client is shared by both token calls, so point it at the right endpoint first.
        self.base_client.endpoint = self.resource_principal_session_token_endpoint

        public_key = self.session_key_supplier.get_key_pair()['public']
        sanitized_public_key = auth_utils.sanitize_certificate_string(public_key.public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo))

        request_payload = {
            'resourcePrincipalToken': self.rpt,
            'servicePrincipalSessionToken': self.spst,
            'sessionPublicKey': sanitized_public_key
        }

        # The base client will convert the payload to JSON, but won't update the content length, so we need to
        # set it here.
        json_request_payload = json.dumps(request_payload)
        header_params = {'content-type': 'application/json',
                         'Content-Length': str(len(json_request_payload))}

        response = self.make_call(method, resource_path, header_params=header_params, body=request_payload)
        parsed_response = json.loads(response.data.decode('UTF-8'))
        return parsed_response['token']

    def make_call(self, method, resource_path, path_params=None, header_params=None, body=None):
        """
        make_call
        Normally this would be part of the generated client. In this case the endpoint for the Resource
        Principal Token is not part of the generated client, so we need the same behavior here.

        :param method: The HTTP method name passed to the base client.
        :param resource_path: The path of the resource on the currently configured endpoint.
        :param path_params: Optional path parameters passed through to the base client.
        :param header_params: Optional header parameters passed through to the base client.
        :param body: Optional request body passed through to the base client.
        :return: The response from ``base_client.call_api``, requested as a bytes response.
        """
        call_kwargs = {
            'resource_path': resource_path,
            'method': method,
            'path_params': path_params,
            'header_params': header_params,
            'body': body,
            'response_type': oci.base_client.BYTES_RESPONSE_TYPE
        }
        if self.retry_strategy:
            return self.retry_strategy.make_retrying_call(self.base_client.call_api, **call_kwargs)
        return self.base_client.call_api(**call_kwargs)