aboutsummaryrefslogtreecommitdiffhomepage
path: root/packages/peertube-runner/shared
diff options
context:
space:
mode:
authorChocobozzz <me@florianbigard.com>2023-04-21 15:05:27 +0200
committerChocobozzz <chocobozzz@cpy.re>2023-05-09 08:57:34 +0200
commit1772b383de490cf406fe93ef3aa3a941f6db513c (patch)
tree7cecc404c8d71951c22079e9bf5180095981b7f9 /packages/peertube-runner/shared
parent118626c8752bee7b05c4e0b668852e1aba2416f1 (diff)
downloadPeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.gz
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.tar.zst
PeerTube-1772b383de490cf406fe93ef3aa3a941f6db513c.zip
Add peertube runner cli
Diffstat (limited to 'packages/peertube-runner/shared')
-rw-r--r--packages/peertube-runner/shared/config-manager.ts139
-rw-r--r--packages/peertube-runner/shared/http.ts66
-rw-r--r--packages/peertube-runner/shared/index.ts3
-rw-r--r--packages/peertube-runner/shared/ipc/index.ts2
-rw-r--r--packages/peertube-runner/shared/ipc/ipc-client.ts74
-rw-r--r--packages/peertube-runner/shared/ipc/ipc-server.ts61
-rw-r--r--packages/peertube-runner/shared/ipc/shared/index.ts2
-rw-r--r--packages/peertube-runner/shared/ipc/shared/ipc-request.model.ts15
-rw-r--r--packages/peertube-runner/shared/ipc/shared/ipc-response.model.ts15
-rw-r--r--packages/peertube-runner/shared/logger.ts12
10 files changed, 389 insertions, 0 deletions
diff --git a/packages/peertube-runner/shared/config-manager.ts b/packages/peertube-runner/shared/config-manager.ts
new file mode 100644
index 000000000..352bae1fa
--- /dev/null
+++ b/packages/peertube-runner/shared/config-manager.ts
@@ -0,0 +1,139 @@
1import envPaths from 'env-paths'
2import { ensureDir, pathExists, readFile, remove, writeFile } from 'fs-extra'
3import { merge } from 'lodash'
4import { logger } from 'packages/peertube-runner/shared/logger'
5import { dirname, join } from 'path'
6import { parse, stringify } from '@iarna/toml'
7
8const paths = envPaths('peertube-runner')
9
10type Config = {
11 jobs: {
12 concurrency: number
13 }
14
15 ffmpeg: {
16 threads: number
17 nice: number
18 }
19
20 registeredInstances: {
21 url: string
22 runnerToken: string
23 runnerName: string
24 runnerDescription?: string
25 }[]
26}
27
28export class ConfigManager {
29 private static instance: ConfigManager
30
31 private config: Config = {
32 jobs: {
33 concurrency: 2
34 },
35 ffmpeg: {
36 threads: 2,
37 nice: 20
38 },
39 registeredInstances: []
40 }
41
42 private id: string
43 private configFilePath: string
44
45 private constructor () {}
46
47 init (id: string) {
48 this.id = id
49 this.configFilePath = join(this.getConfigDir(), 'config.toml')
50 }
51
52 async load () {
53 logger.info(`Using ${this.configFilePath} as configuration file`)
54
55 if (this.isTestInstance()) {
56 logger.info('Removing configuration file as we are using the "test" id')
57 await remove(this.configFilePath)
58 }
59
60 await ensureDir(dirname(this.configFilePath))
61
62 if (!await pathExists(this.configFilePath)) {
63 await this.save()
64 }
65
66 const file = await readFile(this.configFilePath, 'utf-8')
67
68 this.config = merge(this.config, parse(file))
69 }
70
71 save () {
72 return writeFile(this.configFilePath, stringify(this.config))
73 }
74
75 // ---------------------------------------------------------------------------
76
77 async setRegisteredInstances (registeredInstances: {
78 url: string
79 runnerToken: string
80 runnerName: string
81 runnerDescription?: string
82 }[]) {
83 this.config.registeredInstances = registeredInstances
84
85 await this.save()
86 }
87
88 // ---------------------------------------------------------------------------
89
90 getConfig () {
91 return this.deepFreeze(this.config)
92 }
93
94 // ---------------------------------------------------------------------------
95
96 getTranscodingDirectory () {
97 return join(paths.cache, this.id, 'transcoding')
98 }
99
100 getSocketDirectory () {
101 return join(paths.data, this.id)
102 }
103
104 getSocketPath () {
105 return join(this.getSocketDirectory(), 'peertube-runner.sock')
106 }
107
108 getConfigDir () {
109 return join(paths.config, this.id)
110 }
111
112 // ---------------------------------------------------------------------------
113
114 isTestInstance () {
115 return this.id === 'test'
116 }
117
118 // ---------------------------------------------------------------------------
119
120 // Thanks: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Object/freeze
121 private deepFreeze <T extends object> (object: T) {
122 const propNames = Reflect.ownKeys(object)
123
124 // Freeze properties before freezing self
125 for (const name of propNames) {
126 const value = object[name]
127
128 if ((value && typeof value === 'object') || typeof value === 'function') {
129 this.deepFreeze(value)
130 }
131 }
132
133 return Object.freeze({ ...object })
134 }
135
136 static get Instance () {
137 return this.instance || (this.instance = new this())
138 }
139}
diff --git a/packages/peertube-runner/shared/http.ts b/packages/peertube-runner/shared/http.ts
new file mode 100644
index 000000000..d3fff70d1
--- /dev/null
+++ b/packages/peertube-runner/shared/http.ts
@@ -0,0 +1,66 @@
1import { createWriteStream, remove } from 'fs-extra'
2import { request as requestHTTP } from 'http'
3import { request as requestHTTPS, RequestOptions } from 'https'
4import { logger } from './logger'
5
6export function downloadFile (options: {
7 url: string
8 destination: string
9 runnerToken: string
10 jobToken: string
11}) {
12 const { url, destination, runnerToken, jobToken } = options
13
14 logger.debug(`Downloading file ${url}`)
15
16 return new Promise<void>((res, rej) => {
17 const parsed = new URL(url)
18
19 const body = JSON.stringify({
20 runnerToken,
21 jobToken
22 })
23
24 const getOptions: RequestOptions = {
25 method: 'POST',
26 hostname: parsed.hostname,
27 port: parsed.port,
28 path: parsed.pathname,
29 headers: {
30 'Content-Type': 'application/json',
31 'Content-Length': Buffer.byteLength(body, 'utf-8')
32 }
33 }
34
35 const request = getRequest(url)(getOptions, response => {
36 const code = response.statusCode ?? 0
37
38 if (code >= 400) {
39 return rej(new Error(response.statusMessage))
40 }
41
42 const file = createWriteStream(destination)
43 file.on('finish', () => res())
44
45 response.pipe(file)
46 })
47
48 request.on('error', err => {
49 remove(destination)
50 .catch(err => console.error(err))
51
52 return rej(err)
53 })
54
55 request.write(body)
56 request.end()
57 })
58}
59
60// ---------------------------------------------------------------------------
61
62function getRequest (url: string) {
63 if (url.startsWith('https://')) return requestHTTPS
64
65 return requestHTTP
66}
diff --git a/packages/peertube-runner/shared/index.ts b/packages/peertube-runner/shared/index.ts
new file mode 100644
index 000000000..d0b5a2e3e
--- /dev/null
+++ b/packages/peertube-runner/shared/index.ts
@@ -0,0 +1,3 @@
1export * from './config-manager'
2export * from './http'
3export * from './logger'
diff --git a/packages/peertube-runner/shared/ipc/index.ts b/packages/peertube-runner/shared/ipc/index.ts
new file mode 100644
index 000000000..ad4590281
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/index.ts
@@ -0,0 +1,2 @@
1export * from './ipc-client'
2export * from './ipc-server'
diff --git a/packages/peertube-runner/shared/ipc/ipc-client.ts b/packages/peertube-runner/shared/ipc/ipc-client.ts
new file mode 100644
index 000000000..7f5951157
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/ipc-client.ts
@@ -0,0 +1,74 @@
1import CliTable3 from 'cli-table3'
2import { ensureDir } from 'fs-extra'
3import { Client as NetIPC } from 'net-ipc'
4import { ConfigManager } from '../config-manager'
5import { IPCReponse, IPCReponseData, IPCRequest } from './shared'
6
7export class IPCClient {
8 private netIPC: NetIPC
9
10 async run () {
11 await ensureDir(ConfigManager.Instance.getSocketDirectory())
12
13 const socketPath = ConfigManager.Instance.getSocketPath()
14 this.netIPC = new NetIPC({ path: socketPath })
15 await this.netIPC.connect()
16 }
17
18 async askRegister (options: {
19 url: string
20 registrationToken: string
21 runnerName: string
22 runnerDescription?: string
23 }) {
24 const req: IPCRequest = {
25 type: 'register',
26 ...options
27 }
28
29 const { success, error } = await this.netIPC.request(req) as IPCReponse
30
31 if (success) console.log('PeerTube instance registered')
32 else console.error('Could not register PeerTube instance on runner server side', error)
33 }
34
35 async askUnregister (options: {
36 url: string
37 }) {
38 const req: IPCRequest = {
39 type: 'unregister',
40 ...options
41 }
42
43 const { success, error } = await this.netIPC.request(req) as IPCReponse
44
45 if (success) console.log('PeerTube instance unregistered')
46 else console.error('Could not unregister PeerTube instance on runner server side', error)
47 }
48
49 async askListRegistered () {
50 const req: IPCRequest = {
51 type: 'list-registered'
52 }
53
54 const { success, error, data } = await this.netIPC.request(req) as IPCReponse<IPCReponseData>
55 if (!success) {
56 console.error('Could not list registered PeerTube instances', error)
57 return
58 }
59
60 const table = new CliTable3({
61 head: [ 'instance', 'runner name', 'runner description' ]
62 })
63
64 for (const server of data.servers) {
65 table.push([ server.url, server.runnerName, server.runnerDescription ])
66 }
67
68 console.log(table.toString())
69 }
70
71 stop () {
72 this.netIPC.destroy()
73 }
74}
diff --git a/packages/peertube-runner/shared/ipc/ipc-server.ts b/packages/peertube-runner/shared/ipc/ipc-server.ts
new file mode 100644
index 000000000..bc340198b
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/ipc-server.ts
@@ -0,0 +1,61 @@
1import { ensureDir } from 'fs-extra'
2import { Server as NetIPC } from 'net-ipc'
3import { pick } from '@shared/core-utils'
4import { RunnerServer } from '../../server'
5import { ConfigManager } from '../config-manager'
6import { logger } from '../logger'
7import { IPCReponse, IPCReponseData, IPCRequest } from './shared'
8
9export class IPCServer {
10 private netIPC: NetIPC
11 private runnerServer: RunnerServer
12
13 async run (runnerServer: RunnerServer) {
14 this.runnerServer = runnerServer
15
16 await ensureDir(ConfigManager.Instance.getSocketDirectory())
17
18 const socketPath = ConfigManager.Instance.getSocketPath()
19 this.netIPC = new NetIPC({ path: socketPath })
20 await this.netIPC.start()
21
22 logger.info(`IPC socket created on ${socketPath}`)
23
24 this.netIPC.on('request', async (req: IPCRequest, res) => {
25 try {
26 const data = await this.process(req)
27
28 this.sendReponse(res, { success: true, data })
29 } catch (err) {
30 console.error('Cannot execute RPC call', err)
31 this.sendReponse(res, { success: false, error: err.message })
32 }
33 })
34 }
35
36 private async process (req: IPCRequest) {
37 switch (req.type) {
38 case 'register':
39 await this.runnerServer.registerRunner(pick(req, [ 'url', 'registrationToken', 'runnerName', 'runnerDescription' ]))
40 return undefined
41
42 case 'unregister':
43 await this.runnerServer.unregisterRunner({ url: req.url })
44 return undefined
45
46 case 'list-registered':
47 return Promise.resolve(this.runnerServer.listRegistered())
48
49 default:
50 throw new Error('Unknown RPC call ' + (req as any).type)
51 }
52 }
53
54 private sendReponse <T extends IPCReponseData> (
55 response: (data: any) => Promise<void>,
56 body: IPCReponse<T>
57 ) {
58 response(body)
59 .catch(err => console.error('Cannot send response after IPC request', err))
60 }
61}
diff --git a/packages/peertube-runner/shared/ipc/shared/index.ts b/packages/peertube-runner/shared/ipc/shared/index.ts
new file mode 100644
index 000000000..deaaa152e
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/shared/index.ts
@@ -0,0 +1,2 @@
1export * from './ipc-request.model'
2export * from './ipc-response.model'
diff --git a/packages/peertube-runner/shared/ipc/shared/ipc-request.model.ts b/packages/peertube-runner/shared/ipc/shared/ipc-request.model.ts
new file mode 100644
index 000000000..0f733cdfe
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/shared/ipc-request.model.ts
@@ -0,0 +1,15 @@
1export type IPCRequest =
2 IPCRequestRegister |
3 IPCRequestUnregister |
4 IPCRequestListRegistered
5
6export type IPCRequestRegister = {
7 type: 'register'
8 url: string
9 registrationToken: string
10 runnerName: string
11 runnerDescription?: string
12}
13
14export type IPCRequestUnregister = { type: 'unregister', url: string }
15export type IPCRequestListRegistered = { type: 'list-registered' }
diff --git a/packages/peertube-runner/shared/ipc/shared/ipc-response.model.ts b/packages/peertube-runner/shared/ipc/shared/ipc-response.model.ts
new file mode 100644
index 000000000..689d6e09a
--- /dev/null
+++ b/packages/peertube-runner/shared/ipc/shared/ipc-response.model.ts
@@ -0,0 +1,15 @@
1export type IPCReponse <T extends IPCReponseData = undefined> = {
2 success: boolean
3 error?: string
4 data?: T
5}
6
7export type IPCReponseData =
8 // list registered
9 {
10 servers: {
11 runnerName: string
12 runnerDescription: string
13 url: string
14 }[]
15 }
diff --git a/packages/peertube-runner/shared/logger.ts b/packages/peertube-runner/shared/logger.ts
new file mode 100644
index 000000000..bf0f41828
--- /dev/null
+++ b/packages/peertube-runner/shared/logger.ts
@@ -0,0 +1,12 @@
1import { pino } from 'pino'
2import pretty from 'pino-pretty'
3
4const logger = pino(pretty({
5 colorize: true
6}))
7
8logger.level = 'info'
9
10export {
11 logger
12}