@@ -6,13 +6,19 @@ import (
6
6
"time"
7
7
8
8
"github.com/influxdata/flux"
9
+ "github.com/influxdata/flux/ast"
9
10
"github.com/influxdata/flux/functions/inputs"
10
11
"github.com/influxdata/flux/functions/transformations"
11
12
"github.com/influxdata/flux/planner"
12
13
"github.com/influxdata/flux/planner/plantest"
13
14
"github.com/influxdata/flux/semantic"
14
15
)
15
16
17
+ func compile (fluxText string ) (* flux.Spec , error ) {
18
+ now := time .Now ().UTC ()
19
+ return flux .Compile (context .Background (), fluxText , now )
20
+ }
21
+
16
22
// Test the translation of Flux query to logical plan
17
23
func TestFluxSpecToLogicalPlan (t * testing.T ) {
18
24
testcases := []struct {
@@ -99,14 +105,12 @@ func TestFluxSpecToLogicalPlan(t *testing.T) {
99
105
t .Run (tc .name , func (t * testing.T ) {
100
106
t .Parallel ()
101
107
102
- now := time .Now ().UTC ()
103
- spec , err := flux .Compile (context .Background (), tc .query , now )
108
+ spec , err := compile (tc .query )
104
109
105
110
if err != nil {
106
111
t .Fatal (err )
107
112
}
108
113
109
- tc .spec .Now = now
110
114
want := plantest .CreateLogicalPlanSpec (tc .spec )
111
115
112
116
thePlanner := planner .NewLogicalPlanner ()
@@ -125,3 +129,197 @@ func TestFluxSpecToLogicalPlan(t *testing.T) {
125
129
})
126
130
}
127
131
}
132
+
133
+ type MergeFiltersRule struct {
134
+ }
135
+
136
+ func (MergeFiltersRule ) Name () string {
137
+ return "mergeFilters"
138
+ }
139
+
140
+ func (MergeFiltersRule ) Pattern () planner.Pattern {
141
+ return planner .Pat (transformations .FilterKind ,
142
+ planner .Pat (transformations .FilterKind ,
143
+ planner .Any ()))
144
+ }
145
+
146
+ func (MergeFiltersRule ) Rewrite (pn planner.PlanNode ) (planner.PlanNode , bool ) {
147
+ specTop := pn .ProcedureSpec ()
148
+
149
+ filterSpecTop := specTop .(* transformations.FilterProcedureSpec )
150
+ filterSpecBottom := pn .Predecessors ()[0 ].ProcedureSpec ().(* transformations.FilterProcedureSpec )
151
+ mergedFilterSpec := mergeFilterSpecs (filterSpecTop , filterSpecBottom )
152
+
153
+ return planner .MergeLogicalPlanNodes (pn , pn .Predecessors ()[0 ], mergedFilterSpec ), true
154
+ }
155
+
156
+ func mergeFilterSpecs (a , b * transformations.FilterProcedureSpec ) planner.ProcedureSpec {
157
+ fn := a .Fn .Copy ().(* semantic.FunctionExpression )
158
+
159
+ aExp , aOK := a .Fn .Body .(semantic.Expression )
160
+ bExp , bOK := b .Fn .Body .(semantic.Expression )
161
+
162
+ if ! aOK || ! bOK {
163
+ // Note that this is just a unit test, so "return" statements are not handled.
164
+ panic ("function body not expression" )
165
+ }
166
+
167
+ fn .Body = & semantic.LogicalExpression {
168
+ Operator : ast .AndOperator ,
169
+ Left : aExp ,
170
+ Right : bExp ,
171
+ }
172
+
173
+ return & transformations.FilterProcedureSpec {
174
+ Fn : fn ,
175
+ }
176
+ }
177
+
178
+ type PushFilterThroughMapRule struct {
179
+ }
180
+
181
+ func (PushFilterThroughMapRule ) Name () string {
182
+ return "pushFilterThroughMap"
183
+ }
184
+
185
+ func (PushFilterThroughMapRule ) Pattern () planner.Pattern {
186
+ return planner .Pat (transformations .FilterKind ,
187
+ planner .Pat (transformations .MapKind ,
188
+ planner .Any ()))
189
+ }
190
+
191
+ func (PushFilterThroughMapRule ) Rewrite (pn planner.PlanNode ) (planner.PlanNode , bool ) {
192
+ // It will not always be possible to push a filter through a map... but this is just a unit test.
193
+ return planner .SwapPlanNodes (pn , pn .Predecessors ()[0 ]), true
194
+ }
195
+
196
+ func init () {
197
+ planner .RegisterLogicalRule (MergeFiltersRule {})
198
+ planner .RegisterLogicalRule (PushFilterThroughMapRule {})
199
+ }
200
+
201
+ func TestLogicalPlanner (t * testing.T ) {
202
+ testcases := []struct {
203
+ name string
204
+ flux string
205
+ wantPlan plantest.LogicalPlanSpec
206
+ }{{
207
+ name : "with merge-able filters" ,
208
+ flux : `
209
+ from(bucket: "telegraf") |>
210
+ filter(fn: (r) => r._measurement == "cpu") |>
211
+ filter(fn: (r) => r._value > 0.5) |>
212
+ filter(fn: (r) => r._value < 0.9) |>
213
+ yield(name: "result")` ,
214
+ wantPlan : plantest.LogicalPlanSpec {
215
+ Nodes : []planner.PlanNode {
216
+ planner .CreateLogicalNode ("from0" , & inputs.FromProcedureSpec {Bucket : "telegraf" }),
217
+ planner .CreateLogicalNode ("merged_filter1_merged_filter2_filter3" , & transformations.FilterProcedureSpec {Fn : & semantic.FunctionExpression {
218
+ Params : []* semantic.FunctionParam {{Key : & semantic.Identifier {Name : "r" }}},
219
+ Body : & semantic.LogicalExpression {Operator : ast .AndOperator ,
220
+ Left : & semantic.LogicalExpression {Operator : ast .AndOperator ,
221
+ Left : & semantic.BinaryExpression {Operator : ast .LessThanOperator ,
222
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
223
+ Right : & semantic.FloatLiteral {Value : 0.9 }},
224
+ Right : & semantic.BinaryExpression {Operator : ast .GreaterThanOperator ,
225
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
226
+ Right : & semantic.FloatLiteral {Value : 0.5 }}},
227
+ Right : & semantic.BinaryExpression {Operator : ast .EqualOperator ,
228
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_measurement" },
229
+ Right : & semantic.StringLiteral {Value : "cpu" }}}},
230
+ }),
231
+ planner .CreateLogicalNode ("yield4" , & transformations.YieldProcedureSpec {Name : "result" }),
232
+ },
233
+ Edges : [][2 ]int {
234
+ {0 , 1 },
235
+ {1 , 2 },
236
+ },
237
+ },
238
+ },
239
+ {
240
+ name : "with swappable map and filter" ,
241
+ flux : `from(bucket: "telegraf") |> map(fn: (r) => r._value * 2.0) |> filter(fn: (r) => r._value < 10.0) |> yield(name: "result")` ,
242
+ wantPlan : plantest.LogicalPlanSpec {
243
+ Nodes : []planner.PlanNode {
244
+ planner .CreateLogicalNode ("from0" , & inputs.FromProcedureSpec {Bucket : "telegraf" }),
245
+ planner .CreateLogicalNode ("filter2_copy" , & transformations.FilterProcedureSpec {Fn : & semantic.FunctionExpression {
246
+ Params : []* semantic.FunctionParam {{Key : & semantic.Identifier {Name : "r" }}},
247
+ Body : & semantic.BinaryExpression {Operator : ast .LessThanOperator ,
248
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
249
+ Right : & semantic.FloatLiteral {Value : 10 }},
250
+ }}),
251
+ planner .CreateLogicalNode ("map1" , & transformations.MapProcedureSpec {
252
+ Fn : & semantic.FunctionExpression {
253
+ Params : []* semantic.FunctionParam {{Key : & semantic.Identifier {Name : "r" }}},
254
+ Body : & semantic.BinaryExpression {Operator : ast .MultiplicationOperator ,
255
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
256
+ Right : & semantic.FloatLiteral {Value : 2 }}},
257
+ MergeKey : true ,
258
+ }),
259
+ planner .CreateLogicalNode ("yield3" , & transformations.YieldProcedureSpec {Name : "result" }),
260
+ },
261
+ Edges : [][2 ]int {
262
+ {0 , 1 },
263
+ {1 , 2 },
264
+ {2 , 3 },
265
+ },
266
+ }},
267
+ {
268
+ name : "rules working together" ,
269
+ flux : `
270
+ from(bucket: "telegraf") |>
271
+ filter(fn: (r) => r._value != 0) |>
272
+ map(fn: (r) => r._value * 10) |>
273
+ filter(fn: (r) => f._value < 100) |>
274
+ yield(name: "result")` ,
275
+ wantPlan : plantest.LogicalPlanSpec {
276
+ Nodes : []planner.PlanNode {
277
+ planner .CreateLogicalNode ("from0" , & inputs.FromProcedureSpec {Bucket : "telegraf" }),
278
+ planner .CreateLogicalNode ("merged_filter1_filter3_copy" , & transformations.FilterProcedureSpec {Fn : & semantic.FunctionExpression {
279
+ Params : []* semantic.FunctionParam {{Key : & semantic.Identifier {Name : "r" }}},
280
+ Body : & semantic.LogicalExpression {Operator : ast .AndOperator ,
281
+ Left : & semantic.BinaryExpression {Operator : ast .LessThanOperator ,
282
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "f" }, Property : "_value" },
283
+ Right : & semantic.IntegerLiteral {Value : 100 }},
284
+ Right : & semantic.BinaryExpression {Operator : ast .NotEqualOperator ,
285
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
286
+ Right : & semantic.IntegerLiteral {}}},
287
+ }}),
288
+ planner .CreateLogicalNode ("map2" , & transformations.MapProcedureSpec {Fn : & semantic.FunctionExpression {
289
+ Params : []* semantic.FunctionParam {{Key : & semantic.Identifier {Name : "r" }}},
290
+ Body : & semantic.BinaryExpression {Operator : ast .MultiplicationOperator ,
291
+ Left : & semantic.MemberExpression {Object : & semantic.IdentifierExpression {Name : "r" }, Property : "_value" },
292
+ Right : & semantic.IntegerLiteral {Value : 10 }}},
293
+ MergeKey : true ,
294
+ }),
295
+ planner .CreateLogicalNode ("yield4" , & transformations.YieldProcedureSpec {Name : "result" }),
296
+ },
297
+ Edges : [][2 ]int {
298
+ {0 , 1 },
299
+ {1 , 2 },
300
+ {2 , 3 },
301
+ },
302
+ },
303
+ },
304
+ }
305
+
306
+ for _ , tc := range testcases {
307
+ tc := tc
308
+ t .Run (tc .name , func (t * testing.T ) {
309
+ t .Parallel ()
310
+
311
+ fluxSpec , err := compile (tc .flux )
312
+ if err != nil {
313
+ t .Fatalf ("could not compile flux query: %v" , err )
314
+ }
315
+
316
+ logicalPlanner := planner .NewLogicalPlanner ()
317
+ logicalPlan , err := logicalPlanner .Plan (fluxSpec )
318
+
319
+ wantPlan := plantest .CreateLogicalPlanSpec (& tc .wantPlan )
320
+ if err := plantest .ComparePlans (wantPlan , logicalPlan , plantest .CompareLogicalPlanNodes ); err != nil {
321
+ t .Error (err )
322
+ }
323
+ })
324
+ }
325
+ }
0 commit comments