使用的工具:三台linux、hadoop-1.1.2、jdk1.7.0_45、Xmanager Enterprise 4、eclipse、

目标统计:pv、uv

对日志字段进行分析

每行记录有5部分组成:

  1. 访问ip 2.访问时间 3.访问资源 4.访问状态 5.本次流量

     

 

先对日志进行清理

mapreduce程序

package hmbbs;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Locale;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class HmbbsCleaner extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {final Job job = new Job(new Configuration(),HmbbsCleaner.class.getSimpleName());job.setJarByClass(HmbbsCleaner.class);FileInputFormat.setInputPaths(job, args[0]);job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(LongWritable.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);return 0;}public static void main(String[] args) throws Exception {ToolRunner.run(new HmbbsCleaner(), args);}static class MyMapper extendsMapper
 {LogParser logParser = new LogParser();Text v2 = new Text();protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper
.Context context)throws java.io.IOException, InterruptedException {final String[] parsed = logParser.parse(value.toString());// if (parsed[2].startsWith("GET /static/")|| parsed[2].startsWith("GET /uc_server")) {return;}// if (parsed[2].startsWith("GET /")) {parsed[2] = parsed[2].substring("GET /".length());} else if (parsed[2].startsWith("POST /")) {parsed[2] = parsed[2].substring("POST /".length());}// if (parsed[2].endsWith(" HTTP/1.1")) {parsed[2] = parsed[2].substring(0, parsed[2].length()- " HTTP/1.1".length());}v2.set(parsed[0] + "\t" + parsed[1] + "\t" + parsed[2]);context.write(key, v2);};}static class MyReducer extendsReducer
 {protected void reduce(LongWritable k2,java.lang.Iterable
 v2s,org.apache.hadoop.mapreduce.Reducer
.Context context)throws java.io.IOException, InterruptedException {for (Text v2 : v2s) {context.write(v2, NullWritable.get());}};}static class LogParser {public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MMM/yyyy:HH:mm:ss", Locale.ENGLISH);public static final SimpleDateFormat dateformat1 = new SimpleDateFormat("yyyyMMddHHmmss");public static void main(String[] args) throws ParseException {final String S1 = "27.19.74.143 - - [30/May/2013:17:38:20 +0800] \"GET /static/p_w_picpath/common/faq.gif HTTP/1.1\" 200 1127";LogParser parser = new LogParser();final String[] array = parser.parse(S1);System.out.println( S1);System.out.format(" ip=%s, time=%s, url=%s, status=%s, traffic=%s",array[0], array[1], array[2], array[3], array[4]);}/** *  *  * @param string * @return * @throws ParseException */private Date parseDateFormat(String string) {Date parse = null;try {parse = FORMAT.parse(string);} catch (ParseException e) {e.printStackTrace();}return parse;}/** * *  * @param line * @return  */public String[] parse(String line) {String ip = parseIP(line);String time = parseTime(line);String url = parseURL(line);String status = parseStatus(line);String traffic = parseTraffic(line);return new String[] { ip, time, url, status, traffic };}private String parseTraffic(String line) {final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();String traffic = trim.split(" ")[1];return traffic;}private String parseStatus(String line) {final String trim = line.substring(line.lastIndexOf("\"") + 1).trim();String status = trim.split(" ")[0];return status;}private String parseURL(String line) {final int first = line.indexOf("\"");final int last = line.lastIndexOf("\"");String url = line.substring(first + 1, last);return url;}private String parseTime(String line) {final int first = line.indexOf("[");final int last = line.indexOf("+0800]");String time = line.substring(first + 1, last).trim();Date date = parseDateFormat(time);return dateformat1.format(date);}private String parseIP(String line) {String ip = line.split("- -")[0].trim();return ip;}}}

统计pv的mapreduce

清洗后的数据以我自定义以\t为隔,所以

String[] arr = value.toString().split("\t");
package hmbbs;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Locale;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class KPIPV extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {final Job job = new Job(new Configuration(),KPIPV.class.getSimpleName());job.setJarByClass(KPIPV.class);FileInputFormat.setInputPaths(job, args[0]);job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);return 0;}public static void main(String[] args) throws Exception {ToolRunner.run(new KPIPV(), args);}static class MyMapper extends Mapper
 {protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper
.Context context)throws java.io.IOException, InterruptedException {Text v1 = new Text();// 每行以制表符\t分隔String[] arr = value.toString().split("\t");// 每行请求不为空if (arr.length >= 0) {v1.set("1");} else {v1.set("0");}context.write(new Text("pv"), v1);};}static class MyReducer extends Reducer
 {private IntWritable result = new IntWritable(0);private Integer value2 = new Integer(0);protected void reduce(Text k2,java.lang.Iterable
 v2s,org.apache.hadoop.mapreduce.Reducer
.Context context)throws java.io.IOException, InterruptedException {for (Text v2 : v2s) {value2 += Integer.parseInt((v2.toString().trim().equals("1")) ? "1": "0");}result.set(value2);context.write(k2, result);};}}

