鱼C论坛

 找回密码
 立即注册
查看: 1300|回复: 0

大数据mapreduce疑惑

[复制链接]
发表于 2018-5-31 22:32:37 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
Parameter接口:

  1. package com.xiaohong.TongJi;

  2. public interface Parameter {
  3.         /**
  4.          * LenFields is the length of fields, or columns; it need to be modified for varied
  5.          */
  6.         public final int LenFields = 4;
  7.         /**
  8.          * iFields[0] for min()
  9.          * iFields[1] for max()
  10.          * iFields[2] for sum()
  11.          * iFields[3] for sum2()
  12.          */
  13.         public final int LenStat = 4;

  14.         public final int iMin = 0x80000000;
  15.         public final int iMax = 0x7FFFFFFF;
  16.         public final float fMin = (float) -3.4E38;
  17.         public final float fMax = (float) 3.4E38;
  18. }
复制代码


map端:

  1. package com.xiaohong.TongJi;

  2. import java.io.IOException;
  3. import java.util.StringTokenizer;

  4. import org.apache.hadoop.io.FloatWritable;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Mapper;

  9. public class TongJiMapper extends Mapper<LongWritable, Text, IntWritable, FloatWritable> implements Parameter {

  10.         /**
  11.          * The iFields[][] is the Array for Keys to present the min(), max(), sum()
  12.          * of each field in the input. iFields[0] for min() iFields[1] for max()
  13.          * iFields[2] for sum() iFields[3] for sum2()
  14.          */

  15.         private static IntWritable iFields[][] = new IntWritable[LenStat][LenFields];
  16.         private static float min[] = new float[LenFields];
  17.         private static float max[] = new float[LenFields];
  18.         private static IntWritable iwCnt = new IntWritable(LenStat * LenFields);
  19.         private final static FloatWritable one = new FloatWritable(1);

  20.         public TongJiMapper() {
  21.                 for (int i = 0; i < LenStat; i++) {
  22.                         for (int j = 0; j < LenFields; j++) {
  23.                                 iFields[i][j] = new IntWritable(i * LenFields + j);
  24.                         }
  25.                 }

  26.                 for (int j = 0; j < LenFields; j++) {
  27.                         min[j] = fMax; // the maximum integer
  28.                         max[j] = fMin; // the minimum integer
  29.                 }

  30.         }

  31.         @Override
  32.         protected void map(LongWritable key, Text values,
  33.                         Mapper<LongWritable, Text, IntWritable, FloatWritable>.Context context)
  34.                         throws IOException, InterruptedException {

  35.                 /*
  36.                  * 原始数据:
  37.                  *         9.0000000e+000        1.0000000e+000        2.0000000e+000        4.0000000e+000
  38.                         3.0000000e+000        8.0000000e+000        4.0000000e+000        8.0000000e+000
  39.                         6.0000000e+000        5.0000000e+000        9.0000000e+000        1.0000000e+000
  40.                         5.0000000e+000        6.0000000e+000        9.0000000e+000        2.0000000e+000
  41.                         9.0000000e+000        8.0000000e+000        4.0000000e+000        2.0000000e+000
  42.                         7.0000000e+000        9.0000000e+000        9.0000000e+000        2.0000000e+000
  43.                         5.0000000e+000        7.0000000e+000        1.0000000e+000        6.0000000e+000
  44.                  */
  45.                 StringTokenizer st = new StringTokenizer(values.toString().toLowerCase(), " \t,;");
  46.                 float iTmp;
  47.                 for (int j = 0; j < LenFields; j++) {
  48.                         /** handle each field. */
  49.                         iTmp = Float.parseFloat(st.nextToken());
  50.                         /**
  51.                          * for min(), this judgement just output
  52.                          * about 37 <key,value> pairs in 100,000
  53.                          * records.
  54.                          */
  55.                        
  56.                         if (min[j] > iTmp) {
  57.                                 min[j] = iTmp;
  58.                                 context.write(iFields[0][j], new FloatWritable(min[j]));
  59.                         }
  60.                         if (max[j] < iTmp) { /** for max() */
  61.                                 max[j] = iTmp;
  62.                                 context.write(iFields[1][j], new FloatWritable(max[j]));
  63.                         }
  64.                         context.write(iFields[2][j], new FloatWritable(iTmp));
  65.                         /** for sum() */
  66.                         context.write(iFields[3][j], new FloatWritable(iTmp * iTmp));/** for sum2() */
  67.                 }
  68.                 context.write(iwCnt, one); /** for cnt() */

  69.         }

  70. }
