"use strict"; /* * Copyright 2019 gRPC authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ Object.defineProperty(exports, "__esModule", { value: true }); const http2 = require("http2"); const url_1 = require("url"); const constants_1 = require("./constants"); const metadata_1 = require("./metadata"); const server_call_1 = require("./server-call"); function noop() { } const unimplementedStatusResponse = { code: constants_1.Status.UNIMPLEMENTED, details: 'The server does not implement this method', metadata: new metadata_1.Metadata(), }; const defaultHandler = { unary(call, callback) { callback(unimplementedStatusResponse, null); }, clientStream(call, callback) { callback(unimplementedStatusResponse, null); }, serverStream(call) { call.emit('error', unimplementedStatusResponse); }, bidi(call) { call.emit('error', unimplementedStatusResponse); }, }; // tslint:enable:no-any class Server { constructor(options) { this.http2Server = null; this.handlers = new Map(); this.sessions = new Set(); this.started = false; this.options = (options !== null && options !== void 0 ? options : {}); } addProtoService() { throw new Error('Not implemented. Use addService() instead'); } addService(service, implementation) { if (this.started === true) { throw new Error("Can't add a service to a started server."); } if (service === null || typeof service !== 'object' || implementation === null || typeof implementation !== 'object') { throw new Error('addService() requires two objects as arguments'); } const serviceKeys = Object.keys(service); if (serviceKeys.length === 0) { throw new Error('Cannot add an empty service to a server'); } serviceKeys.forEach(name => { const attrs = service[name]; let methodType; if (attrs.requestStream) { if (attrs.responseStream) { methodType = 'bidi'; } else { methodType = 'clientStream'; } } else { if (attrs.responseStream) { methodType = 'serverStream'; } else { methodType = 'unary'; } } let implFn = implementation[name]; let impl; if (implFn === undefined && typeof attrs.originalName === 'string') { implFn = implementation[attrs.originalName]; } if (implFn !== undefined) { impl = implFn.bind(implementation); } else { impl = defaultHandler[methodType]; } const success = this.register(attrs.path, impl, attrs.responseSerialize, attrs.requestDeserialize, methodType); if (success === false) { throw new Error(`Method handler for ${attrs.path} already provided.`); } }); } bind(port, creds) { throw new Error('Not implemented. Use bindAsync() instead'); } bindAsync(port, creds, callback) { if (this.started === true) { throw new Error('server is already started'); } if (typeof port !== 'string') { throw new TypeError('port must be a string'); } if (creds === null || typeof creds !== 'object') { throw new TypeError('creds must be an object'); } if (typeof callback !== 'function') { throw new TypeError('callback must be a function'); } const url = new url_1.URL(`http://${port}`); const options = { host: url.hostname, port: +url.port }; const serverOptions = {}; if ('grpc.max_concurrent_streams' in this.options) { serverOptions.settings = { maxConcurrentStreams: this.options['grpc.max_concurrent_streams'] }; } if (creds._isSecure()) { const secureServerOptions = Object.assign(serverOptions, creds._getSettings()); this.http2Server = http2.createSecureServer(secureServerOptions); } else { this.http2Server = http2.createServer(serverOptions); } this.http2Server.setTimeout(0, noop); this._setupHandlers(); function onError(err) { callback(err, -1); } this.http2Server.once('error', onError); this.http2Server.listen(options, () => { const server = this.http2Server; const port = server.address().port; server.removeListener('error', onError); callback(null, port); }); } forceShutdown() { // Close the server if it is still running. if (this.http2Server && this.http2Server.listening) { this.http2Server.close(); } this.started = false; // Always destroy any available sessions. It's possible that one or more // tryShutdown() calls are in progress. Don't wait on them to finish. this.sessions.forEach(session => { // Cast NGHTTP2_CANCEL to any because TypeScript doesn't seem to // recognize destroy(code) as a valid signature. // tslint:disable-next-line:no-any session.destroy(http2.constants.NGHTTP2_CANCEL); }); this.sessions.clear(); } register(name, handler, serialize, deserialize, type) { if (this.handlers.has(name)) { return false; } this.handlers.set(name, { func: handler, serialize, deserialize, type, }); return true; } start() { if (this.http2Server === null || this.http2Server.listening !== true) { throw new Error('server must be bound in order to start'); } if (this.started === true) { throw new Error('server is already started'); } this.started = true; } tryShutdown(callback) { let pendingChecks = 0; function maybeCallback() { pendingChecks--; if (pendingChecks === 0) { callback(); } } // Close the server if necessary. this.started = false; if (this.http2Server && this.http2Server.listening) { pendingChecks++; this.http2Server.close(maybeCallback); } // If any sessions are active, close them gracefully. pendingChecks += this.sessions.size; this.sessions.forEach(session => { session.close(maybeCallback); }); // If the server is closed and there are no active sessions, just call back. if (pendingChecks === 0) { callback(); } } addHttp2Port() { throw new Error('Not yet implemented'); } _setupHandlers() { if (this.http2Server === null) { return; } this.http2Server.on('stream', (stream, headers) => { const contentType = headers[http2.constants.HTTP2_HEADER_CONTENT_TYPE]; if (typeof contentType !== 'string' || !contentType.startsWith('application/grpc')) { stream.respond({ [http2.constants.HTTP2_HEADER_STATUS]: http2.constants.HTTP_STATUS_UNSUPPORTED_MEDIA_TYPE, }, { endStream: true }); return; } try { const path = headers[http2.constants.HTTP2_HEADER_PATH]; const handler = this.handlers.get(path); if (handler === undefined) { throw unimplementedStatusResponse; } const call = new server_call_1.Http2ServerCallStream(stream, handler); const metadata = call.receiveMetadata(headers); switch (handler.type) { case 'unary': handleUnary(call, handler, metadata); break; case 'clientStream': handleClientStreaming(call, handler, metadata); break; case 'serverStream': handleServerStreaming(call, handler, metadata); break; case 'bidi': handleBidiStreaming(call, handler, metadata); break; default: throw new Error(`Unknown handler type: ${handler.type}`); } } catch (err) { const call = new server_call_1.Http2ServerCallStream(stream, null); if (err.code === undefined) { err.code = constants_1.Status.INTERNAL; } call.sendError(err); } }); this.http2Server.on('session', session => { if (!this.started) { session.destroy(); return; } this.sessions.add(session); }); } } exports.Server = Server; async function handleUnary(call, handler, metadata) { const emitter = new server_call_1.ServerUnaryCallImpl(call, metadata); const request = await call.receiveUnaryMessage(); if (request === undefined || call.cancelled) { return; } emitter.request = request; handler.func(emitter, (err, value, trailer, flags) => { call.sendUnaryMessage(err, value, trailer, flags); }); } function handleClientStreaming(call, handler, metadata) { const stream = new server_call_1.ServerReadableStreamImpl(call, metadata, handler.deserialize); function respond(err, value, trailer, flags) { stream.destroy(); call.sendUnaryMessage(err, value, trailer, flags); } if (call.cancelled) { return; } stream.on('error', respond); handler.func(stream, respond); } async function handleServerStreaming(call, handler, metadata) { const request = await call.receiveUnaryMessage(); if (request === undefined || call.cancelled) { return; } const stream = new server_call_1.ServerWritableStreamImpl(call, metadata, handler.serialize); stream.request = request; handler.func(stream); } function handleBidiStreaming(call, handler, metadata) { const stream = new server_call_1.ServerDuplexStreamImpl(call, metadata, handler.serialize, handler.deserialize); if (call.cancelled) { return; } handler.func(stream); } //# sourceMappingURL=server.js.map