Module pachyderm_sdk.api.storage.extension

Handwritten classes/methods that augment the existing Storage API.

Expand source code
"""Handwritten classes/methods that augment the existing Storage API."""

import os
from pathlib import Path
from typing import Union

from betterproto import which_one_of

from ..cdr.resolver import (
    CdrResolver,
    Ref,
    Cipher,
    Compress,
    Concat,
    ContentHash,
    Http,
    SizeLimits,
    Slice,
)
from . import FilesetStub as _GeneratedApiStub
from . import FileFilter, PathRange
from pachyderm_sdk.constants import CDR_CACHE_LOCATION


class ApiStub(_GeneratedApiStub):
    """Handwritten extension of the generated Storage API stub.

    Adds convenience methods for assembling filesets locally and for
    pre-populating the local CDR chunk cache.
    """

    def assemble_fileset(
        self,
        fileset: str,
        path: Union[str, PathRange],
        destination: os.PathLike,
        *,
        cache_location: os.PathLike = CDR_CACHE_LOCATION,
        fetch_missing_chunks: bool = True,
        http_host_replacement: str = "",
    ) -> None:
        """Assemble the data contained at the specified path within the fileset
        and write that data to the specified destination directory. This method
        uses a local cache and will automatically fetch missing data from PFS,
        unless disabled.

        Parameters
        ----------
        fileset : str
            The UUID of the fileset.
        path : typing.Union[str, PathRange]
            The path, regex, or PathRange within the fileset to assemble.
            A value of "/" assembles the entire fileset.
        destination : os.PathLike
            The local directory at which to assemble the fileset.
        cache_location : os.PathLike (kwarg only)
            The location of the chunk cache. This is also configurable
            through an environment variable.
        fetch_missing_chunks : bool (kwarg only)
            If true, fetch any missing chunks needed to assemble the fileset.
            If false, raise an error if any chunks are missing.
        http_host_replacement : str (kwarg only)
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000

        Raises
        ------
            FileNotFoundError:
                if chunk is missing from cache and fetch_missing_chunks=False.
        """
        destination = Path(os.path.expanduser(destination)).resolve()
        destination.mkdir(parents=True, exist_ok=True)
        resolver = CdrResolver(
            cache_location=cache_location,
            fetch_missing_chunks=fetch_missing_chunks,
            http_host_replacement=http_host_replacement,
        )

        for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
            content = resolver.resolve(msg.ref)

            # File paths returned from PFS are absolute.
            # By removing the leading "/", we convert the path from absolute
            #   to relative from the repo root.
            file_path = msg.path.removeprefix("/")
            destination_file = destination.joinpath(file_path)
            destination_file.parent.mkdir(parents=True, exist_ok=True)
            destination_file.write_bytes(content)

    def fetch_chunks(
        self,
        fileset: str,
        path: Union[str, PathRange],
        *,
        cache_location: os.PathLike = CDR_CACHE_LOCATION,
        http_host_replacement: str = "",
        prune: bool = False,
    ) -> None:
        """Fetch all of the PFS chunks required to assemble the data contained
        at the specified path within the fileset and write them to the cache
        location.

        Parameters
        ----------
        fileset : str
            The UUID of the fileset.
        path : typing.Union[str, PathRange]
            A path, regex, or PathRange within the fileset.
            A value of "/" fetches the entire fileset.
        cache_location : os.PathLike (kwarg only)
            The location of the chunk cache. This is also configurable
            through an environment variable.
        http_host_replacement : str (kwarg only)
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000
        prune : bool (kwarg only)
            If true, prune (delete) any chunks that are not required to assemble the fileset.
        """
        resolver = CdrResolver(
            cache_location=cache_location, http_host_replacement=http_host_replacement
        )

        # When pruning, start by assuming every cached chunk is unused and
        # remove entries from this set as the CDR walk proves them needed.
        unused_chunks = set()
        if prune:
            unused_chunks = {file.name for file in resolver.cache.iterdir()}

        def cache_chunks(ref: Ref) -> None:
            """Walk a Ref tree and cache only the underlying chunks.

            This is an optimization. We could simply call resolver.resolve() and
            discard the output, but that would perform additional computation for
            decompressing, decrypting, assembling, etc."""
            # Only the ContentHash leaves matter here; the wrapper variants
            # (Cipher/Compress/SizeLimits/Slice/Concat) are just traversed.
            _, body = which_one_of(ref, "body")
            if isinstance(body, Http):
                raise ValueError(
                    "malformed CDR: no ContentHash message contained within the CDR"
                )
            elif isinstance(body, ContentHash):
                unused_chunks.discard(resolver._chunk_name(body))
                resolver._dereference_content_hash(body)
                return
            elif isinstance(body, (Cipher, Compress, SizeLimits, Slice)):
                return cache_chunks(body.inner)
            elif isinstance(body, Concat):
                for inner_ref in body.refs:
                    cache_chunks(inner_ref)
            else:
                raise ValueError(f"unsupported Ref variant: {body}")

        for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
            cache_chunks(msg.ref)

        # Anything still in unused_chunks was never referenced by the fileset.
        # The is_file() guard skips directories; missing_ok covers races.
        for file_name in unused_chunks:
            file = resolver.cache.joinpath(file_name)
            if file.is_file():
                file.unlink(missing_ok=True)


