抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >


大数据处理技术 - Hadoop-MapReduce 编程模型 - WordCount 实例分析

WordCount 示例编写

需求:在一堆给定的文本文件中统计输出每一个单词出现的总次数
数据格式准备如下:

hadoop,hive,hbase
hive,storm
hive,hbase,kafka
spark,flume,kafka,storm
hbase,hadoop,hbase
hive,spark,storm

定义 mapper 类

WordCountMapper.java
package com.qst.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* mapper 类继承 Mapper 表示我们的这个 class 类是一个标准的 mapper 类,需要四个泛型
* k1 v1 k2 v2
*/
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
Text text = new Text();
IntWritable intWritable = new IntWritable();
/**
* 覆写父类的 map 方法,每一行数据要调用一次 map 方法,我们的处理逻辑都写在这个 map 方法里面
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
* hdfs 的最原始数据
hadoop,hive,hbase
hive,storm
hive,hbase,kafka
spark,flume,kafka,storm
hbase,hadoop,hbase
hive,spark,storm
经过第一步:TextInputFormat 之后
0 hadoop,hive,hbase
17 hive,storm
27 hive,hbase,kafka
*/
/**
* @param key 我们的 key1 行偏移量 ,一般没啥用,直接可以丢掉
* @param value 我们的 value1 行文本内容,需要切割,然后转换成新的 k2 v2 输出
* @param context 上下文对象,承接上文,把数据传输给下文
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
/*
hadoop,hive,hbase
hive,storm
hive,hbase,kafka
spark,flume,kafka,storm
hbase,hadoop,hbase
hive,spark,storm
*/
String line = value.toString();
String[] split = line.split(",");
//遍历我们切割出来的单词
for (String word : split) {
text.set(word);
intWritable.set(1);
//写出我们的 k2 v2 这里的类型跟我们的 k2 v2 保持一致
context.write(text,intWritable);
}
}
}

定义 reducer 类

WordCountReducer.java
package com.qst.wordcount;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 我们自定的 class 类继承 reducer 类表明我们这是一个标准的 reducer 类
* 跟我们的 k2 v2 k3 v3 四个泛型
*/
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
/**
* 覆写 reduce 方法,
* @param key 接收的 key 是我们的 K2
* @param values 接收到 value 是一个集合 集合里面的数据类型是 v2 类型
* @param context 上下文对象,将我们的数据往外写
* @throws IOException
* @throws InterruptedException
*/
/*
hadoop,hive,hbase
hive,storm
hive,hbase,kafka
spark,flume,kafka,storm
hbase,hadoop,hbase
hive,spark,storm
hadoop <1,1>
hive <1,1,1,1>
hbase<1,1,1>
*/
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int a = 0;
for (IntWritable value : values) {
int i = value.get();
a += i;
}
//将我们的数据写出去
context.write(key,new IntWritable(a));
}
}

定义 main 方法

jobMain.java
package com.qst.wordcount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class JobMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取一个 job 对象,用于我们任务的组织,通过 job 对象将我们八个步骤组织到一起,提交给 yarn 集群运行
Job job = Job.getInstance(super.getConf(), "xxx");
//如果需要打包运行,一定得要加上这一句
job.setJarByClass(JobMain.class);
//获取到我们的 job 对象之后,通过 job 对象来组织我们的八个 class 类到一起,然后提交给 yarn 集群运行即可
//第一步:读取文件,解析成 key,value 对,这里是 k1 v1
job.setInputFormatClass(TextInputFormat.class);
//集群运行模式,从 hdfs 上面读取文件
// TextInputFormat.addInputPath(job,new Path("hdfs://node01:8020/wordcount"));
//使用本地模式来运行,从本地磁盘读取文件进行处理
TextInputFormat.addInputPath(job,new Path("file:///F:\\input"));
//第二步:自定义 map 逻辑,接收第一步的 k1,v1 转换成新的 k2 v2 进行输出
job.setMapperClass(WordCountMapper.class);
//设置我们 key2 的类型
job.setMapOutputKeyClass(Text.class);
//设置我们的 v2 类型
job.setMapOutputValueClass(IntWritable.class);
/**
* 第三到六步
* 第三步:分区 相同 key 的 value 发送到同一个 reduce 里面去,形成一个集合
* 第四步:排序
* 第五步:规约
* 第六步:分组
* 都省掉
*/
//第七步:设置我们的 reduce 类,接受我们的 key2 v2 输出我们 k3 v3
job.setReducerClass(WordCountReducer.class);
//设置我们 key3 输出的类型
job.setOutputKeyClass(Text.class);
//设置我们 value3 的输出类型
job.setOutputValueClass(IntWritable.class);
//第八步:设置我们的输出类 outputformat
job.setOutputFormatClass(TextOutputFormat.class);
//输出路径输出到 hdfs 上面去,表示我打包到集群上面去运行
// TextOutputFormat.setOutputPath(job,new Path("hdfs://node01:8020/wordcountout"));
//使用本地模式来运行
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\output"));
//提交我们的任务
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
//提交我们的 job 任务
//任务完成之后,返回一个状态码值,如果状态码值是 0,表示程序运 行成功
int run = ToolRunner.run(configuration, new JobMain(), args);
System.exit(run);
}
}

WordCount 示例

提醒:本地运行完成之后,就可以打成 jar 包放到服务器上面去运行了,实际工作当中,都是将代码打成 jar 包,开发 main 方法作为程序的入口,然后放到集群上面去运行

推荐阅读
大数据处理技术-Hadoop-MapReduce的运行机制 大数据处理技术-Hadoop-MapReduce的运行机制 Hive安装部署 Hive安装部署 大数据处理技术-HDFS的JavaAPI操作 大数据处理技术-HDFS的JavaAPI操作 大数据处理技术-apache hadoop三种架构介绍(StandAlone) 大数据处理技术-apache hadoop三种架构介绍(StandAlone) 大数据处理技术-Hadoop集群初体验 大数据处理技术-Hadoop集群初体验 大数据处理技术-Hadoop-MapReduce 大数据处理技术-Hadoop-MapReduce

留言区

Are You A Robot?