hadoop官方mapreduce简单例子-WordCount代码解读

WordCount经常在我们初学mapreduce的时候,被作为最简单的例子来讲解,这次我们就从官方给出的源码入手,看看是怎么回事。

所谓Mapreduce,就是Map(映射)”和”Reduce(归约)。map过程,就是把一组<key,value>映射成新的<key,value>;reduce过程,就是把map过程产生的一些列<key,value>,其中那些key相同的,归约成<key,<values>>来处理,一个key对应一组value。

有了这点认识,我们来看看hadoop的mapreduce源码范例:wordcount。

这个程序用来统计一个文本里面各个单词出现的个数。具体怎么运行可以参看我之前写的《centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子

下面分析一下WordCount源码:

Map过程:

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

关于mapper类的要点:

  • maps将输入的键值对输出为中间键值对,输出不一定是和输入是同样类型的,一个输入键值对可以输出多个键值对,也可能不输出。
  • hadoop的Map-Reduce框架对每一个InputSplit键值对都生成一个map任务,这些键值对是由InputFormat生成的。
  • 框架首先调用setup函数,随后是map函数(InputSplit产生的每一个键值对都调用一次),最后调用finally函数。在wordcount中,只重写了map函数。
  • mapper的输出被分成了不同组,供每个reducer来处理,我们可以通过实现抽象类Partitioner来控制排序和分组。

TokenizerMapper类的简单解析:

由于继承mapper类,因此要实现map()方法。
其中有这么一行代码:

StringTokenizer itr = new StringTokenizer(value.toString());

value是传进来的值,对于这个例子而言,是一行的文本,将一行分割成一个又一个单词,然后,用一个while循环迭代,生成新的(key,value)。代码如下:

while (itr.hasMoreTokens()) {
    word.set(itr.nextToken());
    context.write(word, one);
}

其中的one这个变量,是在刚开始声明的:

private final static IntWritable one = new IntWritable(1);

用来计数,这里是1,所以,输出的(key,value)都是这样的形式:(“单词”,1)。可供之后处理。

Reduce过程:

  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

关于reducer类的要点:

  • reducer将中间输出键值对中那些键相同的合并,值为集合。
  • reduce主要有三个阶段:
    1. shuffle洗牌:reducer将排序好的输出从每个mapper里面复制出来,整个过程用HTTP来通信。
    2. sort排序:框架将reducer的具有相同键的输入合并排序,因为不同的mapper可能有相同的键,shuffle过程和sort过程是同时进行的。
    3. Reduce归约:在reduce阶段,每个key传进来,reduce方法都被调用一次,进行归约。

IntSumReducer类的简单解析:

重写了reduce方法,由于之前已经有map过程了,因此此时传进来的键值对的形式是<key,>,即value不是一个值,而是值的集合。用一个for循环,即可遍历某个key里面的所有值:

for (IntWritable val : values) {
    sum += val.get();
}

这个for循环将某个key对应的所有的value累加,即某单词出现次数的累加。
然后把结果输出:

result.set(sum);
context.write(key, result);

其中的result在之前声明了:

private IntWritable result = new IntWritable();

是整型,所以之后context.write(key,result)就把某单词出现的次数输出了。

Mapreduce的整个过程:

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }

要点:

  • Configuration conf = new Configuration(); 这句新建了一个配置对象conf,可以配置mapreduce的一些参数,这里没有对配置作过多的调整。
  • Job job = new Job(conf, “word count”); 这句新建了一个job,用于控制整个工作流程,接下来的6个set函数:
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

它们分别是:
1.设置工作的类名,这里是WordCount
2.设置mapper的类:TokenizerMapper.class)
3.设置combiner的类:IntSumReducer.class
4.设置reducer的类:IntSumReducer.class,和combiner是一样的。
5.设置输出key的格式,text类型
6.设置输出value的格式,int类型。

  • 接着就是设置输入输出路径,都由命令行参数来指定
  • 最后调用job.waitForCompletion(true) 来开始工作。

最后附上完整源代码:

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

centos+虚拟机配置hadoop2.5.2-mapreduce-wordcount例子

配置完hadoop的环境,照例,肯定要跑一个简单的程序测试一下。

似乎提到hadoop,就不得不提mapreduce,作为mapreduce的一个开源实现,hadoop的应用非常广。引用一段文字,大概就能了解一下它们的历史了:

