Demo: reading Kafka data with Flink

A small test demo of Flink reading data from Kafka.
Only the Java code samples are provided here; set up the Kafka environment yourself (a quick docker-compose setup is shown below).

Quick Kafka setup

Save the compose file below as docker-compose.yml, then start the stack with:

docker-compose up -d
version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    mem_limit: 256m
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    environment:
      #KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_HOST_NAME: 100.100.100.1
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    mem_limit: 1024m
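
To confirm the broker is reachable from the host before running the Flink jobs, a minimal check with the Kafka AdminClient can be used. This is only a sketch: the class name KafkaSetupCheck is arbitrary, and the bootstrap address must match KAFKA_ADVERTISED_HOST_NAME above.

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

import java.util.Properties;

public class KafkaSetupCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "100.100.100.1:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // The auto-created "test" topic should show up once the broker is ready
            System.out.println(admin.listTopics().names().get());
        }
    }
}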

Producer

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.ArrayList;
import java.util.Properties;


public class FlinkKafkaProducer1 {

    public static void main(String[] args) throws Exception {
        // 0. Initialize the Flink environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Read data from a collection
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");
        wordsList.add("offset 1");
        wordsList.add("offset 2");
        wordsList.add("offset 3");

        DataStream<String> stream = env.fromCollection(wordsList);
        // 2. Kafka producer configuration
        Properties properties = new Properties();
        // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "100.100.100.1:9092");
        // 3. Create the Kafka producer sink
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "first1",
                new SimpleStringSchema(),
                properties
        );
        // 4. Attach the producer to the Flink stream
        stream.addSink(kafkaProducer);
        // 5. Execute
        env.execute();
    }
}
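
Note that SimpleStringSchema writes only the value, so these records land in Kafka without a key and the consumer below will see key == null for them. If keyed records are wanted, a minimal sketch along these lines should work; KeyedStringSerializationSchema is an illustrative name, not part of the connector, and it simply reuses the payload as the key.

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;

// Illustrative schema: emits each String as both key and value of the Kafka record
class KeyedStringSerializationSchema implements KafkaSerializationSchema<String> {
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, Long timestamp) {
        byte[] bytes = element.getBytes(StandardCharsets.UTF_8);
        return new ProducerRecord<>("first1", bytes, bytes);
    }
}

Used in place of step 3 above:

FlinkKafkaProducer<String> keyedProducer = new FlinkKafkaProducer<>(
        "first1",
        new KeyedStringSerializationSchema(),
        properties,
        FlinkKafkaProducer.Semantic.AT_LEAST_ONCE
);
stream.addSink(keyedProducer);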

Consumer

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.util.*;


@Data
class KafkaMsg {
    private String key;
    private String value;
    private String topic;
    private Integer partition;
    private Long offSet;

    public KafkaMsg() {
    }

    public KafkaMsg(String key, String value, String topic, Integer partition, Long offSet) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offSet = offSet;
    }
}

class TypedKeyedDeserializationSchema implements KeyedDeserializationSchema<KafkaMsg> {

    @Override
    public KafkaMsg deserialize(byte[] key, byte[] value, String topic, int partition, long offset) throws IOException {
        System.out.println("key = " + key);
        // Records written with SimpleStringSchema carry no key, so guard against null
        if (key == null) {
            key = new byte[0];
        }
        System.out.println("value = " + value);
        System.out.println("topic = " + topic);
        System.out.println("partition = " + partition);
        System.out.println("offset = " + offset);
        return new KafkaMsg(new String(key), new String(value), topic, partition, offset);
    }

    @Override
    public boolean isEndOfStream(KafkaMsg kafkaMsg) {
        return false;
    }

    @Override
    public TypeInformation<KafkaMsg> getProducedType() {
        return TypeInformation.of(KafkaMsg.class);
    }
}


public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        // 0. Initialize the Flink environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // 1. Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("group.id", "test");
        // properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "100.100.100.1:9092");
        // 2. Create the Kafka consumer
        FlinkKafkaConsumer<KafkaMsg> kafkaConsumer = new FlinkKafkaConsumer<>(
                "first1",
                new TypedKeyedDeserializationSchema(),
                properties
        );

        // Look up the latest (end) offset of partition 0
        Map<TopicPartition, Long> map = latest_off("100.100.100.1:9092", "first1", 0);
        System.out.println("map = " + map);

        // Choosing the start position:
        // /**
        //  * Consume from the earliest data in the topic
        //  */
        // kafkaConsumer.setStartFromEarliest();
        // /**
        //  * Consume from a given timestamp; data before that point is ignored
        //  */
        // kafkaConsumer.setStartFromTimestamp(1559801580000L);
        /**
         * Consume from explicit offsets; this is the most involved option because the
         * offsets must be supplied per topic partition
         */
        // Map of start offsets
        HashMap<KafkaTopicPartition, Long> kafkaTopicPartitionLongHashMap = new HashMap<>();
        // Topic partition as the key
        KafkaTopicPartition partition = new KafkaTopicPartition("first1", 0);
        // Offset as the value
        Long offset = 0L;
        kafkaTopicPartitionLongHashMap.put(partition, offset);

        kafkaConsumer.setStartFromSpecificOffsets(kafkaTopicPartitionLongHashMap);
        // /**
        //  * Consume only new data arriving in the topic
        //  */
        // kafkaConsumer.setStartFromLatest();
        // /**
        //  * Resume from the offsets last committed by the configured consumer group,
        //  * so the group.id property must be set
        //  */
        // kafkaConsumer.setStartFromGroupOffsets();

        // 3. Attach the consumer to the Flink stream
        env.addSource(kafkaConsumer).print();
        // 4. Execute
        env.execute();
    }


    public static Map<TopicPartition, Long> latest_off(String broker, String topic, Integer partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("group.id", "offsetHunter");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        Collection<TopicPartition> partitions = new ArrayList<>();
        TopicPartition topicPartition = new TopicPartition(topic, partition);
        partitions.add(topicPartition);

        // endOffsets returns, per partition, the offset of the next record to be written
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            return consumer.endOffsets(partitions);
        }
    }

}
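
The demo fetches the end offsets via latest_off but then hard-codes 0L into the start-offset map. If the intent is to skip the existing data and start at the tail of each partition, the fetched map can be converted directly. A sketch, reusing the names from the code above:

// Sketch: start the Flink source at the end offsets returned by latest_off
Map<TopicPartition, Long> endOffsets = latest_off("100.100.100.1:9092", "first1", 0);
HashMap<KafkaTopicPartition, Long> startOffsets = new HashMap<>();
for (Map.Entry<TopicPartition, Long> e : endOffsets.entrySet()) {
    startOffsets.put(new KafkaTopicPartition(e.getKey().topic(), e.getKey().partition()), e.getValue());
}
kafkaConsumer.setStartFromSpecificOffsets(startOffsets);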