BigQuery Loader
Under the umbrella of Snowplow BigQuery Loader, we have a family of applications that can be used to load enriched Snowplow data into BigQuery.
At a high level, BigQuery Loader reads enriched Snowplow events in real time and loads them into BigQuery using the Storage Write API.
BigQuery Loader consists of three applications: Loader, Mutator and Repeater. The following diagram illustrates how they interact with each other and with BigQuery:
For more information on how events are stored in BigQuery, check the mapping between Snowplow schemas and the corresponding BigQuery column types.
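For a rough illustration, each self-describing event or entity ends up in its own column. The query below is only a sketch: the project, dataset and table names match the configuration example later on this page, and the column name (derived from a hypothetical `iglu:com.acme/app_context/jsonschema/1-0-0` schema) is an assumption you should replace with one of your own columns.
# Hypothetical example: adjust the table and column names to your setup.
bq query --use_legacy_sql=false \
'SELECT event_id, contexts_com_acme_app_context_1_0_0
FROM `com-acme.snowplow.events`
LIMIT 10'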
Technical Architecture
The available tools are:
- Snowplow BigQuery StreamLoader, a standalone Scala app that can be deployed on Google Kubernetes Engine.
- Snowplow BigQuery Loader, an alternative to StreamLoader, in the form of a Google Cloud Dataflow job.
- Snowplow BigQuery Mutator, a Scala app that performs table updates to add new columns as required.
- Snowplow BigQuery Repeater, a Scala app that reads failed inserts (caused by table update lag) and re-tries inserting them into BigQuery after some delay, sinking failures into a dead-letter bucket.
Snowplow BigQuery StreamLoader
- Reads Snowplow enriched events from a dedicated Pub/Sub subscription.
- Uses the JSON transformer from the Snowplow Scala Analytics SDK to convert those enriched events into JSON.
- Uses Iglu Client to fetch JSON schemas for self-describing events and entities.
- Uses Iglu Schema DDL to transform self-describing events and entities into BigQuery format.
- Writes transformed data into BigQuery.
- Writes all encountered Iglu types into a dedicated Pub/Sub topic (the `types` topic).
- Writes all data that failed to be validated against its schema into a dedicated `badRows` Pub/Sub topic.
- Writes all data that was successfully transformed, but could not be loaded, into a dedicated `failedInserts` topic (see below for a way to inspect these topics).
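During development it can be handy to peek at what StreamLoader publishes. One option, sketched below, is to attach a temporary subscription and pull a few messages with `gcloud`; the topic name is taken from the configuration example further down, and the debug subscription is purely illustrative.
# Create a throwaway subscription on the types topic and pull a few messages.
gcloud pubsub subscriptions create types-debug-sub --topic=types-topic
gcloud pubsub subscriptions pull types-debug-sub --limit=5 --auto-ack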
Snowplow BigQuery Loader
An Apache Beam job intended to run on Google Cloud Dataflow. It is an alternative to the StreamLoader application and implements the same algorithm.
Snowplow BigQuery Mutator
The Mutator app is in charge of performing automatic table updates, which means you do not have to pause loading and manually update the table every time you're adding a new custom self-describing event or entity.
- Reads messages from a dedicated subscription to the `types` topic.
- Finds out if a message contains a type that has not been encountered yet (by checking an internal cache).
- If a message contains a new type, double-checks it against the connected BigQuery table.
- If the type is not in the table, fetches its JSON schema from an Iglu registry.
- Transforms the JSON schema into a BigQuery column definition.
- Adds the column to the connected BigQuery table (see below for a way to verify the result).
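To confirm that Mutator has added a column for a newly observed type, you can inspect the table schema with the `bq` CLI. This is only a sketch; the project, dataset and table names come from the configuration example further down.
# List the column names of the events table and look for the new column.
bq show --schema --format=prettyjson com-acme:snowplow.events | grep '"name"'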
Snowplow BigQuery Repeater
The Repeater app is in charge of handling failed inserts. It reads ready-to-load events from a dedicated subscription on the `failedInserts` topic and retries inserting them into BigQuery, to overcome 'table update lag'.
Table update lag
The loader app inserts data into BigQuery in near real time. At the same time, it sinks messages containing information about the fields of an event into the `types` topic. It can take up to 10-15 seconds for Mutator to fetch and parse the message and execute an `ALTER TABLE` statement against the table. Additionally, the new column takes some time to propagate and become visible to all workers trying to write to it.
If a new type arrives from the input subscription during this period, BigQuery might reject the row containing it and it will be sent to the `failedInserts` topic. This topic contains JSON objects ready to be loaded into BigQuery (i.e. not in the canonical Snowplow enriched event format).
To load this data from `failedInserts` into BigQuery you can use Repeater, which reads a subscription on `failedInserts` and performs `INSERT` statements.
Repeater has several important behavioral aspects:
- If a pulled record is not a valid Snowplow event, it will result in a `loader_recovery_error` bad row.
- If a pulled record is a valid event, Repeater will wait some time (15 minutes by default) after its `etl_tstamp` before attempting to re-insert it, in order to let Mutator do its job.
- If the database responds with an error, the row will be transformed into a `loader_recovery_error` bad row.
- All entities in the dead-letter bucket are valid Snowplow bad rows (see below for a way to inspect the bucket).
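If you want to examine what Repeater has written to the dead-letter bucket, `gsutil` can list and print the stored bad rows. This is a sketch: the bucket name comes from the configuration example further down and the object path is a placeholder.
# List the dead-letter objects and print one of them.
gsutil ls gs://dead-letter-bucket/
gsutil cat gs://dead-letter-bucket/<path-to-object>   # replace with an object from the listing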
Topics, subscriptions and message formats
The Snowplow BigQuery Loader apps use Pub/Sub topics and subscriptions to store intermediate data and communicate with each other.
| Kind | Populated by | Consumed by | Data format |
|------|--------------|-------------|-------------|
| Input subscription | Enriched events topic | Loader / StreamLoader | canonical TSV + JSON enriched format |
| Types topic | Loader / StreamLoader | Types subscription | `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` |
| Types subscription | Types topic | Mutator | `iglu:com.snowplowanalytics.snowplow/shredded_type/jsonschema/1-0-0` |
| Bad row topic | Loader / StreamLoader | GCS Loader | `iglu:com.snowplowanalytics.snowplow.badrows/loader_iglu_error/jsonschema/2-0-0`, `iglu:com.snowplowanalytics.snowplow.badrows/loader_parsing_error/jsonschema/2-0-0`, `iglu:com.snowplowanalytics.snowplow.badrows/loader_runtime_error/jsonschema/1-0-1` |
| Failed insert topic | Loader / StreamLoader | Failed insert subscription | BigQuery JSON |
| Failed insert subscription | Failed insert topic | Repeater | BigQuery JSON |
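The loader apps expect these topics and subscriptions to already exist. A minimal sketch of creating them with `gcloud`, using the names from the configuration example below (the enriched events topic name is an assumption and should match the output topic of your enrichment pipeline):
# Topics written to by Loader / StreamLoader
gcloud pubsub topics create types-topic bad-topic failed-inserts-topic
# Subscriptions consumed by the apps
gcloud pubsub subscriptions create enriched-sub --topic=enriched-topic              # hypothetical enriched events topic
gcloud pubsub subscriptions create types-sub --topic=types-topic                    # consumed by Mutator
gcloud pubsub subscriptions create failed-inserts-sub --topic=failed-inserts-topic  # consumed by Repeater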
Setup guide
Configuration file
Loader / StreamLoader, Mutator and Repeater accept the same configuration file in HOCON format. An example of a minimal configuration file can look like this:
{
  "projectId": "com-acme"

  "loader": {
    "input": {
      "subscription": "enriched-sub"
    }

    "output": {
      "good": {
        "datasetId": "snowplow"
        "tableId": "events"
      }

      "bad": {
        "topic": "bad-topic"
      }

      "types": {
        "topic": "types-topic"
      }

      "failedInserts": {
        "topic": "failed-inserts-topic"
      }
    }
  }

  "mutator": {
    "input": {
      "subscription": "types-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred
    }
  }

  "repeater": {
    "input": {
      "subscription": "failed-inserts-sub"
    }

    "output": {
      "good": ${loader.output.good} # will be automatically inferred

      "deadLetters": {
        "bucket": "gs://dead-letter-bucket"
      }
    }
  }

  "monitoring": {} # disabled
}
The loader takes the command line arguments `--config`, with a path to the configuration HOCON file, and `--resolver`, with a path to the Iglu resolver file. If you are running the Docker image, you should mount the configuration files into the container:
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json
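The apps authenticate to Pub/Sub and BigQuery using Google's application default credentials. If the host does not already provide these (for example, when not relying on workload identity), one option is to mount a service account key and point `GOOGLE_APPLICATION_CREDENTIALS` at it. The key path below is hypothetical:
docker run \
-v /path/to/configs:/configs \
-v /path/to/sa-key.json:/sa-key.json \
-e GOOGLE_APPLICATION_CREDENTIALS=/sa-key.json \
snowplow/snowplow-bigquery-streamloader:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json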
Or you can pass the whole config as a base64-encoded string using the `--config` option, like so:
docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-streamloader:1.6.4 \
--config=ewogICJwcm9qZWN0SWQiOiAiY29tLWFjbWUiCgogICJsb2FkZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZW5yaWNoZWQtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogewogICAgICAgICJkYXRhc2V0SWQiOiAic25vd3Bsb3ciCiAgICAgICAgInRhYmxlSWQiOiAiZXZlbnRzIgogICAgICB9CgogICAgICAiYmFkIjogewogICAgICAgICJ0b3BpYyI6ICJiYWQtdG9waWMiCiAgICAgIH0KCiAgICAgICJ0eXBlcyI6IHsKICAgICAgICAidG9waWMiOiAidHlwZXMtdG9waWMiCiAgICAgIH0KCiAgICAgICJmYWlsZWRJbnNlcnRzIjogewogICAgICAgICJ0b3BpYyI6ICJmYWlsZWQtaW5zZXJ0cy10b3BpYyIKICAgICAgfQogICAgfQogIH0KCiAgIm11dGF0b3IiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAidHlwZXMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCiAgICB9CiAgfQoKICAicmVwZWF0ZXIiOiB7CiAgICAiaW5wdXQiOiB7CiAgICAgICJzdWJzY3JpcHRpb24iOiAiZmFpbGVkLWluc2VydHMtc3ViIgogICAgfQoKICAgICJvdXRwdXQiOiB7CiAgICAgICJnb29kIjogJHtsb2FkZXIub3V0cHV0Lmdvb2R9ICMgd2lsbCBiZSBhdXRvbWF0aWNhbGx5IGluZmVycmVkCgogICAgICAiZGVhZExldHRlcnMiOiB7CiAgICAgICAgImJ1Y2tldCI6ICJnczovL2RlYWQtbGV0dGVyLWJ1Y2tldCIKICAgICAgfQogICAgfQogIH0KCiAgIm1vbml0b3JpbmciOiB7fSAjIGRpc2FibGVkCn0= \
--resolver=/resolver.json
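To produce the base64-encoded string from an existing configuration file, you can use the `base64` utility. The `-w0` flag, which disables line wrapping, is specific to GNU coreutils; on macOS use `base64 -i` instead.
# Encode the HOCON file as a single base64 line (GNU coreutils).
base64 -w0 /path/to/configs/bigquery.hocon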
The `--config` command option is actually optional. For some setups it is more convenient to provide configuration parameters using JVM system properties or environment variables, as documented in the Lightbend config readme.
For example, to override the `repeater.input.subscription` setting using system properties:
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Drepeater.input.subscription="failed-inserts-sub"
Or to use environment variables for every setting:
docker run \
-v /path/to/resolver.json:/resolver.json \
snowplow/snowplow-bigquery-repeater:1.6.4 \
--resolver=/resolver.json \
-Dconfig.override_with_env_vars=true
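With `config.override_with_env_vars=true`, Lightbend config maps environment variables prefixed with `CONFIG_FORCE_` onto configuration paths (a single underscore becomes a dot). A sketch of overriding `repeater.input.subscription` this way:
docker run \
-v /path/to/resolver.json:/resolver.json \
-e CONFIG_FORCE_repeater_input_subscription="failed-inserts-sub" \
snowplow/snowplow-bigquery-repeater:1.6.4 \
--resolver=/resolver.json \
-Dconfig.override_with_env_vars=true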
See the configuration reference for more details and advanced settings.
Command line options
All apps accept a config HOCON as specified above, and an Iglu resolver config passed via the `--resolver` option. The latter must conform to the `iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-3` schema.
StreamLoader
StreamLoader accepts `--config` and `--resolver` arguments, as well as any JVM system properties that can be used to override the configuration.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-streamloader:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
-Dconfig.override_with_env_vars=true
The `--config` flag is optional, but if it is missing, all configuration options must be specified in some other way (system properties or environment variables).
The Dataflow Loader
The Dataflow Loader accepts the same two arguments as StreamLoader, plus any others supported by Google Cloud Dataflow.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-loader:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--labels='{"key1":"val1","key2":"val2"}' # optional Dataflow args
The optional `labels` argument is an example of an argument natively supported by Dataflow. It accepts a JSON object with key-value pairs that will be applied as labels to the Cloud Dataflow job.
This can be launched from any machine authenticated to submit Dataflow jobs.
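Once submitted, the job can be monitored like any other Dataflow job, for example with `gcloud`. The region below is an assumption; use the one you deploy to.
# List currently running Dataflow jobs in the target region.
gcloud dataflow jobs list --region=europe-west1 --status=active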
Mutator
Mutator has three subcommands: `listen`, `create` and `add-column`.
listen
`listen` is the primary command and is used to automate table migrations.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.4 \
listen \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--verbose # optional, for debugging only
add-column
`add-column` can be used once to add a column to the table specified via the `loader.output.good` setting. This should eliminate the risk of table update lag and the need to run Repeater, but requires 'manual' intervention.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.4 \
add-column \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--shred-property=CONTEXTS \
--schema="iglu:com.acme/app_context/jsonschema/1-0-0"
The specified schema must be present in one of the Iglu registries in the resolver configuration.
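A quick way to check that the schema is resolvable is to request it from the registry directly. The registry host below is hypothetical, and an Iglu Server protected by an API key will additionally require an `apikey` header.
# Fetch the schema from a static Iglu registry layout (fails with a non-zero exit code if missing).
curl -f http://iglu.example.com/schemas/com.acme/app_context/jsonschema/1-0-0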
create
`create` creates an empty table with the `atomic` structure. It can optionally be partitioned by a `TIMESTAMP` field.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-mutator:1.6.4 \
create \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--partitionColumn=load_tstamp \
--requirePartitionFilter
# --partitionColumn: optional TIMESTAMP column by which to partition the table
# --requirePartitionFilter: optionally require a filter on the partition column in all queries
See the Google documentation for more information about partitioned tables.
Repeater
We recommend running Repeater constantly on a small / cheap node or Docker container.
docker run \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-repeater:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json \
--bufferSize=20 \
--timeout=20 \
--backoffPeriod=900 \
--verbose
# --bufferSize: size of the batch to send to the dead-letter bucket
# --timeout: duration after which bad rows will be sunk into the dead-letter bucket
# --backoffPeriod: seconds to wait before attempting an insert (calculated against etl_tstamp)
# --verbose: optional, for debugging only
`bufferSize`, `timeout` and `backoffPeriod` are optional parameters.
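Since Repeater is intended to run continuously, one simple option is to run the container detached with a restart policy. This is only a sketch; adjust paths and the container name to your setup.
docker run -d \
--name bq-repeater \
--restart=always \
-v /path/to/configs:/configs \
snowplow/snowplow-bigquery-repeater:1.6.4 \
--config=/configs/bigquery.hocon \
--resolver=/configs/resolver.json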
Docker support
All applications are available as Docker images on Docker Hub, based on Ubuntu Focal and OpenJDK 11:
$ docker pull snowplow/snowplow-bigquery-streamloader:1.6.4
$ docker pull snowplow/snowplow-bigquery-loader:1.6.4
$ docker pull snowplow/snowplow-bigquery-mutator:1.6.4
$ docker pull snowplow/snowplow-bigquery-repeater:1.6.4
We also provide an alternative, lightweight set of images based on Google's "distroless" base image, which may provide some security advantages because they carry fewer dependencies. These images are distinguished by the `1.6.4-distroless` tag:
$ docker pull snowplow/snowplow-bigquery-streamloader:1.6.4-distroless
$ docker pull snowplow/snowplow-bigquery-loader:1.6.4-distroless
$ docker pull snowplow/snowplow-bigquery-mutator:1.6.4-distroless
$ docker pull snowplow/snowplow-bigquery-repeater:1.6.4-distroless
Mutator, Repeater and StreamLoader are also available as fat JAR files attached to releases in the project's GitHub repository.