Files
2025-08-25 15:46:12 +08:00

263 lines
7.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package utils
import (
"bytes"
"fmt"
"text/template"
applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1"
apps_v1 "k8s.io/api/apps/v1"
core_v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/client-go/kubernetes/scheme"
)
const (
TemplatePath = "modules/k8s/controller/application/templates/"
)
// parseTemplate 解析 Go Template 模板文件
func parseTemplate(templateName string, app *applicationv1.Application) ([]byte, error) {
tmpl, err := template.
New(templateName + ".yaml").
Funcs(template.FuncMap{"default": DefaultFunc}).
ParseFiles(TemplatePath + templateName + ".yaml")
if err != nil {
return nil, fmt.Errorf("failed to parse template %s: %w", templateName, err)
}
b := new(bytes.Buffer)
err = tmpl.Execute(b, app)
if err != nil {
return nil, fmt.Errorf("failed to execute template %s: %w", templateName, err)
}
return b.Bytes(), nil
}
// NewDeployment 使用模板创建 Deployment
func NewDeployment(app *applicationv1.Application) (*apps_v1.Deployment, error) {
yamlBytes, err := parseTemplate("deployment", app)
if err != nil {
return nil, fmt.Errorf("failed to parse deployment template: %w", err)
}
deployment := &apps_v1.Deployment{}
decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDecoder()
err = runtime.DecodeInto(decoder, yamlBytes, deployment)
if err != nil {
return nil, fmt.Errorf("failed to decode deployment YAML: %w", err)
}
// 设置 ObjectMeta
deployment.Name = app.Name
deployment.Namespace = app.Namespace
return deployment, nil
}
// NewService 使用模板创建 Service
func NewService(app *applicationv1.Application) (*core_v1.Service, error) {
// 检查是否需要创建 Service
if !shouldCreateService(app) {
return nil, nil
}
yamlBytes, err := parseTemplate("service", app)
if err != nil {
return nil, fmt.Errorf("failed to parse service template: %w", err)
}
service := &core_v1.Service{}
decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDecoder()
err = runtime.DecodeInto(decoder, yamlBytes, service)
if err != nil {
return nil, fmt.Errorf("failed to decode service YAML: %w", err)
}
// 设置 ObjectMeta
service.Name = app.Name + "-svc"
service.Namespace = app.Namespace
// 后处理:根据新的 Service 配置更新
updateServiceWithConfig(service, app)
return service, nil
}
// NewStatefulSet 使用模板创建 StatefulSet
func NewStatefulSet(app *applicationv1.Application) (*apps_v1.StatefulSet, error) {
yamlBytes, err := parseTemplate("statefulset", app)
if err != nil {
return nil, fmt.Errorf("failed to parse statefulset template: %w", err)
}
statefulSet := &apps_v1.StatefulSet{}
decoder := serializer.NewCodecFactory(scheme.Scheme).UniversalDecoder()
err = runtime.DecodeInto(decoder, yamlBytes, statefulSet)
if err != nil {
return nil, fmt.Errorf("failed to decode statefulset YAML: %w", err)
}
// 设置 ObjectMeta
statefulSet.Name = app.Name
statefulSet.Namespace = app.Namespace
return statefulSet, nil
}
// updateServiceWithConfig 根据新的 Service 配置更新 Service
func updateServiceWithConfig(service *core_v1.Service, app *applicationv1.Application) {
if app.Spec.Service == nil {
return
}
config := app.Spec.Service
// 更新服务类型
if config.Type != "" {
service.Spec.Type = core_v1.ServiceType(config.Type)
}
// 添加自定义注解
if config.Annotations != nil {
if service.Annotations == nil {
service.Annotations = make(map[string]string)
}
for k, v := range config.Annotations {
service.Annotations[k] = v
}
}
// 添加自定义标签
if config.Labels != nil {
if service.Labels == nil {
service.Labels = make(map[string]string)
}
for k, v := range config.Labels {
service.Labels[k] = v
}
}
// 设置特定配置
switch config.Type {
case "LoadBalancer":
if config.LoadBalancerIP != "" {
service.Spec.LoadBalancerIP = config.LoadBalancerIP
}
if len(config.LoadBalancerSourceRanges) > 0 {
service.Spec.LoadBalancerSourceRanges = config.LoadBalancerSourceRanges
}
case "ExternalName":
if config.ExternalName != "" {
service.Spec.ExternalName = config.ExternalName
}
// ExternalName 类型不需要 selector 和 ports
service.Spec.Selector = nil
service.Spec.Ports = nil
}
// 设置会话亲和性
if config.SessionAffinity != "" {
service.Spec.SessionAffinity = core_v1.ServiceAffinity(config.SessionAffinity)
}
// 更新端口配置(如果有自定义端口配置)
if len(config.Ports) > 0 {
service.Spec.Ports = getServicePorts(app, config)
} else if config.NodePorts != nil {
// 如果只配置了 NodePorts更新现有端口的 NodePort
updateServiceNodePorts(service, config)
}
}
// updateServiceNodePorts 更新服务的 NodePort 配置
func updateServiceNodePorts(service *core_v1.Service, config *applicationv1.ServiceConfig) {
for i, port := range service.Spec.Ports {
if nodePort, exists := config.NodePorts[port.Name]; exists {
service.Spec.Ports[i].NodePort = nodePort
}
}
}
// getServicePorts 获取 Service 端口配置
func getServicePorts(app *applicationv1.Application, config *applicationv1.ServiceConfig) []core_v1.ServicePort {
var servicePorts []core_v1.ServicePort
// 如果配置了自定义端口,使用自定义端口
if len(config.Ports) > 0 {
for _, port := range config.Ports {
servicePort := core_v1.ServicePort{
Name: port.Name,
Port: port.Port,
Protocol: core_v1.Protocol(getPortProtocol(port.Protocol)),
}
// 设置目标端口
if port.TargetPort != "" {
servicePort.TargetPort = intstr.FromString(port.TargetPort)
} else {
servicePort.TargetPort = intstr.FromInt(int(port.Port))
}
// 设置 NodePort仅适用于 NodePort 和 LoadBalancer 类型)
if (config.Type == "NodePort" || config.Type == "LoadBalancer") && port.NodePort > 0 {
servicePort.NodePort = port.NodePort
}
servicePorts = append(servicePorts, servicePort)
}
} else {
// 使用模板中的端口配置
for _, port := range app.Spec.Template.Ports {
servicePort := core_v1.ServicePort{
Name: port.Name,
Port: port.Port,
TargetPort: intstr.FromInt(int(port.Port)),
Protocol: core_v1.Protocol(getPortProtocol(port.Protocol)),
}
// 如果是 NodePort 类型,检查是否有指定的 NodePort
if (config.Type == "NodePort" || config.Type == "LoadBalancer") &&
config.NodePorts != nil {
if nodePort, exists := config.NodePorts[port.Name]; exists {
servicePort.NodePort = nodePort
}
}
servicePorts = append(servicePorts, servicePort)
}
}
return servicePorts
}
// shouldCreateService 判断是否需要创建 Service
func shouldCreateService(app *applicationv1.Application) bool {
// 优先使用新的 Service 配置
if app.Spec.Service != nil {
return app.Spec.Service.Enabled
}
// 向后兼容:使用旧的 expose 配置
return app.Spec.Expose && len(app.Spec.Template.Ports) > 0
}
// getPortProtocol 获取端口协议,设置默认值
func getPortProtocol(protocol string) string {
if protocol == "" {
return "TCP"
}
return protocol
}
// DefaultFunc 函数用于实现默认值
func DefaultFunc(value interface{}, defaultValue interface{}) interface{} {
if value == nil || value == "" {
return defaultValue
}
return value
}