Skip to content

Commit

Permalink
docs(conn): connection guide (#3186)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <[email protected]>
  • Loading branch information
ngjaying authored Sep 12, 2024
1 parent 1a2a63e commit 702dcf7
Show file tree
Hide file tree
Showing 9 changed files with 278 additions and 139 deletions.
8 changes: 8 additions & 0 deletions docs/directory.json
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,10 @@
]
}
]
},
{
"title": "连接管理",
"path": "guide/connections/overview"
}
]
},
Expand Down Expand Up @@ -1120,6 +1124,10 @@
]
}
]
},
{
"title": "Connection Management",
"path": "guide/connections/overview"
}
]
},
Expand Down
134 changes: 134 additions & 0 deletions docs/en_US/guide/connections/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
# Connection Management

Sources and Sinks are used for interacting with external systems, which involve connecting to external resources. This
chapter primarily explains how eKuiper manages connections.

## Connection Types

Different connection types vary in complexity. For example, MQTT long connections require attention to connection
status, and there may be disconnections during rule execution that necessitate automatic reconnection. On the other
hand, HTTP connections are stateless by default, making their state management simpler. To unify the management of
complex connection resources, including creation, reuse, automatic reconnection, and obtaining connection status,
eKuiper v2 introduced an internal connection pool component and adapted a series of connection types:

- MQTT Connection
- Neuron Connection
- EdgeX Connection
- SQL Connection
- HTTP Connection (including REST sink, HTTP Pull source, and HTTP push source connections)
- WebSocket Connection

Other connection types may be gradually integrated in subsequent versions. Connection types integrated into the
connection pool can be independently created via API and accessed.

In eKuiper, the lifecycle management of various connections is divided into three types:

1. **Connection Attached to Rule**: By default, connections are managed by the Source/Sink implementation itself, with
their lifecycle controlled by the rule in use. The connection resources will only start connecting when the rule is
started; when the rule ends, the connection will be closed. In the example below, we create a data stream `memStream`
of type `memory`. Since this type is not integrated into the connection pool, the connection will only be established
when a rule using this stream is started.

```sql
create stream memStream () WITH (TYPE="mqtt", DATASOURCE="demo")
```

2. **Anonymous Connection Resource Managed by Connection Pool**: Some connection types are adapted to the connection
pool management interface, with their lifecycle managed by the connection pool. When a rule containing these types of
connections is started, the rule will obtain an anonymous (actual resource id generated by the rule and not shared)
resource from the connection pool. In the example below, we create a data stream `mqttStream` of type `mqtt`. The
connection is anonymous, and since this type is adapted to the connection pool, we can retrieve this connection via
the connection API. When the rule is deleted, the corresponding connection will also be deleted.

```sql
create stream mqttStream () WITH (TYPE="mqtt", DATASOURCE="demo")
```

3. **User-Created Connection Resource**: Users can add, delete, modify, and query resources via
the [Connection Management API](../../api/restapi/connection.md). Resources created via the API must specify a unique
id, which can be referenced in rules. **Note**: Only connection resources adapted to the connection pool can be
managed via the API. Connections created in this way are independent physical connections that run immediately after
creation and do not depend on rules. They can be shared by multiple rules or multiple sources/sinks.

**Note**: User-created connections are physical connections that will automatically reconnect until the connection is
successful.

### Connection Reuse

User-created connection resources can run independently, and multiple rules can reference this named resource.
Connection reuse is configured through the `connectionSelector` configuration item. Users only need to create the
connection resource once for reuse, improving connection management efficiency and simplifying the configuration
process.

1. **Create Resource**: As shown in the example below, create a connection with id `mqttcon1` via the API. The
connection parameters required can be configured in `props`. After the connection is successfully created, `mqttcon1`
can be found in the connection list API.

```shell
POST http://localhost:9081/connections
{
"id": "mqttcon1"
"typ":"mqtt",
"props": {
server: "tcp://127.0.0.1:1883"
}
}
```

2. **Use in Data Source**: When configuring the MQTT source (`$ekuiper/etc/mqtt_source.yaml`), you can reference the
above connection configuration via `connectionSelector`, for example, both `demo_conf` and `demo2_conf` will
reference the connection configuration of `mqttcon1`.

```yaml
#Override the global configurations
demo_conf: #Conf_key
qos: 0
connectionSelector: mqttcon1
servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
#Override the global configurations
demo2_conf: #Conf_key
qos: 0
connentionSelector: mqttcon1
servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
```

Based on `demo_conf` and `demo2_conf`, create two data streams `demo` and `demo2`:

```text
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
demo2 (
...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
```

When corresponding rules reference the above data streams, the source parts of the rules will share the connection.
Here, `DATASOURCE` corresponds to the MQTT subscription topic, and the `qos` in the configuration item will be used as
the `Qos` for subscription. In the example configuration above, `demo` subscribes to topic `test/` with Qos 0,
and `demo2` subscribes to topic `test2/` with Qos 0.

::: tip

For MQTT sources, if two streams have the same `DATASOURCE` but different `qos` values, only the rule that starts first
will trigger the subscription.