统计UV,分析思路是不管某个IP点击了多少次,我们都只统计一次,看用户量多少。

所以,我先写一个mapreduce统计各IP点击次数,然后再写一个mapreduce统计PV。这样相当于前一个mapreduce为后个mapreduce做清洗。

如果你不想用前面已经清洗过的数据,你也可以直接用原日志来清洗。原日志是字段间是以空字符串来隔开的。

统计各IP点击次数

package hmbbs;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class KPIUV_FOUR extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {final Job job = new Job(new Configuration(),KPIUV_FOUR.class.getSimpleName());job.setJarByClass(KPIUV_FOUR.class);// FileInputFormat.setInputPaths(job, args[0]);FileInputFormat.setInputPaths(job,"hdfs://192.168.14.132:9000/chen/fa/");job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// FileOutputFormat.setOutputPath(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.14.132:9000/chen/fa/uv_1"));job.waitForCompletion(true);return 0;}public static void main(String[] args) throws Exception {ToolRunner.run(new KPIUV_FOUR(), args);}static class MyMapper extends Mapper
 {Text key1 = new Text();Text v1 = new Text();protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper
.Context context)throws java.io.IOException, InterruptedException {// 每行以空格分隔String[] arr = value.toString().split(" ");for (String a : arr) {System.out.println(a);}if (arr.length >= 2) {if (arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {key1.set(arr[0]);}}v1.set("1");System.out.println("s1的值是" + key1);context.write(key1, v1);};}static class MyReducer extends Reducer
 {// Map
> map = new HashMap
>();protected void reduce(Text k2,java.lang.Iterable
 v2s,org.apache.hadoop.mapreduce.Reducer
.Context context)throws java.io.IOException, InterruptedException {Text result = new Text();Integer re = new Integer(0);System.out.println(k2 + "k2的值");ArrayList
 ar = new ArrayList
();while (v2s.iterator().hasNext()) {// ar.add(v2s.iterator().next());re += Integer.parseInt(v2s.iterator().next().toString());}// map.put(k2, ar);// re = ar.size();result.set(String.valueOf(re));context.write(k2, result);};}}

还有种方法是因为可以用Map思想,Map<String,List<int>这种我们只要统计Map中value中list的大小。

不过用Map的话感觉多此一举。还不如用原方法设置变量的时用局部变量来统计。

把生成的结果导到别一文件里。

[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mkdir /chen/csl[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -mv /chen/fa/uv_1/part-r-00000 /chen/csl

然后再写一个mapreduce统计uv

package hmbbs;import hmbbs.KPIUV_FOUR.MyMapper;import hmbbs.KPIUV_FOUR.MyReducer;import java.util.ArrayList;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class KPIUV_FIVE extends Configured implements Tool {@Overridepublic int run(String[] args) throws Exception {final Job job = new Job(new Configuration(),KPIUV_FIVE.class.getSimpleName());job.setJarByClass(KPIUV_FIVE.class);// FileInputFormat.setInputPaths(job, args[0]);FileInputFormat.setInputPaths(job,"hdfs://192.168.14.132:9000/chen/csl/");job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);job.setReducerClass(MyReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);// FileOutputFormat.setOutputPath(job, new Path(args[1]));FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.14.132:9000/chen/csl/uv_1"));job.waitForCompletion(true);return 0;}public static void main(String[] args) throws Exception {ToolRunner.run(new KPIUV_FIVE(), args);}static class MyMapper extends Mapper
 {Text key1 = new Text();Text v1 = new Text();protected void map(LongWritable key,Text value,org.apache.hadoop.mapreduce.Mapper
.Context context)throws java.io.IOException, InterruptedException {// 每行以空格分隔String[] arr = value.toString().split("\t");for (String a : arr) {System.out.println(a);}if (arr.length == 2) {if (arr[0].matches("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}")) {key1.set(arr[0]);}}v1.set("1");System.out.println("s1的值是" + key1);context.write(new Text("uv"), v1);};}static class MyReducer extends Reducer
 {Text result = new Text();Integer re = new Integer(0);protected void reduce(Text k2,java.lang.Iterable
 v2s,org.apache.hadoop.mapreduce.Reducer
.Context context)throws java.io.IOException, InterruptedException {System.out.println(k2 + "k2的值");while (v2s.iterator().hasNext()) {re += Integer.parseInt(v2s.iterator().next().toString());}result.set(String.valueOf(re));context.write(k2, result);};}}

结果

[grid@hadoop2 ~]$ hadoop-1.1.2/bin/hadoop fs -cat /chen/csl/uv_1/part-r-00000 uv  10509