Skip to content

Macros

Global usage notes

source_model syntax

dbt itself supports references to data via the ref() function for models, and the source() function for dbt sources.

AutomateDV provides the means for specifying sources for Data Vault structures with a source_model argument.

This behaves differently for the stage macro, which supports either style, shown below:

ref style

1
2
stg_customer:
  source_model: 'raw_customer'

source style

1
2
3
stg_customer:
  source_model:
    tpch_sample: 'LINEITEM'
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
version: 2

sources:
  - name: tpch_sample
    database: SNOWFLAKE_SAMPLE_DATA
    schema: TPCH_SF10
    tables:
      - name: LINEITEM
      - name: CUSTOMER
      - name: ORDERS
      - name: PARTSUPP
      - name: SUPPLIER
      - name: PART
      - name: NATION
      - name: REGION

The mapping provided for the source style is in the form source_name: table_name which mimics the syntax for the source() macro.

For all other structures (Hub, Link, Satellite, etc.) the source_model argument must be a string to denote a single staging source, or a list of strings to denote multiple staging sources, which must be names of models (minus the .sql).

Global variables

AutomateDV provides user-overridable global variables which allow you to configure different aspects of AutomateDV. These variables will be expanded in future versions of AutomateDV.

Hashing configuration

1
2
3
4
5
vars:
  hash: MD5
  concat_string: '||'
  null_placeholder_string: '^^'
  hash_content_casing: 'UPPER'

hash

Configure the type of hashing.

This can be one of:

  • MD5
  • SHA

Read more

concat_string

Configure the string value to use for concatenating strings together when hashing. By default, this is two pipe characters: '||'

Read more

null_placeholder_string

Configure the string value to use for replacing NULL values when hashing. By default, this is two caret characters: '^^'

hash_content_casing

This variable configures whether hashed columns are normalised with UPPER() when calculating the hash value.

This can be one of:

  • UPPER
  • DISABLED
1
2
3
source_model: raw_source
hashed_columns:
  CUSTOMER_HK: CUSTOMER_ID
1
CAST((MD5_BINARY(NULLIF(UPPER(TRIM(CAST(CUSTOMER_ID AS VARCHAR))), ''))) AS BINARY(16)) AS CUSTOMER_HK
1
2
3
source_model: raw_source
hashed_columns:
  CUSTOMER_HK: CUSTOMER_ID
1
CAST((MD5_BINARY(NULLIF(TRIM(CAST(CUSTOMER_ID AS VARCHAR)), ''))) AS BINARY(16)) AS CUSTOMER_HK

Info

We've added this config to give you more options when hashing. If there is logical difference between uppercase and lowercase values in your data, set this to DISABLED otherwise, the standard approach is to use UPPER

Ghost Record configuration

1
2
3
vars:
  enable_ghost_records: false
  system_record_value: 'AUTOMATE_DV_SYSTEM'

How Ghost Records work

In the Data Vault standards, ghost records are intended to provide equi-join capabilities for PIT tables when queries on a satellite at a point in time would otherwise return no records. Instead of having to handle NULLs and incur performance penalties for joins which do not return records, the ghost record is a single record inserted into a Satellite upon its first creation which can be used instead.

In AutomateDV this is implemented as an optional CTE which only gets created in the above circumstances and when the enable_ghost_records global variable is set to true.

A Ghost Record does not inherently mean anything (it is for performance only), and so the value of each column is set to NULL or a sensible meaningless value.

The below tables describe what a ghost record will look like:

Parameter Value
src_pk Binary MD5/SHA256) 0000..(x32) / 0000..(x64)
src_hashdiff (Binary MD5/SHA256) 0000..(x32) / 0000..(x64)
src_payload (Any) NULL
src_extra_columns (Any) NULL
src_eff (Date/Timestamp) 1900-01-01 / 1900-01-01 00:00:00
src_ldts (Date/Timestamp) 1900-01-01 / 1900-01-01 00:00:00
src_source (String) AUTOMATE_DV_SYSTEM (default)
CUSTOMER_HK HASHDIFF CUSTOMER_NAME CUSTOMER_DOB CUSTOMER_PHONE EFFECTIVE_FROM LOAD_DATETIME RECORD_SOURCE
000000... 000000... NULL NULL NULL 1900-01-01 00:00:00 1900-01-01 00:00:00 AUTOMATE_DV_SYSTEM

Ghost record source code

Check out how this works under-the-hood

enable_ghost_records

Enable the use of ghost records in your project. This can either be true or false, true will enable the configuration and false will disable it.

This will insert a ghost record to a satellite table whether it is a new table or pre-existing.

Before adding the ghost record, the satellite macro will check there is not already one loaded.

Note

If this is enabled on an existing project, the ghost-records will be inserted into the satellite on the first dbt run after enabling only

system_record_value

This will set the record source system for the ghost record. The default is AUTOMATE_DV_SYSTEM and can be changed to any string.

Note

If this is changed on an existing project, the source system of already loaded ghost records will not be changed unless you --full-refresh.

NULL Key configurations

1
2
3
vars:
  null_key_required: '-1'
  null_key_optional: '-2'

null_key_required

Configure the string value to use for replacing NULL values found in keys where a value is required, e.g. prior to hashing. By default, this is '-1'.

null_key_optional

Configure the string value to use for replacing NULL values found in optional keys. By default, this is '-2'.

Read more

Other global variables

1
2
3
4
vars:
  escape_char_left: '"'
  escape_char_right: '"'
  max_datetime: '9999-12-31 23:59:59.999999'

max_datetime

Configure the value for the maximum datetime.

This value will be used for showing that a record's effectivity is 'open' or 'current' in certain circumstances.

