kafka-tuning
Loss of an EBS volume (DR drill):
1. Start producing messages and observe the topic
2. Detach the EBS volume
3. Check the Kafka broker
4. Check producer / consumer / partitions
5. Terminate the Kafka instance
6. Recreate the EBS volume and EC2 instance
7. Re-check after recreation of EBS and EC2 (see the AWS CLI sketch below)
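A minimal AWS CLI sketch of the detach/terminate/recreate steps; the instance and volume IDs, availability zone, size, and device name are placeholders, not values from this environment:
# Detach the data volume from the broker instance (IDs are hypothetical)
aws ec2 detach-volume --volume-id vol-0123456789abcdef0
# Terminate the broker instance once the failure behaviour has been observed
aws ec2 terminate-instances --instance-ids i-0123456789abcdef0
# Recreate the volume in the broker's AZ and attach it to the replacement instance
aws ec2 create-volume --availability-zone us-east-1a --size 100 --volume-type gp3
aws ec2 attach-volume --volume-id vol-0fedcba9876543210 \
  --instance-id i-0fedcba9876543210 --device /dev/xvdf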
/kafka/etc/producer-saslssl.properties :
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/kafka/ssl/kafka.client.truststore.jks
ssl.truststore.password=<<grab-from-1password>>
bootstrap.servers=kafka-0.some.domain:9095,kafka-1.some.domain:9095,kafka-2.some.domain:9095
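To exercise this client config, a console producer can point at it; the topic name below is a placeholder:
bin/kafka-console-producer.sh --bootstrap-server kafka-0.some.domain:9095 \
  --topic test-topic --producer.config /kafka/etc/producer-saslssl.properties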
SASL_PLAINTEXT for Kafka Brokers
SASL_SSL connection to Kafka Brokers
SASL between Zookeeper servers
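A sketch of how those listener protocols might be wired in the broker config; the file path, listener ports, and mechanism are illustrative assumptions, not values taken from this cluster:
cat >> /kafka/etc/server.properties <<'EOF'
# Illustrative only: SASL_SSL for clients, SASL_PLAINTEXT for inter-broker traffic
listeners=SASL_PLAINTEXT://0.0.0.0:9094,SASL_SSL://0.0.0.0:9095
advertised.listeners=SASL_PLAINTEXT://kafka-0.some.domain:9094,SASL_SSL://kafka-0.some.domain:9095
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
EOF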
---
DR
https://github.com/hiimivantang/kafka-dr-runbook-example/blob/03ea0c1b27f618fea8799259df47a326cefbacfa/steps/start_broker.adoc
https://docs.confluent.io/platform/current/kafka/rebalancer/index.html#decommissioning-brokers
https://github.com/shanewillcocks/venafi_api_integration/blob/main/generate_cert.yml
https://github.com/Venafi/ansible-collection-venafi/tree/main
# Define your AWS provider configuration
provider "aws" {
region = "us-east-1" # Replace with your desired AWS region
}
----
Enable monitoring on partition health, especially for under-replicated or offline partitions (Splunk is used for monitoring here).
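A quick way to surface unhealthy partitions from the CLI (the bootstrap address matches the example below and is a placeholder):
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --describe --under-replicated-partitions
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --describe --unavailable-partitions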
bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9093 --admin.config config/consumer.properties \
  --topic Correlation-Store --partition 19 --election-type UNCLEAN
vim bin/kafka-console-consumer-alice.sh   # wrapper script that runs the console consumer with alice's JAAS config:
exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_alice.conf kafka.tools.ConsoleConsumer "$@"
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2182 --add --allow-principal User:alice --operation All --topic xiaoming
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2182 --add --allow-principal User:alice --operation All --group test-consumer-group
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2182 --add --allow-principal User:alice --operation All --group apple-group-03
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2182 --list
./bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2182 --remove --allow-principal User:alice --operation Read --topic xiaoming --allow-host *
./bin/kafka-console-producer-alice.sh --broker-list 192.168.197.128:9092 --topic xiaoming --producer.config config/producer.properties
./bin/kafka-console-consumer-alice.sh --bootstrap-server 192.168.197.128:9092 --from-beginning --topic xiaoming --consumer.config config/consumer.properties
./bin/kafka-configs.sh --zookeeper localhost:2182 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=alice-s…
./bin/kafka-configs.sh --zookeeper localhost:2182 --alter --add-config 'SCRAM-SHA-256=[password=admin-secret],SCRAM-SHA…
---
Add an ACL to Give `kafkauser` Read and Write Access to the `inventory_purchases` Topic
Create the ACL.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:kafkauser --operation Read --operation Write --topic inventory_purchases
Verify that the read access works by consuming from the topic.
kafka-console-consumer --bootstrap-server zoo1:9093 --topic inventory_purchases --from-beginning --consumer.config client-ssl.properties
Verify that the write access works by writing data to the topic.
kafka-console-producer --broker-list zoo1:9093 --topic inventory_purchases --producer.config client-ssl.properties
Remove All Existing ACLs for the `member_signups` Topic
List the ACLs for the topic.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --topic member_signups --list
Remove the existing ACL for the topic.
kafka-acls --authorizer-properties zookeeper.connect=localhost:2181 --topic member_signups --remove
Verify that you can read from the topic as kafkauser.
kafka-console-consumer --bootstrap-server zoo1:9093 --topic member_signups --from-beginning --consumer.config client-ssl.properties
# Once, inside the ZooKeeper container:
export KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka/secrets/zookeeper_broker_client_jaas.conf"
kafka-configs --zookeeper zk-1:2181 --describe --entity-type users
kafka-configs --zookeeper zk-1:2181 --alter --add-config 'SCRAM-SHA-256=[password=broker-secret]' --entity-type users --entity-name broker
kafka-configs --zookeeper zk-1:2181 --alter --add-config 'SCRAM-SHA-256=[password=metricsreporter-secret]' --entity-type users --entity-name metricsreporter
kafka-configs --zookeeper zk-1:2181 --alter --add-config 'SCRAM-SHA-256=[password=c3-secret]' --entity-type users --entity-name c3
kafka-configs --zookeeper zk-1:2181 --alter --add-config 'SCRAM-SHA-256=[password=kafkaclient-secret]' --entity-type users --entity-name kafkaclient
kafka-configs --zookeeper zk-1:2181 --describe --entity-type users
# If the user (here: kafkaclient) is not a super user and should be granted full permissions
kafka-acls --authorizer-properties zookeeper.connect=zk-1:2181 --list
kafka-acls --authorizer-properties zookeeper.connect=zk-1:2181 --add --cluster --allow-principal User:kafkaclient --operation All --topic '*' --group '*'
kafka-acls --authorizer-properties zookeeper.connect=zk-1:2181 --add --cluster --allow-principal User:c3 --operation All --topic '*' --group '*'
-----------
Mitigation: Monitor disk I/O metrics and consider using high-performance storage solutions. Optimize Kafka configuration, such as adjusting batch sizes and compression settings.
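As a sketch of the batch/compression levers on the producer side (the values are illustrative starting points, not recommendations measured for this cluster):
cat >> config/producer.properties <<'EOF'
# Larger batches plus a small linger let the producer amortize I/O (illustrative values)
batch.size=65536
linger.ms=10
compression.type=lz4
EOF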
---
For the "multi-region-async" topic, partition 0:
docker compose exec broker-east-4 kafka-leader-election --bootstrap-server broker-east-4:19094 --election-type UNCLEAN --topic multi-region-async --partition 0
For the "multi-region-default" topic, partition 0:
docker compose exec broker-east-4 kafka-leader-election --bootstrap-server broker-east-4:19094 --election-type UNCLEAN --topic multi-region-default --partition 0
These commands initiate an unclean leader election for the specified topic partitions on the Kafka broker running on broker-east-4, via its bootstrap address and the UNCLEAN election type.
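To confirm which replica took over after the election, a describe along these lines should show the new leader:
docker compose exec broker-east-4 kafka-topics --bootstrap-server broker-east-4:19094 --describe --topic multi-region-async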
------
num.network.threads
Number of network (Processor) threads that service network requests. Recommended: 2x the broker's CPU cores. If this value is too low, the network processors' average idle percentage drops and replicas can fall behind.
num.io.threads
Number of I/O (KafkaRequestHandler) threads that execute the actual requests. Recommended: 2x the number of disks on the broker.
num.replica.fetchers
Recommended: number of CPU cores / 4. Increasing it moderately improves CPU utilization and the parallelism with which followers fetch data from leaders.
compression.type
lz4 is the recommended compression type. Compression spends some CPU to reduce the amount of data transmitted over the network and stored on disk.
log.flush.scheduler.interval.ms
log.flush.interval.ms
log.flush.interval.messages
These parameters control the strategy for flushing log data to disk. Keep the defaults: flushing is then left to the operating system's page cache, which decides when to write data out. Forcing explicit flushes through these parameters can have a very big impact on throughput.
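Pulling those rules of thumb together, a sketch for a hypothetical broker with 8 cores and 2 data disks (the numbers are derived from the guidance above, not measured for any particular workload):
cat >> config/server.properties <<'EOF'
# Assuming 8 CPU cores and 2 data disks, per the rules of thumb above
num.network.threads=16      # cores * 2
num.io.threads=4            # disks * 2
num.replica.fetchers=2      # cores / 4
compression.type=lz4
# log.flush.* deliberately left at defaults: let the OS page cache decide
EOF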
-----------
provisioner "remote-exec" {
  inline = [
    "sudo systemctl daemon-reload",
    "sudo systemctl enable zookeeper",
    "sudo systemctl restart zookeeper",
    "sudo systemctl enable kafka",
    "sudo systemctl restart kafka"
  ]
}
---
output "kafka_private_dns" {
value = ["${aws_instance.kafka.*.private_dns}"]
}
output "kafka_elb_dns" {
value = "${aws_route53_record.kafka_elb.fqdn}"
}
output "zk_elb_dns" {
value = "${aws_route53_record.zk_elb.fqdn}"
}
output "kafka_fqdn_dns" {
value = ["${aws_route53_record.kafka.*.fqdn}"]
}
output "zookeeper_private_dns" {
value = ["${aws_instance.zookeeper.*.private_dns}"]
}
output "zookeeper_fqdn_dns" {
value = ["${aws_route53_record.zookeeper.*.fqdn}"]
}
-----------
Several scenarios trigger a partition leader election:
Offline:
A new partition is created, or a partition loses its existing leader
Reassign:
A user performs a partition reassignment
PreferredReplica:
The leader is migrated back to the preferred replica
ControlledShutdown:
The partition's current leader is about to go offline
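For the PreferredReplica case, the same leader-election tool used above can move leaders back once the preferred replicas are in sync; the bootstrap address is a placeholder:
bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9093 --election-type PREFERRED --all-topic-partitions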
---------------
# Create a VPC, subnets, security groups, IAM roles, etc.
# ...
# Create Kafka brokers and ZooKeeper nodes using EC2 instances
resource "aws_instance" "kafka_broker" {
count = 3
# Define Kafka broker instance configurations here
}
resource "aws_instance" "zookeeper" {
count = 2
# Define ZooKeeper instance configurations here
}
# Define shell scripts for managing topics
locals {
  create_topic_script = <<-EOT
    #!/bin/bash
    KAFKA_HOME=/path/to/kafka # Set to the Kafka installation directory
    ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181 # Replace with your ZooKeeper connection string
    TOPIC_NAME=$1
    # $${...} escapes the shell variables so Terraform does not try to interpolate them
    $${KAFKA_HOME}/bin/kafka-topics.sh --create --topic $${TOPIC_NAME} --partitions 3 --replication-factor 2 --zookeeper $${ZOOKEEPER_CONNECT}
  EOT
  delete_topic_script = <<-EOT
    #!/bin/bash
    KAFKA_HOME=/path/to/kafka # Set to the Kafka installation directory
    ZOOKEEPER_CONNECT=zookeeper1:2181,zookeeper2:2181 # Replace with your ZooKeeper connection string
    TOPIC_NAME=$1
    $${KAFKA_HOME}/bin/kafka-topics.sh --delete --topic $${TOPIC_NAME} --zookeeper $${ZOOKEEPER_CONNECT}
  EOT
}
# Execute create_topic.sh on one of the Kafka broker instances
resource "null_resource" "create_topic" {
triggers = {
always_run = "${timestamp()}"
}
provisioner "remote-exec" {
inline = [
"echo '${local.create_topic_script}' > /tmp/create_topic.sh",
"chmod +x /tmp/create_topic.sh",
"/tmp/create_topic.sh my_topic", # Replace my_topic with the topic name
]
connection {
host = element(aws_instance.kafka_broker.*.private_ip, 0) # Use the first Kafka broker
type = "ssh"
user = "ec2-user" # Adjust as needed
private_key = file("/path/to/your/key.pem") # Replace with your SSH private key
}
}
}
# Execute delete_topic.sh on one of the Kafka broker instances
resource "null_resource" "delete_topic" {
triggers = {
always_run = "${timestamp()}"
}
provisioner "remote-exec" {
inline = [
"echo '${local.delete_topic_script}' > /tmp/delete_topic.sh",
"chmod +x /tmp/delete_topic.sh",
"/tmp/delete_topic.sh my_topic_to_delete", # Replace my_topic_to_delete with the topic name to delete
]
connection {
host = element(aws_instance.kafka_broker.*.private_ip, 0) # Use the first Kafka broker
type = "ssh"
user = "ec2-user" # Adjust as needed
private_key = file("/path/to/your/key.pem") # Replace with your SSH private key
}
}
}
# Define outputs or additional configurations as needed
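Since both null_resources re-run on every apply (the timestamp() trigger), it may be safer to run them one at a time with -target; a possible invocation:
terraform apply -target=null_resource.create_topic
terraform apply -target=null_resource.delete_topic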
--------
export KAFKA_OPTS=-Djava.security.auth.login.config=dev_producer_jaas.conf
export BootstrapBrokerStringTls2="b-1.efos-kafka-dev2.dss95y.c3.kafka.ap-east-1.amazonaws.com:9096,b-2.efos-kafka-dev2.dss95y.c3.kafka.ap-east-1.amazonaws.com:9096"
kafka_2.12-2.7.0/bin/kafka-topics.sh --bootstrap-server $BootstrapBrokerStringTls2 \
  --command-config client_sasl.properties --replication-factor 2 --partitions 1 --create \
  --topic topic_v1_watchlist_request
export KAFKA_OPTS=-Djava.security.auth.login.config=dev_producer_jaas.conf
export BootstrapBrokerStringTls2="b-1.efos-kafka-dev2.dss95y.c3.kafka.ap-east-1.amazonaws.com:9096,b-2.efos-kafka-dev2.dss95y.c3.kafka.ap-east-1.amazonaws.com:9096"
kafka_2.12-2.7.0/bin/kafka-topics.sh --bootstrap-server $BootstrapBrokerStringTls2 \
  --command-config client_sasl.properties --delete \
  --topic topic_v1_watchlist_request
kafka_2.12-2.7.0/bin/kafka-topics.sh --bootstrap-server $BootstrapBrokerStringTls2 \
  --command-config client_sasl.properties --delete \
  --topic txStreamingResponse
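A listing against the same bootstrap string confirms whether the create/delete took effect:
kafka_2.12-2.7.0/bin/kafka-topics.sh --bootstrap-server $BootstrapBrokerStringTls2 \
  --command-config client_sasl.properties --list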
// Read the file content (Node.js; the pseudo-APIs fs.readFile/fs.encodeBase64 replaced with real ones)
const fs = require('fs');
const filePath = '/path/to/your/file.txt';
const fileContent = fs.readFileSync(filePath);
// Encode the file content as Base64
const encodedContent = fileContent.toString('base64');
// Create the MIME message
let mimeMessage = 'Content-Type: application/octet-stream\r\n';
mimeMessage += 'Content-Disposition: attachment; filename="file.txt"\r\n\r\n';
mimeMessage += encodedContent;
// 'mimeMessage' can now be used as part of a request or message
var delayInSeconds = 5; // Adjust this value to set your desired delay in seconds
// Calculate the delay in milliseconds
var delayInMilliseconds = delayInSeconds * 1000;
// Sleep function to introduce a delay (note: this busy-waits and blocks the event loop)
function sleep(ms) {
  var start = new Date().getTime();
  while (new Date().getTime() - start < ms) {}
}
console.log("Starting...");
// Introduce a delay of 'delayInMilliseconds' milliseconds
sleep(delayInMilliseconds);
console.log("After " + delayInSeconds + " seconds, this will be executed.");