telemetry
This commit is contained in:
parent
a6c5ba5957
commit
3d989c2f47
@ -3,38 +3,55 @@ package api
|
|||||||
import (
|
import (
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
)
|
)
|
||||||
|
|
||||||
func UploadTaskFile(task dto.Task, file dto.FileObject) error {
|
func UploadTaskFile(ctx context.Context, task dto.Task, file dto.FileObject) error {
|
||||||
url, err := QueryUploadUrlForTask(task.TaskID)
|
url, err := QueryUploadUrlForTask(ctx, task.TaskID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := OssUpload(url, file.URL); err != nil {
|
log.Printf("开始上传文件, URL:【%s】\n", url)
|
||||||
|
if err := OssUpload(ctx, url, file.URL); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func OssUpload(url, filePath string) error {
|
func OssUpload(ctx context.Context, url, filePath string) error {
|
||||||
|
_, span := tracer.Start(ctx, "OssUpload")
|
||||||
|
defer span.End()
|
||||||
|
span.SetAttributes(attribute.String("file.path", filePath))
|
||||||
// 使用 http put 请求上传文件
|
// 使用 http put 请求上传文件
|
||||||
file, err := os.Open(filePath)
|
file, err := os.Open(filePath)
|
||||||
defer os.Remove(filePath)
|
defer os.Remove(filePath)
|
||||||
defer file.Close()
|
defer file.Close()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件打开失败")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
fileBytes, err := io.ReadAll(file)
|
fileBytes, err := io.ReadAll(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件读取失败")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.Int("file.size", len(fileBytes)))
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "PUT"))
|
||||||
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(fileBytes))
|
req, err := http.NewRequest("PUT", url, bytes.NewBuffer(fileBytes))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "video/mp4")
|
req.Header.Set("Content-Type", "video/mp4")
|
||||||
@ -42,11 +59,17 @@ func OssUpload(url, filePath string) error {
|
|||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
if resp.StatusCode != http.StatusOK {
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
span.SetStatus(codes.Error, "上传失败")
|
||||||
return fmt.Errorf("upload failed with status code %d", resp.StatusCode)
|
return fmt.Errorf("upload failed with status code %d", resp.StatusCode)
|
||||||
}
|
}
|
||||||
|
span.SetStatus(codes.Ok, "上传成功")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -4,69 +4,127 @@ import (
|
|||||||
"ZhenTuLocalPassiveAdapter/config"
|
"ZhenTuLocalPassiveAdapter/config"
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
)
|
)
|
||||||
|
|
||||||
func QueryUploadUrlForTask(taskId string) (string, error) {
|
func QueryUploadUrlForTask(ctx context.Context, taskId string) (string, error) {
|
||||||
|
_, span := tracer.Start(ctx, "QueryUploadUrlForTask")
|
||||||
|
defer span.End()
|
||||||
url := config.Config.Api.BaseUrl + "/" + taskId + "/uploadUrl"
|
url := config.Config.Api.BaseUrl + "/" + taskId + "/uploadUrl"
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "POST"))
|
||||||
req, err := http.NewRequest("POST", url, nil)
|
req, err := http.NewRequest("POST", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
log.Println("Error creating request:", err)
|
log.Println("Error creating request:", err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
log.Println("Error sending request:", err)
|
log.Println("Error sending request:", err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := io.ReadAll(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "读取响应体失败")
|
||||||
log.Println("Error reading response body:", err)
|
log.Println("Error reading response body:", err)
|
||||||
return "", err
|
return "", err
|
||||||
}
|
}
|
||||||
return string(body), nil
|
return string(body), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReportTaskFailure(taskId string) {
|
func ReportTaskFailure(ctx context.Context, taskId string) bool {
|
||||||
|
_, span := tracer.Start(ctx, "ReportTaskFailure")
|
||||||
|
defer span.End()
|
||||||
url := config.Config.Api.BaseUrl + "/" + taskId + "/failure"
|
url := config.Config.Api.BaseUrl + "/" + taskId + "/failure"
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "POST"))
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", url, nil)
|
req, err := http.NewRequest("POST", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
log.Println("Error creating request:", err)
|
log.Println("Error creating request:", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
log.Println("Error sending request:", err)
|
log.Println("Error sending request:", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
|
if resp.StatusCode == 200 {
|
||||||
|
span.SetStatus(codes.Ok, "成功")
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
span.SetStatus(codes.Error, "失败")
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func ReportTaskSuccess(taskId string, file *dto.FileObject) {
|
func ReportTaskSuccess(ctx context.Context, taskId string, file *dto.FileObject) (b bool) {
|
||||||
|
_, span := tracer.Start(ctx, "ReportTaskSuccess")
|
||||||
|
defer span.End()
|
||||||
url := config.Config.Api.BaseUrl + "/" + taskId + "/success"
|
url := config.Config.Api.BaseUrl + "/" + taskId + "/success"
|
||||||
|
span.SetAttributes(attribute.String("http.url", url))
|
||||||
|
span.SetAttributes(attribute.String("http.method", "POST"))
|
||||||
|
|
||||||
jsonData, err := json.Marshal(file)
|
jsonData, err := json.Marshal(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "序列化JSON失败")
|
||||||
log.Println("Error marshaling JSON:", err)
|
log.Println("Error marshaling JSON:", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建请求失败")
|
||||||
log.Println("Error creating request:", err)
|
log.Println("Error creating request:", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
client := &http.Client{}
|
client := &http.Client{}
|
||||||
resp, err := client.Do(req)
|
resp, err := client.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "发送请求失败")
|
||||||
log.Println("Error sending request:", err)
|
log.Println("Error sending request:", err)
|
||||||
return
|
return false
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.String("http.status", resp.Status))
|
||||||
|
span.SetAttributes(attribute.Int("http.status_code", resp.StatusCode))
|
||||||
|
|
||||||
|
if resp.StatusCode == 200 {
|
||||||
|
span.SetStatus(codes.Ok, "成功")
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
span.SetStatus(codes.Error, "失败")
|
||||||
|
return false
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
5
api/tracer.go
Normal file
5
api/tracer.go
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import "go.opentelemetry.io/otel"
|
||||||
|
|
||||||
|
var tracer = otel.Tracer("api")
|
32
core/task.go
32
core/task.go
@ -5,37 +5,63 @@ import (
|
|||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"ZhenTuLocalPassiveAdapter/fs"
|
"ZhenTuLocalPassiveAdapter/fs"
|
||||||
"ZhenTuLocalPassiveAdapter/util"
|
"ZhenTuLocalPassiveAdapter/util"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
)
|
)
|
||||||
|
|
||||||
func HandleTask(device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) {
|
var tracer = otel.Tracer("task")
|
||||||
|
|
||||||
|
func HandleTask(ctx context.Context, device config.DeviceMapping, task dto.Task) (*dto.FileObject, error) {
|
||||||
|
_, span := tracer.Start(ctx, "startTask")
|
||||||
adapter := fs.GetAdapter()
|
adapter := fs.GetAdapter()
|
||||||
|
span.SetAttributes()
|
||||||
fileList, err := adapter.GetFileList(
|
fileList, err := adapter.GetFileList(
|
||||||
|
ctx,
|
||||||
path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")),
|
path.Join(device.Name, task.StartTime.Format("2006"+config.Config.FileName.DateSeparator+"01"+config.Config.FileName.DateSeparator+"02")),
|
||||||
task.StartTime,
|
task.StartTime,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "获取文件列表失败")
|
||||||
|
log.Printf("获取文件列表失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime)
|
files := util.FilterAndSortFiles(fileList, task.StartTime, task.EndTime)
|
||||||
if len(files) == 0 {
|
if len(files) == 0 {
|
||||||
|
span.SetStatus(codes.Error, "没有找到文件")
|
||||||
return nil, fmt.Errorf("没有找到文件")
|
return nil, fmt.Errorf("没有找到文件")
|
||||||
}
|
}
|
||||||
constructTask, err := util.CheckFileCoverageAndConstructTask(files, task.StartTime, task.EndTime, task)
|
span.SetAttributes(attribute.Int("fileCount", len(files)))
|
||||||
|
constructTask, err := util.CheckFileCoverageAndConstructTask(ctx, files, task.StartTime, task.EndTime, task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件片段检查失败")
|
||||||
|
log.Printf("文件片段检查失败, DeviceNo: %s, 错误: %v\n", device.DeviceNo, err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ok := util.RunFfmpegTask(constructTask)
|
ok := util.RunFfmpegTask(ctx, constructTask)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
span.SetAttributes(attribute.String("error", "ffmpeg任务执行失败"))
|
||||||
|
span.SetStatus(codes.Error, "ffmpeg任务执行失败")
|
||||||
return nil, fmt.Errorf("ffmpeg任务执行失败")
|
return nil, fmt.Errorf("ffmpeg任务执行失败")
|
||||||
}
|
}
|
||||||
outfile, err := os.Stat(constructTask.OutputFile)
|
outfile, err := os.Stat(constructTask.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件不存在")
|
||||||
return nil, fmt.Errorf("文件不存在:%s", constructTask.OutputFile)
|
return nil, fmt.Errorf("文件不存在:%s", constructTask.OutputFile)
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("file.name", outfile.Name()))
|
||||||
|
span.SetAttributes(attribute.Int64("file.size", outfile.Size()))
|
||||||
if outfile.Size() < 4096 {
|
if outfile.Size() < 4096 {
|
||||||
|
span.SetAttributes(attribute.String("error", "文件大小过小"))
|
||||||
|
span.SetStatus(codes.Error, "文件大小过小")
|
||||||
return nil, fmt.Errorf("文件大小过小:%s", constructTask.OutputFile)
|
return nil, fmt.Errorf("文件大小过小:%s", constructTask.OutputFile)
|
||||||
}
|
}
|
||||||
return &dto.FileObject{
|
return &dto.FileObject{
|
||||||
|
@ -3,11 +3,12 @@ package fs
|
|||||||
import (
|
import (
|
||||||
"ZhenTuLocalPassiveAdapter/config"
|
"ZhenTuLocalPassiveAdapter/config"
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
|
"context"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Adapter interface {
|
type Adapter interface {
|
||||||
GetFileList(path string, relDt time.Time) ([]dto.File, error)
|
GetFileList(ctx context.Context, path string, relDt time.Time) ([]dto.File, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetAdapter() Adapter {
|
func GetAdapter() Adapter {
|
||||||
|
@ -4,7 +4,10 @@ import (
|
|||||||
"ZhenTuLocalPassiveAdapter/config"
|
"ZhenTuLocalPassiveAdapter/config"
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"ZhenTuLocalPassiveAdapter/util"
|
"ZhenTuLocalPassiveAdapter/util"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"sort"
|
"sort"
|
||||||
@ -15,15 +18,22 @@ type LocalAdapter struct {
|
|||||||
StorageConfig config.StorageConfig
|
StorageConfig config.StorageConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, error) {
|
func (l *LocalAdapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
|
||||||
|
_, span := tracer.Start(ctx, "GetFileList_local")
|
||||||
|
defer span.End()
|
||||||
if l.StorageConfig.Path == "" {
|
if l.StorageConfig.Path == "" {
|
||||||
|
span.SetAttributes(attribute.String("error", "未配置存储路径"))
|
||||||
|
span.SetStatus(codes.Error, "未配置存储路径")
|
||||||
return nil, fmt.Errorf("未配置存储路径")
|
return nil, fmt.Errorf("未配置存储路径")
|
||||||
}
|
}
|
||||||
// 读取文件夹下目录
|
// 读取文件夹下目录
|
||||||
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
|
files, err := os.ReadDir(path.Join(l.StorageConfig.Path, dirPath))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件夹读取失败")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.Int("file.count", len(files)))
|
||||||
|
|
||||||
var fileList []dto.File
|
var fileList []dto.File
|
||||||
for _, file := range files {
|
for _, file := range files {
|
||||||
@ -44,7 +54,7 @@ func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File,
|
|||||||
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
if startTime.Equal(stopTime) || stopTime.IsZero() {
|
||||||
// 如果文件名没有时间戳,则认为该文件是未录制完成的
|
// 如果文件名没有时间戳,则认为该文件是未录制完成的
|
||||||
// 尝试读取一下视频信息
|
// 尝试读取一下视频信息
|
||||||
duration, err := util.GetVideoDuration(path.Join(l.StorageConfig.Path, dirPath, file.Name()))
|
duration, err := util.GetVideoDuration(ctx, path.Join(l.StorageConfig.Path, dirPath, file.Name()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// 如果还是没有,就按照配置文件里的加起来
|
// 如果还是没有,就按照配置文件里的加起来
|
||||||
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
stopTime = stopTime.Add(time.Second * time.Duration(config.Config.Record.Duration))
|
||||||
@ -64,5 +74,6 @@ func (l *LocalAdapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File,
|
|||||||
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
span.SetStatus(codes.Ok, "文件读取成功")
|
||||||
return fileList, nil
|
return fileList, nil
|
||||||
}
|
}
|
||||||
|
@ -14,6 +14,8 @@ import (
|
|||||||
|
|
||||||
"github.com/aws/aws-sdk-go-v2/aws"
|
"github.com/aws/aws-sdk-go-v2/aws"
|
||||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
)
|
)
|
||||||
|
|
||||||
type S3Adapter struct {
|
type S3Adapter struct {
|
||||||
@ -43,8 +45,13 @@ func (s *S3Adapter) getClient() (*s3.Client, error) {
|
|||||||
return s.s3Client, nil
|
return s.s3Client, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, error) {
|
func (s *S3Adapter) GetFileList(ctx context.Context, dirPath string, relDt time.Time) ([]dto.File, error) {
|
||||||
|
_, span := tracer.Start(ctx, "GetFileList_s3")
|
||||||
|
defer span.End()
|
||||||
|
|
||||||
if s.StorageConfig.S3.Bucket == "" {
|
if s.StorageConfig.S3.Bucket == "" {
|
||||||
|
span.SetAttributes(attribute.String("error", "未配置S3存储桶"))
|
||||||
|
span.SetStatus(codes.Error, "未配置S3存储桶")
|
||||||
return nil, fmt.Errorf("未配置S3存储桶")
|
return nil, fmt.Errorf("未配置S3存储桶")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -55,6 +62,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er
|
|||||||
|
|
||||||
client, err := s.getClient()
|
client, err := s.getClient()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建S3客户端失败")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -68,6 +77,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er
|
|||||||
|
|
||||||
result, err := client.ListObjectsV2(context.TODO(), listObjectsInput)
|
result, err := client.ListObjectsV2(context.TODO(), listObjectsInput)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "文件列表读取失败")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -91,6 +102,8 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er
|
|||||||
presignOptions.Expires = 10 * time.Minute
|
presignOptions.Expires = 10 * time.Minute
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "生成预签名URL失败")
|
||||||
log.Println("Error presigning GetObject request:", err)
|
log.Println("Error presigning GetObject request:", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -110,8 +123,10 @@ func (s *S3Adapter) GetFileList(dirPath string, relDt time.Time) ([]dto.File, er
|
|||||||
continuationToken = result.NextContinuationToken
|
continuationToken = result.NextContinuationToken
|
||||||
}
|
}
|
||||||
|
|
||||||
|
span.SetAttributes(attribute.Int("file.count", len(fileList)))
|
||||||
sort.Slice(fileList, func(i, j int) bool {
|
sort.Slice(fileList, func(i, j int) bool {
|
||||||
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
return fileList[i].StartTime.Before(fileList[j].StartTime)
|
||||||
})
|
})
|
||||||
|
span.SetStatus(codes.Ok, "文件读取成功")
|
||||||
return fileList, nil
|
return fileList, nil
|
||||||
}
|
}
|
||||||
|
5
fs/tracer.go
Normal file
5
fs/tracer.go
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
package fs
|
||||||
|
|
||||||
|
import "go.opentelemetry.io/otel"
|
||||||
|
|
||||||
|
var tracer = otel.Tracer("fs")
|
21
go.mod
21
go.mod
@ -9,6 +9,11 @@ require (
|
|||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.62
|
github.com/aws/aws-sdk-go-v2/credentials v1.17.62
|
||||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2
|
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2
|
||||||
github.com/spf13/viper v1.20.0
|
github.com/spf13/viper v1.20.0
|
||||||
|
go.opentelemetry.io/otel v1.35.0
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0
|
||||||
|
go.opentelemetry.io/otel/sdk v1.35.0
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.35.0
|
||||||
)
|
)
|
||||||
|
|
||||||
require (
|
require (
|
||||||
@ -21,8 +26,13 @@ require (
|
|||||||
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
|
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 // indirect
|
||||||
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
|
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.15 // indirect
|
||||||
github.com/aws/smithy-go v1.22.3 // indirect
|
github.com/aws/smithy-go v1.22.3 // indirect
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
|
||||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||||
|
github.com/go-logr/logr v1.4.2 // indirect
|
||||||
|
github.com/go-logr/stdr v1.2.2 // indirect
|
||||||
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
|
github.com/go-viper/mapstructure/v2 v2.2.1 // indirect
|
||||||
|
github.com/google/uuid v1.6.0 // indirect
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 // indirect
|
||||||
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
|
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
|
||||||
github.com/sagikazarmark/locafero v0.8.0 // indirect
|
github.com/sagikazarmark/locafero v0.8.0 // indirect
|
||||||
github.com/sourcegraph/conc v0.3.0 // indirect
|
github.com/sourcegraph/conc v0.3.0 // indirect
|
||||||
@ -30,8 +40,19 @@ require (
|
|||||||
github.com/spf13/cast v1.7.1 // indirect
|
github.com/spf13/cast v1.7.1 // indirect
|
||||||
github.com/spf13/pflag v1.0.6 // indirect
|
github.com/spf13/pflag v1.0.6 // indirect
|
||||||
github.com/subosito/gotenv v1.6.0 // indirect
|
github.com/subosito/gotenv v1.6.0 // indirect
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/metric v1.35.0 // indirect
|
||||||
|
go.opentelemetry.io/otel/trace v1.35.0 // indirect
|
||||||
|
go.opentelemetry.io/proto/otlp v1.5.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
|
golang.org/x/net v0.35.0 // indirect
|
||||||
golang.org/x/sys v0.31.0 // indirect
|
golang.org/x/sys v0.31.0 // indirect
|
||||||
golang.org/x/text v0.23.0 // indirect
|
golang.org/x/text v0.23.0 // indirect
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a // indirect
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a // indirect
|
||||||
|
google.golang.org/grpc v1.71.0 // indirect
|
||||||
|
google.golang.org/protobuf v1.36.5 // indirect
|
||||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||||
)
|
)
|
||||||
|
59
go.sum
59
go.sum
@ -2,8 +2,6 @@ github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38y
|
|||||||
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
|
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
|
||||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
|
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 h1:zAybnyUQXIZ5mok5Jqwlf58/TFE7uvd3IAsa1aF9cXs=
|
||||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
|
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10/go.mod h1:qqvMj6gHLR/EXWZw4ZbqlPbQUyenf4h82UQUlKc+l14=
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.60 h1:1dq+ELaT5ogfmqtV1eocq8SpOK1NRsuUfmhQtD/XAh4=
|
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.60/go.mod h1:HDes+fn/xo9VeszXqjBVkxOo/aUy8Mc6QqKvZk32GlE=
|
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.62 h1:fvtQY3zFzYJ9CfixuAQ96IxDrBajbBWGqjNTCa79ocU=
|
github.com/aws/aws-sdk-go-v2/credentials v1.17.62 h1:fvtQY3zFzYJ9CfixuAQ96IxDrBajbBWGqjNTCa79ocU=
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.62/go.mod h1:ElETBxIQqcxej++Cs8GyPBbgMys5DgQPTwo7cUPDKt8=
|
github.com/aws/aws-sdk-go-v2/credentials v1.17.62/go.mod h1:ElETBxIQqcxej++Cs8GyPBbgMys5DgQPTwo7cUPDKt8=
|
||||||
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
|
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
|
||||||
@ -24,16 +22,27 @@ github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2 h1:jIiopHEV22b4yQP2q36Y0OmwLbsxN
|
|||||||
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc=
|
github.com/aws/aws-sdk-go-v2/service/s3 v1.78.2/go.mod h1:U5SNqwhXB3Xe6F47kXvWihPl/ilGaEDe8HD/50Z9wxc=
|
||||||
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
|
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
|
||||||
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
|
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
|
||||||
|
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
|
||||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||||
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
|
||||||
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0=
|
||||||
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
|
github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M=
|
||||||
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0=
|
||||||
|
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
|
||||||
|
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
|
||||||
|
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
|
||||||
|
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
|
||||||
|
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
|
||||||
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
|
github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss=
|
||||||
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
|
||||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
|
||||||
|
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
|
||||||
|
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1 h1:e9Rjr40Z98/clHv5Yg79Is0NtosR5LXRvdr7o/6NwbA=
|
||||||
|
github.com/grpc-ecosystem/grpc-gateway/v2 v2.26.1/go.mod h1:tIxuGz/9mpox++sgp9fJjHO0+q1X9/UOWd798aAm22M=
|
||||||
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
|
||||||
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
|
||||||
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
|
||||||
@ -42,8 +51,8 @@ github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNH
|
|||||||
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
|
||||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||||
github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8=
|
github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII=
|
||||||
github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs=
|
github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o=
|
||||||
github.com/sagikazarmark/locafero v0.8.0 h1:mXaMVw7IqxNBxfv3LdWt9MDmcWDQ1fagDH918lOdVaQ=
|
github.com/sagikazarmark/locafero v0.8.0 h1:mXaMVw7IqxNBxfv3LdWt9MDmcWDQ1fagDH918lOdVaQ=
|
||||||
github.com/sagikazarmark/locafero v0.8.0/go.mod h1:UBUyz37V+EdMS3hDF3QWIiVr/2dPrx49OMO0Bn0hJqk=
|
github.com/sagikazarmark/locafero v0.8.0/go.mod h1:UBUyz37V+EdMS3hDF3QWIiVr/2dPrx49OMO0Bn0hJqk=
|
||||||
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
|
||||||
@ -60,14 +69,48 @@ github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOf
|
|||||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||||
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8=
|
||||||
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
|
||||||
|
go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A=
|
||||||
|
go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ=
|
||||||
|
go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y=
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0 h1:1fTNlAIJZGWLP5FVu0fikVry1IsiUnXjf7QFvoNN3Xw=
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.35.0/go.mod h1:zjPK58DtkqQFn+YUMbx0M2XV3QgKU0gS9LeGohREyK4=
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0 h1:xJ2qHD0C1BeYVTLLR9sX12+Qb95kfeD/byKj6Ky1pXg=
|
||||||
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.35.0/go.mod h1:u5BF1xyjstDowA1R5QAO9JHzqK+ublenEW/dyqTjBVk=
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0 h1:PB3Zrjs1sG1GBX51SXyTSoOTqcDglmsk7nT6tkKPb/k=
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.35.0/go.mod h1:U2R3XyVPzn0WX7wOIypPuptulsMcPDPs/oiSVOMVnHY=
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0 h1:T0Ec2E+3YZf5bgTNQVet8iTDW7oIk03tXHq+wkwIDnE=
|
||||||
|
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.35.0/go.mod h1:30v2gqH+vYGJsesLWFov8u47EpYTcIQcBjKpI6pJThg=
|
||||||
|
go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M=
|
||||||
|
go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY=
|
||||||
|
go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o=
|
||||||
|
go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w=
|
||||||
|
go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs=
|
||||||
|
go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc=
|
||||||
|
go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4=
|
||||||
|
go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4=
|
||||||
|
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||||
|
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||||
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
|
||||||
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||||
|
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
|
||||||
|
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
|
||||||
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik=
|
||||||
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||||
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY=
|
||||||
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a h1:nwKuGPlUAt+aR+pcrkfFRrTU1BVrSmYyYMxYbUIVHr0=
|
||||||
|
google.golang.org/genproto/googleapis/api v0.0.0-20250218202821-56aae31c358a/go.mod h1:3kWAYMk1I75K4vykHtKt2ycnOgpA6974V7bREqbsenU=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a h1:51aaUVRocpvUOSQKM6Q7VuoaktNIaMCLuhZB6DKksq4=
|
||||||
|
google.golang.org/genproto/googleapis/rpc v0.0.0-20250218202821-56aae31c358a/go.mod h1:uRxBH1mhmO8PGhU89cMcHaXKZqO+OfakD8QQO0oYwlQ=
|
||||||
|
google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg=
|
||||||
|
google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec=
|
||||||
|
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
|
||||||
|
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
|
||||||
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
|
40
main.go
40
main.go
@ -5,27 +5,49 @@ import (
|
|||||||
"ZhenTuLocalPassiveAdapter/config"
|
"ZhenTuLocalPassiveAdapter/config"
|
||||||
"ZhenTuLocalPassiveAdapter/core"
|
"ZhenTuLocalPassiveAdapter/core"
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
|
"ZhenTuLocalPassiveAdapter/telemetry"
|
||||||
|
"context"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var tracer = otel.Tracer("vpt")
|
||||||
|
|
||||||
func startTask(device config.DeviceMapping, task dto.Task) {
|
func startTask(device config.DeviceMapping, task dto.Task) {
|
||||||
fo, err := core.HandleTask(device, task)
|
ctx, span := tracer.Start(context.Background(), "startTask")
|
||||||
|
span.SetAttributes(attribute.String("deviceNo", device.DeviceNo))
|
||||||
|
span.SetAttributes(attribute.String("taskId", task.TaskID))
|
||||||
|
span.SetAttributes(attribute.String("scenicId", task.ScenicID))
|
||||||
|
span.SetAttributes(attribute.String("startTime", task.StartTime.Format("2006-01-02 15:04:05")))
|
||||||
|
span.SetAttributes(attribute.String("endTime", task.EndTime.Format("2006-01-02 15:04:05")))
|
||||||
|
fo, err := core.HandleTask(ctx, device, task)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetStatus(codes.Error, "处理任务失败")
|
||||||
log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
log.Printf("处理任务失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||||
api.ReportTaskFailure(task.TaskID)
|
api.ReportTaskFailure(ctx, task.TaskID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("fileUrl", fo.URL))
|
||||||
log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
log.Printf("处理任务成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
||||||
err = api.UploadTaskFile(task, *fo)
|
err = api.UploadTaskFile(ctx, task, *fo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetStatus(codes.Error, "上传文件失败")
|
||||||
log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
log.Printf("上传文件失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||||
api.ReportTaskFailure(task.TaskID)
|
api.ReportTaskFailure(ctx, task.TaskID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
result := api.ReportTaskSuccess(ctx, task.TaskID, fo)
|
||||||
|
if result {
|
||||||
|
span.SetStatus(codes.Error, "上报任务成功失败")
|
||||||
|
log.Printf("上报任务成功失败, TaskID:【%s】, DeviceNo: %s, 错误: %v\n", task.TaskID, task.DeviceNo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
span.SetStatus(codes.Ok, "上传文件成功")
|
||||||
log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
log.Printf("上传文件成功, TaskID:【%s】, DeviceNo: %s\n", task.TaskID, task.DeviceNo)
|
||||||
api.ReportTaskSuccess(task.TaskID, fo)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -36,7 +58,13 @@ func main() {
|
|||||||
}
|
}
|
||||||
// 日志文件路径
|
// 日志文件路径
|
||||||
logFilePath := "app.log"
|
logFilePath := "app.log"
|
||||||
|
ctx := context.Background()
|
||||||
|
shutdown, err := telemetry.InitTelemetry(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("Failed to initialize telemetry: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer shutdown(ctx)
|
||||||
// 创建或打开日志文件
|
// 创建或打开日志文件
|
||||||
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
logFile, err := os.OpenFile(logFilePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
69
telemetry/init.go
Normal file
69
telemetry/init.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
package telemetry
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"go.opentelemetry.io/otel"
|
||||||
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||||
|
"go.opentelemetry.io/otel/propagation"
|
||||||
|
"go.opentelemetry.io/otel/sdk/trace"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitTelemetry(ctx context.Context) (shutdown func(context.Context) error, err error) {
|
||||||
|
var shutdownFuncs []func(context.Context) error
|
||||||
|
|
||||||
|
// shutdown 会调用通过 shutdownFuncs 注册的清理函数。
|
||||||
|
// 调用产生的错误会被合并。
|
||||||
|
// 每个注册的清理函数将被调用一次。
|
||||||
|
shutdown = func(ctx context.Context) error {
|
||||||
|
var err error
|
||||||
|
for _, fn := range shutdownFuncs {
|
||||||
|
err = errors.Join(err, fn(ctx))
|
||||||
|
}
|
||||||
|
shutdownFuncs = nil
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleErr 调用 shutdown 进行清理,并确保返回所有错误信息。
|
||||||
|
handleErr := func(inErr error) {
|
||||||
|
err = errors.Join(inErr, shutdown(ctx))
|
||||||
|
}
|
||||||
|
|
||||||
|
// 设置传播器
|
||||||
|
prop := newPropagator()
|
||||||
|
otel.SetTextMapPropagator(prop)
|
||||||
|
|
||||||
|
// 设置 trace provider.
|
||||||
|
tracerProvider, err := newJaegerTraceProvider(ctx)
|
||||||
|
if err != nil {
|
||||||
|
handleErr(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown)
|
||||||
|
otel.SetTracerProvider(tracerProvider)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPropagator() propagation.TextMapPropagator {
|
||||||
|
return propagation.NewCompositeTextMapPropagator(
|
||||||
|
propagation.TraceContext{},
|
||||||
|
propagation.Baggage{},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newJaegerTraceProvider(ctx context.Context) (*trace.TracerProvider, error) {
|
||||||
|
// 创建一个使用 HTTP 协议连接本机Jaeger的 Exporter
|
||||||
|
traceExporter, err := otlptracehttp.New(ctx,
|
||||||
|
otlptracehttp.WithEndpointURL("https://oltp.jerryyan.top/v1/traces"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
traceProvider := trace.NewTracerProvider(
|
||||||
|
trace.WithBatcher(traceExporter,
|
||||||
|
// 默认为 5s。为便于演示,设置为 1s。
|
||||||
|
trace.WithBatchTimeout(time.Second)),
|
||||||
|
)
|
||||||
|
return traceProvider, nil
|
||||||
|
}
|
130
util/ffmpeg.go
130
util/ffmpeg.go
@ -3,7 +3,10 @@ package util
|
|||||||
import (
|
import (
|
||||||
"ZhenTuLocalPassiveAdapter/dto"
|
"ZhenTuLocalPassiveAdapter/dto"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"go.opentelemetry.io/otel/attribute"
|
||||||
|
"go.opentelemetry.io/otel/codes"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
@ -17,25 +20,36 @@ import (
|
|||||||
|
|
||||||
const FfmpegExec = "ffmpeg"
|
const FfmpegExec = "ffmpeg"
|
||||||
|
|
||||||
func RunFfmpegTask(task *dto.FfmpegTask) bool {
|
func RunFfmpegTask(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||||
|
_, span := tracer.Start(ctx, "RunFfmpegTask")
|
||||||
|
defer span.End()
|
||||||
var result bool
|
var result bool
|
||||||
if len(task.Files) == 1 {
|
if len(task.Files) == 1 {
|
||||||
// 单个文件切割,用简单方法
|
// 单个文件切割,用简单方法
|
||||||
result = runFfmpegForSingleFile(task)
|
result = runFfmpegForSingleFile(ctx, task)
|
||||||
} else {
|
} else {
|
||||||
// 多个文件切割,用速度快的
|
// 多个文件切割,用速度快的
|
||||||
result = runFfmpegForMultipleFile1(task)
|
result = runFfmpegForMultipleFile1(ctx, task)
|
||||||
}
|
}
|
||||||
// 先尝试方法1
|
// 先尝试方法1
|
||||||
if result {
|
if result {
|
||||||
|
span.SetStatus(codes.Ok, "FFMPEG简易方法成功")
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
log.Printf("FFMPEG简易方法失败,尝试复杂方法转码")
|
||||||
// 不行再尝试方法二
|
// 不行再尝试方法二
|
||||||
return runFfmpegForMultipleFile2(task)
|
result = runFfmpegForMultipleFile2(ctx, task)
|
||||||
|
if result {
|
||||||
|
span.SetStatus(codes.Ok, "FFMPEG复杂方法成功")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG复杂方法失败")
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool {
|
func runFfmpegForMultipleFile1(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||||
|
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile1")
|
||||||
|
defer span.End()
|
||||||
// 多文件,方法一:先转换成ts,然后合并切割
|
// 多文件,方法一:先转换成ts,然后合并切割
|
||||||
// 步骤一:先转换成ts,并行转换
|
// 步骤一:先转换成ts,并行转换
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
@ -47,7 +61,7 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool {
|
|||||||
go func(file *dto.File) {
|
go func(file *dto.File) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
tmpFile := path.Join(os.TempDir(), file.Name+".ts")
|
tmpFile := path.Join(os.TempDir(), file.Name+".ts")
|
||||||
result, err := convertMp4ToTs(*file, tmpFile)
|
result, err := convertMp4ToTs(ctx, *file, tmpFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("转码出错: %v", err)
|
log.Printf("转码出错: %v", err)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
@ -69,12 +83,15 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool {
|
|||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|
||||||
if notOk {
|
if notOk {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG多文件转码失败")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// 步骤二:使用concat协议拼接裁切
|
// 步骤二:使用concat协议拼接裁切
|
||||||
result, err := QuickConcatVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
result, err := QuickConcatVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -84,34 +101,49 @@ func runFfmpegForMultipleFile1(task *dto.FfmpegTask) bool {
|
|||||||
log.Printf("删除临时文件失败: %v", err)
|
log.Printf("删除临时文件失败: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if result {
|
||||||
|
span.SetStatus(codes.Ok, "FFMPEG多文件concat协议转码成功")
|
||||||
|
} else {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG多文件concat协议转码失败")
|
||||||
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func runFfmpegForMultipleFile2(task *dto.FfmpegTask) bool {
|
func runFfmpegForMultipleFile2(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||||
|
_, span := tracer.Start(ctx, "runFfmpegForMultipleFile2")
|
||||||
|
defer span.End()
|
||||||
// 多文件,方法二:使用计算资源编码
|
// 多文件,方法二:使用计算资源编码
|
||||||
result, err := SlowVideoCut(task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
result, err := SlowVideoCut(ctx, task.Files, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func runFfmpegForSingleFile(task *dto.FfmpegTask) bool {
|
func runFfmpegForSingleFile(ctx context.Context, task *dto.FfmpegTask) bool {
|
||||||
result, err := QuickVideoCut(task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile)
|
_, span := tracer.Start(ctx, "runFfmpegForSingleFile")
|
||||||
|
defer span.End()
|
||||||
|
result, err := QuickVideoCut(ctx, task.Files[0].Url, int64(task.Offset), int64(task.Length), task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG单个文件裁切失败")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
_, err = os.Stat(task.OutputFile)
|
stat, err := os.Stat(task.OutputFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetStatus(codes.Error, "文件不存在")
|
||||||
log.Printf("文件不存在:%s", task.OutputFile)
|
log.Printf("文件不存在:%s", task.OutputFile)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("file.name", task.OutputFile))
|
||||||
|
span.SetAttributes(attribute.Int64("file.size", stat.Size()))
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) {
|
func CheckFileCoverageAndConstructTask(ctx context.Context, fileList []dto.File, beginDt, endDt time.Time, task dto.Task) (*dto.FfmpegTask, error) {
|
||||||
|
_, span := tracer.Start(ctx, "CheckFileCoverageAndConstructTask")
|
||||||
|
defer span.End()
|
||||||
if fileList == nil || len(fileList) == 0 {
|
if fileList == nil || len(fileList) == 0 {
|
||||||
|
span.SetStatus(codes.Error, "无法根据要求找到对应录制片段")
|
||||||
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
|
log.Printf("无法根据要求找到对应录制片段!ID:【%s】,开始时间:【%s】,结束时间:【%s】", task.TaskID, beginDt, endDt)
|
||||||
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
|
return nil, fmt.Errorf("无法根据要求找到对应录制片段")
|
||||||
}
|
}
|
||||||
@ -126,6 +158,7 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.
|
|||||||
}
|
}
|
||||||
if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 {
|
if file.StartTime.Sub(lastFile.EndTime).Milliseconds() > 2000 {
|
||||||
// 片段断开
|
// 片段断开
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
||||||
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
|
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s,%s】中间断开【%f】秒(超过2秒)", task.TaskID, lastFile.Name, file.Name, file.StartTime.Sub(lastFile.EndTime).Seconds())
|
||||||
return nil, fmt.Errorf("片段断开")
|
return nil, fmt.Errorf("片段断开")
|
||||||
}
|
}
|
||||||
@ -135,6 +168,7 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.
|
|||||||
|
|
||||||
// 通过文件列表构造的任务仍然是缺失的
|
// 通过文件列表构造的任务仍然是缺失的
|
||||||
if fileList[len(fileList)-1].EndTime.Before(endDt) {
|
if fileList[len(fileList)-1].EndTime.Before(endDt) {
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG片段断开")
|
||||||
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
|
log.Printf("分析FFMPEG任务失败:ID:【%s】,文件片段:【%s】,无法完整覆盖时间点【%s】", task.TaskID, fileList[len(fileList)-1].Name, endDt)
|
||||||
return nil, fmt.Errorf("片段断开")
|
return nil, fmt.Errorf("片段断开")
|
||||||
}
|
}
|
||||||
@ -146,11 +180,16 @@ func CheckFileCoverageAndConstructTask(fileList []dto.File, beginDt, endDt time.
|
|||||||
Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()),
|
Offset: int(beginDt.Sub(fileList[0].StartTime).Seconds()),
|
||||||
OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"),
|
OutputFile: path.Join(os.TempDir(), task.TaskID+".mp4"),
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.Int("task.files", len(ffmpegTask.Files)))
|
||||||
|
span.SetAttributes(attribute.Int("task.offset", ffmpegTask.Offset))
|
||||||
|
span.SetAttributes(attribute.Int("task.length", ffmpegTask.Length))
|
||||||
|
span.SetStatus(codes.Ok, "FFMPEG任务构造成功")
|
||||||
return ffmpegTask, nil
|
return ffmpegTask, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertMp4ToTs(file dto.File, outFileName string) (bool, error) {
|
func convertMp4ToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "convertMp4ToTs")
|
||||||
|
defer span.End()
|
||||||
ffmpegCmd := []string{
|
ffmpegCmd := []string{
|
||||||
FfmpegExec,
|
FfmpegExec,
|
||||||
"-hide_banner",
|
"-hide_banner",
|
||||||
@ -161,10 +200,12 @@ func convertMp4ToTs(file dto.File, outFileName string) (bool, error) {
|
|||||||
"-f", "mpegts",
|
"-f", "mpegts",
|
||||||
outFileName,
|
outFileName,
|
||||||
}
|
}
|
||||||
return handleFfmpegProcess(ffmpegCmd)
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func convertHevcToTs(file dto.File, outFileName string) (bool, error) {
|
func convertHevcToTs(ctx context.Context, file dto.File, outFileName string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "convertHevcToTs")
|
||||||
|
defer span.End()
|
||||||
ffmpegCmd := []string{
|
ffmpegCmd := []string{
|
||||||
FfmpegExec,
|
FfmpegExec,
|
||||||
"-hide_banner",
|
"-hide_banner",
|
||||||
@ -175,10 +216,12 @@ func convertHevcToTs(file dto.File, outFileName string) (bool, error) {
|
|||||||
"-f", "mpegts",
|
"-f", "mpegts",
|
||||||
outFileName,
|
outFileName,
|
||||||
}
|
}
|
||||||
return handleFfmpegProcess(ffmpegCmd)
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (bool, error) {
|
func QuickVideoCut(ctx context.Context, inputFile string, offset, length int64, outputFile string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "QuickVideoCut")
|
||||||
|
defer span.End()
|
||||||
ffmpegCmd := []string{
|
ffmpegCmd := []string{
|
||||||
FfmpegExec,
|
FfmpegExec,
|
||||||
"-hide_banner",
|
"-hide_banner",
|
||||||
@ -189,16 +232,21 @@ func QuickVideoCut(inputFile string, offset, length int64, outputFile string) (b
|
|||||||
"-reset_timestamps", "1",
|
"-reset_timestamps", "1",
|
||||||
"-ss", strconv.FormatInt(offset, 10),
|
"-ss", strconv.FormatInt(offset, 10),
|
||||||
"-t", strconv.FormatInt(length, 10),
|
"-t", strconv.FormatInt(length, 10),
|
||||||
|
"-fflags", "+genpts",
|
||||||
"-f", "mp4",
|
"-f", "mp4",
|
||||||
outputFile,
|
outputFile,
|
||||||
}
|
}
|
||||||
return handleFfmpegProcess(ffmpegCmd)
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
func QuickConcatVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "QuickConcatVideoCut")
|
||||||
|
defer span.End()
|
||||||
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
|
tmpFile := fmt.Sprintf("tmp%.10f.txt", rand.Float64())
|
||||||
tmpFileObj, err := os.Create(tmpFile)
|
tmpFileObj, err := os.Create(tmpFile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "创建临时文件失败")
|
||||||
log.Printf("创建临时文件失败:%s", tmpFile)
|
log.Printf("创建临时文件失败:%s", tmpFile)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -208,6 +256,8 @@ func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile
|
|||||||
for _, filePo := range inputFiles {
|
for _, filePo := range inputFiles {
|
||||||
_, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url))
|
_, err := tmpFileObj.WriteString(fmt.Sprintf("file '%s'\n", filePo.Url))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "写入临时文件失败")
|
||||||
log.Printf("写入临时文件失败:%s", tmpFile)
|
log.Printf("写入临时文件失败:%s", tmpFile)
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -227,10 +277,12 @@ func QuickConcatVideoCut(inputFiles []dto.File, offset, length int64, outputFile
|
|||||||
"-f", "mp4",
|
"-f", "mp4",
|
||||||
outputFile,
|
outputFile,
|
||||||
}
|
}
|
||||||
return handleFfmpegProcess(ffmpegCmd)
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
func SlowVideoCut(ctx context.Context, inputFiles []dto.File, offset, length int64, outputFile string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "SlowVideoCut")
|
||||||
|
defer span.End()
|
||||||
ffmpegCmd := []string{
|
ffmpegCmd := []string{
|
||||||
FfmpegExec,
|
FfmpegExec,
|
||||||
"-hide_banner",
|
"-hide_banner",
|
||||||
@ -259,11 +311,17 @@ func SlowVideoCut(inputFiles []dto.File, offset, length int64, outputFile string
|
|||||||
outputFile,
|
outputFile,
|
||||||
)
|
)
|
||||||
|
|
||||||
return handleFfmpegProcess(ffmpegCmd)
|
return handleFfmpegProcess(ctx, ffmpegCmd)
|
||||||
}
|
}
|
||||||
|
|
||||||
func handleFfmpegProcess(ffmpegCmd []string) (bool, error) {
|
func handleFfmpegProcess(ctx context.Context, ffmpegCmd []string) (bool, error) {
|
||||||
|
_, span := tracer.Start(ctx, "handleFfmpegProcess")
|
||||||
|
defer span.End()
|
||||||
|
span.SetAttributes(attribute.String("ffmpeg.cmd", strings.Join(ffmpegCmd, " ")))
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
defer func() {
|
||||||
|
span.SetAttributes(attribute.Int64("ffmpeg.duration", int64(time.Since(startTime).Seconds())))
|
||||||
|
}()
|
||||||
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " "))
|
log.Printf("FFMPEG执行命令:【%s】", strings.Join(ffmpegCmd, " "))
|
||||||
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
|
cmd := exec.Command(ffmpegCmd[0], ffmpegCmd[1:]...)
|
||||||
|
|
||||||
@ -272,6 +330,8 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) {
|
|||||||
|
|
||||||
err := cmd.Start()
|
err := cmd.Start()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
||||||
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -284,10 +344,14 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) {
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(1 * time.Minute):
|
case <-time.After(1 * time.Minute):
|
||||||
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG执行命令没有在1分钟内退出")
|
||||||
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " "))
|
log.Printf("FFMPEG执行命令没有在1分钟内退出,命令:【%s】", strings.Join(ffmpegCmd, " "))
|
||||||
return false, fmt.Errorf("ffmpeg command timed out")
|
return false, fmt.Errorf("ffmpeg command timed out")
|
||||||
case err := <-done:
|
case err := <-done:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("ffmpeg.stderr", stderr.String()))
|
||||||
|
span.SetStatus(codes.Error, "FFMPEG执行命令失败")
|
||||||
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
log.Printf("FFMPEG执行命令失败,错误信息:%s,命令:【%s】", stderr.String(), strings.Join(ffmpegCmd, " "))
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -297,7 +361,9 @@ func handleFfmpegProcess(ffmpegCmd []string) (bool, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetVideoDuration(filePath string) (float64, error) {
|
func GetVideoDuration(ctx context.Context, filePath string) (float64, error) {
|
||||||
|
_, span := tracer.Start(ctx, "GetVideoDuration")
|
||||||
|
defer span.End()
|
||||||
ffprobeCmd := []string{
|
ffprobeCmd := []string{
|
||||||
"ffprobe",
|
"ffprobe",
|
||||||
"-v", "error",
|
"-v", "error",
|
||||||
@ -305,21 +371,25 @@ func GetVideoDuration(filePath string) (float64, error) {
|
|||||||
"-of", "default=noprint_wrappers=1:nokey=1",
|
"-of", "default=noprint_wrappers=1:nokey=1",
|
||||||
filePath,
|
filePath,
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("ffprobe.cmd", strings.Join(ffprobeCmd, " ")))
|
||||||
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
|
cmd := exec.Command(ffprobeCmd[0], ffprobeCmd[1:]...)
|
||||||
var out bytes.Buffer
|
var out bytes.Buffer
|
||||||
cmd.Stdout = &out
|
cmd.Stdout = &out
|
||||||
|
|
||||||
err := cmd.Run()
|
err := cmd.Run()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "failed to get video duration")
|
||||||
return 0, fmt.Errorf("failed to get video duration: %w", err)
|
return 0, fmt.Errorf("failed to get video duration: %w", err)
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.String("ffmpeg.stdout", out.String()))
|
||||||
durationStr := strings.TrimSpace(out.String())
|
durationStr := strings.TrimSpace(out.String())
|
||||||
duration, err := strconv.ParseFloat(durationStr, 64)
|
duration, err := strconv.ParseFloat(durationStr, 64)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
span.SetAttributes(attribute.String("error", err.Error()))
|
||||||
|
span.SetStatus(codes.Error, "failed to parse video duration")
|
||||||
return 0, fmt.Errorf("failed to parse video duration: %w", err)
|
return 0, fmt.Errorf("failed to parse video duration: %w", err)
|
||||||
}
|
}
|
||||||
|
span.SetAttributes(attribute.Float64("video.duration", duration))
|
||||||
return duration, nil
|
return duration, nil
|
||||||
}
|
}
|
||||||
|
5
util/tracer.go
Normal file
5
util/tracer.go
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
package util
|
||||||
|
|
||||||
|
import "go.opentelemetry.io/otel"
|
||||||
|
|
||||||
|
var tracer = otel.Tracer("util")
|
Loading…
x
Reference in New Issue
Block a user