模板聚合的翻譯 【翻譯】Flink Table Api & SQL — 性能調(diào)優(yōu) — 流式聚合
日期:2023-03-11 12:39:36 / 人氣: 530 / 發(fā)布者:成都翻譯公司
在此頁面中,我們將介紹一些有用的優(yōu)化選項以及流聚合的內(nèi)部原理,這將在某些情況下帶來很大的改進(jìn)。注意:當(dāng)前,僅對無邊界聚合支持流聚合優(yōu)化。小型批處理聚合的核心思想是將一組輸入緩存在聚合運算符內(nèi)部的緩沖區(qū)中。數(shù)據(jù)流中的記錄可能會傾斜,因此聚合運算符的某些實例會比其他實例處理更多的記錄,這會導(dǎo)致熱點。注意:但是,當(dāng)前,拆分優(yōu)化不支持包含用戶定義的AggregateFunction的聚合。本文翻譯自官網(wǎng):Streaming Aggregation
Flink Table Api & SQL 翻譯目錄
SQL 是*廣泛使用的數(shù)據(jù)分析語言。Flink 的 Table API 和 SQL 使用戶能夠以更少的時間和精力定義高效的流分析應(yīng)用程序。而且,F(xiàn)link Table API 和 SQL 都得到了有效的優(yōu)化,集成了很多查詢優(yōu)化和優(yōu)化算子的實現(xiàn)。但是,并非所有優(yōu)化都默認(rèn)啟用,因此對于某些工作負(fù)載,您可以通過打開某些選項來提高性能。
在這個頁面中,我們將介紹一些有用的優(yōu)化選項和流聚合的內(nèi)部原理模板聚合的翻譯,它們在某些情況下會帶來很大的改進(jìn)。
注意:目前只有 Blink planner 支持本頁提到的優(yōu)化選項。
注意:目前,流聚合優(yōu)化僅支持無邊界聚合。以后會支持窗口聚合的優(yōu)化。
默認(rèn)情況下,無界聚合算子對輸入的記錄進(jìn)行一條一條的處理,即(1)從狀態(tài)中讀取累加器,(2)將記錄累加/收回到累加器中,(3)將累加器寫回狀態(tài),(4)下一條記錄會從(1))再次處理。這種處理方式可能會增加StateBackend的開銷(尤其是RocksDB StateBackend)。另外,生產(chǎn)中很常見的數(shù)據(jù)傾斜會加劇問題并使工作更容易受到背壓情況的影響。
小批量聚合
小批量聚合的核心思想是在聚合算子內(nèi)部的緩沖區(qū)中緩存一組輸入。當(dāng)輸入被觸發(fā)進(jìn)行處理時,每個鍵只需一個操作即可訪問狀態(tài)。這可以大大減少狀態(tài)開銷并獲得更好的吞吐量。但是,這可能會增加一些延遲,因為它會緩沖一些記錄而不是立即處理它們。這是吞吐量和延遲之間的權(quán)衡。
下圖說明了小批量聚合如何減少狀態(tài)操作。
默認(rèn)情況下禁用 MiniBatch 優(yōu)化。要啟用此優(yōu)化,您應(yīng)該設(shè)置 table.exec.mini-batch.enabled、table.exec.mini-batch.allow-latency 和 table.exec.mini-batch.size。請參閱配置頁面了解更多詳細(xì)信息。
以下示例顯示了如何啟用這些選項。
// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // enable mini-batch optimization
configuration.setString("table.exec.mini-batch.allow-latency", "5 s") // use 5 seconds to buffer input records
configuration.setString("table.exec.mini-batch.size", "5000") // the maximum number of records can be buffered by each aggregate operator task
本地全局聚合
提出將局部聚合分為兩個階段來解決數(shù)據(jù)傾斜的問題,即先在上游進(jìn)行局部聚合,然后在下游進(jìn)行全局聚合,類似于MapReduce中的Combine+Reduce模式. 例如,請考慮以下 SQL:
SELECT color, sum(id)
FROM T
GROUP BY color
數(shù)據(jù)流中的記錄可能會發(fā)生傾斜,因此聚合運算符的某些實例將處理比其他實例更多的記錄,這可能會導(dǎo)致熱點。本地聚合可以幫助將一定數(shù)量的具有相同密鑰的輸入累積到單個累加器中。全局摘要只會接收減少的累加器,而不是大量的原始輸入。這樣可以大大降低網(wǎng)絡(luò)重組和狀態(tài)訪問的成本。每個本地聚合累積的輸入數(shù)量基于*小批處理間隔。這意味著本地-全局聚合取決于啟用小批量優(yōu)化。
下圖顯示了本地全局聚合如何提高性能。
以下示例顯示如何啟用本地全局聚合。
// instantiate table environment
val tEnv: TableEnvironment = ...
// access flink configuration
val configuration = tEnv.getConfig().getConfiguration()
// set low-level key-value options
configuration.setString("table.exec.mini-batch.enabled", "true") // local-global aggregation depends on mini-batch is enabled
configuration.setString("table.exec.mini-batch.allow-latency", "5 s")
configuration.setString("table.exec.mini-batch.size", "5000")
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE") // enable two-phase, i.e. local-global aggregation
拆分不同的聚合
局部和全局優(yōu)化可以有效消除常規(guī)聚合的數(shù)據(jù)傾斜,如SUM、COUNT、MAX、MIN、AVG。然而,在處理不同的聚合反應(yīng)時,其性能并不令人滿意。
例如,如果我們要分析今天有多少獨立用戶登錄。我們可能有以下查詢:
SELECT day, COUNT(DISTINCT user_id)
FROM T
GROUP BY day
如果distinct key(即user_id)的值是稀疏的,COUNT DISTINCT不能減少記錄。即使啟用了局部和全局優(yōu)化,也無濟(jì)于事。因為累加器仍然包含幾乎所有的原始記錄,全局聚合將成為瓶頸(大部分重累加器由一個任務(wù)處理,即同一天)。
這個優(yōu)化的思路是把不同的聚合(比如COUNT(DISTINCT col))分成兩個層次。第一個聚合由組鍵和其他桶鍵打亂。用于計算桶鍵 HASH_CODE(distinct_key)% BUCKET_NUM。BUCKET_NUM 默認(rèn)為 1024,可以通過 table.optimizer.distinct-agg.split.bucket-num 選項進(jìn)行配置。第二次聚合由原始組鍵打亂,用于 SUM 聚合來自不同桶的 COUNT DISTINCT 值。由于相同的唯一鍵只會在同一個桶中計算,因此轉(zhuǎn)換是等效的。桶鍵作為額外的組鍵,分擔(dān)組鍵中熱點的負(fù)擔(dān)。Bucket 關(guān)鍵字使工作具有可擴(kuò)展性,以解決不同聚合中的數(shù)據(jù)傾斜/熱點問題。
拆分非重復(fù)聚合后,上述查詢將自動改寫為以下查詢:
SELECT day, SUM(cnt)
FROM (
SELECT day, COUNT(DISTINCT user_id) as cnt
FROM T
GROUP BY day, MOD(HASH_CODE(user_id), 1024)
)
GROUP BY day
下圖展示了拆分非重復(fù)聚合如何提高性能(假設(shè)顏色代表日期,字母代表user_id)。
注意:以上是*簡單的例子,可以從這個優(yōu)化中受益。此外,F(xiàn)link 支持拆分更復(fù)雜的聚合查詢,例如多個不同key的(如不同的集合COUNT(DISTINCT a)、SUM(DISTINCT b)),以及其他非重復(fù)的聚合工作(例如模板聚合的翻譯,總和、*大值、*小值、計數(shù))。
注意:但是,目前,拆分優(yōu)化不支持包含用戶定義的 AggregateFunction 的聚合。
以下示例顯示了如何啟用拆分不同聚合優(yōu)化。
// instantiate table environment
val tEnv: TableEnvironment = ...
tEnv.getConfig // access high-level configuration
.getConfiguration // set low-level key-value options
.setString("table.optimizer.distinct-agg.split.enabled", "true") // enable distinct agg split
在不同的聚合上使用 FILTER 修飾符
在某些情況下,用戶可能需要從不同維度計算 UV(唯一身份訪問者)的數(shù)量,例如 Android UV、iPhone UV、Web UV 和總 UV。很多用戶會選擇CASE WHEN來支持這個功能,例如:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT CASE WHEN flag IN ('android', 'iphone') THEN user_id ELSE NULL END) AS app_uv,
COUNT(DISTINCT CASE WHEN flag IN ('wap', 'other') THEN user_id ELSE NULL END) AS web_uv
FROM T
GROUP BY day
但是,在這種情況下,建議使用 FILTER 語法而不是 CASE WHEN。因為FILTER更符合SQL標(biāo)準(zhǔn),會得到更多的性能提升。FILTER 是聚合函數(shù)中使用的修飾符,用于限制聚合中使用的值。用 FILTER 修飾符替換上面的例子,如下所示:
SELECT
day,
COUNT(DISTINCT user_id) AS total_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('android', 'iphone')) AS app_uv,
COUNT(DISTINCT user_id) FILTER (WHERE flag IN ('wap', 'other')) AS web_uv
FROM T
GROUP BY day
Flink SQL 優(yōu)化器可以在同一個唯一鍵上識別不同的過濾參數(shù)。例如,在上面的示例中,所有三個 COUNT DISTINCT 都在 user_id 列上。那么 Flink 可以只使用一個共享狀態(tài)實例而不是三個狀態(tài)實例來減少狀態(tài)訪問和狀態(tài)大小。在某些工作負(fù)載中,這可以顯著提高性能。
相關(guān)閱讀Relate
熱門文章 Recent
- 建設(shè)銀行水單翻譯模板 中國建設(shè)銀行的專用字體是什2023-03-11
- 轉(zhuǎn)學(xué)推薦信英文萬能模板及翻譯 留學(xué)申請之寫作推薦信必須注意的幾個問題2023-03-11
- 病毒rna可直接作為翻譯的模板 什么是單股正鏈RNA?2023-03-11
- 英語作文高考模板帶翻譯 高中生英語作文范文100字帶翻譯2023-03-11
- 謝菲爾德學(xué)位證書翻譯模板 在英國本科留學(xué)費用2023-03-11
- 內(nèi)華達(dá)州結(jié)婚證英譯漢翻譯模板 美國結(jié)婚證翻譯模板2023-03-11
- 外國單位推薦信模板翻譯 留學(xué)申請推薦信,怎么寫才能更有效果?2023-03-11
- 翻譯專業(yè)資格水平考試報名表模板 2018山西省全國翻譯專業(yè)資格(水平)考試筆譯考試合格審核公告2023-03-11
- mrna翻譯原料和模板2023-03-11
- 翻譯公司翻譯模板 特思翻譯公司項目計劃書【*新范本模板】2023-03-11