diff --git a/api/oss_upload.go b/api/oss_upload.go index b643eaa..ca5c6d4 100644 --- a/api/oss_upload.go +++ b/api/oss_upload.go @@ -3,38 +3,55 @@ package api import ( "ZhenTuLocalPassiveAdapter/dto" "bytes" + "context" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "io" + "log" "net/http" "os" ) -func UploadTaskFile(task dto.Task, file dto.FileObject) error { - url, err := QueryUploadUrlForTask(task.TaskID) +func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) error { + url, err := QueryUploadUrlForTask(ctx, task.TaskID) if err != nil { return err } - if err := OssUpload(url, file.URL); err != nil { + log.Printf("开始上传文件, URL:【%s】\n", url) + if err := OssUpload(ctx, url, file.URL); err != nil { return err } return nil } -func OssUpload(url, filePath string) error { +func OssUpload(ctx context.Context, url, filePath string) error { + _, span := tracer.Start(ctx, "OssUpload") + defer span.End() + span.SetAttributes(attribute.String("file.path", filePath)) // 使用 http put 请求上传文件 file, err := os.Open(filePath) defer os.Remove(filePath) defer file.Close() if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件打开失败") return err } fileBytes, err := io.ReadAll(file) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件读取失败") return err } + span.SetAttributes(attribute.Int("file.size", len(fileBytes))) + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "PUT")) req, err := http.NewRequest("PUT", url, bytes.NewBuffer(fileBytes)) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") return err } req.Header.Set("Content-Type", "video/mp4") @@ -42,11 +59,17 @@ func OssUpload(url, filePath string) error { client := &http.Client{} resp, err := client.Do(req) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") return err } defer resp.Body.Close() + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) if resp.StatusCode != http.StatusOK { + span.SetStatus(codes.Error, "上传失败") return fmt.Errorf("upload failed with status code %d", resp.StatusCode) } + span.SetStatus(codes.Ok, "上传成功") return nil } diff --git a/api/task_report.go b/api/task_report.go index a034acd..39f9a1c 100644 --- a/api/task_report.go +++ b/api/task_report.go @@ -4,69 +4,127 @@ import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/dto" "bytes" + "context" "encoding/json" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "io" "log" "net/http" ) -func QueryUploadUrlForTask(taskId string) (string, error) { +func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) { + _, span := tracer.Start(ctx, "QueryUploadUrlForTask") + defer span.End() url := config.Config.Api.BaseUrl + "/" + taskId + "/uploadUrl" + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "POST")) req, err := http.NewRequest("POST", url, nil) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") log.Println("Error creating request:", err) return "", err } client := &http.Client{} resp, err := client.Do(req) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") log.Println("Error sending request:", err) return "", err } + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "读取响应体失败") log.Println("Error reading response body:", err) return "", err } return string(body), nil } -func ReportTaskFailure(taskId string) { +func ReportTaskFailure(ctx context.Context, taskId string) bool { + _, span := tracer.Start(ctx, "ReportTaskFailure") + defer span.End() url := config.Config.Api.BaseUrl + "/" + taskId + "/failure" + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "POST")) req, err := http.NewRequest("POST", url, nil) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") log.Println("Error creating request:", err) - return + return false } + client := &http.Client{} resp, err := client.Do(req) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") log.Println("Error sending request:", err) - return + return false } defer resp.Body.Close() + + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + if resp.StatusCode == 200 { + span.SetStatus(codes.Ok, "成功") + return true + } else { + span.SetStatus(codes.Error, "失败") + return false + } } -func ReportTaskSuccess(taskId string, file *dto.FileObject) { +func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject) (b bool) { + _, span := tracer.Start(ctx, "ReportTaskSuccess") + defer span.End() url := config.Config.Api.BaseUrl + "/" + taskId + "/success" + span.SetAttributes(attribute.String("http.url", url)) + span.SetAttributes(attribute.String("http.method", "POST")) + jsonData, err := json.Marshal(file) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "序列化JSON失败") log.Println("Error marshaling JSON:", err) - return + return false } + req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData)) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建请求失败") log.Println("Error creating request:", err) - return + return false } req.Header.Set("Content-Type", "application/json") + client := &http.Client{} resp, err := client.Do(req) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "发送请求失败") log.Println("Error sending request:", err) - return + return false } defer resp.Body.Close() + + span.SetAttributes(attribute.String("http.status", resp.Status)) + span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode)) + + if resp.StatusCode == 200 { + span.SetStatus(codes.Ok, "成功") + return true + } else { + span.SetStatus(codes.Error, "失败") + return false + } } diff --git a/api/tracer.go b/api/tracer.go new file mode 100644 index 0000000..4c74752 --- /dev/null +++ b/api/tracer.go @@ -0,0 +1,5 @@ +package api + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("api") diff --git a/core/task.go b/core/task.go index 79afb22..7c12992 100644 --- a/core/task.go +++ b/core/task.go @@ -5,37 +5,63 @@ import ( "ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/fs" "ZhenTuLocalPassiveAdapter/util" + "context" "fmt" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "log" "os" "path" ) -func HandleTask(device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) { +var tracer = otel.Tracer("task") + +func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) { + _, span := tracer.Start(ctx, "startTask") adapter := fs.GetAdapter() + span.SetAttributes() fileList, err := adapter.GetFileList( + ctx, path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")), task.StartTime, ) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "获取文件列表失败") + log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err) return nil, err } files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime) if len(files) == 0 { + span.SetStatus(codes.Error, "没有找到文件") return nil, fmt.Errorf("没有找到文件") } - constructTask, err := util.CheckFileCoverageAndConstructTask(files, task.StartTime, task.EndTime, task) + span.SetAttributes(attribute.Int("fileCount", len(files))) + constructTask, err := util.CheckFileCoverageAndConstructTask(ctx, files, task.StartTime, task.EndTime, task) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件片段检查失败") + log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err) return nil, err } - ok := util.RunFfmpegTask(constructTask) + ok := util.RunFfmpegTask(ctx, constructTask) if !ok { + span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败")) + span.SetStatus(codes.Error, "ffmpeg任务执行失败") return nil, fmt.Errorf("ffmpeg任务执行失败") } outfile, err := os.Stat(constructTask.OutputFile) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件不存在") return nil, fmt.Errorf("文件不存在:%s", constructTask.OutputFile) } + span.SetAttributes(attribute.String("file.name", outfile.Name())) + span.SetAttributes(attribute.Int64("file.size", outfile.Size())) if outfile.Size() < 4096 { + span.SetAttributes(attribute.String("error", "文件大小过小")) + span.SetStatus(codes.Error, "文件大小过小") return nil, fmt.Errorf("文件大小过小:%s", constructTask.OutputFile) } return &dto.FileObject{ diff --git a/fs/adapter.go b/fs/adapter.go index 20cf8ff..bde2abb 100644 --- a/fs/adapter.go +++ b/fs/adapter.go @@ -3,11 +3,12 @@ package fs import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/dto" + "context" "time" ) type Adapter interface { - GetFileList(path string, relDt time.Time) ([]dto.File, error) + GetFileList(ctx context.Context, path string, relDt time.Time) ([]dto.File, error) } func GetAdapter() Adapter { diff --git a/fs/local_adapter.go b/fs/local_adapter.go index 8e48e80..62861db 100644 --- a/fs/local_adapter.go +++ b/fs/local_adapter.go @@ -4,7 +4,10 @@ import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/dto" "ZhenTuLocalPassiveAdapter/util" + "context" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "os" "path" "sort" @@ -15,15 +18,22 @@ type LocalAdapter struct { StorageConfig config.StorageConfig } -func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, error) { +func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) { + _, span := tracer.Start(ctx, "GetFileList_local") + defer span.End() if l.StorageConfig.Path == "" { + span.SetAttributes(attribute.String("error", "未配置存储路径")) + span.SetStatus(codes.Error, "未配置存储路径") return nil, fmt.Errorf("未配置存储路径") } // 读取文件夹下目录 files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath)) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件夹读取失败") return nil, err } + span.SetAttributes(attribute.Int("file.count", len(files))) var fileList []dto.File for _, file := range files { @@ -44,7 +54,7 @@ func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, if startTime.Equal(stopTime) || stopTime.IsZero() { // 如果文件名没有时间戳,则认为该文件是未录制完成的 // 尝试读取一下视频信息 - duration, err := util.GetVideoDuration(path.Join(l.StorageConfig.Path, dirPath, file.Name())) + duration, err := util.GetVideoDuration(ctx, path.Join(l.StorageConfig.Path, dirPath, file.Name())) if err != nil { // 如果还是没有,就按照配置文件里的加起来 stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration)) @@ -64,5 +74,6 @@ func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, return fileList[i].StartTime.Before(fileList[j].StartTime) }) } + span.SetStatus(codes.Ok, "文件读取成功") return fileList, nil } diff --git a/fs/s3_adapter.go b/fs/s3_adapter.go index 009cfa5..aac52ce 100644 --- a/fs/s3_adapter.go +++ b/fs/s3_adapter.go @@ -14,6 +14,8 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" ) type S3Adapter struct { @@ -43,8 +45,13 @@ func (s *S3Adapter) getClient() (*s3.Client, error) { return s.s3Client, nil } -func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, error) { +func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) { + _, span := tracer.Start(ctx, "GetFileList_s3") + defer span.End() + if s.StorageConfig.S3.Bucket == "" { + span.SetAttributes(attribute.String("error", "未配置S3存储桶")) + span.SetStatus(codes.Error, "未配置S3存储桶") return nil, fmt.Errorf("未配置S3存储桶") } @@ -55,6 +62,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er client, err := s.getClient() if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建S3客户端失败") return nil, err } @@ -68,6 +77,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er result, err := client.ListObjectsV2(context.TODO(), listObjectsInput) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "文件列表读取失败") return nil, err } @@ -91,6 +102,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er presignOptions.Expires = 10 * time.Minute }) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "生成预签名URL失败") log.Println("Error presigning GetObject request:", err) continue } @@ -110,8 +123,10 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er continuationToken = result.NextContinuationToken } + span.SetAttributes(attribute.Int("file.count", len(fileList))) sort.Slice(fileList, func(i, j int) bool { return fileList[i].StartTime.Before(fileList[j].StartTime) }) + span.SetStatus(codes.Ok, "文件读取成功") return fileList, nil } diff --git a/fs/tracer.go b/fs/tracer.go new file mode 100644 index 0000000..fb82b18 --- /dev/null +++ b/fs/tracer.go @@ -0,0 +1,5 @@ +package fs + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("fs") diff --git a/go.mod b/go.mod index 4361398..347a9b6 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,11 @@ require ( github.com/aws/aws-sdk-go-v2/credentials v1.17.62 github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 github.com/spf13/viper v1.20.0 + go.opentelemetry.io/otel v1.35.0 + go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 + go.opentelemetry.io/otel/sdk v1.35.0 + go.opentelemetry.io/otel/sdk/metric v1.35.0 ) require ( @@ -21,8 +26,13 @@ require ( github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect github.com/aws/smithy-go v1.22.3 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect github.com/pelletier/go-toml/v2 v2.2.3 // indirect github.com/sagikazarmark/locafero v0.8.0 // indirect github.com/sourcegraph/conc v0.3.0 // indirect @@ -30,8 +40,19 @@ require ( github.com/spf13/cast v1.7.1 // indirect github.com/spf13/pflag v1.0.6 // indirect github.com/subosito/gotenv v1.6.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect + go.opentelemetry.io/otel/trace v1.35.0 // indirect + go.opentelemetry.io/proto/otlp v1.5.0 // indirect go.uber.org/multierr v1.11.0 // indirect + golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.31.0 // indirect golang.org/x/text v0.23.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect + google.golang.org/grpc v1.71.0 // indirect + google.golang.org/protobuf v1.36.5 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 2fea549..8fea7c7 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38y github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14= -github.com/aws/aws-sdk-go-v2/credentials v1.17.60 h1:1dq+ELaT5ogfmqtV1eocq8SpOK1NRsuUfmhQtD/XAh4= -github.com/aws/aws-sdk-go-v2/credentials v1.17.60/go.mod h1:HDes+fn/xo9VeszXqjBVkxOo/aUy8Mc6QqKvZk32GlE= github.com/aws/aws-sdk-go-v2/credentials v1.17.62 h1:fvtQY3zFzYJ9CfixuAQ96IxDrBajbBWGqjNTCa79ocU= github.com/aws/aws-sdk-go-v2/credentials v1.17.62/go.mod h1:ElETBxIQqcxej++Cs8GyPBbgMys5DgQPTwo7cUPDKt8= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= @@ -24,16 +22,27 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 h1:jIiopHEV22b4yQP2q36Y0OmwLbsxN github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc= github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k= github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= +github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= +github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= +github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -42,8 +51,8 @@ github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNH github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= -github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/sagikazarmark/locafero v0.8.0 h1:mXaMVw7IqxNBxfv3LdWt9MDmcWDQ1fagDH918lOdVaQ= github.com/sagikazarmark/locafero v0.8.0/go.mod h1:UBUyz37V+EdMS3hDF3QWIiVr/2dPrx49OMO0Bn0hJqk= github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= @@ -60,14 +69,48 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 h1:PB3Zrjs1sG1GBX51SXyTSoOTqcDglmsk7nT6tkKPb/k= +go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0/go.mod h1:U2R3XyVPzn0WX7wOIypPuptulsMcPDPs/oiSVOMVnHY= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 h1:T0Ec2E+3YZf5bgTNQVet8iTDW7oIk03tXHq+wkwIDnE= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0/go.mod h1:30v2gqH+vYGJsesLWFov8u47EpYTcIQcBjKpI6pJThg= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= +go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= +go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= +golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0= +google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ= +google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= +google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= -gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 177d1d6..e894079 100644 --- a/main.go +++ b/main.go @@ -5,27 +5,49 @@ import ( "ZhenTuLocalPassiveAdapter/config" "ZhenTuLocalPassiveAdapter/core" "ZhenTuLocalPassiveAdapter/dto" + "ZhenTuLocalPassiveAdapter/telemetry" + "context" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "log" "os" "time" ) +var tracer = otel.Tracer("vpt") + func startTask(device config.DeviceMapping, task dto.Task) { - fo, err := core.HandleTask(device, task) + ctx, span := tracer.Start(context.Background(), "startTask") + span.SetAttributes(attribute.String("deviceNo", device.DeviceNo)) + span.SetAttributes(attribute.String("taskId", task.TaskID)) + span.SetAttributes(attribute.String("scenicId", task.ScenicID)) + span.SetAttributes(attribute.String("startTime", task.StartTime.Format("2006-01-02 15:04:05"))) + span.SetAttributes(attribute.String("endTime", task.EndTime.Format("2006-01-02 15:04:05"))) + fo, err := core.HandleTask(ctx, device, task) if err != nil { + span.SetStatus(codes.Error, "处理任务失败") log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) - api.ReportTaskFailure(task.TaskID) + api.ReportTaskFailure(ctx, task.TaskID) return } + span.SetAttributes(attribute.String("fileUrl", fo.URL)) log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) - err = api.UploadTaskFile(task, *fo) + err = api.UploadTaskFile(ctx, task, *fo) if err != nil { + span.SetStatus(codes.Error, "上传文件失败") log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) - api.ReportTaskFailure(task.TaskID) + api.ReportTaskFailure(ctx, task.TaskID) return } + result := api.ReportTaskSuccess(ctx, task.TaskID, fo) + if result { + span.SetStatus(codes.Error, "上报任务成功失败") + log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err) + return + } + span.SetStatus(codes.Ok, "上传文件成功") log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo) - api.ReportTaskSuccess(task.TaskID, fo) } func main() { @@ -36,7 +58,13 @@ func main() { } // 日志文件路径 logFilePath := "app.log" - + ctx := context.Background() + shutdown, err := telemetry.InitTelemetry(ctx) + if err != nil { + log.Fatalf("Failed to initialize telemetry: %v", err) + return + } + defer shutdown(ctx) // 创建或打开日志文件 logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666) if err != nil { diff --git a/telemetry/init.go b/telemetry/init.go new file mode 100644 index 0000000..da12db1 --- /dev/null +++ b/telemetry/init.go @@ -0,0 +1,69 @@ +package telemetry + +import ( + "context" + "errors" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/trace" + "time" +) + +func InitTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown 会调用通过 shutdownFuncs 注册的清理函数。 + // 调用产生的错误会被合并。 + // 每个注册的清理函数将被调用一次。 + shutdown = func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // handleErr 调用 shutdown 进行清理,并确保返回所有错误信息。 + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // 设置传播器 + prop := newPropagator() + otel.SetTextMapPropagator(prop) + + // 设置 trace provider. + tracerProvider, err := newJaegerTraceProvider(ctx) + if err != nil { + handleErr(err) + return + } + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + return +} + +func newPropagator() propagation.TextMapPropagator { + return propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ) +} + +func newJaegerTraceProvider(ctx context.Context) (*trace.TracerProvider, error) { + // 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter + traceExporter, err := otlptracehttp.New(ctx, + otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces")) + if err != nil { + return nil, err + } + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter, + // 默认为 5s。为便于演示,设置为 1s。 + trace.WithBatchTimeout(time.Second)), + ) + return traceProvider, nil +} diff --git a/util/ffmpeg.go b/util/ffmpeg.go index 83cc309..bfc9b1a 100644 --- a/util/ffmpeg.go +++ b/util/ffmpeg.go @@ -3,7 +3,10 @@ package util import ( "ZhenTuLocalPassiveAdapter/dto" "bytes" + "context" "fmt" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "log" "math/rand" "os" @@ -17,25 +20,36 @@ import ( const FfmpegExec = "ffmpeg" -func RunFfmpegTask(task *dto.FfmpegTask) bool { +func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool { + _, span := tracer.Start(ctx, "RunFfmpegTask") + defer span.End() var result bool if len(task.Files) == 1 { // 单个文件切割,用简单方法 - result = runFfmpegForSingleFile(task) + result = runFfmpegForSingleFile(ctx, task) } else { // 多个文件切割,用速度快的 - result = runFfmpegForMultipleFile1(task) + result = runFfmpegForMultipleFile1(ctx, task) } // 先尝试方法1 if result { + span.SetStatus(codes.Ok, "FFMPEG简易方法成功") return true } log.Printf("FFMPEG简易方法失败,尝试复杂方法转码") // 不行再尝试方法二 - return runFfmpegForMultipleFile2(task) + result = runFfmpegForMultipleFile2(ctx, task) + if result { + span.SetStatus(codes.Ok, "FFMPEG复杂方法成功") + return true + } + span.SetStatus(codes.Error, "FFMPEG复杂方法失败") + return result } -func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool { +func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool { + _, span := tracer.Start(ctx, "runFfmpegForMultipleFile1") + defer span.End() // 多文件,方法一:先转换成ts,然后合并切割 // 步骤一:先转换成ts,并行转换 var wg sync.WaitGroup @@ -47,7 +61,7 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool { go func(file *dto.File) { defer wg.Done() tmpFile := path.Join(os.TempDir(), file.Name+".ts") - result, err := convertMp4ToTs(*file, tmpFile) + result, err := convertMp4ToTs(ctx, *file, tmpFile) if err != nil { log.Printf("转码出错: %v", err) mu.Lock() @@ -69,12 +83,15 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool { wg.Wait() if notOk { + span.SetStatus(codes.Error, "FFMPEG多文件转码失败") return false } // 步骤二:使用concat协议拼接裁切 - result, err := QuickConcatVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) + result, err := QuickConcatVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败") return false } @@ -84,34 +101,49 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool { log.Printf("删除临时文件失败: %v", err) } } - + if result { + span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功") + } else { + span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败") + } return result } -func runFfmpegForMultipleFile2(task *dto.FfmpegTask) bool { +func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool { + _, span := tracer.Start(ctx, "runFfmpegForMultipleFile2") + defer span.End() // 多文件,方法二:使用计算资源编码 - result, err := SlowVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) + result, err := SlowVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile) if err != nil { return false } return result } -func runFfmpegForSingleFile(task *dto.FfmpegTask) bool { - result, err := QuickVideoCut(task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile) +func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool { + _, span := tracer.Start(ctx, "runFfmpegForSingleFile") + defer span.End() + result, err := QuickVideoCut(ctx, task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile) if err != nil { + span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败") return false } - _, err = os.Stat(task.OutputFile) + stat, err := os.Stat(task.OutputFile) if err != nil { + span.SetStatus(codes.Error, "文件不存在") log.Printf("文件不存在:%s", task.OutputFile) return false } + span.SetAttributes(attribute.String("file.name", task.OutputFile)) + span.SetAttributes(attribute.Int64("file.size", stat.Size())) return result } -func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) { +func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) { + _, span := tracer.Start(ctx, "CheckFileCoverageAndConstructTask") + defer span.End() if fileList == nil || len(fileList) == 0 { + span.SetStatus(codes.Error, "无法根据要求找到对应录制片段") log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt) return nil, fmt.Errorf("无法根据要求找到对应录制片段") } @@ -126,6 +158,7 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time. } if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 { // 片段断开 + span.SetStatus(codes.Error, "FFMPEG片段断开") log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds()) return nil, fmt.Errorf("片段断开") } @@ -135,6 +168,7 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time. // 通过文件列表构造的任务仍然是缺失的 if fileList[len(fileList)-1].EndTime.Before(endDt) { + span.SetStatus(codes.Error, "FFMPEG片段断开") log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt) return nil, fmt.Errorf("片段断开") } @@ -146,11 +180,16 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time. Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()), OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"), } - + span.SetAttributes(attribute.Int("task.files", len(ffmpegTask.Files))) + span.SetAttributes(attribute.Int("task.offset", ffmpegTask.Offset)) + span.SetAttributes(attribute.Int("task.length", ffmpegTask.Length)) + span.SetStatus(codes.Ok, "FFMPEG任务构造成功") return ffmpegTask, nil } -func convertMp4ToTs(file dto.File, outFileName string) (bool, error) { +func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) { + _, span := tracer.Start(ctx, "convertMp4ToTs") + defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", @@ -161,10 +200,12 @@ func convertMp4ToTs(file dto.File, outFileName string) (bool, error) { "-f", "mpegts", outFileName, } - return handleFfmpegProcess(ffmpegCmd) + return handleFfmpegProcess(ctx, ffmpegCmd) } -func convertHevcToTs(file dto.File, outFileName string) (bool, error) { +func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) { + _, span := tracer.Start(ctx, "convertHevcToTs") + defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", @@ -175,10 +216,12 @@ func convertHevcToTs(file dto.File, outFileName string) (bool, error) { "-f", "mpegts", outFileName, } - return handleFfmpegProcess(ffmpegCmd) + return handleFfmpegProcess(ctx, ffmpegCmd) } -func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (bool, error) { +func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64, outputFile string) (bool, error) { + _, span := tracer.Start(ctx, "QuickVideoCut") + defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", @@ -189,16 +232,21 @@ func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (b "-reset_timestamps", "1", "-ss", strconv.FormatInt(offset, 10), "-t", strconv.FormatInt(length, 10), + "-fflags", "+genpts", "-f", "mp4", outputFile, } - return handleFfmpegProcess(ffmpegCmd) + return handleFfmpegProcess(ctx, ffmpegCmd) } -func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { +func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { + _, span := tracer.Start(ctx, "QuickConcatVideoCut") + defer span.End() tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64()) tmpFileObj, err := os.Create(tmpFile) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "创建临时文件失败") log.Printf("创建临时文件失败:%s", tmpFile) return false, err } @@ -208,6 +256,8 @@ func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile for _, filePo := range inputFiles { _, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url)) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "写入临时文件失败") log.Printf("写入临时文件失败:%s", tmpFile) return false, err } @@ -227,10 +277,12 @@ func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile "-f", "mp4", outputFile, } - return handleFfmpegProcess(ffmpegCmd) + return handleFfmpegProcess(ctx, ffmpegCmd) } -func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { +func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) { + _, span := tracer.Start(ctx, "SlowVideoCut") + defer span.End() ffmpegCmd := []string{ FfmpegExec, "-hide_banner", @@ -259,11 +311,17 @@ func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string outputFile, ) - return handleFfmpegProcess(ffmpegCmd) + return handleFfmpegProcess(ctx, ffmpegCmd) } -func handleFfmpegProcess(ffmpegCmd []string) (bool, error) { +func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) { + _, span := tracer.Start(ctx, "handleFfmpegProcess") + defer span.End() + span.SetAttributes(attribute.String("ffmpeg.cmd", strings.Join(ffmpegCmd, " "))) startTime := time.Now() + defer func() { + span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds()))) + }() log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " ")) cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...) @@ -272,6 +330,8 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) { err := cmd.Start() if err != nil { + span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) + span.SetStatus(codes.Error, "FFMPEG执行命令失败") log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) return false, err } @@ -284,10 +344,14 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) { select { case <-time.After(1 * time.Minute): + span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) + span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出") log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " ")) return false, fmt.Errorf("ffmpeg command timed out") case err := <-done: if err != nil { + span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String())) + span.SetStatus(codes.Error, "FFMPEG执行命令失败") log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " ")) return false, err } @@ -297,7 +361,9 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) { } } -func GetVideoDuration(filePath string) (float64, error) { +func GetVideoDuration(ctx context.Context, filePath string) (float64, error) { + _, span := tracer.Start(ctx, "GetVideoDuration") + defer span.End() ffprobeCmd := []string{ "ffprobe", "-v", "error", @@ -305,21 +371,25 @@ func GetVideoDuration(filePath string) (float64, error) { "-of", "default=noprint_wrappers=1:nokey=1", filePath, } - + span.SetAttributes(attribute.String("ffprobe.cmd", strings.Join(ffprobeCmd, " "))) cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "failed to get video duration") return 0, fmt.Errorf("failed to get video duration: %w", err) } - + span.SetAttributes(attribute.String("ffmpeg.stdout", out.String())) durationStr := strings.TrimSpace(out.String()) duration, err := strconv.ParseFloat(durationStr, 64) if err != nil { + span.SetAttributes(attribute.String("error", err.Error())) + span.SetStatus(codes.Error, "failed to parse video duration") return 0, fmt.Errorf("failed to parse video duration: %w", err) } - + span.SetAttributes(attribute.Float64("video.duration", duration)) return duration, nil } diff --git a/util/tracer.go b/util/tracer.go new file mode 100644 index 0000000..9eae6b3 --- /dev/null +++ b/util/tracer.go @@ -0,0 +1,5 @@ +package util + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("util")