導(dǎo)讀:阿里云 EMR 團隊和趣頭條的大數(shù)據(jù)團隊共同研發(fā)了 RSS,解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎(chǔ)組件。
作者 | 王振華、曹佳清、范振
業(yè)務(wù)場景與現(xiàn)狀
趣頭條是一家依賴大數(shù)據(jù)的科技公司,在 2018~2019 年經(jīng)歷了業(yè)務(wù)的高速發(fā)展,主 App 和其他創(chuàng)新 App 的日活增加了 10 倍以上,相應(yīng)的大數(shù)據(jù)系統(tǒng)也從最初的 100 臺機器增加到了 1000 臺以上規(guī)模。多個業(yè)務(wù)線依賴于大數(shù)據(jù)平臺展開業(yè)務(wù),大數(shù)據(jù)系統(tǒng)的高效和穩(wěn)定成了公司業(yè)務(wù)發(fā)展的基石,在大數(shù)據(jù)的架構(gòu)上我們使用了業(yè)界成熟的方案,存儲構(gòu)建在 HDFS 上、計算資源調(diào)度依賴 Yarn、表元數(shù)據(jù)使用 Hive 管理、用 Spark 進行計算,具體如圖 1 所示:
圖 1 趣頭條離線大數(shù)據(jù)平臺架構(gòu)圖
其中 Yarn 集群使用了單一大集群的方案,HDFS 使用了聯(lián)邦的方案,同時基于成本因素,HDFS 和 Yarn 服務(wù)在 ECS 上進行了 DataNode 和 NodeManager 的混部。
在趣頭條每天有 6W 的 Spark 任務(wù)跑在 Yarn 集群上,每天新增的 Spark 任務(wù)穩(wěn)定在 100 左右,公司的迅速發(fā)展要求需求快速實現(xiàn),積累了很多治理欠債,種種問題表現(xiàn)出來集群穩(wěn)定性需要提升,其中 Shuffle 的穩(wěn)定性越來越成為集群的桎梏,亟需解決。
當(dāng)前大數(shù)據(jù)平臺的挑戰(zhàn)與思考
近半年大數(shù)據(jù)平臺主要的業(yè)務(wù)指標是降本增效,一方面業(yè)務(wù)方希望離線平臺每天能夠承載更多的作業(yè),另一方面我們自身有降本的需求,如何在降本的前提下支撐更多地業(yè)務(wù)量對于每個技術(shù)人都是非常大地挑戰(zhàn)。熟悉 Spark 的同學(xué)應(yīng)該非常清楚,在大規(guī)模集群場景下,Spark Shuffle 在實現(xiàn)上有比較大的缺陷,體現(xiàn)在以下的幾個方面:
- Spark Shuffle Fetch 過程存在大量的網(wǎng)絡(luò)小包,現(xiàn)有的 External Shuffle Service 設(shè)計并沒有非常細致的處理這些 RPC 請求,大規(guī)模場景下會有很多connection reset 發(fā)生,導(dǎo)致 FetchFailed,從而導(dǎo)致 Stage 重算。
- Spark Shuffle Fetch 過程存在大量的隨機讀,大規(guī)模高負載集群條件下,磁盤 IO 負載高、CPU 滿載時常發(fā)生,極容易發(fā)生 FetchFailed,從而導(dǎo)致 stage 重算。
- 重算過程會放大集群的繁忙程度,搶占機器資源,導(dǎo)致惡性循環(huán)嚴重,SLA 完不成,需要運維人員手動將作業(yè)跑在空閑的Label集群。
- 計算和 Shuffle 過程架構(gòu)不能拆開,不能把 Shuffle 限定在指定的集群內(nèi),不能利用部分 SSD 機器。
- M*N 次的 shuffle 過程:對于 10K mapper、5K reducer 級別的作業(yè),基本跑不完。
- NodeManager 和 Spark Shuffle Service 是同一進程,Shuffle 過程太重,經(jīng)常導(dǎo)致 NodeManager 重啟,從而影響 Yarn 調(diào)度穩(wěn)定性。
以上的這些問題對于 Spark 研發(fā)同學(xué)是非常痛苦的,好多作業(yè)每天運行時長方差會非常大,而且總有一些無法完成的作業(yè),要么業(yè)務(wù)進行拆分,要么跑到獨有的 Yarn 集群中。除了現(xiàn)有面臨的挑戰(zhàn)之外,我們也在積極構(gòu)建下一代基礎(chǔ)架構(gòu)設(shè)施,隨著云原生 Kubernetes 概念越來越火,Spark 社區(qū)也提供了 Spark on Kubernetes 版本,相比較于 Yarn 來說,Kubernetes 能夠更好的利用云原生的彈性,提供更加豐富的運維、部署、隔離等特性。但是 Spark on Kubernetes 目前還存在很多問題沒有解決,包括容器內(nèi)的 Shuffle 方式、動態(tài)資源調(diào)度、調(diào)度性能有限等等。我們針對 Kubernetes 在趣頭條的落地,主要有以下幾個方面的需求:
- 實時集群、OLAP 集群和 Spark 集群之前都是相互獨立的,怎樣能夠?qū)⑦@些資源形成統(tǒng)一大數(shù)據(jù)資源池。通過 Kubernetes 的天生隔離特性,更好的實現(xiàn)離線業(yè)務(wù)與實時業(yè)務(wù)混部,達到降本增效目的。
- 公司的在線業(yè)務(wù)都運行在 Kubernetes 集群中,如何利用在線業(yè)務(wù)和大數(shù)據(jù)業(yè)務(wù)的不同特點進行錯峰調(diào)度,達成 ECS 的總資源量最少。
- 希望能夠基于 Kubernetes 來包容在線服務(wù)、大數(shù)據(jù)、AI 等基礎(chǔ)架構(gòu),做到運維體系統(tǒng)一化。
因為趣頭條的大數(shù)據(jù)業(yè)務(wù)目前全都部署在阿里云上,阿里云 EMR 團隊和趣頭條的大數(shù)據(jù)團隊進行了深入技術(shù)共創(chuàng),共同研發(fā)了 Remote Shuffle Service(以下簡稱 RSS),旨在解決 Spark on Yarn 層面提到的所有問題,并為 Spark 跑在 Kubernetes 上提供 Shuffle 基礎(chǔ)組件。
Remote Shuffle Service 設(shè)計與實現(xiàn)
- Remote Shuffle Service 的背景
早在 2019 年初我們就關(guān)注到了社區(qū)已經(jīng)有相應(yīng)的討論,如 SPARK-25299。該 Issue 主要希望解決的問題是在云原生環(huán)境下,Spark 需要將 Shuffle 數(shù)據(jù)寫出到遠程的服務(wù)中。但是我們經(jīng)過調(diào)研后發(fā)現(xiàn) Spark 3.0(之前的 master 分支)只支持了部分的接口,而沒有對應(yīng)的實現(xiàn)。該接口主要希望在現(xiàn)有的 Shuffle 代碼框架下,將數(shù)據(jù)寫到遠程服務(wù)中。如果基于這種方式實現(xiàn),比如直接將 Shuffle 以流的方式寫入到 HDFS 或者 Alluxio 等高速內(nèi)存系統(tǒng),會有相當(dāng)大的性能開銷,趣頭條也做了一些相應(yīng)的工作,并進行了部分的 Poc,性能與原版 Spark Shuffle 實現(xiàn)相差特別多,最差性能可下降 3 倍以上。同時我們也調(diào)研了一部分其他公司的實現(xiàn)方案,例如 Facebook 的 Riffle 方案以及 LinkedIn 開源的 Magnet,這些實現(xiàn)方案是首先將 Shuffle 文件寫到本地,然后在進行 Merge 或者 Upload 到遠程的服務(wù)上,這和后續(xù)我們的Kubernetes架構(gòu)是不兼容的,因為 Kubernetes 場景下,本地磁盤 Hostpath 或者 LocalPV 并不是一個必選項,而且也會存在隔離和權(quán)限的問題。
基于上述背景,我們與阿里云 EMR 團隊共同開發(fā)了 Remote Shuffle Service。RSS 可以提供以下的能力,完美的解決了 Spark Shuffle 面臨的技術(shù)挑戰(zhàn),為我們集群的穩(wěn)定性和容器化的落地提供了強有力的保證,主要體現(xiàn)在以下幾個方面:
- 高性能服務(wù)器的設(shè)計思路,不同于 Spark 原有 Shuffle Service,RPC 更輕量、通用和穩(wěn)定。
- 兩副本機制,能夠保證的 Shuffle fetch 極小概率(低于 0.01%)失敗。
- 合并 shuffle 文件,從 M*N 次 shuffle 變成 N 次 shuffle,順序讀 HDD 磁盤會顯著提升 shuffle heavy 作業(yè)性能。
- 減少 Executor 計算時內(nèi)存壓力,避免 map 過程中 Shuffle Spill。
- 計算與存儲分離架構(gòu),可以將 Shuffle Service 部署到特殊硬件環(huán)境中,例如 SSD 機器,可以保證 SLA 極高的作業(yè)。
- 完美解決 Spark on Kubernetes 方案中對于本地磁盤的依賴。
- Remote Shuffle Service 的實現(xiàn)
- 整體設(shè)計
Spark RSS 架構(gòu)包含三個角色:Master、Worker、Client。Master 和 Worker 構(gòu)成服務(wù)端,Client 以不侵入的方式集成到 Spark ShuffleManager 里(RssShuffleManager 實現(xiàn)了 ShuffleManager 接口)。
- Master 的主要職責(zé)是資源分配與狀態(tài)管理。
- Worker 的主要職責(zé)是處理和存儲 Shuffle 數(shù)據(jù)。
- Client 的主要職責(zé)是緩存和推送 Shuffle 數(shù)據(jù)。
整體流程如下所示(其中 ResourceManager 和 MetaService 是 Master 的組件),如圖 2。
圖 2 RSS 整體架構(gòu)圖
- 實現(xiàn)流程
下面重點來講一下實現(xiàn)的流程:
- RSS 采用 Push Style 的 shuffle 模式,每個 Mapper 持有一個按 Partition 分界的緩存區(qū),Shuffle 數(shù)據(jù)首先寫入緩存區(qū),每當(dāng)某個 Partition 的緩存滿了即觸發(fā) PushData。
- Driver 先和 Master 發(fā)生 StageStart 的請求,Master 接受到該 RPC 后,會分配對應(yīng)的 Worker Partition 并返回給 Driver,Shuffle Client 得到這些元信息后,進行后續(xù)的推送數(shù)據(jù)。
- Client 開始向主副本推送數(shù)據(jù)。主副本 Worker 收到請求后,把數(shù)據(jù)緩存到本地內(nèi)存,同時把該請求以 Pipeline 的方式轉(zhuǎn)發(fā)給從副本,從而實現(xiàn)了 2 副本機制。
- 為了不阻塞 PushData 的請求,Worker 收到 PushData 請求后會以純異步的方式交由專有的線程池異步處理。根據(jù)該 Data 所屬的 Partition 拷貝到事先分配的 buffer 里,若 buffer 滿了則觸發(fā) flush。RSS 支持多種存儲后端,包括 DFS 和 Local。若后端是 DFS,則主從副本只有一方會 flush,依靠 DFS 的雙副本保證容錯;若后端是 Local,則主從雙方都會 flush。
- 在所有的 Mapper 都結(jié)束后,Driver 會觸發(fā) StageEnd 請求。Master 接收到該 RPC 后,會向所有 Worker 發(fā)送 CommitFiles 請求,Worker 收到后把屬于該 Stage buffer 里的數(shù)據(jù) flush 到存儲層,close 文件,并釋放 buffer。Master 收到所有響應(yīng)后,記錄每個 partition 對應(yīng)的文件列表。若 CommitFiles 請求失敗,則 Master 標記此 Stage 為 DataLost。
- 在 Reduce 階段,reduce task 首先向 Master 請求該 Partition 對應(yīng)的文件列表,若返回碼是 DataLost,則觸發(fā) Stage 重算或直接 abort 作業(yè)。若返回正常,則直接讀取文件數(shù)據(jù)。
總體來講,RSS 的設(shè)計要點總結(jié)為 3 個層面:
- 采用 PushStyle 的方式做 shuffle,避免了本地存儲,從而適應(yīng)了計算存儲分離架構(gòu)。
- 按照 reduce 做聚合,避免了小文件隨機讀寫和小數(shù)據(jù)量網(wǎng)絡(luò)請求。
- 做了 2 副本,提高了系統(tǒng)穩(wěn)定性。
- 容錯
對于 RSS 系統(tǒng),容錯性是至關(guān)重要的,我們分為以下幾個維度來實現(xiàn):
- PushData 失敗
當(dāng) PushData 失敗次數(shù)(Worker 掛了,網(wǎng)絡(luò)繁忙,CPU繁忙等)超過 MaxRetry 后,Client 會給 Master 發(fā)消息請求新的 Partition Location,此后本 Client 都會使用新的 Location 地址,該階段稱為 Revive。
若 Revive 是因為 Client 端而非 Worker 的問題導(dǎo)致,則會產(chǎn)生同一個 Partition 數(shù)據(jù)分布在不同 Worker 上的情況,Master 的 Meta 組件會正確處理這種情形。
若發(fā)生 WorkerLost,則會導(dǎo)致大量 PushData 同時失敗,此時會有大量同一 Partition 的 Revive 請求打到 Master。為了避免給同一個 Partition 分配過多的 Location,Master 保證僅有一個 Revive 請求真正得到處理,其余的請求塞到 pending queue 里,待 Revive 處理結(jié)束后返回同一個 Location。
- Worker 宕機
當(dāng)發(fā)生 WorkerLost 時,對于該 Worker 上的副本數(shù)據(jù),Master 向其 peer 發(fā)送 CommitFile 的請求,然后清理 peer 上的 buffer。若 Commit Files 失敗,則記錄該 Stage 為 DataLost;若成功,則后續(xù)的 PushData 通過 Revive 機制重新申請 Location。
- 數(shù)據(jù)去重
Speculation task 和 task 重算會導(dǎo)致數(shù)據(jù)重復(fù)。解決辦法是每個 PushData的數(shù)據(jù)片里編碼了所屬的 mapId、attemptId 和 batchId,并且 Master 為每個 map task 記錄成功 commit 的 attemtpId。read 端通過 attemptId 過濾不同的 attempt 數(shù)據(jù),并通過 batchId 過濾同一個 attempt 的重復(fù)數(shù)據(jù)。
- 多副本
RSS 目前支持 DFS 和 Local 兩種存儲后端。
在 DFS 模式下,ReadPartition 失敗會直接導(dǎo)致 Stage 重算或 abort job。在 Local 模式,ReadPartition 失敗會觸發(fā)從 peer location 讀,若主從都失敗則觸發(fā) Stage 重算或 abort job。
- 高可用
大家可以看到 RSS 的設(shè)計中 Master 是一個單點,雖然 Master 的負載很小,不會輕易地掛掉,但是這對于線上穩(wěn)定性來說無疑是一個風(fēng)險點。在項目的最初上線階段,我們希望可以通過 SubCluster 的方式進行 workaround,即通過部署多套 RSS 來承載不同的業(yè)務(wù),這樣即使 RSS Master 宕機,也只會影響有限的一部分業(yè)務(wù)。但是隨著系統(tǒng)的深入使用,我們決定直面問題,引進高可用 Master。主要的實現(xiàn)如下:
首先,Master 目前的元數(shù)據(jù)比較多,我們可以將一部分與 ApplD ShuffleId 本身相關(guān)的元數(shù)據(jù)下沉到 Driver 的 ShuffleManager 中,由于元數(shù)據(jù)并不會很多,Driver 增加的內(nèi)存開銷非常有限。
另外,關(guān)于全局負載均衡的元數(shù)據(jù)和調(diào)度相關(guān)的元數(shù)據(jù),我們利用 Raft 實現(xiàn)了 Master 組件的高可用,這樣我們通過部署 3 或 5 臺 Master,真正的實現(xiàn)了大規(guī)??蓴U展的需求。
實際效果與分析
- 性能與穩(wěn)定性
團隊針對 TeraSort、TPC-DS 以及大量的內(nèi)部作業(yè)進行了測試,在 Reduce 階段減少了隨機讀的開銷,任務(wù)的穩(wěn)定性和性能都有了大幅度提升。
圖 3 是 TeraSort 的 benchmark,以 10T Terasort 為例,Shuffle 量壓縮后大約 5.6T??梢钥闯鲈摿考壍淖鳂I(yè)在 RSS 場景下,由于 Shuffle read 變?yōu)轫樞蜃x,性能會有大幅提升。
圖 3 TeraSort 性能測試(RSS 性能更好)
圖 4 是一個線上實際脫敏后的 Shuffle heavy 大作業(yè),之前在混部集群中很小概率可以跑完,每天任務(wù) SLA 不能按時達成,分析原因主要是由于大量的 FetchFailed 導(dǎo)致 stage 進行重算。使用 RSS 之后每天可以穩(wěn)定的跑完,2.1T 的 shuffle 也不會出現(xiàn)任何 FetchFailed 的場景。在更大的數(shù)據(jù)集性能和SLA表現(xiàn)都更為顯著。
圖 4 實際業(yè)務(wù)的作業(yè) stage 圖(使用 RSS 保障穩(wěn)定性和性能)
- 業(yè)務(wù)效果
在大數(shù)據(jù)團隊和阿里云 EMR 團隊的共同努力下,經(jīng)過近半年的上線、運營 RSS,以及和業(yè)務(wù)部門的長時間測試,業(yè)務(wù)價值主要體現(xiàn)在以下方面:
- 降本增效效果明顯,在集群規(guī)模小幅下降的基礎(chǔ)上,支撐了更多的計算任務(wù),TCO 成本下降 20%。
- SLA 顯著提升,大規(guī)模 Spark Shuffle 任務(wù)從跑不完到能跑完,我們能夠?qū)⒉煌?SLA 級別作業(yè)合并到同一集群,減小集群節(jié)點數(shù)量,達到統(tǒng)一管理,縮小成本的目的。原本業(yè)務(wù)方有一部分 SLA比 較高的作業(yè)在一個獨有的 Yarn 集群 B 中運行,由于主 Yarn 集群 A 的負載非常高,如果跑到集群 A 中,會經(jīng)常的掛掉。利用 RSS 之后可以放心的將作業(yè)跑到主集群 A 中,從而釋放掉獨有 Yarn 集群 B。
- 作業(yè)執(zhí)行效率顯著提升,跑的慢→跑的快。我們比較了幾個典型的 Shuffle heavy 作業(yè),一個重要的業(yè)務(wù)線作業(yè)原本需要 3 小時,RSS 版本需要 1.6 小時。抽取線上 5~10 個作業(yè),大作業(yè)的性能提升相當(dāng)明顯,不同作業(yè)平均下來有 30% 以上的性能提升,即使是 shuffle 量不大的作業(yè),由于比較穩(wěn)定不需要 stage 重算,長期運行平均時間也會減少 10%-20%。
- 架構(gòu)靈活性顯著提升,升級為計算與存儲分離架構(gòu)。Spark 在容器中運行的過程中,將 RSS 作為基礎(chǔ)組件,可以使得 Spark 容器化能夠大規(guī)模的落地,為離線在線統(tǒng)一資源、統(tǒng)一調(diào)度打下了基礎(chǔ)。
未來展望
趣頭條大數(shù)據(jù)平臺和阿里云 EMR 團隊后續(xù)會繼續(xù)保持深入共創(chuàng),將探索更多的方向。主要有以下的一些思路:
- RSS 存儲能力優(yōu)化,包括將云的對象存儲作為存儲后端。
- RSS 多引擎支持,例如 MapReduce、Tez 等,提升歷史任務(wù)執(zhí)行效率。
- 加速大數(shù)據(jù)容器化落地,配合 RSS 能力,解決 K8s 調(diào)度器性能、調(diào)度策略等一系列挑戰(zhàn)。
- 持續(xù)優(yōu)化成本,配合 EMR 的彈性伸縮功能,一方面 Spark 可以使用更多的阿里云 ECS/ECI 搶占式實例來進一步壓縮成本,另一方面將已有機器包括阿里云 ACK、ECI 等資源形成統(tǒng)一大池子,將大數(shù)據(jù)的計算組件和在線業(yè)務(wù)進行錯峰調(diào)度以及混部。
版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻,該文觀點僅代表作者本人。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請發(fā)送郵件至 舉報,一經(jīng)查實,本站將立刻刪除。