diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelExchangeHandler.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelExchangeHandler.java new file mode 100644 index 0000000000..ddb21238e3 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelExchangeHandler.java @@ -0,0 +1,284 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ConnectionClosedException; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.HttpRequestInterceptor; +import org.apache.hc.core5.http.impl.BasicEntityDetails; +import org.apache.hc.core5.http.message.BasicHttpRequest; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; + +/** + * Exchange handler that establishes CONNECT and then exposes the tunnel stream as a ProtocolIOSession. + */ +final class H2OverH2TunnelExchangeHandler implements AsyncClientExchangeHandler { + + private final IOSession physicalSession; + private final NamedEndpoint targetEndpoint; + private final Timeout connectTimeout; + private final boolean secure; + private final TlsStrategy tlsStrategy; + private final HttpRequestInterceptor connectRequestInterceptor; + private final IOEventHandlerFactory protocolStarter; + private final FutureCallback callback; + + private final AtomicBoolean done; + + private volatile DataStreamChannel dataChannel; + private volatile CapacityChannel capacityChannel; + private volatile StreamControl streamControl; + private volatile H2TunnelProtocolIOSession tunnelSession; + + H2OverH2TunnelExchangeHandler( + final IOSession physicalSession, + final NamedEndpoint targetEndpoint, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + this.physicalSession = physicalSession; + this.targetEndpoint = targetEndpoint; + this.connectTimeout = connectTimeout; + this.secure = secure; + this.tlsStrategy = tlsStrategy; + this.connectRequestInterceptor = connectRequestInterceptor; + this.protocolStarter = protocolStarter; + this.callback = callback; + this.done = new AtomicBoolean(false); + } + + void initiated(final StreamControl streamControl) { + this.streamControl = streamControl; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.bindStreamControl(streamControl); + } + } + + @Override + public void releaseResources() { + } + + @Override + public void failed(final Exception cause) { + fail(cause); + } + + @Override + public void cancel() { + fail(new ConnectionClosedException("Tunnel setup cancelled")); + } + + @Override + public void produceRequest(final RequestChannel requestChannel, final HttpContext context) throws HttpException, IOException { + final HttpRequest connectRequest = new BasicHttpRequest(Method.CONNECT.name(), (String) null); + connectRequest.setAuthority(new URIAuthority(targetEndpoint)); + if (connectRequestInterceptor != null) { + connectRequestInterceptor.process(connectRequest, null, context); + } + requestChannel.sendRequest(connectRequest, new BasicEntityDetails(-1, null), context); + } + + @Override + public int available() { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + return tunnel != null ? tunnel.available() : 0; + } + + @Override + public void produce(final DataStreamChannel channel) throws IOException { + this.dataChannel = channel; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.attachChannel(channel); + tunnel.onOutputReady(); + } + } + + @Override + public void consumeInformation(final HttpResponse response, final HttpContext context) { + } + + @Override + public void consumeResponse( + final HttpResponse response, + final EntityDetails entityDetails, + final HttpContext context) throws HttpException, IOException { + + final int status = response.getCode(); + if (status < 200 || status >= 300) { + throw new TunnelRefusedException(response); + } + + if (entityDetails == null) { + throw new HttpException("CONNECT response does not provide a tunneled data stream"); + } + + if (this.tunnelSession != null) { + return; + } + + final H2TunnelProtocolIOSession tunnel = + new H2TunnelProtocolIOSession(physicalSession, targetEndpoint, connectTimeout, streamControl); + + final DataStreamChannel currentChannel = this.dataChannel; + if (currentChannel != null) { + tunnel.attachChannel(currentChannel); + } + final CapacityChannel currentCapacity = this.capacityChannel; + if (currentCapacity != null) { + tunnel.updateCapacityChannel(currentCapacity); + } + this.tunnelSession = tunnel; + + if (secure) { + tlsStrategy.upgrade( + tunnel, + targetEndpoint, + null, + connectTimeout, + new FutureCallback() { + + @Override + public void completed(final TransportSecurityLayer transportSecurityLayer) { + try { + startProtocol(tunnel); + complete(tunnel); + } catch (final Exception ex) { + fail(ex); + } + } + + @Override + public void failed(final Exception ex) { + fail(ex); + } + + @Override + public void cancelled() { + fail(new ConnectionClosedException("Tunnel TLS upgrade cancelled")); + } + }); + } else { + startProtocol(tunnel); + complete(tunnel); + } + } + + private void startProtocol(final H2TunnelProtocolIOSession tunnel) throws IOException { + if (protocolStarter == null) { + return; + } + final IOEventHandler protocolHandler = protocolStarter.createHandler(tunnel, null); + tunnel.upgrade(protocolHandler); + protocolHandler.connected(tunnel); + } + + @Override + public void updateCapacity(final CapacityChannel capacityChannel) throws IOException { + this.capacityChannel = capacityChannel; + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.updateCapacityChannel(capacityChannel); + } + } + + @Override + public void consume(final ByteBuffer src) throws IOException { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null && src != null && src.hasRemaining()) { + tunnel.onInput(src); + } + } + + @Override + public void streamEnd(final List trailers) { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.onRemoteStreamEnd(); + } else { + closeTransport(CloseMode.GRACEFUL); + } + if (done.compareAndSet(false, true) && callback != null) { + callback.failed(new ConnectionClosedException("Tunnel stream closed")); + } + } + + private void closeTransport(final CloseMode closeMode) { + final H2TunnelProtocolIOSession tunnel = this.tunnelSession; + if (tunnel != null) { + tunnel.close(closeMode); + return; + } + final StreamControl currentStreamControl = this.streamControl; + if (currentStreamControl != null) { + currentStreamControl.cancel(); + } + } + + private void fail(final Exception cause) { + closeTransport(CloseMode.IMMEDIATE); + if (done.compareAndSet(false, true) && callback != null) { + callback.failed(cause); + } + } + + private void complete(final H2TunnelProtocolIOSession tunnel) { + if (done.compareAndSet(false, true) && callback != null) { + callback.completed(tunnel); + } + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelSupport.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelSupport.java new file mode 100644 index 0000000000..1bc1aee7df --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2OverH2TunnelSupport.java @@ -0,0 +1,167 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpRequestInterceptor; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Timeout; + +/** + * Helper for establishing HTTP/2 tunnels through HTTP/2 proxies using the CONNECT method. + * + *

+ * Multiplexing-safe: tunnel close/reset affects only the CONNECT stream (via StreamControl), + * never the underlying physical HTTP/2 connection. + *

+ * + *

+ * Note: This helper does not implement proxy authentication challenge handling (407 retries). + * That belongs in client implementations that maintain authentication state. + *

+ * + * @since 5.5 + */ +public final class H2OverH2TunnelSupport { + + private H2OverH2TunnelSupport() { + } + + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final FutureCallback callback) { + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, null, null, callback); + } + + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final FutureCallback callback) { + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, connectRequestInterceptor, null, callback); + } + + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + establish(proxySession, target, connectTimeout, secure, tlsStrategy, null, protocolStarter, callback); + } + + public static void establish( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + + final FutureCallback adapter = callback != null ? new FutureCallback() { + + @Override + public void completed(final ProtocolIOSession result) { + callback.completed(result); + } + + @Override + public void failed(final Exception ex) { + callback.failed(ex); + } + + @Override + public void cancelled() { + callback.cancelled(); + } + + } : null; + + establishInternal(proxySession, target, connectTimeout, secure, tlsStrategy, connectRequestInterceptor, protocolStarter, adapter); + } + + private static void establishInternal( + final IOSession proxySession, + final NamedEndpoint target, + final Timeout connectTimeout, + final boolean secure, + final TlsStrategy tlsStrategy, + final HttpRequestInterceptor connectRequestInterceptor, + final IOEventHandlerFactory protocolStarter, + final FutureCallback callback) { + + Args.notNull(proxySession, "Proxy I/O session"); + Args.notNull(target, "Tunnel target endpoint"); + if (secure) { + Args.notNull(tlsStrategy, "TLS strategy"); + } + + final H2OverH2TunnelExchangeHandler exchangeHandler = new H2OverH2TunnelExchangeHandler( + proxySession, + target, + connectTimeout, + secure, + tlsStrategy, + connectRequestInterceptor, + protocolStarter, + callback); + + proxySession.enqueue( + new RequestExecutionCommand( + exchangeHandler, + null, + HttpCoreContext.create(), + exchangeHandler::initiated), + Command.Priority.NORMAL); + } + + static void closeQuietly(final ProtocolIOSession session) { + if (session != null) { + session.close(CloseMode.IMMEDIATE); + } + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelProtocolIOSession.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelProtocolIOSession.java new file mode 100644 index 0000000000..19b99237ba --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelProtocolIOSession.java @@ -0,0 +1,380 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; + +import org.apache.hc.core5.concurrent.CallbackContribution; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.SSLBufferMode; +import org.apache.hc.core5.reactor.ssl.SSLIOSession; +import org.apache.hc.core5.reactor.ssl.SSLMode; +import org.apache.hc.core5.reactor.ssl.SSLSessionInitializer; +import org.apache.hc.core5.reactor.ssl.SSLSessionVerifier; +import org.apache.hc.core5.reactor.ssl.TlsDetails; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; + +/** + * ProtocolIOSession backed by a single HTTP/2 CONNECT stream. + */ +final class H2TunnelProtocolIOSession implements ProtocolIOSession { + + private static final IOEventHandler NOOP_HANDLER = new IOEventHandler() { + + @Override + public void connected(final IOSession session) { + } + + @Override + public void inputReady(final IOSession session, final ByteBuffer src) { + } + + @Override + public void outputReady(final IOSession session) { + } + + @Override + public void timeout(final IOSession session, final Timeout timeout) { + } + + @Override + public void exception(final IOSession session, final Exception cause) { + } + + @Override + public void disconnected(final IOSession session) { + } + }; + + private final NamedEndpoint initialEndpoint; + private final H2TunnelRawIOSession ioSession; + private final AtomicReference tlsSessionRef; + private final AtomicReference currentSessionRef; + + H2TunnelProtocolIOSession( + final IOSession physicalSession, + final NamedEndpoint initialEndpoint, + final Timeout socketTimeout, + final StreamControl streamControl) { + this.initialEndpoint = initialEndpoint; + this.ioSession = new H2TunnelRawIOSession(physicalSession, socketTimeout, streamControl); + this.tlsSessionRef = new AtomicReference<>(); + this.currentSessionRef = new AtomicReference<>(ioSession); + this.ioSession.upgrade(NOOP_HANDLER); + } + + void bindStreamControl(final StreamControl streamControl) { + ioSession.bindStreamControl(streamControl); + } + + void attachChannel(final DataStreamChannel channel) { + ioSession.attachChannel(channel); + } + + void updateCapacityChannel(final CapacityChannel capacityChannel) throws IOException { + ioSession.updateCapacityChannel(capacityChannel); + } + + int available() { + return ioSession.available(); + } + + void onInput(final ByteBuffer src) throws IOException { + final ByteBuffer handlerSrc = src != null ? src.asReadOnlyBuffer() : null; + + ioSession.appendInput(src); + + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.inputReady(currentSession, handlerSrc); + if (handlerSrc != null) { + final int consumed = handlerSrc.position(); + if (consumed > 0) { + ioSession.discardInbound(consumed); + } + } + } + + if (ioSession.available() > 0) { + ioSession.requestOutput(); + } + } + + void onOutputReady() throws IOException { + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.outputReady(currentSession); + } + ioSession.flushOutput(); + if (ioSession.available() > 0) { + ioSession.requestOutput(); + } + } + + void onRemoteStreamEnd() { + ioSession.onRemoteStreamEnd(); + final IOSession currentSession = currentSessionRef.get(); + final IOEventHandler handler = currentSession.getHandler(); + if (handler != null) { + handler.disconnected(currentSession); + } + } + + @Override + public NamedEndpoint getInitialEndpoint() { + return initialEndpoint; + } + + @Override + public IOEventHandler getHandler() { + return currentSessionRef.get().getHandler(); + } + + @Override + public void upgrade(final IOEventHandler handler) { + currentSessionRef.get().upgrade(handler); + } + + @Override + public Lock getLock() { + return ioSession.getLock(); + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + currentSessionRef.get().enqueue(command, priority); + } + + @Override + public boolean hasCommands() { + return currentSessionRef.get().hasCommands(); + } + + @Override + public Command poll() { + return currentSessionRef.get().poll(); + } + + @Override + public ByteChannel channel() { + return currentSessionRef.get().channel(); + } + + @Override + public SocketAddress getRemoteAddress() { + return ioSession.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() { + return ioSession.getLocalAddress(); + } + + @Override + public int getEventMask() { + return currentSessionRef.get().getEventMask(); + } + + @Override + public void setEventMask(final int ops) { + currentSessionRef.get().setEventMask(ops); + } + + @Override + public void setEvent(final int op) { + currentSessionRef.get().setEvent(op); + } + + @Override + public void clearEvent(final int op) { + currentSessionRef.get().clearEvent(op); + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + @Override + public void close(final CloseMode closeMode) { + if (closeMode == CloseMode.IMMEDIATE) { + ioSession.close(closeMode); + } else { + currentSessionRef.get().close(closeMode); + } + } + + @Override + public Status getStatus() { + return currentSessionRef.get().getStatus(); + } + + @Override + public Timeout getSocketTimeout() { + return ioSession.getSocketTimeout(); + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + ioSession.setSocketTimeout(timeout); + } + + @Override + public long getLastReadTime() { + return ioSession.getLastReadTime(); + } + + @Override + public long getLastWriteTime() { + return ioSession.getLastWriteTime(); + } + + @Override + public long getLastEventTime() { + return ioSession.getLastEventTime(); + } + + @Override + public void updateReadTime() { + ioSession.updateReadTime(); + } + + @Override + public void updateWriteTime() { + ioSession.updateWriteTime(); + } + + @Override + public int read(final ByteBuffer dst) throws IOException { + return currentSessionRef.get().read(dst); + } + + @Override + public int write(final ByteBuffer src) throws IOException { + return currentSessionRef.get().write(src); + } + + @Override + public boolean isOpen() { + return currentSessionRef.get().isOpen(); + } + + @Override + public String getId() { + return ioSession.getId(); + } + + @Override + public void startTls( + final SSLContext sslContext, + final NamedEndpoint endpoint, + final SSLBufferMode sslBufferMode, + final SSLSessionInitializer initializer, + final SSLSessionVerifier verifier, + final Timeout handshakeTimeout) { + startTls(sslContext, endpoint, sslBufferMode, initializer, verifier, handshakeTimeout, null); + } + + @Override + public void startTls( + final SSLContext sslContext, + final NamedEndpoint endpoint, + final SSLBufferMode sslBufferMode, + final SSLSessionInitializer initializer, + final SSLSessionVerifier verifier, + final Timeout handshakeTimeout, + final FutureCallback callback) { + + final SSLIOSession sslioSession = new SSLIOSession( + endpoint != null ? endpoint : initialEndpoint, + ioSession, + SSLMode.CLIENT, + sslContext, + sslBufferMode, + initializer, + verifier, + handshakeTimeout, + null, + null, + new CallbackContribution(callback) { + + @Override + public void completed(final SSLSession sslSession) { + if (callback != null) { + callback.completed(H2TunnelProtocolIOSession.this); + } + } + + }); + + if (tlsSessionRef.compareAndSet(null, sslioSession)) { + currentSessionRef.set(sslioSession); + } else { + throw new IllegalStateException("TLS already activated"); + } + + try { + sslioSession.beginHandshake(this); + } catch (final Exception ex) { + if (callback != null) { + callback.failed(ex); + } + close(CloseMode.IMMEDIATE); + } + } + + @Override + public TlsDetails getTlsDetails() { + final SSLIOSession sslIoSession = tlsSessionRef.get(); + return sslIoSession != null ? sslIoSession.getTlsDetails() : null; + } + + @Override + public int getPendingCommandCount() { + return currentSessionRef.get().getPendingCommandCount(); + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java new file mode 100644 index 0000000000..3d8d0b3d77 --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/H2TunnelRawIOSession.java @@ -0,0 +1,692 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import java.io.IOException; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.nio.channels.SelectionKey; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; + +/** + * Raw tunnel IOSession implementation: bounded buffering + capacity updates + stream-scoped close/cancel. + *

+ */ +final class H2TunnelRawIOSession implements IOSession { + + private static final int INBOUND_BUFFER_LIMIT = 64 * 1024; + private static final int OUTBOUND_BUFFER_LIMIT = 64 * 1024; + + private final IOSession physicalSession; + private final String id; + private final Lock lock; + + private final Deque commandQueue; + private final Deque inboundQueue; + private final Deque outboundQueue; + + private final AtomicReference handlerRef; + private final AtomicReference dataChannelRef; + private final AtomicReference streamControlRef; + + private CapacityChannel capacityChannel; + private Timeout socketTimeout; + private int eventMask; + private Status status; + + private int inboundBytes; + private int outboundBytes; + private int consumedBytesSinceUpdate; + + private boolean capacityInitialized; + private boolean localEndStreamSent; + private boolean remoteEndStream; + + private long lastReadTime; + private long lastWriteTime; + private long lastEventTime; + + H2TunnelRawIOSession( + final IOSession physicalSession, + final Timeout socketTimeout, + final StreamControl streamControl) { + this.physicalSession = physicalSession; + this.id = physicalSession.getId() + "-h2-tunnel"; + this.lock = new ReentrantLock(); + this.commandQueue = new ArrayDeque<>(); + this.inboundQueue = new ArrayDeque<>(); + this.outboundQueue = new ArrayDeque<>(); + this.handlerRef = new AtomicReference<>(); + this.dataChannelRef = new AtomicReference<>(); + this.streamControlRef = new AtomicReference<>(streamControl); + + this.capacityChannel = null; + this.socketTimeout = socketTimeout; + this.eventMask = SelectionKey.OP_READ; + this.status = Status.ACTIVE; + + this.capacityInitialized = false; + this.localEndStreamSent = false; + this.remoteEndStream = false; + + final long now = System.currentTimeMillis(); + this.lastReadTime = now; + this.lastWriteTime = now; + this.lastEventTime = now; + } + + void bindStreamControl(final StreamControl streamControl) { + streamControlRef.compareAndSet(null, streamControl); + } + + void attachChannel(final DataStreamChannel channel) { + dataChannelRef.set(channel); + } + + void updateCapacityChannel(final CapacityChannel capacityChannel) throws IOException { + int update = 0; + lock.lock(); + try { + this.capacityChannel = capacityChannel; + if (!capacityInitialized) { + update += Math.max(0, INBOUND_BUFFER_LIMIT - inboundBytes); + capacityInitialized = true; + } + if (consumedBytesSinceUpdate > 0) { + update += consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } finally { + lock.unlock(); + } + if (update > 0) { + capacityChannel.update(update); + } + } + + void onRemoteStreamEnd() { + lock.lock(); + try { + remoteEndStream = true; + if (status == Status.ACTIVE) { + status = Status.CLOSING; + } + if (localEndStreamSent) { + status = Status.CLOSED; + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + void requestOutput() { + final DataStreamChannel dataChannel = dataChannelRef.get(); + if (dataChannel != null) { + dataChannel.requestOutput(); + } + } + + int available() { + lock.lock(); + try { + if (outboundBytes > 0) { + return outboundBytes; + } + if (!localEndStreamSent && status == Status.CLOSING) { + return 1; + } + if (!commandQueue.isEmpty() || (eventMask & SelectionKey.OP_WRITE) != 0) { + return 1; + } + return 0; + } finally { + lock.unlock(); + } + } + + void appendInput(final ByteBuffer src) throws IOException { + if (src == null || !src.hasRemaining()) { + return; + } + lock.lock(); + try { + if (status == Status.CLOSED) { + return; + } + final int remaining = src.remaining(); + final int freeSpace = INBOUND_BUFFER_LIMIT - inboundBytes; + if (remaining > freeSpace) { + throw new IOException("Tunnel inbound buffer overflow"); + } + final byte[] data = new byte[remaining]; + src.get(data); + inboundQueue.addLast(ByteBuffer.wrap(data)); + inboundBytes += data.length; + final long now = System.currentTimeMillis(); + lastReadTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + void discardInbound(final int bytes) throws IOException { + if (bytes <= 0) { + return; + } + int remaining = bytes; + int update = 0; + CapacityChannel currentCapacityChannel = null; + + lock.lock(); + try { + while (remaining > 0) { + final ByteBuffer buffer = inboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int chunk = Math.min(remaining, buffer.remaining()); + if (chunk <= 0) { + break; + } + buffer.position(buffer.position() + chunk); + remaining -= chunk; + inboundBytes -= chunk; + if (!buffer.hasRemaining()) { + inboundQueue.pollFirst(); + } + consumedBytesSinceUpdate += chunk; + } + if (capacityChannel != null && consumedBytesSinceUpdate > 0) { + currentCapacityChannel = capacityChannel; + update = consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } finally { + lock.unlock(); + } + + if (currentCapacityChannel != null && update > 0) { + currentCapacityChannel.update(update); + } + } + + void flushOutput() throws IOException { + final DataStreamChannel dataChannel = dataChannelRef.get(); + if (dataChannel == null) { + return; + } + + boolean sendEndStream = false; + + lock.lock(); + try { + for (; ; ) { + final ByteBuffer buffer = outboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int bytesWritten = dataChannel.write(buffer); + if (bytesWritten <= 0) { + break; + } + outboundBytes -= bytesWritten; + if (!buffer.hasRemaining()) { + outboundQueue.pollFirst(); + } + final long now = System.currentTimeMillis(); + lastWriteTime = now; + lastEventTime = now; + } + if (!localEndStreamSent && status == Status.CLOSING && outboundQueue.isEmpty()) { + localEndStreamSent = true; + sendEndStream = true; + } + } finally { + lock.unlock(); + } + + if (sendEndStream) { + try { + dataChannel.endStream(null); + } finally { + lock.lock(); + try { + if (remoteEndStream) { + status = Status.CLOSED; + } + } finally { + lock.unlock(); + } + } + } + } + + private void cancelPendingCommands() { + for (final Command command : commandQueue) { + command.cancel(); + } + commandQueue.clear(); + } + + private void cancelStream() { + final StreamControl streamControl = streamControlRef.get(); + if (streamControl != null) { + streamControl.cancel(); + } + } + + @Override + public IOEventHandler getHandler() { + return handlerRef.get(); + } + + @Override + public void upgrade(final IOEventHandler handler) { + handlerRef.set(handler); + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + if (command == null) { + return; + } + lock.lock(); + try { + if (status != Status.ACTIVE) { + command.cancel(); + return; + } + if (priority == Command.Priority.IMMEDIATE) { + commandQueue.addFirst(command); + } else { + commandQueue.addLast(command); + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + requestOutput(); + } + + @Override + public boolean hasCommands() { + lock.lock(); + try { + return !commandQueue.isEmpty(); + } finally { + lock.unlock(); + } + } + + @Override + public Command poll() { + lock.lock(); + try { + return commandQueue.pollFirst(); + } finally { + lock.unlock(); + } + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return physicalSession.getRemoteAddress(); + } + + @Override + public SocketAddress getLocalAddress() { + return physicalSession.getLocalAddress(); + } + + @Override + public int getEventMask() { + lock.lock(); + try { + return eventMask; + } finally { + lock.unlock(); + } + } + + @Override + public void setEventMask(final int ops) { + final boolean wantOutput = (ops & SelectionKey.OP_WRITE) != 0; + lock.lock(); + try { + eventMask = ops; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + if (wantOutput) { + requestOutput(); + } + } + + @Override + public void setEvent(final int op) { + final boolean wantOutput = (op & SelectionKey.OP_WRITE) != 0; + lock.lock(); + try { + eventMask |= op; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + if (wantOutput) { + requestOutput(); + } + } + + @Override + public void clearEvent(final int op) { + lock.lock(); + try { + eventMask &= ~op; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + @Override + public void close() { + close(CloseMode.GRACEFUL); + } + + @Override + public void close(final CloseMode closeMode) { + boolean cancel = false; + + lock.lock(); + try { + if (status == Status.CLOSED) { + return; + } + if (closeMode == CloseMode.IMMEDIATE) { + status = Status.CLOSED; + localEndStreamSent = true; + cancelPendingCommands(); + inboundQueue.clear(); + inboundBytes = 0; + outboundQueue.clear(); + outboundBytes = 0; + consumedBytesSinceUpdate = 0; + cancel = true; + } else { + status = Status.CLOSING; + if (dataChannelRef.get() == null && outboundBytes == 0) { + status = Status.CLOSED; + localEndStreamSent = true; + cancel = true; + } + } + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + + if (cancel) { + cancelStream(); + } else { + requestOutput(); + } + } + + @Override + public Status getStatus() { + lock.lock(); + try { + return status; + } finally { + lock.unlock(); + } + } + + @Override + public Timeout getSocketTimeout() { + lock.lock(); + try { + return socketTimeout; + } finally { + lock.unlock(); + } + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + lock.lock(); + try { + socketTimeout = timeout; + lastEventTime = System.currentTimeMillis(); + } finally { + lock.unlock(); + } + } + + @Override + public long getLastReadTime() { + lock.lock(); + try { + return lastReadTime; + } finally { + lock.unlock(); + } + } + + @Override + public long getLastWriteTime() { + lock.lock(); + try { + return lastWriteTime; + } finally { + lock.unlock(); + } + } + + @Override + public long getLastEventTime() { + lock.lock(); + try { + return lastEventTime; + } finally { + lock.unlock(); + } + } + + @Override + public void updateReadTime() { + final long now = System.currentTimeMillis(); + lock.lock(); + try { + lastReadTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + @Override + public void updateWriteTime() { + final long now = System.currentTimeMillis(); + lock.lock(); + try { + lastWriteTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + } + + @Override + public int read(final ByteBuffer dst) throws IOException { + int total = 0; + int update = 0; + CapacityChannel currentCapacityChannel = null; + + lock.lock(); + try { + if (inboundQueue.isEmpty()) { + return remoteEndStream || status == Status.CLOSED ? -1 : 0; + } + while (dst.hasRemaining()) { + final ByteBuffer buffer = inboundQueue.peekFirst(); + if (buffer == null) { + break; + } + final int chunk = Math.min(dst.remaining(), buffer.remaining()); + if (chunk <= 0) { + break; + } + + if (buffer.hasArray()) { + final int pos = buffer.position(); + dst.put(buffer.array(), buffer.arrayOffset() + pos, chunk); + buffer.position(pos + chunk); + } else { + for (int i = 0; i < chunk; i++) { + dst.put(buffer.get()); + } + } + + total += chunk; + inboundBytes -= chunk; + if (!buffer.hasRemaining()) { + inboundQueue.pollFirst(); + } + } + + if (total > 0) { + consumedBytesSinceUpdate += total; + final long now = System.currentTimeMillis(); + lastReadTime = now; + lastEventTime = now; + if (capacityChannel != null && consumedBytesSinceUpdate > 0) { + currentCapacityChannel = capacityChannel; + update = consumedBytesSinceUpdate; + consumedBytesSinceUpdate = 0; + } + } + } finally { + lock.unlock(); + } + + if (currentCapacityChannel != null && update > 0) { + currentCapacityChannel.update(update); + } + return total; + } + + @Override + public int write(final ByteBuffer src) { + if (src == null || !src.hasRemaining()) { + return 0; + } + int bytesAccepted = 0; + + lock.lock(); + try { + if (status != Status.ACTIVE) { + return 0; + } + final int freeSpace = OUTBOUND_BUFFER_LIMIT - outboundBytes; + if (freeSpace <= 0) { + return 0; + } + bytesAccepted = Math.min(src.remaining(), freeSpace); + if (bytesAccepted <= 0) { + return 0; + } + + final byte[] data = new byte[bytesAccepted]; + src.get(data); + + outboundQueue.addLast(ByteBuffer.wrap(data)); + outboundBytes += bytesAccepted; + + final long now = System.currentTimeMillis(); + lastWriteTime = now; + lastEventTime = now; + } finally { + lock.unlock(); + } + + requestOutput(); + return bytesAccepted; + } + + @Override + public boolean isOpen() { + lock.lock(); + try { + return status != Status.CLOSED && physicalSession.isOpen(); + } finally { + lock.unlock(); + } + } + + @Override + public String getId() { + return id; + } + + @Override + public int getPendingCommandCount() { + lock.lock(); + try { + return commandQueue.size(); + } finally { + lock.unlock(); + } + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java new file mode 100644 index 0000000000..77acc476db --- /dev/null +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/nio/support/TunnelRefusedException.java @@ -0,0 +1,68 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.message.StatusLine; +import org.apache.hc.core5.util.Args; + +/** + * Exception indicating CONNECT tunnel refusal by the proxy. + * + * @since 5.5 + */ +public final class TunnelRefusedException extends HttpException { + + private static final long serialVersionUID = 1L; + + private final HttpResponse response; + + public TunnelRefusedException(final HttpResponse response) { + super("Tunnel refused: " + new StatusLine(Args.notNull(response, "Response"))); + this.response = copy(response); + } + + public HttpResponse getResponse() { + return response; + } + + public int getStatusCode() { + return response.getCode(); + } + + private static HttpResponse copy(final HttpResponse response) { + final BasicHttpResponse copy = new BasicHttpResponse(response.getCode(), response.getReasonPhrase()); + copy.setVersion(response.getVersion()); + for (final Header header : response.getHeaders()) { + copy.addHeader(header); + } + return copy; + } +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java new file mode 100644 index 0000000000..c275fec2ee --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/examples/H2ViaH2ProxyExecutionExample.java @@ -0,0 +1,178 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.examples; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.hc.core5.concurrent.ComplexFuture; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.concurrent.FutureContribution; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http.protocol.HttpProcessorBuilder; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler; +import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.nio.support.H2OverH2TunnelSupport; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; + +/** + * Full example of HTTP/2 request execution through an HTTP/2 proxy tunnel. + */ +public class H2ViaH2ProxyExecutionExample { + + private static TlsStrategy createTlsStrategy() throws Exception { + final String trustStore = System.getProperty("h2.truststore"); + if (trustStore == null || trustStore.isEmpty()) { + return new H2ClientTlsStrategy(); + } + final String trustStorePassword = System.getProperty("h2.truststore.password", "changeit"); + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(new File(trustStore), trustStorePassword.toCharArray()) + .build(); + return new H2ClientTlsStrategy(sslContext); + } + + public static void main(final String[] args) throws Exception { + final String proxyScheme = System.getProperty("h2.proxy.scheme", "http"); + final String proxyHost = System.getProperty("h2.proxy.host", "localhost"); + final int proxyPort = Integer.parseInt(System.getProperty("h2.proxy.port", "8080")); + final String targetScheme = System.getProperty("h2.target.scheme", "https"); + final String targetHost = System.getProperty("h2.target.host", "origin"); + final int targetPort = Integer.parseInt(System.getProperty("h2.target.port", "9443")); + final String[] requestUris = System.getProperty("h2.paths", "/").split(","); + + final TlsStrategy tlsStrategy = createTlsStrategy(); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setH2Config(H2Config.custom().setPushEnabled(false).build()) + .setTlsStrategy(tlsStrategy) + .create(); + requester.start(); + + final HttpHost proxy = new HttpHost(proxyScheme, proxyHost, proxyPort); + final HttpHost target = new HttpHost(targetScheme, targetHost, targetPort); + final Timeout timeout = Timeout.ofSeconds(30); + + final IOEventHandlerFactory tunnelProtocolStarter = (ioSession, attachment) -> + new ClientH2PrefaceHandler(ioSession, new ClientH2StreamMultiplexerFactory( + HttpProcessorBuilder.create().build(), + null, + H2Config.DEFAULT, + org.apache.hc.core5.http.config.CharCodingConfig.DEFAULT, + null), false, null); + + final ComplexFuture tunnelFuture = new ComplexFuture<>(null); + tunnelFuture.setDependency(requester.getConnPool().getSession(proxy, timeout, new FutureContribution(tunnelFuture) { + + @Override + public void completed(final IOSession proxySession) { + H2OverH2TunnelSupport.establish( + proxySession, + target, + timeout, + true, + tlsStrategy, + tunnelProtocolStarter, + new FutureContribution(tunnelFuture) { + + @Override + public void completed(final IOSession tunnelSession) { + tunnelFuture.completed(tunnelSession); + } + }); + } + + })); + + final IOSession tunnelSession = tunnelFuture.get(1, TimeUnit.MINUTES); + try { + final CountDownLatch latch = new CountDownLatch(requestUris.length); + + for (final String requestUri : requestUris) { + final String normalizedRequestUri = requestUri.trim(); + final AsyncClientExchangeHandler exchangeHandler = new org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler<>( + new BasicRequestProducer(Method.GET, target, normalizedRequestUri), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + final HttpResponse response = message.getHead(); + final String body = message.getBody(); + System.out.println(normalizedRequestUri + " -> " + response.getCode()); + System.out.println(body); + latch.countDown(); + } + + @Override + public void failed(final Exception ex) { + System.out.println(normalizedRequestUri + " -> " + ex); + latch.countDown(); + } + + @Override + public void cancelled() { + System.out.println(normalizedRequestUri + " cancelled"); + latch.countDown(); + } + + }); + + tunnelSession.enqueue( + new RequestExecutionCommand(exchangeHandler, HttpCoreContext.create()), + Command.Priority.NORMAL); + } + latch.await(); + } finally { + tunnelSession.close(CloseMode.GRACEFUL); + } + requester.close(CloseMode.GRACEFUL); + } + +} diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java new file mode 100644 index 0000000000..3728df6ca6 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2OverH2TunnelSupport.java @@ -0,0 +1,666 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import java.io.IOException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.EntityDetails; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpRequest; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.message.BasicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.RequestChannel; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.net.NamedEndpoint; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.reactor.ProtocolIOSession; +import org.apache.hc.core5.reactor.ssl.TransportSecurityLayer; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2OverH2TunnelSupport { + + @Test + void testEstablishBuildsH2ConnectForAuthorityAndCompletes() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final HttpHost target = new HttpHost("https", "example.org", 443); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish(session, target, Timeout.ofSeconds(1), false, null, callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertFalse(callback.cancelled); + Assertions.assertNotNull(session.capturedRequest); + Assertions.assertEquals("CONNECT", session.capturedRequest.getMethod()); + Assertions.assertEquals("example.org", session.capturedRequest.getAuthority().getHostName()); + Assertions.assertEquals(443, session.capturedRequest.getAuthority().getPort()); + Assertions.assertNull(session.capturedRequest.getScheme()); + Assertions.assertNull(session.capturedRequest.getPath()); + } + + @Test + void testEstablishAppliesConnectRequestInterceptor() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + (request, entityDetails, context) -> request.addHeader("Proxy-Authorization", "Basic dGVzdDp0ZXN0"), + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(session.capturedRequest); + Assertions.assertEquals("Basic dGVzdDp0ZXN0", session.capturedRequest.getFirstHeader("Proxy-Authorization").getValue()); + } + + @Test + void testEstablishFailsOnRefusedTunnel() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertFalse(callback.completed); + Assertions.assertNotNull(callback.failed); + Assertions.assertInstanceOf(TunnelRefusedException.class, callback.failed); + Assertions.assertEquals( + HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED, + ((TunnelRefusedException) callback.failed).getStatusCode()); + } + + @Test + void testEstablishFailsWhenConnectResponseHasNoTunnelStream() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertFalse(callback.completed); + Assertions.assertNotNull(callback.failed); + } + + @Test + void testEstablishWithProtocolStarterInvokesConnectedAndSeesInputBuffer() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, true, false); + final RecordingProtocolStarter protocolStarter = new RecordingProtocolStarter(); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + protocolStarter, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertTrue(protocolStarter.connectedCalled); + Assertions.assertTrue(protocolStarter.inputBufferSeen); + } + + @Test + void testEstablishSecureTunnelUsesTlsStrategy() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingTlsStrategy tlsStrategy = new RecordingTlsStrategy(); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("https", "example.org", 443), + Timeout.ofSeconds(1), + true, + tlsStrategy, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNull(callback.failed); + Assertions.assertTrue(tlsStrategy.invoked); + } + + @Test + void testClosingTunnelDoesNotClosePhysicalSession() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + callback.result.close(CloseMode.IMMEDIATE); + Assertions.assertTrue(session.isOpen(), "Closing tunnel session must not close physical HTTP/2 connection"); + } + + @Test + void testTunnelImmediateCloseCancelsStreamControlWhenPresent() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, false, false, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + callback.result.close(CloseMode.IMMEDIATE); + + Assertions.assertTrue(session.streamCancelCalled.get(), "IMMEDIATE close must cancel the CONNECT stream"); + Assertions.assertTrue(session.isOpen(), "Cancelling tunnel stream must not close physical HTTP/2 connection"); + } + + @Test + void testTunnelWriteBufferIsBounded() throws Exception { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + final int payloadSize = 1024 * 1024; + final ByteBuffer src = ByteBuffer.allocate(payloadSize); + final int written = callback.result.write(src); + Assertions.assertTrue(written > 0); + Assertions.assertTrue(written < payloadSize, "Outbound writes must be bounded by tunnel buffer capacity"); + } + + @Test + void testCapacityUpdateIsNotUnbounded() { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true, true, false, false); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertTrue(session.lastCapacityUpdate > 0, "Tunnel setup should publish initial bounded capacity"); + Assertions.assertTrue(session.lastCapacityUpdate < Integer.MAX_VALUE, "Capacity must not be unbounded"); + } + + @Test + void testGracefulCloseEndsStreamAfterDrain() throws Exception { + final ScriptedProxySession session = new ScriptedProxySession(HttpStatus.SC_OK, true); + final RecordingCallback callback = new RecordingCallback<>(); + + H2OverH2TunnelSupport.establish( + session, + new HttpHost("http", "example.org", 80), + Timeout.ofSeconds(1), + false, + null, + callback); + + Assertions.assertTrue(callback.completed); + Assertions.assertNotNull(callback.result); + + final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}); + final int written = callback.result.write(src); + Assertions.assertEquals(5, written); + + callback.result.close(CloseMode.GRACEFUL); + + // Drive output flush: test runs in same package -> can call package-private method. + ((H2TunnelProtocolIOSession) callback.result).onOutputReady(); + + Assertions.assertTrue(session.endStreamCalled.get(), "GRACEFUL close must end the CONNECT stream after draining"); + Assertions.assertTrue(session.isOpen(), "Ending tunnel stream must not close physical HTTP/2 connection"); + } + + static class RecordingCallback implements FutureCallback { + + volatile boolean completed; + volatile boolean cancelled; + volatile Exception failed; + volatile T result; + + @Override + public void completed(final T result) { + this.completed = true; + this.result = result; + } + + @Override + public void failed(final Exception ex) { + this.failed = ex; + } + + @Override + public void cancelled() { + this.cancelled = true; + } + } + + static class RecordingProtocolStarter implements IOEventHandlerFactory { + + volatile boolean connectedCalled; + volatile boolean inputBufferSeen; + + @Override + public IOEventHandler createHandler(final ProtocolIOSession ioSession, final Object attachment) { + return new IOEventHandler() { + + @Override + public void connected(final IOSession session) { + connectedCalled = true; + } + + @Override + public void inputReady(final IOSession session, final ByteBuffer src) { + if (src != null && src.hasRemaining()) { + inputBufferSeen = true; + } + } + + @Override + public void outputReady(final IOSession session) { + } + + @Override + public void timeout(final IOSession session, final Timeout timeout) { + } + + @Override + public void exception(final IOSession session, final Exception cause) { + } + + @Override + public void disconnected(final IOSession session) { + } + }; + } + } + + static class RecordingTlsStrategy implements TlsStrategy { + + volatile boolean invoked; + + @Override + public boolean upgrade( + final TransportSecurityLayer sessionLayer, + final HttpHost host, + final SocketAddress localAddress, + final SocketAddress remoteAddress, + final Object attachment, + final Timeout handshakeTimeout) { + invoked = true; + return true; + } + + @Override + public void upgrade( + final TransportSecurityLayer sessionLayer, + final NamedEndpoint endpoint, + final Object attachment, + final Timeout handshakeTimeout, + final FutureCallback callback) { + invoked = true; + if (callback != null) { + callback.completed(sessionLayer); + } + } + } + + static class ScriptedProxySession implements IOSession { + + private final int responseCode; + private final boolean withTunnelStream; + private final boolean signalCapacity; + private final boolean emitInputData; + private final boolean provideStreamControl; + + private final Lock lock; + private Timeout socketTimeout; + + HttpRequest capturedRequest; + volatile int lastCapacityUpdate; + + private final AtomicBoolean open; + final AtomicBoolean streamCancelCalled; + final AtomicBoolean endStreamCalled; + + ScriptedProxySession(final int responseCode, final boolean withTunnelStream) { + this(responseCode, withTunnelStream, false, false, false); + } + + ScriptedProxySession(final int responseCode, final boolean withTunnelStream, final boolean signalCapacity) { + this(responseCode, withTunnelStream, signalCapacity, false, false); + } + + ScriptedProxySession( + final int responseCode, + final boolean withTunnelStream, + final boolean signalCapacity, + final boolean emitInputData, + final boolean provideStreamControl) { + this.responseCode = responseCode; + this.withTunnelStream = withTunnelStream; + this.signalCapacity = signalCapacity; + this.emitInputData = emitInputData; + this.provideStreamControl = provideStreamControl; + + this.lock = new ReentrantLock(); + this.socketTimeout = Timeout.ofSeconds(30); + this.lastCapacityUpdate = -1; + this.open = new AtomicBoolean(true); + this.streamCancelCalled = new AtomicBoolean(false); + this.endStreamCalled = new AtomicBoolean(false); + } + + @Override + public IOEventHandler getHandler() { + return null; + } + + @Override + public void upgrade(final IOEventHandler handler) { + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + if (!(command instanceof RequestExecutionCommand)) { + return; + } + + final RequestExecutionCommand requestExecutionCommand = (RequestExecutionCommand) command; + final AsyncClientExchangeHandler exchangeHandler = requestExecutionCommand.getExchangeHandler(); + final org.apache.hc.core5.http.protocol.HttpContext context = requestExecutionCommand.getContext(); + + try { + if (provideStreamControl && exchangeHandler instanceof H2OverH2TunnelExchangeHandler) { + final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance( + StreamControl.class.getClassLoader(), + new Class[]{StreamControl.class}, + (proxy, method, args) -> { + if ("cancel".equals(method.getName())) { + streamCancelCalled.set(true); + return method.getReturnType() == Boolean.TYPE ? Boolean.TRUE : null; + } + return defaultValue(method); + }); + + ((H2OverH2TunnelExchangeHandler) exchangeHandler).initiated(streamControl); + } + + exchangeHandler.produceRequest((RequestChannel) (request, entityDetails, requestContext) -> capturedRequest = request, context); + + if (signalCapacity) { + exchangeHandler.updateCapacity((CapacityChannel) increment -> lastCapacityUpdate = increment); + } + + final EntityDetails responseEntityDetails = + withTunnelStream ? new org.apache.hc.core5.http.impl.BasicEntityDetails(-1, null) : null; + exchangeHandler.consumeResponse(new BasicHttpResponse(responseCode), responseEntityDetails, context); + + if (withTunnelStream && responseCode == HttpStatus.SC_OK) { + exchangeHandler.produce(new DataStreamChannel() { + + @Override + public void requestOutput() { + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public void endStream() throws IOException { + + } + + @Override + public void endStream(final java.util.List trailers) { + endStreamCalled.set(true); + } + + }); + + if (emitInputData) { + exchangeHandler.consume(ByteBuffer.wrap(new byte[]{1, 2, 3})); + } + } + } catch (final Exception ex) { + exchangeHandler.failed(ex); + } + } + + private static Object defaultValue(final Method method) { + final Class rt = method.getReturnType(); + if (rt == Void.TYPE) { + return null; + } + if (rt == Boolean.TYPE) { + return false; + } + if (rt == Integer.TYPE) { + return 0; + } + if (rt == Long.TYPE) { + return 0L; + } + if (rt == Short.TYPE) { + return (short) 0; + } + if (rt == Byte.TYPE) { + return (byte) 0; + } + if (rt == Character.TYPE) { + return (char) 0; + } + if (rt == Float.TYPE) { + return 0f; + } + if (rt == Double.TYPE) { + return 0d; + } + return null; + } + + @Override + public boolean hasCommands() { + return false; + } + + @Override + public Command poll() { + return null; + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public int getEventMask() { + return 0; + } + + @Override + public void setEventMask(final int ops) { + } + + @Override + public void setEvent(final int op) { + } + + @Override + public void clearEvent(final int op) { + } + + @Override + public void close() { + open.set(false); + } + + @Override + public void close(final CloseMode closeMode) { + open.set(false); + } + + @Override + public Status getStatus() { + return open.get() ? Status.ACTIVE : Status.CLOSED; + } + + @Override + public Timeout getSocketTimeout() { + return socketTimeout; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + this.socketTimeout = timeout; + } + + @Override + public long getLastReadTime() { + return 0; + } + + @Override + public long getLastWriteTime() { + return 0; + } + + @Override + public long getLastEventTime() { + return 0; + } + + @Override + public void updateReadTime() { + } + + @Override + public void updateWriteTime() { + } + + @Override + public int read(final ByteBuffer dst) { + return 0; + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public boolean isOpen() { + return open.get(); + } + + @Override + public String getId() { + return "proxy-session"; + } + } +} \ No newline at end of file diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java new file mode 100644 index 0000000000..fe5225fcd8 --- /dev/null +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/nio/support/TestH2TunnelRawIOSession.java @@ -0,0 +1,359 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.http2.nio.support; + +import java.io.IOException; +import java.lang.reflect.Proxy; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.ByteChannel; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hc.core5.http.StreamControl; +import org.apache.hc.core5.http.nio.CapacityChannel; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandler; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +class TestH2TunnelRawIOSession { + + @Test + void testCapacityInitialUpdateIsBounded() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final AtomicInteger last = new AtomicInteger(0); + raw.updateCapacityChannel(new CapacityChannel() { + @Override + public void update(final int increment) { + last.set(increment); + } + }); + + Assertions.assertTrue(last.get() > 0); + Assertions.assertTrue(last.get() < Integer.MAX_VALUE); + } + + @Test + void testAppendInputOverflowFails() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + // INBOUND_BUFFER_LIMIT is 64k in the implementation; overflow by 1. + final ByteBuffer tooBig = ByteBuffer.allocate(64 * 1024 + 1); + Assertions.assertThrows(IOException.class, () -> raw.appendInput(tooBig)); + } + + @Test + void testReadTriggersCapacityUpdateOnConsumption() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final AtomicInteger last = new AtomicInteger(-1); + raw.updateCapacityChannel(new CapacityChannel() { + @Override + public void update(final int increment) { + last.set(increment); + } + }); + + raw.appendInput(ByteBuffer.wrap(new byte[]{1, 2, 3, 4})); + + final ByteBuffer dst = ByteBuffer.allocate(4); + final int n = raw.read(dst); + Assertions.assertEquals(4, n); + // Capacity update should publish the consumed bytes (4), not unbounded. + Assertions.assertEquals(4, last.get()); + } + + @Test + void testWriteIsBounded() { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + + final ByteBuffer src = ByteBuffer.allocate(1024 * 1024); + final int n = raw.write(src); + Assertions.assertTrue(n > 0); + Assertions.assertTrue(n < 1024 * 1024); + Assertions.assertEquals(64 * 1024, n, "Expected OUTBOUND_BUFFER_LIMIT (64k) write bound"); + } + + @Test + void testImmediateCloseCancelsStreamControlButNotPhysicalSession() { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + + final AtomicBoolean cancelled = new AtomicBoolean(false); + final StreamControl streamControl = (StreamControl) Proxy.newProxyInstance( + StreamControl.class.getClassLoader(), + new Class[]{StreamControl.class}, + (proxy, method, args) -> { + final String name = method.getName(); + final Class rt = method.getReturnType(); + + if ("cancel".equals(name)) { + cancelled.set(true); + // IMPORTANT: cancel() may return boolean (Cancellable) + return rt == Boolean.TYPE ? Boolean.TRUE : null; + } + + if (rt == Void.TYPE) { + return null; + } + if (rt == Boolean.TYPE) { + return false; + } + if (rt == Integer.TYPE) { + return 0; + } + if (rt == Long.TYPE) { + return 0L; + } + if (rt == Short.TYPE) { + return (short) 0; + } + if (rt == Byte.TYPE) { + return (byte) 0; + } + if (rt == Character.TYPE) { + return (char) 0; + } + if (rt == Float.TYPE) { + return 0f; + } + if (rt == Double.TYPE) { + return 0d; + } + return null; + }); + + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), streamControl); + raw.close(CloseMode.IMMEDIATE); + + Assertions.assertTrue(cancelled.get(), "IMMEDIATE close must cancel stream"); + Assertions.assertTrue(physical.isOpen(), "Tunnel close must not close physical HTTP/2 connection"); + } + + @Test + void testGracefulCloseEndsStreamAfterDrain() throws Exception { + final DummyPhysicalSession physical = new DummyPhysicalSession(); + + final AtomicBoolean endStreamCalled = new AtomicBoolean(false); + rawAttachChannel(physical, endStreamCalled); + + final H2TunnelRawIOSession raw = new H2TunnelRawIOSession(physical, Timeout.ofSeconds(5), null); + raw.attachChannel(physical.dataChannel); + + // Put some outbound bytes + final ByteBuffer src = ByteBuffer.wrap(new byte[]{1, 2, 3, 4, 5}); + final int written = raw.write(src); + Assertions.assertEquals(5, written); + + raw.close(CloseMode.GRACEFUL); + raw.flushOutput(); + + Assertions.assertTrue(endStreamCalled.get(), "GRACEFUL close must endStream once outbound drained"); + } + + private static void rawAttachChannel(final DummyPhysicalSession physical, final AtomicBoolean endStreamCalled) { + physical.dataChannel = new DataStreamChannel() { + + @Override + public void requestOutput() { + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public void endStream() throws IOException { + + } + + @Override + public void endStream(final java.util.List trailers) { + endStreamCalled.set(true); + } + }; + } + + static final class DummyPhysicalSession implements IOSession { + + private final Lock lock = new ReentrantLock(); + private volatile boolean open = true; + private volatile Timeout socketTimeout = Timeout.ofSeconds(30); + + volatile DataStreamChannel dataChannel; + + @Override + public IOEventHandler getHandler() { + return null; + } + + @Override + public void upgrade(final IOEventHandler handler) { + } + + @Override + public Lock getLock() { + return lock; + } + + @Override + public void enqueue(final Command command, final Command.Priority priority) { + } + + @Override + public boolean hasCommands() { + return false; + } + + @Override + public Command poll() { + return null; + } + + @Override + public ByteChannel channel() { + return this; + } + + @Override + public SocketAddress getRemoteAddress() { + return null; + } + + @Override + public SocketAddress getLocalAddress() { + return null; + } + + @Override + public int getEventMask() { + return 0; + } + + @Override + public void setEventMask(final int ops) { + } + + @Override + public void setEvent(final int op) { + } + + @Override + public void clearEvent(final int op) { + } + + @Override + public void close() { + open = false; + } + + @Override + public void close(final CloseMode closeMode) { + open = false; + } + + @Override + public Status getStatus() { + return open ? Status.ACTIVE : Status.CLOSED; + } + + @Override + public Timeout getSocketTimeout() { + return socketTimeout; + } + + @Override + public void setSocketTimeout(final Timeout timeout) { + socketTimeout = timeout; + } + + @Override + public long getLastReadTime() { + return 0; + } + + @Override + public long getLastWriteTime() { + return 0; + } + + @Override + public long getLastEventTime() { + return 0; + } + + @Override + public void updateReadTime() { + } + + @Override + public void updateWriteTime() { + } + + @Override + public int read(final ByteBuffer dst) { + return 0; + } + + @Override + public int write(final ByteBuffer src) { + final int remaining = src != null ? src.remaining() : 0; + if (remaining > 0 && src != null) { + final byte[] tmp = new byte[remaining]; + src.get(tmp); + } + return remaining; + } + + @Override + public boolean isOpen() { + return open; + } + + @Override + public String getId() { + return "dummy-physical"; + } + } +} \ No newline at end of file diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java new file mode 100644 index 0000000000..d5aed28f08 --- /dev/null +++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2OverH2TunnelContainerIT.java @@ -0,0 +1,323 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.core5.testing.nio; + +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; + +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.HttpStatus; +import org.apache.hc.core5.http.Message; +import org.apache.hc.core5.http.Method; +import org.apache.hc.core5.http.config.CharCodingConfig; +import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; +import org.apache.hc.core5.http.nio.command.RequestExecutionCommand; +import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer; +import org.apache.hc.core5.http.nio.ssl.TlsStrategy; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.http.nio.support.BasicResponseConsumer; +import org.apache.hc.core5.http.protocol.HttpCoreContext; +import org.apache.hc.core5.http.protocol.HttpProcessorBuilder; +import org.apache.hc.core5.http2.config.H2Config; +import org.apache.hc.core5.http2.impl.nio.ClientH2PrefaceHandler; +import org.apache.hc.core5.http2.impl.nio.ClientH2StreamMultiplexerFactory; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequester; +import org.apache.hc.core5.http2.impl.nio.bootstrap.H2MultiplexingRequesterBootstrap; +import org.apache.hc.core5.http2.nio.support.H2OverH2TunnelSupport; +import org.apache.hc.core5.http2.ssl.H2ClientTlsStrategy; +import org.apache.hc.core5.io.CloseMode; +import org.apache.hc.core5.reactor.Command; +import org.apache.hc.core5.reactor.IOEventHandlerFactory; +import org.apache.hc.core5.reactor.IOSession; +import org.apache.hc.core5.ssl.SSLContexts; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.testcontainers.utility.MountableFile; + +/** + * Full integration test for HTTP/2 CONNECT tunneling over a real proxy container. + */ +@Testcontainers(disabledWithoutDocker = true) +class H2OverH2TunnelContainerIT { + + private static final String ORIGIN_BODY = "h2-tunnel-it-ok"; + private static final int ORIGIN_PORT = 9443; + private static final int PROXY_PORT = 8080; + private static final Duration STARTUP_TIMEOUT = Duration.ofSeconds(90); + private static final String ORIGIN_ALIAS = "h2-origin"; + + private static Network network; + private static GenericContainer originContainer; + private static GenericContainer proxyContainer; + + @BeforeAll + @SuppressWarnings("resource") + static void setUp() throws Exception { + network = Network.newNetwork(); + + originContainer = new GenericContainer<>(DockerImageName.parse("nginx:1.27.4-alpine")) + .withNetwork(network) + .withNetworkAliases(ORIGIN_ALIAS) + .withCopyFileToContainer( + MountableFile.forClasspathResource("h2-tunnel-it/nginx-tls.conf"), + "/etc/nginx/nginx.conf") + .withCopyFileToContainer( + MountableFile.forClasspathResource("h2-tunnel-it/certs/origin.crt"), + "/etc/nginx/certs/origin.crt") + .withCopyFileToContainer( + MountableFile.forClasspathResource("h2-tunnel-it/certs/origin.key"), + "/etc/nginx/certs/origin.key") + .withExposedPorts(ORIGIN_PORT) + .waitingFor(Wait.forListeningPort()) + .withStartupTimeout(STARTUP_TIMEOUT); + originContainer.start(); + + proxyContainer = new GenericContainer<>(DockerImageName.parse("envoyproxy/envoy:v1.31.2")) + .withNetwork(network) + .withCommand("envoy", "-c", "/etc/envoy/envoy.yaml", "-l", "info") + .withCopyFileToContainer( + MountableFile.forClasspathResource("h2-tunnel-it/envoy.yaml"), + "/etc/envoy/envoy.yaml") + .withExposedPorts(PROXY_PORT) + .waitingFor(Wait.forListeningPort()) + .withStartupTimeout(STARTUP_TIMEOUT); + proxyContainer.start(); + } + + @AfterAll + static void tearDown() { + if (proxyContainer != null) { + proxyContainer.close(); + } + if (originContainer != null) { + originContainer.close(); + } + if (network != null) { + network.close(); + } + } + + @Test + void testHttp2TunnelOverHttp2ProxyAndConnectionReuse() throws Exception { + final Timeout timeout = Timeout.ofSeconds(30); + final HttpHost proxy = new HttpHost( + "http", + proxyContainer.getHost(), + proxyContainer.getMappedPort(PROXY_PORT)); + final HttpHost target = new HttpHost("https", ORIGIN_ALIAS, ORIGIN_PORT); + + final H2MultiplexingRequester requester = H2MultiplexingRequesterBootstrap.bootstrap() + .setH2Config(H2Config.custom().setPushEnabled(false).build()) + .create(); + requester.start(); + try { + final IOSession proxySession = awaitProxySession(requester, proxy, timeout); + + final IOSession tunnelA = awaitTunnelSession( + proxySession, + target, + timeout, + true, + createTunnelTlsStrategy()); + assertOriginResponse(executeGet(tunnelA, target, "/a")); + + final IOSession tunnelB = awaitTunnelSession( + proxySession, + target, + timeout, + true, + createTunnelTlsStrategy()); + assertOriginResponse(executeGet(tunnelB, target, "/b")); + + tunnelA.close(CloseMode.IMMEDIATE); + Assertions.assertTrue(proxySession.isOpen(), + "Closing one tunnel must not close the shared proxy HTTP/2 connection"); + + assertOriginResponse(executeGet(tunnelB, target, "/still-open")); + tunnelB.close(CloseMode.GRACEFUL); + } finally { + requester.close(CloseMode.GRACEFUL); + } + } + + private static IOEventHandlerFactory tunnelProtocolStarter() { + return (ioSession, attachment) -> new ClientH2PrefaceHandler( + ioSession, + new ClientH2StreamMultiplexerFactory( + HttpProcessorBuilder.create().build(), + null, + H2Config.DEFAULT, + CharCodingConfig.DEFAULT, + null), + false, + null); + } + + private static IOSession awaitProxySession( + final H2MultiplexingRequester requester, + final HttpHost proxy, + final Timeout timeout) throws Exception { + + final CompletableFuture resultFuture = new CompletableFuture<>(); + requester.getConnPool().getSession(proxy, timeout, new FutureCallback() { + + @Override + public void completed(final IOSession result) { + resultFuture.complete(result); + } + + @Override + public void failed(final Exception ex) { + resultFuture.completeExceptionally(ex); + } + + @Override + public void cancelled() { + resultFuture.cancel(false); + } + + }); + return resultFuture.get(1, TimeUnit.MINUTES); + } + + private static IOSession awaitTunnelSession( + final IOSession proxySession, + final HttpHost target, + final Timeout timeout, + final boolean secure, + final TlsStrategy tlsStrategy) throws Exception { + + final CompletableFuture resultFuture = new CompletableFuture<>(); + H2OverH2TunnelSupport.establish( + proxySession, + target, + timeout, + secure, + tlsStrategy, + tunnelProtocolStarter(), + new FutureCallback() { + + @Override + public void completed(final IOSession result) { + resultFuture.complete(result); + } + + @Override + public void failed(final Exception ex) { + resultFuture.completeExceptionally(ex); + } + + @Override + public void cancelled() { + resultFuture.cancel(false); + } + + }); + try { + return resultFuture.get(1, TimeUnit.MINUTES); + } catch (final Exception ex) { + final String proxyLogs = proxyContainer != null ? proxyContainer.getLogs() : ""; + final String originLogs = originContainer != null ? originContainer.getLogs() : ""; + throw new IllegalStateException( + "Tunnel establishment failed. Proxy logs:\n" + proxyLogs + "\nOrigin logs:\n" + originLogs, + ex); + } + } + + private static Message executeGet( + final IOSession tunnelSession, + final HttpHost target, + final String requestUri) throws Exception { + + final CompletableFuture> responseFuture = new CompletableFuture<>(); + + final AsyncClientExchangeHandler exchangeHandler = new org.apache.hc.core5.http.nio.support.BasicClientExchangeHandler<>( + new BasicRequestProducer(Method.GET, target, requestUri), + new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), + new FutureCallback>() { + + @Override + public void completed(final Message message) { + responseFuture.complete(message); + } + + @Override + public void failed(final Exception ex) { + responseFuture.completeExceptionally(ex); + } + + @Override + public void cancelled() { + responseFuture.cancel(false); + } + + }); + + tunnelSession.enqueue( + new RequestExecutionCommand(exchangeHandler, HttpCoreContext.create()), + Command.Priority.NORMAL); + + try { + return responseFuture.get(1, TimeUnit.MINUTES); + } catch (final Exception ex) { + final String proxyLogs = proxyContainer != null ? proxyContainer.getLogs() : ""; + final String originLogs = originContainer != null ? originContainer.getLogs() : ""; + throw new IllegalStateException( + "Tunnel request failed. Proxy logs:\n" + proxyLogs + "\nOrigin logs:\n" + originLogs, + ex); + } + } + + private static TlsStrategy createTunnelTlsStrategy() throws Exception { + final SSLContext sslContext = SSLContexts.custom() + .loadTrustMaterial(null, (chain, authType) -> true) + .build(); + return new H2ClientTlsStrategy(sslContext); + } + + private static void assertOriginResponse(final Message message) { + Assertions.assertNotNull(message); + Assertions.assertNotNull(message.getHead()); + Assertions.assertEquals(HttpStatus.SC_OK, message.getHead().getCode()); + Assertions.assertNotNull(message.getBody()); + Assertions.assertEquals(ORIGIN_BODY, message.getBody().trim()); + } + +} \ No newline at end of file diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt new file mode 100644 index 0000000000..6477bf6bfd --- /dev/null +++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDHzCCAgegAwIBAgIUN0piqpG3WxzuNDapinZ4C5iLoWcwDQYJKoZIhvcNAQEL +BQAwFDESMBAGA1UEAwwJaDItb3JpZ2luMB4XDTI2MDMwMzEzNDcwNFoXDTM2MDIy +OTEzNDcwNFowFDESMBAGA1UEAwwJaDItb3JpZ2luMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEAumfV8zdii2PnkhxIth3gvW9bTagDTWBGn3xjLLlbqh+i +5WqdEXfV2TzE8Ps36WweJTOQxOJQUjcuprxLHu+eCdD8WOHby4eCNez0IY8dB2At +IQE6IzQJV/3+ir+dRL4vBJPWplRNJmuDVbsbatyaIHWWRcXeZimxgTuUs0Lkp7QR +l32C6uEnycDH+ftKJbLNZa0iBmDYar7CJ5Sdf64S9kwhU/eoCDB6woZCGzL55gHe +jHdqNNKYB+INfLcpdcXzPGWrCJNYlCfRMw05UHXCydCoAA85tdd219X5pvtxwCo7 +aFYueSFFpiBcBqCZQYOEXdSoCR/8Hjvqh4zcHCychQIDAQABo2kwZzAdBgNVHQ4E +FgQUtZmPTJvO/Z7jdiBnal8cxGVZhhowHwYDVR0jBBgwFoAUtZmPTJvO/Z7jdiBn +al8cxGVZhhowDwYDVR0TAQH/BAUwAwEB/zAUBgNVHREEDTALggloMi1vcmlnaW4w +DQYJKoZIhvcNAQELBQADggEBAB5woEcorqa5b61ijwK6q98/61EUqEVMiGqu93Ta +i75abG4Nk6Jdhh6QskLVOb1gqlmwzDQ8IobO3dodm4SrmzusZg7tIHoPcZAGf35a +e+J5nH8ryAAKwvhRV/Tdu1pQi5EGc+WW/18zJZGyxjDincbUTl1uwR8uNmTsjawr +zz09NF9nUTR04dFu39Wv9kOHDptuloz82Jw7VdjtUhuDfheNOTOxERxU5JKhJhcJ +XINkOL8NV9mqXFadQKkViaTjcAdbtXmNd0BxDxlCU3nLLL7jk1AaViSuumQjYo7R +5dpxYuk/45FF0Jb4vDC+r40Jue7r2O5+cZp7IO3E+YEybMI= +-----END CERTIFICATE----- diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key new file mode 100644 index 0000000000..a381c78a77 --- /dev/null +++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/certs/origin.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvgIBADANBgkqhkiG9w0BAQEFAASCBKgwggSkAgEAAoIBAQC6Z9XzN2KLY+eS +HEi2HeC9b1tNqANNYEaffGMsuVuqH6Llap0Rd9XZPMTw+zfpbB4lM5DE4lBSNy6m +vEse754J0PxY4dvLh4I17PQhjx0HYC0hATojNAlX/f6Kv51Evi8Ek9amVE0ma4NV +uxtq3JogdZZFxd5mKbGBO5SzQuSntBGXfYLq4SfJwMf5+0olss1lrSIGYNhqvsIn +lJ1/rhL2TCFT96gIMHrChkIbMvnmAd6Md2o00pgH4g18tyl1xfM8ZasIk1iUJ9Ez +DTlQdcLJ0KgADzm113bX1fmm+3HAKjtoVi55IUWmIFwGoJlBg4Rd1KgJH/weO+qH +jNwcLJyFAgMBAAECggEAAu2dpUfx8tmbaiaql73JaYBl0Ub54k3IXjoAftPclkQP +9YWiuQMGZ3a2a0iu/Ko3oQL527XoaBo4z+K2VWKTO3k+dZD6uGxFBd7WiO5sGNEQ +dGvGA4aOPQUe6gQPjuRj7bD61rsNSTS4J/EcAaY8f5UJSshMcZNnF+4dLGG5IM9C +9RaP2npSeGE55y0cmUw0FqlxnuQYocGdZHZUglngRan22VI+gY/8qrZ4Q4BNWdPn +EGE28lYCxRkzRocybV5dUsgaXIYOx+Wl1+XprcZWZqVlauQ5UMMBVJg9mxaNGStn +C5Fqa86lAtNHIJcMiSey/k6qpBGdIBhIjW1V1EMcAwKBgQDogkLm59E8h8H66Hq/ +M0Uh0n8x/8nxLoDBzn7ZNF6VDwRvD+R8LuGvS4UwmBUJmAj8fbCFf3IOTRCuup+d +cEzYD7dMUiVn+wJjIoT8WsKxMZ0VRBOSuOOraGYgPgIIcLXhpMH4EC3oLChedWhI +kQSL84aX0D1gyMOiNpffs2ZGXwKBgQDNPSKA5czqtdL3PcK+UZd5YNEdkoiZcKYn +niiMTLTqj1GD8X7C9efqP+Qa2CTQUaOeOfmqAti9rGcY83VtytcNtdZ+YsYj1Zbi +rUBa4I8X0LLfn9lI/xxN1b6gQoQ+fDbAt+bFldF3t9B28sbkmBGFaAb1TtQizdia +co35plWfmwKBgQDmXbG1oDeSdpu+YrrDWCQF186IlnvaB44w98x8nkOcAk4NUDy8 +waKAER48wGIPqGA28r2D93rlKnv98xAUaGDqrd+ZscY4GN4LpPcIJVDDSXnuyQ1v +kNqaSQzuoyFWhX3fvGMmybkCUUYKGN+jDnPnyfgv0HYPv7r9rIObc99AlwKBgEeI +eVAnyCY+PUuDMS8YTQ03G2uNOSMRyjegvk04Jw5h6W1tbFsTTkOtBRn+H8ajzb1G +Q6hn2ZcyUbS2lkUwH4hdyma+koTG2xIihH2oKveH+/BJTHhOwlS2nPxKcsE8lfDR +qBNRxnJNlNEAiSX/govW2CYD1ZhT2pzqNGXA/bLlAoGBAL7EPoMbfPIZitrCrApA +wfI2XUUCjJFF+s+eKHYnrNB9GDsPUxckg7jIhf5ERSWtzbcSD7wXIYcVYBBdtZ9o +/p5XmsHfRNQn6PzVtc/R9JzEkAk2a2m66ec+QbHs561f36WXNPDhbASuWY7O6NYe +086+jvxYtE6/W9zuoEEygTxo +-----END PRIVATE KEY----- diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml b/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml new file mode 100644 index 0000000000..4d1018d46c --- /dev/null +++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/envoy.yaml @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +static_resources: + listeners: + - name: h2_connect_proxy_listener + address: + socket_address: + address: 0.0.0.0 + port_value: 8080 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: h2_connect_proxy + codec_type: HTTP2 + http2_protocol_options: + allow_connect: true + route_config: + name: local_route + virtual_hosts: + - name: any + domains: ["*"] + routes: + - match: + connect_matcher: {} + route: + cluster: dynamic_forward_proxy_cluster + timeout: 0s + upgrade_configs: + - upgrade_type: CONNECT + connect_config: {} + access_log: + - name: envoy.access_loggers.stdout + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StdoutAccessLog + log_format: + text_format_source: + inline_string: "[%START_TIME%] \"%REQ(:METHOD)% %REQ(:AUTHORITY)% %PROTOCOL%\" %RESPONSE_CODE% %RESPONSE_FLAGS% upstream=%UPSTREAM_HOST%\n" + http_filters: + - name: envoy.filters.http.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.dynamic_forward_proxy.v3.FilterConfig + dns_cache_config: + name: local_dns_cache + dns_lookup_family: V4_ONLY + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + + clusters: + - name: dynamic_forward_proxy_cluster + connect_timeout: 5s + lb_policy: CLUSTER_PROVIDED + cluster_type: + name: envoy.clusters.dynamic_forward_proxy + typed_config: + "@type": type.googleapis.com/envoy.extensions.clusters.dynamic_forward_proxy.v3.ClusterConfig + dns_cache_config: + name: local_dns_cache + dns_lookup_family: V4_ONLY \ No newline at end of file diff --git a/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf b/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf new file mode 100644 index 0000000000..4bd8f2c374 --- /dev/null +++ b/httpcore5-testing/src/test/resources/h2-tunnel-it/nginx-tls.conf @@ -0,0 +1,35 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +worker_processes 1; + +events { + worker_connections 1024; +} + +http { + server { + listen 9443 ssl http2; + server_name h2-origin; + + ssl_certificate /etc/nginx/certs/origin.crt; + ssl_certificate_key /etc/nginx/certs/origin.key; + ssl_protocols TLSv1.2 TLSv1.3; + + location / { + default_type text/plain; + return 200 "h2-tunnel-it-ok"; + } + } +} \ No newline at end of file