ETL: Data Fusion 轉換資料至 PostgreSQL
Categories:
資料透過 Data Fusion 處理過後,將處理過的資料存至 PostgreSQL
選擇資料來源
點選 Wrangler
在左側的資料來源選擇你的資料來源
- Database
- Kafka
- S3
- Google Cloud Storage
- Google BigQuery
- Google Cloud Spanner
- ADLS Handler
我這裡是選擇 Database
當作我要轉換資料的資料來源,點選後在右側選擇要轉換的資料表
使用 Wrangler 做資料轉換
選擇完資料表後會到 Wrangler
資料轉換頁,可以點選上方欄位下拉選單,對資料進行解析或變更資料類型
這裡因為 Sink 目標的資料表的欄位名稱是不同的,所以可以直接在上方將欄位名稱做異動變更
在 Wrangler 異動的任何資料不會去影響到原本來源資料庫的資料,Wrangler 會將資料擷取下來後在額外進行處理,而不會直接異動原始資料
欄位變更完後點選右上方 Create a Pipeline
,然後在彈跳出的視窗點選 Batch pipeline
建立資料解析流程
建立完成後會出現 Source 是剛剛選擇的資料庫,而點選 Wrangler 的 Properties
可以看到剛剛做的異動
異動的 Scripts 會寫在下方
keep id,english_given_name,english_family_name,created_at,updated_at
rename english_given_name english_first_name
rename english_family_name english_last_name
rename created_at signup_at
再重新點選 WRANGLE
按鈕可以回去原本的資料異動頁面
變更資料來源 SQL
點選資料來源 Database
的 Properties
按鈕可以做資料庫來源的細部設定
在下方可以看到 Connection String
,格式會長的像這樣
jdbc:postgresql://{ip-address}:{port}/{database-name}
jdbc:postgresql://1.1.1.1:5432/etl_database
而在下方的 Import Query
可以看到資料來源撈取的 SQL 語法
資料來源的 SQL 語法可以自行做異動
SELECT * FROM employee WHERE created_at < now() AND $CONDITIONS
設定資料儲存目標 Database
當資料處理完成後,可以點選左方的 Sink
頁籤,點選 Database
當作 Sink 的目標
將 Wrangler
處理完後的資料拉選至 Sink Database
,可以看到像下方的樣子,點選 Sink Database
的 Properties
設定 Sink 資料庫
類別 | 簡述 | 範例 |
---|---|---|
Plugin Name | 套件名稱 | postgresql |
Plugin Type | 套件類型 | jdbc |
Plugin Type | 套件類型 | jdbc |
Connection String | 連線字串 | jdbc:postgresql://{ip-address}:{port}/{database-name} |
Table Name | 資料表名稱 | etl_table_name |
Columns | 欄位名稱 | Input Schema 對應的欄位名稱 |
設定完資料對應後,設定連線的帳號密碼及交易
類別 | 簡述 | 說明 |
---|---|---|
Username | 資料庫帳號 | db account |
Password | 資料庫密碼 | db password |
Enable Auto-Commit | 自動 commit | postgresql 僅需設定為 False |
Column Name Case | 欄位名稱大小寫 | 預設不變更欄位名稱大小寫: No change |
Transaction Isolation Level | 交易層級 | TRANSACTION_SERIALIZABLE |
測試執行
設定完成後,在上方點選 Preview
按鈕進行預先執行,確認整個邏輯從頭到尾是否能正常運行
點選完 Preview
後點選 Run
進行測試
測試成功會看到上方顯示 The preview of the pipeline has completed successfully
的訊息,點選下方的 Preview Data
選項,可以預覽整個測試流程的 input 及 output 資料
下方是 Sink Database 的 Preview Data
設定部署
重新點選 Preview
按鈕,離開 Preview 模式,點選左上方的 Name your pipeline
命名此 pipeline,pipeline 要有名稱才可以順利的 Deploy
命名完成後點選 Deploy
按鈕將此 pipeline 部署出去
開始部署
部署出去後,可以點選上方的 Run
按鈕開始執行
部署完成後,可以看到 Status
顯示為 Succeeded
,在下方也會顯示輸出的資料列有多少
這樣就完成了整個 ETL 的資料擷取的部署流程了,資料可以順利的從 Source 來源
轉換到 Sink 資料庫