提交 3a0f3340 authored 作者: 宋勇's avatar 宋勇

kepserver

上级 390785bd
......@@ -28,6 +28,7 @@
<modules>
<module>seatunnel-datasource-client</module>
<module>seatunnel-datasource-plugins</module>
<module>seatunnel-datasource-plugins/datasource-kepserver</module>
</modules>
<dependencies>
......
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- <parent>-->
<!-- <groupId>org.apache.seatunnel</groupId>-->
<!-- <artifactId>seatunnel-datasource</artifactId>-->
<!-- <version>1.0.0-SNAPSHOT</version>-->
<!-- <relativePath>../../pom.xml</relativePath>-->
<!-- </parent>-->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.0</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-kepserver</artifactId>
<version>1.0.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
<!-- <scope>provided</scope>-->
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>common-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-client</artifactId>
<version>0.6.8</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>sdk-server</artifactId>
<version>0.6.8</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>stack-client</artifactId>
<version>0.6.8</version>
</dependency>
<dependency>
<groupId>org.eclipse.milo</groupId>
<artifactId>stack-server</artifactId>
<version>0.6.8</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
<artifactId>bcprov-jdk18on</artifactId>
<version>1.72</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.3.4</version>
</dependency>
</dependencies>
<repositories>
<!-- 如有Nexus私服, 取消注释并指向正确的服务器地址. -->
<repository>
<snapshots>
<enabled>true</enabled>
<updatePolicy>always</updatePolicy>
<checksumPolicy>warn</checksumPolicy>
</snapshots>
<id>zysoft-nexus-public</id>
<name>zysoft Repository</name>
<url>http://scjoyedu.eicp.net:10100/repository/maven-zysoft-public/</url>
</repository>
</repositories>
<pluginRepositories>
<!-- 如有Nexus私服, 取消注释并指向正确的服务器地址. -->
<pluginRepository>
<id>zysoft-nexus-public</id>
<name>zysoft Repository</name>
<url>http://scjoyedu.eicp.net:10100/repository/maven-zysoft-public/</url>
</pluginRepository>
</pluginRepositories>
<distributionManagement>
<repository>
<id>zysoft-nexus-releases</id>
<name>Releases</name>
<url>http://scjoyedu.eicp.net:10100/repository/maven-zysoft-releases/</url>
</repository>
<snapshotRepository>
<id>zysoft-nexus-snapshots</id>
<name>Snapshot</name>
<url>http://scjoyedu.eicp.net:10100/repository/maven-zysoft-snapshots/</url>
</snapshotRepository>
</distributionManagement>
</project>
\ No newline at end of file
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.identity.IdentityProvider;
import org.eclipse.milo.opcua.stack.core.security.SecurityPolicy;
import org.eclipse.milo.opcua.stack.core.types.structured.EndpointDescription;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
public interface KepServerClient {
String getEndpointUrl();
default Predicate<EndpointDescription> endpointFilter() {
return e -> getSecurityPolicy().getUri().equals(e.getSecurityPolicyUri());
}
default SecurityPolicy getSecurityPolicy() {
// return SecurityPolicy.Basic256Sha256;
// return SecurityPolicy.Basic128Rsa15;
return SecurityPolicy.None;
}
default IdentityProvider getIdentityProvider() {
// return new UsernameProvider("ua_user","IwjqYp(8F+gW~^1");
return new AnonymousProvider();
}
Map run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception;
}
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.stack.core.Stack;
import org.eclipse.milo.opcua.stack.core.types.builtin.LocalizedText;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.security.Security;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.Unsigned.uint;
public class KepServerClientRunner {
static {
// Required for SecurityPolicy.Aes256_Sha256_RsaPss
Security.addProvider(new BouncyCastleProvider());
}
private final Logger logger = LoggerFactory.getLogger(getClass());
private final CompletableFuture<OpcUaClient> future = new CompletableFuture<>();
private final KepServerClient kepServerClient;
public KepServerClientRunner(KepServerClient kepServerClient) throws Exception {
this(kepServerClient, true);
}
public KepServerClientRunner(KepServerClient kepServerClient, boolean serverRequired) throws Exception {
this.kepServerClient = kepServerClient;
}
private OpcUaClient createClient() throws Exception {
// 加载证书
// String filePath = KepServerClientRunner.class.getResource("/").getPath();
// File file = new File(filePath);
// KeyStoreLoader loader = new KeyStoreLoader().load(file.toPath());
return OpcUaClient.create(
kepServerClient.getEndpointUrl(),
endpoints ->
endpoints.stream()
.filter(kepServerClient.endpointFilter())
.findFirst(),
configBuilder ->
configBuilder
.setApplicationName(LocalizedText.english("eclipse milo opc-ua client"))
.setApplicationUri("urn:eclipse:milo:examples:client")
// .setKeyPair(loader.getClientKeyPair()) // 密钥对
// .setCertificate(loader.getClientCertificate()) // 证书
// .setCertificateChain(loader.getClientCertificateChain()) // 证书信任链
// .setCertificateValidator(certificateValidator) // 验证证书
.setIdentityProvider(kepServerClient.getIdentityProvider())
.setRequestTimeout(uint(5000))
.build()
);
}
public Map run() {
Map map=new HashMap<>();
try {
OpcUaClient client = createClient();
future.whenCompleteAsync((c, ex) -> {
if (ex != null) {
logger.error("Error running example: {}", ex.getMessage(), ex);
}
try {
client.disconnect().get();
Stack.releaseSharedResources();
} catch (InterruptedException | ExecutionException e) {
logger.error("Error disconnecting: {}", e.getMessage(), e);
}
try {
Thread.sleep(1000);
System.exit(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
map = kepServerClient.run(client, future);
future.get(15, TimeUnit.SECONDS);
} catch (Throwable t) {
logger.error("Error running client example: {}", t.getMessage(), t);
future.completeExceptionally(t);
}
} catch (Throwable t) {
logger.error("Error getting client: {}", t.getMessage(), t);
future.completeExceptionally(t);
try {
Thread.sleep(1000);
System.exit(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(999_999_999);
} catch (InterruptedException e) {
e.printStackTrace();
}
return map;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.milo.opcua.stack.core.NamespaceTable;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.builtin.unsigned.UShort;
import com.alibaba.fastjson2.JSONObject;
import java.math.BigDecimal;
import java.util.*;
public class KepServerDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
public static class Holder {
private static final KepServerDataSourceChannel INSTANCE = new KepServerDataSourceChannel();
}
public static KepServerDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(String pluginName) {
return KepServerOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(String pluginName) {
return KepServerOptionRule.metadataRule();
}
@Override
public List<String> getTables(
String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
if (requestParams.isEmpty()) {
return new ArrayList<>();
}
ReadKepServer example = new ReadKepServer(requestParams);
Map map = new KepServerClientRunner(example, true).run();
if (map!=null && map.size()>0) {
String ns = requestParams.get("ns") + "";
String id = requestParams.get("id") + "";
NodeId nodeId = new NodeId(UShort.valueOf(ns), id);
return Arrays.asList(nodeId.toString());
}
return new ArrayList<>();
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<String> getDatabases(
String pluginName, Map<String, String> requestParams) {
String suffix = requestParams.get("suffix");
if (StringUtils.isNotEmpty(suffix)) {
return Arrays.asList(suffix);
}
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
String pluginName, Map<String, String> requestParams) {
try {
ReadKepServer example = new ReadKepServer(requestParams);
new KepServerClientRunner(example, true);
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check kafka connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
String pluginName,
Map<String, String> requestParams,
String database,
String table) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
List<TableField> tableFields = new ArrayList<>();
ReadKepServer example = new ReadKepServer(requestParams);
Map<String,Object> map = new KepServerClientRunner(example, true).run();
for(Map.Entry<String,Object> m:map.entrySet()) {
TableField tableField = new TableField();
tableField.setName(m.getKey());
tableField.setNullable(true);
if(m.getValue() instanceof Integer){
tableField.setType("int");
} else if(m.getValue() instanceof BigDecimal){
tableField.setType("double");
} else if (m.getValue() instanceof Date){
tableField.setType("date");
} else if(m.getValue() instanceof Short){
tableField.setType("short");
} else {
tableField.setType("varchar");
}
tableFields.add(tableField);
}
return tableFields;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check mqtt getTableFields failed, " + ex.getMessage(), ex);
}
}
@Override
public Map<String, List<TableField>> getTableFields(
String pluginName,
Map<String, String> requestParams,
String database,
List<String> tables) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
Map<String, List<TableField>> map = new HashMap<>();
if (CollectionUtils.isNotEmpty(tables)) {
for (String table : tables) {
map.put(table, getTableFields(pluginName, requestParams, database, table));
}
}
return map;
}
}
......@@ -15,23 +15,23 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import com.google.auto.service.AutoService;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class OpcuaDataSourceFactory implements DataSourceFactory {
public class KepServerDataSourceFactory implements DataSourceFactory {
public static final String OPCUA_PLUGIN_NAME = "Opcua";
public static final String OPCUA_PLUGIN_ICON = "opcua";
public static final String OPCUA_PLUGIN_NAME = "Kepserver";
public static final String OPCUA_PLUGIN_ICON = "Kepserver";
public static final String OPCUA_PLUGIN_VERSION = "1.0.0";
@Override
......@@ -53,6 +53,6 @@ public class OpcuaDataSourceFactory implements DataSourceFactory {
@Override
public DataSourceChannel createChannel() {
return OpcuaDataSourceChannel.getInstance();
return KepServerDataSourceChannel.getInstance();
}
}
......@@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class OpcuaOptionRule {
public class KepServerOptionRule {
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("socket host");
......@@ -45,10 +45,10 @@ public class OpcuaOptionRule {
Options.key("gs").intType().defaultValue(1).withDescription("type");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT, SUFFIX, NS, ID, TYPE).optional(GS).build();
return OptionRule.builder().required(HOST, PORT, NS, ID).optional(GS).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(NS, ID, TYPE).build();
return OptionRule.builder().required(NS, ID).build();
}
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
......@@ -25,17 +25,18 @@ import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class OpcuaRequestParamsUtils {
public class KepServerRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(OpcuaOptionRule.HOST.key()),
String.format("Missing %s in requestParams", OpcuaOptionRule.HOST.key()));
requestParams.containsKey(KepServerOptionRule.HOST.key()),
String.format("Missing %s in requestParams", KepServerOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(OpcuaOptionRule.HOST.key(), requestParams.get(OpcuaOptionRule.HOST.key()));
if (requestParams.containsKey(OpcuaOptionRule.PORT.key())) {
properties.put(
KepServerOptionRule.HOST.key(), requestParams.get(KepServerOptionRule.HOST.key()));
if (requestParams.containsKey(KepServerOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(OpcuaOptionRule.PORT.key()));
ConfigFactory.parseString(requestParams.get(KepServerOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
......
package org.apache.seatunnel.datasource.plugin.opcua;
/*
* Copyright (c) 2021 the Eclipse Milo Authors
*
......@@ -9,6 +8,8 @@ package org.apache.seatunnel.datasource.plugin.opcua;
* SPDX-License-Identifier: EPL-2.0
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.eclipse.milo.opcua.sdk.server.util.HostnameUtil;
import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateBuilder;
import org.eclipse.milo.opcua.stack.core.util.SelfSignedCertificateGenerator;
......@@ -26,11 +27,13 @@ import java.util.regex.Pattern;
class KeyStoreLoader {
private static final Pattern IP_ADDR_PATTERN =
Pattern.compile(
"^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
private static final Pattern IP_ADDR_PATTERN = Pattern.compile(
"^(([01]?\\d\\d?|2[0-4]\\d|25[0-5])\\.){3}([01]?\\d\\d?|2[0-4]\\d|25[0-5])$");
// 证书的别名,就是生成证书时 -name 后的参数,不设置的话默认是“1”
private static final String CLIENT_ALIAS = "client-ai";
// 证书保护密码
private static final char[] PASSWORD = "password".toCharArray();
private final Logger logger = LoggerFactory.getLogger(getClass());
......@@ -39,29 +42,30 @@ class KeyStoreLoader {
private X509Certificate clientCertificate;
private KeyPair clientKeyPair;
public KeyStoreLoader load(Path baseDir, String endPoint) throws Exception {
KeyStoreLoader load(Path baseDir) throws Exception {
KeyStore keyStore = KeyStore.getInstance("PKCS12");
// 证书文件名,生成 target/classes/example-client.pfx
Path serverKeyStore = baseDir.resolve("example-client.pfx");
logger.info("Loading KeyStore at {}", serverKeyStore);
// 第一次走生成证书代码,以后会使用生成的证书
if (!Files.exists(serverKeyStore)) {
keyStore.load(null, PASSWORD);
KeyPair keyPair = SelfSignedCertificateGenerator.generateRsaKeyPair(2048);
SelfSignedCertificateBuilder builder =
new SelfSignedCertificateBuilder(keyPair)
.setCommonName("Eclipse Milo Example Client")
.setOrganization("digitalpetri")
.setOrganizationalUnit("dev")
.setLocalityName("Folsom")
.setStateName("CA")
.setCountryCode("US")
.setApplicationUri(endPoint);
// .addDnsName("localhost")
// .addIpAddress("192.168.0.101");
SelfSignedCertificateBuilder builder = new SelfSignedCertificateBuilder(keyPair)
.setCommonName("Eclipse Milo Example Client")
.setOrganization("digitalpetri")
.setOrganizationalUnit("dev")
.setLocalityName("Folsom")
.setStateName("CA")
.setCountryCode("US")
.setApplicationUri("urn:eclipse:milo:examples:client")
.addDnsName("localhost")
.addIpAddress("127.0.0.1");
// Get as many hostnames and IP addresses as we can listed in the certificate.
for (String hostname : HostnameUtil.getHostnames("0.0.0.0")) {
......@@ -74,11 +78,7 @@ class KeyStoreLoader {
X509Certificate certificate = builder.build();
keyStore.setKeyEntry(
CLIENT_ALIAS,
keyPair.getPrivate(),
PASSWORD,
new X509Certificate[] {certificate});
keyStore.setKeyEntry(CLIENT_ALIAS, keyPair.getPrivate(), PASSWORD, new X509Certificate[]{certificate});
try (OutputStream out = Files.newOutputStream(serverKeyStore)) {
keyStore.store(out, PASSWORD);
}
......@@ -92,10 +92,9 @@ class KeyStoreLoader {
if (clientPrivateKey instanceof PrivateKey) {
clientCertificate = (X509Certificate) keyStore.getCertificate(CLIENT_ALIAS);
clientCertificateChain =
Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
.map(X509Certificate.class::cast)
.toArray(X509Certificate[]::new);
clientCertificateChain = Arrays.stream(keyStore.getCertificateChain(CLIENT_ALIAS))
.map(X509Certificate.class::cast)
.toArray(X509Certificate[]::new);
PublicKey serverPublicKey = clientCertificate.getPublicKey();
clientKeyPair = new KeyPair(serverPublicKey, (PrivateKey) clientPrivateKey);
......@@ -104,7 +103,7 @@ class KeyStoreLoader {
return this;
}
public X509Certificate getClientCertificate() {
X509Certificate getClientCertificate() {
return clientCertificate;
}
......@@ -112,7 +111,8 @@ class KeyStoreLoader {
return clientCertificateChain;
}
public KeyPair getClientKeyPair() {
KeyPair getClientKeyPair() {
return clientKeyPair;
}
}
/*
* Copyright (c) 2019 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import com.google.common.collect.ImmutableList;
import org.apache.commons.lang3.StringUtils;
import org.checkerframework.checker.units.qual.K;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.nodes.UaNode;
import org.eclipse.milo.opcua.sdk.client.nodes.UaVariableNode;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.types.builtin.DataValue;
import org.eclipse.milo.opcua.stack.core.types.builtin.NodeId;
import org.eclipse.milo.opcua.stack.core.types.enumerated.NodeClass;
import org.eclipse.milo.opcua.stack.core.types.enumerated.ServerState;
import org.eclipse.milo.opcua.stack.core.types.enumerated.TimestampsToReturn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
public class ReadKepServer implements KepServerClient {
private String endpointUrl;
private Integer namespaceIndex;
private String identifier;
public static void main(String[] args) throws Exception {
Map map=new HashMap();
map.put(KepServerOptionRule.HOST.key(),"127.0.0.1");
map.put(KepServerOptionRule.PORT.key(),"49320");
map.put(KepServerOptionRule.NS.key(),"2");
map.put(KepServerOptionRule.ID.key(),"cs.设备.top2");
ReadKepServer example = new ReadKepServer(map);
example.endpointUrl="opc.tcp://127.0.0.1:49320";
new KepServerClientRunner(example, true).run();
}
ReadKepServer( Map<String, String> requestParams){
String host = requestParams.get(KepServerOptionRule.HOST.key());
String port= requestParams.get(KepServerOptionRule.PORT.key());
this.endpointUrl="opc.tcp://"+host+":"+port;
String ns = requestParams.get(KepServerOptionRule.NS.key());
if(StringUtils.isNotBlank(ns)){
try{
Integer integer = Integer.valueOf(ns);
this.namespaceIndex=integer;
}catch (Exception e){
this.namespaceIndex=0;
}
}
this.identifier=requestParams.get(KepServerOptionRule.ID.key());
}
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public String getEndpointUrl() {
return endpointUrl;
}
public void setEndpointUrl(String url){
this.endpointUrl=url;
}
@Override
public Map run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
// synchronous connect
client.connect().get();
Map map=new HashMap<>();
NodeId nodeId = new NodeId(namespaceIndex, identifier); // 字符串类型的地址
// NodeId nodeId = new NodeId(4, 7); // 数字类型的地址
DataValue dataValue = client.readValue(0.0, TimestampsToReturn.Both, nodeId).get();
System.out.println("-----读取-----");
System.out.println("-----cs.设备.top2:" + dataValue.getValue().getValue());
//---------------------------------------
//Identifiers.DataTypesFolder
NodeId nodeId1 = new NodeId(namespaceIndex, identifier);
List<? extends UaNode> nodes = client.getAddressSpace().browseNodes(nodeId1);
for(UaNode node:nodes){
NodeId nodeId2 = node.getNodeId();
if(node.getNodeClass() == NodeClass.Variable){
DataValue value2 = client.readValue(0.0, TimestampsToReturn.Both, nodeId2).get();
System.out.println(nodeId2.getIdentifier().toString() + ": "+ value2.getValue().getValue());
map.put(identifier,value2.getValue().getValue());
}
}
// synchronous read request via VariableNode
UaVariableNode node = client.getAddressSpace().getVariableNode(Identifiers.Server_ServerStatus_StartTime);
DataValue value = node.readValue();
logger.info("StartTime={}", value.getValue().getValue());
// asynchronous read request
readServerStateAndTime(client).thenAccept(values -> {
DataValue v0 = values.get(0);
DataValue v1 = values.get(1);
logger.info("State={}", ServerState.from((Integer) v0.getValue().getValue()));
logger.info("CurrentTime={}", v1.getValue().getValue());
future.complete(client);
});
return map;
}
private CompletableFuture<List<DataValue>> readServerStateAndTime(OpcUaClient client) {
List<NodeId> nodeIds = ImmutableList.of(
Identifiers.Server_ServerStatus_State,
Identifiers.Server_ServerStatus_CurrentTime);
return client.readValues(0.0, TimestampsToReturn.Both, nodeIds);
}
}
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.commons.collections4.CollectionUtils;
import org.eclipse.milo.opcua.sdk.client.OpcUaClient;
import org.eclipse.milo.opcua.sdk.client.api.config.OpcUaClientConfig;
import org.eclipse.milo.opcua.sdk.client.api.identity.AnonymousProvider;
import org.eclipse.milo.opcua.sdk.client.api.nodes.Node;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscription;
import org.eclipse.milo.opcua.sdk.client.api.subscriptions.UaSubscriptionManager;
import org.eclipse.milo.opcua.stack.client.UaTcpStackClient;
import org.eclipse.milo.opcua.stack.core.AttributeId;
import org.eclipse.milo.opcua.stack.core.Identifiers;
import org.eclipse.milo.opcua.stack.core.UaException;
......@@ -32,11 +33,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@Data
public class OpcUaClientService {
public class KepServerClientService {
// 批量订阅namespaceIndex默认为2
private int batchNamespaceIndex = 2;
......@@ -51,7 +53,7 @@ public class OpcUaClientService {
private String suffix;
public OpcUaClientService(String host, Integer port, String suffix) throws Exception {
public KepServerClientService(String host, Integer port, String suffix) throws Exception {
this.host = host;
this.port = port;
this.suffix = suffix;
......@@ -75,31 +77,31 @@ public class OpcUaClientService {
if (!Files.exists(securityTempDir)) {
throw new Exception("unable to create security dir: " + securityTempDir);
}
KeyStoreLoader loader =
new KeyStoreLoader().load(securityTempDir, endPointUrl); // 创建OpcUA的访问证书类对象
// KeyStoreLoader loader =
// new KeyStoreLoader().load(securityTempDir, endPointUrl); // 创建OpcUA的访问证书类对象
// 获取OPC UA的服务器端节点
EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints(endPointUrl).get();
// 过滤掉不需要的安全策略,选择一个自己需要的安全策略
EndpointDescription endpoint =
Arrays.stream(endpoints)
.filter(e -> e.getEndpointUrl().equals(endPointUrl))
.findFirst()
.orElseThrow(() -> new Exception("没有节点返回"));
// 设置OPC UA的配置信息
OpcUaClientConfig config =
OpcUaClientConfig.builder()
.setApplicationUri(endPointUrl) // 放入url
.setCertificate(
loader
.getClientCertificate()) // 需要传入一个数字证书作为形参,我们用KeyStoreLoader类创建了
.setKeyPair(loader.getClientKeyPair()) // 传入密匙对
.setEndpoint(endpoint) // EndpointDescription对象,就是设置刚刚选择的节点就可以了
// 使用匿名登录方式
.setIdentityProvider(new AnonymousProvider())
.setRequestTimeout(UInteger.valueOf(10 * 1000)) // 设置请求超时时间,单位为毫秒。
.build();
// EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints(endPointUrl).get();
// // 过滤掉不需要的安全策略,选择一个自己需要的安全策略
// EndpointDescription endpoint =
// Arrays.stream(endpoints)
// .filter(e -> e.getEndpointUrl().equals(endPointUrl))
// .findFirst()
// .orElseThrow(() -> new Exception("没有节点返回"));
// // 设置OPC UA的配置信息
// OpcUaClientConfig config =
// OpcUaClientConfig.builder()
// .setApplicationUri(endPointUrl) // 放入url
// .setCertificate(
// loader
// .getClientCertificate()) // 需要传入一个数字证书作为形参,我们用KeyStoreLoader类创建了
// .setKeyPair(loader.getClientKeyPair()) // 传入密匙对
// .setEndpoint(endpoint) // EndpointDescription对象,就是设置刚刚选择的节点就可以了
// // 使用匿名登录方式
// .setIdentityProvider(new AnonymousProvider())
// .setRequestTimeout(UInteger.valueOf(10 * 1000)) // 设置请求超时时间,单位为毫秒。
// .build();
// 创建OPC UA客户端
client = new OpcUaClient(config);
// client = new OpcUaClient(config);
Thread.sleep(2000); // 线程休眠一下再返回对象,给创建过程一个时间。
return client;
}
......@@ -110,34 +112,34 @@ public class OpcUaClientService {
* @param uaNode 节点
* @throws Exception
*/
public List listNode(Node uaNode, List<Node> nodes) throws Exception {
List<Node> nodesSub;
if (uaNode == null) {
nodesSub = client.getAddressSpace().browse(Identifiers.ObjectsFolder).get();
} else {
nodesSub = client.getAddressSpace().browseNode(uaNode).get();
}
if (CollectionUtils.isNotEmpty(nodesSub)) {
for (Node nd : nodesSub) {
nodes.add(nd);
// 排除系统行性节点,这些系统性节点名称一般都是以"_"开头
if (Objects.requireNonNull(nd.getBrowseName().get(10, TimeUnit.SECONDS).getName())
.contains("_")) {
continue;
}
System.out.println(
"Node= "
+ nd.getBrowseName().get(10, TimeUnit.SECONDS).getName()
+ ",id="
+ nd.getNodeId());
listNode(nd, nodes);
}
}
return nodes;
}
// public List listNode(Node uaNode, List<Node> nodes) throws Exception {
//
// List<Node> nodesSub;
// if (uaNode == null) {
// nodesSub = client.getAddressSpace().browse(Identifiers.ObjectsFolder).get();
// } else {
// nodesSub = client.getAddressSpace().browseNode(uaNode).get();
// }
// if (CollectionUtils.isNotEmpty(nodesSub)) {
//
// for (Node nd : nodesSub) {
//
// nodes.add(nd);
// // 排除系统行性节点,这些系统性节点名称一般都是以"_"开头
// if (Objects.requireNonNull(nd.getBrowseName().get(10, TimeUnit.SECONDS).getName())
// .contains("_")) {
// continue;
// }
// System.out.println(
// "Node= "
// + nd.getBrowseName().get(10, TimeUnit.SECONDS).getName()
// + ",id="
// + nd.getNodeId());
// listNode(nd, nodes);
// }
// }
// return nodes;
// }
/**
* 读取节点数据
......@@ -484,4 +486,8 @@ public class OpcUaClientService {
client.disconnect();
}
}
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
......@@ -36,26 +36,26 @@ import lombok.extern.slf4j.Slf4j;
import java.util.*;
@Slf4j
public class OpcuaDataSourceChannel implements DataSourceChannel {
public class KepServerDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
public static class Holder {
private static final OpcuaDataSourceChannel INSTANCE = new OpcuaDataSourceChannel();
private static final KepServerDataSourceChannel INSTANCE = new KepServerDataSourceChannel();
}
public static OpcuaDataSourceChannel getInstance() {
public static KepServerDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return OpcuaOptionRule.optionRule();
return KepServerOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return OpcuaOptionRule.metadataRule();
return KepServerOptionRule.metadataRule();
}
@Override
......@@ -70,7 +70,7 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
if (requestParams.isEmpty()) {
return new ArrayList<>();
}
OpcUaClientService opcClient = createOpcClient(requestParams);
KepServerClientService opcClient = createOpcClient(requestParams);
if (Objects.nonNull(opcClient)) {
String ns = requestParams.get("ns") + "";
String id = requestParams.get("id") + "";
......@@ -99,7 +99,7 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try {
OpcUaClientService opcClient = createOpcClient(requestParams);
KepServerClientService opcClient = createOpcClient(requestParams);
// just test the connection
NamespaceTable namespaceTable = opcClient.getClient().getNamespaceTable();
return Objects.nonNull(namespaceTable);
......@@ -121,7 +121,7 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
// default");
try {
List<TableField> tableFields = new ArrayList<>();
OpcUaClientService opcClient = createOpcClient(requestParams);
KepServerClientService opcClient = createOpcClient(requestParams);
NodeId nodeId = null;
// just test the connection
if (StringUtils.isNotEmpty(table)) {
......@@ -218,15 +218,15 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
return map;
}
private OpcUaClientService createOpcClient(Map<String, String> requestParams) {
private KepServerClientService createOpcClient(Map<String, String> requestParams) {
String host = requestParams.get("host") + "";
String port = requestParams.get("port") + "";
String suffix = requestParams.get("suffix") + "";
try {
OpcUaClientService opcUaClientService =
new OpcUaClientService(host, Integer.parseInt(port), suffix);
return opcUaClientService;
KepServerClientService kepServerClientService =
new KepServerClientService(host, Integer.parseInt(port), suffix);
return kepServerClientService;
} catch (Exception e) {
throw new DataSourcePluginException("OpcUa插件 创建链接错误!", e);
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class KepServerDataSourceFactory implements DataSourceFactory {
public static final String OPCUA_PLUGIN_NAME = "Kepserver";
public static final String OPCUA_PLUGIN_ICON = "Kepserver";
public static final String OPCUA_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return OPCUA_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(OPCUA_PLUGIN_NAME)
.icon(OPCUA_PLUGIN_ICON)
.version(OPCUA_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return KepServerDataSourceChannel.getInstance();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class KepServerOptionRule {
public static final Option<String> HOST =
Options.key("host").stringType().noDefaultValue().withDescription("socket host");
public static final Option<Integer> PORT =
Options.key("port").intType().noDefaultValue().withDescription("socket port");
public static final Option<String> SUFFIX =
Options.key("suffix").stringType().noDefaultValue().withDescription("suffix");
public static final Option<Integer> NS =
Options.key("ns").intType().noDefaultValue().withDescription("ns");
public static final Option<String> ID =
Options.key("id").stringType().noDefaultValue().withDescription("id");
public static final Option<String> TYPE =
Options.key("type").stringType().defaultValue("int").withDescription("type");
public static final Option<Integer> GS =
Options.key("gs").intType().defaultValue(1).withDescription("type");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT, NS, ID).optional(GS).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(NS, ID).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class KepServerRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(KepServerOptionRule.HOST.key()),
String.format("Missing %s in requestParams", KepServerOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(
KepServerOptionRule.HOST.key(), requestParams.get(KepServerOptionRule.HOST.key()));
if (requestParams.containsKey(KepServerOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(KepServerOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
}
}
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
......
......@@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.opcua;
package org.apache.seatunnel.datasource.plugin.kepserver;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
......
......@@ -46,7 +46,7 @@
<module>datasource-sqlserver-cdc</module>
<module>datasource-jdbc-tidb</module>
<module>datasource-mqtt</module>
<module>datasource-opcua</module>
<!-- <module>datasource-opcua</module>-->
<module>datasource-redis</module>
<module>datasource-rabbitmq</module>
<module>datasource-ftp</module>
......@@ -56,7 +56,7 @@
<module>datasource-xml</module>
<module>datasource-csv</module>
<module>datasource-excel</module>
<module>datasource-opcua-kepserver</module>
<!-- <module>datasource-kepserver1</module>-->
</modules>
<build>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论