Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 91cc5a4

Browse files
author
jlapacik
committedOct 18, 2018
changes from review and test cases
1 parent a4bb895 commit 91cc5a4

File tree

8 files changed

+324
-27
lines changed

8 files changed

+324
-27
lines changed
 

‎functions/transformations/range.go

+22-8
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
plan "github.com/influxdata/flux/planner"
99
"github.com/influxdata/flux/semantic"
1010
"github.com/influxdata/flux/values"
11+
"github.com/pkg/errors"
1112
)
1213

1314
const RangeKind = "range"
@@ -100,12 +101,16 @@ type RangeProcedureSpec struct {
100101
StopCol string
101102
}
102103

103-
// TimeBounds implements plan.BoundedProcedureSpec
104-
func (s *RangeProcedureSpec) TimeBounds() *plan.Bounds {
105-
return &plan.Bounds{
104+
// TimeBounds implements plan.BoundsAwareProcedureSpec
105+
func (s *RangeProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
106+
bounds := &plan.Bounds{
106107
Start: values.ConvertTime(s.Bounds.Start.Time(s.Bounds.Now)),
107108
Stop: values.ConvertTime(s.Bounds.Stop.Time(s.Bounds.Now)),
108109
}
110+
if predecessorBounds != nil {
111+
bounds = bounds.Intersect(predecessorBounds)
112+
}
113+
return bounds
109114
}
110115

111116
func newRangeProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
@@ -119,12 +124,21 @@ func newRangeProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.Proc
119124
spec.TimeCol = execute.DefaultTimeColLabel
120125
}
121126

127+
bounds := flux.Bounds{
128+
Start: spec.Start,
129+
Stop: spec.Stop,
130+
Now: pa.Now(),
131+
}
132+
133+
if bounds.HasZero() {
134+
return nil, errors.New(`cannot pass zero time to 'range'`)
135+
}
136+
if bounds.IsEmpty() {
137+
return nil, errors.New("cannot query an empty range")
138+
}
139+
122140
return &RangeProcedureSpec{
123-
Bounds: flux.Bounds{
124-
Start: spec.Start,
125-
Stop: spec.Stop,
126-
Now: pa.Now(),
127-
},
141+
Bounds: bounds,
128142
TimeCol: spec.TimeCol,
129143
StartCol: spec.StartCol,
130144
StopCol: spec.StopCol,

‎functions/transformations/shift.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package transformations
22

33
import (
44
"fmt"
5+
"time"
56

67
"github.com/influxdata/flux"
78
"github.com/influxdata/flux/execute"
@@ -73,9 +74,18 @@ type ShiftProcedureSpec struct {
7374
plan.DefaultCost
7475
Shift flux.Duration
7576
Columns []string
77+
Now time.Time
7678
}
7779

78-
func newShiftProcedure(qs flux.OperationSpec, _ plan.Administration) (plan.ProcedureSpec, error) {
80+
// TimeBounds implements plan.BoundsAwareProcedureSpec
81+
func (s *ShiftProcedureSpec) TimeBounds(predecessorBounds *plan.Bounds) *plan.Bounds {
82+
if predecessorBounds != nil {
83+
return predecessorBounds.Shift(values.Duration(s.Shift))
84+
}
85+
return nil
86+
}
87+
88+
func newShiftProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
7989
spec, ok := qs.(*ShiftOpSpec)
8090
if !ok {
8191
return nil, fmt.Errorf("invalid spec type %T", qs)
@@ -84,6 +94,7 @@ func newShiftProcedure(qs flux.OperationSpec, _ plan.Administration) (plan.Proce
8494
return &ShiftProcedureSpec{
8595
Shift: spec.Shift,
8696
Columns: spec.Columns,
97+
Now: pa.Now(),
8798
}, nil
8899
}
89100

‎planner/bounds.go

+15-12
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ type Bounds struct {
1414
Stop values.Time
1515
}
1616

17-
// BoundedProcedureSpec represents any procedure that
18-
// introduces time bounds for the data it produces.
19-
type BoundedProcedureSpec interface {
20-
TimeBounds() *Bounds
17+
// BoundsAwareProcedureSpec is any procedure
18+
// that modifies the time bounds of its data.
19+
type BoundsAwareProcedureSpec interface {
20+
TimeBounds(predecessorBounds *Bounds) *Bounds
2121
}
2222

2323
// ComputeBounds computes the time bounds for a
@@ -35,15 +35,10 @@ func ComputeBounds(node PlanNode) error {
3535
}
3636
}
3737

38-
s, bdd := node.ProcedureSpec().(BoundedProcedureSpec)
39-
40-
if bdd && bounds != nil {
41-
bounds = bounds.Intersect(s.TimeBounds())
42-
}
43-
if bdd && bounds == nil {
44-
bounds = s.TimeBounds()
38+
if s, ok := node.ProcedureSpec().(BoundsAwareProcedureSpec); ok {
39+
bounds = s.TimeBounds(bounds)
4540
}
46-
node.AddBounds(bounds)
41+
node.SetBounds(bounds)
4742
return nil
4843
}
4944

@@ -105,3 +100,11 @@ func (b *Bounds) Intersect(o *Bounds) *Bounds {
105100

106101
return i
107102
}
103+
104+
// Shift moves the start and stop values of a time range by a specified duration
105+
func (b *Bounds) Shift(d values.Duration) *Bounds {
106+
return &Bounds{
107+
Start: b.Start.Add(d),
108+
Stop: b.Stop.Add(d),
109+
}
110+
}

‎planner/bounds_test.go

+261
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/google/go-cmp/cmp"
88

99
"github.com/influxdata/flux/planner"
10+
"github.com/influxdata/flux/planner/plantest"
1011
"github.com/influxdata/flux/values"
1112
)
1213

@@ -246,3 +247,263 @@ func TestBounds_IsEmpty(t *testing.T) {
246247
})
247248
}
248249
}
250+
251+
// A BoundsAwareProcedureSpec that intersects its bounds with its predecessors' bounds
252+
type mockBoundsIntersectProcedureSpec struct {
253+
planner.DefaultCost
254+
bounds *planner.Bounds
255+
}
256+
257+
func (m *mockBoundsIntersectProcedureSpec) Kind() planner.ProcedureKind {
258+
return "mock-intersect-bounds"
259+
}
260+
261+
func (m *mockBoundsIntersectProcedureSpec) Copy() planner.ProcedureSpec {
262+
return &mockBoundsIntersectProcedureSpec{}
263+
}
264+
265+
func (m *mockBoundsIntersectProcedureSpec) TimeBounds(predecessorBounds *planner.Bounds) *planner.Bounds {
266+
if predecessorBounds != nil {
267+
return predecessorBounds.Intersect(m.bounds)
268+
}
269+
return m.bounds
270+
}
271+
272+
// A BoundsAwareProcedureSpec that shifts its predecessors' bounds
273+
type mockBoundsShiftProcedureSpec struct {
274+
planner.DefaultCost
275+
by values.Duration
276+
}
277+
278+
func (m *mockBoundsShiftProcedureSpec) Kind() planner.ProcedureKind {
279+
return "mock-shift-bounds"
280+
}
281+
282+
func (m *mockBoundsShiftProcedureSpec) Copy() planner.ProcedureSpec {
283+
return &mockBoundsShiftProcedureSpec{}
284+
}
285+
286+
func (m *mockBoundsShiftProcedureSpec) TimeBounds(predecessorBounds *planner.Bounds) *planner.Bounds {
287+
if predecessorBounds != nil {
288+
return predecessorBounds.Shift(m.by)
289+
}
290+
return nil
291+
}
292+
293+
// Create a PlanNode with id and mockBoundsIntersectProcedureSpec
294+
func makeBoundsNode(id string, bounds *planner.Bounds) planner.PlanNode {
295+
return planner.CreatePhysicalNode(planner.NodeID(id),
296+
&mockBoundsIntersectProcedureSpec{
297+
bounds: bounds,
298+
})
299+
}
300+
301+
// Create a PlanNode with id and mockBoundsShiftProcedureSpec
302+
func makeShiftNode(id string, duration values.Duration) planner.PlanNode {
303+
return planner.CreateLogicalNode(planner.NodeID(id),
304+
&mockBoundsShiftProcedureSpec{
305+
by: duration,
306+
})
307+
}
308+
309+
func bounds(start, stop int) *planner.Bounds {
310+
return &planner.Bounds{
311+
Start: values.Time(start),
312+
Stop: values.Time(stop),
313+
}
314+
}
315+
316+
// Test that bounds are propagated up through the plan correctly
317+
func TestBounds_ComputePlanBounds(t *testing.T) {
318+
tests := []struct {
319+
// Name of test
320+
name string
321+
// Nodes and edges defining plan
322+
spec *plantest.PhysicalPlanSpec
323+
// Map from node ID to the expected bounds for that node
324+
want map[planner.NodeID]*planner.Bounds
325+
}{
326+
{
327+
name: "no bounds",
328+
spec: &plantest.PhysicalPlanSpec{
329+
Nodes: []planner.PlanNode{
330+
makeNode("0"),
331+
},
332+
},
333+
want: map[planner.NodeID]*planner.Bounds{
334+
"0": nil,
335+
},
336+
},
337+
{
338+
name: "single time bounds",
339+
// 0 -> 1 -> 2 -> 3 -> 4
340+
spec: &plantest.PhysicalPlanSpec{
341+
Nodes: []planner.PlanNode{
342+
makeNode("0"),
343+
makeNode("1"),
344+
makeBoundsNode("2", bounds(5, 10)),
345+
makeNode("3"),
346+
makeNode("4"),
347+
},
348+
Edges: [][2]int{
349+
{0, 1},
350+
{1, 2},
351+
{2, 3},
352+
{3, 4},
353+
},
354+
},
355+
want: map[planner.NodeID]*planner.Bounds{
356+
"0": nil,
357+
"1": nil,
358+
"2": bounds(5, 10),
359+
"3": bounds(5, 10),
360+
"4": bounds(5, 10)},
361+
},
362+
{
363+
name: "multiple intersect time bounds",
364+
// 0 -> 1 -> 2 -> 3 -> 4
365+
spec: &plantest.PhysicalPlanSpec{
366+
Nodes: []planner.PlanNode{
367+
makeNode("0"),
368+
makeBoundsNode("1", bounds(5, 10)),
369+
makeNode("2"),
370+
makeBoundsNode("3", bounds(7, 11)),
371+
makeNode("4"),
372+
},
373+
Edges: [][2]int{
374+
{0, 1},
375+
{1, 2},
376+
{2, 3},
377+
{3, 4},
378+
},
379+
},
380+
want: map[planner.NodeID]*planner.Bounds{
381+
"0": nil,
382+
"1": bounds(5, 10),
383+
"2": bounds(5, 10),
384+
"3": bounds(7, 10),
385+
"4": bounds(7, 10)},
386+
},
387+
{
388+
name: "shift nil time bounds",
389+
// 0 -> 1 -> 2
390+
spec: &plantest.PhysicalPlanSpec{
391+
Nodes: []planner.PlanNode{
392+
makeNode("0"),
393+
makeShiftNode("1", values.Duration(5)),
394+
makeNode("2"),
395+
},
396+
Edges: [][2]int{
397+
{0, 1},
398+
{1, 2},
399+
},
400+
},
401+
want: map[planner.NodeID]*planner.Bounds{
402+
"0": nil,
403+
"1": nil,
404+
"2": nil,
405+
},
406+
},
407+
{
408+
name: "shift bounds after intersecting bounds",
409+
// 0 -> 1 -> 2 -> 3 -> 4
410+
spec: &plantest.PhysicalPlanSpec{
411+
Nodes: []planner.PlanNode{
412+
makeNode("0"),
413+
makeBoundsNode("1", bounds(5, 10)),
414+
makeNode("2"),
415+
makeShiftNode("3", values.Duration(5)),
416+
makeNode("4"),
417+
},
418+
Edges: [][2]int{
419+
{0, 1},
420+
{1, 2},
421+
{2, 3},
422+
{3, 4},
423+
},
424+
},
425+
want: map[planner.NodeID]*planner.Bounds{
426+
"0": nil,
427+
"1": bounds(5, 10),
428+
"2": bounds(5, 10),
429+
"3": bounds(10, 15),
430+
"4": bounds(10, 15)},
431+
},
432+
{
433+
name: "join",
434+
// 2
435+
// / \
436+
// 0 1
437+
spec: &plantest.PhysicalPlanSpec{
438+
Nodes: []planner.PlanNode{
439+
makeBoundsNode("0", bounds(5, 10)),
440+
makeBoundsNode("1", bounds(12, 20)),
441+
makeNode("2"),
442+
},
443+
Edges: [][2]int{
444+
{0, 2},
445+
{1, 2},
446+
},
447+
},
448+
want: map[planner.NodeID]*planner.Bounds{
449+
"0": bounds(5, 10),
450+
"1": bounds(12, 20),
451+
"2": bounds(5, 20),
452+
},
453+
},
454+
{
455+
name: "yields",
456+
// 3 4
457+
// \ /
458+
// 1 2
459+
// \ /
460+
// 0
461+
spec: &plantest.PhysicalPlanSpec{
462+
Nodes: []planner.PlanNode{
463+
makeNode("0"),
464+
makeBoundsNode("1", bounds(5, 10)),
465+
makeNode("2"),
466+
makeNode("3"),
467+
makeNode("4"),
468+
},
469+
Edges: [][2]int{
470+
{0, 1},
471+
{0, 2},
472+
{1, 3},
473+
{1, 4},
474+
},
475+
},
476+
want: map[planner.NodeID]*planner.Bounds{
477+
"0": nil,
478+
"1": bounds(5, 10),
479+
"2": nil,
480+
"3": bounds(5, 10),
481+
"4": bounds(5, 10),
482+
},
483+
},
484+
}
485+
486+
for _, tc := range tests {
487+
tc := tc
488+
t.Run(tc.name, func(t *testing.T) {
489+
// Create plan from spec
490+
thePlan := plantest.CreatePhysicalPlanSpec(tc.spec)
491+
492+
// Method used to compute the bounds at each node
493+
if err := thePlan.BottomUpWalk(planner.ComputeBounds); err != nil {
494+
t.Fatal(err)
495+
}
496+
497+
// Map NodeID -> Bounds
498+
got := make(map[planner.NodeID]*planner.Bounds)
499+
thePlan.BottomUpWalk(func(n planner.PlanNode) error {
500+
got[n.ID()] = n.Bounds()
501+
return nil
502+
})
503+
504+
if !cmp.Equal(tc.want, got) {
505+
t.Errorf("Did not get expected time bounds, -want/+got:\n%v", cmp.Diff(tc.want, got))
506+
}
507+
})
508+
}
509+
}

‎planner/logical.go

-4
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,6 @@ func createLogicalPlan(spec *flux.Spec) (*PlanSpec, error) {
146146
return nil, err
147147
}
148148

149-
if err := v.plan.BottomUpWalk(ComputeBounds); err != nil {
150-
return nil, err
151-
}
152-
153149
return v.plan, nil
154150
}
155151