The default is variations on 9999-12-31 23:59:59.999999 where there is more or less nanosecond precision (9's after the .) depending on platform.

escape_char_left/escape_char_right

Configure the characters to use to delimit SQL column names when escaping. Column names are delimited when using the escaping feature of AutomateDV, and by default both the delimiting characters are double quotes following the SQL:1999 standard.

Here are some examples for different platforms:

1
2
3
4
...
vars:
  escape_char_left: '`'
  escape_char_right: '`'
1
2
3
4
...
vars:
  escape_char_left: '['
  escape_char_right: ']'
1
2
3
4
...
vars:
  escape_char_left: '"'
  escape_char_right: '"'

Table templates

(macros/tables)

These macros are the core of the package and can be called in your models to build the different types of tables needed for your Data Vault 2.0 Data Warehouse.

hub

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Hub table using the provided parameters.

Usage

1
2
3
{{ automate_dv.hub(src_pk=src_pk, src_nk=src_nk, src_ldts=src_ldts,
                   src_extra_columns=src_extra_columns,
                   src_source=src_source, source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column List[String]/String
src_nk Source natural key column List[String]/String
src_extra_columns Select arbitrary columns from the source List[String]/String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID List[String]/String
source_model Staging model name List[String]/String
Video Tutorial

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
    LEFT JOIN ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.hub_customer_incremental AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_orders AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_orders AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
    LEFT JOIN ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.hub_orders_multi_source_incremental AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
    LEFT JOIN `dbtvault-341416`.`dbtvault`.`hub_customer_incremental` AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_orders` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
    row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_orders` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
    row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
    LEFT JOIN `dbtvault-341416`.`dbtvault`.`hub_orders_multi_source_incremental` AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
WITH row_rank_1 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
    LEFT JOIN "AUTOMATE_DV_TEST"."TEST"."hub_customer_incremental" AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH row_rank_1 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
row_rank_2 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_orders" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT *
    FROM (
        SELECT ru.*,
               ROW_NUMBER() OVER(
                   PARTITION BY ru.CUSTOMER_HK
                   ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
               ) AS row_rank_number
        FROM stage_union AS ru
        WHERE ru.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
WITH row_rank_1 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
row_rank_2 AS (
    SELECT CUSTOMER_HK, CUSTOMER_ID, LOAD_DATETIME, RECORD_SOURCE
    FROM (
        SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
               ROW_NUMBER() OVER(
                   PARTITION BY rr.CUSTOMER_HK
                   ORDER BY rr.LOAD_DATETIME
               ) AS row_number
        FROM "AUTOMATE_DV_TEST"."TEST"."stg_orders" AS rr
        WHERE rr.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT *
    FROM (
        SELECT ru.*,
               ROW_NUMBER() OVER(
                   PARTITION BY ru.CUSTOMER_HK
                   ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
               ) AS row_rank_number
        FROM stage_union AS ru
        WHERE ru.CUSTOMER_HK IS NOT NULL
    ) h
    WHERE h.row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
    LEFT JOIN "AUTOMATE_DV_TEST"."TEST"."hub_orders_multi_source_incremental" AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH row_rank_1 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
WITH row_rank_1 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
    LEFT JOIN "dbtvault_db"."development"."hub_customer_incremental" AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
WITH row_rank_1 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
row_rank_2 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_orders" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (SELECT DISTINCT ON (ru.CUSTOMER_HK) ru.*
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    ORDER BY ru.CUSTOMER_HK, ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
WITH row_rank_1 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
row_rank_2 AS (
    SELECT DISTINCT ON (rr.CUSTOMER_HK) rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_orders" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    ORDER BY rr.CUSTOMER_HK, rr.LOAD_DATETIME
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (SELECT DISTINCT ON (ru.CUSTOMER_HK) ru.*
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    ORDER BY ru.CUSTOMER_HK, ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
    LEFT JOIN "dbtvault_db"."development"."hub_orders_multi_source_incremental" AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
    LEFT JOIN `hive_metastore`.`dbtvault`.`hub_customer_incremental` AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_orders` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
row_rank_2 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_orders` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    QUALIFY row_number = 1
),
stage_union AS (
    SELECT * FROM row_rank_1
    UNION ALL
    SELECT * FROM row_rank_2
),
row_rank_union AS (
    SELECT ru.*,
           ROW_NUMBER() OVER(
               PARTITION BY ru.CUSTOMER_HK
               ORDER BY ru.LOAD_DATETIME, ru.RECORD_SOURCE ASC
           ) AS row_rank_number
    FROM stage_union AS ru
    WHERE ru.CUSTOMER_HK IS NOT NULL
    QUALIFY row_rank_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_union AS a
    LEFT JOIN `hive_metastore`.`dbtvault`.`hub_orders_multi_source_incremental` AS d
    ON a.CUSTOMER_HK = d.CUSTOMER_HK
    WHERE d.CUSTOMER_HK IS NULL
)
SELECT * FROM records_to_insert

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Link table using the provided parameters.

Usage

1
2
3
{{ automate_dv.link(src_pk=src_pk, src_fk=src_fk, src_ldts=src_ldts,
                    src_extra_columns=src_extra_columns,
                    src_source=src_source, source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column List[String]/String
src_fk Source foreign key column(s) List[String]
src_extra_columns Select arbitrary columns from the source List[String]/String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID List[String]/String
source_model Staging model name List[String]/String
Video Tutorial

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
WITH row_rank_1 AS (
    SELECT *
    FROM
    (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    ) l
    WHERE l.row_number = 1
    ),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
WITH row_rank_1 AS (
    SELECT *
    FROM
    (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    ) l
    WHERE l.row_number = 1
    ),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
WITH row_rank_1 AS (
    SELECT *
    FROM
    (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    ) l
    WHERE l.row_number = 1
    ),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
WITH row_rank_1 AS (
    SELECT *
    FROM
    (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    ) l
    WHERE l.row_number = 1
    ),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
  SELECT * FROM (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
  ) as l
  WHERE row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
  SELECT * FROM (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
  ) as l
  WHERE row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
  SELECT * FROM (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
  ) as l
  WHERE row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
WITH row_rank_1 AS (
  SELECT * FROM (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM "dbtvault_db"."development"."stg_customer" AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
  ) as l
  WHERE row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
WITH row_rank_1 AS (
    SELECT rr.CUSTOMER_HK, rr.CUSTOMER_ID, rr.LOAD_DATETIME, rr.RECORD_SOURCE,
           ROW_NUMBER() OVER(
               PARTITION BY rr.CUSTOMER_HK
               ORDER BY rr.LOAD_DATETIME
           ) AS row_number
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS rr
    WHERE rr.CUSTOMER_HK IS NOT NULL
    AND rr.CUSTOMER_ID IS NOT NULL
    QUALIFY row_number = 1
),
records_to_insert AS (
    SELECT a.CUSTOMER_HK, a.CUSTOMER_ID, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM row_rank_1 AS a
)
SELECT * FROM records_to_insert

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Transactional Link table using the provided parameters.

Usage

1
2
3
4
{{ automate_dv.t_link(src_pk=src_pk, src_fk=src_fk, src_payload=src_payload,
                   src_extra_columns=src_extra_columns,
                   src_eff=src_eff, src_ldts=src_ldts, 
                   src_source=src_source, source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column List[String]/String
src_fk Source foreign key column(s) List[String]
src_payload Source payload column(s) List[String]
src_extra_columns Select arbitrary columns from the source List[String]/String
src_eff Source effective from column String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID String
source_model Staging model name String

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_transactions
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_transactions
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
    LEFT JOIN ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.t_link_transactions_incremental AS tgt
    ON stg.TRANSACTION_HK = tgt.TRANSACTION_HK
    WHERE tgt.TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_transactions`
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_transactions`
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
    LEFT JOIN `dbtvault-341416`.`dbtvault`.`t_link_transactions_incremental` AS tgt
    ON stg.TRANSACTION_HK = tgt.TRANSACTION_HK
    WHERE tgt.TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_transactions"
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_transactions"
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
    LEFT JOIN "AUTOMATE_DV_TEST"."TEST"."t_link_transactions_incremental" AS tgt
    ON stg.TRANSACTION_HK = tgt.TRANSACTION_HK
    WHERE tgt.TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_transactions"
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_transactions"
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
    LEFT JOIN "dbtvault_db"."development"."t_link_transactions_incremental" AS tgt
    ON stg.TRANSACTION_HK = tgt.TRANSACTION_HK
    WHERE tgt.TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_transactions`
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
WITH stage AS (
    SELECT TRANSACTION_HK, CUSTOMER_HK, ORDER_HK, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, o_totalprice, o_orderstatus, EFFECTIVE_FROM, LOAD_DATETIME, RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_transactions`
    WHERE TRANSACTION_HK IS NOT NULL
    AND CUSTOMER_HK IS NOT NULL
    AND ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT DISTINCT stg.TRANSACTION_HK, stg.CUSTOMER_HK, stg.ORDER_HK, stg.o_orderdate, stg.o_orderpriority, stg.o_clerk, stg.o_shippriority, stg.o_comment, stg.o_totalprice, stg.o_orderstatus, stg.EFFECTIVE_FROM, stg.LOAD_DATETIME, stg.RECORD_SOURCE
    FROM stage AS stg
    LEFT JOIN `hive_metastore`.`dbtvault`.`t_link_transactions_incremental` AS tgt
    ON stg.TRANSACTION_HK = tgt.TRANSACTION_HK
    WHERE tgt.TRANSACTION_HK IS NULL
)
SELECT * FROM records_to_insert

sat

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Satellite table using the provided parameters.

Usage

1
2
3
4
{{ automate_dv.sat(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload,
                   src_extra_columns=src_extra_columns,
                   src_eff=src_eff, src_ldts=src_ldts, 
                   src_source=src_source, source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column List[String]/String
src_hashdiff Source hashdiff column String
src_payload Source payload column(s) List[String]
src_extra_columns Select arbitrary columns from the source List[String]/String
src_eff Source effective from column String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID String
source_model Staging model name String
Video Tutorial

Tip

Read the tutorial for more details

Satellite Behaviour Flags

This section covers global variables (var) and config options that affect the behaviour of satellites.

Parameter Description Type Flag type Required?
apply_source_filter Adds additional logic to filter the source_model data Boolean config
enable_ghost_records Adds a single ghost record to the satellite Boolean var

Added in v0.10.1

This config option adds a WHERE clause (in incremental mode) using an additional CTE in the SQL code to filter the source_model's data

This ensures that records in the source data are filtered so that only records with src_ldts after the MAX ldts in the existing Satellite are processed during the satellite load.

It is intended for this config option to be used if you cannot guarantee atomic/idempotent batches i.e. only data which has not been loaded yet in your stage data.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
-- sat_customer_details.sql
{{
  config(
    apply_source_filter = true
  )
}}

{% set src_pk = ... %}
...

{{ automate_dv.sat(src_pk=src_pk, src_hashdiff=src_hashdiff, src_payload=src_payload,
                   src_extra_columns=src_extra_columns,
                   src_eff=src_eff, src_ldts=src_ldts, 
                   src_source=src_source, source_model=source_model) }}

This global variable option enables additional logic to add a ghost record upon first creation OR once when running in incremental mode if a ghost record has not already been added.

Read more about ghost records.

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.satellite_incremental AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    NULL AS CUSTOMER_NAME,
    NULL AS CUSTOMER_PHONE,
    NULL AS CUSTOMER_ADDRESS,
    NULL AS ACCBAL,
    NULL AS MKTSEGMENT,
    NULL AS COMMENT,
    TO_TIMESTAMP_NTZ('1900-01-01 00:00:00') AS LOAD_DATETIME,
    TO_TIMESTAMP_NTZ('1900-01-01 00:00:00') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS VARCHAR) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS BINARY(16)) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS BINARY(16)) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
    UNION
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.satellite_ghost_incremental AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    NULL AS CUSTOMER_NAME,
    NULL AS CUSTOMER_PHONE,
    NULL AS CUSTOMER_ADDRESS,
    NULL AS ACCBAL,
    NULL AS MKTSEGMENT,
    NULL AS COMMENT,
    TO_TIMESTAMP_NTZ('1900-01-01 00:00:00') AS LOAD_DATETIME,
    TO_TIMESTAMP_NTZ('1900-01-01 00:00:00') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS VARCHAR) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS BINARY(16)) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS BINARY(16)) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
        WHERE NOT EXISTS ( SELECT 1 FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.satellite_ghost_incremental AS h WHERE h.HASHDIFF = g.HASHDIFF )
    UNION
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION DISTINCT
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM `dbtvault-341416`.`dbtvault`.`satellite_incremental` AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION DISTINCT
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    CAST(NULL AS STRING) AS CUSTOMER_NAME,
    CAST(NULL AS STRING) AS CUSTOMER_PHONE,
    CAST(NULL AS STRING) AS CUSTOMER_ADDRESS,
    CAST(NULL AS FLOAT64) AS ACCBAL,
    CAST(NULL AS STRING) AS MKTSEGMENT,
    CAST(NULL AS STRING) AS COMMENT,
    PARSE_DATETIME('%F %H:%M:%E6S', '1900-01-01 00:00:00') AS LOAD_DATETIME,
    PARSE_DATETIME('%F %H:%M:%E6S', '1900-01-01 00:00:00') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS STRING) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS STRING) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS STRING) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
    UNION DISTINCT
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION DISTINCT
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM `dbtvault-341416`.`dbtvault`.`satellite_ghost_incremental` AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    CAST(NULL AS STRING) AS CUSTOMER_NAME,
    CAST(NULL AS STRING) AS CUSTOMER_PHONE,
    CAST(NULL AS STRING) AS CUSTOMER_ADDRESS,
    CAST(NULL AS FLOAT64) AS ACCBAL,
    CAST(NULL AS STRING) AS MKTSEGMENT,
    CAST(NULL AS STRING) AS COMMENT,
    PARSE_DATETIME('%F %H:%M:%E6S', '1900-01-01 00:00:00') AS LOAD_DATETIME,
    PARSE_DATETIME('%F %H:%M:%E6S', '1900-01-01 00:00:00') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS STRING) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS STRING) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS STRING) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
        WHERE NOT EXISTS ( SELECT 1 FROM `dbtvault-341416`.`dbtvault`.`satellite_ghost_incremental` AS h WHERE h.HASHDIFF = g.HASHDIFF )
    UNION DISTINCT
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION DISTINCT
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
records_to_insert AS (
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
            RANK() OVER (
               PARTITION BY current_records.CUSTOMER_HK
               ORDER BY current_records.LOAD_DATETIME DESC
            ) AS rank
        FROM "AUTOMATE_DV_TEST"."TEST"."satellite_incremental" AS current_records
            JOIN (
                SELECT DISTINCT source_data.CUSTOMER_HK
                FROM source_data
            ) AS source_records
                ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    ) AS b
    WHERE b.rank = 1
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
records_to_insert AS (
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        LEFT JOIN LATEST_RECORDS lr
            ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
            AND lr.HASHDIFF = frin.HASHDIFF
            WHERE lr.HASHDIFF IS NULL
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
ghost AS (
    SELECT
    CAST(NULL AS varchar) AS CUSTOMER_NAME,
    CAST(NULL AS varchar) AS CUSTOMER_PHONE,
    CAST(NULL AS varchar) AS CUSTOMER_ADDRESS,
    CAST(NULL AS float) AS ACCBAL,
    CAST(NULL AS varchar) AS MKTSEGMENT,
    CAST(NULL AS varchar) AS COMMENT,
    CONVERT(DATE, '1900-01-01') AS LOAD_DATETIME,
    CONVERT(DATE, '1900-01-01') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS varchar) AS RECORD_SOURCE,
    CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS CUSTOMER_HK,
    CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
    UNION
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
            RANK() OVER (
               PARTITION BY current_records.CUSTOMER_HK
               ORDER BY current_records.LOAD_DATETIME DESC
            ) AS rank
        FROM "AUTOMATE_DV_TEST"."TEST"."satellite_ghost_incremental" AS current_records
            JOIN (
                SELECT DISTINCT source_data.CUSTOMER_HK
                FROM source_data
            ) AS source_records
                ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    ) AS b
    WHERE b.rank = 1
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
ghost AS (
    SELECT
    CAST(NULL AS varchar) AS CUSTOMER_NAME,
    CAST(NULL AS varchar) AS CUSTOMER_PHONE,
    CAST(NULL AS varchar) AS CUSTOMER_ADDRESS,
    CAST(NULL AS float) AS ACCBAL,
    CAST(NULL AS varchar) AS MKTSEGMENT,
    CAST(NULL AS varchar) AS COMMENT,
    CONVERT(DATE, '1900-01-01') AS LOAD_DATETIME,
    CONVERT(DATE, '1900-01-01') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS varchar) AS RECORD_SOURCE,
    CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS CUSTOMER_HK,
    CAST(REPLICATE(CAST(CAST('0' AS tinyint) AS BINARY(16)), 16) AS BINARY(16)) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
        WHERE NOT EXISTS ( SELECT 1 FROM "AUTOMATE_DV_TEST"."TEST"."satellite_ghost_incremental" AS h WHERE h.HASHDIFF = g.HASHDIFF )
    UNION
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        LEFT JOIN LATEST_RECORDS lr
            ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
            AND lr.HASHDIFF = frin.HASHDIFF
            WHERE lr.HASHDIFF IS NULL
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
records_to_insert AS (
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
            RANK() OVER (
               PARTITION BY current_records.CUSTOMER_HK
               ORDER BY current_records.LOAD_DATETIME DESC
            ) AS rank
        FROM "dbtvault_db"."development"."satellite_incremental" AS current_records
            JOIN (
                SELECT DISTINCT source_data.CUSTOMER_HK
                FROM source_data
            ) AS source_records
                ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    ) AS b
    WHERE b.rank = 1
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
records_to_insert AS (
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        LEFT JOIN LATEST_RECORDS lr
            ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
            AND lr.HASHDIFF = frin.HASHDIFF
            WHERE lr.HASHDIFF IS NULL
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
ghost AS (
    SELECT
    CAST(NULL AS text) AS customer_name,
    CAST(NULL AS text) AS customer_phone,
    CAST(NULL AS text) AS customer_address,
    CAST(NULL AS double precision) AS accbal,
    CAST(NULL AS text) AS mktsegment,
    CAST(NULL AS text) AS comment,
    TO_DATE('1900-01-01', 'YYY-MM-DD') AS load_datetime,
    TO_DATE('1900-01-01', 'YYY-MM-DD') AS effective_from,
    CAST('AUTOMATE_DV_SYSTEM' AS text) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS BYTEA) AS customer_hk,
    CAST('00000000000000000000000000000000' AS BYTEA) AS hashdiff
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
    UNION
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
            RANK() OVER (
               PARTITION BY current_records.CUSTOMER_HK
               ORDER BY current_records.LOAD_DATETIME DESC
            ) AS rank
        FROM "dbtvault_db"."development"."satellite_ghost_incremental" AS current_records
            JOIN (
                SELECT DISTINCT source_data.CUSTOMER_HK
                FROM source_data
            ) AS source_records
                ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    ) AS b
    WHERE b.rank = 1
),
first_record_in_set AS (
    SELECT * FROM (
        SELECT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
        RANK() OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC
            ) as asc_rank
        FROM source_data as sd ) rin
    WHERE rin.asc_rank = 1
),
unique_source_records AS (
    SELECT
        b.CUSTOMER_HK, b.HASHDIFF, b.CUSTOMER_NAME, b.CUSTOMER_ADDRESS, b.CUSTOMER_PHONE, b.ACCBAL, b.MKTSEGMENT, b.COMMENT, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE
    FROM (
        SELECT DISTINCT
            sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
            LAG(sd.HASHDIFF) OVER (
                PARTITION BY sd.CUSTOMER_HK
                ORDER BY sd.LOAD_DATETIME ASC) as prev_hashdiff
        FROM source_data as sd
        ) b
    WHERE b.HASHDIFF != b.prev_hashdiff
),
ghost AS (
    SELECT
    CAST(NULL AS text) AS customer_name,
    CAST(NULL AS text) AS customer_phone,
    CAST(NULL AS text) AS customer_address,
    CAST(NULL AS double precision) AS accbal,
    CAST(NULL AS text) AS mktsegment,
    CAST(NULL AS text) AS comment,
    TO_DATE('1900-01-01', 'YYY-MM-DD') AS load_datetime,
    TO_DATE('1900-01-01', 'YYY-MM-DD') AS effective_from,
    CAST('AUTOMATE_DV_SYSTEM' AS text) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS BYTEA) AS customer_hk,
    CAST('00000000000000000000000000000000' AS BYTEA) AS hashdiff
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
        WHERE NOT EXISTS ( SELECT 1 FROM "dbtvault_db"."development"."satellite_ghost_incremental" AS h WHERE h.HASHDIFF = g.HASHDIFF )
    UNION
        SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
        FROM first_record_in_set AS frin
        LEFT JOIN LATEST_RECORDS lr
            ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
            AND lr.HASHDIFF = frin.HASHDIFF
            WHERE lr.HASHDIFF IS NULL
        UNION
        SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
        FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM `hive_metastore`.`dbtvault`.`satellite_incremental` AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
records_to_insert AS (
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    NULL AS CUSTOMER_NAME,
    NULL AS CUSTOMER_PHONE,
    NULL AS CUSTOMER_ADDRESS,
    NULL AS ACCBAL,
    NULL AS MKTSEGMENT,
    NULL AS COMMENT,
    TO_DATE('1900-01-01') AS LOAD_DATETIME,
    TO_DATE('1900-01-01') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS string) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS STRING) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS STRING) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
    UNION
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
WITH source_data AS (
    SELECT a.CUSTOMER_HK, a.HASHDIFF, a.CUSTOMER_NAME, a.CUSTOMER_ADDRESS, a.CUSTOMER_PHONE, a.ACCBAL, a.MKTSEGMENT, a.COMMENT, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
),
latest_records AS (
    SELECT current_records.CUSTOMER_HK, current_records.HASHDIFF, current_records.CUSTOMER_NAME, current_records.CUSTOMER_ADDRESS, current_records.CUSTOMER_PHONE, current_records.ACCBAL, current_records.MKTSEGMENT, current_records.COMMENT, current_records.EFFECTIVE_FROM, current_records.LOAD_DATETIME, current_records.RECORD_SOURCE,
        RANK() OVER (
           PARTITION BY current_records.CUSTOMER_HK
           ORDER BY current_records.LOAD_DATETIME DESC
        ) AS rank_num
    FROM `hive_metastore`.`dbtvault`.`satellite_ghost_incremental` AS current_records
        JOIN (
            SELECT DISTINCT source_data.CUSTOMER_HK
            FROM source_data
        ) AS source_records
            ON source_records.CUSTOMER_HK = current_records.CUSTOMER_HK
    QUALIFY rank_num = 1
),
first_record_in_set AS (
    SELECT
    sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE,
    RANK() OVER (
            PARTITION BY sd.CUSTOMER_HK
            ORDER BY sd.LOAD_DATETIME ASC
        ) as asc_rank
    FROM source_data as sd
    QUALIFY asc_rank = 1
),
unique_source_records AS (
    SELECT DISTINCT
        sd.CUSTOMER_HK, sd.HASHDIFF, sd.CUSTOMER_NAME, sd.CUSTOMER_ADDRESS, sd.CUSTOMER_PHONE, sd.ACCBAL, sd.MKTSEGMENT, sd.COMMENT, sd.EFFECTIVE_FROM, sd.LOAD_DATETIME, sd.RECORD_SOURCE
    FROM source_data as sd
    QUALIFY sd.HASHDIFF != LAG(sd.HASHDIFF) OVER (
        PARTITION BY sd.CUSTOMER_HK
        ORDER BY sd.LOAD_DATETIME ASC)
),
ghost AS (
    SELECT
    NULL AS CUSTOMER_NAME,
    NULL AS CUSTOMER_PHONE,
    NULL AS CUSTOMER_ADDRESS,
    NULL AS ACCBAL,
    NULL AS MKTSEGMENT,
    NULL AS COMMENT,
    TO_DATE('1900-01-01') AS LOAD_DATETIME,
    TO_DATE('1900-01-01') AS EFFECTIVE_FROM,
    CAST('AUTOMATE_DV_SYSTEM' AS string) AS RECORD_SOURCE,
    CAST('00000000000000000000000000000000' AS STRING) AS CUSTOMER_HK,
    CAST('00000000000000000000000000000000' AS STRING) AS HASHDIFF
),
records_to_insert AS (
    SELECT
        g.CUSTOMER_HK, g.HASHDIFF, g.CUSTOMER_NAME, g.CUSTOMER_ADDRESS, g.CUSTOMER_PHONE, g.ACCBAL, g.MKTSEGMENT, g.COMMENT, g.EFFECTIVE_FROM, g.LOAD_DATETIME, g.RECORD_SOURCE
        FROM ghost AS g
        WHERE NOT EXISTS ( SELECT 1 FROM `hive_metastore`.`dbtvault`.`satellite_ghost_incremental` AS h WHERE h.HASHDIFF = g.HASHDIFF )
    UNION
    SELECT frin.CUSTOMER_HK, frin.HASHDIFF, frin.CUSTOMER_NAME, frin.CUSTOMER_ADDRESS, frin.CUSTOMER_PHONE, frin.ACCBAL, frin.MKTSEGMENT, frin.COMMENT, frin.EFFECTIVE_FROM, frin.LOAD_DATETIME, frin.RECORD_SOURCE
    FROM first_record_in_set AS frin
    LEFT JOIN LATEST_RECORDS lr
        ON lr.CUSTOMER_HK = frin.CUSTOMER_HK
        AND lr.HASHDIFF = frin.HASHDIFF
        WHERE lr.HASHDIFF IS NULL
    UNION
    SELECT usr.CUSTOMER_HK, usr.HASHDIFF, usr.CUSTOMER_NAME, usr.CUSTOMER_ADDRESS, usr.CUSTOMER_PHONE, usr.ACCBAL, usr.MKTSEGMENT, usr.COMMENT, usr.EFFECTIVE_FROM, usr.LOAD_DATETIME, usr.RECORD_SOURCE
    FROM unique_source_records as usr
)
SELECT * FROM records_to_insert

Hashdiff Aliasing

If you have multiple Satellites using a single stage as its data source, then you will need to use hashdiff aliasing

Excluding columns from the payload

An exclude_columns flag can be provided for payload columns which will invert the selection of columns provided in the list of columns.

This is extremely useful when a payload is composed of many columns, and you do not wish to individually provide all the columns.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{%- set yaml_metadata -%}
source_model: v_stg_orders
src_pk: CUSTOMER_HK
src_hashdiff: CUSTOMER_HASHDIFF
src_payload:
  exclude_columns: true
  columns:
    - NAME
    - PHONE
src_eff: EFFECTIVE_FROM
src_ldts: LOAD_DATETIME
src_source: RECORD_SOURCE
{%- endset -%}

{% set metadata_dict = fromyaml(yaml_metadata) %}

{{ automate_dv.sat(src_pk=metadata_dict["src_pk"],
                   src_hashdiff=metadata_dict["src_hashdiff"],
                   src_payload=metadata_dict["src_payload"],
                   src_eff=metadata_dict["src_eff"],
                   src_ldts=metadata_dict["src_ldts"],
                   src_source=metadata_dict["src_source"],
                   source_model=metadata_dict["source_model"]) }}

Using the configuration in the above snippet, if we had the following columns: NAME, PHONE, ADDRESS_LINE_1, EMAIL_ADDRESS, DOB,...

The satellite payload would be created with the following columns: ADDRESS_LINE_1, EMAIL_ADDRESS, DOB,...


eff_sat

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build an Effectivity Satellite table using the provided parameters.

Usage

1
2
3
4
5
{{ automate_dv.eff_sat(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk,
                       src_start_date=src_start_date, src_end_date=src_end_date,
                       src_extra_columns=src_extra_columns,
                       src_eff=src_eff, src_ldts=src_ldts, src_source=src_source,
                       source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column String
src_dfk Source driving foreign key column List[String]/String
src_sfk Source secondary foreign key column List[String]/String
src_start_date Source start date column String
src_end_date Source end date column String
src_extra_columns Select arbitrary columns from the source List[String]/String
src_eff Source effective from column String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID String
source_model Staging model name String

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT i.CUSTOMER_ORDER_HK, i.CUSTOMER_HK, i.ORDER_HK, i.START_DATE, i.END_DATE, i.EFFECTIVE_FROM, i.LOAD_DATETIME, i.RECORD_SOURCE
    FROM source_data AS i
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.eff_sat_customer_order_incremental AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.eff_sat_customer_order_incremental_nae AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT i.CUSTOMER_ORDER_HK, i.CUSTOMER_HK, i.ORDER_HK, i.START_DATE, i.END_DATE, i.EFFECTIVE_FROM, i.LOAD_DATETIME, i.RECORD_SOURCE
    FROM source_data AS i
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM `dbtvault-341416`.`dbtvault`.`eff_sat_customer_order_incremental` AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE DATE(c.END_DATE) = DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE DATE(d.END_DATE) != DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE DATE(g.END_DATE) = DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE DATE(h.END_DATE) != DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION DISTINCT
    SELECT * FROM new_reopened_records
    UNION DISTINCT
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM `dbtvault-341416`.`dbtvault`.`eff_sat_customer_order_incremental_nae` AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE DATE(c.END_DATE) = DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE DATE(d.END_DATE) != DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE DATE(g.END_DATE) = DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE DATE(h.END_DATE) != DATE(PARSE_DATETIME('%F %H:%M:%E6S', '9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION DISTINCT
    SELECT * FROM new_reopened_records
    UNION DISTINCT
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT i.CUSTOMER_ORDER_HK, i.CUSTOMER_HK, i.ORDER_HK, i.START_DATE, i.END_DATE, i.EFFECTIVE_FROM, i.LOAD_DATETIME, i.RECORD_SOURCE
    FROM source_data AS i
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM "AUTOMATE_DV_TEST"."TEST"."eff_sat_customer_order_incremental" AS b
    )l
        WHERE l.row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE CONVERT(DATE, c.END_DATE) = CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE CONVERT(DATE, d.END_DATE) != CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE CONVERT(DATE, g.END_DATE) = CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE CONVERT(DATE, h.END_DATE) != CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM "AUTOMATE_DV_TEST"."TEST"."eff_sat_customer_order_incremental_nae" AS b
    )l
        WHERE l.row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE CONVERT(DATE, c.END_DATE) = CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE CONVERT(DATE, d.END_DATE) != CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE CONVERT(DATE, g.END_DATE) = CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE CONVERT(DATE, h.END_DATE) != CONVERT(DATE, CONVERT(DATETIME2, '9999-12-31 23:59:59.9999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT i.CUSTOMER_ORDER_HK, i.CUSTOMER_HK, i.ORDER_HK, i.START_DATE, i.END_DATE, i.EFFECTIVE_FROM, i.LOAD_DATETIME, i.RECORD_SOURCE
    FROM source_data AS i
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM "dbtvault_db"."development"."eff_sat_customer_order_incremental" AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE::VARCHAR, 'YYY-MM-DD') = TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE::VARCHAR, 'YYY-MM-DD') != TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE::VARCHAR, 'YYY-MM-DD') = TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE::VARCHAR, 'YYY-MM-DD') != TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM "dbtvault_db"."development"."eff_sat_customer_order_incremental_nae" AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE::VARCHAR, 'YYY-MM-DD') = TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE::VARCHAR, 'YYY-MM-DD') != TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE::VARCHAR, 'YYY-MM-DD') = TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE::VARCHAR, 'YYY-MM-DD') != TO_DATE(to_char(timestamp '9999-12-31 23:59:59.999', 'YYYY-MM-DD HH24:MI:SS.MS')::timestamp::VARCHAR, 'YYY-MM-DD')
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
records_to_insert AS (
    SELECT i.CUSTOMER_ORDER_HK, i.CUSTOMER_HK, i.ORDER_HK, i.START_DATE, i.END_DATE, i.EFFECTIVE_FROM, i.LOAD_DATETIME, i.RECORD_SOURCE
    FROM source_data AS i
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM `hive_metastore`.`dbtvault`.`eff_sat_customer_order_incremental` AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
WITH source_data AS (
    SELECT a.CUSTOMER_ORDER_HK, a.CUSTOMER_HK, a.ORDER_HK, a.START_DATE, a.END_DATE, a.EFFECTIVE_FROM, a.LOAD_DATETIME, a.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS a
    WHERE a.CUSTOMER_HK IS NOT NULL
    AND a.ORDER_HK IS NOT NULL
),
latest_records AS (
    SELECT * FROM (
        SELECT b.CUSTOMER_ORDER_HK, b.CUSTOMER_HK, b.ORDER_HK, b.START_DATE, b.END_DATE, b.EFFECTIVE_FROM, b.LOAD_DATETIME, b.RECORD_SOURCE,
               ROW_NUMBER() OVER (
                    PARTITION BY b.CUSTOMER_ORDER_HK
                    ORDER BY b.LOAD_DATETIME DESC
               ) AS row_num
        FROM `hive_metastore`.`dbtvault`.`eff_sat_customer_order_incremental_nae` AS b
    )AS inner_rank
        WHERE row_num = 1),
latest_open AS (
    SELECT c.CUSTOMER_ORDER_HK, c.CUSTOMER_HK, c.ORDER_HK, c.START_DATE, c.END_DATE, c.EFFECTIVE_FROM, c.LOAD_DATETIME, c.RECORD_SOURCE
    FROM latest_records AS c
    WHERE TO_DATE(c.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
latest_closed AS (
    SELECT d.CUSTOMER_ORDER_HK, d.CUSTOMER_HK, d.ORDER_HK, d.START_DATE, d.END_DATE, d.EFFECTIVE_FROM, d.LOAD_DATETIME, d.RECORD_SOURCE
    FROM latest_records AS d
    WHERE TO_DATE(d.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_open_records AS (
    SELECT DISTINCT
        f.CUSTOMER_ORDER_HK,
        f.CUSTOMER_HK, f.ORDER_HK,
        f.START_DATE AS START_DATE,
        f.END_DATE AS END_DATE,
        f.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        f.LOAD_DATETIME,
        f.RECORD_SOURCE
    FROM source_data AS f
    LEFT JOIN latest_records AS lr
    ON f.CUSTOMER_ORDER_HK = lr.CUSTOMER_ORDER_HK
    WHERE lr.CUSTOMER_ORDER_HK IS NULL
),
new_reopened_records AS (
    SELECT DISTINCT
        lc.CUSTOMER_ORDER_HK,
        lc.CUSTOMER_HK, lc.ORDER_HK,
        g.START_DATE AS START_DATE,
        g.END_DATE AS END_DATE,
        g.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        g.LOAD_DATETIME,
        g.RECORD_SOURCE
    FROM source_data AS g
    INNER JOIN latest_closed AS lc
    ON g.CUSTOMER_ORDER_HK = lc.CUSTOMER_ORDER_HK
    WHERE TO_DATE(g.END_DATE) = TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
),
new_closed_records AS (
    SELECT DISTINCT
        lo.CUSTOMER_ORDER_HK,
        lo.CUSTOMER_HK, lo.ORDER_HK,
        h.START_DATE AS START_DATE,
        h.END_DATE AS END_DATE,
        h.EFFECTIVE_FROM AS EFFECTIVE_FROM,
        h.LOAD_DATETIME,
        lo.RECORD_SOURCE
    FROM source_data AS h
    LEFT JOIN latest_open AS lo
    ON lo.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    LEFT JOIN latest_closed AS lc
    ON lc.CUSTOMER_ORDER_HK = h.CUSTOMER_ORDER_HK
    WHERE TO_DATE(h.END_DATE) != TO_DATE(TO_TIMESTAMP('9999-12-31 23:59:59.999999'))
    AND lo.CUSTOMER_ORDER_HK IS NOT NULL
    AND lc.CUSTOMER_ORDER_HK IS NULL
),
records_to_insert AS (
    SELECT * FROM new_open_records
    UNION
    SELECT * FROM new_reopened_records
    UNION
    SELECT * FROM new_closed_records
)
SELECT * FROM records_to_insert

Auto end-dating

Auto end-dating is enabled by providing a config option as below:

1
2
3
4
5
6
{{ config(is_auto_end_dating=true) }}

{{ automate_dv.eff_sat(src_pk=src_pk, src_dfk=src_dfk, src_sfk=src_sfk,
                       src_start_date=src_start_date, src_end_date=src_end_date,
                       src_eff=src_eff, src_ldts=src_ldts, src_source=src_source,
                       source_model=source_model) }}

This will enable 3 extra CTEs in the Effectivity Satellite SQL generated by the macro. Examples of this SQL are in the Example Output section above. The result of this will be additional effectivity records with end dates included, which will aid business logic and creation of presentation layer structures downstream.

In most cases where Effectivity Satellites are recording 1-1 or 1-M relationships, this feature can be safely enabled. In situations where a M-M relationship is being modelled/recorded, it becomes impossible to infer end dates. This feature is disabled by default because it could be considered an application of a business rule: The definition of the 'end' of a relationship is considered business logic which should happen in the Business Vault.

Read the Effectivity Satellite tutorial for more information.

Warning

We have implemented the auto end-dating feature to cover most use cases and scenarios, but caution should be exercised if you are unsure.


ma_sat

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Multi-Active Satellite (MAS) table.

Usage

1
2
3
4
{{ automate_dv.ma_sat(src_pk=src_pk, src_cdk=src_cdk, src_hashdiff=src_hashdiff, 
                      src_payload=src_payload, src_eff=src_eff,
                      src_extra_columns=src_extra_columns, src_ldts=src_ldts, 
                      src_source=src_source, source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column String
src_cdk Source child dependent key(s) column(s) List[String]
src_hashdiff Source hashdiff column String
src_payload Source payload column(s) List[String]
src_eff Source effective from column String
src_extra_columns Select arbitrary columns from the source List[String]/String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID String
source_model Staging model name String

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.HASHDIFF, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
records_to_insert AS (
    SELECT source_data.CUSTOMER_HK, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.HASHDIFF, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.HASHDIFF, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
        ,COUNT(DISTINCT s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID)
            OVER (PARTITION BY s.CUSTOMER_HK) AS source_count
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.stg_customer AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
latest_records AS (
    SELECT mas.CUSTOMER_HK, mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID, mas.LOAD_DATETIME,
           mas.latest_rank,
           DENSE_RANK() OVER (PARTITION BY mas.CUSTOMER_HK
                              ORDER BY mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID ASC
           ) AS check_rank
    FROM (
    SELECT inner_mas.CUSTOMER_HK, inner_mas.HASHDIFF, inner_mas.CUSTOMER_PHONE, inner_mas.CUSTOMER_PHONE_LOCATOR_ID, inner_mas.LOAD_DATETIME,
           RANK() OVER (PARTITION BY inner_mas.CUSTOMER_HK
                        ORDER BY inner_mas.LOAD_DATETIME DESC
           ) AS latest_rank
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.ma_sat_customer_address_incremental AS inner_mas
        INNER JOIN (
            SELECT DISTINCT s.CUSTOMER_HK
            FROM source_data as s
        ) AS spk
            ON inner_mas.CUSTOMER_HK = spk.CUSTOMER_HK
            QUALIFY latest_rank = 1
    ) AS mas
),
latest_group_details AS (
    SELECT lr.CUSTOMER_HK,
           lr.LOAD_DATETIME,
           MAX(lr.check_rank) AS latest_count
    FROM latest_records AS lr
    GROUP BY lr.CUSTOMER_HK, lr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT source_data.CUSTOMER_HK, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.HASHDIFF, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
    WHERE EXISTS (
        SELECT 1
        FROM source_data AS stage
        WHERE NOT EXISTS (
            SELECT 1
            FROM (
                SELECT lr.CUSTOMER_HK, lr.HASHDIFF, lr.CUSTOMER_PHONE, lr.CUSTOMER_PHONE_LOCATOR_ID, lr.LOAD_DATETIME,
                lg.latest_count
                FROM latest_records AS lr
                INNER JOIN latest_group_details AS lg
                    ON lr.CUSTOMER_HK = lg.CUSTOMER_HK
                    AND lr.LOAD_DATETIME = lg.LOAD_DATETIME
            ) AS active_records
            WHERE stage.CUSTOMER_HK = active_records.CUSTOMER_HK
                AND stage.HASHDIFF = active_records.HASHDIFF
                AND stage.CUSTOMER_PHONE = active_records.CUSTOMER_PHONE AND stage.CUSTOMER_PHONE_LOCATOR_ID = active_records.CUSTOMER_PHONE_LOCATOR_ID
                AND stage.source_count = active_records.latest_count
        )
        AND source_data.CUSTOMER_HK = stage.CUSTOMER_HK
    )
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.HASHDIFF, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
records_to_insert AS (
    SELECT source_data.CUSTOMER_HK, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.HASHDIFF, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.HASHDIFF, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM `dbtvault-341416`.`dbtvault`.`stg_customer` AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
source_data_with_count AS (
    SELECT a.*,
           b.source_count
    FROM source_data a
    INNER JOIN (
        SELECT t.CUSTOMER_HK,
            COUNT(*) AS source_count
        FROM (SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID FROM source_data AS s) AS t
        GROUP BY t.CUSTOMER_HK
    ) AS b
    ON a.CUSTOMER_HK = b.CUSTOMER_HK
),
latest_records AS (
    SELECT mas.CUSTOMER_HK, mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID, mas.LOAD_DATETIME,
           mas.latest_rank,
           DENSE_RANK() OVER (PARTITION BY mas.CUSTOMER_HK
                              ORDER BY mas.HASHDIFF,
                                       mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID ASC
           ) AS check_rank
    FROM (
    SELECT inner_mas.CUSTOMER_HK, inner_mas.HASHDIFF, inner_mas.CUSTOMER_PHONE, inner_mas.CUSTOMER_PHONE_LOCATOR_ID, inner_mas.LOAD_DATETIME,
           RANK() OVER (PARTITION BY inner_mas.CUSTOMER_HK
                        ORDER BY inner_mas.LOAD_DATETIME DESC
           ) AS latest_rank
    FROM `dbtvault-341416`.`dbtvault`.`ma_sat_customer_address_incremental` AS inner_mas
    INNER JOIN (SELECT DISTINCT s.CUSTOMER_HK FROM source_data as s ) AS spk
        ON inner_mas.CUSTOMER_HK = spk.CUSTOMER_HK
    ) AS mas
    WHERE latest_rank = 1
),
latest_group_details AS (
    SELECT lr.CUSTOMER_HK,
           lr.LOAD_DATETIME,
           MAX(lr.check_rank) AS latest_count
    FROM latest_records AS lr
    GROUP BY lr.CUSTOMER_HK, lr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT source_data_with_count.CUSTOMER_HK, source_data_with_count.CUSTOMER_PHONE, source_data_with_count.CUSTOMER_PHONE_LOCATOR_ID, source_data_with_count.CUSTOMER_NAME, source_data_with_count.HASHDIFF, source_data_with_count.EFFECTIVE_FROM, source_data_with_count.LOAD_DATETIME, source_data_with_count.RECORD_SOURCE
    FROM source_data_with_count
    WHERE EXISTS (
        SELECT 1
        FROM source_data_with_count AS stage
        WHERE NOT EXISTS (
            SELECT 1
            FROM (
                SELECT lr.CUSTOMER_HK, lr.HASHDIFF, lr.CUSTOMER_PHONE, lr.CUSTOMER_PHONE_LOCATOR_ID, lr.LOAD_DATETIME,
                       lg.latest_count
                FROM latest_records AS lr
                INNER JOIN latest_group_details AS lg
                    ON lr.CUSTOMER_HK = lg.CUSTOMER_HK
                    AND lr.LOAD_DATETIME = lg.LOAD_DATETIME
            ) AS active_records
            WHERE stage.CUSTOMER_HK = active_records.CUSTOMER_HK
                AND stage.HASHDIFF = active_records.HASHDIFF
                AND stage.CUSTOMER_PHONE = active_records.CUSTOMER_PHONE AND stage.CUSTOMER_PHONE_LOCATOR_ID = active_records.CUSTOMER_PHONE_LOCATOR_ID
                AND stage.source_count = active_records.latest_count
        )
        AND source_data_with_count.CUSTOMER_HK = stage.CUSTOMER_HK
    )
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
records_to_insert AS (
SELECT source_data.CUSTOMER_HK, source_data.HASHDIFF, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM "AUTOMATE_DV_TEST"."TEST"."stg_customer" AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
source_data_with_count AS (
    SELECT a.*,
           b.source_count
    FROM source_data a
    INNER JOIN
    (
        SELECT t.CUSTOMER_HK,
               COUNT(*) AS source_count
        FROM (
            SELECT DISTINCT s.CUSTOMER_HK,
                            s.HASHDIFF,
                            s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID
            FROM source_data AS s
        ) AS t
        GROUP BY t.CUSTOMER_HK
    ) AS b
    ON a.CUSTOMER_HK = b.CUSTOMER_HK
),
latest_records AS (
    SELECT mas.CUSTOMER_HK, mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID, mas.LOAD_DATETIME,
           mas.latest_rank,
           DENSE_RANK() OVER (PARTITION BY mas.CUSTOMER_HK
                              ORDER BY mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID ASC
           ) AS check_rank
    FROM (
    SELECT inner_mas.CUSTOMER_HK, inner_mas.HASHDIFF, inner_mas.CUSTOMER_PHONE, inner_mas.CUSTOMER_PHONE_LOCATOR_ID, inner_mas.LOAD_DATETIME,
           RANK() OVER (PARTITION BY inner_mas.CUSTOMER_HK
                        ORDER BY inner_mas.LOAD_DATETIME DESC
           ) AS latest_rank
    FROM "AUTOMATE_DV_TEST"."TEST"."ma_sat_customer_address_incremental" AS inner_mas
    INNER JOIN (SELECT DISTINCT s.CUSTOMER_HK FROM source_data as s ) AS spk
        ON inner_mas.CUSTOMER_HK = spk.CUSTOMER_HK
    ) AS mas
    WHERE latest_rank = 1
    ),
latest_group_details AS (
    SELECT lr.CUSTOMER_HK,
           lr.LOAD_DATETIME,
           MAX(lr.check_rank) AS latest_count
    FROM latest_records AS lr
    GROUP BY lr.CUSTOMER_HK, lr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT source_data_with_count.CUSTOMER_HK, source_data_with_count.HASHDIFF, source_data_with_count.CUSTOMER_PHONE, source_data_with_count.CUSTOMER_PHONE_LOCATOR_ID, source_data_with_count.CUSTOMER_NAME, source_data_with_count.EFFECTIVE_FROM, source_data_with_count.LOAD_DATETIME, source_data_with_count.RECORD_SOURCE
    FROM source_data_with_count
    WHERE EXISTS (
        SELECT 1
        FROM source_data_with_count AS stage
        WHERE NOT EXISTS (
            SELECT 1
            FROM (
                SELECT lr.CUSTOMER_HK, lr.HASHDIFF, lr.CUSTOMER_PHONE, lr.CUSTOMER_PHONE_LOCATOR_ID, lr.LOAD_DATETIME,
                lg.latest_count
                FROM latest_records AS lr
                INNER JOIN latest_group_details AS lg
                    ON lr.CUSTOMER_HK = lg.CUSTOMER_HK
                    AND lr.LOAD_DATETIME = lg.LOAD_DATETIME
            ) AS active_records
            WHERE stage.CUSTOMER_HK = active_records.CUSTOMER_HK
                AND stage.HASHDIFF = active_records.HASHDIFF
                AND stage.CUSTOMER_PHONE = active_records.CUSTOMER_PHONE AND stage.CUSTOMER_PHONE_LOCATOR_ID = active_records.CUSTOMER_PHONE_LOCATOR_ID
                AND stage.source_count = active_records.latest_count
        )
        AND source_data_with_count.CUSTOMER_HK = stage.CUSTOMER_HK
    )
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
records_to_insert AS (
SELECT source_data.CUSTOMER_HK, source_data.HASHDIFF, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM "dbtvault_db"."development"."stg_customer" AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
source_data_with_count AS (
    SELECT a.*,
           b.source_count
    FROM source_data a
    INNER JOIN
    (
        SELECT t.CUSTOMER_HK,
               COUNT(*) AS source_count
        FROM (
            SELECT DISTINCT s.CUSTOMER_HK,
                            s.HASHDIFF,
                            s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID
            FROM source_data AS s
        ) AS t
        GROUP BY t.CUSTOMER_HK
    ) AS b
    ON a.CUSTOMER_HK = b.CUSTOMER_HK
),
latest_records AS (
    SELECT mas.CUSTOMER_HK, mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID, mas.LOAD_DATETIME,
           mas.latest_rank,
           DENSE_RANK() OVER (PARTITION BY mas.CUSTOMER_HK
                              ORDER BY mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID ASC
           ) AS check_rank
    FROM (
    SELECT inner_mas.CUSTOMER_HK, inner_mas.HASHDIFF, inner_mas.CUSTOMER_PHONE, inner_mas.CUSTOMER_PHONE_LOCATOR_ID, inner_mas.LOAD_DATETIME,
           RANK() OVER (PARTITION BY inner_mas.CUSTOMER_HK
                        ORDER BY inner_mas.LOAD_DATETIME DESC
           ) AS latest_rank
    FROM "dbtvault_db"."development"."ma_sat_customer_address_incremental" AS inner_mas
    INNER JOIN (SELECT DISTINCT s.CUSTOMER_HK FROM source_data as s ) AS spk
        ON inner_mas.CUSTOMER_HK = spk.CUSTOMER_HK
    ) AS mas
    WHERE latest_rank = 1
    ),
latest_group_details AS (
    SELECT lr.CUSTOMER_HK,
           lr.LOAD_DATETIME,
           MAX(lr.check_rank) AS latest_count
    FROM latest_records AS lr
    GROUP BY lr.CUSTOMER_HK, lr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT source_data_with_count.CUSTOMER_HK, source_data_with_count.HASHDIFF, source_data_with_count.CUSTOMER_PHONE, source_data_with_count.CUSTOMER_PHONE_LOCATOR_ID, source_data_with_count.CUSTOMER_NAME, source_data_with_count.EFFECTIVE_FROM, source_data_with_count.LOAD_DATETIME, source_data_with_count.RECORD_SOURCE
    FROM source_data_with_count
    WHERE EXISTS (
        SELECT 1
        FROM source_data_with_count AS stage
        WHERE NOT EXISTS (
            SELECT 1
            FROM (
                SELECT lr.CUSTOMER_HK, lr.HASHDIFF, lr.CUSTOMER_PHONE, lr.CUSTOMER_PHONE_LOCATOR_ID, lr.LOAD_DATETIME,
                lg.latest_count
                FROM latest_records AS lr
                INNER JOIN latest_group_details AS lg
                    ON lr.CUSTOMER_HK = lg.CUSTOMER_HK
                    AND lr.LOAD_DATETIME = lg.LOAD_DATETIME
            ) AS active_records
            WHERE stage.CUSTOMER_HK = active_records.CUSTOMER_HK
                AND stage.HASHDIFF = active_records.HASHDIFF
                AND stage.CUSTOMER_PHONE = active_records.CUSTOMER_PHONE AND stage.CUSTOMER_PHONE_LOCATOR_ID = active_records.CUSTOMER_PHONE_LOCATOR_ID
                AND stage.source_count = active_records.latest_count
        )
        AND source_data_with_count.CUSTOMER_HK = stage.CUSTOMER_HK
    )
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
records_to_insert AS (
SELECT source_data.CUSTOMER_HK, source_data.HASHDIFF, source_data.CUSTOMER_PHONE, source_data.CUSTOMER_PHONE_LOCATOR_ID, source_data.CUSTOMER_NAME, source_data.EFFECTIVE_FROM, source_data.LOAD_DATETIME, source_data.RECORD_SOURCE
    FROM source_data
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    WITH source_data AS (
    SELECT DISTINCT s.CUSTOMER_HK, s.HASHDIFF, s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID, s.CUSTOMER_NAME, s.EFFECTIVE_FROM, s.LOAD_DATETIME, s.RECORD_SOURCE
    FROM `hive_metastore`.`dbtvault`.`stg_customer` AS s
    WHERE s.CUSTOMER_HK IS NOT NULL
        AND s.CUSTOMER_PHONE IS NOT NULL
        AND s.CUSTOMER_PHONE_LOCATOR_ID IS NOT NULL
),
source_data_with_count AS (
    SELECT a.*,
           b.source_count
    FROM source_data a
    INNER JOIN
    (
        SELECT t.CUSTOMER_HK,
               COUNT(*) AS source_count
        FROM (
            SELECT DISTINCT s.CUSTOMER_HK,
                            s.HASHDIFF,
                            s.CUSTOMER_PHONE, s.CUSTOMER_PHONE_LOCATOR_ID
            FROM source_data AS s
        ) AS t
        GROUP BY t.CUSTOMER_HK
    ) AS b
    ON a.CUSTOMER_HK = b.CUSTOMER_HK
),
latest_records AS (
    SELECT mas.CUSTOMER_HK, mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID, mas.LOAD_DATETIME,
           mas.latest_rank,
           DENSE_RANK() OVER (PARTITION BY mas.CUSTOMER_HK
                              ORDER BY mas.HASHDIFF, mas.CUSTOMER_PHONE, mas.CUSTOMER_PHONE_LOCATOR_ID ASC
           ) AS check_rank
    FROM (
    SELECT inner_mas.CUSTOMER_HK, inner_mas.HASHDIFF, inner_mas.CUSTOMER_PHONE, inner_mas.CUSTOMER_PHONE_LOCATOR_ID, inner_mas.LOAD_DATETIME,
           RANK() OVER (PARTITION BY inner_mas.CUSTOMER_HK
                        ORDER BY inner_mas.LOAD_DATETIME DESC
           ) AS latest_rank
    FROM `hive_metastore`.`dbtvault`.`ma_sat_customer_address_incremental` AS inner_mas
    INNER JOIN (SELECT DISTINCT s.CUSTOMER_HK FROM source_data as s ) AS spk
        ON inner_mas.CUSTOMER_HK = spk.CUSTOMER_HK
        QUALIFY latest_rank = 1
    ) AS mas
    ),
latest_group_details AS (
    SELECT lr.CUSTOMER_HK,
           lr.LOAD_DATETIME,
           MAX(lr.check_rank) AS latest_count
    FROM latest_records AS lr
    GROUP BY lr.CUSTOMER_HK, lr.LOAD_DATETIME
),
records_to_insert AS (
    SELECT source_data_with_count.CUSTOMER_HK, source_data_with_count.HASHDIFF, source_data_with_count.CUSTOMER_PHONE, source_data_with_count.CUSTOMER_PHONE_LOCATOR_ID, source_data_with_count.CUSTOMER_NAME, source_data_with_count.EFFECTIVE_FROM, source_data_with_count.LOAD_DATETIME, source_data_with_count.RECORD_SOURCE
    FROM source_data_with_count
    WHERE EXISTS (
        SELECT 1
        FROM source_data_with_count AS stage
        WHERE NOT EXISTS (
            SELECT 1
            FROM (
                SELECT lr.CUSTOMER_HK, lr.HASHDIFF, lr.CUSTOMER_PHONE, lr.CUSTOMER_PHONE_LOCATOR_ID, lr.LOAD_DATETIME,
                lg.latest_count
                FROM latest_records AS lr
                INNER JOIN latest_group_details AS lg
                    ON lr.CUSTOMER_HK = lg.CUSTOMER_HK
                    AND lr.LOAD_DATETIME = lg.LOAD_DATETIME
            ) AS active_records
            WHERE stage.CUSTOMER_HK = active_records.CUSTOMER_HK
                AND stage.HASHDIFF = active_records.HASHDIFF
                AND stage.CUSTOMER_PHONE = active_records.CUSTOMER_PHONE AND stage.CUSTOMER_PHONE_LOCATOR_ID = active_records.CUSTOMER_PHONE_LOCATOR_ID
                AND stage.source_count = active_records.latest_count
        )
        AND source_data_with_count.CUSTOMER_HK = stage.CUSTOMER_HK
    )
)
SELECT * FROM records_to_insert

xts

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build an Extended Tracking Satellite table using the provided parameters.

Usage

1
2
3
{{ automate_dv.xts(src_pk=src_pk, src_satellite=src_satellite, 
                   src_extra_columns=src_extra_columns, src_ldts=src_ldts,
                   src_source=src_source, source_model=source_model) }}`

Parameters

Parameter Description Type Required?
src_pk Source primary key column String/List
src_satellite Dictionary of source satellite name column and hashdiff column Dictionary
src_extra_columns Select arbitrary columns from the source List[String]/String
src_ldts Source load date/timestamp column String
src_source Name of the column containing the source ID String/List
source_model Staging model name String/List

Tip

Read the tutorial for more details

Understanding the src_satellite parameter

Read More

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF AS HASHDIFF, SATELLITE_NAME AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT *
    FROM satellite_a
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.*
    FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
    )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_c AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_d AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
    UNION ALL
    SELECT * FROM satellite_c
    UNION ALL
    SELECT * FROM satellite_d
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
            AND union_satellites.LOAD_DATE = d.LOAD_DATE
            AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF AS HASHDIFF, SATELLITE_NAME AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT *
    FROM satellite_a
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.*
    FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
WITH
satellite_a AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_1 AS HASHDIFF, s.SATELLITE_1 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_2 AS HASHDIFF, s.SATELLITE_2 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
    )
    WHERE d.HASHDIFF IS NULL
    AND d.LOAD_DATE IS NULL
    AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
WITH
satellite_a AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_1 AS HASHDIFF, s.SATELLITE_1 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1 AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_2 AS HASHDIFF, s.SATELLITE_2 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1 AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_c AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_1 AS HASHDIFF, s.SATELLITE_1 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_d AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_2 AS HASHDIFF, s.SATELLITE_2 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_e AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_1 AS HASHDIFF, s.SATELLITE_1 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2 AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
satellite_f AS (
    SELECT s.CUSTOMER_PK, s.HASHDIFF_2 AS HASHDIFF, s.SATELLITE_2 AS SATELLITE_NAME, s.LOAD_DATE, s.SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2 AS s
    WHERE s.CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
    UNION ALL
    SELECT * FROM satellite_c
    UNION ALL
    SELECT * FROM satellite_d
    UNION ALL
    SELECT * FROM satellite_e
    UNION ALL
    SELECT * FROM satellite_f
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
    AND d.LOAD_DATE IS NULL
    AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF AS HASHDIFF, SATELLITE_NAME AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT *
    FROM satellite_a
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.*
    FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
        AND union_satellites.LOAD_DATE = d.LOAD_DATE
        AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH satellite_a AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_b AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_1
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_c AS (
    SELECT CUSTOMER_PK, HASHDIFF_1 AS HASHDIFF, SATELLITE_1 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2
    WHERE CUSTOMER_PK IS NOT NULL
),
satellite_d AS (
    SELECT CUSTOMER_PK, HASHDIFF_2 AS HASHDIFF, SATELLITE_2 AS SATELLITE_NAME, LOAD_DATE, SOURCE
    FROM DBTVAULT.TEST.STG_CUSTOMER_2SAT_2
    WHERE CUSTOMER_PK IS NOT NULL
),
union_satellites AS (
    SELECT * FROM satellite_a
    UNION ALL
    SELECT * FROM satellite_b
    UNION ALL
    SELECT * FROM satellite_c
    UNION ALL
    SELECT * FROM satellite_d
),
records_to_insert AS (
    SELECT DISTINCT union_satellites.* FROM union_satellites
    LEFT JOIN DBTVAULT.TEST.XTS_2SAT AS d
        ON (union_satellites.HASHDIFF = d.HASHDIFF
            AND union_satellites.LOAD_DATE = d.LOAD_DATE
            AND union_satellites.SATELLITE_NAME = d.SATELLITE_NAME
        )
    WHERE d.HASHDIFF IS NULL
        AND d.LOAD_DATE IS NULL
        AND d.SATELLITE_NAME IS NULL
)
SELECT * FROM records_to_insert

Example Coming soon!

Example Coming soon!

pit

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Point-In-Time (PIT) table.

1
2
3
4
5
6
{{ automate_dv.pit(src_pk=src_pk, 
                   as_of_dates_table=as_of_dates_table,
                   satellites=satellites,
                   stage_tables_ldts=stage_tables_ldts,
                   src_ldts=src_ldts,
                   source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column String
as_of_dates_table Name for the As of Date table String
satellites Dictionary of satellite reference mappings Mapping
stage_tables_ldts Dictionary of stage table reference mappings Mapping
src_ldts Source load date timestamp column String
source_model Hub model name String

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH as_of_dates AS (
    SELECT *
    FROM DBTVAULT.TEST.AS_OF_DATE AS a
),
new_rows_as_of_dates AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE
    FROM DBTVAULT.TEST.HUB_CUSTOMER AS a
    INNER JOIN as_of_dates AS b
    ON (1=1)
),
new_rows AS (
    SELECT
        a.CUSTOMER_PK,
        a.AS_OF_DATE,
        COALESCE(MAX(sat_customer_details_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_DETAILS_PK,
        COALESCE(MAX(sat_customer_details_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_DETAILS_LDTS,
        COALESCE(MAX(sat_customer_login_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_LOGIN_PK,
        COALESCE(MAX(sat_customer_login_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_LOGIN_LDTS,
        COALESCE(MAX(sat_customer_profile_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_PROFILE_PK,
        COALESCE(MAX(sat_customer_profile_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_PROFILE_LDTS
    FROM new_rows_as_of_dates AS a
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_DETAILS AS sat_customer_details_src
        ON a.CUSTOMER_PK = sat_customer_details_src.CUSTOMER_PK
        AND sat_customer_details_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_LOGIN AS sat_customer_login_src
        ON a.CUSTOMER_PK = sat_customer_login_src.CUSTOMER_PK
        AND sat_customer_login_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_PROFILE AS sat_customer_profile_src
        ON a.CUSTOMER_PK = sat_customer_profile_src.CUSTOMER_PK
        AND sat_customer_profile_src.LOAD_DATE <= a.AS_OF_DATE
    GROUP BY
        a.CUSTOMER_PK, a.AS_OF_DATE
),
pit AS (
    SELECT * FROM new_rows
)
SELECT DISTINCT * FROM pit
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
WITH as_of_dates AS (
    SELECT *
    FROM DBTVAULT.TEST.AS_OF_DATE
),
last_safe_load_datetime AS (
    SELECT MIN(LOAD_DATETIME) AS LAST_SAFE_LOAD_DATETIME
    FROM (
        SELECT MIN(LOAD_DATE) AS LOAD_DATETIME FROM DBTVAULT.TEST.STG_CUSTOMER_DETAILS
        UNION ALL
        SELECT MIN(LOAD_DATE) AS LOAD_DATETIME FROM DBTVAULT.TEST.STG_CUSTOMER_LOGIN
        UNION ALL
        SELECT MIN(LOAD_DATE) AS LOAD_DATETIME FROM DBTVAULT.TEST.STG_CUSTOMER_PROFILE
    ) a
),
as_of_grain_old_entries AS (
    SELECT DISTINCT AS_OF_DATE
    FROM DBTVAULT.TEST.PIT_CUSTOMER
),
as_of_grain_lost_entries AS (
    SELECT a.AS_OF_DATE
    FROM as_of_grain_old_entries AS a
    LEFT OUTER JOIN as_of_dates AS b
        ON a.AS_OF_DATE = b.AS_OF_DATE
    WHERE b.AS_OF_DATE IS NULL
),
as_of_grain_new_entries AS (
    SELECT a.AS_OF_DATE
    FROM as_of_dates AS a
    LEFT OUTER JOIN as_of_grain_old_entries AS b
        ON a.AS_OF_DATE = b.AS_OF_DATE
    WHERE b.AS_OF_DATE IS NULL
),
min_date AS (
    SELECT min(AS_OF_DATE) AS MIN_DATE
    FROM as_of_dates
),
backfill_as_of AS (
    SELECT AS_OF_DATE
    FROM as_of_dates AS a
    WHERE a.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
),
new_rows_pks AS (
    SELECT a.CUSTOMER_PK
    FROM DBTVAULT.TEST.HUB_CUSTOMER AS a
    WHERE a.LOAD_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
),
new_rows_as_of AS (
    SELECT AS_OF_DATE
    FROM as_of_dates AS a
    WHERE a.AS_OF_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
    UNION
    SELECT AS_OF_DATE
    FROM as_of_grain_new_entries
),
overlap AS (
    SELECT a.*
    FROM DBTVAULT.TEST.PIT_CUSTOMER AS a
    INNER JOIN DBTVAULT.TEST.HUB_CUSTOMER as b
        ON a.CUSTOMER_PK = b.CUSTOMER_PK
    WHERE a.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date)
        AND a.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
        AND a.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries)
),
-- Back-fill any newly arrived hubs, set all historical pit dates to ghost records
backfill_rows_as_of_dates AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE
    FROM new_rows_pks AS a
    INNER JOIN backfill_as_of AS b
        ON (1=1 )
),
backfill AS (
    SELECT
        a.CUSTOMER_PK,
        a.AS_OF_DATE,
        CAST('0000000000000000' AS BINARY(16)) AS SAT_CUSTOMER_DETAILS_PK,
        CAST('1900-01-01 00:00:00.000' AS timestamp_ntz) AS SAT_CUSTOMER_DETAILS_LDTS,
        CAST('0000000000000000' AS BINARY(16)) AS SAT_CUSTOMER_LOGIN_PK,
        CAST('1900-01-01 00:00:00.000' AS timestamp_ntz) AS SAT_CUSTOMER_LOGIN_LDTS,
        CAST('0000000000000000' AS BINARY(16)) AS SAT_CUSTOMER_PROFILE_PK,
        CAST('1900-01-01 00:00:00.000' AS timestamp_ntz) AS SAT_CUSTOMER_PROFILE_LDTS
    FROM backfill_rows_as_of_dates AS a
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_DETAILS AS sat_customer_details_src
        ON a.CUSTOMER_PK = sat_customer_details_src.CUSTOMER_PK
        AND sat_customer_details_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_LOGIN AS sat_customer_login_src
        ON a.CUSTOMER_PK = sat_customer_login_src.CUSTOMER_PK
        AND sat_customer_login_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_PROFILE AS sat_customer_profile_src
        ON a.CUSTOMER_PK = sat_customer_profile_src.CUSTOMER_PK
        AND sat_customer_profile_src.LOAD_DATE <= a.AS_OF_DATE
    GROUP BY
        a.CUSTOMER_PK, a.AS_OF_DATE
),
new_rows_as_of_dates AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE
    FROM DBTVAULT.TEST.HUB_CUSTOMER AS a
    INNER JOIN new_rows_as_of AS b
    ON (1=1)
),
new_rows AS (
    SELECT
        a.CUSTOMER_PK,
        a.AS_OF_DATE,
        COALESCE(MAX(sat_customer_details_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_DETAILS_PK,
        COALESCE(MAX(sat_customer_details_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_DETAILS_LDTS,
        COALESCE(MAX(sat_customer_login_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_LOGIN_PK,
        COALESCE(MAX(sat_customer_login_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_LOGIN_LDTS,
        COALESCE(MAX(sat_customer_profile_src.CUSTOMER_PK), CAST('0000000000000000' AS BINARY(16))) AS SAT_CUSTOMER_PROFILE_PK,
        COALESCE(MAX(sat_customer_profile_src.LOAD_DATE), CAST('1900-01-01 00:00:00.000' AS timestamp_ntz)) AS SAT_CUSTOMER_PROFILE_LDTS
    FROM new_rows_as_of_dates AS a
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_DETAILS AS sat_customer_details_src
        ON a.CUSTOMER_PK = sat_customer_details_src.CUSTOMER_PK
        AND sat_customer_details_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_LOGIN AS sat_customer_login_src
        ON a.CUSTOMER_PK = sat_customer_login_src.CUSTOMER_PK
        AND sat_customer_login_src.LOAD_DATE <= a.AS_OF_DATE
    LEFT JOIN DBTVAULT.TEST.SAT_CUSTOMER_PROFILE AS sat_customer_profile_src
        ON a.CUSTOMER_PK = sat_customer_profile_src.CUSTOMER_PK
        AND sat_customer_profile_src.LOAD_DATE <= a.AS_OF_DATE
    GROUP BY
        a.CUSTOMER_PK, a.AS_OF_DATE
),
pit AS (
    SELECT * FROM new_rows
    UNION ALL
    SELECT * FROM overlap
    UNION ALL
    SELECT * FROM backfill
)
SELECT DISTINCT * FROM pit

Example Coming soon!

Example Coming soon!

Example Coming soon!

Example Coming soon!

As Of Date Tables

An As of Date table contains a single column of dates (a date spine) used to construct the history in the PIT. A typical structure will contain a date range where the date interval will be short, such as every day or every hour, followed by a period of time after which the date intervals are slightly larger.

An example history could be end of day values for 3 months followed by another 3 months of end of week values. The As of Date table would then contain a datetime for each entry to match this.

As the days pass, the As of Dates should change to reflect this with dates being removed off the end and new dates added.

If we use the 3-month example from before, and a week had passed since when we had created the As of Date table, then it would still contain 3 months worth of end of day values followed by 3 months of end of week values but shifted a week forward to reflect the current date.

Think of As of Date tables as essentially a rolling window of time.

Note

At the current release of AutomateDV there is no functionality that auto generates this table for you, so you will have to supply this yourself. For further information, please check the tutorial page.

Another caveat is that even though the As of Date table can take any name, you need to make sure it's defined accordingly in the as_of_dates_table metadata parameter (see the metadata section for PITs). The column name in the As of Date table is currently defaulted to 'AS_OF_DATE' and it cannot be changed.


bridge

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a simple Bridge table, starting from a Hub and 'walking' through one or more associated Links ( and their Effectivity Satellites), using the provided parameters.

For the current version, Effectivity Satellite auto end dating must be enabled.

Usage

1
2
3
4
5
{{ automate_dv.bridge(source_model=source_model, src_pk=src_pk,
                      src_ldts=src_ldts,
                      bridge_walk=bridge_walk,
                      as_of_dates_table=as_of_dates_table,
                      stage_tables_ldts=stage_tables_ldts) }}

Parameters

Parameter Description Type Required?
source_model Starting Hub model name String
src_pk Starting Hub primary key column String
src_ldts Starting Hub load date timestamp String
bridge_walk Dictionary of bridge reference mappings Mapping
as_of_dates_table Name for the As of Date table String
stage_tables_ldts Dictionary of stage table reference mappings and their load date timestamps Mapping

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
WITH as_of AS (
     SELECT a.AS_OF_DATE
     FROM DBTVAULT_DEV.TEST.AS_OF_DATE AS a
     WHERE a.AS_OF_DATE <= CURRENT_DATE()
),
new_rows AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE,LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK AS LINK_CUSTOMER_ORDER_PK
                    ,EFF_SAT_CUSTOMER_ORDER.END_DATE AS EFF_SAT_CUSTOMER_ORDER_ENDDATE
                    ,EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME AS EFF_SAT_CUSTOMER_ORDER_LOADDATE
    FROM DBTVAULT_DEV.TEST.HUB_CUSTOMER AS a
    INNER JOIN AS_OF AS b
        ON (1=1)
    LEFT JOIN DBTVAULT_DEV.TEST.LINK_CUSTOMER_ORDER AS LINK_CUSTOMER_ORDER
        ON a.CUSTOMER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_FK
    INNER JOIN DBTVAULT_DEV.TEST.EFF_SAT_CUSTOMER_ORDER AS EFF_SAT_CUSTOMER_ORDER
        ON EFF_SAT_CUSTOMER_ORDER.CUSTOMER_ORDER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK
        AND EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME <= b.AS_OF_DATE
),
all_rows AS (
    SELECT * FROM new_rows
),
candidate_rows AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY AS_OF_DATE,
                LINK_CUSTOMER_ORDER_PK
            ORDER BY
                EFF_SAT_CUSTOMER_ORDER_LOADDATE DESC
            ) AS row_num
    FROM all_rows
    QUALIFY row_num = 1
),
bridge AS (
    SELECT
        c.CUSTOMER_PK,
        c.AS_OF_DATE,c.LINK_CUSTOMER_ORDER_PK
    FROM candidate_rows AS c
    WHERE TO_DATE(c.EFF_SAT_CUSTOMER_ORDER_ENDDATE) = TO_DATE('9999-12-31 23:59:59.999999')
)
SELECT * FROM bridge
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
WITH as_of AS (
     SELECT a.AS_OF_DATE
     FROM DBTVAULT_DEV.TEST.AS_OF_DATE AS a
     WHERE a.AS_OF_DATE <= CURRENT_DATE()
),
last_safe_load_datetime AS (
    SELECT MIN(LOAD_DATETIME) AS LAST_SAFE_LOAD_DATETIME
    FROM (SELECT MIN(LOAD_DATETIME) AS LOAD_DATETIME FROM DBTVAULT_DEV.TEST.STG_CUSTOMER_ORDER
            ) AS l
),
as_of_grain_old_entries AS (
    SELECT DISTINCT AS_OF_DATE
    FROM DBTVAULT_DEV.TEST.BRIDGE_CUSTOMER_ORDER
),
as_of_grain_lost_entries AS (
    SELECT a.AS_OF_DATE
    FROM as_of_grain_old_entries AS a
    LEFT OUTER JOIN as_of AS b
        ON a.AS_OF_DATE = b.AS_OF_DATE
    WHERE b.AS_OF_DATE IS NULL
),
as_of_grain_new_entries AS (
    SELECT a.AS_OF_DATE
    FROM as_of AS a
    LEFT OUTER JOIN as_of_grain_old_entries AS b
        ON a.AS_OF_DATE = b.AS_OF_DATE
    WHERE b.AS_OF_DATE IS NULL
),
min_date AS (
    SELECT min(AS_OF_DATE) AS MIN_DATE
    FROM as_of
),
new_rows_pks AS (
    SELECT h.CUSTOMER_PK
    FROM DBTVAULT_DEV.TEST.HUB_CUSTOMER AS h
    WHERE h.LOAD_DATETIME >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
),
new_rows_as_of AS (
    SELECT AS_OF_DATE
    FROM as_of
    WHERE as_of.AS_OF_DATE >= (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
    UNION
    SELECT as_of_date
    FROM as_of_grain_new_entries
),
overlap_pks AS (
    SELECT p.CUSTOMER_PK
    FROM DBTVAULT_DEV.TEST.BRIDGE_CUSTOMER_ORDER AS p
    INNER JOIN DBTVAULT_DEV.TEST.HUB_CUSTOMER as h
        ON p.CUSTOMER_PK = h.CUSTOMER_PK
    WHERE p.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date)
        AND p.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
        AND p.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries)
),
overlap_as_of AS (
    SELECT AS_OF_DATE
    FROM as_of AS p
    WHERE p.AS_OF_DATE >= (SELECT MIN_DATE FROM min_date)
        AND p.AS_OF_DATE < (SELECT LAST_SAFE_LOAD_DATETIME FROM last_safe_load_datetime)
        AND p.AS_OF_DATE NOT IN (SELECT AS_OF_DATE FROM as_of_grain_lost_entries)
),
overlap AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE
                    ,LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK AS LINK_CUSTOMER_ORDER_PK
                    ,EFF_SAT_CUSTOMER_ORDER.END_DATE AS EFF_SAT_CUSTOMER_ORDER_ENDDATE
                    ,EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME AS EFF_SAT_CUSTOMER_ORDER_LOADDATE
    FROM overlap_pks AS a
    INNER JOIN overlap_as_of AS b
        ON (1=1)
    LEFT JOIN DBTVAULT_DEV.TEST.LINK_CUSTOMER_ORDER AS LINK_CUSTOMER_ORDER
        ON a.CUSTOMER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_FK
    INNER JOIN DBTVAULT_DEV.TEST.EFF_SAT_CUSTOMER_ORDER AS EFF_SAT_CUSTOMER_ORDER
        ON EFF_SAT_CUSTOMER_ORDER.CUSTOMER_ORDER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK
        AND EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME <= b.AS_OF_DATE
),
new_rows AS (
    SELECT
        a.CUSTOMER_PK,
        b.AS_OF_DATE,LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK AS LINK_CUSTOMER_ORDER_PK
                    ,EFF_SAT_CUSTOMER_ORDER.END_DATE AS EFF_SAT_CUSTOMER_ORDER_ENDDATE
                    ,EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME AS EFF_SAT_CUSTOMER_ORDER_LOADDATE
    FROM DBTVAULT_DEV.TEST.HUB_CUSTOMER AS a
    INNER JOIN NEW_ROWS_AS_OF AS b
        ON (1=1)
    LEFT JOIN DBTVAULT_DEV.TEST.LINK_CUSTOMER_ORDER AS LINK_CUSTOMER_ORDER
        ON a.CUSTOMER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_FK
    INNER JOIN DBTVAULT_DEV.TEST.EFF_SAT_CUSTOMER_ORDER AS EFF_SAT_CUSTOMER_ORDER
        ON EFF_SAT_CUSTOMER_ORDER.CUSTOMER_ORDER_PK = LINK_CUSTOMER_ORDER.CUSTOMER_ORDER_PK
        AND EFF_SAT_CUSTOMER_ORDER.LOAD_DATETIME <= b.AS_OF_DATE
),
all_rows AS (
    SELECT * FROM new_rows
    UNION ALL
    SELECT * FROM overlap
),
candidate_rows AS (
    SELECT *,
        ROW_NUMBER() OVER (
            PARTITION BY AS_OF_DATE,
                LINK_CUSTOMER_ORDER_PK
            ORDER BY
                EFF_SAT_CUSTOMER_ORDER_LOADDATE DESC
            ) AS row_num
    FROM all_rows
    QUALIFY row_num = 1
),
bridge AS (
    SELECT
        c.CUSTOMER_PK,
        c.AS_OF_DATE,c.LINK_CUSTOMER_ORDER_PK
    FROM candidate_rows AS c
    WHERE TO_DATE(c.EFF_SAT_CUSTOMER_ORDER_ENDDATE) = TO_DATE('9999-12-31 23:59:59.999999')
)
SELECT * FROM bridge

Example Coming soon!

Example Coming soon!

Example Coming soon!

Example Coming soon!

As Of Date Table Structures

An As of Date table contains a single column of dates used to construct the history in the Bridge table.

Note

At the current release of AutomateDV there is no functionality that auto generates this table for you, so you will have to supply this yourself. For further information, please check the tutorial page.

Another caveat is that even though the As of Date table can take any name, you need to make sure it's defined accordingly in the as_of_dates_table metadata parameter (see the metadata section for Bridges). The column name in the As of Date table is currently defaulted to 'AS_OF_DATE' and it cannot be changed.


ref_table

view source:

Snowflake BigQuery SQLServer Databricks Postgres

Generates SQL to build a Reference table using the provided parameters.

Usage

1
2
3
4
{{ automate_dv.ref_table(src_pk=src_pk, src_ldts=src_ldts,
                         src_extra_columns=src_extra_columns,
                         src_ldts=src_ldts, src_source=src_source,
                         source_model=source_model) }}

Parameters

Parameter Description Type Required?
src_pk Source primary key column String
src_extra_columns Select arbitrary columns from the source List[String]/String
src_ldts Source load date timestamp column String
src_source Name of the column containing the source ID String
source_model Staging model name String

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
WITH to_insert AS (
    SELECT DISTINCT
    a.DATE_PK, a.YEAR, a.MONTH, a.DAY, a.DAY_OF_WEEK
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.REF_DATE AS a
    WHERE a.DATE_PK IS NOT NULL
),

non_historized AS (
    SELECT
    a.DATE_PK, a.YEAR, a.MONTH, a.DAY, a.DAY_OF_WEEK
    FROM to_insert AS a
)

SELECT * FROM non_historized
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
WITH to_insert AS (
    SELECT DISTINCT
    a.DATE_PK, a.YEAR, a.MONTH, a.DAY, a.DAY_OF_WEEK
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.REF_DATE AS a
    WHERE a.DATE_PK IS NOT NULL
),

non_historized AS (
    SELECT
    a.DATE_PK, a.YEAR, a.MONTH, a.DAY, a.DAY_OF_WEEK
    FROM to_insert AS a
    LEFT JOIN ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.REF_DATE AS d
    ON a.DATE_PK = d.DATE_PK
    WHERE d.DATE_PK IS NULL
)

SELECT * FROM non_historized

Staging Macros

(macros/staging)

These macros are intended for use in the staging layer.

In AutomateDV, we call this staging layer "primed staging" as we are preparing or 'priming' the data ready for use in the raw vault. It is important to understand that according to Data Vault 2.0 standards, the primed stages is essentially where all of our hard business rules are defined. We are not excessively transforming the data beyond what is reasonable prior to the raw stage, but simply creating some columns to drive audit and performance downstream.


stage

(view source)

Generates SQL to build a staging area using the provided parameters.

Usage

1
2
3
4
5
6
{{ automate_dv.stage(include_source_columns=true,
                     source_model=source_model,
                     derived_columns=derived_columns,
                     null_columns=null_columns,
                     hashed_columns=hashed_columns,
                     ranked_columns=ranked_columns) }}

Parameters

Parameter Description Type Default Required?
include_source_columns If true, select all columns in the source_model Boolean true
source_model Staging model name Mapping N/A
derived_columns Mappings of column names and their value Mapping none
null_columns Mappings of columns for which null business keys should be replaced Mapping none
hashed_columns Mappings of hashes to their component columns Mapping none
ranked_columns Mappings of ranked columns names to their order by and partition by columns Mapping none
Video Tutorial

Tip

Read the tutorial for more details

Example Metadata

See examples

Example Output

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
WITH source_data AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.CUSTOMER
),
derived_columns AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    C_CUSTKEY AS CUSTOMER_ID,
    '1998-01-01' AS LOAD_DATETIME,
    'TPCH_CUSTOMER' AS RECORD_SOURCE
    FROM source_data
),
null_columns AS (
    SELECT
    C_CUSTKEY,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID AS CUSTOMER_ID_ORIGINAL,
        IFNULL(CUSTOMER_ID, '-1') AS CUSTOMER_ID,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM derived_columns
),
hashed_columns AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME_ORIGINAL,
    CAST(MD5_BINARY(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR))), '')) AS BINARY(16)) AS CUSTOMER_HK
    FROM null_columns
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY CUSTOMER_HK ORDER BY LOAD_DATETIME) AS AUTOMATE_DV_RANK
    FROM hashed_columns
),
columns_to_select AS (
    SELECT
    C_CUSTKEY,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL,
    CUSTOMER_HK,
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
WITH source_data AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.CUSTOMER
),
columns_to_select AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM source_data
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH source_data AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.CUSTOMER
),
null_columns AS (
    SELECT
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    C_CUSTKEY AS C_CUSTKEY_ORIGINAL,
        IFNULL(C_CUSTKEY, '-1') AS C_CUSTKEY,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM source_data
),
columns_to_select AS (
    SELECT
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    C_CUSTKEY,
    C_CUSTKEY_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL
    FROM null_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH source_data AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.CUSTOMER
),
hashed_columns AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    CAST(MD5_BINARY(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR))), '')) AS BINARY(16)) AS CUSTOMER_HK
    FROM source_data
),
columns_to_select AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT,
    CUSTOMER_HK
    FROM hashed_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
WITH source_data AS (
    SELECT
    C_CUSTKEY,
    C_NAME,
    C_ADDRESS,
    C_NATIONKEY,
    C_PHONE,
    C_ACCTBAL,
    C_MKTSEGMENT,
    C_COMMENT
    FROM ALEX_HIGGS.AUTOMATE_DV_DOCS_SAMPLES.CUSTOMER
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY C_CUSTKEY ORDER BY C_NATIONKEY) AS AUTOMATE_DV_RANK
    FROM source_data
),
columns_to_select AS (
    SELECT
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `dbtvault-341416`.`dbtvault`.`CUSTOMER`
),
derived_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS CUSTOMER_ID,
    '1998-01-01' AS LOAD_DATETIME,
    'TPCH_CUSTOMER' AS RECORD_SOURCE
    FROM source_data
),
null_columns AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID AS CUSTOMER_ID_ORIGINAL,
        IFNULL(CUSTOMER_ID, '-1') AS CUSTOMER_ID,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM derived_columns
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME_ORIGINAL,
    CAST(UPPER(TO_HEX(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS STRING))), '')))) AS STRING) AS CUSTOMER_HK
    FROM null_columns
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY CUSTOMER_HK ORDER BY LOAD_DATETIME) AS AUTOMATE_DV_RANK
    FROM hashed_columns
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL,
    CUSTOMER_HK,
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `dbtvault-341416`.`dbtvault`.`CUSTOMER`
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM source_data
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `dbtvault-341416`.`dbtvault`.`CUSTOMER`
),
null_columns AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS C_CUSTKEY_ORIGINAL,
        IFNULL(C_CUSTKEY, '-1') AS C_CUSTKEY,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY,
    C_CUSTKEY_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL
    FROM null_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `dbtvault-341416`.`dbtvault`.`CUSTOMER`
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CAST(UPPER(TO_HEX(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS STRING))), '')))) AS STRING) AS CUSTOMER_HK
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_HK
    FROM hashed_columns
)
SELECT * FROM columns_to_select

``sql WITH source_data AS ( SELECT c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment FROMdbtvault-341416.dbtvault.CUSTOMER` ), ranked_columns AS ( SELECT *, RANK() OVER (PARTITION BY C_CUSTKEY ORDER BY C_NATIONKEY) AS AUTOMATE_DV_RANK FROM source_data ), columns_to_select AS ( SELECT AUTOMATE_DV_RANK FROM ranked_columns ) SELECT * FROM columns_to_select

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "AUTOMATE_DV_TEST"."TEST"."CUSTOMER"
),
derived_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS CUSTOMER_ID,
    '1998-01-01' AS LOAD_DATETIME,
    'TPCH_CUSTOMER' AS RECORD_SOURCE
    FROM source_data
),
null_columns AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID AS CUSTOMER_ID_ORIGINAL,
        ISNULL(CUSTOMER_ID, '-1') AS CUSTOMER_ID,
    C_NAME AS C_NAME_ORIGINAL,
        ISNULL(C_NAME, '-2') AS C_NAME
    FROM derived_columns
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME_ORIGINAL,
    CONVERT(BINARY(16), HASHBYTES('MD5', NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR(MAX)))), '')), 2) AS CUSTOMER_HK
    FROM null_columns
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY CUSTOMER_HK ORDER BY LOAD_DATETIME) AS AUTOMATE_DV_RANK
    FROM hashed_columns
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL,
    CUSTOMER_HK,
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "AUTOMATE_DV_TEST"."TEST"."CUSTOMER"
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM source_data
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "AUTOMATE_DV_TEST"."TEST"."CUSTOMER"
),
null_columns AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS C_CUSTKEY_ORIGINAL,
        ISNULL(C_CUSTKEY, '-1') AS C_CUSTKEY,
    C_NAME AS C_NAME_ORIGINAL,
        ISNULL(C_NAME, '-2') AS C_NAME
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY,
    C_CUSTKEY_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL
    FROM null_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "AUTOMATE_DV_TEST"."TEST"."CUSTOMER"
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CONVERT(BINARY(16), HASHBYTES('MD5', NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR(MAX)))), '')), 2) AS CUSTOMER_HK
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_HK
    FROM hashed_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "AUTOMATE_DV_TEST"."TEST"."CUSTOMER"
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY C_CUSTKEY ORDER BY C_NATIONKEY) AS AUTOMATE_DV_RANK
    FROM source_data
),
columns_to_select AS (
    SELECT
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "dbtvault_db"."development"."CUSTOMER"
),
derived_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS CUSTOMER_ID,
    '1998-01-01' AS LOAD_DATETIME,
    'TPCH_CUSTOMER' AS RECORD_SOURCE
    FROM source_data
),
null_columns AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID AS CUSTOMER_ID_ORIGINAL,
        COALESCE(CUSTOMER_ID, '-1') AS CUSTOMER_ID,
    C_NAME AS C_NAME_ORIGINAL,
        COALESCE(C_NAME, '-2') AS C_NAME
    FROM derived_columns
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME_ORIGINAL,
    DECODE(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR))), '')), 'hex') AS CUSTOMER_HK
    FROM null_columns
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY CUSTOMER_HK ORDER BY LOAD_DATETIME) AS AUTOMATE_DV_RANK
    FROM hashed_columns
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL,
    CUSTOMER_HK,
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "dbtvault_db"."development"."CUSTOMER"
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM source_data
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "dbtvault_db"."development"."CUSTOMER"
),
null_columns AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS C_CUSTKEY_ORIGINAL,
        COALESCE(C_CUSTKEY, '-1') AS C_CUSTKEY,
    C_NAME AS C_NAME_ORIGINAL,
        COALESCE(C_NAME, '-2') AS C_NAME
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY,
    C_CUSTKEY_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL
    FROM null_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "dbtvault_db"."development"."CUSTOMER"
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    DECODE(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR))), '')), 'hex') AS CUSTOMER_HK
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_HK
    FROM hashed_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM "dbtvault_db"."development"."CUSTOMER"
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY C_CUSTKEY ORDER BY C_NATIONKEY) AS AUTOMATE_DV_RANK
    FROM source_data
),
columns_to_select AS (
    SELECT
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `hive_metastore`.`dbtvault`.`CUSTOMER`
),
derived_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS CUSTOMER_ID,
    '1998-01-01' AS LOAD_DATETIME,
    'TPCH_CUSTOMER' AS RECORD_SOURCE
    FROM source_data
),
null_columns AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID AS CUSTOMER_ID_ORIGINAL,
        IFNULL(CUSTOMER_ID, '-1') AS CUSTOMER_ID,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM derived_columns
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME_ORIGINAL,
    CAST(UPPER(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR(16)))), ''))) AS STRING) AS CUSTOMER_HK
    FROM null_columns
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY CUSTOMER_HK ORDER BY LOAD_DATETIME) AS AUTOMATE_DV_RANK
    FROM hashed_columns
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_ID,
    LOAD_DATETIME,
    RECORD_SOURCE,
    CUSTOMER_ID_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL,
    CUSTOMER_HK,
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `hive_metastore`.`dbtvault`.`CUSTOMER`
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM source_data
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `hive_metastore`.`dbtvault`.`CUSTOMER`
),
null_columns AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY AS C_CUSTKEY_ORIGINAL,
        IFNULL(C_CUSTKEY, '-1') AS C_CUSTKEY,
    C_NAME AS C_NAME_ORIGINAL,
        IFNULL(C_NAME, '-2') AS C_NAME
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    C_CUSTKEY,
    C_CUSTKEY_ORIGINAL,
    C_NAME,
    C_NAME_ORIGINAL
    FROM null_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `hive_metastore`.`dbtvault`.`CUSTOMER`
),
hashed_columns AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CAST(UPPER(MD5(NULLIF(UPPER(TRIM(CAST(C_CUSTKEY AS VARCHAR(16)))), ''))) AS STRING) AS CUSTOMER_HK
    FROM source_data
),
columns_to_select AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment,
    CUSTOMER_HK
    FROM hashed_columns
)
SELECT * FROM columns_to_select
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
WITH source_data AS (
    SELECT
    c_custkey,
    c_name,
    c_address,
    c_nationkey,
    c_phone,
    c_acctbal,
    c_mktsegment,
    c_comment
    FROM `hive_metastore`.`dbtvault`.`CUSTOMER`
),
ranked_columns AS (
    SELECT *,
    RANK() OVER (PARTITION BY C_CUSTKEY ORDER BY C_NATIONKEY) AS AUTOMATE_DV_RANK
    FROM source_data
),
columns_to_select AS (
    SELECT
    AUTOMATE_DV_RANK
    FROM ranked_columns
)
SELECT * FROM columns_to_select

