hbaseSink.md

September 8, 2020

1. Format:

CREATE TABLE MyResult(
    colFamily:colName colType,
    ...
 )WITH(
    type ='hbase',
    zookeeperQuorum ='ip:port[,ip:port]',
    tableName ='tableName',
    rowKey ='colName[+colName]',
    parallelism ='1',
    zookeeperParent ='/hbase'
 )
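The two list-valued options in the template follow simple conventions. `zookeeperQuorum` is a comma-separated list of `ip:port` entries, and `rowKey` is a `+`-separated list of column names. The Python sketch below splits both values that way. `parse_quorum` and `parse_row_key` are hypothetical helpers for illustration only. They are not part of the connector, and the connector's own parsing may differ.

```python
from typing import List, Tuple


def parse_quorum(value: str) -> List[Tuple[str, int]]:
    """Split an 'ip:port[,ip:port]' string into (host, port) pairs."""
    pairs = []
    for entry in value.split(","):
        host, port = entry.strip().rsplit(":", 1)  # split on the last ':' only
        pairs.append((host, int(port)))
    return pairs


def parse_row_key(value: str) -> List[str]:
    """Split a 'colName[+colName]' string into row-key column names."""
    return [name.strip() for name in value.split("+") if name.strip()]


print(parse_quorum("172.16.10.104:2181,172.16.10.224:2181"))
# [('172.16.10.104', 2181), ('172.16.10.224', 2181)]
print(parse_row_key("name+channel"))
# ['name', 'channel']
```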


2. Supported versions

HBase 2.0

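3. Table structure definition

| Parameter | Meaning |
|---|---|
| tableName | The name used in SQL, i.e. the name registered with the Flink table environment (flink-table-env); MyResult in the template above |
| colFamily:colName | The column family name and column name in HBase |
| colType | The column type; see the types supported by colType |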
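Each column declaration in the DDL names both the HBase column family and the column (qualifier) inside it. The sketch below splits a `colFamily:colName colType` line into those three parts. `HBaseColumn` and `parse_column` are hypothetical names used only for illustration and do not describe the connector's internals.

```python
from typing import NamedTuple


class HBaseColumn(NamedTuple):
    family: str      # HBase column family, e.g. "cf"
    qualifier: str   # column name within the family, e.g. "name"
    col_type: str    # declared column type, e.g. "varchar"


def parse_column(decl: str) -> HBaseColumn:
    """Parse one 'colFamily:colName colType' line from the CREATE TABLE body."""
    text = decl.strip().rstrip(",").strip()   # drop a trailing comma and spaces
    name, col_type = text.split(None, 1)      # "cf:name", "varchar"
    family, qualifier = name.split(":", 1)    # "cf", "name"
    return HBaseColumn(family, qualifier, col_type.strip())


for line in ["cf:name varchar ,", "cf:channel varchar"]:
    print(parse_column(line))
# HBaseColumn(family='cf', qualifier='name', col_type='varchar')
# HBaseColumn(family='cf', qualifier='channel', col_type='varchar')
```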

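4. Parameters:

| Parameter | Meaning | Required | Default |
|---|---|---|---|
| type | Sink table type [mysql\|hbase\|elasticsearch] | | |
| zookeeperQuorum | HBase ZooKeeper address; separate multiple addresses with commas | | |
| zookeeperParent | ZooKeeper parent path (zkParent) | | |
| tableName | Name of the target HBase table | | |
| rowKey | Columns that make up the HBase row key; join multiple columns with '+' | | |
| updateMode | APPEND: retractions are not applied, only incremental data is emitted; UPSERT: retracted data is deleted first, then the new data is written | | APPEND |
| parallelism | Parallelism | | 1 |
| kerberosAuthEnable | Whether to enable Kerberos authentication | | false |
| regionserverPrincipal | RegionServer principal, taken from the hbase.regionserver.kerberos.principal property in hbase-site.xml | | |
| clientKeytabFile | Client keytab file | | |
| clientPrincipal | Client principal | | |
| zookeeperSaslClient | Value of zookeeper.sasl.client | | true |
| securityKrb5Conf | Value of java.security.krb5.conf | | |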
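Enabling Kerberos authentication also requires configuring krb5 in the JVM options, e.g. -Djava.security.krb5.conf=/Users/xuchao/Documents/flinkSql/kerberos/krb5.conf
In addition, add the path of the keytab file to the addShipfile parameter; see the command-line parameter documentation for details.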
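The difference between the two `updateMode` values in the parameter table can be pictured with a simplified in-memory model. In this model a dict stands in for the HBase table, and each message is an (add/retract, row) pair, similar in shape to a Flink retract stream. The sketch assumes that row-key values are joined with '-', as in the sample data in section 6. The helper names and the messages are hypothetical. This is an illustration of the described semantics, not the sink's implementation.

```python
from typing import Dict, List, Tuple

Row = Dict[str, str]
Message = Tuple[bool, Row]  # (True = add, False = retract)


def row_key(row: Row, key_cols: List[str]) -> str:
    # '-' separator mirrors the sample data in section 6 (assumption)
    return "-".join(row[col] for col in key_cols)


def apply_messages(messages: List[Message], key_cols: List[str], mode: str) -> Dict[str, Row]:
    """Simplified model of the table contents under APPEND or UPSERT."""
    table: Dict[str, Row] = {}
    for is_add, row in messages:
        key = row_key(row, key_cols)
        if is_add:
            table[key] = row          # added rows are written in both modes
        elif mode == "UPSERT":
            table.pop(key, None)      # UPSERT deletes the retracted row
        # in APPEND mode, retraction messages are simply not applied
    return table


msgs = [
    (True, {"name": "roc", "channel": "daishu"}),
    (False, {"name": "roc", "channel": "daishu"}),  # retract the old result
    (True, {"name": "roc", "channel": "web"}),      # emit the new result
]
print(sorted(apply_messages(msgs, ["name", "channel"], "APPEND")))  # ['roc-daishu', 'roc-web']
print(sorted(apply_messages(msgs, ["name", "channel"], "UPSERT")))  # ['roc-web']
```

In APPEND mode the stale row stays in the table. In UPSERT mode it is removed before the new result is written.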

5. Examples:

Standard sink table example

CREATE TABLE MyTable(
    name varchar,
    channel varchar,
    age int
 )WITH(
    type ='kafka10',
    bootstrapServers ='172.16.8.107:9092',
    zookeeperQuorum ='172.16.8.107:2181/kafka',
    offsetReset ='latest',
    topic ='mqTest01',
    timezone='Asia/Shanghai',
    updateMode ='append',
    enableKeyPartitions ='false',
    topicIsPattern ='false',
    parallelism ='1'
 );

CREATE TABLE MyResult(
    cf:name varchar ,
    cf:channel varchar 
 )WITH(
	type ='hbase',
	zookeeperQuorum ='172.16.10.104:2181,172.16.10.224:2181,172.16.10.252:2181',
	zookeeperParent ='/hbase',
	tableName ='myresult',
	partitionedJoin ='false',
	parallelism ='1',
	rowKey='name+channel'
 );

insert
into
    MyResult
    select
        name,
        channel
    from
        MyTable a

 

Kerberos-authenticated sink table example

CREATE TABLE MyTable(
    name varchar,
    channel varchar,
    age int
 )WITH(
    type ='kafka10',
    bootstrapServers ='172.16.8.107:9092',
    zookeeperQuorum ='172.16.8.107:2181/kafka',
    offsetReset ='latest',
    topic ='mqTest01',
    timezone='Asia/Shanghai',
    updateMode ='append',
    enableKeyPartitions ='false',
    topicIsPattern ='false',
    parallelism ='1'
 );

CREATE TABLE MyResult(
    cf:name varchar ,
    cf:channel varchar 
 )WITH(
	type ='hbase',
	zookeeperQuorum ='cdh2.cdhsite:2181,cdh4.cdhsite:2181',
	zookeeperParent ='/hbase',
	tableName ='myresult',
	partitionedJoin ='false',
	parallelism ='1',
	rowKey='name',
    kerberosAuthEnable='true',
    regionserverPrincipal='hbase/_HOST@DTSTACK.COM',
    clientKeytabFile='test.keytab',
    clientPrincipal='test@DTSTACK.COM',
    securityKrb5Conf='krb5.conf'
 );

insert
into
    MyResult
    select
        name,
        channel
    from
        MyTable a

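6. HBase data

Data description

HBase row key construction rule: the values of the fields listed in rowKey form the row key; in the rowKey option, multiple fields are joined with '+'. In the sample below, the values of name and channel appear in the row key joined with '-'.

Sample data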

hbase(main):007:0> scan 'myresult'
ROW                  COLUMN+CELL
 roc-daishu          column=cf:channel, timestamp=1589183971724, value=daishu
 roc-daishu          column=cf:name, timestamp=1589183971724, value=roc
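The scan output above can be reproduced with a small model of how one sink row becomes HBase cells. The row key is built from the rowKey columns, and every declared column becomes a `family:qualifier` cell. The sketch assumes the '-' separator seen in the sample. `to_cells` is a hypothetical helper, not connector code. Cells are sorted by qualifier because HBase returns the columns of a row in sorted order.

```python
from typing import Dict, List, Tuple


def to_cells(row: Dict[str, str], key_cols: List[str], family: str,
             sep: str = "-") -> List[Tuple[str, str, str]]:
    """Model one sink row as (rowkey, 'family:qualifier', value) cells."""
    key = sep.join(row[col] for col in key_cols)  # e.g. "roc" + "-" + "daishu"
    # sort by column name to match the order in which a scan lists the cells
    return [(key, f"{family}:{col}", value) for col, value in sorted(row.items())]


for key, column, value in to_cells({"name": "roc", "channel": "daishu"}, ["name", "channel"], "cf"):
    print(key, f"column={column}, value={value}")
# roc-daishu column=cf:channel, value=daishu
# roc-daishu column=cf:name, value=roc
```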