package com.oncedq.code;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.SimpleDateFormat;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
import org.apache.hadoop.mapred.lib.ChainMapper;
import com.oncedq.code.util.DateUtil;
/**
 * Map-only Hadoop job (old {@code org.apache.hadoop.mapred} API) that chains four mappers with
 * {@link ChainMapper}.
 *
 * <ol>
 *   <li>{@link ExtractMappper} parses a {@code ;}-separated text line into a {@link Conn1}.</li>
 *   <li>{@link Filter1Mapper} keeps only records whose state is exactly {@code "F"}.</li>
 *   <li>{@link RegexMapper} replaces every {@code "F"} in the state with {@code "Find"}.</li>
 *   <li>{@link LoadMapper} passes records through unchanged to the output.</li>
 * </ol>
 *
 * <p>Expected input field order: {@code orderKey;customer;state;price;orderDate} with the date
 * formatted as {@code yyyy-MM-dd}.
 */
public class ProcessSample {

    /** Date pattern used both when parsing input lines and when (de)serializing records. */
    private static final String DATE_PATTERN = "yyyy-MM-dd";

    /** Input directory used when no first command-line argument is given. */
    private static final String DEFAULT_INPUT = "orderData";

    /** Output directory used when no second command-line argument is given. */
    private static final String DEFAULT_OUTPUT = "orderDataOutput";

    /** Counter group under which skipped input lines are reported. */
    private static final String COUNTER_GROUP = "ProcessSample";

    /** Number of ';'-separated fields an input line must contain. */
    private static final int FIELD_COUNT = 5;

    /** Interval between checks of the JobControl status in {@link #main(String[])}. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /**
     * First link of the chain. It parses one input line into a {@link Conn1} and emits it keyed
     * by its order key. Lines with fewer than five fields (including blank lines) are counted
     * under {@code ProcessSample/MALFORMED_LINES} and skipped instead of failing the task.
     */
    public static class ExtractMappper extends MapReduceBase implements
            Mapper<LongWritable, Text, LongWritable, Conn1> {
        @Override
        public void map(LongWritable offset, Text value,
                OutputCollector<LongWritable, Conn1> output, Reporter reporter)
                throws IOException {
            String[] fields = value.toString().split(";");
            if (fields.length < FIELD_COUNT) {
                // Indexing a short row would throw ArrayIndexOutOfBoundsException and kill the
                // whole task; count it instead so bad input remains visible in job counters.
                reporter.incrCounter(COUNTER_GROUP, "MALFORMED_LINES", 1);
                return;
            }
            Conn1 record = new Conn1();
            record.orderKey = Long.parseLong(fields[0]);
            record.customer = Long.parseLong(fields[1]);
            record.state = fields[2];
            record.price = Double.parseDouble(fields[3]);
            record.orderDate = DateUtil.getDateFromString(fields[4], DATE_PATTERN);
            output.collect(new LongWritable(record.orderKey), record);
        }
    }

    /**
     * Shared Writable form of one order row. {@link Conn1}, {@link Conn2} and {@link Conn3} only
     * exist as distinct types so that each chain link declares its own value class.
     *
     * @param <T> the concrete subclass, so {@link #compareTo} accepts the same type
     */
    public abstract static class OrderRecord<T extends OrderRecord<T>>
            implements WritableComparable<T> {
        public long orderKey;
        public long customer;
        public String state;
        public double price;
        public java.util.Date orderDate;

        /**
         * Copies every field from {@code src} into this record. The {@code orderDate} reference
         * is shared, not cloned.
         */
        protected void copyFrom(OrderRecord<?> src) {
            orderKey = src.orderKey;
            customer = src.customer;
            state = src.state;
            price = src.price;
            orderDate = src.orderDate;
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            orderKey = in.readLong();
            customer = in.readLong();
            state = Text.readString(in);
            price = in.readDouble();
            orderDate = DateUtil.getDateFromString(Text.readString(in), DATE_PATTERN);
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // Field order must mirror readFields exactly.
            out.writeLong(orderKey);
            out.writeLong(customer);
            Text.writeString(out, state);
            out.writeDouble(price);
            Text.writeString(out, DateUtil.getDateStr(orderDate, DATE_PATTERN));
        }

        /**
         * Orders records by {@code orderKey} only. This ordering is not consistent with
         * {@code equals}, which is not overridden.
         */
        @Override
        public int compareTo(T other) {
            return orderKey < other.orderKey ? -1 : (orderKey == other.orderKey ? 0 : 1);
        }

        /**
         * Renders the record as {@code ;}-separated fields in input order. TextOutputFormat writes
         * this text instead of the default {@code Object.toString()} identity string.
         */
        @Override
        public String toString() {
            return orderKey + ";" + customer + ";" + state + ";" + price + ";"
                    + DateUtil.getDateStr(orderDate, DATE_PATTERN);
        }
    }

    /** Value type emitted by {@link ExtractMappper}. */
    public static class Conn1 extends OrderRecord<Conn1> {
    }

    /** Value type emitted by {@link Filter1Mapper}. */
    public static class Conn2 extends OrderRecord<Conn2> {
    }

    /** Value type emitted by {@link RegexMapper} and {@link LoadMapper}. */
    public static class Conn3 extends OrderRecord<Conn3> {
    }

    /** Second link: forwards only records whose state is exactly {@code "F"}. */
    public static class Filter1Mapper extends MapReduceBase implements
            Mapper<LongWritable, Conn1, LongWritable, Conn2> {
        @Override
        public void map(LongWritable inKey, Conn1 record,
                OutputCollector<LongWritable, Conn2> collector, Reporter report)
                throws IOException {
            // Constant-first equals stays safe even if state were ever null.
            if ("F".equals(record.state)) {
                Conn2 kept = new Conn2();
                kept.copyFrom(record);
                collector.collect(inKey, kept);
            }
        }
    }

