Skip to content

Commit

Permalink
for shangfei
Browse files Browse the repository at this point in the history
  • Loading branch information
iznauy committed Aug 23, 2021
1 parent c886894 commit 987d674
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 11 deletions.
13 changes: 3 additions & 10 deletions conf/config.properties
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ username=root
password=root

# 时序数据库列表,使用','分隔不同实例
storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=edge1,127.0.0.1#6668#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=,127.0.0.1#6669#iotdb#username=root#password=root#sessionPoolSize=100#edgeName=
#storageEngineList=127.0.0.1#8086#influxdb#url=http://localhost:8086/
storageEngineList=127.0.0.1#6667#iotdb#username=root#password=root#sessionPoolSize=30#edgeName=edge1

# 异步请求最大重复次数
maxAsyncRetryTimes=3
Expand All @@ -36,12 +35,6 @@ databaseClassNames=iotdb=cn.edu.tsinghua.iginx.iotdb.IoTDBPlanExecutor,influxdb=
# 策略类名
policyClassName=cn.edu.tsinghua.iginx.policy.cloud.EdgeCloudCollaborationPolicy

# 统计信息收集类
# statisticsCollectorClassName=cn.edu.tsinghua.iginx.statistics.StatisticsCollector

# 统计信息打印间隔,单位毫秒
# statisticsLogInterval=1000

####################
### Rest 服务配置
####################
Expand Down Expand Up @@ -87,7 +80,7 @@ mqtt_port=1883

mqtt_handler_pool_size=1

mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.JsonPayloadFormatter
mqtt_payload_formatter=cn.edu.tsinghua.iginx.mqtt.ShangFeiPayloadFormatter

mqtt_max_message_size=1048576

Expand All @@ -99,7 +92,7 @@ mqtt_max_message_size=1048576
enable_edge_cloud_collaboration=true

# iginx 是否为边缘端,默认为非边缘端
is_edge=false
is_edge=true

# 边缘端名字,所有由边缘端写入的序列均包含该前缀;非边缘端不需要填写
<<<<<<< HEAD
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package cn.edu.tsinghua.iginx.mqtt;

import cn.edu.tsinghua.iginx.conf.Config;
import cn.edu.tsinghua.iginx.conf.ConfigDescriptor;
import cn.edu.tsinghua.iginx.thrift.DataType;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import io.netty.buffer.ByteBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ShangFeiPayloadFormatter implements IPayloadFormatter {

private static final Gson gson = new GsonBuilder().create();

private static final Logger logger = LoggerFactory.getLogger(ShangFeiPayloadFormatter.class);

private final Config config = ConfigDescriptor.getInstance().getConfig();

@Override
public List<Message> format(ByteBuf payload) {
if (payload == null) {
return null;
}
String txt = payload.toString(StandardCharsets.UTF_8);
JsonArray jsonArray = gson.fromJson(txt, JsonArray.class);
List<Message> messages = new ArrayList<>();

for (int i = 0; i < jsonArray.size(); i++) {
JsonObject jsonObject = jsonArray.get(i).getAsJsonObject();
JsonObject metadata = jsonObject.get("metadata").getAsJsonObject();
JsonElement value = jsonObject.get("value");

String displayName = metadata.get("displayName").getAsString();
String path = metadata.get("path").getAsString().replace('/', '.').substring(1) + jsonObject.get("name").getAsString() + "@" + displayName;
if (config.isEnableEdgeCloudCollaboration() && config.isEdge() && !config.getEdgeName().equals("")) {
path = config.getEdgeName() + "." + path;
}
long timestamp = jsonObject.get("timestamp").getAsLong();

Message message = new Message();
message.setPath(path);
message.setTimestamp(timestamp);

String dataType = metadata.get("dataType").getAsString();
switch (dataType) {
case "Boolean":
message.setValue(value.getAsBoolean());
message.setDataType(DataType.BOOLEAN);
break;
case "Char":
message.setValue(value.getAsString().getBytes(StandardCharsets.UTF_8));
message.setDataType(DataType.BINARY);
break;
case "Byte":
message.setValue(value.getAsInt());
message.setDataType(DataType.INTEGER);
break;
default:
logger.warn("unknown datatype of mqtt: " + dataType);
}
messages.add(message);
}
return messages;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ private List<Long> selectStorageEngines(List<Pair<Long, Integer>> storageEngineF
storageEngineFragmentCounts = storageEngineFragmentCounts.stream().sorted(Comparator.comparingInt(o -> o.v)).collect(Collectors.toList());
List<Long> storageEngines = new ArrayList<>();
for (int i = 0; i < count; i++) {
if (i > storageEngineFragmentCounts.size()) {
if (i >= storageEngineFragmentCounts.size()) {
break;
}
storageEngines.add(storageEngineFragmentCounts.get(i).k);
Expand Down

0 comments on commit 987d674

Please sign in to comment.