Spark大数据WordCount代码实例演示
//手动编写一个wordcount程序//要导入Strom的jar包
//把lib下所有的jar包包含在工程里就好了
Topology = spout(采集数据) + bolt(s)(处理数据)
//要创建主程序来书写自己的任务
public class WordCountTopology{
public static void main(String[] args){
//创建一个任务,并且指定任务的spout组件和多个bolt组件
TopologyBuilder builder = new TopologyBuilder();
//指定spout任务用于采集数据
builder.setSpout("myspout",new WordCountSpout());
//指定任务的bolt用于分词,计数
//随机接收数据
builder.setBolt("mysplit",new WordCountSplitBolt()).shuffleGrouping("myspout");
//采用字段进行接收,分组的字段是word,根据该字段来进行分组。
builder.setBolt("mycount", new WordCountTotalBolt()).fieldsGrouping("mysplit",new Fields("words"));
//创建线程
StormTopology job = builder.createTopology();
//直接在eclipse中运行任务
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("mydemo", new Config(), job);
}
}
/*
Topology = spout(采集数据) + bolt(s)(处理数据)
Topology任务的组成
*/
//用来采集数据
//继承了BaseRichSpout类之后就可以作为主键来采集数据了
public class WordCountSpout extends BaseRichSpout{
//模拟一些数据
private String[] data = {"I love Beijing","I Love China","Beijing is the capital of China"};
@Override
public void nextTuple(){
//如何采集数据,需要将采集的数据发给下一个任务(进行)
//为了降低采集数据的速度
Utils.sleep(3000);
//随机采集一个数据进行发送
int random = (new Random()).nextInt(3);
String str = data;
System.out.println("采集的数据是: " + str);
//发送采集到的数据
this.collector.emit(new Values(str));
}
//定义发送数据的采集器
@Override
public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector){
//collector表示:通过他可以把数据发送给下一个任务
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declare){
//定义发送的数据格式
declare.declare(new Fields("data"));
}
}
//第一个bolt组件,用于拆分单词组件
public class WordCountSplitBolt extends BaseRichBolt{
@Override
public void execute(Tuple tuple){
//从上一个任务中接受数据,如何处理
String data = tuple.getStringByField("data");
//分词: 单词 1
String [] words = data.split(" ");
//注意发送的格式要和我们定义的格式是一样的
for(String w : words){
this.collector.emit(new Values(w,1));
}
}
//通过collector将消息发送给下一个任务
@Override
public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
this.collector = collector;
}
//定义我们要发送的数据的格式
@Override
public void declareOutputFields(OutputFieldsDeclarer declare){
declare.declare(new Fields("word","count"));
}
}
//任务的第二个bolt组件,用于单词的计数
public class WordCountTotalBolt extends BaseRichBolt{
private Map<String,Integer> result = new HashMap<>();
@Override
public void execute(Tuple tuple){
//统计每个单词的总频率
String word = tuple.getStringByField("word");
int count = tuple.getStringByField("count");
if(result.containKey(word)){
//如果包含累加
int total = result.get(word);
result.put(word,total + count);
}
//表示的是第一次出现
else{
result.put(word, count);
}
//输出到屏幕上
System.out.println("统计的结果是: " + result);
}
//通过collector将消息发送给下一个任务
@Override
public void prepare(Map args, TopologyContext arg1, SpoutOutputCollector collector){
}
//定义我们要发送的数据的格式
@Override
public void declareOutputFields(OutputFieldsDeclarer declare){
}
}
看看 看看
页:
[1]