def as_filter(path: Union[str, PathRange]) -> FileFilter:
    """Wrap a path specification in a FileFilter message.

    A PathRange becomes a path_range filter; any other value (a string
    path or regex) becomes a path_regex filter.
    """
    if isinstance(path, PathRange):
        return FileFilter(path_range=path)
    return FileFilter(path_regex=path)

Functions

def as_filter(path: Union[str, PathRange]) ‑> FileFilter
Expand source code
def as_filter(path: Union[str, PathRange]) -> FileFilter:
    file_filter = FileFilter()
    if isinstance(path, PathRange):
        file_filter.path_range = path
    else:
        file_filter.path_regex = path
    return file_filter

Classes

class ApiStub (channel: grpc.Channel)
Expand source code
class ApiStub(_GeneratedApiStub):

    def assemble_fileset(
        self,
        fileset: str,
        path: Union[str, PathRange],
        destination: os.PathLike,
        *,
        cache_location: os.PathLike = CDR_CACHE_LOCATION,
        fetch_missing_chunks: bool = True,
        http_host_replacement: str = "",
    ) -> None:
        """Assemble the data contained at the specified path within the fileset
        and write that data to the specified destination directory. This method
        uses a local cache and will automatically fetch missing data from PFS,
        unless disabled.

        Parameters
        ----------
        fileset : str
            The UUID of the fileset.
        path : typing.Union[str, PathRange]
            The path, regex, or PathRange within the fileset to assemble.
            A value of "/" assembles the entire fileset.
        destination : os.PathLike
            The local directory at which to assemble the fileset.
        cache_location : os.PathLike (kwarg only)
            The location of the chunk cache. This is also configurable through an environment variable.
        fetch_missing_chunks : bool (kwarg only)
            If true, fetch any missing chunks needed to assemble the fileset.
            If false, raise an error if any chunks are missing.
        http_host_replacement : str (kwarg only)
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000

        Raises
        ------
            FileNotFoundError:
                if chunk is missing from cache and fetch_missing_chunks=False.
        """
        destination = Path(os.path.expanduser(destination)).resolve()
        destination.mkdir(parents=True, exist_ok=True)
        resolver = CdrResolver(
            cache_location=cache_location,
            fetch_missing_chunks=fetch_missing_chunks,
            http_host_replacement=http_host_replacement,
        )

        for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
            content = resolver.resolve(msg.ref)

            # File paths returned from PFS are absolute.
            # By removing the leading "/", we convert the path from absolute
            #   to relative from the repo root.
            file_path = msg.path.removeprefix("/")
            destination_file = destination.joinpath(file_path)
            destination_file.parent.mkdir(parents=True, exist_ok=True)
            destination_file.write_bytes(content)

    def fetch_chunks(
        self,
        fileset: str,
        path: Union[str, PathRange],
        *,
        cache_location: os.PathLike = CDR_CACHE_LOCATION,
        http_host_replacement: str = "",
        prune: bool = False,
    ) -> None:
        """Fetch all of the PFS chunks required to assemble the data contained
        at the specified path within the fileset and write them to the cache
        location.

        Parameters
        ----------
        fileset : str
            The UUID of the fileset.
        path : typing.Union[str, PathRange]
            A path, regex, or PathRange within the fileset.
            A value of "/" fetches the entire fileset.
        cache_location : os.PathLike (kwarg only)
            The location of the chunk cache. This is also configurable through an environment variable.
        http_host_replacement : str (kwarg only)
            The value of this parameter replaces the host (including port) within
            the presigned URLs when resolving CDRs. This configuration is useful
            if, for some reason, the URL that pachd uses to interact with object
            storage is different from the one that you use. For example, if your
            Pachyderm object storage is running within your kubernetes cluster and
            pachd is configured to use a URL which is only valid in cluster.
            This may fix any issues, depending on the object storage and how they
            generate presigned URLs.
            Example: localhost:9000
        prune : bool (kwarg only)
            If true, prune (delete) any chunks that are not required to assemble the fileset.
        """
        resolver = CdrResolver(
            cache_location=cache_location, http_host_replacement=http_host_replacement
        )

        unused_chunks = set()
        if prune:
            unused_chunks = {file.name for file in resolver.cache.iterdir()}

        def cache_chunks(ref: Ref) -> None:
            """This is an optimization. We could simply call resolver.resolve() and
            discard the output, but that would perform additional computation for
            decompressing, decrypting, assembling, etc."""
            field, body = which_one_of(ref, "body")
            if isinstance(body, Http):
                raise ValueError(
                    "malformed CDR: no ContentHash message contained within the CDR"
                )
            elif isinstance(body, ContentHash):
                unused_chunks.discard(resolver._chunk_name(body))
                resolver._dereference_content_hash(body)
                return
            elif isinstance(body, (Cipher, Compress, SizeLimits, Slice)):
                return cache_chunks(body.inner)
            elif isinstance(body, Concat):
                for inner_ref in body.refs:
                    cache_chunks(inner_ref)
            else:
                raise ValueError(f"unsupported Ref variant: {body}")

        for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
            cache_chunks(msg.ref)

        for file_name in unused_chunks:
            file = resolver.cache.joinpath(file_name)
            if file.is_file():
                file.unlink(missing_ok=True)

