Package pachyderm_sdk
Expand source code
# Pick whichever metadata API is importable: the stdlib module on
# Python >= 3.8, the ``importlib_metadata`` backport on older interpreters.
try:
    import importlib.metadata as metadata
except ImportError:
    import importlib_metadata as metadata  # type: ignore

# Expose the installed distribution's version; fall back to an empty
# string when the metadata lookup fails with one of the handled errors.
try:
    __version__ = metadata.version(__name__)  # type: ignore
except (FileNotFoundError, ModuleNotFoundError):
    __version__ = ""

# Imported for its side effects only; the name is deliberately private.
from .api.pfs import _additions as __pfs_additions
from .client import Client

__all__ = ["Client"]
Sub-modules
pachyderm_sdk.api
pachyderm_sdk.client
pachyderm_sdk.config
pachyderm_sdk.constants
pachyderm_sdk.errors
-
Errors that can be raised by this library.
pachyderm_sdk.interceptor
Classes
class Client (host: str = 'localhost', port: int = 30650, auth_token: Optional[str] = None, root_certs: Optional[bytes] = None, transaction_id: str = None, tls: bool = False)
-
The `Client` class that users will primarily interact with. Initialize an instance with `pachyderm_sdk.Client()`.
To see documentation on the methods `Client` can call, refer to the `mixins` module.
Creates a Pachyderm client. If both files don't exist, a client with default settings is created.
Parameters
host : str, optional
- The pachd host. Default is 'localhost', which is used with `pachctl port-forward`.
port : int, optional
- The port to connect to. Default is 30650.
auth_token : str, optional
- The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
- The PEM-encoded root certificates as byte string.
transaction_id : str, optional
- The ID of the transaction to run operations on.
tls : bool
- Whether TLS should be used. If `root_certs` are specified, they are used. Otherwise, we use the certs provided by certifi.
Expand source code
class Client:
    """Entry point for talking to a pachd cluster.

    Users primarily interact with this class. Initialize an instance with
    ``pachyderm_sdk.Client()`` or with one of the alternate constructors
    (``new_in_cluster``, ``from_pachd_address``, ``from_config``).

    The per-service APIs are exposed as attributes: ``admin``, ``auth``,
    ``debug``, ``enterprise``, ``identity``, ``license``, ``pfs``, ``pps``
    and ``transaction`` (see ``_init_api``).
    """

    def __init__(
        self,
        host: str = 'localhost',
        port: int = 30650,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
        tls: bool = False,
    ):
        """
        Creates a Pachyderm client.

        Parameters
        ----------
        host : str, optional
            The pachd host. Default is 'localhost', which is used with
            ``pachctl port-forward``.
        port : int, optional
            The port to connect to. Default is 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on
            the cluster. When omitted, the value of the ``AUTH_TOKEN_ENV``
            environment variable is used if it is set.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. Supplying
            certificates implies ``tls=True``.
        transaction_id : str, optional
            The ID of the transaction to run operations on.
        tls : bool
            Whether TLS should be used. If `root_certs` are specified, they
            are used. Otherwise, we use the certs provided by certifi.
        """
        if auth_token is None:
            auth_token = os.environ.get(AUTH_TOKEN_ENV)

        tls = tls or (root_certs is not None)
        if tls and root_certs is None:
            # load default certs if none are specified
            import certifi

            with open(certifi.where(), "rb") as f:
                root_certs = f.read()

        self.address = "{}:{}".format(host, port)
        self.root_certs = root_certs
        self._auth_token = auth_token
        self._transaction_id = transaction_id
        self._connect()

        # With no explicit token, fall back to exchanging an OIDC ID token
        # taken from the environment.
        # NOTE(review): this stores whatever ``auth.authenticate`` returns as
        # the token; confirm that the return value is the token string itself.
        if not auth_token and (oidc_token := os.environ.get(OIDC_TOKEN_ENV)):
            self.auth_token = self.auth.authenticate(id_token=oidc_token)

    def _connect(self) -> None:
        """Rebuild the metadata, the intercepted channel and the API stubs.

        The metadata is attached through a channel interceptor, so every
        change to the auth token or transaction ID needs a fresh channel and
        fresh stubs bound to it.
        """
        self._metadata = self._build_metadata()
        self._channel = _apply_metadata_interceptor(
            channel=_create_channel(
                self.address, self.root_certs, options=GRPC_CHANNEL_OPTIONS
            ),
            metadata=self._metadata,
        )
        # See implementation for api layout.
        self._init_api()

    def _init_api(self):
        """Bind one stub per pachd service to the current channel."""
        self.admin = _AdminStub(self._channel)
        self.auth = _AuthStub(self._channel)
        self.debug = _DebugStub(self._channel)
        self.enterprise = _EnterpriseStub(self._channel)
        self.identity = _IdentityStub(self._channel)
        self.license = _LicenseStub(self._channel)
        self.pfs = _PfsStub(self._channel)
        self.pps = _PpsStub(self._channel)
        # The transaction stub reads and writes the client's transaction ID
        # through these callbacks; writing goes through the property setter.
        self.transaction = _TransactionStub(
            self._channel,
            get_transaction_id=lambda: self.transaction_id,
            set_transaction_id=lambda value: setattr(self, "transaction_id", value),
        )
        self._version_api = _VersionStub(self._channel)

    @classmethod
    def new_in_cluster(
        cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
    ) -> "Client":
        """Creates a Pachyderm client that operates within a Pachyderm cluster.

        If the spout config file exists, the client is built from it and the
        arguments below are not used.

        Parameters
        ----------
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on
            the cluster.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        RuntimeError
            If the pachd service host or port environment variable is unset.
        """
        if CONFIG_PATH_SPOUT.exists():
            # TODO: Should we notify the user that we are using spout config?
            return cls.from_config(CONFIG_PATH_SPOUT)

        host = os.environ.get(PACHD_SERVICE_HOST_ENV)
        if host is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_HOST_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        port = os.environ.get(PACHD_SERVICE_PORT_ENV)
        if port is None:
            raise RuntimeError(
                f"Environment variable {PACHD_SERVICE_PORT_ENV} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )

        return cls(
            host=host,
            port=int(port),
            auth_token=auth_token,
            transaction_id=transaction_id,
        )

    @classmethod
    def from_pachd_address(
        cls,
        pachd_address: str,
        auth_token: Optional[str] = None,
        root_certs: Optional[bytes] = None,
        transaction_id: Optional[str] = None,
    ) -> "Client":
        """Creates a Pachyderm client from a given pachd address.

        Parameters
        ----------
        pachd_address : str
            The address of pachd server, as ``[scheme://]host[:port]``. The
            scheme defaults to ``grpc`` and the port to 30650.
        auth_token : str, optional
            The authentication token. Used if authentication is enabled on
            the cluster.
        root_certs : bytes, optional
            The PEM-encoded root certificates as byte string. If unspecified,
            this will load default certs from certifi.
        transaction_id : str, optional
            The ID of the transaction to run operations on.

        Returns
        -------
        Client
            A pachyderm_sdk client instance.

        Raises
        ------
        ValueError
            If the scheme is not recognized, the address has no host, or it
            carries a path, params, query, fragment or credentials.
        """
        if "://" not in pachd_address:
            pachd_address = "grpc://{}".format(pachd_address)

        u = urlparse(pachd_address)
        if u.scheme not in ("grpc", "http", "grpcs", "https"):
            raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
        if u.path or u.params or u.query or u.fragment or u.username or u.password:
            raise ValueError("invalid pachd address")
        if not u.hostname:
            # Without this check the client would dial "None:<port>".
            raise ValueError("invalid pachd address")

        return cls(
            host=u.hostname,
            # ``u.port`` is None when the address omits the port; use the
            # constructor's default instead of dialing "<host>:None".
            port=u.port if u.port is not None else 30650,
            auth_token=auth_token,
            root_certs=root_certs,
            transaction_id=transaction_id,
            tls=u.scheme in ("grpcs", "https"),
        )

    @classmethod
    def from_config(cls, config_file: Union[Path, str] = CONFIG_PATH_LOCAL) -> "Client":
        """Creates a Pachyderm client from a config file.

        Parameters
        ----------
        config_file : Union[Path, str]
            The path to a config json file.
            config_file defaults to the local config.

        Returns
        -------
        Client
            A properly configured Client.

        Raises
        ------
        BadClusterDeploymentID
            If the active context records a deployment ID that differs from
            the one reported by the cluster.
        """
        config = ConfigFile(config_file)
        active_context = config.active_context
        client = cls.from_pachd_address(
            active_context.active_pachd_address,
            auth_token=active_context.session_token,
            root_certs=active_context.server_cas_decoded,
            transaction_id=active_context.active_transaction,
        )

        # Verify the deployment ID of the active context with the cluster.
        expected_deployment_id = active_context.cluster_deployment_id
        if expected_deployment_id:
            cluster_info = client.admin.inspect_cluster()
            if cluster_info.deployment_id != expected_deployment_id:
                raise BadClusterDeploymentID(
                    expected_deployment_id, cluster_info.deployment_id
                )
        return client

    @property
    def auth_token(self):
        """The authentication token. Used if authentication is enabled on the cluster."""
        return self._auth_token

    @auth_token.setter
    def auth_token(self, value):
        # Changing the token changes the metadata, so reconnect.
        self._auth_token = value
        self._connect()

    @property
    def transaction_id(self):
        """The ID of the transaction to run operations on."""
        return self._transaction_id

    @transaction_id.setter
    def transaction_id(self, value):
        # Changing the transaction changes the metadata, so reconnect.
        self._transaction_id = value
        self._connect()

    def _build_metadata(self):
        """Return the (key, value) metadata pairs for the current token and transaction."""
        metadata = []
        if self._auth_token is not None:
            metadata.append(("authn-token", self._auth_token))
        if self._transaction_id is not None:
            metadata.append(("pach-transaction", self._transaction_id))
        return metadata

    def delete_all(self) -> None:
        """Delete all repos, commits, files, pipelines, and jobs.
        This resets the cluster to its initial state.
        """
        # Try removing all identities if auth is activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.identity.delete_all()
        # Try deactivating auth if activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.auth.deactivate()
        # Try removing all licenses if auth is activated.
        with contextlib.suppress(AuthServiceNotActivated):
            self.license.delete_all()
        self.pps.delete_all()
        self.pfs.delete_all()
        self.transaction.delete_all()

    def get_version(self) -> Version:
        """Requests version information from the pachd cluster."""
        return self._version_api.get_version()
Static methods
def new_in_cluster(auth_token: Optional[str] = None, transaction_id: Optional[str] = None) ‑> Client
-
Creates a Pachyderm client that operates within a Pachyderm cluster.
Parameters
auth_token : str, optional
- The authentication token. Used if authentication is enabled on the cluster.
transaction_id : str, optional
- The ID of the transaction to run operations on.
Returns
Client
- A pachyderm_sdk client instance.
Expand source code
@classmethod
def new_in_cluster(
    cls, auth_token: Optional[str] = None, transaction_id: Optional[str] = None
) -> "Client":
    """Build a client from inside a Pachyderm cluster.

    When the spout config file exists the client is created from it.
    Otherwise the pachd host and port are read from the environment.

    Parameters
    ----------
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on
        the cluster.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A python_pachyderm client instance.

    Raises
    ------
    RuntimeError
        If the host or port environment variable is not set.
    """
    # TODO: Should we notify the user that we are using spout config?
    if CONFIG_PATH_SPOUT.exists():
        return cls.from_config(CONFIG_PATH_SPOUT)

    # Both settings are mandatory; check them in host-then-port order.
    required = (
        ("host", PACHD_SERVICE_HOST_ENV),
        ("port", PACHD_SERVICE_PORT_ENV),
    )
    resolved = {}
    for keyword, env_name in required:
        raw_value = os.environ.get(env_name)
        if raw_value is None:
            raise RuntimeError(
                f"Environment variable {env_name} not set "
                f"-- cannot connect. Are you running in a cluster?"
            )
        resolved[keyword] = raw_value

    return cls(
        host=resolved["host"],
        port=int(resolved["port"]),
        auth_token=auth_token,
        transaction_id=transaction_id,
    )
def from_pachd_address(pachd_address: str, auth_token: str = None, root_certs: bytes = None, transaction_id: str = None) ‑> Client
-
Creates a Pachyderm client from a given pachd address.
Parameters
pachd_address : str
- The address of pachd server
auth_token : str, optional
- The authentication token. Used if authentication is enabled on the cluster.
root_certs : bytes, optional
- The PEM-encoded root certificates as byte string. If unspecified, this will load default certs from certifi.
transaction_id : str, optional
- The ID of the transaction to run operations on.
Returns
Client
- A pachyderm_sdk client instance.
Expand source code
@classmethod
def from_pachd_address(
    cls,
    pachd_address: str,
    auth_token: Optional[str] = None,
    root_certs: Optional[bytes] = None,
    transaction_id: Optional[str] = None,
) -> "Client":
    """Creates a Pachyderm client from a given pachd address.

    Parameters
    ----------
    pachd_address : str
        The address of pachd server, as ``[scheme://]host[:port]``. The
        scheme defaults to ``grpc`` and the port to 30650.
    auth_token : str, optional
        The authentication token. Used if authentication is enabled on
        the cluster.
    root_certs : bytes, optional
        The PEM-encoded root certificates as byte string. If unspecified,
        this will load default certs from certifi.
    transaction_id : str, optional
        The ID of the transaction to run operations on.

    Returns
    -------
    Client
        A python_pachyderm client instance.

    Raises
    ------
    ValueError
        If the scheme is not recognized, the address has no host, or it
        carries a path, params, query, fragment or credentials.
    """
    if "://" not in pachd_address:
        pachd_address = "grpc://{}".format(pachd_address)

    u = urlparse(pachd_address)
    if u.scheme not in ("grpc", "http", "grpcs", "https"):
        raise ValueError("unrecognized pachd address scheme: {}".format(u.scheme))
    if u.path or u.params or u.query or u.fragment or u.username or u.password:
        raise ValueError("invalid pachd address")
    if not u.hostname:
        # Without this check the client would dial "None:<port>".
        raise ValueError("invalid pachd address")

    return cls(
        host=u.hostname,
        # ``u.port`` is None when the address omits the port; fall back to
        # the default pachd port instead of dialing "<host>:None".
        port=u.port if u.port is not None else 30650,
        auth_token=auth_token,
        root_certs=root_certs,
        transaction_id=transaction_id,
        tls=u.scheme in ("grpcs", "https"),
    )
def from_config(config_file: Union[pathlib.Path, str] = PosixPath('~/.pachyderm/config.json')) ‑> Client
-
Creates a Pachyderm client from a config file.
Parameters
config_file : Union[Path, str]
- The path to a config json file. config_file defaults to the local config.
Returns
Client
- A properly configured Client.
Expand source code
@classmethod
def from_config(cls, config_file: Union[Path, str] = CONFIG_PATH_LOCAL) -> "Client":
    """Build a client from the active context of a config file.

    Parameters
    ----------
    config_file : Union[Path, str]
        The path to a config json file.
        config_file defaults to the local config.

    Returns
    -------
    Client
        A properly configured Client.

    Raises
    ------
    BadClusterDeploymentID
        If the context records a deployment ID that differs from the one
        the cluster reports.
    """
    context = ConfigFile(config_file).active_context
    new_client = cls.from_pachd_address(
        context.active_pachd_address,
        auth_token=context.session_token,
        root_certs=context.server_cas_decoded,
        transaction_id=context.active_transaction,
    )

    # Nothing to verify when the context does not pin a deployment ID.
    wanted_id = context.cluster_deployment_id
    if not wanted_id:
        return new_client

    # Compare the pinned deployment ID with the one the cluster reports.
    reported_id = new_client.admin.inspect_cluster().deployment_id
    if reported_id != wanted_id:
        raise BadClusterDeploymentID(wanted_id, reported_id)
    return new_client
Instance variables
var auth_token
-
The authentication token. Used if authentication is enabled on the cluster.
Expand source code
@property
def auth_token(self):
    """Authentication token held by this client.

    Used if authentication is enabled on the cluster.
    """
    current_token = self._auth_token
    return current_token
var transaction_id
-
The ID of the transaction to run operations on.
Expand source code
@property
def transaction_id(self):
    """ID of the transaction that operations are run on."""
    current_id = self._transaction_id
    return current_id
Methods
def delete_all(self) ‑> None
-
Delete all repos, commits, files, pipelines, and jobs. This resets the cluster to its initial state.
Expand source code
def delete_all(self) -> None:
    """Reset the cluster to its initial state.

    Deletes all repos, commits, files, pipelines, and jobs. The
    identity, auth and license steps are best-effort: an
    ``AuthServiceNotActivated`` error from any of them is ignored.
    """
    # Remove all identities (skipped when auth is not activated).
    try:
        self.identity.delete_all()
    except AuthServiceNotActivated:
        pass

    # Deactivate auth (skipped when auth is not activated).
    try:
        self.auth.deactivate()
    except AuthServiceNotActivated:
        pass

    # Remove all licenses (skipped when auth is not activated).
    try:
        self.license.delete_all()
    except AuthServiceNotActivated:
        pass

    self.pps.delete_all()
    self.pfs.delete_all()
    self.transaction.delete_all()
def get_version(self) ‑> Version
-
Requests version information from the pachd cluster.
Expand source code
def get_version(self) -> Version:
    """Ask the pachd cluster for its version information and return it."""
    version_info = self._version_api.get_version()
    return version_info