推广 热搜: 行业  机械  设备    经纪  教师  系统  参数    蒸汽 

漫谈Clickhouse Join

   日期:2024-11-07     移动:http://yishengsujiao.xhstdz.com/quote/1481.html

 

漫谈Clickhouse Join

随着公司业务的不断发展,不同业务线数据都有了大规模积累。在此基础上为了精细化运营,更好地服务客户,就需要通过积累的数据沉淀出各类实体标签,比如用户标签、帖子标签、基金标签。

+系统应运而生,包括标签体系、个体画像、标签分群等主要功能模块。

标签体系: 通过可视化界面,结合业务场景需求,创建并维护用户标签、内容标签、标的标签,构建完善的标签体系。标签是数据平台的基础,个体画像、用户分群、群体洞察、用户分析都依托于标签,作为标签的统一管理平台,满足各业务线对标签的数据需求。

个体画像:以标签体系为基础,打造用户、标的、内容全景视图,全方位展现用户、标的、内容信息,对个体情况一目了然。基于个体画像可以充分利用沉淀的数据资产,对多维度海量用户数据进行整理和分析后形成精准用户洞察,进而赋能业务,实现精细化运营和精准营销。

标签分群:以标签为基础的人群服务输出,以API或数据下载方式为用户营销、产品运营提供服务。通过勾勒用户行为画像精准划分不同群体,筛选出所需要的特定用户群体用于精准投放、精准营销、个性化推送等场景,以满足不同用户的不同需求。

经过技术对比调研,最终雪+ 平台标签计算和存储引擎采用了 Clickhouse。

为什么我们选择 Clickhouse 呢?首先,Clickhouse 分布式集群能承载大数据存储和计算,现已广泛应用于各互联网公司,用户和社区活跃,能借鉴一系列大厂分享的生产环境最佳实践输出。天下武功,唯快不破,Clickhouse 作为在 OLAP 领域翘楚,向来以极致的速度闻名于世,能满足我们实时查询的需求。有别于市面上绝大部分开源 OLAP 引擎,Clickhouse 在一定程度上支持删除修改操作,在数据处理上能满足我们灵活删改标签数据的需求。Clickhouse 适合离线批量导入,也支持实时写入数据,离线和实时标签的需求都可以得到支持。

Clickhouse 单表查询性能极佳,一般业务都会先ETL成大宽表导入 Clickhouse中提供实时查询。而我们也是这么做的,不过每个业务线实体一张宽表,比如雪球社区用户标签宽表、蛋卷基金用户标签宽表、社区帖子标签宽表、股票标签宽表。

作为性能强悍到爆表的 OLAP 组件,Clickhouse 几乎能够覆盖雪+ 平台所有场景的需求,不过在一个地方触碰到了其最薄弱的地方 Join。

接下来我们一起来了解一下 Clickhouse Join 需要注意的优化技巧和原理吧。若有不对之处,欢迎各路大佬指正,在此仅为抛砖引玉。

随着业务发展,出现了需要涉及表关联的情况,而 Join 正是 Clickhouse 的阿喀琉斯之踵。当在Clickhouse 上遇到 Join 需求时,大家往往都会眉头紧皱。有别于单表查询上的惊艳,Clickhouse 在Join方面乏善可陈,速度一不小心由火箭飞天变成乌龟爬爬。

我们用户分群功能在创建分群时,在创建规则里可选择不同的标签条件组合,同时需要支持跨业务线实体圈选。不过由于数据合规需要,各部分数据不能整合成一张宽表。

能够实现业务需求的 SQL 伪代码如下,需要把业务线1用户标签表关联 ID maping表再关联业务线2用户标签表,三张表关联寻找标签码 k000088 = '稳健型',即业务线1用户标签风险承受力为稳健型的业务线2用户。

下面SQL中所有表名和字段名非实际名称,只是代指。每张表中数据每天按T+1粒度同步前一天 Hive中的数据快照并且按天分区,其中业务线1用户表 user_label_1 单分区数据量约六千万,业务线2用户表 user_label_2和 ID 映射表 id_maping 单分区数据量约两百万

 

 此时执行会报错,经错误信息提示,原来 Clickhouse 分布式表 Join 需要加上 Global,即变成

 

再次执行,发现需要特别长时间,时延达到分钟级,完全无法忍受,继续优化。

我们把时间分区条件放到子查询里先过滤出一批小数据集,同时把小表放join的右边。

 

经过优化,Join 操作可以在1秒之内返回数据,不过为什么可以这么优化呢

我们下面就来通过分析 Clickhouse 的 Join 的实现来回答这个问题。

对源码有兴趣的小伙伴,可以先跟着我看一点源码分析实验。

在本地Mac上搭建拥有两个分片,每个分片两个副本共四个节点的伪分布式集群,并且用 C++ 的IDE Clion 绑定四个进程监控四个数据库的执行过程。

