Kafka 命令封装脚本

Kafka 命令封装脚本(图 1)

#!/bin/bash
# Kafka cluster address, written as ip-or-hostname:9092.
KAFKA_CLUSTER='10.10.2.234:9092'
# Directory that holds the Kafka command-line scripts.
KAFKA_HOME_BIN='/opt/kafka_2.12-2.2.2/bin'
# Partition count.
PARTITIONS='5'
# Replica count.
REPLICATIONFACTOR='2'
# Absolute path of the directory containing this script.
SCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)

# At least one argument (the sub-command) is required; otherwise print the
# help text and stop with a failing exit status.
if [ $# -lt 1 ]; then
  # The check is "fewer than one", so the message says one parameter.
  echo -e "\033[31mAt least one parameter \033[0mis required" >&2
  # Re-run this script by its real path; do not assume it is installed as
  # "<script dir>/kafka", and use bash because the script relies on bashisms.
  bash "$0" --help
  exit 1
fi
#打印传入的参数
#echo "============Args: $1 $2 ================="


# Report invalid arguments: print a red error message on stderr, show the
# help text, and return non-zero so the failure is visible to the caller.
# Arguments: none
# Outputs:   error line on stderr, help text on stdout
# Returns:   1 always
errorFun(){
    echo -e "\033[31mInput Args Error!!!!\033[0m" >&2
    # Re-run this script by its real path instead of a hard-coded
    # "<script dir>/kafka", and under bash rather than sh.
    bash "$0" --help
    return 1
}


# Dispatch on the sub-command given in $1.
# Reads KAFKA_HOME_BIN, KAFKA_CLUSTER, PARTITIONS and REPLICATIONFACTOR and
# calls errorFun; all of them are defined outside this statement.
case "$1" in
# Print the usage text and exit successfully.
--help)
    echo " Usage: kafka [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
	     
        OPTIONS is none or any of:
          -l or list        <-t> <-g>                                        list all topics/groups
	  -c or create       [partitions] [replication-factor]    create default topic or custom topic
          -d or desc        [-t ] [-g ]                describe all topics/topic/group
          -del or delete    <-t > <-g >                delete topic/group
          -p or producer                                          producer data to topic  
          -con or consumer  -e==earliest -l==latest    consumer data from earliest or latest
          -r or reset                       reset all partitions offset to earliest,reset all partitions 
                               offset to optional position,reset one of partitions offset to 
                                                                             optional position
         "
    exit 0
    ;;

