val kafkaDDL :String =
"""
|create table MyTable (
|componentType STRING,
|endDate STRING,
|envName STRING,
|resultId STRING,
|spendTime STRING,
|returnDataNum STRING,
|startDate STRING,
|tableName STRING,
|tenantName STRING
|) with (
| 'connector.type' = 'kafka',
| 'connector.version' = '0.10',
| 'connector.topic' = 'test',
| 'connector.properties.1.key' = 'bootstrap.servers',
| 'connector.properties.1.value' = 'localhost:9092',
| 'connector.properties.0.key' = 'zookeeper.connect',
| 'connector.properties.0.value' = 'localhost:2181',
| 'connector.properties.2.key' = 'group.id',
| 'connector.properties.2.value' = 'testGroup',
| 'connector.startup-mode' = 'earliest-offset',
| 'connector.property-version' = '1',
| 'format.type' = 'json',
| 'format.property-version' = '1',
| 'update-mode' = 'append'
|)
""".stripMargin
tableEnv.sqlUpdate(kafkaDDL)
val hbaseDDL :String =
"""
|Create table flink_log1 (
|rowkey string,
|cf ROW(f1 BIGINT,f2 BIGINT,f3 INT,f4 INT,f5 BIGINT,tenantName VARCHAR)
|) with(
| 'connector.type' = 'hbase',
| 'connector.version' = '1.4.3',
| 'connector.table-name' = 'flink_log1',
| 'connector.zookeeper.quorum' = '....:2181,....:2181',
| 'connector.zookeeper.znode.parent' = '/hbase',
| 'connector.write.buffer-flush.max-size' = '10mb',
| 'connector.write.buffer-flush.max-rows' = '1000'
|)
""".stripMargin
tableEnv.sqlUpdate(hbaseDDL)
val sql =
"select CONCAT_WS('_',tenantName,tenantName) as tm_tenantName, " +
"count(tenantName) as f1 ," +
"count(case when resultId = '2' then resultId else '' end) as f2 ,"+
"avg(spendTime) as f3 ,"+
"sum(returnDataNum) as f4 ,"+
"count(case when resultId = '1' then tenantName else '' end) as f5 ,"+
"tenantName "+
"from MyTable where substring(endDate,1,10)='2020-06-28' " +
"group by CONCAT_WS('_',tenantName,tenantName) , tenantName"
val table: Table = tableEnv.sqlQuery(sql)
tableEnv.createTemporaryView("tmp",table)
tableEnv.sqlUpdate("insert into flink_log1 " +
"select tm_tenantName,ROW(f1,f2,f3,f4,f5,tenantName) as cf from tmp ")
streamEnv.execute("my insert hbase sql")
这里的情况主要是插入hbase的时候group by 多个字段会导致
报:
Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has full primary keys if it is updated.
但是我group by 单个字段是的时候是没有问题的,也就是当我加上group by tenantName的时候就会报这个错误
麻烦老师看一下这是什么问题
谢谢~