抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >


Flink 实时处理 Socket 数据

Flink Socket 源码 GitHub

通过 Maven Archetype 创建项目

创建项目

mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.9.0

通过以上 Maven 命令进行项目创建的过程中,命令会交互式地提示用户对项目的 groupIdartifactIdversionpackage 等信息进行定义,且部分选项有默认值,直接回车即可。如图如果创建项目成功之后,客户端会有相应提示。

这里我们分别指定 groupIdartifactId 的信息分别如下,其余参数使用默认值

  • groupId:com.qst
  • artifactId:flink-socket

创建项目

检查项目

对于使用 Maven 创建的项目,我们可以看到的项目结构如下所示

检查项目

以上项目结构可以看出,该项目是一个 Scala 代码的项目,分别是 BatchJob.javaStreamingJob.java 两个文件,分别对应 Flink 批量接口 DataSet 的实例代码和流式接口的实例代码。

将项目导入 IDE

项目经过上述步骤创建后,Flink 官网推荐使用 Intellij IDEA 进行后续项目开发。

编译项目

项目经过上述步骤创建后,可以使用 Maven Command 命令 mvn clean package 对项目进行编译,编译完成后会在项目同级目录下生成 target/<artifactId>-<version>.jar 文件,此 jar 文件就可以通过 Web 客户端提交到集群上运行。

开发环境配置

这里我们使用官网推荐的 IntelliJ IDEA 作为应用的开发的 IDE。

下载 IntelliJ IDEA

用户可以通过 IntelliJ IDEA 官方地址下载安装程序,根据操作系统选择相应的程序包进行安装。

安装 Scala Plugins

安装完 IntelliJ IDEA 默认是不支持 Scala 开发环境的,需要安装 Scala 插件进行支持。一下说明在 IDEA 中进行 Scala 插件的安装。

  • 打开 IDEA IDE 后,在 IntelliJ IDEA 菜单栏中选择 Preferences 选项,然后选择 Plugins 子选项,最后在页面中选择 Marketplace,在搜索框中输入 Scala 进行搜索

  • 在检索出来的选项列表中选择和安装 Scala 插件

  • 点击安装后重启 IDE,Scala 编程环境即可生效

安装 Scala Plugins

  • 启动 IntelliJ IDEA,选择 Import Project,在文件选项框中选择创建好的项目,点击确定。
  • 导入项目中选择 Import project from external mode 中的 Maven 后续选项使用默认值即可。
import org.apache.flink.streaming.api.scala._

object StreamingJob {
def main(args: Array[String]) {
//设置环境变量
val env = StreamExecutionEnvironment.getExecutionEnvironment
//指定数据源,读取socket
val socketStream = env.socketTextStream("localhost", 9000, '\n')
//对数据集指定转换操作逻辑
val count = socketStream
.flatMap(_.toLowerCase.split("\\W+"))
.filter(_.nonEmpty)
.map((_, 1))
.keyBy(0)
.sum(1)
//将计算结果打印到控制台
count.print()
//指定任务名称并触发流式任务
env.execute("Socket Stream")
}
}

在 IDE 中测试代码

在代码文件中右键运行程序

此时会报如下错误

错误

这时我们需要在 IDEA 的 Run/Debug Configuration 中将 Include dependencies with "Provided" scope 选项勾选,这时我们就可以在本地 IDE 运行了

选项勾选

在本地测试代码

首先在命令行我们现在终端开启监听端口 9000,在命令行中执行如下命令

nc -l 9000

nc -l 9000

然后在 IDE 中 右键运行 StreamingJob 类的 main 方法,运行结果如下

运行

在 Web 客户端中运行 Job

首先在项目所在目录执行 mvn clean package 进行打包,在项目的 target 目录下生成一个 flink-socket-1.0-SNAPSHOT.jar 文件
在命令行我们现在终端开启监听端口 9000,在命令行中执行如下命令

nc -l 9000

nc -l 9000

在浏览器中打开 Flink Web 监控页面,在左侧选择 Submit New Job 选项,点击 右上角的 Add New 选择我们编译好的 flink-socket-1.0-SNAPSHOT.jar 文件,点击 Submit 按钮提交 Job

Submit

选择 Task Managers 选择列表中的对应 Job 点击 Stdout 选项查看执行结果

查看执行结果

推荐阅读
Flink 编程模型 Flink 编程模型 Flink 概述 Flink 概述 Flink Flink Flink常用的算子 Flink常用的算子 Flink 时间语义 Flink 时间语义 SSM Maven 配置模板 SSM Maven 配置模板

留言区

Are You A Robot?