博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm 统计单词
阅读量:6095 次
发布时间:2019-06-20

本文共 5553 字,大约阅读时间需要 18 分钟。

hot3.png

1、发射数据Spout类

 

/**
 * Spout that emits five fixed sample sentences, each as a single-value tuple on the
 * "sentence" field, the first time {@link #nextTuple()} is called.
 *
 * <p>Every subsequent call emits nothing and sleeps for 5 seconds. Tuples are emitted
 * without message ids.
 */
public class SentenceSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;

    /** Sample input, emitted in this order on the first call to nextTuple(). */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    private SpoutOutputCollector collector;

    /** Becomes true once the sample sentences have been handed to the collector. */
    private boolean emitted = false;

    public SentenceSpout() {
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (emitted) {
            // All input has already been sent; back off rather than spin.
            Utils.sleep(5000);
            return;
        }
        emitted = true;
        for (int i = 0; i < SENTENCES.length; i++) {
            collector.emit(new Values(SENTENCES[i]));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}

2、按照空白字符（正则 \s）切割单词的Bolt类

 

public class SentenceSplitBolt extends BaseRichBolt {    private static final long serialVersionUID = 1L;    OutputCollector collector;    private static final Log LOG = LogFactory.getLog(ReduceBolt.class);    @Override    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {        this.collector = collector;    }    @Override    public void execute(Tuple tuple) {        //LOG.info("sentence: " + tuple);        //split: spout:5, stream: default, id: {}, [the cow jumped over the moon]        String sentence = tuple.getString(0);        String[] words = sentence.split("\\s");        for (String word : words) {            collector.emit(new Values(word));        }    }    @Override    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("sentence-split"));    }}

 

3、统计Bolt类

 

public class ReduceBolt extends BaseRichBolt {    private static final long serialVersionUID = 1L;    OutputCollector collector;    Map
counts = new HashMap
(); private static final Log LOG = LogFactory.getLog(ReduceBolt.class); public void prepare(Map conf, TopologyContext context, OutputCollector collector) { this.collector = collector; } public void execute(Tuple tuple) { //LOG.info("word: " + tuple); String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); }}

 

4、展示Bolt类

public class ResultBolt extends BaseRichBolt {    private static final long serialVersionUID = 1L;    OutputCollector collector;    private static final Log LOG = LogFactory.getLog(ReduceBolt.class);    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {        this.collector = collector;    }    public void execute(Tuple tuple) {        //LOG.info("result: " + tuple);        LOG.info(tuple.getStringByField("word") + "  " + tuple.getIntegerByField("count"));    }    public void declareOutputFields(OutputFieldsDeclarer declarer) {        declarer.declare(new Fields("words"));    }}

 

5、Main方法

public static void main(String[] args) {        TopologyBuilder builder = new TopologyBuilder();        //读取数据源        builder.setSpout("spout", new SentenceSpout(), 1);        //句子切分        builder.setBolt("split", new SentenceSplitBolt(), 1).shuffleGrouping("spout");        //统计单词 并按照split组件中的sentence-split字段聚合        builder.setBolt("count", new ReduceBolt(), 1).fieldsGrouping("split", new Fields("sentence-split"));        //展示        builder.setBolt("result", new ResultBolt(), 1).shuffleGrouping("count");        Config config = new Config();        config.setDebug(false);        //集群模式        if (args != null && args.length > 0) {            config.setNumWorkers(2);            try {                StormSubmitter.submitTopology(args[0], config, builder.createTopology());            } catch (AlreadyAliveException e) {                e.printStackTrace();            } catch (InvalidTopologyException e) {                e.printStackTrace();            }            //单机模式        } else {            config.setMaxTaskParallelism(1);            ;            LocalCluster cluster = new LocalCluster();            cluster.submitTopology("word-count", config, builder.createTopology());            try {                Thread.sleep(60 * 1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            cluster.shutdown();        }    }}

 

6、结果

04-27 13:11:06 [INFO] [word.ReduceBolt(32)] seven  1

04-27 13:11:06 [INFO] [word.ReduceBolt(32)] years  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] ago  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] snow  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] white  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] and  2
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] the  4
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] seven  2
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] dwarfs  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] i  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] am  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] at  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] two  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] with  1
04-27 13:11:06 [INFO] [word.ReduceBolt(32)] nature  1

转载于:https://my.oschina.net/momisabuilder/blog/888280

你可能感兴趣的文章
我的友情链接
查看>>
asp.net开发3层架构 每一层作用
查看>>
基于模型开发 Back-to-Back测试统合工具-MC-Verifier
查看>>
如何使用UML(统一建模语言)画PHP类图
查看>>
execl打开linux下cvs文件乱码问题解决办法
查看>>
android当前正在运行的应用包名
查看>>
转:电源滤波电路、整流电源滤波电路分析
查看>>
我的友情链接
查看>>
Hadoop集群搭建的无密登录配置
查看>>
angular使directive让div contenteditable & ng-model生效
查看>>
制作CentOS 6.4 U盘启动安装盘
查看>>
Java try、catch、finally及finally执行顺序详解
查看>>
children childNodes nodeType
查看>>
如何在Ubuntu 16.04上将Redis服务器设置为PHP的会话处理程序
查看>>
固态硬盘价格大跳水,再不入手又要涨了!
查看>>
css隐形的空隙(inline的坑)
查看>>
深圳美景品牌策划机构:美景“快传播”赢得法国最大乳业合作社赞誉
查看>>
nginx服务
查看>>
Android中使用自定义的字体
查看>>
linux 中文件类型和颜色的区分
查看>>