Module pachyderm_sdk.api.pps.extension
Handwritten classes/methods that augment the existing PPS API.
Expand source code
"""Handwritten classes/methods that augment the existing PPS API."""
import base64
import json
from queue import SimpleQueue
from typing import Dict, Generator, List
import grpc
from betterproto.lib.google.protobuf import Empty
from more_itertools import take
from . import ApiStub as _GeneratedApiStub
from . import (
ContinueCreateDatumRequest,
CreateDatumRequest,
DatumInfo,
Input,
Job,
Pipeline,
PipelineInfo,
StartCreateDatumRequest,
)
class ApiStub(_GeneratedApiStub):
def inspect_pipeline(
self,
*,
pipeline: "Pipeline" = None,
details: bool = False,
history: int = 0,
) -> "PipelineInfo":
"""Inspects a pipeline.
Parameters
----------
pipeline : pps.Pipeline
The pipeline to inspect.
details : bool, optional
If true, return pipeline details.
history : int, optional
Indicates to return historical versions of `pipeline_name`.
Semantics are:
- 0: Return current version of `pipeline_name`
- 1: Return the above and `pipeline_name` from the next most recent version.
- 2: etc.
- -1: Return all historical versions of `pipeline_name`.
Returns
-------
pps.PipelineInfo
Examples
--------
>>> from pachyderm_sdk import Client
>>> from pachyderm_sdk.api import pps
>>> client: Client
>>> pipeline_info = client.pps.inspect_pipeline(
>>> pipeline=pps.Pipeline(name="foo")
>>> )
"""
if history:
response = self.list_pipeline(pipeline=pipeline, history=history)
try:
return next(response)
except StopIteration:
raise ValueError("invalid pipeline")
return super().inspect_pipeline(pipeline=pipeline, details=details)
def pipeline_exists(self, pipeline: "Pipeline") -> bool:
"""Checks whether a pipeline exists.
Parameters
----------
pipeline: pps.Pipeline
The pipeline to check.
Returns
-------
bool
Whether the pipeline exists.
"""
try:
super().inspect_pipeline(pipeline=pipeline)
return True
except grpc.RpcError as err:
err: grpc.Call
if err.code() == grpc.StatusCode.NOT_FOUND:
return False
raise err
def job_exists(self, job: "Job") -> bool:
"""Checks whether a job exists.
Parameters
----------
job: pps.Job
The job to check.
Returns
-------
bool
Whether the job exists.
"""
try:
super().inspect_job(job=job)
return True
except grpc.RpcError as err:
err: grpc.Call
if err.code() == grpc.StatusCode.NOT_FOUND:
return False
raise err
# noinspection PyMethodOverriding
def create_secret(
self,
*,
name: str,
data: Dict,
labels: Dict[str, str] = None,
annotations: Dict[str, str] = None,
) -> Empty:
"""Creates a new secret.
Parameters
----------
name : str
The name of the secret.
data : Dict[str, Union[str, bytes]]
The data to store in the secret. Each key must consist of
alphanumeric characters ``-``, ``_`` or ``.``.
labels : Dict[str, str], optional
Kubernetes labels to attach to the secret.
annotations : Dict[str, str], optional
Kubernetes annotations to attach to the secret.
"""
encoded_data = {}
for k, v in data.items():
if isinstance(v, str):
v = v.encode("utf8")
encoded_data[k] = base64.b64encode(v).decode("utf8")
file = json.dumps(
{
"kind": "Secret",
"apiVersion": "v1",
"metadata": {
"name": name,
"labels": labels,
"annotations": annotations,
},
"data": encoded_data,
}
).encode()
return super().create_secret(file=file)
def generate_datums(
self, input_spec: "Input", batch_size: int
) -> Generator[List["DatumInfo"], int, None]:
"""Creates a generator that yields batches of datums for a given input spec
Parameters
----------
input_spec : pps.Input
The input spec.
batch_size : int
The number of datums to return. If 0, the default batch size set
server-side is returned. To change the batch size, use the `.send()`
method on the returned generator.
Returns
-------
Iterator[DatumInfo]
Examples
--------
datum_stream = client.pps.generate_datum(
input_spec=pps.Input(pfs=pps.PfsInput(repo="repo", glob="/*")),
batch_size=10,
)
dis = next(datum_stream) # Returns 10 datums.
more_dis = datum_stream.send(5) # Returns 5 datums.
"""
send_queue = SimpleQueue() # Put messages to be sent here.
stream = super().create_datum(
iter(send_queue.get, None)
) # The line of communication.
send_queue.put(
CreateDatumRequest(
start=StartCreateDatumRequest(input=input_spec, number=batch_size)
)
)
# Return the first batch. Users can .send() the next batch size to this generator.
# If nothing is sent then the original batch size is used.
new_batch_size = yield take(batch_size, stream)
while True:
send_queue.put(
CreateDatumRequest(
continue_=ContinueCreateDatumRequest(
number=new_batch_size or batch_size
)
)
)
batch = take(new_batch_size or batch_size, stream)
if len(batch) == 0:
# We want to catch when there are no datums left.
# Else, if users used a for-loop it would infinitely iterate.
return
new_batch_size = yield batch
Classes
class ApiStub (channel: grpc.Channel)
-
Expand source code
class ApiStub(_GeneratedApiStub): def inspect_pipeline( self, *, pipeline: "Pipeline" = None, details: bool = False, history: int = 0, ) -> "PipelineInfo": """Inspects a pipeline. Parameters ---------- pipeline : pps.Pipeline The pipeline to inspect. details : bool, optional If true, return pipeline details. history : int, optional Indicates to return historical versions of `pipeline_name`. Semantics are: - 0: Return current version of `pipeline_name` - 1: Return the above and `pipeline_name` from the next most recent version. - 2: etc. - -1: Return all historical versions of `pipeline_name`. Returns ------- pps.PipelineInfo Examples -------- >>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> ) """ if history: response = self.list_pipeline(pipeline=pipeline, history=history) try: return next(response) except StopIteration: raise ValueError("invalid pipeline") return super().inspect_pipeline(pipeline=pipeline, details=details) def pipeline_exists(self, pipeline: "Pipeline") -> bool: """Checks whether a pipeline exists. Parameters ---------- pipeline: pps.Pipeline The pipeline to check. Returns ------- bool Whether the pipeline exists. """ try: super().inspect_pipeline(pipeline=pipeline) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err def job_exists(self, job: "Job") -> bool: """Checks whether a job exists. Parameters ---------- job: pps.Job The job to check. Returns ------- bool Whether the job exists. """ try: super().inspect_job(job=job) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err # noinspection PyMethodOverriding def create_secret( self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None, ) -> Empty: """Creates a new secret. Parameters ---------- name : str The name of the secret. data : Dict[str, Union[str, bytes]] The data to store in the secret. Each key must consist of alphanumeric characters ``-``, ``_`` or ``.``. labels : Dict[str, str], optional Kubernetes labels to attach to the secret. annotations : Dict[str, str], optional Kubernetes annotations to attach to the secret. """ encoded_data = {} for k, v in data.items(): if isinstance(v, str): v = v.encode("utf8") encoded_data[k] = base64.b64encode(v).decode("utf8") file = json.dumps( { "kind": "Secret", "apiVersion": "v1", "metadata": { "name": name, "labels": labels, "annotations": annotations, }, "data": encoded_data, } ).encode() return super().create_secret(file=file) def generate_datums( self, input_spec: "Input", batch_size: int ) -> Generator[List["DatumInfo"], int, None]: """Creates a generator that yields batches of datums for a given input spec Parameters ---------- input_spec : pps.Input The input spec. batch_size : int The number of datums to return. If 0, the default batch size set server-side is returned. To change the batch size, use the `.send()` method on the returned generator. Returns ------- Iterator[DatumInfo] Examples -------- datum_stream = client.pps.generate_datum( input_spec=pps.Input(pfs=pps.PfsInput(repo="repo", glob="/*")), batch_size=10, ) dis = next(datum_stream) # Returns 10 datums. more_dis = datum_stream.send(5) # Returns 5 datums. """ send_queue = SimpleQueue() # Put messages to be sent here. stream = super().create_datum( iter(send_queue.get, None) ) # The line of communication. send_queue.put( CreateDatumRequest( start=StartCreateDatumRequest(input=input_spec, number=batch_size) ) ) # Return the first batch. Users can .send() the next batch size to this generator. # If nothing is sent then the original batch size is used. new_batch_size = yield take(batch_size, stream) while True: send_queue.put( CreateDatumRequest( continue_=ContinueCreateDatumRequest( number=new_batch_size or batch_size ) ) ) batch = take(new_batch_size or batch_size, stream) if len(batch) == 0: # We want to catch when there are no datums left. # Else, if users used a for-loop it would infinitely iterate. return new_batch_size = yield batch
Ancestors
Methods
def inspect_pipeline(self, *, pipeline: Pipeline = None, details: bool = False, history: int = 0) ‑> PipelineInfo
-
Inspects a pipeline.
Parameters
pipeline
:pps.Pipeline
- The pipeline to inspect.
details
:bool
, optional- If true, return pipeline details.
history
:int
, optional-
Indicates to return historical versions of
pipeline_name
. Semantics are:- 0: Return current version of
pipeline_name
- 1: Return the above and
pipeline_name
from the next most recent version. - 2: etc.
- -1: Return all historical versions of
pipeline_name
.
- 0: Return current version of
Returns
pps.PipelineInfo
Examples
>>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> )
Expand source code
def inspect_pipeline( self, *, pipeline: "Pipeline" = None, details: bool = False, history: int = 0, ) -> "PipelineInfo": """Inspects a pipeline. Parameters ---------- pipeline : pps.Pipeline The pipeline to inspect. details : bool, optional If true, return pipeline details. history : int, optional Indicates to return historical versions of `pipeline_name`. Semantics are: - 0: Return current version of `pipeline_name` - 1: Return the above and `pipeline_name` from the next most recent version. - 2: etc. - -1: Return all historical versions of `pipeline_name`. Returns ------- pps.PipelineInfo Examples -------- >>> from pachyderm_sdk import Client >>> from pachyderm_sdk.api import pps >>> client: Client >>> pipeline_info = client.pps.inspect_pipeline( >>> pipeline=pps.Pipeline(name="foo") >>> ) """ if history: response = self.list_pipeline(pipeline=pipeline, history=history) try: return next(response) except StopIteration: raise ValueError("invalid pipeline") return super().inspect_pipeline(pipeline=pipeline, details=details)
def pipeline_exists(self, pipeline: Pipeline) ‑> bool
-
Checks whether a pipeline exists.
Parameters
pipeline
:pps.Pipeline
- The pipeline to check.
Returns
bool
- Whether the pipeline exists.
Expand source code
def pipeline_exists(self, pipeline: "Pipeline") -> bool: """Checks whether a pipeline exists. Parameters ---------- pipeline: pps.Pipeline The pipeline to check. Returns ------- bool Whether the pipeline exists. """ try: super().inspect_pipeline(pipeline=pipeline) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err
def job_exists(self, job: Job) ‑> bool
-
Checks whether a job exists.
Parameters
job
:pps.Job
- The job to check.
Returns
bool
- Whether the job exists.
Expand source code
def job_exists(self, job: "Job") -> bool: """Checks whether a job exists. Parameters ---------- job: pps.Job The job to check. Returns ------- bool Whether the job exists. """ try: super().inspect_job(job=job) return True except grpc.RpcError as err: err: grpc.Call if err.code() == grpc.StatusCode.NOT_FOUND: return False raise err
def create_secret(self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None) ‑> betterproto.lib.google.protobuf.Empty
-
Creates a new secret.
Parameters
name
:str
- The name of the secret.
data
:Dict[str, Union[str, bytes]]
- The data to store in the secret. Each key must consist of
alphanumeric characters
-
,_
or.
. labels
:Dict[str, str]
, optional- Kubernetes labels to attach to the secret.
annotations
:Dict[str, str]
, optional- Kubernetes annotations to attach to the secret.
Expand source code
def create_secret( self, *, name: str, data: Dict, labels: Dict[str, str] = None, annotations: Dict[str, str] = None, ) -> Empty: """Creates a new secret. Parameters ---------- name : str The name of the secret. data : Dict[str, Union[str, bytes]] The data to store in the secret. Each key must consist of alphanumeric characters ``-``, ``_`` or ``.``. labels : Dict[str, str], optional Kubernetes labels to attach to the secret. annotations : Dict[str, str], optional Kubernetes annotations to attach to the secret. """ encoded_data = {} for k, v in data.items(): if isinstance(v, str): v = v.encode("utf8") encoded_data[k] = base64.b64encode(v).decode("utf8") file = json.dumps( { "kind": "Secret", "apiVersion": "v1", "metadata": { "name": name, "labels": labels, "annotations": annotations, }, "data": encoded_data, } ).encode() return super().create_secret(file=file)
def generate_datums(self, input_spec: Input, batch_size: int) ‑> Generator[List[DatumInfo], int, None]
-
Creates a generator that yields batches of datums for a given input spec
Parameters
input_spec
:pps.Input
- The input spec.
batch_size
:int
- The number of datums to return. If 0, the default batch size set
server-side is returned. To change the batch size, use the
.send()
method on the returned generator.
Returns
Iterator[DatumInfo]
Examples
datum_stream = client.pps.generate_datum( input_spec=pps.Input(pfs=pps.PfsInput(repo="repo", glob="/*")), batch_size=10, )
dis = next(datum_stream) # Returns 10 datums. more_dis = datum_stream.send(5) # Returns 5 datums.
Expand source code
def generate_datums( self, input_spec: "Input", batch_size: int ) -> Generator[List["DatumInfo"], int, None]: """Creates a generator that yields batches of datums for a given input spec Parameters ---------- input_spec : pps.Input The input spec. batch_size : int The number of datums to return. If 0, the default batch size set server-side is returned. To change the batch size, use the `.send()` method on the returned generator. Returns ------- Iterator[DatumInfo] Examples -------- datum_stream = client.pps.generate_datum( input_spec=pps.Input(pfs=pps.PfsInput(repo="repo", glob="/*")), batch_size=10, ) dis = next(datum_stream) # Returns 10 datums. more_dis = datum_stream.send(5) # Returns 5 datums. """ send_queue = SimpleQueue() # Put messages to be sent here. stream = super().create_datum( iter(send_queue.get, None) ) # The line of communication. send_queue.put( CreateDatumRequest( start=StartCreateDatumRequest(input=input_spec, number=batch_size) ) ) # Return the first batch. Users can .send() the next batch size to this generator. # If nothing is sent then the original batch size is used. new_batch_size = yield take(batch_size, stream) while True: send_queue.put( CreateDatumRequest( continue_=ContinueCreateDatumRequest( number=new_batch_size or batch_size ) ) ) batch = take(new_batch_size or batch_size, stream) if len(batch) == 0: # We want to catch when there are no datums left. # Else, if users used a for-loop it would infinitely iterate. return new_batch_size = yield batch