|
- //手动编写一个wordcount程序
- //要导入Strom的jar包
- //把lib下所有的jar包包含在工程里就好了
- Topology = spout(采集数据) + bolt(s)(处理数据)
- //要创建主程序来书写自己的任务
- public class WordCountTopology{
- public static void main(String[] args){
- //创建一个任务,并且指定任务的spout组件和多个bolt组件
- TopologyBuilder builder = new TopologyBuilder();
- //指定spout任务用于采集数据
- builder.setSpout("myspout",new WordCountSpout());
- //指定任务的bolt用于分词,计数
- //随机接收数据
- builder.setBolt("mysplit",new WordCountSplitBolt()).shuffleGrouping("myspout");
- //采用字段进行接收,分组的字段是word,根据该字段来进行分组。
- builder.setBolt("mycount", new WordCountTotalBolt()).fieldsGrouping("mysplit",new Fields("words"));
- //创建线程
- StormTopology job = builder.createTopology();
- //直接在eclipse中运行任务
- LocalCluster cluster = new LocalCluster();
- cluster.submitTopology("mydemo", new Config(), job);
- }
-
- }
- /*
- Topology = spout(采集数据) + bolt(s)(处理数据)
- Topology任务的组成
- */
- //用来采集数据
- //继承了BaseRichSpout类之后就可以作为主键来采集数据了
- public class WordCountSpout extends BaseRichSpout{
- //模拟一些数据
- private String[] data = {"I love Beijing","I Love China","Beijing is the capital of China"};
- @Override
- public void nextTuple(){
- //如何采集数据,需要将采集的数据发给下一个任务(进行)
- //为了降低采集数据的速度
- Utils.sleep(3000);
- //随机采集一个数据进行发送
- int random = (new Random()).nextInt(3);
- String str = data[random];
- System.out.println("采集的数据是: " + str);
- //发送采集到的数据
- this.collector.emit(new Values(str));
- }
- //定义发送数据的采集器
- @Override
- public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
- //collector表示:通过他可以把数据发送给下一个任务
- this.collector = collector;
- }
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declare){
- //定义发送的数据格式
- declare.declare(new Fields("data"));
- }
-
- }
- //第一个bolt组件,用于拆分单词组件
- public class WordCountSplitBolt extends BaseRichBolt{
- @Override
- public void execute(Tuple tuple){
- //从上一个任务中接受数据,如何处理
- String data = tuple.getStringByField("data");
- //分词: 单词 1
- String [] words = data.split(" ");
- //注意发送的格式要和我们定义的格式是一样的
- for(String w : words){
- this.collector.emit(new Values(w,1));
- }
- }
- //通过collector将消息发送给下一个任务
- @Override
- public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
- this.collector = collector;
- }
- //定义我们要发送的数据的格式
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declare){
- declare.declare(new Fields("word","count"));
- }
-
- }
- //任务的第二个bolt组件,用于单词的计数
- public class WordCountTotalBolt extends BaseRichBolt{
- private Map<String,Integer> result = new HashMap<>();
- @Override
- public void execute(Tuple tuple){
- //统计每个单词的总频率
- String word = tuple.getStringByField("word");
- int count = tuple.getStringByField("count");
- if(result.containKey(word)){
- //如果包含累加
- int total = result.get(word);
- result.put(word,total + count);
- }
- //表示的是第一次出现
- else{
- result.put(word, count);
- }
- //输出到屏幕上
- System.out.println("统计的结果是: " + result);
- }
- //通过collector将消息发送给下一个任务
- @Override
- public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
-
- }
- //定义我们要发送的数据的格式
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declare){
-
- }
-
- }
复制代码
|
|