提交 65521e33 authored 作者: 王红亮's avatar 王红亮

更新redis配置

上级 0801baf5
......@@ -42,7 +42,8 @@ public class FtpDatasourceChannel implements DataSourceChannel {
}
@Override
public boolean checkDataSourceConnectivity(@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
FtpConfiguration conf = FtpAConfiguration.getConfiguration(requestParams);
try {
FTPClient ftpClient = FtpClientService.connect(conf);
......
package org.apache.seatunnel.datasource.plugin.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
......
package org.apache.seatunnel.datasource.plugin.mqtt;
import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
......@@ -10,6 +9,8 @@ import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import lombok.Data;
/** @Author Heartsuit @Date 2022-12-11 */
@Data
public class MqttClientService {
......
......@@ -17,19 +17,22 @@
package org.apache.seatunnel.datasource.plugin.mqtt;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
......
......@@ -17,13 +17,14 @@
package org.apache.seatunnel.datasource.plugin.mqtt;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
......
package org.apache.seatunnel.datasource.plugin.redis;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
......
package org.apache.seatunnel.datasource.plugin.redis;
import lombok.Data;
import scala.Int;
@Data
public class RedisConfiguration {
private String host;
private Integer port;
......@@ -13,10 +9,16 @@ public class RedisConfiguration {
private String data_type;
private String mode;
public RedisConfiguration() {
}
public RedisConfiguration() {}
public RedisConfiguration(String host, Integer port, String user, String auth, String key, String data_type, String mode) {
public RedisConfiguration(
String host,
Integer port,
String user,
String auth,
String key,
String data_type,
String mode) {
this.host = host;
this.port = port;
this.user = user;
......@@ -28,15 +30,28 @@ public class RedisConfiguration {
@Override
public String toString() {
return "RedisConfiguration{" +
"host='" + host + '\'' +
", port=" + port +
", user='" + user + '\'' +
", auth='" + auth + '\'' +
", key='" + key + '\'' +
", data_type='" + data_type + '\'' +
", mode='" + mode + '\'' +
'}';
return "RedisConfiguration{"
+ "host='"
+ host
+ '\''
+ ", port="
+ port
+ ", user='"
+ user
+ '\''
+ ", auth='"
+ auth
+ '\''
+ ", key='"
+ key
+ '\''
+ ", data_type='"
+ data_type
+ '\''
+ ", mode='"
+ mode
+ '\''
+ '}';
}
public String getHost() {
......@@ -94,4 +109,4 @@ public class RedisConfiguration {
public void setMode(String mode) {
this.mode = mode;
}
}
\ No newline at end of file
}
......@@ -22,17 +22,12 @@ import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.util.*;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class RedisDataSourceChannel implements DataSourceChannel {
......@@ -60,11 +55,13 @@ public class RedisDataSourceChannel implements DataSourceChannel {
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
throw new UnsupportedOperationException("getDatabases is not supported for Redis datasource");
throw new UnsupportedOperationException(
"getDatabases is not supported for Redis datasource");
}
@Override
public boolean checkDataSourceConnectivity(@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
RedisConfiguration conf = RedisAConfiguration.getConfiguration(requestParams);
try {
Jedis redisClient = RedisClientService.connect(conf);
......@@ -89,18 +86,14 @@ public class RedisDataSourceChannel implements DataSourceChannel {
}
}
private Connection getConnection(Map<String, String> requestParams) throws Exception {
return getConnection(requestParams);
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyList();
throw new UnsupportedOperationException(
"getTableFields is not supported for redis datasource");
}
@Override
......@@ -109,7 +102,7 @@ public class RedisDataSourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap();
throw new UnsupportedOperationException(
"getTableFields is not supported for redis datasource");
}
}
......@@ -105,8 +105,8 @@ public class RedisOptionRule {
public static OptionRule optionRule() {
return OptionRule.builder()
.required(HOST, PORT, KEY,DATA_TYPE)
.optional(USER, AUTH,MODE, FORMAT,HASH_KEY_PARSE_MODE,KEY_PATTERN)
.required(HOST, PORT, KEY, DATA_TYPE)
.optional(USER, AUTH, MODE, FORMAT, HASH_KEY_PARSE_MODE, KEY_PATTERN)
.build();
}
......
......@@ -17,8 +17,6 @@
package org.apache.seatunnel.datasource.plugin.s3;
import com.alibaba.fastjson2.JSON;
import com.amazonaws.services.dynamodbv2.xspec.S;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
......@@ -27,6 +25,7 @@ import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import com.alibaba.fastjson2.JSON;
import io.minio.BucketExistsArgs;
import io.minio.GetObjectArgs;
import io.minio.GetObjectResponse;
......@@ -253,7 +252,8 @@ public class S3DatasourceChannel implements DataSourceChannel {
Map<String, List<TableField>> stringList = new HashMap<>();
tables.forEach(
tab -> {
List<TableField> tableFields = getObject(minioClient, tab, database, requestParams);
List<TableField> tableFields =
getObject(minioClient, tab, database, requestParams);
stringList.put(tab, tableFields);
});
return stringList;
......@@ -261,7 +261,11 @@ public class S3DatasourceChannel implements DataSourceChannel {
// "getTableFields is not supported for S3 datasource");
}
private List<TableField> getObject(MinioClient minioClient, String table, String database, Map<String, String> requestParams) {
private List<TableField> getObject(
MinioClient minioClient,
String table,
String database,
Map<String, String> requestParams) {
List<TableField> all = new ArrayList<>();
try {
GetObjectResponse minioClientObject =
......@@ -281,11 +285,13 @@ public class S3DatasourceChannel implements DataSourceChannel {
String name = declaredFields[i].getName();
map.put(name, split[i]);
}
TableField tableField = JSON.parseObject(JSON.toJSONString(map), TableField.class);
TableField tableField =
JSON.parseObject(JSON.toJSONString(map), TableField.class);
all.add(tableField);
}
}
System.out.println(all+"==================================================================");
System.out.println(
all + "==================================================================");
} catch (ErrorResponseException
| InsufficientDataException
| InternalException
......
......@@ -34,7 +34,8 @@ class HadoopS3AConfigurationTest {
// List<String> tables = new S3DatasourceChannel().getTables("S3", map, "backup",
// null);
// System.out.println(tables+"-----------------------");
List<TableField> tableField = new S3DatasourceChannel().getTableFields("S3", map, "ces", "aa/bb/cc/ss.csv");
List<TableField> tableField =
new S3DatasourceChannel().getTableFields("S3", map, "ces", "aa/bb/cc/ss.csv");
System.out.println(tableField + "-----------------------");
System.out.println(configuration);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论