| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284 |
- package ffmpeg
- import (
- "bufio"
- "bytes"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "os"
- "os/exec"
- "regexp"
- "silk2audio/transcoder"
- "silk2audio/transcoder/utils"
- "strconv"
- "strings"
- )
- // Transcoder ...
- type Transcoder struct {
- config *Config
- input string
- output string
- options []string
- metadata *Metadata
- inputPipeReader *io.ReadCloser
- outputPipeReader *io.ReadCloser
- inputPipeWriter *io.WriteCloser
- outputPipeWriter *io.WriteCloser
- }
- // New ...
- func New(cfg *Config) transcoder.Transcoder {
- return &Transcoder{config: cfg}
- }
- // Start ...
- func (t *Transcoder) Start(opts transcoder.Options) (<-chan transcoder.Progress, error) {
- var stderrIn io.ReadCloser
- out := make(chan transcoder.Progress)
- defer t.closePipes()
- // Validates config
- if err := t.validate(); err != nil {
- return nil, err
- }
- // Get file metadata
- //_, err := t.getMetadata()
- //if err != nil {
- // return nil, err
- //}
- var err error
- // Get executable flags
- args := append(opts.GetStrArguments())
- // Append output flag
- args = append(args, []string{"-i", t.input, t.output}...)
- // Initialize command
- cmd := exec.Command(t.config.FfmpegBinPath, args...)
- // If progresss enabled, get stderr pipe and start progress process
- if t.config.ProgressEnabled && !t.config.Verbose {
- stderrIn, err = cmd.StderrPipe()
- if err != nil {
- return nil, fmt.Errorf("Failed getting transcoding progress (%s) with args (%s) with error %s", t.config.FfmpegBinPath, args, err)
- }
- }
- if t.config.Verbose {
- cmd.Stderr = os.Stdout
- }
- // Start process
- err = cmd.Start()
- if err != nil {
- return nil, fmt.Errorf("Failed starting transcoding (%s) with args (%s) with error %s", t.config.FfmpegBinPath, args, err)
- }
- if t.config.ProgressEnabled && !t.config.Verbose {
- go func() {
- t.progress(stderrIn, out)
- }()
- go func() {
- defer close(out)
- err = cmd.Wait()
- }()
- } else {
- err = cmd.Wait()
- }
- return out, nil
- }
- // Input ...
- func (t *Transcoder) Input(arg string) transcoder.Transcoder {
- t.input = arg
- return t
- }
- // Output ...
- func (t *Transcoder) Output(arg string) transcoder.Transcoder {
- t.output = arg
- return t
- }
- // InputPipe ...
- func (t *Transcoder) InputPipe(w *io.WriteCloser, r *io.ReadCloser) transcoder.Transcoder {
- if &t.input == nil {
- t.inputPipeWriter = w
- t.inputPipeReader = r
- }
- return t
- }
- // OutputPipe ...
- func (t *Transcoder) OutputPipe(w *io.WriteCloser, r *io.ReadCloser) transcoder.Transcoder {
- if &t.output == nil {
- t.outputPipeWriter = w
- t.outputPipeReader = r
- }
- return t
- }
- // WithOptions ...
- func (t *Transcoder) WithOptions(opts transcoder.Options) transcoder.Transcoder {
- t.options = opts.GetStrArguments()
- return t
- }
- // validate ...
- func (t *Transcoder) validate() error {
- if t.config.FfmpegBinPath == "" {
- return errors.New("ffmpeg binary path not found")
- }
- if t.input == "" {
- return errors.New("missing input option")
- }
- if t.output == "" {
- return errors.New("missing output option")
- }
- return nil
- }
- func (t *Transcoder) getMetadata() (metadata *Metadata, err error) {
- if t.config.FfprobeBinPath != "" {
- var outb, errb bytes.Buffer
- input := t.input
- if t.inputPipeReader != nil {
- input = "pipe:"
- }
- args := []string{"-i", input, "-print_format", "json", "-show_format", "-show_streams", "-show_error"}
- cmd := exec.Command(t.config.FfprobeBinPath, args...)
- cmd.Stdout = &outb
- cmd.Stderr = &errb
- err := cmd.Run()
- if err != nil {
- return nil, fmt.Errorf("error executing (%s) with args (%s) | error: %s | message: %s %s", t.config.FfprobeBinPath, args, err, outb.String(), errb.String())
- }
- if err = json.Unmarshal([]byte(outb.String()), &metadata); err != nil {
- return nil, err
- }
- t.metadata = metadata
- return metadata, nil
- }
- return nil, errors.New("ffprobe binary not found")
- }
- // progress sends through given channel the transcoding status
- func (t *Transcoder) progress(stream io.ReadCloser, out chan transcoder.Progress) {
- defer stream.Close()
- split := func(data []byte, atEOF bool) (advance int, token []byte, spliterror error) {
- if atEOF && len(data) == 0 {
- return 0, nil, nil
- }
- if i := bytes.IndexByte(data, '\n'); i >= 0 {
- // We have a full newline-terminated line.
- return i + 1, data[0:i], nil
- }
- if i := bytes.IndexByte(data, '\r'); i >= 0 {
- // We have a cr terminated line
- return i + 1, data[0:i], nil
- }
- if atEOF {
- return len(data), data, nil
- }
- return 0, nil, nil
- }
- scanner := bufio.NewScanner(stream)
- scanner.Split(split)
- buf := make([]byte, 2)
- scanner.Buffer(buf, bufio.MaxScanTokenSize)
- for scanner.Scan() {
- Progress := new(Progress)
- line := scanner.Text()
- if strings.Contains(line, "frame=") && strings.Contains(line, "time=") && strings.Contains(line, "bitrate=") {
- var re = regexp.MustCompile(`=\s+`)
- st := re.ReplaceAllString(line, `=`)
- f := strings.Fields(st)
- var framesProcessed string
- var currentTime string
- var currentBitrate string
- var currentSpeed string
- for j := 0; j < len(f); j++ {
- field := f[j]
- fieldSplit := strings.Split(field, "=")
- if len(fieldSplit) > 1 {
- fieldname := strings.Split(field, "=")[0]
- fieldvalue := strings.Split(field, "=")[1]
- if fieldname == "frame" {
- framesProcessed = fieldvalue
- }
- if fieldname == "time" {
- currentTime = fieldvalue
- }
- if fieldname == "bitrate" {
- currentBitrate = fieldvalue
- }
- if fieldname == "speed" {
- currentSpeed = fieldvalue
- }
- }
- }
- timesec := utils.DurToSec(currentTime)
- dursec, _ := strconv.ParseFloat(t.metadata.Format.Duration, 64)
- progress := (timesec * 100) / dursec
- Progress.Progress = progress
- Progress.CurrentBitrate = currentBitrate
- Progress.FramesProcessed = framesProcessed
- Progress.CurrentTime = currentTime
- Progress.Speed = currentSpeed
- out <- *Progress
- }
- }
- }
- // closePipes Closes pipes if opened
- func (t *Transcoder) closePipes() {
- if t.inputPipeReader != nil {
- ipr := *t.inputPipeReader
- ipr.Close()
- }
- if t.outputPipeWriter != nil {
- opr := *t.outputPipeWriter
- opr.Close()
- }
- }
|