Commit b981c513 authored by 王红亮

Update FTP configuration

Parent e30a5524
@@ -25,16 +25,9 @@ import java.util.Map;
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;

 @Slf4j
-public class HadoopFtpAConfiguration {
-    /* S3 constants */
-    private static final String HDFS_FTP_IMPL = "org.apache.hadoop.fs.ftp.FTPFileSystem";
-    private static final String FTP_PROTOCOL = "192.168.1.174";
-    private static final String DEFAULT_PROTOCOL = "ftp";
-    private static final String FTP_FORMAT_KEY = "fs.%s.%s";
-    private static final String HDFS_IMPL_KEY = "impl";
-    public static Configuration getConfiguration(Map<String, String> FtpOption) {
+public class FtpAConfiguration {
+    public static FtpConfiguration getConfiguration(Map<String, String> FtpOption) {
         if (!FtpOption.containsKey(FtpOptionRule.HOST.key())) {
             throw new IllegalArgumentException(
@@ -60,20 +53,13 @@ public class HadoopFtpAConfiguration {
             throw new IllegalArgumentException(
                     "FtpRedshift datasource file_format_type is null, please check your config");
         }
-        String host = FtpOption.get(FtpOptionRule.HOST.key());
-        String protocol = DEFAULT_PROTOCOL;
-        if (host.startsWith(FTP_PROTOCOL)) {
-            protocol = FTP_PROTOCOL;
-        }
-        String fsImpl = protocol.equals(FTP_PROTOCOL) ? HDFS_FTP_IMPL : HDFS_FTP_IMPL;
-        Configuration hadoopConf = new Configuration();
-        hadoopConf.set(FS_DEFAULT_NAME_KEY, host);
-        hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
-        return hadoopConf;
-    }
-    private static String formatKey(String protocol, String key) {
-        return String.format(FTP_FORMAT_KEY, protocol, key);
-    }
+        FtpConfiguration ftpConfiguration = new FtpConfiguration();
+        // Read the configured values from the option map (not the option key names).
+        ftpConfiguration.setHost(FtpOption.get(FtpOptionRule.HOST.key()));
+        ftpConfiguration.setPort(FtpOption.get(FtpOptionRule.PORT.key()));
+        ftpConfiguration.setUser(FtpOption.get(FtpOptionRule.USERNAME.key()));
+        ftpConfiguration.setPassword(FtpOption.get(FtpOptionRule.PASSWORD.key()));
+        ftpConfiguration.setPath(FtpOption.get(FtpOptionRule.PATH.key()));
+        return ftpConfiguration;
     }
 }
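
For orientation, a minimal sketch of driving the reworked entry point. The option values are placeholders, and the key strings ("host", "port", "username", "password", "path", "file_format_type") are assumptions inferred from FtpOptionRule further down; the example assumes it lives in the same package as FtpAConfiguration.

import java.util.HashMap;
import java.util.Map;

public class FtpAConfigurationExample {
    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("host", "192.168.1.174");   // placeholder server
        options.put("port", "21");
        options.put("username", "ftpuser");     // placeholder credentials
        options.put("password", "secret");
        options.put("path", "/ftp");
        options.put("file_format_type", "csv"); // required by the null check above
        FtpConfiguration conf = FtpAConfiguration.getConfiguration(options);
        System.out.println(conf);               // relies on FtpConfiguration.toString()
    }
}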
package org.apache.seatunnel.datasource.plugin.ftp;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ftp.FTPInputStream;
import org.apache.hadoop.net.NetUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.ConnectException;

public class FtpClientService {

    protected FileSystem.Statistics statistics;

    public static final String FS_FTP_HOST = "192.168.1.174";
    public static final Integer FS_FTP_HOST_PORT = 21;
    public static final String FS_FTP_USER_PREFIX = "ftp.";
    public static final String FS_FTP_PASSWORD_PREFIX = "ftp.";
    public static final String E_SAME_DIRECTORY_ONLY = "/ftp";

    private FTPClient connect() throws IOException {
        FTPClient client = new FTPClient();
        String host = FS_FTP_HOST;
        int port = FS_FTP_HOST_PORT;
        String user = FS_FTP_USER_PREFIX;
        String password = FS_FTP_PASSWORD_PREFIX;
        client.connect(host, port);
        int reply = client.getReplyCode();
        if (!FTPReply.isPositiveCompletion(reply)) {
            throw NetUtils.wrapException(
                    host,
                    port,
                    NetUtils.UNKNOWN_HOST,
                    0,
                    new ConnectException("Server response " + reply));
        } else if (client.login(user, password)) {
            client.setFileTransferMode(FTP.BLOCK_TRANSFER_MODE);
            client.setFileType(FTP.BINARY_FILE_TYPE);
        } else {
            throw new IOException(
                    "Login failed on server - " + host + ", port - " + port
                            + " as user '" + user + "'");
        }
        return client;
    }

    public FSDataInputStream open(String path) throws IOException {
        FTPClient client = connect();
        // Change to the parent directory on the server; only then can we read
        // the file by opening an InputStream. As a side effect the working
        // directory on the server becomes the parent directory of the file.
        // The FTP client connection is closed when close() is called on the
        // FSDataInputStream.
        Path file = new Path(path);
        client.changeWorkingDirectory(file.getParent().toUri().getPath());
        InputStream is = client.retrieveFileStream(file.getName());
        FSDataInputStream fis = new FSDataInputStream(new FTPInputStream(is, client, statistics));
        if (!FTPReply.isPositivePreliminary(client.getReplyCode())) {
            // The ftpClient is in an inconsistent state. Must close the stream,
            // which in turn will log out and disconnect from the FTP server.
            fis.close();
            throw new IOException("Unable to open file: " + path + ", Aborting");
        }
        return fis;
    }
}
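
A minimal usage sketch for the new service; the remote path is a placeholder, and the connection details come from the hard-coded constants above:

import org.apache.hadoop.fs.FSDataInputStream;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

public class FtpClientServiceExample {
    public static void main(String[] args) throws IOException {
        FtpClientService service = new FtpClientService();
        // open() cds to the file's parent directory and streams the file;
        // closing the stream logs out and disconnects the FTP client.
        try (FSDataInputStream in = service.open("/ftp/demo.csv");
                BufferedReader reader =
                        new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);
            }
        }
    }
}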
package org.apache.seatunnel.datasource.plugin.ftp;

public class FtpConfiguration {

    private String host;
    private String port;
    private String user;
    private String password;
    private String path;

    public FtpConfiguration() {}

    public FtpConfiguration(String host, String port, String user, String password, String path) {
        this.host = host;
        this.port = port;
        this.user = user;
        this.password = password;
        this.path = path;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = port;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getPath() {
        return path;
    }

    public void setPath(String path) {
        this.path = path;
    }

    @Override
    public String toString() {
        return "FtpConfiguration{"
                + "host='" + host + '\''
                + ", port='" + port + '\''
                + ", user='" + user + '\''
                + ", password='" + password + '\''
                + ", path='" + path + '\''
                + '}';
    }
}
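
One hardening note, not part of this commit: toString() above prints the raw password, which tends to end up in logs. A hedged alternative would mask the credential:

    @Override
    public String toString() {
        // Mask the credential so logging the configuration is safe.
        String masked = (password == null) ? null : "******";
        return "FtpConfiguration{host='" + host + "', port='" + port + "', user='" + user
                + "', password='" + masked + "', path='" + path + "'}";
    }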
@@ -39,7 +39,7 @@ public class FtpDataSourceFactory implements DataSourceFactory {
     @Override
     public Set<DataSourcePluginInfo> supportedDataSources() {
-        DataSourcePluginInfo s3DatasourcePluginInfo =
+        DataSourcePluginInfo ftpDatasourcePluginInfo =
                 DataSourcePluginInfo.builder()
                         .name(PLUGIN_NAME)
                         .type(DatasourcePluginTypeEnum.FILE.getCode())
@@ -48,7 +48,7 @@ public class FtpDataSourceFactory implements DataSourceFactory {
                         .icon("FtpFile")
                         .build();
-        return Sets.newHashSet(s3DatasourcePluginInfo);
+        return Sets.newHashSet(ftpDatasourcePluginInfo);
     }

     @Override
...
@@ -25,11 +25,11 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
 import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
 import org.apache.seatunnel.datasource.plugin.api.model.TableField;
-import org.apache.seatunnel.datasource.plugin.redshift.s3.HadoopS3AConfiguration;

 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;

 public class FtpDatasourceChannel implements DataSourceChannel {
     @Override
@@ -42,6 +42,17 @@ public class FtpDatasourceChannel implements DataSourceChannel {
         return FtpOptionRule.metadataRule();
     }

+    @Override
+    public boolean checkDataSourceConnectivity(
+            @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
+        FtpConfiguration conf = FtpAConfiguration.getConfiguration(requestParams);
+        if (Objects.isNull(conf)) {
+            throw new DataSourcePluginException(
+                    String.format("check ftp connectivity failed, config is: %s", requestParams));
+        }
+        return true;
+    }
+
     @Override
     public List<String> getTables(
             @NonNull String pluginName,
@@ -57,19 +68,6 @@ public class FtpDatasourceChannel implements DataSourceChannel {
         throw new UnsupportedOperationException("getDatabases is not supported for Ftp datasource");
     }

-    @Override
-    public boolean checkDataSourceConnectivity(
-            @NonNull String pluginName, @NonNull Map<String, String> requestParams) {
-        Configuration conf = HadoopFtpAConfiguration.getConfiguration(requestParams);
-        try (FileSystem fs = FileSystem.get(conf)) {
-            fs.listStatus(new Path("/"));
-            return true;
-        } catch (IOException e) {
-            throw new DataSourcePluginException(
-                    String.format("check ftp connectivity failed, config is: %s", requestParams), e);
-        }
-    }
     @Override
     public List<TableField> getTableFields(
             @NonNull String pluginName,
...
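
Worth noting: the replacement checkDataSourceConnectivity only verifies that the parsed configuration object is non-null; unlike the removed Hadoop-based version, it never touches the server. A real probe could reuse the commons-net client that FtpClientService already imports; the sketch below is an assumption, not part of the commit:

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;

public class FtpConnectivityProbe {
    /** Returns true if we can connect and log in with the given configuration. */
    public static boolean canConnect(FtpConfiguration conf) {
        FTPClient client = new FTPClient();
        try {
            client.connect(conf.getHost(), Integer.parseInt(conf.getPort()));
            if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
                return false;
            }
            return client.login(conf.getUser(), conf.getPassword());
        } catch (Exception e) {
            return false;
        } finally {
            try {
                if (client.isConnected()) {
                    client.disconnect(); // best-effort cleanup
                }
            } catch (java.io.IOException ignored) {
            }
        }
    }
}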
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.configuration.Options;
 import org.apache.seatunnel.api.configuration.util.OptionRule;

 public class FtpOptionRule {

     public static final Option<String> HOST =
             Options.key("host")
                     .stringType()
@@ -49,12 +50,19 @@ public class FtpOptionRule {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("path");

-    public static final Option<String> FILE_FORMAT_TYPE =
+    public static final Option<FileFormat> FILE_FORMAT_TYPE =
             Options.key("file_format_type")
-                    .stringType()
+                    .enumType(FileFormat.class)
                     .noDefaultValue()
                     .withDescription("file_format_type");

+    public static final Option<String> DELIMITER =
+            Options.key("delimiter")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("delimiter");
+
     public static OptionRule optionRule() {
         return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD, PATH).build();
     }
@@ -62,4 +70,18 @@ public class FtpOptionRule {
     public static OptionRule metadataRule() {
         return OptionRule.builder().required(FILE_FORMAT_TYPE).build();
     }
+
+    public enum FileFormat {
+        CSV("csv"),
+        TEXT("txt"),
+        PARQUET("parquet"),
+        ORC("orc"),
+        JSON("json");
+
+        private final String type;
+
+        FileFormat(String type) {
+            this.type = type;
+        }
+    }
 }
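
The new FileFormat enum keeps its wire name ("csv", "txt", ...) in a private field with no accessor. If the raw string ever needs to be resolved back to an enum constant, a small helper inside the enum would do it (a sketch, not in the commit):

    public static FileFormat fromType(String type) {
        // Case-insensitive lookup of the wire name, e.g. "csv" -> CSV, "txt" -> TEXT.
        for (FileFormat format : values()) {
            if (format.type.equalsIgnoreCase(type)) {
                return format;
            }
        }
        throw new IllegalArgumentException("Unsupported file_format_type: " + type);
    }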