from collections import Counter

from pyspark import SparkConf, SparkContext
def myprint(one):
    """Print one record; thin callback for RDD.foreach actions."""
    print(one)
def top2list(one):
    """Return the two most frequent regions for one website.

    Args:
        one: a (website, regions) pair as produced by groupByKey(), where
            regions is an iterable of region strings (one per page view).

    Returns:
        (website, top2) where top2 is a list of at most two
        (region, count) tuples in descending count order.
    """
    website, regions = one
    # Counter.most_common(2) replaces the hand-rolled dict counting,
    # sort, and length check; ties keep first-seen order, same as the
    # original stable sort over insertion-ordered dict items.
    return website, Counter(regions).most_common(2)
def get_site_uid_count(one):
    """Turn one user's visit list into per-site visit-count records.

    Args:
        one: a (uid, sites) pair as produced by groupByKey(), where
            sites is an iterable of website names (one per visit).

    Returns:
        A list of (site, (uid, count)) tuples, one per distinct site,
        ready to be re-grouped by site.
    """
    uid, sites = one
    # Counter replaces the manual dict-increment loop; iteration order
    # (first occurrence) matches the original dict's insertion order.
    return [(site, (uid, count)) for site, count in Counter(sites).items()]
def top3list(one):
    """Return the top-3 most active users for one website.

    Args:
        one: a (website, uid_counts) pair, where uid_counts is an
            iterable of (uid, count) tuples.

    Returns:
        (website, top3) where top3 is a fixed-length list of 3 slots:
        the (uid, count) tuples with the largest counts in descending
        order, unused slots padded with '' (as in the original design).

    Note:
        The original insertion loop had `top3lists = uid_count` where
        `top3lists[i] = uid_count` was intended, rebinding the list to a
        single tuple; the next record then compared a character of the
        uid string with an int count — the reported
        "'<' not supported between str and int" runtime error.
    """
    website, uid_counts = one
    # Stable descending sort keeps earlier records first on tied counts,
    # matching the strict '<' comparison the original loop intended.
    top3 = sorted(uid_counts, key=lambda uc: uc[1], reverse=True)[:3]
    top3 += [''] * (3 - len(top3))
    return website, top3
def getApv():
    """Run the 'top-3 most active users per website' Spark job locally.

    Reads tab-separated access-log lines (field index 3 = uid, field
    index 4 = website — assumed from the downstream key usage; TODO
    confirm against the log format) and prints, for each website, the
    three users with the most visits.
    """
    conf = SparkConf()
    conf.setMaster("local")
    conf.setAppName("pv")
    sc = SparkContext(conf=conf)
    # Raw string: the old literal relied on "\m", "\d", "\w" being
    # invalid escape sequences that fall through as literal backslashes
    # (a DeprecationWarning, SyntaxWarning on newer Pythons).
    linesRDD = sc.textFile(r"D:\mypython\date\website")
    # Pipeline: (uid, site) pairs -> group visits per user ->
    # per-user site counts as (site, (uid, count)) -> group per site ->
    # keep the 3 users with the largest counts -> print each result.
    linesRDD.map(lambda line: (line.split("\t")[3], line.split("\t")[4])) \
        .groupByKey() \
        .flatMap(get_site_uid_count) \
        .groupByKey() \
        .map(top3list) \
        .foreach(print)
    # Release the local SparkContext so the script can be re-run in the
    # same process without a "context already exists" error.
    sc.stop()
if __name__ == "__main__":
    getApv()