您的位置:首頁 > 軟件教程 > 教程 > ArgoWorkflow教程(五)---Workflow 的多種觸發(fā)模式:手動(dòng)、定時(shí)任務(wù)與事件觸發(fā)

ArgoWorkflow教程(五)---Workflow 的多種觸發(fā)模式:手動(dòng)、定時(shí)任務(wù)與事件觸發(fā)

來源:好特整理 | 時(shí)間:2024-09-25 09:47:47 | 閱讀:62 |  標(biāo)簽: a Flow GO 教程 K Argo workflow AR   | 分享到:

上一篇我們分析了argo-workflow 中的 archive,包括 流水線GC、流水線歸檔、日志歸檔等功能。本篇主要分析 Workflow 中的幾種觸發(fā)方式,包括手動(dòng)觸發(fā)、定時(shí)觸發(fā)、Event 事件觸發(fā)等。 1. 概述 Argo Workflows 的流水線有多種觸發(fā)方式: 手動(dòng)觸發(fā):手動(dòng)提交一

ArgoWorkflow教程(五)---Workflow 的多種觸發(fā)模式:手動(dòng)、定時(shí)任務(wù)與事件觸發(fā)

上一篇我們分析了argo-workflow 中的 archive,包括 流水線GC、流水線歸檔、日志歸檔等功能。本篇主要分析 Workflow 中的幾種觸發(fā)方式,包括手動(dòng)觸發(fā)、定時(shí)觸發(fā)、Event 事件觸發(fā)等。

1. 概述

Argo Workflows 的流水線有多種觸發(fā)方式:

  • 手動(dòng)觸發(fā):手動(dòng)提交一個(gè) Workflow,就會(huì)觸發(fā)一次構(gòu)建,那么我們創(chuàng)建的流水線,理論上是 WorkflowTemplate 對象。
  • 定時(shí)觸發(fā): CronWorkflow ,類似于 k8s 中的 job 和 cronjob,CronWorkflow 會(huì)定時(shí)創(chuàng)建 Workflow 來實(shí)現(xiàn)定時(shí)觸發(fā)。
  • Event 事件觸發(fā):比如通過git commit 觸發(fā),借助 argo-events 可以實(shí)現(xiàn)此功能。

2. 定時(shí)觸發(fā)

CronWorkflow 本質(zhì)上就是一個(gè) Workflow + Cron Spec。

設(shè)計(jì)上參考了 k8s 中的 CronJob

Demo

一個(gè)簡單的 CronWorkflow 如下:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: test-cron-wf
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 90"]

apply 一下,可以看到創(chuàng)建出來的 Workflow 命名為 $cronWorkflowName-xxx