Ancestors

Methods

def assemble_fileset(self, fileset: str, path: Union[str, PathRange], destination: os.PathLike, *, cache_location: os.PathLike = PosixPath('~/.pachyderm/cdr_cache'), fetch_missing_chunks: bool = True, http_host_replacement: str = '') ‑> None

Assemble the data contained at the specified path within the fileset and write that data to the specified destination directory. This method uses a local cache and will automatically fetch missing data from PFS, unless disabled.

Parameters

fileset : str
The UUID of the fileset.
path : typing.Union[str, PathRange]
The path, regex, or PathRange within the fileset to assemble. A value of "/" assembles the entire fileset.
destination : os.PathLike
The local directory at which to assemble the fileset.
cache_location : os.PathLike (kwarg only)
The location of the chunk cache. This is also configurable through an environment variable.
fetch_missing_chunks : bool (kwarg only)
If true, fetch any missing chunks needed to assemble the fileset. If false, raise an error if any chunks are missing.
http_host_replacement : str (kwarg only)
The value of this parameter replaces the host (including port) within the presigned URLs when resolving CDRs. This configuration is useful if, for some reason, the URL that pachd uses to interact with object storage is different from the one that you use. For example, if your Pachyderm object storage is running within your kubernetes cluster and pachd is configured to use a URL which is only valid in cluster. This may fix any issues, depending on the object storage and how they generate presigned URLs. Example: localhost:9000

Raises

FileNotFoundError:
    if chunk is missing from cache and fetch_missing_chunks=False.
Expand source code
def assemble_fileset(
    self,
    fileset: str,
    path: Union[str, PathRange],
    destination: os.PathLike,
    *,
    cache_location: os.PathLike = CDR_CACHE_LOCATION,
    fetch_missing_chunks: bool = True,
    http_host_replacement: str = "",
) -> None:
    """Assemble the data contained at the specified path within the fileset
    and write that data to the specified destination directory. This method
    uses a local cache and will automatically fetch missing data from PFS,
    unless disabled.

    Parameters
    ----------
    fileset : str
        The UUID of the fileset.
    path : typing.Union[str, PathRange]
        The path, regex, or PathRange within the fileset to assemble.
        A value of "/" assembles the entire fileset.
    destination : os.PathLike
        The local directory at which to assemble the fileset.
    cache_location : os.PathLike (kwarg only)
        The location of the chunk cache. This is also configurable through an environment variable.
    fetch_missing_chunks : bool (kwarg only)
        If true, fetch any missing chunks needed to assemble the fileset.
        If false, raise an error if any chunks are missing.
    http_host_replacement : str (kwarg only)
        The value of this parameter replaces the host (including port) within
        the presigned URLs when resolving CDRs. This configuration is useful
        if, for some reason, the URL that pachd uses to interact with object
        storage is different from the one that you use. For example, if your
        Pachyderm object storage is running within your kubernetes cluster and
        pachd is configured to use a URL which is only valid in cluster.
        This may fix any issues, depending on the object storage and how they
        generate presigned URLs.
        Example: localhost:9000

    Raises
    ------
        FileNotFoundError:
            if chunk is missing from cache and fetch_missing_chunks=False.
    """
    destination = Path(os.path.expanduser(destination)).resolve()
    destination.mkdir(parents=True, exist_ok=True)
    resolver = CdrResolver(
        cache_location=cache_location,
        fetch_missing_chunks=fetch_missing_chunks,
        http_host_replacement=http_host_replacement,
    )

    for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
        content = resolver.resolve(msg.ref)

        # File paths returned from PFS are absolute.
        # By removing the leading "/", we convert the path from absolute
        #   to relative from the repo root.
        file_path = msg.path.removeprefix("/")
        destination_file = destination.joinpath(file_path)
        destination_file.parent.mkdir(parents=True, exist_ok=True)
        destination_file.write_bytes(content)
