Spark Streaming重要操作列举

离散流简介

在Spark Streaming中新增加了一个重要概念就是离散流(DStreams),它代表了一个连续的数据流,或从数据源接收到的,或通过将输入流中产生处理后的数据流。在内部,它是由RDDS的连续序列表示,这是Spark的一个不可改变的,分布式的数据集抽象。任何在DStream上使用的操作都会转化为底层RDDS操作。而DStream最重要的就是输入,转换,以及输出,今天介绍的将是后面的两部分.

离散流转换

类似RDD,转换允许输入的流可以被修改。DStreams支持多种转换的类似Spark RDD的使用。

1
map(func)

源DSTREAM的每个元素通过函数func返回一个新的 DSTREAM。

1
flatMap(func)

类似的map(),但每个输入项可以被映射到0或者更多的输出项

1
filter(func)

在源DSTREAM上选择FUNC函数返回仅为true的记录, 最终返回一个新的DSTREAM 。

1
repartition(numPartitions)

通过创建更多或更少的分区改变平行于这个DSTREAM的层次。

1
union(otherStream)

返回一个包含源DStream与其他DStream元素联合后新的DSTREAM。

1
count()

在源DSTREAM的每RDD的数目计数并返回包含单元素RDDS新的DSTREAM。返回的依然是DSTREAM,只不过里面只有count值

1
reduce(func)

使用函数func(有两个参数并返回一个结果)将源DStream中的每个RDD进行元素聚合,返回一个单元素RDDs新的DStream.该函数应该关联,使得它可以并行地进行计算。
例如对netcat上的数据求和可用下面代码:

1
lines.flatMap(_.split(" ")).map(w=>w.toInt).reduce(_+_).foreachRDD(e=>e.foreach(println))

1
countByValue()

当请求DStream类型为K元素的DStream,返回类型为 (K,Long)的键值对的新DSTREAM,其中每个键的值就是它的源DSTREAM每个RDD频率。
例如我在netcat上面输入d d f f g g h h,然后执行下面源码:

1
lines.flatMap(_.split(" ")).countByValue().foreachRDD(e=>e.foreach(println))

会得到如下结果

1
2
3
4
(d,2)
(h,2)
(f,2)
(g,2)

1
reduceByKey(func, [numTasks])

当一个类型为(K,V)键值对的DStream被调用的时候,返回类型为类型为(K,V)键值对的新DStream,其中每个键的值都是使用给定的reduce函数汇总。注意:默认情况下,使用Spark的并行任务默认号码(2为本地模式,并且在集群模式的数目是由配置属性确定 spark.default.parallelism)进行分组。你可以通过一个可选numTasks参数设置不同数量的任务。
这个我用wordcount的例子来说明,源码如下

1
2
3
4
val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
ssc.start()

在这个例子中我们对netcat推过来的单词进行了一次map()操作,为每个单词计数为1,然后我们得到了(单词1,1),(单词2,1)…,然后我们进行reduceByKey(+)把key相同的value相加,和RDD中的reduceByKey操作类似.

1
join(otherStream, [numTasks])

当被调用类型分别为的(K,V)和(K,W)键值对的2 个 DStream时,返回包含所有键值对每个键的元素,类型为(K,(V,W))键值对的一个新DSTREAM。
如下图例子


1
cogroup(otherStream, [numTasks])

当被调用的DStream含有(K, V) 和(K, W)键值对时,返回(K, Seq[V], Seq[W])元组的新DStream.

1
transform(func)

通过源DSTREAM的每RDD应用RDD-to-RDD函数返回一个新的DSTREAM。这可以用来在DStream做任意RDD操作。
虽然transform()的一些操作可以用map()等函数代替,但是无疑transform()的灵活性更高.它可以用以API中没有的RDD操作,而且操作不暴露在外面.

1
updateStateByKey(func)

返回一个新 “状态”的DStream,所在每个键的状态是根据键的前一个状态和键的新值应用给定函数后的更新。这可以被用来维持每个键的任何状态数据。
该updateStateByKey()操作可以让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,你就必须做两个步:

1
2
1.定义检查点 - 用以进行检查更新,设置的应该是一个目录,里面会存每次读取的数据。
2.定义状态更新函数 - 用一个函数指定如何使用先前的状态,从输入流中的新值更新状态。