[root@lixd-argo workdir]# k get cwf
NAME           AGE
test-cron-wf   116s
[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE   MESSAGE
test-cron-wf-1711852560   Running   47s

由于 template 中運(yùn)行任務(wù)是 sleep 90s 因此,整個(gè)任務(wù)耗時(shí)肯定是超過 60s 的,根據(jù)設(shè)置的 concurrencyPolicy 為 Replace ,因此 60s 后,第二個(gè) Workflow 被創(chuàng)建出來,第一個(gè)就會(huì)被停止掉。

[root@lixd-argo workdir]# k get wf
NAME                      STATUS    AGE    MESSAGE
test-cron-wf-1711852560   Failed    103s   Stopped with strategy 'Terminate'
test-cron-wf-1711852620   Running   43s

具體參數(shù)

支持的具體參數(shù)如下:

type CronWorkflowSpec struct {
	// WorkflowSpec is the spec of the workflow to be run
	WorkflowSpec WorkflowSpec `json:"workflowSpec" protobuf:"bytes,1,opt,name=workflowSpec,casttype=WorkflowSpec"`
	// Schedule is a schedule to run the Workflow in Cron format
	Schedule string `json:"schedule" protobuf:"bytes,2,opt,name=schedule"`
	// ConcurrencyPolicy is the K8s-style concurrency policy that will be used
	ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty" protobuf:"bytes,3,opt,name=concurrencyPolicy,casttype=ConcurrencyPolicy"`
	// Suspend is a flag that will stop new CronWorkflows from running if set to true
	Suspend bool `json:"suspend,omitempty" protobuf:"varint,4,opt,name=suspend"`
	// StartingDeadlineSeconds is the K8s-style deadline that will limit the time a CronWorkflow will be run after its
	// original scheduled time if it is missed.
	StartingDeadlineSeconds *int64 `json:"startingDeadlineSeconds,omitempty" protobuf:"varint,5,opt,name=startingDeadlineSeconds"`
	// SuccessfulJobsHistoryLimit is the number of successful jobs to be kept at a time
	SuccessfulJobsHistoryLimit *int32 `json:"successfulJobsHistoryLimit,omitempty" protobuf:"varint,6,opt,name=successfulJobsHistoryLimit"`
	// FailedJobsHistoryLimit is the number of failed jobs to be kept at a time
	FailedJobsHistoryLimit *int32 `json:"failedJobsHistoryLimit,omitempty" protobuf:"varint,7,opt,name=failedJobsHistoryLimit"`
	// Timezone is the timezone against which the cron schedule will be calculated, e.g. "Asia/Tokyo". Default is machine's local time.
	Timezone string `json:"timezone,omitempty" protobuf:"bytes,8,opt,name=timezone"`
	// WorkflowMetadata contains some metadata of the workflow to be run
	WorkflowMetadata *metav1.ObjectMeta `json:"workflowMetadata,omitempty" protobuf:"bytes,9,opt,name=workflowMeta"`
}

內(nèi)容可以分為 3 部分:

  • WorkflowSpec :這個(gè)就是 Workflow 的 Spec,一模一樣的
  • Cron Spec:增加了一些 Cron 相關(guān)字段
  • WorkflowMetadata:一些 metadata,后續(xù)該 CronWorkflow 創(chuàng)建的 Workflow 都會(huì)攜帶上這里指定的 metadata

WorkflowSpec 和 WorkflowMetadata 沒太大區(qū)別,就不贅述了,分析一下 Cron Spec 相關(guān)的幾個(gè)字段:

  • schedule:cron 表達(dá)式, * * * * * 每分鐘創(chuàng)建一次
  • concurrencyPolicy:并發(fā)模式,支持 Allow、Forbid、Replace
    • Allow:允許同時(shí)運(yùn)行多個(gè) Workflow
    • Forbid:禁止并發(fā),有 Workflow 運(yùn)行時(shí),就不會(huì)再創(chuàng)建新的
    • Replace: 表示新創(chuàng)建 Workflow 替換掉舊的,不會(huì)同時(shí)運(yùn)行多個(gè) Workflow。
  • startingDeadlineSeconds:Workflow 創(chuàng)建出來到第一個(gè) Pod 啟動(dòng)的最大時(shí)間,超時(shí)后就會(huì)被標(biāo)記為失敗。
  • suspend:flag 是否停止 CronWorkflow,在定時(shí)任務(wù)不需要執(zhí)行是可以設(shè)置為 true。
  • timezone:時(shí)區(qū),默認(rèn)使用機(jī)器上的本地時(shí)間

大部分字段和 K8s CronJob 一致

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: my-cron
spec:
  schedule: "* * * * *"
  concurrencyPolicy: "Replace"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: whalesay
    templates:
    - name: whalesay
      container:
        image: alpine:3.6
        command: [sh, -c]
        args: ["date; sleep 10"]
  workflowMetadata:
    labels:
      from: cron

增加了 metadata,測試一下

[root@lixd-argo workdir]# k get wf my-cron-1711853400 -oyaml|grep labels -A 1
  labels:
    from: cron

可以看到,創(chuàng)建出來的 Workflow 確實(shí)攜帶上了,在 CronWorkflow 中指定的 label。

3. Event

argo 提供了一個(gè) Event API: /api/v1/events/{namespace}/{discriminator} ,該 API 可以接受任意 json 數(shù)據(jù)。

通過 event API 可以創(chuàng)建 Workflow ,類似于 Webhook。

具體請求長這樣:

curl https://localhost:2746/api/v1/events/argo/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

或者這樣:

curl https://localhost:2746/api/v1/events/argo/my-discriminator \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'
  • 1)準(zhǔn)備 Token
  • 2)創(chuàng)建 WorkflowEventBinding,配置接收 event 以及收到 event 后創(chuàng)建的 Workflow 信息
  • 3)發(fā)送請求進(jìn)行測試

