package utils

import (
	"bytes"
	"fmt"
	"text/template"

	applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1"
	apps_v1 "k8s.io/api/apps/v1"
	core_v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/client-go/kubernetes/scheme"
)

const (
	// TemplatePath is the directory prefix prepended to every template file
	// name. It is a relative path, so it is resolved against the working
	// directory of the running process.
	TemplatePath = "modules/k8s/controller/application/templates/"
)

// parseTemplate renders the Go text/template file "<templateName>.yaml" found
// under TemplatePath, using app as the template data, and returns the rendered
// bytes.
//
// The template is given the name of the file itself so that Execute runs the
// template that ParseFiles just loaded. The "default" helper (DefaultFunc) is
// registered before parsing so templates may reference it.
func parseTemplate(templateName string, app *applicationv1.Application) ([]byte, error) {
	fileName := templateName + ".yaml"

	tmpl, err := template.
		New(fileName).
		Funcs(template.FuncMap{"default": DefaultFunc}).
		ParseFiles(TemplatePath + fileName)
	if err != nil {
		return nil, fmt.Errorf("failed to parse template %s: %w", templateName, err)
	}

	var rendered bytes.Buffer
	if err := tmpl.Execute(&rendered, app); err != nil {
		return nil, fmt.Errorf("failed to execute template %s: %w", templateName, err)
	}

	return rendered.Bytes(), nil
}

// decodeManifest decodes a rendered YAML manifest into the given typed object
// using a universal decoder built from the client-go scheme.
func decodeManifest(manifest []byte, into runtime.Object) error {
	decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDecoder()
	return runtime.DecodeInto(decoder, manifest, into)
}

// NewDeployment builds a Deployment for app from the "deployment" template.
// The resulting object's name and namespace are always taken from app,
// overriding whatever the template rendered.
func NewDeployment(app *applicationv1.Application) (*apps_v1.Deployment, error) {
	manifest, err := parseTemplate("deployment", app)
	if err != nil {
		return nil, fmt.Errorf("failed to parse deployment template: %w", err)
	}

	deployment := &apps_v1.Deployment{}
	if err := decodeManifest(manifest, deployment); err != nil {
		return nil, fmt.Errorf("failed to decode deployment YAML: %w", err)
	}

	// Object metadata always follows the Application.
	deployment.Name = app.Name
	deployment.Namespace = app.Namespace

	return deployment, nil
}

// NewService builds a Service for app from the "service" template and then
// applies the overrides from app.Spec.Service.
//
// It returns (nil, nil) when shouldCreateService reports that no Service is
// wanted, so callers must check the returned pointer before using it. The
// Service is named "<app.Name>-svc" and placed in the namespace of app.
func NewService(app *applicationv1.Application) (*core_v1.Service, error) {
	// Nothing to build when the Application does not ask for a Service.
	if !shouldCreateService(app) {
		return nil, nil
	}

	manifest, err := parseTemplate("service", app)
	if err != nil {
		return nil, fmt.Errorf("failed to parse service template: %w", err)
	}

	service := &core_v1.Service{}
	if err := decodeManifest(manifest, service); err != nil {
		return nil, fmt.Errorf("failed to decode service YAML: %w", err)
	}

	// Object metadata always follows the Application.
	service.Name = app.Name + "-svc"
	service.Namespace = app.Namespace

	// Post-process: apply the explicit Service configuration, if any.
	updateServiceWithConfig(service, app)

	return service, nil
}

// NewStatefulSet builds a StatefulSet for app from the "statefulset" template.
// The resulting object's name and namespace are always taken from app,
// overriding whatever the template rendered.
func NewStatefulSet(app *applicationv1.Application) (*apps_v1.StatefulSet, error) {
	manifest, err := parseTemplate("statefulset", app)
	if err != nil {
		return nil, fmt.Errorf("failed to parse statefulset template: %w", err)
	}

	statefulSet := &apps_v1.StatefulSet{}
	if err := decodeManifest(manifest, statefulSet); err != nil {
		return nil, fmt.Errorf("failed to decode statefulset YAML: %w", err)
	}

	// Object metadata always follows the Application.
	statefulSet.Name = app.Name
	statefulSet.Namespace = app.Namespace

	return statefulSet, nil
}

