提交 390785bd authored 作者: 胡伟's avatar 胡伟

opc ua kepserver代码提交

上级 528d6584
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-opcua-kepserver</artifactId>
<properties>
<kafka.client.version>3.2.0</kafka.client.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>common-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
<version>0.2.4</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcpkix-jdk15on</artifactId>
<version>1.57</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-server</artifactId>
<version>0.2.4</version>
</dependency>
</dependencies>
</project>
...@@ -39,7 +39,7 @@ class KeyStoreLoader { ...@@ -39,7 +39,7 @@ class KeyStoreLoader {
private X509Certificate clientCertificate; private X509Certificate clientCertificate;
private KeyPair clientKeyPair; private KeyPair clientKeyPair;
KeyStoreLoader load(Path baseDir, String endPoint) throws Exception { public KeyStoreLoader load(Path baseDir, String endPoint) throws Exception {
KeyStore keyStore = KeyStore.getInstance("PKCS12"); KeyStore keyStore = KeyStore.getInstance("PKCS12");
Path serverKeyStore = baseDir.resolve("example-client.pfx"); Path serverKeyStore = baseDir.resolve("example-client.pfx");
...@@ -104,7 +104,7 @@ class KeyStoreLoader { ...@@ -104,7 +104,7 @@ class KeyStoreLoader {
return this; return this;
} }
X509Certificate getClientCertificate() { public X509Certificate getClientCertificate() {
return clientCertificate; return clientCertificate;
} }
...@@ -112,7 +112,7 @@ class KeyStoreLoader { ...@@ -112,7 +112,7 @@ class KeyStoreLoader {
return clientCertificateChain; return clientCertificateChain;
} }
KeyPair getClientKeyPair() { public KeyPair getClientKeyPair() {
return clientKeyPair; return clientKeyPair;
} }
} }
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.nodes.Node;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.stack.client.UaTcpStackClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
import org.eclipse.milo.opcua.stack.core.types.builtin.*;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UInteger;
import org.eclipse.milo.opcua.stack.core.types.enumerated.MonitoringMode;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoredItemCreateRequest;
import org.eclipse.milo.opcua.stack.core.types.structured.MonitoringParameters;
import org.eclipse.milo.opcua.stack.core.types.structured.ReadValueId;
import lombok.Data;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Data
public class OpcUaClientService {
// 批量订阅namespaceIndex默认为2
private int batchNamespaceIndex = 2;
// 批量订阅时的identifiers
private List<String> batchIdentifiers;
private OpcUaClient client;
private String host;
private Integer port;
private String suffix;
public OpcUaClientService(String host, Integer port, String suffix) throws Exception {
this.host = host;
this.port = port;
this.suffix = suffix;
connectOpcUaServer(host, port, suffix);
}
/**
* 创建OPC UA客户端
*
* @param host
* @param port
* @param suffix
* @return
* @throws Exception
*/
public OpcUaClient connectOpcUaServer(String host, Integer port, String suffix)
throws Exception {
String endPointUrl = "opc.tcp://" + host + ":" + port + suffix;
Path securityTempDir = Paths.get(System.getProperty("java.io.tmpdir"), "security");
Files.createDirectories(securityTempDir);
if (!Files.exists(securityTempDir)) {
throw new Exception("unable to create security dir: " + securityTempDir);
}
KeyStoreLoader loader =
new KeyStoreLoader().load(securityTempDir, endPointUrl); // 创建OpcUA的访问证书类对象
// 获取OPC UA的服务器端节点
EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints(endPointUrl).get();
// 过滤掉不需要的安全策略,选择一个自己需要的安全策略
EndpointDescription endpoint =
Arrays.stream(endpoints)
.filter(e -> e.getEndpointUrl().equals(endPointUrl))
.findFirst()
.orElseThrow(() -> new Exception("没有节点返回"));
// 设置OPC UA的配置信息
OpcUaClientConfig config =
OpcUaClientConfig.builder()
.setApplicationUri(endPointUrl) // 放入url
.setCertificate(
loader
.getClientCertificate()) // 需要传入一个数字证书作为形参,我们用KeyStoreLoader类创建了
.setKeyPair(loader.getClientKeyPair()) // 传入密匙对
.setEndpoint(endpoint) // EndpointDescription对象,就是设置刚刚选择的节点就可以了
// 使用匿名登录方式
.setIdentityProvider(new AnonymousProvider())
.setRequestTimeout(UInteger.valueOf(10 * 1000)) // 设置请求超时时间,单位为毫秒。
.build();
// 创建OPC UA客户端
client = new OpcUaClient(config);
Thread.sleep(2000); // 线程休眠一下再返回对象,给创建过程一个时间。
return client;
}
/**
* 遍历树形节点
*
* @param uaNode 节点
* @throws Exception
*/
public List listNode(Node uaNode, List<Node> nodes) throws Exception {
List<Node> nodesSub;
if (uaNode == null) {
nodesSub = client.getAddressSpace().browse(Identifiers.ObjectsFolder).get();
} else {
nodesSub = client.getAddressSpace().browseNode(uaNode).get();
}
if (CollectionUtils.isNotEmpty(nodesSub)) {
for (Node nd : nodesSub) {
nodes.add(nd);
// 排除系统行性节点,这些系统性节点名称一般都是以"_"开头
if (Objects.requireNonNull(nd.getBrowseName().get(10, TimeUnit.SECONDS).getName())
.contains("_")) {
continue;
}
System.out.println(
"Node= "
+ nd.getBrowseName().get(10, TimeUnit.SECONDS).getName()
+ ",id="
+ nd.getNodeId());
listNode(nd, nodes);
}
}
return nodes;
}
/**
* 读取节点数据
*
* <p>namespaceIndex可以通过UaExpert客户端去查询,一般来说这个值是2。
* identifier也可以通过UaExpert客户端去查询,这个值=通道名称.设备名称.标记名称
*
* @param namespaceIndex
* @param identifier
* @throws Exception
*/
public Object[] readNodeValue(int namespaceIndex, String identifier) throws Exception {
// 节点
NodeId nodeId = new NodeId(namespaceIndex, identifier);
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue());
Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaDate()
.toInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
}
public Object[] readNodeValue(int namespaceIndex, Integer identifier) throws Exception {
// 节点
NodeId nodeId = new NodeId(namespaceIndex, identifier);
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue());
Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaDate()
.toInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
}
/**
* 读取指定节点的值的重载方法
*
* @param nodeId
* @throws Exception
*/
public Object[] readNodeValue(NodeId nodeId) throws Exception {
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(nodeId);
System.out.println(id + ": " + value.getValue().getValue());
Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaDate()
.toInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
}
/**
* 读取节点数据
*
* <p>namespaceIndex可以通过UaExpert客户端去查询,一般来说这个值是2。
* identifier也可以通过UaExpert客户端去查询,这个值=通道名称.设备名称.标记名称
*
* @param namespaceIndex
* @param identifier
* @throws Exception
*/
public Object readNodeObject(int namespaceIndex, String identifier) throws Exception {
// 节点
NodeId nodeId = new NodeId(namespaceIndex, identifier);
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue());
return value.getValue().getValue();
}
public Object readNodeObject(int namespaceIndex, Integer identifier) throws Exception {
// 节点
NodeId nodeId = new NodeId(namespaceIndex, identifier);
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue());
Object value1 = value.getValue().getValue();
return value1;
}
/**
* 读取指定节点的值的重载方法
*
* @param nodeId
* @throws Exception
*/
public Object readNodeObject(NodeId nodeId) throws Exception {
// 读取节点数据
DataValue value = client.readValue(0.0, TimestampsToReturn.Neither, nodeId).get();
// 状态
System.out.println("Status: " + value.getStatusCode());
// 标识符
String id = String.valueOf(nodeId.getIdentifier());
System.out.println(nodeId);
System.out.println(id + ": " + value.getValue().getValue());
return value.getValue().getValue();
}
/**
* 写入节点数据
*
* @param namespaceIndex
* @param identifier
* @param value
* @throws Exception
*/
public boolean writeNodeValue(int namespaceIndex, String identifier, Float value)
throws Exception {
// 节点
NodeId nodeId = new NodeId(namespaceIndex, identifier);
// 创建数据对象,此处的数据对象一定要定义类型,不然会出现类型错误,导致无法写入
DataValue newValue = new DataValue(new Variant(value), null, null);
// 写入节点数据
StatusCode statusCode = client.writeValue(nodeId, newValue).join();
System.out.println("结果:" + statusCode.isGood());
return true;
}
/**
* 订阅(单个)
*
* @param namespaceIndex
* @param identifier
* @throws Exception
*/
private static final AtomicInteger atomic = new AtomicInteger();
public void subscribe(int namespaceIndex, String identifier) throws Exception {
// 创建发布间隔1000ms的订阅对象
AtomicInteger atomic = new AtomicInteger(1);
// 创建发布间隔1000ms的订阅对象
client.getSubscriptionManager()
.createSubscription(1000.0)
.thenAccept(
t -> {
// 节点1
NodeId nodeId1 = new NodeId(2, "my.device.x1");
ReadValueId readValueId1 =
new ReadValueId(nodeId1, AttributeId.Value.uid(), null, null);
// 节点2
NodeId nodeId2 = new NodeId(2, "my.device.x2");
ReadValueId readValueId2 =
new ReadValueId(nodeId2, AttributeId.Value.uid(), null, null);
// 创建监控的参数
MonitoringParameters parameters =
new MonitoringParameters(
UInteger.valueOf(atomic.getAndIncrement()),
1000.0,
null,
UInteger.valueOf(10),
true);
// 创建监控项请求
// 该请求最后用于创建订阅。
MonitoredItemCreateRequest request1 =
new MonitoredItemCreateRequest(
readValueId1, MonitoringMode.Reporting, parameters);
MonitoredItemCreateRequest request2 =
new MonitoredItemCreateRequest(
readValueId2, MonitoringMode.Reporting, parameters);
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
requests.add(request1);
requests.add(request2);
// 创建监控项,并且注册变量值改变时候的回调函数。
t.createMonitoredItems(
TimestampsToReturn.Both,
requests,
(item, id) ->
item.setValueConsumer(
(it, val) -> {
System.out.println(
"标识为"
+ it.getReadValueId()
.getNodeId()
+ "的项的值被更新为:"
+ val.getValue()
.getValue());
}));
})
.get();
// 持续订阅
Thread.sleep(Long.MAX_VALUE);
}
/**
* 订阅单个节点的重载方法
*
* @param nodeId
* @throws Exception
*/
public void subscribe(NodeId nodeId) throws Exception {
// 创建发布间隔1000ms的订阅对象
client.getSubscriptionManager()
.createSubscription(1000.0)
.thenAccept(
t -> {
ReadValueId readValueId =
new ReadValueId(nodeId, AttributeId.Value.uid(), null, null);
// 创建监控的参数
MonitoringParameters parameters =
new MonitoringParameters(
UInteger.valueOf(atomic.getAndIncrement()),
1000.0,
null,
UInteger.valueOf(10),
true);
// 创建监控项请求
// 该请求最后用于创建订阅。
MonitoredItemCreateRequest request =
new MonitoredItemCreateRequest(
readValueId, MonitoringMode.Reporting, parameters);
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
requests.add(request);
// 创建监控项,并且注册变量值改变时候的回调函数。
t.createMonitoredItems(
TimestampsToReturn.Both,
requests,
(item, id) ->
item.setValueConsumer(
(it, val) -> {
System.out.println(
"nodeid :"
+ it.getReadValueId()
.getNodeId());
System.out.println(
"value :"
+ val.getValue()
.getValue());
}));
})
.get();
// 持续订阅
Thread.sleep(Long.MAX_VALUE);
}
/** 自定义订阅监听 */
private class CustomSubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
private final OpcUaClient client;
CustomSubscriptionListener(OpcUaClient client) {
this.client = client;
}
public void onKeepAlive(UaSubscription subscription, DateTime publishTime) {
System.out.println("onKeepAlive");
}
public void onStatusChanged(UaSubscription subscription, StatusCode status) {
System.out.println("onStatusChanged");
}
public void onPublishFailure(UaException exception) {
System.out.println("onPublishFailure");
}
public void onNotificationDataLost(UaSubscription subscription) {
System.out.println("onNotificationDataLost");
}
}
public void close() {
if (null != client) {
client.disconnect();
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.milo.opcua.stack.core.NamespaceTable;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j
public class OpcuaDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
public static class Holder {
private static final OpcuaDataSourceChannel INSTANCE = new OpcuaDataSourceChannel();
}
public static OpcuaDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return OpcuaOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return OpcuaOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
if (requestParams.isEmpty()) {
return new ArrayList<>();
}
OpcUaClientService opcClient = createOpcClient(requestParams);
if (Objects.nonNull(opcClient)) {
String ns = requestParams.get("ns") + "";
String id = requestParams.get("id") + "";
NodeId nodeId = new NodeId(UShort.valueOf(ns), id);
return Arrays.asList(nodeId.toString());
}
return new ArrayList<>();
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
String suffix = requestParams.get("suffix");
if (StringUtils.isNotEmpty(suffix)) {
return Arrays.asList(suffix);
}
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
OpcUaClientService opcClient = createOpcClient(requestParams);
// just test the connection
NamespaceTable namespaceTable = opcClient.getClient().getNamespaceTable();
return Objects.nonNull(namespaceTable);
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
List<TableField> tableFields = new ArrayList<>();
OpcUaClientService opcClient = createOpcClient(requestParams);
NodeId nodeId = null;
// just test the connection
if (StringUtils.isNotEmpty(table)) {
try {
JSONObject jsonObject = JSONObject.parseObject(table);
UShort uShort = UShort.valueOf(jsonObject.getString("ns"));
Object id = jsonObject.get("id");
if (id instanceof Integer) {
nodeId = new NodeId(uShort, (Integer) id);
} else {
nodeId = new NodeId(uShort, id + "");
}
} catch (Exception e) {
String[] split =
table.replace("NodeId{", "")
.replace("{", "")
.replace("}", "")
.split(",");
UShort uShort = null;
String id = "";
for (String s : split) {
String[] split1 = s.split(":|=");
if (split1.length == 2) {
if (split1[0].toLowerCase().indexOf("ns") > -1) {
uShort = UShort.valueOf(split1[1]);
}
if (split1[0].toLowerCase().indexOf("id") > -1) {
id = split1[1];
}
}
}
if (uShort != null && StringUtils.isNotEmpty(id)) {
try {
// 如果是int
Integer idn = Integer.parseInt(id);
nodeId = new NodeId(uShort, idn);
} catch (Exception ex) {
nodeId = new NodeId(uShort, id);
}
}
}
}
if (Objects.isNull(nodeId)) {
return new ArrayList<>();
}
Object objects = opcClient.readNodeObject(nodeId);
try {
JSONObject jsonObject = JSONObject.parseObject(objects.toString());
// 获取所有key的迭代器
Iterator<String> keys = jsonObject.keySet().iterator();
while (keys.hasNext()) {
String key = keys.next();
Object value = jsonObject.get(key);
TableField tableField = new TableField();
tableField.setName(key);
tableField.setType(value.getClass().getTypeName());
tableFields.add(tableField);
}
return tableFields;
} catch (Exception e) {
// 不是json
TableField tableField = new TableField();
tableField.setName(nodeId.getIdentifier() + "");
tableField.setType("String");
tableFields.add(tableField);
return tableFields;
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"check mqtt getTableFields failed, " + ex.getMessage(), ex);
}
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
Map<String, List<TableField>> map = new HashMap<>();
if (CollectionUtils.isNotEmpty(tables)) {
for (String table : tables) {
map.put(table, getTableFields(pluginName, requestParams, database, table));
}
}
return map;
}
private OpcUaClientService createOpcClient(Map<String, String> requestParams) {
String host = requestParams.get("host") + "";
String port = requestParams.get("port") + "";
String suffix = requestParams.get("suffix") + "";
try {
OpcUaClientService opcUaClientService =
new OpcUaClientService(host, Integer.parseInt(port), suffix);
return opcUaClientService;
} catch (Exception e) {
throw new DataSourcePluginException("OpcUa插件 创建链接错误!", e);
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class OpcuaDataSourceFactory implements DataSourceFactory {
public static final String OPCUA_PLUGIN_NAME = "Opcua";
public static final String OPCUA_PLUGIN_ICON = "opcua";
public static final String OPCUA_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return OPCUA_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(OPCUA_PLUGIN_NAME)
.icon(OPCUA_PLUGIN_ICON)
.version(OPCUA_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return OpcuaDataSourceChannel.getInstance();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class OpcuaOptionRule {
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("socket host");
public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("socket port");
public static final Option<String> SUFFIX =
Options.key("suffix").stringType().noDefaultValue().withDescription("suffix");
public static final Option<Integer> NS =
Options.key("ns").intType().noDefaultValue().withDescription("ns");
public static final Option<String> ID =
Options.key("id").stringType().noDefaultValue().withDescription("id");
public static final Option<String> TYPE =
Options.key("type").stringType().defaultValue("int").withDescription("type");
public static final Option<Integer> GS =
Options.key("gs").intType().defaultValue(1).withDescription("type");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT, SUFFIX, NS, ID, TYPE).optional(GS).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(NS, ID, TYPE).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class OpcuaRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(OpcuaOptionRule.HOST.key()),
String.format("Missing %s in requestParams", OpcuaOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(OpcuaOptionRule.HOST.key(), requestParams.get(OpcuaOptionRule.HOST.key()));
if (requestParams.containsKey(OpcuaOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(OpcuaOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论