ffmpeg.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. package ffmpeg
  2. import (
  3. "bufio"
  4. "bytes"
  5. "encoding/json"
  6. "errors"
  7. "fmt"
  8. "io"
  9. "os"
  10. "os/exec"
  11. "regexp"
  12. "silk2audio/transcoder"
  13. "silk2audio/transcoder/utils"
  14. "strconv"
  15. "strings"
  16. )
  17. // Transcoder ...
  18. type Transcoder struct {
  19. config *Config
  20. input string
  21. output string
  22. options []string
  23. metadata *Metadata
  24. inputPipeReader *io.ReadCloser
  25. outputPipeReader *io.ReadCloser
  26. inputPipeWriter *io.WriteCloser
  27. outputPipeWriter *io.WriteCloser
  28. }
  29. // New ...
  30. func New(cfg *Config) transcoder.Transcoder {
  31. return &Transcoder{config: cfg}
  32. }
  33. // Start ...
  34. func (t *Transcoder) Start(opts transcoder.Options) (<-chan transcoder.Progress, error) {
  35. var stderrIn io.ReadCloser
  36. out := make(chan transcoder.Progress)
  37. defer t.closePipes()
  38. // Validates config
  39. if err := t.validate(); err != nil {
  40. return nil, err
  41. }
  42. // Get file metadata
  43. //_, err := t.getMetadata()
  44. //if err != nil {
  45. // return nil, err
  46. //}
  47. var err error
  48. // Get executable flags
  49. args := append(opts.GetStrArguments())
  50. // Append output flag
  51. args = append(args, []string{"-i", t.input, t.output}...)
  52. // Initialize command
  53. cmd := exec.Command(t.config.FfmpegBinPath, args...)
  54. // If progresss enabled, get stderr pipe and start progress process
  55. if t.config.ProgressEnabled && !t.config.Verbose {
  56. stderrIn, err = cmd.StderrPipe()
  57. if err != nil {
  58. return nil, fmt.Errorf("Failed getting transcoding progress (%s) with args (%s) with error %s", t.config.FfmpegBinPath, args, err)
  59. }
  60. }
  61. if t.config.Verbose {
  62. cmd.Stderr = os.Stdout
  63. }
  64. // Start process
  65. err = cmd.Start()
  66. if err != nil {
  67. return nil, fmt.Errorf("Failed starting transcoding (%s) with args (%s) with error %s", t.config.FfmpegBinPath, args, err)
  68. }
  69. if t.config.ProgressEnabled && !t.config.Verbose {
  70. go func() {
  71. t.progress(stderrIn, out)
  72. }()
  73. go func() {
  74. defer close(out)
  75. err = cmd.Wait()
  76. }()
  77. } else {
  78. err = cmd.Wait()
  79. }
  80. return out, nil
  81. }
  82. // Input ...
  83. func (t *Transcoder) Input(arg string) transcoder.Transcoder {
  84. t.input = arg
  85. return t
  86. }
  87. // Output ...
  88. func (t *Transcoder) Output(arg string) transcoder.Transcoder {
  89. t.output = arg
  90. return t
  91. }
  92. // InputPipe ...
  93. func (t *Transcoder) InputPipe(w *io.WriteCloser, r *io.ReadCloser) transcoder.Transcoder {
  94. if &t.input == nil {
  95. t.inputPipeWriter = w
  96. t.inputPipeReader = r
  97. }
  98. return t
  99. }
  100. // OutputPipe ...
  101. func (t *Transcoder) OutputPipe(w *io.WriteCloser, r *io.ReadCloser) transcoder.Transcoder {
  102. if &t.output == nil {
  103. t.outputPipeWriter = w
  104. t.outputPipeReader = r
  105. }
  106. return t
  107. }
  108. // WithOptions ...
  109. func (t *Transcoder) WithOptions(opts transcoder.Options) transcoder.Transcoder {
  110. t.options = opts.GetStrArguments()
  111. return t
  112. }
  113. // validate ...
  114. func (t *Transcoder) validate() error {
  115. if t.config.FfmpegBinPath == "" {
  116. return errors.New("ffmpeg binary path not found")
  117. }
  118. if t.input == "" {
  119. return errors.New("missing input option")
  120. }
  121. if t.output == "" {
  122. return errors.New("missing output option")
  123. }
  124. return nil
  125. }
  126. func (t *Transcoder) getMetadata() (metadata *Metadata, err error) {
  127. if t.config.FfprobeBinPath != "" {
  128. var outb, errb bytes.Buffer
  129. input := t.input
  130. if t.inputPipeReader != nil {
  131. input = "pipe:"
  132. }
  133. args := []string{"-i", input, "-print_format", "json", "-show_format", "-show_streams", "-show_error"}
  134. cmd := exec.Command(t.config.FfprobeBinPath, args...)
  135. cmd.Stdout = &outb
  136. cmd.Stderr = &errb
  137. err := cmd.Run()
  138. if err != nil {
  139. return nil, fmt.Errorf("error executing (%s) with args (%s) | error: %s | message: %s %s", t.config.FfprobeBinPath, args, err, outb.String(), errb.String())
  140. }
  141. if err = json.Unmarshal([]byte(outb.String()), &metadata); err != nil {
  142. return nil, err
  143. }
  144. t.metadata = metadata
  145. return metadata, nil
  146. }
  147. return nil, errors.New("ffprobe binary not found")
  148. }
  149. // progress sends through given channel the transcoding status
  150. func (t *Transcoder) progress(stream io.ReadCloser, out chan transcoder.Progress) {
  151. defer stream.Close()
  152. split := func(data []byte, atEOF bool) (advance int, token []byte, spliterror error) {
  153. if atEOF && len(data) == 0 {
  154. return 0, nil, nil
  155. }
  156. if i := bytes.IndexByte(data, '\n'); i >= 0 {
  157. // We have a full newline-terminated line.
  158. return i + 1, data[0:i], nil
  159. }
  160. if i := bytes.IndexByte(data, '\r'); i >= 0 {
  161. // We have a cr terminated line
  162. return i + 1, data[0:i], nil
  163. }
  164. if atEOF {
  165. return len(data), data, nil
  166. }
  167. return 0, nil, nil
  168. }
  169. scanner := bufio.NewScanner(stream)
  170. scanner.Split(split)
  171. buf := make([]byte, 2)
  172. scanner.Buffer(buf, bufio.MaxScanTokenSize)
  173. for scanner.Scan() {
  174. Progress := new(Progress)
  175. line := scanner.Text()
  176. if strings.Contains(line, "frame=") && strings.Contains(line, "time=") && strings.Contains(line, "bitrate=") {
  177. var re = regexp.MustCompile(`=\s+`)
  178. st := re.ReplaceAllString(line, `=`)
  179. f := strings.Fields(st)
  180. var framesProcessed string
  181. var currentTime string
  182. var currentBitrate string
  183. var currentSpeed string
  184. for j := 0; j < len(f); j++ {
  185. field := f[j]
  186. fieldSplit := strings.Split(field, "=")
  187. if len(fieldSplit) > 1 {
  188. fieldname := strings.Split(field, "=")[0]
  189. fieldvalue := strings.Split(field, "=")[1]
  190. if fieldname == "frame" {
  191. framesProcessed = fieldvalue
  192. }
  193. if fieldname == "time" {
  194. currentTime = fieldvalue
  195. }
  196. if fieldname == "bitrate" {
  197. currentBitrate = fieldvalue
  198. }
  199. if fieldname == "speed" {
  200. currentSpeed = fieldvalue
  201. }
  202. }
  203. }
  204. timesec := utils.DurToSec(currentTime)
  205. dursec, _ := strconv.ParseFloat(t.metadata.Format.Duration, 64)
  206. progress := (timesec * 100) / dursec
  207. Progress.Progress = progress
  208. Progress.CurrentBitrate = currentBitrate
  209. Progress.FramesProcessed = framesProcessed
  210. Progress.CurrentTime = currentTime
  211. Progress.Speed = currentSpeed
  212. out <- *Progress
  213. }
  214. }
  215. }
  216. // closePipes Closes pipes if opened
  217. func (t *Transcoder) closePipes() {
  218. if t.inputPipeReader != nil {
  219. ipr := *t.inputPipeReader
  220. ipr.Close()
  221. }
  222. if t.outputPipeWriter != nil {
  223. opr := *t.outputPipeWriter
  224. opr.Close()
  225. }
  226. }