gRPC Support

super-http ships a first-class gRPC client via the super-http/grpc entry point. It is TypeScript-first (no .proto files required), runs the same resilience pipeline as the HTTP client (circuit breaker, retry, bulkhead, rate limiter, metrics), and speaks the Connect-RPC JSON protocol over native Node.js HTTP/2 — with zero extra dependencies.

ts
import { defineService, unary, serverStream, createGrpcClient } from 'super-http/grpc'

const UserService = defineService('UserService', {
  getUser:   unary<{ id: string }, User>(),
  listUsers: serverStream<{ filter?: string }, User>(),
})

const client = createGrpcClient(UserService, 'grpcs://api.example.com:443', {
  preset: 'resilient-api',   // circuit breaker + retry x3 + bulkhead
})

const user = await client.getUser({ id: '1' })           // Promise<User>
for await (const u of client.listUsers({ filter: 'active' })) { … } // AsyncIterable<User>

Why not @grpc/grpc-js?

| | @grpc/grpc-js | nice-grpc | Connect-RPC | super-http/grpc |
|---|---|---|---|---|
| .proto files required | | | | ❌ TypeScript-first |
| Circuit breaker | | | | ✅ same as HTTP |
| Retry | | | | ✅ same as HTTP |
| Bulkhead | | | | ✅ same as HTTP |
| NestJS DI | manual | manual | manual | @InjectSuperHttp |
| Streaming backpressure | callbacks | callbacks | callbacks | AsyncIterable |
| Presets | | | | ✅ zero-config |
| Extra dependencies | @grpc/grpc-js | nice-grpc + grpc-js | @connectrpc/* + protobuf | none |

The core advantage: if you already use super-http for HTTP, adding gRPC requires zero new concepts — same .on() hooks, same presets, same MetricsSnapshot, same NestJS decorators.


Architecture


Installation

No extra packages needed — super-http/grpc uses Node.js native http2:

bash
npm install super-http
# Node.js >= 20 · TypeScript >= 5

Wire format

super-http/grpc uses the Connect-RPC JSON protocol by default (Content-Type: application/connect+json). This means any JSON-serialisable TypeScript type works as a request or response — no Message classes, no code generation.

To use protobuf binary encoding, see Binary encoding.


Service definition

Define your service once in TypeScript. This is the single source of truth — it drives both compile-time type inference and runtime dispatch.

ts
import { defineService, unary, serverStream, clientStream, bidi } from 'super-http/grpc'

// Your domain types
interface GetUserRequest { id: string }
interface ListFilter      { active?: boolean; role?: string }
interface CreateUserInput { name: string; email: string }
interface User            { id: string; name: string; email: string }
interface LogEntry        { level: string; message: string; timestamp: number }
interface UploadSummary   { received: number; failed: number }
interface ChatMessage     { text: string; from: string }

const UserServiceDef = defineService('UserService', {
  // Unary: one request → one response
  getUser:    unary<GetUserRequest, User>(),
  createUser: unary<CreateUserInput, User>(),

  // Server streaming: one request → stream of responses
  listUsers:  serverStream<ListFilter, User>(),

  // Client streaming: stream of requests → one response
  uploadLogs: clientStream<LogEntry, UploadSummary>(),

  // Bidirectional streaming: stream ↔ stream
  chat:       bidi<ChatMessage, ChatMessage>(),
})

Method call types

| Helper | Direction | TypeScript return |
|---|---|---|
| unary<TReq, TRes>() | 1 → 1 | (req: TReq) => Promise<TRes> |
| serverStream<TReq, TRes>() | 1 → N | (req: TReq) => AsyncIterable<TRes> |
| clientStream<TReq, TRes>() | N → 1 | (stream: AsyncIterable<TReq>) => Promise<TRes> |
| bidi<TReq, TRes>() | N → N | (stream: AsyncIterable<TReq>) => AsyncIterable<TRes> |
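
To make the "single source of truth" idea concrete, here is a minimal sketch of how descriptor helpers could carry request and response types to a mapped client type. It is a hypothetical re-creation for illustration, not super-http's actual source: `Descriptor`, `MethodOf`, `ClientOf`, and the local `unary`/`serverStream` helpers are all assumed names.

```typescript
// Hypothetical re-creation of the descriptor helpers (not the library's code).
type Kind = 'unary' | 'serverStream' | 'clientStream' | 'bidi'

// Phantom fields carry the types at compile time; they are never set at runtime.
interface Descriptor<K extends Kind, TReq, TRes> {
  readonly kind: K
  readonly _req?: TReq
  readonly _res?: TRes
}

const unary = <TReq, TRes>(): Descriptor<'unary', TReq, TRes> => ({ kind: 'unary' })
const serverStream = <TReq, TRes>(): Descriptor<'serverStream', TReq, TRes> => ({ kind: 'serverStream' })

// Map each descriptor to the call signature listed in the table above.
type MethodOf<D> =
  D extends Descriptor<'unary', infer Req, infer Res> ? (req: Req) => Promise<Res> :
  D extends Descriptor<'serverStream', infer Req, infer Res> ? (req: Req) => AsyncIterable<Res> :
  D extends Descriptor<'clientStream', infer Req, infer Res> ? (stream: AsyncIterable<Req>) => Promise<Res> :
  D extends Descriptor<'bidi', infer Req, infer Res> ? (stream: AsyncIterable<Req>) => AsyncIterable<Res> :
  never

type ClientOf<M> = { [K in keyof M]: MethodOf<M[K]> }

interface User { id: string; name: string }

const methods = {
  getUser: unary<{ id: string }, User>(),
  listUsers: serverStream<{ filter?: string }, User>(),
}

// Runtime dispatch can branch on `kind`.
const kinds = [methods.getUser.kind, methods.listUsers.kind]

// Compile-time check: this stub only type-checks if it matches the inferred client shape.
const stub: ClientOf<typeof methods> = {
  getUser: async ({ id }) => ({ id, name: 'Ana' }),
  listUsers: async function* () { yield { id: '1', name: 'Ana' } },
}
```

The runtime object stays tiny (just a `kind` tag per method), while the compiler derives every call signature from the generic parameters.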

Creating a client

ts
import { createGrpcClient } from 'super-http/grpc'

const client = createGrpcClient(UserServiceDef, 'grpcs://api.example.com:443', {
  preset:  'resilient-api',
  headers: { 'x-api-key': process.env.API_KEY! },
})

Address formats

| Format | Protocol |
|---|---|
| grpc://host:port | Insecure HTTP/2 |
| grpcs://host:port | TLS HTTP/2 |
| host:port | Insecure HTTP/2 (shorthand) |
| http://host:port | Insecure HTTP/2 |
| https://host:port | TLS HTTP/2 |
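
The mapping in the table can be sketched as a small parser. This is an illustrative assumption about how such addresses might be normalised, not the library's implementation; `parseAddress` and `ParsedAddress` are hypothetical names, and IPv6 literals are deliberately not handled.

```typescript
interface ParsedAddress { host: string; port: number; tls: boolean }

// Hypothetical helper: normalise the five address formats into host/port/tls.
function parseAddress(address: string): ParsedAddress {
  // A bare host:port is treated as insecure, matching the shorthand row.
  const withScheme = address.includes('://') ? address : `grpc://${address}`
  const match = /^(grpcs?|https?):\/\/([^:/]+):(\d+)$/.exec(withScheme)
  if (!match) throw new Error(`Unsupported gRPC address: ${address}`)
  const [, scheme, host, port] = match
  return { host, port: Number(port), tls: scheme === 'grpcs' || scheme === 'https' }
}
```

For example, `parseAddress('user-service.internal:50051')` yields an insecure target, while `parseAddress('grpcs://api.example.com:443')` yields a TLS one.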

Making calls

Unary

ts
// Fully typed — TypeScript infers User from the service definition
const user = await client.getUser({ id: '42' })
console.log(user.name)

// Per-call options: custom headers, timeout, cancellation
const ac = new AbortController()
const user2 = await client.getUser({ id: '1' }, {
  metadata:  { 'x-request-id': requestId },
  timeoutMs: 5_000,
  signal:    ac.signal,
})

Server streaming

ts
// Async generator — native backpressure, no callbacks
for await (const user of client.listUsers({ active: true })) {
  console.log(user.name)
  await processUser(user)   // if slow, HTTP/2 flow control pauses the stream
}

Client streaming

ts
async function* generateLogs(): AsyncIterable<LogEntry> {
  for (const entry of logBuffer) {
    yield entry
  }
}

const summary = await client.uploadLogs(generateLogs())
console.log(`Uploaded ${summary.received} logs, ${summary.failed} failed`)

Bidirectional streaming

ts
async function* messages(): AsyncIterable<ChatMessage> {
  yield { text: 'Hello!', from: 'alice' }
  yield { text: 'How are you?', from: 'alice' }
}

for await (const reply of client.chat(messages())) {
  console.log(`${reply.from}: ${reply.text}`)
}

Configuration

Presets (zero-config)

Same preset names as the HTTP client:

ts
// resilient-api: circuit breaker + retry x3 + bulkhead
const resilientClient = createGrpcClient(Svc, address, { preset: 'resilient-api' })

// high-throughput: 4 sessions, retry x1, no CB
const throughputClient = createGrpcClient(Svc, address, { preset: 'high-throughput' })

// low-latency: 4 sessions, 2s timeout, no retry
const lowLatencyClient = createGrpcClient(Svc, address, { preset: 'low-latency' })

| Preset | Sessions | Timeout | Retry | Circuit Breaker | Bulkhead |
|---|---|---|---|---|---|
| high-throughput | 4 | 8 s | 1x jitter | | |
| resilient-api | 2 | 15 s | 3x jitter | 10 failures → open | 50 concurrent |
| low-latency | 4 | 2 s | | | |

Manual configuration

All preset fields are individually overridable:

ts
import { ExponentialJitterRetryStrategy } from 'super-http'

const client = createGrpcClient(UserServiceDef, 'grpcs://api:443', {
  // Connection
  maxSessions:  2,
  timeoutMs:    10_000,
  headers:      { 'x-service': 'my-app' },

  // Retry
  retries:       3,
  retryStrategy: new ExponentialJitterRetryStrategy(100, 5_000),

  // Circuit breaker
  circuitBreaker: {
    failureThreshold: 10,
    successThreshold:  3,
    timeoutMs:        10_000,
  },

  // Bulkhead
  bulkhead: {
    maxConcurrent: 50,
    maxQueue:      200,
    queueTimeoutMs: 5_000,
  },

  // Rate limiter
  rateLimit: {
    permitLimit: 500,
    windowMs:   1_000,
  },

  // Wire format
  encoding: 'json',      // or 'proto' (requires @bufbuild/protobuf)
  protocol: 'connect',   // or 'grpc' / 'grpc-web'
})
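
For intuition about the retry settings, the sketch below shows one common form of exponential backoff with jitter ("full jitter"). It assumes the two constructor arguments of `ExponentialJitterRetryStrategy` are a base delay and a maximum delay in milliseconds, which the text above does not state; `jitteredDelay` is a hypothetical function and the library's exact formula may differ.

```typescript
// Hypothetical "full jitter" backoff: pick a random delay below an exponentially growing ceiling.
function jitteredDelay(
  attempt: number,                      // 0-based retry attempt
  baseMs: number,                       // assumed meaning of the first constructor argument
  maxMs: number,                        // assumed meaning of the second constructor argument
  random: () => number = Math.random,   // injectable for deterministic tests
): number {
  const ceiling = Math.min(maxMs, baseMs * Math.pow(2, attempt))
  return Math.floor(random() * ceiling)
}

// With base 100 ms and cap 5000 ms the ceilings are 100, 200, 400, ... up to 5000.
const sampleDelays = [0, 1, 2, 3].map((attempt) => jitteredDelay(attempt, 100, 5_000))
```

Randomising the delay spreads retries from many clients over time, so a recovering server is not hit by synchronised bursts.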

Observability hooks

ts
const client = createGrpcClient(UserServiceDef, 'grpcs://api:443', {
  preset: 'resilient-api',
  on: {
    onRetry: ({ attempt, delayMs, error }) =>
      logger.warn(`gRPC retry #${attempt} in ${delayMs}ms: ${error}`),

    onCircuitStateChange: ({ from, to, failures }) =>
      to === 'open' && alerts.critical(`gRPC circuit open (${failures} failures)`),

    onBulkheadReject: ({ active, queued }) =>
      metrics.inc('grpc.bulkhead.reject', { active, queued }),
  },
})

// Or add hooks after creation
client.on({
  onRetry: ({ attempt }) => console.warn(`retry #${attempt}`),
})
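
The `from`/`to` values reported by `onCircuitStateChange` suggest a state machine. Below is a minimal sketch of a conventional three-state breaker driven by the `failureThreshold`, `successThreshold`, and `timeoutMs` settings shown earlier. It is an assumption about typical behaviour, not super-http's implementation: `CircuitBreakerSketch` and the `'closed'` and `'half-open'` state names are hypothetical.

```typescript
type CircuitState = 'closed' | 'open' | 'half-open'
interface BreakerConfig { failureThreshold: number; successThreshold: number; timeoutMs: number }
interface StateChange { from: CircuitState; to: CircuitState; failures: number }

class CircuitBreakerSketch {
  private state: CircuitState = 'closed'
  private failures = 0
  private successes = 0
  private openedAt = 0
  private readonly cfg: BreakerConfig
  private readonly onChange: (e: StateChange) => void
  private readonly now: () => number

  constructor(cfg: BreakerConfig, onChange: (e: StateChange) => void, now: () => number = Date.now) {
    this.cfg = cfg
    this.onChange = onChange
    this.now = now
  }

  /** Returns false while the circuit is open and the cool-down has not elapsed. */
  allow(): boolean {
    if (this.state === 'open' && this.now() - this.openedAt >= this.cfg.timeoutMs) {
      this.transition('half-open') // let trial calls through
    }
    return this.state !== 'open'
  }

  recordSuccess(): void {
    if (this.state === 'half-open') {
      this.successes++
      if (this.successes >= this.cfg.successThreshold) this.transition('closed')
    } else {
      this.failures = 0 // a success in the closed state resets the failure streak
    }
  }

  recordFailure(): void {
    this.failures++
    const shouldOpen =
      this.state === 'half-open' ||
      (this.state === 'closed' && this.failures >= this.cfg.failureThreshold)
    if (shouldOpen) {
      this.openedAt = this.now()
      this.transition('open')
    }
  }

  private transition(to: CircuitState): void {
    const from = this.state
    this.state = to
    this.successes = 0
    this.onChange({ from, to, failures: this.failures })
    if (to === 'closed') this.failures = 0
  }
}
```

A caller would check `allow()` before each RPC and report the outcome with `recordSuccess()` or `recordFailure()`; the injected `now` function exists only to make the sketch testable without real timers.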

Error handling

All non-OK gRPC responses throw a GrpcError:

ts
import { GrpcError } from 'super-http/grpc'

try {
  const user = await client.getUser({ id: 'missing' })
} catch (err) {
  if (err instanceof GrpcError) {
    console.log(err.code)     // 'not_found'
    console.log(err.message)  // 'user not found'
    console.log(err.details)  // optional structured details
  }
}

Status codes and resilience decisions

The retry and circuit breaker layers use gRPC status codes to decide automatically what to do:

| Code | Retryable | Trips circuit | Notes |
|---|---|---|---|
| unavailable | ✅ | ✅ | Service down: retry and trip |
| resource_exhausted | ✅ | | Backpressure: retry after delay |
| deadline_exceeded | ✅ | | Timeout: retry |
| aborted | ✅ | | Optimistic lock conflict: retry |
| unknown | | | Unknown server error |
| internal | ❌ | ✅ | Server bug: trip, don't retry |
| not_found | ❌ | | Client error: fail fast |
| permission_denied | ❌ | | Auth error: fail fast |
| invalid_argument | ❌ | | Client error: fail fast |
| unimplemented | | | Method not available |
You can inspect a code's decision programmatically:

ts
import { getDecision } from 'super-http/grpc'

const { retryable, tripCircuit } = getDecision('unavailable')
// { retryable: true, tripCircuit: true }
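
To illustrate how a status code can drive the retry decision, here is a self-contained sketch of a retry loop. It is not the library's pipeline: `callWithRetry`, `codeOf`, and `RETRYABLE` are hypothetical names, the set contains only the codes whose notes above say "retry", and backoff delays are omitted for brevity.

```typescript
// Codes whose notes in the table above say "retry".
const RETRYABLE = new Set(['unavailable', 'resource_exhausted', 'deadline_exceeded', 'aborted'])

// Read a string `code` property from an unknown error, if present.
function codeOf(err: unknown): string | undefined {
  return typeof err === 'object' && err !== null && 'code' in err
    ? String((err as { code: unknown }).code)
    : undefined
}

// Hypothetical retry loop: retry only retryable codes, up to `retries` extra attempts.
async function callWithRetry<T>(fn: () => Promise<T>, retries: number): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (err) {
      const code = codeOf(err)
      const retryable = code !== undefined && RETRYABLE.has(code)
      if (!retryable || attempt >= retries) throw err // fail fast or budget exhausted
    }
  }
}
```

A `not_found` error leaves the loop on the first attempt, while an `unavailable` error is retried until the budget runs out.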

Resilience pipeline explained

Streaming pipeline

For streaming calls, the circuit breaker and rate limiter wrap only the stream open phase. The actual message flow is consumer-paced via AsyncIterable — if your consumer is slow, HTTP/2 flow control naturally pauses the server.
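
The consumer-paced behaviour can be observed in-process with a plain async generator. This sketch models only the pull semantics of `AsyncIterable`; the HTTP/2 flow-control side is not simulated, and `produce`, `consumeSlowly`, and `log` are hypothetical names.

```typescript
const log: string[] = []

// Producer: each value is created only when the consumer asks for the next one.
async function* produce(count: number): AsyncGenerator<number> {
  for (let i = 1; i <= count; i++) {
    log.push(`produce ${i}`)
    yield i
  }
}

// Consumer: slow work between pulls delays the producer as well.
async function consumeSlowly(): Promise<void> {
  for await (const n of produce(3)) {
    await new Promise<void>((resolve) => setTimeout(resolve, 5)) // simulate slow processing
    log.push(`consume ${n}`)
  }
}
```

After `consumeSlowly()` resolves, `log` alternates strictly between produce and consume entries, which shows that the producer never runs ahead of the consumer.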


Metrics

GrpcClient uses the same MetricsCollector as HttpClient:

ts
const m = client.metrics()

console.log(m.requests)            // total RPC calls
console.log(m.success)             // successful calls
console.log(m.failed)              // failed calls
console.log(m.retries)             // retry attempts fired
console.log(m.circuitBreakerTrips) // times circuit opened
console.log(m.p50Latency)          // p50 call latency (ms)
console.log(m.p99Latency)          // p99 call latency (ms)

// Reset counters
client.resetMetrics()
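
For readers unfamiliar with percentile latencies, the sketch below computes p50 and p99 with the nearest-rank method. How `MetricsCollector` actually derives these values is not documented here, so treat `percentile` as a hypothetical illustration of the concept.

```typescript
// Nearest-rank percentile: the smallest sample with at least p% of samples at or below it.
function percentile(samples: readonly number[], p: number): number {
  if (samples.length === 0) return 0 // assumption: report 0 when nothing was recorded
  const sorted = [...samples].sort((a, b) => a - b)
  const rank = Math.max(1, Math.ceil((p / 100) * sorted.length))
  return sorted[rank - 1]
}

const latenciesMs = [50, 10, 40, 20, 30]
const p50 = percentile(latenciesMs, 50)
const p99 = percentile(latenciesMs, 99)
```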

NestJS integration

gRPC clients work with the same SuperHttpModule.forFeature() and @InjectSuperHttp() as HTTP clients.

Mixed HTTP + gRPC feature registration

ts
// posts.module.ts
import { Module } from '@nestjs/common'
import { SuperHttpModule } from 'super-http/nestjs'
import { UserServiceDef } from './user-service.def'
import { PostsService } from './posts.service'

@Module({
  imports: [
    SuperHttpModule.forFeature([
      // HTTP client
      {
        name:    'CATALOG_HTTP',
        baseURL: 'https://catalog.internal',
        preset:  'high-throughput',
      },
      // gRPC client — distinguished by grpc: true
      {
        name:    'USER_GRPC',
        grpc:    true,
        address: 'user-service.internal:50051',
        service: UserServiceDef,
        preset:  'resilient-api',
      },
    ]),
  ],
  providers: [PostsService],
  exports:   [PostsService],
})
export class PostsModule {}

Injecting a gRPC client

ts
// posts.service.ts
import { Injectable } from '@nestjs/common'
import { InjectSuperHttp } from 'super-http/nestjs'
import type { GrpcClient } from 'super-http/grpc'
import type { UserServiceDef } from './user-service.def'

@Injectable()
export class PostsService {
  constructor(
    @InjectSuperHttp('USER_GRPC')
    private readonly users: GrpcClient<typeof UserServiceDef>,
  ) {}

  async getPostWithAuthor(postId: string) {
    const [post, author] = await Promise.all([
      this.fetchPost(postId),
      this.users.getUser({ id: postId }),  // fully typed
    ])
    return { ...post, author }
  }

  async *streamActiveUsers() {
    yield* this.users.listUsers({ active: true })
  }
}

Unit testing with mocks

ts
// posts.service.spec.ts
import { Test } from '@nestjs/testing'
import { getSuperHttpClientToken } from 'super-http/nestjs'
import { PostsService } from './posts.service'

const mockUserGrpc = {
  getUser:   jest.fn(),
  listUsers: jest.fn(),
}

describe('PostsService', () => {
  let service: PostsService

  beforeEach(async () => {
    const module = await Test.createTestingModule({
      providers: [
        PostsService,
        {
          provide:   getSuperHttpClientToken('USER_GRPC'),
          useValue:  mockUserGrpc,
        },
      ],
    }).compile()

    service = module.get(PostsService)
    jest.clearAllMocks()
  })

  it('fetches user via gRPC', async () => {
    mockUserGrpc.getUser.mockResolvedValue({ id: '1', name: 'Ana', email: 'ana@acme.com' })

    const result = await service.getPostWithAuthor('1')
    expect(result.author.name).toBe('Ana')
    expect(mockUserGrpc.getUser).toHaveBeenCalledWith({ id: '1' })
  })
})

Binary encoding (protobuf)

By default, super-http/grpc uses JSON encoding. To use protobuf binary (smaller payloads, stricter schema), install @bufbuild/protobuf and use generated Message classes as your types:

bash
npm install @bufbuild/protobuf
# plus your code generator of choice (e.g. buf)

ts
import { defineService, unary, createGrpcClient } from 'super-http/grpc'
import { GetUserRequest, User } from './gen/user_pb'   // generated Message classes

const UserServiceDef = defineService('UserService', {
  getUser: unary<GetUserRequest, User>(),
})

const client = createGrpcClient(UserServiceDef, 'grpcs://api:443', {
  encoding: 'proto',   // switch to binary
})

Proto binary requires Message classes

encoding: 'proto' requires that your request and response types extend @bufbuild/protobuf's Message base class. Plain TypeScript interfaces only work with encoding: 'json' (the default).


Protocol compatibility

The protocol option controls the wire framing:

| Value | Content-Type | Works with |
|---|---|---|
| connect (default) | application/connect+json | Connect-RPC servers (buf.build, any modern gRPC server with Connect adapter) |
| grpc | application/grpc+json | Standard gRPC over HTTP/2 (any server) |
| grpc-web | application/grpc-web+json | gRPC-Web (through proxies, Envoy) |

ts
// Connecting to a standard gRPC server (not Connect-RPC)
const client = createGrpcClient(UserServiceDef, 'grpcs://api:443', {
  protocol: 'grpc',
  encoding: 'json',   // JSON-encoded protobuf fields
})
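
As background on wire framing: gRPC messages, and Connect streaming messages, are wrapped in a 5-byte envelope made of one flags byte followed by a 4-byte big-endian payload length. The sketch below encodes and decodes a JSON payload in that envelope. `encodeEnvelope` and `decodeEnvelope` are hypothetical helpers for illustration and say nothing about how super-http implements framing internally.

```typescript
// Wrap a JSON-serialisable message in a 5-byte length-prefixed envelope.
function encodeEnvelope(message: unknown, flags = 0): Uint8Array {
  const payload = new TextEncoder().encode(JSON.stringify(message))
  const frame = new Uint8Array(5 + payload.length)
  const view = new DataView(frame.buffer)
  view.setUint8(0, flags)            // e.g. 0 = uncompressed data message
  view.setUint32(1, payload.length)  // DataView writes big-endian by default
  frame.set(payload, 5)
  return frame
}

// Read one envelope back into its flags and decoded JSON message.
function decodeEnvelope(frame: Uint8Array): { flags: number; message: unknown } {
  const view = new DataView(frame.buffer, frame.byteOffset, frame.byteLength)
  const length = view.getUint32(1)
  const payload = frame.subarray(5, 5 + length)
  return { flags: view.getUint8(0), message: JSON.parse(new TextDecoder().decode(payload)) }
}

const frame = encodeEnvelope({ id: '1' })
const decoded = decodeEnvelope(frame)
```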

Advanced: non-idempotent mutations

Disable retry for mutations that must not be sent twice:

ts
// Pass retry: false as a per-call option
const newUser = await client.createUser(
  { name: 'Ana', email: 'ana@acme.com' },
  { retry: false },   // non-idempotent — no retry
)

Advanced: cancellation

ts
const ac = new AbortController()

// Cancel after 2 s
const timer = setTimeout(() => ac.abort(), 2_000)

try {
  const user = await client.getUser({ id: '1' }, { signal: ac.signal })
} catch (err) {
  if (err instanceof GrpcError && err.code === 'canceled') {
    console.log('Request was cancelled')
  }
} finally {
  clearTimeout(timer)   // don't leave the timer pending once the call settles
}
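
To show the mechanics behind signal-based cancellation, here is a self-contained sketch that rejects a pending promise as soon as an `AbortSignal` fires. `abortable` is a hypothetical helper: it only stops the caller from waiting, whereas a real transport would presumably also reset the underlying HTTP/2 stream.

```typescript
// Reject with 'canceled' when the signal aborts; otherwise settle with the work's outcome.
function abortable<T>(work: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = (): void => reject(new Error('canceled'))
    if (signal.aborted) return onAbort()
    signal.addEventListener('abort', onAbort, { once: true })
    const cleanup = (): void => signal.removeEventListener('abort', onAbort)
    work.then(
      (value) => { cleanup(); resolve(value) },
      (error) => { cleanup(); reject(error) },
    )
  })
}

const controller = new AbortController()
const slowWork = new Promise<string>((resolve) => setTimeout(() => resolve('done'), 50))
const pending = abortable(slowWork, controller.signal)
controller.abort() // `pending` now rejects without waiting for slowWork
```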

Advanced: custom per-call metadata

ts
// Propagate tracing headers per call
const user = await client.getUser({ id: '1' }, {
  metadata: {
    'x-trace-id':   span.traceId,
    'x-request-id': requestId,
    'authorization': `Bearer ${token}`,
  },
})

Channel management

HTTP/2 sessions are cached automatically by GrpcChannelRegistry. Each address gets up to maxSessions sessions (default 2), and HTTP/2 multiplexes many concurrent RPCs over each session. The exact per-session limit is set by the server's SETTINGS_MAX_CONCURRENT_STREAMS value.

ts
import { GrpcChannelRegistry } from 'super-http/grpc'

// Current open sessions (for health endpoints)
console.log(GrpcChannelRegistry.sessionCount)

// Graceful drain — waits for in-flight RPCs to complete
await GrpcChannelRegistry.closeAddress('grpcs://api.example.com:443')

// Close all sessions (e.g. on app shutdown)
await GrpcChannelRegistry.closeAll()

// Immediate destroy (useful in tests)
GrpcChannelRegistry.clear()

Testing patterns

Unit tests — mock the transport

ts
import { GrpcTransport, createGrpcClient } from 'super-http/grpc'

jest.spyOn(GrpcTransport.prototype, 'call').mockResolvedValue({
  data: { id: '1', name: 'Ana', email: 'ana@acme.com' },
  transportType: 'grpc',
})

const client = createGrpcClient(UserServiceDef, 'grpc://localhost:50051')
const user = await client.getUser({ id: '1' })
expect(user.name).toBe('Ana')

Test server streaming

ts
jest.spyOn(GrpcTransport.prototype, 'serverStream').mockReturnValue(
  (async function* () {
    yield { id: '1', name: 'Ana',  email: 'a@x.com' }
    yield { id: '2', name: 'Bob',  email: 'b@x.com' }
  })(),
)

const users: User[] = []
for await (const u of client.listUsers({})) users.push(u)
expect(users).toHaveLength(2)

Integration tests — real server

Use any Connect-RPC compatible test server. The Connect conformance suite provides test servers for all languages.

ts
// Clean up sessions between tests to avoid state leakage
afterEach(() => GrpcChannelRegistry.clear())

Complete example

ts
import {
  defineService, unary, serverStream, clientStream,
  createGrpcClient, GrpcError, GrpcChannelRegistry,
} from 'super-http/grpc'
import { ExponentialJitterRetryStrategy } from 'super-http'

// ── 1. Define the service contract ──────────────────────────────────────────

interface GetUserRequest  { id: string }
interface ListFilter      { role?: string; active?: boolean }
interface User            { id: string; name: string; email: string; role: string }
interface LogEntry        { level: string; message: string }
interface UploadSummary   { received: number }

const UserServiceDef = defineService('UserService', {
  getUser:    unary<GetUserRequest, User>(),
  listUsers:  serverStream<ListFilter, User>(),
  uploadLogs: clientStream<LogEntry, UploadSummary>(),
})

// ── 2. Create a fully resilient client ─────────────────────────────────────

const userClient = createGrpcClient(
  UserServiceDef,
  process.env.USER_SERVICE_ADDRESS ?? 'grpcs://user-service:443',
  {
    preset:   'resilient-api',
    headers:  { 'x-service': 'checkout' },
    maxSessions: 2,

    // Fine-tune the preset
    circuitBreaker: { failureThreshold: 10, successThreshold: 3, timeoutMs: 10_000 },
    retryStrategy:  new ExponentialJitterRetryStrategy(100, 5_000),

    on: {
      onRetry: ({ attempt, error }) =>
        console.warn(`[users] retry #${attempt}: ${String(error)}`),
      onCircuitStateChange: ({ from, to, failures }) =>
        console.error(`[users] circuit ${from} → ${to} (${failures} failures)`),
    },
  },
)

// ── 3. Unary call with error handling ──────────────────────────────────────

async function getUser(id: string): Promise<User | null> {
  try {
    return await userClient.getUser({ id })
  } catch (err) {
    if (err instanceof GrpcError && err.code === 'not_found') return null
    throw err
  }
}

// ── 4. Server streaming with backpressure ──────────────────────────────────

async function printActiveAdmins(): Promise<void> {
  for await (const user of userClient.listUsers({ role: 'admin', active: true })) {
    await sendEmail(user.email)   // consumer pace drives HTTP/2 flow control
  }
}

// ── 5. Client streaming ────────────────────────────────────────────────────

async function* collectLogs(): AsyncIterable<LogEntry> {
  for (const entry of logBuffer) yield entry
}

async function flushLogs(): Promise<void> {
  const { received } = await userClient.uploadLogs(collectLogs(), { retry: false })
  console.log(`Flushed ${received} log entries`)
}

// ── 6. Metrics health endpoint ─────────────────────────────────────────────

function healthCheck() {
  const m = userClient.metrics()
  return {
    status:   m.circuitBreakerTrips > 0 ? 'degraded' : 'ok',
    grpc:     { requests: m.requests, errors: m.failed, p99: m.p99Latency },
    sessions: GrpcChannelRegistry.sessionCount,
  }
}

// ── 7. Graceful shutdown ───────────────────────────────────────────────────

process.on('SIGTERM', async () => {
  await GrpcChannelRegistry.closeAll()
  process.exit(0)
})

API reference

defineService(name, methods)

Creates a ServiceDefinition — the TypeScript-first service contract.

| Param | Type | Description |
|---|---|---|
| name | string | Fully-qualified service name (used in HTTP/2 path) |
| methods | ServiceMethods | Map of method name → descriptor |

unary<TReq, TRes>()

Returns a UnaryMethodDescriptor. Client API: (req: TReq) => Promise<TRes>.

serverStream<TReq, TRes>()

Returns a ServerStreamMethodDescriptor. Client API: (req: TReq) => AsyncIterable<TRes>.

clientStream<TReq, TRes>()

Returns a ClientStreamMethodDescriptor. Client API: (stream: AsyncIterable<TReq>) => Promise<TRes>.

bidi<TReq, TRes>()

Returns a BidiStreamMethodDescriptor. Client API: (stream: AsyncIterable<TReq>) => AsyncIterable<TRes>.

createGrpcClient(definition, address, config?)

Creates a GrpcClient<TDef>. The returned object exposes:

  • One typed method per service method
  • .metrics() → MetricsSnapshot
  • .resetMetrics() — clears counters
  • .on(events) — add resilience hooks
  • .close() — gracefully closes HTTP/2 sessions

GrpcError

Thrown for all non-OK gRPC responses.

| Property | Type | Description |
|---|---|---|
| .code | GrpcCode | gRPC status code string (e.g. 'not_found') |
| .message | string | Human-readable error message |
| .details | unknown[] | Optional structured error details |
| .metadata | Record<string, string> | Optional response metadata |

GrpcClientConfig

| Field | Type | Default | Description |
|---|---|---|---|
| preset | Preset | | 'high-throughput' \| 'resilient-api' \| 'low-latency' |
| encoding | 'json' \| 'proto' | 'json' | Wire encoding |
| protocol | 'connect' \| 'grpc' \| 'grpc-web' | 'connect' | Wire protocol |
| timeoutMs | number | 15000 | Default call timeout |
| headers | Record<string, string> | | Default headers / metadata |
| maxSessions | number | 2 | HTTP/2 sessions per address |
| retries | number | 0 | Max retry attempts |
| retryStrategy | RetryStrategy | ExponentialJitter(100, 10k) | Backoff strategy |
| circuitBreaker | CircuitBreakerConfig | | Circuit breaker config |
| bulkhead | BulkheadConfig | | Bulkhead config |
| rateLimit | RateLimitConfig | | Rate limit config |
| on | ResilienceEvents | | Event hooks |

Released under the MIT License.