Jeff Bahr
jbahr@meta.com
90d · built 2026-05-28
90-day totals
- Commits: 36
- Growth: 5.7
- Maintenance: 9.2
- Fixes: 3.4
- Total ETV: 18.3
30-day trajectory
Last 30 days vs. the 30 days before. Up arrows on Growth and ETV mean improvement; up arrow on Fixes share means more time on fixes (worse).
Daily performance
Daily ETV, stacked by Growth, Maintenance and Fixes.
Work-mix over time
Share of Growth / Maintenance / Fixes over a rolling 7-day window. Reads as 'where is effort flowing right now'.
Bug flow over time
Monthly bug flow attributed to this developer. The left bar (red) is bug impact this dev authored that was addressed in the given month — combining bugs others fixed for them and bugs they fixed themselves. The right bar is fixes they personally shipped that month, split between self-fixes (overlap with the red bar) and fixes done for someone else. X-axis is fix-time, not introduction-time — the Navigara API attributes bugs backward to the author at the moment the fix lands.
- Self-fix share: 18%
- Bugs you introduced: 1.5
- Bugs you fixed: 7.1
Repository spread
Where this developer's commits land. Concentrated work means the top repository (top1) receives more than 80% of commits; polymath spread means it receives less than 30%.
Most impactful commits
Top 20 by ETV in the 90-day window.
- 2.9 ETV · Add ALPN support to thrift java stack

  Summary:

  ## What this diff does

  Introduces a new server transport for Java Thrift — **`UnifiedServerTransport`** — that listens on a single TLS port and uses **ALPN (Application-Layer Protocol Negotiation)** to dynamically select the wire protocol per connection. Two protocols are advertised and supported: `"rs"` (RSocket) and `"thrift"` (THeader/Rocket). This enables in-place migration from Header → Rocket without touching listening ports, DNS, or connection routing.

  ## Why

  Today, a Java Thrift server is bound to exactly one wire protocol at process start (`LegacyServerTransportFactory` for Header, `RSocketServerTransportFactory` for Rocket). Migrating a service from Header to Rocket requires a port change, a load-balancer reconfiguration, or a full restart with a new transport. ALPN lets a single server speak both protocols simultaneously, so individual clients can be migrated one at a time without coordinating with the server-side rollout. This matches the C++ ThriftServer behavior, which has supported the same pattern for years.

  ## Architecture

  The diff adds five new classes under `com.facebook.thrift.transport.unified`:

  - **`UnifiedServerTransportFactory`** — `ServerTransportFactory` impl that constructs `UnifiedServerTransport` instances. Plugged into `RpcThriftServer` dispatch (see below).
  - **`UnifiedServerTransport`** — Reactor-Netty `TcpServer` bound to the configured address with TLS+ALPN. On `doOnConnection`, inspects `SslHandler.applicationProtocol()` and routes:
    - ALPN selected `"rs"` → adds RSocket length-field decoder, hands the connection to `RSocketServer.asConnectionAcceptor()` wrapped in `TcpDuplexConnection`.
    - ALPN selected `"thrift"` → adds `ThriftHeaderFrameLengthBasedDecoder` + `LengthFieldPrepender` and hands the connection to `ThriftConnectionAcceptor`.
    - ALPN absent or unrecognised → **falls back to HEADER** (matches C++ ThriftServer behavior; downgrades from "fail closed" to "fail safe" so legacy clients without ALPN still work).
  - **`ThriftConnectionAcceptor`** — Reactor-Netty connection handler that bridges Netty inbound bytes to `RpcServerHandler` via `ReactiveHeaderCodec`. Decodes `ByteBuf` → `ThriftFrame`, dispatches request kind (oneway / req-resp), encodes response back, manages timeouts and per-request reference counting.
  - **`ReactiveHeaderCodec`** — Stateless encoder/decoder for THeader frames, ported from `HeaderTransportCodec` with full parity to landed bug fixes (multi-transform decompression via `CompressionManager` in reverse order, persistent-only header-section parsing, `readString` bounds check, per-header error logging on parse failure).
  - **`UnifiedServerTransport$ThriftChannelInitializer`** — `ChannelPipelineConfigurer` that enforces `ThriftServerConfig.connectionLimit` and installs `MetricsChannelDuplexHandler` for accepted/rejected/dropped connection counters.

  Supporting changes:

  - **`ThriftTransportType`** — adds an `UNKNOWN` enum constant; `fromProtocol(null | unrecognised)` now returns `UNKNOWN` (instead of throwing) so the unified server can apply the fallback-to-HEADER policy uniformly.
  - **`RpcServerUtils`** — `getSslContext()` configures `ApplicationProtocolConfig` with `[RSOCKET, HEADER]` when `config.isEnableAlpn()`. Also adds `isSslCloseNotify(Throwable)` helper to filter benign TLS shutdown alerts from error logs.
  - **`RpcThriftServer`** — dispatch logic now routes to `UnifiedServerTransportFactory` first when `config.isEnableAlpn()`, then `RSocketServerTransportFactory` if `useRSocket`, then `LegacyServerTransportFactory` (the old default).
  - **Client side** (`ThriftClientInitializer`, `LegacyRpcClientFactory`) — propagates the remote `InetSocketAddress` into `SslContext.newHandler(...)` so the client SNI/ALPN handshake completes correctly. No client behavior change when ALPN is disabled.

  ## Migration strategy

  **Opt-in via config flag.** The new transport is activated only when `ThriftServerConfig.setEnableAlpn(true)` is called. Default behavior is unchanged — services that don't opt in continue to use `LegacyServerTransportFactory` or `RSocketServerTransportFactory` exactly as before.

  **Backwards compatible at the wire level.** A unified server with ALPN enabled accepts:

  - ALPN-aware clients negotiating `"thrift"` → routed to Header path (same wire format as existing Header servers).
  - ALPN-aware clients negotiating `"rs"` → routed to Rocket path (same wire format as existing Rocket servers).
  - Legacy clients that don't send an ALPN extension at all → falls back to Header (per the new no-ALPN handling). This means a Header-speaking legacy client can connect to an ALPN-enabled server without modification.

  **No client SDK changes required for adoption.** Any existing TLS-enabled Java client that does TLS handshake against an ALPN-enabled server will work — either via ALPN negotiation (when the client supports ALPN) or via the no-ALPN HEADER fallback.

  **Per-service rollout.** Services adopt the unified transport one at a time by setting `setEnableAlpn(true)` in their `ThriftServerConfig`.

  **Rollback.** A single-line config revert (`setEnableAlpn(false)`) returns a service to `LegacyServerTransportFactory`. No data-format compatibility concerns — the wire formats are unchanged on each ALPN path.

  Reviewed By: adolfojunior, RayanRal
  Differential Revision: D87745292
  fbshipit-source-id: a5587ceef699ecf6893a9dcd97ab8fab19596af4
  github.com-facebook-fbthrift · 972aa12a · 2026-04-30
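The routing rule described in this commit (ALPN `"rs"` goes to the RSocket path, while `"thrift"`, a missing ALPN extension, and unrecognised values all go to the Header path) can be sketched as a small pure function. This is only an illustration of the decision table: `WireProtocol` and `AlpnRouting` are hypothetical names, not classes from the diff, and the real transport installs Netty pipeline handlers rather than returning an enum.

```java
// Hypothetical names for illustration; these are not the classes added by the diff.
enum WireProtocol { RSOCKET, HEADER }

final class AlpnRouting {
  private AlpnRouting() {}

  /**
   * Maps the protocol negotiated via ALPN during the TLS handshake to a wire protocol.
   * A null argument models a client that sent no ALPN extension.
   */
  static WireProtocol select(String negotiatedAlpn) {
    if ("rs".equals(negotiatedAlpn)) {
      return WireProtocol.RSOCKET;
    }
    // "thrift", null, and unrecognised values all take the Header path,
    // mirroring the fallback-to-HEADER policy described in the commit message.
    return WireProtocol.HEADER;
  }
}
```

Keeping the fallback as the default branch means a new, unknown ALPN value degrades to the Header path instead of failing the connection, which is the "fail safe" behaviour the commit message describes.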
- 2.0 ETV · Improve MonoTimeoutTransformer's reactive compliance and multithreaded safety

  Summary:

  # MonoTimeoutTransformer: Complete Rewrite for Reactive Streams Compliance

  ## Summary:

  This diff is a complete rewrite of MonoTimeoutTransformer to fix multiple race conditions, ensure Reactive Streams specification compliance, and add comprehensive test coverage.

  ## Why This Rewrite Was Necessary

  The original implementation had fundamental design flaws:

  1. **No explicit state machine**: Used `isDisposed()` checks which are not atomic with respect to signal emission, allowing races.
  2. **Incorrect fallback subscription**: Subscribed fallback directly to `actual::onNext, actual::onError, actual::onComplete` method references, bypassing cancellation checks entirely.
  3. **Race conditions**: Multiple unhandled races between:
     - Source emission vs timeout firing (Race 1)
     - Cancel vs fallback subscription setup (Race 2)
     - Cancel after timeout but before scheduler execution (Race 3)
  4. **No timer cleanup verification**: Timer cancellation on normal completion was not guaranteed.
  5. **Untestable**: Timer was hardcoded, making deterministic race testing impossible.

  ## The Three Critical Race Conditions

  ### Race 1: Source vs Timeout (Signal Race)

  ```
  Source Thread                          Timer Thread
  │                                      │
  │── onNext() ──────────────────────────│── run(Timeout) ──
  │   [want to emit value]               │   [want to emit error]
  │                                      │
  └──────────── WHO WINS? ───────────────┘
  ```

  **Old behavior**: Both could win, potentially emitting both value and error.

  **New behavior**: Atomic CAS on STATE ensures exactly one winner.

  ### Race 2: Cancel vs Fallback Setup

  ```
  Scheduler Thread                               Downstream Thread
  │                                              │
  │─ fallback.subscribe(actual) ────────────────▶│
  │                                              │── cancel() ──
  │                                              │   [too late?]
  ```

  **Old behavior**: Fallback subscription ignored cancellation.

  **New behavior**: FallbackSubscriber uses Operators.set() to atomically check for cancellation before accepting the subscription.
  ### Race 3: Cancel After Timeout, Before Scheduler (TIMEOUT_MARKER Fix)

  ```
  Timer Thread               Downstream Thread           Scheduler Thread
  │                          │                           │
  │─ run(Timeout) ──────────▶│                           │
  │  [S → ???]               │                           │
  │  [schedule(this)]        │                           │
  │                          │                           │
  │                          │── cancel() ──────────────▶│
  │                          │   [Operators.terminate(S)]│
  │                          │   [did it work?]          │
  │                          │                           │
  │                          │                           │─ run() ──
  │                          │                           │  [should skip?]
  ```

  **Old behavior (if it existed)**: Would use `cancelledSubscription()` as marker in run(Timeout). But `cancel()` also uses `cancelledSubscription()`, so `Operators.terminate()` would see S as already cancelled and do nothing. The scheduler's `run()` would then proceed to emit signals to a cancelled subscriber.

  **New behavior**: Introduced `TIMEOUT_MARKER` - a distinct sentinel that marks "timeout fired, transitioning to fallback." This allows:

  1. `run(Timeout)` sets S to TIMEOUT_MARKER (not cancelledSubscription)
  2. If `cancel()` is called, `Operators.terminate()` successfully changes S from TIMEOUT_MARKER to cancelledSubscription()
  3. `run()` uses CAS to check if S is still TIMEOUT_MARKER - if not, cancellation occurred and we skip signal emission

  ## Architecture Overview

  ### Old Architecture (Flawed)

  ```
  ┌─────────────────────────────────────────────────────────────────────────────┐
  │                        MonoTimeoutTransformer (Old)                         │
  │                                                                             │
  │  ┌─────────────┐    ┌──────────────────────┐    ┌─────────────────────────┐ │
  │  │   Source    │───▶│ TimeoutSubscription  │───▶│      Downstream         │ │
  │  │    Mono     │    │   (just a holder)    │    │   (CoreSubscriber)      │ │
  │  └─────────────┘    └──────────┬───────────┘    └─────────────────────────┘ │
  │                                │                                            │
  │                                │ creates                                    │
  │                                ▼                                            │
  │                     ┌──────────────────────┐                                │
  │                     │   SourceSubscriber   │ ◀── extends BaseSubscriber     │
  │                     │  (no state machine)  │     (uses isDisposed())        │
  │                     └──────────────────────┘                                │
  │                                                                             │
  │  Problems:                                                                  │
  │  - No atomic state management                                               │
  │  - Fallback subscribed via method references (no cancel check)              │
  │  - Race conditions between timeout/source/cancel                            │
  └─────────────────────────────────────────────────────────────────────────────┘
  ```

  ### New Architecture (Correct)
  ```
  ┌─────────────────────────────────────────────────────────────────────────────────┐
  │                          MonoTimeoutTransformer (New)                           │
  │                                                                                 │
  │  ┌─────────────┐      ┌───────────────────┐      ┌────────────────────────────┐ │
  │  │   Source    │─────▶│ TimeoutSubscriber │─────▶│        Downstream          │ │
  │  │    Mono     │      │   (CoreSubscriber,│      │      (CoreSubscriber)      │ │
  │  └─────────────┘      │   Subscription,   │      └────────────────────────────┘ │
  │                       │   TimerTask,      │                                     │
  │                       │   Runnable)       │                                     │
  │                       └────────┬──────────┘                                     │
  │                                │                                                │
  │           ┌────────────────────┼────────────────────┐                           │
  │           │                    │                    │                           │
  │           ▼                    ▼                    ▼                           │
  │  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────────────────────┐  │
  │  │ STATE (atomic)  │  │   S (atomic)    │  │  FallbackSubscriber             │  │
  │  │  INIT=0         │  │  source sub     │  │  (defensive cancel checks)      │  │
  │  │  VALUE_EMITTED=1│  │  TIMEOUT_MARKER │  │  - checks S before onNext       │  │
  │  │  TERMINATED=2   │  │  fallback sub   │  │  - checks S before onError      │  │
  │  └─────────────────┘  │  CANCELLED      │  │  - checks S before onComplete   │  │
  │                       └─────────────────┘  └─────────────────────────────────┘  │
  │                                                                                 │
  │  Key invariants:                                                                │
  │  - STATE transitions are atomic (CAS)                                           │
  │  - Only one thread can win INIT → VALUE_EMITTED or INIT → TERMINATED            │
  │  - S tracks active subscription and cancellation state                          │
  │  - TIMEOUT_MARKER enables Race 3 detection                                      │
  └─────────────────────────────────────────────────────────────────────────────────┘
  ```

  ## State Machine

  ```
                      ┌───────────────────────────────────────────────────┐
                      │                                                   │
                      ▼                                                   │
                ┌──────────┐                                              │
                │   INIT   │◀─────────────────────────────────────────────┘
                │   (0)    │  (initial state)
                └────┬─────┘
                     │
           ┌─────────┴─────────┬──────────────────┬──────────────────┐
           │                   │                  │                  │
           │ onNext()          │ timeout fires    │ onComplete()     │ cancel()
           │ [CAS succeeds]    │ [CAS succeeds]   │ [CAS succeeds]   │ [getAndSet]
           ▼                   │                  │                  │
    ┌─────────────┐            │                  │                  │
    │VALUE_EMITTED│            │                  │                  │
    │     (1)     │            │                  │                  │
    └──────┬──────┘            │                  │                  │
           │                   │                  │                  │
           │ onComplete()      │                  │                  │
           │ [CAS succeeds]    │                  │                  │
           ▼                   ▼                  ▼                  ▼
         ┌─────────────────────────────────────────────────────────────┐
         │                       TERMINATED (2)                        │
         │   (terminal state - no further state transitions allowed)   │
         └─────────────────────────────────────────────────────────────┘
  ```

  ## Key Implementation Changes

  ### 1. Atomic State Management (Race 1 Fix)

  The original code had no coordination between source signals and timeout. Both could proceed simultaneously, potentially delivering multiple terminal signals.

  **Before - Source emits value:**

  ```java
  @Override
  protected void hookOnNext(T value) {
    timeout.cancel(); // Cancel timer, but might be too late
    actual.onNext(value); // Emit value unconditionally
  }
  ```

  **Before - Timeout fires:**

  ```java
  private void doTimeout(Timeout timeout) {
    if (!isDisposed() && !timeout.isCancelled()) { // Check disposed
      doTimeout(); // But source might emit between check and here!
    }
  }

  private void doTimeoutException() {
    TimeoutException e = new TimeoutException("...");
    scheduler.schedule(() -> actual.onError(e)); // Emit error
  }
  ```

  **The Race:** If source emits while timeout fires:

  1. `hookOnNext` calls `timeout.cancel()` - but timer callback already started
  2. `doTimeout` checks `isDisposed()` → false (source hasn't finished yet)
  3. Both `actual.onNext(value)` AND `actual.onError(e)` can execute
  4. Downstream receives BOTH value and error → violates Reactive Streams spec

  **After - Both paths compete via atomic CAS:**

  ```java
  @Override
  public void onNext(T t) {
    // CAS: Only ONE thread can transition INIT → VALUE_EMITTED
    if (STATE.compareAndSet(this, STATE_INIT, STATE_VALUE_EMITTED)) {
      cancelTimer();
      actual.onNext(t);
    } else {
      // Lost the race - drop the value
      Operators.onNextDropped(t, actual.currentContext());
    }
  }

  @Override // TimerTask.run - called by timer thread
  public void run(Timeout timeout) {
    // CAS: Only ONE thread can transition INIT → TERMINATED
    if (STATE.compareAndSet(this, STATE_INIT, STATE_TERMINATED)) {
      // Won the race - cancel source and emit timeout
      Subscription current = S.getAndSet(this, TIMEOUT_MARKER);
      if (current != null) current.cancel();
      scheduler.schedule(this); // Will emit error or subscribe fallback
    }
    // If CAS fails, source already won - do nothing
  }
  ```

  **Why this works:** The CAS operation is atomic. Exactly one of source or timeout can successfully change STATE from INIT. The loser's CAS returns false, and they drop their signal.

  ### 2. Atomic Subscription Management (Race 2 & 3 Fix)

  The original code had no atomic tracking of subscription state. The new code uses an atomic `S` field to track the active subscription and detect cancellation.

  **Before:**

  ```java
  private static class TimeoutSubscription<T> implements Subscription {
    private SourceSubscriber<T> sourceSubscriber; // Not atomic!

    @Override
    public void cancel() {
      if (sourceSubscriber != null) {
        sourceSubscriber.dispose(); // Just sets a boolean
      }
    }
  }
  ```

  **Problem:** `dispose()` sets an internal boolean, but this isn't atomic with respect to subscription switching.
  If cancel() races with fallback subscription setup:

  - `dispose()` might set the boolean after fallback checks it
  - Or before fallback even exists

  **After:**

  ```java
  volatile Subscription s;
  static final AtomicReferenceFieldUpdater<TimeoutSubscriber, Subscription> S = ...;

  // TIMEOUT_MARKER: distinct sentinel for timeout-in-progress state
  static final Subscription TIMEOUT_MARKER = new Subscription() { ... };

  @Override // TimerTask.run
  public void run(Timeout timeout) {
    if (STATE.compareAndSet(this, STATE_INIT, STATE_TERMINATED)) {
      // Set S to TIMEOUT_MARKER (not cancelledSubscription!)
      Subscription current = S.getAndSet(this, TIMEOUT_MARKER);
      if (current != null) current.cancel();
      scheduler.schedule(this);
    }
  }

  @Override // Runnable.run - scheduler thread
  public void run() {
    // Race 3 detection: if cancel() was called, S is now cancelledSubscription
    if (s == Operators.cancelledSubscription()) {
      return; // Cancel won - don't emit anything
    }
    // CAS to detect late cancel: TIMEOUT_MARKER → null
    if (S.compareAndSet(this, TIMEOUT_MARKER, null)) {
      fallback.subscribe(new FallbackSubscriber<>(actual, this));
    }
    // If CAS failed, cancel() changed S to cancelledSubscription - skip fallback
  }
  ```

  **Why TIMEOUT_MARKER?** If we used `cancelledSubscription()` in run(Timeout):

  - `cancel()` calls `Operators.terminate(S, this)` which does: `S.getAndSet(this, cancelledSubscription())`
  - If S is already `cancelledSubscription()`, this is a no-op!
  - The scheduler's `run()` would have no way to detect the cancel

  With TIMEOUT_MARKER:

  - `cancel()` successfully changes S from TIMEOUT_MARKER → cancelledSubscription()
  - The scheduler's CAS from TIMEOUT_MARKER → null fails
  - Cancel is detected, fallback is skipped

  ### 3. Proper Fallback Handling (Race 2 Fix)

  The original code subscribed to fallback using method references, which bypassed all cancellation checks.
  **Before:**

  ```java
  private void doTimeoutFallback() {
    scheduler.schedule(() -> {
      try {
        // WRONG: Method references go directly to downstream!
        // If cancel() is called, these lambdas still execute!
        fallback.subscribe(actual::onNext, actual::onError, actual::onComplete);
      } catch (Throwable t) {
        actual.onError(t);
      }
    });
  }
  ```

  **Problem:** `actual::onNext` is just a method reference - it has no cancellation check. Even after `cancel()`, fallback signals go directly to downstream.

  **After:**

  ```java
  @Override
  public void run() {
    // Check cancellation before any signal emission
    if (s == Operators.cancelledSubscription()) {
      return;
    }
    // Standard fallback path
    if (S.compareAndSet(this, TIMEOUT_MARKER, null)) {
      // FallbackSubscriber has defensive checks
      fallback.subscribe(new FallbackSubscriber<>(actual, this));
    }
    // If CAS failed, cancel() was called - skip fallback
  }
  ```

  ### 4. FallbackSubscriber with Defensive Checks

  Even with atomic subscription management, a misbehaving fallback publisher might ignore cancellation and continue emitting signals. FallbackSubscriber adds a final line of defense by checking cancellation state before forwarding each signal.

  ```java
  static final class FallbackSubscriber<T> implements CoreSubscriber<T> {
    @Override
    public void onSubscribe(Subscription s) {
      // Operators.set() atomically:
      // 1. Checks if S is already cancelledSubscription()
      // 2. If so, cancels the incoming subscription and returns false
      // 3. Otherwise, sets S = s and returns true
      if (Operators.set(S, parent, s)) {
        s.request(Long.MAX_VALUE);
      }
      // If set() returned false, subscription was rejected
    }

    @Override
    public void onNext(T t) {
      // Defensive check: drop signals if cancelled
      if (parent.s == Operators.cancelledSubscription()) {
        Operators.onNextDropped(t, actual.currentContext());
        return;
      }
      actual.onNext(t);
    }

    @Override
    public void onError(Throwable t) {
      if (parent.s == Operators.cancelledSubscription()) {
        Operators.onErrorDropped(t, actual.currentContext());
        return;
      }
      actual.onError(t);
    }

    @Override
    public void onComplete() {
      if (parent.s == Operators.cancelledSubscription()) {
        return; // Silently drop
      }
      actual.onComplete();
    }
  }
  ```

  **Why defensive checks in every method?** A misbehaving publisher might:

  - Ignore the `cancel()` call on its subscription
  - Continue emitting values after cancellation
  - Emit onError or onComplete after cancellation

  The defensive checks ensure no signals leak to downstream after cancellation.

  ### 5. Timer Injection for Testability

  **Before:**

  ```java
  // Timer hardcoded - impossible to test race conditions deterministically
  this.timeout = RpcResources.getHashedWheelTimer().newTimeout(this::doTimeout, delay, unit);
  ```

  **After:**

  ```java
  private final Timer timer;

  // Public constructor uses default timer
  public MonoTimeoutTransformer(Scheduler scheduler, long delay, TimeUnit unit, Mono<T> fallback) {
    this(scheduler, delay, unit, fallback, RpcResources.getHashedWheelTimer());
  }

  // Package-private constructor for testing
  @VisibleForTesting
  MonoTimeoutTransformer(Scheduler scheduler, long delay, TimeUnit unit, Mono<T> fallback, Timer timer) {
    this.timer = Objects.requireNonNull(timer, "timer");
    // ...
  }
  ```

  ### 6. Input Validation

  **Before:**

  ```java
  public MonoTimeoutTransformer(Scheduler scheduler, long delay, TimeUnit unit, Mono<T> fallback) {
    this.scheduler = scheduler; // No validation
    this.delay = delay; // No validation
    this.unit = unit; // No validation
    this.fallback = fallback;
  }
  ```

  **After:**

  ```java
  MonoTimeoutTransformer(Scheduler scheduler, long delay, TimeUnit unit, Mono<T> fallback, Timer timer) {
    this.scheduler = Objects.requireNonNull(scheduler, "scheduler");
    if (delay < 0) {
      throw new IllegalArgumentException("delay must be non-negative, was: " + delay);
    }
    this.delay = delay;
    this.unit = Objects.requireNonNull(unit, "unit");
    this.fallback = fallback; // null allowed (means no fallback)
    this.timer = Objects.requireNonNull(timer, "timer");
  }
  ```

  ## Summary of Key Bugs Fixed

  1. **Multiple signals possible**: Without atomic state, both source and timeout could emit signals. Now CAS ensures exactly one wins.
  2. **Fallback ignores cancellation**: Method references bypassed all checks. Now FallbackSubscriber checks cancellation before every signal.
  3. **Cancel after timeout lost**: Using `cancelledSubscription()` as timeout marker made cancel() a no-op. Now TIMEOUT_MARKER is distinct.
  4. **Timer not cancelled**: No guarantee timer was cancelled on completion. Now `cancelTimer()` is called in all terminal paths.
  5. **Untestable races**: Hardcoded timer made deterministic testing impossible. Now timer is injectable via package-private constructor.
  6. **No input validation**: Invalid inputs (null scheduler, negative delay) were silently accepted. Now validated with clear error messages.

  Reviewed By: robertroeser
  Differential Revision: D95107830
  fbshipit-source-id: 3b869f00824a14d14ae394706190ec2896736789
  github.com-facebook-fbthrift · ff73b836 · 2026-03-05
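The two ideas at the core of this commit, a CAS-guarded state so that only one of "value" and "timeout" wins, and a sentinel distinct from the cancelled marker so that a late cancel is still observable, can be exercised without Reactor. The sketch below is a simplified model under stated assumptions: `TimeoutRaceSketch`, its fields, and its string-based signal log are all hypothetical, it uses `AtomicInteger`/`AtomicReference` in place of the field updaters, and it omits timer and source cancellation entirely.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Simplified, Reactor-free model; every name here is an illustrative assumption.
final class TimeoutRaceSketch {
  static final int INIT = 0, VALUE_EMITTED = 1, TERMINATED = 2;
  static final Object TIMEOUT_MARKER = new Object(); // "timeout fired, fallback pending"
  static final Object CANCELLED = new Object();      // stand-in for cancelledSubscription()

  final AtomicInteger state = new AtomicInteger(INIT);
  final AtomicReference<Object> subscription = new AtomicReference<>("source");
  final List<String> signals = new CopyOnWriteArrayList<>();

  /** Source path: emits only if it wins INIT -> VALUE_EMITTED. */
  void onNext(String value) {
    if (state.compareAndSet(INIT, VALUE_EMITTED)) {
      signals.add("next:" + value);
    }
  }

  /** Timer path: wins only via INIT -> TERMINATED, then parks the distinct sentinel. */
  boolean timerFired() {
    if (state.compareAndSet(INIT, TERMINATED)) {
      subscription.set(TIMEOUT_MARKER);
      return true;
    }
    return false;
  }

  /** Downstream cancel: unconditionally swaps in the cancelled marker. */
  void cancel() {
    subscription.getAndSet(CANCELLED);
  }

  /** Scheduler path: subscribes the fallback only if no cancel replaced the sentinel. */
  void runFallback() {
    if (subscription.compareAndSet(TIMEOUT_MARKER, null)) {
      signals.add("fallback");
    }
  }
}
```

If `timerFired()` stored `CANCELLED` instead of `TIMEOUT_MARKER`, a later `cancel()` would leave the reference unchanged and `runFallback()` could not tell the two cases apart, which is the Race 3 problem the commit message describes.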
- 1.9 ETV · Fix state races in FusableReconnectingRpcClientMono

  Summary: Replaces the two separate AtomicReferenceFieldUpdaters (state + rpcClient) with a single composite ConnectionState reference and makes all state transitions CAS-guarded. Also fixes several additional concurrency and resource lifecycle issues.

  Bug 1: Torn reads between state and rpcClient.
  Fix: Combined into a single immutable ConnectionState(phase, client) object. One AtomicReferenceFieldUpdater, one CAS per transition, no torn reads.

  Bug 2: handleIncomingRpcClient/handleConnectionError used unconditional set(). A stale connection attempt's success could clobber a healthy connection, or a stale failure could tear down a good replacement.
  Fix: Both callbacks CAS against their specific attempt's ConnectionState identity. Stale results are discarded (client disposed on stale success).

  Bug 3: onClose callback had a TOCTOU race — read snapshot, then unconditional set to DISCONNECTED. Another thread could advance state between read and write.
  Fix: trySetStateDisconnected uses CAS on the exact observed snapshot. The onClose callback uses a CAS-loop: read → check identity → CAS to DISCONNECTED.

  Bug 4: CONNECTING state carried the old client reference. Old client's onClose identity guard matched during CONNECTING, triggering spurious duplicate connects.
  Fix: CONNECTING uses client=null.

  Bug 5: Cancelled subscribers leaked during outages. The queue was only drained during CONNECTED. During prolonged DISCONNECTED/CONNECTING cycles, cancelled InnerSubscription objects accumulated unboundedly.
  Fix: drain() calls pruneCancelledSubscribers() when not CONNECTED.

  Bug 6: Backoff was applied as a timeout (MonoTimeoutTransformer) instead of a delay. This made backoff act as an increasingly strict timeout on the connection attempt — cancelling slow-but-valid TCP handshakes. On instant failures (connection refused), there was zero delay, causing a CPU-pegging spin-loop.
  Fix: Backoff applied as delaySubscription() before the attempt. A separate fixed CONNECT_TIMEOUT (10s) limits individual connection attempts.

  Bug 7: drainWithRpcClient emitted to all queued subscribers without rechecking isDisposed(). If a connection died mid-drain, remaining subscribers received a dead client instead of waiting for reconnection.
  Fix: Check isDisposed() at the top of the drain loop. Break if disposed.

  Bug 8: ClassCastException in trySetStateDisconnected. Hard cast to Scannable on onClose() Mono would throw if the publisher didn't implement Scannable, wedging the state machine permanently in CONNECTED with a dead client.
  Fix: Use Scannable.from() which safely wraps any publisher.

  Bug 9: Duplicate subscriber queue entries. request(n) used a plain volatile boolean for the enqueue-once guard. Concurrent request() calls could both observe enqueued=false and enqueue the same InnerSubscription twice.
  Fix: AtomicIntegerFieldUpdater CAS(0, 1) ensures exactly one enqueue.

  Reviewed By: prakashgayasen, cnli87
  Differential Revision: D100047014
  fbshipit-source-id: a076b4cad0294dc6755d417ea8bba1a8c93b4089
  github.com-facebook-fbthrift · 9fe1d18c · 2026-04-14
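Bugs 1, 2 and 4 share one remedy: keep phase and client in a single immutable snapshot and let each connection attempt compare-and-set against its own snapshot identity, so a stale success or failure simply loses the CAS. The following is a minimal sketch of that pattern, assuming a `String` stands in for the RPC client and using `AtomicReference` where the commit uses an `AtomicReferenceFieldUpdater`; `ReconnectSketch` and its method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model; not the fbthrift implementation.
final class ReconnectSketch {
  enum Phase { DISCONNECTED, CONNECTING, CONNECTED }

  /** Immutable snapshot, so phase and client can never be observed torn. */
  static final class ConnectionState {
    final Phase phase;
    final String client; // stand-in for an RPC client; null while CONNECTING

    ConnectionState(Phase phase, String client) {
      this.phase = phase;
      this.client = client;
    }
  }

  private final AtomicReference<ConnectionState> state =
      new AtomicReference<>(new ConnectionState(Phase.DISCONNECTED, null));

  /** Starts an attempt and returns its identity token, or null if one is not needed. */
  ConnectionState beginConnect() {
    ConnectionState current = state.get();
    if (current.phase != Phase.DISCONNECTED) {
      return null;
    }
    ConnectionState attempt = new ConnectionState(Phase.CONNECTING, null);
    return state.compareAndSet(current, attempt) ? attempt : null;
  }

  /** Publishes the client only if this attempt is still the current state. */
  boolean onConnected(ConnectionState attempt, String client) {
    return state.compareAndSet(attempt, new ConnectionState(Phase.CONNECTED, client));
  }

  /** Tears down only the attempt that is still current; stale failures are ignored. */
  boolean onConnectError(ConnectionState attempt) {
    return state.compareAndSet(attempt, new ConnectionState(Phase.DISCONNECTED, null));
  }

  ConnectionState current() {
    return state.get();
  }
}
```

`AtomicReference.compareAndSet` compares by reference identity, which is what makes the freshly allocated `CONNECTING` snapshot usable as a per-attempt token. A caller that receives `false` from `onConnected` would dispose the client it just built, as the commit message notes for stale successes.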
- 1.5 ETV · Add manager-backed client ownership primitives

  Summary: Typed clients need an explicit lifecycle boundary before we can switch any callers over to the v2 runtime. The legacy Mono<RpcClient> surface does not encode whether closing a typed client should tear down the underlying transport or only release a borrowed handle.

  This diff introduces that ownership layer: RpcClientManager, RpcClientBinding, BindingRpcClientSource, and the basic single, reconnecting, and simple load-balancing manager implementations. It also adds focused unit tests that lock down the owned-vs-borrowed semantics and manager selection behavior.

  Nothing uses these classes yet. Follow-up diffs will wire them into runtime selection and Service Router once the lifecycle model is in place.

  Reviewed By: robertroeser
  Differential Revision: D101737023
  fbshipit-source-id: 0e07253d7c08a551a118390f184577c969859e35
  github.com-facebook-fbthrift · b2e3d032 · 2026-04-24
- 1.5 ETV · Select legacy or manager-backed typed client runtimes

  Summary: The manager primitives are in place, but typed clients still always behaved like the legacy Mono<RpcClient> runtime. This diff introduces the runtime selector and the v2 factory path so callers can opt into explicit connection ownership without changing the generated client APIs.

  RpcClientFactory.Builder now chooses between the legacy decorator chain and a new RpcClientFactoryV2 manager stack. ClientBuilder and generated reactive-client Mono constructors route through ClientRuntimeSelector, and ThriftClientConfig plus RpcResources expose the runtime mode from config or a JVM flag. The reconnecting monos now implement Disposable so the compatibility manager can own and shut them down cleanly.

  Tests cover legacy parity, v2 close semantics, borrowed-vs-owned manager usage, and both header and RSocket factory builds. The ping example is updated to use the builder entrypoint so it follows the same selection path.

  Differential Revision: D101737024
  fbshipit-source-id: 4d0df3952b3f714c266c76495f077f8cd7e81654
  github.com-facebook-fbthrift · 08e2a091 · 2026-04-24
- 1.0 ETV · UnifiedServerTransport: honor allowPlaintext, peek-and-demux on the same port

  Summary: The unified factory (`RpcThriftServer` -> `UnifiedServerTransportFactory`, selected when `ThriftServerConfig.enableAlpn=true`) hard-wires reactor-netty's `.secure()` for every connection. As a result `ThriftServerConfig.allowPlaintext` is silently ignored on this code path, even though it is honored by `LegacyServerTransportFactory`. Any service that turns on ALPN therefore drops plaintext clients on the floor — including Tupperware-side fb303 monitoring (fbagent's per-task fb303 collector, the SR fb303 tier `fbagent.fb303.thrift`) which talks header-thrift over plaintext loopback because `Fb303Collector.transport` is not plumbed through `getFb303ConnectionParams` and `FB303ClientBuilder::buildOldClient` hardcodes `thrift_security=disabled` for loopback.

  This was discovered while debugging an fblite canary failure (`Wait for Tupperware canary targets to become healthy`): ~6% of canary tasks were missing from the per-task ODS metric `snaptu.gateway.AppSrvSessionPool.NumOfSessions.val`, so the Sandcastle wait step (`SandcastleFBLiteCanaryTupperwareWaitForTargetsStep::genIsTaskReportingToOds`) marked them not-ready and the test groups violated the SLO. Direct fb303-status probes against the affected tasks worked over rocket and theader (both ALPN-aware), but fbagent's plaintext header scrape was getting cut off in the SSL handshake because the unified factory has no plaintext path.

  # What changes

  `UnifiedServerTransport.createNewInstance` now branches on `config.isAllowPlaintext()`:

  - `allowPlaintext=false` (default for callers that explicitly opt out): existing `.secure(SslProvider)` path, byte-for-byte unchanged. TLS-only.
  - `allowPlaintext=true` (the existing `ThriftServerConfig` default): install `OptionalSslPeeker` + `DeferChannelActiveHandler` in `doOnChannelInit` instead of `.secure()`. Per-connection peek of the first 5 bytes:
    - If the bytes match a TLS record header, install an `SslHandler` (built from the same `SslContext` that `RpcServerUtils.getSslContext(config)` already provides — same cert, ALPN config, cipher list, JDK-vs-OpenSSL provider) immediately after the peeker. The cumulation buffer flushed at `handlerRemoved` flows through the new `SslHandler`. Handshake then proceeds normally and `SslHandshakeCompletionEvent.SUCCESS` fires upstream.
    - Otherwise, fire `PlaintextConfirmedEvent.INSTANCE` upstream as a Netty user event.

  `DeferChannelActiveHandler` mirrors reactor-netty's internal `SslReadHandler`: it suppresses the original `channelActive` and re-fires it on either `SslHandshakeCompletionEvent.SUCCESS` (TLS branch) or `PlaintextConfirmedEvent.INSTANCE` (plaintext branch). This keeps `doOnConnection` as the single stack-wide entry point — ALPN selection, rocket/header pipeline install, and `Connection`-object lifecycle are all unchanged.

  `getProtocol` returns `HEADER` when no `SslHandler` is in the pipeline (instead of throwing). On the plaintext branch ALPN is impossible, so HEADER is the only sane fallback — matching the C++ ThriftServer behavior when no ALPN protocol is negotiated.

  This mirrors C++'s `setAllowPlaintextOnLoopback` semantics in spirit (peek-then-demux on the same port, fall back to plaintext header), but without adding a new config knob — the Java-side `allowPlaintext` field has existed since the legacy factory and is the natural toggle to plumb through.

  # Two correctness bugs caught in review and fixed in the same commit

  1. **Peek-phase timeout in `OptionalSslPeeker`.** With the peeker installed but no `SslHandler` yet, a client that connects but sends 0-4 bytes would consume a server channel indefinitely — the `SslHandler.handshakeTimeoutMillis` only starts once the SslHandler is in the pipeline. Added a `ScheduledFuture` started in `handlerAdded`/`channelActive` that closes the channel after `PEEK_TIMEOUT_MILLIS` (10s — same default as `SslHandler.handshakeTimeoutMillis` so the worst-case wall time for a slow client is identical on both branches). The timeout is cancelled both on `decode()` reaching a decision and on `handlerRemoved0`. A test-only second constructor takes an explicit timeout for `EmbeddedChannel`-based unit tests.
  2. **Unmatched `channelInactive` in `MetricsChannelDuplexHandler`.** The handler unconditionally decremented `channelCount` and bumped `droppedConnections` in `channelInactive`, but its `channelActive` is gated by any upstream defer-active handler (reactor-netty's `SslReadHandler` or our new `DeferChannelActiveHandler`). If a client disconnected before `channelActive` propagated downstream — during the peek wait, during TLS handshake, or any other defer — the gauge would underflow and connection-limit accounting would silently break. Fix: track a per-instance `incremented` flag, only decrement when it's set, and reset it on `channelInactive`. This protects every deferred-active path, not just ours.

  # Behavioral implications for existing callers

  `ThriftServerConfig.allowPlaintext` defaults to `true` (set in the legacy factory's day). Existing callers that enable ALPN therefore go from "TLS-only via `.secure()`" to "peek-and-demux via `OptionalSslPeeker`" by default. The peeker is transparent for TLS clients (same `SslContext`, same ALPN handshake), and all 35 existing `UnifiedServerTransportTest` cases pass under the new path.

  Callers that genuinely want TLS-only must explicitly set `setAllowPlaintext(false)`. This matches the legacy factory's existing behavior: the field has the same default and the same semantics there.

  Reviewed By: IlayDavid95, cnli87
  Differential Revision: D104335933
  fbshipit-source-id: 8743de27f11422d1fb61e60d51ea975c1472c594
  github.com-facebook-fbthrift · 3df68197 · 2026-05-12
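The "peek the first 5 bytes" step relies on the fixed layout of a TLS record header: one content-type byte, two version bytes, and a two-byte big-endian length. A rough classifier along those lines is sketched below. It is an assumption-laden stand-in, not the `OptionalSslPeeker` from the commit: the class name, the `Decision` enum, and the exact acceptance thresholds are all illustrative.

```java
// Simplified stand-in for a TLS-vs-plaintext peek; not the OptionalSslPeeker from the commit.
final class TlsPeekSketch {
  enum Decision { NEED_MORE_BYTES, TLS, PLAINTEXT }

  static final int RECORD_HEADER_LENGTH = 5;

  private TlsPeekSketch() {}

  /** Classifies a connection from the bytes received so far, without consuming them. */
  static Decision classify(byte[] firstBytes) {
    if (firstBytes.length < RECORD_HEADER_LENGTH) {
      return Decision.NEED_MORE_BYTES; // the caller keeps buffering until a timeout fires
    }
    int contentType = firstBytes[0] & 0xFF;
    int majorVersion = firstBytes[1] & 0xFF;
    int minorVersion = firstBytes[2] & 0xFF;
    int length = ((firstBytes[3] & 0xFF) << 8) | (firstBytes[4] & 0xFF);

    // TLS content types: 20 change_cipher_spec, 21 alert, 22 handshake, 23 application_data.
    boolean knownType = contentType >= 20 && contentType <= 23;
    // Record-layer versions 3.0 through 3.4 cover SSLv3 through TLS 1.3.
    boolean tlsVersion = majorVersion == 3 && minorVersion <= 4;
    // 2^14 plaintext bytes plus at most 2048 bytes of expansion (TLS 1.2 ciphertext bound).
    boolean saneLength = length > 0 && length <= 16384 + 2048;

    return knownType && tlsVersion && saneLength ? Decision.TLS : Decision.PLAINTEXT;
  }
}
```

A length-prefixed plaintext frame whose first byte is `0x00` cannot match a TLS content type, so it is classified as plaintext here. The `NEED_MORE_BYTES` outcome is the state that the peek-phase timeout in the commit message guards against.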
- 0.8 ETV · Fix stream exception handling for Java Rsocket client/server
  Summary: Fixes bugs in exception handling for streaming RPC methods where undeclared exceptions were incorrectly decoded as TTransportException and stream-level declared exceptions could be incorrectly sent during the initial response phase.
  Changes:
  Server-side (StreamResponseHandlerTemplate):
  - Properly separate function-level vs stream-level declared exceptions
  - Stream-declared exceptions thrown before stream establishment are now wrapped in TApplicationException per protocol specification
  - Added context-aware exception checking (isKnownException now takes checkStreamExceptions parameter)
  Client-side (RSocketRpcClient):
  - Check both ResponseRpcMetadata and StreamPayloadMetadata for undeclared exceptions in streaming responses
  - Function-level exceptions use ResponseRpcMetadata, stream-level use StreamPayloadMetadata per Rocket protocol specification
  Utility improvements (RpcClientUtils):
  - Added getUndeclaredException(StreamPayloadMetadata) overload
  - Changed undefined stream exceptions from TTransportException to TApplicationException for consistency with non-streaming behavior
  - Refactored using Supplier pattern to eliminate code duplication
  Test updates (UnifiedServerTransportTest):
  - Updated expectations to verify TApplicationException (not TTransportException)
  - Verify exception message content instead of incorrect cause chain
  This ensures Java server compatibility with C++ clients via Service Router and adherence to the Thrift streaming protocol specification.
  Reviewed By: robertroeser
  Differential Revision: D88109675
  fbshipit-source-id: 456e35879d7e26d0fcfabb40b9dd13befcc08752
  github.com-facebook-fbthrift · 2ba3ca18 · 2026-04-25
- 0.8 ETV · Add Java 25 FFI-based ZSTD compressor via multi-release JAR
  Summary: Replaces the zstd-jni (JNI) path on Java 25+ with direct Foreign Function & Memory API calls to native libzstd. Zero-copy for direct Netty ByteBufs via MemorySegment.ofAddress(memoryAddress()). Thread-local ZSTD_CCtx and ZSTD_DCtx cached on Netty event loop threads via FastThreadLocal, matching the zstd recommendation to allocate contexts once and reuse per-thread. Range checks: compressBound overflow, createCCtx/createDCtx NULL checks to prevent JVM segfaults from native memory allocation failures.
  Reviewed By: skrueger, cnli87, adolfojunior
  Differential Revision: D99339656
  fbshipit-source-id: 1afe1486ae951c18836dea7901ba604703075249
  github.com-facebook-fbthrift · 1562e5ab · 2026-04-15
- 0.6 ETV · Add compression framework with ThriftCompressor interface and CompressionManager
  Summary: Introduces the compression framework package (`com.facebook.thrift.compression`) that mirrors C++ CompressionManager + CompressionAlgorithmSelector:
  - `ThriftCompressor` interface with ownership-transferring compress/decompress
  - `CompressionManager` registry mapping TTransform wire IDs and CompressionAlgorithm values to compressor instances, using the generated `TTransform` enum
  - Stub compressors for ZLIB, ZSTD, and LZ4 (implementations follow in subsequent diffs)
  Reviewed By: robertroeser
  Differential Revision: D99339654
  fbshipit-source-id: 2e7edeaca62103b6bbf6b4fadeab420320e4e123
  github.com-facebook-fbthrift · 2ed00906 · 2026-04-15
- 0.5 ETV · Add io_uring transport support to Java runtime
  Summary: This change adds support for io_uring, a high-performance asynchronous I/O interface in the Linux kernel, to the Thrift Java runtime. io_uring provides better performance than epoll for I/O operations and is now prioritized as the default transport on Linux systems where available.
  The implementation introduces a TransportType enum (IO_URING, EPOLL, KQUEUE, NIO) and refactors the channel selection logic to be based on transport capabilities rather than EventLoopGroup types. The transport type is determined at runtime with the priority: io_uring > epoll > kqueue > nio.
  Key changes:
  - Added netty-incubator-transport-io_uring dependencies for Linux (x86_64 and aarch64)
  - Refactored NettyUtil.createEventLoopGroup() to use the new MultiThreadIoEventLoopGroup with IoHandlerFactory pattern
  - Updated RpcClientUtils and RpcServerUtils to select channel classes based on TransportType rather than EventLoopGroup instances
  - Added support for NioDomainSocketChannel (Java 16+) as a fallback for domain sockets with NIO
  - Updated LegacyRpcClientFactory and LegacyServerTransport to work with the new channel selection API
  Reviewed By: RayanRal, cnli87, adolfojunior
  Differential Revision: D84779000
  fbshipit-source-id: 83b3539ce3ba9efc26776495788270dc77d2ce45
  github.com-facebook-fbthrift · 913c306d · 2026-03-03
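The runtime priority io_uring > epoll > kqueue > nio described in this commit could be expressed roughly as follows. This is a sketch under assumptions: the enum constants come from the commit message, but `TransportSelector`, `select`, and the idea of passing in a set of available transports are hypothetical, and the real code probes Netty's native transports instead.

```java
import java.util.Set;

// Constants are declared in priority order, highest first.
enum TransportType { IO_URING, EPOLL, KQUEUE, NIO }

// Hypothetical helper: picks the highest-priority transport that is available.
final class TransportSelector {
    static TransportType select(Set<TransportType> available) {
        for (TransportType candidate : TransportType.values()) {
            if (available.contains(candidate)) {
                return candidate;
            }
        }
        // Assumption: NIO ships with the JDK, so it is the universal fallback.
        return TransportType.NIO;
    }
}
```

For example, `TransportSelector.select(EnumSet.of(TransportType.EPOLL, TransportType.NIO))` yields `EPOLL`, since io_uring is absent from the set.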
- 0.5 ETV · Implement ZSTD compressor with direct ByteBuffer APIs
  Summary: Implements ZstdCompressor using zstd-jni with direct ByteBuffer APIs:
  - `Zstd.compress(ByteBuffer, ByteBuffer, int)` and `Zstd.decompress(ByteBuffer, ByteBuffer)` via `data.nioBuffer()` for zero-copy when the underlying ByteBuf is direct
  - `Zstd.getDirectByteBufferFrameContentSize()` replaces deprecated `decompressedSize()`
  - Heap-backed buffers are transparently copied to a temporary direct ByteBuffer since zstd-jni's ByteBuffer API requires direct buffers
  - Adds `zstd-jni` dependency to runtime and runtime-test BUCK targets
  Reviewed By: robertroeser
  Differential Revision: D99339650
  fbshipit-source-id: 0ae36951ca570e4db8b0f53445681fc38d1ac6b0
  github.com-facebook-fbthrift · c2c56538 · 2026-04-15
- 0.5 ETV · Fix FusableReconnectingRpcClientMono returning disposed clients
  Summary: FusableReconnectingRpcClientMono.fastPath() and drain() did not check isDisposed() before returning cached RpcClients to subscribers. This caused a race window where disposed/dead clients were served to callers.
  In production (CLF sidecar), when an RSocket connection dies, the underlying RSocket is disposed but onClose() may not complete immediately because TcpDuplexConnection.onClose() uses Mono.whenDelayError() waiting for both UnboundedProcessor finalization AND connection.onTerminate(). During this window (which can last up to the keep-alive timeout, 1 hour by default), every subscription to the reconnecting mono returns the dead client via the fast path, causing all RPC calls to fail or hang.
  The fix adds isDisposed() checks in two places:
  1. subscribe() fast path (line 94): if the cached client is disposed, fall through to the slow path which queues the subscriber for reconnection.
  2. drain() loop: if the current client is disposed while in CONNECTED state, transition to DISCONNECTED to trigger immediate reconnection rather than emitting the stale client to waiting subscribers.
  Reviewed By: cnli87
  Differential Revision: D100048708
  fbshipit-source-id: 88b1ef024d2c10feda6907655034359c71ba96a3
  github.com-facebook-fbthrift · bd37b3f9 · 2026-04-14
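The core of the fast-path fix is "never hand out a cached client without checking `isDisposed()` first". The sketch below shows that check in a deliberately simplified, synchronous form. It is not the reactive, lock-free `FusableReconnectingRpcClientMono`: `DisposableClient`, `FakeClient`, and `ReconnectingHolder` are hypothetical names, and a `synchronized` method stands in for the real state machine.

```java
import java.util.function.Supplier;

// Hypothetical minimal client contract.
interface DisposableClient {
    boolean isDisposed();
}

// Hypothetical client used only to exercise the holder.
final class FakeClient implements DisposableClient {
    private volatile boolean disposed;

    void dispose() { disposed = true; }

    @Override
    public boolean isDisposed() { return disposed; }
}

// Caches one client and reconnects when the cached one is dead.
final class ReconnectingHolder<C extends DisposableClient> {
    private final Supplier<C> connector;
    private C cached;

    ReconnectingHolder(Supplier<C> connector) {
        this.connector = connector;
    }

    synchronized C get() {
        C current = cached;
        // Fast path: reuse the cached client only if it is still alive.
        if (current != null && !current.isDisposed()) {
            return current;
        }
        // Slow path: the cache is empty or holds a disposed client, so reconnect.
        cached = connector.get();
        return cached;
    }
}
```

Without the `isDisposed()` test in `get()`, a caller would keep receiving the dead client until something else cleared the cache, which mirrors the race window the commit message describes.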
- 0.4 ETV · Back out "Add io_uring transport support to Java runtime"
  Summary:
  Original commit changeset: 83b3539ce3ba
  Original Phabricator Diff: D84779000
  Differential Revision: D97182221
  fbshipit-source-id: 25b3a861974c77a5f4d9ecfd5110b07baa6de556
  github.com-facebook-fbthrift · b30a151f · 2026-03-19
- 0.4 ETV · Implement ZLIB compressor with thread-local caching and Java 11 multi-release
  Summary: Implements ZlibCompressor using JDK Deflater/Inflater:
  - Base (Java 8): byte[] I/O with `ByteBufUtil.getBytes()` / chunked deflate
  - Java 11 multi-release: zero-copy ByteBuffer I/O via `data.nioBuffer()`, `deflater.setInput(ByteBuffer)` + `deflater.deflate(ByteBuffer)` writing directly into the output ByteBuf's backing memory
  - Thread-local Deflater/Inflater on Netty event loop threads (`FastThreadLocalThread`) to avoid native resource allocation per request; `reset()` between uses, `end()` on non-event-loop threads
  Reviewed By: robertroeser
  Differential Revision: D99339651
  fbshipit-source-id: c3e3d68039cefa08bb798461ce66b933237d691a
  github.com-facebook-fbthrift · db8004f2 · 2026-04-15
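The base (Java 8) path of this commit, chunked `byte[]` deflate with a reused `Deflater`/`Inflater` per thread, can be sketched with the JDK alone. This is an illustration rather than the actual `ZlibCompressor`: `ZlibSketch` is a hypothetical name, a plain `ThreadLocal` replaces Netty's `FastThreadLocal`, and the 4096-byte chunk size is an arbitrary choice.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

final class ZlibSketch {
    // One Deflater/Inflater per thread, so native zlib state is allocated once.
    private static final ThreadLocal<Deflater> DEFLATER = ThreadLocal.withInitial(Deflater::new);
    private static final ThreadLocal<Inflater> INFLATER = ThreadLocal.withInitial(Inflater::new);

    static byte[] compress(byte[] input) {
        Deflater deflater = DEFLATER.get();
        deflater.reset(); // clear state left over from the previous use
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        while (!deflater.finished()) {
            int n = deflater.deflate(chunk);
            out.write(chunk, 0, n);
        }
        return out.toByteArray();
    }

    static byte[] decompress(byte[] input) {
        Inflater inflater = INFLATER.get();
        inflater.reset();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] chunk = new byte[4096];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(chunk);
                // No progress and not finished: the input is truncated or needs a dictionary.
                if (n == 0 && !inflater.finished()
                        && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("incomplete zlib stream");
                }
                out.write(chunk, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("corrupt zlib stream", e);
        }
        return out.toByteArray();
    }
}
```

Calling `reset()` before each use is what makes the thread-local instances safe to share across requests on the same thread.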
- 0.3 ETV · Implement LZ4 compressor with varint size prefix matching C++ LZ4_VARINT_SIZE
  Summary: Wire format: [varint: uncompressed_size] [LZ4 block compressed data]
  Uses at.yawk.lz4:lz4-java (net.jpountz.lz4) with LZ4Factory.fastestInstance(). The varint encoding matches folly::encodeVarint (standard protobuf base-128 variable-length encoding), ensuring wire compatibility with C++ Thrift. Uses ByteBuffer APIs via nioBuffer() for zero-copy when the underlying ByteBuf is direct.
  Reviewed By: robertroeser
  Differential Revision: D99339657
  fbshipit-source-id: 982e1fc419a87f87677eaa7497808986a322fde4
  github.com-facebook-fbthrift · 7824c4bb · 2026-04-15
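The size prefix in this wire format is a base-128 varint: seven payload bits per byte, least-significant group first, with the high bit set on every byte except the last. A minimal sketch of that encoding follows; `Varint`, `encode`, and `decode` are hypothetical names and this is not the code from the commit.

```java
import java.io.ByteArrayOutputStream;

final class Varint {
    // Encodes an unsigned value as a base-128 varint (low 7-bit group first).
    static byte[] encode(long value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(10);
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // continuation bit set
            value >>>= 7;
        }
        out.write((int) value); // final byte, continuation bit clear
        return out.toByteArray();
    }

    // Decodes the varint at the start of buf; trailing bytes are ignored.
    static long decode(byte[] buf) {
        long result = 0;
        int shift = 0;
        for (byte b : buf) {
            if (shift >= 64) {
                throw new IllegalArgumentException("varint longer than 10 bytes");
            }
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
        throw new IllegalArgumentException("truncated varint");
    }
}
```

For instance, an uncompressed size of 300 encodes to the two bytes `0xAC 0x02`, which would then be followed by the LZ4 block.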
- 0.3 ETV · Add pooled managers for direct Service Router tiers
  Summary: Direct Service Router clients bypass the local SR socket and track backend hosts in Java. In the v2 runtime that means we need a manager-layer equivalent of the old pooled mono so borrowed typed clients can share host refresh state and child connections instead of rebuilding them per client.
  This diff adds PooledRpcClientManager and PooledRpcClientManagerFactory. They keep one pool per TierSocketAddress, periodically refresh the tier host list, reuse child managers for unchanged addresses, and dispose managers for removed hosts.
  ServiceRouterV2ClientFactoryImpl and ServiceRouterV2BindingsClientFactoryImpl now use borrowed pooled managers in V2 mode for createDirectClient requests, while leaving the legacy path unchanged.
  The CLF integration suite now exercises explicit V2 runtime selection and borrowed close semantics, and the runtime test covers pooled-manager factory reuse and disposal.
  Reviewed By: adolfojunior
  Differential Revision: D101737022
  fbshipit-source-id: c99522b4d551a044a93b030db10af8edb106d76e
  github.com-facebook-fbthrift · 74517a20 · 2026-04-24
- 0.3 ETV · Align declared exception messages with C++ Rocket semantics
  Summary: The C++ Rocket reference always populates name_utf8 and what_utf8 for exception metadata, and generated C++ exceptions always provide a non-null what() fallback. Java was still missing part of that contract: declared exception metadata now sets name_utf8/what_utf8 for both stream-level and function-level responses, and Java exception codegen now guarantees a meaningful getMessage() for exceptions without a literal message field by falling back to the generated class name.
  This keeps Java aligned with the documented server transport exception contract and with the C++ reference behavior for the three exception-message shapes:
  - literal message field: use that field
  - thrift.ExceptionMessage field: use the annotated field
  - no message field: fall back to the exception class name
  The rsocket server tests now cover all three cases and verify that declared exception metadata carries the expected what_utf8 values on the wire.
  Reviewed By: robertroeser
  Differential Revision: D95857603
  fbshipit-source-id: d52672376c976e4c845c9fd85e5d0641d02c27c3
  github.com-facebook-fbthrift · ced7ee34 · 2026-04-25
- 0.3 ETV · Add Java 25 FFI-based LZ4 compressor via multi-release JAR
  Summary: Replaces lz4-java JNI path on Java 25+ with direct Foreign Function & Memory API calls to native liblz4. Uses LZ4_decompress_safe() (the safe API) and LZ4_compress_fast_extState() with a thread-local state buffer on Netty event loop threads. Decompression is stateless (no context needed). Wire format unchanged: [varint: uncompressed_size] [LZ4 block].
  Reviewed By: robertroeser
  Differential Revision: D99339648
  fbshipit-source-id: b9814f8b0489531e38c745f8be9b3784f3f1a19f
  github.com-facebook-fbthrift · c9e00a94 · 2026-04-15
- 0.2 ETV · Set RequestContext thread-local before ThriftEventHandler callbacks
  Summary: The generated RpcServerHandler code previously set the RequestContext thread-local via `.doFirst()` on the delegate Mono, which only executed after `preRead`/`postRead` event handler callbacks had already run. When `isForceExecutionOffEventLoop()` is true, the entire `Mono.defer()` runs on an off-loop scheduler thread where the thread-local was not set, causing `RequestContexts.getCurrentContext()` to return null during event handler callbacks.
  This moves `RequestContexts.setCurrentContext()` to the top of the `Mono.defer()` lambda in `singleRequestSingleResponse`, `singleRequestStreamingResponse`, and `singleRequestNoResponse`, so the thread-local is available for the entire request lifecycle including `getContext`, `preRead`, `postRead`, and `preWrite` callbacks.
  Also adds `doFinally(() -> clearCurrentContext())` to prevent thread-local leaks on thread pool threads, and removes the now-redundant `.doFirst()` from the `_do<method>()` delegate calls.
  Reviewed By: prakashgayasen, robertroeser
  Differential Revision: D96822801
  fbshipit-source-id: 80c64b49ad1466e279cb4073e431d8bf18f650f0
  github.com-facebook-fbthrift · 1c7192e3 · 2026-03-20
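The ordering this commit establishes (set the thread-local first, run every callback, always clear at the end) has a simple synchronous analogue. The sketch below is an assumption-laden illustration, not the generated handler code: `ContextScope`, `runWith`, and the `String` context are hypothetical, and `try`/`finally` plays the role that `Mono.defer()` plus `doFinally` play in the reactive pipeline.

```java
import java.util.function.Supplier;

final class ContextScope {
    // Hypothetical stand-in for the RequestContext thread-local.
    private static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    static String current() {
        return CURRENT.get();
    }

    // Sets the context before any callback runs and clears it afterwards,
    // even if the handler throws, so pooled threads never see a stale value.
    static <T> T runWith(String context, Supplier<T> handler) {
        CURRENT.set(context);
        try {
            return handler.get();
        } finally {
            CURRENT.remove();
        }
    }
}
```

Inside `runWith("req-1", ...)` any callback on the same thread sees `"req-1"` from `ContextScope.current()`, and after it returns the value is `null` again.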
- 0.2 ETV · Wire CompressionManager into Rocket transport
  Summary: PayloadUtil.createPayload() now compresses outbound data when a CompressionAlgorithm is specified (was a no-op). PayloadUtil.getData() now decompresses inbound data.
  ThriftServerRSocket decompresses incoming request payloads via maybeDecompressRequestData() which always returns an owned ByteBuf (retained slice when uncompressed, new buffer when decompressed). The buffer is released in doFinally alongside the original payload.
  ThriftSocketAcceptor sends a SetupResponse with zstdSupported=true during connection setup, matching C++ ThriftRocketServerHandler. This enables C++ clients to auto-upgrade ZLIB -> ZSTD compression.
  Reviewed By: robertroeser
  Differential Revision: D99339653
  fbshipit-source-id: ee4de95a9fa5c33003f1e2dbfcb9dd464dce877c
  github.com-facebook-fbthrift · 8254ed7e · 2026-04-15