Storm应用系列之——最基本的例子

2023年3月28日

前言:

        本文会从如何写一个Storm的topology开始,来对Storm实现的细节进行阐述。避免干巴巴的讲理论。

 

1. 建立Maven项目

我们用Maven来管理项目,方便lib依赖的引用和版本控制。

建立最基本的pom.xml如下:

 

1.   
3. 4.0.0  
4. com.edi.storm  
5. storm-samples  
6. 0.0.1-SNAPSHOT  
7. jar  
8.   
9.   
10.   
11. UTF-8  
12.   
13.   
14.   
15.   
16.   
17. clojars.org  
18. http://clojars.org/repo  
19.   
20.   
21.   
22.   
23.   
24. storm-samples  
25.   
26.   
27. org.apache.maven.plugins  
28. maven-compiler-plugin  
29. 3.1  
30.   
31. 1.7  
32. 1.7  
33. ${project.build.sourceEncoding}  
34.   
35.   
36.   
37.   
38.   
39. maven-assembly-plugin  
40.   
41.   
42. jar-with-dependencies  
43.   
44.   
45.   
46.   
47. make-assembly  
48. package  
49.   
50. single  
51.   
52.   
53.   
54.   
55.   
56.   
57.   
58.   
59.   
60.   
61.   
62.   
63. storm  
64. storm  
65. 0.9.0-rc2  
66. provided  
67.   
68.   
69. 

这里我额外添加了两个build 插件:

 

maven-compiler-plugin : 为了方便指定编译时jdk。Storm的依赖包里面某些是jdk1.5的.

和 

maven-assembly-plugin: 为了把所有依赖包最后打到一个jar包去,方便测试和部署。后面会提到如果不想打到一个jar该怎么做。

 

 

2. 建立Spout

前文提到过,Storm中的spout负责发射数据。

 

我们来实现这样一个spout:

它会随机发射一系列的句子,句子的格式是 谁:说的话

代码如下:

 

1. public class RandomSpout extends BaseRichSpout {  
2.   
3. private SpoutOutputCollector collector;  
4.   
5. private Random rand;  
6.       
7. private static String[] sentences = new String[] {"edi:I'm happy", "marry:I'm angry", "john:I'm sad", "ted:I'm excited", "laden:I'm dangerous"};  
8.       
9. @Override  
10. public void open(Map conf, TopologyContext context,  
11.             SpoutOutputCollector collector) {  
12. this.collector = collector;  
13. this.rand = new Random();  
14.     }  
15.   
16. @Override  
17. public void nextTuple() {  
18.         String toSay = sentences[rand.nextInt(sentences.length)];  
19. this.collector.emit(new Values(toSay));  
20.     }  
21.   
22. @Override  
23. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
24. new Fields("sentence"));  
25.     }  
26.   
27. }

 

这里要先理解Tuple的概念。

Storm中,基本元数据是靠Tuple才承载的。或者说,Tuple是数据的一个大抽象。它要求实现类必须能序列化。

 

该Spout代码里面最核心的部分有两个:

a. 用collector.emit()方法发射tuple。我们不用自己实现tuple,我们只需要定义tuple的value,Storm会帮我们生成tuple。Values对象接受变长参数。Tuple中以List存放Values,List的Index按照new Values(obj1, obj2,...)的参数的index,例如我们emit(new Values("v1", "v2")), 那么Tuple的属性即为:{ [ "v1" ], [ "V2" ] }

b. declarer.declare方法用来给我们发射的value在整个Stream中定义一个别名。可以理解为key。该值必须在整个topology定义中唯一。

 

 

3. 建立Bolt

既然有了源,那么我们就来建立节点处理源流出来的数据。怎么处理呢?为了演示,我们来做些无聊的事情:末尾添加"!",然后打印。

两个功能,两个Bolt。

先看添加"!"的Bolt

 

1. public class ExclaimBasicBolt extends BaseBasicBolt {  
2.   
3. @Override  
4. public void execute(Tuple tuple, BasicOutputCollector collector) {  
5. //String sentence = tuple.getString(0);  
6. 0);  
7. "!";  
8. new Values(out));  
9.     }  
10.   
11. @Override  
12. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
13. new Fields("excl_sentence"));  
14.     }  
15.   
16. }

在RandomSpout中,我们发射的Tuple具有这样的属性 { [ "edi:I'm Happy" ] }, 所以tuple的value list中第0个值,肯定是个String。我们用tuple.getvalue(0)取到。

Storm为tuple封装了一些方法方便我们取一些基本类型,例如String,我们可以直接用getString(int N) 。

取到以后,我们在末尾添加"!"后,仍然发射一个Tuple,定义其唯一的value的field 名字为"excl_sentence"

 

打印Bolt

 

1. public class PrintBolt extends BaseBasicBolt {  
2.   
3. @Override  
4. public void execute(Tuple tuple, BasicOutputCollector collector) {  
5. 0);  
6. "String recieved: " + rec);  
7.     }  
8.   
9. @Override  
10. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
11. // do nothing  
12.     }  
13.   
14. }

