Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6f95f8d

Browse files
fchikwekwenathanielc
andauthoredJan 22, 2021
fix(interval): clean up interval package (#3440)
* feat(interval): adds new interval package for consistent window handling The interval package is intended to replace the execute.Window and execute.Bounds types creating a consistent and well encapsulated implementation of the windowing behavior of Flux with calendar durations. * feat(intervals): use start oriented windows This makes three final adjustments 1. Intervals are start oriented, this seems like the most user friendly choice 2. Changes window API to GetLatestBounds instead of earliest because its more efficient and consumers of the API are not picky 3. Makes it so that adding months to the last day of a month moves forward to the last day of the month * fix(interval): clean up interval package * fix(values): revert end of month behavior Co-authored-by: Nathaniel Cook <nvcook42@gmail.com>
1 parent bf026f0 commit 6f95f8d

File tree

16 files changed

+8966
-1
lines changed

16 files changed

+8966
-1
lines changed
 

‎execute/window_test.go

Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/influxdata/flux/codes"
99
"github.com/influxdata/flux/execute"
1010
"github.com/influxdata/flux/internal/errors"
11+
"github.com/influxdata/flux/interval"
1112
"github.com/influxdata/flux/values"
1213
)
1314

@@ -509,3 +510,265 @@ func errAsString(err error) (s string) {
509510
}
510511
return s
511512
}
513+
514+
func mustWindow(every, period, offset execute.Duration) execute.Window {
515+
w, err := execute.NewWindow(every, period, offset)
516+
if err != nil {
517+
panic(err)
518+
}
519+
return w
520+
}
521+
522+
func mustWindowInterval(every, period, offset execute.Duration) interval.Window {
523+
w, err := interval.NewWindow(every, period, offset)
524+
if err != nil {
525+
panic(err)
526+
}
527+
return w
528+
}
529+
530+
func mustTime(s string) execute.Time {
531+
t, err := time.Parse(time.RFC3339Nano, s)
532+
if err != nil {
533+
panic(err)
534+
}
535+
return values.ConvertTime(t)
536+
}
537+
538+
func BenchmarkNewWindow(b *testing.B) {
539+
var testcases = []struct {
540+
name string
541+
w execute.Window
542+
t execute.Time
543+
want execute.Bounds
544+
}{
545+
{
546+
name: "no interval - simple",
547+
w: mustWindow(
548+
values.ConvertDurationNsecs(5*time.Minute),
549+
values.ConvertDurationNsecs(5*time.Minute),
550+
values.ConvertDurationNsecs(0)),
551+
t: execute.Time(6 * time.Minute),
552+
want: execute.Bounds{
553+
Start: execute.Time(5 * time.Minute),
554+
Stop: execute.Time(10 * time.Minute),
555+
},
556+
},
557+
{
558+
name: "no interval - simple with negative period",
559+
w: mustWindow(
560+
values.ConvertDurationNsecs(5*time.Minute),
561+
values.ConvertDurationNsecs(-5*time.Minute),
562+
values.ConvertDurationNsecs(30*time.Second)),
563+
t: execute.Time(5 * time.Minute),
564+
want: execute.Bounds{
565+
Start: execute.Time(10*time.Minute + 30*time.Second),
566+
Stop: execute.Time(5*time.Minute + 30*time.Second),
567+
},
568+
},
569+
{
570+
name: "no interval - simple with offset",
571+
w: mustWindow(
572+
values.ConvertDurationNsecs(5*time.Minute),
573+
values.ConvertDurationNsecs(5*time.Minute),
574+
values.ConvertDurationNsecs(30*time.Second)),
575+
t: execute.Time(5 * time.Minute),
576+
want: execute.Bounds{
577+
Start: execute.Time(30 * time.Second),
578+
Stop: execute.Time(5*time.Minute + 30*time.Second),
579+
},
580+
},
581+
{
582+
name: "no interval - simple with negative offset",
583+
w: mustWindow(
584+
values.ConvertDurationNsecs(5*time.Minute),
585+
values.ConvertDurationNsecs(5*time.Minute),
586+
values.ConvertDurationNsecs(-30*time.Second)),
587+
t: execute.Time(5 * time.Minute),
588+
want: execute.Bounds{
589+
Start: execute.Time(4*time.Minute + 30*time.Second),
590+
Stop: execute.Time(9*time.Minute + 30*time.Second),
591+
},
592+
},
593+
{
594+
name: "no interval - simple with equal offset before",
595+
w: mustWindow(
596+
values.ConvertDurationNsecs(5*time.Minute),
597+
values.ConvertDurationNsecs(5*time.Minute),
598+
values.ConvertDurationNsecs(5*time.Minute)),
599+
t: execute.Time(0),
600+
want: execute.Bounds{
601+
Start: execute.Time(0 * time.Minute),
602+
Stop: execute.Time(5 * time.Minute),
603+
},
604+
},
605+
{
606+
name: "no interval - simple with equal offset after",
607+
w: mustWindow(
608+
values.ConvertDurationNsecs(5*time.Minute),
609+
values.ConvertDurationNsecs(5*time.Minute),
610+
values.ConvertDurationNsecs(5*time.Minute)),
611+
t: execute.Time(7 * time.Minute),
612+
want: execute.Bounds{
613+
Start: execute.Time(5 * time.Minute),
614+
Stop: execute.Time(10 * time.Minute),
615+
},
616+
},
617+
{
618+
name: "no interval - simple months",
619+
w: mustWindow(
620+
values.ConvertDurationMonths(5),
621+
values.ConvertDurationMonths(5),
622+
values.ConvertDurationMonths(0)),
623+
t: mustTime("1970-01-01T00:00:00Z"),
624+
want: execute.Bounds{
625+
Start: mustTime("1970-01-01T00:00:00Z"),
626+
Stop: mustTime("1970-06-01T00:00:00Z"),
627+
},
628+
},
629+
{
630+
name: "no interval - simple months with offset",
631+
w: mustWindow(
632+
values.ConvertDurationMonths(3),
633+
values.ConvertDurationMonths(3),
634+
values.ConvertDurationMonths(1)),
635+
t: mustTime("1970-01-01T00:00:00Z"),
636+
want: execute.Bounds{
637+
Start: mustTime("1969-11-01T00:00:00Z"),
638+
Stop: mustTime("1970-02-01T00:00:00Z"),
639+
},
640+
},
641+
}
642+
for _, tc := range testcases {
643+
b.Run(tc.name, func(b *testing.B) {
644+
for i := 0; i < b.N; i++ {
645+
got := tc.w.GetEarliestBounds(tc.t)
646+
if got != tc.want {
647+
b.Errorf("unexpected boundary: got %s want %s", got, tc.want)
648+
}
649+
}
650+
})
651+
652+
}
653+
}
654+
655+
type testBounds struct {
656+
Start values.Time
657+
Stop values.Time
658+
}
659+
660+
func BenchmarkNewWindowInterval(b *testing.B) {
661+
var testcases = []struct {
662+
name string
663+
w interval.Window
664+
t execute.Time
665+
want testBounds
666+
}{
667+
{
668+
name: "interval - simple",
669+
w: mustWindowInterval(
670+
values.ConvertDurationNsecs(5*time.Minute),
671+
values.ConvertDurationNsecs(5*time.Minute),
672+
values.ConvertDurationNsecs(0)),
673+
t: execute.Time(6 * time.Minute),
674+
want: testBounds{
675+
Start: execute.Time(5 * time.Minute),
676+
Stop: execute.Time(10 * time.Minute),
677+
},
678+
},
679+
{
680+
name: "interval - simple with negative period",
681+
w: mustWindowInterval(
682+
values.ConvertDurationNsecs(5*time.Minute),
683+
values.ConvertDurationNsecs(-5*time.Minute),
684+
values.ConvertDurationNsecs(30*time.Second)),
685+
t: execute.Time(5 * time.Minute),
686+
want: testBounds{
687+
Start: execute.Time(30 * time.Second),
688+
Stop: execute.Time(5*time.Minute + 30*time.Second),
689+
},
690+
},
691+
{
692+
name: "interval - simple with offset",
693+
w: mustWindowInterval(
694+
values.ConvertDurationNsecs(5*time.Minute),
695+
values.ConvertDurationNsecs(5*time.Minute),
696+
values.ConvertDurationNsecs(30*time.Second)),
697+
t: execute.Time(5 * time.Minute),
698+
want: testBounds{
699+
Start: execute.Time(30 * time.Second),
700+
Stop: execute.Time(5*time.Minute + 30*time.Second),
701+
},
702+
},
703+
{
704+
name: "interal - simple with negative offset",
705+
w: mustWindowInterval(
706+
values.ConvertDurationNsecs(5*time.Minute),
707+
values.ConvertDurationNsecs(5*time.Minute),
708+
values.ConvertDurationNsecs(-30*time.Second)),
709+
t: execute.Time(5 * time.Minute),
710+
want: testBounds{
711+
Start: execute.Time(4*time.Minute + 30*time.Second),
712+
Stop: execute.Time(9*time.Minute + 30*time.Second),
713+
},
714+
},
715+
{
716+
name: "interval - simple with equal offset before",
717+
w: mustWindowInterval(
718+
values.ConvertDurationNsecs(5*time.Minute),
719+
values.ConvertDurationNsecs(5*time.Minute),
720+
values.ConvertDurationNsecs(5*time.Minute)),
721+
t: execute.Time(0),
722+
want: testBounds{
723+
Start: execute.Time(0 * time.Minute),
724+
Stop: execute.Time(5 * time.Minute),
725+
},
726+
},
727+
{
728+
name: "interval - simple with equal offset after",
729+
w: mustWindowInterval(
730+
values.ConvertDurationNsecs(5*time.Minute),
731+
values.ConvertDurationNsecs(5*time.Minute),
732+
values.ConvertDurationNsecs(5*time.Minute)),
733+
t: execute.Time(7 * time.Minute),
734+
want: testBounds{
735+
Start: execute.Time(5 * time.Minute),
736+
Stop: execute.Time(10 * time.Minute),
737+
},
738+
},
739+
{
740+
name: "interval - simple months",
741+
w: mustWindowInterval(
742+
values.ConvertDurationMonths(5),
743+
values.ConvertDurationMonths(5),
744+
values.ConvertDurationMonths(0)),
745+
t: mustTime("1970-01-01T00:00:00Z"),
746+
want: testBounds{
747+
Start: mustTime("1970-01-01T00:00:00Z"),
748+
Stop: mustTime("1970-06-01T00:00:00Z"),
749+
},
750+
},
751+
{
752+
name: "interval - simple months with offset",
753+
w: mustWindowInterval(
754+
values.ConvertDurationMonths(3),
755+
values.ConvertDurationMonths(3),
756+
values.ConvertDurationMonths(1)),
757+
t: mustTime("1970-01-01T00:00:00Z"),
758+
want: testBounds{
759+
Start: mustTime("1969-11-01T00:00:00Z"),
760+
Stop: mustTime("1970-02-01T00:00:00Z"),
761+
},
762+
},
763+
}
764+
for _, tc := range testcases {
765+
b.Run(tc.name, func(b *testing.B) {
766+
for i := 0; i < b.N; i++ {
767+
got := tc.w.GetLatestBounds(tc.t)
768+
if got.Start() != tc.want.Start {
769+
b.Errorf("unexpected start boundary: got %s want %s", got.Start(), tc.want.Start)
770+
}
771+
}
772+
})
773+
}
774+
}

