pysaprk_统计词频

网友投稿 687 2022-11-16

pysaprk_统计词频

pysaprk_统计词频

pysaprk_统计词频

Python 中使用行动操作对错误进行计数 并打印出来

#方法2需要复制这三行import findsparkfindspark.init()import pyspark import sysfrom pyspark import SparkConf, SparkContextif __name__ == "__main__":# master = "local" if len(sys.argv) == 2: master = sys.argv[1]# 在 Python 中初始化 Spark conf = SparkConf().setMaster("local").setAppName("My App") sc = SparkContext(conf = conf)# sc = SparkContext(master, "WordCount")# 创建 RDD # 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize()# 方法,如例 3-5 至例 3-7 所示。这种方式在学习 Spark 时非常有用,它让你可以在 shell 中# 快速创建出自己的 RDD,然后对这些 RDD 进行操作。不过,需要注意的是,除了开发原# 型和测试时,这种方式用得并不多,毕竟这种方式需要把你的整个数据集先放在一台机器# 的内存中# 通过sc.parallelize可以把Python list,NumPy array或者Pandas Series,Pandas DataFrame转成Spark的RDD数据。 lines = sc.parallelize(["pandas", "i like pandas"])# 可以看到输出结果,flatMap是先映射后扁平化。# countByValue的作用 统计一个RDD中各个value的出现次数。返回一个map,map的key是元素的值,value是出现的次数。 result = lines.flatMap(lambda x: x.split(" ")).countByValue()# for key, value in result.iteritems(): for key, value in result.items(): print ( "%s %i :",key, value)

Python 中使用行动操作对错误进行计数 并打印出来

#方法2需要复制这三行import findsparkfindspark.init()import pyspark import sysfrom pyspark import SparkContext# RDD 支持两种操作:转化操作和行动操作。RDD 的转化操作是返回一# 个新的 RDD 的操作,比如 map() 和 filter(),而行动操作则是向驱动器程序返回结果或# 把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first()。Spark 对待# 转化操作和行动操作的方式很不一样# 创建一个名为lines的RDDinputRDD = sc.textFile("C:/data/readme.txt")# C:/data/readme.txterrorsRDD = inputRDD.filter(lambda x: "error" in x)# 用 Python 进行 union() 转化操作errorsRDD = inputRDD.filter(lambda x: "error" in x)warningsRDD = inputRDD.filter(lambda x: "warning" in x)badLinesRDD = errorsRDD.union(warningsRDD)# ——————————————————————————————————以上为转化操作——————————————————————————————————————————————# 我们已经看到了如何通过转化操作从已有的 RDD 创建出新的 RDD,不过有时,我们希望# 对数据集进行实际的计算。行动操作是第二种类型的 RDD 操作,它们会把最终求得的结# 果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,它# 们会强制执行那些求值必须用到的 RDD 的转化操作。# sc.stop# name()返回rdd的名称# min()返回rdd中的最小值# sum()叠加rdd中所有元素# take(n)取rdd中前n个元素# count()返回rdd的元素个数# 打印所有的行badLinesRDD.collect()# badLinesRDD.take(2)# badLinesRDD.count() # # Python 中使用行动操作对错误进行计数 并打印出来print ("Input had : " + str(badLinesRDD.count()) + "concerning lines") # print "Here are 10 examples:"for line in badLinesRDD.take(10): print (line)# collect() 不能用在大规模数据集上 # 在这个例子中,我们在驱动器程序中使用 take() 获取了 RDD 中的少量元素。然后在本地# 遍历这些元素,并在驱动器端打印出来。RDD 还有一个 collect() 函数,可以用来获取整# 个 RDD 中的数据。如果你的程序把 RDD 筛选到一个很小的规模,并且你想在本地处理# 这些数据时,就可以使用它。记住,只有当你的整个数据集能在单台机器的内存中放得下# 时,才能使用 collect(),因此,collect() 不能用在大规模数据集上。# 在大多数情况下,RDD 不能通过 collect() 收集到驱动器进程中,因为它们一般都很大。# 此时,我们通常要把数据写到诸如 HDFS 或 Amazon S3 这样的分布式的存储系统中。你可# 以使用 saveAsTextFile()、saveAsSequenceFile(),或者任意的其他行动操作来把 RDD 的# 数据内容以各种自带的格式保存起来。我们会在第 5 章讲解导出数据的各种选项。# 需要注意的是,每当我们调用一个新的行动操作时,整个 RDD 都会从头开始计算。要避# 免这种低效的行为,用户可以将中间结果持久化,

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:pyspark_统计行数_过滤
下一篇:逻辑回归_通过正则化项减小方差
相关文章

 发表评论

暂时没有评论,来抢沙发吧~