智慧屏 安装 app如何提升家庭娱乐与教育体验的关键工具
688
2022-11-26
flume使用hive stream写入到hive
1、hive中创建表:
create table customers (id string, name string, email string, street_address string, company string)partitioned by (time string)clustered by (id) into 5 buckets stored as orclocation '/user/iteblog/salescust'TBLPROPERTIES ('transactional'='true');
注意:采用orc存储,同时支持clustered
为了在Hive中启用事务,我们需要在Hive中进行如下的配置:
hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
2、配置flume:
$ vi flumetohive.confflumeagent1.sources = source_from_kafkaflumeagent1.channels = mem_channelflumeagent1.sinks = hive_sink# Define / Configure sourceflumeagent1.sources.source_from_kafka.type = org.apache.flume.source.kafka.KafkaSourceflumeagent1.sources.source_from_kafka.zookeeperConnect = sandbox.hortonworks.com:2181flumeagent1.sources.source_from_kafka-ic = SalesDBTransactionsflumeagent1.sources.source_from_kafka.groupID = flumeflumeagent1.sources.source_from_kafka.channels = mem_channelflumeagent1.sources.source_from_kafka.interceptors = i1flumeagent1.sources.source_from_kafka.interceptors.i1.type = timestampflumeagent1.sources.source_from_kafka.consumer.timeout.ms = 1000 # Hive Sinkflumeagent1.sinks.hive_sink.type = hiveflumeagent1.sinks.hive_sink.hive.metastore = thrift://sandbox.hortonworks.com:9083flumeagent1.sinks.hive_sink.hive.database = rajflumeagent1.sinks.hive_sink.hive.table = customersflumeagent1.sinks.hive_sink.hive.txnsPerBatchAsk = 2flumeagent1.sinks.hive_sink.hive.partition = %y-%m-%d-%H-%Mflumeagent1.sinks.hive_sink.batchSize = 10flumeagent1.sinks.hive_sink.serializer = DELIMITEDflumeagent1.sinks.hive_sink.serializer.delimiter = ,flumeagent1.sinks.hive_sink.serializer.fieldnames = id,name,email,street_address,company# Use a channel which buffers events in memoryflumeagent1.channels.mem_channel.type = memoryflumeagent1.channels.mem_channel.capacity = 10000flumeagent1.channels.mem_channel.transactionCapacity = 100# Bind the source and sink to the channelflumeagent1.sources.source_from_kafka.channels = mem_channelflumeagent1.sinks.hive_sink.channel = mem_channel
1)source使用的是kafka;
2)sink使用的是hive,这里没有使用hdfs+hive创建外表的方式,主要是因为flume的hive sink内部使用了hive stream来做的orc文件追加,好处是文件小而且效率高。
参考:https://iteblog.com/archives/1771.html
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~