Shortcuts

Source code for geodesic.account.credentials

import base64
from typing import Union, Any
from getpass import getpass
from geodesic.bases import _APIObject
import json
import requests
from geodesic import raise_on_error
from geodesic.service import ServiceClient
from geodesic.descriptors import _StringDescr

# Module-level client for the credentials API. The lookup helpers in this
# module call it directly and each Credential instance stores it as `_client`.
# NOTE(review): the arguments look like (service name, API version, path) —
# confirm against ServiceClient.
credentials_client = ServiceClient("krampus", 1, "credentials")

# Every credential type accepted by `Credential.type`, in declaration order.
valid_types = [
    "SERVICE_ACCOUNT_KEY",
    "AWS_KEY_PAIR",
    "AZURE_ACCESS_KEY",
    "JWT",
    "OAUTH2_CLIENT_CREDENTIALS",
    "OAUTH2_REFRESH_TOKEN",
    "BASIC_AUTH",
    "API_KEY",
    "DOCKER_PULL",
]

# One module-level constant per type, each equal to its own name. Unpacking
# from the list keeps the constants and `valid_types` from drifting apart:
# a count mismatch fails loudly at import time.
(
    SERVICE_ACCOUNT_KEY,
    AWS_KEY_PAIR,
    AZURE_ACCESS_KEY,
    JWT,
    OAUTH2_CLIENT_CREDENTIALS,
    OAUTH2_REFRESH_TOKEN,
    BASIC_AUTH,
    API_KEY,
    DOCKER_PULL,
) = valid_types


def get_credential(name_or_uid: str = None):
    """Look up a single credential by name or UID.

    Only the uid/name/type of the credential are returned; the secret data
    is not.

    Args:
        name_or_uid: Name or UID of the credential to access

    Returns:
        A ``Credential`` built from the ``"credential"`` entry of the
        response, or ``None`` when that entry is null (the credential
        doesn't exist).
    """
    response = raise_on_error(credentials_client.get(name_or_uid))
    found = response.json()["credential"]
    return Credential(**found) if found is not None else None
def get_credentials():
    """List all of your user's credentials.

    Returns:
        A list with one ``Credential`` per entry under ``"credentials"`` in
        the response.
    """
    response = raise_on_error(credentials_client.get(""))
    entries = response.json()["credentials"]
    return [Credential(**entry) for entry in entries]