选择什么地方作为切入点来跟踪观察代码呢

先铺垫一点 Clickhouse 基本运行原理的知识。

上图是 Clickhouse 上执行 SQL 的大致流程,我们一般以 TCP 客户端连接 Clickhouse,TCPHandler 中主要方法为 runImpl,核心处理如下

 

在我们 executeQuery 方法输出的 state.io 中有一个 QueryPipeline 对象即数据加工处理的流水线。

Processor 是处理 SQL 中各步骤数据的基本单元,Source、Transform、Sink 都继承自 IProcessor,都可作为一个 Processor 被加入到流水线中。在 Plan 阶段,会通过 Pipeline 把算子Transform 编排起来交给 Executer 按照计划执行。数据从 Source 流经编排好顺序的 Transform 链路经过加工处理输出到 Sink。

当我们对SQL执行过程头绪不多的时候,可以通过查看Pipeline计划来找 Transform。

我们可以通过 explain pipeline SQL 语句查看 pipeline

 

 

可以看到,其中使用到了 JoiningTransform,所以我们通过快捷键⌘ + ⇧ + O即 Shift + Command + O 找到 JoiningTransform.cpp

随便找个方法比如 Block JoiningTransform::transformHeader(Block header, const JoinPtr & join),打个断点,再在 Clickhouse Client输入调试 Join SQL即可发现程序在断点处停顿

我们观察,在断点处有一个指针join,执行了其中的方法

joinBlock(Block & block, std::shared_ptr & not_processed)

我们按住⌘键并点击该方法,进入了 IJoin.h 头文件,该方法为抽象类 IJoin 的抽象方法

我们按住⌘ + ⌥并点击该方法,发现有三个实现该方法的类,分别是 HashJoin、 MergeJoin和 JoinSwitcher

我们再来看看 JoinSwitcher 是什么。在全局搜索 JoinSwitcher,找到其构造方法调用附近,打断点重新进入调试

原来,在语意分析类 expressionAnalyzer 中有个判断

当 analyzed_join->forceHashJoin() || (analyzed_join->preferMergeJoin() && !allow_merge_join)时直接走 HashJoin

当 analyzed_join->forceMergeJoin() || (analyzed_join->preferMergeJoin() && allow_merge_join)时走 MergeJoin

 

 当 join_algorithm 设置为 JoinAlgorithm::AUTO 时,会走 JoinSwitcher,该类开头注释写着

 

即根据内存是否能够放下右表来自动判断

 

 

默认走 HashJoin,若 limits.softCheck(rows, bytes) 检测到超过预先设置的 max_rows 或 max_bytes,通过 switchJoin()方法切换到 MergeJoin。

我们回过头来再看 IJoin 类,其中有两个重要方法 addJoinedBlock 和 joinBlock。

 

通过注释我们就可以看出,addJoinedBlock 方法用于把右边的表加载并 Built Hash,而addJoinedBlock 方法用于把左表数据与之前 Built 好的右表 Hash 数据关联。

我们再来观察一下分布式情况下Join和Global Join的区别。

在executeQuery.cpp 的方法 executeQueryImpl 第687行代码 query_log->add(elem) 处会记录下该数据库实例执行的 SQL,包括自己主动发起的和收到远程请求被动发起的本地表查询。

在各进程挂起时观察右表本地表子查询的次数,可以发现普通 Join 在两个 Shard 上被动各多执行了一次,而 Globle Join 在不含初始执行节点的另一个 Shard 上多被动执行了一次。

Clickhouse 有极其丰富的表引擎,其中按照表数据是否分布在不同节点上分为本地表和分布式表。

本地表和单机数据库一样,数据只在一台数据库服务器上,其 Join 为单机 Join。

分布式表基于本地表,看上去是在本地表基础上套了个壳,数据读写都可以经过分布式表,而实际存储在各个数据库服务器底层的本地表上,其 Join 为分布式 Join,实现算法有 HashJoin、MergeJoin两种。

哈希Join算法

一般情况下最高效的 Join 实现,所以 Clickhouse 默认选择这种方式。哈希连接的执行过程包括建立阶段(Build Phase)和探测阶段(Probe Phase)。

建立阶段:选择一张表,对 Join Key 做哈希建立 Hash Map,全部放在内存里。Clickhouse 会一直以右表来建立Hash Map。

探测阶段:另一个表作为探测表,扫描它的每一行并计算 Join Key 的哈希值,在 Hash Map 中试探是否可以 Get(key)。考虑到哈希碰撞,若哈希值相等,需要再判断一次原始 Key 值是否相等,返回最终满足条件的记录。

排序归并 Join 算法

排序归并算法就是 Sort-Merge Join,也被称为 Merge Join。可以分为排序和归并两个阶段。

排序:就是对两表根据 Join Key 进行排序

归并:因为两张表已经按照同样的顺序排列,两表各一次循环遍历即可。

