提交 12c34977 authored 作者: 宋勇's avatar 宋勇

查询数据库表接口

上级 0a83fff9
...@@ -208,15 +208,25 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -208,15 +208,25 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) { try (Connection connection = getConnection(requestParams)) {
String sql = StringBuilder sql =new StringBuilder( "SELECT * FROM information_schema.tables WHERE 1=1 ");
"SELECT * FROM information_schema.tables WHERE table_schema='" + database + "'";
if (StringUtils.isNotBlank(database)) {
sql.append(" and table_schema='" + database + "'");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and TABLE_NAME='" + tableName + "'");
}
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql); ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -228,9 +238,9 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -228,9 +238,9 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameold = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameold)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameold);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
...@@ -262,7 +272,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -262,7 +272,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
+ database + database
+ "'"; + "'";
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql); ResultSet resultSet = statement.executeQuery(sql);
ResultSetMetaData metaData = resultSet.getMetaData(); ResultSetMetaData metaData = resultSet.getMetaData();
// 字段的个数 // 字段的个数
......
...@@ -54,10 +54,10 @@ public class OracleDataSourceChannel implements DataSourceChannelExt { ...@@ -54,10 +54,10 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
Map<String, String> option) { Map<String, String> option) {
List<String> tableNames = new ArrayList<>(); List<String> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams); try (Connection connection = getConnection(requestParams);
ResultSet resultSet = ResultSet resultSet =
connection connection
.getMetaData() .getMetaData()
.getTables(database, null, null, new String[] {"TABLE"}); ) { .getTables(database, null, null, new String[]{"TABLE"});) {
while (resultSet.next()) { while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME"); String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -75,8 +75,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt { ...@@ -75,8 +75,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull Map<String, String> requestParams) { @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> dbNames = new ArrayList<>(); List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams); try (Connection connection = getConnection(requestParams);
PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;"); PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
ResultSet re = statement.executeQuery()) { ResultSet re = statement.executeQuery()) {
// filter system databases // filter system databases
while (re.next()) { while (re.next()) {
String dbName = re.getString("database"); String dbName = re.getString("database");
...@@ -219,26 +219,39 @@ public class OracleDataSourceChannel implements DataSourceChannelExt { ...@@ -219,26 +219,39 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams); try (Connection connection = getConnection(requestParams, database)) {
ResultSet resultSet = StringBuilder sql = new StringBuilder("sselect * from all_tables where 1=1 ");
connection
.getMetaData() if (StringUtils.isNotBlank(database)) {
.getTables(database, null, null, new String[] {"TABLE"})) {
sql.append(" and OWNER=\"" + database + "\"");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and table_name=\"" + tableName + "\"");
}
Statement statement = connection.createStatement();
// statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString());
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
ResultSetMetaData metaData = resultSet.getMetaData(); // ResultSetMetaData metaData = resultSet.getMetaData();
// 获取表列数 // 获取表列数
int columnCount = metaData.getColumnCount(); int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) { for (int i = 1; i <= columnCount; i++) {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameold = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameold)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameold);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
......
...@@ -56,13 +56,13 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -56,13 +56,13 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
String query = "SELECT table_schema, table_name FROM information_schema.tables"; String query = "SELECT table_schema, table_name FROM information_schema.tables";
try (Connection connection = getConnection(requestParams, database)) { try (Connection connection = getConnection(requestParams, database)) {
try (Statement statement = connection.createStatement(); try (Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(query)) { ResultSet resultSet = statement.executeQuery(query)) {
while (resultSet.next()) { while (resultSet.next()) {
String schemaName = resultSet.getString("table_schema"); String schemaName = resultSet.getString("table_schema");
String tableName = resultSet.getString("table_name"); String tableName = resultSet.getString("table_name");
if (StringUtils.isNotBlank(schemaName) if (StringUtils.isNotBlank(schemaName)
&& !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains( && !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
schemaName)) { schemaName)) {
tableNames.add(schemaName + "." + tableName); tableNames.add(schemaName + "." + tableName);
} }
} }
...@@ -78,14 +78,14 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -78,14 +78,14 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull Map<String, String> requestParams) { @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> dbNames = new ArrayList<>(); List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams); try (Connection connection = getConnection(requestParams);
PreparedStatement statement = PreparedStatement statement =
connection.prepareStatement("select datname from pg_database;"); connection.prepareStatement("select datname from pg_database;");
ResultSet re = statement.executeQuery()) { ResultSet re = statement.executeQuery()) {
while (re.next()) { while (re.next()) {
String dbName = re.getString("datname"); String dbName = re.getString("datname");
if (StringUtils.isNotBlank(dbName) if (StringUtils.isNotBlank(dbName)
&& !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains( && !PostgresqlDataSourceConfig.POSTGRESQL_SYSTEM_DATABASES.contains(
dbName)) { dbName)) {
dbNames.add(dbName); dbNames.add(dbName);
} }
} }
...@@ -112,7 +112,7 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -112,7 +112,7 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
@NonNull String database, @NonNull String database,
@NonNull String table) { @NonNull String table) {
List<TableField> tableFields = new ArrayList<>(); List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database); ) { try (Connection connection = getConnection(requestParams, database);) {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table); String primaryKey = getPrimaryKey(metaData, database, table);
String[] split = table.split("\\."); String[] split = table.split("\\.");
...@@ -199,13 +199,23 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -199,13 +199,23 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) { try (Connection connection = getConnection(requestParams)) {
String sql = StringBuilder sql = new StringBuilder("SELECT table_schema, table_name TABLE_NAME, table_comment TABLE_COMMENT FROM information_schema.tables where 1=1 ");
"SELECT table_schema, table_name TABLE_NAME, table_comment TABLE_COMMENT FROM information_schema.tables";
if (StringUtils.isNotBlank(database)) {
sql.append(" and table_schema='" + database + "'");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and table_name='" + tableName + "'");
}
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql); ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -217,9 +227,9 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -217,9 +227,9 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameold = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameold)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameold);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
......
...@@ -30,10 +30,7 @@ import lombok.NonNull; ...@@ -30,10 +30,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkNotNull;
...@@ -226,14 +223,77 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -226,14 +223,77 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(pluginName, requestParams)) { try (Connection connection = getConnection(pluginName, requestParams)) {
String sql =
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " ResultSet resultSet=null;
+ " sys.tables a left join sys.extended_properties on (a.object_id = g.major_id AND g.minor_id = 0)";
Statement statement = connection.createStatement(); if (StringUtils.isNotBlank(database)) {
ResultSet resultSet = statement.executeQuery(sql);
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and a.name='" + tableName + "'");
}
Statement statement = connection.createStatement();
statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet);
} else {
String sqlDatabases = "SELECT * FROM sys.sysdatabases";
Statement statement = connection.createStatement();
resultSet = statement.executeQuery(sqlDatabases.toString());
while (resultSet.next()) {
// ResultSetMetaData metaData = resultSet.getMetaData();
// // 获取表列数
// int columnCount = metaData.getColumnCount();
// for (int i = 1; i <= columnCount; i++) {
// tables.put(metaData.getColumnName(i), resultSet.getObject(i));
// }
String databaseName = resultSet.getString("name");
if (StringUtils.isNotBlank(databaseName) && !Arrays.asList("master","tempdb","model","msdb").contains(databaseName)) {
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and a.name='" + tableName + "'");
}
statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet);
}
}
}
return tableNames;
} catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}
}
private static void fillTableNames(List<Map> tableNames, ResultSet resultSet) throws SQLException {
if(Objects.nonNull(resultSet)) {
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -244,9 +304,9 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -244,9 +304,9 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameOld = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameOld)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameOld);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
...@@ -256,9 +316,6 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -256,9 +316,6 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
} }
tableNames.add(tables); tableNames.add(tables);
} }
return tableNames;
} catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
} }
} }
......
...@@ -206,13 +206,25 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -206,13 +206,25 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams); try (Connection connection = getConnection(requestParams)) {
ResultSet resultSet =
connection StringBuilder sql =new StringBuilder( "SELECT * FROM information_schema.tables WHERE 1=1 ");
.getMetaData()
.getTables(database, null, null, new String[] {"TABLE"})) { if (StringUtils.isNotBlank(database)) {
sql.append(" and table_schema='" + database + "'");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and TABLE_NAME='" + tableName + "'");
}
Statement statement = connection.createStatement();
// statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -223,9 +235,9 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -223,9 +235,9 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameOld = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameOld)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameOld);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
......
...@@ -27,10 +27,13 @@ import org.apache.commons.lang3.StringUtils; ...@@ -27,10 +27,13 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.NonNull; import lombok.NonNull;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;
import static com.google.common.base.Preconditions.checkNotNull;
public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json"); List<String> typeList = Arrays.asList("varchar", "char", "json");
public static final Set<String> MYSQL_SYSTEM_DATABASES = public static final Set<String> MYSQL_SYSTEM_DATABASES =
...@@ -97,9 +100,9 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -97,9 +100,9 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
@SuppressWarnings("checkstyle:MagicNumber") @SuppressWarnings("checkstyle:MagicNumber")
protected boolean checkJdbcConnectivity(Map<String, String> requestParams) { protected boolean checkJdbcConnectivity(Map<String, String> requestParams) {
try (Connection connection = init(requestParams); try (Connection connection = init(requestParams);
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("SHOW MASTER STATUS"); ) { try (ResultSet resultSet = statement.executeQuery("SHOW MASTER STATUS");) {
if (resultSet.next()) { if (resultSet.next()) {
String binlogFile = resultSet.getString("File"); String binlogFile = resultSet.getString("File");
if (StringUtils.isBlank(binlogFile)) { if (StringUtils.isBlank(binlogFile)) {
...@@ -111,7 +114,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -111,7 +114,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
} }
try (ResultSet resultSet = try (ResultSet resultSet =
statement.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) { statement.executeQuery("SHOW VARIABLES LIKE 'binlog_format'")) {
if (resultSet.next()) { if (resultSet.next()) {
String binlogFormat = resultSet.getString("Value"); String binlogFormat = resultSet.getString("Value");
if (!"ROW".equalsIgnoreCase(binlogFormat)) { if (!"ROW".equalsIgnoreCase(binlogFormat)) {
...@@ -123,7 +126,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -123,7 +126,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
} }
try (ResultSet resultSet = try (ResultSet resultSet =
statement.executeQuery("SHOW VARIABLES LIKE 'binlog_row_image'")) { statement.executeQuery("SHOW VARIABLES LIKE 'binlog_row_image'")) {
if (resultSet.next()) { if (resultSet.next()) {
String binlogRowImage = resultSet.getString("Value"); String binlogRowImage = resultSet.getString("Value");
if (!"FULL".equalsIgnoreCase(binlogRowImage)) { if (!"FULL".equalsIgnoreCase(binlogRowImage)) {
...@@ -157,8 +160,8 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -157,8 +160,8 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
protected List<String> getDataBaseNames(Map<String, String> requestParams) throws SQLException { protected List<String> getDataBaseNames(Map<String, String> requestParams) throws SQLException {
List<String> dbNames = new ArrayList<>(); List<String> dbNames = new ArrayList<>();
try (Connection connection = init(requestParams); try (Connection connection = init(requestParams);
PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;"); PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
ResultSet re = statement.executeQuery()) { ResultSet re = statement.executeQuery()) {
// filter system databases // filter system databases
while (re.next()) { while (re.next()) {
String dbName = re.getString("database"); String dbName = re.getString("database");
...@@ -173,10 +176,10 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -173,10 +176,10 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
protected List<String> getTableNames(Map<String, String> requestParams, String dbName) { protected List<String> getTableNames(Map<String, String> requestParams, String dbName) {
List<String> tableNames = new ArrayList<>(); List<String> tableNames = new ArrayList<>();
try (Connection connection = init(requestParams); try (Connection connection = init(requestParams);
ResultSet resultSet = ResultSet resultSet =
connection connection
.getMetaData() .getMetaData()
.getTables(dbName, null, null, new String[] {"TABLE"})) { .getTables(dbName, null, null, new String[]{"TABLE"})) {
while (resultSet.next()) { while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME"); String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -192,7 +195,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -192,7 +195,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
protected List<TableField> getTableFields( protected List<TableField> getTableFields(
Map<String, String> requestParams, String dbName, String tableName) { Map<String, String> requestParams, String dbName, String tableName) {
List<TableField> tableFields = new ArrayList<>(); List<TableField> tableFields = new ArrayList<>();
try (Connection connection = init(requestParams); ) { try (Connection connection = init(requestParams);) {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName); String primaryKey = getPrimaryKey(metaData, dbName, tableName);
...@@ -265,13 +268,27 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -265,13 +268,27 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(pluginName, requestParams);
ResultSet resultSet = try (Connection connection =init(requestParams)) {
connection
.getMetaData() StringBuilder sql = new StringBuilder("SELECT * FROM information_schema.tables WHERE 1=1 ");
.getTables(database, null, null, new String[] {"TABLE"})) {
if (StringUtils.isNotBlank(database)) {
sql.append(" and table_schema='" + database + "'");
}
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and TABLE_NAME='" + tableName + "'");
}
Statement statement = connection.createStatement();
// statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -282,9 +299,9 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -282,9 +299,9 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
tables.put(metaData.getColumnName(i), resultSet.getObject(i)); tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
String tableName = resultSet.getString("TABLE_NAME"); String tableNameOld = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameOld)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameOld);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
...@@ -308,7 +325,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -308,7 +325,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options, Map<String, String> options,
String tableName) { String tableName) {
try (Connection connection = init(requestParams); ) { try (Connection connection = init(requestParams);) {
String sql = String sql =
"select * from information_schema.tables where table_name ='" "select * from information_schema.tables where table_name ='"
+ tableName + tableName
......
...@@ -11,6 +11,7 @@ public interface DataSourceChannelExt extends DataSourceChannel { ...@@ -11,6 +11,7 @@ public interface DataSourceChannelExt extends DataSourceChannel {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options); Map<String, String> options);
Map getTableName( Map getTableName(
......
...@@ -31,11 +31,7 @@ import lombok.NonNull; ...@@ -31,11 +31,7 @@ import lombok.NonNull;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Slf4j @Slf4j
public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
...@@ -232,14 +228,77 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -232,14 +228,77 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull String pluginName,
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
String tableName,
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(pluginName, requestParams)) { try (Connection connection = getConnection(pluginName, requestParams)) {
String sql =
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " ResultSet resultSet=null;
+ " sys.tables a left join sys.extended_properties on (a.object_id = g.major_id AND g.minor_id = 0)";
Statement statement = connection.createStatement(); if (StringUtils.isNotBlank(database)) {
ResultSet resultSet = statement.executeQuery(sql);
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and a.name='" + tableName + "'");
}
Statement statement = connection.createStatement();
statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet);
} else {
String sqlDatabases = "SELECT * FROM sys.sysdatabases";
Statement statement = connection.createStatement();
resultSet = statement.executeQuery(sqlDatabases.toString());
while (resultSet.next()) {
// ResultSetMetaData metaData = resultSet.getMetaData();
// // 获取表列数
// int columnCount = metaData.getColumnCount();
// for (int i = 1; i <= columnCount; i++) {
// tables.put(metaData.getColumnName(i), resultSet.getObject(i));
// }
String databaseName = resultSet.getString("name");
if (StringUtils.isNotBlank(databaseName) && !Arrays.asList("master","tempdb","model","msdb").contains(databaseName)) {
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) {
sql.append(" and a.name='" + tableName + "'");
}
statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet);
}
}
}
return tableNames;
} catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
}
}
private static void fillTableNames(List<Map> tableNames, ResultSet resultSet) throws SQLException {
if(Objects.nonNull(resultSet)) {
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -247,26 +306,25 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -247,26 +306,25 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
// 获取表列数 // 获取表列数
int columnCount = metaData.getColumnCount(); int columnCount = metaData.getColumnCount();
for (int i = 1; i <= columnCount; i++) { for (int i = 1; i <= columnCount; i++) {
tables.put(metaData.getColumnName(i), resultSet.getObject(i));
}
String tableName = resultSet.getString("TABLE_NAME"); String tableNameOld = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableNameOld)) {
tables.put("TABLE_NAME", tableName); tables.put("TABLE_NAME", tableNameOld);
} }
String tableComment = resultSet.getString("TABLE_COMMENT"); String tableComment = resultSet.getString("TABLE_COMMENT");
if (StringUtils.isNotBlank(tableComment)) { if (StringUtils.isNotBlank(tableComment)) {
tables.put("TABLE_COMMENT", tableComment); tables.put("TABLE_COMMENT", tableComment);
}
tables.put(metaData.getColumnName(i), resultSet.getObject(i));
} }
tableNames.add(tables);
} }
return tableNames;
} catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e);
} }
} }
@Override @Override
public Map getTableName( public Map getTableName(
@NonNull String pluginName, @NonNull String pluginName,
......
...@@ -23,7 +23,10 @@ ...@@ -23,7 +23,10 @@
</parent> </parent>
<artifactId>seatunnel-app</artifactId> <artifactId>seatunnel-app</artifactId>
<properties>
<!-- 全局版本控制,如果要修改版本号,修改此处即可-->
<revision>1.0.0-SNAPSHOT</revision>
</properties>
<dependencies> <dependencies>
<!-- https://mvnrepository.com/artifact/io.swagger.core.v3/swagger-annotations --> <!-- https://mvnrepository.com/artifact/io.swagger.core.v3/swagger-annotations -->
......
...@@ -25,7 +25,7 @@ spring: ...@@ -25,7 +25,7 @@ spring:
date-format: yyyy-MM-dd HH:mm:ss date-format: yyyy-MM-dd HH:mm:ss
datasource: datasource:
driver-class-name: com.mysql.cj.jdbc.Driver driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://192.168.1.124:51382/seatunnel?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true url: jdbc:mysql://192.168.1.124:51382/data-dev-platform?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true
username: root username: root
password: zysoft password: zysoft
mvc: mvc:
......
...@@ -21,4 +21,4 @@ hazelcast-client: ...@@ -21,4 +21,4 @@ hazelcast-client:
hazelcast.logging.type: log4j2 hazelcast.logging.type: log4j2
network: network:
cluster-members: cluster-members:
- 127.0.0.1:5801 - 192.168.1.174:5801
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论