‎interval/bounds.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package interval
2+
3+
import (
4+
"fmt"
5+
"math"
6+
7+
"github.com/influxdata/flux/values"
8+
)
9+
10+
const (
11+
MaxTime = math.MaxInt64
12+
MinTime = math.MinInt64
13+
)
14+
15+
type Bounds struct {
16+
start values.Time
17+
stop values.Time
18+
// index keeps track of how many windows have been added or subtracted as additional
19+
// windows are added to or subtracted from the initial bounds. In essence, it tracks the
20+
// offset from the original bounds in order to keep operations more straightforward.
21+
// See the Window struct and the window tests for additional info.
22+
index int
23+
}
24+
25+
func (b Bounds) Start() values.Time {
26+
return b.start
27+
}
28+
29+
func (b Bounds) Stop() values.Time {
30+
return b.stop
31+
}
32+
33+
func (b Bounds) IsEmpty() bool {
34+
return b.start >= b.stop
35+
}
36+
37+
func (b Bounds) String() string {
38+
return fmt.Sprintf("[%v, %v)", b.start, b.stop)
39+
}
40+
41+
func (b Bounds) Contains(t values.Time) bool {
42+
return t >= b.start && t < b.stop
43+
}
44+
45+
func (b Bounds) Overlaps(o Bounds) bool {
46+
return b.Contains(o.start) || (b.Contains(o.stop) && o.stop > b.start) || o.Contains(b.start)
47+
}
48+
49+
func (b Bounds) Equal(o Bounds) bool {
50+
return b == o
51+
}
52+
53+
func (b Bounds) Length() values.Duration {
54+
if b.IsEmpty() {
55+
return values.ConvertDurationNsecs(0)
56+
}
57+
return b.stop.Sub(b.start)
58+
}

