forked from amenzhinsky/iothub
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient_module.go
158 lines (135 loc) · 4.16 KB
/
client_module.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package iotdevice
import (
"context"
"gitlab.com/michaeljohn/iothub/common"
"gitlab.com/michaeljohn/iothub/iotdevice/transport"
"gitlab.com/michaeljohn/iothub/logger"
)
// structs
// ModuleClient is an iothub device client adapted for use with a module
// connection. It embeds Client, so Client's fields and methods are promoted
// onto ModuleClient; the methods declared on *ModuleClient in this file read
// module-specific values from the embedded client's credentials.
type ModuleClient struct {
Client
}
// functions
// NewModuleFromConnectionString builds a ModuleClient whose credentials are
// parsed from the connection string cs and then completed with the supplied
// gatewayHostName, moduleGenerationID, workloadURI and edge flag.
//
// An error from ParseModuleConnectionString is returned unchanged together
// with a nil client; otherwise the result of NewModule is returned.
func NewModuleFromConnectionString(
	transport transport.Transport,
	cs, gatewayHostName, moduleGenerationID, workloadURI string,
	edge bool,
	opts ...ClientOption,
) (*ModuleClient, error) {
	moduleCreds, parseErr := ParseModuleConnectionString(cs)
	if parseErr != nil {
		return nil, parseErr
	}

	// These values are not filled in by ParseModuleConnectionString, so they
	// are copied over from the arguments.
	moduleCreds.Gateway = gatewayHostName
	moduleCreds.GenerationID = moduleGenerationID
	moduleCreds.WorkloadURI = workloadURI
	moduleCreds.EdgeGateway = edge

	return NewModule(transport, moduleCreds, opts...)
}
// NewModuleFromEnvironment builds a ModuleClient whose credentials come from
// ParseModuleEnvironmentVariables, with the EdgeGateway flag set to edge.
//
// An error from ParseModuleEnvironmentVariables is returned unchanged
// together with a nil client; otherwise the result of NewModule is returned.
func NewModuleFromEnvironment(
	transport transport.Transport,
	edge bool,
	opts ...ClientOption,
) (*ModuleClient, error) {
	envCreds, envErr := ParseModuleEnvironmentVariables()
	if envErr != nil {
		return nil, envErr
	}

	envCreds.EdgeGateway = edge

	return NewModule(transport, envCreds, opts...)
}
// ParseModuleEnvironmentVariables returns a ModuleSharedAccessKeyCredentials
// populated from the values reported by common.GetEdgeModuleEnvironmentVariables.
//
// Only the device ID, hub host name, module ID, workload URI, generation ID
// and gateway host name are filled in; every other field keeps its zero
// value. An error from the lookup is returned unchanged.
func ParseModuleEnvironmentVariables() (*ModuleSharedAccessKeyCredentials, error) {
	env, err := common.GetEdgeModuleEnvironmentVariables()
	if err != nil {
		return nil, err
	}

	// Assemble the nested credentials from the inside out.
	accessKey := common.SharedAccessKey{
		HostName: env["IOTHubHostName"],
	}
	deviceCreds := SharedAccessKeyCredentials{
		DeviceID:        env["DeviceID"],
		SharedAccessKey: accessKey,
	}
	moduleCreds := &ModuleSharedAccessKeyCredentials{
		SharedAccessKeyCredentials: deviceCreds,
		ModuleID:                   env["ModuleID"],
		WorkloadURI:                env["WorkloadAPI"],
		GenerationID:               env["GenerationID"],
		Gateway:                    env["GatewayHostName"],
	}
	return moduleCreds, nil
}
// ParseModuleConnectionString returns a ModuleSharedAccessKeyCredentials with
// the device ID, module ID, host name, shared access key name and shared
// access key taken from the connection string cs.
//
// The string is parsed by common.ParseConnectionString, which is also handed
// the "DeviceId" and "ModuleId" keys (presumably as required keys — confirm
// against that helper). A parse error is returned unchanged.
func ParseModuleConnectionString(cs string) (*ModuleSharedAccessKeyCredentials, error) {
	fields, err := common.ParseConnectionString(cs, "DeviceId", "ModuleId")
	if err != nil {
		return nil, err
	}

	// Assemble the nested credentials from the inside out.
	accessKey := common.SharedAccessKey{
		HostName:            fields["HostName"],
		SharedAccessKeyName: fields["SharedAccessKeyName"],
		SharedAccessKey:     fields["SharedAccessKey"],
	}
	deviceCreds := SharedAccessKeyCredentials{
		DeviceID:        fields["DeviceId"],
		SharedAccessKey: accessKey,
	}
	moduleCreds := &ModuleSharedAccessKeyCredentials{
		SharedAccessKeyCredentials: deviceCreds,
		ModuleID:                   fields["ModuleId"],
	}
	return moduleCreds, nil
}
// NewModule returns a new ModuleClient that uses the given transport and
// credentials.
//
// The embedded Client is initialised with fresh ready/done channels, a
// warn-level logger and new event, twin-state and method muxes. Each option
// in opts is then applied to the embedded Client in order, and finally the
// transport is handed the client's logger. The returned error is always nil.
func NewModule(
	transport transport.Transport, creds transport.Credentials, opts ...ClientOption,
) (*ModuleClient, error) {
	mc := &ModuleClient{}

	// Initialise the embedded Client in place.
	base := &mc.Client
	base.tr = transport
	base.creds = creds
	base.ready = make(chan struct{})
	base.done = make(chan struct{})
	base.logger = logger.New(logger.LevelWarn, nil)
	base.evMux = newEventsMux()
	base.tsMux = newTwinStateMux()
	base.dmMux = newMethodMux()

	for i := range opts {
		opts[i](base)
	}

	// The transport shares the client's logger; this runs after the options
	// so that a logger installed by an option is the one handed over.
	base.tr.SetLogger(base.logger)

	return mc, nil
}
// methods
// ModuleID returns the module ID reported by the client's credentials.
func (c *ModuleClient) ModuleID() string {
	moduleID := c.Client.creds.GetModuleID()
	return moduleID
}
// GenerationID returns the generation ID reported by the client's credentials.
func (c *ModuleClient) GenerationID() string {
	generationID := c.Client.creds.GetGenerationID()
	return generationID
}
// Gateway returns the gateway host name reported by the client's credentials.
func (c *ModuleClient) Gateway() string {
	gateway := c.Client.creds.GetGateway()
	return gateway
}
// Broker returns the broker value reported by the client's credentials.
func (c *ModuleClient) Broker() string {
	broker := c.Client.creds.GetBroker()
	return broker
}
// SubscribeTwinUpdates subscribes to module desired state changes.
// It returns a *TwinStateSub obtained from the client's twin-state mux.
//
// The connection is checked first; if that check or the transport
// subscription fails, the error is returned with a nil subscription.
func (c *ModuleClient) SubscribeTwinUpdates(ctx context.Context) (*TwinStateSub, error) {
	connErr := c.checkConnection(ctx)
	if connErr != nil {
		return nil, connErr
	}

	// The transport-level subscription is handed to the mux's once helper
	// (presumably so it is only performed a single time — confirm against
	// the mux implementation).
	subscribe := func() error {
		return c.tr.SubscribeTwinUpdates(ctx, c.tsMux)
	}
	subErr := c.tsMux.once(subscribe)
	if subErr != nil {
		return nil, subErr
	}

	return c.tsMux.sub(), nil
}
// UnsubscribeTwinUpdates removes the given subscription from the client's
// twin-state mux.
func (c *ModuleClient) UnsubscribeTwinUpdates(sub *TwinStateSub) {
	mux := c.Client.tsMux
	mux.unsub(sub)
}