在Node.js中提供gRPC服务

简介

gRPC 是一个高性能、开源的、通用的、面向移动端的 RPC 框架,传输协议基于 HTTP/2,这意味着它支持 双向流、流控、头部压缩、单 TCP 连接上的请求多路复用 等特性。

接口层面,gRPC 默认使用 Protocol Buffers(简称 protobuf)作为其接口定义语言(IDL),来描述服务接口和负载消息的格式。

简单使用

  • 初始化项目:npm init -y

  • 安装 grpc 和 @grpc/proto-loader:npm i grpc @grpc/proto-loader --save

proto

  • 在根目录下新建proto/目录

  • proto/目录下新建test.proto文件

// Service definition for the first example: one package, one service, one unary RPC.
syntax = "proto3";
 
package testPackage;
 
service testService {
  // Unary call: takes an empty pingRequest and returns a pingReply.
  rpc ping (pingRequest) returns (pingReply) {}
}
 
// Request for ping; declares no fields.
message pingRequest {
}
 
// Reply for ping; carries a single string field.
message pingReply {
  string message = 1;
}

服务端

  • 在根目录下新建server.js文件
const path = require('path');
const grpc = require('grpc');

// Absolute path of the service definition loaded below.
const PROTO_PATH = path.resolve(__dirname, './proto/test.proto');

// NOTE: grpc.load is deprecated. It is used here on purpose so that starting
// this first version of the server prints the deprecation warnings.
const testProto = grpc.load(PROTO_PATH).testPackage;

/**
 * Handler registered for the `ping` RPC of testService.
 *
 * @param {object} call - The incoming call (unused).
 * @param {Function} callback - Node-style callback; receives `{ message: 'Pong' }`.
 */
function test(call, callback) {
  callback(null, { message: 'Pong' });
}

const server = new grpc.Server();

// NOTE: Server#addProtoService is deprecated as well (see the note above).
server.addProtoService(testProto.testService.service, { ping: test });

// grpc.ServerCredentials.createInsecure() creates an unauthenticated,
// unencrypted connection. When exposing an RPC service on a public network,
// pick a suitable scheme from the auth guide: https://grpc.io/docs/guides/auth/
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure());
server.start();
  • 启动服务端
node .\server.js
# (node:12632) DeprecationWarning: grpc.load: Use the @grpc/proto-loader module with grpc.loadPackageDefinition instead
# (node:12632) DeprecationWarning: Server#addProtoService: Use Server#addService instead
  • 修改下server.js,去掉warning。再重新启动
const path = require('path');
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

// Absolute path of the service definition loaded below.
const PROTO_PATH = path.resolve(__dirname, './proto/test.proto');

// Parse the .proto file with @grpc/proto-loader, then turn the result into
// a gRPC package object; this replaces the deprecated grpc.load call.
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});
const testProto = grpc.loadPackageDefinition(packageDefinition).testPackage;

/**
 * Handler registered for the `ping` RPC of testService.
 *
 * @param {object} call - The incoming call (unused).
 * @param {Function} callback - Node-style callback; receives `{ message: 'Pong' }`.
 */
function test(call, callback) {
  callback(null, { message: 'Pong' });
}

const server = new grpc.Server();

// Server#addService replaces the deprecated Server#addProtoService.
server.addService(testProto.testService.service, { ping: test });

// Insecure (unauthenticated, unencrypted) credentials: local/demo use only.
server.bind('0.0.0.0:50051', grpc.ServerCredentials.createInsecure());
server.start();

客户端

  • 在根目录下新建client.js文件
const path = require('path');
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

// Absolute path of the service definition shared with the server.
const PROTO_PATH = path.resolve(__dirname, './proto/test.proto');

const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});
const testProto = grpc.loadPackageDefinition(packageDefinition).testPackage;

// Client stub for testService, using insecure credentials to match the server.
const client = new testProto.testService('0.0.0.0:50051', grpc.credentials.createInsecure());

// Call the unary `ping` RPC with an empty request.
client.ping({}, (err, res) => {
  if (err) {
    // On failure `res` is undefined, so reading `res.message` would throw;
    // report the error and stop instead.
    console.error(err);
    return;
  }
  console.log(res);
  console.log('ping -> :', res.message);
});
  • 执行node .\client.js,即可看到结果

