How to Run the Mahout in Action ItemCF Example on Hadoop with CDH 5.2

Implementing Collaborative Filtering with Mahout
Mahout ships with the following recommenders:
GenericUserBasedRecommender: user-based recommender; fast when the number of users is small;
GenericItemBasedRecommender: item-based recommender; fast when the number of items is small, and especially efficient when item-similarity data is supplied externally;
SlopeOneRecommender: recommender based on the slope-one algorithm; online recommendation and updates are fast, but it needs heavy preprocessing up front and works best with few items;
SVDRecommender: singular value decomposition; good recommendation quality, but it requires heavy preprocessing;
KnnRecommender: based on k-nearest neighbors (KNN); suitable when the number of items is small;
TreeClusteringRecommender: clustering-based recommender; online recommendation is fast, but it requires heavy preprocessing and works best with few users.
The first three recommenders above are the ones most commonly used; this article focuses on the first two.
Interfaces
A user- or item-based recommender involves the following interfaces:
DataModel is the abstraction over user-preference data; its implementations can pull preference data from any kind of source. Taste provides JDBCDataModel and FileDataModel by default, reading preferences from a database and a file respectively.
UserSimilarity and ItemSimilarity. UserSimilarity defines the similarity between two users; it is the core of a collaborative-filtering engine and is used to compute a user's "neighbors", the users whose taste is close to the current user's. ItemSimilarity does the same for items.
UserNeighborhood is used in user-similarity-based recommendation, where recommendations are produced from the neighbors of the current user. UserNeighborhood defines how those neighbors are determined; implementations are generally built on top of a UserSimilarity.
Recommender is the abstract recommendation-engine interface and the core component of Taste. Given a DataModel, it computes recommendations for users. In practice you mainly use its implementations GenericUserBasedRecommender and GenericItemBasedRecommender, for user-similarity-based and item-based engines respectively.
RecommenderEvaluator: scores a recommender.
RecommenderIRStatsEvaluator: collects information-retrieval metrics for a recommender, such as precision and recall.
Mahout currently provides the following DataModel implementations:
org.apache.mahout.cf.taste.impl.model.GenericDataModel
org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel
org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel
org.apache.mahout.cf.taste.impl.model.file.FileDataModel
org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
org.apache.mahout.cf.taste.impl.model.cassandra.CassandraDataModel
org.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.SQL92JDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.MySQLJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.GenericJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.SQL92BooleanPrefJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.MySQLBooleanPrefJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLBooleanPrefJDBCDataModel
org.apache.mahout.cf.taste.impl.model.jdbc.ReloadFromJDBCDataModel
You can roughly guess each DataModel's purpose from its class name. Oddly, there is no HDFS-backed DataModel, although a third-party implementation exists. A sketch of the two models you will reach for most often follows.
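As a hedged sketch of those two: FileDataModel for preference data on disk and GenericDataModel for data already in memory (the file name data.csv is made up for illustration):

import java.io.File;
import org.apache.mahout.cf.taste.impl.common.FastByIDMap;
import org.apache.mahout.cf.taste.impl.model.GenericDataModel;
import org.apache.mahout.cf.taste.impl.model.GenericUserPreferenceArray;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.model.PreferenceArray;

// Loading preferences from a CSV file on disk (userID,itemID,pref per line):
DataModel fileModel = new FileDataModel(new File("data.csv"));

// Building the same kind of model from preferences already in memory:
FastByIDMap<PreferenceArray> prefs = new FastByIDMap<PreferenceArray>();
PreferenceArray user1 = new GenericUserPreferenceArray(2); // user 1 has 2 preferences
user1.setUserID(0, 1L);
user1.setItemID(0, 101L);
user1.setValue(0, 4.0f);
user1.setItemID(1, 102L);
user1.setValue(1, 3.0f);
prefs.put(1L, user1);
DataModel inMemoryModel = new GenericDataModel(prefs);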
The UserSimilarity and ItemSimilarity implementations are:
CityBlockSimilarity: similarity based on Manhattan distance
EuclideanDistanceSimilarity: similarity based on Euclidean distance
LogLikelihoodSimilarity: similarity based on the log-likelihood ratio
PearsonCorrelationSimilarity: similarity based on the Pearson correlation coefficient
SpearmanCorrelationSimilarity: similarity based on the Spearman rank correlation coefficient
TanimotoCoefficientSimilarity: similarity based on the Tanimoto coefficient
UncenteredCosineSimilarity: cosine similarity
For details on these measures, see the Mahout documentation; a quick comparison sketch follows.
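Because the measures are interchangeable behind the UserSimilarity/ItemSimilarity interfaces, they are easy to compare on the same data. A hedged sketch (intro.csv and user IDs 1 and 2 are borrowed from the examples later in this article):

import java.io.File;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.similarity.EuclideanDistanceSimilarity;
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

DataModel model = new FileDataModel(new File("intro.csv"));
UserSimilarity[] measures = {
    new PearsonCorrelationSimilarity(model),
    new EuclideanDistanceSimilarity(model),
    new LogLikelihoodSimilarity(model)
};
// Each measure returns a value in [-1, 1]; NaN means "not computable", e.g.
// Pearson needs at least two items co-rated by both users.
for (UserSimilarity s : measures) {
    System.out.println(s.getClass().getSimpleName() + ": " + s.userSimilarity(1L, 2L));
}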
There are two main UserNeighborhood implementations, contrasted in the sketch below:
NearestNUserNeighborhood: takes a fixed number N of nearest neighbors for each user
ThresholdUserNeighborhood: takes, for each user, all users whose similarity falls within a given threshold
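A hedged sketch contrasting the two (N=10 and the 0.7 threshold are arbitrary illustration values):

import java.io.File;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.neighborhood.ThresholdUserNeighborhood;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

DataModel model = new FileDataModel(new File("intro.csv"));
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
// Always at most the 10 most similar users.
UserNeighborhood nearest = new NearestNUserNeighborhood(10, similarity, model);
// However many users have similarity >= 0.7; the size varies per user.
UserNeighborhood threshold = new ThresholdUserNeighborhood(0.7, similarity, model);
long[] neighborsOfUser1 = nearest.getUserNeighborhood(1L);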
Recommender has the following implementations:
GenericUserBasedRecommender: user-based recommendation engine
GenericBooleanPrefUserBasedRecommender: user-based engine for data without preference values
GenericItemBasedRecommender: item-based recommendation engine
GenericBooleanPrefItemBasedRecommender: item-based engine for data without preference values
RecommenderEvaluator has these implementations:
AverageAbsoluteDifferenceRecommenderEvaluator: mean absolute difference
RMSRecommenderEvaluator: root mean squared error
The implementation of RecommenderIRStatsEvaluator is GenericRecommenderIRStatsEvaluator.
First, add the Mahout dependencies to your Maven pom.xml:
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-core</artifactId>
    <version>0.9</version>
</dependency>
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-integration</artifactId>
    <version>0.9</version>
</dependency>
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-math</artifactId>
    <version>0.9</version>
</dependency>
<dependency>
    <groupId>org.apache.mahout</groupId>
    <artifactId>mahout-examples</artifactId>
    <version>0.9</version>
</dependency>
User-based recommendation, using FileDataModel as an example:
File modelFile = new File("intro.csv");
DataModel model = new FileDataModel(modelFile);
// User similarity, here the Pearson correlation coefficient
UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
// Pick neighbor users: NearestNUserNeighborhood implements UserNeighborhood
// and selects the 4 nearest users
UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);
// Recommend 4 items to user 1
List<RecommendedItem> recommendations = recommender.recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
    System.out.println(recommendation);
}
Note: FileDataModel expects fields separated by a comma or a tab. If you want another delimiter, you can extend FileDataModel; Mahout already ships GroupLensDataModel, which parses the MovieLens dataset (delimited by ::). A hedged preprocessing alternative is sketched below.
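If you would rather not subclass FileDataModel, a simple alternative is to rewrite the file to comma-separated form before loading it. A minimal sketch, assuming a ::-delimited MovieLens-style ratings.dat (the file names here are made up):

import java.io.*;
import java.nio.charset.StandardCharsets;

// Rewrite "1::101::5.0" lines as "1,101,5.0" so the stock FileDataModel can read them.
BufferedReader in = new BufferedReader(new InputStreamReader(
        new FileInputStream("ratings.dat"), StandardCharsets.UTF_8));
PrintWriter out = new PrintWriter(new OutputStreamWriter(
        new FileOutputStream("ratings.csv"), StandardCharsets.UTF_8));
String line;
while ((line = in.readLine()) != null) {
    out.println(line.replace("::", ","));
}
in.close();
out.close();
DataModel movieLensModel = new FileDataModel(new File("ratings.csv"));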
GenericUserBasedRecommender is the simple user-based recommender implementation. Recommendation is driven by the supplied DataModel and UserNeighborhood, in three steps (a conceptual sketch of step (3) follows the list):
(1) Get the K users most similar to the current user Ui from the UserNeighborhood: {U1, U2, ...Uk};
(2) From the items of those K users, exclude the items Ui already has preferences for, leaving the item set {Item0, Item1, ...Itemm};
(3) For each Itemj in the set, compute the predicted preference pref(Ui, Itemj), sort the items by this value in descending order, and recommend the top N items to Ui.
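As a conceptual sketch of step (3), not Mahout's exact code: the predicted preference is a similarity-weighted average of the neighbors' preferences:

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

static double estimatePreference(long userID, long itemID, long[] neighbors,
                                 DataModel model, UserSimilarity similarity)
        throws TasteException {
    double weightedSum = 0.0;
    double totalWeight = 0.0;
    for (long neighbor : neighbors) {
        Float pref = model.getPreferenceValue(neighbor, itemID);
        if (pref == null) {
            continue;                    // this neighbor has no preference for the item
        }
        double w = similarity.userSimilarity(userID, neighbor);
        if (!Double.isNaN(w)) {
            weightedSum += w * pref;     // weight the neighbor's rating by similarity
            totalWeight += Math.abs(w);
        }
    }
    return totalWeight == 0.0 ? Double.NaN : weightedSum / totalWeight;
}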
If we repeatedly request recommendations for the same user, we can wrap the GenericUserBasedRecommender in a CachingRecommender so the results are cached:
Recommender cachingRecommender = new CachingRecommender(recommender);
The code above can be run directly from a main method. Next, we can score the recommendation model:
// Score using the average absolute difference
RecommenderEvaluator evaluator = new AverageAbsoluteDifferenceRecommenderEvaluator();
// Build the recommendation engine with a RecommenderBuilder
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
    public Recommender buildRecommender(DataModel model) throws TasteException {
        UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
        return new GenericUserBasedRecommender(model, neighborhood, similarity);
    }
};
// Train with 70% of the data, test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
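The evaluator splits the data into training and test sets at random, so the score differs from run to run. For repeatable numbers (in tests only), Mahout provides RandomUtils.useTestSeed():

import org.apache.mahout.common.RandomUtils;

// Fix the random seed before building the DataModel and evaluator, so the
// train/test split, and therefore the score, is the same on every run.
RandomUtils.useTestSeed();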
Next, we can measure the precision and recall of the recommendations:
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
    public Recommender buildRecommender(DataModel model) throws TasteException {
        UserSimilarity similarity = new PearsonCorrelationSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(4, similarity, model);
        return new GenericUserBasedRecommender(model, neighborhood, similarity);
    }
};
// Compute precision and recall when recommending 4 items
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder, null, model, null, 4,
        GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD, 1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());
For item-based recommendation the code is much the same, except that there is no UserNeighborhood and User becomes Item. The full code:
File modelFile = new File("intro.csv");
DataModel model = new FileDataModel(modelFile);
// Build the same recommender for testing that we did last time:
RecommenderBuilder recommenderBuilder = new RecommenderBuilder() {
    public Recommender buildRecommender(DataModel model) throws TasteException {
        ItemSimilarity similarity = new PearsonCorrelationSimilarity(model);
        return new GenericItemBasedRecommender(model, similarity);
    }
};
// Get the recommendations
List<RecommendedItem> recommendations = recommenderBuilder.buildRecommender(model).recommend(1, 4);
for (RecommendedItem recommendation : recommendations) {
    System.out.println(recommendation);
}
// Score the model
RecommenderEvaluator evaluator =
        new AverageAbsoluteDifferenceRecommenderEvaluator();
// Train with 70% of the data, test using the other 30%.
double score = evaluator.evaluate(recommenderBuilder, null, model, 0.7, 1.0);
System.out.println(score);
// Compute precision and recall
RecommenderIRStatsEvaluator statsEvaluator = new GenericRecommenderIRStatsEvaluator();
// Evaluate precision and recall "at 4":
IRStatistics stats = statsEvaluator.evaluate(recommenderBuilder,
        null, model, null, 4,
        GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD, 1.0);
System.out.println(stats.getPrecision());
System.out.println(stats.getRecall());
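Besides recommend(), GenericItemBasedRecommender can also return the items most similar to a given item, which is handy for "people who liked X also liked Y" lists. A hedged sketch reusing the model above (item ID 101 is taken from the example data):

import java.util.List;
import org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.PearsonCorrelationSimilarity;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;

ItemSimilarity itemSimilarity = new PearsonCorrelationSimilarity(model);
GenericItemBasedRecommender itemRecommender =
        new GenericItemBasedRecommender(model, itemSimilarity);
// The 5 items most similar to item 101, ranked by the chosen ItemSimilarity.
List<RecommendedItem> similarItems = itemRecommender.mostSimilarItems(101L, 5);
for (RecommendedItem item : similarItems) {
    System.out.println(item);
}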
Running in Spark
To run in Spark, the Mahout jars must be added to Spark's classpath. Edit /etc/spark/conf/spark-env.sh and add the following two lines:
SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/lib/*"
SPARK_DIST_CLASSPATH="$SPARK_DIST_CLASSPATH:/usr/lib/mahout/*"
Then run the following code interactively in spark-shell in local mode:
// Note: this is a local path
val model = new FileDataModel(new File("intro.csv"))
val evaluator = new RMSRecommenderEvaluator()
val recommenderBuilder = new RecommenderBuilder {
  override def buildRecommender(dataModel: DataModel): Recommender = {
    val similarity = new LogLikelihoodSimilarity(dataModel)
    new GenericItemBasedRecommender(dataModel, similarity)
  }
}
val score = evaluator.evaluate(recommenderBuilder, null, model, 0.95, 0.05)
println(s"Score=$score")

val recommender = recommenderBuilder.buildRecommender(model)
// trainingRatings is assumed to be an RDD of ratings defined earlier in the session
val users = trainingRatings.map(_.user).distinct().take(20)
import scala.collection.JavaConversions._
val result = users.par.map { user =>
  user + "," + recommender.recommend(user, 40).map(_.getItemID).mkString(",")
}
The RecommenderEvaluator used above can score item- or user-based recommenders under any of the similarity measures; it is a useful tool to experiment with.
Distributed Execution
Mahout provides org.apache.mahout.cf.taste.hadoop.item.RecommenderJob, which implements item-based collaborative filtering as MapReduce jobs. Its usage message:
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
15/06/10 16:19:34 ERROR common.AbstractJob: Missing required option --similarityClassname
Missing required option --similarityClassname
[--input <input> --output <output> --numRecommendations <numRecommendations>
  --usersFile <usersFile> --itemsFile <itemsFile> --filterFile <filterFile>
  --booleanData <booleanData> --maxPrefsPerUser <maxPrefsPerUser>
  --minPrefsPerUser <minPrefsPerUser> --maxSimilaritiesPerItem <maxSimilaritiesPerItem>
  --maxPrefsInItemSimilarity <maxPrefsInItemSimilarity>
  --similarityClassname <similarityClassname> --threshold <threshold>
  --outputPathForSimilarityMatrix <outputPathForSimilarityMatrix> --randomSeed <randomSeed>
  --sequencefileOutput --help --tempDir <tempDir> --startPhase <startPhase>
  --endPhase <endPhase>]
--similarityClassname (-s) similarityClassname
  Name of distributed similarity measures class to instantiate, alternatively
  use one of the predefined similarities ([SIMILARITY_COOCCURRENCE,
  SIMILARITY_LOGLIKELIHOOD, SIMILARITY_TANIMOTO_COEFFICIENT, SIMILARITY_CITY_BLOCK,
  SIMILARITY_COSINE, SIMILARITY_PEARSON_CORRELATION, SIMILARITY_EUCLIDEAN_DISTANCE])
As you can see, the class accepts the following command-line options (a sample of the expected input format follows the list):
--input (path): directory holding the user-preference data; it may contain one or more text files of preferences;
--output (path): output directory for the computed results
--numRecommendations (integer): number of items to recommend per user; default 10
--usersFile (path): path to one or more files of userIDs; recommendations are computed only for those users (optional)
--itemsFile (path): path to one or more files of itemIDs; only those items are considered for recommendation (optional)
--filterFile (path): path to files of [userID,itemID] pairs, comma-separated; the listed items will not be recommended to the listed users (optional)
--booleanData (boolean): set to true if the input data carries no preference values; default false
--maxPrefsPerUser (integer): maximum number of preferences per user used in the final recommendation phase; default 10
--minPrefsPerUser (integer): users with fewer preferences than this are ignored in the similarity computation; default 1
--maxSimilaritiesPerItem (integer): maximum number of similarities kept per item; default 100
--maxPrefsPerUserInItemSimilarity (integer): maximum number of preferences per user considered in the item-similarity phase; default 1000
--similarityClassname (classname): vector similarity class
--outputPathForSimilarityMatrix: output directory for the similarity matrix
--randomSeed: random seed
--sequencefileOutput: write the output as sequence files
--tempDir (path): directory for temporary files; defaults to temp under the current user's home directory
--startPhase
--endPhase
--threshold (double): item pairs with similarity below this threshold are ignored
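For reference, RecommenderJob expects its input as plain text with one preference per line, in userID,itemID[,preference] form (the same layout FileDataModel reads). The values below are illustrative only:

1,101,5.0
1,102,3.0
2,101,2.0
2,103,5.0
3,104,4.5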
For example, to recommend items using the SIMILARITY_LOGLIKELIHOOD similarity:
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /tmp/mahout/part-00000 --output /tmp/mahout-out -s SIMILARITY_LOGLIKELIHOOD
By default Mahout uses a single reducer, which is inefficient for large datasets; you can set the number of reducers by adding -Dmapred.reduce.tasks to MAHOUT_OPTS in the mahout launcher script.
When the command finishes, it creates a temp directory under the current user's HDFS home directory (the location can be changed with --tempDir (path)):
$ hadoop fs -ls temp
Found 10 items
-rw-r--r--   3 root hadoop  2015-06-10 14:42 temp/maxValues.bin
-rw-r--r--   3 root hadoop  2015-06-10 14:42 temp/norms.bin
drwxr-xr-x   - root hadoop  2015-06-10 14:41 temp/notUsed
-rw-r--r--   3 root hadoop  2015-06-10 14:42 temp/numNonZeroEntries.bin
-rw-r--r--   3 root hadoop  2015-06-10 14:41 temp/observationsPerColumn.bin
drwxr-xr-x   - root hadoop  2015-06-10 14:47 temp/pairwiseSimilarity
drwxr-xr-x   - root hadoop  2015-06-10 14:52 temp/partialMultiply
drwxr-xr-x   - root hadoop  2015-06-10 14:39 temp/preparePreferenceMatrix
drwxr-xr-x   - root hadoop  2015-06-10 14:50 temp/similarityMatrix
drwxr-xr-x   - root hadoop  2015-06-10 14:42 temp/weights
Looking at the YARN web UI, the command launches nine jobs, named in order:
PreparePreferenceMatrixJob-ItemIDIndexMapper-Reducer
PreparePreferenceMatrixJob-ToItemPrefsMapper-Reducer
PreparePreferenceMatrixJob-ToItemVectorsMapper-Reducer
RowSimilarityJob-CountObservationsMapper-Reducer
RowSimilarityJob-VectorNormMapper-Reducer
RowSimilarityJob-CooccurrencesMapper-Reducer
RowSimilarityJob-UnsymmetrifyMapper-Reducer
partialMultiply
RecommenderJob-PartialMultiplyMapper-Reducer
The job names give a rough idea of what each one does. With different input parameters, the number of jobs may differ; you would need to test to confirm.
Looking at the output on HDFS: user and recommendations are separated by a tab, items in the recommendation list by commas, and each item is joined to its score by a colon:
843	[…:4.426,…:4.5,…:4.108,…:4.0467]
…	[…:3.164,…:3.7,…:3.3839]
To run this with the Java API instead, see the second half of this article.
In Scala or Spark you can use either the Java API or the command line, and then post-process the recommendations with Spark, for example filtering, de-duplicating, or backfilling data; that part is beyond the scope of this article.
This post belongs to a series on the Hadoop family of products, covering the core projects Hadoop,
Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, and Chukwa, plus newer additions such as YARN, HCatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, and Hue.
Since 2011, China has been swept up in the big-data wave, with the Hadoop family of software dominating large-scale data processing. Open-source projects and vendors alike have all gravitated toward Hadoop, which has grown from a niche technology into the de facto standard for big-data development, with a whole family of products innovating on top of it.
As developers, we should keep pace and seize the opportunity to rise together with Hadoop!
Mahout is a member of the Hadoop family and inherits the traits of Hadoop programs: HDFS access and distributed MapReduce algorithms. As Mahout evolved, version 0.7 brought a major change: the single-machine in-memory implementations of some algorithms were removed, leaving only the Hadoop-based MapReduce parallel implementations.
This shows Mahout's commitment to big data and to parallelization. Within the Hadoop ecosystem, Mahout has every chance of becoming a star big-data product!
1. The Mahout Development Environment
In an earlier article, we set up a Maven-based Mahout development environment; here we continue with distributed Mahout program development.
This article uses Mahout 0.8.
Development environment: find pom.xml and change the Mahout version to 0.8:
<mahout.version>0.8</mahout.version>
Then download the dependencies:
~ mvn clean install
The class org.conan.mymahout.cluster06.Kmeans.java was written against mahout-0.6, so it will fail to compile; comment that file out for now.
2. Mahout's Hadoop-Based Distributed Environment
As the figure above shows, we can develop on either Windows 7 or Linux and debug locally; the standard tools are Maven and Eclipse.
At run time, Mahout automatically ships the MapReduce algorithm jars to the Hadoop cluster, so this develop-and-run workflow is close to a real production setup.
3. Implementing ItemCF Collaborative Filtering with Mahout
1). Prepare the data file: item.csv
Upload the test data to HDFS (for the single-machine, in-memory version of this experiment, see the referenced article):
~ hadoop fs -mkdir /user/hdfs/userCF
~ hadoop fs -copyFromLocal /home/conan/datafiles/item.csv /user/hdfs/userCF
~ hadoop fs -cat /user/hdfs/userCF/item.csv
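item.csv appears to be the 21-record example dataset from Mahout in Action (5 users, items 101-107; the job counters below report 21 input records). Assuming that dataset, the cat output would be:

1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0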
2). Java program: HdfsDAO.java
HdfsDAO.java is an HDFS helper that implements the usual HDFS shell commands with the Java API; see the referenced article.
We use the following HdfsDAO methods here (a hedged sketch of the class follows the listing):
HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
hdfs.rmr(inPath);
hdfs.mkdirs(inPath);
hdfs.copyFile(localFile, inPath);
hdfs.ls(inPath);
hdfs.cat(inFile);
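The full HdfsDAO is in the referenced article; a minimal sketch of the methods used here, written as thin wrappers over the Hadoop FileSystem API (the method bodies below are a reconstruction, not the author's exact code):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsDAO {
    private final String hdfsPath;   // e.g. hdfs://192.168.1.210:9000
    private final Configuration conf;

    public HdfsDAO(String hdfsPath, Configuration conf) {
        this.hdfsPath = hdfsPath;
        this.conf = conf;
    }

    private FileSystem fs() throws IOException {
        return FileSystem.get(URI.create(hdfsPath), conf);
    }

    public void rmr(String folder) throws IOException {      // like: hadoop fs -rmr
        fs().delete(new Path(folder), true);
        System.out.println("Delete: " + folder);
    }

    public void mkdirs(String folder) throws IOException {   // like: hadoop fs -mkdir
        fs().mkdirs(new Path(folder));
        System.out.println("Create: " + folder);
    }

    public void copyFile(String local, String remote) throws IOException {
        fs().copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
    }

    public void ls(String folder) throws IOException {       // like: hadoop fs -ls
        for (FileStatus f : fs().listStatus(new Path(folder))) {
            System.out.println("name: " + f.getPath() + ", folder: " + f.isDir()
                    + ", size: " + f.getLen());
        }
    }

    public void cat(String remoteFile) throws IOException {  // like: hadoop fs -cat
        FSDataInputStream in = fs().open(new Path(remoteFile));
        IOUtils.copyBytes(in, System.out, 4096, true);
    }
}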
3). Java program: ItemCFHadoop.java
For the distributed algorithm, we follow the explanation in Mahout in Action.
Implementation:
package org.conan.mymahout.recommend;

import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.conan.mymahout.hdfs.HdfsDAO;

public class ItemCFHadoop {

    private static final String HDFS = "hdfs://192.168.1.210:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/item.csv";
        String inPath = HDFS + "/user/hdfs/userCF";
        String inFile = inPath + "/item.csv";
        String outPath = HDFS + "/user/hdfs/userCF/result/";
        String outFile = outPath + "/part-r-00000";
        String tmpPath = HDFS + "/tmp/" + System.currentTimeMillis();

        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

        StringBuilder sb = new StringBuilder();
        sb.append("--input ").append(inPath);
        sb.append(" --output ").append(outPath);
        sb.append(" --booleanData true");
        sb.append(" --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.EuclideanDistanceSimilarity");
        sb.append(" --tempDir ").append(tmpPath);
        args = sb.toString().split(" ");

        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        job.run(args);

        hdfs.cat(outFile);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(ItemCFHadoop.class);
        conf.setJobName("ItemCFHadoop");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}
RecommenderJob.java encapsulates the entire distributed, parallel algorithm shown in the figure above; without this wrapper we would have to implement all eight MapReduce steps ourselves.
For a deeper analysis of the algorithm, see the referenced article.
4). Running the program
Console output:
Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv
5,106,4.0
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
10:26:35 org.apache.hadoop.util.NativeCodeLoader
WARNING: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
10:26:35 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
INFO: Total input paths to process : 1
10:26:35 org.apache.hadoop.io.compress.snappy.LoadSnappy
WARNING: Snappy native library not loaded
10:26:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: Running job: job_local_0001
...
10:26:36 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
INFO: Saved output of task 'attempt_local_0001_r_' to hdfs://192.168.1.210:9000/tmp/0/preparePreferenceMatrix/itemIDIndex
10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
INFO: map 100% reduce 100%
INFO: Job complete: job_local_0001
...
INFO: Saved output of task 'attempt_local_0002_r_' to hdfs://192.168.1.210:9000/tmp/0/preparePreferenceMatrix/userVectors
INFO: Job complete: job_local_0002
...
INFO: Saved output of task 'attempt_local_0003_r_' to hdfs://192.168.1.210:9000/tmp/0/preparePreferenceMatrix/ratingMatrix
INFO: Job complete: job_local_0003
...
INFO: Saved output of task 'attempt_local_0004_r_' to hdfs://192.168.1.210:9000/tmp/0/weights
INFO: Job complete: job_local_0004
...
INFO: Saved output of task 'attempt_local_0005_r_' to hdfs://192.168.1.210:9000/tmp/0/pairwiseSimilarity
INFO: Job complete: job_local_0005
...
INFO: Saved output of task 'attempt_local_0006_r_' to hdfs://192.168.1.210:9000/tmp/0/similarityMatrix
INFO: Job complete: job_local_0006
...
INFO: Saved output of task 'attempt_local_0007_r_' to hdfs://192.168.1.210:9000/tmp/0/partialMultiply
INFO: Job complete: job_local_0007
...
INFO: Saved output of task 'attempt_local_0008_r_' to hdfs://192.168.1.210:9000/user/hdfs/userCF/result
INFO: Job complete: job_local_0008
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69521964]
3	[103:1.2475469,106:1.1944651,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.59621263]
5	[107:1.1993587]
5). Interpreting the results
The log above breaks down into three parts.
a. Environment initialization
Initialize the HDFS data and working directories and upload the data file.
Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv
b. Algorithm execution
The eight MapReduce jobs corresponding to the figure above run in sequence.
Job complete: job_local_0001
Job complete: job_local_0002
Job complete: job_local_0003
Job complete: job_local_0004
Job complete: job_local_0005
Job complete: job_local_0006
Job complete: job_local_0007
Job complete: job_local_0008
c. Printing the recommendations
Finally, the computed recommendations are printed so we can inspect them:
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69521964]
3	[103:1.2475469,106:1.1944651,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.59621263]
5	[107:1.1993587]
4. Template Project on GitHub
You can clone this project as a starting point for your own development:
~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.8
We have now completed the distributed item-based collaborative filtering implementation. A follow-up article covers Mahout's distributed KMeans implementation.