提交 0801baf5 authored 作者: 王红亮's avatar 王红亮

更新redis配置

上级 d836318a
......@@ -17,33 +17,73 @@
package org.apache.seatunnel.datasource.plugin.redis;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import lombok.NonNull;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class RedisRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(RedisOptionRule.HOST.key()),
String.format("Missing %s in requestParams", RedisOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(RedisOptionRule.HOST.key(), requestParams.get(RedisOptionRule.HOST.key()));
if (requestParams.containsKey(RedisOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(RedisOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
import java.util.Set;
public class JedisWrapper extends Jedis {
private final JedisCluster jedisCluster;
public JedisWrapper(@NonNull JedisCluster jedisCluster) {
this.jedisCluster = jedisCluster;
}
@Override
public String set(final String key, final String value) {
return jedisCluster.set(key, value);
}
@Override
public String get(final String key) {
return jedisCluster.get(key);
}
@Override
public long hset(final String key, final Map<String, String> hash) {
return jedisCluster.hset(key, hash);
}
@Override
public Map<String, String> hgetAll(final String key) {
return jedisCluster.hgetAll(key);
}
@Override
public long lpush(final String key, final String... strings) {
return jedisCluster.lpush(key, strings);
}
@Override
public List<String> lrange(final String key, final long start, final long stop) {
return jedisCluster.lrange(key, start, stop);
}
@Override
public long sadd(final String key, final String... members) {
return jedisCluster.sadd(key, members);
}
@Override
public Set<String> smembers(final String key) {
return jedisCluster.smembers(key);
}
@Override
public long zadd(final String key, final double score, final String member) {
return jedisCluster.zadd(key, score, member);
}
@Override
public List<String> zrange(final String key, final long start, final long stop) {
return jedisCluster.zrange(key, start, stop);
}
@Override
public void close() {
jedisCluster.close();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.redis;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
public class RedisAConfiguration {
public static RedisConfiguration getConfiguration(Map<String, String> redisOption) {
if (!redisOption.containsKey(RedisOptionRule.HOST.key())) {
throw new IllegalArgumentException(
"redis datasource host is null, please check your config");
}
if (!redisOption.containsKey(RedisOptionRule.PORT.key())) {
throw new IllegalArgumentException(
"redis datasource port is null, please check your config");
}
if (!redisOption.containsKey(RedisOptionRule.USER.key())) {
throw new IllegalArgumentException(
"redis datasource username is null, please check your config");
}
if (!redisOption.containsKey(RedisOptionRule.AUTH.key())) {
throw new IllegalArgumentException(
"redis datasource password is null, please check your config");
}
if (!redisOption.containsKey(RedisOptionRule.KEY.key())) {
throw new IllegalArgumentException(
"redis datasource path is null, please check your config");
}
if (!redisOption.containsKey(RedisOptionRule.DATA_TYPE.key())) {
throw new IllegalArgumentException(
"redis datasource file_format_type is null, please check your config");
}
RedisConfiguration redisConfiguration = new RedisConfiguration();
redisConfiguration.setHost(redisOption.get(RedisOptionRule.HOST.key()));
redisConfiguration.setPort(Integer.valueOf(redisOption.get(RedisOptionRule.PORT.key())));
redisConfiguration.setUser(redisOption.get(RedisOptionRule.USER.key()));
redisConfiguration.setAuth(redisOption.get(RedisOptionRule.AUTH.key()));
redisConfiguration.setKey(redisOption.get(RedisOptionRule.KEY.key()));
redisConfiguration.setData_type(redisOption.get(RedisOptionRule.DATA_TYPE.key()));
return redisConfiguration;
}
}
package org.apache.seatunnel.datasource.plugin.redis;
import org.apache.commons.lang3.StringUtils;
import redis.clients.jedis.ConnectionPoolConfig;
import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
public class RedisClientService {
public static Jedis connect(RedisConfiguration conf) {
List<String> redisNodes = Collections.emptyList();
switch (conf.getMode()) {
case "SINGLE":
Jedis jedis = new Jedis(conf.getHost(), conf.getPort());
if (StringUtils.isNotBlank(conf.getAuth())) {
jedis.auth(conf.getAuth());
}
if (StringUtils.isNotBlank(conf.getUser())) {
jedis.aclSetUser(conf.getUser());
}
return jedis;
case "CLUSTER":
HashSet<HostAndPort> nodes = new HashSet<>();
HostAndPort node = new HostAndPort(conf.getHost(), conf.getPort());
nodes.add(node);
if (!redisNodes.isEmpty()) {
for (String redisNode : redisNodes) {
String[] splits = redisNode.split(":");
if (splits.length != 2) {
throw new RuntimeException(
"Invalid redis node information,"
+ "redis node information must like as the following: [host:port]");
}
HostAndPort hostAndPort =
new HostAndPort(splits[0], Integer.parseInt(splits[1]));
nodes.add(hostAndPort);
}
}
ConnectionPoolConfig connectionPoolConfig = new ConnectionPoolConfig();
JedisCluster jedisCluster;
if (StringUtils.isNotBlank(conf.getAuth())) {
jedisCluster =
new JedisCluster(
nodes,
JedisCluster.DEFAULT_TIMEOUT,
JedisCluster.DEFAULT_TIMEOUT,
JedisCluster.DEFAULT_MAX_ATTEMPTS,
conf.getAuth(),
connectionPoolConfig);
} else {
jedisCluster = new JedisCluster(nodes);
}
return new JedisWrapper(jedisCluster);
default:
// do nothing
throw new RuntimeException("Not support this redis mode");
}
}
}
package org.apache.seatunnel.datasource.plugin.redis;
import lombok.Data;
import scala.Int;
@Data
public class RedisConfiguration {
private String host;
private Integer port;
private String user;
private String auth;
private String key;
private String data_type;
private String mode;
public RedisConfiguration() {
}
public RedisConfiguration(String host, Integer port, String user, String auth, String key, String data_type, String mode) {
this.host = host;
this.port = port;
this.user = user;
this.auth = auth;
this.key = key;
this.data_type = data_type;
this.mode = mode;
}
@Override
public String toString() {
return "RedisConfiguration{" +
"host='" + host + '\'' +
", port=" + port +
", user='" + user + '\'' +
", auth='" + auth + '\'' +
", key='" + key + '\'' +
", data_type='" + data_type + '\'' +
", mode='" + mode + '\'' +
'}';
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getUser() {
return user;
}
public void setUser(String user) {
this.user = user;
}
public String getAuth() {
return auth;
}
public void setAuth(String auth) {
this.auth = auth;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getData_type() {
return data_type;
}
public void setData_type(String data_type) {
this.data_type = data_type;
}
public String getMode() {
return mode;
}
public void setMode(String mode) {
this.mode = mode;
}
}
\ No newline at end of file
......@@ -26,6 +26,7 @@ import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.sql.Connection;
import java.util.*;
......@@ -52,33 +53,39 @@ public class RedisDataSourceChannel implements DataSourceChannel {
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database);
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"check redis connectivity failed, " + ex.getMessage(), ex);
}
return null;
Map<String, String> options) {
throw new UnsupportedOperationException("getTables is not supported for Redis datasource");
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
return DEFAULT_DATABASES;
throw new UnsupportedOperationException("getDatabases is not supported for Redis datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Connection ignored = getConnection(requestParams)) {
return true;
public boolean checkDataSourceConnectivity(@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
RedisConfiguration conf = RedisAConfiguration.getConfiguration(requestParams);
try {
Jedis redisClient = RedisClientService.connect(conf);
if (Objects.isNull(conf)) {
throw new DataSourcePluginException(
String.format(
"check redis connectivity failed, config is: %s", requestParams));
}
if (Objects.nonNull(redisClient)) {
return true;
} else {
throw new DataSourcePluginException(
String.format(
"check redis connectivity failed, config is: %s", requestParams));
}
} catch (Exception e) {
throw new DataSourcePluginException("check jdbc connectivity failed", e);
throw new DataSourcePluginException(
String.format("check redis connectivity failed, config is: %s", requestParams));
}
}
......
......@@ -105,8 +105,8 @@ public class RedisOptionRule {
public static OptionRule optionRule() {
return OptionRule.builder()
.required(HOST, PORT, KEY)
.optional(USER, AUTH, KEY_PATTERN, FORMAT)
.required(HOST, PORT, KEY,DATA_TYPE)
.optional(USER, AUTH,MODE, FORMAT,HASH_KEY_PARSE_MODE,KEY_PATTERN)
.build();
}
......@@ -116,6 +116,6 @@ public class RedisOptionRule {
public enum Format {
JSON,
// TEXT will be supported later
TEXT
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论