Skip to main content

Snowplow Utils Macros

caution

This page is auto-generated from our dbt packages, some information may be incomplete

Snowplow Utils​

Allow Refresh​

macros/utils/allow_refresh.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro allow_refresh() %}
{{ return(adapter.dispatch('allow_refresh', 'snowplow_utils')()) }}
{% endmacro %}

Depends On

App Id Filter​

macros/utils/app_id_filter.sql

Description

Generates a sql filter for the values in app_ids applied on the app_id column.

Arguments

  • app_ids (list): List of app_ids to filter to include

Returns

app_id in (...) if any app_ids are provided, otherwise true.

Usage

app_id_filter(['web', 'mobile', 'news'])

-- returns
app_id in ('web', 'mobile', 'news')

Details

Code
Source
{% macro app_id_filter(app_ids) %}

{%- if app_ids|length -%}

app_id in ('{{ app_ids|join("','") }}') --filter on app_id if provided

{%- else -%}

true

{%- endif -%}

{% endmacro %}

Referenced By

Base Create Snowplow Events This Run​

macros/base/base_create_snowplow_events_this_run.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers=[{"table" : "events", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', entities_or_sdes=none, custom_sql=none) %}
{{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql)) }}
{% endmacro %}

Depends On

Referenced By

Base Create Snowplow Incremental Manifest​

macros/base/base_create_snowplow_incremental_manifest.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_create_snowplow_incremental_manifest() %}

{% set create_manifest_query %}
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) model,
cast('1970-01-01' as {{ type_timestamp() }}) as last_success
)

select *

from prep
where false
{% endset %}

{{ return(create_manifest_query) }}

{% endmacro %}

Depends On

Referenced By

Base Create Snowplow Quarantined Sessions​

macros/base/base_create_snowplow_quarantined_sessions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_create_snowplow_quarantined_sessions() %}

{% set create_quarantined_query %}
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) session_identifier
)

select *

from prep
where false

{% endset %}

{{ return(create_quarantined_query) }}

{% endmacro %}

Depends On

Referenced By

Base Create Snowplow Sessions Lifecycle Manifest​

macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_create_snowplow_sessions_lifecycle_manifest(session_identifiers=[{"schema": "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', user_identifiers=[{"schema": "atomic", "field" : "domain_userid"}], user_sql=none, quarantined_sessions=none, derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', event_limits_table='snowplow_base_new_event_limits', incremental_manifest_table='snowplow_incremental_manifest', package_name='snowplow') %}
{{ return(adapter.dispatch('base_create_snowplow_sessions_lifecycle_manifest', 'snowplow_utils')(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name)) }}
{% endmacro %}

Depends On

Referenced By

Base Create Snowplow Sessions This Run​

macros/base/base_create_snowplow_sessions_this_run.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_create_snowplow_sessions_this_run(lifecycle_manifest_table='snowplow_base_sessions_lifecycle_manifest', new_event_limits_table='snowplow_base_new_event_limits') %}
{% set lifecycle_manifest = ref(lifecycle_manifest_table) %}
{% set new_event_limits = ref(new_event_limits_table) %}
{%- set lower_limit,
upper_limit,
session_start_limit = snowplow_utils.return_base_new_event_limits(new_event_limits) %}

{% set sessions_sql %}


select
s.session_identifier,
s.user_identifier,
s.start_tstamp,
-- end_tstamp used in next step to limit events. When backfilling, set end_tstamp to upper_limit if end_tstamp > upper_limit.
-- This ensures we don't accidentally process events after upper_limit
case when s.end_tstamp > {{ upper_limit }} then {{ upper_limit }} else s.end_tstamp end as end_tstamp

from {{ lifecycle_manifest }} s

where
-- General window of start_tstamps to limit table scans. Logic complicated by backfills.
-- To be within the run, session start_tstamp must be >= lower_limit - max_session_days as we limit end_tstamp in manifest to start_tstamp + max_session_days
s.start_tstamp >= {{ session_start_limit }}
and s.start_tstamp <= {{ upper_limit }}
-- Select sessions within window that either; start or finish between lower & upper limit, start and finish outside of lower and upper limits
and not (s.start_tstamp > {{ upper_limit }} or s.end_tstamp < {{ lower_limit }})
{% endset %}

{{ return(sessions_sql) }}
{% endmacro%}

Depends On

Referenced By

Base Get Quarantine Sql​

macros/base/base_quarantine_sessions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_get_quarantine_sql(relation, max_session_length) %}

{# Find sessions exceeding max_session_days #}
{% set quarantine_sql -%}

select
session_identifier

from {{ relation }}
-- '=' since end_tstamp is restricted to start_tstamp + max_session_days
where end_tstamp = {{ snowplow_utils.timestamp_add(
'day',
max_session_length,
'start_tstamp'
) }}

{%- endset %}

{{ return(quarantine_sql) }}

{% endmacro %}

Depends On

Referenced By

Base Quarantine Sessions​

macros/base/base_quarantine_sessions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro base_quarantine_sessions(max_session_length, quarantined_sessions='snowplow_base_quarantined_sessions', src_relation=this) %}

{{ return(adapter.dispatch('base_quarantine_sessions', 'snowplow_utils')(max_session_length, quarantined_sessions, src_relation)) }}

{% endmacro %}

Depends On

Referenced By

Cast To Tstamp​

macros/utils/cross_db/timestamp_functions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro cast_to_tstamp(tstamp_literal) -%}
{% if tstamp_literal is none or tstamp_literal|lower in ['null',''] %}
cast(null as {{type_timestamp()}})
{% else %}
cast('{{tstamp_literal}}' as {{type_timestamp()}})
{% endif %}
{%- endmacro %}

