diff --git a/go.mod b/go.mod index 7c9b099418f..6e266700341 100644 --- a/go.mod +++ b/go.mod @@ -34,10 +34,8 @@ require ( github.com/nanmu42/limitio v1.0.0 github.com/oklog/run v1.2.0 github.com/olekukonko/tablewriter v0.0.5 - github.com/parquet-go/parquet-go v0.24.0 github.com/planetscale/vtprotobuf v0.6.1-0.20250313105119-ba97887b0a25 github.com/polarsignals/frostdb v0.0.0-20260121113628-9e5cfe0171ad - github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b github.com/prometheus/client_golang v1.23.2 github.com/prometheus/common v0.67.5 github.com/prometheus/prometheus v0.305.0 @@ -218,11 +216,13 @@ require ( github.com/opencontainers/image-spec v1.1.1 // indirect github.com/oracle/oci-go-sdk/v65 v65.41.1 // indirect github.com/ovh/go-ovh v1.8.0 // indirect + github.com/parquet-go/parquet-go v0.24.0 // indirect github.com/paulmach/orb v0.12.0 // indirect github.com/pierrec/lz4/v4 v4.1.25 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/polarsignals/iceberg-go v0.0.0-20240502213135-2ee70b71e76b // indirect github.com/polarsignals/wal v0.0.0-20240619104840-9da940027f9c // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/procfs v0.16.1 // indirect diff --git a/pkg/query/columnquery.go b/pkg/query/columnquery.go index 5f70ff4a628..9bcea8a7da0 100644 --- a/pkg/query/columnquery.go +++ b/pkg/query/columnquery.go @@ -34,7 +34,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "github.com/polarsignals/frostdb/pqarrow/arrowutils" + "github.com/parca-dev/parca/pkg/query/internal/arrowutils" metastorev1alpha1 "github.com/parca-dev/parca/gen/proto/go/parca/metastore/v1alpha1" pb "github.com/parca-dev/parca/gen/proto/go/parca/query/v1alpha1" diff --git a/pkg/query/flamegraph_arrow.go b/pkg/query/flamegraph_arrow.go index 
d344fa209b9..3c46b6de261 100644 --- a/pkg/query/flamegraph_arrow.go +++ b/pkg/query/flamegraph_arrow.go @@ -30,7 +30,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/math" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/olekukonko/tablewriter" - "github.com/polarsignals/frostdb/pqarrow/builder" + "github.com/parca-dev/parca/pkg/query/internal/builder" "github.com/zeebo/xxh3" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" diff --git a/pkg/query/internal/arrowutils/doc.go b/pkg/query/internal/arrowutils/doc.go new file mode 100644 index 00000000000..8d1c944779c --- /dev/null +++ b/pkg/query/internal/arrowutils/doc.go @@ -0,0 +1,9 @@ +// Package arrowutils vendors the Arrow record sort/take/merge utilities +// from github.com/polarsignals/frostdb/pqarrow/arrowutils. These power +// the sort and merge step in the columnar query path. +// +// The merge_test.go and schema_test.go suites from upstream have been +// omitted because they depend on github.com/polarsignals/frostdb/internal/records, +// which is not importable from outside the frostdb module. sort_test.go +// is preserved. +package arrowutils diff --git a/pkg/query/internal/arrowutils/groupranges.go b/pkg/query/internal/arrowutils/groupranges.go new file mode 100644 index 00000000000..ccf9a359099 --- /dev/null +++ b/pkg/query/internal/arrowutils/groupranges.go @@ -0,0 +1,304 @@ +package arrowutils + +import ( + "bytes" + "container/heap" + "fmt" + "strings" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" +) + +// GetGroupsAndOrderedSetRanges returns a min-heap of group ranges and ordered +// set ranges of the given arrow arrays in that order. For the given input with +// a single array: +// a a c d a b c +// This function will return [2, 3, 4, 5, 6] for the group ranges and [4] for +// the ordered set ranges. 
A group is a collection of values that are equal and +// an ordered set is a collection of groups that are in increasing order. +// The ranges are determined by iterating over the arrays and comparing the +// current group value for each column. The firstGroup to compare against must +// be provided (it can be initialized to the values at index 0 of each array). +// The last group found is returned. +func GetGroupsAndOrderedSetRanges( + firstGroup []any, arrs []arrow.Array, +) (*Int64Heap, *Int64Heap, []any, error) { + if len(firstGroup) != len(arrs) { + return nil, + nil, + nil, + fmt.Errorf( + "columns mismatch (%d != %d) when getting group ranges", + len(firstGroup), + len(arrs), + ) + } + + // Safe copy the group in order to not overwrite the input slice values. + curGroup := make([]any, len(firstGroup)) + for i, v := range firstGroup { + switch concreteV := v.(type) { + case []byte: + curGroup[i] = append([]byte(nil), concreteV...) + default: + curGroup[i] = v + } + } + + // groupRanges keeps track of the bounds of the group by columns. + groupRanges := &Int64Heap{} + heap.Init(groupRanges) + // setRanges keeps track of the bounds of ordered sets. i.e. in the + // following slice, (a, a, b, c) is an ordered set of three groups. The + // second ordered set is (a, e): [a, a, b, c, a, e] + setRanges := &Int64Heap{} + heap.Init(setRanges) + + // handleCmpResult is a closure that encapsulates the handling of the result + // of comparing a current grouping column with a value in a group array. + handleCmpResult := func(cmp, column int, t arrow.Array, j int) error { + switch cmp { + case -1, 1: + // New group, append range index. + heap.Push(groupRanges, int64(j)) + if cmp == 1 { + // New ordered set encountered. + heap.Push(setRanges, int64(j)) + } + + // And update the current group. + v := t.GetOneForMarshal(j) + switch concreteV := v.(type) { + case []byte: + // Safe copy, otherwise the value might get overwritten. 
+ curGroup[column] = append([]byte(nil), concreteV...) + default: + curGroup[column] = v + } + case 0: + // Equal to group, do nothing. + } + return nil + } + for i, arr := range arrs { + switch t := arr.(type) { + case *array.Binary: + for j := 0; j < arr.Len(); j++ { + var curGroupValue []byte + if curGroup[i] != nil { + curGroupValue = curGroup[i].([]byte) + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = bytes.Compare(curGroupValue, t.Value(j)) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + case *array.String: + for j := 0; j < arr.Len(); j++ { + var curGroupValue *string + if curGroup[i] != nil { + g := curGroup[i].(string) + curGroupValue = &g + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = strings.Compare(*curGroupValue, t.Value(j)) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + case *array.Int64: + for j := 0; j < arr.Len(); j++ { + var curGroupValue *int64 + if curGroup[i] != nil { + g := curGroup[i].(int64) + curGroupValue = &g + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = compareInt64(*curGroupValue, t.Value(j)) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + case *array.Boolean: + for j := 0; j < arr.Len(); j++ { + var curGroupValue *bool + if curGroup[i] != nil { + g := curGroup[i].(bool) + curGroupValue = &g + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = compareBools(*curGroupValue, t.Value(j)) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + case VirtualNullArray: + for j := 0; j < arr.Len(); j++ { + cmp, ok := nullComparison(curGroup[i] == nil, true) + if !ok { + return nil, nil, nil, fmt.Errorf( + "null comparison 
should always be valid but group was: %v", curGroup[i], + ) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + case *array.Dictionary: + switch dict := t.Dictionary().(type) { + case *array.Binary: + for j := 0; j < arr.Len(); j++ { + var curGroupValue []byte + if curGroup[i] != nil { + curGroupValue = curGroup[i].([]byte) + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = bytes.Compare(curGroupValue, dict.Value(t.GetValueIndex(j))) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + + case *array.String: + for j := 0; j < arr.Len(); j++ { + var curGroupValue *string + if curGroup[i] != nil { + g := curGroup[i].(string) + curGroupValue = &g + } + vIsNull := t.IsNull(j) + cmp, ok := nullComparison(curGroupValue == nil, vIsNull) + if !ok { + cmp = strings.Compare(*curGroupValue, + dict.Value(t.GetValueIndex(j)), + ) + } + if err := handleCmpResult(cmp, i, t, j); err != nil { + return nil, nil, nil, err + } + } + + default: + panic(fmt.Sprintf("unsupported dictionary type: %T", dict)) + } + default: + panic(fmt.Sprintf("unsupported type: %T", t)) + } + } + return groupRanges, setRanges, curGroup, nil +} + +// nullComparison encapsulates null comparison. leftNull is whether the current +// Note that this function observes default SQL semantics as well as our own, +// i.e. nulls sort first. +// The comparison integer is returned, as well as whether either value was null. +// If the returned boolean is false, the comparison should be disregarded. +func nullComparison(leftNull, rightNull bool) (int, bool) { + if !leftNull && !rightNull { + // Both are not null, this implies that the null comparison should be + // disregarded. 
+ return 0, false + } + + if leftNull { + if !rightNull { + return -1, true + } + return 0, true + } + return 1, true +} + +func compareInt64(a, b int64) int { + if a < b { + return -1 + } + if a > b { + return 1 + } + return 0 +} + +func compareBools(a, b bool) int { + if a == b { + return 0 + } + + if !a { + return -1 + } + return 1 +} + +type Int64Heap []int64 + +func (h Int64Heap) Len() int { + return len(h) +} + +func (h Int64Heap) Less(i, j int) bool { + return h[i] < h[j] +} + +func (h Int64Heap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] +} + +func (h *Int64Heap) Push(x any) { + *h = append(*h, x.(int64)) +} + +func (h *Int64Heap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} + +// PopNextNotEqual returns the next least element not equal to compare. +func (h *Int64Heap) PopNextNotEqual(compare int64) (int64, bool) { + for h.Len() > 0 { + v := heap.Pop(h).(int64) + if v != compare { + return v, true + } + } + return 0, false +} + +// Unwrap unwraps the heap into the provided scratch space. The result is a +// slice that will have distinct ints in order. This helps with reiterating over +// the same heap. 
+func (h *Int64Heap) Unwrap(scratch []int64) []int64 { + scratch = scratch[:0] + if h.Len() == 0 { + return scratch + } + cmp := (*h)[0] + scratch = append(scratch, cmp) + for h.Len() > 0 { + if v := heap.Pop(h).(int64); v != cmp { + scratch = append(scratch, v) + cmp = v + } + } + return scratch +} diff --git a/pkg/query/internal/arrowutils/merge.go b/pkg/query/internal/arrowutils/merge.go new file mode 100644 index 00000000000..3a7ba99ed50 --- /dev/null +++ b/pkg/query/internal/arrowutils/merge.go @@ -0,0 +1,189 @@ +package arrowutils + +import ( + "bytes" + "container/heap" + "fmt" + "math" + "strings" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/parca-dev/parca/pkg/query/internal/builder" +) + +// MergeRecords merges the given records. The records must all have the same +// schema. orderByCols is a slice of indexes into the columns that the records +// and resulting records are ordered by. While ordering the limit is checked before appending more rows. +// If limit is 0, no limit is applied. +// Note that the given records should already be ordered by the given columns. +// WARNING: Only ascending ordering is currently supported. +func MergeRecords( + mem memory.Allocator, + records []arrow.Record, + orderByCols []SortingColumn, + limit uint64, +) (arrow.Record, error) { + h := cursorHeap{ + cursors: make([]cursor, len(records)), + orderByCols: orderByCols, + } + for i := range h.cursors { + h.cursors[i].r = records[i] + } + + schema := records[0].Schema() + recordBuilder := builder.NewRecordBuilder(mem, schema) + defer recordBuilder.Release() + + if limit == 0 { + limit = math.MaxInt64 + } + count := uint64(0) + + heap.Init(&h) + for h.Len() > 0 && count < limit { + // Minimum cursor is always at index 0. 
+ r := h.cursors[0].r + i := h.cursors[0].curIdx + for colIdx, b := range recordBuilder.Fields() { + if err := builder.AppendValue(b, r.Column(colIdx), i); err != nil { + return nil, err + } + } + if int64(i+1) >= r.NumRows() { + // Pop the cursor since it has no more data. + _ = heap.Pop(&h) + count++ + continue + } + h.cursors[0].curIdx++ + heap.Fix(&h, 0) + count++ + } + + return recordBuilder.NewRecord(), nil +} + +type cursor struct { + r arrow.Record + curIdx int +} + +type cursorHeap struct { + cursors []cursor + orderByCols []SortingColumn +} + +func (h cursorHeap) Len() int { + return len(h.cursors) +} + +func (h cursorHeap) Less(i, j int) bool { + for idx := range h.orderByCols { + c1 := h.cursors[i] + c2 := h.cursors[j] + sc := h.orderByCols[idx] + col1 := c1.r.Column(sc.Index) + col2 := c2.r.Column(sc.Index) + if cmp, ok := nullComparison(col1.IsNull(c1.curIdx), col2.IsNull(c2.curIdx)); ok { + if h.orderByCols[idx].NullsFirst { + return cmp == -1 + } + if !h.orderByCols[idx].NullsFirst { + return cmp == 1 + } + } + + cmp := h.compare(idx, i, j) + if cmp != 0 { + // Use direction to reorder the comparison. Direction determines if the list + // is in ascending or descending. + // + // For instance if comparison between i,j value is -1 and direction is -1 + // this will resolve to true hence the list will be in ascending order. Same + // principle applies for descending. 
+ return cmp == h.orderByCols[idx].Direction.comparison() + } + // Try comparing the next column + } + + return false +} + +func (h cursorHeap) compare(idx, i, j int) int { + c1 := h.cursors[i] + c2 := h.cursors[j] + sc := h.orderByCols[idx] + switch arr1 := c1.r.Column(sc.Index).(type) { + case *array.Binary: + arr2 := c2.r.Column(sc.Index).(*array.Binary) + return bytes.Compare(arr1.Value(c1.curIdx), arr2.Value(c2.curIdx)) + case *array.String: + arr2 := c2.r.Column(sc.Index).(*array.String) + return strings.Compare(arr1.Value(c1.curIdx), arr2.Value(c2.curIdx)) + case *array.Int64: + arr2 := c2.r.Column(sc.Index).(*array.Int64) + v1 := arr1.Value(c1.curIdx) + v2 := arr2.Value(c2.curIdx) + if v1 == v2 { + return 0 + } + if v1 < v2 { + return -1 + } + return 1 + case *array.Int32: + arr2 := c2.r.Column(sc.Index).(*array.Int32) + v1 := arr1.Value(c1.curIdx) + v2 := arr2.Value(c2.curIdx) + if v1 == v2 { + return 0 + } + if v1 < v2 { + return -1 + } + return 1 + case *array.Uint64: + arr2 := c2.r.Column(sc.Index).(*array.Uint64) + v1 := arr1.Value(c1.curIdx) + v2 := arr2.Value(c2.curIdx) + if v1 == v2 { + return 0 + } + if v1 < v2 { + return -1 + } + return 1 + case *array.Dictionary: + switch dict := arr1.Dictionary().(type) { + case *array.Binary: + arr2 := c2.r.Column(sc.Index).(*array.Dictionary) + dict2 := arr2.Dictionary().(*array.Binary) + return bytes.Compare(dict.Value(arr1.GetValueIndex(c1.curIdx)), dict2.Value(arr2.GetValueIndex(c2.curIdx))) + default: + panic(fmt.Sprintf("unsupported dictionary type for record merging %T", dict)) + } + default: + panic(fmt.Sprintf("unsupported type for record merging %T", arr1)) + } +} + +func (h cursorHeap) Swap(i, j int) { + h.cursors[i], h.cursors[j] = h.cursors[j], h.cursors[i] +} + +func (h cursorHeap) Push(_ any) { + panic( + "number of cursors are known at Init time, none should ever be pushed", + ) +} + +func (h *cursorHeap) Pop() any { + n := len(h.cursors) - 1 + c := h.cursors[n] + h.cursors = h.cursors[:n] + 
return c +} diff --git a/pkg/query/internal/arrowutils/nullarray.go b/pkg/query/internal/arrowutils/nullarray.go new file mode 100644 index 00000000000..4b3c6d6fc89 --- /dev/null +++ b/pkg/query/internal/arrowutils/nullarray.go @@ -0,0 +1,81 @@ +package arrowutils + +import ( + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/memory" + + "github.com/parca-dev/parca/pkg/query/internal/builder" +) + +// VirtualNullArray is an arrow.Array that will return that any element is null +// via the arrow.Array interface methods. This is useful if callers need to +// represent an array of len NULL values without allocating/storing a bitmap. +// This should only be used internally. If callers need a physical null array, +// call MakeNullArray. +type VirtualNullArray struct { + dt arrow.DataType + len int +} + +func MakeVirtualNullArray(dt arrow.DataType, length int) VirtualNullArray { + return VirtualNullArray{ + dt: dt, + len: length, + } +} + +// MakeNullArray makes a physical arrow.Array full of NULLs of the given +// DataType. +func MakeNullArray(mem memory.Allocator, dt arrow.DataType, length int) arrow.Array { + // TODO(asubiotto): This can be improved by using the optimized builders' + // AppendNulls. Not sure whether this should be part of the builder package. 
+ b := builder.NewBuilder(mem, dt) + defer b.Release() + b.Reserve(length) + for i := 0; i < length; i++ { + b.AppendNull() + } + return b.NewArray() +} + +func (n VirtualNullArray) MarshalJSON() ([]byte, error) { + panic("VirtualNullArray: MarshalJSON not implemented") +} + +func (n VirtualNullArray) DataType() arrow.DataType { + return n.dt +} + +func (n VirtualNullArray) NullN() int { + return n.len +} + +func (n VirtualNullArray) NullBitmapBytes() []byte { + panic("VirtualNullArray: NullBitmapBytes not implemented") +} + +func (n VirtualNullArray) IsNull(_ int) bool { + return true +} + +func (n VirtualNullArray) IsValid(_ int) bool { + return false +} + +func (n VirtualNullArray) Data() arrow.ArrayData { + panic("VirtualNullArray: Data not implemented") +} + +func (n VirtualNullArray) Len() int { + return n.len +} + +func (n VirtualNullArray) Retain() {} + +func (n VirtualNullArray) Release() {} + +func (n VirtualNullArray) String() string { return "VirtualNullArray" } + +func (n VirtualNullArray) ValueStr(_ int) string { return "" } + +func (n VirtualNullArray) GetOneForMarshal(_ int) any { return nil } diff --git a/pkg/query/internal/arrowutils/schema.go b/pkg/query/internal/arrowutils/schema.go new file mode 100644 index 00000000000..d5b7785adb5 --- /dev/null +++ b/pkg/query/internal/arrowutils/schema.go @@ -0,0 +1,95 @@ +package arrowutils + +import ( + "fmt" + "sort" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" +) + +// EnsureSameSchema ensures that all the records have the same schema. In cases +// where the schema is not equal, virtual null columns are inserted in the +// records with the missing column. When we have static schemas in the execution +// engine, steps like these should be unnecessary. 
+func EnsureSameSchema(records []arrow.Record) ([]arrow.Record, error) { + if len(records) < 2 { + return records, nil + } + + lastSchema := records[0].Schema() + needSchemaRecalculation := false + for i := range records { + if !records[i].Schema().Equal(lastSchema) { + needSchemaRecalculation = true + break + } + } + if !needSchemaRecalculation { + return records, nil + } + + columns := make(map[string]arrow.Field) + for _, r := range records { + for j := 0; j < r.Schema().NumFields(); j++ { + field := r.Schema().Field(j) + if _, ok := columns[field.Name]; !ok { + columns[field.Name] = field + } + } + } + + columnNames := make([]string, 0, len(columns)) + for name := range columns { + columnNames = append(columnNames, name) + } + sort.Strings(columnNames) + + mergedFields := make([]arrow.Field, 0, len(columnNames)) + for _, name := range columnNames { + mergedFields = append(mergedFields, columns[name]) + } + mergedSchema := arrow.NewSchema(mergedFields, nil) + + mergedRecords := make([]arrow.Record, len(records)) + var replacedRecords []arrow.Record + + for i := range records { + recordSchema := records[i].Schema() + if mergedSchema.Equal(recordSchema) { + mergedRecords[i] = records[i] + continue + } + + mergedColumns := make([]arrow.Array, 0, len(mergedFields)) + recordNumRows := records[i].NumRows() + for j := 0; j < mergedSchema.NumFields(); j++ { + field := mergedSchema.Field(j) + if otherFields := recordSchema.FieldIndices(field.Name); otherFields != nil { + if len(otherFields) > 1 { + fieldsFound, _ := recordSchema.FieldsByName(field.Name) + return nil, fmt.Errorf( + "found multiple fields %v for name %s", + fieldsFound, + field.Name, + ) + } + mergedColumns = append(mergedColumns, records[i].Column(otherFields[0])) + } else { + // Note that this VirtualNullArray will be read from, but the + // merged output will be a physical null array, so there is no + // virtual->physical conversion necessary before we return data. 
+ mergedColumns = append(mergedColumns, MakeVirtualNullArray(field.Type, int(recordNumRows))) + } + } + + replacedRecords = append(replacedRecords, records[i]) + mergedRecords[i] = array.NewRecord(mergedSchema, mergedColumns, recordNumRows) + } + + for _, r := range replacedRecords { + r.Release() + } + + return mergedRecords, nil +} diff --git a/pkg/query/internal/arrowutils/sort.go b/pkg/query/internal/arrowutils/sort.go new file mode 100644 index 00000000000..2c4a0bd382a --- /dev/null +++ b/pkg/query/internal/arrowutils/sort.go @@ -0,0 +1,656 @@ +package arrowutils + +import ( + "bytes" + "cmp" + "context" + "errors" + "fmt" + "slices" + "sort" + "strconv" + "sync" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute" + "github.com/apache/arrow-go/v18/arrow/memory" + "golang.org/x/sync/errgroup" + + "github.com/parca-dev/parca/pkg/query/internal/builder" +) + +type Direction uint + +const ( + Ascending Direction = iota + Descending +) + +func (d Direction) comparison() int { + switch d { + case Ascending: + return -1 + case Descending: + return 1 + default: + panic("unexpected direction value " + strconv.Itoa(int(d)) + " only -1 and 1 are allowed") + } +} + +// SortingColumn describes a sorting column on a arrow.Record. +type SortingColumn struct { + Index int + Direction Direction + NullsFirst bool +} + +// SortRecord sorts given arrow.Record by columns. Returns *array.Int32 of +// indices to sorted rows or record r. +// +// Comparison is made sequentially by each column. When rows are equal in the +// first column we compare the rows om the second column and so on and so forth +// until rows that are not equal are found. 
+func SortRecord(r arrow.Record, columns []SortingColumn) (*array.Int32, error) { + if len(columns) == 0 { + return nil, errors.New("pqarrow/arrowutils: at least one column is needed for sorting") + } + ms, err := newMultiColSorter(r, columns) + if err != nil { + return nil, err + } + defer ms.Release() + sort.Sort(ms) + return ms.indices.NewArray().(*array.Int32), nil +} + +// Take uses indices which is an array of row index and returns a new record +// that only contains rows specified in indices. +// +// Use compute.WithAllocator to pass a custom memory.Allocator. +func Take(ctx context.Context, r arrow.Record, indices *array.Int32) (arrow.Record, error) { + // compute.Take doesn't support dictionaries or lists. Use take on r when r + // does not have these columns. + var customTake bool + for i := 0; i < int(r.NumCols()); i++ { + if r.Column(i).DataType().ID() == arrow.DICTIONARY || + r.Column(i).DataType().ID() == arrow.RUN_END_ENCODED || + r.Column(i).DataType().ID() == arrow.LIST || + r.Column(i).DataType().ID() == arrow.STRUCT { + customTake = true + break + } + } + if !customTake { + res, err := compute.Take( + ctx, + compute.TakeOptions{BoundsCheck: true}, + compute.NewDatumWithoutOwning(r), + compute.NewDatumWithoutOwning(indices), + ) + if err != nil { + return nil, err + } + return res.(*compute.RecordDatum).Value, nil + } + if r.NumCols() == 0 { + return r, nil + } + + resArr := make([]arrow.Array, r.NumCols()) + defer func() { + for _, a := range resArr { + if a != nil { + a.Release() + } + } + }() + var g errgroup.Group + for i := 0; i < int(r.NumCols()); i++ { + i := i + col := r.Column(i) + switch arr := r.Column(i).(type) { + case *array.Dictionary: + g.Go(func() error { return TakeDictColumn(ctx, arr, i, resArr, indices) }) + case *array.RunEndEncoded: + g.Go(func() error { return TakeRunEndEncodedColumn(ctx, arr, i, resArr, indices) }) + case *array.List: + g.Go(func() error { return TakeListColumn(ctx, arr, i, resArr, indices) }) + case 
*array.Struct: + g.Go(func() error { return TakeStructColumn(ctx, arr, i, resArr, indices) }) + default: + g.Go(func() error { return TakeColumn(ctx, col, i, resArr, indices) }) + } + } + if err := g.Wait(); err != nil { + return nil, err + } + + // We checked for at least one column at the beginning of the function. + expectedLen := resArr[0].Len() + for _, a := range resArr { + if a.Len() != expectedLen { + return nil, fmt.Errorf( + "pqarrow/arrowutils: expected same length %d for all columns got %d for %s", expectedLen, a.Len(), a.DataType().Name(), + ) + } + } + return array.NewRecord(r.Schema(), resArr, int64(indices.Len())), nil +} + +func TakeColumn(ctx context.Context, a arrow.Array, idx int, arr []arrow.Array, indices *array.Int32) error { + r, err := compute.TakeArray(ctx, a, indices) + if err != nil { + return err + } + arr[idx] = r + return nil +} + +func TakeDictColumn(ctx context.Context, a *array.Dictionary, idx int, arr []arrow.Array, indices *array.Int32) error { + switch a.Dictionary().(type) { + case *array.String, *array.Binary: + r := array.NewDictionaryBuilderWithDict( + compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(), + ).(*array.BinaryDictionaryBuilder) + defer r.Release() + + r.Reserve(indices.Len()) + idxBuilder := r.IndexBuilder() + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + r.AppendNull() + continue + } + idxBuilder.Append(a.GetValueIndex(int(i))) + } + + arr[idx] = r.NewArray() + return nil + case *array.FixedSizeBinary: + r := array.NewDictionaryBuilderWithDict( + compute.GetAllocator(ctx), a.DataType().(*arrow.DictionaryType), a.Dictionary(), + ).(*array.FixedSizeBinaryDictionaryBuilder) + defer r.Release() + + r.Reserve(indices.Len()) + idxBuilder := r.IndexBuilder() + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + r.AppendNull() + continue + } + // TODO: Improve this by not copying actual values. 
+ idxBuilder.Append(a.GetValueIndex(int(i))) + } + + arr[idx] = r.NewArray() + return nil + } + + return nil +} + +func TakeRunEndEncodedColumn(ctx context.Context, a *array.RunEndEncoded, idx int, arr []arrow.Array, indices *array.Int32) error { + expandedIndexBuilder := array.NewInt32Builder(compute.GetAllocator(ctx)) + defer expandedIndexBuilder.Release() + + dict := a.Values().(*array.Dictionary) + for i := 0; i < a.Len(); i++ { + if dict.IsNull(a.GetPhysicalIndex(i)) { + expandedIndexBuilder.AppendNull() + } else { + expandedIndexBuilder.Append(int32(dict.GetValueIndex(a.GetPhysicalIndex(i)))) + } + } + expandedIndex := expandedIndexBuilder.NewInt32Array() + defer expandedIndex.Release() + + expandedReorderedArr := make([]arrow.Array, 1) + if err := TakeColumn(ctx, expandedIndex, 0, expandedReorderedArr, indices); err != nil { + return err + } + expandedReordered := expandedReorderedArr[0].(*array.Int32) + defer expandedReordered.Release() + + b := array.NewRunEndEncodedBuilder( + compute.GetAllocator(ctx), a.RunEndsArr().DataType(), a.Values().DataType(), + ) + defer b.Release() + b.Reserve(indices.Len()) + + dictValues := dict.Dictionary().(*array.String) + for i := 0; i < expandedReordered.Len(); i++ { + if expandedReordered.IsNull(i) { + b.AppendNull() + continue + } + reorderedIndex := expandedReordered.Value(i) + v := dictValues.Value(int(reorderedIndex)) + if err := b.AppendValueFromString(v); err != nil { + return err + } + } + + arr[idx] = b.NewRunEndEncodedArray() + return nil +} + +func TakeListColumn(ctx context.Context, a *array.List, idx int, arr []arrow.Array, indices *array.Int32) error { + mem := compute.GetAllocator(ctx) + r := array.NewBuilder(mem, a.DataType()).(*array.ListBuilder) + + switch valueBuilder := r.ValueBuilder().(type) { + case *array.BinaryDictionaryBuilder: + defer valueBuilder.Release() + + listValues := a.ListValues().(*array.Dictionary) + switch dictV := listValues.Dictionary().(type) { + case *array.String: + if err := 
valueBuilder.InsertStringDictValues(dictV); err != nil { + return err + } + case *array.Binary: + if err := valueBuilder.InsertDictValues(dictV); err != nil { + return err + } + } + idxBuilder := valueBuilder.IndexBuilder() + + r.Reserve(indices.Len()) + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + r.AppendNull() + continue + } + + r.Append(true) + start, end := a.ValueOffsets(int(i)) + for j := start; j < end; j++ { + idxBuilder.Append(listValues.GetValueIndex(int(j))) + } + // Resize is necessary here for the correct offsets to be appended to + // the list builder. Otherwise, length will remain at 0. + valueBuilder.Resize(idxBuilder.Len()) + } + + arr[idx] = r.NewArray() + return nil + case *array.StructBuilder: + defer valueBuilder.Release() + + structArray := a.ListValues().(*array.Struct) + + // expand the indices from the list to each row in the struct. + structIndicesBuilder := array.NewInt32Builder(mem) + structIndicesBuilder.Reserve(structArray.Len()) + defer structIndicesBuilder.Release() + + for _, i := range indices.Int32Values() { + start, end := a.ValueOffsets(int(i)) + for j := start; j < end; j++ { + structIndicesBuilder.Append(int32(j)) + } + } + structIndices := structIndicesBuilder.NewInt32Array() + defer structIndices.Release() + + arrays := []arrow.Array{structArray} + err := TakeStructColumn(ctx, structArray, 0, arrays, structIndices) + if err != nil { + return err + } + defer func() { + for _, a := range arrays { + a.Release() + } + }() + + newOffsetBuilder := array.NewInt32Builder(mem) + defer newOffsetBuilder.Release() + + // Build validity bitmap for the list array + nullBitmapBuilder := array.NewBooleanBuilder(mem) + defer nullBitmapBuilder.Release() + + newOffsetBuilder.Append(0) + newOffsetPrevious := int32(0) + nullCount := 0 + for _, i := range indices.Int32Values() { + if a.IsNull(int(i)) { + // If the list is null, repeat the previous offset and set the validity to false + 
// TakeStructColumn takes the rows at the given indices from the struct array
// a and stores the resulting struct array in arr[idx]. Each child field is
// routed through the encoding-appropriate Take* helper (run-end encoded,
// dictionary, list, or plain) so the child encodings are preserved.
func TakeStructColumn(ctx context.Context, a *array.Struct, idx int, arr []arrow.Array, indices *array.Int32) error {
	aType := a.Data().DataType().(*arrow.StructType)

	// Immediately, return this struct if it has no fields/columns
	if a.NumField() == 0 {
		// If the original record is released and this is released once more,
		// as usually done, we want to retain it once more.
		a.Retain()
		arr[idx] = a
		return nil
	}

	cols := make([]arrow.Array, a.NumField())
	names := make([]string, a.NumField())
	// Release every per-field array that was populated, whether we return
	// early on error or after the struct has been assembled.
	// NOTE(review): assumes array.NewStructArray retains its child arrays so
	// dropping our references here is safe — confirm against arrow-go docs.
	defer func() {
		for _, col := range cols {
			if col != nil {
				col.Release()
			}
		}
	}()

	// Take each child field independently, dispatching on its concrete
	// array type; results land in cols[i] alongside the field name.
	for i := 0; i < a.NumField(); i++ {
		names[i] = aType.Field(i).Name

		switch f := a.Field(i).(type) {
		case *array.RunEndEncoded:
			if err := TakeRunEndEncodedColumn(ctx, f, i, cols, indices); err != nil {
				return err
			}
		case *array.Dictionary:
			if err := TakeDictColumn(ctx, f, i, cols, indices); err != nil {
				return err
			}
		case *array.List:
			if err := TakeListColumn(ctx, f, i, cols, indices); err != nil {
				return err
			}
		default:
			err := TakeColumn(ctx, f, i, cols, indices)
			if err != nil {
				return err
			}
		}
	}

	// Reassemble the taken children into a struct with the original field names.
	takeStruct, err := array.NewStructArray(cols, names)
	if err != nil {
		return err
	}

	arr[idx] = takeStruct
	return nil
}
// Reserve sizes the sorter for rows and columns and initializes the index
// permutation to the identity [0, 1, ..., rows-1]; sorting then permutes
// these indices rather than the record itself.
func (m *multiColSorter) Reserve(rows, columns int) {
	m.indices.Reserve(rows)
	for i := 0; i < rows; i++ {
		m.indices.Set(i, int32(i))
	}
	// Grow-and-reslice keeps any previously allocated backing arrays.
	m.comparisons = slices.Grow(m.comparisons, columns)[:columns]
	m.directions = slices.Grow(m.directions, columns)[:columns]
	m.nullsFirst = slices.Grow(m.nullsFirst, columns)[:columns]
}

// Reset clears the per-sort state while retaining allocations so the
// instance can be pooled and reused.
// NOTE(review): Reserve(0) is used to reset the index builder's length —
// confirm this matches OptInt32Builder's reserve semantics.
func (m *multiColSorter) Reset() {
	m.indices.Reserve(0)
	m.comparisons = m.comparisons[:0]
	m.directions = m.directions[:0]
	m.nullsFirst = m.nullsFirst[:0]
}

// Release resets the sorter and returns it to the shared pool. Callers must
// not use the sorter after calling Release.
func (m *multiColSorter) Release() {
	m.Reset()
	multiColSorterPool.Put(m)
}

// multiColSorterPool recycles multiColSorter instances (and their index
// builders) across sorts to avoid per-sort allocations.
var multiColSorterPool = &sync.Pool{
	New: func() any {
		return &multiColSorter{
			indices: builder.NewOptInt32Builder(arrow.PrimitiveTypes.Int32),
		}
	},
}
// multiColSorter sorts the index permutation via the standard library's
// sort.Sort; assert it satisfies sort.Interface.
var _ sort.Interface = (*multiColSorter)(nil)

func (m *multiColSorter) Len() int { return m.indices.Len() }

// Less compares the rows referenced by permutation slots i and j, walking
// the sorting columns in order and breaking ties with the next column.
func (m *multiColSorter) Less(i, j int) bool {
	for idx := range m.comparisons {
		cmp := m.compare(idx, int(m.indices.Value(i)), int(m.indices.Value(j)))
		if cmp != 0 {
			// Use direction to reorder the comparison. Direction determines if the list
			// is in ascending or descending.
			//
			// For instance if comparison between i,j value is -1 and direction is -1
			// this will resolve to true hence the list will be in ascending order. Same
			// principle applies for descending.
			return cmp == m.directions[idx]
		}
		// Try comparing the next column
	}
	return false
}

// compare orders rows i and j by column idx: 0 when both are null or equal,
// otherwise -1/+1. Per the convention documented in Less, directions[idx] is
// -1 for ascending and +1 for descending. The null branches pick a sign such
// that, after Less applies the direction, null rows land first when
// nullsFirst is set and last otherwise, for either direction.
func (m *multiColSorter) compare(idx, i, j int) int {
	x := m.comparisons[idx]
	if x.IsNull(i) {
		if x.IsNull(j) {
			return 0
		}
		// Only row i is null; directions[idx] == 1 is the descending case.
		if m.directions[idx] == 1 {
			if m.nullsFirst[idx] {
				return 1
			}
			return -1
		}
		if m.nullsFirst[idx] {
			return -1
		}
		return 1
	}
	if x.IsNull(j) {
		// Only row j is null: mirror of the branch above.
		if m.directions[idx] == 1 {
			if m.nullsFirst[idx] {
				return -1
			}
			return 1
		}
		if m.nullsFirst[idx] {
			return 1
		}
		return -1
	}
	return x.Compare(i, j)
}

// Swap swaps two slots of the index permutation; the underlying record is
// never mutated.
func (m *multiColSorter) Swap(i, j int) {
	m.indices.Swap(i, j)
}

// comparator is the per-column comparison interface used by compare.
type comparator interface {
	Compare(i, j int) int
	IsNull(int) bool
}

// orderedArray abstracts any array-like container with typed element access
// and null checks.
type orderedArray[T any] interface {
	Value(int) T
	IsNull(int) bool
}

// orderedSorter adapts an orderedArray plus an element compare function into
// a comparator.
type orderedSorter[T any] struct {
	array   orderedArray[T]
	compare func(T, T) int
}

func newOrderedSorter[T any](a orderedArray[T], compare func(T, T) int) *orderedSorter[T] {
	return &orderedSorter[T]{
		array:   a,
		compare: compare,
	}
}

func (s *orderedSorter[T]) IsNull(i int) bool {
	return s.array.IsNull(i)
}

func (s *orderedSorter[T]) Compare(i, j int) int {
	return s.compare(s.array.Value(i), s.array.Value(j))
}

// stringDictionary adapts a string-valued dictionary array to orderedArray
// so it can be sorted by decoded value rather than by dictionary index.
type stringDictionary struct {
	dict *array.Dictionary
	elem *array.String
}

func (s *stringDictionary) IsNull(i int) bool {
	return s.dict.IsNull(i)
}
(s *stringDictionary) Value(i int) string { + return s.elem.Value(s.dict.GetValueIndex(i)) +} + +type binaryDictionary struct { + dict *array.Dictionary + elem *array.Binary +} + +func (s *binaryDictionary) IsNull(i int) bool { + return s.dict.IsNull(i) +} + +func (s *binaryDictionary) Value(i int) []byte { + return s.elem.Value(s.dict.GetValueIndex(i)) +} + +type fixedSizeBinaryDictionary struct { + dict *array.Dictionary + elem *array.FixedSizeBinary +} + +func (s *fixedSizeBinaryDictionary) IsNull(i int) bool { + return s.dict.IsNull(i) +} + +func (s *fixedSizeBinaryDictionary) Value(i int) []byte { + return s.elem.Value(s.dict.GetValueIndex(i)) +} diff --git a/pkg/query/internal/arrowutils/sort_test.go b/pkg/query/internal/arrowutils/sort_test.go new file mode 100644 index 00000000000..7b1ca74b714 --- /dev/null +++ b/pkg/query/internal/arrowutils/sort_test.go @@ -0,0 +1,857 @@ +package arrowutils + +import ( + "context" + "fmt" + "sort" + "strings" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/compute" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/require" +) + +func TestSortRecord(t *testing.T) { + null := func(v int64) *int64 { + return &v + } + + cases := []SortCase{ + { + Name: "must provide at least one column", + Samples: Samples{ + {}, + }, + Error: "expected missing column error", + }, + + { + Name: "No Nows", + Samples: Samples{}, + Columns: []SortingColumn{{Index: 0}}, + }, + { + Name: "One Row", + Samples: Samples{ + {}, + }, + Columns: []SortingColumn{ + { + Index: 0, + }, + }, + Indices: []int32{0}, + }, + { + Name: "By Integer column ascending", + Samples: Samples{ + {Int: 3}, + {Int: 2}, + {Int: 1}, + }, + Columns: []SortingColumn{ + {Index: 0}, + }, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Integer column descending", + Samples: Samples{ + {Int: 1}, + {Int: 2}, + {Int: 3}, + }, + + Columns: []SortingColumn{ + 
{Index: 0, Direction: Descending}, + }, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Double column ascending", + Samples: Samples{ + {Double: 3}, + {Double: 2}, + {Double: 1}, + }, + Columns: []SortingColumn{{Index: 1}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Double column descending", + Samples: Samples{ + {Double: 1}, + {Double: 2}, + {Double: 3}, + }, + Columns: []SortingColumn{{Index: 1, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By String column ascending", + Samples: Samples{ + {String: "3"}, + {String: "2"}, + {String: "1"}, + }, + Columns: []SortingColumn{{Index: 2}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By String column descending", + Samples: Samples{ + {String: "1"}, + {String: "2"}, + {String: "3"}, + }, + Columns: []SortingColumn{{Index: 2, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Timestamp column ascending", + Samples: Samples{ + {Timestamp: 3}, + {Timestamp: 2}, + {Timestamp: 1}, + }, + Columns: []SortingColumn{{Index: 6}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Timestamp column descending", + Samples: Samples{ + {Timestamp: 1}, + {Timestamp: 2}, + {Timestamp: 3}, + }, + Columns: []SortingColumn{{Index: 6, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Dict column ascending", + Samples: Samples{ + {Dict: "3"}, + {Dict: "2"}, + {Dict: "1"}, + }, + Columns: []SortingColumn{{Index: 3}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Dict column descending", + Samples: Samples{ + {Dict: "1"}, + {Dict: "2"}, + {Dict: "3"}, + }, + Columns: []SortingColumn{{Index: 3, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By DictFixed column ascending", + Samples: Samples{ + {DictFixed: [2]byte{0, 3}}, + {DictFixed: [2]byte{0, 2}}, + {DictFixed: [2]byte{0, 1}}, + }, + Columns: []SortingColumn{{Index: 4}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By DictFixed column descending", + Samples: Samples{ + 
{DictFixed: [2]byte{0, 1}}, + {DictFixed: [2]byte{0, 2}}, + {DictFixed: [2]byte{0, 3}}, + }, + Columns: []SortingColumn{{Index: 4, Direction: Descending}}, + Indices: []int32{2, 1, 0}, + }, + { + Name: "By Null column ascending", + Samples: Samples{ + {}, + {}, + {Nullable: null(1)}, + }, + Columns: []SortingColumn{{Index: 5}}, + Indices: []int32{2, 0, 1}, + }, + { + Name: "By Null column ascending nullsFirst", + Samples: Samples{ + {}, + {}, + {Nullable: null(1)}, + }, + Columns: []SortingColumn{{Index: 5, NullsFirst: true}}, + Indices: []int32{0, 1, 2}, + }, + { + Name: "By Null column descending", + Samples: Samples{ + {}, + {}, + {Nullable: null(1)}, + }, + Columns: []SortingColumn{{Index: 5, Direction: Descending}}, + Indices: []int32{2, 0, 1}, + }, + { + Name: "By Null column descending nullsFirst", + Samples: Samples{ + {}, + {}, + {Nullable: null(1)}, + }, + Columns: []SortingColumn{{Index: 5, Direction: Descending, NullsFirst: true}}, + Indices: []int32{0, 1, 2}, + }, + { + Name: "Multiple columns same direction", + Samples: Samples{ + {String: "1", Int: 3}, + {String: "2", Int: 2}, + {String: "3", Int: 2}, + {String: "4", Int: 1}, + }, + Columns: []SortingColumn{ + {Index: 0}, + {Index: 2}, + }, + Indices: []int32{3, 1, 2, 0}, + }, + { + Name: "Multiple columns different direction", + Samples: Samples{ + {String: "1", Int: 3}, + {String: "2", Int: 2}, + {String: "3", Int: 2}, + {String: "4", Int: 1}, + }, + Columns: []SortingColumn{ + {Index: 0, Direction: Ascending}, + {Index: 2, Direction: Descending}, + }, + Indices: []int32{3, 2, 1, 0}, + }, + } + + for _, kase := range cases { + t.Run(kase.Name, func(t *testing.T) { + sortAndCompare(t, kase) + }) + } +} + +func TestSortRecordBuilderReuse(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + + schema := arrow.NewSchema([]arrow.Field{{Name: "int64", Type: arrow.PrimitiveTypes.Int64}}, nil) + + b1 := array.NewInt64Builder(mem) + b1.AppendValues([]int64{3, 2, 1}, nil) + arr1 := 
b1.NewArray() + r1 := array.NewRecord(schema, []arrow.Array{arr1}, 3) + + ms, err := newMultiColSorter(r1, []SortingColumn{{Index: 0}}) + require.Nil(t, err) + sort.Sort(ms) + sortedArr1 := ms.indices.NewArray().(*array.Int32) + require.Equal(t, []int32{2, 1, 0}, sortedArr1.Int32Values()) + ms.Release() // usually defer + + b2 := array.NewInt64Builder(mem) + b2.AppendValues([]int64{2, 1}, nil) + arr2 := b2.NewArray() + r2 := array.NewRecord(schema, []arrow.Array{arr2}, 2) + + ms, err = newMultiColSorter(r2, []SortingColumn{{Index: 0}}) + require.Nil(t, err) + sort.Sort(ms) + sortedArr2 := ms.indices.NewArray().(*array.Int32) + require.Equal(t, []int32{1, 0}, sortedArr2.Int32Values()) + ms.Release() // usually defer + + // This failed before the fix because the builder's data was reused. + require.Equal(t, []int32{2, 1, 0}, sortedArr1.Int32Values()) + require.Equal(t, []int32{1, 0}, sortedArr2.Int32Values()) +} + +func TestReorderRecord(t *testing.T) { + readRunEndEncodedDictionary := func(arr *array.RunEndEncoded) string { + arrDict := arr.Values().(*array.Dictionary) + arrDictValues := arrDict.Dictionary().(*array.String) + + values := make([]string, arr.Len()) + for i := 0; i < arr.Len(); i++ { + physicalIndex := arr.GetPhysicalIndex(i) + if arrDict.IsNull(physicalIndex) { + values[i] = array.NullValueStr + continue + } + valueIndex := arrDict.GetValueIndex(physicalIndex) + values[i] = arrDictValues.Value(valueIndex) + } + return "[" + strings.Join(values, " ") + "]" + } + + t.Run("Simple", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "int", + Type: arrow.PrimitiveTypes.Int64, + }, + }, nil, + )) + defer b.Release() + b.Field(0).(*array.Int64Builder).AppendValues([]int64{3, 2, 1}, nil) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 0}, nil) + by 
:= indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + + want := []int64{1, 2, 3} + require.Equal(t, want, result.Column(0).(*array.Int64).Int64Values()) + }) + t.Run("WithStringDict", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "dict", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.String, + }, + }, + }, nil, + )) + defer b.Release() + d := b.Field(0).(*array.BinaryDictionaryBuilder) + require.NoError(t, d.AppendString("3")) + require.NoError(t, d.AppendString("2")) + require.NoError(t, d.AppendString("1")) + d.AppendNull() + require.NoError(t, d.AppendString("3")) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.NoError(t, err) + defer result.Release() + + want := []string{"1", "2", "3", "3", ""} + got := result.Column(0).(*array.Dictionary) + require.Equal(t, len(want), got.Len()) + for i, v := range want { + if v == "" { + require.True(t, got.IsNull(i)) + continue + } + require.Equal(t, want[i], got.ValueStr(i)) + } + }) + t.Run("RunEndEncoded", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "ree", + Type: arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + }), + }, + }, nil, + )) + defer b.Release() + + ree := b.Field(0).(*array.RunEndEncodedBuilder) + 
require.NoError(t, ree.AppendValueFromString("3")) + require.NoError(t, ree.AppendValueFromString("2")) + require.NoError(t, ree.AppendValueFromString("1")) + ree.AppendNull() + require.NoError(t, ree.AppendValueFromString("3")) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + + // Reordering + + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.NoError(t, err) + defer result.Release() + + // Testing + + sorted := result.Column(0).(*array.RunEndEncoded) + sortedEnds := sorted.RunEndsArr().(*array.Int32) + // notice how the index to 3 is runEndEncoded + require.Equal(t, "[1 2 4 5]", sortedEnds.String()) + require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(sorted)) + }) + t.Run("WithFixedSizeBinaryDict", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "dict", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: &arrow.FixedSizeBinaryType{ByteWidth: 2}, + }, + }, + }, nil, + )) + defer b.Release() + d := b.Field(0).(*array.FixedSizeBinaryDictionaryBuilder) + require.NoError(t, d.Append([]byte{0, 3})) + require.NoError(t, d.Append([]byte{0, 2})) + require.NoError(t, d.Append([]byte{0, 1})) + d.AppendNull() + require.NoError(t, d.Append([]byte{0, 3})) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.NoError(t, err) + defer result.Release() + + want := [][]byte{{0, 1}, {0, 2}, {0, 3}, {0, 3}, {}} + got := result.Column(0).(*array.Dictionary) + require.Equal(t, len(want), got.Len()) + 
for i, v := range want { + if len(v) == 0 { + require.True(t, got.IsNull(i)) + continue + } + require.Equal(t, want[i], got.Dictionary().(*array.FixedSizeBinary).Value(got.GetValueIndex(i))) + } + }) + t.Run("List", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "list", + Type: arrow.ListOf(&arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.String}), + }, + }, nil, + )) + defer b.Release() + lb := b.Field(0).(*array.ListBuilder) + vb := lb.ValueBuilder().(*array.BinaryDictionaryBuilder) + lb.Append(true) + require.NoError(t, vb.AppendString("1")) + require.NoError(t, vb.AppendString("2")) + require.NoError(t, vb.AppendString("3")) + require.NoError(t, vb.AppendString("1")) + lb.Append(false) + lb.Append(true) + require.NoError(t, vb.AppendString("4")) + require.NoError(t, vb.AppendString("5")) + require.NoError(t, vb.AppendString("6")) + lb.Append(true) + require.NoError(t, vb.AppendString("3")) + require.NoError(t, vb.AppendString("3")) + require.NoError(t, vb.AppendString("3")) + require.NoError(t, vb.AppendString("4")) + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + result, err := Take( + compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + + got := result.Column(0).(*array.List) + expected := []string{ + "[\"4\",\"5\",\"6\"]", + "", + "[\"1\",\"2\",\"3\",\"1\"]", + "[\"3\",\"3\",\"3\",\"4\"]", + } + require.Equal(t, len(expected), got.Len()) + for i, v := range expected { + if len(v) == 0 { + require.True(t, got.IsNull(i), "expected null at %d", i) + continue + } + require.Equal(t, expected[i], got.ValueStr(i), "unexpected value at %d", i) + } + }) + t.Run("Struct", func(t *testing.T) { + 
LabelArrowType := arrow.RunEndEncodedOf( + arrow.PrimitiveTypes.Int32, + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Uint32, + ValueType: arrow.BinaryTypes.String, + }, + ) + + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "struct", + Type: arrow.StructOf( + arrow.Field{Name: "first", Type: LabelArrowType, Nullable: true}, + arrow.Field{Name: "second", Type: LabelArrowType, Nullable: true}, + arrow.Field{Name: "third", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + ), + }, + }, &arrow.Metadata{}, + )) + defer b.Release() + + sb := b.Field(0).(*array.StructBuilder) + firstFieldBuilder := sb.FieldBuilder(0).(*array.RunEndEncodedBuilder) + secondFieldBuilder := sb.FieldBuilder(1).(*array.RunEndEncodedBuilder) + thirdFieldBuilder := sb.FieldBuilder(2).(*array.Int64Builder) + + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("3")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("1")) + thirdFieldBuilder.Append(1) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("2")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("2")) + thirdFieldBuilder.Append(2) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("1")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("3")) + thirdFieldBuilder.Append(3) + sb.Append(true) + firstFieldBuilder.AppendNull() + require.NoError(t, secondFieldBuilder.AppendValueFromString("4")) + thirdFieldBuilder.Append(4) + sb.Append(true) + require.NoError(t, firstFieldBuilder.AppendValueFromString("3")) + require.NoError(t, secondFieldBuilder.AppendValueFromString("5")) + thirdFieldBuilder.Append(5) + + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + 
result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + resultStruct := result.Column(0).(*array.Struct) + + require.Equal(t, "[1 2 3 3 (null)]", readRunEndEncodedDictionary(resultStruct.Field(0).(*array.RunEndEncoded))) + require.Equal(t, "[3 2 5 1 4]", readRunEndEncodedDictionary(resultStruct.Field(1).(*array.RunEndEncoded))) + require.Equal(t, "[3 2 5 1 4]", resultStruct.Field(2).(*array.Int64).String()) + }) + t.Run("ListStruct", func(t *testing.T) { + mem := memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema([]arrow.Field{ + {Name: "list", Type: arrow.ListOf(arrow.StructOf([]arrow.Field{ + {Name: "int64", Type: arrow.PrimitiveTypes.Int64, Nullable: true}, + {Name: "uint64", Type: arrow.PrimitiveTypes.Uint64, Nullable: true}, + }...))}, + }, nil)) + defer b.Release() + + lb := b.Field(0).(*array.ListBuilder) + sb := lb.ValueBuilder().(*array.StructBuilder) + int64b := sb.FieldBuilder(0).(*array.Int64Builder) + uint64b := sb.FieldBuilder(1).(*array.Uint64Builder) + + lb.Append(true) + sb.Append(true) + int64b.Append(1) + uint64b.Append(2) + sb.Append(true) + int64b.Append(3) + uint64b.Append(4) + + lb.Append(true) + sb.Append(true) + int64b.Append(5) + uint64b.Append(6) + + lb.Append(true) + sb.Append(true) + int64b.Append(7) + uint64b.Append(8) + sb.Append(true) + int64b.Append(9) + uint64b.Append(10) + + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 0}, nil) + defer indices.Release() + by := indices.NewInt32Array() + defer by.Release() + + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + + require.Equal(t, `[{[7 9] [8 10]} {[5] [6]} {[1 3] [2 4]}]`, result.Column(0).String()) + }) + t.Run("StructEmpty", func(t *testing.T) { + mem := 
memory.NewCheckedAllocator(memory.NewGoAllocator()) + defer mem.AssertSize(t, 0) + + b := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "struct", + Type: arrow.StructOf(), + }, + }, &arrow.Metadata{}, + )) + defer b.Release() + b.Field(0).AppendNulls(5) + + r := b.NewRecord() + defer r.Release() + + indices := array.NewInt32Builder(mem) + indices.AppendValues([]int32{2, 1, 4, 0, 3}, nil) + by := indices.NewInt32Array() + defer by.Release() + + result, err := Take(compute.WithAllocator(context.Background(), mem), r, by) + require.Nil(t, err) + defer result.Release() + resultStruct := result.Column(0).(*array.Struct) + resultStruct.Len() + }) +} + +// Use all supported sort field. +type Sample struct { + Int int64 + Double float64 + String string + Dict string + DictFixed [2]byte + Nullable *int64 + Timestamp arrow.Timestamp +} + +type Samples []Sample + +func (s Samples) Record() arrow.Record { + b := array.NewRecordBuilder(memory.NewGoAllocator(), + arrow.NewSchema([]arrow.Field{ + { + Name: "int", + Type: arrow.PrimitiveTypes.Int64, + }, + { + Name: "double", + Type: arrow.PrimitiveTypes.Float64, + }, + { + Name: "string", + Type: arrow.BinaryTypes.String, + }, + { + Name: "dict", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.String, + }, + }, + { + Name: "dictFixed", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: &arrow.FixedSizeBinaryType{ByteWidth: 2}, + }, + }, + { + Name: "nullable", + Type: arrow.PrimitiveTypes.Int64, + Nullable: true, + }, + { + Name: "timestamp", + Type: &arrow.TimestampType{}, + Nullable: true, + }, + }, nil), + ) + + fInt := b.Field(0).(*array.Int64Builder) + fDouble := b.Field(1).(*array.Float64Builder) + fString := b.Field(2).(*array.StringBuilder) + fBinaryDict := b.Field(3).(*array.BinaryDictionaryBuilder) + fFixedDict := b.Field(4).(*array.FixedSizeBinaryDictionaryBuilder) + fNullable := 
b.Field(5).(*array.Int64Builder) + fTimestamp := b.Field(6).(*array.TimestampBuilder) + + for _, v := range s { + fInt.Append(v.Int) + fDouble.Append(v.Double) + fString.Append(v.String) + if v.Timestamp == 0 { + fTimestamp.AppendNull() + } else { + fTimestamp.Append(v.Timestamp) + } + _ = fBinaryDict.AppendString(v.Dict) + _ = fFixedDict.Append(v.DictFixed[:]) + if v.Nullable != nil { + fNullable.Append(*v.Nullable) + } else { + fNullable.AppendNull() + } + } + return b.NewRecord() +} + +type SortCase struct { + Name string + Samples Samples + Columns []SortingColumn + Indices []int32 + Error string +} + +func sortAndCompare(t *testing.T, kase SortCase) { + t.Helper() + + got, err := SortRecord(kase.Samples.Record(), kase.Columns) + if kase.Error != "" { + require.NotNil(t, err, kase.Error) + return + } + defer got.Release() + + require.Equal(t, kase.Indices, got.Int32Values()) +} + +func BenchmarkTake(b *testing.B) { + const ( + numRows = 1024 + numValsPerListElem = 4 + ) + mem := memory.NewGoAllocator() + b.Run("Dict", func(b *testing.B) { + rb := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "dict", + Type: &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, + ValueType: arrow.BinaryTypes.Binary, + }, + }, + }, nil, + )) + defer rb.Release() + d := rb.Field(0).(*array.BinaryDictionaryBuilder) + for i := 0; i < numRows; i++ { + // Interesting to benchmark with a string that appears every other row. + // i.e. only one entry in the dict. 
+ require.NoError(b, d.AppendString("appearseveryotherrow")) + require.NoError(b, d.AppendString(fmt.Sprintf("%d", i))) + } + r := rb.NewRecord() + indices := array.NewInt32Builder(mem) + for i := r.NumRows() - 1; i > 0; i-- { + indices.Append(int32(i)) + } + ctx := compute.WithAllocator(context.Background(), mem) + indArr := indices.NewInt32Array() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := Take(ctx, r, indArr); err != nil { + b.Fatal(err) + } + } + }) + + b.Run("List", func(b *testing.B) { + listb := array.NewRecordBuilder(mem, arrow.NewSchema( + []arrow.Field{ + { + Name: "list", + Type: arrow.ListOf( + &arrow.DictionaryType{ + IndexType: arrow.PrimitiveTypes.Int32, ValueType: arrow.BinaryTypes.Binary, + }, + ), + }, + }, nil, + )) + defer listb.Release() + + l := listb.Field(0).(*array.ListBuilder) + vb := l.ValueBuilder().(*array.BinaryDictionaryBuilder) + for i := 0; i < numRows; i++ { + l.Append(true) + for j := 0; j < numValsPerListElem-1; j++ { + require.NoError(b, vb.AppendString(fmt.Sprintf("%d", i))) + } + require.NoError(b, vb.AppendString("appearseveryrow")) + } + + r := listb.NewRecord() + indices := array.NewInt32Builder(mem) + for i := numRows - 1; i > 0; i-- { + indices.Append(int32(i)) + } + ctx := compute.WithAllocator(context.Background(), mem) + indArr := indices.NewInt32Array() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + if _, err := Take(ctx, r, indArr); err != nil { + b.Fatal(err) + } + } + }) +} diff --git a/pkg/query/internal/arrowutils/utils.go b/pkg/query/internal/arrowutils/utils.go new file mode 100644 index 00000000000..53c6443189b --- /dev/null +++ b/pkg/query/internal/arrowutils/utils.go @@ -0,0 +1,38 @@ +package arrowutils + +import ( + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" +) + +// ArrayConcatenator is an object that helps callers keep track of a slice of +// arrays and concatenate them into a single one when 
needed. This is more +// efficient and memory safe than using a builder. +type ArrayConcatenator struct { + arrs []arrow.Array +} + +func (c *ArrayConcatenator) Add(arr arrow.Array) { + c.arrs = append(c.arrs, arr) +} + +func (c *ArrayConcatenator) NewArray(mem memory.Allocator) (arrow.Array, error) { + arr, err := array.Concatenate(c.arrs, mem) + if err != nil { + return nil, err + } + c.arrs = c.arrs[:0] + return arr, err +} + +func (c *ArrayConcatenator) Len() int { + return len(c.arrs) +} + +func (c *ArrayConcatenator) Release() { + for _, arr := range c.arrs { + arr.Release() + } + c.arrs = c.arrs[:0] +} diff --git a/pkg/query/internal/builder/doc.go b/pkg/query/internal/builder/doc.go new file mode 100644 index 00000000000..e9450a3be96 --- /dev/null +++ b/pkg/query/internal/builder/doc.go @@ -0,0 +1,10 @@ +// Package builder vendors the optimized Arrow array builders from +// github.com/polarsignals/frostdb/pqarrow/builder. These builders expose +// random-access mutation methods (Set/Add/Value/AppendData) that the +// flamegraph and table query algorithms rely on, which the upstream +// arrow-go array.Builder API does not provide. +// +// AppendParquetValues methods and the parquet-go dependency from the +// upstream package have been dropped — parca uses these builders only on +// the query side and never to convert parquet values. +package builder diff --git a/pkg/query/internal/builder/listbuilder.go b/pkg/query/internal/builder/listbuilder.go new file mode 100644 index 00000000000..36617eaf03e --- /dev/null +++ b/pkg/query/internal/builder/listbuilder.go @@ -0,0 +1,301 @@ +// Copyright (c) The FrostDB Authors. +// Licensed under the Apache License 2.0. + +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
const (
	// minBuilderCapacity is the minimum capacity (1 << 5 = 32 slots) used
	// when growing a builder.
	minBuilderCapacity = 1 << 5
)

// builder provides common functionality for managing the validity bitmap (nulls) when building arrays.
type builder struct {
	refCount   int64            // atomic reference count
	mem        memory.Allocator // allocator used for all bitmap buffers
	nullBitmap *memory.Buffer   // 1 bit per slot; 0 = null, 1 = valid
	nulls      int              // number of null slots appended so far
	length     int              // number of slots appended so far
	capacity   int              // number of slots the bitmap can hold
}

// init allocates a zeroed validity bitmap large enough for capacity bits.
func (b *builder) init(capacity int) {
	toAlloc := bitutil.CeilByte(capacity) / 8
	b.nullBitmap = memory.NewResizableBuffer(b.mem)
	b.nullBitmap.Resize(toAlloc)
	b.capacity = capacity
	memory.Set(b.nullBitmap.Buf(), 0)
}

// reset releases the validity bitmap and zeroes all counters, returning the
// builder to its pristine state.
func (b *builder) reset() {
	if b.nullBitmap != nil {
		b.nullBitmap.Release()
		b.nullBitmap = nil
	}

	b.nulls = 0
	b.length = 0
	b.capacity = 0
}

// resize grows (or shrinks) the validity bitmap to hold newBits bits,
// delegating to init when no bitmap exists yet. Shrinking truncates length
// and recounts nulls from the surviving bits.
func (b *builder) resize(newBits int, init func(int)) {
	if b.nullBitmap == nil {
		init(newBits)
		return
	}

	newBytesN := bitutil.CeilByte(newBits) / 8
	oldBytesN := b.nullBitmap.Len()
	b.nullBitmap.Resize(newBytesN)
	b.capacity = newBits
	if oldBytesN < newBytesN {
		// TODO(sgc): necessary?
		memory.Set(b.nullBitmap.Buf()[oldBytesN:], 0)
	}
	if newBits < b.length {
		b.length = newBits
		b.nulls = newBits - bitutil.CountSetBits(b.nullBitmap.Buf(), 0, newBits)
	}
}

// reserve ensures capacity for elements more slots, growing to the next
// power of two via the supplied resize callback when needed.
func (b *builder) reserve(elements int, resize func(int)) {
	if b.nullBitmap == nil {
		b.nullBitmap = memory.NewResizableBuffer(b.mem)
	}
	if b.length+elements > b.capacity {
		newCap := bitutil.NextPowerOf2(b.length + elements)
		resize(newCap)
	}
}

// unsafeAppendBoolsToBitmap appends the contents of valid to the validity bitmap.
// As an optimization, if the valid slice is empty, the next length bits will be set to valid (not null).
// "unsafe" because it assumes sufficient capacity has already been reserved.
func (b *builder) unsafeAppendBoolsToBitmap(valid []bool, length int) {
	if len(valid) == 0 {
		b.unsafeSetValid(length)
		return
	}

	byteOffset := b.length / 8
	bitOffset := byte(b.length % 8)
	nullBitmap := b.nullBitmap.Bytes()
	// Work on a byte-sized window (bitSet) and flush it back each time the
	// window fills, to avoid per-bit byte stores.
	bitSet := nullBitmap[byteOffset]

	for _, v := range valid {
		if bitOffset == 8 {
			bitOffset = 0
			nullBitmap[byteOffset] = bitSet
			byteOffset++
			bitSet = nullBitmap[byteOffset]
		}

		if v {
			bitSet |= bitutil.BitMask[bitOffset]
		} else {
			bitSet &= bitutil.FlippedBitMask[bitOffset]
			b.nulls++
		}
		bitOffset++
	}

	// Flush the final partially-filled byte.
	if bitOffset != 0 {
		nullBitmap[byteOffset] = bitSet
	}
	b.length += len(valid)
}
+func (b *builder) unsafeSetValid(length int) { + padToByte := min(8-(b.length%8), length) + if padToByte == 8 { + padToByte = 0 + } + bits := b.nullBitmap.Bytes() + for i := b.length; i < b.length+padToByte; i++ { + bitutil.SetBit(bits, i) + } + + start := (b.length + padToByte) / 8 + fastLength := (length - padToByte) / 8 + memory.Set(bits[start:start+fastLength], 0xff) + + newLength := b.length + length + // trailing bytes + for i := b.length + padToByte + (fastLength * 8); i < newLength; i++ { + bitutil.SetBit(bits, i) + } + + b.length = newLength +} + +// ListBuilder is a wrapper over an array.ListBuilder that uses ColumnBuilder as a values buffer. +type ListBuilder struct { + builder + + etype arrow.DataType // data type of the list's elements. + values ColumnBuilder + offsets *array.Int32Builder +} + +func NewListBuilder(mem memory.Allocator, etype arrow.DataType) *ListBuilder { + return &ListBuilder{ + builder: builder{refCount: 1, mem: mem}, + etype: etype, + values: NewBuilder(mem, etype), + offsets: array.NewInt32Builder(mem), + } +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. 
+func (b *ListBuilder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + if b.nullBitmap != nil { + b.nullBitmap.Release() + b.nullBitmap = nil + } + } + + b.values.Release() + b.offsets.Release() +} + +func (b *ListBuilder) appendNextOffset() { + b.offsets.Append(int32(b.values.Len())) +} + +func (b *ListBuilder) Append(v bool) { + b.Reserve(1) + b.unsafeAppendBoolToBitmap(v) + b.appendNextOffset() +} + +func (b *ListBuilder) AppendNull() { + b.Reserve(1) + b.unsafeAppendBoolToBitmap(false) + b.appendNextOffset() +} + +func (b *ListBuilder) AppendValues(offsets []int32, valid []bool) { + b.Reserve(len(valid)) + b.offsets.AppendValues(offsets, nil) + b.unsafeAppendBoolsToBitmap(valid, len(valid)) +} + +func (b *ListBuilder) unsafeAppendBoolToBitmap(isValid bool) { + if isValid { + bitutil.SetBit(b.nullBitmap.Bytes(), b.length) + } else { + b.nulls++ + } + b.length++ +} + +func (b *ListBuilder) init(capacity int) { + b.builder.init(capacity) + b.offsets.Resize(capacity + 1) +} + +// Reserve ensures there is enough space for appending n elements +// by checking the capacity and calling Resize if necessary. +func (b *ListBuilder) Reserve(n int) { + b.reserve(n, b.resizeHelper) + b.offsets.Reserve(n) +} + +// Resize adjusts the space allocated by b to n elements. If n is greater than b.Cap(), +// additional memory will be allocated. If n is smaller, the allocated memory may be reduced. +func (b *ListBuilder) Resize(n int) { + b.resizeHelper(n) + b.offsets.Resize(n) +} + +func (b *ListBuilder) resizeHelper(n int) { + if n < minBuilderCapacity { + n = minBuilderCapacity + } + + if b.capacity == 0 { + b.init(n) + } else { + b.resize(n, b.builder.init) + } +} + +func (b *ListBuilder) ValueBuilder() ColumnBuilder { + return b.values +} + +// NewArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// so it can be used to build a new array. 
+func (b *ListBuilder) NewArray() arrow.Array { + return b.NewListArray() +} + +func (b *ListBuilder) Len() int { + return b.length +} + +// NewListArray creates a List array from the memory buffers used by the builder and resets the ListBuilder +// so it can be used to build a new array. +func (b *ListBuilder) NewListArray() (a *array.List) { + if b.offsets.Len() != b.length+1 { + b.appendNextOffset() + } + data := b.newData() + a = array.NewListData(data) + data.Release() + return +} + +func (b *ListBuilder) newData() (data *array.Data) { + values := b.values.NewArray() + defer values.Release() + + var offsets *memory.Buffer + if b.offsets != nil { + arr := b.offsets.NewInt32Array() + defer arr.Release() + offsets = arr.Data().Buffers()[1] + } + + data = array.NewData( + arrow.ListOf(b.etype), b.length, + []*memory.Buffer{ + b.nullBitmap, + offsets, + }, + []arrow.ArrayData{values.Data()}, + b.nulls, + 0, + ) + b.reset() + + return +} + +func (b *ListBuilder) Retain() { + b.values.Retain() +} diff --git a/pkg/query/internal/builder/optbuilders.go b/pkg/query/internal/builder/optbuilders.go new file mode 100644 index 00000000000..384a67e8c58 --- /dev/null +++ b/pkg/query/internal/builder/optbuilders.go @@ -0,0 +1,744 @@ +package builder + +import ( + "fmt" + "math" + "slices" + "sync/atomic" + "unsafe" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/bitutil" + "github.com/apache/arrow-go/v18/arrow/memory" +) + +// ColumnBuilder is a subset of the array.Builder interface implemented by the +// optimized builders in this file. +type ColumnBuilder interface { + Retain() + Release() + Len() int + AppendNull() + Reserve(int) + NewArray() arrow.Array +} + +// OptimizedBuilder is a set of FrostDB specific builder methods. 
+type OptimizedBuilder interface { + ColumnBuilder + AppendNulls(int) + ResetToLength(int) + RepeatLastValue(int) error + IsNull(i int) bool + IsValid(i int) bool + SetNull(i int) +} + +type builderBase struct { + dtype arrow.DataType + refCount int64 + length int + validityBitmap []byte +} + +func (b *builderBase) reset() { + b.length = 0 + b.validityBitmap = b.validityBitmap[:0] +} + +func (b *builderBase) Retain() { + atomic.AddInt64(&b.refCount, 1) +} + +func (b *builderBase) releaseInternal() { + b.length = 0 + b.validityBitmap = nil +} + +func (b *builderBase) Release() { + atomic.AddInt64(&b.refCount, -1) + b.releaseInternal() +} + +// Len returns the number of elements in the array builder. +func (b *builderBase) Len() int { + return b.length +} + +func (b *builderBase) Reserve(int) {} + +// AppendNulls appends n null values to the array being built. This is specific +// to distinct optimizations in FrostDB. +func (b *builderBase) AppendNulls(n int) { + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length+n) + bitutil.SetBitsTo(b.validityBitmap, int64(b.length), int64(n), false) + b.length += n +} + +// SetNull is setting the value at the index i to null. +func (b *builderBase) SetNull(i int) { + bitutil.ClearBit(b.validityBitmap, i) +} + +func (b *builderBase) IsValid(n int) bool { + return bitutil.BitIsSet(b.validityBitmap, n) +} + +// appendValid does the opposite of appendNulls. 
+func (b *builderBase) appendValid(n int) { + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length+n) + bitutil.SetBitsTo(b.validityBitmap, int64(b.length), int64(n), true) + b.length += n +} + +func (b *builderBase) IsNull(n int) bool { + return bitutil.BitIsNotSet(b.validityBitmap, n) +} + +func resizeBitmap(bitmap []byte, valuesToRepresent int) []byte { + bytesNeeded := int(bitutil.BytesForBits(int64(valuesToRepresent))) + if cap(bitmap) < bytesNeeded { + existingBitmap := bitmap + bitmap = make([]byte, bitutil.NextPowerOf2(bytesNeeded)) + copy(bitmap, existingBitmap) + } + return bitmap[:bytesNeeded] +} + +var ( + _ OptimizedBuilder = (*OptBinaryBuilder)(nil) + _ OptimizedBuilder = (*OptInt64Builder)(nil) + _ OptimizedBuilder = (*OptBooleanBuilder)(nil) + _ OptimizedBuilder = (*OptFloat64Builder)(nil) +) + +// OptBinaryBuilder is an optimized array.BinaryBuilder. +type OptBinaryBuilder struct { + builderBase + + data []byte + // offsets are offsets into data. The ith value is + // data[offsets[i]:offsets[i+1]]. Note however, that during normal operation, + // len(data) is never appended to the slice until the next value is added, + // i.e. the last offset is never closed until the offsets slice is appended + // to or returned to the caller. + offsets []uint32 +} + +func NewOptBinaryBuilder(dtype arrow.BinaryDataType) *OptBinaryBuilder { + b := &OptBinaryBuilder{} + b.dtype = dtype + return b +} + +// Release decreases the reference count by 1. +// When the reference count goes to zero, the memory is freed. +// Release may be called simultaneously from multiple goroutines. +func (b *OptBinaryBuilder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.data = nil + b.offsets = nil + b.releaseInternal() + } +} + +// AppendNull adds a new null value to the array being built. This is slow, +// don't use it. 
+func (b *OptBinaryBuilder) AppendNull() { + b.offsets = append(b.offsets, uint32(len(b.data))) + b.builderBase.AppendNulls(1) +} + +// AppendEmptyValue adds a new empty byte slice to the array being built. +func (b *OptBinaryBuilder) AppendEmptyValue() { + b.offsets = append(b.offsets, uint32(len(b.data))) + // Don't append any data, just close the offset for an empty slice + b.appendValid(1) +} + +// AppendNulls appends n null values to the array being built. This is specific +// to distinct optimizations in FrostDB. +func (b *OptBinaryBuilder) AppendNulls(n int) { + for i := 0; i < n; i++ { + b.offsets = append(b.offsets, uint32(len(b.data))) + } + b.builderBase.AppendNulls(n) +} + +// NewArray creates a new array from the memory buffers used +// by the builder and resets the Builder so it can be used to build +// a new array. +func (b *OptBinaryBuilder) NewArray() arrow.Array { + b.offsets = append(b.offsets, uint32(len(b.data))) + offsetsAsBytes := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(b.offsets))), len(b.offsets)*arrow.Uint32SizeBytes) + data := array.NewData( + b.dtype, + b.length, + []*memory.Buffer{ + memory.NewBufferBytes(b.validityBitmap), + memory.NewBufferBytes(offsetsAsBytes), + memory.NewBufferBytes(b.data), + }, + nil, + b.length-bitutil.CountSetBits(b.validityBitmap, 0, b.length), + 0, + ) + b.reset() + b.offsets = b.offsets[:0] + b.data = nil + + return array.NewBinaryData(data) +} + +var ErrMaxSizeReached = fmt.Errorf("max size reached") + +// AppendData appends a flat slice of bytes to the builder, with an accompanying +// slice of offsets. This data is considered to be non-null. +func (b *OptBinaryBuilder) AppendData(data []byte, offsets []uint32) error { + if len(b.data)+len(data) > math.MaxInt32 { // NOTE: we check against a max int32 here (instead of the uint32 that we're using for offsets) because the arrow binary arrays use int32s. 
+ return ErrMaxSizeReached + } + + // Trim the last offset since we want this last range to be "open". + offsets = offsets[:len(offsets)-1] + + offsetConversion := uint32(len(b.data)) + b.data = append(b.data, data...) + startOffset := len(b.offsets) + b.offsets = append(b.offsets, offsets...) + for curOffset := startOffset; curOffset < len(b.offsets); curOffset++ { + b.offsets[curOffset] += offsetConversion + } + + b.length += len(offsets) + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBitsTo(b.validityBitmap, int64(startOffset), int64(len(offsets)), true) + return nil +} + +func (b *OptBinaryBuilder) Append(v []byte) error { + if len(b.data)+len(v) > math.MaxInt32 { + return ErrMaxSizeReached + } + b.offsets = append(b.offsets, uint32(len(b.data))) + b.data = append(b.data, v...) + b.length++ + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBit(b.validityBitmap, b.length-1) + return nil +} + +// RepeatLastValue is specific to distinct optimizations in FrostDB. +func (b *OptBinaryBuilder) RepeatLastValue(n int) error { + if bitutil.BitIsNotSet(b.validityBitmap, b.length-1) { + // Last value is null. + b.AppendNulls(n) + return nil + } + + lastValue := b.data[b.offsets[len(b.offsets)-1]:] + if len(b.data)+(len(lastValue)*n) > math.MaxInt32 { + return ErrMaxSizeReached + } + for i := 0; i < n; i++ { + b.offsets = append(b.offsets, uint32(len(b.data))) + b.data = append(b.data, lastValue...) + } + b.appendValid(n) + return nil +} + +// ResetToLength is specific to distinct optimizations in FrostDB. 
+func (b *OptBinaryBuilder) ResetToLength(n int) { + if n == b.length { + return + } + + b.length = n + b.data = b.data[:b.offsets[n]] + b.offsets = b.offsets[:n] + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} + +func (b *OptBinaryBuilder) Value(i int) []byte { + if i == b.length-1 { // last value + return b.data[b.offsets[i]:] + } + return b.data[b.offsets[i]:b.offsets[i+1]] +} + +type OptInt64Builder struct { + builderBase + + data []int64 +} + +func NewOptInt64Builder(dtype arrow.DataType) *OptInt64Builder { + b := &OptInt64Builder{} + b.dtype = dtype + return b +} + +func (b *OptInt64Builder) resizeData(neededLength int) { + if cap(b.data) < neededLength { + oldData := b.data + b.data = make([]int64, bitutil.NextPowerOf2(neededLength)) + copy(b.data, oldData) + } + b.data = b.data[:neededLength] +} + +func (b *OptInt64Builder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.data = nil + b.releaseInternal() + } +} + +func (b *OptInt64Builder) AppendNull() { + b.AppendNulls(1) +} + +// AppendEmptyValue adds a new zero value (0) to the array being built. +func (b *OptInt64Builder) AppendEmptyValue() { + b.Append(0) +} + +func (b *OptInt64Builder) AppendNulls(n int) { + b.resizeData(b.length + n) + b.builderBase.AppendNulls(n) +} + +func (b *OptInt64Builder) NewArray() arrow.Array { + dataAsBytes := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(b.data))), len(b.data)*arrow.Int64SizeBytes) + data := array.NewData( + b.dtype, + b.length, + []*memory.Buffer{ + memory.NewBufferBytes(b.validityBitmap), + memory.NewBufferBytes(dataAsBytes), + }, + nil, + b.length-bitutil.CountSetBits(b.validityBitmap, 0, b.length), + 0, + ) + b.reset() + b.data = nil + return array.NewInt64Data(data) +} + +// AppendData appends a slice of int64s to the builder. This data is considered +// to be non-null. +func (b *OptInt64Builder) AppendData(data []int64) { + oldLength := b.length + b.data = append(b.data, data...) 
+ b.length += len(data) + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBitsTo(b.validityBitmap, int64(oldLength), int64(len(data)), true) +} + +func (b *OptInt64Builder) Append(v int64) { + b.data = append(b.data, v) + b.length++ + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBit(b.validityBitmap, b.length-1) +} + +func (b *OptInt64Builder) Set(i int, v int64) { + b.data[i] = v +} + +func (b *OptInt64Builder) Add(i int, v int64) { + b.data[i] += v +} + +// Value returns the ith value of the builder. +func (b *OptInt64Builder) Value(i int) int64 { + return b.data[i] +} + +func (b *OptInt64Builder) RepeatLastValue(n int) error { + if bitutil.BitIsNotSet(b.validityBitmap, b.length-1) { + b.AppendNulls(n) + return nil + } + + lastValue := b.data[b.length-1] + b.resizeData(b.length + n) + for i := b.length; i < b.length+n; i++ { + b.data[i] = lastValue + } + b.appendValid(n) + return nil +} + +// ResetToLength is specific to distinct optimizations in FrostDB. +func (b *OptInt64Builder) ResetToLength(n int) { + if n == b.length { + return + } + + b.length = n + b.data = b.data[:n] + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} + +type OptBooleanBuilder struct { + builderBase + data []byte +} + +func NewOptBooleanBuilder(dtype arrow.DataType) *OptBooleanBuilder { + b := &OptBooleanBuilder{} + b.dtype = dtype + return b +} + +func (b *OptBooleanBuilder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.data = nil + b.releaseInternal() + } +} + +func (b *OptBooleanBuilder) AppendNull() { + b.AppendNulls(1) +} + +// AppendEmptyValue adds a new zero value (false) to the array being built. 
+func (b *OptBooleanBuilder) AppendEmptyValue() { + b.AppendSingle(false) +} + +func (b *OptBooleanBuilder) AppendNulls(n int) { + v := b.length + n + b.data = resizeBitmap(b.data, v) + b.validityBitmap = resizeBitmap(b.validityBitmap, v) + + for i := 0; i < n; i++ { + bitutil.SetBitTo(b.data, b.length, false) + bitutil.SetBitTo(b.validityBitmap, b.length, false) + b.length++ + } +} + +func (b *OptBooleanBuilder) NewArray() arrow.Array { + data := array.NewData( + b.dtype, + b.length, + []*memory.Buffer{ + memory.NewBufferBytes(b.validityBitmap), + memory.NewBufferBytes(b.data), + }, + nil, + b.length-bitutil.CountSetBits(b.validityBitmap, 0, b.length), + 0, + ) + b.reset() + b.data = nil + array := array.NewBooleanData(data) + return array +} + +func (b *OptBooleanBuilder) Append(data []byte, valid int) { + n := b.length + valid + b.data = resizeBitmap(b.data, n) + b.validityBitmap = resizeBitmap(b.validityBitmap, n) + + // TODO: This isn't ideal setting bits 1 by 1, when we could copy in all the bits + for i := 0; i < valid; i++ { + bitutil.SetBitTo(b.data, b.length, bitutil.BitIsSet(data, i)) + bitutil.SetBitTo(b.validityBitmap, b.length, true) + b.length++ + } +} + +func (b *OptBooleanBuilder) Set(i int, v bool) { + bitutil.SetBitTo(b.data, i, v) +} + +func (b *OptBooleanBuilder) Value(i int) bool { + return bitutil.BitIsSet(b.data, i) +} + +func (b *OptBooleanBuilder) AppendData(_ []byte) { + panic("do not use AppendData for opt boolean builder, use Append instead") +} + +func (b *OptBooleanBuilder) AppendSingle(v bool) { + b.length++ + b.data = resizeBitmap(b.data, b.length) + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBitTo(b.data, b.length-1, v) + bitutil.SetBit(b.validityBitmap, b.length-1) +} + +func (b *OptBooleanBuilder) RepeatLastValue(n int) error { + if bitutil.BitIsNotSet(b.validityBitmap, b.length-1) { + b.AppendNulls(n) + return nil + } + + lastValue := bitutil.BitIsSet(b.data, b.length-1) + b.data = 
resizeBitmap(b.data, b.length+n) + bitutil.SetBitsTo(b.data, int64(b.length), int64(n), lastValue) + b.appendValid(n) + return nil +} + +// ResetToLength is specific to distinct optimizations in FrostDB. +func (b *OptBooleanBuilder) ResetToLength(n int) { + if n == b.length { + return + } + + b.length = n + b.data = resizeBitmap(b.data, n) + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} + +type OptInt32Builder struct { + builderBase + + data []int32 +} + +func NewOptInt32Builder(dtype arrow.DataType) *OptInt32Builder { + b := &OptInt32Builder{} + b.dtype = dtype + return b +} + +func (b *OptInt32Builder) resizeData(neededLength int) { + if cap(b.data) < neededLength { + oldData := b.data + b.data = make([]int32, bitutil.NextPowerOf2(neededLength)) + copy(b.data, oldData) + } + b.data = b.data[:neededLength] +} + +func (b *OptInt32Builder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.data = nil + b.releaseInternal() + } +} + +func (b *OptInt32Builder) AppendNull() { + b.AppendNulls(1) +} + +// AppendEmptyValue adds a new zero value (0) to the array being built. +func (b *OptInt32Builder) AppendEmptyValue() { + b.Append(0) +} + +func (b *OptInt32Builder) AppendNulls(n int) { + b.resizeData(b.length + n) + b.builderBase.AppendNulls(n) +} + +func (b *OptInt32Builder) NewArray() arrow.Array { + dataAsBytes := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(b.data))), len(b.data)*arrow.Int32SizeBytes) + data := array.NewData( + b.dtype, + b.length, + []*memory.Buffer{ + memory.NewBufferBytes(b.validityBitmap), + memory.NewBufferBytes(dataAsBytes), + }, + nil, + b.length-bitutil.CountSetBits(b.validityBitmap, 0, b.length), + 0, + ) + b.reset() + b.data = nil + return array.NewInt32Data(data) +} + +// AppendData appends a slice of int32s to the builder. This data is considered +// to be non-null. +func (b *OptInt32Builder) AppendData(data []int32) { + oldLength := b.length + b.data = append(b.data, data...) 
+ b.length += len(data) + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBitsTo(b.validityBitmap, int64(oldLength), int64(len(data)), true) +} + +func (b *OptInt32Builder) Append(v int32) { + b.data = append(b.data, v) + b.length++ + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBit(b.validityBitmap, b.length-1) +} + +// Set sets value v at index i. This will panic if i is out of bounds. Use this +// after calling Reserve. +func (b *OptInt32Builder) Set(i int, v int32) { + b.data[i] = v + bitutil.SetBit(b.validityBitmap, i) +} + +// Swap swaps values at i and j index. +func (b *OptInt32Builder) Swap(i, j int) { + b.data[i], b.data[j] = b.data[j], b.data[i] +} + +func (b *OptInt32Builder) Add(i int, v int32) { + b.data[i] += v +} + +func (b *OptInt32Builder) Value(i int) int32 { + return b.data[i] +} + +func (b *OptInt32Builder) RepeatLastValue(n int) error { + if bitutil.BitIsNotSet(b.validityBitmap, b.length-1) { + b.AppendNulls(n) + return nil + } + + lastValue := b.data[b.length-1] + b.resizeData(b.length + n) + for i := b.length; i < b.length+n; i++ { + b.data[i] = lastValue + } + b.appendValid(n) + return nil +} + +// ResetToLength is specific to distinct optimizations in FrostDB. 
+func (b *OptInt32Builder) ResetToLength(n int) { + if n == b.length { + return + } + + b.length = n + b.data = b.data[:n] + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} + +func (b *OptInt32Builder) Reserve(n int) { + b.length = n + b.data = slices.Grow(b.data, n)[:n] + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} + +type OptFloat64Builder struct { + builderBase + + data []float64 +} + +func NewOptFloat64Builder(dtype arrow.DataType) *OptFloat64Builder { + b := &OptFloat64Builder{} + b.dtype = dtype + return b +} + +func (b *OptFloat64Builder) resizeData(neededLength int) { + if cap(b.data) < neededLength { + oldData := b.data + b.data = make([]float64, bitutil.NextPowerOf2(neededLength)) + copy(b.data, oldData) + } + b.data = b.data[:neededLength] +} + +func (b *OptFloat64Builder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + b.data = nil + b.releaseInternal() + } +} + +func (b *OptFloat64Builder) AppendNull() { + b.AppendNulls(1) +} + +// AppendEmptyValue adds a new zero value (0.0) to the array being built. +func (b *OptFloat64Builder) AppendEmptyValue() { + b.Append(0.0) +} + +func (b *OptFloat64Builder) AppendNulls(n int) { + b.resizeData(b.length + n) + b.builderBase.AppendNulls(n) +} + +func (b *OptFloat64Builder) NewArray() arrow.Array { + dataAsBytes := unsafe.Slice((*byte)(unsafe.Pointer(unsafe.SliceData(b.data))), len(b.data)*arrow.Float64SizeBytes) + data := array.NewData( + b.dtype, + b.length, + []*memory.Buffer{ + memory.NewBufferBytes(b.validityBitmap), + memory.NewBufferBytes(dataAsBytes), + }, + nil, + b.length-bitutil.CountSetBits(b.validityBitmap, 0, b.length), + 0, + ) + b.reset() + b.data = nil + return array.NewFloat64Data(data) +} + +// AppendData appends a slice of float64s to the builder. +// This data is considered to be non-null. +func (b *OptFloat64Builder) AppendData(data []float64) { + oldLength := b.length + b.data = append(b.data, data...) 
+ b.length += len(data) + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBitsTo(b.validityBitmap, int64(oldLength), int64(len(data)), true) +} + +func (b *OptFloat64Builder) Append(v float64) { + b.data = append(b.data, v) + b.length++ + b.validityBitmap = resizeBitmap(b.validityBitmap, b.length) + bitutil.SetBit(b.validityBitmap, b.length-1) +} + +func (b *OptFloat64Builder) Set(i int, v float64) { + b.data[i] = v +} + +func (b *OptFloat64Builder) Add(i int, v float64) { + b.data[i] += v +} + +// Value returns the ith value of the builder. +func (b *OptFloat64Builder) Value(i int) float64 { + return b.data[i] +} + +func (b *OptFloat64Builder) RepeatLastValue(n int) error { + if bitutil.BitIsNotSet(b.validityBitmap, b.length-1) { + b.AppendNulls(n) + return nil + } + + lastValue := b.data[b.length-1] + b.resizeData(b.length + n) + for i := b.length; i < b.length+n; i++ { + b.data[i] = lastValue + } + b.appendValid(n) + return nil +} + +// ResetToLength is specific to distinct optimizations in FrostDB. 
+func (b *OptFloat64Builder) ResetToLength(n int) { + if n == b.length { + return + } + + b.length = n + b.data = b.data[:n] + b.validityBitmap = resizeBitmap(b.validityBitmap, n) +} diff --git a/pkg/query/internal/builder/optbuilders_test.go b/pkg/query/internal/builder/optbuilders_test.go new file mode 100644 index 00000000000..a96ff3d9b25 --- /dev/null +++ b/pkg/query/internal/builder/optbuilders_test.go @@ -0,0 +1,238 @@ +package builder_test + +import ( + "fmt" + "math" + "math/rand" + "testing" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" + "github.com/stretchr/testify/require" + + "github.com/parca-dev/parca/pkg/query/internal/builder" +) + +func TestOptBuilders(t *testing.T) { + testCases := []struct { + b builder.OptimizedBuilder + v any + }{ + { + b: builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary), + v: []byte("hello"), + }, + { + b: builder.NewOptBooleanBuilder(arrow.FixedWidthTypes.Boolean), + v: true, + }, + { + b: builder.NewOptFloat64Builder(arrow.PrimitiveTypes.Float64), + v: 1.0, + }, + { + b: builder.NewOptInt32Builder(arrow.PrimitiveTypes.Int32), + v: int32(123), + }, + { + b: builder.NewOptInt64Builder(arrow.PrimitiveTypes.Int64), + v: int64(123), + }, + } + for _, tc := range testCases { + t.Run(fmt.Sprintf("%T", tc.b), func(t *testing.T) { + require.NoError(t, builder.AppendGoValue(tc.b, tc.v)) + require.NoError(t, builder.AppendGoValue(tc.b, tc.v)) + + require.Equal(t, tc.b.Len(), 2) + require.True(t, tc.b.IsValid(0)) + require.True(t, tc.b.IsValid(1)) + + tc.b.SetNull(1) // overwrite second value with NULL + require.True(t, tc.b.IsValid(0)) + require.True(t, tc.b.IsNull(1)) + + a := tc.b.NewArray() + require.Equal(t, tc.v, a.GetOneForMarshal(0)) + require.Equal(t, nil, a.GetOneForMarshal(1)) + }) + } +} + +// https://github.com/polarsignals/frostdb/issues/270 +func TestIssue270(t *testing.T) { + b := 
builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary) + b.AppendNull() + const expString = "hello" + require.NoError(t, b.Append([]byte(expString))) + require.Equal(t, b.Len(), 2) + + a := b.NewArray().(*array.Binary) + require.Equal(t, a.Len(), 2) + require.True(t, a.IsNull(0)) + require.Equal(t, string(a.Value(1)), expString) +} + +func TestRepeatLastValue(t *testing.T) { + testCases := []struct { + b builder.OptimizedBuilder + v any + }{ + { + b: builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary), + v: []byte("hello"), + }, + { + b: builder.NewOptInt64Builder(arrow.PrimitiveTypes.Int64), + v: int64(123), + }, + { + b: builder.NewOptBooleanBuilder(arrow.FixedWidthTypes.Boolean), + v: true, + }, + } + for _, tc := range testCases { + require.NoError(t, builder.AppendGoValue(tc.b, tc.v)) + require.Equal(t, tc.b.Len(), 1) + require.NoError(t, tc.b.RepeatLastValue(9)) + require.Equal(t, tc.b.Len(), 10) + a := tc.b.NewArray() + for i := 0; i < a.Len(); i++ { + v := a.GetOneForMarshal(i) + require.Equal(t, tc.v, v) + } + } +} + +func Test_ListBuilder(t *testing.T) { + lb := builder.NewListBuilder(memory.NewGoAllocator(), &arrow.Int64Type{}) + + lb.Append(true) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(1) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(2) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(3) + lb.Append(true) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(4) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(5) + lb.ValueBuilder().(*builder.OptInt64Builder).Append(6) + + ar := lb.NewArray() + require.Equal(t, "[[1 2 3] [4 5 6]]", fmt.Sprintf("%v", ar)) +} + +// Test_BuildLargeArray is a test that build a large array ( > MaxInt32) +// The reason for this test we've hit cases where the binary array builder had so many values appended onto +// it that it caused the uint32 that was being used to track value offsets to overflow. 
+func Test_BuildLargeArray(t *testing.T) { + if testing.Short() { + t.Skip("in short mode; skipping long test") + } + alloc := memory.NewGoAllocator() + bldr := builder.NewBuilder(alloc, &arrow.BinaryType{}) + + size := rand.Intn(1024) * 1024 // [1k,1MB) values + buf := make([]byte, size) + binbldr := array.NewBinaryBuilder(alloc, &arrow.BinaryType{}) + binbldr.Append(buf) + arr := binbldr.NewArray() + + n := (math.MaxInt32 / size) + 1 + for i := 0; i < n; i++ { + switch i { + case n - 1: + require.Error(t, builder.AppendValue(bldr, arr, 0)) + default: + require.NoError(t, builder.AppendValue(bldr, arr, 0)) + } + } + + newarr := bldr.NewArray() + + // Validate we can read all rows + for i := 0; i < n-1; i++ { + newarr.(*array.Binary).Value(i) + } + + // We expect fewer rows in the array + require.Equal(t, n-1, newarr.Data().Len()) +} + +func TestOptBinaryBuilder_Value(t *testing.T) { + b := builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary) + values := []string{"1", "2", "3"} + for _, v := range values { + require.NoError(t, b.Append([]byte(v))) + } + + for i, value := range values { + require.Equal(t, value, string(b.Value(i))) + } +} + +func TestAppendEmptyValue(t *testing.T) { + t.Run("OptBinaryBuilder", func(t *testing.T) { + b := builder.NewOptBinaryBuilder(arrow.BinaryTypes.Binary) + b.AppendEmptyValue() + require.Equal(t, 1, b.Len()) + require.True(t, b.IsValid(0)) + require.Len(t, b.Value(0), 0) // Empty value should have length 0 + + arr := b.NewArray().(*array.Binary) + require.Equal(t, 1, arr.Len()) + require.False(t, arr.IsNull(0)) + require.Len(t, arr.Value(0), 0) // Empty value should have length 0 + }) + + t.Run("OptInt64Builder", func(t *testing.T) { + b := builder.NewOptInt64Builder(arrow.PrimitiveTypes.Int64) + b.AppendEmptyValue() + require.Equal(t, 1, b.Len()) + require.True(t, b.IsValid(0)) + require.Equal(t, int64(0), b.Value(0)) + + arr := b.NewArray().(*array.Int64) + require.Equal(t, 1, arr.Len()) + require.False(t, arr.IsNull(0)) + 
require.Equal(t, int64(0), arr.Value(0)) + }) + + t.Run("OptInt32Builder", func(t *testing.T) { + b := builder.NewOptInt32Builder(arrow.PrimitiveTypes.Int32) + b.AppendEmptyValue() + require.Equal(t, 1, b.Len()) + require.True(t, b.IsValid(0)) + require.Equal(t, int32(0), b.Value(0)) + + arr := b.NewArray().(*array.Int32) + require.Equal(t, 1, arr.Len()) + require.False(t, arr.IsNull(0)) + require.Equal(t, int32(0), arr.Value(0)) + }) + + t.Run("OptFloat64Builder", func(t *testing.T) { + b := builder.NewOptFloat64Builder(arrow.PrimitiveTypes.Float64) + b.AppendEmptyValue() + require.Equal(t, 1, b.Len()) + require.True(t, b.IsValid(0)) + require.Equal(t, 0.0, b.Value(0)) + + arr := b.NewArray().(*array.Float64) + require.Equal(t, 1, arr.Len()) + require.False(t, arr.IsNull(0)) + require.Equal(t, 0.0, arr.Value(0)) + }) + + t.Run("OptBooleanBuilder", func(t *testing.T) { + b := builder.NewOptBooleanBuilder(arrow.FixedWidthTypes.Boolean) + b.AppendEmptyValue() + require.Equal(t, 1, b.Len()) + require.True(t, b.IsValid(0)) + require.Equal(t, false, b.Value(0)) + + arr := b.NewArray().(*array.Boolean) + require.Equal(t, 1, arr.Len()) + require.False(t, arr.IsNull(0)) + require.Equal(t, false, arr.Value(0)) + }) +} diff --git a/pkg/query/internal/builder/recordbuilder.go b/pkg/query/internal/builder/recordbuilder.go new file mode 100644 index 00000000000..cddc3dab11b --- /dev/null +++ b/pkg/query/internal/builder/recordbuilder.go @@ -0,0 +1,130 @@ +package builder + +import ( + "fmt" + "sync/atomic" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" +) + +// The code in this file is based heavily on Apache arrow's array.RecordBuilder, +// with some modifications to use our own optimized record builders. Ideally, we +// would eventually merge this upstream. + +// RecordBuilder eases the process of building a Record, iteratively, from +// a known Schema. 
+type RecordBuilder struct { + refCount int64 + mem memory.Allocator + schema *arrow.Schema + fields []ColumnBuilder +} + +// NewRecordBuilder returns a builder, using the provided memory allocator and a schema. +func NewRecordBuilder(mem memory.Allocator, schema *arrow.Schema) *RecordBuilder { + b := &RecordBuilder{ + refCount: 1, + mem: mem, + schema: schema, + fields: make([]ColumnBuilder, schema.NumFields()), + } + + for i := 0; i < schema.NumFields(); i++ { + b.fields[i] = NewBuilder(mem, schema.Field(i).Type) + } + + return b +} + +// Retain increases the reference count by 1. +// Retain may be called simultaneously from multiple goroutines. +func (b *RecordBuilder) Retain() { + atomic.AddInt64(&b.refCount, 1) +} + +// Release decreases the reference count by 1. +func (b *RecordBuilder) Release() { + if atomic.AddInt64(&b.refCount, -1) == 0 { + for _, f := range b.fields { + f.Release() + } + b.fields = nil + } +} + +func (b *RecordBuilder) Schema() *arrow.Schema { return b.schema } +func (b *RecordBuilder) Fields() []ColumnBuilder { return b.fields } +func (b *RecordBuilder) Field(i int) ColumnBuilder { return b.fields[i] } + +func (b *RecordBuilder) Reserve(size int) { + for _, f := range b.fields { + f.Reserve(size) + } +} + +// NewRecord creates a new record from the memory buffers and resets the +// RecordBuilder so it can be used to build a new record. +// +// The returned Record must be Release()'d after use. +// +// NewRecord panics if the fields' builder do not have the same length. +func (b *RecordBuilder) NewRecord() arrow.Record { + cols := make([]arrow.Array, len(b.fields)) + rows := int64(0) + + defer func(cols []arrow.Array) { + for _, col := range cols { + if col == nil { + continue + } + col.Release() + } + }(cols) + + for i, f := range b.fields { + cols[i] = f.NewArray() + irow := int64(cols[i].Len()) + if i > 0 && irow != rows { + panic(fmt.Errorf("arrow/array: field %d has %d rows. 
want=%d", i, irow, rows)) + } + rows = irow + } + + return array.NewRecord(b.schema, cols, rows) +} + +// ExpandSchema expands the record builder schema by adding new fields. +func (b *RecordBuilder) ExpandSchema(schema *arrow.Schema) { + for i := 0; i < schema.NumFields(); i++ { + f := schema.Field(i) + found := false + for j := 0; j < b.schema.NumFields(); j++ { + old := b.schema.Field(j) + if f.Equal(old) { + found = true + break + } + } + if found { // field already exists + continue + } + + // Add the new field + b.fields = append(b.fields[:i], append([]ColumnBuilder{NewBuilder(b.mem, f.Type)}, b.fields[i:]...)...) + } + + b.schema = schema +} + +// Reset will call ResetFull on any dictionary builders to prevent memo tables from growing unbounded. +func (b *RecordBuilder) Reset() { + for _, f := range b.fields { + if lb, ok := f.(*ListBuilder); ok { + if vb, ok := lb.ValueBuilder().(array.DictionaryBuilder); ok { + vb.ResetFull() + } + } + } +} diff --git a/pkg/query/internal/builder/utils.go b/pkg/query/internal/builder/utils.go new file mode 100644 index 00000000000..dac833a1bba --- /dev/null +++ b/pkg/query/internal/builder/utils.go @@ -0,0 +1,256 @@ +package builder + +import ( + "fmt" + "unsafe" + + "github.com/apache/arrow-go/v18/arrow" + "github.com/apache/arrow-go/v18/arrow/array" + "github.com/apache/arrow-go/v18/arrow/memory" +) + +func NewBuilder(mem memory.Allocator, t arrow.DataType) ColumnBuilder { + switch t := t.(type) { + case *arrow.BinaryType: + return NewOptBinaryBuilder(arrow.BinaryTypes.Binary) + case *arrow.Int64Type: + return NewOptInt64Builder(arrow.PrimitiveTypes.Int64) + case *arrow.ListType: + return NewListBuilder(mem, t.Elem()) + case *arrow.BooleanType: + return NewOptBooleanBuilder(arrow.FixedWidthTypes.Boolean) + default: + return array.NewBuilder(mem, t) + } +} + +func RollbackPrevious(cb ColumnBuilder) error { + switch b := cb.(type) { + case *OptBinaryBuilder: + b.ResetToLength(b.Len() - 1) + case *OptInt64Builder: + 
b.ResetToLength(b.Len() - 1) + case *OptBooleanBuilder: + b.ResetToLength(b.Len() - 1) + case *array.Int64Builder: + b.Resize(b.Len() - 1) + + case *array.StringBuilder: + b.Resize(b.Len() - 1) + case *array.BinaryBuilder: + b.Resize(b.Len() - 1) + case *array.FixedSizeBinaryBuilder: + b.Resize(b.Len() - 1) + case *array.BooleanBuilder: + b.Resize(b.Len() - 1) + case *array.BinaryDictionaryBuilder: + b.Resize(b.Len() - 1) + default: + return fmt.Errorf("unsupported type for RollbackPrevious %T", b) + } + return nil +} + +func AppendValue(cb ColumnBuilder, arr arrow.Array, i int) error { + if arr == nil || arr.IsNull(i) { + cb.AppendNull() + return nil + } + + switch b := cb.(type) { + case *OptBinaryBuilder: + return b.Append(arr.(*array.Binary).Value(i)) + case *OptInt64Builder: + b.Append(arr.(*array.Int64).Value(i)) + case *OptBooleanBuilder: + b.AppendSingle(arr.(*array.Boolean).Value(i)) + case *array.Int64Builder: + b.Append(arr.(*array.Int64).Value(i)) + case *array.Int32Builder: + b.Append(arr.(*array.Int32).Value(i)) + case *array.Float64Builder: + b.Append(arr.(*array.Float64).Value(i)) + case *array.Uint64Builder: + b.Append(arr.(*array.Uint64).Value(i)) + case *array.StringBuilder: + b.Append(arr.(*array.String).Value(i)) + case *array.BinaryBuilder: + b.Append(arr.(*array.Binary).Value(i)) + case *array.FixedSizeBinaryBuilder: + b.Append(arr.(*array.FixedSizeBinary).Value(i)) + case *array.BooleanBuilder: + b.Append(arr.(*array.Boolean).Value(i)) + case *array.StructBuilder: + arrStruct := arr.(*array.Struct) + + b.Append(true) + for j := 0; j < b.NumField(); j++ { + if err := AppendValue(b.FieldBuilder(j), arrStruct.Field(j), i); err != nil { + return fmt.Errorf("failed to append struct field: %w", err) + } + } + case *array.BinaryDictionaryBuilder: + switch a := arr.(type) { + case *array.Dictionary: + idx := a.GetValueIndex(i) + switch dict := a.Dictionary().(type) { + case *array.Binary: + if idx < 0 || idx >= dict.Len() { + b.AppendNull() + return 
nil + } + if err := b.Append(dict.Value(idx)); err != nil { + return err + } + case *array.String: + if idx < 0 || idx >= dict.Len() { + b.AppendNull() + return nil + } + if err := b.AppendString(dict.Value(idx)); err != nil { + return err + } + default: + return fmt.Errorf("dictionary type %T unsupported", dict) + } + default: + return fmt.Errorf("non-dictionary array %T provided for dictionary builder", a) + } + case *array.ListBuilder: + return buildList(b.ValueBuilder(), b, arr, i) + case *ListBuilder: + return buildList(b.ValueBuilder(), b, arr, i) + default: + return fmt.Errorf("unsupported type for arrow append %T", b) + } + return nil +} + +type ListLikeBuilder interface { + Append(bool) +} + +func buildList(vb any, b ListLikeBuilder, arr arrow.Array, i int) error { + list := arr.(*array.List) + start, end := list.ValueOffsets(i) + + data := list.ListValues().Data() + if start > int64(data.Len()) || start > end || data.Offset()+int(start) > data.Offset()+data.Len() { + return fmt.Errorf("invalid data range: start=%d end=%d for list with %v", start, end, list.Offsets()) + } + + values := array.NewSlice(list.ListValues(), start, end) + defer values.Release() + + switch v := values.(type) { + case *array.Int64: + int64Builder := vb.(*OptInt64Builder) + b.Append(true) + for j := 0; j < v.Len(); j++ { + int64Builder.Append(v.Value(j)) + } + case *array.Dictionary: + switch dict := v.Dictionary().(type) { + case *array.Binary: + b.Append(true) + for j := 0; j < v.Len(); j++ { + switch bldr := vb.(type) { + case *array.BinaryDictionaryBuilder: + if err := bldr.Append(dict.Value(v.GetValueIndex(j))); err != nil { + return err + } + default: + return fmt.Errorf("unknown value builder type %T", bldr) + } + } + } + case *array.Struct: + structBuilder, ok := vb.(*array.StructBuilder) + if !ok { + return fmt.Errorf("unsupported type for ListLikeBuilder: %T", vb) + } + + b.Append(true) + for j := 0; j < v.Len(); j++ { + structBuilder.Append(true) + for k := 0; k < 
v.NumField(); k++ { + if err := AppendValue(structBuilder.FieldBuilder(k), v.Field(k), j); err != nil { + return err + } + } + } + default: + return fmt.Errorf("unsupported type for List builder %T", v) + } + + return nil +} + +// TODO(asubiotto): This function doesn't handle NULLs in the case of optimized +// builders. +func AppendArray(cb ColumnBuilder, arr arrow.Array) error { + switch b := cb.(type) { + case *OptBinaryBuilder: + v := arr.(*array.Binary) + offsets := v.ValueOffsets() + return b.AppendData(v.ValueBytes(), *(*[]uint32)(unsafe.Pointer(&offsets))) + case *OptInt64Builder: + b.AppendData(arr.(*array.Int64).Int64Values()) + default: + // TODO(asubiotto): Handle OptBooleanBuilder. It needs some way to + // append data. + for i := 0; i < arr.Len(); i++ { + // This is an interface conversion on each call, but we should care + // more about porting our uses of arrow builders to optimized + // builders for exactly these use cases. + if err := AppendValue(cb, arr, i); err != nil { + return err + } + } + } + return nil +} + +func AppendGoValue(cb ColumnBuilder, v any) error { + if v == nil { + cb.AppendNull() + return nil + } + + switch b := cb.(type) { + case *OptBinaryBuilder: + return b.Append(v.([]byte)) + case *OptBooleanBuilder: + b.AppendSingle(v.(bool)) + case *OptFloat64Builder: + b.Append(v.(float64)) + case *OptInt32Builder: + b.Append(v.(int32)) + case *OptInt64Builder: + b.Append(v.(int64)) + case *array.Int64Builder: + b.Append(v.(int64)) + case *array.Int32Builder: + b.Append(v.(int32)) + case *array.StringBuilder: + b.Append(v.(string)) + case *array.BinaryBuilder: + b.Append(v.([]byte)) + case *array.FixedSizeBinaryBuilder: + b.Append(v.([]byte)) + case *array.BooleanBuilder: + b.Append(v.(bool)) + case *array.BinaryDictionaryBuilder: + switch e := v.(type) { + case string: + return b.Append([]byte(e)) + case []byte: + return b.Append(e) + default: + return fmt.Errorf("unsupported type %T for append go value %T", e, b) + } + default: + 
return fmt.Errorf("unsupported type for append go value %T", b) + } + return nil +} diff --git a/pkg/query/table.go b/pkg/query/table.go index 0929560346f..ce43e095bb0 100644 --- a/pkg/query/table.go +++ b/pkg/query/table.go @@ -23,7 +23,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/ipc" "github.com/apache/arrow-go/v18/arrow/math" "github.com/apache/arrow-go/v18/arrow/memory" - "github.com/polarsignals/frostdb/pqarrow/builder" + "github.com/parca-dev/parca/pkg/query/internal/builder" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "golang.org/x/exp/maps"