Skip to content

Commit b80b457

Browse files
author
jlapacik
committedOct 26, 2018
add the notion of bounds to plan nodes (#142)
1 parent e2fe43f commit b80b457

File tree

13 files changed

+739
-28
lines changed

13 files changed

+739
-28
lines changed
 

‎bounds.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@ import "time"
55
type Bounds struct {
66
Start Time
77
Stop Time
8+
Now time.Time
89
}
910

1011
// IsEmpty reports whether the given bounds
1112
// are empty, i.e., if start >= stop.
12-
func (b Bounds) IsEmpty(now time.Time) bool {
13-
return b.Start.Time(now).Equal(b.Stop.Time(now)) || b.Start.Time(now).After(b.Stop.Time(now))
13+
func (b Bounds) IsEmpty() bool {
14+
return b.Start.Time(b.Now).Equal(b.Stop.Time(b.Now)) || b.Start.Time(b.Now).After(b.Stop.Time(b.Now))
1415
}
1516

1617
// HasZero returns true if the given bounds contain a Go zero time value as either Start or Stop.

‎bounds_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,53 +73,53 @@ func TestBounds_IsEmpty(t *testing.T) {
7373
}{
7474
{
7575
name: "empty bounds / start == stop",
76-
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
7776
bounds: flux.Bounds{
7877
Start: flux.Now,
7978
Stop: flux.Now,
79+
Now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
8080
},
8181
want: true,
8282
},
8383
{
8484
name: "empty bounds / absolute now == relative now",
85-
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
8685
bounds: flux.Bounds{
8786
Start: flux.Now,
8887
Stop: flux.Time{
8988
Absolute: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
9089
},
90+
Now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
9191
},
9292
want: true,
9393
},
9494
{
9595
name: "start > stop",
96-
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
9796
bounds: flux.Bounds{
9897
Start: flux.Time{
9998
IsRelative: true,
10099
Relative: time.Hour,
101100
},
102101
Stop: flux.Now,
102+
Now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
103103
},
104104
want: true,
105105
},
106106
{
107107
name: "start < stop",
108-
now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
109108
bounds: flux.Bounds{
110109
Start: flux.Time{
111110
IsRelative: true,
112111
Relative: -1 * time.Hour,
113112
},
114113
Stop: flux.Now,
114+
Now: time.Date(2018, time.August, 14, 11, 0, 0, 0, time.UTC),
115115
},
116116
want: false,
117117
},
118118
}
119119

120120
for _, tt := range tests {
121121
t.Run(tt.name, func(t *testing.T) {
122-
got := tt.bounds.IsEmpty(tt.now)
122+
got := tt.bounds.IsEmpty()
123123
if got != tt.want {
124124
t.Errorf("unexpected result for bounds.IsEmpty(): got %t, want %t", got, tt.want)
125125
}

‎execute/executor.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,11 +138,21 @@ func (v *createExecutionNodeVisitor) Visit(node plan.PlanNode) error {
138138
kind := spec.Kind()
139139
id := plan.ProcedureIDFromNodeID(node.ID())
140140

141+
// Add explicit stream context if bounds are set on this node
142+
var streamContext streamContext
143+
if node.Bounds() != nil {
144+
streamContext.bounds = &Bounds{
145+
Start: node.Bounds().Start,
146+
Stop: node.Bounds().Stop,
147+
}
148+
}
149+
141150
// Build execution context
142151
ec := executionContext{
143-
ctx: v.ctx,
144-
es: v.es,
145-
parents: make([]DatasetID, len(node.Predecessors())),
152+
ctx: v.ctx,
153+
es: v.es,
154+
parents: make([]DatasetID, len(node.Predecessors())),
155+
streamContext: streamContext,
146156
}
147157

148158
for i, pred := range node.Predecessors() {

‎functions/transformations/range.go

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
plan "github.com/influxdata/flux/planner"
99
"github.com/influxdata/flux/semantic"
1010
"github.com/influxdata/flux/values"
11+
"github.com/pkg/errors"
1112
)
1213

1314
const RangeKind = "range"
@@ -100,6 +101,18 @@ type RangeProcedureSpec struct {
100101
StopCol string
101102
}
102103

104+
// TimeBounds implements plan.BoundsAwareProcedureSpec
105+
func (s *RangeProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
106+
bounds := &plan.Bounds{
107+
Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)),
108+
Stop: values.ConvertTime(s.Bounds.Stop.Time(s.Bounds.Now)),
109+
}
110+
if predecessorBounds != nil {
111+
bounds = bounds.Intersect(predecessorBounds)
112+
}
113+
return bounds
114+
}
115+
103116
func newRangeProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
104117
spec, ok := qs.(*RangeOpSpec)
105118

@@ -111,11 +124,21 @@ func newRangeProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.Proc
111124
spec.TimeCol = execute.DefaultTimeColLabel
112125
}
113126

127+
bounds := flux.Bounds{
128+
Start: spec.Start,
129+
Stop: spec.Stop,
130+
Now: pa.Now(),
131+
}
132+
133+
if bounds.HasZero() {
134+
return nil, errors.New(`cannot pass zero time to 'range'`)
135+
}
136+
if bounds.IsEmpty() {
137+
return nil, errors.New("cannot query an empty range")
138+
}
139+
114140
return &RangeProcedureSpec{
115-
Bounds: flux.Bounds{
116-
Start: spec.Start,
117-
Stop: spec.Stop,
118-
},
141+
Bounds: bounds,
119142
TimeCol: spec.TimeCol,
120143
StartCol: spec.StartCol,
121144
StopCol: spec.StopCol,
@@ -142,12 +165,9 @@ func createRangeTransformation(id execute.DatasetID, mode execute.AccumulationMo
142165
cache := execute.NewTableBuilderCache(a.Allocator())
143166
d := execute.NewDataset(id, mode, cache)
144167

145-
bounds := execute.Bounds{
146-
Start: a.ResolveTime(s.Bounds.Start),
147-
Stop: a.ResolveTime(s.Bounds.Stop),
148-
}
168+
bounds := a.StreamContext().Bounds()
149169

150-
t, err := NewRangeTransformation(d, cache, s, bounds)
170+
t, err := NewRangeTransformation(d, cache, s, *bounds)
151171
if err != nil {
152172
return nil, nil, err
153173
}

‎functions/transformations/shift.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package transformations
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/influxdata/flux"
78
"github.com/influxdata/flux/execute"
@@ -73,9 +74,18 @@ type ShiftProcedureSpec struct {
7374
plan.DefaultCost
7475
Shift flux.Duration
7576
Columns []string
77+
Now time.Time
7678
}
7779

78-
func newShiftProcedure(qs flux.OperationSpec, _ plan.Administration) (plan.ProcedureSpec, error) {
80+
// TimeBounds implements plan.BoundsAwareProcedureSpec
81+
func (s *ShiftProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
82+
if predecessorBounds != nil {
83+
return predecessorBounds.Shift(values.Duration(s.Shift))
84+
}
85+
return nil
86+
}
87+
88+
func newShiftProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
7989
spec, ok := qs.(*ShiftOpSpec)
8090
if !ok {
8191
return nil, fmt.Errorf("invalid spec type %T", qs)
@@ -84,6 +94,7 @@ func newShiftProcedure(qs flux.OperationSpec, _ plan.Administration) (plan.Proce
8494
return &ShiftProcedureSpec{
8595
Shift: spec.Shift,
8696
Columns: spec.Columns,
97+
Now: pa.Now(),
8798
}, nil
8899
}
89100

‎planner/bounds.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package planner
2+
3+
import "github.com/influxdata/flux/values"
4+
5+
// EmptyBounds is a time range containing only a single point
6+
var EmptyBounds = &Bounds{
7+
Start: values.Time(0),
8+
Stop: values.Time(0),
9+
}
10+
11+
// Bounds is a range of time
12+
type Bounds struct {
13+
Start values.Time
14+
Stop values.Time
15+
}
16+
17+
// BoundsAwareProcedureSpec is any procedure
18+
// that modifies the time bounds of its data.
19+
type BoundsAwareProcedureSpec interface {
20+
TimeBounds(predecessorBounds *Bounds) *Bounds
21+
}
22+
23+
// ComputeBounds computes the time bounds for a
24+
// plan node from the bounds of its predecessors.
25+
func ComputeBounds(node PlanNode) error {
26+
var bounds *Bounds
27+
28+
for _, pred := range node.Predecessors() {
29+
30+
if pred.Bounds() != nil && bounds == nil {
31+
bounds = pred.Bounds()
32+
}
33+
if pred.Bounds() != nil && bounds != nil {
34+
bounds = bounds.Union(pred.Bounds())
35+
}
36+
}
37+
38+
if s, ok := node.ProcedureSpec().(BoundsAwareProcedureSpec); ok {
39+
bounds = s.TimeBounds(bounds)
40+
}
41+
node.SetBounds(bounds)
42+
return nil
43+
}
44+
45+
// IsEmpty reports whether the given bounds contain at most a single point
46+
func (b *Bounds) IsEmpty() bool {
47+
return b.Start >= b.Stop
48+
}
49+
50+
// Contains reports whether a given time is contained within the time range
51+
func (b *Bounds) Contains(t values.Time) bool {
52+
return t >= b.Start && t < b.Stop
53+
}
54+
55+
// Overlaps reports whether two given bounds have overlapping time ranges
56+
func (b *Bounds) Overlaps(o *Bounds) bool {
57+
return b.Contains(o.Start) ||
58+
(b.Contains(o.Stop) && o.Stop > b.Start) ||
59+
o.Contains(b.Start)
60+
}
61+
62+
// Union returns the smallest bounds which contain both input bounds.
63+
// It returns empty bounds if one of the input bounds are empty.
64+
func (b *Bounds) Union(o *Bounds) *Bounds {
65+
if b.IsEmpty() || o.IsEmpty() {
66+
return EmptyBounds
67+
}
68+
u := new(Bounds)
69+
70+
u.Start = b.Start
71+
if o.Start < b.Start {
72+
u.Start = o.Start
73+
}
74+
75+
u.Stop = b.Stop
76+
if o.Stop > b.Stop {
77+
u.Stop = o.Stop
78+
}
79+
80+
return u
81+
}
82+
83+
// Intersect returns the intersection of two bounds.
84+
// It returns empty bounds if one of the input bounds are empty.
85+
func (b *Bounds) Intersect(o *Bounds) *Bounds {
86+
if b.IsEmpty() || o.IsEmpty() || !b.Overlaps(o) {
87+
return EmptyBounds
88+
}
89+
i := new(Bounds)
90+
91+
i.Start = b.Start
92+
if o.Start > b.Start {
93+
i.Start = o.Start
94+
}
95+
96+
i.Stop = b.Stop
97+
if o.Stop < b.Stop {
98+
i.Stop = o.Stop
99+
}
100+
101+
return i
102+
}
103+
104+
// Shift moves the start and stop values of a time range by a specified duration
105+
func (b *Bounds) Shift(d values.Duration) *Bounds {
106+
return &Bounds{
107+
Start: b.Start.Add(d),
108+
Stop: b.Stop.Add(d),
109+
}
110+
}

‎planner/bounds_test.go

Lines changed: 509 additions & 0 deletions
Large diffs are not rendered by default.

‎planner/logical.go

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package planner
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/influxdata/flux"
78
)
@@ -67,7 +68,7 @@ func WithLogicalRule(rule Rule) LogicalOption {
6768

6869
// Plan translates the given flux.Spec to a plan and transforms it by applying rules.
6970
func (l *logicalPlanner) Plan(spec *flux.Spec) (*PlanSpec, error) {
70-
logicalPlan, err := createLogicalPlan(spec, l)
71+
logicalPlan, err := createLogicalPlan(spec)
7172
if err != nil {
7273
return nil, err
7374
}
@@ -80,14 +81,24 @@ func (l *logicalPlanner) Plan(spec *flux.Spec) (*PlanSpec, error) {
8081
return newLogicalPlan, nil
8182
}
8283

83-
func (logicalPlanner) ConvertID(oid flux.OperationID) ProcedureID {
84+
// TODO: This is unnecessary if we no longer need ProcedureIDs
85+
type administration struct {
86+
now time.Time
87+
}
88+
89+
func (a administration) Now() time.Time {
90+
return a.now
91+
}
92+
93+
func (a administration) ConvertID(oid flux.OperationID) ProcedureID {
8494
return ProcedureIDFromOperationID(oid)
8595
}
8696

8797
// LogicalPlanNode consists of the input and output edges and a procedure spec
8898
// that describes what the node does.
8999
type LogicalPlanNode struct {
90100
edges
101+
bounds
91102
id NodeID
92103
Spec ProcedureSpec
93104
}
@@ -116,15 +127,16 @@ func (lpn *LogicalPlanNode) ShallowCopy() PlanNode {
116127
}
117128

118129
// createLogicalPlan creates a logical query plan from a flux spec
119-
func createLogicalPlan(spec *flux.Spec, a Administration) (*PlanSpec, error) {
130+
func createLogicalPlan(spec *flux.Spec) (*PlanSpec, error) {
120131
nodes := make(map[flux.OperationID]PlanNode, len(spec.Operations))
132+
admin := administration{now: spec.Now}
121133

122134
plan := NewPlanSpec()
123135
plan.Resources = spec.Resources
124136
plan.Now = spec.Now
125137

126138
v := &fluxSpecVisitor{
127-
a: a,
139+
a: admin,
128140
spec: spec,
129141
plan: plan,
130142
nodes: nodes,

‎planner/logical_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@ import (
1414
"github.com/influxdata/flux/semantic"
1515
)
1616

17-
func compile(fluxText string) (*flux.Spec, error) {
18-
now := time.Now().UTC()
17+
func compile(fluxText string, now time.Time) (*flux.Spec, error) {
1918
return flux.Compile(context.Background(), fluxText, now)
2019
}
2120

2221
// Test the translation of Flux query to logical plan
2322
func TestFluxSpecToLogicalPlan(t *testing.T) {
23+
now := time.Now().UTC()
2424
testcases := []struct {
2525
// Name of the test
2626
name string
@@ -48,6 +48,7 @@ func TestFluxSpecToLogicalPlan(t *testing.T) {
4848
Stop: flux.Time{
4949
IsRelative: true,
5050
},
51+
Now: now,
5152
},
5253
TimeCol: "_time",
5354
StartCol: "_start",
@@ -76,6 +77,7 @@ func TestFluxSpecToLogicalPlan(t *testing.T) {
7677
Stop: flux.Time{
7778
IsRelative: true,
7879
},
80+
Now: now,
7981
},
8082
TimeCol: "_time",
8183
StartCol: "_start",
@@ -105,7 +107,7 @@ func TestFluxSpecToLogicalPlan(t *testing.T) {
105107
t.Run(tc.name, func(t *testing.T) {
106108
t.Parallel()
107109

108-
spec, err := compile(tc.query)
110+
spec, err := compile(tc.query, now)
109111

110112
if err != nil {
111113
t.Fatal(err)
@@ -308,7 +310,7 @@ func TestLogicalPlanner(t *testing.T) {
308310
t.Run(tc.name, func(t *testing.T) {
309311
t.Parallel()
310312

311-
fluxSpec, err := compile(tc.flux)
313+
fluxSpec, err := compile(tc.flux, time.Now().UTC())
312314
if err != nil {
313315
t.Fatalf("could not compile flux query: %v", err)
314316
}

‎planner/physical.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ func (pp *physicalPlanner) Plan(spec *PlanSpec) (*PlanSpec, error) {
5151
return nil, err
5252
}
5353

54+
// Compute time bounds for nodes in the plan
55+
if err := final.BottomUpWalk(ComputeBounds); err != nil {
56+
return nil, err
57+
}
58+
5459
// Update memory quota
5560
if final.Resources.MemoryBytesQuota == 0 {
5661
final.Resources.MemoryBytesQuota = pp.defaultMemoryLimit
@@ -132,6 +137,7 @@ type PhysicalProcedureSpec interface {
132137
// PhysicalPlanNode represents a physical operation in a plan.
133138
type PhysicalPlanNode struct {
134139
edges
140+
bounds
135141
id NodeID
136142
Spec PhysicalProcedureSpec
137143

‎planner/plantest/cmp.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plantest
22

33
import (
44
"fmt"
5+
56
"github.com/google/go-cmp/cmp"
67
"github.com/influxdata/flux/planner"
78
"github.com/influxdata/flux/semantic/semantictest"
@@ -91,6 +92,12 @@ func cmpPlanNode(p, q planner.PlanNode) error {
9192
return fmt.Errorf("wanted %s, but got %s", p.Kind(), q.Kind())
9293
}
9394

95+
// Both nodes must have the same time bounds
96+
if !cmp.Equal(p.Bounds(), q.Bounds()) {
97+
return fmt.Errorf("plan nodes have different bounds -want(%s)/+got(%s) %s",
98+
p.ID(), q.ID(), cmp.Diff(p.Bounds(), q.Bounds()))
99+
}
100+
94101
// The specifications of both procedures must be the same
95102
if !cmp.Equal(p.ProcedureSpec(), q.ProcedureSpec(), semantictest.CmpOptions...) {
96103
return fmt.Errorf("procedure specs not equal -want(%s)/+got(%s) %s",

‎planner/registration.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package planner
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/influxdata/flux"
78
uuid "github.com/satori/go.uuid"
@@ -18,6 +19,7 @@ func (id ProcedureID) String() string {
1819

1920
type Administration interface {
2021
ConvertID(flux.OperationID) ProcedureID
22+
Now() time.Time
2123
}
2224

2325
func ProcedureIDFromOperationID(id flux.OperationID) ProcedureID {
@@ -28,6 +30,11 @@ func ProcedureIDFromNodeID(id NodeID) ProcedureID {
2830
return ProcedureID(uuid.NewV5(RootUUID, string(id)))
2931
}
3032

33+
// TODO: Is it necessary to pass in an Administration?
34+
// Currently Administration only converts IDs and provides
35+
// access to the now time. If it is determined that there
36+
// is no need for ProcedureIDs then we could probably just
37+
// pass in the now time directly.
3138
type CreateProcedureSpec func(flux.OperationSpec, Administration) (ProcedureSpec, error)
3239

3340
var kindToProcedure = make(map[ProcedureKind]CreateProcedureSpec)

‎planner/types.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ type PlanNode interface {
1515
// Returns an identifier for this plan node
1616
ID() NodeID
1717

18+
// Returns the time bounds for this plan node
19+
Bounds() *Bounds
20+
1821
// Plan nodes executed immediately before this node
1922
Predecessors() []PlanNode
2023

@@ -29,6 +32,7 @@ type PlanNode interface {
2932

3033
// Helper methods for manipulating a plan
3134
// These methods are used during planning
35+
SetBounds(bounds *Bounds)
3236
AddSuccessors(...PlanNode)
3337
AddPredecessors(...PlanNode)
3438
RemovePredecessor(PlanNode)
@@ -157,6 +161,18 @@ type ProcedureSpec interface {
157161
// ProcedureKind denotes the kind of operation
158162
type ProcedureKind string
159163

164+
type bounds struct {
165+
value *Bounds
166+
}
167+
168+
func (b *bounds) SetBounds(bounds *Bounds) {
169+
b.value = bounds
170+
}
171+
172+
func (b *bounds) Bounds() *Bounds {
173+
return b.value
174+
}
175+
160176
type edges struct {
161177
predecessors []PlanNode
162178
successors []PlanNode

0 commit comments

Comments
 (0)
Please sign in to comment.