404 lines
15 KiB
Go
404 lines
15 KiB
Go
package k8s_agent
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
|
||
"code.gitea.io/gitea/modules/log"
|
||
"code.gitea.io/gitea/modules/setting"
|
||
|
||
k8s_api_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1"
|
||
|
||
devcontainer_errors "code.gitea.io/gitea/modules/k8s/errors"
|
||
devcontainer_k8s_agent_modules_errors "code.gitea.io/gitea/modules/k8s/errors"
|
||
apimachinery_api_metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
apimachinery_apis_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||
apimachinery_apis_v1_unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||
apimachinery_runtime_utils "k8s.io/apimachinery/pkg/runtime"
|
||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||
apimachinery_watch "k8s.io/apimachinery/pkg/watch"
|
||
dynamic_client "k8s.io/client-go/dynamic"
|
||
dynamicclient "k8s.io/client-go/dynamic"
|
||
clientgorest "k8s.io/client-go/rest"
|
||
"k8s.io/client-go/tools/clientcmd"
|
||
)
|
||
|
||
// IsK8sDevcontainerStatusReady 工具类方法,判断给定的 DevcontainerApp.Status 是否达到就绪状态
|
||
// 1. DevcontainerApp.Status.Ready == true
|
||
// 2. DevcontainerApp.Status.NodePortAssigned 介于闭区间 [30000, 32767]
|
||
func IsK8sDevcontainerStatusReady(devcontainerAppStatus *k8s_api_v1.DevcontainerAppStatus) bool {
|
||
return devcontainerAppStatus != nil &&
|
||
devcontainerAppStatus.Ready &&
|
||
devcontainerAppStatus.NodePortAssigned >= 30000 &&
|
||
devcontainerAppStatus.NodePortAssigned <= 32767
|
||
}
|
||
|
||
// groupVersionResource 用于描述 CRD,供 dynamic Client 交互使用
|
||
var groupVersionResource = schema.GroupVersionResource{
|
||
Group: k8s_api_v1.GroupVersion.Group,
|
||
Version: k8s_api_v1.GroupVersion.Version,
|
||
Resource: "devcontainerapps",
|
||
}
|
||
|
||
// GetKubernetesClient 通过用户提供的 kubeconfig 原始内容与可选的 contextName 获取动态客户端
|
||
func GetKubernetesClient(ctx context.Context, kubeconfig []byte, contextName string) (dynamicclient.Interface, error) {
|
||
var config *clientgorest.Config
|
||
var err error
|
||
|
||
if len(kubeconfig) == 0 {
|
||
// 未提供 kubeconfig 内容:优先使用本机默认 kubeconfig,其次回退到 InCluster
|
||
config, err = clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
|
||
if err != nil {
|
||
log.Warn("Failed to obtain Kubernetes config outside of cluster: " + clientcmd.RecommendedHomeFile)
|
||
config, err = clientgorest.InClusterConfig()
|
||
if err != nil {
|
||
log.Error("Failed to obtain Kubernetes config both inside/outside of cluster, the DevContainer is Disabled")
|
||
setting.Devcontainer.Enabled = false
|
||
return nil, err
|
||
}
|
||
}
|
||
} else {
|
||
// 提供了 kubeconfig 内容:按用户提供的内容与可选 context 获取配置
|
||
config, err = restConfigFromKubeconfigBytes(kubeconfig, contextName)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
}
|
||
|
||
applyClientDefaults(config)
|
||
return dynamicclient.NewForConfig(config)
|
||
}
|
||
|
||
// restConfigFromKubeconfigBytes 基于 kubeconfig 内容构造 *rest.Config,支持指定 context(为空则使用 current-context)
|
||
func restConfigFromKubeconfigBytes(kubeconfig []byte, contextName string) (*clientgorest.Config, error) {
|
||
|
||
if contextName == "" {
|
||
cfg, err := clientcmd.RESTConfigFromKubeConfig(kubeconfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
applyClientDefaults(cfg)
|
||
return cfg, nil
|
||
}
|
||
// 指定 context 的解析路径
|
||
apiConfig, err := clientcmd.Load(kubeconfig)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
overrides := &clientcmd.ConfigOverrides{CurrentContext: contextName}
|
||
clientConfig := clientcmd.NewDefaultClientConfig(*apiConfig, overrides)
|
||
cfg, err := clientConfig.ClientConfig()
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
applyClientDefaults(cfg)
|
||
return cfg, nil
|
||
}
|
||
|
||
// applyClientDefaults 统一设置 QPS/Burst(可按需设置超时等)
|
||
func applyClientDefaults(cfg *clientgorest.Config) {
|
||
if cfg == nil {
|
||
return
|
||
}
|
||
if cfg.QPS == 0 {
|
||
cfg.QPS = 50
|
||
}
|
||
if cfg.Burst == 0 {
|
||
cfg.Burst = 100
|
||
}
|
||
}
|
||
|
||
func GetDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
|
||
|
||
// 0. 检查参数
|
||
if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
|
||
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
|
||
FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"},
|
||
Message: "cannot be nil",
|
||
}
|
||
}
|
||
|
||
// 1. 获取 k8s CRD 资源 DevcontainerApp
|
||
devcontainerUnstructured, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Get(ctx, opts.Name, opts.GetOptions)
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "Get DevcontainerApp thru k8s API Server",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
// 2. 解析 DevcontainerApp Status 域,装填 VO
|
||
devcontainerApp := &k8s_api_v1.DevcontainerApp{}
|
||
err = apimachinery_runtime_utils.DefaultUnstructuredConverter.FromUnstructured(devcontainerUnstructured.Object, &devcontainerApp)
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "Convert k8s API Server unstructured response into DevcontainerApp",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
// 3. 检查 Devcontainer 是否就绪
|
||
if !IsK8sDevcontainerStatusReady(&devcontainerApp.Status) {
|
||
// 3.1 检查 Wait 参数,若用户不需要阻塞式等待,直接返回 “DevContainer 未就绪” 错误
|
||
if opts.Wait == false {
|
||
return nil, devcontainer_errors.ErrK8sDevcontainerNotReady{
|
||
Name: opts.Name,
|
||
Namespace: opts.Namespace,
|
||
Wait: opts.Wait,
|
||
}
|
||
}
|
||
|
||
// 3.2 执行阻塞式等待
|
||
devcontainerStatusVO, err := waitUntilDevcontainerReadyWithTimeout(ctx, client, opts)
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "wait for k8s DevContainer to be ready",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
devcontainerApp.Status.Ready = devcontainerStatusVO.Ready
|
||
devcontainerApp.Status.NodePortAssigned = devcontainerStatusVO.NodePortAssigned
|
||
}
|
||
|
||
// 4. 将就绪的 DevContainer Status VO 返回
|
||
return devcontainerApp, nil
|
||
}
|
||
|
||
// waitUntilDevcontainerReadyWithTimeout 辅助方法:在超时时间内阻塞等待 DevContainer 就绪
|
||
func waitUntilDevcontainerReadyWithTimeout(ctx context.Context, client dynamic_client.Interface, opts *GetDevcontainerOptions) (*DevcontainerStatusK8sAgentVO, error) {
|
||
|
||
// 0. 检查参数
|
||
if ctx == nil || client == nil || opts == nil || len(opts.Name) == 0 || len(opts.Namespace) == 0 {
|
||
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
|
||
FieldList: []string{"ctx", "client", "opts", "opts.Name", "opts.Namespace"},
|
||
Message: "could not be nil",
|
||
}
|
||
}
|
||
|
||
// 1. 注册 watcher 监听 DevContainer Status 变化
|
||
watcherTimeoutSeconds := setting.Devcontainer.TimeoutSeconds
|
||
watcher, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Watch(ctx, apimachinery_apis_v1.ListOptions{
|
||
FieldSelector: fmt.Sprintf("metadata.name=%s", opts.Name),
|
||
Watch: true,
|
||
TimeoutSeconds: &watcherTimeoutSeconds,
|
||
})
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "register watcher of DevContainer Readiness",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
defer watcher.Stop()
|
||
|
||
// 2. 当 DevContainer Watcher 事件处理
|
||
devcontainerStatusVO := &DevcontainerStatusK8sAgentVO{}
|
||
for event := range watcher.ResultChan() {
|
||
switch event.Type {
|
||
case apimachinery_watch.Added:
|
||
// 2.1 监听 DevcontainerApp ADDED 事件,直接 fallthrough 到 MODIFIED 事件合并处理
|
||
fallthrough
|
||
case apimachinery_watch.Modified:
|
||
// 2.2 监听 DevcontainerApp MODIFIED 事件
|
||
if devcontainerUnstructured, ok := event.Object.(*apimachinery_apis_v1_unstructured.Unstructured); ok {
|
||
// 2.2.1 解析 status 域
|
||
statusDevcontainer, ok, err := apimachinery_apis_v1_unstructured.NestedMap(devcontainerUnstructured.Object, "status")
|
||
if err == nil && ok {
|
||
devcontainerCurrentStatus := &k8s_api_v1.DevcontainerAppStatus{
|
||
Ready: statusDevcontainer["ready"].(bool),
|
||
NodePortAssigned: uint16(statusDevcontainer["nodePortAssigned"].(int64)),
|
||
}
|
||
// 2.2.2 当 Status 达到就绪状态后,返回
|
||
if IsK8sDevcontainerStatusReady(devcontainerCurrentStatus) {
|
||
devcontainerStatusVO.Ready = devcontainerCurrentStatus.Ready
|
||
devcontainerStatusVO.NodePortAssigned = devcontainerCurrentStatus.NodePortAssigned
|
||
return devcontainerStatusVO, nil
|
||
}
|
||
}
|
||
}
|
||
case apimachinery_watch.Error:
|
||
// 2.3 监听 DevcontainerApp ERROR 事件,返回报错信息
|
||
apimachineryApiMetav1Status, ok := event.Object.(*apimachinery_api_metav1.Status)
|
||
if !ok {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: fmt.Sprintf("wait for Devcontainer '%s' in namespace '%s' to be ready", opts.Name, opts.Namespace),
|
||
Message: fmt.Sprintf("An error occurred in k8s CRD DevcontainerApp Watcher: \n"+
|
||
" Code: %v (status = %v)\n"+
|
||
"Message: %v\n"+
|
||
" Reason: %v\n"+
|
||
"Details: %v",
|
||
apimachineryApiMetav1Status.Code, apimachineryApiMetav1Status.Status,
|
||
apimachineryApiMetav1Status.Message,
|
||
apimachineryApiMetav1Status.Reason,
|
||
apimachineryApiMetav1Status.Details),
|
||
}
|
||
}
|
||
case apimachinery_watch.Deleted:
|
||
// 2.4 监听 DevcontainerApp DELETED 事件,返回报错信息
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: fmt.Sprintf("Open DevContainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
|
||
Message: fmt.Sprintf("'%s' of Kind DevcontainerApp has been Deleted", opts.Name),
|
||
}
|
||
}
|
||
}
|
||
|
||
// 3. k8s CRD DevcontainerApp Watcher 超时关闭处理:直接返回超时错误
|
||
return nil, devcontainer_errors.ErrOpenDevcontainerTimeout{
|
||
Name: opts.Name,
|
||
Namespace: opts.Namespace,
|
||
TimeoutSeconds: setting.Devcontainer.TimeoutSeconds,
|
||
}
|
||
}
|
||
|
||
// 修改 CreateDevcontainer 函数
|
||
func CreateDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *CreateDevcontainerOptions) (*k8s_api_v1.DevcontainerApp, error) {
|
||
// 记录日志
|
||
log.Info("Creating DevContainer with options: name=%s, namespace=%s, image=%s",
|
||
opts.Name, opts.Namespace, opts.Image)
|
||
|
||
// 创建资源定义
|
||
devcontainerApp := &k8s_api_v1.DevcontainerApp{
|
||
TypeMeta: apimachinery_apis_v1.TypeMeta{
|
||
Kind: "DevcontainerApp",
|
||
APIVersion: "devcontainer.devstar.cn/v1",
|
||
},
|
||
ObjectMeta: apimachinery_apis_v1.ObjectMeta{
|
||
Name: opts.Name,
|
||
Namespace: opts.Namespace,
|
||
Labels: map[string]string{
|
||
"app.kubernetes.io/name": "devcontainer-operator",
|
||
"app.kubernetes.io/managed-by": "kustomize",
|
||
},
|
||
},
|
||
Spec: k8s_api_v1.DevcontainerAppSpec{
|
||
StatefulSet: k8s_api_v1.StatefulSetSpec{
|
||
Image: opts.Image,
|
||
Command: opts.CommandList,
|
||
ContainerPort: opts.ContainerPort,
|
||
SSHPublicKeyList: opts.SSHPublicKeyList,
|
||
GitRepositoryURL: opts.GitRepositoryURL,
|
||
},
|
||
Service: k8s_api_v1.ServiceSpec{
|
||
ServicePort: opts.ServicePort,
|
||
ExtraPorts: opts.ExtraPorts, // 添加 ExtraPorts 配置
|
||
},
|
||
},
|
||
}
|
||
|
||
// 转换为 JSON
|
||
jsonData, err := json.Marshal(devcontainerApp)
|
||
if err != nil {
|
||
log.Error("Failed to marshal DevcontainerApp to JSON: %v", err)
|
||
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
|
||
Action: "Marshal JSON",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
// 输出 JSON 以便调试
|
||
log.Debug("Generated JSON for DevcontainerApp:\n%s", string(jsonData))
|
||
|
||
// 转换为 Unstructured 对象
|
||
unstructuredObj := &apimachinery_apis_v1_unstructured.Unstructured{}
|
||
err = unstructuredObj.UnmarshalJSON(jsonData)
|
||
if err != nil {
|
||
log.Error("Failed to unmarshal JSON to Unstructured: %v", err)
|
||
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
|
||
Action: "Unmarshal JSON to Unstructured",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
// 确认 GroupVersionResource 定义
|
||
log.Debug("Using GroupVersionResource: Group=%s, Version=%s, Resource=%s",
|
||
groupVersionResource.Group, groupVersionResource.Version, groupVersionResource.Resource)
|
||
|
||
// 创建资源
|
||
log.Info("Creating DevcontainerApp resource in namespace %s", opts.Namespace)
|
||
result, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Create(ctx, unstructuredObj, opts.CreateOptions)
|
||
if err != nil {
|
||
log.Error("Failed to create DevcontainerApp: %v", err)
|
||
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
|
||
Action: "create DevContainer via Dynamic Client",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
log.Info("DevcontainerApp resource created successfully")
|
||
|
||
// 将结果转换回 DevcontainerApp 结构体
|
||
resultJSON, err := result.MarshalJSON()
|
||
if err != nil {
|
||
log.Error("Failed to marshal result to JSON: %v", err)
|
||
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
|
||
Action: "Marshal result JSON",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
createdDevcontainer := &k8s_api_v1.DevcontainerApp{}
|
||
if err := json.Unmarshal(resultJSON, createdDevcontainer); err != nil {
|
||
log.Error("Failed to unmarshal result JSON: %v", err)
|
||
return nil, devcontainer_k8s_agent_modules_errors.ErrOperateDevcontainer{
|
||
Action: "Unmarshal result JSON",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
|
||
return createdDevcontainer, nil
|
||
}
|
||
|
||
func DeleteDevcontainer(ctx context.Context, client dynamic_client.Interface, opts *DeleteDevcontainerOptions) error {
|
||
if ctx == nil || opts == nil || len(opts.Namespace) == 0 || len(opts.Name) == 0 {
|
||
return devcontainer_errors.ErrIllegalDevcontainerParameters{
|
||
FieldList: []string{"ctx", "opts", "opts.Name", "opts.Namespace"},
|
||
Message: "cannot be nil",
|
||
}
|
||
}
|
||
|
||
err := client.Resource(groupVersionResource).Namespace(opts.Namespace).Delete(ctx, opts.Name, opts.DeleteOptions)
|
||
if err != nil {
|
||
log.Warn("Failed to delete DevcontainerApp '%s' in namespace '%s': %s", opts.Name, opts.Namespace, err.Error())
|
||
return devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: fmt.Sprintf("delete devcontainer '%s' in namespace '%s'", opts.Name, opts.Namespace),
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// ListDevcontainers 根据条件列举 DevContainer
|
||
func ListDevcontainers(ctx context.Context, client dynamic_client.Interface, opts *ListDevcontainersOptions) (*k8s_api_v1.DevcontainerAppList, error) {
|
||
|
||
if ctx == nil || opts == nil || len(opts.Namespace) == 0 {
|
||
return nil, devcontainer_errors.ErrIllegalDevcontainerParameters{
|
||
FieldList: []string{"ctx", "namespace"},
|
||
Message: "cannot be empty",
|
||
}
|
||
}
|
||
|
||
list, err := client.Resource(groupVersionResource).Namespace(opts.Namespace).List(ctx, opts.ListOptions)
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: fmt.Sprintf("List Devcontainer in namespace '%s'", opts.Namespace),
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
// JSON 反序列化为 DevcontainerAppList
|
||
jsonData, err := list.MarshalJSON()
|
||
if err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "verify JSON data of Devcontainer List",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
devcontainerList := &k8s_api_v1.DevcontainerAppList{}
|
||
if err := json.Unmarshal(jsonData, devcontainerList); err != nil {
|
||
return nil, devcontainer_errors.ErrOperateDevcontainer{
|
||
Action: "deserialize Devcontainer List data",
|
||
Message: err.Error(),
|
||
}
|
||
}
|
||
return devcontainerList, nil
|
||
}
|