Commit 373502c0 authored by 宋勇

datasource-jdbc-demeng

Added datasource-jdbc-access, datasource-http, datasource-xml, datasource-csv, datasource-excel
Parent f06ee4a0
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-csv</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkiverse.minio</groupId>
<artifactId>minio-client</artifactId>
<version>0.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>-->
<!-- <version>1.7.35</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.csv;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
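/**
* Builds a Hadoop Configuration for S3-compatible storage from the CSV
* datasource request options (bucket, endpoint, credentials provider).
*/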
@Slf4j
public class CSVAConfiguration {
/* S3 constants */
private static final String S3A_SCHEMA = "s3a";
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_PROTOCOL = "s3a";
private static final String DEFAULT_PROTOCOL = "s3n";
private static final String S3_FORMAT_KEY = "fs.%s.%s";
private static final String HDFS_IMPL_KEY = "impl";
public static Configuration getConfiguration(Map<String, String> s3Options) {
if (!s3Options.containsKey(CSVOptionRule.BUCKET.key())) {
throw new IllegalArgumentException(
"S3 datasource bucket is null, please check your config");
}
if (!s3Options.containsKey(CSVOptionRule.FS_S3A_ENDPOINT.key())) {
throw new IllegalArgumentException(
"S3 datasource endpoint is null, please check your config");
}
String bucket = s3Options.get(CSVOptionRule.BUCKET.key());
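// A bucket URL such as "s3a://my-bucket" selects the s3a filesystem implementation;
// anything else falls back to the legacy s3n implementation.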
String protocol = DEFAULT_PROTOCOL;
if (bucket.startsWith(S3A_PROTOCOL)) {
protocol = S3A_PROTOCOL;
}
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaut.name", bucket);
hadoopConf.set(
CSVOptionRule.FS_S3A_ENDPOINT.key(),
s3Options.get(CSVOptionRule.FS_S3A_ENDPOINT.key()));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
if (s3Options.containsKey(CSVOptionRule.HADOOP_S3_PROPERTIES.key())) {
Config configObject =
ConfigFactory.parseString(
s3Options.get(CSVOptionRule.HADOOP_S3_PROPERTIES.key()));
configObject
.entrySet()
.forEach(
entry -> {
hadoopConf.set(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
if (CSVOptionRule.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider.getProvider()
.equals(s3Options.get(CSVOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()))) {
hadoopConf.set(
CSVOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(CSVOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
hadoopConf.set("fs.s3a.access.key", s3Options.get(CSVOptionRule.ACCESS_KEY.key()));
hadoopConf.set("fs.s3a.secret.key", s3Options.get(CSVOptionRule.SECRET_KEY.key()));
} else {
hadoopConf.set(
CSVOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(CSVOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
}
return hadoopConf;
}
private static String formatKey(String protocol, String key) {
return String.format(S3_FORMAT_KEY, protocol, key);
}
}
package org.apache.seatunnel.datasource.plugin.csv;
import io.minio.MinioClient;
import io.minio.errors.MinioException;
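/** Thin wrapper that builds a MinIO client for an S3-compatible endpoint. */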
public class CSVClientService {
private String endpoint;
private String provider;
private String username;
private String password;
private String bucket;
private Integer port;
private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MinioClient minioClient;
public CSVClientService(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
this.endpoint = endpoint;
this.provider = provider;
this.username = username;
this.password = password;
this.port = port;
setMinioClient(endpoint, provider, username, password, port);
}
public MinioClient getMinioClient() {
return minioClient;
}
public void setMinioClient(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
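// NOTE: the third endpoint argument hard-codes TLS off, and the provider
// argument is not consumed by the MinIO builder.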
minioClient =
new MinioClient.Builder()
.endpoint(endpoint, port, false)
.credentials(username, password)
.build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.csv;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class CSVDataSourceFactory implements DataSourceFactory {
private static final String PLUGIN_NAME = "S3";
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo s3DatasourcePluginInfo =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.FILE.getCode())
.version("1.0.0")
.supportVirtualTables(false)
.icon("S3File")
.build();
return Sets.newHashSet(s3DatasourcePluginInfo);
}
@Override
public DataSourceChannel createChannel() {
return CSVDatasourceChannel.getInstance();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.csv;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CSVDatasourceChannel implements DataSourceChannel {
private CSVClientService s3ClientService;
public static class Holder {
private static final CSVDatasourceChannel INSTANCE = new CSVDatasourceChannel();
}
public static CSVDatasourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return CSVOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return CSVOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> tab = new ArrayList<>();
try {
boolean b =
minioClient.bucketExists(BucketExistsArgs.builder().bucket(database).build());
if (!b) {
// Bucket does not exist; return an empty list instead of throwing.
return tab;
}
Iterable<Result<Item>> results = getFileByDir(minioClient, database, null);
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
return tab;
} catch (InvalidKeyException | IOException | NoSuchAlgorithmException | MinioException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getTables is not supported for S3
// datasource");
}
private void getAllFile(
List<String> tab,
Iterable<Result<Item>> results,
MinioClient minioClient,
String database) {
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
this.getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
}
private Iterable<Result<Item>> getFileByDir(
@NonNull MinioClient minioClient, @NonNull String bucket, String dir) {
Iterable<Result<Item>> results;
if (StringUtils.isEmpty(dir)) {
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build());
} else {
results =
minioClient.listObjects(
ListObjectsArgs.builder().bucket(bucket).prefix(dir).build());
}
return results;
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> db = new ArrayList<>();
try {
List<Bucket> buckets = minioClient.listBuckets();
buckets.forEach(
x -> {
String name = x.name();
db.add(name);
});
return db;
} catch (ServerException
| ErrorResponseException
| InsufficientDataException
| IOException
| NoSuchAlgorithmException
| InvalidKeyException
| InvalidResponseException
| XmlParserException
| InternalException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getDatabases is not supported for S3
// datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
if (requestParams.isEmpty()) {
throw new SeaTunnelException("requestParmas 为空!");
}
try {
createS3Client(requestParams);
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check s3 connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<TableField> tableFields = getObject(minioClient, table, database, requestParams);
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields =
getObject(minioClient, tab, database, requestParams);
stringList.put(tab, tableFields);
});
return stringList;
}
private List<TableField> getObject(
MinioClient minioClient,
String table,
String database,
Map<String, String> requestParams) {
List<TableField> all = new ArrayList<>();
try {
GetObjectResponse minioClientObject =
minioClient.getObject(
GetObjectArgs.builder().object(table).bucket(database).build());
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader(minioClientObject));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
if (table.endsWith("csv")) {
String delimiter = requestParams.get("delimiter");
String[] split = line.split(delimiter);
Class<TableField> tableFieldClass = TableField.class;
Field[] declaredFields = tableFieldClass.getDeclaredFields();
HashMap<String, Object> map = new HashMap<>();
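// Positionally maps CSV columns onto TableField's declared fields; this assumes
// the row has no more columns than TableField has fields, or the loop will overrun.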
for (int i = 0; i < split.length; i++) {
String name = declaredFields[i].getName();
map.put(name, split[i]);
}
TableField tableField =
JSON.parseObject(JSON.toJSONString(map), TableField.class);
all.add(tableField);
}
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
return all;
}
public CSVClientService createS3Client(Map<String, String> requestParams) {
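// Derives the port from the configured fs.s3a.endpoint; assumes the endpoint value
// carries an explicit ":port" suffix (e.g. "http://127.0.0.1:9000").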
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + "";
Integer port =
Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + "";
String username = requestParams.get("access_key") + "";
String password = requestParams.get("secret_key") + "";
// String bucket = requestParams.get("bucket") + "";
try {
s3ClientService = new CSVClientService(endpoint, provider, username, password, port);
return s3ClientService;
} catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.csv;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Map;
public class CSVOptionRule {
public static final Option<String> ACCESS_KEY =
Options.key("access_key")
.stringType()
.noDefaultValue()
.withDescription("S3 access key");
public static final Option<String> SECRET_KEY =
Options.key("secret_key")
.stringType()
.noDefaultValue()
.withDescription("S3 secret key");
public static final Option<String> BUCKET =
Options.key("bucket").stringType().noDefaultValue().withDescription("S3 bucket name");
public static final Option<String> FS_S3A_ENDPOINT =
Options.key("fs.s3a.endpoint")
.stringType()
.noDefaultValue()
.withDescription("fs s3a endpoint");
public static final Option<S3aAwsCredentialsProvider> S3A_AWS_CREDENTIALS_PROVIDER =
Options.key("fs.s3a.aws.credentials.provider")
.enumType(S3aAwsCredentialsProvider.class)
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");
public static final Option<Map<String, String>> HADOOP_S3_PROPERTIES =
Options.key("hadoop_s3_properties")
.mapType()
.noDefaultValue()
.withDescription(
"{\n"
+ "fs.s3a.buffer.dir=/data/st_test/s3a\n"
+ "fs.s3a.fast.upload.buffer=disk\n"
+ "}");
public static OptionRule optionRule() {
return OptionRule.builder()
.required(BUCKET, FS_S3A_ENDPOINT, S3A_AWS_CREDENTIALS_PROVIDER)
.optional(HADOOP_S3_PROPERTIES)
.conditional(
S3A_AWS_CREDENTIALS_PROVIDER,
S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
ACCESS_KEY,
SECRET_KEY)
.build();
}
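// access_key/secret_key are required only when SimpleAWSCredentialsProvider is
// selected; the conditional above enforces this.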
public static final Option<String> PATH =
Options.key("path").stringType().noDefaultValue().withDescription("S3 write path");
public static final Option<String> TYPE =
Options.key("file_format_type")
.stringType()
.defaultValue("csv")
.withDescription("S3 write type");
public static final Option<String> DELIMITER =
Options.key("delimiter")
.stringType()
.noDefaultValue()
.withDescription("S3 write delimiter");
public static final Option<Map<String, String>> SCHEMA =
Options.key("schema").mapType().noDefaultValue().withDescription("SeaTunnel Schema");
public static final Option<Boolean> PARSE_PARTITION_FROM_PATH =
Options.key("parse_partition_from_path")
.booleanType()
.noDefaultValue()
.withDescription("S3 write parse_partition_from_path");
public static final Option<String> DATE_FORMAT =
Options.key("date_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write date_format");
public static final Option<String> DATETIME_FORMAT =
Options.key("datetime_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write datetime_format");
public static final Option<String> TIME_FORMAT =
Options.key("time_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write time_format");
public static OptionRule metadataRule() {
return OptionRule.builder()
.required(PATH, TYPE)
.conditional(TYPE, FileFormat.CSV.type, DELIMITER)
.conditional(TYPE, FileFormat.CSV.type, SCHEMA)
.optional(PARSE_PARTITION_FROM_PATH)
.optional(DATE_FORMAT)
.optional(DATETIME_FORMAT)
.optional(TIME_FORMAT)
.build();
}
public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
InstanceProfileCredentialsProvider("com.amazonaws.auth.InstanceProfileCredentialsProvider");
private String provider;
S3aAwsCredentialsProvider(String provider) {
this.provider = provider;
}
public String getProvider() {
return provider;
}
@Override
public String toString() {
return provider;
}
}
public enum FileFormat {
CSV("csv"),
TEXT("txt"),
PARQUET("parquet"),
ORC("orc"),
JSON("json"),
XML("xml");
private final String type;
FileFormat(String type) {
this.type = type;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-excel</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkiverse.minio</groupId>
<artifactId>minio-client</artifactId>
<version>0.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>-->
<!-- <version>1.7.35</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.excel;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
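/**
* Mirrors CSVAConfiguration for the Excel plugin: builds a Hadoop Configuration
* for S3-compatible storage from the datasource request options.
*/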
@Slf4j
public class ExcelAConfiguration {
/* S3 constants */
private static final String S3A_SCHEMA = "s3a";
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_PROTOCOL = "s3a";
private static final String DEFAULT_PROTOCOL = "s3n";
private static final String S3_FORMAT_KEY = "fs.%s.%s";
private static final String HDFS_IMPL_KEY = "impl";
public static Configuration getConfiguration(Map<String, String> s3Options) {
if (!s3Options.containsKey(ExcelOptionRule.BUCKET.key())) {
throw new IllegalArgumentException(
"S3 datasource bucket is null, please check your config");
}
if (!s3Options.containsKey(ExcelOptionRule.FS_S3A_ENDPOINT.key())) {
throw new IllegalArgumentException(
"S3 datasource endpoint is null, please check your config");
}
String bucket = s3Options.get(ExcelOptionRule.BUCKET.key());
String protocol = DEFAULT_PROTOCOL;
if (bucket.startsWith(S3A_PROTOCOL)) {
protocol = S3A_PROTOCOL;
}
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaut.name", bucket);
hadoopConf.set(
ExcelOptionRule.FS_S3A_ENDPOINT.key(),
s3Options.get(ExcelOptionRule.FS_S3A_ENDPOINT.key()));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
if (s3Options.containsKey(ExcelOptionRule.HADOOP_S3_PROPERTIES.key())) {
Config configObject =
ConfigFactory.parseString(
s3Options.get(ExcelOptionRule.HADOOP_S3_PROPERTIES.key()));
configObject
.entrySet()
.forEach(
entry -> {
hadoopConf.set(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
if (ExcelOptionRule.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider.getProvider()
.equals(s3Options.get(ExcelOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()))) {
hadoopConf.set(
ExcelOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(ExcelOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
hadoopConf.set("fs.s3a.access.key", s3Options.get(ExcelOptionRule.ACCESS_KEY.key()));
hadoopConf.set("fs.s3a.secret.key", s3Options.get(ExcelOptionRule.SECRET_KEY.key()));
} else {
hadoopConf.set(
ExcelOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(ExcelOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
}
return hadoopConf;
}
private static String formatKey(String protocol, String key) {
return String.format(S3_FORMAT_KEY, protocol, key);
}
}
package org.apache.seatunnel.datasource.plugin.excel;
import io.minio.MinioClient;
import io.minio.errors.MinioException;
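/** Thin MinIO client wrapper; identical in behavior to CSVClientService. */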
public class ExcelClientService {
private String endpoint;
private String provider;
private String username;
private String password;
private String bucket;
private Integer port;
private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MinioClient minioClient;
public ExcelClientService(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
this.endpoint = endpoint;
this.provider = provider;
this.username = username;
this.password = password;
this.port = port;
setMinioClient(endpoint, provider, username, password, port);
}
public MinioClient getMinioClient() {
return minioClient;
}
public void setMinioClient(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
minioClient =
new MinioClient.Builder()
.endpoint(endpoint, port, false)
.credentials(username, password)
.build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.excel;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class ExcelDataSourceFactory implements DataSourceFactory {
private static final String PLUGIN_NAME = "S3";
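// NOTE: registers the same identifier "S3" as the CSV plugin's factory; loading
// both plugins together will conflict.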
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo s3DatasourcePluginInfo =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.FILE.getCode())
.version("1.0.0")
.supportVirtualTables(false)
.icon("S3File")
.build();
return Sets.newHashSet(s3DatasourcePluginInfo);
}
@Override
public DataSourceChannel createChannel() {
return ExcelDatasourceChannel.getInstance();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.excel;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ExcelDatasourceChannel implements DataSourceChannel {
private ExcelClientService s3ClientService;
public static class Holder {
private static final ExcelDatasourceChannel INSTANCE = new ExcelDatasourceChannel();
}
public static ExcelDatasourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return ExcelOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return ExcelOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> tab = new ArrayList<>();
try {
boolean b =
minioClient.bucketExists(BucketExistsArgs.builder().bucket(database).build());
if (!b) {
// Bucket does not exist; return an empty list instead of throwing.
return tab;
}
Iterable<Result<Item>> results = getFileByDir(minioClient, database, null);
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
return tab;
} catch (InvalidKeyException | IOException | NoSuchAlgorithmException | MinioException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getTables is not supported for S3
// datasource");
}
private void getAllFile(
List<String> tab,
Iterable<Result<Item>> results,
MinioClient minioClient,
String database) {
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
this.getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
}
private Iterable<Result<Item>> getFileByDir(
@NonNull MinioClient minioClient, @NonNull String bucket, String dir) {
Iterable<Result<Item>> results;
if (StringUtils.isEmpty(dir)) {
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build());
} else {
results =
minioClient.listObjects(
ListObjectsArgs.builder().bucket(bucket).prefix(dir).build());
}
return results;
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> db = new ArrayList<>();
try {
List<Bucket> buckets = minioClient.listBuckets();
buckets.forEach(
x -> {
String name = x.name();
db.add(name);
});
return db;
} catch (ServerException
| ErrorResponseException
| InsufficientDataException
| IOException
| NoSuchAlgorithmException
| InvalidKeyException
| InvalidResponseException
| XmlParserException
| InternalException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getDatabases is not supported for S3
// datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
if (requestParams.isEmpty()) {
throw new SeaTunnelException("requestParmas 为空!");
}
try {
createS3Client(requestParams);
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check s3 connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<TableField> tableFields = getObject(minioClient, table, database, requestParams);
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields =
getObject(minioClient, tab, database, requestParams);
stringList.put(tab, tableFields);
});
return stringList;
}
private List<TableField> getObject(
MinioClient minioClient,
String table,
String database,
Map<String, String> requestParams) {
List<TableField> all = new ArrayList<>();
try {
GetObjectResponse minioClientObject =
minioClient.getObject(
GetObjectArgs.builder().object(table).bucket(database).build());
BufferedReader bufferedReader =
new BufferedReader(new InputStreamReader(minioClientObject));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
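// NOTE: carried over from the CSV plugin: only objects ending in "csv" are parsed,
// so Excel objects (.xlsx/.xls) fall through without producing any fields.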
if (table.endsWith("csv")) {
String delimiter = requestParams.get("delimiter");
String[] split = line.split(delimiter);
Class<TableField> tableFieldClass = TableField.class;
Field[] declaredFields = tableFieldClass.getDeclaredFields();
HashMap<String, Object> map = new HashMap<>();
for (int i = 0; i < split.length; i++) {
String name = declaredFields[i].getName();
map.put(name, split[i]);
}
TableField tableField =
JSON.parseObject(JSON.toJSONString(map), TableField.class);
all.add(tableField);
}
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
return all;
}
public ExcelClientService createS3Client(Map<String, String> requestParams) {
int i = requestParams.get("fs.s3a.endpoint").lastIndexOf(":");
String endpoint = requestParams.get("fs.s3a.endpoint") + "";
Integer port =
Integer.valueOf(
requestParams.get("fs.s3a.endpoint").substring(i + 1).replace("/", ""));
String provider = requestParams.get("fs.s3a.aws.credentials.provider") + "";
String username = requestParams.get("access_key") + "";
String password = requestParams.get("secret_key") + "";
// String bucket = requestParams.get("bucket") + "";
try {
s3ClientService = new ExcelClientService(endpoint, provider, username, password, port);
return s3ClientService;
} catch (Exception e) {
throw new SeaTunnelException("创建Mqtt客户端错误!");
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.excel;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Arrays;
import java.util.Map;
public class ExcelOptionRule {
public static final Option<String> ACCESS_KEY =
Options.key("access_key")
.stringType()
.noDefaultValue()
.withDescription("S3 access key");
public static final Option<String> SECRET_KEY =
Options.key("secret_key")
.stringType()
.noDefaultValue()
.withDescription("S3 secret key");
public static final Option<String> BUCKET =
Options.key("bucket").stringType().noDefaultValue().withDescription("S3 bucket name");
public static final Option<String> FS_S3A_ENDPOINT =
Options.key("fs.s3a.endpoint")
.stringType()
.noDefaultValue()
.withDescription("fs s3a endpoint");
public static final Option<S3aAwsCredentialsProvider> S3A_AWS_CREDENTIALS_PROVIDER =
Options.key("fs.s3a.aws.credentials.provider")
.enumType(S3aAwsCredentialsProvider.class)
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");
public static final Option<Map<String, String>> HADOOP_S3_PROPERTIES =
Options.key("hadoop_s3_properties")
.mapType()
.noDefaultValue()
.withDescription(
"{\n"
+ "fs.s3a.buffer.dir=/data/st_test/s3a\n"
+ "fs.s3a.fast.upload.buffer=disk\n"
+ "}");
public static OptionRule optionRule() {
return OptionRule.builder()
.required(BUCKET, FS_S3A_ENDPOINT, S3A_AWS_CREDENTIALS_PROVIDER)
.optional(HADOOP_S3_PROPERTIES)
.conditional(
S3A_AWS_CREDENTIALS_PROVIDER,
S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
ACCESS_KEY,
SECRET_KEY)
.build();
}
public static final Option<String> PATH =
Options.key("path").stringType().noDefaultValue().withDescription("S3 write path");
public static final Option<FileFormat> TYPE =
Options.key("file_format_type")
.enumType(FileFormat.class)
.noDefaultValue()
.withDescription("S3 write type");
public static final Option<String> DELIMITER =
Options.key("delimiter")
.stringType()
.noDefaultValue()
.withDescription("S3 write delimiter");
public static final Option<Map<String, String>> SCHEMA =
Options.key("schema").mapType().noDefaultValue().withDescription("SeaTunnel Schema");
public static final Option<Boolean> PARSE_PARTITION_FROM_PATH =
Options.key("parse_partition_from_path")
.booleanType()
.noDefaultValue()
.withDescription("S3 write parse_partition_from_path");
public static final Option<String> DATE_FORMAT =
Options.key("date_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write date_format");
public static final Option<String> DATETIME_FORMAT =
Options.key("datetime_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write datetime_format");
public static final Option<String> TIME_FORMAT =
Options.key("time_format")
.stringType()
.noDefaultValue()
.withDescription("S3 write time_format");
public static OptionRule metadataRule() {
return OptionRule.builder()
.required(PATH, TYPE)
.conditional(TYPE, Arrays.asList(FileFormat.XLSX, FileFormat.XLS), DELIMITER)
.conditional(TYPE, Arrays.asList(FileFormat.XLSX, FileFormat.XLS), SCHEMA)
.optional(PARSE_PARTITION_FROM_PATH)
.optional(DATE_FORMAT)
.optional(DATETIME_FORMAT)
.optional(TIME_FORMAT)
.build();
}
public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
InstanceProfileCredentialsProvider("com.amazonaws.auth.InstanceProfileCredentialsProvider");
private String provider;
S3aAwsCredentialsProvider(String provider) {
this.provider = provider;
}
public String getProvider() {
return provider;
}
@Override
public String toString() {
return provider;
}
}
public enum FileFormat {
XLSX("xlsx"),
XLS("xls");
private final String type;
FileFormat(String type) {
this.type = type;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-http</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.http;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
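/** Maps the datasource request options onto an HttpConfiguration (url, method, token, request params). */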
@Slf4j
public class HttpAConfiguration {
public static HttpConfiguration getConfiguration(Map<String, String> requestParams) {
if (!requestParams.containsKey(HttpOptionRule.URL.key())) {
throw new IllegalArgumentException(
"url is null, please check your config");
}
// Read the option values from the request map (the original copied the option
// keys themselves into the configuration).
HttpConfiguration configuration = new HttpConfiguration();
configuration.setUrl(requestParams.get(HttpOptionRule.URL.key()));
configuration.setMethod(requestParams.get(HttpOptionRule.METHOD.key()));
configuration.setToken(requestParams.get(HttpOptionRule.TOKEN.key()));
configuration.setRequest_params(requestParams.get(HttpOptionRule.REQUEST_PARAMS.key()));
return configuration;
}
}
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClients;
public class HttpClientService {
public static HttpClient connect(HttpConfiguration conf) throws Exception {
// Create a default HttpClient instance (the passed-in configuration is not used here).
HttpClient client = HttpClients.createDefault();
return client;
}
}
package org.apache.seatunnel.datasource.plugin.http;
public class HttpConfiguration {
private String url;
private String token;
private String method;
private String request_params;
public HttpConfiguration() {}
public HttpConfiguration(
String url, String method, String token, String request_params) {
this.url = url;
this.method = method;
this.token = token;
this.request_params = request_params;
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getMethod() {
return method;
}
public void setMethod(String method) {
this.method = method;
}
public String getToken() {
return token;
}
public void setToken(String token) {
this.token = token;
}
public String getRequest_params() {
return request_params;
}
public void setRequest_params(String request_params) {
this.request_params = request_params;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class HttpDataSourceFactory implements DataSourceFactory {
private static final String PLUGIN_NAME = "Http";
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo httpDatasourcePluginInfo =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.FILE.getCode())
.version("1.0.0")
.supportVirtualTables(false)
.icon("FtpFile")
.build();
return Sets.newHashSet(httpDatasourcePluginInfo);
}
@Override
public DataSourceChannel createChannel() {
return new HttpDatasourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.*;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import lombok.NonNull;
import java.util.List;
import java.util.Map;
import java.util.Objects;
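/**
* Validates HTTP datasource connectivity by issuing the configured request;
* table and field metadata operations are not supported for this source.
*/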
public class HttpDatasourceChannel implements DataSourceChannel {
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return HttpOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return null;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
HttpConfiguration conf = HttpAConfiguration.getConfiguration(requestParams);
try {
HttpClient httpClient = HttpClientService.connect(conf);
if (Objects.isNull(conf)) {
throw new DataSourcePluginException(
String.format(
"check ftp connectivity failed, config is: %s", requestParams));
}
            String url = conf.getUrl();
            String method = conf.getMethod();
            String token = conf.getToken();
            String params = conf.getRequest_params();
            // Default to GET when no method is configured
            String httpMethod = StringUtils.isBlank(method) ? "GET" : method.toUpperCase();
            // Validate the target URL up front
            URI uri = new URI(url);
            HttpResponse response = null;
            if ("GET".equals(httpMethod)) {
                if (StringUtils.isNotBlank(params)) {
                    url = url + "?" + params;
                }
                HttpGet httpGet = new HttpGet(url);
                if (StringUtils.isNotBlank(token)) {
                    httpGet.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                // Execute the request and obtain the response
                response = httpClient.execute(httpGet);
            } else if ("POST".equals(httpMethod)) {
                HttpPost httpPost = new HttpPost(url);
                if (StringUtils.isNotBlank(token)) {
                    httpPost.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                // Set the request body (e.g. JSON data)
                StringEntity requestEntity = new StringEntity(params);
                httpPost.setEntity(requestEntity);
                response = httpClient.execute(httpPost);
            } else if ("PUT".equals(httpMethod)) {
                HttpPut httpPut = new HttpPut(url);
                if (StringUtils.isNotBlank(token)) {
                    httpPut.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                // Set the request body (e.g. JSON data)
                StringEntity requestEntity = new StringEntity(params);
                httpPut.setEntity(requestEntity);
                response = httpClient.execute(httpPut);
            } else if ("DELETE".equals(httpMethod)) {
                if (StringUtils.isNotBlank(params)) {
                    url = url + "?" + params;
                }
                HttpDelete httpDelete = new HttpDelete(url);
                if (StringUtils.isNotBlank(token)) {
                    httpDelete.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                response = httpClient.execute(httpDelete);
            } else if ("PATCH".equals(httpMethod)) {
                HttpPatch httpPatch = new HttpPatch(url);
                if (StringUtils.isNotBlank(token)) {
                    httpPatch.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                // Set the request body (e.g. JSON data)
                StringEntity requestEntity = new StringEntity(params);
                httpPatch.setEntity(requestEntity);
                response = httpClient.execute(httpPatch);
            } else if ("OPTIONS".equals(httpMethod)) {
                if (StringUtils.isNotBlank(params)) {
                    url = url + "?" + params;
                }
                HttpOptions httpOptions = new HttpOptions(url);
                if (StringUtils.isNotBlank(token)) {
                    httpOptions.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                response = httpClient.execute(httpOptions);
            } else if ("HEAD".equals(httpMethod)) {
                if (StringUtils.isNotBlank(params)) {
                    url = url + "?" + params;
                }
                HttpHead httpHead = new HttpHead(url);
                if (StringUtils.isNotBlank(token)) {
                    httpHead.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
                }
                response = httpClient.execute(httpHead);
            } else {
                throw new DataSourcePluginException("unsupported http method: " + httpMethod);
            }
            int statusCode = response.getStatusLine().getStatusCode();
            // HEAD responses carry no entity, so guard before reading the body
            String responseBody =
                    response.getEntity() == null ? "" : EntityUtils.toString(response.getEntity());
            if (statusCode == 200) {
                return true;
            } else {
                throw new DataSourcePluginException(
                        String.format(
                                "check http connectivity failed, status: %s, body: %s, config is: %s",
                                statusCode, responseBody, requestParams));
            }
        } catch (Exception e) {
            throw new DataSourcePluginException(
                    String.format("check http connectivity failed, config is: %s", requestParams),
                    e);
        }
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
throw new UnsupportedOperationException("getTables is not supported for Ftp datasource");
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
throw new UnsupportedOperationException("getDatabases is not supported for Ftp datasource");
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
        throw new UnsupportedOperationException(
                "getTableFields is not supported for Http datasource");
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
        throw new UnsupportedOperationException(
                "getTableFields is not supported for Http datasource");
}
}
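/*
 * Minimal usage sketch for HttpDatasourceChannel, assuming the option keys defined in
 * HttpOptionRule below; the endpoint, method, and token values are hypothetical
 * placeholders, as is the "Http" plugin name.
 */
class HttpDatasourceChannelUsageSketch {
    public static void main(String[] args) {
        java.util.Map<String, String> params = new java.util.HashMap<>();
        params.put("url", "http://localhost:8080/health"); // hypothetical endpoint
        params.put("method", "GET");
        params.put("token", "my-token"); // optional bearer token (hypothetical)
        boolean ok = new HttpDatasourceChannel().checkDataSourceConnectivity("Http", params);
        System.out.println("http datasource reachable: " + ok);
    }
}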
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class HttpOptionRule {
    public static final Option<String> URL =
            Options.key("url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the http url to use for connections");
    public static final Option<String> METHOD =
            Options.key("method")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the http method to use, e.g. GET or POST");
    public static final Option<String> REQUEST_PARAMS =
            Options.key("requestparams")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the query string or request body to send");
    public static final Option<String> TOKEN =
            Options.key("token")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("the http bearer token to use when connecting");
    public static OptionRule optionRule() {
        return OptionRule.builder().required(URL, METHOD).optional(TOKEN, REQUEST_PARAMS).build();
    }
public static OptionRule metadataRule() {
return null;
}
public enum FileFormat {
JSON("json"),
;
private final String type;
FileFormat(String type) {
this.type = type;
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-jdbc-Access</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>net.sf.ucanaccess</groupId>
<artifactId>ucanaccess</artifactId>
<version>5.0.1</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.access.jdbc;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.common.collect.Sets;
import java.util.Set;
public class AccessDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-Access";
    public static final DataSourcePluginInfo ACCESS_DATASOURCE_PLUGIN_INFO =
            DataSourcePluginInfo.builder()
                    .name(PLUGIN_NAME)
                    .icon(PLUGIN_NAME)
                    .version("1.0.0")
                    .type(DatasourcePluginTypeEnum.DATABASE.getCode())
                    .build();
    public static final Set<String> ACCESS_SYSTEM_DATABASES =
            Sets.newHashSet("SYSTEM", "ROLL");
    public static final OptionRule OPTION_RULE =
            OptionRule.builder()
                    .required(AccessOptionRule.URL, AccessOptionRule.DRIVER)
                    .optional(AccessOptionRule.USER, AccessOptionRule.PASSWORD)
                    .build();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.access.jdbc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.sql.*;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final AccessJdbcDataSourceChannel INSTANCE = new AccessJdbcDataSourceChannel();
}
public static AccessJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return AccessDataSourceConfig.OPTION_RULE;
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return null;
}
public static List<Map> getResultSet(ResultSet rs) throws SQLException {
List<Map> reList = new ArrayList<>();
ResultSetMetaData rsmd = rs.getMetaData();
        int columnCount = rsmd.getColumnCount(); // total number of columns in the ResultSet
while (rs.next()) {
Map map = new HashMap();
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
String columnName = rsmd.getColumnName(i);
map.put(columnName, value);
}
reList.add(map);
}
return reList;
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
List<String> tableNames = new ArrayList<>();
        // Statement object used to execute the SQL
        Statement state = null;
        // ResultSet holding the query results
        ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n" +
"CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n" +
"r.TABLE_NAME AS TABLE_NAME,\n" +
"l.TABLE_TYPE,\n" +
"l.REMARKS,\n" +
"l.TYPE_CAT,\n" +
"l.TYPE_SCHEM,\n" +
"l.TYPE_NAME,\n" +
"l.SELF_REFERENCING_COL_NAME,\n" +
"l.REF_GENERATION,\n" +
"l.HSQLDB_TYPE,\n" +
"l.READ_ONLY,\n" +
"l.COMMIT_ACTION \n" +
"FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n" +
"INNER JOIN UCA_METADATA.TABLES r \n" +
"ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n" +
"WHERE \n" +
"TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
state = connection.createStatement();
            // Execute the query
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLE_NAME") != null).map(m -> m.get("TABLE_NAME") + "").collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
        } finally {
            // Close resources
            try {
                if (rs != null) {
                    rs.close();
                }
                if (state != null) {
                    state.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> tableNames = new ArrayList<>();
tableNames.add("all");
return tableNames;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException("check jdbc connectivity failed", e);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table);
String sql =
"select * from INFORMATION_SCHEMA.SYSTEM_columns where TABLE_NAME='" + table.toUpperCase() + "' ";
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
TableField tableField = new TableField();
String columnName = resultSet.getString("COLUMN_NAME");
tableField.setPrimaryKey(false);
if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
tableField.setPrimaryKey(true);
}
tableField.setType(resultSet.getString("TYPE_NAME"));
                if (typeList.contains(tableField.getType().toLowerCase())) {
                    if (!tableField.getType().toLowerCase().contains("text")) {
                        tableField.setLen(resultSet.getString("COLUMN_SIZE"));
                    }
                } else {
                    tableField.setLen(resultSet.getString("COLUMN_SIZE"));
                    // tableField.setScale(resultSet.getString("SCALE"));
                }
                String extra = resultSet.getString("IS_AUTOINCREMENT");
                if (StringUtils.isNotBlank(extra) && "YES".equals(extra.trim().toUpperCase())) {
                    tableField.setHasAutoIncrement(true);
                }
tableField.setName(columnName);
// tableField.setComment(resultSet.getString("COLUMN_COMMENT"));
// tableField.setDefaultValue(resultSet.getString("COLUMN_DEFAULT"));
String nullable = resultSet.getString("NULLABLE");
if ("Y".equals(nullable)) {
tableField.setNullable(true);
} else {
tableField.setNullable(false);
}
tableFields.add(tableField);
}
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table fields failed", e);
}
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
return tables.parallelStream()
.collect(
Collectors.toMap(
Function.identity(),
table ->
getTableFields(
pluginName, requestParams, database, table)));
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("NAME");
}
return null;
}
private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
    private Connection getConnection(Map<String, String> requestParams, String databaseName)
            throws SQLException, ClassNotFoundException {
        checkNotNull(
                requestParams.get(AccessOptionRule.URL.key()), "Jdbc url cannot be null");
        // Read the url value from the request params (not the option key itself)
        String url = requestParams.get(AccessOptionRule.URL.key());
        // Load the UCanAccess JDBC driver
        Class.forName("net.ucanaccess.jdbc.UcanaccessDriver");
        // Establish the connection
        return DriverManager.getConnection(url);
    }
@Override
public List<Map> getTableAlls(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
String tableName,
Map<String, String> options) {
        // Statement object used to execute the SQL
        Statement state = null;
        // ResultSet holding the query results
        ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n" +
"CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n" +
"r.TABLE_NAME AS TABLE_NAME,\n" +
"l.TABLE_TYPE,\n" +
"l.REMARKS,\n" +
"l.TYPE_CAT,\n" +
"l.TYPE_SCHEM,\n" +
"l.TYPE_NAME,\n" +
"l.SELF_REFERENCING_COL_NAME,\n" +
"l.REF_GENERATION,\n" +
"l.HSQLDB_TYPE,\n" +
"l.READ_ONLY,\n" +
"l.COMMIT_ACTION \n" +
"FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n" +
"INNER JOIN UCA_METADATA.TABLES r \n" +
"ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n" +
"WHERE \n" +
"TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
state = connection.createStatement();
            // Execute the query
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
return resultSet;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
        } finally {
            // Close resources
            try {
                if (rs != null) {
                    rs.close();
                }
                if (state != null) {
                    state.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
}
@Override
public Map getTableName(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options,
String tableName) {
List<Map> list = getTableAlls(pluginName, requestParams, database, tableName, options);
if (CollectionUtils.isNotEmpty(list)) {
return list.get(0);
}
return null;
}
}
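/*
 * Minimal usage sketch for AccessJdbcDataSourceChannel, assuming a UCanAccess-style jdbc
 * url; the .mdb file path is a hypothetical placeholder.
 */
class AccessJdbcChannelUsageSketch {
    public static void main(String[] args) {
        java.util.Map<String, String> params = new java.util.HashMap<>();
        params.put("url", "jdbc:ucanaccess:///tmp/example.mdb"); // hypothetical .mdb path
        AccessJdbcDataSourceChannel channel = AccessJdbcDataSourceChannel.getInstance();
        // List tables, then fetch the fields of each one
        for (String table : channel.getTables("JDBC-Access", params, "all", null)) {
            System.out.println(table + " -> "
                    + channel.getTableFields("JDBC-Access", params, "all", table));
        }
    }
}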
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.access.jdbc;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.Set;
@Slf4j
@AutoService(DataSourceFactory.class)
public class AccessJdbcDataSourceFactory implements DataSourceFactory {
@Override
public String factoryIdentifier() {
return AccessDataSourceConfig.PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
        return Sets.newHashSet(AccessDataSourceConfig.ACCESS_DATASOURCE_PLUGIN_INFO);
}
@Override
public DataSourceChannel createChannel() {
return new AccessJdbcDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.access.jdbc;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
public class AccessOptionRule {
    public static final Option<String> URL =
            Options.key("url")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "jdbc url, eg:"
                                    + " jdbc:ucanaccess:///path/to/filename.mdb");
public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
// public static final Option<String> DATABASE =
// Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
// public static final Option<String> TABLE =
// Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
    public static final Option<DriverType> DRIVER =
            Options.key("driver")
                    .enumType(DriverType.class)
                    .defaultValue(DriverType.ACCESS)
                    .withDescription("driver");
    public enum DriverType {
        ACCESS("net.ucanaccess.jdbc.UcanaccessDriver"),
;
private final String driverClassName;
DriverType(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getDriverClassName() {
return driverClassName;
}
@Override
public String toString() {
return driverClassName;
}
}
}
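/*
 * A short sketch of the url format AccessOptionRule expects, using plain JDBC against the
 * UCanAccess driver; the database path is a hypothetical placeholder.
 */
class AccessUrlSketch {
    public static void main(String[] args) throws Exception {
        Class.forName(AccessOptionRule.DriverType.ACCESS.getDriverClassName());
        try (java.sql.Connection conn =
                java.sql.DriverManager.getConnection("jdbc:ucanaccess:///tmp/example.mdb")) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}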
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-jdbc-demeng</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>Dm8JdbcDriver18</artifactId>
<version>8.1.1.49</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.common.collect.Sets;
import java.util.Set;
public class DemengDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-Demeng";
    public static final DataSourcePluginInfo DEMENG_DATASOURCE_PLUGIN_INFO =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.icon(PLUGIN_NAME)
.version("1.0.0")
.type(DatasourcePluginTypeEnum.DATABASE.getCode())
.build();
    public static final Set<String> DEMENG_SYSTEM_DATABASES =
            Sets.newHashSet("SYSTEM", "ROLL");
public static final OptionRule OPTION_RULE =
OptionRule.builder()
.required(DemengOptionRule.URL, DemengOptionRule.DRIVER)
.optional(DemengOptionRule.USER, DemengOptionRule.PASSWORD)
.build();
public static final OptionRule METADATA_RULE =
OptionRule.builder().required(DemengOptionRule.DATABASE, DemengOptionRule.TABLE).build();
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.sql.*;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final DemengJdbcDataSourceChannel INSTANCE = new DemengJdbcDataSourceChannel();
}
public static DemengJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return DemengDataSourceConfig.OPTION_RULE;
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return DemengDataSourceConfig.METADATA_RULE;
}
    public static List<Map> getResultSet(ResultSet rs) throws SQLException {
        List<Map> reList = new ArrayList<>();
        ResultSetMetaData rsmd = rs.getMetaData();
        int columnCount = rsmd.getColumnCount(); // total number of columns in the ResultSet
        while (rs.next()) {
            Map map = new HashMap();
            for (int i = 1; i <= columnCount; i++) {
                Object value = rs.getObject(i);
                String columnName = rsmd.getColumnName(i);
                map.put(columnName, value);
            }
            reList.add(map);
        }
        return reList;
    }
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
List<String> tableNames = new ArrayList<>();
        // Statement object used to execute the SQL
        Statement state = null;
        // ResultSet holding the query results
        ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT * FROM user_tables";
            if (StringUtils.isNotBlank(database)) {
                sql_selectAll = sql_selectAll + " where TABLESPACE_NAME='" + database.toUpperCase() + "'";
            }
state = connection.createStatement();
            // Execute the query
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLE_NAME") != null).map(m -> m.get("TABLE_NAME") + "").collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
        } finally {
            // Close resources
            try {
                if (rs != null) {
                    rs.close();
                }
                if (state != null) {
                    state.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> tableNames = new ArrayList<>();
        // Statement object used to execute the SQL
        Statement state = null;
        // ResultSet holding the query results
        ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT TABLESPACE_NAME FROM user_tables group by TABLESPACE_NAME ";
state = connection.createStatement();
            // Execute the query
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLESPACE_NAME") != null).map(m -> m.get("TABLESPACE_NAME") + "").collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get Databases names failed", e);
        } finally {
            // Close resources
            try {
                if (rs != null) {
                    rs.close();
                }
                if (state != null) {
                    state.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
} catch (Exception e) {
throw new DataSourcePluginException("check jdbc connectivity failed", e);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database)) {
DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table);
String sql =
" select * from SYSCOLUMNS where id in (select id from SYSOBJECTS where name='"+table.toUpperCase()+"')";
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
TableField tableField = new TableField();
String columnName = resultSet.getString("NAME");
tableField.setPrimaryKey(false);
if (StringUtils.isNotBlank(primaryKey) && primaryKey.equals(columnName)) {
tableField.setPrimaryKey(true);
}
tableField.setType(resultSet.getString("TYPE$"));
                if (typeList.contains(tableField.getType().toLowerCase())) {
                    if (!tableField.getType().toLowerCase().contains("text")) {
                        tableField.setLen(resultSet.getString("LENGTH$"));
                    }
                } else {
                    tableField.setLen(resultSet.getString("LENGTH$"));
                    tableField.setScale(resultSet.getString("SCALE"));
                }
                String extra = resultSet.getString("INFO2");
                if (StringUtils.isNotBlank(extra) && "1".equals(extra.trim())) {
                    tableField.setHasAutoIncrement(true);
                }
tableField.setName(columnName);
// tableField.setComment(resultSet.getString("COLUMN_COMMENT"));
// tableField.setDefaultValue(resultSet.getString("COLUMN_DEFAULT"));
String nullable = resultSet.getString("NULLABLE$");
if("Y".equals(nullable)) {
tableField.setNullable(true);
} else {
tableField.setNullable(false);
}
tableFields.add(tableField);
}
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table fields failed", e);
}
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
return tables.parallelStream()
.collect(
Collectors.toMap(
Function.identity(),
table ->
getTableFields(
pluginName, requestParams, database, table)));
}
private String getPrimaryKey(DatabaseMetaData metaData, String dbName, String tableName)
throws SQLException {
ResultSet primaryKeysInfo = metaData.getPrimaryKeys(dbName, "%", tableName);
while (primaryKeysInfo.next()) {
return primaryKeysInfo.getString("NAME");
}
return null;
}
private Connection getConnection(Map<String, String> requestParams)
throws SQLException, ClassNotFoundException {
return getConnection(requestParams, null);
}
    private Connection getConnection(Map<String, String> requestParams, String databaseName)
            throws SQLException, ClassNotFoundException {
        checkNotNull(requestParams.get(DemengOptionRule.DRIVER.key()));
        checkNotNull(requestParams.get(DemengOptionRule.URL.key()), "Jdbc url cannot be null");
        // Read the url value from the request params (not the option key itself)
        String url = requestParams.get(DemengOptionRule.URL.key());
        Class.forName(DemengOptionRule.DriverType.DEMENG.getDriverClassName());
        if (requestParams.containsKey(DemengOptionRule.USER.key())) {
            String username = requestParams.get(DemengOptionRule.USER.key());
            String password = requestParams.get(DemengOptionRule.PASSWORD.key());
            Connection conn = DriverManager.getConnection(url, username, password);
            conn.setAutoCommit(true);
            return conn;
        }
        // Fall back to the default SYSDBA account when no user is configured
        Connection conn = DriverManager.getConnection(url, "SYSDBA", null);
        conn.setAutoCommit(true);
        return conn;
    }
@Override
public List<Map> getTableAlls(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
String tableName,
Map<String, String> options) {
        // Statement object used to execute the SQL
        Statement state = null;
        // ResultSet holding the query results
        ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT * FROM user_tables";
            if (StringUtils.isNotBlank(database)) {
                sql_selectAll = sql_selectAll + " where TABLESPACE_NAME='" + database.toUpperCase() + "'";
            }
state = connection.createStatement();
            // Execute the query
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
return resultSet;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
        } finally {
            // Close resources
            try {
                if (rs != null) {
                    rs.close();
                }
                if (state != null) {
                    state.close();
                }
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
}
@Override
public Map getTableName(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options,
String tableName) {
        List<Map> list = getTableAlls(pluginName, requestParams, database, tableName, options);
if (CollectionUtils.isNotEmpty(list)) {
return list.get(0);
}
return null;
}
}
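/*
 * Minimal usage sketch for DemengJdbcDataSourceChannel, assuming a reachable Dameng (DM)
 * instance; host, port, and credentials are hypothetical placeholders.
 */
class DemengJdbcChannelUsageSketch {
    public static void main(String[] args) {
        java.util.Map<String, String> params = new java.util.HashMap<>();
        params.put("url", "jdbc:dm://localhost:5236"); // hypothetical DM endpoint
        params.put("driver", "DEMENG");
        params.put("user", "SYSDBA");
        params.put("password", "SYSDBA001"); // hypothetical password
        DemengJdbcDataSourceChannel channel = DemengJdbcDataSourceChannel.getInstance();
        if (channel.checkDataSourceConnectivity("JDBC-Demeng", params)) {
            // Tablespace names double as "databases" in this channel
            System.out.println(channel.getDatabases("JDBC-Demeng", params));
        }
    }
}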
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import lombok.extern.slf4j.Slf4j;
import java.util.Set;
@Slf4j
@AutoService(DataSourceFactory.class)
public class DemengJdbcDataSourceFactory implements DataSourceFactory {
@Override
public String factoryIdentifier() {
return DemengDataSourceConfig.PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
        return Sets.newHashSet(DemengDataSourceConfig.DEMENG_DATASOURCE_PLUGIN_INFO);
}
@Override
public DataSourceChannel createChannel() {
return new DemengJdbcDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
public class DemengOptionRule {
public static final Option<String> URL =
Options.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"jdbc url, eg:"
+ " jdbc:dm://localhost:5236");
public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
public static final Option<String> DATABASE =
Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
public static final Option<String> TABLE =
Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
public static final Option<DriverType> DRIVER =
Options.key("driver")
.enumType(DriverType.class)
.defaultValue(DriverType.DEMENG)
.withDescription("driver");
public enum DriverType {
DEMENG("dm.jdbc.driver.DmDriver"),
;
private final String driverClassName;
DriverType(String driverClassName) {
this.driverClassName = driverClassName;
}
public String getDriverClassName() {
return driverClassName;
}
@Override
public String toString() {
return driverClassName;
}
}
}
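/*
 * A short sketch of the url format DemengOptionRule expects, using plain JDBC against the
 * DM driver; host, port, and credentials are hypothetical placeholders.
 */
class DemengUrlSketch {
    public static void main(String[] args) throws Exception {
        Class.forName(DemengOptionRule.DriverType.DEMENG.getDriverClassName());
        try (java.sql.Connection conn =
                java.sql.DriverManager.getConnection(
                        "jdbc:dm://localhost:5236", "SYSDBA", "SYSDBA001")) {
            System.out.println("connected: " + !conn.isClosed());
        }
    }
}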
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-xml</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-1.2-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkiverse.minio</groupId>
<artifactId>minio-client</artifactId>
<version>0.2.0</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>org.slf4j</groupId>-->
<!-- <artifactId>slf4j-reload4j</artifactId>-->
<!-- <version>1.7.35</version>-->
<!-- <scope>test</scope>-->
<!-- </dependency>-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.5</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-aws</artifactId>
<version>3.3.5</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.xml;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
@Slf4j
public class XMLAConfiguration {
/* S3 constants */
private static final String S3A_SCHEMA = "s3a";
private static final String HDFS_S3N_IMPL = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
private static final String HDFS_S3A_IMPL = "org.apache.hadoop.fs.s3a.S3AFileSystem";
private static final String S3A_PROTOCOL = "s3a";
private static final String DEFAULT_PROTOCOL = "s3n";
private static final String S3_FORMAT_KEY = "fs.%s.%s";
private static final String HDFS_IMPL_KEY = "impl";
public static Configuration getConfiguration(Map<String, String> s3Options) {
if (!s3Options.containsKey(XMLOptionRule.BUCKET.key())) {
throw new IllegalArgumentException(
"S3 datasource bucket is null, please check your config");
}
if (!s3Options.containsKey(XMLOptionRule.FS_S3A_ENDPOINT.key())) {
throw new IllegalArgumentException(
"S3 datasource endpoint is null, please check your config");
}
String bucket = s3Options.get(XMLOptionRule.BUCKET.key());
String protocol = DEFAULT_PROTOCOL;
if (bucket.startsWith(S3A_PROTOCOL)) {
protocol = S3A_PROTOCOL;
}
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration();
hadoopConf.set("fs.defaut.name", bucket);
hadoopConf.set(
XMLOptionRule.FS_S3A_ENDPOINT.key(),
s3Options.get(XMLOptionRule.FS_S3A_ENDPOINT.key()));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
if (s3Options.containsKey(XMLOptionRule.HADOOP_S3_PROPERTIES.key())) {
Config configObject =
ConfigFactory.parseString(
s3Options.get(XMLOptionRule.HADOOP_S3_PROPERTIES.key()));
configObject
.entrySet()
.forEach(
entry -> {
hadoopConf.set(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
if (XMLOptionRule.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider.getProvider()
.equals(s3Options.get(XMLOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()))) {
hadoopConf.set(
XMLOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(XMLOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
hadoopConf.set("fs.s3a.access.key", s3Options.get(XMLOptionRule.ACCESS_KEY.key()));
hadoopConf.set("fs.s3a.secret.key", s3Options.get(XMLOptionRule.SECRET_KEY.key()));
} else {
hadoopConf.set(
XMLOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key(),
s3Options.get(XMLOptionRule.S3A_AWS_CREDENTIALS_PROVIDER.key()));
}
return hadoopConf;
}
private static String formatKey(String protocol, String key) {
return String.format(S3_FORMAT_KEY, protocol, key);
}
}
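/*
 * Minimal sketch of how the Configuration built by XMLAConfiguration could be consumed,
 * assuming a MinIO-style S3 endpoint; all option values are hypothetical placeholders.
 */
class XMLAConfigurationUsageSketch {
    public static void main(String[] args) {
        java.util.Map<String, String> options = new java.util.HashMap<>();
        options.put("bucket", "s3a://demo-bucket"); // hypothetical bucket
        options.put("fs.s3a.endpoint", "http://localhost:9000"); // hypothetical endpoint
        options.put(
                "fs.s3a.aws.credentials.provider",
                "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
        options.put("access_key", "minioadmin"); // hypothetical credentials
        options.put("secret_key", "minioadmin");
        org.apache.hadoop.conf.Configuration conf = XMLAConfiguration.getConfiguration(options);
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    }
}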
package org.apache.seatunnel.datasource.plugin.xml;
import io.minio.MinioClient;
import io.minio.errors.MinioException;
public class XMLClientService {
private String ENDPOINT;
private String PROVIDER;
private String USERNAME;
private String PASSWORD;
private String BUCKET;
private Integer PORT;
private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MinioClient minioClient;
public XMLClientService(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
this.ENDPOINT = endpoint;
this.PROVIDER = provider;
this.USERNAME = username;
this.PASSWORD = password;
this.PORT = port;
setMinioClient(endpoint, provider, username, password, port);
}
public MinioClient getMinioClient() {
return minioClient;
}
public void setMinioClient(
String endpoint, String provider, String username, String password, Integer port)
throws MinioException {
minioClient =
new MinioClient.Builder()
.endpoint(endpoint, port, false)
.credentials(username, password)
.build();
}
}
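/*
 * Minimal usage sketch for XMLClientService, assuming a local MinIO server; the endpoint,
 * port, and credentials are hypothetical placeholders.
 */
class XMLClientServiceUsageSketch {
    public static void main(String[] args) throws Exception {
        XMLClientService service =
                new XMLClientService("localhost", null, "minioadmin", "minioadmin", 9000);
        // List the buckets visible to these credentials
        for (io.minio.messages.Bucket bucket : service.getMinioClient().listBuckets()) {
            System.out.println(bucket.name());
        }
    }
}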
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.xml;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class XMLDataSourceFactory implements DataSourceFactory {
private static final String PLUGIN_NAME = "S3";
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo s3DatasourcePluginInfo =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.FILE.getCode())
.version("1.0.0")
.supportVirtualTables(false)
.icon("S3File")
.build();
return Sets.newHashSet(s3DatasourcePluginInfo);
}
@Override
public DataSourceChannel createChannel() {
return XMLDatasourceChannel.getInstance();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.xml;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Field;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class XMLDatasourceChannel implements DataSourceChannel {
private XMLClientService s3ClientService;
public static class Holder {
private static final XMLDatasourceChannel INSTANCE = new XMLDatasourceChannel();
}
public static XMLDatasourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return XMLOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return XMLOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> tab = new ArrayList<>();
try {
boolean b =
minioClient.bucketExists(BucketExistsArgs.builder().bucket(database).build());
            if (!b) {
                // Bucket does not exist, so there is nothing to list
                return tab;
            }
Iterable<Result<Item>> results = getFileByDir(minioClient, database, null);
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
return tab;
} catch (InvalidKeyException | IOException | NoSuchAlgorithmException | MinioException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getTables is not supported for S3
// datasource");
}
private void getAllFile(
List<String> tab,
Iterable<Result<Item>> results,
MinioClient minioClient,
String database) {
results.forEach(
x -> {
try {
boolean dir = x.get().isDir();
String s = x.get().objectName();
if (!dir) {
tab.add(s);
} else {
this.getAllFile(
tab,
getFileByDir(minioClient, database, s),
minioClient,
database);
}
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
| InvalidKeyException
| InvalidResponseException
| IOException
| NoSuchAlgorithmException
| ServerException
| XmlParserException e) {
throw new RuntimeException(e);
}
});
}
private Iterable<Result<Item>> getFileByDir(
@NonNull MinioClient minioClient, @NonNull String bucket, String dir) {
Iterable<Result<Item>> results;
if (StringUtils.isEmpty(dir)) {
results = minioClient.listObjects(ListObjectsArgs.builder().bucket(bucket).build());
} else {
results =
minioClient.listObjects(
ListObjectsArgs.builder().bucket(bucket).prefix(dir).build());
}
return results;
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<String> db = new ArrayList<>();
try {
List<Bucket> buckets = minioClient.listBuckets();
buckets.forEach(
x -> {
String name = x.name();
db.add(name);
});
return db;
} catch (ServerException
| ErrorResponseException
| InsufficientDataException
| IOException
| NoSuchAlgorithmException
| InvalidKeyException
| InvalidResponseException
| XmlParserException
| InternalException e) {
throw new RuntimeException(e);
}
// throw new UnsupportedOperationException("getDatabases is not supported for S3
// datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
        if (requestParams.isEmpty()) {
            throw new SeaTunnelException("requestParams is empty!");
        }
try {
createS3Client(requestParams);
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check s3 connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
List<TableField> tableFields = getObject(minioClient, table, database, requestParams);
return tableFields;
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
createS3Client(requestParams);
MinioClient minioClient = s3ClientService.getMinioClient();
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields =
getObject(minioClient, tab, database, requestParams);
stringList.put(tab, tableFields);
});
return stringList;
}
    /**
     * Downloads the given object and, for CSV files, parses each line into a {@link TableField}
     * by positionally mapping the columns onto TableField's declared fields.
     */
    private List<TableField> getObject(
            MinioClient minioClient,
            String table,
            String database,
            Map<String, String> requestParams) {
        List<TableField> all = new ArrayList<>();
        // try-with-resources so the object stream is always closed.
        try (GetObjectResponse minioClientObject =
                        minioClient.getObject(
                                GetObjectArgs.builder().object(table).bucket(database).build());
                BufferedReader bufferedReader =
                        new BufferedReader(new InputStreamReader(minioClientObject))) {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                if (table.endsWith("csv")) {
                    // split() takes a regex, so quote the configured delimiter literally.
                    String delimiter = requestParams.get("delimiter");
                    String[] split = line.split(java.util.regex.Pattern.quote(delimiter));
                    Field[] declaredFields = TableField.class.getDeclaredFields();
                    HashMap<String, Object> map = new HashMap<>();
                    // Guard against rows with more columns than TableField has fields.
                    for (int i = 0; i < split.length && i < declaredFields.length; i++) {
                        map.put(declaredFields[i].getName(), split[i]);
                    }
                    all.add(JSON.parseObject(JSON.toJSONString(map), TableField.class));
                }
            }
        } catch (ErrorResponseException
                | InsufficientDataException
                | InternalException
                | InvalidKeyException
                | InvalidResponseException
                | IOException
                | NoSuchAlgorithmException
                | ServerException
                | XmlParserException e) {
            throw new RuntimeException(e);
        }
        return all;
    }
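    // Worked example of the positional mapping above, with hypothetical data: assuming, for
    // illustration only, that TableField's first two declared fields are "name" and "type",
    // the CSV line "id,int" with delimiter "," becomes {name=id, type=int}, which FastJSON
    // then materializes as a TableField. The real field order depends on TableField itself.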
    public XMLClientService createS3Client(Map<String, String> requestParams) {
        String endpoint = requestParams.get("fs.s3a.endpoint");
        // Extract the port from the endpoint, e.g. "http://127.0.0.1:9000" -> 9000.
        int portIndex = endpoint.lastIndexOf(":");
        Integer port = Integer.valueOf(endpoint.substring(portIndex + 1).replace("/", ""));
        String provider = requestParams.get("fs.s3a.aws.credentials.provider");
        String username = requestParams.get("access_key");
        String password = requestParams.get("secret_key");
        try {
            s3ClientService = new XMLClientService(endpoint, provider, username, password, port);
            return s3ClientService;
        } catch (Exception e) {
            throw new SeaTunnelException("Failed to create S3 client!", e);
        }
    }
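    // Example of the request parameters this method reads (all values illustrative):
    //     fs.s3a.endpoint                 -> http://127.0.0.1:9000   (port is parsed from here)
    //     fs.s3a.aws.credentials.provider -> org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    //     access_key                      -> minioadmin
    //     secret_key                      -> minioadmin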
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.xml;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Map;
public class XMLOptionRule {
public static final Option<String> ACCESS_KEY =
Options.key("access_key")
.stringType()
.noDefaultValue()
.withDescription("S3 access key");
public static final Option<String> SECRET_KEY =
Options.key("secret_key")
.stringType()
.noDefaultValue()
.withDescription("S3 secret key");
public static final Option<String> BUCKET =
Options.key("bucket").stringType().noDefaultValue().withDescription("S3 bucket name");
public static final Option<String> FS_S3A_ENDPOINT =
Options.key("fs.s3a.endpoint")
.stringType()
.noDefaultValue()
.withDescription("fs s3a endpoint");
public static final Option<S3aAwsCredentialsProvider> S3A_AWS_CREDENTIALS_PROVIDER =
Options.key("fs.s3a.aws.credentials.provider")
.enumType(S3aAwsCredentialsProvider.class)
.defaultValue(S3aAwsCredentialsProvider.InstanceProfileCredentialsProvider)
.withDescription("s3a aws credentials provider");
public static final Option<Map<String, String>> HADOOP_S3_PROPERTIES =
Options.key("hadoop_s3_properties")
.mapType()
.noDefaultValue()
.withDescription(
"{\n"
+ "fs.s3a.buffer.dir=/data/st_test/s3a\n"
+ "fs.s3a.fast.upload.buffer=disk\n"
+ "}");
public static OptionRule optionRule() {
return OptionRule.builder()
.required(BUCKET, FS_S3A_ENDPOINT, S3A_AWS_CREDENTIALS_PROVIDER)
.optional(HADOOP_S3_PROPERTIES)
.conditional(
S3A_AWS_CREDENTIALS_PROVIDER,
S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider,
ACCESS_KEY,
SECRET_KEY)
.build();
}
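    // Sketch of a configuration accepted by optionRule() above (all values illustrative):
    // bucket, fs.s3a.endpoint and the credentials provider are always required, while
    // access_key and secret_key become required only when the provider is
    // SimpleAWSCredentialsProvider.
    //
    //     bucket = "demo-bucket"
    //     fs.s3a.endpoint = "http://127.0.0.1:9000"
    //     fs.s3a.aws.credentials.provider = "SimpleAWSCredentialsProvider"
    //     access_key = "minioadmin"
    //     secret_key = "minioadmin"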
public static final Option<String> PATH =
Options.key("path").stringType().noDefaultValue().withDescription("S3 write path");
public static final Option<String> TYPE =
Options.key("file_format_type")
.stringType()
.defaultValue("xml")
.withDescription("S3 write type");
public static final Option<String> DELIMITER =
Options.key("delimiter")
.stringType()
.noDefaultValue()
.withDescription("S3 write delimiter");
public static final Option<Map<String, String>> SCHEMA =
Options.key("schema").mapType().noDefaultValue().withDescription("SeaTunnel Schema");
    public static final Option<Boolean> PARSE_PARTITION_FROM_PATH =
            Options.key("parse_partition_from_path")
                    .booleanType()
                    .noDefaultValue()
                    .withDescription("S3 write parse_partition_from_path");
    public static final Option<String> DATE_FORMAT =
            Options.key("date_format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("S3 write date_format");
    public static final Option<String> DATETIME_FORMAT =
            Options.key("datetime_format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("S3 write datetime_format");
    public static final Option<String> TIME_FORMAT =
            Options.key("time_format")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("S3 write time_format");
    public static OptionRule metadataRule() {
        return OptionRule.builder()
                .required(PATH, TYPE)
                .conditional(TYPE, FileFormat.XML.type, DELIMITER)
                .conditional(TYPE, FileFormat.XML.type, SCHEMA)
                .optional(PARSE_PARTITION_FROM_PATH)
                .optional(DATE_FORMAT)
                .optional(DATETIME_FORMAT)
                .optional(TIME_FORMAT)
                .build();
    }
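    // Sketch of metadata options satisfying metadataRule() above (values illustrative):
    // path and file_format_type are always required, and because file_format_type is "xml"
    // here, delimiter and schema are additionally required by the conditional rules.
    //
    //     path = "/demo/output"
    //     file_format_type = "xml"
    //     delimiter = ","
    //     schema = { fields { id = "int", name = "string" } }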
public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
InstanceProfileCredentialsProvider("com.amazonaws.auth.InstanceProfileCredentialsProvider");
        private final String provider;
S3aAwsCredentialsProvider(String provider) {
this.provider = provider;
}
public String getProvider() {
return provider;
}
@Override
public String toString() {
return provider;
}
}
public enum FileFormat {
CSV("csv"),
TEXT("txt"),
PARQUET("parquet"),
ORC("orc"),
JSON("json"),
XML("xml");
private final String type;
FileFormat(String type) {
this.type = type;
}
}
}
......@@ -50,7 +50,12 @@
<module>datasource-redis</module>
<module>datasource-rabbitmq</module>
<module>datasource-ftp</module>
<module>datasource-jdbc-demeng</module>
<module>datasource-jdbc-access</module>
<module>datasource-http</module>
<module>datasource-xml</module>
<module>datasource-csv</module>
<module>datasource-excel</module>
</modules>
<build>
......