Depends On

  • macro.dbt.type_timestamp

Referenced By

Cluster By Fields Sessions Lifecycle​

macros/utils/cross_db/cluster_by_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro cluster_by_fields_sessions_lifecycle() %}

{{ return(adapter.dispatch('cluster_by_fields_sessions_lifecycle', 'snowplow_utils')()) }}

{% endmacro %}

Depends On

Coalesce Field Paths​

macros/utils/bigquery/combine_column_versions/coalesce_field_paths.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro coalesce_field_paths(paths, field_alias, include_field_alias, relation_alias) %}

{% set relation_alias = '' if relation_alias is none else relation_alias~'.' %}

{% set field_alias = '' if not include_field_alias else ' as '~field_alias %}

{% set joined_paths = relation_alias~paths|join(', '~relation_alias) %}

{% set coalesced_field_paths = 'coalesce('~joined_paths~')'~field_alias %}

{{ return(coalesced_field_paths) }}

{% endmacro %}

Referenced By

Combine Column Versions​

macros/utils/bigquery/combine_column_versions/combine_column_versions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro combine_column_versions(relation, column_prefix, required_fields=[], nested_level=none, level_filter='equalto', relation_alias=none, include_field_alias=true, array_index=0, max_nested_level=15, exclude_versions=[]) %}

