Scala109-跨集群读取hive

网友投稿 570 2022-09-02

Scala109-跨集群读取hive

Scala109-跨集群读取hive

实际工作中的场景,要在A集群部署任务,读取B集群hive的数据,存在A集群对应的表中。目前调研得出以下三种方式:

A集群直接读取B集群的hdfs数据(两个集群环境要通),对得到的DataFrame操作,写入A集群的hive中A集群上建sparksession时,加入B集群的参数,直接spark.sql读取数据,再写入集群A的hdfs中(此时不能再写入集群A的hive了)pyspark,通过pyhive这个库读取hive数据,转换成pyspark的dataframe,再建立tempview,通过spark.sql写入集群A的hive

比较合理的是第一种方式,简单易处理。

pyhive方式

环境问题不说了,需要安装pyhive库,需要python虚拟环境或者在集群每个节点安装pyhive库。简单提供代码demo。

from pyhive import hiveimport pandas as pddef get_hive_data(sql, host, port, database, username, password, auth='CUSTOM', ): conn = hive.connect(host=host, port=port, auth=auth, database=database, username=username, password=password) cursor = conn.cursor() cursor.execute(sql) columns = [col[0] for col in cursor.description] result = [dict(zip(columns, row)) for row in cursor.fetchall()] df = pd.DataFrame(result) # df.columns = [i.split(".")[1] for i in columns] return dfif __name__ == '__main__': spark = get_or_create('ReWrBtwDiffCluster') # host主机ip,port:端口号,username:用户名,database:使用的数据库名称 # conn = hive.Connection(host=host, port=10000, username=username, password=password, # database='test_db', auth="CUSTOM") sql = "SHOW DATABASES" result = get_hive_data(sql=sql,host=host, port=10000, username=username, password=password,database='test_db', auth="CUSTOM") print(result) spark.stop()

上面得到pandas的DataFrame转成spark的DataFrame,再插到对应hive表中就可以了

读B集群hdfs,写A集群hive

def extractHdfsData2Hive(spark: SparkSession, table: String, db: String, hdfsPath: String = null): Unit = { val data = spark.read.format("parquet").load(hdfsPath) data.show(truncate = false) println(s"start : insert data to $db.$table") insertTable(spark, dataDf = data, table, db, dt = dt,partitionCol=partitionCol) println(s"insert data to $db.$table success!") }

要注意hdfs的路径和端口,一般端口为8023

在A集群读B集群hive

这个操作不太实用,在A集群上直接读取B集群,然后再把数据写到A集群的hdfs上。简单列个demo

val hiveMetaServerAddr = "thrift://ip:9083"val builder = SparkSession .builder() .appName("hiveTest") .enableHiveSupport() .config("hive.exec.dynamic.partition", "true") .config("hive.exec.dynamic.partition.mode", "nonstrict") .config("spark.executor.heartbeatInterval", "60s") .config("spark-work.timeout", "120s") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.kryoserializer.buffer.max", "512m") .config("spark.dynamicAllocation.enabled", "false") .config("spark.sql.inMemoryColumnarStorage.compressed", "true") .config("spark.sql.inMemoryColumnarStorage.batchSize", 10000) .config("spark.sql.broadcastTimeout", 600) .config("spark.sql.autoBroadcastJoinThreshold", -1) .config("spark.sql.crossJoin.enabled", true) .config("spark.sql.files.maxRecordsPerFile", 20000) .config("hive.metastore.uris", hiveMetaServerAddr) val spark = builder.getOrCreate()

这里的ip应该是要求master节点的ip。集群的事儿不太懂,暂时先这样,后面了解了再补充。先解决需求。

Ref

2021-01-27 于南京市江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PythonNote024---argparse使用
下一篇:图解 SQL 里的各种 JOIN,看完不懂来找我!
相关文章

 发表评论

暂时没有评论,来抢沙发吧~