A、B 两个表各设置一个指针,分别为 I 和 J,若A表第 I 行 Key 小于 B 表第 J 行 Key,则 I ++,否则 J ++,直到匹配到数据则添加到结果集。若表的记录本身就是有序的,或者 Join key 刚好是索引列,选择排序归并成本更低。

Clickhouse 的分布式 Join 查询可以分为两类,不带 Global 关键字的普通 Join,和带 Global 关键字分布式全局 Join。

普通JOIN实现

我们来看一下下面这条 SQL 在 Clickhouse 里是怎么实现的

 

假设集群有4个节点,分为2个分片,2个副本,其中 Node-1、Node-2 为一个分片 Shard-a,Node-3、Node-4 为一个分片 Shard-b。

有两张本地表 left_table_local 和 right_table_local,left_table_all 和 right_table_all 分别是两张本地表的分布式表。

普通Join 实现过程如下

1.客户端发送 SQL,找到一个执行节点,比如 Node-1

2.Node-1 触发一次分布式查询

Node-1将左表 left_table_all 解析替换为其本地表left_table_local,SQL 变为如下形式

 

该SQL在本节点和Shard-b其中一个节点比如 Node-3 执行一次

3.Node-1、Node-3 执行SQL时发现右表是分布式表又都触发一次分布式查询

向另一个分片发送查询请求

 

并将结果发送到其对应的另一个分片上,这样每个分片上都有全量右表right_table_all数据

4.节点 Node-1、Node-3 汇总右表数据形成 subquery 并执行

 

5.Node-3 执行完本地 Join之后,发送数据给 Node-1,Node-1 再汇总结果返回客户端

通过上述流程,可以发现,右表本地表查询了4次,即分布式表总数据查询了2次,存在读放大现象。

可以看出,Clickhouse 普通分布式 Join 查询是一个简单版的 Shuffle Join 的实现,或者说是一个不完整的实现。不完整的地方在于,并未按 Join Key 去 Shuffle 数据,而是每个节点全量拉取右表数据。

 

Global Join 实现

Global Join 实现过程如下

  1. 首先,客户端发送 SQL,找到一个执行节点,比如 Node-1
  2. 在Node-1 和 Shard-b 的一个节点比如 Node-4 上上执行将右表数据查询出来在Node-1上汇总为subquery
  3. 把 subquery 发送到 Shard-b 的一个节点比如 Node-3 上执行
  4. ,Node-3 执行完本地 Join 之后,发送数据给 Node-1
  5. Node-1 再汇总结果返回客户端

可以看出,Global Join 将右表所有本地的数据查询出来在 Node-1 节点上进行汇总,再通过网络发送到其他节点,避免其他节点重复计算,从而避免查询放大。

Global Join 可以看做一个不完整的 Broadcast Join 实现。如果 Join 的右表数据量较大,则会占用大量网络带宽,导致查询性能降低,所以这种情况只适合其中至少有一个表为小表的 Join。

现在我们了解了一下Join实现的细节,回过头来,就可以很自然地了解之前我们Clickhouse Join优化的原理了。Clickhouse 的 Join 一直是被诟病的地方,ClickHouse 单机 Join 操作默认采用 Hash Join算法,可选 Merge Join 算法,但又并未实现完整意义上的 Shuffle Join 和 Broadcast Join ,效率较低。

假设分片数为 N,Global Join 右表本地表查询次数为 N,而普通 Join 右表查询次数为 N 的平方。通过右表广播到所有节点的方式减小 Clickhouse 普通 Join 带来的读放大情况。

ClickHouse 根据 Join 的右表数据,构建 Hash Map,并将 SQL 中所需的列全部读入内存中。如果右表数据量过大,节点内存无法容纳后,无法完成计算。

另外特别值得注意的一点是,与其他有自动优化功能的查询引擎如 Hive、Spark 不同,Clickhouse 的查询优化器比较弱,Join操作的谓词不会主动下推,因此一定要先在子查询里做完过滤、聚合等操作,再在结果集上做Join。

总之,在实际使用中,我们需要使用Global Join代替Join ,并且将较小的表作为右表,尽可能增加过滤条件,减少进入Join计算的数据量。

作者:雪球工程师团队 链接:https://juejin.cn/post/7105938321326473252 来源:稀土掘金 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

本文地址:http://nhjcxspj.xhstdz.com/quote/1481.html    物流园资讯网 http://nhjcxspj.xhstdz.com/ , 查看更多

特别提示:本信息由相关企业自行提供,真实性未证实,仅供参考。请谨慎采用,风险自负。


0相关评论
相关行业动态
推荐行业动态
点击排行
网站首页  |  关于我们  |  联系方式  |  使用协议  |  版权隐私  |  网站地图  |  排名推广  |  广告服务  |  积分换礼  |  网站留言  |  RSS订阅  |  违规举报  |  鄂ICP备2020018471号