4 Apache Druid日志实时分析

4.1 业务分析

​ 秒杀业务中,通常会有很多用户同时蜂拥而上去抢购热卖商品,经常会出现抢购人数远大于商品库存。其实在秒杀过程中,热卖商品并不多,几乎只占1%,而99%的流量都源自热卖商品,很有可能因为这1%的热卖商品导致服务器宕机,因此针对热卖商品我们要做特殊处理。

​ 热卖商品我们这里称为热点商品,针对热点商品的处理,有这么几种思路,一是优化,二是限制,三是隔离

优化:优化热点数据最有效的办法就是缓存热点数据

限制:限制其实是一种削峰手段,我们可以把热点商品抢单采用队列来存储用户抢单信息,将热点抢单限制在一个队列里,防止热点商品抢单占用太多的资源服务,而使得其他服务无法获取抢单机会。

隔离:隔离其实就是将热点商品和非热点商品进行数据源的隔离、操作流程的隔离,不要因为1%的热点数据影响到另外的99%数据。我们可以把热点商品数据存储到缓存中和非热点数据分开,抢单程序也可以和非热点抢单分开。

热点数据又分为离线热点数据和实时热点数据,离线热点数据主要是分析过往热点商品信息,这个统计起来并无难度,可以直接从历史数据库中查询分析。但根据用户抢单实时数据进行分析是一个很困难的事,首先要存储大量的访问信息,同时还能高效的实时统计访问日志信息,从中获取热点商品信息。

4.2 Apache Druid

4.2.1 Apache Druid介绍

介绍

​ Apache Druid 是一个分布式的、支持实时多维 OLAP 分析的数据处理系统。它既支持高速的数据实时摄入,也支持实时且灵活的多维数据分析查询。因此 Druid 最常用的场景是大数据背景下、灵活快速的多维 OLAP 分析。 另外,Druid 还有一个关键的特点:它支持根据时间戳对数据进行预聚合摄入和聚合分析,因此也有用户经常在有时序数据处理分析的场景中用到它。

OLTP与OLAP的区别:

OLTP是传统的关系型数据库的主要应用,主要是基本的、日常的事务处理。

OLAP是数据仓库系统的主要应用,支持复杂的分析操作,侧重决策支持,并且提供直观易懂的分析查询结果。

OLAP和OLTP区别:

OLTPOLAP
用户面向操作人员,支持日常操作面向决策人员,支持管理需要
功能日常操作处理分析决策
DB 设计面向应用,事务驱动面向主题,分析驱动
数据当前的,最新的细节的历史的,聚集的,多维的,集成的,统一的
存取可更新,读/写数十条记录不可更新,但周期性刷新,读上百万条记录
工作单位简单的事务复杂的查询
DB 大小100MB-GB100GB-TB

OLTP就是面向我们的应用系统数据库的,OLAP是面向数据仓库的。

Apache Druid 特性: 

亚秒响应的交互式查询,支持较高并发。
支持实时导入,导入即可被查询,支持高并发导入。
采用分布式 shared-nothing 的架构,可以扩展到PB级。
支持聚合函数,count 和 sum,以及使用 javascript 实现自定义 UDF。
支持复杂的 Aggregator,近似查询的 Aggregator 例如 HyperLoglog 以及 Yahoo 开源的 DataSketches。
支持Groupby,Select,Search查询。

开源OLAP数据处理系统性能方面我们做个对比:

Apache Druid 架构设计

Druid自身包含下面4类节点:

1.Realtime Node:即时摄入实时数据,生成Segment(LSM-Tree实现与Hbase基本一致)文件。
2.Historical Node:加载已生成好的数据文件,以供数据查询。
3.Broker Node:对外提供数据查询服务,并同时从Realtime Node和Historical Node查询数据,合并后返回给调用方。
4.Coordinator Node:负责Historical Node的数据负载均衡,以及通过Rule管理数据生命周期。

同时,Druid集群还包含以下3类外部依赖:

1.元数据库(Metastore):存储druid集群的元数据信息,如Segment的相关信息,一般使用MySQL或PostgreSQL
2.分布式协调服务(Coordination):为Druid集群提供一致性服务,通常为zookeeper
3.数据文件存储(DeepStorage):存储生成的Segment文件,供Historical Node下载,一般为使用HDFS

数据摄入

​ Apache Druid同时支持流式和批量数据摄入。通常通过像 Kafka 这样的消息总线(加载流式数据)或通过像 HDFS 这样的分布式文件系统(加载批量数据)来连接原始数据源。

4.2.2 Apache Druid安装

Apache Druid的安装方面,我们可以参考官方文档实现。

JDK:java8(8u92+)

下载地址:https://druid.apache.org/downloads.html

解压该压缩包:

 tar -xf apache-druid-0.17.0-bin.tar.gz cd apache-druid-0.17.0

包文件如下:

启动单机版Apache Druid:

./bin/start-micro-quickstart

启动后,访问:http://192.168.211.137:8888

4.2.3 数据摄入

4.2.3.1 离线数据摄入

从一个文件中将数据加载到Apache Druid,参考地址:W<https://druid.apache.org/docs/latest/tutorials/tutorial-batch.html>,如下操作:

1)点击Load dataLocal diskConnect data

2)选择要导入的数据

我们要导入的数据在/usr/local/server/apache-druid-0.17.0/quickstart/tutorial/wikiticker-2015-09-12-sampled.json.gz,需要把该文件的相对路径填写到右边表单中,再点击Apply,如下图:

3)解析数据

在上一个步骤上点击Next:Parse data,此时会解析导入的数据,如下图:

4)解析时间

