Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
c76c04fe
提交
c76c04fe
authored
8月 19, 2023
作者:
hs364342311
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/master'
上级
142751c7
e7ecfaf9
隐藏空白字符变更
内嵌
并排
正在显示
6 个修改的文件
包含
128 行增加
和
42 行删除
+128
-42
MysqlJdbcDataSourceChannel.java
...asource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+27
-16
OracleDataSourceChannel.java
...atasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+38
-7
PostgresqlDataSourceChannel.java
...e/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+18
-11
SqlServerDataSourceChannel.java
...rce/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
+31
-2
TidbJdbcDataSourceChannel.java
...atasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
+9
-6
TableField.java
...che/seatunnel/datasource/plugin/api/model/TableField.java
+5
-0
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
浏览文件 @
c76c04fe
...
...
@@ -28,16 +28,14 @@ import org.apache.commons.lang3.StringUtils;
import
lombok.NonNull
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.*
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
public
class
MysqlJdbcDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
...
...
@@ -57,10 +55,10 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
Map
<
String
,
String
>
option
)
{
List
<
String
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
ResultSet
resultSet
=
connection
.
getMetaData
()
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
ResultSet
resultSet
=
connection
.
getMetaData
()
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
while
(
resultSet
.
next
())
{
String
tableName
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
...
...
@@ -78,8 +76,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
List
<
String
>
dbNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SHOW DATABASES;"
);
ResultSet
re
=
statement
.
executeQuery
())
{
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SHOW DATABASES;"
);
ResultSet
re
=
statement
.
executeQuery
())
{
// filter system databases
while
(
re
.
next
())
{
String
dbName
=
re
.
getString
(
"database"
);
...
...
@@ -114,6 +112,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
try
(
ResultSet
resultSet
=
metaData
.
getColumns
(
database
,
null
,
table
,
null
))
{
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
...
...
@@ -122,8 +121,16 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
...
...
@@ -187,11 +194,15 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
String
database
,
Map
<
String
,
String
>
options
)
{
List
<
Map
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
ResultSet
resultSet
=
connection
.
getMetaData
()
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
try
(
Connection
connection
=
getConnection
(
requestParams
))
{
String
sql
=
"SELECT * FROM information_schema.tables WHERE table_schema='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
Map
tables
=
new
HashMap
();
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
浏览文件 @
c76c04fe
...
...
@@ -28,15 +28,12 @@ import org.apache.commons.lang3.StringUtils;
import
lombok.NonNull
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.*
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
public
class
OracleDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"varchar2"
,
"char"
,
"json"
);
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
OracleDataSourceConfig
.
OPTION_RULE
;
...
...
@@ -112,7 +109,34 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
try
(
ResultSet
resultSet
=
metaData
.
getColumns
(
database
,
null
,
table
,
null
))
{
String
sql
=
"SELECT\n"
+
"\tt.TABLE_NAME,\n"
+
"\tt.COLUMN_NAME,\n"
+
"\tt.COMMENTS,\n"
+
"\tt1.DATA_TYPE,\n"
+
"\tt1.DATA_LENGTH VAR_LEN,\n"
+
"\tt1.DATA_PRECISION LEN,\n"
+
"\tDATA_SCALE ,\n"
+
"\tCASE NULLABLE\n"
+
"\tWHEN 'N' THEN\n"
+
"\t\t'true'\n"
+
"\tELSE\n"
+
"\t\t'false'\n"
+
"END IS_NULLABLE\n"
+
"\n"
+
"FROM\n"
+
"\tall_col_comments t\n"
+
"\tINNER JOIN all_tab_cols t1 ON t1.TABLE_NAME = t.table_name \n"
+
"\tAND t1.COLUMN_NAME = t.column_name \n"
+
"WHERE "
+
"\tt.table_name = '"
+
table
+
"' and t.OWNER='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
...
...
@@ -120,6 +144,13 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"VAR_LEN"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"LEN"
));
tableField
.
setScale
(
resultSet
.
getString
(
"DATA_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
...
...
@@ -127,7 +158,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
浏览文件 @
c76c04fe
...
...
@@ -28,15 +28,12 @@ import org.apache.commons.lang3.StringUtils;
import
lombok.NonNull
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.*
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
public
class
PostgresqlDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
PostgresqlDataSourceConfig
.
OPTION_RULE
;
...
...
@@ -129,8 +126,16 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
...
...
@@ -188,11 +193,13 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
String
database
,
Map
<
String
,
String
>
options
)
{
List
<
Map
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
ResultSet
resultSet
=
connection
.
getMetaData
()
.
getTables
(
database
,
null
,
null
,
new
String
[]
{
"TABLE"
}))
{
try
(
Connection
connection
=
getConnection
(
requestParams
))
{
String
sql
=
"SELECT table_schema, table_name TABLE_NAME, table_comment TABLE_COMMENT FROM information_schema.tables"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
Map
tables
=
new
HashMap
();
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-sqlserver/src/main/java/org/apache/seatunnel/datasource/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
浏览文件 @
c76c04fe
...
...
@@ -115,7 +115,34 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
getConnection
(
requestParams
,
null
))
{
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
try
(
ResultSet
resultSet
=
metaData
.
getColumns
(
database
,
null
,
table
,
null
))
{
String
sql
=
"SELECT\n"
+
"\tA.name AS TABLE_NAME,\n"
+
"\tf.value AS TABLE_COMMENT,\n"
+
"\tB.name AS COLUMN_NAME,\n"
+
"\tB.max_length AS COLUMN_LENGTH,\n"
+
"\tB.scale AS COLUMN_SCALE,\n"
+
"\tC.value AS REMARKS,\n"
+
"\tD.name AS TYPE_NAME,\n"
+
"CASE\n"
+
"\t\tA.type \n"
+
"\t\tWHEN 'u' THEN\n"
+
"\t\t'用户定义表' \n"
+
"\tEND AS Type,\n"
+
"\tB.is_nullable AS IS_NULLABLE \n"
+
"FROM\n"
+
"\tsys.tables AS A\n"
+
"\tINNER JOIN sys.columns AS B ON B.object_id = A.object_id\n"
+
"\tLEFT JOIN sys.extended_properties AS C ON C.major_id = B.object_id \n"
+
"\tAND C.minor_id = B.column_id\n"
+
"\tLEFT JOIN sys.types AS D ON D.system_type_id = B.system_type_id\n"
+
"\tleft JOIN sys.extended_properties AS f ON A.object_id = f.major_id \n"
+
"\tAND f.minor_id = 0 \n"
+
"WHERE\n"
+
"\tA.name = '"
+
table
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
...
...
@@ -123,6 +150,8 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
tableField
.
setLen
(
resultSet
.
getString
(
"COLUMN_LENGTH"
));
tableField
.
setScale
(
resultSet
.
getString
(
"COLUMN_SCALE"
));
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
...
...
@@ -130,7 +159,7 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-tidb/src/main/java/org/apache/seatunnel/datasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
浏览文件 @
c76c04fe
...
...
@@ -28,17 +28,14 @@ import org.apache.commons.lang3.StringUtils;
import
lombok.NonNull
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.*
;
import
java.util.function.Function
;
import
java.util.stream.Collectors
;
import
static
com
.
google
.
common
.
base
.
Preconditions
.
checkNotNull
;
public
class
TidbJdbcDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
@Override
public
OptionRule
getDataSourceOptions
(
@NonNull
String
pluginName
)
{
return
TidbDataSourceConfig
.
OPTION_RULE
;
...
...
@@ -122,8 +119,14 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
}
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/model/TableField.java
浏览文件 @
c76c04fe
...
...
@@ -41,4 +41,9 @@ public class TableField {
private
Boolean
unSupport
;
private
String
outputDataType
;
private
String
len
;
private
String
scale
;
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论