Files
devstar-create-from-template/services/runners/runners.go
2025-08-25 15:46:12 +08:00

436 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package runners
import (
"context"
"fmt"
"net"
"strings"
"time"
actions_module "code.gitea.io/gitea/models/actions"
docker_module "code.gitea.io/gitea/modules/docker"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// RegistGlobalRunner obtains a runner registration token via
// actions_module.NewRunnerToken(ctx, 0, 0) and uses it to register
// setting.Runner.Count runners, one RegistRunner call per runner.
//
// NOTE(review): the two zero arguments are presumably the owner and repo IDs,
// making the token instance-wide — confirm against NewRunnerToken.
//
// Registration stops at the first failure. The returned error wraps the
// underlying cause, so callers can inspect it with errors.Is / errors.As.
func RegistGlobalRunner(ctx context.Context) error {
	log.Info("获取全局RunnerToken...")
	actionRunnerToken, err := actions_module.NewRunnerToken(ctx, 0, 0)
	if err != nil {
		return fmt.Errorf("获取全局RunnerToken失败:%w", err)
	}

	runnerCount := setting.Runner.Count
	for i := 0; i < runnerCount; i++ {
		if err := RegistRunner(ctx, actionRunnerToken.Token); err != nil {
			return fmt.Errorf("注册Runner失败:%w", err)
		}
	}
	return nil
}
// checkK8sIsEnable reports the value of setting.K8sConfig.Enable, which the
// rest of this file uses to choose between the Kubernetes and Docker backends.
func checkK8sIsEnable() bool {
	enabled := setting.K8sConfig.Enable
	return enabled
}
// RegistRunner registers a single runner using the given registration token.
//
// The backend is chosen by checkK8sIsEnable: a Kubernetes Deployment when K8s
// is enabled, otherwise a Docker container. The returned error wraps the
// backend's error.
func RegistRunner(ctx context.Context, token string) error {
	log.Info("开始注册Runner...")

	var err error
	if checkK8sIsEnable() {
		err = registK8sRunner(ctx, token)
	} else {
		err = registDockerRunner(ctx, token)
	}
	if err != nil {
		return fmt.Errorf("注册Runner失败:%w", err)
	}

	// The registration token is a credential, so it is deliberately kept out
	// of the log output.
	log.Info("Runner注册成功")
	return nil
}
// registDockerRunner starts an act_runner container on the local Docker
// daemon and hands it the registration token through its environment.
//
// Steps: create a Docker client, pull setting.Runner.Image, derive the
// instance URL from the scheme of setting.AppURL, the first local IPv4
// address and setting.HTTPPort, then create and start the container with the
// host Docker socket bind-mounted.
//
// The container name (also passed as GITEA_RUNNER_NAME) is
// "runner-<yyyymmddHHMMSS>-<nanoseconds>". The nanosecond part keeps names
// unique when several runners are registered within the same second.
func registDockerRunner(ctx context.Context, token string) error {
	log.Info("开始注册Runner...")
	cli, err := docker_module.CreateDockerClient(ctx)
	if err != nil {
		return err
	}
	defer cli.Close()

	// Resolve the Docker socket path needed by PullImage.
	dockerHost, err := docker_module.GetDockerSocketPath()
	if err != nil {
		return fmt.Errorf("获取docker socket路径失败:%w", err)
	}

	// Pull the act_runner image.
	if err = docker_module.PullImage(cli, dockerHost, setting.Runner.Image); err != nil {
		return fmt.Errorf("拉取act_runner镜像失败:%w", err)
	}

	// Look up the local IPv4 addresses of this host.
	ips, err := getLocalIP()
	if err != nil {
		return fmt.Errorf("获取本机IP失败:%w", err)
	}
	if len(ips) == 0 {
		// Defensive: never index into an empty slice.
		return fmt.Errorf("没有找到有效的IP地址")
	}

	// Build the instance URL: scheme of AppURL + first local IP + HTTP port.
	conntype := strings.SplitN(setting.AppURL, "://", 2)[0]
	instanceURL := conntype + "://" + ips[0] + ":" + setting.HTTPPort

	// Second-resolution timestamps collide when runners are registered in a
	// tight loop, so the nanosecond component is appended.
	now := time.Now()
	containerName := fmt.Sprintf("runner-%s-%09d", now.Format("20060102150405"), now.Nanosecond())

	// Runner configuration passed via environment variables.
	env := []string{
		"GITEA_INSTANCE_URL=" + instanceURL,
		"GITEA_RUNNER_REGISTRATION_TOKEN=" + token,
		"GITEA_RUNNER_NAME=" + containerName,
	}
	binds := []string{
		"/var/run/docker.sock:/var/run/docker.sock",
	}

	// Create and start the runner container.
	err = docker_module.CreateAndStartContainer(cli, setting.Runner.Image, nil, env, binds, nil, containerName)
	if err != nil {
		return fmt.Errorf("创建并注册Runner失败:%w", err)
	}
	return nil
}
// DeleteRunnerByName removes the runner identified by runnerName.
//
// The backend is chosen by checkK8sIsEnable: the Kubernetes Deployment of
// that name when K8s is enabled, otherwise the Docker container of that name.
// The returned error wraps the backend's error.
func DeleteRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始停止并删除容器: %s", runnerName)

	var err error
	if checkK8sIsEnable() {
		err = deleteK8sRunnerByName(ctx, runnerName)
	} else {
		err = deleteDockerRunnerByName(ctx, runnerName)
	}
	if err != nil {
		return fmt.Errorf("删除Runner失败:%w", err)
	}

	log.Info("Runner删除成功: %s", runnerName)
	return nil
}
// deleteDockerRunnerByName deletes the Docker container named runnerName via
// docker_module.DeleteContainer.
//
// It returns a wrapped error if the Docker client cannot be created or the
// container cannot be deleted.
func deleteDockerRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始停止并删除容器: %s", runnerName)

	// Create the Docker client.
	cli, err := docker_module.CreateDockerClient(ctx)
	if err != nil {
		return fmt.Errorf("Docker client创建失败:%w", err)
	}
	defer cli.Close()
	log.Info("[StopAndRemoveContainer]Docker client创建成功")

	if err = docker_module.DeleteContainer(cli, runnerName); err != nil {
		// This is the delete path, so the message must report a deletion
		// failure rather than a creation failure.
		return fmt.Errorf("Runner删除失败:%w", err)
	}
	return nil
}
// getLocalIP collects the IPv4 addresses assigned to this host's network
// interfaces, as dotted-decimal strings.
//
// Interfaces that are down or flagged as loopback are skipped, as are
// interfaces whose address list cannot be read. Loopback and non-IPv4
// addresses are ignored. An error is returned when the interfaces cannot be
// enumerated or when no address qualifies.
func getLocalIP() ([]string, error) {
	ifaces, err := net.Interfaces()
	if err != nil {
		return nil, err
	}

	var found []string
	for _, nic := range ifaces {
		isUp := nic.Flags&net.FlagUp != 0
		isLoopback := nic.Flags&net.FlagLoopback != 0
		if !isUp || isLoopback {
			continue
		}

		nicAddrs, addrErr := nic.Addrs()
		if addrErr != nil {
			// Best effort: an unreadable interface does not abort the scan.
			continue
		}

		// Walk the addresses bound to this interface.
		for _, a := range nicAddrs {
			var candidate net.IP
			if ipNet, ok := a.(*net.IPNet); ok {
				candidate = ipNet.IP
			} else if ipAddr, ok := a.(*net.IPAddr); ok {
				candidate = ipAddr.IP
			}
			if candidate == nil || candidate.IsLoopback() {
				continue
			}
			// To4 yields nil for anything that is not an IPv4 address.
			if v4 := candidate.To4(); v4 != nil {
				found = append(found, v4.String())
			}
		}
	}

	if len(found) == 0 {
		return nil, fmt.Errorf("no valid IP address found")
	}
	return found, nil
}
// getK8sUrlAndToken returns the Kubernetes API URL and bearer token from
// setting.K8sConfig, in that order.
//
// It returns an error when K8s is not enabled (per checkK8sIsEnable) or when
// either the URL or the token is empty.
func getK8sUrlAndToken() (string, string, error) {
	if enabled := checkK8sIsEnable(); !enabled {
		return "", "", fmt.Errorf("K8s未启用")
	}

	apiURL, bearer := setting.K8sConfig.Url, setting.K8sConfig.Token
	if len(apiURL) == 0 || len(bearer) == 0 {
		return "", "", fmt.Errorf("K8s配置不完整")
	}
	return apiURL, bearer, nil
}
// registK8sRunner registers a runner by creating an act_runner Deployment in
// the Kubernetes cluster configured in setting.K8sConfig.
//
// Steps: read the K8s URL and token, check connectivity, build a client,
// resolve the instance URL, build the Deployment object, and create it in the
// "act-runner" namespace. Each failure is returned wrapped with the name of
// the step that failed.
func registK8sRunner(ctx context.Context, token string) error {
	// The registration token is a credential, so it is deliberately kept out
	// of the log output.
	log.Info("开始注册Kubernetes Runner")

	k8sURL, k8sToken, err := getK8sUrlAndToken()
	if err != nil {
		return fmt.Errorf("获取K8s配置失败: %w", err)
	}

	// Check connectivity before attempting to create anything.
	if err = testKubernetesConnection(k8sURL, k8sToken); err != nil {
		return fmt.Errorf("Kubernetes连接测试失败: %w", err)
	}

	// Create the Kubernetes client.
	clientset, err := createKubernetesClient(k8sURL, k8sToken)
	if err != nil {
		return fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}

	// Resolve the URL the runner will use to reach this instance.
	instanceURL, err := getInstanceURL()
	if err != nil {
		return fmt.Errorf("获取实例URL失败: %w", err)
	}

	// Build the runner Deployment object.
	deployment, err := createRunnerDeployment(token, instanceURL)
	if err != nil {
		return fmt.Errorf("创建Runner Deployment配置失败: %w", err)
	}

	// Deploy to Kubernetes. The namespace is fixed for now and must match the
	// one used by deleteK8sRunnerByName.
	// TODO: read it from configuration (e.g. setting.K8sConfig.Namespace) once
	// such a setting exists.
	const namespace = "act-runner"
	_, err = clientset.AppsV1().Deployments(namespace).Create(ctx, deployment, metav1.CreateOptions{})
	if err != nil {
		return fmt.Errorf("在Kubernetes中创建Runner Deployment失败: %w", err)
	}

	log.Info("成功在Kubernetes中创建Runner: %s", deployment.Name)
	return nil
}
// getInstanceURL returns the URL a runner should use to reach this instance.
//
// If setting.AppURL is non-empty and contains neither "127.0.0.1" nor
// "localhost", it is returned unchanged. Otherwise the URL is built from the
// first local IPv4 address and setting.HTTPPort, using "https" when AppURL
// contains "https://" and "http" otherwise.
//
// An error is returned when no local IPv4 address can be determined.
func getInstanceURL() (string, error) {
	// Use the configured AppURL directly unless it points at the loopback host.
	if setting.AppURL != "" &&
		!strings.Contains(setting.AppURL, "127.0.0.1") &&
		!strings.Contains(setting.AppURL, "localhost") {
		log.Info("使用配置的AppURL: %s", setting.AppURL)
		return setting.AppURL, nil
	}

	// Otherwise build the URL from a local address.
	ips, err := getLocalIP()
	if err != nil {
		return "", fmt.Errorf("获取本机IP失败: %w", err)
	}
	if len(ips) == 0 {
		// Defensive: never index into an empty slice.
		return "", fmt.Errorf("没有找到有效的IP地址")
	}

	// Use the first address; keep the scheme in line with AppURL.
	conntype := "http"
	if strings.Contains(setting.AppURL, "https://") {
		conntype = "https"
	}
	instanceURL := conntype + "://" + ips[0] + ":" + setting.HTTPPort
	log.Info("构建的实例URL: %s", instanceURL)
	return instanceURL, nil
}
// deleteK8sRunnerByName deletes the Deployment named runnerName from the
// "act-runner" namespace of the cluster configured in setting.K8sConfig.
//
// It returns a wrapped error if the client cannot be created or the delete
// call fails.
func deleteK8sRunnerByName(ctx context.Context, runnerName string) error {
	log.Info("开始删除K8s Runner: %s", runnerName)

	// Create the Kubernetes client.
	clientset, err := createKubernetesClient(setting.K8sConfig.Url, setting.K8sConfig.Token)
	if err != nil {
		return fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}

	// Must be the same namespace the Deployment was created in.
	const namespace = "act-runner"

	// Delete the Deployment.
	err = clientset.AppsV1().Deployments(namespace).Delete(ctx, runnerName, metav1.DeleteOptions{})
	if err != nil {
		return fmt.Errorf("删除K8s Runner Deployment失败: %w", err)
	}

	log.Info("成功删除K8s Runner Deployment: %s", runnerName)
	return nil
}
// createKubernetesClient builds a Kubernetes clientset for the API server at
// k8sURL, authenticating with the given bearer token.
//
// NOTE(security): TLSClientConfig.Insecure is true, so the API server's
// certificate is NOT verified and the bearer token can be exposed to a
// man-in-the-middle. Supplying the cluster CA instead would fix this, but it
// needs a configuration value this file does not have, so the setting is left
// unchanged here.
func createKubernetesClient(k8sURL, token string) (*kubernetes.Clientset, error) {
	config := &rest.Config{
		Host:        k8sURL,
		BearerToken: token,
		TLSClientConfig: rest.TLSClientConfig{
			Insecure: true,
		},
	}

	// Create the client.
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("创建Kubernetes客户端失败: %w", err)
	}
	return clientset, nil
}
// testKubernetesConnection checks that the cluster at k8sURL is reachable
// with the given token by listing at most one node, with a 10-second timeout.
//
// The function takes no context parameter, so the timeout is derived from
// context.Background(). The returned error wraps the underlying client error.
func testKubernetesConnection(k8sURL, token string) error {
	clientset, err := createKubernetesClient(k8sURL, token)
	if err != nil {
		return err
	}

	// List nodes (limit 1) as a cheap connectivity probe.
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if _, err = clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{Limit: 1}); err != nil {
		return fmt.Errorf("无法连接到Kubernetes集群: %w", err)
	}

	log.Info("Kubernetes连接测试成功")
	return nil
}
// createRunnerDeployment builds (but does not submit) the Deployment object
// for one act_runner instance.
//
// The Deployment is named "act-runner-<yyyymmddHHMMSS>-<nanoseconds>"; the
// nanosecond part keeps names unique when several runners are created within
// the same second. The same name is passed to the runner as GITEA_RUNNER_NAME
// and is added as the "runner" label to the selector and the pod template, so
// each Deployment selects only its own pods instead of every pod labelled
// app=act-runner.
//
// token and instanceURL are passed to the container through the
// GITEA_RUNNER_REGISTRATION_TOKEN and GITEA_INSTANCE_URL environment
// variables. The returned error is currently always nil.
func createRunnerDeployment(token, instanceURL string) (*appsv1.Deployment, error) {
	now := time.Now()
	name := fmt.Sprintf("act-runner-%s-%09d", now.Format("20060102150405"), now.Nanosecond())

	labels := map[string]string{
		"app":     "act-runner",
		"type":    "runner",
		"version": "1.0",
		"runner":  name,
	}
	// Labels shared by the selector and the pod template. "app" is kept for
	// anything that lists all runner pods; "runner" makes the selector unique
	// to this Deployment.
	selectorLabels := map[string]string{
		"app":    "act-runner",
		"runner": name,
	}
	podLabels := map[string]string{
		"app":    "act-runner",
		"runner": name,
	}

	// One replica per Deployment (fixed, not read from configuration).
	replicas := int32(1)
	// How long pods stay bound to a not-ready/unreachable node before eviction.
	tolerationSeconds := int64(300)

	// Build the Deployment object.
	deployment := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:   name,
			Labels: labels,
		},
		Spec: appsv1.DeploymentSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: selectorLabels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: podLabels,
				},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "act-runner", // matches the container name in the existing configuration
							Image: setting.Runner.Image,
							Ports: []corev1.ContainerPort{
								{
									Name:          "http-0",
									ContainerPort: 3000,
									Protocol:      corev1.ProtocolTCP,
								},
							},
							Env: []corev1.EnvVar{
								{
									Name:  "GITEA_INSTANCE_URL",
									Value: instanceURL,
								},
								{
									Name:  "GITEA_RUNNER_REGISTRATION_TOKEN",
									Value: token,
								},
								{
									Name:  "GITEA_RUNNER_NAME",
									Value: name,
								},
							},
							// No resource requests/limits, matching the existing configuration.
							Resources: corev1.ResourceRequirements{},
							// Mount the host Docker socket into the runner.
							VolumeMounts: []corev1.VolumeMount{
								{
									Name:      "docker-sock",
									MountPath: "/var/run/docker.sock",
								},
							},
							ImagePullPolicy: corev1.PullIfNotPresent,
						},
					},
					// Host Docker socket volume.
					Volumes: []corev1.Volume{
						{
							Name: "docker-sock",
							VolumeSource: corev1.VolumeSource{
								HostPath: &corev1.HostPathVolumeSource{
									Path: "/var/run/docker.sock",
								},
							},
						},
					},
					RestartPolicy:      corev1.RestartPolicyAlways,
					ServiceAccountName: "default", // matches the existing configuration
					DNSPolicy:          corev1.DNSClusterFirst,
					// NOTE(review): the node name is hard-coded; pods stay
					// Pending on clusters without a node called "node1".
					// Consider reading this from configuration.
					NodeSelector: map[string]string{
						"kubernetes.io/hostname": "node1",
					},
					// Tolerate not-ready/unreachable nodes for a bounded time.
					Tolerations: []corev1.Toleration{
						{
							Key:               "node.kubernetes.io/not-ready",
							Operator:          corev1.TolerationOpExists,
							Effect:            corev1.TaintEffectNoExecute,
							TolerationSeconds: &tolerationSeconds,
						},
						{
							Key:               "node.kubernetes.io/unreachable",
							Operator:          corev1.TolerationOpExists,
							Effect:            corev1.TaintEffectNoExecute,
							TolerationSeconds: &tolerationSeconds,
						},
					},
				},
			},
		},
	}
	return deployment, nil
}