Apache Gora™: An In-Memory Data Model Framework for Big Data

12-12-12 banq
Apache Gora - In-Memory Data Model for Big Data

Apache Gora provides an in-memory data model for big data and a framework for persisting it. It supports the NoSQL family of column stores, key-value stores and document stores, as well as RDBMSs, and the stored data can be analyzed with its Apache Hadoop MapReduce support.

Gora is essentially an ORM framework in the spirit of Hibernate, but it is not limited to relational databases; more importantly, it also supports NoSQL-style big data stores.
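Which backend a given DataStore binds to is configuration rather than code. As a minimal sketch of the gora.properties convention (the exact values here are illustrative):

# gora.properties: choose the default backend used by DataStoreFactory
gora.datastore.default=org.apache.gora.hbase.store.HBaseStore
# let Gora create the backing table/schema if it does not exist yet
gora.datastore.autocreateschema=true

Pointing gora.datastore.default at a different implementation, e.g. org.apache.gora.cassandra.store.CassandraStore, re-targets the same application code at another store.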

Features:
1. Data Persistence: persist in-memory objects to column stores such as HBase, Cassandra and Hypertable; key-value stores such as Voldemort and Redis; SQL databases such as MySQL and HSQLDB; and flat files on the local file system or Hadoop HDFS.

2. Data Access: read and write the data locally or remotely through an easy-to-use, Java-friendly common API (a minimal sketch follows this list).

3. Indexing: persist objects into Lucene and Solr indexes, then access and query the data through the Gora API.

4. Analysis: access the data and run analyses over it through adapters for Apache Pig, Apache Hive and Cascading.
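To make the "Java-friendly common API" point concrete, here is a minimal sketch of writing and reading a single object. It assumes the tutorial's generated Pageview class and a default store configured in gora.properties:

import org.apache.avro.util.Utf8;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class PutGetSketch {
  public static void main(String[] args) throws Exception {
    // binds to whatever backend gora.properties selects
    DataStore<Long, Pageview> store = DataStoreFactory
        .getDataStore(Long.class, Pageview.class, new Configuration());

    Pageview view = new Pageview();
    view.setUrl(new Utf8("/index.html"));   // Avro strings are Utf8
    view.setTimestamp(System.currentTimeMillis());

    store.put(1L, view);                    // key type matches DataStore<Long, ...>
    store.flush();                          // push buffered writes to the backend

    Pageview back = store.get(1L);          // point lookup by key
    System.out.println(back.getUrl());

    store.close();
  }
}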

Gora download page

Tutorial example

Comment: Gora feels like the API of a data center.


banq
2012-12-12 15:22
The Gora tutorial uses log analysis as its worked example.

Suppose we have web access log lines like the following:
88.254.190.73 - - [10/Mar/2009:20:40:26 +0200] "GET / HTTP/1.1" 200 43 "http://www.buldinle.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; GTB5; .NET CLR 2.0.50727; InfoPath.2)"

org.apache.gora.tutorial.log.LogManager converts each line into Gora's data model: the key is the line number within the log, and the value is a Pageview object. The Pageview schema is as follows:

 {
  "type": "record",
  "name": "Pageview",
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "ip", "type": "string"},
    {"name": "httpMethod", "type": "string"},
    {"name": "httpStatusCode", "type": "int"},
    {"name": "responseSize", "type": "int"},
    {"name": "referrer", "type": "string"},
    {"name": "userAgent", "type": "string"}
  ]
}


LogManager's parse method converts the data and writes it into a DataStore (a sketch of this loop follows the snippet):

public class LogManager {
  ...
  private DataStore<Long, Pageview> dataStore;
  ...
}
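Conceptually, parsing is a loop that turns each log line into a Pageview and stores it under its line number. A simplified sketch, where parseLine(String) is a hypothetical stand-in for the tutorial's actual tokenizer-based parsing:

import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.gora.store.DataStore;
import org.apache.gora.tutorial.log.generated.Pageview;

// Simplified ingest loop; parseLine is a hypothetical helper that fills
// a Pageview from one access-log line.
void storeLog(DataStore<Long, Pageview> store, String path) throws Exception {
  long lineNumber = 0;
  try (BufferedReader reader = new BufferedReader(new FileReader(path))) {
    for (String line; (line = reader.readLine()) != null; ) {
      store.put(lineNumber++, parseLine(line)); // key = log line number
    }
  }
  store.flush(); // push the buffered writes to the backing store
}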


The dataStore field is obtained via DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf). Wherever the converted data is needed, the same DataStoreFactory.getDataStore call returns a store over it.
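Reads are not limited to point lookups; Gora's Query/Result API scans ranges of keys. A minimal sketch over the Pageview store (the key bounds are arbitrary):

import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.tutorial.log.generated.Pageview;

// Scan a key range of stored pageviews and print them.
void scan(DataStore<Long, Pageview> store) throws Exception {
  Query<Long, Pageview> query = store.newQuery();
  query.setStartKey(0L);     // illustrative bounds
  query.setEndKey(100L);

  Result<Long, Pageview> result = query.execute();
  while (result.next()) {    // advances to the next key/value pair
    System.out.println(result.getKey() + " -> " + result.get().getUrl());
  }
  result.close();
}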

For the analysis step, org.apache.gora.tutorial.log.LogAnalytics reads the stored pageviews and computes daily pageview counts per URL with a MapReduce job:

public class LogAnalytics extends Configured implements Tool {

  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);
  
  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;
    
  /**
   * The Mapper takes Long keys and Pageview objects, and emits 
   * tuples of <url, day> as keys and 1 as values. Input values are 
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  public static class LogAnalyticsMapper 
    extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
    
    private LongWritable one = new LongWritable(1L);
  
    private TextLong tuple;
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }
    
    @Override
    protected void map(Long key, Pageview pageview, Context context) 
      throws IOException, InterruptedException {
      
      Utf8 url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());
      
      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);
      
      context.write(tuple, one);
    }
    
    /** Rolls up the given timestamp to the day cardinality, so that 
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS; 
    }
  }
  
  /**
   * The Reducer receives tuples of <url, day> as keys and a list of 
   * values corresponding to the keys, and emits combined keys and
   * {@link MetricDatum} objects. The metric datum objects are stored 
   * as job outputs in the output data store.
   */
  public static class LogAnalyticsReducer 
    extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
    
    private MetricDatum metricDatum = new MetricDatum();
    
    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, 
        Context context) throws IOException, InterruptedException {
      
      long sum = 0L; //sum up the values
      for(LongWritable value: values) {
        sum+= value.get();
      }
      
      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();
      
      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);
      
      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);
      
      context.write(key, metricDatum);
    }
  }
  
  /**
   * Creates and returns the {@link Job} for submitting to Hadoop MapReduce.
   * @param inStore the data store the job reads Pageview objects from
   * @param outStore the data store the job writes MetricDatum objects to
   * @param numReducer the number of reduce tasks
   * @return the configured Job
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore
      , DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    Job job = new Job(getConf());

    job.setJobName("Log Analytics");
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper.initMapperJob() or 
     * GoraInputFormat.setInput() */
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class
        , LogAnalyticsMapper.class, true);

    /* Reducers are initialized with GoraReducer.initReducerJob().
     * If the output is not to be persisted via Gora, any reducer 
     * can be used instead. */
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);
    
    return job;
  }
  
  @Override
  public int run(String[] args) throws Exception {
    
    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();    

    if(args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.
          getDataStore(dataStoreClass, Long.class, Pageview.class, conf);
      if(args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.
          getDataStore(dataStoreClass, String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }
    
    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);
    
    inStore.close();
    outStore.close();
    
    log.info("Log analytics job completed with " + (success ? "success" : "failure"));
    
    return success ? 0 : 1;
  }
  
  public static void main(String[] args) throws Exception {
    //run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }
  
}
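Per run() above, the input and output store classes can be passed as the first and second command-line arguments; with no arguments, both stores fall back to the default configured in gora.properties. A hypothetical invocation (the jar name is a placeholder; HBaseStore is one of the shipped store implementations):

hadoop jar gora-tutorial.jar org.apache.gora.tutorial.log.LogAnalytics \
    org.apache.gora.hbase.store.HBaseStore

After the job finishes, the output store holds one MetricDatum per <url, day> pair, keyed as "url_dayTimestamp" (see the reducer above).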
