提交 a573a68d authored 作者: 宋勇's avatar 宋勇

修改添加静态实例

上级 75753937
...@@ -38,6 +38,15 @@ public class ElasticSearchDataSourceChannel implements DataSourceChannel { ...@@ -38,6 +38,15 @@ public class ElasticSearchDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default"; private static final String DATABASE = "default";
public static class Holder {
private static final ElasticSearchDataSourceChannel INSTANCE =
new ElasticSearchDataSourceChannel();
}
public static ElasticSearchDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public boolean canAbleGetSchema() { public boolean canAbleGetSchema() {
return true; return true;
......
...@@ -43,6 +43,6 @@ public class ElasticSearchDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class ElasticSearchDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new ElasticSearchDataSourceChannel(); return ElasticSearchDataSourceChannel.getInstance();
} }
} }
...@@ -39,6 +39,14 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel { ...@@ -39,6 +39,14 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default"; private static final String DATABASE = "default";
public static class Holder {
private static final InfluxdbDataSourceChannel INSTANCE = new InfluxdbDataSourceChannel();
}
public static InfluxdbDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public boolean canAbleGetSchema() { public boolean canAbleGetSchema() {
return true; return true;
......
...@@ -43,6 +43,6 @@ public class InfluxdbDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class InfluxdbDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new InfluxdbDataSourceChannel(); return InfluxdbDataSourceChannel.getInstance();
} }
} }
...@@ -42,6 +42,14 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -42,6 +42,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j @Slf4j
public class ClickhouseJdbcDataSourceChannel implements DataSourceChannel { public class ClickhouseJdbcDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final ClickhouseJdbcDataSourceChannel INSTANCE =
new ClickhouseJdbcDataSourceChannel();
}
public static ClickhouseJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
......
...@@ -43,6 +43,6 @@ public class ClickhouseJdbcDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class ClickhouseJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new ClickhouseJdbcDataSourceChannel(); return ClickhouseJdbcDataSourceChannel.getInstance();
} }
} }
...@@ -44,6 +44,13 @@ import java.util.Map; ...@@ -44,6 +44,13 @@ import java.util.Map;
@Slf4j @Slf4j
public class HiveJdbcDataSourceChannel implements DataSourceChannel { public class HiveJdbcDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final HiveJdbcDataSourceChannel INSTANCE = new HiveJdbcDataSourceChannel();
}
public static HiveJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
......
...@@ -48,6 +48,6 @@ public class HiveJdbcDataSourceFactory implements DataSourceFactory { ...@@ -48,6 +48,6 @@ public class HiveJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new HiveJdbcDataSourceChannel(); return HiveJdbcDataSourceChannel.getInstance();
} }
} }
...@@ -38,6 +38,14 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -38,6 +38,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json"); List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final MysqlJdbcDataSourceChannel INSTANCE = new MysqlJdbcDataSourceChannel();
}
public static MysqlJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return MysqlDataSourceConfig.OPTION_RULE; return MysqlDataSourceConfig.OPTION_RULE;
......
...@@ -43,6 +43,6 @@ public class MysqlJdbcDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class MysqlJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new MysqlJdbcDataSourceChannel(); return MysqlJdbcDataSourceChannel.getInstance();
} }
} }
...@@ -36,6 +36,14 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -36,6 +36,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class OracleDataSourceChannel implements DataSourceChannelExt { public class OracleDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "varchar2", "char", "json"); List<String> typeList = Arrays.asList("varchar", "varchar2", "char", "json");
public static class Holder {
private static final OracleDataSourceChannel INSTANCE = new OracleDataSourceChannel();
}
public static OracleDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return OracleDataSourceConfig.OPTION_RULE; return OracleDataSourceConfig.OPTION_RULE;
......
...@@ -42,6 +42,6 @@ public class OracleJdbcDataSourceFactory implements DataSourceFactory { ...@@ -42,6 +42,6 @@ public class OracleJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new OracleDataSourceChannel(); return OracleDataSourceChannel.getInstance();
} }
} }
...@@ -36,6 +36,15 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -36,6 +36,15 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class PostgresqlDataSourceChannel implements DataSourceChannelExt { public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json"); List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final PostgresqlDataSourceChannel INSTANCE =
new PostgresqlDataSourceChannel();
}
public static PostgresqlDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return PostgresqlDataSourceConfig.OPTION_RULE; return PostgresqlDataSourceConfig.OPTION_RULE;
......
...@@ -41,6 +41,6 @@ public class PostgresqlDataSourceFactory implements DataSourceFactory { ...@@ -41,6 +41,6 @@ public class PostgresqlDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new PostgresqlDataSourceChannel(); return PostgresqlDataSourceChannel.getInstance();
} }
} }
...@@ -40,6 +40,13 @@ import java.util.Map; ...@@ -40,6 +40,13 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
public class RedshiftDataSourceChannel implements DataSourceChannel { public class RedshiftDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final RedshiftDataSourceChannel INSTANCE = new RedshiftDataSourceChannel();
}
public static RedshiftDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
......
...@@ -43,6 +43,6 @@ public class RedshiftDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class RedshiftDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new RedshiftDataSourceChannel(); return RedshiftDataSourceChannel.getInstance();
} }
} }
...@@ -36,6 +36,14 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -36,6 +36,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
@Slf4j @Slf4j
public class SqlServerDataSourceChannel implements DataSourceChannelExt { public class SqlServerDataSourceChannel implements DataSourceChannelExt {
public static class Holder {
private static final SqlServerDataSourceChannel INSTANCE = new SqlServerDataSourceChannel();
}
public static SqlServerDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return SqlServerDataSourceConfig.OPTION_RULE; return SqlServerDataSourceConfig.OPTION_RULE;
......
...@@ -43,6 +43,6 @@ public class SqlServerDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class SqlServerDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new SqlServerDataSourceChannel(); return SqlServerDataSourceChannel.getInstance();
} }
} }
...@@ -40,6 +40,14 @@ import java.util.Map; ...@@ -40,6 +40,14 @@ import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
public class StarRocksJdbcDataSourceChannel implements DataSourceChannel { public class StarRocksJdbcDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final StarRocksJdbcDataSourceChannel INSTANCE =
new StarRocksJdbcDataSourceChannel();
}
public static StarRocksJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
......
...@@ -41,6 +41,6 @@ public class StarRocksJdbcDataSourceFactory implements DataSourceFactory { ...@@ -41,6 +41,6 @@ public class StarRocksJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new StarRocksJdbcDataSourceChannel(); return StarRocksJdbcDataSourceChannel.getInstance();
} }
} }
...@@ -38,6 +38,14 @@ import static com.google.common.base.Preconditions.checkNotNull; ...@@ -38,6 +38,14 @@ import static com.google.common.base.Preconditions.checkNotNull;
public class TidbJdbcDataSourceChannel implements DataSourceChannelExt { public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json"); List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final TidbJdbcDataSourceChannel INSTANCE = new TidbJdbcDataSourceChannel();
}
public static TidbJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return TidbDataSourceConfig.OPTION_RULE; return TidbDataSourceConfig.OPTION_RULE;
......
...@@ -43,6 +43,6 @@ public class TidbJdbcDataSourceFactory implements DataSourceFactory { ...@@ -43,6 +43,6 @@ public class TidbJdbcDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new TidbJdbcDataSourceChannel(); return TidbJdbcDataSourceChannel.getInstance();
} }
} }
...@@ -41,6 +41,13 @@ import static com.google.common.base.Preconditions.checkArgument; ...@@ -41,6 +41,13 @@ import static com.google.common.base.Preconditions.checkArgument;
@Slf4j @Slf4j
public class KafkaDataSourceChannel implements DataSourceChannel { public class KafkaDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final KafkaDataSourceChannel INSTANCE = new KafkaDataSourceChannel();
}
public static KafkaDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
private static final String DATABASE = "default"; private static final String DATABASE = "default";
private static final DescribeClusterOptions DEFAULT_TIMEOUT_OPTIONS = private static final DescribeClusterOptions DEFAULT_TIMEOUT_OPTIONS =
......
...@@ -53,6 +53,6 @@ public class KafkaDataSourceFactory implements DataSourceFactory { ...@@ -53,6 +53,6 @@ public class KafkaDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new KafkaDataSourceChannel(); return KafkaDataSourceChannel.getInstance();
} }
} }
...@@ -26,6 +26,7 @@ import org.apache.seatunnel.datasource.plugin.api.model.TableField; ...@@ -26,6 +26,7 @@ import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson2.JSONObject; import com.alibaba.fastjson2.JSONObject;
...@@ -41,6 +42,14 @@ public class MqttDataSourceChannel implements DataSourceChannel { ...@@ -41,6 +42,14 @@ public class MqttDataSourceChannel implements DataSourceChannel {
private InitCallback initCallback; private InitCallback initCallback;
public static class Holder {
private static final MqttDataSourceChannel INSTANCE = new MqttDataSourceChannel();
}
public static MqttDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return MqttOptionRule.optionRule(); return MqttOptionRule.optionRule();
...@@ -88,7 +97,11 @@ public class MqttDataSourceChannel implements DataSourceChannel { ...@@ -88,7 +97,11 @@ public class MqttDataSourceChannel implements DataSourceChannel {
try { try {
MqttClientService mqttClient = createMqttClient(requestParams); MqttClientService mqttClient = createMqttClient(requestParams);
// just test the connection // just test the connection
mqttClient.getMqttClient().connect(); MqttClient mqttClient1 = mqttClient.getMqttClient();
if (mqttClient1.isConnected()) {
return true;
}
mqttClient1.connect();
return StringUtils.isNotEmpty(mqttClient.getTopic()); return StringUtils.isNotEmpty(mqttClient.getTopic());
} catch (Exception ex) { } catch (Exception ex) {
......
...@@ -53,6 +53,6 @@ public class MqttDataSourceFactory implements DataSourceFactory { ...@@ -53,6 +53,6 @@ public class MqttDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new MqttDataSourceChannel(); return MqttDataSourceChannel.getInstance();
} }
} }
...@@ -53,7 +53,7 @@ public class MqttOptionRule { ...@@ -53,7 +53,7 @@ public class MqttOptionRule {
.withDescription("mqtt server password"); .withDescription("mqtt server password");
public static OptionRule optionRule() { public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD).build(); return OptionRule.builder().required(HOST, PORT, TOPIC, USERNAME, PASSWORD).build();
} }
public static OptionRule metadataRule() { public static OptionRule metadataRule() {
......
package org.apache.seatunnel.datasource.plugin.mqtt;
import static org.junit.jupiter.api.Assertions.*;
class MqttClientServiceTest {
public static void main(String[] args) throws Exception {
InitCallback initCallback = new InitCallback();
MqttClientService mqttClientService =
new MqttClientService("172.32.1.83", 1833, "", "", "dd", initCallback);
System.out.println(mqttClientService);
}
}
...@@ -36,6 +36,14 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -36,6 +36,14 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
public static final Set<String> MYSQL_SYSTEM_DATABASES = public static final Set<String> MYSQL_SYSTEM_DATABASES =
Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys"); Sets.newHashSet("information_schema", "mysql", "performance_schema", "sys");
public static class Holder {
private static final MysqlCDCDataSourceChannel INSTANCE = new MysqlCDCDataSourceChannel();
}
public static MysqlCDCDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public boolean canAbleGetSchema() { public boolean canAbleGetSchema() {
return true; return true;
......
...@@ -41,6 +41,6 @@ public class MysqlCDCDataSourceFactory implements DataSourceFactory { ...@@ -41,6 +41,6 @@ public class MysqlCDCDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new MysqlCDCDataSourceChannel(); return MysqlCDCDataSourceChannel.getInstance();
} }
} }
...@@ -26,6 +26,8 @@ import lombok.Data; ...@@ -26,6 +26,8 @@ import lombok.Data;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
...@@ -153,7 +155,21 @@ public class OpcUaClientService { ...@@ -153,7 +155,21 @@ public class OpcUaClientService {
// 标识符 // 标识符
String id = String.valueOf(nodeId.getIdentifier()); String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue()); System.out.println(id + ": " + value.getValue().getValue());
return (Object[]) value.getValue().getValue(); Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
} }
public Object[] readNodeValue(int namespaceIndex, Integer identifier) throws Exception { public Object[] readNodeValue(int namespaceIndex, Integer identifier) throws Exception {
...@@ -169,9 +185,21 @@ public class OpcUaClientService { ...@@ -169,9 +185,21 @@ public class OpcUaClientService {
// 标识符 // 标识符
String id = String.valueOf(nodeId.getIdentifier()); String id = String.valueOf(nodeId.getIdentifier());
System.out.println(id + ": " + value.getValue().getValue()); System.out.println(id + ": " + value.getValue().getValue());
Object[] value1 = (Object[]) value.getValue().getValue(); Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
return value1; if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
} }
/** /**
...@@ -191,7 +219,21 @@ public class OpcUaClientService { ...@@ -191,7 +219,21 @@ public class OpcUaClientService {
String id = String.valueOf(nodeId.getIdentifier()); String id = String.valueOf(nodeId.getIdentifier());
System.out.println(nodeId); System.out.println(nodeId);
System.out.println(id + ": " + value.getValue().getValue()); System.out.println(id + ": " + value.getValue().getValue());
return (Object[]) value.getValue().getValue(); Object ob = value.getValue().getValue();
Object[] objects = new Object[] {null};
if (ob != null) {
if (ob instanceof DateTime) {
LocalDateTime localDateTime =
((DateTime) ob)
.getJavaInstant()
.atOffset(ZoneOffset.ofHours(8))
.toLocalDateTime();
objects = new Object[] {localDateTime + ""};
} else {
objects = new Object[] {ob + ""};
}
}
return objects;
} }
/** /**
......
...@@ -40,6 +40,14 @@ public class OpcuaDataSourceChannel implements DataSourceChannel { ...@@ -40,6 +40,14 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default"; private static final String DATABASE = "default";
public static class Holder {
private static final OpcuaDataSourceChannel INSTANCE = new OpcuaDataSourceChannel();
}
public static OpcuaDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return OpcuaOptionRule.optionRule(); return OpcuaOptionRule.optionRule();
...@@ -178,7 +186,7 @@ public class OpcuaDataSourceChannel implements DataSourceChannel { ...@@ -178,7 +186,7 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
} }
return tableFields; return tableFields;
} catch (Exception e) { } catch (Exception e) {
//不是json // 不是json
TableField tableField = new TableField(); TableField tableField = new TableField();
tableField.setName(nodeId.getIdentifier() + ""); tableField.setName(nodeId.getIdentifier() + "");
tableField.setType("String"); tableField.setType("String");
...@@ -192,8 +200,6 @@ public class OpcuaDataSourceChannel implements DataSourceChannel { ...@@ -192,8 +200,6 @@ public class OpcuaDataSourceChannel implements DataSourceChannel {
} }
} }
@Override @Override
public Map<String, List<TableField>> getTableFields( public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName, @NonNull String pluginName,
......
...@@ -53,6 +53,6 @@ public class OpcuaDataSourceFactory implements DataSourceFactory { ...@@ -53,6 +53,6 @@ public class OpcuaDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new OpcuaDataSourceChannel(); return OpcuaDataSourceChannel.getInstance();
} }
} }
...@@ -42,10 +42,10 @@ public class OpcuaOptionRule { ...@@ -42,10 +42,10 @@ public class OpcuaOptionRule {
Options.key("type").stringType().defaultValue("int").withDescription("type"); Options.key("type").stringType().defaultValue("int").withDescription("type");
public static OptionRule optionRule() { public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT, SUFFIX).optional(TYPE).build(); return OptionRule.builder().required(HOST, PORT, SUFFIX, NS, ID, TYPE).build();
} }
public static OptionRule metadataRule() { public static OptionRule metadataRule() {
return OptionRule.builder().required(NS, ID).optional(TYPE).build(); return OptionRule.builder().required(NS, ID, TYPE).build();
} }
} }
...@@ -67,11 +67,67 @@ ...@@ -67,11 +67,67 @@
<groupId>org.apache.logging.log4j</groupId> <groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId> <artifactId>log4j-1.2-api</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>-->
<!-- <version>1.7.35</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId> <artifactId>hadoop-aws</artifactId>
<version>3.3.5</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.amazonaws</groupId> <groupId>com.amazonaws</groupId>
......
...@@ -26,8 +26,6 @@ import lombok.extern.slf4j.Slf4j; ...@@ -26,8 +26,6 @@ import lombok.extern.slf4j.Slf4j;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
@Slf4j @Slf4j
public class HadoopS3AConfiguration { public class HadoopS3AConfiguration {
...@@ -58,7 +56,7 @@ public class HadoopS3AConfiguration { ...@@ -58,7 +56,7 @@ public class HadoopS3AConfiguration {
} }
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL; String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration(); Configuration hadoopConf = new Configuration();
hadoopConf.set(FS_DEFAULT_NAME_KEY, bucket); hadoopConf.set("fs.defaut.name", bucket);
hadoopConf.set( hadoopConf.set(
S3OptionRule.FS_S3A_ENDPOINT.key(), S3OptionRule.FS_S3A_ENDPOINT.key(),
s3Options.get(S3OptionRule.FS_S3A_ENDPOINT.key())); s3Options.get(S3OptionRule.FS_S3A_ENDPOINT.key()));
......
...@@ -53,6 +53,6 @@ public class S3DataSourceFactory implements DataSourceFactory { ...@@ -53,6 +53,6 @@ public class S3DataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new S3DatasourceChannel(); return S3DatasourceChannel.getInstance();
} }
} }
...@@ -33,6 +33,14 @@ import java.util.List; ...@@ -33,6 +33,14 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class S3DatasourceChannel implements DataSourceChannel { public class S3DatasourceChannel implements DataSourceChannel {
public static class Holder {
private static final S3DatasourceChannel INSTANCE = new S3DatasourceChannel();
}
public static S3DatasourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return S3OptionRule.optionRule(); return S3OptionRule.optionRule();
......
package org.apache.seatunnel.datasource.plugin.s3;
import org.apache.hadoop.conf.Configuration;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
class HadoopS3AConfigurationTest {
public static void main(String[] args) {
Map<String, String> map = new HashMap<>();
map.put("groupName", "测试s3适配器1");
map.put("adapterId", "1714559639070314496");
map.put("bucket", "hdfs://192.168.1.174:9001/s3atests3");
map.put("fs.s3a.endpoint", "http://192.168.1.174:9001");
map.put("fs.s3a.connection.ssl.enabled", "false");
map.put(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
map.put("hadoop_s3_properties", " ");
map.put("access_key", "zyminio");
map.put("secret_key", "zysoft123");
Configuration configuration = HadoopS3AConfiguration.getConfiguration(map);
System.out.println(configuration);
}
}
...@@ -34,14 +34,65 @@ ...@@ -34,14 +34,65 @@
<dependency> <dependency>
<groupId>org.apache.seatunnel</groupId> <groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId> <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>2.3.2</version> <version>2.3.3</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>org.apache.avro</groupId> <groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId> <artifactId>avro</artifactId>
</exclusion> </exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
</exclusions> </exclusions>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>1.7.35</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency> <dependency>
<groupId>com.amazon.redshift</groupId> <groupId>com.amazon.redshift</groupId>
<artifactId>redshift-jdbc42</artifactId> <artifactId>redshift-jdbc42</artifactId>
...@@ -50,7 +101,7 @@ ...@@ -50,7 +101,7 @@
<dependency> <dependency>
<groupId>org.apache.hadoop</groupId> <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId> <artifactId>hadoop-aws</artifactId>
<version>${hadoop-aws.version}</version> <version>3.3.5</version>
<exclusions> <exclusions>
<exclusion> <exclusion>
<groupId>jdk.tools</groupId> <groupId>jdk.tools</groupId>
......
...@@ -47,6 +47,14 @@ import java.util.Set; ...@@ -47,6 +47,14 @@ import java.util.Set;
@Slf4j @Slf4j
public class S3RedshiftDataSourceChannel implements DataSourceChannel { public class S3RedshiftDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final S3RedshiftDataSourceChannel INSTANCE =
new S3RedshiftDataSourceChannel();
}
public static S3RedshiftDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
......
...@@ -51,6 +51,6 @@ public class S3RedshiftDataSourceFactory implements DataSourceFactory { ...@@ -51,6 +51,6 @@ public class S3RedshiftDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new S3RedshiftDataSourceChannel(); return S3RedshiftDataSourceChannel.getInstance();
} }
} }
...@@ -35,6 +35,14 @@ import java.util.*; ...@@ -35,6 +35,14 @@ import java.util.*;
@Slf4j @Slf4j
public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
public static class Holder {
private static final SqlServerCDCDataSourceChannel INSTANCE =
new SqlServerCDCDataSourceChannel();
}
public static SqlServerCDCDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
public static final Set<String> MYSQL_SYSTEM_DATABASES = public static final Set<String> MYSQL_SYSTEM_DATABASES =
Sets.newHashSet("master", "tempdb", "model", "msdb"); Sets.newHashSet("master", "tempdb", "model", "msdb");
......
...@@ -42,6 +42,6 @@ public class SqlServerCDCDataSourceFactory implements DataSourceFactory { ...@@ -42,6 +42,6 @@ public class SqlServerCDCDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new SqlServerCDCDataSourceChannel(); return SqlServerCDCDataSourceChannel.getInstance();
} }
} }
...@@ -38,6 +38,13 @@ import java.util.List; ...@@ -38,6 +38,13 @@ import java.util.List;
import java.util.Map; import java.util.Map;
public class StarRocksDataSourceChannel implements DataSourceChannel { public class StarRocksDataSourceChannel implements DataSourceChannel {
public static class Holder {
private static final StarRocksDataSourceChannel INSTANCE = new StarRocksDataSourceChannel();
}
public static StarRocksDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksDataSourceChannel.class); private static final Logger LOGGER = LoggerFactory.getLogger(StarRocksDataSourceChannel.class);
......
...@@ -40,6 +40,6 @@ public class StarRocksDataSourceFactory implements DataSourceFactory { ...@@ -40,6 +40,6 @@ public class StarRocksDataSourceFactory implements DataSourceFactory {
@Override @Override
public DataSourceChannel createChannel() { public DataSourceChannel createChannel() {
return new StarRocksDataSourceChannel(); return StarRocksDataSourceChannel.getInstance();
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论