Skip to content

feat(execute): allocate memory for string content. #5482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 21, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions execute/allocator.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package execute

import (
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/memory"
)

@@ -156,17 +158,14 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 {
return s
}

// Strings makes a slice of string values.
// Only the string headers are accounted for.
func (a *Allocator) Strings(l, c int) []string {
// Strings makes a slice of String values.
func (a *Allocator) Strings(l, c int) []String {
a.account(c, stringSize)
return make([]string, l, c)
return make([]String, l, c)
}

// AppendStrings appends strings to a slice.
// Only the string headers are accounted for.
func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
// TODO(nathanielc): Account for actual size of strings
// AppendStrings appends Strings to a slice.
func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
if cap(slice)-len(slice) >= len(vs) {
return append(slice, vs...)
}
@@ -176,14 +175,14 @@ func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
return s
}

func (a *Allocator) GrowStrings(slice []string, n int) []string {
func (a *Allocator) GrowStrings(slice []String, n int) []String {
newCap := len(slice) + n
if newCap < cap(slice) {
return slice[:newCap]
}
// grow capacity same way as built-in append
newCap = newCap*3/2 + 1
s := make([]string, len(slice)+n, newCap)
s := make([]String, len(slice)+n, newCap)
copy(s, slice)
diff := cap(s) - cap(slice)
a.account(diff, stringSize)
@@ -220,3 +219,13 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time {
a.account(diff, timeSize)
return s
}

// String is a reference to a single string value whose bytes are held in a
// separate backing byte buffer. The struct itself carries no character data:
// it only records where the value starts and how many bytes it spans.
type String struct {
	// offset is the index of the value's first byte within the backing
	// buffer; len is the number of bytes the value occupies.
	offset, len int
}

// Bytes returns the bytes of s as a view into buf. No copy is made: the
// result is the sub-slice of buf.Bytes() starting at s.offset and spanning
// s.len bytes, so buf must be the buffer that s was created against.
//
// The capacity of the result is clipped to its length. Without the clip the
// slice's capacity would extend to the end of the shared buffer, and a caller
// appending to it would silently overwrite the bytes of the strings that
// follow s.
//
// NOTE(review): the view presumably becomes stale once buf is resized or
// released — confirm against the arrow memory.Buffer contract before
// retaining the result.
func (s String) Bytes(buf *arrowmem.Buffer) []byte {
	end := s.offset + s.len
	return buf.Bytes()[s.offset:end:end]
}
Comment on lines +223 to +231
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 My understanding of this new struct is that it stores the column string size, and if a string gets too long, we can do something with it.

104 changes: 65 additions & 39 deletions execute/table.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package execute

import (
"bytes"
"fmt"
"sort"
"sync/atomic"

arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
"github.com/google/go-cmp/cmp"

"github.com/influxdata/flux"
"github.com/influxdata/flux/array"
"github.com/influxdata/flux/arrow"
@@ -295,8 +298,9 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) {
eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data,
rightBuffer.cols[j].(*floatColumnBuilder).data)
case flux.TString:
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data,
rightBuffer.cols[j].(*stringColumnBuilder).data)
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder),
rightBuffer.cols[j].(*stringColumnBuilder),
cmp.Comparer(stringColumnBuilderEqual))
case flux.TTime:
eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data,
rightBuffer.cols[j].(*timeColumnBuilder).data)
@@ -324,6 +328,27 @@ func colsMatch(left, right []flux.ColMeta) bool {
return true
}

// stringColumnBuilderEqual reports whether x and y hold the same sequence of
// string values. The builders are equal when they have the same length and,
// at every row, either both entries are nil or both are non-nil with
// byte-identical contents. Each side's contents are read through its own
// backing buffer, so only the bytes are compared, never the stored offsets.
func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool {
	if x.Len() != y.Len() {
		return false
	}
	for i := 0; i < x.Len(); i++ {
		xNil := x.IsNil(i)
		yNil := y.IsNil(i)
		switch {
		case xNil != yNil:
			// Exactly one side is nil at this row.
			return false
		case xNil:
			// Both sides are nil; there are no bytes to compare.
			continue
		}
		left := x.data[i].Bytes(x.buf)
		right := y.data[i].Bytes(y.buf)
		if !bytes.Equal(left, right) {
			return false
		}
	}
	return true
}

