"use strict"; var __importDefault = (this && this.__importDefault) || function (mod) { return (mod && mod.__esModule) ? mod : { "default": mod }; }; const stream_1 = require("stream"); const url_1 = require("url"); const miniget_1 = __importDefault(require("miniget")); const m3u8_parser_1 = __importDefault(require("./m3u8-parser")); const dash_mpd_parser_1 = __importDefault(require("./dash-mpd-parser")); const queue_1 = __importDefault(require("./queue")); const parse_time_1 = require("./parse-time"); const supportedParsers = { 'm3u8': m3u8_parser_1.default, 'dash-mpd': dash_mpd_parser_1.default, }; let m3u8stream = (playlistURL, options = {}) => { const stream = new stream_1.PassThrough(); const chunkReadahead = options.chunkReadahead || 3; const liveBuffer = options.liveBuffer || 20000; // 20 seconds const requestOptions = options.requestOptions; const Parser = supportedParsers[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')]; if (!Parser) { throw TypeError(`parser '${options.parser}' not supported`); } let begin = 0; if (typeof options.begin !== 'undefined') { begin = typeof options.begin === 'string' ? parse_time_1.humanStr(options.begin) : Math.max(options.begin - liveBuffer, 0); } let liveBegin = Date.now() - liveBuffer; let currSegment; const streamQueue = new queue_1.default((req, callback) => { currSegment = req; // Count the size manually, since the `content-length` header is not // always there. let size = 0; req.on('data', (chunk) => size += chunk.length); req.pipe(stream, { end: false }); req.on('end', () => callback(undefined, size)); }, { concurrency: 1 }); let segmentNumber = 0; let downloaded = 0; const requestQueue = new queue_1.default((segment, callback) => { let req = miniget_1.default(url_1.resolve(playlistURL, segment.url), requestOptions); req.on('error', callback); streamQueue.push(req, (err, size) => { downloaded += +size; stream.emit('progress', { num: ++segmentNumber, size: size, duration: segment.duration, url: segment.url, }, requestQueue.total, downloaded); callback(); }); }, { concurrency: chunkReadahead }); const onError = (err) => { if (ended) { return; } stream.emit('error', err); // Stop on any error. stream.end(); }; // When to look for items again. let refreshThreshold; let minRefreshTime; let refreshTimeout; let fetchingPlaylist = true; let ended = false; let isStatic = false; let lastRefresh; const onQueuedEnd = (err) => { currSegment = null; if (err) { onError(err); } else if (!fetchingPlaylist && !ended && !isStatic && requestQueue.tasks.length + requestQueue.active <= refreshThreshold) { let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh)); fetchingPlaylist = true; refreshTimeout = setTimeout(refreshPlaylist, ms); } else if ((ended || isStatic) && !requestQueue.tasks.length && !requestQueue.active) { stream.end(); } }; let currPlaylist; let lastSeq; let starttime = 0; const refreshPlaylist = () => { lastRefresh = Date.now(); currPlaylist = miniget_1.default(playlistURL, requestOptions); currPlaylist.on('error', onError); const parser = currPlaylist.pipe(new Parser(options.id)); parser.on('starttime', (a) => { if (starttime) { return; } starttime = a; if (typeof options.begin === 'string' && begin >= 0) { begin += starttime; } }); parser.on('endlist', () => { isStatic = true; }); parser.on('endearly', currPlaylist.unpipe.bind(currPlaylist, parser)); let addedItems = []; let liveAddedItems = []; const addItem = (item, isLive) => { if (item.seq <= lastSeq) { return; } lastSeq = item.seq; begin = item.time; requestQueue.push(item, onQueuedEnd); addedItems.push(item); if (isLive) { liveAddedItems.push(item); } }; let tailedItems = [], tailedItemsDuration = 0; parser.on('item', (item) => { let timedItem = Object.assign({ time: starttime }, item); let isLive = liveBegin <= timedItem.time; if (begin <= timedItem.time) { addItem(timedItem, isLive); } else { tailedItems.push(timedItem); tailedItemsDuration += timedItem.duration; // Only keep the last `liveBuffer` of items. while (tailedItems.length > 1 && tailedItemsDuration - tailedItems[0].duration > liveBuffer) { tailedItemsDuration -= tailedItems.shift().duration; } } starttime += timedItem.duration; }); parser.on('end', () => { currPlaylist = null; // If we are too ahead of the stream, make sure to get the // latest available items with a small buffer. if (!addedItems.length && tailedItems.length) { tailedItems.forEach((item) => { addItem(item, true); }); } // Refresh the playlist when remaining segments get low. refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01)); // Throttle refreshing the playlist by looking at the duration // of live items added on this refresh. minRefreshTime = addedItems.reduce(((total, item) => item.duration + total), 0); fetchingPlaylist = false; }); }; refreshPlaylist(); stream.end = () => { ended = true; streamQueue.die(); requestQueue.die(); clearTimeout(refreshTimeout); if (currPlaylist) { currPlaylist.unpipe(); currPlaylist.abort(); } if (currSegment) { currSegment.unpipe(); currSegment.abort(); } stream_1.PassThrough.prototype.end.call(stream, null); }; return stream; }; module.exports = m3u8stream; //# sourceMappingURL=index.js.map