Skip to content

Commit 9644702

Browse files
authored
feat(stdlib): add influxdb source (#2568)
The influxdb source will recognize the `from |> range` pattern and it will convert it to an AST and serialize that to a remote influxdb v2 server. If a `filter` is included afterwards, it will be converted back to AST and be included in the influxdb from call.
1 parent d850334 commit 9644702

39 files changed

+2082
-234
lines changed

cmd/flux/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,18 @@ package main
22

33
import (
44
"github.com/influxdata/flux/cmd/flux/cmd"
5+
"github.com/influxdata/flux/plan"
6+
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
7+
58
// Register the sqlite3 database driver.
69
_ "github.com/mattn/go-sqlite3"
710
)
811

12+
const DefaultInfluxDBHost = "http://localhost:9999"
13+
914
func main() {
15+
plan.RegisterLogicalRules(influxdb.DefaultFromAttributes{
16+
Host: func(v string) *string { return &v }(DefaultInfluxDBHost),
17+
})
1018
cmd.Execute()
1119
}

execute/executetest/compile.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,19 @@ var (
2222
func FunctionExpression(t testing.TB, source string) *semantic.FunctionExpression {
2323
t.Helper()
2424

25-
if prelude == nil {
26-
prelude = runtime.Prelude()
27-
}
2825
if stdlib == nil {
2926
stdlib = runtime.StdLib()
3027
}
28+
if prelude == nil {
29+
prelude = values.NewScope()
30+
for _, path := range []string{"universe", "influxdata/influxdb"} {
31+
p, err := stdlib.ImportPackageObject(path)
32+
if err != nil {
33+
t.Fatalf("error importing prelude package %q: %s", path, err)
34+
}
35+
p.Range(prelude.Set)
36+
}
37+
}
3138

3239
pkg, err := semantic.AnalyzeSource(source)
3340
if err != nil {

interpreter/package.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ type Package struct {
1818
// name is the name of the package.
1919
name string
2020

21+
// path is the canonical import path that is used to import this package.
22+
path string
23+
2124
// object contains the object properties of this package.
2225
object values.Object
2326

@@ -26,16 +29,17 @@ type Package struct {
2629
sideEffects []SideEffect
2730
}
2831

29-
func NewPackageWithValues(name string, obj values.Object) *Package {
32+
func NewPackageWithValues(name, path string, obj values.Object) *Package {
3033
return &Package{
3134
name: name,
35+
path: path,
3236
object: obj,
3337
}
3438
}
3539

3640
func NewPackage(name string) *Package {
3741
obj := values.NewObject(semantic.NewObjectType(nil))
38-
return NewPackageWithValues(name, obj)
42+
return NewPackageWithValues(name, "", obj)
3943
}
4044

4145
func (p *Package) Copy() *Package {
@@ -50,9 +54,17 @@ func (p *Package) Copy() *Package {
5054
sideEffects: sideEffects,
5155
}
5256
}
57+
58+
// Name returns the package name.
5359
func (p *Package) Name() string {
5460
return p.name
5561
}
62+
63+
// Path returns the canonical import path for this package.
64+
func (p *Package) Path() string {
65+
return p.path
66+
}
67+
5668
func (p *Package) SideEffects() []SideEffect {
5769
return p.sideEffects
5870
}

interpreter/package_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,13 @@ func TestAccessNestedImport(t *testing.T) {
4242
t.Skip("Handle imports for user-defined packages https://github.com/influxdata/flux/issues/2343")
4343
// package a
4444
// x = 0
45-
packageA := interpreter.NewPackageWithValues("a", values.NewObjectWithValues(map[string]values.Value{
45+
packageA := interpreter.NewPackageWithValues("a", "", values.NewObjectWithValues(map[string]values.Value{
4646
"x": values.NewInt(0),
4747
}))
4848

4949
// package b
5050
// import "a"
51-
packageB := interpreter.NewPackageWithValues("b", values.NewObjectWithValues(map[string]values.Value{
51+
packageB := interpreter.NewPackageWithValues("b", "", values.NewObjectWithValues(map[string]values.Value{
5252
"a": packageA,
5353
}))
5454

libflux/src/flux/semantic/builtins.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,14 @@ pub fn builtins() -> Builtins<'static> {
124124
"influxdata/influxdb" => maplit::hashmap! {
125125
// This is a one-or-the-other parameters function
126126
// https://github.com/influxdata/flux/issues/1659
127-
"from" => "forall [t0, t1] (?bucket: string, ?bucketID: string) -> [{_measurement: string | _field: string | _time: time | _value: t0 | t1}]",
127+
"from" => r#"forall [t0, t1] (
128+
?bucket: string,
129+
?bucketID: string,
130+
?org: string,
131+
?orgID: string,
132+
?host: string,
133+
?token: string
134+
) -> [{_measurement: string | _field: string | _time: time | _value: t0 | t1}]"#,
128135
// exactly one of (bucket, bucketID) must be specified
129136
// exactly one of (org, orgID) must be specified
130137
// https://github.com/influxdata/flux/issues/1660

plan/format_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717

1818
func TestFormatted(t *testing.T) {
1919
fromSpec := &influxdb.FromProcedureSpec{
20-
Bucket: "my-bucket",
20+
Bucket: influxdb.NameOrID{Name: "my-bucket"},
2121
}
2222

2323
// (r) => r._value > 5.0

plan/logical_test.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func TestPlan_LogicalPlanFromSpec(t *testing.T) {
5454

5555
var (
5656
fromSpec = &influxdb.FromProcedureSpec{
57-
Bucket: "my-bucket",
57+
Bucket: influxdb.NameOrID{Name: "my-bucket"},
5858
}
5959
rangeSpec = &universe.RangeProcedureSpec{
6060
Bounds: flux.Bounds{
@@ -449,7 +449,9 @@ func TestLogicalPlanner(t *testing.T) {
449449
yield(name: "result")`,
450450
wantPlan: plantest.PlanSpec{
451451
Nodes: []plan.Node{
452-
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{Bucket: "telegraf"}),
452+
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{
453+
Bucket: influxdb.NameOrID{Name: "telegraf"},
454+
}),
453455
plan.CreateLogicalNode("merged_filter1_filter2_filter3", &universe.FilterProcedureSpec{
454456
Fn: interpreter.ResolvedFunction{
455457
Scope: valuestest.Scope(),
@@ -497,7 +499,9 @@ func TestLogicalPlanner(t *testing.T) {
497499
from(bucket: "telegraf") |> map(fn: (r) => ({r with _value: r._value * 2.0})) |> filter(fn: (r) => r._value < 10.0) |> yield(name: "result")`,
498500
wantPlan: plantest.PlanSpec{
499501
Nodes: []plan.Node{
500-
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{Bucket: "telegraf"}),
502+
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{
503+
Bucket: influxdb.NameOrID{Name: "telegraf"},
504+
}),
501505
plan.CreateLogicalNode("filter2_copy", &universe.FilterProcedureSpec{
502506
Fn: interpreter.ResolvedFunction{
503507
Scope: valuestest.Scope(),
@@ -555,7 +559,9 @@ func TestLogicalPlanner(t *testing.T) {
555559
yield(name: "result")`,
556560
wantPlan: plantest.PlanSpec{
557561
Nodes: []plan.Node{
558-
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{Bucket: "telegraf"}),
562+
plan.CreateLogicalNode("from0", &influxdb.FromProcedureSpec{
563+
Bucket: influxdb.NameOrID{Name: "telegraf"},
564+
}),
559565
plan.CreateLogicalNode("merged_filter1_filter3_copy", &universe.FilterProcedureSpec{
560566
Fn: interpreter.ResolvedFunction{
561567
Scope: valuestest.Scope(),

runtime/builtins_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ func TestValidatePackageBuiltins(t *testing.T) {
1919
}{
2020
{
2121
name: "no errors",
22-
pkg: interpreter.NewPackageWithValues("test", values.NewObjectWithValues(map[string]values.Value{
22+
pkg: interpreter.NewPackageWithValues("test", "", values.NewObjectWithValues(map[string]values.Value{
2323
"foo": values.NewInt(0),
2424
})),
2525
astPkg: &ast.Package{
@@ -34,15 +34,15 @@ func TestValidatePackageBuiltins(t *testing.T) {
3434
},
3535
{
3636
name: "extra values",
37-
pkg: interpreter.NewPackageWithValues("test", values.NewObjectWithValues(map[string]values.Value{
37+
pkg: interpreter.NewPackageWithValues("test", "", values.NewObjectWithValues(map[string]values.Value{
3838
"foo": values.NewInt(0),
3939
})),
4040
astPkg: &ast.Package{},
4141
err: errors.New("missing builtin values [], extra builtin values [foo]"),
4242
},
4343
{
4444
name: "missing values",
45-
pkg: interpreter.NewPackageWithValues("test", values.NewObjectWithValues(map[string]values.Value{})),
45+
pkg: interpreter.NewPackageWithValues("test", "", values.NewObjectWithValues(map[string]values.Value{})),
4646
astPkg: &ast.Package{
4747
Files: []*ast.File{{
4848
Body: []ast.Statement{
@@ -56,7 +56,7 @@ func TestValidatePackageBuiltins(t *testing.T) {
5656
},
5757
{
5858
name: "missing and values",
59-
pkg: interpreter.NewPackageWithValues("test", values.NewObjectWithValues(map[string]values.Value{
59+
pkg: interpreter.NewPackageWithValues("test", "", values.NewObjectWithValues(map[string]values.Value{
6060
"foo": values.NewInt(0),
6161
"bar": values.NewInt(0),
6262
})),

runtime/importer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ func (imp *importer) ImportPackageObject(path string) (*interpreter.Package, err
8282
return nil, err
8383
}
8484
obj := newObjectFromScope(scope)
85-
imp.pkgs[path] = interpreter.NewPackageWithValues(itrp.PackageName(), obj)
85+
imp.pkgs[path] = interpreter.NewPackageWithValues(itrp.PackageName(), path, obj)
8686
return imp.pkgs[path], nil
8787
}
8888

spec_test.go

Lines changed: 0 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,14 @@
11
package flux_test
22

33
import (
4-
"encoding/json"
54
"errors"
65
"strconv"
76
"testing"
8-
"time"
97

108
"github.com/google/go-cmp/cmp"
11-
"github.com/google/go-cmp/cmp/cmpopts"
129
"github.com/influxdata/flux"
13-
"github.com/influxdata/flux/stdlib/influxdata/influxdb"
14-
"github.com/influxdata/flux/stdlib/universe"
1510
)
1611

17-
var ignoreUnexportedQuerySpec = cmpopts.IgnoreUnexported(flux.Spec{})
18-
19-
func TestSpec_JSON(t *testing.T) {
20-
srcData := []byte(`
21-
{
22-
"operations":[
23-
{
24-
"id": "from",
25-
"kind": "from",
26-
"spec": {
27-
"bucket":"mybucket"
28-
}
29-
},
30-
{
31-
"id": "range",
32-
"kind": "range",
33-
"spec": {
34-
"start": "-4h",
35-
"stop": "now"
36-
}
37-
},
38-
{
39-
"id": "sum",
40-
"kind": "sum"
41-
}
42-
],
43-
"edges":[
44-
{"parent":"from","child":"range"},
45-
{"parent":"range","child":"sum"}
46-
]
47-
}
48-
`)
49-
50-
// Ensure we can properly unmarshal a query
51-
gotQ := flux.Spec{}
52-
if err := json.Unmarshal(srcData, &gotQ); err != nil {
53-
t.Fatal(err)
54-
}
55-
expQ := flux.Spec{
56-
Operations: []*flux.Operation{
57-
{
58-
ID: "from",
59-
Spec: &influxdb.FromOpSpec{
60-
Bucket: "mybucket",
61-
},
62-
},
63-
{
64-
ID: "range",
65-
Spec: &universe.RangeOpSpec{
66-
Start: flux.Time{
67-
Relative: -4 * time.Hour,
68-
IsRelative: true,
69-
},
70-
Stop: flux.Time{
71-
IsRelative: true,
72-
},
73-
},
74-
},
75-
{
76-
ID: "sum",
77-
Spec: &universe.SumOpSpec{},
78-
},
79-
},
80-
Edges: []flux.Edge{
81-
{Parent: "from", Child: "range"},
82-
{Parent: "range", Child: "sum"},
83-
},
84-
}
85-
if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
86-
t.Errorf("unexpected query:\n%s", cmp.Diff(gotQ, expQ, ignoreUnexportedQuerySpec))
87-
}
88-
89-
// Ensure we can properly marshal a query
90-
data, err := json.Marshal(expQ)
91-
if err != nil {
92-
t.Fatal(err)
93-
}
94-
if err := json.Unmarshal(data, &gotQ); err != nil {
95-
t.Fatal(err)
96-
}
97-
if !cmp.Equal(gotQ, expQ, ignoreUnexportedQuerySpec) {
98-
t.Errorf("unexpected query after marshalling: -want/+got %s", cmp.Diff(expQ, gotQ, ignoreUnexportedQuerySpec))
99-
}
100-
}
101-
10212
func TestSpec_Walk(t *testing.T) {
10313
testCases := []struct {
10414
query *flux.Spec

0 commit comments

Comments
 (0)