首页 / 新闻

04.

01

2017

基于流的SQL引擎:StreamSQL(基础介绍)

技术博客

Inceptor StreamSQL简介

一些流处理平台(比如Spark Streaming,Storm)通常要求用户在创建流处理应用时必须用Java或者Scala进行开发,包括早期的TDH(4.3之前)也是如此。这要求业务人员只有对框架以及流处理本身,甚至是底层技术足够熟悉,才能写出高效的流处理程序。大大地限制了流处理的推广和应用,架高了流处理应用的开发门槛,对于数据科学家和数据分析者而言增加了操作难度,导致无法将精力完全投入在业务分析上。

为降低流应用开发的入门要求,星环从TDH 4.3开始,在Transwarp Stream中引入全新的StreamSQL,允许用户通过SQL实现业务逻辑。StreamSQL几乎可以应对所有类型的业务场景,包括ETL工具,规则报警工具等简单业务场景。为了实现更复杂的业务逻辑,它还对PL/SQL作为高级功能提供了优美的支持。从4.8开始,StreamSQL又新增了基于事件驱动模式的流处理功能,在低延迟处理方面的性能表现更为出色。

关于StreamSQL的几个重要概念

StreamSQL有三个核心概念:Stream、StreamJob 和 Application。概括地说,Stream是数据流,StreamJob是对一个或多个Stream进行计算并将结果写进一张表的任务,Application是一个或多个StreamJob的集合。

  • Stream

    Stream分为两种:Input Stream和Derived Stream。直接用于接收数据源传来的数据称为Input Stream;对已有Stream进行变形得到的新的Stream称为Derived Stream。

  • StreamJob

    StreamSQL中的Stream是静态的——它们仅仅描述了如何对数据源传来的数据进行接收和变形的计划,但并不执行这些计划。要让StreamSQL执行计划,需要有相应的Action操作来触发StreamJob。启动一个StreamJob时,StreamSQL会为每一个Input Stream启动一组称为 receiver的任务来接收数据,接收的数据经过一系列Derived Stream的变形最终被插入一张表,供用户查询。

  • Application

    Application是一组业务逻辑相关的StreamJob的集合。合理地使用Application划分StreamJob可以实现资源的共享和隔离。之后我们会有文章对Application的隔离能力进行专门介绍。

StreamSQL的简单示例

快速入门

这里将通过一个简单的例子,对StreamSQL的使用方法做基本介绍。我们会使用Kafka的console工具生成一些简单的数据,并让StreamSQL来处理。该演示包含三部分:

1. 建一个Kafka数据源;

2. 在Inceptor中建一个Stream并触发StreamJob;

3. 在Inceptor中处理Stream从Kafka数据源接收的数据。

建Kafka数据源

1. 登录Kafka节点

登陆集群中任意安装了Kafka的节点。进入/usr/lib/kafka/bin目录,该目录下有建Kafka数据源的所需要的脚本。

2. 建一个Kafka Topic

执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-create-topic.sh脚本:

该指令提供了如下信息:topic名称为demo,使用172.16.1.128上的Zookeeper,分3个partition。

3. 查看Kafka Topic

执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-topics.sh脚本:

我们可以看到刚才建的名为demo的topic,和它的Partition信息。

4. 建Kafka Producer并发布消息

执行下面指令,运行/usr/lib/kafka/bin目录下的kafka-console-producer.sh脚本:

该指令的含义为:指定使用172.16.1.128节点为Kafka Broker,并且指定Producer发布消息的topic为demo。现在,我们可以在命令行中输入一些消息,这些消息都将被发布给demo:

5. 建好数据源

至此,已经建好了一个Kafka数据源,并发布了一些消息。先不要停止上面Producer的进程,让它保持运行,你可以继续在命令行中输入消息。现在打开另一个窗口登陆集群,进入Inceptor,建一个Stream并触发StreamJob的开始。

建Stream及触发StreamJob

1. 登录Inceptor

