const PassThrough = require('stream').PassThrough; const urlResolve = require('url').resolve; const miniget = require('miniget'); const m3u8Parser = require('./m3u8-parser'); const DashMPDParser = require('./dash-mpd-parser'); const Queue = require('./queue'); const parseTime = require('./parse-time'); /** * @param {string} playlistURL * @param {Object} options * @return {stream.Readable} */ module.exports = (playlistURL, options) => { const stream = new PassThrough(); options = options || {}; const chunkReadahead = options.chunkReadahead || 3; const liveBuffer = options.liveBuffer || 20000; // 20 seconds const requestOptions = options.requestOptions; const Parser = { 'm3u8': m3u8Parser, 'dash-mpd': DashMPDParser, }[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')]; if (!Parser) { throw TypeError(`parser '${options.parser}' not supported`); } let relativeBegin = typeof options.begin === 'string'; let begin = relativeBegin ? parseTime.humanStr(options.begin) : Math.max(options.begin - liveBuffer, 0) || 0; let liveBegin = Date.now() - liveBuffer; let currSegment; const streamQueue = new Queue((req, callback) => { currSegment = req; // Count the size manually, since the `content-length` header is not // always there. let size = 0; req.on('data', (chunk) => size += chunk.length); req.pipe(stream, { end: false }); req.on('end', () => callback(size)); }, { concurrency: 1 }); let segmentNumber = 0; let downloaded = 0; const requestQueue = new Queue((segment, callback) => { let req = miniget(urlResolve(playlistURL, segment.url), requestOptions); req.on('error', callback); streamQueue.push(req, (size) => { downloaded += size; stream.emit('progress', { num: ++segmentNumber, size: size, url: segment.url, duration: segment.duration, }, requestQueue.total, downloaded); callback(); }); }, { concurrency: chunkReadahead }); const onError = (err) => { if (ended) { return; } stream.emit('error', err); // Stop on any error. stream.end(); }; // When to look for items again. let refreshThreshold; let minRefreshTime; let refreshTimeout; let fetchingPlaylist = false; let ended = false; let isStatic = false; let lastRefresh; const onQueuedEnd = (err) => { currSegment = null; if (err) { onError(err); } else if (!fetchingPlaylist && !ended && !isStatic && requestQueue.tasks.length + requestQueue.active === refreshThreshold) { let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh)); refreshTimeout = setTimeout(refreshPlaylist, ms); } else if ((ended || isStatic) && !requestQueue.tasks.length && !requestQueue.active) { stream.end(); } }; let currPlaylist; let lastSeq; const refreshPlaylist = () => { fetchingPlaylist = true; lastRefresh = Date.now(); currPlaylist = miniget(playlistURL, requestOptions); currPlaylist.on('error', onError); const parser = currPlaylist.pipe(new Parser(options.id)); let starttime = null; parser.on('starttime', (a) => { starttime = a; if (relativeBegin && begin >= 0) { begin += starttime; } }); parser.on('endlist', () => { isStatic = true; }); parser.on('endearly', () => { currPlaylist.unpipe(parser); }); let addedItems = []; let liveAddedItems = []; const addItem = (item, isLive) => { if (item.seq <= lastSeq) { return; } lastSeq = item.seq; begin = item.time; requestQueue.push(item, onQueuedEnd); addedItems.push(item); if (isLive) { liveAddedItems.push(item); } }; let tailedItems = [], tailedItemsDuration = 0; parser.on('item', (item) => { item.time = starttime; if (!starttime || begin <= item.time) { addItem(item, liveBegin <= item.time); } else { tailedItems.push(item); tailedItemsDuration += item.duration; // Only keep the last `liveBuffer` of items. while (tailedItems.length > 1 && tailedItemsDuration - tailedItems[0].duration > liveBuffer) { tailedItemsDuration -= tailedItems.shift().duration; } } starttime += item.duration; }); parser.on('end', () => { currPlaylist = null; // If we are too ahead of the stream, make sure to get the // latest available items with a small buffer. if (!addedItems.length && tailedItems.length) { tailedItems.forEach((item) => { addItem(item, true); }); } // Refresh the playlist when remaining segments get low. refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01)); // Throttle refreshing the playlist by looking at the duration // of live items added on this refresh. minRefreshTime = addedItems.reduce(((total, item) => item.duration + total), 0); fetchingPlaylist = false; }); }; refreshPlaylist(); stream.end = () => { ended = true; streamQueue.die(); requestQueue.die(); clearTimeout(refreshTimeout); if (currPlaylist) { currPlaylist.unpipe(); currPlaylist.abort(); } if (currSegment) { currSegment.unpipe(); currSegment.abort(); } PassThrough.prototype.end.call(stream); }; return stream; };