Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support rocketmq #1680

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config.properties.example
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ log_level=info
producer=kafka
kafka.bootstrap.servers=localhost:9092

producer=rocketmq
rocketmq_namesrv_addr=localhost:9876
rocketmq_producer_group=rocketmq_producer_group
rocketmq_send_topic=maxwell
rocketmq_tags=*

# mysql login info
host=localhost
user=maxwell
Expand Down
7 changes: 6 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@
<!-- OpenCensus used for pushing metrics to stackdriver -->
<dependency>
<groupId>io.opencensus</groupId>
<artifactId>opencensus-contrib-dropwizard</artifactId>
<artifactId>opencensus-contrib-dropwizard</artifactId>
<version>${opencensus.version}</version>
</dependency>
<dependency>
Expand All @@ -373,6 +373,11 @@
<artifactId>opencensus-exporter-stats-stackdriver</artifactId>
<version>${opencensus.version}</version>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
</dependencies>

<build>
Expand Down
57 changes: 28 additions & 29 deletions src/main/java/com/zendesk/maxwell/MaxwellConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,6 @@ public class MaxwellConfig extends AbstractConfig {

public String sqsQueueUri;

public String snsTopic;
public String snsAttrs;

public String pubsubProjectId;
public String pubsubTopic;
public String ddlPubsubTopic;
Expand Down Expand Up @@ -110,8 +107,6 @@ public class MaxwellConfig extends AbstractConfig {

public MaxwellDiagnosticContext.Config diagnosticConfig;

public boolean enableHttpConfig;

public String clientID;
public Long replicaServerID;

Expand All @@ -126,9 +121,8 @@ public class MaxwellConfig extends AbstractConfig {
public String rabbitmqUser;
public String rabbitmqPass;
public String rabbitmqHost;
public Integer rabbitmqPort;
public int rabbitmqPort;
public String rabbitmqVirtualHost;
public String rabbitmqURI;
public String rabbitmqExchange;
public String rabbitmqExchangeType;
public boolean rabbitMqExchangeDurable;
Expand All @@ -137,6 +131,11 @@ public class MaxwellConfig extends AbstractConfig {
public boolean rabbitmqMessagePersistent;
public boolean rabbitmqDeclareExchange;

public String rocketmqNamesrvAddr;
public String rocketmqProducerGroup;
public String rocketmqSendTopic;
public String rocketmqTags;

public String natsUrl;
public String natsSubject;

Expand Down Expand Up @@ -450,6 +449,14 @@ protected MaxwellOptionParser buildOptionParser() {
parser.accepts( "rabbitmq_message_persistent", "Message persistence. Defaults to false" ).withOptionalArg();
parser.accepts( "rabbitmq_declare_exchange", "Should declare the exchange for rabbitmq publisher. Defaults to true" ).withOptionalArg();


parser.section( "rocketmq" );

parser.accepts( "rocketmq_namesrv_addr", "Rocketmq NameServer" ).withRequiredArg();
parser.accepts( "rocketmq_producer_group", "Rocketmq ProducerGroup" ).withRequiredArg();
parser.accepts( "rocketmq_send_topic", "optionally provide a topic name to push to. default: maxwell" ).withRequiredArg();
parser.accepts( "rocketmq_tags", "Rocketmq send messgae tags. default:tags" ).withRequiredArg();

parser.section( "redis" );

parser.accepts( "redis_host", "Host of Redis server" ).withRequiredArg();
Expand All @@ -469,26 +476,22 @@ protected MaxwellOptionParser buildOptionParser() {
parser.section("metrics");

parser.accepts( "metrics_prefix", "the prefix maxwell will apply to all metrics" ).withRequiredArg();
parser.accepts( "metrics_type", "how maxwell metrics will be reported, at least one of slf4j|jmx|http|datadog|stackdriver" ).withRequiredArg();
parser.accepts( "metrics_type", "how maxwell metrics will be reported, at least one of slf4j|jmx|http|datadog" ).withRequiredArg();
parser.accepts( "metrics_slf4j_interval", "the frequency metrics are emitted to the log, in seconds, when slf4j reporting is configured" ).withRequiredArg();
parser.accepts( "metrics_age_slo", "the threshold in seconds for message age service level objective" ).withRequiredArg().ofType(Integer.class);
parser.accepts( "metrics_jvm", "enable jvm metrics: true|false. default: false" ).withRequiredArg().ofType(Boolean.class);
parser.accepts( "http_port", "the port the server will bind to when http reporting is configured" ).withRequiredArg().ofType(Integer.class);
parser.accepts( "http_path_prefix", "the http path prefix when metrics_type includes http or diagnostic is enabled, default /" ).withRequiredArg();
parser.accepts( "http_bind_address", "the ip address the server will bind to when http reporting is configured" ).withRequiredArg();
parser.accepts( "metrics_datadog_type", "when metrics_type includes datadog this is the way metrics will be reported, one of udp|http" ).withRequiredArg();
parser.accepts( "metrics_datadog_tags", "datadog tags that should be supplied, e.g. tag1:value1,tag2:value2" ).withRequiredArg();
parser.accepts( "metrics_datadog_interval", "the frequency metrics are pushed to datadog, in seconds" ).withRequiredArg().ofType(Long.class);
parser.accepts( "metrics_datadog_apikey", "the datadog api key to use when metrics_datadog_type = http" ).withRequiredArg();
parser.accepts( "metrics_datadog_site", "the site to publish metrics to when metrics_datadog_type = http, one of us|eu, default us" ).withRequiredArg();
parser.accepts( "metrics_datadog_host", "the host to publish metrics to when metrics_datadog_type = udp" ).withRequiredArg();
parser.accepts( "metrics_datadog_port", "the port to publish metrics to when metrics_datadog_type = udp" ).withRequiredArg().ofType(Integer.class);


parser.section("http");
parser.accepts( "http_port", "the port the server will bind to when http reporting is configured" ).withRequiredArg().ofType(Integer.class);
parser.accepts( "http_path_prefix", "the http path prefix when metrics_type includes http or diagnostic is enabled, default /" ).withRequiredArg();
parser.accepts( "http_bind_address", "the ip address the server will bind to when http reporting is configured" ).withRequiredArg();
parser.accepts( "http_diagnostic", "enable http diagnostic endpoint: true|false. default: false" ).withOptionalArg().ofType(Boolean.class);
parser.accepts( "http_diagnostic_timeout", "the http diagnostic response timeout in ms when http_diagnostic=true. default: 10000" ).withRequiredArg().ofType(Integer.class);
parser.accepts( "http_config", "enable http config update endpoint: true|false. default: false" ).withOptionalArg().ofType(Boolean.class);
parser.accepts( "metrics_jvm", "enable jvm metrics: true|false. default: false" ).withRequiredArg().ofType(Boolean.class);

parser.accepts( "help", "display help" ).withOptionalArg().forHelp();

Expand Down Expand Up @@ -571,12 +574,11 @@ private void setup(OptionSet options, Properties properties) {
this.pubsubMaxRpcTimeout = Duration.ofSeconds(fetchLongOption("pubsub_max_rpc_timeout", options, properties, 600L));
this.pubsubTotalTimeout = Duration.ofSeconds(fetchLongOption("pubsub_total_timeout", options, properties, 600L));

this.rabbitmqHost = fetchStringOption("rabbitmq_host", options, properties, null);
this.rabbitmqPort = fetchIntegerOption("rabbitmq_port", options, properties, null);
this.rabbitmqHost = fetchStringOption("rabbitmq_host", options, properties, "localhost");
this.rabbitmqPort = fetchIntegerOption("rabbitmq_port", options, properties, 5672);
this.rabbitmqUser = fetchStringOption("rabbitmq_user", options, properties, "guest");
this.rabbitmqPass = fetchStringOption("rabbitmq_pass", options, properties, "guest");
this.rabbitmqVirtualHost = fetchStringOption("rabbitmq_virtual_host", options, properties, "/");
this.rabbitmqURI = fetchStringOption("rabbitmq_uri", options, properties, null);
this.rabbitmqExchange = fetchStringOption("rabbitmq_exchange", options, properties, "maxwell");
this.rabbitmqExchangeType = fetchStringOption("rabbitmq_exchange_type", options, properties, "fanout");
this.rabbitMqExchangeDurable = fetchBooleanOption("rabbitmq_exchange_durable", options, properties, false);
Expand All @@ -588,6 +590,11 @@ private void setup(OptionSet options, Properties properties) {
this.natsUrl = fetchStringOption("nats_url", options, properties, "nats://localhost:4222");
this.natsSubject = fetchStringOption("nats_subject", options, properties, "%{database}.%{table}");

this.rocketmqNamesrvAddr = fetchStringOption("rocketmq_namesrv_addr", options, properties, "localhost:9876");
this.rocketmqProducerGroup = fetchStringOption("rocketmq_producer_group", options, properties, "rocketmq_producer_group");
this.rocketmqSendTopic = fetchStringOption("rocketmq_send_topic", options, properties, "maxwell");
this.rocketmqTags = fetchStringOption("rocketmq_tags", options, properties, "tags");

this.redisHost = fetchStringOption("redis_host", options, properties, "localhost");
this.redisPort = fetchIntegerOption("redis_port", options, properties, 6379);
this.redisAuth = fetchStringOption("redis_auth", options, properties, null);
Expand Down Expand Up @@ -633,8 +640,6 @@ private void setup(OptionSet options, Properties properties) {

this.sqsQueueUri = fetchStringOption("sqs_queue_uri", options, properties, null);

this.snsTopic = fetchStringOption("sns_topic", options, properties, null);
this.snsAttrs = fetchStringOption("sns_attrs", options, properties, null);
this.outputFile = fetchStringOption("output_file", options, properties, null);

this.metricsPrefix = fetchStringOption("metrics_prefix", options, properties, "MaxwellMetrics");
Expand Down Expand Up @@ -662,8 +667,6 @@ private void setup(OptionSet options, Properties properties) {
this.diagnosticConfig.enable = fetchBooleanOption("http_diagnostic", options, properties, false);
this.diagnosticConfig.timeout = fetchLongOption("http_diagnostic_timeout", options, properties, 10000L);

this.enableHttpConfig = fetchBooleanOption("http_config", options, properties, false);

this.includeDatabases = fetchStringOption("include_dbs", options, properties, null);
this.excludeDatabases = fetchStringOption("exclude_dbs", options, properties, null);
this.includeTables = fetchStringOption("include_tables", options, properties, null);
Expand Down Expand Up @@ -693,7 +696,6 @@ private void setup(OptionSet options, Properties properties) {
outputConfig.includesRowQuery = fetchBooleanOption("output_row_query", options, properties, false);
outputConfig.includesPrimaryKeys = fetchBooleanOption("output_primary_keys", options, properties, false);
outputConfig.includesPrimaryKeyColumns = fetchBooleanOption("output_primary_key_columns", options, properties, false);
outputConfig.includesPushTimestamp = fetchBooleanOption("output_push_timestamp", options, properties, false);
outputConfig.outputDDL = fetchBooleanOption("output_ddl", options, properties, false);
outputConfig.zeroDatesAsNull = fetchBooleanOption("output_null_zerodates", options, properties, false);
outputConfig.namingStrategy = fetchStringOption("output_naming_strategy", options, properties, null);
Expand Down Expand Up @@ -836,7 +838,7 @@ public void validate() {

if ( this.producerType.equals("kafka") ) {
if ( !this.kafkaProperties.containsKey("bootstrap.servers") ) {
usageForOptions("Please specify kafka.bootstrap.servers", "kafka");
usageForOptions("You must specify kafka.bootstrap.servers for the kafka producer!", "kafka");
}

if ( this.kafkaPartitionHash == null ) {
Expand All @@ -856,8 +858,6 @@ public void validate() {
usageForOptions("please specify a stream name for kinesis", "kinesis_stream");
} else if (this.producerType.equals("sqs") && this.sqsQueueUri == null) {
usageForOptions("please specify a queue uri for sqs", "sqs_queue_uri");
} else if (this.producerType.equals("sns") && this.snsTopic == null) {
usageForOptions("please specify a topic ARN for SNS", "sns_topic");
} else if (this.producerType.equals("pubsub")) {
if (this.pubsubRequestBytesThreshold <= 0L)
usage("--pubsub_request_bytes_threshold must be > 0");
Expand Down Expand Up @@ -930,8 +930,7 @@ public void validate() {
null,
this.maxwellMysql.user,
this.maxwellMysql.password,
this.maxwellMysql.sslMode,
this.maxwellMysql.enableHeartbeat
this.maxwellMysql.sslMode
);

this.replicationMysql.jdbcOptions = this.maxwellMysql.jdbcOptions;
Expand Down
31 changes: 27 additions & 4 deletions src/main/java/com/zendesk/maxwell/MaxwellContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,42 @@
import com.zendesk.maxwell.bootstrap.BootstrapController;
import com.zendesk.maxwell.bootstrap.SynchronousBootstrapper;
import com.zendesk.maxwell.filtering.Filter;
import com.zendesk.maxwell.monitoring.*;
import com.zendesk.maxwell.producer.*;
import com.zendesk.maxwell.monitoring.MaxwellDiagnostic;
import com.zendesk.maxwell.monitoring.MaxwellDiagnosticContext;
import com.zendesk.maxwell.monitoring.MaxwellHTTPServer;
import com.zendesk.maxwell.monitoring.MaxwellMetrics;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.producer.AbstractProducer;
import com.zendesk.maxwell.producer.BufferedProducer;
import com.zendesk.maxwell.producer.FileProducer;
import com.zendesk.maxwell.producer.MaxwellKafkaProducer;
import com.zendesk.maxwell.producer.MaxwellKinesisProducer;
import com.zendesk.maxwell.producer.MaxwellPubsubProducer;
import com.zendesk.maxwell.producer.MaxwellRedisProducer;
import com.zendesk.maxwell.producer.MaxwellSQSProducer;
import com.zendesk.maxwell.producer.NoneProducer;
import com.zendesk.maxwell.producer.ProfilerProducer;
import com.zendesk.maxwell.producer.RabbitmqProducer;
import com.zendesk.maxwell.producer.RocketmqProducer;
import com.zendesk.maxwell.producer.StdoutProducer;
import com.zendesk.maxwell.recovery.RecoveryInfo;
import com.zendesk.maxwell.replication.*;
import com.zendesk.maxwell.replication.BinlogConnectorDiagnostic;
import com.zendesk.maxwell.replication.HeartbeatNotifier;
import com.zendesk.maxwell.replication.MysqlVersion;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.replication.Replicator;
import com.zendesk.maxwell.row.RowMap;
import com.zendesk.maxwell.schema.MysqlPositionStore;
import com.zendesk.maxwell.schema.MysqlSchemaCompactor;
import com.zendesk.maxwell.schema.PositionStoreThread;
import com.zendesk.maxwell.schema.ReadOnlyMysqlPositionStore;
import com.zendesk.maxwell.util.C3P0ConnectionPool;
import com.zendesk.maxwell.util.ConnectionPool;
import com.zendesk.maxwell.util.RunLoopProcess;
import com.zendesk.maxwell.util.StoppableTask;
import com.zendesk.maxwell.util.TaskManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.zendesk.maxwell.util.ConnectionPool;

import java.io.IOException;
import java.net.URISyntaxException;
Expand Down Expand Up @@ -406,6 +426,9 @@ public AbstractProducer getProducer() throws IOException {
case "rabbitmq":
this.producer = new RabbitmqProducer(this);
break;
case "rocketmq":
this.producer = new RocketmqProducer(this);
break;
case "redis":
this.producer = new MaxwellRedisProducer(this);
break;
Expand Down
63 changes: 63 additions & 0 deletions src/main/java/com/zendesk/maxwell/producer/RocketmqProducer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.zendesk.maxwell.producer;

import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.row.RowMap;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketmqProducer extends AbstractProducer {

private static final Logger LOGGER = LoggerFactory.getLogger(RocketmqProducer.class);
// 实例化消息生产者Producer
private DefaultMQProducer producer;

private String rocketmqSendTopic;
private String rocketmqTags;


public RocketmqProducer(MaxwellContext context) {
super(context);
try {
producer = new DefaultMQProducer(context.getConfig().rocketmqProducerGroup);
// 设置NameServer的地址
producer.setNamesrvAddr(context.getConfig().rocketmqNamesrvAddr);

rocketmqSendTopic = context.getConfig().rocketmqSendTopic;
rocketmqTags = context.getConfig().rocketmqTags;
// 启动Producer实例
producer.start();
} catch (MQClientException e) {
throw new RuntimeException(e);
}

}

@Override
public void push(RowMap r) throws Exception {
if ( !r.shouldOutput(outputConfig) ) {
context.setPosition(r.getNextPosition());
return;
}
String value = r.toJSON(outputConfig);
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message(rocketmqSendTopic /* Topic */,
rocketmqTags /* Tag */,
(value).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);

producer.send(msg);

if ( r.isTXCommit() ) {
context.setPosition(r.getNextPosition());
}
if ( LOGGER.isDebugEnabled()) {
LOGGER.debug("-> topic:" + rocketmqSendTopic + ", tags:" + rocketmqTags);
}
}


}