Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit ffb51f8

Browse files
author
Markus Westerlind
authoredMar 29, 2022
feat: Track freed memory with SetFinalizer (#4597)
* feat: Track freed memory with SetFinalizer Instead of manual `Retain` and `Release` calls. I left those calls in right now, but since `Free` no longer actually frees the memory we can still see this works by observing the actual free being don in the finalizer. As we can see from the repeated `GC` calls (and from being forced to manually GC in the first place), the maximum memory in use at any time will be larger and garbage collection may only clear up part of what is unreachable. * feat: Enable the SetFinalizer based memory tracker by feature flag
1 parent e031aa0 commit ffb51f8

File tree

14 files changed

+274
-36
lines changed

14 files changed

+274
-36
lines changed
 

‎array/array_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,27 @@ func TestString(t *testing.T) {
121121
}
122122
}
123123

124+
type A struct {
125+
}
126+
127+
type B struct {
128+
A
129+
}
130+
131+
type C interface {
132+
C() string
133+
}
134+
135+
func (*A) C() string {
136+
return "A"
137+
}
138+
139+
func (*B) C() string {
140+
return "B"
141+
}
142+
124143
func TestNewStringFromBinaryArray(t *testing.T) {
125-
alloc := &fluxmemory.ResourceAllocator{}
144+
alloc := &fluxmemory.GcAllocator{ResourceAllocator: &fluxmemory.ResourceAllocator{}}
126145
// Need to use the Apache binary builder to be able to create an actual
127146
// Arrow Binary array.
128147
sb := apachearray.NewBinaryBuilder(alloc, array.StringType)
@@ -144,6 +163,9 @@ func TestNewStringFromBinaryArray(t *testing.T) {
144163

145164
a.Release()
146165
s.Release()
166+
167+
alloc.GC()
168+
147169
if want, got := int64(0), alloc.Allocated(); want != got {
148170
t.Errorf("epxected allocated to be %v, was %v", want, got)
149171
}

‎compiler/vectorized_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func TestVectorizedFns(t *testing.T) {
106106
for _, tc := range testCases {
107107
t.Run(tc.name, func(t *testing.T) {
108108
checked := arrow.NewCheckedAllocator(memory.DefaultAllocator)
109-
mem := &memory.ResourceAllocator{Allocator: checked}
109+
mem := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{Allocator: checked}}
110110

111111
pkg, err := runtime.AnalyzeSource(tc.fn)
112112
if err != nil {
@@ -140,13 +140,15 @@ func TestVectorizedFns(t *testing.T) {
140140
t.Fatalf("unexpected error: %s", err)
141141
}
142142

143-
want := vectorizedObjectFromMap(tc.want, &memory.ResourceAllocator{})
143+
want := vectorizedObjectFromMap(tc.want, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
144144
if !cmp.Equal(want, got, CmpOptions...) {
145145
t.Errorf("unexpected value -want/+got\n%s", cmp.Diff(want, got, CmpOptions...))
146146
}
147147

148148
got.Release()
149149
input.Release()
150+
151+
mem.GC()
150152
checked.AssertSize(t, 0)
151153
})
152154
}

‎execute/dataset_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,15 @@ func TestTransportDataset_Process(t *testing.T) {
5151
dataset.AddTransformation(transport)
5252

5353
mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
54-
defer mem.AssertSize(t, 0)
55-
alloc := &memory.ResourceAllocator{
54+
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
5655
Allocator: mem,
57-
}
56+
}}
57+
58+
defer func() {
59+
alloc.GC()
60+
mem.AssertSize(t, 0)
61+
}()
62+
5863
buffer := arrow.TableBuffer{
5964
GroupKey: execute.NewGroupKey(nil, nil),
6065
Columns: []flux.ColMeta{
@@ -96,9 +101,15 @@ func TestTransportDataset_AddTransformation(t *testing.T) {
96101

97102
mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
98103
defer mem.AssertSize(t, 0)
99-
alloc := &memory.ResourceAllocator{
104+
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
100105
Allocator: mem,
101-
}
106+
}}
107+
108+
defer func() {
109+
alloc.GC()
110+
mem.AssertSize(t, 0)
111+
}()
112+
102113
buffer := arrow.TableBuffer{
103114
GroupKey: execute.NewGroupKey(nil, nil),
104115
Columns: []flux.ColMeta{
@@ -195,9 +206,15 @@ func TestTransportDataset_MultipleDownstream(t *testing.T) {
195206

196207
mem := arrowmem.NewCheckedAllocator(memory.DefaultAllocator)
197208
defer mem.AssertSize(t, 0)
198-
alloc := &memory.ResourceAllocator{
209+
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{
199210
Allocator: mem,
200-
}
211+
}}
212+
213+
defer func() {
214+
alloc.GC()
215+
mem.AssertSize(t, 0)
216+
}()
217+
201218
buffer := arrow.TableBuffer{
202219
GroupKey: execute.NewGroupKey(nil, nil),
203220
Columns: []flux.ColMeta{

‎execute/executetest/table.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,7 @@ func (tt TableTest) run(t *testing.T, name string, f func(tt *tableTest)) {
666666
TableTest: tt,
667667
t: t,
668668
logger: zaptest.NewLogger(t),
669-
alloc: &memory.ResourceAllocator{},
669+
alloc: &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}},
670670
})
671671
})
672672
}
@@ -675,7 +675,7 @@ type tableTest struct {
675675
TableTest
676676
t *testing.T
677677
logger *zap.Logger
678-
alloc *memory.ResourceAllocator
678+
alloc *memory.GcAllocator
679679
}
680680

681681
func (tt *tableTest) do(f func(tbl flux.Table) error) {
@@ -710,6 +710,8 @@ func (tt *tableTest) finish(allocatorUsed bool) {
710710
}
711711
}
712712

713+
tt.alloc.GC()
714+
713715
// Verify that all memory is correctly released if we use the table properly.
714716
if got := tt.alloc.Allocated(); got != 0 {
715717
tt.t.Errorf("caught memory leak: %d bytes were not released", got)

‎execute/table_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ func TestColListTable(t *testing.T) {
229229

230230
func TestColListTable_AppendNil(t *testing.T) {
231231
key := execute.NewGroupKey(nil, nil)
232-
tb := execute.NewColListTableBuilder(key, &memory.ResourceAllocator{})
232+
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
233233

234234
// Add a column for the value.
235235
idx, _ := tb.AddCol(flux.ColMeta{
@@ -268,7 +268,7 @@ func TestColListTable_AppendNil(t *testing.T) {
268268

269269
func TestColListTable_SetNil(t *testing.T) {
270270
key := execute.NewGroupKey(nil, nil)
271-
tb := execute.NewColListTableBuilder(key, &memory.ResourceAllocator{})
271+
tb := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
272272

273273
// Add a column for the value.
274274
idx, _ := tb.AddCol(flux.ColMeta{
@@ -307,7 +307,7 @@ func TestColListTable_SetNil(t *testing.T) {
307307
}
308308

309309
func TestCopyTable(t *testing.T) {
310-
alloc := &memory.ResourceAllocator{}
310+
alloc := &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}}
311311

312312
input, err := gen.Input(context.Background(), gen.Schema{
313313
Tags: []gen.Tag{
@@ -363,6 +363,7 @@ func TestCopyTable(t *testing.T) {
363363
buf.Done()
364364
}
365365

366+
alloc.GC()
366367
// Ensure there has been no memory leak.
367368
if got, want := alloc.Allocated(), int64(0); got != want {
368369
t.Errorf("memory leak -want/+got:\n\t- %d\n\t+ %d", want, got)
@@ -449,7 +450,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
449450
name: "Strings",
450451
typ: flux.TString,
451452
values: func() array.Array {
452-
b := arrow.NewStringBuilder(&memory.ResourceAllocator{})
453+
b := arrow.NewStringBuilder(&memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
453454
b.Append("a")
454455
b.Append("d")
455456
b.AppendNull()
@@ -517,7 +518,7 @@ func TestColListTableBuilder_AppendValues(t *testing.T) {
517518
} {
518519
t.Run(tt.name, func(t *testing.T) {
519520
key := execute.NewGroupKey(nil, nil)
520-
b := execute.NewColListTableBuilder(key, &memory.ResourceAllocator{})
521+
b := execute.NewColListTableBuilder(key, &memory.GcAllocator{ResourceAllocator: &memory.ResourceAllocator{}})
521522
if _, err := b.AddCol(flux.ColMeta{Label: "_value", Type: tt.typ}); err != nil {
522523
t.Fatal(err)
523524
}

‎go.mod

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/influxdata/flux
22

3-
go 1.16
3+
go 1.17
44

55
require (
66
cloud.google.com/go v0.82.0
@@ -59,3 +59,69 @@ require (
5959
google.golang.org/grpc v1.44.0
6060
gopkg.in/yaml.v2 v2.3.0
6161
)
62+
63+
require (
64+
cloud.google.com/go/bigquery v1.8.0 // indirect
65+
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
66+
github.com/Azure/azure-storage-blob-go v0.14.0 // indirect
67+
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
68+
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
69+
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
70+
github.com/Azure/go-autorest/logger v0.2.1 // indirect
71+
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
72+
github.com/Masterminds/semver v1.4.2 // indirect
73+
github.com/apache/arrow/go/arrow v0.0.0-20211112161151-bc219186db40 // indirect
74+
github.com/aws/aws-sdk-go v1.29.16 // indirect
75+
github.com/aws/aws-sdk-go-v2 v1.11.0 // indirect
76+
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 // indirect
77+
github.com/aws/aws-sdk-go-v2/credentials v1.6.1 // indirect
78+
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.7.1 // indirect
79+
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.0 // indirect
80+
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.0 // indirect
81+
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 // indirect
82+
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.0 // indirect
83+
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
84+
github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 // indirect
85+
github.com/aws/smithy-go v1.9.0 // indirect
86+
github.com/cespare/xxhash v1.1.0 // indirect
87+
github.com/davecgh/go-spew v1.1.1 // indirect
88+
github.com/deepmap/oapi-codegen v1.6.0 // indirect
89+
github.com/dimchansky/utfbom v1.1.0 // indirect
90+
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
91+
github.com/gabriel-vasile/mimetype v1.4.0 // indirect
92+
github.com/goccy/go-json v0.7.10 // indirect
93+
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
94+
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
95+
github.com/golang/protobuf v1.5.2 // indirect
96+
github.com/google/uuid v1.3.0 // indirect
97+
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
98+
github.com/inconshreveable/mousetrap v1.0.0 // indirect
99+
github.com/jmespath/go-jmespath v0.4.0 // indirect
100+
github.com/jstemmer/go-junit-report v0.9.1 // indirect
101+
github.com/klauspost/compress v1.13.6 // indirect
102+
github.com/mattn/go-colorable v0.1.8 // indirect
103+
github.com/mattn/go-ieproxy v0.0.1 // indirect
104+
github.com/mattn/go-isatty v0.0.12 // indirect
105+
github.com/miekg/dns v1.1.22 // indirect
106+
github.com/mitchellh/go-homedir v1.1.0 // indirect
107+
github.com/pierrec/lz4/v4 v4.1.11 // indirect
108+
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
109+
github.com/pmezard/go-difflib v1.0.0 // indirect
110+
github.com/sirupsen/logrus v1.8.1 // indirect
111+
github.com/spf13/pflag v1.0.3 // indirect
112+
github.com/uber-go/tally v3.3.15+incompatible // indirect
113+
go.opencensus.io v0.23.0 // indirect
114+
go.uber.org/atomic v1.7.0 // indirect
115+
go.uber.org/multierr v1.6.0 // indirect
116+
golang.org/x/crypto v0.0.0-20211117183948-ae814b36b871 // indirect
117+
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
118+
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 // indirect
119+
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect
120+
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect
121+
golang.org/x/text v0.3.7 // indirect
122+
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
123+
google.golang.org/appengine v1.6.7 // indirect
124+
google.golang.org/genproto v0.0.0-20210630183607-d20f26d13c79 // indirect
125+
google.golang.org/protobuf v1.27.1 // indirect
126+
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
127+
)

‎internal/feature/flags.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

‎internal/feature/flags.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,9 @@
6969
key: optimizeStateTracking
7070
default: false
7171
contact: Sean Brickley
72+
73+
- name: SetFinalizer Memory Tracking
74+
description: Enable SetFinalizer based memory tracking (as opposed to explicit Retain/Release)
75+
key: setFinalizerMemoryTracking
76+
default: false
77+
contact: Markus Westerlind

‎lang/compiler.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -280,14 +280,27 @@ func (p *Program) Start(ctx context.Context, alloc memory.Allocator) (flux.Query
280280
// This span gets closed by the query when it is done.
281281
s, cctx := opentracing.StartSpanFromContext(ctx, "execute")
282282
results := make(chan flux.Result)
283+
284+
resourceAlloc, ok := alloc.(*memory.ResourceAllocator)
285+
if !ok {
286+
resourceAlloc = &memory.ResourceAllocator{
287+
Allocator: alloc,
288+
}
289+
}
290+
291+
var statsAlloc memory.StatsAllocator
292+
if feature.SetfinalizerMemoryTracking().Enabled(ctx) {
293+
statsAlloc = &memory.GcAllocator{ResourceAllocator: resourceAlloc}
294+
} else {
295+
statsAlloc = resourceAlloc
296+
}
297+
283298
q := &query{
284299
ctx: cctx,
285300
results: results,
286-
alloc: &memory.ResourceAllocator{
287-
Allocator: alloc,
288-
},
289-
span: s,
290-
cancel: cancel,
301+
alloc: statsAlloc,
302+
span: s,
303+
cancel: cancel,
291304
stats: flux.Statistics{
292305
Metadata: make(metadata.Metadata),
293306
},

‎lang/query.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ type query struct {
1515
ctx context.Context
1616
results chan flux.Result
1717
stats flux.Statistics
18-
alloc *memory.ResourceAllocator
18+
alloc memory.StatsAllocator
1919
span opentracing.Span
2020
cancel func()
2121
err error

0 commit comments

Comments
 (0)
Please sign in to comment.