Flink in Practice: Getting StreamingFileSink to Write Data to Another HA Hadoop Cluster
Preface
Our company runs on EMR clusters: we set up one Flink cluster dedicated to real-time computation and one Hadoop cluster dedicated to offline Spark and Hive jobs. The two clusters are completely isolated. However, some real-time ingestion jobs need to write data into the offline cluster. One suggestion was to simply prepend the offline cluster's IP to the HDFS path passed to StreamingFileSink, e.g. hdfs://otherIp:/usr/hive/warehouse/. That does write data, but our Hadoop clusters are HA: whenever the active NameNode fails over, writes start failing, so this approach is not viable. This article walks through a way to make Flink write to another HA Hadoop cluster.
Finding the Cause
If we point StreamingFileSink directly at the other HA Hadoop cluster, e.g. hdfs://HDFS42143/usr/hive/warehouse/hour_hive, the job fails with an exception complaining that the nameservice (the URI authority, HDFS42143) cannot be resolved, because the Flink cluster knows nothing about that nameservice.
As you know, when setting up an HA cluster we have to pick a nameservice (any identifier you like) and provide some additional HA configuration, for example:
```properties
dfs.client.failover.proxy.provider.HDFS42142=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.nameservices=HDFS42142
dfs.namenode.rpc-address.HDFS42142.nn1=172.xx.xx.01:port
dfs.namenode.rpc-address.HDFS42142.nn2=172.xx.xx.02:port
dfs.ha.namenodes.HDFS42142=nn1,nn2
```
However, the StreamingFileSink source offers no constructor or builder method that accepts a Hadoop configuration. So how do we pass these settings in? Let's first understand how StreamingFileSink writes data.
A Look at the StreamingFileSink Source Code
We usually create a StreamingFileSink like this:
```java
package com.tuya.sink;

import com.tuya.AppArgs;
import com.tuya.sink.filesystem.HdfsBucketAssigner;
import com.tuya.sink.filesystem.MyRollingPolicy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * desc:
 *
 * @author scx
 * @create 2019/10/24
 */
public class FileSinkFactory {

    private static final Logger LOG = LoggerFactory.getLogger(FileSinkFactory.class);

    /**
     * How often the state of in-progress files is checked
     */
    private static final long CHECK_INTERVAL = 30 * 1000L;

    /**
     * Default inactivity timeout: a file that has not been written to for this long
     * is considered inactive (a new file is started afterwards)
     */
    private static final long DEFAULT_INACTIVITY_INTERVAL = 30 * 60L * 1000L;

    /**
     * Default interval after which a new HDFS file is rolled
     */
    private static final long DEFAULT_ROLLOVER_INTERVAL = 40 * 60L * 1000L;

    /**
     * Maximum HDFS part-file size: 128 MB
     */
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;

    public static
    // ... (the snippet is cut off here in the original; a sketch of how such a
    //      factory method typically continues follows below)
```
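The factory method above is cut off, so here is a minimal, self-contained sketch of how such a row-format builder is typically completed. This is not the author's original code: stock Flink 1.9 classes (DateTimeBucketAssigner, DefaultRollingPolicy) stand in for the author's custom HdfsBucketAssigner and MyRollingPolicy, and the interval values mirror the constants declared above.

```java
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

public class RowFormatSinkSketch {

    /**
     * Builds a row-encoded StreamingFileSink writing plain strings under basePath.
     * The author's HdfsBucketAssigner / MyRollingPolicy would replace the stock classes here.
     */
    public static StreamingFileSink<String> build(String basePath) {
        return StreamingFileSink
                .forRowFormat(new Path(basePath), new SimpleStringEncoder<String>("UTF-8"))
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                .withRollingPolicy(DefaultRollingPolicy.create()
                        .withRolloverInterval(40 * 60 * 1000L)      // DEFAULT_ROLLOVER_INTERVAL
                        .withInactivityInterval(30 * 60 * 1000L)    // DEFAULT_INACTIVITY_INTERVAL
                        .withMaxPartSize(1024L * 1024L * 128L)      // DEFAULT_MAX_PART_SIZE
                        .build())
                .withBucketCheckInterval(30 * 1000L)                // CHECK_INTERVAL
                .build();
    }
}
```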
StreamingFileSink.forRowFormat and StreamingFileSink.forBulkFormat create row-encoded and bulk-encoded sinks respectively. Since I use the row-encoded format, and both builders obtain the HDFS FileSystem in essentially the same way, I'll use StreamingFileSink.forRowFormat as the entry point for the analysis.
StreamingFileSink overrides the initializeState method, which is invoked once when the job starts:
```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    this.buckets = bucketsBuilder.createBuckets(subtaskIndex);

    final OperatorStateStore stateStore = context.getOperatorStateStore();
    bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
    maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);

    if (context.isRestored()) {
        buckets.initializeState(bucketStates, maxPartCountersState);
    }
}
```
The key line is this.buckets = bucketsBuilder.createBuckets(subtaskIndex): bucketsBuilder uses the subtask index to create buckets, the manager of all buckets.
BucketsBuilder has two implementations, BulkFormatBuilder and RowFormatBuilder. Let's look at RowFormatBuilder.
Its createBuckets method returns a Buckets object, so we continue into the Buckets constructor:
```java
Buckets(
        final Path basePath,
        final BucketAssigner<IN, BucketID> bucketAssigner,
        // ... (the rest of the constructor is cut off in the original; further down it
        //      obtains the file system via FileSystem.get(basePath.toUri()), discussed next)
```
Here we finally meet the familiar FileSystem: FileSystem.get(basePath.toUri()) obtains a FileSystem from the base path we passed in. Let's step into it:
```java
/**
 * Returns a reference to the {@link FileSystem} instance for accessing the
 * file system identified by the given {@link URI}.
 *
 * @param uri
 *        the {@link URI} identifying the file system
 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given
 *         {@link URI}.
 * @throws IOException
 *         thrown if a reference to the file system instance could not be obtained
 */
public static FileSystem get(URI uri) throws IOException {
    return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}
```
FileSystemSafetyNet.wrapWithSafetyNetWhenActivated wraps the FileSystem to guard against resource leaks from streams that are never closed; it is not what we are after, so let's go into getUnguardedFileSystem.
```java
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
    checkNotNull(fsUri, "file system URI");

    LOCK.lock();
    try {
        final URI uri;

        // check whether the write path carries a scheme, i.e. a prefix like hdfs:// or file://
        if (fsUri.getScheme() != null) {
            // if a scheme was given, use the URI directly
            uri = fsUri;
        }
        // ... (part of the code omitted)

        final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

        // first, check the cache
        {
            FileSystem cached = CACHE.get(key);
            if (cached != null) {
                return cached;
            }
        }

        // if FS_FACTORIES is empty, run the initialization first
        if (FS_FACTORIES.isEmpty()) {
            initialize(new Configuration());
        }

        final FileSystem fs;
        final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());

        // if a factory is registered for this scheme, use it to create the file system
        if (factory != null) {
            fs = factory.create(uri);
        }
        else {
            try {
                // otherwise, fall back to the fallback factory
                fs = FALLBACK_FACTORY.create(uri);
            }
            catch (UnsupportedFileSystemSchemeException e) {
                throw new UnsupportedFileSystemSchemeException(
                        "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                "system to support this scheme could be loaded.", e);
            }
        }

        CACHE.put(key, fs);
        return fs;
    }
    finally {
        LOCK.unlock();
    }
}
```
The code above is lightly annotated. It first checks whether fsUri carries a scheme; if not, some default handling (omitted here) kicks in. Our path is hdfs://HDFS42143/usr/hive/warehouse/hour_hive, so the scheme is hdfs. The cache is then checked and a cached FileSystem is returned if one exists. Otherwise FS_FACTORIES is consulted, and if no factory is registered there for the scheme, the default FALLBACK_FACTORY creates the FileSystem. The interesting part is the initialize method:
```java
public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
    LOCK.lock();
    try {
        // ... (part of the code omitted)

        for (FileSystemFactory factory : RAW_FACTORIES) {
            factory.configure(config);
            String scheme = factory.getScheme();

            FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
            FS_FACTORIES.put(scheme, fsf);
        }

        // configure the default (fallback) factory
        FALLBACK_FACTORY.configure(config);

        // ... (part of the code omitted)
    }
    finally {
        LOCK.unlock();
    }
}
```
In initialize we can see that the RAW_FACTORIES collection is iterated: each factory first gets its configuration loaded via configure, then each FileSystemFactory is put into the FS_FACTORIES map keyed by the scheme it supports. Below that, FALLBACK_FACTORY also has its configuration loaded via configure. Note that initialize is called from many places, for example when the JobManager and TaskManagers start up. That leaves two questions: where are RAW_FACTORIES and FALLBACK_FACTORY created? Let's start with RAW_FACTORIES.
```java
/** All available file system factories. */
private static final List<FileSystemFactory> RAW_FACTORIES = loadFileSystems();
```
RAW_FACTORIES is a static field initialized from the static method loadFileSystems. Note that loadFileSystems first adds one default factory, LocalFileSystemFactory, and then loads the remaining factories via SPI with ServiceLoader.load(FileSystemFactory.class). However, I could not find any implementation classes registered under src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory in the Flink source, which means such registrations have to be supplied by us; without them, the SPI lookup returns nothing by default. In other words, RAW_FACTORIES contains only a single LocalFileSystemFactory instance, whose scheme is file. (The SPI descriptor format is shown below.)
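For reference, this is what a Java SPI registration looks like: a plain text file on the classpath whose name is the fully qualified name of the service interface and whose lines are the fully qualified names of the implementations. The implementation class named here is a made-up placeholder (it will reappear in the solution below), not something that ships with Flink:

```properties
# File: src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
# One implementation class per line; the class below is a hypothetical example.
com.example.fs.CustomerHadoopFsFactory
```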
Now for FALLBACK_FACTORY:
```java
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

private static FileSystemFactory loadHadoopFsFactory() {
    final ClassLoader cl = FileSystem.class.getClassLoader();

    // first, see if the Flink runtime classes are available
    final Class<? extends FileSystemFactory> factoryClass;
    try {
        factoryClass = Class
                .forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl)
                .asSubclass(FileSystemFactory.class);
    }
    catch (ClassNotFoundException e) {
        LOG.info("No Flink runtime dependency present. " +
                "The extended set of supported File Systems via Hadoop is not available.");
        return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
    }
    catch (Exception | LinkageError e) {
        LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
        return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
    }

    // check (for eager and better exception messages) if the Hadoop classes are available here
    try {
        Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
        Class.forName("org.apache.hadoop.fs.FileSystem", false, cl);
    }
    catch (ClassNotFoundException e) {
        LOG.info("Hadoop is not in the classpath/dependencies. " +
                "The extended set of supported File Systems via Hadoop is not available.");
        return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
    }

    // Create the factory.
    try {
        return factoryClass.newInstance();
    }
    catch (Exception | LinkageError e) {
        LOG.warn("Flink's Hadoop file system factory could not be created", e);
        return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
    }
}
```
This one is straightforward: it instantiates org.apache.flink.runtime.fs.hdfs.HadoopFsFactory via reflection, so FALLBACK_FACTORY is a HadoopFsFactory, whose scheme is *.
Let's pause and summarize what we have so far:
- Our HA cluster path is hdfs://HDFS42143/usr/hive/warehouse/hour_hive, so the scheme is hdfs.
- The FileSystem used for all operations against the HA cluster is obtained in FileSystem.getUnguardedFileSystem.
- The factories in RAW_FACTORIES are loaded via Java SPI, and by default there is only one instance, LocalFileSystemFactory, which supports the file scheme.
- FALLBACK_FACTORY is a HadoopFsFactory, which supports the scheme *.
So the FileSystem for our HA cluster is created by FALLBACK_FACTORY. Recall that initialize(Configuration config) above calls FALLBACK_FACTORY.configure(config).
Let's step into HadoopFsFactory:
```java
public class HadoopFsFactory implements FileSystemFactory {

    private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);

    /** Flink's configuration object. */
    private Configuration flinkConfig;

    /** Hadoop's configuration for the file systems. */
    private org.apache.hadoop.conf.Configuration hadoopConfig;

    @Override
    public String getScheme() {
        // the hadoop factory creates various schemes
        return "*";
    }

    @Override
    public void configure(Configuration config) {
        flinkConfig = config;
        hadoopConfig = null; // reset the Hadoop Config
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        checkNotNull(fsUri, "fsUri");

        final String scheme = fsUri.getScheme();
        checkArgument(scheme != null, "file system has null scheme");

        // from here on, we need to handle errors due to missing optional
        // dependency classes
        try {
            // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)

            final org.apache.hadoop.conf.Configuration hadoopConfig;
            if (this.hadoopConfig != null) {
                hadoopConfig = this.hadoopConfig;
            }
            else if (flinkConfig != null) {
                hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
                this.hadoopConfig = hadoopConfig;
            }
            else {
                LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
                        + " Using configuration from the classpath.");

                hadoopConfig = new org.apache.hadoop.conf.Configuration();
            }

            // -- (2) get the Hadoop file system class for that scheme

            final Class<? extends org.apache.hadoop.fs.FileSystem> fsClass;
            try {
                fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
            }
            catch (IOException e) {
                throw new UnsupportedFileSystemSchemeException(
                        "Hadoop File System abstraction does not support scheme '" + scheme + "'. " +
                                "Either no file system implementation exists for that scheme, " +
                                "or the relevant classes are missing from the classpath.", e);
            }

            // -- (3) instantiate the Hadoop file system

            LOG.debug("Instantiating for file system scheme {} Hadoop File System {}",
                    scheme, fsClass.getName());

            final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

            // -- (4) create the proper URI to initialize the file system

            final URI initUri;
            if (fsUri.getAuthority() != null) {
                initUri = fsUri;
            }
            else {
                LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");

                String configEntry = hadoopConfig.get("fs.defaultFS", null);
                if (configEntry == null) {
                    // fs.default.name deprecated as of hadoop 2.2.0 - see
                    //
                    configEntry = hadoopConfig.get("fs.default.name", null);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
                }

                if (configEntry == null) {
                    throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                            "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
                }
                else {
                    try {
                        initUri = URI.create(configEntry);
                    }
                    catch (IllegalArgumentException e) {
                        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                                "The configuration contains an invalid file system default name " +
                                "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
                    }

                    if (initUri.getAuthority() == null) {
                        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                                "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
                                "contains no valid authority component (like hdfs namenode, S3 host, etc)");
                    }
                }
            }

            // -- (5) configure the Hadoop file system

            try {
                hadoopFs.initialize(initUri, hadoopConfig);
            }
            catch (UnknownHostException e) {
                String message = "The Hadoop file system's authority (" + initUri.getAuthority() +
                        "), specified by either the file URI or the configuration, cannot be resolved.";

                throw new IOException(message, e);
            }

            HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);

            // create the Flink file system, optionally limiting the open connections
            if (flinkConfig != null) {
                return limitIfConfigured(fs, scheme, flinkConfig);
            }
            else {
                return fs;
            }
        }
        catch (ReflectiveOperationException | LinkageError e) {
            throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
                    "' via Hadoop, because Hadoop is not in the classpath, or some classes " +
                    "are missing from the classpath.", e);
        }
        catch (IOException e) {
            throw e;
        }
        catch (Exception e) {
            throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
        }
    }
}
```
The code is fairly straightforward. In the configure method:
```java
@Override
public void configure(Configuration config) {
    flinkConfig = config;
    hadoopConfig = null; // reset the Hadoop Config
}
```
we can see that hadoopConfig is reset to null and the passed-in configuration is stored in flinkConfig. Then, in create:
```java
final org.apache.hadoop.conf.Configuration hadoopConfig;
if (this.hadoopConfig != null) {
    hadoopConfig = this.hadoopConfig;
}
else if (flinkConfig != null) {
    hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
    this.hadoopConfig = hadoopConfig;
}
else {
    LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
            + " Using configuration from the classpath.");

    hadoopConfig = new org.apache.hadoop.conf.Configuration();
}
```
hadoopConfig is derived from flinkConfig via HadoopUtils.getHadoopConfiguration, and further down the Hadoop FileSystem is instantiated via reflection and wrapped in a HadoopFileSystem. Let's look at HadoopUtils.getHadoopConfiguration:
```java
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {

    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // read the fs.hdfs.hdfsdefault entry from the Flink configuration
    final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);

    // load the resource at the fs.hdfs.hdfsdefault path
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
    }

    // read the fs.hdfs.hdfssite entry from the Flink configuration
    final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);

    // load the resource at the fs.hdfs.hdfssite path
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
    }

    String[] possibleHadoopConfPaths = new String[4];

    // read the fs.hdfs.hadoopconf path from the Flink configuration
    possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);

    // read the HADOOP_CONF_DIR path from the environment
    possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");

    // read the HADOOP_HOME path from the environment
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        possibleHadoopConfPaths[2] = hadoopHome + "/conf";
        possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    // load the Hadoop configuration resources from these candidate paths
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            if (new File(possibleHadoopConfPath).exists()) {
                if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
                    result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
                    LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
                    foundHadoopConfiguration = true;
                }
                if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
                    result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
                    LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
                    foundHadoopConfiguration = true;
                }
            }
        }
    }

    if (!foundHadoopConfiguration) {
        LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
                "(Flink configuration, environment variables).");
    }

    return result;
}
```
As the comments show, the Hadoop configuration is assembled entirely from the Flink configuration and from environment variables. The environment of our Flink cluster obviously does not contain the configuration of the other Hadoop cluster. That leaves us with two solutions.
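Before getting to the solutions, here is what the relevant Flink configuration keys look like, matching the first lookups in getHadoopConfiguration above; the directory paths are made-up placeholders. fs.hdfs.hadoopconf points at a directory containing the target cluster's core-site.xml / hdfs-site.xml, while fs.hdfs.hdfssite can point at a single hdfs-site.xml:

```properties
# flink-conf.yaml (paths are placeholders)
# Directory that contains core-site.xml / hdfs-site.xml of the target cluster:
fs.hdfs.hadoopconf: /etc/other-hadoop/conf
# Or point directly at a single hdfs-site.xml:
# fs.hdfs.hdfssite: /etc/other-hadoop/conf/hdfs-site.xml
```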
Solutions
The first method: copy the other cluster's Hadoop configuration files (core-site.xml / hdfs-site.xml with the HA settings) onto the Flink cluster and let Flink pick them up through the lookups analyzed above (fs.hdfs.hadoopconf, HADOOP_CONF_DIR, etc., as in the config example). This has to be done on every machine of the Flink cluster.
The second method: write a custom CustomerHadoopFsFactory class that, after obtaining the Hadoop configuration from the Flink configuration, adds the HA settings of the target cluster itself. As the analysis showed, all factory instances are loaded via SPI, so we also have to create a file named org.apache.flink.core.fs.FileSystemFactory under src/main/resources/META-INF/services/ whose content is the fully qualified class name of CustomerHadoopFsFactory. The class itself is quite simple, and this approach works on every machine of any Flink cluster without extra setup. A sketch of such a factory is shown below.
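Here is a minimal sketch of what such a custom factory could look like. It is not the author's original class: the class name CustomerHadoopFsFactory comes from the text, but the package, the hard-coded HA properties (reusing the HDFS42142 placeholders from above) and the exact wiring are assumptions; the author's version presumably loads these values from a properties file or from configuration passed at job startup instead of hard-coding them. It would be registered via the SPI descriptor shown earlier.

```java
package com.example.fs; // hypothetical package, matching the SPI file shown earlier

import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.HadoopUtils;

/**
 * Sketch of a custom factory for writing to another HA HDFS cluster.
 * The HA properties are hard-coded for brevity; in practice they could be read
 * from a properties file on the classpath or taken from the Flink configuration.
 */
public class CustomerHadoopFsFactory implements FileSystemFactory {

    private Configuration flinkConfig = new Configuration();

    @Override
    public String getScheme() {
        // claim the hdfs scheme, so FS_FACTORIES.get("hdfs") returns this factory
        // instead of falling back to the default HadoopFsFactory
        return "hdfs";
    }

    @Override
    public void configure(Configuration config) {
        this.flinkConfig = config;
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // start from whatever Hadoop configuration Flink can already find
        // (flink-conf.yaml entries, HADOOP_CONF_DIR, HADOOP_HOME, ...)
        org.apache.hadoop.conf.Configuration hadoopConf = HadoopUtils.getHadoopConfiguration(flinkConfig);

        // then add the HA settings of the target cluster (placeholder values from above)
        hadoopConf.set("dfs.nameservices", "HDFS42142");
        hadoopConf.set("dfs.ha.namenodes.HDFS42142", "nn1,nn2");
        hadoopConf.set("dfs.namenode.rpc-address.HDFS42142.nn1", "172.xx.xx.01:port");
        hadoopConf.set("dfs.namenode.rpc-address.HDFS42142.nn2", "172.xx.xx.02:port");
        hadoopConf.set("dfs.client.failover.proxy.provider.HDFS42142",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // let Hadoop resolve the HA-aware file system for the given URI
        org.apache.hadoop.fs.FileSystem hadoopFs = org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConf);

        // wrap it in Flink's FileSystem abstraction
        return new HadoopFileSystem(hadoopFs);
    }
}
```

The idea is that the factory only adds the extra nameservice on top of whatever Hadoop configuration Flink already resolves, so local HDFS paths keep working while hdfs://HDFS42142/... becomes resolvable as well.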
Summary
Both approaches solve the problem of getting StreamingFileSink to write data to another HA Hadoop cluster. The first is not very flexible: the other cluster's configuration files have to be copied onto the Flink cluster and set up on every machine. The second is much more flexible: the configuration can be passed in when the job is started, or kept in a properties file that the factory loads itself. I use the second approach; a sketch of how the factory and the SPI registration fit together is given in the section above.
Follow me to get new articles as soon as they are published.