优化

  • .proto 文件置于指定文件夹,自动加载
  • 包和服务名称不需要硬编码,全由调用端动态指定
  • 可同时暴露/调用 多个包的多个RPC端点

为了能够自动加载,我们做如下约束

  • 所有的.proto 文件置于proto/目录下
  • xxxx.proto 文件,内部的package名为xxxxPackage;内部的service名为xxxxService
  • 服务端的处理方法写成单独的.js文件;名称与.proto 文件命名一样;对应的方法写在里面
  • 服务的地址、端口号等写在配置文件中

配置

  • 在根目录下新建config/目录

  • config/目录下新建index.js文件,存放地址、端口号等

const path = require('path') const config = { rpc: { address: '0.0.0.0', port: 50051, protoDir: path.join(__dirname, '../proto/') } } module.exports = config

proto

  • proto/目录下已经有test.proto文件了,再加个rpc
// Service definition for the "test" module: two unary RPCs sharing one reply type.
syntax = "proto3";
 
package testPackage;
 
service testService {
  // Unary call: takes an empty pingRequest and returns a pingReply.
  rpc ping (pingRequest) returns (pingReply) {}
  // Unary call: takes a testRequest and returns a pingReply.
  rpc test (testRequest) returns (pingReply) {}
}
 
// Request for ping; declares no fields.
message pingRequest {
}
 
// Reply shared by ping and test; carries a single string field.
message pingReply {
  string message = 1;
}

// Request for test; carries a single string field.
message testRequest {
  string test = 1;
}
  • proto/目录下新建test.js文件
function ping(call, callback) { callback(null, { message: 'Pong' }) } function test(call, callback) { console.log(call.request) const { request } = call callback(null, { message: 'test params: ' + request.test }) } module.exports = { ping, test }
  • 优化下proto/test.js文件,改成class写法,方便继承等(这里使用静态方法,不用实例化即可调用)
class testFuns { static ping(call, callback) { callback(null, { message: 'Pong' }) } static test(call, callback) { console.log(call.request) const { request } = call callback(null, { message: 'test params: ' + request.test }) } } module.exports = testFuns
  • 测试多个proto。在proto/目录下新建hello.proto文件
// Service definition for the "hello" module: two unary RPCs.
syntax = "proto3";

package helloPackage;


service helloService {
  // Unary call: takes a HelloRequest and returns a HelloReply.
  rpc SayHello (HelloRequest) returns (HelloReply) {}

  // Unary call: takes a printAgeRequest and returns a printAgeReply.
  rpc printAge (printAgeRequest) returns (printAgeReply) {}
}

// Request for SayHello; carries two string fields.
message HelloRequest {
  string name = 1;
  string city = 2;
}

// Reply for SayHello; carries a single string field.
message HelloReply {
  string message = 1;
}

// Request for printAge; note that age is declared as a string.
message printAgeRequest {
  string age = 1;
}

// Reply for printAge; carries a single string field.
message printAgeReply {
  string text = 1;
}
  • proto/目录下新建hello.js文件
class HelloFuns { static SayHello(call, callback) { try { let data = 'hello ' + call.request.name + ' and city is ' + call.request.city; callback && callback(null, {message: data}) } catch (err) { console.log('错误'); callback && callback(err) } } static printAge(call, callback) { try{ let text = 'current age is ' + call.request.age; callback && callback(null, {text}) } catch (err) { console.log('错误'); callback && callback(err) } } } module.exports = HelloFuns

服务端

  • 在根目录下新增rpcServer.js文件
const path = require('path');
const fs = require('fs');
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

const logger = console;

// Options passed to protoLoader.loadSync for every .proto file.
const PROTO_LOADER_OPTIONS = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
};

/**
 * Collects every own function-valued property of a handler module's export.
 * This covers both a plain object of functions and a class whose handlers
 * are static methods.
 *
 * @param {object|Function} exported - Value exported by a proto/xxx.js file.
 * @returns {Object<string, Function>} Map of handler name to handler.
 */
