Skip to content

Latest commit

 

History

History
295 lines (258 loc) · 15.9 KB

mysql_binlogname_position.md

File metadata and controls

295 lines (258 loc) · 15.9 KB

MySQL 5.X对接-使用binlogname+position

下面的例子都可以参考Fountain-examples

1. 使用条件

MySQL 5.X版本均支持使用binlogfilename+position进行dump的策略。

可以通过配置文件配置这两个值,这两个值可以通过如下命令获取:

mysql> show binlog events in 'mysql-bin.000004' from 9547101 limit 10;

如果配置文件或者持久化文件中找不到同步点配置,则fountain会尝试使用如下命令获取最后一个position:

mysql> show master status;
+------------------+----------+--------------+------------------+----------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Group_ID |
+------------------+----------+--------------+------------------+----------+
| mysql-bin.000004 |  9547101 |              |                  |      913 |
+------------------+----------+--------------+------------------+----------+
1 row in set (0.01 sec)

2. Properties配置

jdbc-mysql.properties

# mysql用户名密码
mysql_username=beidou
mysql_password=u7i8o9p0

# 当同步点存储文件不存在时候,使用该配置来进行binlog dump,如果置空,则从MySQL Server最后的点开始
# 可以通过类似命令寻找同步点,"show binlog events in 'mysql-bin.000007' from positio limit 10"
mysql_binlogdump_filename=
mysql_binlogdump_position=
#mysql_binlogdump_filename=mysql-bin.000006
#mysql_binlogdump_position=68863


# 主mysql地址,端口,slaveId(与其他slave不能重复,由于主备不会同一时间连,则配置中可以相同)
#mysql_shard_0_server=172.20.133.119
mysql_shard_0_server=192.168.1.107
mysql_shard_0_port=3308
mysql_shard_0_slaveId=10

# 备mysql
#mysql_shard_0_ha1_server=172.20.133.119
mysql_shard_0_ha1_server=192.168.1.107
mysql_shard_0_ha1_port=3308
mysql_shard_0_ha1_slaveId=10

# 下面是mysql的一些高级设置,一般情况下不建议修改
# replication或者query socket的一些初始化参数
mysql_wait_timeout=999999
mysql_net_write_timeout=240
mysql_net_read_timeout=240
mysql_charset=binary

# 当主mysql长时间接收不到任何event时,切换到另外一个备mysql的超时时间,单位为毫秒
mysql_replication_socket_so_timeout=120000

# mysql复制线程的一些基础socket参数,timeout单位为毫秒
#mysql_replication_socket_in_buf=16384
#mysql_replication_socket_out_buf=16384
#mysql_replication_socket_connect_timeout=3000

fountain-config.properties

# 同步点存储的文件夹地址
producer_position=/Users/baidu/work/fountain/mysql.rowbase51

# fountain-producer和consumer直接的缓冲队列长度
memq_limit=60000

# 使用MiniTransactionPolicy策略,一个事务中最大处理的行数
trans_max_size=50000

### 按照表名进行过滤时,表名格式为database.table(可以为正则),以逗号分隔
### 当白名单和黑名单同时存在时,只有不在黑名单中同时在白名单中存在的才起作用
#filter_shard_table_black=beidou.*cold
filter_shard_table_white=
filter_shard_table_black=

3. XML文件配置

