Files
2025-08-25 15:46:12 +08:00

404 lines
15 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package k8s_agent
import (
"context"
"encoding/json"
"fmt"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
k8s_api_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1"
devcontainer_errors "code.gitea.io/gitea/modules/k8s/errors"
devcontainer_k8s_agent_modules_errors "code.gitea.io/gitea/modules/k8s/errors"
apimachinery_api_metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinery_apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apimachinery_apis_v1_unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
apimachinery_runtime_utils "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
apimachinery_watch "k8s.io/apimachinery/pkg/watch"
dynamic_client "k8s.io/client-go/dynamic"
dynamicclient "k8s.io/client-go/dynamic"
clientgorest "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// IsK8sDevcontainerStatusReady reports whether the given DevcontainerApp status
// counts as ready. Both conditions must hold:
//  1. Status.Ready is true;
//  2. Status.NodePortAssigned lies in the closed interval [30000, 32767].
//
// A nil status is never ready.
func IsK8sDevcontainerStatusReady(devcontainerAppStatus *k8s_api_v1.DevcontainerAppStatus) bool {
	if devcontainerAppStatus == nil || !devcontainerAppStatus.Ready {
		return false
	}
	assignedPort := devcontainerAppStatus.NodePortAssigned
	return assignedPort >= 30000 && assignedPort <= 32767
}
// groupVersionResource describes the DevcontainerApp CRD (group and version
// taken from k8s_api_v1.GroupVersion, resource "devcontainerapps") and is the
// handle every dynamic-client call in this file uses to address that resource.
var groupVersionResource = schema.GroupVersionResource{
Group: k8s_api_v1.GroupVersion.Group,
Version: k8s_api_v1.GroupVersion.Version,
Resource: "devcontainerapps",
}
// GetKubernetesClient builds a dynamic Kubernetes client.
//
// When kubeconfig is non-empty, the REST config is derived from those raw
// bytes, using contextName if given (see restConfigFromKubeconfigBytes).
//
// When kubeconfig is empty, the default kubeconfig file
// (clientcmd.RecommendedHomeFile) is tried first and the in-cluster config is
// used as a fallback. If both fail, an error is logged,
// setting.Devcontainer.Enabled is switched to false and the error is returned.
//
// ctx is accepted for API symmetry but is not used by this function.
func GetKubernetesClient(ctx context.Context, kubeconfig []byte, contextName string) (dynamicclient.Interface, error) {
	var (
		restCfg *clientgorest.Config
		err     error
	)

	switch {
	case len(kubeconfig) != 0:
		// Caller supplied kubeconfig content: honour it and the optional context.
		restCfg, err = restConfigFromKubeconfigBytes(kubeconfig, contextName)
		if err != nil {
			return nil, err
		}
	default:
		// No kubeconfig content: local default kubeconfig first, then in-cluster.
		restCfg, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
		if err != nil {
			log.Warn("Failed to obtain Kubernetes config outside of cluster: " + clientcmd.RecommendedHomeFile)
			if restCfg, err = clientgorest.InClusterConfig(); err != nil {
				log.Error("Failed to obtain Kubernetes config both inside/outside of cluster, the DevContainer is Disabled")
				setting.Devcontainer.Enabled = false
				return nil, err
			}
		}
	}

	applyClientDefaults(restCfg)
	return dynamicclient.NewForConfig(restCfg)
}
// restConfigFromKubeconfigBytes turns raw kubeconfig content into a
// *rest.Config. With an empty contextName the kubeconfig is resolved through
// clientcmd.RESTConfigFromKubeConfig; otherwise the kubeconfig is loaded and
// resolved with CurrentContext overridden to contextName. The client defaults
// (see applyClientDefaults) are applied to the result before it is returned.
func restConfigFromKubeconfigBytes(kubeconfig []byte, contextName string) (*clientgorest.Config, error) {
	var (
		restCfg *clientgorest.Config
		err     error
	)

	if contextName != "" {
		// Explicit context: load the kubeconfig and override its current context.
		loaded, loadErr := clientcmd.Load(kubeconfig)
		if loadErr != nil {
			return nil, loadErr
		}
		contextOverride := &clientcmd.ConfigOverrides{CurrentContext: contextName}
		restCfg, err = clientcmd.NewDefaultClientConfig(*loaded, contextOverride).ClientConfig()
	} else {
		restCfg, err = clientcmd.RESTConfigFromKubeConfig(kubeconfig)
	}
	if err != nil {
		return nil, err
	}

	applyClientDefaults(restCfg)
	return restCfg, nil
}
// applyClientDefaults fills in the client-side rate-limit settings of cfg when
// they are still zero: QPS becomes 50 and Burst becomes 100. Values that are
// already set are left alone, and a nil cfg is a no-op.
func applyClientDefaults(cfg *clientgorest.Config) {
	const (
		fallbackQPS   = 50
		fallbackBurst = 100
	)

	if cfg != nil {
		if cfg.QPS == 0 {
			cfg.QPS = fallbackQPS
		}
		if cfg.Burst == 0 {
			cfg.Burst = fallbackBurst
		}
	}
}
// GetDevcontainer fetches the DevcontainerApp opts.Name in opts.Namespace and
// returns it once its status is ready (see IsK8sDevcontainerStatusReady).
//
// If the resource exists but is not ready yet:
//   - with opts.Wait == false, ErrK8sDevcontainerNotReady is returned at once;
//   - with opts.Wait == true, the call blocks in
//     waitUntilDevcontainerReadyWithTimeout and, on success, copies the observed
//     Ready / NodePortAssigned values into the returned object's status.
//
// Errors:
//   - ErrIllegalDevcontainerParameters when ctx, client or opts is nil, or when
//     opts.Name / opts.Namespace is empty;
//   - ErrK8sDevcontainerNotReady as described above;
//   - ErrOperateDevcontainer when the API call, the conversion of the
//     unstructured response, or the blocking wait fails.
func GetDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
	// 0. Validate parameters. client is checked too: calling Resource on a nil
	//    interface would panic instead of producing a usable error.
	if ctx == nil || client == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
		return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
			FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"},
			Message:   "cannot be nil",
		}
	}

	// 1. Fetch the DevcontainerApp custom resource.
	devcontainerUnstructured, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Get(ctx, opts.Name, opts.GetOptions)
	if err != nil {
		return nil, devcontainer_errors.ErrOperateDevcontainer{
			Action:  "Get DevcontainerApp thru k8s API Server",
			Message: err.Error(),
		}
	}

	// 2. Convert the unstructured response into the typed object.
	//    devcontainerApp is already a pointer to the struct, so it is passed
	//    as-is rather than by address.
	devcontainerApp := &k8s_api_v1.DevcontainerApp{}
	err = apimachinery_runtime_utils.DefaultUnstructuredConverter.FromUnstructured(devcontainerUnstructured.Object, devcontainerApp)
	if err != nil {
		return nil, devcontainer_errors.ErrOperateDevcontainer{
			Action:  "Convert k8s API Server unstructured response into DevcontainerApp",
			Message: err.Error(),
		}
	}

	// 3. Check readiness.
	if !IsK8sDevcontainerStatusReady(&devcontainerApp.Status) {
		// 3.1 The caller does not want to block: report "not ready" right away.
		if !opts.Wait {
			return nil, devcontainer_errors.ErrK8sDevcontainerNotReady{
				Name:      opts.Name,
				Namespace: opts.Namespace,
				Wait:      opts.Wait,
			}
		}

		// 3.2 Block until the DevContainer becomes ready or the watch ends.
		devcontainerStatusVO, err := waitUntilDevcontainerReadyWithTimeout(ctx, client, opts)
		if err != nil {
			return nil, devcontainer_errors.ErrOperateDevcontainer{
				Action:  "wait for k8s DevContainer to be ready",
				Message: err.Error(),
			}
		}
		devcontainerApp.Status.Ready = devcontainerStatusVO.Ready
		devcontainerApp.Status.NodePortAssigned = devcontainerStatusVO.NodePortAssigned
	}

	// 4. Return the ready DevcontainerApp.
	return devcontainerApp, nil
}
// waitUntilDevcontainerReadyWithTimeout blocks on a watch of the DevcontainerApp
// opts.Name in opts.Namespace until its status is ready (see
// IsK8sDevcontainerStatusReady) and returns the observed Ready /
// NodePortAssigned values.
//
// The watch is opened with TimeoutSeconds = setting.Devcontainer.TimeoutSeconds.
// When the result channel closes without a ready status having been seen,
// ErrOpenDevcontainerTimeout is returned.
//
// Errors:
//   - ErrIllegalDevcontainerParameters when ctx, client or opts is nil, or when
//     opts.Name / opts.Namespace is empty;
//   - ErrOperateDevcontainer when the watch cannot be registered, when the
//     watch delivers an ERROR event, or when the resource is deleted while
//     waiting;
//   - ErrOpenDevcontainerTimeout as described above.
func waitUntilDevcontainerReadyWithTimeout(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*DevcontainerStatusK8sAgentVO, error) {
	// 0. Validate parameters.
	if ctx == nil || client == nil || opts == nil || len(opts.Name) == 0 || len(opts.Namespace) == 0 {
		return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
			FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"},
			Message:   "could not be nil",
		}
	}

	// 1. Register a watcher restricted to the single named resource.
	watcherTimeoutSeconds := setting.Devcontainer.TimeoutSeconds
	watcher, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Watch(ctx, apimachinery_apis_v1.ListOptions{
		FieldSelector:  fmt.Sprintf("metadata.name=%s", opts.Name),
		Watch:          true,
		TimeoutSeconds: &watcherTimeoutSeconds,
	})
	if err != nil {
		return nil, devcontainer_errors.ErrOperateDevcontainer{
			Action:  "register watcher of DevContainer Readiness",
			Message: err.Error(),
		}
	}
	defer watcher.Stop()

	// 2. Consume watch events until the status is ready or the channel closes.
	for event := range watcher.ResultChan() {
		switch event.Type {
		case apimachinery_watch.Added, apimachinery_watch.Modified:
			// 2.1 ADDED and MODIFIED are handled identically: inspect the status.
			devcontainerUnstructured, ok := event.Object.(*apimachinery_apis_v1_unstructured.Unstructured)
			if !ok {
				continue
			}

			// 2.1.1 Read the status fields defensively. The status may be absent
			//       or only partially populated while the resource is still
			//       being reconciled; a missing or mistyped field means
			//       "not ready yet", not a crash.
			ready, found, err := apimachinery_apis_v1_unstructured.NestedBool(devcontainerUnstructured.Object, "status", "ready")
			if err != nil || !found || !ready {
				continue
			}
			nodePort, found, err := apimachinery_apis_v1_unstructured.NestedInt64(devcontainerUnstructured.Object, "status", "nodePortAssigned")
			if err != nil || !found {
				continue
			}
			// Reject values that do not fit in uint16 so the conversion below
			// cannot wrap an out-of-range number into the valid NodePort range.
			if nodePort < 0 || nodePort > 65535 {
				continue
			}

			devcontainerCurrentStatus := &k8s_api_v1.DevcontainerAppStatus{
				Ready:            ready,
				NodePortAssigned: uint16(nodePort),
			}

			// 2.1.2 Return as soon as the status qualifies as ready.
			if IsK8sDevcontainerStatusReady(devcontainerCurrentStatus) {
				return &DevcontainerStatusK8sAgentVO{
					Ready:            devcontainerCurrentStatus.Ready,
					NodePortAssigned: devcontainerCurrentStatus.NodePortAssigned,
				}, nil
			}

		case apimachinery_watch.Error:
			// 2.2 ERROR event: always abort the wait. Report the details when
			//     the payload is a *metav1.Status, and the payload type otherwise.
			action := fmt.Sprintf("wait for Devcontainer '%s' in namespace '%s' to be ready", opts.Name, opts.Namespace)
			apimachineryApiMetav1Status, ok := event.Object.(*apimachinery_api_metav1.Status)
			if !ok {
				return nil, devcontainer_errors.ErrOperateDevcontainer{
					Action:  action,
					Message: fmt.Sprintf("An error occurred in k8s CRD DevcontainerApp Watcher: unexpected error object of type %T", event.Object),
				}
			}
			return nil, devcontainer_errors.ErrOperateDevcontainer{
				Action: action,
				Message: fmt.Sprintf("An error occurred in k8s CRD DevcontainerApp Watcher: \n"+
					" Code: %v (status = %v)\n"+
					"Message: %v\n"+
					" Reason: %v\n"+
					"Details: %v",
					apimachineryApiMetav1Status.Code, apimachineryApiMetav1Status.Status,
					apimachineryApiMetav1Status.Message,
					apimachineryApiMetav1Status.Reason,
					apimachineryApiMetav1Status.Details),
			}

		case apimachinery_watch.Deleted:
			// 2.3 DELETED event: the resource is gone, waiting is pointless.
			return nil, devcontainer_errors.ErrOperateDevcontainer{
				Action:  fmt.Sprintf("Open DevContainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
				Message: fmt.Sprintf("'%s' of Kind DevcontainerApp has been Deleted", opts.Name),
			}
		}
	}

	// 3. The watch channel closed without a ready status: report a timeout.
	return nil, devcontainer_errors.ErrOpenDevcontainerTimeout{
		Name:           opts.Name,
		Namespace:      opts.Namespace,
		TimeoutSeconds: setting.Devcontainer.TimeoutSeconds,
	}
}
// CreateDevcontainer creates a DevcontainerApp custom resource from opts through
// the dynamic client and returns the object as stored by the API server.
//
// The typed object is built from opts, serialised to JSON, loaded into an
// Unstructured value and submitted with opts.CreateOptions. The API server's
// response is converted back into a typed DevcontainerApp via JSON.
//
// Errors:
//   - ErrIllegalDevcontainerParameters when ctx, client or opts is nil, or when
//     opts.Name / opts.Namespace is empty;
//   - ErrOperateDevcontainer when any (un)marshalling step or the create call
//     itself fails.
func CreateDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *CreateDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
	// Validate parameters before touching opts: the log statement below
	// dereferences it, and the other CRUD helpers in this file validate their
	// arguments the same way.
	if ctx == nil || client == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
		return nil, devcontainer_k8s_agent_modules_errors.ErrIllegalDevcontainerParameters{
			FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"},
			Message:   "cannot be nil",
		}
	}

	log.Info("Creating DevContainer with options: name=%s, namespace=%s, image=%s",
		opts.Name, opts.Namespace, opts.Image)

	// Build the resource definition.
	devcontainerApp := &k8s_api_v1.DevcontainerApp{
		TypeMeta: apimachinery_apis_v1.TypeMeta{
			Kind:       "DevcontainerApp",
			APIVersion: "devcontainer.devstar.cn/v1",
		},
		ObjectMeta: apimachinery_apis_v1.ObjectMeta{
			Name:      opts.Name,
			Namespace: opts.Namespace,
			Labels: map[string]string{
				"app.kubernetes.io/name":       "devcontainer-operator",
				"app.kubernetes.io/managed-by": "kustomize",
			},
		},
		Spec: k8s_api_v1.DevcontainerAppSpec{
			StatefulSet: k8s_api_v1.StatefulSetSpec{
				Image:            opts.Image,
				Command:          opts.CommandList,
				ContainerPort:    opts.ContainerPort,
				SSHPublicKeyList: opts.SSHPublicKeyList,
				GitRepositoryURL: opts.GitRepositoryURL,
			},
			Service: k8s_api_v1.ServiceSpec{
				ServicePort: opts.ServicePort,
				ExtraPorts:  opts.ExtraPorts,
			},
		},
	}

	// Serialise to JSON.
	jsonData, err := json.Marshal(devcontainerApp)
	if err != nil {
		log.Error("Failed to marshal DevcontainerApp to JSON: %v", err)
		return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
			Action:  "Marshal JSON",
			Message: err.Error(),
		}
	}

	// Emit the generated JSON to help debugging.
	log.Debug("Generated JSON for DevcontainerApp:\n%s", string(jsonData))

	// Load the JSON into an Unstructured object, which is what the dynamic
	// client accepts.
	unstructuredObj := &apimachinery_apis_v1_unstructured.Unstructured{}
	err = unstructuredObj.UnmarshalJSON(jsonData)
	if err != nil {
		log.Error("Failed to unmarshal JSON to Unstructured: %v", err)
		return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
			Action:  "Unmarshal JSON to Unstructured",
			Message: err.Error(),
		}
	}

	log.Debug("Using GroupVersionResource: Group=%s, Version=%s, Resource=%s",
		groupVersionResource.Group, groupVersionResource.Version, groupVersionResource.Resource)

	// Create the resource.
	log.Info("Creating DevcontainerApp resource in namespace %s", opts.Namespace)
	result, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Create(ctx, unstructuredObj, opts.CreateOptions)
	if err != nil {
		log.Error("Failed to create DevcontainerApp: %v", err)
		return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
			Action:  "create DevContainer via Dynamic Client",
			Message: err.Error(),
		}
	}
	log.Info("DevcontainerApp resource created successfully")

	// Convert the API server's response back into the typed struct.
	resultJSON, err := result.MarshalJSON()
	if err != nil {
		log.Error("Failed to marshal result to JSON: %v", err)
		return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
			Action:  "Marshal result JSON",
			Message: err.Error(),
		}
	}
	createdDevcontainer := &k8s_api_v1.DevcontainerApp{}
	if err := json.Unmarshal(resultJSON, createdDevcontainer); err != nil {
		log.Error("Failed to unmarshal result JSON: %v", err)
		return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
			Action:  "Unmarshal result JSON",
			Message: err.Error(),
		}
	}
	return createdDevcontainer, nil
}
// DeleteDevcontainer deletes the DevcontainerApp opts.Name in opts.Namespace
// using opts.DeleteOptions.
//
// Errors:
//   - ErrIllegalDevcontainerParameters when ctx or opts is nil, or when
//     opts.Name / opts.Namespace is empty;
//   - ErrOperateDevcontainer when the delete call fails (the failure is also
//     logged as a warning).
func DeleteDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *DeleteDevcontainerOptions) error {
	paramsValid := ctx != nil && opts != nil && len(opts.Namespace) != 0 && len(opts.Name) != 0
	if !paramsValid {
		return devcontainer_errors.ErrIllegalDevcontainerParameters{
			FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"},
			Message:   "cannot be nil",
		}
	}

	namespaced := client.Resource(groupVersionResource).Namespace(opts.Namespace)
	if deleteErr := namespaced.Delete(ctx, opts.Name, opts.DeleteOptions); deleteErr != nil {
		log.Warn("Failed to delete DevcontainerApp '%s' in namespace '%s': %s", opts.Name, opts.Namespace, deleteErr.Error())
		return devcontainer_errors.ErrOperateDevcontainer{
			Action:  fmt.Sprintf("delete devcontainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
			Message: deleteErr.Error(),
		}
	}
	return nil
}
// ListDevcontainers lists the DevcontainerApp resources in opts.Namespace that
// match opts.ListOptions and returns them as a typed DevcontainerAppList.
//
// The unstructured list returned by the dynamic client is converted to the
// typed list through a JSON round trip.
//
// Errors:
//   - ErrIllegalDevcontainerParameters when ctx or opts is nil, or when
//     opts.Namespace is empty;
//   - ErrOperateDevcontainer when the list call or either JSON step fails.
func ListDevcontainers(ctx context.Context, client dynamic_client.Interface, opts *ListDevcontainersOptions) (*k8s_api_v1.DevcontainerAppList, error) {
	if ctx == nil || opts == nil || len(opts.Namespace) == 0 {
		return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
			FieldList: []string{"ctx", "namespace"},
			Message:   "cannot be empty",
		}
	}

	// operateErr wraps a failure of the named step into ErrOperateDevcontainer.
	operateErr := func(action string, cause error) error {
		return devcontainer_errors.ErrOperateDevcontainer{
			Action:  action,
			Message: cause.Error(),
		}
	}

	namespaced := client.Resource(groupVersionResource).Namespace(opts.Namespace)
	rawList, err := namespaced.List(ctx, opts.ListOptions)
	if err != nil {
		return nil, operateErr(fmt.Sprintf("List Devcontainer in namespace '%s'", opts.Namespace), err)
	}

	// Round-trip through JSON to obtain the typed DevcontainerAppList.
	encoded, err := rawList.MarshalJSON()
	if err != nil {
		return nil, operateErr("verify JSON data of Devcontainer List", err)
	}

	typedList := new(k8s_api_v1.DevcontainerAppList)
	if err = json.Unmarshal(encoded, typedList); err != nil {
		return nil, operateErr("deserialize Devcontainer List data", err)
	}
	return typedList, nil
}