1、发射数据Spout类
/**
 * Spout that emits a fixed set of sample sentences, one tuple per sentence, on the single
 * output field {@code "sentence"}.
 *
 * <p>The whole batch is emitted on the first {@code nextTuple()} call (without message IDs).
 * Every later call emits nothing and just sleeps for 5000 ms.
 */
public class SentenceSpout extends BaseRichSpout {

    private static final long serialVersionUID = 1L;

    /** Sample input; each element becomes exactly one emitted tuple, in this order. */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    private SpoutOutputCollector collector;

    /** Number of batches emitted so far; becomes 1 after the first call and stays there. */
    private Integer count = 0;

    public SentenceSpout() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        boolean alreadyEmitted = count > 0;
        if (alreadyEmitted) {
            // Nothing left to send; back off instead of spinning.
            Utils.sleep(5000);
        } else {
            count++;
            for (int i = 0; i < SENTENCES.length; i++) {
                collector.emit(new Values(SENTENCES[i]));
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}
2、按空白字符（正则 `\s`）切分单词的 Bolt 类
public class SentenceSplitBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector; private static final Log LOG = LogFactory.getLog(ReduceBolt.class); @Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; } @Override public void execute(Tuple tuple) { //LOG.info("sentence: " + tuple); //split: spout:5, stream: default, id: {}, [the cow jumped over the moon] String sentence = tuple.getString(0); String[] words = sentence.split("\\s"); for (String word : words) { collector.emit(new Values(word)); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("sentence-split")); }}
3、统计Bolt类
public class ReduceBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector; Mapcounts = new HashMap (); private static final Log LOG = LogFactory.getLog(ReduceBolt.class); public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { //LOG.info("word: " + tuple); String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }}
4、展示Bolt类
public class ResultBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; OutputCollector collector; private static final Log LOG = LogFactory.getLog(ReduceBolt.class); public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { //LOG.info("result: " + tuple); LOG.info(tuple.getStringByField("word") + " " + tuple.getIntegerByField("count")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("words")); }}
5、Main方法
public static void main(String[] args) { TopologyBuilder builder = new TopologyBuilder(); //读取数据源 builder.setSpout("spout", new SentenceSpout(), 1); //句子切分 builder.setBolt("split", new SentenceSplitBolt(), 1).shuffleGrouping("spout"); //统计单词 并按照split组件中的sentence-split字段聚合 builder.setBolt("count", new ReduceBolt(), 1).fieldsGrouping("split", new Fields("sentence-split")); //展示 builder.setBolt("result", new ResultBolt(), 1).shuffleGrouping("count"); Config config = new Config(); config.setDebug(false); //集群模式 if (args != null && args.length > 0) { config.setNumWorkers(2); try { StormSubmitter.submitTopology(args[0], config, builder.createTopology()); } catch (AlreadyAliveException e) { e.printStackTrace(); } catch (InvalidTopologyException e) { e.printStackTrace(); } //单机模式 } else { config.setMaxTaskParallelism(1); ; LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", config, builder.createTopology()); try { Thread.sleep(60 * 1000); } catch (InterruptedException e) { e.printStackTrace(); } cluster.shutdown(); } }}
6、结果
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] seven 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] years 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] ago 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] snow 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] white 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] and 2
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] the 4
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] seven 2
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] dwarfs 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] i 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] am 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] at 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] two 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] with 1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] nature 1