登陆集群中的任意一个节点,连接到Inceptor。这里,我们以hive用户身份连接一个LDAP认证的Inceptor Server 2。

此处的port由Transwarp Manager配置页上的参数 hive.server2.thrift.port 配置,默认为10010。

2. 建一个Stream

该StreamSQL语句建了一个名为demo_stream的Stream,它使用Kafka为源,接收发布给名为demo的topic的消息,将接收的消息按“,”分隔为两列:id(类型为INT)和letter(类型为STRING)。

3. 查看Stream

通过SHOW STREAMS查看刚才创建的Stream。

我们可以看到结果中出现了刚刚建好的demo_stream。

4. 创建并触发一个StreamJob

a. 建一张新表demo_table,它需要和demo_stream有相同的schema:

b. 向demo_table插入demo_stream中的数据,这个操作会触发StreamJob的执行:

5. 列出正在运行的StreamJob

执行下面指令:

我们可以看到如下输出:

输出结果包含streamid,触发StreamJob的sql和状态。

6. 在Inceptor管理界面查看StreamJob运行状态

打开浏览器,访问http://:,可以在Inceptor的管理界面看到当前正在运行的StreamJob,其中此处的port由Transwarp Manager配置界面上的 inceptor.ui.port 参数决定,默认为4044:

7. 接收消息

此时demo_stream已经开始接收发布到之前创建的demo的消息。需要注意的是,demo_stream对发布到demo的消息的接收是从streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发开始的,也就可以看到在“Active Stages”下有正在运行的demo_stream。

是说从执行INSERT开始,因此在执行INSERT之前发布到demo的消息都不会被demo_stream接收。所以,目前demo_table中没有任何记录:

接收并处理Kafka传来的数据

1. 切换到在运行的Kafka Producer的交互界面:

由于“hello”,“world”都是在streamid=29008ed34b9e45bca784362948b88a85的StreamJob触发前发布的,这两条消息都不会被demo_stream接收。

2. 在命令行中输入一些数据

由于已经规定了demo_stream接收消息的类型是由“,”分隔的两列文本,我们需要输入这个类型的消息,以便demo_stream处理。输入:

3. 查看数据

切换到Inceptor命令行的窗口,查看demo_table中的数据,我们可以看到demo_table中出现了我们刚才发布的四条消息:

4. 现在可以在demo_table上进行一些查询:

停止Streamjob

演示到此结束,用下面指令可以停止streamid为29008ed34b9e45bca784362948b88a85的streamjob:

StreamSQL的优势

相对于采用编程(Scala/Java)的方式开发流应用,采用StreamSQL具有以下优势:

  • 微批模式和事件驱动模式的一体化

    在同一套系统里,用户可以根据业务需求,灵活切换微批模式的流处理和事件驱动模式的流处理。

  • 极高的易用性

    而使用StreamSQL,用户只需要有编写普通SQL的经验,就可以写出高效、安全、稳定的流处理应用。

  • 性能提升

    在一些条件下,采用StreamSQL的方式甚至比编程方式获得更高的性能提升。这是因为StreamSQL做了一些特殊优化,在编程模式下无法轻易实现。

  • 产品化程度高

    通过编程的方式来实现流处理的另一个问题是产品化程度非常低。由于编程有较高的自由度,出现问题的可能性很大;而又由于编程的方式将流处理平台和用户程序绑定在一起,用户没办法及时分析出出错问题的root cause。SQL作为一个通用的接口将大大地提高产品化程度。

  • 迁移成本低

    用户原有的很多业务逻辑是通过SQL实现,如果通过编程的方式迁移到流上,迁移成本非常高,难以保证迁移后逻辑的正确性。而一旦采用StreamSQL,用户只需要修改少量SQL,迁移成本几乎接近零。

由于具备上述几项优点,Inceptor StreamSQL将会在建立流式应用时,表现出其强大的业务应对能力和易用性。用户将发现流式分析的实现过程也可以很便捷。

对此篇文章如有任何问题,欢迎以邮件形式联系我们:bigdataopenlab@transwarp.io