    /** Third link: replaces every {@code "F"} in the state with {@code "Find"}. */
    public static class RegexMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn2, LongWritable, Conn3> {
        @Override
        public void map(LongWritable inKey, Conn2 record,
                OutputCollector<LongWritable, Conn3> collector, Reporter report)
                throws IOException {
            Conn3 rewritten = new Conn3();
            rewritten.copyFrom(record);
            // Literal replace gives the same result as replaceAll("F", "Find") because neither
            // string contains regex metacharacters. It also leaves the input record untouched.
            rewritten.state = record.state.replace("F", "Find");
            collector.collect(inKey, rewritten);
        }
    }

    /** Last link: emits each record unchanged so it reaches the job's output format. */
    public static class LoadMapper extends MapReduceBase implements
            Mapper<LongWritable, Conn3, LongWritable, Conn3> {
        @Override
        public void map(LongWritable key, Conn3 record,
                OutputCollector<LongWritable, Conn3> output, Reporter reporter)
                throws IOException {
            output.collect(key, record);
        }
    }

    /**
     * Configures the chained map-only job, submits it through {@link JobControl}, and blocks
     * until it finishes.
     *
     * @param args optional {@code [inputDir [outputDir]]}; defaults to {@code orderData} and
     *     {@code orderDataOutput}
     * @throws IllegalStateException if the job cannot be created, the wait is interrupted, or
     *     the job fails
     */
    public static void main(String[] args) {
        String input = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String output = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        JobConf job = new JobConf(ProcessSample.class);
        job.setJobName("ProcessSample");
        job.setNumReduceTasks(0);
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Conn3.class);

        // addMapper is static. Each link gets a private JobConf created with
        // loadDefaults=false, so default values do not override settings made on the job conf.
        ChainMapper.addMapper(job, ExtractMappper.class, LongWritable.class, Text.class,
                LongWritable.class, Conn1.class, true, new JobConf(false));
        ChainMapper.addMapper(job, Filter1Mapper.class, LongWritable.class, Conn1.class,
                LongWritable.class, Conn2.class, true, new JobConf(false));
        ChainMapper.addMapper(job, RegexMapper.class, LongWritable.class, Conn2.class,
                LongWritable.class, Conn3.class, true, new JobConf(false));
        ChainMapper.addMapper(job, LoadMapper.class, LongWritable.class, Conn3.class,
                LongWritable.class, Conn3.class, true, new JobConf(false));

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        JobControl jc = new JobControl("test");
        try {
            jc.addJob(new Job(job));
        } catch (IOException e) {
            throw new IllegalStateException("Failed to create ProcessSample job", e);
        }

        // JobControl.run() keeps looping until stop() is called, so invoking it directly never
        // returns. Drive it from its own thread and poll until every job has finished.
        Thread controller = new Thread(jc, "ProcessSample-JobControl");
        controller.setDaemon(true);
        controller.start();
        try {
            while (!jc.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for ProcessSample job", e);
        } finally {
            jc.stop();
        }

        if (!jc.getFailedJobs().isEmpty()) {
            throw new IllegalStateException("ProcessSample job failed: " + jc.getFailedJobs());
        }
    }
}
如果想了解程序的具体含义，或探讨 ChainMapper 和 ChainReducer 在数据处理流程中的应用，请加我 QQ：405078363
分享到:
相关推荐
该资源包含了数据库设计的一些流程,数据字典等数据之间关系的构造
数据流程图和业务流程图案例教程.pdf
包含一些数据结构示例 可以参看以下 适合刚学习的人交流
数据流程图示例（软件工程）
本项目是基于Java的Spark数据处理示例设计源码,共包含157个文件,其中主要包含136个Java源代码文件,3个gitignore文件,3个xml配置文件,3个parquet数据文件,3个json数据文件,2个avsc文件,2个txt文本文件,1个...
给学习 SQL 的人使用，不下载会后悔
SpringBoot 处理 JSON 数据示例代码
半导体磁阻效应数据处理打印示例- 半导体磁阻效应数据处理示例.doc
第11章:缺少数据建模和贝叶斯估计视图示例 第12章:蒙特卡洛模拟研究查看示例 第13章:示例:特殊功能 第14章:特殊建模问题 第 15 章:标题、数据、变量和定义命令 第16章:分析命令 第17章:MODEL命令 第 ...
使用iServer JAVA API访问iServer数据服务进行数据操作示例工程,博客地址: http://blog.csdn.net/supermapsupport/article/details/71107998
示例-SAP物料主数据详解
Qt post请求发送和解析json数据完整示例,有完整的设置框架,可以发送和解析多个post请求
C 语言数据结构示例
这个示例代码中,我们展示了...该示例代码展示了Matlab数据处理与可视化的一些常用技巧,帮助你加载数据、进行数据处理和创建数据可视化。你可以根据需要对代码进行修改和扩展,以适应具体的数据处理和可视化需求。
Python 实现 ARIMA 时间序列预测模型，附有示例数据以及完整流程的结果
Excel数据处理与分析实战技巧精粹示例文件
这个项目是一个使用 Python 的 Pandas 库进行数据处理和分析的示例。Pandas 是一个强大的数据处理工具,可以帮助我们加载、清洗、转换和分析数据,从而从原始数据中提取有价值的信息。在这个项目中,我们将展示如何...
县霍乱疫情应急处理流程图示例.pdf
numpy处理数据示例.ipynb
【小白必看】Python 爬取 NBA 球员数据示例