Files
devstar-create-from-template/services/runners/runners.go

436 lines
11 KiB
Go
Raw Normal View History

2025-08-25 15:46:12 +08:00
package runners
import (
"context"
"fmt"
"net"
"strings"
"time"
actions_module "code.gitea.io/gitea/models/actions"
docker_module "code.gitea.io/gitea/modules/docker"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// RegistGlobalRunner requests a runner registration token and registers
// setting.Runner.Count runners with it.
//
// The token is requested with both scope arguments set to 0 (presumably the
// instance-wide scope; confirm against actions_module.NewRunnerToken).
// Registration stops at the first failure and the error is returned; runners
// registered before the failure are left in place.
func RegistGlobalRunner(ctx context.Context) error {
	log.Info("获取全局RunnerToken...")
	actionRunnerToken, err := actions_module.NewRunnerToken(ctx, 0, 0)
	if err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As.
		return fmt.Errorf("获取全局RunnerToken失败:%w", err)
	}

	runnerCount := setting.Runner.Count
	for i := 0; i < runnerCount; i++ {
		if err := RegistRunner(ctx, actionRunnerToken.Token); err != nil {
			return fmt.Errorf("注册Runner失败:%w", err)
		}
	}
	return nil
}
// checkK8sIsEnable reports the value of setting.K8sConfig.Enable, which the
// rest of this file uses to choose between the Kubernetes and Docker backends.
func checkK8sIsEnable() bool {
	k8sEnabled := setting.K8sConfig.Enable
	return k8sEnabled
}
// RegistRunner registers a single runner with the given registration token.
//
// The runner is created on Kubernetes when checkK8sIsEnable reports true and
// on the local Docker daemon otherwise. Any backend error is returned wrapped.
func RegistRunner(ctx context.Context, token string) error {
	log.Info("开始注册Runner...")

	var err error
	if checkK8sIsEnable() {
		err = registK8sRunner(ctx, token)
	} else {
		err = registDockerRunner(ctx, token)
	}
	if err != nil {
		return fmt.Errorf("注册Runner失败:%w", err)
	}

	// The registration token is a credential, so it is deliberately not
	// written to the log.
	log.Info("Runner注册成功")
	return nil
}
// registDockerRunner starts one runner container on the Docker daemon and
// passes it the registration token through its environment.
//
// It creates a Docker client, pulls setting.Runner.Image, builds the instance
// URL from the scheme of setting.AppURL, the first address returned by
// getLocalIP and setting.HTTPPort, and then creates and starts the container
// with /var/run/docker.sock bind-mounted.
//
// The container and runner share the name
// "runner-<yyyymmddHHMMSS>-<nanoseconds>". The nanosecond suffix keeps the
// names distinct when several runners are registered within the same second;
// a seconds-only timestamp would give them identical container names.
func registDockerRunner(ctx context.Context, token string) error {
	log.Info("开始注册Runner...")
	cli, err := docker_module.CreateDockerClient(ctx)
	if err != nil {
		return err
	}
	defer cli.Close()

	// Locate the Docker socket; PullImage needs it.
	dockerHost, err := docker_module.GetDockerSocketPath()
	if err != nil {
		return fmt.Errorf("获取docker socket路径失败:%w", err)
	}

	// Pull the runner image.
	if err = docker_module.PullImage(cli, dockerHost, setting.Runner.Image); err != nil {
		return fmt.Errorf("拉取act_runner镜像失败:%w", err)
	}

	// Resolve an address of this host.
	ips, err := getLocalIP()
	if err != nil {
		return fmt.Errorf("获取本机IP失败:%w", err)
	}

	// Take the scheme from AppURL. Fall back to "http" when AppURL has no
	// "scheme://" prefix instead of using the whole string as the scheme.
	scheme := "http"
	if i := strings.Index(setting.AppURL, "://"); i > 0 {
		scheme = setting.AppURL[:i]
	}
	instanceURL := scheme + "://" + ips[0] + ":" + setting.HTTPPort

	now := time.Now()
	runnerName := fmt.Sprintf("runner-%s-%09d", now.Format("20060102150405"), now.Nanosecond())

	// Runner configuration handed to the container.
	env := []string{
		"GITEA_INSTANCE_URL=" + instanceURL,
		"GITEA_RUNNER_REGISTRATION_TOKEN=" + token,
		"GITEA_RUNNER_NAME=" + runnerName,
	}
	binds := []string{
		"/var/run/docker.sock:/var/run/docker.sock",
	}

	// Create and start the runner container.
	err = docker_module.CreateAndStartContainer(cli, setting.Runner.Image, nil, env, binds, nil, runnerName)
	if err != nil {
		return fmt.Errorf("创建并注册Runner失败:%w", err)
	}
	return nil
}
// DeleteRunnerByName removes the runner called runnerName.
//
// It deletes a Kubernetes Deployment when checkK8sIsEnable reports true and a
// Docker container otherwise. Any backend error is returned wrapped.
func DeleteRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始停止并删除容器: %s", runnerName)

	var err error
	if checkK8sIsEnable() {
		err = deleteK8sRunnerByName(ctx, runnerName)
	} else {
		err = deleteDockerRunnerByName(ctx, runnerName)
	}
	if err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As.
		return fmt.Errorf("删除Runner失败:%w", err)
	}

	log.Info("Runner删除成功: %s", runnerName)
	return nil
}
// deleteDockerRunnerByName deletes the Docker container named runnerName
// through docker_module.DeleteContainer.
//
// It returns an error when the Docker client cannot be created or when the
// container cannot be deleted.
func deleteDockerRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始停止并删除容器: %s", runnerName)

	// Create the Docker client.
	cli, err := docker_module.CreateDockerClient(ctx)
	if err != nil {
		return fmt.Errorf("Docker client创建失败:%w", err)
	}
	log.Info("[StopAndRemoveContainer]Docker client创建成功")
	defer cli.Close()

	if err = docker_module.DeleteContainer(cli, runnerName); err != nil {
		// This used to be reported as a creation failure, which was misleading.
		return fmt.Errorf("Runner容器删除失败:%w", err)
	}
	return nil
}
// getLocalIP returns the IPv4 addresses, in dotted-decimal form, of every
// network interface that is up and is not a loopback interface.
//
// Interfaces whose addresses cannot be read are skipped. Loopback and
// non-IPv4 addresses are ignored. An error is returned when the interface
// list cannot be obtained or when no address qualifies.
func getLocalIP() ([]string, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return nil, err
	}

	var found []string
	for i := range ifaces {
		nic := &ifaces[i]

		isUp := nic.Flags&net.FlagUp != 0
		isLoopback := nic.Flags&net.FlagLoopback != 0
		if !isUp || isLoopback {
			continue
		}

		addrList, addrErr := nic.Addrs()
		if addrErr != nil {
			// Best effort: an unreadable interface does not abort the scan.
			continue
		}

		for _, a := range addrList {
			var candidate net.IP
			if ipNet, ok := a.(*net.IPNet); ok {
				candidate = ipNet.IP
			} else if ipAddr, ok := a.(*net.IPAddr); ok {
				candidate = ipAddr.IP
			}
			if candidate == nil || candidate.IsLoopback() {
				continue
			}
			// To4 yields nil for anything that is not an IPv4 address.
			if v4 := candidate.To4(); v4 != nil {
				found = append(found, v4.String())
			}
		}
	}

	if len(found) == 0 {
		return nil, fmt.Errorf("no valid IP address found")
	}
	return found, nil
}
// getK8sUrlAndToken returns setting.K8sConfig.Url and setting.K8sConfig.Token.
//
// It returns an error when checkK8sIsEnable reports false, or when either of
// the two values is empty.
func getK8sUrlAndToken() (string, string, error) {
	if enabled := checkK8sIsEnable(); !enabled {
		return "", "", fmt.Errorf("K8s未启用")
	}

	apiURL, bearer := setting.K8sConfig.Url, setting.K8sConfig.Token
	if apiURL != "" && bearer != "" {
		return apiURL, bearer, nil
	}
	return "", "", fmt.Errorf("K8s配置不完整")
}
// registK8sRunner creates one runner Deployment in the "act-runner" namespace
// of the configured Kubernetes cluster.
//
// It reads the cluster URL and token from the settings, checks that the
// cluster is reachable, builds a clientset, resolves the instance URL, builds
// the Deployment with createRunnerDeployment and submits it. Every failure is
// returned wrapped with the step that failed.
func registK8sRunner(ctx context.Context, token string) error {
	// The registration token is a credential, so it is deliberately not
	// written to the log.
	log.Info("开始注册Kubernetes Runner")

	k8sURL, k8sToken, err := getK8sUrlAndToken()
	if err != nil {
		return fmt.Errorf("获取K8s配置失败: %w", err)
	}

	// Check connectivity before doing anything else.
	if err = testKubernetesConnection(k8sURL, k8sToken); err != nil {
		return fmt.Errorf("Kubernetes连接测试失败: %w", err)
	}

	// Create the Kubernetes client.
	clientset, err := createKubernetesClient(k8sURL, k8sToken)
	if err != nil {
		return fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}

	// Resolve the URL the runner uses to reach this instance.
	instanceURL, err := getInstanceURL()
	if err != nil {
		return fmt.Errorf("获取实例URL失败: %w", err)
	}

	// Build the runner Deployment object.
	deployment, err := createRunnerDeployment(token, instanceURL)
	if err != nil {
		return fmt.Errorf("创建Runner Deployment配置失败: %w", err)
	}

	// The namespace is fixed, and deleteK8sRunnerByName uses the same value.
	// NOTE(review): the original carried a commented-out read of
	// setting.K8sConfig.Namespace; switch to it once that setting exists.
	const namespace = "act-runner"

	// Submit the Deployment to the cluster.
	if _, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("在Kubernetes中创建Runner Deployment失败: %w", err)
	}

	log.Info("成功在Kubernetes中创建Runner: %s", deployment.Name)
	return nil
}
// getInstanceURL returns the URL a runner should use to reach this instance.
//
// setting.AppURL is returned unchanged when it is non-empty and contains
// neither "127.0.0.1" nor "localhost". Otherwise the URL is built as
// "<scheme>://<first local IPv4>:<setting.HTTPPort>", where the scheme is
// "https" only if AppURL starts with "https://" and "http" in every other
// case.
//
// NOTE(review): the loopback test is a substring match, so a public host
// whose name merely contains "localhost" is also treated as local.
func getInstanceURL() (string, error) {
	appURL := setting.AppURL

	// Use the configured AppURL directly when it is not a loopback address.
	if appURL != "" &&
		!strings.Contains(appURL, "127.0.0.1") &&
		!strings.Contains(appURL, "localhost") {
		log.Info("使用配置的AppURL: %s", appURL)
		return appURL, nil
	}

	// Otherwise build the URL from a local address.
	ips, err := getLocalIP()
	if err != nil {
		return "", fmt.Errorf("获取本机IP失败: %w", err)
	}
	if len(ips) == 0 {
		return "", fmt.Errorf("没有找到有效的IP地址")
	}

	// Decide the scheme from the URL prefix only, ignoring case. A substring
	// match could be fooled by "https://" appearing later in the URL.
	scheme := "http"
	if strings.HasPrefix(strings.ToLower(appURL), "https://") {
		scheme = "https"
	}

	// Build the URL from the first address found.
	instanceURL := scheme + "://" + ips[0] + ":" + setting.HTTPPort
	log.Info("构建的实例URL: %s", instanceURL)
	return instanceURL, nil
}
// deleteK8sRunnerByName deletes the Deployment named runnerName from the
// "act-runner" namespace of the configured Kubernetes cluster.
//
// The cluster URL and token are validated through getK8sUrlAndToken, as
// registK8sRunner does, so an incomplete configuration is reported up front
// instead of producing a client with an empty host or token.
func deleteK8sRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始删除K8s Runner: %s", runnerName)

	k8sURL, k8sToken, err := getK8sUrlAndToken()
	if err != nil {
		return fmt.Errorf("获取K8s配置失败: %w", err)
	}

	// Create the Kubernetes client.
	clientset, err := createKubernetesClient(k8sURL, k8sToken)
	if err != nil {
		return fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}

	// Must match the namespace used when the Deployment was created.
	const namespace = "act-runner"

	// Delete the Deployment.
	if err = clientset.AppsV1().Deployments(namespace).Delete(ctx, runnerName, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("删除K8s Runner Deployment失败: %w", err)
	}

	log.Info("成功删除K8s Runner Deployment: %s", runnerName)
	return nil
}
// createKubernetesClient builds a clientset for the API server at k8sURL that
// authenticates with the bearer token.
//
// NOTE(review): TLSClientConfig.Insecure is true, so the API server
// certificate is not verified while the bearer token is being sent. It is
// kept as is because no CA setting is visible in this file; supply CA data
// and drop Insecure once the configuration can provide it.
func createKubernetesClient(k8sURL, token string) (*kubernetes.Clientset, error) {
	config := &rest.Config{
		Host:        k8sURL,
		BearerToken: token,
		TLSClientConfig: rest.TLSClientConfig{
			Insecure: true,
		},
	}

	// Create the client.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As.
		return nil, fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}
	return clientset, nil
}
// testKubernetesConnection checks that the cluster at k8sURL answers requests
// made with token by listing at most one node, with a 10-second timeout.
//
// The check uses its own context.Background-based context, since the function
// takes no context. The node list is a cluster-scoped request, so the token
// needs permission to list nodes for the check to pass.
func testKubernetesConnection(k8sURL, token string) error {
	clientset, err := createKubernetesClient(k8sURL, token)
	if err != nil {
		return err
	}

	// Probe the connection by fetching the node list.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if _, err = clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1}); err != nil {
		// %w keeps the cause reachable through errors.Is / errors.As.
		return fmt.Errorf("无法连接到Kubernetes集群: %w", err)
	}

	log.Info("Kubernetes连接测试成功")
	return nil
}
// createRunnerDeployment builds, but does not submit, the Deployment object
// for one runner.
//
// The Deployment runs a single replica of setting.Runner.Image. It passes
// instanceURL, token and the generated runner name to the container through
// environment variables, and mounts the node's /var/run/docker.sock.
//
// The name is "act-runner-<yyyymmddHHMMSS>-<nanoseconds>". The nanosecond
// suffix keeps names distinct when several runners are created within the
// same second; a seconds-only timestamp would give them the same Deployment
// name. The name is also set as the "runner-name" label on the pod template
// and in the selector, so each Deployment selects only its own pods and not
// every pod labelled app=act-runner.
//
// The returned error is always nil; it is part of the signature for callers.
func createRunnerDeployment(token, instanceURL string) (*appsv1.Deployment, error) {
	now := time.Now()
	name := fmt.Sprintf("act-runner-%s-%09d", now.Format("20060102150405"), now.Nanosecond())

	labels := map[string]string{
		"app":     "act-runner",
		"type":    "runner",
		"version": "1.0",
	}

	// The replica count is fixed at one.
	replicas := int32(1)

	// seconds returns a pointer to its own copy of v, so the tolerations do
	// not share one pointer.
	seconds := func(v int64) *int64 { return &v }

	// Assemble the Deployment.
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"app":         "act-runner",
					"runner-name": name,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"app":         "act-runner",
						"runner-name": name,
					},
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "act-runner", // matches the container name in the existing configuration
							Image: setting.Runner.Image,
							Ports: []corev1.ContainerPort{
								{
									Name:          "http-0",
									ContainerPort: 3000,
									Protocol:      corev1.ProtocolTCP,
								},
							},
							Env: []corev1.EnvVar{
								{
									Name:  "GITEA_INSTANCE_URL",
									Value: instanceURL,
								},
								{
									Name:  "GITEA_RUNNER_REGISTRATION_TOKEN",
									Value: token,
								},
								{
									Name:  "GITEA_RUNNER_NAME", // optional: sets the runner's display name
									Value: name,
								},
							},
							// No resource limits, matching the existing configuration (resources: {}).
							Resources: corev1.ResourceRequirements{},
							// Mount the Docker socket.
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "docker-sock",
									MountPath: "/var/run/docker.sock",
								},
							},
							ImagePullPolicy: corev1.PullIfNotPresent,
						},
					},
					// Docker socket volume taken from the node.
					Volumes: []corev1.Volume{
						{
							Name: "docker-sock",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/var/run/docker.sock",
								},
							},
						},
					},
					RestartPolicy:      corev1.RestartPolicyAlways,
					ServiceAccountName: "default", // matches the existing configuration
					DNSPolicy:          corev1.DNSClusterFirst,
					// NOTE(review): pods are pinned to the node whose hostname
					// label is "node1"; this should come from configuration.
					NodeSelector: map[string]string{
						"kubernetes.io/hostname": "node1",
					},
					// Keep pods for 300 seconds on not-ready or unreachable nodes.
					Tolerations: []corev1.Toleration{
						{
							Key:               "node.kubernetes.io/not-ready",
							Operator:          corev1.TolerationOpExists,
							Effect:            corev1.TaintEffectNoExecute,
							TolerationSeconds: seconds(300),
						},
						{
							Key:               "node.kubernetes.io/unreachable",
							Operator:          corev1.TolerationOpExists,
							Effect:            corev1.TaintEffectNoExecute,
							TolerationSeconds: seconds(300),
						},
					},
				},
			},
		},
	}
	return deployment, nil
}