‎interval/bounds_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package interval
2+
3+
import (
4+
"testing"
5+
6+
"github.com/influxdata/flux/values"
7+
)
8+
9+
var tests = []struct {
10+
name string
11+
a, b Bounds
12+
want bool
13+
}{
14+
{
15+
name: "edge overlap",
16+
a: Bounds{
17+
start: values.Time(0),
18+
stop: values.Time(10),
19+
},
20+
b: Bounds{
21+
start: values.Time(10),
22+
stop: values.Time(20),
23+
},
24+
25+
want: false,
26+
},
27+
{
28+
name: "edge overlap sym",
29+
a: Bounds{
30+
start: values.Time(10),
31+
stop: values.Time(20),
32+
},
33+
b: Bounds{
34+
start: values.Time(0),
35+
stop: values.Time(10),
36+
},
37+
want: false,
38+
},
39+
{
40+
name: "single overlap",
41+
a: Bounds{
42+
start: values.Time(0),
43+
stop: values.Time(10),
44+
},
45+
b: Bounds{
46+
start: values.Time(5),
47+
stop: values.Time(15),
48+
},
49+
want: true,
50+
},
51+
{
52+
name: "no overlap sym",
53+
a: Bounds{
54+
start: values.Time(0),
55+
stop: values.Time(10),
56+
},
57+
b: Bounds{
58+
start: values.Time(5),
59+
stop: values.Time(15),
60+
},
61+
want: true,
62+
},
63+
{
64+
name: "double overlap (bounds contained)",
65+
a: Bounds{
66+
start: values.Time(10),
67+
stop: values.Time(20),
68+
},
69+
b: Bounds{
70+
start: values.Time(14),
71+
stop: values.Time(15),
72+
},
73+
want: true,
74+
},
75+
{
76+
name: "double overlap (bounds contained) sym",
77+
a: Bounds{
78+
start: values.Time(14),
79+
stop: values.Time(15),
80+
},
81+
b: Bounds{
82+
start: values.Time(10),
83+
stop: values.Time(20),
84+
},
85+
want: true,
86+
},
87+
}
88+
89+
// Written to verify symmetrical behavior of interval.(Bounds).Overlaps
90+
// Given two Bounds a and b, if a.Overlaps(b) then b.Overlaps(a).
91+
//
92+
// Cases:
93+
// given two ranges [a1, a2), [b1, b2)
94+
// a1 <= b1 <= a2 <= b2 -> true
95+
// b1 <= a1 <= b2 <= a2 -> true
96+
// a1 <= b1 <= b2 <= a2 -> true
97+
// b2 <= a1 <= a2 <= b2 -> true
98+
// a1 <= a2 <= b1 <= b2 -> false
99+
// b1 <= b2 <= a1 <= a2 -> false
100+
func TestBounds_Overlaps(t *testing.T) {
101+
for _, tt := range tests {
102+
t.Run(tt.name, func(t *testing.T) {
103+
if got := tt.a.Overlaps(tt.b); got != tt.want {
104+
t.Errorf("Bounds.Overlaps() = %v, want %v", got, tt.want)
105+
}
106+
})
107+
}
108+
}

