Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
refactor: Pass the context to the executor compiler
  • Loading branch information
Markus Westerlind committed Sep 26, 2022
commit fa51c45dccbf0c140033e9a15039c5f9a048cd99
7 changes: 5 additions & 2 deletions compiler/compiler.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package compiler

import (
"context"

"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)

func Compile(scope Scope, f *semantic.FunctionExpression, in semantic.MonoType) (Func, error) {
func Compile(ctx context.Context, scope Scope, f *semantic.FunctionExpression, in semantic.MonoType) (Func, error) {
if scope == nil {
scope = NewScope()
}
Expand Down Expand Up @@ -60,7 +62,7 @@ func Compile(scope Scope, f *semantic.FunctionExpression, in semantic.MonoType)
}
}

compiler := &compiler{}
compiler := &compiler{ctx}
root, err := compiler.compile(f.Block, subst)
if err != nil {
return nil, errors.Wrapf(err, codes.Inherit, "cannot compile @ %v", f.Location())
Expand Down Expand Up @@ -401,6 +403,7 @@ func apply(sub semantic.Substitutor, props []semantic.PropertyType, t semantic.M
}

type compiler struct {
ctx context.Context
}

// compile recursively compiles semantic nodes into evaluators.
Expand Down
2 changes: 1 addition & 1 deletion compiler/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,7 +1301,7 @@ func (f *functionValue) Call(ctx context.Context, args values.Object) (values.Va
}
defer releaseScope(scope)

fn, err := Compile(scope, f.fn, args.Type())
fn, err := Compile(ctx, scope, f.fn, args.Type())
if err != nil {
return nil, err
}
Expand Down
28 changes: 14 additions & 14 deletions execute/row_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (f *dynamicFn) typeof(cols []flux.ColMeta, vectorized bool) (semantic.MonoT
return semantic.NewObjectType(properties), nil
}

func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) error {
func (f *dynamicFn) compileFunction(ctx context.Context, cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) error {

// If the types have not changed we do not need to recompile, just use the cached version
if f.compiledFn == nil || !f.compiledFn.isCacheHit(cols, extraTypes, vectorized) {
Expand All @@ -101,7 +101,7 @@ func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]s
}

inType := semantic.NewObjectType(properties)
fn, err := compiler.Compile(f.scope, f.fn, inType)
fn, err := compiler.Compile(ctx, f.scope, f.fn, inType)
if err != nil {
return err
}
Expand All @@ -117,8 +117,8 @@ func (f *dynamicFn) compileFunction(cols []flux.ColMeta, extraTypes map[string]s
return nil
}

func (f *dynamicFn) prepare(cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) (preparedFn, error) {
err := f.compileFunction(cols, extraTypes, vectorized)
func (f *dynamicFn) prepare(ctx context.Context, cols []flux.ColMeta, extraTypes map[string]semantic.MonoType, vectorized bool) (preparedFn, error) {
err := f.compileFunction(ctx, cols, extraTypes, vectorized)
if err != nil {
return preparedFn{}, err
}
Expand Down Expand Up @@ -213,8 +213,8 @@ func NewTablePredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope)
}
}

func (f *TablePredicateFn) Prepare(tbl flux.Table) (*TablePredicatePreparedFn, error) {
fn, err := f.prepare(tbl.Key().Cols(), nil, false)
func (f *TablePredicateFn) Prepare(ctx context.Context, tbl flux.Table) (*TablePredicatePreparedFn, error) {
fn, err := f.prepare(ctx, tbl.Key().Cols(), nil, false)
if err != nil {
return nil, err
} else if fn.returnType().Nature() != semantic.Bool {
Expand Down Expand Up @@ -260,8 +260,8 @@ func NewRowPredicateFn(fn *semantic.FunctionExpression, scope compiler.Scope) *R
return &RowPredicateFn{dynamicFn: r}
}

func (f *RowPredicateFn) Prepare(cols []flux.ColMeta) (*RowPredicatePreparedFn, error) {
fn, err := f.prepare(cols, nil, false)
func (f *RowPredicateFn) Prepare(ctx context.Context, cols []flux.ColMeta) (*RowPredicatePreparedFn, error) {
fn, err := f.prepare(ctx, cols, nil, false)
if err != nil {
return nil, err
} else if fn.returnType().Nature() != semantic.Bool {
Expand Down Expand Up @@ -317,8 +317,8 @@ func NewRowMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowMapF
}
}

func (f *RowMapFn) Prepare(cols []flux.ColMeta) (*RowMapPreparedFn, error) {
fn, err := f.prepare(cols, nil, false)
func (f *RowMapFn) Prepare(ctx context.Context, cols []flux.ColMeta) (*RowMapPreparedFn, error) {
fn, err := f.prepare(ctx, cols, nil, false)
if err != nil {
return nil, err
} else if k := fn.returnType().Nature(); k != semantic.Object {
Expand Down Expand Up @@ -355,8 +355,8 @@ func NewRowReduceFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowR
}
}

func (f *RowReduceFn) Prepare(cols []flux.ColMeta, reducerType map[string]semantic.MonoType) (*RowReducePreparedFn, error) {
fn, err := f.prepare(cols, reducerType, false)
func (f *RowReduceFn) Prepare(ctx context.Context, cols []flux.ColMeta, reducerType map[string]semantic.MonoType) (*RowReducePreparedFn, error) {
fn, err := f.prepare(ctx, cols, reducerType, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -391,8 +391,8 @@ func NewRowJoinFn(fn *semantic.FunctionExpression, scope compiler.Scope) *RowJoi
}
}

func (f *RowJoinFn) Prepare(cols []flux.ColMeta, rightType map[string]semantic.MonoType, vectorized bool) (*RowJoinPreparedFn, error) {
fn, err := f.prepare(cols, rightType, vectorized)
func (f *RowJoinFn) Prepare(ctx context.Context, cols []flux.ColMeta, rightType map[string]semantic.MonoType, vectorized bool) (*RowJoinPreparedFn, error) {
fn, err := f.prepare(ctx, cols, rightType, vectorized)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions execute/vector_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ func NewVectorMapFn(fn *semantic.FunctionExpression, scope compiler.Scope) *Vect
}
}

func (f *VectorMapFn) Prepare(cols []flux.ColMeta) (*VectorMapPreparedFn, error) {
fn, err := f.prepare(cols, nil, true)
func (f *VectorMapFn) Prepare(ctx context.Context, cols []flux.ColMeta) (*VectorMapPreparedFn, error) {
fn, err := f.prepare(ctx, cols, nil, true)
if err != nil {
return nil, err
} else if k := fn.returnType().Nature(); k != semantic.Object {
Expand Down
4 changes: 2 additions & 2 deletions stdlib/array/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func init() {
inputType := semantic.NewObjectType([]semantic.PropertyType{
{Key: []byte("x"), Value: elementType},
})
f, err := compiler.Compile(compiler.ToScope(fn.Scope), fn.Fn, inputType)
f, err := compiler.Compile(ctx, compiler.ToScope(fn.Scope), fn.Fn, inputType)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func init() {
inputType := semantic.NewObjectType([]semantic.PropertyType{
{Key: []byte("x"), Value: elementType},
})
f, err := compiler.Compile(compiler.ToScope(fn.Scope), fn.Fn, inputType)
f, err := compiler.Compile(ctx, compiler.ToScope(fn.Scope), fn.Fn, inputType)
if err != nil {
return nil, err
}
Expand Down
18 changes: 9 additions & 9 deletions stdlib/contrib/jsternberg/aggregate/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,13 @@ func (t *tableTransformation) prepare(cols []flux.ColMeta, n int) ([]*columnStat
arrayType := semantic.NewArrayType(flux.SemanticType(cols[j].Type))

cs := &columnState{Label: c.As, Column: j}
if err := cs.compileInitFunc(c, arrayType); err != nil {
if err := cs.compileInitFunc(t.ctx, c, arrayType); err != nil {
return nil, err
}
if err := cs.compileReduceFunc(c, arrayType); err != nil {
if err := cs.compileReduceFunc(t.ctx, c, arrayType); err != nil {
return nil, err
}
if err := cs.compileComputeFunc(c, n, t.mem); err != nil {
if err := cs.compileComputeFunc(t.ctx, c, n, t.mem); err != nil {
return nil, err
}
columns[i] = cs
Expand Down Expand Up @@ -420,40 +420,40 @@ func (cs *columnState) Write(ctx context.Context, state values.Value) error {
return arrow.AppendValue(cs.Builder, v)
}

func (cs *columnState) compileInitFunc(c TableColumn, arrayType semantic.MonoType) error {
func (cs *columnState) compileInitFunc(ctx context.Context, c TableColumn, arrayType semantic.MonoType) error {
cs.Init.Input = semantic.NewObjectType([]semantic.PropertyType{
{Key: []byte("values"), Value: arrayType},
})

scope := compiler.ToScope(c.Init.Scope)
fn, err := compiler.Compile(scope, c.Init.Fn, cs.Init.Input)
fn, err := compiler.Compile(ctx, scope, c.Init.Fn, cs.Init.Input)
if err != nil {
return errors.Wrap(err, codes.Inherit, "error compiling aggregate init function")
}
cs.Init.Fn = fn
return nil
}

func (cs *columnState) compileReduceFunc(c TableColumn, arrayType semantic.MonoType) error {
func (cs *columnState) compileReduceFunc(ctx context.Context, c TableColumn, arrayType semantic.MonoType) error {
cs.Reduce.Input = semantic.NewObjectType([]semantic.PropertyType{
{Key: []byte("values"), Value: arrayType},
{Key: []byte("state"), Value: cs.Init.Fn.Type()},
})
scope := compiler.ToScope(c.Reduce.Scope)
fn, err := compiler.Compile(scope, c.Reduce.Fn, cs.Reduce.Input)
fn, err := compiler.Compile(ctx, scope, c.Reduce.Fn, cs.Reduce.Input)
if err != nil {
return errors.Wrap(err, codes.Inherit, "error compiling aggregate reduce function")
}
cs.Reduce.Fn = fn
return nil
}

func (cs *columnState) compileComputeFunc(c TableColumn, n int, mem memory.Allocator) error {
func (cs *columnState) compileComputeFunc(ctx context.Context, c TableColumn, n int, mem memory.Allocator) error {
cs.Compute.Input = semantic.NewObjectType([]semantic.PropertyType{
{Key: []byte("state"), Value: cs.Reduce.Fn.Type()},
})
scope := compiler.ToScope(c.Compute.Scope)
fn, err := compiler.Compile(scope, c.Compute.Fn, cs.Compute.Input)
fn, err := compiler.Compile(ctx, scope, c.Compute.Fn, cs.Compute.Input)
if err != nil {
return errors.Wrap(err, codes.Inherit, "error compiling aggregate compute function")
}
Expand Down
6 changes: 3 additions & 3 deletions stdlib/experimental/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *mergeJoinCache) SetTriggerSpec(spec plan.TriggerSpec) {

func (c *mergeJoinCache) join(key flux.GroupKey, a, b *RowIterator) (flux.Table, error) {
// Compile row fn for the input rows
if err := c.fn.Prepare(a.columns, b.columns); err != nil {
if err := c.fn.Prepare(c.ctx, a.columns, b.columns); err != nil {
return nil, err
}

Expand Down Expand Up @@ -498,7 +498,7 @@ func newRowJoinFn(fn *semantic.FunctionExpression, scope compiler.Scope) *rowJoi
}
}

func (fn *rowJoinFn) Prepare(left, right []flux.ColMeta) error {
func (fn *rowJoinFn) Prepare(ctx context.Context, left, right []flux.ColMeta) error {
// Check the left and right types to make sure required properties are
// columns in their respective ColMeta.
fntype := fn.fn.TypeOf()
Expand Down Expand Up @@ -570,7 +570,7 @@ func (fn *rowJoinFn) Prepare(left, right []flux.ColMeta) error {
{Key: []byte("left"), Value: semantic.NewObjectType(l)},
{Key: []byte("right"), Value: semantic.NewObjectType(r)},
})
f, err := compiler.Compile(fn.scope, fn.fn, in)
f, err := compiler.Compile(ctx, fn.scope, fn.fn, in)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion stdlib/generate/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func createFromGeneratorSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID
s.Start = spec.Start
s.Stop = spec.Stop
s.Count = spec.Count
fn, err := compiler.Compile(compiler.ToScope(spec.Fn.Scope), spec.Fn.Fn, semantic.NewObjectType(
fn, err := compiler.Compile(a.Context(), compiler.ToScope(spec.Fn.Scope), spec.Fn.Fn, semantic.NewObjectType(
[]semantic.PropertyType{
{Key: []byte("n"), Value: semantic.BasicInt},
},
Expand Down
2 changes: 1 addition & 1 deletion stdlib/influxdata/influxdb/to.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (t *toTransformation) writeTable(chunk table.Chunk) (err error) {
var fn *execute.RowMapPreparedFn
if spec.FieldFn.Fn != nil {
var err error
if fn, err = t.fn.Prepare(columns); err != nil {
if fn, err = t.fn.Prepare(t.ctx, columns); err != nil {
return err
}
}
Expand Down
3 changes: 2 additions & 1 deletion stdlib/join/join_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func NewJoinFn(fn interpreter.ResolvedFunction) *JoinFn {
}
}

func (f *JoinFn) Prepare(lcols, rcols []flux.ColMeta) error {
func (f *JoinFn) Prepare(ctx context.Context, lcols, rcols []flux.ColMeta) error {
typ := f.Type()
args, err := typ.SortedArguments()
if err != nil {
Expand Down Expand Up @@ -61,6 +61,7 @@ func (f *JoinFn) Prepare(lcols, rcols []flux.ColMeta) error {
})
f.args = values.NewObject(in)
prepared, err := f.fn.Prepare(
ctx,
lcols,
map[string]semantic.MonoType{"r": *f.rtyp},
false,
Expand Down
2 changes: 1 addition & 1 deletion stdlib/join/merge_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func (s *joinState) join(
} else {
rschema = defaultRight
}
err := fn.Prepare(lschema, rschema)
err := fn.Prepare(ctx, lschema, rschema)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion stdlib/universe/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ type filterTransformation struct {
func (t *filterTransformation) Process(chunk table.Chunk, d *execute.TransportDataset, mem arrowmem.Allocator) error {
// Prepare the function for the column types.
cols := chunk.Cols()
fn, err := t.fn.Prepare(cols)
fn, err := t.fn.Prepare(t.ctx, cols)
if err != nil {
// TODO(nathanielc): Should we not fail the query for failed compilation?
return err
Expand Down
8 changes: 4 additions & 4 deletions stdlib/universe/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (m *mapTransformation) Process(

// Prepare the compiled function for the set of columns.
cols := chunk.Cols()
fn, err := m.fn.Prepare(cols)
fn, err := m.fn.Prepare(m.ctx, cols)
if err != nil {
return err
}
Expand Down Expand Up @@ -324,7 +324,7 @@ func (m *mapTransformation) Close() error {
}

type mapFunc interface {
Prepare(cols []flux.ColMeta) (mapPreparedFunc, error)
Prepare(ctx context.Context, cols []flux.ColMeta) (mapPreparedFunc, error)
}

type mapPreparedFunc interface {
Expand All @@ -335,8 +335,8 @@ type mapRowFunc struct {
fn *execute.RowMapFn
}

func (m *mapRowFunc) Prepare(cols []flux.ColMeta) (mapPreparedFunc, error) {
fn, err := m.fn.Prepare(cols)
func (m *mapRowFunc) Prepare(ctx context.Context, cols []flux.ColMeta) (mapPreparedFunc, error) {
fn, err := m.fn.Prepare(ctx, cols)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions stdlib/universe/map_vectorized.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ type mapVectorFunc struct {
fn *execute.VectorMapFn
}

func (m *mapVectorFunc) Prepare(cols []flux.ColMeta) (mapPreparedFunc, error) {
fn, err := m.fn.Prepare(cols)
func (m *mapVectorFunc) Prepare(ctx context.Context, cols []flux.ColMeta) (mapPreparedFunc, error) {
fn, err := m.fn.Prepare(ctx, cols)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion stdlib/universe/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewReduceTransformation(ctx context.Context, spec *ReduceProcedureSpec, d e
func (t *reduceTransformation) Process(id execute.DatasetID, tbl flux.Table) error {
// Prepare the function with the column types list.
cols := tbl.Cols()
fn, err := t.fn.Prepare(cols, map[string]semantic.MonoType{"accumulator": t.identity.Type()})
fn, err := t.fn.Prepare(t.ctx, cols, map[string]semantic.MonoType{"accumulator": t.identity.Type()})
if err != nil {
return err
}
Expand Down
16 changes: 8 additions & 8 deletions stdlib/universe/schema_functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,31 +333,31 @@ func (s *DuplicateOpSpec) Copy() SchemaMutation {
}
}

func (s *RenameOpSpec) Mutator() (SchemaMutator, error) {
m, err := NewRenameMutator(s)
func (s *RenameOpSpec) Mutator(ctx context.Context) (SchemaMutator, error) {
m, err := NewRenameMutator(ctx, s)
if err != nil {
return nil, err
}
return m, nil
}

func (s *DropOpSpec) Mutator() (SchemaMutator, error) {
m, err := NewDropMutator(s)
func (s *DropOpSpec) Mutator(ctx context.Context) (SchemaMutator, error) {
m, err := NewDropMutator(ctx, s)
if err != nil {
return nil, err
}
return m, nil
}

func (s *KeepOpSpec) Mutator() (SchemaMutator, error) {
m, err := NewKeepMutator(s)
func (s *KeepOpSpec) Mutator(ctx context.Context) (SchemaMutator, error) {
m, err := NewKeepMutator(ctx, s)
if err != nil {
return nil, err
}
return m, nil
}

func (s *DuplicateOpSpec) Mutator() (SchemaMutator, error) {
func (s *DuplicateOpSpec) Mutator(ctx context.Context) (SchemaMutator, error) {
m, err := NewDuplicateMutator(s)
if err != nil {
return nil, err
Expand Down Expand Up @@ -415,7 +415,7 @@ func createSchemaMutationTransformation(id execute.DatasetID, mode execute.Accum
func NewSchemaMutationTransformation(ctx context.Context, spec *SchemaMutationProcedureSpec, id execute.DatasetID, mem memory.Allocator) (execute.Transformation, execute.Dataset, error) {
mutators := make([]SchemaMutator, len(spec.Mutations))
for i, mutation := range spec.Mutations {
m, err := mutation.Mutator()
m, err := mutation.Mutator(ctx)
if err != nil {
return nil, nil, err
}
Expand Down
Loading