import fs from 'fs'
import fsPromise from 'fs/promises'
import path from 'path'
import events from 'events'
import hasNativeRecursive from './has-native-recursive.mjs'
import * as is from './is.mjs'

// Event names passed as the first argument of every 'change' event.
const EVENT_UPDATE = 'update'
const EVENT_REMOVE = 'remove'

// Kinds of watch items stored in `Watcher#listeners`.
const TYPE_FILE = 'file'
const TYPE_DIRECTORY = 'directory'

/**
 * Return a copy of `arr` without duplicate entries, keeping the first
 * occurrence of each value in its original order.
 *
 * @param {Array} arr
 * @returns {Array}
 */
function unique(arr) {
  return [...new Set(arr)]
}

/**
 * File-system watcher built on top of `fs.watch`.
 *
 * Events emitted by an instance:
 *  - 'change' (evt, name): `evt` is 'update' or 'remove'; `name` is the
 *    affected path, encoded according to `options.encoding`.
 *  - 'ready': every initially requested path has been set up.
 *  - 'error' (err)
 *  - 'close': emitted asynchronously after `close()`.
 *
 * Options read by this class:
 *  - encoding: 'utf8' (default), 'buffer', or anything accepted by
 *    `Buffer.isEncoding`.
 *  - delay: milliseconds over which change events are batched and
 *    de-duplicated (default 200; 0 emits every event immediately).
 *  - filter: function (must return exactly `true` to include a path) or
 *    RegExp tested against the full path.
 *  - recursive: also watch sub-directories.
 *  - manualRecursive: `true` forces one watcher per sub-directory instead of
 *    the platform's native recursive mode; otherwise it is derived from
 *    `hasNativeRecursive`.
 */
export default class Watcher extends events.EventEmitter {
  /**
   * @param {string|Buffer|Array} path - a path, a Buffer holding a path, or
   *   an array of paths to watch. (The name shadows the `path` module inside
   *   the constructor only; the module is not used here.)
   * @param {object|Function|null} [options] - options object, or the change
   *   callback when no options are needed.
   * @param {Function|null} [fn] - listener registered for 'change'.
   * @param {{fs?: object}} [overrides] - `fs` replaces the module used for
   *   `watch()` calls.
   * @throws {Error} when `options.encoding` is not a known encoding.
   */
  constructor(path, options = null, fn = null, { fs: fsoverwrite } = {}) {
    super()

    // Every fs watcher is created with this signal, see safeAdd().
    this.ac = new AbortController()
    events.setMaxListeners(2000, this.ac.signal)

    this._fs = fsoverwrite || fs
    this._cache = []
    this._cacheTimeout = null
    // NOTE(review): this instance property shadows EventEmitter#listeners().
    // Kept because other code may read `watcher.listeners` directly.
    this.listeners = []
    this.closed = false

    let paths = path
    if (is.buffer(paths)) {
      paths = paths.toString()
    }
    if (!is.array(paths)) {
      paths = [paths]
    }
    paths = unique(paths)

    this.options = options || {}
    this.fn = fn || null

    // Support the (path, fn) call signature.
    if (is.func(this.options)) {
      this.fn = this.options
      this.options = {}
    }

    this._verifyOptions(paths)
  }

  /**
   * @returns {boolean} true once `close()` has been called.
   */
  isClosed() {
    return this.closed
  }

  /**
   * Stop watching: abort the shared signal, drop pending batched events and
   * tracked items, then emit 'close' on the next tick.
   */
  close() {
    this.closed = true
    this.ac.abort()

    // A pending batch timer would otherwise stay armed until it fires.
    if (this._cacheTimeout) {
      clearTimeout(this._cacheTimeout)
      this._cacheTimeout = null
    }

    // Separate arrays so the two fields never alias each other.
    this._cache = []
    this.listeners = []
    this.emitAsync('close')
  }

  /**
   * Emit `name` with `args` on the next tick, so listeners attached right
   * after the current call still receive the event.
   */
  emitAsync(name, ...args) {
    process.nextTick(() => this.emit(name, ...args))
  }