def fetch_chunks(self, fileset: str, path: Union[str, PathRange], *, cache_location: os.PathLike = PosixPath('~/.pachyderm/cdr_cache'), http_host_replacement: str = '', prune: bool = False) ‑> None

Fetch all of the PFS chunks required to assemble the data contained at the specified path within the fileset and write them to the cache location.

Parameters

fileset : str
The UUID of the fileset.
path : typing.Union[str, PathRange]
A path, regex, or PathRange within the fileset. A value of "/" fetches the entire fileset.
cache_location : os.PathLike (kwarg only)
The location of the chunk cache. This is also configurable through an environment variable.
http_host_replacement : str (kwarg only)
The value of this parameter replaces the host (including port) within the presigned URLs when resolving CDRs. This configuration is useful if, for some reason, the URL that pachd uses to interact with object storage is different from the one that you use. For example, if your Pachyderm object storage is running within your kubernetes cluster and pachd is configured to use a URL which is only valid in cluster. This may fix any issues, depending on the object storage and how they generate presigned URLs. Example: localhost:9000
prune : bool (kwarg only)
If true, prune (delete) any chunks that are not required to assemble the fileset.
Expand source code
def fetch_chunks(
    self,
    fileset: str,
    path: Union[str, PathRange],
    *,
    cache_location: os.PathLike = CDR_CACHE_LOCATION,
    http_host_replacement: str = "",
    prune: bool = False,
) -> None:
    """Fetch all of the PFS chunks required to assemble the data contained
    at the specified path within the fileset and write them to the cache
    location.

    Parameters
    ----------
    fileset : str
        The UUID of the fileset.
    path : typing.Union[str, PathRange]
        A path, regex, or PathRange within the fileset.
        A value of "/" fetches the entire fileset.
    cache_location : os.PathLike (kwarg only)
        The location of the chunk cache. This is also configurable through an environment variable.
    http_host_replacement : str (kwarg only)
        The value of this parameter replaces the host (including port) within
        the presigned URLs when resolving CDRs. This configuration is useful
        if, for some reason, the URL that pachd uses to interact with object
        storage is different from the one that you use. For example, if your
        Pachyderm object storage is running within your kubernetes cluster and
        pachd is configured to use a URL which is only valid in cluster.
        This may fix any issues, depending on the object storage and how they
        generate presigned URLs.
        Example: localhost:9000
    prune : bool (kwarg only)
        If true, prune (delete) any chunks that are not required to assemble the fileset.
    """
    resolver = CdrResolver(
        cache_location=cache_location, http_host_replacement=http_host_replacement
    )

    unused_chunks = set()
    if prune:
        unused_chunks = {file.name for file in resolver.cache.iterdir()}

    def cache_chunks(ref: Ref) -> None:
        """This is an optimization. We could simply call resolver.resolve() and
        discard the output, but that would perform additional computation for
        decompressing, decrypting, assembling, etc."""
        field, body = which_one_of(ref, "body")
        if isinstance(body, Http):
            raise ValueError(
                "malformed CDR: no ContentHash message contained within the CDR"
            )
        elif isinstance(body, ContentHash):
            unused_chunks.discard(resolver._chunk_name(body))
            resolver._dereference_content_hash(body)
            return
        elif isinstance(body, (Cipher, Compress, SizeLimits, Slice)):
            return cache_chunks(body.inner)
        elif isinstance(body, Concat):
            for inner_ref in body.refs:
                cache_chunks(inner_ref)
        else:
            raise ValueError(f"unsupported Ref variant: {body}")

    for msg in self.read_fileset_cdr(fileset_id=fileset, filters=[as_filter(path)]):
        cache_chunks(msg.ref)

    for file_name in unused_chunks:
        file = resolver.cache.joinpath(file_name)
        if file.is_file():
            file.unlink(missing_ok=True)