Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
20e4b304
提交
20e4b304
authored
8月 21, 2023
作者:
宋勇
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
修改表字段内容
上级
d542c81e
隐藏空白字符变更
内嵌
并排
正在显示
4 个修改的文件
包含
75 行增加
和
51 行删除
+75
-51
MysqlJdbcDataSourceChannel.java
...asource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
+3
-3
OracleDataSourceChannel.java
...atasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+18
-18
TidbJdbcDataSourceChannel.java
...atasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
+30
-20
MysqlCDCDataSourceChannel.java
...atasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
+24
-10
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-mysql/src/main/java/org/apache/seatunnel/datasource/plugin/mysql/jdbc/MysqlJdbcDataSourceChannel.java
浏览文件 @
20e4b304
...
@@ -114,8 +114,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -114,8 +114,8 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
sql
=
String
sql
=
"SELECT * FROM information_schema.COLUMNS"
"SELECT * FROM information_schema.COLUMNS
"
+
"WHERE "
+
"
WHERE "
+
"TABLE_NAME = '"
+
"TABLE_NAME = '"
+
table
+
table
+
"' and TABLE_SCHEMA='"
+
"' and TABLE_SCHEMA='"
...
@@ -141,7 +141,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -141,7 +141,7 @@ public class MysqlJdbcDataSourceChannel implements DataSourceChannelExt {
}
}
tableField
.
setName
(
columnName
);
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"
REMARKS
"
));
tableField
.
setComment
(
resultSet
.
getString
(
"
COLUMN_COMMENT
"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
tableFields
.
add
(
tableField
);
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
浏览文件 @
20e4b304
...
@@ -112,27 +112,27 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
...
@@ -112,27 +112,27 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
sql
=
String
sql
=
"SELECT
\n
"
"SELECT
"
+
"
\t
t.TABLE_NAME,\n"
+
"
t.TABLE_NAME,\n"
+
"
\t
t.COLUMN_NAME,\n"
+
"
t.COLUMN_NAME,\n"
+
"
\t
t.COMMENTS,\n"
+
"
t.COMMENTS,\n"
+
"
\t
t1.DATA_TYPE,\n"
+
"
t1.DATA_TYPE,\n"
+
"
\t
t1.DATA_LENGTH VAR_LEN,\n"
+
"
t1.DATA_LENGTH VAR_LEN,\n"
+
"
\t
t1.DATA_PRECISION LEN,\n"
+
"
t1.DATA_PRECISION LEN,\n"
+
"
\t
DATA_SCALE ,\n"
+
"
DATA_SCALE ,\n"
+
"
\t
CASE NULLABLE\n"
+
"
CASE NULLABLE\n"
+
"
\t
WHEN 'N' THEN\n"
+
"
WHEN 'N' THEN\n"
+
"
\t\t
'true'\n"
+
"
'true'\n"
+
"
\t
ELSE\n"
+
"
ELSE\n"
+
"
\t\t
'false'\n"
+
"
'false'\n"
+
"END IS_NULLABLE\n"
+
"END IS_NULLABLE\n"
+
"\n"
+
"\n"
+
"FROM\n"
+
"FROM\n"
+
"
\t
all_col_comments t\n"
+
"
all_col_comments t\n"
+
"
\t
INNER JOIN all_tab_cols t1 ON t1.TABLE_NAME = t.table_name \n"
+
"
INNER JOIN all_tab_cols t1 ON t1.TABLE_NAME = t.table_name \n"
+
"
\t
AND t1.COLUMN_NAME = t.column_name \n"
+
"
AND t1.COLUMN_NAME = t.column_name \n"
+
"WHERE "
+
"
WHERE "
+
"
\t
t.table_name = '"
+
"
t.table_name = '"
+
table
+
table
+
"' and t.OWNER='"
+
"' and t.OWNER='"
+
database
+
database
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-tidb/src/main/java/org/apache/seatunnel/datasource/plugin/tidb/jdbc/TidbJdbcDataSourceChannel.java
浏览文件 @
20e4b304
...
@@ -112,28 +112,38 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
...
@@ -112,28 +112,38 @@ public class TidbJdbcDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
database
,
table
);
try
(
ResultSet
resultSet
=
metaData
.
getColumns
(
database
,
null
,
table
,
null
))
{
while
(
resultSet
.
next
())
{
String
sql
=
TableField
tableField
=
new
TableField
();
"SELECT * FROM information_schema.COLUMNS "
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
+
" WHERE "
tableField
.
setPrimaryKey
(
false
);
+
"TABLE_NAME = '"
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
+
table
tableField
.
setPrimaryKey
(
true
);
+
"' and TABLE_SCHEMA='"
}
+
database
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
+
"'"
;
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
Statement
statement
=
connection
.
createStatement
();
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
}
else
{
while
(
resultSet
.
next
())
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
TableField
tableField
=
new
TableField
();
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
}
tableField
.
setPrimaryKey
(
false
);
tableField
.
setName
(
columnName
);
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
tableField
.
setPrimaryKey
(
true
);
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setComment
(
resultSet
.
getString
(
"COLUMN_COMMENT"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableFields
.
add
(
tableField
);
}
}
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
}
catch
(
ClassNotFoundException
|
SQLException
e
)
{
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
throw
new
DataSourcePluginException
(
"get table fields failed"
,
e
);
}
}
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-mysql-cdc/src/main/java/org/apache/seatunnel/datasource/plugin/cdc/mysql/MysqlCDCDataSourceChannel.java
浏览文件 @
20e4b304
...
@@ -28,14 +28,10 @@ import com.google.common.collect.Sets;
...
@@ -28,14 +28,10 @@ import com.google.common.collect.Sets;
import
lombok.NonNull
;
import
lombok.NonNull
;
import
java.sql.*
;
import
java.sql.*
;
import
java.util.ArrayList
;
import
java.util.*
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Set
;
public
class
MysqlCDCDataSourceChannel
implements
DataSourceChannelExt
{
public
class
MysqlCDCDataSourceChannel
implements
DataSourceChannelExt
{
List
<
String
>
typeList
=
Arrays
.
asList
(
"varchar"
,
"char"
,
"json"
);
public
static
final
Set
<
String
>
MYSQL_SYSTEM_DATABASES
=
public
static
final
Set
<
String
>
MYSQL_SYSTEM_DATABASES
=
Sets
.
newHashSet
(
"information_schema"
,
"mysql"
,
"performance_schema"
,
"sys"
);
Sets
.
newHashSet
(
"information_schema"
,
"mysql"
,
"performance_schema"
,
"sys"
);
...
@@ -198,7 +194,17 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
...
@@ -198,7 +194,17 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
init
(
requestParams
);
)
{
try
(
Connection
connection
=
init
(
requestParams
);
)
{
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
DatabaseMetaData
metaData
=
connection
.
getMetaData
();
String
primaryKey
=
getPrimaryKey
(
metaData
,
dbName
,
tableName
);
String
primaryKey
=
getPrimaryKey
(
metaData
,
dbName
,
tableName
);
ResultSet
resultSet
=
metaData
.
getColumns
(
dbName
,
null
,
tableName
,
null
);
String
sql
=
"SELECT * FROM information_schema.COLUMNS "
+
" WHERE "
+
"TABLE_NAME = '"
+
tableName
+
"' and TABLE_SCHEMA='"
+
dbName
+
"'"
;
Statement
statement
=
connection
.
createStatement
();
ResultSet
resultSet
=
statement
.
executeQuery
(
sql
);
while
(
resultSet
.
next
())
{
while
(
resultSet
.
next
())
{
TableField
tableField
=
new
TableField
();
TableField
tableField
=
new
TableField
();
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
String
columnName
=
resultSet
.
getString
(
"COLUMN_NAME"
);
...
@@ -206,12 +212,20 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
...
@@ -206,12 +212,20 @@ public class MysqlCDCDataSourceChannel implements DataSourceChannelExt {
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
if
(
StringUtils
.
isNotBlank
(
primaryKey
)
&&
primaryKey
.
equals
(
columnName
))
{
tableField
.
setPrimaryKey
(
true
);
tableField
.
setPrimaryKey
(
true
);
}
}
tableField
.
setType
(
resultSet
.
getString
(
"DATA_TYPE"
));
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
if
(
tableField
.
getType
().
toLowerCase
().
indexOf
(
"text"
)
<
0
)
{
tableField
.
setLen
(
resultSet
.
getString
(
"CHARACTER_MAXIMUM_LENGTH"
));
}
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"NUMERIC_PRECISION"
));
tableField
.
setScale
(
resultSet
.
getString
(
"NUMERIC_SCALE"
));
}
tableField
.
setName
(
columnName
);
tableField
.
setName
(
columnName
);
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
boolean
isNullable
=
convertToBoolean
(
nullable
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
tableField
.
setNullable
(
isNullable
);
tableFields
.
add
(
tableField
);
tableFields
.
add
(
tableField
);
}
}
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论