impalaSink.md

May 21, 2020

1. Format:

CREATE TABLE tableName(
    colName colType,
    ...
    colNameX colType
 )WITH(
    type ='impala',
    url ='jdbcUrl',
    userName ='userName',
    password ='pwd',
    tableName ='tableName',
    parallelism ='parallelNum'
 );

2. Supported version

Impala 2.10.0-cdh5.13.0

3. Table schema definition

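Parameter | Meaning
tableName | Name used in SQL, i.e. the name under which the table is registered in the Flink table environment
colName | Column name
colType | Column type (see the supported colType types)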
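The Flink DDL only describes the sink as seen from inside the job; the rows are written into a table that presumably must already exist in Impala. As a hedged sketch, a target table compatible with both sink tables in the samples (tb_result_4 in the hxbho_pub database, data columns a and b, partition columns pt and name) could be created in impala-shell as follows. The partition layout is inferred from the samples and is an assumption, not something the source states.

```sql
-- Impala SQL (run in impala-shell), not Flink SQL.
-- Hypothetical target table inferred from the samples:
-- data columns a, b; partition columns pt, name.
CREATE TABLE IF NOT EXISTS hxbho_pub.tb_result_4 (
    a STRING,
    b STRING
)
PARTITIONED BY (pt INT, name STRING);
```

Note that Impala lists partition columns only in the PARTITIONED BY clause, whereas a Flink sink that writes dynamic partitions (such as MyResult1 in the samples) declares pt and name as ordinary columns.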

4. Parameters:

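Parameter | Meaning | Required | Default
type | Output table type; set to impala | |
url | JDBC URL (jdbcUrl) for connecting to Impala | |
userName | User name for the Impala connection | |
password | Password for the Impala connection | |
tableName | Name of the Impala table | |
authMech | Authentication mechanism (0, 1, 2, 3; in the Cloudera Impala JDBC driver these mean no authentication, Kerberos, user name, and user name and password); Kerberos is not yet supported | | 0
principal | Kerberos principal used for login (authMech=1 only) | Yes, when authMech=1 |
keyTabFilePath | Path to the keytab file (authMech=1 only) | Yes, when authMech=1 |
krb5FilePath | Path to the krb5.conf file (authMech=1 only) | Yes, when authMech=1 |
krbHostFQDN | Fully qualified domain name of the host (authMech=1 only) | Yes, when authMech=1 |
krbServiceName | Kerberos principal name of the Impala server (authMech=1 only) | Yes, when authMech=1 |
krbRealm | Kerberos realm (authMech=1 only) | | HADOOP.COM
enablePartition | Whether partitioned writes are enabled | | false
partitionFields | Partition field names | No; required when enablePartition='true' |
parallelism | Sink parallelism | | 1
updateMode | Only APPEND mode is supported; retraction records are filtered out | |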
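The updateMode row matters most for aggregating queries. The sketch below is hypothetical (it is not one of the original samples) and reuses the MyTable source and MyResult sink defined in section 5. Assuming the sink behaves as the table describes, Flink emits a retraction plus a new row each time a channel's count changes; the retraction is dropped and only the new row reaches Impala.

```sql
-- Hypothetical query: a running message count per channel.
-- Each new Kafka message updates the count for its channel, which Flink
-- expresses as a retraction of the old row plus an insertion of the new one.
-- With updateMode APPEND the retraction is filtered out, so every updated
-- count is written to Impala as an additional row instead of replacing the old one.
insert into
    MyResult
    select
       channel AS a,
       CAST(COUNT(*) AS VARCHAR) AS b
    from
        MyTable
    group by
        channel;
```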

5. Examples:

CREATE TABLE MyTable(
      channel VARCHAR,
      pt int,
      xctime varchar,
      name varchar
 )WITH(
    type ='kafka11',
    bootstrapServers ='172.16.8.107:9092',
    zookeeperQuorum ='172.16.8.107:2181/kafka',
    offsetReset ='latest',
    topic ='mqTest03'
 );

CREATE TABLE MyResult(
    a STRING,
    b STRING
 )WITH(
    type ='impala',
    url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
    userName ='root',
    password ='pwd',
    authMech ='3',
    tableName ='tb_result_4',
    -- static partition (fixed partition values)
    partitionFields  = 'pt=1001,name="name1001"',
    batchSize = '1000',
    parallelism ='2'
 );

CREATE TABLE MyResult1(
    a STRING,
    b STRING,
    pt int,
    name STRING
 )WITH(
    type ='impala',
    url ='jdbc:impala://172.16.101.252:21050/hxbho_pub',
    userName ='root',
    password ='pwd',
    authMech ='3',
    tableName ='tb_result_4',
    enablePartition ='true',
    -- dynamic partition (values taken from the pt and name columns)
    partitionFields  = 'pt,name',
    batchSize = '1000',
    parallelism ='2'
 );


insert into
    MyResult1
    select
       channel AS a,
       xctime AS b,
       pt,
       name
    from
        MyTable;



insert into
    MyResult
    select
       channel AS a,
       xctime AS b
    from
        MyTable;
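The two sink definitions above illustrate the two partition modes. Roughly, they correspond to Impala's static and dynamic partition inserts. The statements below are a hedged illustration in Impala SQL against the hypothetical hxbho_pub.tb_result_4 table (the literal values are made up); they are not the SQL the sink actually generates.

```sql
-- Impala SQL, illustrative only.
-- Static partition (like MyResult): the partition values are fixed in the
-- PARTITION clause, so each row supplies only the data columns a and b.
INSERT INTO hxbho_pub.tb_result_4 PARTITION (pt=1001, name='name1001')
SELECT 'channel-1', 'xctime-1';

-- Dynamic partition (like MyResult1): the partition columns are listed
-- without values, and each row supplies them as its last columns,
-- in the same order as in the PARTITION clause.
INSERT INTO hxbho_pub.tb_result_4 PARTITION (pt, name)
SELECT 'channel-2', 'xctime-2', 1002, 'name1002';
```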