ETL: Data Fusion 轉換資料至 PostgreSQL

GCP ETL 介紹: Data Fusion 轉換資料至 PostgreSQL

資料透過 Data Fusion 處理過後,將處理過的資料存至 PostgreSQL

選擇資料來源

點選 Wrangler

ETL sink source to PostgreSQL

在左側的資料來源選擇你的資料來源

  • Database
  • Kafka
  • S3
  • Google Cloud Storage
  • Google BigQuery
  • Google Cloud Spanner
  • ADLS Handler

我這裡是選擇 Database 當作我要轉換資料的資料來源,點選後在右側選擇要轉換的資料表

ETL sink source to PostgreSQL

使用 Wrangler 做資料轉換

選擇完資料表後會到 Wrangler 資料轉換頁,可以點選上方欄位下拉選單,對資料進行解析或變更資料類型

ETL sink source to PostgreSQL

這裡因為 Sink 目標的資料表的欄位名稱是不同的,所以可以直接在上方將欄位名稱做異動變更

在 Wrangler 異動的任何資料不會去影響到原本來源資料庫的資料,Wrangler 會將資料擷取下來後在額外進行處理,而不會直接異動原始資料

ETL sink source to PostgreSQL

ETL sink source to PostgreSQL

欄位變更完後點選右上方 Create a Pipeline,然後在彈跳出的視窗點選 Batch pipeline 建立資料解析流程

ETL sink source to PostgreSQL

建立完成後會出現 Source 是剛剛選擇的資料庫,而點選 Wrangler 的 Properties 可以看到剛剛做的異動

ETL sink source to PostgreSQL

異動的 Scripts 會寫在下方

keep id,english_given_name,english_family_name,created_at,updated_at
rename english_given_name english_first_name
rename english_family_name english_last_name
rename created_at signup_at

再重新點選 WRANGLE 按鈕可以回去原本的資料異動頁面

ETL sink source to PostgreSQL

變更資料來源 SQL

點選資料來源 DatabaseProperties 按鈕可以做資料庫來源的細部設定

ETL sink source to PostgreSQL

在下方可以看到 Connection String,格式會長的像這樣

jdbc:postgresql://{ip-address}:{port}/{database-name}

jdbc:postgresql://1.1.1.1:5432/etl_database

而在下方的 Import Query 可以看到資料來源撈取的 SQL 語法

ETL sink source to PostgreSQL

資料來源的 SQL 語法可以自行做異動

SELECT * FROM employee WHERE created_at < now() AND $CONDITIONS

ETL sink source to PostgreSQL

設定資料儲存目標 Database

當資料處理完成後,可以點選左方的 Sink 頁籤,點選 Database 當作 Sink 的目標

ETL sink source to PostgreSQL

Wrangler 處理完後的資料拉選至 Sink Database,可以看到像下方的樣子,點選 Sink DatabaseProperties 設定 Sink 資料庫

ETL sink source to PostgreSQL

類別 簡述 範例
Plugin Name 套件名稱 postgresql
Plugin Type 套件類型 jdbc
Plugin Type 套件類型 jdbc
Connection String 連線字串 jdbc:postgresql://{ip-address}:{port}/{database-name}
Table Name 資料表名稱 etl_table_name
Columns 欄位名稱 Input Schema 對應的欄位名稱

ETL sink source to PostgreSQL

設定完資料對應後,設定連線的帳號密碼及交易

類別 簡述 說明
Username 資料庫帳號 db account
Password 資料庫密碼 db password
Enable Auto-Commit 自動 commit postgresql 僅需設定為 False
Column Name Case 欄位名稱大小寫 預設不變更欄位名稱大小寫: No change
Transaction Isolation Level 交易層級 TRANSACTION_SERIALIZABLE

ETL sink source to PostgreSQL

測試執行

設定完成後,在上方點選 Preview 按鈕進行預先執行,確認整個邏輯從頭到尾是否能正常運行

ETL sink source to PostgreSQL

點選完 Preview 後點選 Run 進行測試

ETL sink source to PostgreSQL

ETL sink source to PostgreSQL

測試成功會看到上方顯示 The preview of the pipeline has completed successfully 的訊息,點選下方的 Preview Data 選項,可以預覽整個測試流程的 input 及 output 資料

ETL sink source to PostgreSQL

下方是 Sink Database 的 Preview Data

ETL sink source to PostgreSQL

設定部署

重新點選 Preview 按鈕,離開 Preview 模式,點選左上方的 Name your pipeline 命名此 pipeline,pipeline 要有名稱才可以順利的 Deploy

ETL sink source to PostgreSQL

ETL sink source to PostgreSQL

命名完成後點選 Deploy 按鈕將此 pipeline 部署出去

ETL sink source to PostgreSQL

開始部署

部署出去後,可以點選上方的 Run 按鈕開始執行

ETL sink source to PostgreSQL

ETL sink source to PostgreSQL

部署完成後,可以看到 Status 顯示為 Succeeded,在下方也會顯示輸出的資料列有多少

ETL sink source to PostgreSQL

這樣就完成了整個 ETL 的資料擷取的部署流程了,資料可以順利的從 Source 來源 轉換到 Sink 資料庫

參考資料