Skip to content
This repository was archived by the owner on Jun 13, 2025. It is now read-only.

Commit 4615270

Browse files
Dan Leedataform.co
authored andcommitted
add assertions and fix user uniqueness
1 parent 8da0817 commit 4615270

5 files changed

Lines changed: 23 additions & 11 deletions

File tree

dataform.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
2-
"defaultSchema": "dataform",
3-
"assertionSchema": "dataform_assertions",
2+
"defaultSchema": "segment_dataform_package",
3+
"assertionSchema": "segment_dataform_package",
44
"warehouse": "bigquery",
55
"gcloudProjectId": "tada-analytics"
66
}

includes/page_events.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module.exports = (params) => {
1111
}).query(ctx => `
1212
1313
-- format page calls into a format suitable to join with track calls
14-
select
14+
select distinct
1515
pages.timestamp,
1616
user_id,
1717
anonymous_id,

includes/sessions.js

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ module.exports = (params) => {
88
const customTrackFieldsObj = params.customTrackFields.reduce((acc, item) => ({...acc, [item]: item }), {});
99

1010
return publish("segment_sessions", {
11+
assertions: {
12+
uniqueKey: ["session_id"]
13+
},
1114
description: "Sessions contain a combined view of tracks and pages from segment. Each session is a period of sustained activity, with a new session starting after a 30min+ period of inactivity. Each session contains a repeated field of records which are either tracks or pages. Common fields are extracted out into the top level and type specific fields are kept within two structs: records.track and records.page",
1215
columns: {
1316
session_id: "Unique identifier of the session",
@@ -18,7 +21,7 @@ module.exports = (params) => {
1821
...params.defaultConfig
1922
}).query(ctx => `
2023
21-
/* TODO: optimise this code to make it work, or enable only for incremental builds
24+
/* TODO: optimise this code to make it work, or enable only for incremental builds */
2225
with first_and_last_page_values as (
2326
select distinct
2427
session_id,
@@ -29,6 +32,7 @@ select distinct
2932
ignore_nulls: true,
3033
partition_fields: "session_id",
3134
order_fields: 'sessionized_pages.timestamp asc',
35+
frame_clause: "rows between unbounded preceding and unbounded following",
3236
})} as first_${value}`).join(",\n ")},
3337
${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
3438
([key, value]) => `${crossdb.windowFunction({
@@ -37,10 +41,12 @@ select distinct
3741
ignore_nulls: true,
3842
partition_fields: "session_id",
3943
order_fields: 'sessionized_pages.timestamp asc',
44+
frame_clause: "rows between unbounded preceding and unbounded following",
4045
})} as last_${value}`).join(",\n ")}
4146
from
4247
${ctx.ref(params.defaultConfig.schema, "segment_sessionized_pages")} as sessionized_pages
4348
)
49+
/*
4450
*/
4551
4652
select
@@ -56,12 +62,13 @@ select
5662
count(segment_sessionized_events.page_id) as total_pages,
5763
${crossdb.timestampDiff("millisecond", "min(segment_sessionized_events.timestamp)", "max(segment_sessionized_events.timestamp)")} as duration_millis
5864
${ctx.when(global.session.config.warehouse == "bigquery", `) as stats`)},
59-
/* See TODO at start of script
65+
/* See TODO at start of script */
6066
-- first values in the session for page fields
6167
${ctx.when(global.session.config.warehouse == "bigquery", `struct(\n `)}
6268
${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
6369
([key, value]) => `first_and_last_page_values.first_${value}`).join(",\n ")}
6470
${ctx.when(global.session.config.warehouse == "bigquery", `) as first_page_values`)},
71+
/*
6572
-- last values in the session for page fields
6673
${ctx.when(global.session.config.warehouse == "bigquery", `struct(\n `)}
6774
${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
@@ -88,21 +95,21 @@ select
8895
) as records`)}
8996
from
9097
${ctx.ref(params.defaultConfig.schema, "segment_sessionized_events")} as segment_sessionized_events
91-
/* See TODO at start of script
98+
/* See TODO at start of script */
9299
left join first_and_last_page_values
93100
using(session_id)
94-
*/
101+
/**/
95102
${ctx.when(global.session.config.warehouse == "bigquery", `
96103
left join ${ctx.ref(params.defaultConfig.schema, "segment_sessionized_pages")} as segment_sessionized_pages
97104
using(page_id)
98105
left join ${ctx.ref(params.defaultConfig.schema, "segment_sessionized_tracks")} as segment_sessionized_tracks
99106
using(track_id)`)}
100107
group by
101108
session_id, session_index, user_id
102-
/* See TODO at start of script
109+
/* See TODO at start of script */
103110
${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
104111
([key, value]) => `, first_${value}`).join(" ")}
105-
${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
112+
/* ${Object.entries({...segmentCommon.PAGE_FIELDS, ...customPageFieldsObj}).map(
106113
([key, value]) => `, last_${value}`).join(" ")}
107114
*/
108115
`)

includes/track_events.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ module.exports = (params) => {
1111
}).query(ctx => `
1212
1313
-- format track calls into a format suitable to join with page calls
14-
select
14+
select distinct
1515
tracks.timestamp,
1616
user_id,
1717
anonymous_id,

includes/users.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ let USER = `coalesce(
88

99
module.exports = (params) => {
1010
return publish("segment_users", {
11+
assertions: {
12+
uniqueKey: ["user_id"]
13+
},
1114
description: "Users aggregates all identifies calls to give a table with one row per user_id. Identify calls without only an anonymous_id are mapped to the user where possible.",
1215
columns: {
1316
user_id: "Unique identifier of the user",
@@ -24,14 +27,16 @@ select distinct
2427
ignore_nulls: true,
2528
partition_fields: USER,
2629
order_fields: "identifies.timestamp asc",
30+
frame_clause: "rows between unbounded preceding and unbounded following",
2731
})} as first_seen_at
2832
${params.customUserFields.length ? `,` : ``}
29-
${params.customUserFields.map(f=> `${crossdb.windowFunction({
33+
${params.customUserFields.map(f => `${crossdb.windowFunction({
3034
func: "first_value",
3135
value: f,
3236
ignore_nulls: true,
3337
partition_fields: USER,
3438
order_fields: "identifies.timestamp desc",
39+
frame_clause: "rows between unbounded preceding and unbounded following",
3540
})} as ${f}`).join(",\n ")}
3641
from
3742
${ctx.ref(params.defaultConfig.schema, "segment_user_map")} as segment_user_anonymous_map

0 commit comments

Comments
 (0)