SQL Ingest

Learn how to set up the SQL Ingest Tool to import data.

⚠ī¸

SQL Ingest is an experimental feature.

You can ingest database content, collected by your data warehouse, by pulling the result of a given query into Pachyderm and saving it as a CSV or JSON file.

Before You Start #

  • You must have a running Pachyderm cluster with pachctl installed and connected to it.
  • You must have kubectl access to the cluster (used in step 1 to generate the secret).
  • You must have connection details and credentials for the database or data warehouse you want to query.

How to Set Up SQL Ingest #

1. Create & Upload a Secret #

You must generate a secret that contains the password granting user access to the database; you will pass the username details through the database connection string in step 2.

  1. Copy the following:
    kubectl create secret generic yourSecretName --from-literal=PACHYDERM_SQL_PASSWORD=yourDatabaseUserPassword --dry-run=client --output=json > yourSecretFile.json
  2. Swap out yourSecretName, yourDatabaseUserPassword, and yourSecretFile with relevant inputs.
  3. Open a terminal and run the command.
  4. Copy the following:
    pachctl create secret -f yourSecretFile.json
  5. Swap out yourSecretFile.json with the relevant filename.
  6. Run the command.
  7. Confirm the secret was created by running pachctl list secret.
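
For example, with hypothetical placeholder values filled in (the secret name, password, and filename below are illustrative only):

kubectl create secret generic mydbsecret --from-literal=PACHYDERM_SQL_PASSWORD=myDatabasePassword --dry-run=client --output=json > mydbsecret.json
pachctl create secret -f mydbsecret.json
pachctl list secret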
ℹī¸

Not all secret formats are the same. For a full walkthrough on how to create, edit, and view different types of secrets, see Create and Manage Secrets in Pachyderm.

2. Create a Database Connection String #

Pachyderm’s SQL Ingest requires a connection string, passed as the url parameter of the Jsonnet template, to connect to your database; the URL is structured as follows:

<protocol>://<username>@<host>:<port>/<database>?<param1>=<value1>&<param2>=<value2>
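
For example, a hypothetical PostgreSQL connection string (every value below is a placeholder; the parameters after ? depend on your database):

postgres://myuser@postgres.example.com:5432/mydatabase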

3. Create a Pipeline Spec #

Pachyderm provides a default Jsonnet template that has key parameters built in. To use it, you must pass an argument for each parameter.

  1. Copy the following:
    pachctl update pipeline --jsonnet https://raw.githubusercontent.com/pachyderm/pachyderm/2.7.x/src/templates/sql_ingest_cron.jsonnet \
      --arg name=<pipelineName> \
      --arg url="<connectionStringToDatabase>" \
      --arg query="<query>" \
      --arg hasHeader=<boolean> \
      --arg cronSpec="<pullInterval>" \
      --arg secretName="<yourSecretName>" \
      --arg format=<CsvOrJson> \
      --arg outputFile='<fileName>'
  2. Swap out all of the parameter values with relevant inputs.
  3. Open a terminal.
  4. Run the command.

4. View Query & Results #

  • To view the query string: pachctl inspect pipeline <pipelineName>
  • To view the output file name: pachctl list file <pipelineName>@master
  • To view the output file contents: pachctl get file <pipelineName>@master:/0000

Example: Snowflake #

In this example, we are leveraging Snowflake’s support for queries traversing semi-structured data (here, JSON).

  1. Create a secret with your password named snowflakeSecret.
  2. Create a Snowflake-specific database connection URL using the following details:
    • Protocol: snowflake
    • Username: username
    • Host: VCNYTW-MH64356 (account name or locator)
    • Database: SNOWFLAKE_SAMPLE_DATA
    • Schema: WEATHER
    • Warehouse: COMPUTE_WH
    snowflake://username@VCNYTW-MH64356/SNOWFLAKE_SAMPLE_DATA/WEATHER?warehouse=COMPUTE_WH
  3. Build query for the table DAILY_14_TOTAL using information from column V.
    select T, V:city.name, V:data[0].weather[0].description as morning, V:data[12].weather[0].description as pm FROM DAILY_14_TOTAL LIMIT 1
  4. Define the pipeline spec by populating all of the parameter values:
   pachctl update pipeline --jsonnet https://raw.githubusercontent.com/pachyderm/pachyderm/2.7.x/src/templates/sql_ingest_cron.jsonnet \
   --arg name=mysnowflakeingest \
   --arg url="snowflake://username@VCNYTW-MH64356/SNOWFLAKE_SAMPLE_DATA/WEATHER?warehouse=COMPUTE_WH" \
   --arg query="select T, V:city.name, V:data[0].weather[0].description as morning, V:data[12].weather[0].description as pm FROM DAILY_14_TOTAL LIMIT 1" \
   --arg hasHeader=true \
   --arg cronSpec="@every 30s" \
   --arg secretName="snowflakeSecret" \
   --arg format=json
  5. Run the command.
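
Once the pipeline runs, you can verify the output using the commands from the previous section, for example:

pachctl list file mysnowflakeingest@master
pachctl get file mysnowflakeingest@master:/0000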

How Does This Work? #

SQL Ingest’s Jsonnet pipeline spec, sql_ingest_cron.jsonnet, creates all of the following:

  • 1 Input Data Repo: Stores timestamp files committed at the interval set by cronSpec (--arg cronSpec="<pullInterval>") to trigger the pipeline.
  • 1 Cron Pipeline: Houses the spec details that define the input type, settings, and data transformation.
  • 1 Output Repo: Stores the data transformed by the cron pipeline; named by the pipeline spec’s pipeline.name attribute, which you define through the Jsonnet parameter --arg name=<outputRepoName>.
  • 1 Output File: Saves the query results (JSON or CSV) and can be used as input to a following pipeline.

[Diagram: SQL Ingest flow — input data repo → cron pipeline → output repo and file]

In the default Jsonnet template, the generated file is available in the output repo at <outputRepoName>@master:/0000. The filename is hardcoded; however, you could parameterize it as well by using a custom Jsonnet pipeline spec and passing --arg outputFile='<fileName>'. The file’s contents are the result of the query (--arg query="<query>") being run against the database (--arg url="<connectionStringToDatabase>"); both are defined in the transform.cmd attribute.
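
To make the moving parts concrete, here is a rough sketch of the kind of pipeline spec the template generates (all field values are illustrative placeholders; the exact image, command, and fields are produced by sql_ingest_cron.jsonnet and may differ):

{
  "pipeline": { "name": "<pipelineName>" },
  "input": {
    "cron": { "name": "in", "spec": "<pullInterval>", "overwrite": true }
  },
  "transform": {
    "cmd": ["<command that runs <query> against <url> and writes <outputFile>>"],
    "secrets": [
      { "name": "<secretName>", "env_var": "PACHYDERM_SQL_PASSWORD", "key": "PACHYDERM_SQL_PASSWORD" }
    ]
  }
}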

About SQL Ingest Pipeline Specs #

To create a SQL Ingest pipeline from the Jsonnet pipeline spec template, you must pass the .jsonnet file and several parameters:

pachctl update pipeline --jsonnet https://raw.githubusercontent.com/pachyderm/pachyderm/2.7.x/src/templates/sql_ingest_cron.jsonnet \
  --arg name=<pipelineName> \
  --arg url="<connectionStringToDatabase>" \
  --arg query="<query>" \
  --arg hasHeader=<boolean> \
  --arg cronSpec="<pullInterval>" \
  --arg secretName="<secretName>" \
  --arg format=<CsvOrJson> 
  • The name of each pipeline (and its related input/output repos) is derived from the name parameter (--arg name=<pipelineName>).

Parameters #

| Parameter | Description |
|---|---|
| name | The name of the output repo where query results will materialize. |
| url | The connection string to the database. |
| query | The SQL query to be run against the connected database. |
| hasHeader | Adds a header to your CSV file if set to true. Ignored if format="json" (JSON files always display (header, value) pairs for each returned row). Defaults to false. Pachyderm creates the header after each element of the comma-separated list of your SELECT clause. For example, country.country_name_eng will have country.country_name_eng as its header, while country.country_name_eng as country_name will have country_name. |
| cronSpec | How often to run the query. For example, "@every 60s". |
| format | The type of your output file containing the results of your query (either json or csv). |
| secretName | The Kubernetes secret name that contains the password to the database. |
| outputFile | The name of the file created by your pipeline and stored in your output repo; defaults to 0000. |

URL Parameter Details #

<protocol>://<username>@<host>:<port>/<database>?<param1>=<value1>&<param2>=<value2>
  • Passwords are not included in the URL; they are retrieved from a secret.
  • The additional parameters after ? are optional and needed on a case-by-case basis (for example, Snowflake).
| Parameter | Description |
|---|---|
| protocol | The name of the database protocol. As of today, we support: postgres and postgresql (connect to PostgreSQL or compatible, for example Redshift); mysql (connect to MySQL or compatible, for example MariaDB); snowflake (connect to Snowflake). |
| username | The user used to access the database. |
| host | The hostname of your database instance. |
| port | The port number your instance is listening on. |
| database | The name of the database to connect to. |
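
For example, hypothetical connection strings for each supported protocol (all hosts, ports, accounts, and database names below are placeholders):

postgres://myuser@postgres.example.com:5432/mydatabase
mysql://myuser@mysql.example.com:3306/mydatabase
snowflake://myuser@MYORG-MYACCOUNT/MYDATABASE/MYSCHEMA?warehouse=MY_WH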
Snowflake #

Pachyderm supports two connection URL patterns to query Snowflake:

snowflake://username@<account_identifier>/<db_name>/<schema_name>?warehouse=<warehouse_name>
snowflake://username@hostname:port/<db_name>/<schema_name>?account=<account_identifier>&warehouse=<warehouse_name>

The account_identifier takes one of the following forms for most URLs:

  • <orgname>-<account_name> (preferred: your organization name and account name)
  • <account_locator>.<region>.<cloud> (legacy: an account locator in a region)

Formats & SQL Data Types #

The following comments on formatting reflect the state of this release and are subject to change.

Formats #

Numeric #

All numeric values are converted into strings in your CSV and JSON.

| Database | CSV | JSON |
|---|---|---|
| 12345 | 12345 | "12345" |
| 123.45 | 123.45 | "123.45" |
⚠ī¸
  • Infinite (Inf) and not-a-number (NaN) values will also be stored as strings in JSON files.
  • Use the format #.# for all decimals that you plan to egress back to a database.
Date/Timestamps #
| Type | Database | CSV | JSON |
|---|---|---|---|
| Date | 2022-05-09 | 2022-05-09T00:00:00 | "2022-05-09T00:00:00" |
| Timestamp ntz | 2022-05-09 16:43:00 | 2022-05-09T16:43:00 | "2022-05-09T16:43:00" |
| Timestamp tz | 2022-05-09 16:43:00-05:00 | 2022-05-09T16:43:00-05:00 | "2022-05-09T16:43:00-05:00" |
Strings #
| Database | CSV |
|---|---|
| null | null |
| "" (a string of two quote characters) | """""" |
| (empty string) | "" |
| nil | (empty) |
| "my string" | """my string""" |
| this will be enclosed in quotes because it has a , | "this will be enclosed in quotes because it has a ," |
💡

When parsing your CSVs in your user code, remember to escape " with "".
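
For instance, a minimal Python sketch (assuming your user code is Python; the standard csv module unescapes doubled quotes automatically):

import csv
import io

# A CSV line produced by SQL Ingest, where the database value contained quotes.
line = '"""my string""",123.45'
row = next(csv.reader(io.StringIO(line)))
print(row)  # ['"my string"', '123.45'] -- doubled quotes become single quotes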

Supported Data Types #

Some of the Data Types listed in this section are specific to a particular database.

| Dates/Timestamps | Varchars | Numerics | Booleans |
|---|---|---|---|
| DATE | VARCHAR | SMALLINT | BOOL |
| TIME | TEXT | INT2 | BOOLEAN |
| TIMESTAMP | CHARACTER VARYING | INTEGER | |
| TIMESTAMP_LTZ | | INT | |
| TIMESTAMP_NTZ | | INT4 | |
| TIMESTAMP_TZ | | BIGINT | |
| TIMESTAMPTZ | | INT8 | |
| TIMESTAMP WITH TIME ZONE | | FLOAT | |
| TIMESTAMP WITHOUT TIME ZONE | | FLOAT4 | |
| | | FLOAT8 | |
| | | REAL | |
| | | DOUBLE PRECISION | |
| | | NUMERIC | |
| | | DECIMAL | |
| | | NUMBER | |