在上一个步骤上点击Next: Parse time,Apache Druid要求每条数据都有一个time列,如果我们导入的数据没有该列,Apache Druid会自动帮助我们创建该列,如下图:

5)数据分区设置

点击下一步一直到Partition,我们根据需要设置数据分区方式,如下图:

讲解:

Type:数据粒度使用的类型
Segment granularity:分片文件每个segment包含的时间戳范围
Force guaranteed rollup:是否启用批量推送模式
Partitioning type:分区类型
Max rows per segment:用于分片。确定每个段中的行数。

更多参数如下图:

6)设置数据源

Publish设置,注意设置数据源名字,这里类似数据库中数据库名字。

7)提交配置

最后一步需要提交配置,如下图,点击submit即可。

4.2.3.2 实时数据摄入

​ 前面的案例是离线数据的摄入,接着我们实现实时数据摄入,我们以收集用户访问商品详情页的访问记录为例,如下图:

参考地址:https://druid.apache.org/docs/latest/tutorials/tutorial-kafka.html

1)load data

2)配置Kafka源

3)配置数据源名字

其他的步骤和之前文件摄入一样,直到配置数据源名字,我们配置数据源名字叫itemlogs,最后一步submit和之前一样,如下图:

查询效果如下:

4.2.4 Druid SQL

4.2.4.1 简介

​ Apache Druid SQL是一个内置的SQL层,是Druid基于JSON的查询语言的替代品,由基于Apache Calcite的解析器和规划器提供支持。Druid SQL将SQL转换为Broker本机Druid查询,然后将其传递给数据进程。除了在Broker上转换SQL的(轻微)开销之外,与本机查询相比,没有额外的性能损失。

4.2.4.2 语法

每个Druid数据源都显示为“Druid”模式,这也是默认模式,Druid数据源引用为druid.dataSourceName或者简单引用dataSourceName

可以选择使用双引号引用数据源和列名等标识符。要在标识符中转义双引号,请使用另一个双引号,例如"My ""cat"" identifier",所有标识符都区分大小写。

文字字符串应引用单引号,如'foo',文字数字可以用100(表示整数),100.0(表示浮点值)或1.0e5(科学记数法)等形式编写。时间戳可以写成TIMESTAMP '2000-01-01 00:00:00'。时间算法,可以这样写INTERVAL '1' HOURINTERVAL '1 02:03' DAY TO MINUTEINTERVAL '1-2' YEAR TO MONTH,等等。

Druid SQL支持具有以下结构的SELECT查询:

[ EXPLAIN PLAN FOR ]
[ WITH tableName [ ( column1, column2, ... ) ] AS ( query ) ]
SELECT [ ALL | DISTINCT ] { * | exprs }
FROM table
[ WHERE expr ]
[ GROUP BY exprs ]
[ HAVING expr ]
[ ORDER BY expr [ ASC | DESC ], expr [ ASC | DESC ], ... ]
[ LIMIT limit ]
[ UNION ALL <another query> ]

查询所有:

SELECT * FROM "itemlogs"

查询count列:

SELECT "count" FROM "itemlogs"

查询前5条:

SELECT * FROM "itemlogs" LIMIT 5

分组查询:

SELECT ip FROM "itemlogs" GROUP BY ip

排序:

SELECT * FROM "itemlogs" ORDER BY __time DESC

求和:

SELECT SUM("count") FROM "itemlogs"

最大值:

SELECT MAX("count") FROM "itemlogs"

平均值:

SELECT AVG("count") FROM "itemlogs"

查询6年前的数据:

SELECT * FROM "wikiticker" WHERE "__time" >= CURRENT_TIMESTAMP - INTERVAL '6' YEAR

去除重复查询:

SELECT DISTINCT "count" FROM "accessitem"
4.2.5 JDBC查询Apache Druid

​ Apache Calcite是面向Hadoop新的查询引擎,它提供了标准的SQL语言、多种查询优化和连接各种数据源的能力,除此之外,Calcite还提供了OLAP和流处理的查询引擎。

​ 如果使用java,可以使用Calcite JDBC驱动程序进行Druid SQL查询。可以下载Avatica客户端jar后,将其添加到类路径并使用连接字符串jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/

​ 如果是Maven项目,需要引入avatica-core包,如下:

<dependency>
    <groupId>org.apache.calcite.avatica</groupId>
    <artifactId>avatica-core</artifactId>
    <version>1.15.0</version>
</dependency>

使用案例:

public static void main(String[] args) throws Exception{
    //链接地址
    String url = "jdbc:avatica:remote:url=http://192.168.211.137:8082/druid/v2/sql/avatica/";
    AvaticaConnection connection = (AvaticaConnection) DriverManager.getConnection(url);
 
    //SQL语句,查询2020-4-10 11:50:30之后的访问uri和访问数量
    String sql="SELECT uri,count(*) AS \"viewcount\" FROM(SELECT * FROM \"itemlogs\" WHERE __time>'2020-4-10 11:50:30' ORDER BY __time DESC) GROUP BY uri LIMIT 100";
 
    //创建Statment
    AvaticaStatement statement = connection.createStatement();
 
    //执行查询
    ResultSet resultSet = statement.executeQuery(sql);
 
    while (resultSet.next()) {
        //获取uri
        String uri = resultSet.getString("uri");
        String viewcount = resultSet.getString("viewcount");
        System.out.println(uri+"--------->"+viewcount);
    }
}

知识点:

Druid的时区和国内时区不一致,会比我们的少8个小时,我们需要修改配置文件,批量将时间+8,代码如下:

sed -i "s/Duser.timezone=UTC/Duser.timezone=UTC+8/g" `grep Duser.timezone=UTC -rl ./`