Skip to content
项目
群组
代码片段
帮助
当前项目
正在载入...
登录 / 注册
切换导航面板
S
seatunnel-web
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
图表
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
日程
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
图像
聊天
创建新问题
作业
提交
问题看板
Open sidebar
宋勇
seatunnel-web
Commits
51e95d4a
提交
51e95d4a
authored
4月 21, 2024
作者:
宋勇
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/master'
上级
d2d56984
cfdde250
隐藏空白字符变更
内嵌
并排
正在显示
8 个修改的文件
包含
278 行增加
和
160 行删除
+278
-160
pom.xml
.../seatunnel-datasource-plugins/datasource-influxdb/pom.xml
+5
-1
InfluxdbDataSourceChannel.java
...datasource/plugin/influxdb/InfluxdbDataSourceChannel.java
+95
-25
InfluxdbOptionRule.java
...tunnel/datasource/plugin/influxdb/InfluxdbOptionRule.java
+14
-2
InfluxdbsClient.java
...el/datasource/plugin/influxdb/client/InfluxdbsClient.java
+98
-90
TableName.java
...eatunnel/datasource/plugin/influxdb/client/TableName.java
+4
-7
pom.xml
...atunnel-datasource-plugins/datasource-jdbc-oracle/pom.xml
+12
-7
OracleDataSourceChannel.java
...atasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
+47
-26
HadoopS3AConfigurationTest.java
...nnel/datasource/plugin/s3/HadoopS3AConfigurationTest.java
+3
-2
没有找到文件。
seatunnel-datasource/seatunnel-datasource-plugins/datasource-influxdb/pom.xml
浏览文件 @
51e95d4a
...
...
@@ -50,12 +50,16 @@
<scope>
provided
</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>
com.influxdb
</groupId>
<artifactId>
influxdb-client-java
</artifactId>
<version>
3.1.0
</version>
</dependency>
<dependency>
<groupId>
org.influxdb
</groupId>
<artifactId>
influxdb-java
</artifactId>
<version>
2.23
</version>
</dependency>
<dependency>
<groupId>
com.alibaba
</groupId>
<artifactId>
fastjson
</artifactId>
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-influxdb/src/main/java/org/apache/seatunnel/datasource/plugin/influxdb/InfluxdbDataSourceChannel.java
浏览文件 @
51e95d4a
...
...
@@ -17,7 +17,6 @@
package
org
.
apache
.
seatunnel
.
datasource
.
plugin
.
influxdb
;
import
org.apache.seatunnel.shade.com.typesafe.config.Config
;
import
org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
;
import
org.apache.seatunnel.api.configuration.util.OptionRule
;
...
...
@@ -28,10 +27,14 @@ import org.apache.seatunnel.datasource.plugin.influxdb.client.InfluxdbsClient;
import
org.apache.commons.lang3.StringUtils
;
import
org.influxdb.InfluxDB
;
import
org.influxdb.dto.Query
;
import
org.influxdb.dto.QueryResult
;
import
lombok.NonNull
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.
Linked
HashMap
;
import
java.util.List
;
import
java.util.Map
;
...
...
@@ -69,10 +72,20 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
String
database
,
Map
<
String
,
String
>
option
)
{
try
(
InfluxdbsClient
client
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
)))
{
return
client
.
getTables
(
ConfigFactory
.
parseMap
(
requestParams
));
try
{
InfluxDB
instance
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
));
instance
.
setDatabase
(
database
);
QueryResult
showMeasurements
=
instance
.
query
(
new
Query
(
"show measurements"
));
List
<
QueryResult
.
Result
>
results
=
showMeasurements
.
getResults
();
List
<
String
>
table
=
new
ArrayList
<>();
for
(
QueryResult
.
Result
result
:
results
)
{
List
<
List
<
Object
>>
values
=
result
.
getSeries
().
get
(
0
).
getValues
();
for
(
List
<
Object
>
value
:
values
)
{
table
.
add
(
String
.
valueOf
(
value
.
get
(
0
)));
}
}
return
table
;
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
...
...
@@ -81,16 +94,42 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
@Override
public
List
<
String
>
getDatabases
(
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
return
DEFAULT_DATABASES
;
InfluxDB
instance
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
));
boolean
isConnected
=
false
;
QueryResult
showDatabases
=
new
QueryResult
();
try
{
// 尝试执行查询,如果成功,则连接成功
showDatabases
=
instance
.
query
(
new
Query
(
"SHOW DATABASES"
));
isConnected
=
true
;
}
catch
(
Exception
e
)
{
// 如果有异常,则连接失败
e
.
printStackTrace
();
}
List
<
QueryResult
.
Result
>
results
=
showDatabases
.
getResults
();
List
<
String
>
database
=
new
ArrayList
<>();
for
(
QueryResult
.
Result
result
:
results
)
{
String
name
=
result
.
getSeries
().
get
(
0
).
getName
();
database
.
add
(
name
);
}
return
database
;
}
@Override
public
boolean
checkDataSourceConnectivity
(
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
try
(
InfluxdbsClient
client
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
)))
{
return
true
;
try
{
InfluxDB
instance
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
));
try
{
// 尝试执行查询,如果成功,则连接成功
instance
.
query
(
new
Query
(
"SHOW DATABASES"
));
return
true
;
}
catch
(
Exception
e
)
{
// 如果有异常,则连接失败
e
.
printStackTrace
();
}
return
false
;
}
catch
(
Throwable
e
)
{
throw
new
DataSourcePluginException
(
"check InfluxDB connectivity failed, "
+
e
.
getMessage
(),
e
);
...
...
@@ -104,15 +143,28 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
String
database
,
String
table
)
{
// databaseCheck(database);
try
(
InfluxdbsClient
client
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
)))
{
Config
config
=
ConfigFactory
.
parseMap
(
requestParams
);
Map
<
String
,
String
>
fieldTypeMapping
=
client
.
getFieldTypeMapping
(
config
,
table
);
try
{
InfluxDB
instance
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
));
instance
.
setDatabase
(
database
);
QueryResult
showDatabases
=
new
QueryResult
();
try
{
// 尝试执行查询,如果成功,则连接成功
showDatabases
=
instance
.
query
(
new
Query
(
"select * from "
+
table
,
database
));
}
catch
(
Exception
e
)
{
// 如果有异常,则连接失败
e
.
printStackTrace
();
}
List
<
QueryResult
.
Result
>
results
=
showDatabases
.
getResults
();
List
<
TableField
>
fields
=
new
ArrayList
<>();
fieldTypeMapping
.
forEach
(
(
fieldName
,
fieldType
)
->
fields
.
add
(
convertToTableField
(
fieldName
,
fieldType
)));
for
(
QueryResult
.
Result
result
:
results
)
{
List
<
String
>
columns
=
result
.
getSeries
().
get
(
0
).
getColumns
();
for
(
String
column
:
columns
)
{
TableField
tableField
=
new
TableField
();
tableField
.
setName
(
column
);
fields
.
add
(
tableField
);
}
}
return
fields
;
}
catch
(
Exception
ex
)
{
throw
new
DataSourcePluginException
(
"Get table fields failed"
,
ex
);
...
...
@@ -125,12 +177,30 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
@NonNull
Map
<
String
,
String
>
requestParams
,
String
database
,
List
<
String
>
tables
)
{
// databaseCheck(database);
Map
<
String
,
List
<
TableField
>>
tableFields
=
new
HashMap
<>();
tables
.
forEach
(
table
->
tableFields
.
put
(
table
,
getTableFields
(
pluginName
,
requestParams
,
database
,
table
)));
InfluxDB
instance
=
InfluxdbsClient
.
createInstance
(
ConfigFactory
.
parseMap
(
requestParams
));
instance
.
setDatabase
(
database
);
Map
<
String
,
List
<
TableField
>>
tableFields
=
new
LinkedHashMap
<>();
for
(
String
table
:
tables
)
{
QueryResult
showDatabases
=
new
QueryResult
();
try
{
// 尝试执行查询,如果成功,则连接成功
showDatabases
=
instance
.
query
(
new
Query
(
"select * from "
+
table
,
database
));
}
catch
(
Exception
e
)
{
// 如果有异常,则连接失败
e
.
printStackTrace
();
}
List
<
QueryResult
.
Result
>
results
=
showDatabases
.
getResults
();
List
<
TableField
>
fields
=
new
ArrayList
<>();
for
(
QueryResult
.
Result
result
:
results
)
{
List
<
String
>
columns
=
result
.
getSeries
().
get
(
0
).
getColumns
();
for
(
String
column
:
columns
)
{
TableField
tableField
=
new
TableField
();
tableField
.
setName
(
column
);
fields
.
add
(
tableField
);
}
}
tableFields
.
put
(
table
,
fields
);
}
return
tableFields
;
}
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-influxdb/src/main/java/org/apache/seatunnel/datasource/plugin/influxdb/InfluxdbOptionRule.java
浏览文件 @
51e95d4a
...
...
@@ -32,6 +32,18 @@ public class InfluxdbOptionRule {
.
withDescription
(
"Influxdb Url address, the format is http://host:port, allowing only hosts to be specified. Such as http://192.168.1.124:8086 "
);
public
static
final
Option
<
String
>
USERNAME
=
Options
.
key
(
"username"
)
.
stringType
()
.
noDefaultValue
()
.
withDescription
(
"Influxdb username "
);
public
static
final
Option
<
String
>
PASSWORD
=
Options
.
key
(
"password"
)
.
stringType
()
.
noDefaultValue
()
.
withDescription
(
"Influxdb password"
);
public
static
final
Option
<
String
>
TOKEN
=
Options
.
key
(
"token"
).
stringType
().
noDefaultValue
().
withDescription
(
"Influxdb token "
);
...
...
@@ -48,10 +60,10 @@ public class InfluxdbOptionRule {
.
withDescription
(
"Influxdb hour(Unit: Hour, negative)"
);
public
static
OptionRule
optionRule
()
{
return
OptionRule
.
builder
().
required
(
URL
).
optional
(
TOKEN
,
BUCKET
,
ORG
,
HOUR
).
build
();
return
OptionRule
.
builder
().
required
(
URL
).
optional
(
USERNAME
,
PASSWORD
,
HOUR
).
build
();
}
public
static
OptionRule
metadataRule
()
{
return
OptionRule
.
builder
().
required
(
TOKEN
).
build
();
return
OptionRule
.
builder
().
required
(
USERNAME
).
build
();
}
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-influxdb/src/main/java/org/apache/seatunnel/datasource/plugin/influxdb/client/InfluxdbsClient.java
浏览文件 @
51e95d4a
...
...
@@ -5,129 +5,137 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import
org.apache.seatunnel.datasource.plugin.influxdb.InfluxdbOptionRule
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.influxdb.InfluxDB
;
import
org.influxdb.InfluxDBFactory
;
import
com.influxdb.client.InfluxDBClient
;
import
com.influxdb.client.InfluxDBClientFactory
;
import
com.influxdb.query.FluxColumn
;
import
com.influxdb.query.FluxTable
;
import
lombok.extern.slf4j.Slf4j
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Optional
;
import
java.util.stream.Collectors
;
@Slf4j
public
class
InfluxdbsClient
implements
AutoCloseable
{
private
final
InfluxDBClient
influxDBClient
;
// private
InfluxDBClient influxDBClient;
private
static
final
ObjectMapper
OBJECT_MAPPER
=
new
ObjectMapper
();
private
InfluxdbsClient
(
InfluxDBClient
influxDBClient
)
{
this
.
influxDBClient
=
influxDBClient
;
}
//
private InfluxdbsClient(InfluxDBClient influxDBClient) {
//
this.influxDBClient = influxDBClient;
//
}
@Override
public
void
close
()
throws
Exception
{
try
{
influxDBClient
.
close
();
//
influxDBClient.close();
}
catch
(
Exception
e
)
{
log
.
warn
(
"close influxdb connection error"
,
e
);
}
}
public
List
<
String
>
getTables
(
Config
pluginConfig
)
{
String
org
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
ORG
.
key
())).
orElse
(
""
);
String
sj
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
HOUR
.
key
())).
orElse
(
"-1h"
);
String
query
=
"from(bucket: \"primary\") |> range(start: "
+
sj
+
") "
;
List
<
TableName
>
tables
=
influxDBClient
.
getQueryApi
().
query
(
query
,
org
,
TableName
.
class
);
return
tables
.
stream
()
.
map
(
TableName:
:
get_measurement
)
.
collect
(
Collectors
.
toSet
())
.
stream
()
.
collect
(
Collectors
.
toList
());
}
public
static
InfluxdbsClient
createInstance
(
Config
pluginConfig
)
{
// public List<String> getTables(Config pluginConfig) {
// String org =
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
// String sj =
//
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key())).orElse("-1h");
// String query = "from(bucket: \"primary\") |> range(start: " + sj + ") ";
// List<TableName> tables = influxDBClient.getQueryApi().query(query, org,
// TableName.class);
//
// return tables.stream()
// .map(TableName::get_measurement)
// .collect(Collectors.toSet())
// .stream()
// .collect(Collectors.toList());
// }
public
static
InfluxDB
createInstance
(
Config
pluginConfig
)
{
try
{
Optional
<
String
>
url
=
Optional
.
empty
();
Optional
<
String
>
token
=
Optional
.
empty
();
Optional
<
String
>
bucket
=
Optional
.
empty
();
Optional
<
String
>
username
=
Optional
.
empty
();
Optional
<
String
>
password
=
Optional
.
empty
();
Optional
<
String
>
org
=
Optional
.
empty
();
Optional
<
String
>
bucket
=
Optional
.
empty
();
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
URL
.
key
()))
{
url
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
URL
.
key
()));
}
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
TOKEN
.
key
()))
{
token
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
TOKEN
.
key
()));
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
USERNAME
.
key
()))
{
username
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
USERNAME
.
key
()));
}
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
BUCKET
.
key
()))
{
bucket
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
BUCKET
.
key
()));
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
PASSWORD
.
key
()))
{
password
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
PASSWORD
.
key
()));
}
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
ORG
.
key
()))
{
org
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
ORG
.
key
()));
}
return
createInstance
(
url
,
token
,
bucket
,
org
);
if
(
pluginConfig
.
hasPath
(
InfluxdbOptionRule
.
BUCKET
.
key
()))
{
bucket
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
BUCKET
.
key
()));
}
try
{
InfluxDB
connect
=
InfluxDBFactory
.
connect
(
url
.
get
(),
username
.
get
(),
password
.
get
());
return
connect
;
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
e
);
}
// return createInstance(url, token, bucket, org);
}
catch
(
Exception
e
)
{
throw
new
RuntimeException
(
"Create EsRestClient failed"
,
e
);
}
}
public
static
InfluxdbsClient
createInstance
(
Optional
<
String
>
url
,
Optional
<
String
>
token
,
Optional
<
String
>
bucket
,
Optional
<
String
>
org
)
{
InfluxDBClient
influxDBClient1
=
getRestClientBuilder
(
url
,
token
,
bucket
,
org
);
return
new
InfluxdbsClient
(
influxDBClient1
);
}
private
static
InfluxDBClient
getRestClientBuilder
(
Optional
<
String
>
url
,
Optional
<
String
>
token
,
Optional
<
String
>
bucket
,
Optional
<
String
>
org
)
{
if
(
url
.
isPresent
()
&&
token
.
isPresent
())
{
return
InfluxDBClientFactory
.
create
(
url
.
get
(),
token
.
get
().
toCharArray
());
}
else
{
throw
new
ResponseException
(
"influxDB connect fail ."
);
}
}
public
Map
<
String
,
String
>
getFieldTypeMapping
(
Config
pluginConfig
,
String
index
)
{
String
org
=
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
ORG
.
key
())).
orElse
(
""
);
String
query
=
"from(bucket: \"primary\") |> range(start: "
+
Optional
.
of
(
pluginConfig
.
getString
(
InfluxdbOptionRule
.
HOUR
.
key
()))
.
orElse
(
"-1h"
)
+
")|> filter(fn: (r) => r._measurement == \""
+
index
+
"\" )"
;
Map
<
String
,
String
>
mapping
=
new
HashMap
<>();
try
{
List
<
FluxTable
>
tables
=
influxDBClient
.
getQueryApi
().
query
(
query
,
org
);
if
(
CollectionUtils
.
isNotEmpty
(
tables
))
{
List
<
List
<
FluxColumn
>>
collect
=
tables
.
stream
().
map
(
m
->
m
.
getColumns
()).
collect
(
Collectors
.
toList
());
// 都是相同的
List
<
FluxColumn
>
fluxColumnList
=
collect
.
get
(
0
);
;
fluxColumnList
.
stream
()
.
forEach
(
m
->
{
mapping
.
put
(
m
.
getLabel
(),
m
.
getDataType
());
});
}
}
catch
(
Exception
ex
)
{
throw
new
ResponseException
(
ex
);
}
return
mapping
;
}
// public static InfluxdbsClient createInstance(
// Optional<String> url,
// Optional<String> token,
// Optional<String> bucket,
// Optional<String> org) {
// InfluxDBClient influxDBClient1 = getRestClientBuilder(url, token, bucket, org);
// return new InfluxdbsClient(influxDBClient1);
// }
//
// private static InfluxDBClient getRestClientBuilder(
// Optional<String> url,
// Optional<String> token,
// Optional<String> bucket,
// Optional<String> org) {
// if (url.isPresent() && token.isPresent()) {
//
// return InfluxDBClientFactory.create(url.get(), token.get().toCharArray());
// } else {
// throw new ResponseException("influxDB connect fail .");
// }
// }
//
// public Map<String, String> getFieldTypeMapping(Config pluginConfig, String index) {
// String org =
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
//
// String query =
// "from(bucket: \"primary\") |> range(start: "
// + Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key()))
// .orElse("-1h")
// + ")|> filter(fn: (r) => r._measurement == \""
// + index
// + "\" )";
// Map<String, String> mapping = new HashMap<>();
// try {
// List<FluxTable> tables = influxDBClient.getQueryApi().query(query, org);
// if (CollectionUtils.isNotEmpty(tables)) {
// List<List<FluxColumn>> collect =
// tables.stream().map(m -> m.getColumns()).collect(Collectors.toList());
// // 都是相同的
// List<FluxColumn> fluxColumnList = collect.get(0);
// ;
// fluxColumnList.stream()
// .forEach(
// m -> {
// mapping.put(m.getLabel(), m.getDataType());
// });
// }
//
// } catch (Exception ex) {
// throw new ResponseException(ex);
// }
// return mapping;
// }
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-influxdb/src/main/java/org/apache/seatunnel/datasource/plugin/influxdb/client/TableName.java
浏览文件 @
51e95d4a
package
org
.
apache
.
seatunnel
.
datasource
.
plugin
.
influxdb
.
client
;
import
com.influxdb.query.FluxTable
;
import
java.util.StringJoiner
;
public
class
TableName
{
private
String
_measurement
;
...
...
@@ -15,7 +11,8 @@ public class TableName {
return
this
.
_measurement
;
}
public
String
toString
()
{
return
(
new
StringJoiner
(
", "
,
FluxTable
.
class
.
getSimpleName
()
+
"["
,
"]"
)).
toString
();
}
// public String toString() {
// return (new StringJoiner(", ", FluxTable.class.getSimpleName() + "[",
// "]")).toString();
// }
}
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/pom.xml
浏览文件 @
51e95d4a
...
...
@@ -29,6 +29,18 @@
</properties>
<dependencies>
<!-- Oracle驱动包、支持字符集的依赖包-->
<!-- Oracle 驱动包 -->
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<version>
12.2.0.1
</version>
</dependency>
<!-- Additional library required to support Internationalization -->
<dependency>
<groupId>
org.apache.seatunnel
</groupId>
<artifactId>
datasource-plugins-api
</artifactId>
...
...
@@ -50,13 +62,6 @@
<scope>
provided
</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>
com.oracle.database.jdbc
</groupId>
<artifactId>
ojdbc8
</artifactId>
<version>
${oracle-jdbc.version}
</version>
<scope>
provided
</scope>
</dependency>
</dependencies>
</project>
seatunnel-datasource/seatunnel-datasource-plugins/datasource-jdbc-oracle/src/main/java/org/apache/seatunnel/datasource/plugin/oracle/jdbc/OracleDataSourceChannel.java
浏览文件 @
51e95d4a
...
...
@@ -21,7 +21,6 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import
org.apache.seatunnel.datasource.plugin.api.DataSourceChannelExt
;
import
org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException
;
import
org.apache.seatunnel.datasource.plugin.api.model.TableField
;
import
org.apache.seatunnel.datasource.plugin.api.utils.JdbcUtils
;
import
org.apache.commons.collections4.CollectionUtils
;
import
org.apache.commons.lang3.StringUtils
;
...
...
@@ -83,11 +82,12 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull
String
pluginName
,
@NonNull
Map
<
String
,
String
>
requestParams
)
{
List
<
String
>
dbNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
);
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SHOW DATABASES;"
);
PreparedStatement
statement
=
connection
.
prepareStatement
(
"SELECT table_name FROM user_tables"
);
ResultSet
re
=
statement
.
executeQuery
())
{
// filter system databases
while
(
re
.
next
())
{
String
dbName
=
re
.
getString
(
"database"
);
String
dbName
=
re
.
getString
(
1
);
if
(
StringUtils
.
isNotBlank
(
dbName
)
&&
!
OracleDataSourceConfig
.
ORACLE_SYSTEM_DATABASES
.
contains
(
dbName
))
{
dbNames
.
add
(
dbName
);
...
...
@@ -124,8 +124,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
"SELECT "
+
" t.TABLE_NAME,\n"
+
" t.COLUMN_NAME,\n"
+
" t.COMMENTS,\n"
+
" t1.DATA_TYPE,\n"
+
" t.COMMENTS
REMARKS
,\n"
+
" t1.DATA_TYPE
TYPE_NAME
,\n"
+
" t1.DATA_LENGTH VAR_LEN,\n"
+
" t1.DATA_PRECISION LEN,\n"
+
" DATA_SCALE ,\n"
...
...
@@ -158,7 +158,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
tableField
.
setPrimaryKey
(
true
);
}
if
(
typeList
.
contains
(
tableField
.
getType
().
toLowerCase
()))
{
if
(
resultSet
.
getString
(
"TYPE_NAME"
)
!=
null
&&
typeList
.
contains
(
resultSet
.
getString
(
"TYPE_NAME"
).
toLowerCase
()))
{
tableField
.
setLen
(
resultSet
.
getString
(
"VAR_LEN"
));
}
else
{
tableField
.
setLen
(
resultSet
.
getString
(
"LEN"
));
...
...
@@ -174,7 +175,11 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
tableField
.
setType
(
resultSet
.
getString
(
"TYPE_NAME"
));
tableField
.
setComment
(
resultSet
.
getString
(
"REMARKS"
));
Object
nullable
=
resultSet
.
getObject
(
"IS_NULLABLE"
);
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
if
(
Objects
.
nonNull
(
nullable
))
{
tableField
.
setNullable
(
Boolean
.
TRUE
.
toString
().
equals
(
nullable
.
toString
()));
}
else
{
tableField
.
setNullable
(
Boolean
.
TRUE
);
}
tableFields
.
add
(
tableField
);
}
...
...
@@ -190,7 +195,13 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull
Map
<
String
,
String
>
requestParams
,
@NonNull
String
database
,
@NonNull
List
<
String
>
tables
)
{
return
null
;
HashMap
<
String
,
List
<
TableField
>>
fields
=
new
HashMap
<>();
for
(
String
table
:
tables
)
{
List
<
TableField
>
tableFields
=
getTableFields
(
pluginName
,
requestParams
,
database
,
table
);
fields
.
put
(
table
,
tableFields
);
}
return
fields
;
}
private
String
getPrimaryKey
(
DatabaseMetaData
metaData
,
String
dbName
,
String
tableName
)
...
...
@@ -211,12 +222,22 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
throws
SQLException
,
ClassNotFoundException
{
checkNotNull
(
requestParams
.
get
(
OracleOptionRule
.
DRIVER
.
key
()));
checkNotNull
(
requestParams
.
get
(
OracleOptionRule
.
URL
.
key
()),
"Jdbc url cannot be null"
);
String
url
=
JdbcUtils
.
replaceDatabase
(
requestParams
.
get
(
OracleOptionRule
.
URL
.
key
()),
databaseName
);
String
url
=
requestParams
.
get
(
OracleOptionRule
.
URL
.
key
());
if
(
requestParams
.
containsKey
(
OracleOptionRule
.
USER
.
key
()))
{
String
username
=
requestParams
.
get
(
OracleOptionRule
.
USER
.
key
());
String
password
=
requestParams
.
get
(
OracleOptionRule
.
PASSWORD
.
key
());
String
driver
=
requestParams
.
get
(
OracleOptionRule
.
DRIVER
.
key
());
if
(
StringUtils
.
isEmpty
(
driver
))
{
driver
=
"oracle.jdbc.driver.OracleDriver"
;
}
Class
Clazz
=
Class
.
forName
(
driver
);
Driver
driverInstance
=
null
;
try
{
driverInstance
=
(
Driver
)
Clazz
.
newInstance
();
}
catch
(
InstantiationException
|
IllegalAccessException
e
)
{
throw
new
RuntimeException
(
e
);
}
DriverManager
.
registerDriver
(
driverInstance
);
return
DriverManager
.
getConnection
(
url
,
username
,
password
);
}
return
DriverManager
.
getConnection
(
url
);
...
...
@@ -231,16 +252,16 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
Map
<
String
,
String
>
options
)
{
List
<
Map
>
tableNames
=
new
ArrayList
<>();
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
StringBuilder
sql
=
new
StringBuilder
(
"s
s
elect * from all_tables where 1=1 "
);
StringBuilder
sql
=
new
StringBuilder
(
"select * from all_tables where 1=1 "
);
if
(
StringUtils
.
isNotBlank
(
database
))
{
sql
.
append
(
" and OWNER=\
""
+
database
+
"\"
"
);
sql
.
append
(
" and OWNER=\
'"
+
database
+
"\'
"
);
}
if
(
StringUtils
.
isNotBlank
(
tableName
))
{
sql
.
append
(
" and table_name=\
""
+
tableName
+
"\"
"
);
sql
.
append
(
" and table_name=\
'"
+
tableName
+
"\'
"
);
}
Statement
statement
=
connection
.
createStatement
();
...
...
@@ -250,23 +271,23 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
while
(
resultSet
.
next
())
{
Map
tables
=
new
HashMap
();
// ResultSetMetaData metaData = resultSet.getMetaData();
// 获取表列数
int
columnCount
=
metaData
.
getColumnCount
();
for
(
int
i
=
1
;
i
<=
columnCount
;
i
++)
{
tables
.
put
(
metaData
.
getColumnName
(
i
),
resultSet
.
getObject
(
i
));
}
//
//
ResultSetMetaData metaData = resultSet.getMetaData();
//
//
获取表列数
//
int columnCount = metaData.getColumnCount();
//
for (int i = 1; i <= columnCount; i++) {
//
tables.put(metaData.getColumnName(i), resultSet.getObject(i));
//
}
String
tableNameold
=
resultSet
.
getString
(
"TABLE_NAME"
);
if
(
StringUtils
.
isNotBlank
(
tableNameold
))
{
tables
.
put
(
"TABLE_NAME"
,
tableNameold
);
}
String
tableComment
=
resultSet
.
getString
(
"TABLE_COMMENT"
);
if
(
StringUtils
.
isNotBlank
(
tableComment
))
{
tables
.
put
(
"TABLE_COMMENT"
,
tableComment
);
}
//
String tableComment = resultSet.getString("TABLE_COMMENT");
//
//
if (StringUtils.isNotBlank(tableComment)) {
//
tables.put("TABLE_COMMENT", tableComment);
//
}
tableNames
.
add
(
tables
);
}
return
tableNames
;
...
...
@@ -285,7 +306,7 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
try
(
Connection
connection
=
getConnection
(
requestParams
,
database
))
{
String
sql
=
"s
s
elect * from all_tables where table_name='"
"select * from all_tables where table_name='"
+
tableName
+
"' and OWNER ='"
+
database
...
...
seatunnel-datasource/seatunnel-datasource-plugins/datasource-s3/src/test/java/org/apache/seatunnel/datasource/plugin/s3/HadoopS3AConfigurationTest.java
浏览文件 @
51e95d4a
...
...
@@ -25,10 +25,11 @@ class HadoopS3AConfigurationTest {
map
.
put
(
"fs.s3a.aws.credentials.provider"
,
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
);
map
.
put
(
"hadoop_s3_properties"
,
"
"
);
map
.
put
(
"hadoop_s3_properties"
,
""
);
map
.
put
(
"access_key"
,
"zyminio"
);
map
.
put
(
"secret_key"
,
"zysoft123"
);
map
.
put
(
"secret_key"
,
"zysoft123
1
"
);
Configuration
configuration
=
HadoopS3AConfiguration
.
getConfiguration
(
map
);
// List<String> databases = new S3DatasourceChannel().getDatabases("S3", map);
// System.out.println(databases+"-----------------------");
// List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup",
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论