本文共 1947 字,大约阅读时间需要 6 分钟。
一、什么是Flink
Flink是原生的流处理系统,提供high level的API。Flink也提供 API来像Spark一样进行批处理,但两者处理的基础是完全不同的。Flink把批处理当作流处理中的一种特殊情况。在Flink中,所有 的数据都看作流,是一种很好的抽象,因为这更接近于现实世界。 二、Flink安装$git clone https://github.com/apache/flink.git$cd flink$mvn clean package -DskipTestscd build-target
用http://127.0.0.1:8081进行访问界面。
三、创建工程$ curl https://flink.apache.org/q/quickstart.sh | bash或者参考入门代码:https://github.com/wuchong/my-flink-project
四、或者另一种方式
Maven 依赖 如果您正在 Maven 项目中开发程序,则必须使用此依赖项添加 flink-clients 模块:org.apache.flink flink-clients{ { site.scala_version_suffix }} 1.6.0
本地环境
LocalEnvironment
是本地执行 Flink
程序的句柄。可使用他,独立或嵌入其他程序在本地 JVM 中运行Flink
程序。 通过 ExecutionEnvironment.createLocalEnvironment()
方法实例化本地环境。 默认情况下,启动的本地线程数与计算机的CPU个数相同。 您也可以指定所需的并发度。 可以使用enableLogging()/disableLogging()
将本地环境日志打印到控制台。 在大多数情况下,调用 ExecutionEnvironment.getExecutionEnvironment()
是更好的方式。 当程序在本地启动时(在命令行界面之外),该方法返回一个 LocalEnvironment
,当使用 命令行 调用程序时,它返回一个预先配置的集群执行环境。 public static void main(String[] args) throws Exception { ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(); DataSetdata = env.readTextFile("file:///path/to/file"); data.filter(new FilterFunction () { public boolean filter(String value) { return value.startsWith("http://"); } }) .writeAsText("file:///path/to/result"); JobExecutionResult res = env.execute();}
执行完成后返回的 Job ExecutionResult
对象包含程序运行时(Runtime)
和累加的结果。
LocalEnvironment
还可以将自定义配置传递给 Flink
。 Configuration conf = new Configuration();conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);
请注意: 本地执行环境不启动任何 Web 前端来监视执行情况。
关于解决构建maven项目中报错:Failed to execute goal org.apache.maven.plugins:maven-archetype-plugin:3.0.1…:的方法 1.首先进入仓库下面repository\org\apache\maven\plugins这个目录 2.删除目录下的maven-archetype-plugin文件夹 3.重新加载转载地址:http://boipn.baihongyu.com/