Apache Gora™: An In-Memory Data Model Framework for Big Data
2012-12-12
banq
Apache Gora - In-Memory Data Model for Big Data
Apache Gora is a framework that provides an in-memory data model for big data along with persistence of that model. It supports NoSQL column stores, key-value stores, and document stores as well as RDBMSs, and it supports analyzing the stored data with Apache Hadoop MapReduce.
Gora is, in effect, an ORM framework in the spirit of Hibernate, except that it is not limited to relational databases; more importantly, it targets NoSQL-style big-data stores.
Features:
1. Data Persistence: persists in-memory objects to column stores such as HBase, Cassandra, and Hypertable; to key-value stores such as Voldemort and Redis; to SQL databases such as MySQL and HSQLDB; and to flat files on the local file system or Hadoop HDFS (a minimal sketch follows this list).
2. Data Access: an easy-to-use, Java-friendly common API for accessing the data locally or remotely.
3. Indexing: persists objects to Lucene and Solr indexes, and accesses/queries the data through the Gora API.
4. Analysis: accesses the data and performs analysis through adapters for Apache Pig, Apache Hive, and Cascading.
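To make the persistence API concrete, here is a minimal sketch of storing one object and reading it back. It is not taken from the Gora tutorial: the GoraPersistenceSketch class name is mine, and it assumes the tutorial's generated Pageview bean plus whatever backend gora.properties selects.

import org.apache.avro.util.Utf8;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class GoraPersistenceSketch {
  public static void main(String[] args) throws Exception {
    // The concrete backend (HBase, Cassandra, MySQL, ...) is chosen in
    // gora.properties; application code only sees the generic DataStore.
    DataStore<Long, Pageview> store =
        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());

    Pageview view = new Pageview();
    view.setUrl(new Utf8("/index.html"));        // Avro strings are Utf8 objects
    view.setTimestamp(System.currentTimeMillis());

    store.put(1L, view);            // persist under key 1
    store.flush();                  // push buffered writes to the backend

    Pageview back = store.get(1L);  // read back by key
    System.out.println(back.getUrl());
    store.close();
  }
}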
Example usage
Comment: Gora feels like an API for a data hub.
banq
2012-12-12 15:22
The official tutorial uses log analysis as its example.
Suppose we have web access logs such as the following:
88.254.190.73 - - [10/Mar/2009:20:40:26 +0200] "GET / HTTP/1.1" 200 43 "http://www.buldinle.com/" "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; GTB5; .NET CLR 2.0.50727; InfoPath.2)"
org.apache.gora.tutorial.log.LogManager converts these lines into Gora's data model: the key is the log line number and the value is a Pageview object, a class generated from the Avro schema below by Gora's compiler. The schema is:
{ "type": "record", "name": "Pageview", "namespace": "org.apache.gora.tutorial.log.generated", "fields" : [ {"name": "url", "type": "string"}, {"name": "timestamp", "type": "long"}, {"name": "ip", "type": "string"}, {"name": "httpMethod", "type": "string"}, {"name": "httpStatusCode", "type": "int"}, {"name": "responseSize", "type": "int"}, {"name": "referrer", "type": "string"}, {"name": "userAgent", "type": "string"} ] } <p class="indent"> |
The data is loaded into a DataStore by LogManager's parse method:
public class LogManager {
  ...
  private DataStore<Long, Pageview> dataStore;
  ...
}
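The parse method itself is not reproduced in this post. In outline it reads the log file line by line, turns each line into a Pageview, and stores it under the line number; below is a sketch based on the tutorial's description, in which the parseLine helper (regex plus date parsing of the combined log format) is assumed.

// Sketch of LogManager.parse: maps log line number -> Pageview.
private void parse(String input) throws Exception {
  BufferedReader reader = new BufferedReader(new FileReader(input));
  long lineNumber = 0;
  String line;
  while ((line = reader.readLine()) != null) {
    Pageview pageview = parseLine(line);      // assumed helper, not shown in the post
    if (pageview != null) {
      dataStore.put(lineNumber++, pageview);  // line number is the key
    }
  }
  dataStore.flush();  // ensure buffered writes reach the backend
  reader.close();
}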
The dataStore field is obtained via

DataStoreFactory.getDataStore(dataStoreClass, Long.class, Pageview.class, conf);

and any code that needs the converted data can obtain the same store through DataStoreFactory.getDataStore().
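For instance, a consumer process can do a point lookup by line number, or a range scan with Gora's generic Query/Result API. A minimal sketch follows; the LogQuerySketch class name and the key values are assumptions, and it presumes the store was populated by LogManager as above.

import org.apache.gora.query.Query;
import org.apache.gora.query.Result;
import org.apache.gora.store.DataStore;
import org.apache.gora.store.DataStoreFactory;
import org.apache.gora.tutorial.log.generated.Pageview;
import org.apache.hadoop.conf.Configuration;

public class LogQuerySketch {
  public static void main(String[] args) throws Exception {
    DataStore<Long, Pageview> store =
        DataStoreFactory.getDataStore(Long.class, Pageview.class, new Configuration());

    // Point lookup by key (the log line number LogManager used).
    Pageview pv = store.get(42L);

    // Range scan over keys 0..100 with the generic Query/Result API.
    Query<Long, Pageview> query = store.newQuery();
    query.setStartKey(0L);
    query.setEndKey(100L);
    Result<Long, Pageview> result = query.execute();
    while (result.next()) {
      System.out.println(result.getKey() + " -> " + result.get().getUrl());
    }
    result.close();
    store.close();
  }
}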
For example, org.apache.gora.tutorial.log.LogAnalytics is the log-analysis code: it reads the Pageview records stored above and uses MapReduce to compute daily pageview counts per URL.
public class LogAnalytics extends Configured implements Tool {
  private static final Logger log = LoggerFactory.getLogger(LogAnalytics.class);

  /** The number of milliseconds in a day */
  private static final long DAY_MILIS = 1000 * 60 * 60 * 24;

  /**
   * The Mapper takes Long keys and Pageview objects, and emits
   * tuples of <url, day> as keys and 1 as values. Input values are
   * read from the input data store.
   * Note that all Hadoop serializable classes can be used as map output key and value.
   */
  public static class LogAnalyticsMapper
      extends GoraMapper<Long, Pageview, TextLong, LongWritable> {
    private LongWritable one = new LongWritable(1L);
    private TextLong tuple;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
      tuple = new TextLong();
      tuple.setKey(new Text());
      tuple.setValue(new LongWritable());
    }

    @Override
    protected void map(Long key, Pageview pageview, Context context)
        throws IOException, InterruptedException {
      Utf8 url = pageview.getUrl();
      long day = getDay(pageview.getTimestamp());
      tuple.getKey().set(url.toString());
      tuple.getValue().set(day);
      context.write(tuple, one);
    }

    /** Rolls up the given timestamp to the day cardinality, so that
     * data can be aggregated daily */
    private long getDay(long timeStamp) {
      return (timeStamp / DAY_MILIS) * DAY_MILIS;
    }
  }

  /**
   * The Reducer receives tuples of <url, day> as keys and a list of
   * values corresponding to the keys, and emits combined keys and
   * {@link MetricDatum} objects. The metric datum objects are stored
   * as job outputs in the output data store.
   */
  public static class LogAnalyticsReducer
      extends GoraReducer<TextLong, LongWritable, String, MetricDatum> {
    private MetricDatum metricDatum = new MetricDatum();

    @Override
    protected void reduce(TextLong tuple, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      long sum = 0L; // sum up the values
      for (LongWritable value : values) {
        sum += value.get();
      }
      String dimension = tuple.getKey().toString();
      long timestamp = tuple.getValue().get();
      metricDatum.setMetricDimension(new Utf8(dimension));
      metricDatum.setTimestamp(timestamp);
      String key = metricDatum.getMetricDimension().toString();
      key += "_" + Long.toString(timestamp);
      metricDatum.setMetric(sum);
      context.write(key, metricDatum);
    }
  }

  /**
   * Creates and returns the {@link Job} for submitting to Hadoop MapReduce.
   * @param inStore the input data store holding Pageview records
   * @param outStore the output data store for MetricDatum records
   * @param numReducer the number of reduce tasks
   * @return the configured Job
   * @throws IOException
   */
  public Job createJob(DataStore<Long, Pageview> inStore,
      DataStore<String, MetricDatum> outStore, int numReducer) throws IOException {
    Job job = new Job(getConf());
    job.setJobName("Log Analytics");
    job.setNumReduceTasks(numReducer);
    job.setJarByClass(getClass());

    /* Mappers are initialized with GoraMapper.initMapper() or
     * GoraInputFormat.setInput() */
    GoraMapper.initMapperJob(job, inStore, TextLong.class, LongWritable.class,
        LogAnalyticsMapper.class, true);

    /* Reducers are initialized with GoraReducer.initReducer().
     * If the output is not to be persisted via Gora, any reducer
     * can be used instead. */
    GoraReducer.initReducerJob(job, outStore, LogAnalyticsReducer.class);

    return job;
  }

  @Override
  public int run(String[] args) throws Exception {
    DataStore<Long, Pageview> inStore;
    DataStore<String, MetricDatum> outStore;
    Configuration conf = new Configuration();
    if (args.length > 0) {
      String dataStoreClass = args[0];
      inStore = DataStoreFactory.getDataStore(dataStoreClass,
          Long.class, Pageview.class, conf);
      if (args.length > 1) {
        dataStoreClass = args[1];
      }
      outStore = DataStoreFactory.getDataStore(dataStoreClass,
          String.class, MetricDatum.class, conf);
    } else {
      inStore = DataStoreFactory.getDataStore(Long.class, Pageview.class, conf);
      outStore = DataStoreFactory.getDataStore(String.class, MetricDatum.class, conf);
    }

    Job job = createJob(inStore, outStore, 3);
    boolean success = job.waitForCompletion(true);

    inStore.close();
    outStore.close();

    log.info("Log completed with " + (success ? "success" : "failure"));
    return success ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    // run as any other MR job
    int ret = ToolRunner.run(new LogAnalytics(), args);
    System.exit(ret);
  }
}
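Two details are worth noting in the job setup above. GoraMapper.initMapperJob wires the input DataStore in as the job's input, so map tasks read Pageview objects directly from whichever store is configured (HBase, Cassandra, etc.) rather than from HDFS files; and GoraReducer.initReducerJob sends the reducer output straight back into a DataStore as MetricDatum records, so no separate output-format code is needed. In the tutorial the job is launched through the Gora wrapper script (something like bin/gora loganalytics, with optional store class arguments as the run() method shows), and the daily counts can then be read back from the output store with the same get/query API.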