Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
4f1e6809
提交
4f1e6809
authored
9月 04, 2023
作者:
宋勇
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
添加表信息
上级
858e3ad7
隐藏空白字符变更
内嵌
并排
正在显示
8 个修改的文件
包含
360 行增加
和
0 行删除
+360
-0
MysqlJdbcDataSourceChannel.java
...asource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+51
-0
OracleDataSourceChannel.java
...atasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+51
-0
PostgresqlDataSourceChannel.java
...e/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
+51
-0
SqlServerDataSourceChannel.java
...rce/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
+51
-0
TidbJdbcDataSourceChannel.java
...atasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
+51
-0
MysqlCDCDataSourceChannel.java
...atasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
+49
-0
DataSourceChannelExt.java
...seatunnel/datasource/plugin/api/DataSourceChannelExt.java
+7
-0
SqlServerCDCDataSourceChannel.java
...e/plugin/cdc/sqlserver/SqlServerCDCDataSourceChannel.java
+49
-0
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -23,6 +23,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
lombok.NonNull
;
...
...
@@ -242,4 +243,54 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"select * from information_schema.tables where table_name ='"
+
tableName
+
"' and table_schema ='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -23,6 +23,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
lombok.NonNull
;
...
...
@@ -250,4 +251,54 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"sselect * from all_tables where table_name='"
+
tableName
+
"' and OWNER ='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-postgresql/src/main/java/org/apache/seatunnel/datasource/plugin/postgresql/jdbc/PostgresqlDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -23,6 +23,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
lombok.NonNull
;
...
...
@@ -232,4 +233,54 @@ public class PostgresqlDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"select * from information_schema.tables where table_name ='"
+
tableName
+
"' and table_schema ='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-sqlserver/src/main/java/org/apache/seatunnel/datasource/plugin/sqlserver/jdbc/SqlServerDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -23,6 +23,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
lombok.NonNull
;
...
...
@@ -253,4 +254,54 @@ public class SqlServerDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"SELECT * FROM sys.all_objects WHERE object_id = OBJECT_ID(N'["
+
database
+
"].[dbo].["
+
tableName
+
"]') AND type IN ('U')"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-tidb/src/main/java/org/apache/seatunnel/datasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -23,6 +23,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
lombok.NonNull
;
...
...
@@ -238,4 +239,54 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"select * from information_schema.tables where table_name ='"
+
tableName
+
"' and table_schema ='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
catch
(
ClassNotFoundException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -22,6 +22,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import
org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException
;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
com.google.common.collect.Sets
;
...
...
@@ -297,4 +298,52 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
init
(
requestParams
);
)
{
String
sql
=
"select * from information_schema.tables where table_name ='"
+
tableName
+
"' and table_schema ='"
+
database
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-plugins-api/src/main/java/org/apache/seatunnel/datasource/plugin/api/DataSourceChannelExt.java
浏览文件 @
4f1e6809
...
...
@@ -12,4 +12,11 @@ public interface DataSourceChannelExt extends DataSourceChannel {
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
);
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
);
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-sqlserver-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/sqlserver/SqlServerCDCDataSourceChannel.java
浏览文件 @
4f1e6809
...
...
@@ -22,6 +22,7 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt;
import
org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException
;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
import
org.apache.commons.lang3.tuple.Pair
;
...
...
@@ -265,4 +266,52 @@ public class SqlServerCDCDataSourceChannel implements DataSourceChannelExt {
throw
new
DataSourcePluginException
(
"get table names failed"
,
e
);
}
}
@Override
public
Map
getTableName
(
@NonNull
String
pluginName
,
Map
<
String
,
String
>
requestParams
,
String
database
,
Map
<
String
,
String
>
options
,
String
tableName
)
{
try
(
Connection
connection
=
getConnection
(
pluginName
,
requestParams
))
{
String
sql
=
"SELECT * FROM sys.all_objects WHERE object_id = OBJECT_ID(N'["
+
database
+
"].[dbo].["
+
tableName
+
"]') AND type IN ('U')"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
ResultSetMetaData
metaData
=
resultSet
.
getMetaData
();
// 字段的个数
int
count
=
metaData
.
getColumnCount
();
// 创建List集合
List
<
Map
>
list
=
new
ArrayList
<>();
// 处理结果集
while
(
resultSet
.
next
())
{
// 创建Map集合
Map
map
=
new
HashMap
<>();
// 根据字段的个数, 循环
for
(
int
i
=
0
;
i
<
count
;
i
++)
{
// 获取字段的名字
String
columnName
=
metaData
.
getColumnName
(
i
+
1
);
// 获取字段对应的值
Object
object
=
resultSet
.
getObject
(
columnName
);
// 将字段名和字段值, 存入map集合中
map
.
put
(
columnName
,
object
);
}
// 将map集合添加到List集合中
list
.
add
(
map
);
}
if
(
CollectionUtils
.
isNotEmpty
(
list
))
{
return
list
.
get
(
0
);
}
}
catch
(
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
return
null
;
}
}
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论