package k8s_agent import ( "context" "encoding/json" "fmt" "code.gitea.io/gitea/modules/log" "code.gitea.io/gitea/modules/setting" k8s_api_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1" devcontainer_errors "code.gitea.io/gitea/modules/k8s/errors" devcontainer_k8s_agent_modules_errors "code.gitea.io/gitea/modules/k8s/errors" apimachinery_api_metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachinery_apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachinery_apis_v1_unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" apimachinery_runtime_utils "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" apimachinery_watch "k8s.io/apimachinery/pkg/watch" dynamic_client "k8s.io/client-go/dynamic" dynamicclient "k8s.io/client-go/dynamic" clientgorest "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) // IsK8sDevcontainerStatusReady 工具类方法,判断给定的 DevcontainerApp.Status 是否达到就绪状态 // 1. DevcontainerApp.Status.Ready == true // 2. DevcontainerApp.Status.NodePortAssigned 介于闭区间 [30000, 32767] func IsK8sDevcontainerStatusReady(devcontainerAppStatus *k8s_api_v1.DevcontainerAppStatus) bool { return devcontainerAppStatus != nil && devcontainerAppStatus.Ready && devcontainerAppStatus.NodePortAssigned >= 30000 && devcontainerAppStatus.NodePortAssigned <= 32767 } // groupVersionResource 用于描述 CRD,供 dynamic Client 交互使用 var groupVersionResource = schema.GroupVersionResource{ Group: k8s_api_v1.GroupVersion.Group, Version: k8s_api_v1.GroupVersion.Version, Resource: "devcontainerapps", } // GetKubernetesClient 通过用户提供的 kubeconfig 原始内容与可选的 contextName 获取动态客户端 func GetKubernetesClient(ctx context.Context, kubeconfig []byte, contextName string) (dynamicclient.Interface, error) { var config *clientgorest.Config var err error if len(kubeconfig) == 0 { // 未提供 kubeconfig 内容:优先使用本机默认 kubeconfig,其次回退到 InCluster config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile) if err != nil { log.Warn("Failed to obtain Kubernetes config outside of cluster: " + clientcmd.RecommendedHomeFile) config, err = clientgorest.InClusterConfig() if err != nil { log.Error("Failed to obtain Kubernetes config both inside/outside of cluster, the DevContainer is Disabled") setting.Devcontainer.Enabled = false return nil, err } } } else { // 提供了 kubeconfig 内容:按用户提供的内容与可选 context 获取配置 config, err = restConfigFromKubeconfigBytes(kubeconfig, contextName) if err != nil { return nil, err } } applyClientDefaults(config) return dynamicclient.NewForConfig(config) } // restConfigFromKubeconfigBytes 基于 kubeconfig 内容构造 *rest.Config,支持指定 context(为空则使用 current-context) func restConfigFromKubeconfigBytes(kubeconfig []byte, contextName string) (*clientgorest.Config, error) { if contextName == "" { cfg, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig) if err != nil { return nil, err } applyClientDefaults(cfg) return cfg, nil } // 指定 context 的解析路径 apiConfig, err := clientcmd.Load(kubeconfig) if err != nil { return nil, err } overrides := &clientcmd.ConfigOverrides{CurrentContext: contextName} clientConfig := clientcmd.NewDefaultClientConfig(*apiConfig, overrides) cfg, err := clientConfig.ClientConfig() if err != nil { return nil, err } applyClientDefaults(cfg) return cfg, nil } // applyClientDefaults 统一设置 QPS/Burst(可按需设置超时等) func applyClientDefaults(cfg *clientgorest.Config) { if cfg == nil { return } if cfg.QPS == 0 { cfg.QPS = 50 } if cfg.Burst == 0 { cfg.Burst = 100 } } func GetDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) { // 0. 检查参数 if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 { return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{ FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"}, Message: "cannot be nil", } } // 1. 获取 k8s CRD 资源 DevcontainerApp devcontainerUnstructured, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Get(ctx, opts.Name, opts.GetOptions) if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "Get DevcontainerApp thru k8s API Server", Message: err.Error(), } } // 2. 解析 DevcontainerApp Status 域,装填 VO devcontainerApp := &k8s_api_v1.DevcontainerApp{} err = apimachinery_runtime_utils.DefaultUnstructuredConverter.FromUnstructured(devcontainerUnstructured.Object, &devcontainerApp) if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "Convert k8s API Server unstructured response into DevcontainerApp", Message: err.Error(), } } // 3. 检查 Devcontainer 是否就绪 if !IsK8sDevcontainerStatusReady(&devcontainerApp.Status) { // 3.1 检查 Wait 参数,若用户不需要阻塞式等待,直接返回 “DevContainer 未就绪” 错误 if opts.Wait == false { return nil, devcontainer_errors.ErrK8sDevcontainerNotReady{ Name: opts.Name, Namespace: opts.Namespace, Wait: opts.Wait, } } // 3.2 执行阻塞式等待 devcontainerStatusVO, err := waitUntilDevcontainerReadyWithTimeout(ctx, client, opts) if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "wait for k8s DevContainer to be ready", Message: err.Error(), } } devcontainerApp.Status.Ready = devcontainerStatusVO.Ready devcontainerApp.Status.NodePortAssigned = devcontainerStatusVO.NodePortAssigned } // 4. 将就绪的 DevContainer Status VO 返回 return devcontainerApp, nil } // waitUntilDevcontainerReadyWithTimeout 辅助方法:在超时时间内阻塞等待 DevContainer 就绪 func waitUntilDevcontainerReadyWithTimeout(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*DevcontainerStatusK8sAgentVO, error) { // 0. 检查参数 if ctx == nil || client == nil || opts == nil || len(opts.Name) == 0 || len(opts.Namespace) == 0 { return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{ FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"}, Message: "could not be nil", } } // 1. 注册 watcher 监听 DevContainer Status 变化 watcherTimeoutSeconds := setting.Devcontainer.TimeoutSeconds watcher, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Watch(ctx, apimachinery_apis_v1.ListOptions{ FieldSelector: fmt.Sprintf("metadata.name=%s", opts.Name), Watch: true, TimeoutSeconds: &watcherTimeoutSeconds, }) if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "register watcher of DevContainer Readiness", Message: err.Error(), } } defer watcher.Stop() // 2. 当 DevContainer Watcher 事件处理 devcontainerStatusVO := &DevcontainerStatusK8sAgentVO{} for event := range watcher.ResultChan() { switch event.Type { case apimachinery_watch.Added: // 2.1 监听 DevcontainerApp ADDED 事件,直接 fallthrough 到 MODIFIED 事件合并处理 fallthrough case apimachinery_watch.Modified: // 2.2 监听 DevcontainerApp MODIFIED 事件 if devcontainerUnstructured, ok := event.Object.(*apimachinery_apis_v1_unstructured.Unstructured); ok { // 2.2.1 解析 status 域 statusDevcontainer, ok, err := apimachinery_apis_v1_unstructured.NestedMap(devcontainerUnstructured.Object, "status") if err == nil && ok { devcontainerCurrentStatus := &k8s_api_v1.DevcontainerAppStatus{ Ready: statusDevcontainer["ready"].(bool), NodePortAssigned: uint16(statusDevcontainer["nodePortAssigned"].(int64)), } // 2.2.2 当 Status 达到就绪状态后,返回 if IsK8sDevcontainerStatusReady(devcontainerCurrentStatus) { devcontainerStatusVO.Ready = devcontainerCurrentStatus.Ready devcontainerStatusVO.NodePortAssigned = devcontainerCurrentStatus.NodePortAssigned return devcontainerStatusVO, nil } } } case apimachinery_watch.Error: // 2.3 监听 DevcontainerApp ERROR 事件,返回报错信息 apimachineryApiMetav1Status, ok := event.Object.(*apimachinery_api_metav1.Status) if !ok { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: fmt.Sprintf("wait for Devcontainer '%s' in namespace '%s' to be ready", opts.Name, opts.Namespace), Message: fmt.Sprintf("An error occurred in k8s CRD DevcontainerApp Watcher: \n"+ " Code: %v (status = %v)\n"+ "Message: %v\n"+ " Reason: %v\n"+ "Details: %v", apimachineryApiMetav1Status.Code, apimachineryApiMetav1Status.Status, apimachineryApiMetav1Status.Message, apimachineryApiMetav1Status.Reason, apimachineryApiMetav1Status.Details), } } case apimachinery_watch.Deleted: // 2.4 监听 DevcontainerApp DELETED 事件,返回报错信息 return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: fmt.Sprintf("Open DevContainer '%s' in namespace '%s'", opts.Name, opts.Namespace), Message: fmt.Sprintf("'%s' of Kind DevcontainerApp has been Deleted", opts.Name), } } } // 3. k8s CRD DevcontainerApp Watcher 超时关闭处理:直接返回超时错误 return nil, devcontainer_errors.ErrOpenDevcontainerTimeout{ Name: opts.Name, Namespace: opts.Namespace, TimeoutSeconds: setting.Devcontainer.TimeoutSeconds, } } // 修改 CreateDevcontainer 函数 func CreateDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *CreateDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) { // 记录日志 log.Info("Creating DevContainer with options: name=%s, namespace=%s, image=%s", opts.Name, opts.Namespace, opts.Image) // 创建资源定义 devcontainerApp := &k8s_api_v1.DevcontainerApp{ TypeMeta: apimachinery_apis_v1.TypeMeta{ Kind: "DevcontainerApp", APIVersion: "devcontainer.devstar.cn/v1", }, ObjectMeta: apimachinery_apis_v1.ObjectMeta{ Name: opts.Name, Namespace: opts.Namespace, Labels: map[string]string{ "app.kubernetes.io/name": "devcontainer-operator", "app.kubernetes.io/managed-by": "kustomize", }, }, Spec: k8s_api_v1.DevcontainerAppSpec{ StatefulSet: k8s_api_v1.StatefulSetSpec{ Image: opts.Image, Command: opts.CommandList, ContainerPort: opts.ContainerPort, SSHPublicKeyList: opts.SSHPublicKeyList, GitRepositoryURL: opts.GitRepositoryURL, }, Service: k8s_api_v1.ServiceSpec{ ServicePort: opts.ServicePort, ExtraPorts: opts.ExtraPorts, // 添加 ExtraPorts 配置 }, }, } // 转换为 JSON jsonData, err := json.Marshal(devcontainerApp) if err != nil { log.Error("Failed to marshal DevcontainerApp to JSON: %v", err) return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{ Action: "Marshal JSON", Message: err.Error(), } } // 输出 JSON 以便调试 log.Debug("Generated JSON for DevcontainerApp:\n%s", string(jsonData)) // 转换为 Unstructured 对象 unstructuredObj := &apimachinery_apis_v1_unstructured.Unstructured{} err = unstructuredObj.UnmarshalJSON(jsonData) if err != nil { log.Error("Failed to unmarshal JSON to Unstructured: %v", err) return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{ Action: "Unmarshal JSON to Unstructured", Message: err.Error(), } } // 确认 GroupVersionResource 定义 log.Debug("Using GroupVersionResource: Group=%s, Version=%s, Resource=%s", groupVersionResource.Group, groupVersionResource.Version, groupVersionResource.Resource) // 创建资源 log.Info("Creating DevcontainerApp resource in namespace %s", opts.Namespace) result, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Create(ctx, unstructuredObj, opts.CreateOptions) if err != nil { log.Error("Failed to create DevcontainerApp: %v", err) return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{ Action: "create DevContainer via Dynamic Client", Message: err.Error(), } } log.Info("DevcontainerApp resource created successfully") // 将结果转换回 DevcontainerApp 结构体 resultJSON, err := result.MarshalJSON() if err != nil { log.Error("Failed to marshal result to JSON: %v", err) return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{ Action: "Marshal result JSON", Message: err.Error(), } } createdDevcontainer := &k8s_api_v1.DevcontainerApp{} if err := json.Unmarshal(resultJSON, createdDevcontainer); err != nil { log.Error("Failed to unmarshal result JSON: %v", err) return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{ Action: "Unmarshal result JSON", Message: err.Error(), } } return createdDevcontainer, nil } func DeleteDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *DeleteDevcontainerOptions) error { if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 { return devcontainer_errors.ErrIllegalDevcontainerParameters{ FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"}, Message: "cannot be nil", } } err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Delete(ctx, opts.Name, opts.DeleteOptions) if err != nil { log.Warn("Failed to delete DevcontainerApp '%s' in namespace '%s': %s", opts.Name, opts.Namespace, err.Error()) return devcontainer_errors.ErrOperateDevcontainer{ Action: fmt.Sprintf("delete devcontainer '%s' in namespace '%s'", opts.Name, opts.Namespace), Message: err.Error(), } } return nil } // ListDevcontainers 根据条件列举 DevContainer func ListDevcontainers(ctx context.Context, client dynamic_client.Interface, opts *ListDevcontainersOptions) (*k8s_api_v1.DevcontainerAppList, error) { if ctx == nil || opts == nil || len(opts.Namespace) == 0 { return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{ FieldList: []string{"ctx", "namespace"}, Message: "cannot be empty", } } list, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).List(ctx, opts.ListOptions) if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: fmt.Sprintf("List Devcontainer in namespace '%s'", opts.Namespace), Message: err.Error(), } } // JSON 反序列化为 DevcontainerAppList jsonData, err := list.MarshalJSON() if err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "verify JSON data of Devcontainer List", Message: err.Error(), } } devcontainerList := &k8s_api_v1.DevcontainerAppList{} if err := json.Unmarshal(jsonData, devcontainerList); err != nil { return nil, devcontainer_errors.ErrOperateDevcontainer{ Action: "deserialize Devcontainer List data", Message: err.Error(), } } return devcontainerList, nil }