#!/bin/bash
# Kafka bootstrap address, formatted as <ip-or-hostname>:9092.
KAFKA_CLUSTER='10.10.2.234:9092'
# Directory that holds the Kafka command-line tools.
KAFKA_HOME_BIN='/opt/kafka_2.12-2.2.2/bin'
# Partition count used when a topic is created without explicit sizing.
PARTITIONS='5'
# Replication factor used when a topic is created without explicit sizing.
REPLICATIONFACTOR='2'
# Absolute directory of this script, resolved from "$0" in two steps:
# first take the directory part, then canonicalise it with cd + pwd.
SCRIPT_DIR=$(dirname "$0")
SCRIPT_DIR=$(cd "${SCRIPT_DIR}" && pwd)
# Require at least one argument (the sub-command). When none is given,
# report the problem, show the usage text and stop with a failure status.
if [ $# -lt 1 ]; then
  # The guard demands one argument, so the message says "one", not "two".
  printf '\033[31mAt least one parameter \033[0mis required\n'
  # Re-run this very file ("$0") for the help text instead of a hard-coded
  # "kafka" file name, so the script keeps working under any file name.
  # bash is used because the shebang of this script targets bash.
  bash "$0" --help
  # A usage error must not be reported as success to the caller.
  exit 1
fi
#打印传入的参数
#echo "============Args: $1 $2 ================="
#######################################
# Report an invalid invocation: print a red error banner followed by the
# usage text, then fail so the error is visible in the exit status.
# Arguments: none
# Outputs:   banner and usage text on stdout
# Returns:   1 always
#######################################
errorFun() {
  printf '\033[31mInput Args Error!!!!\033[0m\n'
  # Re-run this very file ("$0") rather than a hard-coded "kafka" file name,
  # and run it with bash because the shebang of this script targets bash.
  bash "$0" --help
  return 1
}
# Print the command-line usage text on stdout.
kafka_usage() {
  printf '%s\n' " Usage: kafka [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
OPTIONS is none or any of:
-l or list <-t> <-g> list all topics/groups
-c or create [partitions] [replication-factor] create default topic or custom topic
-d or desc [-t ] [-g ] describe all topics/topic/group
-del or delete <-t > <-g > delete topic/group
-p or producer producer data to topic
-con or consumer -e==earliest -l==latest consumer data from earliest or latest
-r or reset reset all partitions offset to earliest,reset all partitions
offset to optional position,reset one of partitions offset to
optional position
"
}

# Succeed only when $1 is a non-empty string made of decimal digits.
# Done with a case pattern so no sub-process or unquoted expansion is needed.
kafka_is_uint() {
  case "${1-}" in
    '' | *[!0-9]*) return 1 ;;
  esac
  return 0
}

# Report a wrong argument count for list/delete, show the usage, and fail.
kafka_need_two_params() {
  printf 'At least \033[31mtwo parameters \033[0mare required\n'
  kafka_usage
  return 1
}

#######################################
# Dispatch one sub-command to the matching Kafka command-line tool.
# Globals:   KAFKA_HOME_BIN, KAFKA_CLUSTER, PARTITIONS, REPLICATIONFACTOR (read)
# Arguments: $1 - sub-command; remaining arguments depend on the sub-command
#            (see kafka_usage)
# Returns:   status of the Kafka tool or of errorFun, whichever ran last;
#            1 on a wrong argument count for list/delete.
#            --help prints the usage and exits the script with status 0.
#######################################
kafka_main() {
  case "${1-}" in
    # Usage text.
    --help)
      kafka_usage
      exit 0
      ;;
    # Create a topic: default sizing with 2 args, custom sizing with 4.
    create | -c)
      if [ $# -eq 2 ]; then
        "${KAFKA_HOME_BIN}/kafka-topics.sh" --create --topic "$2" \
          --bootstrap-server "${KAFKA_CLUSTER}" \
          --partitions "${PARTITIONS}" \
          --replication-factor "${REPLICATIONFACTOR}"
      elif [ $# -eq 4 ] && kafka_is_uint "$3" && kafka_is_uint "$4"; then
        "${KAFKA_HOME_BIN}/kafka-topics.sh" --create --topic "$2" \
          --bootstrap-server "${KAFKA_CLUSTER}" \
          --partitions "$3" \
          --replication-factor "$4"
      else
        # Wrong argument count, or a non-numeric partition/replica value.
        errorFun
      fi
      ;;
    # List topics (-t) or consumer groups (-g).
    list | -l)
      if [ $# -ne 2 ]; then
        kafka_need_two_params
        return 1
      fi
      case "$2" in
        -t)
          "${KAFKA_HOME_BIN}/kafka-topics.sh" --list \
            --bootstrap-server "${KAFKA_CLUSTER}"
          ;;
        -g)
          "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --list \
            --bootstrap-server "${KAFKA_CLUSTER}"
          ;;
        *)
          errorFun
          ;;
      esac
      ;;
    # Describe every topic (1 arg), one topic (-t) or one group (-g).
    desc | -d)
      if [ $# -eq 1 ]; then
        "${KAFKA_HOME_BIN}/kafka-topics.sh" --describe \
          --bootstrap-server "${KAFKA_CLUSTER}"
      elif [ $# -eq 3 ]; then
        case "$2" in
          -t)
            "${KAFKA_HOME_BIN}/kafka-topics.sh" --describe \
              --bootstrap-server "${KAFKA_CLUSTER}" \
              --topic "$3"
            ;;
          -g)
            "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --describe --group "$3" \
              --bootstrap-server "${KAFKA_CLUSTER}"
            ;;
          *)
            errorFun
            ;;
        esac
      else
        errorFun
      fi
      ;;
    # Start a console producer on the given topic.
    producer | -p)
      if [ $# -eq 2 ]; then
        "${KAFKA_HOME_BIN}/kafka-console-producer.sh" --topic "$2" \
          --broker-list "${KAFKA_CLUSTER}"
      else
        errorFun
      fi
      ;;
    # Start a console consumer: -e reads from the beginning, -l from latest.
    consumer | -con)
      if [ $# -ne 4 ]; then
        errorFun
      else
        case "$2" in
          -e)
            "${KAFKA_HOME_BIN}/kafka-console-consumer.sh" --topic "$3" \
              --bootstrap-server "${KAFKA_CLUSTER}" \
              --from-beginning \
              --group "$4"
            ;;
          -l)
            "${KAFKA_HOME_BIN}/kafka-console-consumer.sh" --topic "$3" \
              --bootstrap-server "${KAFKA_CLUSTER}" \
              --group "$4"
            ;;
          *)
            # An unknown position flag used to be ignored silently.
            errorFun
            ;;
        esac
      fi
      ;;
    # Reset offsets: to earliest (3 args) or to a numeric offset (4 args).
    # "reset" is accepted as well because the usage text advertises it.
    reset | -r)
      if [ $# -eq 3 ]; then
        "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --topic "$2" --group "$3" \
          --reset-offsets --to-earliest --execute \
          --bootstrap-server "${KAFKA_CLUSTER}"
      elif [ $# -eq 4 ] && kafka_is_uint "$4"; then
        "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --topic "$2" --group "$3" \
          --reset-offsets --to-offset "$4" --execute \
          --bootstrap-server "${KAFKA_CLUSTER}"
      else
        errorFun
      fi
      ;;
    # Delete a topic (-t) or a consumer group (-g).
    -del | delete)
      if [ $# -ne 3 ]; then
        kafka_need_two_params
        return 1
      fi
      case "$2" in
        -t)
          "${KAFKA_HOME_BIN}/kafka-topics.sh" --delete --topic "$3" \
            --bootstrap-server "${KAFKA_CLUSTER}"
          ;;
        -g)
          "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --delete --group "$3" \
            --bootstrap-server "${KAFKA_CLUSTER}"
          ;;
        *)
          errorFun
          ;;
      esac
      ;;
    *)
      errorFun
      ;;
  esac
}

kafka_main "$@"
#!/bin/bash
# Paths of the Kafka command-line tools used by this script.
kafka_topic_bin='/opt/kafka/bin/kafka-topics.sh'
kafka_consumer_bin='/opt/kafka/bin/kafka-consumer-groups.sh'
# ZooKeeper connect string (passed to --zookeeper).
zks='127.0.0.1:2181'
# Kafka broker list (passed to --bootstrap-server).
servers='127.0.0.1:9092'
#######################################
# Create a topic with 3 partitions and a replication factor of 2.
# Globals:   kafka_topic_bin, servers (read)
# Arguments: $1 - sub-command word forwarded by the dispatcher (unused)
#            $2 - name of the topic to create
# Outputs:   success line on stdout; validation error on stderr
# Returns:   0 on success, 1 when the topic name is missing, otherwise the
#            exit status of the Kafka tool
#######################################
createTopic() {
  local topic_name=${2-}
  if [[ -z "${topic_name}" ]]; then
    echo ">>>>>> create topic failed: topic name is required." >&2
    return 1
  fi
  # Expansions are quoted so the name reaches the tool as a single argument.
  sh "${kafka_topic_bin}" --bootstrap-server "${servers}" --create \
    --topic "${topic_name}" --partitions 3 --replication-factor 2 || return
  # Reached only when the tool succeeded, so the message is truthful.
  echo ">>>>>> create topic: ${topic_name} success."
  # Extra per-topic settings that could be appended to the command, e.g.:
  # --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
}
#######################################
# List all topics, or describe one topic (all topics when no name is given).
# Globals:   kafka_topic_bin, servers (read)
# Arguments: $1 - sub-command word forwarded by the dispatcher (unused)
#            $2 - query type: "list" or "describe"
#            $3 - topic name, only used with "describe" (optional)
# Outputs:   tool output, or an error line for an unknown query type
# Returns:   exit status of the Kafka tool; 1 for an unknown query type
#######################################
descTopic() {
  local desc_type=${2-}
  local topic_name=${3-}
  case "${desc_type}" in
    list)
      sh "${kafka_topic_bin}" --bootstrap-server "${servers}" --list
      ;;
    describe)
      if [[ -n "${topic_name}" ]]; then
        sh "${kafka_topic_bin}" --bootstrap-server "${servers}" --describe \
          --topic "${topic_name}"
      else
        # Without a name, omit --topic entirely instead of passing the
        # option with no value; the tool then describes every topic.
        sh "${kafka_topic_bin}" --bootstrap-server "${servers}" --describe
      fi
      ;;
    *)
      echo "------query error,please check scripts params define."
      return 1
      ;;
  esac
}
#######################################
# Delete a topic.
# Globals:   kafka_topic_bin, servers (read)
# Arguments: $1 - sub-command word forwarded by the dispatcher (unused)
#            $2 - name of the topic to delete
# Outputs:   success line on stdout; validation error on stderr
# Returns:   0 on success, 1 when the topic name is missing, otherwise the
#            exit status of the Kafka tool
#######################################
deleteTopic() {
  local topic_name=${2-}
  if [[ -z "${topic_name}" ]]; then
    echo ">>>>>> delete topic failed: topic name is required." >&2
    return 1
  fi
  # Talks to the brokers through --bootstrap-server, the same way the create
  # and describe helpers in this script do, instead of going to ZooKeeper.
  sh "${kafka_topic_bin}" --bootstrap-server "${servers}" --delete \
    --topic "${topic_name}" || return
  # Reached only when the tool succeeded, so the message is truthful.
  echo ">>>>>> delete topic: ${topic_name} success."
}
#######################################
# Alter a topic (configuration overrides or partition count).
# Globals:   kafka_topic_bin, zks (read)
# Arguments: $1  - sub-command word forwarded by the dispatcher (unused)
#            $2  - name of the topic to alter
#            $3… - options handed to the tool after --alter. They may be
#                  given as separate words or as one whitespace-separated
#                  string in $3.
# Outputs:   success line on stdout; validation error on stderr
# Returns:   0 on success, 1 when the name or the options are missing,
#            otherwise the exit status of the Kafka tool
#######################################
alterTopic() {
  # Plain assignments: the former "$topicName=$2" form expanded the (empty)
  # variable first and tried to run "=<value>" as a command, so neither the
  # topic nor the options were ever set.
  local topic_name=${2-}
  local -a alter_opts=()
  if [[ -z "${topic_name}" || $# -lt 3 ]]; then
    echo ">>>>>> alter topic failed: topic name and alter options are required." >&2
    return 1
  fi
  if [[ $# -eq 3 ]]; then
    # A single string may carry several words ("--partitions 5"); split it
    # on whitespace without glob expansion.
    read -r -a alter_opts <<<"$3"
  else
    alter_opts=("${@:3}")
  fi
  sh "${kafka_topic_bin}" --zookeeper "${zks}" --alter \
    --topic "${topic_name}" "${alter_opts[@]}" || return
  # Example option sets:
  #   --config max.message.bytes=128000
  #   --delete-config max.message.bytes
  #   --partitions 5   (Kafka only lets the partition count grow, never shrink)
  echo ">>>>>> alter topic: ${topic_name} success."
}
#######################################
# Describe or delete a consumer group.
# Globals:   kafka_consumer_bin, servers (read); optype, consumer_group (set)
# Arguments: $1 - sub-command word forwarded by the dispatcher (unused)
#            $2 - operation: "describe" or "delete"
#            $3 - consumer group name
# Outputs:   tool output, "...." after a delete, or an error line for an
#            unknown operation
#######################################
consumer() {
  optype=$2
  consumer_group=$3
  # Options shared by both operations become the positional parameters.
  set -- --bootstrap-server "${servers}" --group "${consumer_group}"
  case "${optype}" in
    describe)
      # The tool path is expanded unquoted, exactly as before.
      $kafka_consumer_bin "$@" --describe
      ;;
    delete)
      $kafka_consumer_bin "$@" --delete
      echo "...."
      ;;
    *)
      echo ">>>>>> consumer params error, please check script define."
      ;;
  esac
}
#kafka-console-producer.sh --broker-list localhost:9092 --topic test
#kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
#######################################
# Route the first argument to the matching helper function.
# Arguments: $1 - one of create, query, delete, alter, consumer
#            remaining arguments are forwarded unchanged to the helper
# Outputs:   helper output, or a usage line for an unknown command
# Returns:   status of the helper; 1 for an unknown or missing command
#######################################
run_command() {
  # "$@" forwards every argument as-is, so values containing spaces are not
  # split and helpers that accept extra options receive all of them.
  case "${1-}" in
    create)
      createTopic "$@"
      ;;
    query)
      descTopic "$@"
      ;;
    delete)
      deleteTopic "$@"
      ;;
    alter)
      alterTopic "$@"
      ;;
    consumer)
      consumer "$@"
      ;;
    *)
      # The usage line names the commands that are actually accepted above.
      echo "Usage: $0 (create|query|delete|alter|consumer)."
      return 1
      ;;
  esac
}

run_command "$@"
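# (Web page footer, not part of the script) Feel free to share; when reposting, please credit the source: 内存溢出
# (Web page footer, not part of the script) Comments (0)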