Flink in Practice: How to Make StreamingFileSink Write Data to Another HA Hadoop Cluster


Preface

Our company runs EMR clusters: one Flink cluster dedicated to real-time computation and one Hadoop cluster dedicated to offline Spark and Hive jobs. The two clusters are completely isolated. However, some real-time ingestion jobs need to write data into the offline cluster. One suggestion was to simply prefix the HDFS path passed to `StreamingFileSink` with the offline cluster's IP, e.g. `hdfs://otherIp:/usr/hive/warehouse/`. That does write data, but our Hadoop clusters are HA: when the NameNode fails over, writes start failing, so this approach does not work. This article describes an approach, and the reasoning behind it, for Flink to write into another HA cluster.

Finding the Cause

If we point `StreamingFileSink` directly at a path on the other HA Hadoop cluster, e.g. `hdfs://HDFS42143/usr/hive/warehouse/hour_hive`, the job fails with an exception, typically an `IOException` complaining that the authority `HDFS42143` cannot be resolved (wrapping a `java.net.UnknownHostException`; see the `HadoopFsFactory` code later in this article), because the Flink side knows nothing about this nameservice.

As you know, when setting up an HA cluster you have to define a nameservice, which can be any identifier you like, plus some additional HA configuration. For example:

```
dfs.client.failover.proxy.provider.HDFS42142=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.nameservices=HDFS42142
dfs.namenode.rpc-address.HDFS42142.nn1=172.xx.xx.01:port
dfs.namenode.rpc-address.HDFS42142.nn2=172.xx.xx.02:port
dfs.ha.namenodes.HDFS42142=nn1,nn2
```
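For orientation, here are the same settings expressed programmatically. This is only a sketch of what an HDFS client ultimately needs to know about the nameservice; the class name is made up for illustration, and the addresses and `port` placeholders must be replaced with the target cluster's real values:

```java
import org.apache.hadoop.conf.Configuration;

// Sketch only: the HA client settings from the snippet above, applied to a
// Hadoop Configuration. The addresses and 'port' are placeholders.
public class HaClientConf {

    public static Configuration forNameservice() {
        Configuration conf = new Configuration();
        conf.set("dfs.nameservices", "HDFS42142");
        conf.set("dfs.ha.namenodes.HDFS42142", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.HDFS42142.nn1", "172.xx.xx.01:port"); // replace 'port'
        conf.set("dfs.namenode.rpc-address.HDFS42142.nn2", "172.xx.xx.02:port"); // replace 'port'
        conf.set("dfs.client.failover.proxy.provider.HDFS42142",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        return conf;
    }
}
```

Injecting exactly these entries into the Hadoop configuration that Flink uses is what the second solution at the end of this article boils down to.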

However, the `StreamingFileSink` source offers no builder or constructor that accepts a Hadoop configuration. So how do we get these settings in? Let's first understand how `StreamingFileSink` writes data.

StreamingFileSink Source Code Walkthrough

We usually create a `StreamingFileSink` like this:

```java
package com.tuya.sink;

import com.tuya.AppArgs;
import com.tuya.sink.filesystem.HdfsBucketAssigner;
import com.tuya.sink.filesystem.MyRollingPolicy;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * desc:
 *
 * @author scx
 * @create 2019/10/24
 */
public class FileSinkFactory {

    private static final Logger LOG = LoggerFactory.getLogger(FileSinkFactory.class);

    /**
     * How often to check the state of in-progress files.
     */
    private static final long CHECK_INTERVAL = 30 * 1000L;

    /**
     * Default inactivity timeout; once a file has not been written to for this long, a new part file is rolled.
     */
    private static final long DEFAULT_INACTIVITY_INTERVAL = 30 * 60L * 1000L;

    /**
     * Default interval for rolling a new HDFS part file.
     */
    private static final long DEFAULT_ROLLOVER_INTERVAL = 40 * 60L * 1000L;

    /**
     * Maximum HDFS part file size: 128 MB.
     */
    private static final long DEFAULT_MAX_PART_SIZE = 1024L * 1024L * 128L;

    public static StreamingFileSink bulkSink(AppArgs appArgs, Class clazz) {
        return StreamingFileSink.forBulkFormat(new Path(appArgs.getHdfsPath()), ParquetAvroWriters.forReflectRecord(clazz))
                .withBucketAssigner(new HdfsBucketAssigner<>())
                .withBucketCheckInterval(CHECK_INTERVAL)
                .build();
    }

    public static StreamingFileSink rowSink(AppArgs appArgs, Class clazz) {
        return StreamingFileSink.forRowFormat(new Path(appArgs.getHdfsPath()), new SimpleStringEncoder())
                .withBucketAssigner(new HdfsBucketAssigner<>())
                .withRollingPolicy(new MyRollingPolicy<>(appArgs, DEFAULT_MAX_PART_SIZE, DEFAULT_ROLLOVER_INTERVAL, DEFAULT_INACTIVITY_INTERVAL))
                .withBucketCheckInterval(CHECK_INTERVAL)
                .build();
    }
}
```

`StreamingFileSink.forBulkFormat` and `StreamingFileSink.forRowFormat` create bulk-encoded and row-encoded sinks respectively. Since I use the row-encoded format, and both obtain the HDFS `FileSystem` in essentially the same way, I will use `StreamingFileSink.forRowFormat` as the entry point for the analysis.

`StreamingFileSink` overrides the `initializeState` method, which is called once when the job starts:

```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    final int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
    this.buckets = bucketsBuilder.createBuckets(subtaskIndex);

    final OperatorStateStore stateStore = context.getOperatorStateStore();
    bucketStates = stateStore.getListState(BUCKET_STATE_DESC);
    maxPartCountersState = stateStore.getUnionListState(MAX_PART_COUNTER_STATE_DESC);

    if (context.isRestored()) {
        buckets.initializeState(bucketStates, maxPartCountersState);
    }
}
```

The key line is `this.buckets = bucketsBuilder.createBuckets(subtaskIndex)`: the `bucketsBuilder` creates, for the current subtask index, the `Buckets` object that manages all buckets.

`BucketsBuilder` has two implementations, `BulkFormatBuilder` and `RowFormatBuilder`. Let's look at `RowFormatBuilder`:

```java
public static class RowFormatBuilder extends StreamingFileSink.BucketsBuilder {

    // some code omitted

    @Override
    Buckets createBuckets(int subtaskIndex) throws IOException {
        return new Buckets<>(
                basePath,
                bucketAssigner,
                bucketFactory,
                new RowWisePartWriter.Factory<>(encoder),
                rollingPolicy,
                subtaskIndex);
    }
}
```

`createBuckets` returns a `Buckets` object; let's step into the `Buckets` constructor:

```java
Buckets(
        final Path basePath,
        final BucketAssigner bucketAssigner,
        final BucketFactory bucketFactory,
        final PartFileWriter.PartFileFactory partFileWriterFactory,
        final RollingPolicy rollingPolicy,
        final int subtaskIndex) throws IOException {

    this.basePath = Preconditions.checkNotNull(basePath);
    // some code omitted

    try {
        this.fsWriter = FileSystem.get(basePath.toUri()).createRecoverableWriter();
    } catch (IOException e) {
        LOG.error("Unable to create filesystem for path: {}", basePath);
        throw e;
    }

    // some code omitted
}
```

Here we finally meet the familiar `FileSystem`: `FileSystem.get(basePath.toUri())` obtains a `FileSystem` from the `basePath` we passed in. Step into it:

```java
/**
 * Returns a reference to the {@link FileSystem} instance for accessing the
 * file system identified by the given {@link URI}.
 *
 * @param uri the {@link URI} identifying the file system
 * @return a reference to the {@link FileSystem} instance for accessing the file system identified by the given {@link URI}.
 * @throws IOException thrown if a reference to the file system instance could not be obtained
 */
public static FileSystem get(URI uri) throws IOException {
    return FileSystemSafetyNet.wrapWithSafetyNetWhenActivated(getUnguardedFileSystem(uri));
}
```

`FileSystemSafetyNet.wrapWithSafetyNetWhenActivated` wraps the `FileSystem` to guard against resource leaks from streams that are never closed; that is not what we are after here, so step into `getUnguardedFileSystem`:

```java
public static FileSystem getUnguardedFileSystem(final URI fsUri) throws IOException {
    checkNotNull(fsUri, "file system URI");

    LOCK.lock();
    try {
        final URI uri;

        // check whether the path we write to carries a scheme, i.e. a prefix such as hdfs:// or file://
        if (fsUri.getScheme() != null) {
            // if a scheme was provided, use the URI as-is
            uri = fsUri;
        }
        // some code omitted

        final FSKey key = new FSKey(uri.getScheme(), uri.getAuthority());

        // check the cache first
        {
            FileSystem cached = CACHE.get(key);
            if (cached != null) {
                return cached;
            }
        }

        // if FS_FACTORIES is empty, run the initialization once
        if (FS_FACTORIES.isEmpty()) {
            initialize(new Configuration());
        }

        final FileSystem fs;
        final FileSystemFactory factory = FS_FACTORIES.get(uri.getScheme());

        // if a factory is registered for this scheme, use it to create the FileSystem
        if (factory != null) {
            fs = factory.create(uri);
        } else {
            try {
                // otherwise fall back to the fallback factory
                fs = FALLBACK_FACTORY.create(uri);
            } catch (UnsupportedFileSystemSchemeException e) {
                throw new UnsupportedFileSystemSchemeException(
                        "Could not find a file system implementation for scheme '" + uri.getScheme() +
                                "'. The scheme is not directly supported by Flink and no Hadoop file " +
                                "system to support this scheme could be loaded.", e);
            }
        }

        CACHE.put(key, fs);
        return fs;
    } finally {
        LOCK.unlock();
    }
}
```
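As a quick aside (this is not Flink code), here is what `java.net.URI` extracts from our HA path, i.e. the scheme/authority pair that ends up in the `FSKey` cache key above:

```java
import java.net.URI;

// Tiny illustration: scheme and authority of the HA path used in this article.
public class UriParts {

    public static void main(String[] args) {
        URI uri = URI.create("hdfs://HDFS42143/usr/hive/warehouse/hour_hive");
        System.out.println(uri.getScheme());    // hdfs
        System.out.println(uri.getAuthority()); // HDFS42143
        System.out.println(uri.getPath());      // /usr/hive/warehouse/hour_hive
    }
}
```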

The annotated `getUnguardedFileSystem` code first checks whether `fsUri` carries a scheme; if not, some defaults are applied (omitted above). Our path is `hdfs://HDFS42143/usr/hive/warehouse/hour_hive`, so the scheme is `hdfs`. It then checks whether a `FileSystem` is already cached and returns it if so. Otherwise it looks the scheme up in `FS_FACTORIES`, and if no factory is registered there it falls back to the default `FALLBACK_FACTORY` to create the `FileSystem`. The part to focus on is the `initialize` method:

```java
public static void initialize(Configuration config) throws IOException, IllegalConfigurationException {
    LOCK.lock();
    try {
        // some code omitted

        for (FileSystemFactory factory : RAW_FACTORIES) {
            factory.configure(config);
            String scheme = factory.getScheme();

            FileSystemFactory fsf = ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
            FS_FACTORIES.put(scheme, fsf);
        }

        // configure the default (fallback) factory
        FALLBACK_FACTORY.configure(config);

        // some code omitted
    } finally {
        LOCK.unlock();
    }
}
```

In `initialize`, the `RAW_FACTORIES` collection is iterated: `configure` is called on each factory to load the configuration, and the factory is then put into the `FS_FACTORIES` map keyed by the scheme it supports. After the loop, `configure` is also called on `FALLBACK_FACTORY`. Note that `initialize` is invoked in many places, for example when the JobManager and TaskManager start up. This leaves two questions: where are `RAW_FACTORIES` and `FALLBACK_FACTORY` created? Let's start with `RAW_FACTORIES`:

```java
/** All available file system factories. */
private static final List RAW_FACTORIES = loadFileSystems();

private static List loadFileSystems() {
    final ArrayList list = new ArrayList<>();
    list.add(new LocalFileSystemFactory());

    LOG.debug("Loading extension file systems via services");

    try {
        ServiceLoader serviceLoader = ServiceLoader.load(FileSystemFactory.class);
        Iterator iter = serviceLoader.iterator();
        while (iter.hasNext()) {
            try {
                FileSystemFactory factory = iter.next();
                list.add(factory);
                LOG.debug("Added file system {}:{}", factory.getScheme(), factory.getClass().getName());
            } catch (Throwable t) {
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
                LOG.error("Failed to load a file system via services", t);
            }
        }
    } catch (Throwable t) {
        ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
        LOG.error("Failed to load additional file systems via services", t);
    }

    return Collections.unmodifiableList(list);
}
```

`RAW_FACTORIES` is a static field initialized by the static method `loadFileSystems`. Note that `loadFileSystems` first adds a default factory, `LocalFileSystemFactory`, and then loads any additional factories through the Java SPI via `ServiceLoader.load(FileSystemFactory.class)`. However, I could not find any implementation class registered under `src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory` in the Flink source, which means such registrations have to come from us; without them, the SPI lookup returns nothing. In other words, `RAW_FACTORIES` contains a single `LocalFileSystemFactory` instance, whose scheme is `file`.

Now let's look at `FALLBACK_FACTORY`:

```java
private static final FileSystemFactory FALLBACK_FACTORY = loadHadoopFsFactory();

private static FileSystemFactory loadHadoopFsFactory() {
    final ClassLoader cl = FileSystem.class.getClassLoader();

    // first, see if the Flink runtime classes are available
    final Class factoryClass;
    try {
        factoryClass = Class
                .forName("org.apache.flink.runtime.fs.hdfs.HadoopFsFactory", false, cl)
                .asSubclass(FileSystemFactory.class);
    } catch (ClassNotFoundException e) {
        LOG.info("No Flink runtime dependency present. " +
                "The extended set of supported File Systems via Hadoop is not available.");
        return new UnsupportedSchemeFactory("Flink runtime classes missing in classpath/dependencies.");
    } catch (Exception | LinkageError e) {
        LOG.warn("Flink's Hadoop file system factory could not be loaded", e);
        return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be loaded", e);
    }

    // check (for eager and better exception messages) if the Hadoop classes are available here
    try {
        Class.forName("org.apache.hadoop.conf.Configuration", false, cl);
        Class.forName("org.apache.hadoop.fs.FileSystem", false, cl);
    } catch (ClassNotFoundException e) {
        LOG.info("Hadoop is not in the classpath/dependencies. " +
                "The extended set of supported File Systems via Hadoop is not available.");
        return new UnsupportedSchemeFactory("Hadoop is not in the classpath/dependencies.");
    }

    // Create the factory.
    try {
        return factoryClass.newInstance();
    } catch (Exception | LinkageError e) {
        LOG.warn("Flink's Hadoop file system factory could not be created", e);
        return new UnsupportedSchemeFactory("Flink's Hadoop file system factory could not be created", e);
    }
}
```

This part is straightforward: `org.apache.flink.runtime.fs.hdfs.HadoopFsFactory` is instantiated via reflection, so `FALLBACK_FACTORY` is a `HadoopFsFactory` and its scheme is `*`.

Let's pause and summarize what we have so far:

- The path to our HA cluster is `hdfs://HDFS42143/usr/hive/warehouse/hour_hive`, so the scheme is `hdfs`.
- The `FileSystem` used for all CRUD operations against the HA cluster is obtained in `FileSystem.getUnguardedFileSystem`.
- The factories in `RAW_FACTORIES` are loaded via the Java SPI; in practice there is only one instance, `LocalFileSystemFactory`, which supports the scheme `file`.
- `FALLBACK_FACTORY` is a `HadoopFsFactory`, which supports the scheme `*`.

So the `FileSystem` for our HA cluster is created by `FALLBACK_FACTORY`. In the `initialize(Configuration config)` method above, `FALLBACK_FACTORY.configure(config)` is executed.

Step into `HadoopFsFactory`:

```java
public class HadoopFsFactory implements FileSystemFactory {

    private static final Logger LOG = LoggerFactory.getLogger(HadoopFsFactory.class);

    /** Flink's configuration object. */
    private Configuration flinkConfig;

    /** Hadoop's configuration for the file systems. */
    private org.apache.hadoop.conf.Configuration hadoopConfig;

    @Override
    public String getScheme() {
        // the hadoop factory creates various schemes
        return "*";
    }

    @Override
    public void configure(Configuration config) {
        flinkConfig = config;
        hadoopConfig = null; // reset the Hadoop Config
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        checkNotNull(fsUri, "fsUri");

        final String scheme = fsUri.getScheme();
        checkArgument(scheme != null, "file system has null scheme");

        // from here on, we need to handle errors due to missing optional
        // dependency classes
        try {
            // -- (1) get the loaded Hadoop config (or fall back to one loaded from the classpath)

            final org.apache.hadoop.conf.Configuration hadoopConfig;
            if (this.hadoopConfig != null) {
                hadoopConfig = this.hadoopConfig;
            } else if (flinkConfig != null) {
                hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
                this.hadoopConfig = hadoopConfig;
            } else {
                LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
                        + " Using configuration from the classpath.");
                hadoopConfig = new org.apache.hadoop.conf.Configuration();
            }

            // -- (2) get the Hadoop file system class for that scheme

            final Class fsClass;
            try {
                fsClass = org.apache.hadoop.fs.FileSystem.getFileSystemClass(scheme, hadoopConfig);
            } catch (IOException e) {
                throw new UnsupportedFileSystemSchemeException(
                        "Hadoop File System abstraction does not support scheme '" + scheme + "'. "
                                + "Either no file system implementation exists for that scheme, "
                                + "or the relevant classes are missing from the classpath.", e);
            }

            // -- (3) instantiate the Hadoop file system

            LOG.debug("Instantiating for file system scheme {} Hadoop File System {}", scheme, fsClass.getName());

            final org.apache.hadoop.fs.FileSystem hadoopFs = fsClass.newInstance();

            // -- (4) create the proper URI to initialize the file system

            final URI initUri;
            if (fsUri.getAuthority() != null) {
                initUri = fsUri;
            } else {
                LOG.debug("URI {} does not specify file system authority, trying to load default authority (fs.defaultFS)");

                String configEntry = hadoopConfig.get("fs.defaultFS", null);
                if (configEntry == null) {
                    // fs.default.name deprecated as of hadoop 2.2.0
                    configEntry = hadoopConfig.get("fs.default.name", null);
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Hadoop's 'fs.defaultFS' is set to {}", configEntry);
                }

                if (configEntry == null) {
                    throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                            "Hadoop configuration did not contain an entry for the default file system ('fs.defaultFS').");
                } else {
                    try {
                        initUri = URI.create(configEntry);
                    } catch (IllegalArgumentException e) {
                        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                                "The configuration contains an invalid file system default name " +
                                "('fs.default.name' or 'fs.defaultFS'): " + configEntry);
                    }

                    if (initUri.getAuthority() == null) {
                        throw new IOException(getMissingAuthorityErrorPrefix(fsUri) +
                                "Hadoop configuration for default file system ('fs.default.name' or 'fs.defaultFS') " +
                                "contains no valid authority component (like hdfs namenode, S3 host, etc)");
                    }
                }
            }

            // -- (5) configure the Hadoop file system

            try {
                hadoopFs.initialize(initUri, hadoopConfig);
            } catch (UnknownHostException e) {
                String message = "The Hadoop file system's authority ("
                        + initUri.getAuthority() + "), specified by either the file URI or the configuration, cannot be resolved.";
                throw new IOException(message, e);
            }

            HadoopFileSystem fs = new HadoopFileSystem(hadoopFs);

            // create the Flink file system, optionally limiting the open connections
            if (flinkConfig != null) {
                return limitIfConfigured(fs, scheme, flinkConfig);
            } else {
                return fs;
            }
        } catch (ReflectiveOperationException | LinkageError e) {
            throw new UnsupportedFileSystemSchemeException("Cannot support file system for '" + fsUri.getScheme() +
                    "' via Hadoop, because Hadoop is not in the classpath, or some classes " +
                    "are missing from the classpath.", e);
        } catch (IOException e) {
            throw e;
        } catch (Exception e) {
            throw new IOException("Cannot instantiate file system for URI: " + fsUri, e);
        }
    }
}
```

The code is fairly simple. In

```java
@Override
public void configure(Configuration config) {
    flinkConfig = config;
    hadoopConfig = null; // reset the Hadoop Config
}
```

we see that `hadoopConfig` is reset to `null` and the configuration is stored in `flinkConfig`. Then, in

```java
final org.apache.hadoop.conf.Configuration hadoopConfig;
if (this.hadoopConfig != null) {
    hadoopConfig = this.hadoopConfig;
} else if (flinkConfig != null) {
    hadoopConfig = HadoopUtils.getHadoopConfiguration(flinkConfig);
    this.hadoopConfig = hadoopConfig;
} else {
    LOG.warn("Hadoop configuration has not been explicitly initialized prior to loading a Hadoop file system."
            + " Using configuration from the classpath.");
    hadoopConfig = new org.apache.hadoop.conf.Configuration();
}
```

`hadoopConfig` is resolved from `flinkConfig` via `HadoopUtils.getHadoopConfiguration`, and further down the Hadoop `FileSystem` is created via reflection and wrapped into a `HadoopFileSystem`. Let's step into `HadoopUtils.getHadoopConfiguration`:

```java
public static Configuration getHadoopConfiguration(org.apache.flink.configuration.Configuration flinkConfiguration) {

    Configuration result = new HdfsConfiguration();
    boolean foundHadoopConfiguration = false;

    // read the fs.hdfs.hdfsdefault entry from the Flink configuration
    final String hdfsDefaultPath = flinkConfiguration.getString(ConfigConstants.HDFS_DEFAULT_CONFIG, null);

    // load the resource at the fs.hdfs.hdfsdefault path
    if (hdfsDefaultPath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsDefaultPath));
        LOG.debug("Using hdfs-default configuration-file path form Flink config: {}", hdfsDefaultPath);
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-default configuration-file path in Flink config.");
    }

    // read the fs.hdfs.hdfssite entry from the Flink configuration
    final String hdfsSitePath = flinkConfiguration.getString(ConfigConstants.HDFS_SITE_CONFIG, null);

    // load the resource at the fs.hdfs.hdfssite path
    if (hdfsSitePath != null) {
        result.addResource(new org.apache.hadoop.fs.Path(hdfsSitePath));
        LOG.debug("Using hdfs-site configuration-file path form Flink config: {}", hdfsSitePath);
        foundHadoopConfiguration = true;
    } else {
        LOG.debug("Cannot find hdfs-site configuration-file path in Flink config.");
    }

    String[] possibleHadoopConfPaths = new String[4];

    // read the fs.hdfs.hadoopconf path from the Flink configuration
    possibleHadoopConfPaths[0] = flinkConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG, null);

    // read HADOOP_CONF_DIR from the environment
    possibleHadoopConfPaths[1] = System.getenv("HADOOP_CONF_DIR");

    // read HADOOP_HOME from the environment
    final String hadoopHome = System.getenv("HADOOP_HOME");
    if (hadoopHome != null) {
        possibleHadoopConfPaths[2] = hadoopHome + "/conf";
        possibleHadoopConfPaths[3] = hadoopHome + "/etc/hadoop"; // hadoop 2.2
    }

    // load the Hadoop configuration resources from these candidate paths
    for (String possibleHadoopConfPath : possibleHadoopConfPaths) {
        if (possibleHadoopConfPath != null) {
            if (new File(possibleHadoopConfPath).exists()) {
                if (new File(possibleHadoopConfPath + "/core-site.xml").exists()) {
                    result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/core-site.xml"));
                    LOG.debug("Adding " + possibleHadoopConfPath + "/core-site.xml to hadoop configuration");
                    foundHadoopConfiguration = true;
                }
                if (new File(possibleHadoopConfPath + "/hdfs-site.xml").exists()) {
                    result.addResource(new org.apache.hadoop.fs.Path(possibleHadoopConfPath + "/hdfs-site.xml"));
                    LOG.debug("Adding " + possibleHadoopConfPath + "/hdfs-site.xml to hadoop configuration");
                    foundHadoopConfiguration = true;
                }
            }
        }
    }

    if (!foundHadoopConfiguration) {
        LOG.debug("Could not find Hadoop configuration via any of the supported methods " +
                "(Flink configuration, environment variables).");
    }

    return result;
}
```

As the annotated code shows, the Hadoop configuration is assembled entirely from the Flink configuration and from environment variables. The environment of our Flink cluster obviously does not contain the configuration of the other Hadoop cluster, so we have two ways to solve the problem.
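Concretely, this lookup is what the first solution below relies on: copy the other cluster's `core-site.xml` and `hdfs-site.xml` onto every Flink machine and point one of the lookup entries at that directory, for example via `fs.hdfs.hadoopconf` (a `HADOOP_CONF_DIR` environment variable works the same way). A `flink-conf.yaml` sketch, where the directory is a placeholder:

```yaml
# flink-conf.yaml (sketch): the directory is a placeholder; it must exist on every
# Flink machine and contain the other cluster's core-site.xml and hdfs-site.xml
fs.hdfs.hadoopconf: /etc/other-cluster-hadoop-conf
```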

Solutions

The first approach: copy the other cluster's `core-site.xml` and `hdfs-site.xml` onto every Flink machine and point one of the lookup entries described above at them (for example `fs.hdfs.hadoopconf` in `flink-conf.yaml`, or the `HADOOP_CONF_DIR` environment variable), so that `HadoopUtils.getHadoopConfiguration` picks them up.

The second approach: write a custom `CustomerHadoopFsFactory` class that, after obtaining the Hadoop configuration from the Flink configuration, adds the HA settings of the other cluster itself. From the analysis above we know that factory instances are discovered via SPI, so you also need to create a file named `org.apache.flink.core.fs.FileSystemFactory` under `src/main/resources/META-INF/services/`, whose content is the fully qualified name of the custom `CustomerHadoopFsFactory` class. The class itself is simple, and this approach works on every machine of every Flink cluster; a sketch follows below.
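As a concrete illustration of the second approach, here is a minimal sketch of such a factory. It is not the author's exact class: the package name is made up, the hard-coded HA values (using the nameservice `HDFS42143` from the example path) are placeholders, and building the base configuration via `HadoopUtils.getHadoopConfiguration` simply mirrors what `HadoopFsFactory` does above; in practice you would load the HA settings from the job configuration or a properties file rather than hard-code them.

```java
package com.example.fs; // hypothetical package

import java.io.IOException;
import java.net.URI;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemFactory;
import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
import org.apache.flink.runtime.util.HadoopUtils;

/**
 * Sketch of a custom factory that injects the other cluster's HA settings
 * after resolving the Hadoop configuration from the Flink configuration.
 */
public class CustomerHadoopFsFactory implements FileSystemFactory {

    private Configuration flinkConfig = new Configuration();

    @Override
    public String getScheme() {
        // take over the hdfs scheme
        return "hdfs";
    }

    @Override
    public void configure(Configuration config) {
        this.flinkConfig = config;
    }

    @Override
    public FileSystem create(URI fsUri) throws IOException {
        // start from whatever Hadoop configuration Flink can find (same as HadoopFsFactory)
        org.apache.hadoop.conf.Configuration hadoopConfig =
                HadoopUtils.getHadoopConfiguration(flinkConfig);

        // overlay the HA settings of the target cluster (placeholders; replace 'port')
        hadoopConfig.set("dfs.nameservices", "HDFS42143");
        hadoopConfig.set("dfs.ha.namenodes.HDFS42143", "nn1,nn2");
        hadoopConfig.set("dfs.namenode.rpc-address.HDFS42143.nn1", "172.xx.xx.01:port");
        hadoopConfig.set("dfs.namenode.rpc-address.HDFS42143.nn2", "172.xx.xx.02:port");
        hadoopConfig.set("dfs.client.failover.proxy.provider.HDFS42143",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // create the Hadoop FileSystem for the URI and wrap it for Flink
        org.apache.hadoop.fs.FileSystem hadoopFs =
                org.apache.hadoop.fs.FileSystem.get(fsUri, hadoopConfig);
        return new HadoopFileSystem(hadoopFs);
    }
}
```

And the matching SPI registration file:

```
# src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
com.example.fs.CustomerHadoopFsFactory
```

Note that because the factory registers itself for the `hdfs` scheme, it is used for every `hdfs://` URI the job touches, so this sketch fits setups where all of the job's HDFS access targets the other cluster; otherwise the factory would have to decide per URI which configuration to apply.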

Summary

Both approaches solve the problem of `StreamingFileSink` writing data to another HA Hadoop cluster. The first is not very flexible: the other cluster's configuration files have to be copied to the Flink cluster, and every machine has to be configured. The second is much more flexible: the configuration can be passed in when the Flink job is launched, or placed in a properties file and loaded by your own code. I went with the second approach.
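For the "load it from a properties file" variant mentioned above, here is a small sketch; the resource name `ha-cluster.properties` is hypothetical, and the file is assumed to contain plain HDFS client keys such as `dfs.nameservices`:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical helper: load the other cluster's HA settings from a classpath
// resource and copy every entry onto a Hadoop Configuration. A custom factory
// like the one sketched above could call this instead of hard-coding values.
public final class HaProperties {

    private HaProperties() {
    }

    public static org.apache.hadoop.conf.Configuration apply(
            org.apache.hadoop.conf.Configuration hadoopConfig) throws IOException {
        Properties props = new Properties();
        try (InputStream in = HaProperties.class.getResourceAsStream("/ha-cluster.properties")) {
            if (in != null) {
                props.load(in);
            }
        }
        for (String name : props.stringPropertyNames()) {
            hadoopConfig.set(name, props.getProperty(name));
        }
        return hadoopConfig;
    }
}
```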
