Skip to content

feat: return to internal string references #5486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 0 additions & 55 deletions array/array.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (

"github.com/apache/arrow/go/v7/arrow"
"github.com/apache/arrow/go/v7/arrow/array"
arrowmem "github.com/apache/arrow/go/v7/arrow/memory"

"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
@@ -158,65 +157,11 @@ func (a *String) Value(i int) string {
return a.ValueString(i)
}

// ValueRef yields a StringRef describing where the value at index i is
// stored: the backing memory buffer plus the value's offset and length
// within it. The reference does not extend the lifetime of the array;
// retain the buffer if the value must be accessed after the array may
// have been released.
func (a *String) ValueRef(i int) StringRef {
	type refProvider interface {
		ValueRef(int) StringRef
	}
	// Defer to the underlying array when it supplies its own references.
	if provider, ok := a.binaryArray.(refProvider); ok {
		return provider.ValueRef(i)
	}

	// Otherwise build the reference from the array's own data buffer.
	var ref StringRef
	ref.buf = a.Data().Buffers()[2]
	ref.off = a.ValueOffset(i)
	ref.len = a.ValueLen(i)
	return ref
}

// ValueCopy produces a new string holding a copy of the value at index
// i. Because the bytes are copied, the result stays usable after the
// array has been released; the copy is not tracked by the memory
// allocator.
//
// This is meant as a stopgap while work to reduce the amount of
// unaccounted data memory is in progress.
func (a *String) ValueCopy(i int) string {
	ref := a.ValueRef(i)
	raw := ref.Bytes()
	return string(raw)
}

// IsConstant reports whether the underlying binary array declares
// itself constant. An underlying array that has no IsConstant method is
// reported as not constant.
func (a *String) IsConstant() bool {
	constant, ok := a.binaryArray.(interface{ IsConstant() bool })
	if !ok {
		return false
	}
	return constant.IsConstant()
}

// StringRef is a reference to the storage backing a single value: a
// memory buffer together with the value's offset and length inside it.
type StringRef struct {
	buf *arrowmem.Buffer
	off int
	len int
}

// Buffer gives the memory buffer in which the value is stored.
func (sr StringRef) Buffer() *arrowmem.Buffer {
	return sr.buf
}

// Offset gives the position within the memory buffer where the value
// begins.
func (sr StringRef) Offset() int {
	return sr.off
}

// Len gives the length of the value.
func (sr StringRef) Len() int {
	return sr.len
}

// Bytes gives the portion of the memory buffer's bytes that holds the
// value, i.e. len bytes starting at off.
func (sr StringRef) Bytes() []byte {
	start := sr.off
	end := start + sr.len
	return sr.buf.Bytes()[start:end]
}

// sliceable is implemented by values that can produce an Array from a
// pair of indices i and j.
// NOTE(review): presumably the half-open range [i, j) — confirm against
// the implementations.
type sliceable interface {
Slice(i, j int) Array
}
8 changes: 0 additions & 8 deletions array/repeat.go
Original file line number Diff line number Diff line change
@@ -83,11 +83,3 @@ func (b *repeatedBinary) Slice(i, j int) binaryArray {
buf: b.buf,
}
}