‎interval/window.go

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package interval
2+
3+
import (
4+
"github.com/influxdata/flux/codes"
5+
"github.com/influxdata/flux/internal/errors"
6+
"github.com/influxdata/flux/values"
7+
)
8+
9+
const epoch = values.Time(0)
10+
11+
var epochYear, epochMonth int64
12+
13+
func init() {
14+
ts := epoch.Time()
15+
y, m, _ := ts.Date()
16+
epochYear = int64(y)
17+
epochMonth = int64(m - 1)
18+
}
19+
20+
// TODO(nathanielc): Make the epoch a parameter to the window
21+
// See https://github.com/influxdata/flux/issues/2093
22+
//
23+
// Window is a description of an infinite set of boundaries in time.
24+
type Window struct {
25+
// The ith window start is expressed via this equation:
26+
// window_start_i = zero + every * i
27+
// window_stop_i = zero + every * i + period
28+
every values.Duration
29+
period values.Duration
30+
zero values.Time
31+
zeroMonths int64
32+
}
33+
34+
// NewWindow creates a window which can be used to determine the boundaries for a given point.
35+
// Window boundaries start at the epoch plus the offset.
36+
// Each subsequent window starts at a multiple of the every duration.
37+
// Each window's length is the start boundary plus the period.
38+
// Every must not be a mix of months and nanoseconds in order to preserve constant time bounds lookup.
39+
func NewWindow(every, period, offset values.Duration) (Window, error) {
40+
zero := epoch.Add(offset)
41+
w := Window{
42+
every: every,
43+
period: period,
44+
zero: zero,
45+
zeroMonths: monthsSince(zero),
46+
}
47+
if err := w.isValid(); err != nil {
48+
return Window{}, err
49+
}
50+
return w, nil
51+
}
52+
53+
func (w Window) isValid() error {
54+
if w.every.IsZero() {
55+
return errors.New(codes.Invalid, "duration used as an interval cannot be zero")
56+
}
57+
if w.every.IsMixed() {
58+
const docURL = "https://v2.docs.influxdata.com/v2.0/reference/flux/stdlib/built-in/transformations/window/#calendar-months-and-years"
59+
return errors.New(codes.Invalid, "duration used as an interval cannot mix month and nanosecond units").
60+
WithDocURL(docURL)
61+
}
62+
if w.every.IsNegative() {
63+
return errors.New(codes.Invalid, "duration used as an interval cannot be negative")
64+
}
65+
return nil
66+
}
67+
68+
// GetLatestBounds returns the bounds for the latest window bounds that contains the given time t.
69+
// For underlapping windows that do not contain time t, the window directly before time t will be returned.
70+
func (w Window) GetLatestBounds(t values.Time) Bounds {
71+
// Get the latest index that should contain the time t
72+
index := w.lastIndex(t)
73+
// Construct the bounds from the index
74+
start := w.zero.Add(w.every.Mul(index))
75+
b := Bounds{
76+
start: start,
77+
stop: start.Add(w.period),
78+
index: index,
79+
}
80+
// If the period is negative its possible future bounds can still contain this point
81+
if w.period.IsNegative() {
82+
// swap start and stop since the period was negative
83+
b.start, b.stop = b.stop, b.start
84+
// If period is NOT mixed we can do a direct calculation
85+
// to determine how far into the future a bounds may be found.
86+
if !w.period.IsMixed() {
87+
// Since its not mixed we can adjust the index closer based
88+
// on how many windows a period can span
89+
var period, every int64
90+
if w.every.MonthsOnly() {
91+
every = w.every.Months()
92+
period = w.period.Months()
93+
} else {
94+
every = w.every.Nanoseconds()
95+
period = w.period.Nanoseconds()
96+
}
97+
if period > every {
98+
indexDelta := period / every
99+
index += int(indexDelta)
100+
}
101+
}
102+
// Now do a direct search
103+
next := w.NextBounds(b)
104+
for next.Contains(t) {
105+
b = next
106+
next = w.NextBounds(next)
107+
}
108+
}
109+
return b
110+
}
111+
112+
// GetOverlappingBounds returns a slice of bounds that overlaps the input bounds.
113+
// The returned set of bounds are ordered by decreasing time.
114+
func (w Window) GetOverlappingBounds(start, stop values.Time) []Bounds {
115+
bounds := Bounds{
116+
start: start,
117+
stop: stop,
118+
}
119+
if bounds.IsEmpty() {
120+
return []Bounds{}
121+
}
122+
123+
// Estimate the number of windows by using a rough approximation.
124+
count := (bounds.Length().Duration() / w.every.Duration()) + (w.period.Duration() / w.every.Duration())
125+
bs := make([]Bounds, 0, count)
126+
127+
curr := w.GetLatestBounds(stop)
128+
for curr.stop > start {
129+
if curr.Overlaps(bounds) {
130+
bs = append(bs, curr)
131+
}
132+
curr = w.PrevBounds(curr)
133+
}
134+
135+
return bs
136+
}
137+
138+
// NextBounds returns the next boundary in sequence from the given boundary.
139+
func (w Window) NextBounds(b Bounds) Bounds {
140+
index := b.index + 1
141+
start := w.zero.Add(w.every.Mul(index))
142+
stop := start.Add(w.period)
143+
if w.period.IsNegative() {
144+
start, stop = stop, start
145+
}
146+
return Bounds{
147+
start: start,
148+
stop: stop,
149+
index: index,
150+
}
151+
}
152+
153+
// PrevBounds returns the previous boundary in sequence from the given boundary.
154+
func (w Window) PrevBounds(b Bounds) Bounds {
155+
index := b.index - 1
156+
start := w.zero.Add(w.every.Mul(index))
157+
stop := start.Add(w.period)
158+
if w.period.IsNegative() {
159+
start, stop = stop, start
160+
}
161+
return Bounds{
162+
start: start,
163+
stop: stop,
164+
index: index,
165+
}
166+
}
167+
168+
// lastIndex will compute the index of the last bounds to contain t
169+
func (w Window) lastIndex(t values.Time) int {
170+
// We treat both nanoseconds and months as the space of whole numbers (aka integers).
171+
// This keeps the math the same once we transform into the correct space.
172+
// For months, we operate in the number of months since the epoch.
173+
// For nanoseconds, we operate in the number of nanoseconds since the epoch.
174+
if w.every.MonthsOnly() {
175+
target := monthsSince(t)
176+
// Check if the target day and time of the month is before the zero day and time of the month.
177+
// If it is, that means that in _months_ space we are really in the previous month.
178+
if isBeforeWithinMonth(t, w.zero) {
179+
target -= 1
180+
}
181+
return lastIndex(w.zeroMonths, target, w.every.Months())
182+
}
183+
return lastIndex(int64(w.zero), int64(t), w.every.Nanoseconds())
184+
}
185+
186+
// lastIndex computes the index where zero + every * index ≤ target
187+
// The zero, target and every values can be in any units so long as they are consistent and zero based.
188+
func lastIndex(zero, target, every int64) int {
189+
// Given
190+
// zero + every * index ≤ target
191+
// Therefore
192+
// index ≤ (target - zero) / every
193+
// We want to find the most positive index where the above is true
194+
195+
// Example: Positive Index
196+
// zero = 3 target = 14 every = 5
197+
// Number line with window starts marked:
198+
// -2 -1 0 1 2 |3 4 5 6 7 |8 9 10 11 12 |13 14 15 16 17
199+
// 0 1 2
200+
// We can see that the index we want is 2
201+
// (target - zero) /every
202+
// = (14 - 3) / 5
203+
// = 11 / 5
204+
// = 2
205+
// We do not adjust because the delta was positive
206+
207+
// Example: Positive Index on boundary
208+
// zero = 3 target = 13 every = 5
209+
// Number line with window starts marked:
210+
// -2 -1 0 1 2 |3 4 5 6 7 |8 9 10 11 12 |13 14 15 16 17
211+
// 0 1 2
212+
// We can see that the index we want is 2
213+
// (target - zero) /every
214+
// = (13 - 3) / 5
215+
// = 10 / 5
216+
// = 2
217+
// We do not adjust because the delta was positive
218+
219+
// Example: Negative Index
220+
// zero = 3 target = -9 every = 5
221+
// Number line with window starts marked:
222+
// |-12 -11 -10 -9 -8 |-7 -6 -5 -4 -3 |-2 -1 0 1 2 |3 4 5 6 7
223+
// -3 -2 -1 0
224+
// We can see that the index we want is -3
225+
// (target - zero) /every
226+
// = (-9 - 3) / 5
227+
// = -12 / 5
228+
// = -2
229+
// We have to adjust by 1 because the delta was negative
230+
// and we get -3
231+
232+
// Example: Negative Index on boundary
233+
// zero = 3 target = -7 every = 5
234+
// Number line with window starts marked:
235+
// |-12 -11 -10 -9 -8 |-7 -6 -5 -4 -3 |-2 -1 0 1 2 |3 4 5 6 7
236+
// -3 -2 -1 0
237+
// We can see that the index we want is -2
238+
// (target - zero) /every
239+
// = (-7 - 3) / 5
240+
// = -10 / 5
241+
// = -2
242+
// This time we land right on the boundary, since we are lower inclusive
243+
// we do not need to adjust.
244+
245+
delta := target - zero
246+
index := delta / every
247+
248+
// For targets before the zero we need to adjust the index,
249+
// but only if we did not land right on the boundary.
250+
if delta < 0 && delta%every != 0 {
251+
index -= 1
252+
}
253+
return int(index)
254+
}
255+
256+
// monthsSince converts a time into the number of months since the unix epoch
257+
func monthsSince(t values.Time) int64 {
258+
ts := t.Time()
259+
year, month, _ := ts.Date()
260+
return (int64(year)-epochYear)*12 + int64(month-1) - epochMonth
261+
}
262+
263+
// isBeforeWithinMonth reports whether a comes before b within the month.
264+
// The year and month of a and b are not relevant.
265+
func isBeforeWithinMonth(a, b values.Time) bool {
266+
at := a.Time()
267+
bt := b.Time()
268+
ad := at.Day()
269+
bd := bt.Day()
270+
if ad > bd {
271+
return false
272+
}
273+
if ad < bd {
274+
return true
275+
}
276+
277+
ah, am, as := at.Clock()
278+
bh, bm, bs := bt.Clock()
279+
if ah > bh {
280+
return false
281+
}
282+
if ah < bh {
283+
return true
284+
}
285+
if am > bm {
286+
return false
287+
}
288+
if am < bm {
289+
return true
290+
}
291+
if as > bs {
292+
return false
293+
}
294+
if as < bs {
295+
return true
296+
}
297+
an := at.Nanosecond()
298+
bn := bt.Nanosecond()
299+
if an > bn {
300+
return false
301+
}
302+
if an < bn {
303+
return true
304+
}
305+
return false
306+
}
307+
308+
//TODO
309+
// Add tests very far away from the epoch

