SASL Authentication and ACL Authorization in Kafka


Setting up a Kafka test environment

version: '3.8'
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    mem_limit: 256m
  kafka:
    image: wurstmeister/kafka
    depends_on: [ zookeeper ]
    ports:
      - "9092:9092"
    volumes:
      - /data/cluster/kafka_redis_zk/kafka_config:/opt/kafka_2.13-2.8.1/config
      - /data/cluster/kafka_redis_zk/kafka_bin:/opt/kafka_2.13-2.8.1/bin
    environment:
      #KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_HOST_NAME: 192.168.16.110
      KAFKA_CREATE_TOPICS: "test:1:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    mem_limit: 1024m
  redis-master:
    image: redis:3.2.3-alpine
    container_name: redis_3
    ports:
      - 26379:6379
    volumes:
      - /etc/timezone:/etc/timezone:ro
      - /etc/localtime:/etc/localtime:ro
      - /data/app/redis3/redis.conf:/usr/local/etc/redis/redis.conf:rw
      - /data/app/redis3/data:/data:rw
    mem_limit: 512m
  • For the kafka service, first start a container without the volume mounts and copy its bin and config directories out to the host, then bring the stack up with the mounts shown above (see the sketch below).
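A rough way to do that initial copy, assuming the host paths from the compose file above (the temporary container name kafka_tmp is arbitrary, and the target directories are assumed not to exist yet):

# create a throwaway container from the image, without starting it or mounting any volumes
docker create --name kafka_tmp wurstmeister/kafka
# copy the config and bin directories to the host paths referenced in docker-compose.yml
docker cp kafka_tmp:/opt/kafka_2.13-2.8.1/config /data/cluster/kafka_redis_zk/kafka_config
docker cp kafka_tmp:/opt/kafka_2.13-2.8.1/bin /data/cluster/kafka_redis_zk/kafka_bin
docker rm kafka_tmp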

Modifying the Kafka configuration to enable SASL

Broker (server) configuration

Because this is a single-node test running inside a container, some of the addresses have to be set to the container host's IP.

vi server.properties

In vi, pressing G jumps to the end of the file.

# Because of the container environment variables, the file initially contains these entries
port=9092
# This is the address of the container host.
advertised.host.name=192.168.16.110

# The SASL-related settings below also need to be added
#--------------------------------------------------
# ACL authorizer entry class
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer

# SASL_PLAINTEXT
# On a multi-broker cluster, replace the host in this listener with each broker's own hostname/IP (here it is the container hostname "kafka")
listeners=SASL_PLAINTEXT://kafka:9092
# On kafka_2.13-2.8.1 (the version tested here) this setting cannot coexist with security.inter.broker.protocol; newer versions reportedly allow both, but that was not verified
#inter.broker.listener.name=SASL_PLAINTEXT

# The address that external clients use to connect to the Kafka broker.
# It must be reachable from the clients: if the broker runs on an internal network and the clients are outside it, this may need to be the externally reachable address.
# 192.168.16.110 is the container host's address; 9092 is the externally mapped port
advertised.listeners=SASL_PLAINTEXT://192.168.16.110:9092
# The security protocol used for communication between brokers; SASL_PLAINTEXT here
security.inter.broker.protocol=SASL_PLAINTEXT
# The SASL mechanism used between brokers; PLAIN means simple username/password authentication
sasl.mechanism.inter.broker.protocol=PLAIN
# The SASL mechanisms accepted from clients; PLAIN means simple username/password authentication
sasl.enabled.mechanisms=PLAIN

# Super users, i.e. users with all permissions; User:admin means the user named admin has full access
super.users=User:admin

# When true, the ACLs act as a blacklist: only users explicitly denied are blocked.
# The default is false, a whitelist: only users explicitly allowed may access a resource.
allow.everyone.if.no.acl.found=true
#--------------------------------------------------

Create a new kafka_server_jaas.conf file to define the users. Note the two semicolons at the end.

The first three lines configure the admin account (the same account as the super.users entry in server.properties above). The following entries, such as user_wyk_reader="wyk_reader_pwd", each define a user: wyk_reader with password wyk_reader_pwd, i.e. the pattern is user_<username>="<password>". These users are referenced later when configuring ACLs.

vim /data/cluster/kafka_redis_zk/kafka_config/kafka_server_jaas.conf
# add the following content
#--------------------------------------------------
KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_wyk_reader="wyk_reader_pwd"
    user_wyk_writer="wyk_writer_pwd"
    user_no_acl="no_acl_pwd";
};

Modify kafka-server-start.sh so that the broker startup script loads the JAAS file with the users we just configured:

vim /data/cluster/kafka_redis_zk/kafka_bin/kafka-server-start.sh
# change the last line to the following
#-------------------------------------------
#exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"
source /etc/profile
exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$KAFKA_HOME/config/kafka_server_jaas.conf kafka.Kafka "$@"

Restart the Kafka cluster

docker restart kafka 
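To confirm the broker came back up with the JAAS file loaded, an optional quick check of the container logs (the exact container name depends on your compose project):

docker logs --tail 100 kafka
# look for the broker "started" message and the absence of authentication/authorizer errors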

Configuring authentication files for the console clients

Producer

Create a new file wyk_writer_jaas.conf. The username and password here must match the server-side configuration: the client presents these credentials to the Kafka broker for verification, and a mismatch is equivalent to a failed login.

vim /data/cluster/kafka_redis_zk/kafka_config/wyk_writer_jaas.conf
# add the following content
#-------------------------------
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="wyk_writer"
    password="wyk_writer_pwd";
};

Modify the producer startup script so the console producer loads this authentication file when it starts.

vim /data/cluster/kafka_redis_zk/kafka_bin/kafka-console-producer.sh
# change the last line to the following
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf kafka.tools.ConsoleProducer "$@"

Modify producer.properties to add the SASL settings.

vim /data/cluster/kafka_redis_zk/kafka_config/producer.properties
# add the following content
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

Consumer

Create a new file wyk_reader_jaas.conf. Again, the username and password must match the server-side configuration: the client presents these credentials to the Kafka broker for verification, and a mismatch is equivalent to a failed login.

vim /data/cluster/kafka_redis_zk/kafka_config/wyk_reader_jaas.conf
# add the following content
#-------------------------------
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="wyk_reader"
    password="wyk_reader_pwd";
};

Modify the consumer startup script so the console consumer loads this authentication file when it starts.

vim /data/cluster/kafka_redis_zk/kafka_bin/kafka-console-consumer.sh
# change the last line to the following
#-------------------------------------------
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleConsumer "$@"
source /etc/profile
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_reader_jaas.conf kafka.tools.ConsoleConsumer "$@"

Modify consumer.properties to add the SASL settings. Note that this file already contains a consumer group, group.id=test-consumer-group; the same group has to be specified later when granting consumer permissions to the console client (you can double-check it as shown after the snippet below).

vim /data/cluster/kafka_redis_zk/kafka_config/consumer.properties
# add the following content
#-------------------------------------------
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
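A quick check of the consumer group that the ACL commands below must reference (per the note above it should print test-consumer-group):

grep group.id /data/cluster/kafka_redis_zk/kafka_config/consumer.properties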

ACL authorization and verification

SASL is now enabled and the console clients have producer and consumer authentication files. Next we test the difference between an authorized user and an unauthorized one.

For comparison, first create an authentication file for the user no_acl.

Create the new file no_acl_jaas.conf

vim /data/cluster/kafka_redis_zk/kafka_config/no_acl_jaas.conf
# add the following content
#-------------------------------
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="no_acl"
    password="no_acl_pwd";
};

Use the commands below to grant the user wyk_reader read access to the topic csdn01 and the user wyk_writer write access to csdn01. The no_acl user is given no permissions and serves as the comparison.

These commands are run inside the container:

docker exec -it kafka  bash 

Inside the container:

cd /opt/kafka_2.13-2.8.1
# grant user wyk_writer producer (Write) permission on topic csdn01
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01

# grant user wyk_reader consumer (Read) permission on topic csdn01
# note that the consumer group also needs permission; the group name must match the group.id configured in consumer.properties
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group

The output looks like this:

userapp@home-center:/data/cluster/kafka_redis_zk/kafka_config$ docker exec -it kafka_redis_zk-kafka-1 bash
root@19bb3ec321e0:/# cd /opt/kafka_2.13-2.8.1
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_writer, host=*, operation=WRITE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`:
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`:
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#

Verifying the ACL permissions:

When the no_acl user is used as the producer, sending fails with the error Not authorized to access topics: [csdn01], while the user wyk_writer, who has producer permission on csdn01, works normally:

Modify the kafka-console-producer.sh script and add these two alternative exec lines for the test (comment in the one to use):

#exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/no_acl_jaas.conf kafka.tools.ConsoleProducer "$@"

First, test with no_acl_jaas

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# cat bin/kafka-console-producer.sh 
#!/bin/bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx512M"
fi
#exec $(dirname $0)/kafka-run-class.sh kafka.tools.ConsoleProducer "$@"
#exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf kafka.tools.ConsoleProducer "$@"
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/no_acl_jaas.conf kafka.tools.ConsoleProducer "$@"

Run the test

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-console-producer.sh --topic csdn01 --broker-list kafka:9092 --producer.config config/producer.properties
>333
[2024-06-13 14:42:31,493] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 3 : {csdn01=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:42:31,496] ERROR [Producer clientId=console-producer] Topic authorization failed for topics [csdn01] (org.apache.kafka.clients.Metadata)
[2024-06-13 14:42:31,498] ERROR Error when sending message to topic csdn01 with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [csdn01]
>

As expected, the producer is rejected with an authorization error.
Now switch the exec lines so the wyk_writer JAAS file is used:

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/wyk_writer_jaas.conf kafka.tools.ConsoleProducer "$@"
#exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$KAFKA_HOME/config/no_acl_jaas.conf kafka.tools.ConsoleProducer "$@"

With wyk_writer, producing succeeds:

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# 
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-console-producer.sh --topic csdn01 --broker-list kafka:9092 --producer.config config/producer.properties
>333
>999
>333
>1111
>999
>

If the password in the producer's SASL authentication file is changed to a wrong value, startup fails with Invalid username or password:

Change the password in wyk_writer_jaas.conf to an incorrect one

root@home-center:/data/cluster/kafka_redis_zk/kafka_config# cat wyk_writer_jaas.conf 
KafkaClient { org.apache.kafka.common.security.plain.PlainLoginModule required
username="wyk_writer"
password="wyk_writer_pwd_wrong";
};

Running the producer now fails with: Invalid username or password

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-console-producer.sh --topic csdn01 --broker-list kafka:9092 --producer.config config/producer.properties
>[2024-06-13 14:45:48,018] ERROR [Producer clientId=console-producer] Connection to node -1 (kafka/172.19.0.4:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:48,019] WARN [Producer clientId=console-producer] Bootstrap broker kafka:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:48,424] ERROR [Producer clientId=console-producer] Connection to node -1 (kafka/172.19.0.4:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:48,424] WARN [Producer clientId=console-producer] Bootstrap broker kafka:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:48,881] ERROR [Producer clientId=console-producer] Connection to node -1 (kafka/172.19.0.4:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:48,881] WARN [Producer clientId=console-producer] Bootstrap broker kafka:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:49,287] ERROR [Producer clientId=console-producer] Connection to node -1 (kafka/172.19.0.4:9092) failed authentication due to: Authentication failed: Invalid username or password (org.apache.kafka.clients.NetworkClient)
[2024-06-13 14:45:49,287] WARN [Producer clientId=console-producer] Bootstrap broker kafka:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)

Python client access (not verified)

The scripts below use a separate user, python_wyk, which is not in the kafka_server_jaas.conf created earlier. To use it, add it to the server JAAS file (with the static PLAIN configuration used here the broker normally has to be restarted for a new user to take effect), or simply reuse one of the existing users. A sketch of the change follows.
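A minimal sketch of that change, assuming you keep the python_wyk / python_wyk_pwd credentials used in the scripts below (this user is an addition, not part of the server JAAS file shown earlier):

vim /data/cluster/kafka_redis_zk/kafka_config/kafka_server_jaas.conf
# add one more user entry inside the KafkaServer { ... } block, keeping the ';' after the final entry
user_python_wyk="python_wyk_pwd"
# then restart the broker so the new PLAIN user takes effect
docker restart kafka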

Producer script:

from kafka import KafkaProducer
import json

# the bootstrap servers below come from the original three-node example (wyk01..wyk03);
# for the single-node setup above, 192.168.16.110:9092 would be used instead
producer = KafkaProducer(bootstrap_servers=['wyk01:9092', 'wyk02:9092', 'wyk03:9092'],
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="python_wyk",
                         sasl_plain_password="python_wyk_pwd")

producer.send('csdn01', json.dumps({"id": "1", "name": "wyk1", "company": "csdn1"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id": "2", "name": "wyk2", "company": "csdn2"}).encode('utf-8'))
producer.send('csdn01', json.dumps({"id": "3", "name": "wyk3", "company": "csdn3"}).encode('utf-8'))
producer.flush()
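These scripts rely on the kafka-python package, which provides the kafka module imported above; it is assumed to be installed, e.g.:

pip install kafka-python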

Consumer script:

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import json

consumer = KafkaConsumer('csdn01',
                         bootstrap_servers=['wyk01:9092', 'wyk02:9092', 'wyk03:9092'],
                         group_id='test-consumer-group',
                         auto_offset_reset='latest',
                         enable_auto_commit=False,
                         security_protocol="SASL_PLAINTEXT",
                         sasl_mechanism="PLAIN",
                         sasl_plain_username="python_wyk",
                         sasl_plain_password="python_wyk_pwd")
for msg in consumer:
    print(msg)

Java code: reading Kafka from Flink (verified)
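The code below points java.security.auth.login.config at local files named write_jaas.conf and read_jaas.conf. Their contents are not shown in the original; presumably they mirror the console client JAAS files configured earlier, roughly:

write_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="wyk_writer"
    password="wyk_writer_pwd";
};

read_jaas.conf:

KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="wyk_reader"
    password="wyk_reader_pwd";
};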

Producer

import lombok.Data;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Properties;


public class FlinkKafkaProducer1 {

    public static void main(String[] args) throws Exception {
        // 0 initialize the Flink environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1 read data from a collection
        ArrayList<String> wordsList = new ArrayList<>();
        wordsList.add("hello");
        wordsList.add("world");
        wordsList.add("offset 1");
        wordsList.add("offset 2");
        wordsList.add("offset 3");

        DataStream<String> stream = env.fromCollection(wordsList);

        // 2 Kafka producer configuration
        Properties properties = new Properties();
        // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "100.100.100.1:9092");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.16.110:9092");

        // properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        // point the JVM at the client JAAS file (local path)
        System.setProperty("java.security.auth.login.config", "D:\\workspace\\java\\yunque\\flinkconnectors\\src\\main\\resources\\write_jaas.conf");

        // properties.put("sasl.plain.username", "wyk_writer");
        // properties.put("sasl.plain.password", "wyk_writer_pwd");

        // 3 create the Kafka producer sink
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>(
                "csdn01",
                new SimpleStringSchema(),
                properties
        );
        // 4 attach the producer to the Flink stream
        stream.addSink(kafkaProducer);
        // 5 execute
        env.execute();
    }
}

Consumer

import lombok.Data;
import lombok.val;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.util.*;


// simple POJO describing a consumed Kafka record
@Data
class KafkaMsg {
    private String key;
    private String value;
    private String topic;
    private Integer partition;
    private Long offSet;

    public KafkaMsg() {
    }

    public KafkaMsg(String key, String value, String topic, Integer partition, Long offSet) {
        this.key = key;
        this.value = value;
        this.topic = topic;
        this.partition = partition;
        this.offSet = offSet;
    }
}

// deserialization schema that keeps key, value, topic, partition and offset
class TypedKeyedDeserializationSchema implements KeyedDeserializationSchema<KafkaMsg> {

    @Override
    public KafkaMsg deserialize(byte[] key, byte[] value, String topic, int partition, long offset) throws IOException {
        System.out.println("key = " + key);
        if (key == null) {
            key = new byte[0];
        }
        System.out.println("value = " + value);
        System.out.println("topic = " + topic);
        System.out.println("partition = " + partition);
        System.out.println("offset = " + offset);
        return new KafkaMsg(new String(key), new String(value), topic, partition, offset);
    }

    @Override
    public boolean isEndOfStream(KafkaMsg kafkaMsg) {
        return false;
    }

    @Override
    public TypeInformation<KafkaMsg> getProducedType() {
        return TypeInformation.of(KafkaMsg.class);
    }
}


public class FlinkKafkaConsumer1 {
    public static void main(String[] args) throws Exception {
        // 0 initialize the Flink environment
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1 Kafka consumer configuration
        Properties properties = new Properties();
        properties.setProperty("group.id", "test-consumer-group");
        // properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "100.100.100.1:9092");
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.16.110:9092");

        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "PLAIN");
        // point the JVM at the client JAAS file (local path)
        System.setProperty("java.security.auth.login.config", "D:\\workspace\\java\\yunque\\flinkconnectors\\src\\main\\resources\\read_jaas.conf");

        // 2 create the Kafka consumer
        FlinkKafkaConsumer<KafkaMsg> kafkaConsumer = new FlinkKafkaConsumer<>(
                "csdn01",
                new TypedKeyedDeserializationSchema(),
                properties
        );

        // fetch the latest (end) offset of partition 0
        Map map = latest_off("192.168.16.110:9092", "csdn01", 0);
        System.out.println("map = " + map);

        // // start-position options
        // /**
        //  * start from the earliest data in the topic
        //  */
        // kafkaConsumer.setStartFromEarliest();
        // /**
        //  * start from a given timestamp; data before that point is ignored
        //  */
        // kafkaConsumer.setStartFromTimestamp(1559801580000l);
        /**
         * start from explicitly specified offsets; this is the most involved option, the offsets have to be supplied manually
         */
        // map of partition -> start offset
        HashMap<KafkaTopicPartition, Long> kafkaTopicPartitionLongHashMap = new HashMap<>();
        // topic partition as the key
        // (note: "first1" does not match the subscribed topic "csdn01", so this entry has no effect and csdn01 falls back to the group offsets)
        KafkaTopicPartition partition = new KafkaTopicPartition("first1", 0);
        // offset as the value
        Long offset = 0L;
        kafkaTopicPartitionLongHashMap.put(partition, offset);

        kafkaConsumer.setStartFromSpecificOffsets(kafkaTopicPartitionLongHashMap);
        // /**
        //  * start from the latest data in the topic
        //  */
        // kafkaConsumer.setStartFromLatest();
        // /**
        //  * start from the group's last committed offsets; group.id must be configured
        //  */
        // kafkaConsumer.setStartFromGroupOffsets();

        // 3 attach the consumer as the source of the Flink job
        env.addSource(kafkaConsumer).print();
        // 4 execute
        env.execute();
    }


    // query the end offset of one partition with a plain Kafka consumer (also over SASL_PLAINTEXT)
    public static Map latest_off(String broker, String topic, Integer partition) {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker);
        props.put("group.id", "test-consumer-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        System.setProperty("java.security.auth.login.config", "D:\\workspace\\java\\yunque\\flinkconnectors\\src\\main\\resources\\read_jaas.conf");

        KafkaConsumer consumer = new KafkaConsumer(props);
        Collection<TopicPartition> partitions = new ArrayList<>();

        TopicPartition topicPartition = new TopicPartition(topic, partition);
        partitions.add(topicPartition);

        Map map = consumer.endOffsets(partitions);
        return map;
    }

}

Common ACL commands

List permissions:

# list all ACLs
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --list

# list the ACLs of a specific topic
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --list --topic csdn01
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --list
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=test-consumer-group, patternType=LITERAL)`:
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#
root@19bb3ec321e0:/opt/kafka_2.13-2.8.1# bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --list --topic csdn01
Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=csdn01, patternType=LITERAL)`:
(principal=User:wyk_writer, host=*, operation=WRITE, permissionType=ALLOW)
(principal=User:wyk_reader, host=*, operation=READ, permissionType=ALLOW)

root@19bb3ec321e0:/opt/kafka_2.13-2.8.1#

Grant read/write permissions:

# grant user wyk_writer producer (Write) permission on topic csdn01
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_writer --operation Write --topic csdn01

# grant user wyk_reader consumer (Read) permission on topic csdn01
# note that the consumer group also needs permission; the group name must match the group.id configured in consumer.properties
bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_reader --operation Read --topic csdn01 --group test-consumer-group

Remove permissions:

# remove user wyk_all's read and write permissions on csdn01
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --remove --allow-principal User:wyk_all --operation Read --operation Write --topic csdn01

Host-scoped rules and blacklist (deny) entries:

# allow user wyk_yes to read and write csdn01 only from host 192.168.145.100 (an allow rule scoped to one host)
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --add --allow-principal User:wyk_yes --allow-host 192.168.145.100 --operation Read --operation Write --topic csdn01
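A true blacklist entry uses --deny-principal and --deny-host instead of the allow flags. A minimal sketch (the user wyk_no and the host below are illustrative, not part of the setup above):

# deny user wyk_no read and write access to csdn01 from host 192.168.145.101
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zookeeper:2181 --add --deny-principal User:wyk_no --deny-host 192.168.145.101 --operation Read --operation Write --topic csdn01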