网站建设维护招聘,卖鞋的网站建设思路,网站开发怎样验收,企业建站网站认证文章目录 1.1.漏洞修复1.1.1.Apache Kafka反序列化漏洞1.1.2.pm2-kafka代码执行漏洞1.1.3.Apache Kafka安全绕过漏洞1.1.4.Apache Kafka Distribution - Schema Repository跨站请求伪造漏洞1.1.5.Apache Kafka输入验证错误漏洞的补丁1.1.6.Apache Kafka信息泄露漏洞1.1.7.Apach… 文章目录 1.1.漏洞修复1.1.1.Apache Kafka反序列化漏洞1.1.2.pm2-kafka代码执行漏洞1.1.3.Apache Kafka安全绕过漏洞1.1.4.Apache Kafka Distribution - Schema Repository跨站请求伪造漏洞1.1.5.Apache Kafka输入验证错误漏洞的补丁1.1.6.Apache Kafka信息泄露漏洞1.1.7.Apache Kafka定时攻击漏洞 1.2.安全加固1.2.1.安全概述1.2.2.使用SSL加密1.2.3.使用SASL/PLAIN认证 1.1.漏洞修复
1.1.1.Apache Kafka反序列化漏洞
公开日期 2017-09-04 危害级别 高 (AV:N/AC:L/Au:N/C:P/I:P/A:P) 影响产品 Apache Kafka 0.11.0.0 漏洞描述 Apache Kafka是用于构建实时数据流水线和流媒体的应用服务。
Apache kafka connect-api在执行FileOffsetBackingStore类存在反序列化漏洞。攻击者可以利用漏洞导致远程代码执行。 漏洞类型 通用型漏洞 漏洞解决方案 目前没有详细解决方案提供 http://www.apache.org/
1.1.2.pm2-kafka代码执行漏洞
公开日期 2018-06-19 危害级别 高 (AV:N/AC:L/Au:N/C:P/I:P/A:P) 影响产品 pm2-kafka pm2-kafka CVE ID CVE-2016-10693 漏洞描述 pm2-kafka是一个用于安装并运行kafka服务器的PM2模块。
pm2-kafka中存在安全漏洞该漏洞源于程序通过未加密的HTTP连接下载二进制文件。远程攻击者可通过拦截响应并用恶意的可执行文件替换被请求的可执行文件利用该漏洞在系统上执行代码。 漏洞类型 通用型漏洞 漏洞解决方案 目前厂商暂未发布修复措施解决此安全问题建议使用此软件的用户随时关注厂商主页或参考网址以获取解决办法 https://npmjs.org/package/pm2-kafka
1.1.3.Apache Kafka安全绕过漏洞
公开日期 2018-08-17 危害级别 中 (AV:N/AC:M/Au:N/C:P/I:P/A:N) 影响产品 Apache Kafka 0.9.0.0 0.9.0.1 Apache Kafka 0.10.0.0 0.10.2.1 Apache Kafka 0.11.0.0 0.11.0.2 Apache Kafka 1.0.0 漏洞描述 Apache Kafka是美国阿帕奇Apache软件基金会开发的一个开源的分布式流媒体平台。该平台能够获取实时数据用于构建对数据流的变化进行实时反应的应用程序。
Apache Kafka 0.9.0.0版本至0.9.0.1版本0.10.0.0版本至0.10.2.1版本0.11.0.0版本至0.11.0.2版本1.0.0版本中存在安全绕过漏洞。攻击者可通过发送特制的请求干扰到数据的复制利用该漏洞造成数据丢失。 漏洞类型 通用型漏洞 漏洞解决方案 目前已发布升级补丁以修复漏洞补丁获取链接 https://lists.apache.org/thread.html/29f61337323f48c47d4b41d74b9e452bd60e65d0e5103af9a6bb2fef%3Cusers.kafka.apache.org%3E
1.1.4.Apache Kafka Distribution - Schema Repository跨站请求伪造漏洞
公开日期 2018-12-21 危害级别 高 (AV:N/AC:H/Au:N/C:C/I:C/A:C) 影响产品 TIBCO Enterprise Edition 1.0.0 TIBCO Messaging - Apache Kafka Distribution - Schema Repository - Community Edition CVE ID CVE-2018-12413 漏洞描述 TIBCO Messaging - Apache Kafka Distribution - Schema Repository是美国TIBCO软件公司的一套应用开发、部署解决方案。
TIBCO Messaging - Apache Kafka Distribution - Schema Repository - Community Edition和Enterprise Edition1.0.0版本中的Schema repository server组件存在跨站请求伪造漏洞攻击者可利用该漏洞执行未授权操作。 漏洞类型 通用型漏洞 漏洞解决方案 厂商已发布了漏洞修复程序请及时关注更新 https://www.tibco.com/support/advisories/2018/11/tibco-security-advisory-november-6-2018-tibco-messaging-apache-kafka-distribution-schema-repository
1.1.5.Apache Kafka输入验证错误漏洞的补丁
公开日期 2019-09-29 危害级别 中 (AV:N/AC:L/Au:S/C:P/I:P/A:P) 影响产品 Apache Apache Kafka 0.11.0.02.1.0 CVE ID CVE-2018-17196 漏洞描述 Apache Kafka是美国阿帕奇Apache软件基金会的一套开源的分布式流媒体平台。该平台能够获取实时数据用于构建对数据流的变化进行实时反应的应用程序。
Apache Kafka 0.11.0.0版本至2.1.0版本中存在安全漏洞。攻击者可通过构建Produce请求利用该漏洞绕过ACL验证。 漏洞类型 通用型漏洞 漏洞解决方案 目前厂商已发布升级补丁以修复漏洞详情请关注厂商主页 https://kafka.apache.org/
1.1.6.Apache Kafka信息泄露漏洞
公开日期 2020-03-06 危害级别 中 (AV:N/AC:L/Au:N/C:P/I:N/A:N) 影响产品 Apache FusionAuth 1.11.0 CVE ID CVE-2019-12399 漏洞描述 Apache Kafka是美国阿帕奇Apache软件基金会的一套开源的分布式流媒体平台。该平台能够获取实时数据用于构建对数据流的变化进行实时反应的应用程序。 Apache Kafka中存在信息泄露漏洞。该漏洞源于网络系统或产品在运行过程中存在配置等错误。未授权的攻击者可利用漏洞获取受影响组件敏感信息。 漏洞类型 通用型漏洞 参考链接 https://nvd.nist.gov/vuln/detail/CVE-2019-12399 漏洞解决方案 目前厂商已发布升级补丁以修复漏洞详情请关注厂商主页 https://www.apache.org/
1.1.7.Apache Kafka定时攻击漏洞
公开日期 2022-02-23 危害级别 高 (AV:N/AC:M/Au:N/C:P/I:C/A:N) 影响产品 Apache Kafka 2.8.0 Apache Kafka 2.0.02.6.3 Apache Kafka 2.7.02.7.2 CVE ID CVE-2021-38153 漏洞描述 Apache Kafka是美国阿帕奇Apache软件基金会开发的一套开源分布式流媒体平台。该平台能够获取实时数据用于构建对数据流的变化进行实时反应的应用程序。
Apache Kafka部分版本中存在定时攻击漏洞。该漏洞主要由于Kafka使用了Arrays.equals组件来进行密钥或密码验证攻击者可使用该漏洞对使用此类凭证验证身份的用户进行暴力破解从而获取访问凭据并提升系统权限。 漏洞类型 通用型漏洞 漏洞解决方案 目前厂商已发布升级补丁以修复漏洞补丁获取链接 https://kafka.apache.org/cve-list
1.2.安全加固
1.2.1.安全概述
随着对kafka集群提供安全配置的呼声越来越高社区终于在0.9.0.0版本正式添加了安全特性并在0.10.0.0版本中进一步完善。目前支持以下的安全措施可以组合使用也可以分开使用。 第一链接认证机制包含服务器与客户端(生产者/消费者)链接服务器间链接以及服务器与工具间链 接。支持的认证机制包括SSL(TLS)或SASL 第二服务器与zookeeper之间的身份验证 第三基于SSL的链接通道数据传输加密 第四客户端读写操作验证 第五支持可插拔的授权服务和外部授权服务的集成 当前Kafka安全主要包含三大功能认证加密和授权下面分别介绍这三大功能。
1.2.2.使用SSL加密
1.2.2.1.为每个Kafka broker生成SSL密钥和证书 部署HTTPS第一步是为集群的每台机器生成密钥和证书可以使用java的keytool来生产。我们将生成密钥到一个临时的密钥库之后我们可以导出并用CA签名它。 keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey
你需要用上面命令来指定两个参数。
keystore: 密钥仓库存储证书文件。密钥仓库文件包含证书的私钥保证私钥的安全。validity: 证书的有效时间天
配置主机名验证 从Kafka 2.0.0版开始默认情况下客户端连接以及broker之间的连接启用了服务器的主机名验证为防止中间人攻击。 可以通过将ssl.endpoint.identification.algorithm设置为空字符串来关闭服务器主机名验证。 例如ssl.endpoint.identification.algorithm 对于动态配置的broker侦听可以使用kafka-configs.sh禁用主机名验证。 如 bin/kafka-configs.sh --bootstrap-server localhost:9093 --entity-type brokers --entity-name 0 --alter --add-config “listener.name.internal.ssl.endpoint.identification.algorithm” 对于老版本的Kafka默认情况下未定义ssl.endpoint.identification.algorithm因此不会执行主机名验证。 通过设置为HTTPS以启用主机名验证。 ssl.endpoint.identification.algorithmHTTPS 如果服务器端没有进行外部验证则必须启用主机名验证以防止中间人攻击。 在证书中配置主机名 如果启用了主机名验证客户端将根据以下两个字段之一验证服务器的完全限定域名FQDN
Common Name (CN)Subject Alternative Name (SAN) 这两个字段均有效但是RFC-2818建议使用SAN。SAN更加的灵活允许声明多个DNS条目。另一个优点是出于授权目的可以将CN设置为更有意义的值。 要添加SAN需将以下参数-ext SANDNS:{FQDN}追加到keytool命令 keytool -keystore server.keystore.jks -alias localhost -validity {validity} -genkey -keyalg RSA -ext SANDNS:{FQDN} 然后可以运行以下命令来验证生成的证书的内容 keytool -list -v -keystore server.keystore.jks
1.2.2.2.创建自己的CA 通过第一步集群中的每台机器都生成一对公私钥和一个证书来识别机器。但是证书是未签名的这意味着攻击者可以创建一个这样的证书来伪装成任何机器。 因此通过对集群中的每台机器进行签名来防止伪造的证书。证书颁发机构CA负责签名证书。CA的工作机制像一个颁发护照的政府。政府印章标志每本护照这样护照很难伪造。其他政府核实护照的印章以确保护照是真实的。同样CA签名的证书和加密保证签名证书很难伪造。因此只要CA是一个真正和值得信赖的权威client就能有较高的保障连接的是真正的机器。 openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
生成的CA是一个简单的公私钥对和证书用于签名其他的证书。 下一步是将生成的CA添加到clients’ truststore客户的信任库以便client可以信任这个CA: keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert 注意还需设置ssl.client.auth(”requested 或 required”)来要求broker对客户端连接进行验证当然你必须为broker提供信任库以及所有客户端签名了密钥的CA证书通过下面的命令 keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
相反在步骤5.2.1中密钥库存储了每个机器自己的身份。客户端的信任库存储所有客户端信任的证书将证书导入到一个信任仓库也意味着信任由该证书签名的所有证书正如上面的比喻信任政府CA也意味着信任它颁发的所有护照证书此特性称为信任链在大型的kafka集群上部署SSL时特别有用的。可以用单个CA签名集群中的所有证书并且所有的机器共享相同的信任仓库这样所有的机器也可以验证其他的机器了。
1.2.2.3.签名证书 用步骤5.2.2生成的CA来签名所有步骤5.2.1生成的证书首先你需要从密钥仓库导出证书 keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file 然后用CA签名 openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days {validity} -CAcreateserial -passin pass:{ca-password} 最后,你需要导入CA的证书和已签名的证书到密钥仓库: keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed
参数的定义如下
keystore: 密钥仓库的位置ca-cert: CA的证书ca-key: CA的私钥ca-password: CA的密码cert-file: 出口服务器的未签名证书cert-signed: 已签名的服务器证书 这是上面所有步骤的一个bash脚本例子。注意这里假设密码“test1234”。
#!/bin/bash
#Step 1
keytool -keystore server.keystore.jks -alias localhost -validity 365 -genkey
#Step 2
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
#Step 3
keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:test1234
keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed1.2.2.4.配置Kafka Broker Kafka Broker支持监听多个端口上的连接通过server.properties 配置最少监听1个端口用逗号分隔。 listeners
如果broker之间通讯未启用SSL参照下面启动它PLAINTEXT和SSL端口是必须要配置。 listenersPLAINTEXT://host.name:port,SSL://host.name:port
下面是broker端需要的SSL配置 ssl.keystore.location/var/private/ssl/server.keystore.jks ssl.keystore.passwordtest1234 ssl.key.passwordtest1234 ssl.truststore.location/var/private/ssl/server.truststore.jks ssl.truststore.passwordtest1234
注意ssl.truststore.password在是可选的但强烈建议使用。如果未设置密码则仍然可以访问信任库但就不是完整性的检查了。 其他还有一些的可选的配置
ssl.client.auth none (“required”客户端身份验证是必需的“requested”客户端身份验证请求客户端没有证书仍然可以连接。使用“requested”是纸老虎因为它提供了一种虚假的安全感错误的配置客户端仍将连接成功。)ssl.cipher.suites可选。密码套件是利用TLS或SSL网络协议的网络连接的安全设置。是认证加密MAC和密钥交换算法的组合。 默认值是一个空表ssl.enabled.protocols TLSv1.2 TLSv1.1 TLSv1 (接收来自客户端列出的SSL协议注意不推荐在生产中使用SSL推荐使用TLS)。ssl.keystore.typeJKSssl.truststore.typeJKSssl.secure.random.implementationSHA1PRNG
如果你想启用SSL用于broker内部通讯将以下内容添加到broker配置文件默认是PLAINTEXT security.inter.broker.protocolSSL
由于一些国家的进口规定oracle的实现限制了默认的加密算法的强度。如果需要更强的算法例如AES 256位密钥该JCE Unlimited Strength Jurisdiction Policy Files必须获得并安装JDK和JRE。更多信息参见JCA文档。 JRE/JDK有一个默认伪随机数生成者PRNG,用于加密操作。因此不需要用实现。 然而有些实现存在性能问题尤其是在Linux系统选择的默认值NativePRNG,使用全局锁。在SSL连接的性能出现问题的情况下请考虑明确设置要使用的实现。 如SHA1PRNG实现是非阻塞的并且在高负载50MB/秒的生成消息以及每个代理的复制流量下显示出非常好的性能特征。 ssl.secure.random.implementation
一旦你启动broker你应该就能在server.log看到 with addresses: PLAINTEXT - EndPoint(192.168.64.1,9092,PLAINTEXT),SSL - EndPoint(192.168.64.1,9093,SSL)
用以下命令快速验证服务器的keystore和truststore设置是否正确 -----BEGIN CERTIFICATE-----{variable sized random bytes}-----END CERTIFICATE-----subject/CUS/STCA/LSanta Clara/Oorg/OUorg/CNSriharsha Chintalapaniissuer/CUS/STCA/LSanta Clara/Oorg/OUorg/CNkafka/emailAddresstesttest.com(注意: TLSv1 应列出 ssl.enabled.protocols) 在命令的输出中你应该能看到服务器的证书: openssl s_client -debug -connect localhost:9093 -tls1
如果证书没有出现或者有任何其他错误信息那么你的keystore设置不正确。 1.2.2.5.配置kafka 客户端 SSL仅支持新的Producer和Consumer不支持老的APIProducer和Consumer的SSL的配置是相同的。 如果broker中不需要client客户端验证那么下面是最小的配置示例 security.protocolSSL ssl.truststore.location/var/private/ssl/client.truststore.jks ssl.truststore.passwordtest1234
注意ssl.truststore.password 在技术上是可选的但强烈推荐。 如果未设置密码对信任库的访问仍然可用就不属于完整性检查。 如果需要客户端认证则必须像步骤1一样创建密钥库并且还必须配置以下内容 ssl.keystore.location/var/private/ssl/client.keystore.jks ssl.keystore.passwordtest1234 ssl.key.passwordtest1234
也可以根据我的需求在broker上设置其他的配置
ssl.provider (可选的). 用于SSL连接的安全提供程序名称默认值是JVM的默认的安全提供程序。ssl.cipher.suites (可选).密码套件是身份验证、 加密、 MAC 和密钥交换算法用于协商使用 TLS 或 SSL 网络协议的网络连接的安全设置的已命名的组合。ssl.enabled.protocolsTLSv1.2TLSv1.1TLSv1。broker配置协议至少一个。ssl.truststore.typeJKSssl.keystore.typeJKS
举个console-produce和console-consumer的例子
kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config 1.2.3.使用SASL/PLAIN认证
SASL/PLAIN是一种简单的用户名/密码的认证机制通常与TLS加密一起使用以实现安全的认证。Kafka支持SASL/PLAIN的默认实现可作为生产者的扩展使用。 username用作ACL等配置已认证的Principal。
1.2.3.1.配置Kafka Brokers 在每个Kafka broker的config目录下添加一个类似于下面的修改后的JAAS文件我们姑且将其称为kafka_server_jaas.conf。
KafkaServer { org.apache.kafka.common.security.plain.PlainLoginModule requiredusernameadminpasswordadmin-secretuser_adminadmin-secretuser_alicealice-secret;};此配置定义了2个用户admin 和 alice。 在KafkaServer中username和password是broker用于初始化连接到其他的broker在这个例子中admin是broker之间通信的用户。user_userName定义了连接到broker的所有用户的密码broker使用这些来验证所有客户端的连接包括来自其他的broker的连接。 将JAAS配置文件位置作为JVM参数传递给每个Kafka broker
Djava.security.auth.login.config/etc/kafka/kafka_server_jaas.conf在server.properties中配置SASL端口和SASL机制。 例如
listenersSASL_SSL://host.name:port
security.inter.broker.protocolSASL_SSL
sasl.mechanism.inter.broker.protocolPLAIN
sasl.enabled.mechanismsPLAIN1.2.3.2.配置Kafka客户端 在客户端上配置SASL身份验证 为producer.properties或consumer.properties中的每个客户端配置JAAS。登录模块展示了客户端如何连接Broker的和生产者和消费者一样。以下是PLAIN机制的客户端的示例配置
sasl.jaas.configorg.apache.kafka.common.security.plain.PlainLoginModule required \usernamealice \passwordalice-secret;客户端选择用户名和密码为客户端配置连接的用户。 在此示例中客户端以用户alice连接到broker。也可以通过在sasl.jaas.config中指定不同的用户名和密码JVM中的不同客户端可以根据不同的用户来进行连接。 客户端的JAAS配置可以指定为类似于这里描述的broker作为JVM参数。客户端使用的命名为KafkaClient。 此选项仅允许来自JVM的所有客户端连接中的一个用户。
在producer.properties或consumer.properties中配置以下属性 security.protocolSASL_SSLsasl.mechanismPLAIN1.2.3.3.在生产者中使用SASL/PLAIN
SASL/PLAIN应仅用SSL作为传输层以确保在没有加密的情况下不会在线上明文传输。
Kafka中SASL / PLAIN的默认实现在JAAS配置文件中指定用户名和密码如下所示。 从Kafka 2.0版开始您可以通过使用配置sasl.server.callback.handler.class和sasl.client.callback.handler.class配置自己的回调处理程序来从外部源获取用户名和密码从而避免在磁盘上存储明文密码。
在生产系统中外部认证服务器可以实现密码认证。从Kafka 2.0版开始可以通过配置sasl.server.callback.handler.class使用外部身份验证服务器进行密码验证的自己的回调处理程序。
1.2.4.ACL授权 kafka附带一个可插拔的ACLAccess Control List 访问控制列表它使用zookeeper来存储。通过在server.properties中设置authorizer.class.name来启用 authorizer.class.namekafka.security.auth.SimpleAclAuthorizer
Kafka acls的格式为 Principal P is [Allowed/Denied] Operation O From Host H On Resource R”你可以使用Kafka authorizer CLI 来添加删除或查询所有acl。默认情况下如果ResourcePatterns与特定的资源R没有匹配则除了超级用户之外都不允许访问R。如果要更改该行为可以在server.properties中包含以下内容。 allow.everyone.if.no.acl.foundtrue
你也可以在server.properties添加超级用户像这样注意分隔符是分号因为SSL的用户名是逗号。 super.usersUser:Bob;User:Alice
默认情况下SSL用户名的格式为“CNwriteuser,OUUnknown,OUnknown,LUnknown,STUnknown,CUnknown”。可以通过在server.properties中设置自定义的PrincipalBuilder来改变它如下所示: principal.builder.classCustomizedPrincipalBuilderClass
可以通过修改server.properties中的sasl.kerberos.principal.to.local.rules自定义规则。sasl.kerberos.principal.to.local.rules的格式是一个列表其中每个规则的工作方式与Kerberos 配置文件 (krb5.conf)中的auth_to_local相同。 也支持小写规则可通过在规则的末尾添加“/L”强制转移全部结果为小写。每个规则都以RULE开头并包含一个表达式格式如下。 有关更多详细信息请参阅kerberos文档。
RULE:[n:string](regexp)s/pattern/replacement/
RULE:[n:string](regexp)s/pattern/replacement/g
RULE:[n:string](regexp)s/pattern/replacement//L
RULE:[n:string](regexp)s/pattern/replacement/g/L举个例子添加规则将userMYDOMAIN.COM转换为用户同时保持默认规则示例如下
sasl.kerberos.principal.to.local.rulesRULE:[1:$1$0](.MYDOMAIN.COM)s/.//,DEFAULT1.2.4.1.命令行界面
Kafka认证管理CLI和其他的CLI脚本可以在bin目录中找到。CLI脚本名是kafka-acls.sh。以下列出了所有脚本支持的选项
选项描述默认类型选择add添加一个aclActionremove移除一个aclActionList列出allActionauthorizerauthorizer的完全限定类名kafka.security.auth.SimpleAclAuthorizerConfigurationauthorizer-propertieskeyval传给authorizer进行初始化例如zookeeper.connectlocalhost:2181Configurationcluster指定集群作为资源Resourcetopic [topic-name]指定topic作为资源Resourcegroup [group-name]指定 consumer-group 作为资源Resourceallow-principal添加到允许访问的ACL中Principal是PrincipalType:name格式。你可以指定多个。Principaldeny-principal添加到拒绝访问的ACL中Principal是PrincipalType:name格式。你可以指定多个。Principalallow-host–allow-principal中的principal的IP地址允许访问。如果–allow-principal指定的默认值是*则意味着指定“所有主机”Hostdeny-host允许或拒绝的操作。有效值为读写创建删除更改描述ClusterAction全部ALLOperationoperation–deny-principal中的principals的IP地址拒绝访问如果 --deny-principal指定的默认值是 * 则意味着指定 “所有主机”Hostconsumer为consumer role添加/删除acl生成acl允许在topic上READ, DESCRIBE 和 consumer-group上READ。Convenienceforce假设所有操作都是yes规避提示Convenience
1.2.4.2.使用案例
添加acl 假设你要添加一个acl “以允许x.x.x.0和x.x.x.1Principal为User:Bob和User:Alice对主题是Test-Topic有Read和Write的执行权限” 。可通过以下命令实现
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User:Bob --allow-principal User:Alice --allow-host x.x.x.0 --allow-host x.x.x.1 --operation Read --operation Write --topic Test-topic默认情况下所有的principal在没有一个明确的对资源操作访问的acl都是拒绝访问的。在极少的情况下acl允许访问所有的资源但一些principal我们可以使用 --deny-principal 和 --deny-host来拒绝访问。例如如果我们想让所有用户读取Test-topic只拒绝IP为x.x.x.3的User:BadBob我们可以使用下面的命令:
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User: --allow-host --deny-principal User:BadBob --deny-host x.x.x.3 --operation Read --topic Test-topic需要注意的是–allow-host和deny-host仅支持IP地址主机名不支持。上面的例子中通过指定–topic [topic-name]作为资源选项添加ACL到一个topic。同样用户通过指定–cluster和通过指定–group [group-name]消费者组添加ACL。
删除acl 删除和添加是一样的—add换成–remove选项要删除第一个例子中添加的可以使用下面的命令
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --remove --allow-principal User:Bob --allow-principal User:Alice --allow-host x.x.x.0 --allow-host x.x.x.1 --operation Read --operation Write --topic Test-topicacl列表
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --list --topic Test-topic添加或删除作为生产者或消费者的principal Acl管理添加/移除一个生产者或消费者principal是最常见的使用情况所以我们增加更便利的选项处理这些情况。为主题Test-topic添加一个生产者User:Bob我们可以执行以下命令
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User:Bob --producer --topic Test-topic同样添加Alice作为主题Test-topic的消费者用消费者组为Group-1我们只用 --consumer 选项
bin/kafka-acls.sh --authorizer-properties zookeeper.connectlocalhost:2181 --add --allow-principal User:Bob --consumer --topic test-topic --group Group-1注意消费者的选择我们还必须指定消费者组。从生产者或消费者角色删除主体我们只需要通过–remove选项。