Stage Macro Configurations

Stage Macro In-Depth

Supporting Macros

(macros/supporting)

Supporting macros are helper functions for use in models. It should not be necessary to call these macros directly, however they are used extensively in the table templates and may be used for your own purposes if you wish.


hash (macro)

(view source)

Warning

This macro should not be used for cryptographic purposes. The intended use is for creating checksum-like values only, so that we may compare records consistently.
Read More

See Also

A macro for generating hashing SQL for columns.

Usage

1
2
{{ automate_dv.hash('CUSTOMERKEY', 'CUSTOMER_HK') }},
{{ automate_dv.hash(['CUSTOMERKEY', 'PHONE', 'DOB', 'NAME'], 'HASHDIFF', true) }}
1
2
3
4
5
CAST(MD5_BINARY(CONCAT_WS('||',
IFNULL(NULLIF(UPPER(TRIM(CAST(CUSTOMER_ID AS VARCHAR))), ''), '^^'),
IFNULL(NULLIF(UPPER(TRIM(CAST(DOB AS VARCHAR))), ''), '^^'),
IFNULL(NULLIF(UPPER(TRIM(CAST(PHONE AS VARCHAR))), ''), '^^')
)) AS BINARY(16)) AS HASHDIFF
1
2
3
4
5
CAST(SHA2_BINARY(CONCAT_WS('||',
IFNULL(NULLIF(UPPER(TRIM(CAST(CUSTOMER_ID AS VARCHAR))), ''), '^^'),
IFNULL(NULLIF(UPPER(TRIM(CAST(DOB AS VARCHAR))), ''), '^^'), 
IFNULL(NULLIF(UPPER(TRIM(CAST(PHONE AS VARCHAR))), ''), '^^')
)) AS BINARY(32)) AS HASHDIFF
1
2
3
4
5
CAST(HASHBYTES('MD5', (CONCAT_WS('||',
ISNULL(NULLIF(UPPER(TRIM(CAST(CUSTOMER_ID AS VARCHAR(max)))), ''), '^^'),
ISNULL(NULLIF(UPPER(TRIM(CAST(DOB AS VARCHAR(max)))), ''), '^^'),
ISNULL(NULLIF(UPPER(TRIM(CAST(PHONE AS VARCHAR(max)))), ''), '^^')
)) AS BINARY(16)) AS HASHDIFF
1
2
3
4
5
CAST(HASHBYTES('SHA2_256', (CONCAT_WS('||',
ISNULL(NULLIF(UPPER(TRIM(CAST(CUSTOMER_ID AS VARCHAR(max)))), ''), '^^'),
ISNULL(NULLIF(UPPER(TRIM(CAST(DOB AS VARCHAR(max)))), ''), '^^'), 
ISNULL(NULLIF(UPPER(TRIM(CAST(PHONE AS VARCHAR(max)))), ''), '^^')
)) AS BINARY(32)) AS HASHDIFF

Tip

The hash_columns macro can be used to simplify the hashing process and generate multiple hashes with one macro.

Parameters

Parameter Description Type Required?
columns Columns to hash on List[String]/String
alias The name to give the hashed column String
is_hashdiff Will alpha sort columns if true, default false. Boolean

prefix

(view source)

A macro for quickly prefixing a list of columns with a string.

Parameters

Parameter Description Type Required?
columns A list of column names List[String]
prefix_str The prefix for the columns String

Usage

1
{{ automate_dv.prefix(['CUSTOMERKEY', 'DOB', 'NAME', 'PHONE'], 'a') }} {{ automate_dv.prefix(['CUSTOMERKEY'], 'a') }}
1
a.CUSTOMERKEY, a.DOB, a.NAME, a.PHONE a.CUSTOMERKEY

Note

Single columns must be provided as a 1-item list.


Internal

(macros/internal)

Internal macros are used by other macros provided by AutomateDV. They process provided metadata and should not need to be called directly.