提交 942b1359 authored 作者: 宋勇's avatar 宋勇

添加mqtt,opcua

上级 3474b595
......@@ -960,6 +960,27 @@
<pluginManagement>
<plugins>
<plugin>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-maven-plugin</artifactId>
<version>4.5.3.0</version>
<dependencies>
<dependency>
<groupId>com.github.spotbugs</groupId>
<artifactId>spotbugs-annotations</artifactId>
<version>4.6.0</version>
</dependency>
</dependencies>
<executions>
<execution>
<id>check</id>
<goals>
<goal>check</goal>
</goals>
<phase>package</phase>
</execution>
</executions>
</plugin>
<!-- java compiler (Start) -->
<!-- <plugin>-->
......
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-mqtt</artifactId>
<properties>
<kafka.client.version>3.2.0</kafka.client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>common-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<!--MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
<version>6.1.3</version>
</dependency>
<!--MQTT-->
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* MQTT回调函数
*
* @author Mr.Qu
* @since 2020/11/18
*/
@Slf4j
public class InitCallback implements MqttCallback {
private List<MqttMessage> mqttMessageList;
public List<MqttMessage> getMqttMessageList() {
return mqttMessageList;
}
public void setMqttMessageList(List<MqttMessage> mqttMessageList) {
this.mqttMessageList = mqttMessageList;
}
/** MQTT 断开连接会执行此方法 */
@Override
public void connectionLost(Throwable cause) {
log.error(cause.getMessage(), cause);
}
/** publish发布成功后会执行到这里 */
@Override
public void deliveryComplete(IMqttDeliveryToken token) {}
/** subscribe订阅后得到的消息会执行到这里 回调 */
@Override
public void messageArrived(String topic, MqttMessage message) {
// 回调
log.info("[{}] : {}", topic, new String(message.getPayload()));
mqttMessageList.add(message);
// deviceService
// .updateDeviceStatus(new
// Device().setUsername("qbb").setTs(System.currentTimeMillis()));
/*try {
JSONObject jsonObject = JSON.parseObject(msg);
String clientId = String.valueOf(jsonObject.get("clientid"));
if (topic.endsWith("/disconnected")) {
log.info("客户端已掉线:{}", clientId);
} else {
log.info("客户端已上线:{}", clientId);
}
} catch (JSONException e) {
log.error("JSON Format Parsing Exception : {}", msg);
}*/
}
}
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import lombok.Data;
/** @Author Heartsuit @Date 2022-12-11 */
@Data
public class MqttClientService {
private String HOST;
private Integer PORT;
private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MqttClient mqttClient;
private String TOPIC;
public MqttClientService(
String host,
Integer port,
String username,
String password,
String topic,
MqttCallback mqttCallback)
throws MqttException {
this.HOST = host;
this.PORT = port;
setMqttClient(username, password, topic, mqttCallback);
}
public MqttClient getMqttClient(
String host,
Integer port,
String username,
String password,
String topic,
MqttCallback mqttCallback)
throws MqttException {
this.HOST = host;
this.PORT = port;
setMqttClient(username, password, topic, mqttCallback);
return mqttClient;
}
/**
* 客户端connect连接mqtt服务器
*
* @param username 用户名
* @param password 密码
* @param mqttCallback 回调函数
*/
public void setMqttClient(
String username, String password, String topic, MqttCallback mqttCallback)
throws MqttException {
MqttConnectOptions options = mqttConnectOptions(username, password);
this.TOPIC = topic;
/*if (mqttCallback == null) {
mqttClient.setCallback(new Callback());
} else {
}*/
mqttClient.setCallback(mqttCallback);
mqttClient.connect(options);
}
public void setHOST(String host) {
this.HOST = host;
}
public String getTopic() {
MqttTopic topic = mqttClient.getTopic(TOPIC);
return topic.getName();
}
/** MQTT连接参数设置 */
private MqttConnectOptions mqttConnectOptions(String userName, String passWord)
throws MqttException {
String url = "tcp://" + HOST + ":" + PORT;
mqttClient = new MqttClient(url, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10); // /默认:30
options.setAutomaticReconnect(true); // 默认:false
options.setCleanSession(false); // 默认:true
// options.setKeepAliveInterval(20);//默认:60
return options;
}
/** 关闭MQTT连接 */
public void close() throws MqttException {
mqttClient.close();
mqttClient.disconnect();
}
/** 向某个主题发布消息 默认qos:1 */
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
// mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
public void pub(String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
// mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(TOPIC);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 向某个主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos:0、1、2
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
public void pub(String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(TOPIC);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅某一个主题 ,此方法默认的的Qos等级为:1
*
* @param topic 主题
*/
public void sub(String topic) throws MqttException {
mqttClient.subscribe(topic);
}
public void sub() throws MqttException {
mqttClient.subscribe(TOPIC);
}
/**
* 订阅某一个主题,可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量:0、1、2
*/
public void sub(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
public void sub(int qos) throws MqttException {
mqttClient.subscribe(TOPIC, qos);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j
public class MqttDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
private InitCallback initCallback;
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return MqttOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return MqttOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
String topic = requestParams.get("topic");
if (StringUtils.isNotBlank(topic)) {
return Arrays.asList(topic);
} else if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database);
} else {
return new ArrayList<>();
}
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
String suffix = requestParams.get("topic");
if (StringUtils.isNotEmpty(suffix)) {
return Arrays.asList(suffix);
}
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
if (requestParams.isEmpty()) {
throw new SeaTunnelException("requestParmas 为空!");
}
try {
MqttClientService mqttClient = createMqttClient(requestParams);
// just test the connection
mqttClient.getMqttClient().connect();
return StringUtils.isNotEmpty(mqttClient.getTopic());
} catch (Exception ex) {
throw new DataSourcePluginException(
"check mqtt connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
List<TableField> tableFields = new ArrayList<>();
MqttClientService mqttClient = createMqttClient(requestParams);
// just test the connection
mqttClient.getMqttClient().connect();
List<MqttMessage> mqttMessageList = initCallback.getMqttMessageList();
if (CollectionUtils.isNotEmpty(mqttMessageList)) {
MqttMessage mqttMessage = mqttMessageList.get(0);
String s = new String(mqttMessage.getPayload());
JSONObject jsonObject = JSONObject.parseObject(s);
// 获取所有key的迭代器
Iterator<String> keys = jsonObject.keySet().iterator();
while (keys.hasNext()) {
String key = keys.next();
Object value = jsonObject.get(key);
TableField tableField = new TableField();
tableField.setName(key);
tableField.setType(value.getClass().getTypeName());
tableFields.add(tableField);
}
return tableFields;
}
return Collections.emptyList();
} catch (Exception ex) {
throw new DataSourcePluginException(
"check mqtt getTableFields failed, " + ex.getMessage(), ex);
}
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
if (CollectionUtils.isEmpty(tables)) {
return Collections.emptyMap();
}
Map<String, List<TableField>> map = new HashMap<>();
for (String table : tables) {
map.put(table, getTableFields(pluginName, requestParams, database, table));
}
return map;
}
private MqttClientService createMqttClient(Map<String, String> requestParams) {
String host = requestParams.get("host") + "";
String port = requestParams.get("port") + "";
String username = requestParams.get("username") + "";
String password = requestParams.get("password") + "";
String topic = requestParams.get("topic") + "";
initCallback = new InitCallback();
try {
MqttClientService mqttClientService =
new MqttClientService(
host, Integer.parseInt(port), username, password, topic, initCallback);
return mqttClientService;
} catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class MqttDataSourceFactory implements DataSourceFactory {
public static final String MQTT_PLUGIN_NAME = "Mqtt";
public static final String MQTT_PLUGIN_ICON = "mqtt";
public static final String MQTT_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return MQTT_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(MQTT_PLUGIN_NAME)
.icon(MQTT_PLUGIN_ICON)
.version(MQTT_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return new MqttDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class MqttOptionRule {
public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("mqtt server address, separated by \",\".");
public static final Option<Integer> PORT =
Options.key("port")
.intType()
.noDefaultValue()
.withDescription("mqtt server port, separated by \",\".");
public static final Option<String> TOPIC =
Options.key("topic")
.stringType()
.noDefaultValue()
.withDescription(
"Kafka topic name. If there are multiple topics, use , to split, for example: \"tpc1,tpc2\".");
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("mqtt server username");
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("mqtt server password");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(TOPIC).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.mqtt;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class MqttRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(MqttOptionRule.HOST.key()),
String.format("Missing %s in requestParams", MqttOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(MqttOptionRule.HOST.key(), requestParams.get(MqttOptionRule.HOST.key()));
if (requestParams.containsKey(MqttOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(MqttOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-opcua</artifactId>
<properties>
<kafka.client.version>3.2.0</kafka.client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>common-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
<version>0.6.8</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.milo.opcua.stack.core.NamespaceTable;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j
public class OpcuaDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return OpcuaOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return OpcuaOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
if (requestParams.isEmpty()) {
return new ArrayList<>();
}
OpcUaClientService opcClient = createOpcClient(requestParams);
if (Objects.nonNull(opcClient)) {
String ns = requestParams.get("ns") + "";
String id = requestParams.get("id") + "";
NodeId nodeId = new NodeId(UShort.valueOf(ns), id);
return Arrays.asList(nodeId.toString());
}
return new ArrayList<>();
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
String suffix = requestParams.get("suffix");
if (StringUtils.isNotEmpty(suffix)) {
return Arrays.asList(suffix);
}
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
OpcUaClientService opcClient = createOpcClient(requestParams);
// just test the connection
NamespaceTable namespaceTable = opcClient.getClient().getNamespaceTable();
return Objects.nonNull(namespaceTable);
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
List<TableField> tableFields = new ArrayList<>();
OpcUaClientService opcClient = createOpcClient(requestParams);
NodeId nodeId = null;
// just test the connection
if (StringUtils.isNotEmpty(table)) {
try {
JSONObject jsonObject = JSONObject.parseObject(table);
UShort uShort = UShort.valueOf(jsonObject.getString("ns"));
Object id = jsonObject.get("id");
if (id instanceof Integer) {
nodeId = new NodeId(uShort, (Integer) id);
} else {
nodeId = new NodeId(uShort, id + "");
}
} catch (Exception e) {
String[] split =
table.replace("NodeId{", "")
.replace("{", "")
.replace("}", "")
.split(",");
UShort uShort = null;
String id = "";
for (String s : split) {
String[] split1 = s.split(":|=");
if (split1.length == 2) {
if (split1[0].toLowerCase().indexOf("ns") > -1) {
uShort = UShort.valueOf(split1[1]);
}
if (split1[0].toLowerCase().indexOf("id") > -1) {
id = split1[1];
}
}
}
if (uShort != null && StringUtils.isNotEmpty(id)) {
try {
// 如果是int
Integer idn = Integer.parseInt(id);
nodeId = new NodeId(uShort, idn);
} catch (Exception ex) {
nodeId = new NodeId(uShort, id);
}
}
}
}
if (Objects.isNull(nodeId)) {
return new ArrayList<>();
}
Object objects = opcClient.readNodeObject(nodeId);
try {
JSONObject jsonObject = JSONObject.parseObject(objects.toString());
// 获取所有key的迭代器
Iterator<String> keys = jsonObject.keySet().iterator();
while (keys.hasNext()) {
String key = keys.next();
Object value = jsonObject.get(key);
TableField tableField = new TableField();
tableField.setName(key);
tableField.setType(value.getClass().getTypeName());
tableFields.add(tableField);
}
return tableFields;
} catch (Exception e) {
//不是json
TableField tableField = new TableField();
tableField.setName(nodeId.getIdentifier() + "");
tableField.setType("String");
tableFields.add(tableField);
return tableFields;
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"check mqtt getTableFields failed, " + ex.getMessage(), ex);
}
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
Map<String, List<TableField>> map = new HashMap<>();
if (CollectionUtils.isNotEmpty(tables)) {
for (String table : tables) {
map.put(table, getTableFields(pluginName, requestParams, database, table));
}
}
return map;
}
private OpcUaClientService createOpcClient(Map<String, String> requestParams) {
String host = requestParams.get("host") + "";
String port = requestParams.get("port") + "";
String suffix = requestParams.get("suffix") + "";
try {
OpcUaClientService opcUaClientService =
new OpcUaClientService(host, Integer.parseInt(port), suffix);
return opcUaClientService;
} catch (Exception e) {
throw new DataSourcePluginException("OpcUa插件 创建链接错误!", e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class OpcuaDataSourceFactory implements DataSourceFactory {
public static final String OPCUA_PLUGIN_NAME = "Opcua";
public static final String OPCUA_PLUGIN_ICON = "opcua";
public static final String OPCUA_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return OPCUA_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(OPCUA_PLUGIN_NAME)
.icon(OPCUA_PLUGIN_ICON)
.version(OPCUA_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return new OpcuaDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class OpcuaOptionRule {
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("socket host");
public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("socket port");
public static final Option<String> SUFFIX =
Options.key("suffix").stringType().noDefaultValue().withDescription("suffix");
public static final Option<Integer> NS =
Options.key("ns").intType().noDefaultValue().withDescription("ns");
public static final Option<String> ID =
Options.key("id").stringType().noDefaultValue().withDescription("id");
public static final Option<String> TYPE =
Options.key("type").stringType().defaultValue("int").withDescription("type");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT, SUFFIX).optional(TYPE).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(NS, ID).optional(TYPE).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class OpcuaRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(OpcuaOptionRule.HOST.key()),
String.format("Missing %s in requestParams", OpcuaOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(OpcuaOptionRule.HOST.key(), requestParams.get(OpcuaOptionRule.HOST.key()));
if (requestParams.containsKey(OpcuaOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(OpcuaOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
}
}
......@@ -45,6 +45,8 @@
<module>datasource-s3</module>
<module>datasource-sqlserver-cdc</module>
<module>datasource-jdbc-tidb</module>
<module>datasource-mqtt</module>
<module>datasource-opcua</module>
</modules>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论