提交 2863f25c authored 作者: 王红亮's avatar 王红亮

更新rabbitmq配置

上级 94eab635
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
@Slf4j
public class RabbitAConfiguration {
public static RabbitConfiguration getConfiguration(Map<String, String> redisOption) {
if (!redisOption.containsKey(RabbitmqOptionRule.HOST.key())) {
throw new IllegalArgumentException(
"redis datasource host is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.PORT.key())) {
throw new IllegalArgumentException(
"redis datasource port is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.USERNAME.key())) {
throw new IllegalArgumentException(
"redis datasource username is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.PASSWORD.key())) {
throw new IllegalArgumentException(
"redis datasource password is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.VIRTUAL_HOST.key())) {
throw new IllegalArgumentException(
"redis datasource virtual_host is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.QUEUE_NAME.key())) {
throw new IllegalArgumentException(
"redis datasource queue_name is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.SCHEMA.key())) {
throw new IllegalArgumentException(
"redis datasource schema is null, please check your config");
}
if (!redisOption.containsKey(RabbitmqOptionRule.URL.key())) {
throw new IllegalArgumentException(
"redis datasource url is null, please check your config");
}
RabbitConfiguration redisConfiguration = new RabbitConfiguration();
redisConfiguration.setHost(redisOption.get(RabbitmqOptionRule.HOST.key()));
redisConfiguration.setPort(Integer.valueOf(redisOption.get(RabbitmqOptionRule.PORT.key())));
redisConfiguration.setUsername(redisOption.get(RabbitmqOptionRule.USERNAME.key()));
redisConfiguration.setPassword(redisOption.get(RabbitmqOptionRule.PASSWORD.key()));
redisConfiguration.setVirtualHost(redisOption.get(RabbitmqOptionRule.VIRTUAL_HOST.key()));
redisConfiguration.setQueueName(redisOption.get(RabbitmqOptionRule.QUEUE_NAME.key()));
redisConfiguration.setSchema(redisOption.get(RabbitmqOptionRule.SCHEMA.key()));
redisConfiguration.setUri(redisOption.get(RabbitmqOptionRule.URL.key()));
return redisConfiguration;
}
}
package org.apache.seatunnel.datasource.plugin.rabbitmq;
public class RabbitConfiguration {
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
private String uri;
private Integer networkRecoveryInterval;
private Boolean automaticRecovery;
private Boolean topologyRecovery;
private Integer connectionTimeout;
private Integer requestedChannelMax;
private Integer requestedFrameMax;
private Integer requestedHeartbeat;
private Integer prefetchCount;
private long deliveryTimeout;
private String queueName;
private String routingKey;
private String schema;
public RabbitConfiguration() {}
public RabbitConfiguration(
String host,
Integer port,
String virtualHost,
String username,
String password,
String uri,
Integer networkRecoveryInterval,
Boolean automaticRecovery,
Boolean topologyRecovery,
Integer connectionTimeout,
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
Integer prefetchCount,
long deliveryTimeout,
String queueName,
String routingKey,
String schema) {
this.host = host;
this.port = port;
this.virtualHost = virtualHost;
this.username = username;
this.password = password;
this.uri = uri;
this.networkRecoveryInterval = networkRecoveryInterval;
this.automaticRecovery = automaticRecovery;
this.topologyRecovery = topologyRecovery;
this.connectionTimeout = connectionTimeout;
this.requestedChannelMax = requestedChannelMax;
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
this.prefetchCount = prefetchCount;
this.deliveryTimeout = deliveryTimeout;
this.queueName = queueName;
this.routingKey = routingKey;
this.schema = schema;
}
@Override
public String toString() {
return "RabbitConfiguration{"
+ "host='"
+ host
+ '\''
+ ", port="
+ port
+ ", virtualHost='"
+ virtualHost
+ '\''
+ ", username='"
+ username
+ '\''
+ ", password='"
+ password
+ '\''
+ ", uri='"
+ uri
+ '\''
+ ", networkRecoveryInterval="
+ networkRecoveryInterval
+ ", automaticRecovery="
+ automaticRecovery
+ ", topologyRecovery="
+ topologyRecovery
+ ", connectionTimeout="
+ connectionTimeout
+ ", requestedChannelMax="
+ requestedChannelMax
+ ", requestedFrameMax="
+ requestedFrameMax
+ ", requestedHeartbeat="
+ requestedHeartbeat
+ ", prefetchCount="
+ prefetchCount
+ ", deliveryTimeout="
+ deliveryTimeout
+ ", queueName='"
+ queueName
+ '\''
+ ", routingKey='"
+ routingKey
+ '\''
+ ", schema='"
+ schema
+ '\''
+ '}';
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(String virtualHost) {
this.virtualHost = virtualHost;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getUri() {
return uri;
}
public void setUri(String uri) {
this.uri = uri;
}
public Integer getNetworkRecoveryInterval() {
return networkRecoveryInterval;
}
public void setNetworkRecoveryInterval(Integer networkRecoveryInterval) {
this.networkRecoveryInterval = networkRecoveryInterval;
}
public Boolean getAutomaticRecovery() {
return automaticRecovery;
}
public void setAutomaticRecovery(Boolean automaticRecovery) {
this.automaticRecovery = automaticRecovery;
}
public Boolean getTopologyRecovery() {
return topologyRecovery;
}
public void setTopologyRecovery(Boolean topologyRecovery) {
this.topologyRecovery = topologyRecovery;
}
public Integer getConnectionTimeout() {
return connectionTimeout;
}
public void setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
}
public Integer getRequestedChannelMax() {
return requestedChannelMax;
}
public void setRequestedChannelMax(Integer requestedChannelMax) {
this.requestedChannelMax = requestedChannelMax;
}
public Integer getRequestedFrameMax() {
return requestedFrameMax;
}
public void setRequestedFrameMax(Integer requestedFrameMax) {
this.requestedFrameMax = requestedFrameMax;
}
public Integer getRequestedHeartbeat() {
return requestedHeartbeat;
}
public void setRequestedHeartbeat(Integer requestedHeartbeat) {
this.requestedHeartbeat = requestedHeartbeat;
}
public Integer getPrefetchCount() {
return prefetchCount;
}
public void setPrefetchCount(Integer prefetchCount) {
this.prefetchCount = prefetchCount;
}
public long getDeliveryTimeout() {
return deliveryTimeout;
}
public void setDeliveryTimeout(long deliveryTimeout) {
this.deliveryTimeout = deliveryTimeout;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
}
......@@ -17,8 +17,6 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.common.Handover;
import org.apache.commons.lang3.StringUtils;
import com.rabbitmq.client.*;
......@@ -35,18 +33,17 @@ import static org.apache.seatunnel.datasource.plugin.rabbitmq.RabbitmqConnectorE
@Slf4j
@Data
public class RabbitmqClient {
private final RabbitmqOptionRule config;
private final ConnectionFactory connectionFactory;
private final Connection connection;
private final Channel channel;
public class RabbitmqClientService {
private RabbitConfiguration config;
private ConnectionFactory connectionFactory;
private Connection connection;
private Channel channel;
public RabbitmqClient(RabbitmqOptionRule config) {
this.config = config;
public RabbitmqClientService(RabbitConfiguration config) {
try {
this.connectionFactory = getConnectionFactory();
this.connection = connectionFactory.newConnection();
this.channel = connection.createChannel();
getConnectionFactory(config);
connectionFactory.newConnection();
connection.createChannel();
// set channel prefetch count
if (config.getPrefetchCount() != null) {
channel.basicQos(config.getPrefetchCount(), true);
......@@ -62,16 +59,7 @@ public class RabbitmqClient {
}
}
public Channel getChannel() {
return channel;
}
public DefaultConsumer getQueueingConsumer(Handover<Delivery> handover) {
DefaultConsumer consumer = new QueueingConsumer(channel, handover);
return consumer;
}
public ConnectionFactory getConnectionFactory() {
public static Connection getConnectionFactory(RabbitConfiguration config) {
ConnectionFactory factory = new ConnectionFactory();
if (!StringUtils.isEmpty(config.getUri())) {
try {
......@@ -114,66 +102,15 @@ public class RabbitmqClient {
if (config.getRequestedFrameMax() != null) {
factory.setRequestedFrameMax(config.getRequestedFrameMax());
}
return factory;
}
public void write(byte[] msg) {
try {
if (StringUtils.isEmpty(config.getRoutingKey())) {
channel.basicPublish("", config.getQueueName(), null, msg);
} else {
// not support set returnListener
channel.basicPublish(
config.getExchange(), config.getRoutingKey(), false, false, null, msg);
}
} catch (IOException e) {
if (config.isLogFailuresOnly()) {
log.error(
"Cannot send RMQ message {} at {}",
config.getQueueName(),
config.getHost(),
e);
} else {
throw new RabbitmqConnectorException(
SEND_MESSAGE_FAILED,
String.format(
"Cannot send RMQ message %s at %s",
config.getQueueName(), config.getHost()),
e);
}
}
}
public void close() {
Exception t = null;
try {
if (channel != null) {
channel.close();
}
} catch (IOException | TimeoutException e) {
t = e;
}
Connection connection = null;
try {
if (connection != null) {
connection.close();
}
connection = factory.newConnection();
} catch (IOException e) {
if (t != null) {
log.warn(
"Both channel and connection closing failed. Logging channel exception and failing with connection exception",
t);
}
t = e;
}
if (t != null) {
throw new RabbitmqConnectorException(
CLOSE_CONNECTION_FAILED,
String.format(
"Error while closing RMQ connection with %s at %s",
config.getQueueName(), config.getHost()),
t);
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
return connection;
}
protected void setupQueue() throws IOException {
......
......@@ -18,28 +18,18 @@
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.datasource.plugin.api.DataSourceChannel;
import org.apache.seatunnel.datasource.plugin.api.DataSourcePluginException;
import org.apache.seatunnel.datasource.plugin.api.model.TableField;
import org.apache.commons.lang3.StringUtils;
import com.rabbitmq.client.Connection;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import static com.google.common.base.Preconditions.checkArgument;
import java.util.*;
@Slf4j
public class RabbitmqDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RabbitmqOptionRule.optionRule();
......@@ -55,39 +45,42 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
@NonNull String pluginName,
Map<String, String> requestParams,
String database,
Map<String, String> option) {
// checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be
// default");
try {
if (StringUtils.isNotBlank(database)) {
return Arrays.asList(database);
}
} catch (Exception ex) {
throw new DataSourcePluginException(
"check redis connectivity failed, " + ex.getMessage(), ex);
}
return null;
Map<String, String> options) {
throw new UnsupportedOperationException("getTables is not supported for Redis datasource");
}
@Override
public List<String> getDatabases(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
return DEFAULT_DATABASES;
throw new UnsupportedOperationException(
"getDatabases is not supported for Redis datasource");
}
@Override
public boolean checkDataSourceConnectivity(
@NonNull String pluginName, @NonNull Map<String, String> requestParams) {
if (requestParams.isEmpty()) {
throw new SeaTunnelException("requestParmas 为空!");
}
RabbitConfiguration conf = RabbitAConfiguration.getConfiguration(requestParams);
try {
// just test the connection
return true;
// return StringUtils.isNotEmpty(RabbitmqClient.getTopi);
} catch (Exception ex) {
Connection connectionFactory = RabbitmqClientService.getConnectionFactory(conf);
if (Objects.isNull(conf)) {
throw new DataSourcePluginException(
String.format(
"check rabbitmq connectivity failed, config is: %s",
requestParams));
}
if (Objects.nonNull(connectionFactory)) {
return true;
} else {
throw new DataSourcePluginException(
String.format(
"check redis connectivity failed, config is: %s", requestParams));
}
} catch (Exception e) {
throw new DataSourcePluginException(
"check mqtt connectivity failed, " + ex.getMessage(), ex);
String.format("check redis connectivity failed, config is: %s", requestParams));
}
}
......@@ -97,8 +90,8 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull String table) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyList();
throw new UnsupportedOperationException(
"getTableFields is not supported for redis datasource");
}
@Override
......@@ -107,7 +100,7 @@ public class RabbitmqDataSourceChannel implements DataSourceChannel {
@NonNull Map<String, String> requestParams,
@NonNull String database,
@NonNull List<String> tables) {
checkArgument(StringUtils.equalsIgnoreCase(database, DATABASE), "database must be default");
return Collections.emptyMap();
throw new UnsupportedOperationException(
"getTableFields is not supported for redis datasource");
}
}
......@@ -29,7 +29,6 @@ import java.util.Set;
@AutoService(DataSourceFactory.class)
public class RabbitmqDataSourceFactory implements DataSourceFactory {
public static final String RABBITMQ_PLUGIN_NAME = "Rabbitmq";
public static final String RABBITMQ_PLUGIN_ICON = "rabbitmq";
public static final String RABBITMQ_PLUGIN_VERSION = "1.0.0";
......
......@@ -21,85 +21,10 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import lombok.Data;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@Data
public class RabbitmqOptionRule {
private String host;
private Integer port;
private String virtualHost;
private String username;
private String password;
private String uri;
private Integer networkRecoveryInterval;
private Boolean automaticRecovery;
private Boolean topologyRecovery;
private Integer connectionTimeout;
private Integer requestedChannelMax;
private Integer requestedFrameMax;
private Integer requestedHeartbeat;
private Integer prefetchCount;
private long deliveryTimeout;
private String queueName;
private String routingKey;
private boolean logFailuresOnly = false;
private String exchange = "";
private boolean forE2ETesting = false;
private boolean usesCorrelationId = false;
public RabbitmqOptionRule(
String host,
Integer port,
String virtualHost,
String username,
String password,
String uri,
Integer networkRecoveryInterval,
Boolean automaticRecovery,
Boolean topologyRecovery,
Integer connectionTimeout,
Integer requestedChannelMax,
Integer requestedFrameMax,
Integer requestedHeartbeat,
Integer prefetchCount,
long deliveryTimeout,
String queueName,
String routingKey,
boolean logFailuresOnly,
String exchange,
boolean forE2ETesting,
boolean usesCorrelationId) {
this.host = host;
this.port = port;
this.virtualHost = virtualHost;
this.username = username;
this.password = password;
this.uri = uri;
this.networkRecoveryInterval = networkRecoveryInterval;
this.automaticRecovery = automaticRecovery;
this.topologyRecovery = topologyRecovery;
this.connectionTimeout = connectionTimeout;
this.requestedChannelMax = requestedChannelMax;
this.requestedFrameMax = requestedFrameMax;
this.requestedHeartbeat = requestedHeartbeat;
this.prefetchCount = prefetchCount;
this.deliveryTimeout = deliveryTimeout;
this.queueName = queueName;
this.routingKey = routingKey;
this.logFailuresOnly = logFailuresOnly;
this.exchange = exchange;
this.forE2ETesting = forE2ETesting;
this.usesCorrelationId = usesCorrelationId;
}
private final Map<String, Object> sinkOptionProps = new HashMap<>();
public static final Option<String> HOST =
Options.key("host")
.stringType()
......@@ -130,6 +55,12 @@ public class RabbitmqOptionRule {
.noDefaultValue()
.withDescription("the password to use when connecting to the broker");
public static final Option<String> SCHEMA =
Options.key("schema")
.stringType()
.noDefaultValue()
.withDescription("the schema to write the message to");
public static final Option<String> QUEUE_NAME =
Options.key("queue_name")
.stringType()
......@@ -234,10 +165,13 @@ public class RabbitmqOptionRule {
+ "id to deduplicate messages (in case of failed acknowledgments).");
public static OptionRule optionRule() {
return OptionRule.builder().required(HOST, PORT).optional(USERNAME, PASSWORD).build();
return OptionRule.builder()
.required(HOST, PORT, USERNAME, PASSWORD)
.optional(QUEUE_NAME, SCHEMA)
.build();
}
public static OptionRule metadataRule() {
return OptionRule.builder().required(VIRTUAL_HOST).build();
return OptionRule.builder().required(VIRTUAL_HOST, URL).build();
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.datasource.plugin.rabbitmq;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import java.util.Map;
import java.util.Properties;
import static com.google.common.base.Preconditions.checkArgument;
public class RabbitmqRequestParamsUtils {
public static Properties parsePropertiesFromRequestParams(Map<String, String> requestParams) {
checkArgument(
requestParams.containsKey(RabbitmqOptionRule.HOST.key()),
String.format("Missing %s in requestParams", RabbitmqOptionRule.HOST.key()));
final Properties properties = new Properties();
properties.put(
RabbitmqOptionRule.HOST.key(), requestParams.get(RabbitmqOptionRule.HOST.key()));
if (requestParams.containsKey(RabbitmqOptionRule.PORT.key())) {
Config configObject =
ConfigFactory.parseString(requestParams.get(RabbitmqOptionRule.PORT.key()));
configObject
.entrySet()
.forEach(
entry -> {
properties.put(
entry.getKey(), entry.getValue().unwrapped().toString());
});
}
return properties;
}
}
......@@ -30,9 +30,6 @@ import java.util.*;
@Slf4j
public class RedisDataSourceChannel implements DataSourceChannel {
private static final String DATABASE = "default";
@Override
public OptionRule getDataSourceOptions(@NonNull String pluginName) {
return RedisOptionRule.optionRule();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论