2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了Google的GFS和MapReduce的基本原理和主要设计思想。2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所需要的解决大规模Web数据处理的重要技术,因而模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快得到了全球学术界和工业界的普遍关注,并得到推广和普及应用。

废话不多说,我们来看看怎么跑基于mapreduce的简单例子–wordcount。

首先要格式化namenode。根据你的目录,运行以下的命令

hadoop/bin/hadoop namenode -format

查看信息的倒数几行,如果显示:

Storage directory ~/hadoop-2.5.2/hdfs/name has been successfully formatted

则说明成功了!

接下来启动hdfs,在hadoop文件夹下的sbin文件夹内运行以下命令(根据你当前目录的位置自行调整):

sbin/start-dfs.sh

在主节点运行jps命令,看到namenode进程已经启动:
主节点jps

在子节点键入jps命令,看到datanode进程已经启动:

子节点jps进程

因为之前的配置,所以我们可以在50070端口查看hadoop的工作状态。

比如我可以在浏览器输入:http://192.168.254.100:50070/  (根据你的主节点ip改变网址),可以看到工作状态:

hadoop工作状态

接下来启动yarn,在sbin目录下:

sbin/start-yarn.sh

好了,准备工作完成,接下来,跑程序。

1.在hdfs上创建input目录

如果你的hadoop已经加入环境变量了,你就不需要切换目录到bin下,直接hadoop + 命令就可以了。否则,你得先切换到hadoop的bin目录下,运行以下命令

./hadoop fs -mkdir -p input

2.新建文本
新建一个test.txt

vim /home/test.txt

在里面输入任意内容,单词,然后保存。比如我是这样输入的:

test.txt的内容

将文件复制到hdfs的input目录下:

./hadoop fs -put /home/test.txt input

3.运行wordcount程序

程序在hadoop文件夹下的share下,注意以下命令的路径:

./Hadoop jar share/hadoop/mapreduce/sources/hadoop-mapreduce-examples-2.5.2-sources.jar org.apache.hadoop.examples.WordCount input output

4.观察结果
这是运行过程中的状态:
wordcount工作状态

看看结果生成了什么文件:

./hadoop fs -ls output/

结果生成了两个文件:
wordcount结果
part-r -00000就是我们要的结果。
看看里面的内容:

./hadoop fs -cat output/part-r-00000

里面是计数结果:
wordcount计数结果

 

以上就是wordcount运行的全过程,最后退出hdfs和yarn。先切换到sbin目录下,然后运行以下命令即可:

./stop-dfs.sh
./stop-yarn.sh

centos+虚拟机配置hadoop2.5.2-hadoop文件配置

在上一节《centos+虚拟机配置hadoop2.5.2-前期准备》,讲了一下配置hadoop的前期工作,方便我们之后的操作。做好前期的准备工作之后,可以进行hadoop的安装配置了。

本文配置的hadoop版本为2.5.2,可以到这里下载:http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-2.5.2/   我下载的是tar.gz格式的。

下载完成,上传到centos服务器(虚拟机里的centos)。首先解压,我把它解压到

这个目录下:/env/

tar -zvxf hadoop-2.5.2.tar.gz -C /env/

随后要设置环境变量。把这些命令写入/etc/profile和~/.bashrc 中,说个题外话,通常我们配置环境变量都会接触到这两个文件,他们到底有什么区别呢?找了找资料:

bashrc与profile都用于保存用户的环境信息,bashrc用于交互式non-loginshell,而profile用于交互式login shell

先编辑/etc/profile

vi /etc/profile

在文本的末尾加入以下命令,具体的路径以你自己配置的为准:

export HADOOP_PREFIX=/env/hadoop-2.5.2
export YARN_CONF_DIR=/env/hadoop-2.5.2
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_PREFIX/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_PREFIX/lib"
export HADOOP_HOME=/env/hadoop-2.5.2
export HADOOP_HOME_WARN_SUPPRESS=1

更改profile
使配置生效:

source /etc/profile 

再编辑~/.bashrc

vi ~/.bashrc

加入以下内容,具体路径仍以你的配置为准:

export JAVA_HOME=/usr/java/jdk1.7.0_79
export JRE_HOME=/usr/java/jdk1.7.0_79/jre
export HADOOP_HOME=/env/hadoop-2.5.2
export HADOOP_DEV_HOME=/env/hadoop-2.5.2
export HADOOP_COMMON_HOME=/env/hadoop-2.5.2
export HADOOP_HDFS_HOME=/env/hadoop-2.5.2
export HADOOP_CONF_DIR=/env/hadoop-2.5.2/conf

