提交 23799810 authored 作者: 宋勇's avatar 宋勇

修改 ftpclient 测试

上级 2a3cf766
...@@ -18,12 +18,9 @@ ...@@ -18,12 +18,9 @@
package org.apache.seatunnel.datasource.plugin.ftp; package org.apache.seatunnel.datasource.plugin.ftp;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import java.util.Map; import java.util.Map;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
@Slf4j @Slf4j
public class FtpAConfiguration { public class FtpAConfiguration {
......
...@@ -3,30 +3,24 @@ package org.apache.seatunnel.datasource.plugin.ftp; ...@@ -3,30 +3,24 @@ package org.apache.seatunnel.datasource.plugin.ftp;
import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply; import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ftp.FTPInputStream;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import io.netty.util.internal.StringUtil;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException; import java.net.ConnectException;
public class FtpClientService { public class FtpClientService {
protected FileSystem.Statistics statistics;
public static final String FS_FTP_HOST = "192.168.1.174";
public static final Integer FS_FTP_HOST_PORT = 21;
public static final String FS_FTP_USER_PREFIX = "ftp.";
public static final String FS_FTP_PASSWORD_PREFIX = "ftp.";
public static final String E_SAME_DIRECTORY_ONLY = "/ftp";
public static FTPClient connect() throws IOException { public static FTPClient connect(FtpConfiguration conf) throws IOException {
FTPClient client = null; FTPClient client = null;
String host = FS_FTP_HOST; String host = conf.getHost();
int port = FS_FTP_HOST_PORT; int port =
String user = FS_FTP_USER_PREFIX; !StringUtil.isNullOrEmpty(conf.getPort()) ? Integer.parseInt(conf.getPort()) : 21;
String password = FS_FTP_PASSWORD_PREFIX; String user = conf.getUser();
String password = conf.getPassword();
client = new FTPClient(); client = new FTPClient();
client.connect(host, port); client.connect(host, port);
int reply = client.getReplyCode(); int reply = client.getReplyCode();
...@@ -53,26 +47,4 @@ public class FtpClientService { ...@@ -53,26 +47,4 @@ public class FtpClientService {
return client; return client;
} }
public FSDataInputStream open(String path) throws IOException {
FTPClient client = connect();
// Change to parent directory on the
// server. Only then can we read the
// file
// on the server by opening up an InputStream. As a side effect the working
// directory on the server is changed to the parent directory of the file.
// The FTP client connection is closed when close() is called on the
// FSDataInputStream.
client.changeWorkingDirectory(path);
InputStream is = client.retrieveFileStream(path);
FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is, client, statistics));
if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
// The ftpClient is an inconsistent state. Must close the stream
// which in turn will logout and disconnect from FTP server
fis.close();
throw new IOException("Unable to open file: " + path + ", Aborting");
}
return fis;
}
} }
...@@ -8,10 +8,10 @@ public class FtpConfiguration { ...@@ -8,10 +8,10 @@ public class FtpConfiguration {
private String path; private String path;
private String type; private String type;
public FtpConfiguration() { public FtpConfiguration() {}
}
public FtpConfiguration(String host, String port, String user, String password, String path, String type) { public FtpConfiguration(
String host, String port, String user, String password, String path, String type) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.user = user; this.user = user;
...@@ -70,13 +70,25 @@ public class FtpConfiguration { ...@@ -70,13 +70,25 @@ public class FtpConfiguration {
@Override @Override
public String toString() { public String toString() {
return "FtpConfiguration{" + return "FtpConfiguration{"
"host='" + host + '\'' + + "host='"
", port='" + port + '\'' + + host
", user='" + user + '\'' + + '\''
", password='" + password + '\'' + + ", port='"
", path='" + path + '\'' + + port
", type='" + type + '\'' + + '\''
'}'; + ", user='"
+ user
+ '\''
+ ", password='"
+ password
+ '\''
+ ", path='"
+ path
+ '\''
+ ", type='"
+ type
+ '\''
+ '}';
} }
} }
...@@ -17,17 +17,15 @@ ...@@ -17,17 +17,15 @@
package org.apache.seatunnel.datasource.plugin.ftp; package org.apache.seatunnel.datasource.plugin.ftp;
import lombok.NonNull;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import java.io.IOException; import org.apache.commons.net.ftp.FTPClient;
import lombok.NonNull;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
...@@ -48,25 +46,27 @@ public class FtpDatasourceChannel implements DataSourceChannel { ...@@ -48,25 +46,27 @@ public class FtpDatasourceChannel implements DataSourceChannel {
@NonNull String pluginName, @NonNull Map<String, String> requestParams) { @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
FtpConfiguration conf = FtpAConfiguration.getConfiguration(requestParams); FtpConfiguration conf = FtpAConfiguration.getConfiguration(requestParams);
try { try {
FTPClient ftpClient = FtpClientService.connect(); FTPClient ftpClient = FtpClientService.connect(conf);
if (Objects.isNull(conf)) { if (Objects.isNull(conf)) {
throw new DataSourcePluginException( throw new DataSourcePluginException(
String.format("check ftp connectivity failed, config is: %s", requestParams)); String.format(
"check ftp connectivity failed, config is: %s", requestParams));
} }
if (ftpClient.changeWorkingDirectory(requestParams.get(FtpOptionRule.PATH.key()))) { if (ftpClient.changeWorkingDirectory(requestParams.get(FtpOptionRule.PATH.key()))) {
return true; return true;
} else { } else {
throw new DataSourcePluginException( throw new DataSourcePluginException(
String.format("check ftp connectivity failed, config is: %s", requestParams)); String.format(
"check ftp connectivity failed, config is: %s", requestParams));
} }
} catch (Exception e) { } catch (Exception e) {
throw new DataSourcePluginException( throw new DataSourcePluginException(
String.format("check ftp connectivity failed, config is: %s", requestParams)); String.format("check ftp connectivity failed, config is: %s", requestParams));
} }
// return true; // return true;
} }
@Override @Override
......
...@@ -46,10 +46,7 @@ public class FtpOptionRule { ...@@ -46,10 +46,7 @@ public class FtpOptionRule {
.noDefaultValue() .noDefaultValue()
.withDescription("the password to use when connecting to the broker"); .withDescription("the password to use when connecting to the broker");
public static final Option<String> PATH = public static final Option<String> PATH =
Options.key("path") Options.key("path").stringType().noDefaultValue().withDescription("path");
.stringType()
.noDefaultValue()
.withDescription("path");
public static final Option<FileFormat> FILE_FORMAT_TYPE = public static final Option<FileFormat> FILE_FORMAT_TYPE =
Options.key("file_format_type") Options.key("file_format_type")
.enumType(FileFormat.class) .enumType(FileFormat.class)
...@@ -57,14 +54,10 @@ public class FtpOptionRule { ...@@ -57,14 +54,10 @@ public class FtpOptionRule {
.withDescription("file_format_type"); .withDescription("file_format_type");
public static final Option<String> DELIMITER = public static final Option<String> DELIMITER =
Options.key("delimiter") Options.key("delimiter").stringType().noDefaultValue().withDescription("delimiter");
.stringType()
.noDefaultValue()
.withDescription("delimiter");
public static OptionRule optionRule() { public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD,PATH).build(); return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD, PATH).build();
} }
public static OptionRule metadataRule() { public static OptionRule metadataRule() {
......
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq; package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.common.Handover;
import com.rabbitmq.client.*; import com.rabbitmq.client.*;
import com.rabbitmq.utility.Utility; import com.rabbitmq.utility.Utility;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.seatunnel.common.Handover;
import java.io.IOException; import java.io.IOException;
......
...@@ -17,11 +17,13 @@ ...@@ -17,11 +17,13 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq; package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.common.Handover;
import org.apache.commons.lang3.StringUtils;
import com.rabbitmq.client.*; import com.rabbitmq.client.*;
import lombok.Data; import lombok.Data;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.common.Handover;
import java.io.IOException; import java.io.IOException;
import java.net.URISyntaxException; import java.net.URISyntaxException;
...@@ -39,8 +41,6 @@ public class RabbitmqClient { ...@@ -39,8 +41,6 @@ public class RabbitmqClient {
private final Connection connection; private final Connection connection;
private final Channel channel; private final Channel channel;
public RabbitmqClient(RabbitmqOptionRule config) { public RabbitmqClient(RabbitmqOptionRule config) {
this.config = config; this.config = config;
try { try {
......
...@@ -17,15 +17,17 @@ ...@@ -17,15 +17,17 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq; package org.apache.seatunnel.datasource.plugin.rabbitmq;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
...@@ -38,7 +40,6 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel { ...@@ -38,7 +40,6 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default"; private static final String DATABASE = "default";
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RabbitmqOptionRule.optionRule(); return RabbitmqOptionRule.optionRule();
...@@ -55,7 +56,8 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel { ...@@ -55,7 +56,8 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
Map<String, String> option) { Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); // checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try { try {
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database); return Arrays.asList(database);
...@@ -82,7 +84,7 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel { ...@@ -82,7 +84,7 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
try { try {
// just test the connection // just test the connection
return true; return true;
// return StringUtils.isNotEmpty(RabbitmqClient.getTopi); // return StringUtils.isNotEmpty(RabbitmqClient.getTopi);
} catch (Exception ex) { } catch (Exception ex) {
throw new DataSourcePluginException( throw new DataSourcePluginException(
"check mqtt connectivity failed, " + ex.getMessage(), ex); "check mqtt connectivity failed, " + ex.getMessage(), ex);
...@@ -108,5 +110,4 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel { ...@@ -108,5 +110,4 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap(); return Collections.emptyMap();
} }
} }
...@@ -17,11 +17,12 @@ ...@@ -17,11 +17,12 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq; package org.apache.seatunnel.datasource.plugin.rabbitmq;
import lombok.Data;
import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import lombok.Data;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
...@@ -52,7 +53,28 @@ public class RabbitmqOptionRule { ...@@ -52,7 +53,28 @@ public class RabbitmqOptionRule {
private boolean forE2ETesting = false; private boolean forE2ETesting = false;
private boolean usesCorrelationId = false; private boolean usesCorrelationId = false;
public RabbitmqOptionRule(String host, Integer port, String virtualHost, String username, String password, String uri, Integer networkRecoveryInterval, Boolean automaticRecovery, Boolean topologyRecovery, Integer connectionTimeout, Integer requestedChannelMax, Integer requestedFrameMax, Integer requestedHeartbeat, Integer prefetchCount, long deliveryTimeout, String queueName, String routingKey, boolean logFailuresOnly, String exchange, boolean forE2ETesting, boolean usesCorrelationId) { public RabbitmqOptionRule(
String host,
Integer port,
String virtualHost,
String username,
String password,
String uri,
Integer networkRecoveryInterval,
Boolean automaticRecovery,
Boolean topologyRecovery,
Integer connectionTimeout,
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
Integer prefetchCount,
long deliveryTimeout,
String queueName,
String routingKey,
boolean logFailuresOnly,
String exchange,
boolean forE2ETesting,
boolean usesCorrelationId) {
this.host = host; this.host = host;
this.port = port; this.port = port;
this.virtualHost = virtualHost; this.virtualHost = virtualHost;
......
...@@ -32,7 +32,8 @@ public class RabbitmqRequestParamsUtils { ...@@ -32,7 +32,8 @@ public class RabbitmqRequestParamsUtils {
requestParams.containsKey(RabbitmqOptionRule.HOST.key()), requestParams.containsKey(RabbitmqOptionRule.HOST.key()),
String.format("Missing %s in requestParams", RabbitmqOptionRule.HOST.key())); String.format("Missing %s in requestParams", RabbitmqOptionRule.HOST.key()));
final Properties properties = new Properties(); final Properties properties = new Properties();
properties.put(RabbitmqOptionRule.HOST.key(), requestParams.get(RabbitmqOptionRule.HOST.key())); properties.put(
RabbitmqOptionRule.HOST.key(), requestParams.get(RabbitmqOptionRule.HOST.key()));
if (requestParams.containsKey(RabbitmqOptionRule.PORT.key())) { if (requestParams.containsKey(RabbitmqOptionRule.PORT.key())) {
Config configObject = Config configObject =
ConfigFactory.parseString(requestParams.get(RabbitmqOptionRule.PORT.key())); ConfigFactory.parseString(requestParams.get(RabbitmqOptionRule.PORT.key()));
......
...@@ -17,19 +17,17 @@ ...@@ -17,19 +17,17 @@
package org.apache.seatunnel.datasource.plugin.redis; package org.apache.seatunnel.datasource.plugin.redis;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import redis.clients.jedis.Jedis;
import org.apache.commons.lang3.StringUtils;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException;
import java.util.*; import java.util.*;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
...@@ -39,7 +37,6 @@ public class RedisDataSourceChannel implements DataSourceChannel { ...@@ -39,7 +37,6 @@ public class RedisDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default"; private static final String DATABASE = "default";
@Override @Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) { public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RedisOptionRule.optionRule(); return RedisOptionRule.optionRule();
...@@ -56,13 +53,15 @@ public class RedisDataSourceChannel implements DataSourceChannel { ...@@ -56,13 +53,15 @@ public class RedisDataSourceChannel implements DataSourceChannel {
Map<String, String> requestParams, Map<String, String> requestParams,
String database, String database,
Map<String, String> option) { Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); // checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try { try {
if (StringUtils.isNotBlank(database)) { if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database); return Arrays.asList(database);
} }
} catch (Exception ex) { } catch (Exception ex) {
throw new DataSourcePluginException("check redis connectivity failed, " + ex.getMessage(), ex); throw new DataSourcePluginException(
"check redis connectivity failed, " + ex.getMessage(), ex);
} }
return null; return null;
} }
...@@ -83,8 +82,7 @@ public class RedisDataSourceChannel implements DataSourceChannel { ...@@ -83,8 +82,7 @@ public class RedisDataSourceChannel implements DataSourceChannel {
} }
} }
private Connection getConnection( Map<String, String> requestParams) private Connection getConnection(Map<String, String> requestParams) throws Exception {
throws Exception {
return getConnection(requestParams); return getConnection(requestParams);
} }
...@@ -107,5 +105,4 @@ public class RedisDataSourceChannel implements DataSourceChannel { ...@@ -107,5 +105,4 @@ public class RedisDataSourceChannel implements DataSourceChannel {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default"); checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap(); return Collections.emptyMap();
} }
} }
...@@ -104,7 +104,10 @@ public class RedisOptionRule { ...@@ -104,7 +104,10 @@ public class RedisOptionRule {
"hash key parse mode, support all or kv, default value is all"); "hash key parse mode, support all or kv, default value is all");
public static OptionRule optionRule() { public static OptionRule optionRule() {
return OptionRule.builder().required(HOST,PORT,KEY).optional(USER,AUTH,KEY_PATTERN,FORMAT).build(); return OptionRule.builder()
.required(HOST, PORT, KEY)
.optional(USER, AUTH, KEY_PATTERN, FORMAT)
.build();
} }
public static OptionRule metadataRule() { public static OptionRule metadataRule() {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论