‎planner/physical.go

+5
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ func (pp *physicalPlanner) Plan(spec *PlanSpec) (*PlanSpec, error) {
5151
return nil, err
5252
}
5353

54+
// Compute time bounds for nodes in the plan
55+
if err := final.BottomUpWalk(ComputeBounds); err != nil {
56+
return nil, err
57+
}
58+
5459
// Update memory quota
5560
if final.Resources.MemoryBytesQuota == 0 {
5661
final.Resources.MemoryBytesQuota = pp.defaultMemoryLimit

‎planner/plantest/cmp.go

+7
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package plantest
22

33
import (
44
"fmt"
5+
56
"github.com/google/go-cmp/cmp"
67
"github.com/influxdata/flux/planner"
78
"github.com/influxdata/flux/semantic/semantictest"
@@ -91,6 +92,12 @@ func cmpPlanNode(p, q planner.PlanNode) error {
9192
return fmt.Errorf("wanted %s, but got %s", p.Kind(), q.Kind())
9293
}
9394

95+
// Both nodes must have the same time bounds
96+
if !cmp.Equal(p.Bounds(), q.Bounds()) {
97+
return fmt.Errorf("plan nodes have different bounds -want(%s)/+got(%s) %s",
98+
p.ID(), q.ID(), cmp.Diff(p.Bounds(), q.Bounds()))
99+
}
100+
94101
// The specifications of both procedures must be the same
95102
if !cmp.Equal(p.ProcedureSpec(), q.ProcedureSpec(), semantictest.CmpOptions...) {
96103
return fmt.Errorf("procedure specs not equal -want(%s)/+got(%s) %s",

‎planner/types.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ type PlanNode interface {
3232

3333
// Helper methods for manipulating a plan
3434
// These methods are used during planning
35-
AddBounds(bounds *Bounds)
35+
SetBounds(bounds *Bounds)
3636
AddSuccessors(...PlanNode)
3737
AddPredecessors(...PlanNode)
3838
RemovePredecessor(PlanNode)
@@ -165,7 +165,7 @@ type bounds struct {
165165
value *Bounds
166166
}
167167

168-
func (b *bounds) AddBounds(bounds *Bounds) {
168+
func (b *bounds) SetBounds(bounds *Bounds) {
169169
b.value = bounds
170170
}
171171

0 commit comments

Comments
 (0)
Please sign in to comment.