Package pachyderm_sdk

Expand source code
# Python version compatibility.
try:
    # >= 3.8
    import importlib.metadata as metadata
except ImportError:
    #  < 3.8
    import importlib_metadata as metadata  # type: ignore

try:
    __version__ = metadata.version(__name__)  # type: ignore
except (FileNotFoundError, ModuleNotFoundError):
    # importlib.metadata.PackageNotFoundError subclasses ModuleNotFoundError,
    # so an uninstalled (e.g. source-checkout) package lands here and reports
    # an empty version string instead of failing the import.
    __version__ = ""

from .api.pfs import _additions as __pfs_additions
from .client import Client
__all__ = [
    "Client",
]

Sub-modules

pachyderm_sdk.api
pachyderm_sdk.client
pachyderm_sdk.config
pachyderm_sdk.constants
pachyderm_sdk.errors

Errors that can be raised by this library.

pachyderm_sdk.interceptor

Classes

class Client (host: str = 'localhost', port: int = 30650, auth_token: Optional[str] = None, root_certs: Optional[bytes] = None, transaction_id: str = None, tls: bool = False)

The `Client` class that users will primarily interact with. Initialize an instance with `pachyderm_sdk.Client()`.

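Cluster operations are exposed through the API-stub attributes of `Client` (`admin`, `auth`, `debug`, `enterprise`, `identity`, `license`, `pfs`, `pps`, and `transaction`); see the `pachyderm_sdk.api` sub-module for their documentation.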

Creates a Pachyderm client connected to host:port. If auth_token is not given, it is read from the environment.

Parameters

host : str, optional
The pachd host. Default is 'localhost', which is used with pachctl port-forward.
port : int, optional
The port to connect to. Default is 30650.
auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
The PEM-encoded root certificates as byte string.
transaction_id : str, optional
The ID of the transaction to run operations on.
tls : bool
Whether TLS should be used. TLS is also enabled automatically whenever root_certs is given. If root_certs are specified, they are used; otherwise, the certificate bundle provided by certifi is loaded.
Expand source code
class Client:
    """The :class:`.Client` class that users will primarily interact with.
    Initialize an instance with ``pachyderm_sdk.Client()`` or one of the
    alternate constructors :meth:`new_in_cluster`, :meth:`from_pachd_address`
    and :meth:`from_config`.

    Cluster operations are exposed through the API-stub attributes created by
    ``_init_api``: ``admin``, ``auth``, ``debug``, ``enterprise``,
    ``identity``, ``license``, ``pfs``, ``pps`` and ``transaction``.
    """

    def __init__(
        self,
        host: str = 'localhost',
        port: int = 30650,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client connected to ``host:port``.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster. If not given, it is read from the environment variable
            named by ``AUTH_TOKEN_ENV``. If it is still empty and the
            environment variable named by ``OIDC_TOKEN_ENV`` is set, the
            client authenticates with that OIDC ID token during construction.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. Passing
            certificates implies ``tls=True``.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. If `root_certs` are specified, they are
            used. Otherwise, we use the certs provided by certifi.
        """
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        # Custom certificates are only meaningful over TLS, so they imply it.
        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host, port)
        self.root_certs = root_certs
        self._auth_token = auth_token
        self._transaction_id = transaction_id

        # See implementation for channel and api layout.
        self._connect()

        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            # NOTE(review): this stores whatever `auth.authenticate` returns as
            # the token; confirm it is the token string itself and not a
            # response message wrapping it.
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    def _connect(self):
        """(Re)build the metadata, gRPC channel and API stubs from current state.

        The metadata list (auth token / transaction ID) is handed to
        ``_apply_metadata_interceptor`` when the channel is built, which is why
        changing either value requires a fresh channel and fresh stubs.
        """
        self._metadata = self._build_metadata()
        # NOTE(review): the previous channel (if any) is not closed; stubs that
        # callers captured earlier still reference it, so closing it here could
        # break them.
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._metadata,
        )
        self._init_api()

    def _init_api(self):
        """Create one API stub per service, all sharing ``self._channel``."""
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.pfs = _PfsStub(self._channel)
        self.pps = _PpsStub(self._channel)
        # The transaction stub reads/writes the ID through the property, so
        # setting it goes through the setter and rebuilds the channel.
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)

    @classmethod
    def new_in_cluster(
        cls,
        auth_token: Optional[str] = None,
        transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file (``CONFIG_PATH_SPOUT``) exists, the client is
        built from it via :meth:`from_config` and the arguments below are not
        used. Otherwise the host and port are read from the environment
        variables named by ``PACHD_SERVICE_HOST_ENV`` and
        ``PACHD_SERVICE_PORT_ENV``.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        RuntimeError
            If the host or port environment variable is not set.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )

        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of the pachd server, in the form
            ``[scheme://]host[:port]``. Supported schemes are ``grpc``,
            ``http``, ``grpcs`` and ``https`` (the latter two enable TLS);
            ``grpc`` is assumed when no scheme is given. When the port is
            omitted, the constructor's default port is used.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on the
            cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified
            and the scheme enables TLS, default certs are loaded from certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        ValueError
            If the scheme is unsupported, the host is missing, or the address
            carries a path, params, query, fragment or credentials.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)

        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")
        if not u.hostname:
            # e.g. "grpc://:30650" -- would otherwise dial "None:30650".
            raise ValueError("invalid pachd address")

        # Only forward an explicit port; otherwise let the constructor default
        # apply instead of building the address "host:None".
        port = u.port
        port_kwargs = {} if port is None else {"port": port}

        return cls(
            host=u.hostname,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme in ("grpcs", "https"),
            **port_kwargs,
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str]=CONFIG_PATH_LOCAL) -> "Client":
        """Creates a Pachyderm client from a config file.

        The active context of the config supplies the pachd address, session
        token, server CAs and active transaction. If the context records a
        cluster deployment ID, it is checked against the connected cluster.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.

        Raises
        ------
        BadClusterDeploymentID
            If the cluster's deployment ID differs from the one in the config.
        """
        config = ConfigFile(config_file)
        active_context = config.active_context
        client = cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )

        # Verify the deployment ID of the active context with the cluster.
        expected_deployment_id = active_context.cluster_deployment_id
        if expected_deployment_id:
            cluster_info = client.admin.inspect_cluster()
            if cluster_info.deployment_id != expected_deployment_id:
                raise BadClusterDeploymentID(
                    expected_deployment_id, cluster_info.deployment_id
                )

        return client

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value: Optional[str]):
        # The token travels as channel metadata, so rebuild the connection.
        self._auth_token = value
        self._connect()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value: Optional[str]):
        # The transaction ID travels as channel metadata, so rebuild the connection.
        self._transaction_id = value
        self._connect()

    def _build_metadata(self):
        """Return the (key, value) gRPC metadata pairs for the current state.

        ``authn-token`` is included when an auth token is set and
        ``pach-transaction`` when a transaction ID is set.
        """
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def delete_all(self) -> None:
        """Delete all repos, commits, files, pipelines, jobs and transactions.
        This resets the cluster to its initial state.

        Beforehand it removes all identities, deactivates auth and deletes all
        licenses; each of those steps is skipped if it raises
        ``AuthServiceNotActivated``.
        """
        # Try removing all identities if auth is activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.identity.delete_all()

        # Try deactivating auth if activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.auth.deactivate()

        # Try removing all licenses if auth is activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.license.delete_all()

        self.pps.delete_all()
        self.pfs.delete_all()
        self.transaction.delete_all()

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()

Static methods

def new_in_cluster(auth_token: Optional[str] = None, transaction_id: Optional[str] = None) ‑> Client

Creates a Pachyderm client that operates within a Pachyderm cluster.

Parameters

auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
transaction_id : str, optional
The ID of the transaction to run operations on.

Returns

Client
A pachyderm_sdk client instance.
Expand source code
@classmethod
def new_in_cluster(
    cls,
    auth_token: Optional[str] = None,
    transaction_id: Optional[str] = None
) -> "Client":
    """Creates a Pachyderm client that operates within a Pachyderm cluster.

    If the spout config file (``CONFIG_PATH_SPOUT``) exists, the client is
    built from it via :meth:`from_config` and the arguments below are not
    used. Otherwise the host and port are read from the environment variables
    named by ``PACHD_SERVICE_HOST_ENV`` and ``PACHD_SERVICE_PORT_ENV``.

    Parameters
    ----------
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk client instance.

    Raises
    ------
    RuntimeError
        If the host or port environment variable is not set.
    """
    if CONFIG_PATH_SPOUT.exists():
        # TODO: Should we notify the user that we are using spout config?
        return cls.from_config(CONFIG_PATH_SPOUT)

    def _required_env(var_name):
        # Fetch a mandatory environment variable, failing loudly when absent.
        found = os.environ.get(var_name)
        if found is None:
            raise RuntimeError(
                f"Environment variable {var_name} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        return found

    pachd_host = _required_env(PACHD_SERVICE_HOST_ENV)
    pachd_port = int(_required_env(PACHD_SERVICE_PORT_ENV))
    return cls(
        host=pachd_host,
        port=pachd_port,
        auth_token=auth_token,
        transaction_id=transaction_id,
    )
def from_pachd_address(pachd_address: str, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None) ‑> Client

Creates a Pachyderm client from a given pachd address.

Parameters

pachd_address : str
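The address of the pachd server, in the form [scheme://]host:port. Supported schemes are grpc, http, grpcs, and https (grpcs and https enable TLS); grpc is assumed when no scheme is given.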
auth_token : str, optional
The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
The PEM-encoded root certificates as a byte string. If unspecified and the address uses a TLS scheme (grpcs or https), default certs are loaded from certifi.
transaction_id : str, optional
The ID of the transaction to run operations on.

Returns

Client
A pachyderm_sdk client instance.
Expand source code
@classmethod
def from_pachd_address(
    cls,
    pachd_address: str,
    auth_token: Optional[str] = None,
    root_certs: Optional[bytes] = None,
    transaction_id: Optional[str] = None,
) -> "Client":
    """Creates a Pachyderm client from a given pachd address.

    Parameters
    ----------
    pachd_address : str
        The address of the pachd server, in the form
        ``[scheme://]host[:port]``. Supported schemes are ``grpc``, ``http``,
        ``grpcs`` and ``https`` (the latter two enable TLS); ``grpc`` is
        assumed when no scheme is given. When the port is omitted, the
        constructor's default port is used.
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on the
        cluster.
    root_certs : bytes, optional
        The PEM-encoded root certificates as byte string. If unspecified
        and the scheme enables TLS, default certs are loaded from certifi.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A pachyderm_sdk client instance.

    Raises
    ------
    ValueError
        If the scheme is unsupported, the host is missing, or the address
        carries a path, params, query, fragment or credentials.
    """
    if "://" not in pachd_address:
        pachd_address = "grpc://{}".format(pachd_address)

    u = urlparse(pachd_address)

    if u.scheme not in ("grpc", "http", "grpcs", "https"):
        raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
    if u.path or u.params or u.query or u.fragment or u.username or u.password:
        raise ValueError("invalid pachd address")
    if not u.hostname:
        # e.g. "grpc://:30650" -- would otherwise dial "None:30650".
        raise ValueError("invalid pachd address")

    # Only forward an explicit port; otherwise let the constructor default
    # apply instead of building the address "host:None".
    port = u.port
    port_kwargs = {} if port is None else {"port": port}

    return cls(
        host=u.hostname,
        auth_token=auth_token,
        root_certs=root_certs,
        transaction_id=transaction_id,
        tls=u.scheme in ("grpcs", "https"),
        **port_kwargs,
    )
def from_config(config_file: Union[pathlib.Path, str] = PosixPath('~/.pachyderm/config.json')) ‑> Client

Creates a Pachyderm client from a config file.

Parameters

config_file : Union[Path, str]
The path to a config json file. config_file defaults to the local config.

Returns

Client
A properly configured Client.
Expand source code
@classmethod
def from_config(cls, config_file: Union[Path, str]=CONFIG_PATH_LOCAL) -> "Client":
    """Creates a Pachyderm client from a config file.

    The active context supplies the pachd address, session token, server CAs
    and active transaction. When that context records a cluster deployment
    ID, the connected cluster's ID must match it.

    Parameters
    ----------
    config_file : Union[Path, str]
        The path to a config json file.
        config_file defaults to the local config.

    Returns
    -------
    Client
        A properly configured Client.

    Raises
    ------
    BadClusterDeploymentID
        If the cluster's deployment ID differs from the one in the config.
    """
    context = ConfigFile(config_file).active_context
    client = cls.from_pachd_address(
        context.active_pachd_address,
        auth_token=context.session_token,
        root_certs=context.server_cas_decoded,
        transaction_id=context.active_transaction,
    )

    wanted_id = context.cluster_deployment_id
    if not wanted_id:
        # Nothing recorded to verify against.
        return client

    actual_id = client.admin.inspect_cluster().deployment_id
    if actual_id != wanted_id:
        raise BadClusterDeploymentID(wanted_id, actual_id)
    return client

Instance variables

var auth_token

The authentication token. Used if authentication is enabled on the cluster.

Expand source code
@property
def auth_token(self):
    """The authentication token, used when auth is enabled on the cluster.

    Backed by the private ``_auth_token`` attribute.
    """
    token = self._auth_token
    return token
var transaction_id

The ID of the transaction to run operations on.

Expand source code
@property
def transaction_id(self):
    """The ID of the transaction that operations run on.

    Backed by the private ``_transaction_id`` attribute.
    """
    current_id = self._transaction_id
    return current_id

Methods

def delete_all(self) ‑> None

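Delete all repos, commits, files, pipelines, jobs, and transactions. Beforehand, it removes all identities, deactivates auth, and deletes all licenses; each of those steps is skipped if the cluster reports that auth is not activated. This resets the cluster to its initial state.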

Expand source code
def delete_all(self) -> None:
    """Reset the cluster to its initial state.

    Deletes all repos, commits, files, pipelines, jobs and transactions.
    Beforehand it removes all identities, deactivates auth and deletes all
    licenses; each of those three steps is skipped if it raises
    ``AuthServiceNotActivated``.
    """
    # Auth-dependent cleanup, attempted in this exact order.
    best_effort_steps = (
        lambda: self.identity.delete_all(),
        lambda: self.auth.deactivate(),
        lambda: self.license.delete_all(),
    )
    for step in best_effort_steps:
        with contextlib.suppress(AuthServiceNotActivated):
            step()

    # Unconditional cleanup.
    self.pps.delete_all()
    self.pfs.delete_all()
    self.transaction.delete_all()
def get_version(self) ‑> Version

Requests version information from the pachd cluster.

Expand source code
def get_version(self) -> Version:
    """Return the version information reported by the connected pachd."""
    version_stub = self._version_api
    return version_stub.get_version()