复制代码



reducer端:



  1. package com.xiaohong.TongJi;

  2. import java.io.IOException;

  3. import org.apache.hadoop.io.FloatWritable;
  4. import org.apache.hadoop.io.IntWritable;
  5. import org.apache.hadoop.mapreduce.Reducer;

  6. public class TongJiReducer extends Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>
  7.                 implements Parameter {

  8.         @Override
  9.         protected void reduce(IntWritable K, Iterable<FloatWritable> values,
  10.                         Reducer<IntWritable, FloatWritable, IntWritable, FloatWritable>.Context context)
  11.                         throws IOException, InterruptedException {

  12.                 float min = iMax;
  13.                 float max = iMin;
  14.                 float sum = 0;
  15.                 long iCnt = 0;
  16.                 int iCategory = K.get() / LenFields; // restore the category from Keys.
  17.                 switch (iCategory) {
  18.                 case 0:/** min() */
  19.                         for (FloatWritable value : values) {
  20.                                 if (min > value.get()) {
  21.                                         min = value.get();
  22.                                 }
  23.                         }
  24.                         context.write(K, new FloatWritable(min));
  25.                         break;
  26.                 case 1:/** max() */
  27.                         for (FloatWritable value : values) {
  28.                                 if (max < value.get()) {
  29.                                         max = value.get();
  30.                                 }
  31.                         }
  32.                         context.write(K, new FloatWritable(max));
  33.                         break;
  34.                 case 2:/** sum() */
  35.                         for (FloatWritable value : values) {
  36.                                 sum += value.get();
  37.                         }
  38.                         context.write(K, new FloatWritable(sum));
  39.                         break;
  40.                 case 3: /** sum2() */
  41.                         for (FloatWritable value : values) {
  42.                                 sum += value.get();
  43.                         }
  44.                         context.write(K, new FloatWritable(sum));
  45.                         break;
  46.                 case 4:
  47.                         for (FloatWritable value : values) {
  48.                                 iCnt += value.get();
  49.                         }
  50.                         context.write(K, new FloatWritable(iCnt));
  51.                         break;
  52.                 } // switch

  53.         }

  54. }
复制代码



yarn客户端:


  1. package com.xiaohong.TongJi;

  2. import java.io.IOException;

  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.FloatWritable;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.mapreduce.Job;
  8. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  9. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  10. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  11. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  12. public class JobSubMitter {

  13.         public static void main(String[] args) throws Exception {

  14.                 Configuration conf = new Configuration();
  15.                 Job job = Job.getInstance(conf);

  16.                 job.setJarByClass(JobSubMitter.class);

  17.                 job.setMapperClass(TongJiMapper.class);
  18.                 job.setReducerClass(TongJiReducer.class);

  19.                 job.setMapOutputKeyClass(IntWritable.class);
  20.                 job.setMapOutputValueClass(FloatWritable.class);

  21.                 job.setOutputKeyClass(IntWritable.class);
  22.                 job.setOutputValueClass(FloatWritable.class);

  23.                 job.setInputFormatClass(TextInputFormat.class);
  24.                 FileInputFormat.setInputPaths(job, new Path("d:/mrData/TongJi/input"));

  25.                 job.setOutputFormatClass(TextOutputFormat.class);
  26.                 FileOutputFormat.setOutputPath(job, new Path("d:/mrData/TongJi/output"));

  27.                 boolean res = job.waitForCompletion(true);
  28.                 System.exit(res ? 0 : 1);

  29.         }

  30. }
复制代码





问题:我想问下当reducer端执行到这句代码的时候"int iCategory = K.get()",怎么将K转换成0~4了呢?,K不是0~15吗,难道调用.get()方法有什么玄机吗?
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2024-4-20 07:29

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表