‎interval/window_test.go

Lines changed: 1043 additions & 0 deletions
Large diffs are not rendered by default.

‎libflux/go/libflux/buildinfo.gen.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,8 @@ var sourceHashes = map[string]string{
216216
"stdlib/internal/testutil/testutil.flux": "fbbfd21ec43984db50f086ef9b0cc84beeb00569a0a34d83626d0efabdeb1fae",
217217
"stdlib/interpolate/interpolate.flux": "f3d620793dc5e560f14eb386729b3e476bd6016ab904a977956d346710a7050f",
218218
"stdlib/interpolate/interpolate_test.flux": "914918a01bc667aa1f4ded78db5b6e126870306cf177b7be74fac4d10804bbed",
219+
"stdlib/interval/interval.flux": "abcbefdd0a083e6d00beacef7ad839f59f020b520e0a596285cdf03fca4f374b",
220+
"stdlib/interval/interval_test.flux": "faf505bc406a7e891dda1c97daaf8153d36c7cd0f7cae5f2780d96a278267f6d",
219221
"stdlib/json/json.flux": "dc7d8a8e29bb69dd2c0aa5e94ddc4cdcaa754dc7b4e8f81aa9775954f28142a2",
220222
"stdlib/kafka/kafka.flux": "dd58fc35a7207bb0fb1457d35c31db7696ef819c2e0748d1e36700ee563ec77f",
221223
"stdlib/math/math.flux": "a1201b200ba955b42296fe06efe6477a49af5c2fd2c5363409050679b8522ed7",

‎stdlib/flux_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,14 @@ var skip = map[string]map[string]string{
3434
"drop_referenced": "need to support known errors in new test framework (https://github.com/influxdata/flux/issues/536)",
3535
"yield": "yield requires special test case (https://github.com/influxdata/flux/issues/535)",
3636
"task_per_line": "join produces inconsistent/racy results when table schemas do not match (https://github.com/influxdata/flux/issues/855)",
37-
"integral_columns": "aggregates changed to operate on just a single columnm.",
37+
"integral_columns": "aggregates changed to operate on just a single column",
3838
},
3939
"http": {
4040
"http_endpoint": "need ability to test side effects in e2e tests: https://github.com/influxdata/flux/issues/1723)",
4141
},
42+
"interval": {
43+
"interval": "switch these tests cases to produce a non-table stream once that is supported (https://github.com/influxdata/flux/issues/535)",
44+
},
4245
"testing/chronograf": {
4346
"measurement_tag_keys": "unskip chronograf flux tests once filter is refactored (https://github.com/influxdata/flux/issues/1289)",
4447
"aggregate_window_mean": "unskip chronograf flux tests once filter is refactored (https://github.com/influxdata/flux/issues/1289)",

‎stdlib/interval/flux_gen.go

Lines changed: 949 additions & 0 deletions
Large diffs are not rendered by default.

‎stdlib/interval/flux_test_gen.go

Lines changed: 6025 additions & 0 deletions
Large diffs are not rendered by default.

‎stdlib/interval/interval.flux

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package interval
2+
3+
4+
builtin intervals : (
5+
every: A,
6+
period: B,
7+
offset: C,
8+
) => (
9+
start: D,
10+
stop: E,
11+
) => [{
12+
start: time,
13+
stop: time,
14+
}] where
15+
A: Timeable,
16+
B: Timeable,
17+
C: Timeable,
18+
D: Timeable,
19+
E: Timeable

‎stdlib/interval/interval.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package interval
2+
3+
import (
4+
"context"
5+
6+
"github.com/influxdata/flux/codes"
7+
"github.com/influxdata/flux/execute"
8+
"github.com/influxdata/flux/internal/errors"
9+
"github.com/influxdata/flux/interval"
10+
"github.com/influxdata/flux/runtime"
11+
"github.com/influxdata/flux/semantic"
12+
"github.com/influxdata/flux/values"
13+
)
14+
15+
func init() {
16+
typ := runtime.MustLookupBuiltinType("interval", "intervals")
17+
rt, _ := typ.ReturnType()
18+
arrTyp, _ := rt.ReturnType()
19+
runtime.RegisterPackageValue("interval", "intervals", values.NewFunction(
20+
"intervals",
21+
typ,
22+
func(ctx context.Context, args values.Object) (values.Value, error) {
23+
every, err := getDuration(args, "every")
24+
if err != nil {
25+
return nil, err
26+
}
27+
period, err := getDuration(args, "period")
28+
if err != nil {
29+
return nil, err
30+
}
31+
offset, err := getDuration(args, "offset")
32+
if err != nil {
33+
return nil, err
34+
}
35+
w, err := interval.NewWindow(every, period, offset)
36+
if err != nil {
37+
return nil, err
38+
}
39+
return values.NewFunction("intervals", rt, func(ctx context.Context, args values.Object) (values.Value, error) {
40+
start, err := getTimeable(ctx, args, "start")
41+
if err != nil {
42+
return nil, err
43+
}
44+
stop, err := getTimeable(ctx, args, "stop")
45+
if err != nil {
46+
return nil, err
47+
}
48+
49+
bounds := w.GetOverlappingBounds(start, stop)
50+
elements := make([]values.Value, len(bounds))
51+
for i := range bounds {
52+
elements[i] = values.NewObjectWithValues(map[string]values.Value{
53+
"start": values.NewTime(bounds[i].Start()),
54+
"stop": values.NewTime(bounds[i].Stop()),
55+
})
56+
}
57+
return values.NewArrayWithBacking(arrTyp, elements), nil
58+
59+
},
60+
false,
61+
), nil
62+
},
63+
false,
64+
))
65+
}
66+
67+
func getDuration(args values.Object, name string) (values.Duration, error) {
68+
v, ok := args.Get(name)
69+
if !ok {
70+
return values.Duration{}, errors.Newf(codes.Internal, "unexpected missing argument %q", name)
71+
}
72+
if v.Type().Nature() != semantic.Duration {
73+
return values.Duration{}, errors.Newf(codes.Internal, "unexpected argument %q type %v", name, v.Type())
74+
}
75+
return v.Duration(), nil
76+
}
77+
func getTimeable(ctx context.Context, args values.Object, name string) (values.Time, error) {
78+
t, ok := args.Get(name)
79+
if !ok {
80+
return 0, errors.Newf(codes.Internal, "unexpected missing argument %q", name)
81+
}
82+
switch t.Type().Nature() {
83+
case semantic.Time:
84+
return t.Time(), nil
85+
case semantic.Duration:
86+
deps := execute.GetExecutionDependencies(ctx)
87+
nowTime := *deps.Now
88+
return values.ConvertTime(nowTime).Add(t.Duration()), nil
89+
default:
90+
return 0, errors.Newf(codes.Internal, "unexpected type of argument %q, type: %v", name, t.Type())
91+
}
92+
93+
}

‎stdlib/interval/interval_test.flux

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// Three rules:
2+
// 1. When you add a duration to a time, you expect all smaller units of time to remain the same.
3+
// 2. When you add a duration to a time, you expect the unit being added to change by the amount specified.
4+
// 3. When you cannot achieve rule 1, then follow rule 2.
5+
//
6+
// Example cases:
7+
// a) When adding a day, you expect the time of day to remain the same.
8+
// b) When adding a month, you expect the day of the month and the time of day to remain the same.
9+
//
10+
// In cases that do not follow rule 1, follow rule 2.
11+
// Example case:
12+
// a) When adding one month to Jan 31, the day of the month cannot remain the same so the last day of February should be selected.
13+
// b) Using any days in march would violate rule 2 because it would increase the month unit by 2 even though you only added 1 month.
14+
//
15+
//
16+
// Mixed durations do not commute or associate so when they are involved in the algebra of determining a window they make the math hard/impossible.
17+
// Therefore we have certain restrictions in order to make window lookup constant.
18+
//
19+
// Possible equations for window definitions:
20+
//
21+
// Option 1
22+
// window_start_i = (epoch + offset) + every * i
23+
// window_stop_i = window_start_i + period
24+
//
25+
// Option 2
26+
// window_start_i = epoch + every * i + offset
27+
// window_stop_i = epoch + every * i + period + offset
28+
//
29+
// Known edge cases
30+
// a) When you can't follow rule 1 (covered above)
31+
// b) When the `every` duration is on the same order of magnitude as the gap in time
32+
// i) i.e. group by 1h or 2h or 3h over day light savings change
33+
// c) When a mixed time adjusts things across boundaries
34+
// d) When a period is negative
35+
36+
package interval_test
37+
38+
import "experimental"
39+
import "interval"
40+
41+
// these test cases cannot pass CI, so they're skipped for now. Once we're able to test a non table stream test case,
42+
// we can unskip them
43+
experimental.addDuration(d: 1d, to: 2020-01-01T00:00:00Z) == 2020-01-02T00:00:00Z or die(msg: "day addition")
44+
experimental.addDuration(d: 1mo, to: 2020-01-01T00:00:00Z) == 2020-02-01T00:00:00Z or die(msg: "month addition")
45+
experimental.addDuration(d: 1mo, to: 2020-01-31T00:00:00Z) == 2020-02-29T00:00:00Z or die(msg: "month addition end of month")
46+
experimental.addDuration(d: 1mo, to: 2020-02-28T00:00:00Z) == 2020-03-28T00:00:00Z or die(msg: "month addition ??")
47+
48+
// per minute intervals
49+
interval.intervals(every: 1m, period: 1m, offset: 0s)(start: 2020-10-30T00:00:00Z, stop: 2020-10-30T00:10:00Z) == [
50+
{start: 2020-10-30T00:09:00Z, stop: 2020-10-30T00:10:00Z},
51+
{start: 2020-10-30T00:08:00Z, stop: 2020-10-30T00:09:00Z},
52+
{start: 2020-10-30T00:07:00Z, stop: 2020-10-30T00:08:00Z},
53+
{start: 2020-10-30T00:06:00Z, stop: 2020-10-30T00:07:00Z},
54+
{start: 2020-10-30T00:05:00Z, stop: 2020-10-30T00:06:00Z},
55+
{start: 2020-10-30T00:04:00Z, stop: 2020-10-30T00:05:00Z},
56+
{start: 2020-10-30T00:03:00Z, stop: 2020-10-30T00:04:00Z},
57+
{start: 2020-10-30T00:02:00Z, stop: 2020-10-30T00:03:00Z},
58+
{start: 2020-10-30T00:01:00Z, stop: 2020-10-30T00:02:00Z},
59+
{start: 2020-10-30T00:00:00Z, stop: 2020-10-30T00:01:00Z},
60+
] or die(msg: "per minute intervals")
61+
62+
// daily
63+
interval.intervals(every: 1d, period: 1d, offset: 11h)(start: 2020-10-30T11:00:00Z, stop: 2020-11-05T11:00:00Z) == [
64+
{start: 2020-11-04T11:00:00Z, stop: 2020-11-05T11:00:00Z},
65+
{start: 2020-11-03T11:00:00Z, stop: 2020-11-04T11:00:00Z},
66+
{start: 2020-11-02T11:00:00Z, stop: 2020-11-03T11:00:00Z},
67+
{start: 2020-11-01T11:00:00Z, stop: 2020-11-02T11:00:00Z},
68+
{start: 2020-10-31T11:00:00Z, stop: 2020-11-01T11:00:00Z},
69+
{start: 2020-10-30T11:00:00Z, stop: 2020-10-31T11:00:00Z},
70+
] or die(msg: "per day intervals")
71+
72+
// daily 9-5
73+
interval.intervals(every: 1d, period: 8h, offset: 9h)(start: 2020-10-30T00:00:00Z, stop: 2020-11-05T00:00:00Z) == [
74+
{start: 2020-11-04T09:00:00Z, stop: 2020-11-04T17:00:00Z},
75+
{start: 2020-11-03T09:00:00Z, stop: 2020-11-03T17:00:00Z},
76+
{start: 2020-11-02T09:00:00Z, stop: 2020-11-02T17:00:00Z},
77+
{start: 2020-11-01T09:00:00Z, stop: 2020-11-01T17:00:00Z},
78+
{start: 2020-10-31T09:00:00Z, stop: 2020-10-31T17:00:00Z},
79+
{start: 2020-10-30T09:00:00Z, stop: 2020-10-30T17:00:00Z},
80+
] or die(msg: "per day 9AM-5PM intervals")

‎stdlib/packages.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444
_ "github.com/influxdata/flux/stdlib/internal/promql"
4545
_ "github.com/influxdata/flux/stdlib/internal/testutil"
4646
_ "github.com/influxdata/flux/stdlib/interpolate"
47+
_ "github.com/influxdata/flux/stdlib/interval"
4748
_ "github.com/influxdata/flux/stdlib/json"
4849
_ "github.com/influxdata/flux/stdlib/kafka"
4950
_ "github.com/influxdata/flux/stdlib/math"

‎stdlib/test_packages.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
tasks "github.com/influxdata/flux/stdlib/influxdata/influxdb/tasks"
2525
promql "github.com/influxdata/flux/stdlib/internal/promql"
2626
interpolate "github.com/influxdata/flux/stdlib/interpolate"
27+
interval "github.com/influxdata/flux/stdlib/interval"
2728
planner "github.com/influxdata/flux/stdlib/planner"
2829
regexp "github.com/influxdata/flux/stdlib/regexp"
2930
strings "github.com/influxdata/flux/stdlib/strings"
@@ -59,6 +60,7 @@ var FluxTestPackages = func() []*ast.Package {
5960
pkgs = append(pkgs, tasks.FluxTestPackages...)
6061
pkgs = append(pkgs, promql.FluxTestPackages...)
6162
pkgs = append(pkgs, interpolate.FluxTestPackages...)
63+
pkgs = append(pkgs, interval.FluxTestPackages...)
6264
pkgs = append(pkgs, planner.FluxTestPackages...)
6365
pkgs = append(pkgs, regexp.FluxTestPackages...)
6466
pkgs = append(pkgs, strings.FluxTestPackages...)

‎values/time.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,11 @@ func (d Duration) NanoOnly() bool {
216216
return d.Months() == 0 && d.Nanoseconds() != 0
217217
}
218218

219+
// IsMixed returns true if this duration has a combination of month and nanosecond components.
220+
func (d Duration) IsMixed() bool {
221+
return d.Months() != 0 && d.Nanoseconds() != 0
222+
}
223+
219224
func (d Duration) Months() int64 { return d.months }
220225
func (d Duration) Nanoseconds() int64 { return d.nsecs }
221226

‎values/time_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ func TestTime_Add(t *testing.T) {
8585
d: "1mo",
8686
want: "2019-02-28T00:00:00Z",
8787
},
88+
{
89+
t: "2019-02-28T00:00:00Z",
90+
d: "1mo",
91+
want: "2019-03-28T00:00:00Z",
92+
},
8893
{
8994
t: "2019-03-31T00:00:00Z",
9095
d: "-1mo",

0 commit comments

Comments
 (0)
Please sign in to comment.