使配置生效:

source ~/.bashrc

然后在另外两台主机上重复以上步骤,将环境变量配置好。

接下来,来配置一下hadoop。切换目录

cd /env/hadoop-2.5.2/etc/hadoop/

要配置5个文件:core-site.xml,hdfs-site.xml,yarn-site.xml,mapred-site.xml和slaves

首先编辑core-site.xml,在configuration里面添加以下内容

<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://main:9000</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/env/hadoop-2.5.2/tmp</value>
    </property>
</configuration>

随后编辑vim hdfs-site.xml,配置hadoop的文件系统hdfs。在configuration里面添加以下内容

<configuration>
    <property>
        <name>dfs.namenode.name.dir</name>
        <value>file:/home/hadoop/dfs/name</value>
    </property>
    <property>
        <name>dfs.namenode.data.dir</name>
        <value>file:/home/hadoop/dfs/data</value>
    </property>                                                                     <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
</configuration>

编辑yarn-site.xml,这个是map-reduce的新框架yarn的配置文件。添加以下内容,里面的value,可以设置主机名,也可以设置为ip:

<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.address</name>
        <value>192.168.254.100:8032</value>
    </property>
    <property> 
        <name>yarn.resourcemanager.scheduler.address</name>
        <value>192.168.254.100:8030</value>
        </property>
    <property>
        <name>yarn.resourcemanager.resource-tracker.address</name>
        <value>192.168.254.100:8031</value>
    </property>
    <property>
        <name>yarn.resourcemanager.admin.address</name>
        <value>192.168.254.100:8033</value>
    </property>
    <property>
        <name>yarn.resourcemanager.webapp.address</name>
        <value>192.168.254.100:8088</value>
    </property>
</configuration>

然后编辑mapred-site.xml,这个是map-reduce任务页面的配置,添加以下内容:

<configuration>  
    <property>    
        <name>mapred.job.tracker</name>   
        <value>main:9001</value>  
    </property>
</configuration>

最后,还是在刚才的目录下,编辑slaves文件,将两个子节点的ip或者主机名添加进去

vi slaves

编辑内容如下:

#localhost
192.168.254.101
192.168.254.102

保存,搞定!
将配置好的hadoop直接复制到另外两台主机(-r参数是复制文件夹,不要漏掉):

scp -r /env/hadoop-2.5.2 root@slave1:/env/
scp -r /env/hadoop-2.5.2 root@slave2:/env/

至此,配置完毕,下一篇还是老样子,跑一个简单的程序。

centos+虚拟机配置hadoop2.5.2-前期准备

之前写了一个《storm的配置以及实例实践》系列文章,系列第一篇:《storm的配置以及实例实践-前言》。

也运行了一个简单的例子,对storm这个框架有了大概的了解,之后可以在此框架上写一写代码,加深理解。

其实我们接触大数据处理框架,听得最多的还是hadoop,今天就在之前配置的基础上,继续配置hadoop,进行相关的学习。

稍微提一下之前的配置:三台虚拟机,系统为centos6.5,均配置了静态的ip以及对应的主机名,今天要配置的是hadoop2.5.2,可以去http://mirror.bit.edu.cn/apache/hadoop/common/hadoop-2.5.2/  下载。

正式配置之前,需要做一些前期的准备。

确认主机名以及能互相ping通

命令行输入

vim /etc/sysconfig/network

确认设置了主机名,需要检查三台虚拟机是否有设置。比如我的三台虚拟机是这样的,hostname为主机名:
hostname设置

设置对应的ip地址与主机名的映射:

vim /etc/hosts

如图:
主机名与ip的映射

直接用主机名来互相ping一下,看一下能不能ping通。如果不能,请看一下配置文件是否正确,用source命令使配置生效。如下图,就是能ping通。

主机间互相ping

多说一句,这个步骤是为了保证之后hadoop能正确执行分布式任务,如果ping不通,谈何分布式任务。

设置免密码登陆ssh

为了方便,我们可以设置免密码登陆ssh。以下步骤在主节点主机执行即可。

