服务器之家

服务器之家 > 正文

java 中自定义OutputFormat的实例详解

时间:2020-12-18 13:11     来源/作者:woshisap

java 中 自定义OutputFormat的实例详解

实例代码:

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.ccse.hadoop.outputformat;
 
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.StringTokenizer;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 
 
public class MySelfOutputFormatApp {
   
  public final static String INPUT_PATH = "hdfs://chaoren1:9000/mapinput";
  public final static String OUTPUT_PATH = "hdfs://chaoren1:9000/mapoutput";
  public final static String OUTPUT_FILENAME = "/abc";
   
  public static void main(String[] args) throws IOException, URISyntaxException, 
    ClassNotFoundException, InterruptedException {
    Configuration conf = new Configuration();
    FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), conf);
    fileSystem.delete(new Path(OUTPUT_PATH), true);
     
    Job job = new Job(conf, MySelfOutputFormatApp.class.getSimpleName());
    job.setJarByClass(MySelfOutputFormatApp.class);
     
    FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
    job.setMapperClass(MyMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
     
    job.setReducerClass(MyReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setOutputFormatClass(MyselfOutputFormat.class);
     
    job.waitForCompletion(true);
  }
   
  public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
 
    private Text word = new Text();
    private LongWritable writable = new LongWritable(1);
     
    @Override
    protected void map(LongWritable key, Text value,
        Mapper<LongWritable, Text, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      if (value != null) {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
          word.set(tokenizer.nextToken());
          context.write(word, writable);
        }
      }
    }
     
  }
   
  public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
 
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values,
        Reducer<Text, LongWritable, Text, LongWritable>.Context context)
        throws IOException, InterruptedException {
      long sum = 0
      for (LongWritable value : values) {
        sum += value.get();
      }
      context.write(key, new LongWritable(sum));
    }
  }
 
  public static class MyselfOutputFormat extends OutputFormat<Text, LongWritable> {
 
    private FSDataOutputStream outputStream = null;
     
    @Override
    public RecordWriter<Text, LongWritable> getRecordWriter(
        TaskAttemptContext context) throws IOException,
        InterruptedException {
      try {
        FileSystem fileSystem = FileSystem.get(new URI(MySelfOutputFormatApp.OUTPUT_PATH), context.getConfiguration());
        //指定文件的输出路径
        final Path path = new Path(MySelfOutputFormatApp.OUTPUT_PATH 
                     + MySelfOutputFormatApp.OUTPUT_FILENAME);
        this.outputStream = fileSystem.create(path, false);
      } catch (URISyntaxException e) {
        e.printStackTrace();
      }
      return new MySelfRecordWriter(outputStream);
    }
 
    @Override
    public void checkOutputSpecs(JobContext context) throws IOException,
        InterruptedException {
    }
 
    @Override
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException {
      return new FileOutputCommitter(new Path(MySelfOutputFormatApp.OUTPUT_PATH), context);
    }
     
  }
   
  public static class MySelfRecordWriter extends RecordWriter<Text, LongWritable> {
 
    private FSDataOutputStream outputStream = null;
     
    public MySelfRecordWriter(FSDataOutputStream outputStream) {
      this.outputStream = outputStream;
    }
     
    @Override
    public void write(Text key, LongWritable value) throws IOException,
        InterruptedException {
      this.outputStream.writeBytes(key.toString());
      this.outputStream.writeBytes("\t");
      this.outputStream.writeLong(value.get());
    }
 
    @Override
    public void close(TaskAttemptContext context) throws IOException,
        InterruptedException {
      this.outputStream.close();
    }
     
  }
   
}

 2.OutputFormat是用于处理各种输出目的地的。

2.1 OutputFormat需要写出去的键值对来自于Reducer类,并通过RecordWriter写出。

2.2 RecordWriter中的write(...)方法只有k和v两个参数,那么写到哪里去呢?这要通过单独传入OutputStream来处理,write(...)就是把k和v写入到这个OutputStream中。

2.3 RecordWriter是由OutputFormat通过getRecordWriter(...)方法提供的。因此,我们自定义的OutputFormat必须继承OutputFormat类,并且流对象必须在getRecordWriter(...)方法中创建。

以上就是java 中自定义OutputFormat的实例,如有疑问请留言或者到本站社区交流讨论,感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!

原文链接:http://blog.csdn.net/woshisap/article/details/42320129

标签:

相关文章

热门资讯

2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全
2020微信伤感网名听哭了 让对方看到心疼的伤感网名大全 2019-12-26
Intellij idea2020永久破解,亲测可用!!!
Intellij idea2020永久破解,亲测可用!!! 2020-07-29
歪歪漫画vip账号共享2020_yy漫画免费账号密码共享
歪歪漫画vip账号共享2020_yy漫画免费账号密码共享 2020-04-07
电视剧《琉璃》全集在线观看 琉璃美人煞1-59集免费观看地址
电视剧《琉璃》全集在线观看 琉璃美人煞1-59集免费观看地址 2020-08-12
最新idea2020注册码永久激活(激活到2100年)
最新idea2020注册码永久激活(激活到2100年) 2020-07-29
返回顶部