Skip to content

Latest commit

 

History

History
56 lines (46 loc) · 8.06 KB

backpressure.md

File metadata and controls

56 lines (46 loc) · 8.06 KB

背压

背压是使用流处理系统中经常会面对的问题之一(之所以说是之一,是因为我觉得数据倾斜也时常遇到^-^),它在流计算中指的是job处理数据的速度小于数据流入的速度 时的处理机制,通常来说,背压出现的时候,数据会迅速堆积,如果处理不当,会导致资源耗尽甚至任务崩溃。背压的场景非常之多,比如在京东或淘宝的618、双十一 购物节中,当时钟指向0点时,大量的用户开始将喜欢的商品加入购物车并进行结算,此时系统的流量将是平时流量的几倍甚至是十几倍,在这段时间内系统接收到的 数据将远高于其所能处理的数据量,这就是通常所说的"背压"。如果背压不能得到有效的处理,将会耗尽系统资源甚至导致系统崩溃。当然,如果对延迟的要求不太高或 是数据量比较小,背压的影响可能不是那么明显,但是对于大数据量的Flink任务来说,背压会严重影响其checkpoint的时长甚至导致快照失败,从而产生稳定性问题, 而这对于生产环境是十分危险的。

背压可以分为针对访问速率的静态背压和针对资源占用的动态背压两种实现方式,静态背压的实现简单但粗暴,就是直接对发往下游的数据进行提前的限流,这就很难实现动 态调整背压阈值的效果,但是有时又不得不使用,比如当使用了第三方组件且其不支持动态背压时。相比而言,动态限流的逻辑一般实现上会更加的复杂但却可以根据系统 的资源和当时的流量进行动态的控制,因而能最大化资源的使用且避免了不断的对背压流量阈值进行评估和调整。

我们知道目前主要的流处理引擎其实都提供了背压功能,但是具体到每个引擎其实现背压的方式却各不相同。在以前开发运维过的Storm框架中,它采用的是通过监控Bolt 中接收队列的负载情况,如果超过高水位值就将背压信息写入Zookeeper中,然后Zookeeper会通知所有的worker进入背压状态,最后Spout停止发送数据。新版的Storm 采用的是引入高性能无锁缓冲队列Disruptor,当这个队列已满时则停止发送数据,最终将背压一级级的向上传导直到Spout,于是Spout停止从kafka拉取数据。当Disruptor 缓冲队列不再满时,Spout重新从kafka拉取数据,从而实现了整个背压的处理过程。由于Disruptor使用的是环形数组RingBuffer实现,且在读取写入时无锁所以性能 较高。但是需要注意的时一定要通过maxSpoutPending参数来设置该缓冲队列的长度,否则该队列的长度将会无限长,依然会有背压问题,但是该长度的设置就非常难以确 定,设置的太短,会导致频繁的背压,影响数据的处理和吞吐量,设置得过长又起不到良好的背压效果,且容易导致worker节点OOM。

Flink没有继续使用Disruptor这种数据结构,但是其背压的实现依然还是有点类似。具体来讲,Flink中每一个Task都会有一个InputGate和ResultPartition,InputGate 负责接收数据,ResultPartition负责发送数据(当然Source Task没有InputGate,Sink Task没有ResultPartition),而这两个组件都会有一个对应的LocalBufferPool (缓冲池),LocalBufferPool中会有一定量的Buffer(这类buffer叫作Exclusive Buffer,其实就是Flink内存管理的单位MemorySegment的包装类,但它是特定Subtask 独享的)。当缓冲池中已申请的数量达到了上限或没有内存块时(此时,BufferPool中的Floating Buffer也已经被使用完毕),Task就会暂停读取Netty Channel,因此上游发 送端就会立即响应停止发送并进入背压状态,于是上游的写入也会停止,从而将背压逐级向上传递。这就是Flink基于TCP的背压实现的主要思路。

