51Testing软件测试论坛

 找回密码
 (注-册)加入51Testing

QQ登录

只需一步,快速开始

微信登录,快人一步

手机号码,快捷登录

查看: 2626|回复: 2
打印 上一主题 下一主题

[原创] Spark大数据WordCount代码实例演示

[复制链接]

该用户从未签到

跳转到指定楼层
1#
发表于 2019-2-27 17:08:18 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. //手动编写一个wordcount程序
  2. //要导入Strom的jar包
  3. //把lib下所有的jar包包含在工程里就好了
  4. Topology = spout(采集数据) + bolt(s)(处理数据)
  5. //要创建主程序来书写自己的任务
  6. public class WordCountTopology{
  7.         public static void main(String[] args){
  8.                 //创建一个任务,并且指定任务的spout组件和多个bolt组件
  9.                 TopologyBuilder builder = new TopologyBuilder();
  10.                 //指定spout任务用于采集数据
  11.                 builder.setSpout("myspout",new WordCountSpout());
  12.                 //指定任务的bolt用于分词,计数
  13.                 //随机接收数据
  14.                 builder.setBolt("mysplit",new WordCountSplitBolt()).shuffleGrouping("myspout");
  15.                 //采用字段进行接收,分组的字段是word,根据该字段来进行分组。
  16.                 builder.setBolt("mycount", new WordCountTotalBolt()).fieldsGrouping("mysplit",new Fields("words"));
  17.                 //创建线程
  18.                 StormTopology job = builder.createTopology();
  19.                 //直接在eclipse中运行任务
  20.                 LocalCluster cluster = new LocalCluster();
  21.                 cluster.submitTopology("mydemo", new Config(), job);
  22.         }

  23. }
  24. /*
  25. Topology = spout(采集数据) + bolt(s)(处理数据)
  26. Topology任务的组成
  27. */
  28. //用来采集数据
  29. //继承了BaseRichSpout类之后就可以作为主键来采集数据了
  30. public class WordCountSpout extends BaseRichSpout{
  31.         //模拟一些数据
  32.         private String[] data = {"I love Beijing","I Love China","Beijing is the capital of China"};
  33.         @Override
  34.         public void nextTuple(){
  35.                 //如何采集数据,需要将采集的数据发给下一个任务(进行)
  36.                 //为了降低采集数据的速度
  37.                 Utils.sleep(3000);
  38.                 //随机采集一个数据进行发送
  39.                 int random = (new Random()).nextInt(3);
  40.                 String str = data[random];
  41.                 System.out.println("采集的数据是: " + str);
  42.                 //发送采集到的数据
  43.                 this.collector.emit(new Values(str));
  44.         }
  45.         //定义发送数据的采集器
  46.         @Override
  47.         public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
  48.                 //collector表示:通过他可以把数据发送给下一个任务
  49.                 this.collector = collector;
  50.         }
  51.         @Override
  52.         public void declareOutputFields(OutputFieldsDeclarer declare){
  53.                 //定义发送的数据格式
  54.                 declare.declare(new Fields("data"));
  55.         }
  56.        
  57. }
  58. //第一个bolt组件,用于拆分单词组件
  59. public class WordCountSplitBolt extends BaseRichBolt{
  60.         @Override
  61.         public void execute(Tuple tuple){
  62.                 //从上一个任务中接受数据,如何处理
  63.                 String data = tuple.getStringByField("data");
  64.                 //分词: 单词 1
  65.                 String [] words = data.split(" ");
  66.                 //注意发送的格式要和我们定义的格式是一样的
  67.                 for(String w : words){
  68.                         this.collector.emit(new Values(w,1));
  69.                 }
  70.         }
  71.         //通过collector将消息发送给下一个任务
  72.         @Override
  73.         public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
  74.                 this.collector = collector;
  75.         }
  76.         //定义我们要发送的数据的格式
  77.         @Override
  78.         public void declareOutputFields(OutputFieldsDeclarer declare){
  79.                 declare.declare(new Fields("word","count"));
  80.         }

  81. }
  82. //任务的第二个bolt组件,用于单词的计数
  83. public class WordCountTotalBolt extends BaseRichBolt{
  84.         private Map<String,Integer> result = new HashMap<>();
  85.         @Override
  86.         public void execute(Tuple tuple){
  87.                 //统计每个单词的总频率
  88.                 String word = tuple.getStringByField("word");
  89.                 int count = tuple.getStringByField("count");
  90.                 if(result.containKey(word)){
  91.                         //如果包含累加
  92.                         int total = result.get(word);
  93.                         result.put(word,total + count);
  94.                 }
  95.                 //表示的是第一次出现
  96.                 else{
  97.                         result.put(word, count);
  98.                 }
  99.                 //输出到屏幕上
  100.                 System.out.println("统计的结果是: " + result);
  101.         }
  102.         //通过collector将消息发送给下一个任务
  103.         @Override
  104.         public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
  105.                
  106.         }
  107.         //定义我们要发送的数据的格式
  108.         @Override
  109.         public void declareOutputFields(OutputFieldsDeclarer declare){
  110.                
  111.         }

  112. }
复制代码

分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏
回复

使用道具 举报

本版积分规则

关闭

站长推荐上一条 /1 下一条

小黑屋|手机版|Archiver|51Testing软件测试网 ( 沪ICP备05003035号 关于我们

GMT+8, 2024-11-22 03:06 , Processed in 0.065074 second(s), 22 queries .

Powered by Discuz! X3.2

© 2001-2024 Comsenz Inc.

快速回复 返回顶部 返回列表