Flume+HBase+Kafka集成与开发

网友投稿 639 2022-11-26

Flume+HBase+Kafka集成与开发

Flume+HBase+Kafka集成与开发

先把flume1.7的源码包-

​​和节点3上去

我们对导入的flume源码进行二次开发

我们不要动他原来的,我们新建一个类

然后把这个类的内容拷过来然后修改文件名和类名

package org.apache.flume.sink.hbase;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;//package org.apache.flume.sink.hbase;import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;/** * A simple serializer to be used with the AsyncHBaseSink * that returns puts from an event, by writing the event * body into it. The headers are discarded. It also updates a row in hbase * which acts as an event counter. * * Takes optional parameters:

* rowPrefix: The prefix to be used. Default: default

* incrementRow The row to increment. Default: incRow

* suffix: uuid/random/timestamp.Default: uuid

* * Mandatory parameters:

* cf:Column family.

* Components that have no defaults and will not be used if absent: * payloadColumn: Which column to put payload in. If it is not present, * event data will not be written.

* incrementColumn: Which column to increment. If this is absent, it * means no column is incremented. */public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] cf; private byte[] payload; private byte[] payloadColumn; private byte[] incrementColumn; private String rowPrefix; private byte[] incrementRow; private SimpleHbaseEventSerializer.KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List getActions() { List actions = new ArrayList(); if (payloadColumn != null) { byte[] rowKey; try { switch (keyType) { case TS: rowKey = SimpleRowKeyGenerator.getTimestampKey(rowPrefix); break; case TSNANO: rowKey = SimpleRowKeyGenerator.getNanoTimestampKey(rowPrefix); break; case RANDOM: rowKey = SimpleRowKeyGenerator.getRandomKey(rowPrefix); break; default: rowKey = SimpleRowKeyGenerator.getUUIDKey(rowPrefix); break; } PutRequest putRequest = new PutRequest(table, rowKey, cf, payloadColumn, payload); actions.add(putRequest); } catch (Exception e) { throw new FlumeException("Could not get row key!", e); } } return actions; } public List getIncrements() { List actions = new ArrayList(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } @Override public void configure(Context context) { String pCol = context.getString("payloadColumn", "pCol"); String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if (pCol != null && !pCol.isEmpty()) { if (suffix.equals("timestamp")) { keyType = SimpleHbaseEventSerializer.KeyType.TS; } else if (suffix.equals("random")) { keyType = SimpleHbaseEventSerializer.KeyType.RANDOM; } else if (suffix.equals("nano")) { keyType = SimpleHbaseEventSerializer.KeyType.TSNANO; } else { keyType = SimpleHbaseEventSerializer.KeyType.UUID; } payloadColumn = pCol.getBytes(Charsets.UTF_8); } if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void setEvent(Event event) { this.payload = event.getBody(); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub }}

在原来基础上稍微做修改

按住ctrl键单机鼠标进去

添加以下内容

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */package org.apache.flume.sink.hbase;import java.io.UnsupportedEncodingException;import java.util.Random;import java.util.UUID;/** * Utility class for users to generate their own keys. Any key can be used, * this is just a utility that provides a set of simple keys. */public class SimpleRowKeyGenerator { public static byte[] getUUIDKey(String prefix) throws UnsupportedEncodingException { return (prefix + UUID.randomUUID().toString()).getBytes("UTF8"); } public static byte[] getRandomKey(String prefix) throws UnsupportedEncodingException { return (prefix + String.valueOf(new Random().nextLong())).getBytes("UTF8"); } public static byte[] getTimestampKey(String prefix) throws UnsupportedEncodingException { return (prefix + String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); } public static byte[] getNanoTimestampKey(String prefix) throws UnsupportedEncodingException { return (prefix + String.valueOf(System.nanoTime())).getBytes("UTF8"); } public static byte[] getKfkRowKey(String userid,String datetime) throws UnsupportedEncodingException { return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8"); } }

继续修改,修改后的代码是下面的

KfkAsyncHbaseEventSerializer.java

package org.apache.flume.sink.hbase;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;//package org.apache.flume.sink.hbase;import com.google.common.base.Charsets;import org.apache.flume.Context;import org.apache.flume.Event;import org.apache.flume.FlumeException;import org.apache.flume.conf.ComponentConfiguration;import org.apache.flume.sink.hbase.SimpleHbaseEventSerializer.KeyType;import org.hbase.async.AtomicIncrementRequest;import org.hbase.async.PutRequest;import java.util.ArrayList;import java.util.List;/** * A simple serializer to be used with the AsyncHBaseSink * that returns puts from an event, by writing the event * body into it. The headers are discarded. It also updates a row in hbase * which acts as an event counter. * * Takes optional parameters:

* rowPrefix: The prefix to be used. Default: default

* incrementRow The row to increment. Default: incRow

* suffix: uuid/random/timestamp.Default: uuid

* * Mandatory parameters:

* cf:Column family.

* Components that have no defaults and will not be used if absent: * payloadColumn: Which column to put payload in. If it is not present, * event data will not be written.

* incrementColumn: Which column to increment. If this is absent, it * means no column is incremented. */public class KfkAsyncHbaseEventSerializer implements AsyncHbaseEventSerializer { private byte[] table; private byte[] cf; private byte[] payload; private byte[] payloadColumn; private byte[] incrementColumn; private String rowPrefix; private byte[] incrementRow; private SimpleHbaseEventSerializer.KeyType keyType; @Override public void initialize(byte[] table, byte[] cf) { this.table = table; this.cf = cf; } @Override public List getActions() { List actions = new ArrayList(); if (payloadColumn != null) { byte[] rowKey; try { String [] columns =String.valueOf(payloadColumn).split(","); String [] values =String.valueOf(this.payload).split(","); for(int i=0;i getIncrements() { List actions = new ArrayList(); if (incrementColumn != null) { AtomicIncrementRequest inc = new AtomicIncrementRequest(table, incrementRow, cf, incrementColumn); actions.add(inc); } return actions; } @Override public void cleanUp() { // TODO Auto-generated method stub } @Override public void configure(Context context) { String pCol = context.getString("payloadColumn", "pCol"); String iCol = context.getString("incrementColumn", "iCol"); rowPrefix = context.getString("rowPrefix", "default"); String suffix = context.getString("suffix", "uuid"); if (pCol != null && !pCol.isEmpty()) { if (suffix.equals("timestamp")) { keyType = SimpleHbaseEventSerializer.KeyType.TS; } else if (suffix.equals("random")) { keyType = SimpleHbaseEventSerializer.KeyType.RANDOM; } else if (suffix.equals("nano")) { keyType = SimpleHbaseEventSerializer.KeyType.TSNANO; } else { keyType = SimpleHbaseEventSerializer.KeyType.UUID; } payloadColumn = pCol.getBytes(Charsets.UTF_8); } if (iCol != null && !iCol.isEmpty()) { incrementColumn = iCol.getBytes(Charsets.UTF_8); } incrementRow = context.getString("incrementRow", "incRow").getBytes(Charsets.UTF_8); } @Override public void setEvent(Event event) { this.payload = event.getBody(); } @Override public void configure(ComponentConfiguration conf) { // TODO Auto-generated method stub }}

现在把代码打包

我们可以看到有很多相关的依赖包,我们把不需要的删掉

打好的架包在本地的工程路径的这里

现在把这个架包上传到flume的lib目录下

也就是这个目录。

可以看到上传日期,就是今天上传的

下面配置flume + kafka

agent1.sources = r1agent1.channels = kafkaC hbaseCagent1.sinks=kafkaSink hbaseSink#***********flume + hbase************agent1.sources.r1.type = avroagent1.sources.r1.channels = hbaseCagent1.sources.r1.bind = bigdata-pro01.kfk.comagent1.sources.r1.port=5555agent1.sources.r1.threads=5agent1.channels.hbaseC.type = memoryagent1.channels.hbaseC.capacity = 100000agent1.channels.hbaseC.transactionCapacity = 100000agent1.channels.hbaseC.keep-alive=20agent1.sinks.hbaseSink.type = asynchbaseagent1.sinks.hbaseSink.table=weblogsagent1.sinks.hbaseSink.columnFamily=infoagent1.sinks.hbaseSink.serializer= org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer agent1.sinks.hbaseSink.channel = hbaseCagent1.sinks.hbaseSink.serializer.payloadColumn=datatime,userid,searchname,retorder,cliorder,cliurl#**************flume + kafka***************agent1.channels.kafkaC.type = memoryagent1.channels.kafkaC.capacity = 100000agent1.channels.kafkaC.transactionCapacity = 100000agent1.channels.kafkaC.keep-alive=20agent1.sinks.kafkaSink.channel = kafkaCagent1.sinks.kafkaSink.type= org.apache.flume.sink.kafka.KafkaSinkagent1.sinks.kafkaSink.kafka.brokerList=bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092agent1.sinks.kafkaSink-ic=testagent1.sinks.kafkaSink.zookeeperConnect=bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181agent1.sinks.kafkaSink.requiredAcks=1agent1.sinks.kafkaSink.batchSize=1agent1.sinks.kafkaSink.serializer.class=kafka.serializer.StringEncoder

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:nginx-1.8.1的安装
下一篇:如何在maven项目里面编写mapreduce程序以及一个maven项目里面管理多个mapreduce程序
相关文章

 发表评论

暂时没有评论,来抢沙发吧~