# Create a topic: "create <topic>" uses the default partition/replica counts,
# "create <topic> <partitions> <replication-factor>" uses the given ones.
create|-c)
  if [[ $# -eq 2 ]]; then
    "${KAFKA_HOME_BIN}/kafka-topics.sh" --create --topic "$2" \
      --bootstrap-server "${KAFKA_CLUSTER}" \
      --partitions "${PARTITIONS}" \
      --replication-factor "${REPLICATIONFACTOR}"
  # Both counts must be plain decimal integers; the regex test avoids the
  # word splitting/globbing of an unquoted "echo $3 | sed".
  elif [[ $# -eq 4 && "$3" =~ ^[0-9]+$ && "$4" =~ ^[0-9]+$ ]]; then
    "${KAFKA_HOME_BIN}/kafka-topics.sh" --create --topic "$2" \
      --bootstrap-server "${KAFKA_CLUSTER}" \
      --partitions "$3" \
      --replication-factor "$4"
  else
    errorFun
  fi
  ;;

# List the topics (-t) or the consumer groups (-g) known to the cluster.
list|-l)
  if [[ $# -eq 2 ]]; then
    case "$2" in
      -t)
        "${KAFKA_HOME_BIN}/kafka-topics.sh" --list \
          --bootstrap-server "${KAFKA_CLUSTER}"
        ;;
      -g)
        "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --list \
          --bootstrap-server "${KAFKA_CLUSTER}"
        ;;
      *)
        errorFun
        ;;
    esac
  else
    echo -e "At least \033[31mtwo parameters \033[0mare required"
    # Show the help by re-running this script under its real path.
    bash "$0" --help
  fi
  ;;

# Describe every topic (no further arguments), one topic (-t <topic>) or one
# consumer group (-g <group>).
desc|-d)
  if [[ $# -eq 1 ]]; then
    "${KAFKA_HOME_BIN}/kafka-topics.sh" --describe \
      --bootstrap-server "${KAFKA_CLUSTER}"
  elif [[ $# -eq 3 && "$2" == "-t" ]]; then
    "${KAFKA_HOME_BIN}/kafka-topics.sh" --describe \
      --bootstrap-server "${KAFKA_CLUSTER}" \
      --topic "$3"
  elif [[ $# -eq 3 && "$2" == "-g" ]]; then
    "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --describe --group "$3" \
      --bootstrap-server "${KAFKA_CLUSTER}"
  else
    errorFun
  fi
  ;;

# Start a console producer writing to <topic>.
producer|-p)
  if [[ $# -eq 2 ]]; then
    "${KAFKA_HOME_BIN}/kafka-console-producer.sh" --topic "$2" \
      --broker-list "${KAFKA_CLUSTER}"
  else
    errorFun
  fi
  ;;

# Start a console consumer: "consumer -e|-l <topic> <group>".
# -e reads from the beginning, -l reads without --from-beginning.
consumer|-con)
  if [[ $# -eq 4 && "$2" == "-e" ]]; then
    "${KAFKA_HOME_BIN}/kafka-console-consumer.sh" --topic "$3" \
      --bootstrap-server "${KAFKA_CLUSTER}" \
      --from-beginning \
      --group "$4"
  elif [[ $# -eq 4 && "$2" == "-l" ]]; then
    "${KAFKA_HOME_BIN}/kafka-console-consumer.sh" --topic "$3" \
      --bootstrap-server "${KAFKA_CLUSTER}" \
      --group "$4"
  else
    # Wrong argument count or an unknown mode flag: report it instead of
    # silently doing nothing.
    errorFun
  fi
  ;;

# Reset consumer-group offsets: "reset <topic> <group>" rewinds to the
# earliest offset, "reset <topic> <group> <offset>" jumps to a numeric offset.
# "reset" is accepted as well as "-r" because the help text documents both.
reset|-r)
  if [[ $# -eq 3 ]]; then
    "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --topic "$2" --group "$3" \
      --reset-offsets --to-earliest --execute \
      --bootstrap-server "${KAFKA_CLUSTER}"
  elif [[ $# -eq 4 && "$4" =~ ^[0-9]+$ ]]; then
    "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --topic "$2" --group "$3" \
      --reset-offsets --to-offset "$4" --execute \
      --bootstrap-server "${KAFKA_CLUSTER}"
  else
    errorFun
  fi
  ;;

# Delete a topic (-t <topic>) or a consumer group (-g <group>).
-del|delete)
  if [[ $# -eq 3 ]]; then
    case "$2" in
      -t)
        "${KAFKA_HOME_BIN}/kafka-topics.sh" --delete --topic "$3" \
          --bootstrap-server "${KAFKA_CLUSTER}"
        ;;
      -g)
        "${KAFKA_HOME_BIN}/kafka-consumer-groups.sh" --delete --group "$3" \
          --bootstrap-server "${KAFKA_CLUSTER}"
        ;;
      *)
        errorFun
        ;;
    esac
  else
    echo -e "At least \033[31mtwo parameters \033[0mare required"
    # Show the help by re-running this script under its real path.
    bash "$0" --help
  fi
  ;;

# Anything else is an unknown sub-command.
*)
  errorFun
  ;;
esac

#!/bin/bash

# Paths of the Kafka command-line tools used below.
kafka_topic_bin='/opt/kafka/bin/kafka-topics.sh'
kafka_consumer_bin='/opt/kafka/bin/kafka-consumer-groups.sh'
# ZooKeeper address.
zks='127.0.0.1:2181'
# Kafka broker address.
servers='127.0.0.1:9092'

### Create a topic with 3 partitions and a replication factor of 2.
# Globals:   kafka_topic_bin, servers (read)
# Arguments: $1 - sub-command word passed through by the caller (unused)
#            $2 - name of the topic to create
# Outputs:   success line on stdout; error lines on stderr
# Returns:   0 when the topic tool succeeds, 1 when the name is missing or
#            the tool fails
createTopic(){
  local topicName=$2
  if [[ -z "$topicName" ]]; then
    echo ">>>>>> create topic: topic name is required." >&2
    return 1
  fi
  # Report success only when the tool actually succeeded.
  if sh "$kafka_topic_bin" --bootstrap-server "$servers" --create \
      --topic "$topicName" --partitions 3 --replication-factor 2; then
    echo ">>>>>> create topic: $topicName success."
  else
    echo ">>>>>> create topic: $topicName failed." >&2
    return 1
  fi
  ## Other options that can be appended:
  ## --config max.message.bytes=12800000 --config flush.messages=1 --partitions 5 --replication-factor 1
}

### List topics or describe them.
# Globals:   kafka_topic_bin, servers (read)
# Arguments: $1 - sub-command word passed through by the caller (unused)
#            $2 - "list" or "describe"
#            $3 - topic name for "describe"; when empty, --topic is omitted
# Outputs:   tool output on stdout; error line on stderr
# Returns:   the tool's status, or 1 for an unknown query type
descTopic(){
  local descType=$2
  local topicName=$3
  local -a toolArgs
  case "$descType" in
    list)
      sh "$kafka_topic_bin" --bootstrap-server "$servers" --list
      ;;
    describe)
      toolArgs=(--bootstrap-server "$servers" --describe)
      # Do not pass a dangling "--topic" when no name was supplied.
      if [[ -n "$topicName" ]]; then
        toolArgs+=(--topic "$topicName")
      fi
      sh "$kafka_topic_bin" "${toolArgs[@]}"
      ;;
    *)
      echo "------query error,please check scripts params define." >&2
      return 1
      ;;
  esac
}

### Delete a topic.
# Globals:   kafka_topic_bin, zks (read)
# Arguments: $1 - sub-command word passed through by the caller (unused)
#            $2 - name of the topic to delete
# Outputs:   success line on stdout; error lines on stderr
# Returns:   0 when the topic tool succeeds, 1 when the name is missing or
#            the tool fails
# NOTE(review): this goes through --zookeeper while createTopic/descTopic use
# --bootstrap-server; confirm the target Kafka version still accepts it.
deleteTopic(){
  local topicName=$2
  if [[ -z "$topicName" ]]; then
    echo ">>>>>> delete topic: topic name is required." >&2
    return 1
  fi
  # Report success only when the tool actually succeeded.
  if sh "$kafka_topic_bin" --zookeeper "$zks" --delete --topic "$topicName"; then
    echo ">>>>>> delete topic: $topicName success."
  else
    echo ">>>>>> delete topic: $topicName failed." >&2
    return 1
  fi
}

### Alter a topic.
# Globals:   kafka_topic_bin, zks (read)
# Arguments: $1  - sub-command word passed through by the caller (unused)
#            $2  - name of the topic to alter
#            $3… - options handed to the topic tool, either as one
#                  whitespace-separated string or as separate words
# Outputs:   success line on stdout; error lines on stderr
# Returns:   0 when the topic tool succeeds, 1 when the name is missing or
#            the tool fails
alterTopic(){
  # Plain assignments: a leading "$" on the left-hand side would run
  # "=value" as a command instead of assigning.
  local topicName=$2
  local -a config
  # Split the remaining arguments on whitespace without glob expansion.
  read -r -a config <<< "${*:3}"
  if [[ -z "$topicName" ]]; then
    echo ">>>>>> alter topic: topic name is required." >&2
    return 1
  fi
  # Report success only when the tool actually succeeded.
  if sh "$kafka_topic_bin" --zookeeper "$zks" --alter --topic "$topicName" "${config[@]}"; then
    echo ">>>>>> alter topic: $topicName success."
  else
    echo ">>>>>> alter topic: $topicName failed." >&2
    return 1
  fi
  # Example invocations of the underlying tool:
  #sh $kafka_topic_bin --zookeeper $zks --alter --topic $topicName --config max.message.bytes=128000
  #sh $kafka_topic_bin --zookeeper $zks --alter --topic $topicName --delete-config max.message.bytes
  # Kafka only allows the partition count to be increased, never decreased.
  #sh $kafka_topic_bin --zookeeper $zks --alter --topic $topicName --partitions 5
}

## Describe or delete a consumer group.
# Globals:   kafka_consumer_bin, servers (read)
# Arguments: $1 - sub-command word passed through by the caller (unused)
#            $2 - "describe" or "delete"
#            $3 - consumer group name
# Outputs:   tool output on stdout; error line on stderr
# Returns:   the tool's status for "describe", 0 after "delete",
#            1 for an unknown operation
consumer(){
  local optype=$2
  local consumer_group=$3
  case "$optype" in
    describe)
      "$kafka_consumer_bin" --bootstrap-server "${servers}" --group "${consumer_group}" --describe
      ;;
    delete)
      "$kafka_consumer_bin" --bootstrap-server "${servers}" --group "${consumer_group}" --delete
      echo "...."
      ;;
    *)
      echo ">>>>>> consumer params error, please check script define." >&2
      return 1
      ;;
  esac
}

#kafka-console-producer.sh --broker-list localhost:9092 --topic test
#kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# Dispatch on the sub-command in $1. Every handler receives the complete,
# unsplit argument list, so it sees the sub-command as its $1 and the
# user's operands from $2 onwards.
case "$1" in
  create)
    createTopic "$@" ;;
  query)
    descTopic "$@" ;;
  delete)
    deleteTopic "$@" ;;
  alter)
    alterTopic "$@" ;;
  consumer)
    consumer "$@" ;;
  *)
    # List the sub-commands this statement actually accepts.
    echo "Usage: $0 (create|query|delete|alter|consumer)." >&2 ;;
esac

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/732540.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-04-27
下一篇 2022-04-27

发表评论

登录后才能评论

评论列表(0条)

保存