提交 b502349f · 作者:李纤

修改influxdb连接

上级 67193dbc
......@@ -50,12 +50,16 @@
<scope>provided</scope>
</dependency>
<!-- driver -->
<dependency>
<groupId>com.influxdb</groupId>
<artifactId>influxdb-client-java</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.23</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
......
......@@ -17,7 +17,6 @@
package org.apache.seatunnel.datasource.plugin.influxdb;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.configuration.util.OptionRule;
......@@ -28,10 +27,14 @@ import org.apache.seatunnel.datasource.plugin.influxdb.client.InfluxdbsClient;
import org.apache.commons.lang3.StringUtils;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import lombok.NonNull;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
......@@ -69,10 +72,20 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
String database,
Map<String, String> option) {
try (InfluxdbsClient client =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams))) {
return client.getTables(ConfigFactory.parseMap(requestParams));
try {
InfluxDB instance =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams));
instance.setDatabase(database);
QueryResult showMeasurements = instance.query(new Query("show measurements"));
List<QueryResult.Result> results = showMeasurements.getResults();
List<String> table = new ArrayList<>();
for (QueryResult.Result result : results) {
List<List<Object>> values = result.getSeries().get(0).getValues();
for (List<Object> value : values) {
table.add(String.valueOf(value.get(0)));
}
}
return table;
} catch (Exception e) {
throw new RuntimeException(e);
}
......@@ -81,16 +94,42 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
@Override
public List<String> getDatabases(
        @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
    // Connect with the influxdb-java (1.x) client built from the request params.
    InfluxDB instance = InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams));
    try {
        // "SHOW DATABASES" returns one series (named "databases") whose VALUES
        // hold the database names, one per row. The series *name* is always the
        // literal string "databases" — reading getName() here would be wrong.
        QueryResult showDatabases = instance.query(new Query("SHOW DATABASES"));
        List<String> databases = new ArrayList<>();
        List<QueryResult.Result> results = showDatabases.getResults();
        if (results == null) {
            return databases;
        }
        for (QueryResult.Result result : results) {
            if (result.getSeries() == null) {
                continue;
            }
            for (QueryResult.Series series : result.getSeries()) {
                if (series.getValues() == null) {
                    continue;
                }
                for (List<Object> row : series.getValues()) {
                    // Column 0 of each row is the database name.
                    databases.add(String.valueOf(row.get(0)));
                }
            }
        }
        return databases;
    } catch (Exception e) {
        // Surface the failure instead of swallowing it with printStackTrace().
        throw new DataSourcePluginException(
                "Get InfluxDB databases failed, " + e.getMessage(), e);
    } finally {
        // The original leaked the connection; release it once the query is done.
        instance.close();
    }
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (InfluxdbsClient client =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams))) {
return true;
try {
InfluxDB instance =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams));
try {
// 尝试执行查询,如果成功,则连接成功
instance.query(new Query("SHOW DATABASES"));
return true;
} catch (Exception e) {
// 如果有异常,则连接失败
e.printStackTrace();
}
return false;
} catch (Throwable e) {
throw new DataSourcePluginException(
"check InfluxDB connectivity failed, " + e.getMessage(), e);
......@@ -104,15 +143,28 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
String database,
String table) {
// databaseCheck(database);
try (InfluxdbsClient client =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams))) {
Config config = ConfigFactory.parseMap(requestParams);
Map<String, String> fieldTypeMapping = client.getFieldTypeMapping(config, table);
try {
InfluxDB instance =
InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams));
instance.setDatabase(database);
QueryResult showDatabases = new QueryResult();
try {
// 尝试执行查询,如果成功,则连接成功
showDatabases = instance.query(new Query("select * from " + table, database));
} catch (Exception e) {
// 如果有异常,则连接失败
e.printStackTrace();
}
List<QueryResult.Result> results = showDatabases.getResults();
List<TableField> fields = new ArrayList<>();
fieldTypeMapping.forEach(
(fieldName, fieldType) ->
fields.add(convertToTableField(fieldName, fieldType)));
for (QueryResult.Result result : results) {
List<String> columns = result.getSeries().get(0).getColumns();
for (String column : columns) {
TableField tableField = new TableField();
tableField.setName(column);
fields.add(tableField);
}
}
return fields;
} catch (Exception ex) {
throw new DataSourcePluginException("Get table fields failed", ex);
......@@ -125,12 +177,30 @@ public class InfluxdbDataSourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams,
String database,
List<String> tables) {
// databaseCheck(database);
Map<String, List<TableField>> tableFields = new HashMap<>();
tables.forEach(
table ->
tableFields.put(
table, getTableFields(pluginName, requestParams, database, table)));
InfluxDB instance = InfluxdbsClient.createInstance(ConfigFactory.parseMap(requestParams));
instance.setDatabase(database);
Map<String, List<TableField>> tableFields = new LinkedHashMap<>();
for (String table : tables) {
QueryResult showDatabases = new QueryResult();
try {
// 尝试执行查询,如果成功,则连接成功
showDatabases = instance.query(new Query("select * from " + table, database));
} catch (Exception e) {
// 如果有异常,则连接失败
e.printStackTrace();
}
List<QueryResult.Result> results = showDatabases.getResults();
List<TableField> fields = new ArrayList<>();
for (QueryResult.Result result : results) {
List<String> columns = result.getSeries().get(0).getColumns();
for (String column : columns) {
TableField tableField = new TableField();
tableField.setName(column);
fields.add(tableField);
}
}
tableFields.put(table, fields);
}
return tableFields;
}
......
......@@ -32,6 +32,18 @@ public class InfluxdbOptionRule {
.withDescription(
"Influxdb Url address, the format is http://host:port, allowing only hosts to be specified. Such as http://192.168.1.124:8086 ");
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("Influxdb username ");
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("Influxdb password");
public static final Option<String> TOKEN =
Options.key("token").stringType().noDefaultValue().withDescription("Influxdb token ");
......@@ -48,10 +60,10 @@ public class InfluxdbOptionRule {
.withDescription("Influxdb hour(Unit: Hour, negative)");
/**
 * Connection options for InfluxDB 1.x: URL is mandatory; username/password
 * auth (plus the query time window) replaces the former token/bucket/org
 * options used by the 2.x client.
 */
public static OptionRule optionRule() {
    return OptionRule.builder().required(URL).optional(USERNAME, PASSWORD, HOUR).build();
}
/** Metadata options: only the username is required for 1.x-style auth. */
public static OptionRule metadataRule() {
    return OptionRule.builder().required(USERNAME).build();
}
}
......@@ -5,129 +5,137 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.datasource.plugin.influxdb.InfluxdbOptionRule;
import org.apache.commons.collections4.CollectionUtils;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import com.influxdb.query.FluxColumn;
import com.influxdb.query.FluxTable;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
@Slf4j
public class InfluxdbsClient implements AutoCloseable {
private final InfluxDBClient influxDBClient;
// private InfluxDBClient influxDBClient;
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private InfluxdbsClient(InfluxDBClient influxDBClient) {
this.influxDBClient = influxDBClient;
}
// private InfluxdbsClient(InfluxDBClient influxDBClient) {
// this.influxDBClient = influxDBClient;
// }
@Override
public void close() throws Exception {
    // No-op: the influxdb-client-java connection field was removed when this
    // class switched to handing out influxdb-java InfluxDB instances via
    // createInstance(Config); those connections are owned (and must be closed)
    // by their callers, so there is nothing to release here.
}
public List<String> getTables(Config pluginConfig) {
String org = Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
String sj =
Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key())).orElse("-1h");
String query = "from(bucket: \"primary\") |> range(start: " + sj + ") ";
List<TableName> tables = influxDBClient.getQueryApi().query(query, org, TableName.class);
return tables.stream()
.map(TableName::get_measurement)
.collect(Collectors.toSet())
.stream()
.collect(Collectors.toList());
}
public static InfluxdbsClient createInstance(Config pluginConfig) {
// public List<String> getTables(Config pluginConfig) {
// String org =
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
// String sj =
//
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key())).orElse("-1h");
// String query = "from(bucket: \"primary\") |> range(start: " + sj + ") ";
// List<TableName> tables = influxDBClient.getQueryApi().query(query, org,
// TableName.class);
//
// return tables.stream()
// .map(TableName::get_measurement)
// .collect(Collectors.toSet())
// .stream()
// .collect(Collectors.toList());
// }
public static InfluxDB createInstance(Config pluginConfig) {
try {
Optional<String> url = Optional.empty();
Optional<String> token = Optional.empty();
Optional<String> bucket = Optional.empty();
Optional<String> username = Optional.empty();
Optional<String> password = Optional.empty();
Optional<String> org = Optional.empty();
Optional<String> bucket = Optional.empty();
if (pluginConfig.hasPath(InfluxdbOptionRule.URL.key())) {
url = Optional.of(pluginConfig.getString(InfluxdbOptionRule.URL.key()));
}
if (pluginConfig.hasPath(InfluxdbOptionRule.TOKEN.key())) {
token = Optional.of(pluginConfig.getString(InfluxdbOptionRule.TOKEN.key()));
if (pluginConfig.hasPath(InfluxdbOptionRule.USERNAME.key())) {
username = Optional.of(pluginConfig.getString(InfluxdbOptionRule.USERNAME.key()));
}
if (pluginConfig.hasPath(InfluxdbOptionRule.BUCKET.key())) {
bucket = Optional.of(pluginConfig.getString(InfluxdbOptionRule.BUCKET.key()));
if (pluginConfig.hasPath(InfluxdbOptionRule.PASSWORD.key())) {
password = Optional.of(pluginConfig.getString(InfluxdbOptionRule.PASSWORD.key()));
}
if (pluginConfig.hasPath(InfluxdbOptionRule.ORG.key())) {
org = Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key()));
}
return createInstance(url, token, bucket, org);
if (pluginConfig.hasPath(InfluxdbOptionRule.BUCKET.key())) {
bucket = Optional.of(pluginConfig.getString(InfluxdbOptionRule.BUCKET.key()));
}
try {
InfluxDB connect =
InfluxDBFactory.connect(url.get(), username.get(), password.get());
return connect;
} catch (Exception e) {
throw new RuntimeException(e);
}
// return createInstance(url, token, bucket, org);
} catch (Exception e) {
throw new RuntimeException("Create EsRestClient failed", e);
}
}
public static InfluxdbsClient createInstance(
Optional<String> url,
Optional<String> token,
Optional<String> bucket,
Optional<String> org) {
InfluxDBClient influxDBClient1 = getRestClientBuilder(url, token, bucket, org);
return new InfluxdbsClient(influxDBClient1);
}
private static InfluxDBClient getRestClientBuilder(
Optional<String> url,
Optional<String> token,
Optional<String> bucket,
Optional<String> org) {
if (url.isPresent() && token.isPresent()) {
return InfluxDBClientFactory.create(url.get(), token.get().toCharArray());
} else {
throw new ResponseException("influxDB connect fail .");
}
}
public Map<String, String> getFieldTypeMapping(Config pluginConfig, String index) {
String org = Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
String query =
"from(bucket: \"primary\") |> range(start: "
+ Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key()))
.orElse("-1h")
+ ")|> filter(fn: (r) => r._measurement == \""
+ index
+ "\" )";
Map<String, String> mapping = new HashMap<>();
try {
List<FluxTable> tables = influxDBClient.getQueryApi().query(query, org);
if (CollectionUtils.isNotEmpty(tables)) {
List<List<FluxColumn>> collect =
tables.stream().map(m -> m.getColumns()).collect(Collectors.toList());
// 都是相同的
List<FluxColumn> fluxColumnList = collect.get(0);
;
fluxColumnList.stream()
.forEach(
m -> {
mapping.put(m.getLabel(), m.getDataType());
});
}
} catch (Exception ex) {
throw new ResponseException(ex);
}
return mapping;
}
// public static InfluxdbsClient createInstance(
// Optional<String> url,
// Optional<String> token,
// Optional<String> bucket,
// Optional<String> org) {
// InfluxDBClient influxDBClient1 = getRestClientBuilder(url, token, bucket, org);
// return new InfluxdbsClient(influxDBClient1);
// }
//
// private static InfluxDBClient getRestClientBuilder(
// Optional<String> url,
// Optional<String> token,
// Optional<String> bucket,
// Optional<String> org) {
// if (url.isPresent() && token.isPresent()) {
//
// return InfluxDBClientFactory.create(url.get(), token.get().toCharArray());
// } else {
// throw new ResponseException("influxDB connect fail .");
// }
// }
//
// public Map<String, String> getFieldTypeMapping(Config pluginConfig, String index) {
// String org =
// Optional.of(pluginConfig.getString(InfluxdbOptionRule.ORG.key())).orElse("");
//
// String query =
// "from(bucket: \"primary\") |> range(start: "
// + Optional.of(pluginConfig.getString(InfluxdbOptionRule.HOUR.key()))
// .orElse("-1h")
// + ")|> filter(fn: (r) => r._measurement == \""
// + index
// + "\" )";
// Map<String, String> mapping = new HashMap<>();
// try {
// List<FluxTable> tables = influxDBClient.getQueryApi().query(query, org);
// if (CollectionUtils.isNotEmpty(tables)) {
// List<List<FluxColumn>> collect =
// tables.stream().map(m -> m.getColumns()).collect(Collectors.toList());
// // 都是相同的
// List<FluxColumn> fluxColumnList = collect.get(0);
// ;
// fluxColumnList.stream()
// .forEach(
// m -> {
// mapping.put(m.getLabel(), m.getDataType());
// });
// }
//
// } catch (Exception ex) {
// throw new ResponseException(ex);
// }
// return mapping;
// }
}
package org.apache.seatunnel.datasource.plugin.influxdb.client;
import com.influxdb.query.FluxTable;
import java.util.StringJoiner;
public class TableName {
private String _measurement;
......@@ -15,7 +11,8 @@ public class TableName {
return this._measurement;
}
public String toString() {
return (new StringJoiner(", ", FluxTable.class.getSimpleName() + "[", "]")).toString();
}
// public String toString() {
// return (new StringJoiner(", ", FluxTable.class.getSimpleName() + "[",
// "]")).toString();
// }
}
......@@ -83,7 +83,8 @@ public class OracleDataSourceChannel implements DataSourceChannelExt {
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
List<String> dbNames = new ArrayList<>();
try (Connection connection = getConnection(requestParams);
PreparedStatement statement = connection.prepareStatement("SHOW DATABASES;");
PreparedStatement statement =
connection.prepareStatement("SELECT table_name FROM user_tables;");
ResultSet re = statement.executeQuery()) {
// filter system databases
while (re.next()) {
......
......@@ -25,10 +25,11 @@ class HadoopS3AConfigurationTest {
map.put(
"fs.s3a.aws.credentials.provider",
"org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider");
map.put("hadoop_s3_properties", " ");
map.put("hadoop_s3_properties", "");
map.put("access_key", "zyminio");
map.put("secret_key", "zysoft123");
map.put("secret_key", "zysoft1231");
Configuration configuration = HadoopS3AConfiguration.getConfiguration(map);
// List<String> databases = new S3DatasourceChannel().getDatabases("S3", map);
// System.out.println(databases+"-----------------------");
// List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup",
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论