function collectHandlers(exported) {
  const handlers = {};
  Object.getOwnPropertyNames(exported).forEach((key) => {
    if (typeof exported[key] === 'function') {
      handlers[key] = exported[key];
    }
  });
  return handlers;
}

/**
 * gRPC server that discovers its services from a directory.
 *
 * Naming convention relied on by the loader:
 *   xxx.proto -> `package xxxPackage;` containing `service xxxService`
 *   xxx.js    -> handlers for xxxService
 */
class rpcServer {
  /**
   * @param {string} ip - Address to bind to.
   * @param {number} port - Port to bind to.
   */
  constructor(ip, port) {
    this.ip = ip;
    this.port = port;
    this.services = {}; // serviceName -> service definition
    this.functions = {}; // serviceName -> handler map
  }

  /**
   * Loads every .proto and .js file found in `protoDir`, then starts the
   * server. Failures are reported through the logger; nothing is returned.
   *
   * @param {string} protoDir - Directory holding the .proto and handler files.
   */
  autoRun(protoDir) {
    fs.readdir(protoDir, (err, files) => {
      if (err) {
        logger.error(err);
        return;
      }
      try {
        files.forEach((file) => this.loadFile(protoDir, file));
        this.runServer();
      } catch (loadErr) {
        // An exception escaping this callback would be an uncaught
        // exception, so report it the same way as a readdir failure.
        logger.error(loadErr);
      }
    });
  }

  /**
   * Registers one directory entry: a handler module (.js) or a service
   * definition (.proto). Entries with any other extension are ignored.
   *
   * @param {string} protoDir - Directory containing the file.
   * @param {string} file - File name inside `protoDir`.
   * @throws {Error} If a .proto file does not follow the naming convention.
   */
  loadFile(protoDir, file) {
    const { name, ext } = path.parse(file);
    const packageName = `${name}Package`;
    const serviceName = `${name}Service`;
    const filePath = path.join(protoDir, file);

    if (ext === '.js') {
      this.functions[serviceName] = collectHandlers(require(filePath));
      return;
    }
    if (ext !== '.proto') {
      return;
    }

    const packageDefinition = protoLoader.loadSync(filePath, PROTO_LOADER_OPTIONS);
    const pkg = grpc.loadPackageDefinition(packageDefinition)[packageName];
    const Service = pkg && pkg[serviceName];
    if (!Service) {
      throw new Error(
        `${file}: expected "package ${packageName}" containing "service ${serviceName}"`,
      );
    }
    this.services[serviceName] = Service.service;
  }

  /**
   * Creates the gRPC server, registers every loaded service that has a
   * handler map, binds to `ip:port` with insecure credentials and starts.
   */
  runServer() {
    const server = new grpc.Server();
    Object.keys(this.services).forEach((serviceName) => {
      const handlers = this.functions[serviceName];
      if (!handlers) {
        // A .proto without a matching .js would pass `undefined` to
        // addService; skip it and say why.
        logger.warn(`No handler module found for "${serviceName}"; service not registered.`);
        return;
      }
      server.addService(this.services[serviceName], handlers);
    });
    server.bind(`${this.ip}:${this.port}`, grpc.ServerCredentials.createInsecure());
    server.start();
  }
}

module.exports = rpcServer;
  • 修改server.js文件
const RpcServer = require('./rpcServer');
const rpcConfig = require('./config/index').rpc;

const logger = console;

logger.info('Starting RPC Server: ');

// Build the server from the shared config and let it discover its
// services and handlers in the configured proto directory.
const rpcServer = new RpcServer(rpcConfig.address, rpcConfig.port);
rpcServer.autoRun(rpcConfig.protoDir);

客户端

  • 在根目录下新增rpcClient.js文件
const path = require('path');
const fs = require('fs');
const grpc = require('grpc');
const protoLoader = require('@grpc/proto-loader');

// Options passed to protoLoader.loadSync for every .proto file.
const PROTO_LOADER_OPTIONS = {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
};

/**
 * gRPC client that discovers its services from a directory.
 *
 * Naming convention relied on by the loader:
 *   xxx.proto -> `package xxxPackage;` containing `service xxxService`
 */
