package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAnalytics extends Configured implements Tool {
  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

  /**
   * The Mapper takes Long keys and Pageview objects, and emits
   * tuples of <url, day> as keys and 1 as values. Input values are
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  public static class LogAnalyticsMapper
      extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

    private LongWritable one = new LongWritable(1L);

    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
      Utf8 url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());

      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);

      context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily. */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
  }

  /**
   * The Reducer receives tuples of <url, day> as keys and a list of
   * values corresponding to each key, and emits combined keys and
   * {@link MetricDatum} objects. The metric datum objects are stored
   * as job outputs in the output data store.
   */
  public static class LogAnalyticsReducer
      extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0L; // sum up the values
      for (LongWritable value : values) {
        sum += value.get();
      }

      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();

      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);

      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);

      context.write(key, metricDatum);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
   * @param inStore the input data store holding the {@link Pageview} records
   * @param outStore the output data store for the {@link MetricDatum} results
   * @param numReducer the number of reduce tasks
   * @return the configured {@link Job}
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore,
      DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    Job job = new Job(getConf());
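    // Note: on Hadoop 2.x and later this Job constructor is deprecated;
    // Job.getInstance(getConf()) is the preferred replacement.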
job.setJobName("Log Analytics"); job.setNumReduceTasks(numReducer); job.setJarByClass(getClass());
    /* Mappers are initialized with GoraMapper.initMapperJob() or
     * GoraInputFormat.setInput(). */
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);
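    // (The trailing boolean is initMapperJob's reuseObjects flag, which lets
    // Gora reuse persistent objects across map() calls instead of allocating
    // a fresh one per record.)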
    /* Reducers are initialized with GoraReducer.initReducerJob().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead. */
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

    return job;
  }

  @Override
  public int run(String[] args) throws Exception {
    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();
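    // args[0], if present, selects the data store class for the input; args[1]
    // overrides it for the output (otherwise the output reuses the input's
    // store class). With no arguments, the defaults from gora.properties apply.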
    if (args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass,
          Long.class, Pageview.class, conf);
      if (args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass,
          String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

    inStore.close();
    outStore.close();

    log.info("Log analytics job completed with " + (success ? "success" : "failure"));

    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }
}
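/*
 * Example invocation (a sketch; the wrapper script and store class names
 * depend on the Gora setup in use, so treat the specifics as assumptions):
 *
 *   # use the default stores configured in gora.properties
 *   bin/gora loganalytics
 *
 *   # read Pageviews from, and write MetricDatum results to, HBase
 *   bin/gora loganalytics org.apache.gora.hbase.store.HBaseStore
 */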