Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
d542c81e
提交
d542c81e
authored
8月 21, 2023
作者:
宋勇
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
修改表字段内容
上级
9150e0e4
隐藏空白字符变更
内嵌
并排
正在显示
7 个修改的文件
包含
189 行增加
和
171 行删除
+189
-171
MysqlJdbcDataSourceChannel.java
...asource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+47
-38
OracleDataSourceChannel.java
...atasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+56
-52
PostgresqlDataSourceChannel.java
...e/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+11
-10
SqlServerDataSourceChannel.java
...rce/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
+54
-51
TidbJdbcDataSourceChannel.java
...atasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
+11
-9
MysqlCDCDataSourceChannel.java
...atasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
+10
-9
TableField.java
...che/seatunnel/datasource/plugin/api/model/TableField.java
+0
-2
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -55,10 +55,10 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -55,10 +55,10 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
Map
<
String
,
String
>
option
)
{
Map
<
String
,
String
>
option
)
{
List
<
String
>
tableNames
=
new
ArrayList
<>();
List
<
String
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
try
(
Connection
connection
=
getConnection
(
requestParams
);
ResultSet
resultSet
=
ResultSet
resultSet
=
connection
connection
.
getMetaData
()
.
getMetaData
()
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
while
(
resultSet
.
next
())
{
while
(
resultSet
.
next
())
{
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
...
@@ -76,8 +76,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -76,8 +76,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
List
<
String
>
dbNames
=
new
ArrayList
<>();
List
<
String
>
dbNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
try
(
Connection
connection
=
getConnection
(
requestParams
);
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SHOW DATABASES;"
);
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SHOW DATABASES;"
);
ResultSet
re
=
statement
.
executeQuery
())
{
ResultSet
re
=
statement
.
executeQuery
())
{
// filter system databases
// filter system databases
while
(
re
.
next
())
{
while
(
re
.
next
())
{
String
dbName
=
re
.
getString
(
"database"
);
String
dbName
=
re
.
getString
(
"database"
);
...
@@ -113,30 +113,40 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -113,30 +113,40 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
try
(
ResultSet
resultSet
=
metaData
.
getColumns
(
database
,
null
,
table
,
null
))
{
String
sql
=
while
(
resultSet
.
next
())
{
"SELECT * FROM information_schema.COLUMNS"
TableField
tableField
=
new
TableField
();
+
"WHERE "
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
+
"TABLE_NAME = '"
tableField
.
setPrimaryKey
(
false
);
+
table
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
+
"' and TABLE_SCHEMA='"
tableField
.
setPrimaryKey
(
true
);
+
database
}
+
"'"
;
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
Statement
statement
=
connection
.
createStatement
();
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
tableField
.
setPrimaryKey
(
false
);
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
tableField
.
setType
(
resultSet
.
getString
(
"DATA_TYPE"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
if
(
tableField
.
getType
().
toLowerCase
().
indexOf
(
"text"
)
<
0
)
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
}
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setName
(
columnName
);
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
}
...
@@ -196,13 +206,11 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -196,13 +206,11 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
List
<
Map
>
tableNames
=
new
ArrayList
<>();
List
<
Map
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
))
{
try
(
Connection
connection
=
getConnection
(
requestParams
))
{
String
sql
=
String
sql
=
"SELECT * FROM information_schema.tables WHERE table_schema='"
+
database
+
"'"
;
"SELECT * FROM information_schema.tables WHERE table_schema='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
while
(
resultSet
.
next
())
{
Map
tables
=
new
HashMap
();
Map
tables
=
new
HashMap
();
...
@@ -210,19 +218,20 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -210,19 +218,20 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -33,7 +33,8 @@ import java.util.*;
...
@@ -33,7 +33,8 @@ import java.util.*;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
public
class
OracleDataSourceChannel
implements
DataSourceChannelExt
{
public
class
OracleDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"varchar2"
,
"char"
,
"json"
);
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"varchar2"
,
"char"
,
"json"
);
@Override
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
OracleDataSourceConfig
.
OPTION_RULE
;
return
OracleDataSourceConfig
.
OPTION_RULE
;
...
@@ -110,54 +111,56 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
...
@@ -110,54 +111,56 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
sql
=
String
sql
=
"SELECT\n"
+
"SELECT\n"
"\tt.TABLE_NAME,\n"
+
+
"\tt.TABLE_NAME,\n"
"\tt.COLUMN_NAME,\n"
+
+
"\tt.COLUMN_NAME,\n"
"\tt.COMMENTS,\n"
+
+
"\tt.COMMENTS,\n"
"\tt1.DATA_TYPE,\n"
+
+
"\tt1.DATA_TYPE,\n"
"\tt1.DATA_LENGTH VAR_LEN,\n"
+
+
"\tt1.DATA_LENGTH VAR_LEN,\n"
"\tt1.DATA_PRECISION LEN,\n"
+
+
"\tt1.DATA_PRECISION LEN,\n"
"\tDATA_SCALE ,\n"
+
+
"\tDATA_SCALE ,\n"
"\tCASE NULLABLE\n"
+
+
"\tCASE NULLABLE\n"
"\tWHEN 'N' THEN\n"
+
+
"\tWHEN 'N' THEN\n"
"\t\t'true'\n"
+
+
"\t\t'true'\n"
"\tELSE\n"
+
+
"\tELSE\n"
"\t\t'false'\n"
+
+
"\t\t'false'\n"
"END IS_NULLABLE\n"
+
+
"END IS_NULLABLE\n"
"\n"
+
+
"\n"
"FROM\n"
+
+
"FROM\n"
"\tall_col_comments t\n"
+
+
"\tall_col_comments t\n"
"\tINNER JOIN all_tab_cols t1 ON t1.TABLE_NAME = t.table_name \n"
+
+
"\tINNER JOIN all_tab_cols t1 ON t1.TABLE_NAME = t.table_name \n"
"\tAND t1.COLUMN_NAME = t.column_name \n"
+
+
"\tAND t1.COLUMN_NAME = t.column_name \n"
"WHERE "
+
+
"WHERE "
"\tt.table_name = '"
+
table
+
"' and t.OWNER='"
+
database
+
"'"
;
+
"\tt.table_name = '"
+
table
+
"' and t.OWNER='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
tableField
.
setPrimaryKey
(
false
);
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
while
(
resultSet
.
next
())
{
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
TableField
tableField
=
new
TableField
();
tableField
.
setLen
(
resultSet
.
getString
(
"VAR_LEN"
));
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
}
else
{
tableField
.
setPrimaryKey
(
false
);
tableField
.
setLen
(
resultSet
.
getString
(
"LEN"
));
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setScale
(
resultSet
.
getString
(
"DATA_SCALE"
));
tableField
.
setPrimaryKey
(
true
);
}
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"VAR_LEN"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"LEN"
));
tableField
.
setScale
(
resultSet
.
getString
(
"DATA_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
...
@@ -222,19 +225,20 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
...
@@ -222,19 +225,20 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -34,6 +34,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
...
@@ -34,6 +34,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public
class
PostgresqlDataSourceChannel
implements
DataSourceChannelExt
{
public
class
PostgresqlDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
@Override
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
PostgresqlDataSourceConfig
.
OPTION_RULE
;
return
PostgresqlDataSourceConfig
.
OPTION_RULE
;
...
@@ -199,7 +200,6 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
...
@@ -199,7 +200,6 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
Statement
statement
=
connection
.
createStatement
();
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
while
(
resultSet
.
next
())
{
Map
tables
=
new
HashMap
();
Map
tables
=
new
HashMap
();
...
@@ -207,19 +207,20 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
...
@@ -207,19 +207,20 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-sqlserver/src/main/java/org/apache/seatunnel/datasource/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -116,49 +116,51 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
...
@@ -116,49 +116,51 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
sql
=
String
sql
=
"SELECT\n"
+
"SELECT\n"
"\tA.name AS TABLE_NAME,\n"
+
+
"\tA.name AS TABLE_NAME,\n"
"\tf.value AS TABLE_COMMENT,\n"
+
+
"\tf.value AS TABLE_COMMENT,\n"
"\tB.name AS COLUMN_NAME,\n"
+
+
"\tB.name AS COLUMN_NAME,\n"
"\tB.max_length AS COLUMN_LENGTH,\n"
+
+
"\tB.max_length AS COLUMN_LENGTH,\n"
"\tB.scale AS COLUMN_SCALE,\n"
+
+
"\tB.scale AS COLUMN_SCALE,\n"
"\tC.value AS REMARKS,\n"
+
+
"\tC.value AS REMARKS,\n"
"\tD.name AS TYPE_NAME,\n"
+
+
"\tD.name AS TYPE_NAME,\n"
"CASE\n"
+
+
"CASE\n"
"\t\tA.type \n"
+
+
"\t\tA.type \n"
"\t\tWHEN 'u' THEN\n"
+
+
"\t\tWHEN 'u' THEN\n"
"\t\t'用户定义表' \n"
+
+
"\t\t'用户定义表' \n"
"\tEND AS Type,\n"
+
+
"\tEND AS Type,\n"
"\tB.is_nullable AS IS_NULLABLE \n"
+
+
"\tB.is_nullable AS IS_NULLABLE \n"
"FROM\n"
+
+
"FROM\n"
"\tsys.tables AS A\n"
+
+
"\tsys.tables AS A\n"
"\tINNER JOIN sys.columns AS B ON B.object_id = A.object_id\n"
+
+
"\tINNER JOIN sys.columns AS B ON B.object_id = A.object_id\n"
"\tLEFT JOIN sys.extended_properties AS C ON C.major_id = B.object_id \n"
+
+
"\tLEFT JOIN sys.extended_properties AS C ON C.major_id = B.object_id \n"
"\tAND C.minor_id = B.column_id\n"
+
+
"\tAND C.minor_id = B.column_id\n"
"\tLEFT JOIN sys.types AS D ON D.system_type_id = B.system_type_id\n"
+
+
"\tLEFT JOIN sys.types AS D ON D.system_type_id = B.system_type_id\n"
"\tleft JOIN sys.extended_properties AS f ON A.object_id = f.major_id \n"
+
+
"\tleft JOIN sys.extended_properties AS f ON A.object_id = f.major_id \n"
"\tAND f.minor_id = 0 \n"
+
+
"\tAND f.minor_id = 0 \n"
"WHERE\n"
+
+
"WHERE\n"
"\tA.name = '"
+
table
+
"'"
;
+
"\tA.name = '"
Statement
statement
=
connection
.
createStatement
();
+
table
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
+
"'"
;
while
(
resultSet
.
next
())
{
Statement
statement
=
connection
.
createStatement
();
TableField
tableField
=
new
TableField
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
while
(
resultSet
.
next
())
{
tableField
.
setPrimaryKey
(
false
);
TableField
tableField
=
new
TableField
();
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
tableField
.
setPrimaryKey
(
true
);
tableField
.
setPrimaryKey
(
false
);
}
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setLen
(
resultSet
.
getString
(
"COLUMN_LENGTH"
));
tableField
.
setPrimaryKey
(
true
);
tableField
.
setScale
(
resultSet
.
getString
(
"COLUMN_SCALE"
));
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
tableField
.
setLen
(
resultSet
.
getString
(
"COLUMN_LENGTH"
));
tableField
.
setScale
(
resultSet
.
getString
(
"COLUMN_SCALE"
));
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
...
@@ -224,19 +226,20 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
...
@@ -224,19 +226,20 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-tidb/src/main/java/org/apache/seatunnel/datasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
...
@@ -36,6 +36,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
public
class
TidbJdbcDataSourceChannel
implements
DataSourceChannelExt
{
public
class
TidbJdbcDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
@Override
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
TidbDataSourceConfig
.
OPTION_RULE
;
return
TidbDataSourceConfig
.
OPTION_RULE
;
...
@@ -202,19 +203,20 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -202,19 +203,20 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
浏览文件 @
d542c81e
...
@@ -263,19 +263,20 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
...
@@ -263,19 +263,20 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
// 获取表列数
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
tables
.
put
(
"TABLE_NAME"
,
tableName
);
tables
.
put
(
"TABLE_NAME"
,
tableName
);
}
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
}
tableNames
.
add
(
tables
);
}
}
return
tableNames
;
return
tableNames
;
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/model/TableField.java
浏览文件 @
d542c81e
...
@@ -42,9 +42,7 @@ public class TableField {
...
@@ -42,9 +42,7 @@ public class TableField {
private
String
outputDataType
;
private
String
outputDataType
;
private
String
len
;
private
String
len
;
private
String
scale
;
private
String
scale
;
}
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论