// ValueRef returns a reference that spans the whole of the buffer held
// by b. The index argument is ignored, so every position yields the
// same reference.
func (b *repeatedBinary) ValueRef(int) StringRef {
	var ref StringRef
	ref.buf = b.buf
	ref.off = 0 // the value starts at the beginning of the buffer
	ref.len = b.buf.Len()
	return ref
}
4 changes: 2 additions & 2 deletions arrow/arrow_test.go
Original file line number Diff line number Diff line change
@@ -472,7 +472,7 @@ func TestSlice_String(t *testing.T) {

vs := make([]string, l)
for i := 0; i < l; i++ {
vs[i] = arr.ValueCopy(i)
vs[i] = arr.Value(i)
}

if !cmp.Equal(values, vs) {
@@ -540,7 +540,7 @@ func TestSlice_String(t *testing.T) {

vs = vs[:0]
for i := 0; i < l; i++ {
vs = append(vs, arr.ValueCopy(i))
vs = append(vs, arr.Value(i))
}

if !cmp.Equal(tc.want, vs) {
2 changes: 1 addition & 1 deletion csv/result.go
Original file line number Diff line number Diff line change
@@ -1257,7 +1257,7 @@ func encodeValueFrom(i, j int, c colMeta, cr flux.ColReader) (string, error) {
}
case flux.TString:
if cr.Strings(j).IsValid(i) {
v = cr.Strings(j).ValueCopy(i)
v = cr.Strings(j).Value(i)
}
case flux.TTime:
if cr.Times(j).IsValid(i) {
26 changes: 1 addition & 25 deletions execute/executetest/allocator.go
Original file line number Diff line number Diff line change
@@ -7,29 +7,5 @@ import (
)

// UnlimitedAllocator is a shared memory.ResourceAllocator that wraps
// the allocator assigned to its Allocator field.
// NOTE(review): the name suggests no memory limit is configured —
// confirm against memory.ResourceAllocator.
var UnlimitedAllocator = &memory.ResourceAllocator{
Allocator: Allocator{},
}

// Allocator is an allocator for use in test. When a buffer is freed the
// contents are overwritten with a predictable pattern to help detect
// use-after-free scenarios.
type Allocator struct{}

func (Allocator) Allocate(size int) []byte {
return arrowmem.DefaultAllocator.Allocate(size)
}

func (a Allocator) Reallocate(size int, b []byte) []byte {
b1 := a.Allocate(size)
copy(b1, b)
a.Free(b)
return b1
}

func (a Allocator) Free(b []byte) {
var pattern = [...]byte{0x00, 0x33, 0xcc, 0xff}
for i := range b {
b[i] = pattern[i%len(pattern)]
}
arrowmem.DefaultAllocator.Free(b)
Allocator: arrowmem.DefaultAllocator,
}
2 changes: 1 addition & 1 deletion execute/executetest/table.go
Original file line number Diff line number Diff line change
@@ -544,7 +544,7 @@ func ConvertTable(tbl flux.Table) (*Table, error) {
}
case flux.TString:
if col := cr.Strings(j); col.IsValid(i) {
row[j] = col.ValueCopy(i)
row[j] = col.Value(i)
}
case flux.TTime:
if col := cr.Times(j); col.IsValid(i) {
2 changes: 1 addition & 1 deletion execute/table.go
Original file line number Diff line number Diff line change
@@ -437,7 +437,7 @@ func ValueForRow(cr flux.ColReader, i, j int) values.Value {
if cr.Strings(j).IsNull(i) {
return values.NewNull(semantic.BasicString)
}
return values.NewString(cr.Strings(j).ValueCopy(i))
return values.NewString(cr.Strings(j).Value(i))
case flux.TInt:
if cr.Ints(j).IsNull(i) {
return values.NewNull(semantic.BasicInt)
2 changes: 1 addition & 1 deletion execute/table/stringify.go
Original file line number Diff line number Diff line change
@@ -124,7 +124,7 @@ func valueForRow(cr flux.ColReader, i, j int) values.Value {
if cr.Strings(j).IsNull(i) {
return values.NewNull(semantic.BasicString)
}
return values.NewString(cr.Strings(j).ValueCopy(i))
return values.NewString(cr.Strings(j).Value(i))
case flux.TInt:
if cr.Ints(j).IsNull(i) {
return values.NewNull(semantic.BasicInt)
2 changes: 1 addition & 1 deletion internal/arrowutil/array_values.gen.go
Original file line number Diff line number Diff line change
@@ -498,7 +498,7 @@ func (v StringArrayValue) Get(i int) values.Value {
if v.arr.IsNull(i) {
return values.Null
}
return values.New(v.arr.ValueCopy(i))
return values.New(v.arr.Value(i))
}

func (v StringArrayValue) Set(i int, value values.Value) {
4 changes: 2 additions & 2 deletions internal/arrowutil/compare.gen.go
Original file line number Diff line number Diff line change
@@ -237,7 +237,7 @@ func StringCompare(x, y *array.String, i, j int) int {
return 1
}

if l, r := x.ValueCopy(i), y.ValueCopy(j); l < r {
if l, r := x.Value(i), y.Value(j); l < r {
return -1
} else if l == r {
return 0
@@ -256,7 +256,7 @@ func StringCompareDesc(x, y *array.String, i, j int) int {
return 1
}

if l, r := x.ValueCopy(i), y.ValueCopy(j); l > r {
if l, r := x.Value(i), y.Value(j); l > r {
return -1
} else if l == r {
return 0
6 changes: 3 additions & 3 deletions internal/arrowutil/copy.gen.go
Original file line number Diff line number Diff line change
@@ -284,7 +284,7 @@ func CopyStringsTo(b *array.StringBuilder, arr *array.String) {
b.AppendNull()
continue
}
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
}
}

@@ -315,7 +315,7 @@ func CopyStringsByIndexTo(b *array.StringBuilder, arr *array.String, indices *ar
b.AppendNull()
continue
}
b.Append(arr.ValueCopy(offset))
b.Append(arr.Value(offset))
}
}

@@ -324,5 +324,5 @@ func CopyStringValue(b *array.StringBuilder, arr *array.String, i int) {
b.AppendNull()
return
}
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
}
2 changes: 1 addition & 1 deletion internal/arrowutil/filter.gen.go
Original file line number Diff line number Diff line change
@@ -108,7 +108,7 @@ func FilterStrings(arr *array.String, bitset []byte, mem memory.Allocator) *arra
for i := 0; i < len(bitset); i++ {
if bitutil.BitIsSet(bitset, i) {
if arr.IsValid(i) {
b.Append(arr.ValueCopy(i))
b.Append(arr.Value(i))
} else {
b.AppendNull()
}
4 changes: 2 additions & 2 deletions internal/arrowutil/iterator.gen.go
Original file line number Diff line number Diff line change
@@ -289,10 +289,10 @@ func IterateStrings(arrs []array.Array) StringIterator {
return StringIterator{Values: values}
}

// ValueCopy returns the current value in the iterator.
// Value returns the current value in the iterator.
func (i *StringIterator) Value() string {
vs := i.Values[0]
return vs.ValueCopy(i.i)
return vs.Value(i.i)
}

// IsValid returns if the current value is valid.
2 changes: 1 addition & 1 deletion internal/arrowutil/types.tmpldata
Original file line number Diff line number Diff line change
@@ -50,7 +50,7 @@
"MonoType": "semantic.BasicString",
"IsNumeric": false,
"IsComparable": true,
"Value": "ValueCopy",
"Value": "Value",
"Append": "Append",
"NewArray": "NewStringArray"
}
4 changes: 2 additions & 2 deletions internal/moving_average/array_container.go
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ func (a *ArrayContainer) Value(i int) values.Value {
case *array.Float:
return values.New(float64(a.array.(*array.Float).Value(i)))
case *array.String:
return values.New(string(a.array.(*array.String).ValueCopy(i)))
return values.New(string(a.array.(*array.String).Value(i)))
default:
return nil
}
@@ -54,7 +54,7 @@ func (a *ArrayContainer) OrigValue(i int) interface{} {
case *array.Float:
return a.array.(*array.Float).Value(i)
case *array.String:
return a.array.(*array.String).ValueCopy(i)
return a.array.(*array.String).Value(i)
default:
return nil
}
2 changes: 1 addition & 1 deletion result_iterator_test.go
Original file line number Diff line number Diff line change
@@ -192,7 +192,7 @@ func TestQueryResultIterator_Results(t *testing.T) {
for i := 0; i < cr.Len(); i++ {
r := row{
Value: cr.Ints(0).Value(i),
Tag: cr.Strings(1).ValueCopy(i),
Tag: cr.Strings(1).Value(i),
}
got = append(got, r)
}
2 changes: 1 addition & 1 deletion semantic/semantictest/cmp.go
Original file line number Diff line number Diff line change
@@ -219,7 +219,7 @@ func getValue(arr array.Array, i int) values.Value {
case *array.Float:
return values.New(arr.Value(i))
case *array.String:
return values.New(arr.ValueCopy(i))
return values.New(arr.Value(i))
case *array.Boolean:
return values.New(arr.Value(i))
default:
6 changes: 3 additions & 3 deletions stdlib/experimental/mqtt/to.go
Original file line number Diff line number Diff line change
@@ -316,12 +316,12 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err
if col.Type != flux.TString {
return errors.Newf(codes.FailedPrecondition, "invalid type for measurement column: %s", col.Type)
}
m.name = er.Strings(j).ValueCopy(i)
m.name = er.Strings(j).Value(i)
case isTag[j]:
if col.Type != flux.TString {
return errors.Newf(codes.FailedPrecondition, "invalid type for tag column: %s", col.Type)
}
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)})

case isValue[j]:
switch col.Type {
@@ -332,7 +332,7 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err
case flux.TUInt:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)})
case flux.TString:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)})
case flux.TTime:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))})
case flux.TBool:
4 changes: 2 additions & 2 deletions stdlib/influxdata/influxdb/to.go
Original file line number Diff line number Diff line change
@@ -217,7 +217,7 @@ outer:
for j, col := range chunk.Cols() {
switch {
case col.Label == spec.MeasurementColumn:
metric.NameStr = er.Strings(j).ValueCopy(i)
metric.NameStr = er.Strings(j).Value(i)
case col.Label == timeColLabel:
valueTime := execute.ValueForRow(&er, i, j)
if valueTime.IsNull() {
@@ -230,7 +230,7 @@ outer:
return errors.New(codes.Invalid, "invalid type for tag column")
}

value := er.Strings(j).ValueCopy(i)
value := er.Strings(j).Value(i)
if value == "" {
// Skip tag value if it is empty.
continue
6 changes: 3 additions & 3 deletions stdlib/kafka/to.go
Original file line number Diff line number Diff line change
@@ -353,12 +353,12 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e
if col.Type != flux.TString {
return errors.New(codes.FailedPrecondition, "invalid type for measurement column")
}
m.name = er.Strings(j).ValueCopy(i)
m.name = er.Strings(j).Value(i)
case isTag[j]:
if col.Type != flux.TString {
return errors.New(codes.FailedPrecondition, "invalid type for measurement column")
}
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.tags = append(m.tags, &protocol.Tag{Key: col.Label, Value: er.Strings(j).Value(i)})
case isValue[j]:
switch col.Type {
case flux.TFloat:
@@ -368,7 +368,7 @@ func (t *ToKafkaTransformation) Process(id execute.DatasetID, tbl flux.Table) (e
case flux.TUInt:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.UInts(j).Value(i)})
case flux.TString:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).ValueCopy(i)})
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: er.Strings(j).Value(i)})
case flux.TTime:
m.fields = append(m.fields, &protocol.Field{Key: col.Label, Value: values.Time(er.Times(j).Value(i))})
case flux.TBool:
2 changes: 1 addition & 1 deletion stdlib/sql/to.go
Original file line number Diff line number Diff line change
@@ -471,7 +471,7 @@ func CreateInsertComponents(t *ToSQLTransformation, tbl flux.Table) (colNames []
valueArgs = append(valueArgs, nil)
break
}
valueArgs = append(valueArgs, er.Strings(j).ValueCopy(i))
valueArgs = append(valueArgs, er.Strings(j).Value(i))
case flux.TTime:
if er.Times(j).IsNull(i) {
valueArgs = append(valueArgs, nil)
2 changes: 1 addition & 1 deletion stdlib/universe/distinct.go
Original file line number Diff line number Diff line change
@@ -309,7 +309,7 @@ func (t *distinctTransformation) Process(id execute.DatasetID, tbl flux.Table) e
}
nullDistinct = true
} else {
v := cr.Strings(j).ValueCopy(i)
v := cr.Strings(j).Value(i)
if stringDistinct[v] {
continue
}
2 changes: 1 addition & 1 deletion stdlib/universe/key_values.go
Original file line number Diff line number Diff line change
@@ -396,7 +396,7 @@ func (t *keyValuesTransformation) Process(id execute.DatasetID, tbl flux.Table)
}
nullDistinct = true
} else {
v := vs.ValueCopy(i)
v := vs.Value(i)
if stringDistinct[[2]string{c.name, v}] {
continue
}
2 changes: 1 addition & 1 deletion stdlib/universe/mode.go
Original file line number Diff line number Diff line change
@@ -220,7 +220,7 @@ func (t *modeTransformation) doString(cr flux.ColReader, tbl flux.Table, builder
if cr.Strings(j).IsNull(i) {
continue
}
v := cr.Strings(j).ValueCopy(i)
v := cr.Strings(j).Value(i)
stringMode[v]++
}

Loading