提交 da025db5 authored 作者: 李纤's avatar 李纤

修改minio连接

上级 c7a8f3b2
package org.apache.seatunnel.datasource.plugin.mqtt; package org.apache.seatunnel.datasource.plugin.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback; import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
......
package org.apache.seatunnel.datasource.plugin.mqtt; package org.apache.seatunnel.datasource.plugin.mqtt;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import lombok.Data; import lombok.Data;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttTopic;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/** @Author Heartsuit @Date 2022-12-11 */ /** @Author Heartsuit @Date 2022-12-11 */
@Data @Data
......
...@@ -17,23 +17,26 @@ ...@@ -17,23 +17,26 @@
package org.apache.seatunnel.datasource.plugin.mqtt; package org.apache.seatunnel.datasource.plugin.mqtt;
import com.alibaba.fastjson2.JSONObject;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
import com.alibaba.fastjson2.JSONObject; import java.util.ArrayList;
import lombok.NonNull; import java.util.Arrays;
import lombok.extern.slf4j.Slf4j; import java.util.Collections;
import java.util.HashMap;
import java.util.*; import java.util.Iterator;
import java.util.List;
import java.util.Map;
@Slf4j @Slf4j
public class MqttDataSourceChannel implements DataSourceChannel { public class MqttDataSourceChannel implements DataSourceChannel {
......
...@@ -17,14 +17,13 @@ ...@@ -17,14 +17,13 @@
package org.apache.seatunnel.datasource.plugin.mqtt; package org.apache.seatunnel.datasource.plugin.mqtt;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set; import java.util.Set;
@AutoService(DataSourceFactory.class) @AutoService(DataSourceFactory.class)
......
...@@ -17,13 +17,11 @@ ...@@ -17,13 +17,11 @@
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.hadoop.conf.Configuration;
import lombok.extern.slf4j.Slf4j;
import java.util.Map; import java.util.Map;
@Slf4j @Slf4j
......
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import com.amazonaws.services.dynamodbv2.xspec.S;
import io.minio.MinioClient; import io.minio.MinioClient;
import io.minio.errors.MinioException; import io.minio.errors.MinioException;
...@@ -12,12 +11,10 @@ public class S3ClientService { ...@@ -12,12 +11,10 @@ public class S3ClientService {
private String BUCKET; private String BUCKET;
private Integer PORT; private Integer PORT;
private final String clientId = "Client" + (int) (Math.random() * 100000000); private final String clientId = "Client" + (int) (Math.random() * 100000000);
private MinioClient minioClient; private MinioClient minioClient;
private String TOPIC;
public S3ClientService( public S3ClientService(
String endpoint, String endpoint,
String provider, String provider,
...@@ -34,11 +31,16 @@ public class S3ClientService { ...@@ -34,11 +31,16 @@ public class S3ClientService {
setMinioClient(endpoint, provider, username, password, bucket, port); setMinioClient(endpoint, provider, username, password, bucket, port);
} }
public MinioClient getMinioClient() { /**
return minioClient; *
} * @param endpoint
* @param provider
* @param username
* @param password
* @param bucket
* @param port
* @throws MinioException
*/
public void setMinioClient( public void setMinioClient(
String endpoint, String provider, String endpoint, String provider,
...@@ -50,13 +52,4 @@ public class S3ClientService { ...@@ -50,13 +52,4 @@ public class S3ClientService {
.build(); .build();
} }
//
// /** 关闭MQTT连接 */
// public void close() throws MqttException {
// mqttClient.close();
// mqttClient.disconnect();
// }
} }
...@@ -17,14 +17,13 @@ ...@@ -17,14 +17,13 @@
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory; import org.apache.seatunnel.datasource.plugin.api.DataSourceFactory;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginInfo;
import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum; import org.apache.seatunnel.datasource.plugin.api.DatasourcePluginTypeEnum;
import com.google.auto.service.AutoService;
import com.google.common.collect.Sets;
import java.util.Set; import java.util.Set;
@AutoService(DataSourceFactory.class) @AutoService(DataSourceFactory.class)
......
...@@ -17,21 +17,13 @@ ...@@ -17,21 +17,13 @@
package org.apache.seatunnel.datasource.plugin.s3; package org.apache.seatunnel.datasource.plugin.s3;
import io.minio.MinioClient; import lombok.NonNull;
import org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.util.OptionRule; import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel; import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException; import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField; import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import lombok.NonNull;
import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论