Token

創(chuàng)建 RBAC 相關(guān)對象,role、rolebinding、sa,其中 role 只需要提供最小權(quán)限即可。

直接創(chuàng)建在 default 命名空間

kubectl apply -f - <

serviceaccount 和 rolebinding

kubectl create sa test

kubectl create rolebinding test --role=test --serviceaccount=default:test

然后創(chuàng)建一個(gè) Secret

kubectl apply -f - <

最后就可以查詢 Secret 解析 Token 了

ARGO_TOKEN="Bearer $(kubectl get secret test.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

echo $ARGO_TOKEN
Bearer ZXlKaGJHY2lPaUpTVXpJMU5pSXNJbXRwWkNJNkltS...

測試,能否正常使用

ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')

curl http://$ARGO_SERVER:2746/api/v1/workflow-event-bindings/default -H "Authorization: $ARGO_TOKEN"

WorkflowEventBinding

為了接收 Event,可以創(chuàng)建 WorkflowEventBinding 對象,具體如下:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowEventBinding
metadata:
  name: event-consumer
spec:
  event:
    # metadata header name must be lowercase to match in selector
    selector: payload.message != "" && metadata["x-argo-e2e"] == ["true"] && discriminator == "my-discriminator"
  submit:
    workflowTemplateRef:
      name: my-wf-tmple
    arguments:
      parameters:
      - name: message
        valueFrom:
          event: payload.message

spec.event 指定了該 Binding 該如何匹配收到的 Event,比如這里的條件就是:

  • 1)payload 中有一個(gè) message 參數(shù),值不為空
  • 2)header 中包含 x-argo-e2e,且值為 true
    • 注意:這里匹配的時(shí)候 header 都會(huì)被轉(zhuǎn)為小寫
  • 3)最后就是 discriminator 名字為 my-discriminator

如果匹配則會(huì)使用 submit 下面指定的內(nèi)容創(chuàng)建 Workflow:

  • 1)使用 my-wf-tmple 這個(gè) workflowTemplate 創(chuàng)建 Workflow
  • 2)使用 payload.message 作為參數(shù)

至于創(chuàng)建出的 Workflow 則是由 my-wf-tmple 定義了,先創(chuàng)建這個(gè) Template

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: my-wf-tmple
spec:
  templates:
    - name: main
      inputs:
        parameters:
          - name: message
            value: "{{workflow.parameters.message}}"
      container:
        image: docker/whalesay:latest
        command: [cowsay]
        args: ["{{inputs.parameters.message}}"]
  entrypoint: main

最后我們就可以發(fā)送 API 來觸發(fā) event 實(shí)現(xiàn) Workflow 的創(chuàng)建

curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
    -H "Authorization: $ARGO_TOKEN" \
    -H "X-Argo-E2E: true" \
    -d '{"message": "hello events"}'

測試一下:

