Files
devstar-create-from-template/modules/k8s/controller/application/application_controller.go
2025-08-25 15:46:12 +08:00

1805 lines
56 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package application
import (
"context"
"fmt"
"strings"
"time"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
applicationv1 "code.gitea.io/gitea/modules/k8s/api/application/v1"
application_controller_utils "code.gitea.io/gitea/modules/k8s/controller/application/utils"
apps_v1 "k8s.io/api/apps/v1"
core_v1 "k8s.io/api/core/v1"
"google.golang.org/protobuf/types/known/durationpb"
istioapinetworkingv1 "istio.io/api/networking/v1"
istionetworkingv1 "istio.io/client-go/pkg/apis/networking/v1"
)
// ApplicationReconciler reconciles a Application object
type ApplicationReconciler struct {
client.Client
Scheme *runtime.Scheme
}
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=application.devstar.cn,resources=applications/finalizers,verbs=update
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch;update;patch
// +kubebuilder:rbac:groups=networking.k8s.io,resources=ingresses,verbs=create;delete;get;list;watch;update;patch
// Reconcile is part of the main kubernetes reconciliation loop
func (r *ApplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
// 获取Application实例
app := &applicationv1.Application{}
err := r.Get(ctx, req.NamespacedName, app)
if err != nil {
if errors.IsNotFound(err) {
logger.Info("Application resource not found. Ignoring since object must be deleted")
return ctrl.Result{}, nil
}
logger.Error(err, "Failed to get Application")
return ctrl.Result{}, err
}
logger.Info("Processing Application", "name", app.Name, "namespace", app.Namespace, "type", app.Spec.Template.Type)
// 添加 finalizer 处理逻辑
finalizerName := "application.devstar.cn/finalizer"
// 检查对象是否正在被删除
if !app.ObjectMeta.DeletionTimestamp.IsZero() {
// 对象正在被删除 - 处理 finalizer
if k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
// 执行清理操作
logger.Info("Cleaning up resources before deletion", "name", app.Name)
// 清理完成后移除 finalizer
k8s_sigs_controller_runtime_utils.RemoveFinalizer(app, finalizerName)
if err := r.Update(ctx, app); err != nil {
logger.Error(err, "Failed to remove finalizer")
return ctrl.Result{}, err
}
}
// 已标记为删除且处理完成,允许继续删除流程
return ctrl.Result{}, nil
}
// 如果对象不包含 finalizer就添加它
if !k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
logger.Info("Adding finalizer", "name", app.Name)
k8s_sigs_controller_runtime_utils.AddFinalizer(app, finalizerName)
if err := r.Update(ctx, app); err != nil {
logger.Error(err, "Failed to add finalizer")
return ctrl.Result{}, err
}
}
// 根据应用类型协调相应的资源
if app.Spec.Template.Type == "stateful" {
// 协调 StatefulSet
if err := r.reconcileStatefulSet(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile StatefulSet")
return ctrl.Result{}, err
}
} else {
// 协调 Deployment默认为无状态应用
if err := r.reconcileDeployment(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Deployment")
return ctrl.Result{}, err
}
}
// 协调 Service
if err := r.reconcileService(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Service")
return ctrl.Result{}, err
}
// 更新状态
if err := r.updateStatus(ctx, app); err != nil {
logger.Error(err, "Failed to update status")
return ctrl.Result{}, err
}
// 协调网络策略
if err := r.reconcileNetworkPolicy(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile network policy")
return ctrl.Result{}, err
}
// 如果配置了旧版TrafficPolicy为向后兼容处理旧的流量策略
if app.Spec.TrafficPolicy != nil {
if err := r.reconcileIstioTraffic(ctx, app); err != nil {
logger.Error(err, "Failed to reconcile Istio traffic policy")
return ctrl.Result{}, err
}
}
logger.Info("Successfully reconciled Application", "name", app.Name)
return ctrl.Result{RequeueAfter: time.Minute * 1}, nil
}
// 服务网格流量治理逻辑
func (r *ApplicationReconciler) reconcileIstioTraffic(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
if app.Spec.TrafficPolicy == nil {
logger.Info("No legacy TrafficPolicy configured, skipping")
return nil
}
// 如果已经有NetworkPolicy配置使用它而不是旧版TrafficPolicy
if app.Spec.NetworkPolicy != nil &&
((app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled) ||
(app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled)) {
logger.Info("NetworkPolicy already configured, skipping legacy TrafficPolicy")
return nil
}
// 处理金丝雀发布
if app.Spec.TrafficPolicy.Canary != nil && app.Spec.TrafficPolicy.Canary.Enabled {
logger.Info("Processing legacy Canary traffic configuration")
// 这里实现金丝雀发布的逻辑
// 为简化实现可以创建一个VirtualService来处理金丝雀流量
vsName := app.Name + "-canary-vs"
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping legacy VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 构建金丝雀服务名
canaryService := app.Name + "-" + app.Spec.TrafficPolicy.Canary.CanaryVersion + "-svc"
// 创建或更新VirtualService
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
}
// 配置金丝雀VirtualService
mainWeight := int32(app.Spec.TrafficPolicy.Canary.MainWeight)
canaryWeight := int32(100 - app.Spec.TrafficPolicy.Canary.MainWeight)
vs.Spec.Hosts = []string{service.Name}
vs.Spec.Gateways = []string{"mesh"} // 默认只在网格内生效
vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{
{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
Weight: mainWeight,
},
{
Destination: &istioapinetworkingv1.Destination{
Host: canaryService,
},
Weight: canaryWeight,
},
},
},
}
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update legacy Canary VirtualService: %w", err)
}
logger.Info("Legacy Canary VirtualService reconciled", "name", vsName, "operation", op)
}
// 如果配置了熔断器确保创建DestinationRule
if app.Spec.TrafficPolicy.CircuitBreaker != nil {
logger.Info("Processing legacy CircuitBreaker configuration")
// reconcileDestinationRule函数已经支持从TrafficPolicy获取熔断器配置
if err := r.reconcileDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile legacy DestinationRule: %w", err)
}
}
return nil
}
// 修复 reconcileDeployment 函数中的调用
func (r *ApplicationReconciler) reconcileDeployment(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查 Deployment 是否存在
deployment := &apps_v1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
if errors.IsNotFound(err) {
// 创建新的 Deployment
logger.Info("Creating new Deployment", "name", app.Name)
newDeployment, err := application_controller_utils.NewDeployment(app)
if err != nil {
return fmt.Errorf("failed to generate deployment: %w", err)
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newDeployment, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newDeployment); err != nil {
return fmt.Errorf("failed to create deployment: %w", err)
}
logger.Info("Successfully created Deployment", "name", app.Name)
return nil
} else if err != nil {
return fmt.Errorf("failed to get deployment: %w", err)
}
// 获取期望的副本数
desiredReplicas := int32(1) // 默认值
if app.Spec.Replicas != nil {
desiredReplicas = *app.Spec.Replicas
}
// 获取当前的副本数
currentReplicas := int32(1) // 默认值
if deployment.Spec.Replicas != nil {
currentReplicas = *deployment.Spec.Replicas
}
logger.Info("Deployment replica status",
"name", app.Name,
"current-spec-replicas", currentReplicas,
"desired-replicas", desiredReplicas,
"actual-replicas", deployment.Status.Replicas,
"ready-replicas", deployment.Status.ReadyReplicas)
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 1. 检查副本数是否变更
if currentReplicas != desiredReplicas {
logger.Info("Replica count changed", "current", currentReplicas, "desired", desiredReplicas)
needsUpdate = true
updateFields["replicas"] = desiredReplicas
deployment.Spec.Replicas = &desiredReplicas
}
// 2. 生成期望的 Deployment 来比较其他字段
updatedDeployment, err := application_controller_utils.NewDeployment(app)
if err != nil {
return fmt.Errorf("failed to generate updated deployment: %w", err)
}
// 3. 检查镜像是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
currentImage := deployment.Spec.Template.Spec.Containers[0].Image
desiredImage := updatedDeployment.Spec.Template.Spec.Containers[0].Image
if currentImage != desiredImage {
logger.Info("Image changed", "current", currentImage, "desired", desiredImage)
needsUpdate = true
updateFields["image"] = desiredImage
deployment.Spec.Template.Spec.Containers[0].Image = desiredImage
}
}
// 4. 检查环境变量是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalEnvVars(deployment.Spec.Template.Spec.Containers[0].Env,
updatedDeployment.Spec.Template.Spec.Containers[0].Env) {
logger.Info("Environment variables changed")
needsUpdate = true
updateFields["env"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Env = updatedDeployment.Spec.Template.Spec.Containers[0].Env
}
}
// 5. 检查资源配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalResources(deployment.Spec.Template.Spec.Containers[0].Resources,
updatedDeployment.Spec.Template.Spec.Containers[0].Resources) {
logger.Info("Resource requirements changed")
needsUpdate = true
updateFields["resources"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Resources = updatedDeployment.Spec.Template.Spec.Containers[0].Resources
}
}
// 6. 检查端口配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalPorts(deployment.Spec.Template.Spec.Containers[0].Ports,
updatedDeployment.Spec.Template.Spec.Containers[0].Ports) {
logger.Info("Container ports changed")
needsUpdate = true
updateFields["ports"] = "changed"
deployment.Spec.Template.Spec.Containers[0].Ports = updatedDeployment.Spec.Template.Spec.Containers[0].Ports
}
}
// 7. 检查健康检查配置是否变更
if len(deployment.Spec.Template.Spec.Containers) > 0 &&
len(updatedDeployment.Spec.Template.Spec.Containers) > 0 {
if !equalProbes(deployment.Spec.Template.Spec.Containers[0].LivenessProbe,
updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe) ||
!equalProbes(deployment.Spec.Template.Spec.Containers[0].ReadinessProbe,
updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe) {
logger.Info("Health check probes changed")
needsUpdate = true
updateFields["probes"] = "changed"
deployment.Spec.Template.Spec.Containers[0].LivenessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].LivenessProbe
deployment.Spec.Template.Spec.Containers[0].ReadinessProbe = updatedDeployment.Spec.Template.Spec.Containers[0].ReadinessProbe
}
}
// 执行更新
if needsUpdate {
logger.Info("Updating deployment", "name", app.Name, "fields", updateFields)
if err := r.Update(ctx, deployment); err != nil {
return fmt.Errorf("failed to update deployment: %w", err)
}
logger.Info("Deployment updated successfully", "name", app.Name, "fields", updateFields)
} else {
logger.Info("No deployment updates needed", "name", app.Name)
}
return nil
}
// 添加辅助函数:比较端口配置
func equalPorts(current, desired []core_v1.ContainerPort) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]core_v1.ContainerPort)
for _, port := range current {
currentMap[port.Name] = port
}
for _, port := range desired {
if currentPort, exists := currentMap[port.Name]; !exists ||
currentPort.ContainerPort != port.ContainerPort ||
currentPort.Protocol != port.Protocol {
return false
}
}
return true
}
// 添加辅助函数:比较探针配置
func equalProbes(current, desired *core_v1.Probe) bool {
if current == nil && desired == nil {
return true
}
if current == nil || desired == nil {
return false
}
// 简单比较,可以根据需要扩展
if current.InitialDelaySeconds != desired.InitialDelaySeconds ||
current.PeriodSeconds != desired.PeriodSeconds ||
current.TimeoutSeconds != desired.TimeoutSeconds ||
current.SuccessThreshold != desired.SuccessThreshold ||
current.FailureThreshold != desired.FailureThreshold {
return false
}
// 比较 HTTPGet 配置
if current.HTTPGet != nil && desired.HTTPGet != nil {
return current.HTTPGet.Path == desired.HTTPGet.Path &&
current.HTTPGet.Port == desired.HTTPGet.Port
}
return current.HTTPGet == nil && desired.HTTPGet == nil
}
// 添加辅助函数:比较环境变量
func equalEnvVars(current, desired []core_v1.EnvVar) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]string)
for _, env := range current {
currentMap[env.Name] = env.Value
}
desiredMap := make(map[string]string)
for _, env := range desired {
desiredMap[env.Name] = env.Value
}
// 检查每个期望的环境变量是否匹配
for key, value := range desiredMap {
if currentMap[key] != value {
return false
}
}
// 检查是否有多余的环境变量
for key := range currentMap {
if _, exists := desiredMap[key]; !exists {
return false
}
}
return true
}
// 添加辅助函数:比较资源配置
func equalResources(current, desired core_v1.ResourceRequirements) bool {
// 比较 Limits
if current.Limits == nil && desired.Limits != nil {
return false
}
if current.Limits != nil && desired.Limits == nil {
return false
}
if current.Limits != nil && desired.Limits != nil {
// 比较 CPU
currentCPU := current.Limits.Cpu()
desiredCPU := desired.Limits.Cpu()
if currentCPU != nil && desiredCPU != nil {
if !currentCPU.Equal(*desiredCPU) {
return false
}
} else if currentCPU != desiredCPU { // 一个为 nil另一个不为 nil
return false
}
// 比较 Memory
currentMemory := current.Limits.Memory()
desiredMemory := desired.Limits.Memory()
if currentMemory != nil && desiredMemory != nil {
if !currentMemory.Equal(*desiredMemory) {
return false
}
} else if currentMemory != desiredMemory { // 一个为 nil另一个不为 nil
return false
}
}
// 比较 Requests
if current.Requests == nil && desired.Requests != nil {
return false
}
if current.Requests != nil && desired.Requests == nil {
return false
}
if current.Requests != nil && desired.Requests != nil {
// 比较 CPU
currentCPU := current.Requests.Cpu()
desiredCPU := desired.Requests.Cpu()
if currentCPU != nil && desiredCPU != nil {
if !currentCPU.Equal(*desiredCPU) {
return false
}
} else if currentCPU != desiredCPU { // 一个为 nil另一个不为 nil
return false
}
// 比较 Memory
currentMemory := current.Requests.Memory()
desiredMemory := desired.Requests.Memory()
if currentMemory != nil && desiredMemory != nil {
if !currentMemory.Equal(*desiredMemory) {
return false
}
} else if currentMemory != desiredMemory { // 一个为 nil另一个不为 nil
return false
}
}
return true
}
func (r *ApplicationReconciler) reconcileStatefulSet(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查 StatefulSet 是否存在
statefulSet := &apps_v1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
if errors.IsNotFound(err) {
// 创建新的 StatefulSet
logger.Info("Creating new StatefulSet", "name", app.Name)
newStatefulSet, err := application_controller_utils.NewStatefulSet(app)
if err != nil {
return fmt.Errorf("failed to generate statefulset: %w", err)
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newStatefulSet, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newStatefulSet); err != nil {
return fmt.Errorf("failed to create statefulset: %w", err)
}
logger.Info("Successfully created StatefulSet", "name", app.Name)
return nil
} else if err != nil {
return fmt.Errorf("failed to get statefulset: %w", err)
}
// 更新现有 StatefulSet
logger.Info("Checking StatefulSet for updates", "name", app.Name)
updatedStatefulSet, err := application_controller_utils.NewStatefulSet(app)
if err != nil {
return fmt.Errorf("failed to generate updated statefulset: %w", err)
}
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 检查镜像是否变更
if len(statefulSet.Spec.Template.Spec.Containers) > 0 &&
len(updatedStatefulSet.Spec.Template.Spec.Containers) > 0 {
currentImage := statefulSet.Spec.Template.Spec.Containers[0].Image
desiredImage := updatedStatefulSet.Spec.Template.Spec.Containers[0].Image
if currentImage != desiredImage {
logger.Info("StatefulSet image changed", "current", currentImage, "desired", desiredImage)
needsUpdate = true
updateFields["image"] = desiredImage
}
}
// 检查副本数是否变更
currentReplicas := int32(1)
if statefulSet.Spec.Replicas != nil {
currentReplicas = *statefulSet.Spec.Replicas
}
desiredReplicas := int32(1)
if updatedStatefulSet.Spec.Replicas != nil {
desiredReplicas = *updatedStatefulSet.Spec.Replicas
}
if currentReplicas != desiredReplicas {
logger.Info("StatefulSet replica count changed", "current", currentReplicas, "desired", desiredReplicas)
needsUpdate = true
updateFields["replicas"] = desiredReplicas
}
if needsUpdate {
logger.Info("Updating StatefulSet", "name", app.Name, "fields", updateFields)
statefulSet.Spec = updatedStatefulSet.Spec
if err := r.Update(ctx, statefulSet); err != nil {
return fmt.Errorf("failed to update statefulset: %w", err)
}
logger.Info("StatefulSet updated successfully", "name", app.Name)
} else {
logger.Info("No StatefulSet updates needed", "name", app.Name)
}
return nil
}
// 添加辅助函数:比较字符串映射
func equalStringMaps(current, desired map[string]string) bool {
if len(current) != len(desired) {
return false
}
for k, v := range desired {
if current[k] != v {
return false
}
}
return true
}
// equalIngressTLS函数已废弃
func (r *ApplicationReconciler) updateStatus(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
var deployment *apps_v1.Deployment
var statefulSet *apps_v1.StatefulSet
var err error
// 根据应用类型获取对应的资源状态
if app.Spec.Template.Type == "stateful" {
statefulSet = &apps_v1.StatefulSet{}
err = r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
} else {
deployment = &apps_v1.Deployment{}
err = r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
}
if err != nil {
app.Status.Phase = "Failed"
app.Status.Message = fmt.Sprintf("Workload not found: %v", err)
logger.Error(err, "Failed to get workload for status update")
} else {
// 根据工作负载类型更新状态
if statefulSet != nil {
app.Status.Replicas = statefulSet.Status.Replicas
app.Status.ReadyReplicas = statefulSet.Status.ReadyReplicas
} else if deployment != nil {
app.Status.Replicas = deployment.Status.Replicas
app.Status.ReadyReplicas = deployment.Status.ReadyReplicas
}
// 状态判断逻辑
if app.Status.ReadyReplicas == 0 {
app.Status.Phase = "Pending"
app.Status.Message = "Application is starting"
} else if app.Status.ReadyReplicas < app.Status.Replicas {
app.Status.Phase = "Scaling"
app.Status.Message = fmt.Sprintf("Ready: %d/%d",
app.Status.ReadyReplicas, app.Status.Replicas)
} else {
app.Status.Phase = "Running"
app.Status.Message = "Application is healthy"
}
}
app.Status.LastUpdated = metav1.Now()
if err := r.Status().Update(ctx, app); err != nil {
return fmt.Errorf("failed to update status: %w", err)
}
logger.Info("Updated Application status", "phase", app.Status.Phase, "message", app.Status.Message)
return nil
}
// 协调网络策略
func (r *ApplicationReconciler) reconcileNetworkPolicy(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 如果没有配置网络策略,跳过处理
if app.Spec.NetworkPolicy == nil {
logger.Info("No network policy configured, skipping", "name", app.Name)
return nil
}
// 协调Gateway(南北向流量)
if app.Spec.NetworkPolicy.Gateway != nil && app.Spec.NetworkPolicy.Gateway.Enabled {
if err := r.reconcileGateway(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile gateway: %w", err)
}
} else {
// 清理不再需要的Gateway资源
if err := r.cleanupGateway(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup gateway: %w", err)
}
}
// 协调Mesh(东西向流量)
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Enabled {
if err := r.reconcileMesh(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile mesh: %w", err)
}
} else {
// 清理不再需要的Mesh资源
if err := r.cleanupMesh(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup mesh: %w", err)
}
}
return nil
}
// cleanupGateway 清理不再需要的Gateway资源
func (r *ApplicationReconciler) cleanupGateway(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
gatewayName := app.Name + "-gateway"
vsName := app.Name + "-gateway-vs"
// 清理Gateway
gateway := &istionetworkingv1.Gateway{}
err := r.Get(ctx, types.NamespacedName{Name: gatewayName, Namespace: app.Namespace}, gateway)
if err == nil {
logger.Info("Cleaning up Gateway that is no longer needed", "name", gatewayName)
if err := r.Delete(ctx, gateway); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Gateway: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Gateway: %w", err)
}
// 清理VirtualService
vs := &istionetworkingv1.VirtualService{}
err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs)
if err == nil {
logger.Info("Cleaning up Gateway VirtualService that is no longer needed", "name", vsName)
if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Gateway VirtualService: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Gateway VirtualService: %w", err)
}
return nil
}
// cleanupMesh 清理不再需要的Mesh资源
func (r *ApplicationReconciler) cleanupMesh(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-mesh-vs"
drName := app.Name + "-dr"
// 清理VirtualService
vs := &istionetworkingv1.VirtualService{}
err := r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: app.Namespace}, vs)
if err == nil {
logger.Info("Cleaning up Mesh VirtualService that is no longer needed", "name", vsName)
if err := r.Delete(ctx, vs); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete Mesh VirtualService: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Mesh VirtualService: %w", err)
}
// 清理DestinationRule
dr := &istionetworkingv1.DestinationRule{}
err = r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr)
if err == nil {
logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName)
if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete DestinationRule: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get DestinationRule: %w", err)
}
return nil
}
// 为了在SetupWithManager中注册Istio资源监控
func (r *ApplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&applicationv1.Application{}).
Owns(&apps_v1.Deployment{}).
Owns(&apps_v1.StatefulSet{}).
Owns(&core_v1.Service{}).
// 添加对Istio资源的监控
// Owns(&istionetworkingv1.VirtualService{}).
//Owns(&istionetworkingv1.Gateway{}).
//Owns(&istionetworkingv1.DestinationRule{}).
Complete(r)
}
func (r *ApplicationReconciler) reconcileService(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
serviceName := app.Name + "-svc"
// 检查是否需要创建 Service
shouldCreate := false
if app.Spec.Service != nil {
shouldCreate = app.Spec.Service.Enabled
} else {
// 向后兼容:使用旧的 expose 配置
shouldCreate = app.Spec.Expose && len(app.Spec.Template.Ports) > 0
}
// 获取现有 Service
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{
Name: serviceName,
Namespace: app.Namespace,
}, service)
serviceExists := !errors.IsNotFound(err)
if !shouldCreate {
// 如果不需要 Service 但存在,则删除
if serviceExists {
logger.Info("Deleting existing Service as it's disabled", "name", serviceName)
if err := r.Delete(ctx, service); err != nil {
return fmt.Errorf("failed to delete service: %w", err)
}
}
return nil
}
// 需要创建 Service
if !serviceExists {
// Service 不存在,创建新的
logger.Info("Creating new Service", "name", serviceName)
newService, err := application_controller_utils.NewService(app)
if err != nil {
return fmt.Errorf("failed to generate service: %w", err)
}
if newService == nil {
logger.Info("Service creation skipped", "name", serviceName)
return nil
}
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, newService, r.Scheme); err != nil {
return fmt.Errorf("failed to set controller reference: %w", err)
}
if err := r.Create(ctx, newService); err != nil {
return fmt.Errorf("failed to create service: %w", err)
}
logger.Info("Successfully created Service",
"name", serviceName,
"type", newService.Spec.Type,
"ports", len(newService.Spec.Ports))
return nil
}
// Service 存在,检查是否需要更新
if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
logger.Info("Checking Service for updates", "name", serviceName)
updatedService, err := application_controller_utils.NewService(app)
if err != nil {
return fmt.Errorf("failed to generate updated service: %w", err)
}
if updatedService == nil {
logger.Info("Service should be deleted", "name", serviceName)
if err := r.Delete(ctx, service); err != nil {
return fmt.Errorf("failed to delete service: %w", err)
}
return nil
}
// 检查是否需要更新
needsUpdate := false
updateFields := make(map[string]interface{})
// 检查服务类型
if service.Spec.Type != updatedService.Spec.Type {
logger.Info("Service type changed",
"current", service.Spec.Type,
"desired", updatedService.Spec.Type)
needsUpdate = true
updateFields["type"] = updatedService.Spec.Type
service.Spec.Type = updatedService.Spec.Type
}
// 检查端口配置
if !equalServicePorts(service.Spec.Ports, updatedService.Spec.Ports) {
logger.Info("Service ports changed")
needsUpdate = true
updateFields["ports"] = "changed"
service.Spec.Ports = updatedService.Spec.Ports
}
// 检查 LoadBalancer 配置
if service.Spec.LoadBalancerIP != updatedService.Spec.LoadBalancerIP {
needsUpdate = true
updateFields["loadBalancerIP"] = updatedService.Spec.LoadBalancerIP
service.Spec.LoadBalancerIP = updatedService.Spec.LoadBalancerIP
}
if !equalStringSlices(service.Spec.LoadBalancerSourceRanges, updatedService.Spec.LoadBalancerSourceRanges) {
needsUpdate = true
updateFields["loadBalancerSourceRanges"] = "changed"
service.Spec.LoadBalancerSourceRanges = updatedService.Spec.LoadBalancerSourceRanges
}
// 检查 ExternalName
if service.Spec.ExternalName != updatedService.Spec.ExternalName {
needsUpdate = true
updateFields["externalName"] = updatedService.Spec.ExternalName
service.Spec.ExternalName = updatedService.Spec.ExternalName
}
// 检查会话亲和性
if service.Spec.SessionAffinity != updatedService.Spec.SessionAffinity {
needsUpdate = true
updateFields["sessionAffinity"] = updatedService.Spec.SessionAffinity
service.Spec.SessionAffinity = updatedService.Spec.SessionAffinity
}
// 更新标签
if !equalStringMaps(service.Labels, updatedService.Labels) {
needsUpdate = true
updateFields["labels"] = "changed"
if service.Labels == nil {
service.Labels = make(map[string]string)
}
for k, v := range updatedService.Labels {
service.Labels[k] = v
}
}
// 更新注解
if !equalStringMaps(service.Annotations, updatedService.Annotations) {
needsUpdate = true
updateFields["annotations"] = "changed"
if service.Annotations == nil {
service.Annotations = make(map[string]string)
}
for k, v := range updatedService.Annotations {
service.Annotations[k] = v
}
}
if needsUpdate {
logger.Info("Updating Service", "name", serviceName, "fields", updateFields)
if err := r.Update(ctx, service); err != nil {
return fmt.Errorf("failed to update service: %w", err)
}
logger.Info("Service updated successfully", "name", serviceName)
} else {
logger.Info("No service updates needed", "name", serviceName)
}
return nil
}
// 添加辅助函数:比较服务端口
func equalServicePorts(current, desired []core_v1.ServicePort) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]core_v1.ServicePort)
for _, port := range current {
currentMap[port.Name] = port
}
for _, port := range desired {
if currentPort, exists := currentMap[port.Name]; !exists ||
currentPort.Port != port.Port ||
currentPort.TargetPort != port.TargetPort ||
currentPort.Protocol != port.Protocol ||
currentPort.NodePort != port.NodePort {
return false
}
}
return true
}
// 添加辅助函数:比较字符串切片
func equalStringSlices(current, desired []string) bool {
if len(current) != len(desired) {
return false
}
currentMap := make(map[string]bool)
for _, item := range current {
currentMap[item] = true
}
for _, item := range desired {
if !currentMap[item] {
return false
}
}
return true
}
// reconcileGateway 处理Gateway资源
func (r *ApplicationReconciler) reconcileGateway(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
gatewayName := app.Name + "-gateway"
// 创建或更新Gateway资源
gateway := &istionetworkingv1.Gateway{
ObjectMeta: metav1.ObjectMeta{
Name: gatewayName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, gateway, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, gateway, r.Scheme); err != nil {
return err
}
// 配置Gateway
return r.configureGateway(gateway, app)
})
if err != nil {
return fmt.Errorf("failed to create or update Gateway: %w", err)
}
logger.Info("Gateway reconciled", "name", gatewayName, "operation", op)
// 协调与Gateway关联的VirtualService
if err := r.reconcileGatewayVirtualService(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile gateway VirtualService: %w", err)
}
return nil
}
// configureGateway 配置Gateway资源
func (r *ApplicationReconciler) configureGateway(gateway *istionetworkingv1.Gateway, app *applicationv1.Application) error {
// 设置Gateway选择器
gateway.Spec.Selector = map[string]string{
"istio": "ingressgateway",
}
// 清空服务器列表,准备重新添加
gateway.Spec.Servers = []*istioapinetworkingv1.Server{}
// 如果未配置端口则添加默认HTTP端口
if len(app.Spec.NetworkPolicy.Gateway.Ports) == 0 {
gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: 80,
Protocol: "HTTP",
Name: "http",
},
Hosts: getHosts(app),
})
} else {
// 添加所有配置的端口
for _, port := range app.Spec.NetworkPolicy.Gateway.Ports {
server := &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: uint32(port.Number),
Protocol: port.Protocol,
Name: port.Name,
},
Hosts: getHosts(app),
}
gateway.Spec.Servers = append(gateway.Spec.Servers, server)
}
}
// 配置TLS
if len(app.Spec.NetworkPolicy.Gateway.TLS) > 0 {
for _, tls := range app.Spec.NetworkPolicy.Gateway.TLS {
// 查找相应协议的服务器
var serverIndex = -1
for j, server := range gateway.Spec.Servers {
if server.Port.Protocol == "HTTPS" || server.Port.Protocol == "TLS" {
serverIndex = j
break
}
}
// 如果没有找到HTTPS/TLS服务器创建一个
if serverIndex == -1 {
gateway.Spec.Servers = append(gateway.Spec.Servers, &istioapinetworkingv1.Server{
Port: &istioapinetworkingv1.Port{
Number: 443,
Protocol: "HTTPS",
Name: "https",
},
Hosts: getHosts(app),
})
serverIndex = len(gateway.Spec.Servers) - 1
}
// 配置TLS设置
server := gateway.Spec.Servers[serverIndex]
server.Tls = &istioapinetworkingv1.ServerTLSSettings{
Mode: getIstioTLSMode(tls.Mode),
CredentialName: tls.SecretName,
}
// 设置最小TLS版本
if tls.MinProtocolVersion != "" {
server.Tls.MinProtocolVersion = getIstioTLSVersion(tls.MinProtocolVersion)
}
// 如果指定了特定主机,覆盖默认主机
if len(tls.Hosts) > 0 {
server.Hosts = tls.Hosts
}
}
}
return nil
}
// 辅助函数
func getHosts(app *applicationv1.Application) []string {
// 如果NetworkPolicy.Gateway.Hosts存在使用它
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Gateway != nil && len(app.Spec.NetworkPolicy.Gateway.Hosts) > 0 {
return app.Spec.NetworkPolicy.Gateway.Hosts
}
// 不再检查旧的Ingress配置
// 使用通配符
return []string{"*"}
}
func getIstioTLSMode(mode string) istioapinetworkingv1.ServerTLSSettings_TLSmode {
switch mode {
case "MUTUAL":
return istioapinetworkingv1.ServerTLSSettings_MUTUAL
case "PASSTHROUGH":
return istioapinetworkingv1.ServerTLSSettings_PASSTHROUGH
default:
return istioapinetworkingv1.ServerTLSSettings_SIMPLE
}
}
func getIstioTLSVersion(version string) istioapinetworkingv1.ServerTLSSettings_TLSProtocol {
switch version {
case "TLSv1_0":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_0
case "TLSv1_1":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_1
case "TLSv1_3":
return istioapinetworkingv1.ServerTLSSettings_TLSV1_3
default:
return istioapinetworkingv1.ServerTLSSettings_TLSV1_2
}
}
// reconcileGatewayVirtualService 处理与Gateway关联的VirtualService
func (r *ApplicationReconciler) reconcileGatewayVirtualService(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-gateway-vs"
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新VirtualService
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
}
// 配置VirtualService
return r.configureGatewayVirtualService(vs, app, service)
})
if err != nil {
return fmt.Errorf("failed to create or update VirtualService: %w", err)
}
logger.Info("Gateway VirtualService reconciled", "name", vsName, "operation", op)
return nil
}
// configureGatewayVirtualService 配置与Gateway关联的VirtualService
func (r *ApplicationReconciler) configureGatewayVirtualService(vs *istionetworkingv1.VirtualService, app *applicationv1.Application, service *core_v1.Service) error {
// 设置基本字段
vs.Spec.Hosts = getHosts(app)
vs.Spec.Gateways = []string{app.Name + "-gateway"}
// 创建HTTP路由
httpRoute := &istioapinetworkingv1.HTTPRoute{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
},
},
}
// 配置匹配条件
if app.Spec.NetworkPolicy.Mesh != nil && len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 {
for _, route := range app.Spec.NetworkPolicy.Mesh.Routes {
if route.Match != nil {
match := &istioapinetworkingv1.HTTPMatchRequest{}
// URI匹配
if route.Match.URI != nil {
match.Uri = convertStringMatch(route.Match.URI)
}
// 方法匹配
if route.Match.Method != nil {
match.Method = convertStringMatch(route.Match.Method)
}
// 头部匹配
if len(route.Match.Headers) > 0 {
match.Headers = make(map[string]*istioapinetworkingv1.StringMatch)
for key, value := range route.Match.Headers {
match.Headers[key] = convertStringMatch(&value)
}
}
httpRoute.Match = append(httpRoute.Match, match)
}
}
} else {
// 默认匹配所有路径
httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{
{
Uri: &istioapinetworkingv1.StringMatch{
MatchType: &istioapinetworkingv1.StringMatch_Prefix{
Prefix: "/",
},
},
},
}
}
// 配置超时
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Timeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000
httpRoute.Timeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
// 配置重试
if app.Spec.NetworkPolicy.Mesh != nil && app.Spec.NetworkPolicy.Mesh.Retry != nil {
retryOn := "5xx,gateway-error,connect-failure"
if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 {
retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",")
}
retry := &istioapinetworkingv1.HTTPRetry{
Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts,
RetryOn: retryOn,
}
if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000
retry.PerTryTimeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
httpRoute.Retries = retry
}
vs.Spec.Http = []*istioapinetworkingv1.HTTPRoute{httpRoute}
return nil
}
// convertStringMatch 将Application CRD的StringMatch转换为Istio API的StringMatch
func convertStringMatch(match *applicationv1.StringMatch) *istioapinetworkingv1.StringMatch {
if match == nil {
return nil
}
result := &istioapinetworkingv1.StringMatch{}
if match.Exact != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Exact{
Exact: match.Exact,
}
} else if match.Prefix != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Prefix{
Prefix: match.Prefix,
}
} else if match.Regex != "" {
result.MatchType = &istioapinetworkingv1.StringMatch_Regex{
Regex: match.Regex,
}
}
return result
}
// reconcileMesh 处理服务网格配置
func (r *ApplicationReconciler) reconcileMesh(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 注入Sidecar (如果需要)
if app.Spec.NetworkPolicy.Mesh.Sidecar != nil && app.Spec.NetworkPolicy.Mesh.Sidecar.Inject {
if err := r.ensureSidecarInjection(ctx, app); err != nil {
return fmt.Errorf("failed to ensure sidecar injection: %w", err)
}
}
// 协调东西向流量的VirtualService
if err := r.reconcileMeshVirtualService(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile mesh VirtualService: %w", err)
}
// 协调DestinationRule (熔断器)
if app.Spec.NetworkPolicy.Mesh.CircuitBreaker != nil {
if err := r.reconcileDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to reconcile DestinationRule: %w", err)
}
} else {
// 清理不再需要的DestinationRule
if err := r.cleanupDestinationRule(ctx, app); err != nil {
return fmt.Errorf("failed to cleanup DestinationRule: %w", err)
}
}
logger.Info("Mesh resources reconciled", "name", app.Name)
return nil
}
// ensureSidecarInjection 确保应用的工作负载启用了Sidecar注入
func (r *ApplicationReconciler) ensureSidecarInjection(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
// 检查应用类型,更新相应的工作负载
if app.Spec.Template.Type == "stateful" {
// 更新StatefulSet
statefulSet := &apps_v1.StatefulSet{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, statefulSet)
if err == nil {
// 设置注入注解
if statefulSet.Spec.Template.Annotations == nil {
statefulSet.Spec.Template.Annotations = make(map[string]string)
}
statefulSet.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true"
if err := r.Update(ctx, statefulSet); err != nil {
return fmt.Errorf("failed to update StatefulSet for sidecar injection: %w", err)
}
logger.Info("Enabled sidecar injection for StatefulSet", "name", statefulSet.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get StatefulSet: %w", err)
}
} else {
// 更新Deployment
deployment := &apps_v1.Deployment{}
err := r.Get(ctx, types.NamespacedName{
Name: app.Name,
Namespace: app.Namespace,
}, deployment)
if err == nil {
// 设置注入注解
if deployment.Spec.Template.Annotations == nil {
deployment.Spec.Template.Annotations = make(map[string]string)
}
deployment.Spec.Template.Annotations["sidecar.istio.io/inject"] = "true"
if err := r.Update(ctx, deployment); err != nil {
return fmt.Errorf("failed to update Deployment for sidecar injection: %w", err)
}
logger.Info("Enabled sidecar injection for Deployment", "name", deployment.Name)
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get Deployment: %w", err)
}
}
return nil
}
// reconcileMeshVirtualService 处理服务网格内的VirtualService
func (r *ApplicationReconciler) reconcileMeshVirtualService(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
vsName := app.Name + "-mesh-vs"
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping mesh VirtualService creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新VirtualService
vs := &istionetworkingv1.VirtualService{
ObjectMeta: metav1.ObjectMeta{
Name: vsName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, vs, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, vs, r.Scheme); err != nil {
return err
}
// 配置VirtualService
vs.Spec.Hosts = []string{service.Name}
vs.Spec.Gateways = []string{"mesh"} // 服务网格内部流量
// 创建HTTP路由
httpRoutes := []*istioapinetworkingv1.HTTPRoute{}
// 如果定义了路由规则,使用它们
if len(app.Spec.NetworkPolicy.Mesh.Routes) > 0 {
for _, route := range app.Spec.NetworkPolicy.Mesh.Routes {
httpRoute := &istioapinetworkingv1.HTTPRoute{}
// 配置匹配条件
if route.Match != nil {
match := &istioapinetworkingv1.HTTPMatchRequest{}
// URI匹配
if route.Match.URI != nil {
match.Uri = convertStringMatch(route.Match.URI)
}
// 方法匹配
if route.Match.Method != nil {
match.Method = convertStringMatch(route.Match.Method)
}
// 头部匹配
if len(route.Match.Headers) > 0 {
match.Headers = make(map[string]*istioapinetworkingv1.StringMatch)
for key, value := range route.Match.Headers {
match.Headers[key] = convertStringMatch(&value)
}
}
httpRoute.Match = []*istioapinetworkingv1.HTTPMatchRequest{match}
}
// 配置目标
destination := &istioapinetworkingv1.HTTPRouteDestination{
Destination: &istioapinetworkingv1.Destination{
Host: route.Destination.Host,
},
Weight: route.Weight,
}
if route.Destination.Subset != "" {
destination.Destination.Subset = route.Destination.Subset
}
if route.Destination.Port > 0 {
destination.Destination.Port = &istioapinetworkingv1.PortSelector{
Number: uint32(route.Destination.Port),
}
}
httpRoute.Route = []*istioapinetworkingv1.HTTPRouteDestination{destination}
httpRoutes = append(httpRoutes, httpRoute)
}
} else {
// 默认路由
httpRoute := &istioapinetworkingv1.HTTPRoute{
Route: []*istioapinetworkingv1.HTTPRouteDestination{
{
Destination: &istioapinetworkingv1.Destination{
Host: service.Name,
},
},
},
}
httpRoutes = append(httpRoutes, httpRoute)
}
// 配置超时
if app.Spec.NetworkPolicy.Mesh.Timeout > 0 {
for _, route := range httpRoutes {
seconds := app.Spec.NetworkPolicy.Mesh.Timeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Timeout % 1000) * 1000000
route.Timeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
}
// 配置重试
if app.Spec.NetworkPolicy.Mesh.Retry != nil {
for _, route := range httpRoutes {
retryOn := "5xx,gateway-error,connect-failure"
if len(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn) > 0 {
retryOn = strings.Join(app.Spec.NetworkPolicy.Mesh.Retry.RetryOn, ",")
}
retry := &istioapinetworkingv1.HTTPRetry{
Attempts: app.Spec.NetworkPolicy.Mesh.Retry.Attempts,
RetryOn: retryOn,
}
if app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout > 0 {
seconds := app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout / 1000
nanos := (app.Spec.NetworkPolicy.Mesh.Retry.PerTryTimeout % 1000) * 1000000
retry.PerTryTimeout = &durationpb.Duration{
Seconds: int64(seconds),
Nanos: int32(nanos),
}
}
route.Retries = retry
}
}
// 配置故障注入(测试用途)
if app.Spec.NetworkPolicy.Mesh.FaultInjection != nil {
for _, route := range httpRoutes {
fault := &istioapinetworkingv1.HTTPFaultInjection{}
// 延迟故障
if app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay != nil {
delayDuration := &durationpb.Duration{
Seconds: int64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay / 1000),
Nanos: int32((app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.FixedDelay % 1000) * 1000000),
}
fault.Delay = &istioapinetworkingv1.HTTPFaultInjection_Delay{
HttpDelayType: &istioapinetworkingv1.HTTPFaultInjection_Delay_FixedDelay{
FixedDelay: delayDuration,
},
Percentage: &istioapinetworkingv1.Percent{
Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Delay.Percentage) / 100.0,
},
}
}
// 中止故障
if app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort != nil {
httpStatus := int32(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.HttpStatus)
fault.Abort = &istioapinetworkingv1.HTTPFaultInjection_Abort{
ErrorType: &istioapinetworkingv1.HTTPFaultInjection_Abort_HttpStatus{
HttpStatus: httpStatus,
},
Percentage: &istioapinetworkingv1.Percent{
Value: float64(app.Spec.NetworkPolicy.Mesh.FaultInjection.Abort.Percentage) / 100.0,
},
}
}
route.Fault = fault
}
}
vs.Spec.Http = httpRoutes
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update mesh VirtualService: %w", err)
}
logger.Info("Mesh VirtualService reconciled", "name", vsName, "operation", op)
return nil
}
// reconcileDestinationRule 处理DestinationRule(熔断器)
func (r *ApplicationReconciler) reconcileDestinationRule(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
drName := app.Name + "-dr"
// 检查服务是否存在
serviceName := app.Name + "-svc"
service := &core_v1.Service{}
err := r.Get(ctx, types.NamespacedName{Name: serviceName, Namespace: app.Namespace}, service)
if errors.IsNotFound(err) {
logger.Info("Service not found, skipping DestinationRule creation", "service", serviceName)
return nil
} else if err != nil {
return fmt.Errorf("failed to get service: %w", err)
}
// 创建或更新DestinationRule
dr := &istionetworkingv1.DestinationRule{
ObjectMeta: metav1.ObjectMeta{
Name: drName,
Namespace: app.Namespace,
},
}
op, err := k8s_sigs_controller_runtime_utils.CreateOrUpdate(ctx, r.Client, dr, func() error {
// 设置控制器引用
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, dr, r.Scheme); err != nil {
return err
}
// 基本配置
dr.Spec.Host = service.Name
// 配置熔断器
var cb *applicationv1.CircuitBreaker
// 从NetworkPolicy.Mesh中获取熔断器配置
if app.Spec.NetworkPolicy != nil && app.Spec.NetworkPolicy.Mesh != nil {
cb = app.Spec.NetworkPolicy.Mesh.CircuitBreaker
}
// 如果NetworkPolicy中没有尝试从旧版TrafficPolicy中获取
if cb == nil && app.Spec.TrafficPolicy != nil {
cb = app.Spec.TrafficPolicy.CircuitBreaker
}
if cb != nil {
// 连接池设置
connectionPool := &istioapinetworkingv1.ConnectionPoolSettings{
Http: &istioapinetworkingv1.ConnectionPoolSettings_HTTPSettings{
Http1MaxPendingRequests: 100,
MaxRequestsPerConnection: 1,
},
Tcp: &istioapinetworkingv1.ConnectionPoolSettings_TCPSettings{
MaxConnections: 100,
},
}
// 异常检测(熔断器核心逻辑)
maxEjectionPercent := uint32(100)
if cb.MaxEjectionPercent > 0 {
maxEjectionPercent = uint32(cb.MaxEjectionPercent)
}
outlierDetection := &istioapinetworkingv1.OutlierDetection{
ConsecutiveErrors: int32(cb.ConsecutiveErrors),
Interval: &durationpb.Duration{
Seconds: 1,
},
BaseEjectionTime: &durationpb.Duration{
Seconds: int64(cb.BaseEjectionTime),
},
MaxEjectionPercent: int32(maxEjectionPercent),
}
// 负载均衡策略
var loadBalancer *istioapinetworkingv1.LoadBalancerSettings
if app.Spec.NetworkPolicy != nil &&
app.Spec.NetworkPolicy.Mesh != nil &&
app.Spec.NetworkPolicy.Mesh.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.NetworkPolicy.Mesh.LoadBalancer)
} else if app.Spec.TrafficPolicy != nil &&
app.Spec.TrafficPolicy.LoadBalancer != nil {
loadBalancer = convertLoadBalancerSettings(app.Spec.TrafficPolicy.LoadBalancer)
} else {
loadBalancer = &istioapinetworkingv1.LoadBalancerSettings{
LbPolicy: &istioapinetworkingv1.LoadBalancerSettings_Simple{
Simple: istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN,
},
}
}
dr.Spec.TrafficPolicy = &istioapinetworkingv1.TrafficPolicy{
ConnectionPool: connectionPool,
OutlierDetection: outlierDetection,
LoadBalancer: loadBalancer,
}
}
return nil
})
if err != nil {
return fmt.Errorf("failed to create or update DestinationRule: %w", err)
}
logger.Info("DestinationRule reconciled", "name", drName, "operation", op)
return nil
}
// cleanupDestinationRule 清理不再需要的DestinationRule
func (r *ApplicationReconciler) cleanupDestinationRule(ctx context.Context, app *applicationv1.Application) error {
logger := log.FromContext(ctx)
drName := app.Name + "-dr"
dr := &istionetworkingv1.DestinationRule{}
err := r.Get(ctx, types.NamespacedName{Name: drName, Namespace: app.Namespace}, dr)
if err == nil {
logger.Info("Cleaning up DestinationRule that is no longer needed", "name", drName)
if err := r.Delete(ctx, dr); err != nil && !errors.IsNotFound(err) {
return fmt.Errorf("failed to delete DestinationRule: %w", err)
}
} else if !errors.IsNotFound(err) {
return fmt.Errorf("failed to get DestinationRule: %w", err)
}
return nil
}
// 辅助函数 - 转换负载均衡设置
func convertLoadBalancerSettings(settings *applicationv1.LoadBalancerSettings) *istioapinetworkingv1.LoadBalancerSettings {
result := &istioapinetworkingv1.LoadBalancerSettings{}
if settings.ConsistentHash != nil {
consistentHash := &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB{}
if settings.ConsistentHash.HttpHeaderName != "" {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpHeaderName{
HttpHeaderName: settings.ConsistentHash.HttpHeaderName,
}
} else if settings.ConsistentHash.UseSourceIp {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_UseSourceIp{
UseSourceIp: true,
}
} else if settings.ConsistentHash.HttpCookie != nil {
consistentHash.HashKey = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HttpCookie{
HttpCookie: &istioapinetworkingv1.LoadBalancerSettings_ConsistentHashLB_HTTPCookie{
Name: settings.ConsistentHash.HttpCookie.Name,
Path: settings.ConsistentHash.HttpCookie.Path,
Ttl: &durationpb.Duration{
Seconds: int64(settings.ConsistentHash.HttpCookie.Ttl),
},
},
}
}
result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_ConsistentHash{
ConsistentHash: consistentHash,
}
} else {
// 简单负载均衡
var simpleType istioapinetworkingv1.LoadBalancerSettings_SimpleLB
switch settings.Simple {
case "LEAST_CONN":
simpleType = istioapinetworkingv1.LoadBalancerSettings_LEAST_CONN
case "RANDOM":
simpleType = istioapinetworkingv1.LoadBalancerSettings_RANDOM
case "PASSTHROUGH":
simpleType = istioapinetworkingv1.LoadBalancerSettings_PASSTHROUGH
default:
simpleType = istioapinetworkingv1.LoadBalancerSettings_ROUND_ROBIN
}
result.LbPolicy = &istioapinetworkingv1.LoadBalancerSettings_Simple{
Simple: simpleType,
}
}
return result
}
// 此处已删除重复的 reconcileIstioTraffic 函数实现保留第139行的定义