基于TCP的背压有两个明显的弊端:一个是每个TaskManager中可能要执行多个Task,如果多个Task的数据最终都要传输到下游的同一个TaskManager时就会复用同一个 Socket进行传输,此时候如果单个Task产生背压,则会导致复用的Socket整个发生阻塞,其余的Task的数据也无法进行传输,包括Checkpoint Barrier也无法发出导 致下游执行checkpoint的延迟也增大;二是它依赖于最底层的TCP去做流控,会导致背压传播路径过长,生效的延迟也比较大。因此在Flink 1.5版本开始就引入了新的 基于Credit的背压机制。它的原理很类似于TCP的Window机制,相信对于熟悉TCP的我们应该很容易就能理清其原理。

那么什么是基于Credit的背压机制呢?其实就是在数据的发送端维护对应的数据接收端的credit信息,这个信息表示下游还可以接收credit个buffer的数据。每次向下 游发送buffer数据的时候,credit就减去buffer的数量。当credit值为0的时候,就停止向下游发送数据。下游在有新的空闲内存的时候会通知上游有新的credit可用。 上游接收到新增的credit数量之后,更新对应channel的credit数量,即可重新开始向下游发送数据。

以上是一些理论上的讲解,概念上的阐述,主要作用是帮助分析,我们都知道"talk is cheap",所以还是来从源码上进行分析会来的更清楚。

CreditBasedSequenceNumberingViewReader是一个简单的ResultSubpartitionView的简单包装类,其中的属性numCreditsAvailable维护了下游消费端(也就是 下游RemoteInputChannel)的用于存放数据的可用buffer的数量,它表示了下游还可以接收credits个buffer的数据。每次向下游发送数据时都会调用getNextBuffer() 来获取待发送的数据,此时会对numCreditsAvailable的值自减。一旦credits的值变为0时即抛出IllegalStateException没有可用的credit异常,于是停止发送。当下 游RemoteInputChannel中的数据被消费后空闲出内存再通过notifyCreditAvailable通知上游重新开始发送,通知用的方法是notifyCreditAvailable(),这个方法会 在回收内存方法recycle()、监听器发现有缓存可用方法notifyBufferAvailable()、分配积压任务所需内存方法onSenderBacklog()时调用。通过追溯源码,我们发现 notifyCreditAvailable()方法最终调用了CreditBasedPartitionRequestClientHandler类的notifyCreditAvailable()方法,这个方法最终是通过EventExecutor 发送出去了一个UserEvent。CreditBasedPartitionRequestClientHandler类的userEventTriggered()方法会得到响应,在userEventTriggered()这个方法中, 会调用writeAndFlushNextMessageIfPossible()方法尝试对队列中的每一个Input Channel写入还没有上报的可用credits数量并刷新,上报信息会被封装为AddCredit 的Netty消息,并被PartitionRequestServerHandler的channelRead0()方法读取到,在其中判断消息类型如果是AddCredit类型,则将其放入PartitionRequestQueue 中,在其中的addCredit()方法中会根据receiverId获取NetworkSequenceViewReader,并调用它的addCredit()方法,由于NetworkSequenceViewReader就是上游的 CreditBasedSequenceNumberingViewReader,因此会直接调用其addCredit()方法将ResultSubpartitionView的可用credits增加,在增加了可用credits数量后,还 会调用enqueueAvailableReader()方法将reader加入到可用reader列表中,如果可用reader列表之前为空,还需要将reader对应的ResultSubpartitionView的buffer 发送到下游,在其中会调用getNextBuffer()方法,它会将numCreditsAvailable减一之后判断是否还有可用的credit,如果没有则抛出IllegalStateException异常。

CreditBasedPartitionRequestClientHandler类的writeAndFlushNextMessageIfPossible()方法会调用RemoteInputChannel类的getAndResetUnannouncedCredit() 方法获取到未上报的credits数即属性unannouncedCredit并将其值清零。

整个Flink基于Credit的背压机制分析到这里也就差不多了,其实现原理经过我们的分析之后其实相当的简单,抽象一下其实就是一个分布式环境下的生产者消费者模型,其核心 就是credit的上报和通知。