Skip to content

Commit 5ffcf36

Browse files
authored
feat(dependencies/mqtt): add a dependency for mqtt (#4169)
1 parent c6ef543 commit 5ffcf36

File tree

6 files changed

+195
-29
lines changed

6 files changed

+195
-29
lines changed

dependencies/mqtt/mqtt.go

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package mqtt
2+
3+
import (
4+
"context"
5+
"io"
6+
"time"
7+
8+
mqtt "github.com/eclipse/paho.mqtt.golang"
9+
"github.com/influxdata/flux"
10+
"github.com/influxdata/flux/codes"
11+
"github.com/influxdata/flux/internal/errors"
12+
)
13+
14+
const (
15+
DefaultTimeout = 1 * time.Second
16+
DefaultClientID = "flux-mqtt"
17+
)
18+
19+
type key int
20+
21+
const clientKey key = iota
22+
23+
// Inject will inject this Dialer into the dependency chain.
24+
func Inject(ctx context.Context, dialer Dialer) context.Context {
25+
return context.WithValue(ctx, clientKey, dialer)
26+
}
27+
28+
// Dependency will inject the Dialer into the dependency chain.
29+
type Dependency struct {
30+
Dialer Dialer
31+
}
32+
33+
// Inject will inject the Dialer into the dependency chain.
34+
func (d Dependency) Inject(ctx context.Context) context.Context {
35+
return Inject(ctx, d.Dialer)
36+
}
37+
38+
// GetDialer will return the Dialer for the current context.
39+
// If no Dialer has been injected into the dependencies,
40+
// this will return a default provider.
41+
func GetDialer(ctx context.Context) Dialer {
42+
p := ctx.Value(clientKey)
43+
if p == nil {
44+
return DefaultProvider{}
45+
}
46+
return p.(Dialer)
47+
}
48+
49+
// Options contains additional options for configuring the mqtt client.
50+
type Options struct {
51+
ClientID string
52+
Username string
53+
Password string
54+
Timeout time.Duration
55+
}
56+
57+
// Dialer provides a method to connect a client to one or more mqtt brokers.
58+
type Dialer interface {
59+
// Dial will connect to the given brokers and return a Client.
60+
Dial(ctx context.Context, brokers []string, options Options) (Client, error)
61+
}
62+
63+
// Client is an mqtt client that can publish to an mqtt broker.
64+
type Client interface {
65+
// Publish will publish the payload to a particular topic.
66+
Publish(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error
67+
68+
io.Closer
69+
}
70+
71+
// DefaultProvider is the default provider that uses the default mqtt client.
72+
type DefaultProvider struct{}
73+
74+
func (p DefaultProvider) Dial(ctx context.Context, brokers []string, options Options) (Client, error) {
75+
if len(brokers) == 0 {
76+
return nil, errors.New(codes.Invalid, "at least one broker is required for mqtt")
77+
}
78+
opts := mqtt.NewClientOptions()
79+
for _, broker := range brokers {
80+
opts.AddBroker(broker)
81+
}
82+
83+
deps := flux.GetDependencies(ctx)
84+
if url, err := deps.URLValidator(); err != nil {
85+
return nil, err
86+
} else {
87+
for _, broker := range opts.Servers {
88+
if err := url.Validate(broker); err != nil {
89+
return nil, err
90+
}
91+
}
92+
}
93+
94+
if options.ClientID != "" {
95+
opts.SetClientID(options.ClientID)
96+
} else {
97+
opts.SetClientID(DefaultClientID)
98+
}
99+
100+
if options.Timeout > 0 {
101+
opts.SetConnectTimeout(options.Timeout)
102+
} else {
103+
opts.SetConnectTimeout(DefaultTimeout)
104+
}
105+
106+
if options.Username != "" {
107+
opts.SetUsername(options.Username)
108+
if options.Password != "" {
109+
opts.SetPassword(options.Password)
110+
}
111+
}
112+
113+
client := mqtt.NewClient(opts)
114+
if token := client.Connect(); token.Wait() && token.Error() != nil {
115+
return nil, token.Error()
116+
}
117+
return &defaultClient{
118+
client: client,
119+
timeout: options.Timeout,
120+
}, nil
121+
}
122+
123+
type defaultClient struct {
124+
client mqtt.Client
125+
timeout time.Duration
126+
}
127+
128+
func (d *defaultClient) Publish(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error {
129+
token := d.client.Publish(topic, qos, retain, payload)
130+
if !token.WaitTimeout(d.timeout) {
131+
return errors.New(codes.Canceled, "mqtt publish: timeout reached")
132+
} else if err := token.Error(); err != nil {
133+
return err
134+
}
135+
return nil
136+
}
137+
138+
func (d *defaultClient) Close() error {
139+
d.client.Disconnect(250)
140+
return nil
141+
}

mock/mqtt_provider.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package mock
2+
3+
import (
4+
"context"
5+
6+
"github.com/influxdata/flux/dependencies/mqtt"
7+
)
8+
9+
type MqttDialer struct {
10+
DialFn func(ctx context.Context, brokers []string, options mqtt.Options) (mqtt.Client, error)
11+
}
12+
13+
func (m MqttDialer) Dial(ctx context.Context, brokers []string, options mqtt.Options) (mqtt.Client, error) {
14+
return m.DialFn(ctx, brokers, options)
15+
}
16+
17+
type MqttClient struct {
18+
PublishFn func(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error
19+
CloseFn func() error
20+
}
21+
22+
func (m MqttClient) Publish(ctx context.Context, topic string, qos byte, retain bool, payload interface{}) error {
23+
return m.PublishFn(ctx, topic, qos, retain, payload)
24+
}
25+
26+
func (m MqttClient) Close() error {
27+
if m.CloseFn == nil {
28+
return nil
29+
}
30+
return m.CloseFn()
31+
}

stdlib/experimental/mqtt/mqtt.go

Lines changed: 15 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package mqtt
22

33
import (
4+
"context"
45
"time"
56

6-
MQTT "github.com/eclipse/paho.mqtt.golang"
77
"github.com/influxdata/flux"
88
"github.com/influxdata/flux/codes"
9+
"github.com/influxdata/flux/dependencies/mqtt"
910
"github.com/influxdata/flux/internal/errors"
1011
"github.com/influxdata/flux/values"
1112
)
@@ -88,32 +89,22 @@ func (o *CommonMQTTOpSpec) ReadArgs(args flux.Arguments) error {
8889
return nil
8990
}
9091

91-
func publish(topic, message string, spec *CommonMQTTOpSpec) (bool, error) {
92-
opts := MQTT.NewClientOptions().AddBroker(spec.Broker)
93-
if spec.ClientID != "" {
94-
opts.SetClientID(spec.ClientID)
95-
} else {
96-
opts.SetClientID(DefaultClientID)
97-
}
98-
if spec.Timeout > 0 {
99-
opts.SetConnectTimeout(spec.Timeout)
100-
}
101-
if spec.Username != "" {
102-
opts.SetUsername(spec.Username)
103-
if spec.Password != "" {
104-
opts.SetPassword(spec.Password)
105-
}
92+
func publish(ctx context.Context, topic, message string, spec *CommonMQTTOpSpec) (bool, error) {
93+
options := mqtt.Options{
94+
ClientID: spec.ClientID,
95+
Username: spec.Username,
96+
Password: spec.Password,
97+
Timeout: spec.Timeout,
10698
}
107-
108-
client := MQTT.NewClient(opts)
109-
if token := client.Connect(); token.Wait() && token.Error() != nil {
110-
return false, token.Error()
99+
provider := mqtt.GetDialer(ctx)
100+
client, err := provider.Dial(ctx, []string{spec.Broker}, options)
101+
if err != nil {
102+
return false, err
111103
}
112-
defer client.Disconnect(250)
104+
defer func() { _ = client.Close() }()
113105

114-
if token := client.Publish(topic, byte(spec.QoS), spec.Retain, message); token.Wait() && token.Error() != nil {
115-
return false, token.Error()
106+
if err := client.Publish(ctx, topic, byte(spec.QoS), spec.Retain, message); err != nil {
107+
return false, err
116108
}
117-
118109
return true, nil
119110
}

stdlib/experimental/mqtt/publish.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func init() {
3838
return nil, errors.New(codes.Invalid, "empty message")
3939
}
4040

41-
published, err := publish(topic, message, spec)
41+
published, err := publish(ctx, topic, message, spec)
4242
if err != nil {
4343
return nil, err
4444
}

stdlib/experimental/mqtt/to.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package mqtt
22

33
import (
4+
"context"
45
"encoding/json"
56
"net/url"
67
"sort"
@@ -193,12 +194,13 @@ func createToMQTTTransformation(id execute.DatasetID, mode execute.AccumulationM
193194
}
194195
cache := execute.NewTableBuilderCache(a.Allocator())
195196
d := execute.NewDataset(id, mode, cache)
196-
t := NewToMQTTTransformation(d, cache, s)
197+
t := NewToMQTTTransformation(a.Context(), d, cache, s)
197198
return t, d, nil
198199
}
199200

200201
type ToMQTTTransformation struct {
201202
execute.ExecutionNode
203+
ctx context.Context
202204
d execute.Dataset
203205
cache execute.TableBuilderCache
204206
spec *ToMQTTProcedureSpec
@@ -208,8 +210,9 @@ func (t *ToMQTTTransformation) RetractTable(id execute.DatasetID, key flux.Group
208210
return t.d.RetractTable(key)
209211
}
210212

211-
func NewToMQTTTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *ToMQTTProcedureSpec) *ToMQTTTransformation {
213+
func NewToMQTTTransformation(ctx context.Context, d execute.Dataset, cache execute.TableBuilderCache, spec *ToMQTTProcedureSpec) *ToMQTTTransformation {
212214
return &ToMQTTTransformation{
215+
ctx: ctx,
213216
d: d,
214217
cache: cache,
215218
spec: spec,
@@ -366,7 +369,7 @@ func (t *ToMQTTTransformation) Process(id execute.DatasetID, tbl flux.Table) err
366369
topic = m.createTopic(message)
367370
}
368371
spec := &t.spec.Spec.CommonMQTTOpSpec
369-
publish(topic, message, spec)
372+
publish(t.ctx, topic, message, spec)
370373
}
371374

372375
return nil

stdlib/experimental/mqtt/to_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -542,7 +542,7 @@ func TestToMQTT_Process(t *testing.T) {
542542
tc.want.Table,
543543
nil,
544544
func(d execute.Dataset, c execute.TableBuilderCache) execute.Transformation {
545-
return mqtt.NewToMQTTTransformation(d, c, tc.spec)
545+
return mqtt.NewToMQTTTransformation(context.Background(), d, c, tc.spec)
546546
},
547547
)
548548
msg, err := receive(received)

0 commit comments

Comments
 (0)