老白的释然 发表于 2019-2-27 17:08:18

Spark大数据WordCount代码实例演示

//手动编写一个wordcount程序
//要导入Strom的jar包
//把lib下所有的jar包包含在工程里就好了
Topology = spout(采集数据) + bolt(s)(处理数据)
//要创建主程序来书写自己的任务
public class WordCountTopology{
        public static void main(String[] args){
                //创建一个任务,并且指定任务的spout组件和多个bolt组件
                TopologyBuilder builder = new TopologyBuilder();
                //指定spout任务用于采集数据
                builder.setSpout("myspout",new WordCountSpout());
                //指定任务的bolt用于分词,计数
                //随机接收数据
                builder.setBolt("mysplit",new WordCountSplitBolt()).shuffleGrouping("myspout");
                //采用字段进行接收,分组的字段是word,根据该字段来进行分组。
                builder.setBolt("mycount", new WordCountTotalBolt()).fieldsGrouping("mysplit",new Fields("words"));
                //创建线程
                StormTopology job = builder.createTopology();
                //直接在eclipse中运行任务
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology("mydemo", new Config(), job);
        }

}
/*
Topology = spout(采集数据) + bolt(s)(处理数据)
Topology任务的组成
*/
//用来采集数据
//继承了BaseRichSpout类之后就可以作为主键来采集数据了
public class WordCountSpout extends BaseRichSpout{
        //模拟一些数据
        private String[] data = {"I love Beijing","I Love China","Beijing is the capital of China"};
        @Override
        public void nextTuple(){
                //如何采集数据,需要将采集的数据发给下一个任务(进行)
                //为了降低采集数据的速度
                Utils.sleep(3000);
                //随机采集一个数据进行发送
                int random = (new Random()).nextInt(3);
                String str = data;
                System.out.println("采集的数据是: " + str);
                //发送采集到的数据
                this.collector.emit(new Values(str));
        }
        //定义发送数据的采集器
        @Override
        public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
                //collector表示:通过他可以把数据发送给下一个任务
                this.collector = collector;
        }
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare){
                //定义发送的数据格式
                declare.declare(new Fields("data"));
        }
       
}
//第一个bolt组件,用于拆分单词组件
public class WordCountSplitBolt extends BaseRichBolt{
        @Override
        public void execute(Tuple tuple){
                //从上一个任务中接受数据,如何处理
                String data = tuple.getStringByField("data");
                //分词: 单词 1
                String [] words = data.split(" ");
                //注意发送的格式要和我们定义的格式是一样的
                for(String w : words){
                        this.collector.emit(new Values(w,1));
                }
        }
        //通过collector将消息发送给下一个任务
        @Override
        public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
                this.collector = collector;
        }
        //定义我们要发送的数据的格式
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare){
                declare.declare(new Fields("word","count"));
        }

}
//任务的第二个bolt组件,用于单词的计数
public class WordCountTotalBolt extends BaseRichBolt{
        private Map<String,Integer> result = new HashMap<>();
        @Override
        public void execute(Tuple tuple){
                //统计每个单词的总频率
                String word = tuple.getStringByField("word");
                int count = tuple.getStringByField("count");
                if(result.containKey(word)){
                        //如果包含累加
                        int total = result.get(word);
                        result.put(word,total + count);
                }
                //表示的是第一次出现
                else{
                        result.put(word, count);
                }
                //输出到屏幕上
                System.out.println("统计的结果是: " + result);
        }
        //通过collector将消息发送给下一个任务
        @Override
        public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
               
        }
        //定义我们要发送的数据的格式
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declare){
               
        }

}

lsekfe 发表于 2019-4-2 15:53:29

看看

lsekfe 发表于 2019-4-2 16:03:55

看看
页: [1]
查看完整版本: Spark大数据WordCount代码实例演示