轻量级前端框架助力开发者提升项目效率与性能
717
2022-11-26
Flume+HBase+Kafka集成与开发
先把flume1.7的源码包下载下来，并上传到节点3上去
我们对导入的flume源码进行二次开发
我们不要动它原来的，我们新建一个类
然后把这个类的内容拷过来然后修改文件名和类名
package org.apache.flume.sink.hbase;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;//package org.apache.flume.sink.hbase;import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;/** * A simple serializer to be used with the AsyncHBaseSink * that returns puts from an event, by writing the event * body into it. The headers are discarded. It also updates a row in hbase * which acts as an event counter. * * Takes optional parameters:
* rowPrefix: The prefix to be used. Default: default
* incrementRow The row to increment. Default: incRow
* suffix: uuid/random/timestamp.Default: uuid
* * Mandatory parameters:
* cf:Column family.
* Components that have no defaults and will not be used if absent: * payloadColumn: Which column to put payload in. If it is not present, * event data will not be written.
* incrementColumn: Which column to increment. If this is absent, it * means no column is incremented. */public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] cf; private byte[] payload; private byte[] payloadColumn; private byte[] incrementColumn; private String rowPrefix; private byte[] incrementRow; private SimpleHbaseEventSerializer.KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List
在原来基础上稍微做修改
按住Ctrl键单击鼠标进去
添加以下内容
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.flume.sink.hbase;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Utility class for users to generate their own HBase row keys. Any key can be
 * used; this is just a utility that provides a set of simple keys.
 *
 * <p>All keys are returned as UTF-8 encoded bytes. The {@code throws
 * UnsupportedEncodingException} clauses are kept for source compatibility with
 * existing callers, but the exception can no longer actually occur because the
 * charset is referenced via {@link StandardCharsets#UTF_8} instead of a
 * charset-name string (which could fail at runtime).
 */
public class SimpleRowKeyGenerator {

  /**
   * Returns {@code prefix} followed by a random type-4 UUID.
   *
   * @param prefix string prepended to the generated key
   * @return UTF-8 bytes of the generated key
   */
  public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + UUID.randomUUID().toString()).getBytes(StandardCharsets.UTF_8);
  }

  /**
   * Returns {@code prefix} followed by a random long.
   *
   * <p>Uses {@link ThreadLocalRandom} rather than allocating a new
   * {@code Random} on every call (cheaper and better seeded under load).
   *
   * @param prefix string prepended to the generated key
   * @return UTF-8 bytes of the generated key
   */
  public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(ThreadLocalRandom.current().nextLong()))
        .getBytes(StandardCharsets.UTF_8);
  }

  /**
   * Returns {@code prefix} followed by the current time in milliseconds.
   *
   * @param prefix string prepended to the generated key
   * @return UTF-8 bytes of the generated key
   */
  public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.currentTimeMillis()))
        .getBytes(StandardCharsets.UTF_8);
  }

  /**
   * Returns {@code prefix} followed by {@link System#nanoTime()}.
   *
   * @param prefix string prepended to the generated key
   * @return UTF-8 bytes of the generated key
   */
  public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException {
    return (prefix + String.valueOf(System.nanoTime())).getBytes(StandardCharsets.UTF_8);
  }

  /**
   * Builds the row key used by the Kfk serializer:
   * {@code userid + datetime + currentTimeMillis}. The trailing timestamp keeps
   * keys unique when the same user/datetime pair occurs more than once.
   *
   * @param userid   user identifier portion of the key
   * @param datetime datetime portion of the key
   * @return UTF-8 bytes of the generated key
   */
  public static byte[] getKfkRowKey(String userid, String datetime)
      throws UnsupportedEncodingException {
    return (userid + datetime + String.valueOf(System.currentTimeMillis()))
        .getBytes(StandardCharsets.UTF_8);
  }
}
继续修改,修改后的代码是下面的
KfkAsyncHbaseEventSerializer.java
package org.apache.flume.sink.hbase;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;//package org.apache.flume.sink.hbase;import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;/** * A simple serializer to be used with the AsyncHBaseSink * that returns puts from an event, by writing the event * body into it. The headers are discarded. It also updates a row in hbase * which acts as an event counter. * * Takes optional parameters:
* rowPrefix: The prefix to be used. Default: default
* incrementRow The row to increment. Default: incRow
* suffix: uuid/random/timestamp.Default: uuid
* * Mandatory parameters:
* cf:Column family.
* Components that have no defaults and will not be used if absent: * payloadColumn: Which column to put payload in. If it is not present, * event data will not be written.
* incrementColumn: Which column to increment. If this is absent, it * means no column is incremented. */public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] cf; private byte[] payload; private byte[] payloadColumn; private byte[] incrementColumn; private String rowPrefix; private byte[] incrementRow; private SimpleHbaseEventSerializer.KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List
现在把代码打包
我们可以看到有很多相关的依赖包,我们把不需要的删掉
打好的jar包在本地的工程路径的这里
现在把这个jar包上传到flume的lib目录下
也就是这个目录。
可以看到上传日期,就是今天上传的
下面配置flume + kafka
# Flume agent: one avro source fanned out to two channels,
# one feeding HBase and one feeding Kafka.
agent1.sources = r1
agent1.channels = kafkaC hbaseC
agent1.sinks = kafkaSink hbaseSink

#*********** flume + hbase ************
agent1.sources.r1.type = avro
# FIX: the source must be attached to BOTH channels; the original config only
# listed hbaseC, so the kafkaC channel (and therefore the Kafka sink) would
# never receive any events.
agent1.sources.r1.channels = hbaseC kafkaC
agent1.sources.r1.bind = bigdata-pro01.kfk.com
agent1.sources.r1.port = 5555
agent1.sources.r1.threads = 5

agent1.channels.hbaseC.type = memory
agent1.channels.hbaseC.capacity = 100000
agent1.channels.hbaseC.transactionCapacity = 100000
agent1.channels.hbaseC.keep-alive = 20

agent1.sinks.hbaseSink.type = asynchbase
agent1.sinks.hbaseSink.table = weblogs
agent1.sinks.hbaseSink.columnFamily = info
# Custom serializer built earlier in this article (packaged into flume/lib).
agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
agent1.sinks.hbaseSink.channel = hbaseC
agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

#************** flume + kafka ***************
agent1.channels.kafkaC.type = memory
agent1.channels.kafkaC.capacity = 100000
agent1.channels.kafkaC.transactionCapacity = 100000
agent1.channels.kafkaC.keep-alive = 20

agent1.sinks.kafkaSink.channel = kafkaC
agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
# FIX: the original line "agent1.sinks.kafkaSink-ic=test" is a garbled property
# name; the KafkaSink property for the destination topic is "topic".
agent1.sinks.kafkaSink.topic = test
agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
agent1.sinks.kafkaSink.requiredAcks = 1
agent1.sinks.kafkaSink.batchSize = 1
agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~