Snowplow Utils Macros
This page is auto-generated from our dbt packages; some information may be incomplete.
Snowplow Utils​
Allow Refresh​
macros/utils/allow_refresh.sql
Description
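Returns the allow-refresh setting, resolved by `get_value_by_target` from `none` (the value for the dev target named by `snowplow__dev_target_name`) and the `snowplow__allow_refresh` variable (the default value).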
Details
Code
- raw
- default
{% macro allow_refresh() %}
  {#- Adapter-dispatched entry point: resolves the snowplow_utils implementation of allow_refresh for the current adapter and returns its result. -#}
  {%- set allow_refresh_impl = adapter.dispatch('allow_refresh', 'snowplow_utils') -%}
  {{ return(allow_refresh_impl()) }}
{% endmacro %}
{% macro default__allow_refresh() %}
  {#-
    Resolve the allow-refresh setting through get_value_by_target:
      dev_value       -> none
      default_value   -> var('snowplow__allow_refresh')
      dev_target_name -> var('snowplow__dev_target_name')
    NOTE(review): presumably dev_value is returned when the current target matches
    dev_target_name — confirm against get_value_by_target.
  -#}
  {%- set refresh_setting = snowplow_utils.get_value_by_target(
      dev_value=none,
      default_value=var('snowplow__allow_refresh'),
      dev_target_name=var('snowplow__dev_target_name')
  ) -%}
  {{ return(refresh_setting) }}
{% endmacro %}
Depends On
App Id Filter​
macros/utils/app_id_filter.sql
Description
Generates a SQL filter that restricts the `app_id` column to the values in `app_ids`.
Arguments
app_ids
(list): List of app_ids to include in the filter
Returns
`app_id in (...)` if any `app_ids` are provided, otherwise `true`.
Usage
app_id_filter(['web', 'mobile', 'news'])
-- returns
app_id in ('web', 'mobile', 'news')
Details
Code
{#-
  app_id_filter(app_ids)
  Returns a SQL boolean expression restricting rows to the given app_ids:
    app_id_filter(['web', 'mobile'])          -> app_id in ('web','mobile')
    app_id_filter([]) / app_id_filter(none)   -> true
  Values are interpolated as-is between single quotes (not escaped), so they must be
  trusted configuration values.
  No SQL `--` comment is emitted: callers embed this expression inline
  (e.g. `and {{ snowplow_utils.app_id_filter(app_ids) }}`), and a trailing line comment
  would comment out any SQL the caller places after the call on the same line.
-#}
{% macro app_id_filter(app_ids) %}
{%- if app_ids -%}
app_id in ('{{ app_ids|join("','") }}')
{%- else -%}
true
{%- endif -%}
{% endmacro %}
Referenced By
- Models
- Macros
- model.snowplow_ecommerce.snowplow_ecommerce_base_events_this_run
- model.snowplow_ecommerce.snowplow_ecommerce_base_sessions_lifecycle_manifest
- model.snowplow_mobile.snowplow_mobile_base_events_this_run
- model.snowplow_mobile.snowplow_mobile_base_sessions_lifecycle_manifest
- model.snowplow_normalize.snowplow_normalize_base_events_this_run
- model.snowplow_web.snowplow_web_base_events_this_run
Base Create Snowplow Events This Run​
macros/base/base_create_snowplow_events_this_run.sql
Description
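Returns the SQL that selects the events for the current run. Each event gets a `session_identifier` (from `session_sql`, or the first non-null of `session_identifiers`). Events are then inner-joined to the sessions-this-run model and filtered by the run's timestamp limits, `days_late_allowed`, `max_session_days` and `app_ids`. Finally they are deduplicated on `event_id`. The Postgres implementation additionally joins any `entities_or_sdes`.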
Details
Code
- raw
- default
- postgres
{#-
  base_create_snowplow_events_this_run
  Adapter-dispatched entry point returning the events-this-run SQL.
  Each session_identifiers entry is read via identifier['schema'] / identifier['field'] by the
  implementations (entries with schema 'atomic' are read directly from the events table), so the
  default uses the "schema" key — the same default as base_create_snowplow_sessions_lifecycle_manifest.
-#}
{% macro base_create_snowplow_events_this_run(sessions_this_run_table='snowplow_base_sessions_this_run', session_identifiers=[{"schema" : "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', entities_or_sdes=none, custom_sql=none) %}
{{ return(adapter.dispatch('base_create_snowplow_events_this_run', 'snowplow_utils')(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql)) }}
{% endmacro %}
{#-
  default__base_create_snowplow_events_this_run
  Returns (as a string) the SQL selecting this run's events:
    1. identified_events: every row of the events relation (database/schema/table from the
       snowplow_events_* arguments) plus a session_identifier column — session_sql when given,
       otherwise the first non-null of session_identifiers (entries whose schema is not
       'atomic' are read through get_field) — and, optionally, custom_sql columns.
    2. Inner join to the sessions-this-run model on session_identifier; user_identifier is
       taken from that model.
    3. Filters: session_timestamp no later than start_tstamp + max_session_days;
       dvce_sent_tstamp no later than dvce_created_tstamp + days_late_allowed;
       session_timestamp within the limits return_limits_from_model derives from the
       sessions-this-run model's start_tstamp/end_tstamp; on BigQuery (when
       derived_tstamp_partitioned) derived_tstamp within [lower_limit - 1 hour, upper_limit];
       and app_id_filter(app_ids).
    4. QUALIFY keeps one row per event_id: the earliest by session_timestamp, then
       dvce_created_tstamp.
  entities_or_sdes is accepted for interface parity with the other implementation but unused here.
-#}
{% macro default__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql) %}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table),
'start_tstamp',
'end_tstamp') %}
{% set sessions_this_run = ref(sessions_this_run_table) %}
{% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %}
{% set events_this_run_query %}
with identified_events AS (
select
{% if session_sql %}
{{ session_sql }} as session_identifier,
{% else -%}
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{{ snowplow_utils.get_field(identifier['schema'], identifier['field'], 'e', dbt.type_string(), 0) }}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
{%- endif %}
e.*
{% if custom_sql %}
, {{ custom_sql }}
{% endif %}
from {{ snowplow_events }} e
)
select
a.*,
b.user_identifier -- take user_identifier from manifest. This ensures only 1 domain_userid per session.
from identified_events as a
inner join {{ sessions_this_run }} as b
on a.session_identifier = b.session_identifier
where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }}
and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
{# NOTE(review): filters bind tighter than `==`, so as_bool() applies to the 'bigquery' literal, not to the comparison result. #}
{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %}
and a.derived_tstamp >= {{ snowplow_utils.timestamp_add('hour', -1, lower_limit) }}
and a.derived_tstamp <= {{ upper_limit }}
{% endif %}
and {{ snowplow_utils.app_id_filter(app_ids) }}
qualify row_number() over (partition by a.event_id order by a.{{ session_timestamp }}, a.dvce_created_tstamp) = 1
{% endset %}
{{ return(events_this_run_query) }}
{% endmacro %}
{#-
  postgres__base_create_snowplow_events_this_run
  Postgres/Redshift implementation of the events-this-run query (returned as a string).
  Unlike the default implementation:
    - non-'atomic' session identifiers and every entities_or_sdes entry are materialised as
      CTEs by get_sde_or_context and left-joined onto the events on event_id and collector_tstamp;
    - event_id deduplication uses row_number() (no QUALIFY).

  session_identifiers entries: {'schema', 'field', optional 'prefix', optional 'alias'}.
    Columns are read as <alias or schema>.<prefix or schema>_<field> and joined through
    <prefix or schema>__id / __tstamp, so prefix and alias both fall back to the schema name.
  entities_or_sdes entries: {'name' (required, unique), optional 'prefix' (defaults to name),
    optional 'single_entity' (boolean, defaults to true; an explicit false is honoured),
    optional 'alias'}. When single_entity is false the join additionally keeps only rows
    where mod(<prefix>__index, event_id_dedupe_count) = 0.
  derived_tstamp_partitioned is accepted for interface parity but unused here.
-#}
{% macro postgres__base_create_snowplow_events_this_run(sessions_this_run_table, session_identifiers, session_sql, session_timestamp, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, entities_or_sdes, custom_sql) %}
{%- set lower_limit, upper_limit = snowplow_utils.return_limits_from_model(ref(sessions_this_run_table),
'start_tstamp',
'end_tstamp') %}
{% if entities_or_sdes %}
{# check uniqueness of entity/sde names provided #}
{% set ent_sde_names = [] %}
{% for ent_or_sde in entities_or_sdes %}
{% do ent_sde_names.append(ent_or_sde['name']) %}
{% endfor %}
{% if ent_sde_names | unique | list | length != entities_or_sdes | length %}
{% do exceptions.raise_compiler_error("There are duplicate names in your provided `entities_or_sdes` list. Please correct this before proceeding.")%}
{% endif %}
{% endif %}
{% set sessions_this_run = ref(sessions_this_run_table) %}
{% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %}
{% set events_this_run_query %}
with
{% if session_identifiers -%}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{%- endif -%}
{% endfor %}
{% endif %}
{%- if entities_or_sdes -%}
{%- for ent_or_sde in entities_or_sdes -%}
{%- set name = none -%}
{%- set prefix = none -%}
{%- set single_entity = true -%}
{%- if ent_or_sde['name'] -%}
{%- set name = ent_or_sde['name'] -%}
{%- else -%}
{%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%}
{%- endif -%}
{%- if ent_or_sde['prefix'] -%}
{%- set prefix = ent_or_sde['prefix'] -%}
{%- else -%}
{%- set prefix = name -%}
{%- endif -%}
{#- Test the type only: a truthiness check would make an explicit false impossible. -#}
{%- if ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, name, lower_limit, upper_limit, prefix, single_entity) }},
{% endfor -%}
{%- endif %}
identified_events AS (
select
{% if session_sql -%}
{{ session_sql }} as session_identifier,
{% else -%}
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
{%- endif %}
e.*
{% if custom_sql %}
, {{ custom_sql }}
{%- endif %}
from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{#- Same alias/prefix fallbacks as the COALESCE above, so the join columns match the selected ones. -#}
{%- set join_alias = identifier['alias'] if identifier['alias'] else identifier['schema'] -%}
{%- set join_prefix = identifier['prefix'] if identifier['prefix'] else identifier['schema'] -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {{ join_alias }}.{{ join_prefix }}__id and e.collector_tstamp = {{ join_alias }}.{{ join_prefix }}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
), events_this_run as (
select
a.*,
b.user_identifier, -- take user_identifier from manifest. This ensures only 1 domain_userid per session.
row_number() over (partition by a.event_id order by a.{{ session_timestamp }}, a.dvce_created_tstamp ) as event_id_dedupe_index,
count(*) over (partition by a.event_id) as event_id_dedupe_count
from identified_events as a
inner join {{ sessions_this_run }} as b
on a.session_identifier = b.session_identifier
where a.{{ session_timestamp }} <= {{ snowplow_utils.timestamp_add('day', max_session_days, 'b.start_tstamp') }}
and a.dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'a.dvce_created_tstamp') }}
and a.{{ session_timestamp }} >= {{ lower_limit }}
and a.{{ session_timestamp }} <= {{ upper_limit }}
and {{ snowplow_utils.app_id_filter(app_ids) }}
)
select *
from events_this_run as e
{%- if entities_or_sdes -%}
{% for ent_or_sde in entities_or_sdes -%}
{%- set name = none -%}
{%- set prefix = none -%}
{%- set single_entity = true -%}
{%- set alias = none -%}
{%- if ent_or_sde['name'] -%}
{%- set name = ent_or_sde['name'] -%}
{%- else -%}
{%- do exceptions.raise_compiler_error("Need to specify the name of your Entity or SDE using the {'name'} attribute in a key-value map.") -%}
{%- endif -%}
{%- if ent_or_sde['prefix'] -%}
{%- set prefix = ent_or_sde['prefix'] -%}
{%- else -%}
{%- set prefix = name -%}
{%- endif -%}
{#- Must match the CTE generation above: an explicit false is honoured. -#}
{%- if ent_or_sde['single_entity'] is boolean -%}
{%- set single_entity = ent_or_sde['single_entity'] -%}
{%- endif -%}
{%- if ent_or_sde['alias'] -%}
{%- set alias = ent_or_sde['alias'] -%}
{%- endif %}
left join {{name}} {% if alias -%} as {{ alias }} {%- endif %} on e.event_id = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__id
and e.collector_tstamp = {% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__tstamp
{% if not single_entity -%} and mod({% if alias -%} {{ alias }} {%- else -%}{{name}}{%- endif %}.{{prefix}}__index, e.event_id_dedupe_count) = 0{%- endif -%}
{% endfor %}
{% endif %}
where event_id_dedupe_index = 1
{% endset %}
{{ return(events_this_run_query) }}
{% endmacro %}
Depends On
- Macros
Referenced By
Base Create Snowplow Incremental Manifest​
macros/base/base_create_snowplow_incremental_manifest.sql
Description
Returns a `select` that yields zero rows but defines the incremental manifest's columns: `model` (string) and `last_success` (timestamp).
Details
Code
{#-
  base_create_snowplow_incremental_manifest
  Returns a zero-row select (`where false`) whose only purpose is to define the incremental
  manifest's columns: model (max-length string) and last_success (timestamp).
  Both columns use an explicit `as` alias.
-#}
{% macro base_create_snowplow_incremental_manifest() %}
{% set create_manifest_query %}
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) as model,
cast('1970-01-01' as {{ type_timestamp() }}) as last_success
)
select *
from prep
where false
{% endset %}
{{ return(create_manifest_query) }}
{% endmacro %}
Depends On
- Macros
- macro.dbt.type_timestamp
- macro.snowplow_utils.type_max_string
Referenced By
Base Create Snowplow Quarantined Sessions​
macros/base/base_create_snowplow_quarantined_sessions.sql
Description
Returns a `select` that yields zero rows but defines the quarantined sessions table's single `session_identifier` (string) column.
Details
Code
{#-
  base_create_snowplow_quarantined_sessions
  Returns a zero-row select (`where false`) whose only purpose is to define the quarantined
  sessions table's single column: session_identifier (max-length string), with an explicit `as` alias.
-#}
{% macro base_create_snowplow_quarantined_sessions() %}
{% set create_quarantined_query %}
with prep as (
select
cast(null as {{ snowplow_utils.type_max_string() }}) as session_identifier
)
select *
from prep
where false
{% endset %}
{{ return(create_quarantined_query) }}
{% endmacro %}
Depends On
Referenced By
Base Create Snowplow Sessions Lifecycle Manifest​
macros/base/base_create_snowplow_sessions_lifecycle_manifest.sql
Description
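Returns the SQL for the sessions lifecycle manifest: one row per session (`session_identifier`, `user_identifier`, `start_tstamp`, `end_tstamp`) built from this run's new events. On incremental runs these are combined with previously recorded sessions. When a `quarantined_sessions` model is given, the sessions it lists are excluded. `end_tstamp` is capped at `start_tstamp` + `max_session_days`.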
Details
Code
- raw
- default
- postgres
{% macro base_create_snowplow_sessions_lifecycle_manifest(session_identifiers=[{"schema": "atomic", "field" : "domain_sessionid"}], session_sql=none, session_timestamp='load_tstamp', user_identifiers=[{"schema": "atomic", "field" : "domain_userid"}], user_sql=none, quarantined_sessions=none, derived_tstamp_partitioned=true, days_late_allowed=3, max_session_days=3, app_ids=[], snowplow_events_database=none, snowplow_events_schema='atomic', snowplow_events_table='events', event_limits_table='snowplow_base_new_event_limits', incremental_manifest_table='snowplow_incremental_manifest', package_name='snowplow') %}
  {#- Adapter-dispatched entry point: forwards every argument, in order, to the adapter-specific implementation and returns its SQL. -#}
  {%- set lifecycle_manifest_impl = adapter.dispatch('base_create_snowplow_sessions_lifecycle_manifest', 'snowplow_utils') -%}
  {{ return(lifecycle_manifest_impl(
      session_identifiers, session_sql, session_timestamp,
      user_identifiers, user_sql, quarantined_sessions,
      derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids,
      snowplow_events_database, snowplow_events_schema, snowplow_events_table,
      event_limits_table, incremental_manifest_table, package_name
  )) }}
{% endmacro %}
{#-
  default__base_create_snowplow_sessions_lifecycle_manifest
  Returns the SQL for the sessions lifecycle manifest: one row per session_identifier with
  user_identifier, start_tstamp and end_tstamp, built from events between the new-event limits.
    - session_identifier: session_sql if given, else the first non-null of session_identifiers
      (non-'atomic' schemas are read with get_field).
    - user_identifier: user_sql if given — selected as-is in a query grouped by
      session_identifier only, so it must be an aggregate expression — else
      max(first non-null of user_identifiers).
    - When quarantined_sessions is given, null and quarantined session identifiers are dropped.
    - On incremental runs, new sessions are combined with previously recorded ones
      (start_tstamp >= session lookback limit): earliest start, latest end and the existing
      user_identifier are kept; recorded sessions whose end_tstamp already reached
      start_tstamp + max_session_days are excluded from this run's output.
    - end_tstamp is capped at start_tstamp + max_session_days; Databricks/Spark targets also
      get a start_tstamp_date column.
  Raises a compiler error when neither session identifiers nor session_sql, or neither user
  identifiers nor user_sql, are provided.
-#}
{% macro default__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name) %}
{% set base_event_limits = ref(event_limits_table) %}
{% set lower_limit, upper_limit, _ = snowplow_utils.return_base_new_event_limits(base_event_limits) %}
{% set session_lookback_limit = snowplow_utils.get_session_lookback_limit(lower_limit) %}
{% set is_run_with_new_events = snowplow_utils.is_run_with_new_events(package_name, event_limits_table, incremental_manifest_table) %}
{% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %}
{% set sessions_lifecycle_manifest_query %}
with new_events_session_ids_init as (
select
{% if session_sql %}
{{ session_sql }} as session_identifier,
{% elif session_identifiers|length > 0 %}
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{{ snowplow_utils.get_field(identifier['schema'], identifier['field'], 'e', dbt.type_string(), 0) }}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
{%- else -%}
{% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session code") %}
{%- endif %}
{%- if user_sql -%}
{{ user_sql }} as user_identifier,
{%- elif user_identifiers|length > 0 %}
max(
COALESCE(
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{{ snowplow_utils.get_field(identifier['schema'], identifier['field'], 'e', dbt.type_string(), 0) }}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
)
) as user_identifier, -- Edge case 1: Arbitrary selection to avoid window function like first_value.
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either user identifiers or custom user code") %}
{%- endif %}
min({{ session_timestamp }}) as start_tstamp,
max({{ session_timestamp }}) as end_tstamp
from {{ snowplow_events }} e
where
dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late
and {{ session_timestamp }} >= {{ lower_limit }}
and {{ session_timestamp }} <= {{ upper_limit }}
and {{ snowplow_utils.app_id_filter(app_ids) }}
and {{ is_run_with_new_events }} --don't reprocess sessions that have already been processed.
{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %} -- BQ only
and derived_tstamp >= {{ lower_limit }}
and derived_tstamp <= {{ upper_limit }}
{% endif %}
group by 1
), new_events_session_ids as (
select *
from new_events_session_ids_init e
{% if quarantined_sessions %}
where session_identifier is not null
and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}
)
{% if is_incremental() %}
, previous_sessions as (
select *
from {{ this }}
where start_tstamp >= {{ session_lookback_limit }}
and {{ is_run_with_new_events }} --don't reprocess sessions that have already been processed.
)
, session_lifecycle as (
select
ns.session_identifier,
coalesce(self.user_identifier, ns.user_identifier) as user_identifier, -- Edge case 1: Take previous value to keep domain_userid consistent. Not deterministic but performant
least(ns.start_tstamp, coalesce(self.start_tstamp, ns.start_tstamp)) as start_tstamp,
greatest(ns.end_tstamp, coalesce(self.end_tstamp, ns.end_tstamp)) as end_tstamp -- BQ 1 NULL will return null hence coalesce
from new_events_session_ids ns
left join previous_sessions as self
on ns.session_identifier = self.session_identifier
where
self.session_identifier is null -- process all new sessions
or self.end_tstamp < {{ snowplow_utils.timestamp_add('day', max_session_days, 'self.start_tstamp') }} --stop updating sessions exceeding max_session_days
)
{% else %}
, session_lifecycle as (
select * from new_events_session_ids
)
{% endif %}
select
sl.session_identifier,
sl.user_identifier,
sl.start_tstamp,
least({{ snowplow_utils.timestamp_add('day', max_session_days, 'sl.start_tstamp') }}, sl.end_tstamp) as end_tstamp -- limit session length to max_session_days
{% if target.type in ['databricks', 'spark'] -%}
, DATE(start_tstamp) as start_tstamp_date
{%- endif %}
from session_lifecycle sl
{% endset %}
{{ return(sessions_lifecycle_manifest_query) }}
{% endmacro %}
{#-
  postgres__base_create_snowplow_sessions_lifecycle_manifest
  Postgres/Redshift implementation of the sessions lifecycle manifest query (returned as a string).
  Non-'atomic' session and user identifiers are materialised as CTEs by get_sde_or_context and
  left-joined onto the events on event_id and collector_tstamp. Their columns are read as
  <alias or schema>.<prefix or schema>_<field>, and the joins use the same fallbacks
  (<prefix or schema>__id / __tstamp). Each list's joins are guarded by that same list, matching
  the guards on CTE generation.
  user_sql, when given, is selected as-is in a query grouped by session_identifier only, so it
  must be an aggregate expression.
  NOTE(review): if the same non-'atomic' schema appears in both session_identifiers and
  user_identifiers (or twice in one list), its CTE and join are emitted twice — confirm whether
  callers need de-duplication.
-#}
{% macro postgres__base_create_snowplow_sessions_lifecycle_manifest(session_identifiers, session_sql, session_timestamp, user_identifiers, user_sql, quarantined_sessions, derived_tstamp_partitioned, days_late_allowed, max_session_days, app_ids, snowplow_events_database, snowplow_events_schema, snowplow_events_table, event_limits_table, incremental_manifest_table, package_name) %}
{% set base_event_limits = ref(event_limits_table) %}
{% set lower_limit, upper_limit, _ = snowplow_utils.return_base_new_event_limits(base_event_limits) %}
{% set session_lookback_limit = snowplow_utils.get_session_lookback_limit(lower_limit) %}
{% set is_run_with_new_events = snowplow_utils.is_run_with_new_events(package_name, event_limits_table, incremental_manifest_table) %}
{% set snowplow_events = api.Relation.create(database=snowplow_events_database, schema=snowplow_events_schema, identifier=snowplow_events_table) %}
{% set sessions_lifecycle_manifest_query %}
with
{% if session_identifiers %}
{% for identifier in session_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{%- endif -%}
{% endfor %}
{% endif %}
{% if user_identifiers %}
{% for identifier in user_identifiers %}
{% if identifier['schema']|lower != 'atomic' %}
{{ snowplow_utils.get_sde_or_context(snowplow_events_schema, identifier['schema'], lower_limit, upper_limit, identifier['prefix']) }},
{%- endif -%}
{% endfor %}
{% endif %}
new_events_session_ids_init as (
select
{% if session_sql %}
{{ session_sql }} as session_identifier,
{% elif session_identifiers|length > 0 %}
COALESCE(
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{%- else -%}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
) as session_identifier,
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either session identifiers or custom session SQL") %}
{% endif %}
{% if user_sql %}
{{ user_sql }} as user_identifier,
{% elif user_identifiers|length > 0 %}
max(
COALESCE(
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' %}
{% if identifier['alias'] %}{{identifier['alias']}}{% else %}{{identifier['schema']}}{% endif %}.{% if identifier['prefix'] %}{{ identifier['prefix'] }}{% else %}{{ identifier['schema']}}{% endif %}_{{identifier['field']}}
{%- else %}
e.{{identifier['field']}}
{%- endif -%}
,
{%- endfor -%}
NULL
)
) as user_identifier, -- Edge case 1: Arbitrary selection to avoid window function like first_value.
{% else %}
{% do exceptions.raise_compiler_error("Need to specify either user identifiers or custom user SQL") %}
{% endif %}
min({{ session_timestamp }}) as start_tstamp,
max({{ session_timestamp }}) as end_tstamp
from {{ snowplow_events }} e
{% if session_identifiers|length > 0 %}
{% for identifier in session_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{%- set join_alias = identifier['alias'] if identifier['alias'] else identifier['schema'] -%}
{%- set join_prefix = identifier['prefix'] if identifier['prefix'] else identifier['schema'] -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {{ join_alias }}.{{ join_prefix }}__id and e.collector_tstamp = {{ join_alias }}.{{ join_prefix }}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
{#- Guard on user_identifiers (not session_identifiers) so these joins exist whenever their CTEs were generated above. -#}
{% if user_identifiers %}
{% for identifier in user_identifiers %}
{%- if identifier['schema']|lower != 'atomic' -%}
{%- set join_alias = identifier['alias'] if identifier['alias'] else identifier['schema'] -%}
{%- set join_prefix = identifier['prefix'] if identifier['prefix'] else identifier['schema'] -%}
left join {{ identifier['schema'] }} {% if identifier['alias'] %}as {{ identifier['alias'] }}{% endif %} on e.event_id = {{ join_alias }}.{{ join_prefix }}__id and e.collector_tstamp = {{ join_alias }}.{{ join_prefix }}__tstamp
{% endif -%}
{% endfor %}
{% endif %}
where
dvce_sent_tstamp <= {{ snowplow_utils.timestamp_add('day', days_late_allowed, 'dvce_created_tstamp') }} -- don't process data that's too late
and {{ session_timestamp }} >= {{ lower_limit }}
and {{ session_timestamp }} <= {{ upper_limit }}
and {{ snowplow_utils.app_id_filter(app_ids) }}
and {{ is_run_with_new_events }} --don't reprocess sessions that have already been processed.
{% if derived_tstamp_partitioned and target.type == 'bigquery' | as_bool() %} -- BQ only
and derived_tstamp >= {{ lower_limit }}
and derived_tstamp <= {{ upper_limit }}
{% endif %}
group by 1
), new_events_session_ids as (
select *
from new_events_session_ids_init e
{% if quarantined_sessions %}
where session_identifier is not null
and not exists (select 1 from {{ ref(quarantined_sessions) }} as a where a.session_identifier = e.session_identifier) -- don't continue processing v.long sessions
{%- endif %}
)
{% if is_incremental() %}
, previous_sessions as (
select *
from {{ this }}
where start_tstamp >= {{ session_lookback_limit }}
and {{ is_run_with_new_events }} --don't reprocess sessions that have already been processed.
)
, session_lifecycle as (
select
ns.session_identifier,
coalesce(self.user_identifier, ns.user_identifier) as user_identifier, -- Edge case 1: Take previous value to keep domain_userid consistent. Not deterministic but performant
least(ns.start_tstamp, coalesce(self.start_tstamp, ns.start_tstamp)) as start_tstamp,
greatest(ns.end_tstamp, coalesce(self.end_tstamp, ns.end_tstamp)) as end_tstamp -- BQ 1 NULL will return null hence coalesce
from new_events_session_ids ns
left join previous_sessions as self
on ns.session_identifier = self.session_identifier
where
self.session_identifier is null -- process all new sessions
or self.end_tstamp < {{ snowplow_utils.timestamp_add('day', max_session_days, 'self.start_tstamp') }} --stop updating sessions exceeding max_session_days
)
{% else %}
, session_lifecycle as (
select * from new_events_session_ids
)
{% endif %}
select
sl.session_identifier,
sl.user_identifier,
sl.start_tstamp,
least({{ snowplow_utils.timestamp_add('day', max_session_days, 'sl.start_tstamp') }}, sl.end_tstamp) as end_tstamp -- limit session length to max_session_days
from session_lifecycle sl
{% endset %}
{{ return(sessions_lifecycle_manifest_query) }}
{% endmacro %}
Depends On
- Macros
- macro.dbt.is_incremental
- macro.dbt.type_string
- macro.snowplow_utils.app_id_filter
- macro.snowplow_utils.get_field
- macro.snowplow_utils.get_sde_or_context
- macro.snowplow_utils.get_session_lookback_limit
- macro.snowplow_utils.is_run_with_new_events
- macro.snowplow_utils.return_base_new_event_limits
- macro.snowplow_utils.timestamp_add
Referenced By
Base Create Snowplow Sessions This Run​
macros/base/base_create_snowplow_sessions_this_run.sql
Description
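Returns the SQL selecting the sessions to process in this run from the lifecycle manifest. It keeps sessions whose time range is not entirely outside the new-event limits and caps `end_tstamp` at the run's upper limit.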
Details
Code
{#-
  base_create_snowplow_sessions_this_run
  Returns the SQL selecting the sessions to process in this run from the lifecycle manifest.
  The limits come from return_base_new_event_limits(ref(new_event_limits_table)) as
  (lower_limit, upper_limit, session_start_limit). A session is selected when its start_tstamp
  lies in [session_start_limit, upper_limit] and its range is not entirely outside
  [lower_limit, upper_limit]; its end_tstamp is capped at upper_limit.
-#}
{% macro base_create_snowplow_sessions_this_run(lifecycle_manifest_table='snowplow_base_sessions_lifecycle_manifest', new_event_limits_table='snowplow_base_new_event_limits') %}
  {%- set manifest_relation = ref(lifecycle_manifest_table) -%}
  {%- set limits_relation = ref(new_event_limits_table) -%}
  {%- set lower_limit, upper_limit, session_start_limit = snowplow_utils.return_base_new_event_limits(limits_relation) -%}
  {%- set sessions_this_run_sql -%}
select
s.session_identifier,
s.user_identifier,
s.start_tstamp,
-- end_tstamp used in next step to limit events. When backfilling, set end_tstamp to upper_limit if end_tstamp > upper_limit.
-- This ensures we don't accidentally process events after upper_limit
case when s.end_tstamp > {{ upper_limit }} then {{ upper_limit }} else s.end_tstamp end as end_tstamp
from {{ manifest_relation }} s
where
-- General window of start_tstamps to limit table scans. Logic complicated by backfills.
-- To be within the run, session start_tstamp must be >= lower_limit - max_session_days as we limit end_tstamp in manifest to start_tstamp + max_session_days
s.start_tstamp >= {{ session_start_limit }}
and s.start_tstamp <= {{ upper_limit }}
-- Select sessions within window that either; start or finish between lower & upper limit, start and finish outside of lower and upper limits
and not (s.start_tstamp > {{ upper_limit }} or s.end_tstamp < {{ lower_limit }})
  {%- endset -%}
  {{ return(sessions_this_run_sql) }}
{% endmacro %}
Depends On
Referenced By
Base Get Quarantine Sql​
macros/base/base_quarantine_sessions.sql
Description
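Returns the SQL selecting the `session_identifier` of every session in `relation` whose `end_tstamp` equals `start_tstamp` + `max_session_length` days, i.e. sessions that have reached the maximum session length.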
Details
Code
{#-
  base_get_quarantine_sql(relation, max_session_length)
  Returns the SQL selecting the session_identifier of every row in `relation` whose end_tstamp
  equals start_tstamp + max_session_length days, i.e. sessions that hit the length cap.
-#}
{% macro base_get_quarantine_sql(relation, max_session_length) %}
  {%- set capped_end_tstamp = snowplow_utils.timestamp_add('day', max_session_length, 'start_tstamp') -%}
  {%- set quarantine_sql -%}
select
session_identifier
from {{ relation }}
-- '=' since end_tstamp is restricted to start_tstamp + max_session_days
where end_tstamp = {{ capped_end_tstamp }}
  {%- endset %}
  {{ return(quarantine_sql) }}
{% endmacro %}
Depends On
Referenced By
Base Quarantine Sessions​
macros/base/base_quarantine_sessions.sql
Description
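Generates the statement(s) that add the sessions found by `base_get_quarantine_sql` to the quarantined sessions table. The default implementation uses a `MERGE`; the Postgres implementation stages them in a temporary table, then deletes and re-inserts them.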
Details
Code
- raw
- default
- postgres
{% macro base_quarantine_sessions(max_session_length, quarantined_sessions='snowplow_base_quarantined_sessions', src_relation=this) %}
  {#- Adapter-dispatched entry point: forwards to the adapter-specific quarantine implementation and returns its result. -#}
  {%- set quarantine_impl = adapter.dispatch('base_quarantine_sessions', 'snowplow_utils') -%}
  {{ return(quarantine_impl(max_session_length, quarantined_sessions, src_relation)) }}
{% endmacro %}
{#-
  default__base_quarantine_sessions
  Returns a MERGE statement that inserts into the quarantined sessions model (ref(quarantined_sessions_str))
  every session_identifier returned by base_get_quarantine_sql(src_relation, max_session_length)
  that is not already present.
  The inserted value is qualified with the source alias (`src.`) because both the target and the
  source expose a session_identifier column; this avoids relying on engine-specific name resolution.
-#}
{% macro default__base_quarantine_sessions(max_session_length, quarantined_sessions_str, src_relation) %}
{% set quarantined_sessions = ref(quarantined_sessions_str) %}
{% set sessions_to_quarantine_sql = snowplow_utils.base_get_quarantine_sql(src_relation, max_session_length) %}
{% set quarantine_query %}
merge into {{ quarantined_sessions }} trg
using ({{ sessions_to_quarantine_sql }}) src
on trg.session_identifier = src.session_identifier
when not matched then insert (session_identifier) values(src.session_identifier);
{% endset %}
{{ return(quarantine_query) }}
{% endmacro %}
{% macro postgres__base_quarantine_sessions(max_session_length, quarantined_sessions_str, src_relation) %}
{% set quarantined_sessions = ref(quarantined_sessions_str) %}
{% set sessions_to_quarantine_tmp = 'sessions_to_quarantine_tmp' %}
begin;
create temporary table {{ sessions_to_quarantine_tmp }} as (
{{ snowplow_utils.base_get_quarantine_sql(src_relation, max_session_length) }}
);
delete from {{ quarantined_sessions }}
where session_identifier in (select session_identifier from {{ sessions_to_quarantine_tmp }});
insert into {{ quarantined_sessions }} (
select session_identifier from {{ sessions_to_quarantine_tmp }});
drop table {{ sessions_to_quarantine_tmp }};
commit;
{% endmacro %}
Depends On
Referenced By
Cast To Tstamp​
macros/utils/cross_db/timestamp_functions.sql
Description
This macro does not currently have a description.
Details
Code
{% macro cast_to_tstamp(tstamp_literal) -%}
{#-
  Renders a cast of `tstamp_literal` to the adapter's timestamp type.
  none, 'null' (any case) and '' all render as a typed NULL; anything else is
  quoted as a string literal before casting.
-#}
{%- set is_null_literal = tstamp_literal is none or (tstamp_literal|lower) in ['null',''] -%}
{% if is_null_literal %}
cast(null as {{type_timestamp()}})
{% else %}
cast('{{tstamp_literal}}' as {{type_timestamp()}})
{% endif %}
{%- endmacro %}
Depends On
- Macros
- macro.dbt.type_timestamp
Referenced By
Cluster By Fields Sessions Lifecycle​
macros/utils/cross_db/cluster_by_fields.sql
Description
This macro does not currently have a description.
Details
Code
- raw
- default
{% macro cluster_by_fields_sessions_lifecycle() %}
{#- Adapter-dispatched cluster-by field list for the sessions lifecycle manifest. -#}
{% set dispatched_impl = adapter.dispatch('cluster_by_fields_sessions_lifecycle', 'snowplow_utils') %}
{{ return(dispatched_impl()) }}
{% endmacro %}
{% macro default__cluster_by_fields_sessions_lifecycle() %}
{#- BigQuery clusters on session_identifier, Snowflake on the start date; other targets get whatever get_value_by_target_type returns. -#}
{% set cluster_fields = snowplow_utils.get_value_by_target_type(
    bigquery_val=["session_identifier"],
    snowflake_val=["to_date(start_tstamp)"]
) %}
{{ return(cluster_fields) }}
{% endmacro %}
Depends On
Coalesce Field Paths​
macros/utils/bigquery/combine_column_versions/coalesce_field_paths.sql
Description
This macro does not currently have a description.
Details
Code
{% macro coalesce_field_paths(paths, field_alias, include_field_alias, relation_alias) %}
{#
  Builds `coalesce(<alias>.<path1>, <alias>.<path2>, ...)`, optionally followed by ` as <field_alias>`.
  Args:
    paths: list of field paths, in coalesce priority order.
    field_alias: alias appended when include_field_alias is truthy.
    include_field_alias: whether to append ` as <field_alias>`.
    relation_alias: optional table alias prefixed to every path; none or '' means no prefix.
  Returns: the coalesce expression as a string.
#}
{# Treat an empty-string alias like none; otherwise every path would start with a bare '.' #}
{% set relation_prefix = relation_alias ~ '.' if relation_alias else '' %}
{% set alias_suffix = ' as ' ~ field_alias if include_field_alias else '' %}
{% set joined_paths = relation_prefix ~ paths|join(', ' ~ relation_prefix) %}
{{ return('coalesce(' ~ joined_paths ~ ')' ~ alias_suffix) }}
{% endmacro %}
Referenced By
Combine Column Versions​
macros/utils/bigquery/combine_column_versions/combine_column_versions.sql
Description
This macro does not currently have a description.
Details
Code
{% macro combine_column_versions(relation, column_prefix, required_fields=[], nested_level=none, level_filter='equalto', relation_alias=none, include_field_alias=true, array_index=0, max_nested_level=15, exclude_versions=[]) %}
{#
  For every column of `relation` whose name starts with `column_prefix` (i.e. each version of a
  context/unstruct column), flattens the nested fields and returns, per field, one
  `coalesce(...)` expression across all the column versions.
  Args:
    relation: relation whose columns are inspected.
    column_prefix: shared prefix of the versioned columns.
    required_fields: optional list of field names, or (field_name, field_alias) tuples, to keep.
    nested_level / level_filter: optional depth filter ('equalto', 'lessthan', 'greaterthan');
      get_level_limit raises an error if this is combined with required_fields.
    relation_alias: optional table alias prefixed to every path.
    include_field_alias: append ` as <alias>` to each expression when true.
    array_index: element taken from REPEATED parents (passed to flatten_fields).
    max_nested_level: upper bound on how deep fields are flattened.
    exclude_versions: version suffixes (e.g. '1_0_0') whose columns are ignored.
  Returns: list of SQL strings, one coalesce expression per matched field. The default alias is
  the field name with '.' replaced by '_'.
#}
{# Create field_alias if not supplied i.e. is not tuple #}
{% set required_fields_tmp = required_fields %}
{% set required_fields = [] %}
{% for field in required_fields_tmp %}
{% set field_tuple = snowplow_utils.get_field_alias(field) %}
{% do required_fields.append(field_tuple) %}
{% endfor %}
{% set required_field_names = required_fields|map(attribute=0)|list %}
{# Determines correct level_limit. This limits recursive iterations during unnesting. #}
{% set level_limit = snowplow_utils.get_level_limit(nested_level, level_filter, required_field_names) %}
{# Limit level_limit to max_nested_level if required #}
{% set level_limit = max_nested_level if level_limit is none or level_limit > max_nested_level else level_limit %}
{%- set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(relation, column_prefix) -%}
{# Removes excluded versions, assuming column name ends with a version of format 'X_X_X' #}
{%- set filter_columns_by_version = snowplow_utils.exclude_column_versions(matched_columns, exclude_versions) -%}
{%- set flattened_fields_by_col_version = [] -%}
{# Flatten fields within each column version. Returns nested arrays of dicts. #}
{# Dict: {'field_name': str, 'path': str, 'nested_level': int} #}
{# Columns are visited in descending name order so that, presumably, the newest version's path comes first in the coalesce.
   NOTE(review): this is a string sort, so e.g. '..._1_0_9' is placed before '..._1_0_10' - confirm acceptable. #}
{% for column in filter_columns_by_version|sort(attribute='name', reverse=true) %}
{% set flattened_fields = snowplow_utils.flatten_fields(fields=column.fields,
parent=column,
path=column.name,
array_index=array_index,
level_limit=level_limit
) %}
{% do flattened_fields_by_col_version.append(flattened_fields) %}
{% endfor %}
{# Flatten nested arrays and merges fields across col version. Returns array of dicts containing all field_paths for field. #}
{# Dict: {'field_name': str, 'field_paths': list of str, 'nested_level': int} - keys as read below and in get_matched_fields; confirm against merge_fields_across_col_versions #}
{% set merged_fields = snowplow_utils.merge_fields_across_col_versions(flattened_fields_by_col_version) %}
{# Filters merged_fields based on required_fields if provided, or the level filter if provided. Default return all fields. #}
{% set matched_fields = snowplow_utils.get_matched_fields(fields=merged_fields,
required_field_names=required_field_names,
nested_level=nested_level,
level_filter=level_filter
) %}
{% set coalesced_field_paths = [] %}
{% for field in matched_fields %}
{% set passed_field_alias = required_fields|selectattr(0, "equalto", field.field_name)|map(attribute=1)|list %}
{% set default_field_alias = field.field_name|replace('.', '_') %}
{# Use passed_field_alias from required_fields if supplied #}
{% set field_alias = default_field_alias if not passed_field_alias|length else passed_field_alias[0] %}
{# Coalesce each field's path across all version of columns, ordered by latest col version. #}
{% set coalesced_field_path = snowplow_utils.coalesce_field_paths(paths=field.field_paths,
field_alias=field_alias,
include_field_alias=include_field_alias,
relation_alias=relation_alias) %}
{% do coalesced_field_paths.append(coalesced_field_path) %}
{% endfor %}
{# Returns array of all coalesced field paths #}
{{ return(coalesced_field_paths) }}
{% endmacro %}
Depends On
- Macros
- macro.snowplow_utils.coalesce_field_paths
- macro.snowplow_utils.exclude_column_versions
- macro.snowplow_utils.flatten_fields
- macro.snowplow_utils.get_columns_in_relation_by_column_prefix
- macro.snowplow_utils.get_field_alias
- macro.snowplow_utils.get_level_limit
- macro.snowplow_utils.get_matched_fields
- macro.snowplow_utils.merge_fields_across_col_versions
Referenced By
Current Timestamp In Utc​
macros/utils/cross_db/timestamp_functions.sql
Description
This macro does not currently have a description.
Details
Code
- raw
- default
- postgres
- redshift
- snowflake
{#
  current_timestamp_in_utc: renders an SQL expression for "now" expressed in UTC.
  Dispatched per adapter; the default and redshift versions render dbt's current_timestamp()
  unchanged (presumably already UTC on those targets - confirm).
#}
{% macro current_timestamp_in_utc() -%}
{%- set adapter_impl = adapter.dispatch('current_timestamp_in_utc', 'snowplow_utils') -%}
{{ return(adapter_impl()) }}
{%- endmacro %}
{% macro default__current_timestamp_in_utc() %}
{{current_timestamp()}}
{% endmacro %}
{# Postgres: shift current_timestamp to UTC, then cast to the adapter's timestamp type. #}
{% macro postgres__current_timestamp_in_utc() %}
(current_timestamp at time zone 'utc')::{{type_timestamp()}}
{% endmacro %}
{# Redshift: reuse the default rendering. #}
{% macro redshift__current_timestamp_in_utc() %}
{%- set default_expr = snowplow_utils.default__current_timestamp_in_utc() -%}
{{ return(default_expr) }}
{% endmacro %}
{# Snowflake: convert to UTC explicitly, then cast to the adapter's timestamp type. #}
{% macro snowflake__current_timestamp_in_utc() %}
convert_timezone('UTC', {{current_timestamp()}})::{{type_timestamp()}}
{% endmacro %}
Depends On
- Macros
- macro.dbt.current_timestamp
- macro.dbt.type_timestamp
Referenced By
- Models
- Macros
- model.snowplow_media_player.snowplow_media_player_media_stats
- model.snowplow_mobile.snowplow_mobile_app_errors_this_run
- model.snowplow_mobile.snowplow_mobile_screen_views_this_run
- model.snowplow_mobile.snowplow_mobile_sessions_this_run
- model.snowplow_web.snowplow_web_page_views_this_run
- model.snowplow_web.snowplow_web_sessions_this_run
- model.snowplow_web.snowplow_web_users_this_run
- model.snowplow_web.snowplow_web_vital_measurements
Exclude Column Versions​
macros/utils/bigquery/combine_column_versions/exclude_column_versions.sql
Description
This macro does not currently have a description.
Details
Code
{% macro exclude_column_versions(columns, exclude_versions) %}
{#
  Drops columns whose name ends with one of the excluded schema versions.
  Args:
    columns: column objects (anything with a `.name`), in any order.
    exclude_versions: list of version suffixes in 'X_Y_Z' form, e.g. ['1_0_0'].
      A single string is treated as a one-item list; none as an empty list.
  Returns: the columns that match no excluded version, in their original order.
#}
{%- set excluded_versions = [exclude_versions] if exclude_versions is string else (exclude_versions or []) -%}
{%- set filtered_columns_by_version = [] -%}
{% for column in columns %}
{#
  Match on the '_' ~ version suffix instead of the last 5 characters, so multi-digit versions
  (e.g. 1_0_10) work and a column ending '_11_0_0' is not mistaken for version 1_0_0.
#}
{%- set version_matches = [] -%}
{% for version in excluded_versions %}
{% if column.name.endswith('_' ~ version) %}
{% do version_matches.append(version) %}
{% endif %}
{% endfor %}
{% if version_matches|length == 0 %}
{% do filtered_columns_by_version.append(column) %}
{% endif %}
{% endfor %}
{{ return(filtered_columns_by_version) }}
{% endmacro %}
Referenced By
Flatten Fields​
macros/utils/bigquery/combine_column_versions/flatten_fields.sql
Description
This macro does not currently have a description.
Details
Code
{% macro flatten_fields(fields, parent, path, array_index, level_limit=none, level_counter=1, flattened_fields=[], field_name='') %}
{#
  Recursively walks the child `fields` of a BigQuery column/field, appending one dict per field to
  `flattened_fields`: {'field_name': dotted name, 'path': SQL access path, 'nested_level': depth}.
  Args:
    fields: child field objects of `parent` (read: name, dtype, mode, fields).
    parent: the column/field owning `fields`; when its mode is 'REPEATED' the path takes element
      [safe_offset(array_index)].
    path: SQL path to `parent`.
    array_index: array element used for REPEATED parents.
    level_limit: deepest nested_level to include (inclusive); none means unlimited.
    level_counter: depth of `fields` (1 for a top-level column's fields).
    flattened_fields: accumulator list, mutated in place.
    field_name: dotted name of `parent` relative to the top-level column ('' at the top).
  Returns: flattened_fields.
#}
{# Only recurse up-until level_limit. level_counter is fixed for this call, so check once. #}
{% if level_limit is not none and level_counter > level_limit %}
{{ return(flattened_fields) }}
{% endif %}
{# If parent column is an array then take element [array_index]. Same for every child. #}
{% set delimiter = '[safe_offset(%s)].'|format(array_index) if parent.mode == 'REPEATED' else '.' %}
{% for field in fields %}
{% set field_path = path ~ delimiter ~ field.name %}
{% set qualified_name = field_name ~ '.' ~ field.name if field_name != '' else field_name ~ field.name %}
{% do flattened_fields.append({
'field_name': qualified_name,
'path': field_path,
'nested_level': level_counter
}) %}
{# Recurse into nested records, but not into arrays of records. array_index is passed on so the required arg is never undefined. #}
{% if field.dtype == 'RECORD' and field.mode != 'REPEATED' %}
{% do snowplow_utils.flatten_fields(
fields=field.fields,
parent=field,
path=field_path,
array_index=array_index,
level_limit=level_limit,
level_counter=level_counter + 1,
flattened_fields=flattened_fields,
field_name=qualified_name
) %}
{% endif %}
{% endfor %}
{{ return(flattened_fields) }}
{% endmacro %}
Depends On
Referenced By
Get Array To String​
macros/utils/cross_db/get_array_to_string.sql
Description
This macro takes care of harmonising cross-db array to string type functions. The macro supports a custom delimiter if you don't want to use a comma with no space (default).
Arguments
array_column
(string): Name of the array column to join into a string
column_prefix
(string): Table alias for the array_column
delimiter
(string): (Optional) String used to separate the array values. Default ','
Returns
The data warehouse appropriate sql to convert an array to a string.
Usage
select
...
{{ snowplow_utils.get_array_to_string('my_array_column', 'a', ', ') }}
...
from ... a
Details
Code
- raw
- default
- spark
{#
  get_array_to_string: renders SQL joining `<column_prefix>.<array_column>` into a single string,
  separated by `delimiter` (default ','). Dispatched per adapter.
#}
{%- macro get_array_to_string(array_column, column_prefix, delimiter=',') -%}
{%- set array_to_string_impl = adapter.dispatch('get_array_to_string', 'snowplow_utils') -%}
{{ return(array_to_string_impl(array_column, column_prefix, delimiter)) }}
{%- endmacro -%}
{# Default: array_to_string(<array>, '<delimiter>') #}
{% macro default__get_array_to_string(array_column, column_prefix, delimiter=',') %}
array_to_string({{column_prefix}}.{{array_column}},'{{delimiter}}')
{% endmacro %}
{# Spark: array_join(<array>, '<delimiter>') #}
{% macro spark__get_array_to_string(array_column, column_prefix, delimiter=',') %}
array_join({{column_prefix}}.{{array_column}},'{{delimiter}}')
{% endmacro %}
Referenced By
Get Cluster By​
macros/utils/get_cluster_by.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_cluster_by(bigquery_cols=none, snowflake_cols=none) %}
{#
  Deprecated (emits a warning on every call): use get_value_by_target_type instead.
  Returns bigquery_cols on BigQuery and snowflake_cols on Snowflake; on any other target
  nothing is returned explicitly.
#}
{%- do exceptions.warn("Warning: the `get_cluster_by` macro is deprecated and will be removed in a future version of the package, please use `get_value_by_target_type` instead.") -%}
{% set cols_by_target_type = {'bigquery': bigquery_cols, 'snowflake': snowflake_cols} %}
{% if target.type in cols_by_target_type %}
{{ return(cols_by_target_type[target.type]) }}
{% endif %}
{% endmacro %}
Get Columns In Relation By Column Prefix​
macros/utils/get_columns_in_relation_by_column_prefix.sql
Description
This macro returns an array of column objects within a relation that start with the given column prefix. This is useful when you have multiple versions of a column within a table and want to dynamically identify all versions.
Arguments
relation
(relation): A table or `ref` type object to get the columns from
column_prefix
(string): The prefix string to search for matching columns
Returns
An array of [column objects](https://docs.getdbt.com/reference/dbt-classes#column). The name of each column can be accessed with the `name` property.
Usage
get_columns_in_relation_by_column_prefix(ref('snowplow_web_base_events_this_run'), 'domain')
-- returns the column objects whose names are
['domain_sessionid', 'domain_userid', 'domain_sessionidx',...]
{% set matched_columns = snowplow_utils.get_columns_in_relation_by_column_prefix(
relation=ref('snowplow_web_base_events_this_run'),
column_prefix='custom_context_1_0_'
) %}
{% for column in matched_columns %}
{{ column.name }}
{% endfor %}
-- Renders
-- to something like:
'custom_context_1_0_1'
'custom_context_1_0_2'
'custom_context_1_0_3'
Details
Code
{% macro get_columns_in_relation_by_column_prefix(relation, column_prefix) %}
{#
  Returns the column objects of `relation` whose name starts with `column_prefix`.
  On Snowflake the prefix is upper-cased before matching. During parsing (not execute) it returns ''.
  Raises a compiler error when no column matches.
#}
{# Prevent introspective queries during parsing #}
{%- if not execute -%}
{{ return('') }}
{% endif %}
{# get_columns_in_relation returns uppercase cols for snowflake so uppercase column_prefix #}
{%- set prefix_to_match = column_prefix.upper() if target.type == 'snowflake' else column_prefix -%}
{%- set matched_columns = [] -%}
{% for candidate in adapter.get_columns_in_relation(relation) if candidate.name.startswith(prefix_to_match) %}
{% do matched_columns.append(candidate) %}
{% endfor %}
{% if not matched_columns %}
{{ exceptions.raise_compiler_error("Snowplow: No columns found with prefix "~prefix_to_match) }}
{% endif %}
{{ return(matched_columns) }}
{% endmacro %}
Referenced By
Get Enabled Snowplow Models​
macros/incremental_hooks/get_enabled_snowplow_models.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_enabled_snowplow_models(package_name, graph_object=none, models_to_run=var("models_to_run", ""), base_events_table_name='snowplow_base_events_this_run') -%}
{#
  Returns the names of enabled models tagged '<package_name>_incremental', optionally restricted
  to `models_to_run`.
  Args:
    package_name: package prefix used to build the incremental tag.
    graph_object: optional replacement for dbt's `graph` (testing).
    models_to_run: whitespace-separated string (e.g. `dbt ls` output) or list of model names;
      empty means every model.
    base_events_table_name: model whose direct dependants must carry the incremental tag.
  Raises (only when `execute`): a compiler error if an enabled model depends directly on the
  base events model but is untagged, or if no tagged model is enabled.
#}
{# Override dbt graph object if graph_object is passed. Testing purposes #}
{% if graph_object is not none %}
{% set graph = graph_object %}
{% endif %}
{#
  split() with no argument splits on any whitespace run (spaces, tabs, newlines) and drops empty
  entries; split(" ") left newline-separated names fused together. A list is also accepted.
#}
{% if models_to_run is string %}
{% set selected_models = models_to_run.split() %}
{% elif models_to_run %}
{% set selected_models = models_to_run|list %}
{% else %}
{% set selected_models = [] %}
{% endif %}
{# No selection means "all models" #}
{% if not selected_models %}
{% set selected_models = none %}
{% endif %}
{% set enabled_models = [] %}
{% set untagged_snowplow_models = [] %}
{% set snowplow_model_tag = package_name+'_incremental' %}
{% set snowplow_events_this_run_path = 'model.' + project_name + '.' + base_events_table_name %}
{% if execute %}
{% set nodes = graph.nodes.values() | selectattr("resource_type", "equalto", "model") %}
{% for node in nodes %}
{# If selected_models is specified, filter for these models #}
{% if selected_models is none or node.name in selected_models %}
{% if node.config.enabled and snowplow_model_tag not in node.tags and snowplow_events_this_run_path in node.depends_on.nodes %}
{%- do untagged_snowplow_models.append(node.name) -%}
{% endif %}
{% if node.config.enabled and snowplow_model_tag in node.tags %}
{%- do enabled_models.append(node.name) -%}
{% endif %}
{% endif %}
{% endfor %}
{% if untagged_snowplow_models|length %}
{#
  Raises a compiler error (despite the "Warning" wording in the message) for models that reference
  the base events model but are not tagged '{package_name}_incremental'.
  Without this tagging these models will not be inserted into the manifest, breaking the incremental logic.
  Only catches first degree dependencies rather than all downstream models
#}
{%- do exceptions.raise_compiler_error("Snowplow Warning: Untagged models referencing '"+snowplow_events_this_run_path+"'. Please refer to the Snowplow docs on tagging. "
+ "Models: "+ ', '.join(untagged_snowplow_models)) -%}
{% endif %}
{% if enabled_models|length == 0 %}
{%- do exceptions.raise_compiler_error("No enabled models identified.") -%}
{% endif %}
{% endif %}
{{ return(enabled_models) }}
{%- endmacro %}
Referenced By
- Models
- Macros
Get Field​
macros/utils/cross_db/get_field.sql
Description
This macro exists to make it easier to extract a field from our unstruct_
and contexts_
type columns for users in Snowflake, Databricks, and BigQuery (although you may prefer to use combine_column_versions
for BigQuery, as this manages multiple context versions and allows for extraction of multiple fields at the same time). The macro can handle type casting and selecting from arrays.
Arguments
column_name
(string): Name of the column to extract the field from
field_name
(string): Name of the field to extract
table_alias
(string): (Optional) Alias of the table in your query that the column exists in. Default `none` (no table alias)
type
(string): (Optional) Type to cast the field to if required. Default `none` (no casting)
array_index
(integer): (Optional) Index of the array element to select in case of multiple entries. Uses `SAFE_OFFSET` for BigQuery. Default `none` (not an array)
Returns
SQL snippet to select the field specified from the column
Usage
Extracting a single field
select
{{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
field_name = 'agent_class',
table_alias = 'a',
type = 'string',
array_index = 0)}} as yauaa_agent_class
from
my_events_table a
Extracting multiple fields
select
{% for field in [('field1', 'string'), ('field2', 'numeric'), ...] %}
{{ snowplow_utils.get_field(column_name = 'contexts_nl_basjes_yauaa_context_1',
field_name = field[0],
table_alias = 'a',
type = field[1],
array_index = 0)}} as {{ field[0] }}
{% endfor %}
from
my_events_table a
Details
Code
- raw
- bigquery
- default
- snowflake
- spark
{#
  get_field: renders SQL extracting `field_name` from a self-describing (unstruct_/contexts_) column.
  Args:
    column_name: column holding the object, or array of objects.
    field_name: field to extract.
    table_alias: optional alias, rendered as `<alias>.` before the column.
    type: optional type to cast the extracted value to.
    array_index: optional element index, used when the column is an array.
  Dispatched per adapter. Only bigquery, snowflake and spark are implemented; Databricks
  presumably resolves to the spark version (confirm).
#}
{% macro get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{{ return(adapter.dispatch('get_field', 'snowplow_utils')(column_name, field_name, table_alias, type, array_index)) }}
{% endmacro %}
{# BigQuery: cast(<alias>.<col>[SAFE_OFFSET(i)].<field> as <type>); the cast and offset are rendered only when supplied. #}
{% macro bigquery__get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{%- if type -%}cast({%- endif -%}{%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[SAFE_OFFSET({{array_index}})]{%- endif -%}.{{field_name}}{%- if type %} as {{type}}){%- endif -%}
{% endmacro %}
{# Any other adapter: raise a compiler error at execution time (renders nothing while parsing). #}
{% macro default__get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{% if execute %}
{% do exceptions.raise_compiler_error('Macro get_field only supports Bigquery, Snowflake, Spark, and Databricks, it is not supported for ' ~ target.type) %}
{% endif %}
{% endmacro %}
{# Snowflake: <alias>.<col>[i]:<field>::<type>; warns at execution time when no type is given, since the result stays a variant. #}
{% macro snowflake__get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{%- if type is none and execute -%}
{% do exceptions.warn("Warning: macro snowplow_utils.get_field is being used without a type provided, Snowflake will return a variant column in this case which is unlikely to be what you want.") %}
{%- endif -%}
{%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}:{{field_name}}{%- if type -%}::{{type}}{%- endif -%}
{% endmacro %}
{# Spark: <alias>.<col>[i].<field>::<type> #}
{% macro spark__get_field(column_name, field_name, table_alias = none, type = none, array_index = none) %}
{%- if table_alias -%}{{table_alias}}.{%- endif -%}{{column_name}}{%- if array_index is not none -%}[{{array_index}}]{%- endif -%}.{{field_name}}{%- if type -%}::{{type}}{%- endif -%}
{% endmacro %}
Referenced By
Get Field Alias​
macros/utils/bigquery/combine_column_versions/get_field_alias.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_field_alias(field) %}
{#
  Normalises a required-field spec into a (field_name, field_alias) pair.
  A non-string iterable (e.g. a tuple) is assumed to already be (name, alias) and is returned
  unchanged; anything else gets a default alias with '.' replaced by '_'.
#}
{% set is_plain_name = field is string or field is not iterable %}
{% if is_plain_name %}
{{ return((field, field|replace('.', '_'))) }}
{% endif %}
{{ return(field) }}
{% endmacro %}
Referenced By
Get Incremental Manifest Status​
macros/incremental_hooks/get_incremental_manifest_status.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_incremental_manifest_status(incremental_manifest_table, models_in_run) -%}
{#
  Summarises the incremental manifest for `models_in_run`.
  Returns [min_last_success, max_last_success, models_matched_from_manifest, has_matched_all_models].
  - While parsing (not execute): ['', '', '', ''], and no query is run.
  - When the manifest table does not exist: warns and returns
    ['9999-01-01 00:00:00', '9999-01-01 00:00:00', 0, false].
#}
{# In case of not execute just return empty strings to avoid hitting database #}
{% if not execute %}
{{ return(['', '', '', '']) }}
{% endif %}
{% set manifest_relation = adapter.get_relation(
database=incremental_manifest_table.database,
schema=incremental_manifest_table.schema,
identifier=incremental_manifest_table.name) %}
{% if manifest_relation is none %}
{% do exceptions.warn("Snowplow Warning: " ~ incremental_manifest_table ~ " does not exist. This is expected if you are compiling a fresh installation of the dbt-snowplow-* packages.") %}
{{ return(['9999-01-01 00:00:00', '9999-01-01 00:00:00', 0, false]) }}
{% endif %}
{% set last_success_query %}
select min(last_success) as min_last_success,
max(last_success) as max_last_success,
coalesce(count(*), 0) as models
from {{ incremental_manifest_table }}
where model in ({{ snowplow_utils.print_list(models_in_run) }})
{% endset %}
{# An aggregate without group by always yields exactly one row #}
{% set summary_row = run_query(last_success_query).rows[0] %}
{% set models_matched_from_manifest = summary_row[2] %}
{{ return([summary_row[0], summary_row[1], models_matched_from_manifest, models_matched_from_manifest == models_in_run|length]) }}
{%- endmacro %}
Depends On
- Macros
- macro.dbt.run_query
- macro.snowplow_utils.print_list
Referenced By
Get Incremental Manifest Table Relation​
macros/incremental_hooks/get_incremental_manifest_table_relation.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_incremental_manifest_table_relation(package_name) %}
{#- Returns the relation for ref('<package_name>_incremental_manifest'). -#}
{{ return(ref(package_name ~ '_incremental_manifest')) }}
{% endmacro %}
Referenced By
Get Level Limit​
macros/utils/bigquery/combine_column_versions/get_level_limit.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_level_limit(level, level_filter, required_field_names) %}
{#
  Validates the field-filter arguments, then works out how deep flatten_fields must descend.
  Args:
    level: requested nested level, or none.
    level_filter: one of 'equalto', 'lessthan', 'greaterthan'.
    required_field_names: dotted field names explicitly requested (may be empty).
  Returns: the inclusive level limit, or none for no limit.
  Errors (via snowplow_utils.throw_compiler_error): unknown level_filter; both level and
  required_field_names supplied; required_field_names with a level_filter other than 'equalto'.
#}
{% set accepted_level_filters = ['equalto','lessthan','greaterthan'] %}
{% if level_filter not in accepted_level_filters %}
{% set incompatible_level_filter_error_message -%}
Error: Incompatible level filter arg. Accepted args: {{accepted_level_filters|join(', ')}}
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(incompatible_level_filter_error_message)) }}
{% endif %}
{% set has_required_fields = required_field_names|length > 0 %}
{% if has_required_fields and level is not none %}
{% set double_filter_error_message -%}
Error: Cannot filter fields by both `required_fields` and `level` arg. Please use only one.
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(double_filter_error_message)) }}
{% endif %}
{% if has_required_fields and level_filter != 'equalto' %}
{% set required_fields_error_message -%}
Error: To filter fields using `required_fields` arg, `level_filter` must be set to `equalto`
{%- endset %}
{{ return(snowplow_utils.throw_compiler_error(required_fields_error_message)) }}
{% endif %}
{# level_limit is inclusive; none means no limit (all fields) #}
{% set level_limit = none %}
{% if level is not none %}
{% if level_filter == 'equalto' %}
{% set level_limit = level %}
{% elif level_filter == 'lessthan' %}
{% set level_limit = level - 1 %}
{% endif %}
{# 'greaterthan' keeps level_limit as none: every deeper level is needed #}
{% elif has_required_fields %}
{# The deepest requested field sets the limit, e.g. 'a.b.c' sits at level 3 #}
{% set field_depths = [] %}
{% for required_name in required_field_names %}
{% do field_depths.append(required_name.split('.')|length) %}
{% endfor %}
{% set level_limit = field_depths|max %}
{% endif %}
{{ return(level_limit) }}
{% endmacro %}
Depends On
Referenced By
Get Matched Fields​
macros/utils/bigquery/combine_column_versions/get_matched_fields.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_matched_fields(fields, required_field_names, nested_level, level_filter) %}
{#
  Filters merged field dicts.
  - If required_field_names is non-empty: keep fields whose field_name is in it.
  - Else if nested_level is set: keep fields whose nested_level passes the `level_filter` test
    (e.g. 'equalto') against nested_level.
  - Else: return `fields` unchanged.
#}
{% if required_field_names|length %}
{{ return(fields|selectattr('field_name','in', required_field_names)|list) }}
{% elif nested_level is not none %}
{{ return(fields|selectattr('nested_level',level_filter, nested_level)|list) }}
{% endif %}
{{ return(fields) }}
{% endmacro %}
Referenced By
Get New Event Limits Table Relation​
macros/incremental_hooks/get_new_event_limits_table_relation.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_new_event_limits_table_relation(package_name) %}
{#- Returns the relation for ref('<package_name>_base_new_event_limits'). -#}
{{ return(ref(package_name ~ '_base_new_event_limits')) }}
{% endmacro %}
Referenced By
Get Optional Fields​
macros/utils/bigquery/get_optional_fields.sql
Description
This macro does not currently have a description.
Details
Code
{% macro get_optional_fields(enabled, fields, col_prefix, relation, relation_alias, include_field_alias=true) -%}
{%- if enabled -%}
{%- set combined_fields = snowplow_utils.combine_column_versions(
relation=relation,
column_prefix=col_prefix,
required_fields=fields|map(attribute='field')|list,
relation_alias=relation_alias,
include_field_alias=include_field_alias
) -%}
{{ combined_fields|join(',\n') }}
{%- else -%}
{% for field in fields %}
{%- set field_alias = snowplow_utils.get_field_alias(field.field)[1] -%}
cast(null as {{ field.dtype }}){%- if include_field_alias %} as {{ field_alias }}{%- endif %} {%- if not loop