Apache Gora™ - In-Memory Data Model Framework for Big Data

Apache Gora provides an in-memory data model for big data together with a persistence framework. It supports the NoSQL families of column stores, key-value stores and document stores, as well as RDBMSs, and it supports data analysis through Apache Hadoop MapReduce.

Gora is essentially an ORM framework in the spirit of Hibernate, except that it is not limited to relational databases: more importantly, it also targets NoSQL big-data stores.

Features:
1. Data Persistence: persist in-memory objects to column stores such as HBase, Cassandra and Hypertable; key-value stores such as Voldemort and Redis; SQL databases such as MySQL and HSQLDB; and flat files in the local file system or Hadoop HDFS.

2. Data Access: an easy-to-use, Java-friendly common API for accessing the data locally or remotely (see the sketch after this list).

3. Indexing: persist objects to Lucene and Solr indexes, and access/query the data with the Gora API.

4. Analysis: access the data and run analyses over it, through adapters for Apache Pig, Apache Hive and Cascading.
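
To make the data-access point concrete, here is a minimal sketch of the generic DataStore API, written against the Pageview bean from the tutorial below. The put/get/query calls are Gora's standard DataStore operations; the key values and field contents are made up for illustration:

import org.apache.avro.util.Utf8;
import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class DataStoreSketch {
  public static void main(String[] args) throws Exception {
    // The concrete backend (HBase, Cassandra, ...) is resolved from gora.properties.
    DataStore<Long, Pageview> store =
        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());

    // Persist an object under a key (field values here are illustrative).
    Pageview view = new Pageview();
    view.setUrl(new Utf8("/index.html"));
    view.setTimestamp(System.currentTimeMillis());
    store.put(42L, view);
    store.flush();                      // push buffered writes to the backend

    Pageview back = store.get(42L);     // read it back by key; null if absent
    System.out.println("url = " + back.getUrl());

    // Range query over keys; results stream back lazily.
    Query<Long, Pageview> query = store.newQuery();
    query.setStartKey(0L);
    query.setEndKey(100L);
    Result<Long, Pageview> result = query.execute();
    while (result.next()) {
      System.out.println(result.getKey() + " -> " + result.get().getUrl());
    }
    store.close();
  }
}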

Gora downloads

Example usage

Comment: Gora comes across as the API of a central data hub.

Its tutorial uses log analysis as the running example.

Suppose we have web access logs such as the following line:
88.254.190.73 - - [10/Mar/2009:20:40:26 +0200] "GET / HTTP/1.1" 200 43 "http://www.buldinle.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; GTB5; .NET CLR 2.0.50727; InfoPath.2)"

org.apache.gora.tutorial.log.LogManager converts such lines into Gora's data model: the key is the line number of the log entry, and the value is a Pageview object (a Java class generated from the Avro schema below by Gora's compiler). The schema is as follows:


{
  "type": "record",
  "name": "Pageview",
  "namespace": "org.apache.gora.tutorial.log.generated",
  "fields" : [
    {"name": "url", "type": "string"},
    {"name": "timestamp", "type": "long"},
    {"name": "ip", "type": "string"},
    {"name": "httpMethod", "type": "string"},
    {"name": "httpStatusCode", "type": "int"},
    {"name": "responseSize", "type": "int"},
    {"name": "referrer", "type": "string"},
    {"name": "userAgent", "type": "string"}
  ]
}

LogManager's parse method converts the data and writes it into the DataStore:


public class LogManager {
  ...
  private DataStore<Long, Pageview> dataStore;
  ...
}

The dataStore field is obtained via DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf); wherever the stored data is needed later, the same DataStoreFactory.getDataStore call returns a handle to it.
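
In simplified form, LogManager's parse loop looks roughly like the following (an illustrative sketch of a method inside LogManager, not the tutorial's exact code; the real parseLine() helper fully decodes the combined log format shown above):

// Inside LogManager: read the log file line by line, convert each line to a
// Pageview and store it under its line number.
// (Requires java.io.BufferedReader / java.io.FileReader imports.)
private void parse(String inputFile) throws Exception {
  BufferedReader reader = new BufferedReader(new FileReader(inputFile));
  long lineCount = 0;
  String line;
  while ((line = reader.readLine()) != null) {
    Pageview pageview = parseLine(line);   // build the Pageview bean from the raw line
    dataStore.put(lineCount++, pageview);  // the line number is the key
  }
  dataStore.flush();                       // ensure buffered puts reach the store
  reader.close();
}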

For example, org.apache.gora.tutorial.log.LogAnalytics is the analysis job: it reads the stored Pageview objects and computes the daily page-view count per URL with MapReduce.


package org.apache.gora.tutorial.log;

import java.io.IOException;

import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraMapper;
import org.apache.gora.mapreduce.GoraReducer;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.MetricDatum;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LogAnalytics extends Configured implements Tool {

  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

  /**
   * The Mapper takes Long keys and Pageview objects, and emits
   * tuples of <url, day> as keys and 1 as values. Input values are
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  public static class LogAnalyticsMapper
      extends GoraMapper<Long, Pageview, TextLong, LongWritable> {

    private LongWritable one = new LongWritable(1L);

    // TextLong is the tutorial's Writable pair of <Text, LongWritable>,
    // defined in the same package.
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {

      Utf8 url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());

      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);

      context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
  }

  /**
   * The Reducer receives tuples of <url, day> as keys and a list of
   * values corresponding to the keys, and emits a combined key and a
   * {@link MetricDatum} object. The metric datum objects are stored
   * as job outputs in the output data store.
   */
  public static class LogAnalyticsReducer
      extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {

    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {

      long sum = 0L;
      // sum up the values
      for (LongWritable value : values) {
        sum += value.get();
      }

      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();

      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);

      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);

      context.write(key, metricDatum);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop mapreduce.
   * @param inStore the data store holding the parsed Pageview input
   * @param outStore the data store the MetricDatum results are written to
   * @param numReducer the number of reduce tasks
   * @return the configured Job
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore,
      DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    Job job = new Job(getConf());

    job.setJobName("Log Analytics");
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper.initMapperJob() or
     * GoraInputFormat.setInput() */
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);

    /* Reducers are initialized with GoraReducer.initReducerJob().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead. */
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

    return job;
  }

  @Override
  public int run(String[] args) throws Exception {

    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();

    if (args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass,
          Long.class, Pageview.class, conf);
      if (args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass,
          String.class, MetricDatum.class, conf);
    } else {
      // No store class given: fall back to the defaults in gora.properties.
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

    inStore.close();
    outStore.close();

    log.info("Log completed with " + (success ? "success" : "failure"));

    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }

}
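
Once the job completes, the daily counts sit in the output store under keys of the form <url>_<timestamp>, as built in the reducer above. Here is a short sketch of scanning them back out through a plain Gora query (a fragment, using the same imports as LogAnalytics plus org.apache.gora.query.Query and org.apache.gora.query.Result; the full scan is illustrative):

// Sketch: scan the analytics output store and print each daily count.
DataStore<String, MetricDatum> store = DataStoreFactory.getDataStore(
    String.class, MetricDatum.class, new Configuration());

Query<String, MetricDatum> query = store.newQuery();   // no key range set: full scan
Result<String, MetricDatum> result = query.execute();
while (result.next()) {
  MetricDatum datum = result.get();
  System.out.println(result.getKey() + " : " + datum.getMetric() + " views");
}
store.close();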