提交 3474b595 authored 作者: 宋勇's avatar 宋勇

添加tablename查询

上级 12c34977
...@@ -213,7 +213,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -213,7 +213,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) { try (Connection connection = getConnection(requestParams)) {
StringBuilder sql =new StringBuilder( "SELECT * FROM information_schema.tables WHERE 1=1 "); StringBuilder sql =
new StringBuilder("SELECT * FROM information_schema.tables WHERE 1=1 ");
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
...@@ -225,7 +226,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -225,7 +226,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
sql.append(" and TABLE_NAME='" + tableName + "'"); sql.append(" and TABLE_NAME='" + tableName + "'");
} }
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
// statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString()); ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
......
...@@ -57,7 +57,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt { ...@@ -57,7 +57,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
ResultSet resultSet = ResultSet resultSet =
connection connection
.getMetaData() .getMetaData()
.getTables(database, null, null, new String[]{"TABLE"});) { .getTables(database, null, null, new String[] {"TABLE"}); ) {
while (resultSet.next()) { while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME"); String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -236,7 +236,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt { ...@@ -236,7 +236,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
} }
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
// statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString()); ResultSet resultSet = statement.executeQuery(sql.toString());
ResultSetMetaData metaData = resultSet.getMetaData(); ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) { while (resultSet.next()) {
......
...@@ -112,7 +112,7 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -112,7 +112,7 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
@NonNull String database, @NonNull String database,
@NonNull String table) { @NonNull String table) {
List<TableField> tableFields = new ArrayList<>(); List<TableField> tableFields = new ArrayList<>();
try (Connection connection = getConnection(requestParams, database);) { try (Connection connection = getConnection(requestParams, database); ) {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, database, table); String primaryKey = getPrimaryKey(metaData, database, table);
String[] split = table.split("\\."); String[] split = table.split("\\.");
...@@ -203,7 +203,9 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt { ...@@ -203,7 +203,9 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) { try (Connection connection = getConnection(requestParams)) {
StringBuilder sql = new StringBuilder("SELECT table_schema, table_name TABLE_NAME, table_comment TABLE_COMMENT FROM information_schema.tables where 1=1 "); StringBuilder sql =
new StringBuilder(
"SELECT table_schema, table_name TABLE_NAME, table_comment TABLE_COMMENT FROM information_schema.tables where 1=1 ");
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
......
...@@ -228,12 +228,13 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -228,12 +228,13 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(pluginName, requestParams)) { try (Connection connection = getConnection(pluginName, requestParams)) {
ResultSet resultSet=null; ResultSet resultSet = null;
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
StringBuilder sql =
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " new StringBuilder(
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 "); + " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -251,17 +252,21 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -251,17 +252,21 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
resultSet = statement.executeQuery(sqlDatabases.toString()); resultSet = statement.executeQuery(sqlDatabases.toString());
while (resultSet.next()) { while (resultSet.next()) {
// ResultSetMetaData metaData = resultSet.getMetaData();
// ResultSetMetaData metaData = resultSet.getMetaData(); // // 获取表列数
// // 获取表列数 // int columnCount = metaData.getColumnCount();
// int columnCount = metaData.getColumnCount(); // for (int i = 1; i <= columnCount; i++) {
// for (int i = 1; i <= columnCount; i++) { // tables.put(metaData.getColumnName(i),
// tables.put(metaData.getColumnName(i), resultSet.getObject(i)); // resultSet.getObject(i));
// } // }
String databaseName = resultSet.getString("name"); String databaseName = resultSet.getString("name");
if (StringUtils.isNotBlank(databaseName) && !Arrays.asList("master","tempdb","model","msdb").contains(databaseName)) { if (StringUtils.isNotBlank(databaseName)
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " && !Arrays.asList("master", "tempdb", "model", "msdb")
.contains(databaseName)) {
StringBuilder sql =
new StringBuilder(
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 "); + " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -271,29 +276,19 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt { ...@@ -271,29 +276,19 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
statement.execute("use " + database); statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString()); resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet); fillTableNames(tableNames, resultSet);
} }
} }
} }
return tableNames; return tableNames;
} catch (SQLException e) { } catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e); throw new DataSourcePluginException("get table names failed", e);
} }
} }
private static void fillTableNames(List<Map> tableNames, ResultSet resultSet) throws SQLException { private static void fillTableNames(List<Map> tableNames, ResultSet resultSet)
if(Objects.nonNull(resultSet)) { throws SQLException {
if (Objects.nonNull(resultSet)) {
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
......
...@@ -211,7 +211,8 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -211,7 +211,8 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams)) { try (Connection connection = getConnection(requestParams)) {
StringBuilder sql =new StringBuilder( "SELECT * FROM information_schema.tables WHERE 1=1 "); StringBuilder sql =
new StringBuilder("SELECT * FROM information_schema.tables WHERE 1=1 ");
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
...@@ -223,7 +224,7 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt { ...@@ -223,7 +224,7 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
sql.append(" and TABLE_NAME='" + tableName + "'"); sql.append(" and TABLE_NAME='" + tableName + "'");
} }
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
// statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString()); ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
......
...@@ -27,13 +27,10 @@ import org.apache.commons.lang3.StringUtils; ...@@ -27,13 +27,10 @@ import org.apache.commons.lang3.StringUtils;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import lombok.NonNull; import lombok.NonNull;
import org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils;
import java.sql.*; import java.sql.*;
import java.util.*; import java.util.*;
import static com.google.common.base.Preconditions.checkNotNull;
public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
List<String> typeList = Arrays.asList("varchar", "char", "json"); List<String> typeList = Arrays.asList("varchar", "char", "json");
public static final Set<String> MYSQL_SYSTEM_DATABASES = public static final Set<String> MYSQL_SYSTEM_DATABASES =
...@@ -102,7 +99,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -102,7 +99,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
try (Connection connection = init(requestParams); try (Connection connection = init(requestParams);
Statement statement = connection.createStatement()) { Statement statement = connection.createStatement()) {
try (ResultSet resultSet = statement.executeQuery("SHOW MASTER STATUS");) { try (ResultSet resultSet = statement.executeQuery("SHOW MASTER STATUS"); ) {
if (resultSet.next()) { if (resultSet.next()) {
String binlogFile = resultSet.getString("File"); String binlogFile = resultSet.getString("File");
if (StringUtils.isBlank(binlogFile)) { if (StringUtils.isBlank(binlogFile)) {
...@@ -179,7 +176,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -179,7 +176,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
ResultSet resultSet = ResultSet resultSet =
connection connection
.getMetaData() .getMetaData()
.getTables(dbName, null, null, new String[]{"TABLE"})) { .getTables(dbName, null, null, new String[] {"TABLE"})) {
while (resultSet.next()) { while (resultSet.next()) {
String tableName = resultSet.getString("TABLE_NAME"); String tableName = resultSet.getString("TABLE_NAME");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -195,7 +192,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -195,7 +192,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
protected List<TableField> getTableFields( protected List<TableField> getTableFields(
Map<String, String> requestParams, String dbName, String tableName) { Map<String, String> requestParams, String dbName, String tableName) {
List<TableField> tableFields = new ArrayList<>(); List<TableField> tableFields = new ArrayList<>();
try (Connection connection = init(requestParams);) { try (Connection connection = init(requestParams); ) {
DatabaseMetaData metaData = connection.getMetaData(); DatabaseMetaData metaData = connection.getMetaData();
String primaryKey = getPrimaryKey(metaData, dbName, tableName); String primaryKey = getPrimaryKey(metaData, dbName, tableName);
...@@ -272,9 +269,10 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -272,9 +269,10 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options) { Map<String, String> options) {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection =init(requestParams)) { try (Connection connection = init(requestParams)) {
StringBuilder sql = new StringBuilder("SELECT * FROM information_schema.tables WHERE 1=1 "); StringBuilder sql =
new StringBuilder("SELECT * FROM information_schema.tables WHERE 1=1 ");
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
...@@ -286,7 +284,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -286,7 +284,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
sql.append(" and TABLE_NAME='" + tableName + "'"); sql.append(" and TABLE_NAME='" + tableName + "'");
} }
Statement statement = connection.createStatement(); Statement statement = connection.createStatement();
// statement.execute("use " + database); // statement.execute("use " + database);
ResultSet resultSet = statement.executeQuery(sql.toString()); ResultSet resultSet = statement.executeQuery(sql.toString());
while (resultSet.next()) { while (resultSet.next()) {
...@@ -325,7 +323,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -325,7 +323,7 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
Map<String, String> options, Map<String, String> options,
String tableName) { String tableName) {
try (Connection connection = init(requestParams);) { try (Connection connection = init(requestParams); ) {
String sql = String sql =
"select * from information_schema.tables where table_name ='" "select * from information_schema.tables where table_name ='"
+ tableName + tableName
......
...@@ -233,12 +233,13 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -233,12 +233,13 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
List<Map> tableNames = new ArrayList<>(); List<Map> tableNames = new ArrayList<>();
try (Connection connection = getConnection(pluginName, requestParams)) { try (Connection connection = getConnection(pluginName, requestParams)) {
ResultSet resultSet=null; ResultSet resultSet = null;
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
StringBuilder sql =
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " new StringBuilder(
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 "); + " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -256,17 +257,21 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -256,17 +257,21 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
resultSet = statement.executeQuery(sqlDatabases.toString()); resultSet = statement.executeQuery(sqlDatabases.toString());
while (resultSet.next()) { while (resultSet.next()) {
// ResultSetMetaData metaData = resultSet.getMetaData();
// ResultSetMetaData metaData = resultSet.getMetaData(); // // 获取表列数
// // 获取表列数 // int columnCount = metaData.getColumnCount();
// int columnCount = metaData.getColumnCount(); // for (int i = 1; i <= columnCount; i++) {
// for (int i = 1; i <= columnCount; i++) { // tables.put(metaData.getColumnName(i),
// tables.put(metaData.getColumnName(i), resultSet.getObject(i)); // resultSet.getObject(i));
// } // }
String databaseName = resultSet.getString("name"); String databaseName = resultSet.getString("name");
if (StringUtils.isNotBlank(databaseName) && !Arrays.asList("master","tempdb","model","msdb").contains(databaseName)) { if (StringUtils.isNotBlank(databaseName)
StringBuilder sql =new StringBuilder( "select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from " && !Arrays.asList("master", "tempdb", "model", "msdb")
.contains(databaseName)) {
StringBuilder sql =
new StringBuilder(
"select ROW_NUMBER() OVER (ORDER BY a.name) AS No, a.name TABLE_NAME ,g.class_desc,g.major_id,g.class,g.value TABLE_COMMENT from "
+ " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 "); + " sys.tables a left join sys.extended_properties g on (a.object_id = g.major_id AND g.minor_id = 0) where 1=1 ");
if (StringUtils.isNotBlank(tableName)) { if (StringUtils.isNotBlank(tableName)) {
...@@ -276,29 +281,19 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -276,29 +281,19 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
statement.execute("use " + database); statement.execute("use " + database);
resultSet = statement.executeQuery(sql.toString()); resultSet = statement.executeQuery(sql.toString());
fillTableNames(tableNames, resultSet); fillTableNames(tableNames, resultSet);
} }
} }
} }
return tableNames; return tableNames;
} catch (SQLException e) { } catch (SQLException e) {
throw new DataSourcePluginException("get table names failed", e); throw new DataSourcePluginException("get table names failed", e);
} }
} }
private static void fillTableNames(List<Map> tableNames, ResultSet resultSet) throws SQLException { private static void fillTableNames(List<Map> tableNames, ResultSet resultSet)
if(Objects.nonNull(resultSet)) { throws SQLException {
if (Objects.nonNull(resultSet)) {
while (resultSet.next()) { while (resultSet.next()) {
Map tables = new HashMap(); Map tables = new HashMap();
...@@ -324,7 +319,6 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt { ...@@ -324,7 +319,6 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
} }
} }
@Override @Override
public Map getTableName( public Map getTableName(
@NonNull String pluginName, @NonNull String pluginName,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论