本机免密码登陆ssh,生成公钥文件authorized_keys,命令如下,出现提示直接回车下一步即可:

ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa&nbsp;
cat ~/.ssh/id_dsa.pub&gt;&gt; ~/.ssh/authorized_keys&nbsp;

测试一下本机是否可以无密码登陆ssh,如果不需要密码则成功了:
本机ssh无密码登陆

除了本机无密码登陆ssh,我们也要设置本机(主节点)免密码ssh登陆另外两台主机,其实很简单,只需要复制刚才生成的公钥文件authorized_keys到另外两台主机就行了,可以通过各种方式复制,也可以直接在命令行输入以下命令,把ip更改为你的子节点ip就行了:

scp authorized_keys root@192.168.254.101:/root/.ssh/
scp authorized_keys root@192.168.254.102:/root/.ssh/ 

测试本机是否可以无密码通过ssh登陆子节点:

ssh root@192.168.254.101 

无需密码登陆就成功了。

大概的前期准备工作就完成了,下一篇继续讲怎么配置hadoop2.5.2。

storm的配置以及实例实践-在集群上运行程序

代码编写完毕,接下来就是检验代码的效果了。
Storm的代码可以在本地上跑,也可以在集群上跑。为了模拟真实的操作环境,我们当然要在集群上跑,不然前面配置那么多就白配置了。
当然,如果要在本地上跑,拓扑的代码就不一样了,这里先不说,上一篇的代码是按照集群环境来写的。接下来,我们就要验收一下前面的工作成果了。

首先在eclipse的工程中,将代码export成jar文件,步骤很简单,右键–export–jar文件。另外,你得把拓扑的函数完整名字记录下来,就这个例子而言,拓扑类是Topmain,然而完整路径则不一定,在eclipse,直接把鼠标移动到类名上,就能看到完整路径了。如下图:

主类路径

主类名是stormtest.test.TopMain,这和你的工程、包有关,反正先记录下来。

接下来把3台虚拟机都跑起来。把刚才的jar包上传到第一台服务器上,即配置的主节点。
由于zookeeper我们配置的也是在主节点上,所以在主节点服务器上需要开启zookeeper的服务。切换到zookeeper的主目录下的bin目录,然后运行以下命令

./zkServer.sh start

zookeeper启动成功

如上图即运行成功。

然后在这台主机上运行storm的主节点nimbus:将目录切换到storm目录下的bin,运行以下命令:

./storm nimbus 1>/dev/null 2>&1 &

没出错的话,nimbus就会在后台运行了。然后再开启UI,方便我们在线查看工程运行的情况。同样在该目录下,运行以下命令:

./storm ui 1>/dev/null 2>&1 &

输入jps可以查看进程的运行情况:
jps查看进程
接下来,可以在本地访问UI了,在浏览器输入–主机地址:8080 回车访问。比如我就输入
192.168.254.100:8080,可以看到以下概况:
UI

OK,接下来开启从节点。切换到第二台机器的命令行,同样切换目录到storm下的bin,输入以下命令:

./storm supervisor 1>/dev/null 2>&1 &

如法炮制,切换到第三台机器,同样运行

./storm supervisor 1>/dev/null 2>&1 &

这样,我们的集群就先运行起来了。这时候回去UI刷新一下,可以看到两个从节点确实跑起来了:
UI2
此时此刻,我们要进行最激动人心的一步了,提交拓扑。还记得刚才的jar包吗,我是把它上传到第一台机器的上了 完整路径是/home/filetrans/storm-example.jar
在第一台机器上切换目录到storm下的bin,运行以下命令:

./storm jar /home/filetrans/storm-example.jar stormtest.test.TopMain

以上命令,jar表示运行jar程序,/home/filetrans/storm-example.jar表示jar程序的路径,stormtest.test.TopMain表示主函数的完整路径,这在文章一开始有提到过。
出现以下提示,说明拓扑提交成功了。
提交拓扑成功
此时再去UI刷新看看,我们会发现有拓扑在工作了,拓扑名为example_top,状态为ACTIVE:
工程active状态
Num Workers是我们代码里设置的worker数,有3个。Num executor为线程数,有14个,14是怎么来的呢?首先以下三行代码,我们可以看出一共有加起来就一共有12个线程了。

//设置spout,并行度为4,第一个参数为自定义的名字
topologyBuilder.setSpout("randomspout", new RandomSpout(),4);
//设置bolt,数据来源是上面的spout
topologyBuilder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout");
topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");

