简介
Spark Streaming可以接收实时流数据从任意的具有内置支持的数据源(比如 Flume, Kafka, Kinesis, files, sockets等等),这就要求开发人员实现一个接收器,用于接收来自有关数据源的数据。当然这个步骤不是必须的,因为在Spark内部的一些”Stream”方法中已经封装了类似Recevier接收器的操作.但是在数据传输的过程中可靠性是很重要的,应用在接受到源信息后,要对源发出确认信息,这种灵活的操作就得采用自定义Recevier.
Recevier机制
要构造自定义的Recevier就要实现抽象类Recevier,它在org.apache.spark.streaming.receiver包中,在老版本中实现的是NetworkRecevier,Recevier其中最重要的是onstart()以及onstop()方法,onstart()用以开始接收数据,onstop()用以停止数据的接收,但是它们两者不能无限期的阻塞。通常情况下,onstart()会是负责接收数据的线程,而onstop()将确保这些线程停止接收数据。接收线程也可以使用isstopped(),Recevier中的方法,返回一个Boolean值,用以检查他们是否应该停止接收数据。当我们接收到数据后就可以调用store()来存储数据,Recevier中重载了很多的store()方法来适应不同的情况.接收线程的所有异常都应该被捕获处理,以避免接收器的故障.重新启动将通过异步调用onstop()然后调用onstart()延迟后重新启动接收器。停止将调用onstop()或者采用isstoped()方式来终止接收器。同时,reporterror将报告一个错误信息的驱动程序(在日志和UI可见)。
Spark Recevier源码如下:
测试
接下来我将写个Demo来测试一下自定义Recevier
首先构造一个自定义的Recevier
构造好Recevier后我们就可以来进行Streaming操作了,源码如下:
与普通方法不同的是,我们调用了receiveStream来返回一个inputStream,它传入的参数是一个Recevier的对象,在这里我们传的是刚刚我们自定义的Recevier对象。