自定义Recevier详解

简介

Spark Streaming可以接收实时流数据从任意的具有内置支持的数据源(比如 Flume, Kafka, Kinesis, files, sockets等等),这就要求开发人员实现一个接收器,用于接收来自有关数据源的数据。当然这个步骤不是必须的,因为在Spark内部的一些”Stream”方法中已经封装了类似Recevier接收器的操作.但是在数据传输的过程中可靠性是很重要的,应用在接受到源信息后,要对源发出确认信息,这种灵活的操作就得采用自定义Recevier.

Recevier机制

要构造自定义的Recevier就要实现抽象类Recevier,它在org.apache.spark.streaming.receiver包中,在老版本中实现的是NetworkRecevier,Recevier其中最重要的是onstart()以及onstop()方法,onstart()用以开始接收数据,onstop()用以停止数据的接收,但是它们两者不能无限期的阻塞。通常情况下,onstart()会是负责接收数据的线程,而onstop()将确保这些线程停止接收数据。接收线程也可以使用isstopped(),Recevier中的方法,返回一个Boolean值,用以检查他们是否应该停止接收数据。当我们接收到数据后就可以调用store()来存储数据,Recevier中重载了很多的store()方法来适应不同的情况.接收线程的所有异常都应该被捕获处理,以避免接收器的故障.重新启动将通过异步调用onstop()然后调用onstart()延迟后重新启动接收器。停止将调用onstop()或者采用isstoped()方式来终止接收器。同时,reporterror将报告一个错误信息的驱动程序(在日志和UI可见)。
Spark Recevier源码如下:

测试

接下来我将写个Demo来测试一下自定义Recevier
首先构造一个自定义的Recevier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
//自定义接收器
class CustomReceiver(host: String, port: Int)
extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {
def onStart() {
//启动接受数据的线程
new Thread("Socket Receiver") {
override def run() { receive() }
}.start()
}
def onStop() {
// 这里没有调用Receiver的线程
//如果自身isStopped()返回的是false则停止
}
/** 创建一个套接字连接和接收数据,直到Receiver被停止 */
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// 连接主机:端口
socket = new Socket(host, port)
//直到停止或连接中断否则继续读取
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
userInput = reader.readLine()
while(!isStopped && userInput != null) {
store(userInput)
userInput = reader.readLine()
}
reader.close()
socket.close()
// 当服务器再次激活时,重新启动重新连接
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// 如果不能连接到服务器则重新启动
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// 如果出现错误则重新启动
restart("Error receiving data", t)
} }}

构造好Recevier后我们就可以来进行Streaming操作了,源码如下:

与普通方法不同的是,我们调用了receiveStream来返回一个inputStream,它传入的参数是一个Recevier的对象,在这里我们传的是刚刚我们自定义的Recevier对象。