Socket
Flink 实时处理 Socket 数据
Flink Socket 源码 GitHub
通过 Maven Archetype 创建项目
创建项目
mvn archetype:generate \ |
通过以上 Maven
命令进行项目创建的过程中,命令会交互式地提示用户对项目的 groupId
、artifactId
、version
、package
等信息进行定义,且部分选项有默认值,直接回车即可。如图如果创建项目成功之后,客户端会有相应提示。
这里我们分别指定 groupId
、artifactId
的信息分别如下,其余参数使用默认值
groupId
:com.qstartifactId
:flink-socket
检查项目
对于使用 Maven 创建的项目,我们可以看到的项目结构如下所示
以上项目结构可以看出,该项目是一个 Scala
代码的项目,分别是 BatchJob.java
和 StreamingJob.java
两个文件,分别对应 Flink
批量接口 DataSet
的实例代码和流式接口的实例代码。
将项目导入 IDE
项目经过上述步骤创建后,Flink
官网推荐使用 Intellij IDEA
进行后续项目开发。
编译项目
项目经过上述步骤创建后,可以使用 Maven Command
命令 mvn clean package
对项目进行编译,编译完成后会在项目同级目录下生成 target/<artifactId>-<version>.jar
文件,此 jar 文件就可以通过 Web 客户端提交到集群上运行。
开发环境配置
这里我们使用官网推荐的 IntelliJ IDEA
作为应用的开发的 IDE。
下载 IntelliJ IDEA
用户可以通过 IntelliJ IDEA
官方地址下载安装程序,根据操作系统选择相应的程序包进行安装。
安装 Scala Plugins
安装完 IntelliJ IDEA
默认是不支持 Scala
开发环境的,需要安装 Scala
插件进行支持。一下说明在 IDEA
中进行 Scala
插件的安装。
打开
IDEA IDE
后,在IntelliJ IDEA
菜单栏中选择Preferences
选项,然后选择Plugins
子选项,最后在页面中选择Marketplace
,在搜索框中输入Scala
进行搜索在检索出来的选项列表中选择和安装
Scala
插件点击安装后重启 IDE,
Scala
编程环境即可生效
导入 Flink 项目
- 启动
IntelliJ IDEA
,选择Import Project
,在文件选项框中选择创建好的项目,点击确定。 - 导入项目中选择
Import project from external mode
中的Maven
后续选项使用默认值即可。
Flink Socket 应用程序
编写 Flink Socket 应用程序代码
import org.apache.flink.streaming.api.scala._ |
在 IDE 中测试代码
在代码文件中右键运行程序
此时会报如下错误
这时我们需要在 IDEA 的 Run/Debug Configuration
中将 Include dependencies with "Provided" scope
选项勾选,这时我们就可以在本地 IDE 运行了
在本地测试代码
首先在命令行我们现在终端开启监听端口 9000
,在命令行中执行如下命令
nc -l 9000 |
然后在 IDE 中 右键运行 StreamingJob
类的 main
方法,运行结果如下
在 Web 客户端中运行 Job
首先在项目所在目录执行 mvn clean package
进行打包,在项目的 target
目录下生成一个 flink-socket-1.0-SNAPSHOT.jar
文件
在命令行我们现在终端开启监听端口 9000
,在命令行中执行如下命令
nc -l 9000 |
在浏览器中打开 Flink Web 监控页面,在左侧选择 Submit New Job
选项,点击 右上角的 Add New
选择我们编译好的 flink-socket-1.0-SNAPSHOT.jar
文件,点击 Submit 按钮提交 Job
选择 Task Managers
选择列表中的对应 Job 点击 Stdout
选项查看执行结果