Code for the first approach (receiver-based KafkaUtils.createStream with updateStateByKey):
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaWordCount {
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    //iter.flatMap(it => Some(it._2.sum + it._3.getOrElse(0)).map(x => (it._1, x)))
    iter.flatMap { case (x, y, z) => Some(y.sum + z.getOrElse(0)).map(i => (x, i)) }
  }

  def main(args: Array[String]) {
    LoggerLevels.setStreamingLogLevels()
    val Array(zkQuorum, group, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
    ssc.checkpoint("c://ck2")
    // e.g. topics = "alog-2016-04-16,alog-2016-04-17,alog-2016-04-18"
    // gives topicMap = Map((alog-2016-04-16, 2), (alog-2016-04-17, 2), (alog-2016-04-18, 2))
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
    val data = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, StorageLevel.MEMORY_AND_DISK_SER)
    val words = data.map(_._2).flatMap(_.split(" "))
    val wordCounts = words.map((_, 1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
    // This line was missing from the instructor's code file. At least one output action is required,
    // otherwise the job fails with:
    // java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
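The listing calls LoggerLevels.setStreamingLogLevels(), a helper that is not shown in the post. Assuming it does nothing more than quiet Spark's verbose INFO output (the same job Spark's own StreamingExamples helper does), a minimal sketch of such a helper could look like this:

import org.apache.log4j.{Level, Logger}

object LoggerLevels {
  // Hypothetical stand-in for the helper used above: raise the root log level
  // to WARN so the word-count output is not drowned out by Spark's INFO logs.
  def setStreamingLogLevels(): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
  }
}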
Code for the second approach (direct stream with manual offset management):
import kafka.serializer.StringDecoder
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.{KafkaManager, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {

  /* def dealLine(line: String): String = {
    val list = line.split(',').toList
    // val list = AnalysisUtil.dealString(line, ',', '"') // treat dealString as a quote-aware split
    list.get(0).substring(0, 10) + "-" + list.get(26)
  } */

  def processRdd(rdd: RDD[(String, String)]): Unit = {
    val lines = rdd.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.foreach(println)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics> <groupId>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |  <groupId> is a consumer group
           |
        """.stripMargin)
      System.exit(1)
    }

    Logger.getLogger("org").setLevel(Level.WARN)

    val Array(brokers, topics, groupId) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    sparkConf.setMaster("local[*]")
    sparkConf.set("spark.streaming.kafka.maxRatePerPartition", "5")
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String](
      "metadata.broker.list" -> brokers,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )

    val km = new KafkaManager(kafkaParams)

    val messages = km.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    messages.foreachRDD(rdd => {
      if (!rdd.isEmpty()) {
        // process the messages first
        processRdd(rdd)
        // then commit the offsets back to ZooKeeper
        km.updateZKOffsets(rdd)
      }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
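Note that KafkaManager is not part of Spark itself; it is a small user-written wrapper (typically placed under the org.apache.spark.streaming.kafka package) that restores the consumer group's offsets from ZooKeeper before creating the direct stream and writes them back after each batch. If that wrapper is not available, the stock Spark 1.x / Kafka 0.8 direct API still exposes the per-partition offsets through HasOffsetRanges. The following is only a sketch, with an assumed broker address and topic name, and it leaves the actual persistence of the offsets to you:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

object DirectStreamOffsetsSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("DirectStreamOffsetsSketch").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // Assumed broker list and topic for illustration only; replace with your own.
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "localhost:9092")
    val topicsSet = Set("test")

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    stream.foreachRDD { rdd =>
      // Each partition of a direct-stream RDD maps 1:1 to a Kafka topic partition,
      // so the consumed offset ranges can be read straight off the RDD.
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // Process the batch first, then persist the offsets (to ZooKeeper, a database, ...)
      // yourself; printing them here stands in for that step.
      rdd.map(_._2).flatMap(_.split(" ")).map((_, 1L)).reduceByKey(_ + _).foreach(println)
      offsetRanges.foreach(o => println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}"))
    }

    ssc.start()
    ssc.awaitTermination()
  }
}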
This article is reposted from SummerChill's cnblogs blog. Original link: http://www.cnblogs.com/DreamDrive/p/6810238.html. Please contact the original author directly for permission to republish.