然后以下这一行代码,有两个线程:

config.setNumAckers(2);   //应答器数目

全部加起来,就有14个线程了。

Ok,程序也跑了有一段时间了,我们去看看效果。回头看看SuffixBolt类
我们在里面写了这么一句代码:

fileWriter = new FileWriter("/home/"+UUID.randomUUID());

然后在后面的代码中,往这个文件追加写入了运行结果。
此时我们需要到第二台或者第三台主机上查看这些文件。
切换到/home目录下
目录

前三个文件就是以随机的uuid明明的记录文件,使用more+文件名命令打开看看。
More 命令一下。
查看内容
确实有结果,程序跑成功了。

如果这个时候,我们不管程序,它就会继续跑,文件无限增大。所以在项目结束后,需要关闭这个拓扑的工作。

回到主节点上,切换到storm目录下的bin,运行以下命令:
./storm kill example_top
其中的example_top是拓扑名称,是我们在代码里面定义的,在UI上也可以看到。Kill完之后,去UI刷新一下,发现拓扑的状态变为KILLED,再过一会你再刷新,拓扑就直接消失了。
拓扑状态killed
至此,一个完整的程序就跑完了。关于storm,就先告一段落了。

storm的配置以及实例实践-一个简单的实例

终于把配置部分写完了。其实如果你觉得浪费时间,不想熟悉linux,那么其实你可以根本就不看,直接用本地模式跑一个简单的程序也行。但是事实上,我们学习storm,就是为了让其处理实时的大数据,配置集群有利于我们更熟悉整个框架。好了不多说,进入正题,这篇文章,我将用简单的实例代码来初步接触storm。

在storm官网下载的压缩包里有官方给出的例子,建议大家根据这个来学习一下。由于里面涉及到maven,需要配置一些东西,所以这里暂时不直接跑它的例子。取而代之的,我们用一个比较直观的例子来熟悉storm。

需求如下:

从一个数据源中获取随机的单词,然后将其转换为大写,再加个后缀,写入本地文件。

在storm的框架下,我们可以这样来解决这个问题。首先因为我们没有数据源,所以我们用RandomSpout来模拟产生源源不断的数据,然后用UpperBolt来接收数据,将字母变为大写,再发送给SuffixBolt,给它加后缀,然后写入文件。

接下来就用代码来说话。

eclipse新建工程,并在build path中把storm的jar包引进来(通常在lib文件夹下)。

写第一个类RandomSpout,用于产生数据

import java.util.Map;
import java.util.Random;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class RandomSpout extends BaseRichSpout{
	SpoutOutputCollector collector=null;
	String[] words={"storm","apache","spark","hadoop"};
	@Override
	public void nextTuple() {
		Random random = new Random();
		String word =words[random.nextInt(words.length)];
		//发送消息
		collector.emit(new Values(word));
		
	}

	@Override
	public void open(Map arg0, TopologyContext arg1, SpoutOutputCollector collector) {
		//初始化collector
		this.collector=collector;
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("src_word"));
	}

}

要点:

  • open()方法是运行的最早的一个方法,我们这里初始化了collector,以方便给别的方法使用。
  • 最开始创建了一个String数组,作为单词源。里面有四个单词:storm,apache,spark,hadoop。
  • nextTuple()方法会被循环调用,里面的collector.emit(new Values(word))用于发送数据给接收者。
  • declareOutputFields()方法用来定义输出的字段。

然后新建UpperBolt类,用于接收RandomSpout发来的单词,并将单词转化为大写。

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class UpperBolt extends BaseBasicBolt {

	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String src_word = tuple.getString(0);
		String upper_word=src_word.toUpperCase();
		//发送消息出去
		collector.emit(new Values(upper_word));
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("upper_word"));
	}

}

要点:

  • execute()方法不断接收tuple,并在里面写逻辑进行处理,并通过collector将封装成tuple的数据emit出去
  • tuple是storm传输数据的单位,我们需要将数据处理好之后封装成tuple,再发送出去,new Values(upper_word)就是封装成tuple的一种形式

