DataBand数据帮 轻量级一站式大数据分析平台

   日期:2024-12-27    作者:mlgva 移动:http://3jjewl.riyuangf.com/mobile/quote/62631.html

详情开发使用介绍

DataBand(数据帮,快速采集清洗,任务管理,实时流和批处理数据分析,数据可视化展现,快速数据模板开发,ETL工具集、数据科学等。是轻量级的一站式的大数据平台。 我们致力于通过提供智能应用程序、数据分析和咨询服务来提供最优解决方案。

软件架构

技术栈

存储
  • 分布式存储:HDFS、HBase
  • 行式关系存储:MySQL、Oracle
  • 列式存储:ClickHouse
  • 列族存储:HBase、Cassandra
  • 文档库:ElasticSearch、MongoDB
计算
  • 计算引擎:Presto、Hive
  • 流处理:Storm、Flink
集成
  • Flume
  • Filebeat
  • Logstash
前端技术栈
  • Vue
  • Element UI
后端技术栈
  • Spring Boot
  • Spring Cloud
  • MyBatis

工程说明

大数据模拟数据源生成数据(数据准备工程

数据源

  • databand-mock-api:接口模拟工具,模拟业务系统api
  • databand-mock-log:日志模拟工具,手动产生大量的日志数据供调试测试,比如Syslog、log、CSV生成、Json、MySQL注入、RPC写、NetCat等
  • databand-mock-mq:日志模拟工具,通过MQ写产生大量的日志数据供调试测试,比如RabbitMQ写、Kafka写等
  • databand-mock-hadoop:大数据日志模拟工具,hdfs和mapreduce相关
数据采集清洗(采集清洗工程

  • databand-etl-mysql_ods:采集清洗mysql数据比如MySQL到ods临时中间库(包括Redis、Kafka等)
  • databand-etl-mysql_olap:采集清洗mysql数据到OLAP数据仓库
  • databand-etl-mysql_hadoop:采集清洗mysql数据到Hadoop分布式存储
  • databand-etl-logfile_ods:采集清洗半结构化日志文件,比如json、xml、log、csv文件数据到ods临时中间库
  • databand-etl-logfile_olap:采集清洗半结构化日志文件数据到OLAP数据仓库
  • databand-etl-logfile_hadoop:采集清洗日志文件数据到Hadoop分布式存储
  • databand-etl-mq_ods:通过MQ消费采集数据,入ods库
  • databand-etl-mq_olap:通过MQ消费采集数据,入OLAP库
  • databand-etl-mq_hadoop:通过MQ消费采集数据,入Hadoop;- databand-ml:数据科学工程
数据分析作业(定时作业调度工程
  • databand-job-springboot:定时任务作业调度服务,支持shell,hive,python,spark-sql,java jar任务。
  • databand-streamjob-springboot:流数据作业,支持kafka数据消费至clickhouse、mysql、es等。
数据分析门户(后端管理和前端展示工程
  • databand-admin-ui:前后端分离的纯前端UI工程,数据展现(目前未开发
  • databand-admin-thymeleaf:后端权限、关系、站点配置管理(前后端不分离,正在开发的版本,基于若依框架
  • databand-admin-api:数据api服务
  • databand-admin-tools:BI工具集
实时流数据(2021年-9月更新
  • databand-rt-flinkstreaming:flink实时数据流处理。主要是PV、UV,涉及窗口、聚合、延时、水印、统计、checkpoint等基本用法
  • databand-rt-redis:实时处理的一些缓存存储
  • databand-rt-sparkstreaming:spark实时数据流处理,和flink的功能近似,主要structured streaming

愿景目标

3年愿景目标 

工程细节说明

databand-mock-api (模拟数据源API工程) API模拟工具
  • App.java:一个简单的mock控制台程序

api mock详情介绍

api mock工程源码

databand-mock-log (模拟数据源生成日志数据工程) 日志模拟工具

目前是简单的控制台小程序,直接运行main即可。

  • CsvMock.java:csv文件生成,运行后在"FILE_PATH"定义的文件夹中可找到csv文件
  • LogMock.java:log文件生成,生成路径见配置文件:logback.xml。 win下默认“c:/logs/”,linux 或 mac下路径请自行修改
  • JsonMock.java:json文件生成,在"FILE_PATH"定义的文件夹中可找到json文件
  • XmlMock.java:xml文件生成,在"FILE_PATH"定义的文件夹中可找到json文件
  • RpcMock.java:rpc输出,运行后可以用flume(或filebeat)进行测试,配置文件见:/flumeConf/avro-memory-log.properties:运行脚本: flume-ng agent --conf conf --conf-file /usr/app/apache-flume-1.8.0-bin/avro-memory-log.properties --name a2 -Dflume.root.logger=INFO,console
  • SyslogMock.java:syslog(udp)输出,运行后可以用flume(或filebeat)进行测试,配置文件见:/flumeConf/syslog-log.properties
  • TcpMock.java:Tcp输出,运行后可以用flume进行测试,配置文件见:/flumeConf/syslog-log.properties
  • MySQLMock.java:mysql数据生成,通过list键值对形式对数据表进行写操作。

log mock工程源码

databand-mock-mq (模拟数据源生成日志数据工程) MQ消息模拟生成工具

目前是简单的控制台小程序,直接运行main即可。

  • KafkaProducer.java:Kafka消息生成
  • KafkaConsumer.java:Kafka消息消费
  • RabbitMQProducer.java:RabbitMQ消息生成
  • RabbitMQConsumer.java:RabbitMQ消息消费

mq mock工程源码

数据源日志

类型分为

  • CSV日志,用于批处理,采用UTF-8字符集,每行)表示一条记录,每条记录中各个字段的值使用双引号括起来,并使用逗号(,)分隔
  • Kafka 日志,用于流处理,生产者策略性的产生一些有偏移属性的带日期时间数据。

业务

  • a)产品销售日志,采用CSV格式
  • b)节目播出日志,采用CSV格式
  • c)搜索热词日志,采用kafka
  • d)广告播放日志,采用kafka

数据定义,批处理类型日志,原始数据源为csv,暂时以这两个业务作为批处理数据演示,实际上平台将是与业务无关的,只关注数据流和数据服务。

一、产品销售csv日志: 处理类org.databandtech.logmock.ProductSalesCSVLog

  • 1 产品id productId
  • 2 产品分类id categoryId
  • 3 型号规格 modelId
  • 4 颜色 color
  • 5 买家id userId
  • 6 购买日期 saleDatetime
  • 7 购买数量 buyCount
  • 8 购买金额 buyTotle
  • 9 折扣金额 buyDiscount
  • 10 城市 cityCode
  • 11 地址 address

二、节目播出csv日志 处理类org.databandtech.logmock.ShowsCSVLog

  • 1 用户id userId
  • 2 状态类型码 status
  • 3 城市 cityCode
  • 4 区县 areaCode
  • 5 收视开始时间 beginTime
  • 6 收视结束时间 endTime
  • 7 节目ID showId
  • 8 栏目ID columnId
  • 9 频道ID channelId
  • 10 高清标志码 hd
  • 11 节目类型码 showType

状态类型码

  • 1"tv_playing"、2"vod_playing"、3"browsing"、4"tvod_playing"、5"ad_event" 、6"external_link"、7"order"

高清标志码

  • 0:标清、1:高清、2:智能、3:其他

节目类型码

  • 电视剧:tv、电影:movie、综艺:variety、其他:other

流类型日志,原始数据源为kafka,暂时以这两个业务作为流数据演示,实际上平台将是与业务无关的,只关注数据流和数据服务。

三、搜索热词日志: 处理类org.databandtech.mockmq.HotWordKafkaLog

Kafka Topic: HOTWORDS

  • 1 KEYWORD 热词
  • 2 USERID 用户id
  • 3 TS 搜索时间

四、广告监测日志 处理类org.databandtech.mockmq.AdKafkaLog

Kafka Topic: ADMONITOR

  • 1 OS 设备的操作系统类型
  • 2 UID 用户id
  • 3 MAC1 MAC地址
  • 4 MACCN 当前联网形式
  • 5 IP IP
  • 6 ROVINCECODE 所属省份代码
  • 7 CITYCODE 所属城市代码
  • 8 AREACODE 所属区县代码
  • 9 TS 客户端触发的时间
  • 10 ADMID 广告素材
  • 11 ADID 广告主
  • 12 APPNAME 应用名称

分布式存储-原始记录备份

从CSV日志生成的数据源需要做原始文档的备份存储,使用HDFS,而kafka流数据则依据具体情况选择是否存入HDFS或者HIVE,还是直接清洗后,存入ClickHouse等。

将CSV日志原始存档进HDFS的方式

  • 1、直接Put文件目录进hdfs文件系统
  • 2、使用Flume的spooling-to-hdfs,使用方法见databand-etl-flume中的spooling-memory-hdfs2.properties
  • 3、使用databand-job-springboot定时任务,类型为HdfsBackupJob。

将kafka存进HDFS的方式

  • 1、使用Flume的kafka-to-hdfs,使用方法见databand-etl-flume中的kafka-flume-hdfs.properties
  • 2、使用Flink或者Storm导入,例子见databand-etl-storm、databand-etl-flink
  • 3、使用kafka的客户端库和hdfs客户端库,自行开发。

分布式存储-数据仓库存档

产品表外部表,建表语句

CREATE EXTERNAL TABLE product(address STRING,buycount INT,buydiscount INT,buytotle INT,categoryid STRING,citycode STRING,color STRING,modelid STRING,productid STRING,saledatetime STRING,userid STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY ' ' STORED AS TEXTFILE LOCATION '/home/product';

节目表外部表,建表语句

CREATE EXTERNAL TABLE show(areacode STRING,channelid STRING,citycode STRING,columnid STRING,hd INT,showdatetime STRING,showduration INT,showid STRING,status STRING,userid STRING ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LINES TERMINATED BY ' ' STORED AS TEXTFILE LOCATION '/home/show';

– 可以用load data引入数据,覆盖引入: – LOAD DATA LOCAL INPATH '/home/product/2020-12-20.csv' – OVERWRITE INTO TABLE product;

– HDFS 文件 – hive> LOAD DATA INPATH '/home/product/2020-12-20.csv' – > OVERWRITE INTO TABLE product;

– 本地文件 – hive> LOAD DATA LOCAL INPATH '/home/product/2020-12-20.csv' – > OVERWRITE INTO TABLE product;

Count计数语句

  • 计算累计订单数:select count(1) from product;
  • 计算地区为广州的订单数:select count(1) from product where cityCode="广州";
  • 计算节目数:select count(1) from show;
  • 计算日志为破茧的记录数:select count(1) from show where showid="破茧";
  • 计算2020-12月的全部DELL电脑订单金额:select sum(buytotle) from product where modelid="DELL" and instr(saledatetime,"2020-12")>0;

可以测试一下hive输出结果: 

分析规划 - 统计指标规划

产品销售日志 统计规划

X轴维度 - key

  • 时间维度: 年、季、月、周
  • 产品分类维度:按产品类型,如电视、PC
  • 按产品型号规格维度
  • 按城市分组维度
  • 按购买者维度

Y轴维度 - value

  • 订单数
  • 订单金额

指标

  • 产品各分类订单数,product_order_count_by_cate,按年、季、月、周、天
  • 产品各型号规格订单数,product_order_count_by_model,按年、季、月、天
  • 各城市分布订单数,product_order_count_by_city,按年、月
  • top20订购者订单数,product_order_count20_by_user,按年、月
  • 产品各分类订单金额,product_order_amount_by_cate,按年、季、月、周、天
  • 产品各型号规格订单金额,product_order_amount_by_model,按年、季、月、天
  • 各城市分布订单金额,product_order_amount_by_city,按年、月
  • top20订购者订单金额,product_order_amount20_by_user,按年、月
节目播出日志 统计规划

X轴维度 - key

  • 时间维度: 年、季、月、周
  • 城市维度
  • 频道维度
  • 节目维度
  • 用户维度

Y轴维度 - value

  • 播放时长
  • 播放次数

指标

  • 按城市分组播放时长,show_dration_by_city,按年、季、月、周、天
  • 按频道分组播放时长,show_dration_by_channel,按年、季、月
  • 按节目top20播放时长,show_dration20_by_show,按年、月
  • 按用户top20播放时长,show_dration20_by_user,按年、月
  • 按城市分组播放次数,show_times_by_city,按年、季、月、周、天
  • 按频道分组播放次数,show_times_by_channel,按年、季、月
  • 按节目top20播放次数,show_times20_by_show,按年、月
  • 按用户top20播放次数,show_times20_by_user,按年、月
搜索热词日志 统计规划

待完成

广告监测日志 统计规划

待完成

批处理统计分析

产品销售日志 批处理统计分析计算

产品各分类订单数(按天),hive sql

  • select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;

产品各分类订单数(按天,指定某天,用于增量定时任务导出统计)

  • select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime having saledatetime="2020-12-30" order by saledatetime ;

其他分析查询SQL略,按天统计的数据都有了,按周、月、季、年就以此聚合。

导出结果到本地文件,相同记录则覆盖

use default;
-- Save to [LOCAL]
INSERT OVERWRITE LOCAL DIRECTORY '/home/product_order_count_by_cate'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- SQL
select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;

导出结果到HDFS,相同记录则覆盖

use default;
-- Save to HDFS
INSERT OVERWRITE DIRECTORY '/home/product_order_count_by_cate'
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
-- SQL
select categoryid,saledatetime,sum(buycount) from product group by categoryid,saledatetime order by saledatetime;

执行完之后可以查看hdfs的记录是否已经保存

  • hadoop fs -tail /home/product_order_count_by_cate/000000_0

节目播出日志 批处理统计分析计算

批处理定时任务

databand-job-springboot:定时任务作业调度服务,支持Shell,Hadoop MR,HiveSQL,Python,Spark,Flink,JavaJar任务。

  • 注入见TaskConfig的方法scheduledTaskJobMap() 的例子,目前仅提供java注入,未来有数据库加载注入和配置文件注入
  • 命令行任务,CommandExecuteJob的实例
  • 原始记录备份(从本地,从数据源中备份原始数据到HDFS,HdfsBackupJob;
  • 原始记录备份(到本地,从HDFS数据源中备份原始数据到本地文件,HdfsToLocalFileJob;
  • Hive SQL任务,HiveSqlQueryJob,hive执行DQL查询任务,需要返回数据集,并对数据集进行分析数据库存储,存储的数据用于报表图表等展现,必须实现SavableTaskJob接口;
  • Hive SQL任务,HiveSqlExecuteJob,hive执行脚本任务,用于DDL、DML操作,比如load data等;
  • 统计分析计算,Hadoop中运行MR,执行处理,HadoopMRJob;
  • 更多任务类型,不一一列出。

其中每种类型都有针对各个统计指标的实例:JobInstances 和 JobType是多对一的关系。

运行方式

  • 1、先导入数据:databand_scheduletask.sql
  • 2、查看任务: http://localhost:8081/getAllSchedule

3、启动单一任务,目前还没有统一的管理界面,未来会开发完善

  • http://localhost:8081/start?jobcode=WindowsDir1
  • http://localhost:8081/start?jobcode=WindowsIP1
  • http://localhost:8081/start?jobcode=hdfs_product2020
  • http://localhost:8081/start?jobcode=hdfs_toLocal2020
  • http://localhost:8081/start?jobcode=hdfs_toLocal2020_1

流数据任务

databand-streamjob-springboot:流数据持久化任务


特别提示:本信息由相关用户自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


举报收藏 0评论 0
0相关评论
相关最新动态
推荐最新动态
点击排行
{
网站首页  |  关于我们  |  联系方式  |  使用协议  |  隐私政策  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号