3 日志收集

​ 日志在我们项目中是非常重要的,日志的作用也有差异,例如根据日志查找问题、根据日志做数据分析。在我们秒杀系统中,活跃的热点商品其实并不多,我们往往需要对热点商品进行额外处理。用户每次抢购商品的时候,都是从详情页发起的,因此统计热度商品,详情页的访问频次可以算一个方向,详情页访问的频次我们可以记录访问日志,然后统计某一段时间的访问量,根据访问量评判商品是否是热点商品。

3.1 业务分析

​ 日志收集流程如上图,用户请求经过nginx,此时已经留下了用户对某个商品访问的足迹,我们可以在这里将用户访问的商品信息发送给我们kafka,采用大数据实时分析工具Apache Druid实时存储访问信息,再通过程序分析计算访问情况。

3.2 Kafka

​ 从上面流程图中,可以看到实现日志收集中间件是MQ,我们秒杀系统中会搭建MQ服务。

​ 目前市面上成熟主流的MQ有Kafka 、RocketMQ、RabbitMQ、ActiveMQ,我们这里对每款MQ做一个简单介绍。

Kafka ` Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。 1.快速持久化:通过磁盘顺序读写与零拷贝机制,可以在O(1)的系统开销下进行消息持久化; 2.高吞吐:在一台普通的服务器上既可以达到10W/s的吞吐速率; 3.高堆积:支持topic下消费者较长时间离线,消息堆积量大; 4.完全的分布式系统:Broker、Producer、Consumer都原生自动支持分布式,依赖zookeeper自动实现复杂均衡; 5.支持Hadoop数据并行加载:对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。 6.高并发:支持数千个客户端同时读写

RocketMQ

RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ。RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点 : 1.能够保证严格的消息顺序 2.提供丰富的消息拉取模式 3.高效的订阅者水平扩展能力 4.实时的消息订阅机制 5.支持事务消息 6.亿级消息堆积能力

RabbitMQ

使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP,STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了Broker架构,核心思想是生产者不会将消息直接发送给队列,消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)、数据持久化都有很好的支持。多用于进行企业级的ESB整合。

kafka官网:http://kafka.apache.org/

3.2.1 Kafka搭建

​ 单机版的kafka搭建非常简单,不过我们今天采用Docker搭建kafka。Kafka使用Zookeeper存储Consumer、Broker信息,安装kafak的时候,需要先安装Zookeeper。

Zookeeper安装:

docker run -d —name zookeeper -p 3181:3181 -v /etc/localtime:/etc/localtime wurstmeister/zookeeper

讲解:

/etc/localtime:/etc/localtime:使容器与宿主机时间能够同步

Kafka安装:

docker run -d --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.223:3181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.223:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -v /etc/localtime:/etc/localtime wurstmeister/kafka

讲解:

KAFKA_BROKER_ID:当前Kafka的唯一ID
KAFKA_ZOOKEEPER_CONNECT:当前Kafka使用的Zookeeper配置信息
KAFKA_ADVERTISED_LISTENERS:对外发布(暴露)的监听器,对外发布监听端口、地址
KAFKA_LISTENERS:监听器,告诉外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务。

IP更改:

外部程序如果想链接Kafka,需要根据IP链接,所以我们可以给Kafka一个IP名字,编辑:/opt/kafka_2.12-2.4.1/config/server.properties,在文件最末尾添加如下代码:

host.name=192.168.211.137

3.2.2 队列创建

进入kafka容器,创建队列:

docker exec -it kafka /bin/sh

cd /opt/kafka_2.12-2.4.1/bin

./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic itemaccess

讲解:

解释:使用kafka-topics.sh创建队列
        --create:执行创建一个新的队列操作
        --bootstrap-server:需要链接的kafka配置,必填
        --replication-factor 1:设置分区的副本数量
        --topic itemaccess:队列的名字叫itemaccess

3.2.3 消息发布

在kafka容器中执行消息发送(接着上面的步骤执行):

./kafka-console-producer.sh --broker-list localhost:9092 --topic itemaccess

讲解:

解释:使用kafka-console-producer.sh实现向kafka的test队列发送消息
        --broker-list:指定将消息发给指定的Kafka服务的链接列表配置  HOST1:Port1,HOST2:Port2
        --topic itemaccess:指定要发送消息的队列名字

我们发送的消息如下(输入信息,回车即可发送):

{"actime":"2020-4-10 9:50:10","uri":"http://www-seckill.itheima.net/items/333.html","IP":"119.123.33.231","Token":"Bearer itcast"}

3.2.4 消息订阅

在kafka容器中执行消息订阅(接着上面的步骤执行,但要先按ctrl+c退出控制台):

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic itemaccess --from-beginning

讲解:

解释:使用kafka-console-consumer.sh从kafka中消费test队列的数据
        --bootstrap-server:从指定的kafka中读取消息
        --topic itemaccess:读取队列的名字
        --from-beginning:从最开始的数据读取,也就是读取所有数据的意思

查看已经存在的主题:

./kafka-topics.sh --zookeeper localhost:3181 --list

删除主题:

./kafka-topics.sh --zookeeper localhost:3181 --delete --topic itemaccess

查看主题信息:

/kafka-topics.sh --zookeeper localhost:3181 --describe --topic itemaccess

3.2.5 信息查看

​ 上面执行整个流程如下图:

Kafka注册信息查看:

​ 我们进入到zookeeper中,可以查看到kafka的注册信息,相关操作命令如下:

docker exec -it zookeeper /bin/bash

cd bin

./zkCli.sh

ls /

效果如下:

关于Kafka的学习,大家可以直接参考:http://kafka.apache.org/quickstart

3.3 收集日志-Lua

​ Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。

​ OpenResty® 是一个基于 Nginx 与 Lua 的高性能 Web 平台,其内部集成了大量精良的 Lua 库、第三方模块以及大多数的依赖项。用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和动态网关。OpenResty 通过lua脚本扩展nginx功能可提供负载均衡、请求路由、安全认证、服务鉴权、流量控制与日志监控等服务

​ OpenResty® 通过汇聚各种设计精良的 Nginx 模块(主要由 OpenResty 团队自主开发),从而将 Nginx 有效地变成一个强大的通用 Web 应用平台。这样,Web 开发人员和系统工程师可以使用 Lua 脚本语言调动 Nginx 支持的各种 C 以及 Lua 模块,快速构造出足以胜任 10K 乃至 1000K 以上单机并发连接的高性能 Web 应用系统。

​ 关于Lua的基本知识,我们这里就不学习了,直接进入日志收集的使用操作。

3.3.1 OpenRestry安装

关于OpenRestry的学习,大家可以参考:http://openresty.org/cn/

下载OpenRestry:

wget https://openresty.org/download/openresty-1.11.2.5.tar.gz

解压:

tar -xf openresty-1.11.2.5.tar.gz

安装(进入到解压目录进行安装):

cd openresty-1.11.2.5

./configure --prefix=/usr/local/openresty --with-luajit --without-http_redis2_module --with-http_stub_status_module --with-http_v2_module --with-http_gzip_static_module --with-http_sub_module

make

make install

软件会安装到/usr/local/openresty,这里面会包含nginx。

配置环境变量:

vi /etc/profile

export PATH=/usr/local/openresty/nginx/sbin:$PATH

source /etc/profile

3.3.2 详情页发布

​ 商品详情页生成后会存储在/usr/local/server/web/items目录下,详情页是静态网页,我们可以使用Nginx直接发布。

​ 商品详情页的访问:http://192.168.211.137/items/S1235433012716498944.html,我们可以让所有以/items/的请求直接到/usr/local/server/web/目录下找。

修改nginx.conf:

cd /usr/local/openresty/nginx/conf/

vi nginx.conf

修改内容如下:

启动nginx,并访问测试:http://192.168.211.137/items/S1235433012716498944.html

3.3.3 Lua日志收集

​ 使用Lua实现日志收集,并向Kafka发送访问的详情页信息,此时我们需要安装一个依赖组件lua-restry-kafka。关于lua-restry-kafka的下载和使用,可以参考https://github.com/doujiang24/lua-resty-kafka

1)收集流程

​ 日志收集流程如下:

​ 用户请求/web/items/1.html,进入到nginx第1个location中,在该location中向Kafka发送请求日志信息,并将请求中的/web去掉,跳转到另一个location中,并查找本地文件,这样既可以完成日志收集,也能完成文件的访问。

2)插件配置

lua-restry-kafkahttps://github.com/doujiang24/lua-resty-kafka

资料\lua中已经提供了该包lua-resty-kafka-master.zip,我们需要将该文件上传到/usr/local/openrestry目录下,并解压,再配置使用。

解压:

unzip lua-resty-kafka-master.zip

配置:

修改nginx.conf,在配置文件中指定lua-resty-kafka的库文件位置:

lua_package_path "/usr/local/openresty/lua-resty-kafka-master/lib/?.lua;;";

配置效果图如下:

3)日志收集

​ 用户访问详情页的时候,需要实现日志收集,日志收集采用Lua将当前访问信息发布到Kafka中,因此这里要实现Kafka消息生产者。

我们定义一个消息格式:

{
  "actime": "2020-4-10 9:50:30",
  "uri": "http://192.168.211.137/items/S1235433012716498944.html",
  "ip": "119.123.33.231",
  "token": "Bearer ITHEIMAOOPJAVAITCAST"
}

生产者脚本:

定义好了消息格式后,创建一个生产者,往Kafka中发送详情页的访问信息。我们创建一个lua脚本,items-access.lua,脚本内容如下:

上图脚本内容如下:

--引入json解析库
local cjson = require("cjson")
--kafka依赖库
local client = require "resty.kafka.client"
local producer = require "resty.kafka.producer"
--配置kafka的链接地址
local broker_list = {
      { host = "192.168.211.137", port = 9092 }
}
--创建生产者
local pro = producer:new(broker_list,{ producer_type="async"})
 
--获取IP
local headers=ngx.req.get_headers()
local ip=headers["X-REAL-IP"] or headers["X_FORWARDED_FOR"] or ngx.var.remote_addr or "0.0.0.0"
 
--定义消息内容
local logjson = {}
logjson["uri"]=ngx.var.uri
logjson["ip"]=ip
logjson["token"]="Bearer ITHEIMA"
logjson["actime"]=os.date("%Y-%m-%d %H:%m:%S")
 
--发送消息
local offset, err = pro:send("itemaccess", nil, cjson.encode(logjson))
 
--页面跳转
local uri = ngx.var.uri
uri = string.gsub(uri,"/web","")
ngx.exec(uri)

4)nginx配置

​ 按照上面的流程图,我们需要配置nginx的2个location,修改nginx.conf,代码如下:

上图代码如下:

server {
  listen       80;
  server_name  localhost;
 
  #/web开始的请求,做日志记录,然后跳转到下面的location
  location /web/items/ {
  content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
  }
 
 
  #商品详情页,以/items/开始的请求,直接在详情页目录下找文件
  location /items/ {
  #日志处理
  #content_by_lua_file /usr/local/openresty/nginx/lua/items-access.lua;
  root /usr/local/server/web/;
  }
}

5)日志收集测试

请求地址:http://192.168.211.137/web/items/S1235433012716498944.html

查看Kafka的itemaccess队列数据: