从在用的四家cdn的大量日志中,统计出每场直播的流量数据,包括国内流量和海外流量。
获取日志
目前已有的数据来源:四家cdn服务商。每个服务商都有自己不同的日志接口,不同的日志获取方式,可以把日志类型分为:
各家厂商cdn日志的收集方法参见各自官网。获取到的日志示例文件名如下:
cdn_code | log_name |
---|
netcenter | 2017-12-06-2300-2330_rtmp-wsz.qukanvideo.com.cn.log.gz |
dnion | hls-d.quklive.com_20180509_03_04.gz |
alicdn | play-a.quklive.com_2017_12_07_1100_1200.gz |
qukan->alicdn | recordcdn-sz.qukanvideo.com_2017_12_06_1800_1900.gz |
tencent | 2017120607_hangzhouqukan.cdn.log.gz |
可以从文件名判断属于日志所属的cdn代码和对应的协议。将cdn代码、播放类型代码、协议代码对应的关系直接存在字典中:
通过日志名称匹配到域名,并取得对应的cdn代码、播放类型代码、协议代码,然后对具体的日志做不同的正则处理。
日志存入HDFS
直接使用hdfs命令拷贝到HDFS中
通过日志下载程序调用接口下载到的日志可以使用以下命令直接拷贝到hdfs,拷贝成功一个日志,就删除对应的本地文件系统日志。示例命令如下:
使用分布式日志收集系统flume导入到HDFS中
对于用户访问日志的采集,更多的是使用flume,并且会将采集的数据保存到HDFS中 。虽然本次项目日志不需要采用此种方式,但是也可以作为一个手段。flume在分布式日志收集上比较类似于ELK中的logstash,可以对比学习下。最简单(单agent)的数据流模型如下:
具体使用方法参见:Flume 1.8.0 User Guide
hadoop压缩日志
各个cdn厂商提供的cdn日志都是gz格式的压缩日志,因此必须考虑对压缩日志的处理。Hadoop3 对于压缩格式是自动识别的。如果我们压缩的文件有相应压缩格式的扩展名(比如 lzo,gz,bzip2 等)。Hadoop 会根据压缩格式的扩展名自动选择相对应的解码器来解压数据,此过程完全是 Hadoop 自动处理,我们只需要确保输入的压缩文件有扩展名。因此这一步可以直接省略自行解压的操作。
但是需要注意在mapper环境变量中得到的输入文件的文件名是解压之前的文件名,也就是带压缩扩展名的。在hadoop3中可以通过以下变量验证:
需要稍微注意的地方有两点:
- input_file_path保存的是文件在hdfs上的完整路径。 比如:
hdfs://node-master:9000/user/hadoop/cdn_log/2018-05-09-0100-0130_rtmpdist-wsz.qukanvideo.com.cn.log.gz
- 老版本的api为
map_input_file
,在集群上尝试了老版本的api,代码会报错。
MR程序
具体代码参见Github:https://github.com/Flowsnow/hadoop-mapreduce-demo
需要先确定mapper和redecer中间的数据格式,需要考虑到shuffle。因为最终是要按照live_id分组进行统计,因此live_id作为key,中间数据如下:
使用'\t'
分隔,ip用于后续判断属于国内还是海外。
flow_statistic_mapper.py
主要从各个cdn日志中筛选出有效的格式化数据,因此最多的操作就是对日志文件名和日志每一行进行正则匹配。
flow_statistic_reducer.py
根据ip查询是国内流量还是海外流量,对每场直播进行统计。
代码调试
使用linux管道、cat命令、sort命令综合使用进行调试,示例调试命令如下:
因为原始日志是压缩格式的,因此调试时可以先把日志解压然后调试,相对应的mapper中的输入文件名称也会有变化,需要注意。
MR调用
命令和执行结果如下:
流量数据导出到Mysql
使用Sqoop导出HDFS中的流量数据到Mysql中,需要创建有对应字段的新表,具体使用参见Sqoop导入导出文档。
hadoop streaming错误排查
使用hadoop streaming编写MR程序时最常见的错误:hadoop-streaming-subprocess-failed-with-code-1
对应的需要检查以下几个问题:
- 如果是通过./mapper.py的方式执行,则需要给mapper.py增加执行权限
- python shell命令执行时,py文件头部需要指定
#!/usr/bin/env python
- Python环境和程序依赖的第三方库需要在集群中的所有节点上安装
上述几项没有问题之后,基本就是代码层面的问题了。需要逐层排查
参考: