storm的配置以及实例实践-一个简单的实例

终于把配置部分写完了。其实如果你觉得浪费时间,不想熟悉linux,那么其实你可以根本就不看,直接用本地模式跑一个简单的程序也行。但是事实上,我们学习storm,就是为了让其处理实时的大数据,配置集群有利于我们更熟悉整个框架。好了不多说,进入正题,这篇文章,我将用简单的实例代码来初步接触storm。

在storm官网下载的压缩包里有官方给出的例子,建议大家根据这个来学习一下。由于里面涉及到maven,需要配置一些东西,所以这里暂时不直接跑它的例子。取而代之的,我们用一个比较直观的例子来熟悉storm。

需求如下:

从一个数据源中获取随机的单词,然后将其转换为大写,再加个后缀,写入本地文件。

在storm的框架下,我们可以这样来解决这个问题。首先因为我们没有数据源,所以我们用RandomSpout来模拟产生源源不断的数据,然后用UpperBolt来接收数据,将字母变为大写,再发送给SuffixBolt,给它加后缀,然后写入文件。

接下来就用代码来说话。

eclipse新建工程,并在build path中把storm的jar包引进来(通常在lib文件夹下)。

写第一个类RandomSpout,用于产生数据

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class RandomSpout extends BaseRichSpout{
	SpoutOutputCollector collector=null;
	String[] words={"storm","apache","spark","hadoop"};
	@Override
	public void nextTuple() {
		Random random = new Random();
		String word =words[random.nextInt(words.length)];
		//发送消息
		collector.emit(new Values(word));
		
	}

	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		//初始化collector
		this.collector=collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("src_word"));
	}

}

要点:

  • open()方法是运行的最早的一个方法,我们这里初始化了collector,以方便给别的方法使用。
  • 最开始创建了一个String数组,作为单词源。里面有四个单词:storm,apache,spark,hadoop。
  • nextTuple()方法会被循环调用,里面的collector.emit(new Values(word))用于发送数据给接收者。
  • declareOutputFields()方法用来定义输出的字段。

然后新建UpperBolt类,用于接收RandomSpout发来的单词,并将单词转化为大写。

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpperBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String src_word = tuple.getString(0);
		String upper_word=src_word.toUpperCase();
		//发送消息出去
		collector.emit(new Values(upper_word));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("upper_word"));
	}

}

要点:

  • execute()方法不断接收tuple,并在里面写逻辑进行处理,并通过collector将封装成tuple的数据emit出去
  • tuple是storm传输数据的单位,我们需要将数据处理好之后封装成tuple,再发送出去,new Values(upper_word)就是封装成tuple的一种形式

再新建SuffixBolt类,用于接受经过大写处理的单词,同时给单词加上后缀“-handled”。最后再写入本地文件。

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt{
	FileWriter fileWriter = null; 
	
    @Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			fileWriter = new FileWriter("/home/"+UUID.randomUUID());//初始化fileWriter,写到本地文件,在/home/目录下,文件用随机的UUID命名
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String upper_word = tuple.getString(0);//接收数据
		String suffix_word = upper_word+"-handled";//加后缀
		
		try {
			fileWriter.append(suffix_word+"\n");//写入文件
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("suffix_word") );
	}

}

要点:

  • execute()方法不断接收tuple,但和之前的Bolt不一样的是,这次我们不用发送数据了,而是将处理后的数据直接写入本地文件中。

最后,写个主程序类TopMain,用来串联所有的组件。

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class TopMain {

	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		
		//新建对象
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		//设置spout,并行度为4,第一个参数为自定义的名字
		topologyBuilder.setSpout("randomspout", new RandomSpout(),4);
		//设置bolt,数据来源是上面的spout
		topologyBuilder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout");
		topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//创建拓扑
		StormTopology top = topologyBuilder.createTopology();
		
		//创建config,配置storm
		Config config = new Config();
		config.setNumWorkers(3);  //进程数
		config.setNumAckers(2);   //应答器数目

		//提交拓扑给集群
		StormSubmitter.submitTopology("example_top", config, top);
	
	}
}

要点:

  • 拓扑是storm里面一个很重要的概念,说白了,就是工程的整合,在这里,你可以设置worker数,Acker数,串联Spout和Bolt等等,关于它的配置,其实有很多内容,这里先不多说,注释把基本的功能写出来了,其实每个地方都可以深入探讨的。

好,代码写完了,怎么跑呢,且听下回分解。

发表评论

电子邮件地址不会被公开。 必填项已用*标注