  /**
   * Validate and default the options, register the change callback and start
   * the watchers for `paths`.
   *
   * @param {Array} paths
   * @throws {Error} when `options.encoding` is not a known encoding.
   */
  _verifyOptions(paths) {
    // Missing paths are reported asynchronously; setup still continues.
    for (const target of paths) {
      if (!is.exists(target)) {
        this.emitAsync('error', new Error(`${target} does not exist.`))
      }
    }

    const { encoding } = this.options
    if (encoding) {
      if (encoding !== 'buffer' && !Buffer.isEncoding(encoding)) {
        throw new Error(`Unknown encoding: ${encoding}`)
      }
    } else {
      this.options.encoding = 'utf8'
    }

    // 0 is a valid delay (no batching); any other falsy value gets the default.
    if (this.options.delay !== 0 && !this.options.delay) {
      this.options.delay = 200
    }

    if (is.func(this.fn)) {
      this.on('change', this.fn)
    }

    if (this.options.manualRecursive === true) {
      this._startListeners(paths)
      return
    }

    // Not forced: go manual only when native recursion is unavailable.
    hasNativeRecursive(nativeRecursive => {
      this.supportsNativeRecursive = nativeRecursive
      this.options.manualRecursive = !nativeRecursive
      this._startListeners(paths)
    })
  }

  /**
   * Add a watcher for every path, then emit 'ready', or 'error' when any of
   * the additions rejects.
   *
   * @param {Array} paths
   */
  _startListeners(paths) {
    Promise.all(paths.map(target => this.safeAdd(target)))
      .then(
        () => this.emit('ready'),
        err => this.emit('error', err),
      )
  }

  /**
   * @param {string} name
   * @returns {object|null} the tracked item whose `path` equals `name`.
   */
  getWatcherOrNull(name) {
    return this.listeners.find(entry => entry.path === name) ?? null
  }

  /**
   * Decide whether `name` passes `options.filter`.
   *
   * @param {string} name
   * @returns {boolean} true when no filter is set, when a function filter
   *   returns exactly `true`, or when a RegExp filter matches.
   */
  shouldInclude(name) {
    const { filter } = this.options
    if (!filter) return true
    if (is.func(filter)) return filter.call(this, name) === true
    if (is.regExp(filter)) return filter.test(name)
    return false
  }

  /**
   * Close a single watcher, report a 'remove' for its path and stop tracking
   * it. Emits 'error' when no matching item exists.
   *
   * @param {object|string} orgItem - a tracked item or the path of one.
   */
  closeWatch(orgItem) {
    const item = typeof orgItem === 'string'
      ? this.getWatcherOrNull(orgItem)
      : orgItem

    if (!item) {
      // Report what the caller asked for; `item` is always empty here.
      this.emit('error', new Error(`attempted to close watcher for ${orgItem} but such a watcher could not be found`))
      return
    }

    if (item.watcher) {
      item.watcher.close()
    }
    this._emitEvent(item, EVENT_REMOVE, item.path)

    const index = this.listeners.indexOf(item)
    if (index < 0) return
    this.listeners.splice(index, 1)
  }

  /**
   * Emit a 'change' event for `name`, immediately or batched, unless the
   * event is filtered out.
   *
   * @param {object} item - tracked item the event originated from.
   * @param {string} evt - EVENT_UPDATE or EVENT_REMOVE.
   * @param {string} name - full path the event is about.
   */
  _emitEvent(item, evt, name) {
    // A file item watches its parent directory; ignore sibling files.
    if (item.type === TYPE_FILE && !is.samePath(name, item.filename)) return
    if (item.type === TYPE_DIRECTORY && !this.shouldInclude(name)) return

    // A non-empty flag swallows exactly one event (see _watcherError).
    if (item.flag) {
      item.flag = ''
      return
    }

    let outputName = name
    if (this.options.encoding !== 'utf8') {
      outputName = Buffer.from(outputName)
      if (this.options.encoding !== 'buffer') {
        outputName = outputName.toString(this.options.encoding)
      }
    }

    if (!this.options.delay) {
      this.emit('change', evt, outputName)
      return
    }

    // Batch: keep the raw name for de-duplication next to the encoded one.
    this._cache.push([evt, name, outputName])
    if (this._cacheTimeout) return

    this._cacheTimeout = setTimeout(() => {
      const batch = this._filterCache(this._cache)
      this._cache = []
      this._cacheTimeout = null

      for (const [batchedEvt, , batchedName] of batch) {
        try {
          this.emit('change', batchedEvt, batchedName)
        } catch (err) {
          this.emit('error', err)
        }
      }
    }, this.options.delay)
  }

  /**
   * Collapse batched events so each path appears once, keeping the most
   * recent event per path. The input array is not modified.
   *
   * @param {Array} cache - entries of `[evt, name, outputName]`.
   * @returns {Array} surviving entries in their original relative order.
   */
  _filterCache(cache) {
    const seen = new Set()
    const kept = []

    // Walk newest to oldest so the latest entry for a path wins.
    for (let i = cache.length - 1; i >= 0; i--) {
      const name = cache[i][1]
      if (seen.has(name)) continue
      seen.add(name)
      kept.push(cache[i])
    }
    return kept.reverse()
  }

