博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Storm编程入门API系列之Storm的Topology多个Executors数目控制实现
阅读量:6830 次
发布时间:2019-06-26

本文共 7200 字,大约阅读时间需要 24 分钟。

 

 

 

 

前期博客

 

 

 

 

继续编写

  StormTopologyMoreExecutor.java

 

 

 

 

package zhouls.bigdata.stormDemo;import java.util.Map;import org.apache.storm.Config;import org.apache.storm.LocalCluster;import org.apache.storm.StormSubmitter;import org.apache.storm.generated.AlreadyAliveException;import org.apache.storm.generated.AuthorizationException;import org.apache.storm.generated.InvalidTopologyException;import org.apache.storm.spout.SpoutOutputCollector;import org.apache.storm.task.OutputCollector;import org.apache.storm.task.TopologyContext;import org.apache.storm.topology.OutputFieldsDeclarer;import org.apache.storm.topology.TopologyBuilder;import org.apache.storm.topology.base.BaseRichBolt;import org.apache.storm.topology.base.BaseRichSpout;import org.apache.storm.tuple.Fields;import org.apache.storm.tuple.Tuple;import org.apache.storm.tuple.Values;import org.apache.storm.utils.Utils;public class StormTopologyMoreExecutor {        public static class MySpout extends BaseRichSpout{        private Map conf;        private TopologyContext context;        private SpoutOutputCollector collector;        public void open(Map conf, TopologyContext context,                SpoutOutputCollector collector) {            this.conf = conf;            this.collector = collector;            this.context = context;        }        int num = 0;         public void nextTuple() {            num++;            System.out.println("spout:"+num);            this.collector.emit(new Values(num));            Utils.sleep(1000);        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {            declarer.declare(new Fields("num"));        }            }                public static class MyBolt extends BaseRichBolt{                private Map stormConf;        private TopologyContext context;        private OutputCollector collector;        public void prepare(Map stormConf, TopologyContext context,                OutputCollector collector) {            this.stormConf = stormConf;            this.context = context;            this.collector 
= collector;        }                public void execute(Tuple input) {            Integer num = input.getIntegerByField("num");            System.out.println("线程id:"+Thread.currentThread().getId()+",接收的值为:"+num);        }        public void declareOutputFields(OutputFieldsDeclarer declarer) {                    }            }                public static void main(String[] args) {        TopologyBuilder topologyBuilder = new TopologyBuilder();        String spout_id = MySpout.class.getSimpleName();        String bolt_id = MyBolt.class.getSimpleName();                topologyBuilder.setSpout(spout_id, new MySpout());        topologyBuilder.setBolt(bolt_id, new MyBolt(),3).shuffleGrouping(spout_id);                        Config config = new Config();        String topology_name = StormTopologyMoreExecutor.class.getSimpleName();        if(args.length==0){            //在本地运行            LocalCluster localCluster = new LocalCluster();            localCluster.submitTopology(topology_name, config, topologyBuilder.createTopology());        }else{            //在集群运行            try {                StormSubmitter.submitTopology(topology_name, config, topologyBuilder.createTopology());            } catch (AlreadyAliveException e) {                e.printStackTrace();            } catch (InvalidTopologyException e) {                e.printStackTrace();            } catch (AuthorizationException e) {                e.printStackTrace();            }        }            }}

 

 

 

 

 

   打jar包

 

 

 

 

 

 

 

 

 

 

 

 

 

 提交作业之前

 

 

 

 

 

 

 

[hadoop@master apache-storm-1.0.2]$ pwd
/home/hadoop/app/apache-storm-1.0.2
[hadoop@master apache-storm-1.0.2]$ ll
total 208
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 bin
-rw-r--r--  1 hadoop hadoop 82317 Jul 27  2016 CHANGELOG.md
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 20:12 conf
drwxrwxr-x  3 hadoop hadoop  4096 Jul 27  2016 examples
drwxrwxr-x 17 hadoop hadoop  4096 May 21 17:18 external
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27  2016 extlib-daemon
drwxrwxr-x  2 hadoop hadoop  4096 Jul 27 23:00 jar
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 lib
-rw-r--r--  1 hadoop hadoop 32101 Jul 27  2016 LICENSE
drwxrwxr-x  2 hadoop hadoop  4096 May 21 17:18 log4j2
drwxrwxr-x  2 hadoop hadoop  4096 May 21 19:05 logs
-rw-r--r--  1 hadoop hadoop   981 Jul 27  2016 NOTICE
drwxrwxr-x  6 hadoop hadoop  4096 May 21 17:18 public
-rw-r--r--  1 hadoop hadoop 15287 Jul 27  2016 README.markdown
-rw-r--r--  1 hadoop hadoop     6 Jul 27  2016 RELEASE
-rw-r--r--  1 hadoop hadoop 23774 Jul 27  2016 SECURITY.md
[hadoop@master apache-storm-1.0.2]$ bin/storm jar jar/StormTopologyMoreExecutor.jar zhouls.bigdata.stormDemo.StormTopologyMoreExecutor aaa
Running: /home/hadoop/app/jdk/bin/java -client -Ddaemon.name= -Dstorm.options= -Dstorm.home=/home/hadoop/app/apache-storm-1.0.2 -Dstorm.log.dir=/home/hadoop/app/apache-storm-1.0.2/logs -Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib -Dstorm.conf.file= -cp /home/hadoop/app/apache-storm-1.0.2/lib/log4j-api-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/kryo-3.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-rename-hack-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-core-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/slf4j-api-1.7.7.jar:/home/hadoop/app/apache-storm-1.0.2/lib/minlog-1.3.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/objenesis-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/clojure-1.7.0.jar:/home/hadoop/app/apache-storm-1.0.2/lib/servlet-api-2.5.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-slf4j-impl-2.1.jar:/home/hadoop/app/apache-storm-1.0.2/lib/log4j-over-slf4j-1.6.6.jar:/home/hadoop/app/apache-storm-1.0.2/lib/storm-core-1.0.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/disruptor-3.3.2.jar:/home/hadoop/app/apache-storm-1.0.2/lib/asm-5.0.3.jar:/home/hadoop/app/apache-storm-1.0.2/lib/reflectasm-1.10.1.jar:jar/StormTopologyMoreExecutor.jar:/home/hadoop/app/apache-storm-1.0.2/conf:/home/hadoop/app/apache-storm-1.0.2/bin -Dstorm.jar=jar/StormTopologyMoreExecutor.jar zhouls.bigdata.stormDemo.StormTopologyMoreExecutor aaa
2632 [main] INFO  o.a.s.StormSubmitter - Generated ZooKeeper secret payload for MD5-digest: -7700164527916050772:-9124174655622273375
3011 [main] INFO  o.a.s.s.a.AuthUtils - Got AutoCreds []
3598 [main] INFO  o.a.s.StormSubmitter - Uploading topology jar jar/StormTopologyMoreExecutor.jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-83ebee61-5051-4ab5-aff7-e9fcf4560f42.jar
3711 [main] INFO  o.a.s.StormSubmitter - Successfully uploaded topology jar to assigned location: /home/hadoop/data/storm/nimbus/inbox/stormjar-83ebee61-5051-4ab5-aff7-e9fcf4560f42.jar
3714 [main] INFO  o.a.s.StormSubmitter - Submitting topology StormTopologyMoreExecutor in distributed mode with conf {"storm.zookeeper.topology.auth.scheme":"digest","storm.zookeeper.topology.auth.payload":"-7700164527916050772:-9124174655622273375"}
5363 [main] INFO o.a.s.StormSubmitter - Finished submitting topology: StormTopologyMoreExecutor
[hadoop@master apache-storm-1.0.2]$

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

  

  为什么会是如上的数字呢?大家要学,就要深入去学和理解。

 

   因为我之前运行的 StormTopologyMoreWorker 没有停掉。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

   为什么会是如上的数字呢?大家要学,就要深入去学和理解。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

   

  即,以下就是它的3个Executor

 

 

 

 

  

 

转载地址:http://lljkl.baihongyu.com/

你可能感兴趣的文章
业务开发测试HBase之旅三:通过Java Api与HBase交互
查看>>
让ComboBox显示图片--PictureComboBox
查看>>
JS父页面获取子页面返回值
查看>>
鼠标点击主窗体时,模态子窗口是WindowStyle.None时如何闪烁
查看>>
LABJS源码浅析
查看>>
myShellcode
查看>>
Qore Oracle Module 2.2 发布
查看>>
MoonScript 0.2.2 发布,基于 Lua 的脚本语言
查看>>
assertThat使用方法
查看>>
2013年11月11日工商银行笔试总结
查看>>
Qt之问题求助——当VS遇到“向Pro中添加代码”怎么办?
查看>>
使用reserve函数避免vector和string的内存重新分配
查看>>
ADO.NET(内含存储过程讲解)
查看>>
利用TreeView实现C#工具箱效果
查看>>
PyTalk : a Jabber Client un Python using xmpppy and PyQt4
查看>>
C++类构造函数初始化列表(转)
查看>>
13最佳WordPress的维护插件
查看>>
Missing Screenshot 的解决方案
查看>>
jQuery:1.5.4.3,表格变色(单击行,把当行的单选按钮(radio)设为选中状态,并应用当前样式)...
查看>>
oracle11gR2安装示例数据库
查看>>