kafkaSink.md

December 29, 2020 · View on GitHub

1.格式:

CREATE TABLE tableName(
    colName colType,
    ...
    function(colNameX) AS aliasName,
    WATERMARK FOR colName AS withOffset( colName , delayTime )
 )WITH(
    type ='kafka11',
    bootstrapServers ='ip:port,ip:port...',
    zookeeperQuorum ='ip:port,ip:port/zkparent',
    offsetReset ='latest',
    topic ='topicName',
    groupId='test',
    parallelism ='parllNum',
 );

2.支持的版本

kafka09,kafka10,kafka11及以上版本    

kafka读取和写入的版本必须一致,否则会有兼容性错误。

3.表结构定义

参数名称含义
tableName在 sql 中使用的名称;即注册到flink-table-env上的名称
colName列名称
colType列类型 colType支持的类型

4.参数:

参数名称含义是否必填默认值
typekafka09kafka09、kafka10、kafka11、kafka(对应kafka1.0及以上版本)
groupId需要读取的 groupId 名称
bootstrapServerskafka bootstrap-server 地址信息(多个用逗号隔开)
zookeeperQuorumkafka zk地址信息(多个之间用逗号分隔)
topic需要读取的 topic 名称
parallelism并行度设置1
partitionKeys用来分区的字段
updateMode回溯流数据下发模式,append,upsert.upsert模式下会将是否为回溯信息以字段形式进行下发。append
sinkdatatype写入kafka数据格式,json,avro,csvjson
fieldDelimitercsv数据分隔符,

kafka相关参数可以自定义,使用kafka.开头即可。

kafka.consumer.id
kafka.socket.timeout.ms
kafka.fetch.message.max.bytes
kafka.num.consumer.fetchers
kafka.auto.commit.enable
kafka.auto.commit.interval.ms
kafka.queued.max.message.chunks
kafka.rebalance.max.retries
kafka.fetch.min.bytes
kafka.fetch.wait.max.ms
kafka.rebalance.backoff.ms
kafka.refresh.leader.backoff.ms
kafka.consumer.timeout.ms
kafka.exclude.internal.topics
kafka.partition.assignment.strategy
kafka.client.id
kafka.zookeeper.session.timeout.ms
kafka.zookeeper.connection.timeout.ms
kafka.zookeeper.sync.time.ms
kafka.offsets.storage
kafka.offsets.channel.backoff.ms
kafka.offsets.channel.socket.timeout.ms
kafka.offsets.commit.max.retries
kafka.dual.commit.enabled
kafka.partition.assignment.strategy
kafka.socket.receive.buffer.bytes
kafka.fetch.min.bytes

5.样例:

json格式:

CREATE TABLE MyResult(
    channel varchar,
    pv varchar
 )WITH(
    type='kafka',
    bootstrapServers='172.16.8.107:9092',
    topic='mqTest02',
    parallelism ='2',
    partitionKeys = 'channel,pv',
    updateMode='upsert'
 );
 
upsert模式下发的数据格式:{"channel":"zs","pv":"330",retract:true}
append模式下发的数据格式:{"channel":"zs","pv":"330"}

avro格式:

如果updateMode='upsert',schemaInfo需要包含retract属性信息。

CREATE TABLE MyTable(
    channel varchar,
    pv varchar
    --xctime bigint
 )WITH(
   type='kafka',
   bootstrapServers='172.16.8.107:9092',
   groupId='mqTest01',
   offsetReset='latest',
   topic='mqTest01',
   parallelism ='1',
   topicIsPattern ='false'
 );

create table sideTable(
    channel varchar,
    xccount int,
    PRIMARY KEY(channel),
    PERIOD FOR SYSTEM_TIME
 )WITH(
    type='mysql',
    url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8',
    userName='dtstack',
    password='abc123',
    tableName='sidetest',
    cache = 'LRU',
    cacheTTLMs='10000',
    parallelism ='1'

 );


CREATE TABLE MyResult(
    channel varchar,
    pv varchar
 )WITH(
    --type='console'
    type='kafka',
    bootstrapServers='172.16.8.107:9092',
    topic='mqTest02',
    parallelism ='1',
    updateMode='upsert',
    sinkdatatype = 'avro',
    schemaInfo = '{"type":"record","name":"MyResult","fields":[{"name":"channel","type":"string"}
    ,{"name":"pv","type":"string"},{"name":"channel","type":"string"},
    {"name":"retract","type":"boolean"}]}'

 );

 
insert 
into
    MyResult   
    select
        a.channel as channel,
        a.pv as pv
    from
            MyTable a

csv格式:

CREATE TABLE MyTable(
    channel varchar,
    pv varchar
    --xctime bigint
 )WITH(
   type='kafka',
   bootstrapServers='172.16.8.107:9092',
   groupId='mqTest01',
   offsetReset='latest',
   topic='mqTest01',
   parallelism ='2',
   topicIsPattern ='false'
 );

create table sideTable(
    channel varchar,
    xccount int,
    PRIMARY KEY(channel),
    PERIOD FOR SYSTEM_TIME
 )WITH(
    type='mysql',
    url='jdbc:mysql://172.16.8.109:3306/test?charset=utf8',
    userName='dtstack',
    password='abc123',
    tableName='sidetest',
    cache = 'LRU',
    cacheTTLMs='10000',
    parallelism ='1'

 );


CREATE TABLE MyResult(
    channel varchar,
    pv varchar
 )WITH(
    type='kafka',
    bootstrapServers='172.16.8.107:9092',
    topic='mqTest02',
    parallelism ='2',
    updateMode='upsert',
    sinkdatatype = 'csv',
    fieldDelimiter='*'
    
   

 );

 
insert 
into
    MyResult   
    select
        a.channel as channel,
        a.pv as pv
    from
            MyTable a

MAP类型示例

目前Kafka Sink支持Map类型

CREATE TABLE ods(
    id INT,
    name STRING
) WITH (
    ...
);

CREATE TABLE dwd (
    id INT,
    dids MAP<STRING, INT>>
) WITH (
    type ='kafka',
    bootstrapServers ='localhost:9092',
    offsetReset ='latest',
    groupId='wuren_foo',
    topic ='luna_foo',
    parallelism ='1'
);

INSERT INTO dwd
    SELECT ods.id, MAP['foo', 1, 'bar', 2] AS dids
    FROM ods;