450 lines
16 KiB
Go
450 lines
16 KiB
Go
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
||
|
||
package devcontainer
|
||
|
||
import (
|
||
"context"
|
||
"strconv"
|
||
"strings"
|
||
|
||
"k8s.io/apimachinery/pkg/api/errors"
|
||
"k8s.io/apimachinery/pkg/types"
|
||
|
||
"k8s.io/apimachinery/pkg/runtime"
|
||
ctrl "sigs.k8s.io/controller-runtime"
|
||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||
"sigs.k8s.io/controller-runtime/pkg/log"
|
||
|
||
devcontainer_v1 "code.gitea.io/gitea/modules/k8s/api/devcontainer/v1"
|
||
devcontainer_controller_utils "code.gitea.io/gitea/modules/k8s/controller/devcontainer/utils"
|
||
apps_v1 "k8s.io/api/apps/v1"
|
||
core_v1 "k8s.io/api/core/v1"
|
||
k8s_sigs_controller_runtime_utils "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
||
)
|
||
|
||
// DevcontainerAppReconciler reconciles a DevcontainerApp object
|
||
type DevcontainerAppReconciler struct {
|
||
client.Client
|
||
Scheme *runtime.Scheme
|
||
}
|
||
|
||
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps,verbs=get;list;watch;create;update;patch;delete
|
||
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/status,verbs=get;update;patch
|
||
// +kubebuilder:rbac:groups=devcontainer.devstar.cn,resources=devcontainerapps/finalizers,verbs=update
|
||
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=create;delete;get;list;watch
|
||
// +kubebuilder:rbac:groups="",resources=services,verbs=create;delete;get;list;watch
|
||
// +kubebuilder:rbac:groups="",resources=persistentvolumeclaims,verbs=get;list;watch;delete
|
||
|
||
// Reconcile is part of the main kubernetes reconciliation loop which aims to
|
||
// move the current state of the cluster closer to the desired state.
|
||
// Modify the Reconcile function to compare the state specified by
|
||
// the DevcontainerApp object against the actual cluster state, and then
|
||
// perform operations to make the cluster state reflect the state specified by
|
||
// the user.
|
||
//
|
||
// For more details, check Reconcile and its Result here:
|
||
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.19.0/pkg/reconcile
|
||
|
||
func (r *DevcontainerAppReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
|
||
logger := log.FromContext(ctx)
|
||
var err error
|
||
|
||
// 1. 读取缓存中的 DevcontainerApp
|
||
app := &devcontainer_v1.DevcontainerApp{}
|
||
err = r.Get(ctx, req.NamespacedName, app)
|
||
if err != nil {
|
||
// 当 CRD 资源 "DevcontainerApp" 被删除后,直接返回空结果,跳过剩下步骤
|
||
return ctrl.Result{}, client.IgnoreNotFound(err)
|
||
}
|
||
|
||
// 添加 finalizer 处理逻辑
|
||
finalizerName := "devcontainer.devstar.cn/finalizer"
|
||
|
||
// 检查对象是否正在被删除
|
||
if !app.ObjectMeta.DeletionTimestamp.IsZero() {
|
||
// 对象正在被删除 - 处理 finalizer
|
||
if k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
|
||
// 执行清理操作
|
||
logger.Info("Cleaning up resources before deletion", "name", app.Name)
|
||
|
||
// 查找并删除关联的 PVC
|
||
if err := r.cleanupPersistentVolumeClaims(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to clean up PVCs")
|
||
return ctrl.Result{}, err
|
||
}
|
||
|
||
// 删除完成后移除 finalizer
|
||
k8s_sigs_controller_runtime_utils.RemoveFinalizer(app, finalizerName)
|
||
if err := r.Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to remove finalizer")
|
||
return ctrl.Result{}, err
|
||
}
|
||
}
|
||
|
||
// 已标记为删除且处理完成,允许继续删除流程
|
||
return ctrl.Result{}, nil
|
||
}
|
||
|
||
// 如果对象不包含 finalizer,就添加它
|
||
if !k8s_sigs_controller_runtime_utils.ContainsFinalizer(app, finalizerName) {
|
||
logger.Info("Adding finalizer", "name", app.Name)
|
||
k8s_sigs_controller_runtime_utils.AddFinalizer(app, finalizerName)
|
||
if err := r.Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to add finalizer")
|
||
return ctrl.Result{}, err
|
||
}
|
||
}
|
||
|
||
// 检查停止容器的注解
|
||
if desiredReplicas, exists := app.Annotations["devstar.io/desiredReplicas"]; exists && desiredReplicas == "0" {
|
||
logger.Info("DevContainer stop requested via annotation", "name", app.Name)
|
||
|
||
// 获取当前的 StatefulSet
|
||
statefulSetInNamespace := &apps_v1.StatefulSet{}
|
||
err = r.Get(ctx, req.NamespacedName, statefulSetInNamespace)
|
||
if err == nil {
|
||
// 设置副本数为0
|
||
replicas := int32(0)
|
||
statefulSetInNamespace.Spec.Replicas = &replicas
|
||
if err := r.Update(ctx, statefulSetInNamespace); err != nil {
|
||
logger.Error(err, "Failed to scale down StatefulSet replicas to 0")
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("StatefulSet scaled down to 0 replicas due to stop request")
|
||
|
||
// 标记容器为未就绪
|
||
app.Status.Ready = false
|
||
if err := r.Status().Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to update DevcontainerApp status")
|
||
return ctrl.Result{}, err
|
||
}
|
||
|
||
// 继续处理其他逻辑(如更新 Service)
|
||
}
|
||
}
|
||
|
||
// 2. 根据 DevcontainerApp 配置信息进行处理
|
||
// 2.1 StatefulSet 处理
|
||
statefulSet := devcontainer_controller_utils.NewStatefulSet(app)
|
||
err = k8s_sigs_controller_runtime_utils.SetControllerReference(app, statefulSet, r.Scheme)
|
||
if err != nil {
|
||
return ctrl.Result{}, err
|
||
}
|
||
|
||
// 2.2 查找 集群中同名称的 StatefulSet
|
||
statefulSetInNamespace := &apps_v1.StatefulSet{}
|
||
err = r.Get(ctx, req.NamespacedName, statefulSetInNamespace)
|
||
if err != nil {
|
||
if !errors.IsNotFound(err) {
|
||
return ctrl.Result{}, err
|
||
}
|
||
err = r.Create(ctx, statefulSet)
|
||
if err != nil && !errors.IsAlreadyExists(err) {
|
||
logger.Error(err, "Failed to create StatefulSet")
|
||
return ctrl.Result{}, err
|
||
}
|
||
} else {
|
||
// 处理重启注解
|
||
if restartedAt, exists := app.Annotations["devstar.io/restartedAt"]; exists {
|
||
// 检查注解是否已经应用到StatefulSet
|
||
needsRestart := true
|
||
|
||
if statefulSetInNamespace.Spec.Template.Annotations != nil {
|
||
if currentRestartTime, exists := statefulSetInNamespace.Spec.Template.Annotations["devstar.io/restartedAt"]; exists && currentRestartTime == restartedAt {
|
||
needsRestart = false
|
||
}
|
||
} else {
|
||
statefulSetInNamespace.Spec.Template.Annotations = make(map[string]string)
|
||
}
|
||
|
||
if needsRestart {
|
||
logger.Info("DevContainer restart requested", "name", app.Name, "time", restartedAt)
|
||
|
||
// 将重启注解传递到 Pod 模板以触发滚动更新
|
||
statefulSetInNamespace.Spec.Template.Annotations["devstar.io/restartedAt"] = restartedAt
|
||
|
||
// 确保副本数至少为1(防止之前被停止)
|
||
replicas := int32(1)
|
||
if statefulSetInNamespace.Spec.Replicas != nil && *statefulSetInNamespace.Spec.Replicas > 0 {
|
||
replicas = *statefulSetInNamespace.Spec.Replicas
|
||
}
|
||
statefulSetInNamespace.Spec.Replicas = &replicas
|
||
|
||
if err := r.Update(ctx, statefulSetInNamespace); err != nil {
|
||
logger.Error(err, "Failed to update StatefulSet for restart")
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("StatefulSet restarted successfully")
|
||
}
|
||
}
|
||
|
||
// 若 StatefulSet.Status.readyReplicas 变化,则更新 DevcontainerApp.Status.Ready 域
|
||
if statefulSetInNamespace.Status.ReadyReplicas > 0 {
|
||
app.Status.Ready = true
|
||
if err := r.Status().Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to update DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready)
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("DevContainer is READY", "ReadyReplicas", statefulSetInNamespace.Status.ReadyReplicas)
|
||
} else if app.Status.Ready {
|
||
// 只有当目前状态为Ready但实际不再Ready时才更新
|
||
app.Status.Ready = false
|
||
if err := r.Status().Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to un-mark DevcontainerApp.Status.Ready", "DevcontainerApp.Status.Ready", app.Status.Ready)
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("DevContainer is NOT ready", "ReadyReplicas", statefulSetInNamespace.Status.ReadyReplicas)
|
||
}
|
||
|
||
// 修复方法:加上判断条件,避免循环触发更新
|
||
needsUpdate := false
|
||
|
||
// 检查镜像是否变更
|
||
if app.Spec.StatefulSet.Image != statefulSetInNamespace.Spec.Template.Spec.Containers[0].Image {
|
||
needsUpdate = true
|
||
}
|
||
|
||
// 检查副本数 - 如果指定了 desiredReplicas 注解但不为 0(停止已在前面处理)
|
||
if desiredReplicas, exists := app.Annotations["devstar.io/desiredReplicas"]; exists && desiredReplicas != "0" {
|
||
replicas, err := strconv.ParseInt(desiredReplicas, 10, 32)
|
||
if err == nil {
|
||
currentReplicas := int32(1) // 默认值
|
||
if statefulSetInNamespace.Spec.Replicas != nil {
|
||
currentReplicas = *statefulSetInNamespace.Spec.Replicas
|
||
}
|
||
|
||
if currentReplicas != int32(replicas) {
|
||
r32 := int32(replicas)
|
||
statefulSet.Spec.Replicas = &r32
|
||
needsUpdate = true
|
||
}
|
||
}
|
||
}
|
||
|
||
if needsUpdate {
|
||
if err := r.Update(ctx, statefulSet); err != nil {
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("StatefulSet updated", "name", statefulSet.Name)
|
||
}
|
||
}
|
||
|
||
// 2.3 Service 处理
|
||
service := devcontainer_controller_utils.NewService(app)
|
||
if err := k8s_sigs_controller_runtime_utils.SetControllerReference(app, service, r.Scheme); err != nil {
|
||
return ctrl.Result{}, err
|
||
}
|
||
serviceInCluster := &core_v1.Service{}
|
||
err = r.Get(ctx, types.NamespacedName{Name: app.Name, Namespace: app.Namespace}, serviceInCluster)
|
||
if err != nil {
|
||
if !errors.IsNotFound(err) {
|
||
return ctrl.Result{}, err
|
||
}
|
||
err = r.Create(ctx, service)
|
||
if err == nil {
|
||
// 创建 NodePort Service 成功只执行一次 ==> 将NodePort 端口分配信息更新到 app.Status
|
||
logger.Info("[DevStar][DevContainer] NodePort Assigned", "nodePortAssigned", service.Spec.Ports[0].NodePort)
|
||
|
||
// 设置主 SSH 端口的 NodePort
|
||
app.Status.NodePortAssigned = uint16(service.Spec.Ports[0].NodePort)
|
||
|
||
// 处理额外端口
|
||
extraPortsAssigned := []devcontainer_v1.ExtraPortAssigned{}
|
||
|
||
// 处理额外端口,从第二个端口开始(索引为1)
|
||
// 因为第一个端口(索引为0)是 SSH 端口
|
||
for i := 1; i < len(service.Spec.Ports); i++ {
|
||
port := service.Spec.Ports[i]
|
||
|
||
// 查找对应的端口规格
|
||
var containerPort uint16 = 0
|
||
|
||
// 如果存在额外端口配置,尝试匹配
|
||
if app.Spec.Service.ExtraPorts != nil {
|
||
for _, ep := range app.Spec.Service.ExtraPorts {
|
||
if (ep.Name != "" && ep.Name == port.Name) ||
|
||
(uint16(port.Port) == ep.ServicePort) {
|
||
containerPort = ep.ContainerPort
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 如果没有找到匹配项,使用目标端口
|
||
if containerPort == 0 && port.TargetPort.IntVal > 0 {
|
||
containerPort = uint16(port.TargetPort.IntVal)
|
||
}
|
||
|
||
// 添加到额外端口列表
|
||
extraPortsAssigned = append(extraPortsAssigned, devcontainer_v1.ExtraPortAssigned{
|
||
Name: port.Name,
|
||
ServicePort: uint16(port.Port),
|
||
ContainerPort: containerPort,
|
||
NodePort: uint16(port.NodePort),
|
||
})
|
||
|
||
logger.Info("[DevStar][DevContainer] Extra Port NodePort Assigned",
|
||
"name", port.Name,
|
||
"servicePort", port.Port,
|
||
"nodePort", port.NodePort)
|
||
}
|
||
|
||
// 更新 CRD 状态,包括额外端口
|
||
app.Status.ExtraPortsAssigned = extraPortsAssigned
|
||
|
||
if err := r.Status().Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to update NodePorts of DevcontainerApp",
|
||
"nodePortAssigned", service.Spec.Ports[0].NodePort,
|
||
"extraPortsCount", len(extraPortsAssigned))
|
||
return ctrl.Result{}, err
|
||
}
|
||
} else if !errors.IsAlreadyExists(err) {
|
||
logger.Error(err, "Failed to create DevcontainerApp NodePort Service", "nodePortServiceName", service.Name)
|
||
return ctrl.Result{}, err
|
||
}
|
||
} else {
|
||
// Service 已存在,检查它的端口信息
|
||
// 检查是否需要更新状态
|
||
needStatusUpdate := false
|
||
|
||
// 如果主端口未记录,记录之
|
||
if app.Status.NodePortAssigned == 0 && len(serviceInCluster.Spec.Ports) > 0 {
|
||
app.Status.NodePortAssigned = uint16(serviceInCluster.Spec.Ports[0].NodePort)
|
||
needStatusUpdate = true
|
||
logger.Info("[DevStar][DevContainer] Found existing main NodePort",
|
||
"nodePort", serviceInCluster.Spec.Ports[0].NodePort)
|
||
}
|
||
|
||
// 处理额外端口
|
||
if len(serviceInCluster.Spec.Ports) > 1 {
|
||
// 如果额外端口状态为空,或者数量不匹配
|
||
if app.Status.ExtraPortsAssigned == nil ||
|
||
len(app.Status.ExtraPortsAssigned) != len(serviceInCluster.Spec.Ports)-1 {
|
||
|
||
extraPortsAssigned := []devcontainer_v1.ExtraPortAssigned{}
|
||
|
||
// 从索引 1 开始,跳过主端口
|
||
for i := 1; i < len(serviceInCluster.Spec.Ports); i++ {
|
||
port := serviceInCluster.Spec.Ports[i]
|
||
|
||
// 查找对应的端口规格
|
||
var containerPort uint16 = 0
|
||
|
||
// 如果存在额外端口配置,尝试匹配
|
||
if app.Spec.Service.ExtraPorts != nil {
|
||
for _, ep := range app.Spec.Service.ExtraPorts {
|
||
if (ep.Name != "" && ep.Name == port.Name) ||
|
||
(uint16(port.Port) == ep.ServicePort) {
|
||
containerPort = ep.ContainerPort
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
// 如果没有找到匹配项,使用目标端口
|
||
if containerPort == 0 && port.TargetPort.IntVal > 0 {
|
||
containerPort = uint16(port.TargetPort.IntVal)
|
||
}
|
||
|
||
// 添加到额外端口列表
|
||
extraPortsAssigned = append(extraPortsAssigned, devcontainer_v1.ExtraPortAssigned{
|
||
Name: port.Name,
|
||
ServicePort: uint16(port.Port),
|
||
ContainerPort: containerPort,
|
||
NodePort: uint16(port.NodePort),
|
||
})
|
||
|
||
logger.Info("[DevStar][DevContainer] Found existing extra NodePort",
|
||
"name", port.Name,
|
||
"nodePort", port.NodePort)
|
||
}
|
||
|
||
// 更新额外端口状态
|
||
app.Status.ExtraPortsAssigned = extraPortsAssigned
|
||
needStatusUpdate = true
|
||
}
|
||
}
|
||
|
||
// 如果需要更新状态
|
||
if needStatusUpdate {
|
||
if err := r.Status().Update(ctx, app); err != nil {
|
||
logger.Error(err, "Failed to update NodePorts status for existing service")
|
||
return ctrl.Result{}, err
|
||
}
|
||
logger.Info("[DevStar][DevContainer] Updated NodePorts status for existing service",
|
||
"mainNodePort", app.Status.NodePortAssigned,
|
||
"extraPortsCount", len(app.Status.ExtraPortsAssigned))
|
||
}
|
||
}
|
||
return ctrl.Result{}, nil
|
||
}
|
||
|
||
// cleanupPersistentVolumeClaims 查找并删除与 DevcontainerApp 关联的所有 PVC
|
||
func (r *DevcontainerAppReconciler) cleanupPersistentVolumeClaims(ctx context.Context, app *devcontainer_v1.DevcontainerApp) error {
|
||
logger := log.FromContext(ctx)
|
||
|
||
// 查找关联的 PVC
|
||
pvcList := &core_v1.PersistentVolumeClaimList{}
|
||
|
||
// 按标签筛选
|
||
labelSelector := client.MatchingLabels{
|
||
"app": app.Name,
|
||
}
|
||
if err := r.List(ctx, pvcList, client.InNamespace(app.Namespace), labelSelector); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 如果按标签没找到,尝试按名称模式查找
|
||
if len(pvcList.Items) == 0 {
|
||
if err := r.List(ctx, pvcList, client.InNamespace(app.Namespace)); err != nil {
|
||
return err
|
||
}
|
||
|
||
// 筛选出名称包含 DevcontainerApp 名称的 PVC
|
||
var filteredItems []core_v1.PersistentVolumeClaim
|
||
for _, pvc := range pvcList.Items {
|
||
// StatefulSet PVC 命名格式通常为: <volumeClaimTemplate名称>-<StatefulSet名称>-<序号>
|
||
// 检查是否包含 app 名称作为名称的一部分
|
||
if strings.Contains(pvc.Name, app.Name+"-") {
|
||
filteredItems = append(filteredItems, pvc)
|
||
logger.Info("Found PVC to delete", "name", pvc.Name)
|
||
}
|
||
}
|
||
pvcList.Items = filteredItems
|
||
}
|
||
|
||
// 删除找到的 PVC
|
||
for i := range pvcList.Items {
|
||
logger.Info("Deleting PVC", "name", pvcList.Items[i].Name)
|
||
if err := r.Delete(ctx, &pvcList.Items[i]); err != nil && !errors.IsNotFound(err) {
|
||
logger.Error(err, "Failed to delete PVC", "name", pvcList.Items[i].Name)
|
||
return err
|
||
}
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
// SetupWithManager sets up the controller with the Manager.
|
||
func (r *DevcontainerAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
|
||
return ctrl.NewControllerManagedBy(mgr).
|
||
For(&devcontainer_v1.DevcontainerApp{}).
|
||
Owns(&apps_v1.StatefulSet{}).
|
||
Owns(&core_v1.Service{}).
|
||
Complete(r)
|
||
}
|