  /**
   * Handler for the 'change' event of an underlying fs watcher.
   *
   * @param {object} item - tracked item the fs watcher belongs to.
   * @param {string} rawEvt - event type reported by fs.watch (unused; the
   *   type is derived from whether the path still exists).
   * @param {string} rawName - name relative to `item.path`, may be empty.
   */
  _watcherSink(item, rawEvt, rawName) {
    if (this.closed) return

    const name = path.join(item.path, rawName || '')
    const evt = is.exists(name) ? EVENT_UPDATE : EVENT_REMOVE

    if (this.options.recursive && this.options.manualRecursive && item.type === TYPE_DIRECTORY) {
      if (evt === EVENT_REMOVE) {
        // Only a removed directory has a watcher of its own. closeWatch()
        // reports the removal itself, so nothing more to emit in that case.
        // Removed files fall through and are reported below.
        const removed = this.getWatcherOrNull(name)
        if (removed) {
          this.closeWatch(removed)
          return
        }
      } else if (is.directory(name) && this.getWatcherOrNull(name) === null) {
        // Watch new directories regardless of the filter, like the initial
        // scan in safeAdd(); the filter is applied when events are emitted.
        Promise.resolve(this.safeAdd(name, TYPE_DIRECTORY)).catch(err => {
          if (!this.closed) this.emit('error', err)
        })
      }
    }

    this._emitEvent(item, evt, name)
  }

  /**
   * Build the bookkeeping record for a watched path.
   *
   * @param {string} name - path to watch.
   * @param {string} type - TYPE_FILE or TYPE_DIRECTORY.
   * @returns {object} item with `path` (the directory handed to fs.watch),
   *   `type`, `watcher`, `flag` and, for files, `filename`.
   */
  _pathToItem(name, type) {
    if (type === TYPE_FILE) {
      // Files are observed through their parent directory.
      return {
        path: path.join(name, '../'),
        type: TYPE_FILE,
        filename: name,
        watcher: null,
        flag: '',
      }
    }

    return {
      path: name,
      type: TYPE_DIRECTORY,
      watcher: null,
      flag: '',
    }
  }

  /**
   * Handler for the 'error' event of an underlying fs watcher.
   *
   * @param {object} item - tracked item the fs watcher belongs to.
   * @param {Error} err
   */
  _watcherError(item, err) {
    if (this.closed) return

    if (is.windows() && err.code === 'EPERM') {
      // NOTE(review): presumably Windows raises EPERM when the watched
      // directory itself is deleted - confirm. The item is closed (which
      // reports the removal) and the flag set afterwards swallows the next
      // event seen for this item.
      this.closeWatch(item)
      item.flag = 'windows-error'
    } else {
      this.emit('error', err)
    }
  }

  /**
   * Start watching `name` and track the resulting item. Failures of
   * `watch()` are reported through an asynchronous 'error' event instead of
   * being thrown.
   *
   * @param {string} name - path to watch.
   * @param {string} [orgType] - TYPE_FILE or TYPE_DIRECTORY; detected from
   *   the file system when omitted.
   * @returns {Promise|undefined} a promise that settles once all
   *   sub-directories have been added (manual recursive mode only),
   *   otherwise undefined.
   */
  safeAdd(name, orgType) {
    const type = orgType || (is.file(name) ? TYPE_FILE : TYPE_DIRECTORY)
    const item = this._pathToItem(name, type)

    const options = {
      encoding: 'utf8',
      signal: this.ac.signal,
    }
    if (!this.options.manualRecursive && item.type !== TYPE_FILE && this.options.recursive) {
      options.recursive = true
    }

    try {
      item.watcher = this._fs.watch(item.path, options)
    } catch (err) {
      this.emitAsync('error', err)
    }
    if (!item.watcher) return

    this.listeners.push(item)
    item.watcher.on('error', this._watcherError.bind(this, item))
    item.watcher.on('change', this._watcherSink.bind(this, item))

    // Sub-directories get their own watcher only for a recursive watch that
    // is not handled natively; a non-recursive watch stops here.
    if (!this.options.recursive || options.recursive || item.type === TYPE_FILE) return

    return fsPromise.readdir(item.path, { withFileTypes: true })
      .then(entries => entries.filter(entry => entry.isDirectory()))
      .then(directories => Promise.all(
        directories.map(dir => this.safeAdd(path.join(item.path, dir.name), TYPE_DIRECTORY)),
      ))
  }
}