下面我将写的Demo测试一下,我们前面写的wordcount只能对每次输入的数据进行计数,以前输入的数据就不在计数范围之内了,现在我们要做的就是要能对以前输入的数据也放在计数范围之内.源码如下:

转换操作除了这些外还有一部分的窗口转换操作,允许你通过滑动窗口对数据进行转换.这些方法有如下一些:

1
window(windowLength, slideInterval)

返回一个基于源DStream的窗口批次计算得到新的 DStream.例如看下面代码:

1
lines.flatMap(_.split(" ")).window(Seconds(30),Seconds(10)).print()

这段代码用了window,它表示的是每10秒取一次Netcat上的最近30秒的数据.所以如果我们在netcat上按照每隔一秒输入递增数字(1,2,3,4,…)

1
2
3
4
5
在第一次输出时会是1,2,3,4,5,6,7,8,9,10
在第二次输出时会是1,2,3,4,...17,18,19,20
在第三次输出时会是1,2,3,4,...27,28,29,30
在第四次输出时就是11,12,13,14....38,39,40
...

1
countByWindow(windowLength,slideInterval)

返回一个流中的元素的滑动窗口计数。
如果你前面创建StreamingContext的时候使用了含有batchDuration: Duration的重载,那么你的windowLength,slideInterval必须是你前面定义的batchDuration的倍数.

1
reduceByWindow(func, windowLength,slideIn terval)

返回一个新的单件流,通过FUNC及在使用滑动间隔对数据流 中的元素聚合。该函数应该联合以便它可以正确地在并行计算。

1
reduceByKeyAndWindow(func,windowLengt h, slideInterval, [numTasks])

当调用DStream(K,V)对时,返回新(K,V)对的DSTREAM ,其中每个键的值是根据给定的reduce函数FUNC 和滑动窗口批次汇总。注意:默认情况下,这里使用Spark的并行任务默认号码(2本地模式,并在集群模式下的数量由配置属性决定spark.default.parallelism)做分组。你可以通过一个可选 numTasks参数设置不同数量的任务。

1
lines.flatMap(_.split(" ")).map(x=> (x, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b,Seconds(10),Seconds(5)).print(100)

这段代码,也是wordcount的变形,是每隔5秒对最近10秒的单词进行wordcount操作.

1
countByValueAndWindow(windowLength,sli deInterval, [numTasks])

当被调用的(K,V)对的DStream,返回新的(K,Long)对的DSTREAM,对其中每个键的值是其在一个滑动窗口频 率。和reduceByKeyAndWindow 一样,减少任务的数量是通过一个可选的参数进行配置。

离散流输出:

1
print()

首先在Dirver上打印每一批DStream数据中的10个元素。这对于开发和调试。

1
saveAsObjectFiles(prefix, [suffix])

将DSTREAM的内容为序列化对象并保存为SequenceFile文件。根据产生在每个批次间隔的文件名前缀和后缀:“前缀TIME_IN_MS [.后缀]” 。

1
saveAsTextFiles(prefix, [suffix])

保存此DSTREAM的内容作为文本文件。根据产生在每个批次间隔的文件名前缀和后缀:“前缀TIME_IN_MS [.后缀]” 。

1
saveAsHadoopFiles(prefix, [suffix])

保存此DSTREAM的内容为Hadoop的文件。根据产生在每个批次间隔的文件名前缀和后缀:“前缀TIME_IN_MS [.后缀]” 。

1
foreachRDD(func)

最通用的输出操作,是使用函数FUNC,从流中生成的每个RDD。这个函数可以将每个RDD的数据推送到外部系统,如保存RDD为文件,或通过网络写到数据库。注意,函数FUNC是在driver执行,并且通常将具有在它的RDD行为,将强制执行RDDS的计算。
通常将数据写入到外部系统需要创建一个连接对象(如TCP连接到远程服务器),并用它来发送数据到远程系统。我们可以这样写:

1
2
3
4
5
6
7
8
dstream.foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection)
// return to the pool for future reuse
}) })

DStreams由输出操作延迟方式执行的,就像RDDS由RDD actions延迟方式执行。具体来讲,DSTREAM输出操作内部的RDD actions迫使所接收的数据的处理。因此,如果你的应用程序没有任何输出操作,或在内部没有任何RDD action输出操作,如 dstream.foreachRDD(),那么什么都不会得到执行。系统将简单地接收的数据,并丢弃它.默认情况下,输出操作有一个就会执行一次。并且按照它们在它们的应用中定义的顺序执行。