仍然是取第一个,因为我们并没有定义过第二个value

 

 

4. 建立Topology

现在我们建立拓扑结构的主要组件都有了,可以创建topology了。

 

1. public class ExclaimBasicTopo {  
2.   
3. public static void main(String[] args) throws Exception {  
4. new TopologyBuilder();  
5.           
6. "spout", new RandomSpout());  
7. "exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");  
8. "print", new PrintBolt()).shuffleGrouping("exclaim");  
9.   
10. new Config();  
11. false);  
12.   
13. if (args != null && args.length > 0) {  
14. 3);  
15.   
16. 0], conf, builder.createTopology());  
17. else {  
18.   
19. new LocalCluster();  
20. "test", conf, builder.createTopology());  
21. 100000);  
22. "test");  
23.             cluster.shutdown();  
24.         }  
25.     }  
26. }

 

很简单,对吧。

其中,

 

1. builder.setSpout("spout", new RandomSpout());

定义一个spout,id为"spout"

 

1. builder.setBolt("exclaim", new ExclaimBasicBolt()).shuffleGrouping("spout");

定义了一个id为"exclaim"的bolt,并且按照随机分组获得"spout"发射的tuple

 

 

1. builder.setBolt("print", new PrintBolt()).shuffleGrouping("exclaim");

定义了一个id为"print"的bolt,并且按照随机分组获得"exclaim”发射出来的tuple

 

1. .shuffleGrouping

是指明Storm按照何种策略将tuple分配到后续的bolt去。

 

可以看到,如果我们运行时不带参数,是把topology提交到了LocalCluster的,即所有的task都在一个本地JVM去执行。可以用LocalCluster来调试。如果后面带一个参数,即为该topology的名字,那么就把该topology提交到集群上去了。

把项目用M2E插件导入Eclipse直接运行试试

 

1. String recieved: marry:I'm angry!  
2. String recieved: edi:I'm happy!  
3. String recieved: john:I'm sad!  
4. String recieved: edi:I'm happy!  
5. String recieved: ted:I'm excited!  
6. String recieved: laden:I'm dangerous!  
7. String recieved: edi:I'm happy!  
8. String recieved: edi:I'm happy!

这里我们并没有指定并行,那么其实是每个spout、bolt仅有一个线程对应去执行。

 

我们修改下代码,指定并行数

 

1. builder.setBolt("exclaim", new ExclaimBasicBolt(), 2).shuffleGrouping("spout");  
2. builder.setBolt("print", new PrintBolt(),3).shuffleGrouping("exclaim");

由于我们并没有多指定task数目,所以默认,会有两个exectuor去执行两个exclaimBasicBolt的task,3个executor去执行3个PrintBolt的task。

 

为了方便体现确实是并行,我们修改PrintBolt代码如下:

 

1. public class PrintBolt extends BaseBasicBolt {  
2.   
3. private int indexId;  
4.       
5. @Override  
6. public void prepare(Map stormConf, TopologyContext context) {  
7. this.indexId = context.getThisTaskIndex();  
8.     }  
9.   
10. @Override  
11. public void execute(Tuple tuple, BasicOutputCollector collector) {  
12. 0);  
13. "Bolt[%d] String recieved: %s",this.indexId, rec));  
14.     }  
15.   
16. @Override  
17. public void declareOutputFields(OutputFieldsDeclarer declarer) {  
18. // do nothing  
19.     }  
20.   
21. }

这里从上下文中拿到该Bolt的TaskIndex,我们指定了3的并发度,所以理论上有3个task,那么该值应该为[1,2,3]。

 

运行下看看:

 

1. Bolt[0] String recieved: marry:I'm angry!  
2. Bolt[2] String recieved: john:I'm sad!  
3. Bolt[2] String recieved: ted:I'm excited!  
4. Bolt[2] String recieved: john:I'm sad!  
5. Bolt[2] String recieved: john:I'm sad!

证实确实是并发了。

 

本地测试通过了,我们用 mvn clean install 命令编译,然后把target目录下生成的 storm-samples-jar-with-dependencies.jar 拷到nimbus机器上,执行

 

1. ./storm jar storm-samples-jar-with-dependencies.jar com.edi.storm.topos.ExclaimBasicTopo test

在StormUI里面,点进 test

 

看到spout 已然已经emit了 11347280个tuple了…… 而id为exclaim的bolt也已经接受了2906920个tuple了。print没有输出,所以emit为0。

 


 

截止到这里,一个简单的Storm的topology已经完成了。

但是,这里依然有些问题:

1. 什么是acker?

2. Bolt为什么有两个继承类和接口?

3. Topology的提交方式到底有几种?

4. 除了随机分组,还有哪些分组策略?

5. Storm是如何保证tuple不被丢失的?

6. 我看到spout发送数据比bolt处理的速度快太多了,我能不能在spout里面sleep?

7. 并发数要如何指定呢?

 

服务器托管,北京服务器托管,服务器租用 http://www.hhisp.net
机房租用,北京机房租用,IDC机房托管, http://www.e1idc.net

hackdl

咨询热线/微信 13051898268