提交 d18b47a1 authored 作者: 宋勇's avatar 宋勇

atasource-jdbc-demeng

datasource-jdbc-access datasource-http datasource-xml datasource-csv datasource-excel 增加
上级 373502c0
......@@ -141,7 +141,6 @@
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.csv;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.hadoop.conf.Configuration;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
......@@ -29,7 +31,7 @@ public class CSVAConfiguration {
throw new IllegalArgumentException(
"S3 datasource endpoint is null, please check your config");
}
String bucket = s3Options.get( CSVOptionRule.BUCKET.key());
String bucket = s3Options.get(CSVOptionRule.BUCKET.key());
String protocol = DEFAULT_PROTOCOL;
if (bucket.startsWith(S3A_PROTOCOL)) {
......
......@@ -17,13 +17,14 @@
package org.apache.seatunnel.datasource.plugin.csv;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
......
......@@ -17,18 +17,20 @@
package org.apache.seatunnel.datasource.plugin.csv;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
......
......@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Arrays;
import java.util.Map;
public class CSVOptionRule {
......@@ -129,7 +128,6 @@ public class CSVOptionRule {
.build();
}
public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
......
......@@ -141,7 +141,6 @@
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.excel;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.hadoop.conf.Configuration;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
......@@ -29,7 +31,7 @@ public class ExcelAConfiguration {
throw new IllegalArgumentException(
"S3 datasource endpoint is null, please check your config");
}
String bucket = s3Options.get( ExcelOptionRule.BUCKET.key());
String bucket = s3Options.get(ExcelOptionRule.BUCKET.key());
String protocol = DEFAULT_PROTOCOL;
if (bucket.startsWith(S3A_PROTOCOL)) {
......
......@@ -17,13 +17,14 @@
package org.apache.seatunnel.datasource.plugin.excel;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
......
......@@ -17,18 +17,20 @@
package org.apache.seatunnel.datasource.plugin.excel;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
......
......@@ -129,7 +129,6 @@ public class ExcelOptionRule {
.build();
}
public enum S3aAwsCredentialsProvider {
SimpleAWSCredentialsProvider("org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"),
......
......@@ -27,8 +27,7 @@ public class HttpAConfiguration {
public static HttpConfiguration getConfiguration(Map<String, String> ftpOption) {
if (!ftpOption.containsKey(HttpOptionRule.URL.key())) {
throw new IllegalArgumentException(
"url is null, please check your config");
throw new IllegalArgumentException("url is null, please check your config");
}
HttpConfiguration httpAConfiguration = new HttpConfiguration();
......@@ -37,7 +36,6 @@ public class HttpAConfiguration {
httpAConfiguration.setToken(HttpOptionRule.TOKEN.key());
httpAConfiguration.setRequest_params(HttpOptionRule.REQUEST_PARAMS.key());
return httpAConfiguration;
}
}
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.http.client.HttpClient;
import org.apache.http.impl.client.HttpClients;
......@@ -8,14 +7,9 @@ public class HttpClientService {
public static HttpClient connect(HttpConfiguration conf) throws Exception {
// 创建HttpClient实例
HttpClient client = HttpClients.createDefault();
return client;
}
}
......@@ -10,12 +10,11 @@ public class HttpConfiguration {
public HttpConfiguration() {}
public HttpConfiguration(
String url,String method, String token,String request_params ) {
public HttpConfiguration(String url, String method, String token, String request_params) {
this.url = url;
this.token = token;
this.method=method;
this.request_params=request_params;
this.method = method;
this.request_params = request_params;
}
public String getUrl() {
......@@ -26,7 +25,6 @@ public class HttpConfiguration {
this.url = url;
}
public String getMethod() {
return method;
}
......
......@@ -17,23 +17,21 @@
package org.apache.seatunnel.datasource.plugin.http;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.*;
import org.apache.http.entity.StringEntity;
import org.apache.http.util.EntityUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import lombok.NonNull;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Objects;
......@@ -75,7 +73,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
System.out.println("url:" + url);
HttpGet httpGet = new HttpGet(url);
if (StringUtils.isNotBlank(token)) {
httpGet.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpGet.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 执行请求并获得响应
......@@ -83,7 +82,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
} else if (StringUtils.isBlank(method) || "POST".equals(method.toUpperCase())) {
HttpPost httpPost = new HttpPost(url);
if (StringUtils.isNotBlank(token)) {
httpPost.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpPost.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 设置请求体(例如:JSON数据)
StringEntity requestEntity = new StringEntity(parmams);
......@@ -94,7 +94,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
} else if (StringUtils.isBlank(method) || "PUT".equals(method.toUpperCase())) {
HttpPut httpPut = new HttpPut(url);
if (StringUtils.isNotBlank(token)) {
httpPut.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpPut.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 设置请求体(例如:JSON数据)
StringEntity requestEntity = new StringEntity(parmams);
......@@ -110,7 +111,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
HttpDelete httpDelete = new HttpDelete(url);
if (StringUtils.isNotBlank(token)) {
httpDelete.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpDelete.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 执行请求并获得响应
......@@ -119,7 +121,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
} else if (StringUtils.isBlank(method) || "PATCH".equals(method.toUpperCase())) {
HttpPatch httpPatch = new HttpPatch(url);
if (StringUtils.isNotBlank(token)) {
httpPatch.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpPatch.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 设置请求体(例如:JSON数据)
StringEntity requestEntity = new StringEntity(parmams);
......@@ -133,7 +136,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
}
HttpOptions httpOptions = new HttpOptions(url);
if (StringUtils.isNotBlank(token)) {
httpOptions.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpOptions.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 执行请求并获得响应
......@@ -145,7 +149,8 @@ public class HttpDatasourceChannel implements DataSourceChannel {
}
HttpHead httpHead = new HttpHead(url);
if (StringUtils.isNotBlank(token)) {
httpHead.setHeader("Authorization", "Bearer " + token.replace("Bearer ", "").trim());
httpHead.setHeader(
"Authorization", "Bearer " + token.replace("Bearer ", "").trim());
}
// 执行请求并获得响应
......@@ -160,7 +165,6 @@ public class HttpDatasourceChannel implements DataSourceChannel {
String responseBody = EntityUtils.toString(response.getEntity());
System.out.println("Response Body: " + responseBody);
if (statusCode == 200) {
return true;
} else {
......
......@@ -47,11 +47,8 @@ public class HttpOptionRule {
.noDefaultValue()
.withDescription("the http user token to use when connecting to the broker");
public static OptionRule optionRule() {
return OptionRule.builder().required(URL,METHOD).optional(TOKEN,REQUEST_PARAMS).build();
return OptionRule.builder().required(URL, METHOD).optional(TOKEN, REQUEST_PARAMS).build();
}
public static OptionRule metadataRule() {
......@@ -59,8 +56,6 @@ public class HttpOptionRule {
}
public enum FileFormat {
JSON("json"),
;
......
......@@ -56,7 +56,17 @@
<artifactId>ucanaccess</artifactId>
<version>5.0.1</version>
</dependency>
<dependency>
<groupId>com.cdzhiyong.unified</groupId>
<artifactId>minio-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.14</version>
</dependency>
</dependencies>
</project>
......@@ -37,15 +37,22 @@ public class AccessDataSourceConfig {
.type(DatasourcePluginTypeEnum.DATABASE.getCode())
.build();
public static final Set<String> MYSQL_SYSTEM_DATABASES =
Sets.newHashSet("SYSTEM", "ROLL");
public static final Set<String> MYSQL_SYSTEM_DATABASES = Sets.newHashSet("SYSTEM", "ROLL");
public static final OptionRule OPTION_RULE =
OptionRule.builder()
.required(org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.URL, org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.DRIVER)
.optional(org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.USER, org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.PASSWORD)
.required(
org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.URL,
org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule
.DRIVER)
.optional(
org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule
.USER,
org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule
.PASSWORD)
.build();
// public static final OptionRule METADATA_RULE =
// OptionRule.builder().required(org.apache.seatunnel.datasource.plugin.demeng.jdbc.DemengOptionRule.DATABASE, org.apache.seatunnel.datasource.plugin.demeng.jdbc.DemengOptionRule.TABLE).build();
// public static final OptionRule METADATA_RULE =
//
// OptionRule.builder().required(org.apache.seatunnel.datasource.plugin.demeng.jdbc.DemengOptionRule.DATABASE, org.apache.seatunnel.datasource.plugin.demeng.jdbc.DemengOptionRule.TABLE).build();
}
......@@ -17,17 +17,21 @@
package org.apache.seatunnel.datasource.plugin.access.jdbc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.sql.*;
import java.util.*;
import java.util.function.Function;
......@@ -39,7 +43,8 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final AccessJdbcDataSourceChannel INSTANCE = new AccessJdbcDataSourceChannel();
private static final AccessJdbcDataSourceChannel INSTANCE =
new AccessJdbcDataSourceChannel();
}
public static AccessJdbcDataSourceChannel getInstance() {
......@@ -60,17 +65,15 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
List<Map> reList = new ArrayList<>();
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount();//ResultSet的总列数
int columnCount = rsmd.getColumnCount(); // ResultSet的总列数
while (rs.next()) {
Map map = new HashMap();
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
String columnName = rsmd.getColumnName(i);
map.put(columnName, value);
}
reList.add(map);
}
return reList;
}
......@@ -87,36 +90,41 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
// 定义结果集对象
ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n" +
"CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n" +
"r.TABLE_NAME AS TABLE_NAME,\n" +
"l.TABLE_TYPE,\n" +
"l.REMARKS,\n" +
"l.TYPE_CAT,\n" +
"l.TYPE_SCHEM,\n" +
"l.TYPE_NAME,\n" +
"l.SELF_REFERENCING_COL_NAME,\n" +
"l.REF_GENERATION,\n" +
"l.HSQLDB_TYPE,\n" +
"l.READ_ONLY,\n" +
"l.COMMIT_ACTION \n" +
"FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n" +
"INNER JOIN UCA_METADATA.TABLES r \n" +
"ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n" +
"WHERE \n" +
"TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
String sql_selectAll =
"SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n"
+ "CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n"
+ "r.TABLE_NAME AS TABLE_NAME,\n"
+ "l.TABLE_TYPE,\n"
+ "l.REMARKS,\n"
+ "l.TYPE_CAT,\n"
+ "l.TYPE_SCHEM,\n"
+ "l.TYPE_NAME,\n"
+ "l.SELF_REFERENCING_COL_NAME,\n"
+ "l.REF_GENERATION,\n"
+ "l.HSQLDB_TYPE,\n"
+ "l.READ_ONLY,\n"
+ "l.COMMIT_ACTION \n"
+ "FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n"
+ "INNER JOIN UCA_METADATA.TABLES r \n"
+ "ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n"
+ "WHERE \n"
+ "TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
state = connection.createStatement();
//执行查询的 SQL 语句
// 执行查询的 SQL 语句
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLE_NAME") != null).map(m -> m.get("TABLE_NAME") + "").collect(Collectors.toList());
tableNames =
resultSet.stream()
.filter(f -> f.get("TABLE_NAME") != null)
.map(m -> m.get("TABLE_NAME") + "")
.collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
} finally {
try {
//关闭资源
// 关闭资源
rs.close();
state.close();
} catch (SQLException e) {
......@@ -131,7 +139,6 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> tableNames = new ArrayList<>();
tableNames.add("all");
return tableNames;
}
@Override
......@@ -156,7 +163,9 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
String primaryKey = getPrimaryKey(metaData, database, table);
String sql =
"select * from INFORMATION_SCHEMA.SYSTEM_columns where TABLE_NAME='" + table.toUpperCase() + "' ";
"select * from INFORMATION_SCHEMA.SYSTEM_columns where TABLE_NAME='"
+ table.toUpperCase()
+ "' ";
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
......@@ -176,7 +185,7 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
// tableField.setScale(resultSet.getString("SCALE"));
}
String extra = resultSet.getString("IS_AUTOINCREMENT");
if (StringUtils.isNotBlank(extra) &&"YES".equals(extra.trim().toUpperCase())) {
if (StringUtils.isNotBlank(extra) && "YES".equals(extra.trim().toUpperCase())) {
tableField.setHasAutoIncrement(true);
}
......@@ -230,15 +239,82 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
private Connection getConnection(Map<String, String> requestParams, String databaseName)
throws SQLException, ClassNotFoundException {
// checkNotNull(requestParams.get(org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
org.apache.seatunnel.datasource.plugin.access.jdbc.AccessOptionRule.URL.key();
checkNotNull(requestParams.get(AccessOptionRule.URL.key()), "Jdbc url cannot be null");
String url = AccessOptionRule.URL.key();
String substring = url.substring(url.lastIndexOf("/"), url.length()).replace("/", "");
if (StringUtils.isBlank(substring)) {
throw new RuntimeException("没找到文件名称!");
}
String[] split = substring.split("\\.");
String prefix = split[0];
String suffix = "";
if (split.length > 1) {
suffix = "." + split[1];
}
// 定义 SQL 语句执行对象
Statement state = null;
// 定义结果集对象
ResultSet rs = null;
Connection conn = null;
File path = new File("/tmp");
if (!path.exists()) {
path.mkdir();
}
FileOutputStream outputStream = null;
InputStream in = null;
HttpURLConnection con = null;
File file = null;
try {
file = File.createTempFile(prefix, suffix, path);
outputStream = new FileOutputStream(file);
URL urldownl = new URL(url);
con = (HttpURLConnection) urldownl.openConnection();
con.setRequestMethod("GET");
in = con.getInputStream();
byte[] buffer = new byte[1024];
int bytesRead;
while ((bytesRead = in.read(buffer)) != -1) {
outputStream.write(buffer, 0, bytesRead);
}
outputStream.flush();
} catch (Exception e) {
System.out.println("读取url 文件失败!" + e);
throw new RuntimeException("读取url 文件失败!");
} finally {
try {
if (in != null) {
in.close();
}
if (con != null) {
con.disconnect();
}
if (outputStream != null) {
outputStream.close();
}
} catch (Exception ee) {
System.out.println("关闭流错误!" + ee);
}
}
String database = path + "/" + file.getName();
// JDBC连接字符串
// 在application.properties或application.yml文件中配置数据库连接信息。你需要指定驱动类名为net.ucanaccess.jdbc.UcanaccessDriver,并提供Access数据库的URL。例如,URL可以是jdbc:ucanaccess://D:/Access2003/database/db_test.mdb;openExclusive=false;ignoreCase=true。[1]
String connectionString =
"jdbc:ucanaccess://" + database + ";openExclusive=false;ignoreCase=true";
// 加载UCanAccess JDBC驱动
Class.forName("net.ucanaccess.jdbc.UcanaccessDriver");
// 建立连接
Connection conn = DriverManager.getConnection(url);
conn = DriverManager.getConnection(url);
System.out.println("Connected to the database successfully");
return conn;
}
......@@ -255,26 +331,27 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
// 定义结果集对象
ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n" +
"CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n" +
"r.TABLE_NAME AS TABLE_NAME,\n" +
"l.TABLE_TYPE,\n" +
"l.REMARKS,\n" +
"l.TYPE_CAT,\n" +
"l.TYPE_SCHEM,\n" +
"l.TYPE_NAME,\n" +
"l.SELF_REFERENCING_COL_NAME,\n" +
"l.REF_GENERATION,\n" +
"l.HSQLDB_TYPE,\n" +
"l.READ_ONLY,\n" +
"l.COMMIT_ACTION \n" +
"FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n" +
"INNER JOIN UCA_METADATA.TABLES r \n" +
"ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n" +
"WHERE \n" +
"TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
String sql_selectAll =
"SELECT CAST(null AS VARCHAR(50)) AS TABLE_CAT,\n"
+ "CAST(null AS VARCHAR(50)) AS TABLE_SCHEM,\n"
+ "r.TABLE_NAME AS TABLE_NAME,\n"
+ "l.TABLE_TYPE,\n"
+ "l.REMARKS,\n"
+ "l.TYPE_CAT,\n"
+ "l.TYPE_SCHEM,\n"
+ "l.TYPE_NAME,\n"
+ "l.SELF_REFERENCING_COL_NAME,\n"
+ "l.REF_GENERATION,\n"
+ "l.HSQLDB_TYPE,\n"
+ "l.READ_ONLY,\n"
+ "l.COMMIT_ACTION \n"
+ "FROM INFORMATION_SCHEMA.SYSTEM_TABLES l \n"
+ "INNER JOIN UCA_METADATA.TABLES r \n"
+ "ON( l.TABLE_NAME= r.ESCAPED_TABLE_NAME) \n"
+ "WHERE \n"
+ "TABLE_CAT = 'PUBLIC' AND TABLE_SCHEM = 'PUBLIC' AND TABLE_NAME LIKE '%' ESCAPE '\\';";
state = connection.createStatement();
//执行查询的 SQL 语句
// 执行查询的 SQL 语句
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
......@@ -283,7 +360,7 @@ public class AccessJdbcDataSourceChannel implements DataSourceChannelExt {
throw new DataSourcePluginException("get table names failed", e);
} finally {
try {
//关闭资源
// 关闭资源
rs.close();
state.close();
} catch (SQLException e) {
......
......@@ -27,8 +27,7 @@ public class AccessOptionRule {
.stringType()
.noDefaultValue()
.withDescription(
"jdbc url, eg:"
+ " http://localhost:9000/bucket/filename.mdb");
"jdbc url, eg:" + " http://localhost:9000/bucket/filename.mdb");
public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
......@@ -36,11 +35,12 @@ public class AccessOptionRule {
public static final Option<String> PASSWORD =
Options.key("password").stringType().noDefaultValue().withDescription("jdbc password");
// public static final Option<String> DATABASE =
// Options.key("database").stringType().noDefaultValue().withDescription("jdbc database");
// public static final Option<String> DATABASE =
// Options.key("database").stringType().noDefaultValue().withDescription("jdbc
// database");
// public static final Option<String> TABLE =
// Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
// public static final Option<String> TABLE =
// Options.key("table").stringType().noDefaultValue().withDescription("jdbc table");
public static final Option<DriverType> DRIVER =
Options.key("driver")
......
......@@ -22,7 +22,7 @@
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-jdbc-demeng</artifactId>
<artifactId>datasource-jdbc-dameng</artifactId>
<properties>
<mysql-connector.version>8.0.28</mysql-connector.version>
......
......@@ -25,9 +25,9 @@ import com.google.common.collect.Sets;
import java.util.Set;
public class DemengDataSourceConfig {
public class DamengDataSourceConfig {
public static final String PLUGIN_NAME = "JDBC-Demeng";
public static final String PLUGIN_NAME = "JDBC-Dameng";
public static final DataSourcePluginInfo MYSQL_DATASOURCE_PLUGIN_INFO =
DataSourcePluginInfo.builder()
......@@ -37,15 +37,16 @@ public class DemengDataSourceConfig {
.type(DatasourcePluginTypeEnum.DATABASE.getCode())
.build();
public static final Set<String> MYSQL_SYSTEM_DATABASES =
Sets.newHashSet("SYSTEM", "ROLL");
public static final Set<String> MYSQL_SYSTEM_DATABASES = Sets.newHashSet("SYSTEM", "ROLL");
public static final OptionRule OPTION_RULE =
OptionRule.builder()
.required(DemengOptionRule.URL, DemengOptionRule.DRIVER)
.optional(DemengOptionRule.USER, DemengOptionRule.PASSWORD)
.required(DamengOptionRule.URL, DamengOptionRule.DRIVER)
.optional(DamengOptionRule.USER, DamengOptionRule.PASSWORD)
.build();
public static final OptionRule METADATA_RULE =
OptionRule.builder().required(DemengOptionRule.DATABASE, DemengOptionRule.TABLE).build();
OptionRule.builder()
.required(DamengOptionRule.DATABASE, DamengOptionRule.TABLE)
.build();
}
......@@ -17,13 +17,12 @@
package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
......@@ -35,41 +34,41 @@ import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
public class DamengJdbcDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json");
public static class Holder {
private static final DemengJdbcDataSourceChannel INSTANCE = new DemengJdbcDataSourceChannel();
private static final DamengJdbcDataSourceChannel INSTANCE =
new DamengJdbcDataSourceChannel();
}
public static DemengJdbcDataSourceChannel getInstance() {
public static DamengJdbcDataSourceChannel getInstance() {
return Holder.INSTANCE;
}
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return DemengDataSourceConfig.OPTION_RULE;
return DamengDataSourceConfig.OPTION_RULE;
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return DemengDataSourceConfig.METADATA_RULE;
return DamengDataSourceConfig.METADATA_RULE;
}
public static List<Map> getResultSet(ResultSet rs) throws SQLException {
List<Map> reList=new ArrayList<>();
ResultSetMetaData rsmd = rs.getMetaData() ;
int columnCount = rsmd.getColumnCount();//ResultSet的总列数
List<Map> reList = new ArrayList<>();
ResultSetMetaData rsmd = rs.getMetaData();
int columnCount = rsmd.getColumnCount(); // ResultSet的总列数
while (rs.next()) {
Map map=new HashMap();
for(int i=1;i<=columnCount;i++) {
Map map = new HashMap();
for (int i = 1; i <= columnCount; i++) {
Object value = rs.getObject(i);
String columnName = rsmd.getColumnName(i);
map.put(columnName,value);
map.put(columnName, value);
}
reList.add(map);
}
return reList;
}
......@@ -87,20 +86,25 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT * FROM user_tables";
if(StringUtils.isNotBlank(database)) {
sql_selectAll=sql_selectAll+" where TABLESPACE_NAME='" + database.toUpperCase() + "'";
if (StringUtils.isNotBlank(database)) {
sql_selectAll =
sql_selectAll + " where TABLESPACE_NAME='" + database.toUpperCase() + "'";
}
state = connection.createStatement();
//执行查询的 SQL 语句
// 执行查询的 SQL 语句
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLE_NAME") != null).map(m -> m.get("TABLE_NAME") + "").collect(Collectors.toList());
tableNames =
resultSet.stream()
.filter(f -> f.get("TABLE_NAME") != null)
.map(m -> m.get("TABLE_NAME") + "")
.collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}finally {
} finally {
try {
//关闭资源
// 关闭资源
rs.close();
state.close();
} catch (SQLException e) {
......@@ -118,19 +122,24 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
// 定义结果集对象
ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT TABLESPACE_NAME FROM user_tables group by TABLESPACE_NAME ";
String sql_selectAll =
"SELECT TABLESPACE_NAME FROM user_tables group by TABLESPACE_NAME ";
state = connection.createStatement();
//执行查询的 SQL 语句
// 执行查询的 SQL 语句
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
tableNames = resultSet.stream().filter(f -> f.get("TABLESPACE_NAME") != null).map(m -> m.get("TABLESPACE_NAME") + "").collect(Collectors.toList());
tableNames =
resultSet.stream()
.filter(f -> f.get("TABLESPACE_NAME") != null)
.map(m -> m.get("TABLESPACE_NAME") + "")
.collect(Collectors.toList());
return tableNames;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get Databases names failed", e);
}finally {
} finally {
try {
//关闭资源
// 关闭资源
rs.close();
state.close();
} catch (SQLException e) {
......@@ -161,7 +170,9 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
String primaryKey = getPrimaryKey(metaData, database, table);
String sql =
" select * from SYSCOLUMNS where id in (select id from SYSOBJECTS where name='"+table.toUpperCase()+"')";
" select * from SYSCOLUMNS where id in (select id from SYSOBJECTS where name='"
+ table.toUpperCase()
+ "')";
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
while (resultSet.next()) {
......@@ -190,7 +201,7 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
// tableField.setDefaultValue(resultSet.getString("COLUMN_DEFAULT"));
String nullable = resultSet.getString("NULLABLE$");
if("Y".equals(nullable)) {
if ("Y".equals(nullable)) {
tableField.setNullable(true);
} else {
tableField.setNullable(false);
......@@ -236,20 +247,19 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
private Connection getConnection(Map<String, String> requestParams, String databaseName)
throws SQLException, ClassNotFoundException {
checkNotNull(requestParams.get(DemengOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(DemengOptionRule.URL.key()), "Jdbc url cannot be null");
String url =
DemengOptionRule.URL.key();
if (requestParams.containsKey(DemengOptionRule.USER.key())) {
String username = requestParams.get(DemengOptionRule.USER.key());
String password = requestParams.get(DemengOptionRule.PASSWORD.key());
Class.forName(DemengOptionRule.DriverType.DEMENG.getDriverClassName());
checkNotNull(requestParams.get(DamengOptionRule.DRIVER.key()));
checkNotNull(requestParams.get(DamengOptionRule.URL.key()), "Jdbc url cannot be null");
String url = DamengOptionRule.URL.key();
if (requestParams.containsKey(DamengOptionRule.USER.key())) {
String username = requestParams.get(DamengOptionRule.USER.key());
String password = requestParams.get(DamengOptionRule.PASSWORD.key());
Class.forName(DamengOptionRule.DriverType.DEMENG.getDriverClassName());
Connection conn = DriverManager.getConnection(url, username, password);
conn.setAutoCommit(true);
return conn;
}
Class.forName(DemengOptionRule.DriverType.DEMENG.getDriverClassName());
Class.forName(DamengOptionRule.DriverType.DEMENG.getDriverClassName());
Connection conn = DriverManager.getConnection(url, "SYSDBA", null);
conn.setAutoCommit(true);
return conn;
......@@ -269,20 +279,21 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
ResultSet rs = null;
try (Connection connection = getConnection(requestParams)) {
String sql_selectAll = "SELECT * FROM user_tables";
if(StringUtils.isNotBlank(database)) {
sql_selectAll=sql_selectAll+" where TABLESPACE_NAME='" + database.toUpperCase() + "'";
if (StringUtils.isNotBlank(database)) {
sql_selectAll =
sql_selectAll + " where TABLESPACE_NAME='" + database.toUpperCase() + "'";
}
state = connection.createStatement();
//执行查询的 SQL 语句
// 执行查询的 SQL 语句
rs = state.executeQuery(sql_selectAll);
List<Map> resultSet = getResultSet(rs);
return resultSet;
} catch (ClassNotFoundException | SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}finally {
} finally {
try {
//关闭资源
// 关闭资源
rs.close();
state.close();
} catch (SQLException e) {
......@@ -299,7 +310,7 @@ public class DemengJdbcDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options,
String tableName) {
List<Map> list = getTableAlls(pluginName,requestParams,database,tableName,options);
List<Map> list = getTableAlls(pluginName, requestParams, database, tableName, options);
if (CollectionUtils.isNotEmpty(list)) {
return list.get(0);
}
......
......@@ -29,20 +29,20 @@ import java.util.Set;
@Slf4j
@AutoService(DataSourceFactory.class)
public class DemengJdbcDataSourceFactory implements DataSourceFactory {
public class DamengJdbcDataSourceFactory implements DataSourceFactory {
@Override
public String factoryIdentifier() {
return DemengDataSourceConfig.PLUGIN_NAME;
return DamengDataSourceConfig.PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(DemengDataSourceConfig.MYSQL_DATASOURCE_PLUGIN_INFO);
return Sets.newHashSet(DamengDataSourceConfig.MYSQL_DATASOURCE_PLUGIN_INFO);
}
@Override
public DataSourceChannel createChannel() {
return new DemengJdbcDataSourceChannel();
return new DamengJdbcDataSourceChannel();
}
}
......@@ -20,15 +20,13 @@ package org.apache.seatunnel.datasource.plugin.demeng.jdbc;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
public class DemengOptionRule {
public class DamengOptionRule {
public static final Option<String> URL =
Options.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"jdbc url, eg:"
+ " jdbc:dm://localhost:5236");
.withDescription("jdbc url, eg:" + " jdbc:dm://localhost:5236");
public static final Option<String> USER =
Options.key("user").stringType().noDefaultValue().withDescription("jdbc user");
......
......@@ -141,7 +141,6 @@
<artifactId>aws-java-sdk-bundle</artifactId>
</dependency>
</dependencies>
</dependencies>
</project>
package org.apache.seatunnel.datasource.plugin.xml;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.hadoop.conf.Configuration;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
......
......@@ -17,13 +17,14 @@
package org.apache.seatunnel.datasource.plugin.xml;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
......
......@@ -17,18 +17,20 @@
package org.apache.seatunnel.datasource.plugin.xml;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson2.JSON;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Bucket;
import io.minio.messages.Item;
import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.BufferedReader;
import java.io.IOException;
......
......@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Arrays;
import java.util.Map;
public class XMLOptionRule {
......@@ -121,7 +120,7 @@ public class XMLOptionRule {
return OptionRule.builder()
.required(PATH, TYPE)
.conditional(TYPE, FileFormat.XML.type, DELIMITER)
.conditional(TYPE,FileFormat.XML.type, SCHEMA)
.conditional(TYPE, FileFormat.XML.type, SCHEMA)
.optional(PARSE_PARSE_PARTITION_FROM_PATH)
.optional(DATE_FORMAT)
.optional(DATETIME_FORMAT)
......
......@@ -50,7 +50,7 @@
<module>datasource-redis</module>
<module>datasource-rabbitmq</module>
<module>datasource-ftp</module>
<module>datasource-jdbc-demeng</module>
<module>datasource-jdbc-dameng</module>
<module>datasource-jdbc-access</module>
<module>datasource-http</module>
<module>datasource-xml</module>
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论