1
0
mirror of https://github.com/musix-org/musix-oss synced 2024-12-23 22:33:17 +00:00
musix-oss/node_modules/m3u8stream/lib/index.js

179 lines
5.3 KiB
JavaScript
Raw Normal View History

const PassThrough = require('stream').PassThrough;
const urlResolve = require('url').resolve;
const miniget = require('miniget');
const m3u8Parser = require('./m3u8-parser');
const DashMPDParser = require('./dash-mpd-parser');
const Queue = require('./queue');
const parseTime = require('./parse-time');
/**
* @param {string} playlistURL
* @param {Object} options
* @return {stream.Readable}
*/
module.exports = (playlistURL, options) => {
const stream = new PassThrough();
options = options || {};
const chunkReadahead = options.chunkReadahead || 3;
const liveBuffer = options.liveBuffer || 20000; // 20 seconds
const requestOptions = options.requestOptions;
const Parser = {
'm3u8': m3u8Parser,
'dash-mpd': DashMPDParser,
}[options.parser || (/\.mpd$/.test(playlistURL) ? 'dash-mpd' : 'm3u8')];
if (!Parser) {
throw TypeError(`parser '${options.parser}' not supported`);
}
let relativeBegin = typeof options.begin === 'string';
let begin = relativeBegin ?
parseTime.humanStr(options.begin) :
Math.max(options.begin - liveBuffer, 0) || 0;
let liveBegin = Date.now() - liveBuffer;
let currSegment;
const streamQueue = new Queue((req, callback) => {
currSegment = req;
// Count the size manually, since the `content-length` header is not
// always there.
let size = 0;
req.on('data', (chunk) => size += chunk.length);
req.pipe(stream, { end: false });
req.on('end', () => callback(size));
}, { concurrency: 1 });
let segmentNumber = 0;
let downloaded = 0;
const requestQueue = new Queue((segment, callback) => {
let req = miniget(urlResolve(playlistURL, segment.url), requestOptions);
req.on('error', callback);
streamQueue.push(req, (size) => {
downloaded += size;
stream.emit('progress', {
num: ++segmentNumber,
size: size,
url: segment.url,
duration: segment.duration,
}, requestQueue.total, downloaded);
callback();
});
}, { concurrency: chunkReadahead });
const onError = (err) => {
if (ended) { return; }
stream.emit('error', err);
// Stop on any error.
stream.end();
};
// When to look for items again.
let refreshThreshold;
let minRefreshTime;
let refreshTimeout;
let fetchingPlaylist = false;
let ended = false;
let isStatic = false;
let lastRefresh;
const onQueuedEnd = (err) => {
currSegment = null;
if (err) {
onError(err);
} else if (!fetchingPlaylist && !ended && !isStatic &&
requestQueue.tasks.length + requestQueue.active === refreshThreshold) {
let ms = Math.max(0, minRefreshTime - (Date.now() - lastRefresh));
refreshTimeout = setTimeout(refreshPlaylist, ms);
} else if ((ended || isStatic) &&
!requestQueue.tasks.length && !requestQueue.active) {
stream.end();
}
};
let currPlaylist;
let lastSeq;
const refreshPlaylist = () => {
fetchingPlaylist = true;
lastRefresh = Date.now();
currPlaylist = miniget(playlistURL, requestOptions);
currPlaylist.on('error', onError);
const parser = currPlaylist.pipe(new Parser(options.id));
let starttime = null;
parser.on('starttime', (a) => {
starttime = a;
if (relativeBegin && begin >= 0) {
begin += starttime;
}
});
parser.on('endlist', () => { isStatic = true; });
parser.on('endearly', () => { currPlaylist.unpipe(parser); });
let addedItems = [];
let liveAddedItems = [];
const addItem = (item, isLive) => {
if (item.seq <= lastSeq) { return; }
lastSeq = item.seq;
begin = item.time;
requestQueue.push(item, onQueuedEnd);
addedItems.push(item);
if (isLive) {
liveAddedItems.push(item);
}
};
let tailedItems = [], tailedItemsDuration = 0;
parser.on('item', (item) => {
item.time = starttime;
if (!starttime || begin <= item.time) {
addItem(item, liveBegin <= item.time);
} else {
tailedItems.push(item);
tailedItemsDuration += item.duration;
// Only keep the last `liveBuffer` of items.
while (tailedItems.length > 1 &&
tailedItemsDuration - tailedItems[0].duration > liveBuffer) {
tailedItemsDuration -= tailedItems.shift().duration;
}
}
starttime += item.duration;
});
parser.on('end', () => {
currPlaylist = null;
// If we are too ahead of the stream, make sure to get the
// latest available items with a small buffer.
if (!addedItems.length && tailedItems.length) {
tailedItems.forEach((item) => { addItem(item, true); });
}
// Refresh the playlist when remaining segments get low.
refreshThreshold = Math.max(1, Math.ceil(addedItems.length * 0.01));
// Throttle refreshing the playlist by looking at the duration
// of live items added on this refresh.
minRefreshTime =
addedItems.reduce(((total, item) => item.duration + total), 0);
fetchingPlaylist = false;
});
};
refreshPlaylist();
stream.end = () => {
ended = true;
streamQueue.die();
requestQueue.die();
clearTimeout(refreshTimeout);
if (currPlaylist) {
currPlaylist.unpipe();
currPlaylist.abort();
}
if (currSegment) {
currSegment.unpipe();
currSegment.abort();
}
PassThrough.prototype.end.call(stream);
};
return stream;
};