logo头像

Edward.K Thinking

Azure Data Factory讓資料從Cosmos DB傳送到Azure SQL Database


當我們把非結構化資料大量存放到Cosmos DB後,再來就要做數據分析,在分析前,通常第一件事情就要整理資料,且把資料進行結構化的整理,這件事情有很多種作法,這裡進行其中一種作法,就是把Cosmos DB資料匯入到Azure SQL Database中,中間測試過幾種方法,如果只是小量資料方法倒是很多,不過,今日要轉拋資料是將每天八十萬筆的資料進行轉拋。目前看來在Azure上要做這樣事情大概使用Azure Data Factory是比較容易些。

甚麼是Azure Data Factory?簡單來說它就是資料轉拋的一種ETL工具,在地端就類似SSIS這種ETL工具,雖然,現在Azure Data Factory也在V2版本,也可以支援以開發好的SSIS放到Azure Data Factory執行了,不過,這邊還是介紹使用原本就在Azure Data Factory的模組來做ETL的流程。

在開發Azure Data Factory的ETL有三個區塊要做設定

  • 建立資料庫的連線,又可以叫做連接器
  • 建立連接資料庫在Azure Data Factory內的資料集
  • 建立資料搬移的流程或動作

建立連接器


本篇案例是建立Cosmos DB資料轉移到Azure SQL DataBase,所以,我們要針對兩個資料源建立連接器。在建立連接器時候,會因為你選得資料源不同,導致其設定的屬性都不盡相同。首先我們先設定連接到Cosmos DB

如果今日在同一個訂閱帳號,就可以直接用選的,就選到你想要的連接的Cosmos DB,並且同時會帶入必要的連線資訊,如果今天不是同一個訂閱帳戶,就必須輸入Cosmos DB的URL和Key才有辦法連線

設定好該屬性就可以建立好連接器,連接器在這邊定義就是屬於資料連線的意思,另一個連接器是Azure SQL Database,其做法也是相同的,此外,在Azure Data Factory這些設定本身也是透過Json的方式去定義,所以,就剛剛Cosmos DB的建立連接器,可以看到其json格式如下

1
2
3
4
5
6
7
8
9
10
11
{
"name": "CosmosDb1",
"type": "Microsoft.DataFactory/factories/linkedservices",
"properties": {
"type": "CosmosDb",
"typeProperties": {
"connectionString": "AccountEndpoint=XXXX;",
"encryptedCredential": "XXX"
}
}
}

換句話說,如果今日不透過介面去修改,直接在改json檔案也是可以,這一點在後續提到Pipeline部分時候,這是一個相當好用的一個作法

建立資料集


建立資料集主要目的在於你可以決定哪些資料是需要轉拋的,舉例來說你對應的資料庫的資料表有12個欄位,但是在資料集中你定義5個欄位,實際上會轉拋就是這五個欄位的資料而已。因為資料轉拋有來源端和目的地端,相對的資料集也必須建立兩個地方,因此,我們就必須建立CosmosDB資料集,透過資料集來決定哪些資料欄位要轉拋到Azure SQL DataBase

在資料集中也必須選對資料源,這主要目的在於建立Data Schema部分,需要去定義資料欄位的格式或形態,不同資料源的格式與型態是不同的,所以,並不能全部通用,因此必須選到對的才可以

依舊是選擇CosmosDB,設定部分比較重要的就是兩塊,分別是Connection和Schema

  • Connection : 設定你要使用哪個連接器,並對應到相關的資料庫
  • Schema : 建立要儲存資料的資料表,這邊可以用匯入的方式,將資料集從資料庫匯入,就可以省去自己建立Schema的痛苦,不過,目前對於結構化的資料庫比較有效,非結構化資料庫基本上資料欄位不會正確被放入,還是需要人工填入,如果是人工填入,務必資料欄位名稱要跟到時候從資料庫取得的資料欄位名稱要相同

在建立資料集同時,如果有一種情況是來源端的資料集,有可能被切分到不同目的端,這時候可以在Pipelines做,不需要建立兩個類似的資料集

建立Pipeline (Action)


