// Source: webhook-action/node_modules/@nodelib/fs.walk/src/readers/async.ts
// (TypeScript; original listing: 127 lines, 3.1 KiB)

import { EventEmitter } from 'events';
import * as fsScandir from '@nodelib/fs.scandir';
import * as fastq from 'fastq';
import Settings from '../settings';
import { Entry, Errno, QueueItem } from '../types';
import * as common from './common';
import Reader from './reader';
/** Listener signature accepted by `AsyncReader.onEntry`: receives one `Entry` and returns nothing. */
type EntryEventCallback = (entry: Entry) => void;
/** Listener signature accepted by `AsyncReader.onError`: receives the `Errno` error and returns nothing. */
type ErrorEventCallback = (error: Errno) => void;
/** Listener signature accepted by `AsyncReader.onEnd`: takes no arguments and returns nothing. */
type EndEventCallback = () => void;
/**
 * Callback/event based directory reader.
 *
 * Directories are scanned through a `fastq` queue. Results are reported on an
 * internal `EventEmitter` using three event names:
 *  - `'entry'` for every entry accepted by the entry filter,
 *  - `'error'` for an error that `common.isFatalError` reports as fatal,
 *  - `'end'`   when the queue drains and no fatal error was recorded.
 */
export default class AsyncReader extends Reader {
  /** Function the worker calls to list the entries of one directory. */
  protected readonly _scandir: typeof fsScandir.scandir = fsScandir.scandir;
  /** Emitter that carries the `'entry'`, `'error'` and `'end'` events. */
  protected readonly _emitter: EventEmitter = new EventEmitter();
  /** Queue of directories to scan; tasks run through `_worker` with `settings.concurrency`. */
  private readonly _queue: fastq.queue = fastq(this._worker.bind(this), this._settings.concurrency);
  /** Set once a fatal error has been emitted; suppresses the `'end'` event. */
  private _isFatalError: boolean = false;
  /** Set by `destroy()` or by a fatal error; stops further entry and error handling. */
  private _isDestroyed: boolean = false;

  /**
   * @param _root Directory passed on to the base `Reader` and used as the first queue item.
   * @param _settings Reader settings (concurrency, filters, base path, separator, scandir settings).
   */
  constructor(_root: string, protected readonly _settings: Settings) {
    super(_root, _settings);

    // When the queue drains, report completion unless a fatal error was already reported.
    this._queue.drain = () => {
      if (this._isFatalError) {
        return;
      }
      this._emitter.emit('end');
    };
  }

  /**
   * Starts reading from the root directory.
   *
   * Both state flags are reset, and the root is queued from a `setImmediate`
   * callback rather than synchronously (presumably so callers can attach
   * listeners to the returned emitter first — confirm against callers).
   *
   * @returns The emitter on which `'entry'`, `'error'` and `'end'` are emitted.
   */
  public read(): EventEmitter {
    this._isDestroyed = false;
    this._isFatalError = false;

    setImmediate(() => {
      const { basePath } = this._settings;
      this._pushToQueue(this._root, basePath);
    });

    return this._emitter;
  }

  /** Whether the reader is currently marked as destroyed. */
  public get isDestroyed(): boolean {
    return this._isDestroyed;
  }

  /**
   * Marks the reader as destroyed and calls `killAndDrain()` on the queue.
   *
   * @throws Error when the reader is already destroyed.
   */
  public destroy(): void {
    const alreadyDestroyed = this._isDestroyed;
    if (alreadyDestroyed) {
      throw new Error('The reader is already destroyed');
    }

    this._isDestroyed = true;
    this._queue.killAndDrain();
  }

  /** Registers a persistent listener (`on`) for the `'entry'` event. */
  public onEntry(callback: EntryEventCallback): void {
    this._emitter.on('entry', callback);
  }

  /** Registers a one-shot listener (`once`) for the `'error'` event. */
  public onError(callback: ErrorEventCallback): void {
    this._emitter.once('error', callback);
  }

  /** Registers a one-shot listener (`once`) for the `'end'` event. */
  public onEnd(callback: EndEventCallback): void {
    this._emitter.once('end', callback);
  }

  /**
   * Adds a directory to the scan queue.
   *
   * @param directory Path handed to the scandir function.
   * @param base Optional prefix used to rebuild the `path` of entries found in `directory`.
   */
  private _pushToQueue(directory: string, base?: string): void {
    const task: QueueItem = { directory, base };

    // A non-null completion error is routed through the fatal-error check.
    const onTaskDone = (failure: Error | null): void => {
      if (failure === null) {
        return;
      }
      this._handleError(failure);
    };

    this._queue.push(task, onTaskDone);
  }

  /**
   * Queue worker: scans one directory and processes every entry it contains.
   *
   * @param item Queue item holding the directory to scan and its optional base path.
   * @param done Queue completion callback; receives the scandir error, if any.
   */
  private _worker(item: QueueItem, done: fastq.done): void {
    const scanSettings = this._settings.fsScandirSettings;

    this._scandir(item.directory, scanSettings, (error: NodeJS.ErrnoException | null, entries) => {
      if (error !== null) {
        done(error, undefined);
        return;
      }

      for (const entry of entries) {
        this._handleEntry(entry, item.base);
      }

      // The cast is kept from the original call; it passes `null` where `done` is typed to take an `Error`.
      done(null as unknown as Error, undefined);
    });
  }

  /**
   * Emits `'error'` for a fatal error and marks the reader as failed and destroyed.
   * Does nothing when the reader is already destroyed or when
   * `common.isFatalError` does not report the error as fatal for these settings.
   */
  private _handleError(error: Error): void {
    if (this._isDestroyed) {
      return;
    }
    if (!common.isFatalError(this._settings, error)) {
      return;
    }

    this._isFatalError = true;
    this._isDestroyed = true;
    this._emitter.emit('error', error);
  }

  /**
   * Processes one scanned entry: optionally rewrites its `path` relative to
   * `base`, emits it if the entry filter accepts it, and queues it for
   * scanning if it is a directory accepted by the deep filter.
   */
  private _handleEntry(entry: Entry, base?: string): void {
    if (this._isDestroyed) {
      return;
    }
    if (this._isFatalError) {
      return;
    }

    // Remember the path as produced by scandir: it is what gets scanned next,
    // even when `entry.path` is rewritten below.
    const scanPath = entry.path;

    if (base !== undefined) {
      const separator = this._settings.pathSegmentSeparator;
      entry.path = common.joinPathSegments(base, entry.name, separator);
    }

    const acceptedByEntryFilter = common.isAppliedFilter(this._settings.entryFilter, entry);
    if (acceptedByEntryFilter) {
      this._emitEntry(entry);
    }

    const shouldDescend = entry.dirent.isDirectory() && common.isAppliedFilter(this._settings.deepFilter, entry);
    if (shouldDescend) {
      this._pushToQueue(scanPath, entry.path);
    }
  }

  /** Emits the `'entry'` event with the given entry. */
  private _emitEntry(entry: Entry): void {
    this._emitter.emit('entry', entry);
  }
}