1 import { omit, pick, wait } from '@shared/core-utils'
8 isHLSTranscodingPayloadSuccess,
9 isLiveRTMPHLSTranscodingUpdatePayload,
10 isWebVideoOrAudioMergeTranscodingPayloadSuccess,
12 RequestRunnerJobResult,
15 RunnerJobLiveRTMPHLSTranscodingPayload,
19 RunnerJobSuccessPayload,
23 } from '@shared/models'
24 import { unwrapBody } from '../requests'
25 import { waitJobs } from '../server'
26 import { AbstractCommand, OverrideCommandOptions } from '../shared'
/**
 * Command helper grouping the calls made against the `/api/v1/runners/jobs` endpoints:
 * admin operations (list, cancel), the runner job lifecycle (request, accept, abort,
 * update, error, success) and a few convenience helpers built on top of them.
 */
28 export class RunnerJobsCommand extends AbstractCommand {
/**
 * Lists runner jobs from `/api/v1/runners/jobs`.
 *
 * Only the `start`, `count`, `sort` and `search` options are forwarded as query parameters.
 * The body is typed as `ResultList<RunnerJobAdmin>` and a 200 status is expected by default.
 */
30 list (options: OverrideCommandOptions & {
36 const path = '/api/v1/runners/jobs'
38 return this.getRequestBody<ResultList<RunnerJobAdmin>>({
42 query: pick(options, [ 'start', 'count', 'sort', 'search' ]),
44 defaultExpectedStatus: HttpStatusCode.OK_200
/**
 * Cancels the job identified by `options.jobUUID` by posting to
 * `/api/v1/runners/jobs/{jobUUID}/cancel`. A 204 status is expected by default.
 */
48 cancelByAdmin (options: OverrideCommandOptions & { jobUUID: string }) {
49 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/cancel'
51 return this.postBodyRequest({
56 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
60 // ---------------------------------------------------------------------------
/**
 * Asks the server which jobs are available by posting to `/api/v1/runners/jobs/request`.
 *
 * Only `runnerToken` is sent as a body field. The response body is unwrapped and typed
 * as `RequestRunnerJobResult`; a 200 status is expected by default.
 */
62 request (options: OverrideCommandOptions & RequestRunnerJobBody) {
63 const path = '/api/v1/runners/jobs/request'
65 return unwrapBody<RequestRunnerJobResult>(this.postBodyRequest({
69 fields: pick(options, [ 'runnerToken' ]),
71 defaultExpectedStatus: HttpStatusCode.OK_200
/**
 * Requests the jobs available to the runner and keeps only the VOD transcoding ones
 * (audio merge, HLS and web video).
 *
 * @param options - request body (runner token) plus the usual command overrides
 * @returns the request result restricted to VOD job types
 */
async requestVOD (options: OverrideCommandOptions & RequestRunnerJobBody) {
  const isVODType = (type: RunnerJobType) => {
    return type === 'vod-audio-merge-transcoding' ||
      type === 'vod-hls-transcoding' ||
      type === 'vod-web-video-transcoding'
  }

  const result = await this.request(options)
  const vodJobs = result.availableJobs.filter(job => isVODType(job.type))

  return { availableJobs: vodJobs } as RequestRunnerJobResult<RunnerJobVODPayload>
}
/**
 * Requests the jobs available to the runner and keeps only the live RTMP to HLS
 * transcoding ones.
 *
 * @param options - request body (runner token) plus the usual command overrides
 * @returns the request result restricted to live transcoding jobs
 */
async requestLive (options: OverrideCommandOptions & RequestRunnerJobBody) {
  const liveType: RunnerJobType = 'live-rtmp-hls-transcoding'

  const result = await this.request(options)
  const liveJobs = result.availableJobs.filter(job => job.type === liveType)

  return { availableJobs: liveJobs } as RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>
}
95 // ---------------------------------------------------------------------------
/**
 * Accepts the job identified by `options.jobUUID` by posting to
 * `/api/v1/runners/jobs/{jobUUID}/accept`.
 *
 * Only `runnerToken` is sent as a body field and `implicitToken` is disabled.
 * The type parameter `T` types the payload of the returned `AcceptRunnerJobResult<T>`.
 * A 200 status is expected by default.
 */
97 accept <T extends RunnerJobPayload = RunnerJobPayload> (options: OverrideCommandOptions & AcceptRunnerJobBody & { jobUUID: string }) {
98 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/accept'
100 return unwrapBody<AcceptRunnerJobResult<T>>(this.postBodyRequest({
104 fields: pick(options, [ 'runnerToken' ]),
105 implicitToken: false,
106 defaultExpectedStatus: HttpStatusCode.OK_200
/**
 * Aborts the job identified by `options.jobUUID` by posting to
 * `/api/v1/runners/jobs/{jobUUID}/abort`.
 *
 * Sends `reason`, `jobToken` and `runnerToken` as body fields, with `implicitToken`
 * disabled. A 204 status is expected by default.
 */
110 abort (options: OverrideCommandOptions & AbortRunnerJobBody & { jobUUID: string }) {
111 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/abort'
113 return this.postBodyRequest({
117 fields: pick(options, [ 'reason', 'jobToken', 'runnerToken' ]),
118 implicitToken: false,
119 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
/**
 * Sends a job update to `/api/v1/runners/jobs/{jobUUID}/update` as an upload request.
 *
 * For a live RTMP HLS update payload, the file entries (`masterPlaylistFile`,
 * `resolutionPlaylistFile`, `videoChunkFile`) are moved into `attaches` under
 * `payload[...]` keys and removed from the payload that is sent alongside
 * `progress`, `jobToken` and `runnerToken`.
 * `implicitToken` is disabled and a 204 status is expected by default.
 */
123 update (options: OverrideCommandOptions & RunnerJobUpdateBody & { jobUUID: string }) {
124 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/update'
126 const { payload } = options
// Files to upload, keyed by the form field name
127 const attaches: { [id: string]: any } = {}
128 let payloadWithoutFiles = payload
130 if (isLiveRTMPHLSTranscodingUpdatePayload(payload)) {
// The master playlist is only attached when the payload provides one
131 if (payload.masterPlaylistFile) {
132 attaches[`payload[masterPlaylistFile]`] = payload.masterPlaylistFile
135 attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
136 attaches[`payload[videoChunkFile]`] = payload.videoChunkFile
// Strip the file entries from the payload now that they are listed in attaches
138 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'masterPlaylistFile', 'resolutionPlaylistFile', 'videoChunkFile' ])
141 return this.postUploadRequest({
146 ...pick(options, [ 'progress', 'jobToken', 'runnerToken' ]),
148 payload: payloadWithoutFiles
151 implicitToken: false,
152 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
/**
 * Reports a job failure by posting to `/api/v1/runners/jobs/{jobUUID}/error`.
 *
 * Sends `message`, `jobToken` and `runnerToken` as body fields, with `implicitToken`
 * disabled. A 204 status is expected by default.
 */
156 error (options: OverrideCommandOptions & ErrorRunnerJobBody & { jobUUID: string }) {
157 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/error'
159 return this.postBodyRequest({
163 fields: pick(options, [ 'message', 'jobToken', 'runnerToken' ]),
164 implicitToken: false,
165 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
/**
 * Reports a job success to `/api/v1/runners/jobs/{jobUUID}/success` as an upload request.
 *
 * File entries of the payload are moved into `attaches` under `payload[...]` keys and
 * removed from the payload that is sent alongside `jobToken` and `runnerToken`:
 * - `videoFile` for web video, audio merge and HLS success payloads
 * - `resolutionPlaylistFile` for HLS success payloads
 * `implicitToken` is disabled and a 204 status is expected by default.
 */
169 success (options: OverrideCommandOptions & RunnerJobSuccessBody & { jobUUID: string }) {
170 const { payload } = options
172 const path = '/api/v1/runners/jobs/' + options.jobUUID + '/success'
// Files to upload, keyed by the form field name
173 const attaches: { [id: string]: any } = {}
174 let payloadWithoutFiles = payload
176 if ((isWebVideoOrAudioMergeTranscodingPayloadSuccess(payload) || isHLSTranscodingPayloadSuccess(payload)) && payload.videoFile) {
177 attaches[`payload[videoFile]`] = payload.videoFile
179 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'videoFile' ])
// HLS payloads may additionally carry the resolution playlist
182 if (isHLSTranscodingPayloadSuccess(payload) && payload.resolutionPlaylistFile) {
183 attaches[`payload[resolutionPlaylistFile]`] = payload.resolutionPlaylistFile
185 payloadWithoutFiles = omit(payloadWithoutFiles as any, [ 'resolutionPlaylistFile' ])
188 return this.postUploadRequest({
194 ...pick(options, [ 'jobToken', 'runnerToken' ]),
196 payload: payloadWithoutFiles
198 implicitToken: false,
199 defaultExpectedStatus: HttpStatusCode.NO_CONTENT_204
/**
 * Fetches a job file from an absolute URL using a POST request.
 *
 * `options.url` is split with `URL`: the request targets `protocol//host`, and
 * `jobToken` and `runnerToken` are sent as body fields with `implicitToken` disabled.
 * A 200 status is expected by default.
 */
203 getJobFile (options: OverrideCommandOptions & { url: string, jobToken: string, runnerToken: string }) {
// NOTE(review): `pathname` is presumably used as the request path; its use is not visible here, confirm
204 const { host, protocol, pathname } = new URL(options.url)
206 return this.postBodyRequest({
207 url: `${protocol}//${host}`,
210 fields: pick(options, [ 'jobToken', 'runnerToken' ]),
211 implicitToken: false,
212 defaultExpectedStatus: HttpStatusCode.OK_200
216 // ---------------------------------------------------------------------------
/**
 * Requests the jobs available to the runner and accepts one of them: the first job of
 * `options.type` when a type is given, otherwise the first available job.
 *
 * @param options - request body (runner token), optional job `type` filter and command overrides
 * @returns the result of accepting the selected job
 * @throws Error when no suitable job is available
 */
async autoAccept (options: OverrideCommandOptions & RequestRunnerJobBody & { type?: RunnerJobType }) {
  const { availableJobs } = await this.request(options)

  const job = options.type
    ? availableJobs.find(j => j.type === options.type)
    : availableJobs[0]

  // `find` yields undefined when nothing matches and the list may be empty:
  // fail with an explicit message instead of a TypeError on `job.uuid`
  if (!job) {
    const wanted = options.type ? ` of type ${options.type}` : ''

    throw new Error(`No available runner job${wanted} to accept`)
  }

  return this.accept({ ...options, jobUUID: job.uuid })
}
/**
 * Runs a web video job end to end: accepts it, reports success with the
 * `video_short.mp4` fixture as the resulting video file, then waits for the server jobs.
 *
 * `jobUUIDToProcess` selects the job to process. The UUID of the first available job
 * is also read from a job request (presumably only when no UUID was provided; the guard
 * is not visible here, confirm).
 */
228 async autoProcessWebVideoJob (runnerToken: string, jobUUIDToProcess?: string) {
229 let jobUUID = jobUUIDToProcess
232 const { availableJobs } = await this.request({ runnerToken })
// NOTE(review): throws a TypeError if no job is available (availableJobs[0] is undefined)
233 jobUUID = availableJobs[0].uuid
236 const { job } = await this.accept({ runnerToken, jobUUID })
237 const jobToken = job.jobToken
239 const payload: RunnerJobSuccessPayload = { videoFile: 'video_short.mp4' }
240 await this.success({ runnerToken, jobUUID, jobToken, payload })
// Wait for the jobs of this command's server before going further
242 await waitJobs([ this.server ])
/**
 * Cancels, one after the other, the jobs returned by a listing of at most 100 entries
 * that are pending, processing or waiting for their parent job.
 *
 * @param options.state - when set, only jobs in this state are considered
 */
async cancelAllJobs (options: { state?: RunnerJobState } = {}) {
  const wantedState = options.state

  const listing = await this.list({ count: 100 })

  const cancellableStates = new Set<RunnerJobState>([
    RunnerJobState.PENDING,
    RunnerJobState.PROCESSING,
    RunnerJobState.WAITING_FOR_PARENT_JOB
  ])

  for (const job of listing.data) {
    const currentState = job.state.id

    // Skip jobs filtered out by the caller or that are not in a cancellable state
    const matchesFilter = !wantedState || currentState === wantedState
    if (!matchesFilter || !cancellableStates.has(currentState)) continue

    await this.cancelByAdmin({ jobUUID: job.uuid })
  }
}
/**
 * Looks a job up by UUID among the 100 most recently updated jobs.
 *
 * @param options - `uuid` of the job to find, plus the usual command overrides
 * @returns the matching job, or `undefined` when it is not part of the listing
 */
async getJob (options: OverrideCommandOptions & { uuid: string }) {
  const listing = await this.list({ ...options, count: 100, sort: '-updatedAt' })

  return listing.data.find(job => job.uuid === options.uuid)
}
/**
 * Polls `requestLive` with the given runner token until at least one live transcoding
 * job is available, then returns the first available job.
 */
272 async requestLiveJob (runnerToken: string) {
273 let availableJobs: RequestRunnerJobResult<RunnerJobLiveRTMPHLSTranscodingPayload>['availableJobs'] = []
// Keep requesting while the server reports no live job
275 while (availableJobs.length === 0) {
276 const result = await this.requestLive({ runnerToken })
277 availableJobs = result.availableJobs
// NOTE(review): exits early only for exactly one job; with several jobs the loop ends through its condition instead
279 if (availableJobs.length === 1) break
284 return availableJobs[0]