Splitting Data for Distributed Processing

Before you read this section, make sure that you understand the concepts described in Distributed Computing.

Pachyderm enables you to parallelize computations over data as long as that data can be split into multiple datums. However, in many cases you might have a dataset that you want or need to commit into Pachyderm as a single file rather than as many smaller files that map easily to datums, such as one file per record. For such cases, Pachyderm provides an easy way to prepare your dataset for subsequent distributed computing by splitting it when you upload it to a Pachyderm repository.

In this example, you have a dataset that consists of information about your users and a repository called users. The data is in CSV format in a single file called user_data.csv, with one record per line:

$ head user_data.csv
1,user1@example.com,
2,user2@example.com,
3,user3@example.com,
4,user4@example.com,
5,user5@example.com,
6,user6@example.com,
7,user7@example.com,
8,user8@example.com,
9,user9@example.com,
10,user10@example.com,

If you put this data into Pachyderm as a single file, Pachyderm processes it as a single datum and cannot process each of these user records in parallel as separate datums. You could separate these user records into standalone files yourself, either before you commit them into the users repository or through a pipeline stage dedicated to this splitting task. However, Pachyderm provides an optimized way of completing this task.

The put file API includes an option for splitting the file into separate datums automatically. You can use the --split flag with the put file command.

To complete this example, follow the steps below:

  1. Create a users repository by running:

    pachctl create repo users
  2. Create a file called user_data.csv with the contents listed above.

  3. Put your user_data.csv file into Pachyderm and automatically split it into separate datums for each line:

    $ pachctl put file users@master -f user_data.csv --split line --target-file-datums 1

    The --split line argument specifies that Pachyderm splits this file into lines, and the --target-file-datums 1 argument specifies that each resulting file must include at most one datum, that is, one line.

  4. View the list of files in the master branch of the users repository:

    $ pachctl list file users@master
    NAME                 TYPE                SIZE
    user_data.csv        dir                 5.346 KiB

    If you run the pachctl list file command for the master branch in the users repository, Pachyderm still shows user_data.csv as a single entity in the repo. However, this entity is now a directory that contains all of the split records.

  5. To view detailed information about the user_data.csv file, run the command with the file name specified after a colon:

    $ pachctl list file users@master:user_data.csv
    NAME                             TYPE                SIZE
    user_data.csv/0000000000000000   file                43 B
    user_data.csv/0000000000000001   file                39 B
    user_data.csv/0000000000000002   file                37 B
    user_data.csv/0000000000000003   file                34 B
    user_data.csv/0000000000000004   file                35 B
    user_data.csv/0000000000000005   file                41 B
    user_data.csv/0000000000000006   file                32 B

    Then, a pipeline that takes the users repo as input with a glob pattern of /user_data.csv/* processes each user record, that is, each line in the CSV file, in parallel. A sketch of such a pipeline specification follows.
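
    For illustration, a minimal pipeline specification that processes each split record in parallel might look like the following sketch. The pipeline name, image, and transform command are hypothetical placeholders; only the users repository and the /user_data.csv/* glob pattern come from this example:

    {
      "pipeline": { "name": "process-users" },
      "input": {
        "pfs": {
          "repo": "users",
          "glob": "/user_data.csv/*"
        }
      },
      "transform": {
        "image": "alpine:3.9",
        "cmd": ["sh", "-c", "cp /pfs/users/user_data.csv/* /pfs/out/"]
      }
    }

    If you save this as process_users.json, running pachctl create pipeline -f process_users.json creates the pipeline; each split file then becomes its own datum and is processed independently.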

JSON and Text File Splitting Examples

Pachyderm supports this type of splitting for JSON blobs as well as for lines. See the examples below; a sketch of a sample JSON file follows the list.

  • Split a JSON file on JSON blobs by putting each JSON blob into a separate file.

    $ pachctl put file users@master -f user_data.json --split json --target-file-datums 1
  • Split a JSON file on JSON blobs by putting three JSON blobs into each split file.

    $ pachctl put file users@master -f user_data.json --split json --target-file-datums 3
  • Split a file on lines by putting as many complete lines as fit within 100 bytes into each split file.

    $ pachctl put file users@master -f user_data.txt --split line --target-file-bytes 100
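
For reference, --split json treats the file as a sequence of consecutive JSON blobs. A hypothetical user_data.json might look like the following sketch (the field names and values are placeholders):

    $ cat user_data.json
    {"id": 1, "email": "user1@example.com"}
    {"id": 2, "email": "user2@example.com"}
    {"id": 3, "email": "user3@example.com"}

With --target-file-datums 1, each of these blobs lands in its own split file; with --target-file-datums 3, all three land in one split file.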

Ingesting PostgreSQL data

Pachyderm supports direct data ingestion from PostgreSQL. You first need to extract your database into a script file by using pg_dump and then add the data from that file into Pachyderm by running pachctl put file with the --split flag.

When you use pachctl put file --split sql ..., Pachyderm splits your pgdump file into three parts: the header, the rows, and the footer. The header contains all the SQL statements in the pgdump file that set up the schema and tables. The rows are split into individual files, or, if you specify --target-file-datums or --target-file-bytes, into files with multiple rows each. The footer contains the remaining SQL statements for setting up the tables.

The header and footer are stored in the directory that contains the rows. If you request a get file on that directory, you get just the header and footer. If you request an individual file, you see the header, the row or rows, and the footer. If you request all the files with a glob pattern, for example, /directoryname/*, you receive the header, all the rows, and the footer recreating the full pgdump. Therefore, you can construct full or partial pgdump files so that you can load full or partial datasets.
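
For example, the following commands (a minimal sketch that uses the repository and paths from the steps below; output omitted) illustrate this behavior:

    $ pachctl get file data@master:users                    # header and footer only
    $ pachctl get file data@master:users/0000000000000000   # header, one row, and the footer
    $ pachctl get file "data@master:users/*"                # header, all rows, and the footer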

To put your PostgreSQL data into Pachyderm, complete the following steps:

  1. Generate a pgdump file:


    $ pg_dump -t users -f users.pgdump
  2. View the pgdump file:


    $ cat users.pgdump 
    -- PostgreSQL database dump
    -- Dumped from database version 9.5.12
    -- Dumped by pg_dump version 9.5.12
    SET statement_timeout = 0;
    SET lock_timeout = 0;
    SET client_encoding = 'UTF8';
    SET standard_conforming_strings = on;
    SELECT pg_catalog.set_config('search_path', '', false);
    SET check_function_bodies = false;
    SET client_min_messages = warning;
    SET row_security = off;
    SET default_tablespace = '';
    SET default_with_oids = false;
    -- Name: users; Type: TABLE; Schema: public; Owner: postgres
    CREATE TABLE public.users (
        id integer NOT NULL,
        name text NOT NULL,
        saying text NOT NULL
    );
    ALTER TABLE public.users OWNER TO postgres;
    -- Data for Name: users; Type: TABLE DATA; Schema: public; Owner: postgres
    COPY public.users (id, name, saying) FROM stdin;
    0	wile E Coyote	...
    1	road runner	...
    \.
    -- PostgreSQL database dump complete
  3. Ingest the SQL data by using the pachctl put file command with the --split sql flag:

    $ pachctl put file data@master -f users.pgdump --split sql
    $ pachctl put file data@master:users --split sql -f users.pgdump

    The second command writes the split files under the path users instead of users.pgdump. The listings below correspond to this form.
  4. View the information about your repository:

    $ pachctl list file data@master
    NAME         TYPE SIZE
    users        dir  914B

    The split users.pgdump data is added as the users directory on the master branch of the data repository.

  5. View the information about the users.pgdump file:

    $ pachctl list file data@master:users
    NAME                           TYPE SIZE
    /users/0000000000000000        file 20B
    /users/0000000000000001        file 18B
  6. In your pipeline, where you have started and forked PostgreSQL, you can load the data by running the following or a similar script:

    $ cat /pfs/data/users/* | sudo -u postgres psql

    By using the glob pattern /*, this script loads each raw PostgreSQL chunk into your PostgreSQL instance for processing by your pipeline, as sketched below.
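
    A matching pipeline specification might look like the following sketch. The pipeline name and image are hypothetical placeholders, and the image is assumed to run the forked PostgreSQL instance described above; the data repository, the /users/* glob, and the load script come from this example:

    {
      "pipeline": { "name": "load-users" },
      "input": {
        "pfs": {
          "repo": "data",
          "glob": "/users/*"
        }
      },
      "transform": {
        "image": "my-postgres-image",
        "cmd": ["sh", "-c", "cat /pfs/data/users/* | sudo -u postgres psql"]
      }
    }

    With the /users/* glob, each chunk becomes a separate datum, so each worker loads one self-contained piece of the dump (header, rows, and footer).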

Tip: For this use case, you might want to use --target-file-datums or --target-file-bytes because these flags enable your queries to run against many rows at a time.
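
For example, the following variant of the ingest command (same repository and file as above) groups up to 100 rows into each split file:

    $ pachctl put file data@master:users --split sql --target-file-datums 100 -f users.pgdump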