Apache Flink - 表格 API 和 SQL



表格 API 是一個關係型 API,具有類似 SQL 的表示式語言。此 API 可以同時進行批處理和流處理。它可以嵌入 Java 和 Scala Dataset 和 Datastream API 中。您可以從現有的 Dataset 和 Datastream 或外部資料來源建立表。透過此關係型 API,您可以執行連線、聚合、選擇和篩選等操作。無論輸入是批處理還是流處理,查詢的語義都會保持不變。

這裡有一個示例表格 API 程式 −

// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// create a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// register a Table
tableEnv.registerTable("table1", ...) // or
tableEnv.registerTableSource("table2", ...) // or
tableEnv.registerExternalCatalog("extCat", ...)

// register an output Table
tableEnv.registerTableSink("outputTable", ...);
// create a Table from a Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// Create a Table from a SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")

// emit a Table API result Table to a TableSink, same for SQL result
tapiResult.insertInto("outputTable")

// execute
env.execute()
廣告