{# Create field_alias if not supplied i.e. is not tuple #}
{% set required_fields_tmp = required_fields %}
{% set required_fields = [] %}
{% for field in required_fields_tmp %}
{% set field_tuple = snowplow_utils.get_field_alias(field) %}
{% do required_fields.append(field_tuple) %}
{% endfor %}

{% set required_field_names = required_fields|map(attribute=0)|list %}

{# Determines correct level_limit. This limits recursive iterations during unnesting. #}
{% set level_limit = snowplow_utils.get_level_limit(nested_level, level_filter, required_field_names) %}

{# Limit level_limit to max_nested_level if required #}
{% set level_limit = max_nested_level if level_limit is none or level_limit > max_nested_level else level_limit %}

{%- set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(relation, column_prefix) -%}

{# Removes excluded versions, assuming column name ends with a version of format 'X_X_X' #}
{%- set filter_columns_by_version = snowplow_utils.exclude_column_versions(matched_columns, exclude_versions) -%}

{%- set flattened_fields_by_col_version = [] -%}

{# Flatten fields within each column version. Returns nested arrays of dicts. #}
{# Dict: {'field_name': str, 'field_alias': str, 'flattened_path': str, 'nested_level': int #}
{% for column in filter_columns_by_version|sort(attribute='name', reverse=true) %}
{% set flattened_fields = snowplow_utils.flatten_fields(fields=column.fields,
parent=column,
path=column.name,
array_index=array_index,
level_limit=level_limit
) %}

{% do flattened_fields_by_col_version.append(flattened_fields) %}

{% endfor %}

{# Flatten nested arrays and merges fields across col version. Returns array of dicts containing all field_paths for field. #}
{# Dict: {'field_name': str, 'flattened_field_paths': str, 'nested_level': int #}
{% set merged_fields = snowplow_utils.merge_fields_across_col_versions(flattened_fields_by_col_version) %}

{# Filters merged_fields based on required_fields if provided, or the level filter if provided. Default return all fields. #}
{% set matched_fields = snowplow_utils.get_matched_fields(fields=merged_fields,
required_field_names=required_field_names,
nested_level=nested_level,
level_filter=level_filter
) %}

{% set coalesced_field_paths = [] %}

{% for field in matched_fields %}

{% set passed_field_alias = required_fields|selectattr(0, "equalto", field.field_name)|map(attribute=1)|list %}
{% set default_field_alias = field.field_name|replace('.', '_') %}
{# Use passed_field_alias from required_fields if supplied #}
{% set field_alias = default_field_alias if not passed_field_alias|length else passed_field_alias[0] %}

{# Coalesce each field's path across all version of columns, ordered by latest col version. #}
{% set coalesced_field_path = snowplow_utils.coalesce_field_paths(paths=field.field_paths,
field_alias=field_alias,
include_field_alias=include_field_alias,
relation_alias=relation_alias) %}

{% do coalesced_field_paths.append(coalesced_field_path) %}

{% endfor %}

{# Returns array of all coalesced field paths #}
{{ return(coalesced_field_paths) }}

{% endmacro %}

Depends On

Referenced By

Current Timestamp In Utc​

macros/utils/cross_db/timestamp_functions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro current_timestamp_in_utc() -%}
{{ return(adapter.dispatch('current_timestamp_in_utc', 'snowplow_utils')()) }}
{%- endmacro %}

Depends On

  • macro.dbt.current_timestamp
  • macro.dbt.type_timestamp

Referenced By

Exclude Column Versions​

macros/utils/bigquery/combine_column_versions/exclude_column_versions.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro exclude_column_versions(columns, exclude_versions) %}
{%- set filtered_columns_by_version = [] -%}
{% for column in columns %}
{%- set col_version = column.name[-5:] -%}
{% if col_version not in exclude_versions %}
{% do filtered_columns_by_version.append(column) %}
{% endif %}
{% endfor %}

{{ return(filtered_columns_by_version) }}

{% endmacro %}

Referenced By

Flatten Fields​

macros/utils/bigquery/combine_column_versions/flatten_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro flatten_fields(fields, parent, path, array_index, level_limit=none, level_counter=1, flattened_fields=[], field_name='') %}

{% for field in fields %}

{# Only recurse up-until level_limit #}
{% if level_limit is not none and level_counter > level_limit %}
{{ return(flattened_fields) }}
{% endif %}

{# If parent column is an array then take element [array_index]. #}
{% set delimiter = '[safe_offset(%s)].'|format(array_index) if parent.mode == 'REPEATED' else '.' %}
{% set path = path~delimiter~field.name %}
{% set field_name = field_name~'.'~field.name if field_name != '' else field_name~field.name %}

{% set field_dict = {
'field_name': field_name,
'path': path,
'nested_level': level_counter
} %}

{% do flattened_fields.append(field_dict) %}

{# If field has nested fields recurse to extract all fields, unless array. #}
{% if field.dtype == 'RECORD' and field.mode != 'REPEATED' %}

{{ snowplow_utils.flatten_fields(
fields=field.fields,
parent=field,
level_limit=level_limit,
level_counter=level_counter+1,
path=path,
flattened_fields=flattened_fields,
field_name=field_name
) }}

{% endif %}

{% endfor %}

{{ return(flattened_fields) }}

{% endmacro %}

Depends On

Referenced By

Get Array To String​

macros/utils/cross_db/get_array_to_string.sql

Description

This macro takes care of harmonising cross-db array to string type functions. The macro supports a custom delimiter if you don't want to use a comma with no space (default).

Arguments

  • array_column (string): Name of the column to join into a string
  • column_prefix (string): Table alias for the array_column
  • delimiter (string): (Optional) String that determines how to delimit your array values. Default ','

Returns

The data warehouse appropriate sql to convert an array to a string.

Usage

select
...
{{ snowplow_utils.get_array_to_string('my_array_column', 'a', ', ') }}
...
from ... a

Details

Code
Source


{%- macro get_array_to_string(array_column, column_prefix, delimiter=',') -%}
{{ return(adapter.dispatch('get_array_to_string', 'snowplow_utils')(array_column, column_prefix, delimiter)) }}
{%- endmacro -%}


Referenced By

Get Cluster By​

macros/utils/get_cluster_by.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_cluster_by(bigquery_cols=none, snowflake_cols=none) %}

{%- do exceptions.warn("Warning: the `get_cluster_by` macro is deprecated and will be removed in a future version of the package, please use `get_value_by_target_type` instead.") -%}


{% if target.type == 'bigquery' %}
{{ return(bigquery_cols) }}
{% elif target.type == 'snowflake' %}
{{ return(snowflake_cols) }}
{% endif %}

{% endmacro %}

Get Columns In Relation By Column Prefix​

macros/utils/get_columns_in_relation_by_column_prefix.sql

Description

This macro returns an array of column objects within a relation that start with the given column prefix. This is useful when you have multiple versions of a column within a table and want to dynamically identify all versions.

Arguments

  • relation (relation): A table or ref type object to get the columns from
  • column_prefix (string): The prefix string to search for matching columns

Returns

An array of (column objects)[https://docs.getdbt.com/reference/dbt-classes#column]. The name of each column can be accessed with the name property.

Usage

get_columns_in_relation_by_column_prefix(ref('snowplow_web_base_events_this_run'), 'domain')

-- returns
['domain_sessionid', 'domain_userid', 'domain_sessionidx',...]

{% set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(
relation=ref('snowplow_web_base_events_this_run'),
column_prefix='custom_context_1_0_'
) %}

{% for column in matched_columns %}
{{ column.name }}
{% endfor %}

<h1>Renders</h1>
to something like:
'custom_context_1_0_1'
'custom_context_1_0_2'
'custom_context_1_0_3'

Details

Code
Source
{% macro get_columns_in_relation_by_column_prefix(relation, column_prefix) %}

{# Prevent introspective queries during parsing #}
{%- if not execute -%}
{{ return('') }}
{% endif %}

{%- set columns = adapter.get_columns_in_relation(relation) -%}

{# get_columns_in_relation returns uppercase cols for snowflake so uppercase column_prefix #}
{%- set column_prefix = column_prefix.upper() if target.type == 'snowflake' else column_prefix -%}

{%- set matched_columns = [] -%}

{# add columns with matching prefix to matched_columns #}
{% for column in columns %}
{% if column.name.startswith(column_prefix) %}
{% do matched_columns.append(column) %}
{% endif %}
{% endfor %}

{% if matched_columns|length %}
{{ return(matched_columns) }}
{% else %}
{{ exceptions.raise_compiler_error("Snowplow: No columns found with prefix "~column_prefix) }}
{% endif %}

{% endmacro %}

Referenced By

Get Enabled Snowplow Models​

macros/incremental_hooks/get_enabled_snowplow_models.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", ""), base_events_table_name='snowplow_base_events_this_run') -%}

{# Override dbt graph object if graph_object is passed. Testing purposes #}
{% if graph_object is not none %}
{% set graph = graph_object %}
{% endif %}

{# models_to_run optionally passed using dbt ls command. This returns a string of models to be run. Split into list #}
{% if models_to_run|length %}
{% set selected_models = models_to_run.split(" ") %}
{% else %}
{% set selected_models = none %}
{% endif %}

{% set enabled_models = [] %}
{% set untagged_snowplow_models = [] %}
{% set snowplow_model_tag = package_name+'_incremental' %}
{% set snowplow_events_this_run_path = 'model.' + project_name + '.' + base_events_table_name %}

{% if execute %}

{% set nodes = graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}

{% for node in nodes %}
{# If selected_models is specified, filter for these models #}
{% if selected_models is none or node.name in selected_models %}

{% if node.config.enabled and snowplow_model_tag not in node.tags and snowplow_events_this_run_path in node.depends_on.nodes %}

{%- do untagged_snowplow_models.append(node.name) -%}

{% endif %}

{% if node.config.enabled and snowplow_model_tag in node.tags %}

{%- do enabled_models.append(node.name) -%}

{% endif %}

{% endif %}

{% endfor %}

{% if untagged_snowplow_models|length %}
{#
Prints warning for models that reference snowplow_base_events_this_run but are untagged as '{package_name}_incremental'
Without this tagging these models will not be inserted into the manifest, breaking the incremental logic.
Only catches first degree dependencies rather than all downstream models
#}
{%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+snowplow_events_this_run_path+"'. Please refer to the Snowplow docs on tagging. "
+ "Models: "+ ', '.join(untagged_snowplow_models)) -%}

{% endif %}

{% if enabled_models|length == 0 %}
{%- do exceptions.raise_compiler_error("No enabled models identified.") -%}
{% endif %}

{% endif %}

{{ return(enabled_models) }}

{%- endmacro %}

Referenced By

Get Field​

macros/utils/cross_db/get_field.sql

Description

This macro exists to make it easier to extract a field from our unstruct_ and contexts_ type columns for users in Snowflake, Databricks, and BigQuery (although you may prefer to use combine_column_versions for BigQuery, as this manages multiple context versions and allows for extraction of multiple fields at the same time). The macro can handle type casting and selecting from arrays.

Arguments

  • column_name (string): Name of the column to extract the field from
  • field_name (string): Name of the field to extract
  • table_alias (string): (Optional) Alias of the table in your query that the column exists in. Default none (no table alias)
  • type (string): (Optional) Type to cast the field to if required. Default none (no casting)
  • array_index (integer): (Optional) Index of the array to select in case of multiple entries. Uses SAFE_OFFSET for BigQuery. Default none (not an array)

Returns

SQL snippet to select the field specified from the column

Usage

Extracting a single field


select
{{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
field_name = 'agent_class',
table_alias = 'a',
type = 'string',
array_index = 0)}} as yauaa_agent_class
from
my_events_table a

Extracting multiple fields


select
{% for field in [('field1', 'string'), ('field2', 'numeric'), ...] %}
{{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
field_name = field[0],
table_alias = 'a',
type = field[1],
array_index = 0)}} as {{ field[0] }}
{% endfor %}

from
my_events_table a

Details

Code
Source
{% macro get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{{ return(adapter.dispatch('get_field', 'snowplow_utils')(column_name, field_name, table_alias, type, array_index)) }}
{% endmacro %}

Referenced By

Get Field Alias​

macros/utils/bigquery/combine_column_versions/get_field_alias.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_field_alias(field) %}

{# Check if field is supplied as tuple e.g. (field_name, field_alias) #}
{% if field is iterable and field is not string %}
{{ return(field) }}
{% else %}
{{ return((field, field|replace('.', '_'))) }}
{% endif %}

{% endmacro %}

Referenced By

Get Incremental Manifest Status​

macros/incremental_hooks/get_incremental_manifest_status.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_incremental_manifest_status(incremental_manifest_table, models_in_run) -%}

{# In case of not execute just return empty strings to avoid hitting database #}
{% if not execute %}
{{ return(['', '', '', '']) }}
{% endif %}

{% set target_relation = adapter.get_relation(
database=incremental_manifest_table.database,
schema=incremental_manifest_table.schema,
identifier=incremental_manifest_table.name) %}

{% if target_relation is not none %}

{% set last_success_query %}
select min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}

{% set results = run_query(last_success_query) %}

{% if execute %}

{% set min_last_success = results.columns[0].values()[0] %}
{% set max_last_success = results.columns[1].values()[0] %}
{% set models_matched_from_manifest = results.columns[2].values()[0] %}
{% set has_matched_all_models = true if models_matched_from_manifest == models_in_run|length else false %}

{{ return([min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models]) }}

{% endif %}


{% else %}

{% do exceptions.warn("Snowplow Warning: " ~ incremental_manifest_table ~ " does not exist. This is expected if you are compiling a fresh installation of the dbt-snowplow-* packages.") %}

{{ return(['9999-01-01 00:00:00', '9999-01-01 00:00:00', 0, false]) }}

{% endif %}


{%- endmacro %}

Depends On

Referenced By

Get Incremental Manifest Table Relation​

macros/incremental_hooks/get_incremental_manifest_table_relation.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_incremental_manifest_table_relation(package_name) %}

{%- set incremental_manifest_table = ref(package_name~'_incremental_manifest') -%}

{{ return(incremental_manifest_table) }}

{% endmacro %}

Referenced By

Get Level Limit​

macros/utils/bigquery/combine_column_versions/get_level_limit.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_level_limit(level, level_filter, required_field_names) %}

{% set accepted_level_filters = ['equalto','lessthan','greaterthan'] %}

{% if level_filter is not in accepted_level_filters %}
{% set incompatible_level_filter_error_message -%}
Error: Incompatible level filter arg. Accepted args: {{accepted_level_filters|join(', ')}}
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(incompatible_level_filter_error_message)) }}
{% endif %}

{% if level is not none and required_field_names|length %}
{% set double_filter_error_message -%}
Error: Cannot filter fields by both `required_fields` and `level` arg. Please use only one.
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(double_filter_error_message)) }}
{% endif %}

{% if required_field_names|length and level_filter != 'equalto' %}
{% set required_fields_error_message -%}
Error: To filter fields using `required_fields` arg, `level_filter` must be set to `equalto`
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(required_fields_error_message)) }}
{% endif %}

{# level_limit is inclusive #}

{% if level is not none %}

{% if level_filter == 'equalto' %}

{% set level_limit = level %}

{% elif level_filter == 'lessthan' %}

{% set level_limit = level -1 %}

{% elif level_filter == 'greaterthan' %}

{% set level_limit = none %}

{% endif %}

{% elif required_field_names|length %}

{% set field_depths = [] %}
{% for field in required_field_names %}
{% set field_depth = field.split('.')|length %}
{% do field_depths.append(field_depth) %}
{% endfor %}

{% set level_limit = field_depths|max %}

{% else %}

{# Case when selecting all available fields #}

{% set level_limit = none %}

{% endif %}

{{ return(level_limit) }}

{% endmacro %}

Depends On

Referenced By

Get Matched Fields​

macros/utils/bigquery/combine_column_versions/get_matched_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_matched_fields(fields, required_field_names, nested_level, level_filter) %}

{% if not required_field_names|length %}

{% if nested_level is none %}

{% set matched_fields = fields %}

{% else %}

{% set matched_fields = fields|selectattr('nested_level',level_filter, nested_level)|list %}

{% endif %}

{% else %}

{% set matched_fields = fields|selectattr('field_name','in', required_field_names)|list %}

{% endif %}

{{ return(matched_fields) }}

{% endmacro %}

Referenced By

Get New Event Limits Table Relation​

macros/incremental_hooks/get_new_event_limits_table_relation.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_new_event_limits_table_relation(package_name) %}

{%- set new_event_limits_table = ref(package_name~'_base_new_event_limits') -%}

{{ return(new_event_limits_table) }}

{% endmacro %}

Referenced By

Get Optional Fields​

macros/utils/bigquery/get_optional_fields.sql

Description

This macro does not currently have a description.

Details

Code
Source
{% macro get_optional_fields(enabled, fields, col_prefix, relation, relation_alias, include_field_alias=true) -%}

{%- if enabled -%}

{%- set combined_fields = snowplow_utils.combine_column_versions(
relation=relation,
column_prefix=col_prefix,
required_fields=fields|map(attribute='field')|list,
relation_alias=relation_alias,
include_field_alias=include_field_alias
) -%}

{{ combined_fields|join(',\n') }}

{%- else -%}

{% for field in fields %}

{%- set field_alias = snowplow_utils.get_field_alias(field.field)[1] -%}

cast(null as {{ field.dtype }}){%- if include_field_alias %} as {{ field_alias }}{%- endif %} {%- if not loop