class Credential(_APIObject):
    """Credentials to access secure resources such as a cloud storage bucket.

    Credentials have a name, type and data. Credentials can be created or
    deleted but not accessed again except by internal services. This is for
    security reasons. Credentials are stored using symmetric PGP encryption
    at rest.
    """

    uid = _StringDescr(doc="the unique ID for this credential. Set automatically")
    name = _StringDescr(
        doc="the name of this credential. Unique to the user and how a user will typically reference it"
    )
    type = _StringDescr(
        one_of=valid_types,
        doc=f"the type of the credential. Supported types are {', '.join(valid_types)}",
    )

    _limit_setitem = ["name", "type", "data"]

    def __init__(self, **credential):
        """Build a Credential from keyword fields.

        Arguments:
            **credential: attribute values (e.g. ``name``, ``type``,
                ``data``, ``uid``); each one is applied with ``setattr``.
        """
        self._name = None
        self._type = None
        self.__data = bytes()
        self._client = credentials_client
        for k, v in credential.items():
            setattr(self, k, v)

    def create(self, ignore_if_exists=True):
        """Creates a new Credentials. Encodes the data to be sent.

        The secret data is base64-encoded before it is posted: ``bytes`` are
        encoded as-is, a ``dict`` is serialized to JSON first and a ``str``
        is encoded to bytes first.

        Arguments:
            ignore_if_exists: when the create request fails but a credential
                with this name can be looked up, return silently instead of
                raising.

        Raises:
            TypeError: if ``data`` is not ``bytes``, ``dict`` or ``str``.
            requests.HTTPError: if the create request fails and either the
                credential does not already exist, the lookup itself fails,
                or ``ignore_if_exists`` is False.
        """
        data = self.__data
        if isinstance(data, bytes):
            raw = data
        elif isinstance(data, dict):
            raw = json.dumps(data).encode()
        elif isinstance(data, str):
            raw = data.encode()
        else:
            # Previously this fell through and failed later with an opaque
            # UnboundLocalError on `enc_data`.
            raise TypeError(
                f"credential data must be bytes, dict or str, not {type(data).__name__}"
            )
        enc_data = base64.b64encode(raw).decode()

        try:
            raise_on_error(self._client.post("", name=self.name, type=self.type, data=enc_data))
        except requests.HTTPError as e:
            try:
                existing = get_credential(self.name)
            except requests.HTTPError:
                # The lookup failed too; surface the original create error.
                raise e
            # get_credential returns None for a missing credential, so only
            # treat the failure as "already exists" when something came back.
            if ignore_if_exists and existing is not None:
                return
            raise e

    def delete(self):
        """Delete the credential with this name from the backend.

        Raises whatever ``raise_on_error`` raises for a failed response.
        """
        raise_on_error(self._client.delete(self.name))

    @property
    def data(self):
        """The locally held secret payload that ``create`` encodes and sends."""
        return self.__data

    @data.setter
    def data(self, v: Any):
        self.__data = v

    def __setattr__(self, name: str, value: Any) -> None:
        """Keep the private payload in sync whenever ``data`` is assigned."""
        if name == "data":
            self.__data = value
        # NOTE(review): the published listing is whitespace-flattened, so it
        # is ambiguous whether the base-class call below also runs for
        # "data"; this keeps it running — confirm against the repository.
        return super().__setattr__(name, value)

    @staticmethod
    def from_docker_registry(
        *, name: str, username: str, host: str, email: str, password: str = None
    ) -> "Credential":
        """creates a new Docker Pull Credential for a Docker Registry

        Creates a Credential object that allows internal resources to pull a
        container image from a registry. Most registries support this method
        of pulling images and specifically this is how this is done in
        Kubernetes in most cases.

        Details for GCP/GCR:
        https://cloud.google.com/container-registry/docs/advanced-authentication#json-key

        Arguments:
            name: the name of the Credential to create
            username: the username (varies by registry provider, username
                for Dockerhub, "_json_key" for GCR, etc)
            host: the hostname for the registry (e.g. https://docker.io)
            email: the email address associated with this account
            password: the password or token for this account. If `None`,
                user will be prompted via stdin.

        Returns:
            a new Credential object that can be saved to the backend.
        """
        if password is None:
            password = getpass(prompt="enter password or appropriately formatted key: ")

        return Credential(
            name=name,
            type=DOCKER_PULL,
            data={
                "username": username,
                "host": host,
                "email": email,
                "password": password,
            },
        )

    @staticmethod
    def from_gcp_service_account(*, name: str, key: Union[str, dict] = None) -> "Credential":
        """creates new GCP Service Account Credential

        Creates a new Credential object for a GCP Service Account (e.g.
        Google Earth Engine, BigQuery, etc)

        Arguments:
            name: the name of the Credential to create
            key: the full service account, either a string or dict. If
                `None`, user will be prompted via stdin. A string is parsed
                as JSON.

        Returns:
            a new Credential object that can be saved to backend
        """
        if key is None:
            key = getpass(prompt="Paste Complete Google Cloud Service Account Key: ")

        if isinstance(key, str):
            key = json.loads(key)

        return Credential(name=name, type=SERVICE_ACCOUNT_KEY, data=key)

    @staticmethod
    def from_aws_key_pair(
        *, name: str, aws_access_key_id: str, aws_secret_access_key: str = None
    ) -> "Credential":
        """creates new AWS Key Pair Credential

        Creates a new Credential object for an AWS Key Pair (such as from an
        IAM User)

        Arguments:
            name: the name of the Credential to create
            aws_access_key_id: the access key id
            aws_secret_access_key: the secret key. If `None`, user will be
                prompted via stdin.

        Returns:
            a new Credential object that can be saved to backend
        """
        if aws_secret_access_key is None:
            aws_secret_access_key = getpass(prompt="AWS Secret Access Key: ")

        return Credential(
            name=name,
            type=AWS_KEY_PAIR,
            data={
                "aws_access_key_id": aws_access_key_id,
                "aws_secret_access_key": aws_secret_access_key,
            },
        )

    @staticmethod
    def from_azure_storage_account(
        *, name: str, account_name: str, account_key: str = None
    ) -> "Credential":
        """creates new Azure Storage Account Credential

        Creates a new Credential object for an Azure Storage Account (e.g.
        Blob storage)

        Arguments:
            name: the name of the Credential to create
            account_name: the Azure account name
            account_key: the secret key for the account. If `None`, user
                will be prompted via stdin.

        Returns:
            a new Credential object that can be saved to backend
        """
        if account_key is None:
            account_key = getpass(prompt="Azure Storage Account Key: ")

        return Credential(
            name=name,
            type=AZURE_ACCESS_KEY,
            data={"account_name": account_name, "account_key": account_key},
        )

    @staticmethod
    def from_azure_connection_string(*, name: str, connection_string: str = None) -> "Credential":
        """creates new Azure Storage Account Credential

        Creates a new Credential object for an Azure Storage Account (e.g.
        Blob storage)

        Arguments:
            name: the name of the Credential to create
            connection_string: the Azure account's connection string. If
                `None`, user will be prompted via stdin.

        Returns:
            a new Credential object that can be saved to backend
        """
        if connection_string is None:
            connection_string = getpass(prompt="Azure Storage Account Connection String: ")

        return Credential(
            name=name,
            type=AZURE_ACCESS_KEY,
            data={"connection_string": connection_string},
        )

    @staticmethod
    def from_jwt(*, name: str, jwt: str = None) -> "Credential":
        """creates new JSON Web Token Credential

        Creates a new Credential object for an arbitrary JWT

        Arguments:
            name: the name of the Credential to create
            jwt: the string/encoded JWT. If `None`, user will be prompted
                via stdin.

        Returns:
            a new Credential object that can be saved to backend
        """
        if jwt is None:
            jwt = getpass(prompt="Paste Complete JSON Web Token: ")

        return Credential(name=name, type=JWT, data=jwt)

    @staticmethod
    def from_oauth2_client_credentials(
        *,
        name: str,
        client_id: str,
        client_secret: str = None,
        token_url: str = None,
        authorization_url: str = None,
        audience: str = None,
        scope: str = None,
    ) -> "Credential":
        """creates new OAuth2 Client Credentials Credential

        Creates a new Credential object for an OAuth2 Application

        Arguments:
            name: the name of the Credential to create
            client_id: the client_id of the oauth2 app
            client_secret: the client secret of the oauth2 app. If `None`,
                user will be prompted via stdin.
            token_url: the token url/uri to request an access token
            authorization_url: the authorization url for certain auth flows
            audience: (optional) the audience of the access_token
            scope: (optional) custom scope to be requested with the token

        Returns:
            a new Credential object that can be saved to backend

        Raises:
            ValueError: if `token_url` or `authorization_url` is `None`.
        """
        # Validate before prompting so the user is not asked for a secret
        # that would be thrown away.
        if token_url is None:
            raise ValueError("must provide token_url")
        if authorization_url is None:
            raise ValueError("must provide authorization_url")

        if client_secret is None:
            client_secret = getpass(prompt="Client Secret: ")

        data = {
            "client_id": client_id,
            "client_secret": client_secret,
            "token_url": token_url,
            "authorization_url": authorization_url,
        }
        # Optional fields are only included when supplied.
        if audience is not None:
            data["audience"] = audience
        if scope is not None:
            data["scope"] = scope

        return Credential(name=name, type=OAUTH2_CLIENT_CREDENTIALS, data=data)

    @staticmethod
    def from_basic_auth(*, name: str, username: str, password: str = None) -> "Credential":
        """creates new Basic Auth Credential

        Creates a new Credential object for a username/password

        Arguments:
            name: the name of the Credential to create
            username: the username
            password: the password. If `None`, user will be prompted via
                stdin.

        Returns:
            a new Credential object that can be saved to backend
        """
        if password is None:
            password = getpass()

        return Credential(
            name=name,
            type=BASIC_AUTH,
            data={"username": username, "password": password},
        )

Docs

Developer documentation for Seer AI APIs

View Docs

Tutorials

Get in-depth tutorials for beginners and advanced developers

View Tutorials

Resources

Find development resources and get your questions answered

View Resources