Files
devstar/modules/k8s/controller/application/application_controller.go
2025-11-18 19:25:54 +08:00

2337 lines
77 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package application
import (
"bytes"
"context"
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1"
application_controller_utils "code.gitea.io/gitea/modules/k8s/controller/application/utils"
apps_v1 "k8s.io/api/apps/v1"
core_v1 "k8s.io/api/core/v1"
"google.golang.org/protobuf/types/known/durationpb"
istioapinetworkingv1 "istio.io/api/networking/v1"
istionetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
)
// ApplicationReconciler reconciles a Application object
type ApplicationReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=secrets,verbs=get;list;watch;create;update;patch
// +kubebuilder:rbac:groups="",resources=namespaces,verbs=get;list;watch;create
// Reconcile is part of the main kubernetes reconciliation loop
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 获取Application实例
app := &applicationv1.Application{}
err := r.Get(ctx, req.NamespacedName, app)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("Application resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get Application")
return ctrl.Result{}, err
}
logger.Info("Processing Application", "name", app.Name, "namespace", app.Namespace, "type", app.Spec.Template.Type)
// 确保命名空间存在
if err := r.ensureNamespace(ctx, app.Namespace); err != nil {
logger.Error(err, "Failed to ensure namespace exists", "namespace", app.Namespace)
return ctrl.Result{}, err
}
// 添加 finalizer 处理逻辑
finalizerName := "application.devstar.cn/finalizer"
// 检查对象是否正在被删除
if !app.ObjectMeta.DeletionTimestamp.IsZero() {
// 对象正在被删除 - 处理 finalizer
if k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
// 执行清理操作
logger.Info("Cleaning up resources before deletion", "name", app.Name)
// 清理 Gateway 资源(包括 Secret
if err := r.cleanupGateway(ctx, app); err != nil {
logger.Error(err, "Failed to cleanup gateway resources")
return ctrl.Result{}, err
}
// 清理 Mesh 资源
if err := r.cleanupMesh(ctx, app); err != nil {
logger.Error(err, "Failed to cleanup mesh resources")
return ctrl.Result{}, err
}
// 清理完成后移除 finalizer
k8s_sigs_controller_runtime_utils.RemoveFinalizer(app, finalizerName)
if err := r.Update(ctx, app); err != nil {
logger.Error(err, "Failed to remove finalizer")
return ctrl.Result{}, err
}
}
// 已标记为删除且处理完成,允许继续删除流程
return ctrl.Result{}, nil
}
// 如果对象不包含 finalizer就添加它
if !k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
logger.Info("Adding finalizer", "name", app.Name)
k8s_sigs_controller_runtime_utils.AddFinalizer(app, finalizerName)
if err := r.Update(ctx, app); err != nil {
logger.Error(err, "Failed to add finalizer")
return ctrl.Result{}, err
}
}
// 根据应用类型协调相应的资源
if app.Spec.Template.Type == "stateful" {
// 协调 StatefulSet
if err := r.reconcileStatefulSet(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile StatefulSet")
return ctrl.Result{}, err
}
} else {
// 协调 Deployment默认为无状态应用
if err := r.reconcileDeployment(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Deployment")
return ctrl.Result{}, err
}
}
// 协调 Service
if err := r.reconcileService(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
// 更新状态
if err := r.updateStatus(ctx, app); err != nil {
logger.Error(err, "Failed to update status")
return ctrl.Result{}, err
}
// 协调网络策略
if err := r.reconcileNetworkPolicy(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile network policy")
return ctrl.Result{}, err
}
// 如果配置了旧版TrafficPolicy为向后兼容处理旧的流量策略
if app.Spec.TrafficPolicy != nil {
if err := r.reconcileIstioTraffic(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Istio traffic policy")
return ctrl.Result{}, err
}
}
logger.Info("Successfully reconciled Application", "name", app.Name)
return ctrl.Result{RequeueAfter: time.Minute * 1}, nil
}
// 服务网格流量治理逻辑
func (r *ApplicationReconciler) reconcileIstioTraffic(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
if app.Spec.TrafficPolicy == nil {
logger.Info("No legacy TrafficPolicy configured, skipping")
return nil
}
// 如果已经有NetworkPolicy配置使用它而不是旧版TrafficPolicy
if app.Spec.NetworkPolicy != nil &&
((app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled) ||
(app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled)) {
logger.Info("NetworkPolicy already configured, skipping legacy TrafficPolicy")
return nil
}
// 处理金丝雀发布
if app.Spec.TrafficPolicy.Canary != nil && app.Spec.TrafficPolicy.Canary.Enabled {
logger.Info("Processing legacy Canary traffic configuration")
// 这里实现金丝雀发布的逻辑
// 为简化实现可以创建一个VirtualService来处理金丝雀流量
vsName := app.Name + "-canary-vs"
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping legacy VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 构建金丝雀服务名
canaryService := app.Name + "-" + app.Spec.TrafficPolicy.Canary.CanaryVersion + "-svc"
// 创建或更新VirtualService
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
}
// 配置金丝雀VirtualService
mainWeight := int32(app.Spec.TrafficPolicy.Canary.MainWeight)
canaryWeight := int32(100 - app.Spec.TrafficPolicy.Canary.MainWeight)
vs.Spec.Hosts = []string{service.Name}
vs.Spec.Gateways = []string{"mesh"} // 默认只在网格内生效
vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{
{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
Weight: mainWeight,
},
{
Destination: &istioapinetworkingv1.Destination{
Host: canaryService,
},
Weight: canaryWeight,
},
},
},
}
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update legacy Canary VirtualService: %w", err)
}
logger.Info("Legacy Canary VirtualService reconciled", "name", vsName, "operation", op)
}
// 如果配置了熔断器确保创建DestinationRule
if app.Spec.TrafficPolicy.CircuitBreaker != nil {
logger.Info("Processing legacy CircuitBreaker configuration")
// reconcileDestinationRule函数已经支持从TrafficPolicy获取熔断器配置
if err := r.reconcileDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile legacy DestinationRule: %w", err)
}
}
return nil
}
// 修复 reconcileDeployment 函数中的调用
func (r *ApplicationReconciler) reconcileDeployment(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查 Deployment 是否存在
deployment := &apps_v1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
if errors.IsNotFound(err) {
// 创建新的 Deployment
logger.Info("Creating new Deployment", "name", app.Name)
newDeployment, err := application_controller_utils.NewDeployment(app)
if err != nil {
return fmt.Errorf("failed to generate deployment: %w", err)
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDeployment, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newDeployment); err != nil {
return fmt.Errorf("failed to create deployment: %w", err)
}
logger.Info("Successfully created Deployment", "name", app.Name)
return nil
} else if err != nil {
return fmt.Errorf("failed to get deployment: %w", err)
}
// 获取期望的副本数
desiredReplicas := int32(1) // 默认值
if app.Spec.Replicas != nil {
desiredReplicas = *app.Spec.Replicas
}
// 获取当前的副本数
currentReplicas := int32(1) // 默认值
if deployment.Spec.Replicas != nil {
currentReplicas = *deployment.Spec.Replicas
}
logger.Info("Deployment replica status",
"name", app.Name,
"current-spec-replicas", currentReplicas,
"desired-replicas", desiredReplicas,
"actual-replicas", deployment.Status.Replicas,
"ready-replicas", deployment.Status.ReadyReplicas)
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 1. 检查副本数是否变更
if currentReplicas != desiredReplicas {
logger.Info("Replica count changed", "current", currentReplicas, "desired", desiredReplicas)
needsUpdate = true
updateFields["replicas"] = desiredReplicas
deployment.Spec.Replicas = &desiredReplicas
}
// 2. 生成期望的 Deployment 来比较其他字段
updatedDeployment, err := application_controller_utils.NewDeployment(app)
if err != nil {
return fmt.Errorf("failed to generate updated deployment: %w", err)
}
// 3. 检查镜像是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
currentImage := deployment.Spec.Template.Spec.Containers[0].Image
desiredImage := updatedDeployment.Spec.Template.Spec.Containers[0].Image
if currentImage != desiredImage {
logger.Info("Image changed", "current", currentImage, "desired", desiredImage)
needsUpdate = true
updateFields["image"] = desiredImage
deployment.Spec.Template.Spec.Containers[0].Image = desiredImage
}
}
// 4. 检查环境变量是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalEnvVars(deployment.Spec.Template.Spec.Containers[0].Env,
updatedDeployment.Spec.Template.Spec.Containers[0].Env) {
logger.Info("Environment variables changed")
needsUpdate = true
updateFields["env"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Env = updatedDeployment.Spec.Template.Spec.Containers[0].Env
}
}
// 5. 检查资源配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalResources(deployment.Spec.Template.Spec.Containers[0].Resources,
updatedDeployment.Spec.Template.Spec.Containers[0].Resources) {
logger.Info("Resource requirements changed")
needsUpdate = true
updateFields["resources"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Resources = updatedDeployment.Spec.Template.Spec.Containers[0].Resources
}
}
// 6. 检查端口配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalPorts(deployment.Spec.Template.Spec.Containers[0].Ports,
updatedDeployment.Spec.Template.Spec.Containers[0].Ports) {
logger.Info("Container ports changed")
needsUpdate = true
updateFields["ports"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Ports = updatedDeployment.Spec.Template.Spec.Containers[0].Ports
}
}
// 7. 检查健康检查配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalProbes(deployment.Spec.Template.Spec.Containers[0].LivenessProbe,
updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe) ||
!equalProbes(deployment.Spec.Template.Spec.Containers[0].ReadinessProbe,
updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe) {
logger.Info("Health check probes changed")
needsUpdate = true
updateFields["probes"] = "changed"
deployment.Spec.Template.Spec.Containers[0].LivenessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe
deployment.Spec.Template.Spec.Containers[0].ReadinessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe
}
}
// 执行更新
if needsUpdate {
logger.Info("Updating deployment", "name", app.Name, "fields", updateFields)
if err := r.Update(ctx, deployment); err != nil {
return fmt.Errorf("failed to update deployment: %w", err)
}
logger.Info("Deployment updated successfully", "name", app.Name, "fields", updateFields)
} else {
logger.Info("No deployment updates needed", "name", app.Name)
}
return nil
}
// 添加辅助函数:比较端口配置
func equalPorts(current, desired []core_v1.ContainerPort) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]core_v1.ContainerPort)
for _, port := range current {
currentMap[port.Name] = port
}
for _, port := range desired {
if currentPort, exists := currentMap[port.Name]; !exists ||
currentPort.ContainerPort != port.ContainerPort ||
currentPort.Protocol != port.Protocol {
return false
}
}
return true
}
// 添加辅助函数:比较探针配置
func equalProbes(current, desired *core_v1.Probe) bool {
if current == nil && desired == nil {
return true
}
if current == nil || desired == nil {
return false
}
// 简单比较,可以根据需要扩展
if current.InitialDelaySeconds != desired.InitialDelaySeconds ||
current.PeriodSeconds != desired.PeriodSeconds ||
current.TimeoutSeconds != desired.TimeoutSeconds ||
current.SuccessThreshold != desired.SuccessThreshold ||
current.FailureThreshold != desired.FailureThreshold {
return false
}
// 比较 HTTPGet 配置
if current.HTTPGet != nil && desired.HTTPGet != nil {
return current.HTTPGet.Path == desired.HTTPGet.Path &&
current.HTTPGet.Port == desired.HTTPGet.Port
}
return current.HTTPGet == nil && desired.HTTPGet == nil
}
// 添加辅助函数:比较环境变量
func equalEnvVars(current, desired []core_v1.EnvVar) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]string)
for _, env := range current {
currentMap[env.Name] = env.Value
}
desiredMap := make(map[string]string)
for _, env := range desired {
desiredMap[env.Name] = env.Value
}
// 检查每个期望的环境变量是否匹配
for key, value := range desiredMap {
if currentMap[key] != value {
return false
}
}
// 检查是否有多余的环境变量
for key := range currentMap {
if _, exists := desiredMap[key]; !exists {
return false
}
}
return true
}
// 添加辅助函数:比较资源配置
func equalResources(current, desired core_v1.ResourceRequirements) bool {
// 比较 Limits
if current.Limits == nil && desired.Limits != nil {
return false
}
if current.Limits != nil && desired.Limits == nil {
return false
}
if current.Limits != nil && desired.Limits != nil {
// 比较 CPU
currentCPU := current.Limits.Cpu()
desiredCPU := desired.Limits.Cpu()
if currentCPU != nil && desiredCPU != nil {
if !currentCPU.Equal(*desiredCPU) {
return false
}
} else if currentCPU != desiredCPU { // 一个为 nil另一个不为 nil
return false
}
// 比较 Memory
currentMemory := current.Limits.Memory()
desiredMemory := desired.Limits.Memory()
if currentMemory != nil && desiredMemory != nil {
if !currentMemory.Equal(*desiredMemory) {
return false
}
} else if currentMemory != desiredMemory { // 一个为 nil另一个不为 nil
return false
}
}
// 比较 Requests
if current.Requests == nil && desired.Requests != nil {
return false
}
if current.Requests != nil && desired.Requests == nil {
return false
}
if current.Requests != nil && desired.Requests != nil {
// 比较 CPU
currentCPU := current.Requests.Cpu()
desiredCPU := desired.Requests.Cpu()
if currentCPU != nil && desiredCPU != nil {
if !currentCPU.Equal(*desiredCPU) {
return false
}
} else if currentCPU != desiredCPU { // 一个为 nil另一个不为 nil
return false
}
// 比较 Memory
currentMemory := current.Requests.Memory()
desiredMemory := desired.Requests.Memory()
if currentMemory != nil && desiredMemory != nil {
if !currentMemory.Equal(*desiredMemory) {
return false
}
} else if currentMemory != desiredMemory { // 一个为 nil另一个不为 nil
return false
}
}
return true
}
func (r *ApplicationReconciler) reconcileStatefulSet(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查 StatefulSet 是否存在
statefulSet := &apps_v1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
if errors.IsNotFound(err) {
// 创建新的 StatefulSet
logger.Info("Creating new StatefulSet", "name", app.Name)
newStatefulSet, err := application_controller_utils.NewStatefulSet(app)
if err != nil {
return fmt.Errorf("failed to generate statefulset: %w", err)
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newStatefulSet, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newStatefulSet); err != nil {
return fmt.Errorf("failed to create statefulset: %w", err)
}
logger.Info("Successfully created StatefulSet", "name", app.Name)
return nil
} else if err != nil {
return fmt.Errorf("failed to get statefulset: %w", err)
}
// 更新现有 StatefulSet
logger.Info("Checking StatefulSet for updates", "name", app.Name)
updatedStatefulSet, err := application_controller_utils.NewStatefulSet(app)
if err != nil {
return fmt.Errorf("failed to generate updated statefulset: %w", err)
}
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 检查镜像是否变更
if len(statefulSet.Spec.Template.Spec.Containers) > 0 &&
len(updatedStatefulSet.Spec.Template.Spec.Containers) > 0 {
currentImage := statefulSet.Spec.Template.Spec.Containers[0].Image
desiredImage := updatedStatefulSet.Spec.Template.Spec.Containers[0].Image
if currentImage != desiredImage {
logger.Info("StatefulSet image changed", "current", currentImage, "desired", desiredImage)
needsUpdate = true
updateFields["image"] = desiredImage
}
}
// 检查副本数是否变更
currentReplicas := int32(1)
if statefulSet.Spec.Replicas != nil {
currentReplicas = *statefulSet.Spec.Replicas
}
desiredReplicas := int32(1)
if updatedStatefulSet.Spec.Replicas != nil {
desiredReplicas = *updatedStatefulSet.Spec.Replicas
}
if currentReplicas != desiredReplicas {
logger.Info("StatefulSet replica count changed", "current", currentReplicas, "desired", desiredReplicas)
needsUpdate = true
updateFields["replicas"] = desiredReplicas
}
if needsUpdate {
logger.Info("Updating StatefulSet", "name", app.Name, "fields", updateFields)
statefulSet.Spec = updatedStatefulSet.Spec
if err := r.Update(ctx, statefulSet); err != nil {
return fmt.Errorf("failed to update statefulset: %w", err)
}
logger.Info("StatefulSet updated successfully", "name", app.Name)
} else {
logger.Info("No StatefulSet updates needed", "name", app.Name)
}
return nil
}
// 添加辅助函数:比较字符串映射
func equalStringMaps(current, desired map[string]string) bool {
if len(current) != len(desired) {
return false
}
for k, v := range desired {
if current[k] != v {
return false
}
}
return true
}
// equalIngressTLS函数已废弃
func (r *ApplicationReconciler) updateStatus(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
var deployment *apps_v1.Deployment
var statefulSet *apps_v1.StatefulSet
var err error
// 根据应用类型获取对应的资源状态
if app.Spec.Template.Type == "stateful" {
statefulSet = &apps_v1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
} else {
deployment = &apps_v1.Deployment{}
err = r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
}
if err != nil {
app.Status.Phase = "Failed"
app.Status.Message = fmt.Sprintf("Workload not found: %v", err)
logger.Error(err, "Failed to get workload for status update")
} else {
// 根据工作负载类型更新状态
if statefulSet != nil {
app.Status.Replicas = statefulSet.Status.Replicas
app.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas
} else if deployment != nil {
app.Status.Replicas = deployment.Status.Replicas
app.Status.ReadyReplicas = deployment.Status.ReadyReplicas
}
// 状态判断逻辑
if app.Status.ReadyReplicas == 0 {
app.Status.Phase = "Pending"
app.Status.Message = "Application is starting"
} else if app.Status.ReadyReplicas < app.Status.Replicas {
app.Status.Phase = "Scaling"
app.Status.Message = fmt.Sprintf("Ready: %d/%d",
app.Status.ReadyReplicas, app.Status.Replicas)
} else {
app.Status.Phase = "Running"
app.Status.Message = "Application is healthy"
}
}
app.Status.LastUpdated = metav1.Now()
if err := r.Status().Update(ctx, app); err != nil {
return fmt.Errorf("failed to update status: %w", err)
}
logger.Info("Updated Application status", "phase", app.Status.Phase, "message", app.Status.Message)
return nil
}
// 协调网络策略
func (r *ApplicationReconciler) reconcileNetworkPolicy(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 如果没有配置网络策略,跳过处理
if app.Spec.NetworkPolicy == nil {
logger.Info("No network policy configured, skipping", "name", app.Name)
return nil
}
// 协调Gateway(南北向流量)
if app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled {
if err := r.reconcileGateway(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile gateway: %w", err)
}
} else {
// 清理不再需要的Gateway资源
if err := r.cleanupGateway(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup gateway: %w", err)
}
}
// 协调Mesh(东西向流量)
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled {
if err := r.reconcileMesh(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile mesh: %w", err)
}
} else {
// 清理不再需要的Mesh资源
if err := r.cleanupMesh(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup mesh: %w", err)
}
}
return nil
}
// cleanupGateway 清理不再需要的Gateway资源
func (r *ApplicationReconciler) cleanupGateway(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
gatewayName := app.Name + "-gateway"
vsName := app.Name + "-gateway-vs"
targetNamespaces := map[string]struct{}{app.Namespace: {}}
if ns, err := r.determineGatewayNamespace(ctx, app); err == nil {
targetNamespaces[ns] = struct{}{}
} else {
logger.Error(err, "Failed to determine gateway namespace during cleanup, fallback to application namespace")
}
for ns := range targetNamespaces {
gateway := &istionetworkingv1.Gateway{}
err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: ns}, gateway)
if err == nil {
logger.Info("Cleaning up Gateway that is no longer needed", "name", gatewayName, "namespace", ns)
if err := r.Delete(ctx, gateway); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Gateway %s/%s: %w", ns, gatewayName, err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Gateway %s/%s: %w", ns, gatewayName, err)
}
}
// 清理VirtualService
vs := &istionetworkingv1.VirtualService{}
err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs)
if err == nil {
logger.Info("Cleaning up Gateway VirtualService that is no longer needed", "name", vsName)
if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Gateway VirtualService: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Gateway VirtualService: %w", err)
}
// 清理 TLS Secret如果是由控制器创建的
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil {
for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
if strings.EqualFold(tls.Mode, "PASSTHROUGH") {
continue
}
if tls.SecretName == "" {
continue
}
// 确定 Secret 的命名空间
secretNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace)
if err != nil {
logger.Error(err, "Failed to determine secret namespace for cleanup, skipping", "secretName", tls.SecretName)
continue
}
// 检查 Secret 是否存在,并且有我们的标签(说明是我们创建的)
secret := &core_v1.Secret{}
err = r.Get(ctx, types.NamespacedName{Name: tls.SecretName, Namespace: secretNS}, secret)
if err == nil {
// 检查标签,确认是我们创建的
if secret.Labels != nil &&
secret.Labels["app.k8s.devstar/name"] == app.Name &&
secret.Labels["app.k8s.devstar/type"] == "gateway-tls" {
logger.Info("Cleaning up Gateway TLS Secret", "name", tls.SecretName, "namespace", secretNS)
if err := r.Delete(ctx, secret); err != nil && !errors.IsNotFound(err) {
logger.Error(err, "Failed to delete Gateway TLS Secret", "name", tls.SecretName, "namespace", secretNS)
// 不返回错误,继续清理其他资源
}
}
} else if !errors.IsNotFound(err) {
logger.Error(err, "Failed to get Gateway TLS Secret for cleanup", "name", tls.SecretName, "namespace", secretNS)
}
}
}
return nil
}
// cleanupMesh 清理不再需要的Mesh资源
func (r *ApplicationReconciler) cleanupMesh(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-mesh-vs"
drName := app.Name + "-dr"
// 清理VirtualService
vs := &istionetworkingv1.VirtualService{}
err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs)
if err == nil {
logger.Info("Cleaning up Mesh VirtualService that is no longer needed", "name", vsName)
if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Mesh VirtualService: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Mesh VirtualService: %w", err)
}
// 清理DestinationRule
dr := &istionetworkingv1.DestinationRule{}
err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr)
if err == nil {
logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName)
if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete DestinationRule: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get DestinationRule: %w", err)
}
return nil
}
// ensureNamespace 确保命名空间存在,如果不存在则创建
func (r *ApplicationReconciler) ensureNamespace(ctx context.Context, namespace string) error {
logger := log.FromContext(ctx)
// 跳过默认命名空间(这些命名空间通常由系统管理)
if namespace == "default" || namespace == "kube-system" || namespace == "kube-public" {
return nil
}
// 检查命名空间是否存在
ns := &core_v1.Namespace{}
err := r.Get(ctx, types.NamespacedName{Name: namespace}, ns)
if err == nil {
// 命名空间已存在
return nil
}
if !errors.IsNotFound(err) {
// 获取命名空间时发生其他错误
return fmt.Errorf("failed to get namespace %s: %w", namespace, err)
}
// 命名空间不存在,创建它
logger.Info("Creating namespace", "namespace", namespace)
newNS := &core_v1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: namespace,
},
}
if err := r.Create(ctx, newNS); err != nil {
return fmt.Errorf("failed to create namespace %s: %w", namespace, err)
}
logger.Info("Successfully created namespace", "namespace", namespace)
return nil
}
// 为了在SetupWithManager中注册Istio资源监控
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&applicationv1.Application{}).
Owns(&apps_v1.Deployment{}).
Owns(&apps_v1.StatefulSet{}).
Owns(&core_v1.Service{}).
// 添加对Istio资源的监控
Owns(&istionetworkingv1.VirtualService{}).
Owns(&istionetworkingv1.Gateway{}).
Owns(&istionetworkingv1.DestinationRule{}).
Complete(r)
}
func (r *ApplicationReconciler) reconcileService(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
serviceName := app.Name + "-svc"
// 检查是否需要创建 Service
shouldCreate := false
if app.Spec.Service != nil {
shouldCreate = app.Spec.Service.Enabled
} else {
// 向后兼容:使用旧的 expose 配置
shouldCreate = app.Spec.Expose && len(app.Spec.Template.Ports) > 0
}
// 获取现有 Service
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{
Name: serviceName,
Namespace: app.Namespace,
}, service)
serviceExists := !errors.IsNotFound(err)
if !shouldCreate {
// 如果不需要 Service 但存在,则删除
if serviceExists {
logger.Info("Deleting existing Service as it's disabled", "name", serviceName)
if err := r.Delete(ctx, service); err != nil {
return fmt.Errorf("failed to delete service: %w", err)
}
}
return nil
}
// 需要创建 Service
if !serviceExists {
// Service 不存在,创建新的
logger.Info("Creating new Service", "name", serviceName)
newService, err := application_controller_utils.NewService(app)
if err != nil {
return fmt.Errorf("failed to generate service: %w", err)
}
if newService == nil {
logger.Info("Service creation skipped", "name", serviceName)
return nil
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newService, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newService); err != nil {
return fmt.Errorf("failed to create service: %w", err)
}
logger.Info("Successfully created Service",
"name", serviceName,
"type", newService.Spec.Type,
"ports", len(newService.Spec.Ports))
return nil
}
// Service 存在,检查是否需要更新
if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
logger.Info("Checking Service for updates", "name", serviceName)
updatedService, err := application_controller_utils.NewService(app)
if err != nil {
return fmt.Errorf("failed to generate updated service: %w", err)
}
if updatedService == nil {
logger.Info("Service should be deleted", "name", serviceName)
if err := r.Delete(ctx, service); err != nil {
return fmt.Errorf("failed to delete service: %w", err)
}
return nil
}
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 检查服务类型
if service.Spec.Type != updatedService.Spec.Type {
logger.Info("Service type changed",
"current", service.Spec.Type,
"desired", updatedService.Spec.Type)
needsUpdate = true
updateFields["type"] = updatedService.Spec.Type
service.Spec.Type = updatedService.Spec.Type
}
// 检查端口配置
if !equalServicePorts(service.Spec.Ports, updatedService.Spec.Ports) {
logger.Info("Service ports changed")
needsUpdate = true
updateFields["ports"] = "changed"
service.Spec.Ports = updatedService.Spec.Ports
}
// 检查 LoadBalancer 配置
if service.Spec.LoadBalancerIP != updatedService.Spec.LoadBalancerIP {
needsUpdate = true
updateFields["loadBalancerIP"] = updatedService.Spec.LoadBalancerIP
service.Spec.LoadBalancerIP = updatedService.Spec.LoadBalancerIP
}
if !equalStringSlices(service.Spec.LoadBalancerSourceRanges, updatedService.Spec.LoadBalancerSourceRanges) {
needsUpdate = true
updateFields["loadBalancerSourceRanges"] = "changed"
service.Spec.LoadBalancerSourceRanges = updatedService.Spec.LoadBalancerSourceRanges
}
// 检查 ExternalName
if service.Spec.ExternalName != updatedService.Spec.ExternalName {
needsUpdate = true
updateFields["externalName"] = updatedService.Spec.ExternalName
service.Spec.ExternalName = updatedService.Spec.ExternalName
}
// 检查会话亲和性
if service.Spec.SessionAffinity != updatedService.Spec.SessionAffinity {
needsUpdate = true
updateFields["sessionAffinity"] = updatedService.Spec.SessionAffinity
service.Spec.SessionAffinity = updatedService.Spec.SessionAffinity
}
// 更新标签
if !equalStringMaps(service.Labels, updatedService.Labels) {
needsUpdate = true
updateFields["labels"] = "changed"
if service.Labels == nil {
service.Labels = make(map[string]string)
}
for k, v := range updatedService.Labels {
service.Labels[k] = v
}
}
// 更新注解
if !equalStringMaps(service.Annotations, updatedService.Annotations) {
needsUpdate = true
updateFields["annotations"] = "changed"
if service.Annotations == nil {
service.Annotations = make(map[string]string)
}
for k, v := range updatedService.Annotations {
service.Annotations[k] = v
}
}
if needsUpdate {
logger.Info("Updating Service", "name", serviceName, "fields", updateFields)
if err := r.Update(ctx, service); err != nil {
return fmt.Errorf("failed to update service: %w", err)
}
logger.Info("Service updated successfully", "name", serviceName)
} else {
logger.Info("No service updates needed", "name", serviceName)
}
return nil
}
// 添加辅助函数:比较服务端口
func equalServicePorts(current, desired []core_v1.ServicePort) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]core_v1.ServicePort)
for _, port := range current {
currentMap[port.Name] = port
}
for _, port := range desired {
if currentPort, exists := currentMap[port.Name]; !exists ||
currentPort.Port != port.Port ||
currentPort.TargetPort != port.TargetPort ||
currentPort.Protocol != port.Protocol ||
currentPort.NodePort != port.NodePort {
return false
}
}
return true
}
// 添加辅助函数:比较字符串切片
func equalStringSlices(current, desired []string) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]bool)
for _, item := range current {
currentMap[item] = true
}
for _, item := range desired {
if !currentMap[item] {
return false
}
}
return true
}
// reconcileGateway 处理Gateway资源
func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
gatewayName := app.Name + "-gateway"
// 在创建/更新 Gateway 前,确保 TLS Secret
if err := r.reconcileGatewayTLSSecret(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile gateway TLS secret: %w", err)
}
gatewayNamespace, err := r.determineGatewayNamespace(ctx, app)
if err != nil {
return fmt.Errorf("failed to determine gateway namespace: %w", err)
}
// 删除遗留在应用命名空间的 Gateway兼容旧版本
if gatewayNamespace != app.Namespace {
legacyGateway := &istionetworkingv1.Gateway{}
legacyKey := types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}
if legacyErr := r.Get(ctx, legacyKey, legacyGateway); legacyErr == nil {
logger.Info("Deleting legacy Gateway from application namespace", "name", gatewayName)
if err := r.Delete(ctx, legacyGateway); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete legacy gateway: %w", err)
}
}
}
// Get + Create/Update 以规避 CreateOrUpdate 对 protobuf 类型的反射补丁
existingGateway := &istionetworkingv1.Gateway{}
gatewayKey := types.NamespacedName{Name: gatewayName, Namespace: gatewayNamespace}
err = r.Get(ctx, gatewayKey, existingGateway)
if errors.IsNotFound(err) {
logger.Info("Creating new Gateway", "name", gatewayName)
newGateway := &istionetworkingv1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: gatewayName,
Namespace: gatewayNamespace,
},
}
ensureGatewayLabels(newGateway, app)
if err := r.configureGateway(ctx, newGateway, app); err != nil {
return fmt.Errorf("failed to configure Gateway: %w", err)
}
if gatewayNamespace == app.Namespace {
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newGateway, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
}
if err := r.Create(ctx, newGateway); err != nil {
return fmt.Errorf("failed to create Gateway: %w", err)
}
logger.Info("Gateway created", "name", gatewayName, "namespace", gatewayNamespace)
} else if err != nil {
return fmt.Errorf("failed to get Gateway: %w", err)
} else {
ensureGatewayLabels(existingGateway, app)
if err := r.configureGateway(ctx, existingGateway, app); err != nil {
return fmt.Errorf("failed to configure Gateway: %w", err)
}
if err := r.Update(ctx, existingGateway); err != nil {
return fmt.Errorf("failed to update Gateway: %w", err)
}
logger.Info("Gateway updated", "name", gatewayName, "namespace", gatewayNamespace)
}
// 协调与Gateway关联的VirtualService
if err := r.reconcileGatewayVirtualService(ctx, app, gatewayNamespace); err != nil {
return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err)
}
return nil
}
// configureGateway 配置Gateway资源
func (r *ApplicationReconciler) configureGateway(ctx context.Context, gateway *istionetworkingv1.Gateway, app *applicationv1.Application) error {
// 设置Gateway选择器
gateway.Spec.Selector = map[string]string{
"istio": "ingressgateway",
}
// 清空服务器列表,准备重新添加
gateway.Spec.Servers = []*istioapinetworkingv1.Server{}
// 如果未配置端口则添加默认HTTP端口
if len(app.Spec.NetworkPolicy.Gateway.Ports) == 0 {
gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: 80,
Protocol: "HTTP",
Name: "http",
},
Hosts: getHosts(app),
})
} else {
// 添加所有配置的端口
for _, port := range app.Spec.NetworkPolicy.Gateway.Ports {
server := &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: uint32(port.Number),
Protocol: port.Protocol,
Name: port.Name,
},
Hosts: getHosts(app),
}
gateway.Spec.Servers = append(gateway.Spec.Servers, server)
}
}
// 配置TLS
if len(app.Spec.NetworkPolicy.Gateway.TLS) > 0 {
for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
// 查找相应协议的服务器
var serverIndex = -1
for j, server := range gateway.Spec.Servers {
if server.Port.Protocol == "HTTPS" || server.Port.Protocol == "TLS" {
serverIndex = j
break
}
}
// 如果没有找到HTTPS/TLS服务器创建一个
if serverIndex == -1 {
gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: 443,
Protocol: "HTTPS",
Name: "https",
},
Hosts: getHosts(app),
})
serverIndex = len(gateway.Spec.Servers) - 1
}
// 配置TLS设置
server := gateway.Spec.Servers[serverIndex]
// 确定 Secret 的命名空间
secretNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace)
if err != nil {
return fmt.Errorf("failed to determine secret namespace for TLS: %w", err)
}
// 如果 Secret 不在 Gateway 所在命名空间,使用 namespace/secretName 格式
credentialName := tls.SecretName
if secretNS != gateway.Namespace {
credentialName = secretNS + "/" + tls.SecretName
}
server.Tls = &istioapinetworkingv1.ServerTLSSettings{
Mode: getIstioTLSMode(tls.Mode),
CredentialName: credentialName,
}
// 设置最小TLS版本
if tls.MinProtocolVersion != "" {
server.Tls.MinProtocolVersion = getIstioTLSVersion(tls.MinProtocolVersion)
}
// 如果指定了特定主机,覆盖默认主机
if len(tls.Hosts) > 0 {
server.Hosts = tls.Hosts
}
}
}
return nil
}
// 辅助函数
func getHosts(app *applicationv1.Application) []string {
// 如果NetworkPolicy.Gateway.Hosts存在使用它
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil && len(app.Spec.NetworkPolicy.Gateway.Hosts) > 0 {
return app.Spec.NetworkPolicy.Gateway.Hosts
}
// 不再检查旧的Ingress配置
// 使用通配符
return []string{"*"}
}
func getIstioTLSMode(mode string) istioapinetworkingv1.ServerTLSSettings_TLSmode {
switch mode {
case "MUTUAL":
return istioapinetworkingv1.ServerTLSSettings_MUTUAL
case "PASSTHROUGH":
return istioapinetworkingv1.ServerTLSSettings_PASSTHROUGH
default:
return istioapinetworkingv1.ServerTLSSettings_SIMPLE
}
}
func getIstioTLSVersion(version string) istioapinetworkingv1.ServerTLSSettings_TLSProtocol {
switch version {
case "TLSv1_0":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_0
case "TLSv1_1":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_1
case "TLSv1_3":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_3
default:
return istioapinetworkingv1.ServerTLSSettings_TLSV1_2
}
}
// detectIngressNamespace 自动探测 IngressGateway 所在命名空间
// 优先级CRD 指定 -> 根据 selector {istio=ingressgateway} 查找 Service
func (r *ApplicationReconciler) detectIngressNamespace(ctx context.Context, explicit string) (string, error) {
if explicit != "" {
return explicit, nil
}
// 优先找名为 istio-ingressgateway 的 Service
svcList := &core_v1.ServiceList{}
if err := r.List(ctx, svcList, client.MatchingLabels{"istio": "ingressgateway"}); err != nil {
return "", fmt.Errorf("list ingressgateway services failed: %w", err)
}
var fallback string
for _, svc := range svcList.Items {
if svc.Name == "istio-ingressgateway" {
return svc.Namespace, nil
}
if fallback == "" {
fallback = svc.Namespace
}
}
if fallback != "" {
return fallback, nil
}
return "", fmt.Errorf("cannot detect ingress gateway namespace; set tls.secretNamespace or ISTIO_INGRESS_NAMESPACE")
}
// determineGatewayNamespace 决定 Gateway 应创建到的命名空间
func (r *ApplicationReconciler) determineGatewayNamespace(ctx context.Context, app *applicationv1.Application) (string, error) {
var explicit string
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil {
for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
if tls.SecretNamespace != "" {
explicit = tls.SecretNamespace
break
}
}
}
return r.detectIngressNamespace(ctx, explicit)
}
func ensureGatewayLabels(gateway *istionetworkingv1.Gateway, app *applicationv1.Application) {
if gateway.Labels == nil {
gateway.Labels = make(map[string]string)
}
gateway.Labels["app.k8s.devstar/name"] = app.Name
gateway.Labels["app.k8s.devstar/namespace"] = app.Namespace
}
// reconcileGatewayTLSSecret 确保 Gateway TLS 所需的 Secret 存在/最新
func (r *ApplicationReconciler) reconcileGatewayTLSSecret(ctx context.Context, app *applicationv1.Application) error {
if app.Spec.NetworkPolicy == nil || app.Spec.NetworkPolicy.Gateway == nil {
return nil
}
tlsList := app.Spec.NetworkPolicy.Gateway.TLS
if len(tlsList) == 0 {
return nil
}
for _, tls := range tlsList {
if strings.EqualFold(tls.Mode, "PASSTHROUGH") {
continue
}
secretName := tls.SecretName
if secretName == "" {
return fmt.Errorf("gateway.tls.secretName is required when TLS mode is %s", tls.Mode)
}
// 自动确定目标命名空间
targetNS, err := r.detectIngressNamespace(ctx, tls.SecretNamespace)
if err != nil {
return err
}
hasInline := tls.Certificate != "" && tls.PrivateKey != ""
existing := &core_v1.Secret{}
getErr := r.Get(ctx, types.NamespacedName{Namespace: targetNS, Name: secretName}, existing)
if errors.IsNotFound(getErr) {
if !hasInline {
return fmt.Errorf("secret %s/%s not found; provide certificate/privateKey or create it manually", targetNS, secretName)
}
// 规范化证书和私钥格式(确保是有效的 PEM 格式)
certPEM := normalizePEM(tls.Certificate, "CERTIFICATE")
keyPEM := normalizePEM(tls.PrivateKey, "PRIVATE KEY")
newSec := &core_v1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: secretName,
Namespace: targetNS,
Labels: map[string]string{
"app.k8s.devstar/name": app.Name,
"app.k8s.devstar/type": "gateway-tls",
},
},
Type: core_v1.SecretTypeTLS,
Data: map[string][]byte{
core_v1.TLSCertKey: []byte(certPEM),
core_v1.TLSPrivateKeyKey: []byte(keyPEM),
},
}
if err := r.Create(ctx, newSec); err != nil {
return fmt.Errorf("create tls secret %s/%s failed: %w", targetNS, secretName, err)
}
continue
} else if getErr != nil {
return fmt.Errorf("get secret %s/%s failed: %w", targetNS, secretName, getErr)
}
if hasInline {
// 规范化证书和私钥格式
certPEM := normalizePEM(tls.Certificate, "CERTIFICATE")
keyPEM := normalizePEM(tls.PrivateKey, "PRIVATE KEY")
if existing.Type != core_v1.SecretTypeTLS ||
!bytes.Equal(existing.Data[core_v1.TLSCertKey], []byte(certPEM)) ||
!bytes.Equal(existing.Data[core_v1.TLSPrivateKeyKey], []byte(keyPEM)) {
if existing.Data == nil {
existing.Data = map[string][]byte{}
}
existing.Type = core_v1.SecretTypeTLS
existing.Data[core_v1.TLSCertKey] = []byte(certPEM)
existing.Data[core_v1.TLSPrivateKeyKey] = []byte(keyPEM)
if existing.Labels == nil {
existing.Labels = map[string]string{}
}
existing.Labels["app.k8s.devstar/name"] = app.Name
existing.Labels["app.k8s.devstar/type"] = "gateway-tls"
if err := r.Update(ctx, existing); err != nil {
return fmt.Errorf("update tls secret %s/%s failed: %w", targetNS, secretName, err)
}
}
}
}
return nil
}
// detectPrivateKeyType 检测私钥的原始格式类型
// 返回 "RSA PRIVATE KEY" 或 "PRIVATE KEY"
func detectPrivateKeyType(content string) string {
contentUpper := strings.ToUpper(content)
if strings.Contains(contentUpper, "-----BEGIN RSA PRIVATE KEY-----") {
return "RSA PRIVATE KEY"
}
if strings.Contains(contentUpper, "-----BEGIN PRIVATE KEY-----") {
return "PRIVATE KEY"
}
if strings.Contains(contentUpper, "-----BEGIN EC PRIVATE KEY-----") {
return "EC PRIVATE KEY"
}
// 默认返回 PRIVATE KEY
return "PRIVATE KEY"
}
// normalizePEM 规范化 PEM 格式的证书或私钥
// 支持两种输入格式:
// 1. 只包含 base64 内容(没有 BEGIN/END自动添加标记并格式化
// 2. 完整 PEM 格式(包含 BEGIN/END提取内容并重新格式化为标准格式
// 对于私钥,如果 pemType 是 "PRIVATE KEY"会自动检测原始格式RSA PRIVATE KEY 或 PRIVATE KEY并保持
func normalizePEM(content, pemType string) string {
content = strings.TrimSpace(content)
if content == "" {
return ""
}
// 对于私钥,如果指定为 "PRIVATE KEY",自动检测原始格式
if pemType == "PRIVATE KEY" && strings.Contains(strings.ToUpper(content), "-----BEGIN") {
detectedType := detectPrivateKeyType(content)
if detectedType != "PRIVATE KEY" {
// 使用检测到的原始格式
pemType = detectedType
}
}
// 如果已经包含 BEGIN 标记,需要提取 BEGIN 和 END 之间的内容
if strings.Contains(content, "-----BEGIN") {
// 特别处理证书链:保留所有 CERTIFICATE 分段
if pemType == "CERTIFICATE" {
var blocks []string
search := content
for {
beginIdx := strings.Index(search, "-----BEGIN CERTIFICATE-----")
if beginIdx == -1 {
break
}
searchFrom := search[beginIdx+len("-----BEGIN CERTIFICATE-----"):]
endMarker := "-----END CERTIFICATE-----"
endIdx := strings.Index(searchFrom, endMarker)
if endIdx == -1 {
// 没有匹配的 END使用 BEGIN 之后的所有内容作为最后一段
body := searchFrom
blocks = append(blocks, body)
break
}
body := searchFrom[:endIdx]
blocks = append(blocks, body)
// 继续在 END 之后搜索下一段
nextStart := endIdx + len(endMarker)
if nextStart >= len(searchFrom) {
break
}
search = searchFrom[nextStart:]
}
if len(blocks) > 0 {
return buildPEMChainFromContent(blocks, "CERTIFICATE")
}
// 如果没有成功解析到分段,退化为单段处理
}
// 非证书链,按单段处理
// 对于私钥,需要匹配对应的 BEGIN/END 标记
if pemType == "RSA PRIVATE KEY" || pemType == "EC PRIVATE KEY" || pemType == "PRIVATE KEY" {
// 查找对应的 BEGIN 标记
beginMarker := fmt.Sprintf("-----BEGIN %s-----", pemType)
beginIdx := strings.Index(strings.ToUpper(content), strings.ToUpper(beginMarker))
if beginIdx == -1 {
// 如果找不到精确匹配,尝试查找任何 BEGIN 标记
beginIdx = strings.Index(strings.ToUpper(content), "-----BEGIN")
}
if beginIdx == -1 {
return buildPEMSingleFromContent(content, pemType)
}
// 计算 BEGIN 标记行的结束位置
// 先尝试查找换行符
beginLineEnd := strings.Index(content[beginIdx:], "\n")
if beginLineEnd == -1 {
beginLineEnd = strings.Index(content[beginIdx:], "\r")
}
if beginLineEnd == -1 {
// 如果没有换行符,直接使用 BEGIN 标记的长度
// 这样可以避免在单行格式中错误地匹配到 END 标记
beginLineEnd = len(beginMarker)
} else {
// 找到了换行符beginLineEnd 是相对于 beginIdx 的偏移
beginLineEnd += 1 // 包含换行符
}
// 查找对应的 END 标记
endMarker := fmt.Sprintf("-----END %s-----", pemType)
searchStart := beginIdx + beginLineEnd
if searchStart > len(content) {
searchStart = len(content)
}
endIdx := strings.Index(strings.ToUpper(content[searchStart:]), strings.ToUpper(endMarker))
if endIdx == -1 {
// 如果找不到精确匹配,尝试查找任何 END 标记
endIdx = strings.Index(strings.ToUpper(content[searchStart:]), "-----END")
}
if endIdx == -1 {
// 如果找不到 END提取 BEGIN 之后的所有内容
bodyContent := content[beginIdx+beginLineEnd:]
return buildPEMSingleFromContent(bodyContent, pemType)
}
// 提取 BEGIN 行结束和 END 标记开始之间的内容
bodyContent := content[beginIdx+beginLineEnd : searchStart+endIdx]
// 移除可能包含的 END 标记前缀(防止单行格式时误包含)
bodyContent = strings.TrimSpace(bodyContent)
if strings.HasPrefix(strings.ToUpper(bodyContent), "-----END") {
// 如果 bodyContent 以 END 标记开头,说明提取错误,需要重新提取
endMarkerStart := strings.Index(strings.ToUpper(bodyContent), strings.ToUpper(endMarker))
if endMarkerStart != -1 {
bodyContent = bodyContent[:endMarkerStart]
} else {
// 如果找不到完整的 END 标记,尝试查找 "-----END" 的位置
endDashIdx := strings.Index(strings.ToUpper(bodyContent), "-----END")
if endDashIdx != -1 {
bodyContent = bodyContent[:endDashIdx]
}
}
}
return buildPEMSingleFromContent(bodyContent, pemType)
}
// 其他类型(证书等)的原有逻辑
// 查找 BEGIN 标记的结束位置(包含完整标记行)
beginIdx := strings.Index(content, "-----BEGIN")
if beginIdx == -1 {
return buildPEMSingleFromContent(content, pemType)
}
// 找到 BEGIN 行结束(下一个换行符,或字符串结束)
beginLineEnd := strings.Index(content[beginIdx:], "\n")
if beginLineEnd == -1 {
beginLineEnd = strings.Index(content[beginIdx:], "\r")
}
if beginLineEnd == -1 {
// 如果没有换行,查找下一个 "-----" 作为结束
nextDash := strings.Index(content[beginIdx+10:], "-----")
if nextDash != -1 {
beginLineEnd = beginIdx + 10 + nextDash + 5
} else {
beginLineEnd = len(content)
}
} else {
beginLineEnd += beginIdx + 1
}
// 查找 END 标记
endPattern := "-----END"
searchStart := beginLineEnd
if searchStart > len(content) {
searchStart = len(content)
}
endIdx := strings.Index(content[searchStart:], endPattern)
if endIdx == -1 {
// 如果找不到 END提取 BEGIN 之后的所有内容
bodyContent := content[beginLineEnd:]
return buildPEMSingleFromContent(bodyContent, pemType)
}
// 提取 BEGIN 行结束和 END 标记开始之间的内容
bodyContent := content[beginLineEnd : searchStart+endIdx]
// 清理并重新格式化
return buildPEMSingleFromContent(bodyContent, pemType)
}
// 如果没有 BEGIN 标记,直接格式化
return buildPEMSingleFromContent(content, pemType)
}
// buildPEMSingleFromContent 从清理后的内容构建单段标准 PEM 格式
// 只移除空白字符,保留所有实际的 base64 内容
func buildPEMSingleFromContent(content, pemType string) string {
// 移除所有空格、换行符、制表符等空白字符
cleaned := strings.ReplaceAll(content, " ", "")
cleaned = strings.ReplaceAll(cleaned, "\n", "")
cleaned = strings.ReplaceAll(cleaned, "\r", "")
cleaned = strings.ReplaceAll(cleaned, "\t", "")
cleaned = strings.TrimSpace(cleaned)
if cleaned == "" {
return ""
}
// 构建标准 PEM 格式
var pem strings.Builder
pem.WriteString("-----BEGIN ")
pem.WriteString(pemType)
pem.WriteString("-----\n")
// 每 64 字符一行
for i := 0; i < len(cleaned); i += 64 {
end := i + 64
if end > len(cleaned) {
end = len(cleaned)
}
pem.WriteString(cleaned[i:end])
pem.WriteString("\n")
}
pem.WriteString("-----END ")
pem.WriteString(pemType)
pem.WriteString("-----\n")
return pem.String()
}
// buildPEMChainFromContent 根据多段内容构建包含多段的 PEM 证书链
func buildPEMChainFromContent(blockBodies []string, pemType string) string {
var b strings.Builder
for _, body := range blockBodies {
seg := buildPEMSingleFromContent(body, pemType)
if seg == "" {
continue
}
b.WriteString(seg)
}
return b.String()
}
// reconcileGatewayVirtualService 处理与Gateway关联的VirtualService
func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Context, app *applicationv1.Application, gatewayNamespace string) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-gateway-vs"
gatewayName := app.Name + "-gateway"
gatewayRef := gatewayName
if gatewayNamespace != "" && gatewayNamespace != app.Namespace {
gatewayRef = fmt.Sprintf("%s/%s", gatewayNamespace, gatewayName)
}
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// Get + Create/Update VS
existingVS := &istionetworkingv1.VirtualService{}
err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, existingVS)
if errors.IsNotFound(err) {
logger.Info("Creating new Gateway VirtualService", "name", vsName)
newVS := &istionetworkingv1.VirtualService{ObjectMeta: metav1.ObjectMeta{Name: vsName, Namespace: app.Namespace}}
if err := r.configureGatewayVirtualService(newVS, app, service, gatewayRef); err != nil {
return fmt.Errorf("failed to configure VirtualService: %w", err)
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newVS, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newVS); err != nil {
return fmt.Errorf("failed to create VirtualService: %w", err)
}
logger.Info("Gateway VirtualService created", "name", vsName)
} else if err != nil {
return fmt.Errorf("failed to get VirtualService: %w", err)
} else {
if err := r.configureGatewayVirtualService(existingVS, app, service, gatewayRef); err != nil {
return fmt.Errorf("failed to configure VirtualService: %w", err)
}
if err := r.Update(ctx, existingVS); err != nil {
return fmt.Errorf("failed to update VirtualService: %w", err)
}
logger.Info("Gateway VirtualService updated", "name", vsName)
}
return nil
}
// configureGatewayVirtualService 配置与Gateway关联的VirtualService
func (r *ApplicationReconciler) configureGatewayVirtualService(vs *istionetworkingv1.VirtualService, app *applicationv1.Application, service *core_v1.Service, gatewayRef string) error {
// 设置基本字段
vs.Spec.Hosts = getHosts(app)
vs.Spec.Gateways = []string{gatewayRef}
// 创建HTTP路由
httpRoute := &istioapinetworkingv1.HTTPRoute{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
},
},
}
// 配置匹配条件
if app.Spec.NetworkPolicy.Mesh != nil && len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 {
for _, route := range app.Spec.NetworkPolicy.Mesh.Routes {
if route.Match != nil {
match := &istioapinetworkingv1.HTTPMatchRequest{}
// URI匹配
if route.Match.URI != nil {
match.Uri = convertStringMatch(route.Match.URI)
}
// 方法匹配
if route.Match.Method != nil {
match.Method = convertStringMatch(route.Match.Method)
}
// 头部匹配
if len(route.Match.Headers) > 0 {
match.Headers = make(map[string]*istioapinetworkingv1.StringMatch)
for key, value := range route.Match.Headers {
match.Headers[key] = convertStringMatch(&value)
}
}
httpRoute.Match = append(httpRoute.Match, match)
}
}
} else {
// 默认匹配所有路径
httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{
{
Uri: &istioapinetworkingv1.StringMatch{
MatchType: &istioapinetworkingv1.StringMatch_Prefix{
Prefix: "/",
},
},
},
}
}
// 配置超时
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Timeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000
httpRoute.Timeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
// 配置重试
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Retry != nil {
retryOn := "5xx,gateway-error,connect-failure"
if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 {
retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",")
}
retry := &istioapinetworkingv1.HTTPRetry{
Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts,
RetryOn: retryOn,
}
if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000
retry.PerTryTimeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
httpRoute.Retries = retry
}
vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{httpRoute}
return nil
}
// convertStringMatch 将Application CRD的StringMatch转换为Istio API的StringMatch
func convertStringMatch(match *applicationv1.StringMatch) *istioapinetworkingv1.StringMatch {
if match == nil {
return nil
}
result := &istioapinetworkingv1.StringMatch{}
if match.Exact != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Exact{
Exact: match.Exact,
}
} else if match.Prefix != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Prefix{
Prefix: match.Prefix,
}
} else if match.Regex != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Regex{
Regex: match.Regex,
}
}
return result
}
// reconcileMesh 处理服务网格配置
func (r *ApplicationReconciler) reconcileMesh(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 注入Sidecar (如果需要)
if app.Spec.NetworkPolicy.Mesh.Sidecar != nil && app.Spec.NetworkPolicy.Mesh.Sidecar.Inject {
if err := r.ensureSidecarInjection(ctx, app); err != nil {
return fmt.Errorf("failed to ensure sidecar injection: %w", err)
}
}
// 协调东西向流量的VirtualService
if err := r.reconcileMeshVirtualService(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile mesh VirtualService: %w", err)
}
// 协调DestinationRule (熔断器)
if app.Spec.NetworkPolicy.Mesh.CircuitBreaker != nil {
if err := r.reconcileDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile DestinationRule: %w", err)
}
} else {
// 清理不再需要的DestinationRule
if err := r.cleanupDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup DestinationRule: %w", err)
}
}
logger.Info("Mesh resources reconciled", "name", app.Name)
return nil
}
// ensureSidecarInjection 确保应用的工作负载启用了Sidecar注入
func (r *ApplicationReconciler) ensureSidecarInjection(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查应用类型,更新相应的工作负载
if app.Spec.Template.Type == "stateful" {
// 更新StatefulSet
statefulSet := &apps_v1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
if err == nil {
// 设置注入注解
if statefulSet.Spec.Template.Annotations == nil {
statefulSet.Spec.Template.Annotations = make(map[string]string)
}
statefulSet.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true"
if err := r.Update(ctx, statefulSet); err != nil {
return fmt.Errorf("failed to update StatefulSet for sidecar injection: %w", err)
}
logger.Info("Enabled sidecar injection for StatefulSet", "name", statefulSet.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get StatefulSet: %w", err)
}
} else {
// 更新Deployment
deployment := &apps_v1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
if err == nil {
// 设置注入注解
if deployment.Spec.Template.Annotations == nil {
deployment.Spec.Template.Annotations = make(map[string]string)
}
deployment.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true"
if err := r.Update(ctx, deployment); err != nil {
return fmt.Errorf("failed to update Deployment for sidecar injection: %w", err)
}
logger.Info("Enabled sidecar injection for Deployment", "name", deployment.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Deployment: %w", err)
}
}
return nil
}
// reconcileMeshVirtualService 处理服务网格内的VirtualService
func (r *ApplicationReconciler) reconcileMeshVirtualService(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-mesh-vs"
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping mesh VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新VirtualService
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
}
// 配置VirtualService
vs.Spec.Hosts = []string{service.Name}
vs.Spec.Gateways = []string{"mesh"} // 服务网格内部流量
// 创建HTTP路由
httpRoutes := []*istioapinetworkingv1.HTTPRoute{}
// 如果定义了路由规则,使用它们
if len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 {
for _, route := range app.Spec.NetworkPolicy.Mesh.Routes {
httpRoute := &istioapinetworkingv1.HTTPRoute{}
// 配置匹配条件
if route.Match != nil {
match := &istioapinetworkingv1.HTTPMatchRequest{}
// URI匹配
if route.Match.URI != nil {
match.Uri = convertStringMatch(route.Match.URI)
}
// 方法匹配
if route.Match.Method != nil {
match.Method = convertStringMatch(route.Match.Method)
}
// 头部匹配
if len(route.Match.Headers) > 0 {
match.Headers = make(map[string]*istioapinetworkingv1.StringMatch)
for key, value := range route.Match.Headers {
match.Headers[key] = convertStringMatch(&value)
}
}
httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{match}
}
// 配置目标
destination := &istioapinetworkingv1.HTTPRouteDestination{
Destination: &istioapinetworkingv1.Destination{
Host: route.Destination.Host,
},
Weight: route.Weight,
}
if route.Destination.Subset != "" {
destination.Destination.Subset = route.Destination.Subset
}
if route.Destination.Port > 0 {
destination.Destination.Port = &istioapinetworkingv1.PortSelector{
Number: uint32(route.Destination.Port),
}
}
httpRoute.Route = []*istioapinetworkingv1.HTTPRouteDestination{destination}
httpRoutes = append(httpRoutes, httpRoute)
}
} else {
// 默认路由
httpRoute := &istioapinetworkingv1.HTTPRoute{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
},
},
}
httpRoutes = append(httpRoutes, httpRoute)
}
// 配置超时
if app.Spec.NetworkPolicy.Mesh.Timeout > 0 {
for _, route := range httpRoutes {
seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000
route.Timeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
}
// 配置重试
if app.Spec.NetworkPolicy.Mesh.Retry != nil {
for _, route := range httpRoutes {
retryOn := "5xx,gateway-error,connect-failure"
if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 {
retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",")
}
retry := &istioapinetworkingv1.HTTPRetry{
Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts,
RetryOn: retryOn,
}
if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000
retry.PerTryTimeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
route.Retries = retry
}
}
// 配置故障注入(测试用途)
if app.Spec.NetworkPolicy.Mesh.FaultInjection != nil {
for _, route := range httpRoutes {
fault := &istioapinetworkingv1.HTTPFaultInjection{}
// 延迟故障
if app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay != nil {
delayDuration := &durationpb.Duration{
Seconds: int64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay / 1000),
Nanos: int32((app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay % 1000) * 1000000),
}
fault.Delay = &istioapinetworkingv1.HTTPFaultInjection_Delay{
HttpDelayType: &istioapinetworkingv1.HTTPFaultInjection_Delay_FixedDelay{
FixedDelay: delayDuration,
},
Percentage: &istioapinetworkingv1.Percent{
Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.Percentage) / 100.0,
},
}
}
// 中止故障
if app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort != nil {
httpStatus := int32(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.HttpStatus)
fault.Abort = &istioapinetworkingv1.HTTPFaultInjection_Abort{
ErrorType: &istioapinetworkingv1.HTTPFaultInjection_Abort_HttpStatus{
HttpStatus: httpStatus,
},
Percentage: &istioapinetworkingv1.Percent{
Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.Percentage) / 100.0,
},
}
}
route.Fault = fault
}
}
vs.Spec.Http = httpRoutes
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update mesh VirtualService: %w", err)
}
logger.Info("Mesh VirtualService reconciled", "name", vsName, "operation", op)
return nil
}
// reconcileDestinationRule 处理DestinationRule(熔断器)
func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
drName := app.Name + "-dr"
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping DestinationRule creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// Get + Create/Update DestinationRule
existingDR := &istionetworkingv1.DestinationRule{}
err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, existingDR)
if errors.IsNotFound(err) {
logger.Info("Creating new DestinationRule", "name", drName)
newDR := &istionetworkingv1.DestinationRule{ObjectMeta: metav1.ObjectMeta{Name: drName, Namespace: app.Namespace}}
newDR.Spec.Host = service.Name
var cb *applicationv1.CircuitBreaker
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
}
if cb == nil && app.Spec.TrafficPolicy != nil {
cb = app.Spec.TrafficPolicy.CircuitBreaker
}
if cb != nil {
connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100},
}
maxEjectionPercent := uint32(100)
if cb.MaxEjectionPercent > 0 {
maxEjectionPercent = uint32(cb.MaxEjectionPercent)
}
outlierDetection := &istioapinetworkingv1.OutlierDetection{
ConsecutiveErrors: int32(cb.ConsecutiveErrors),
Interval: &durationpb.Duration{Seconds: 1},
BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)},
MaxEjectionPercent: int32(maxEjectionPercent),
}
var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
} else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
} else {
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}}
}
newDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer}
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDR, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newDR); err != nil {
return fmt.Errorf("failed to create DestinationRule: %w", err)
}
logger.Info("DestinationRule created", "name", drName)
} else if err != nil {
return fmt.Errorf("failed to get DestinationRule: %w", err)
} else {
existingDR.Spec.Host = service.Name
var cb *applicationv1.CircuitBreaker
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
}
if cb == nil && app.Spec.TrafficPolicy != nil {
cb = app.Spec.TrafficPolicy.CircuitBreaker
}
if cb != nil {
connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{Http1MaxPendingRequests: 100, MaxRequestsPerConnection: 1},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{MaxConnections: 100},
}
maxEjectionPercent := uint32(100)
if cb.MaxEjectionPercent > 0 {
maxEjectionPercent = uint32(cb.MaxEjectionPercent)
}
outlierDetection := &istioapinetworkingv1.OutlierDetection{ConsecutiveErrors: int32(cb.ConsecutiveErrors), Interval: &durationpb.Duration{Seconds: 1}, BaseEjectionTime: &durationpb.Duration{Seconds: int64(cb.BaseEjectionTime)}, MaxEjectionPercent: int32(maxEjectionPercent)}
var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
} else if app.Spec.TrafficPolicy != nil && app.Spec.TrafficPolicy.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
} else {
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN}}
}
existingDR.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{ConnectionPool: connectionPool, OutlierDetection: outlierDetection, LoadBalancer: loadBalancer}
} else {
existingDR.Spec.TrafficPolicy = nil
}
if err := r.Update(ctx, existingDR); err != nil {
return fmt.Errorf("failed to update DestinationRule: %w", err)
}
logger.Info("DestinationRule updated", "name", drName)
}
return nil
}
// cleanupDestinationRule 清理不再需要的DestinationRule
func (r *ApplicationReconciler) cleanupDestinationRule(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
drName := app.Name + "-dr"
dr := &istionetworkingv1.DestinationRule{}
err := r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr)
if err == nil {
logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName)
if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete DestinationRule: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get DestinationRule: %w", err)
}
return nil
}
// 辅助函数 - 转换负载均衡设置
func convertLoadBalancerSettings(settings *applicationv1.LoadBalancerSettings) *istioapinetworkingv1.LoadBalancerSettings {
result := &istioapinetworkingv1.LoadBalancerSettings{}
if settings.ConsistentHash != nil {
consistentHash := &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB{}
if settings.ConsistentHash.HttpHeaderName != "" {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpHeaderName{
HttpHeaderName: settings.ConsistentHash.HttpHeaderName,
}
} else if settings.ConsistentHash.UseSourceIp {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
UseSourceIp: true,
}
} else if settings.ConsistentHash.HttpCookie != nil {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpCookie{
HttpCookie: &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HTTPCookie{
Name: settings.ConsistentHash.HttpCookie.Name,
Path: settings.ConsistentHash.HttpCookie.Path,
Ttl: &durationpb.Duration{
Seconds: int64(settings.ConsistentHash.HttpCookie.Ttl),
},
},
}
}
result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHash{
ConsistentHash: consistentHash,
}
} else {
// 简单负载均衡
var simpleType istioapinetworkingv1.LoadBalancerSettings_SimpleLB
switch settings.Simple {
case "LEAST_CONN":
simpleType = istioapinetworkingv1.LoadBalancerSettings_LEAST_CONN
case "RANDOM":
simpleType = istioapinetworkingv1.LoadBalancerSettings_RANDOM
case "PASSTHROUGH":
simpleType = istioapinetworkingv1.LoadBalancerSettings_PASSTHROUGH
default:
simpleType = istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN
}
result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_Simple{
Simple: simpleType,
}
}
return result
}
// 此处已删除重复的 reconcileIstioTraffic 函数实现保留第139行的定义