Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,8 @@ require (
github.com/nanmu42/limitio v1.0.0
github.com/oklog/run v1.2.0
github.com/olekukonko/tablewriter v0.0.5
github.com/parquet-go/parquet-go v0.24.0
github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25
github.com/polarsignals/frostdb v0.0.0-20260121113628-9e5cfe0171ad
github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b
github.com/prometheus/client_golang v1.23.2
github.com/prometheus/common v0.67.5
github.com/prometheus/prometheus v0.305.0
Expand Down Expand Up @@ -218,11 +216,13 @@ require (
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/oracle/oci-go-sdk/v65 v65.41.1 // indirect
github.com/ovh/go-ovh v1.8.0 // indirect
github.com/parquet-go/parquet-go v0.24.0 // indirect
github.com/paulmach/orb v0.12.0 // indirect
github.com/pierrec/lz4/v4 v4.1.25 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b // indirect
github.com/polarsignals/wal v0.0.0-20240619104840-9da940027f9c // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/columnquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/polarsignals/frostdb/pqarrow/arrowutils"
"github.com/parca-dev/parca/pkg/query/internal/arrowutils"

metastorev1alpha1 "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1"
pb "github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1"
Expand Down
2 changes: 1 addition & 1 deletion pkg/query/flamegraph_arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/apache/arrow-go/v18/arrow/math"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/olekukonko/tablewriter"
"github.com/polarsignals/frostdb/pqarrow/builder"
"github.com/parca-dev/parca/pkg/query/internal/builder"
"github.com/zeebo/xxh3"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
Expand Down
9 changes: 9 additions & 0 deletions pkg/query/internal/arrowutils/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Package arrowutils vendors the Arrow record sort/take/merge utilities
// from github.com/polarsignals/frostdb/pqarrow/arrowutils. These power
// the sort and merge step in the columnar query path.
//
// The merge_test.go and schema_test.go suites from upstream have been
// omitted because they depend on github.com/polarsignals/frostdb/internal/records,
// which is not importable from outside the frostdb module. sort_test.go
// is preserved.
package arrowutils
304 changes: 304 additions & 0 deletions pkg/query/internal/arrowutils/groupranges.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
package arrowutils

import (
"bytes"
"container/heap"
"fmt"
"strings"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
)

// GetGroupsAndOrderedSetRanges scans the given arrow arrays column by column
// and returns, in this order:
//
//   - a min-heap holding every row index at which a new group starts,
//   - a min-heap holding every row index at which a new ordered set starts,
//   - the last group value seen for each column,
//   - an error.
//
// A group is a run of equal values. An ordered set is a run of groups in
// increasing order; a new ordered set starts whenever a value sorts before the
// group that precedes it. For a single column with the values
//
//	a a c d a b c
//
// and a first group of "a", the group ranges are [2, 3, 4, 5, 6] and the
// ordered set ranges are [4].
//
// firstGroup provides the value each column is initially compared against (it
// can be initialized to the values at index 0 of each array) and must have
// exactly one entry per array. A nil entry represents a null group value; nulls
// sort first. firstGroup is not modified: []byte entries are copied.
//
// Each column is scanned independently, so the same index may be pushed more
// than once when several columns change on the same row; Int64Heap.Unwrap and
// Int64Heap.PopNextNotEqual skip such duplicates.
//
// An error is returned when the number of columns does not match or when a
// column (or dictionary value) type is not supported. An entry of firstGroup
// whose dynamic type does not match its column still panics on the type
// assertion.
func GetGroupsAndOrderedSetRanges(
	firstGroup []any, arrs []arrow.Array,
) (*Int64Heap, *Int64Heap, []any, error) {
	if len(firstGroup) != len(arrs) {
		return nil,
			nil,
			nil,
			fmt.Errorf(
				"columns mismatch (%d != %d) when getting group ranges",
				len(firstGroup),
				len(arrs),
			)
	}

	// Safe copy the group in order to not overwrite the input slice values.
	curGroup := make([]any, len(firstGroup))
	for i, v := range firstGroup {
		curGroup[i] = copyGroupValue(v)
	}

	// groupRanges keeps track of the bounds of the group by columns.
	groupRanges := &Int64Heap{}
	heap.Init(groupRanges)
	// setRanges keeps track of the bounds of ordered sets. i.e. in the
	// following slice, (a, a, b, c) is an ordered set of three groups. The
	// second ordered set is (a, e): [a, a, b, c, a, e]
	setRanges := &Int64Heap{}
	heap.Init(setRanges)

	// handleCmpResult records the outcome of comparing the current group value
	// of a column (left-hand side) with the value at row j of t (right-hand
	// side).
	handleCmpResult := func(cmp, column int, t arrow.Array, j int) {
		switch cmp {
		case -1, 1:
			// New group, append range index.
			heap.Push(groupRanges, int64(j))
			if cmp == 1 {
				// The current group sorts after the new value, so the
				// increasing run is broken: new ordered set encountered.
				heap.Push(setRanges, int64(j))
			}
			// And update the current group. The value is copied because it
			// may alias the array's buffers.
			curGroup[column] = copyGroupValue(t.GetOneForMarshal(j))
		case 0:
			// Equal to group, do nothing.
		}
	}

	// Nullness of the current group is always decided on the interface value
	// (curGroup[i] == nil) and never on the nil-ness of a []byte, so that an
	// empty but non-null binary value is not mistaken for null.
	for i, arr := range arrs {
		switch t := arr.(type) {
		case *array.Binary:
			for j := 0; j < arr.Len(); j++ {
				cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
				if !ok {
					cmp = bytes.Compare(curGroup[i].([]byte), t.Value(j))
				}
				handleCmpResult(cmp, i, t, j)
			}
		case *array.String:
			for j := 0; j < arr.Len(); j++ {
				cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
				if !ok {
					cmp = strings.Compare(curGroup[i].(string), t.Value(j))
				}
				handleCmpResult(cmp, i, t, j)
			}
		case *array.Int64:
			for j := 0; j < arr.Len(); j++ {
				cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
				if !ok {
					cmp = compareInt64(curGroup[i].(int64), t.Value(j))
				}
				handleCmpResult(cmp, i, t, j)
			}
		case *array.Boolean:
			for j := 0; j < arr.Len(); j++ {
				cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
				if !ok {
					cmp = compareBools(curGroup[i].(bool), t.Value(j))
				}
				handleCmpResult(cmp, i, t, j)
			}
		case VirtualNullArray:
			for j := 0; j < arr.Len(); j++ {
				// Every value of a virtual null array is null.
				cmp, ok := nullComparison(curGroup[i] == nil, true)
				if !ok {
					return nil, nil, nil, fmt.Errorf(
						"null comparison should always be valid but group was: %v", curGroup[i],
					)
				}
				handleCmpResult(cmp, i, t, j)
			}
		case *array.Dictionary:
			switch dict := t.Dictionary().(type) {
			case *array.Binary:
				for j := 0; j < arr.Len(); j++ {
					cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
					if !ok {
						cmp = bytes.Compare(
							curGroup[i].([]byte),
							dict.Value(t.GetValueIndex(j)),
						)
					}
					handleCmpResult(cmp, i, t, j)
				}
			case *array.String:
				for j := 0; j < arr.Len(); j++ {
					cmp, ok := nullComparison(curGroup[i] == nil, t.IsNull(j))
					if !ok {
						cmp = strings.Compare(
							curGroup[i].(string),
							dict.Value(t.GetValueIndex(j)),
						)
					}
					handleCmpResult(cmp, i, t, j)
				}
			default:
				return nil, nil, nil, fmt.Errorf("unsupported dictionary type: %T", dict)
			}
		default:
			return nil, nil, nil, fmt.Errorf("unsupported type: %T", t)
		}
	}
	return groupRanges, setRanges, curGroup, nil
}

// copyGroupValue returns v in a form that is safe to retain as a group value.
// []byte values are copied so that later writes to the source cannot change
// the stored group; the copy is always non-nil so that an empty value stays
// distinguishable from a null group, which is represented by a nil interface.
// All other values are returned unchanged.
func copyGroupValue(v any) any {
	b, ok := v.([]byte)
	if !ok {
		return v
	}
	return append([]byte{}, b...)
}

// nullComparison compares two values based solely on whether each is null.
// leftNull and rightNull report whether the left-hand and right-hand values
// are null. Nulls sort first: a null is less than any non-null value and equal
// to another null.
//
// The first result is the comparison (-1, 0 or 1). The second result reports
// whether at least one side was null; when it is false the first result must
// be disregarded and the caller has to compare the actual values.
func nullComparison(leftNull, rightNull bool) (int, bool) {
	switch {
	case leftNull && rightNull:
		return 0, true
	case leftNull:
		return -1, true
	case rightNull:
		return 1, true
	default:
		// Neither side is null, so a null comparison cannot decide.
		return 0, false
	}
}

// compareInt64 returns -1 if a is less than b, 1 if a is greater than b and 0
// if they are equal.
func compareInt64(a, b int64) int {
	switch {
	case a < b:
		return -1
	case a > b:
		return 1
	default:
		return 0
	}
}

// compareBools orders booleans with false before true. It returns 0 when a and
// b are equal, -1 when a is false and b is true, and 1 when a is true and b is
// false.
func compareBools(a, b bool) int {
	switch {
	case a == b:
		return 0
	case a:
		return 1
	default:
		return -1
	}
}

// Int64Heap is a slice of int64 values that implements heap.Interface as a
// min-heap: Less orders by ascending value, so container/heap yields the
// smallest element first.
type Int64Heap []int64

// Len returns the number of elements in the heap.
func (h Int64Heap) Len() int { return len(h) }

// Less reports whether the element at index i is smaller than the one at
// index j.
func (h Int64Heap) Less(i, j int) bool { return h[i] < h[j] }

// Swap exchanges the elements at indexes i and j.
func (h Int64Heap) Swap(i, j int) { h[j], h[i] = h[i], h[j] }

// Push appends x, which must be an int64, to the underlying slice. It exists
// to satisfy heap.Interface; use heap.Push to keep the heap ordered.
func (h *Int64Heap) Push(x any) {
	v := x.(int64)
	*h = append(*h, v)
}

// Pop removes and returns the last element of the underlying slice. It exists
// to satisfy heap.Interface; use heap.Pop to obtain the smallest element.
func (h *Int64Heap) Pop() any {
	last := len(*h) - 1
	v := (*h)[last]
	*h = (*h)[:last]
	return v
}

// PopNextNotEqual returns the next least element not equal to compare. Every
// element popped on the way, including the returned one, is removed from the
// heap. The boolean is false when the heap runs out of elements first.
func (h *Int64Heap) PopNextNotEqual(compare int64) (int64, bool) {
	for {
		if h.Len() == 0 {
			return 0, false
		}
		if v := heap.Pop(h).(int64); v != compare {
			return v, true
		}
	}
}

// Unwrap unwraps the heap into the provided scratch space, reusing its
// capacity. The result is a slice holding the distinct values of the heap in
// ascending order. The heap itself is drained in the process, so the returned
// slice is what allows iterating over the same values more than once.
func (h *Int64Heap) Unwrap(scratch []int64) []int64 {
	out := scratch[:0]
	for h.Len() > 0 {
		next := heap.Pop(h).(int64)
		// Values come out in ascending order, so duplicates are adjacent and
		// comparing against the last kept value is enough.
		if n := len(out); n == 0 || out[n-1] != next {
			out = append(out, next)
		}
	}
	return out
}
Loading
Loading