:::

You can also reuse the defined connection resource in the rule's action via `connentionSelector`.
## Connection Status
Connection status is divided into three types:
1. **Connected**, represented by 1 in metrics.
2. **Connecting**, represented by 0 in metrics.
3. **Disconnected**, represented by 1 in metrics.
Users can retrieve the connection status via the connection API. Additionally, users can view the connection status in
the rule's source/sink metrics, for example, the `source_demo_0_connection_status` metric indicates the connection
status of the `demo` stream. For a complete list of supported connection metrics, please refer to
the [Metrics List](../../operation/usage/monitor_with_prometheus.md#metric-types).
73 changes: 0 additions & 73 deletions docs/en_US/guide/connector.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,79 +73,6 @@ For specialized data dispatch requirements or integrations with particular platf

[Data templates](./sinks/data_template.md) in eKuiper allow for "secondary processing" of analysis results to cater to the diverse formatting requirements of different sink systems. Utilizing the Golang template system, eKuiper provides mechanisms for dynamic data transformation, conditional outputs, and iterative processing. This ensures compatibility and precise formatting for various sinks.

## Connection Selector

The connector selector is a powerful feature in eKuiper that allows users to define a connection once and reuse it across multiple configurations. It ensures efficient connection management and reduces redundancy.

To define a global connection configuration, use the `connectionSelector` key to name your connection, e.g., `mqtt.localConnection`. Override global configurations with custom configurations but reference the same `connectionSelector`.

For example, consider the configurations `demo_conf` and `demo2_conf`:

```yaml
#Override the global configurations
demo_conf: #Conf_key
qos: 0
connectionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]

#Override the global configurations
demo2_conf: #Conf_key
qos: 0
connentionSelector: mqtt.localConnection
servers: [tcp://10.211.55.6:1883, tcp://127.0.0.1]
```
Both configurations reference the same `connectionSelector`, indicating that they utilize the same MQTT connection. When streams `demo` and `demo2` are defined based on these configurations:

```text
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
demo2 (
...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
```

They inherently share the MQTT connection. Specifically:

- The stream `demo` subscribes to the MQTT topic `test/` with a QoS of 0.
- The stream `demo2` subscribes to `test2/`, also with a QoS of 0.

::: tip

For MQTT sources, if two streams have the same `DATASOURCE` but differing `qos` values, only the rule started first will trigger a subscription.

:::

**Configuration**

The actual connection profiles, like `mqtt.localConnection`, are usually defined in a separate file, such as `connections/connection.yaml`.

Example

```yaml
mqtt:
localConnection: #connection key
server: "tcp://127.0.0.1:1883"
username: ekuiper
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
clientid: ekuiper
cloudConnection: #connection key
server: "tcp://broker.emqx.io:1883"
username: user1
password: password
#certificationPath: /var/kuiper/xyz-certificate.pem
#privateKeyPath: /var/kuiper/xyz-private.pem.ke
#insecureSkipVerify: false
#protocolVersion: 3
```

## Batch Configuration

For advanced data stream processing, eKuiper offers an array of connectors like Memory, File, MQTT, and more. To streamline the integration, eKuiper’s REST API introduces the capability for batch configuration, allowing users to simultaneously import or export multiple configurations.
Expand Down
3 changes: 3 additions & 0 deletions docs/en_US/guide/rules/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,6 @@ When we try to send a record to the stream, the status of the rule is obtained a
```

It can be seen that `records_in_total` and `records_out_total` of each operator have changed from 0 to 1, which means that the operator has received a record and passed a record to the next operator, and finally sent to the `sink` and the `sink` wrote 1 record.

If Prometheus configuration is enabled, these metrics will also be collected by Prometheus. For a complete list of
operational metrics, please refer to the [Metrics List](../../operation/usage/monitor_with_prometheus.md#metric-types).
8 changes: 8 additions & 0 deletions docs/en_US/operation/usage/monitor_with_prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ After version 1.6.1, we added two more exception-related metrics to facilitate t
- last_exception: the error message of the last exception.
- last_exception_time: the time of the last exception.

After version 2.0.0, we added connection-related metrics for source/sink.

- connection_status: Connection status. 1 for connected, 0 for connecting, -1 for disconnected.
- connection_last_connected_time: The last successful connection time.
- connection_last_disconnected_time: The last disconnection time.
- connection_last_disconnected_message: The message of the last disconnection exception.
- connection_last_try_time: The last reconnection attempt time.

The numeric types of these metrics can all be monitored using Prometheus. In the next section we will describe how to configure the Prometheus service in eKuiper.

## Configuring the Prometheus Service in eKuiper
Expand Down
114 changes: 114 additions & 0 deletions docs/zh_CN/guide/connections/overview.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
# 连接管理

Source 和 Sink 用于与外部系统的交互,其中都会包含连接外部资源的动作。本章主要讲解 eKuiper 中对连接的管理。

## 连接类型

不同的连接类型复杂度各不相同,例如 MQTT 长连接需要关注连接的状态,规则运行中可能出现连接的断连,需要自动重连等复杂的管理;而
HTTP 连接默认为无状态的连接,状态管理较为简单。为了统一管理复杂的连接资源创建,复用,自动重连以及获取连接状态等功能,eKuiper
v2 增加了内部的连接池组件,并适配了一系列连接类型:

- MQTT 连接
- Neuron 连接
- EdgeX 连接
- SQL 连接
- HTTP 连接 (包括 REST sink,HTTP Pull source,HTTP push source 使用的连接)
- WebSocket 连接

其余连接类型可能会在后续版本中陆续接入。接入连接池的连接类型可通过 API 进行资源的独立创建,并获取 API。

eKuiper 中对各种连接的生命周期的管理分为 3 种:

1. 连接附属于规则:默认情况下,连接由 Source/Sink 的实现自行管理,其生命周期由使用的规则进行控制。规则启动时,使用到的连接资源才会开始连接;规则结束时,连接将被关闭。在下例中,我们创建了
memory 类型的数据流 memStream。由于该类型未接入连接池,只有当使用该流的规则启动时,才会进行连接。

```sql
create stream memStream () WITH (TYPE="mqtt", DATASOURCE="demo")
```

2. 连接池管理的匿名连接资源:部分连接类型适配了连接池管理接口,其生命周期由连接池管理。当启动包含这些类型连接的规则时,规则会从连接池获取匿名(实际资源
id 由规则生成,且不会被共享)资源。在下例中,我们创建了 mqtt 类型的数据流 mqttStream。连接为匿名连接,由于该类型适配了连接池,我们可以在连接
API 中获取到该连接。规则删除时,对应连接也会删除。

```sql
create stream mqttStream () WITH (TYPE="mqtt", DATASOURCE="demo")
```

3. 用户创建的连接资源:用户可通过[连接管理API](../../api/restapi/connection.md) 进行资源的增删改查。通过 API 创建的资源必须指定唯一的
id,规则中可引用此处创建的规则资源。**请注意**:只有适配连接池的连接资源可通过 API
进行管理。这种连接类型创建的连接为独立的物理连接,创建完后会立即运行,无需依附于规则。它可以被多个规则,或者多个
source/sink 共用。

**请注意**:用户创建的连接为实体连接,会自动重连直到连接成功为止。

### 连接重用

用户创建的连接资源可以独立运行,多个规则可以引用该命名资源。连接重用是通过 `connectionSelector`
配置项进行配置。用户只需创建一次连接资源即可复用,提升连接管理效率,简化配置流程。

1. 创建资源,如下例通过 API 创建 id 为 mqttcon1 的连接。可在 props 中配置连接需要的参数。连接创建成功后,mqttcon1 可在连接列表
API 中找到。

```shell
POST http://localhost:9081/connections
{
"id": "mqttcon1"
"typ":"mqtt",
"props": {
server: "tcp://127.0.0.1:1883"
}
}
```

2. 在数据源中使用。在配置 MQTT 源(`$ekuiper/etc/mqtt_source.yaml`)时,可通过 `connectionSelector`
引用以上连接配置,例如`demo_conf``demo2_conf` 都将引用 `mqttcon1` 的连接配置。

```yaml
#Override the global configurations
demo_conf: #Conf_key
qos: 0
connectionSelector: mqttcon1
servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
#Override the global configurations
demo2_conf: #Conf_key
qos: 0
connentionSelector: mqttcon1
servers: [ tcp://10.211.55.6:1883, tcp://127.0.0.1 ]
```

基于 `demo_conf``demo2_conf` 分别创建两个数据流 `demo``demo2`

```text
demo (
...
) WITH (DATASOURCE="test/", FORMAT="JSON", CONF_KEY="demo_conf");
demo2 (
...
) WITH (DATASOURCE="test2/", FORMAT="JSON", CONF_KEY="demo2_conf");
```

当相应的规则分别引用以上数据流时,规则之间的源部分将共享连接。在这里 `DATASOURCE` 对应 mqtt 订阅的 topic,配置项中的 `qos`
将用作订阅时的 `Qos`。在以上示例配置中,`demo` 以 Qos 0 订阅 topic `test/``demo2` 以 Qos 0 订阅 topic `test2/`

::: tip

对于MQTT源,如果两个流具有相同的 `DATASOURCE``qos` 值不同,则只有先启动的规则才会触发订阅。

:::

也可以在规则的 action 中,通过 connentionSelector 重用定义的连接资源。

## 连接状态

连接状态分成 3 种:

1. 已连接,指标中用 1 表示。
2. 连接中,指标中用 0 表示。
3. 未连接,指标中用 1 表示。

用户可通过连接 API 获取连接的状态。同时,用户也可通过规则的指标查看规则 source/sink
中连接的状态,例如 `source_demo_0_connection_status` 指标表示 demo
流的连接状态。所有支持的连接指标请查看[指标列表](../../operation/usage/monitor_with_prometheus.md#运行指标)。
Loading

0 comments on commit 702dcf7

Please sign in to comment.