diff --git a/packages/playwright-core/src/client/elementHandle.ts b/packages/playwright-core/src/client/elementHandle.ts index b1ef721ce3..5c0988f438 100644 --- a/packages/playwright-core/src/client/elementHandle.ts +++ b/packages/playwright-core/src/client/elementHandle.ts @@ -14,9 +14,6 @@ * limitations under the License. */ -import { pipeline } from 'stream'; -import { promisify } from 'util'; - import { Frame } from './frame'; import { JSHandle, parseResult, serializeArgument } from './jsHandle'; import { assert } from '../utils/isomorphic/debug'; @@ -34,8 +31,6 @@ import type * as api from '../../types/types'; import type { Platform } from '../common/platform'; import type * as channels from '@protocol/channels'; -const pipelineAsync = promisify(pipeline); - export class ElementHandle extends JSHandle implements api.ElementHandle { readonly _elementChannel: channels.ElementHandleChannel; @@ -306,7 +301,7 @@ export async function convertInputFiles(platform: Platform, files: string | File }), true); for (let i = 0; i < files.length; i++) { const writable = WritableStream.from(writableStreams[i]); - await pipelineAsync(platform.fs().createReadStream(files[i]), writable.stream()); + await platform.streamFile(files[i], writable.stream()); } return { directoryStream: rootDir, diff --git a/packages/playwright-core/src/client/stream.ts b/packages/playwright-core/src/client/stream.ts index 4f43b9afad..97c1e4b634 100644 --- a/packages/playwright-core/src/client/stream.ts +++ b/packages/playwright-core/src/client/stream.ts @@ -30,29 +30,6 @@ export class Stream extends ChannelOwner { } stream(): Readable { - return new StreamImpl(this._channel); - } -} - -class StreamImpl extends Readable { - private _channel: channels.StreamChannel; - - constructor(channel: channels.StreamChannel) { - super(); - this._channel = channel; - } - - override async _read() { - const result = await this._channel.read({ size: 1024 * 1024 }); - if (result.binary.byteLength) - this.push(result.binary); - else - this.push(null); - } - - override _destroy(error: Error | null, callback: (error: Error | null | undefined) => void): void { - // Stream might be destroyed after the connection was closed. - this._channel.close().catch(e => null); - super._destroy(error, callback); + return this._platform.streamReadable(this._channel); } } diff --git a/packages/playwright-core/src/client/writableStream.ts b/packages/playwright-core/src/client/writableStream.ts index 66cf17201d..38a2d0214a 100644 --- a/packages/playwright-core/src/client/writableStream.ts +++ b/packages/playwright-core/src/client/writableStream.ts @@ -14,11 +14,10 @@ * limitations under the License. */ -import { Writable } from 'stream'; - import { ChannelOwner } from './channelOwner'; import type * as channels from '@protocol/channels'; +import type { Writable } from 'stream'; export class WritableStream extends ChannelOwner { static from(Stream: channels.WritableStreamChannel): WritableStream { @@ -30,26 +29,6 @@ export class WritableStream extends ChannelOwner } stream(): Writable { - return new WritableStreamImpl(this._channel); - } -} - -class WritableStreamImpl extends Writable { - private _channel: channels.WritableStreamChannel; - - constructor(channel: channels.WritableStreamChannel) { - super(); - this._channel = channel; - } - - override async _write(chunk: Buffer | string, encoding: BufferEncoding, callback: (error?: Error | null) => void) { - const error = await this._channel.write({ binary: typeof chunk === 'string' ? Buffer.from(chunk) : chunk }).catch(e => e); - callback(error || null); - } - - override async _final(callback: (error?: Error | null) => void) { - // Stream might be destroyed after the connection was closed. - const error = await this._channel.close().catch(e => e); - callback(error || null); + return this._platform.streamWritable(this._channel); } } diff --git a/packages/playwright-core/src/common/platform.ts b/packages/playwright-core/src/common/platform.ts index 10bdc35922..25819b9a6f 100644 --- a/packages/playwright-core/src/common/platform.ts +++ b/packages/playwright-core/src/common/platform.ts @@ -19,6 +19,8 @@ import { webColors, noColors } from '../utils/isomorphic/colors'; import type * as fs from 'fs'; import type * as path from 'path'; import type { Colors } from '../utils/isomorphic/colors'; +import type { Readable, Writable } from 'stream'; +import type * as channels from '@protocol/channels'; export type Zone = { push(data: unknown): Zone; @@ -47,47 +49,12 @@ export type Platform = { log(name: 'api' | 'channel', message: string | Error | object): void; path: () => typeof path; pathSeparator: string; + streamFile(path: string, writable: Writable): Promise, + streamReadable: (channel: channels.StreamChannel) => Readable, + streamWritable: (channel: channels.WritableStreamChannel) => Writable, zones: { empty: Zone, current: () => Zone; }; }; -export const webPlatform: Platform = { - name: 'web', - - calculateSha1: async (text: string) => { - const bytes = new TextEncoder().encode(text); - const hashBuffer = await window.crypto.subtle.digest('SHA-1', bytes); - return Array.from(new Uint8Array(hashBuffer), b => b.toString(16).padStart(2, '0')).join(''); - }, - - colors: webColors, - - createGuid: () => { - return Array.from(window.crypto.getRandomValues(new Uint8Array(16)), b => b.toString(16).padStart(2, '0')).join(''); - }, - - fs: () => { - throw new Error('File system is not available'); - }, - - inspectCustom: undefined, - - isDebuggerAttached: () => false, - - isLogEnabled(name: 'api' | 'channel') { - return false; - }, - - log(name: 'api' | 'channel', message: string | Error | object) {}, - - path: () => { - throw new Error('Path module is not available'); - }, - - pathSeparator: '/', - - zones: { empty: noopZone, current: () => noopZone }, -}; - export const emptyPlatform: Platform = { name: 'empty', @@ -121,5 +88,35 @@ export const emptyPlatform: Platform = { pathSeparator: '/', + streamFile(path: string, writable: Writable): Promise { + throw new Error('Streams are not available'); + }, + + streamReadable: (channel: channels.StreamChannel) => { + throw new Error('Streams are not available'); + }, + + streamWritable: (channel: channels.WritableStreamChannel) => { + throw new Error('Streams are not available'); + }, + zones: { empty: noopZone, current: () => noopZone }, }; + +export const webPlatform: Platform = { + ...emptyPlatform, + + name: 'web', + + calculateSha1: async (text: string) => { + const bytes = new TextEncoder().encode(text); + const hashBuffer = await window.crypto.subtle.digest('SHA-1', bytes); + return Array.from(new Uint8Array(hashBuffer), b => b.toString(16).padStart(2, '0')).join(''); + }, + + colors: webColors, + + createGuid: () => { + return Array.from(window.crypto.getRandomValues(new Uint8Array(16)), b => b.toString(16).padStart(2, '0')).join(''); + }, +}; diff --git a/packages/playwright-core/src/server/utils/nodePlatform.ts b/packages/playwright-core/src/server/utils/nodePlatform.ts index e9883da361..fcdfebde2a 100644 --- a/packages/playwright-core/src/server/utils/nodePlatform.ts +++ b/packages/playwright-core/src/server/utils/nodePlatform.ts @@ -18,6 +18,7 @@ import * as crypto from 'crypto'; import * as fs from 'fs'; import * as path from 'path'; import * as util from 'util'; +import { Readable, Writable, pipeline } from 'stream'; import { colors } from '../../utilsBundle'; import { debugLogger } from './debugLogger'; @@ -25,6 +26,9 @@ import { currentZone, emptyZone } from './zones'; import type { Platform, Zone } from '../../common/platform'; import type { Zone as ZoneImpl } from './zones'; +import type * as channels from '@protocol/channels'; + +const pipelineAsync = util.promisify(pipeline); class NodeZone implements Zone { private _zone: ZoneImpl; @@ -81,8 +85,63 @@ export const nodePlatform: Platform = { pathSeparator: path.sep, + async streamFile(path: string, stream: Writable): Promise { + await pipelineAsync(fs.createReadStream(path), stream); + }, + + streamReadable: (channel: channels.StreamChannel) => { + return new ReadableStreamImpl(channel); + }, + + streamWritable: (channel: channels.WritableStreamChannel) => { + return new WritableStreamImpl(channel); + }, + zones: { current: () => new NodeZone(currentZone()), empty: new NodeZone(emptyZone), } }; + +class ReadableStreamImpl extends Readable { + private _channel: channels.StreamChannel; + + constructor(channel: channels.StreamChannel) { + super(); + this._channel = channel; + } + + override async _read() { + const result = await this._channel.read({ size: 1024 * 1024 }); + if (result.binary.byteLength) + this.push(result.binary); + else + this.push(null); + } + + override _destroy(error: Error | null, callback: (error: Error | null | undefined) => void): void { + // Stream might be destroyed after the connection was closed. + this._channel.close().catch(e => null); + super._destroy(error, callback); + } +} + +class WritableStreamImpl extends Writable { + private _channel: channels.WritableStreamChannel; + + constructor(channel: channels.WritableStreamChannel) { + super(); + this._channel = channel; + } + + override async _write(chunk: Buffer | string, encoding: BufferEncoding, callback: (error?: Error | null) => void) { + const error = await this._channel.write({ binary: typeof chunk === 'string' ? Buffer.from(chunk) : chunk }).catch(e => e); + callback(error || null); + } + + override async _final(callback: (error?: Error | null) => void) { + // Stream might be destroyed after the connection was closed. + const error = await this._channel.close().catch(e => e); + callback(error || null); + } +}