{}[root@lixd-argo workdir]# curl $ARGO_SERVER:2746/api/v1/events/default/my-discriminator \
>     -H "Authorization: $ARGO_TOKEN" \
>     -H "X-Argo-E2E: true" \
>     -d '{"message": "hello events"}'
{}[root@lixd-argo workdir]# k get wf
NAME                STATUS    AGE   MESSAGE
my-wf-tmple-ea81n   Running   5s
[root@lixd-argo workdir]# k get wf my-wf-tmple-ea81n -oyaml|grep parameters -A 5
    parameters:
    - name: message
      value: hello events

可以看到,Workflow 已經(jīng)創(chuàng)建出來了,而且參數(shù)也是我們發(fā)請求時(shí)給的 hello events。

擴(kuò)容

默認(rèn)情況下 argo-server 可以同時(shí)處理 64 個(gè)事件,再多就會(huì)直接返回 503 了,可以通過以下參數(shù)進(jìn)行調(diào)整:

  • 1)--event-operation-queue-size:增加隊(duì)列大小,以接收更多的事件
  • 2)--event-worker-count:增加 worker 數(shù)量,提升處理速度

4. Webhook

前面 Event 章節(jié)提到了可以通過發(fā)送 HTTP 請求的方式來創(chuàng)建觸發(fā) event 以 Workflow,但是需要客戶端提供 AuthToken。

問題來了,對于一些不能指定 Token 的客戶端來說就比較麻煩了,比如 Github、Gitlab 等 Git 倉庫,都可以配置 Webhook,在收到 commit 的時(shí)候調(diào)用 Webhook 來觸發(fā)流水線。

此時(shí),這些發(fā)送過來的請求肯定是沒有帶 Token 的,因此需要額外配置來進(jìn)行驗(yàn)證,保證 argo 只處理來自 Github、Gitlab 等等平臺(tái)的 Webhook 請求。

  • 1)創(chuàng)建 RBAC 相關(guān)對象,role、rolebinding、sa 準(zhǔn)備好 token
  • 2)配置 Webhook-clients,告訴 argo 什么類型的 Webhook 過來使用那個(gè) secret 作為 token

第一步 Token 和 Event 章節(jié)一致,就不在贅述了,主要是第二步。

webhook-clients config

上一步,創(chuàng)建 RBAC 對象,準(zhǔn)備好 Secret 之后,一般客戶端都是解析 Secret 中的 Token,然后帶上該 Token 發(fā)送請求,就像這樣:

ARGO_SERVER=$(kubectl get svc argo-workflows-server -n argo -o=jsonpath='{.spec.clusterIP}')
ARGO_TOKEN="Bearer $(kubectl get secret jenkins.service-account-token -o=jsonpath='{.data.token}' | base64 --decode)"

curl https://$ARGO_SERVER:2746/api/v1/events/default/ \
  -H "Authorization: $ARGO_TOKEN" \
  -d '{"message": "hello"}'

但是,對于 Webhook 客戶端來說,是沒辦法這樣指定 token 的,因此需要通過 argo-workflows-webhook-clients 配置來告訴 argo,哪個(gè) Webhook 使用哪個(gè) Secret 中的 token。

創(chuàng)建一個(gè)名為 argo-workflows-webhook-clients 的 Secret,內(nèi)容大致是這樣的:

kind: Secret
apiVersion: v1
metadata:
  name: argo-workflows-webhook-clients
# The data keys must be the name of a service account.
stringData:
  # https://support.atlassian.com/bitbucket-cloud/docs/manage-webhooks/
  bitbucket.org: |
    type: bitbucket
    secret: "my-uuid"
  # https://confluence.atlassian.com/bitbucketserver/managing-webhooks-in-bitbucket-server-938025878.html
  bitbucketserver: |
    type: bitbucketserver
    secret: "shh!"
  # https://developer.github.com/webhooks/securing/
  github.com: |
    type: github
    secret: "shh!"
  # https://docs.gitlab.com/ee/user/project/integrations/webhooks.html
  gitlab.com: |
    type: gitlab
    secret: "shh!"
  • 其中 Key 必須是當(dāng)前 Namespace 下的 Serviceaccount 名稱。
  • Value 則包含 type 和 secret 兩部分。
    • type:Webhook 來源,比如 github、gitlab
    • secret:一個(gè)字符串,非 k8s secret,一般在對應(yīng)平臺(tái)添加 Webhook 時(shí)進(jìn)行配置

