Commit f539126f authored by 王红亮

Add FTP, RabbitMQ, and Redis datasource configuration code

Parent 942b1359
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-ftp</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
<version>2.3.2</version>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.ftp;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class FtpDataSourceFactory implements DataSourceFactory {
private static final String PLUGIN_NAME = "Ftp";
@Override
public String factoryIdentifier() {
return PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
DataSourcePluginInfo ftpDatasourcePluginInfo =
DataSourcePluginInfo.builder()
.name(PLUGIN_NAME)
.type(DatasourcePluginTypeEnum.FILE.getCode())
.version("1.0.0")
.supportVirtualTables(false)
.icon("FtpFile")
.build();
return Sets.newHashSet(ftpDatasourcePluginInfo);
}
@Override
public DataSourceChannel createChannel() {
return new FtpDatasourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.ftp;
import lombok.NonNull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.IOException;
import java.util.List;
import java.util.Map;
public class FtpDatasourceChannel implements DataSourceChannel {
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return FtpOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return FtpOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> options) {
throw new UnsupportedOperationException("getTables is not supported for Ftp datasource");
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
throw new UnsupportedOperationException("getDatabases is not supported for Ftp datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
Configuration conf = HadoopFtpAConfiguration.getConfiguration(requestParams);
try (FileSystem fs = FileSystem.get(conf)) {
fs.listStatus(new Path("/"));
return true;
} catch (IOException e) {
throw new DataSourcePluginException(
String.format(
"check ftp connectivity failed, host is: %s",
requestParams.get(FtpOptionRule.HOST.key())),
e);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
throw new UnsupportedOperationException(
"getTableFields is not supported for Ftp datasource");
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
throw new UnsupportedOperationException(
"getTableFields is not supported for Ftp datasource");
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.ftp;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class FtpOptionRule {
public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("the FTP server hostname or IP address");
public static final Option<Integer> PORT =
Options.key("port")
.intType()
.noDefaultValue()
.withDescription("the FTP server port");
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("the username to use when connecting to the FTP server");
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("the password to use when connecting to the FTP server");
public static final Option<String> PATH =
Options.key("path")
.stringType()
.noDefaultValue()
.withDescription("the remote path to read from or write to");
public static final Option<String> FILE_FORMAT_TYPE =
Options.key("file_format_type")
.stringType()
.noDefaultValue()
.withDescription("the format type of the files under the path");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD, PATH).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(FILE_FORMAT_TYPE).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.ftp;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import java.util.Map;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
@Slf4j
public class HadoopFtpAConfiguration {
/* FTP constants */
private static final String HDFS_FTP_IMPL = "org.apache.hadoop.fs.ftp.FTPFileSystem";
private static final String FTP_PROTOCOL = "ftp";
private static final String FTP_USER_KEY_PREFIX = "fs.ftp.user.";
private static final String FTP_PASSWORD_KEY_PREFIX = "fs.ftp.password.";
private static final String FTP_FORMAT_KEY = "fs.%s.%s";
private static final String HDFS_IMPL_KEY = "impl";
public static Configuration getConfiguration(Map<String, String> ftpOption) {
if (!ftpOption.containsKey(FtpOptionRule.HOST.key())) {
throw new IllegalArgumentException(
"Ftp datasource host is null, please check your config");
}
if (!ftpOption.containsKey(FtpOptionRule.PORT.key())) {
throw new IllegalArgumentException(
"Ftp datasource port is null, please check your config");
}
if (!ftpOption.containsKey(FtpOptionRule.USERNAME.key())) {
throw new IllegalArgumentException(
"Ftp datasource username is null, please check your config");
}
if (!ftpOption.containsKey(FtpOptionRule.PASSWORD.key())) {
throw new IllegalArgumentException(
"Ftp datasource password is null, please check your config");
}
String host = ftpOption.get(FtpOptionRule.HOST.key());
String port = ftpOption.get(FtpOptionRule.PORT.key());
Configuration hadoopConf = new Configuration();
// point fs.defaultFS at the FTP server and register Hadoop's FTPFileSystem for the ftp scheme
hadoopConf.set(FS_DEFAULT_NAME_KEY, String.format("%s://%s:%s", FTP_PROTOCOL, host, port));
hadoopConf.set(formatKey(FTP_PROTOCOL, HDFS_IMPL_KEY), HDFS_FTP_IMPL);
// FTPFileSystem reads credentials from fs.ftp.user.<host> and fs.ftp.password.<host>
hadoopConf.set(FTP_USER_KEY_PREFIX + host, ftpOption.get(FtpOptionRule.USERNAME.key()));
hadoopConf.set(FTP_PASSWORD_KEY_PREFIX + host, ftpOption.get(FtpOptionRule.PASSWORD.key()));
return hadoopConf;
}
private static String formatKey(String protocol, String key) {
return String.format(FTP_FORMAT_KEY, protocol, key);
}
}
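A minimal usage sketch (not part of this commit; the class name FtpConnectivitySketch and all host, port and credential values are placeholders) showing how the option keys above feed HadoopFtpAConfiguration and the Hadoop-based connectivity check used by FtpDatasourceChannel:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.util.HashMap;
import java.util.Map;

public class FtpConnectivitySketch {
    public static void main(String[] args) throws Exception {
        Map<String, String> requestParams = new HashMap<>();
        requestParams.put("host", "ftp.example.com"); // placeholder
        requestParams.put("port", "21");              // placeholder
        requestParams.put("username", "ftpuser");     // placeholder
        requestParams.put("password", "secret");      // placeholder

        Configuration conf = HadoopFtpAConfiguration.getConfiguration(requestParams);
        try (FileSystem fs = FileSystem.get(conf)) {
            // listing the root directory is enough to prove login and connection both work
            fs.listStatus(new Path("/"));
            System.out.println("ftp datasource is reachable");
        }
    }
}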
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-rabbitmq</artifactId>
<properties>
<rabbitmq.version>5.9.0</rabbitmq.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq.version}</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import com.rabbitmq.client.*;
import com.rabbitmq.utility.Utility;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.common.Handover;
import java.io.IOException;
import static org.apache.seatunnel.datasource.plugin.rabbitmq.RabbitmqConnectorErrorCode.HANDLE_SHUTDOWN_SIGNAL_FAILED;
@Slf4j
public class QueueingConsumer extends DefaultConsumer {
private final Handover<Delivery> handover;
// When this is non-null the queue is in shutdown mode and nextDelivery should
// throw a shutdown signal exception.
private volatile ShutdownSignalException shutdown;
private volatile ConsumerCancelledException cancelled;
private static final Delivery POISON = new Delivery(null, null, null);
public QueueingConsumer(Channel channel, Handover<Delivery> handover) {
this(channel, Integer.MAX_VALUE, handover);
}
public QueueingConsumer(Channel channel, int capacity, Handover<Delivery> handover) {
super(channel);
this.handover = handover;
}
private void checkShutdown() {
if (shutdown != null) {
throw Utility.fixStackTrace(shutdown);
}
}
@Override
public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
shutdown = sig;
try {
handover.produce(POISON);
} catch (InterruptedException | Handover.ClosedException e) {
throw new RabbitmqConnectorException(HANDLE_SHUTDOWN_SIGNAL_FAILED, e);
}
}
@SneakyThrows
@Override
public void handleCancel(String consumerTag) throws IOException {
cancelled = new ConsumerCancelledException();
handover.produce(POISON);
}
@SneakyThrows
@Override
public void handleDelivery(
String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
checkShutdown();
handover.produce(new Delivery(envelope, properties, body));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import com.rabbitmq.client.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.Handover;
import java.io.IOException;
import java.net.URISyntaxException;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.TimeoutException;
import static org.apache.seatunnel.datasource.plugin.rabbitmq.RabbitmqConnectorErrorCode.*;
@Slf4j
@Data
public class RabbitmqClient {
private final RabbitmqOptionRule config;
private final ConnectionFactory connectionFactory;
private final Connection connection;
private final Channel channel;
public RabbitmqClient(RabbitmqOptionRule config) {
this.config = config;
try {
this.connectionFactory = getConnectionFactory();
this.connection = connectionFactory.newConnection();
this.channel = connection.createChannel();
// set channel prefetch count
if (config.getPrefetchCount() != null) {
channel.basicQos(config.getPrefetchCount(), true);
}
setupQueue();
} catch (Exception e) {
throw new RabbitmqConnectorException(
CREATE_RABBITMQ_CLIENT_FAILED,
String.format(
"Error while create RMQ client with %s at %s",
config.getQueueName(), config.getHost()),
e);
}
}
public Channel getChannel() {
return channel;
}
public DefaultConsumer getQueueingConsumer(Handover<Delivery> handover) {
DefaultConsumer consumer = new QueueingConsumer(channel, handover);
return consumer;
}
public ConnectionFactory getConnectionFactory() {
ConnectionFactory factory = new ConnectionFactory();
if (!StringUtils.isEmpty(config.getUri())) {
try {
factory.setUri(config.getUri());
} catch (URISyntaxException e) {
throw new RabbitmqConnectorException(PARSE_URI_FAILED, e);
} catch (KeyManagementException e) {
// this should never happen
throw new RabbitmqConnectorException(INIT_SSL_CONTEXT_FAILED, e);
} catch (NoSuchAlgorithmException e) {
// this should never happen
throw new RabbitmqConnectorException(SETUP_SSL_FACTORY_FAILED, e);
}
} else {
factory.setHost(config.getHost());
factory.setPort(config.getPort());
factory.setVirtualHost(config.getVirtualHost());
factory.setUsername(config.getUsername());
factory.setPassword(config.getPassword());
}
if (config.getAutomaticRecovery() != null) {
factory.setAutomaticRecoveryEnabled(config.getAutomaticRecovery());
}
if (config.getConnectionTimeout() != null) {
factory.setConnectionTimeout(config.getConnectionTimeout());
}
if (config.getNetworkRecoveryInterval() != null) {
factory.setNetworkRecoveryInterval(config.getNetworkRecoveryInterval());
}
if (config.getRequestedHeartbeat() != null) {
factory.setRequestedHeartbeat(config.getRequestedHeartbeat());
}
if (config.getTopologyRecovery() != null) {
factory.setTopologyRecoveryEnabled(config.getTopologyRecovery());
}
if (config.getRequestedChannelMax() != null) {
factory.setRequestedChannelMax(config.getRequestedChannelMax());
}
if (config.getRequestedFrameMax() != null) {
factory.setRequestedFrameMax(config.getRequestedFrameMax());
}
return factory;
}
public void write(byte[] msg) {
try {
if (StringUtils.isEmpty(config.getRoutingKey())) {
channel.basicPublish("", config.getQueueName(), null, msg);
} else {
// not support set returnListener
channel.basicPublish(
config.getExchange(), config.getRoutingKey(), false, false, null, msg);
}
} catch (IOException e) {
if (config.isLogFailuresOnly()) {
log.error(
"Cannot send RMQ message {} at {}",
config.getQueueName(),
config.getHost(),
e);
} else {
throw new RabbitmqConnectorException(
SEND_MESSAGE_FAILED,
String.format(
"Cannot send RMQ message %s at %s",
config.getQueueName(), config.getHost()),
e);
}
}
}
public void close() {
Exception t = null;
try {
if (channel != null) {
channel.close();
}
} catch (IOException | TimeoutException e) {
t = e;
}
try {
if (connection != null) {
connection.close();
}
} catch (IOException e) {
if (t != null) {
log.warn(
"Both channel and connection closing failed. Logging channel exception and failing with connection exception",
t);
}
t = e;
}
if (t != null) {
throw new RabbitmqConnectorException(
CLOSE_CONNECTION_FAILED,
String.format(
"Error while closing RMQ connection with %s at %s",
config.getQueueName(), config.getHost()),
t);
}
}
protected void setupQueue() throws IOException {
if (config.getQueueName() != null) {
declareQueueDefaults(channel, config.getQueueName());
}
}
private void declareQueueDefaults(Channel channel, String queueName) throws IOException {
channel.queueDeclare(queueName, true, false, false, null);
}
}
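A hedged usage sketch of the client above (not part of the commit; the broker address, credentials and queue name are placeholders, and the sketch assumes it sits in the same package as RabbitmqClient). Most constructor arguments are left null so the amqp-client defaults apply:

import java.nio.charset.StandardCharsets;

public class RabbitmqClientSketch {
    public static void main(String[] args) {
        // only host, port, virtual host, credentials and queue name are set; the rest stay null/default
        RabbitmqOptionRule config =
                new RabbitmqOptionRule(
                        "localhost", 5672, "/", "guest", "guest", null,
                        null, null, null, null, null, null, null, null,
                        0L, "demo_queue", null, false, "", false, false);
        RabbitmqClient client = new RabbitmqClient(config);
        client.write("hello rabbitmq".getBytes(StandardCharsets.UTF_8));
        client.close();
    }
}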
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum RabbitmqConnectorErrorCode implements SeaTunnelErrorCode {
HANDLE_SHUTDOWN_SIGNAL_FAILED("RABBITMQ-01", "handle queue consumer shutdown signal failed"),
CREATE_RABBITMQ_CLIENT_FAILED("RABBITMQ-02", "create rabbitmq client failed"),
CLOSE_CONNECTION_FAILED("RABBITMQ-03", "close connection failed"),
SEND_MESSAGE_FAILED("RABBITMQ-04", "send messages failed"),
MESSAGE_ACK_FAILED(
"RABBITMQ-05", "messages could not be acknowledged during checkpoint creation"),
MESSAGE_ACK_REJECTED("RABBITMQ-06", "messages could not be acknowledged with basicReject"),
PARSE_URI_FAILED("RABBITMQ-07", "parse uri failed"),
INIT_SSL_CONTEXT_FAILED("RABBITMQ-08", "initialize ssl context failed"),
SETUP_SSL_FACTORY_FAILED("RABBITMQ-09", "setup ssl factory failed");
private final String code;
private final String description;
RabbitmqConnectorErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return code;
}
@Override
public String getDescription() {
return description;
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
public class RabbitmqConnectorException extends SeaTunnelRuntimeException {
public RabbitmqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}
public RabbitmqConnectorException(
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}
public RabbitmqConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class RabbitmqDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RabbitmqOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return RabbitmqOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
try {
if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database);
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"get rabbitmq tables failed, " + ex.getMessage(), ex);
}
return Collections.emptyList();
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
if (requestParams.isEmpty()) {
throw new SeaTunnelException("requestParams is empty!");
}
try {
// TODO: open a real connection to the broker; for now only non-empty params are verified
return true;
} catch (Exception ex) {
throw new DataSourcePluginException(
"check rabbitmq connectivity failed, " + ex.getMessage(), ex);
}
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyList();
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap();
}
}
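checkDataSourceConnectivity above is still a stub: it only verifies that requestParams is non-empty. A hedged sketch of what a real probe could look like with the amqp-client dependency already declared in this module's pom (the class name RabbitmqConnectivitySketch is a placeholder, the option keys come from RabbitmqOptionRule, and the sketch assumes the same package):

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Map;

public class RabbitmqConnectivitySketch {
    // returns true only when a connection can actually be opened with the supplied parameters
    public static boolean ping(Map<String, String> requestParams) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(requestParams.get(RabbitmqOptionRule.HOST.key()));
        factory.setPort(Integer.parseInt(requestParams.get(RabbitmqOptionRule.PORT.key())));
        if (requestParams.containsKey(RabbitmqOptionRule.USERNAME.key())) {
            factory.setUsername(requestParams.get(RabbitmqOptionRule.USERNAME.key()));
            factory.setPassword(requestParams.get(RabbitmqOptionRule.PASSWORD.key()));
        }
        try (Connection connection = factory.newConnection()) {
            return connection.isOpen();
        }
    }
}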
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class RabbitmqDataSourceFactory implements DataSourceFactory {
public static final String RABBITMQ_PLUGIN_NAME = "Rabbitmq";
public static final String RABBITMQ_PLUGIN_ICON = "rabbitmq";
public static final String RABBITMQ_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return RABBITMQ_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(RABBITMQ_PLUGIN_NAME)
.icon(RABBITMQ_PLUGIN_ICON)
.version(RABBITMQ_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return new RabbitmqDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import lombok.Data;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Data
public class RabbitmqOptionRule {
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
private String uri;
private Integer networkRecoveryInterval;
private Boolean automaticRecovery;
private Boolean topologyRecovery;
private Integer connectionTimeout;
private Integer requestedChannelMax;
private Integer requestedFrameMax;
private Integer requestedHeartbeat;
private Integer prefetchCount;
private long deliveryTimeout;
private String queueName;
private String routingKey;
private boolean logFailuresOnly = false;
private String exchange = "";
private boolean forE2ETesting = false;
private boolean usesCorrelationId = false;
public RabbitmqOptionRule(
String host, Integer port, String virtualHost, String username, String password,
String uri, Integer networkRecoveryInterval, Boolean automaticRecovery,
Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax,
Integer requestedFrameMax, Integer requestedHeartbeat, Integer prefetchCount,
long deliveryTimeout, String queueName, String routingKey, boolean logFailuresOnly,
String exchange, boolean forE2ETesting, boolean usesCorrelationId) {
this.host = host;
this.port = port;
this.virtualHost = virtualHost;
this.username = username;
this.password = password;
this.uri = uri;
this.networkRecoveryInterval = networkRecoveryInterval;
this.automaticRecovery = automaticRecovery;
this.topologyRecovery = topologyRecovery;
this.connectionTimeout = connectionTimeout;
this.requestedChannelMax = requestedChannelMax;
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
this.prefetchCount = prefetchCount;
this.deliveryTimeout = deliveryTimeout;
this.queueName = queueName;
this.routingKey = routingKey;
this.logFailuresOnly = logFailuresOnly;
this.exchange = exchange;
this.forE2ETesting = forE2ETesting;
this.usesCorrelationId = usesCorrelationId;
}
private final Map<String, Object> sinkOptionProps = new HashMap<>();
public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("the default host to use for connections");
public static final Option<Integer> PORT =
Options.key("port")
.intType()
.noDefaultValue()
.withDescription("the default port to use for connections");
public static final Option<String> VIRTUAL_HOST =
Options.key("virtual_host")
.stringType()
.noDefaultValue()
.withDescription("the virtual host to use when connecting to the broker");
public static final Option<String> USERNAME =
Options.key("username")
.stringType()
.noDefaultValue()
.withDescription("the AMQP user name to use when connecting to the broker");
public static final Option<String> PASSWORD =
Options.key("password")
.stringType()
.noDefaultValue()
.withDescription("the password to use when connecting to the broker");
public static final Option<String> QUEUE_NAME =
Options.key("queue_name")
.stringType()
.noDefaultValue()
.withDescription("the queue to write the message to");
public static final Option<String> URL =
Options.key("url")
.stringType()
.noDefaultValue()
.withDescription(
"convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host");
public static final Option<Integer> NETWORK_RECOVERY_INTERVAL =
Options.key("network_recovery_interval")
.intType()
.noDefaultValue()
.withDescription(
"how long will automatic recovery wait before attempting to reconnect, in ms");
public static final Option<Boolean> AUTOMATIC_RECOVERY_ENABLED =
Options.key("AUTOMATIC_RECOVERY_ENABLED")
.booleanType()
.noDefaultValue()
.withDescription("if true, enables connection recovery");
public static final Option<Boolean> TOPOLOGY_RECOVERY_ENABLED =
Options.key("topology_recovery_enabled")
.booleanType()
.noDefaultValue()
.withDescription("if true, enables topology recovery");
public static final Option<Integer> CONNECTION_TIMEOUT =
Options.key("connection_timeout")
.intType()
.noDefaultValue()
.withDescription("connection TCP establishment timeout in milliseconds");
public static final Option<Integer> REQUESTED_CHANNEL_MAX =
Options.key("requested_channel_max")
.intType()
.noDefaultValue()
.withDescription("initially requested maximum channel number");
public static final Option<Integer> REQUESTED_FRAME_MAX =
Options.key("requested_frame_max")
.intType()
.noDefaultValue()
.withDescription("the requested maximum frame size");
public static final Option<Integer> REQUESTED_HEARTBEAT =
Options.key("requested_heartbeat")
.intType()
.noDefaultValue()
.withDescription("the requested heartbeat timeout");
public static final Option<Long> PREFETCH_COUNT =
Options.key("prefetch_count")
.longType()
.noDefaultValue()
.withDescription(
"prefetchCount the max number of messages to receive without acknowledgement\n");
public static final Option<Integer> DELIVERY_TIMEOUT =
Options.key("delivery_timeout")
.intType()
.noDefaultValue()
.withDescription("deliveryTimeout maximum wait time");
public static final Option<String> ROUTING_KEY =
Options.key("routing_key")
.stringType()
.noDefaultValue()
.withDescription("the routing key to publish the message to");
public static final Option<String> EXCHANGE =
Options.key("exchange")
.stringType()
.noDefaultValue()
.withDescription("the exchange to publish the message to");
public static final Option<Boolean> FOR_E2E_TESTING =
Options.key("for_e2e_testing")
.booleanType()
.noDefaultValue()
.withDescription("use to recognize E2E mode");
public static final Option<Map<String, String>> RABBITMQ_CONFIG =
Options.key("rabbitmq.config")
.mapType()
.defaultValue(Collections.emptyMap())
.withDescription(
"In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, "
+ "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
public static final Option<Boolean> USE_CORRELATION_ID =
Options.key("use_correlation_id")
.booleanType()
.noDefaultValue()
.withDescription(
"Whether the messages received are supplied with a unique"
+ "id to deduplicate messages (in case of failed acknowledgments).");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD).build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(VIRTUAL_HOST).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class RabbitmqRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(RabbitmqOptionRule.HOST.key()),
String.format("Missing %s in requestParams", RabbitmqOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(RabbitmqOptionRule.HOST.key(), requestParams.get(RabbitmqOptionRule.HOST.key()));
if (requestParams.containsKey(RabbitmqOptionRule.PORT.key())) {
// the port is a plain value, copy it through instead of parsing it as a HOCON config
properties.put(
RabbitmqOptionRule.PORT.key(),
requestParams.get(RabbitmqOptionRule.PORT.key()));
}
return properties;
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-datasource-plugins</artifactId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<artifactId>datasource-redis</artifactId>
<properties>
<jedis.version>4.2.2</jedis.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>datasource-plugins-api</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.auto.service/auto-service -->
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
</dependencies>
</project>
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.redis;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import redis.clients.jedis.Jedis;
import java.util.*;
import static com.google.common.base.Preconditions.checkArgument;
@Slf4j
public class RedisDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RedisOptionRule.optionRule();
}
@Override
public OptionRule getDatasourceMetadataFieldsByDataSourceName(@NonNull String pluginName) {
return RedisOptionRule.metadataRule();
}
@Override
public List<String> getTables(
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
try {
if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database);
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"get redis tables failed, " + ex.getMessage(), ex);
}
return Collections.emptyList();
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
return DEFAULT_DATABASES;
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
try (Jedis jedis = getJedis(requestParams)) {
// PING verifies both the connection and the credentials
jedis.ping();
return true;
} catch (Exception e) {
throw new DataSourcePluginException("check redis connectivity failed", e);
}
}
private Jedis getJedis(Map<String, String> requestParams) {
Jedis jedis =
new Jedis(
requestParams.get(RedisOptionRule.HOST.key()),
Integer.parseInt(requestParams.get(RedisOptionRule.PORT.key())));
String auth = requestParams.get(RedisOptionRule.AUTH.key());
if (StringUtils.isNotBlank(auth)) {
jedis.auth(auth);
}
return jedis;
}
@Override
public List<TableField> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyList();
}
@Override
public Map<String, List<TableField>> getTableFields(
@NonNull String pluginName,
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap();
}
}
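A minimal driver sketch (not part of the commit; RedisConnectivitySketch and the host, port and auth values are placeholders, same package assumed) showing how the connectivity check above is exercised:

import java.util.HashMap;
import java.util.Map;

public class RedisConnectivitySketch {
    public static void main(String[] args) {
        Map<String, String> requestParams = new HashMap<>();
        requestParams.put("host", "127.0.0.1"); // placeholder
        requestParams.put("port", "6379");      // placeholder
        requestParams.put("auth", "secret");    // placeholder; omit for an unprotected instance

        RedisDataSourceChannel channel = new RedisDataSourceChannel();
        // the check opens a Jedis connection and sends PING
        boolean reachable = channel.checkDataSourceConnectivity("Redis", requestParams);
        System.out.println("redis datasource reachable: " + reachable);
    }
}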
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.redis;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set;
@AutoService(DataSourceFactory.class)
public class RedisDataSourceFactory implements DataSourceFactory {
public static final String REDIS_PLUGIN_NAME = "Redis";
public static final String REDIS_PLUGIN_ICON = "redis";
public static final String REDIS_PLUGIN_VERSION = "1.0.0";
@Override
public String factoryIdentifier() {
return REDIS_PLUGIN_NAME;
}
@Override
public Set<DataSourcePluginInfo> supportedDataSources() {
return Sets.newHashSet(
DataSourcePluginInfo.builder()
.name(REDIS_PLUGIN_NAME)
.icon(REDIS_PLUGIN_ICON)
.version(REDIS_PLUGIN_VERSION)
.supportVirtualTables(false)
.type(DatasourcePluginTypeEnum.NO_STRUCTURED.getCode())
.build());
}
@Override
public DataSourceChannel createChannel() {
return new RedisDataSourceChannel();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.redis;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class RedisOptionRule {
public enum RedisMode {
SINGLE,
CLUSTER;
}
public enum HashKeyParseMode {
ALL,
KV;
}
public static final Option<String> HOST =
Options.key("host")
.stringType()
.noDefaultValue()
.withDescription("redis hostname or ip");
public static final Option<String> PORT =
Options.key("port").stringType().noDefaultValue().withDescription("redis port");
public static final Option<String> AUTH =
Options.key("auth")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication password, you need it when you connect to an encrypted cluster");
public static final Option<String> USER =
Options.key("user")
.stringType()
.noDefaultValue()
.withDescription(
"redis authentication user, you need it when you connect to an encrypted cluster");
public static final Option<String> KEY_PATTERN =
Options.key("keys")
.stringType()
.noDefaultValue()
.withDescription(
"keys pattern, redis source connector support fuzzy key matching, user needs to ensure that the matched keys are the same type");
public static final Option<String> KEY =
Options.key("key")
.stringType()
.noDefaultValue()
.withDescription("The value of key you want to write to redis.");
public static final Option<String> DATA_TYPE =
Options.key("data_type")
.stringType()
.noDefaultValue()
.withDescription("redis data types, support key hash list set zset.");
public static final Option<RedisOptionRule.Format> FORMAT =
Options.key("format")
.enumType(RedisOptionRule.Format.class)
.defaultValue(RedisOptionRule.Format.JSON)
.withDescription(
"the format of upstream data, now only support json and text, default json.");
public static final Option<RedisOptionRule.RedisMode> MODE =
Options.key("mode")
.enumType(RedisOptionRule.RedisMode.class)
.defaultValue(RedisMode.SINGLE)
.withDescription(
"redis mode, support single or cluster, default value is single");
public static final Option<String> NODES =
Options.key("nodes")
.stringType()
.noDefaultValue()
.withDescription(
"redis nodes information, used in cluster mode, must like as the following format: [host1:port1, host2:port2]");
public static final Option<RedisOptionRule.HashKeyParseMode> HASH_KEY_PARSE_MODE =
Options.key("hash_key_parse_mode")
.enumType(RedisOptionRule.HashKeyParseMode.class)
.defaultValue(HashKeyParseMode.ALL)
.withDescription(
"hash key parse mode, support all or kv, default value is all");
public static OptionRule optionRule() {
return OptionRule.builder()
.required(HOST, PORT, KEY)
.optional(USER, AUTH, KEY_PATTERN, FORMAT)
.build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(DATA_TYPE).build();
}
public enum Format {
JSON,
// TEXT will be supported later
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.redis;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class RedisRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(RedisOptionRule.HOST.key()),
String.format("Missing %s in requestParams", RedisOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(RedisOptionRule.HOST.key(), requestParams.get(RedisOptionRule.HOST.key()));
if (requestParams.containsKey(RedisOptionRule.PORT.key())) {
// the port is a plain value, copy it through instead of parsing it as a HOCON config
properties.put(
RedisOptionRule.PORT.key(),
requestParams.get(RedisOptionRule.PORT.key()));
}
return properties;
}
}
@@ -47,6 +47,9 @@
<module>datasource-jdbc-tidb</module>
<module>datasource-mqtt</module>
<module>datasource-opcua</module>
<module>datasource-redis</module>
<module>datasource-rabbitmq</module>
<module>datasource-ftp</module>
</modules>