// updateServiceWithConfig mutates service in place so that it reflects
// app.Spec.Service. It is a no-op when app.Spec.Service is nil.
//
// Overrides are applied in this order: service type, annotations, labels,
// type-specific fields, session affinity, and finally ports.
func updateServiceWithConfig(service *core_v1.Service, app *applicationv1.Application) {
	config := app.Spec.Service
	if config == nil {
		return
	}

	// Service type: only override when explicitly configured.
	if config.Type != "" {
		service.Spec.Type = core_v1.ServiceType(config.Type)
	}

	// Merge custom annotations over the ones produced by the template.
	if config.Annotations != nil {
		if service.Annotations == nil {
			service.Annotations = make(map[string]string)
		}
		for key, value := range config.Annotations {
			service.Annotations[key] = value
		}
	}

	// Merge custom labels over the ones produced by the template.
	if config.Labels != nil {
		if service.Labels == nil {
			service.Labels = make(map[string]string)
		}
		for key, value := range config.Labels {
			service.Labels[key] = value
		}
	}

	// Type-specific settings.
	switch config.Type {
	case "LoadBalancer":
		if config.LoadBalancerIP != "" {
			service.Spec.LoadBalancerIP = config.LoadBalancerIP
		}
		if len(config.LoadBalancerSourceRanges) > 0 {
			service.Spec.LoadBalancerSourceRanges = config.LoadBalancerSourceRanges
		}
	case "ExternalName":
		if config.ExternalName != "" {
			service.Spec.ExternalName = config.ExternalName
		}
		// An ExternalName service needs neither a selector nor ports, so the
		// template-provided ones are dropped. Note that explicitly configured
		// custom ports are still applied by the port handling below.
		service.Spec.Selector = nil
		service.Spec.Ports = nil
	}

	// Session affinity: only override when explicitly configured.
	if config.SessionAffinity != "" {
		service.Spec.SessionAffinity = core_v1.ServiceAffinity(config.SessionAffinity)
	}

	// Ports: custom ports replace the template ports entirely; otherwise a
	// NodePorts map only patches the node port of the existing ports.
	if len(config.Ports) > 0 {
		service.Spec.Ports = getServicePorts(app, config)
	} else if config.NodePorts != nil {
		updateServiceNodePorts(service, config)
	}
}

// updateServiceNodePorts sets the NodePort of every existing port of service
// whose name has an entry in config.NodePorts. Ports without a matching entry
// are left untouched.
func updateServiceNodePorts(service *core_v1.Service, config *applicationv1.ServiceConfig) {
	for i := range service.Spec.Ports {
		if nodePort, ok := config.NodePorts[service.Spec.Ports[i].Name]; ok {
			service.Spec.Ports[i].NodePort = nodePort
		}
	}
}

// getServicePorts builds the list of Service ports for app.
//
// When config.Ports is non-empty those custom ports are used; otherwise the
// ports declared in app.Spec.Template.Ports are used. Node ports are only
// assigned when the configured service type is "NodePort" or "LoadBalancer".
func getServicePorts(app *applicationv1.Application, config *applicationv1.ServiceConfig) []core_v1.ServicePort {
	var servicePorts []core_v1.ServicePort

	// Node ports are only meaningful for these two service types.
	allowsNodePort := config.Type == "NodePort" || config.Type == "LoadBalancer"

	if len(config.Ports) > 0 {
		// Custom ports take precedence over the template ports.
		for _, port := range config.Ports {
			servicePort := core_v1.ServicePort{
				Name:     port.Name,
				Port:     port.Port,
				Protocol: core_v1.Protocol(getPortProtocol(port.Protocol)),
			}

			// Target port: default to the service port when not configured.
			// intstr.Parse turns a numeric string such as "8080" into an
			// integer port and keeps anything else as a named port;
			// intstr.FromString would send "8080" as a port *name*, which
			// Kubernetes rejects because port names must contain a letter.
			if port.TargetPort != "" {
				servicePort.TargetPort = intstr.Parse(port.TargetPort)
			} else {
				servicePort.TargetPort = intstr.FromInt(int(port.Port))
			}

			if allowsNodePort && port.NodePort > 0 {
				servicePort.NodePort = port.NodePort
			}

			servicePorts = append(servicePorts, servicePort)
		}
		return servicePorts
	}

	// Fall back to the ports declared on the application template.
	for _, port := range app.Spec.Template.Ports {
		servicePort := core_v1.ServicePort{
			Name:       port.Name,
			Port:       port.Port,
			TargetPort: intstr.FromInt(int(port.Port)),
			Protocol:   core_v1.Protocol(getPortProtocol(port.Protocol)),
		}

		// Pick up a node port configured by port name, if there is one.
		if allowsNodePort && config.NodePorts != nil {
			if nodePort, ok := config.NodePorts[port.Name]; ok {
				servicePort.NodePort = nodePort
			}
		}

		servicePorts = append(servicePorts, servicePort)
	}

	return servicePorts
}

// shouldCreateService reports whether a Service should be created for app.
//
// The explicit app.Spec.Service configuration wins when present; otherwise the
// legacy Expose flag is honoured, which additionally requires at least one
// template port.
func shouldCreateService(app *applicationv1.Application) bool {
	if app.Spec.Service != nil {
		return app.Spec.Service.Enabled
	}

	// Backward compatibility with the older expose setting.
	return app.Spec.Expose && len(app.Spec.Template.Ports) > 0
}

// getPortProtocol returns protocol, or "TCP" when protocol is empty.
func getPortProtocol(protocol string) string {
	if protocol == "" {
		return "TCP"
	}
	return protocol
}

// DefaultFunc is the implementation of the "default" template function: it
// returns defaultValue when value is unset and value otherwise.
//
// Only an untyped nil and the empty string (of plain type string) count as
// unset. Typed nil pointers, zero numbers and empty values of named string
// types are returned unchanged.
//
// The value comes first and the fallback second, i.e.
// {{ default .Field "fallback" }}. In a template pipeline the piped value is
// passed as the last argument, so {{ .Field | default "fallback" }} would
// treat "fallback" as the value.
func DefaultFunc(value interface{}, defaultValue interface{}) interface{} {
	if value == nil || value == "" {
		return defaultValue
	}
	return value
}