以 Github 具體,secret 配置如下:

在添加 Webhook 時(shí)可以填一個(gè) Secret 配置,實(shí)際就是一串加密字符,隨便填什么都可以。

這樣 Github 發(fā)送 Webhook 請求時(shí)就會(huì)攜帶上這個(gè) Secret 信息,Argo 收到后就根據(jù) argo-workflows-webhook-clients 的 Secret 里配置的 type=github 的 secret 字段進(jìn)行對比,如果匹配上就處理,否則就忽略該請求。

ArgoWorkflow教程(五)---Workflow 的多種觸發(fā)模式:手動(dòng)、定時(shí)任務(wù)與事件觸發(fā)

如果能匹配上就從對應(yīng)的 Serviceaccount 中解析 Token 作為 Authorization 信息。

源碼分析

Webhook 這一塊,官方文檔不是很詳細(xì),一筆帶過了,因此翻了下源碼。

這塊邏輯以一個(gè) Interceptor 的形式出現(xiàn),對于所有 Event API 都會(huì)經(jīng)過該邏輯, 用于為沒有攜帶 Authorization 的請求添加 Authorization 信息 。

// Interceptor creates an annotator that verifies webhook signatures and adds the appropriate access token to the request.
func Interceptor(client kubernetes.Interface) func(w http.ResponseWriter, r *http.Request, next http.Handler) {
	return func(w http.ResponseWriter, r *http.Request, next http.Handler) {
		err := addWebhookAuthorization(r, client)
		if err != nil {
			log.WithError(err).Error("Failed to process webhook request")
			w.WriteHeader(403)
			// hide the message from the user, because it could help them attack us
			_, _ = w.Write([]byte(`{"message": "failed to process webhook request"}`))
		} else {
			next.ServeHTTP(w, r)
		}
	}
}

調(diào)用 addWebhookAuthorization 嘗試添加認(rèn)證信息。

func addWebhookAuthorization(r *http.Request, kube kubernetes.Interface) error {
	// try and exit quickly before we do anything API calls
	if r.Method != "POST" || len(r.Header["Authorization"]) > 0 || !strings.HasPrefix(r.URL.Path, pathPrefix) {
		return nil
	}
	parts := strings.SplitN(strings.TrimPrefix(r.URL.Path, pathPrefix), "/", 2)
	if len(parts) != 2 {
		return nil
	}
	namespace := parts[0]
	secretsInterface := kube.CoreV1().Secrets(namespace)
	ctx := r.Context()

	webhookClients, err := secretsInterface.Get(ctx, "argo-workflows-webhook-clients", metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("failed to get webhook clients: %w", err)
	}
	// we need to read the request body to check the signature, but we still need it for the GRPC request,
	// so read it all now, and then reinstate when we are done
	buf, _ := io.ReadAll(r.Body)
	defer func() { r.Body = io.NopCloser(bytes.NewBuffer(buf)) }()
	serviceAccountInterface := kube.CoreV1().ServiceAccounts(namespace)
	for serviceAccountName, data := range webhookClients.Data {
		r.Body = io.NopCloser(bytes.NewBuffer(buf))
		client := &webhookClient{}
		err := yaml.Unmarshal(data, client)
		if err != nil {
			return fmt.Errorf("failed to unmarshal webhook client \"%s\": %w", serviceAccountName, err)
		}
		log.WithFields(log.Fields{"serviceAccountName": serviceAccountName, "webhookType": client.Type}).Debug("Attempting to match webhook request")
		ok := webhookParsers[client.Type](client.Secret, r)
		if ok {
			log.WithField("serviceAccountName", serviceAccountName).Debug("Matched webhook request")
			serviceAccount, err := serviceAccountInterface.Get(ctx, serviceAccountName, metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get service account \"%s\": %w", serviceAccountName, err)
			}
			tokenSecret, err := secretsInterface.Get(ctx, secrets.TokenNameForServiceAccount(serviceAccount), metav1.GetOptions{})
			if err != nil {
				return fmt.Errorf("failed to get token secret \"%s\": %w", tokenSecret, err)
			}
			r.Header["Authorization"] = []string{"Bearer " + string(tokenSecret.Data["token"])}
			return nil
		}
	}
	return nil
}

