Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit df79295

Browse files
author
Christopher Wolff
committedDec 17, 2022
fix: update iox.sql to detect midstream errors
1 parent b9d6eb6 commit df79295

File tree

2 files changed

+33
-2
lines changed

2 files changed

+33
-2
lines changed
 

‎dependencies/iox/client.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,18 @@ func GetProvider(ctx context.Context) Provider {
4242
return p.(Provider)
4343
}
4444

45+
// RecordReader is similar to the RecordReader interface provided by Arrow's array
46+
// package, but includes a method for detecting errors that are sent mid-stream.
47+
type RecordReader interface {
48+
array.RecordReader
49+
Err() error
50+
}
51+
4552
// Client provides a way to query an iox instance.
4653
type Client interface {
4754
// Query will initiate a query using the given query string, parameters, and memory allocator
4855
// against the iox instance. It returns an array.RecordReader from the arrow flight api.
49-
Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (array.RecordReader, error)
56+
Query(ctx context.Context, query string, params []interface{}, mem memory.Allocator) (RecordReader, error)
5057

5158
// GetSchema will retrieve a schema for the given table if this client supports that capability.
5259
// If this Client doesn't support this capability, it should return a flux error with the code

‎stdlib/experimental/iox/source.go

+25-1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ import (
1717
"github.com/influxdata/flux/internal/function"
1818
"github.com/influxdata/flux/memory"
1919
"github.com/influxdata/flux/plan"
20+
"github.com/opentracing/opentracing-go"
21+
"github.com/opentracing/opentracing-go/ext"
22+
"github.com/opentracing/opentracing-go/log"
2023
)
2124

2225
const SqlKind = "experimental/iox.sql"
@@ -116,6 +119,10 @@ func (s *sqlSource) createSchema(schema *stdarrow.Schema) ([]flux.ColMeta, error
116119
}
117120

118121
func (s *sqlSource) run(ctx context.Context) error {
122+
span, ctx := opentracing.StartSpanFromContext(ctx, "sqlSouce.run")
123+
defer span.Finish()
124+
span.LogFields(log.String("query", s.query))
125+
119126
// Note: query args are not actually supported yet, see
120127
// https://github.com/influxdata/influxdb_iox/issues/3718
121128
rr, err := s.client.Query(ctx, s.query, nil, s.mem)
@@ -130,14 +137,31 @@ func (s *sqlSource) run(ctx context.Context) error {
130137
}
131138
key := execute.NewGroupKey(nil, nil)
132139

133-
for rr.Next() {
140+
hasMore, err := nextRecordBatch(rr)
141+
for hasMore && err == nil {
134142
if err := s.produce(key, cols, rr.Record()); err != nil {
143+
ext.LogError(span, err)
135144
return err
136145
}
146+
hasMore, err = nextRecordBatch(rr)
137147
}
148+
if err != nil {
149+
ext.LogError(span, err)
150+
return err
151+
}
152+
138153
return nil
139154
}
140155

156+
func nextRecordBatch(rr iox.RecordReader) (bool, error) {
157+
n := rr.Next()
158+
if n {
159+
return true, nil
160+
}
161+
162+
return false, rr.Err()
163+
}
164+
141165
func (s *sqlSource) produce(key flux.GroupKey, cols []flux.ColMeta, record stdarrow.Record) error {
142166
buffer := arrow.TableBuffer{
143167
GroupKey: key,

0 commit comments

Comments
 (0)
Please sign in to comment.