class rpcClient {
  /**
   * @param {string} ip - Address of the RPC server.
   * @param {number} port - Port of the RPC server.
   */
  constructor(ip, port) {
    this.ip = ip;
    this.port = port;
    this.services = {}; // serviceName -> client constructor
    this.clients = {}; // serviceName -> client instance
  }

  /**
   * Loads every .proto file found in `protoDir` and creates one client
   * per service.
   *
   * @param {string} protoDir - Directory holding the .proto files.
   * @returns {Promise<boolean>} Resolves to `true` once all clients exist;
   *   rejects if the directory cannot be read or a file cannot be loaded.
   */
  autoRun(protoDir) {
    return new Promise((resolve, reject) => {
      fs.readdir(protoDir, (err, files) => {
        if (err) {
          // Stop here: `files` is undefined when readdir fails.
          reject(err);
          return;
        }
        try {
          files.forEach((file) => this.connect(protoDir, file));
          resolve(true);
        } catch (loadErr) {
          // An exception escaping this callback would leave the promise
          // pending forever, so turn it into a rejection.
          reject(loadErr);
        }
      });
    });
  }

  /**
   * Creates the client for one .proto file. Entries with any other
   * extension are ignored.
   *
   * @param {string} protoDir - Directory containing the file.
   * @param {string} file - File name inside `protoDir`.
   * @throws {Error} If the file does not follow the naming convention.
   */
  connect(protoDir, file) {
    const { name, ext } = path.parse(file);
    if (ext !== '.proto') {
      return;
    }
    const packageName = `${name}Package`;
    const serviceName = `${name}Service`;

    const packageDefinition = protoLoader.loadSync(path.join(protoDir, file), PROTO_LOADER_OPTIONS);
    const pkg = grpc.loadPackageDefinition(packageDefinition)[packageName];
    const Service = pkg && pkg[serviceName];
    if (!Service) {
      throw new Error(
        `${file}: expected "package ${packageName}" containing "service ${serviceName}"`,
      );
    }
    this.services[serviceName] = Service;
    this.clients[serviceName] = new Service(
      `${this.ip}:${this.port}`,
      grpc.credentials.createInsecure(),
    );
  }

  /**
   * Calls one RPC on a previously loaded service.
   *
   * @param {string} serviceName - Service name, e.g. 'testService'.
   * @param {string} name - RPC method name on that service.
   * @param {object} [params] - Request message; defaults to `{}`.
   * @returns {Promise<object>} Resolves with the reply; rejects with the
   *   call error, or with an Error when the endpoint is unknown.
   */
  invoke(serviceName, name, params) {
    return new Promise((resolve, reject) => {
      const client = this.clients[serviceName];
      if (!client || !client[name]) {
        reject(new Error(`RPC endpoint: "${serviceName}.${name}" does not exists.`));
        return;
      }
      client[name](params || {}, (err, res) => {
        if (err) {
          reject(err);
        } else {
          resolve(res);
        }
      });
    });
  }
}

module.exports = rpcClient;
  • 修改client.js文件
const RpcClient = require('./rpcClient');
const rpcConfig = require('./config/index').rpc;

const logger = console;

// Shared client instance, created on first use.
let client = null;

/**
 * Creates the RPC client, loads every service from the configured proto
 * directory and stores the result in `client`.
 */
async function initClient() {
  const rpcClient = new RpcClient(rpcConfig.address, rpcConfig.port);
  await rpcClient.autoRun(rpcConfig.protoDir);
  client = rpcClient;
}

(async () => {
  try {
    if (!client) {
      await initClient();
    }

    // Other endpoints that can be tried instead of the call below:
    // testService - ping
    // const result = await client.invoke('testService', 'ping');
    // testService - test
    // const result = await client.invoke('testService', 'test', { test: 'kkk' });

    // helloService - SayHello
    const result = await client.invoke('helloService', 'SayHello', { name: 'locy', city: '上海' });
    logger.log('Greeting: ', result.message);
  } catch (err) {
    logger.error(err);
  }
})();

创作不易,若本文对你有帮助,欢迎打赏支持作者!

 分享给好友: