|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
Parameter接口:
- package com.xiaohong.TongJi;
/**
 * Constants shared by the mapper and the reducer of the column-statistics job.
 *
 * <p>All fields of an interface are implicitly {@code public static final}.
 */
public interface Parameter {

    /**
     * Number of fields (columns) in each input record. It must be adjusted
     * when the input data has a different number of columns.
     */
    int LenFields = 4;

    /**
     * Number of per-column statistics. The statistic (category) indexes are:
     * <ul>
     *   <li>0 for min()</li>
     *   <li>1 for max()</li>
     *   <li>2 for sum()</li>
     *   <li>3 for sum2() (sum of squares)</li>
     * </ul>
     */
    int LenStat = 4;

    /** Smallest {@code int} value (same value as the literal 0x80000000). */
    int iMin = Integer.MIN_VALUE;

    /** Largest {@code int} value (same value as the literal 0x7FFFFFFF). */
    int iMax = Integer.MAX_VALUE;

    /**
     * Most negative finite {@code float}; used as the starting value when
     * searching for a maximum so that every finite input compares greater or equal.
     */
    float fMin = -Float.MAX_VALUE;

    /**
     * Largest finite {@code float}; used as the starting value when
     * searching for a minimum so that every finite input compares less or equal.
     */
    float fMax = Float.MAX_VALUE;
}
复制代码
map端:
- package com.xiaohong.TongJi;
- import java.io.IOException;
- import java.util.StringTokenizer;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapreduce.Mapper;
- public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {
- /**
- * The iFields[][] is the Array for Keys to present the min(), max(), sum()
- * of each field in the input. iFields[0] for min() iFields[1] for max()
- * iFields[2] for sum() iFields[3] for sum2()
- */
- private static IntWritable iFields[][] = new IntWritable[LenStat][LenFields];
- private static float min[] = new float[LenFields];
- private static float max[] = new float[LenFields];
- private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
- private final static FloatWritable one = new FloatWritable(1);
- public TongJiMapper() {
- for (int i = 0; i < LenStat; i++) {
- for (int j = 0; j < LenFields; j++) {
- iFields[i][j] = new IntWritable(i * LenFields + j);
- }
- }
- for (int j = 0; j < LenFields; j++) {
- min[j] = fMax; // the maximum integer
- max[j] = fMin; // the minimum integer
- }
- }
- @Override
- protected void map(LongWritable key, Text values,
- Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
- throws IOException, InterruptedException {
- /*
- * 原始数据:
- * 9.0000000e+000 1.0000000e+000 2.0000000e+000 4.0000000e+000
- 3.0000000e+000 8.0000000e+000 4.0000000e+000 8.0000000e+000
- 6.0000000e+000 5.0000000e+000 9.0000000e+000 1.0000000e+000
- 5.0000000e+000 6.0000000e+000 9.0000000e+000 2.0000000e+000
- 9.0000000e+000 8.0000000e+000 4.0000000e+000 2.0000000e+000
- 7.0000000e+000 9.0000000e+000 9.0000000e+000 2.0000000e+000
- 5.0000000e+000 7.0000000e+000 1.0000000e+000 6.0000000e+000
- */
- StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
- float iTmp;
- for (int j = 0; j < LenFields; j++) {
- /** handle each field. */
- iTmp = Float.parseFloat(st.nextToken());
- /**
- * for min(), this judgement just output
- * about 37 <key,value> pairs in 100,000
- * records.
- */
-
- if (min[j] > iTmp) {
- min[j] = iTmp;
- context.write(iFields[0][j], new FloatWritable(min[j]));
- }
- if (max[j] < iTmp) { /** for max() */
- max[j] = iTmp;
- context.write(iFields[1][j], new FloatWritable(max[j]));
- }
- context.write(iFields[2][j], new FloatWritable(iTmp));
- /** for sum() */
- context.write(iFields[3][j], new FloatWritable(iTmp * iTmp));/** for sum2() */
- }
- context.write(iwCnt, one); /** for cnt() */
- }
- }
复制代码
reducer端:
- package com.xiaohong.TongJi;
- import java.io.IOException;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Reducer;
- public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
- implements Parameter {
- @Override
- protected void reduce(IntWritable K, Iterable<FloatWritable> values,
- Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
- throws IOException, InterruptedException {
- float min = iMax;
- float max = iMin;
- float sum = 0;
- long iCnt = 0;
- int iCategory = K.get() / LenFields; // restore the category from Keys.
- switch (iCategory) {
- case 0:/** min() */
- for (FloatWritable value : values) {
- if (min > value.get()) {
- min = value.get();
- }
- }
- context.write(K, new FloatWritable(min));
- break;
- case 1:/** max() */
- for (FloatWritable value : values) {
- if (max < value.get()) {
- max = value.get();
- }
- }
- context.write(K, new FloatWritable(max));
- break;
- case 2:/** sum() */
- for (FloatWritable value : values) {
- sum += value.get();
- }
- context.write(K, new FloatWritable(sum));
- break;
- case 3: /** sum2() */
- for (FloatWritable value : values) {
- sum += value.get();
- }
- context.write(K, new FloatWritable(sum));
- break;
- case 4:
- for (FloatWritable value : values) {
- iCnt += value.get();
- }
- context.write(K, new FloatWritable(iCnt));
- break;
- } // switch
- }
- }
复制代码
yarn客户端:
- package com.xiaohong.TongJi;
- import java.io.IOException;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.FloatWritable;
- import org.apache.hadoop.io.IntWritable;
- import org.apache.hadoop.mapreduce.Job;
- import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
- import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
- import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
- import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
- public class JobSubMitter {
- public static void main(String[] args) throws Exception {
- Configuration conf = new Configuration();
- Job job = Job.getInstance(conf);
- job.setJarByClass(JobSubMitter.class);
- job.setMapperClass(TongJiMapper.class);
- job.setReducerClass(TongJiReducer.class);
- job.setMapOutputKeyClass(IntWritable.class);
- job.setMapOutputValueClass(FloatWritable.class);
- job.setOutputKeyClass(IntWritable.class);
- job.setOutputValueClass(FloatWritable.class);
- job.setInputFormatClass(TextInputFormat.class);
- FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));
- job.setOutputFormatClass(TextOutputFormat.class);
- FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));
- boolean res = job.waitForCompletion(true);
- System.exit(res ? 0 : 1);
- }
- }
复制代码
问题:我想问下,当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?K不是0~15吗?难道调用.get()方法有什么玄机吗?
|
|