再新建SuffixBolt类,用于接受经过大写处理的单词,同时给单词加上后缀“-handled”。最后再写入本地文件。

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class SuffixBolt extends BaseBasicBolt{
	FileWriter fileWriter = null; 
	
    @Override
	public void prepare(Map stormConf, TopologyContext context) {
		try {
			fileWriter = new FileWriter("/home/"+UUID.randomUUID());//初始化fileWriter,写到本地文件,在/home/目录下,文件用随机的UUID命名
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	@Override
	public void execute(Tuple tuple, BasicOutputCollector collector) {
		String upper_word = tuple.getString(0);//接收数据
		String suffix_word = upper_word+"-handled";//加后缀
		
		try {
			fileWriter.append(suffix_word+"\n");//写入文件
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declare(new Fields("suffix_word") );
	}

}

要点:

  • execute()方法不断接收tuple,但和之前的Bolt不一样的是,这次我们不用发送数据了,而是将处理后的数据直接写入本地文件中。

最后,写个主程序类TopMain,用来串联所有的组件。

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;

public class TopMain {

	public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
		
		//新建对象
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		//设置spout,并行度为4,第一个参数为自定义的名字
		topologyBuilder.setSpout("randomspout", new RandomSpout(),4);
		//设置bolt,数据来源是上面的spout
		topologyBuilder.setBolt("upperbolt", new UpperBolt(),4).shuffleGrouping("randomspout");
		topologyBuilder.setBolt("suffixbolt", new SuffixBolt(), 4).shuffleGrouping("upperbolt");
		
		//创建拓扑
		StormTopology top = topologyBuilder.createTopology();
		
		//创建config,配置storm
		Config config = new Config();
		config.setNumWorkers(3);  //进程数
		config.setNumAckers(2);   //应答器数目

		//提交拓扑给集群
		StormSubmitter.submitTopology("example_top", config, top);
	
	}
}

要点:

  • 拓扑是storm里面一个很重要的概念,说白了,就是工程的整合,在这里,你可以设置worker数,Acker数,串联Spout和Bolt等等,关于它的配置,其实有很多内容,这里先不多说,注释把基本的功能写出来了,其实每个地方都可以深入探讨的。

好,代码写完了,怎么跑呢,且听下回分解。

storm的配置以及实例实践-克隆虚拟机

在之前,我们已经把storm的运行环境、相关依赖都安装好了,不知道你还记不记得《storm的配置以及实例实践-设置虚拟机网络》这篇文章,讲的是虚拟机配置网络的事情。同样的,我们现在要做的事情是利用虚拟机来模拟集群,所以,每台虚拟机都需要配置自己的网络。

或许你已经被之前的配置步骤搞的晕头晃脑了。在这里先告诉你,你不需要重新这些步骤了,我们现在要做的,就是直接在虚拟机里面克隆系统,再稍微配置一下网络即可。

克隆虚拟机的过程非常简单。

把虚拟机中的系统关机,选中配置好的那部虚拟机,然后点击菜单的“虚拟机”–“管理”–“克隆”,按照步骤进行克隆即可。需要注意的一点是,步骤中有一步要你选择克隆的类型,选择创建完整克隆。如下图:

创建完整克隆

等待克隆完毕即可。

接下来配置网络。

首先命令行键入

ifconfig -a

找到HWaddr,把它的值复制下来,我的是00:0C:29:d3:8e:e9,这个就是我们新虚拟机的mac地址。

接下来修改 /etc/sysconfig/network-scripts/ifcfg-eth0
内容如下:

DEVICE="eth0"
#BOOTPROTO="dhcp"
BOOTPROTO="static"
IPADDR=192.168.129.101
NETMASK=255.255.255.0
HWADDR="00:0C:29:d3:8e:e9"

上面的代码中,高亮的是要修改的。其中第4行,改ip,因为第一台主机设置的是192.168.129.100,所以这一台改成192.168.129.101;第6行,mac地址,把刚才复制的mac地址弄进来即可。

接着更改主机名

vim /etc/sysconfig/network

把hostname改一下

HOSTNAME=sub01

然后更改网卡:

vim //etc/udev/rules.d/70-persistent-net.rules

把第二行的用#注释掉,然后把ATTR{address}更改一下,改成我们的mac地址。如下图:
更改网卡

搞定。如法炮制,然后再克隆第三台虚拟机,一个简单的集群就搭建起来了。

storm的配置以及实例实践-storm与zookeeper的安装配置

之前配置了那么多东西,都是为了安装storm,事实上,如果我们用的是企业级的服务器,一般会预装很多依赖,非常省事。而之前我所讲的所有配置,都是基于裸机的,安装一遍也没有坏处,至少也对linux更加熟悉了。废话不多说,马上开始配置storm。正如前面所讲,storm的工作,有一部分是依靠zookeeper的,所以,两个都得装。安装过程很容易。

storm下载地址(版本0.9.3):

http://www.apache.org/dyn/closer.lua/storm/apache-storm-0.9.3/apache-storm-0.9.3.tar.gz

zookeeper下载地址(版本3.4.6):

http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz

若是下载过慢或者链接失效可以自行去官网下载。

为了方便起见,我们可以把storm和zookeeper都解压到一个文件夹中,比如我就在根目录下新建了一个env文件夹。

首先解压storm:

tar -xvzf apache-storm-0.9.3.tar.gz /env

然后把storm的路径加入环境变量中,具体而言,就是vim一下~/.bashrc这个文件,然后在末尾加入两句:

export STORM_HOME=/env/apache-storm-0.9.3
export PATH=$PATH:$STORM_HOME/bin

保存。

继续配置storm,vim一下storm目录下的conf/storm.yaml,打开几行注释,配置zookeeper节点以及nimbus

storm.zookeeper.servers:
    - "192.168.254.100"
nimbus.host: "192.168.254.100"

随后解压zookeeper

tar -xvzf zookeeper-3.4.6.tar.gz /env

配置的话,不需要特别配置,我们只需要把样例的配置文件复制一份即可。
切换到zookeeper的目录下,然后执行

cd conf
cp zoo_sample.cfg zoo.cfg

搞定。

至此,一台机子上的所有配置已经完成了。接下来的工作就比较简单了,复制虚拟机,更改一些配置就可以了。

storm的配置以及实例实践-安装依赖

上一篇讲了jdk的安装,这一篇继续配置。在安装storm和zookeeper之前,我们需要安装一些依赖,否则在正式安装的时候会出错。安装依赖也非常简单,yum可以帮我们解决所有的问题。下面说说要安装什么依赖。

先安装c++编译器。

yum install gcc-c++

按照提示输入y,回车,安装完毕。

再安装软件包uuid-devel和libuuid-devel

yum install uuid-devel
yum install libuuid-devel

随后安装libtool

yum install libtool

安装完以上的所有,我们继续安装storm所需要的Zeromq和jzmq
zeromq下载地址:
http://download.zeromq.org/zeromq-4.1.3.tar.gz
jzmq下载地址:
https://codeload.github.com/zeromq/jzmq/zip/master

把下载的两个压缩包上传到服务器,并在服务器上解压。解压完成之后,切换到对应目录下,编译安装。

cd zeromq-4.1.3
./configure
make
make install

安装jzmq的过程也是如法炮制

cd jzmq-master
./autogen.sh
./configure
make
make install

至此,所需依赖就安装完了,下一篇,终于可以正式安装storm了。

storm的配置以及实例实践-更换centos的jdk版本

首先我们命令行输入

java -version

看看centos本身自带的jdk版本,不出意外的话是openjdk的。一般来说,我们需要换成新的jdk,以免之后跑程序的时候出现问题。

以安装jdk1.7为例。

首先去官网下载:

http://www.oracle.com/technetwork/java/javase/downloads/jdk7-downloads-1880260.html

找到合适的版本,由于我们是安装在centos上面的,因此选择合适的linux版本下载,比如我下载的是jdk-7u79-linux-x64.tar.gz

下载完毕,利用上一篇文章《storm的配置以及实例实践-利用securecrt连接服务器》说到的方法,把本地下载的jdk文件上传到服务器上。

一般我们把jdk-7u79-linux-x64.tar.gz解压到这个路径下:/usr/java/

tar -xvf jdk-7u79-linux-x64.tar.gz /usr/java/ 

搞定。记住你解压后jdk的路径名,比如我的就是:/usr/java/jdk1.7.0_79/
剩下就是设置jdk环境变量了。

先编辑一下 /etc/profile这个文件

vim /etc/profile 

在末尾加入几行代码,注意,要和你的jdk路径一直,我以我自己的为例,即/usr/java/jdk1.7.0_79/

export JAVA_HOME=/usr/local/java/jdk1.7.0_79
export JRE_HOME=/usr/local/java/jdk1.7.0_79/jre
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin: $PATH

保存退出。
再次使用

java -version

查看java版本,显示是刚才安装的那个版本就可以了,配置成功。