Two Ways to Connect Kafka to Spark Streaming
Published: 2019-06-30


Code for the first approach, a receiver-based stream (KafkaUtils.createStream), which consumes through ZooKeeper and buffers received data in executors before processing:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  // For each word x: add this batch's counts (y) to the running total carried in state (z)
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    // updateStateByKey needs a checkpoint directory to persist state between batches
    ssc.checkpoint("c://ck2")
    // e.g. topics = "alog-2016-04-16,alog-2016-04-17,alog-2016-04-18" with numThreads = 2
    // becomes Map(alog-2016-04-16 -> 2, alog-2016-04-17 -> 2, alog-2016-04-18 -> 2)
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // Not in the original handout, but required: without at least one output operation (action)
    // the job fails with java.lang.IllegalArgumentException: requirement failed:
    // No output operations registered, so nothing to execute
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
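Two notes on this listing. First, LoggerLevels is a helper object not shown in the post; a minimal sketch that makes the code compile (an assumption, modeled on the StreamingExamples utility bundled with Spark's examples) could be:

import org.apache.log4j.{Level, Logger}

object LoggerLevels {
  // Silence Spark's verbose INFO logging so the printed word counts stay readable
  def setStreamingLogLevels(): Unit = {
    // Only override the level if the user has not configured log4j themselves
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

Second, the iterator-based updateFunc above is the per-partition overload of updateStateByKey; the same logic written against the simpler per-key overload reads:

// Per-key equivalent of updateFunc: add this batch's counts to the running total
val simpleUpdateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(newValues.sum + runningCount.getOrElse(0))
// usage: words.map((_, 1)).updateStateByKey[Int](simpleUpdateFunc)

The program expects four arguments, e.g. localhost:2181 group1 test-topic 2 (ZooKeeper quorum, consumer group, comma-separated topics, threads per topic); these values are placeholders.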

Code for the second approach, a direct (receiverless) stream (createDirectStream), which reads each Kafka partition directly and leaves offset management to the application:

import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {

  /*  def dealLine(line: String): String = {
        val list = line.split(',').toList
        // val list = AnalysisUtil.dealString(line, ',', '"') // treat dealString as a fancy split
        list(0).substring(0, 10) + "-" + list(26)
      }*/

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    // flatMap (not map) so we key by individual words rather than whole arrays
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more Kafka topics to consume from
           |  <groupId> is a consumer group
        """.stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // Process the batch first...
        processRdd(rdd)
        // ...then commit the offsets to ZooKeeper, so offsets only advance
        // after the batch has been processed successfully
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
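Note that KafkaManager is not a Spark class: it is a third-party helper, typically copied into the org.apache.spark.streaming.kafka package so it can reach Spark's package-private Kafka utilities, that wraps createDirectStream, seeds missing offsets from ZooKeeper, and exposes updateZKOffsets. For reference, a minimal sketch of the same process-then-commit pattern against the stock 0.8 direct API (no KafkaManager; actually persisting the offsets is stubbed out) might look like:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topicsSet)

stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Capture offset ranges from the KafkaRDD before its type information is lost
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // Process the batch first...
    processRdd(rdd)
    // ...then record where each partition ended; a real job would write
    // o.untilOffset to ZooKeeper/HBase/etc. instead of printing it
    offsetRanges.foreach(o => println(s"${o.topic}/${o.partition} -> ${o.untilOffset}"))
  }
}

Committing after processing gives at-least-once semantics: a failure mid-batch means the batch is reprocessed rather than dropped, the same trade-off the KafkaManager version makes.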

This article was reposted from SummerChill's cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/6810238.html. Please contact the original author before republishing.
