MapReduce Secondary Sort
Published: 2019-06-13


1 Secondary Sort

1.1 Approach

Secondary sort means that records with the same value in the first field are further sorted by their second field.

For example, an e-commerce platform records the amount of every order placed by each user. The task is to sort each user's order amounts, and the account names in the output must be sorted as well.

Account  Order amount
hadoop@apache 200
hive@apache 550
yarn@apache 580
hive@apache 159
hadoop@apache 300
hive@apache 258
hadoop@apache 300
yarn@apache 100
hadoop@apache 150
yarn@apache 560
yarn@apache 260

The result after the secondary sort:

Account  Order amount
hadoop@apache 150
hadoop@apache 200
hadoop@apache 300
hadoop@apache 300
hive@apache 159
hive@apache 258
hive@apache 550
yarn@apache 100
yarn@apache 260
yarn@apache 560
yarn@apache 580

The idea is to use a custom key that sorts on both fields, account first and order amount second; a custom partitioner and a custom grouping comparator that partition and group by account only; and a custom sort comparator, used for sorting on the map side and for the merge sort on the reduce side. In Job terms, the pattern boils down to three hooks, as shown in the sketch below.
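A minimal sketch of the wiring, using the class names from the implementation in section 1.2:

// Sort comparator: orders keys by (account, cost) during the map-side
// spill sort and the reduce-side merge.
job.setSortComparatorClass(CostBeanComparator.class);

// Partitioner: routes keys by account only, so every order of a given
// user lands on the same reducer.
job.setPartitionerClass(CostBeanPartitioner.class);

// Grouping comparator: compares keys by account only, so one reduce()
// call receives all of a user's amounts, already sorted.
job.setGroupingComparatorClass(GroupComparator.class);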

Because Hadoop's default string serialization, java.io.DataOutputStream.writeUTF(), uses a "modified UTF-8" encoding, the serialized byte stream cannot be used directly in a RawComparator.

The implementation therefore takes a workaround: it writes the raw bytes of the account field directly, and serializes the byte length along with them. The byte stream a RawComparator receives is then exactly the bytes we wrote. When deserializing, of course, the account field must be read back using that length.
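As a standalone illustration of the resulting byte layout (the class name KeyLayoutDemo and the sample values are ours, not part of the original program):

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.charset.StandardCharsets;

public class KeyLayoutDemo {
    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);

        String account = "hadoop@apache";
        byte[] raw = account.getBytes(StandardCharsets.UTF_8);

        out.writeInt(raw.length); // 4-byte length prefix, fixed width
        out.write(raw);           // raw UTF-8 bytes, byte-comparable as-is
        out.writeDouble(200.0);   // 8-byte order amount

        // Layout: [4-byte length][account bytes][8-byte double].
        // A RawComparator can read the length at offset s, compare the
        // account bytes starting at s + 4, and read the double at
        // s + 4 + length.
        System.out.println(bos.size()); // 4 + 13 + 8 = 25
    }
}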

1.2 Implementation

Program code:

package com.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.charset.Charset;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class SecondarySortMapReduce extends Configured implements Tool {

    /**
     * Cost record: the composite key (account + order amount).
     * @author Ivan
     */
    public static class CostBean implements WritableComparable<CostBean> {
        private String account;
        private double cost;

        public void set(String account, double cost) {
            this.account = account;
            this.cost = cost;
        }

        public String getAccount() {
            return account;
        }

        public double getCost() {
            return cost;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            byte[] buffer = account.getBytes(Charset.forName("UTF-8"));
            // Length of the account's byte stream. out.writeUTF() uses a
            // complicated encoding that must be decoded with
            // DataInput.readUTF(), so it is not used here.
            out.writeInt(buffer.length);
            out.write(buffer);
            out.writeDouble(cost);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            int accountLength = in.readInt();
            byte[] bytes = new byte[accountLength];
            in.readFully(bytes);
            account = new String(bytes, Charset.forName("UTF-8"));
            cost = in.readDouble();
        }

        @Override
        public int compareTo(CostBean o) {
            if (account.equals(o.account)) { // accounts equal, compare the amounts next
                return cost == o.cost ? 0 : (cost > o.cost ? 1 : -1);
            }
            return account.compareTo(o.account);
        }

        @Override
        public String toString() {
            return account + "\t" + cost;
        }
    }

    /**
     * Comparator used for sorting on the map side and for the merge on the
     * reduce side: if the accounts are equal, compare the amounts.
     * @author Ivan
     */
    public static class CostBeanComparator extends WritableComparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int accountLength1 = readInt(b1, s1);
            int accountLength2 = readInt(b2, s2);
            int result = compareBytes(b1, s1 + 4, accountLength1,
                                      b2, s2 + 4, accountLength2);
            if (result == 0) { // same account, compare the amounts
                double thisValue = readDouble(b1, s1 + 4 + accountLength1);
                double thatValue = readDouble(b2, s2 + 4 + accountLength2);
                return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);
            }
            return result;
        }
    }

    /**
     * Partitioner used when the map side spills to disk.
     * @author Ivan
     */
    public static class CostBeanPartitioner extends Partitioner<CostBean, DoubleWritable> {
        /**
         * Partition by account.
         */
        @Override
        public int getPartition(CostBean key, DoubleWritable value, int numPartitions) {
            // Mask the sign bit: hashCode() may be negative, and a negative
            // partition number would make the job fail.
            return (key.account.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }

    /**
     * Grouping comparator used on the reduce side: groups by the account
     * field, i.e. keys with the same account form one group.
     * @author Ivan
     */
    public static class GroupComparator extends WritableComparator {
        @Override
        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
            int accountLength1 = readInt(b1, s1);
            int accountLength2 = readInt(b2, s2);
            // Debug output only: decode both accounts and print them.
            byte[] tmpb1 = new byte[accountLength1];
            byte[] tmpb2 = new byte[accountLength2];
            System.arraycopy(b1, s1 + 4, tmpb1, 0, accountLength1);
            System.arraycopy(b2, s2 + 4, tmpb2, 0, accountLength2);
            String account1 = new String(tmpb1, Charset.forName("UTF-8"));
            String account2 = new String(tmpb2, Charset.forName("UTF-8"));
            System.out.println("grouping: account1=" + account1 + ", account2=" + account2);
            return compareBytes(b1, s1 + 4, accountLength1, b2, s2 + 4, accountLength2);
        }
    }

    /**
     * Mapper class.
     * @author Ivan
     */
    public static class SecondarySortMapper
            extends Mapper<LongWritable, Text, CostBean, DoubleWritable> {
        private final CostBean outputKey = new CostBean();
        private final DoubleWritable outputValue = new DoubleWritable();

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] data = value.toString().split("\t");
            double cost = Double.parseDouble(data[1]);
            outputKey.set(data[0].trim(), cost);
            outputValue.set(cost);
            context.write(outputKey, outputValue);
        }
    }

    public static class SecondarySortReducer
            extends Reducer<CostBean, DoubleWritable, Text, DoubleWritable> {
        private final Text outputKey = new Text();
        private final DoubleWritable outputValue = new DoubleWritable();

        @Override
        protected void reduce(CostBean key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            outputKey.set(key.getAccount());
            for (DoubleWritable v : values) {
                outputValue.set(v.get());
                context.write(outputKey, outputValue);
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, SecondarySortMapReduce.class.getSimpleName());
        job.setJarByClass(SecondarySortMapReduce.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // map settings
        job.setMapperClass(SecondarySortMapper.class);
        job.setMapOutputKeyClass(CostBean.class);
        job.setMapOutputValueClass(DoubleWritable.class);

        // partition settings
        job.setPartitionerClass(CostBeanPartitioner.class);

        // sorting
        job.setSortComparatorClass(CostBeanComparator.class);

        // grouping
        job.setGroupingComparatorClass(GroupComparator.class);

        // reduce settings
        job.setReducerClass(SecondarySortReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);

        boolean res = job.waitForCompletion(true);
        return res ? 0 : 1;
    }

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            throw new IllegalArgumentException("Usage: <input path> <output path>");
        }
        int exitCode = ToolRunner.run(new Configuration(), new SecondarySortMapReduce(), args);
        System.exit(exitCode);
    }
}

1.3 Test

Runtime environment:
  • OS: CentOS 6.4
  • Hadoop: Apache Hadoop 2.5.0

The example above serves as the test data.

Account  Amount
hadoop@apache 200
hive@apache 550
yarn@apache 580
hive@apache 159
hadoop@apache 300
hive@apache 258
hadoop@apache 300
yarn@apache 100
hadoop@apache 150
yarn@apache 560
yarn@apache 260
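A typical run might look like the following; the jar name and HDFS paths are placeholders, not taken from the original post. The output should match the sorted table in section 1.1.

hadoop jar secondary-sort.jar com.hadoop.SecondarySortMapReduce /input/cost.txt /output/secondarysort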


Reposted from: https://www.cnblogs.com/ivanny/p/secondary_soft.html