當前前面兩項都做好後,再來就是把這些材料給組合起來。可以看到在 Pipeline中可以使用的Action Task是很多的,不過,一般來說最常用的大概是Copy Data這個

而這次的Case也必須透過這個Task才可以完成,如果只是要做Copy Data部分,在原本Azure Data Factory中也提供精靈模式讓我們快速建立,不過,我不建議這樣做,還是一步一步來會比較穩一些

如果要做Copy Data,就拉一個Copy Data資料過來,紅色框表示是需要設定地方

  • Source : 資料來源端
  • Sink : 資料目的端
  • Mappling : 來源與目的端的欄位mapping,如果兩端欄位名稱一樣,會自動幫你Mapping好,如果不同,則必須自行做調整
  • Setting : 定義資料在兩端傳輸過程的資料傳輸設定

因為這次我們是用Cosmos DB,所以在User Query部分,我建議使用Query語法,畢竟對於Cosmos DB來說沒有明確的Table名稱,且有些欄位可能是槽狀資料集,也不適合與Azure SQL Database進行轉拋,透過User QUery可以撰寫SQL語法取得想要資料進行轉拋

在來源端選擇Azure SQL Database 資料集,這邊也可以執行Stored Procedure,另外,比較重要的在於Write Batch szie,這代表從來源端要蒐集多少資料後才會開始轉拋到目的端,例如初始化是10K,表示要等到一萬筆才會寫入目的端,這時候大多數會被Keep在Azure Data Factory

在Mapping部分,雖然Dataset有欄位,我們也可以設定在這個Pipeline中是否需要使用到這欄位的Maaping

當一切都設定完畢就可以使用Trigger進行資料拋轉,當然也可以設定排程去定時跑這個Pipeline。此外,所有的設定要儲存,一定要按上方的Publish才可以,不然不會被儲存,此外,就如我前面提到,這部分也是透過Json方式去設定,所以,要修改東西直接改Json檔案也可以,在這裡會比較建議大方向透過UI設定,細節部分透過Json修改會比較好

最後


目前感覺Azure Data Factory還不算很穩定,不穩定地方在於介面操作,有時候會發生明明都弄好,但是介面儲存後依舊還是舊的情況,但是如果透過Json去改就沒問題,另外,在本次案例中因為需要先抓取Cosmos時間作為參數,給後面的User Query,因此,在Azure Data Factory中也可以應用變數來當作User Query裡面的參數,像是下面圖,我想先從Cosmos DB取得時間資料,並把時間資料傳給下面Copy Data作為參數使用

這裡我們會用到兩種參數,分別是

  • @{pipeline().parameters.StartDate}
  • @{activity(‘QueryDate’).output.firstRow.ydate}

第一個參數是代表這pipeline的參數,可以在整個pipeline去設定這個參數叫做StartDate,這樣在Trigger時候,就可以指定內容給Task用,在第一個Lookup中,想要找Cosmos DB的時間,其寫法如下

1
SELECT @{pipeline().parameters.StartDate} as ydate FROM c

其中@{pipeline().parameters.StartDate}可以設定取得當下時間的Function,又或是臨時自己想要跑某天資料,也可以在pipeline參數中輸入想要的日期,這參數可以讓你在手動觸發時候,自己輸入想要的參數,這樣在於自動化或是手動都會有比較好的維護,自動化就是固定抓前一天的資料,手動則是輸入資料,如:2011-01-01。記住在Pipeline那邊必須設定StartDate的參數

當我們在lookup那邊取得資料時候,就必須要到Line的Task中去設定來接受lookup所得到的值,作為Line Task來使用,我們可以透過其參數是@{activity('QueryDate').output.firstRow.ydate},其中QueryDate代表Lookup的名字,表示你要從哪個Task取出資料,ydate當然就查詢出來的資料欄位名稱,因此,把@{activity('QueryDate').output.firstRow.ydate}放到後面想要的地方就可以。目前測試過一次轉拋720萬筆資料大約需要90分鐘時間,不過,這時間主要會取決於來源端與目的端的pool有多大。

因此,透過這樣方式算是可以很快將資料做轉拋的動作,尤其是在不同類型資料庫上

上一篇