Run Commands

Distributed Image Processing

Learn how to use Pachyderm and the SDK to process images in parallel.

💡

You can clone the pachyderm/docs-content repo and navigate to `latest/sdk/examples/opencv` to run the following steps.

This is a reproduction of Pachyderm’s OpenCV example in Python. This example showcases the pachyderm-sdk analogs of common pachctl commands, such as creating repos and pipelines or getting a file.

The image being used to run the pipeline code is built from the Dockerfile located at pachyderm/examples/opencv. You will also find the edges.py script there.

Before You Start

How to Create a Distributed Image Processing Pipeline

  1. Create a file named opencv.py with the following code:

    
    import shutil
    import sys
    import tempfile

    from pachyderm_sdk import Client
    from pachyderm_sdk.api import pfs, pps
    
    
    def main(client: Client):
        """Build the images -> edges -> montage pipelines and fetch the montage.

        Creates the ``images`` repo and two pipelines:

        * ``edges`` runs ``python3 /edges.py`` in the ``pachyderm/opencv``
          image with ``images`` as its input.
        * ``montage`` runs ImageMagick's ``montage`` over every file found
          under ``/pfs`` (the cross of ``images`` and ``edges``) and writes
          ``/pfs/out/montage.png``.

        It then commits three sample images to ``images@master``, waits for
        the downstream jobs, and copies the resulting montage into a local
        temporary file whose path is printed.

        Args:
            client: A connected pachyderm_sdk ``Client``.

        Exits the process with status 1 if the montage job does not end in
        the ``JOB_SUCCESS`` state.
        """
        # Create a repo called images
        images = pfs.Repo.from_uri("images")
        client.pfs.create_repo(repo=images)

        # Create the edges pipeline (and the edges repo automatically). This
        # pipeline runs when data is committed to the images repo, as indicated
        # by the input field.
        edges = pps.Pipeline(name="edges")
        client.pps.create_pipeline(
            pipeline=edges,
            transform=pps.Transform(
                cmd=["python3", "/edges.py"],
                image="pachyderm/opencv",
            ),
            input=pps.Input(pfs=pps.PfsInput(repo=images.name, glob="/*")),
        )

        # Create the montage pipeline (and the montage repo automatically). This
        # pipeline runs when data is committed to either the images repo or edges
        # repo, as indicated by the input field.
        montage = pps.Pipeline(name="montage")
        client.pps.create_pipeline(
            pipeline=montage,
            transform=pps.Transform(
                cmd=["sh"],
                image="v4tech/imagemagick",
                stdin=[
                    "montage -shadow -background SkyBlue -geometry 300x300+2+2 $(find /pfs ! -name .env -type f | sort) /pfs/out/montage.png"
                ],
            ),
            input=pps.Input(
                cross=[
                    pps.Input(pfs=pps.PfsInput(glob="/", repo=images.name)),
                    pps.Input(pfs=pps.PfsInput(glob="/", repo=edges.name)),
                ]
            ),
        )

        with client.pfs.commit(branch=pfs.Branch.from_uri("images@master")) as commit:
            # Add some images from URLs. Alternatively, you could use
            # `client.pfs.put_file_from_file` or `client.pfs.put_file_from_bytes`.
            for image_name in ("liberty.jpg", "kitten.jpg", "robot.jpg"):
                client.pfs.put_file_from_url(
                    commit=commit,
                    path=f"/{image_name}",
                    url=f"https://docs.pachyderm.com/images/opencv/{image_name}",
                )

        # Wait for the commit (and its downstream commits) to finish
        commit.wait_all()

        # The montage job is looked up by the ID of the input commit above.
        job = pps.Job(pipeline=montage, id=commit.id)
        if client.pps.inspect_job(job=job).state != pps.JobState.JOB_SUCCESS:
            print(
                "Montage job failed, aborting. Check the pipeline logs for more details:",
                file=sys.stderr,
            )
            print("pachctl logs --pipeline=montage", file=sys.stderr)
            # sys.exit rather than the `exit` builtin, which only exists when
            # the `site` module is loaded.
            sys.exit(1)

        # Get the montage
        source_file = client.pfs.pfs_file(file=pfs.File.from_uri("montage@master:/montage.png"))
        # delete=False keeps the file on disk after the `with` block so the
        # user can open it at the printed path.
        with tempfile.NamedTemporaryFile(suffix="montage.png", delete=False) as dest_file:
            shutil.copyfileobj(source_file, dest_file)
            print(f"montage written to {dest_file.name}")
    
    
    def clean(client: Client):
        """Delete the pipelines and repo that ``main`` creates.

        The ``montage`` and ``edges`` pipelines are removed first, in that
        order. Then the ``images`` repo is force-deleted.

        Args:
            client: A connected pachyderm_sdk ``Client``.
        """
        for pipeline_name in ("montage", "edges"):
            client.pps.delete_pipeline(pipeline=pps.Pipeline(name=pipeline_name))

        images_repo = pfs.Repo.from_uri("images")
        client.pfs.delete_repo(repo=images_repo, force=True)
    
    
    if __name__ == "__main__":
        # Connects to a pachyderm cluster using the pachctl config file located
        # at ~/.pachyderm/config.json. For other setups, you'll want one of the
        # alternatives:
        # 1) To connect to pachyderm when this script is running inside the
        #    cluster, use `Client.new_in_cluster()`.
        # 2) To connect to pachyderm via a pachd address, use
        #    `Client.from_pachd_address()`.
        # 3) To explicitly set the host and port, pass parameters into
        #    `Client()`.
        # 4) To use a config file located elsewhere, pass the path to that
        #    config file to `Client.from_config()`.
        client = Client.from_config()

        # Delete the pipelines and repo this example creates, then rebuild them.
        # NOTE(review): on a fresh cluster these resources do not exist yet;
        # confirm the delete_* calls in clean() tolerate missing resources.
        clean(client)
        main(client)
  2. Run the script:

    $ python opencv.py