fountain-config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

    <description>Fountain的Spring配置</description>

    <!-- properties配置文件 -->
    <bean id="producerConfig"
          class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_OVERRIDE"/>
        <property name="ignoreResourceNotFound" value="true"/>
        <property name="ignoreUnresolvablePlaceholders" value="true"/>
        <property name="locations">
            <list>
                <value>classpath:mysql.rowbase51/fountain-config.properties</value>
                <value>classpath:mysql.rowbase51/jdbc-mysql.properties</value>
            </list>
        </property>
    </bean>

    <!-- 基于表名的匹配器,支持黑白名单,支持正则表达式。黑名单的表直接忽略,白名单如果置空,则表示不启用,否则必须符合。
         符合表过滤匹配条件的binlog event会被继续处理。可以自己实现EventMatcher接口,定制匹配 -->
    <bean id="shardTableMatcher" class="net.neoremind.fountain.producer.matcher.TableMatcher">
        <property name="tableWhite" value="${filter_shard_table_white}"></property>
        <property name="tableBlack" value="${filter_shard_table_black}"></property>
    </bean>

    <!-- Row based binlog解析器 -->
    <bean id="defaultParser"
          class="net.neoremind.fountain.producer.parser.impl.DefaultParser"
          scope="prototype">
        <constructor-arg ref="shardTableMatcher"/>
    </bean>

    <!-- 同步点记录的桥接器,一般用于consumer记录同步点,而不是producer,这样可以保证消费完毕才持久化 -->
    <bean id="disposeEventPositionBridge"
          class="net.neoremind.fountain.eventposition.DisposeEventPositionBridgeImpl"></bean>

    <!-- 记录已处理事件的位置,当fountain重新启动或者切换master时按照如下顺序寻找同步点:
         1)找JVM ThreadLocal获取同步点
         2)通过本地的持久化文件寻找,也就是这里配置的bean可以使用装饰模式来不断的扩展保存、读取同步点的方法,用户可以实现DisposeEventPosition接口,定制自己的存储方式或位置表述
         3)通过本XML配置文件的选项找gtid或者binglogfile+position等信息
         4)调用MySQL的一些query命令来寻找Server最后处理的点 -->
    <bean id="disposeEventPosition" class="net.neoremind.fountain.eventposition.ReadonlyDisposeEventPosition"
          scope="prototype">
        <property name="delegate">
            <bean class="net.neoremind.fountain.eventposition.LocalFileBinlogAndOffsetDisposeEventPosition"
                  scope="prototype">
                <property name="rootPath" value="${producer_position}"></property>
                <property name="disposeEventPositionBridge" ref="disposeEventPositionBridge"></property>
            </bean>
        </property>
    </bean>

    <!-- 对于RowsLogEvent的处理策略,MiniTransactionPolicy会按照整个事务积攒一个事务内的所有RowsLogEvent,
         做merge后再通过dispatcher下发到下游 -->
    <bean id="miniransPolicy"
          class="net.neoremind.fountain.producer.dispatch.transcontrol.MiniTransactionPolicy"
          scope="prototype">
        <!-- 事务数据中RowsLogEvent的最大值,配置单位为记录条数,超出这个最大值就丢弃整个事务的数据,缺省为30000 -->
        <property name="maxTransSize" value="${trans_max_size}"></property>
    </bean>

    <!--  fountain-producer和consumer直接的缓冲队列 -->
    <bean id="fountainMQ" class="net.neoremind.fountain.common.mq.MultiPermitsMemFountainMQ">
        <constructor-arg index="0">
            <value>${memq_limit}</value>
        </constructor-arg>
    </bean>

    <!-- 通过MQ为通道传输变化的数据传输层 -->
    <bean id="memqTransport"
          class="net.neoremind.fountain.producer.dispatch.fountainmq.FoutainMQTransport">
        <property name="fmq" ref="fountainMQ"/>
    </bean>

    <!-- 变化的数据的下发流程,包括转换、序列化、和传输 -->
    <bean id="dispatchWorkflow"
          class="net.neoremind.fountain.producer.dispatch.DefaultDispatchWorkflow">
        <!-- 转化ChangeDataSet对象到其他的java对象的转化器,用户可以实现EventConverter接口定制转化器,以期转化为对用户更为友好的java对象
             缺省是DefaultEventConverter,它不做任何转化,直接输出ChangeDataSet -->
        <!-- 配置数据传输层 -->
        <property name="tranport" ref="memqTransport"/>
        <!-- <property name="packProtocol" ref="packProtocol"/> -->
        <!-- <property name="transFilter" ref="transFilter"/> -->
    </bean>

    <!-- 真正用于处理消息即变化数据的对象,实现Consumer接口。这一部分涉及使用方的业务逻辑,必须要使用方自行实现,
         如果使用方不使用ConsumerWorkflow接口的缺省实现net.neoremind.fountain.consumer.DefaultConsumerWorkflow, 这部分可以忽略 -->
    <bean id="consumerFromMemMQ" class="net.neoremind.fountain.test.consumer.TestConsumer">
        <property name="bridge" ref="disposeEventPositionBridge"></property>
    </bean>

    <!-- 最终消费、使用变化数据的流程,内置缺省实现是DefaultConsumerWorkflow,它需要反序列化之后调用Consumer接口实现,使用方可以
         通过实现ConsumerWorkflow接口来实现自己的消费流程,此时实现Consumer接口就不是必要的 -->
    <bean id="consumerWorkflowFromMemMQ"
          class="net.neoremind.fountain.consumer.spi.def.DefaultConsumerWorkflow">
        <property name="consumer" ref="consumerFromMemMQ"></property>
        <!-- <property name="recievedDataConverter" ref=""></property> -->
        <!-- <property name="unPackProtocol" ref="unPackProtocol"></property> -->
    </bean>

    <!-- 消费者监听线程 -->
    <bean id="fountainMQMessageListener"
          class="net.neoremind.fountain.consumer.support.fountainmq.FountainMQMessageListener"
          init-method="start" destroy-method="destroy">
        <property name="fmq" ref="fountainMQ"/>
        <property name="workflow" ref="consumerWorkflowFromMemMQ"></property>
    </bean>

    <!-- MySQL binlog dump的策略,针对不同的MySQL版本可以采用如下策略,
         1)BinlogGtIdAresV51DumpStrategy,针对百度自己的MySQL Ares 5.1版本使用gtid
         2)BinlogFileNamePositionDumpStrategy,所有版本MySQL都支持的传统通过binlog filename + position
         3)BinlogGtIdV56DumpStrategy,MySQL5.6之后支持的gtid set dump
         这里注意,默认地isChecksumSupport=false,BinlogGtIdV56DumpStrategy则恒为true,
         对于如果其他版本的MySQL master支持checksum,需要设置isChecksumSupport为true,目前fountain对于4个byte的校验和做忽略处理-->
    <bean id="binlogFileNamePositionDumpStrategy"
          class="net.neoremind.fountain.producer.datasource.binlogdump.BinlogFileNamePositionDumpStrategy">
        <property name="binlogFileName" value="${mysql_binlogdump_filename}"/>
        <property name="binlogPosition" value="${mysql_binlogdump_position}"/>
        <!--<property name="isChecksumSupport" value="false"/>-->
    </bean>

    <!-- 配置数据源和数据监控器,它们成对出现,数据监控器称之为fountain-producer,每个fountain-producer绑定一个数据源,
         一般数据源是ha数据源。每个fountain-producer是一个线程。如果有多个数据源,可以配置多个配置数据源和数据监控器对 -->
    <!-- 配置需要监控的mysql数据源,支持ha,可以定制多个具体的数据源,一般定制2个,一主一备 -->
    <bean id="groupIdHAMysqlBinlogDataSource00"
          class="net.neoremind.fountain.producer.datasource.ha.HAMysqlBinlogDataSource"
          init-method="init">
        <property name="disposeEventPosition" ref="disposeEventPosition"></property>
        <property name="datasourceChoosePolicy">
            <bean class="net.neoremind.fountain.datasource.RoundRobinDatasourceChoosePolicy">
                <property name="tryInterval" value="3000"></property>
            </bean>
        </property>
        <property name="mysqlDataSourceList">
            <list>
                <bean class="net.neoremind.fountain.producer.datasource.MysqlBinlogDataSource">
                    <property name="conf">
                        <bean class="net.neoremind.fountain.datasource.DatasourceConfigure">
                            <property name="mysqlServer" value="${mysql_shard_0_server}"></property>
                            <property name="mysqlPort" value="${mysql_shard_0_port}"></property>
                            <property name="waitTimeout" value="${mysql_wait_timeout}"/>
                            <property name="netWriteTimeout" value="${mysql_net_write_timeout}"/>
                            <property name="netReadTimeout" value="${mysql_net_read_timeout}"/>
                            <property name="charset" value="${mysql_charset}"/>
                            <property name="userName" value="${mysql_username}"></property>
                            <property name="password" value="${mysql_password}"></property>
                            <property name="soTimeout" value="${mysql_replication_socket_so_timeout}"></property>
                        </bean>
                    </property>
                    <property name="binlogDumpStrategy" ref="binlogFileNamePositionDumpStrategy"></property>
                    <property name="slaveId" value="${mysql_shard_0_slaveId}"></property>
                </bean>
                <bean class="net.neoremind.fountain.producer.datasource.MysqlBinlogDataSource">
                    <property name="conf">
                        <bean class="net.neoremind.fountain.datasource.DatasourceConfigure">
                            <property name="mysqlServer" value="${mysql_shard_0_ha1_server}"></property>
                            <property name="mysqlPort" value="${mysql_shard_0_ha1_port}"></property>
                            <property name="waitTimeout" value="${mysql_wait_timeout}"/>
                            <property name="netWriteTimeout" value="${mysql_net_write_timeout}"/>
                            <property name="netReadTimeout" value="${mysql_net_read_timeout}"/>
                            <property name="charset" value="${mysql_charset}"/>
                            <property name="userName" value="${mysql_username}"></property>
                            <property name="password" value="${mysql_password}"></property>
                            <property name="soTimeout" value="${mysql_replication_socket_so_timeout}"></property>
                        </bean>
                    </property>
                    <property name="binlogDumpStrategy" ref="binlogFileNamePositionDumpStrategy"></property>
                    <property name="slaveId" value="${mysql_shard_0_ha1_slaveId}"></property>
                </bean>
            </list>
        </property>
    </bean>

    <!-- 监控mysql数据变化的监控器,称之为fountain-producer -->
    <bean id="producer00" class="net.neoremind.fountain.producer.DefaultProducer"
          init-method="start" destroy-method="destroy">
        <!-- 事务控制器,缺省使用net.neoremind.fountain.producer.dispatch.transcontrol.NonTransactionPolicy,
             这不是代表没事务,而是事务不完整,当一个RowsLogEvent的数据解析完后就下发,
             一个event只是事务中一张表或一张表的部分数据. 一个完成的事务可能是多张表或者全部数据 -->
        <property name="transactionPolicy" ref="miniransPolicy"></property>
        <!-- 绑定要监控的数据源 -->
        <property name="dataSource" ref="groupIdHAMysqlBinlogDataSource00"></property>
        <!-- 数据解析器 -->
        <property name="parser" ref="defaultParser"></property>
        <!-- event匹配器 -->
        <property name="matcher" ref="shardTableMatcher"></property>
        <!-- 配置数据下发,支持多个下发, 每个下发支持不同的下发流程 -->
        <property name="dispatcher" ref="dispatchWorkflow"/>
    </bean>

</beans>

如果想使用NonTransactionPolicy策略请修改配置如下:

<!-- 对于RowsLogEvent的处理策略,NonTransactionPolicy只要接收到RowsLogEvent,
     便会通过dispatcher下发到下游 -->
<bean id="nonTransactionPolicy"
      class="net.neoremind.fountain.producer.dispatch.transcontrol.NonTransactionPolicy"
      scope="prototype">
</bean>