具體流程如下:

  • 首先判斷了,只有 POST 方法,而且 Authorization 為空時(shí)才會(huì)自動(dòng)添加。
  • 然后就從 API 中指定的 Namespace 下查詢名為 argo-workflows-webhook-clients 的 Secret。
  • 最后就是循環(huán)對比,Secret 中的 type 和 secret 能否和當(dāng)前請求匹配上,如果匹配上則把 data 對應(yīng)的 key 做了 serviceaccount 名去查詢 token 然后解析 token 做了 Authorization 使用。

第三步會(huì)直接使用 key 作為 serviceaccount,這也就是為什么配置 argo-workflows-webhook-clients 時(shí)需要把 serviceaccount 名稱做為 key。


【ArgoWorkflow 系列】 持續(xù)更新中,搜索公眾號(hào)【 探索云原生 】訂閱,閱讀更多文章。

ArgoWorkflow教程(五)---Workflow 的多種觸發(fā)模式:手動(dòng)、定時(shí)任務(wù)與事件觸發(fā)


5. 小結(jié)

本文主要分析了 Argo 中的 Workflow 的幾種觸發(fā)方式。

  • 1)手動(dòng)觸發(fā):手動(dòng)創(chuàng)建 Workflow 對象方式觸發(fā)流水線運(yùn)行
  • 2)定時(shí)觸發(fā):使用 CronWorkflow 根據(jù) Cron 表達(dá)式自動(dòng)創(chuàng)建 Workflow
  • 3)Event:使用 argo-server 提供的 event api 配合WorkflowEventBinding 創(chuàng)建 Workflow
  • 4)Webhook:該方式實(shí)則是 Event 方式的擴(kuò)展,Event 方式請求時(shí)需要 Token 認(rèn)證,Webhook 方式則通過 argo-workflows-webhook-clients 配置好不同來源的 Webhook 使用的 Secret 以實(shí)現(xiàn)認(rèn)證,這樣就可以把 Event API 用作 Webhook 端點(diǎn) 配置到 Github、Gitlab 等環(huán)境了。
小編推薦閱讀

好特網(wǎng)發(fā)布此文僅為傳遞信息,不代表好特網(wǎng)認(rèn)同期限觀點(diǎn)或證實(shí)其描述。

a 1.0
a 1.0
類型:休閑益智  運(yùn)營狀態(tài):正式運(yùn)營  語言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動(dòng)

《alittletotheleft》官網(wǎng)正版是一款備受歡迎的休閑益智整理游戲。玩家的任務(wù)是對日常生活中的各種雜亂物
Go v1.62
Go v1.62
類型:動(dòng)作冒險(xiǎn)  運(yùn)營狀態(tài):正式運(yùn)營  語言:中文   

游戲攻略

游戲禮包

游戲視頻

游戲下載

游戲活動(dòng)

GoEscape是一款迷宮逃脫休閑闖關(guān)游戲。在這款游戲中,玩家可以挑戰(zhàn)大量關(guān)卡,通過旋轉(zhuǎn)屏幕的方式幫助球球

相關(guān)視頻攻略

更多

掃二維碼進(jìn)入好特網(wǎng)手機(jī)版本!

掃二維碼進(jìn)入好特網(wǎng)微信公眾號(hào)!

本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請發(fā)郵件[email protected]

湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)