
Chengdu Hadoop Training: A Hadoop MR ETL Offline Project

Published by: 成都达内 (Chengdu Tarena) | Published on: 2019-04-04 10:40:36



1. Requirements and Step Breakdown

1.1 Requirements

Use MapReduce (MR) to clean the raw logs, then hand the cleaned data to Hive for statistical analysis.

1.2 Steps

1. Generate a log file of roughly 50 MB to 100 MB containing the fields (cdn, region, level, time, ip, domain, url, traffic), with varying time, ip, domain and traffic values.
2. Write an MR program to clean the log.
3. Move the cleaned log to the location of the Hive external table.
4. Refresh the Hive partition metadata.
5. Query the total traffic for each domain.
6. Wrap the whole workflow in a shell script.

2. Generate the Log with a Log Generator and Upload It to HDFS

The log generator:

package com.ruoze.hadoop.utils;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;

public class GenerateLogUtils {

    public static void main(String[] args) {
        for (int i = 0; i < 300000; i++) {
            generateLog();
        }
    }

    private static String generateLog() {
        try {
            Random rd = new Random();
            Date date = randomDate("2019-01-01", "2019-01-31");
            String[] domainStr = new String[]{
                    "v1.go2yd.com",
                    "v2.go2yd.com",
                    "v3.go2yd.com",
                    "v4.go2yd.com",
                    "v5.go2yd.com",
            };
            // note: length - 1 means the last entry (v5.go2yd.com) is never chosen
            int domainNum = rd.nextInt(domainStr.length - 1);
            String[] trafficStr = new String[]{
                    "136662",
                    "785966",
                    "987422",
                    "975578",
                    "154851",
                    ""  // empty value simulates a dirty record
            };
            int trafficNum = rd.nextInt(trafficStr.length - 1);

            StringBuilder builder = new StringBuilder();
            builder
                    .append("baidu").append("\t")
                    .append("CN").append("\t")
                    .append("2").append("\t")
                    .append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date)).append("\t")
                    .append(getRandomIp()).append("\t")
                    .append(domainStr[domainNum]).append("\t")
                    .append("http://v1.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4").append("\t")
                    .append(trafficStr[trafficNum]).append("\t");

            File file = new File("access.log");
            if (!file.exists()) {
                file.createNewFile();
            }
            FileWriter fileWriter = new FileWriter(file.getName(), true);
            fileWriter.write(builder.toString() + "\n");
            fileWriter.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Generate a random date between beginDate and endDate.
     *
     * @param beginDate
     * @param endDate
     * @return
     */
    private static Date randomDate(String beginDate, String endDate) {
        try {
            SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
            Date start = format.parse(beginDate);
            Date end = format.parse(endDate);
            if (start.getTime() >= end.getTime()) {
                return null;
            }
            long date = random(start.getTime(), end.getTime());
            return new Date(date);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private static long random(long begin, long end) {
        long rtn = begin + (long) (Math.random() * (end - begin));
        if (rtn == begin || rtn == end) {
            return random(begin, end);
        }
        return rtn;
    }

    /**
     * Generate a random IP address.
     *
     * @return
     */
    public static String getRandomIp() {
        // IP ranges (as signed 32-bit integers)
        int[][] range = {
                {607649792, 608174079},     // 36.56.0.0-36.63.255.255
                {1038614528, 1039007743},   // 61.232.0.0-61.237.255.255
                {1783627776, 1784676351},   // 106.80.0.0-106.95.255.255
                {2035023872, 2035154943},   // 121.76.0.0-121.77.255.255
                {2078801920, 2079064063},   // 123.232.0.0-123.235.255.255
                {-1950089216, -1948778497}, // 139.196.0.0-139.215.255.255
                {-1425539072, -1425014785}, // 171.8.0.0-171.15.255.255
                {-1236271104, -1235419137}, // 182.80.0.0-182.92.255.255
                {-770113536, -768606209},   // 210.25.0.0-210.47.255.255
                {-569376768, -564133889},   // 222.16.0.0-222.95.255.255
        };
        Random rdint = new Random();
        int index = rdint.nextInt(10);
        String ip = num2ip(range[index][0] + new Random().nextInt(range[index][1] - range[index][0]));
        return ip;
    }

    /*
     * Convert a decimal integer into a dotted-quad IP address.
     */
    public static String num2ip(int ip) {
        int[] b = new int[4];
        String x = "";
        b[0] = (int) ((ip >> 24) & 0xff);
        b[1] = (int) ((ip >> 16) & 0xff);
        b[2] = (int) ((ip >> 8) & 0xff);
        b[3] = (int) (ip & 0xff);
        x = Integer.toString(b[0]) + "." + Integer.toString(b[1]) + "." + Integer.toString(b[2]) + "." + Integer.toString(b[3]);
        return x;
    }
}

Upload access.log to the HDFS path:

  hadoop fs -put access.log /g6/hadoop/accesslog/20190402/
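If the target directory does not exist on HDFS yet, create it first (for example with hadoop fs -mkdir -p /g6/hadoop/accesslog/20190402, in the same way the output path is created below); otherwise the -put may fail.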

3. Cleaning with MapReduce

3.1 The LogUtils class that parses the log

package com.ruoze.hadoop.utils;

import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;

public class LogUtils {

    DateFormat sourceFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH);
    DateFormat targetFormat = new SimpleDateFormat("yyyyMMddHHmmss");

    /**
     * Parse one log line and process its fields.
     * Fields are separated by \t.
     */
    public String parse(String log) {
        String result = "";
        try {
            String[] splits = log.split("\t");
            String cdn = splits[0];
            String region = splits[1];
            String level = splits[2];
            String timeStr = splits[3];
            // String time = timeStr.substring(1, timeStr.length() - 7);
            String time = targetFormat.format(sourceFormat.parse(timeStr));
            String ip = splits[4];
            String domain = splits[5];
            String url = splits[6];
            String traffic = splits[7];

            StringBuilder builder = new StringBuilder("");
            builder.append(cdn).append("\t")
                    .append(region).append("\t")
                    .append(level).append("\t")
                    .append(time).append("\t")
                    .append(ip).append("\t")
                    .append(domain).append("\t")
                    .append(url).append("\t")
                    .append(traffic);
            result = builder.toString();
        } catch (ParseException e) {
            e.printStackTrace();
        }
        return result;
    }
}

3.2 Unit test for LogUtils

package com.ruoze.hadoop.utils;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class LogUtilsTest {

    private LogUtils utils;

    @Test
    public void testParse() {
        String log = "baidu\tCN\t2\t2019-01-10 16:02:54\t121.77.143.199\tv2.go2yd.com\thttp://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4\t97557845";
        String result = utils.parse(log);
        System.out.println(result);
    }

    @Before
    public void setUp() {
        utils = new LogUtils();
    }

    @After
    public void tearDown() {
        utils = null;
    }
}

Test result:
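(Reconstructed from the parse logic; the original screenshot is not reproduced here.) The sample line above should be printed back unchanged except for the time field, which is rewritten into yyyyMMddHHmmss form:

baidu	CN	2	20190110160254	121.77.143.199	v2.go2yd.com	http://v3.go2yd.com/user_upload/1531633977627104fdecdc68fe7a2c4b96b2226fd3f4c.mp4_bd.mp4	97557845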

3.3 The Mapper

package com.ruoze.hadoop.mapreduce;

import com.ruoze.hadoop.utils.LogUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    /**
     * Clean the data in the map phase of the MapReduce framework:
     * each incoming record is parsed according to our rules and the
     * cleaned result is written out.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] splits = value.toString().split("\t");
        // Keep only records with all 8 fields and a non-blank traffic value;
        // checking the length first avoids an ArrayIndexOutOfBoundsException on short records.
        if (splits.length == 8 && StringUtils.isNotBlank(splits[7])) {
            LogUtils utils = new LogUtils();
            String result = utils.parse(value.toString());
            if (StringUtils.isNotBlank(result)) {
                context.write(NullWritable.get(), new Text(result));
            }
        }
    }
}

3.4 The Driver (Job)

package com.ruoze.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LogETLDriver {

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("please input 2 params: input output");
            System.exit(0);
        }
        String input = args[0];
        String output = args[1];

        // System.setProperty("hadoop.home.dir", "D:\\Hadoop\\hadoop-2.6.0-cdh5.7.0");

        Configuration configuration = new Configuration();

        // Delete the output path if it already exists, otherwise the job would fail.
        FileSystem fileSystem = FileSystem.get(configuration);
        Path outputPath = new Path(output);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        Job job = Job.getInstance(configuration);
        job.setJarByClass(LogETLDriver.class);
        job.setMapperClass(LogETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}
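Since no reducer class or reducer count is set, the job runs with Hadoop's default of one identity reducer, so the cleaned records end up in a single part file under the output path.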

After the program above is written, package it and upload the jar to the server:

  [hadoop@hadoop000 lib]$ ll

  total 12

  -rw-r--r-- 1 hadoop hadoop 8754 Mar 29 22:38 hadoop-1.0.jar

Create the output path for the MR job on HDFS:

  hadoop fs -mkdir -p /g6/hadoop/access/output/day=20190402

4. Create the Hive External Table

create external table g6_access (
  cdn string,
  region string,
  level string,
  time string,
  ip string,
  domain string,
  url string,
  traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear';

Because the MR program deletes its output path on every run, the Hive table's location must not point at that output path; instead, once the MR job finishes, move the data into the location directory (step 5.2 below).

5. Run the Hadoop MR Job and Test

5.1 Run the MR job

  hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/20190402/ /g6/hadoop/access/output/day=20190402

5.2 Move the output under the table location

  hadoop fs -mv /g6/hadoop/access/output/day=20190402 /g6/hadoop/access/clear

5.3 Refresh the Hive partition (without this step, Hive cannot see the data)

  alter table g6_access add if not exists partition(day=20190402);

5.4 Use Hive to analyze the traffic per domain

hive (g6_hadoop)> select domain,count(*) from g6_access group by domain;
Query ID = hadoop_20190402232525_4b5c6115-d9a4-4dbd-8cbd-768f298decb4
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1554215624276_0002, Tracking URL = http://hadoop000:8088/proxy/application_1554215624276_0002/
Kill Command = /home/hadoop/soul/app/hadoop-2.6.0-cdh5.7.0/bin/hadoop job -kill job_1554215624276_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2019-04-02 23:30:45,007 Stage-1 map = 0%, reduce = 0%
2019-04-02 23:30:51,476 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.73 sec
2019-04-02 23:30:57,940 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.77 sec
MapReduce Total cumulative CPU time: 2 seconds 770 msec
Ended Job = job_1554215624276_0002
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1 Reduce: 1 Cumulative CPU: 2.77 sec HDFS Read: 44772154 HDFS Write: 76 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 770 msec
OK
domain	_c1
v1.go2yd.com	74908
v2.go2yd.com	74795
v3.go2yd.com	75075
v4.go2yd.com	75222
Time taken: 21.612 seconds, Fetched: 4 row(s)
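Note that the query above uses count(*), so the figures returned are per-domain record counts rather than the traffic totals called for in the requirement (and only v1-v4 appear, because the generator's nextInt(domainStr.length - 1) never picks v5). A sketch of a query matching the stated requirement, using the table and partition created above:

select domain, sum(traffic) from g6_access where day='20190402' group by domain;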

6. Wrap the Whole Workflow in a Shell Script

g6_mr_etl.sh:

#!/bin/bash

source ~/.bash_profile

if [ $# != 1 ] ; then
  echo "Usage: g6_mr_etl.sh <process_date>"
  echo "E.g.: g6_mr_etl.sh 20190402"
  exit 1
fi

process_date=$1
database=g6_hadoop

echo -e "\033[36m###### step1:MR ETL ######\033[0m"
hadoop jar /home/hadoop/lib/hadoop-1.0.jar com.ruoze.hadoop.mapreduce.LogETLDriver /g6/hadoop/accesslog/$process_date/ /g6/hadoop/access/output/day=$process_date

hive -e "use ${database};
create external table if not exists g6_access (
  cdn string,
  region string,
  level string,
  time string,
  ip string,
  domain string,
  url string,
  traffic bigint
) partitioned by (day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/g6/hadoop/access/clear';"

echo -e "\033[36m###### step2:Mv Data to DW ######\033[0m"
hadoop fs -mv /g6/hadoop/access/output/day=$process_date /g6/hadoop/access/clear

echo -e "\033[36m###### step3:Alter metadata ######\033[0m"
hive -e "use ${database}; alter table g6_access add if not exists partition(day=$process_date);"
