背景 最近公司將我們之前使用的鏈路工具切換為了 OpenTelemetry. 我們的技術(shù)棧是: OTLP Client──────────?Collect────────?StartRocks (Agent) ▲ │ │ Jaeger 其中客戶端使用 OpenTelemetry 提供的 Java A
最近公司將我們之前使用的鏈路工具切換為了
OpenTelemetry
.
我們的技術(shù)棧是:
OTLP
Client──────────?Collect────────?StartRocks
(Agent) ▲
│
│
Jaeger
其中客戶端使用 OpenTelemetry 提供的 Java Agent 進(jìn)行埋點(diǎn)收集數(shù)據(jù),再由 Agent 通過(guò) OTLP(OpenTelemetry Protocol) 協(xié)議將數(shù)據(jù)發(fā)往 Collector,在
Collector
中我們可以自行任意處理數(shù)據(jù),并決定將這些數(shù)據(jù)如何存儲(chǔ)(這點(diǎn)在以往的 SkyWalking 體系中是很難自定義的)
這里我們將數(shù)據(jù)寫(xiě)入 StartRocks 中,供之后的 UI 層進(jìn)行查看。
OpenTelemetry
是可觀測(cè)系統(tǒng)的新標(biāo)準(zhǔn),基于它可以兼容以前使用的 Prometheus、 victoriametrics、skywalking 等系統(tǒng),同時(shí)還可以靈活擴(kuò)展,不用與任何但一生態(tài)或技術(shù)棧進(jìn)行綁定。
更多關(guān)于 OTel 的內(nèi)容會(huì)在今后介紹。
其中有一個(gè)關(guān)鍵問(wèn)題就是:如何在線上進(jìn)行 無(wú)縫切換 。
雖然我們內(nèi)部的發(fā)布系統(tǒng)已經(jīng)支持重新發(fā)布后就會(huì)切換到新的鏈路,也可以讓業(yè)務(wù)自行發(fā)布然后逐步的切換到新的系統(tǒng),這樣也是最保險(xiǎn)的方式。
但這樣會(huì)有幾個(gè)問(wèn)題:
所以最好的方式還是由我們?cè)诤笈_(tái)統(tǒng)一發(fā)布,對(duì)外沒(méi)有任何感知就可以一鍵全部切換為 OpenTelemetry。
仔細(xì)一看貌似也沒(méi)什么難的,無(wú)非就是模擬用戶點(diǎn)擊發(fā)布按鈕而已。
但這事由我們自動(dòng)來(lái)做就不一樣了,用戶點(diǎn)擊發(fā)布的時(shí)候會(huì)選擇他們認(rèn)為可以發(fā)布的分支進(jìn)行發(fā)布,我們不能自作主張的比如選擇 main 分支,有可能只是合并了但還不具備發(fā)布條件。
所以保險(xiǎn)的方式還是得用當(dāng)前項(xiàng)目上一次發(fā)布時(shí)所使用的 git hash 值重新打包發(fā)布。
但這也有幾個(gè)問(wèn)題:
所以思來(lái)想去最保險(xiǎn)的方法還是將業(yè)務(wù)鏡像拉取下來(lái),然后手動(dòng)刪除鏡像中的 skywalking 包以及 JVM 參數(shù),全部替換為 OpenTelemetry 的包和 JVM 參數(shù)。
整體的方案如下:
pod >0
的 deployment
因?yàn)樾枰婕暗讲僮?kubernetes,所以整體就使用 Golang 實(shí)現(xiàn)了。
func ProcessDeployment(ctx context.Context, finish []string, deployment v1.Deployment, clientSet kubernetes.Interface) error {
deploymentName := deployment.Name
for _, s := range finish {
if s == deploymentName {
klog.Infof("Skip finish deployment:%s", deploymentName)
return nil
}
}
// Write finish deployment name to a file
defer writeDeploymentName2File(deploymentName, fmt.Sprintf("finish-%s.log", deployment.Namespace))
appName := deployment.GetObjectMeta().GetLabels()["appName"]
klog.Infof("Begin to process deployment:%s, appName:%s", deploymentName, appName)
upgrade, err := checkContainIstio(ctx, deployment, clientSet)
if err != nil {
return err
}
if upgrade == false {
klog.Infof("Don't have istio, No need to upgrade deployment:%s appName:%s", deploymentName, appName)
return nil
}
for i, container := range deployment.Spec.Template.Spec.Containers {
if strings.HasPrefix(deploymentName, container.Name) {
// Check if container has sw jvm
for _, envVar := range container.Env {
if envVar.Name == "CATALINA_OPTS" {
if !strings.Contains(envVar.Value, "skywalking") {
klog.Infof("Skip upgrade don't have sw jvm deployment:%s container:%s", deploymentName, container.Name)
return nil
}
}
}
upgrade(container)
// Check newDeployment status
go checkNewDeploymentStatus(ctx, clientSet, newDeployment)
// delete from image
deleteImage(container.Image)
}
}
return nil
}
這個(gè)函數(shù)需要傳入一個(gè) deployment ,同時(shí)還有一個(gè)已經(jīng)完成了的列表進(jìn)來(lái)。
已完成列表用于多次運(yùn)行的時(shí)候可以快速跳過(guò)已經(jīng)執(zhí)行的 deployment。
checkContainIstio()
函數(shù)很簡(jiǎn)單,判斷是否包含了 Istio 容器,如果沒(méi)有包含說(shuō)明不是后端應(yīng)用(可能是前端、大數(shù)據(jù)之類的任務(wù)),就可以直接跳過(guò)了。
而判斷是否需要替換的前提這事判斷環(huán)境變量
CATALINA_OPTS
中是否包含了 skywalking 的內(nèi)容,如果包含則說(shuō)明需要進(jìn)行替換。
func upgrade(container Container){
klog.Infof("Begin to upgrade deployment:%s container:%s", deploymentName, container.Name)
newImageName := fmt.Sprintf("%s-otel-%s", container.Image, generateRandomString(4))
err := BuildNewOtelImage(container.Image, newImageName)
if err != nil {
return err
}
// Update deployment jvm ENV
for e, envVar := range container.Env {
if envVar.Name == "CATALINA_OPTS" {
otelJVM := replaceSWAgent2OTel(envVar.Value, appName)
deployment.Spec.Template.Spec.Containers[i].Env[e].Value = otelJVM
}
}
// Update deployment image
deployment.Spec.Template.Spec.Containers[i].Image = newImageName
newDeployment, err := clientSet.AppsV1().Deployments(deployment.Namespace).Update(ctx, &deployment, metav1.UpdateOptions{})
if err != nil {
return err
}
klog.Infof("Finish upgrade deployment:%s container:%s", deploymentName, container.Name)
}
這里一共分為以下幾部:
CATALINA_OPTS
環(huán)境變量,也就是替換 skywalking 的參數(shù)
dockerfile = fmt.Sprintf(`FROM %s
COPY %s /home/admin/%s
COPY otel.tar.gz /home/admin/otel.tar.gz
RUN tar -zxvf /home/admin/otel.tar.gz -C /home/admin
RUN rm -rf /home/admin/skywalking-agent
ENTRYPOINT ["/bin/sh", "/home/admin/start.sh"]
`, fromImage, script, script)
idx := strings.LastIndex(newImageName, "/") + 1
dockerFileName := newImageName[idx:]
create, err := os.Create(fmt.Sprintf("Dockerfile-%s", dockerFileName))
if err != nil {
return err
}
defer func() {
create.Close()
os.Remove(create.Name())
}()
_, err = create.WriteString(dockerfile)
if err != nil {
return err
}
cmd := exec.Command("docker", "build", ".", "-f", create.Name(), "-t", newImageName)
cmd.Stdin = strings.NewReader(dockerfile)
if err := cmd.Run(); err != nil {
return err
}
其實(shí)這里的重點(diǎn)就是構(gòu)建這個(gè)新鏡像,從這個(gè) dockerfile 中也能看出具體的邏輯,也就是上文提到的刪除原有的 skywalking 資源同時(shí)將新的 OpenTelemetry 資源打包進(jìn)去。
最后再將這個(gè)鏡像上傳到私服。
其中的替換 JVM 參數(shù)也比較簡(jiǎn)單,直接刪除 skywalking 的內(nèi)容,然后再追加上 OpenTelemetry 需要的參數(shù)即可。
func checkNewDeploymentStatus(ctx context.Context, clientSet kubernetes.Interface, newDeployment *v1.Deployment) error {
ready := true
tick := time.Tick(10 * time.Second)
for i := 0; i < 30; i++ {
<-tick
originPodList, err := clientSet.CoreV1().Pods(newDeployment.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{
MatchLabels: newDeployment.Spec.Selector.MatchLabels,
}),
})
if err != nil {
return err
}
// Check if there are any Pods
if len(originPodList.Items) == 0 {
klog.Infof("No Pod in deployment:%s, Skip", newDeployment.Name)
}
for _, item := range originPodList.Items {
// Check Pod running
for _, status := range item.Status.ContainerStatuses {
if status.RestartCount > 0 {
ready = false
break
}
}
}
klog.Infof("Check deployment:%s namespace:%s status:%t", newDeployment.Name, newDeployment.Namespace, ready)
if ready == false {
break
}
}
if ready == false {
// rollback
klog.Infof("=======Rollback deployment:%s namespace:%s", newDeployment.Name, newDeployment.Namespace)
writeDeploymentName2File(newDeployment.Name, fmt.Sprintf("rollback-%s.log", newDeployment.Namespace))
}
return nil
}
這里會(huì)啟動(dòng)一個(gè) 10s 執(zhí)行一次的定時(shí)任務(wù),每次都會(huì)檢測(cè)是否有容器發(fā)生了重啟(正常情況下是不會(huì)出現(xiàn)重啟的)
如果檢測(cè)了 30 次都沒(méi)有重啟的容器,那就說(shuō)明本次替換成功了,不然就記錄一個(gè)日志文件,然后人工處理。
這種通常是原有的鏡像與 OpenTelemetry 不兼容,比如里面寫(xiě)死了一些 skywalking 的 API,導(dǎo)致啟動(dòng)失敗。
所以替換任務(wù)跑完之后我還會(huì)檢測(cè)這個(gè)
rollback-$namespace
的日志文件,人工處理這些失敗的應(yīng)用。
最后講講如何單個(gè)調(diào)用剛才的
ProcessDeployment()
函數(shù)。
考慮到不能對(duì) kubernetes 產(chǎn)生影響,所以我們需要限制并發(fā)處理 deployment 的數(shù)量(我這里的限制是 10 個(gè))。
所以就得分批進(jìn)行替換,每次替換 10 個(gè),而且其中有一個(gè)執(zhí)行失敗就得暫停后續(xù)任務(wù),由人工檢測(cè)失敗原因再?zèng)Q定是否繼續(xù)處理。
畢竟處理的是線上應(yīng)用,需要小心謹(jǐn)慎。
所以觸發(fā)的代碼如下:
func ProcessDeploymentList(ctx context.Context, data []v1.Deployment, clientSet kubernetes.Interface) error {
file, err := os.ReadFile(fmt.Sprintf("finish-%s.log", data[0].Namespace))
if err != nil {
return err
}
split := strings.Split(string(file), "\n")
batchSize := 10
start := 0
for start < len(data) {
end := start + batchSize
if end > len(data) {
end = len(data)
}
batch := data[start:end]
//等待goroutine結(jié)束
var wg sync.WaitGroup
klog.Infof("Start process batch size %d", len(batch))
errs := make(chan error, len(batch))
wg.Add(len(batch))
for _, item := range batch {
d := item
go func() {
defer wg.Done()
if err := ProcessDeployment(ctx, split, d, clientSet); err != nil {
klog.Errorf("!!!Process deployment name:%s error: %v", d.Name, err)
errs <- err
return
}
}()
}
go func() {
wg.Wait()
close(errs)
}()
//任何一個(gè)失敗就返回
for err := range errs {
if err != nil {
return err
}
}
start = end
klog.Infof("Deal next batch")
}
return nil
}
使用
WaitGroup
來(lái)控制一組任務(wù),使用一個(gè) chan 來(lái)傳遞異常;這類分批處理的代碼在一些批處理框架中還蠻常見(jiàn)的。
最后只需要查詢某個(gè) namespace 下的所有 deployment 列表傳入這個(gè)批處理函數(shù)即可。
不過(guò)整個(gè)過(guò)程中還是有幾個(gè)點(diǎn)需要注意:
其實(shí)這個(gè)功能依然有提升空間,考慮到后續(xù)會(huì)升級(jí) OpenTelemetry agent 的版本,甚至也需要增減一些 JVM 參數(shù)。
所以最后有一個(gè)統(tǒng)一的工具,可以直接升級(jí) Agent,而不是每次我都需要修改這里的代碼。
后來(lái)在網(wǎng)上看到了得物的相關(guān)分享,他們可以遠(yuǎn)程加載配置來(lái)解決這個(gè)問(wèn)題。
這也是一種解決方案,直到我們看到了 OpenTelemetry 社區(qū)提供了 Operator ,其中也包含了注入 agent 的功能。
apiVersion: opentelemetry.io/v1alpha1
kind: Instrumentation
metadata:
name: my-instrumentation
spec:
exporter:
endpoint: http://otel-collector:4317
propagators:
- tracecontext
- baggage
- b3
sampler:
type: parentbased_traceidratio
argument: "0.25"
java:
image: private/autoinstrumentation-java:1.32.0-1
我們可以使用他提供的 CRD 來(lái)配置我們 agent,只要維護(hù)好自己的鏡像就好了。
使用起來(lái)也很簡(jiǎn)單,只要安裝好了 OpenTelemetry-operator ,然后再需要注入 Java Agent 的 Pod 中使用注解:
instrumentation.opentelemetry.io/inject-java: "true"
operator 就會(huì)自動(dòng)從剛才我們配置的鏡像中讀取 agent,然后復(fù)制到我們的業(yè)務(wù)容器。
再配置上環(huán)境變量
$JAVA_TOOL_OPTIONS=/otel/javaagent.java
, 這是一個(gè) Java 內(nèi)置的環(huán)境變量,應(yīng)用啟動(dòng)的時(shí)候會(huì)自動(dòng)識(shí)別,這樣就可以自動(dòng)注入 agent 了。
envJavaToolsOptions = "JAVA_TOOL_OPTIONS"
// set env value
idx := getIndexOfEnv(container.Env, envJavaToolsOptions)
if idx == -1 {
container.Env = append(container.Env, corev1.EnvVar{
Name: envJavaToolsOptions,
Value: javaJVMArgument,
})} else {
container.Env[idx].Value = container.Env[idx].Value + javaJVMArgument
}
// copy javaagent.jar
pod.Spec.InitContainers = append(pod.Spec.InitContainers, corev1.Container{
Name: javaInitContainerName,
Image: javaSpec.Image,
Command: []string{"cp", "/javaagent.jar", javaInstrMountPath + "/javaagent.jar"},
Resources: javaSpec.Resources,
VolumeMounts: []corev1.VolumeMount{{
Name: javaVolumeName,
MountPath: javaInstrMountPath,
}},})
大致的運(yùn)行原理是當(dāng)有 Pod 的事件發(fā)生了變化(重啟、重新部署等),operator 就會(huì)檢測(cè)到變化,此時(shí)會(huì)判斷是否開(kāi)啟了剛才的注解:
instrumentation.opentelemetry.io/inject-java: "true"
接著會(huì)寫(xiě)入環(huán)境變量
JAVA_TOOL_OPTIONS
,同時(shí)將 jar 包從 InitContainers 中復(fù)制到業(yè)務(wù)容器中。
這里使用到了 kubernetes 的初始化容器,該容器是用于做一些準(zhǔn)備工作的,比如依賴安裝、配置檢測(cè)或者是等待其他一些組件啟動(dòng)成功后再啟動(dòng)業(yè)務(wù)容器。
目前這個(gè) operator 還處于使用階段,同時(shí)部分功能還不滿足(比如支持自定義擴(kuò)展),今后有時(shí)間也可以分析下它的運(yùn)行原理。
參考鏈接:
機(jī)器學(xué)習(xí):神經(jīng)網(wǎng)絡(luò)構(gòu)建(下)
閱讀華為Mate品牌盛典:HarmonyOS NEXT加持下游戲性能得到充分釋放
閱讀實(shí)現(xiàn)對(duì)象集合與DataTable的相互轉(zhuǎn)換
閱讀鴻蒙NEXT元服務(wù):論如何免費(fèi)快速上架作品
閱讀算法與數(shù)據(jù)結(jié)構(gòu) 1 - 模擬
閱讀基于鴻蒙NEXT的血型遺傳計(jì)算器開(kāi)發(fā)案例
閱讀5. Spring Cloud OpenFeign 聲明式 WebService 客戶端的超詳細(xì)使用
閱讀Java代理模式:靜態(tài)代理和動(dòng)態(tài)代理的對(duì)比分析
閱讀Win11筆記本“自動(dòng)管理應(yīng)用的顏色”顯示規(guī)則
閱讀本站所有軟件,都由網(wǎng)友上傳,如有侵犯你的版權(quán),請(qǐng)發(fā)郵件[email protected]
湘ICP備2022002427號(hào)-10 湘公網(wǎng)安備:43070202000427號(hào)© 2013~2025 haote.com 好特網(wǎng)