Skip to content

Commit bea9586

Browse files
authoredMay 21, 2024··
feat(execute): allocate memory for string content. (#5482)
Update the string column builder to account for the memory for the string contents.
1 parent 918c26c commit bea9586

File tree

3 files changed

+136
-49
lines changed

3 files changed

+136
-49
lines changed
 

‎execute/allocator.go

+19-10
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package execute
22

33
import (
4+
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
5+
46
"github.com/influxdata/flux/memory"
57
)
68

@@ -156,17 +158,14 @@ func (a *Allocator) GrowFloats(slice []float64, n int) []float64 {
156158
return s
157159
}
158160

159-
// Strings makes a slice of string values.
160-
// Only the string headers are accounted for.
161-
func (a *Allocator) Strings(l, c int) []string {
161+
// Strings makes a slice of String values.
162+
func (a *Allocator) Strings(l, c int) []String {
162163
a.account(c, stringSize)
163-
return make([]string, l, c)
164+
return make([]String, l, c)
164165
}
165166

166-
// AppendStrings appends strings to a slice.
167-
// Only the string headers are accounted for.
168-
func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
169-
// TODO(nathanielc): Account for actual size of strings
167+
// AppendStrings appends Strings to a slice.
168+
func (a *Allocator) AppendStrings(slice []String, vs ...String) []String {
170169
if cap(slice)-len(slice) >= len(vs) {
171170
return append(slice, vs...)
172171
}
@@ -176,14 +175,14 @@ func (a *Allocator) AppendStrings(slice []string, vs ...string) []string {
176175
return s
177176
}
178177

179-
func (a *Allocator) GrowStrings(slice []string, n int) []string {
178+
func (a *Allocator) GrowStrings(slice []String, n int) []String {
180179
newCap := len(slice) + n
181180
if newCap < cap(slice) {
182181
return slice[:newCap]
183182
}
184183
// grow capacity same way as built-in append
185184
newCap = newCap*3/2 + 1
186-
s := make([]string, len(slice)+n, newCap)
185+
s := make([]String, len(slice)+n, newCap)
187186
copy(s, slice)
188187
diff := cap(s) - cap(slice)
189188
a.account(diff, stringSize)
@@ -220,3 +219,13 @@ func (a *Allocator) GrowTimes(slice []Time, n int) []Time {
220219
a.account(diff, timeSize)
221220
return s
222221
}
222+
223+
// String represents a string stored in some backing byte slice.
224+
type String struct {
225+
offset int
226+
len int
227+
}
228+
229+
func (s String) Bytes(buf *arrowmem.Buffer) []byte {
230+
return buf.Bytes()[s.offset : s.offset+s.len]
231+
}

‎execute/table.go

+65-39
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
package execute
22

33
import (
4+
"bytes"
45
"fmt"
56
"sort"
67
"sync/atomic"
78

9+
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"
810
"github.com/google/go-cmp/cmp"
11+
912
"github.com/influxdata/flux"
1013
"github.com/influxdata/flux/array"
1114
"github.com/influxdata/flux/arrow"
@@ -295,8 +298,9 @@ func TablesEqual(left, right flux.Table, alloc memory.Allocator) (bool, error) {
295298
eq = cmp.Equal(leftBuffer.cols[j].(*floatColumnBuilder).data,
296299
rightBuffer.cols[j].(*floatColumnBuilder).data)
297300
case flux.TString:
298-
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder).data,
299-
rightBuffer.cols[j].(*stringColumnBuilder).data)
301+
eq = cmp.Equal(leftBuffer.cols[j].(*stringColumnBuilder),
302+
rightBuffer.cols[j].(*stringColumnBuilder),
303+
cmp.Comparer(stringColumnBuilderEqual))
300304
case flux.TTime:
301305
eq = cmp.Equal(leftBuffer.cols[j].(*timeColumnBuilder).data,
302306
rightBuffer.cols[j].(*timeColumnBuilder).data)
@@ -324,6 +328,27 @@ func colsMatch(left, right []flux.ColMeta) bool {
324328
return true
325329
}
326330

331+
func stringColumnBuilderEqual(x, y *stringColumnBuilder) bool {
332+
if x.Len() != y.Len() {
333+
return false
334+
}
335+
for i := 0; i < x.Len(); i++ {
336+
if x.IsNil(i) {
337+
if !y.IsNil(i) {
338+
return false
339+
}
340+
continue
341+
}
342+
if y.IsNil(i) {
343+
return false
344+
}
345+
if !bytes.Equal(x.data[i].Bytes(x.buf), y.data[i].Bytes(y.buf)) {
346+
return false
347+
}
348+
}
349+
return true
350+
}
351+
327352
// ColMap writes a mapping of builder index to cols index into colMap.
328353
// When colMap does not have enough capacity a new colMap is allocated.
329354
// The colMap is always returned
@@ -598,6 +623,7 @@ func (b *ColListTableBuilder) AddCol(c flux.ColMeta) (int, error) {
598623
case flux.TString:
599624
b.cols = append(b.cols, &stringColumnBuilder{
600625
columnBuilderBase: colBase,
626+
buf: arrowmem.NewResizableBuffer(b.alloc.Allocator),
601627
})
602628
if b.NRows() > 0 {
603629
if err := b.GrowStrings(newIdx, b.NRows()); err != nil {
@@ -919,8 +945,9 @@ func (b *ColListTableBuilder) SetString(i int, j int, value string) error {
919945
if err := b.checkCol(j, flux.TString); err != nil {
920946
return err
921947
}
922-
b.cols[j].(*stringColumnBuilder).data[i] = value
923-
b.cols[j].SetNil(i, false)
948+
col := b.cols[j].(*stringColumnBuilder)
949+
col.data[i] = col.makeString(value)
950+
col.SetNil(i, false)
924951
return nil
925952
}
926953

@@ -929,7 +956,7 @@ func (b *ColListTableBuilder) AppendString(j int, value string) error {
929956
return err
930957
}
931958
col := b.cols[j].(*stringColumnBuilder)
932-
col.data = b.alloc.AppendStrings(col.data, value)
959+
col.data = b.alloc.AppendStrings(col.data, col.makeString(value))
933960
b.nrows = len(col.data)
934961
return nil
935962
}
@@ -1152,11 +1179,6 @@ func (b *ColListTableBuilder) Floats(j int) []float64 {
11521179
CheckColType(b.colMeta[j], flux.TFloat)
11531180
return b.cols[j].(*floatColumnBuilder).data
11541181
}
1155-
func (b *ColListTableBuilder) Strings(j int) []string {
1156-
meta := b.colMeta[j]
1157-
CheckColType(meta, flux.TString)
1158-
return b.cols[j].(*stringColumnBuilder).data
1159-
}
11601182
func (b *ColListTableBuilder) Times(j int) []values.Time {
11611183
CheckColType(b.colMeta[j], flux.TTime)
11621184
return b.cols[j].(*timeColumnBuilder).data
@@ -1180,7 +1202,9 @@ func (b *ColListTableBuilder) GetRow(row int) values.Object {
11801202
case flux.TFloat:
11811203
val = values.NewFloat(b.cols[j].(*floatColumnBuilder).data[row])
11821204
case flux.TString:
1183-
val = values.NewString(b.cols[j].(*stringColumnBuilder).data[row])
1205+
// TODO(mhilton): avoid a copy
1206+
col := b.cols[j].(*stringColumnBuilder)
1207+
val = values.NewString(string(col.data[row].Bytes(col.buf)))
11841208
case flux.TTime:
11851209
val = values.NewTime(b.cols[j].(*timeColumnBuilder).data[row])
11861210
}
@@ -1866,46 +1890,38 @@ func (c *stringColumn) Copy() column {
18661890

18671891
type stringColumnBuilder struct {
18681892
columnBuilderBase
1869-
data []string
1893+
data []String
1894+
1895+
// buf contains a backing buffer containing the bytes of the
1896+
// strings.
1897+
buf *arrowmem.Buffer
18701898
}
18711899

18721900
func (c *stringColumnBuilder) Clear() {
1873-
c.data = c.data[0:0]
1901+
c.buf.Release()
1902+
c.buf = arrowmem.NewResizableBuffer(c.alloc.Allocator)
1903+
c.data = c.data[:0]
18741904
}
18751905

18761906
func (c *stringColumnBuilder) Release() {
1907+
c.buf.Release()
18771908
c.alloc.Free(cap(c.data), stringSize)
1878-
c.data = nil
18791909
}
18801910

18811911
func (c *stringColumnBuilder) Copy() column {
1882-
var data *array.String
1883-
if len(c.nils) > 0 {
1884-
b := arrow.NewStringBuilder(c.alloc.Allocator)
1885-
b.Reserve(len(c.data))
1886-
sz := 0
1887-
for i, v := range c.data {
1888-
if c.nils[i] {
1889-
continue
1890-
}
1891-
sz += len(v)
1892-
}
1893-
b.ReserveData(sz)
1894-
for i, v := range c.data {
1895-
if c.nils[i] {
1896-
b.AppendNull()
1897-
continue
1898-
}
1899-
b.Append(v)
1912+
builder := arrow.NewStringBuilder(c.alloc.Allocator)
1913+
builder.Reserve(len(c.data))
1914+
builder.ReserveData(c.buf.Len())
1915+
for i, v := range c.data {
1916+
if c.nils[i] {
1917+
builder.AppendNull()
1918+
continue
19001919
}
1901-
data = b.NewStringArray()
1902-
b.Release()
1903-
} else {
1904-
data = arrow.NewString(c.data, c.alloc.Allocator)
1920+
builder.AppendBytes(v.Bytes(c.buf))
19051921
}
19061922
col := &stringColumn{
19071923
ColMeta: c.ColMeta,
1908-
data: data,
1924+
data: builder.NewStringArray(),
19091925
}
19101926
return col
19111927
}
@@ -1916,13 +1932,13 @@ func (c *stringColumnBuilder) Len() int {
19161932

19171933
func (c *stringColumnBuilder) Equal(i, j int) bool {
19181934
return c.EqualFunc(i, j, func(i, j int) bool {
1919-
return c.data[i] == c.data[j]
1935+
return bytes.Equal(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf))
19201936
})
19211937
}
19221938

19231939
func (c *stringColumnBuilder) Less(i, j int) bool {
19241940
return c.LessFunc(i, j, func(i, j int) bool {
1925-
return c.data[i] < c.data[j]
1941+
return bytes.Compare(c.data[i].Bytes(c.buf), c.data[j].Bytes(c.buf)) < 0
19261942
})
19271943
}
19281944

@@ -1931,6 +1947,16 @@ func (c *stringColumnBuilder) Swap(i, j int) {
19311947
c.data[i], c.data[j] = c.data[j], c.data[i]
19321948
}
19331949

1950+
func (c *stringColumnBuilder) makeString(s string) String {
1951+
offset := c.buf.Len()
1952+
c.buf.Resize(offset + len(s))
1953+
copy(c.buf.Bytes()[offset:], s)
1954+
return String{
1955+
offset: offset,
1956+
len: len(s),
1957+
}
1958+
}
1959+
19341960
type timeColumn struct {
19351961
flux.ColMeta
19361962
data *array.Int

‎execute/table_test.go

+52
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,58 @@ func TestTablesEqual(t *testing.T) {
148148
},
149149
want: false,
150150
},
151+
{
152+
name: "string values",
153+
data0: &executetest.Table{
154+
ColMeta: []flux.ColMeta{
155+
{Label: "_time", Type: flux.TTime},
156+
{Label: "_value", Type: flux.TString},
157+
},
158+
Data: [][]interface{}{
159+
{execute.Time(1), "1"},
160+
{execute.Time(2), "2"},
161+
{execute.Time(3), "3"},
162+
},
163+
},
164+
data1: &executetest.Table{
165+
ColMeta: []flux.ColMeta{
166+
{Label: "_time", Type: flux.TTime},
167+
{Label: "_value", Type: flux.TString},
168+
},
169+
Data: [][]interface{}{
170+
{execute.Time(1), "1"},
171+
{execute.Time(2), "2"},
172+
{execute.Time(3), "3"},
173+
},
174+
},
175+
want: true,
176+
},
177+
{
178+
name: "string mismatch",
179+
data0: &executetest.Table{
180+
ColMeta: []flux.ColMeta{
181+
{Label: "_time", Type: flux.TTime},
182+
{Label: "_value", Type: flux.TString},
183+
},
184+
Data: [][]interface{}{
185+
{execute.Time(1), "1"},
186+
{execute.Time(2), "2"},
187+
{execute.Time(3), "3"},
188+
},
189+
},
190+
data1: &executetest.Table{
191+
ColMeta: []flux.ColMeta{
192+
{Label: "_time", Type: flux.TTime},
193+
{Label: "_value", Type: flux.TString},
194+
},
195+
Data: [][]interface{}{
196+
{execute.Time(1), "1"},
197+
{execute.Time(2), "2"},
198+
{execute.Time(3), "4"},
199+
},
200+
},
201+
want: false,
202+
},
151203
}
152204
for _, tc := range testCases {
153205
tc := tc

0 commit comments

Comments
 (0)
Please sign in to comment.