// ColMap writes a mapping of builder index to cols index into colMap.
// When colMap does not have enough capacity a new colMap is allocated.
// The colMap is always returned
@@ -598,6 +623,7 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
case flux.TString:
b.cols = append(b.cols, &stringColumnBuilder{
columnBuilderBase: colBase,
buf: arrowmem.NewResizableBuffer(b.alloc.Allocator),
})
if b.NRows() > 0 {
if err := b.GrowStrings(newIdx, b.NRows()); err != nil {
@@ -919,8 +945,9 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error {
if err := b.checkCol(j, flux.TString); err != nil {
return err
}
b.cols[j].(*stringColumnBuilder).data[i] = value
b.cols[j].SetNil(i, false)
col := b.cols[j].(*stringColumnBuilder)
col.data[i] = col.makeString(value)
col.SetNil(i, false)
return nil
}

@@ -929,7 +956,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error {
return err
}
col := b.cols[j].(*stringColumnBuilder)
col.data = b.alloc.AppendStrings(col.data, value)
col.data = b.alloc.AppendStrings(col.data, col.makeString(value))
b.nrows = len(col.data)
return nil
}
@@ -1152,11 +1179,6 @@ func (b *ColListTableBuilder) Floats(j int) []float64 {
CheckColType(b.colMeta[j], flux.TFloat)
return b.cols[j].(*floatColumnBuilder).data
}
func (b *ColListTableBuilder) Strings(j int) []string {
meta := b.colMeta[j]
CheckColType(meta, flux.TString)
return b.cols[j].(*stringColumnBuilder).data
}
func (b *ColListTableBuilder) Times(j int) []values.Time {
CheckColType(b.colMeta[j], flux.TTime)
return b.cols[j].(*timeColumnBuilder).data
@@ -1180,7 +1202,9 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object {
case flux.TFloat:
val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row])
case flux.TString:
val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row])
// TODO(mhilton): avoid a copy
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is intended that this will be changed in a follow-up PR.

col := b.cols[j].(*stringColumnBuilder)
val = values.NewString(string(col.data[row].Bytes(col.buf)))
case flux.TTime:
val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row])
}
@@ -1866,46 +1890,38 @@ func (c *stringColumn) Copy() column {

type stringColumnBuilder struct {
columnBuilderBase
data []string
data []String

// buf contains a backing buffer containing the bytes of the
// strings.
buf *arrowmem.Buffer
}

func (c *stringColumnBuilder) Clear() {
c.data = c.data[0:0]
c.buf.Release()
c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator)
c.data = c.data[:0]
}

// Release gives up what the builder holds: it releases the backing string
// buffer, hands the capacity of the String slice back to the allocator
// (sized in units of stringSize), and drops the slice.
func (c *stringColumnBuilder) Release() {
	held := cap(c.data)
	c.buf.Release()
	c.alloc.Free(held, stringSize)
	c.data = nil
}

func (c *stringColumnBuilder) Copy() column {
var data *array.String
if len(c.nils) > 0 {
b := arrow.NewStringBuilder(c.alloc.Allocator)
b.Reserve(len(c.data))
sz := 0
for i, v := range c.data {
if c.nils[i] {
continue
}
sz += len(v)
}
b.ReserveData(sz)
for i, v := range c.data {
if c.nils[i] {
b.AppendNull()
continue
}
b.Append(v)
builder := arrow.NewStringBuilder(c.alloc.Allocator)
builder.Reserve(len(c.data))
builder.ReserveData(c.buf.Len())
for i, v := range c.data {
if c.nils[i] {
builder.AppendNull()
continue
}
data = b.NewStringArray()
b.Release()
} else {
data = arrow.NewString(c.data, c.alloc.Allocator)
builder.AppendBytes(v.Bytes(c.buf))
}
col := &stringColumn{
ColMeta: c.ColMeta,
data: data,
data: builder.NewStringArray(),
}
return col
}
@@ -1916,13 +1932,13 @@ func (c *stringColumnBuilder) Len() int {

func (c *stringColumnBuilder) Equal(i, j int) bool {
return c.EqualFunc(i, j, func(i, j int) bool {
return c.data[i] == c.data[j]
return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf))
})
}

func (c *stringColumnBuilder) Less(i, j int) bool {
return c.LessFunc(i, j, func(i, j int) bool {
return c.data[i] < c.data[j]
return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0
})
}

@@ -1931,6 +1947,16 @@ func (c *stringColumnBuilder) Swap(i, j int) {
c.data[i], c.data[j] = c.data[j], c.data[i]
}

// makeString copies s onto the end of the column's backing buffer and returns
// a String locating the copy. The buffer is resized by exactly len(s) bytes,
// and the new value starts at the buffer's previous length.
func (c *stringColumnBuilder) makeString(s string) String {
	start := c.buf.Len()
	size := len(s)
	c.buf.Resize(start + size)
	// Fill the freshly added tail of the buffer with the string's bytes.
	copy(c.buf.Bytes()[start:], s)
	return String{offset: start, len: size}
}

type timeColumn struct {
flux.ColMeta
data *array.Int
52 changes: 52 additions & 0 deletions execute/table_test.go
Original file line number Diff line number Diff line change
@@ -148,6 +148,58 @@ func TestTablesEqual(t *testing.T) {
},
want: false,
},
{
name: "string values",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
want: true,
},
{
name: "string mismatch",
data0: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "3"},
},
},
data1: &executetest.Table{
ColMeta: []flux.ColMeta{
{Label: "_time", Type: flux.TTime},
{Label: "_value", Type: flux.TString},
},
Data: [][]interface{}{
{execute.Time(1), "1"},
{execute.Time(2), "2"},
{execute.Time(3), "4"},
},
},
want: false,
},
}
for _, tc := range testCases {
tc := tc