class ArgoRollout:
    """Create, inspect and update an Argo Rollouts ``Rollout`` via the raw Kubernetes API.

    The class talks to the ``argoproj.io/v1alpha1`` REST endpoints through
    ``client.ApiClient.call_api`` and always works on one rollout, identified
    by ``namespace`` and ``app_name``.

    NOTE(review): ``client.ApiClient()`` is built with the default
    configuration, so the Kubernetes configuration presumably has to be loaded
    by the caller before instantiating this class - confirm against callers.
    """

    # REST prefix of the Argo Rollouts custom-resource API.
    _API_PREFIX = "/apis/argoproj.io/v1alpha1"

    # Probes of the first container whose httpGet port follows ``self.port``.
    _PROBE_NAMES = ("livenessProbe", "readinessProbe", "startupProbe")

    # (exclusive upper bound on replicas, number of batches), checked in order.
    _BATCH_TIERS = (
        (3, 1),
        (6, 2),
        (20, 3),
        (40, 5),
        (60, 6),
        (80, 7),
        (90, 8),
        (100, 9),
    )
    # Used for 100 replicas and more, continuing the progression of the table.
    _MAX_AUTO_BATCH = 10

    def __init__(self, namespace, app_name, image_name, port=8080):
        """Remember the target rollout and create the API client.

        Args:
            namespace: Namespace the rollout lives in.
            app_name: Name of the rollout; also substituted into the template.
            image_name: Container image to deploy.
            port: Container port, also used for the HTTP probes.
        """
        self.k8s_api = client.ApiClient()
        self.app_name = app_name
        self.namespace = namespace
        self.image_name = image_name
        self.port = port
        self.template_yaml_path = "k8s-rollout-template.yml"

    def _rollouts_url(self, name=None):
        """Return the collection URL, or the URL of rollout ``name`` if given."""
        url = "{prefix}/namespaces/{namespace}/rollouts".format(
            prefix=self._API_PREFIX, namespace=self.namespace)
        if name is not None:
            url = "{url}/{name}".format(url=url, name=name)
        return url

    def _request(self, url, method, **kwargs):
        """Call the API server and return the decoded JSON response body.

        Extra keyword arguments are forwarded to ``ApiClient.call_api``.
        """
        resp = self.k8s_api.call_api(
            url,
            method,
            # Raw call_api only attaches credentials for the auth schemes it
            # is told about; naming BearerToken lets token-based
            # configurations authenticate and is a no-op when no token is set.
            auth_settings=["BearerToken"],
            _preload_content=False,
            **kwargs
        )
        return json.loads(resp[0].data.decode("utf-8"))

    def _render(self, node):
        """Return a copy of ``node`` with the template placeholders filled in.

        Strings (dictionary keys included) get ``__argo_rollout_name``,
        ``__namespace_name`` and ``__image_name`` replaced; dicts and lists
        are walked recursively; every other value is returned unchanged.
        Working on the parsed structure instead of on serialized text keeps
        values containing quotes or backslashes intact.
        """
        if isinstance(node, str):
            return (node.replace("__argo_rollout_name", self.app_name)
                    .replace("__namespace_name", self.namespace)
                    .replace("__image_name", self.image_name))
        if isinstance(node, dict):
            return {self._render(key): self._render(value)
                    for key, value in node.items()}
        if isinstance(node, list):
            return [self._render(item) for item in node]
        return node

    def _init_body(self):
        """Build the rollout manifest from the YAML template.

        Returns:
            The manifest as a dict, with placeholders substituted and the
            container port and probe ports set to ``self.port``.

        Raises:
            KeyError, IndexError: If the template lacks the first container,
                its first port entry or one of the three HTTP probes.
        """
        with open(self.template_yaml_path, encoding="utf-8") as f:
            template = yaml.safe_load(f)
        manifest = self._render(template)
        # Ports must be integers, so they are assigned here rather than
        # substituted as text.
        container = manifest["spec"]["template"]["spec"]["containers"][0]
        container["ports"][0]["containerPort"] = self.port
        for probe in self._PROBE_NAMES:
            container[probe]["httpGet"]["port"] = self.port
        return manifest

    def create_argo_rollouts(self):
        """Create the rollout from the template and return the server's response as a dict."""
        return self._request(self._rollouts_url(), "POST", body=self._init_body())

    def get_argo_rollout_info(self):
        """Fetch the rollout named ``self.app_name`` and return it as a dict."""
        return self._request(self._rollouts_url(self.app_name), "GET")

    def list_rollouts(self):
        """Return the names of the rollouts in ``self.namespace``.

        This is best-effort: if the request or the parsing fails, the error is
        printed and the names collected so far (possibly none) are returned.
        """
        url = self._rollouts_url() + "/"
        names = []
        try:
            listing = self._request(url, "GET")
            for item in listing["items"]:
                names.append(item["metadata"]["name"])
        except Exception as e:
            print(e)
        # Returning after the try block (not from ``finally``) lets
        # KeyboardInterrupt/SystemExit propagate instead of being discarded.
        return names

    @classmethod
    def _default_batch(cls, replicas):
        """Pick the number of release batches for ``replicas`` pods.

        Raises:
            Exception: If ``replicas`` is 0, 1 or negative; a canary release
                needs at least two pods.
        """
        if replicas == 0:
            raise Exception("replicas cannot be 0")
        if replicas == 1:
            raise Exception("replicas is 1, Unable to perform grayscale publishing")
        if replicas < 0:
            raise Exception("replicas cannot be negative")
        for upper_bound, batch in cls._BATCH_TIERS:
            if replicas < upper_bound:
                return batch
        return cls._MAX_AUTO_BATCH

    @staticmethod
    def _build_canary_steps(batch, release_type, batch_wait_time, gray_percentage):
        """Build the canary ``steps`` list for a release split into ``batch`` batches.

        The list starts with the gray step followed by an indefinite pause,
        then one weight step per intermediate batch, and ends at weight 100.
        """
        steps = [{'setWeight': gray_percentage}, {'pause': {}}]
        share = 100 // batch
        for index in range(1, batch):
            steps.append({'setWeight': share * index})
            if release_type == "auto":
                steps.append({'pause': {"duration": batch_wait_time}})
            else:
                steps.append({'pause': {}})
        # The last batch always brings the release to 100%.
        steps.append({'setWeight': 100})
        steps.append({'pause': {"duration": batch_wait_time}})
        return steps

    def patch_rollouts(self, batch: int = None, release_type="auto", batch_wait_time=60, gray_percentage=1):
        """Update the rollout's image and canary strategy, then patch it.

        Args:
            batch: Number of batches to release in. When ``None`` it is
                derived from the rollout's current ``spec.replicas``.
            release_type: ``"auto"`` inserts timed pauses between batches; any
                other value inserts pauses without a duration.
            batch_wait_time: Value used as the ``duration`` of timed pauses.
            gray_percentage: Weight of the initial gray step.

        Returns:
            The patched rollout as returned by the API server, as a dict.

        Raises:
            Exception: If ``batch`` is 0 or negative, or if it has to be
                derived and the rollout has fewer than two replicas.
        """
        body = self.get_argo_rollout_info()
        if batch is None:
            batch = self._default_batch(body["spec"]["replicas"])
        elif batch == 0:
            raise Exception("batch cannot be 0")
        elif batch < 0:
            raise Exception("batch cannot be negative")

        body["spec"]["template"]["spec"]["containers"][0]["image"] = self.image_name
        body["spec"]["strategy"]["canary"]["steps"] = self._build_canary_steps(
            batch, release_type, batch_wait_time, gray_percentage)

        return self._request(
            self._rollouts_url(self.app_name),
            "PATCH",
            body=body,
            header_params={'Content-Type': 'application/merge-patch+json'},
        )
# Sharing is welcome; when reposting, please credit the source: 内存溢出 (the "Memory Overflow" site).
# Comments (0)